• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #333

16 Oct 2024 07:28PM UTC coverage: 78.65% (+0.6%) from 78.085%
#333

push

github

web-flow
Fix code coverage (#2275)

22670 of 28824 relevant lines covered (78.65%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.24
/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.sync;
22

23
import static io.temporal.internal.sync.AsyncInternal.AsyncMarker;
24
import static io.temporal.internal.sync.DeterministicRunnerImpl.currentThreadInternal;
25

26
import com.google.common.base.Joiner;
27
import com.google.common.base.MoreObjects;
28
import com.google.common.base.Preconditions;
29
import com.uber.m3.tally.Scope;
30
import io.nexusrpc.ServiceDefinition;
31
import io.temporal.activity.ActivityOptions;
32
import io.temporal.activity.LocalActivityOptions;
33
import io.temporal.api.common.v1.Payload;
34
import io.temporal.api.common.v1.SearchAttributes;
35
import io.temporal.api.common.v1.WorkflowExecution;
36
import io.temporal.common.RetryOptions;
37
import io.temporal.common.SearchAttributeUpdate;
38
import io.temporal.common.converter.DataConverter;
39
import io.temporal.common.interceptors.Header;
40
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
41
import io.temporal.common.metadata.POJOWorkflowImplMetadata;
42
import io.temporal.common.metadata.POJOWorkflowInterfaceMetadata;
43
import io.temporal.common.metadata.POJOWorkflowMethodMetadata;
44
import io.temporal.internal.WorkflowThreadMarker;
45
import io.temporal.internal.common.ActivityOptionUtils;
46
import io.temporal.internal.common.NonIdempotentHandle;
47
import io.temporal.internal.common.SearchAttributesUtil;
48
import io.temporal.internal.logging.ReplayAwareLogger;
49
import io.temporal.internal.statemachines.UnsupportedContinueAsNewRequest;
50
import io.temporal.serviceclient.CheckedExceptionWrapper;
51
import io.temporal.workflow.*;
52
import io.temporal.workflow.Functions.Func;
53
import java.lang.reflect.*;
54
import java.time.Duration;
55
import java.util.*;
56
import java.util.function.BiPredicate;
57
import java.util.function.Supplier;
58
import javax.annotation.Nonnull;
59
import javax.annotation.Nullable;
60
import org.slf4j.Logger;
61
import org.slf4j.LoggerFactory;
62

63
/**
64
 * Never reference directly. It is public only because Java doesn't have internal package support.
65
 */
66
public final class WorkflowInternal {
67
  public static final int DEFAULT_VERSION = -1;
68

69
  public static @Nonnull WorkflowThread newWorkflowMethodThread(Runnable runnable, String name) {
70
    Object workflowThread =
71
        currentThreadInternal()
1✔
72
            .getWorkflowContext()
1✔
73
            .getWorkflowInboundInterceptor()
1✔
74
            .newWorkflowMethodThread(runnable, name);
1✔
75
    Preconditions.checkState(
1✔
76
        workflowThread != null,
77
        "[BUG] One of the custom interceptors illegally overrode newWorkflowMethodThread result to null. "
78
            + "Check WorkflowInboundCallsInterceptor#newWorkflowMethodThread contract.");
79
    Preconditions.checkState(
1✔
80
        workflowThread instanceof WorkflowThread,
81
        "[BUG] One of the custom interceptors illegally overrode newWorkflowMethodThread result. "
82
            + "Check WorkflowInboundCallsInterceptor#newWorkflowMethodThread contract. "
83
            + "Illegal object returned from the interceptors chain: %s",
84
        workflowThread);
85
    return (WorkflowThread) workflowThread;
1✔
86
  }
87

88
  public static Promise<Void> newTimer(Duration duration) {
89
    assertNotReadOnly("schedule timer");
1✔
90
    return getWorkflowOutboundInterceptor().newTimer(duration);
1✔
91
  }
92

93
  public static Promise<Void> newTimer(Duration duration, TimerOptions options) {
94
    assertNotReadOnly("schedule timer");
×
95
    return getWorkflowOutboundInterceptor().newTimer(duration, options);
×
96
  }
97

98
  /**
99
   * @param capacity the maximum size of the queue
100
   * @return new instance of {@link WorkflowQueue}
101
   * @deprecated this method created a deprecated implementation of the queue that has some methods
102
   *     implemented incorrectly. Please use {@link #newWorkflowQueue(int)} instead.
103
   */
104
  @Deprecated
105
  public static <E> WorkflowQueue<E> newQueue(int capacity) {
106
    return new WorkflowQueueDeprecatedImpl<>(capacity);
1✔
107
  }
108

109
  /**
110
   * Creates a {@link WorkflowQueue} implementation that can be used from workflow code.
111
   *
112
   * @param capacity the maximum size of the queue
113
   * @return new instance of {@link WorkflowQueue}
114
   */
115
  public static <E> WorkflowQueue<E> newWorkflowQueue(int capacity) {
116
    return new WorkflowQueueImpl<>(capacity);
1✔
117
  }
118

119
  public static WorkflowLock newWorkflowLock() {
120
    return new WorkflowLockImpl();
1✔
121
  }
122

123
  public static WorkflowSemaphore newWorkflowSemaphore(int permits) {
124
    return new WorkflowSemaphoreImpl(permits);
1✔
125
  }
126

127
  public static <E> CompletablePromise<E> newCompletablePromise() {
128
    return new CompletablePromiseImpl<>();
1✔
129
  }
130

131
  public static <E> Promise<E> newPromise(E value) {
132
    CompletablePromise<E> result = Workflow.newPromise();
1✔
133
    result.complete(value);
1✔
134
    return result;
1✔
135
  }
136

137
  public static <E> Promise<E> newFailedPromise(Exception failure) {
138
    CompletablePromise<E> result = new CompletablePromiseImpl<>();
1✔
139
    result.completeExceptionally(CheckedExceptionWrapper.wrap(failure));
1✔
140
    return result;
1✔
141
  }
142

143
  /**
144
   * Register query or queries implementation object. There is no need to register top level
145
   * workflow implementation object as it is done implicitly. Only methods annotated with @{@link
146
   * QueryMethod} are registered. TODO(quinn) LIES!
147
   */
148
  public static void registerListener(Object implementation) {
149
    if (implementation instanceof DynamicSignalHandler) {
1✔
150
      getWorkflowOutboundInterceptor()
1✔
151
          .registerDynamicSignalHandler(
1✔
152
              new WorkflowOutboundCallsInterceptor.RegisterDynamicSignalHandlerInput(
153
                  (DynamicSignalHandler) implementation));
154
      return;
1✔
155
    }
156
    if (implementation instanceof DynamicQueryHandler) {
1✔
157
      getWorkflowOutboundInterceptor()
1✔
158
          .registerDynamicQueryHandler(
1✔
159
              new WorkflowOutboundCallsInterceptor.RegisterDynamicQueryHandlerInput(
160
                  (DynamicQueryHandler) implementation));
161
      return;
1✔
162
    }
163
    if (implementation instanceof DynamicUpdateHandler) {
1✔
164
      getWorkflowOutboundInterceptor()
1✔
165
          .registerDynamicUpdateHandler(
1✔
166
              new WorkflowOutboundCallsInterceptor.RegisterDynamicUpdateHandlerInput(
167
                  (DynamicUpdateHandler) implementation));
168
      return;
1✔
169
    }
170
    Class<?> cls = implementation.getClass();
1✔
171
    POJOWorkflowImplMetadata workflowMetadata = POJOWorkflowImplMetadata.newListenerInstance(cls);
1✔
172
    for (POJOWorkflowMethodMetadata methodMetadata : workflowMetadata.getQueryMethods()) {
1✔
173
      Method method = methodMetadata.getWorkflowMethod();
1✔
174
      getWorkflowOutboundInterceptor()
1✔
175
          .registerQuery(
1✔
176
              new WorkflowOutboundCallsInterceptor.RegisterQueryInput(
177
                  methodMetadata.getName(),
1✔
178
                  method.getParameterTypes(),
1✔
179
                  method.getGenericParameterTypes(),
1✔
180
                  (args) -> {
181
                    try {
182
                      return method.invoke(implementation, args);
1✔
183
                    } catch (Throwable e) {
×
184
                      throw CheckedExceptionWrapper.wrap(e);
×
185
                    }
186
                  }));
187
    }
1✔
188
    List<WorkflowOutboundCallsInterceptor.SignalRegistrationRequest> requests = new ArrayList<>();
1✔
189
    for (POJOWorkflowMethodMetadata methodMetadata : workflowMetadata.getSignalMethods()) {
1✔
190
      Method method = methodMetadata.getWorkflowMethod();
1✔
191
      SignalMethod signalMethod = method.getAnnotation(SignalMethod.class);
1✔
192
      requests.add(
1✔
193
          new WorkflowOutboundCallsInterceptor.SignalRegistrationRequest(
194
              methodMetadata.getName(),
1✔
195
              signalMethod.unfinishedPolicy(),
1✔
196
              method.getParameterTypes(),
1✔
197
              method.getGenericParameterTypes(),
1✔
198
              (args) -> {
199
                try {
200
                  method.invoke(implementation, args);
1✔
201
                } catch (Throwable e) {
1✔
202
                  throw CheckedExceptionWrapper.wrap(e);
1✔
203
                }
1✔
204
              }));
1✔
205
    }
1✔
206
    if (!requests.isEmpty()) {
1✔
207
      getWorkflowOutboundInterceptor()
1✔
208
          .registerSignalHandlers(
1✔
209
              new WorkflowOutboundCallsInterceptor.RegisterSignalHandlersInput(requests));
210
    }
211

212
    // Get all validators and lazily assign them to update handlers as we see them.
213
    Map<String, POJOWorkflowMethodMetadata> validators =
1✔
214
        new HashMap<>(workflowMetadata.getUpdateValidatorMethods().size());
1✔
215
    for (POJOWorkflowMethodMetadata methodMetadata : workflowMetadata.getUpdateValidatorMethods()) {
1✔
216
      Method method = methodMetadata.getWorkflowMethod();
1✔
217
      UpdateValidatorMethod updateValidatorMethod =
1✔
218
          method.getAnnotation(UpdateValidatorMethod.class);
1✔
219
      if (validators.containsKey(updateValidatorMethod.updateName())) {
1✔
220
        throw new IllegalArgumentException(
×
221
            "Duplicate validator for update handle " + updateValidatorMethod.updateName());
×
222
      }
223
      validators.put(updateValidatorMethod.updateName(), methodMetadata);
1✔
224
    }
1✔
225

226
    List<WorkflowOutboundCallsInterceptor.UpdateRegistrationRequest> updateRequests =
1✔
227
        new ArrayList<>();
228
    for (POJOWorkflowMethodMetadata methodMetadata : workflowMetadata.getUpdateMethods()) {
1✔
229
      Method method = methodMetadata.getWorkflowMethod();
1✔
230
      UpdateMethod updateMethod = method.getAnnotation(UpdateMethod.class);
1✔
231
      // Get the update name, defaulting to the method name if not specified.
232
      String updateMethodName = updateMethod.name();
1✔
233
      if (updateMethodName.isEmpty()) {
1✔
234
        updateMethodName = method.getName();
1✔
235
      }
236
      // Check if any validators claim they are the validator for this update
237
      POJOWorkflowMethodMetadata validatorMethodMetadata = validators.remove(updateMethodName);
1✔
238
      Method validatorMethod;
239
      if (validatorMethodMetadata != null) {
1✔
240
        validatorMethod = validatorMethodMetadata.getWorkflowMethod();
1✔
241
        if (!Arrays.equals(validatorMethod.getParameterTypes(), method.getParameterTypes())) {
1✔
242
          throw new IllegalArgumentException(
×
243
              "Validator for: "
244
                  + updateMethodName
245
                  + " type parameters do not match the update handle");
246
        }
247
      } else {
248
        validatorMethod = null;
1✔
249
      }
250
      updateRequests.add(
1✔
251
          new WorkflowOutboundCallsInterceptor.UpdateRegistrationRequest(
252
              methodMetadata.getName(),
1✔
253
              updateMethod.unfinishedPolicy(),
1✔
254
              method.getParameterTypes(),
1✔
255
              method.getGenericParameterTypes(),
1✔
256
              (args) -> {
257
                try {
258
                  if (validatorMethod != null) {
1✔
259
                    validatorMethod.invoke(implementation, args);
1✔
260
                  }
261
                } catch (Throwable e) {
1✔
262
                  throw CheckedExceptionWrapper.wrap(e);
1✔
263
                }
1✔
264
              },
1✔
265
              (args) -> {
266
                try {
267
                  return method.invoke(implementation, args);
1✔
268
                } catch (Throwable e) {
1✔
269
                  throw CheckedExceptionWrapper.wrap(e);
1✔
270
                }
271
              }));
272
    }
1✔
273
    if (!updateRequests.isEmpty()) {
1✔
274
      getWorkflowOutboundInterceptor()
1✔
275
          .registerUpdateHandlers(
1✔
276
              new WorkflowOutboundCallsInterceptor.RegisterUpdateHandlersInput(updateRequests));
277
    }
278
    if (!validators.isEmpty()) {
1✔
279
      throw new IllegalArgumentException(
×
280
          "Missing update methods for update validator(s): "
281
              + Joiner.on(", ").join(validators.keySet()));
×
282
    }
283
  }
1✔
284

285
  /** Should be used to get current time instead of {@link System#currentTimeMillis()} */
286
  public static long currentTimeMillis() {
287
    return getWorkflowOutboundInterceptor().currentTimeMillis();
1✔
288
  }
289

290
  public static void setDefaultActivityOptions(ActivityOptions activityOptions) {
291
    getRootWorkflowContext().setDefaultActivityOptions(activityOptions);
1✔
292
  }
1✔
293

294
  public static void applyActivityOptions(Map<String, ActivityOptions> activityTypeToOptions) {
295
    getRootWorkflowContext().applyActivityOptions(activityTypeToOptions);
1✔
296
  }
1✔
297

298
  public static void setDefaultLocalActivityOptions(LocalActivityOptions localActivityOptions) {
299
    getRootWorkflowContext().setDefaultLocalActivityOptions(localActivityOptions);
1✔
300
  }
1✔
301

302
  public static void applyLocalActivityOptions(
303
      Map<String, LocalActivityOptions> activityTypeToOptions) {
304
    getRootWorkflowContext().applyLocalActivityOptions(activityTypeToOptions);
1✔
305
  }
1✔
306

307
  /**
308
   * Creates client stub to activities that implement given interface.
309
   *
310
   * @param activityInterface interface type implemented by activities
311
   * @param options options that together with the properties of {@link
312
   *     io.temporal.activity.ActivityMethod} specify the activity invocation parameters
313
   * @param activityMethodOptions activity method-specific invocation parameters
314
   */
315
  public static <T> T newActivityStub(
316
      Class<T> activityInterface,
317
      ActivityOptions options,
318
      Map<String, ActivityOptions> activityMethodOptions) {
319
    // Merge the activity options we may have received from the workflow with the options we may
320
    // have received in WorkflowImplementationOptions.
321
    SyncWorkflowContext context = getRootWorkflowContext();
1✔
322
    options = (options == null) ? context.getDefaultActivityOptions() : options;
1✔
323

324
    Map<String, ActivityOptions> mergedActivityOptionsMap;
325
    @Nonnull Map<String, ActivityOptions> predefinedActivityOptions = context.getActivityOptions();
1✔
326
    if (activityMethodOptions != null
1✔
327
        && !activityMethodOptions.isEmpty()
×
328
        && predefinedActivityOptions.isEmpty()) {
×
329
      // we need to merge only in this case
330
      mergedActivityOptionsMap = new HashMap<>(predefinedActivityOptions);
×
331
      ActivityOptionUtils.mergePredefinedActivityOptions(
×
332
          mergedActivityOptionsMap, activityMethodOptions);
333
    } else {
334
      mergedActivityOptionsMap =
1✔
335
          MoreObjects.firstNonNull(
1✔
336
              activityMethodOptions,
337
              MoreObjects.firstNonNull(predefinedActivityOptions, Collections.emptyMap()));
1✔
338
    }
339

340
    InvocationHandler invocationHandler =
1✔
341
        ActivityInvocationHandler.newInstance(
1✔
342
            activityInterface,
343
            options,
344
            mergedActivityOptionsMap,
345
            context.getWorkflowOutboundInterceptor(),
1✔
346
            () -> assertNotReadOnly("schedule activity"));
1✔
347
    return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler);
1✔
348
  }
349

350
  /**
351
   * Creates client stub to local activities that implement given interface.
352
   *
353
   * @param activityInterface interface type implemented by activities
354
   * @param options options that together with the properties of {@link
355
   *     io.temporal.activity.ActivityMethod} specify the activity invocation parameters
356
   * @param activityMethodOptions activity method-specific invocation parameters
357
   */
358
  public static <T> T newLocalActivityStub(
359
      Class<T> activityInterface,
360
      LocalActivityOptions options,
361
      @Nullable Map<String, LocalActivityOptions> activityMethodOptions) {
362
    // Merge the activity options we may have received from the workflow with the options we may
363
    // have received in WorkflowImplementationOptions.
364
    SyncWorkflowContext context = getRootWorkflowContext();
1✔
365
    options = (options == null) ? context.getDefaultLocalActivityOptions() : options;
1✔
366

367
    Map<String, LocalActivityOptions> mergedLocalActivityOptionsMap;
368
    @Nonnull
369
    Map<String, LocalActivityOptions> predefinedLocalActivityOptions =
1✔
370
        context.getLocalActivityOptions();
1✔
371
    if (activityMethodOptions != null
1✔
372
        && !activityMethodOptions.isEmpty()
×
373
        && predefinedLocalActivityOptions.isEmpty()) {
×
374
      // we need to merge only in this case
375
      mergedLocalActivityOptionsMap = new HashMap<>(predefinedLocalActivityOptions);
×
376
      ActivityOptionUtils.mergePredefinedLocalActivityOptions(
×
377
          mergedLocalActivityOptionsMap, activityMethodOptions);
378
    } else {
379
      mergedLocalActivityOptionsMap =
1✔
380
          MoreObjects.firstNonNull(
1✔
381
              activityMethodOptions,
382
              MoreObjects.firstNonNull(predefinedLocalActivityOptions, Collections.emptyMap()));
1✔
383
    }
384

385
    InvocationHandler invocationHandler =
1✔
386
        LocalActivityInvocationHandler.newInstance(
1✔
387
            activityInterface,
388
            options,
389
            mergedLocalActivityOptionsMap,
390
            WorkflowInternal.getWorkflowOutboundInterceptor(),
1✔
391
            () -> assertNotReadOnly("schedule local activity"));
1✔
392
    return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler);
1✔
393
  }
394

395
  public static ActivityStub newUntypedActivityStub(ActivityOptions options) {
396
    return ActivityStubImpl.newInstance(
1✔
397
        options, getWorkflowOutboundInterceptor(), () -> assertNotReadOnly("schedule activity"));
1✔
398
  }
399

400
  public static ActivityStub newUntypedLocalActivityStub(LocalActivityOptions options) {
401
    return LocalActivityStubImpl.newInstance(
1✔
402
        options,
403
        getWorkflowOutboundInterceptor(),
1✔
404
        () -> assertNotReadOnly("schedule local activity"));
1✔
405
  }
406

407
  @SuppressWarnings("unchecked")
408
  public static <T> T newChildWorkflowStub(
409
      Class<T> workflowInterface, ChildWorkflowOptions options) {
410
    return (T)
1✔
411
        Proxy.newProxyInstance(
1✔
412
            workflowInterface.getClassLoader(),
1✔
413
            new Class<?>[] {workflowInterface, StubMarker.class, AsyncMarker.class},
414
            new ChildWorkflowInvocationHandler(
415
                workflowInterface,
416
                options,
417
                getWorkflowOutboundInterceptor(),
1✔
418
                WorkflowInternal::assertNotReadOnly));
419
  }
420

421
  @SuppressWarnings("unchecked")
422
  public static <T> T newExternalWorkflowStub(
423
      Class<T> workflowInterface, WorkflowExecution execution) {
424
    return (T)
1✔
425
        Proxy.newProxyInstance(
1✔
426
            workflowInterface.getClassLoader(),
1✔
427
            new Class<?>[] {workflowInterface, StubMarker.class, AsyncMarker.class},
428
            new ExternalWorkflowInvocationHandler(
429
                workflowInterface,
430
                execution,
431
                getWorkflowOutboundInterceptor(),
1✔
432
                WorkflowInternal::assertNotReadOnly));
433
  }
434

435
  public static Promise<WorkflowExecution> getWorkflowExecution(Object workflowStub) {
436
    if (workflowStub instanceof StubMarker) {
1✔
437
      Object stub = ((StubMarker) workflowStub).__getUntypedStub();
1✔
438
      return ((ChildWorkflowStub) stub).getExecution();
1✔
439
    }
440
    throw new IllegalArgumentException(
×
441
        "Not a workflow stub created through Workflow.newChildWorkflowStub: " + workflowStub);
442
  }
443

444
  public static ChildWorkflowStub newUntypedChildWorkflowStub(
445
      String workflowType, ChildWorkflowOptions options) {
446
    return new ChildWorkflowStubImpl(
1✔
447
        workflowType,
448
        options,
449
        getWorkflowOutboundInterceptor(),
1✔
450
        WorkflowInternal::assertNotReadOnly);
451
  }
452

453
  public static ExternalWorkflowStub newUntypedExternalWorkflowStub(WorkflowExecution execution) {
454
    return new ExternalWorkflowStubImpl(
1✔
455
        execution, getWorkflowOutboundInterceptor(), WorkflowInternal::assertNotReadOnly);
1✔
456
  }
457

458
  /**
459
   * Creates client stub that can be used to continue this workflow as new.
460
   *
461
   * @param workflowInterface interface type implemented by the next generation of workflow
462
   */
463
  @SuppressWarnings("unchecked")
464
  public static <T> T newContinueAsNewStub(
465
      Class<T> workflowInterface, ContinueAsNewOptions options) {
466
    return (T)
1✔
467
        Proxy.newProxyInstance(
1✔
468
            workflowInterface.getClassLoader(),
1✔
469
            new Class<?>[] {workflowInterface},
470
            new ContinueAsNewWorkflowInvocationHandler(
471
                workflowInterface, options, getWorkflowOutboundInterceptor()));
1✔
472
  }
473

474
  /**
475
   * Execute activity by name.
476
   *
477
   * @param name name of the activity
478
   * @param resultClass activity return type
479
   * @param args list of activity arguments
480
   * @param <R> activity return type
481
   * @return activity result
482
   */
483
  public static <R> R executeActivity(
484
      String name, ActivityOptions options, Class<R> resultClass, Type resultType, Object... args) {
485
    assertNotReadOnly("schedule activity");
×
486
    Promise<R> result =
487
        getWorkflowOutboundInterceptor()
×
488
            .executeActivity(
×
489
                new WorkflowOutboundCallsInterceptor.ActivityInput<>(
490
                    name, resultClass, resultType, args, options, Header.empty()))
×
491
            .getResult();
×
492
    if (AsyncInternal.isAsync()) {
×
493
      AsyncInternal.setAsyncResult(result);
×
494
      return null; // ignored
×
495
    }
496
    return result.get();
×
497
  }
498

499
  public static void await(String reason, Supplier<Boolean> unblockCondition)
500
      throws DestroyWorkflowThreadError {
501
    assertNotReadOnly(reason);
1✔
502
    getWorkflowOutboundInterceptor()
1✔
503
        .await(
1✔
504
            reason,
505
            () -> {
506
              getRootWorkflowContext().setReadOnly(true);
1✔
507
              try {
508
                return unblockCondition.get();
1✔
509
              } finally {
510
                getRootWorkflowContext().setReadOnly(false);
1✔
511
              }
512
            });
513
  }
1✔
514

515
  public static boolean await(Duration timeout, String reason, Supplier<Boolean> unblockCondition)
516
      throws DestroyWorkflowThreadError {
517
    assertNotReadOnly(reason);
1✔
518
    return getWorkflowOutboundInterceptor()
1✔
519
        .await(
1✔
520
            timeout,
521
            reason,
522
            () -> {
523
              getRootWorkflowContext().setReadOnly(true);
1✔
524
              try {
525
                return unblockCondition.get();
1✔
526
              } finally {
527
                getRootWorkflowContext().setReadOnly(false);
1✔
528
              }
529
            });
530
  }
531

532
  public static <R> R sideEffect(Class<R> resultClass, Type resultType, Func<R> func) {
533
    assertNotReadOnly("side effect");
1✔
534
    return getWorkflowOutboundInterceptor().sideEffect(resultClass, resultType, func);
1✔
535
  }
536

537
  public static <R> R mutableSideEffect(
538
      String id, Class<R> resultClass, Type resultType, BiPredicate<R, R> updated, Func<R> func) {
539
    assertNotReadOnly("mutable side effect");
1✔
540
    return getWorkflowOutboundInterceptor()
1✔
541
        .mutableSideEffect(id, resultClass, resultType, updated, func);
1✔
542
  }
543

544
  public static int getVersion(String changeId, int minSupported, int maxSupported) {
545
    assertNotReadOnly("get version");
1✔
546
    return getWorkflowOutboundInterceptor().getVersion(changeId, minSupported, maxSupported);
1✔
547
  }
548

549
  public static <V> Promise<Void> promiseAllOf(Iterable<Promise<V>> promises) {
550
    return new AllOfPromise(promises);
1✔
551
  }
552

553
  public static Promise<Void> promiseAllOf(Promise<?>... promises) {
554
    return new AllOfPromise(promises);
1✔
555
  }
556

557
  public static <V> Promise<V> promiseAnyOf(Iterable<Promise<V>> promises) {
558
    return CompletablePromiseImpl.promiseAnyOf(promises);
1✔
559
  }
560

561
  public static Promise<Object> promiseAnyOf(Promise<?>... promises) {
562
    return CompletablePromiseImpl.promiseAnyOf(promises);
1✔
563
  }
564

565
  public static CancellationScope newCancellationScope(boolean detached, Runnable runnable) {
566
    return new CancellationScopeImpl(detached, runnable);
1✔
567
  }
568

569
  public static CancellationScope newCancellationScope(
570
      boolean detached, Functions.Proc1<CancellationScope> proc) {
571
    return new CancellationScopeImpl(detached, proc);
1✔
572
  }
573

574
  public static CancellationScopeImpl currentCancellationScope() {
575
    return CancellationScopeImpl.current();
1✔
576
  }
577

578
  public static RuntimeException wrap(Throwable e) {
579
    return CheckedExceptionWrapper.wrap(e);
1✔
580
  }
581

582
  public static Throwable unwrap(Throwable e) {
583
    return CheckedExceptionWrapper.unwrap(e);
1✔
584
  }
585

586
  /** Returns false if not under workflow code. */
587
  public static boolean isReplaying() {
588
    Optional<WorkflowThread> thread = DeterministicRunnerImpl.currentThreadInternalIfPresent();
1✔
589
    return thread.isPresent() && getRootWorkflowContext().isReplaying();
1✔
590
  }
591

592
  public static <T> T getMemo(String key, Class<T> valueClass, Type genericType) {
593
    Payload memo = getRootWorkflowContext().getReplayContext().getMemo(key);
1✔
594
    if (memo == null) {
1✔
595
      return null;
1✔
596
    }
597

598
    return getDataConverterWithCurrentWorkflowContext().fromPayload(memo, valueClass, genericType);
1✔
599
  }
600

601
  public static <R> R retry(
602
      RetryOptions options, Optional<Duration> expiration, Functions.Func<R> fn) {
603
    assertNotReadOnly("retry");
1✔
604
    return WorkflowRetryerInternal.retry(
1✔
605
        options.toBuilder().validateBuildWithDefaults(), expiration, fn);
1✔
606
  }
607

608
  public static void continueAsNew(
609
      @Nullable String workflowType, @Nullable ContinueAsNewOptions options, Object[] args) {
610
    assertNotReadOnly("continue as new");
1✔
611
    assertNotInUpdateHandler("ContinueAsNew is not supported in an update handler");
1✔
612
    getWorkflowOutboundInterceptor()
1✔
613
        .continueAsNew(
×
614
            new WorkflowOutboundCallsInterceptor.ContinueAsNewInput(
615
                workflowType, options, args, Header.empty()));
1✔
616
  }
×
617

618
  public static void continueAsNew(
619
      @Nullable String workflowType,
620
      @Nullable ContinueAsNewOptions options,
621
      Object[] args,
622
      WorkflowOutboundCallsInterceptor outboundCallsInterceptor) {
623
    assertNotReadOnly("continue as new");
1✔
624
    assertNotInUpdateHandler("ContinueAsNew is not supported in an update handler");
1✔
625
    outboundCallsInterceptor.continueAsNew(
1✔
626
        new WorkflowOutboundCallsInterceptor.ContinueAsNewInput(
627
            workflowType, options, args, Header.empty()));
1✔
628
  }
×
629

630
  public static Promise<Void> cancelWorkflow(WorkflowExecution execution) {
631
    assertNotReadOnly("cancel workflow");
×
632
    return getWorkflowOutboundInterceptor()
×
633
        .cancelWorkflow(new WorkflowOutboundCallsInterceptor.CancelWorkflowInput(execution))
×
634
        .getResult();
×
635
  }
636

637
  public static void sleep(Duration duration) {
638
    assertNotReadOnly("sleep");
1✔
639
    getWorkflowOutboundInterceptor().sleep(duration);
1✔
640
  }
1✔
641

642
  public static boolean isWorkflowThread() {
643
    return WorkflowThreadMarker.isWorkflowThread();
1✔
644
  }
645

646
  public static <T> T deadlockDetectorOff(Functions.Func<T> func) {
647
    if (isWorkflowThread()) {
1✔
648
      try (NonIdempotentHandle ignored = getWorkflowThread().lockDeadlockDetector()) {
1✔
649
        return func.apply();
1✔
650
      }
651
    } else {
652
      return func.apply();
1✔
653
    }
654
  }
655

656
  public static WorkflowInfo getWorkflowInfo() {
657
    return new WorkflowInfoImpl(getRootWorkflowContext().getReplayContext());
1✔
658
  }
659

660
  public static Optional<UpdateInfo> getCurrentUpdateInfo() {
661
    return getRootWorkflowContext().getCurrentUpdateInfo();
1✔
662
  }
663

664
  public static Scope getMetricsScope() {
665
    return getWorkflowOutboundInterceptor().getMetricsScope();
1✔
666
  }
667

668
  private static boolean isLoggingEnabledInReplay() {
669
    return getRootWorkflowContext().isLoggingEnabledInReplay();
1✔
670
  }
671

672
  public static UUID randomUUID() {
673
    assertNotReadOnly("random UUID");
1✔
674
    return getRootWorkflowContext().randomUUID();
1✔
675
  }
676

677
  public static Random newRandom() {
678
    assertNotReadOnly("random");
1✔
679
    return getRootWorkflowContext().newRandom();
1✔
680
  }
681

682
  public static Logger getLogger(Class<?> clazz) {
683
    Logger logger = LoggerFactory.getLogger(clazz);
1✔
684
    return new ReplayAwareLogger(
1✔
685
        logger, WorkflowInternal::isReplaying, WorkflowInternal::isLoggingEnabledInReplay);
686
  }
687

688
  public static Logger getLogger(String name) {
689
    Logger logger = LoggerFactory.getLogger(name);
1✔
690
    return new ReplayAwareLogger(
1✔
691
        logger, WorkflowInternal::isReplaying, WorkflowInternal::isLoggingEnabledInReplay);
692
  }
693

694
  public static <R> R getLastCompletionResult(Class<R> resultClass, Type resultType) {
695
    return getRootWorkflowContext().getLastCompletionResult(resultClass, resultType);
1✔
696
  }
697

698
  @Nullable
699
  public static <T> T getSearchAttribute(String name) {
700
    List<T> list = getSearchAttributeValues(name);
1✔
701
    if (list == null) {
1✔
702
      return null;
1✔
703
    }
704
    Preconditions.checkState(list.size() > 0);
1✔
705
    Preconditions.checkState(
1✔
706
        list.size() == 1,
1✔
707
        "search attribute with name '%s' contains a list '%s' of values instead of a single value",
708
        name,
709
        list);
710
    return list.get(0);
1✔
711
  }
712

713
  @Nullable
714
  public static <T> List<T> getSearchAttributeValues(String name) {
715
    SearchAttributes searchAttributes =
716
        getRootWorkflowContext().getReplayContext().getSearchAttributes();
1✔
717
    if (searchAttributes == null) {
1✔
718
      return null;
×
719
    }
720
    List<T> decoded = SearchAttributesUtil.decode(searchAttributes, name);
1✔
721
    return decoded != null ? Collections.unmodifiableList(decoded) : null;
1✔
722
  }
723

724
  @Nonnull
725
  public static Map<String, List<?>> getSearchAttributes() {
726
    SearchAttributes searchAttributes =
727
        getRootWorkflowContext().getReplayContext().getSearchAttributes();
1✔
728
    if (searchAttributes == null) {
1✔
729
      return Collections.emptyMap();
×
730
    }
731
    return Collections.unmodifiableMap(SearchAttributesUtil.decode(searchAttributes));
1✔
732
  }
733

734
  @Nonnull
735
  public static io.temporal.common.SearchAttributes getTypedSearchAttributes() {
736
    SearchAttributes searchAttributes =
737
        getRootWorkflowContext().getReplayContext().getSearchAttributes();
1✔
738
    return SearchAttributesUtil.decodeTyped(searchAttributes);
1✔
739
  }
740

741
  public static void upsertSearchAttributes(Map<String, ?> searchAttributes) {
742
    assertNotReadOnly("upsert search attribute");
1✔
743
    getWorkflowOutboundInterceptor().upsertSearchAttributes(searchAttributes);
1✔
744
  }
1✔
745

746
  public static void upsertTypedSearchAttributes(
747
      SearchAttributeUpdate<?>... searchAttributeUpdates) {
748
    assertNotReadOnly("upsert search attribute");
1✔
749
    getWorkflowOutboundInterceptor().upsertTypedSearchAttributes(searchAttributeUpdates);
1✔
750
  }
1✔
751

752
  public static void upsertMemo(Map<String, Object> memo) {
753
    assertNotReadOnly("upsert memo");
1✔
754
    getWorkflowOutboundInterceptor().upsertMemo(memo);
1✔
755
  }
1✔
756

757
  public static DataConverter getDataConverter() {
758
    return getRootWorkflowContext().getDataConverter();
×
759
  }
760

761
  static DataConverter getDataConverterWithCurrentWorkflowContext() {
762
    return getRootWorkflowContext().getDataConverterWithCurrentWorkflowContext();
1✔
763
  }
764

765
  /**
766
   * Name of the workflow type the interface defines. It is either the interface short name * or
767
   * value of {@link WorkflowMethod#name()} parameter.
768
   *
769
   * @param workflowInterfaceClass interface annotated with @WorkflowInterface
770
   */
771
  public static String getWorkflowType(Class<?> workflowInterfaceClass) {
772
    POJOWorkflowInterfaceMetadata metadata =
1✔
773
        POJOWorkflowInterfaceMetadata.newInstance(workflowInterfaceClass);
1✔
774
    return metadata.getWorkflowType().get();
1✔
775
  }
776

777
  public static Optional<Exception> getPreviousRunFailure() {
778
    return Optional.ofNullable(getRootWorkflowContext().getReplayContext().getPreviousRunFailure())
1✔
779
        // Temporal Failure Values are additional user payload and serialized using user data
780
        // converter
781
        .map(f -> getDataConverterWithCurrentWorkflowContext().failureToException(f));
1✔
782
  }
783

784
  public static boolean isEveryHandlerFinished() {
785
    return getRootWorkflowContext().isEveryHandlerFinished();
1✔
786
  }
787

788
  public static <T> T newNexusServiceStub(Class<T> serviceInterface, NexusServiceOptions options) {
789
    SyncWorkflowContext context = getRootWorkflowContext();
1✔
790
    NexusServiceOptions baseOptions =
791
        (options == null) ? context.getDefaultNexusServiceOptions() : options;
1✔
792

793
    @Nonnull
794
    Map<String, NexusServiceOptions> predefinedNexusServiceOptions =
1✔
795
        context.getNexusServiceOptions();
1✔
796

797
    ServiceDefinition serviceDef = ServiceDefinition.fromClass(serviceInterface);
1✔
798
    NexusServiceOptions mergedOptions =
1✔
799
        NexusServiceOptions.newBuilder(predefinedNexusServiceOptions.get(serviceDef.getName()))
1✔
800
            .mergeNexusServiceOptions(baseOptions)
1✔
801
            .build();
1✔
802
    return (T)
1✔
803
        Proxy.newProxyInstance(
1✔
804
            serviceInterface.getClassLoader(),
1✔
805
            new Class<?>[] {serviceInterface, StubMarker.class, AsyncInternal.AsyncMarker.class},
806
            new NexusServiceInvocationHandler(
807
                serviceDef,
808
                mergedOptions,
809
                getWorkflowOutboundInterceptor(),
1✔
810
                WorkflowInternal::assertNotReadOnly));
811
  }
812

813
  public static NexusServiceStub newUntypedNexusServiceStub(
814
      String service, NexusServiceOptions options) {
815
    return new NexusServiceStubImpl(
1✔
816
        service, options, getWorkflowOutboundInterceptor(), WorkflowInternal::assertNotReadOnly);
1✔
817
  }
818

819
  public static <T, R> NexusOperationHandle<R> startNexusOperation(
820
      Functions.Func1<T, R> operation, T arg) {
821
    return StartNexusCallInternal.startNexusOperation(() -> operation.apply(arg));
1✔
822
  }
823

824
  public static <R> NexusOperationHandle<R> startNexusOperation(Functions.Func<R> operation) {
825
    return StartNexusCallInternal.startNexusOperation(() -> operation.apply());
×
826
  }
827

828
  static WorkflowOutboundCallsInterceptor getWorkflowOutboundInterceptor() {
829
    return getRootWorkflowContext().getWorkflowOutboundInterceptor();
1✔
830
  }
831

832
  static SyncWorkflowContext getRootWorkflowContext() {
833
    return DeterministicRunnerImpl.currentThreadInternal().getWorkflowContext();
1✔
834
  }
835

836
  static boolean isReadOnly() {
837
    return getRootWorkflowContext().isReadOnly();
1✔
838
  }
839

840
  static void assertNotReadOnly(String action) {
841
    if (isReadOnly()) {
1✔
842
      throw new ReadOnlyException(action);
1✔
843
    }
844
  }
1✔
845

846
  static void assertNotInUpdateHandler(String message) {
847
    if (getCurrentUpdateInfo().isPresent()) {
1✔
848
      throw new UnsupportedContinueAsNewRequest(message);
1✔
849
    }
850
  }
1✔
851

852
  private static WorkflowThread getWorkflowThread() {
853
    return DeterministicRunnerImpl.currentThreadInternal();
1✔
854
  }
855

856
  /** Not instantiable: this class only exposes static members. */
  private WorkflowInternal() {
    // intentionally empty
  }
858
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc