• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #343

31 Oct 2024 06:31PM UTC coverage: 75.148% (-3.6%) from 78.794%
#343

push

github

web-flow
Fix jacoco coverage (#2304)

5139 of 8240 branches covered (62.37%)

Branch coverage included in aggregate %.

22841 of 28993 relevant lines covered (78.78%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.5
/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.sync;
22

23
import static io.temporal.internal.sync.AsyncInternal.AsyncMarker;
24
import static io.temporal.internal.sync.DeterministicRunnerImpl.currentThreadInternal;
25

26
import com.google.common.base.Joiner;
27
import com.google.common.base.MoreObjects;
28
import com.google.common.base.Preconditions;
29
import com.uber.m3.tally.Scope;
30
import io.nexusrpc.ServiceDefinition;
31
import io.temporal.activity.ActivityOptions;
32
import io.temporal.activity.LocalActivityOptions;
33
import io.temporal.api.common.v1.Payload;
34
import io.temporal.api.common.v1.SearchAttributes;
35
import io.temporal.api.common.v1.WorkflowExecution;
36
import io.temporal.common.RetryOptions;
37
import io.temporal.common.SearchAttributeUpdate;
38
import io.temporal.common.converter.DataConverter;
39
import io.temporal.common.interceptors.Header;
40
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
41
import io.temporal.common.metadata.POJOWorkflowImplMetadata;
42
import io.temporal.common.metadata.POJOWorkflowInterfaceMetadata;
43
import io.temporal.common.metadata.POJOWorkflowMethodMetadata;
44
import io.temporal.internal.WorkflowThreadMarker;
45
import io.temporal.internal.common.ActivityOptionUtils;
46
import io.temporal.internal.common.NonIdempotentHandle;
47
import io.temporal.internal.common.SearchAttributesUtil;
48
import io.temporal.internal.logging.ReplayAwareLogger;
49
import io.temporal.internal.statemachines.UnsupportedContinueAsNewRequest;
50
import io.temporal.serviceclient.CheckedExceptionWrapper;
51
import io.temporal.workflow.*;
52
import io.temporal.workflow.Functions.Func;
53
import java.lang.reflect.*;
54
import java.time.Duration;
55
import java.util.*;
56
import java.util.function.BiPredicate;
57
import java.util.function.Supplier;
58
import javax.annotation.Nonnull;
59
import javax.annotation.Nullable;
60
import org.slf4j.Logger;
61
import org.slf4j.LoggerFactory;
62

63
/**
64
 * Never reference directly. It is public only because Java doesn't have internal package support.
65
 */
66
public final class WorkflowInternal {
67
  public static final int DEFAULT_VERSION = -1;
68

69
  public static @Nonnull WorkflowThread newWorkflowMethodThread(Runnable runnable, String name) {
70
    Object workflowThread =
71
        currentThreadInternal()
1✔
72
            .getWorkflowContext()
1✔
73
            .getWorkflowInboundInterceptor()
1✔
74
            .newWorkflowMethodThread(runnable, name);
1✔
75
    Preconditions.checkState(
1!
76
        workflowThread != null,
77
        "[BUG] One of the custom interceptors illegally overrode newWorkflowMethodThread result to null. "
78
            + "Check WorkflowInboundCallsInterceptor#newWorkflowMethodThread contract.");
79
    Preconditions.checkState(
1✔
80
        workflowThread instanceof WorkflowThread,
81
        "[BUG] One of the custom interceptors illegally overrode newWorkflowMethodThread result. "
82
            + "Check WorkflowInboundCallsInterceptor#newWorkflowMethodThread contract. "
83
            + "Illegal object returned from the interceptors chain: %s",
84
        workflowThread);
85
    return (WorkflowThread) workflowThread;
1✔
86
  }
87

88
  public static Promise<Void> newTimer(Duration duration) {
89
    assertNotReadOnly("schedule timer");
1✔
90
    return getWorkflowOutboundInterceptor().newTimer(duration);
1✔
91
  }
92

93
  public static Promise<Void> newTimer(Duration duration, TimerOptions options) {
94
    assertNotReadOnly("schedule timer");
×
95
    return getWorkflowOutboundInterceptor().newTimer(duration, options);
×
96
  }
97

98
  /**
99
   * @param capacity the maximum size of the queue
100
   * @return new instance of {@link WorkflowQueue}
101
   * @deprecated this method created a deprecated implementation of the queue that has some methods
102
   *     implemented incorrectly. Please use {@link #newWorkflowQueue(int)} instead.
103
   */
104
  @Deprecated
105
  public static <E> WorkflowQueue<E> newQueue(int capacity) {
106
    return new WorkflowQueueDeprecatedImpl<>(capacity);
1✔
107
  }
108

109
  /**
110
   * Creates a {@link WorkflowQueue} implementation that can be used from workflow code.
111
   *
112
   * @param capacity the maximum size of the queue
113
   * @return new instance of {@link WorkflowQueue}
114
   */
115
  public static <E> WorkflowQueue<E> newWorkflowQueue(int capacity) {
116
    return new WorkflowQueueImpl<>(capacity);
1✔
117
  }
118

119
  public static WorkflowLock newWorkflowLock() {
120
    return new WorkflowLockImpl();
1✔
121
  }
122

123
  public static WorkflowSemaphore newWorkflowSemaphore(int permits) {
124
    return new WorkflowSemaphoreImpl(permits);
1✔
125
  }
126

127
  public static <E> CompletablePromise<E> newCompletablePromise() {
128
    return new CompletablePromiseImpl<>();
1✔
129
  }
130

131
  public static <E> Promise<E> newPromise(E value) {
132
    CompletablePromise<E> result = Workflow.newPromise();
1✔
133
    result.complete(value);
1✔
134
    return result;
1✔
135
  }
136

137
  public static <E> Promise<E> newFailedPromise(Exception failure) {
138
    CompletablePromise<E> result = new CompletablePromiseImpl<>();
1✔
139
    result.completeExceptionally(CheckedExceptionWrapper.wrap(failure));
1✔
140
    return result;
1✔
141
  }
142

143
  /**
144
   * Register query or queries implementation object. There is no need to register top level
145
   * workflow implementation object as it is done implicitly. Only methods annotated with @{@link
146
   * QueryMethod} are registered. TODO(quinn) LIES!
147
   */
148
  public static void registerListener(Object implementation) {
149
    if (implementation instanceof DynamicSignalHandler) {
1✔
150
      getWorkflowOutboundInterceptor()
1✔
151
          .registerDynamicSignalHandler(
1✔
152
              new WorkflowOutboundCallsInterceptor.RegisterDynamicSignalHandlerInput(
153
                  (DynamicSignalHandler) implementation));
154
      return;
1✔
155
    }
156
    if (implementation instanceof DynamicQueryHandler) {
1✔
157
      getWorkflowOutboundInterceptor()
1✔
158
          .registerDynamicQueryHandler(
1✔
159
              new WorkflowOutboundCallsInterceptor.RegisterDynamicQueryHandlerInput(
160
                  (DynamicQueryHandler) implementation));
161
      return;
1✔
162
    }
163
    if (implementation instanceof DynamicUpdateHandler) {
1✔
164
      getWorkflowOutboundInterceptor()
1✔
165
          .registerDynamicUpdateHandler(
1✔
166
              new WorkflowOutboundCallsInterceptor.RegisterDynamicUpdateHandlerInput(
167
                  (DynamicUpdateHandler) implementation));
168
      return;
1✔
169
    }
170
    Class<?> cls = implementation.getClass();
1✔
171
    POJOWorkflowImplMetadata workflowMetadata = POJOWorkflowImplMetadata.newListenerInstance(cls);
1✔
172
    for (POJOWorkflowMethodMetadata methodMetadata : workflowMetadata.getQueryMethods()) {
1✔
173
      Method method = methodMetadata.getWorkflowMethod();
1✔
174
      getWorkflowOutboundInterceptor()
1✔
175
          .registerQuery(
1✔
176
              new WorkflowOutboundCallsInterceptor.RegisterQueryInput(
177
                  methodMetadata.getName(),
1✔
178
                  methodMetadata.getDescription(),
1✔
179
                  method.getParameterTypes(),
1✔
180
                  method.getGenericParameterTypes(),
1✔
181
                  (args) -> {
182
                    try {
183
                      return method.invoke(implementation, args);
1✔
184
                    } catch (Throwable e) {
×
185
                      throw CheckedExceptionWrapper.wrap(e);
×
186
                    }
187
                  }));
188
    }
1✔
189
    List<WorkflowOutboundCallsInterceptor.SignalRegistrationRequest> requests = new ArrayList<>();
1✔
190
    for (POJOWorkflowMethodMetadata methodMetadata : workflowMetadata.getSignalMethods()) {
1✔
191
      Method method = methodMetadata.getWorkflowMethod();
1✔
192
      SignalMethod signalMethod = method.getAnnotation(SignalMethod.class);
1✔
193
      requests.add(
1✔
194
          new WorkflowOutboundCallsInterceptor.SignalRegistrationRequest(
195
              methodMetadata.getName(),
1✔
196
              methodMetadata.getDescription(),
1✔
197
              signalMethod.unfinishedPolicy(),
1✔
198
              method.getParameterTypes(),
1✔
199
              method.getGenericParameterTypes(),
1✔
200
              (args) -> {
201
                try {
202
                  method.invoke(implementation, args);
1✔
203
                } catch (Throwable e) {
1✔
204
                  throw CheckedExceptionWrapper.wrap(e);
1✔
205
                }
1✔
206
              }));
1✔
207
    }
1✔
208
    if (!requests.isEmpty()) {
1✔
209
      getWorkflowOutboundInterceptor()
1✔
210
          .registerSignalHandlers(
1✔
211
              new WorkflowOutboundCallsInterceptor.RegisterSignalHandlersInput(requests));
212
    }
213

214
    // Get all validators and lazily assign them to update handlers as we see them.
215
    Map<String, POJOWorkflowMethodMetadata> validators =
1✔
216
        new HashMap<>(workflowMetadata.getUpdateValidatorMethods().size());
1✔
217
    for (POJOWorkflowMethodMetadata methodMetadata : workflowMetadata.getUpdateValidatorMethods()) {
1✔
218
      Method method = methodMetadata.getWorkflowMethod();
1✔
219
      UpdateValidatorMethod updateValidatorMethod =
1✔
220
          method.getAnnotation(UpdateValidatorMethod.class);
1✔
221
      if (validators.containsKey(updateValidatorMethod.updateName())) {
1!
222
        throw new IllegalArgumentException(
×
223
            "Duplicate validator for update handle " + updateValidatorMethod.updateName());
×
224
      }
225
      validators.put(updateValidatorMethod.updateName(), methodMetadata);
1✔
226
    }
1✔
227

228
    List<WorkflowOutboundCallsInterceptor.UpdateRegistrationRequest> updateRequests =
1✔
229
        new ArrayList<>();
230
    for (POJOWorkflowMethodMetadata methodMetadata : workflowMetadata.getUpdateMethods()) {
1✔
231
      Method method = methodMetadata.getWorkflowMethod();
1✔
232
      UpdateMethod updateMethod = method.getAnnotation(UpdateMethod.class);
1✔
233
      // Get the update name, defaulting to the method name if not specified.
234
      String updateMethodName = updateMethod.name();
1✔
235
      if (updateMethodName.isEmpty()) {
1✔
236
        updateMethodName = method.getName();
1✔
237
      }
238
      // Check if any validators claim they are the validator for this update
239
      POJOWorkflowMethodMetadata validatorMethodMetadata = validators.remove(updateMethodName);
1✔
240
      Method validatorMethod;
241
      if (validatorMethodMetadata != null) {
1✔
242
        validatorMethod = validatorMethodMetadata.getWorkflowMethod();
1✔
243
        if (!Arrays.equals(validatorMethod.getParameterTypes(), method.getParameterTypes())) {
1!
244
          throw new IllegalArgumentException(
×
245
              "Validator for: "
246
                  + updateMethodName
247
                  + " type parameters do not match the update handle");
248
        }
249
      } else {
250
        validatorMethod = null;
1✔
251
      }
252
      updateRequests.add(
1✔
253
          new WorkflowOutboundCallsInterceptor.UpdateRegistrationRequest(
254
              methodMetadata.getName(),
1✔
255
              methodMetadata.getDescription(),
1✔
256
              updateMethod.unfinishedPolicy(),
1✔
257
              method.getParameterTypes(),
1✔
258
              method.getGenericParameterTypes(),
1✔
259
              (args) -> {
260
                try {
261
                  if (validatorMethod != null) {
1✔
262
                    validatorMethod.invoke(implementation, args);
1✔
263
                  }
264
                } catch (Throwable e) {
1✔
265
                  throw CheckedExceptionWrapper.wrap(e);
1✔
266
                }
1✔
267
              },
1✔
268
              (args) -> {
269
                try {
270
                  return method.invoke(implementation, args);
1✔
271
                } catch (Throwable e) {
1✔
272
                  throw CheckedExceptionWrapper.wrap(e);
1✔
273
                }
274
              }));
275
    }
1✔
276
    if (!updateRequests.isEmpty()) {
1✔
277
      getWorkflowOutboundInterceptor()
1✔
278
          .registerUpdateHandlers(
1✔
279
              new WorkflowOutboundCallsInterceptor.RegisterUpdateHandlersInput(updateRequests));
280
    }
281
    if (!validators.isEmpty()) {
1!
282
      throw new IllegalArgumentException(
×
283
          "Missing update methods for update validator(s): "
284
              + Joiner.on(", ").join(validators.keySet()));
×
285
    }
286
  }
1✔
287

288
  /** Should be used to get current time instead of {@link System#currentTimeMillis()} */
289
  public static long currentTimeMillis() {
290
    return getWorkflowOutboundInterceptor().currentTimeMillis();
1✔
291
  }
292

293
  public static void setDefaultActivityOptions(ActivityOptions activityOptions) {
294
    getRootWorkflowContext().setDefaultActivityOptions(activityOptions);
1✔
295
  }
1✔
296

297
  public static void applyActivityOptions(Map<String, ActivityOptions> activityTypeToOptions) {
298
    getRootWorkflowContext().applyActivityOptions(activityTypeToOptions);
1✔
299
  }
1✔
300

301
  public static void setDefaultLocalActivityOptions(LocalActivityOptions localActivityOptions) {
302
    getRootWorkflowContext().setDefaultLocalActivityOptions(localActivityOptions);
1✔
303
  }
1✔
304

305
  public static void applyLocalActivityOptions(
306
      Map<String, LocalActivityOptions> activityTypeToOptions) {
307
    getRootWorkflowContext().applyLocalActivityOptions(activityTypeToOptions);
1✔
308
  }
1✔
309

310
  /**
311
   * Creates client stub to activities that implement given interface.
312
   *
313
   * @param activityInterface interface type implemented by activities
314
   * @param options options that together with the properties of {@link
315
   *     io.temporal.activity.ActivityMethod} specify the activity invocation parameters
316
   * @param activityMethodOptions activity method-specific invocation parameters
317
   */
318
  public static <T> T newActivityStub(
319
      Class<T> activityInterface,
320
      ActivityOptions options,
321
      Map<String, ActivityOptions> activityMethodOptions) {
322
    // Merge the activity options we may have received from the workflow with the options we may
323
    // have received in WorkflowImplementationOptions.
324
    SyncWorkflowContext context = getRootWorkflowContext();
1✔
325
    options = (options == null) ? context.getDefaultActivityOptions() : options;
1✔
326

327
    Map<String, ActivityOptions> mergedActivityOptionsMap;
328
    @Nonnull Map<String, ActivityOptions> predefinedActivityOptions = context.getActivityOptions();
1✔
329
    if (activityMethodOptions != null
1!
330
        && !activityMethodOptions.isEmpty()
×
331
        && predefinedActivityOptions.isEmpty()) {
×
332
      // we need to merge only in this case
333
      mergedActivityOptionsMap = new HashMap<>(predefinedActivityOptions);
×
334
      ActivityOptionUtils.mergePredefinedActivityOptions(
×
335
          mergedActivityOptionsMap, activityMethodOptions);
336
    } else {
337
      mergedActivityOptionsMap =
1✔
338
          MoreObjects.firstNonNull(
1✔
339
              activityMethodOptions,
340
              MoreObjects.firstNonNull(predefinedActivityOptions, Collections.emptyMap()));
1✔
341
    }
342

343
    InvocationHandler invocationHandler =
1✔
344
        ActivityInvocationHandler.newInstance(
1✔
345
            activityInterface,
346
            options,
347
            mergedActivityOptionsMap,
348
            context.getWorkflowOutboundInterceptor(),
1✔
349
            () -> assertNotReadOnly("schedule activity"));
1✔
350
    return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler);
1✔
351
  }
352

353
  /**
354
   * Creates client stub to local activities that implement given interface.
355
   *
356
   * @param activityInterface interface type implemented by activities
357
   * @param options options that together with the properties of {@link
358
   *     io.temporal.activity.ActivityMethod} specify the activity invocation parameters
359
   * @param activityMethodOptions activity method-specific invocation parameters
360
   */
361
  public static <T> T newLocalActivityStub(
362
      Class<T> activityInterface,
363
      LocalActivityOptions options,
364
      @Nullable Map<String, LocalActivityOptions> activityMethodOptions) {
365
    // Merge the activity options we may have received from the workflow with the options we may
366
    // have received in WorkflowImplementationOptions.
367
    SyncWorkflowContext context = getRootWorkflowContext();
1✔
368
    options = (options == null) ? context.getDefaultLocalActivityOptions() : options;
1✔
369

370
    Map<String, LocalActivityOptions> mergedLocalActivityOptionsMap;
371
    @Nonnull
372
    Map<String, LocalActivityOptions> predefinedLocalActivityOptions =
1✔
373
        context.getLocalActivityOptions();
1✔
374
    if (activityMethodOptions != null
1!
375
        && !activityMethodOptions.isEmpty()
×
376
        && predefinedLocalActivityOptions.isEmpty()) {
×
377
      // we need to merge only in this case
378
      mergedLocalActivityOptionsMap = new HashMap<>(predefinedLocalActivityOptions);
×
379
      ActivityOptionUtils.mergePredefinedLocalActivityOptions(
×
380
          mergedLocalActivityOptionsMap, activityMethodOptions);
381
    } else {
382
      mergedLocalActivityOptionsMap =
1✔
383
          MoreObjects.firstNonNull(
1✔
384
              activityMethodOptions,
385
              MoreObjects.firstNonNull(predefinedLocalActivityOptions, Collections.emptyMap()));
1✔
386
    }
387

388
    InvocationHandler invocationHandler =
1✔
389
        LocalActivityInvocationHandler.newInstance(
1✔
390
            activityInterface,
391
            options,
392
            mergedLocalActivityOptionsMap,
393
            WorkflowInternal.getWorkflowOutboundInterceptor(),
1✔
394
            () -> assertNotReadOnly("schedule local activity"));
1✔
395
    return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler);
1✔
396
  }
397

398
  public static ActivityStub newUntypedActivityStub(ActivityOptions options) {
399
    return ActivityStubImpl.newInstance(
1✔
400
        options, getWorkflowOutboundInterceptor(), () -> assertNotReadOnly("schedule activity"));
1✔
401
  }
402

403
  public static ActivityStub newUntypedLocalActivityStub(LocalActivityOptions options) {
404
    return LocalActivityStubImpl.newInstance(
1✔
405
        options,
406
        getWorkflowOutboundInterceptor(),
1✔
407
        () -> assertNotReadOnly("schedule local activity"));
1✔
408
  }
409

410
  @SuppressWarnings("unchecked")
411
  public static <T> T newChildWorkflowStub(
412
      Class<T> workflowInterface, ChildWorkflowOptions options) {
413
    return (T)
1✔
414
        Proxy.newProxyInstance(
1✔
415
            workflowInterface.getClassLoader(),
1✔
416
            new Class<?>[] {workflowInterface, StubMarker.class, AsyncMarker.class},
417
            new ChildWorkflowInvocationHandler(
418
                workflowInterface,
419
                options,
420
                getWorkflowOutboundInterceptor(),
1✔
421
                WorkflowInternal::assertNotReadOnly));
422
  }
423

424
  @SuppressWarnings("unchecked")
425
  public static <T> T newExternalWorkflowStub(
426
      Class<T> workflowInterface, WorkflowExecution execution) {
427
    return (T)
1✔
428
        Proxy.newProxyInstance(
1✔
429
            workflowInterface.getClassLoader(),
1✔
430
            new Class<?>[] {workflowInterface, StubMarker.class, AsyncMarker.class},
431
            new ExternalWorkflowInvocationHandler(
432
                workflowInterface,
433
                execution,
434
                getWorkflowOutboundInterceptor(),
1✔
435
                WorkflowInternal::assertNotReadOnly));
436
  }
437

438
  public static Promise<WorkflowExecution> getWorkflowExecution(Object workflowStub) {
439
    if (workflowStub instanceof StubMarker) {
1!
440
      Object stub = ((StubMarker) workflowStub).__getUntypedStub();
1✔
441
      return ((ChildWorkflowStub) stub).getExecution();
1✔
442
    }
443
    throw new IllegalArgumentException(
×
444
        "Not a workflow stub created through Workflow.newChildWorkflowStub: " + workflowStub);
445
  }
446

447
  public static ChildWorkflowStub newUntypedChildWorkflowStub(
448
      String workflowType, ChildWorkflowOptions options) {
449
    return new ChildWorkflowStubImpl(
1✔
450
        workflowType,
451
        options,
452
        getWorkflowOutboundInterceptor(),
1✔
453
        WorkflowInternal::assertNotReadOnly);
454
  }
455

456
  public static ExternalWorkflowStub newUntypedExternalWorkflowStub(WorkflowExecution execution) {
457
    return new ExternalWorkflowStubImpl(
1✔
458
        execution, getWorkflowOutboundInterceptor(), WorkflowInternal::assertNotReadOnly);
1✔
459
  }
460

461
  /**
462
   * Creates client stub that can be used to continue this workflow as new.
463
   *
464
   * @param workflowInterface interface type implemented by the next generation of workflow
465
   */
466
  @SuppressWarnings("unchecked")
467
  public static <T> T newContinueAsNewStub(
468
      Class<T> workflowInterface, ContinueAsNewOptions options) {
469
    return (T)
1✔
470
        Proxy.newProxyInstance(
1✔
471
            workflowInterface.getClassLoader(),
1✔
472
            new Class<?>[] {workflowInterface},
473
            new ContinueAsNewWorkflowInvocationHandler(
474
                workflowInterface, options, getWorkflowOutboundInterceptor()));
1✔
475
  }
476

477
  /**
478
   * Execute activity by name.
479
   *
480
   * @param name name of the activity
481
   * @param resultClass activity return type
482
   * @param args list of activity arguments
483
   * @param <R> activity return type
484
   * @return activity result
485
   */
486
  public static <R> R executeActivity(
487
      String name, ActivityOptions options, Class<R> resultClass, Type resultType, Object... args) {
488
    assertNotReadOnly("schedule activity");
×
489
    Promise<R> result =
490
        getWorkflowOutboundInterceptor()
×
491
            .executeActivity(
×
492
                new WorkflowOutboundCallsInterceptor.ActivityInput<>(
493
                    name, resultClass, resultType, args, options, Header.empty()))
×
494
            .getResult();
×
495
    if (AsyncInternal.isAsync()) {
×
496
      AsyncInternal.setAsyncResult(result);
×
497
      return null; // ignored
×
498
    }
499
    return result.get();
×
500
  }
501

502
  public static void await(String reason, Supplier<Boolean> unblockCondition)
503
      throws DestroyWorkflowThreadError {
504
    assertNotReadOnly(reason);
1✔
505
    getWorkflowOutboundInterceptor()
1✔
506
        .await(
1✔
507
            reason,
508
            () -> {
509
              getRootWorkflowContext().setReadOnly(true);
1✔
510
              try {
511
                return unblockCondition.get();
1✔
512
              } finally {
513
                getRootWorkflowContext().setReadOnly(false);
1✔
514
              }
515
            });
516
  }
1✔
517

518
  public static boolean await(Duration timeout, String reason, Supplier<Boolean> unblockCondition)
519
      throws DestroyWorkflowThreadError {
520
    assertNotReadOnly(reason);
1✔
521
    return getWorkflowOutboundInterceptor()
1✔
522
        .await(
1✔
523
            timeout,
524
            reason,
525
            () -> {
526
              getRootWorkflowContext().setReadOnly(true);
1✔
527
              try {
528
                return unblockCondition.get();
1✔
529
              } finally {
530
                getRootWorkflowContext().setReadOnly(false);
1✔
531
              }
532
            });
533
  }
534

535
  public static <R> R sideEffect(Class<R> resultClass, Type resultType, Func<R> func) {
536
    assertNotReadOnly("side effect");
1✔
537
    return getWorkflowOutboundInterceptor().sideEffect(resultClass, resultType, func);
1✔
538
  }
539

540
  public static <R> R mutableSideEffect(
541
      String id, Class<R> resultClass, Type resultType, BiPredicate<R, R> updated, Func<R> func) {
542
    assertNotReadOnly("mutable side effect");
1✔
543
    return getWorkflowOutboundInterceptor()
1✔
544
        .mutableSideEffect(id, resultClass, resultType, updated, func);
1✔
545
  }
546

547
  public static int getVersion(String changeId, int minSupported, int maxSupported) {
548
    assertNotReadOnly("get version");
1✔
549
    return getWorkflowOutboundInterceptor().getVersion(changeId, minSupported, maxSupported);
1✔
550
  }
551

552
  public static <V> Promise<Void> promiseAllOf(Iterable<Promise<V>> promises) {
553
    return new AllOfPromise(promises);
1✔
554
  }
555

556
  public static Promise<Void> promiseAllOf(Promise<?>... promises) {
557
    return new AllOfPromise(promises);
1✔
558
  }
559

560
  public static <V> Promise<V> promiseAnyOf(Iterable<Promise<V>> promises) {
561
    return CompletablePromiseImpl.promiseAnyOf(promises);
1✔
562
  }
563

564
  public static Promise<Object> promiseAnyOf(Promise<?>... promises) {
565
    return CompletablePromiseImpl.promiseAnyOf(promises);
1✔
566
  }
567

568
  public static CancellationScope newCancellationScope(boolean detached, Runnable runnable) {
569
    return new CancellationScopeImpl(detached, runnable);
1✔
570
  }
571

572
  public static CancellationScope newCancellationScope(
573
      boolean detached, Functions.Proc1<CancellationScope> proc) {
574
    return new CancellationScopeImpl(detached, proc);
1✔
575
  }
576

577
  public static CancellationScopeImpl currentCancellationScope() {
578
    return CancellationScopeImpl.current();
1✔
579
  }
580

581
  public static RuntimeException wrap(Throwable e) {
582
    return CheckedExceptionWrapper.wrap(e);
1✔
583
  }
584

585
  public static Throwable unwrap(Throwable e) {
586
    return CheckedExceptionWrapper.unwrap(e);
1✔
587
  }
588

589
  /** Returns false if not under workflow code. */
590
  public static boolean isReplaying() {
591
    Optional<WorkflowThread> thread = DeterministicRunnerImpl.currentThreadInternalIfPresent();
1✔
592
    return thread.isPresent() && getRootWorkflowContext().isReplaying();
1✔
593
  }
594

595
  public static <T> T getMemo(String key, Class<T> valueClass, Type genericType) {
596
    Payload memo = getRootWorkflowContext().getReplayContext().getMemo(key);
1✔
597
    if (memo == null) {
1✔
598
      return null;
1✔
599
    }
600

601
    return getDataConverterWithCurrentWorkflowContext().fromPayload(memo, valueClass, genericType);
1✔
602
  }
603

604
  public static <R> R retry(
605
      RetryOptions options, Optional<Duration> expiration, Functions.Func<R> fn) {
606
    assertNotReadOnly("retry");
1✔
607
    return WorkflowRetryerInternal.retry(
1✔
608
        options.toBuilder().validateBuildWithDefaults(), expiration, fn);
1✔
609
  }
610

611
  public static void continueAsNew(
612
      @Nullable String workflowType, @Nullable ContinueAsNewOptions options, Object[] args) {
613
    assertNotReadOnly("continue as new");
1✔
614
    assertNotInUpdateHandler("ContinueAsNew is not supported in an update handler");
1✔
615
    getWorkflowOutboundInterceptor()
1✔
616
        .continueAsNew(
×
617
            new WorkflowOutboundCallsInterceptor.ContinueAsNewInput(
618
                workflowType, options, args, Header.empty()));
1✔
619
  }
×
620

621
  public static void continueAsNew(
622
      @Nullable String workflowType,
623
      @Nullable ContinueAsNewOptions options,
624
      Object[] args,
625
      WorkflowOutboundCallsInterceptor outboundCallsInterceptor) {
626
    assertNotReadOnly("continue as new");
1✔
627
    assertNotInUpdateHandler("ContinueAsNew is not supported in an update handler");
1✔
628
    outboundCallsInterceptor.continueAsNew(
1✔
629
        new WorkflowOutboundCallsInterceptor.ContinueAsNewInput(
630
            workflowType, options, args, Header.empty()));
1✔
631
  }
×
632

633
  public static Promise<Void> cancelWorkflow(WorkflowExecution execution) {
634
    assertNotReadOnly("cancel workflow");
×
635
    return getWorkflowOutboundInterceptor()
×
636
        .cancelWorkflow(new WorkflowOutboundCallsInterceptor.CancelWorkflowInput(execution))
×
637
        .getResult();
×
638
  }
639

640
  public static void sleep(Duration duration) {
641
    assertNotReadOnly("sleep");
1✔
642
    getWorkflowOutboundInterceptor().sleep(duration);
1✔
643
  }
1✔
644

645
  public static boolean isWorkflowThread() {
646
    return WorkflowThreadMarker.isWorkflowThread();
1✔
647
  }
648

649
  public static <T> T deadlockDetectorOff(Functions.Func<T> func) {
650
    if (isWorkflowThread()) {
1✔
651
      try (NonIdempotentHandle ignored = getWorkflowThread().lockDeadlockDetector()) {
1✔
652
        return func.apply();
1✔
653
      }
654
    } else {
655
      return func.apply();
1✔
656
    }
657
  }
658

659
  public static WorkflowInfo getWorkflowInfo() {
660
    return new WorkflowInfoImpl(getRootWorkflowContext().getReplayContext());
1✔
661
  }
662

663
  public static Optional<UpdateInfo> getCurrentUpdateInfo() {
664
    return getRootWorkflowContext().getCurrentUpdateInfo();
1✔
665
  }
666

667
  public static Scope getMetricsScope() {
668
    return getWorkflowOutboundInterceptor().getMetricsScope();
1✔
669
  }
670

671
  private static boolean isLoggingEnabledInReplay() {
672
    return getRootWorkflowContext().isLoggingEnabledInReplay();
1✔
673
  }
674

675
  public static UUID randomUUID() {
676
    assertNotReadOnly("random UUID");
1✔
677
    return getRootWorkflowContext().randomUUID();
1✔
678
  }
679

680
  public static Random newRandom() {
681
    assertNotReadOnly("random");
1✔
682
    return getRootWorkflowContext().newRandom();
1✔
683
  }
684

685
  public static Logger getLogger(Class<?> clazz) {
686
    Logger logger = LoggerFactory.getLogger(clazz);
1✔
687
    return new ReplayAwareLogger(
1✔
688
        logger, WorkflowInternal::isReplaying, WorkflowInternal::isLoggingEnabledInReplay);
689
  }
690

691
  public static Logger getLogger(String name) {
692
    Logger logger = LoggerFactory.getLogger(name);
1✔
693
    return new ReplayAwareLogger(
1✔
694
        logger, WorkflowInternal::isReplaying, WorkflowInternal::isLoggingEnabledInReplay);
695
  }
696

697
  public static <R> R getLastCompletionResult(Class<R> resultClass, Type resultType) {
698
    return getRootWorkflowContext().getLastCompletionResult(resultClass, resultType);
1✔
699
  }
700

701
  @Nullable
702
  public static <T> T getSearchAttribute(String name) {
703
    List<T> list = getSearchAttributeValues(name);
1✔
704
    if (list == null) {
1✔
705
      return null;
1✔
706
    }
707
    Preconditions.checkState(list.size() > 0);
1!
708
    Preconditions.checkState(
1✔
709
        list.size() == 1,
1!
710
        "search attribute with name '%s' contains a list '%s' of values instead of a single value",
711
        name,
712
        list);
713
    return list.get(0);
1✔
714
  }
715

716
  @Nullable
717
  public static <T> List<T> getSearchAttributeValues(String name) {
718
    SearchAttributes searchAttributes =
719
        getRootWorkflowContext().getReplayContext().getSearchAttributes();
1✔
720
    if (searchAttributes == null) {
1!
721
      return null;
×
722
    }
723
    List<T> decoded = SearchAttributesUtil.decode(searchAttributes, name);
1✔
724
    return decoded != null ? Collections.unmodifiableList(decoded) : null;
1✔
725
  }
726

727
  @Nonnull
728
  public static Map<String, List<?>> getSearchAttributes() {
729
    SearchAttributes searchAttributes =
730
        getRootWorkflowContext().getReplayContext().getSearchAttributes();
1✔
731
    if (searchAttributes == null) {
1!
732
      return Collections.emptyMap();
×
733
    }
734
    return Collections.unmodifiableMap(SearchAttributesUtil.decode(searchAttributes));
1✔
735
  }
736

737
  @Nonnull
738
  public static io.temporal.common.SearchAttributes getTypedSearchAttributes() {
739
    SearchAttributes searchAttributes =
740
        getRootWorkflowContext().getReplayContext().getSearchAttributes();
1✔
741
    return SearchAttributesUtil.decodeTyped(searchAttributes);
1✔
742
  }
743

744
  public static void upsertSearchAttributes(Map<String, ?> searchAttributes) {
745
    assertNotReadOnly("upsert search attribute");
1✔
746
    getWorkflowOutboundInterceptor().upsertSearchAttributes(searchAttributes);
1✔
747
  }
1✔
748

749
  public static void upsertTypedSearchAttributes(
750
      SearchAttributeUpdate<?>... searchAttributeUpdates) {
751
    assertNotReadOnly("upsert search attribute");
1✔
752
    getWorkflowOutboundInterceptor().upsertTypedSearchAttributes(searchAttributeUpdates);
1✔
753
  }
1✔
754

755
  public static void upsertMemo(Map<String, Object> memo) {
756
    assertNotReadOnly("upsert memo");
1✔
757
    getWorkflowOutboundInterceptor().upsertMemo(memo);
1✔
758
  }
1✔
759

760
  public static DataConverter getDataConverter() {
761
    return getRootWorkflowContext().getDataConverter();
×
762
  }
763

764
  static DataConverter getDataConverterWithCurrentWorkflowContext() {
765
    return getRootWorkflowContext().getDataConverterWithCurrentWorkflowContext();
1✔
766
  }
767

768
  /**
769
   * Name of the workflow type the interface defines. It is either the interface short name * or
770
   * value of {@link WorkflowMethod#name()} parameter.
771
   *
772
   * @param workflowInterfaceClass interface annotated with @WorkflowInterface
773
   */
774
  public static String getWorkflowType(Class<?> workflowInterfaceClass) {
775
    POJOWorkflowInterfaceMetadata metadata =
1✔
776
        POJOWorkflowInterfaceMetadata.newInstance(workflowInterfaceClass);
1✔
777
    return metadata.getWorkflowType().get();
1✔
778
  }
779

780
  public static Optional<Exception> getPreviousRunFailure() {
781
    return Optional.ofNullable(getRootWorkflowContext().getReplayContext().getPreviousRunFailure())
1✔
782
        // Temporal Failure Values are additional user payload and serialized using user data
783
        // converter
784
        .map(f -> getDataConverterWithCurrentWorkflowContext().failureToException(f));
1✔
785
  }
786

787
  public static boolean isEveryHandlerFinished() {
788
    return getRootWorkflowContext().isEveryHandlerFinished();
1✔
789
  }
790

791
  public static <T> T newNexusServiceStub(Class<T> serviceInterface, NexusServiceOptions options) {
792
    SyncWorkflowContext context = getRootWorkflowContext();
1✔
793
    NexusServiceOptions baseOptions =
794
        (options == null) ? context.getDefaultNexusServiceOptions() : options;
1✔
795

796
    @Nonnull
797
    Map<String, NexusServiceOptions> predefinedNexusServiceOptions =
1✔
798
        context.getNexusServiceOptions();
1✔
799

800
    ServiceDefinition serviceDef = ServiceDefinition.fromClass(serviceInterface);
1✔
801
    NexusServiceOptions mergedOptions =
1✔
802
        NexusServiceOptions.newBuilder(predefinedNexusServiceOptions.get(serviceDef.getName()))
1✔
803
            .mergeNexusServiceOptions(baseOptions)
1✔
804
            .build();
1✔
805
    return (T)
1✔
806
        Proxy.newProxyInstance(
1✔
807
            serviceInterface.getClassLoader(),
1✔
808
            new Class<?>[] {serviceInterface, StubMarker.class, AsyncInternal.AsyncMarker.class},
809
            new NexusServiceInvocationHandler(
810
                serviceDef,
811
                mergedOptions,
812
                getWorkflowOutboundInterceptor(),
1✔
813
                WorkflowInternal::assertNotReadOnly));
814
  }
815

816
  public static NexusServiceStub newUntypedNexusServiceStub(
817
      String service, NexusServiceOptions options) {
818
    return new NexusServiceStubImpl(
1✔
819
        service, options, getWorkflowOutboundInterceptor(), WorkflowInternal::assertNotReadOnly);
1✔
820
  }
821

822
  public static <T, R> NexusOperationHandle<R> startNexusOperation(
823
      Functions.Func1<T, R> operation, T arg) {
824
    return StartNexusCallInternal.startNexusOperation(() -> operation.apply(arg));
1✔
825
  }
826

827
  public static <R> NexusOperationHandle<R> startNexusOperation(Functions.Func<R> operation) {
828
    return StartNexusCallInternal.startNexusOperation(() -> operation.apply());
×
829
  }
830

831
  public static void setCurrentDetails(String details) {
832
    getRootWorkflowContext().setCurrentDetails(details);
1✔
833
  }
1✔
834

835
  @Nullable
836
  public static String getCurrentDetails() {
837
    return getRootWorkflowContext().getCurrentDetails();
1✔
838
  }
839

840
  static WorkflowOutboundCallsInterceptor getWorkflowOutboundInterceptor() {
841
    return getRootWorkflowContext().getWorkflowOutboundInterceptor();
1✔
842
  }
843

844
  static SyncWorkflowContext getRootWorkflowContext() {
845
    return DeterministicRunnerImpl.currentThreadInternal().getWorkflowContext();
1✔
846
  }
847

848
  static boolean isReadOnly() {
849
    return getRootWorkflowContext().isReadOnly();
1✔
850
  }
851

852
  static void assertNotReadOnly(String action) {
853
    if (isReadOnly()) {
1✔
854
      throw new ReadOnlyException(action);
1✔
855
    }
856
  }
1✔
857

858
  static void assertNotInUpdateHandler(String message) {
859
    if (getCurrentUpdateInfo().isPresent()) {
1✔
860
      throw new UnsupportedContinueAsNewRequest(message);
1✔
861
    }
862
  }
1✔
863

864
  private static WorkflowThread getWorkflowThread() {
865
    return DeterministicRunnerImpl.currentThreadInternal();
1✔
866
  }
867

868
  /**
   * Private constructor: this class exposes only static members and is not meant to be
   * instantiated.
   */
  private WorkflowInternal() {
    // Intentionally empty.
  }
870
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc