• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #175

pending completion
#175

push

github-actions

web-flow
Worker / Build Id versioning (#1786)

Implement new worker build id based versioning feature

236 of 236 new or added lines in 24 files covered. (100.0%)

18343 of 23697 relevant lines covered (77.41%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.67
/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.sync;
22

23
import static io.temporal.internal.common.HeaderUtils.intoPayloadMap;
24
import static io.temporal.internal.common.HeaderUtils.toHeaderGrpc;
25
import static io.temporal.internal.common.SerializerUtils.toRetryPolicy;
26

27
import com.google.common.base.MoreObjects;
28
import com.google.common.base.Preconditions;
29
import com.uber.m3.tally.Scope;
30
import io.temporal.activity.ActivityOptions;
31
import io.temporal.activity.LocalActivityOptions;
32
import io.temporal.api.command.v1.ContinueAsNewWorkflowExecutionCommandAttributes;
33
import io.temporal.api.command.v1.ScheduleActivityTaskCommandAttributes;
34
import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes;
35
import io.temporal.api.command.v1.StartChildWorkflowExecutionCommandAttributes;
36
import io.temporal.api.common.v1.ActivityType;
37
import io.temporal.api.common.v1.Memo;
38
import io.temporal.api.common.v1.Payload;
39
import io.temporal.api.common.v1.Payloads;
40
import io.temporal.api.common.v1.SearchAttributes;
41
import io.temporal.api.common.v1.WorkflowExecution;
42
import io.temporal.api.common.v1.WorkflowType;
43
import io.temporal.api.enums.v1.ParentClosePolicy;
44
import io.temporal.api.failure.v1.Failure;
45
import io.temporal.api.history.v1.HistoryEvent;
46
import io.temporal.api.taskqueue.v1.TaskQueue;
47
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
48
import io.temporal.client.WorkflowException;
49
import io.temporal.common.RetryOptions;
50
import io.temporal.common.SearchAttributeUpdate;
51
import io.temporal.common.context.ContextPropagator;
52
import io.temporal.common.converter.DataConverter;
53
import io.temporal.common.interceptors.Header;
54
import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor;
55
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor;
56
import io.temporal.failure.*;
57
import io.temporal.internal.common.ActivityOptionUtils;
58
import io.temporal.internal.common.OptionsUtils;
59
import io.temporal.internal.common.ProtobufTimeUtils;
60
import io.temporal.internal.common.SearchAttributesUtil;
61
import io.temporal.internal.replay.ChildWorkflowTaskFailedException;
62
import io.temporal.internal.replay.ReplayWorkflowContext;
63
import io.temporal.internal.replay.WorkflowContext;
64
import io.temporal.internal.statemachines.*;
65
import io.temporal.payload.context.ActivitySerializationContext;
66
import io.temporal.payload.context.WorkflowSerializationContext;
67
import io.temporal.worker.WorkflowImplementationOptions;
68
import io.temporal.workflow.CancellationScope;
69
import io.temporal.workflow.ChildWorkflowOptions;
70
import io.temporal.workflow.CompletablePromise;
71
import io.temporal.workflow.ContinueAsNewOptions;
72
import io.temporal.workflow.Functions;
73
import io.temporal.workflow.Functions.Func;
74
import io.temporal.workflow.Promise;
75
import io.temporal.workflow.Workflow;
76
import java.lang.reflect.Type;
77
import java.time.Duration;
78
import java.time.Instant;
79
import java.util.*;
80
import java.util.concurrent.atomic.AtomicBoolean;
81
import java.util.concurrent.atomic.AtomicReference;
82
import java.util.function.BiPredicate;
83
import java.util.function.Supplier;
84
import javax.annotation.Nonnull;
85
import javax.annotation.Nullable;
86
import org.slf4j.Logger;
87
import org.slf4j.LoggerFactory;
88

89
// TODO separate WorkflowOutboundCallsInterceptor functionality from this class into
90
// RootWorkflowOutboundInterceptor
91

92
/**
93
 * Root, the most top level WorkflowContext that unites all relevant contexts, handlers, options,
94
 * states, etc. It's created when SyncWorkflow which represent a context of a workflow type
95
 * definition on the worker meets ReplayWorkflowContext which contains information from the
96
 * WorkflowTask
97
 */
98
final class SyncWorkflowContext implements WorkflowContext, WorkflowOutboundCallsInterceptor {
99
  private static final Logger log = LoggerFactory.getLogger(SyncWorkflowContext.class);
1✔
100

101
  private final String namespace;
102
  private final WorkflowExecution workflowExecution;
103
  private final WorkflowImplementationOptions workflowImplementationOptions;
104
  private final DataConverter dataConverter;
105
  // to be used in this class, should not be passed down. Pass the original #dataConverter instead
106
  private final DataConverter dataConverterWithCurrentWorkflowContext;
107
  private final List<ContextPropagator> contextPropagators;
108
  private final SignalDispatcher signalDispatcher;
109
  private final QueryDispatcher queryDispatcher;
110
  private final UpdateDispatcher updateDispatcher;
111

112
  // initialized later when these entities are created
113
  private ReplayWorkflowContext replayContext;
114
  private DeterministicRunner runner;
115

116
  private WorkflowInboundCallsInterceptor headInboundInterceptor;
117
  private WorkflowOutboundCallsInterceptor headOutboundInterceptor;
118

119
  private ActivityOptions defaultActivityOptions = null;
1✔
120
  private Map<String, ActivityOptions> activityOptionsMap;
121
  private LocalActivityOptions defaultLocalActivityOptions = null;
1✔
122
  private Map<String, LocalActivityOptions> localActivityOptionsMap;
123

124
  public SyncWorkflowContext(
125
      @Nonnull String namespace,
126
      @Nonnull WorkflowExecution workflowExecution,
127
      SignalDispatcher signalDispatcher,
128
      QueryDispatcher queryDispatcher,
129
      UpdateDispatcher updateDispatcher,
130
      @Nullable WorkflowImplementationOptions workflowImplementationOptions,
131
      DataConverter dataConverter,
132
      List<ContextPropagator> contextPropagators) {
1✔
133
    this.namespace = namespace;
1✔
134
    this.workflowExecution = workflowExecution;
1✔
135
    this.dataConverter = dataConverter;
1✔
136
    this.dataConverterWithCurrentWorkflowContext =
1✔
137
        dataConverter.withContext(
1✔
138
            new WorkflowSerializationContext(namespace, workflowExecution.getWorkflowId()));
1✔
139
    this.contextPropagators = contextPropagators;
1✔
140
    this.signalDispatcher = signalDispatcher;
1✔
141
    this.queryDispatcher = queryDispatcher;
1✔
142
    this.updateDispatcher = updateDispatcher;
1✔
143
    if (workflowImplementationOptions != null) {
1✔
144
      this.defaultActivityOptions = workflowImplementationOptions.getDefaultActivityOptions();
1✔
145
      this.activityOptionsMap = new HashMap<>(workflowImplementationOptions.getActivityOptions());
1✔
146
      this.defaultLocalActivityOptions =
1✔
147
          workflowImplementationOptions.getDefaultLocalActivityOptions();
1✔
148
      this.localActivityOptionsMap =
1✔
149
          new HashMap<>(workflowImplementationOptions.getLocalActivityOptions());
1✔
150
    }
151
    this.workflowImplementationOptions =
1✔
152
        workflowImplementationOptions == null
1✔
153
            ? WorkflowImplementationOptions.getDefaultInstance()
1✔
154
            : workflowImplementationOptions;
1✔
155
    // initial values for headInboundInterceptor and headOutboundInterceptor until they initialized
156
    // with actual interceptors through #initHeadInboundCallsInterceptor and
157
    // #initHeadOutboundCallsInterceptor during initialization phase.
158
    // See workflow.initialize() performed inside the workflow root thread inside
159
    // SyncWorkflow#start(HistoryEvent, ReplayWorkflowContext)
160
    this.headInboundInterceptor = new InitialWorkflowInboundCallsInterceptor(this);
1✔
161
    this.headOutboundInterceptor = this;
1✔
162
  }
1✔
163

164
  public void setReplayContext(ReplayWorkflowContext context) {
165
    this.replayContext = context;
1✔
166
  }
1✔
167

168
  /**
169
   * Using setter, as runner is initialized with this context, so it is not ready during
170
   * construction of this.
171
   */
172
  public void setRunner(DeterministicRunner runner) {
173
    this.runner = runner;
1✔
174
  }
1✔
175

176
  public DeterministicRunner getRunner() {
177
    return runner;
×
178
  }
179

180
  public WorkflowOutboundCallsInterceptor getWorkflowOutboundInterceptor() {
181
    return headOutboundInterceptor;
1✔
182
  }
183

184
  public WorkflowInboundCallsInterceptor getWorkflowInboundInterceptor() {
185
    return headInboundInterceptor;
1✔
186
  }
187

188
  public void initHeadOutboundCallsInterceptor(WorkflowOutboundCallsInterceptor head) {
189
    headOutboundInterceptor = head;
1✔
190
  }
1✔
191

192
  public void initHeadInboundCallsInterceptor(WorkflowInboundCallsInterceptor head) {
193
    headInboundInterceptor = head;
1✔
194
    signalDispatcher.setInboundCallsInterceptor(head);
1✔
195
    queryDispatcher.setInboundCallsInterceptor(head);
1✔
196
    updateDispatcher.setInboundCallsInterceptor(head);
1✔
197
  }
1✔
198

199
  public ActivityOptions getDefaultActivityOptions() {
200
    return defaultActivityOptions;
1✔
201
  }
202

203
  public @Nonnull Map<String, ActivityOptions> getActivityOptions() {
204
    return activityOptionsMap != null
1✔
205
        ? Collections.unmodifiableMap(activityOptionsMap)
1✔
206
        : Collections.emptyMap();
×
207
  }
208

209
  public LocalActivityOptions getDefaultLocalActivityOptions() {
210
    return defaultLocalActivityOptions;
1✔
211
  }
212

213
  public @Nonnull Map<String, LocalActivityOptions> getLocalActivityOptions() {
214
    return localActivityOptionsMap != null
1✔
215
        ? Collections.unmodifiableMap(localActivityOptionsMap)
1✔
216
        : Collections.emptyMap();
×
217
  }
218

219
  public void setDefaultActivityOptions(ActivityOptions defaultActivityOptions) {
220
    this.defaultActivityOptions =
1✔
221
        (this.defaultActivityOptions == null)
1✔
222
            ? defaultActivityOptions
×
223
            : this.defaultActivityOptions.toBuilder()
1✔
224
                .mergeActivityOptions(defaultActivityOptions)
1✔
225
                .build();
1✔
226
  }
1✔
227

228
  public void applyActivityOptions(Map<String, ActivityOptions> activityTypeToOption) {
229
    Objects.requireNonNull(activityTypeToOption);
1✔
230
    if (this.activityOptionsMap == null) {
1✔
231
      this.activityOptionsMap = new HashMap<>(activityTypeToOption);
×
232
      return;
×
233
    }
234
    ActivityOptionUtils.mergePredefinedActivityOptions(activityOptionsMap, activityTypeToOption);
1✔
235
  }
1✔
236

237
  public void setDefaultLocalActivityOptions(LocalActivityOptions defaultLocalActivityOptions) {
238
    this.defaultLocalActivityOptions =
1✔
239
        (this.defaultLocalActivityOptions == null)
1✔
240
            ? defaultLocalActivityOptions
×
241
            : this.defaultLocalActivityOptions.toBuilder()
1✔
242
                .mergeActivityOptions(defaultLocalActivityOptions)
1✔
243
                .build();
1✔
244
  }
1✔
245

246
  public void applyLocalActivityOptions(Map<String, LocalActivityOptions> activityTypeToOption) {
247
    Objects.requireNonNull(activityTypeToOption);
1✔
248
    if (this.localActivityOptionsMap == null) {
1✔
249
      this.localActivityOptionsMap = new HashMap<>(activityTypeToOption);
×
250
      return;
×
251
    }
252
    ActivityOptionUtils.mergePredefinedLocalActivityOptions(
1✔
253
        localActivityOptionsMap, activityTypeToOption);
254
  }
1✔
255

256
  @Override
257
  public <T> ActivityOutput<T> executeActivity(ActivityInput<T> input) {
258
    ActivitySerializationContext serializationContext =
1✔
259
        new ActivitySerializationContext(
260
            replayContext.getNamespace(),
1✔
261
            replayContext.getWorkflowId(),
1✔
262
            replayContext.getWorkflowType().getName(),
1✔
263
            input.getActivityName(),
1✔
264
            // input.getOptions().getTaskQueue() may be not specified, workflow task queue is used
265
            // by the Server in this case
266
            MoreObjects.firstNonNull(
1✔
267
                input.getOptions().getTaskQueue(), replayContext.getTaskQueue()),
1✔
268
            false);
269
    DataConverter dataConverterWithActivityContext =
1✔
270
        dataConverter.withContext(serializationContext);
1✔
271
    Optional<Payloads> args = dataConverterWithActivityContext.toPayloads(input.getArgs());
1✔
272

273
    ActivityOutput<Optional<Payloads>> output =
1✔
274
        executeActivityOnce(input.getActivityName(), input.getOptions(), input.getHeader(), args);
1✔
275

276
    return new ActivityOutput<>(
1✔
277
        output.getActivityId(),
1✔
278
        output
279
            .getResult()
1✔
280
            .handle(
1✔
281
                (r, f) -> {
282
                  if (f == null) {
1✔
283
                    return input.getResultType() != Void.TYPE
1✔
284
                        ? dataConverterWithActivityContext.fromPayloads(
1✔
285
                            0, r, input.getResultClass(), input.getResultType())
1✔
286
                        : null;
1✔
287
                  } else {
288
                    throw dataConverterWithActivityContext.failureToException(
1✔
289
                        ((FailureWrapperException) f).getFailure());
1✔
290
                  }
291
                }));
292
  }
293

294
  private ActivityOutput<Optional<Payloads>> executeActivityOnce(
295
      String activityTypeName, ActivityOptions options, Header header, Optional<Payloads> input) {
296
    ExecuteActivityParameters params =
1✔
297
        constructExecuteActivityParameters(activityTypeName, options, header, input);
1✔
298
    ActivityCallback callback = new ActivityCallback();
1✔
299
    ReplayWorkflowContext.ScheduleActivityTaskOutput activityOutput =
1✔
300
        replayContext.scheduleActivityTask(params, callback::invoke);
1✔
301
    CancellationScope.current()
1✔
302
        .getCancellationRequest()
1✔
303
        .thenApply(
1✔
304
            (reason) -> {
305
              activityOutput.getCancellationHandle().apply(new CanceledFailure(reason));
1✔
306
              return null;
1✔
307
            });
308
    return new ActivityOutput<>(activityOutput.getActivityId(), callback.result);
1✔
309
  }
310

311
  public void handleInterceptedSignal(WorkflowInboundCallsInterceptor.SignalInput input) {
312
    signalDispatcher.handleInterceptedSignal(input);
1✔
313
  }
1✔
314

315
  public void handleSignal(String signalName, Optional<Payloads> input, long eventId) {
316
    signalDispatcher.handleSignal(signalName, input, eventId);
1✔
317
  }
1✔
318

319
  public void handleValidateUpdate(String updateName, Optional<Payloads> input, long eventId) {
320
    updateDispatcher.handleValidateUpdate(updateName, input, eventId);
1✔
321
  }
1✔
322

323
  public Optional<Payloads> handleExecuteUpdate(
324
      String updateName, Optional<Payloads> input, long eventId) {
325
    return updateDispatcher.handleExecuteUpdate(updateName, input, eventId);
1✔
326
  }
327

328
  public void handleInterceptedValidateUpdate(WorkflowInboundCallsInterceptor.UpdateInput input) {
329
    updateDispatcher.handleInterceptedValidateUpdate(input);
1✔
330
  }
1✔
331

332
  public WorkflowInboundCallsInterceptor.UpdateOutput handleInterceptedExecuteUpdate(
333
      WorkflowInboundCallsInterceptor.UpdateInput input) {
334
    return updateDispatcher.handleInterceptedExecuteUpdate(input);
1✔
335
  }
336

337
  public WorkflowInboundCallsInterceptor.QueryOutput handleInterceptedQuery(
338
      WorkflowInboundCallsInterceptor.QueryInput input) {
339
    return queryDispatcher.handleInterceptedQuery(input);
1✔
340
  }
341

342
  public Optional<Payloads> handleQuery(String queryName, Optional<Payloads> input) {
343
    return queryDispatcher.handleQuery(queryName, input);
1✔
344
  }
345

346
  private class ActivityCallback {
1✔
347
    private final CompletablePromise<Optional<Payloads>> result = Workflow.newPromise();
1✔
348

349
    public void invoke(Optional<Payloads> output, Failure failure) {
350
      if (failure != null) {
1✔
351
        runner.executeInWorkflowThread(
1✔
352
            "activity failure callback",
353
            () -> result.completeExceptionally(new FailureWrapperException(failure)));
1✔
354
      } else {
355
        runner.executeInWorkflowThread(
1✔
356
            "activity completion callback", () -> result.complete(output));
1✔
357
      }
358
    }
1✔
359
  }
360

361
  private class LocalActivityCallbackImpl implements LocalActivityCallback {
1✔
362
    private final CompletablePromise<Optional<Payloads>> result = Workflow.newPromise();
1✔
363

364
    @Override
365
    public void apply(Optional<Payloads> successOutput, LocalActivityFailedException exception) {
366
      if (exception != null) {
1✔
367
        runner.executeInWorkflowThread(
1✔
368
            "local activity failure callback", () -> result.completeExceptionally(exception));
1✔
369
      } else {
370
        runner.executeInWorkflowThread(
1✔
371
            "local activity completion callback", () -> result.complete(successOutput));
1✔
372
      }
373
    }
1✔
374
  }
375

376
  @Override
377
  public <R> LocalActivityOutput<R> executeLocalActivity(LocalActivityInput<R> input) {
378
    ActivitySerializationContext serializationContext =
1✔
379
        new ActivitySerializationContext(
380
            replayContext.getNamespace(),
1✔
381
            replayContext.getWorkflowId(),
1✔
382
            replayContext.getWorkflowType().getName(),
1✔
383
            input.getActivityName(),
1✔
384
            replayContext.getTaskQueue(),
1✔
385
            true);
386
    DataConverter dataConverterWithActivityContext =
1✔
387
        dataConverter.withContext(serializationContext);
1✔
388
    Optional<Payloads> payloads = dataConverterWithActivityContext.toPayloads(input.getArgs());
1✔
389

390
    long originalScheduledTime = System.currentTimeMillis();
1✔
391
    CompletablePromise<Optional<Payloads>> serializedResult =
392
        WorkflowInternal.newCompletablePromise();
1✔
393
    executeLocalActivityOverLocalRetryThreshold(
1✔
394
        input.getActivityName(),
1✔
395
        input.getOptions(),
1✔
396
        input.getHeader(),
1✔
397
        payloads,
398
        originalScheduledTime,
399
        1,
400
        null,
401
        serializedResult);
402

403
    Promise<R> result =
1✔
404
        serializedResult.handle(
1✔
405
            (r, f) -> {
406
              if (f == null) {
1✔
407
                return input.getResultClass() != Void.TYPE
1✔
408
                    ? dataConverterWithActivityContext.fromPayloads(
1✔
409
                        0, r, input.getResultClass(), input.getResultType())
1✔
410
                    : null;
1✔
411
              } else {
412
                throw dataConverterWithActivityContext.failureToException(
1✔
413
                    ((LocalActivityCallback.LocalActivityFailedException) f).getFailure());
1✔
414
              }
415
            });
416

417
    return new LocalActivityOutput<>(result);
1✔
418
  }
419

420
  public void executeLocalActivityOverLocalRetryThreshold(
421
      String activityTypeName,
422
      LocalActivityOptions options,
423
      Header header,
424
      Optional<Payloads> input,
425
      long originalScheduledTime,
426
      int attempt,
427
      @Nullable Failure previousExecutionFailure,
428
      CompletablePromise<Optional<Payloads>> result) {
429
    CompletablePromise<Optional<Payloads>> localExecutionResult =
1✔
430
        executeLocalActivityLocally(
1✔
431
            activityTypeName,
432
            options,
433
            header,
434
            input,
435
            originalScheduledTime,
436
            attempt,
437
            previousExecutionFailure);
438

439
    localExecutionResult.handle(
1✔
440
        (r, e) -> {
441
          if (e == null) {
1✔
442
            result.complete(r);
1✔
443
          } else {
444
            if ((e instanceof LocalActivityCallback.LocalActivityFailedException)) {
1✔
445
              LocalActivityCallback.LocalActivityFailedException laException =
1✔
446
                  (LocalActivityCallback.LocalActivityFailedException) e;
447
              @Nullable Duration backoff = laException.getBackoff();
1✔
448
              if (backoff != null) {
1✔
449
                WorkflowInternal.newTimer(backoff)
1✔
450
                    .thenApply(
1✔
451
                        unused -> {
452
                          executeLocalActivityOverLocalRetryThreshold(
1✔
453
                              activityTypeName,
454
                              options,
455
                              header,
456
                              input,
457
                              originalScheduledTime,
458
                              laException.getLastAttempt() + 1,
1✔
459
                              laException.getFailure(),
1✔
460
                              result);
461
                          return null;
1✔
462
                        });
463
              } else {
464
                // final failure, report back
465
                result.completeExceptionally(laException);
1✔
466
              }
467
            } else {
1✔
468
              // Only LocalActivityFailedException is expected
469
              String exceptionMessage =
×
470
                  String.format(
×
471
                      "[BUG] Local Activity State Machine callback for activityType %s returned unexpected exception",
472
                      activityTypeName);
473
              log.warn(exceptionMessage, e);
×
474
              replayContext.failWorkflowTask(new IllegalStateException(exceptionMessage, e));
×
475
            }
476
          }
477
          return null;
1✔
478
        });
479
  }
1✔
480

481
  private CompletablePromise<Optional<Payloads>> executeLocalActivityLocally(
482
      String activityTypeName,
483
      LocalActivityOptions options,
484
      Header header,
485
      Optional<Payloads> input,
486
      long originalScheduledTime,
487
      int attempt,
488
      @Nullable Failure previousExecutionFailure) {
489

490
    LocalActivityCallbackImpl callback = new LocalActivityCallbackImpl();
1✔
491
    ExecuteLocalActivityParameters params =
1✔
492
        constructExecuteLocalActivityParameters(
1✔
493
            activityTypeName,
494
            options,
495
            header,
496
            input,
497
            attempt,
498
            originalScheduledTime,
499
            previousExecutionFailure);
500
    Functions.Proc cancellationCallback = replayContext.scheduleLocalActivityTask(params, callback);
1✔
501
    CancellationScope.current()
1✔
502
        .getCancellationRequest()
1✔
503
        .thenApply(
1✔
504
            (reason) -> {
505
              cancellationCallback.apply();
×
506
              return null;
×
507
            });
508
    return callback.result;
1✔
509
  }
510

511
  private ExecuteActivityParameters constructExecuteActivityParameters(
512
      String name, ActivityOptions options, Header header, Optional<Payloads> input) {
513
    String taskQueue = options.getTaskQueue();
1✔
514
    if (taskQueue == null) {
1✔
515
      taskQueue = replayContext.getTaskQueue();
1✔
516
    }
517
    ScheduleActivityTaskCommandAttributes.Builder attributes =
518
        ScheduleActivityTaskCommandAttributes.newBuilder()
1✔
519
            .setActivityType(ActivityType.newBuilder().setName(name))
1✔
520
            .setTaskQueue(TaskQueue.newBuilder().setName(taskQueue))
1✔
521
            .setScheduleToStartTimeout(
1✔
522
                ProtobufTimeUtils.toProtoDuration(options.getScheduleToStartTimeout()))
1✔
523
            .setStartToCloseTimeout(
1✔
524
                ProtobufTimeUtils.toProtoDuration(options.getStartToCloseTimeout()))
1✔
525
            .setScheduleToCloseTimeout(
1✔
526
                ProtobufTimeUtils.toProtoDuration(options.getScheduleToCloseTimeout()))
1✔
527
            .setHeartbeatTimeout(ProtobufTimeUtils.toProtoDuration(options.getHeartbeatTimeout()))
1✔
528
            .setRequestEagerExecution(
1✔
529
                !options.isEagerExecutionDisabled()
1✔
530
                    && Objects.equals(taskQueue, replayContext.getTaskQueue()));
1✔
531

532
    input.ifPresent(attributes::setInput);
1✔
533
    RetryOptions retryOptions = options.getRetryOptions();
1✔
534
    if (retryOptions != null) {
1✔
535
      attributes.setRetryPolicy(toRetryPolicy(retryOptions));
1✔
536
    }
537

538
    // Set the context value.  Use the context propagators from the ActivityOptions
539
    // if present, otherwise use the ones configured on the WorkflowContext
540
    List<ContextPropagator> propagators = options.getContextPropagators();
1✔
541
    if (propagators == null) {
1✔
542
      propagators = this.contextPropagators;
1✔
543
    }
544
    io.temporal.api.common.v1.Header grpcHeader =
1✔
545
        toHeaderGrpc(header, extractContextsAndConvertToBytes(propagators));
1✔
546
    attributes.setHeader(grpcHeader);
1✔
547

548
    if (options.getVersioningIntent() != null) {
1✔
549
      attributes.setUseCompatibleVersion(
1✔
550
          options
551
              .getVersioningIntent()
1✔
552
              .determineUseCompatibleFlag(
1✔
553
                  replayContext.getTaskQueue().equals(options.getTaskQueue())));
1✔
554
    }
555

556
    return new ExecuteActivityParameters(attributes, options.getCancellationType());
1✔
557
  }
558

559
  private ExecuteLocalActivityParameters constructExecuteLocalActivityParameters(
560
      String name,
561
      LocalActivityOptions options,
562
      Header header,
563
      Optional<Payloads> input,
564
      int attempt,
565
      long originalScheduledTime,
566
      @Nullable Failure previousExecutionFailure) {
567
    options = LocalActivityOptions.newBuilder(options).validateAndBuildWithDefaults();
1✔
568

569
    PollActivityTaskQueueResponse.Builder activityTask =
570
        PollActivityTaskQueueResponse.newBuilder()
1✔
571
            .setActivityId(this.replayContext.randomUUID().toString())
1✔
572
            .setWorkflowNamespace(this.replayContext.getNamespace())
1✔
573
            .setWorkflowType(this.replayContext.getWorkflowType())
1✔
574
            .setWorkflowExecution(this.replayContext.getWorkflowExecution())
1✔
575
            // used to pass scheduled time to the local activity code inside
576
            // ActivityExecutionContext#getInfo
577
            // setCurrentAttemptScheduledTime is called inside LocalActivityWorker before submitting
578
            // into the LA queue
579
            .setScheduledTime(
1✔
580
                ProtobufTimeUtils.toProtoTimestamp(Instant.ofEpochMilli(originalScheduledTime)))
1✔
581
            .setActivityType(ActivityType.newBuilder().setName(name))
1✔
582
            .setAttempt(attempt);
1✔
583

584
    Duration scheduleToCloseTimeout = options.getScheduleToCloseTimeout();
1✔
585
    if (scheduleToCloseTimeout != null) {
1✔
586
      activityTask.setScheduleToCloseTimeout(
1✔
587
          ProtobufTimeUtils.toProtoDuration(scheduleToCloseTimeout));
1✔
588
    }
589

590
    Duration startToCloseTimeout = options.getStartToCloseTimeout();
1✔
591
    if (startToCloseTimeout != null) {
1✔
592
      activityTask.setStartToCloseTimeout(ProtobufTimeUtils.toProtoDuration(startToCloseTimeout));
1✔
593
    }
594

595
    io.temporal.api.common.v1.Header grpcHeader =
1✔
596
        toHeaderGrpc(header, extractContextsAndConvertToBytes(contextPropagators));
1✔
597
    activityTask.setHeader(grpcHeader);
1✔
598
    input.ifPresent(activityTask::setInput);
1✔
599
    RetryOptions retryOptions = options.getRetryOptions();
1✔
600
    activityTask.setRetryPolicy(
1✔
601
        toRetryPolicy(RetryOptions.newBuilder(retryOptions).validateBuildWithDefaults()));
1✔
602
    Duration localRetryThreshold = options.getLocalRetryThreshold();
1✔
603
    if (localRetryThreshold == null) {
1✔
604
      localRetryThreshold = replayContext.getWorkflowTaskTimeout().multipliedBy(3);
1✔
605
    }
606

607
    return new ExecuteLocalActivityParameters(
1✔
608
        activityTask,
609
        options.getScheduleToStartTimeout(),
1✔
610
        originalScheduledTime,
611
        previousExecutionFailure,
612
        options.isDoNotIncludeArgumentsIntoMarker(),
1✔
613
        localRetryThreshold);
614
  }
615

616
  @Override
617
  public <R> ChildWorkflowOutput<R> executeChildWorkflow(ChildWorkflowInput<R> input) {
618
    if (CancellationScope.current().isCancelRequested()) {
1✔
619
      CanceledFailure canceledFailure = new CanceledFailure("execute called from a canceled scope");
×
620
      return new ChildWorkflowOutput<>(
×
621
          Workflow.newFailedPromise(canceledFailure), Workflow.newFailedPromise(canceledFailure));
×
622
    }
623

624
    CompletablePromise<WorkflowExecution> executionPromise = Workflow.newPromise();
1✔
625
    CompletablePromise<Optional<Payloads>> resultPromise = Workflow.newPromise();
1✔
626

627
    DataConverter dataConverterWithChildWorkflowContext =
1✔
628
        dataConverter.withContext(
1✔
629
            new WorkflowSerializationContext(replayContext.getNamespace(), input.getWorkflowId()));
1✔
630
    Optional<Payloads> payloads = dataConverterWithChildWorkflowContext.toPayloads(input.getArgs());
1✔
631

632
    @Nullable
633
    Memo memo =
634
        (input.getOptions().getMemo() != null)
1✔
635
            ? Memo.newBuilder()
1✔
636
                .putAllFields(
1✔
637
                    intoPayloadMap(
1✔
638
                        dataConverterWithChildWorkflowContext, input.getOptions().getMemo()))
1✔
639
                .build()
1✔
640
            : null;
1✔
641

642
    StartChildWorkflowExecutionParameters parameters =
1✔
643
        createChildWorkflowParameters(
1✔
644
            input.getWorkflowId(),
1✔
645
            input.getWorkflowType(),
1✔
646
            input.getOptions(),
1✔
647
            input.getHeader(),
1✔
648
            payloads,
649
            memo);
650

651
    Functions.Proc1<Exception> cancellationCallback =
1✔
652
        replayContext.startChildWorkflow(
1✔
653
            parameters,
654
            (execution, failure) -> {
655
              if (failure != null) {
1✔
656
                runner.executeInWorkflowThread(
1✔
657
                    "child workflow start failed callback",
658
                    () ->
659
                        executionPromise.completeExceptionally(
1✔
660
                            mapChildWorkflowException(
1✔
661
                                failure, dataConverterWithChildWorkflowContext)));
662
              } else {
663
                runner.executeInWorkflowThread(
1✔
664
                    "child workflow started callback", () -> executionPromise.complete(execution));
1✔
665
              }
666
            },
1✔
667
            (result, failure) -> {
668
              if (failure != null) {
1✔
669
                runner.executeInWorkflowThread(
1✔
670
                    "child workflow failure callback",
671
                    () ->
672
                        resultPromise.completeExceptionally(
1✔
673
                            mapChildWorkflowException(
1✔
674
                                failure, dataConverterWithChildWorkflowContext)));
675
              } else {
676
                runner.executeInWorkflowThread(
1✔
677
                    "child workflow completion callback", () -> resultPromise.complete(result));
1✔
678
              }
679
            });
1✔
680
    AtomicBoolean callbackCalled = new AtomicBoolean();
1✔
681
    CancellationScope.current()
1✔
682
        .getCancellationRequest()
1✔
683
        .thenApply(
1✔
684
            (reason) -> {
685
              if (!callbackCalled.getAndSet(true)) {
1✔
686
                cancellationCallback.apply(new CanceledFailure(reason));
1✔
687
              }
688
              return null;
1✔
689
            });
690

691
    Promise<R> result =
1✔
692
        resultPromise.thenApply(
1✔
693
            (b) ->
694
                dataConverterWithChildWorkflowContext.fromPayloads(
1✔
695
                    0, b, input.getResultClass(), input.getResultType()));
1✔
696
    return new ChildWorkflowOutput<>(result, executionPromise);
1✔
697
  }
698

699
  @SuppressWarnings("deprecation")
700
  private StartChildWorkflowExecutionParameters createChildWorkflowParameters(
701
      String workflowId,
702
      String name,
703
      ChildWorkflowOptions options,
704
      Header header,
705
      Optional<Payloads> input,
706
      @Nullable Memo memo) {
707
    final StartChildWorkflowExecutionCommandAttributes.Builder attributes =
708
        StartChildWorkflowExecutionCommandAttributes.newBuilder()
1✔
709
            .setWorkflowType(WorkflowType.newBuilder().setName(name).build());
1✔
710
    attributes.setWorkflowId(workflowId);
1✔
711
    attributes.setNamespace(OptionsUtils.safeGet(options.getNamespace()));
1✔
712
    input.ifPresent(attributes::setInput);
1✔
713
    attributes.setWorkflowRunTimeout(
1✔
714
        ProtobufTimeUtils.toProtoDuration(options.getWorkflowRunTimeout()));
1✔
715
    attributes.setWorkflowExecutionTimeout(
1✔
716
        ProtobufTimeUtils.toProtoDuration(options.getWorkflowExecutionTimeout()));
1✔
717
    attributes.setWorkflowTaskTimeout(
1✔
718
        ProtobufTimeUtils.toProtoDuration(options.getWorkflowTaskTimeout()));
1✔
719
    String taskQueue = options.getTaskQueue();
1✔
720
    if (taskQueue != null) {
1✔
721
      attributes.setTaskQueue(TaskQueue.newBuilder().setName(taskQueue));
1✔
722
    }
723
    if (options.getWorkflowIdReusePolicy() != null) {
1✔
724
      attributes.setWorkflowIdReusePolicy(options.getWorkflowIdReusePolicy());
1✔
725
    }
726
    RetryOptions retryOptions = options.getRetryOptions();
1✔
727
    if (retryOptions != null) {
1✔
728
      attributes.setRetryPolicy(toRetryPolicy(retryOptions));
1✔
729
    }
730
    attributes.setCronSchedule(OptionsUtils.safeGet(options.getCronSchedule()));
1✔
731

732
    if (memo != null) {
1✔
733
      attributes.setMemo(memo);
1✔
734
    }
735

736
    Map<String, Object> searchAttributes = options.getSearchAttributes();
1✔
737
    if (searchAttributes != null && !searchAttributes.isEmpty()) {
1✔
738
      if (options.getTypedSearchAttributes() != null) {
1✔
739
        throw new IllegalArgumentException(
×
740
            "Cannot have both typed search attributes and search attributes");
741
      }
742
      attributes.setSearchAttributes(SearchAttributesUtil.encode(searchAttributes));
1✔
743
    } else if (options.getTypedSearchAttributes() != null) {
1✔
744
      attributes.setSearchAttributes(
1✔
745
          SearchAttributesUtil.encodeTyped(options.getTypedSearchAttributes()));
1✔
746
    }
747

748
    List<ContextPropagator> propagators = options.getContextPropagators();
1✔
749
    if (propagators == null) {
1✔
750
      propagators = this.contextPropagators;
1✔
751
    }
752
    io.temporal.api.common.v1.Header grpcHeader =
1✔
753
        toHeaderGrpc(header, extractContextsAndConvertToBytes(propagators));
1✔
754
    attributes.setHeader(grpcHeader);
1✔
755

756
    ParentClosePolicy parentClosePolicy = options.getParentClosePolicy();
1✔
757
    if (parentClosePolicy != null) {
1✔
758
      attributes.setParentClosePolicy(parentClosePolicy);
1✔
759
    }
760

761
    if (options.getVersioningIntent() != null) {
1✔
762
      attributes.setUseCompatibleVersion(
1✔
763
          options
764
              .getVersioningIntent()
1✔
765
              .determineUseCompatibleFlag(
1✔
766
                  replayContext.getTaskQueue().equals(options.getTaskQueue())));
1✔
767
    }
768
    return new StartChildWorkflowExecutionParameters(attributes, options.getCancellationType());
1✔
769
  }
770

771
  private static Header extractContextsAndConvertToBytes(
772
      List<ContextPropagator> contextPropagators) {
773
    if (contextPropagators == null) {
1✔
774
      return null;
×
775
    }
776
    Map<String, Payload> result = new HashMap<>();
1✔
777
    for (ContextPropagator propagator : contextPropagators) {
1✔
778
      result.putAll(propagator.serializeContext(propagator.getCurrentContext()));
1✔
779
    }
1✔
780
    return new Header(result);
1✔
781
  }
782

783
  private static RuntimeException mapChildWorkflowException(
784
      Exception failure, DataConverter dataConverterWithChildWorkflowContext) {
785
    if (failure == null) {
1✔
786
      return null;
×
787
    }
788
    if (failure instanceof TemporalFailure) {
1✔
789
      ((TemporalFailure) failure).setDataConverter(dataConverterWithChildWorkflowContext);
1✔
790
    }
791
    if (failure instanceof CanceledFailure) {
1✔
792
      return (CanceledFailure) failure;
1✔
793
    }
794
    if (failure instanceof WorkflowException) {
1✔
795
      return (RuntimeException) failure;
×
796
    }
797
    if (failure instanceof ChildWorkflowFailure) {
1✔
798
      return (ChildWorkflowFailure) failure;
1✔
799
    }
800
    if (!(failure instanceof ChildWorkflowTaskFailedException)) {
1✔
801
      return new IllegalArgumentException("Unexpected exception type: ", failure);
×
802
    }
803
    ChildWorkflowTaskFailedException taskFailed = (ChildWorkflowTaskFailedException) failure;
1✔
804
    Throwable cause =
1✔
805
        dataConverterWithChildWorkflowContext.failureToException(
1✔
806
            taskFailed.getOriginalCauseFailure());
1✔
807
    ChildWorkflowFailure exception = taskFailed.getException();
1✔
808
    return new ChildWorkflowFailure(
1✔
809
        exception.getInitiatedEventId(),
1✔
810
        exception.getStartedEventId(),
1✔
811
        exception.getWorkflowType(),
1✔
812
        exception.getExecution(),
1✔
813
        exception.getNamespace(),
1✔
814
        exception.getRetryState(),
1✔
815
        cause);
816
  }
817

818
  @Override
819
  public Promise<Void> newTimer(Duration delay) {
820
    CompletablePromise<Void> p = Workflow.newPromise();
1✔
821
    Functions.Proc1<RuntimeException> cancellationHandler =
1✔
822
        replayContext.newTimer(
1✔
823
            delay,
824
            (e) ->
825
                runner.executeInWorkflowThread(
1✔
826
                    "timer-callback",
827
                    () -> {
828
                      if (e == null) {
1✔
829
                        p.complete(null);
1✔
830
                      } else {
831
                        p.completeExceptionally(e);
1✔
832
                      }
833
                    }));
1✔
834
    CancellationScope.current()
1✔
835
        .getCancellationRequest()
1✔
836
        .thenApply(
1✔
837
            (r) -> {
838
              cancellationHandler.apply(new CanceledFailure(r));
1✔
839
              return r;
1✔
840
            });
841
    return p;
1✔
842
  }
843

844
  @Override
845
  public <R> R sideEffect(Class<R> resultClass, Type resultType, Func<R> func) {
846
    try {
847
      CompletablePromise<Optional<Payloads>> result = Workflow.newPromise();
1✔
848
      replayContext.sideEffect(
1✔
849
          () -> {
850
            R r = func.apply();
1✔
851
            return dataConverterWithCurrentWorkflowContext.toPayloads(r);
1✔
852
          },
853
          (p) ->
854
              runner.executeInWorkflowThread(
1✔
855
                  "side-effect-callback", () -> result.complete(Objects.requireNonNull(p))));
1✔
856
      return dataConverterWithCurrentWorkflowContext.fromPayloads(
1✔
857
          0, result.get(), resultClass, resultType);
1✔
858
    } catch (Exception e) {
×
859
      // SideEffect cannot throw normal exception as it can lead to non-deterministic behavior. So
860
      // fail the workflow task by throwing an Error.
861
      throw new Error(e);
×
862
    }
863
  }
864

865
  @Override
866
  public <R> R mutableSideEffect(
867
      String id, Class<R> resultClass, Type resultType, BiPredicate<R, R> updated, Func<R> func) {
868
    try {
869
      return mutableSideEffectImpl(id, resultClass, resultType, updated, func);
1✔
870
    } catch (Exception e) {
×
871
      // MutableSideEffect cannot throw normal exception as it can lead to non-deterministic
872
      // behavior. So fail the workflow task by throwing an Error.
873
      throw new Error(e);
×
874
    }
875
  }
876

877
  private <R> R mutableSideEffectImpl(
878
      String id, Class<R> resultClass, Type resultType, BiPredicate<R, R> updated, Func<R> func) {
879
    CompletablePromise<Optional<Payloads>> result = Workflow.newPromise();
1✔
880
    AtomicReference<R> unserializedResult = new AtomicReference<>();
1✔
881
    replayContext.mutableSideEffect(
1✔
882
        id,
883
        (storedBinary) -> {
884
          Optional<R> stored =
1✔
885
              storedBinary.map(
1✔
886
                  (b) ->
887
                      dataConverterWithCurrentWorkflowContext.fromPayloads(
1✔
888
                          0, Optional.of(b), resultClass, resultType));
1✔
889
          R funcResult =
1✔
890
              Objects.requireNonNull(func.apply(), "mutableSideEffect function " + "returned null");
1✔
891
          if (!stored.isPresent() || updated.test(stored.get(), funcResult)) {
1✔
892
            unserializedResult.set(funcResult);
1✔
893
            return dataConverterWithCurrentWorkflowContext.toPayloads(funcResult);
1✔
894
          }
895
          return Optional.empty(); // returned only when value doesn't need to be updated
1✔
896
        },
897
        (p) ->
898
            runner.executeInWorkflowThread(
1✔
899
                "mutable-side-effect-callback", () -> result.complete(Objects.requireNonNull(p))));
1✔
900

901
    if (!result.get().isPresent()) {
1✔
902
      throw new IllegalArgumentException("No value found for mutableSideEffectId=" + id);
×
903
    }
904
    // An optimization that avoids unnecessary deserialization of the result.
905
    R unserialized = unserializedResult.get();
1✔
906
    if (unserialized != null) {
1✔
907
      return unserialized;
1✔
908
    }
909
    return dataConverterWithCurrentWorkflowContext.fromPayloads(
1✔
910
        0, result.get(), resultClass, resultType);
1✔
911
  }
912

913
  @Override
914
  public int getVersion(String changeId, int minSupported, int maxSupported) {
915
    CompletablePromise<Integer> result = Workflow.newPromise();
1✔
916
    replayContext.getVersion(
1✔
917
        changeId,
918
        minSupported,
919
        maxSupported,
920
        (v, e) ->
921
            runner.executeInWorkflowThread(
1✔
922
                "version-callback",
923
                () -> {
924
                  if (v != null) {
1✔
925
                    result.complete(v);
1✔
926
                  } else {
927
                    result.completeExceptionally(e);
1✔
928
                  }
929
                }));
1✔
930
    try {
931
      return result.get();
1✔
932
    } catch (UnsupportedVersion.UnsupportedVersionException ex) {
1✔
933
      throw new UnsupportedVersion(ex);
1✔
934
    }
935
  }
936

937
  @Override
938
  public void registerQuery(RegisterQueryInput request) {
939
    queryDispatcher.registerQueryHandlers(request);
1✔
940
  }
1✔
941

942
  @Override
943
  public void registerSignalHandlers(RegisterSignalHandlersInput input) {
944
    signalDispatcher.registerSignalHandlers(input);
1✔
945
  }
1✔
946

947
  @Override
948
  public void registerUpdateHandlers(RegisterUpdateHandlersInput input) {
949
    updateDispatcher.registerUpdateHandlers(input);
1✔
950
  }
1✔
951

952
  @Override
953
  public void registerDynamicSignalHandler(RegisterDynamicSignalHandlerInput input) {
954
    signalDispatcher.registerDynamicSignalHandler(input);
1✔
955
  }
1✔
956

957
  @Override
958
  public void registerDynamicQueryHandler(RegisterDynamicQueryHandlerInput input) {
959
    queryDispatcher.registerDynamicQueryHandler(input);
1✔
960
  }
1✔
961

962
  @Override
963
  public void registerDynamicUpdateHandler(RegisterDynamicUpdateHandlerInput input) {
964
    updateDispatcher.registerDynamicUpdateHandler(input);
×
965
  }
×
966

967
  @Override
968
  public UUID randomUUID() {
969
    return replayContext.randomUUID();
1✔
970
  }
971

972
  @Override
973
  public Random newRandom() {
974
    return replayContext.newRandom();
1✔
975
  }
976

977
  public DataConverter getDataConverter() {
978
    return dataConverter;
1✔
979
  }
980

981
  boolean isReplaying() {
982
    return replayContext.isReplaying();
1✔
983
  }
984

985
  @Override
986
  public ReplayWorkflowContext getReplayContext() {
987
    return replayContext;
1✔
988
  }
989

990
  @Override
991
  public SignalExternalOutput signalExternalWorkflow(SignalExternalInput input) {
992
    WorkflowExecution childExecution = input.getExecution();
1✔
993
    DataConverter dataConverterWithChildWorkflowContext =
1✔
994
        dataConverter.withContext(
1✔
995
            new WorkflowSerializationContext(
996
                replayContext.getNamespace(), childExecution.getWorkflowId()));
1✔
997
    SignalExternalWorkflowExecutionCommandAttributes.Builder attributes =
998
        SignalExternalWorkflowExecutionCommandAttributes.newBuilder();
1✔
999
    attributes.setSignalName(input.getSignalName());
1✔
1000
    attributes.setExecution(childExecution);
1✔
1001
    Optional<Payloads> payloads = dataConverterWithChildWorkflowContext.toPayloads(input.getArgs());
1✔
1002
    payloads.ifPresent(attributes::setInput);
1✔
1003
    CompletablePromise<Void> result = Workflow.newPromise();
1✔
1004
    Functions.Proc1<Exception> cancellationCallback =
1✔
1005
        replayContext.signalExternalWorkflowExecution(
1✔
1006
            attributes,
1007
            (output, failure) -> {
1008
              if (failure != null) {
1✔
1009
                runner.executeInWorkflowThread(
1✔
1010
                    "child workflow failure callback",
1011
                    () ->
1012
                        result.completeExceptionally(
1✔
1013
                            dataConverterWithChildWorkflowContext.failureToException(failure)));
1✔
1014
              } else {
1015
                runner.executeInWorkflowThread(
1✔
1016
                    "child workflow completion callback", () -> result.complete(output));
1✔
1017
              }
1018
            });
1✔
1019
    CancellationScope.current()
1✔
1020
        .getCancellationRequest()
1✔
1021
        .thenApply(
1✔
1022
            (reason) -> {
1023
              cancellationCallback.apply(new CanceledFailure(reason));
1✔
1024
              return null;
1✔
1025
            });
1026
    return new SignalExternalOutput(result);
1✔
1027
  }
1028

1029
  @Override
1030
  public void sleep(Duration duration) {
1031
    newTimer(duration).get();
1✔
1032
  }
1✔
1033

1034
  @Override
1035
  public boolean await(Duration timeout, String reason, Supplier<Boolean> unblockCondition) {
1036
    Promise<Void> timer = newTimer(timeout);
1✔
1037
    WorkflowThread.await(reason, () -> (timer.isCompleted() || unblockCondition.get()));
1✔
1038
    return !timer.isCompleted();
1✔
1039
  }
1040

1041
  @Override
1042
  public void await(String reason, Supplier<Boolean> unblockCondition) {
1043
    WorkflowThread.await(reason, unblockCondition);
1✔
1044
  }
1✔
1045

1046
  @SuppressWarnings("deprecation")
1047
  @Override
1048
  public void continueAsNew(ContinueAsNewInput input) {
1049
    ContinueAsNewWorkflowExecutionCommandAttributes.Builder attributes =
1050
        ContinueAsNewWorkflowExecutionCommandAttributes.newBuilder();
1✔
1051
    String workflowType = input.getWorkflowType();
1✔
1052
    if (workflowType != null) {
1✔
1053
      attributes.setWorkflowType(WorkflowType.newBuilder().setName(workflowType));
1✔
1054
    }
1055
    @Nullable ContinueAsNewOptions options = input.getOptions();
1✔
1056
    if (options != null) {
1✔
1057
      if (options.getWorkflowRunTimeout() != null) {
1✔
1058
        attributes.setWorkflowRunTimeout(
×
1059
            ProtobufTimeUtils.toProtoDuration(options.getWorkflowRunTimeout()));
×
1060
      }
1061
      if (options.getWorkflowTaskTimeout() != null) {
1✔
1062
        attributes.setWorkflowTaskTimeout(
×
1063
            ProtobufTimeUtils.toProtoDuration(options.getWorkflowTaskTimeout()));
×
1064
      }
1065
      if (options.getTaskQueue() != null && !options.getTaskQueue().isEmpty()) {
1✔
1066
        attributes.setTaskQueue(TaskQueue.newBuilder().setName(options.getTaskQueue()));
1✔
1067
      }
1068
      Map<String, Object> searchAttributes = options.getSearchAttributes();
1✔
1069
      if (searchAttributes != null && !searchAttributes.isEmpty()) {
1✔
1070
        if (options.getTypedSearchAttributes() != null) {
×
1071
          throw new IllegalArgumentException(
×
1072
              "Cannot have typed search attributes and search attributes");
1073
        }
1074
        attributes.setSearchAttributes(SearchAttributesUtil.encode(searchAttributes));
×
1075
      } else if (options.getTypedSearchAttributes() != null) {
1✔
1076
        attributes.setSearchAttributes(
1✔
1077
            SearchAttributesUtil.encodeTyped(options.getTypedSearchAttributes()));
1✔
1078
      }
1079
      Map<String, Object> memo = options.getMemo();
1✔
1080
      if (memo != null) {
1✔
1081
        attributes.setMemo(
1✔
1082
            Memo.newBuilder()
1✔
1083
                .putAllFields(intoPayloadMap(dataConverterWithCurrentWorkflowContext, memo)));
1✔
1084
      }
1085
      if (options.getVersioningIntent() != null) {
1✔
1086
        attributes.setUseCompatibleVersion(
×
1087
            options
1088
                .getVersioningIntent()
×
1089
                .determineUseCompatibleFlag(
×
1090
                    replayContext.getTaskQueue().equals(options.getTaskQueue())));
×
1091
      }
1092
    }
1093

1094
    List<ContextPropagator> propagators =
1095
        options != null && options.getContextPropagators() != null
1✔
1096
            ? options.getContextPropagators()
×
1097
            : this.contextPropagators;
1✔
1098
    io.temporal.api.common.v1.Header grpcHeader =
1✔
1099
        toHeaderGrpc(input.getHeader(), extractContextsAndConvertToBytes(propagators));
1✔
1100
    attributes.setHeader(grpcHeader);
1✔
1101

1102
    Optional<Payloads> payloads =
1✔
1103
        dataConverterWithCurrentWorkflowContext.toPayloads(input.getArgs());
1✔
1104
    payloads.ifPresent(attributes::setInput);
1✔
1105

1106
    replayContext.continueAsNewOnCompletion(attributes.build());
1✔
1107
    WorkflowThread.exit();
×
1108
  }
×
1109

1110
  @Override
1111
  public CancelWorkflowOutput cancelWorkflow(CancelWorkflowInput input) {
1112
    CompletablePromise<Void> result = Workflow.newPromise();
×
1113
    replayContext.requestCancelExternalWorkflowExecution(
×
1114
        input.getExecution(),
×
1115
        (r, exception) -> {
1116
          if (exception == null) {
×
1117
            result.complete(null);
×
1118
          } else {
1119
            result.completeExceptionally(exception);
×
1120
          }
1121
        });
×
1122
    return new CancelWorkflowOutput(result);
×
1123
  }
1124

1125
  public Scope getMetricsScope() {
1126
    return replayContext.getMetricsScope();
1✔
1127
  }
1128

1129
  public boolean isLoggingEnabledInReplay() {
1130
    return replayContext.getEnableLoggingInReplay();
×
1131
  }
1132

1133
  @Override
1134
  public void upsertSearchAttributes(Map<String, ?> searchAttributes) {
1135
    Preconditions.checkArgument(searchAttributes != null, "null search attributes");
1✔
1136
    Preconditions.checkArgument(!searchAttributes.isEmpty(), "empty search attributes");
1✔
1137
    SearchAttributes attr = SearchAttributesUtil.encode(searchAttributes);
1✔
1138
    replayContext.upsertSearchAttributes(attr);
1✔
1139
  }
1✔
1140

1141
  @Override
1142
  public void upsertTypedSearchAttributes(SearchAttributeUpdate<?>... searchAttributeUpdates) {
1143
    SearchAttributes attr = SearchAttributesUtil.encodeTypedUpdates(searchAttributeUpdates);
1✔
1144
    replayContext.upsertSearchAttributes(attr);
1✔
1145
  }
1✔
1146

1147
  @Nonnull
1148
  public Object newWorkflowMethodThreadIntercepted(Runnable runnable, @Nullable String name) {
1149
    return runner.newWorkflowThread(runnable, false, name);
1✔
1150
  }
1151

1152
  @Nonnull
1153
  public Object newWorkflowCallbackThreadIntercepted(Runnable runnable, @Nullable String name) {
1154
    return runner.newCallbackThread(runnable, name);
1✔
1155
  }
1156

1157
  @Override
1158
  public Object newChildThread(Runnable runnable, boolean detached, String name) {
1159
    return runner.newWorkflowThread(runnable, detached, name);
1✔
1160
  }
1161

1162
  @Override
1163
  public long currentTimeMillis() {
1164
    return replayContext.currentTimeMillis();
1✔
1165
  }
1166

1167
  /**
1168
   * This WorkflowInboundCallsInterceptor is used during creation of the initial root workflow
1169
   * thread and should be replaced with another specific implementation during initialization stage
1170
   * {@code workflow.initialize()} performed inside the workflow root thread.
1171
   *
1172
   * @see SyncWorkflow#start(HistoryEvent, ReplayWorkflowContext)
1173
   */
1174
  private static final class InitialWorkflowInboundCallsInterceptor
1175
      extends BaseRootWorkflowInboundCallsInterceptor {
1176

1177
    public InitialWorkflowInboundCallsInterceptor(SyncWorkflowContext workflowContext) {
1178
      super(workflowContext);
1✔
1179
    }
1✔
1180

1181
    @Override
1182
    public WorkflowOutput execute(WorkflowInput input) {
1183
      throw new UnsupportedOperationException(
×
1184
          "SyncWorkflowContext should be initialized with a non-initial WorkflowInboundCallsInterceptor "
1185
              + "before #execute can be called");
1186
    }
1187
  }
1188

1189
  @Nonnull
1190
  @Override
1191
  public WorkflowImplementationOptions getWorkflowImplementationOptions() {
1192
    return workflowImplementationOptions;
1✔
1193
  }
1194

1195
  @Override
1196
  public Failure mapWorkflowExceptionToFailure(Throwable failure) {
1197
    return dataConverterWithCurrentWorkflowContext.exceptionToFailure(failure);
1✔
1198
  }
1199

1200
  @Nullable
1201
  @Override
1202
  public <R> R getLastCompletionResult(Class<R> resultClass, Type resultType) {
1203
    return dataConverterWithCurrentWorkflowContext.fromPayloads(
1✔
1204
        0, Optional.ofNullable(replayContext.getLastCompletionResult()), resultClass, resultType);
1✔
1205
  }
1206

1207
  @Override
1208
  public List<ContextPropagator> getContextPropagators() {
1209
    return contextPropagators;
1✔
1210
  }
1211

1212
  @Override
1213
  public Map<String, Object> getPropagatedContexts() {
1214
    if (contextPropagators == null || contextPropagators.isEmpty()) {
1✔
1215
      return new HashMap<>();
1✔
1216
    }
1217

1218
    Map<String, Payload> headerData = new HashMap<>(replayContext.getHeader());
1✔
1219
    Map<String, Object> contextData = new HashMap<>();
1✔
1220
    for (ContextPropagator propagator : contextPropagators) {
1✔
1221
      contextData.put(propagator.getName(), propagator.deserializeContext(headerData));
1✔
1222
    }
1✔
1223

1224
    return contextData;
1✔
1225
  }
1226

1227
  /** Simple wrapper over a failure just to allow completing the CompletablePromise as a failure */
1228
  private static class FailureWrapperException extends RuntimeException {
1229
    private final Failure failure;
1230

1231
    public FailureWrapperException(Failure failure) {
1✔
1232
      this.failure = failure;
1✔
1233
    }
1✔
1234

1235
    public Failure getFailure() {
1236
      return failure;
1✔
1237
    }
1238
  }
1239
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc