• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #153

pending completion
#153

push

github-actions

web-flow
Eager Workflow Task Dispatch (#1674)

Issue #1646

Signed-off-by: Dmitry Spikhalskiy <dmitry@spikhalskiy.com>

213 of 213 new or added lines in 22 files covered. (100.0%)

16682 of 20566 relevant lines covered (81.11%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.38
/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.testservice;
22

23
import static io.temporal.internal.testservice.StateMachines.Action.CANCEL;
24
import static io.temporal.internal.testservice.StateMachines.Action.COMPLETE;
25
import static io.temporal.internal.testservice.StateMachines.Action.CONTINUE_AS_NEW;
26
import static io.temporal.internal.testservice.StateMachines.Action.FAIL;
27
import static io.temporal.internal.testservice.StateMachines.Action.INITIATE;
28
import static io.temporal.internal.testservice.StateMachines.Action.QUERY;
29
import static io.temporal.internal.testservice.StateMachines.Action.REQUEST_CANCELLATION;
30
import static io.temporal.internal.testservice.StateMachines.Action.START;
31
import static io.temporal.internal.testservice.StateMachines.Action.TERMINATE;
32
import static io.temporal.internal.testservice.StateMachines.Action.TIME_OUT;
33
import static io.temporal.internal.testservice.StateMachines.Action.UPDATE;
34
import static io.temporal.internal.testservice.StateMachines.State.CANCELED;
35
import static io.temporal.internal.testservice.StateMachines.State.CANCELLATION_REQUESTED;
36
import static io.temporal.internal.testservice.StateMachines.State.COMPLETED;
37
import static io.temporal.internal.testservice.StateMachines.State.CONTINUED_AS_NEW;
38
import static io.temporal.internal.testservice.StateMachines.State.FAILED;
39
import static io.temporal.internal.testservice.StateMachines.State.INITIATED;
40
import static io.temporal.internal.testservice.StateMachines.State.NONE;
41
import static io.temporal.internal.testservice.StateMachines.State.STARTED;
42
import static io.temporal.internal.testservice.StateMachines.State.TERMINATED;
43
import static io.temporal.internal.testservice.StateMachines.State.TIMED_OUT;
44

45
import com.google.common.base.Preconditions;
46
import com.google.protobuf.Duration;
47
import com.google.protobuf.Timestamp;
48
import com.google.protobuf.util.Durations;
49
import com.google.protobuf.util.Timestamps;
50
import io.grpc.Status;
51
import io.grpc.StatusRuntimeException;
52
import io.temporal.api.command.v1.CancelTimerCommandAttributes;
53
import io.temporal.api.command.v1.CancelWorkflowExecutionCommandAttributes;
54
import io.temporal.api.command.v1.CompleteWorkflowExecutionCommandAttributes;
55
import io.temporal.api.command.v1.ContinueAsNewWorkflowExecutionCommandAttributes;
56
import io.temporal.api.command.v1.FailWorkflowExecutionCommandAttributes;
57
import io.temporal.api.command.v1.RequestCancelActivityTaskCommandAttributes;
58
import io.temporal.api.command.v1.RequestCancelExternalWorkflowExecutionCommandAttributes;
59
import io.temporal.api.command.v1.ScheduleActivityTaskCommandAttributes;
60
import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes;
61
import io.temporal.api.command.v1.StartChildWorkflowExecutionCommandAttributes;
62
import io.temporal.api.command.v1.StartTimerCommandAttributes;
63
import io.temporal.api.common.v1.Payloads;
64
import io.temporal.api.common.v1.RetryPolicy;
65
import io.temporal.api.common.v1.WorkflowExecution;
66
import io.temporal.api.enums.v1.CancelExternalWorkflowExecutionFailedCause;
67
import io.temporal.api.enums.v1.EventType;
68
import io.temporal.api.enums.v1.RetryState;
69
import io.temporal.api.enums.v1.SignalExternalWorkflowExecutionFailedCause;
70
import io.temporal.api.enums.v1.StartChildWorkflowExecutionFailedCause;
71
import io.temporal.api.enums.v1.TimeoutType;
72
import io.temporal.api.errordetails.v1.QueryFailedFailure;
73
import io.temporal.api.failure.v1.ApplicationFailureInfo;
74
import io.temporal.api.failure.v1.Failure;
75
import io.temporal.api.failure.v1.TimeoutFailureInfo;
76
import io.temporal.api.history.v1.ActivityTaskCancelRequestedEventAttributes;
77
import io.temporal.api.history.v1.ActivityTaskCanceledEventAttributes;
78
import io.temporal.api.history.v1.ActivityTaskCompletedEventAttributes;
79
import io.temporal.api.history.v1.ActivityTaskFailedEventAttributes;
80
import io.temporal.api.history.v1.ActivityTaskScheduledEventAttributes;
81
import io.temporal.api.history.v1.ActivityTaskStartedEventAttributes;
82
import io.temporal.api.history.v1.ActivityTaskTimedOutEventAttributes;
83
import io.temporal.api.history.v1.ChildWorkflowExecutionCanceledEventAttributes;
84
import io.temporal.api.history.v1.ChildWorkflowExecutionCompletedEventAttributes;
85
import io.temporal.api.history.v1.ChildWorkflowExecutionFailedEventAttributes;
86
import io.temporal.api.history.v1.ChildWorkflowExecutionStartedEventAttributes;
87
import io.temporal.api.history.v1.ChildWorkflowExecutionTimedOutEventAttributes;
88
import io.temporal.api.history.v1.ExternalWorkflowExecutionCancelRequestedEventAttributes;
89
import io.temporal.api.history.v1.ExternalWorkflowExecutionSignaledEventAttributes;
90
import io.temporal.api.history.v1.History;
91
import io.temporal.api.history.v1.HistoryEvent;
92
import io.temporal.api.history.v1.RequestCancelExternalWorkflowExecutionFailedEventAttributes;
93
import io.temporal.api.history.v1.RequestCancelExternalWorkflowExecutionInitiatedEventAttributes;
94
import io.temporal.api.history.v1.SignalExternalWorkflowExecutionFailedEventAttributes;
95
import io.temporal.api.history.v1.SignalExternalWorkflowExecutionInitiatedEventAttributes;
96
import io.temporal.api.history.v1.StartChildWorkflowExecutionFailedEventAttributes;
97
import io.temporal.api.history.v1.StartChildWorkflowExecutionInitiatedEventAttributes;
98
import io.temporal.api.history.v1.TimerCanceledEventAttributes;
99
import io.temporal.api.history.v1.TimerFiredEventAttributes;
100
import io.temporal.api.history.v1.TimerStartedEventAttributes;
101
import io.temporal.api.history.v1.WorkflowExecutionCancelRequestedEventAttributes;
102
import io.temporal.api.history.v1.WorkflowExecutionCanceledEventAttributes;
103
import io.temporal.api.history.v1.WorkflowExecutionCompletedEventAttributes;
104
import io.temporal.api.history.v1.WorkflowExecutionContinuedAsNewEventAttributes;
105
import io.temporal.api.history.v1.WorkflowExecutionFailedEventAttributes;
106
import io.temporal.api.history.v1.WorkflowExecutionStartedEventAttributes;
107
import io.temporal.api.history.v1.WorkflowExecutionTerminatedEventAttributes;
108
import io.temporal.api.history.v1.WorkflowExecutionTimedOutEventAttributes;
109
import io.temporal.api.history.v1.WorkflowTaskCompletedEventAttributes;
110
import io.temporal.api.history.v1.WorkflowTaskFailedEventAttributes;
111
import io.temporal.api.history.v1.WorkflowTaskScheduledEventAttributes;
112
import io.temporal.api.history.v1.WorkflowTaskStartedEventAttributes;
113
import io.temporal.api.history.v1.WorkflowTaskTimedOutEventAttributes;
114
import io.temporal.api.query.v1.WorkflowQueryResult;
115
import io.temporal.api.taskqueue.v1.StickyExecutionAttributes;
116
import io.temporal.api.taskqueue.v1.TaskQueue;
117
import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryRequest;
118
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueRequest;
119
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
120
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueRequest;
121
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse;
122
import io.temporal.api.workflowservice.v1.QueryWorkflowRequest;
123
import io.temporal.api.workflowservice.v1.QueryWorkflowResponse;
124
import io.temporal.api.workflowservice.v1.RequestCancelWorkflowExecutionRequest;
125
import io.temporal.api.workflowservice.v1.RespondActivityTaskCanceledByIdRequest;
126
import io.temporal.api.workflowservice.v1.RespondActivityTaskCanceledRequest;
127
import io.temporal.api.workflowservice.v1.RespondActivityTaskCompletedByIdRequest;
128
import io.temporal.api.workflowservice.v1.RespondActivityTaskCompletedRequest;
129
import io.temporal.api.workflowservice.v1.RespondActivityTaskFailedByIdRequest;
130
import io.temporal.api.workflowservice.v1.RespondActivityTaskFailedRequest;
131
import io.temporal.api.workflowservice.v1.RespondWorkflowTaskCompletedRequest;
132
import io.temporal.api.workflowservice.v1.RespondWorkflowTaskFailedRequest;
133
import io.temporal.api.workflowservice.v1.StartWorkflowExecutionRequest;
134
import io.temporal.api.workflowservice.v1.TerminateWorkflowExecutionRequest;
135
import io.temporal.internal.common.ProtobufTimeUtils;
136
import io.temporal.internal.testservice.TestWorkflowStore.ActivityTask;
137
import io.temporal.internal.testservice.TestWorkflowStore.TaskQueueId;
138
import io.temporal.internal.testservice.TestWorkflowStore.WorkflowTask;
139
import io.temporal.serviceclient.StatusUtils;
140
import io.temporal.workflow.Functions;
141
import java.util.ArrayList;
142
import java.util.HashMap;
143
import java.util.List;
144
import java.util.Map;
145
import java.util.Objects;
146
import java.util.Optional;
147
import java.util.OptionalLong;
148
import java.util.UUID;
149
import java.util.concurrent.CompletableFuture;
150
import java.util.concurrent.ForkJoinPool;
151
import javax.annotation.Nonnull;
152
import org.slf4j.Logger;
153
import org.slf4j.LoggerFactory;
154

155
class StateMachines {
×
156

157
  private static final Logger log = LoggerFactory.getLogger(StateMachines.class);
1✔
158

159
  public static final long DEFAULT_WORKFLOW_EXECUTION_TIMEOUT_MILLISECONDS =
160
      10L * 365 * 24 * 3600 * 1000;
161
  public static final long DEFAULT_WORKFLOW_TASK_TIMEOUT_MILLISECONDS = 10L * 1000;
162
  public static final long MAX_WORKFLOW_TASK_TIMEOUT_MILLISECONDS = 60L * 1000;
163
  static final Duration DEFAULT_ACTIVITY_RETRY_INITIAL_INTERVAL = Durations.fromSeconds(1);
1✔
164
  static final double DEFAULT_ACTIVITY_RETRY_BACKOFF_COEFFICIENT = 2.0;
165
  static final int DEFAULT_ACTIVITY_RETRY_MAXIMUM_ATTEMPTS = 0;
166
  static final int DEFAULT_ACTIVITY_MAXIMUM_INTERVAL_COEFFICIENT = 100;
167
  static final int NO_EVENT_ID = -1;
168

169
  enum State {
1✔
170
    NONE,
1✔
171
    INITIATED,
1✔
172
    STARTED,
1✔
173
    FAILED,
1✔
174
    TIMED_OUT,
1✔
175
    CANCELLATION_REQUESTED,
1✔
176
    CANCELED,
1✔
177
    COMPLETED,
1✔
178
    CONTINUED_AS_NEW,
1✔
179
    TERMINATED,
1✔
180
  }
181

182
  enum Action {
1✔
183
    INITIATE,
1✔
184
    START,
1✔
185
    FAIL,
1✔
186
    TIME_OUT,
1✔
187
    REQUEST_CANCELLATION,
1✔
188
    CANCEL,
1✔
189
    TERMINATE,
1✔
190
    UPDATE,
1✔
191
    COMPLETE,
1✔
192
    CONTINUE_AS_NEW,
1✔
193
    QUERY
1✔
194
  }
195

196
  static final class WorkflowData {
197
    Optional<TestServiceRetryState> retryState;
198
    Duration backoffStartInterval;
199
    String cronSchedule;
200
    Payloads lastCompletionResult;
201
    Optional<Failure> lastFailure;
202
    /**
203
     * @see WorkflowExecutionStartedEventAttributes#getFirstExecutionRunId()
204
     */
205
    final @Nonnull String firstExecutionRunId;
206
    /**
207
     * @see WorkflowExecutionStartedEventAttributes#getOriginalExecutionRunId()
208
     */
209
    final @Nonnull String originalExecutionRunId;
210

211
    /** RunId that was continued by this run as a result of Retry or Continue-As-New. */
212
    Optional<String> continuedExecutionRunId;
213

214
    Functions.Proc runTimerCancellationHandle;
215

216
    WorkflowData(
217
        Optional<TestServiceRetryState> retryState,
218
        Duration backoffStartInterval,
219
        String cronSchedule,
220
        Payloads lastCompletionResult,
221
        Optional<Failure> lastFailure,
222
        @Nonnull String firstExecutionRunId,
223
        @Nonnull String originalExecutionRunId,
224
        Optional<String> continuedExecutionRunId) {
1✔
225
      this.retryState = retryState;
1✔
226
      this.backoffStartInterval = backoffStartInterval;
1✔
227
      this.cronSchedule = cronSchedule;
1✔
228
      this.lastCompletionResult = lastCompletionResult;
1✔
229
      this.firstExecutionRunId =
1✔
230
          Preconditions.checkNotNull(firstExecutionRunId, "firstExecutionRunId");
1✔
231
      this.originalExecutionRunId =
1✔
232
          Preconditions.checkNotNull(originalExecutionRunId, "originalExecutionRunId");
1✔
233
      this.continuedExecutionRunId = continuedExecutionRunId;
1✔
234
      this.lastFailure = Objects.requireNonNull(lastFailure);
1✔
235
    }
1✔
236

237
    @Override
238
    public String toString() {
239
      return "WorkflowData{"
×
240
          + "retryState="
241
          + retryState
242
          + ", backoffStartInterval="
243
          + backoffStartInterval
244
          + ", cronSchedule='"
245
          + cronSchedule
246
          + '\''
247
          + ", lastCompletionResult="
248
          + lastCompletionResult
249
          + ", firstExecutionRunId='"
250
          + firstExecutionRunId
251
          + '\''
252
          + ", originalExecutionRunId='"
253
          + originalExecutionRunId
254
          + '\''
255
          + ", continuedExecutionRunId="
256
          + continuedExecutionRunId
257
          + '}';
258
    }
259
  }
260

261
  static final class WorkflowTaskData {
262

263
    final TestWorkflowStore store;
264

265
    boolean workflowCompleted;
266

267
    /** id of the last started event which completed successfully */
268
    long lastSuccessfulStartedEventId;
269

270
    final StartWorkflowExecutionRequest startRequest;
271

272
    long startedEventId = NO_EVENT_ID;
1✔
273

274
    PollWorkflowTaskQueueResponse.Builder workflowTask;
275

276
    /**
277
     * Events that are added during execution of a workflow task. They have to be buffered to be
278
     * added after the events generated by a workflow task. Without this the determinism will be
279
     * broken on replay.
280
     */
281
    final List<RequestContext> bufferedEvents = new ArrayList<>();
1✔
282

283
    long scheduledEventId = NO_EVENT_ID;
1✔
284

285
    int attempt = 0;
1✔
286

287
    /** Query requests received during workflow task processing (after start) */
288
    final Map<String, TestWorkflowMutableStateImpl.ConsistentQuery> queryBuffer = new HashMap<>();
1✔
289

290
    final Map<String, TestWorkflowMutableStateImpl.ConsistentQuery> consistentQueryRequests =
1✔
291
        new HashMap<>();
292

293
    WorkflowTaskData(TestWorkflowStore store, StartWorkflowExecutionRequest startRequest) {
1✔
294
      this.store = store;
1✔
295
      this.startRequest = startRequest;
1✔
296
    }
1✔
297

298
    void clear() {
299
      startedEventId = NO_EVENT_ID;
1✔
300
      workflowTask = null;
1✔
301
      scheduledEventId = NO_EVENT_ID;
1✔
302
      attempt = 0;
1✔
303
    }
1✔
304

305
    @Override
306
    public String toString() {
307
      return "WorkflowTaskData{"
×
308
          + "store="
309
          + store
310
          + ", workflowCompleted="
311
          + workflowCompleted
312
          + ", lastSuccessfulStartedEventId="
313
          + lastSuccessfulStartedEventId
314
          + ", startRequest="
315
          + startRequest
316
          + ", startedEventId="
317
          + startedEventId
318
          + ", workflowTask="
319
          + workflowTask
320
          + ", bufferedEvents="
321
          + bufferedEvents
322
          + ", scheduledEventId="
323
          + scheduledEventId
324
          + ", attempt="
325
          + attempt
326
          + ", queryBuffer="
327
          + queryBuffer
328
          + ", consistentQueryRequests="
329
          + consistentQueryRequests
330
          + '}';
331
    }
332
  }
333

334
  static final class ActivityTaskData {
335

336
    StartWorkflowExecutionRequest startWorkflowExecutionRequest;
337
    ActivityTaskScheduledEventAttributes scheduledEvent;
338
    ActivityTask activityTask;
339

340
    final TestWorkflowStore store;
341

342
    long scheduledEventId = NO_EVENT_ID;
1✔
343
    long startedEventId = NO_EVENT_ID;
1✔
344
    public HistoryEvent startedEvent;
345
    Payloads heartbeatDetails;
346
    long lastHeartbeatTime;
347
    TestServiceRetryState retryState;
348
    Duration nextBackoffInterval;
349
    String identity;
350

351
    ActivityTaskData(
352
        TestWorkflowStore store, StartWorkflowExecutionRequest startWorkflowExecutionRequest) {
1✔
353
      this.store = store;
1✔
354
      this.startWorkflowExecutionRequest = startWorkflowExecutionRequest;
1✔
355
    }
1✔
356

357
    @Override
358
    public String toString() {
359
      return "ActivityTaskData{"
×
360
          + "startWorkflowExecutionRequest="
361
          + startWorkflowExecutionRequest
362
          + ", scheduledEvent="
363
          + scheduledEvent
364
          + ", activityTask="
365
          + activityTask
366
          + ", store="
367
          + store
368
          + ", scheduledEventId="
369
          + scheduledEventId
370
          + ", startedEventId="
371
          + startedEventId
372
          + ", startedEvent="
373
          + startedEvent
374
          + ", heartbeatDetails="
375
          + heartbeatDetails
376
          + ", lastHeartbeatTime="
377
          + lastHeartbeatTime
378
          + ", retryState="
379
          + retryState
380
          + ", nextBackoffInterval="
381
          + nextBackoffInterval
382
          + '}';
383
    }
384

385
    public int getAttempt() {
386
      return retryState != null ? retryState.getAttempt() : 1;
1✔
387
    }
388
  }
389

390
  static final class SignalExternalData {
1✔
391
    long initiatedEventId = NO_EVENT_ID;
1✔
392
    public SignalExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent;
393

394
    @Override
395
    public String toString() {
396
      return "SignalExternalData{"
×
397
          + "initiatedEventId="
398
          + initiatedEventId
399
          + ", initiatedEvent="
400
          + initiatedEvent
401
          + '}';
402
    }
403
  }
404

405
  static final class CancelExternalData {
1✔
406
    long initiatedEventId = NO_EVENT_ID;
1✔
407
    public RequestCancelExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent;
408

409
    @Override
410
    public String toString() {
411
      return "CancelExternalData{"
×
412
          + "initiatedEventId="
413
          + initiatedEventId
414
          + ", initiatedEvent="
415
          + initiatedEvent
416
          + '}';
417
    }
418
  }
419

420
  static final class ChildWorkflowData {
421

422
    final TestWorkflowService service;
423
    StartChildWorkflowExecutionInitiatedEventAttributes initiatedEvent;
424
    long initiatedEventId;
425
    long startedEventId;
426
    WorkflowExecution execution;
427

428
    public ChildWorkflowData(TestWorkflowService service) {
1✔
429
      this.service = service;
1✔
430
    }
1✔
431

432
    @Override
433
    public String toString() {
434
      return "ChildWorkflowData{"
×
435
          + "service="
436
          + service
437
          + ", initiatedEvent="
438
          + initiatedEvent
439
          + ", initiatedEventId="
440
          + initiatedEventId
441
          + ", startedEventId="
442
          + startedEventId
443
          + ", execution="
444
          + execution
445
          + '}';
446
    }
447
  }
448

449
  static final class TimerData {
1✔
450
    TimerStartedEventAttributes startedEvent;
451
    public long startedEventId;
452

453
    @Override
454
    public String toString() {
455
      return "TimerData{"
×
456
          + "startedEvent="
457
          + startedEvent
458
          + ", startedEventId="
459
          + startedEventId
460
          + '}';
461
    }
462
  }
463

464
  static StateMachine<WorkflowData> newWorkflowStateMachine(WorkflowData data) {
465
    return new StateMachine<>(data)
1✔
466
        .add(NONE, START, STARTED, StateMachines::startWorkflow)
1✔
467
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::completeWorkflow)
1✔
468
        .add(STARTED, CONTINUE_AS_NEW, CONTINUED_AS_NEW, StateMachines::continueAsNewWorkflow)
1✔
469
        .add(STARTED, FAIL, FAILED, StateMachines::failWorkflow)
1✔
470
        .add(STARTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutWorkflow)
1✔
471
        .add(
1✔
472
            STARTED,
473
            REQUEST_CANCELLATION,
474
            CANCELLATION_REQUESTED,
475
            StateMachines::requestWorkflowCancellation)
476
        .add(STARTED, TERMINATE, TERMINATED, StateMachines::terminateWorkflow)
1✔
477
        .add(
1✔
478
            CANCELLATION_REQUESTED,
479
            REQUEST_CANCELLATION,
480
            CANCELLATION_REQUESTED,
481
            StateMachines::noop)
482
        .add(CANCELLATION_REQUESTED, COMPLETE, COMPLETED, StateMachines::completeWorkflow)
1✔
483
        .add(CANCELLATION_REQUESTED, CANCEL, CANCELED, StateMachines::cancelWorkflow)
1✔
484
        .add(CANCELLATION_REQUESTED, TERMINATE, TERMINATED, StateMachines::terminateWorkflow)
1✔
485
        .add(CANCELLATION_REQUESTED, FAIL, FAILED, StateMachines::failWorkflow)
1✔
486
        .add(CANCELLATION_REQUESTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutWorkflow);
1✔
487
  }
488

489
  static StateMachine<WorkflowTaskData> newWorkflowTaskStateMachine(
490
      TestWorkflowStore store, StartWorkflowExecutionRequest startRequest) {
491
    return new StateMachine<>(new WorkflowTaskData(store, startRequest))
1✔
492
        .add(NONE, INITIATE, INITIATED, StateMachines::scheduleWorkflowTask)
1✔
493
        // TODO(maxim): Uncomment once the server supports consistent query only workflow tasks
494
        //        .add(NONE, QUERY, INITIATED_QUERY_ONLY, StateMachines::scheduleQueryWorkflowTask)
495
        //        .add(INITIATED_QUERY_ONLY, QUERY, INITIATED_QUERY_ONLY,
496
        // StateMachines::queryWhileScheduled)
497
        //        .add(
498
        //            INITIATED_QUERY_ONLY,
499
        //            INITIATE,
500
        //            INITIATED,
501
        //            StateMachines::convertQueryWorkflowTaskToReal)
502
        //        .add(
503
        //            INITIATED_QUERY_ONLY,
504
        //            START,
505
        //            STARTED_QUERY_ONLY,
506
        //            StateMachines::startQueryOnlyWorkflowTask)
507
        //        .add(STARTED_QUERY_ONLY, INITIATE, STARTED_QUERY_ONLY,
508
        // StateMachines::needsWorkflowTask)
509
        //        .add(STARTED_QUERY_ONLY, QUERY, STARTED_QUERY_ONLY,
510
        // StateMachines::needsWorkflowTaskDueToQuery)
511
        //        .add(STARTED_QUERY_ONLY, FAIL, NONE, StateMachines::failQueryWorkflowTask)
512
        //        .add(STARTED_QUERY_ONLY, TIME_OUT, NONE, StateMachines::failQueryWorkflowTask)
513
        //        .add(STARTED_QUERY_ONLY, COMPLETE, NONE, StateMachines::completeQuery)
514
        .add(STARTED, QUERY, STARTED, StateMachines::bufferQuery)
1✔
515
        .add(INITIATED, INITIATE, INITIATED, StateMachines::noop)
1✔
516
        .add(INITIATED, QUERY, INITIATED, StateMachines::queryWhileScheduled)
1✔
517
        .add(INITIATED, START, STARTED, StateMachines::startWorkflowTask)
1✔
518
        .add(STARTED, COMPLETE, NONE, StateMachines::completeWorkflowTask)
1✔
519
        .add(STARTED, FAIL, NONE, StateMachines::failWorkflowTask)
1✔
520
        .add(STARTED, TIME_OUT, NONE, StateMachines::timeoutWorkflowTask)
1✔
521
        .add(STARTED, INITIATE, STARTED, StateMachines::needsWorkflowTask);
1✔
522
  }
523

524
  public static StateMachine<ActivityTaskData> newActivityStateMachine(
525
      TestWorkflowStore store, StartWorkflowExecutionRequest workflowStartedEvent) {
526
    return new StateMachine<>(new ActivityTaskData(store, workflowStartedEvent))
1✔
527
        .add(NONE, INITIATE, INITIATED, StateMachines::scheduleActivityTask)
1✔
528
        .add(INITIATED, START, STARTED, StateMachines::startActivityTask)
1✔
529
        .add(INITIATED, TIME_OUT, TIMED_OUT, StateMachines::timeoutActivityTask)
1✔
530
        .add(
1✔
531
            INITIATED,
532
            REQUEST_CANCELLATION,
533
            CANCELLATION_REQUESTED,
534
            StateMachines::requestActivityCancellation)
535
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::completeActivityTask)
1✔
536
        // Transitions to initiated in case of a retry
537
        .add(STARTED, FAIL, new State[] {FAILED, INITIATED}, StateMachines::failActivityTask)
1✔
538
        // Transitions to initiated in case of a retry
539
        .add(
1✔
540
            STARTED,
541
            TIME_OUT,
542
            new State[] {TIMED_OUT, INITIATED},
543
            StateMachines::timeoutActivityTask)
544
        .add(STARTED, UPDATE, STARTED, StateMachines::heartbeatActivityTask)
1✔
545
        .add(
1✔
546
            STARTED,
547
            REQUEST_CANCELLATION,
548
            CANCELLATION_REQUESTED,
549
            StateMachines::requestActivityCancellation)
550
        .add(
1✔
551
            CANCELLATION_REQUESTED, CANCEL, CANCELED, StateMachines::reportActivityTaskCancellation)
552
        .add(CANCELLATION_REQUESTED, COMPLETE, COMPLETED, StateMachines::completeActivityTask)
1✔
553
        .add(
1✔
554
            CANCELLATION_REQUESTED,
555
            UPDATE,
556
            CANCELLATION_REQUESTED,
557
            StateMachines::heartbeatActivityTask)
558
        .add(CANCELLATION_REQUESTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutActivityTask)
1✔
559
        .add(CANCELLATION_REQUESTED, FAIL, FAILED, StateMachines::failActivityTask);
1✔
560
  }
561

562
  public static StateMachine<ChildWorkflowData> newChildWorkflowStateMachine(
563
      TestWorkflowService service) {
564
    return new StateMachine<>(new ChildWorkflowData(service))
1✔
565
        .add(NONE, INITIATE, INITIATED, StateMachines::initiateChildWorkflow)
1✔
566
        .add(INITIATED, START, STARTED, StateMachines::childWorkflowStarted)
1✔
567
        .add(INITIATED, FAIL, FAILED, StateMachines::startChildWorkflowFailed)
1✔
568
        .add(INITIATED, TIME_OUT, TIMED_OUT, StateMachines::timeoutChildWorkflow)
1✔
569
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::childWorkflowCompleted)
1✔
570
        .add(STARTED, FAIL, FAILED, StateMachines::childWorkflowFailed)
1✔
571
        .add(STARTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutChildWorkflow)
1✔
572
        .add(STARTED, CANCEL, CANCELED, StateMachines::childWorkflowCanceled);
1✔
573
  }
574

575
  public static StateMachine<TimerData> newTimerStateMachine() {
576
    return new StateMachine<>(new TimerData())
1✔
577
        .add(NONE, START, STARTED, StateMachines::startTimer)
1✔
578
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::fireTimer)
1✔
579
        .add(STARTED, CANCEL, CANCELED, StateMachines::cancelTimer);
1✔
580
  }
581

582
  public static StateMachine<SignalExternalData> newSignalExternalStateMachine() {
583
    return new StateMachine<>(new SignalExternalData())
1✔
584
        .add(NONE, INITIATE, INITIATED, StateMachines::initiateExternalSignal)
1✔
585
        .add(INITIATED, FAIL, FAILED, StateMachines::failExternalSignal)
1✔
586
        .add(INITIATED, COMPLETE, COMPLETED, StateMachines::completeExternalSignal);
1✔
587
  }
588

589
  public static StateMachine<CancelExternalData> newCancelExternalStateMachine() {
590
    return new StateMachine<>(new CancelExternalData())
1✔
591
        .add(NONE, INITIATE, INITIATED, StateMachines::initiateExternalCancellation)
1✔
592
        .add(INITIATED, FAIL, FAILED, StateMachines::failExternalCancellation)
1✔
593
        .add(INITIATED, START, STARTED, StateMachines::reportExternalCancellationRequested);
1✔
594
  }
595

596
  private static <T, A> void noop(RequestContext ctx, T data, A a, long notUsed) {}
1✔
597

598
  private static void timeoutChildWorkflow(
599
      RequestContext ctx, ChildWorkflowData data, RetryState retryState, long notUsed) {
600
    StartChildWorkflowExecutionInitiatedEventAttributes ie = data.initiatedEvent;
1✔
601
    ChildWorkflowExecutionTimedOutEventAttributes a =
602
        ChildWorkflowExecutionTimedOutEventAttributes.newBuilder()
1✔
603
            .setNamespace(ie.getNamespace())
1✔
604
            .setStartedEventId(data.startedEventId)
1✔
605
            .setWorkflowExecution(data.execution)
1✔
606
            .setWorkflowType(ie.getWorkflowType())
1✔
607
            .setRetryState(retryState)
1✔
608
            .setInitiatedEventId(data.initiatedEventId)
1✔
609
            .build();
1✔
610
    HistoryEvent event =
611
        HistoryEvent.newBuilder()
1✔
612
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TIMED_OUT)
1✔
613
            .setChildWorkflowExecutionTimedOutEventAttributes(a)
1✔
614
            .build();
1✔
615
    ctx.addEvent(event);
1✔
616
  }
1✔
617

618
  private static void startChildWorkflowFailed(
619
      RequestContext ctx,
620
      ChildWorkflowData data,
621
      StartChildWorkflowExecutionFailedEventAttributes a,
622
      long notUsed) {
623
    StartChildWorkflowExecutionFailedEventAttributes.Builder updatedAttr =
1✔
624
        a.toBuilder()
1✔
625
            .setInitiatedEventId(data.initiatedEventId)
1✔
626
            .setWorkflowType(data.initiatedEvent.getWorkflowType())
1✔
627
            .setWorkflowId(data.initiatedEvent.getWorkflowId());
1✔
628
    if (!data.initiatedEvent.getNamespace().isEmpty()) {
1✔
629
      updatedAttr.setNamespace(data.initiatedEvent.getNamespace());
1✔
630
    }
631
    HistoryEvent event =
632
        HistoryEvent.newBuilder()
1✔
633
            .setEventType(EventType.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_FAILED)
1✔
634
            .setStartChildWorkflowExecutionFailedEventAttributes(updatedAttr.build())
1✔
635
            .build();
1✔
636
    ctx.addEvent(event);
1✔
637
  }
1✔
638

639
  private static void childWorkflowStarted(
640
      RequestContext ctx,
641
      ChildWorkflowData data,
642
      ChildWorkflowExecutionStartedEventAttributes a,
643
      long notUsed) {
644
    ChildWorkflowExecutionStartedEventAttributes updatedAttr =
1✔
645
        a.toBuilder().setInitiatedEventId(data.initiatedEventId).build();
1✔
646
    HistoryEvent event =
647
        HistoryEvent.newBuilder()
1✔
648
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_STARTED)
1✔
649
            .setChildWorkflowExecutionStartedEventAttributes(updatedAttr)
1✔
650
            .build();
1✔
651
    long startedEventId = ctx.addEvent(event);
1✔
652
    ctx.onCommit(
1✔
653
        (historySize) -> {
654
          data.startedEventId = startedEventId;
1✔
655
          data.execution = updatedAttr.getWorkflowExecution();
1✔
656
        });
1✔
657
  }
1✔
658

659
  private static void childWorkflowCompleted(
660
      RequestContext ctx,
661
      ChildWorkflowData data,
662
      ChildWorkflowExecutionCompletedEventAttributes a,
663
      long notUsed) {
664
    ChildWorkflowExecutionCompletedEventAttributes updatedAttr =
1✔
665
        a.toBuilder()
1✔
666
            .setInitiatedEventId(data.initiatedEventId)
1✔
667
            .setStartedEventId(data.startedEventId)
1✔
668
            .build();
1✔
669
    HistoryEvent event =
670
        HistoryEvent.newBuilder()
1✔
671
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_COMPLETED)
1✔
672
            .setChildWorkflowExecutionCompletedEventAttributes(updatedAttr)
1✔
673
            .build();
1✔
674
    ctx.addEvent(event);
1✔
675
  }
1✔
676

677
  private static void childWorkflowFailed(
678
      RequestContext ctx,
679
      ChildWorkflowData data,
680
      ChildWorkflowExecutionFailedEventAttributes a,
681
      long notUsed) {
682
    ChildWorkflowExecutionFailedEventAttributes.Builder updatedAttr =
1✔
683
        a.toBuilder()
1✔
684
            .setInitiatedEventId(data.initiatedEventId)
1✔
685
            .setStartedEventId(data.startedEventId)
1✔
686
            .setWorkflowExecution(data.execution)
1✔
687
            .setWorkflowType(data.initiatedEvent.getWorkflowType());
1✔
688
    if (!data.initiatedEvent.getNamespace().isEmpty()) {
1✔
689
      updatedAttr.setNamespace(data.initiatedEvent.getNamespace());
1✔
690
    }
691
    HistoryEvent event =
692
        HistoryEvent.newBuilder()
1✔
693
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_FAILED)
1✔
694
            .setChildWorkflowExecutionFailedEventAttributes(updatedAttr.build())
1✔
695
            .build();
1✔
696
    ctx.addEvent(event);
1✔
697
  }
1✔
698

699
  private static void childWorkflowCanceled(
700
      RequestContext ctx,
701
      ChildWorkflowData data,
702
      ChildWorkflowExecutionCanceledEventAttributes a,
703
      long notUsed) {
704
    ChildWorkflowExecutionCanceledEventAttributes updatedAttr =
1✔
705
        a.toBuilder()
1✔
706
            .setInitiatedEventId(data.initiatedEventId)
1✔
707
            .setStartedEventId(data.startedEventId)
1✔
708
            .build();
1✔
709
    HistoryEvent event =
710
        HistoryEvent.newBuilder()
1✔
711
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_CANCELED)
1✔
712
            .setChildWorkflowExecutionCanceledEventAttributes(updatedAttr)
1✔
713
            .build();
1✔
714
    ctx.addEvent(event);
1✔
715
  }
1✔
716

717
  private static void initiateChildWorkflow(
718
      RequestContext ctx,
719
      ChildWorkflowData data,
720
      StartChildWorkflowExecutionCommandAttributes d,
721
      long workflowTaskCompletedEventId) {
722
    StartChildWorkflowExecutionInitiatedEventAttributes.Builder a =
723
        StartChildWorkflowExecutionInitiatedEventAttributes.newBuilder()
1✔
724
            .setControl(d.getControl())
1✔
725
            .setInput(d.getInput())
1✔
726
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1✔
727
            .setNamespace(d.getNamespace().isEmpty() ? ctx.getNamespace() : d.getNamespace())
1✔
728
            .setWorkflowExecutionTimeout(d.getWorkflowExecutionTimeout())
1✔
729
            .setWorkflowRunTimeout(d.getWorkflowRunTimeout())
1✔
730
            .setWorkflowTaskTimeout(d.getWorkflowTaskTimeout())
1✔
731
            .setTaskQueue(d.getTaskQueue())
1✔
732
            .setWorkflowId(d.getWorkflowId())
1✔
733
            .setWorkflowIdReusePolicy(d.getWorkflowIdReusePolicy())
1✔
734
            .setWorkflowType(d.getWorkflowType())
1✔
735
            .setCronSchedule(d.getCronSchedule())
1✔
736
            .setParentClosePolicy(d.getParentClosePolicy());
1✔
737
    if (d.hasHeader()) {
1✔
738
      a.setHeader(d.getHeader());
1✔
739
    }
740
    if (d.hasMemo()) {
1✔
741
      a.setMemo(d.getMemo());
1✔
742
    }
743
    if (d.hasRetryPolicy()) {
1✔
744
      a.setRetryPolicy(d.getRetryPolicy());
1✔
745
    }
746
    HistoryEvent event =
747
        HistoryEvent.newBuilder()
1✔
748
            .setEventType(EventType.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED)
1✔
749
            .setStartChildWorkflowExecutionInitiatedEventAttributes(a)
1✔
750
            .build();
1✔
751
    long initiatedEventId = ctx.addEvent(event);
1✔
752
    ctx.onCommit(
1✔
753
        (historySize) -> {
754
          data.initiatedEventId = initiatedEventId;
1✔
755
          data.initiatedEvent = a.build();
1✔
756
          StartWorkflowExecutionRequest.Builder startChild =
757
              StartWorkflowExecutionRequest.newBuilder()
1✔
758
                  .setRequestId(UUID.randomUUID().toString())
1✔
759
                  .setNamespace(d.getNamespace().isEmpty() ? ctx.getNamespace() : d.getNamespace())
1✔
760
                  .setWorkflowExecutionTimeout(d.getWorkflowExecutionTimeout())
1✔
761
                  .setWorkflowRunTimeout(d.getWorkflowRunTimeout())
1✔
762
                  .setWorkflowTaskTimeout(d.getWorkflowTaskTimeout())
1✔
763
                  .setTaskQueue(d.getTaskQueue())
1✔
764
                  .setWorkflowId(d.getWorkflowId())
1✔
765
                  .setWorkflowIdReusePolicy(d.getWorkflowIdReusePolicy())
1✔
766
                  .setWorkflowType(d.getWorkflowType())
1✔
767
                  .setCronSchedule(d.getCronSchedule());
1✔
768
          if (d.hasHeader()) {
1✔
769
            startChild.setHeader(d.getHeader());
1✔
770
          }
771
          if (d.hasSearchAttributes()) {
1✔
772
            startChild.setSearchAttributes(d.getSearchAttributes());
1✔
773
          }
774
          if (d.hasMemo()) {
1✔
775
            startChild.setMemo(d.getMemo());
1✔
776
          }
777
          if (d.hasRetryPolicy()) {
1✔
778
            startChild.setRetryPolicy(d.getRetryPolicy());
1✔
779
          }
780
          if (d.hasInput()) {
1✔
781
            startChild.setInput(d.getInput());
1✔
782
          }
783
          addStartChildTask(ctx, data, initiatedEventId, startChild.build());
1✔
784
        });
1✔
785
  }
1✔
786

787
  private static void addStartChildTask(
788
      RequestContext ctx,
789
      ChildWorkflowData data,
790
      long initiatedEventId,
791
      StartWorkflowExecutionRequest startChild) {
792
    ForkJoinPool.commonPool()
1✔
793
        .execute(
1✔
794
            () -> {
795
              try {
796
                data.service.startWorkflowExecutionImpl(
1✔
797
                    startChild,
798
                    java.time.Duration.ZERO,
799
                    Optional.of(ctx.getWorkflowMutableState()),
1✔
800
                    OptionalLong.of(data.initiatedEventId),
1✔
801
                    null);
802
              } catch (StatusRuntimeException e) {
1✔
803
                if (e.getStatus().getCode() == Status.Code.ALREADY_EXISTS) {
1✔
804
                  StartChildWorkflowExecutionFailedEventAttributes failRequest =
805
                      StartChildWorkflowExecutionFailedEventAttributes.newBuilder()
1✔
806
                          .setInitiatedEventId(initiatedEventId)
1✔
807
                          .setCause(
1✔
808
                              StartChildWorkflowExecutionFailedCause
809
                                  .START_CHILD_WORKFLOW_EXECUTION_FAILED_CAUSE_WORKFLOW_ALREADY_EXISTS)
810
                          .build();
1✔
811
                  try {
812
                    ctx.getWorkflowMutableState()
1✔
813
                        .failStartChildWorkflow(data.initiatedEvent.getWorkflowId(), failRequest);
1✔
814
                  } catch (Throwable ee) {
×
815
                    log.error("Unexpected failure inserting failStart for a child workflow", ee);
×
816
                  }
1✔
817
                } else {
1✔
818
                  log.error("Unexpected failure starting a child workflow", e);
×
819
                }
820
              } catch (Exception e) {
×
821
                log.error("Unexpected failure starting a child workflow", e);
×
822
              }
1✔
823
            });
1✔
824
  }
1✔
825

826
  private static void startWorkflow(
827
      RequestContext ctx, WorkflowData data, StartWorkflowExecutionRequest request, long notUsed) {
828
    if (Durations.compare(request.getWorkflowExecutionTimeout(), Durations.ZERO) < 0) {
1✔
829
      throw Status.INVALID_ARGUMENT
×
830
          .withDescription("negative workflowExecution timeout")
×
831
          .asRuntimeException();
×
832
    }
833
    if (Durations.compare(request.getWorkflowRunTimeout(), Durations.ZERO) < 0) {
1✔
834
      throw Status.INVALID_ARGUMENT
×
835
          .withDescription("negative workflowRun timeout")
×
836
          .asRuntimeException();
×
837
    }
838
    if (Durations.compare(request.getWorkflowTaskTimeout(), Durations.ZERO) < 0) {
1✔
839
      throw Status.INVALID_ARGUMENT
×
840
          .withDescription("negative workflowTaskTimeoutSeconds")
×
841
          .asRuntimeException();
×
842
    }
843

844
    WorkflowExecutionStartedEventAttributes.Builder a =
845
        WorkflowExecutionStartedEventAttributes.newBuilder()
1✔
846
            .setWorkflowType(request.getWorkflowType())
1✔
847
            .setWorkflowRunTimeout(request.getWorkflowRunTimeout())
1✔
848
            .setWorkflowTaskTimeout(request.getWorkflowTaskTimeout())
1✔
849
            .setWorkflowExecutionTimeout(request.getWorkflowExecutionTimeout())
1✔
850
            .setIdentity(request.getIdentity())
1✔
851
            .setInput(request.getInput())
1✔
852
            .setTaskQueue(request.getTaskQueue())
1✔
853
            .setAttempt(1);
1✔
854
    data.retryState.ifPresent(
1✔
855
        testServiceRetryState -> a.setAttempt(testServiceRetryState.getAttempt()));
1✔
856
    a.setFirstExecutionRunId(data.firstExecutionRunId);
1✔
857
    a.setOriginalExecutionRunId(data.originalExecutionRunId);
1✔
858
    data.continuedExecutionRunId.ifPresent(a::setContinuedExecutionRunId);
1✔
859
    if (data.lastCompletionResult != null) {
1✔
860
      a.setLastCompletionResult(data.lastCompletionResult);
1✔
861
    }
862
    data.lastFailure.ifPresent(a::setContinuedFailure);
1✔
863
    if (request.hasMemo()) {
1✔
864
      a.setMemo(request.getMemo());
1✔
865
    }
866
    if (request.hasSearchAttributes()) {
1✔
867
      a.setSearchAttributes((request.getSearchAttributes()));
1✔
868
    }
869
    if (request.hasHeader()) {
1✔
870
      a.setHeader(request.getHeader());
1✔
871
    }
872
    String cronSchedule = request.getCronSchedule();
1✔
873
    if (!cronSchedule.trim().isEmpty()) {
1✔
874
      try {
875
        CronUtils.parseCron(cronSchedule);
1✔
876
        a.setCronSchedule(cronSchedule);
1✔
877
      } catch (Exception e) {
×
878
        throw Status.INVALID_ARGUMENT
×
879
            .withDescription("Invalid cron expression \"" + cronSchedule + "\": " + e.getMessage())
×
880
            .withCause(e)
×
881
            .asRuntimeException();
×
882
      }
1✔
883
    }
884
    Optional<TestWorkflowMutableState> parent = ctx.getWorkflowMutableState().getParent();
1✔
885
    if (parent.isPresent()) {
1✔
886
      ExecutionId parentExecutionId = parent.get().getExecutionId();
1✔
887
      a.setParentWorkflowNamespace(parentExecutionId.getNamespace());
1✔
888
      a.setParentWorkflowExecution(parentExecutionId.getExecution());
1✔
889
    }
890
    HistoryEvent event =
891
        HistoryEvent.newBuilder()
1✔
892
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED)
1✔
893
            .setWorkflowExecutionStartedEventAttributes(a)
1✔
894
            .build();
1✔
895
    ctx.addEvent(event);
1✔
896
  }
1✔
897

898
  private static void completeWorkflow(
899
      RequestContext ctx,
900
      WorkflowData data,
901
      CompleteWorkflowExecutionCommandAttributes d,
902
      long workflowTaskCompletedEventId) {
903
    WorkflowExecutionCompletedEventAttributes.Builder a =
904
        WorkflowExecutionCompletedEventAttributes.newBuilder()
1✔
905
            .setResult(d.getResult())
1✔
906
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
907
    HistoryEvent event =
908
        HistoryEvent.newBuilder()
1✔
909
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED)
1✔
910
            .setWorkflowExecutionCompletedEventAttributes(a)
1✔
911
            .build();
1✔
912
    ctx.addEvent(event);
1✔
913
  }
1✔
914

915
  private static void continueAsNewWorkflow(
916
      RequestContext ctx,
917
      WorkflowData data,
918
      ContinueAsNewWorkflowExecutionCommandAttributes d,
919
      long workflowTaskCompletedEventId) {
920
    StartWorkflowExecutionRequest sr = ctx.getWorkflowMutableState().getStartRequest();
1✔
921
    WorkflowExecutionContinuedAsNewEventAttributes.Builder a =
922
        WorkflowExecutionContinuedAsNewEventAttributes.newBuilder();
1✔
923
    a.setInput(d.getInput());
1✔
924
    if (d.hasHeader()) {
1✔
925
      a.setHeader(d.getHeader());
1✔
926
    }
927
    if (Durations.compare(d.getWorkflowRunTimeout(), Durations.ZERO) > 0) {
1✔
928
      a.setWorkflowRunTimeout(d.getWorkflowRunTimeout());
1✔
929
    } else {
930
      a.setWorkflowRunTimeout(sr.getWorkflowRunTimeout());
1✔
931
    }
932
    if (d.hasTaskQueue()) {
1✔
933
      a.setTaskQueue(d.getTaskQueue());
1✔
934
    } else {
935
      a.setTaskQueue(sr.getTaskQueue());
1✔
936
    }
937
    if (d.hasWorkflowType()) {
1✔
938
      a.setWorkflowType(d.getWorkflowType());
1✔
939
    } else {
940
      a.setWorkflowType(sr.getWorkflowType());
1✔
941
    }
942
    if (Durations.compare(d.getWorkflowTaskTimeout(), Durations.ZERO) > 0) {
1✔
943
      a.setWorkflowTaskTimeout(d.getWorkflowTaskTimeout());
1✔
944
    } else {
945
      a.setWorkflowTaskTimeout(sr.getWorkflowTaskTimeout());
1✔
946
    }
947
    a.setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
948
    a.setBackoffStartInterval(d.getBackoffStartInterval());
1✔
949
    if (d.hasLastCompletionResult()) {
1✔
950
      a.setLastCompletionResult(d.getLastCompletionResult());
1✔
951
    }
952
    if (d.hasFailure()) {
1✔
953
      a.setFailure(d.getFailure());
1✔
954
    }
955
    a.setNewExecutionRunId(UUID.randomUUID().toString());
1✔
956
    HistoryEvent event =
957
        HistoryEvent.newBuilder()
1✔
958
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW)
1✔
959
            .setWorkflowExecutionContinuedAsNewEventAttributes(a)
1✔
960
            .build();
1✔
961
    ctx.addEvent(event);
1✔
962
  }
1✔
963

964
  private static void failWorkflow(
965
      RequestContext ctx,
966
      WorkflowData data,
967
      FailWorkflowExecutionCommandAttributes d,
968
      long workflowTaskCompletedEventId) {
969
    WorkflowExecutionFailedEventAttributes.Builder a =
970
        WorkflowExecutionFailedEventAttributes.newBuilder()
1✔
971
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
972
    if (d.hasFailure()) {
1✔
973
      a.setFailure(d.getFailure());
1✔
974
    }
975
    HistoryEvent event =
976
        HistoryEvent.newBuilder()
1✔
977
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_FAILED)
1✔
978
            .setWorkflowExecutionFailedEventAttributes(a)
1✔
979
            .build();
1✔
980
    ctx.addEvent(event);
1✔
981
  }
1✔
982

983
  private static void timeoutWorkflow(
984
      RequestContext ctx, WorkflowData data, RetryState retryState, long notUsed) {
985
    WorkflowExecutionTimedOutEventAttributes.Builder a =
986
        WorkflowExecutionTimedOutEventAttributes.newBuilder().setRetryState(retryState);
1✔
987
    HistoryEvent event =
988
        HistoryEvent.newBuilder()
1✔
989
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT)
1✔
990
            .setWorkflowExecutionTimedOutEventAttributes(a)
1✔
991
            .build();
1✔
992
    ctx.addEvent(event);
1✔
993
  }
1✔
994

995
  private static void cancelWorkflow(
996
      RequestContext ctx,
997
      WorkflowData data,
998
      CancelWorkflowExecutionCommandAttributes d,
999
      long workflowTaskCompletedEventId) {
1000
    WorkflowExecutionCanceledEventAttributes.Builder a =
1001
        WorkflowExecutionCanceledEventAttributes.newBuilder()
1✔
1002
            .setDetails(d.getDetails())
1✔
1003
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
1004
    HistoryEvent event =
1005
        HistoryEvent.newBuilder()
1✔
1006
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED)
1✔
1007
            .setWorkflowExecutionCanceledEventAttributes(a)
1✔
1008
            .build();
1✔
1009
    ctx.addEvent(event);
1✔
1010
  }
1✔
1011

1012
  private static void terminateWorkflow(
1013
      RequestContext ctx,
1014
      WorkflowData data,
1015
      TerminateWorkflowExecutionRequest d,
1016
      long workflowTaskCompletedEventId) {
1017
    WorkflowExecutionTerminatedEventAttributes.Builder a =
1018
        WorkflowExecutionTerminatedEventAttributes.newBuilder()
1✔
1019
            .setDetails(d.getDetails())
1✔
1020
            .setIdentity(d.getIdentity())
1✔
1021
            .setReason(d.getReason());
1✔
1022
    HistoryEvent event =
1023
        HistoryEvent.newBuilder()
1✔
1024
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED)
1✔
1025
            .setWorkflowExecutionTerminatedEventAttributes(a)
1✔
1026
            .build();
1✔
1027
    ctx.addEvent(event);
1✔
1028
  }
1✔
1029

1030
  private static void requestWorkflowCancellation(
1031
      RequestContext ctx,
1032
      WorkflowData data,
1033
      RequestCancelWorkflowExecutionRequest cancelRequest,
1034
      long notUsed) {
1035
    WorkflowExecutionCancelRequestedEventAttributes.Builder a =
1036
        WorkflowExecutionCancelRequestedEventAttributes.newBuilder()
1✔
1037
            .setIdentity(cancelRequest.getIdentity());
1✔
1038
    HistoryEvent cancelRequested =
1039
        HistoryEvent.newBuilder()
1✔
1040
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED)
1✔
1041
            .setWorkflowExecutionCancelRequestedEventAttributes(a)
1✔
1042
            .build();
1✔
1043
    ctx.addEvent(cancelRequested);
1✔
1044
  }
1✔
1045

1046
  private static void scheduleActivityTask(
1047
      RequestContext ctx,
1048
      ActivityTaskData data,
1049
      ScheduleActivityTaskCommandAttributes d,
1050
      long workflowTaskCompletedEventId) {
1051
    RetryPolicy retryPolicy = ensureDefaultFieldsForActivityRetryPolicy(d.getRetryPolicy());
1✔
1052
    Duration expirationInterval = d.getScheduleToCloseTimeout();
1✔
1053
    Timestamp expirationTime = Timestamps.add(data.store.currentTime(), expirationInterval);
1✔
1054
    TestServiceRetryState retryState = new TestServiceRetryState(retryPolicy, expirationTime);
1✔
1055

1056
    ActivityTaskScheduledEventAttributes.Builder a =
1057
        ActivityTaskScheduledEventAttributes.newBuilder()
1✔
1058
            .setInput(d.getInput())
1✔
1059
            .setActivityId(d.getActivityId())
1✔
1060
            .setActivityType(d.getActivityType())
1✔
1061
            .setHeartbeatTimeout(d.getHeartbeatTimeout())
1✔
1062
            .setRetryPolicy(retryPolicy)
1✔
1063
            .setScheduleToCloseTimeout(d.getScheduleToCloseTimeout())
1✔
1064
            .setScheduleToStartTimeout(d.getScheduleToStartTimeout())
1✔
1065
            .setStartToCloseTimeout(d.getStartToCloseTimeout())
1✔
1066
            .setTaskQueue(d.getTaskQueue())
1✔
1067
            .setHeader(d.getHeader())
1✔
1068
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
1069

1070
    // Cannot set it in onCommit as it is used in the processScheduleActivityTask
1071
    data.scheduledEvent = a.build();
1✔
1072
    HistoryEvent event =
1073
        HistoryEvent.newBuilder()
1✔
1074
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED)
1✔
1075
            .setActivityTaskScheduledEventAttributes(a)
1✔
1076
            .build();
1✔
1077
    long scheduledEventId = ctx.addEvent(event);
1✔
1078

1079
    PollActivityTaskQueueResponse.Builder taskResponse =
1080
        PollActivityTaskQueueResponse.newBuilder()
1✔
1081
            .setWorkflowNamespace(ctx.getNamespace())
1✔
1082
            .setWorkflowType(data.startWorkflowExecutionRequest.getWorkflowType())
1✔
1083
            .setActivityType(d.getActivityType())
1✔
1084
            .setWorkflowExecution(ctx.getExecution())
1✔
1085
            .setActivityId(d.getActivityId())
1✔
1086
            .setInput(d.getInput())
1✔
1087
            .setHeartbeatTimeout(d.getHeartbeatTimeout())
1✔
1088
            .setScheduleToCloseTimeout(d.getScheduleToCloseTimeout())
1✔
1089
            .setStartToCloseTimeout(d.getStartToCloseTimeout())
1✔
1090
            .setScheduledTime(ctx.currentTime())
1✔
1091
            .setCurrentAttemptScheduledTime(ctx.currentTime())
1✔
1092
            .setHeader(d.getHeader())
1✔
1093
            .setAttempt(1);
1✔
1094

1095
    TaskQueueId taskQueueId = new TaskQueueId(ctx.getNamespace(), d.getTaskQueue().getName());
1✔
1096
    ActivityTask activityTask = new ActivityTask(taskQueueId, taskResponse);
1✔
1097
    ctx.addActivityTask(activityTask);
1✔
1098
    ctx.onCommit(
1✔
1099
        (historySize) -> {
1100
          data.scheduledEventId = scheduledEventId;
1✔
1101
          data.activityTask = activityTask;
1✔
1102
          data.retryState = retryState;
1✔
1103
        });
1✔
1104
  }
1✔
1105

1106
  private static void requestActivityCancellation(
1107
      RequestContext ctx,
1108
      ActivityTaskData data,
1109
      RequestCancelActivityTaskCommandAttributes d,
1110
      long workflowTaskCompletedEventId) {
1111
    ActivityTaskCancelRequestedEventAttributes.Builder a =
1112
        ActivityTaskCancelRequestedEventAttributes.newBuilder()
1✔
1113
            .setScheduledEventId(d.getScheduledEventId())
1✔
1114
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
1115
    HistoryEvent event =
1116
        HistoryEvent.newBuilder()
1✔
1117
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_CANCEL_REQUESTED)
1✔
1118
            .setActivityTaskCancelRequestedEventAttributes(a)
1✔
1119
            .build();
1✔
1120
    ctx.addEvent(event);
1✔
1121
  }
1✔
1122

1123
  private static void scheduleWorkflowTask(
1124
      RequestContext ctx, WorkflowTaskData data, Object notUsedRequest, long notUsed) {
1125
    StartWorkflowExecutionRequest request = data.startRequest;
1✔
1126
    long scheduledEventId;
1127
    TaskQueue taskQueue = request.getTaskQueue();
1✔
1128
    WorkflowTaskScheduledEventAttributes a =
1129
        WorkflowTaskScheduledEventAttributes.newBuilder()
1✔
1130
            .setStartToCloseTimeout(request.getWorkflowTaskTimeout())
1✔
1131
            .setTaskQueue(taskQueue)
1✔
1132
            .setAttempt(++data.attempt)
1✔
1133
            .build();
1✔
1134
    HistoryEvent event =
1135
        HistoryEvent.newBuilder()
1✔
1136
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED)
1✔
1137
            .setWorkflowTaskScheduledEventAttributes(a)
1✔
1138
            .build();
1✔
1139
    scheduledEventId = ctx.addEvent(event);
1✔
1140
    PollWorkflowTaskQueueResponse.Builder workflowTaskResponse =
1141
        PollWorkflowTaskQueueResponse.newBuilder();
1✔
1142
    workflowTaskResponse.setWorkflowExecution(ctx.getExecution());
1✔
1143
    workflowTaskResponse.setWorkflowType(request.getWorkflowType());
1✔
1144
    workflowTaskResponse.setAttempt(data.attempt);
1✔
1145
    workflowTaskResponse.setScheduledTime(ctx.currentTime());
1✔
1146
    workflowTaskResponse.setWorkflowExecutionTaskQueue(taskQueue);
1✔
1147
    TaskQueueId taskQueueId = new TaskQueueId(ctx.getNamespace(), taskQueue.getName());
1✔
1148
    WorkflowTask workflowTask = new WorkflowTask(taskQueueId, workflowTaskResponse);
1✔
1149
    ctx.setWorkflowTaskForMatching(workflowTask);
1✔
1150
    ctx.onCommit(
1✔
1151
        (historySize) -> {
1152
          data.scheduledEventId = scheduledEventId;
1✔
1153
          data.workflowTask = workflowTaskResponse;
1✔
1154
        });
1✔
1155
  }
1✔
1156

1157
  private static void convertQueryWorkflowTaskToReal(
1158
      RequestContext ctx, WorkflowTaskData data, Object notUsedRequest, long notUsed) {
1159
    StartWorkflowExecutionRequest request = data.startRequest;
×
1160
    WorkflowTaskScheduledEventAttributes a =
1161
        WorkflowTaskScheduledEventAttributes.newBuilder()
×
1162
            .setStartToCloseTimeout(request.getWorkflowTaskTimeout())
×
1163
            .setTaskQueue(request.getTaskQueue())
×
1164
            .setAttempt(data.attempt)
×
1165
            .build();
×
1166
    HistoryEvent event =
1167
        HistoryEvent.newBuilder()
×
1168
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED)
×
1169
            .setWorkflowTaskScheduledEventAttributes(a)
×
1170
            .build();
×
1171
    long scheduledEventId = ctx.addEvent(event);
×
1172
    ctx.onCommit((historySize) -> data.scheduledEventId = scheduledEventId);
×
1173
  }
×
1174

1175
  private static void scheduleQueryWorkflowTask(
1176
      RequestContext ctx,
1177
      WorkflowTaskData data,
1178
      TestWorkflowMutableStateImpl.ConsistentQuery query,
1179
      long notUsed) {
1180
    ctx.lockTimer("scheduleQueryWorkflowTask");
×
1181
    StartWorkflowExecutionRequest request = data.startRequest;
×
1182
    PollWorkflowTaskQueueResponse.Builder workflowTaskResponse =
1183
        PollWorkflowTaskQueueResponse.newBuilder();
×
1184
    StickyExecutionAttributes stickyAttributes =
×
1185
        ctx.getWorkflowMutableState().getStickyExecutionAttributes();
×
1186
    String taskQueue =
1187
        stickyAttributes == null
×
1188
            ? request.getTaskQueue().getName()
×
1189
            : stickyAttributes.getWorkerTaskQueue().getName();
×
1190
    workflowTaskResponse.setWorkflowExecution(ctx.getExecution());
×
1191
    workflowTaskResponse.setWorkflowType(request.getWorkflowType());
×
1192
    workflowTaskResponse.setAttempt(++data.attempt);
×
1193
    TaskQueueId taskQueueId = new TaskQueueId(ctx.getNamespace(), taskQueue);
×
1194
    WorkflowTask workflowTask = new WorkflowTask(taskQueueId, workflowTaskResponse);
×
1195
    ctx.setWorkflowTaskForMatching(workflowTask);
×
1196
    ctx.onCommit(
×
1197
        (historySize) -> {
1198
          if (data.lastSuccessfulStartedEventId > 0) {
×
1199
            workflowTaskResponse.setPreviousStartedEventId(data.lastSuccessfulStartedEventId);
×
1200
          }
1201
          data.scheduledEventId = NO_EVENT_ID;
×
1202
          data.workflowTask = workflowTaskResponse;
×
1203
          if (query != null) {
×
1204
            data.consistentQueryRequests.put(query.getKey(), query);
×
1205
          }
1206
        });
×
1207
  }
×
1208

1209
  private static void queryWhileScheduled(
1210
      RequestContext ctx,
1211
      WorkflowTaskData data,
1212
      TestWorkflowMutableStateImpl.ConsistentQuery query,
1213
      long notUsed) {
1214
    data.consistentQueryRequests.put(query.getKey(), query);
1✔
1215
  }
1✔
1216

1217
  private static void bufferQuery(
1218
      RequestContext ctx,
1219
      WorkflowTaskData data,
1220
      TestWorkflowMutableStateImpl.ConsistentQuery query,
1221
      long notUsed) {
1222
    data.queryBuffer.put(query.getKey(), query);
1✔
1223
  }
1✔
1224

1225
  private static void startWorkflowTask(
1226
      RequestContext ctx,
1227
      WorkflowTaskData data,
1228
      PollWorkflowTaskQueueRequest request,
1229
      long notUsed) {
1230
    WorkflowTaskStartedEventAttributes a =
1231
        WorkflowTaskStartedEventAttributes.newBuilder()
1✔
1232
            .setIdentity(request.getIdentity())
1✔
1233
            .setScheduledEventId(data.scheduledEventId)
1✔
1234
            .build();
1✔
1235
    HistoryEvent event =
1236
        HistoryEvent.newBuilder()
1✔
1237
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_STARTED)
1✔
1238
            .setWorkflowTaskStartedEventAttributes(a)
1✔
1239
            .build();
1✔
1240
    long startedEventId = ctx.addEvent(event);
1✔
1241
    startWorkflowTaskImpl(ctx, data, request, startedEventId, false);
1✔
1242
  }
1✔
1243

1244
  private static void startQueryOnlyWorkflowTask(
1245
      RequestContext ctx,
1246
      WorkflowTaskData data,
1247
      PollWorkflowTaskQueueRequest request,
1248
      long notUsed) {
1249
    startWorkflowTaskImpl(ctx, data, request, NO_EVENT_ID, true);
×
1250
  }
×
1251

1252
  private static void startWorkflowTaskImpl(
1253
      RequestContext ctx,
1254
      WorkflowTaskData data,
1255
      PollWorkflowTaskQueueRequest request,
1256
      long startedEventId,
1257
      boolean queryOnly) {
1258
    ctx.onCommit(
1✔
1259
        (historySize) -> {
1260
          PollWorkflowTaskQueueResponse.Builder task = data.workflowTask;
1✔
1261
          task.setStartedEventId(data.scheduledEventId + 1);
1✔
1262
          WorkflowTaskToken taskToken = new WorkflowTaskToken(ctx.getExecutionId(), historySize);
1✔
1263
          task.setTaskToken(taskToken.toBytes());
1✔
1264
          GetWorkflowExecutionHistoryRequest getRequest =
1265
              GetWorkflowExecutionHistoryRequest.newBuilder()
1✔
1266
                  .setNamespace(request.getNamespace())
1✔
1267
                  .setExecution(ctx.getExecution())
1✔
1268
                  .build();
1✔
1269
          List<HistoryEvent> events;
1270
          events =
1✔
1271
              data.store
1272
                  .getWorkflowExecutionHistory(ctx.getExecutionId(), getRequest, null)
1✔
1273
                  .getHistory()
1✔
1274
                  .getEventsList();
1✔
1275
          long lastEventId = events.get(events.size() - 1).getEventId();
1✔
1276
          if (ctx.getWorkflowMutableState().getStickyExecutionAttributes() != null) {
1✔
1277
            events = events.subList((int) data.lastSuccessfulStartedEventId, events.size());
1✔
1278
          }
1279
          if (queryOnly && !data.workflowCompleted) {
1✔
1280
            events = new ArrayList<>(events); // convert list to mutable
×
1281
            // Add "fake" workflow task scheduled and started if workflow is not closed
1282
            WorkflowTaskScheduledEventAttributes scheduledAttributes =
1283
                WorkflowTaskScheduledEventAttributes.newBuilder()
×
1284
                    .setStartToCloseTimeout(data.startRequest.getWorkflowTaskTimeout())
×
1285
                    .setTaskQueue(request.getTaskQueue())
×
1286
                    .setAttempt(data.attempt)
×
1287
                    .build();
×
1288
            HistoryEvent scheduledEvent =
1289
                HistoryEvent.newBuilder()
×
1290
                    .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED)
×
1291
                    .setEventId(lastEventId + 1)
×
1292
                    .setWorkflowTaskScheduledEventAttributes(scheduledAttributes)
×
1293
                    .build();
×
1294
            events.add(scheduledEvent);
×
1295
            WorkflowTaskStartedEventAttributes startedAttributes =
1296
                WorkflowTaskStartedEventAttributes.newBuilder()
×
1297
                    .setIdentity(request.getIdentity())
×
1298
                    .setScheduledEventId(lastEventId + 1)
×
1299
                    .build();
×
1300
            HistoryEvent startedEvent =
1301
                HistoryEvent.newBuilder()
×
1302
                    .setEventId(lastEventId + 1)
×
1303
                    .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_STARTED)
×
1304
                    .setWorkflowTaskStartedEventAttributes(startedAttributes)
×
1305
                    .build();
×
1306
            events.add(startedEvent);
×
1307
            task.setStartedEventId(lastEventId + 2);
×
1308
          }
1309
          // get it from previous started event id.
1310
          task.setHistory(History.newBuilder().addAllEvents(events));
1✔
1311
          // Transfer the queries
1312
          Map<String, TestWorkflowMutableStateImpl.ConsistentQuery> queries =
1✔
1313
              data.consistentQueryRequests;
1314
          for (Map.Entry<String, TestWorkflowMutableStateImpl.ConsistentQuery> queryEntry :
1315
              queries.entrySet()) {
1✔
1316
            QueryWorkflowRequest queryWorkflowRequest = queryEntry.getValue().getRequest();
1✔
1317
            task.putQueries(queryEntry.getKey(), queryWorkflowRequest.getQuery());
1✔
1318
          }
1✔
1319
          if (data.lastSuccessfulStartedEventId > 0) {
1✔
1320
            task.setPreviousStartedEventId(data.lastSuccessfulStartedEventId);
1✔
1321
          }
1322
          if (!queryOnly) {
1✔
1323
            data.startedEventId = startedEventId;
1✔
1324
          }
1325
        });
1✔
1326
  }
1✔
1327

1328
  private static void startActivityTask(
1329
      RequestContext ctx,
1330
      ActivityTaskData data,
1331
      PollActivityTaskQueueRequest request,
1332
      long notUsed) {
1333
    ActivityTaskStartedEventAttributes.Builder a =
1334
        ActivityTaskStartedEventAttributes.newBuilder()
1✔
1335
            .setIdentity(request.getIdentity())
1✔
1336
            .setScheduledEventId(data.scheduledEventId);
1✔
1337
    a.setAttempt(data.getAttempt());
1✔
1338
    // Setting timestamp here as the default logic will set it to the time when it is added to the
1339
    // history. But in the case of retry it happens only after an activity completion.
1340
    Timestamp timestamp = data.store.currentTime();
1✔
1341
    HistoryEvent event =
1342
        HistoryEvent.newBuilder()
1✔
1343
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_STARTED)
1✔
1344
            .setEventTime(timestamp)
1✔
1345
            .setActivityTaskStartedEventAttributes(a)
1✔
1346
            .build();
1✔
1347
    long startedEventId;
1348
    startedEventId = NO_EVENT_ID;
1✔
1349
    ctx.onCommit(
1✔
1350
        (historySize) -> {
1351
          data.startedEventId = startedEventId;
1✔
1352
          data.startedEvent = event;
1✔
1353
          PollActivityTaskQueueResponse.Builder task = data.activityTask.getTask();
1✔
1354
          task.setTaskToken(
1✔
1355
              new ActivityTaskToken(ctx.getExecutionId(), data.scheduledEventId, task.getAttempt())
1✔
1356
                  .toBytes());
1✔
1357
          task.setStartedTime(timestamp);
1✔
1358
        });
1✔
1359
  }
1✔
1360

1361
  private static void completeWorkflowTask(
1362
      RequestContext ctx,
1363
      WorkflowTaskData data,
1364
      RespondWorkflowTaskCompletedRequest request,
1365
      long notUsed) {
1366
    WorkflowTaskCompletedEventAttributes.Builder a =
1367
        WorkflowTaskCompletedEventAttributes.newBuilder()
1✔
1368
            .setIdentity(request.getIdentity())
1✔
1369
            .setBinaryChecksum(request.getBinaryChecksum())
1✔
1370
            .setMeteringMetadata(request.getMeteringMetadata())
1✔
1371
            .setScheduledEventId(data.scheduledEventId);
1✔
1372
    HistoryEvent event =
1373
        HistoryEvent.newBuilder()
1✔
1374
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_COMPLETED)
1✔
1375
            .setWorkflowTaskCompletedEventAttributes(a)
1✔
1376
            .build();
1✔
1377
    ctx.addEvent(event);
1✔
1378
    ctx.onCommit(
1✔
1379
        (historySize) -> {
1380
          data.lastSuccessfulStartedEventId = data.startedEventId;
1✔
1381
          data.clear();
1✔
1382
        });
1✔
1383
  }
1✔
1384

1385
  private static void completeQuery(
1386
      RequestContext ctx,
1387
      WorkflowTaskData data,
1388
      RespondWorkflowTaskCompletedRequest request,
1389
      long notUsed) {
1390
    Map<String, WorkflowQueryResult> responses = request.getQueryResultsMap();
×
1391
    for (Map.Entry<String, WorkflowQueryResult> resultEntry : responses.entrySet()) {
×
1392
      TestWorkflowMutableStateImpl.ConsistentQuery query =
×
1393
          data.consistentQueryRequests.remove(resultEntry.getKey());
×
1394
      if (query != null) {
×
1395
        WorkflowQueryResult value = resultEntry.getValue();
×
1396
        CompletableFuture<QueryWorkflowResponse> result = query.getResult();
×
1397
        switch (value.getResultType()) {
×
1398
          case QUERY_RESULT_TYPE_ANSWERED:
1399
            QueryWorkflowResponse response =
1400
                QueryWorkflowResponse.newBuilder().setQueryResult(value.getAnswer()).build();
×
1401
            result.complete(response);
×
1402
            break;
×
1403
          case QUERY_RESULT_TYPE_FAILED:
1404
            result.completeExceptionally(
×
1405
                StatusUtils.newException(
×
1406
                    Status.INTERNAL.withDescription(value.getErrorMessage()),
×
1407
                    QueryFailedFailure.getDefaultInstance(),
×
1408
                    QueryFailedFailure.getDescriptor()));
×
1409
            break;
×
1410
          default:
1411
            throw Status.INVALID_ARGUMENT
×
1412
                .withDescription("Invalid query result type: " + value.getResultType())
×
1413
                .asRuntimeException();
×
1414
        }
1415
      }
1416
    }
×
1417
    ctx.onCommit(
×
1418
        (historySize) -> {
1419
          data.clear();
×
1420
          ctx.unlockTimer("completeQuery");
×
1421
        });
×
1422
  }
×
1423

1424
  private static void failQueryWorkflowTask(
1425
      RequestContext ctx, WorkflowTaskData data, Object unused, long notUsed) {
1426
    data.consistentQueryRequests
×
1427
        .entrySet()
×
1428
        .removeIf(entry -> entry.getValue().getResult().isCancelled());
×
1429
    if (!data.consistentQueryRequests.isEmpty()) {
×
1430
      ctx.setNeedWorkflowTask(true);
×
1431
    }
1432
    ctx.unlockTimer("failQueryWorkflowTask");
×
1433
  }
×
1434

1435
  private static void failWorkflowTask(
1436
      RequestContext ctx,
1437
      WorkflowTaskData data,
1438
      RespondWorkflowTaskFailedRequest request,
1439
      long notUsed) {
1440
    WorkflowTaskFailedEventAttributes.Builder a =
1441
        WorkflowTaskFailedEventAttributes.newBuilder()
1✔
1442
            .setIdentity(request.getIdentity())
1✔
1443
            .setStartedEventId(data.startedEventId)
1✔
1444
            .setScheduledEventId(data.scheduledEventId)
1✔
1445
            .setCause(request.getCause());
1✔
1446
    if (request.hasFailure()) {
1✔
1447
      a.setFailure(request.getFailure());
1✔
1448
    }
1449
    HistoryEvent event =
1450
        HistoryEvent.newBuilder()
1✔
1451
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_FAILED)
1✔
1452
            .setWorkflowTaskFailedEventAttributes(a)
1✔
1453
            .build();
1✔
1454
    ctx.addEvent(event);
1✔
1455
    ctx.setNeedWorkflowTask(true);
1✔
1456
  }
1✔
1457

1458
  private static void timeoutWorkflowTask(
1459
      RequestContext ctx, WorkflowTaskData data, Object ignored, long notUsed) {
1460
    WorkflowTaskTimedOutEventAttributes.Builder a =
1461
        WorkflowTaskTimedOutEventAttributes.newBuilder()
1✔
1462
            .setStartedEventId(data.startedEventId)
1✔
1463
            .setTimeoutType(TimeoutType.TIMEOUT_TYPE_START_TO_CLOSE)
1✔
1464
            .setScheduledEventId(data.scheduledEventId);
1✔
1465
    HistoryEvent event =
1466
        HistoryEvent.newBuilder()
1✔
1467
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT)
1✔
1468
            .setWorkflowTaskTimedOutEventAttributes(a)
1✔
1469
            .build();
1✔
1470
    ctx.addEvent(event);
1✔
1471
    ctx.setNeedWorkflowTask(true);
1✔
1472
  }
1✔
1473

1474
  private static void needsWorkflowTask(
1475
      RequestContext requestContext,
1476
      WorkflowTaskData workflowTaskData,
1477
      Object notUsedRequest,
1478
      long notUsed) {
1479
    requestContext.setNeedWorkflowTask(true);
1✔
1480
  }
1✔
1481

1482
  private static void completeActivityTask(
1483
      RequestContext ctx, ActivityTaskData data, Object request, long notUsed) {
1484
    data.startedEventId = ctx.addEvent(data.startedEvent);
1✔
1485
    if (request instanceof RespondActivityTaskCompletedRequest) {
1✔
1486
      completeActivityTaskByTaskToken(ctx, data, (RespondActivityTaskCompletedRequest) request);
1✔
1487
    } else if (request instanceof RespondActivityTaskCompletedByIdRequest) {
×
1488
      completeActivityTaskById(ctx, data, (RespondActivityTaskCompletedByIdRequest) request);
×
1489
    } else {
1490
      throw new IllegalArgumentException("Unknown request: " + request);
×
1491
    }
1492
  }
1✔
1493

1494
  private static void completeActivityTaskByTaskToken(
1495
      RequestContext ctx, ActivityTaskData data, RespondActivityTaskCompletedRequest request) {
1496
    ActivityTaskCompletedEventAttributes.Builder a =
1497
        ActivityTaskCompletedEventAttributes.newBuilder()
1✔
1498
            .setIdentity(request.getIdentity())
1✔
1499
            .setScheduledEventId(data.scheduledEventId)
1✔
1500
            .setResult(request.getResult())
1✔
1501
            .setIdentity(request.getIdentity())
1✔
1502
            .setStartedEventId(data.startedEventId);
1✔
1503
    HistoryEvent event =
1504
        HistoryEvent.newBuilder()
1✔
1505
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_COMPLETED)
1✔
1506
            .setActivityTaskCompletedEventAttributes(a)
1✔
1507
            .build();
1✔
1508
    ctx.addEvent(event);
1✔
1509
  }
1✔
1510

1511
  private static void completeActivityTaskById(
1512
      RequestContext ctx, ActivityTaskData data, RespondActivityTaskCompletedByIdRequest request) {
1513
    ActivityTaskCompletedEventAttributes.Builder a =
1514
        ActivityTaskCompletedEventAttributes.newBuilder()
×
1515
            .setIdentity(request.getIdentity())
×
1516
            .setScheduledEventId(data.scheduledEventId)
×
1517
            .setResult(request.getResult())
×
1518
            .setIdentity(request.getIdentity())
×
1519
            .setStartedEventId(data.startedEventId);
×
1520
    HistoryEvent event =
1521
        HistoryEvent.newBuilder()
×
1522
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_COMPLETED)
×
1523
            .setActivityTaskCompletedEventAttributes(a)
×
1524
            .build();
×
1525
    ctx.addEvent(event);
×
1526
  }
×
1527

1528
  private static State failActivityTask(
1529
      RequestContext ctx, ActivityTaskData data, Object request, long notUsed) {
1530
    if (request instanceof RespondActivityTaskFailedRequest) {
1✔
1531
      RespondActivityTaskFailedRequest req = (RespondActivityTaskFailedRequest) request;
1✔
1532
      return failActivityTaskByRequestType(ctx, data, req.getFailure(), req.getIdentity());
1✔
1533
    } else if (request instanceof RespondActivityTaskFailedByIdRequest) {
×
1534
      RespondActivityTaskFailedByIdRequest req = (RespondActivityTaskFailedByIdRequest) request;
×
1535
      return failActivityTaskByRequestType(ctx, data, req.getFailure(), req.getIdentity());
×
1536
    } else {
1537
      throw new IllegalArgumentException("Unknown request: " + request);
×
1538
    }
1539
  }
1540

1541
  private static State failActivityTaskByRequestType(
1542
      RequestContext ctx, ActivityTaskData data, Failure failure, String identity) {
1543
    if (!failure.hasApplicationFailureInfo()) {
1✔
1544
      throw new IllegalArgumentException(
×
1545
          "Failure must have ApplicationFailureInfo. Got: " + failure);
1546
    }
1547
    RetryState retryState = attemptActivityRetry(ctx, Optional.of(failure), data);
1✔
1548
    if (retryState == RetryState.RETRY_STATE_IN_PROGRESS) {
1✔
1549
      return INITIATED;
1✔
1550
    }
1551
    data.startedEventId = ctx.addEvent(data.startedEvent);
1✔
1552
    ActivityTaskFailedEventAttributes.Builder attributes =
1553
        ActivityTaskFailedEventAttributes.newBuilder()
1✔
1554
            .setIdentity(identity)
1✔
1555
            .setScheduledEventId(data.scheduledEventId)
1✔
1556
            .setFailure(failure)
1✔
1557
            .setRetryState(retryState)
1✔
1558
            .setIdentity(identity)
1✔
1559
            .setStartedEventId(data.startedEventId);
1✔
1560
    HistoryEvent event =
1561
        HistoryEvent.newBuilder()
1✔
1562
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_FAILED)
1✔
1563
            .setActivityTaskFailedEventAttributes(attributes)
1✔
1564
            .build();
1✔
1565
    ctx.addEvent(event);
1✔
1566
    return FAILED;
1✔
1567
  }
1568

1569
  private static State timeoutActivityTask(
1570
      RequestContext ctx, ActivityTaskData data, TimeoutType timeoutType, long notUsed) {
1571
    Optional<Failure> previousFailure = data.retryState.getPreviousRunFailure();
1✔
1572

1573
    // chaining with the previous run failure if we are preparing the final failure
1574
    Failure failure =
1✔
1575
        newTimeoutFailure(timeoutType, Optional.ofNullable(data.heartbeatDetails), previousFailure);
1✔
1576

1577
    RetryState retryState;
1578
    switch (timeoutType) {
1✔
1579
      case TIMEOUT_TYPE_SCHEDULE_TO_START:
1580
      case TIMEOUT_TYPE_SCHEDULE_TO_CLOSE:
1581
        // ScheduleToStart (queue timeout) is not retryable. Instead of the retry, a customer should
1582
        // set a larger ScheduleToStart timeout.
1583
        // ScheduleToClose timeout is final and failure is created with TIMEOUT retry state
1584
        retryState = RetryState.RETRY_STATE_TIMEOUT;
1✔
1585
        break;
1✔
1586
      case TIMEOUT_TYPE_START_TO_CLOSE:
1587
      case TIMEOUT_TYPE_HEARTBEAT:
1588
        // not chaining with the previous run failure if we are preparing the failure to be stored
1589
        // for the next iteration
1590
        Optional<Failure> lastFailure =
1✔
1591
            Optional.of(
1✔
1592
                newTimeoutFailure(
1✔
1593
                    timeoutType,
1594
                    // we move heartbeatDetails to the new top level (this cause is used for
1595
                    // scheduleToClose only)
1596
                    Optional.empty(),
1✔
1597
                    // prune to don't have too deep nesting of failures
1598
                    Optional.empty()));
1✔
1599

1600
        retryState = attemptActivityRetry(ctx, lastFailure, data);
1✔
1601
        if (retryState == RetryState.RETRY_STATE_IN_PROGRESS) {
1✔
1602
          return INITIATED;
1✔
1603
        } else if (retryState == RetryState.RETRY_STATE_TIMEOUT) {
1✔
1604
          // if retryState = RETRY_STATE_TIMEOUT, it means scheduleToClose timeout happened inside
1605
          // attemptActivityRetry();
1606
          // start to close timeout would return as "max attempts reached".
1607

1608
          Preconditions.checkState(
1✔
1609
              timeoutType == TimeoutType.TIMEOUT_TYPE_START_TO_CLOSE
1610
                  || timeoutType == TimeoutType.TIMEOUT_TYPE_HEARTBEAT,
1611
              "Unexpected timeout type: %s. We should end up here only in case of HEARTBEAT_TIMEOUT or START_TO_CLOSE_TIMEOUT",
1612
              timeoutType);
1613

1614
          // heartbeat is preserved as the cause for the scheduleToClose timeout
1615
          // But we effectively omit startToClose timeout with scheduleToClose timeout
1616
          Optional<Failure> cause =
1617
              timeoutType == TimeoutType.TIMEOUT_TYPE_HEARTBEAT ? lastFailure : previousFailure;
1✔
1618

1619
          failure =
1✔
1620
              newTimeoutFailure(
1✔
1621
                  TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE,
1622
                  Optional.ofNullable(data.heartbeatDetails),
1✔
1623
                  cause);
1624
        }
1✔
1625
        break;
1626
      default:
1627
        throw new IllegalStateException(
×
1628
            "Not implemented behavior for timeout type: " + timeoutType);
1629
    }
1630

1631
    ActivityTaskTimedOutEventAttributes.Builder a =
1632
        ActivityTaskTimedOutEventAttributes.newBuilder()
1✔
1633
            .setScheduledEventId(data.scheduledEventId)
1✔
1634
            .setRetryState(retryState)
1✔
1635
            .setStartedEventId(data.startedEventId)
1✔
1636
            .setFailure(failure);
1✔
1637
    HistoryEvent event =
1638
        HistoryEvent.newBuilder()
1✔
1639
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_TIMED_OUT)
1✔
1640
            .setActivityTaskTimedOutEventAttributes(a)
1✔
1641
            .build();
1✔
1642
    ctx.addEvent(event);
1✔
1643
    return TIMED_OUT;
1✔
1644
  }
1645

1646
  private static Failure newTimeoutFailure(
1647
      TimeoutType timeoutType, Optional<Payloads> lastHeartbeatDetails, Optional<Failure> cause) {
1648
    TimeoutFailureInfo.Builder info = TimeoutFailureInfo.newBuilder().setTimeoutType(timeoutType);
1✔
1649
    if (lastHeartbeatDetails.isPresent()) {
1✔
1650
      info.setLastHeartbeatDetails(lastHeartbeatDetails.get());
1✔
1651
    }
1652
    Failure.Builder result = Failure.newBuilder().setTimeoutFailureInfo(info);
1✔
1653
    if (cause.isPresent()) {
1✔
1654
      result.setCause(cause.get());
1✔
1655
    }
1656
    return result.build();
1✔
1657
  }
1658

1659
  private static RetryState attemptActivityRetry(
1660
      RequestContext ctx, Optional<Failure> failure, ActivityTaskData data) {
1661
    if (data.retryState == null) {
1✔
1662
      throw new IllegalStateException("RetryPolicy is always present");
×
1663
    }
1664
    Optional<ApplicationFailureInfo> info = failure.map(Failure::getApplicationFailureInfo);
1✔
1665
    if (info.isPresent()) {
1✔
1666
      if (info.get().getNonRetryable()) {
1✔
1667
        return RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE;
1✔
1668
      }
1669
    }
1670
    TestServiceRetryState nextAttempt = data.retryState.getNextAttempt(failure);
1✔
1671
    TestServiceRetryState.BackoffInterval backoffInterval =
1✔
1672
        data.retryState.getBackoffIntervalInSeconds(
1✔
1673
            info.map(ApplicationFailureInfo::getType), data.store.currentTime());
1✔
1674
    if (backoffInterval.getRetryState() == RetryState.RETRY_STATE_IN_PROGRESS) {
1✔
1675
      data.nextBackoffInterval = ProtobufTimeUtils.toProtoDuration(backoffInterval.getInterval());
1✔
1676
      PollActivityTaskQueueResponse.Builder task = data.activityTask.getTask();
1✔
1677
      if (data.heartbeatDetails != null) {
1✔
1678
        task.setHeartbeatDetails(data.heartbeatDetails);
1✔
1679
      }
1680
      ctx.onCommit(
1✔
1681
          (historySize) -> {
1682
            data.retryState = nextAttempt;
1✔
1683
            task.setAttempt(nextAttempt.getAttempt());
1✔
1684
            task.setCurrentAttemptScheduledTime(ctx.currentTime());
1✔
1685
          });
1✔
1686
    } else {
1✔
1687
      data.nextBackoffInterval = Durations.ZERO;
1✔
1688
    }
1689
    return backoffInterval.getRetryState();
1✔
1690
  }
1691

1692
  private static void reportActivityTaskCancellation(
1693
      RequestContext ctx, ActivityTaskData data, Object request, long notUsed) {
1694
    Payloads details = null;
1✔
1695
    if (request instanceof RespondActivityTaskCanceledRequest) {
1✔
1696
      {
1697
        RespondActivityTaskCanceledRequest cr = (RespondActivityTaskCanceledRequest) request;
×
1698

1699
        details = cr.hasDetails() ? cr.getDetails() : null;
×
1700
      }
×
1701
    } else if (request instanceof RespondActivityTaskCanceledByIdRequest) {
1✔
1702
      {
1703
        RespondActivityTaskCanceledByIdRequest cr =
×
1704
            (RespondActivityTaskCanceledByIdRequest) request;
1705
        details = cr.hasDetails() ? cr.getDetails() : null;
×
1706
      }
×
1707
    } else if (request != null) {
1✔
1708
      throw Status.INTERNAL
×
1709
          .withDescription("Unexpected request type: " + request)
×
1710
          .asRuntimeException();
×
1711
    }
1712
    ActivityTaskCanceledEventAttributes.Builder a =
1713
        ActivityTaskCanceledEventAttributes.newBuilder()
1✔
1714
            .setScheduledEventId(data.scheduledEventId)
1✔
1715
            .setStartedEventId(data.startedEventId);
1✔
1716
    if (details != null) {
1✔
1717
      a.setDetails(details);
×
1718
    }
1719
    HistoryEvent event =
1720
        HistoryEvent.newBuilder()
1✔
1721
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_CANCELED)
1✔
1722
            .setActivityTaskCanceledEventAttributes(a)
1✔
1723
            .build();
1✔
1724
    ctx.addEvent(event);
1✔
1725
  }
1✔
1726

1727
  private static void heartbeatActivityTask(
1728
      RequestContext nullCtx, ActivityTaskData data, Payloads details, long notUsed) {
1729
    data.heartbeatDetails = details;
1✔
1730
  }
1✔
1731

1732
  private static void startTimer(
1733
      RequestContext ctx,
1734
      TimerData data,
1735
      StartTimerCommandAttributes d,
1736
      long workflowTaskCompletedEventId) {
1737
    TimerStartedEventAttributes.Builder a =
1738
        TimerStartedEventAttributes.newBuilder()
1✔
1739
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1✔
1740
            .setStartToFireTimeout(d.getStartToFireTimeout())
1✔
1741
            .setTimerId(d.getTimerId());
1✔
1742
    HistoryEvent event =
1743
        HistoryEvent.newBuilder()
1✔
1744
            .setEventType(EventType.EVENT_TYPE_TIMER_STARTED)
1✔
1745
            .setTimerStartedEventAttributes(a)
1✔
1746
            .build();
1✔
1747
    long startedEventId = ctx.addEvent(event);
1✔
1748
    ctx.onCommit(
1✔
1749
        (historySize) -> {
1750
          data.startedEvent = a.build();
1✔
1751
          data.startedEventId = startedEventId;
1✔
1752
        });
1✔
1753
  }
1✔
1754

1755
  private static void fireTimer(RequestContext ctx, TimerData data, Object ignored, long notUsed) {
1756
    TimerFiredEventAttributes.Builder a =
1757
        TimerFiredEventAttributes.newBuilder()
1✔
1758
            .setTimerId(data.startedEvent.getTimerId())
1✔
1759
            .setStartedEventId(data.startedEventId);
1✔
1760
    HistoryEvent event =
1761
        HistoryEvent.newBuilder()
1✔
1762
            .setEventType(EventType.EVENT_TYPE_TIMER_FIRED)
1✔
1763
            .setTimerFiredEventAttributes(a)
1✔
1764
            .build();
1✔
1765
    ctx.addEvent(event);
1✔
1766
  }
1✔
1767

1768
  private static void cancelTimer(
1769
      RequestContext ctx,
1770
      TimerData data,
1771
      CancelTimerCommandAttributes d,
1772
      long workflowTaskCompletedEventId) {
1773
    TimerCanceledEventAttributes.Builder a =
1774
        TimerCanceledEventAttributes.newBuilder()
1✔
1775
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1✔
1776
            .setTimerId(d.getTimerId())
1✔
1777
            .setStartedEventId(data.startedEventId);
1✔
1778
    HistoryEvent event =
1779
        HistoryEvent.newBuilder()
1✔
1780
            .setEventType(EventType.EVENT_TYPE_TIMER_CANCELED)
1✔
1781
            .setTimerCanceledEventAttributes(a)
1✔
1782
            .build();
1✔
1783
    ctx.addEvent(event);
1✔
1784
  }
1✔
1785

1786
  private static void initiateExternalSignal(
1787
      RequestContext ctx,
1788
      SignalExternalData data,
1789
      SignalExternalWorkflowExecutionCommandAttributes d,
1790
      long workflowTaskCompletedEventId) {
1791
    SignalExternalWorkflowExecutionInitiatedEventAttributes.Builder a =
1792
        SignalExternalWorkflowExecutionInitiatedEventAttributes.newBuilder()
1✔
1793
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1✔
1794
            .setControl(d.getControl())
1✔
1795
            .setInput(d.getInput())
1✔
1796
            .setNamespace(d.getNamespace())
1✔
1797
            .setChildWorkflowOnly(d.getChildWorkflowOnly())
1✔
1798
            .setSignalName(d.getSignalName())
1✔
1799
            .setWorkflowExecution(d.getExecution());
1✔
1800

1801
    HistoryEvent event =
1802
        HistoryEvent.newBuilder()
1✔
1803
            .setEventType(EventType.EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED)
1✔
1804
            .setSignalExternalWorkflowExecutionInitiatedEventAttributes(a)
1✔
1805
            .build();
1✔
1806
    long initiatedEventId = ctx.addEvent(event);
1✔
1807
    ctx.onCommit(
1✔
1808
        (historySize) -> {
1809
          data.initiatedEventId = initiatedEventId;
1✔
1810
          data.initiatedEvent = a.build();
1✔
1811
        });
1✔
1812
  }
1✔
1813

1814
  private static void failExternalSignal(
1815
      RequestContext ctx,
1816
      SignalExternalData data,
1817
      SignalExternalWorkflowExecutionFailedCause cause,
1818
      long notUsed) {
1819
    SignalExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent = data.initiatedEvent;
1✔
1820
    SignalExternalWorkflowExecutionFailedEventAttributes.Builder a =
1821
        SignalExternalWorkflowExecutionFailedEventAttributes.newBuilder()
1✔
1822
            .setInitiatedEventId(data.initiatedEventId)
1✔
1823
            .setWorkflowExecution(initiatedEvent.getWorkflowExecution())
1✔
1824
            .setControl(initiatedEvent.getControl())
1✔
1825
            .setCause(cause)
1✔
1826
            .setNamespace(initiatedEvent.getNamespace());
1✔
1827
    HistoryEvent event =
1828
        HistoryEvent.newBuilder()
1✔
1829
            .setEventType(EventType.EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED)
1✔
1830
            .setSignalExternalWorkflowExecutionFailedEventAttributes(a)
1✔
1831
            .build();
1✔
1832
    ctx.addEvent(event);
1✔
1833
  }
1✔
1834

1835
  private static void completeExternalSignal(
1836
      RequestContext ctx, SignalExternalData data, String runId, long notUsed) {
1837
    SignalExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent = data.initiatedEvent;
1✔
1838
    WorkflowExecution signaledExecution =
1✔
1839
        initiatedEvent.getWorkflowExecution().toBuilder().setRunId(runId).build();
1✔
1840
    ExternalWorkflowExecutionSignaledEventAttributes.Builder a =
1841
        ExternalWorkflowExecutionSignaledEventAttributes.newBuilder()
1✔
1842
            .setInitiatedEventId(data.initiatedEventId)
1✔
1843
            .setWorkflowExecution(signaledExecution)
1✔
1844
            .setControl(initiatedEvent.getControl())
1✔
1845
            .setNamespace(initiatedEvent.getNamespace());
1✔
1846
    HistoryEvent event =
1847
        HistoryEvent.newBuilder()
1✔
1848
            .setEventType(EventType.EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_SIGNALED)
1✔
1849
            .setExternalWorkflowExecutionSignaledEventAttributes(a)
1✔
1850
            .build();
1✔
1851
    ctx.addEvent(event);
1✔
1852
  }
1✔
1853

1854
  private static void initiateExternalCancellation(
1855
      RequestContext ctx,
1856
      CancelExternalData data,
1857
      RequestCancelExternalWorkflowExecutionCommandAttributes d,
1858
      long workflowTaskCompletedEventId) {
1859
    RequestCancelExternalWorkflowExecutionInitiatedEventAttributes.Builder a =
1860
        RequestCancelExternalWorkflowExecutionInitiatedEventAttributes.newBuilder()
1✔
1861
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1✔
1862
            .setControl(d.getControl())
1✔
1863
            .setNamespace(d.getNamespace())
1✔
1864
            .setChildWorkflowOnly(d.getChildWorkflowOnly())
1✔
1865
            .setWorkflowExecution(
1✔
1866
                WorkflowExecution.newBuilder()
1✔
1867
                    .setWorkflowId(d.getWorkflowId())
1✔
1868
                    .setRunId(d.getRunId())
1✔
1869
                    .build());
1✔
1870

1871
    HistoryEvent event =
1872
        HistoryEvent.newBuilder()
1✔
1873
            .setEventType(EventType.EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED)
1✔
1874
            .setRequestCancelExternalWorkflowExecutionInitiatedEventAttributes(a)
1✔
1875
            .build();
1✔
1876
    long initiatedEventId = ctx.addEvent(event);
1✔
1877
    ctx.onCommit(
1✔
1878
        (historySize) -> {
1879
          data.initiatedEventId = initiatedEventId;
1✔
1880
          data.initiatedEvent = a.build();
1✔
1881
        });
1✔
1882
  }
1✔
1883

1884
  private static void reportExternalCancellationRequested(
1885
      RequestContext ctx, CancelExternalData data, String runId, long notUsed) {
1886
    RequestCancelExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent =
1✔
1887
        data.initiatedEvent;
1888
    ExternalWorkflowExecutionCancelRequestedEventAttributes.Builder a =
1889
        ExternalWorkflowExecutionCancelRequestedEventAttributes.newBuilder()
1✔
1890
            .setInitiatedEventId(data.initiatedEventId)
1✔
1891
            .setWorkflowExecution(
1✔
1892
                WorkflowExecution.newBuilder()
1✔
1893
                    .setRunId(runId)
1✔
1894
                    .setWorkflowId(initiatedEvent.getWorkflowExecution().getWorkflowId())
1✔
1895
                    .build())
1✔
1896
            .setNamespace(initiatedEvent.getNamespace());
1✔
1897
    HistoryEvent event =
1898
        HistoryEvent.newBuilder()
1✔
1899
            .setEventType(EventType.EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_CANCEL_REQUESTED)
1✔
1900
            .setExternalWorkflowExecutionCancelRequestedEventAttributes(a)
1✔
1901
            .build();
1✔
1902
    ctx.addEvent(event);
1✔
1903
  }
1✔
1904

1905
  private static void failExternalCancellation(
1906
      RequestContext ctx,
1907
      CancelExternalData data,
1908
      CancelExternalWorkflowExecutionFailedCause cause,
1909
      long notUsed) {
1910
    RequestCancelExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent =
×
1911
        data.initiatedEvent;
1912
    RequestCancelExternalWorkflowExecutionFailedEventAttributes.Builder a =
1913
        RequestCancelExternalWorkflowExecutionFailedEventAttributes.newBuilder()
×
1914
            .setInitiatedEventId(data.initiatedEventId)
×
1915
            .setWorkflowExecution(initiatedEvent.getWorkflowExecution())
×
1916
            .setControl(initiatedEvent.getControl())
×
1917
            .setCause(cause)
×
1918
            .setNamespace(initiatedEvent.getNamespace());
×
1919
    HistoryEvent event =
1920
        HistoryEvent.newBuilder()
×
1921
            .setEventType(EventType.EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_FAILED)
×
1922
            .setRequestCancelExternalWorkflowExecutionFailedEventAttributes(a)
×
1923
            .build();
×
1924
    ctx.addEvent(event);
×
1925
  }
×
1926

1927
  // Mimics the default activity retry policy of a standard Temporal server.
1928
  static RetryPolicy ensureDefaultFieldsForActivityRetryPolicy(RetryPolicy originalPolicy) {
1929
    Duration initialInterval =
1930
        Durations.compare(originalPolicy.getInitialInterval(), Durations.ZERO) == 0
1✔
1931
            ? DEFAULT_ACTIVITY_RETRY_INITIAL_INTERVAL
1✔
1932
            : originalPolicy.getInitialInterval();
1✔
1933

1934
    return RetryPolicy.newBuilder()
1✔
1935
        .setInitialInterval(initialInterval)
1✔
1936
        .addAllNonRetryableErrorTypes(originalPolicy.getNonRetryableErrorTypesList())
1✔
1937
        .setMaximumInterval(
1✔
1938
            Durations.compare(originalPolicy.getMaximumInterval(), Durations.ZERO) == 0
1✔
1939
                ? Durations.fromMillis(
1✔
1940
                    DEFAULT_ACTIVITY_MAXIMUM_INTERVAL_COEFFICIENT
1941
                        * Durations.toMillis(initialInterval))
1✔
1942
                : originalPolicy.getMaximumInterval())
1✔
1943
        .setBackoffCoefficient(
1✔
1944
            originalPolicy.getBackoffCoefficient() == 0
1✔
1945
                ? DEFAULT_ACTIVITY_RETRY_BACKOFF_COEFFICIENT
1✔
1946
                : originalPolicy.getBackoffCoefficient())
1✔
1947
        .setMaximumAttempts(
1✔
1948
            originalPolicy.getMaximumAttempts() == 0
1✔
1949
                ? DEFAULT_ACTIVITY_RETRY_MAXIMUM_ATTEMPTS
1✔
1950
                : originalPolicy.getMaximumAttempts())
1✔
1951
        .build();
1✔
1952
  }
1953
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc