• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #157

pending completion
#157

push

github-actions

web-flow
Provide SerializationContext for PayloadConverter and PayloadCodec (#1695)

Issue #1694

497 of 497 new or added lines in 32 files covered. (100.0%)

16942 of 20806 relevant lines covered (81.43%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.84
/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.sync;
22

23
import static io.temporal.internal.common.HeaderUtils.intoPayloadMap;
24
import static io.temporal.internal.common.HeaderUtils.toHeaderGrpc;
25
import static io.temporal.internal.common.SerializerUtils.toRetryPolicy;
26

27
import com.google.common.base.MoreObjects;
28
import com.google.common.base.Preconditions;
29
import com.uber.m3.tally.Scope;
30
import io.temporal.activity.ActivityOptions;
31
import io.temporal.activity.LocalActivityOptions;
32
import io.temporal.api.command.v1.ContinueAsNewWorkflowExecutionCommandAttributes;
33
import io.temporal.api.command.v1.ScheduleActivityTaskCommandAttributes;
34
import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes;
35
import io.temporal.api.command.v1.StartChildWorkflowExecutionCommandAttributes;
36
import io.temporal.api.common.v1.ActivityType;
37
import io.temporal.api.common.v1.Memo;
38
import io.temporal.api.common.v1.Payload;
39
import io.temporal.api.common.v1.Payloads;
40
import io.temporal.api.common.v1.SearchAttributes;
41
import io.temporal.api.common.v1.WorkflowExecution;
42
import io.temporal.api.common.v1.WorkflowType;
43
import io.temporal.api.enums.v1.ParentClosePolicy;
44
import io.temporal.api.failure.v1.Failure;
45
import io.temporal.api.history.v1.HistoryEvent;
46
import io.temporal.api.taskqueue.v1.TaskQueue;
47
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
48
import io.temporal.client.WorkflowException;
49
import io.temporal.common.RetryOptions;
50
import io.temporal.common.context.ContextPropagator;
51
import io.temporal.common.converter.DataConverter;
52
import io.temporal.common.interceptors.Header;
53
import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor;
54
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
55
import io.temporal.failure.*;
56
import io.temporal.internal.common.ActivityOptionUtils;
57
import io.temporal.internal.common.OptionsUtils;
58
import io.temporal.internal.common.ProtobufTimeUtils;
59
import io.temporal.internal.common.SearchAttributesUtil;
60
import io.temporal.internal.replay.ChildWorkflowTaskFailedException;
61
import io.temporal.internal.replay.ReplayWorkflowContext;
62
import io.temporal.internal.replay.WorkflowContext;
63
import io.temporal.internal.statemachines.*;
64
import io.temporal.payload.context.ActivitySerializationContext;
65
import io.temporal.payload.context.WorkflowSerializationContext;
66
import io.temporal.worker.WorkflowImplementationOptions;
67
import io.temporal.workflow.CancellationScope;
68
import io.temporal.workflow.ChildWorkflowOptions;
69
import io.temporal.workflow.CompletablePromise;
70
import io.temporal.workflow.ContinueAsNewOptions;
71
import io.temporal.workflow.Functions;
72
import io.temporal.workflow.Functions.Func;
73
import io.temporal.workflow.Promise;
74
import io.temporal.workflow.Workflow;
75
import java.lang.reflect.Type;
76
import java.time.Duration;
77
import java.time.Instant;
78
import java.util.*;
79
import java.util.concurrent.atomic.AtomicBoolean;
80
import java.util.concurrent.atomic.AtomicReference;
81
import java.util.function.BiPredicate;
82
import java.util.function.Supplier;
83
import javax.annotation.Nonnull;
84
import javax.annotation.Nullable;
85
import org.slf4j.Logger;
86
import org.slf4j.LoggerFactory;
87

88
// TODO separate WorkflowOutboundCallsInterceptor functionality from this class into
89
// RootWorkflowOutboundInterceptor
90
/**
91
 * Root, the most top level WorkflowContext that unites all relevant contexts, handlers, options,
92
 * states, etc. It's created when SyncWorkflow which represent a context of a workflow type
93
 * definition on the worker meets ReplayWorkflowContext which contains information from the
94
 * WorkflowTask
95
 */
96
final class SyncWorkflowContext implements WorkflowContext, WorkflowOutboundCallsInterceptor {
97
  private static final Logger log = LoggerFactory.getLogger(SyncWorkflowContext.class);
1✔
98

99
  private final String namespace;
100
  private final WorkflowExecution workflowExecution;
101
  private final WorkflowImplementationOptions workflowImplementationOptions;
102
  private final DataConverter dataConverter;
103
  // to be used in this class, should not be passed down. Pass the original #dataConverter instead
104
  private final DataConverter dataConverterWithCurrentWorkflowContext;
105
  private final List<ContextPropagator> contextPropagators;
106
  private final SignalDispatcher signalDispatcher;
107
  private final QueryDispatcher queryDispatcher;
108

109
  // initialized later when these entities are created
110
  private ReplayWorkflowContext replayContext;
111
  private DeterministicRunner runner;
112

113
  private WorkflowInboundCallsInterceptor headInboundInterceptor;
114
  private WorkflowOutboundCallsInterceptor headOutboundInterceptor;
115

116
  private ActivityOptions defaultActivityOptions = null;
1✔
117
  private Map<String, ActivityOptions> activityOptionsMap;
118
  private LocalActivityOptions defaultLocalActivityOptions = null;
1✔
119
  private Map<String, LocalActivityOptions> localActivityOptionsMap;
120

121
  public SyncWorkflowContext(
122
      @Nonnull String namespace,
123
      @Nonnull WorkflowExecution workflowExecution,
124
      SignalDispatcher signalDispatcher,
125
      QueryDispatcher queryDispatcher,
126
      @Nullable WorkflowImplementationOptions workflowImplementationOptions,
127
      DataConverter dataConverter,
128
      List<ContextPropagator> contextPropagators) {
1✔
129
    this.namespace = namespace;
1✔
130
    this.workflowExecution = workflowExecution;
1✔
131
    this.dataConverter = dataConverter;
1✔
132
    this.dataConverterWithCurrentWorkflowContext =
1✔
133
        dataConverter.withContext(
1✔
134
            new WorkflowSerializationContext(namespace, workflowExecution.getWorkflowId()));
1✔
135
    this.contextPropagators = contextPropagators;
1✔
136
    this.signalDispatcher = signalDispatcher;
1✔
137
    this.queryDispatcher = queryDispatcher;
1✔
138
    if (workflowImplementationOptions != null) {
1✔
139
      this.defaultActivityOptions = workflowImplementationOptions.getDefaultActivityOptions();
1✔
140
      this.activityOptionsMap = new HashMap<>(workflowImplementationOptions.getActivityOptions());
1✔
141
      this.defaultLocalActivityOptions =
1✔
142
          workflowImplementationOptions.getDefaultLocalActivityOptions();
1✔
143
      this.localActivityOptionsMap =
1✔
144
          new HashMap<>(workflowImplementationOptions.getLocalActivityOptions());
1✔
145
    }
146
    this.workflowImplementationOptions =
1✔
147
        workflowImplementationOptions == null
1✔
148
            ? WorkflowImplementationOptions.getDefaultInstance()
1✔
149
            : workflowImplementationOptions;
1✔
150
    // initial values for headInboundInterceptor and headOutboundInterceptor until they initialized
151
    // with actual interceptors through #initHeadInboundCallsInterceptor and
152
    // #initHeadOutboundCallsInterceptor during initialization phase.
153
    // See workflow.initialize() performed inside the workflow root thread inside
154
    // SyncWorkflow#start(HistoryEvent, ReplayWorkflowContext)
155
    this.headInboundInterceptor = new InitialWorkflowInboundCallsInterceptor(this);
1✔
156
    this.headOutboundInterceptor = this;
1✔
157
  }
1✔
158

159
  public void setReplayContext(ReplayWorkflowContext context) {
160
    this.replayContext = context;
1✔
161
  }
1✔
162

163
  /**
164
   * Using setter, as runner is initialized with this context, so it is not ready during
165
   * construction of this.
166
   */
167
  public void setRunner(DeterministicRunner runner) {
168
    this.runner = runner;
1✔
169
  }
1✔
170

171
  public DeterministicRunner getRunner() {
172
    return runner;
×
173
  }
174

175
  public WorkflowOutboundCallsInterceptor getWorkflowOutboundInterceptor() {
176
    return headOutboundInterceptor;
1✔
177
  }
178

179
  public WorkflowInboundCallsInterceptor getWorkflowInboundInterceptor() {
180
    return headInboundInterceptor;
1✔
181
  }
182

183
  public void initHeadOutboundCallsInterceptor(WorkflowOutboundCallsInterceptor head) {
184
    headOutboundInterceptor = head;
1✔
185
  }
1✔
186

187
  public void initHeadInboundCallsInterceptor(WorkflowInboundCallsInterceptor head) {
188
    headInboundInterceptor = head;
1✔
189
    signalDispatcher.setInboundCallsInterceptor(head);
1✔
190
    queryDispatcher.setInboundCallsInterceptor(head);
1✔
191
  }
1✔
192

193
  public ActivityOptions getDefaultActivityOptions() {
194
    return defaultActivityOptions;
1✔
195
  }
196

197
  public @Nonnull Map<String, ActivityOptions> getActivityOptions() {
198
    return activityOptionsMap != null
1✔
199
        ? Collections.unmodifiableMap(activityOptionsMap)
1✔
200
        : Collections.emptyMap();
×
201
  }
202

203
  public LocalActivityOptions getDefaultLocalActivityOptions() {
204
    return defaultLocalActivityOptions;
1✔
205
  }
206

207
  public @Nonnull Map<String, LocalActivityOptions> getLocalActivityOptions() {
208
    return localActivityOptionsMap != null
1✔
209
        ? Collections.unmodifiableMap(localActivityOptionsMap)
1✔
210
        : Collections.emptyMap();
×
211
  }
212

213
  public void setDefaultActivityOptions(ActivityOptions defaultActivityOptions) {
214
    this.defaultActivityOptions =
1✔
215
        (this.defaultActivityOptions == null)
1✔
216
            ? defaultActivityOptions
×
217
            : this.defaultActivityOptions.toBuilder()
1✔
218
                .mergeActivityOptions(defaultActivityOptions)
1✔
219
                .build();
1✔
220
  }
1✔
221

222
  public void applyActivityOptions(Map<String, ActivityOptions> activityTypeToOption) {
223
    Objects.requireNonNull(activityTypeToOption);
1✔
224
    if (this.activityOptionsMap == null) {
1✔
225
      this.activityOptionsMap = new HashMap<>(activityTypeToOption);
×
226
      return;
×
227
    }
228
    ActivityOptionUtils.mergePredefinedActivityOptions(activityOptionsMap, activityTypeToOption);
1✔
229
  }
1✔
230

231
  public void setDefaultLocalActivityOptions(LocalActivityOptions defaultLocalActivityOptions) {
232
    this.defaultLocalActivityOptions =
1✔
233
        (this.defaultLocalActivityOptions == null)
1✔
234
            ? defaultLocalActivityOptions
×
235
            : this.defaultLocalActivityOptions.toBuilder()
1✔
236
                .mergeActivityOptions(defaultLocalActivityOptions)
1✔
237
                .build();
1✔
238
  }
1✔
239

240
  public void applyLocalActivityOptions(Map<String, LocalActivityOptions> activityTypeToOption) {
241
    Objects.requireNonNull(activityTypeToOption);
1✔
242
    if (this.localActivityOptionsMap == null) {
1✔
243
      this.localActivityOptionsMap = new HashMap<>(activityTypeToOption);
×
244
      return;
×
245
    }
246
    ActivityOptionUtils.mergePredefinedLocalActivityOptions(
1✔
247
        localActivityOptionsMap, activityTypeToOption);
248
  }
1✔
249

250
  @Override
251
  public <T> ActivityOutput<T> executeActivity(ActivityInput<T> input) {
252
    ActivitySerializationContext serializationContext =
1✔
253
        new ActivitySerializationContext(
254
            replayContext.getNamespace(),
1✔
255
            replayContext.getWorkflowId(),
1✔
256
            replayContext.getWorkflowType().getName(),
1✔
257
            input.getActivityName(),
1✔
258
            // input.getOptions().getTaskQueue() may be not specified, workflow task queue is used
259
            // by the Server in this case
260
            MoreObjects.firstNonNull(
1✔
261
                input.getOptions().getTaskQueue(), replayContext.getTaskQueue()),
1✔
262
            false);
263
    DataConverter dataConverterWithActivityContext =
1✔
264
        dataConverter.withContext(serializationContext);
1✔
265
    Optional<Payloads> args = dataConverterWithActivityContext.toPayloads(input.getArgs());
1✔
266

267
    ActivityOutput<Optional<Payloads>> output =
1✔
268
        executeActivityOnce(input.getActivityName(), input.getOptions(), input.getHeader(), args);
1✔
269

270
    return new ActivityOutput<>(
1✔
271
        output.getActivityId(),
1✔
272
        output
273
            .getResult()
1✔
274
            .handle(
1✔
275
                (r, f) -> {
276
                  if (f == null) {
1✔
277
                    return input.getResultType() != Void.TYPE
1✔
278
                        ? dataConverterWithActivityContext.fromPayloads(
1✔
279
                            0, r, input.getResultClass(), input.getResultType())
1✔
280
                        : null;
1✔
281
                  } else {
282
                    throw dataConverterWithActivityContext.failureToException(
1✔
283
                        ((FailureWrapperException) f).getFailure());
1✔
284
                  }
285
                }));
286
  }
287

288
  private ActivityOutput<Optional<Payloads>> executeActivityOnce(
289
      String activityTypeName, ActivityOptions options, Header header, Optional<Payloads> input) {
290
    ExecuteActivityParameters params =
1✔
291
        constructExecuteActivityParameters(activityTypeName, options, header, input);
1✔
292
    ActivityCallback callback = new ActivityCallback();
1✔
293
    ReplayWorkflowContext.ScheduleActivityTaskOutput activityOutput =
1✔
294
        replayContext.scheduleActivityTask(params, callback::invoke);
1✔
295
    CancellationScope.current()
1✔
296
        .getCancellationRequest()
1✔
297
        .thenApply(
1✔
298
            (reason) -> {
299
              activityOutput.getCancellationHandle().apply(new CanceledFailure(reason));
1✔
300
              return null;
1✔
301
            });
302
    return new ActivityOutput<>(activityOutput.getActivityId(), callback.result);
1✔
303
  }
304

305
  public void handleInterceptedSignal(WorkflowInboundCallsInterceptor.SignalInput input) {
306
    signalDispatcher.handleInterceptedSignal(input);
1✔
307
  }
1✔
308

309
  public void handleSignal(String signalName, Optional<Payloads> input, long eventId) {
310
    signalDispatcher.handleSignal(signalName, input, eventId);
1✔
311
  }
1✔
312

313
  public WorkflowInboundCallsInterceptor.QueryOutput handleInterceptedQuery(
314
      WorkflowInboundCallsInterceptor.QueryInput input) {
315
    return queryDispatcher.handleInterceptedQuery(input);
1✔
316
  }
317

318
  public Optional<Payloads> handleQuery(String queryName, Optional<Payloads> input) {
319
    return queryDispatcher.handleQuery(queryName, input);
1✔
320
  }
321

322
  private class ActivityCallback {
1✔
323
    private final CompletablePromise<Optional<Payloads>> result = Workflow.newPromise();
1✔
324

325
    public void invoke(Optional<Payloads> output, Failure failure) {
326
      if (failure != null) {
1✔
327
        runner.executeInWorkflowThread(
1✔
328
            "activity failure callback",
329
            () -> result.completeExceptionally(new FailureWrapperException(failure)));
1✔
330
      } else {
331
        runner.executeInWorkflowThread(
1✔
332
            "activity completion callback", () -> result.complete(output));
1✔
333
      }
334
    }
1✔
335
  }
336

337
  private class LocalActivityCallbackImpl implements LocalActivityCallback {
1✔
338
    private final CompletablePromise<Optional<Payloads>> result = Workflow.newPromise();
1✔
339

340
    @Override
341
    public void apply(Optional<Payloads> successOutput, LocalActivityFailedException exception) {
342
      if (exception != null) {
1✔
343
        runner.executeInWorkflowThread(
1✔
344
            "local activity failure callback", () -> result.completeExceptionally(exception));
1✔
345
      } else {
346
        runner.executeInWorkflowThread(
1✔
347
            "local activity completion callback", () -> result.complete(successOutput));
1✔
348
      }
349
    }
1✔
350
  }
351

352
  @Override
353
  public <R> LocalActivityOutput<R> executeLocalActivity(LocalActivityInput<R> input) {
354
    ActivitySerializationContext serializationContext =
1✔
355
        new ActivitySerializationContext(
356
            replayContext.getNamespace(),
1✔
357
            replayContext.getWorkflowId(),
1✔
358
            replayContext.getWorkflowType().getName(),
1✔
359
            input.getActivityName(),
1✔
360
            replayContext.getTaskQueue(),
1✔
361
            true);
362
    DataConverter dataConverterWithActivityContext =
1✔
363
        dataConverter.withContext(serializationContext);
1✔
364
    Optional<Payloads> payloads = dataConverterWithActivityContext.toPayloads(input.getArgs());
1✔
365

366
    long originalScheduledTime = System.currentTimeMillis();
1✔
367
    CompletablePromise<Optional<Payloads>> serializedResult =
368
        WorkflowInternal.newCompletablePromise();
1✔
369
    executeLocalActivityOverLocalRetryThreshold(
1✔
370
        input.getActivityName(),
1✔
371
        input.getOptions(),
1✔
372
        input.getHeader(),
1✔
373
        payloads,
374
        originalScheduledTime,
375
        1,
376
        null,
377
        serializedResult);
378

379
    Promise<R> result =
1✔
380
        serializedResult.handle(
1✔
381
            (r, f) -> {
382
              if (f == null) {
1✔
383
                return input.getResultClass() != Void.TYPE
1✔
384
                    ? dataConverterWithActivityContext.fromPayloads(
1✔
385
                        0, r, input.getResultClass(), input.getResultType())
1✔
386
                    : null;
1✔
387
              } else {
388
                throw dataConverterWithActivityContext.failureToException(
1✔
389
                    ((LocalActivityCallback.LocalActivityFailedException) f).getFailure());
1✔
390
              }
391
            });
392

393
    return new LocalActivityOutput<>(result);
1✔
394
  }
395

396
  public void executeLocalActivityOverLocalRetryThreshold(
397
      String activityTypeName,
398
      LocalActivityOptions options,
399
      Header header,
400
      Optional<Payloads> input,
401
      long originalScheduledTime,
402
      int attempt,
403
      @Nullable Failure previousExecutionFailure,
404
      CompletablePromise<Optional<Payloads>> result) {
405
    CompletablePromise<Optional<Payloads>> localExecutionResult =
1✔
406
        executeLocalActivityLocally(
1✔
407
            activityTypeName,
408
            options,
409
            header,
410
            input,
411
            originalScheduledTime,
412
            attempt,
413
            previousExecutionFailure);
414

415
    localExecutionResult.handle(
1✔
416
        (r, e) -> {
417
          if (e == null) {
1✔
418
            result.complete(r);
1✔
419
          } else {
420
            if ((e instanceof LocalActivityCallback.LocalActivityFailedException)) {
1✔
421
              LocalActivityCallback.LocalActivityFailedException laException =
1✔
422
                  (LocalActivityCallback.LocalActivityFailedException) e;
423
              @Nullable Duration backoff = laException.getBackoff();
1✔
424
              if (backoff != null) {
1✔
425
                WorkflowInternal.newTimer(backoff)
1✔
426
                    .thenApply(
1✔
427
                        unused -> {
428
                          executeLocalActivityOverLocalRetryThreshold(
1✔
429
                              activityTypeName,
430
                              options,
431
                              header,
432
                              input,
433
                              originalScheduledTime,
434
                              laException.getLastAttempt() + 1,
1✔
435
                              laException.getFailure(),
1✔
436
                              result);
437
                          return null;
1✔
438
                        });
439
              } else {
440
                // final failure, report back
441
                result.completeExceptionally(laException);
1✔
442
              }
443
            } else {
1✔
444
              // Only LocalActivityFailedException is expected
445
              String exceptionMessage =
×
446
                  String.format(
×
447
                      "[BUG] Local Activity State Machine callback for activityType %s returned unexpected exception",
448
                      activityTypeName);
449
              log.warn(exceptionMessage, e);
×
450
              replayContext.failWorkflowTask(new IllegalStateException(exceptionMessage, e));
×
451
            }
452
          }
453
          return null;
1✔
454
        });
455
  }
1✔
456

457
  private CompletablePromise<Optional<Payloads>> executeLocalActivityLocally(
458
      String activityTypeName,
459
      LocalActivityOptions options,
460
      Header header,
461
      Optional<Payloads> input,
462
      long originalScheduledTime,
463
      int attempt,
464
      @Nullable Failure previousExecutionFailure) {
465

466
    LocalActivityCallbackImpl callback = new LocalActivityCallbackImpl();
1✔
467
    ExecuteLocalActivityParameters params =
1✔
468
        constructExecuteLocalActivityParameters(
1✔
469
            activityTypeName,
470
            options,
471
            header,
472
            input,
473
            attempt,
474
            originalScheduledTime,
475
            previousExecutionFailure);
476
    Functions.Proc cancellationCallback = replayContext.scheduleLocalActivityTask(params, callback);
1✔
477
    CancellationScope.current()
1✔
478
        .getCancellationRequest()
1✔
479
        .thenApply(
1✔
480
            (reason) -> {
481
              cancellationCallback.apply();
×
482
              return null;
×
483
            });
484
    return callback.result;
1✔
485
  }
486

487
  private ExecuteActivityParameters constructExecuteActivityParameters(
488
      String name, ActivityOptions options, Header header, Optional<Payloads> input) {
489
    String taskQueue = options.getTaskQueue();
1✔
490
    if (taskQueue == null) {
1✔
491
      taskQueue = replayContext.getTaskQueue();
1✔
492
    }
493
    ScheduleActivityTaskCommandAttributes.Builder attributes =
494
        ScheduleActivityTaskCommandAttributes.newBuilder()
1✔
495
            .setActivityType(ActivityType.newBuilder().setName(name))
1✔
496
            .setTaskQueue(TaskQueue.newBuilder().setName(taskQueue))
1✔
497
            .setScheduleToStartTimeout(
1✔
498
                ProtobufTimeUtils.toProtoDuration(options.getScheduleToStartTimeout()))
1✔
499
            .setStartToCloseTimeout(
1✔
500
                ProtobufTimeUtils.toProtoDuration(options.getStartToCloseTimeout()))
1✔
501
            .setScheduleToCloseTimeout(
1✔
502
                ProtobufTimeUtils.toProtoDuration(options.getScheduleToCloseTimeout()))
1✔
503
            .setHeartbeatTimeout(ProtobufTimeUtils.toProtoDuration(options.getHeartbeatTimeout()))
1✔
504
            .setRequestEagerExecution(
1✔
505
                !options.isEagerExecutionDisabled()
1✔
506
                    && Objects.equals(taskQueue, replayContext.getTaskQueue()));
1✔
507

508
    input.ifPresent(attributes::setInput);
1✔
509
    RetryOptions retryOptions = options.getRetryOptions();
1✔
510
    if (retryOptions != null) {
1✔
511
      attributes.setRetryPolicy(toRetryPolicy(retryOptions));
1✔
512
    }
513

514
    // Set the context value.  Use the context propagators from the ActivityOptions
515
    // if present, otherwise use the ones configured on the WorkflowContext
516
    List<ContextPropagator> propagators = options.getContextPropagators();
1✔
517
    if (propagators == null) {
1✔
518
      propagators = this.contextPropagators;
1✔
519
    }
520
    io.temporal.api.common.v1.Header grpcHeader =
1✔
521
        toHeaderGrpc(header, extractContextsAndConvertToBytes(propagators));
1✔
522
    attributes.setHeader(grpcHeader);
1✔
523
    return new ExecuteActivityParameters(attributes, options.getCancellationType());
1✔
524
  }
525

526
  private ExecuteLocalActivityParameters constructExecuteLocalActivityParameters(
527
      String name,
528
      LocalActivityOptions options,
529
      Header header,
530
      Optional<Payloads> input,
531
      int attempt,
532
      long originalScheduledTime,
533
      @Nullable Failure previousExecutionFailure) {
534
    options = LocalActivityOptions.newBuilder(options).validateAndBuildWithDefaults();
1✔
535

536
    PollActivityTaskQueueResponse.Builder activityTask =
537
        PollActivityTaskQueueResponse.newBuilder()
1✔
538
            .setActivityId(this.replayContext.randomUUID().toString())
1✔
539
            .setWorkflowNamespace(this.replayContext.getNamespace())
1✔
540
            .setWorkflowType(this.replayContext.getWorkflowType())
1✔
541
            .setWorkflowExecution(this.replayContext.getWorkflowExecution())
1✔
542
            // used to pass scheduled time to the local activity code inside
543
            // ActivityExecutionContext#getInfo
544
            // setCurrentAttemptScheduledTime is called inside LocalActivityWorker before submitting
545
            // into the LA queue
546
            .setScheduledTime(
1✔
547
                ProtobufTimeUtils.toProtoTimestamp(Instant.ofEpochMilli(originalScheduledTime)))
1✔
548
            .setActivityType(ActivityType.newBuilder().setName(name))
1✔
549
            .setAttempt(attempt);
1✔
550

551
    Duration scheduleToCloseTimeout = options.getScheduleToCloseTimeout();
1✔
552
    if (scheduleToCloseTimeout != null) {
1✔
553
      activityTask.setScheduleToCloseTimeout(
1✔
554
          ProtobufTimeUtils.toProtoDuration(scheduleToCloseTimeout));
1✔
555
    }
556

557
    Duration startToCloseTimeout = options.getStartToCloseTimeout();
1✔
558
    if (startToCloseTimeout != null) {
1✔
559
      activityTask.setStartToCloseTimeout(ProtobufTimeUtils.toProtoDuration(startToCloseTimeout));
1✔
560
    }
561

562
    io.temporal.api.common.v1.Header grpcHeader =
1✔
563
        toHeaderGrpc(header, extractContextsAndConvertToBytes(contextPropagators));
1✔
564
    activityTask.setHeader(grpcHeader);
1✔
565
    input.ifPresent(activityTask::setInput);
1✔
566
    RetryOptions retryOptions = options.getRetryOptions();
1✔
567
    activityTask.setRetryPolicy(
1✔
568
        toRetryPolicy(RetryOptions.newBuilder(retryOptions).validateBuildWithDefaults()));
1✔
569
    Duration localRetryThreshold = options.getLocalRetryThreshold();
1✔
570
    if (localRetryThreshold == null) {
1✔
571
      localRetryThreshold = replayContext.getWorkflowTaskTimeout().multipliedBy(3);
1✔
572
    }
573

574
    return new ExecuteLocalActivityParameters(
1✔
575
        activityTask,
576
        options.getScheduleToStartTimeout(),
1✔
577
        originalScheduledTime,
578
        previousExecutionFailure,
579
        options.isDoNotIncludeArgumentsIntoMarker(),
1✔
580
        localRetryThreshold);
581
  }
582

583
  @Override
584
  public <R> ChildWorkflowOutput<R> executeChildWorkflow(ChildWorkflowInput<R> input) {
585
    if (CancellationScope.current().isCancelRequested()) {
1✔
586
      CanceledFailure canceledFailure = new CanceledFailure("execute called from a canceled scope");
×
587
      return new ChildWorkflowOutput<>(
×
588
          Workflow.newFailedPromise(canceledFailure), Workflow.newFailedPromise(canceledFailure));
×
589
    }
590

591
    CompletablePromise<WorkflowExecution> executionPromise = Workflow.newPromise();
1✔
592
    CompletablePromise<Optional<Payloads>> resultPromise = Workflow.newPromise();
1✔
593

594
    DataConverter dataConverterWithChildWorkflowContext =
1✔
595
        dataConverter.withContext(
1✔
596
            new WorkflowSerializationContext(replayContext.getNamespace(), input.getWorkflowId()));
1✔
597
    Optional<Payloads> payloads = dataConverterWithChildWorkflowContext.toPayloads(input.getArgs());
1✔
598

599
    @Nullable
600
    Memo memo =
601
        (input.getOptions().getMemo() != null)
1✔
602
            ? Memo.newBuilder()
1✔
603
                .putAllFields(
1✔
604
                    intoPayloadMap(
1✔
605
                        dataConverterWithChildWorkflowContext, input.getOptions().getMemo()))
1✔
606
                .build()
1✔
607
            : null;
1✔
608

609
    StartChildWorkflowExecutionParameters parameters =
1✔
610
        createChildWorkflowParameters(
1✔
611
            input.getWorkflowId(),
1✔
612
            input.getWorkflowType(),
1✔
613
            input.getOptions(),
1✔
614
            input.getHeader(),
1✔
615
            payloads,
616
            memo);
617

618
    Functions.Proc1<Exception> cancellationCallback =
1✔
619
        replayContext.startChildWorkflow(
1✔
620
            parameters,
621
            (execution, failure) -> {
622
              if (failure != null) {
1✔
623
                runner.executeInWorkflowThread(
1✔
624
                    "child workflow start failed callback",
625
                    () ->
626
                        executionPromise.completeExceptionally(
1✔
627
                            mapChildWorkflowException(
1✔
628
                                failure, dataConverterWithChildWorkflowContext)));
629
              } else {
630
                runner.executeInWorkflowThread(
1✔
631
                    "child workflow started callback", () -> executionPromise.complete(execution));
1✔
632
              }
633
            },
1✔
634
            (result, failure) -> {
635
              if (failure != null) {
1✔
636
                runner.executeInWorkflowThread(
1✔
637
                    "child workflow failure callback",
638
                    () ->
639
                        resultPromise.completeExceptionally(
1✔
640
                            mapChildWorkflowException(
1✔
641
                                failure, dataConverterWithChildWorkflowContext)));
642
              } else {
643
                runner.executeInWorkflowThread(
1✔
644
                    "child workflow completion callback", () -> resultPromise.complete(result));
1✔
645
              }
646
            });
1✔
647
    AtomicBoolean callbackCalled = new AtomicBoolean();
1✔
648
    CancellationScope.current()
1✔
649
        .getCancellationRequest()
1✔
650
        .thenApply(
1✔
651
            (reason) -> {
652
              if (!callbackCalled.getAndSet(true)) {
1✔
653
                cancellationCallback.apply(new CanceledFailure(reason));
1✔
654
              }
655
              return null;
1✔
656
            });
657

658
    Promise<R> result =
1✔
659
        resultPromise.thenApply(
1✔
660
            (b) ->
661
                dataConverterWithChildWorkflowContext.fromPayloads(
1✔
662
                    0, b, input.getResultClass(), input.getResultType()));
1✔
663
    return new ChildWorkflowOutput<>(result, executionPromise);
1✔
664
  }
665

666
  private StartChildWorkflowExecutionParameters createChildWorkflowParameters(
667
      String workflowId,
668
      String name,
669
      ChildWorkflowOptions options,
670
      Header header,
671
      Optional<Payloads> input,
672
      @Nullable Memo memo) {
673
    final StartChildWorkflowExecutionCommandAttributes.Builder attributes =
674
        StartChildWorkflowExecutionCommandAttributes.newBuilder()
1✔
675
            .setWorkflowType(WorkflowType.newBuilder().setName(name).build());
1✔
676
    attributes.setWorkflowId(workflowId);
1✔
677
    attributes.setNamespace(OptionsUtils.safeGet(options.getNamespace()));
1✔
678
    input.ifPresent(attributes::setInput);
1✔
679
    attributes.setWorkflowRunTimeout(
1✔
680
        ProtobufTimeUtils.toProtoDuration(options.getWorkflowRunTimeout()));
1✔
681
    attributes.setWorkflowExecutionTimeout(
1✔
682
        ProtobufTimeUtils.toProtoDuration(options.getWorkflowExecutionTimeout()));
1✔
683
    attributes.setWorkflowTaskTimeout(
1✔
684
        ProtobufTimeUtils.toProtoDuration(options.getWorkflowTaskTimeout()));
1✔
685
    String taskQueue = options.getTaskQueue();
1✔
686
    if (taskQueue != null) {
1✔
687
      attributes.setTaskQueue(TaskQueue.newBuilder().setName(taskQueue));
1✔
688
    }
689
    if (options.getWorkflowIdReusePolicy() != null) {
1✔
690
      attributes.setWorkflowIdReusePolicy(options.getWorkflowIdReusePolicy());
1✔
691
    }
692
    RetryOptions retryOptions = options.getRetryOptions();
1✔
693
    if (retryOptions != null) {
1✔
694
      attributes.setRetryPolicy(toRetryPolicy(retryOptions));
1✔
695
    }
696
    attributes.setCronSchedule(OptionsUtils.safeGet(options.getCronSchedule()));
1✔
697

698
    if (memo != null) {
1✔
699
      attributes.setMemo(memo);
1✔
700
    }
701

702
    Map<String, Object> searchAttributes = options.getSearchAttributes();
1✔
703
    if (searchAttributes != null && !searchAttributes.isEmpty()) {
1✔
704
      attributes.setSearchAttributes(SearchAttributesUtil.encode(searchAttributes));
1✔
705
    }
706

707
    List<ContextPropagator> propagators = options.getContextPropagators();
1✔
708
    if (propagators == null) {
1✔
709
      propagators = this.contextPropagators;
1✔
710
    }
711
    io.temporal.api.common.v1.Header grpcHeader =
1✔
712
        toHeaderGrpc(header, extractContextsAndConvertToBytes(propagators));
1✔
713
    attributes.setHeader(grpcHeader);
1✔
714

715
    ParentClosePolicy parentClosePolicy = options.getParentClosePolicy();
1✔
716
    if (parentClosePolicy != null) {
1✔
717
      attributes.setParentClosePolicy(parentClosePolicy);
1✔
718
    }
719
    return new StartChildWorkflowExecutionParameters(attributes, options.getCancellationType());
1✔
720
  }
721

722
  private static Header extractContextsAndConvertToBytes(
723
      List<ContextPropagator> contextPropagators) {
724
    if (contextPropagators == null) {
1✔
725
      return null;
×
726
    }
727
    Map<String, Payload> result = new HashMap<>();
1✔
728
    for (ContextPropagator propagator : contextPropagators) {
1✔
729
      result.putAll(propagator.serializeContext(propagator.getCurrentContext()));
1✔
730
    }
1✔
731
    return new Header(result);
1✔
732
  }
733

734
  private static RuntimeException mapChildWorkflowException(
735
      Exception failure, DataConverter dataConverterWithChildWorkflowContext) {
736
    if (failure == null) {
1✔
737
      return null;
×
738
    }
739
    if (failure instanceof TemporalFailure) {
1✔
740
      ((TemporalFailure) failure).setDataConverter(dataConverterWithChildWorkflowContext);
1✔
741
    }
742
    if (failure instanceof CanceledFailure) {
1✔
743
      return (CanceledFailure) failure;
1✔
744
    }
745
    if (failure instanceof WorkflowException) {
1✔
746
      return (RuntimeException) failure;
×
747
    }
748
    if (failure instanceof ChildWorkflowFailure) {
1✔
749
      return (ChildWorkflowFailure) failure;
1✔
750
    }
751
    if (!(failure instanceof ChildWorkflowTaskFailedException)) {
1✔
752
      return new IllegalArgumentException("Unexpected exception type: ", failure);
×
753
    }
754
    ChildWorkflowTaskFailedException taskFailed = (ChildWorkflowTaskFailedException) failure;
1✔
755
    Throwable cause =
1✔
756
        dataConverterWithChildWorkflowContext.failureToException(
1✔
757
            taskFailed.getOriginalCauseFailure());
1✔
758
    ChildWorkflowFailure exception = taskFailed.getException();
1✔
759
    return new ChildWorkflowFailure(
1✔
760
        exception.getInitiatedEventId(),
1✔
761
        exception.getStartedEventId(),
1✔
762
        exception.getWorkflowType(),
1✔
763
        exception.getExecution(),
1✔
764
        exception.getNamespace(),
1✔
765
        exception.getRetryState(),
1✔
766
        cause);
767
  }
768

769
  @Override
770
  public Promise<Void> newTimer(Duration delay) {
771
    CompletablePromise<Void> p = Workflow.newPromise();
1✔
772
    Functions.Proc1<RuntimeException> cancellationHandler =
1✔
773
        replayContext.newTimer(
1✔
774
            delay,
775
            (e) ->
776
                runner.executeInWorkflowThread(
1✔
777
                    "timer-callback",
778
                    () -> {
779
                      if (e == null) {
1✔
780
                        p.complete(null);
1✔
781
                      } else {
782
                        p.completeExceptionally(e);
1✔
783
                      }
784
                    }));
1✔
785
    CancellationScope.current()
1✔
786
        .getCancellationRequest()
1✔
787
        .thenApply(
1✔
788
            (r) -> {
789
              cancellationHandler.apply(new CanceledFailure(r));
1✔
790
              return r;
1✔
791
            });
792
    return p;
1✔
793
  }
794

795
  @Override
796
  public <R> R sideEffect(Class<R> resultClass, Type resultType, Func<R> func) {
797
    try {
798
      CompletablePromise<Optional<Payloads>> result = Workflow.newPromise();
1✔
799
      replayContext.sideEffect(
1✔
800
          () -> {
801
            R r = func.apply();
1✔
802
            return dataConverterWithCurrentWorkflowContext.toPayloads(r);
1✔
803
          },
804
          (p) ->
805
              runner.executeInWorkflowThread(
1✔
806
                  "side-effect-callback", () -> result.complete(Objects.requireNonNull(p))));
1✔
807
      return dataConverterWithCurrentWorkflowContext.fromPayloads(
1✔
808
          0, result.get(), resultClass, resultType);
1✔
809
    } catch (Exception e) {
×
810
      // SideEffect cannot throw normal exception as it can lead to non-deterministic behavior. So
811
      // fail the workflow task by throwing an Error.
812
      throw new Error(e);
×
813
    }
814
  }
815

816
  @Override
817
  public <R> R mutableSideEffect(
818
      String id, Class<R> resultClass, Type resultType, BiPredicate<R, R> updated, Func<R> func) {
819
    try {
820
      return mutableSideEffectImpl(id, resultClass, resultType, updated, func);
1✔
821
    } catch (Exception e) {
×
822
      // MutableSideEffect cannot throw normal exception as it can lead to non-deterministic
823
      // behavior. So fail the workflow task by throwing an Error.
824
      throw new Error(e);
×
825
    }
826
  }
827

828
  private <R> R mutableSideEffectImpl(
829
      String id, Class<R> resultClass, Type resultType, BiPredicate<R, R> updated, Func<R> func) {
830
    CompletablePromise<Optional<Payloads>> result = Workflow.newPromise();
1✔
831
    AtomicReference<R> unserializedResult = new AtomicReference<>();
1✔
832
    replayContext.mutableSideEffect(
1✔
833
        id,
834
        (storedBinary) -> {
835
          Optional<R> stored =
1✔
836
              storedBinary.map(
1✔
837
                  (b) ->
838
                      dataConverterWithCurrentWorkflowContext.fromPayloads(
1✔
839
                          0, Optional.of(b), resultClass, resultType));
1✔
840
          R funcResult =
1✔
841
              Objects.requireNonNull(func.apply(), "mutableSideEffect function " + "returned null");
1✔
842
          if (!stored.isPresent() || updated.test(stored.get(), funcResult)) {
1✔
843
            unserializedResult.set(funcResult);
1✔
844
            return dataConverterWithCurrentWorkflowContext.toPayloads(funcResult);
1✔
845
          }
846
          return Optional.empty(); // returned only when value doesn't need to be updated
1✔
847
        },
848
        (p) ->
849
            runner.executeInWorkflowThread(
1✔
850
                "mutable-side-effect-callback", () -> result.complete(Objects.requireNonNull(p))));
1✔
851

852
    if (!result.get().isPresent()) {
1✔
853
      throw new IllegalArgumentException("No value found for mutableSideEffectId=" + id);
×
854
    }
855
    // An optimization that avoids unnecessary deserialization of the result.
856
    R unserialized = unserializedResult.get();
1✔
857
    if (unserialized != null) {
1✔
858
      return unserialized;
1✔
859
    }
860
    return dataConverterWithCurrentWorkflowContext.fromPayloads(
1✔
861
        0, result.get(), resultClass, resultType);
1✔
862
  }
863

864
  @Override
865
  public int getVersion(String changeId, int minSupported, int maxSupported) {
866
    CompletablePromise<Integer> result = Workflow.newPromise();
1✔
867
    replayContext.getVersion(
1✔
868
        changeId,
869
        minSupported,
870
        maxSupported,
871
        (v, e) ->
872
            runner.executeInWorkflowThread(
1✔
873
                "version-callback",
874
                () -> {
875
                  if (v != null) {
1✔
876
                    result.complete(v);
1✔
877
                  } else {
878
                    result.completeExceptionally(e);
1✔
879
                  }
880
                }));
1✔
881
    try {
882
      return result.get();
1✔
883
    } catch (UnsupportedVersion.UnsupportedVersionException ex) {
1✔
884
      throw new UnsupportedVersion(ex);
1✔
885
    }
886
  }
887

888
  @Override
889
  public void registerQuery(RegisterQueryInput request) {
890
    queryDispatcher.registerQueryHandlers(request);
1✔
891
  }
1✔
892

893
  @Override
894
  public void registerSignalHandlers(RegisterSignalHandlersInput input) {
895
    signalDispatcher.registerSignalHandlers(input);
1✔
896
  }
1✔
897

898
  @Override
899
  public void registerDynamicSignalHandler(RegisterDynamicSignalHandlerInput input) {
900
    signalDispatcher.registerDynamicSignalHandler(input);
1✔
901
  }
1✔
902

903
  @Override
904
  public void registerDynamicQueryHandler(RegisterDynamicQueryHandlerInput input) {
905
    queryDispatcher.registerDynamicQueryHandler(input);
1✔
906
  }
1✔
907

908
  @Override
909
  public UUID randomUUID() {
910
    return replayContext.randomUUID();
1✔
911
  }
912

913
  @Override
914
  public Random newRandom() {
915
    return replayContext.newRandom();
1✔
916
  }
917

918
  public DataConverter getDataConverter() {
919
    return dataConverter;
1✔
920
  }
921

922
  boolean isReplaying() {
923
    return replayContext.isReplaying();
1✔
924
  }
925

926
  @Override
927
  public ReplayWorkflowContext getReplayContext() {
928
    return replayContext;
1✔
929
  }
930

931
  @Override
932
  public SignalExternalOutput signalExternalWorkflow(SignalExternalInput input) {
933
    WorkflowExecution childExecution = input.getExecution();
1✔
934
    DataConverter dataConverterWithChildWorkflowContext =
1✔
935
        dataConverter.withContext(
1✔
936
            new WorkflowSerializationContext(
937
                replayContext.getNamespace(), childExecution.getWorkflowId()));
1✔
938
    SignalExternalWorkflowExecutionCommandAttributes.Builder attributes =
939
        SignalExternalWorkflowExecutionCommandAttributes.newBuilder();
1✔
940
    attributes.setSignalName(input.getSignalName());
1✔
941
    attributes.setExecution(childExecution);
1✔
942
    Optional<Payloads> payloads = dataConverterWithChildWorkflowContext.toPayloads(input.getArgs());
1✔
943
    payloads.ifPresent(attributes::setInput);
1✔
944
    CompletablePromise<Void> result = Workflow.newPromise();
1✔
945
    Functions.Proc1<Exception> cancellationCallback =
1✔
946
        replayContext.signalExternalWorkflowExecution(
1✔
947
            attributes,
948
            (output, failure) -> {
949
              if (failure != null) {
1✔
950
                runner.executeInWorkflowThread(
1✔
951
                    "child workflow failure callback",
952
                    () ->
953
                        result.completeExceptionally(
1✔
954
                            dataConverterWithChildWorkflowContext.failureToException(failure)));
1✔
955
              } else {
956
                runner.executeInWorkflowThread(
1✔
957
                    "child workflow completion callback", () -> result.complete(output));
1✔
958
              }
959
            });
1✔
960
    CancellationScope.current()
1✔
961
        .getCancellationRequest()
1✔
962
        .thenApply(
1✔
963
            (reason) -> {
964
              cancellationCallback.apply(new CanceledFailure(reason));
1✔
965
              return null;
1✔
966
            });
967
    return new SignalExternalOutput(result);
1✔
968
  }
969

970
  @Override
971
  public void sleep(Duration duration) {
972
    newTimer(duration).get();
1✔
973
  }
1✔
974

975
  @Override
976
  public boolean await(Duration timeout, String reason, Supplier<Boolean> unblockCondition) {
977
    Promise<Void> timer = newTimer(timeout);
1✔
978
    WorkflowThread.await(reason, () -> (timer.isCompleted() || unblockCondition.get()));
1✔
979
    return !timer.isCompleted();
1✔
980
  }
981

982
  @Override
983
  public void await(String reason, Supplier<Boolean> unblockCondition) {
984
    WorkflowThread.await(reason, unblockCondition);
1✔
985
  }
1✔
986

987
  @Override
988
  public void continueAsNew(ContinueAsNewInput input) {
989
    ContinueAsNewWorkflowExecutionCommandAttributes.Builder attributes =
990
        ContinueAsNewWorkflowExecutionCommandAttributes.newBuilder();
1✔
991
    String workflowType = input.getWorkflowType();
1✔
992
    if (workflowType != null) {
1✔
993
      attributes.setWorkflowType(WorkflowType.newBuilder().setName(workflowType));
1✔
994
    }
995
    @Nullable ContinueAsNewOptions options = input.getOptions();
1✔
996
    if (options != null) {
1✔
997
      if (options.getWorkflowRunTimeout() != null) {
1✔
998
        attributes.setWorkflowRunTimeout(
×
999
            ProtobufTimeUtils.toProtoDuration(options.getWorkflowRunTimeout()));
×
1000
      }
1001
      if (options.getWorkflowTaskTimeout() != null) {
1✔
1002
        attributes.setWorkflowTaskTimeout(
×
1003
            ProtobufTimeUtils.toProtoDuration(options.getWorkflowTaskTimeout()));
×
1004
      }
1005
      if (options.getTaskQueue() != null && !options.getTaskQueue().isEmpty()) {
1✔
1006
        attributes.setTaskQueue(TaskQueue.newBuilder().setName(options.getTaskQueue()));
1✔
1007
      }
1008
      Map<String, Object> searchAttributes = options.getSearchAttributes();
1✔
1009
      if (searchAttributes != null) {
1✔
1010
        attributes.setSearchAttributes(SearchAttributesUtil.encode(searchAttributes));
1✔
1011
      }
1012
      Map<String, Object> memo = options.getMemo();
1✔
1013
      if (memo != null) {
1✔
1014
        attributes.setMemo(
1✔
1015
            Memo.newBuilder()
1✔
1016
                .putAllFields(intoPayloadMap(dataConverterWithCurrentWorkflowContext, memo)));
1✔
1017
      }
1018
    }
1019

1020
    List<ContextPropagator> propagators =
1021
        options != null && options.getContextPropagators() != null
1✔
1022
            ? options.getContextPropagators()
×
1023
            : this.contextPropagators;
1✔
1024
    io.temporal.api.common.v1.Header grpcHeader =
1✔
1025
        toHeaderGrpc(input.getHeader(), extractContextsAndConvertToBytes(propagators));
1✔
1026
    attributes.setHeader(grpcHeader);
1✔
1027

1028
    Optional<Payloads> payloads =
1✔
1029
        dataConverterWithCurrentWorkflowContext.toPayloads(input.getArgs());
1✔
1030
    payloads.ifPresent(attributes::setInput);
1✔
1031

1032
    replayContext.continueAsNewOnCompletion(attributes.build());
1✔
1033
    WorkflowThread.exit();
×
1034
  }
×
1035

1036
  @Override
1037
  public CancelWorkflowOutput cancelWorkflow(CancelWorkflowInput input) {
1038
    CompletablePromise<Void> result = Workflow.newPromise();
×
1039
    replayContext.requestCancelExternalWorkflowExecution(
×
1040
        input.getExecution(),
×
1041
        (r, exception) -> {
1042
          if (exception == null) {
×
1043
            result.complete(null);
×
1044
          } else {
1045
            result.completeExceptionally(exception);
×
1046
          }
1047
        });
×
1048
    return new CancelWorkflowOutput(result);
×
1049
  }
1050

1051
  public Scope getMetricsScope() {
1052
    return replayContext.getMetricsScope();
1✔
1053
  }
1054

1055
  public boolean isLoggingEnabledInReplay() {
1056
    return replayContext.getEnableLoggingInReplay();
×
1057
  }
1058

1059
  @Override
1060
  public void upsertSearchAttributes(Map<String, ?> searchAttributes) {
1061
    Preconditions.checkArgument(searchAttributes != null, "null search attributes");
1✔
1062
    Preconditions.checkArgument(!searchAttributes.isEmpty(), "empty search attributes");
1✔
1063
    SearchAttributes attr = SearchAttributesUtil.encode(searchAttributes);
1✔
1064
    replayContext.upsertSearchAttributes(attr);
1✔
1065
  }
1✔
1066

1067
  @Nonnull
1068
  public Object newWorkflowMethodThreadIntercepted(Runnable runnable, @Nullable String name) {
1069
    return runner.newWorkflowThread(runnable, false, name);
1✔
1070
  }
1071

1072
  @Nonnull
1073
  public Object newWorkflowCallbackThreadIntercepted(Runnable runnable, @Nullable String name) {
1074
    return runner.newCallbackThread(runnable, name);
1✔
1075
  }
1076

1077
  @Override
1078
  public Object newChildThread(Runnable runnable, boolean detached, String name) {
1079
    return runner.newWorkflowThread(runnable, detached, name);
1✔
1080
  }
1081

1082
  @Override
1083
  public long currentTimeMillis() {
1084
    return replayContext.currentTimeMillis();
1✔
1085
  }
1086

1087
  /**
1088
   * This WorkflowInboundCallsInterceptor is used during creation of the initial root workflow
1089
   * thread and should be replaced with another specific implementation during initialization stage
1090
   * {@code workflow.initialize()} performed inside the workflow root thread.
1091
   *
1092
   * @see SyncWorkflow#start(HistoryEvent, ReplayWorkflowContext)
1093
   */
1094
  private static final class InitialWorkflowInboundCallsInterceptor
1095
      extends BaseRootWorkflowInboundCallsInterceptor {
1096

1097
    public InitialWorkflowInboundCallsInterceptor(SyncWorkflowContext workflowContext) {
1098
      super(workflowContext);
1✔
1099
    }
1✔
1100

1101
    @Override
1102
    public WorkflowOutput execute(WorkflowInput input) {
1103
      throw new UnsupportedOperationException(
×
1104
          "SyncWorkflowContext should be initialized with a non-initial WorkflowInboundCallsInterceptor "
1105
              + "before #execute can be called");
1106
    }
1107
  }
1108

1109
  @Nonnull
1110
  @Override
1111
  public WorkflowImplementationOptions getWorkflowImplementationOptions() {
1112
    return workflowImplementationOptions;
1✔
1113
  }
1114

1115
  @Override
1116
  public Failure mapWorkflowExceptionToFailure(Throwable failure) {
1117
    return dataConverterWithCurrentWorkflowContext.exceptionToFailure(failure);
1✔
1118
  }
1119

1120
  @Nullable
1121
  @Override
1122
  public <R> R getLastCompletionResult(Class<R> resultClass, Type resultType) {
1123
    return dataConverterWithCurrentWorkflowContext.fromPayloads(
1✔
1124
        0, Optional.ofNullable(replayContext.getLastCompletionResult()), resultClass, resultType);
1✔
1125
  }
1126

1127
  @Override
1128
  public List<ContextPropagator> getContextPropagators() {
1129
    return contextPropagators;
1✔
1130
  }
1131

1132
  @Override
1133
  public Map<String, Object> getPropagatedContexts() {
1134
    if (contextPropagators == null || contextPropagators.isEmpty()) {
1✔
1135
      return new HashMap<>();
1✔
1136
    }
1137

1138
    Map<String, Payload> headerData = new HashMap<>(replayContext.getHeader());
1✔
1139
    Map<String, Object> contextData = new HashMap<>();
1✔
1140
    for (ContextPropagator propagator : contextPropagators) {
1✔
1141
      contextData.put(propagator.getName(), propagator.deserializeContext(headerData));
1✔
1142
    }
1✔
1143

1144
    return contextData;
1✔
1145
  }
1146

1147
  /** Simple wrapper over a failure just to allow completing the CompletablePromise as a failure */
1148
  private static class FailureWrapperException extends RuntimeException {
1149
    private final Failure failure;
1150

1151
    public FailureWrapperException(Failure failure) {
1✔
1152
      this.failure = failure;
1✔
1153
    }
1✔
1154

1155
    public Failure getFailure() {
1156
      return failure;
1✔
1157
    }
1158
  }
1159
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc