• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #169

pending completion
#169

push

github-actions

web-flow
Remove use of deprecated API (#1758)

4 of 4 new or added lines in 1 file covered. (100.0%)

17345 of 21558 relevant lines covered (80.46%)

0.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.2
/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.sync;
22

23
import static io.temporal.internal.common.HeaderUtils.intoPayloadMap;
24
import static io.temporal.internal.common.HeaderUtils.toHeaderGrpc;
25
import static io.temporal.internal.common.SerializerUtils.toRetryPolicy;
26

27
import com.google.common.base.MoreObjects;
28
import com.google.common.base.Preconditions;
29
import com.uber.m3.tally.Scope;
30
import io.temporal.activity.ActivityOptions;
31
import io.temporal.activity.LocalActivityOptions;
32
import io.temporal.api.command.v1.ContinueAsNewWorkflowExecutionCommandAttributes;
33
import io.temporal.api.command.v1.ScheduleActivityTaskCommandAttributes;
34
import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes;
35
import io.temporal.api.command.v1.StartChildWorkflowExecutionCommandAttributes;
36
import io.temporal.api.common.v1.ActivityType;
37
import io.temporal.api.common.v1.Memo;
38
import io.temporal.api.common.v1.Payload;
39
import io.temporal.api.common.v1.Payloads;
40
import io.temporal.api.common.v1.SearchAttributes;
41
import io.temporal.api.common.v1.WorkflowExecution;
42
import io.temporal.api.common.v1.WorkflowType;
43
import io.temporal.api.enums.v1.ParentClosePolicy;
44
import io.temporal.api.failure.v1.Failure;
45
import io.temporal.api.history.v1.HistoryEvent;
46
import io.temporal.api.taskqueue.v1.TaskQueue;
47
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
48
import io.temporal.client.WorkflowException;
49
import io.temporal.common.RetryOptions;
50
import io.temporal.common.context.ContextPropagator;
51
import io.temporal.common.converter.DataConverter;
52
import io.temporal.common.interceptors.Header;
53
import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor;
54
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
55
import io.temporal.failure.*;
56
import io.temporal.internal.common.ActivityOptionUtils;
57
import io.temporal.internal.common.OptionsUtils;
58
import io.temporal.internal.common.ProtobufTimeUtils;
59
import io.temporal.internal.common.SearchAttributesUtil;
60
import io.temporal.internal.replay.ChildWorkflowTaskFailedException;
61
import io.temporal.internal.replay.ReplayWorkflowContext;
62
import io.temporal.internal.replay.WorkflowContext;
63
import io.temporal.internal.statemachines.*;
64
import io.temporal.payload.context.ActivitySerializationContext;
65
import io.temporal.payload.context.WorkflowSerializationContext;
66
import io.temporal.worker.WorkflowImplementationOptions;
67
import io.temporal.workflow.CancellationScope;
68
import io.temporal.workflow.ChildWorkflowOptions;
69
import io.temporal.workflow.CompletablePromise;
70
import io.temporal.workflow.ContinueAsNewOptions;
71
import io.temporal.workflow.Functions;
72
import io.temporal.workflow.Functions.Func;
73
import io.temporal.workflow.Promise;
74
import io.temporal.workflow.Workflow;
75
import java.lang.reflect.Type;
76
import java.time.Duration;
77
import java.time.Instant;
78
import java.util.*;
79
import java.util.concurrent.atomic.AtomicBoolean;
80
import java.util.concurrent.atomic.AtomicReference;
81
import java.util.function.BiPredicate;
82
import java.util.function.Supplier;
83
import javax.annotation.Nonnull;
84
import javax.annotation.Nullable;
85
import org.slf4j.Logger;
86
import org.slf4j.LoggerFactory;
87

88
// TODO separate WorkflowOutboundCallsInterceptor functionality from this class into
89
// RootWorkflowOutboundInterceptor
90
/**
91
 * The root, top-most WorkflowContext that unites all relevant contexts, handlers, options,
92
 * states, etc. It's created when SyncWorkflow which represents a context of a workflow type
93
 * definition on the worker meets ReplayWorkflowContext which contains information from the
94
 * WorkflowTask
95
 */
96
final class SyncWorkflowContext implements WorkflowContext, WorkflowOutboundCallsInterceptor {
97
  private static final Logger log = LoggerFactory.getLogger(SyncWorkflowContext.class);
1✔
98

99
  private final String namespace;
100
  private final WorkflowExecution workflowExecution;
101
  private final WorkflowImplementationOptions workflowImplementationOptions;
102
  private final DataConverter dataConverter;
103
  // to be used in this class, should not be passed down. Pass the original #dataConverter instead
104
  private final DataConverter dataConverterWithCurrentWorkflowContext;
105
  private final List<ContextPropagator> contextPropagators;
106
  private final SignalDispatcher signalDispatcher;
107
  private final QueryDispatcher queryDispatcher;
108
  private final UpdateDispatcher updateDispatcher;
109

110
  // initialized later when these entities are created
111
  private ReplayWorkflowContext replayContext;
112
  private DeterministicRunner runner;
113

114
  private WorkflowInboundCallsInterceptor headInboundInterceptor;
115
  private WorkflowOutboundCallsInterceptor headOutboundInterceptor;
116

117
  private ActivityOptions defaultActivityOptions = null;
1✔
118
  private Map<String, ActivityOptions> activityOptionsMap;
119
  private LocalActivityOptions defaultLocalActivityOptions = null;
1✔
120
  private Map<String, LocalActivityOptions> localActivityOptionsMap;
121

122
  public SyncWorkflowContext(
123
      @Nonnull String namespace,
124
      @Nonnull WorkflowExecution workflowExecution,
125
      SignalDispatcher signalDispatcher,
126
      QueryDispatcher queryDispatcher,
127
      UpdateDispatcher updateDispatcher,
128
      @Nullable WorkflowImplementationOptions workflowImplementationOptions,
129
      DataConverter dataConverter,
130
      List<ContextPropagator> contextPropagators) {
1✔
131
    this.namespace = namespace;
1✔
132
    this.workflowExecution = workflowExecution;
1✔
133
    this.dataConverter = dataConverter;
1✔
134
    this.dataConverterWithCurrentWorkflowContext =
1✔
135
        dataConverter.withContext(
1✔
136
            new WorkflowSerializationContext(namespace, workflowExecution.getWorkflowId()));
1✔
137
    this.contextPropagators = contextPropagators;
1✔
138
    this.signalDispatcher = signalDispatcher;
1✔
139
    this.queryDispatcher = queryDispatcher;
1✔
140
    this.updateDispatcher = updateDispatcher;
1✔
141
    if (workflowImplementationOptions != null) {
1✔
142
      this.defaultActivityOptions = workflowImplementationOptions.getDefaultActivityOptions();
1✔
143
      this.activityOptionsMap = new HashMap<>(workflowImplementationOptions.getActivityOptions());
1✔
144
      this.defaultLocalActivityOptions =
1✔
145
          workflowImplementationOptions.getDefaultLocalActivityOptions();
1✔
146
      this.localActivityOptionsMap =
1✔
147
          new HashMap<>(workflowImplementationOptions.getLocalActivityOptions());
1✔
148
    }
149
    this.workflowImplementationOptions =
1✔
150
        workflowImplementationOptions == null
1✔
151
            ? WorkflowImplementationOptions.getDefaultInstance()
1✔
152
            : workflowImplementationOptions;
1✔
153
    // initial values for headInboundInterceptor and headOutboundInterceptor until they initialized
154
    // with actual interceptors through #initHeadInboundCallsInterceptor and
155
    // #initHeadOutboundCallsInterceptor during initialization phase.
156
    // See workflow.initialize() performed inside the workflow root thread inside
157
    // SyncWorkflow#start(HistoryEvent, ReplayWorkflowContext)
158
    this.headInboundInterceptor = new InitialWorkflowInboundCallsInterceptor(this);
1✔
159
    this.headOutboundInterceptor = this;
1✔
160
  }
1✔
161

162
  public void setReplayContext(ReplayWorkflowContext context) {
163
    this.replayContext = context;
1✔
164
  }
1✔
165

166
  /**
167
   * Using setter, as runner is initialized with this context, so it is not ready during
168
   * construction of this.
169
   */
170
  public void setRunner(DeterministicRunner runner) {
171
    this.runner = runner;
1✔
172
  }
1✔
173

174
  public DeterministicRunner getRunner() {
175
    return runner;
×
176
  }
177

178
  public WorkflowOutboundCallsInterceptor getWorkflowOutboundInterceptor() {
179
    return headOutboundInterceptor;
1✔
180
  }
181

182
  public WorkflowInboundCallsInterceptor getWorkflowInboundInterceptor() {
183
    return headInboundInterceptor;
1✔
184
  }
185

186
  public void initHeadOutboundCallsInterceptor(WorkflowOutboundCallsInterceptor head) {
187
    headOutboundInterceptor = head;
1✔
188
  }
1✔
189

190
  public void initHeadInboundCallsInterceptor(WorkflowInboundCallsInterceptor head) {
191
    headInboundInterceptor = head;
1✔
192
    signalDispatcher.setInboundCallsInterceptor(head);
1✔
193
    queryDispatcher.setInboundCallsInterceptor(head);
1✔
194
    updateDispatcher.setInboundCallsInterceptor(head);
1✔
195
  }
1✔
196

197
  public ActivityOptions getDefaultActivityOptions() {
198
    return defaultActivityOptions;
1✔
199
  }
200

201
  public @Nonnull Map<String, ActivityOptions> getActivityOptions() {
202
    return activityOptionsMap != null
1✔
203
        ? Collections.unmodifiableMap(activityOptionsMap)
1✔
204
        : Collections.emptyMap();
×
205
  }
206

207
  public LocalActivityOptions getDefaultLocalActivityOptions() {
208
    return defaultLocalActivityOptions;
1✔
209
  }
210

211
  public @Nonnull Map<String, LocalActivityOptions> getLocalActivityOptions() {
212
    return localActivityOptionsMap != null
1✔
213
        ? Collections.unmodifiableMap(localActivityOptionsMap)
1✔
214
        : Collections.emptyMap();
×
215
  }
216

217
  public void setDefaultActivityOptions(ActivityOptions defaultActivityOptions) {
218
    this.defaultActivityOptions =
1✔
219
        (this.defaultActivityOptions == null)
1✔
220
            ? defaultActivityOptions
×
221
            : this.defaultActivityOptions.toBuilder()
1✔
222
                .mergeActivityOptions(defaultActivityOptions)
1✔
223
                .build();
1✔
224
  }
1✔
225

226
  public void applyActivityOptions(Map<String, ActivityOptions> activityTypeToOption) {
227
    Objects.requireNonNull(activityTypeToOption);
1✔
228
    if (this.activityOptionsMap == null) {
1✔
229
      this.activityOptionsMap = new HashMap<>(activityTypeToOption);
×
230
      return;
×
231
    }
232
    ActivityOptionUtils.mergePredefinedActivityOptions(activityOptionsMap, activityTypeToOption);
1✔
233
  }
1✔
234

235
  public void setDefaultLocalActivityOptions(LocalActivityOptions defaultLocalActivityOptions) {
236
    this.defaultLocalActivityOptions =
1✔
237
        (this.defaultLocalActivityOptions == null)
1✔
238
            ? defaultLocalActivityOptions
×
239
            : this.defaultLocalActivityOptions.toBuilder()
1✔
240
                .mergeActivityOptions(defaultLocalActivityOptions)
1✔
241
                .build();
1✔
242
  }
1✔
243

244
  public void applyLocalActivityOptions(Map<String, LocalActivityOptions> activityTypeToOption) {
245
    Objects.requireNonNull(activityTypeToOption);
1✔
246
    if (this.localActivityOptionsMap == null) {
1✔
247
      this.localActivityOptionsMap = new HashMap<>(activityTypeToOption);
×
248
      return;
×
249
    }
250
    ActivityOptionUtils.mergePredefinedLocalActivityOptions(
1✔
251
        localActivityOptionsMap, activityTypeToOption);
252
  }
1✔
253

254
  @Override
255
  public <T> ActivityOutput<T> executeActivity(ActivityInput<T> input) {
256
    ActivitySerializationContext serializationContext =
1✔
257
        new ActivitySerializationContext(
258
            replayContext.getNamespace(),
1✔
259
            replayContext.getWorkflowId(),
1✔
260
            replayContext.getWorkflowType().getName(),
1✔
261
            input.getActivityName(),
1✔
262
            // input.getOptions().getTaskQueue() may be not specified, workflow task queue is used
263
            // by the Server in this case
264
            MoreObjects.firstNonNull(
1✔
265
                input.getOptions().getTaskQueue(), replayContext.getTaskQueue()),
1✔
266
            false);
267
    DataConverter dataConverterWithActivityContext =
1✔
268
        dataConverter.withContext(serializationContext);
1✔
269
    Optional<Payloads> args = dataConverterWithActivityContext.toPayloads(input.getArgs());
1✔
270

271
    ActivityOutput<Optional<Payloads>> output =
1✔
272
        executeActivityOnce(input.getActivityName(), input.getOptions(), input.getHeader(), args);
1✔
273

274
    return new ActivityOutput<>(
1✔
275
        output.getActivityId(),
1✔
276
        output
277
            .getResult()
1✔
278
            .handle(
1✔
279
                (r, f) -> {
280
                  if (f == null) {
1✔
281
                    return input.getResultType() != Void.TYPE
1✔
282
                        ? dataConverterWithActivityContext.fromPayloads(
1✔
283
                            0, r, input.getResultClass(), input.getResultType())
1✔
284
                        : null;
1✔
285
                  } else {
286
                    throw dataConverterWithActivityContext.failureToException(
1✔
287
                        ((FailureWrapperException) f).getFailure());
1✔
288
                  }
289
                }));
290
  }
291

292
  private ActivityOutput<Optional<Payloads>> executeActivityOnce(
293
      String activityTypeName, ActivityOptions options, Header header, Optional<Payloads> input) {
294
    ExecuteActivityParameters params =
1✔
295
        constructExecuteActivityParameters(activityTypeName, options, header, input);
1✔
296
    ActivityCallback callback = new ActivityCallback();
1✔
297
    ReplayWorkflowContext.ScheduleActivityTaskOutput activityOutput =
1✔
298
        replayContext.scheduleActivityTask(params, callback::invoke);
1✔
299
    CancellationScope.current()
1✔
300
        .getCancellationRequest()
1✔
301
        .thenApply(
1✔
302
            (reason) -> {
303
              activityOutput.getCancellationHandle().apply(new CanceledFailure(reason));
1✔
304
              return null;
1✔
305
            });
306
    return new ActivityOutput<>(activityOutput.getActivityId(), callback.result);
1✔
307
  }
308

309
  public void handleInterceptedSignal(WorkflowInboundCallsInterceptor.SignalInput input) {
310
    signalDispatcher.handleInterceptedSignal(input);
1✔
311
  }
1✔
312

313
  public void handleSignal(String signalName, Optional<Payloads> input, long eventId) {
314
    signalDispatcher.handleSignal(signalName, input, eventId);
1✔
315
  }
1✔
316

317
  public void handleValidateUpdate(String updateName, Optional<Payloads> input, long eventId) {
318
    updateDispatcher.handleValidateUpdate(updateName, input, eventId);
×
319
  }
×
320

321
  public Optional<Payloads> handleExecuteUpdate(
322
      String updateName, Optional<Payloads> input, long eventId) {
323
    return updateDispatcher.handleExecuteUpdate(updateName, input, eventId);
×
324
  }
325

326
  public void handleInterceptedValidateUpdate(WorkflowInboundCallsInterceptor.UpdateInput input) {
327
    updateDispatcher.handleInterceptedValidateUpdate(input);
×
328
  }
×
329

330
  public WorkflowInboundCallsInterceptor.UpdateOutput handleInterceptedExecuteUpdate(
331
      WorkflowInboundCallsInterceptor.UpdateInput input) {
332
    return updateDispatcher.handleInterceptedExecuteUpdate(input);
×
333
  }
334

335
  public WorkflowInboundCallsInterceptor.QueryOutput handleInterceptedQuery(
336
      WorkflowInboundCallsInterceptor.QueryInput input) {
337
    return queryDispatcher.handleInterceptedQuery(input);
1✔
338
  }
339

340
  public Optional<Payloads> handleQuery(String queryName, Optional<Payloads> input) {
341
    return queryDispatcher.handleQuery(queryName, input);
1✔
342
  }
343

344
  private class ActivityCallback {
1✔
345
    private final CompletablePromise<Optional<Payloads>> result = Workflow.newPromise();
1✔
346

347
    public void invoke(Optional<Payloads> output, Failure failure) {
348
      if (failure != null) {
1✔
349
        runner.executeInWorkflowThread(
1✔
350
            "activity failure callback",
351
            () -> result.completeExceptionally(new FailureWrapperException(failure)));
1✔
352
      } else {
353
        runner.executeInWorkflowThread(
1✔
354
            "activity completion callback", () -> result.complete(output));
1✔
355
      }
356
    }
1✔
357
  }
358

359
  private class LocalActivityCallbackImpl implements LocalActivityCallback {
1✔
360
    private final CompletablePromise<Optional<Payloads>> result = Workflow.newPromise();
1✔
361

362
    @Override
363
    public void apply(Optional<Payloads> successOutput, LocalActivityFailedException exception) {
364
      if (exception != null) {
1✔
365
        runner.executeInWorkflowThread(
1✔
366
            "local activity failure callback", () -> result.completeExceptionally(exception));
1✔
367
      } else {
368
        runner.executeInWorkflowThread(
1✔
369
            "local activity completion callback", () -> result.complete(successOutput));
1✔
370
      }
371
    }
1✔
372
  }
373

374
  @Override
375
  public <R> LocalActivityOutput<R> executeLocalActivity(LocalActivityInput<R> input) {
376
    ActivitySerializationContext serializationContext =
1✔
377
        new ActivitySerializationContext(
378
            replayContext.getNamespace(),
1✔
379
            replayContext.getWorkflowId(),
1✔
380
            replayContext.getWorkflowType().getName(),
1✔
381
            input.getActivityName(),
1✔
382
            replayContext.getTaskQueue(),
1✔
383
            true);
384
    DataConverter dataConverterWithActivityContext =
1✔
385
        dataConverter.withContext(serializationContext);
1✔
386
    Optional<Payloads> payloads = dataConverterWithActivityContext.toPayloads(input.getArgs());
1✔
387

388
    long originalScheduledTime = System.currentTimeMillis();
1✔
389
    CompletablePromise<Optional<Payloads>> serializedResult =
390
        WorkflowInternal.newCompletablePromise();
1✔
391
    executeLocalActivityOverLocalRetryThreshold(
1✔
392
        input.getActivityName(),
1✔
393
        input.getOptions(),
1✔
394
        input.getHeader(),
1✔
395
        payloads,
396
        originalScheduledTime,
397
        1,
398
        null,
399
        serializedResult);
400

401
    Promise<R> result =
1✔
402
        serializedResult.handle(
1✔
403
            (r, f) -> {
404
              if (f == null) {
1✔
405
                return input.getResultClass() != Void.TYPE
1✔
406
                    ? dataConverterWithActivityContext.fromPayloads(
1✔
407
                        0, r, input.getResultClass(), input.getResultType())
1✔
408
                    : null;
1✔
409
              } else {
410
                throw dataConverterWithActivityContext.failureToException(
1✔
411
                    ((LocalActivityCallback.LocalActivityFailedException) f).getFailure());
1✔
412
              }
413
            });
414

415
    return new LocalActivityOutput<>(result);
1✔
416
  }
417

418
  public void executeLocalActivityOverLocalRetryThreshold(
419
      String activityTypeName,
420
      LocalActivityOptions options,
421
      Header header,
422
      Optional<Payloads> input,
423
      long originalScheduledTime,
424
      int attempt,
425
      @Nullable Failure previousExecutionFailure,
426
      CompletablePromise<Optional<Payloads>> result) {
427
    CompletablePromise<Optional<Payloads>> localExecutionResult =
1✔
428
        executeLocalActivityLocally(
1✔
429
            activityTypeName,
430
            options,
431
            header,
432
            input,
433
            originalScheduledTime,
434
            attempt,
435
            previousExecutionFailure);
436

437
    localExecutionResult.handle(
1✔
438
        (r, e) -> {
439
          if (e == null) {
1✔
440
            result.complete(r);
1✔
441
          } else {
442
            if ((e instanceof LocalActivityCallback.LocalActivityFailedException)) {
1✔
443
              LocalActivityCallback.LocalActivityFailedException laException =
1✔
444
                  (LocalActivityCallback.LocalActivityFailedException) e;
445
              @Nullable Duration backoff = laException.getBackoff();
1✔
446
              if (backoff != null) {
1✔
447
                WorkflowInternal.newTimer(backoff)
1✔
448
                    .thenApply(
1✔
449
                        unused -> {
450
                          executeLocalActivityOverLocalRetryThreshold(
1✔
451
                              activityTypeName,
452
                              options,
453
                              header,
454
                              input,
455
                              originalScheduledTime,
456
                              laException.getLastAttempt() + 1,
1✔
457
                              laException.getFailure(),
1✔
458
                              result);
459
                          return null;
1✔
460
                        });
461
              } else {
462
                // final failure, report back
463
                result.completeExceptionally(laException);
1✔
464
              }
465
            } else {
1✔
466
              // Only LocalActivityFailedException is expected
467
              String exceptionMessage =
×
468
                  String.format(
×
469
                      "[BUG] Local Activity State Machine callback for activityType %s returned unexpected exception",
470
                      activityTypeName);
471
              log.warn(exceptionMessage, e);
×
472
              replayContext.failWorkflowTask(new IllegalStateException(exceptionMessage, e));
×
473
            }
474
          }
475
          return null;
1✔
476
        });
477
  }
1✔
478

479
  private CompletablePromise<Optional<Payloads>> executeLocalActivityLocally(
480
      String activityTypeName,
481
      LocalActivityOptions options,
482
      Header header,
483
      Optional<Payloads> input,
484
      long originalScheduledTime,
485
      int attempt,
486
      @Nullable Failure previousExecutionFailure) {
487

488
    LocalActivityCallbackImpl callback = new LocalActivityCallbackImpl();
1✔
489
    ExecuteLocalActivityParameters params =
1✔
490
        constructExecuteLocalActivityParameters(
1✔
491
            activityTypeName,
492
            options,
493
            header,
494
            input,
495
            attempt,
496
            originalScheduledTime,
497
            previousExecutionFailure);
498
    Functions.Proc cancellationCallback = replayContext.scheduleLocalActivityTask(params, callback);
1✔
499
    CancellationScope.current()
1✔
500
        .getCancellationRequest()
1✔
501
        .thenApply(
1✔
502
            (reason) -> {
503
              cancellationCallback.apply();
×
504
              return null;
×
505
            });
506
    return callback.result;
1✔
507
  }
508

509
  private ExecuteActivityParameters constructExecuteActivityParameters(
510
      String name, ActivityOptions options, Header header, Optional<Payloads> input) {
511
    String taskQueue = options.getTaskQueue();
1✔
512
    if (taskQueue == null) {
1✔
513
      taskQueue = replayContext.getTaskQueue();
1✔
514
    }
515
    ScheduleActivityTaskCommandAttributes.Builder attributes =
516
        ScheduleActivityTaskCommandAttributes.newBuilder()
1✔
517
            .setActivityType(ActivityType.newBuilder().setName(name))
1✔
518
            .setTaskQueue(TaskQueue.newBuilder().setName(taskQueue))
1✔
519
            .setScheduleToStartTimeout(
1✔
520
                ProtobufTimeUtils.toProtoDuration(options.getScheduleToStartTimeout()))
1✔
521
            .setStartToCloseTimeout(
1✔
522
                ProtobufTimeUtils.toProtoDuration(options.getStartToCloseTimeout()))
1✔
523
            .setScheduleToCloseTimeout(
1✔
524
                ProtobufTimeUtils.toProtoDuration(options.getScheduleToCloseTimeout()))
1✔
525
            .setHeartbeatTimeout(ProtobufTimeUtils.toProtoDuration(options.getHeartbeatTimeout()))
1✔
526
            .setRequestEagerExecution(
1✔
527
                !options.isEagerExecutionDisabled()
1✔
528
                    && Objects.equals(taskQueue, replayContext.getTaskQueue()));
1✔
529

530
    input.ifPresent(attributes::setInput);
1✔
531
    RetryOptions retryOptions = options.getRetryOptions();
1✔
532
    if (retryOptions != null) {
1✔
533
      attributes.setRetryPolicy(toRetryPolicy(retryOptions));
1✔
534
    }
535

536
    // Set the context value.  Use the context propagators from the ActivityOptions
537
    // if present, otherwise use the ones configured on the WorkflowContext
538
    List<ContextPropagator> propagators = options.getContextPropagators();
1✔
539
    if (propagators == null) {
1✔
540
      propagators = this.contextPropagators;
1✔
541
    }
542
    io.temporal.api.common.v1.Header grpcHeader =
1✔
543
        toHeaderGrpc(header, extractContextsAndConvertToBytes(propagators));
1✔
544
    attributes.setHeader(grpcHeader);
1✔
545
    return new ExecuteActivityParameters(attributes, options.getCancellationType());
1✔
546
  }
547

548
  private ExecuteLocalActivityParameters constructExecuteLocalActivityParameters(
549
      String name,
550
      LocalActivityOptions options,
551
      Header header,
552
      Optional<Payloads> input,
553
      int attempt,
554
      long originalScheduledTime,
555
      @Nullable Failure previousExecutionFailure) {
556
    options = LocalActivityOptions.newBuilder(options).validateAndBuildWithDefaults();
1✔
557

558
    PollActivityTaskQueueResponse.Builder activityTask =
559
        PollActivityTaskQueueResponse.newBuilder()
1✔
560
            .setActivityId(this.replayContext.randomUUID().toString())
1✔
561
            .setWorkflowNamespace(this.replayContext.getNamespace())
1✔
562
            .setWorkflowType(this.replayContext.getWorkflowType())
1✔
563
            .setWorkflowExecution(this.replayContext.getWorkflowExecution())
1✔
564
            // used to pass scheduled time to the local activity code inside
565
            // ActivityExecutionContext#getInfo
566
            // setCurrentAttemptScheduledTime is called inside LocalActivityWorker before submitting
567
            // into the LA queue
568
            .setScheduledTime(
1✔
569
                ProtobufTimeUtils.toProtoTimestamp(Instant.ofEpochMilli(originalScheduledTime)))
1✔
570
            .setActivityType(ActivityType.newBuilder().setName(name))
1✔
571
            .setAttempt(attempt);
1✔
572

573
    Duration scheduleToCloseTimeout = options.getScheduleToCloseTimeout();
1✔
574
    if (scheduleToCloseTimeout != null) {
1✔
575
      activityTask.setScheduleToCloseTimeout(
1✔
576
          ProtobufTimeUtils.toProtoDuration(scheduleToCloseTimeout));
1✔
577
    }
578

579
    Duration startToCloseTimeout = options.getStartToCloseTimeout();
1✔
580
    if (startToCloseTimeout != null) {
1✔
581
      activityTask.setStartToCloseTimeout(ProtobufTimeUtils.toProtoDuration(startToCloseTimeout));
1✔
582
    }
583

584
    io.temporal.api.common.v1.Header grpcHeader =
1✔
585
        toHeaderGrpc(header, extractContextsAndConvertToBytes(contextPropagators));
1✔
586
    activityTask.setHeader(grpcHeader);
1✔
587
    input.ifPresent(activityTask::setInput);
1✔
588
    RetryOptions retryOptions = options.getRetryOptions();
1✔
589
    activityTask.setRetryPolicy(
1✔
590
        toRetryPolicy(RetryOptions.newBuilder(retryOptions).validateBuildWithDefaults()));
1✔
591
    Duration localRetryThreshold = options.getLocalRetryThreshold();
1✔
592
    if (localRetryThreshold == null) {
1✔
593
      localRetryThreshold = replayContext.getWorkflowTaskTimeout().multipliedBy(3);
1✔
594
    }
595

596
    return new ExecuteLocalActivityParameters(
1✔
597
        activityTask,
598
        options.getScheduleToStartTimeout(),
1✔
599
        originalScheduledTime,
600
        previousExecutionFailure,
601
        options.isDoNotIncludeArgumentsIntoMarker(),
1✔
602
        localRetryThreshold);
603
  }
604

605
  @Override
606
  public <R> ChildWorkflowOutput<R> executeChildWorkflow(ChildWorkflowInput<R> input) {
607
    if (CancellationScope.current().isCancelRequested()) {
1✔
608
      CanceledFailure canceledFailure = new CanceledFailure("execute called from a canceled scope");
×
609
      return new ChildWorkflowOutput<>(
×
610
          Workflow.newFailedPromise(canceledFailure), Workflow.newFailedPromise(canceledFailure));
×
611
    }
612

613
    CompletablePromise<WorkflowExecution> executionPromise = Workflow.newPromise();
1✔
614
    CompletablePromise<Optional<Payloads>> resultPromise = Workflow.newPromise();
1✔
615

616
    DataConverter dataConverterWithChildWorkflowContext =
1✔
617
        dataConverter.withContext(
1✔
618
            new WorkflowSerializationContext(replayContext.getNamespace(), input.getWorkflowId()));
1✔
619
    Optional<Payloads> payloads = dataConverterWithChildWorkflowContext.toPayloads(input.getArgs());
1✔
620

621
    @Nullable
622
    Memo memo =
623
        (input.getOptions().getMemo() != null)
1✔
624
            ? Memo.newBuilder()
1✔
625
                .putAllFields(
1✔
626
                    intoPayloadMap(
1✔
627
                        dataConverterWithChildWorkflowContext, input.getOptions().getMemo()))
1✔
628
                .build()
1✔
629
            : null;
1✔
630

631
    StartChildWorkflowExecutionParameters parameters =
1✔
632
        createChildWorkflowParameters(
1✔
633
            input.getWorkflowId(),
1✔
634
            input.getWorkflowType(),
1✔
635
            input.getOptions(),
1✔
636
            input.getHeader(),
1✔
637
            payloads,
638
            memo);
639

640
    Functions.Proc1<Exception> cancellationCallback =
1✔
641
        replayContext.startChildWorkflow(
1✔
642
            parameters,
643
            (execution, failure) -> {
644
              if (failure != null) {
1✔
645
                runner.executeInWorkflowThread(
1✔
646
                    "child workflow start failed callback",
647
                    () ->
648
                        executionPromise.completeExceptionally(
1✔
649
                            mapChildWorkflowException(
1✔
650
                                failure, dataConverterWithChildWorkflowContext)));
651
              } else {
652
                runner.executeInWorkflowThread(
1✔
653
                    "child workflow started callback", () -> executionPromise.complete(execution));
1✔
654
              }
655
            },
1✔
656
            (result, failure) -> {
657
              if (failure != null) {
1✔
658
                runner.executeInWorkflowThread(
1✔
659
                    "child workflow failure callback",
660
                    () ->
661
                        resultPromise.completeExceptionally(
1✔
662
                            mapChildWorkflowException(
1✔
663
                                failure, dataConverterWithChildWorkflowContext)));
664
              } else {
665
                runner.executeInWorkflowThread(
1✔
666
                    "child workflow completion callback", () -> resultPromise.complete(result));
1✔
667
              }
668
            });
1✔
669
    AtomicBoolean callbackCalled = new AtomicBoolean();
1✔
670
    CancellationScope.current()
1✔
671
        .getCancellationRequest()
1✔
672
        .thenApply(
1✔
673
            (reason) -> {
674
              if (!callbackCalled.getAndSet(true)) {
1✔
675
                cancellationCallback.apply(new CanceledFailure(reason));
1✔
676
              }
677
              return null;
1✔
678
            });
679

680
    Promise<R> result =
1✔
681
        resultPromise.thenApply(
1✔
682
            (b) ->
683
                dataConverterWithChildWorkflowContext.fromPayloads(
1✔
684
                    0, b, input.getResultClass(), input.getResultType()));
1✔
685
    return new ChildWorkflowOutput<>(result, executionPromise);
1✔
686
  }
687

688
  private StartChildWorkflowExecutionParameters createChildWorkflowParameters(
689
      String workflowId,
690
      String name,
691
      ChildWorkflowOptions options,
692
      Header header,
693
      Optional<Payloads> input,
694
      @Nullable Memo memo) {
695
    final StartChildWorkflowExecutionCommandAttributes.Builder attributes =
696
        StartChildWorkflowExecutionCommandAttributes.newBuilder()
1✔
697
            .setWorkflowType(WorkflowType.newBuilder().setName(name).build());
1✔
698
    attributes.setWorkflowId(workflowId);
1✔
699
    attributes.setNamespace(OptionsUtils.safeGet(options.getNamespace()));
1✔
700
    input.ifPresent(attributes::setInput);
1✔
701
    attributes.setWorkflowRunTimeout(
1✔
702
        ProtobufTimeUtils.toProtoDuration(options.getWorkflowRunTimeout()));
1✔
703
    attributes.setWorkflowExecutionTimeout(
1✔
704
        ProtobufTimeUtils.toProtoDuration(options.getWorkflowExecutionTimeout()));
1✔
705
    attributes.setWorkflowTaskTimeout(
1✔
706
        ProtobufTimeUtils.toProtoDuration(options.getWorkflowTaskTimeout()));
1✔
707
    String taskQueue = options.getTaskQueue();
1✔
708
    if (taskQueue != null) {
1✔
709
      attributes.setTaskQueue(TaskQueue.newBuilder().setName(taskQueue));
1✔
710
    }
711
    if (options.getWorkflowIdReusePolicy() != null) {
1✔
712
      attributes.setWorkflowIdReusePolicy(options.getWorkflowIdReusePolicy());
1✔
713
    }
714
    RetryOptions retryOptions = options.getRetryOptions();
1✔
715
    if (retryOptions != null) {
1✔
716
      attributes.setRetryPolicy(toRetryPolicy(retryOptions));
1✔
717
    }
718
    attributes.setCronSchedule(OptionsUtils.safeGet(options.getCronSchedule()));
1✔
719

720
    if (memo != null) {
1✔
721
      attributes.setMemo(memo);
1✔
722
    }
723

724
    Map<String, Object> searchAttributes = options.getSearchAttributes();
1✔
725
    if (searchAttributes != null && !searchAttributes.isEmpty()) {
1✔
726
      attributes.setSearchAttributes(SearchAttributesUtil.encode(searchAttributes));
1✔
727
    }
728

729
    List<ContextPropagator> propagators = options.getContextPropagators();
1✔
730
    if (propagators == null) {
1✔
731
      propagators = this.contextPropagators;
1✔
732
    }
733
    io.temporal.api.common.v1.Header grpcHeader =
1✔
734
        toHeaderGrpc(header, extractContextsAndConvertToBytes(propagators));
1✔
735
    attributes.setHeader(grpcHeader);
1✔
736

737
    ParentClosePolicy parentClosePolicy = options.getParentClosePolicy();
1✔
738
    if (parentClosePolicy != null) {
1✔
739
      attributes.setParentClosePolicy(parentClosePolicy);
1✔
740
    }
741
    return new StartChildWorkflowExecutionParameters(attributes, options.getCancellationType());
1✔
742
  }
743

744
  private static Header extractContextsAndConvertToBytes(
745
      List<ContextPropagator> contextPropagators) {
746
    if (contextPropagators == null) {
1✔
747
      return null;
×
748
    }
749
    Map<String, Payload> result = new HashMap<>();
1✔
750
    for (ContextPropagator propagator : contextPropagators) {
1✔
751
      result.putAll(propagator.serializeContext(propagator.getCurrentContext()));
1✔
752
    }
1✔
753
    return new Header(result);
1✔
754
  }
755

756
  private static RuntimeException mapChildWorkflowException(
757
      Exception failure, DataConverter dataConverterWithChildWorkflowContext) {
758
    if (failure == null) {
1✔
759
      return null;
×
760
    }
761
    if (failure instanceof TemporalFailure) {
1✔
762
      ((TemporalFailure) failure).setDataConverter(dataConverterWithChildWorkflowContext);
1✔
763
    }
764
    if (failure instanceof CanceledFailure) {
1✔
765
      return (CanceledFailure) failure;
1✔
766
    }
767
    if (failure instanceof WorkflowException) {
1✔
768
      return (RuntimeException) failure;
×
769
    }
770
    if (failure instanceof ChildWorkflowFailure) {
1✔
771
      return (ChildWorkflowFailure) failure;
1✔
772
    }
773
    if (!(failure instanceof ChildWorkflowTaskFailedException)) {
1✔
774
      return new IllegalArgumentException("Unexpected exception type: ", failure);
×
775
    }
776
    ChildWorkflowTaskFailedException taskFailed = (ChildWorkflowTaskFailedException) failure;
1✔
777
    Throwable cause =
1✔
778
        dataConverterWithChildWorkflowContext.failureToException(
1✔
779
            taskFailed.getOriginalCauseFailure());
1✔
780
    ChildWorkflowFailure exception = taskFailed.getException();
1✔
781
    return new ChildWorkflowFailure(
1✔
782
        exception.getInitiatedEventId(),
1✔
783
        exception.getStartedEventId(),
1✔
784
        exception.getWorkflowType(),
1✔
785
        exception.getExecution(),
1✔
786
        exception.getNamespace(),
1✔
787
        exception.getRetryState(),
1✔
788
        cause);
789
  }
790

791
  @Override
792
  public Promise<Void> newTimer(Duration delay) {
793
    CompletablePromise<Void> p = Workflow.newPromise();
1✔
794
    Functions.Proc1<RuntimeException> cancellationHandler =
1✔
795
        replayContext.newTimer(
1✔
796
            delay,
797
            (e) ->
798
                runner.executeInWorkflowThread(
1✔
799
                    "timer-callback",
800
                    () -> {
801
                      if (e == null) {
1✔
802
                        p.complete(null);
1✔
803
                      } else {
804
                        p.completeExceptionally(e);
1✔
805
                      }
806
                    }));
1✔
807
    CancellationScope.current()
1✔
808
        .getCancellationRequest()
1✔
809
        .thenApply(
1✔
810
            (r) -> {
811
              cancellationHandler.apply(new CanceledFailure(r));
1✔
812
              return r;
1✔
813
            });
814
    return p;
1✔
815
  }
816

817
  @Override
818
  public <R> R sideEffect(Class<R> resultClass, Type resultType, Func<R> func) {
819
    try {
820
      CompletablePromise<Optional<Payloads>> result = Workflow.newPromise();
1✔
821
      replayContext.sideEffect(
1✔
822
          () -> {
823
            R r = func.apply();
1✔
824
            return dataConverterWithCurrentWorkflowContext.toPayloads(r);
1✔
825
          },
826
          (p) ->
827
              runner.executeInWorkflowThread(
1✔
828
                  "side-effect-callback", () -> result.complete(Objects.requireNonNull(p))));
1✔
829
      return dataConverterWithCurrentWorkflowContext.fromPayloads(
1✔
830
          0, result.get(), resultClass, resultType);
1✔
831
    } catch (Exception e) {
×
832
      // SideEffect cannot throw normal exception as it can lead to non-deterministic behavior. So
833
      // fail the workflow task by throwing an Error.
834
      throw new Error(e);
×
835
    }
836
  }
837

838
  @Override
839
  public <R> R mutableSideEffect(
840
      String id, Class<R> resultClass, Type resultType, BiPredicate<R, R> updated, Func<R> func) {
841
    try {
842
      return mutableSideEffectImpl(id, resultClass, resultType, updated, func);
1✔
843
    } catch (Exception e) {
×
844
      // MutableSideEffect cannot throw normal exception as it can lead to non-deterministic
845
      // behavior. So fail the workflow task by throwing an Error.
846
      throw new Error(e);
×
847
    }
848
  }
849

850
  private <R> R mutableSideEffectImpl(
851
      String id, Class<R> resultClass, Type resultType, BiPredicate<R, R> updated, Func<R> func) {
852
    CompletablePromise<Optional<Payloads>> result = Workflow.newPromise();
1✔
853
    AtomicReference<R> unserializedResult = new AtomicReference<>();
1✔
854
    replayContext.mutableSideEffect(
1✔
855
        id,
856
        (storedBinary) -> {
857
          Optional<R> stored =
1✔
858
              storedBinary.map(
1✔
859
                  (b) ->
860
                      dataConverterWithCurrentWorkflowContext.fromPayloads(
1✔
861
                          0, Optional.of(b), resultClass, resultType));
1✔
862
          R funcResult =
1✔
863
              Objects.requireNonNull(func.apply(), "mutableSideEffect function " + "returned null");
1✔
864
          if (!stored.isPresent() || updated.test(stored.get(), funcResult)) {
1✔
865
            unserializedResult.set(funcResult);
1✔
866
            return dataConverterWithCurrentWorkflowContext.toPayloads(funcResult);
1✔
867
          }
868
          return Optional.empty(); // returned only when value doesn't need to be updated
1✔
869
        },
870
        (p) ->
871
            runner.executeInWorkflowThread(
1✔
872
                "mutable-side-effect-callback", () -> result.complete(Objects.requireNonNull(p))));
1✔
873

874
    if (!result.get().isPresent()) {
1✔
875
      throw new IllegalArgumentException("No value found for mutableSideEffectId=" + id);
×
876
    }
877
    // An optimization that avoids unnecessary deserialization of the result.
878
    R unserialized = unserializedResult.get();
1✔
879
    if (unserialized != null) {
1✔
880
      return unserialized;
1✔
881
    }
882
    return dataConverterWithCurrentWorkflowContext.fromPayloads(
1✔
883
        0, result.get(), resultClass, resultType);
1✔
884
  }
885

886
  @Override
887
  public int getVersion(String changeId, int minSupported, int maxSupported) {
888
    CompletablePromise<Integer> result = Workflow.newPromise();
1✔
889
    replayContext.getVersion(
1✔
890
        changeId,
891
        minSupported,
892
        maxSupported,
893
        (v, e) ->
894
            runner.executeInWorkflowThread(
1✔
895
                "version-callback",
896
                () -> {
897
                  if (v != null) {
1✔
898
                    result.complete(v);
1✔
899
                  } else {
900
                    result.completeExceptionally(e);
1✔
901
                  }
902
                }));
1✔
903
    try {
904
      return result.get();
1✔
905
    } catch (UnsupportedVersion.UnsupportedVersionException ex) {
1✔
906
      throw new UnsupportedVersion(ex);
1✔
907
    }
908
  }
909

910
  @Override
911
  public void registerQuery(RegisterQueryInput request) {
912
    queryDispatcher.registerQueryHandlers(request);
1✔
913
  }
1✔
914

915
  @Override
916
  public void registerSignalHandlers(RegisterSignalHandlersInput input) {
917
    signalDispatcher.registerSignalHandlers(input);
1✔
918
  }
1✔
919

920
  @Override
921
  public void registerUpdateHandlers(RegisterUpdateHandlersInput input) {
922
    updateDispatcher.registerUpdateHandlers(input);
×
923
  }
×
924

925
  @Override
926
  public void registerDynamicSignalHandler(RegisterDynamicSignalHandlerInput input) {
927
    signalDispatcher.registerDynamicSignalHandler(input);
1✔
928
  }
1✔
929

930
  @Override
931
  public void registerDynamicQueryHandler(RegisterDynamicQueryHandlerInput input) {
932
    queryDispatcher.registerDynamicQueryHandler(input);
1✔
933
  }
1✔
934

935
  @Override
936
  public void registerDynamicUpdateHandler(RegisterDynamicUpdateHandlerInput input) {
937
    updateDispatcher.registerDynamicUpdateHandler(input);
×
938
  }
×
939

940
  @Override
941
  public UUID randomUUID() {
942
    return replayContext.randomUUID();
1✔
943
  }
944

945
  @Override
946
  public Random newRandom() {
947
    return replayContext.newRandom();
1✔
948
  }
949

950
  public DataConverter getDataConverter() {
951
    return dataConverter;
1✔
952
  }
953

954
  boolean isReplaying() {
955
    return replayContext.isReplaying();
1✔
956
  }
957

958
  @Override
959
  public ReplayWorkflowContext getReplayContext() {
960
    return replayContext;
1✔
961
  }
962

963
  @Override
964
  public SignalExternalOutput signalExternalWorkflow(SignalExternalInput input) {
965
    WorkflowExecution childExecution = input.getExecution();
1✔
966
    DataConverter dataConverterWithChildWorkflowContext =
1✔
967
        dataConverter.withContext(
1✔
968
            new WorkflowSerializationContext(
969
                replayContext.getNamespace(), childExecution.getWorkflowId()));
1✔
970
    SignalExternalWorkflowExecutionCommandAttributes.Builder attributes =
971
        SignalExternalWorkflowExecutionCommandAttributes.newBuilder();
1✔
972
    attributes.setSignalName(input.getSignalName());
1✔
973
    attributes.setExecution(childExecution);
1✔
974
    Optional<Payloads> payloads = dataConverterWithChildWorkflowContext.toPayloads(input.getArgs());
1✔
975
    payloads.ifPresent(attributes::setInput);
1✔
976
    CompletablePromise<Void> result = Workflow.newPromise();
1✔
977
    Functions.Proc1<Exception> cancellationCallback =
1✔
978
        replayContext.signalExternalWorkflowExecution(
1✔
979
            attributes,
980
            (output, failure) -> {
981
              if (failure != null) {
1✔
982
                runner.executeInWorkflowThread(
1✔
983
                    "child workflow failure callback",
984
                    () ->
985
                        result.completeExceptionally(
1✔
986
                            dataConverterWithChildWorkflowContext.failureToException(failure)));
1✔
987
              } else {
988
                runner.executeInWorkflowThread(
1✔
989
                    "child workflow completion callback", () -> result.complete(output));
1✔
990
              }
991
            });
1✔
992
    CancellationScope.current()
1✔
993
        .getCancellationRequest()
1✔
994
        .thenApply(
1✔
995
            (reason) -> {
996
              cancellationCallback.apply(new CanceledFailure(reason));
1✔
997
              return null;
1✔
998
            });
999
    return new SignalExternalOutput(result);
1✔
1000
  }
1001

1002
  @Override
1003
  public void sleep(Duration duration) {
1004
    newTimer(duration).get();
1✔
1005
  }
1✔
1006

1007
  @Override
1008
  public boolean await(Duration timeout, String reason, Supplier<Boolean> unblockCondition) {
1009
    Promise<Void> timer = newTimer(timeout);
1✔
1010
    WorkflowThread.await(reason, () -> (timer.isCompleted() || unblockCondition.get()));
1✔
1011
    return !timer.isCompleted();
1✔
1012
  }
1013

1014
  @Override
1015
  public void await(String reason, Supplier<Boolean> unblockCondition) {
1016
    WorkflowThread.await(reason, unblockCondition);
1✔
1017
  }
1✔
1018

1019
  @Override
1020
  public void continueAsNew(ContinueAsNewInput input) {
1021
    ContinueAsNewWorkflowExecutionCommandAttributes.Builder attributes =
1022
        ContinueAsNewWorkflowExecutionCommandAttributes.newBuilder();
1✔
1023
    String workflowType = input.getWorkflowType();
1✔
1024
    if (workflowType != null) {
1✔
1025
      attributes.setWorkflowType(WorkflowType.newBuilder().setName(workflowType));
1✔
1026
    }
1027
    @Nullable ContinueAsNewOptions options = input.getOptions();
1✔
1028
    if (options != null) {
1✔
1029
      if (options.getWorkflowRunTimeout() != null) {
1✔
1030
        attributes.setWorkflowRunTimeout(
×
1031
            ProtobufTimeUtils.toProtoDuration(options.getWorkflowRunTimeout()));
×
1032
      }
1033
      if (options.getWorkflowTaskTimeout() != null) {
1✔
1034
        attributes.setWorkflowTaskTimeout(
×
1035
            ProtobufTimeUtils.toProtoDuration(options.getWorkflowTaskTimeout()));
×
1036
      }
1037
      if (options.getTaskQueue() != null && !options.getTaskQueue().isEmpty()) {
1✔
1038
        attributes.setTaskQueue(TaskQueue.newBuilder().setName(options.getTaskQueue()));
1✔
1039
      }
1040
      Map<String, Object> searchAttributes = options.getSearchAttributes();
1✔
1041
      if (searchAttributes != null) {
1✔
1042
        attributes.setSearchAttributes(SearchAttributesUtil.encode(searchAttributes));
1✔
1043
      }
1044
      Map<String, Object> memo = options.getMemo();
1✔
1045
      if (memo != null) {
1✔
1046
        attributes.setMemo(
1✔
1047
            Memo.newBuilder()
1✔
1048
                .putAllFields(intoPayloadMap(dataConverterWithCurrentWorkflowContext, memo)));
1✔
1049
      }
1050
    }
1051

1052
    List<ContextPropagator> propagators =
1053
        options != null && options.getContextPropagators() != null
1✔
1054
            ? options.getContextPropagators()
×
1055
            : this.contextPropagators;
1✔
1056
    io.temporal.api.common.v1.Header grpcHeader =
1✔
1057
        toHeaderGrpc(input.getHeader(), extractContextsAndConvertToBytes(propagators));
1✔
1058
    attributes.setHeader(grpcHeader);
1✔
1059

1060
    Optional<Payloads> payloads =
1✔
1061
        dataConverterWithCurrentWorkflowContext.toPayloads(input.getArgs());
1✔
1062
    payloads.ifPresent(attributes::setInput);
1✔
1063

1064
    replayContext.continueAsNewOnCompletion(attributes.build());
1✔
1065
    WorkflowThread.exit();
×
1066
  }
×
1067

1068
  @Override
1069
  public CancelWorkflowOutput cancelWorkflow(CancelWorkflowInput input) {
1070
    CompletablePromise<Void> result = Workflow.newPromise();
×
1071
    replayContext.requestCancelExternalWorkflowExecution(
×
1072
        input.getExecution(),
×
1073
        (r, exception) -> {
1074
          if (exception == null) {
×
1075
            result.complete(null);
×
1076
          } else {
1077
            result.completeExceptionally(exception);
×
1078
          }
1079
        });
×
1080
    return new CancelWorkflowOutput(result);
×
1081
  }
1082

1083
  public Scope getMetricsScope() {
1084
    return replayContext.getMetricsScope();
1✔
1085
  }
1086

1087
  public boolean isLoggingEnabledInReplay() {
1088
    return replayContext.getEnableLoggingInReplay();
×
1089
  }
1090

1091
  @Override
1092
  public void upsertSearchAttributes(Map<String, ?> searchAttributes) {
1093
    Preconditions.checkArgument(searchAttributes != null, "null search attributes");
1✔
1094
    Preconditions.checkArgument(!searchAttributes.isEmpty(), "empty search attributes");
1✔
1095
    SearchAttributes attr = SearchAttributesUtil.encode(searchAttributes);
1✔
1096
    replayContext.upsertSearchAttributes(attr);
1✔
1097
  }
1✔
1098

1099
  @Nonnull
1100
  public Object newWorkflowMethodThreadIntercepted(Runnable runnable, @Nullable String name) {
1101
    return runner.newWorkflowThread(runnable, false, name);
1✔
1102
  }
1103

1104
  @Nonnull
1105
  public Object newWorkflowCallbackThreadIntercepted(Runnable runnable, @Nullable String name) {
1106
    return runner.newCallbackThread(runnable, name);
1✔
1107
  }
1108

1109
  @Override
1110
  public Object newChildThread(Runnable runnable, boolean detached, String name) {
1111
    return runner.newWorkflowThread(runnable, detached, name);
1✔
1112
  }
1113

1114
  @Override
1115
  public long currentTimeMillis() {
1116
    return replayContext.currentTimeMillis();
1✔
1117
  }
1118

1119
  /**
1120
   * This WorkflowInboundCallsInterceptor is used during creation of the initial root workflow
1121
   * thread and should be replaced with another specific implementation during initialization stage
1122
   * {@code workflow.initialize()} performed inside the workflow root thread.
1123
   *
1124
   * @see SyncWorkflow#start(HistoryEvent, ReplayWorkflowContext)
1125
   */
1126
  private static final class InitialWorkflowInboundCallsInterceptor
1127
      extends BaseRootWorkflowInboundCallsInterceptor {
1128

1129
    public InitialWorkflowInboundCallsInterceptor(SyncWorkflowContext workflowContext) {
1130
      super(workflowContext);
1✔
1131
    }
1✔
1132

1133
    @Override
1134
    public WorkflowOutput execute(WorkflowInput input) {
1135
      throw new UnsupportedOperationException(
×
1136
          "SyncWorkflowContext should be initialized with a non-initial WorkflowInboundCallsInterceptor "
1137
              + "before #execute can be called");
1138
    }
1139
  }
1140

1141
  @Nonnull
1142
  @Override
1143
  public WorkflowImplementationOptions getWorkflowImplementationOptions() {
1144
    return workflowImplementationOptions;
1✔
1145
  }
1146

1147
  @Override
1148
  public Failure mapWorkflowExceptionToFailure(Throwable failure) {
1149
    return dataConverterWithCurrentWorkflowContext.exceptionToFailure(failure);
1✔
1150
  }
1151

1152
  @Nullable
1153
  @Override
1154
  public <R> R getLastCompletionResult(Class<R> resultClass, Type resultType) {
1155
    return dataConverterWithCurrentWorkflowContext.fromPayloads(
1✔
1156
        0, Optional.ofNullable(replayContext.getLastCompletionResult()), resultClass, resultType);
1✔
1157
  }
1158

1159
  @Override
1160
  public List<ContextPropagator> getContextPropagators() {
1161
    return contextPropagators;
1✔
1162
  }
1163

1164
  @Override
1165
  public Map<String, Object> getPropagatedContexts() {
1166
    if (contextPropagators == null || contextPropagators.isEmpty()) {
1✔
1167
      return new HashMap<>();
1✔
1168
    }
1169

1170
    Map<String, Payload> headerData = new HashMap<>(replayContext.getHeader());
1✔
1171
    Map<String, Object> contextData = new HashMap<>();
1✔
1172
    for (ContextPropagator propagator : contextPropagators) {
1✔
1173
      contextData.put(propagator.getName(), propagator.deserializeContext(headerData));
1✔
1174
    }
1✔
1175

1176
    return contextData;
1✔
1177
  }
1178

1179
  /** Simple wrapper over a failure just to allow completing the CompletablePromise as a failure */
1180
  private static class FailureWrapperException extends RuntimeException {
1181
    private final Failure failure;
1182

1183
    public FailureWrapperException(Failure failure) {
1✔
1184
      this.failure = failure;
1✔
1185
    }
1✔
1186

1187
    public Failure getFailure() {
1188
      return failure;
1✔
1189
    }
1190
  }
1191
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc