• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #175

pending completion
#175

push

github-actions

web-flow
Worker / Build Id versioning (#1786)

Implement new worker build id based versioning feature

236 of 236 new or added lines in 24 files covered. (100.0%)

18343 of 23697 relevant lines covered (77.41%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.98
/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.sync;
22

23
import static io.temporal.internal.sync.AsyncInternal.AsyncMarker;
24
import static io.temporal.internal.sync.DeterministicRunnerImpl.currentThreadInternal;
25

26
import com.google.common.base.Joiner;
27
import com.google.common.base.MoreObjects;
28
import com.google.common.base.Preconditions;
29
import com.uber.m3.tally.Scope;
30
import io.temporal.activity.ActivityOptions;
31
import io.temporal.activity.LocalActivityOptions;
32
import io.temporal.api.common.v1.Payload;
33
import io.temporal.api.common.v1.SearchAttributes;
34
import io.temporal.api.common.v1.WorkflowExecution;
35
import io.temporal.common.RetryOptions;
36
import io.temporal.common.SearchAttributeUpdate;
37
import io.temporal.common.converter.DataConverter;
38
import io.temporal.common.interceptors.Header;
39
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
40
import io.temporal.common.metadata.POJOWorkflowImplMetadata;
41
import io.temporal.common.metadata.POJOWorkflowInterfaceMetadata;
42
import io.temporal.common.metadata.POJOWorkflowMethodMetadata;
43
import io.temporal.internal.WorkflowThreadMarker;
44
import io.temporal.internal.common.ActivityOptionUtils;
45
import io.temporal.internal.common.NonIdempotentHandle;
46
import io.temporal.internal.common.SearchAttributesUtil;
47
import io.temporal.internal.logging.ReplayAwareLogger;
48
import io.temporal.serviceclient.CheckedExceptionWrapper;
49
import io.temporal.workflow.*;
50
import io.temporal.workflow.Functions.Func;
51
import java.lang.reflect.*;
52
import java.time.Duration;
53
import java.util.*;
54
import java.util.function.BiPredicate;
55
import java.util.function.Supplier;
56
import javax.annotation.Nonnull;
57
import javax.annotation.Nullable;
58
import org.slf4j.Logger;
59
import org.slf4j.LoggerFactory;
60

61
/**
62
 * Never reference directly. It is public only because Java doesn't have internal package support.
63
 */
64
public final class WorkflowInternal {
65
  public static final int DEFAULT_VERSION = -1;
66

67
  public static @Nonnull WorkflowThread newWorkflowMethodThread(Runnable runnable, String name) {
68
    Object workflowThread =
69
        currentThreadInternal()
1✔
70
            .getWorkflowContext()
1✔
71
            .getWorkflowInboundInterceptor()
1✔
72
            .newWorkflowMethodThread(runnable, name);
1✔
73
    Preconditions.checkState(
1✔
74
        workflowThread != null,
75
        "[BUG] One of the custom interceptors illegally overrode newWorkflowMethodThread result to null. "
76
            + "Check WorkflowInboundCallsInterceptor#newWorkflowMethodThread contract.");
77
    Preconditions.checkState(
1✔
78
        workflowThread instanceof WorkflowThread,
79
        "[BUG] One of the custom interceptors illegally overrode newWorkflowMethodThread result. "
80
            + "Check WorkflowInboundCallsInterceptor#newWorkflowMethodThread contract. "
81
            + "Illegal object returned from the interceptors chain: %s",
82
        workflowThread);
83
    return (WorkflowThread) workflowThread;
1✔
84
  }
85

86
  public static Promise<Void> newTimer(Duration duration) {
87
    return getWorkflowOutboundInterceptor().newTimer(duration);
1✔
88
  }
89

90
  /**
91
   * @param capacity the maximum size of the queue
92
   * @return new instance of {@link WorkflowQueue}
93
   * @deprecated this method creates a deprecated implementation of the queue that has some methods
94
   *     implemented incorrectly. Please use {@link #newWorkflowQueue(int)} instead.
95
   */
96
  @Deprecated
97
  public static <E> WorkflowQueue<E> newQueue(int capacity) {
98
    return new WorkflowQueueDeprecatedImpl<>(capacity);
1✔
99
  }
100

101
  /**
102
   * Creates a {@link WorkflowQueue} implementation that can be used from workflow code.
103
   *
104
   * @param capacity the maximum size of the queue
105
   * @return new instance of {@link WorkflowQueue}
106
   */
107
  public static <E> WorkflowQueue<E> newWorkflowQueue(int capacity) {
108
    return new WorkflowQueueImpl<>(capacity);
1✔
109
  }
110

111
  public static <E> CompletablePromise<E> newCompletablePromise() {
112
    return new CompletablePromiseImpl<>();
1✔
113
  }
114

115
  public static <E> Promise<E> newPromise(E value) {
116
    CompletablePromise<E> result = Workflow.newPromise();
1✔
117
    result.complete(value);
1✔
118
    return result;
1✔
119
  }
120

121
  public static <E> Promise<E> newFailedPromise(Exception failure) {
122
    CompletablePromise<E> result = new CompletablePromiseImpl<>();
1✔
123
    result.completeExceptionally(CheckedExceptionWrapper.wrap(failure));
1✔
124
    return result;
1✔
125
  }
126

127
  /**
128
   * Register query or queries implementation object. There is no need to register top level
129
   * workflow implementation object as it is done implicitly. Only methods annotated with @{@link
130
   * QueryMethod} are registered. TODO(quinn) LIES!
131
   */
132
  public static void registerListener(Object implementation) {
133
    if (implementation instanceof DynamicSignalHandler) {
1✔
134
      getWorkflowOutboundInterceptor()
1✔
135
          .registerDynamicSignalHandler(
1✔
136
              new WorkflowOutboundCallsInterceptor.RegisterDynamicSignalHandlerInput(
137
                  (DynamicSignalHandler) implementation));
138
      return;
1✔
139
    }
140
    if (implementation instanceof DynamicQueryHandler) {
1✔
141
      getWorkflowOutboundInterceptor()
1✔
142
          .registerDynamicQueryHandler(
1✔
143
              new WorkflowOutboundCallsInterceptor.RegisterDynamicQueryHandlerInput(
144
                  (DynamicQueryHandler) implementation));
145
      return;
1✔
146
    }
147
    if (implementation instanceof DynamicUpdateHandler) {
1✔
148
      getWorkflowOutboundInterceptor()
×
149
          .registerDynamicUpdateHandler(
×
150
              new WorkflowOutboundCallsInterceptor.RegisterDynamicUpdateHandlerInput(
151
                  (DynamicUpdateHandler) implementation));
152
      return;
×
153
    }
154
    Class<?> cls = implementation.getClass();
1✔
155
    POJOWorkflowImplMetadata workflowMetadata = POJOWorkflowImplMetadata.newListenerInstance(cls);
1✔
156
    for (POJOWorkflowMethodMetadata methodMetadata : workflowMetadata.getQueryMethods()) {
1✔
157
      Method method = methodMetadata.getWorkflowMethod();
1✔
158
      getWorkflowOutboundInterceptor()
1✔
159
          .registerQuery(
1✔
160
              new WorkflowOutboundCallsInterceptor.RegisterQueryInput(
161
                  methodMetadata.getName(),
1✔
162
                  method.getParameterTypes(),
1✔
163
                  method.getGenericParameterTypes(),
1✔
164
                  (args) -> {
165
                    try {
166
                      return method.invoke(implementation, args);
1✔
167
                    } catch (Throwable e) {
×
168
                      throw CheckedExceptionWrapper.wrap(e);
×
169
                    }
170
                  }));
171
    }
1✔
172
    List<WorkflowOutboundCallsInterceptor.SignalRegistrationRequest> requests = new ArrayList<>();
1✔
173
    for (POJOWorkflowMethodMetadata methodMetadata : workflowMetadata.getSignalMethods()) {
1✔
174
      Method method = methodMetadata.getWorkflowMethod();
1✔
175
      requests.add(
1✔
176
          new WorkflowOutboundCallsInterceptor.SignalRegistrationRequest(
177
              methodMetadata.getName(),
1✔
178
              method.getParameterTypes(),
1✔
179
              method.getGenericParameterTypes(),
1✔
180
              (args) -> {
181
                try {
182
                  method.invoke(implementation, args);
1✔
183
                } catch (Throwable e) {
1✔
184
                  throw CheckedExceptionWrapper.wrap(e);
1✔
185
                }
1✔
186
              }));
1✔
187
    }
1✔
188
    if (!requests.isEmpty()) {
1✔
189
      getWorkflowOutboundInterceptor()
1✔
190
          .registerSignalHandlers(
1✔
191
              new WorkflowOutboundCallsInterceptor.RegisterSignalHandlersInput(requests));
192
    }
193

194
    // Get all validators and lazily assign them to update handlers as we see them.
195
    Map<String, POJOWorkflowMethodMetadata> validators =
1✔
196
        new HashMap<>(workflowMetadata.getUpdateValidatorMethods().size());
1✔
197
    for (POJOWorkflowMethodMetadata methodMetadata : workflowMetadata.getUpdateValidatorMethods()) {
1✔
198
      Method method = methodMetadata.getWorkflowMethod();
1✔
199
      UpdateValidatorMethod updateValidatorMethod =
1✔
200
          method.getAnnotation(UpdateValidatorMethod.class);
1✔
201
      if (validators.containsKey(updateValidatorMethod.updateName())) {
1✔
202
        throw new IllegalArgumentException(
×
203
            "Duplicate validator for update handle " + updateValidatorMethod.updateName());
×
204
      }
205
      validators.put(updateValidatorMethod.updateName(), methodMetadata);
1✔
206
    }
1✔
207

208
    List<WorkflowOutboundCallsInterceptor.UpdateRegistrationRequest> updateRequests =
1✔
209
        new ArrayList<>();
210
    for (POJOWorkflowMethodMetadata methodMetadata : workflowMetadata.getUpdateMethods()) {
1✔
211
      Method method = methodMetadata.getWorkflowMethod();
1✔
212
      UpdateMethod updateMethod = method.getAnnotation(UpdateMethod.class);
1✔
213
      String updateMethodName = updateMethod.name();
1✔
214
      if (updateMethodName.isEmpty()) {
1✔
215
        updateMethodName = method.getName();
1✔
216
      }
217
      // Check if any validators claim they are the validator for this update
218
      POJOWorkflowMethodMetadata validatorMethodMetadata = validators.remove(updateMethodName);
1✔
219
      Method validatorMethod;
220
      if (validatorMethodMetadata != null) {
1✔
221
        validatorMethod = validatorMethodMetadata.getWorkflowMethod();
1✔
222
        if (!Arrays.equals(validatorMethod.getParameterTypes(), method.getParameterTypes())) {
1✔
223
          throw new IllegalArgumentException(
×
224
              "Validator for: "
225
                  + updateMethodName
226
                  + " type parameters do not match the update handle");
227
        }
228
      } else {
229
        validatorMethod = null;
1✔
230
      }
231
      updateRequests.add(
1✔
232
          new WorkflowOutboundCallsInterceptor.UpdateRegistrationRequest(
233
              methodMetadata.getName(),
1✔
234
              method.getParameterTypes(),
1✔
235
              method.getGenericParameterTypes(),
1✔
236
              (args) -> {
237
                try {
238
                  if (validatorMethod != null) {
1✔
239
                    validatorMethod.invoke(implementation, args);
1✔
240
                  }
241
                } catch (Throwable e) {
1✔
242
                  throw CheckedExceptionWrapper.wrap(e);
1✔
243
                }
1✔
244
              },
1✔
245
              (args) -> {
246
                try {
247
                  return method.invoke(implementation, args);
1✔
248
                } catch (Throwable e) {
1✔
249
                  throw CheckedExceptionWrapper.wrap(e);
1✔
250
                }
251
              }));
252
    }
1✔
253
    if (!updateRequests.isEmpty()) {
1✔
254
      getWorkflowOutboundInterceptor()
1✔
255
          .registerUpdateHandlers(
1✔
256
              new WorkflowOutboundCallsInterceptor.RegisterUpdateHandlersInput(updateRequests));
257
    }
258
    if (!validators.isEmpty()) {
1✔
259
      throw new IllegalArgumentException(
×
260
          "Missing update methods for update validator(s): "
261
              + Joiner.on(", ").join(validators.keySet()));
×
262
    }
263
  }
1✔
264

265
  /** Should be used to get current time instead of {@link System#currentTimeMillis()} */
266
  public static long currentTimeMillis() {
267
    return getWorkflowOutboundInterceptor().currentTimeMillis();
1✔
268
  }
269

270
  public static void setDefaultActivityOptions(ActivityOptions activityOptions) {
271
    getRootWorkflowContext().setDefaultActivityOptions(activityOptions);
1✔
272
  }
1✔
273

274
  public static void applyActivityOptions(Map<String, ActivityOptions> activityTypeToOptions) {
275
    getRootWorkflowContext().applyActivityOptions(activityTypeToOptions);
1✔
276
  }
1✔
277

278
  public static void setDefaultLocalActivityOptions(LocalActivityOptions localActivityOptions) {
279
    getRootWorkflowContext().setDefaultLocalActivityOptions(localActivityOptions);
1✔
280
  }
1✔
281

282
  public static void applyLocalActivityOptions(
283
      Map<String, LocalActivityOptions> activityTypeToOptions) {
284
    getRootWorkflowContext().applyLocalActivityOptions(activityTypeToOptions);
1✔
285
  }
1✔
286

287
  /**
288
   * Creates client stub to activities that implement given interface.
289
   *
290
   * @param activityInterface interface type implemented by activities
291
   * @param options options that together with the properties of {@link
292
   *     io.temporal.activity.ActivityMethod} specify the activity invocation parameters
293
   * @param activityMethodOptions activity method-specific invocation parameters
294
   */
295
  public static <T> T newActivityStub(
296
      Class<T> activityInterface,
297
      ActivityOptions options,
298
      Map<String, ActivityOptions> activityMethodOptions) {
299
    // Merge the activity options we may have received from the workflow with the options we may
300
    // have received in WorkflowImplementationOptions.
301
    SyncWorkflowContext context = getRootWorkflowContext();
1✔
302
    options = (options == null) ? context.getDefaultActivityOptions() : options;
1✔
303

304
    Map<String, ActivityOptions> mergedActivityOptionsMap;
305
    @Nonnull Map<String, ActivityOptions> predefinedActivityOptions = context.getActivityOptions();
1✔
306
    if (activityMethodOptions != null
1✔
307
        && !activityMethodOptions.isEmpty()
×
308
        && predefinedActivityOptions.isEmpty()) {
×
309
      // we need to merge only in this case
310
      mergedActivityOptionsMap = new HashMap<>(predefinedActivityOptions);
×
311
      ActivityOptionUtils.mergePredefinedActivityOptions(
×
312
          mergedActivityOptionsMap, activityMethodOptions);
313
    } else {
314
      mergedActivityOptionsMap =
1✔
315
          MoreObjects.firstNonNull(
1✔
316
              activityMethodOptions,
317
              MoreObjects.firstNonNull(predefinedActivityOptions, Collections.emptyMap()));
1✔
318
    }
319

320
    InvocationHandler invocationHandler =
1✔
321
        ActivityInvocationHandler.newInstance(
1✔
322
            activityInterface,
323
            options,
324
            mergedActivityOptionsMap,
325
            context.getWorkflowOutboundInterceptor());
1✔
326
    return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler);
1✔
327
  }
328

329
  /**
330
   * Creates client stub to local activities that implement given interface.
331
   *
332
   * @param activityInterface interface type implemented by activities
333
   * @param options options that together with the properties of {@link
334
   *     io.temporal.activity.ActivityMethod} specify the activity invocation parameters
335
   * @param activityMethodOptions activity method-specific invocation parameters
336
   */
337
  public static <T> T newLocalActivityStub(
338
      Class<T> activityInterface,
339
      LocalActivityOptions options,
340
      @Nullable Map<String, LocalActivityOptions> activityMethodOptions) {
341
    // Merge the activity options we may have received from the workflow with the options we may
342
    // have received in WorkflowImplementationOptions.
343
    SyncWorkflowContext context = getRootWorkflowContext();
1✔
344
    options = (options == null) ? context.getDefaultLocalActivityOptions() : options;
1✔
345

346
    Map<String, LocalActivityOptions> mergedLocalActivityOptionsMap;
347
    @Nonnull
348
    Map<String, LocalActivityOptions> predefinedLocalActivityOptions =
1✔
349
        context.getLocalActivityOptions();
1✔
350
    if (activityMethodOptions != null
1✔
351
        && !activityMethodOptions.isEmpty()
×
352
        && predefinedLocalActivityOptions.isEmpty()) {
×
353
      // we need to merge only in this case
354
      mergedLocalActivityOptionsMap = new HashMap<>(predefinedLocalActivityOptions);
×
355
      ActivityOptionUtils.mergePredefinedLocalActivityOptions(
×
356
          mergedLocalActivityOptionsMap, activityMethodOptions);
357
    } else {
358
      mergedLocalActivityOptionsMap =
1✔
359
          MoreObjects.firstNonNull(
1✔
360
              activityMethodOptions,
361
              MoreObjects.firstNonNull(predefinedLocalActivityOptions, Collections.emptyMap()));
1✔
362
    }
363

364
    InvocationHandler invocationHandler =
1✔
365
        LocalActivityInvocationHandler.newInstance(
1✔
366
            activityInterface,
367
            options,
368
            mergedLocalActivityOptionsMap,
369
            WorkflowInternal.getWorkflowOutboundInterceptor());
1✔
370
    return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler);
1✔
371
  }
372

373
  public static ActivityStub newUntypedActivityStub(ActivityOptions options) {
374
    return ActivityStubImpl.newInstance(options, getWorkflowOutboundInterceptor());
1✔
375
  }
376

377
  public static ActivityStub newUntypedLocalActivityStub(LocalActivityOptions options) {
378
    return LocalActivityStubImpl.newInstance(options, getWorkflowOutboundInterceptor());
1✔
379
  }
380

381
  @SuppressWarnings("unchecked")
382
  public static <T> T newChildWorkflowStub(
383
      Class<T> workflowInterface, ChildWorkflowOptions options) {
384
    return (T)
1✔
385
        Proxy.newProxyInstance(
1✔
386
            workflowInterface.getClassLoader(),
1✔
387
            new Class<?>[] {workflowInterface, StubMarker.class, AsyncMarker.class},
388
            new ChildWorkflowInvocationHandler(
389
                workflowInterface, options, getWorkflowOutboundInterceptor()));
1✔
390
  }
391

392
  @SuppressWarnings("unchecked")
393
  public static <T> T newExternalWorkflowStub(
394
      Class<T> workflowInterface, WorkflowExecution execution) {
395
    return (T)
1✔
396
        Proxy.newProxyInstance(
1✔
397
            workflowInterface.getClassLoader(),
1✔
398
            new Class<?>[] {workflowInterface, StubMarker.class, AsyncMarker.class},
399
            new ExternalWorkflowInvocationHandler(
400
                workflowInterface, execution, getWorkflowOutboundInterceptor()));
1✔
401
  }
402

403
  public static Promise<WorkflowExecution> getWorkflowExecution(Object workflowStub) {
404
    if (workflowStub instanceof StubMarker) {
1✔
405
      Object stub = ((StubMarker) workflowStub).__getUntypedStub();
1✔
406
      return ((ChildWorkflowStub) stub).getExecution();
1✔
407
    }
408
    throw new IllegalArgumentException(
×
409
        "Not a workflow stub created through Workflow.newChildWorkflowStub: " + workflowStub);
410
  }
411

412
  public static ChildWorkflowStub newUntypedChildWorkflowStub(
413
      String workflowType, ChildWorkflowOptions options) {
414
    return new ChildWorkflowStubImpl(workflowType, options, getWorkflowOutboundInterceptor());
1✔
415
  }
416

417
  public static ExternalWorkflowStub newUntypedExternalWorkflowStub(WorkflowExecution execution) {
418
    return new ExternalWorkflowStubImpl(execution, getWorkflowOutboundInterceptor());
1✔
419
  }
420

421
  /**
422
   * Creates client stub that can be used to continue this workflow as new.
423
   *
424
   * @param workflowInterface interface type implemented by the next generation of workflow
425
   */
426
  @SuppressWarnings("unchecked")
427
  public static <T> T newContinueAsNewStub(
428
      Class<T> workflowInterface, ContinueAsNewOptions options) {
429
    return (T)
1✔
430
        Proxy.newProxyInstance(
1✔
431
            workflowInterface.getClassLoader(),
1✔
432
            new Class<?>[] {workflowInterface},
433
            new ContinueAsNewWorkflowInvocationHandler(
434
                workflowInterface, options, getWorkflowOutboundInterceptor()));
1✔
435
  }
436

437
  /**
438
   * Execute activity by name.
439
   *
440
   * @param name name of the activity
441
   * @param resultClass activity return type
442
   * @param args list of activity arguments
443
   * @param <R> activity return type
444
   * @return activity result
445
   */
446
  public static <R> R executeActivity(
447
      String name, ActivityOptions options, Class<R> resultClass, Type resultType, Object... args) {
448
    Promise<R> result =
449
        getWorkflowOutboundInterceptor()
×
450
            .executeActivity(
×
451
                new WorkflowOutboundCallsInterceptor.ActivityInput<>(
452
                    name, resultClass, resultType, args, options, Header.empty()))
×
453
            .getResult();
×
454
    if (AsyncInternal.isAsync()) {
×
455
      AsyncInternal.setAsyncResult(result);
×
456
      return null; // ignored
×
457
    }
458
    return result.get();
×
459
  }
460

461
  public static void await(String reason, Supplier<Boolean> unblockCondition)
462
      throws DestroyWorkflowThreadError {
463
    getWorkflowOutboundInterceptor().await(reason, unblockCondition);
1✔
464
  }
1✔
465

466
  public static boolean await(Duration timeout, String reason, Supplier<Boolean> unblockCondition)
467
      throws DestroyWorkflowThreadError {
468
    return getWorkflowOutboundInterceptor().await(timeout, reason, unblockCondition);
1✔
469
  }
470

471
  public static <R> R sideEffect(Class<R> resultClass, Type resultType, Func<R> func) {
472
    return getWorkflowOutboundInterceptor().sideEffect(resultClass, resultType, func);
1✔
473
  }
474

475
  public static <R> R mutableSideEffect(
476
      String id, Class<R> resultClass, Type resultType, BiPredicate<R, R> updated, Func<R> func) {
477
    return getWorkflowOutboundInterceptor()
1✔
478
        .mutableSideEffect(id, resultClass, resultType, updated, func);
1✔
479
  }
480

481
  public static int getVersion(String changeId, int minSupported, int maxSupported) {
482
    return getWorkflowOutboundInterceptor().getVersion(changeId, minSupported, maxSupported);
1✔
483
  }
484

485
  public static <V> Promise<Void> promiseAllOf(Iterable<Promise<V>> promises) {
486
    return new AllOfPromise(promises);
1✔
487
  }
488

489
  public static Promise<Void> promiseAllOf(Promise<?>... promises) {
490
    return new AllOfPromise(promises);
1✔
491
  }
492

493
  public static <V> Promise<V> promiseAnyOf(Iterable<Promise<V>> promises) {
494
    return CompletablePromiseImpl.promiseAnyOf(promises);
1✔
495
  }
496

497
  public static Promise<Object> promiseAnyOf(Promise<?>... promises) {
498
    return CompletablePromiseImpl.promiseAnyOf(promises);
1✔
499
  }
500

501
  public static CancellationScope newCancellationScope(boolean detached, Runnable runnable) {
502
    return new CancellationScopeImpl(detached, runnable);
1✔
503
  }
504

505
  public static CancellationScope newCancellationScope(
506
      boolean detached, Functions.Proc1<CancellationScope> proc) {
507
    return new CancellationScopeImpl(detached, proc);
1✔
508
  }
509

510
  public static CancellationScopeImpl currentCancellationScope() {
511
    return CancellationScopeImpl.current();
1✔
512
  }
513

514
  public static RuntimeException wrap(Throwable e) {
515
    return CheckedExceptionWrapper.wrap(e);
1✔
516
  }
517

518
  public static Throwable unwrap(Throwable e) {
519
    return CheckedExceptionWrapper.unwrap(e);
1✔
520
  }
521

522
  /** Returns false if not under workflow code. */
523
  public static boolean isReplaying() {
524
    Optional<WorkflowThread> thread = DeterministicRunnerImpl.currentThreadInternalIfPresent();
1✔
525
    return thread.isPresent() && getRootWorkflowContext().isReplaying();
1✔
526
  }
527

528
  public static <T> T getMemo(String key, Class<T> valueClass, Type genericType) {
529
    Payload memo = getRootWorkflowContext().getReplayContext().getMemo(key);
1✔
530
    if (memo == null) {
1✔
531
      return null;
1✔
532
    }
533

534
    return getDataConverter().fromPayload(memo, valueClass, genericType);
1✔
535
  }
536

537
  public static <R> R retry(
538
      RetryOptions options, Optional<Duration> expiration, Functions.Func<R> fn) {
539
    return WorkflowRetryerInternal.retry(
1✔
540
        options.toBuilder().validateBuildWithDefaults(), expiration, fn);
1✔
541
  }
542

543
  public static void continueAsNew(
544
      @Nullable String workflowType, @Nullable ContinueAsNewOptions options, Object[] args) {
545
    getWorkflowOutboundInterceptor()
1✔
546
        .continueAsNew(
×
547
            new WorkflowOutboundCallsInterceptor.ContinueAsNewInput(
548
                workflowType, options, args, Header.empty()));
1✔
549
  }
×
550

551
  public static void continueAsNew(
552
      @Nullable String workflowType,
553
      @Nullable ContinueAsNewOptions options,
554
      Object[] args,
555
      WorkflowOutboundCallsInterceptor outboundCallsInterceptor) {
556
    outboundCallsInterceptor.continueAsNew(
1✔
557
        new WorkflowOutboundCallsInterceptor.ContinueAsNewInput(
558
            workflowType, options, args, Header.empty()));
1✔
559
  }
×
560

561
  public static Promise<Void> cancelWorkflow(WorkflowExecution execution) {
562
    return getWorkflowOutboundInterceptor()
×
563
        .cancelWorkflow(new WorkflowOutboundCallsInterceptor.CancelWorkflowInput(execution))
×
564
        .getResult();
×
565
  }
566

567
  public static void sleep(Duration duration) {
568
    getWorkflowOutboundInterceptor().sleep(duration);
1✔
569
  }
1✔
570

571
  public static boolean isWorkflowThread() {
572
    return WorkflowThreadMarker.isWorkflowThread();
1✔
573
  }
574

575
  public static <T> T deadlockDetectorOff(Functions.Func<T> func) {
576
    if (isWorkflowThread()) {
1✔
577
      try (NonIdempotentHandle ignored = getWorkflowThread().lockDeadlockDetector()) {
1✔
578
        return func.apply();
1✔
579
      }
580
    } else {
581
      return func.apply();
1✔
582
    }
583
  }
584

585
  public static WorkflowInfo getWorkflowInfo() {
586
    return new WorkflowInfoImpl(getRootWorkflowContext().getReplayContext());
1✔
587
  }
588

589
  public static Scope getMetricsScope() {
590
    return getRootWorkflowContext().getMetricsScope();
1✔
591
  }
592

593
  private static boolean isLoggingEnabledInReplay() {
594
    return getRootWorkflowContext().isLoggingEnabledInReplay();
×
595
  }
596

597
  public static UUID randomUUID() {
598
    return getRootWorkflowContext().randomUUID();
1✔
599
  }
600

601
  public static Random newRandom() {
602
    return getRootWorkflowContext().newRandom();
1✔
603
  }
604

605
  public static Logger getLogger(Class<?> clazz) {
606
    Logger logger = LoggerFactory.getLogger(clazz);
1✔
607
    return new ReplayAwareLogger(
1✔
608
        logger, WorkflowInternal::isReplaying, WorkflowInternal::isLoggingEnabledInReplay);
609
  }
610

611
  public static Logger getLogger(String name) {
612
    Logger logger = LoggerFactory.getLogger(name);
×
613
    return new ReplayAwareLogger(
×
614
        logger, WorkflowInternal::isReplaying, WorkflowInternal::isLoggingEnabledInReplay);
615
  }
616

617
  public static <R> R getLastCompletionResult(Class<R> resultClass, Type resultType) {
618
    return getRootWorkflowContext().getLastCompletionResult(resultClass, resultType);
1✔
619
  }
620

621
  @Nullable
622
  public static <T> T getSearchAttribute(String name) {
623
    List<T> list = getSearchAttributeValues(name);
1✔
624
    if (list == null) {
1✔
625
      return null;
1✔
626
    }
627
    Preconditions.checkState(list.size() > 0);
1✔
628
    Preconditions.checkState(
1✔
629
        list.size() == 1,
1✔
630
        "search attribute with name '%s' contains a list '%s' of values instead of a single value",
631
        name,
632
        list);
633
    return list.get(0);
1✔
634
  }
635

636
  @Nullable
637
  public static <T> List<T> getSearchAttributeValues(String name) {
638
    SearchAttributes searchAttributes =
639
        getRootWorkflowContext().getReplayContext().getSearchAttributes();
1✔
640
    if (searchAttributes == null) {
1✔
641
      return null;
×
642
    }
643
    List<T> decoded = SearchAttributesUtil.decode(searchAttributes, name);
1✔
644
    return decoded != null ? Collections.unmodifiableList(decoded) : null;
1✔
645
  }
646

647
  @Nonnull
648
  public static Map<String, List<?>> getSearchAttributes() {
649
    SearchAttributes searchAttributes =
650
        getRootWorkflowContext().getReplayContext().getSearchAttributes();
1✔
651
    if (searchAttributes == null) {
1✔
652
      return Collections.emptyMap();
×
653
    }
654
    return Collections.unmodifiableMap(SearchAttributesUtil.decode(searchAttributes));
1✔
655
  }
656

657
  @Nonnull
658
  public static io.temporal.common.SearchAttributes getTypedSearchAttributes() {
659
    SearchAttributes searchAttributes =
660
        getRootWorkflowContext().getReplayContext().getSearchAttributes();
1✔
661
    return SearchAttributesUtil.decodeTyped(searchAttributes);
1✔
662
  }
663

664
  public static void upsertSearchAttributes(Map<String, ?> searchAttributes) {
665
    getWorkflowOutboundInterceptor().upsertSearchAttributes(searchAttributes);
1✔
666
  }
1✔
667

668
  public static void upsertTypedSearchAttributes(
669
      SearchAttributeUpdate<?>... searchAttributeUpdates) {
670
    getWorkflowOutboundInterceptor().upsertTypedSearchAttributes(searchAttributeUpdates);
1✔
671
  }
1✔
672

673
  public static DataConverter getDataConverter() {
674
    return getRootWorkflowContext().getDataConverter();
1✔
675
  }
676

677
  /**
678
   * Name of the workflow type the interface defines. It is either the interface short name * or
679
   * value of {@link WorkflowMethod#name()} parameter.
680
   *
681
   * @param workflowInterfaceClass interface annotated with @WorkflowInterface
682
   */
683
  public static String getWorkflowType(Class<?> workflowInterfaceClass) {
684
    POJOWorkflowInterfaceMetadata metadata =
1✔
685
        POJOWorkflowInterfaceMetadata.newInstance(workflowInterfaceClass);
1✔
686
    return metadata.getWorkflowType().get();
1✔
687
  }
688

689
  public static Optional<Exception> getPreviousRunFailure() {
690
    return Optional.ofNullable(getRootWorkflowContext().getReplayContext().getPreviousRunFailure())
1✔
691
        // Temporal Failure Values are additional user payload and serialized using user data
692
        // converter
693
        .map(f -> getDataConverter().failureToException(f));
1✔
694
  }
695

696
  private static WorkflowOutboundCallsInterceptor getWorkflowOutboundInterceptor() {
697
    return getRootWorkflowContext().getWorkflowOutboundInterceptor();
1✔
698
  }
699

700
  static SyncWorkflowContext getRootWorkflowContext() {
701
    return DeterministicRunnerImpl.currentThreadInternal().getWorkflowContext();
1✔
702
  }
703

704
  private static WorkflowThread getWorkflowThread() {
705
    return DeterministicRunnerImpl.currentThreadInternal();
1✔
706
  }
707

708
  /** Prohibit instantiation */
709
  private WorkflowInternal() {}
710
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc