• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #343

31 Oct 2024 06:31PM UTC coverage: 75.148% (-3.6%) from 78.794%
#343

push

github

web-flow
Fix jacoco coverage (#2304)

5139 of 8240 branches covered (62.37%)

Branch coverage included in aggregate %.

22841 of 28993 relevant lines covered (78.78%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.43
/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.sync;
22

23
import static io.temporal.client.WorkflowClient.QUERY_TYPE_STACK_TRACE;
24
import static io.temporal.client.WorkflowClient.QUERY_TYPE_WORKFLOW_METADATA;
25
import static io.temporal.internal.common.HeaderUtils.intoPayloadMap;
26
import static io.temporal.internal.common.HeaderUtils.toHeaderGrpc;
27
import static io.temporal.internal.common.RetryOptionsUtils.toRetryPolicy;
28
import static io.temporal.internal.common.WorkflowExecutionUtils.makeUserMetaData;
29
import static io.temporal.internal.sync.WorkflowInternal.DEFAULT_VERSION;
30

31
import com.google.common.base.MoreObjects;
32
import com.google.common.base.Preconditions;
33
import com.uber.m3.tally.Scope;
34
import io.temporal.activity.ActivityOptions;
35
import io.temporal.activity.LocalActivityOptions;
36
import io.temporal.api.command.v1.*;
37
import io.temporal.api.common.v1.ActivityType;
38
import io.temporal.api.common.v1.Memo;
39
import io.temporal.api.common.v1.Payload;
40
import io.temporal.api.common.v1.Payloads;
41
import io.temporal.api.common.v1.SearchAttributes;
42
import io.temporal.api.common.v1.WorkflowExecution;
43
import io.temporal.api.common.v1.WorkflowType;
44
import io.temporal.api.enums.v1.ParentClosePolicy;
45
import io.temporal.api.failure.v1.Failure;
46
import io.temporal.api.history.v1.HistoryEvent;
47
import io.temporal.api.sdk.v1.UserMetadata;
48
import io.temporal.api.sdk.v1.WorkflowDefinition;
49
import io.temporal.api.sdk.v1.WorkflowInteractionDefinition;
50
import io.temporal.api.sdk.v1.WorkflowMetadata;
51
import io.temporal.api.taskqueue.v1.TaskQueue;
52
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
53
import io.temporal.client.WorkflowException;
54
import io.temporal.common.RetryOptions;
55
import io.temporal.common.SearchAttributeUpdate;
56
import io.temporal.common.context.ContextPropagator;
57
import io.temporal.common.converter.DataConverter;
58
import io.temporal.common.interceptors.Header;
59
import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor;
60
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
61
import io.temporal.failure.*;
62
import io.temporal.internal.common.ActivityOptionUtils;
63
import io.temporal.internal.common.HeaderUtils;
64
import io.temporal.internal.common.OptionsUtils;
65
import io.temporal.internal.common.ProtobufTimeUtils;
66
import io.temporal.internal.common.SdkFlag;
67
import io.temporal.internal.common.SearchAttributesUtil;
68
import io.temporal.internal.replay.ChildWorkflowTaskFailedException;
69
import io.temporal.internal.replay.ReplayWorkflowContext;
70
import io.temporal.internal.replay.WorkflowContext;
71
import io.temporal.internal.statemachines.*;
72
import io.temporal.payload.context.ActivitySerializationContext;
73
import io.temporal.payload.context.WorkflowSerializationContext;
74
import io.temporal.worker.WorkflowImplementationOptions;
75
import io.temporal.workflow.*;
76
import io.temporal.workflow.Functions.Func;
77
import java.lang.reflect.Type;
78
import java.time.Duration;
79
import java.time.Instant;
80
import java.util.*;
81
import java.util.concurrent.atomic.AtomicBoolean;
82
import java.util.concurrent.atomic.AtomicReference;
83
import java.util.function.BiPredicate;
84
import java.util.function.Supplier;
85
import javax.annotation.Nonnull;
86
import javax.annotation.Nullable;
87
import org.slf4j.Logger;
88
import org.slf4j.LoggerFactory;
89

90
// TODO separate WorkflowOutboundCallsInterceptor functionality from this class into
91
// RootWorkflowOutboundInterceptor
92

93
/**
94
 * Root, the most top level WorkflowContext that unites all relevant contexts, handlers, options,
95
 * states, etc. It's created when SyncWorkflow, which represents a context of a workflow type
96
 * definition on the worker meets ReplayWorkflowContext which contains information from the
97
 * WorkflowTask
98
 */
99
final class SyncWorkflowContext implements WorkflowContext, WorkflowOutboundCallsInterceptor {
100
  private static final Logger log = LoggerFactory.getLogger(SyncWorkflowContext.class);
1✔
101

102
  private final String namespace;
103
  private final WorkflowExecution workflowExecution;
104
  private final WorkflowImplementationOptions workflowImplementationOptions;
105
  private final DataConverter dataConverter;
106
  // to be used in this class, should not be passed down. Pass the original #dataConverter instead
107
  private final DataConverter dataConverterWithCurrentWorkflowContext;
108
  private final List<ContextPropagator> contextPropagators;
109
  private final SignalDispatcher signalDispatcher;
110
  private final QueryDispatcher queryDispatcher;
111
  private final UpdateDispatcher updateDispatcher;
112

113
  // initialized later when these entities are created
114
  private ReplayWorkflowContext replayContext;
115
  private DeterministicRunner runner;
116

117
  private WorkflowInboundCallsInterceptor headInboundInterceptor;
118
  private WorkflowOutboundCallsInterceptor headOutboundInterceptor;
119

120
  private ActivityOptions defaultActivityOptions = null;
1✔
121
  private Map<String, ActivityOptions> activityOptionsMap;
122
  private LocalActivityOptions defaultLocalActivityOptions = null;
1✔
123
  private Map<String, LocalActivityOptions> localActivityOptionsMap;
124
  private NexusServiceOptions defaultNexusServiceOptions = null;
1✔
125
  private Map<String, NexusServiceOptions> nexusServiceOptionsMap;
126
  private boolean readOnly = false;
1✔
127
  private final WorkflowThreadLocal<UpdateInfo> currentUpdateInfo = new WorkflowThreadLocal<>();
1✔
128
  // Map of all running update handlers. Key is the update Id of the update request.
129
  private Map<String, UpdateHandlerInfo> runningUpdateHandlers = new HashMap<>();
1✔
130
  // Map of all running signal handlers. Key is the event Id of the signal event.
131
  private Map<Long, SignalHandlerInfo> runningSignalHandlers = new HashMap<>();
1✔
132
  @Nullable private String currentDetails;
133

134
  /**
   * Builds the context from the workflow identity, the dispatchers, and the worker configuration.
   *
   * @param namespace namespace of the workflow execution
   * @param workflowExecution execution whose workflow id is used for the serialization context
   * @param signalDispatcher dispatcher that signal handling is delegated to
   * @param queryDispatcher dispatcher that query handling is delegated to
   * @param updateDispatcher dispatcher that update handling is delegated to
   * @param workflowImplementationOptions optional options; when {@code null} the default instance
   *     is used and no activity / local activity / Nexus option defaults are preloaded
   * @param dataConverter converter used as is, and also bound to this workflow's serialization
   *     context for internal use
   * @param contextPropagators propagators configured on the workflow context
   */
  public SyncWorkflowContext(
      @Nonnull String namespace,
      @Nonnull WorkflowExecution workflowExecution,
      SignalDispatcher signalDispatcher,
      QueryDispatcher queryDispatcher,
      UpdateDispatcher updateDispatcher,
      @Nullable WorkflowImplementationOptions workflowImplementationOptions,
      DataConverter dataConverter,
      List<ContextPropagator> contextPropagators) {
    this.namespace = namespace;
    this.workflowExecution = workflowExecution;
    this.dataConverter = dataConverter;
    this.dataConverterWithCurrentWorkflowContext =
        dataConverter.withContext(
            new WorkflowSerializationContext(namespace, workflowExecution.getWorkflowId()));
    this.contextPropagators = contextPropagators;
    this.signalDispatcher = signalDispatcher;
    this.queryDispatcher = queryDispatcher;
    this.updateDispatcher = updateDispatcher;

    if (workflowImplementationOptions == null) {
      this.workflowImplementationOptions = WorkflowImplementationOptions.getDefaultInstance();
    } else {
      // Copy the per-type maps so later merges do not touch the caller's options object.
      this.defaultActivityOptions = workflowImplementationOptions.getDefaultActivityOptions();
      this.activityOptionsMap = new HashMap<>(workflowImplementationOptions.getActivityOptions());
      this.defaultLocalActivityOptions =
          workflowImplementationOptions.getDefaultLocalActivityOptions();
      this.localActivityOptionsMap =
          new HashMap<>(workflowImplementationOptions.getLocalActivityOptions());
      this.defaultNexusServiceOptions =
          workflowImplementationOptions.getDefaultNexusServiceOptions();
      this.nexusServiceOptionsMap =
          new HashMap<>(workflowImplementationOptions.getNexusServiceOptions());
      this.workflowImplementationOptions = workflowImplementationOptions;
    }

    // Placeholder heads that stay in effect until the actual interceptors are installed through
    // #initHeadInboundCallsInterceptor and #initHeadOutboundCallsInterceptor during the
    // initialization phase. See workflow.initialize() performed inside the workflow root thread
    // inside SyncWorkflow#start(HistoryEvent, ReplayWorkflowContext)
    this.headInboundInterceptor = new InitialWorkflowInboundCallsInterceptor(this);
    this.headOutboundInterceptor = this;
  }
177

178
  public void setReplayContext(ReplayWorkflowContext context) {
179
    this.replayContext = context;
1✔
180
  }
1✔
181

182
  /**
183
   * Using setter, as runner is initialized with this context, so it is not ready during
184
   * construction of this.
185
   */
186
  public void setRunner(DeterministicRunner runner) {
187
    this.runner = runner;
1✔
188
  }
1✔
189

190
  public DeterministicRunner getRunner() {
191
    return runner;
×
192
  }
193

194
  public WorkflowOutboundCallsInterceptor getWorkflowOutboundInterceptor() {
195
    return headOutboundInterceptor;
1✔
196
  }
197

198
  public WorkflowInboundCallsInterceptor getWorkflowInboundInterceptor() {
199
    return headInboundInterceptor;
1✔
200
  }
201

202
  public void initHeadOutboundCallsInterceptor(WorkflowOutboundCallsInterceptor head) {
203
    headOutboundInterceptor = head;
1✔
204
  }
1✔
205

206
  public void initHeadInboundCallsInterceptor(WorkflowInboundCallsInterceptor head) {
207
    headInboundInterceptor = head;
1✔
208
    signalDispatcher.setInboundCallsInterceptor(head);
1✔
209
    queryDispatcher.setInboundCallsInterceptor(head);
1✔
210
    updateDispatcher.setInboundCallsInterceptor(head);
1✔
211
  }
1✔
212

213
  public ActivityOptions getDefaultActivityOptions() {
214
    return defaultActivityOptions;
1✔
215
  }
216

217
  public @Nonnull Map<String, ActivityOptions> getActivityOptions() {
218
    return activityOptionsMap != null
1!
219
        ? Collections.unmodifiableMap(activityOptionsMap)
1✔
220
        : Collections.emptyMap();
×
221
  }
222

223
  public LocalActivityOptions getDefaultLocalActivityOptions() {
224
    return defaultLocalActivityOptions;
1✔
225
  }
226

227
  public @Nonnull Map<String, LocalActivityOptions> getLocalActivityOptions() {
228
    return localActivityOptionsMap != null
1!
229
        ? Collections.unmodifiableMap(localActivityOptionsMap)
1✔
230
        : Collections.emptyMap();
×
231
  }
232

233
  public NexusServiceOptions getDefaultNexusServiceOptions() {
234
    return defaultNexusServiceOptions;
1✔
235
  }
236

237
  public @Nonnull Map<String, NexusServiceOptions> getNexusServiceOptions() {
238
    return nexusServiceOptionsMap != null
1!
239
        ? Collections.unmodifiableMap(nexusServiceOptionsMap)
1✔
240
        : Collections.emptyMap();
×
241
  }
242

243
  public void setDefaultActivityOptions(ActivityOptions defaultActivityOptions) {
244
    this.defaultActivityOptions =
1✔
245
        (this.defaultActivityOptions == null)
1!
246
            ? defaultActivityOptions
×
247
            : this.defaultActivityOptions.toBuilder()
1✔
248
                .mergeActivityOptions(defaultActivityOptions)
1✔
249
                .build();
1✔
250
  }
1✔
251

252
  public void applyActivityOptions(Map<String, ActivityOptions> activityTypeToOption) {
253
    Objects.requireNonNull(activityTypeToOption);
1✔
254
    if (this.activityOptionsMap == null) {
1!
255
      this.activityOptionsMap = new HashMap<>(activityTypeToOption);
×
256
      return;
×
257
    }
258
    ActivityOptionUtils.mergePredefinedActivityOptions(activityOptionsMap, activityTypeToOption);
1✔
259
  }
1✔
260

261
  public void setDefaultLocalActivityOptions(LocalActivityOptions defaultLocalActivityOptions) {
262
    this.defaultLocalActivityOptions =
1✔
263
        (this.defaultLocalActivityOptions == null)
1!
264
            ? defaultLocalActivityOptions
×
265
            : this.defaultLocalActivityOptions.toBuilder()
1✔
266
                .mergeActivityOptions(defaultLocalActivityOptions)
1✔
267
                .build();
1✔
268
  }
1✔
269

270
  public void applyLocalActivityOptions(Map<String, LocalActivityOptions> activityTypeToOption) {
271
    Objects.requireNonNull(activityTypeToOption);
1✔
272
    if (this.localActivityOptionsMap == null) {
1!
273
      this.localActivityOptionsMap = new HashMap<>(activityTypeToOption);
×
274
      return;
×
275
    }
276
    ActivityOptionUtils.mergePredefinedLocalActivityOptions(
1✔
277
        localActivityOptionsMap, activityTypeToOption);
278
  }
1✔
279

280
  @Override
281
  public <T> ActivityOutput<T> executeActivity(ActivityInput<T> input) {
282
    ActivitySerializationContext serializationContext =
1✔
283
        new ActivitySerializationContext(
284
            replayContext.getNamespace(),
1✔
285
            replayContext.getWorkflowId(),
1✔
286
            replayContext.getWorkflowType().getName(),
1✔
287
            input.getActivityName(),
1✔
288
            // input.getOptions().getTaskQueue() may be not specified, workflow task queue is used
289
            // by the Server in this case
290
            MoreObjects.firstNonNull(
1✔
291
                input.getOptions().getTaskQueue(), replayContext.getTaskQueue()),
1✔
292
            false);
293
    DataConverter dataConverterWithActivityContext =
1✔
294
        dataConverter.withContext(serializationContext);
1✔
295
    Optional<Payloads> args = dataConverterWithActivityContext.toPayloads(input.getArgs());
1✔
296

297
    ActivityOutput<Optional<Payloads>> output =
1✔
298
        executeActivityOnce(input.getActivityName(), input.getOptions(), input.getHeader(), args);
1✔
299

300
    // Avoid passing the input to the output handle as it causes the input to be retained for the
301
    // duration of the operation.
302
    Type resultType = input.getResultType();
1✔
303
    Class<T> resultClass = input.getResultClass();
1✔
304
    return new ActivityOutput<>(
1✔
305
        output.getActivityId(),
1✔
306
        output
307
            .getResult()
1✔
308
            .handle(
1✔
309
                (r, f) -> {
310
                  if (f == null) {
1✔
311
                    return resultType != Void.TYPE
1✔
312
                        ? dataConverterWithActivityContext.fromPayloads(
1✔
313
                            0, r, resultClass, resultType)
314
                        : null;
1✔
315
                  } else {
316
                    throw dataConverterWithActivityContext.failureToException(
1✔
317
                        ((FailureWrapperException) f).getFailure());
1✔
318
                  }
319
                }));
320
  }
321

322
  private ActivityOutput<Optional<Payloads>> executeActivityOnce(
323
      String activityTypeName, ActivityOptions options, Header header, Optional<Payloads> input) {
324
    ExecuteActivityParameters params =
1✔
325
        constructExecuteActivityParameters(activityTypeName, options, header, input);
1✔
326
    ActivityCallback callback = new ActivityCallback();
1✔
327
    ReplayWorkflowContext.ScheduleActivityTaskOutput activityOutput =
1✔
328
        replayContext.scheduleActivityTask(params, callback::invoke);
1✔
329
    CancellationScope.current()
1✔
330
        .getCancellationRequest()
1✔
331
        .thenApply(
1✔
332
            (reason) -> {
333
              activityOutput.getCancellationHandle().apply(new CanceledFailure(reason));
1✔
334
              return null;
1✔
335
            });
336
    return new ActivityOutput<>(activityOutput.getActivityId(), callback.result);
1✔
337
  }
338

339
  public void handleInterceptedSignal(WorkflowInboundCallsInterceptor.SignalInput input) {
340
    signalDispatcher.handleInterceptedSignal(input);
1✔
341
  }
1✔
342

343
  public void handleSignal(
344
      String signalName, Optional<Payloads> input, long eventId, Header header) {
345
    signalDispatcher.handleSignal(signalName, input, eventId, header);
1✔
346
  }
1✔
347

348
  public void handleValidateUpdate(
349
      String updateName, String updateId, Optional<Payloads> input, long eventId, Header header) {
350
    updateDispatcher.handleValidateUpdate(updateName, updateId, input, eventId, header);
1✔
351
  }
1✔
352

353
  public Optional<Payloads> handleExecuteUpdate(
354
      String updateName, String updateId, Optional<Payloads> input, long eventId, Header header) {
355
    return updateDispatcher.handleExecuteUpdate(updateName, updateId, input, eventId, header);
1✔
356
  }
357

358
  public void handleInterceptedValidateUpdate(WorkflowInboundCallsInterceptor.UpdateInput input) {
359
    updateDispatcher.handleInterceptedValidateUpdate(input);
1✔
360
  }
1✔
361

362
  public WorkflowInboundCallsInterceptor.UpdateOutput handleInterceptedExecuteUpdate(
363
      WorkflowInboundCallsInterceptor.UpdateInput input) {
364
    return updateDispatcher.handleInterceptedExecuteUpdate(input);
1✔
365
  }
366

367
  public WorkflowInboundCallsInterceptor.QueryOutput handleInterceptedQuery(
368
      WorkflowInboundCallsInterceptor.QueryInput input) {
369
    return queryDispatcher.handleInterceptedQuery(input);
1✔
370
  }
371

372
  public Optional<Payloads> handleQuery(String queryName, Header header, Optional<Payloads> input) {
373
    return queryDispatcher.handleQuery(queryName, header, input);
1✔
374
  }
375

376
  public boolean isEveryHandlerFinished() {
377
    return updateDispatcher.getRunningUpdateHandlers().isEmpty()
1✔
378
        && signalDispatcher.getRunningSignalHandlers().isEmpty();
1✔
379
  }
380

381
  public WorkflowMetadata getWorkflowMetadata() {
382
    WorkflowMetadata.Builder workflowMetadata = WorkflowMetadata.newBuilder();
1✔
383
    WorkflowDefinition.Builder workflowDefinition = WorkflowDefinition.newBuilder();
1✔
384
    // Set the workflow type
385
    if (replayContext.getWorkflowType() != null
1!
386
        && replayContext.getWorkflowType().getName() != null) {
1!
387
      workflowDefinition.setType(replayContext.getWorkflowType().getName());
1✔
388
    }
389
    // Set built in queries
390
    workflowDefinition.addQueryDefinitions(
1✔
391
        WorkflowInteractionDefinition.newBuilder()
1✔
392
            .setName(QUERY_TYPE_STACK_TRACE)
1✔
393
            .setDescription("Current stack trace")
1✔
394
            .build());
1✔
395
    workflowDefinition.addQueryDefinitions(
1✔
396
        WorkflowInteractionDefinition.newBuilder()
1✔
397
            .setName(QUERY_TYPE_WORKFLOW_METADATA)
1✔
398
            .setDescription("Metadata about the workflow")
1✔
399
            .build());
1✔
400
    // Add user defined queries
401
    workflowDefinition.addAllQueryDefinitions(queryDispatcher.getQueryHandlers());
1✔
402
    // Add user defined signals
403
    workflowDefinition.addAllSignalDefinitions(signalDispatcher.getSignalHandlers());
1✔
404
    // Add user defined update handlers
405
    workflowDefinition.addAllUpdateDefinitions(updateDispatcher.getUpdateHandlers());
1✔
406
    // Set the workflow definition
407
    workflowMetadata.setDefinition(workflowDefinition.build());
1✔
408
    // Add the current workflow details
409
    if (currentDetails != null) {
1!
410
      workflowMetadata.setCurrentDetails(currentDetails);
1✔
411
    }
412
    return workflowMetadata.build();
1✔
413
  }
414

415
  private class ActivityCallback {
1✔
416
    private final CompletablePromise<Optional<Payloads>> result = Workflow.newPromise();
1✔
417

418
    public void invoke(Optional<Payloads> output, Failure failure) {
419
      if (failure != null) {
1✔
420
        runner.executeInWorkflowThread(
1✔
421
            "activity failure callback",
422
            () -> result.completeExceptionally(new FailureWrapperException(failure)));
1✔
423
      } else {
424
        runner.executeInWorkflowThread(
1✔
425
            "activity completion callback", () -> result.complete(output));
1✔
426
      }
427
    }
1✔
428
  }
429

430
  private class LocalActivityCallbackImpl implements LocalActivityCallback {
1✔
431
    private final CompletablePromise<Optional<Payloads>> result = Workflow.newPromise();
1✔
432

433
    @Override
434
    public void apply(Optional<Payloads> successOutput, LocalActivityFailedException exception) {
435
      if (exception != null) {
1✔
436
        runner.executeInWorkflowThread(
1✔
437
            "local activity failure callback", () -> result.completeExceptionally(exception));
1✔
438
      } else {
439
        runner.executeInWorkflowThread(
1✔
440
            "local activity completion callback", () -> result.complete(successOutput));
1✔
441
      }
442
    }
1✔
443
  }
444

445
  @Override
446
  public <R> LocalActivityOutput<R> executeLocalActivity(LocalActivityInput<R> input) {
447
    ActivitySerializationContext serializationContext =
1✔
448
        new ActivitySerializationContext(
449
            replayContext.getNamespace(),
1✔
450
            replayContext.getWorkflowId(),
1✔
451
            replayContext.getWorkflowType().getName(),
1✔
452
            input.getActivityName(),
1✔
453
            replayContext.getTaskQueue(),
1✔
454
            true);
455
    DataConverter dataConverterWithActivityContext =
1✔
456
        dataConverter.withContext(serializationContext);
1✔
457
    Optional<Payloads> payloads = dataConverterWithActivityContext.toPayloads(input.getArgs());
1✔
458

459
    long originalScheduledTime = System.currentTimeMillis();
1✔
460
    CompletablePromise<Optional<Payloads>> serializedResult =
461
        WorkflowInternal.newCompletablePromise();
1✔
462
    executeLocalActivityOverLocalRetryThreshold(
1✔
463
        input.getActivityName(),
1✔
464
        input.getOptions(),
1✔
465
        input.getHeader(),
1✔
466
        payloads,
467
        originalScheduledTime,
468
        1,
469
        null,
470
        serializedResult);
471

472
    // Avoid passing the input to the output handle as it causes the input to be retained for the
473
    // duration of the operation.
474
    Type resultType = input.getResultType();
1✔
475
    Class<R> resultClass = input.getResultClass();
1✔
476
    Promise<R> result =
1✔
477
        serializedResult.handle(
1✔
478
            (r, f) -> {
479
              if (f == null) {
1✔
480
                return resultClass != Void.TYPE
1✔
481
                    ? dataConverterWithActivityContext.fromPayloads(0, r, resultClass, resultType)
1✔
482
                    : null;
1✔
483
              } else {
484
                throw dataConverterWithActivityContext.failureToException(
1✔
485
                    ((LocalActivityCallback.LocalActivityFailedException) f).getFailure());
1✔
486
              }
487
            });
488

489
    return new LocalActivityOutput<>(result);
1✔
490
  }
491

492
  public void executeLocalActivityOverLocalRetryThreshold(
493
      String activityTypeName,
494
      LocalActivityOptions options,
495
      Header header,
496
      Optional<Payloads> input,
497
      long originalScheduledTime,
498
      int attempt,
499
      @Nullable Failure previousExecutionFailure,
500
      CompletablePromise<Optional<Payloads>> result) {
501
    CompletablePromise<Optional<Payloads>> localExecutionResult =
1✔
502
        executeLocalActivityLocally(
1✔
503
            activityTypeName,
504
            options,
505
            header,
506
            input,
507
            originalScheduledTime,
508
            attempt,
509
            previousExecutionFailure);
510

511
    localExecutionResult.handle(
1✔
512
        (r, e) -> {
513
          if (e == null) {
1✔
514
            result.complete(r);
1✔
515
          } else {
516
            if ((e instanceof LocalActivityCallback.LocalActivityFailedException)) {
1!
517
              LocalActivityCallback.LocalActivityFailedException laException =
1✔
518
                  (LocalActivityCallback.LocalActivityFailedException) e;
519
              @Nullable Duration backoff = laException.getBackoff();
1✔
520
              if (backoff != null) {
1✔
521
                WorkflowInternal.newTimer(backoff)
1✔
522
                    .thenApply(
1✔
523
                        unused -> {
524
                          executeLocalActivityOverLocalRetryThreshold(
1✔
525
                              activityTypeName,
526
                              options,
527
                              header,
528
                              input,
529
                              originalScheduledTime,
530
                              laException.getLastAttempt() + 1,
1✔
531
                              laException.getFailure(),
1✔
532
                              result);
533
                          return null;
1✔
534
                        });
535
              } else {
536
                // final failure, report back
537
                result.completeExceptionally(laException);
1✔
538
              }
539
            } else {
1✔
540
              // Only LocalActivityFailedException is expected
541
              String exceptionMessage =
×
542
                  String.format(
×
543
                      "[BUG] Local Activity State Machine callback for activityType %s returned unexpected exception",
544
                      activityTypeName);
545
              log.warn(exceptionMessage, e);
×
546
              replayContext.failWorkflowTask(new IllegalStateException(exceptionMessage, e));
×
547
            }
548
          }
549
          return null;
1✔
550
        });
551
  }
1✔
552

553
  private CompletablePromise<Optional<Payloads>> executeLocalActivityLocally(
554
      String activityTypeName,
555
      LocalActivityOptions options,
556
      Header header,
557
      Optional<Payloads> input,
558
      long originalScheduledTime,
559
      int attempt,
560
      @Nullable Failure previousExecutionFailure) {
561

562
    LocalActivityCallbackImpl callback = new LocalActivityCallbackImpl();
1✔
563
    ExecuteLocalActivityParameters params =
1✔
564
        constructExecuteLocalActivityParameters(
1✔
565
            activityTypeName,
566
            options,
567
            header,
568
            input,
569
            attempt,
570
            originalScheduledTime,
571
            previousExecutionFailure);
572
    Functions.Proc cancellationCallback = replayContext.scheduleLocalActivityTask(params, callback);
1✔
573
    CancellationScope.current()
1✔
574
        .getCancellationRequest()
1✔
575
        .thenApply(
1✔
576
            (reason) -> {
577
              cancellationCallback.apply();
×
578
              return null;
×
579
            });
580
    return callback.result;
1✔
581
  }
582

583
  private ExecuteActivityParameters constructExecuteActivityParameters(
584
      String name, ActivityOptions options, Header header, Optional<Payloads> input) {
585
    String taskQueue = options.getTaskQueue();
1✔
586
    if (taskQueue == null) {
1✔
587
      taskQueue = replayContext.getTaskQueue();
1✔
588
    }
589
    ScheduleActivityTaskCommandAttributes.Builder attributes =
590
        ScheduleActivityTaskCommandAttributes.newBuilder()
1✔
591
            .setActivityType(ActivityType.newBuilder().setName(name))
1✔
592
            .setTaskQueue(TaskQueue.newBuilder().setName(taskQueue))
1✔
593
            .setScheduleToStartTimeout(
1✔
594
                ProtobufTimeUtils.toProtoDuration(options.getScheduleToStartTimeout()))
1✔
595
            .setStartToCloseTimeout(
1✔
596
                ProtobufTimeUtils.toProtoDuration(options.getStartToCloseTimeout()))
1✔
597
            .setScheduleToCloseTimeout(
1✔
598
                ProtobufTimeUtils.toProtoDuration(options.getScheduleToCloseTimeout()))
1✔
599
            .setHeartbeatTimeout(ProtobufTimeUtils.toProtoDuration(options.getHeartbeatTimeout()))
1✔
600
            .setRequestEagerExecution(
1✔
601
                !options.isEagerExecutionDisabled()
1✔
602
                    && Objects.equals(taskQueue, replayContext.getTaskQueue()));
1✔
603

604
    input.ifPresent(attributes::setInput);
1✔
605
    RetryOptions retryOptions = options.getRetryOptions();
1✔
606
    if (retryOptions != null) {
1✔
607
      attributes.setRetryPolicy(toRetryPolicy(retryOptions));
1✔
608
    }
609

610
    // Set the context value.  Use the context propagators from the ActivityOptions
611
    // if present, otherwise use the ones configured on the WorkflowContext
612
    List<ContextPropagator> propagators = options.getContextPropagators();
1✔
613
    if (propagators == null) {
1!
614
      propagators = this.contextPropagators;
1✔
615
    }
616
    io.temporal.api.common.v1.Header grpcHeader =
1✔
617
        toHeaderGrpc(header, extractContextsAndConvertToBytes(propagators));
1✔
618
    attributes.setHeader(grpcHeader);
1✔
619

620
    if (options.getVersioningIntent() != null) {
1!
621
      attributes.setUseWorkflowBuildId(
1✔
622
          options
623
              .getVersioningIntent()
1✔
624
              .determineUseCompatibleFlag(
1✔
625
                  replayContext.getTaskQueue().equals(options.getTaskQueue())));
1✔
626
    }
627

628
    return new ExecuteActivityParameters(attributes, options.getCancellationType());
1✔
629
  }
630

631
  private ExecuteLocalActivityParameters constructExecuteLocalActivityParameters(
632
      String name,
633
      LocalActivityOptions options,
634
      Header header,
635
      Optional<Payloads> input,
636
      int attempt,
637
      long originalScheduledTime,
638
      @Nullable Failure previousExecutionFailure) {
639
    options = LocalActivityOptions.newBuilder(options).validateAndBuildWithDefaults();
1✔
640

641
    PollActivityTaskQueueResponse.Builder activityTask =
642
        PollActivityTaskQueueResponse.newBuilder()
1✔
643
            .setActivityId(this.replayContext.randomUUID().toString())
1✔
644
            .setWorkflowNamespace(this.replayContext.getNamespace())
1✔
645
            .setWorkflowType(this.replayContext.getWorkflowType())
1✔
646
            .setWorkflowExecution(this.replayContext.getWorkflowExecution())
1✔
647
            // used to pass scheduled time to the local activity code inside
648
            // ActivityExecutionContext#getInfo
649
            // setCurrentAttemptScheduledTime is called inside LocalActivityWorker before submitting
650
            // into the LA queue
651
            .setScheduledTime(
1✔
652
                ProtobufTimeUtils.toProtoTimestamp(Instant.ofEpochMilli(originalScheduledTime)))
1✔
653
            .setActivityType(ActivityType.newBuilder().setName(name))
1✔
654
            .setAttempt(attempt);
1✔
655

656
    Duration scheduleToCloseTimeout = options.getScheduleToCloseTimeout();
1✔
657
    if (scheduleToCloseTimeout != null) {
1✔
658
      activityTask.setScheduleToCloseTimeout(
1✔
659
          ProtobufTimeUtils.toProtoDuration(scheduleToCloseTimeout));
1✔
660
    }
661

662
    Duration startToCloseTimeout = options.getStartToCloseTimeout();
1✔
663
    if (startToCloseTimeout != null) {
1✔
664
      activityTask.setStartToCloseTimeout(ProtobufTimeUtils.toProtoDuration(startToCloseTimeout));
1✔
665
    }
666

667
    io.temporal.api.common.v1.Header grpcHeader =
1✔
668
        toHeaderGrpc(header, extractContextsAndConvertToBytes(contextPropagators));
1✔
669
    activityTask.setHeader(grpcHeader);
1✔
670
    input.ifPresent(activityTask::setInput);
1✔
671
    RetryOptions retryOptions = options.getRetryOptions();
1✔
672
    activityTask.setRetryPolicy(
1✔
673
        toRetryPolicy(RetryOptions.newBuilder(retryOptions).validateBuildWithDefaults()));
1✔
674
    Duration localRetryThreshold = options.getLocalRetryThreshold();
1✔
675
    if (localRetryThreshold == null) {
1✔
676
      localRetryThreshold = replayContext.getWorkflowTaskTimeout().multipliedBy(3);
1✔
677
    }
678

679
    return new ExecuteLocalActivityParameters(
1✔
680
        activityTask,
681
        options.getScheduleToStartTimeout(),
1✔
682
        originalScheduledTime,
683
        previousExecutionFailure,
684
        options.isDoNotIncludeArgumentsIntoMarker(),
1✔
685
        localRetryThreshold);
686
  }
687

688
  @Override
689
  public <R> ChildWorkflowOutput<R> executeChildWorkflow(ChildWorkflowInput<R> input) {
690
    if (CancellationScope.current().isCancelRequested()) {
1!
691
      CanceledFailure canceledFailure = new CanceledFailure("execute called from a canceled scope");
×
692
      return new ChildWorkflowOutput<>(
×
693
          Workflow.newFailedPromise(canceledFailure), Workflow.newFailedPromise(canceledFailure));
×
694
    }
695

696
    CompletablePromise<WorkflowExecution> executionPromise = Workflow.newPromise();
1✔
697
    CompletablePromise<Optional<Payloads>> resultPromise = Workflow.newPromise();
1✔
698

699
    DataConverter dataConverterWithChildWorkflowContext =
1✔
700
        dataConverter.withContext(
1✔
701
            new WorkflowSerializationContext(replayContext.getNamespace(), input.getWorkflowId()));
1✔
702
    Optional<Payloads> payloads = dataConverterWithChildWorkflowContext.toPayloads(input.getArgs());
1✔
703

704
    @Nullable
705
    Memo memo =
706
        (input.getOptions().getMemo() != null)
1✔
707
            ? Memo.newBuilder()
1✔
708
                .putAllFields(
1✔
709
                    intoPayloadMap(
1✔
710
                        dataConverterWithChildWorkflowContext, input.getOptions().getMemo()))
1✔
711
                .build()
1✔
712
            : null;
1✔
713

714
    @Nullable
715
    UserMetadata userMetadata =
1✔
716
        makeUserMetaData(
1✔
717
            input.getOptions().getStaticSummary(),
1✔
718
            input.getOptions().getStaticDetails(),
1✔
719
            dataConverterWithChildWorkflowContext);
720

721
    StartChildWorkflowExecutionParameters parameters =
1✔
722
        createChildWorkflowParameters(
1✔
723
            input.getWorkflowId(),
1✔
724
            input.getWorkflowType(),
1✔
725
            input.getOptions(),
1✔
726
            input.getHeader(),
1✔
727
            payloads,
728
            memo,
729
            userMetadata);
730

731
    Functions.Proc1<Exception> cancellationCallback =
1✔
732
        replayContext.startChildWorkflow(
1✔
733
            parameters,
734
            (execution, failure) -> {
735
              if (failure != null) {
1✔
736
                runner.executeInWorkflowThread(
1✔
737
                    "child workflow start failed callback",
738
                    () ->
739
                        executionPromise.completeExceptionally(
1✔
740
                            mapChildWorkflowException(
1✔
741
                                failure, dataConverterWithChildWorkflowContext)));
742
              } else {
743
                runner.executeInWorkflowThread(
1✔
744
                    "child workflow started callback", () -> executionPromise.complete(execution));
1✔
745
              }
746
            },
1✔
747
            (result, failure) -> {
748
              if (failure != null) {
1✔
749
                runner.executeInWorkflowThread(
1✔
750
                    "child workflow failure callback",
751
                    () ->
752
                        resultPromise.completeExceptionally(
1✔
753
                            mapChildWorkflowException(
1✔
754
                                failure, dataConverterWithChildWorkflowContext)));
755
              } else {
756
                runner.executeInWorkflowThread(
1✔
757
                    "child workflow completion callback", () -> resultPromise.complete(result));
1✔
758
              }
759
            });
1✔
760
    AtomicBoolean callbackCalled = new AtomicBoolean();
1✔
761
    CancellationScope.current()
1✔
762
        .getCancellationRequest()
1✔
763
        .thenApply(
1✔
764
            (reason) -> {
765
              if (!callbackCalled.getAndSet(true)) {
1!
766
                cancellationCallback.apply(new CanceledFailure(reason));
1✔
767
              }
768
              return null;
1✔
769
            });
770

771
    // Avoid passing the input to the output handle as it causes the input to be retained for the
772
    // duration of the operation.
773
    Type resultType = input.getResultType();
1✔
774
    Class<R> resultClass = input.getResultClass();
1✔
775
    Promise<R> result =
1✔
776
        resultPromise.thenApply(
1✔
777
            (b) ->
778
                dataConverterWithChildWorkflowContext.fromPayloads(0, b, resultClass, resultType));
1✔
779
    return new ChildWorkflowOutput<>(result, executionPromise);
1✔
780
  }
781

782
  @Override
783
  public <R> ExecuteNexusOperationOutput<R> executeNexusOperation(
784
      ExecuteNexusOperationInput<R> input) {
785
    Preconditions.checkArgument(
1✔
786
        input.getEndpoint() != null && !input.getEndpoint().isEmpty(), "endpoint must be set");
1!
787
    Preconditions.checkArgument(
1✔
788
        input.getService() != null && !input.getService().isEmpty(), "service must be set");
1!
789

790
    if (CancellationScope.current().isCancelRequested()) {
1!
791
      CanceledFailure canceledFailure =
×
792
          new CanceledFailure("execute nexus operation called from a canceled scope");
793
      return new ExecuteNexusOperationOutput<>(
×
794
          Workflow.newFailedPromise(canceledFailure), Workflow.newFailedPromise(canceledFailure));
×
795
    }
796

797
    CompletablePromise<NexusOperationExecution> operationPromise = Workflow.newPromise();
1✔
798
    CompletablePromise<Optional<Payload>> resultPromise = Workflow.newPromise();
1✔
799

800
    // Not using the context aware data converter because the context will not be available on the
801
    // worker side
802
    Optional<Payload> payload = dataConverter.toPayload(input.getArg());
1✔
803

804
    ScheduleNexusOperationCommandAttributes.Builder attributes =
805
        ScheduleNexusOperationCommandAttributes.newBuilder();
1✔
806
    payload.ifPresent(attributes::setInput);
1✔
807
    attributes.setOperation(input.getOperation());
1✔
808
    attributes.setService(input.getService());
1✔
809
    attributes.setEndpoint(input.getEndpoint());
1✔
810
    attributes.putAllNexusHeader(input.getHeaders());
1✔
811
    attributes.setScheduleToCloseTimeout(
1✔
812
        ProtobufTimeUtils.toProtoDuration(input.getOptions().getScheduleToCloseTimeout()));
1✔
813

814
    Functions.Proc1<Exception> cancellationCallback =
1✔
815
        replayContext.startNexusOperation(
1✔
816
            attributes.build(),
1✔
817
            (operationExec, failure) -> {
818
              if (failure != null) {
1✔
819
                runner.executeInWorkflowThread(
1✔
820
                    "nexus operation start failed callback",
821
                    () ->
822
                        operationPromise.completeExceptionally(
1✔
823
                            dataConverter.failureToException(failure)));
1✔
824
              } else {
825
                runner.executeInWorkflowThread(
1✔
826
                    "nexus operation started callback",
827
                    () ->
828
                        operationPromise.complete(new NexusOperationExecutionImpl(operationExec)));
1✔
829
              }
830
            },
1✔
831
            (Optional<Payload> result, Failure failure) -> {
832
              if (failure != null) {
1✔
833
                runner.executeInWorkflowThread(
1✔
834
                    "nexus operation failure callback",
835
                    () ->
836
                        resultPromise.completeExceptionally(
1✔
837
                            dataConverter.failureToException(failure)));
1✔
838
              } else {
839
                runner.executeInWorkflowThread(
1✔
840
                    "nexus operation completion callback", () -> resultPromise.complete(result));
1✔
841
              }
842
            });
1✔
843
    AtomicBoolean callbackCalled = new AtomicBoolean();
1✔
844
    CancellationScope.current()
1✔
845
        .getCancellationRequest()
1✔
846
        .thenApply(
1✔
847
            (reason) -> {
848
              if (!callbackCalled.getAndSet(true)) {
1!
849
                cancellationCallback.apply(new CanceledFailure(reason));
1✔
850
              }
851
              return null;
1✔
852
            });
853
    Promise<R> result =
1✔
854
        resultPromise.thenApply(
1✔
855
            (b) ->
856
                input.getResultClass() != Void.class
1✔
857
                    ? dataConverter.fromPayload(
1✔
858
                        b.get(), input.getResultClass(), input.getResultType())
1✔
859
                    : null);
1✔
860
    // We register an empty handler to make sure that this promise is always "accessed" and never
861
    // leads to a log about it being completed exceptionally and non-accessed.
862
    // The "main" operation promise is the one returned from the execute method and that
863
    // promise will always be logged if not accessed.
864
    operationPromise.handle((ex, failure) -> null);
1✔
865
    return new ExecuteNexusOperationOutput<>(result, operationPromise);
1✔
866
  }
867

868
  @SuppressWarnings("deprecation")
869
  private StartChildWorkflowExecutionParameters createChildWorkflowParameters(
870
      String workflowId,
871
      String name,
872
      ChildWorkflowOptions options,
873
      Header header,
874
      Optional<Payloads> input,
875
      @Nullable Memo memo,
876
      @Nullable UserMetadata metadata) {
877
    final StartChildWorkflowExecutionCommandAttributes.Builder attributes =
878
        StartChildWorkflowExecutionCommandAttributes.newBuilder()
1✔
879
            .setWorkflowType(WorkflowType.newBuilder().setName(name).build());
1✔
880
    attributes.setWorkflowId(workflowId);
1✔
881
    attributes.setNamespace(OptionsUtils.safeGet(options.getNamespace()));
1✔
882
    input.ifPresent(attributes::setInput);
1✔
883
    attributes.setWorkflowRunTimeout(
1✔
884
        ProtobufTimeUtils.toProtoDuration(options.getWorkflowRunTimeout()));
1✔
885
    attributes.setWorkflowExecutionTimeout(
1✔
886
        ProtobufTimeUtils.toProtoDuration(options.getWorkflowExecutionTimeout()));
1✔
887
    attributes.setWorkflowTaskTimeout(
1✔
888
        ProtobufTimeUtils.toProtoDuration(options.getWorkflowTaskTimeout()));
1✔
889
    String taskQueue = options.getTaskQueue();
1✔
890
    if (taskQueue != null) {
1✔
891
      attributes.setTaskQueue(TaskQueue.newBuilder().setName(taskQueue));
1✔
892
    }
893
    if (options.getWorkflowIdReusePolicy() != null) {
1✔
894
      attributes.setWorkflowIdReusePolicy(options.getWorkflowIdReusePolicy());
1✔
895
    }
896
    RetryOptions retryOptions = options.getRetryOptions();
1✔
897
    if (retryOptions != null) {
1✔
898
      attributes.setRetryPolicy(toRetryPolicy(retryOptions));
1✔
899
    }
900
    attributes.setCronSchedule(OptionsUtils.safeGet(options.getCronSchedule()));
1✔
901

902
    if (memo != null) {
1✔
903
      attributes.setMemo(memo);
1✔
904
    }
905

906
    Map<String, Object> searchAttributes = options.getSearchAttributes();
1✔
907
    if (searchAttributes != null && !searchAttributes.isEmpty()) {
1!
908
      if (options.getTypedSearchAttributes() != null) {
1!
909
        throw new IllegalArgumentException(
×
910
            "Cannot have both typed search attributes and search attributes");
911
      }
912
      attributes.setSearchAttributes(SearchAttributesUtil.encode(searchAttributes));
1✔
913
    } else if (options.getTypedSearchAttributes() != null
1✔
914
        && options.getTypedSearchAttributes().size() > 0) {
1!
915
      attributes.setSearchAttributes(
1✔
916
          SearchAttributesUtil.encodeTyped(options.getTypedSearchAttributes()));
1✔
917
    }
918

919
    List<ContextPropagator> propagators = options.getContextPropagators();
1✔
920
    if (propagators == null) {
1!
921
      propagators = this.contextPropagators;
1✔
922
    }
923
    io.temporal.api.common.v1.Header grpcHeader =
1✔
924
        toHeaderGrpc(header, extractContextsAndConvertToBytes(propagators));
1✔
925
    attributes.setHeader(grpcHeader);
1✔
926

927
    ParentClosePolicy parentClosePolicy = options.getParentClosePolicy();
1✔
928
    if (parentClosePolicy != null) {
1✔
929
      attributes.setParentClosePolicy(parentClosePolicy);
1✔
930
    }
931

932
    if (options.getVersioningIntent() != null) {
1!
933
      attributes.setInheritBuildId(
1✔
934
          options
935
              .getVersioningIntent()
1✔
936
              .determineUseCompatibleFlag(
1✔
937
                  replayContext.getTaskQueue().equals(options.getTaskQueue())));
1✔
938
    }
939
    return new StartChildWorkflowExecutionParameters(
1✔
940
        attributes, options.getCancellationType(), metadata);
1✔
941
  }
942

943
  private static Header extractContextsAndConvertToBytes(
944
      List<ContextPropagator> contextPropagators) {
945
    if (contextPropagators == null) {
1!
946
      return null;
×
947
    }
948
    Map<String, Payload> result = new HashMap<>();
1✔
949
    for (ContextPropagator propagator : contextPropagators) {
1✔
950
      result.putAll(propagator.serializeContext(propagator.getCurrentContext()));
1✔
951
    }
1✔
952
    return new Header(result);
1✔
953
  }
954

955
  private static RuntimeException mapChildWorkflowException(
956
      Exception failure, DataConverter dataConverterWithChildWorkflowContext) {
957
    if (failure == null) {
1!
958
      return null;
×
959
    }
960
    if (failure instanceof TemporalFailure) {
1✔
961
      ((TemporalFailure) failure).setDataConverter(dataConverterWithChildWorkflowContext);
1✔
962
    }
963
    if (failure instanceof CanceledFailure) {
1✔
964
      return (CanceledFailure) failure;
1✔
965
    }
966
    if (failure instanceof WorkflowException) {
1!
967
      return (RuntimeException) failure;
×
968
    }
969
    if (failure instanceof ChildWorkflowFailure) {
1✔
970
      return (ChildWorkflowFailure) failure;
1✔
971
    }
972
    if (!(failure instanceof ChildWorkflowTaskFailedException)) {
1!
973
      return new IllegalArgumentException("Unexpected exception type: ", failure);
×
974
    }
975
    ChildWorkflowTaskFailedException taskFailed = (ChildWorkflowTaskFailedException) failure;
1✔
976
    Throwable cause =
1✔
977
        dataConverterWithChildWorkflowContext.failureToException(
1✔
978
            taskFailed.getOriginalCauseFailure());
1✔
979
    ChildWorkflowFailure exception = taskFailed.getException();
1✔
980
    return new ChildWorkflowFailure(
1✔
981
        exception.getInitiatedEventId(),
1✔
982
        exception.getStartedEventId(),
1✔
983
        exception.getWorkflowType(),
1✔
984
        exception.getExecution(),
1✔
985
        exception.getNamespace(),
1✔
986
        exception.getRetryState(),
1✔
987
        cause);
988
  }
989

990
  @Override
991
  public Promise<Void> newTimer(Duration delay) {
992
    return newTimer(delay, TimerOptions.newBuilder().build());
1✔
993
  }
994

995
  @Override
996
  public Promise<Void> newTimer(Duration delay, TimerOptions options) {
997
    CompletablePromise<Void> p = Workflow.newPromise();
1✔
998

999
    @Nullable
1000
    UserMetadata userMetadata =
1✔
1001
        makeUserMetaData(options.getSummary(), null, dataConverterWithCurrentWorkflowContext);
1✔
1002

1003
    Functions.Proc1<RuntimeException> cancellationHandler =
1✔
1004
        replayContext.newTimer(
1✔
1005
            delay,
1006
            userMetadata,
1007
            (e) ->
1008
                runner.executeInWorkflowThread(
1✔
1009
                    "timer-callback",
1010
                    () -> {
1011
                      if (e == null) {
1✔
1012
                        p.complete(null);
1✔
1013
                      } else {
1014
                        p.completeExceptionally(e);
1✔
1015
                      }
1016
                    }));
1✔
1017
    CancellationScope.current()
1✔
1018
        .getCancellationRequest()
1✔
1019
        .thenApply(
1✔
1020
            (r) -> {
1021
              cancellationHandler.apply(new CanceledFailure(r));
1✔
1022
              return r;
1✔
1023
            });
1024
    return p;
1✔
1025
  }
1026

1027
  @Override
1028
  public <R> R sideEffect(Class<R> resultClass, Type resultType, Func<R> func) {
1029
    try {
1030
      CompletablePromise<Optional<Payloads>> result = Workflow.newPromise();
1✔
1031
      replayContext.sideEffect(
1✔
1032
          () -> {
1033
            try {
1034
              readOnly = true;
1✔
1035
              R r = func.apply();
1✔
1036
              return dataConverterWithCurrentWorkflowContext.toPayloads(r);
1✔
1037
            } finally {
1038
              readOnly = false;
1✔
1039
            }
1040
          },
1041
          (p) ->
1042
              runner.executeInWorkflowThread(
1✔
1043
                  "side-effect-callback", () -> result.complete(Objects.requireNonNull(p))));
1✔
1044
      return dataConverterWithCurrentWorkflowContext.fromPayloads(
1✔
1045
          0, result.get(), resultClass, resultType);
1✔
1046
    } catch (Exception e) {
1✔
1047
      // SideEffect cannot throw normal exception as it can lead to non-deterministic behavior. So
1048
      // fail the workflow task by throwing an Error.
1049
      throw new Error(e);
1✔
1050
    }
1051
  }
1052

1053
  @Override
1054
  public <R> R mutableSideEffect(
1055
      String id, Class<R> resultClass, Type resultType, BiPredicate<R, R> updated, Func<R> func) {
1056
    try {
1057
      return mutableSideEffectImpl(id, resultClass, resultType, updated, func);
1✔
1058
    } catch (Exception e) {
1✔
1059
      // MutableSideEffect cannot throw normal exception as it can lead to non-deterministic
1060
      // behavior. So fail the workflow task by throwing an Error.
1061
      throw new Error(e);
1✔
1062
    }
1063
  }
1064

1065
  private <R> R mutableSideEffectImpl(
1066
      String id, Class<R> resultClass, Type resultType, BiPredicate<R, R> updated, Func<R> func) {
1067
    CompletablePromise<Optional<Payloads>> result = Workflow.newPromise();
1✔
1068
    AtomicReference<R> unserializedResult = new AtomicReference<>();
1✔
1069
    replayContext.mutableSideEffect(
1✔
1070
        id,
1071
        (storedBinary) -> {
1072
          Optional<R> stored =
1✔
1073
              storedBinary.map(
1✔
1074
                  (b) ->
1075
                      dataConverterWithCurrentWorkflowContext.fromPayloads(
1✔
1076
                          0, Optional.of(b), resultClass, resultType));
1✔
1077
          try {
1078
            readOnly = true;
1✔
1079
            R funcResult =
1✔
1080
                Objects.requireNonNull(
1✔
1081
                    func.apply(), "mutableSideEffect function " + "returned null");
1✔
1082
            if (!stored.isPresent() || updated.test(stored.get(), funcResult)) {
1✔
1083
              unserializedResult.set(funcResult);
1✔
1084
              return dataConverterWithCurrentWorkflowContext.toPayloads(funcResult);
1✔
1085
            }
1086
            return Optional.empty(); // returned only when value doesn't need to be updated
1✔
1087
          } finally {
1088
            readOnly = false;
1✔
1089
          }
1090
        },
1091
        (p) ->
1092
            runner.executeInWorkflowThread(
1✔
1093
                "mutable-side-effect-callback", () -> result.complete(Objects.requireNonNull(p))));
1✔
1094

1095
    if (!result.get().isPresent()) {
1!
1096
      throw new IllegalArgumentException("No value found for mutableSideEffectId=" + id);
×
1097
    }
1098
    // An optimization that avoids unnecessary deserialization of the result.
1099
    R unserialized = unserializedResult.get();
1✔
1100
    if (unserialized != null) {
1✔
1101
      return unserialized;
1✔
1102
    }
1103
    return dataConverterWithCurrentWorkflowContext.fromPayloads(
1✔
1104
        0, result.get(), resultClass, resultType);
1✔
1105
  }
1106

1107
  @Override
1108
  public int getVersion(String changeId, int minSupported, int maxSupported) {
1109
    CompletablePromise<Integer> result = Workflow.newPromise();
1✔
1110
    boolean markerExists =
1✔
1111
        replayContext.getVersion(
1✔
1112
            changeId,
1113
            minSupported,
1114
            maxSupported,
1115
            (v, e) ->
1116
                runner.executeInWorkflowThread(
1✔
1117
                    "version-callback",
1118
                    () -> {
1119
                      if (v != null) {
1✔
1120
                        result.complete(v);
1✔
1121
                      } else {
1122
                        result.completeExceptionally(e);
1✔
1123
                      }
1124
                    }));
1✔
1125
    /*
1126
     * If we are replaying a workflow and encounter a getVersion call it is possible that this call did not exist
1127
     * on the original execution. If the call did not exist on the original execution then we cannot block on results
1128
     * because it can lead to non-deterministic scheduling.
1129
     * */
1130
    if (replayContext.isReplaying()
1✔
1131
        && !markerExists
1132
        && replayContext.tryUseSdkFlag(SdkFlag.SKIP_YIELD_ON_DEFAULT_VERSION)
1✔
1133
        && minSupported == DEFAULT_VERSION) {
1134
      return DEFAULT_VERSION;
1✔
1135
    }
1136

1137
    try {
1138
      return result.get();
1✔
1139
    } catch (UnsupportedVersion.UnsupportedVersionException ex) {
1✔
1140
      throw new UnsupportedVersion(ex);
1✔
1141
    }
1142
  }
1143

1144
  @Override
1145
  public void registerQuery(RegisterQueryInput request) {
1146
    queryDispatcher.registerQueryHandlers(request);
1✔
1147
  }
1✔
1148

1149
  @Override
1150
  public void registerSignalHandlers(RegisterSignalHandlersInput input) {
1151
    signalDispatcher.registerSignalHandlers(input);
1✔
1152
  }
1✔
1153

1154
  @Override
1155
  public void registerUpdateHandlers(RegisterUpdateHandlersInput input) {
1156
    updateDispatcher.registerUpdateHandlers(input);
1✔
1157
  }
1✔
1158

1159
  @Override
1160
  public void registerDynamicSignalHandler(RegisterDynamicSignalHandlerInput input) {
1161
    signalDispatcher.registerDynamicSignalHandler(input);
1✔
1162
  }
1✔
1163

1164
  @Override
1165
  public void registerDynamicQueryHandler(RegisterDynamicQueryHandlerInput input) {
1166
    queryDispatcher.registerDynamicQueryHandler(input);
1✔
1167
  }
1✔
1168

1169
  @Override
1170
  public void registerDynamicUpdateHandler(RegisterDynamicUpdateHandlerInput input) {
1171
    updateDispatcher.registerDynamicUpdateHandler(input);
1✔
1172
  }
1✔
1173

1174
  @Override
1175
  public UUID randomUUID() {
1176
    return replayContext.randomUUID();
1✔
1177
  }
1178

1179
  @Override
1180
  public Random newRandom() {
1181
    return replayContext.newRandom();
1✔
1182
  }
1183

1184
  public DataConverter getDataConverter() {
1185
    return dataConverter;
×
1186
  }
1187

1188
  public DataConverter getDataConverterWithCurrentWorkflowContext() {
1189
    return dataConverterWithCurrentWorkflowContext;
1✔
1190
  }
1191

1192
  boolean isReplaying() {
1193
    return replayContext.isReplaying();
1✔
1194
  }
1195

1196
  boolean isReadOnly() {
1197
    return readOnly;
1✔
1198
  }
1199

1200
  void setReadOnly(boolean readOnly) {
1201
    this.readOnly = readOnly;
1✔
1202
  }
1✔
1203

1204
  @Override
1205
  public Map<Long, SignalHandlerInfo> getRunningSignalHandlers() {
1206
    return signalDispatcher.getRunningSignalHandlers();
1✔
1207
  }
1208

1209
  @Override
1210
  public Map<String, UpdateHandlerInfo> getRunningUpdateHandlers() {
1211
    return updateDispatcher.getRunningUpdateHandlers();
1✔
1212
  }
1213

1214
  @Override
1215
  public ReplayWorkflowContext getReplayContext() {
1216
    return replayContext;
1✔
1217
  }
1218

1219
  @Override
1220
  public SignalExternalOutput signalExternalWorkflow(SignalExternalInput input) {
1221
    WorkflowExecution childExecution = input.getExecution();
1✔
1222
    DataConverter dataConverterWithChildWorkflowContext =
1✔
1223
        dataConverter.withContext(
1✔
1224
            new WorkflowSerializationContext(
1225
                replayContext.getNamespace(), childExecution.getWorkflowId()));
1✔
1226
    SignalExternalWorkflowExecutionCommandAttributes.Builder attributes =
1227
        SignalExternalWorkflowExecutionCommandAttributes.newBuilder();
1✔
1228
    attributes.setSignalName(input.getSignalName());
1✔
1229
    attributes.setExecution(childExecution);
1✔
1230
    attributes.setHeader(HeaderUtils.toHeaderGrpc(input.getHeader(), null));
1✔
1231
    Optional<Payloads> payloads = dataConverterWithChildWorkflowContext.toPayloads(input.getArgs());
1✔
1232
    payloads.ifPresent(attributes::setInput);
1✔
1233
    CompletablePromise<Void> result = Workflow.newPromise();
1✔
1234
    Functions.Proc1<Exception> cancellationCallback =
1✔
1235
        replayContext.signalExternalWorkflowExecution(
1✔
1236
            attributes,
1237
            (output, failure) -> {
1238
              if (failure != null) {
1✔
1239
                runner.executeInWorkflowThread(
1✔
1240
                    "child workflow failure callback",
1241
                    () ->
1242
                        result.completeExceptionally(
1✔
1243
                            dataConverterWithChildWorkflowContext.failureToException(failure)));
1✔
1244
              } else {
1245
                runner.executeInWorkflowThread(
1✔
1246
                    "child workflow completion callback", () -> result.complete(output));
1✔
1247
              }
1248
            });
1✔
1249
    CancellationScope.current()
1✔
1250
        .getCancellationRequest()
1✔
1251
        .thenApply(
1✔
1252
            (reason) -> {
1253
              cancellationCallback.apply(new CanceledFailure(reason));
1✔
1254
              return null;
1✔
1255
            });
1256
    return new SignalExternalOutput(result);
1✔
1257
  }
1258

1259
  @Override
1260
  public void sleep(Duration duration) {
1261
    newTimer(duration).get();
1✔
1262
  }
1✔
1263

1264
  @Override
1265
  public boolean await(Duration timeout, String reason, Supplier<Boolean> unblockCondition) {
1266
    Promise<Void> timer = newTimer(timeout);
1✔
1267
    WorkflowThread.await(reason, () -> (timer.isCompleted() || unblockCondition.get()));
1✔
1268
    return !timer.isCompleted();
1✔
1269
  }
1270

1271
  @Override
1272
  public void await(String reason, Supplier<Boolean> unblockCondition) {
1273
    WorkflowThread.await(reason, unblockCondition);
1✔
1274
  }
1✔
1275

1276
  @SuppressWarnings("deprecation")
1277
  @Override
1278
  public void continueAsNew(ContinueAsNewInput input) {
1279
    ContinueAsNewWorkflowExecutionCommandAttributes.Builder attributes =
1280
        ContinueAsNewWorkflowExecutionCommandAttributes.newBuilder();
1✔
1281
    String workflowType = input.getWorkflowType();
1✔
1282
    if (workflowType != null) {
1✔
1283
      attributes.setWorkflowType(WorkflowType.newBuilder().setName(workflowType));
1✔
1284
    }
1285
    @Nullable ContinueAsNewOptions options = input.getOptions();
1✔
1286
    if (options != null) {
1✔
1287
      if (options.getWorkflowRunTimeout() != null) {
1!
1288
        attributes.setWorkflowRunTimeout(
×
1289
            ProtobufTimeUtils.toProtoDuration(options.getWorkflowRunTimeout()));
×
1290
      }
1291
      if (options.getWorkflowTaskTimeout() != null) {
1!
1292
        attributes.setWorkflowTaskTimeout(
×
1293
            ProtobufTimeUtils.toProtoDuration(options.getWorkflowTaskTimeout()));
×
1294
      }
1295
      if (options.getTaskQueue() != null && !options.getTaskQueue().isEmpty()) {
1✔
1296
        attributes.setTaskQueue(TaskQueue.newBuilder().setName(options.getTaskQueue()));
1✔
1297
      }
1298
      if (options.getRetryOptions() != null) {
1✔
1299
        attributes.setRetryPolicy(toRetryPolicy(options.getRetryOptions()));
1✔
1300
      } else if (replayContext.getRetryOptions() != null) {
1✔
1301
        attributes.setRetryPolicy(toRetryPolicy(replayContext.getRetryOptions()));
1✔
1302
      }
1303
      Map<String, Object> searchAttributes = options.getSearchAttributes();
1✔
1304
      if (searchAttributes != null && !searchAttributes.isEmpty()) {
1!
1305
        if (options.getTypedSearchAttributes() != null) {
×
1306
          throw new IllegalArgumentException(
×
1307
              "Cannot have typed search attributes and search attributes");
1308
        }
1309
        attributes.setSearchAttributes(SearchAttributesUtil.encode(searchAttributes));
×
1310
      } else if (options.getTypedSearchAttributes() != null
1✔
1311
          && options.getTypedSearchAttributes().size() > 0) {
1!
1312
        attributes.setSearchAttributes(
1✔
1313
            SearchAttributesUtil.encodeTyped(options.getTypedSearchAttributes()));
1✔
1314
      }
1315
      Map<String, Object> memo = options.getMemo();
1✔
1316
      if (memo != null) {
1✔
1317
        attributes.setMemo(
1✔
1318
            Memo.newBuilder()
1✔
1319
                .putAllFields(intoPayloadMap(dataConverterWithCurrentWorkflowContext, memo)));
1✔
1320
      }
1321
      if (options.getVersioningIntent() != null) {
1!
1322
        attributes.setInheritBuildId(
×
1323
            options
1324
                .getVersioningIntent()
×
1325
                .determineUseCompatibleFlag(
×
1326
                    replayContext.getTaskQueue().equals(options.getTaskQueue())));
×
1327
      }
1328
    } else if (replayContext.getRetryOptions() != null) {
1✔
1329
      // Have to copy retry options as server doesn't copy them.
1330
      attributes.setRetryPolicy(toRetryPolicy(replayContext.getRetryOptions()));
1✔
1331
    }
1332

1333
    List<ContextPropagator> propagators =
1334
        options != null && options.getContextPropagators() != null
1!
1335
            ? options.getContextPropagators()
×
1336
            : this.contextPropagators;
1✔
1337
    io.temporal.api.common.v1.Header grpcHeader =
1✔
1338
        toHeaderGrpc(input.getHeader(), extractContextsAndConvertToBytes(propagators));
1✔
1339
    attributes.setHeader(grpcHeader);
1✔
1340

1341
    Optional<Payloads> payloads =
1✔
1342
        dataConverterWithCurrentWorkflowContext.toPayloads(input.getArgs());
1✔
1343
    payloads.ifPresent(attributes::setInput);
1✔
1344

1345
    replayContext.continueAsNewOnCompletion(attributes.build());
1✔
1346
    WorkflowThread.exit();
×
1347
  }
×
1348

1349
  @Override
1350
  public CancelWorkflowOutput cancelWorkflow(CancelWorkflowInput input) {
1351
    CompletablePromise<Void> result = Workflow.newPromise();
×
1352
    replayContext.requestCancelExternalWorkflowExecution(
×
1353
        input.getExecution(),
×
1354
        (r, exception) -> {
1355
          if (exception == null) {
×
1356
            result.complete(null);
×
1357
          } else {
1358
            result.completeExceptionally(exception);
×
1359
          }
1360
        });
×
1361
    return new CancelWorkflowOutput(result);
×
1362
  }
1363

1364
  @Override
1365
  public Scope getMetricsScope() {
1366
    return replayContext.getMetricsScope();
1✔
1367
  }
1368

1369
  public boolean isLoggingEnabledInReplay() {
1370
    return replayContext.getEnableLoggingInReplay();
1✔
1371
  }
1372

1373
  @Override
1374
  public void upsertSearchAttributes(Map<String, ?> searchAttributes) {
1375
    Preconditions.checkArgument(searchAttributes != null, "null search attributes");
1!
1376
    Preconditions.checkArgument(!searchAttributes.isEmpty(), "empty search attributes");
1✔
1377
    SearchAttributes attr = SearchAttributesUtil.encode(searchAttributes);
1✔
1378
    replayContext.upsertSearchAttributes(attr);
1✔
1379
  }
1✔
1380

1381
  @Override
1382
  public void upsertTypedSearchAttributes(SearchAttributeUpdate<?>... searchAttributeUpdates) {
1383
    SearchAttributes attr = SearchAttributesUtil.encodeTypedUpdates(searchAttributeUpdates);
1✔
1384
    replayContext.upsertSearchAttributes(attr);
1✔
1385
  }
1✔
1386

1387
  @Override
1388
  public void upsertMemo(Map<String, Object> memo) {
1389
    Preconditions.checkArgument(memo != null, "null memo");
1!
1390
    Preconditions.checkArgument(!memo.isEmpty(), "empty memo");
1!
1391
    replayContext.upsertMemo(
1✔
1392
        Memo.newBuilder()
1✔
1393
            .putAllFields(intoPayloadMap(dataConverterWithCurrentWorkflowContext, memo))
1✔
1394
            .build());
1✔
1395
  }
1✔
1396

1397
  @Nonnull
1398
  public Object newWorkflowMethodThreadIntercepted(Runnable runnable, @Nullable String name) {
1399
    return runner.newWorkflowThread(runnable, false, name);
1✔
1400
  }
1401

1402
  @Nonnull
1403
  public Object newWorkflowCallbackThreadIntercepted(Runnable runnable, @Nullable String name) {
1404
    return runner.newCallbackThread(runnable, name);
1✔
1405
  }
1406

1407
  @Override
1408
  public Object newChildThread(Runnable runnable, boolean detached, String name) {
1409
    return runner.newWorkflowThread(runnable, detached, name);
1✔
1410
  }
1411

1412
  @Override
1413
  public long currentTimeMillis() {
1414
    return replayContext.currentTimeMillis();
1✔
1415
  }
1416

1417
  /**
1418
   * This WorkflowInboundCallsInterceptor is used during creation of the initial root workflow
1419
   * thread and should be replaced with another specific implementation during initialization stage
1420
   * {@code workflow.initialize()} performed inside the workflow root thread.
1421
   *
1422
   * @see SyncWorkflow#start(HistoryEvent, ReplayWorkflowContext)
1423
   */
1424
  private static final class InitialWorkflowInboundCallsInterceptor
1425
      extends BaseRootWorkflowInboundCallsInterceptor {
1426

1427
    public InitialWorkflowInboundCallsInterceptor(SyncWorkflowContext workflowContext) {
1428
      super(workflowContext);
1✔
1429
    }
1✔
1430

1431
    @Override
1432
    public WorkflowOutput execute(WorkflowInput input) {
1433
      throw new UnsupportedOperationException(
×
1434
          "SyncWorkflowContext should be initialized with a non-initial WorkflowInboundCallsInterceptor "
1435
              + "before #execute can be called");
1436
    }
1437
  }
1438

1439
  @Nonnull
1440
  @Override
1441
  public WorkflowImplementationOptions getWorkflowImplementationOptions() {
1442
    return workflowImplementationOptions;
1✔
1443
  }
1444

1445
  @Override
1446
  public Failure mapWorkflowExceptionToFailure(Throwable failure) {
1447
    return dataConverterWithCurrentWorkflowContext.exceptionToFailure(failure);
1✔
1448
  }
1449

1450
  @Nullable
1451
  @Override
1452
  public <R> R getLastCompletionResult(Class<R> resultClass, Type resultType) {
1453
    return dataConverterWithCurrentWorkflowContext.fromPayloads(
1✔
1454
        0, Optional.ofNullable(replayContext.getLastCompletionResult()), resultClass, resultType);
1✔
1455
  }
1456

1457
  @Override
1458
  public List<ContextPropagator> getContextPropagators() {
1459
    return contextPropagators;
1✔
1460
  }
1461

1462
  @Override
1463
  public Map<String, Object> getPropagatedContexts() {
1464
    if (contextPropagators == null || contextPropagators.isEmpty()) {
1✔
1465
      return new HashMap<>();
1✔
1466
    }
1467

1468
    Map<String, Payload> headerData = new HashMap<>(replayContext.getHeader());
1✔
1469
    Map<String, Object> contextData = new HashMap<>();
1✔
1470
    for (ContextPropagator propagator : contextPropagators) {
1✔
1471
      contextData.put(propagator.getName(), propagator.deserializeContext(headerData));
1✔
1472
    }
1✔
1473

1474
    return contextData;
1✔
1475
  }
1476

1477
  public void setCurrentUpdateInfo(UpdateInfo updateInfo) {
1478
    currentUpdateInfo.set(updateInfo);
1✔
1479
  }
1✔
1480

1481
  public void setCurrentDetails(String details) {
1482
    currentDetails = details;
1✔
1483
  }
1✔
1484

1485
  @Nullable
1486
  public String getCurrentDetails() {
1487
    return currentDetails;
1✔
1488
  }
1489

1490
  public Optional<UpdateInfo> getCurrentUpdateInfo() {
1491
    return Optional.ofNullable(currentUpdateInfo.get());
1✔
1492
  }
1493

1494
  /** Simple wrapper over a failure just to allow completing the CompletablePromise as a failure */
1495
  private static class FailureWrapperException extends RuntimeException {
1496
    private final Failure failure;
1497

1498
    public FailureWrapperException(Failure failure) {
1✔
1499
      this.failure = failure;
1✔
1500
    }
1✔
1501

1502
    public Failure getFailure() {
1503
      return failure;
1✔
1504
    }
1505
  }
1506
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc