• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #333

16 Oct 2024 07:28PM UTC coverage: 78.65% (+0.6%) from 78.085%
#333

push

github

web-flow
Fix code coverage (#2275)

22670 of 28824 relevant lines covered (78.65%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.29
/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.sync;
22

23
import static io.temporal.internal.common.HeaderUtils.intoPayloadMap;
24
import static io.temporal.internal.common.HeaderUtils.toHeaderGrpc;
25
import static io.temporal.internal.common.RetryOptionsUtils.toRetryPolicy;
26
import static io.temporal.internal.common.WorkflowExecutionUtils.makeUserMetaData;
27
import static io.temporal.internal.sync.WorkflowInternal.DEFAULT_VERSION;
28

29
import com.google.common.base.MoreObjects;
30
import com.google.common.base.Preconditions;
31
import com.uber.m3.tally.Scope;
32
import io.temporal.activity.ActivityOptions;
33
import io.temporal.activity.LocalActivityOptions;
34
import io.temporal.api.command.v1.*;
35
import io.temporal.api.common.v1.ActivityType;
36
import io.temporal.api.common.v1.Memo;
37
import io.temporal.api.common.v1.Payload;
38
import io.temporal.api.common.v1.Payloads;
39
import io.temporal.api.common.v1.SearchAttributes;
40
import io.temporal.api.common.v1.WorkflowExecution;
41
import io.temporal.api.common.v1.WorkflowType;
42
import io.temporal.api.enums.v1.ParentClosePolicy;
43
import io.temporal.api.failure.v1.Failure;
44
import io.temporal.api.history.v1.HistoryEvent;
45
import io.temporal.api.sdk.v1.UserMetadata;
46
import io.temporal.api.taskqueue.v1.TaskQueue;
47
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
48
import io.temporal.client.WorkflowException;
49
import io.temporal.common.RetryOptions;
50
import io.temporal.common.SearchAttributeUpdate;
51
import io.temporal.common.context.ContextPropagator;
52
import io.temporal.common.converter.DataConverter;
53
import io.temporal.common.interceptors.Header;
54
import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor;
55
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
56
import io.temporal.failure.*;
57
import io.temporal.internal.common.ActivityOptionUtils;
58
import io.temporal.internal.common.HeaderUtils;
59
import io.temporal.internal.common.OptionsUtils;
60
import io.temporal.internal.common.ProtobufTimeUtils;
61
import io.temporal.internal.common.SdkFlag;
62
import io.temporal.internal.common.SearchAttributesUtil;
63
import io.temporal.internal.replay.ChildWorkflowTaskFailedException;
64
import io.temporal.internal.replay.ReplayWorkflowContext;
65
import io.temporal.internal.replay.WorkflowContext;
66
import io.temporal.internal.statemachines.*;
67
import io.temporal.payload.context.ActivitySerializationContext;
68
import io.temporal.payload.context.WorkflowSerializationContext;
69
import io.temporal.worker.WorkflowImplementationOptions;
70
import io.temporal.workflow.*;
71
import io.temporal.workflow.Functions.Func;
72
import java.lang.reflect.Type;
73
import java.time.Duration;
74
import java.time.Instant;
75
import java.util.*;
76
import java.util.concurrent.atomic.AtomicBoolean;
77
import java.util.concurrent.atomic.AtomicReference;
78
import java.util.function.BiPredicate;
79
import java.util.function.Supplier;
80
import javax.annotation.Nonnull;
81
import javax.annotation.Nullable;
82
import org.slf4j.Logger;
83
import org.slf4j.LoggerFactory;
84

85
// TODO separate WorkflowOutboundCallsInterceptor functionality from this class into
86
// RootWorkflowOutboundInterceptor
87

88
/**
89
 * Root, the most top level WorkflowContext that unites all relevant contexts, handlers, options,
90
 * states, etc. It's created when SyncWorkflow which represent a context of a workflow type
91
 * definition on the worker meets ReplayWorkflowContext which contains information from the
92
 * WorkflowTask
93
 */
94
final class SyncWorkflowContext implements WorkflowContext, WorkflowOutboundCallsInterceptor {
95
  private static final Logger log = LoggerFactory.getLogger(SyncWorkflowContext.class);
1✔
96

97
  private final String namespace;
98
  private final WorkflowExecution workflowExecution;
99
  private final WorkflowImplementationOptions workflowImplementationOptions;
100
  private final DataConverter dataConverter;
101
  // to be used in this class, should not be passed down. Pass the original #dataConverter instead
102
  private final DataConverter dataConverterWithCurrentWorkflowContext;
103
  private final List<ContextPropagator> contextPropagators;
104
  private final SignalDispatcher signalDispatcher;
105
  private final QueryDispatcher queryDispatcher;
106
  private final UpdateDispatcher updateDispatcher;
107

108
  // initialized later when these entities are created
109
  private ReplayWorkflowContext replayContext;
110
  private DeterministicRunner runner;
111

112
  private WorkflowInboundCallsInterceptor headInboundInterceptor;
113
  private WorkflowOutboundCallsInterceptor headOutboundInterceptor;
114

115
  private ActivityOptions defaultActivityOptions = null;
1✔
116
  private Map<String, ActivityOptions> activityOptionsMap;
117
  private LocalActivityOptions defaultLocalActivityOptions = null;
1✔
118
  private Map<String, LocalActivityOptions> localActivityOptionsMap;
119
  private NexusServiceOptions defaultNexusServiceOptions = null;
1✔
120
  private Map<String, NexusServiceOptions> nexusServiceOptionsMap;
121
  private boolean readOnly = false;
1✔
122
  private final WorkflowThreadLocal<UpdateInfo> currentUpdateInfo = new WorkflowThreadLocal<>();
1✔
123
  // Map of all running update handlers. Key is the update Id of the update request.
124
  private Map<String, UpdateHandlerInfo> runningUpdateHandlers = new HashMap<>();
1✔
125
  // Map of all running signal handlers. Key is the event Id of the signal event.
126
  private Map<Long, SignalHandlerInfo> runningSignalHandlers = new HashMap<>();
1✔
127

128
  public SyncWorkflowContext(
129
      @Nonnull String namespace,
130
      @Nonnull WorkflowExecution workflowExecution,
131
      SignalDispatcher signalDispatcher,
132
      QueryDispatcher queryDispatcher,
133
      UpdateDispatcher updateDispatcher,
134
      @Nullable WorkflowImplementationOptions workflowImplementationOptions,
135
      DataConverter dataConverter,
136
      List<ContextPropagator> contextPropagators) {
1✔
137
    this.namespace = namespace;
1✔
138
    this.workflowExecution = workflowExecution;
1✔
139
    this.dataConverter = dataConverter;
1✔
140
    this.dataConverterWithCurrentWorkflowContext =
1✔
141
        dataConverter.withContext(
1✔
142
            new WorkflowSerializationContext(namespace, workflowExecution.getWorkflowId()));
1✔
143
    this.contextPropagators = contextPropagators;
1✔
144
    this.signalDispatcher = signalDispatcher;
1✔
145
    this.queryDispatcher = queryDispatcher;
1✔
146
    this.updateDispatcher = updateDispatcher;
1✔
147
    if (workflowImplementationOptions != null) {
1✔
148
      this.defaultActivityOptions = workflowImplementationOptions.getDefaultActivityOptions();
1✔
149
      this.activityOptionsMap = new HashMap<>(workflowImplementationOptions.getActivityOptions());
1✔
150
      this.defaultLocalActivityOptions =
1✔
151
          workflowImplementationOptions.getDefaultLocalActivityOptions();
1✔
152
      this.localActivityOptionsMap =
1✔
153
          new HashMap<>(workflowImplementationOptions.getLocalActivityOptions());
1✔
154
      this.defaultNexusServiceOptions =
1✔
155
          workflowImplementationOptions.getDefaultNexusServiceOptions();
1✔
156
      this.nexusServiceOptionsMap =
1✔
157
          new HashMap<>(workflowImplementationOptions.getNexusServiceOptions());
1✔
158
    }
159
    this.workflowImplementationOptions =
1✔
160
        workflowImplementationOptions == null
1✔
161
            ? WorkflowImplementationOptions.getDefaultInstance()
1✔
162
            : workflowImplementationOptions;
1✔
163
    // initial values for headInboundInterceptor and headOutboundInterceptor until they initialized
164
    // with actual interceptors through #initHeadInboundCallsInterceptor and
165
    // #initHeadOutboundCallsInterceptor during initialization phase.
166
    // See workflow.initialize() performed inside the workflow root thread inside
167
    // SyncWorkflow#start(HistoryEvent, ReplayWorkflowContext)
168
    this.headInboundInterceptor = new InitialWorkflowInboundCallsInterceptor(this);
1✔
169
    this.headOutboundInterceptor = this;
1✔
170
  }
1✔
171

172
  public void setReplayContext(ReplayWorkflowContext context) {
173
    this.replayContext = context;
1✔
174
  }
1✔
175

176
  /**
177
   * Using setter, as runner is initialized with this context, so it is not ready during
178
   * construction of this.
179
   */
180
  public void setRunner(DeterministicRunner runner) {
181
    this.runner = runner;
1✔
182
  }
1✔
183

184
  public DeterministicRunner getRunner() {
185
    return runner;
×
186
  }
187

188
  public WorkflowOutboundCallsInterceptor getWorkflowOutboundInterceptor() {
189
    return headOutboundInterceptor;
1✔
190
  }
191

192
  public WorkflowInboundCallsInterceptor getWorkflowInboundInterceptor() {
193
    return headInboundInterceptor;
1✔
194
  }
195

196
  public void initHeadOutboundCallsInterceptor(WorkflowOutboundCallsInterceptor head) {
197
    headOutboundInterceptor = head;
1✔
198
  }
1✔
199

200
  public void initHeadInboundCallsInterceptor(WorkflowInboundCallsInterceptor head) {
201
    headInboundInterceptor = head;
1✔
202
    signalDispatcher.setInboundCallsInterceptor(head);
1✔
203
    queryDispatcher.setInboundCallsInterceptor(head);
1✔
204
    updateDispatcher.setInboundCallsInterceptor(head);
1✔
205
  }
1✔
206

207
  public ActivityOptions getDefaultActivityOptions() {
208
    return defaultActivityOptions;
1✔
209
  }
210

211
  public @Nonnull Map<String, ActivityOptions> getActivityOptions() {
212
    return activityOptionsMap != null
1✔
213
        ? Collections.unmodifiableMap(activityOptionsMap)
1✔
214
        : Collections.emptyMap();
×
215
  }
216

217
  public LocalActivityOptions getDefaultLocalActivityOptions() {
218
    return defaultLocalActivityOptions;
1✔
219
  }
220

221
  public @Nonnull Map<String, LocalActivityOptions> getLocalActivityOptions() {
222
    return localActivityOptionsMap != null
1✔
223
        ? Collections.unmodifiableMap(localActivityOptionsMap)
1✔
224
        : Collections.emptyMap();
×
225
  }
226

227
  public NexusServiceOptions getDefaultNexusServiceOptions() {
228
    return defaultNexusServiceOptions;
×
229
  }
230

231
  public @Nonnull Map<String, NexusServiceOptions> getNexusServiceOptions() {
232
    return nexusServiceOptionsMap != null
1✔
233
        ? Collections.unmodifiableMap(nexusServiceOptionsMap)
1✔
234
        : Collections.emptyMap();
×
235
  }
236

237
  public void setDefaultActivityOptions(ActivityOptions defaultActivityOptions) {
238
    this.defaultActivityOptions =
1✔
239
        (this.defaultActivityOptions == null)
1✔
240
            ? defaultActivityOptions
×
241
            : this.defaultActivityOptions.toBuilder()
1✔
242
                .mergeActivityOptions(defaultActivityOptions)
1✔
243
                .build();
1✔
244
  }
1✔
245

246
  public void applyActivityOptions(Map<String, ActivityOptions> activityTypeToOption) {
247
    Objects.requireNonNull(activityTypeToOption);
1✔
248
    if (this.activityOptionsMap == null) {
1✔
249
      this.activityOptionsMap = new HashMap<>(activityTypeToOption);
×
250
      return;
×
251
    }
252
    ActivityOptionUtils.mergePredefinedActivityOptions(activityOptionsMap, activityTypeToOption);
1✔
253
  }
1✔
254

255
  public void setDefaultLocalActivityOptions(LocalActivityOptions defaultLocalActivityOptions) {
256
    this.defaultLocalActivityOptions =
1✔
257
        (this.defaultLocalActivityOptions == null)
1✔
258
            ? defaultLocalActivityOptions
×
259
            : this.defaultLocalActivityOptions.toBuilder()
1✔
260
                .mergeActivityOptions(defaultLocalActivityOptions)
1✔
261
                .build();
1✔
262
  }
1✔
263

264
  public void applyLocalActivityOptions(Map<String, LocalActivityOptions> activityTypeToOption) {
265
    Objects.requireNonNull(activityTypeToOption);
1✔
266
    if (this.localActivityOptionsMap == null) {
1✔
267
      this.localActivityOptionsMap = new HashMap<>(activityTypeToOption);
×
268
      return;
×
269
    }
270
    ActivityOptionUtils.mergePredefinedLocalActivityOptions(
1✔
271
        localActivityOptionsMap, activityTypeToOption);
272
  }
1✔
273

274
  @Override
275
  public <T> ActivityOutput<T> executeActivity(ActivityInput<T> input) {
276
    ActivitySerializationContext serializationContext =
1✔
277
        new ActivitySerializationContext(
278
            replayContext.getNamespace(),
1✔
279
            replayContext.getWorkflowId(),
1✔
280
            replayContext.getWorkflowType().getName(),
1✔
281
            input.getActivityName(),
1✔
282
            // input.getOptions().getTaskQueue() may be not specified, workflow task queue is used
283
            // by the Server in this case
284
            MoreObjects.firstNonNull(
1✔
285
                input.getOptions().getTaskQueue(), replayContext.getTaskQueue()),
1✔
286
            false);
287
    DataConverter dataConverterWithActivityContext =
1✔
288
        dataConverter.withContext(serializationContext);
1✔
289
    Optional<Payloads> args = dataConverterWithActivityContext.toPayloads(input.getArgs());
1✔
290

291
    ActivityOutput<Optional<Payloads>> output =
1✔
292
        executeActivityOnce(input.getActivityName(), input.getOptions(), input.getHeader(), args);
1✔
293

294
    // Avoid passing the input to the output handle as it causes the input to be retained for the
295
    // duration of the operation.
296
    Type resultType = input.getResultType();
1✔
297
    Class<T> resultClass = input.getResultClass();
1✔
298
    return new ActivityOutput<>(
1✔
299
        output.getActivityId(),
1✔
300
        output
301
            .getResult()
1✔
302
            .handle(
1✔
303
                (r, f) -> {
304
                  if (f == null) {
1✔
305
                    return resultType != Void.TYPE
1✔
306
                        ? dataConverterWithActivityContext.fromPayloads(
1✔
307
                            0, r, resultClass, resultType)
308
                        : null;
1✔
309
                  } else {
310
                    throw dataConverterWithActivityContext.failureToException(
1✔
311
                        ((FailureWrapperException) f).getFailure());
1✔
312
                  }
313
                }));
314
  }
315

316
  private ActivityOutput<Optional<Payloads>> executeActivityOnce(
317
      String activityTypeName, ActivityOptions options, Header header, Optional<Payloads> input) {
318
    ExecuteActivityParameters params =
1✔
319
        constructExecuteActivityParameters(activityTypeName, options, header, input);
1✔
320
    ActivityCallback callback = new ActivityCallback();
1✔
321
    ReplayWorkflowContext.ScheduleActivityTaskOutput activityOutput =
1✔
322
        replayContext.scheduleActivityTask(params, callback::invoke);
1✔
323
    CancellationScope.current()
1✔
324
        .getCancellationRequest()
1✔
325
        .thenApply(
1✔
326
            (reason) -> {
327
              activityOutput.getCancellationHandle().apply(new CanceledFailure(reason));
1✔
328
              return null;
1✔
329
            });
330
    return new ActivityOutput<>(activityOutput.getActivityId(), callback.result);
1✔
331
  }
332

333
  public void handleInterceptedSignal(WorkflowInboundCallsInterceptor.SignalInput input) {
334
    signalDispatcher.handleInterceptedSignal(input);
1✔
335
  }
1✔
336

337
  public void handleSignal(
338
      String signalName, Optional<Payloads> input, long eventId, Header header) {
339
    signalDispatcher.handleSignal(signalName, input, eventId, header);
1✔
340
  }
1✔
341

342
  public void handleValidateUpdate(
343
      String updateName, String updateId, Optional<Payloads> input, long eventId, Header header) {
344
    updateDispatcher.handleValidateUpdate(updateName, updateId, input, eventId, header);
1✔
345
  }
1✔
346

347
  public Optional<Payloads> handleExecuteUpdate(
348
      String updateName, String updateId, Optional<Payloads> input, long eventId, Header header) {
349
    return updateDispatcher.handleExecuteUpdate(updateName, updateId, input, eventId, header);
1✔
350
  }
351

352
  public void handleInterceptedValidateUpdate(WorkflowInboundCallsInterceptor.UpdateInput input) {
353
    updateDispatcher.handleInterceptedValidateUpdate(input);
1✔
354
  }
1✔
355

356
  public WorkflowInboundCallsInterceptor.UpdateOutput handleInterceptedExecuteUpdate(
357
      WorkflowInboundCallsInterceptor.UpdateInput input) {
358
    return updateDispatcher.handleInterceptedExecuteUpdate(input);
1✔
359
  }
360

361
  public WorkflowInboundCallsInterceptor.QueryOutput handleInterceptedQuery(
362
      WorkflowInboundCallsInterceptor.QueryInput input) {
363
    return queryDispatcher.handleInterceptedQuery(input);
1✔
364
  }
365

366
  public Optional<Payloads> handleQuery(String queryName, Header header, Optional<Payloads> input) {
367
    return queryDispatcher.handleQuery(queryName, header, input);
1✔
368
  }
369

370
  public boolean isEveryHandlerFinished() {
371
    return updateDispatcher.getRunningUpdateHandlers().isEmpty()
1✔
372
        && signalDispatcher.getRunningSignalHandlers().isEmpty();
1✔
373
  }
374

375
  private class ActivityCallback {
1✔
376
    private final CompletablePromise<Optional<Payloads>> result = Workflow.newPromise();
1✔
377

378
    public void invoke(Optional<Payloads> output, Failure failure) {
379
      if (failure != null) {
1✔
380
        runner.executeInWorkflowThread(
1✔
381
            "activity failure callback",
382
            () -> result.completeExceptionally(new FailureWrapperException(failure)));
1✔
383
      } else {
384
        runner.executeInWorkflowThread(
1✔
385
            "activity completion callback", () -> result.complete(output));
1✔
386
      }
387
    }
1✔
388
  }
389

390
  private class LocalActivityCallbackImpl implements LocalActivityCallback {
1✔
391
    private final CompletablePromise<Optional<Payloads>> result = Workflow.newPromise();
1✔
392

393
    @Override
394
    public void apply(Optional<Payloads> successOutput, LocalActivityFailedException exception) {
395
      if (exception != null) {
1✔
396
        runner.executeInWorkflowThread(
1✔
397
            "local activity failure callback", () -> result.completeExceptionally(exception));
1✔
398
      } else {
399
        runner.executeInWorkflowThread(
1✔
400
            "local activity completion callback", () -> result.complete(successOutput));
1✔
401
      }
402
    }
1✔
403
  }
404

405
  @Override
406
  public <R> LocalActivityOutput<R> executeLocalActivity(LocalActivityInput<R> input) {
407
    ActivitySerializationContext serializationContext =
1✔
408
        new ActivitySerializationContext(
409
            replayContext.getNamespace(),
1✔
410
            replayContext.getWorkflowId(),
1✔
411
            replayContext.getWorkflowType().getName(),
1✔
412
            input.getActivityName(),
1✔
413
            replayContext.getTaskQueue(),
1✔
414
            true);
415
    DataConverter dataConverterWithActivityContext =
1✔
416
        dataConverter.withContext(serializationContext);
1✔
417
    Optional<Payloads> payloads = dataConverterWithActivityContext.toPayloads(input.getArgs());
1✔
418

419
    long originalScheduledTime = System.currentTimeMillis();
1✔
420
    CompletablePromise<Optional<Payloads>> serializedResult =
421
        WorkflowInternal.newCompletablePromise();
1✔
422
    executeLocalActivityOverLocalRetryThreshold(
1✔
423
        input.getActivityName(),
1✔
424
        input.getOptions(),
1✔
425
        input.getHeader(),
1✔
426
        payloads,
427
        originalScheduledTime,
428
        1,
429
        null,
430
        serializedResult);
431

432
    // Avoid passing the input to the output handle as it causes the input to be retained for the
433
    // duration of the operation.
434
    Type resultType = input.getResultType();
1✔
435
    Class<R> resultClass = input.getResultClass();
1✔
436
    Promise<R> result =
1✔
437
        serializedResult.handle(
1✔
438
            (r, f) -> {
439
              if (f == null) {
1✔
440
                return resultClass != Void.TYPE
1✔
441
                    ? dataConverterWithActivityContext.fromPayloads(0, r, resultClass, resultType)
1✔
442
                    : null;
1✔
443
              } else {
444
                throw dataConverterWithActivityContext.failureToException(
1✔
445
                    ((LocalActivityCallback.LocalActivityFailedException) f).getFailure());
1✔
446
              }
447
            });
448

449
    return new LocalActivityOutput<>(result);
1✔
450
  }
451

452
  public void executeLocalActivityOverLocalRetryThreshold(
453
      String activityTypeName,
454
      LocalActivityOptions options,
455
      Header header,
456
      Optional<Payloads> input,
457
      long originalScheduledTime,
458
      int attempt,
459
      @Nullable Failure previousExecutionFailure,
460
      CompletablePromise<Optional<Payloads>> result) {
461
    CompletablePromise<Optional<Payloads>> localExecutionResult =
1✔
462
        executeLocalActivityLocally(
1✔
463
            activityTypeName,
464
            options,
465
            header,
466
            input,
467
            originalScheduledTime,
468
            attempt,
469
            previousExecutionFailure);
470

471
    localExecutionResult.handle(
1✔
472
        (r, e) -> {
473
          if (e == null) {
1✔
474
            result.complete(r);
1✔
475
          } else {
476
            if ((e instanceof LocalActivityCallback.LocalActivityFailedException)) {
1✔
477
              LocalActivityCallback.LocalActivityFailedException laException =
1✔
478
                  (LocalActivityCallback.LocalActivityFailedException) e;
479
              @Nullable Duration backoff = laException.getBackoff();
1✔
480
              if (backoff != null) {
1✔
481
                WorkflowInternal.newTimer(backoff)
1✔
482
                    .thenApply(
1✔
483
                        unused -> {
484
                          executeLocalActivityOverLocalRetryThreshold(
1✔
485
                              activityTypeName,
486
                              options,
487
                              header,
488
                              input,
489
                              originalScheduledTime,
490
                              laException.getLastAttempt() + 1,
1✔
491
                              laException.getFailure(),
1✔
492
                              result);
493
                          return null;
1✔
494
                        });
495
              } else {
496
                // final failure, report back
497
                result.completeExceptionally(laException);
1✔
498
              }
499
            } else {
1✔
500
              // Only LocalActivityFailedException is expected
501
              String exceptionMessage =
×
502
                  String.format(
×
503
                      "[BUG] Local Activity State Machine callback for activityType %s returned unexpected exception",
504
                      activityTypeName);
505
              log.warn(exceptionMessage, e);
×
506
              replayContext.failWorkflowTask(new IllegalStateException(exceptionMessage, e));
×
507
            }
508
          }
509
          return null;
1✔
510
        });
511
  }
1✔
512

513
  private CompletablePromise<Optional<Payloads>> executeLocalActivityLocally(
514
      String activityTypeName,
515
      LocalActivityOptions options,
516
      Header header,
517
      Optional<Payloads> input,
518
      long originalScheduledTime,
519
      int attempt,
520
      @Nullable Failure previousExecutionFailure) {
521

522
    LocalActivityCallbackImpl callback = new LocalActivityCallbackImpl();
1✔
523
    ExecuteLocalActivityParameters params =
1✔
524
        constructExecuteLocalActivityParameters(
1✔
525
            activityTypeName,
526
            options,
527
            header,
528
            input,
529
            attempt,
530
            originalScheduledTime,
531
            previousExecutionFailure);
532
    Functions.Proc cancellationCallback = replayContext.scheduleLocalActivityTask(params, callback);
1✔
533
    CancellationScope.current()
1✔
534
        .getCancellationRequest()
1✔
535
        .thenApply(
1✔
536
            (reason) -> {
537
              cancellationCallback.apply();
×
538
              return null;
×
539
            });
540
    return callback.result;
1✔
541
  }
542

543
  private ExecuteActivityParameters constructExecuteActivityParameters(
544
      String name, ActivityOptions options, Header header, Optional<Payloads> input) {
545
    String taskQueue = options.getTaskQueue();
1✔
546
    if (taskQueue == null) {
1✔
547
      taskQueue = replayContext.getTaskQueue();
1✔
548
    }
549
    ScheduleActivityTaskCommandAttributes.Builder attributes =
550
        ScheduleActivityTaskCommandAttributes.newBuilder()
1✔
551
            .setActivityType(ActivityType.newBuilder().setName(name))
1✔
552
            .setTaskQueue(TaskQueue.newBuilder().setName(taskQueue))
1✔
553
            .setScheduleToStartTimeout(
1✔
554
                ProtobufTimeUtils.toProtoDuration(options.getScheduleToStartTimeout()))
1✔
555
            .setStartToCloseTimeout(
1✔
556
                ProtobufTimeUtils.toProtoDuration(options.getStartToCloseTimeout()))
1✔
557
            .setScheduleToCloseTimeout(
1✔
558
                ProtobufTimeUtils.toProtoDuration(options.getScheduleToCloseTimeout()))
1✔
559
            .setHeartbeatTimeout(ProtobufTimeUtils.toProtoDuration(options.getHeartbeatTimeout()))
1✔
560
            .setRequestEagerExecution(
1✔
561
                !options.isEagerExecutionDisabled()
1✔
562
                    && Objects.equals(taskQueue, replayContext.getTaskQueue()));
1✔
563

564
    input.ifPresent(attributes::setInput);
1✔
565
    RetryOptions retryOptions = options.getRetryOptions();
1✔
566
    if (retryOptions != null) {
1✔
567
      attributes.setRetryPolicy(toRetryPolicy(retryOptions));
1✔
568
    }
569

570
    // Set the context value.  Use the context propagators from the ActivityOptions
571
    // if present, otherwise use the ones configured on the WorkflowContext
572
    List<ContextPropagator> propagators = options.getContextPropagators();
1✔
573
    if (propagators == null) {
1✔
574
      propagators = this.contextPropagators;
1✔
575
    }
576
    io.temporal.api.common.v1.Header grpcHeader =
1✔
577
        toHeaderGrpc(header, extractContextsAndConvertToBytes(propagators));
1✔
578
    attributes.setHeader(grpcHeader);
1✔
579

580
    if (options.getVersioningIntent() != null) {
1✔
581
      attributes.setUseWorkflowBuildId(
1✔
582
          options
583
              .getVersioningIntent()
1✔
584
              .determineUseCompatibleFlag(
1✔
585
                  replayContext.getTaskQueue().equals(options.getTaskQueue())));
1✔
586
    }
587

588
    return new ExecuteActivityParameters(attributes, options.getCancellationType());
1✔
589
  }
590

591
  private ExecuteLocalActivityParameters constructExecuteLocalActivityParameters(
592
      String name,
593
      LocalActivityOptions options,
594
      Header header,
595
      Optional<Payloads> input,
596
      int attempt,
597
      long originalScheduledTime,
598
      @Nullable Failure previousExecutionFailure) {
599
    options = LocalActivityOptions.newBuilder(options).validateAndBuildWithDefaults();
1✔
600

601
    PollActivityTaskQueueResponse.Builder activityTask =
602
        PollActivityTaskQueueResponse.newBuilder()
1✔
603
            .setActivityId(this.replayContext.randomUUID().toString())
1✔
604
            .setWorkflowNamespace(this.replayContext.getNamespace())
1✔
605
            .setWorkflowType(this.replayContext.getWorkflowType())
1✔
606
            .setWorkflowExecution(this.replayContext.getWorkflowExecution())
1✔
607
            // used to pass scheduled time to the local activity code inside
608
            // ActivityExecutionContext#getInfo
609
            // setCurrentAttemptScheduledTime is called inside LocalActivityWorker before submitting
610
            // into the LA queue
611
            .setScheduledTime(
1✔
612
                ProtobufTimeUtils.toProtoTimestamp(Instant.ofEpochMilli(originalScheduledTime)))
1✔
613
            .setActivityType(ActivityType.newBuilder().setName(name))
1✔
614
            .setAttempt(attempt);
1✔
615

616
    Duration scheduleToCloseTimeout = options.getScheduleToCloseTimeout();
1✔
617
    if (scheduleToCloseTimeout != null) {
1✔
618
      activityTask.setScheduleToCloseTimeout(
1✔
619
          ProtobufTimeUtils.toProtoDuration(scheduleToCloseTimeout));
1✔
620
    }
621

622
    Duration startToCloseTimeout = options.getStartToCloseTimeout();
1✔
623
    if (startToCloseTimeout != null) {
1✔
624
      activityTask.setStartToCloseTimeout(ProtobufTimeUtils.toProtoDuration(startToCloseTimeout));
1✔
625
    }
626

627
    io.temporal.api.common.v1.Header grpcHeader =
1✔
628
        toHeaderGrpc(header, extractContextsAndConvertToBytes(contextPropagators));
1✔
629
    activityTask.setHeader(grpcHeader);
1✔
630
    input.ifPresent(activityTask::setInput);
1✔
631
    RetryOptions retryOptions = options.getRetryOptions();
1✔
632
    activityTask.setRetryPolicy(
1✔
633
        toRetryPolicy(RetryOptions.newBuilder(retryOptions).validateBuildWithDefaults()));
1✔
634
    Duration localRetryThreshold = options.getLocalRetryThreshold();
1✔
635
    if (localRetryThreshold == null) {
1✔
636
      localRetryThreshold = replayContext.getWorkflowTaskTimeout().multipliedBy(3);
1✔
637
    }
638

639
    return new ExecuteLocalActivityParameters(
1✔
640
        activityTask,
641
        options.getScheduleToStartTimeout(),
1✔
642
        originalScheduledTime,
643
        previousExecutionFailure,
644
        options.isDoNotIncludeArgumentsIntoMarker(),
1✔
645
        localRetryThreshold);
646
  }
647

648
  @Override
649
  public <R> ChildWorkflowOutput<R> executeChildWorkflow(ChildWorkflowInput<R> input) {
650
    if (CancellationScope.current().isCancelRequested()) {
1✔
651
      CanceledFailure canceledFailure = new CanceledFailure("execute called from a canceled scope");
×
652
      return new ChildWorkflowOutput<>(
×
653
          Workflow.newFailedPromise(canceledFailure), Workflow.newFailedPromise(canceledFailure));
×
654
    }
655

656
    CompletablePromise<WorkflowExecution> executionPromise = Workflow.newPromise();
1✔
657
    CompletablePromise<Optional<Payloads>> resultPromise = Workflow.newPromise();
1✔
658

659
    DataConverter dataConverterWithChildWorkflowContext =
1✔
660
        dataConverter.withContext(
1✔
661
            new WorkflowSerializationContext(replayContext.getNamespace(), input.getWorkflowId()));
1✔
662
    Optional<Payloads> payloads = dataConverterWithChildWorkflowContext.toPayloads(input.getArgs());
1✔
663

664
    @Nullable
665
    Memo memo =
666
        (input.getOptions().getMemo() != null)
1✔
667
            ? Memo.newBuilder()
1✔
668
                .putAllFields(
1✔
669
                    intoPayloadMap(
1✔
670
                        dataConverterWithChildWorkflowContext, input.getOptions().getMemo()))
1✔
671
                .build()
1✔
672
            : null;
1✔
673

674
    @Nullable
675
    UserMetadata userMetadata =
1✔
676
        makeUserMetaData(
1✔
677
            input.getOptions().getStaticSummary(),
1✔
678
            input.getOptions().getStaticDetails(),
1✔
679
            dataConverterWithChildWorkflowContext);
680

681
    StartChildWorkflowExecutionParameters parameters =
1✔
682
        createChildWorkflowParameters(
1✔
683
            input.getWorkflowId(),
1✔
684
            input.getWorkflowType(),
1✔
685
            input.getOptions(),
1✔
686
            input.getHeader(),
1✔
687
            payloads,
688
            memo,
689
            userMetadata);
690

691
    Functions.Proc1<Exception> cancellationCallback =
1✔
692
        replayContext.startChildWorkflow(
1✔
693
            parameters,
694
            (execution, failure) -> {
695
              if (failure != null) {
1✔
696
                runner.executeInWorkflowThread(
1✔
697
                    "child workflow start failed callback",
698
                    () ->
699
                        executionPromise.completeExceptionally(
1✔
700
                            mapChildWorkflowException(
1✔
701
                                failure, dataConverterWithChildWorkflowContext)));
702
              } else {
703
                runner.executeInWorkflowThread(
1✔
704
                    "child workflow started callback", () -> executionPromise.complete(execution));
1✔
705
              }
706
            },
1✔
707
            (result, failure) -> {
708
              if (failure != null) {
1✔
709
                runner.executeInWorkflowThread(
1✔
710
                    "child workflow failure callback",
711
                    () ->
712
                        resultPromise.completeExceptionally(
1✔
713
                            mapChildWorkflowException(
1✔
714
                                failure, dataConverterWithChildWorkflowContext)));
715
              } else {
716
                runner.executeInWorkflowThread(
1✔
717
                    "child workflow completion callback", () -> resultPromise.complete(result));
1✔
718
              }
719
            });
1✔
720
    AtomicBoolean callbackCalled = new AtomicBoolean();
1✔
721
    CancellationScope.current()
1✔
722
        .getCancellationRequest()
1✔
723
        .thenApply(
1✔
724
            (reason) -> {
725
              if (!callbackCalled.getAndSet(true)) {
1✔
726
                cancellationCallback.apply(new CanceledFailure(reason));
1✔
727
              }
728
              return null;
1✔
729
            });
730

731
    // Avoid passing the input to the output handle as it causes the input to be retained for the
732
    // duration of the operation.
733
    Type resultType = input.getResultType();
1✔
734
    Class<R> resultClass = input.getResultClass();
1✔
735
    Promise<R> result =
1✔
736
        resultPromise.thenApply(
1✔
737
            (b) ->
738
                dataConverterWithChildWorkflowContext.fromPayloads(0, b, resultClass, resultType));
1✔
739
    return new ChildWorkflowOutput<>(result, executionPromise);
1✔
740
  }
741

742
  @Override
743
  public <R> ExecuteNexusOperationOutput<R> executeNexusOperation(
744
      ExecuteNexusOperationInput<R> input) {
745
    Preconditions.checkArgument(
1✔
746
        input.getEndpoint() != null && !input.getEndpoint().isEmpty(), "endpoint must be set");
1✔
747
    Preconditions.checkArgument(
1✔
748
        input.getService() != null && !input.getService().isEmpty(), "service must be set");
1✔
749

750
    if (CancellationScope.current().isCancelRequested()) {
1✔
751
      CanceledFailure canceledFailure =
×
752
          new CanceledFailure("execute nexus operation called from a canceled scope");
753
      return new ExecuteNexusOperationOutput<>(
×
754
          Workflow.newFailedPromise(canceledFailure), Workflow.newFailedPromise(canceledFailure));
×
755
    }
756

757
    CompletablePromise<NexusOperationExecution> operationPromise = Workflow.newPromise();
1✔
758
    CompletablePromise<Optional<Payload>> resultPromise = Workflow.newPromise();
1✔
759

760
    // Not using the context aware data converter because the context will not be available on the
761
    // worker side
762
    Optional<Payload> payload = dataConverter.toPayload(input.getArg());
1✔
763

764
    ScheduleNexusOperationCommandAttributes.Builder attributes =
765
        ScheduleNexusOperationCommandAttributes.newBuilder();
1✔
766
    payload.ifPresent(attributes::setInput);
1✔
767
    attributes.setOperation(input.getOperation());
1✔
768
    attributes.setService(input.getService());
1✔
769
    attributes.setEndpoint(input.getEndpoint());
1✔
770
    attributes.putAllNexusHeader(input.getHeaders());
1✔
771
    attributes.setScheduleToCloseTimeout(
1✔
772
        ProtobufTimeUtils.toProtoDuration(input.getOptions().getScheduleToCloseTimeout()));
1✔
773

774
    Functions.Proc1<Exception> cancellationCallback =
1✔
775
        replayContext.startNexusOperation(
1✔
776
            attributes.build(),
1✔
777
            (operationExec, failure) -> {
778
              if (failure != null) {
1✔
779
                runner.executeInWorkflowThread(
1✔
780
                    "nexus operation start failed callback",
781
                    () ->
782
                        operationPromise.completeExceptionally(
1✔
783
                            dataConverter.failureToException(failure)));
1✔
784
              } else {
785
                runner.executeInWorkflowThread(
1✔
786
                    "nexus operation started callback",
787
                    () ->
788
                        operationPromise.complete(new NexusOperationExecutionImpl(operationExec)));
1✔
789
              }
790
            },
1✔
791
            (Optional<Payload> result, Failure failure) -> {
792
              if (failure != null) {
1✔
793
                runner.executeInWorkflowThread(
1✔
794
                    "nexus operation failure callback",
795
                    () ->
796
                        resultPromise.completeExceptionally(
1✔
797
                            dataConverter.failureToException(failure)));
1✔
798
              } else {
799
                runner.executeInWorkflowThread(
1✔
800
                    "nexus operation completion callback", () -> resultPromise.complete(result));
1✔
801
              }
802
            });
1✔
803
    AtomicBoolean callbackCalled = new AtomicBoolean();
1✔
804
    CancellationScope.current()
1✔
805
        .getCancellationRequest()
1✔
806
        .thenApply(
1✔
807
            (reason) -> {
808
              if (!callbackCalled.getAndSet(true)) {
1✔
809
                cancellationCallback.apply(new CanceledFailure(reason));
1✔
810
              }
811
              return null;
1✔
812
            });
813
    Promise<R> result =
1✔
814
        resultPromise.thenApply(
1✔
815
            (b) ->
816
                input.getResultClass() != Void.class
1✔
817
                    ? dataConverter.fromPayload(
1✔
818
                        b.get(), input.getResultClass(), input.getResultType())
1✔
819
                    : null);
1✔
820
    return new ExecuteNexusOperationOutput<>(result, operationPromise);
1✔
821
  }
822

823
  @SuppressWarnings("deprecation")
824
  private StartChildWorkflowExecutionParameters createChildWorkflowParameters(
825
      String workflowId,
826
      String name,
827
      ChildWorkflowOptions options,
828
      Header header,
829
      Optional<Payloads> input,
830
      @Nullable Memo memo,
831
      @Nullable UserMetadata metadata) {
832
    final StartChildWorkflowExecutionCommandAttributes.Builder attributes =
833
        StartChildWorkflowExecutionCommandAttributes.newBuilder()
1✔
834
            .setWorkflowType(WorkflowType.newBuilder().setName(name).build());
1✔
835
    attributes.setWorkflowId(workflowId);
1✔
836
    attributes.setNamespace(OptionsUtils.safeGet(options.getNamespace()));
1✔
837
    input.ifPresent(attributes::setInput);
1✔
838
    attributes.setWorkflowRunTimeout(
1✔
839
        ProtobufTimeUtils.toProtoDuration(options.getWorkflowRunTimeout()));
1✔
840
    attributes.setWorkflowExecutionTimeout(
1✔
841
        ProtobufTimeUtils.toProtoDuration(options.getWorkflowExecutionTimeout()));
1✔
842
    attributes.setWorkflowTaskTimeout(
1✔
843
        ProtobufTimeUtils.toProtoDuration(options.getWorkflowTaskTimeout()));
1✔
844
    String taskQueue = options.getTaskQueue();
1✔
845
    if (taskQueue != null) {
1✔
846
      attributes.setTaskQueue(TaskQueue.newBuilder().setName(taskQueue));
1✔
847
    }
848
    if (options.getWorkflowIdReusePolicy() != null) {
1✔
849
      attributes.setWorkflowIdReusePolicy(options.getWorkflowIdReusePolicy());
1✔
850
    }
851
    RetryOptions retryOptions = options.getRetryOptions();
1✔
852
    if (retryOptions != null) {
1✔
853
      attributes.setRetryPolicy(toRetryPolicy(retryOptions));
1✔
854
    }
855
    attributes.setCronSchedule(OptionsUtils.safeGet(options.getCronSchedule()));
1✔
856

857
    if (memo != null) {
1✔
858
      attributes.setMemo(memo);
1✔
859
    }
860

861
    Map<String, Object> searchAttributes = options.getSearchAttributes();
1✔
862
    if (searchAttributes != null && !searchAttributes.isEmpty()) {
1✔
863
      if (options.getTypedSearchAttributes() != null) {
1✔
864
        throw new IllegalArgumentException(
×
865
            "Cannot have both typed search attributes and search attributes");
866
      }
867
      attributes.setSearchAttributes(SearchAttributesUtil.encode(searchAttributes));
1✔
868
    } else if (options.getTypedSearchAttributes() != null) {
1✔
869
      attributes.setSearchAttributes(
1✔
870
          SearchAttributesUtil.encodeTyped(options.getTypedSearchAttributes()));
1✔
871
    }
872

873
    List<ContextPropagator> propagators = options.getContextPropagators();
1✔
874
    if (propagators == null) {
1✔
875
      propagators = this.contextPropagators;
1✔
876
    }
877
    io.temporal.api.common.v1.Header grpcHeader =
1✔
878
        toHeaderGrpc(header, extractContextsAndConvertToBytes(propagators));
1✔
879
    attributes.setHeader(grpcHeader);
1✔
880

881
    ParentClosePolicy parentClosePolicy = options.getParentClosePolicy();
1✔
882
    if (parentClosePolicy != null) {
1✔
883
      attributes.setParentClosePolicy(parentClosePolicy);
1✔
884
    }
885

886
    if (options.getVersioningIntent() != null) {
1✔
887
      attributes.setInheritBuildId(
1✔
888
          options
889
              .getVersioningIntent()
1✔
890
              .determineUseCompatibleFlag(
1✔
891
                  replayContext.getTaskQueue().equals(options.getTaskQueue())));
1✔
892
    }
893
    return new StartChildWorkflowExecutionParameters(
1✔
894
        attributes, options.getCancellationType(), metadata);
1✔
895
  }
896

897
  private static Header extractContextsAndConvertToBytes(
898
      List<ContextPropagator> contextPropagators) {
899
    if (contextPropagators == null) {
1✔
900
      return null;
×
901
    }
902
    Map<String, Payload> result = new HashMap<>();
1✔
903
    for (ContextPropagator propagator : contextPropagators) {
1✔
904
      result.putAll(propagator.serializeContext(propagator.getCurrentContext()));
1✔
905
    }
1✔
906
    return new Header(result);
1✔
907
  }
908

909
  private static RuntimeException mapChildWorkflowException(
910
      Exception failure, DataConverter dataConverterWithChildWorkflowContext) {
911
    if (failure == null) {
1✔
912
      return null;
×
913
    }
914
    if (failure instanceof TemporalFailure) {
1✔
915
      ((TemporalFailure) failure).setDataConverter(dataConverterWithChildWorkflowContext);
1✔
916
    }
917
    if (failure instanceof CanceledFailure) {
1✔
918
      return (CanceledFailure) failure;
1✔
919
    }
920
    if (failure instanceof WorkflowException) {
1✔
921
      return (RuntimeException) failure;
×
922
    }
923
    if (failure instanceof ChildWorkflowFailure) {
1✔
924
      return (ChildWorkflowFailure) failure;
1✔
925
    }
926
    if (!(failure instanceof ChildWorkflowTaskFailedException)) {
1✔
927
      return new IllegalArgumentException("Unexpected exception type: ", failure);
×
928
    }
929
    ChildWorkflowTaskFailedException taskFailed = (ChildWorkflowTaskFailedException) failure;
1✔
930
    Throwable cause =
1✔
931
        dataConverterWithChildWorkflowContext.failureToException(
1✔
932
            taskFailed.getOriginalCauseFailure());
1✔
933
    ChildWorkflowFailure exception = taskFailed.getException();
1✔
934
    return new ChildWorkflowFailure(
1✔
935
        exception.getInitiatedEventId(),
1✔
936
        exception.getStartedEventId(),
1✔
937
        exception.getWorkflowType(),
1✔
938
        exception.getExecution(),
1✔
939
        exception.getNamespace(),
1✔
940
        exception.getRetryState(),
1✔
941
        cause);
942
  }
943

944
  @Override
945
  public Promise<Void> newTimer(Duration delay) {
946
    return newTimer(delay, TimerOptions.newBuilder().build());
1✔
947
  }
948

949
  @Override
950
  public Promise<Void> newTimer(Duration delay, TimerOptions options) {
951
    CompletablePromise<Void> p = Workflow.newPromise();
1✔
952

953
    @Nullable
954
    UserMetadata userMetadata =
1✔
955
        makeUserMetaData(options.getSummary(), null, dataConverterWithCurrentWorkflowContext);
1✔
956

957
    Functions.Proc1<RuntimeException> cancellationHandler =
1✔
958
        replayContext.newTimer(
1✔
959
            delay,
960
            userMetadata,
961
            (e) ->
962
                runner.executeInWorkflowThread(
1✔
963
                    "timer-callback",
964
                    () -> {
965
                      if (e == null) {
1✔
966
                        p.complete(null);
1✔
967
                      } else {
968
                        p.completeExceptionally(e);
1✔
969
                      }
970
                    }));
1✔
971
    CancellationScope.current()
1✔
972
        .getCancellationRequest()
1✔
973
        .thenApply(
1✔
974
            (r) -> {
975
              cancellationHandler.apply(new CanceledFailure(r));
1✔
976
              return r;
1✔
977
            });
978
    return p;
1✔
979
  }
980

981
  @Override
982
  public <R> R sideEffect(Class<R> resultClass, Type resultType, Func<R> func) {
983
    try {
984
      CompletablePromise<Optional<Payloads>> result = Workflow.newPromise();
1✔
985
      replayContext.sideEffect(
1✔
986
          () -> {
987
            try {
988
              readOnly = true;
1✔
989
              R r = func.apply();
1✔
990
              return dataConverterWithCurrentWorkflowContext.toPayloads(r);
1✔
991
            } finally {
992
              readOnly = false;
1✔
993
            }
994
          },
995
          (p) ->
996
              runner.executeInWorkflowThread(
1✔
997
                  "side-effect-callback", () -> result.complete(Objects.requireNonNull(p))));
1✔
998
      return dataConverterWithCurrentWorkflowContext.fromPayloads(
1✔
999
          0, result.get(), resultClass, resultType);
1✔
1000
    } catch (Exception e) {
1✔
1001
      // SideEffect cannot throw normal exception as it can lead to non-deterministic behavior. So
1002
      // fail the workflow task by throwing an Error.
1003
      throw new Error(e);
1✔
1004
    }
1005
  }
1006

1007
  @Override
1008
  public <R> R mutableSideEffect(
1009
      String id, Class<R> resultClass, Type resultType, BiPredicate<R, R> updated, Func<R> func) {
1010
    try {
1011
      return mutableSideEffectImpl(id, resultClass, resultType, updated, func);
1✔
1012
    } catch (Exception e) {
1✔
1013
      // MutableSideEffect cannot throw normal exception as it can lead to non-deterministic
1014
      // behavior. So fail the workflow task by throwing an Error.
1015
      throw new Error(e);
1✔
1016
    }
1017
  }
1018

1019
  private <R> R mutableSideEffectImpl(
1020
      String id, Class<R> resultClass, Type resultType, BiPredicate<R, R> updated, Func<R> func) {
1021
    CompletablePromise<Optional<Payloads>> result = Workflow.newPromise();
1✔
1022
    AtomicReference<R> unserializedResult = new AtomicReference<>();
1✔
1023
    replayContext.mutableSideEffect(
1✔
1024
        id,
1025
        (storedBinary) -> {
1026
          Optional<R> stored =
1✔
1027
              storedBinary.map(
1✔
1028
                  (b) ->
1029
                      dataConverterWithCurrentWorkflowContext.fromPayloads(
1✔
1030
                          0, Optional.of(b), resultClass, resultType));
1✔
1031
          try {
1032
            readOnly = true;
1✔
1033
            R funcResult =
1✔
1034
                Objects.requireNonNull(
1✔
1035
                    func.apply(), "mutableSideEffect function " + "returned null");
1✔
1036
            if (!stored.isPresent() || updated.test(stored.get(), funcResult)) {
1✔
1037
              unserializedResult.set(funcResult);
1✔
1038
              return dataConverterWithCurrentWorkflowContext.toPayloads(funcResult);
1✔
1039
            }
1040
            return Optional.empty(); // returned only when value doesn't need to be updated
1✔
1041
          } finally {
1042
            readOnly = false;
1✔
1043
          }
1044
        },
1045
        (p) ->
1046
            runner.executeInWorkflowThread(
1✔
1047
                "mutable-side-effect-callback", () -> result.complete(Objects.requireNonNull(p))));
1✔
1048

1049
    if (!result.get().isPresent()) {
1✔
1050
      throw new IllegalArgumentException("No value found for mutableSideEffectId=" + id);
×
1051
    }
1052
    // An optimization that avoids unnecessary deserialization of the result.
1053
    R unserialized = unserializedResult.get();
1✔
1054
    if (unserialized != null) {
1✔
1055
      return unserialized;
1✔
1056
    }
1057
    return dataConverterWithCurrentWorkflowContext.fromPayloads(
1✔
1058
        0, result.get(), resultClass, resultType);
1✔
1059
  }
1060

1061
  @Override
1062
  public int getVersion(String changeId, int minSupported, int maxSupported) {
1063
    CompletablePromise<Integer> result = Workflow.newPromise();
1✔
1064
    boolean markerExists =
1✔
1065
        replayContext.getVersion(
1✔
1066
            changeId,
1067
            minSupported,
1068
            maxSupported,
1069
            (v, e) ->
1070
                runner.executeInWorkflowThread(
1✔
1071
                    "version-callback",
1072
                    () -> {
1073
                      if (v != null) {
1✔
1074
                        result.complete(v);
1✔
1075
                      } else {
1076
                        result.completeExceptionally(e);
1✔
1077
                      }
1078
                    }));
1✔
1079
    /*
1080
     * If we are replaying a workflow and encounter a getVersion call it is possible that this call did not exist
1081
     * on the original execution. If the call did not exist on the original execution then we cannot block on results
1082
     * because it can lead to non-deterministic scheduling.
1083
     * */
1084
    if (replayContext.isReplaying()
1✔
1085
        && !markerExists
1086
        && replayContext.tryUseSdkFlag(SdkFlag.SKIP_YIELD_ON_DEFAULT_VERSION)
1✔
1087
        && minSupported == DEFAULT_VERSION) {
1088
      return DEFAULT_VERSION;
1✔
1089
    }
1090

1091
    try {
1092
      return result.get();
1✔
1093
    } catch (UnsupportedVersion.UnsupportedVersionException ex) {
1✔
1094
      throw new UnsupportedVersion(ex);
1✔
1095
    }
1096
  }
1097

1098
  @Override
1099
  public void registerQuery(RegisterQueryInput request) {
1100
    queryDispatcher.registerQueryHandlers(request);
1✔
1101
  }
1✔
1102

1103
  @Override
1104
  public void registerSignalHandlers(RegisterSignalHandlersInput input) {
1105
    signalDispatcher.registerSignalHandlers(input);
1✔
1106
  }
1✔
1107

1108
  @Override
1109
  public void registerUpdateHandlers(RegisterUpdateHandlersInput input) {
1110
    updateDispatcher.registerUpdateHandlers(input);
1✔
1111
  }
1✔
1112

1113
  @Override
1114
  public void registerDynamicSignalHandler(RegisterDynamicSignalHandlerInput input) {
1115
    signalDispatcher.registerDynamicSignalHandler(input);
1✔
1116
  }
1✔
1117

1118
  @Override
1119
  public void registerDynamicQueryHandler(RegisterDynamicQueryHandlerInput input) {
1120
    queryDispatcher.registerDynamicQueryHandler(input);
1✔
1121
  }
1✔
1122

1123
  @Override
1124
  public void registerDynamicUpdateHandler(RegisterDynamicUpdateHandlerInput input) {
1125
    updateDispatcher.registerDynamicUpdateHandler(input);
1✔
1126
  }
1✔
1127

1128
  @Override
1129
  public UUID randomUUID() {
1130
    return replayContext.randomUUID();
1✔
1131
  }
1132

1133
  @Override
1134
  public Random newRandom() {
1135
    return replayContext.newRandom();
1✔
1136
  }
1137

1138
  public DataConverter getDataConverter() {
1139
    return dataConverter;
×
1140
  }
1141

1142
  public DataConverter getDataConverterWithCurrentWorkflowContext() {
1143
    return dataConverterWithCurrentWorkflowContext;
1✔
1144
  }
1145

1146
  boolean isReplaying() {
1147
    return replayContext.isReplaying();
1✔
1148
  }
1149

1150
  boolean isReadOnly() {
1151
    return readOnly;
1✔
1152
  }
1153

1154
  void setReadOnly(boolean readOnly) {
1155
    this.readOnly = readOnly;
1✔
1156
  }
1✔
1157

1158
  @Override
1159
  public Map<Long, SignalHandlerInfo> getRunningSignalHandlers() {
1160
    return signalDispatcher.getRunningSignalHandlers();
1✔
1161
  }
1162

1163
  @Override
1164
  public Map<String, UpdateHandlerInfo> getRunningUpdateHandlers() {
1165
    return updateDispatcher.getRunningUpdateHandlers();
1✔
1166
  }
1167

1168
  @Override
1169
  public ReplayWorkflowContext getReplayContext() {
1170
    return replayContext;
1✔
1171
  }
1172

1173
  @Override
1174
  public SignalExternalOutput signalExternalWorkflow(SignalExternalInput input) {
1175
    WorkflowExecution childExecution = input.getExecution();
1✔
1176
    DataConverter dataConverterWithChildWorkflowContext =
1✔
1177
        dataConverter.withContext(
1✔
1178
            new WorkflowSerializationContext(
1179
                replayContext.getNamespace(), childExecution.getWorkflowId()));
1✔
1180
    SignalExternalWorkflowExecutionCommandAttributes.Builder attributes =
1181
        SignalExternalWorkflowExecutionCommandAttributes.newBuilder();
1✔
1182
    attributes.setSignalName(input.getSignalName());
1✔
1183
    attributes.setExecution(childExecution);
1✔
1184
    attributes.setHeader(HeaderUtils.toHeaderGrpc(input.getHeader(), null));
1✔
1185
    Optional<Payloads> payloads = dataConverterWithChildWorkflowContext.toPayloads(input.getArgs());
1✔
1186
    payloads.ifPresent(attributes::setInput);
1✔
1187
    CompletablePromise<Void> result = Workflow.newPromise();
1✔
1188
    Functions.Proc1<Exception> cancellationCallback =
1✔
1189
        replayContext.signalExternalWorkflowExecution(
1✔
1190
            attributes,
1191
            (output, failure) -> {
1192
              if (failure != null) {
1✔
1193
                runner.executeInWorkflowThread(
1✔
1194
                    "child workflow failure callback",
1195
                    () ->
1196
                        result.completeExceptionally(
1✔
1197
                            dataConverterWithChildWorkflowContext.failureToException(failure)));
1✔
1198
              } else {
1199
                runner.executeInWorkflowThread(
1✔
1200
                    "child workflow completion callback", () -> result.complete(output));
1✔
1201
              }
1202
            });
1✔
1203
    CancellationScope.current()
1✔
1204
        .getCancellationRequest()
1✔
1205
        .thenApply(
1✔
1206
            (reason) -> {
1207
              cancellationCallback.apply(new CanceledFailure(reason));
1✔
1208
              return null;
1✔
1209
            });
1210
    return new SignalExternalOutput(result);
1✔
1211
  }
1212

1213
  @Override
1214
  public void sleep(Duration duration) {
1215
    newTimer(duration).get();
1✔
1216
  }
1✔
1217

1218
  @Override
1219
  public boolean await(Duration timeout, String reason, Supplier<Boolean> unblockCondition) {
1220
    Promise<Void> timer = newTimer(timeout);
1✔
1221
    WorkflowThread.await(reason, () -> (timer.isCompleted() || unblockCondition.get()));
1✔
1222
    return !timer.isCompleted();
1✔
1223
  }
1224

1225
  @Override
1226
  public void await(String reason, Supplier<Boolean> unblockCondition) {
1227
    WorkflowThread.await(reason, unblockCondition);
1✔
1228
  }
1✔
1229

1230
  @SuppressWarnings("deprecation")
1231
  @Override
1232
  public void continueAsNew(ContinueAsNewInput input) {
1233
    ContinueAsNewWorkflowExecutionCommandAttributes.Builder attributes =
1234
        ContinueAsNewWorkflowExecutionCommandAttributes.newBuilder();
1✔
1235
    String workflowType = input.getWorkflowType();
1✔
1236
    if (workflowType != null) {
1✔
1237
      attributes.setWorkflowType(WorkflowType.newBuilder().setName(workflowType));
1✔
1238
    }
1239
    @Nullable ContinueAsNewOptions options = input.getOptions();
1✔
1240
    if (options != null) {
1✔
1241
      if (options.getWorkflowRunTimeout() != null) {
1✔
1242
        attributes.setWorkflowRunTimeout(
×
1243
            ProtobufTimeUtils.toProtoDuration(options.getWorkflowRunTimeout()));
×
1244
      }
1245
      if (options.getWorkflowTaskTimeout() != null) {
1✔
1246
        attributes.setWorkflowTaskTimeout(
×
1247
            ProtobufTimeUtils.toProtoDuration(options.getWorkflowTaskTimeout()));
×
1248
      }
1249
      if (options.getTaskQueue() != null && !options.getTaskQueue().isEmpty()) {
1✔
1250
        attributes.setTaskQueue(TaskQueue.newBuilder().setName(options.getTaskQueue()));
1✔
1251
      }
1252
      if (options.getRetryOptions() != null) {
1✔
1253
        attributes.setRetryPolicy(toRetryPolicy(options.getRetryOptions()));
1✔
1254
      } else if (replayContext.getRetryOptions() != null) {
1✔
1255
        attributes.setRetryPolicy(toRetryPolicy(replayContext.getRetryOptions()));
1✔
1256
      }
1257
      Map<String, Object> searchAttributes = options.getSearchAttributes();
1✔
1258
      if (searchAttributes != null && !searchAttributes.isEmpty()) {
1✔
1259
        if (options.getTypedSearchAttributes() != null) {
×
1260
          throw new IllegalArgumentException(
×
1261
              "Cannot have typed search attributes and search attributes");
1262
        }
1263
        attributes.setSearchAttributes(SearchAttributesUtil.encode(searchAttributes));
×
1264
      } else if (options.getTypedSearchAttributes() != null) {
1✔
1265
        attributes.setSearchAttributes(
1✔
1266
            SearchAttributesUtil.encodeTyped(options.getTypedSearchAttributes()));
1✔
1267
      }
1268
      Map<String, Object> memo = options.getMemo();
1✔
1269
      if (memo != null) {
1✔
1270
        attributes.setMemo(
1✔
1271
            Memo.newBuilder()
1✔
1272
                .putAllFields(intoPayloadMap(dataConverterWithCurrentWorkflowContext, memo)));
1✔
1273
      }
1274
      if (options.getVersioningIntent() != null) {
1✔
1275
        attributes.setInheritBuildId(
×
1276
            options
1277
                .getVersioningIntent()
×
1278
                .determineUseCompatibleFlag(
×
1279
                    replayContext.getTaskQueue().equals(options.getTaskQueue())));
×
1280
      }
1281
    } else if (replayContext.getRetryOptions() != null) {
1✔
1282
      // Have to copy retry options as server doesn't copy them.
1283
      attributes.setRetryPolicy(toRetryPolicy(replayContext.getRetryOptions()));
1✔
1284
    }
1285

1286
    List<ContextPropagator> propagators =
1287
        options != null && options.getContextPropagators() != null
1✔
1288
            ? options.getContextPropagators()
×
1289
            : this.contextPropagators;
1✔
1290
    io.temporal.api.common.v1.Header grpcHeader =
1✔
1291
        toHeaderGrpc(input.getHeader(), extractContextsAndConvertToBytes(propagators));
1✔
1292
    attributes.setHeader(grpcHeader);
1✔
1293

1294
    Optional<Payloads> payloads =
1✔
1295
        dataConverterWithCurrentWorkflowContext.toPayloads(input.getArgs());
1✔
1296
    payloads.ifPresent(attributes::setInput);
1✔
1297

1298
    replayContext.continueAsNewOnCompletion(attributes.build());
1✔
1299
    WorkflowThread.exit();
×
1300
  }
×
1301

1302
  @Override
1303
  public CancelWorkflowOutput cancelWorkflow(CancelWorkflowInput input) {
1304
    CompletablePromise<Void> result = Workflow.newPromise();
×
1305
    replayContext.requestCancelExternalWorkflowExecution(
×
1306
        input.getExecution(),
×
1307
        (r, exception) -> {
1308
          if (exception == null) {
×
1309
            result.complete(null);
×
1310
          } else {
1311
            result.completeExceptionally(exception);
×
1312
          }
1313
        });
×
1314
    return new CancelWorkflowOutput(result);
×
1315
  }
1316

1317
  @Override
1318
  public Scope getMetricsScope() {
1319
    return replayContext.getMetricsScope();
1✔
1320
  }
1321

1322
  public boolean isLoggingEnabledInReplay() {
1323
    return replayContext.getEnableLoggingInReplay();
1✔
1324
  }
1325

1326
  @Override
1327
  public void upsertSearchAttributes(Map<String, ?> searchAttributes) {
1328
    Preconditions.checkArgument(searchAttributes != null, "null search attributes");
1✔
1329
    Preconditions.checkArgument(!searchAttributes.isEmpty(), "empty search attributes");
1✔
1330
    SearchAttributes attr = SearchAttributesUtil.encode(searchAttributes);
1✔
1331
    replayContext.upsertSearchAttributes(attr);
1✔
1332
  }
1✔
1333

1334
  @Override
1335
  public void upsertTypedSearchAttributes(SearchAttributeUpdate<?>... searchAttributeUpdates) {
1336
    SearchAttributes attr = SearchAttributesUtil.encodeTypedUpdates(searchAttributeUpdates);
1✔
1337
    replayContext.upsertSearchAttributes(attr);
1✔
1338
  }
1✔
1339

1340
  @Override
1341
  public void upsertMemo(Map<String, Object> memo) {
1342
    Preconditions.checkArgument(memo != null, "null memo");
1✔
1343
    Preconditions.checkArgument(!memo.isEmpty(), "empty memo");
1✔
1344
    replayContext.upsertMemo(
1✔
1345
        Memo.newBuilder()
1✔
1346
            .putAllFields(intoPayloadMap(dataConverterWithCurrentWorkflowContext, memo))
1✔
1347
            .build());
1✔
1348
  }
1✔
1349

1350
  @Nonnull
1351
  public Object newWorkflowMethodThreadIntercepted(Runnable runnable, @Nullable String name) {
1352
    return runner.newWorkflowThread(runnable, false, name);
1✔
1353
  }
1354

1355
  @Nonnull
1356
  public Object newWorkflowCallbackThreadIntercepted(Runnable runnable, @Nullable String name) {
1357
    return runner.newCallbackThread(runnable, name);
1✔
1358
  }
1359

1360
  @Override
1361
  public Object newChildThread(Runnable runnable, boolean detached, String name) {
1362
    return runner.newWorkflowThread(runnable, detached, name);
1✔
1363
  }
1364

1365
  @Override
1366
  public long currentTimeMillis() {
1367
    return replayContext.currentTimeMillis();
1✔
1368
  }
1369

1370
  /**
1371
   * This WorkflowInboundCallsInterceptor is used during creation of the initial root workflow
1372
   * thread and should be replaced with another specific implementation during initialization stage
1373
   * {@code workflow.initialize()} performed inside the workflow root thread.
1374
   *
1375
   * @see SyncWorkflow#start(HistoryEvent, ReplayWorkflowContext)
1376
   */
1377
  private static final class InitialWorkflowInboundCallsInterceptor
1378
      extends BaseRootWorkflowInboundCallsInterceptor {
1379

1380
    public InitialWorkflowInboundCallsInterceptor(SyncWorkflowContext workflowContext) {
1381
      super(workflowContext);
1✔
1382
    }
1✔
1383

1384
    @Override
1385
    public WorkflowOutput execute(WorkflowInput input) {
1386
      throw new UnsupportedOperationException(
×
1387
          "SyncWorkflowContext should be initialized with a non-initial WorkflowInboundCallsInterceptor "
1388
              + "before #execute can be called");
1389
    }
1390
  }
1391

1392
  @Nonnull
1393
  @Override
1394
  public WorkflowImplementationOptions getWorkflowImplementationOptions() {
1395
    return workflowImplementationOptions;
1✔
1396
  }
1397

1398
  @Override
1399
  public Failure mapWorkflowExceptionToFailure(Throwable failure) {
1400
    return dataConverterWithCurrentWorkflowContext.exceptionToFailure(failure);
1✔
1401
  }
1402

1403
  @Nullable
1404
  @Override
1405
  public <R> R getLastCompletionResult(Class<R> resultClass, Type resultType) {
1406
    return dataConverterWithCurrentWorkflowContext.fromPayloads(
1✔
1407
        0, Optional.ofNullable(replayContext.getLastCompletionResult()), resultClass, resultType);
1✔
1408
  }
1409

1410
  @Override
1411
  public List<ContextPropagator> getContextPropagators() {
1412
    return contextPropagators;
1✔
1413
  }
1414

1415
  @Override
1416
  public Map<String, Object> getPropagatedContexts() {
1417
    if (contextPropagators == null || contextPropagators.isEmpty()) {
1✔
1418
      return new HashMap<>();
1✔
1419
    }
1420

1421
    Map<String, Payload> headerData = new HashMap<>(replayContext.getHeader());
1✔
1422
    Map<String, Object> contextData = new HashMap<>();
1✔
1423
    for (ContextPropagator propagator : contextPropagators) {
1✔
1424
      contextData.put(propagator.getName(), propagator.deserializeContext(headerData));
1✔
1425
    }
1✔
1426

1427
    return contextData;
1✔
1428
  }
1429

1430
  public void setCurrentUpdateInfo(UpdateInfo updateInfo) {
1431
    currentUpdateInfo.set(updateInfo);
1✔
1432
  }
1✔
1433

1434
  public Optional<UpdateInfo> getCurrentUpdateInfo() {
1435
    return Optional.ofNullable(currentUpdateInfo.get());
1✔
1436
  }
1437

1438
  /** Simple wrapper over a failure just to allow completing the CompletablePromise as a failure */
1439
  private static class FailureWrapperException extends RuntimeException {
1440
    private final Failure failure;
1441

1442
    public FailureWrapperException(Failure failure) {
1✔
1443
      this.failure = failure;
1✔
1444
    }
1✔
1445

1446
    public Failure getFailure() {
1447
      return failure;
1✔
1448
    }
1449
  }
1450
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc