• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 1855

pending completion
1855

push

buildkite

web-flow
3.9.0 release (#823)

11117 of 18414 relevant lines covered (60.37%)

0.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.37
/src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java
1
/*
2
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
 *
4
 *  Modifications copyright (C) 2017 Uber Technologies, Inc.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.internal.sync;
19

20
import static com.uber.cadence.internal.common.OptionsUtils.roundUpToSeconds;
21
import static com.uber.cadence.internal.sync.WorkflowInternal.CADENCE_CHANGE_VERSION;
22

23
import com.uber.cadence.ActivityType;
24
import com.uber.cadence.SearchAttributes;
25
import com.uber.cadence.WorkflowExecution;
26
import com.uber.cadence.WorkflowType;
27
import com.uber.cadence.activity.ActivityOptions;
28
import com.uber.cadence.activity.LocalActivityOptions;
29
import com.uber.cadence.common.RetryOptions;
30
import com.uber.cadence.context.ContextPropagator;
31
import com.uber.cadence.converter.DataConverter;
32
import com.uber.cadence.internal.common.InternalUtils;
33
import com.uber.cadence.internal.common.RetryParameters;
34
import com.uber.cadence.internal.replay.ActivityTaskFailedException;
35
import com.uber.cadence.internal.replay.ActivityTaskTimeoutException;
36
import com.uber.cadence.internal.replay.ChildWorkflowTaskFailedException;
37
import com.uber.cadence.internal.replay.ContinueAsNewWorkflowExecutionParameters;
38
import com.uber.cadence.internal.replay.DecisionContext;
39
import com.uber.cadence.internal.replay.ExecuteActivityParameters;
40
import com.uber.cadence.internal.replay.ExecuteLocalActivityParameters;
41
import com.uber.cadence.internal.replay.SignalExternalWorkflowParameters;
42
import com.uber.cadence.internal.replay.StartChildWorkflowExecutionParameters;
43
import com.uber.cadence.worker.WorkflowImplementationOptions;
44
import com.uber.cadence.workflow.ActivityException;
45
import com.uber.cadence.workflow.ActivityFailureException;
46
import com.uber.cadence.workflow.ActivityTimeoutException;
47
import com.uber.cadence.workflow.CancellationScope;
48
import com.uber.cadence.workflow.ChildWorkflowException;
49
import com.uber.cadence.workflow.ChildWorkflowFailureException;
50
import com.uber.cadence.workflow.ChildWorkflowOptions;
51
import com.uber.cadence.workflow.ChildWorkflowTimedOutException;
52
import com.uber.cadence.workflow.CompletablePromise;
53
import com.uber.cadence.workflow.ContinueAsNewOptions;
54
import com.uber.cadence.workflow.Functions;
55
import com.uber.cadence.workflow.Functions.Func;
56
import com.uber.cadence.workflow.Promise;
57
import com.uber.cadence.workflow.SignalExternalWorkflowException;
58
import com.uber.cadence.workflow.Workflow;
59
import com.uber.cadence.workflow.WorkflowInterceptor;
60
import com.uber.m3.tally.Scope;
61
import java.lang.reflect.Type;
62
import java.time.Duration;
63
import java.util.HashMap;
64
import java.util.List;
65
import java.util.Map;
66
import java.util.Objects;
67
import java.util.Optional;
68
import java.util.Random;
69
import java.util.UUID;
70
import java.util.concurrent.CancellationException;
71
import java.util.concurrent.TimeUnit;
72
import java.util.concurrent.atomic.AtomicReference;
73
import java.util.function.BiPredicate;
74
import java.util.function.Consumer;
75
import java.util.function.Function;
76
import java.util.function.Supplier;
77
import org.slf4j.Logger;
78
import org.slf4j.LoggerFactory;
79

80
final class SyncDecisionContext implements WorkflowInterceptor {
81

82
  private static final Logger log = LoggerFactory.getLogger(SyncDecisionContext.class);
1✔
83

84
  private final DecisionContext context;
85
  private DeterministicRunner runner;
86
  private final DataConverter converter;
87
  private final List<ContextPropagator> contextPropagators;
88
  private final WorkflowInterceptor headInterceptor;
89
  private final WorkflowTimers timers = new WorkflowTimers();
1✔
90
  private final Map<String, Functions.Func1<byte[], byte[]>> queryCallbacks = new HashMap<>();
1✔
91
  private final byte[] lastCompletionResult;
92
  private final WorkflowImplementationOptions workflowImplementationOptions;
93

94
  public SyncDecisionContext(
95
      DecisionContext context,
96
      DataConverter converter,
97
      List<ContextPropagator> contextPropagators,
98
      Function<WorkflowInterceptor, WorkflowInterceptor> interceptorFactory,
99
      byte[] lastCompletionResult,
100
      WorkflowImplementationOptions workflowImplementationOptions) {
1✔
101
    this.context = context;
1✔
102
    this.converter = converter;
1✔
103
    this.contextPropagators = contextPropagators;
1✔
104
    WorkflowInterceptor interceptor = interceptorFactory.apply(this);
1✔
105
    if (interceptor == null) {
1✔
106
      log.warn("WorkflowInterceptor factory returned null interceptor");
×
107
      interceptor = this;
×
108
    }
109
    this.headInterceptor = interceptor;
1✔
110
    this.lastCompletionResult = lastCompletionResult;
1✔
111
    this.workflowImplementationOptions = workflowImplementationOptions;
1✔
112
  }
1✔
113

114
  /**
115
   * Using setter, as runner is initialized with this context, so it is not ready during
116
   * construction of this.
117
   */
118
  public void setRunner(DeterministicRunner runner) {
119
    this.runner = runner;
1✔
120
  }
1✔
121

122
  public DeterministicRunner getRunner() {
123
    return runner;
1✔
124
  }
125

126
  public WorkflowInterceptor getWorkflowInterceptor() {
127
    return headInterceptor;
1✔
128
  }
129

130
  @Override
131
  public byte[] executeWorkflow(
132
      SyncWorkflowDefinition workflowDefinition, WorkflowExecuteInput input) {
133
    return workflowDefinition.execute(input.getInput());
1✔
134
  }
135

136
  @Override
137
  public <T> Promise<T> executeActivity(
138
      String activityName,
139
      Class<T> resultClass,
140
      Type resultType,
141
      Object[] args,
142
      ActivityOptions options) {
143
    RetryOptions retryOptions = options.getRetryOptions();
1✔
144
    // Replays a legacy history that used the client side retry correctly
145
    if (retryOptions != null && !context.isServerSideActivityRetry()) {
1✔
146
      return WorkflowRetryerInternal.retryAsync(
1✔
147
          retryOptions,
148
          () -> executeActivityOnce(activityName, options, args, resultClass, resultType));
1✔
149
    }
150
    return executeActivityOnce(activityName, options, args, resultClass, resultType);
1✔
151
  }
152

153
  private <T> Promise<T> executeActivityOnce(
154
      String name, ActivityOptions options, Object[] args, Class<T> returnClass, Type returnType) {
155
    byte[] input = converter.toData(args);
1✔
156
    Promise<byte[]> binaryResult = executeActivityOnce(name, options, input);
1✔
157
    if (returnClass == Void.TYPE) {
1✔
158
      return binaryResult.thenApply((r) -> null);
1✔
159
    }
160
    return binaryResult.thenApply((r) -> converter.fromData(r, returnClass, returnType));
1✔
161
  }
162

163
  private Promise<byte[]> executeActivityOnce(String name, ActivityOptions options, byte[] input) {
164
    ActivityCallback callback = new ActivityCallback();
1✔
165
    ExecuteActivityParameters params = constructExecuteActivityParameters(name, options, input);
1✔
166
    Consumer<Exception> cancellationCallback =
1✔
167
        context.scheduleActivityTask(params, callback::invoke);
1✔
168
    CancellationScope.current()
1✔
169
        .getCancellationRequest()
1✔
170
        .thenApply(
1✔
171
            (reason) -> {
172
              cancellationCallback.accept(new CancellationException(reason));
1✔
173
              return null;
1✔
174
            });
175
    return callback.result;
1✔
176
  }
177

178
  private class ActivityCallback {
1✔
179
    private CompletablePromise<byte[]> result = Workflow.newPromise();
1✔
180

181
    public void invoke(byte[] output, Exception failure) {
182
      if (failure != null) {
1✔
183
        runner.executeInWorkflowThread(
1✔
184
            "activity failure callback",
185
            () -> result.completeExceptionally(mapActivityException(failure)));
1✔
186
      } else {
187
        runner.executeInWorkflowThread(
1✔
188
            "activity completion callback", () -> result.complete(output));
1✔
189
      }
190
    }
1✔
191
  }
192

193
  /**
   * Converts a failure reported for an activity into the exception delivered to workflow code.
   *
   * <p>Cancellations and {@code ActivityException}s pass through unchanged. A task failure is
   * rebuilt from its reason (used as the cause class name) and details; if that reconstruction
   * itself fails, the reconstruction error becomes the cause. A task timeout becomes an
   * {@code ActivityTimeoutException}.
   *
   * @return the mapped exception, or {@code null} when {@code failure} is {@code null}
   * @throws IllegalArgumentException when {@code failure} is of none of the handled types
   */
  private RuntimeException mapActivityException(Exception failure) {
    if (failure == null) {
      return null;
    }
    if (failure instanceof CancellationException) {
      return (CancellationException) failure;
    }
    if (failure instanceof ActivityTaskFailedException) {
      ActivityTaskFailedException failed = (ActivityTaskFailedException) failure;
      String reason = failed.getReason();
      Exception cause;
      try {
        @SuppressWarnings("unchecked") // the declaration exists only to host this annotation
        Class<? extends Exception> type = (Class<? extends Exception>) Class.forName(reason);
        cause = getDataConverter().fromData(failed.getDetails(), type, type);
      } catch (Exception e) {
        cause = e;
      }
      if (!(cause instanceof SimulatedTimeoutExceptionInternal)) {
        return new ActivityFailureException(
            failed.getEventId(), failed.getActivityType(), failed.getActivityId(), cause);
      }
      // This exception is thrown only in unit tests to mock the activity timeouts
      SimulatedTimeoutExceptionInternal simulated = (SimulatedTimeoutExceptionInternal) cause;
      return new ActivityTimeoutException(
          failed.getEventId(),
          failed.getActivityType(),
          failed.getActivityId(),
          simulated.getTimeoutType(),
          simulated.getDetails(),
          getDataConverter());
    }
    if (failure instanceof ActivityTaskTimeoutException) {
      ActivityTaskTimeoutException timeout = (ActivityTaskTimeoutException) failure;
      return new ActivityTimeoutException(
          timeout.getEventId(),
          timeout.getActivityType(),
          timeout.getActivityId(),
          timeout.getTimeoutType(),
          timeout.getDetails(),
          getDataConverter());
    }
    if (failure instanceof ActivityException) {
      return (ActivityException) failure;
    }
    String unexpectedType = failure.getClass().getName();
    throw new IllegalArgumentException("Unexpected exception type: " + unexpectedType, failure);
  }
243

244
  @Override
245
  public <R> Promise<R> executeLocalActivity(
246
      String activityName,
247
      Class<R> resultClass,
248
      Type resultType,
249
      Object[] args,
250
      LocalActivityOptions options) {
251
    if (options.getRetryOptions() != null) {
1✔
252
      options.getRetryOptions().validate();
1✔
253
    }
254

255
    long startTime = WorkflowInternal.currentTimeMillis();
1✔
256
    return WorkflowRetryerInternal.retryAsync(
1✔
257
        (attempt, currentStart) ->
258
            executeLocalActivityOnce(
1✔
259
                activityName,
260
                options,
261
                args,
262
                resultClass,
263
                resultType,
264
                currentStart - startTime,
1✔
265
                attempt),
1✔
266
        1,
267
        startTime);
268
  }
269

270
  private <T> Promise<T> executeLocalActivityOnce(
271
      String name,
272
      LocalActivityOptions options,
273
      Object[] args,
274
      Class<T> returnClass,
275
      Type returnType,
276
      long elapsed,
277
      int attempt) {
278
    byte[] input = converter.toData(args);
1✔
279
    Promise<byte[]> binaryResult = executeLocalActivityOnce(name, options, input, elapsed, attempt);
1✔
280
    if (returnClass == Void.TYPE) {
1✔
281
      return binaryResult.thenApply((r) -> null);
1✔
282
    }
283
    return binaryResult.thenApply((r) -> converter.fromData(r, returnClass, returnType));
1✔
284
  }
285

286
  private Promise<byte[]> executeLocalActivityOnce(
287
      String name, LocalActivityOptions options, byte[] input, long elapsed, int attempt) {
288
    ActivityCallback callback = new ActivityCallback();
1✔
289
    ExecuteLocalActivityParameters params =
1✔
290
        constructExecuteLocalActivityParameters(name, options, input, elapsed, attempt);
1✔
291
    Consumer<Exception> cancellationCallback =
1✔
292
        context.scheduleLocalActivityTask(params, callback::invoke);
1✔
293
    CancellationScope.current()
1✔
294
        .getCancellationRequest()
1✔
295
        .thenApply(
1✔
296
            (reason) -> {
297
              cancellationCallback.accept(new CancellationException(reason));
×
298
              return null;
×
299
            });
300
    return callback.result;
1✔
301
  }
302

303
  private ExecuteActivityParameters constructExecuteActivityParameters(
304
      String name, ActivityOptions options, byte[] input) {
305
    ExecuteActivityParameters parameters = new ExecuteActivityParameters();
1✔
306
    // TODO: Real task list
307
    String taskList = options.getTaskList();
1✔
308
    if (taskList == null) {
1✔
309
      taskList = context.getTaskList();
1✔
310
    }
311
    parameters
1✔
312
        .withActivityType(new ActivityType().setName(name))
1✔
313
        .withInput(input)
1✔
314
        .withTaskList(taskList)
1✔
315
        .withScheduleToStartTimeoutSeconds(options.getScheduleToStartTimeout().getSeconds())
1✔
316
        .withStartToCloseTimeoutSeconds(options.getStartToCloseTimeout().getSeconds())
1✔
317
        .withScheduleToCloseTimeoutSeconds(options.getScheduleToCloseTimeout().getSeconds())
1✔
318
        .setHeartbeatTimeoutSeconds(options.getHeartbeatTimeout().getSeconds());
1✔
319
    RetryOptions retryOptions = options.getRetryOptions();
1✔
320
    if (retryOptions != null) {
1✔
321
      parameters.setRetryParameters(new RetryParameters(retryOptions));
1✔
322
    }
323

324
    // Set the context value.  Use the context propagators from the ActivityOptions
325
    // if present, otherwise use the ones configured on the DecisionContext
326
    List<ContextPropagator> propagators = options.getContextPropagators();
1✔
327
    if (propagators == null) {
1✔
328
      propagators = this.contextPropagators;
1✔
329
    }
330
    parameters.setContext(extractContextsAndConvertToBytes(propagators));
1✔
331

332
    return parameters;
1✔
333
  }
334

335
  private ExecuteLocalActivityParameters constructExecuteLocalActivityParameters(
336
      String name, LocalActivityOptions options, byte[] input, long elapsed, int attempt) {
337
    ExecuteLocalActivityParameters parameters =
1✔
338
        new ExecuteLocalActivityParameters()
339
            .withActivityType(new ActivityType().setName(name))
1✔
340
            .withInput(input)
1✔
341
            .withScheduleToCloseTimeoutSeconds(options.getScheduleToCloseTimeout().getSeconds());
1✔
342

343
    RetryOptions retryOptions = options.getRetryOptions();
1✔
344
    if (retryOptions != null) {
1✔
345
      parameters.setRetryOptions(retryOptions);
1✔
346
    }
347
    parameters.setAttempt(attempt);
1✔
348
    parameters.setElapsedTime(elapsed);
1✔
349
    parameters.setWorkflowDomain(this.context.getDomain());
1✔
350
    parameters.setWorkflowExecution(this.context.getWorkflowExecution());
1✔
351

352
    List<ContextPropagator> propagators =
1✔
353
        Optional.ofNullable(options.getContextPropagators()).orElse(contextPropagators);
1✔
354
    parameters.setContext(extractContextsAndConvertToBytes(propagators));
1✔
355

356
    return parameters;
1✔
357
  }
358

359
  @Override
360
  public <R> WorkflowResult<R> executeChildWorkflow(
361
      String workflowType,
362
      Class<R> returnClass,
363
      Type returnType,
364
      Object[] args,
365
      ChildWorkflowOptions options) {
366
    byte[] input = converter.toData(args);
1✔
367
    CompletablePromise<WorkflowExecution> execution = Workflow.newPromise();
1✔
368
    Promise<byte[]> output = executeChildWorkflow(workflowType, options, input, execution);
1✔
369
    Promise<R> result = output.thenApply((b) -> converter.fromData(b, returnClass, returnType));
1✔
370
    return new WorkflowResult<>(result, execution);
1✔
371
  }
372

373
  private Promise<byte[]> executeChildWorkflow(
374
      String name,
375
      ChildWorkflowOptions options,
376
      byte[] input,
377
      CompletablePromise<WorkflowExecution> executionResult) {
378
    RetryOptions retryOptions = options.getRetryOptions();
1✔
379
    // This condition is for backwards compatibility with the code that
380
    // used client side retry before the server side retry existed.
381
    if (retryOptions != null && !context.isServerSideChildWorkflowRetry()) {
1✔
382
      ChildWorkflowOptions o1 =
1✔
383
          new ChildWorkflowOptions.Builder()
384
              .setTaskList(options.getTaskList())
1✔
385
              .setExecutionStartToCloseTimeout(options.getExecutionStartToCloseTimeout())
1✔
386
              .setTaskStartToCloseTimeout(options.getTaskStartToCloseTimeout())
1✔
387
              .setWorkflowId(options.getWorkflowId())
1✔
388
              .setWorkflowIdReusePolicy(options.getWorkflowIdReusePolicy())
1✔
389
              .setMemo(options.getMemo())
1✔
390
              .setSearchAttributes(options.getSearchAttributes())
1✔
391
              .setParentClosePolicy(options.getParentClosePolicy())
1✔
392
              .build();
1✔
393
      return WorkflowRetryerInternal.retryAsync(
1✔
394
          retryOptions, () -> executeChildWorkflowOnce(name, o1, input, executionResult));
1✔
395
    }
396
    return executeChildWorkflowOnce(name, options, input, executionResult);
1✔
397
  }
398

399
  /** @param executionResult promise that is set bu this method when child workflow is started. */
400
  private Promise<byte[]> executeChildWorkflowOnce(
401
      String name,
402
      ChildWorkflowOptions options,
403
      byte[] input,
404
      CompletablePromise<WorkflowExecution> executionResult) {
405
    RetryParameters retryParameters = null;
1✔
406
    RetryOptions retryOptions = options.getRetryOptions();
1✔
407
    if (retryOptions != null) {
1✔
408
      retryParameters = new RetryParameters(retryOptions);
1✔
409
    }
410
    List<ContextPropagator> propagators = options.getContextPropagators();
1✔
411
    if (propagators == null) {
1✔
412
      propagators = this.contextPropagators;
1✔
413
    }
414

415
    StartChildWorkflowExecutionParameters parameters =
1✔
416
        new StartChildWorkflowExecutionParameters.Builder()
417
            .setWorkflowType(new WorkflowType().setName(name))
1✔
418
            .setWorkflowId(options.getWorkflowId())
1✔
419
            .setInput(input)
1✔
420
            .setExecutionStartToCloseTimeoutSeconds(
1✔
421
                options.getExecutionStartToCloseTimeout().getSeconds())
1✔
422
            .setDomain(options.getDomain())
1✔
423
            .setTaskList(options.getTaskList())
1✔
424
            .setTaskStartToCloseTimeoutSeconds(options.getTaskStartToCloseTimeout().getSeconds())
1✔
425
            .setWorkflowIdReusePolicy(options.getWorkflowIdReusePolicy())
1✔
426
            .setRetryParameters(retryParameters)
1✔
427
            .setCronSchedule(options.getCronSchedule())
1✔
428
            .setMemo(options.getMemo())
1✔
429
            .setSearchAttributes(options.getSearchAttributes())
1✔
430
            .setContext(extractContextsAndConvertToBytes(propagators))
1✔
431
            .setParentClosePolicy(options.getParentClosePolicy())
1✔
432
            .build();
1✔
433
    CompletablePromise<byte[]> result = Workflow.newPromise();
1✔
434
    Consumer<Exception> cancellationCallback =
1✔
435
        context.startChildWorkflow(
1✔
436
            parameters,
437
            (we) ->
438
                runner.executeInWorkflowThread(
1✔
439
                    "child workflow completion callback", () -> executionResult.complete(we)),
1✔
440
            (output, failure) -> {
441
              if (failure != null) {
1✔
442
                runner.executeInWorkflowThread(
1✔
443
                    "child workflow failure callback",
444
                    () -> result.completeExceptionally(mapChildWorkflowException(failure)));
1✔
445
              } else {
446
                runner.executeInWorkflowThread(
1✔
447
                    "child workflow completion callback", () -> result.complete(output));
1✔
448
              }
449
            });
1✔
450
    CancellationScope.current()
1✔
451
        .getCancellationRequest()
1✔
452
        .thenApply(
1✔
453
            (reason) -> {
454
              cancellationCallback.accept(new CancellationException(reason));
1✔
455
              return null;
1✔
456
            });
457
    return result;
1✔
458
  }
459

460
  private Map<String, byte[]> extractContextsAndConvertToBytes(
461
      List<ContextPropagator> contextPropagators) {
462
    if (contextPropagators == null) {
1✔
463
      return null;
×
464
    }
465
    Map<String, byte[]> result = new HashMap<>();
1✔
466
    for (ContextPropagator propagator : contextPropagators) {
1✔
467
      result.putAll(propagator.serializeContext(propagator.getCurrentContext()));
1✔
468
    }
1✔
469
    return result;
1✔
470
  }
471

472
  private RuntimeException mapChildWorkflowException(Exception failure) {
473
    if (failure == null) {
1✔
474
      return null;
×
475
    }
476
    if (failure instanceof CancellationException) {
1✔
477
      return (CancellationException) failure;
1✔
478
    }
479
    if (failure instanceof ChildWorkflowException) {
1✔
480
      return (ChildWorkflowException) failure;
1✔
481
    }
482
    if (!(failure instanceof ChildWorkflowTaskFailedException)) {
1✔
483
      return new IllegalArgumentException("Unexpected exception type: ", failure);
×
484
    }
485
    ChildWorkflowTaskFailedException taskFailed = (ChildWorkflowTaskFailedException) failure;
1✔
486
    String causeClassName = taskFailed.getReason();
1✔
487
    Exception cause;
488
    try {
489
      @SuppressWarnings("unchecked")
490
      Class<? extends Exception> causeClass =
1✔
491
          (Class<? extends Exception>) Class.forName(causeClassName);
1✔
492
      cause = getDataConverter().fromData(taskFailed.getDetails(), causeClass, causeClass);
1✔
493
    } catch (Exception e) {
×
494
      cause = e;
×
495
    }
1✔
496
    if (cause instanceof SimulatedTimeoutExceptionInternal) {
1✔
497
      // This exception is thrown only in unit tests to mock the child workflow timeouts
498
      return new ChildWorkflowTimedOutException(
1✔
499
          taskFailed.getEventId(), taskFailed.getWorkflowExecution(), taskFailed.getWorkflowType());
1✔
500
    }
501
    return new ChildWorkflowFailureException(
1✔
502
        taskFailed.getEventId(),
1✔
503
        taskFailed.getWorkflowExecution(),
1✔
504
        taskFailed.getWorkflowType(),
1✔
505
        cause);
506
  }
507

508
  @Override
509
  public Promise<Void> newTimer(Duration delay) {
510
    Objects.requireNonNull(delay);
1✔
511
    long delaySeconds = roundUpToSeconds(delay).getSeconds();
1✔
512
    if (delaySeconds < 0) {
1✔
513
      throw new IllegalArgumentException("negative delay");
×
514
    }
515
    if (delaySeconds == 0) {
1✔
516
      return Workflow.newPromise(null);
1✔
517
    }
518
    CompletablePromise<Void> timer = Workflow.newPromise();
1✔
519
    long fireTime = context.currentTimeMillis() + TimeUnit.SECONDS.toMillis(delaySeconds);
1✔
520
    timers.addTimer(fireTime, timer);
1✔
521
    CancellationScope.current()
1✔
522
        .getCancellationRequest()
1✔
523
        .thenApply(
1✔
524
            (reason) -> {
525
              timers.removeTimer(fireTime, timer);
1✔
526
              timer.completeExceptionally(new CancellationException(reason));
1✔
527
              return null;
1✔
528
            });
529
    return timer;
1✔
530
  }
531

532
  @Override
533
  public <R> R sideEffect(Class<R> resultClass, Type resultType, Func<R> func) {
534
    DataConverter dataConverter = getDataConverter();
1✔
535
    byte[] result =
1✔
536
        context.sideEffect(
1✔
537
            () -> {
538
              R r = func.apply();
1✔
539
              return dataConverter.toData(r);
1✔
540
            });
541
    return dataConverter.fromData(result, resultClass, resultType);
1✔
542
  }
543

544
  @Override
545
  public <R> R mutableSideEffect(
546
      String id, Class<R> resultClass, Type resultType, BiPredicate<R, R> updated, Func<R> func) {
547
    AtomicReference<R> unserializedResult = new AtomicReference<>();
1✔
548
    // As lambda below never returns Optional.empty() if there is a stored value
549
    // it is safe to call get on mutableSideEffect result.
550
    Optional<byte[]> optionalBytes =
1✔
551
        context.mutableSideEffect(
1✔
552
            id,
553
            converter,
554
            (storedBinary) -> {
555
              Optional<R> stored =
1✔
556
                  storedBinary.map((b) -> converter.fromData(b, resultClass, resultType));
1✔
557
              R funcResult =
1✔
558
                  Objects.requireNonNull(
1✔
559
                      func.apply(), "mutableSideEffect function " + "returned null");
1✔
560
              if (!stored.isPresent() || updated.test(stored.get(), funcResult)) {
1✔
561
                unserializedResult.set(funcResult);
1✔
562
                return Optional.of(converter.toData(funcResult));
1✔
563
              }
564
              return Optional.empty(); // returned only when value doesn't need to be updated
1✔
565
            });
566
    if (!optionalBytes.isPresent()) {
1✔
567
      throw new IllegalArgumentException(
×
568
          "No value found for mutableSideEffectId="
569
              + id
570
              + ", during replay it usually indicates a different workflow runId than the original one");
571
    }
572
    byte[] binaryResult = optionalBytes.get();
1✔
573
    // An optimization that avoids unnecessary deserialization of the result.
574
    R unserialized = unserializedResult.get();
1✔
575
    if (unserialized != null) {
1✔
576
      return unserialized;
1✔
577
    }
578
    return converter.fromData(binaryResult, resultClass, resultType);
1✔
579
  }
580

581
  @Override
582
  public int getVersion(String changeID, int minSupported, int maxSupported) {
583
    return context.getVersion(changeID, converter, minSupported, maxSupported);
1✔
584
  }
585

586
  void fireTimers() {
587
    timers.fireTimers(context.currentTimeMillis());
1✔
588
  }
1✔
589

590
  boolean hasTimersToFire() {
591
    return timers.hasTimersToFire(context.currentTimeMillis());
1✔
592
  }
593

594
  long getNextFireTime() {
595
    return timers.getNextFireTime();
1✔
596
  }
597

598
  public byte[] query(String type, byte[] args) {
599
    Functions.Func1<byte[], byte[]> callback = queryCallbacks.get(type);
1✔
600
    if (callback == null) {
1✔
601
      throw new IllegalArgumentException(
×
602
          "Unknown query type: " + type + ", knownTypes=" + queryCallbacks.keySet());
×
603
    }
604
    return callback.apply(args);
1✔
605
  }
606

607
  @Override
608
  public void registerQuery(
609
      String queryType, Type[] argTypes, Functions.Func1<Object[], Object> callback) {
610
    //    if (queryCallbacks.containsKey(queryType)) {
611
    //      throw new IllegalStateException("Query \"" + queryType + "\" is already registered");
612
    //    }
613
    queryCallbacks.put(
1✔
614
        queryType,
615
        (input) -> {
616
          Object[] args = converter.fromDataArray(input, argTypes);
1✔
617
          Object result = callback.apply(args);
1✔
618
          return converter.toData(result);
1✔
619
        });
620
  }
1✔
621

622
  @Override
623
  public UUID randomUUID() {
624
    return context.randomUUID();
1✔
625
  }
626

627
  @Override
628
  public Random newRandom() {
629
    return context.newRandom();
1✔
630
  }
631

632
  public DataConverter getDataConverter() {
633
    return converter;
1✔
634
  }
635

636
  boolean isReplaying() {
637
    return context.isReplaying();
1✔
638
  }
639

640
  public DecisionContext getContext() {
641
    return context;
1✔
642
  }
643

644
  @Override
645
  public Promise<Void> signalExternalWorkflow(
646
      WorkflowExecution execution, String signalName, Object[] args) {
647
    return this.signalExternalWorkflow(null, execution, signalName, args);
1✔
648
  }
649

650
  @Override
651
  public Promise<Void> signalExternalWorkflow(
652
      String domain, WorkflowExecution execution, String signalName, Object[] args) {
653
    SignalExternalWorkflowParameters parameters = new SignalExternalWorkflowParameters();
1✔
654
    parameters.setSignalName(signalName);
1✔
655
    parameters.setWorkflowId(execution.getWorkflowId());
1✔
656
    parameters.setRunId(execution.getRunId());
1✔
657
    parameters.setDomain(domain);
1✔
658
    byte[] input = getDataConverter().toData(args);
1✔
659
    parameters.setInput(input);
1✔
660
    CompletablePromise<Void> result = Workflow.newPromise();
1✔
661

662
    Consumer<Exception> cancellationCallback =
1✔
663
        context.signalWorkflowExecution(
1✔
664
            parameters,
665
            (output, failure) -> {
666
              if (failure != null) {
1✔
667
                runner.executeInWorkflowThread(
1✔
668
                    "child workflow failure callback",
669
                    () -> result.completeExceptionally(mapSignalWorkflowException(failure)));
1✔
670
              } else {
671
                runner.executeInWorkflowThread(
1✔
672
                    "child workflow completion callback", () -> result.complete(output));
1✔
673
              }
674
            });
1✔
675
    CancellationScope.current()
1✔
676
        .getCancellationRequest()
1✔
677
        .thenApply(
1✔
678
            (reason) -> {
679
              cancellationCallback.accept(new CancellationException(reason));
1✔
680
              return null;
1✔
681
            });
682
    return result;
1✔
683
  }
684

685
  @Override
686
  public void sleep(Duration duration) {
687
    WorkflowThread.await(
1✔
688
        duration.toMillis(),
1✔
689
        "sleep",
690
        () -> {
691
          CancellationScope.throwCancelled();
1✔
692
          return false;
1✔
693
        });
694
  }
1✔
695

696
  @Override
697
  public boolean await(Duration timeout, String reason, Supplier<Boolean> unblockCondition) {
698
    return WorkflowThread.await(timeout.toMillis(), reason, unblockCondition);
1✔
699
  }
700

701
  @Override
702
  public void await(String reason, Supplier<Boolean> unblockCondition) {
703
    WorkflowThread.await(reason, unblockCondition);
1✔
704
  }
1✔
705

706
  @Override
707
  public void continueAsNew(
708
      Optional<String> workflowType, Optional<ContinueAsNewOptions> options, Object[] args) {
709
    ContinueAsNewWorkflowExecutionParameters parameters =
1✔
710
        new ContinueAsNewWorkflowExecutionParameters();
711
    if (workflowType.isPresent()) {
1✔
712
      parameters.setWorkflowType(workflowType.get());
1✔
713
    }
714
    if (options.isPresent()) {
1✔
715
      ContinueAsNewOptions ops = options.get();
1✔
716
      parameters.setExecutionStartToCloseTimeoutSeconds(
1✔
717
          (int) ops.getExecutionStartToCloseTimeout().getSeconds());
1✔
718
      parameters.setTaskStartToCloseTimeoutSeconds(
1✔
719
          (int) ops.getTaskStartToCloseTimeout().getSeconds());
1✔
720
      parameters.setTaskList(ops.getTaskList());
1✔
721
    }
722
    parameters.setInput(getDataConverter().toData(args));
1✔
723
    context.continueAsNewOnCompletion(parameters);
1✔
724
    WorkflowThread.exit(null);
×
725
  }
×
726

727
  @Override
728
  public Promise<Void> cancelWorkflow(WorkflowExecution execution) {
729
    return context.requestCancelWorkflowExecution(execution);
×
730
  }
731

732
  private RuntimeException mapSignalWorkflowException(Exception failure) {
733
    if (failure == null) {
1✔
734
      return null;
×
735
    }
736
    if (failure instanceof CancellationException) {
1✔
737
      return (CancellationException) failure;
1✔
738
    }
739

740
    if (!(failure instanceof SignalExternalWorkflowException)) {
1✔
741
      return new IllegalArgumentException("Unexpected exception type: ", failure);
×
742
    }
743
    return (SignalExternalWorkflowException) failure;
1✔
744
  }
745

746
  public Scope getMetricsScope() {
747
    return context.getMetricsScope();
1✔
748
  }
749

750
  public boolean isLoggingEnabledInReplay() {
751
    return context.getEnableLoggingInReplay();
1✔
752
  }
753

754
  public <R> R getLastCompletionResult(Class<R> resultClass, Type resultType) {
755
    if (lastCompletionResult == null || lastCompletionResult.length == 0) {
1✔
756
      return null;
1✔
757
    }
758

759
    DataConverter dataConverter = getDataConverter();
1✔
760
    return dataConverter.fromData(lastCompletionResult, resultClass, resultType);
1✔
761
  }
762

763
  @Override
764
  public void upsertSearchAttributes(Map<String, Object> searchAttributes) {
765
    if (searchAttributes.isEmpty()) {
1✔
766
      throw new IllegalArgumentException("Empty search attributes");
1✔
767
    }
768

769
    if (searchAttributes.containsKey(CADENCE_CHANGE_VERSION)) {
1✔
770
      throw new IllegalArgumentException(
×
771
          "CadenceChangeVersion is a reserved key that cannot be set, please use other key");
772
    }
773

774
    SearchAttributes attr = InternalUtils.convertMapToSearchAttributes(searchAttributes);
1✔
775
    context.upsertSearchAttributes(attr);
1✔
776
  }
1✔
777

778
  public WorkflowImplementationOptions getWorkflowImplementationOptions() {
779
    return workflowImplementationOptions;
1✔
780
  }
781
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc