• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #172

pending completion
#172

push

github-actions

web-flow
Update CODEOWNERS (#1773)

## What was changed
Update CODEOWNERS so that Security can own the Semgrep rules files and paths.

## Why?
We are adding Semgrep for static analysis to this repository, and only the security team should be able to approve exclusions from the policy.

## Checklist

How was this tested:
We ran this scanner on internal repos with this CODEOWNERS file and it worked as expected.

18029 of 22084 relevant lines covered (81.64%)

0.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.06
/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.testservice;
22

23
import static io.temporal.internal.testservice.StateMachines.Action.CANCEL;
24
import static io.temporal.internal.testservice.StateMachines.Action.COMPLETE;
25
import static io.temporal.internal.testservice.StateMachines.Action.CONTINUE_AS_NEW;
26
import static io.temporal.internal.testservice.StateMachines.Action.FAIL;
27
import static io.temporal.internal.testservice.StateMachines.Action.INITIATE;
28
import static io.temporal.internal.testservice.StateMachines.Action.QUERY;
29
import static io.temporal.internal.testservice.StateMachines.Action.REQUEST_CANCELLATION;
30
import static io.temporal.internal.testservice.StateMachines.Action.START;
31
import static io.temporal.internal.testservice.StateMachines.Action.TERMINATE;
32
import static io.temporal.internal.testservice.StateMachines.Action.TIME_OUT;
33
import static io.temporal.internal.testservice.StateMachines.Action.UPDATE;
34
import static io.temporal.internal.testservice.StateMachines.Action.UPDATE_WORKFLOW_EXECUTION;
35
import static io.temporal.internal.testservice.StateMachines.State.CANCELED;
36
import static io.temporal.internal.testservice.StateMachines.State.CANCELLATION_REQUESTED;
37
import static io.temporal.internal.testservice.StateMachines.State.COMPLETED;
38
import static io.temporal.internal.testservice.StateMachines.State.CONTINUED_AS_NEW;
39
import static io.temporal.internal.testservice.StateMachines.State.FAILED;
40
import static io.temporal.internal.testservice.StateMachines.State.INITIATED;
41
import static io.temporal.internal.testservice.StateMachines.State.NONE;
42
import static io.temporal.internal.testservice.StateMachines.State.STARTED;
43
import static io.temporal.internal.testservice.StateMachines.State.TERMINATED;
44
import static io.temporal.internal.testservice.StateMachines.State.TIMED_OUT;
45

46
import com.google.common.base.Preconditions;
47
import com.google.protobuf.Any;
48
import com.google.protobuf.Duration;
49
import com.google.protobuf.InvalidProtocolBufferException;
50
import com.google.protobuf.Timestamp;
51
import com.google.protobuf.util.Durations;
52
import com.google.protobuf.util.Timestamps;
53
import io.grpc.Status;
54
import io.grpc.StatusRuntimeException;
55
import io.temporal.api.command.v1.CancelTimerCommandAttributes;
56
import io.temporal.api.command.v1.CancelWorkflowExecutionCommandAttributes;
57
import io.temporal.api.command.v1.CompleteWorkflowExecutionCommandAttributes;
58
import io.temporal.api.command.v1.ContinueAsNewWorkflowExecutionCommandAttributes;
59
import io.temporal.api.command.v1.FailWorkflowExecutionCommandAttributes;
60
import io.temporal.api.command.v1.RequestCancelActivityTaskCommandAttributes;
61
import io.temporal.api.command.v1.RequestCancelExternalWorkflowExecutionCommandAttributes;
62
import io.temporal.api.command.v1.ScheduleActivityTaskCommandAttributes;
63
import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes;
64
import io.temporal.api.command.v1.StartChildWorkflowExecutionCommandAttributes;
65
import io.temporal.api.command.v1.StartTimerCommandAttributes;
66
import io.temporal.api.common.v1.Payloads;
67
import io.temporal.api.common.v1.RetryPolicy;
68
import io.temporal.api.common.v1.WorkflowExecution;
69
import io.temporal.api.enums.v1.CancelExternalWorkflowExecutionFailedCause;
70
import io.temporal.api.enums.v1.EventType;
71
import io.temporal.api.enums.v1.RetryState;
72
import io.temporal.api.enums.v1.SignalExternalWorkflowExecutionFailedCause;
73
import io.temporal.api.enums.v1.StartChildWorkflowExecutionFailedCause;
74
import io.temporal.api.enums.v1.TimeoutType;
75
import io.temporal.api.errordetails.v1.QueryFailedFailure;
76
import io.temporal.api.failure.v1.ApplicationFailureInfo;
77
import io.temporal.api.failure.v1.Failure;
78
import io.temporal.api.failure.v1.TimeoutFailureInfo;
79
import io.temporal.api.history.v1.*;
80
import io.temporal.api.protocol.v1.Message;
81
import io.temporal.api.query.v1.WorkflowQueryResult;
82
import io.temporal.api.taskqueue.v1.StickyExecutionAttributes;
83
import io.temporal.api.taskqueue.v1.TaskQueue;
84
import io.temporal.api.update.v1.*;
85
import io.temporal.api.workflowservice.v1.*;
86
import io.temporal.internal.common.ProtobufTimeUtils;
87
import io.temporal.internal.testservice.TestWorkflowMutableStateImpl.UpdateWorkflowExecution;
88
import io.temporal.internal.testservice.TestWorkflowStore.ActivityTask;
89
import io.temporal.internal.testservice.TestWorkflowStore.TaskQueueId;
90
import io.temporal.internal.testservice.TestWorkflowStore.WorkflowTask;
91
import io.temporal.serviceclient.StatusUtils;
92
import io.temporal.workflow.Functions;
93
import java.util.*;
94
import java.util.concurrent.CompletableFuture;
95
import java.util.concurrent.ForkJoinPool;
96
import javax.annotation.Nonnull;
97
import org.slf4j.Logger;
98
import org.slf4j.LoggerFactory;
99

100
class StateMachines {
101

102
  /** Class-wide SLF4J logger. */
  private static final Logger log = LoggerFactory.getLogger(StateMachines.class);
103

104
  /** Default workflow execution timeout in milliseconds: 10 * 365 days. */
  public static final long DEFAULT_WORKFLOW_EXECUTION_TIMEOUT_MILLISECONDS =
105
      10L * 365 * 24 * 3600 * 1000;
106
  /** Default workflow task timeout in milliseconds (10 seconds). */
  public static final long DEFAULT_WORKFLOW_TASK_TIMEOUT_MILLISECONDS = 10L * 1000;
107
  /** Upper bound for a workflow task timeout in milliseconds (60 seconds). */
  public static final long MAX_WORKFLOW_TASK_TIMEOUT_MILLISECONDS = 60L * 1000;
108
  /** Default initial interval of an activity retry policy (1 second). */
  static final Duration DEFAULT_ACTIVITY_RETRY_INITIAL_INTERVAL = Durations.fromSeconds(1);
109
  /** Default backoff coefficient of an activity retry policy. */
  static final double DEFAULT_ACTIVITY_RETRY_BACKOFF_COEFFICIENT = 2.0;
110
  /** Default maximum attempts of an activity retry policy; presumably 0 means "unlimited" - TODO confirm. */
  static final int DEFAULT_ACTIVITY_RETRY_MAXIMUM_ATTEMPTS = 0;
111
  /**
   * Default coefficient for the activity retry maximum interval; presumably a multiplier of the
   * initial interval - TODO confirm against the code that consumes it.
   */
  static final int DEFAULT_ACTIVITY_MAXIMUM_INTERVAL_COEFFICIENT = 100;
112
  /** Sentinel used as the initial value of event id fields that have not been assigned yet. */
  static final int NO_EVENT_ID = -1;
113

114
  /**
   * States shared by all state machines built by the factory methods of this class. Each machine
   * registers transitions for only the subset of states it needs; every machine starts in {@link
   * #NONE}.
   */
  enum State {
115
    NONE,
116
    INITIATED,
117
    STARTED,
118
    FAILED,
119
    TIMED_OUT,
120
    CANCELLATION_REQUESTED,
121
    CANCELED,
122
    COMPLETED,
123
    CONTINUED_AS_NEW,
124
    TERMINATED,
125
  }
126

127
  /**
   * Actions that trigger transitions in the state machines built by this class. {@link #UPDATE} is
   * wired to activity heartbeats in the activity machine, while {@link
   * #UPDATE_WORKFLOW_EXECUTION} and {@link #QUERY} are handled by the workflow task machine.
   */
  enum Action {
128
    INITIATE,
129
    START,
130
    FAIL,
131
    TIME_OUT,
132
    REQUEST_CANCELLATION,
133
    CANCEL,
134
    TERMINATE,
135
    UPDATE,
136
    COMPLETE,
137
    CONTINUE_AS_NEW,
138
    QUERY,
139
    UPDATE_WORKFLOW_EXECUTION,
140
  }
141

142
  static final class WorkflowData {
143
    Optional<TestServiceRetryState> retryState;
144
    Duration backoffStartInterval;
145
    String cronSchedule;
146
    Payloads lastCompletionResult;
147
    Optional<Failure> lastFailure;
148
    /**
149
     * @see WorkflowExecutionStartedEventAttributes#getFirstExecutionRunId()
150
     */
151
    final @Nonnull String firstExecutionRunId;
152
    /**
153
     * @see WorkflowExecutionStartedEventAttributes#getOriginalExecutionRunId()
154
     */
155
    final @Nonnull String originalExecutionRunId;
156

157
    /** RunId that was continued by this run as a result of Retry or Continue-As-New. */
158
    Optional<String> continuedExecutionRunId;
159

160
    Functions.Proc runTimerCancellationHandle;
161

162
    WorkflowData(
163
        Optional<TestServiceRetryState> retryState,
164
        Duration backoffStartInterval,
165
        String cronSchedule,
166
        Payloads lastCompletionResult,
167
        Optional<Failure> lastFailure,
168
        @Nonnull String firstExecutionRunId,
169
        @Nonnull String originalExecutionRunId,
170
        Optional<String> continuedExecutionRunId) {
171
      this.retryState = retryState;
172
      this.backoffStartInterval = backoffStartInterval;
173
      this.cronSchedule = cronSchedule;
174
      this.lastCompletionResult = lastCompletionResult;
175
      this.firstExecutionRunId =
176
          Preconditions.checkNotNull(firstExecutionRunId, "firstExecutionRunId");
177
      this.originalExecutionRunId =
178
          Preconditions.checkNotNull(originalExecutionRunId, "originalExecutionRunId");
179
      this.continuedExecutionRunId = continuedExecutionRunId;
180
      this.lastFailure = Objects.requireNonNull(lastFailure);
181
    }
182

183
    @Override
184
    public String toString() {
185
      return "WorkflowData{"
186
          + "retryState="
187
          + retryState
188
          + ", backoffStartInterval="
189
          + backoffStartInterval
190
          + ", cronSchedule='"
191
          + cronSchedule
192
          + '\''
193
          + ", lastCompletionResult="
194
          + lastCompletionResult
195
          + ", firstExecutionRunId='"
196
          + firstExecutionRunId
197
          + '\''
198
          + ", originalExecutionRunId='"
199
          + originalExecutionRunId
200
          + '\''
201
          + ", continuedExecutionRunId="
202
          + continuedExecutionRunId
203
          + '}';
204
    }
205
  }
206

207
  static final class WorkflowTaskData {
208

209
    final TestWorkflowStore store;
210

211
    boolean workflowCompleted;
212

213
    /** id of the last started event which completed successfully */
214
    long lastSuccessfulStartedEventId;
215

216
    final StartWorkflowExecutionRequest startRequest;
217

218
    long startedEventId = NO_EVENT_ID;
219

220
    PollWorkflowTaskQueueResponse.Builder workflowTask;
221

222
    /**
223
     * Events that are added during execution of a workflow task. They have to be buffered to be
224
     * added after the events generated by a workflow task. Without this the determinism will be
225
     * broken on replay.
226
     */
227
    final List<RequestContext> bufferedEvents = new ArrayList<>();
228

229
    /**
230
     * Update requests that are added during execution of a workflow task. They have to be buffered
231
     * to be added to the next workflow task.
232
     */
233
    final Map<String, UpdateWorkflowExecution> updateRequestBuffer = new LinkedHashMap<>();
234

235
    final Map<String, UpdateWorkflowExecution> updateRequest = new LinkedHashMap<>();
236

237
    long scheduledEventId = NO_EVENT_ID;
238

239
    int attempt = 0;
240

241
    /** Query requests received during workflow task processing (after start) */
242
    final Map<String, TestWorkflowMutableStateImpl.ConsistentQuery> queryBuffer = new HashMap<>();
243

244
    final Map<String, TestWorkflowMutableStateImpl.ConsistentQuery> consistentQueryRequests =
245
        new HashMap<>();
246

247
    WorkflowTaskData(TestWorkflowStore store, StartWorkflowExecutionRequest startRequest) {
248
      this.store = store;
249
      this.startRequest = startRequest;
250
    }
251

252
    void clear() {
253
      startedEventId = NO_EVENT_ID;
254
      workflowTask = null;
255
      scheduledEventId = NO_EVENT_ID;
256
      attempt = 0;
257
    }
258

259
    Optional<UpdateWorkflowExecution> getUpdateRequest(String protocolInstanceId) {
260
      return Optional.ofNullable(
261
          updateRequest.getOrDefault(
262
              protocolInstanceId, updateRequestBuffer.get(protocolInstanceId)));
263
    }
264

265
    @Override
266
    public String toString() {
267
      return "WorkflowTaskData{"
268
          + "store="
269
          + store
270
          + ", workflowCompleted="
271
          + workflowCompleted
272
          + ", lastSuccessfulStartedEventId="
273
          + lastSuccessfulStartedEventId
274
          + ", startRequest="
275
          + startRequest
276
          + ", startedEventId="
277
          + startedEventId
278
          + ", workflowTask="
279
          + workflowTask
280
          + ", bufferedEvents="
281
          + bufferedEvents
282
          + ", scheduledEventId="
283
          + scheduledEventId
284
          + ", attempt="
285
          + attempt
286
          + ", queryBuffer="
287
          + queryBuffer
288
          + ", consistentQueryRequests="
289
          + consistentQueryRequests
290
          + ", updateRequest="
291
          + updateRequest
292
          + ", updateRequestBuffer="
293
          + updateRequestBuffer
294
          + '}';
295
    }
296
  }
297

298
  static final class ActivityTaskData {
299

300
    StartWorkflowExecutionRequest startWorkflowExecutionRequest;
301
    ActivityTaskScheduledEventAttributes scheduledEvent;
302
    ActivityTask activityTask;
303

304
    final TestWorkflowStore store;
305

306
    long scheduledEventId = NO_EVENT_ID;
307
    long startedEventId = NO_EVENT_ID;
308
    public HistoryEvent startedEvent;
309
    Payloads heartbeatDetails;
310
    long lastHeartbeatTime;
311
    TestServiceRetryState retryState;
312
    Duration nextBackoffInterval;
313
    String identity;
314

315
    ActivityTaskData(
316
        TestWorkflowStore store, StartWorkflowExecutionRequest startWorkflowExecutionRequest) {
317
      this.store = store;
318
      this.startWorkflowExecutionRequest = startWorkflowExecutionRequest;
319
    }
320

321
    @Override
322
    public String toString() {
323
      return "ActivityTaskData{"
324
          + "startWorkflowExecutionRequest="
325
          + startWorkflowExecutionRequest
326
          + ", scheduledEvent="
327
          + scheduledEvent
328
          + ", activityTask="
329
          + activityTask
330
          + ", store="
331
          + store
332
          + ", scheduledEventId="
333
          + scheduledEventId
334
          + ", startedEventId="
335
          + startedEventId
336
          + ", startedEvent="
337
          + startedEvent
338
          + ", heartbeatDetails="
339
          + heartbeatDetails
340
          + ", lastHeartbeatTime="
341
          + lastHeartbeatTime
342
          + ", retryState="
343
          + retryState
344
          + ", nextBackoffInterval="
345
          + nextBackoffInterval
346
          + '}';
347
    }
348

349
    public int getAttempt() {
350
      return retryState != null ? retryState.getAttempt() : 1;
351
    }
352
  }
353

354
  static final class SignalExternalData {
355
    long initiatedEventId = NO_EVENT_ID;
356
    public SignalExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent;
357

358
    @Override
359
    public String toString() {
360
      return "SignalExternalData{"
361
          + "initiatedEventId="
362
          + initiatedEventId
363
          + ", initiatedEvent="
364
          + initiatedEvent
365
          + '}';
366
    }
367
  }
368

369
  static final class CancelExternalData {
370
    long initiatedEventId = NO_EVENT_ID;
371
    public RequestCancelExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent;
372

373
    @Override
374
    public String toString() {
375
      return "CancelExternalData{"
376
          + "initiatedEventId="
377
          + initiatedEventId
378
          + ", initiatedEvent="
379
          + initiatedEvent
380
          + '}';
381
    }
382
  }
383

384
  static final class ChildWorkflowData {
385

386
    final TestWorkflowService service;
387
    StartChildWorkflowExecutionInitiatedEventAttributes initiatedEvent;
388
    long initiatedEventId;
389
    long startedEventId;
390
    WorkflowExecution execution;
391

392
    public ChildWorkflowData(TestWorkflowService service) {
393
      this.service = service;
394
    }
395

396
    @Override
397
    public String toString() {
398
      return "ChildWorkflowData{"
399
          + "service="
400
          + service
401
          + ", initiatedEvent="
402
          + initiatedEvent
403
          + ", initiatedEventId="
404
          + initiatedEventId
405
          + ", startedEventId="
406
          + startedEventId
407
          + ", execution="
408
          + execution
409
          + '}';
410
    }
411
  }
412

413
  static final class TimerData {
414
    TimerStartedEventAttributes startedEvent;
415
    public long startedEventId;
416

417
    @Override
418
    public String toString() {
419
      return "TimerData{"
420
          + "startedEvent="
421
          + startedEvent
422
          + ", startedEventId="
423
          + startedEventId
424
          + '}';
425
    }
426
  }
427

428
  /** Represents an accepted update workflow execution request */
429
  static final class UpdateWorkflowExecutionData {
430
    final String id;
431
    final CompletableFuture<UpdateWorkflowExecutionResponse> acceptance;
432
    final CompletableFuture<UpdateWorkflowExecutionResponse> complete;
433

434
    public UpdateWorkflowExecutionData(
435
        String id,
436
        CompletableFuture<UpdateWorkflowExecutionResponse> acceptance,
437
        CompletableFuture<UpdateWorkflowExecutionResponse> complete) {
438
      this.id = id;
439
      this.acceptance = acceptance;
440
      this.complete = complete;
441
    }
442

443
    @Override
444
    public String toString() {
445
      return "UpdateWorkflowExecutionData{" + "ID=" + id + '}';
446
    }
447
  }
448

449
  static StateMachine<WorkflowData> newWorkflowStateMachine(WorkflowData data) {
450
    return new StateMachine<>(data)
451
        .add(NONE, START, STARTED, StateMachines::startWorkflow)
452
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::completeWorkflow)
453
        .add(STARTED, CONTINUE_AS_NEW, CONTINUED_AS_NEW, StateMachines::continueAsNewWorkflow)
454
        .add(STARTED, FAIL, FAILED, StateMachines::failWorkflow)
455
        .add(STARTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutWorkflow)
456
        .add(
457
            STARTED,
458
            REQUEST_CANCELLATION,
459
            CANCELLATION_REQUESTED,
460
            StateMachines::requestWorkflowCancellation)
461
        .add(STARTED, TERMINATE, TERMINATED, StateMachines::terminateWorkflow)
462
        .add(
463
            CANCELLATION_REQUESTED,
464
            REQUEST_CANCELLATION,
465
            CANCELLATION_REQUESTED,
466
            StateMachines::noop)
467
        .add(CANCELLATION_REQUESTED, COMPLETE, COMPLETED, StateMachines::completeWorkflow)
468
        .add(CANCELLATION_REQUESTED, CANCEL, CANCELED, StateMachines::cancelWorkflow)
469
        .add(CANCELLATION_REQUESTED, TERMINATE, TERMINATED, StateMachines::terminateWorkflow)
470
        .add(CANCELLATION_REQUESTED, FAIL, FAILED, StateMachines::failWorkflow)
471
        .add(CANCELLATION_REQUESTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutWorkflow);
472
  }
473

474
  static StateMachine<WorkflowTaskData> newWorkflowTaskStateMachine(
475
      TestWorkflowStore store, StartWorkflowExecutionRequest startRequest) {
476
    return new StateMachine<>(new WorkflowTaskData(store, startRequest))
477
        .add(NONE, INITIATE, INITIATED, StateMachines::scheduleWorkflowTask)
478
        // TODO(maxim): Uncomment once the server supports consistent query only workflow tasks
479
        //        .add(NONE, QUERY, INITIATED_QUERY_ONLY, StateMachines::scheduleQueryWorkflowTask)
480
        //        .add(INITIATED_QUERY_ONLY, QUERY, INITIATED_QUERY_ONLY,
481
        // StateMachines::queryWhileScheduled)
482
        //        .add(
483
        //            INITIATED_QUERY_ONLY,
484
        //            INITIATE,
485
        //            INITIATED,
486
        //            StateMachines::convertQueryWorkflowTaskToReal)
487
        //        .add(
488
        //            INITIATED_QUERY_ONLY,
489
        //            START,
490
        //            STARTED_QUERY_ONLY,
491
        //            StateMachines::startQueryOnlyWorkflowTask)
492
        //        .add(STARTED_QUERY_ONLY, INITIATE, STARTED_QUERY_ONLY,
493
        // StateMachines::needsWorkflowTask)
494
        //        .add(STARTED_QUERY_ONLY, QUERY, STARTED_QUERY_ONLY,
495
        // StateMachines::needsWorkflowTaskDueToQuery)
496
        //        .add(STARTED_QUERY_ONLY, FAIL, NONE, StateMachines::failQueryWorkflowTask)
497
        //        .add(STARTED_QUERY_ONLY, TIME_OUT, NONE, StateMachines::failQueryWorkflowTask)
498
        //        .add(STARTED_QUERY_ONLY, COMPLETE, NONE, StateMachines::completeQuery)
499
        .add(STARTED, QUERY, STARTED, StateMachines::bufferQuery)
500
        .add(STARTED, UPDATE_WORKFLOW_EXECUTION, STARTED, StateMachines::bufferUpdate)
501
        .add(INITIATED, INITIATE, INITIATED, StateMachines::noop)
502
        .add(INITIATED, QUERY, INITIATED, StateMachines::queryWhileScheduled)
503
        .add(INITIATED, UPDATE_WORKFLOW_EXECUTION, INITIATED, StateMachines::addUpdate)
504
        .add(INITIATED, START, STARTED, StateMachines::startWorkflowTask)
505
        .add(STARTED, COMPLETE, NONE, StateMachines::completeWorkflowTask)
506
        .add(STARTED, FAIL, NONE, StateMachines::failWorkflowTask)
507
        .add(STARTED, TIME_OUT, NONE, StateMachines::timeoutWorkflowTask)
508
        .add(STARTED, INITIATE, STARTED, StateMachines::needsWorkflowTask);
509
  }
510

511
  /**
   * Creates the state machine of an activity task backed by a fresh {@link ActivityTaskData}.
   *
   * <p>Failing or timing out a STARTED activity may lead either to the terminal state or back to
   * INITIATED when the activity is retried.
   */
  public static StateMachine<ActivityTaskData> newActivityStateMachine(
      TestWorkflowStore store, StartWorkflowExecutionRequest workflowStartedEvent) {
    ActivityTaskData activityData = new ActivityTaskData(store, workflowStartedEvent);
    StateMachine<ActivityTaskData> machine = new StateMachine<>(activityData);
    return machine
        .add(NONE, INITIATE, INITIATED, StateMachines::scheduleActivityTask)
        // Transitions out of INITIATED.
        .add(INITIATED, START, STARTED, StateMachines::startActivityTask)
        .add(INITIATED, TIME_OUT, TIMED_OUT, StateMachines::timeoutActivityTask)
        .add(
            INITIATED,
            REQUEST_CANCELLATION,
            CANCELLATION_REQUESTED,
            StateMachines::requestActivityCancellation)
        // Transitions out of STARTED.
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::completeActivityTask)
        // Transitions to initiated in case of a retry
        .add(STARTED, FAIL, new State[] {FAILED, INITIATED}, StateMachines::failActivityTask)
        // Transitions to initiated in case of a retry
        .add(
            STARTED,
            TIME_OUT,
            new State[] {TIMED_OUT, INITIATED},
            StateMachines::timeoutActivityTask)
        .add(STARTED, UPDATE, STARTED, StateMachines::heartbeatActivityTask)
        .add(
            STARTED,
            REQUEST_CANCELLATION,
            CANCELLATION_REQUESTED,
            StateMachines::requestActivityCancellation)
        // Transitions out of CANCELLATION_REQUESTED.
        .add(
            CANCELLATION_REQUESTED, CANCEL, CANCELED, StateMachines::reportActivityTaskCancellation)
        .add(CANCELLATION_REQUESTED, COMPLETE, COMPLETED, StateMachines::completeActivityTask)
        .add(
            CANCELLATION_REQUESTED,
            UPDATE,
            CANCELLATION_REQUESTED,
            StateMachines::heartbeatActivityTask)
        .add(CANCELLATION_REQUESTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutActivityTask)
        .add(CANCELLATION_REQUESTED, FAIL, FAILED, StateMachines::failActivityTask);
  }
548

549
  public static StateMachine<ChildWorkflowData> newChildWorkflowStateMachine(
550
      TestWorkflowService service) {
551
    return new StateMachine<>(new ChildWorkflowData(service))
552
        .add(NONE, INITIATE, INITIATED, StateMachines::initiateChildWorkflow)
553
        .add(INITIATED, START, STARTED, StateMachines::childWorkflowStarted)
554
        .add(INITIATED, FAIL, FAILED, StateMachines::startChildWorkflowFailed)
555
        .add(INITIATED, TIME_OUT, TIMED_OUT, StateMachines::timeoutChildWorkflow)
556
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::childWorkflowCompleted)
557
        .add(STARTED, FAIL, FAILED, StateMachines::childWorkflowFailed)
558
        .add(STARTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutChildWorkflow)
559
        .add(STARTED, CANCEL, CANCELED, StateMachines::childWorkflowCanceled);
560
  }
561

562
  public static StateMachine<UpdateWorkflowExecutionData> newUpdateWorkflowExecution(
563
      String updateId,
564
      CompletableFuture<UpdateWorkflowExecutionResponse> acceptance,
565
      CompletableFuture<UpdateWorkflowExecutionResponse> complete) {
566
    return new StateMachine<>(new UpdateWorkflowExecutionData(updateId, acceptance, complete))
567
        .add(NONE, START, STARTED, StateMachines::acceptUpdate)
568
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::completeUpdate);
569
  }
570

571
  public static StateMachine<TimerData> newTimerStateMachine() {
572
    return new StateMachine<>(new TimerData())
573
        .add(NONE, START, STARTED, StateMachines::startTimer)
574
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::fireTimer)
575
        .add(STARTED, CANCEL, CANCELED, StateMachines::cancelTimer);
576
  }
577

578
  public static StateMachine<SignalExternalData> newSignalExternalStateMachine() {
579
    return new StateMachine<>(new SignalExternalData())
580
        .add(NONE, INITIATE, INITIATED, StateMachines::initiateExternalSignal)
581
        .add(INITIATED, FAIL, FAILED, StateMachines::failExternalSignal)
582
        .add(INITIATED, COMPLETE, COMPLETED, StateMachines::completeExternalSignal);
583
  }
584

585
  public static StateMachine<CancelExternalData> newCancelExternalStateMachine() {
586
    return new StateMachine<>(new CancelExternalData())
587
        .add(NONE, INITIATE, INITIATED, StateMachines::initiateExternalCancellation)
588
        .add(INITIATED, FAIL, FAILED, StateMachines::failExternalCancellation)
589
        .add(INITIATED, START, STARTED, StateMachines::reportExternalCancellationRequested);
590
  }
591

592
  /** Transition callback that does nothing; used where only the state change itself matters. */
  private static <T, A> void noop(RequestContext ctx, T data, A a, long notUsed) {
    // Intentionally empty.
  }
593

594
  private static void timeoutChildWorkflow(
595
      RequestContext ctx, ChildWorkflowData data, RetryState retryState, long notUsed) {
596
    StartChildWorkflowExecutionInitiatedEventAttributes ie = data.initiatedEvent;
597
    ChildWorkflowExecutionTimedOutEventAttributes a =
598
        ChildWorkflowExecutionTimedOutEventAttributes.newBuilder()
599
            .setNamespace(ie.getNamespace())
600
            .setStartedEventId(data.startedEventId)
601
            .setWorkflowExecution(data.execution)
602
            .setWorkflowType(ie.getWorkflowType())
603
            .setRetryState(retryState)
604
            .setInitiatedEventId(data.initiatedEventId)
605
            .build();
606
    HistoryEvent event =
607
        HistoryEvent.newBuilder()
608
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TIMED_OUT)
609
            .setChildWorkflowExecutionTimedOutEventAttributes(a)
610
            .build();
611
    ctx.addEvent(event);
612
  }
613

614
  private static void startChildWorkflowFailed(
615
      RequestContext ctx,
616
      ChildWorkflowData data,
617
      StartChildWorkflowExecutionFailedEventAttributes a,
618
      long notUsed) {
619
    StartChildWorkflowExecutionFailedEventAttributes.Builder updatedAttr =
620
        a.toBuilder()
621
            .setInitiatedEventId(data.initiatedEventId)
622
            .setWorkflowType(data.initiatedEvent.getWorkflowType())
623
            .setWorkflowId(data.initiatedEvent.getWorkflowId());
624
    if (!data.initiatedEvent.getNamespace().isEmpty()) {
625
      updatedAttr.setNamespace(data.initiatedEvent.getNamespace());
626
    }
627
    HistoryEvent event =
628
        HistoryEvent.newBuilder()
629
            .setEventType(EventType.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_FAILED)
630
            .setStartChildWorkflowExecutionFailedEventAttributes(updatedAttr.build())
631
            .build();
632
    ctx.addEvent(event);
633
  }
634

635
  private static void childWorkflowStarted(
636
      RequestContext ctx,
637
      ChildWorkflowData data,
638
      ChildWorkflowExecutionStartedEventAttributes a,
639
      long notUsed) {
640
    ChildWorkflowExecutionStartedEventAttributes updatedAttr =
641
        a.toBuilder().setInitiatedEventId(data.initiatedEventId).build();
642
    HistoryEvent event =
643
        HistoryEvent.newBuilder()
644
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_STARTED)
645
            .setChildWorkflowExecutionStartedEventAttributes(updatedAttr)
646
            .build();
647
    long startedEventId = ctx.addEvent(event);
648
    ctx.onCommit(
649
        (historySize) -> {
650
          data.startedEventId = startedEventId;
651
          data.execution = updatedAttr.getWorkflowExecution();
652
        });
653
  }
654

655
  private static void childWorkflowCompleted(
656
      RequestContext ctx,
657
      ChildWorkflowData data,
658
      ChildWorkflowExecutionCompletedEventAttributes a,
659
      long notUsed) {
660
    ChildWorkflowExecutionCompletedEventAttributes updatedAttr =
661
        a.toBuilder()
662
            .setInitiatedEventId(data.initiatedEventId)
663
            .setStartedEventId(data.startedEventId)
664
            .build();
665
    HistoryEvent event =
666
        HistoryEvent.newBuilder()
667
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_COMPLETED)
668
            .setChildWorkflowExecutionCompletedEventAttributes(updatedAttr)
669
            .build();
670
    ctx.addEvent(event);
671
  }
672

673
  private static void childWorkflowFailed(
674
      RequestContext ctx,
675
      ChildWorkflowData data,
676
      ChildWorkflowExecutionFailedEventAttributes a,
677
      long notUsed) {
678
    ChildWorkflowExecutionFailedEventAttributes.Builder updatedAttr =
679
        a.toBuilder()
680
            .setInitiatedEventId(data.initiatedEventId)
681
            .setStartedEventId(data.startedEventId)
682
            .setWorkflowExecution(data.execution)
683
            .setWorkflowType(data.initiatedEvent.getWorkflowType());
684
    if (!data.initiatedEvent.getNamespace().isEmpty()) {
685
      updatedAttr.setNamespace(data.initiatedEvent.getNamespace());
686
    }
687
    HistoryEvent event =
688
        HistoryEvent.newBuilder()
689
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_FAILED)
690
            .setChildWorkflowExecutionFailedEventAttributes(updatedAttr.build())
691
            .build();
692
    ctx.addEvent(event);
693
  }
694

695
  private static void childWorkflowCanceled(
696
      RequestContext ctx,
697
      ChildWorkflowData data,
698
      ChildWorkflowExecutionCanceledEventAttributes a,
699
      long notUsed) {
700
    ChildWorkflowExecutionCanceledEventAttributes updatedAttr =
701
        a.toBuilder()
702
            .setInitiatedEventId(data.initiatedEventId)
703
            .setStartedEventId(data.startedEventId)
704
            .build();
705
    HistoryEvent event =
706
        HistoryEvent.newBuilder()
707
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_CANCELED)
708
            .setChildWorkflowExecutionCanceledEventAttributes(updatedAttr)
709
            .build();
710
    ctx.addEvent(event);
711
  }
712

713
  private static void initiateChildWorkflow(
714
      RequestContext ctx,
715
      ChildWorkflowData data,
716
      StartChildWorkflowExecutionCommandAttributes d,
717
      long workflowTaskCompletedEventId) {
718
    StartChildWorkflowExecutionInitiatedEventAttributes.Builder a =
719
        StartChildWorkflowExecutionInitiatedEventAttributes.newBuilder()
720
            .setControl(d.getControl())
721
            .setInput(d.getInput())
722
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
723
            .setNamespace(d.getNamespace().isEmpty() ? ctx.getNamespace() : d.getNamespace())
724
            .setWorkflowExecutionTimeout(d.getWorkflowExecutionTimeout())
725
            .setWorkflowRunTimeout(d.getWorkflowRunTimeout())
726
            .setWorkflowTaskTimeout(d.getWorkflowTaskTimeout())
727
            .setTaskQueue(d.getTaskQueue())
728
            .setWorkflowId(d.getWorkflowId())
729
            .setWorkflowIdReusePolicy(d.getWorkflowIdReusePolicy())
730
            .setWorkflowType(d.getWorkflowType())
731
            .setCronSchedule(d.getCronSchedule())
732
            .setParentClosePolicy(d.getParentClosePolicy());
733
    if (d.hasHeader()) {
734
      a.setHeader(d.getHeader());
735
    }
736
    if (d.hasMemo()) {
737
      a.setMemo(d.getMemo());
738
    }
739
    if (d.hasRetryPolicy()) {
740
      a.setRetryPolicy(d.getRetryPolicy());
741
    }
742
    HistoryEvent event =
743
        HistoryEvent.newBuilder()
744
            .setEventType(EventType.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED)
745
            .setStartChildWorkflowExecutionInitiatedEventAttributes(a)
746
            .build();
747
    long initiatedEventId = ctx.addEvent(event);
748
    ctx.onCommit(
749
        (historySize) -> {
750
          data.initiatedEventId = initiatedEventId;
751
          data.initiatedEvent = a.build();
752
          StartWorkflowExecutionRequest.Builder startChild =
753
              StartWorkflowExecutionRequest.newBuilder()
754
                  .setRequestId(UUID.randomUUID().toString())
755
                  .setNamespace(d.getNamespace().isEmpty() ? ctx.getNamespace() : d.getNamespace())
756
                  .setWorkflowExecutionTimeout(d.getWorkflowExecutionTimeout())
757
                  .setWorkflowRunTimeout(d.getWorkflowRunTimeout())
758
                  .setWorkflowTaskTimeout(d.getWorkflowTaskTimeout())
759
                  .setTaskQueue(d.getTaskQueue())
760
                  .setWorkflowId(d.getWorkflowId())
761
                  .setWorkflowIdReusePolicy(d.getWorkflowIdReusePolicy())
762
                  .setWorkflowType(d.getWorkflowType())
763
                  .setCronSchedule(d.getCronSchedule());
764
          if (d.hasHeader()) {
765
            startChild.setHeader(d.getHeader());
766
          }
767
          if (d.hasSearchAttributes()) {
768
            startChild.setSearchAttributes(d.getSearchAttributes());
769
          }
770
          if (d.hasMemo()) {
771
            startChild.setMemo(d.getMemo());
772
          }
773
          if (d.hasRetryPolicy()) {
774
            startChild.setRetryPolicy(d.getRetryPolicy());
775
          }
776
          if (d.hasInput()) {
777
            startChild.setInput(d.getInput());
778
          }
779
          addStartChildTask(ctx, data, initiatedEventId, startChild.build());
780
        });
781
  }
782

783
  private static void addStartChildTask(
784
      RequestContext ctx,
785
      ChildWorkflowData data,
786
      long initiatedEventId,
787
      StartWorkflowExecutionRequest startChild) {
788
    ForkJoinPool.commonPool()
789
        .execute(
790
            () -> {
791
              try {
792
                data.service.startWorkflowExecutionImpl(
793
                    startChild,
794
                    java.time.Duration.ZERO,
795
                    Optional.of(ctx.getWorkflowMutableState()),
796
                    OptionalLong.of(data.initiatedEventId),
797
                    null);
798
              } catch (StatusRuntimeException e) {
799
                if (e.getStatus().getCode() == Status.Code.ALREADY_EXISTS) {
800
                  StartChildWorkflowExecutionFailedEventAttributes failRequest =
801
                      StartChildWorkflowExecutionFailedEventAttributes.newBuilder()
802
                          .setInitiatedEventId(initiatedEventId)
803
                          .setCause(
804
                              StartChildWorkflowExecutionFailedCause
805
                                  .START_CHILD_WORKFLOW_EXECUTION_FAILED_CAUSE_WORKFLOW_ALREADY_EXISTS)
806
                          .build();
807
                  try {
808
                    ctx.getWorkflowMutableState()
809
                        .failStartChildWorkflow(data.initiatedEvent.getWorkflowId(), failRequest);
810
                  } catch (Throwable ee) {
811
                    log.error("Unexpected failure inserting failStart for a child workflow", ee);
812
                  }
813
                } else {
814
                  log.error("Unexpected failure starting a child workflow", e);
815
                }
816
              } catch (Exception e) {
817
                log.error("Unexpected failure starting a child workflow", e);
818
              }
819
            });
820
  }
821

822
  private static void startWorkflow(
823
      RequestContext ctx, WorkflowData data, StartWorkflowExecutionRequest request, long notUsed) {
824
    if (Durations.compare(request.getWorkflowExecutionTimeout(), Durations.ZERO) < 0) {
825
      throw Status.INVALID_ARGUMENT
826
          .withDescription("negative workflowExecution timeout")
827
          .asRuntimeException();
828
    }
829
    if (Durations.compare(request.getWorkflowRunTimeout(), Durations.ZERO) < 0) {
830
      throw Status.INVALID_ARGUMENT
831
          .withDescription("negative workflowRun timeout")
832
          .asRuntimeException();
833
    }
834
    if (Durations.compare(request.getWorkflowTaskTimeout(), Durations.ZERO) < 0) {
835
      throw Status.INVALID_ARGUMENT
836
          .withDescription("negative workflowTaskTimeoutSeconds")
837
          .asRuntimeException();
838
    }
839

840
    WorkflowExecutionStartedEventAttributes.Builder a =
841
        WorkflowExecutionStartedEventAttributes.newBuilder()
842
            .setWorkflowType(request.getWorkflowType())
843
            .setWorkflowRunTimeout(request.getWorkflowRunTimeout())
844
            .setWorkflowTaskTimeout(request.getWorkflowTaskTimeout())
845
            .setWorkflowExecutionTimeout(request.getWorkflowExecutionTimeout())
846
            .setIdentity(request.getIdentity())
847
            .setInput(request.getInput())
848
            .setTaskQueue(request.getTaskQueue())
849
            .setAttempt(1);
850
    data.retryState.ifPresent(
851
        testServiceRetryState -> a.setAttempt(testServiceRetryState.getAttempt()));
852
    a.setFirstExecutionRunId(data.firstExecutionRunId);
853
    a.setOriginalExecutionRunId(data.originalExecutionRunId);
854
    data.continuedExecutionRunId.ifPresent(a::setContinuedExecutionRunId);
855
    if (data.lastCompletionResult != null) {
856
      a.setLastCompletionResult(data.lastCompletionResult);
857
    }
858
    data.lastFailure.ifPresent(a::setContinuedFailure);
859
    if (request.hasMemo()) {
860
      a.setMemo(request.getMemo());
861
    }
862
    if (request.hasSearchAttributes()) {
863
      a.setSearchAttributes((request.getSearchAttributes()));
864
    }
865
    if (request.hasHeader()) {
866
      a.setHeader(request.getHeader());
867
    }
868
    String cronSchedule = request.getCronSchedule();
869
    if (!cronSchedule.trim().isEmpty()) {
870
      try {
871
        CronUtils.parseCron(cronSchedule);
872
        a.setCronSchedule(cronSchedule);
873
      } catch (Exception e) {
874
        throw Status.INVALID_ARGUMENT
875
            .withDescription("Invalid cron expression \"" + cronSchedule + "\": " + e.getMessage())
876
            .withCause(e)
877
            .asRuntimeException();
878
      }
879
    }
880
    Optional<TestWorkflowMutableState> parent = ctx.getWorkflowMutableState().getParent();
881
    if (parent.isPresent()) {
882
      ExecutionId parentExecutionId = parent.get().getExecutionId();
883
      a.setParentWorkflowNamespace(parentExecutionId.getNamespace());
884
      a.setParentWorkflowExecution(parentExecutionId.getExecution());
885
    }
886
    HistoryEvent event =
887
        HistoryEvent.newBuilder()
888
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED)
889
            .setWorkflowExecutionStartedEventAttributes(a)
890
            .build();
891
    ctx.addEvent(event);
892
  }
893

894
  private static void completeWorkflow(
895
      RequestContext ctx,
896
      WorkflowData data,
897
      CompleteWorkflowExecutionCommandAttributes d,
898
      long workflowTaskCompletedEventId) {
899
    WorkflowExecutionCompletedEventAttributes.Builder a =
900
        WorkflowExecutionCompletedEventAttributes.newBuilder()
901
            .setResult(d.getResult())
902
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
903
    HistoryEvent event =
904
        HistoryEvent.newBuilder()
905
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED)
906
            .setWorkflowExecutionCompletedEventAttributes(a)
907
            .build();
908
    ctx.addEvent(event);
909
  }
910

911
  private static void continueAsNewWorkflow(
912
      RequestContext ctx,
913
      WorkflowData data,
914
      ContinueAsNewWorkflowExecutionCommandAttributes d,
915
      long workflowTaskCompletedEventId) {
916
    StartWorkflowExecutionRequest sr = ctx.getWorkflowMutableState().getStartRequest();
917
    WorkflowExecutionContinuedAsNewEventAttributes.Builder a =
918
        WorkflowExecutionContinuedAsNewEventAttributes.newBuilder();
919
    a.setInput(d.getInput());
920
    if (d.hasHeader()) {
921
      a.setHeader(d.getHeader());
922
    }
923
    if (Durations.compare(d.getWorkflowRunTimeout(), Durations.ZERO) > 0) {
924
      a.setWorkflowRunTimeout(d.getWorkflowRunTimeout());
925
    } else {
926
      a.setWorkflowRunTimeout(sr.getWorkflowRunTimeout());
927
    }
928
    if (d.hasTaskQueue()) {
929
      a.setTaskQueue(d.getTaskQueue());
930
    } else {
931
      a.setTaskQueue(sr.getTaskQueue());
932
    }
933
    if (d.hasWorkflowType()) {
934
      a.setWorkflowType(d.getWorkflowType());
935
    } else {
936
      a.setWorkflowType(sr.getWorkflowType());
937
    }
938
    if (Durations.compare(d.getWorkflowTaskTimeout(), Durations.ZERO) > 0) {
939
      a.setWorkflowTaskTimeout(d.getWorkflowTaskTimeout());
940
    } else {
941
      a.setWorkflowTaskTimeout(sr.getWorkflowTaskTimeout());
942
    }
943
    a.setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
944
    a.setBackoffStartInterval(d.getBackoffStartInterval());
945
    if (d.hasLastCompletionResult()) {
946
      a.setLastCompletionResult(d.getLastCompletionResult());
947
    }
948
    if (d.hasFailure()) {
949
      a.setFailure(d.getFailure());
950
    }
951
    a.setNewExecutionRunId(UUID.randomUUID().toString());
952
    HistoryEvent event =
953
        HistoryEvent.newBuilder()
954
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW)
955
            .setWorkflowExecutionContinuedAsNewEventAttributes(a)
956
            .build();
957
    ctx.addEvent(event);
958
  }
959

960
  private static void failWorkflow(
961
      RequestContext ctx,
962
      WorkflowData data,
963
      FailWorkflowExecutionCommandAttributes d,
964
      long workflowTaskCompletedEventId) {
965
    WorkflowExecutionFailedEventAttributes.Builder a =
966
        WorkflowExecutionFailedEventAttributes.newBuilder()
967
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
968
    if (d.hasFailure()) {
969
      a.setFailure(d.getFailure());
970
    }
971
    HistoryEvent event =
972
        HistoryEvent.newBuilder()
973
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_FAILED)
974
            .setWorkflowExecutionFailedEventAttributes(a)
975
            .build();
976
    ctx.addEvent(event);
977
  }
978

979
  private static void timeoutWorkflow(
980
      RequestContext ctx, WorkflowData data, RetryState retryState, long notUsed) {
981
    WorkflowExecutionTimedOutEventAttributes.Builder a =
982
        WorkflowExecutionTimedOutEventAttributes.newBuilder().setRetryState(retryState);
983
    HistoryEvent event =
984
        HistoryEvent.newBuilder()
985
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT)
986
            .setWorkflowExecutionTimedOutEventAttributes(a)
987
            .build();
988
    ctx.addEvent(event);
989
  }
990

991
  private static void cancelWorkflow(
992
      RequestContext ctx,
993
      WorkflowData data,
994
      CancelWorkflowExecutionCommandAttributes d,
995
      long workflowTaskCompletedEventId) {
996
    WorkflowExecutionCanceledEventAttributes.Builder a =
997
        WorkflowExecutionCanceledEventAttributes.newBuilder()
998
            .setDetails(d.getDetails())
999
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1000
    HistoryEvent event =
1001
        HistoryEvent.newBuilder()
1002
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED)
1003
            .setWorkflowExecutionCanceledEventAttributes(a)
1004
            .build();
1005
    ctx.addEvent(event);
1006
  }
1007

1008
  private static void terminateWorkflow(
1009
      RequestContext ctx,
1010
      WorkflowData data,
1011
      TerminateWorkflowExecutionRequest d,
1012
      long workflowTaskCompletedEventId) {
1013
    WorkflowExecutionTerminatedEventAttributes.Builder a =
1014
        WorkflowExecutionTerminatedEventAttributes.newBuilder()
1015
            .setDetails(d.getDetails())
1016
            .setIdentity(d.getIdentity())
1017
            .setReason(d.getReason());
1018
    HistoryEvent event =
1019
        HistoryEvent.newBuilder()
1020
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED)
1021
            .setWorkflowExecutionTerminatedEventAttributes(a)
1022
            .build();
1023
    ctx.addEvent(event);
1024
  }
1025

1026
  private static void requestWorkflowCancellation(
1027
      RequestContext ctx,
1028
      WorkflowData data,
1029
      RequestCancelWorkflowExecutionRequest cancelRequest,
1030
      long notUsed) {
1031
    WorkflowExecutionCancelRequestedEventAttributes.Builder a =
1032
        WorkflowExecutionCancelRequestedEventAttributes.newBuilder()
1033
            .setIdentity(cancelRequest.getIdentity());
1034
    HistoryEvent cancelRequested =
1035
        HistoryEvent.newBuilder()
1036
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED)
1037
            .setWorkflowExecutionCancelRequestedEventAttributes(a)
1038
            .build();
1039
    ctx.addEvent(cancelRequested);
1040
  }
1041

1042
  private static void scheduleActivityTask(
1043
      RequestContext ctx,
1044
      ActivityTaskData data,
1045
      ScheduleActivityTaskCommandAttributes d,
1046
      long workflowTaskCompletedEventId) {
1047
    RetryPolicy retryPolicy = ensureDefaultFieldsForActivityRetryPolicy(d.getRetryPolicy());
1048
    Duration expirationInterval = d.getScheduleToCloseTimeout();
1049
    Timestamp expirationTime = Timestamps.add(data.store.currentTime(), expirationInterval);
1050
    TestServiceRetryState retryState = new TestServiceRetryState(retryPolicy, expirationTime);
1051

1052
    ActivityTaskScheduledEventAttributes.Builder a =
1053
        ActivityTaskScheduledEventAttributes.newBuilder()
1054
            .setInput(d.getInput())
1055
            .setActivityId(d.getActivityId())
1056
            .setActivityType(d.getActivityType())
1057
            .setHeartbeatTimeout(d.getHeartbeatTimeout())
1058
            .setRetryPolicy(retryPolicy)
1059
            .setScheduleToCloseTimeout(d.getScheduleToCloseTimeout())
1060
            .setScheduleToStartTimeout(d.getScheduleToStartTimeout())
1061
            .setStartToCloseTimeout(d.getStartToCloseTimeout())
1062
            .setTaskQueue(d.getTaskQueue())
1063
            .setHeader(d.getHeader())
1064
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1065

1066
    // Cannot set it in onCommit as it is used in the processScheduleActivityTask
1067
    data.scheduledEvent = a.build();
1068
    HistoryEvent event =
1069
        HistoryEvent.newBuilder()
1070
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED)
1071
            .setActivityTaskScheduledEventAttributes(a)
1072
            .build();
1073
    long scheduledEventId = ctx.addEvent(event);
1074

1075
    PollActivityTaskQueueResponse.Builder taskResponse =
1076
        PollActivityTaskQueueResponse.newBuilder()
1077
            .setWorkflowNamespace(ctx.getNamespace())
1078
            .setWorkflowType(data.startWorkflowExecutionRequest.getWorkflowType())
1079
            .setActivityType(d.getActivityType())
1080
            .setWorkflowExecution(ctx.getExecution())
1081
            .setActivityId(d.getActivityId())
1082
            .setInput(d.getInput())
1083
            .setHeartbeatTimeout(d.getHeartbeatTimeout())
1084
            .setScheduleToCloseTimeout(d.getScheduleToCloseTimeout())
1085
            .setStartToCloseTimeout(d.getStartToCloseTimeout())
1086
            .setScheduledTime(ctx.currentTime())
1087
            .setCurrentAttemptScheduledTime(ctx.currentTime())
1088
            .setHeader(d.getHeader())
1089
            .setAttempt(1);
1090

1091
    TaskQueueId taskQueueId = new TaskQueueId(ctx.getNamespace(), d.getTaskQueue().getName());
1092
    ActivityTask activityTask = new ActivityTask(taskQueueId, taskResponse);
1093
    ctx.addActivityTask(activityTask);
1094
    ctx.onCommit(
1095
        (historySize) -> {
1096
          data.scheduledEventId = scheduledEventId;
1097
          data.activityTask = activityTask;
1098
          data.retryState = retryState;
1099
        });
1100
  }
1101

1102
  private static void requestActivityCancellation(
1103
      RequestContext ctx,
1104
      ActivityTaskData data,
1105
      RequestCancelActivityTaskCommandAttributes d,
1106
      long workflowTaskCompletedEventId) {
1107
    ActivityTaskCancelRequestedEventAttributes.Builder a =
1108
        ActivityTaskCancelRequestedEventAttributes.newBuilder()
1109
            .setScheduledEventId(d.getScheduledEventId())
1110
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1111
    HistoryEvent event =
1112
        HistoryEvent.newBuilder()
1113
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_CANCEL_REQUESTED)
1114
            .setActivityTaskCancelRequestedEventAttributes(a)
1115
            .build();
1116
    ctx.addEvent(event);
1117
  }
1118

1119
  private static void scheduleWorkflowTask(
1120
      RequestContext ctx, WorkflowTaskData data, Object notUsedRequest, long notUsed) {
1121
    StartWorkflowExecutionRequest request = data.startRequest;
1122
    long scheduledEventId;
1123
    TaskQueue taskQueue = request.getTaskQueue();
1124
    WorkflowTaskScheduledEventAttributes a =
1125
        WorkflowTaskScheduledEventAttributes.newBuilder()
1126
            .setStartToCloseTimeout(request.getWorkflowTaskTimeout())
1127
            .setTaskQueue(taskQueue)
1128
            .setAttempt(++data.attempt)
1129
            .build();
1130
    HistoryEvent event =
1131
        HistoryEvent.newBuilder()
1132
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED)
1133
            .setWorkflowTaskScheduledEventAttributes(a)
1134
            .build();
1135
    scheduledEventId = ctx.addEvent(event);
1136
    PollWorkflowTaskQueueResponse.Builder workflowTaskResponse =
1137
        PollWorkflowTaskQueueResponse.newBuilder();
1138
    workflowTaskResponse.setWorkflowExecution(ctx.getExecution());
1139
    workflowTaskResponse.setWorkflowType(request.getWorkflowType());
1140
    workflowTaskResponse.setAttempt(data.attempt);
1141
    workflowTaskResponse.setScheduledTime(ctx.currentTime());
1142
    workflowTaskResponse.setWorkflowExecutionTaskQueue(taskQueue);
1143
    TaskQueueId taskQueueId = new TaskQueueId(ctx.getNamespace(), taskQueue.getName());
1144
    WorkflowTask workflowTask = new WorkflowTask(taskQueueId, workflowTaskResponse);
1145
    ctx.setWorkflowTaskForMatching(workflowTask);
1146
    ctx.onCommit(
1147
        (historySize) -> {
1148
          data.scheduledEventId = scheduledEventId;
1149
          data.workflowTask = workflowTaskResponse;
1150
          // Move buffered update request to new workflow task
1151
          data.updateRequest.putAll(data.updateRequestBuffer);
1152
          data.updateRequestBuffer.clear();
1153
        });
1154
  }
1155

1156
  private static void convertQueryWorkflowTaskToReal(
1157
      RequestContext ctx, WorkflowTaskData data, Object notUsedRequest, long notUsed) {
1158
    StartWorkflowExecutionRequest request = data.startRequest;
1159
    WorkflowTaskScheduledEventAttributes a =
1160
        WorkflowTaskScheduledEventAttributes.newBuilder()
1161
            .setStartToCloseTimeout(request.getWorkflowTaskTimeout())
1162
            .setTaskQueue(request.getTaskQueue())
1163
            .setAttempt(data.attempt)
1164
            .build();
1165
    HistoryEvent event =
1166
        HistoryEvent.newBuilder()
1167
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED)
1168
            .setWorkflowTaskScheduledEventAttributes(a)
1169
            .build();
1170
    long scheduledEventId = ctx.addEvent(event);
1171
    ctx.onCommit((historySize) -> data.scheduledEventId = scheduledEventId);
1172
  }
1173

1174
  private static void scheduleQueryWorkflowTask(
1175
      RequestContext ctx,
1176
      WorkflowTaskData data,
1177
      TestWorkflowMutableStateImpl.ConsistentQuery query,
1178
      long notUsed) {
1179
    ctx.lockTimer("scheduleQueryWorkflowTask");
1180
    StartWorkflowExecutionRequest request = data.startRequest;
1181
    PollWorkflowTaskQueueResponse.Builder workflowTaskResponse =
1182
        PollWorkflowTaskQueueResponse.newBuilder();
1183
    StickyExecutionAttributes stickyAttributes =
1184
        ctx.getWorkflowMutableState().getStickyExecutionAttributes();
1185
    String taskQueue =
1186
        stickyAttributes == null
1187
            ? request.getTaskQueue().getName()
1188
            : stickyAttributes.getWorkerTaskQueue().getName();
1189
    workflowTaskResponse.setWorkflowExecution(ctx.getExecution());
1190
    workflowTaskResponse.setWorkflowType(request.getWorkflowType());
1191
    workflowTaskResponse.setAttempt(++data.attempt);
1192
    TaskQueueId taskQueueId = new TaskQueueId(ctx.getNamespace(), taskQueue);
1193
    WorkflowTask workflowTask = new WorkflowTask(taskQueueId, workflowTaskResponse);
1194
    ctx.setWorkflowTaskForMatching(workflowTask);
1195
    ctx.onCommit(
1196
        (historySize) -> {
1197
          if (data.lastSuccessfulStartedEventId > 0) {
1198
            workflowTaskResponse.setPreviousStartedEventId(data.lastSuccessfulStartedEventId);
1199
          }
1200
          data.scheduledEventId = NO_EVENT_ID;
1201
          data.workflowTask = workflowTaskResponse;
1202
          if (query != null) {
1203
            data.consistentQueryRequests.put(query.getKey(), query);
1204
          }
1205
        });
1206
  }
1207

1208
  private static void queryWhileScheduled(
1209
      RequestContext ctx,
1210
      WorkflowTaskData data,
1211
      TestWorkflowMutableStateImpl.ConsistentQuery query,
1212
      long notUsed) {
1213
    data.consistentQueryRequests.put(query.getKey(), query);
1214
  }
1215

1216
  private static void bufferQuery(
1217
      RequestContext ctx,
1218
      WorkflowTaskData data,
1219
      TestWorkflowMutableStateImpl.ConsistentQuery query,
1220
      long notUsed) {
1221
    data.queryBuffer.put(query.getKey(), query);
1222
  }
1223

1224
  private static void bufferUpdate(
1225
      RequestContext ctx, WorkflowTaskData data, UpdateWorkflowExecution update, long notUsed) {
1226
    if (data.getUpdateRequest(update.getId()).isPresent()) {
1227
      throw Status.INTERNAL
1228
          .withDescription("Update ID already exists: " + update.getId())
1229
          .asRuntimeException();
1230
    }
1231
    data.updateRequestBuffer.put(update.getId(), update);
1232
  }
1233

1234
  private static void addUpdate(
1235
      RequestContext ctx, WorkflowTaskData data, UpdateWorkflowExecution update, long notUsed) {
1236
    if (data.getUpdateRequest(update.getId()).isPresent()) {
1237
      throw Status.INTERNAL
1238
          .withDescription("Update ID already exists: " + update.getId())
1239
          .asRuntimeException();
1240
    }
1241
    data.updateRequest.put(update.getId(), update);
1242
  }
1243

1244
  private static void startWorkflowTask(
1245
      RequestContext ctx,
1246
      WorkflowTaskData data,
1247
      PollWorkflowTaskQueueRequest request,
1248
      long notUsed) {
1249
    WorkflowTaskStartedEventAttributes a =
1250
        WorkflowTaskStartedEventAttributes.newBuilder()
1251
            .setIdentity(request.getIdentity())
1252
            .setScheduledEventId(data.scheduledEventId)
1253
            .build();
1254
    HistoryEvent event =
1255
        HistoryEvent.newBuilder()
1256
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_STARTED)
1257
            .setWorkflowTaskStartedEventAttributes(a)
1258
            .build();
1259
    long startedEventId = ctx.addEvent(event);
1260
    startWorkflowTaskImpl(ctx, data, request, startedEventId, false);
1261
  }
1262

1263
  private static void startQueryOnlyWorkflowTask(
1264
      RequestContext ctx,
1265
      WorkflowTaskData data,
1266
      PollWorkflowTaskQueueRequest request,
1267
      long notUsed) {
1268
    startWorkflowTaskImpl(ctx, data, request, NO_EVENT_ID, true);
1269
  }
1270

1271
  private static void startWorkflowTaskImpl(
1272
      RequestContext ctx,
1273
      WorkflowTaskData data,
1274
      PollWorkflowTaskQueueRequest request,
1275
      long startedEventId,
1276
      boolean queryOnly) {
1277
    ctx.onCommit(
1278
        (historySize) -> {
1279
          PollWorkflowTaskQueueResponse.Builder task = data.workflowTask;
1280
          task.setStartedEventId(data.scheduledEventId + 1);
1281
          WorkflowTaskToken taskToken = new WorkflowTaskToken(ctx.getExecutionId(), historySize);
1282
          task.setTaskToken(taskToken.toBytes());
1283
          GetWorkflowExecutionHistoryRequest getRequest =
1284
              GetWorkflowExecutionHistoryRequest.newBuilder()
1285
                  .setNamespace(request.getNamespace())
1286
                  .setExecution(ctx.getExecution())
1287
                  .build();
1288
          List<HistoryEvent> events;
1289
          events =
1290
              data.store
1291
                  .getWorkflowExecutionHistory(ctx.getExecutionId(), getRequest, null)
1292
                  .getHistory()
1293
                  .getEventsList();
1294
          long lastEventId = events.get(events.size() - 1).getEventId();
1295
          if (ctx.getWorkflowMutableState().getStickyExecutionAttributes() != null) {
1296
            events = events.subList((int) data.lastSuccessfulStartedEventId, events.size());
1297
          }
1298
          if (queryOnly && !data.workflowCompleted) {
1299
            events = new ArrayList<>(events); // convert list to mutable
1300
            // Add "fake" workflow task scheduled and started if workflow is not closed
1301
            WorkflowTaskScheduledEventAttributes scheduledAttributes =
1302
                WorkflowTaskScheduledEventAttributes.newBuilder()
1303
                    .setStartToCloseTimeout(data.startRequest.getWorkflowTaskTimeout())
1304
                    .setTaskQueue(request.getTaskQueue())
1305
                    .setAttempt(data.attempt)
1306
                    .build();
1307
            HistoryEvent scheduledEvent =
1308
                HistoryEvent.newBuilder()
1309
                    .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED)
1310
                    .setEventId(lastEventId + 1)
1311
                    .setWorkflowTaskScheduledEventAttributes(scheduledAttributes)
1312
                    .build();
1313
            events.add(scheduledEvent);
1314
            WorkflowTaskStartedEventAttributes startedAttributes =
1315
                WorkflowTaskStartedEventAttributes.newBuilder()
1316
                    .setIdentity(request.getIdentity())
1317
                    .setScheduledEventId(lastEventId + 1)
1318
                    .build();
1319
            HistoryEvent startedEvent =
1320
                HistoryEvent.newBuilder()
1321
                    .setEventId(lastEventId + 1)
1322
                    .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_STARTED)
1323
                    .setWorkflowTaskStartedEventAttributes(startedAttributes)
1324
                    .build();
1325
            events.add(startedEvent);
1326
            task.setStartedEventId(lastEventId + 2);
1327
          }
1328
          // get it from previous started event id.
1329
          task.setHistory(History.newBuilder().addAllEvents(events));
1330
          // Transfer the queries
1331
          Map<String, TestWorkflowMutableStateImpl.ConsistentQuery> queries =
1332
              data.consistentQueryRequests;
1333
          for (Map.Entry<String, TestWorkflowMutableStateImpl.ConsistentQuery> queryEntry :
1334
              queries.entrySet()) {
1335
            QueryWorkflowRequest queryWorkflowRequest = queryEntry.getValue().getRequest();
1336
            task.putQueries(queryEntry.getKey(), queryWorkflowRequest.getQuery());
1337
          }
1338
          // Transfer the messages
1339
          Map<String, UpdateWorkflowExecution> updates = data.updateRequest;
1340
          for (Map.Entry<String, UpdateWorkflowExecution> update : updates.entrySet()) {
1341
            UpdateWorkflowExecutionRequest updateRequest = update.getValue().getRequest();
1342
            Message updateMessage =
1343
                Message.newBuilder()
1344
                    .setId(update.getKey() + "/request")
1345
                    .setProtocolInstanceId(update.getKey())
1346
                    .setEventId(data.scheduledEventId)
1347
                    .setBody(Any.pack(updateRequest.getRequest()))
1348
                    .build();
1349
            task.addMessages(updateMessage);
1350
          }
1351
          if (data.lastSuccessfulStartedEventId > 0) {
1352
            task.setPreviousStartedEventId(data.lastSuccessfulStartedEventId);
1353
          }
1354
          if (!queryOnly) {
1355
            data.startedEventId = startedEventId;
1356
          }
1357
        });
1358
  }
1359

1360
  private static void startActivityTask(
1361
      RequestContext ctx,
1362
      ActivityTaskData data,
1363
      PollActivityTaskQueueRequest request,
1364
      long notUsed) {
1365
    ActivityTaskStartedEventAttributes.Builder a =
1366
        ActivityTaskStartedEventAttributes.newBuilder()
1367
            .setIdentity(request.getIdentity())
1368
            .setScheduledEventId(data.scheduledEventId);
1369
    a.setAttempt(data.getAttempt());
1370
    // Setting timestamp here as the default logic will set it to the time when it is added to the
1371
    // history. But in the case of retry it happens only after an activity completion.
1372
    Timestamp timestamp = data.store.currentTime();
1373
    HistoryEvent event =
1374
        HistoryEvent.newBuilder()
1375
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_STARTED)
1376
            .setEventTime(timestamp)
1377
            .setActivityTaskStartedEventAttributes(a)
1378
            .build();
1379
    long startedEventId;
1380
    startedEventId = NO_EVENT_ID;
1381
    ctx.onCommit(
1382
        (historySize) -> {
1383
          data.startedEventId = startedEventId;
1384
          data.startedEvent = event;
1385
          PollActivityTaskQueueResponse.Builder task = data.activityTask.getTask();
1386
          task.setTaskToken(
1387
              new ActivityTaskToken(ctx.getExecutionId(), data.scheduledEventId, task.getAttempt())
1388
                  .toBytes());
1389
          task.setStartedTime(timestamp);
1390
        });
1391
  }
1392

1393
  private static void completeWorkflowTask(
1394
      RequestContext ctx,
1395
      WorkflowTaskData data,
1396
      RespondWorkflowTaskCompletedRequest request,
1397
      long notUsed) {
1398
    WorkflowTaskCompletedEventAttributes.Builder a =
1399
        WorkflowTaskCompletedEventAttributes.newBuilder()
1400
            .setIdentity(request.getIdentity())
1401
            .setBinaryChecksum(request.getBinaryChecksum())
1402
            .setMeteringMetadata(request.getMeteringMetadata())
1403
            .setScheduledEventId(data.scheduledEventId);
1404
    HistoryEvent event =
1405
        HistoryEvent.newBuilder()
1406
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_COMPLETED)
1407
            .setWorkflowTaskCompletedEventAttributes(a)
1408
            .build();
1409
    ctx.addEvent(event);
1410
    ctx.onCommit(
1411
        (historySize) -> {
1412
          data.lastSuccessfulStartedEventId = data.startedEventId;
1413
          data.clear();
1414
        });
1415
  }
1416

1417
  private static void completeQuery(
1418
      RequestContext ctx,
1419
      WorkflowTaskData data,
1420
      RespondWorkflowTaskCompletedRequest request,
1421
      long notUsed) {
1422
    Map<String, WorkflowQueryResult> responses = request.getQueryResultsMap();
1423
    for (Map.Entry<String, WorkflowQueryResult> resultEntry : responses.entrySet()) {
1424
      TestWorkflowMutableStateImpl.ConsistentQuery query =
1425
          data.consistentQueryRequests.remove(resultEntry.getKey());
1426
      if (query != null) {
1427
        WorkflowQueryResult value = resultEntry.getValue();
1428
        CompletableFuture<QueryWorkflowResponse> result = query.getResult();
1429
        switch (value.getResultType()) {
1430
          case QUERY_RESULT_TYPE_ANSWERED:
1431
            QueryWorkflowResponse response =
1432
                QueryWorkflowResponse.newBuilder().setQueryResult(value.getAnswer()).build();
1433
            result.complete(response);
1434
            break;
1435
          case QUERY_RESULT_TYPE_FAILED:
1436
            result.completeExceptionally(
1437
                StatusUtils.newException(
1438
                    Status.INTERNAL.withDescription(value.getErrorMessage()),
1439
                    QueryFailedFailure.getDefaultInstance(),
1440
                    QueryFailedFailure.getDescriptor()));
1441
            break;
1442
          default:
1443
            throw Status.INVALID_ARGUMENT
1444
                .withDescription("Invalid query result type: " + value.getResultType())
1445
                .asRuntimeException();
1446
        }
1447
      }
1448
    }
1449
    ctx.onCommit(
1450
        (historySize) -> {
1451
          data.clear();
1452
          ctx.unlockTimer("completeQuery");
1453
        });
1454
  }
1455

1456
  private static void failQueryWorkflowTask(
1457
      RequestContext ctx, WorkflowTaskData data, Object unused, long notUsed) {
1458
    data.consistentQueryRequests
1459
        .entrySet()
1460
        .removeIf(entry -> entry.getValue().getResult().isCancelled());
1461
    if (!data.consistentQueryRequests.isEmpty()) {
1462
      ctx.setNeedWorkflowTask(true);
1463
    }
1464
    ctx.unlockTimer("failQueryWorkflowTask");
1465
  }
1466

1467
  private static void failWorkflowTask(
1468
      RequestContext ctx,
1469
      WorkflowTaskData data,
1470
      RespondWorkflowTaskFailedRequest request,
1471
      long notUsed) {
1472
    WorkflowTaskFailedEventAttributes.Builder a =
1473
        WorkflowTaskFailedEventAttributes.newBuilder()
1474
            .setIdentity(request.getIdentity())
1475
            .setStartedEventId(data.startedEventId)
1476
            .setScheduledEventId(data.scheduledEventId)
1477
            .setCause(request.getCause());
1478
    if (request.hasFailure()) {
1479
      a.setFailure(request.getFailure());
1480
    }
1481
    HistoryEvent event =
1482
        HistoryEvent.newBuilder()
1483
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_FAILED)
1484
            .setWorkflowTaskFailedEventAttributes(a)
1485
            .build();
1486
    ctx.addEvent(event);
1487
    ctx.setNeedWorkflowTask(true);
1488
  }
1489

1490
  private static void timeoutWorkflowTask(
1491
      RequestContext ctx, WorkflowTaskData data, Object ignored, long notUsed) {
1492
    WorkflowTaskTimedOutEventAttributes.Builder a =
1493
        WorkflowTaskTimedOutEventAttributes.newBuilder()
1494
            .setStartedEventId(data.startedEventId)
1495
            .setTimeoutType(TimeoutType.TIMEOUT_TYPE_START_TO_CLOSE)
1496
            .setScheduledEventId(data.scheduledEventId);
1497
    HistoryEvent event =
1498
        HistoryEvent.newBuilder()
1499
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT)
1500
            .setWorkflowTaskTimedOutEventAttributes(a)
1501
            .build();
1502
    ctx.addEvent(event);
1503
    ctx.setNeedWorkflowTask(true);
1504
  }
1505

1506
  private static void needsWorkflowTask(
1507
      RequestContext requestContext,
1508
      WorkflowTaskData workflowTaskData,
1509
      Object notUsedRequest,
1510
      long notUsed) {
1511
    requestContext.setNeedWorkflowTask(true);
1512
  }
1513

1514
  private static void completeActivityTask(
1515
      RequestContext ctx, ActivityTaskData data, Object request, long notUsed) {
1516
    data.startedEventId = ctx.addEvent(data.startedEvent);
1517
    if (request instanceof RespondActivityTaskCompletedRequest) {
1518
      completeActivityTaskByTaskToken(ctx, data, (RespondActivityTaskCompletedRequest) request);
1519
    } else if (request instanceof RespondActivityTaskCompletedByIdRequest) {
1520
      completeActivityTaskById(ctx, data, (RespondActivityTaskCompletedByIdRequest) request);
1521
    } else {
1522
      throw new IllegalArgumentException("Unknown request: " + request);
1523
    }
1524
  }
1525

1526
  private static void completeActivityTaskByTaskToken(
1527
      RequestContext ctx, ActivityTaskData data, RespondActivityTaskCompletedRequest request) {
1528
    ActivityTaskCompletedEventAttributes.Builder a =
1529
        ActivityTaskCompletedEventAttributes.newBuilder()
1530
            .setIdentity(request.getIdentity())
1531
            .setScheduledEventId(data.scheduledEventId)
1532
            .setResult(request.getResult())
1533
            .setStartedEventId(data.startedEventId);
1534
    HistoryEvent event =
1535
        HistoryEvent.newBuilder()
1536
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_COMPLETED)
1537
            .setActivityTaskCompletedEventAttributes(a)
1538
            .build();
1539
    ctx.addEvent(event);
1540
  }
1541

1542
  private static void completeActivityTaskById(
1543
      RequestContext ctx, ActivityTaskData data, RespondActivityTaskCompletedByIdRequest request) {
1544
    ActivityTaskCompletedEventAttributes.Builder a =
1545
        ActivityTaskCompletedEventAttributes.newBuilder()
1546
            .setIdentity(request.getIdentity())
1547
            .setScheduledEventId(data.scheduledEventId)
1548
            .setResult(request.getResult())
1549
            .setStartedEventId(data.startedEventId);
1550
    HistoryEvent event =
1551
        HistoryEvent.newBuilder()
1552
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_COMPLETED)
1553
            .setActivityTaskCompletedEventAttributes(a)
1554
            .build();
1555
    ctx.addEvent(event);
1556
  }
1557

1558
  private static State failActivityTask(
1559
      RequestContext ctx, ActivityTaskData data, Object request, long notUsed) {
1560
    if (request instanceof RespondActivityTaskFailedRequest) {
1561
      RespondActivityTaskFailedRequest req = (RespondActivityTaskFailedRequest) request;
1562
      return failActivityTaskByRequestType(ctx, data, req.getFailure(), req.getIdentity());
1563
    } else if (request instanceof RespondActivityTaskFailedByIdRequest) {
1564
      RespondActivityTaskFailedByIdRequest req = (RespondActivityTaskFailedByIdRequest) request;
1565
      return failActivityTaskByRequestType(ctx, data, req.getFailure(), req.getIdentity());
1566
    } else {
1567
      throw new IllegalArgumentException("Unknown request: " + request);
1568
    }
1569
  }
1570

1571
  private static State failActivityTaskByRequestType(
1572
      RequestContext ctx, ActivityTaskData data, Failure failure, String identity) {
1573
    if (!failure.hasApplicationFailureInfo()) {
1574
      throw new IllegalArgumentException(
1575
          "Failure must have ApplicationFailureInfo. Got: " + failure);
1576
    }
1577
    RetryState retryState = attemptActivityRetry(ctx, Optional.of(failure), data);
1578
    if (retryState == RetryState.RETRY_STATE_IN_PROGRESS) {
1579
      return INITIATED;
1580
    }
1581
    data.startedEventId = ctx.addEvent(data.startedEvent);
1582
    ActivityTaskFailedEventAttributes.Builder attributes =
1583
        ActivityTaskFailedEventAttributes.newBuilder()
1584
            .setIdentity(identity)
1585
            .setScheduledEventId(data.scheduledEventId)
1586
            .setFailure(failure)
1587
            .setRetryState(retryState)
1588
            .setStartedEventId(data.startedEventId);
1589
    HistoryEvent event =
1590
        HistoryEvent.newBuilder()
1591
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_FAILED)
1592
            .setActivityTaskFailedEventAttributes(attributes)
1593
            .build();
1594
    ctx.addEvent(event);
1595
    return FAILED;
1596
  }
1597

1598
  private static State timeoutActivityTask(
1599
      RequestContext ctx, ActivityTaskData data, TimeoutType timeoutType, long notUsed) {
1600
    Optional<Failure> previousFailure = data.retryState.getPreviousRunFailure();
1601

1602
    // chaining with the previous run failure if we are preparing the final failure
1603
    Failure failure =
1604
        newTimeoutFailure(timeoutType, Optional.ofNullable(data.heartbeatDetails), previousFailure);
1605

1606
    RetryState retryState;
1607
    switch (timeoutType) {
1608
      case TIMEOUT_TYPE_SCHEDULE_TO_START:
1609
      case TIMEOUT_TYPE_SCHEDULE_TO_CLOSE:
1610
        // ScheduleToStart (queue timeout) is not retryable. Instead of the retry, a customer should
1611
        // set a larger ScheduleToStart timeout.
1612
        // ScheduleToClose timeout is final and failure is created with TIMEOUT retry state
1613
        retryState = RetryState.RETRY_STATE_TIMEOUT;
1614
        break;
1615
      case TIMEOUT_TYPE_START_TO_CLOSE:
1616
      case TIMEOUT_TYPE_HEARTBEAT:
1617
        // not chaining with the previous run failure if we are preparing the failure to be stored
1618
        // for the next iteration
1619
        Optional<Failure> lastFailure =
1620
            Optional.of(
1621
                newTimeoutFailure(
1622
                    timeoutType,
1623
                    // we move heartbeatDetails to the new top level (this cause is used for
1624
                    // scheduleToClose only)
1625
                    Optional.empty(),
1626
                    // prune to don't have too deep nesting of failures
1627
                    Optional.empty()));
1628

1629
        retryState = attemptActivityRetry(ctx, lastFailure, data);
1630
        if (retryState == RetryState.RETRY_STATE_IN_PROGRESS) {
1631
          return INITIATED;
1632
        } else if (retryState == RetryState.RETRY_STATE_TIMEOUT) {
1633
          // if retryState = RETRY_STATE_TIMEOUT, it means scheduleToClose timeout happened inside
1634
          // attemptActivityRetry();
1635
          // start to close timeout would return as "max attempts reached".
1636

1637
          Preconditions.checkState(
1638
              timeoutType == TimeoutType.TIMEOUT_TYPE_START_TO_CLOSE
1639
                  || timeoutType == TimeoutType.TIMEOUT_TYPE_HEARTBEAT,
1640
              "Unexpected timeout type: %s. We should end up here only in case of HEARTBEAT_TIMEOUT or START_TO_CLOSE_TIMEOUT",
1641
              timeoutType);
1642

1643
          // heartbeat is preserved as the cause for the scheduleToClose timeout
1644
          // But we effectively omit startToClose timeout with scheduleToClose timeout
1645
          Optional<Failure> cause =
1646
              timeoutType == TimeoutType.TIMEOUT_TYPE_HEARTBEAT ? lastFailure : previousFailure;
1647

1648
          failure =
1649
              newTimeoutFailure(
1650
                  TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE,
1651
                  Optional.ofNullable(data.heartbeatDetails),
1652
                  cause);
1653
        }
1654
        break;
1655
      default:
1656
        throw new IllegalStateException(
1657
            "Not implemented behavior for timeout type: " + timeoutType);
1658
    }
1659

1660
    ActivityTaskTimedOutEventAttributes.Builder a =
1661
        ActivityTaskTimedOutEventAttributes.newBuilder()
1662
            .setScheduledEventId(data.scheduledEventId)
1663
            .setRetryState(retryState)
1664
            .setStartedEventId(data.startedEventId)
1665
            .setFailure(failure);
1666
    HistoryEvent event =
1667
        HistoryEvent.newBuilder()
1668
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_TIMED_OUT)
1669
            .setActivityTaskTimedOutEventAttributes(a)
1670
            .build();
1671
    ctx.addEvent(event);
1672
    return TIMED_OUT;
1673
  }
1674

1675
  private static Failure newTimeoutFailure(
1676
      TimeoutType timeoutType, Optional<Payloads> lastHeartbeatDetails, Optional<Failure> cause) {
1677
    TimeoutFailureInfo.Builder info = TimeoutFailureInfo.newBuilder().setTimeoutType(timeoutType);
1678
    if (lastHeartbeatDetails.isPresent()) {
1679
      info.setLastHeartbeatDetails(lastHeartbeatDetails.get());
1680
    }
1681
    Failure.Builder result = Failure.newBuilder().setTimeoutFailureInfo(info);
1682
    if (cause.isPresent()) {
1683
      result.setCause(cause.get());
1684
    }
1685
    return result.build();
1686
  }
1687

1688
  private static RetryState attemptActivityRetry(
1689
      RequestContext ctx, Optional<Failure> failure, ActivityTaskData data) {
1690
    if (data.retryState == null) {
1691
      throw new IllegalStateException("RetryPolicy is always present");
1692
    }
1693
    Optional<ApplicationFailureInfo> info = failure.map(Failure::getApplicationFailureInfo);
1694
    if (info.isPresent()) {
1695
      if (info.get().getNonRetryable()) {
1696
        return RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE;
1697
      }
1698
    }
1699
    TestServiceRetryState nextAttempt = data.retryState.getNextAttempt(failure);
1700
    TestServiceRetryState.BackoffInterval backoffInterval =
1701
        data.retryState.getBackoffIntervalInSeconds(
1702
            info.map(ApplicationFailureInfo::getType), data.store.currentTime());
1703
    if (backoffInterval.getRetryState() == RetryState.RETRY_STATE_IN_PROGRESS) {
1704
      data.nextBackoffInterval = ProtobufTimeUtils.toProtoDuration(backoffInterval.getInterval());
1705
      PollActivityTaskQueueResponse.Builder task = data.activityTask.getTask();
1706
      if (data.heartbeatDetails != null) {
1707
        task.setHeartbeatDetails(data.heartbeatDetails);
1708
      }
1709
      ctx.onCommit(
1710
          (historySize) -> {
1711
            data.retryState = nextAttempt;
1712
            task.setAttempt(nextAttempt.getAttempt());
1713
            task.setCurrentAttemptScheduledTime(ctx.currentTime());
1714
          });
1715
    } else {
1716
      data.nextBackoffInterval = Durations.ZERO;
1717
    }
1718
    return backoffInterval.getRetryState();
1719
  }
1720

1721
  private static void reportActivityTaskCancellation(
1722
      RequestContext ctx, ActivityTaskData data, Object request, long notUsed) {
1723
    Payloads details = null;
1724
    if (request instanceof RespondActivityTaskCanceledRequest) {
1725
      {
1726
        RespondActivityTaskCanceledRequest cr = (RespondActivityTaskCanceledRequest) request;
1727

1728
        details = cr.hasDetails() ? cr.getDetails() : null;
1729
      }
1730
    } else if (request instanceof RespondActivityTaskCanceledByIdRequest) {
1731
      {
1732
        RespondActivityTaskCanceledByIdRequest cr =
1733
            (RespondActivityTaskCanceledByIdRequest) request;
1734
        details = cr.hasDetails() ? cr.getDetails() : null;
1735
      }
1736
    } else if (request != null) {
1737
      throw Status.INTERNAL
1738
          .withDescription("Unexpected request type: " + request)
1739
          .asRuntimeException();
1740
    }
1741
    ActivityTaskCanceledEventAttributes.Builder a =
1742
        ActivityTaskCanceledEventAttributes.newBuilder()
1743
            .setScheduledEventId(data.scheduledEventId)
1744
            .setStartedEventId(data.startedEventId);
1745
    if (details != null) {
1746
      a.setDetails(details);
1747
    }
1748
    HistoryEvent event =
1749
        HistoryEvent.newBuilder()
1750
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_CANCELED)
1751
            .setActivityTaskCanceledEventAttributes(a)
1752
            .build();
1753
    ctx.addEvent(event);
1754
  }
1755

1756
  private static void heartbeatActivityTask(
1757
      RequestContext nullCtx, ActivityTaskData data, Payloads details, long notUsed) {
1758
    data.heartbeatDetails = details;
1759
  }
1760

1761
  private static void acceptUpdate(
1762
      RequestContext ctx,
1763
      UpdateWorkflowExecutionData data,
1764
      Message msg,
1765
      long workflowTaskCompletedEventId) {
1766
    try {
1767
      Acceptance acceptance = msg.getBody().unpack(Acceptance.class);
1768

1769
      WorkflowExecutionUpdateAcceptedEventAttributes acceptedAttribute =
1770
          WorkflowExecutionUpdateAcceptedEventAttributes.newBuilder()
1771
              .setAcceptedRequestSequencingEventId(workflowTaskCompletedEventId - 1)
1772
              .setProtocolInstanceId(msg.getProtocolInstanceId())
1773
              .setAcceptedRequestMessageId(acceptance.getAcceptedRequestMessageId())
1774
              .setAcceptedRequest(acceptance.getAcceptedRequest())
1775
              .build();
1776

1777
      HistoryEvent event =
1778
          HistoryEvent.newBuilder()
1779
              .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED)
1780
              .setWorkflowExecutionUpdateAcceptedEventAttributes(acceptedAttribute)
1781
              .build();
1782
      // If the workflow is finished we can't write more events
1783
      // to history so if the message was processed after the workflow
1784
      // was closed there is nothing we can do.
1785
      // The real server also has this same problem
1786
      if (!ctx.getWorkflowMutableState().isTerminalState()) {
1787
        ctx.addEvent(event);
1788
      }
1789

1790
      UpdateWorkflowExecutionResponse response =
1791
          UpdateWorkflowExecutionResponse.newBuilder()
1792
              .setUpdateRef(
1793
                  UpdateRef.newBuilder()
1794
                      .setWorkflowExecution(ctx.getExecution())
1795
                      .setUpdateId(data.id))
1796
              .build();
1797

1798
      data.acceptance.complete(response);
1799
    } catch (InvalidProtocolBufferException e) {
1800
      throw new RuntimeException(e);
1801
    }
1802
  }
1803

1804
  private static void completeUpdate(
1805
      RequestContext ctx,
1806
      UpdateWorkflowExecutionData data,
1807
      Message msg,
1808
      long workflowTaskCompletedEventId) {
1809
    try {
1810
      Response response = msg.getBody().unpack(Response.class);
1811

1812
      WorkflowExecutionUpdateCompletedEventAttributes completedAttribute =
1813
          WorkflowExecutionUpdateCompletedEventAttributes.newBuilder()
1814
              .setMeta(response.getMeta())
1815
              .setOutcome(response.getOutcome())
1816
              .build();
1817

1818
      HistoryEvent event =
1819
          HistoryEvent.newBuilder()
1820
              .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_COMPLETED)
1821
              .setWorkflowExecutionUpdateCompletedEventAttributes(completedAttribute)
1822
              .build();
1823
      // If the workflow is finished we can't write more events
1824
      // to history so if the message was processed after the workflow
1825
      // was closed there is nothing we can do.
1826
      // The real server also has this same problem
1827
      if (!ctx.getWorkflowMutableState().isTerminalState()) {
1828
        ctx.addEvent(event);
1829
      }
1830

1831
      UpdateWorkflowExecutionResponse updateResponse =
1832
          UpdateWorkflowExecutionResponse.newBuilder()
1833
              .setUpdateRef(
1834
                  UpdateRef.newBuilder()
1835
                      .setWorkflowExecution(ctx.getExecution())
1836
                      .setUpdateId(data.id))
1837
              .setOutcome(response.getOutcome())
1838
              .build();
1839

1840
      data.complete.complete(updateResponse);
1841
    } catch (InvalidProtocolBufferException e) {
1842
      throw new RuntimeException(e);
1843
    }
1844
  }
1845

1846
  private static void startTimer(
1847
      RequestContext ctx,
1848
      TimerData data,
1849
      StartTimerCommandAttributes d,
1850
      long workflowTaskCompletedEventId) {
1851
    TimerStartedEventAttributes.Builder a =
1852
        TimerStartedEventAttributes.newBuilder()
1853
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1854
            .setStartToFireTimeout(d.getStartToFireTimeout())
1855
            .setTimerId(d.getTimerId());
1856
    HistoryEvent event =
1857
        HistoryEvent.newBuilder()
1858
            .setEventType(EventType.EVENT_TYPE_TIMER_STARTED)
1859
            .setTimerStartedEventAttributes(a)
1860
            .build();
1861
    long startedEventId = ctx.addEvent(event);
1862
    ctx.onCommit(
1863
        (historySize) -> {
1864
          data.startedEvent = a.build();
1865
          data.startedEventId = startedEventId;
1866
        });
1867
  }
1868

1869
  private static void fireTimer(RequestContext ctx, TimerData data, Object ignored, long notUsed) {
1870
    TimerFiredEventAttributes.Builder a =
1871
        TimerFiredEventAttributes.newBuilder()
1872
            .setTimerId(data.startedEvent.getTimerId())
1873
            .setStartedEventId(data.startedEventId);
1874
    HistoryEvent event =
1875
        HistoryEvent.newBuilder()
1876
            .setEventType(EventType.EVENT_TYPE_TIMER_FIRED)
1877
            .setTimerFiredEventAttributes(a)
1878
            .build();
1879
    ctx.addEvent(event);
1880
  }
1881

1882
  private static void cancelTimer(
1883
      RequestContext ctx,
1884
      TimerData data,
1885
      CancelTimerCommandAttributes d,
1886
      long workflowTaskCompletedEventId) {
1887
    TimerCanceledEventAttributes.Builder a =
1888
        TimerCanceledEventAttributes.newBuilder()
1889
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1890
            .setTimerId(d.getTimerId())
1891
            .setStartedEventId(data.startedEventId);
1892
    HistoryEvent event =
1893
        HistoryEvent.newBuilder()
1894
            .setEventType(EventType.EVENT_TYPE_TIMER_CANCELED)
1895
            .setTimerCanceledEventAttributes(a)
1896
            .build();
1897
    ctx.addEvent(event);
1898
  }
1899

1900
  private static void initiateExternalSignal(
1901
      RequestContext ctx,
1902
      SignalExternalData data,
1903
      SignalExternalWorkflowExecutionCommandAttributes d,
1904
      long workflowTaskCompletedEventId) {
1905
    SignalExternalWorkflowExecutionInitiatedEventAttributes.Builder a =
1906
        SignalExternalWorkflowExecutionInitiatedEventAttributes.newBuilder()
1907
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1908
            .setControl(d.getControl())
1909
            .setInput(d.getInput())
1910
            .setNamespace(d.getNamespace())
1911
            .setChildWorkflowOnly(d.getChildWorkflowOnly())
1912
            .setSignalName(d.getSignalName())
1913
            .setWorkflowExecution(d.getExecution());
1914

1915
    HistoryEvent event =
1916
        HistoryEvent.newBuilder()
1917
            .setEventType(EventType.EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED)
1918
            .setSignalExternalWorkflowExecutionInitiatedEventAttributes(a)
1919
            .build();
1920
    long initiatedEventId = ctx.addEvent(event);
1921
    ctx.onCommit(
1922
        (historySize) -> {
1923
          data.initiatedEventId = initiatedEventId;
1924
          data.initiatedEvent = a.build();
1925
        });
1926
  }
1927

1928
  private static void failExternalSignal(
1929
      RequestContext ctx,
1930
      SignalExternalData data,
1931
      SignalExternalWorkflowExecutionFailedCause cause,
1932
      long notUsed) {
1933
    SignalExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent = data.initiatedEvent;
1934
    SignalExternalWorkflowExecutionFailedEventAttributes.Builder a =
1935
        SignalExternalWorkflowExecutionFailedEventAttributes.newBuilder()
1936
            .setInitiatedEventId(data.initiatedEventId)
1937
            .setWorkflowExecution(initiatedEvent.getWorkflowExecution())
1938
            .setControl(initiatedEvent.getControl())
1939
            .setCause(cause)
1940
            .setNamespace(initiatedEvent.getNamespace());
1941
    HistoryEvent event =
1942
        HistoryEvent.newBuilder()
1943
            .setEventType(EventType.EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED)
1944
            .setSignalExternalWorkflowExecutionFailedEventAttributes(a)
1945
            .build();
1946
    ctx.addEvent(event);
1947
  }
1948

1949
  private static void completeExternalSignal(
1950
      RequestContext ctx, SignalExternalData data, String runId, long notUsed) {
1951
    SignalExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent = data.initiatedEvent;
1952
    WorkflowExecution signaledExecution =
1953
        initiatedEvent.getWorkflowExecution().toBuilder().setRunId(runId).build();
1954
    ExternalWorkflowExecutionSignaledEventAttributes.Builder a =
1955
        ExternalWorkflowExecutionSignaledEventAttributes.newBuilder()
1956
            .setInitiatedEventId(data.initiatedEventId)
1957
            .setWorkflowExecution(signaledExecution)
1958
            .setControl(initiatedEvent.getControl())
1959
            .setNamespace(initiatedEvent.getNamespace());
1960
    HistoryEvent event =
1961
        HistoryEvent.newBuilder()
1962
            .setEventType(EventType.EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_SIGNALED)
1963
            .setExternalWorkflowExecutionSignaledEventAttributes(a)
1964
            .build();
1965
    ctx.addEvent(event);
1966
  }
1967

1968
  private static void initiateExternalCancellation(
1969
      RequestContext ctx,
1970
      CancelExternalData data,
1971
      RequestCancelExternalWorkflowExecutionCommandAttributes d,
1972
      long workflowTaskCompletedEventId) {
1973
    RequestCancelExternalWorkflowExecutionInitiatedEventAttributes.Builder a =
1974
        RequestCancelExternalWorkflowExecutionInitiatedEventAttributes.newBuilder()
1975
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1976
            .setControl(d.getControl())
1977
            .setNamespace(d.getNamespace())
1978
            .setChildWorkflowOnly(d.getChildWorkflowOnly())
1979
            .setWorkflowExecution(
1980
                WorkflowExecution.newBuilder()
1981
                    .setWorkflowId(d.getWorkflowId())
1982
                    .setRunId(d.getRunId())
1983
                    .build());
1984

1985
    HistoryEvent event =
1986
        HistoryEvent.newBuilder()
1987
            .setEventType(EventType.EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED)
1988
            .setRequestCancelExternalWorkflowExecutionInitiatedEventAttributes(a)
1989
            .build();
1990
    long initiatedEventId = ctx.addEvent(event);
1991
    ctx.onCommit(
1992
        (historySize) -> {
1993
          data.initiatedEventId = initiatedEventId;
1994
          data.initiatedEvent = a.build();
1995
        });
1996
  }
1997

1998
  private static void reportExternalCancellationRequested(
1999
      RequestContext ctx, CancelExternalData data, String runId, long notUsed) {
2000
    RequestCancelExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent =
2001
        data.initiatedEvent;
2002
    ExternalWorkflowExecutionCancelRequestedEventAttributes.Builder a =
2003
        ExternalWorkflowExecutionCancelRequestedEventAttributes.newBuilder()
2004
            .setInitiatedEventId(data.initiatedEventId)
2005
            .setWorkflowExecution(
2006
                WorkflowExecution.newBuilder()
2007
                    .setRunId(runId)
2008
                    .setWorkflowId(initiatedEvent.getWorkflowExecution().getWorkflowId())
2009
                    .build())
2010
            .setNamespace(initiatedEvent.getNamespace());
2011
    HistoryEvent event =
2012
        HistoryEvent.newBuilder()
2013
            .setEventType(EventType.EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_CANCEL_REQUESTED)
2014
            .setExternalWorkflowExecutionCancelRequestedEventAttributes(a)
2015
            .build();
2016
    ctx.addEvent(event);
2017
  }
2018

2019
  private static void failExternalCancellation(
2020
      RequestContext ctx,
2021
      CancelExternalData data,
2022
      CancelExternalWorkflowExecutionFailedCause cause,
2023
      long notUsed) {
2024
    RequestCancelExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent =
2025
        data.initiatedEvent;
2026
    RequestCancelExternalWorkflowExecutionFailedEventAttributes.Builder a =
2027
        RequestCancelExternalWorkflowExecutionFailedEventAttributes.newBuilder()
2028
            .setInitiatedEventId(data.initiatedEventId)
2029
            .setWorkflowExecution(initiatedEvent.getWorkflowExecution())
2030
            .setControl(initiatedEvent.getControl())
2031
            .setCause(cause)
2032
            .setNamespace(initiatedEvent.getNamespace());
2033
    HistoryEvent event =
2034
        HistoryEvent.newBuilder()
2035
            .setEventType(EventType.EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_FAILED)
2036
            .setRequestCancelExternalWorkflowExecutionFailedEventAttributes(a)
2037
            .build();
2038
    ctx.addEvent(event);
2039
  }
2040

2041
  // Mimics the default activity retry policy of a standard Temporal server.
2042
  static RetryPolicy ensureDefaultFieldsForActivityRetryPolicy(RetryPolicy originalPolicy) {
2043
    Duration initialInterval =
2044
        Durations.compare(originalPolicy.getInitialInterval(), Durations.ZERO) == 0
2045
            ? DEFAULT_ACTIVITY_RETRY_INITIAL_INTERVAL
2046
            : originalPolicy.getInitialInterval();
2047

2048
    return RetryPolicy.newBuilder()
2049
        .setInitialInterval(initialInterval)
2050
        .addAllNonRetryableErrorTypes(originalPolicy.getNonRetryableErrorTypesList())
2051
        .setMaximumInterval(
2052
            Durations.compare(originalPolicy.getMaximumInterval(), Durations.ZERO) == 0
2053
                ? Durations.fromMillis(
2054
                    DEFAULT_ACTIVITY_MAXIMUM_INTERVAL_COEFFICIENT
2055
                        * Durations.toMillis(initialInterval))
2056
                : originalPolicy.getMaximumInterval())
2057
        .setBackoffCoefficient(
2058
            originalPolicy.getBackoffCoefficient() == 0
2059
                ? DEFAULT_ACTIVITY_RETRY_BACKOFF_COEFFICIENT
2060
                : originalPolicy.getBackoffCoefficient())
2061
        .setMaximumAttempts(
2062
            originalPolicy.getMaximumAttempts() == 0
2063
                ? DEFAULT_ACTIVITY_RETRY_MAXIMUM_ATTEMPTS
2064
                : originalPolicy.getMaximumAttempts())
2065
        .build();
2066
  }
2067
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc