• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #181

pending completion
#181

push

github-actions

web-flow
Properly wrap exceptions from schedule client (#1827)

Wrap schedule exception

37 of 37 new or added lines in 1 file covered. (100.0%)

18557 of 23894 relevant lines covered (77.66%)

0.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.55
/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.sync;
22

23
import static io.temporal.internal.common.HeaderUtils.intoPayloadMap;
24
import static io.temporal.internal.common.HeaderUtils.toHeaderGrpc;
25
import static io.temporal.internal.common.SerializerUtils.toRetryPolicy;
26
import static io.temporal.internal.sync.WorkflowInternal.DEFAULT_VERSION;
27

28
import com.google.common.base.MoreObjects;
29
import com.google.common.base.Preconditions;
30
import com.uber.m3.tally.Scope;
31
import io.temporal.activity.ActivityOptions;
32
import io.temporal.activity.LocalActivityOptions;
33
import io.temporal.api.command.v1.ContinueAsNewWorkflowExecutionCommandAttributes;
34
import io.temporal.api.command.v1.ScheduleActivityTaskCommandAttributes;
35
import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes;
36
import io.temporal.api.command.v1.StartChildWorkflowExecutionCommandAttributes;
37
import io.temporal.api.common.v1.ActivityType;
38
import io.temporal.api.common.v1.Memo;
39
import io.temporal.api.common.v1.Payload;
40
import io.temporal.api.common.v1.Payloads;
41
import io.temporal.api.common.v1.SearchAttributes;
42
import io.temporal.api.common.v1.WorkflowExecution;
43
import io.temporal.api.common.v1.WorkflowType;
44
import io.temporal.api.enums.v1.ParentClosePolicy;
45
import io.temporal.api.failure.v1.Failure;
46
import io.temporal.api.history.v1.HistoryEvent;
47
import io.temporal.api.taskqueue.v1.TaskQueue;
48
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
49
import io.temporal.client.WorkflowException;
50
import io.temporal.common.RetryOptions;
51
import io.temporal.common.SearchAttributeUpdate;
52
import io.temporal.common.context.ContextPropagator;
53
import io.temporal.common.converter.DataConverter;
54
import io.temporal.common.interceptors.Header;
55
import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor;
56
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
57
import io.temporal.failure.*;
58
import io.temporal.internal.common.ActivityOptionUtils;
59
import io.temporal.internal.common.OptionsUtils;
60
import io.temporal.internal.common.ProtobufTimeUtils;
61
import io.temporal.internal.common.SdkFlag;
62
import io.temporal.internal.common.SearchAttributesUtil;
63
import io.temporal.internal.replay.ChildWorkflowTaskFailedException;
64
import io.temporal.internal.replay.ReplayWorkflowContext;
65
import io.temporal.internal.replay.WorkflowContext;
66
import io.temporal.internal.statemachines.*;
67
import io.temporal.payload.context.ActivitySerializationContext;
68
import io.temporal.payload.context.WorkflowSerializationContext;
69
import io.temporal.worker.WorkflowImplementationOptions;
70
import io.temporal.workflow.CancellationScope;
71
import io.temporal.workflow.ChildWorkflowOptions;
72
import io.temporal.workflow.CompletablePromise;
73
import io.temporal.workflow.ContinueAsNewOptions;
74
import io.temporal.workflow.Functions;
75
import io.temporal.workflow.Functions.Func;
76
import io.temporal.workflow.Promise;
77
import io.temporal.workflow.Workflow;
78
import java.lang.reflect.Type;
79
import java.time.Duration;
80
import java.time.Instant;
81
import java.util.*;
82
import java.util.concurrent.atomic.AtomicBoolean;
83
import java.util.concurrent.atomic.AtomicReference;
84
import java.util.function.BiPredicate;
85
import java.util.function.Supplier;
86
import javax.annotation.Nonnull;
87
import javax.annotation.Nullable;
88
import org.slf4j.Logger;
89
import org.slf4j.LoggerFactory;
90

91
// TODO separate WorkflowOutboundCallsInterceptor functionality from this class into
92
// RootWorkflowOutboundInterceptor
93

94
/**
95
 * Root, the most top level WorkflowContext that unites all relevant contexts, handlers, options,
96
 * states, etc. It's created when SyncWorkflow which represents a context of a workflow type
97
 * definition on the worker meets ReplayWorkflowContext which contains information from the
98
 * WorkflowTask
99
 */
100
final class SyncWorkflowContext implements WorkflowContext, WorkflowOutboundCallsInterceptor {
101
  private static final Logger log = LoggerFactory.getLogger(SyncWorkflowContext.class);
1✔
102

103
  private final String namespace;
104
  private final WorkflowExecution workflowExecution;
105
  private final WorkflowImplementationOptions workflowImplementationOptions;
106
  private final DataConverter dataConverter;
107
  // to be used in this class, should not be passed down. Pass the original #dataConverter instead
108
  private final DataConverter dataConverterWithCurrentWorkflowContext;
109
  private final List<ContextPropagator> contextPropagators;
110
  private final SignalDispatcher signalDispatcher;
111
  private final QueryDispatcher queryDispatcher;
112
  private final UpdateDispatcher updateDispatcher;
113

114
  // initialized later when these entities are created
115
  private ReplayWorkflowContext replayContext;
116
  private DeterministicRunner runner;
117

118
  private WorkflowInboundCallsInterceptor headInboundInterceptor;
119
  private WorkflowOutboundCallsInterceptor headOutboundInterceptor;
120

121
  private ActivityOptions defaultActivityOptions = null;
1✔
122
  private Map<String, ActivityOptions> activityOptionsMap;
123
  private LocalActivityOptions defaultLocalActivityOptions = null;
1✔
124
  private Map<String, LocalActivityOptions> localActivityOptionsMap;
125
  private boolean readOnly = false;
1✔
126

127
  public SyncWorkflowContext(
128
      @Nonnull String namespace,
129
      @Nonnull WorkflowExecution workflowExecution,
130
      SignalDispatcher signalDispatcher,
131
      QueryDispatcher queryDispatcher,
132
      UpdateDispatcher updateDispatcher,
133
      @Nullable WorkflowImplementationOptions workflowImplementationOptions,
134
      DataConverter dataConverter,
135
      List<ContextPropagator> contextPropagators) {
1✔
136
    this.namespace = namespace;
1✔
137
    this.workflowExecution = workflowExecution;
1✔
138
    this.dataConverter = dataConverter;
1✔
139
    this.dataConverterWithCurrentWorkflowContext =
1✔
140
        dataConverter.withContext(
1✔
141
            new WorkflowSerializationContext(namespace, workflowExecution.getWorkflowId()));
1✔
142
    this.contextPropagators = contextPropagators;
1✔
143
    this.signalDispatcher = signalDispatcher;
1✔
144
    this.queryDispatcher = queryDispatcher;
1✔
145
    this.updateDispatcher = updateDispatcher;
1✔
146
    if (workflowImplementationOptions != null) {
1✔
147
      this.defaultActivityOptions = workflowImplementationOptions.getDefaultActivityOptions();
1✔
148
      this.activityOptionsMap = new HashMap<>(workflowImplementationOptions.getActivityOptions());
1✔
149
      this.defaultLocalActivityOptions =
1✔
150
          workflowImplementationOptions.getDefaultLocalActivityOptions();
1✔
151
      this.localActivityOptionsMap =
1✔
152
          new HashMap<>(workflowImplementationOptions.getLocalActivityOptions());
1✔
153
    }
154
    this.workflowImplementationOptions =
1✔
155
        workflowImplementationOptions == null
1✔
156
            ? WorkflowImplementationOptions.getDefaultInstance()
1✔
157
            : workflowImplementationOptions;
1✔
158
    // initial values for headInboundInterceptor and headOutboundInterceptor until they initialized
159
    // with actual interceptors through #initHeadInboundCallsInterceptor and
160
    // #initHeadOutboundCallsInterceptor during initialization phase.
161
    // See workflow.initialize() performed inside the workflow root thread inside
162
    // SyncWorkflow#start(HistoryEvent, ReplayWorkflowContext)
163
    this.headInboundInterceptor = new InitialWorkflowInboundCallsInterceptor(this);
1✔
164
    this.headOutboundInterceptor = this;
1✔
165
  }
1✔
166

167
  public void setReplayContext(ReplayWorkflowContext context) {
168
    this.replayContext = context;
1✔
169
  }
1✔
170

171
  /**
172
   * Using setter, as runner is initialized with this context, so it is not ready during
173
   * construction of this.
174
   */
175
  public void setRunner(DeterministicRunner runner) {
176
    this.runner = runner;
1✔
177
  }
1✔
178

179
  public DeterministicRunner getRunner() {
180
    return runner;
×
181
  }
182

183
  public WorkflowOutboundCallsInterceptor getWorkflowOutboundInterceptor() {
184
    return headOutboundInterceptor;
1✔
185
  }
186

187
  public WorkflowInboundCallsInterceptor getWorkflowInboundInterceptor() {
188
    return headInboundInterceptor;
1✔
189
  }
190

191
  public void initHeadOutboundCallsInterceptor(WorkflowOutboundCallsInterceptor head) {
192
    headOutboundInterceptor = head;
1✔
193
  }
1✔
194

195
  public void initHeadInboundCallsInterceptor(WorkflowInboundCallsInterceptor head) {
196
    headInboundInterceptor = head;
1✔
197
    signalDispatcher.setInboundCallsInterceptor(head);
1✔
198
    queryDispatcher.setInboundCallsInterceptor(head);
1✔
199
    updateDispatcher.setInboundCallsInterceptor(head);
1✔
200
  }
1✔
201

202
  public ActivityOptions getDefaultActivityOptions() {
203
    return defaultActivityOptions;
1✔
204
  }
205

206
  public @Nonnull Map<String, ActivityOptions> getActivityOptions() {
207
    return activityOptionsMap != null
1✔
208
        ? Collections.unmodifiableMap(activityOptionsMap)
1✔
209
        : Collections.emptyMap();
×
210
  }
211

212
  public LocalActivityOptions getDefaultLocalActivityOptions() {
213
    return defaultLocalActivityOptions;
1✔
214
  }
215

216
  public @Nonnull Map<String, LocalActivityOptions> getLocalActivityOptions() {
217
    return localActivityOptionsMap != null
1✔
218
        ? Collections.unmodifiableMap(localActivityOptionsMap)
1✔
219
        : Collections.emptyMap();
×
220
  }
221

222
  public void setDefaultActivityOptions(ActivityOptions defaultActivityOptions) {
223
    this.defaultActivityOptions =
1✔
224
        (this.defaultActivityOptions == null)
1✔
225
            ? defaultActivityOptions
×
226
            : this.defaultActivityOptions.toBuilder()
1✔
227
                .mergeActivityOptions(defaultActivityOptions)
1✔
228
                .build();
1✔
229
  }
1✔
230

231
  public void applyActivityOptions(Map<String, ActivityOptions> activityTypeToOption) {
232
    Objects.requireNonNull(activityTypeToOption);
1✔
233
    if (this.activityOptionsMap == null) {
1✔
234
      this.activityOptionsMap = new HashMap<>(activityTypeToOption);
×
235
      return;
×
236
    }
237
    ActivityOptionUtils.mergePredefinedActivityOptions(activityOptionsMap, activityTypeToOption);
1✔
238
  }
1✔
239

240
  public void setDefaultLocalActivityOptions(LocalActivityOptions defaultLocalActivityOptions) {
241
    this.defaultLocalActivityOptions =
1✔
242
        (this.defaultLocalActivityOptions == null)
1✔
243
            ? defaultLocalActivityOptions
×
244
            : this.defaultLocalActivityOptions.toBuilder()
1✔
245
                .mergeActivityOptions(defaultLocalActivityOptions)
1✔
246
                .build();
1✔
247
  }
1✔
248

249
  public void applyLocalActivityOptions(Map<String, LocalActivityOptions> activityTypeToOption) {
250
    Objects.requireNonNull(activityTypeToOption);
1✔
251
    if (this.localActivityOptionsMap == null) {
1✔
252
      this.localActivityOptionsMap = new HashMap<>(activityTypeToOption);
×
253
      return;
×
254
    }
255
    ActivityOptionUtils.mergePredefinedLocalActivityOptions(
1✔
256
        localActivityOptionsMap, activityTypeToOption);
257
  }
1✔
258

259
  @Override
260
  public <T> ActivityOutput<T> executeActivity(ActivityInput<T> input) {
261
    ActivitySerializationContext serializationContext =
1✔
262
        new ActivitySerializationContext(
263
            replayContext.getNamespace(),
1✔
264
            replayContext.getWorkflowId(),
1✔
265
            replayContext.getWorkflowType().getName(),
1✔
266
            input.getActivityName(),
1✔
267
            // input.getOptions().getTaskQueue() may be not specified, workflow task queue is used
268
            // by the Server in this case
269
            MoreObjects.firstNonNull(
1✔
270
                input.getOptions().getTaskQueue(), replayContext.getTaskQueue()),
1✔
271
            false);
272
    DataConverter dataConverterWithActivityContext =
1✔
273
        dataConverter.withContext(serializationContext);
1✔
274
    Optional<Payloads> args = dataConverterWithActivityContext.toPayloads(input.getArgs());
1✔
275

276
    ActivityOutput<Optional<Payloads>> output =
1✔
277
        executeActivityOnce(input.getActivityName(), input.getOptions(), input.getHeader(), args);
1✔
278

279
    return new ActivityOutput<>(
1✔
280
        output.getActivityId(),
1✔
281
        output
282
            .getResult()
1✔
283
            .handle(
1✔
284
                (r, f) -> {
285
                  if (f == null) {
1✔
286
                    return input.getResultType() != Void.TYPE
1✔
287
                        ? dataConverterWithActivityContext.fromPayloads(
1✔
288
                            0, r, input.getResultClass(), input.getResultType())
1✔
289
                        : null;
1✔
290
                  } else {
291
                    throw dataConverterWithActivityContext.failureToException(
1✔
292
                        ((FailureWrapperException) f).getFailure());
1✔
293
                  }
294
                }));
295
  }
296

297
  private ActivityOutput<Optional<Payloads>> executeActivityOnce(
298
      String activityTypeName, ActivityOptions options, Header header, Optional<Payloads> input) {
299
    ExecuteActivityParameters params =
1✔
300
        constructExecuteActivityParameters(activityTypeName, options, header, input);
1✔
301
    ActivityCallback callback = new ActivityCallback();
1✔
302
    ReplayWorkflowContext.ScheduleActivityTaskOutput activityOutput =
1✔
303
        replayContext.scheduleActivityTask(params, callback::invoke);
1✔
304
    CancellationScope.current()
1✔
305
        .getCancellationRequest()
1✔
306
        .thenApply(
1✔
307
            (reason) -> {
308
              activityOutput.getCancellationHandle().apply(new CanceledFailure(reason));
1✔
309
              return null;
1✔
310
            });
311
    return new ActivityOutput<>(activityOutput.getActivityId(), callback.result);
1✔
312
  }
313

314
  public void handleInterceptedSignal(WorkflowInboundCallsInterceptor.SignalInput input) {
315
    signalDispatcher.handleInterceptedSignal(input);
1✔
316
  }
1✔
317

318
  public void handleSignal(String signalName, Optional<Payloads> input, long eventId) {
319
    signalDispatcher.handleSignal(signalName, input, eventId);
1✔
320
  }
1✔
321

322
  public void handleValidateUpdate(String updateName, Optional<Payloads> input, long eventId) {
323
    updateDispatcher.handleValidateUpdate(updateName, input, eventId);
1✔
324
  }
1✔
325

326
  public Optional<Payloads> handleExecuteUpdate(
327
      String updateName, Optional<Payloads> input, long eventId) {
328
    return updateDispatcher.handleExecuteUpdate(updateName, input, eventId);
1✔
329
  }
330

331
  public void handleInterceptedValidateUpdate(WorkflowInboundCallsInterceptor.UpdateInput input) {
332
    updateDispatcher.handleInterceptedValidateUpdate(input);
1✔
333
  }
1✔
334

335
  public WorkflowInboundCallsInterceptor.UpdateOutput handleInterceptedExecuteUpdate(
336
      WorkflowInboundCallsInterceptor.UpdateInput input) {
337
    return updateDispatcher.handleInterceptedExecuteUpdate(input);
1✔
338
  }
339

340
  public WorkflowInboundCallsInterceptor.QueryOutput handleInterceptedQuery(
341
      WorkflowInboundCallsInterceptor.QueryInput input) {
342
    return queryDispatcher.handleInterceptedQuery(input);
1✔
343
  }
344

345
  public Optional<Payloads> handleQuery(String queryName, Optional<Payloads> input) {
346
    return queryDispatcher.handleQuery(queryName, input);
1✔
347
  }
348

349
  private class ActivityCallback {
1✔
350
    private final CompletablePromise<Optional<Payloads>> result = Workflow.newPromise();
1✔
351

352
    public void invoke(Optional<Payloads> output, Failure failure) {
353
      if (failure != null) {
1✔
354
        runner.executeInWorkflowThread(
1✔
355
            "activity failure callback",
356
            () -> result.completeExceptionally(new FailureWrapperException(failure)));
1✔
357
      } else {
358
        runner.executeInWorkflowThread(
1✔
359
            "activity completion callback", () -> result.complete(output));
1✔
360
      }
361
    }
1✔
362
  }
363

364
  private class LocalActivityCallbackImpl implements LocalActivityCallback {
1✔
365
    private final CompletablePromise<Optional<Payloads>> result = Workflow.newPromise();
1✔
366

367
    @Override
368
    public void apply(Optional<Payloads> successOutput, LocalActivityFailedException exception) {
369
      if (exception != null) {
1✔
370
        runner.executeInWorkflowThread(
1✔
371
            "local activity failure callback", () -> result.completeExceptionally(exception));
1✔
372
      } else {
373
        runner.executeInWorkflowThread(
1✔
374
            "local activity completion callback", () -> result.complete(successOutput));
1✔
375
      }
376
    }
1✔
377
  }
378

379
  @Override
380
  public <R> LocalActivityOutput<R> executeLocalActivity(LocalActivityInput<R> input) {
381
    ActivitySerializationContext serializationContext =
1✔
382
        new ActivitySerializationContext(
383
            replayContext.getNamespace(),
1✔
384
            replayContext.getWorkflowId(),
1✔
385
            replayContext.getWorkflowType().getName(),
1✔
386
            input.getActivityName(),
1✔
387
            replayContext.getTaskQueue(),
1✔
388
            true);
389
    DataConverter dataConverterWithActivityContext =
1✔
390
        dataConverter.withContext(serializationContext);
1✔
391
    Optional<Payloads> payloads = dataConverterWithActivityContext.toPayloads(input.getArgs());
1✔
392

393
    long originalScheduledTime = System.currentTimeMillis();
1✔
394
    CompletablePromise<Optional<Payloads>> serializedResult =
395
        WorkflowInternal.newCompletablePromise();
1✔
396
    executeLocalActivityOverLocalRetryThreshold(
1✔
397
        input.getActivityName(),
1✔
398
        input.getOptions(),
1✔
399
        input.getHeader(),
1✔
400
        payloads,
401
        originalScheduledTime,
402
        1,
403
        null,
404
        serializedResult);
405

406
    Promise<R> result =
1✔
407
        serializedResult.handle(
1✔
408
            (r, f) -> {
409
              if (f == null) {
1✔
410
                return input.getResultClass() != Void.TYPE
1✔
411
                    ? dataConverterWithActivityContext.fromPayloads(
1✔
412
                        0, r, input.getResultClass(), input.getResultType())
1✔
413
                    : null;
1✔
414
              } else {
415
                throw dataConverterWithActivityContext.failureToException(
1✔
416
                    ((LocalActivityCallback.LocalActivityFailedException) f).getFailure());
1✔
417
              }
418
            });
419

420
    return new LocalActivityOutput<>(result);
1✔
421
  }
422

423
  public void executeLocalActivityOverLocalRetryThreshold(
424
      String activityTypeName,
425
      LocalActivityOptions options,
426
      Header header,
427
      Optional<Payloads> input,
428
      long originalScheduledTime,
429
      int attempt,
430
      @Nullable Failure previousExecutionFailure,
431
      CompletablePromise<Optional<Payloads>> result) {
432
    CompletablePromise<Optional<Payloads>> localExecutionResult =
1✔
433
        executeLocalActivityLocally(
1✔
434
            activityTypeName,
435
            options,
436
            header,
437
            input,
438
            originalScheduledTime,
439
            attempt,
440
            previousExecutionFailure);
441

442
    localExecutionResult.handle(
1✔
443
        (r, e) -> {
444
          if (e == null) {
1✔
445
            result.complete(r);
1✔
446
          } else {
447
            if ((e instanceof LocalActivityCallback.LocalActivityFailedException)) {
1✔
448
              LocalActivityCallback.LocalActivityFailedException laException =
1✔
449
                  (LocalActivityCallback.LocalActivityFailedException) e;
450
              @Nullable Duration backoff = laException.getBackoff();
1✔
451
              if (backoff != null) {
1✔
452
                WorkflowInternal.newTimer(backoff)
1✔
453
                    .thenApply(
1✔
454
                        unused -> {
455
                          executeLocalActivityOverLocalRetryThreshold(
1✔
456
                              activityTypeName,
457
                              options,
458
                              header,
459
                              input,
460
                              originalScheduledTime,
461
                              laException.getLastAttempt() + 1,
1✔
462
                              laException.getFailure(),
1✔
463
                              result);
464
                          return null;
1✔
465
                        });
466
              } else {
467
                // final failure, report back
468
                result.completeExceptionally(laException);
1✔
469
              }
470
            } else {
1✔
471
              // Only LocalActivityFailedException is expected
472
              String exceptionMessage =
×
473
                  String.format(
×
474
                      "[BUG] Local Activity State Machine callback for activityType %s returned unexpected exception",
475
                      activityTypeName);
476
              log.warn(exceptionMessage, e);
×
477
              replayContext.failWorkflowTask(new IllegalStateException(exceptionMessage, e));
×
478
            }
479
          }
480
          return null;
1✔
481
        });
482
  }
1✔
483

484
  private CompletablePromise<Optional<Payloads>> executeLocalActivityLocally(
485
      String activityTypeName,
486
      LocalActivityOptions options,
487
      Header header,
488
      Optional<Payloads> input,
489
      long originalScheduledTime,
490
      int attempt,
491
      @Nullable Failure previousExecutionFailure) {
492

493
    LocalActivityCallbackImpl callback = new LocalActivityCallbackImpl();
1✔
494
    ExecuteLocalActivityParameters params =
1✔
495
        constructExecuteLocalActivityParameters(
1✔
496
            activityTypeName,
497
            options,
498
            header,
499
            input,
500
            attempt,
501
            originalScheduledTime,
502
            previousExecutionFailure);
503
    Functions.Proc cancellationCallback = replayContext.scheduleLocalActivityTask(params, callback);
1✔
504
    CancellationScope.current()
1✔
505
        .getCancellationRequest()
1✔
506
        .thenApply(
1✔
507
            (reason) -> {
508
              cancellationCallback.apply();
×
509
              return null;
×
510
            });
511
    return callback.result;
1✔
512
  }
513

514
  private ExecuteActivityParameters constructExecuteActivityParameters(
515
      String name, ActivityOptions options, Header header, Optional<Payloads> input) {
516
    String taskQueue = options.getTaskQueue();
1✔
517
    if (taskQueue == null) {
1✔
518
      taskQueue = replayContext.getTaskQueue();
1✔
519
    }
520
    ScheduleActivityTaskCommandAttributes.Builder attributes =
521
        ScheduleActivityTaskCommandAttributes.newBuilder()
1✔
522
            .setActivityType(ActivityType.newBuilder().setName(name))
1✔
523
            .setTaskQueue(TaskQueue.newBuilder().setName(taskQueue))
1✔
524
            .setScheduleToStartTimeout(
1✔
525
                ProtobufTimeUtils.toProtoDuration(options.getScheduleToStartTimeout()))
1✔
526
            .setStartToCloseTimeout(
1✔
527
                ProtobufTimeUtils.toProtoDuration(options.getStartToCloseTimeout()))
1✔
528
            .setScheduleToCloseTimeout(
1✔
529
                ProtobufTimeUtils.toProtoDuration(options.getScheduleToCloseTimeout()))
1✔
530
            .setHeartbeatTimeout(ProtobufTimeUtils.toProtoDuration(options.getHeartbeatTimeout()))
1✔
531
            .setRequestEagerExecution(
1✔
532
                !options.isEagerExecutionDisabled()
1✔
533
                    && Objects.equals(taskQueue, replayContext.getTaskQueue()));
1✔
534

535
    input.ifPresent(attributes::setInput);
1✔
536
    RetryOptions retryOptions = options.getRetryOptions();
1✔
537
    if (retryOptions != null) {
1✔
538
      attributes.setRetryPolicy(toRetryPolicy(retryOptions));
1✔
539
    }
540

541
    // Set the context value.  Use the context propagators from the ActivityOptions
542
    // if present, otherwise use the ones configured on the WorkflowContext
543
    List<ContextPropagator> propagators = options.getContextPropagators();
1✔
544
    if (propagators == null) {
1✔
545
      propagators = this.contextPropagators;
1✔
546
    }
547
    io.temporal.api.common.v1.Header grpcHeader =
1✔
548
        toHeaderGrpc(header, extractContextsAndConvertToBytes(propagators));
1✔
549
    attributes.setHeader(grpcHeader);
1✔
550

551
    if (options.getVersioningIntent() != null) {
1✔
552
      attributes.setUseCompatibleVersion(
1✔
553
          options
554
              .getVersioningIntent()
1✔
555
              .determineUseCompatibleFlag(
1✔
556
                  replayContext.getTaskQueue().equals(options.getTaskQueue())));
1✔
557
    }
558

559
    return new ExecuteActivityParameters(attributes, options.getCancellationType());
1✔
560
  }
561

562
  private ExecuteLocalActivityParameters constructExecuteLocalActivityParameters(
563
      String name,
564
      LocalActivityOptions options,
565
      Header header,
566
      Optional<Payloads> input,
567
      int attempt,
568
      long originalScheduledTime,
569
      @Nullable Failure previousExecutionFailure) {
570
    options = LocalActivityOptions.newBuilder(options).validateAndBuildWithDefaults();
1✔
571

572
    PollActivityTaskQueueResponse.Builder activityTask =
573
        PollActivityTaskQueueResponse.newBuilder()
1✔
574
            .setActivityId(this.replayContext.randomUUID().toString())
1✔
575
            .setWorkflowNamespace(this.replayContext.getNamespace())
1✔
576
            .setWorkflowType(this.replayContext.getWorkflowType())
1✔
577
            .setWorkflowExecution(this.replayContext.getWorkflowExecution())
1✔
578
            // used to pass scheduled time to the local activity code inside
579
            // ActivityExecutionContext#getInfo
580
            // setCurrentAttemptScheduledTime is called inside LocalActivityWorker before submitting
581
            // into the LA queue
582
            .setScheduledTime(
1✔
583
                ProtobufTimeUtils.toProtoTimestamp(Instant.ofEpochMilli(originalScheduledTime)))
1✔
584
            .setActivityType(ActivityType.newBuilder().setName(name))
1✔
585
            .setAttempt(attempt);
1✔
586

587
    Duration scheduleToCloseTimeout = options.getScheduleToCloseTimeout();
1✔
588
    if (scheduleToCloseTimeout != null) {
1✔
589
      activityTask.setScheduleToCloseTimeout(
1✔
590
          ProtobufTimeUtils.toProtoDuration(scheduleToCloseTimeout));
1✔
591
    }
592

593
    Duration startToCloseTimeout = options.getStartToCloseTimeout();
1✔
594
    if (startToCloseTimeout != null) {
1✔
595
      activityTask.setStartToCloseTimeout(ProtobufTimeUtils.toProtoDuration(startToCloseTimeout));
1✔
596
    }
597

598
    io.temporal.api.common.v1.Header grpcHeader =
1✔
599
        toHeaderGrpc(header, extractContextsAndConvertToBytes(contextPropagators));
1✔
600
    activityTask.setHeader(grpcHeader);
1✔
601
    input.ifPresent(activityTask::setInput);
1✔
602
    RetryOptions retryOptions = options.getRetryOptions();
1✔
603
    activityTask.setRetryPolicy(
1✔
604
        toRetryPolicy(RetryOptions.newBuilder(retryOptions).validateBuildWithDefaults()));
1✔
605
    Duration localRetryThreshold = options.getLocalRetryThreshold();
1✔
606
    if (localRetryThreshold == null) {
1✔
607
      localRetryThreshold = replayContext.getWorkflowTaskTimeout().multipliedBy(3);
1✔
608
    }
609

610
    return new ExecuteLocalActivityParameters(
1✔
611
        activityTask,
612
        options.getScheduleToStartTimeout(),
1✔
613
        originalScheduledTime,
614
        previousExecutionFailure,
615
        options.isDoNotIncludeArgumentsIntoMarker(),
1✔
616
        localRetryThreshold);
617
  }
618

619
  @Override
620
  public <R> ChildWorkflowOutput<R> executeChildWorkflow(ChildWorkflowInput<R> input) {
621
    if (CancellationScope.current().isCancelRequested()) {
1✔
622
      CanceledFailure canceledFailure = new CanceledFailure("execute called from a canceled scope");
×
623
      return new ChildWorkflowOutput<>(
×
624
          Workflow.newFailedPromise(canceledFailure), Workflow.newFailedPromise(canceledFailure));
×
625
    }
626

627
    CompletablePromise<WorkflowExecution> executionPromise = Workflow.newPromise();
1✔
628
    CompletablePromise<Optional<Payloads>> resultPromise = Workflow.newPromise();
1✔
629

630
    DataConverter dataConverterWithChildWorkflowContext =
1✔
631
        dataConverter.withContext(
1✔
632
            new WorkflowSerializationContext(replayContext.getNamespace(), input.getWorkflowId()));
1✔
633
    Optional<Payloads> payloads = dataConverterWithChildWorkflowContext.toPayloads(input.getArgs());
1✔
634

635
    @Nullable
636
    Memo memo =
637
        (input.getOptions().getMemo() != null)
1✔
638
            ? Memo.newBuilder()
1✔
639
                .putAllFields(
1✔
640
                    intoPayloadMap(
1✔
641
                        dataConverterWithChildWorkflowContext, input.getOptions().getMemo()))
1✔
642
                .build()
1✔
643
            : null;
1✔
644

645
    StartChildWorkflowExecutionParameters parameters =
1✔
646
        createChildWorkflowParameters(
1✔
647
            input.getWorkflowId(),
1✔
648
            input.getWorkflowType(),
1✔
649
            input.getOptions(),
1✔
650
            input.getHeader(),
1✔
651
            payloads,
652
            memo);
653

654
    Functions.Proc1<Exception> cancellationCallback =
1✔
655
        replayContext.startChildWorkflow(
1✔
656
            parameters,
657
            (execution, failure) -> {
658
              if (failure != null) {
1✔
659
                runner.executeInWorkflowThread(
1✔
660
                    "child workflow start failed callback",
661
                    () ->
662
                        executionPromise.completeExceptionally(
1✔
663
                            mapChildWorkflowException(
1✔
664
                                failure, dataConverterWithChildWorkflowContext)));
665
              } else {
666
                runner.executeInWorkflowThread(
1✔
667
                    "child workflow started callback", () -> executionPromise.complete(execution));
1✔
668
              }
669
            },
1✔
670
            (result, failure) -> {
671
              if (failure != null) {
1✔
672
                runner.executeInWorkflowThread(
1✔
673
                    "child workflow failure callback",
674
                    () ->
675
                        resultPromise.completeExceptionally(
1✔
676
                            mapChildWorkflowException(
1✔
677
                                failure, dataConverterWithChildWorkflowContext)));
678
              } else {
679
                runner.executeInWorkflowThread(
1✔
680
                    "child workflow completion callback", () -> resultPromise.complete(result));
1✔
681
              }
682
            });
1✔
683
    AtomicBoolean callbackCalled = new AtomicBoolean();
1✔
684
    CancellationScope.current()
1✔
685
        .getCancellationRequest()
1✔
686
        .thenApply(
1✔
687
            (reason) -> {
688
              if (!callbackCalled.getAndSet(true)) {
1✔
689
                cancellationCallback.apply(new CanceledFailure(reason));
1✔
690
              }
691
              return null;
1✔
692
            });
693

694
    Promise<R> result =
1✔
695
        resultPromise.thenApply(
1✔
696
            (b) ->
697
                dataConverterWithChildWorkflowContext.fromPayloads(
1✔
698
                    0, b, input.getResultClass(), input.getResultType()));
1✔
699
    return new ChildWorkflowOutput<>(result, executionPromise);
1✔
700
  }
701

702
  @SuppressWarnings("deprecation")
703
  private StartChildWorkflowExecutionParameters createChildWorkflowParameters(
704
      String workflowId,
705
      String name,
706
      ChildWorkflowOptions options,
707
      Header header,
708
      Optional<Payloads> input,
709
      @Nullable Memo memo) {
710
    final StartChildWorkflowExecutionCommandAttributes.Builder attributes =
711
        StartChildWorkflowExecutionCommandAttributes.newBuilder()
1✔
712
            .setWorkflowType(WorkflowType.newBuilder().setName(name).build());
1✔
713
    attributes.setWorkflowId(workflowId);
1✔
714
    attributes.setNamespace(OptionsUtils.safeGet(options.getNamespace()));
1✔
715
    input.ifPresent(attributes::setInput);
1✔
716
    attributes.setWorkflowRunTimeout(
1✔
717
        ProtobufTimeUtils.toProtoDuration(options.getWorkflowRunTimeout()));
1✔
718
    attributes.setWorkflowExecutionTimeout(
1✔
719
        ProtobufTimeUtils.toProtoDuration(options.getWorkflowExecutionTimeout()));
1✔
720
    attributes.setWorkflowTaskTimeout(
1✔
721
        ProtobufTimeUtils.toProtoDuration(options.getWorkflowTaskTimeout()));
1✔
722
    String taskQueue = options.getTaskQueue();
1✔
723
    if (taskQueue != null) {
1✔
724
      attributes.setTaskQueue(TaskQueue.newBuilder().setName(taskQueue));
1✔
725
    }
726
    if (options.getWorkflowIdReusePolicy() != null) {
1✔
727
      attributes.setWorkflowIdReusePolicy(options.getWorkflowIdReusePolicy());
1✔
728
    }
729
    RetryOptions retryOptions = options.getRetryOptions();
1✔
730
    if (retryOptions != null) {
1✔
731
      attributes.setRetryPolicy(toRetryPolicy(retryOptions));
1✔
732
    }
733
    attributes.setCronSchedule(OptionsUtils.safeGet(options.getCronSchedule()));
1✔
734

735
    if (memo != null) {
1✔
736
      attributes.setMemo(memo);
1✔
737
    }
738

739
    Map<String, Object> searchAttributes = options.getSearchAttributes();
1✔
740
    if (searchAttributes != null && !searchAttributes.isEmpty()) {
1✔
741
      if (options.getTypedSearchAttributes() != null) {
1✔
742
        throw new IllegalArgumentException(
×
743
            "Cannot have both typed search attributes and search attributes");
744
      }
745
      attributes.setSearchAttributes(SearchAttributesUtil.encode(searchAttributes));
1✔
746
    } else if (options.getTypedSearchAttributes() != null) {
1✔
747
      attributes.setSearchAttributes(
1✔
748
          SearchAttributesUtil.encodeTyped(options.getTypedSearchAttributes()));
1✔
749
    }
750

751
    List<ContextPropagator> propagators = options.getContextPropagators();
1✔
752
    if (propagators == null) {
1✔
753
      propagators = this.contextPropagators;
1✔
754
    }
755
    io.temporal.api.common.v1.Header grpcHeader =
1✔
756
        toHeaderGrpc(header, extractContextsAndConvertToBytes(propagators));
1✔
757
    attributes.setHeader(grpcHeader);
1✔
758

759
    ParentClosePolicy parentClosePolicy = options.getParentClosePolicy();
1✔
760
    if (parentClosePolicy != null) {
1✔
761
      attributes.setParentClosePolicy(parentClosePolicy);
1✔
762
    }
763

764
    if (options.getVersioningIntent() != null) {
1✔
765
      attributes.setUseCompatibleVersion(
1✔
766
          options
767
              .getVersioningIntent()
1✔
768
              .determineUseCompatibleFlag(
1✔
769
                  replayContext.getTaskQueue().equals(options.getTaskQueue())));
1✔
770
    }
771
    return new StartChildWorkflowExecutionParameters(attributes, options.getCancellationType());
1✔
772
  }
773

774
  private static Header extractContextsAndConvertToBytes(
775
      List<ContextPropagator> contextPropagators) {
776
    if (contextPropagators == null) {
1✔
777
      return null;
×
778
    }
779
    Map<String, Payload> result = new HashMap<>();
1✔
780
    for (ContextPropagator propagator : contextPropagators) {
1✔
781
      result.putAll(propagator.serializeContext(propagator.getCurrentContext()));
1✔
782
    }
1✔
783
    return new Header(result);
1✔
784
  }
785

786
  private static RuntimeException mapChildWorkflowException(
787
      Exception failure, DataConverter dataConverterWithChildWorkflowContext) {
788
    if (failure == null) {
1✔
789
      return null;
×
790
    }
791
    if (failure instanceof TemporalFailure) {
1✔
792
      ((TemporalFailure) failure).setDataConverter(dataConverterWithChildWorkflowContext);
1✔
793
    }
794
    if (failure instanceof CanceledFailure) {
1✔
795
      return (CanceledFailure) failure;
1✔
796
    }
797
    if (failure instanceof WorkflowException) {
1✔
798
      return (RuntimeException) failure;
×
799
    }
800
    if (failure instanceof ChildWorkflowFailure) {
1✔
801
      return (ChildWorkflowFailure) failure;
1✔
802
    }
803
    if (!(failure instanceof ChildWorkflowTaskFailedException)) {
1✔
804
      return new IllegalArgumentException("Unexpected exception type: ", failure);
×
805
    }
806
    ChildWorkflowTaskFailedException taskFailed = (ChildWorkflowTaskFailedException) failure;
1✔
807
    Throwable cause =
1✔
808
        dataConverterWithChildWorkflowContext.failureToException(
1✔
809
            taskFailed.getOriginalCauseFailure());
1✔
810
    ChildWorkflowFailure exception = taskFailed.getException();
1✔
811
    return new ChildWorkflowFailure(
1✔
812
        exception.getInitiatedEventId(),
1✔
813
        exception.getStartedEventId(),
1✔
814
        exception.getWorkflowType(),
1✔
815
        exception.getExecution(),
1✔
816
        exception.getNamespace(),
1✔
817
        exception.getRetryState(),
1✔
818
        cause);
819
  }
820

821
  @Override
822
  public Promise<Void> newTimer(Duration delay) {
823
    CompletablePromise<Void> p = Workflow.newPromise();
1✔
824
    Functions.Proc1<RuntimeException> cancellationHandler =
1✔
825
        replayContext.newTimer(
1✔
826
            delay,
827
            (e) ->
828
                runner.executeInWorkflowThread(
1✔
829
                    "timer-callback",
830
                    () -> {
831
                      if (e == null) {
1✔
832
                        p.complete(null);
1✔
833
                      } else {
834
                        p.completeExceptionally(e);
1✔
835
                      }
836
                    }));
1✔
837
    CancellationScope.current()
1✔
838
        .getCancellationRequest()
1✔
839
        .thenApply(
1✔
840
            (r) -> {
841
              cancellationHandler.apply(new CanceledFailure(r));
1✔
842
              return r;
1✔
843
            });
844
    return p;
1✔
845
  }
846

847
  @Override
848
  public <R> R sideEffect(Class<R> resultClass, Type resultType, Func<R> func) {
849
    try {
850
      CompletablePromise<Optional<Payloads>> result = Workflow.newPromise();
1✔
851
      replayContext.sideEffect(
1✔
852
          () -> {
853
            try {
854
              readOnly = true;
1✔
855
              R r = func.apply();
1✔
856
              return dataConverterWithCurrentWorkflowContext.toPayloads(r);
1✔
857
            } finally {
858
              readOnly = false;
1✔
859
            }
860
          },
861
          (p) ->
862
              runner.executeInWorkflowThread(
1✔
863
                  "side-effect-callback", () -> result.complete(Objects.requireNonNull(p))));
1✔
864
      return dataConverterWithCurrentWorkflowContext.fromPayloads(
1✔
865
          0, result.get(), resultClass, resultType);
1✔
866
    } catch (Exception e) {
1✔
867
      // SideEffect cannot throw normal exception as it can lead to non-deterministic behavior. So
868
      // fail the workflow task by throwing an Error.
869
      throw new Error(e);
1✔
870
    }
871
  }
872

873
  @Override
874
  public <R> R mutableSideEffect(
875
      String id, Class<R> resultClass, Type resultType, BiPredicate<R, R> updated, Func<R> func) {
876
    try {
877
      return mutableSideEffectImpl(id, resultClass, resultType, updated, func);
1✔
878
    } catch (Exception e) {
1✔
879
      // MutableSideEffect cannot throw normal exception as it can lead to non-deterministic
880
      // behavior. So fail the workflow task by throwing an Error.
881
      throw new Error(e);
1✔
882
    }
883
  }
884

885
  private <R> R mutableSideEffectImpl(
886
      String id, Class<R> resultClass, Type resultType, BiPredicate<R, R> updated, Func<R> func) {
887
    CompletablePromise<Optional<Payloads>> result = Workflow.newPromise();
1✔
888
    AtomicReference<R> unserializedResult = new AtomicReference<>();
1✔
889
    replayContext.mutableSideEffect(
1✔
890
        id,
891
        (storedBinary) -> {
892
          Optional<R> stored =
1✔
893
              storedBinary.map(
1✔
894
                  (b) ->
895
                      dataConverterWithCurrentWorkflowContext.fromPayloads(
1✔
896
                          0, Optional.of(b), resultClass, resultType));
1✔
897
          try {
898
            readOnly = true;
1✔
899
            R funcResult =
1✔
900
                Objects.requireNonNull(
1✔
901
                    func.apply(), "mutableSideEffect function " + "returned null");
1✔
902
            if (!stored.isPresent() || updated.test(stored.get(), funcResult)) {
1✔
903
              unserializedResult.set(funcResult);
1✔
904
              return dataConverterWithCurrentWorkflowContext.toPayloads(funcResult);
1✔
905
            }
906
            return Optional.empty(); // returned only when value doesn't need to be updated
1✔
907
          } finally {
908
            readOnly = false;
1✔
909
          }
910
        },
911
        (p) ->
912
            runner.executeInWorkflowThread(
1✔
913
                "mutable-side-effect-callback", () -> result.complete(Objects.requireNonNull(p))));
1✔
914

915
    if (!result.get().isPresent()) {
1✔
916
      throw new IllegalArgumentException("No value found for mutableSideEffectId=" + id);
×
917
    }
918
    // An optimization that avoids unnecessary deserialization of the result.
919
    R unserialized = unserializedResult.get();
1✔
920
    if (unserialized != null) {
1✔
921
      return unserialized;
1✔
922
    }
923
    return dataConverterWithCurrentWorkflowContext.fromPayloads(
1✔
924
        0, result.get(), resultClass, resultType);
1✔
925
  }
926

927
  @Override
928
  public int getVersion(String changeId, int minSupported, int maxSupported) {
929
    CompletablePromise<Integer> result = Workflow.newPromise();
1✔
930
    boolean markerExists =
1✔
931
        replayContext.getVersion(
1✔
932
            changeId,
933
            minSupported,
934
            maxSupported,
935
            (v, e) ->
936
                runner.executeInWorkflowThread(
1✔
937
                    "version-callback",
938
                    () -> {
939
                      if (v != null) {
1✔
940
                        result.complete(v);
1✔
941
                      } else {
942
                        result.completeExceptionally(e);
1✔
943
                      }
944
                    }));
1✔
945
    /*
946
     * If we are replaying a workflow and encounter a getVersion call it is possible that this call did not exist
947
     * on the original execution. If the call did not exist on the original execution then we cannot block on results
948
     * because it can lead to non-deterministic scheduling.
949
     * */
950
    if (replayContext.isReplaying()
1✔
951
        && !markerExists
952
        && replayContext.tryUseSdkFlag(SdkFlag.SKIP_YIELD_ON_DEFAULT_VERSION)
1✔
953
        && minSupported == DEFAULT_VERSION) {
954
      return DEFAULT_VERSION;
1✔
955
    }
956

957
    try {
958
      return result.get();
1✔
959
    } catch (UnsupportedVersion.UnsupportedVersionException ex) {
1✔
960
      throw new UnsupportedVersion(ex);
1✔
961
    }
962
  }
963

964
  @Override
965
  public void registerQuery(RegisterQueryInput request) {
966
    queryDispatcher.registerQueryHandlers(request);
1✔
967
  }
1✔
968

969
  @Override
970
  public void registerSignalHandlers(RegisterSignalHandlersInput input) {
971
    signalDispatcher.registerSignalHandlers(input);
1✔
972
  }
1✔
973

974
  @Override
975
  public void registerUpdateHandlers(RegisterUpdateHandlersInput input) {
976
    updateDispatcher.registerUpdateHandlers(input);
1✔
977
  }
1✔
978

979
  @Override
980
  public void registerDynamicSignalHandler(RegisterDynamicSignalHandlerInput input) {
981
    signalDispatcher.registerDynamicSignalHandler(input);
1✔
982
  }
1✔
983

984
  @Override
985
  public void registerDynamicQueryHandler(RegisterDynamicQueryHandlerInput input) {
986
    queryDispatcher.registerDynamicQueryHandler(input);
1✔
987
  }
1✔
988

989
  @Override
990
  public void registerDynamicUpdateHandler(RegisterDynamicUpdateHandlerInput input) {
991
    updateDispatcher.registerDynamicUpdateHandler(input);
×
992
  }
×
993

994
  @Override
995
  public UUID randomUUID() {
996
    return replayContext.randomUUID();
1✔
997
  }
998

999
  @Override
1000
  public Random newRandom() {
1001
    return replayContext.newRandom();
1✔
1002
  }
1003

1004
  public DataConverter getDataConverter() {
1005
    return dataConverter;
1✔
1006
  }
1007

1008
  boolean isReplaying() {
1009
    return replayContext.isReplaying();
1✔
1010
  }
1011

1012
  boolean isReadOnly() {
1013
    return readOnly;
1✔
1014
  }
1015

1016
  void setReadOnly(boolean readOnly) {
1017
    this.readOnly = readOnly;
1✔
1018
  }
1✔
1019

1020
  @Override
1021
  public ReplayWorkflowContext getReplayContext() {
1022
    return replayContext;
1✔
1023
  }
1024

1025
  @Override
1026
  public SignalExternalOutput signalExternalWorkflow(SignalExternalInput input) {
1027
    WorkflowExecution childExecution = input.getExecution();
1✔
1028
    DataConverter dataConverterWithChildWorkflowContext =
1✔
1029
        dataConverter.withContext(
1✔
1030
            new WorkflowSerializationContext(
1031
                replayContext.getNamespace(), childExecution.getWorkflowId()));
1✔
1032
    SignalExternalWorkflowExecutionCommandAttributes.Builder attributes =
1033
        SignalExternalWorkflowExecutionCommandAttributes.newBuilder();
1✔
1034
    attributes.setSignalName(input.getSignalName());
1✔
1035
    attributes.setExecution(childExecution);
1✔
1036
    Optional<Payloads> payloads = dataConverterWithChildWorkflowContext.toPayloads(input.getArgs());
1✔
1037
    payloads.ifPresent(attributes::setInput);
1✔
1038
    CompletablePromise<Void> result = Workflow.newPromise();
1✔
1039
    Functions.Proc1<Exception> cancellationCallback =
1✔
1040
        replayContext.signalExternalWorkflowExecution(
1✔
1041
            attributes,
1042
            (output, failure) -> {
1043
              if (failure != null) {
1✔
1044
                runner.executeInWorkflowThread(
1✔
1045
                    "child workflow failure callback",
1046
                    () ->
1047
                        result.completeExceptionally(
1✔
1048
                            dataConverterWithChildWorkflowContext.failureToException(failure)));
1✔
1049
              } else {
1050
                runner.executeInWorkflowThread(
1✔
1051
                    "child workflow completion callback", () -> result.complete(output));
1✔
1052
              }
1053
            });
1✔
1054
    CancellationScope.current()
1✔
1055
        .getCancellationRequest()
1✔
1056
        .thenApply(
1✔
1057
            (reason) -> {
1058
              cancellationCallback.apply(new CanceledFailure(reason));
1✔
1059
              return null;
1✔
1060
            });
1061
    return new SignalExternalOutput(result);
1✔
1062
  }
1063

1064
  @Override
1065
  public void sleep(Duration duration) {
1066
    newTimer(duration).get();
1✔
1067
  }
1✔
1068

1069
  @Override
1070
  public boolean await(Duration timeout, String reason, Supplier<Boolean> unblockCondition) {
1071
    Promise<Void> timer = newTimer(timeout);
1✔
1072
    WorkflowThread.await(reason, () -> (timer.isCompleted() || unblockCondition.get()));
1✔
1073
    return !timer.isCompleted();
1✔
1074
  }
1075

1076
  @Override
1077
  public void await(String reason, Supplier<Boolean> unblockCondition) {
1078
    WorkflowThread.await(reason, unblockCondition);
1✔
1079
  }
1✔
1080

1081
  @SuppressWarnings("deprecation")
1082
  @Override
1083
  public void continueAsNew(ContinueAsNewInput input) {
1084
    ContinueAsNewWorkflowExecutionCommandAttributes.Builder attributes =
1085
        ContinueAsNewWorkflowExecutionCommandAttributes.newBuilder();
1✔
1086
    String workflowType = input.getWorkflowType();
1✔
1087
    if (workflowType != null) {
1✔
1088
      attributes.setWorkflowType(WorkflowType.newBuilder().setName(workflowType));
1✔
1089
    }
1090
    @Nullable ContinueAsNewOptions options = input.getOptions();
1✔
1091
    if (options != null) {
1✔
1092
      if (options.getWorkflowRunTimeout() != null) {
1✔
1093
        attributes.setWorkflowRunTimeout(
×
1094
            ProtobufTimeUtils.toProtoDuration(options.getWorkflowRunTimeout()));
×
1095
      }
1096
      if (options.getWorkflowTaskTimeout() != null) {
1✔
1097
        attributes.setWorkflowTaskTimeout(
×
1098
            ProtobufTimeUtils.toProtoDuration(options.getWorkflowTaskTimeout()));
×
1099
      }
1100
      if (options.getTaskQueue() != null && !options.getTaskQueue().isEmpty()) {
1✔
1101
        attributes.setTaskQueue(TaskQueue.newBuilder().setName(options.getTaskQueue()));
1✔
1102
      }
1103
      Map<String, Object> searchAttributes = options.getSearchAttributes();
1✔
1104
      if (searchAttributes != null && !searchAttributes.isEmpty()) {
1✔
1105
        if (options.getTypedSearchAttributes() != null) {
×
1106
          throw new IllegalArgumentException(
×
1107
              "Cannot have typed search attributes and search attributes");
1108
        }
1109
        attributes.setSearchAttributes(SearchAttributesUtil.encode(searchAttributes));
×
1110
      } else if (options.getTypedSearchAttributes() != null) {
1✔
1111
        attributes.setSearchAttributes(
1✔
1112
            SearchAttributesUtil.encodeTyped(options.getTypedSearchAttributes()));
1✔
1113
      }
1114
      Map<String, Object> memo = options.getMemo();
1✔
1115
      if (memo != null) {
1✔
1116
        attributes.setMemo(
1✔
1117
            Memo.newBuilder()
1✔
1118
                .putAllFields(intoPayloadMap(dataConverterWithCurrentWorkflowContext, memo)));
1✔
1119
      }
1120
      if (options.getVersioningIntent() != null) {
1✔
1121
        attributes.setUseCompatibleVersion(
×
1122
            options
1123
                .getVersioningIntent()
×
1124
                .determineUseCompatibleFlag(
×
1125
                    replayContext.getTaskQueue().equals(options.getTaskQueue())));
×
1126
      }
1127
    }
1128

1129
    List<ContextPropagator> propagators =
1130
        options != null && options.getContextPropagators() != null
1✔
1131
            ? options.getContextPropagators()
×
1132
            : this.contextPropagators;
1✔
1133
    io.temporal.api.common.v1.Header grpcHeader =
1✔
1134
        toHeaderGrpc(input.getHeader(), extractContextsAndConvertToBytes(propagators));
1✔
1135
    attributes.setHeader(grpcHeader);
1✔
1136

1137
    Optional<Payloads> payloads =
1✔
1138
        dataConverterWithCurrentWorkflowContext.toPayloads(input.getArgs());
1✔
1139
    payloads.ifPresent(attributes::setInput);
1✔
1140

1141
    replayContext.continueAsNewOnCompletion(attributes.build());
1✔
1142
    WorkflowThread.exit();
×
1143
  }
×
1144

1145
  @Override
1146
  public CancelWorkflowOutput cancelWorkflow(CancelWorkflowInput input) {
1147
    CompletablePromise<Void> result = Workflow.newPromise();
×
1148
    replayContext.requestCancelExternalWorkflowExecution(
×
1149
        input.getExecution(),
×
1150
        (r, exception) -> {
1151
          if (exception == null) {
×
1152
            result.complete(null);
×
1153
          } else {
1154
            result.completeExceptionally(exception);
×
1155
          }
1156
        });
×
1157
    return new CancelWorkflowOutput(result);
×
1158
  }
1159

1160
  public Scope getMetricsScope() {
1161
    return replayContext.getMetricsScope();
1✔
1162
  }
1163

1164
  public boolean isLoggingEnabledInReplay() {
1165
    return replayContext.getEnableLoggingInReplay();
×
1166
  }
1167

1168
  @Override
1169
  public void upsertSearchAttributes(Map<String, ?> searchAttributes) {
1170
    Preconditions.checkArgument(searchAttributes != null, "null search attributes");
1✔
1171
    Preconditions.checkArgument(!searchAttributes.isEmpty(), "empty search attributes");
1✔
1172
    SearchAttributes attr = SearchAttributesUtil.encode(searchAttributes);
1✔
1173
    replayContext.upsertSearchAttributes(attr);
1✔
1174
  }
1✔
1175

1176
  @Override
1177
  public void upsertTypedSearchAttributes(SearchAttributeUpdate<?>... searchAttributeUpdates) {
1178
    SearchAttributes attr = SearchAttributesUtil.encodeTypedUpdates(searchAttributeUpdates);
1✔
1179
    replayContext.upsertSearchAttributes(attr);
1✔
1180
  }
1✔
1181

1182
  @Nonnull
1183
  public Object newWorkflowMethodThreadIntercepted(Runnable runnable, @Nullable String name) {
1184
    return runner.newWorkflowThread(runnable, false, name);
1✔
1185
  }
1186

1187
  @Nonnull
1188
  public Object newWorkflowCallbackThreadIntercepted(Runnable runnable, @Nullable String name) {
1189
    return runner.newCallbackThread(runnable, name);
1✔
1190
  }
1191

1192
  @Override
1193
  public Object newChildThread(Runnable runnable, boolean detached, String name) {
1194
    return runner.newWorkflowThread(runnable, detached, name);
1✔
1195
  }
1196

1197
  @Override
1198
  public long currentTimeMillis() {
1199
    return replayContext.currentTimeMillis();
1✔
1200
  }
1201

1202
  /**
1203
   * This WorkflowInboundCallsInterceptor is used during creation of the initial root workflow
1204
   * thread and should be replaced with another specific implementation during initialization stage
1205
   * {@code workflow.initialize()} performed inside the workflow root thread.
1206
   *
1207
   * @see SyncWorkflow#start(HistoryEvent, ReplayWorkflowContext)
1208
   */
1209
  private static final class InitialWorkflowInboundCallsInterceptor
1210
      extends BaseRootWorkflowInboundCallsInterceptor {
1211

1212
    public InitialWorkflowInboundCallsInterceptor(SyncWorkflowContext workflowContext) {
1213
      super(workflowContext);
1✔
1214
    }
1✔
1215

1216
    @Override
1217
    public WorkflowOutput execute(WorkflowInput input) {
1218
      throw new UnsupportedOperationException(
×
1219
          "SyncWorkflowContext should be initialized with a non-initial WorkflowInboundCallsInterceptor "
1220
              + "before #execute can be called");
1221
    }
1222
  }
1223

1224
  @Nonnull
1225
  @Override
1226
  public WorkflowImplementationOptions getWorkflowImplementationOptions() {
1227
    return workflowImplementationOptions;
1✔
1228
  }
1229

1230
  @Override
1231
  public Failure mapWorkflowExceptionToFailure(Throwable failure) {
1232
    return dataConverterWithCurrentWorkflowContext.exceptionToFailure(failure);
1✔
1233
  }
1234

1235
  @Nullable
1236
  @Override
1237
  public <R> R getLastCompletionResult(Class<R> resultClass, Type resultType) {
1238
    return dataConverterWithCurrentWorkflowContext.fromPayloads(
1✔
1239
        0, Optional.ofNullable(replayContext.getLastCompletionResult()), resultClass, resultType);
1✔
1240
  }
1241

1242
  @Override
1243
  public List<ContextPropagator> getContextPropagators() {
1244
    return contextPropagators;
1✔
1245
  }
1246

1247
  @Override
1248
  public Map<String, Object> getPropagatedContexts() {
1249
    if (contextPropagators == null || contextPropagators.isEmpty()) {
1✔
1250
      return new HashMap<>();
1✔
1251
    }
1252

1253
    Map<String, Payload> headerData = new HashMap<>(replayContext.getHeader());
1✔
1254
    Map<String, Object> contextData = new HashMap<>();
1✔
1255
    for (ContextPropagator propagator : contextPropagators) {
1✔
1256
      contextData.put(propagator.getName(), propagator.deserializeContext(headerData));
1✔
1257
    }
1✔
1258

1259
    return contextData;
1✔
1260
  }
1261

1262
  /** Simple wrapper over a failure just to allow completing the CompletablePromise as a failure */
1263
  private static class FailureWrapperException extends RuntimeException {
1264
    private final Failure failure;
1265

1266
    public FailureWrapperException(Failure failure) {
1✔
1267
      this.failure = failure;
1✔
1268
    }
1✔
1269

1270
    public Failure getFailure() {
1271
      return failure;
1✔
1272
    }
1273
  }
1274
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc