• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #201

16 Oct 2023 03:47PM UTC coverage: 77.389% (+0.02%) from 77.368%
#201

push

github-actions

web-flow
Apply data converter context in more places (#1896)

Add data converter context to memo, lastFailure and schedules

23 of 23 new or added lines in 5 files covered. (100.0%)

18718 of 24187 relevant lines covered (77.39%)

0.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.41
/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.sync;
22

23
import static io.temporal.internal.common.HeaderUtils.intoPayloadMap;
24
import static io.temporal.internal.common.HeaderUtils.toHeaderGrpc;
25
import static io.temporal.internal.common.SerializerUtils.toRetryPolicy;
26
import static io.temporal.internal.sync.WorkflowInternal.DEFAULT_VERSION;
27

28
import com.google.common.base.MoreObjects;
29
import com.google.common.base.Preconditions;
30
import com.uber.m3.tally.Scope;
31
import io.temporal.activity.ActivityOptions;
32
import io.temporal.activity.LocalActivityOptions;
33
import io.temporal.api.command.v1.ContinueAsNewWorkflowExecutionCommandAttributes;
34
import io.temporal.api.command.v1.ScheduleActivityTaskCommandAttributes;
35
import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes;
36
import io.temporal.api.command.v1.StartChildWorkflowExecutionCommandAttributes;
37
import io.temporal.api.common.v1.ActivityType;
38
import io.temporal.api.common.v1.Memo;
39
import io.temporal.api.common.v1.Payload;
40
import io.temporal.api.common.v1.Payloads;
41
import io.temporal.api.common.v1.SearchAttributes;
42
import io.temporal.api.common.v1.WorkflowExecution;
43
import io.temporal.api.common.v1.WorkflowType;
44
import io.temporal.api.enums.v1.ParentClosePolicy;
45
import io.temporal.api.failure.v1.Failure;
46
import io.temporal.api.history.v1.HistoryEvent;
47
import io.temporal.api.taskqueue.v1.TaskQueue;
48
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
49
import io.temporal.client.WorkflowException;
50
import io.temporal.common.RetryOptions;
51
import io.temporal.common.SearchAttributeUpdate;
52
import io.temporal.common.context.ContextPropagator;
53
import io.temporal.common.converter.DataConverter;
54
import io.temporal.common.interceptors.Header;
55
import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor;
56
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
57
import io.temporal.failure.*;
58
import io.temporal.internal.common.ActivityOptionUtils;
59
import io.temporal.internal.common.HeaderUtils;
60
import io.temporal.internal.common.OptionsUtils;
61
import io.temporal.internal.common.ProtobufTimeUtils;
62
import io.temporal.internal.common.SdkFlag;
63
import io.temporal.internal.common.SearchAttributesUtil;
64
import io.temporal.internal.replay.ChildWorkflowTaskFailedException;
65
import io.temporal.internal.replay.ReplayWorkflowContext;
66
import io.temporal.internal.replay.WorkflowContext;
67
import io.temporal.internal.statemachines.*;
68
import io.temporal.payload.context.ActivitySerializationContext;
69
import io.temporal.payload.context.WorkflowSerializationContext;
70
import io.temporal.worker.WorkflowImplementationOptions;
71
import io.temporal.workflow.CancellationScope;
72
import io.temporal.workflow.ChildWorkflowOptions;
73
import io.temporal.workflow.CompletablePromise;
74
import io.temporal.workflow.ContinueAsNewOptions;
75
import io.temporal.workflow.Functions;
76
import io.temporal.workflow.Functions.Func;
77
import io.temporal.workflow.Promise;
78
import io.temporal.workflow.Workflow;
79
import java.lang.reflect.Type;
80
import java.time.Duration;
81
import java.time.Instant;
82
import java.util.*;
83
import java.util.concurrent.atomic.AtomicBoolean;
84
import java.util.concurrent.atomic.AtomicReference;
85
import java.util.function.BiPredicate;
86
import java.util.function.Supplier;
87
import javax.annotation.Nonnull;
88
import javax.annotation.Nullable;
89
import org.slf4j.Logger;
90
import org.slf4j.LoggerFactory;
91

92
// TODO separate WorkflowOutboundCallsInterceptor functionality from this class into
93
// RootWorkflowOutboundInterceptor
94

95
/**
96
 * Root, the most top level WorkflowContext that unites all relevant contexts, handlers, options,
97
 * states, etc. It's created when SyncWorkflow which represents a context of a workflow type
98
 * definition on the worker meets ReplayWorkflowContext which contains information from the
99
 * WorkflowTask
100
 */
101
final class SyncWorkflowContext implements WorkflowContext, WorkflowOutboundCallsInterceptor {
102
  private static final Logger log = LoggerFactory.getLogger(SyncWorkflowContext.class);
1✔
103

104
  private final String namespace;
105
  private final WorkflowExecution workflowExecution;
106
  private final WorkflowImplementationOptions workflowImplementationOptions;
107
  private final DataConverter dataConverter;
108
  // to be used in this class, should not be passed down. Pass the original #dataConverter instead
109
  private final DataConverter dataConverterWithCurrentWorkflowContext;
110
  private final List<ContextPropagator> contextPropagators;
111
  private final SignalDispatcher signalDispatcher;
112
  private final QueryDispatcher queryDispatcher;
113
  private final UpdateDispatcher updateDispatcher;
114

115
  // initialized later when these entities are created
116
  private ReplayWorkflowContext replayContext;
117
  private DeterministicRunner runner;
118

119
  private WorkflowInboundCallsInterceptor headInboundInterceptor;
120
  private WorkflowOutboundCallsInterceptor headOutboundInterceptor;
121

122
  private ActivityOptions defaultActivityOptions = null;
1✔
123
  private Map<String, ActivityOptions> activityOptionsMap;
124
  private LocalActivityOptions defaultLocalActivityOptions = null;
1✔
125
  private Map<String, LocalActivityOptions> localActivityOptionsMap;
126
  private boolean readOnly = false;
1✔
127

128
  /**
   * Creates the root context for one workflow execution.
   *
   * <p>The replay context and the runner are not set here; they are supplied later through {@link
   * #setReplayContext(ReplayWorkflowContext)} and {@link #setRunner(DeterministicRunner)}.
   *
   * @param namespace namespace of the workflow execution
   * @param workflowExecution execution this context is created for; its workflow id is used to
   *     build the workflow serialization context of the data converter
   * @param signalDispatcher dispatcher that signal handling is delegated to
   * @param queryDispatcher dispatcher that query handling is delegated to
   * @param updateDispatcher dispatcher that update handling is delegated to
   * @param workflowImplementationOptions source of the predefined activity and local activity
   *     options; when {@code null} the default instance is stored and no predefined options are
   *     copied
   * @param dataConverter original data converter, stored as is
   * @param contextPropagators context propagators configured for the workflow
   */
  public SyncWorkflowContext(
      @Nonnull String namespace,
      @Nonnull WorkflowExecution workflowExecution,
      SignalDispatcher signalDispatcher,
      QueryDispatcher queryDispatcher,
      UpdateDispatcher updateDispatcher,
      @Nullable WorkflowImplementationOptions workflowImplementationOptions,
      DataConverter dataConverter,
      List<ContextPropagator> contextPropagators) {
    this.namespace = namespace;
    this.workflowExecution = workflowExecution;

    // Keep the original converter and a copy bound to the context of this workflow.
    this.dataConverter = dataConverter;
    this.dataConverterWithCurrentWorkflowContext =
        dataConverter.withContext(
            new WorkflowSerializationContext(namespace, workflowExecution.getWorkflowId()));

    this.contextPropagators = contextPropagators;
    this.signalDispatcher = signalDispatcher;
    this.queryDispatcher = queryDispatcher;
    this.updateDispatcher = updateDispatcher;

    if (workflowImplementationOptions == null) {
      this.workflowImplementationOptions = WorkflowImplementationOptions.getDefaultInstance();
    } else {
      this.defaultActivityOptions = workflowImplementationOptions.getDefaultActivityOptions();
      this.activityOptionsMap = new HashMap<>(workflowImplementationOptions.getActivityOptions());
      this.defaultLocalActivityOptions =
          workflowImplementationOptions.getDefaultLocalActivityOptions();
      this.localActivityOptionsMap =
          new HashMap<>(workflowImplementationOptions.getLocalActivityOptions());
      this.workflowImplementationOptions = workflowImplementationOptions;
    }

    // Initial values for headInboundInterceptor and headOutboundInterceptor, used until they are
    // initialized with the actual interceptors through #initHeadInboundCallsInterceptor and
    // #initHeadOutboundCallsInterceptor during the initialization phase.
    // See workflow.initialize() performed inside the workflow root thread inside
    // SyncWorkflow#start(HistoryEvent, ReplayWorkflowContext)
    this.headInboundInterceptor = new InitialWorkflowInboundCallsInterceptor(this);
    this.headOutboundInterceptor = this;
  }
1✔
167

168
  public void setReplayContext(ReplayWorkflowContext context) {
169
    this.replayContext = context;
1✔
170
  }
1✔
171

172
  /**
173
   * Using setter, as runner is initialized with this context, so it is not ready during
174
   * construction of this.
175
   */
176
  public void setRunner(DeterministicRunner runner) {
177
    this.runner = runner;
1✔
178
  }
1✔
179

180
  public DeterministicRunner getRunner() {
181
    return runner;
×
182
  }
183

184
  public WorkflowOutboundCallsInterceptor getWorkflowOutboundInterceptor() {
185
    return headOutboundInterceptor;
1✔
186
  }
187

188
  public WorkflowInboundCallsInterceptor getWorkflowInboundInterceptor() {
189
    return headInboundInterceptor;
1✔
190
  }
191

192
  public void initHeadOutboundCallsInterceptor(WorkflowOutboundCallsInterceptor head) {
193
    headOutboundInterceptor = head;
1✔
194
  }
1✔
195

196
  public void initHeadInboundCallsInterceptor(WorkflowInboundCallsInterceptor head) {
197
    headInboundInterceptor = head;
1✔
198
    signalDispatcher.setInboundCallsInterceptor(head);
1✔
199
    queryDispatcher.setInboundCallsInterceptor(head);
1✔
200
    updateDispatcher.setInboundCallsInterceptor(head);
1✔
201
  }
1✔
202

203
  public ActivityOptions getDefaultActivityOptions() {
204
    return defaultActivityOptions;
1✔
205
  }
206

207
  public @Nonnull Map<String, ActivityOptions> getActivityOptions() {
208
    return activityOptionsMap != null
1✔
209
        ? Collections.unmodifiableMap(activityOptionsMap)
1✔
210
        : Collections.emptyMap();
×
211
  }
212

213
  public LocalActivityOptions getDefaultLocalActivityOptions() {
214
    return defaultLocalActivityOptions;
1✔
215
  }
216

217
  public @Nonnull Map<String, LocalActivityOptions> getLocalActivityOptions() {
218
    return localActivityOptionsMap != null
1✔
219
        ? Collections.unmodifiableMap(localActivityOptionsMap)
1✔
220
        : Collections.emptyMap();
×
221
  }
222

223
  public void setDefaultActivityOptions(ActivityOptions defaultActivityOptions) {
224
    this.defaultActivityOptions =
1✔
225
        (this.defaultActivityOptions == null)
1✔
226
            ? defaultActivityOptions
×
227
            : this.defaultActivityOptions.toBuilder()
1✔
228
                .mergeActivityOptions(defaultActivityOptions)
1✔
229
                .build();
1✔
230
  }
1✔
231

232
  public void applyActivityOptions(Map<String, ActivityOptions> activityTypeToOption) {
233
    Objects.requireNonNull(activityTypeToOption);
1✔
234
    if (this.activityOptionsMap == null) {
1✔
235
      this.activityOptionsMap = new HashMap<>(activityTypeToOption);
×
236
      return;
×
237
    }
238
    ActivityOptionUtils.mergePredefinedActivityOptions(activityOptionsMap, activityTypeToOption);
1✔
239
  }
1✔
240

241
  public void setDefaultLocalActivityOptions(LocalActivityOptions defaultLocalActivityOptions) {
242
    this.defaultLocalActivityOptions =
1✔
243
        (this.defaultLocalActivityOptions == null)
1✔
244
            ? defaultLocalActivityOptions
×
245
            : this.defaultLocalActivityOptions.toBuilder()
1✔
246
                .mergeActivityOptions(defaultLocalActivityOptions)
1✔
247
                .build();
1✔
248
  }
1✔
249

250
  public void applyLocalActivityOptions(Map<String, LocalActivityOptions> activityTypeToOption) {
251
    Objects.requireNonNull(activityTypeToOption);
1✔
252
    if (this.localActivityOptionsMap == null) {
1✔
253
      this.localActivityOptionsMap = new HashMap<>(activityTypeToOption);
×
254
      return;
×
255
    }
256
    ActivityOptionUtils.mergePredefinedLocalActivityOptions(
1✔
257
        localActivityOptionsMap, activityTypeToOption);
258
  }
1✔
259

260
  @Override
261
  public <T> ActivityOutput<T> executeActivity(ActivityInput<T> input) {
262
    ActivitySerializationContext serializationContext =
1✔
263
        new ActivitySerializationContext(
264
            replayContext.getNamespace(),
1✔
265
            replayContext.getWorkflowId(),
1✔
266
            replayContext.getWorkflowType().getName(),
1✔
267
            input.getActivityName(),
1✔
268
            // input.getOptions().getTaskQueue() may be not specified, workflow task queue is used
269
            // by the Server in this case
270
            MoreObjects.firstNonNull(
1✔
271
                input.getOptions().getTaskQueue(), replayContext.getTaskQueue()),
1✔
272
            false);
273
    DataConverter dataConverterWithActivityContext =
1✔
274
        dataConverter.withContext(serializationContext);
1✔
275
    Optional<Payloads> args = dataConverterWithActivityContext.toPayloads(input.getArgs());
1✔
276

277
    ActivityOutput<Optional<Payloads>> output =
1✔
278
        executeActivityOnce(input.getActivityName(), input.getOptions(), input.getHeader(), args);
1✔
279

280
    return new ActivityOutput<>(
1✔
281
        output.getActivityId(),
1✔
282
        output
283
            .getResult()
1✔
284
            .handle(
1✔
285
                (r, f) -> {
286
                  if (f == null) {
1✔
287
                    return input.getResultType() != Void.TYPE
1✔
288
                        ? dataConverterWithActivityContext.fromPayloads(
1✔
289
                            0, r, input.getResultClass(), input.getResultType())
1✔
290
                        : null;
1✔
291
                  } else {
292
                    throw dataConverterWithActivityContext.failureToException(
1✔
293
                        ((FailureWrapperException) f).getFailure());
1✔
294
                  }
295
                }));
296
  }
297

298
  private ActivityOutput<Optional<Payloads>> executeActivityOnce(
299
      String activityTypeName, ActivityOptions options, Header header, Optional<Payloads> input) {
300
    ExecuteActivityParameters params =
1✔
301
        constructExecuteActivityParameters(activityTypeName, options, header, input);
1✔
302
    ActivityCallback callback = new ActivityCallback();
1✔
303
    ReplayWorkflowContext.ScheduleActivityTaskOutput activityOutput =
1✔
304
        replayContext.scheduleActivityTask(params, callback::invoke);
1✔
305
    CancellationScope.current()
1✔
306
        .getCancellationRequest()
1✔
307
        .thenApply(
1✔
308
            (reason) -> {
309
              activityOutput.getCancellationHandle().apply(new CanceledFailure(reason));
1✔
310
              return null;
1✔
311
            });
312
    return new ActivityOutput<>(activityOutput.getActivityId(), callback.result);
1✔
313
  }
314

315
  public void handleInterceptedSignal(WorkflowInboundCallsInterceptor.SignalInput input) {
316
    signalDispatcher.handleInterceptedSignal(input);
1✔
317
  }
1✔
318

319
  public void handleSignal(
320
      String signalName, Optional<Payloads> input, long eventId, Header header) {
321
    signalDispatcher.handleSignal(signalName, input, eventId, header);
1✔
322
  }
1✔
323

324
  public void handleValidateUpdate(
325
      String updateName, Optional<Payloads> input, long eventId, Header header) {
326
    updateDispatcher.handleValidateUpdate(updateName, input, eventId, header);
1✔
327
  }
1✔
328

329
  public Optional<Payloads> handleExecuteUpdate(
330
      String updateName, Optional<Payloads> input, long eventId, Header header) {
331
    return updateDispatcher.handleExecuteUpdate(updateName, input, eventId, header);
1✔
332
  }
333

334
  public void handleInterceptedValidateUpdate(WorkflowInboundCallsInterceptor.UpdateInput input) {
335
    updateDispatcher.handleInterceptedValidateUpdate(input);
1✔
336
  }
1✔
337

338
  public WorkflowInboundCallsInterceptor.UpdateOutput handleInterceptedExecuteUpdate(
339
      WorkflowInboundCallsInterceptor.UpdateInput input) {
340
    return updateDispatcher.handleInterceptedExecuteUpdate(input);
1✔
341
  }
342

343
  public WorkflowInboundCallsInterceptor.QueryOutput handleInterceptedQuery(
344
      WorkflowInboundCallsInterceptor.QueryInput input) {
345
    return queryDispatcher.handleInterceptedQuery(input);
1✔
346
  }
347

348
  public Optional<Payloads> handleQuery(String queryName, Header header, Optional<Payloads> input) {
349
    return queryDispatcher.handleQuery(queryName, header, input);
1✔
350
  }
351

352
  private class ActivityCallback {
1✔
353
    private final CompletablePromise<Optional<Payloads>> result = Workflow.newPromise();
1✔
354

355
    public void invoke(Optional<Payloads> output, Failure failure) {
356
      if (failure != null) {
1✔
357
        runner.executeInWorkflowThread(
1✔
358
            "activity failure callback",
359
            () -> result.completeExceptionally(new FailureWrapperException(failure)));
1✔
360
      } else {
361
        runner.executeInWorkflowThread(
1✔
362
            "activity completion callback", () -> result.complete(output));
1✔
363
      }
364
    }
1✔
365
  }
366

367
  private class LocalActivityCallbackImpl implements LocalActivityCallback {
1✔
368
    private final CompletablePromise<Optional<Payloads>> result = Workflow.newPromise();
1✔
369

370
    @Override
371
    public void apply(Optional<Payloads> successOutput, LocalActivityFailedException exception) {
372
      if (exception != null) {
1✔
373
        runner.executeInWorkflowThread(
1✔
374
            "local activity failure callback", () -> result.completeExceptionally(exception));
1✔
375
      } else {
376
        runner.executeInWorkflowThread(
1✔
377
            "local activity completion callback", () -> result.complete(successOutput));
1✔
378
      }
379
    }
1✔
380
  }
381

382
  @Override
383
  public <R> LocalActivityOutput<R> executeLocalActivity(LocalActivityInput<R> input) {
384
    ActivitySerializationContext serializationContext =
1✔
385
        new ActivitySerializationContext(
386
            replayContext.getNamespace(),
1✔
387
            replayContext.getWorkflowId(),
1✔
388
            replayContext.getWorkflowType().getName(),
1✔
389
            input.getActivityName(),
1✔
390
            replayContext.getTaskQueue(),
1✔
391
            true);
392
    DataConverter dataConverterWithActivityContext =
1✔
393
        dataConverter.withContext(serializationContext);
1✔
394
    Optional<Payloads> payloads = dataConverterWithActivityContext.toPayloads(input.getArgs());
1✔
395

396
    long originalScheduledTime = System.currentTimeMillis();
1✔
397
    CompletablePromise<Optional<Payloads>> serializedResult =
398
        WorkflowInternal.newCompletablePromise();
1✔
399
    executeLocalActivityOverLocalRetryThreshold(
1✔
400
        input.getActivityName(),
1✔
401
        input.getOptions(),
1✔
402
        input.getHeader(),
1✔
403
        payloads,
404
        originalScheduledTime,
405
        1,
406
        null,
407
        serializedResult);
408

409
    Promise<R> result =
1✔
410
        serializedResult.handle(
1✔
411
            (r, f) -> {
412
              if (f == null) {
1✔
413
                return input.getResultClass() != Void.TYPE
1✔
414
                    ? dataConverterWithActivityContext.fromPayloads(
1✔
415
                        0, r, input.getResultClass(), input.getResultType())
1✔
416
                    : null;
1✔
417
              } else {
418
                throw dataConverterWithActivityContext.failureToException(
1✔
419
                    ((LocalActivityCallback.LocalActivityFailedException) f).getFailure());
1✔
420
              }
421
            });
422

423
    return new LocalActivityOutput<>(result);
1✔
424
  }
425

426
  public void executeLocalActivityOverLocalRetryThreshold(
427
      String activityTypeName,
428
      LocalActivityOptions options,
429
      Header header,
430
      Optional<Payloads> input,
431
      long originalScheduledTime,
432
      int attempt,
433
      @Nullable Failure previousExecutionFailure,
434
      CompletablePromise<Optional<Payloads>> result) {
435
    CompletablePromise<Optional<Payloads>> localExecutionResult =
1✔
436
        executeLocalActivityLocally(
1✔
437
            activityTypeName,
438
            options,
439
            header,
440
            input,
441
            originalScheduledTime,
442
            attempt,
443
            previousExecutionFailure);
444

445
    localExecutionResult.handle(
1✔
446
        (r, e) -> {
447
          if (e == null) {
1✔
448
            result.complete(r);
1✔
449
          } else {
450
            if ((e instanceof LocalActivityCallback.LocalActivityFailedException)) {
1✔
451
              LocalActivityCallback.LocalActivityFailedException laException =
1✔
452
                  (LocalActivityCallback.LocalActivityFailedException) e;
453
              @Nullable Duration backoff = laException.getBackoff();
1✔
454
              if (backoff != null) {
1✔
455
                WorkflowInternal.newTimer(backoff)
1✔
456
                    .thenApply(
1✔
457
                        unused -> {
458
                          executeLocalActivityOverLocalRetryThreshold(
1✔
459
                              activityTypeName,
460
                              options,
461
                              header,
462
                              input,
463
                              originalScheduledTime,
464
                              laException.getLastAttempt() + 1,
1✔
465
                              laException.getFailure(),
1✔
466
                              result);
467
                          return null;
1✔
468
                        });
469
              } else {
470
                // final failure, report back
471
                result.completeExceptionally(laException);
1✔
472
              }
473
            } else {
1✔
474
              // Only LocalActivityFailedException is expected
475
              String exceptionMessage =
×
476
                  String.format(
×
477
                      "[BUG] Local Activity State Machine callback for activityType %s returned unexpected exception",
478
                      activityTypeName);
479
              log.warn(exceptionMessage, e);
×
480
              replayContext.failWorkflowTask(new IllegalStateException(exceptionMessage, e));
×
481
            }
482
          }
483
          return null;
1✔
484
        });
485
  }
1✔
486

487
  private CompletablePromise<Optional<Payloads>> executeLocalActivityLocally(
488
      String activityTypeName,
489
      LocalActivityOptions options,
490
      Header header,
491
      Optional<Payloads> input,
492
      long originalScheduledTime,
493
      int attempt,
494
      @Nullable Failure previousExecutionFailure) {
495

496
    LocalActivityCallbackImpl callback = new LocalActivityCallbackImpl();
1✔
497
    ExecuteLocalActivityParameters params =
1✔
498
        constructExecuteLocalActivityParameters(
1✔
499
            activityTypeName,
500
            options,
501
            header,
502
            input,
503
            attempt,
504
            originalScheduledTime,
505
            previousExecutionFailure);
506
    Functions.Proc cancellationCallback = replayContext.scheduleLocalActivityTask(params, callback);
1✔
507
    CancellationScope.current()
1✔
508
        .getCancellationRequest()
1✔
509
        .thenApply(
1✔
510
            (reason) -> {
511
              cancellationCallback.apply();
×
512
              return null;
×
513
            });
514
    return callback.result;
1✔
515
  }
516

517
  private ExecuteActivityParameters constructExecuteActivityParameters(
518
      String name, ActivityOptions options, Header header, Optional<Payloads> input) {
519
    String taskQueue = options.getTaskQueue();
1✔
520
    if (taskQueue == null) {
1✔
521
      taskQueue = replayContext.getTaskQueue();
1✔
522
    }
523
    ScheduleActivityTaskCommandAttributes.Builder attributes =
524
        ScheduleActivityTaskCommandAttributes.newBuilder()
1✔
525
            .setActivityType(ActivityType.newBuilder().setName(name))
1✔
526
            .setTaskQueue(TaskQueue.newBuilder().setName(taskQueue))
1✔
527
            .setScheduleToStartTimeout(
1✔
528
                ProtobufTimeUtils.toProtoDuration(options.getScheduleToStartTimeout()))
1✔
529
            .setStartToCloseTimeout(
1✔
530
                ProtobufTimeUtils.toProtoDuration(options.getStartToCloseTimeout()))
1✔
531
            .setScheduleToCloseTimeout(
1✔
532
                ProtobufTimeUtils.toProtoDuration(options.getScheduleToCloseTimeout()))
1✔
533
            .setHeartbeatTimeout(ProtobufTimeUtils.toProtoDuration(options.getHeartbeatTimeout()))
1✔
534
            .setRequestEagerExecution(
1✔
535
                !options.isEagerExecutionDisabled()
1✔
536
                    && Objects.equals(taskQueue, replayContext.getTaskQueue()));
1✔
537

538
    input.ifPresent(attributes::setInput);
1✔
539
    RetryOptions retryOptions = options.getRetryOptions();
1✔
540
    if (retryOptions != null) {
1✔
541
      attributes.setRetryPolicy(toRetryPolicy(retryOptions));
1✔
542
    }
543

544
    // Set the context value.  Use the context propagators from the ActivityOptions
545
    // if present, otherwise use the ones configured on the WorkflowContext
546
    List<ContextPropagator> propagators = options.getContextPropagators();
1✔
547
    if (propagators == null) {
1✔
548
      propagators = this.contextPropagators;
1✔
549
    }
550
    io.temporal.api.common.v1.Header grpcHeader =
1✔
551
        toHeaderGrpc(header, extractContextsAndConvertToBytes(propagators));
1✔
552
    attributes.setHeader(grpcHeader);
1✔
553

554
    if (options.getVersioningIntent() != null) {
1✔
555
      attributes.setUseCompatibleVersion(
1✔
556
          options
557
              .getVersioningIntent()
1✔
558
              .determineUseCompatibleFlag(
1✔
559
                  replayContext.getTaskQueue().equals(options.getTaskQueue())));
1✔
560
    }
561

562
    return new ExecuteActivityParameters(attributes, options.getCancellationType());
1✔
563
  }
564

565
  private ExecuteLocalActivityParameters constructExecuteLocalActivityParameters(
566
      String name,
567
      LocalActivityOptions options,
568
      Header header,
569
      Optional<Payloads> input,
570
      int attempt,
571
      long originalScheduledTime,
572
      @Nullable Failure previousExecutionFailure) {
573
    options = LocalActivityOptions.newBuilder(options).validateAndBuildWithDefaults();
1✔
574

575
    PollActivityTaskQueueResponse.Builder activityTask =
576
        PollActivityTaskQueueResponse.newBuilder()
1✔
577
            .setActivityId(this.replayContext.randomUUID().toString())
1✔
578
            .setWorkflowNamespace(this.replayContext.getNamespace())
1✔
579
            .setWorkflowType(this.replayContext.getWorkflowType())
1✔
580
            .setWorkflowExecution(this.replayContext.getWorkflowExecution())
1✔
581
            // used to pass scheduled time to the local activity code inside
582
            // ActivityExecutionContext#getInfo
583
            // setCurrentAttemptScheduledTime is called inside LocalActivityWorker before submitting
584
            // into the LA queue
585
            .setScheduledTime(
1✔
586
                ProtobufTimeUtils.toProtoTimestamp(Instant.ofEpochMilli(originalScheduledTime)))
1✔
587
            .setActivityType(ActivityType.newBuilder().setName(name))
1✔
588
            .setAttempt(attempt);
1✔
589

590
    Duration scheduleToCloseTimeout = options.getScheduleToCloseTimeout();
1✔
591
    if (scheduleToCloseTimeout != null) {
1✔
592
      activityTask.setScheduleToCloseTimeout(
1✔
593
          ProtobufTimeUtils.toProtoDuration(scheduleToCloseTimeout));
1✔
594
    }
595

596
    Duration startToCloseTimeout = options.getStartToCloseTimeout();
1✔
597
    if (startToCloseTimeout != null) {
1✔
598
      activityTask.setStartToCloseTimeout(ProtobufTimeUtils.toProtoDuration(startToCloseTimeout));
1✔
599
    }
600

601
    io.temporal.api.common.v1.Header grpcHeader =
1✔
602
        toHeaderGrpc(header, extractContextsAndConvertToBytes(contextPropagators));
1✔
603
    activityTask.setHeader(grpcHeader);
1✔
604
    input.ifPresent(activityTask::setInput);
1✔
605
    RetryOptions retryOptions = options.getRetryOptions();
1✔
606
    activityTask.setRetryPolicy(
1✔
607
        toRetryPolicy(RetryOptions.newBuilder(retryOptions).validateBuildWithDefaults()));
1✔
608
    Duration localRetryThreshold = options.getLocalRetryThreshold();
1✔
609
    if (localRetryThreshold == null) {
1✔
610
      localRetryThreshold = replayContext.getWorkflowTaskTimeout().multipliedBy(3);
1✔
611
    }
612

613
    return new ExecuteLocalActivityParameters(
1✔
614
        activityTask,
615
        options.getScheduleToStartTimeout(),
1✔
616
        originalScheduledTime,
617
        previousExecutionFailure,
618
        options.isDoNotIncludeArgumentsIntoMarker(),
1✔
619
        localRetryThreshold);
620
  }
621

622
  @Override
623
  public <R> ChildWorkflowOutput<R> executeChildWorkflow(ChildWorkflowInput<R> input) {
624
    if (CancellationScope.current().isCancelRequested()) {
1✔
625
      CanceledFailure canceledFailure = new CanceledFailure("execute called from a canceled scope");
×
626
      return new ChildWorkflowOutput<>(
×
627
          Workflow.newFailedPromise(canceledFailure), Workflow.newFailedPromise(canceledFailure));
×
628
    }
629

630
    CompletablePromise<WorkflowExecution> executionPromise = Workflow.newPromise();
1✔
631
    CompletablePromise<Optional<Payloads>> resultPromise = Workflow.newPromise();
1✔
632

633
    DataConverter dataConverterWithChildWorkflowContext =
1✔
634
        dataConverter.withContext(
1✔
635
            new WorkflowSerializationContext(replayContext.getNamespace(), input.getWorkflowId()));
1✔
636
    Optional<Payloads> payloads = dataConverterWithChildWorkflowContext.toPayloads(input.getArgs());
1✔
637

638
    @Nullable
639
    Memo memo =
640
        (input.getOptions().getMemo() != null)
1✔
641
            ? Memo.newBuilder()
1✔
642
                .putAllFields(
1✔
643
                    intoPayloadMap(
1✔
644
                        dataConverterWithChildWorkflowContext, input.getOptions().getMemo()))
1✔
645
                .build()
1✔
646
            : null;
1✔
647

648
    StartChildWorkflowExecutionParameters parameters =
1✔
649
        createChildWorkflowParameters(
1✔
650
            input.getWorkflowId(),
1✔
651
            input.getWorkflowType(),
1✔
652
            input.getOptions(),
1✔
653
            input.getHeader(),
1✔
654
            payloads,
655
            memo);
656

657
    Functions.Proc1<Exception> cancellationCallback =
1✔
658
        replayContext.startChildWorkflow(
1✔
659
            parameters,
660
            (execution, failure) -> {
661
              if (failure != null) {
1✔
662
                runner.executeInWorkflowThread(
1✔
663
                    "child workflow start failed callback",
664
                    () ->
665
                        executionPromise.completeExceptionally(
1✔
666
                            mapChildWorkflowException(
1✔
667
                                failure, dataConverterWithChildWorkflowContext)));
668
              } else {
669
                runner.executeInWorkflowThread(
1✔
670
                    "child workflow started callback", () -> executionPromise.complete(execution));
1✔
671
              }
672
            },
1✔
673
            (result, failure) -> {
674
              if (failure != null) {
1✔
675
                runner.executeInWorkflowThread(
1✔
676
                    "child workflow failure callback",
677
                    () ->
678
                        resultPromise.completeExceptionally(
1✔
679
                            mapChildWorkflowException(
1✔
680
                                failure, dataConverterWithChildWorkflowContext)));
681
              } else {
682
                runner.executeInWorkflowThread(
1✔
683
                    "child workflow completion callback", () -> resultPromise.complete(result));
1✔
684
              }
685
            });
1✔
686
    AtomicBoolean callbackCalled = new AtomicBoolean();
1✔
687
    CancellationScope.current()
1✔
688
        .getCancellationRequest()
1✔
689
        .thenApply(
1✔
690
            (reason) -> {
691
              if (!callbackCalled.getAndSet(true)) {
1✔
692
                cancellationCallback.apply(new CanceledFailure(reason));
1✔
693
              }
694
              return null;
1✔
695
            });
696

697
    Promise<R> result =
1✔
698
        resultPromise.thenApply(
1✔
699
            (b) ->
700
                dataConverterWithChildWorkflowContext.fromPayloads(
1✔
701
                    0, b, input.getResultClass(), input.getResultType()));
1✔
702
    return new ChildWorkflowOutput<>(result, executionPromise);
1✔
703
  }
704

705
  @SuppressWarnings("deprecation")
706
  private StartChildWorkflowExecutionParameters createChildWorkflowParameters(
707
      String workflowId,
708
      String name,
709
      ChildWorkflowOptions options,
710
      Header header,
711
      Optional<Payloads> input,
712
      @Nullable Memo memo) {
713
    final StartChildWorkflowExecutionCommandAttributes.Builder attributes =
714
        StartChildWorkflowExecutionCommandAttributes.newBuilder()
1✔
715
            .setWorkflowType(WorkflowType.newBuilder().setName(name).build());
1✔
716
    attributes.setWorkflowId(workflowId);
1✔
717
    attributes.setNamespace(OptionsUtils.safeGet(options.getNamespace()));
1✔
718
    input.ifPresent(attributes::setInput);
1✔
719
    attributes.setWorkflowRunTimeout(
1✔
720
        ProtobufTimeUtils.toProtoDuration(options.getWorkflowRunTimeout()));
1✔
721
    attributes.setWorkflowExecutionTimeout(
1✔
722
        ProtobufTimeUtils.toProtoDuration(options.getWorkflowExecutionTimeout()));
1✔
723
    attributes.setWorkflowTaskTimeout(
1✔
724
        ProtobufTimeUtils.toProtoDuration(options.getWorkflowTaskTimeout()));
1✔
725
    String taskQueue = options.getTaskQueue();
1✔
726
    if (taskQueue != null) {
1✔
727
      attributes.setTaskQueue(TaskQueue.newBuilder().setName(taskQueue));
1✔
728
    }
729
    if (options.getWorkflowIdReusePolicy() != null) {
1✔
730
      attributes.setWorkflowIdReusePolicy(options.getWorkflowIdReusePolicy());
1✔
731
    }
732
    RetryOptions retryOptions = options.getRetryOptions();
1✔
733
    if (retryOptions != null) {
1✔
734
      attributes.setRetryPolicy(toRetryPolicy(retryOptions));
1✔
735
    }
736
    attributes.setCronSchedule(OptionsUtils.safeGet(options.getCronSchedule()));
1✔
737

738
    if (memo != null) {
1✔
739
      attributes.setMemo(memo);
1✔
740
    }
741

742
    Map<String, Object> searchAttributes = options.getSearchAttributes();
1✔
743
    if (searchAttributes != null && !searchAttributes.isEmpty()) {
1✔
744
      if (options.getTypedSearchAttributes() != null) {
1✔
745
        throw new IllegalArgumentException(
×
746
            "Cannot have both typed search attributes and search attributes");
747
      }
748
      attributes.setSearchAttributes(SearchAttributesUtil.encode(searchAttributes));
1✔
749
    } else if (options.getTypedSearchAttributes() != null) {
1✔
750
      attributes.setSearchAttributes(
1✔
751
          SearchAttributesUtil.encodeTyped(options.getTypedSearchAttributes()));
1✔
752
    }
753

754
    List<ContextPropagator> propagators = options.getContextPropagators();
1✔
755
    if (propagators == null) {
1✔
756
      propagators = this.contextPropagators;
1✔
757
    }
758
    io.temporal.api.common.v1.Header grpcHeader =
1✔
759
        toHeaderGrpc(header, extractContextsAndConvertToBytes(propagators));
1✔
760
    attributes.setHeader(grpcHeader);
1✔
761

762
    ParentClosePolicy parentClosePolicy = options.getParentClosePolicy();
1✔
763
    if (parentClosePolicy != null) {
1✔
764
      attributes.setParentClosePolicy(parentClosePolicy);
1✔
765
    }
766

767
    if (options.getVersioningIntent() != null) {
1✔
768
      attributes.setUseCompatibleVersion(
1✔
769
          options
770
              .getVersioningIntent()
1✔
771
              .determineUseCompatibleFlag(
1✔
772
                  replayContext.getTaskQueue().equals(options.getTaskQueue())));
1✔
773
    }
774
    return new StartChildWorkflowExecutionParameters(attributes, options.getCancellationType());
1✔
775
  }
776

777
  private static Header extractContextsAndConvertToBytes(
778
      List<ContextPropagator> contextPropagators) {
779
    if (contextPropagators == null) {
1✔
780
      return null;
×
781
    }
782
    Map<String, Payload> result = new HashMap<>();
1✔
783
    for (ContextPropagator propagator : contextPropagators) {
1✔
784
      result.putAll(propagator.serializeContext(propagator.getCurrentContext()));
1✔
785
    }
1✔
786
    return new Header(result);
1✔
787
  }
788

789
  private static RuntimeException mapChildWorkflowException(
790
      Exception failure, DataConverter dataConverterWithChildWorkflowContext) {
791
    if (failure == null) {
1✔
792
      return null;
×
793
    }
794
    if (failure instanceof TemporalFailure) {
1✔
795
      ((TemporalFailure) failure).setDataConverter(dataConverterWithChildWorkflowContext);
1✔
796
    }
797
    if (failure instanceof CanceledFailure) {
1✔
798
      return (CanceledFailure) failure;
1✔
799
    }
800
    if (failure instanceof WorkflowException) {
1✔
801
      return (RuntimeException) failure;
×
802
    }
803
    if (failure instanceof ChildWorkflowFailure) {
1✔
804
      return (ChildWorkflowFailure) failure;
1✔
805
    }
806
    if (!(failure instanceof ChildWorkflowTaskFailedException)) {
1✔
807
      return new IllegalArgumentException("Unexpected exception type: ", failure);
×
808
    }
809
    ChildWorkflowTaskFailedException taskFailed = (ChildWorkflowTaskFailedException) failure;
1✔
810
    Throwable cause =
1✔
811
        dataConverterWithChildWorkflowContext.failureToException(
1✔
812
            taskFailed.getOriginalCauseFailure());
1✔
813
    ChildWorkflowFailure exception = taskFailed.getException();
1✔
814
    return new ChildWorkflowFailure(
1✔
815
        exception.getInitiatedEventId(),
1✔
816
        exception.getStartedEventId(),
1✔
817
        exception.getWorkflowType(),
1✔
818
        exception.getExecution(),
1✔
819
        exception.getNamespace(),
1✔
820
        exception.getRetryState(),
1✔
821
        cause);
822
  }
823

824
  @Override
825
  public Promise<Void> newTimer(Duration delay) {
826
    CompletablePromise<Void> p = Workflow.newPromise();
1✔
827
    Functions.Proc1<RuntimeException> cancellationHandler =
1✔
828
        replayContext.newTimer(
1✔
829
            delay,
830
            (e) ->
831
                runner.executeInWorkflowThread(
1✔
832
                    "timer-callback",
833
                    () -> {
834
                      if (e == null) {
1✔
835
                        p.complete(null);
1✔
836
                      } else {
837
                        p.completeExceptionally(e);
1✔
838
                      }
839
                    }));
1✔
840
    CancellationScope.current()
1✔
841
        .getCancellationRequest()
1✔
842
        .thenApply(
1✔
843
            (r) -> {
844
              cancellationHandler.apply(new CanceledFailure(r));
1✔
845
              return r;
1✔
846
            });
847
    return p;
1✔
848
  }
849

850
  @Override
851
  public <R> R sideEffect(Class<R> resultClass, Type resultType, Func<R> func) {
852
    try {
853
      CompletablePromise<Optional<Payloads>> result = Workflow.newPromise();
1✔
854
      replayContext.sideEffect(
1✔
855
          () -> {
856
            try {
857
              readOnly = true;
1✔
858
              R r = func.apply();
1✔
859
              return dataConverterWithCurrentWorkflowContext.toPayloads(r);
1✔
860
            } finally {
861
              readOnly = false;
1✔
862
            }
863
          },
864
          (p) ->
865
              runner.executeInWorkflowThread(
1✔
866
                  "side-effect-callback", () -> result.complete(Objects.requireNonNull(p))));
1✔
867
      return dataConverterWithCurrentWorkflowContext.fromPayloads(
1✔
868
          0, result.get(), resultClass, resultType);
1✔
869
    } catch (Exception e) {
1✔
870
      // SideEffect cannot throw normal exception as it can lead to non-deterministic behavior. So
871
      // fail the workflow task by throwing an Error.
872
      throw new Error(e);
1✔
873
    }
874
  }
875

876
  @Override
877
  public <R> R mutableSideEffect(
878
      String id, Class<R> resultClass, Type resultType, BiPredicate<R, R> updated, Func<R> func) {
879
    try {
880
      return mutableSideEffectImpl(id, resultClass, resultType, updated, func);
1✔
881
    } catch (Exception e) {
1✔
882
      // MutableSideEffect cannot throw normal exception as it can lead to non-deterministic
883
      // behavior. So fail the workflow task by throwing an Error.
884
      throw new Error(e);
1✔
885
    }
886
  }
887

888
  private <R> R mutableSideEffectImpl(
889
      String id, Class<R> resultClass, Type resultType, BiPredicate<R, R> updated, Func<R> func) {
890
    CompletablePromise<Optional<Payloads>> result = Workflow.newPromise();
1✔
891
    AtomicReference<R> unserializedResult = new AtomicReference<>();
1✔
892
    replayContext.mutableSideEffect(
1✔
893
        id,
894
        (storedBinary) -> {
895
          Optional<R> stored =
1✔
896
              storedBinary.map(
1✔
897
                  (b) ->
898
                      dataConverterWithCurrentWorkflowContext.fromPayloads(
1✔
899
                          0, Optional.of(b), resultClass, resultType));
1✔
900
          try {
901
            readOnly = true;
1✔
902
            R funcResult =
1✔
903
                Objects.requireNonNull(
1✔
904
                    func.apply(), "mutableSideEffect function " + "returned null");
1✔
905
            if (!stored.isPresent() || updated.test(stored.get(), funcResult)) {
1✔
906
              unserializedResult.set(funcResult);
1✔
907
              return dataConverterWithCurrentWorkflowContext.toPayloads(funcResult);
1✔
908
            }
909
            return Optional.empty(); // returned only when value doesn't need to be updated
1✔
910
          } finally {
911
            readOnly = false;
1✔
912
          }
913
        },
914
        (p) ->
915
            runner.executeInWorkflowThread(
1✔
916
                "mutable-side-effect-callback", () -> result.complete(Objects.requireNonNull(p))));
1✔
917

918
    if (!result.get().isPresent()) {
1✔
919
      throw new IllegalArgumentException("No value found for mutableSideEffectId=" + id);
×
920
    }
921
    // An optimization that avoids unnecessary deserialization of the result.
922
    R unserialized = unserializedResult.get();
1✔
923
    if (unserialized != null) {
1✔
924
      return unserialized;
1✔
925
    }
926
    return dataConverterWithCurrentWorkflowContext.fromPayloads(
1✔
927
        0, result.get(), resultClass, resultType);
1✔
928
  }
929

930
  @Override
931
  public int getVersion(String changeId, int minSupported, int maxSupported) {
932
    CompletablePromise<Integer> result = Workflow.newPromise();
1✔
933
    boolean markerExists =
1✔
934
        replayContext.getVersion(
1✔
935
            changeId,
936
            minSupported,
937
            maxSupported,
938
            (v, e) ->
939
                runner.executeInWorkflowThread(
1✔
940
                    "version-callback",
941
                    () -> {
942
                      if (v != null) {
1✔
943
                        result.complete(v);
1✔
944
                      } else {
945
                        result.completeExceptionally(e);
1✔
946
                      }
947
                    }));
1✔
948
    /*
949
     * If we are replaying a workflow and encounter a getVersion call it is possible that this call did not exist
950
     * on the original execution. If the call did not exist on the original execution then we cannot block on results
951
     * because it can lead to non-deterministic scheduling.
952
     * */
953
    if (replayContext.isReplaying()
1✔
954
        && !markerExists
955
        && replayContext.tryUseSdkFlag(SdkFlag.SKIP_YIELD_ON_DEFAULT_VERSION)
1✔
956
        && minSupported == DEFAULT_VERSION) {
957
      return DEFAULT_VERSION;
1✔
958
    }
959

960
    try {
961
      return result.get();
1✔
962
    } catch (UnsupportedVersion.UnsupportedVersionException ex) {
1✔
963
      throw new UnsupportedVersion(ex);
1✔
964
    }
965
  }
966

967
  @Override
968
  public void registerQuery(RegisterQueryInput request) {
969
    queryDispatcher.registerQueryHandlers(request);
1✔
970
  }
1✔
971

972
  @Override
973
  public void registerSignalHandlers(RegisterSignalHandlersInput input) {
974
    signalDispatcher.registerSignalHandlers(input);
1✔
975
  }
1✔
976

977
  @Override
978
  public void registerUpdateHandlers(RegisterUpdateHandlersInput input) {
979
    updateDispatcher.registerUpdateHandlers(input);
1✔
980
  }
1✔
981

982
  @Override
983
  public void registerDynamicSignalHandler(RegisterDynamicSignalHandlerInput input) {
984
    signalDispatcher.registerDynamicSignalHandler(input);
1✔
985
  }
1✔
986

987
  @Override
988
  public void registerDynamicQueryHandler(RegisterDynamicQueryHandlerInput input) {
989
    queryDispatcher.registerDynamicQueryHandler(input);
1✔
990
  }
1✔
991

992
  @Override
993
  public void registerDynamicUpdateHandler(RegisterDynamicUpdateHandlerInput input) {
994
    updateDispatcher.registerDynamicUpdateHandler(input);
×
995
  }
×
996

997
  @Override
998
  public UUID randomUUID() {
999
    return replayContext.randomUUID();
1✔
1000
  }
1001

1002
  @Override
1003
  public Random newRandom() {
1004
    return replayContext.newRandom();
1✔
1005
  }
1006

1007
  public DataConverter getDataConverter() {
1008
    return dataConverter;
×
1009
  }
1010

1011
  public DataConverter getDataConverterWithCurrentWorkflowContext() {
1012
    return dataConverterWithCurrentWorkflowContext;
1✔
1013
  }
1014

1015
  boolean isReplaying() {
1016
    return replayContext.isReplaying();
1✔
1017
  }
1018

1019
  boolean isReadOnly() {
1020
    return readOnly;
1✔
1021
  }
1022

1023
  void setReadOnly(boolean readOnly) {
1024
    this.readOnly = readOnly;
1✔
1025
  }
1✔
1026

1027
  @Override
1028
  public ReplayWorkflowContext getReplayContext() {
1029
    return replayContext;
1✔
1030
  }
1031

1032
  @Override
1033
  public SignalExternalOutput signalExternalWorkflow(SignalExternalInput input) {
1034
    WorkflowExecution childExecution = input.getExecution();
1✔
1035
    DataConverter dataConverterWithChildWorkflowContext =
1✔
1036
        dataConverter.withContext(
1✔
1037
            new WorkflowSerializationContext(
1038
                replayContext.getNamespace(), childExecution.getWorkflowId()));
1✔
1039
    SignalExternalWorkflowExecutionCommandAttributes.Builder attributes =
1040
        SignalExternalWorkflowExecutionCommandAttributes.newBuilder();
1✔
1041
    attributes.setSignalName(input.getSignalName());
1✔
1042
    attributes.setExecution(childExecution);
1✔
1043
    attributes.setHeader(HeaderUtils.toHeaderGrpc(input.getHeader(), null));
1✔
1044
    Optional<Payloads> payloads = dataConverterWithChildWorkflowContext.toPayloads(input.getArgs());
1✔
1045
    payloads.ifPresent(attributes::setInput);
1✔
1046
    CompletablePromise<Void> result = Workflow.newPromise();
1✔
1047
    Functions.Proc1<Exception> cancellationCallback =
1✔
1048
        replayContext.signalExternalWorkflowExecution(
1✔
1049
            attributes,
1050
            (output, failure) -> {
1051
              if (failure != null) {
1✔
1052
                runner.executeInWorkflowThread(
1✔
1053
                    "child workflow failure callback",
1054
                    () ->
1055
                        result.completeExceptionally(
1✔
1056
                            dataConverterWithChildWorkflowContext.failureToException(failure)));
1✔
1057
              } else {
1058
                runner.executeInWorkflowThread(
1✔
1059
                    "child workflow completion callback", () -> result.complete(output));
1✔
1060
              }
1061
            });
1✔
1062
    CancellationScope.current()
1✔
1063
        .getCancellationRequest()
1✔
1064
        .thenApply(
1✔
1065
            (reason) -> {
1066
              cancellationCallback.apply(new CanceledFailure(reason));
1✔
1067
              return null;
1✔
1068
            });
1069
    return new SignalExternalOutput(result);
1✔
1070
  }
1071

1072
  @Override
1073
  public void sleep(Duration duration) {
1074
    newTimer(duration).get();
1✔
1075
  }
1✔
1076

1077
  @Override
1078
  public boolean await(Duration timeout, String reason, Supplier<Boolean> unblockCondition) {
1079
    Promise<Void> timer = newTimer(timeout);
1✔
1080
    WorkflowThread.await(reason, () -> (timer.isCompleted() || unblockCondition.get()));
1✔
1081
    return !timer.isCompleted();
1✔
1082
  }
1083

1084
  @Override
1085
  public void await(String reason, Supplier<Boolean> unblockCondition) {
1086
    WorkflowThread.await(reason, unblockCondition);
1✔
1087
  }
1✔
1088

1089
  @SuppressWarnings("deprecation")
1090
  @Override
1091
  public void continueAsNew(ContinueAsNewInput input) {
1092
    ContinueAsNewWorkflowExecutionCommandAttributes.Builder attributes =
1093
        ContinueAsNewWorkflowExecutionCommandAttributes.newBuilder();
1✔
1094
    String workflowType = input.getWorkflowType();
1✔
1095
    if (workflowType != null) {
1✔
1096
      attributes.setWorkflowType(WorkflowType.newBuilder().setName(workflowType));
1✔
1097
    }
1098
    @Nullable ContinueAsNewOptions options = input.getOptions();
1✔
1099
    if (options != null) {
1✔
1100
      if (options.getWorkflowRunTimeout() != null) {
1✔
1101
        attributes.setWorkflowRunTimeout(
×
1102
            ProtobufTimeUtils.toProtoDuration(options.getWorkflowRunTimeout()));
×
1103
      }
1104
      if (options.getWorkflowTaskTimeout() != null) {
1✔
1105
        attributes.setWorkflowTaskTimeout(
×
1106
            ProtobufTimeUtils.toProtoDuration(options.getWorkflowTaskTimeout()));
×
1107
      }
1108
      if (options.getTaskQueue() != null && !options.getTaskQueue().isEmpty()) {
1✔
1109
        attributes.setTaskQueue(TaskQueue.newBuilder().setName(options.getTaskQueue()));
1✔
1110
      }
1111
      Map<String, Object> searchAttributes = options.getSearchAttributes();
1✔
1112
      if (searchAttributes != null && !searchAttributes.isEmpty()) {
1✔
1113
        if (options.getTypedSearchAttributes() != null) {
×
1114
          throw new IllegalArgumentException(
×
1115
              "Cannot have typed search attributes and search attributes");
1116
        }
1117
        attributes.setSearchAttributes(SearchAttributesUtil.encode(searchAttributes));
×
1118
      } else if (options.getTypedSearchAttributes() != null) {
1✔
1119
        attributes.setSearchAttributes(
1✔
1120
            SearchAttributesUtil.encodeTyped(options.getTypedSearchAttributes()));
1✔
1121
      }
1122
      Map<String, Object> memo = options.getMemo();
1✔
1123
      if (memo != null) {
1✔
1124
        attributes.setMemo(
1✔
1125
            Memo.newBuilder()
1✔
1126
                .putAllFields(intoPayloadMap(dataConverterWithCurrentWorkflowContext, memo)));
1✔
1127
      }
1128
      if (options.getVersioningIntent() != null) {
1✔
1129
        attributes.setUseCompatibleVersion(
×
1130
            options
1131
                .getVersioningIntent()
×
1132
                .determineUseCompatibleFlag(
×
1133
                    replayContext.getTaskQueue().equals(options.getTaskQueue())));
×
1134
      }
1135
    }
1136

1137
    List<ContextPropagator> propagators =
1138
        options != null && options.getContextPropagators() != null
1✔
1139
            ? options.getContextPropagators()
×
1140
            : this.contextPropagators;
1✔
1141
    io.temporal.api.common.v1.Header grpcHeader =
1✔
1142
        toHeaderGrpc(input.getHeader(), extractContextsAndConvertToBytes(propagators));
1✔
1143
    attributes.setHeader(grpcHeader);
1✔
1144

1145
    Optional<Payloads> payloads =
1✔
1146
        dataConverterWithCurrentWorkflowContext.toPayloads(input.getArgs());
1✔
1147
    payloads.ifPresent(attributes::setInput);
1✔
1148

1149
    replayContext.continueAsNewOnCompletion(attributes.build());
1✔
1150
    WorkflowThread.exit();
×
1151
  }
×
1152

1153
  @Override
1154
  public CancelWorkflowOutput cancelWorkflow(CancelWorkflowInput input) {
1155
    CompletablePromise<Void> result = Workflow.newPromise();
×
1156
    replayContext.requestCancelExternalWorkflowExecution(
×
1157
        input.getExecution(),
×
1158
        (r, exception) -> {
1159
          if (exception == null) {
×
1160
            result.complete(null);
×
1161
          } else {
1162
            result.completeExceptionally(exception);
×
1163
          }
1164
        });
×
1165
    return new CancelWorkflowOutput(result);
×
1166
  }
1167

1168
  public Scope getMetricsScope() {
1169
    return replayContext.getMetricsScope();
1✔
1170
  }
1171

1172
  public boolean isLoggingEnabledInReplay() {
1173
    return replayContext.getEnableLoggingInReplay();
×
1174
  }
1175

1176
  @Override
1177
  public void upsertSearchAttributes(Map<String, ?> searchAttributes) {
1178
    Preconditions.checkArgument(searchAttributes != null, "null search attributes");
1✔
1179
    Preconditions.checkArgument(!searchAttributes.isEmpty(), "empty search attributes");
1✔
1180
    SearchAttributes attr = SearchAttributesUtil.encode(searchAttributes);
1✔
1181
    replayContext.upsertSearchAttributes(attr);
1✔
1182
  }
1✔
1183

1184
  @Override
1185
  public void upsertTypedSearchAttributes(SearchAttributeUpdate<?>... searchAttributeUpdates) {
1186
    SearchAttributes attr = SearchAttributesUtil.encodeTypedUpdates(searchAttributeUpdates);
1✔
1187
    replayContext.upsertSearchAttributes(attr);
1✔
1188
  }
1✔
1189

1190
  @Nonnull
1191
  public Object newWorkflowMethodThreadIntercepted(Runnable runnable, @Nullable String name) {
1192
    return runner.newWorkflowThread(runnable, false, name);
1✔
1193
  }
1194

1195
  @Nonnull
1196
  public Object newWorkflowCallbackThreadIntercepted(Runnable runnable, @Nullable String name) {
1197
    return runner.newCallbackThread(runnable, name);
1✔
1198
  }
1199

1200
  @Override
1201
  public Object newChildThread(Runnable runnable, boolean detached, String name) {
1202
    return runner.newWorkflowThread(runnable, detached, name);
1✔
1203
  }
1204

1205
  @Override
1206
  public long currentTimeMillis() {
1207
    return replayContext.currentTimeMillis();
1✔
1208
  }
1209

1210
  /**
1211
   * This WorkflowInboundCallsInterceptor is used during creation of the initial root workflow
1212
   * thread and should be replaced with another specific implementation during initialization stage
1213
   * {@code workflow.initialize()} performed inside the workflow root thread.
1214
   *
1215
   * @see SyncWorkflow#start(HistoryEvent, ReplayWorkflowContext)
1216
   */
1217
  private static final class InitialWorkflowInboundCallsInterceptor
1218
      extends BaseRootWorkflowInboundCallsInterceptor {
1219

1220
    public InitialWorkflowInboundCallsInterceptor(SyncWorkflowContext workflowContext) {
1221
      super(workflowContext);
1✔
1222
    }
1✔
1223

1224
    @Override
1225
    public WorkflowOutput execute(WorkflowInput input) {
1226
      throw new UnsupportedOperationException(
×
1227
          "SyncWorkflowContext should be initialized with a non-initial WorkflowInboundCallsInterceptor "
1228
              + "before #execute can be called");
1229
    }
1230
  }
1231

1232
  @Nonnull
1233
  @Override
1234
  public WorkflowImplementationOptions getWorkflowImplementationOptions() {
1235
    return workflowImplementationOptions;
1✔
1236
  }
1237

1238
  @Override
1239
  public Failure mapWorkflowExceptionToFailure(Throwable failure) {
1240
    return dataConverterWithCurrentWorkflowContext.exceptionToFailure(failure);
1✔
1241
  }
1242

1243
  @Nullable
1244
  @Override
1245
  public <R> R getLastCompletionResult(Class<R> resultClass, Type resultType) {
1246
    return dataConverterWithCurrentWorkflowContext.fromPayloads(
1✔
1247
        0, Optional.ofNullable(replayContext.getLastCompletionResult()), resultClass, resultType);
1✔
1248
  }
1249

1250
  @Override
1251
  public List<ContextPropagator> getContextPropagators() {
1252
    return contextPropagators;
1✔
1253
  }
1254

1255
  @Override
1256
  public Map<String, Object> getPropagatedContexts() {
1257
    if (contextPropagators == null || contextPropagators.isEmpty()) {
1✔
1258
      return new HashMap<>();
1✔
1259
    }
1260

1261
    Map<String, Payload> headerData = new HashMap<>(replayContext.getHeader());
1✔
1262
    Map<String, Object> contextData = new HashMap<>();
1✔
1263
    for (ContextPropagator propagator : contextPropagators) {
1✔
1264
      contextData.put(propagator.getName(), propagator.deserializeContext(headerData));
1✔
1265
    }
1✔
1266

1267
    return contextData;
1✔
1268
  }
1269

1270
  /** Simple wrapper over a failure just to allow completing the CompletablePromise as a failure */
1271
  private static class FailureWrapperException extends RuntimeException {
1272
    private final Failure failure;
1273

1274
    public FailureWrapperException(Failure failure) {
1✔
1275
      this.failure = failure;
1✔
1276
    }
1✔
1277

1278
    public Failure getFailure() {
1279
      return failure;
1✔
1280
    }
1281
  }
1282
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc