• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 16

16 Apr 2024 01:28AM UTC coverage: 60.239% (-0.1%) from 60.343%
16

push

buildkite

mstifflin
Remove unnecessary sidecar command, try executing with lower resources

11446 of 19001 relevant lines covered (60.24%)

0.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.5
/src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java
1
/*
2
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
 *
4
 *  Modifications copyright (C) 2017 Uber Technologies, Inc.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.internal.sync;
19

20
import static com.uber.cadence.internal.common.OptionsUtils.roundUpToSeconds;
21
import static com.uber.cadence.internal.sync.WorkflowInternal.CADENCE_CHANGE_VERSION;
22

23
import com.uber.cadence.ActivityType;
24
import com.uber.cadence.SearchAttributes;
25
import com.uber.cadence.WorkflowExecution;
26
import com.uber.cadence.WorkflowType;
27
import com.uber.cadence.activity.ActivityOptions;
28
import com.uber.cadence.activity.LocalActivityOptions;
29
import com.uber.cadence.common.RetryOptions;
30
import com.uber.cadence.context.ContextPropagator;
31
import com.uber.cadence.converter.DataConverter;
32
import com.uber.cadence.internal.common.InternalUtils;
33
import com.uber.cadence.internal.common.RetryParameters;
34
import com.uber.cadence.internal.replay.ActivityTaskFailedException;
35
import com.uber.cadence.internal.replay.ActivityTaskTimeoutException;
36
import com.uber.cadence.internal.replay.ChildWorkflowTaskFailedException;
37
import com.uber.cadence.internal.replay.ContinueAsNewWorkflowExecutionParameters;
38
import com.uber.cadence.internal.replay.DecisionContext;
39
import com.uber.cadence.internal.replay.ExecuteActivityParameters;
40
import com.uber.cadence.internal.replay.ExecuteLocalActivityParameters;
41
import com.uber.cadence.internal.replay.SignalExternalWorkflowParameters;
42
import com.uber.cadence.internal.replay.StartChildWorkflowExecutionParameters;
43
import com.uber.cadence.internal.tracing.TracingPropagator;
44
import com.uber.cadence.worker.WorkflowImplementationOptions;
45
import com.uber.cadence.workflow.ActivityException;
46
import com.uber.cadence.workflow.ActivityFailureException;
47
import com.uber.cadence.workflow.ActivityTimeoutException;
48
import com.uber.cadence.workflow.CancellationScope;
49
import com.uber.cadence.workflow.ChildWorkflowException;
50
import com.uber.cadence.workflow.ChildWorkflowFailureException;
51
import com.uber.cadence.workflow.ChildWorkflowOptions;
52
import com.uber.cadence.workflow.ChildWorkflowTimedOutException;
53
import com.uber.cadence.workflow.CompletablePromise;
54
import com.uber.cadence.workflow.ContinueAsNewOptions;
55
import com.uber.cadence.workflow.Functions;
56
import com.uber.cadence.workflow.Functions.Func;
57
import com.uber.cadence.workflow.Promise;
58
import com.uber.cadence.workflow.SignalExternalWorkflowException;
59
import com.uber.cadence.workflow.Workflow;
60
import com.uber.cadence.workflow.WorkflowInterceptor;
61
import com.uber.m3.tally.Scope;
62
import io.opentracing.Span;
63
import io.opentracing.Tracer;
64
import io.opentracing.noop.NoopTracerFactory;
65
import java.lang.reflect.Type;
66
import java.time.Duration;
67
import java.util.HashMap;
68
import java.util.List;
69
import java.util.Map;
70
import java.util.Objects;
71
import java.util.Optional;
72
import java.util.Random;
73
import java.util.UUID;
74
import java.util.concurrent.CancellationException;
75
import java.util.concurrent.TimeUnit;
76
import java.util.concurrent.atomic.AtomicReference;
77
import java.util.function.BiPredicate;
78
import java.util.function.Consumer;
79
import java.util.function.Function;
80
import java.util.function.Supplier;
81
import org.slf4j.Logger;
82
import org.slf4j.LoggerFactory;
83

84
final class SyncDecisionContext implements WorkflowInterceptor {
85

86
  private static final Logger log = LoggerFactory.getLogger(SyncDecisionContext.class);
1✔
87

88
  private final DecisionContext context;
89
  private DeterministicRunner runner;
90
  private final DataConverter converter;
91
  private final List<ContextPropagator> contextPropagators;
92
  private final WorkflowInterceptor headInterceptor;
93
  private final WorkflowTimers timers = new WorkflowTimers();
1✔
94
  private final Map<String, Functions.Func1<byte[], byte[]>> queryCallbacks = new HashMap<>();
1✔
95
  private final byte[] lastCompletionResult;
96
  private final WorkflowImplementationOptions workflowImplementationOptions;
97
  private final TracingPropagator tracingPropagator;
98

99
  public SyncDecisionContext(
100
      DecisionContext context,
101
      DataConverter converter,
102
      List<ContextPropagator> contextPropagators,
103
      Function<WorkflowInterceptor, WorkflowInterceptor> interceptorFactory,
104
      byte[] lastCompletionResult,
105
      WorkflowImplementationOptions workflowImplementationOptions) {
106
    this(
1✔
107
        context,
108
        converter,
109
        contextPropagators,
110
        interceptorFactory,
111
        lastCompletionResult,
112
        workflowImplementationOptions,
113
        NoopTracerFactory.create());
1✔
114
  }
1✔
115

116
  public SyncDecisionContext(
117
      DecisionContext context,
118
      DataConverter converter,
119
      List<ContextPropagator> contextPropagators,
120
      Function<WorkflowInterceptor, WorkflowInterceptor> interceptorFactory,
121
      byte[] lastCompletionResult,
122
      WorkflowImplementationOptions workflowImplementationOptions,
123
      Tracer tracer) {
1✔
124
    this.context = context;
1✔
125
    this.converter = converter;
1✔
126
    this.contextPropagators = contextPropagators;
1✔
127
    WorkflowInterceptor interceptor = interceptorFactory.apply(this);
1✔
128
    if (interceptor == null) {
1✔
129
      log.warn("WorkflowInterceptor factory returned null interceptor");
×
130
      interceptor = this;
×
131
    }
132
    this.headInterceptor = interceptor;
1✔
133
    this.lastCompletionResult = lastCompletionResult;
1✔
134
    this.workflowImplementationOptions = workflowImplementationOptions;
1✔
135
    this.tracingPropagator = new TracingPropagator(tracer);
1✔
136
  }
1✔
137

138
  /**
139
   * Using setter, as runner is initialized with this context, so it is not ready during
140
   * construction of this.
141
   */
142
  public void setRunner(DeterministicRunner runner) {
143
    this.runner = runner;
1✔
144
  }
1✔
145

146
  public DeterministicRunner getRunner() {
147
    return runner;
1✔
148
  }
149

150
  public WorkflowInterceptor getWorkflowInterceptor() {
151
    return headInterceptor;
1✔
152
  }
153

154
  @Override
155
  public byte[] executeWorkflow(
156
      SyncWorkflowDefinition workflowDefinition, WorkflowExecuteInput input) {
157
    Span span = tracingPropagator.activateSpanForExecuteWorkflow(context);
1✔
158
    try {
159
      return workflowDefinition.execute(input.getInput());
1✔
160
    } finally {
161
      span.finish();
1✔
162
    }
163
  }
164

165
  /**
166
   * Schedule an activity task for the provided activity name and input. The activity task is not
167
   * necessarily executed in the same thread.
168
   */
169
  @Override
170
  public <T> Promise<T> executeActivity(
171
      String activityName,
172
      Class<T> resultClass,
173
      Type resultType,
174
      Object[] args,
175
      ActivityOptions options) {
176
    RetryOptions retryOptions = options.getRetryOptions();
1✔
177
    // Replays a legacy history that used the client side retry correctly
178
    if (retryOptions != null && !context.isServerSideActivityRetry()) {
1✔
179
      return WorkflowRetryerInternal.retryAsync(
1✔
180
          retryOptions,
181
          () -> executeActivityOnce(activityName, options, args, resultClass, resultType));
1✔
182
    }
183
    return executeActivityOnce(activityName, options, args, resultClass, resultType);
1✔
184
  }
185

186
  private <T> Promise<T> executeActivityOnce(
187
      String name, ActivityOptions options, Object[] args, Class<T> returnClass, Type returnType) {
188
    byte[] input = converter.toData(args);
1✔
189
    Promise<byte[]> binaryResult = executeActivityOnce(name, options, input);
1✔
190
    if (returnClass == Void.TYPE) {
1✔
191
      return binaryResult.thenApply((r) -> null);
1✔
192
    }
193
    return binaryResult.thenApply((r) -> converter.fromData(r, returnClass, returnType));
1✔
194
  }
195

196
  private Promise<byte[]> executeActivityOnce(String name, ActivityOptions options, byte[] input) {
197
    ActivityCallback callback = new ActivityCallback();
1✔
198
    ExecuteActivityParameters params = constructExecuteActivityParameters(name, options, input);
1✔
199
    Consumer<Exception> cancellationCallback =
1✔
200
        context.scheduleActivityTask(params, callback::invoke);
1✔
201
    CancellationScope.current()
1✔
202
        .getCancellationRequest()
1✔
203
        .thenApply(
1✔
204
            (reason) -> {
205
              cancellationCallback.accept(new CancellationException(reason));
1✔
206
              return null;
1✔
207
            });
208
    return callback.result;
1✔
209
  }
210

211
  private class ActivityCallback {
1✔
212
    private CompletablePromise<byte[]> result = Workflow.newPromise();
1✔
213

214
    public void invoke(byte[] output, Exception failure) {
215
      if (failure != null) {
1✔
216
        runner.executeInWorkflowThread(
1✔
217
            "activity failure callback",
218
            () -> result.completeExceptionally(mapActivityException(failure)));
1✔
219
      } else {
220
        runner.executeInWorkflowThread(
1✔
221
            "activity completion callback", () -> result.complete(output));
1✔
222
      }
223
    }
1✔
224
  }
225

226
  private RuntimeException mapActivityException(Exception failure) {
227
    if (failure == null) {
1✔
228
      return null;
×
229
    }
230
    if (failure instanceof CancellationException) {
1✔
231
      return (CancellationException) failure;
1✔
232
    }
233
    if (failure instanceof ActivityTaskFailedException) {
1✔
234
      ActivityTaskFailedException taskFailed = (ActivityTaskFailedException) failure;
1✔
235
      String causeClassName = taskFailed.getReason();
1✔
236
      Class<? extends Exception> causeClass;
237
      Exception cause;
238
      try {
239
        @SuppressWarnings("unchecked") // cc is just to have a place to put this annotation
240
        Class<? extends Exception> cc = (Class<? extends Exception>) Class.forName(causeClassName);
1✔
241
        causeClass = cc;
1✔
242
        cause = getDataConverter().fromData(taskFailed.getDetails(), causeClass, causeClass);
1✔
243
      } catch (Exception e) {
×
244
        cause = e;
×
245
      }
1✔
246
      if (cause instanceof SimulatedTimeoutExceptionInternal) {
1✔
247
        // This exception is thrown only in unit tests to mock the activity timeouts
248
        SimulatedTimeoutExceptionInternal testTimeout = (SimulatedTimeoutExceptionInternal) cause;
1✔
249
        return new ActivityTimeoutException(
1✔
250
            taskFailed.getEventId(),
1✔
251
            taskFailed.getActivityType(),
1✔
252
            taskFailed.getActivityId(),
1✔
253
            testTimeout.getTimeoutType(),
1✔
254
            testTimeout.getDetails(),
1✔
255
            getDataConverter());
1✔
256
      }
257
      return new ActivityFailureException(
1✔
258
          taskFailed.getEventId(), taskFailed.getActivityType(), taskFailed.getActivityId(), cause);
1✔
259
    }
260
    if (failure instanceof ActivityTaskTimeoutException) {
1✔
261
      ActivityTaskTimeoutException timedOut = (ActivityTaskTimeoutException) failure;
1✔
262
      return new ActivityTimeoutException(
1✔
263
          timedOut.getEventId(),
1✔
264
          timedOut.getActivityType(),
1✔
265
          timedOut.getActivityId(),
1✔
266
          timedOut.getTimeoutType(),
1✔
267
          timedOut.getDetails(),
1✔
268
          getDataConverter());
1✔
269
    }
270
    if (failure instanceof ActivityException) {
1✔
271
      return (ActivityException) failure;
1✔
272
    }
273
    throw new IllegalArgumentException(
×
274
        "Unexpected exception type: " + failure.getClass().getName(), failure);
×
275
  }
276

277
  @Override
278
  public <R> Promise<R> executeLocalActivity(
279
      String activityName,
280
      Class<R> resultClass,
281
      Type resultType,
282
      Object[] args,
283
      LocalActivityOptions options) {
284
    if (options.getRetryOptions() != null) {
1✔
285
      options.getRetryOptions().validate();
1✔
286
    }
287

288
    long startTime = WorkflowInternal.currentTimeMillis();
1✔
289
    return WorkflowRetryerInternal.retryAsync(
1✔
290
        (attempt, currentStart) ->
291
            executeLocalActivityOnce(
1✔
292
                activityName,
293
                options,
294
                args,
295
                resultClass,
296
                resultType,
297
                currentStart - startTime,
1✔
298
                attempt),
1✔
299
        1,
300
        startTime);
301
  }
302

303
  private <T> Promise<T> executeLocalActivityOnce(
304
      String name,
305
      LocalActivityOptions options,
306
      Object[] args,
307
      Class<T> returnClass,
308
      Type returnType,
309
      long elapsed,
310
      int attempt) {
311
    byte[] input = converter.toData(args);
1✔
312
    Promise<byte[]> binaryResult = executeLocalActivityOnce(name, options, input, elapsed, attempt);
1✔
313
    if (returnClass == Void.TYPE) {
1✔
314
      return binaryResult.thenApply((r) -> null);
1✔
315
    }
316
    return binaryResult.thenApply((r) -> converter.fromData(r, returnClass, returnType));
1✔
317
  }
318

319
  private Promise<byte[]> executeLocalActivityOnce(
320
      String name, LocalActivityOptions options, byte[] input, long elapsed, int attempt) {
321
    ActivityCallback callback = new ActivityCallback();
1✔
322
    ExecuteLocalActivityParameters params =
1✔
323
        constructExecuteLocalActivityParameters(name, options, input, elapsed, attempt);
1✔
324
    Consumer<Exception> cancellationCallback =
1✔
325
        context.scheduleLocalActivityTask(params, callback::invoke);
1✔
326
    CancellationScope.current()
1✔
327
        .getCancellationRequest()
1✔
328
        .thenApply(
1✔
329
            (reason) -> {
330
              cancellationCallback.accept(new CancellationException(reason));
×
331
              return null;
×
332
            });
333
    return callback.result;
1✔
334
  }
335

336
  private ExecuteActivityParameters constructExecuteActivityParameters(
337
      String name, ActivityOptions options, byte[] input) {
338
    ExecuteActivityParameters parameters = new ExecuteActivityParameters();
1✔
339
    // TODO: Real task list
340
    String taskList = options.getTaskList();
1✔
341
    if (taskList == null) {
1✔
342
      taskList = context.getTaskList();
1✔
343
    }
344
    parameters
1✔
345
        .withActivityType(new ActivityType().setName(name))
1✔
346
        .withInput(input)
1✔
347
        .withTaskList(taskList)
1✔
348
        .withScheduleToStartTimeoutSeconds(options.getScheduleToStartTimeout().getSeconds())
1✔
349
        .withStartToCloseTimeoutSeconds(options.getStartToCloseTimeout().getSeconds())
1✔
350
        .withScheduleToCloseTimeoutSeconds(options.getScheduleToCloseTimeout().getSeconds())
1✔
351
        .setHeartbeatTimeoutSeconds(options.getHeartbeatTimeout().getSeconds());
1✔
352
    RetryOptions retryOptions = options.getRetryOptions();
1✔
353
    if (retryOptions != null) {
1✔
354
      parameters.setRetryParameters(new RetryParameters(retryOptions));
1✔
355
    }
356

357
    // Set the context value.  Use the context propagators from the ActivityOptions
358
    // if present, otherwise use the ones configured on the DecisionContext
359
    List<ContextPropagator> propagators = options.getContextPropagators();
1✔
360
    if (propagators == null) {
1✔
361
      propagators = this.contextPropagators;
1✔
362
    }
363
    parameters.setContext(extractContextsAndConvertToBytes(propagators));
1✔
364

365
    return parameters;
1✔
366
  }
367

368
  private ExecuteLocalActivityParameters constructExecuteLocalActivityParameters(
369
      String name, LocalActivityOptions options, byte[] input, long elapsed, int attempt) {
370
    ExecuteLocalActivityParameters parameters =
1✔
371
        new ExecuteLocalActivityParameters()
372
            .withActivityType(new ActivityType().setName(name))
1✔
373
            .withInput(input)
1✔
374
            .withScheduleToCloseTimeoutSeconds(options.getScheduleToCloseTimeout().getSeconds());
1✔
375

376
    RetryOptions retryOptions = options.getRetryOptions();
1✔
377
    if (retryOptions != null) {
1✔
378
      parameters.setRetryOptions(retryOptions);
1✔
379
    }
380
    parameters.setAttempt(attempt);
1✔
381
    parameters.setElapsedTime(elapsed);
1✔
382
    parameters.setWorkflowDomain(this.context.getDomain());
1✔
383
    parameters.setWorkflowExecution(this.context.getWorkflowExecution());
1✔
384

385
    List<ContextPropagator> propagators =
1✔
386
        Optional.ofNullable(options.getContextPropagators()).orElse(contextPropagators);
1✔
387
    parameters.setContext(extractContextsAndConvertToBytes(propagators));
1✔
388

389
    return parameters;
1✔
390
  }
391

392
  @Override
393
  public <R> WorkflowResult<R> executeChildWorkflow(
394
      String workflowType,
395
      Class<R> returnClass,
396
      Type returnType,
397
      Object[] args,
398
      ChildWorkflowOptions options) {
399
    byte[] input = converter.toData(args);
1✔
400
    CompletablePromise<WorkflowExecution> execution = Workflow.newPromise();
1✔
401
    Promise<byte[]> output = executeChildWorkflow(workflowType, options, input, execution);
1✔
402
    Promise<R> result = output.thenApply((b) -> converter.fromData(b, returnClass, returnType));
1✔
403
    return new WorkflowResult<>(result, execution);
1✔
404
  }
405

406
  private Promise<byte[]> executeChildWorkflow(
407
      String name,
408
      ChildWorkflowOptions options,
409
      byte[] input,
410
      CompletablePromise<WorkflowExecution> executionResult) {
411
    RetryOptions retryOptions = options.getRetryOptions();
1✔
412
    // This condition is for backwards compatibility with the code that
413
    // used client side retry before the server side retry existed.
414
    if (retryOptions != null && !context.isServerSideChildWorkflowRetry()) {
1✔
415
      ChildWorkflowOptions o1 =
1✔
416
          new ChildWorkflowOptions.Builder()
417
              .setTaskList(options.getTaskList())
1✔
418
              .setExecutionStartToCloseTimeout(options.getExecutionStartToCloseTimeout())
1✔
419
              .setTaskStartToCloseTimeout(options.getTaskStartToCloseTimeout())
1✔
420
              .setWorkflowId(options.getWorkflowId())
1✔
421
              .setWorkflowIdReusePolicy(options.getWorkflowIdReusePolicy())
1✔
422
              .setMemo(options.getMemo())
1✔
423
              .setSearchAttributes(options.getSearchAttributes())
1✔
424
              .setParentClosePolicy(options.getParentClosePolicy())
1✔
425
              .build();
1✔
426
      return WorkflowRetryerInternal.retryAsync(
1✔
427
          retryOptions, () -> executeChildWorkflowOnce(name, o1, input, executionResult));
1✔
428
    }
429
    return executeChildWorkflowOnce(name, options, input, executionResult);
1✔
430
  }
431

432
  /** @param executionResult promise that is set bu this method when child workflow is started. */
433
  private Promise<byte[]> executeChildWorkflowOnce(
434
      String name,
435
      ChildWorkflowOptions options,
436
      byte[] input,
437
      CompletablePromise<WorkflowExecution> executionResult) {
438
    RetryParameters retryParameters = null;
1✔
439
    RetryOptions retryOptions = options.getRetryOptions();
1✔
440
    if (retryOptions != null) {
1✔
441
      retryParameters = new RetryParameters(retryOptions);
1✔
442
    }
443
    List<ContextPropagator> propagators = options.getContextPropagators();
1✔
444
    if (propagators == null) {
1✔
445
      propagators = this.contextPropagators;
1✔
446
    }
447

448
    StartChildWorkflowExecutionParameters parameters =
1✔
449
        new StartChildWorkflowExecutionParameters.Builder()
450
            .setWorkflowType(new WorkflowType().setName(name))
1✔
451
            .setWorkflowId(options.getWorkflowId())
1✔
452
            .setInput(input)
1✔
453
            .setExecutionStartToCloseTimeoutSeconds(
1✔
454
                options.getExecutionStartToCloseTimeout().getSeconds())
1✔
455
            .setDomain(options.getDomain())
1✔
456
            .setTaskList(options.getTaskList())
1✔
457
            .setTaskStartToCloseTimeoutSeconds(options.getTaskStartToCloseTimeout().getSeconds())
1✔
458
            .setWorkflowIdReusePolicy(options.getWorkflowIdReusePolicy())
1✔
459
            .setRetryParameters(retryParameters)
1✔
460
            .setCronSchedule(options.getCronSchedule())
1✔
461
            .setMemo(options.getMemo())
1✔
462
            .setSearchAttributes(options.getSearchAttributes())
1✔
463
            .setContext(extractContextsAndConvertToBytes(propagators))
1✔
464
            .setParentClosePolicy(options.getParentClosePolicy())
1✔
465
            .build();
1✔
466
    CompletablePromise<byte[]> result = Workflow.newPromise();
1✔
467
    Consumer<Exception> cancellationCallback =
1✔
468
        context.startChildWorkflow(
1✔
469
            parameters,
470
            (we) ->
471
                runner.executeInWorkflowThread(
1✔
472
                    "child workflow completion callback", () -> executionResult.complete(we)),
1✔
473
            (output, failure) -> {
474
              if (failure != null) {
1✔
475
                runner.executeInWorkflowThread(
1✔
476
                    "child workflow failure callback",
477
                    () -> result.completeExceptionally(mapChildWorkflowException(failure)));
1✔
478
              } else {
479
                runner.executeInWorkflowThread(
1✔
480
                    "child workflow completion callback", () -> result.complete(output));
1✔
481
              }
482
            });
1✔
483
    CancellationScope.current()
1✔
484
        .getCancellationRequest()
1✔
485
        .thenApply(
1✔
486
            (reason) -> {
487
              cancellationCallback.accept(new CancellationException(reason));
1✔
488
              return null;
1✔
489
            });
490
    return result;
1✔
491
  }
492

493
  private Map<String, byte[]> extractContextsAndConvertToBytes(
494
      List<ContextPropagator> contextPropagators) {
495
    if (contextPropagators == null) {
1✔
496
      return null;
×
497
    }
498
    Map<String, byte[]> result = new HashMap<>();
1✔
499
    for (ContextPropagator propagator : contextPropagators) {
1✔
500
      result.putAll(propagator.serializeContext(propagator.getCurrentContext()));
1✔
501
    }
1✔
502
    // inject trace span context
503
    tracingPropagator.inject(result);
1✔
504
    return result;
1✔
505
  }
506

507
  private RuntimeException mapChildWorkflowException(Exception failure) {
508
    if (failure == null) {
1✔
509
      return null;
×
510
    }
511
    if (failure instanceof CancellationException) {
1✔
512
      return (CancellationException) failure;
1✔
513
    }
514
    if (failure instanceof ChildWorkflowException) {
1✔
515
      return (ChildWorkflowException) failure;
1✔
516
    }
517
    if (!(failure instanceof ChildWorkflowTaskFailedException)) {
1✔
518
      return new IllegalArgumentException("Unexpected exception type: ", failure);
×
519
    }
520
    ChildWorkflowTaskFailedException taskFailed = (ChildWorkflowTaskFailedException) failure;
1✔
521
    String causeClassName = taskFailed.getReason();
1✔
522
    Exception cause;
523
    try {
524
      @SuppressWarnings("unchecked")
525
      Class<? extends Exception> causeClass =
1✔
526
          (Class<? extends Exception>) Class.forName(causeClassName);
1✔
527
      cause = getDataConverter().fromData(taskFailed.getDetails(), causeClass, causeClass);
1✔
528
    } catch (Exception e) {
×
529
      cause = e;
×
530
    }
1✔
531
    if (cause instanceof SimulatedTimeoutExceptionInternal) {
1✔
532
      // This exception is thrown only in unit tests to mock the child workflow timeouts
533
      return new ChildWorkflowTimedOutException(
1✔
534
          taskFailed.getEventId(), taskFailed.getWorkflowExecution(), taskFailed.getWorkflowType());
1✔
535
    }
536
    return new ChildWorkflowFailureException(
1✔
537
        taskFailed.getEventId(),
1✔
538
        taskFailed.getWorkflowExecution(),
1✔
539
        taskFailed.getWorkflowType(),
1✔
540
        cause);
541
  }
542

543
  @Override
544
  public Promise<Void> newTimer(Duration delay) {
545
    Objects.requireNonNull(delay);
1✔
546
    long delaySeconds = roundUpToSeconds(delay).getSeconds();
1✔
547
    if (delaySeconds < 0) {
1✔
548
      throw new IllegalArgumentException("negative delay");
×
549
    }
550
    if (delaySeconds == 0) {
1✔
551
      return Workflow.newPromise(null);
1✔
552
    }
553
    CompletablePromise<Void> timer = Workflow.newPromise();
1✔
554
    long fireTime = context.currentTimeMillis() + TimeUnit.SECONDS.toMillis(delaySeconds);
1✔
555
    timers.addTimer(fireTime, timer);
1✔
556
    CancellationScope.current()
1✔
557
        .getCancellationRequest()
1✔
558
        .thenApply(
1✔
559
            (reason) -> {
560
              timers.removeTimer(fireTime, timer);
1✔
561
              timer.completeExceptionally(new CancellationException(reason));
1✔
562
              return null;
1✔
563
            });
564
    return timer;
1✔
565
  }
566

567
  @Override
568
  public <R> R sideEffect(Class<R> resultClass, Type resultType, Func<R> func) {
569
    DataConverter dataConverter = getDataConverter();
1✔
570
    byte[] result =
1✔
571
        context.sideEffect(
1✔
572
            () -> {
573
              R r = func.apply();
1✔
574
              return dataConverter.toData(r);
1✔
575
            });
576
    return dataConverter.fromData(result, resultClass, resultType);
1✔
577
  }
578

579
  @Override
580
  public <R> R mutableSideEffect(
581
      String id, Class<R> resultClass, Type resultType, BiPredicate<R, R> updated, Func<R> func) {
582
    AtomicReference<R> unserializedResult = new AtomicReference<>();
1✔
583
    // As lambda below never returns Optional.empty() if there is a stored value
584
    // it is safe to call get on mutableSideEffect result.
585
    Optional<byte[]> optionalBytes =
1✔
586
        context.mutableSideEffect(
1✔
587
            id,
588
            converter,
589
            (storedBinary) -> {
590
              Optional<R> stored =
1✔
591
                  storedBinary.map((b) -> converter.fromData(b, resultClass, resultType));
1✔
592
              R funcResult =
1✔
593
                  Objects.requireNonNull(
1✔
594
                      func.apply(), "mutableSideEffect function " + "returned null");
1✔
595
              if (!stored.isPresent() || updated.test(stored.get(), funcResult)) {
1✔
596
                unserializedResult.set(funcResult);
1✔
597
                return Optional.of(converter.toData(funcResult));
1✔
598
              }
599
              return Optional.empty(); // returned only when value doesn't need to be updated
1✔
600
            });
601
    if (!optionalBytes.isPresent()) {
1✔
602
      throw new IllegalArgumentException(
×
603
          "No value found for mutableSideEffectId="
604
              + id
605
              + ", during replay it usually indicates a different workflow runId than the original one");
606
    }
607
    byte[] binaryResult = optionalBytes.get();
1✔
608
    // An optimization that avoids unnecessary deserialization of the result.
609
    R unserialized = unserializedResult.get();
1✔
610
    if (unserialized != null) {
1✔
611
      return unserialized;
1✔
612
    }
613
    return converter.fromData(binaryResult, resultClass, resultType);
1✔
614
  }
615

616
  @Override
617
  public int getVersion(String changeID, int minSupported, int maxSupported) {
618
    return context.getVersion(changeID, converter, minSupported, maxSupported);
1✔
619
  }
620

621
  void fireTimers() {
622
    timers.fireTimers(context.currentTimeMillis());
1✔
623
  }
1✔
624

625
  boolean hasTimersToFire() {
626
    return timers.hasTimersToFire(context.currentTimeMillis());
1✔
627
  }
628

629
  long getNextFireTime() {
630
    return timers.getNextFireTime();
1✔
631
  }
632

633
  public byte[] query(String type, byte[] args) {
634
    Functions.Func1<byte[], byte[]> callback = queryCallbacks.get(type);
1✔
635
    if (callback == null) {
1✔
636
      throw new IllegalArgumentException(
×
637
          "Unknown query type: " + type + ", knownTypes=" + queryCallbacks.keySet());
×
638
    }
639
    return callback.apply(args);
1✔
640
  }
641

642
  @Override
643
  public void registerQuery(
644
      String queryType, Type[] argTypes, Functions.Func1<Object[], Object> callback) {
645
    //    if (queryCallbacks.containsKey(queryType)) {
646
    //      throw new IllegalStateException("Query \"" + queryType + "\" is already registered");
647
    //    }
648
    queryCallbacks.put(
1✔
649
        queryType,
650
        (input) -> {
651
          Object[] args = converter.fromDataArray(input, argTypes);
1✔
652
          Object result = callback.apply(args);
1✔
653
          return converter.toData(result);
1✔
654
        });
655
  }
1✔
656

657
  @Override
658
  public UUID randomUUID() {
659
    return context.randomUUID();
1✔
660
  }
661

662
  @Override
663
  public Random newRandom() {
664
    return context.newRandom();
1✔
665
  }
666

667
  public DataConverter getDataConverter() {
668
    return converter;
1✔
669
  }
670

671
  boolean isReplaying() {
672
    return context.isReplaying();
1✔
673
  }
674

675
  public DecisionContext getContext() {
676
    return context;
1✔
677
  }
678

679
  @Override
680
  public Promise<Void> signalExternalWorkflow(
681
      WorkflowExecution execution, String signalName, Object[] args) {
682
    return this.signalExternalWorkflow(null, execution, signalName, args);
1✔
683
  }
684

685
  @Override
686
  public Promise<Void> signalExternalWorkflow(
687
      String domain, WorkflowExecution execution, String signalName, Object[] args) {
688
    SignalExternalWorkflowParameters parameters = new SignalExternalWorkflowParameters();
1✔
689
    parameters.setSignalName(signalName);
1✔
690
    parameters.setWorkflowId(execution.getWorkflowId());
1✔
691
    parameters.setRunId(execution.getRunId());
1✔
692
    parameters.setDomain(domain);
1✔
693
    byte[] input = getDataConverter().toData(args);
1✔
694
    parameters.setInput(input);
1✔
695
    CompletablePromise<Void> result = Workflow.newPromise();
1✔
696

697
    Consumer<Exception> cancellationCallback =
1✔
698
        context.signalWorkflowExecution(
1✔
699
            parameters,
700
            (output, failure) -> {
701
              if (failure != null) {
1✔
702
                runner.executeInWorkflowThread(
1✔
703
                    "child workflow failure callback",
704
                    () -> result.completeExceptionally(mapSignalWorkflowException(failure)));
1✔
705
              } else {
706
                runner.executeInWorkflowThread(
1✔
707
                    "child workflow completion callback", () -> result.complete(output));
1✔
708
              }
709
            });
1✔
710
    CancellationScope.current()
1✔
711
        .getCancellationRequest()
1✔
712
        .thenApply(
1✔
713
            (reason) -> {
714
              cancellationCallback.accept(new CancellationException(reason));
1✔
715
              return null;
1✔
716
            });
717
    return result;
1✔
718
  }
719

720
  @Override
721
  public void sleep(Duration duration) {
722
    WorkflowThread.await(
1✔
723
        duration.toMillis(),
1✔
724
        "sleep",
725
        () -> {
726
          CancellationScope.throwCancelled();
1✔
727
          return false;
1✔
728
        });
729
  }
1✔
730

731
  @Override
732
  public boolean await(Duration timeout, String reason, Supplier<Boolean> unblockCondition) {
733
    return WorkflowThread.await(timeout.toMillis(), reason, unblockCondition);
1✔
734
  }
735

736
  @Override
737
  public void await(String reason, Supplier<Boolean> unblockCondition) {
738
    WorkflowThread.await(reason, unblockCondition);
1✔
739
  }
1✔
740

741
  @Override
742
  public void continueAsNew(
743
      Optional<String> workflowType, Optional<ContinueAsNewOptions> options, Object[] args) {
744
    ContinueAsNewWorkflowExecutionParameters parameters =
1✔
745
        new ContinueAsNewWorkflowExecutionParameters();
746
    if (workflowType.isPresent()) {
1✔
747
      parameters.setWorkflowType(workflowType.get());
1✔
748
    }
749
    if (options.isPresent()) {
1✔
750
      ContinueAsNewOptions ops = options.get();
1✔
751
      parameters.setExecutionStartToCloseTimeoutSeconds(
1✔
752
          (int) ops.getExecutionStartToCloseTimeout().getSeconds());
1✔
753
      parameters.setTaskStartToCloseTimeoutSeconds(
1✔
754
          (int) ops.getTaskStartToCloseTimeout().getSeconds());
1✔
755
      parameters.setTaskList(ops.getTaskList());
1✔
756
    }
757
    parameters.setInput(getDataConverter().toData(args));
1✔
758
    context.continueAsNewOnCompletion(parameters);
1✔
759
    WorkflowThread.exit(null);
×
760
  }
×
761

762
  @Override
763
  public Promise<Void> cancelWorkflow(WorkflowExecution execution) {
764
    return context.requestCancelWorkflowExecution(execution);
×
765
  }
766

767
  private RuntimeException mapSignalWorkflowException(Exception failure) {
768
    if (failure == null) {
1✔
769
      return null;
×
770
    }
771
    if (failure instanceof CancellationException) {
1✔
772
      return (CancellationException) failure;
1✔
773
    }
774

775
    if (!(failure instanceof SignalExternalWorkflowException)) {
1✔
776
      return new IllegalArgumentException("Unexpected exception type: ", failure);
×
777
    }
778
    return (SignalExternalWorkflowException) failure;
1✔
779
  }
780

781
  public Scope getMetricsScope() {
782
    return context.getMetricsScope();
1✔
783
  }
784

785
  public boolean isLoggingEnabledInReplay() {
786
    return context.getEnableLoggingInReplay();
1✔
787
  }
788

789
  public <R> R getLastCompletionResult(Class<R> resultClass, Type resultType) {
790
    if (lastCompletionResult == null || lastCompletionResult.length == 0) {
1✔
791
      return null;
1✔
792
    }
793

794
    DataConverter dataConverter = getDataConverter();
1✔
795
    return dataConverter.fromData(lastCompletionResult, resultClass, resultType);
1✔
796
  }
797

798
  @Override
799
  public void upsertSearchAttributes(Map<String, Object> searchAttributes) {
800
    if (searchAttributes.isEmpty()) {
1✔
801
      throw new IllegalArgumentException("Empty search attributes");
1✔
802
    }
803

804
    if (searchAttributes.containsKey(CADENCE_CHANGE_VERSION)) {
1✔
805
      throw new IllegalArgumentException(
×
806
          "CadenceChangeVersion is a reserved key that cannot be set, please use other key");
807
    }
808

809
    SearchAttributes attr = InternalUtils.convertMapToSearchAttributes(searchAttributes);
1✔
810
    context.upsertSearchAttributes(attr);
1✔
811
  }
1✔
812

813
  public WorkflowImplementationOptions getWorkflowImplementationOptions() {
814
    return workflowImplementationOptions;
1✔
815
  }
816
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc