• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #189

26 Sep 2023 02:27PM CUT coverage: 77.349% (-0.02%) from 77.369%
#189

push

github-actions

web-flow
Fix workflow options equality (#1868)

1 of 1 new or added line in 1 file covered. (100.0%)

18665 of 24131 relevant lines covered (77.35%)

0.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.81
/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.testservice;
22

23
import static io.temporal.internal.testservice.StateMachines.Action.CANCEL;
24
import static io.temporal.internal.testservice.StateMachines.Action.COMPLETE;
25
import static io.temporal.internal.testservice.StateMachines.Action.CONTINUE_AS_NEW;
26
import static io.temporal.internal.testservice.StateMachines.Action.FAIL;
27
import static io.temporal.internal.testservice.StateMachines.Action.INITIATE;
28
import static io.temporal.internal.testservice.StateMachines.Action.QUERY;
29
import static io.temporal.internal.testservice.StateMachines.Action.REQUEST_CANCELLATION;
30
import static io.temporal.internal.testservice.StateMachines.Action.START;
31
import static io.temporal.internal.testservice.StateMachines.Action.TERMINATE;
32
import static io.temporal.internal.testservice.StateMachines.Action.TIME_OUT;
33
import static io.temporal.internal.testservice.StateMachines.Action.UPDATE;
34
import static io.temporal.internal.testservice.StateMachines.Action.UPDATE_WORKFLOW_EXECUTION;
35
import static io.temporal.internal.testservice.StateMachines.State.CANCELED;
36
import static io.temporal.internal.testservice.StateMachines.State.CANCELLATION_REQUESTED;
37
import static io.temporal.internal.testservice.StateMachines.State.COMPLETED;
38
import static io.temporal.internal.testservice.StateMachines.State.CONTINUED_AS_NEW;
39
import static io.temporal.internal.testservice.StateMachines.State.FAILED;
40
import static io.temporal.internal.testservice.StateMachines.State.INITIATED;
41
import static io.temporal.internal.testservice.StateMachines.State.NONE;
42
import static io.temporal.internal.testservice.StateMachines.State.STARTED;
43
import static io.temporal.internal.testservice.StateMachines.State.TERMINATED;
44
import static io.temporal.internal.testservice.StateMachines.State.TIMED_OUT;
45

46
import com.google.common.base.Preconditions;
47
import com.google.protobuf.Any;
48
import com.google.protobuf.Duration;
49
import com.google.protobuf.InvalidProtocolBufferException;
50
import com.google.protobuf.Timestamp;
51
import com.google.protobuf.util.Durations;
52
import com.google.protobuf.util.Timestamps;
53
import io.grpc.Status;
54
import io.grpc.StatusRuntimeException;
55
import io.temporal.api.command.v1.CancelTimerCommandAttributes;
56
import io.temporal.api.command.v1.CancelWorkflowExecutionCommandAttributes;
57
import io.temporal.api.command.v1.CompleteWorkflowExecutionCommandAttributes;
58
import io.temporal.api.command.v1.ContinueAsNewWorkflowExecutionCommandAttributes;
59
import io.temporal.api.command.v1.FailWorkflowExecutionCommandAttributes;
60
import io.temporal.api.command.v1.RequestCancelActivityTaskCommandAttributes;
61
import io.temporal.api.command.v1.RequestCancelExternalWorkflowExecutionCommandAttributes;
62
import io.temporal.api.command.v1.ScheduleActivityTaskCommandAttributes;
63
import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes;
64
import io.temporal.api.command.v1.StartChildWorkflowExecutionCommandAttributes;
65
import io.temporal.api.command.v1.StartTimerCommandAttributes;
66
import io.temporal.api.common.v1.Payloads;
67
import io.temporal.api.common.v1.RetryPolicy;
68
import io.temporal.api.common.v1.WorkflowExecution;
69
import io.temporal.api.enums.v1.CancelExternalWorkflowExecutionFailedCause;
70
import io.temporal.api.enums.v1.EventType;
71
import io.temporal.api.enums.v1.RetryState;
72
import io.temporal.api.enums.v1.SignalExternalWorkflowExecutionFailedCause;
73
import io.temporal.api.enums.v1.StartChildWorkflowExecutionFailedCause;
74
import io.temporal.api.enums.v1.TimeoutType;
75
import io.temporal.api.errordetails.v1.QueryFailedFailure;
76
import io.temporal.api.failure.v1.ApplicationFailureInfo;
77
import io.temporal.api.failure.v1.Failure;
78
import io.temporal.api.failure.v1.TimeoutFailureInfo;
79
import io.temporal.api.history.v1.*;
80
import io.temporal.api.protocol.v1.Message;
81
import io.temporal.api.query.v1.WorkflowQueryResult;
82
import io.temporal.api.taskqueue.v1.StickyExecutionAttributes;
83
import io.temporal.api.taskqueue.v1.TaskQueue;
84
import io.temporal.api.update.v1.*;
85
import io.temporal.api.workflowservice.v1.*;
86
import io.temporal.internal.common.ProtobufTimeUtils;
87
import io.temporal.internal.testservice.TestWorkflowMutableStateImpl.UpdateWorkflowExecution;
88
import io.temporal.internal.testservice.TestWorkflowStore.ActivityTask;
89
import io.temporal.internal.testservice.TestWorkflowStore.TaskQueueId;
90
import io.temporal.internal.testservice.TestWorkflowStore.WorkflowTask;
91
import io.temporal.serviceclient.StatusUtils;
92
import io.temporal.workflow.Functions;
93
import java.util.*;
94
import java.util.concurrent.CompletableFuture;
95
import java.util.concurrent.ForkJoinPool;
96
import javax.annotation.Nonnull;
97
import org.slf4j.Logger;
98
import org.slf4j.LoggerFactory;
99

100
class StateMachines {
×
101

102
  private static final Logger log = LoggerFactory.getLogger(StateMachines.class);
1✔
103

104
  public static final long DEFAULT_WORKFLOW_EXECUTION_TIMEOUT_MILLISECONDS =
105
      10L * 365 * 24 * 3600 * 1000;
106
  public static final long DEFAULT_WORKFLOW_TASK_TIMEOUT_MILLISECONDS = 10L * 1000;
107
  public static final long MAX_WORKFLOW_TASK_TIMEOUT_MILLISECONDS = 60L * 1000;
108
  static final Duration DEFAULT_ACTIVITY_RETRY_INITIAL_INTERVAL = Durations.fromSeconds(1);
1✔
109
  static final double DEFAULT_ACTIVITY_RETRY_BACKOFF_COEFFICIENT = 2.0;
110
  static final int DEFAULT_ACTIVITY_RETRY_MAXIMUM_ATTEMPTS = 0;
111
  static final int DEFAULT_ACTIVITY_MAXIMUM_INTERVAL_COEFFICIENT = 100;
112
  static final int NO_EVENT_ID = -1;
113

114
  enum State {
1✔
115
    NONE,
1✔
116
    INITIATED,
1✔
117
    STARTED,
1✔
118
    FAILED,
1✔
119
    TIMED_OUT,
1✔
120
    CANCELLATION_REQUESTED,
1✔
121
    CANCELED,
1✔
122
    COMPLETED,
1✔
123
    CONTINUED_AS_NEW,
1✔
124
    TERMINATED,
1✔
125
  }
126

127
  enum Action {
1✔
128
    INITIATE,
1✔
129
    START,
1✔
130
    FAIL,
1✔
131
    TIME_OUT,
1✔
132
    REQUEST_CANCELLATION,
1✔
133
    CANCEL,
1✔
134
    TERMINATE,
1✔
135
    UPDATE,
1✔
136
    COMPLETE,
1✔
137
    CONTINUE_AS_NEW,
1✔
138
    QUERY,
1✔
139
    UPDATE_WORKFLOW_EXECUTION,
1✔
140
  }
141

142
  static final class WorkflowData {
143
    Optional<TestServiceRetryState> retryState;
144
    Duration backoffStartInterval;
145
    String cronSchedule;
146
    Payloads lastCompletionResult;
147
    Optional<Failure> lastFailure;
148
    /**
149
     * @see WorkflowExecutionStartedEventAttributes#getFirstExecutionRunId()
150
     */
151
    final @Nonnull String firstExecutionRunId;
152
    /**
153
     * @see WorkflowExecutionStartedEventAttributes#getOriginalExecutionRunId()
154
     */
155
    final @Nonnull String originalExecutionRunId;
156

157
    /** RunId that was continued by this run as a result of Retry or Continue-As-New. */
158
    Optional<String> continuedExecutionRunId;
159

160
    Functions.Proc runTimerCancellationHandle;
161

162
    WorkflowData(
163
        Optional<TestServiceRetryState> retryState,
164
        Duration backoffStartInterval,
165
        String cronSchedule,
166
        Payloads lastCompletionResult,
167
        Optional<Failure> lastFailure,
168
        @Nonnull String firstExecutionRunId,
169
        @Nonnull String originalExecutionRunId,
170
        Optional<String> continuedExecutionRunId) {
1✔
171
      this.retryState = retryState;
1✔
172
      this.backoffStartInterval = backoffStartInterval;
1✔
173
      this.cronSchedule = cronSchedule;
1✔
174
      this.lastCompletionResult = lastCompletionResult;
1✔
175
      this.firstExecutionRunId =
1✔
176
          Preconditions.checkNotNull(firstExecutionRunId, "firstExecutionRunId");
1✔
177
      this.originalExecutionRunId =
1✔
178
          Preconditions.checkNotNull(originalExecutionRunId, "originalExecutionRunId");
1✔
179
      this.continuedExecutionRunId = continuedExecutionRunId;
1✔
180
      this.lastFailure = Objects.requireNonNull(lastFailure);
1✔
181
    }
1✔
182

183
    @Override
184
    public String toString() {
185
      return "WorkflowData{"
×
186
          + "retryState="
187
          + retryState
188
          + ", backoffStartInterval="
189
          + backoffStartInterval
190
          + ", cronSchedule='"
191
          + cronSchedule
192
          + '\''
193
          + ", lastCompletionResult="
194
          + lastCompletionResult
195
          + ", firstExecutionRunId='"
196
          + firstExecutionRunId
197
          + '\''
198
          + ", originalExecutionRunId='"
199
          + originalExecutionRunId
200
          + '\''
201
          + ", continuedExecutionRunId="
202
          + continuedExecutionRunId
203
          + '}';
204
    }
205
  }
206

207
  static final class WorkflowTaskData {
208

209
    final TestWorkflowStore store;
210

211
    boolean workflowCompleted;
212

213
    /** id of the last started event which completed successfully */
214
    long lastSuccessfulStartedEventId;
215

216
    final StartWorkflowExecutionRequest startRequest;
217

218
    long startedEventId = NO_EVENT_ID;
1✔
219

220
    PollWorkflowTaskQueueResponse.Builder workflowTask;
221

222
    /**
223
     * Events that are added during execution of a workflow task. They have to be buffered to be
224
     * added after the events generated by a workflow task. Without this the determinism will be
225
     * broken on replay.
226
     */
227
    final List<RequestContext> bufferedEvents = new ArrayList<>();
1✔
228

229
    /**
230
     * Update requests that are added during execution of a workflow task. They have to be buffered
231
     * to be added to the next workflow task.
232
     */
233
    final Map<String, UpdateWorkflowExecution> updateRequestBuffer = new LinkedHashMap<>();
1✔
234

235
    final Map<String, UpdateWorkflowExecution> updateRequest = new LinkedHashMap<>();
1✔
236

237
    long scheduledEventId = NO_EVENT_ID;
1✔
238

239
    int attempt = 0;
1✔
240

241
    /** Query requests received during workflow task processing (after start) */
242
    final Map<String, TestWorkflowMutableStateImpl.ConsistentQuery> queryBuffer = new HashMap<>();
1✔
243

244
    final Map<String, TestWorkflowMutableStateImpl.ConsistentQuery> consistentQueryRequests =
1✔
245
        new HashMap<>();
246

247
    WorkflowTaskData(TestWorkflowStore store, StartWorkflowExecutionRequest startRequest) {
1✔
248
      this.store = store;
1✔
249
      this.startRequest = startRequest;
1✔
250
    }
1✔
251

252
    void clear() {
253
      startedEventId = NO_EVENT_ID;
1✔
254
      workflowTask = null;
1✔
255
      scheduledEventId = NO_EVENT_ID;
1✔
256
      attempt = 0;
1✔
257
    }
1✔
258

259
    Optional<UpdateWorkflowExecution> getUpdateRequest(String protocolInstanceId) {
260
      return Optional.ofNullable(
1✔
261
          updateRequest.getOrDefault(
1✔
262
              protocolInstanceId, updateRequestBuffer.get(protocolInstanceId)));
1✔
263
    }
264

265
    @Override
266
    public String toString() {
267
      return "WorkflowTaskData{"
×
268
          + "store="
269
          + store
270
          + ", workflowCompleted="
271
          + workflowCompleted
272
          + ", lastSuccessfulStartedEventId="
273
          + lastSuccessfulStartedEventId
274
          + ", startRequest="
275
          + startRequest
276
          + ", startedEventId="
277
          + startedEventId
278
          + ", workflowTask="
279
          + workflowTask
280
          + ", bufferedEvents="
281
          + bufferedEvents
282
          + ", scheduledEventId="
283
          + scheduledEventId
284
          + ", attempt="
285
          + attempt
286
          + ", queryBuffer="
287
          + queryBuffer
288
          + ", consistentQueryRequests="
289
          + consistentQueryRequests
290
          + ", updateRequest="
291
          + updateRequest
292
          + ", updateRequestBuffer="
293
          + updateRequestBuffer
294
          + '}';
295
    }
296
  }
297

298
  static final class ActivityTaskData {
299

300
    StartWorkflowExecutionRequest startWorkflowExecutionRequest;
301
    ActivityTaskScheduledEventAttributes scheduledEvent;
302
    ActivityTask activityTask;
303

304
    final TestWorkflowStore store;
305

306
    long scheduledEventId = NO_EVENT_ID;
1✔
307
    long startedEventId = NO_EVENT_ID;
1✔
308
    public HistoryEvent startedEvent;
309
    Payloads heartbeatDetails;
310
    long lastHeartbeatTime;
311
    TestServiceRetryState retryState;
312
    Duration nextBackoffInterval;
313
    String identity;
314

315
    ActivityTaskData(
316
        TestWorkflowStore store, StartWorkflowExecutionRequest startWorkflowExecutionRequest) {
1✔
317
      this.store = store;
1✔
318
      this.startWorkflowExecutionRequest = startWorkflowExecutionRequest;
1✔
319
    }
1✔
320

321
    @Override
322
    public String toString() {
323
      return "ActivityTaskData{"
×
324
          + "startWorkflowExecutionRequest="
325
          + startWorkflowExecutionRequest
326
          + ", scheduledEvent="
327
          + scheduledEvent
328
          + ", activityTask="
329
          + activityTask
330
          + ", store="
331
          + store
332
          + ", scheduledEventId="
333
          + scheduledEventId
334
          + ", startedEventId="
335
          + startedEventId
336
          + ", startedEvent="
337
          + startedEvent
338
          + ", heartbeatDetails="
339
          + heartbeatDetails
340
          + ", lastHeartbeatTime="
341
          + lastHeartbeatTime
342
          + ", retryState="
343
          + retryState
344
          + ", nextBackoffInterval="
345
          + nextBackoffInterval
346
          + '}';
347
    }
348

349
    public int getAttempt() {
350
      return retryState != null ? retryState.getAttempt() : 1;
1✔
351
    }
352
  }
353

354
  static final class SignalExternalData {
1✔
355
    long initiatedEventId = NO_EVENT_ID;
1✔
356
    public SignalExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent;
357

358
    @Override
359
    public String toString() {
360
      return "SignalExternalData{"
×
361
          + "initiatedEventId="
362
          + initiatedEventId
363
          + ", initiatedEvent="
364
          + initiatedEvent
365
          + '}';
366
    }
367
  }
368

369
  static final class CancelExternalData {
1✔
370
    long initiatedEventId = NO_EVENT_ID;
1✔
371
    public RequestCancelExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent;
372

373
    @Override
374
    public String toString() {
375
      return "CancelExternalData{"
×
376
          + "initiatedEventId="
377
          + initiatedEventId
378
          + ", initiatedEvent="
379
          + initiatedEvent
380
          + '}';
381
    }
382
  }
383

384
  static final class ChildWorkflowData {
385

386
    final TestWorkflowService service;
387
    StartChildWorkflowExecutionInitiatedEventAttributes initiatedEvent;
388
    long initiatedEventId;
389
    long startedEventId;
390
    WorkflowExecution execution;
391

392
    public ChildWorkflowData(TestWorkflowService service) {
1✔
393
      this.service = service;
1✔
394
    }
1✔
395

396
    @Override
397
    public String toString() {
398
      return "ChildWorkflowData{"
×
399
          + "service="
400
          + service
401
          + ", initiatedEvent="
402
          + initiatedEvent
403
          + ", initiatedEventId="
404
          + initiatedEventId
405
          + ", startedEventId="
406
          + startedEventId
407
          + ", execution="
408
          + execution
409
          + '}';
410
    }
411
  }
412

413
  static final class TimerData {
1✔
414
    TimerStartedEventAttributes startedEvent;
415
    public long startedEventId;
416

417
    @Override
418
    public String toString() {
419
      return "TimerData{"
×
420
          + "startedEvent="
421
          + startedEvent
422
          + ", startedEventId="
423
          + startedEventId
424
          + '}';
425
    }
426
  }
427

428
  /** Represents an accepted update workflow execution request */
429
  static final class UpdateWorkflowExecutionData {
430
    final String id;
431
    final CompletableFuture<UpdateWorkflowExecutionResponse> acceptance;
432
    final CompletableFuture<UpdateWorkflowExecutionResponse> complete;
433
    final Request initialRequest;
434

435
    public UpdateWorkflowExecutionData(
436
        String id,
437
        Request initialRequest,
438
        CompletableFuture<UpdateWorkflowExecutionResponse> acceptance,
439
        CompletableFuture<UpdateWorkflowExecutionResponse> complete) {
1✔
440
      this.id = id;
1✔
441
      this.initialRequest = initialRequest;
1✔
442
      this.acceptance = acceptance;
1✔
443
      this.complete = complete;
1✔
444
    }
1✔
445

446
    @Override
447
    public String toString() {
448
      return "UpdateWorkflowExecutionData{" + "ID=" + id + '}';
×
449
    }
450
  }
451

452
  static StateMachine<WorkflowData> newWorkflowStateMachine(WorkflowData data) {
453
    return new StateMachine<>(data)
1✔
454
        .add(NONE, START, STARTED, StateMachines::startWorkflow)
1✔
455
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::completeWorkflow)
1✔
456
        .add(STARTED, CONTINUE_AS_NEW, CONTINUED_AS_NEW, StateMachines::continueAsNewWorkflow)
1✔
457
        .add(STARTED, FAIL, FAILED, StateMachines::failWorkflow)
1✔
458
        .add(STARTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutWorkflow)
1✔
459
        .add(
1✔
460
            STARTED,
461
            REQUEST_CANCELLATION,
462
            CANCELLATION_REQUESTED,
463
            StateMachines::requestWorkflowCancellation)
464
        .add(STARTED, TERMINATE, TERMINATED, StateMachines::terminateWorkflow)
1✔
465
        .add(
1✔
466
            CANCELLATION_REQUESTED,
467
            REQUEST_CANCELLATION,
468
            CANCELLATION_REQUESTED,
469
            StateMachines::noop)
470
        .add(CANCELLATION_REQUESTED, COMPLETE, COMPLETED, StateMachines::completeWorkflow)
1✔
471
        .add(CANCELLATION_REQUESTED, CANCEL, CANCELED, StateMachines::cancelWorkflow)
1✔
472
        .add(CANCELLATION_REQUESTED, TERMINATE, TERMINATED, StateMachines::terminateWorkflow)
1✔
473
        .add(CANCELLATION_REQUESTED, FAIL, FAILED, StateMachines::failWorkflow)
1✔
474
        .add(CANCELLATION_REQUESTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutWorkflow);
1✔
475
  }
476

477
  static StateMachine<WorkflowTaskData> newWorkflowTaskStateMachine(
478
      TestWorkflowStore store, StartWorkflowExecutionRequest startRequest) {
479
    return new StateMachine<>(new WorkflowTaskData(store, startRequest))
1✔
480
        .add(NONE, INITIATE, INITIATED, StateMachines::scheduleWorkflowTask)
1✔
481
        // TODO(maxim): Uncomment once the server supports consistent query only workflow tasks
482
        //        .add(NONE, QUERY, INITIATED_QUERY_ONLY, StateMachines::scheduleQueryWorkflowTask)
483
        //        .add(INITIATED_QUERY_ONLY, QUERY, INITIATED_QUERY_ONLY,
484
        // StateMachines::queryWhileScheduled)
485
        //        .add(
486
        //            INITIATED_QUERY_ONLY,
487
        //            INITIATE,
488
        //            INITIATED,
489
        //            StateMachines::convertQueryWorkflowTaskToReal)
490
        //        .add(
491
        //            INITIATED_QUERY_ONLY,
492
        //            START,
493
        //            STARTED_QUERY_ONLY,
494
        //            StateMachines::startQueryOnlyWorkflowTask)
495
        //        .add(STARTED_QUERY_ONLY, INITIATE, STARTED_QUERY_ONLY,
496
        // StateMachines::needsWorkflowTask)
497
        //        .add(STARTED_QUERY_ONLY, QUERY, STARTED_QUERY_ONLY,
498
        // StateMachines::needsWorkflowTaskDueToQuery)
499
        //        .add(STARTED_QUERY_ONLY, FAIL, NONE, StateMachines::failQueryWorkflowTask)
500
        //        .add(STARTED_QUERY_ONLY, TIME_OUT, NONE, StateMachines::failQueryWorkflowTask)
501
        //        .add(STARTED_QUERY_ONLY, COMPLETE, NONE, StateMachines::completeQuery)
502
        .add(STARTED, QUERY, STARTED, StateMachines::bufferQuery)
1✔
503
        .add(STARTED, UPDATE_WORKFLOW_EXECUTION, STARTED, StateMachines::bufferUpdate)
1✔
504
        .add(INITIATED, INITIATE, INITIATED, StateMachines::noop)
1✔
505
        .add(INITIATED, QUERY, INITIATED, StateMachines::queryWhileScheduled)
1✔
506
        .add(INITIATED, UPDATE_WORKFLOW_EXECUTION, INITIATED, StateMachines::addUpdate)
1✔
507
        .add(INITIATED, START, STARTED, StateMachines::startWorkflowTask)
1✔
508
        .add(STARTED, COMPLETE, NONE, StateMachines::completeWorkflowTask)
1✔
509
        .add(STARTED, FAIL, NONE, StateMachines::failWorkflowTask)
1✔
510
        .add(STARTED, TIME_OUT, NONE, StateMachines::timeoutWorkflowTask)
1✔
511
        .add(STARTED, INITIATE, STARTED, StateMachines::needsWorkflowTask);
1✔
512
  }
513

514
  public static StateMachine<ActivityTaskData> newActivityStateMachine(
515
      TestWorkflowStore store, StartWorkflowExecutionRequest workflowStartedEvent) {
516
    return new StateMachine<>(new ActivityTaskData(store, workflowStartedEvent))
1✔
517
        .add(NONE, INITIATE, INITIATED, StateMachines::scheduleActivityTask)
1✔
518
        .add(INITIATED, START, STARTED, StateMachines::startActivityTask)
1✔
519
        .add(INITIATED, TIME_OUT, TIMED_OUT, StateMachines::timeoutActivityTask)
1✔
520
        .add(
1✔
521
            INITIATED,
522
            REQUEST_CANCELLATION,
523
            CANCELLATION_REQUESTED,
524
            StateMachines::requestActivityCancellation)
525
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::completeActivityTask)
1✔
526
        // Transitions to initiated in case of a retry
527
        .add(STARTED, FAIL, new State[] {FAILED, INITIATED}, StateMachines::failActivityTask)
1✔
528
        // Transitions to initiated in case of a retry
529
        .add(
1✔
530
            STARTED,
531
            TIME_OUT,
532
            new State[] {TIMED_OUT, INITIATED},
533
            StateMachines::timeoutActivityTask)
534
        .add(STARTED, UPDATE, STARTED, StateMachines::heartbeatActivityTask)
1✔
535
        .add(
1✔
536
            STARTED,
537
            REQUEST_CANCELLATION,
538
            CANCELLATION_REQUESTED,
539
            StateMachines::requestActivityCancellation)
540
        .add(
1✔
541
            CANCELLATION_REQUESTED, CANCEL, CANCELED, StateMachines::reportActivityTaskCancellation)
542
        .add(CANCELLATION_REQUESTED, COMPLETE, COMPLETED, StateMachines::completeActivityTask)
1✔
543
        .add(
1✔
544
            CANCELLATION_REQUESTED,
545
            UPDATE,
546
            CANCELLATION_REQUESTED,
547
            StateMachines::heartbeatActivityTask)
548
        .add(CANCELLATION_REQUESTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutActivityTask)
1✔
549
        .add(CANCELLATION_REQUESTED, FAIL, FAILED, StateMachines::failActivityTask);
1✔
550
  }
551

552
  public static StateMachine<ChildWorkflowData> newChildWorkflowStateMachine(
553
      TestWorkflowService service) {
554
    return new StateMachine<>(new ChildWorkflowData(service))
1✔
555
        .add(NONE, INITIATE, INITIATED, StateMachines::initiateChildWorkflow)
1✔
556
        .add(INITIATED, START, STARTED, StateMachines::childWorkflowStarted)
1✔
557
        .add(INITIATED, FAIL, FAILED, StateMachines::startChildWorkflowFailed)
1✔
558
        .add(INITIATED, TIME_OUT, TIMED_OUT, StateMachines::timeoutChildWorkflow)
1✔
559
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::childWorkflowCompleted)
1✔
560
        .add(STARTED, FAIL, FAILED, StateMachines::childWorkflowFailed)
1✔
561
        .add(STARTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutChildWorkflow)
1✔
562
        .add(STARTED, CANCEL, CANCELED, StateMachines::childWorkflowCanceled);
1✔
563
  }
564

565
  public static StateMachine<UpdateWorkflowExecutionData> newUpdateWorkflowExecution(
566
      String updateId,
567
      Request initialRequest,
568
      CompletableFuture<UpdateWorkflowExecutionResponse> acceptance,
569
      CompletableFuture<UpdateWorkflowExecutionResponse> complete) {
570
    return new StateMachine<>(
1✔
571
            new UpdateWorkflowExecutionData(updateId, initialRequest, acceptance, complete))
572
        .add(NONE, START, STARTED, StateMachines::acceptUpdate)
1✔
573
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::completeUpdate);
1✔
574
  }
575

576
  public static StateMachine<TimerData> newTimerStateMachine() {
577
    return new StateMachine<>(new TimerData())
1✔
578
        .add(NONE, START, STARTED, StateMachines::startTimer)
1✔
579
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::fireTimer)
1✔
580
        .add(STARTED, CANCEL, CANCELED, StateMachines::cancelTimer);
1✔
581
  }
582

583
  public static StateMachine<SignalExternalData> newSignalExternalStateMachine() {
584
    return new StateMachine<>(new SignalExternalData())
1✔
585
        .add(NONE, INITIATE, INITIATED, StateMachines::initiateExternalSignal)
1✔
586
        .add(INITIATED, FAIL, FAILED, StateMachines::failExternalSignal)
1✔
587
        .add(INITIATED, COMPLETE, COMPLETED, StateMachines::completeExternalSignal);
1✔
588
  }
589

590
  public static StateMachine<CancelExternalData> newCancelExternalStateMachine() {
591
    return new StateMachine<>(new CancelExternalData())
1✔
592
        .add(NONE, INITIATE, INITIATED, StateMachines::initiateExternalCancellation)
1✔
593
        .add(INITIATED, FAIL, FAILED, StateMachines::failExternalCancellation)
1✔
594
        .add(INITIATED, START, STARTED, StateMachines::reportExternalCancellationRequested);
1✔
595
  }
596

597
  private static <T, A> void noop(RequestContext ctx, T data, A a, long notUsed) {}
1✔
598

599
  private static void timeoutChildWorkflow(
600
      RequestContext ctx, ChildWorkflowData data, RetryState retryState, long notUsed) {
601
    StartChildWorkflowExecutionInitiatedEventAttributes ie = data.initiatedEvent;
1✔
602
    ChildWorkflowExecutionTimedOutEventAttributes a =
603
        ChildWorkflowExecutionTimedOutEventAttributes.newBuilder()
1✔
604
            .setNamespace(ie.getNamespace())
1✔
605
            .setStartedEventId(data.startedEventId)
1✔
606
            .setWorkflowExecution(data.execution)
1✔
607
            .setWorkflowType(ie.getWorkflowType())
1✔
608
            .setRetryState(retryState)
1✔
609
            .setInitiatedEventId(data.initiatedEventId)
1✔
610
            .build();
1✔
611
    HistoryEvent event =
612
        HistoryEvent.newBuilder()
1✔
613
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TIMED_OUT)
1✔
614
            .setChildWorkflowExecutionTimedOutEventAttributes(a)
1✔
615
            .build();
1✔
616
    ctx.addEvent(event);
1✔
617
  }
1✔
618

619
  private static void startChildWorkflowFailed(
620
      RequestContext ctx,
621
      ChildWorkflowData data,
622
      StartChildWorkflowExecutionFailedEventAttributes a,
623
      long notUsed) {
624
    StartChildWorkflowExecutionFailedEventAttributes.Builder updatedAttr =
1✔
625
        a.toBuilder()
1✔
626
            .setInitiatedEventId(data.initiatedEventId)
1✔
627
            .setWorkflowType(data.initiatedEvent.getWorkflowType())
1✔
628
            .setWorkflowId(data.initiatedEvent.getWorkflowId());
1✔
629
    if (!data.initiatedEvent.getNamespace().isEmpty()) {
1✔
630
      updatedAttr.setNamespace(data.initiatedEvent.getNamespace());
1✔
631
    }
632
    HistoryEvent event =
633
        HistoryEvent.newBuilder()
1✔
634
            .setEventType(EventType.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_FAILED)
1✔
635
            .setStartChildWorkflowExecutionFailedEventAttributes(updatedAttr.build())
1✔
636
            .build();
1✔
637
    ctx.addEvent(event);
1✔
638
  }
1✔
639

640
  private static void childWorkflowStarted(
641
      RequestContext ctx,
642
      ChildWorkflowData data,
643
      ChildWorkflowExecutionStartedEventAttributes a,
644
      long notUsed) {
645
    ChildWorkflowExecutionStartedEventAttributes updatedAttr =
1✔
646
        a.toBuilder().setInitiatedEventId(data.initiatedEventId).build();
1✔
647
    HistoryEvent event =
648
        HistoryEvent.newBuilder()
1✔
649
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_STARTED)
1✔
650
            .setChildWorkflowExecutionStartedEventAttributes(updatedAttr)
1✔
651
            .build();
1✔
652
    long startedEventId = ctx.addEvent(event);
1✔
653
    ctx.onCommit(
1✔
654
        (historySize) -> {
655
          data.startedEventId = startedEventId;
1✔
656
          data.execution = updatedAttr.getWorkflowExecution();
1✔
657
        });
1✔
658
  }
1✔
659

660
  private static void childWorkflowCompleted(
661
      RequestContext ctx,
662
      ChildWorkflowData data,
663
      ChildWorkflowExecutionCompletedEventAttributes a,
664
      long notUsed) {
665
    ChildWorkflowExecutionCompletedEventAttributes updatedAttr =
1✔
666
        a.toBuilder()
1✔
667
            .setInitiatedEventId(data.initiatedEventId)
1✔
668
            .setStartedEventId(data.startedEventId)
1✔
669
            .build();
1✔
670
    HistoryEvent event =
671
        HistoryEvent.newBuilder()
1✔
672
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_COMPLETED)
1✔
673
            .setChildWorkflowExecutionCompletedEventAttributes(updatedAttr)
1✔
674
            .build();
1✔
675
    ctx.addEvent(event);
1✔
676
  }
1✔
677

678
  private static void childWorkflowFailed(
679
      RequestContext ctx,
680
      ChildWorkflowData data,
681
      ChildWorkflowExecutionFailedEventAttributes a,
682
      long notUsed) {
683
    ChildWorkflowExecutionFailedEventAttributes.Builder updatedAttr =
1✔
684
        a.toBuilder()
1✔
685
            .setInitiatedEventId(data.initiatedEventId)
1✔
686
            .setStartedEventId(data.startedEventId)
1✔
687
            .setWorkflowExecution(data.execution)
1✔
688
            .setWorkflowType(data.initiatedEvent.getWorkflowType());
1✔
689
    if (!data.initiatedEvent.getNamespace().isEmpty()) {
1✔
690
      updatedAttr.setNamespace(data.initiatedEvent.getNamespace());
1✔
691
    }
692
    HistoryEvent event =
693
        HistoryEvent.newBuilder()
1✔
694
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_FAILED)
1✔
695
            .setChildWorkflowExecutionFailedEventAttributes(updatedAttr.build())
1✔
696
            .build();
1✔
697
    ctx.addEvent(event);
1✔
698
  }
1✔
699

700
  private static void childWorkflowCanceled(
701
      RequestContext ctx,
702
      ChildWorkflowData data,
703
      ChildWorkflowExecutionCanceledEventAttributes a,
704
      long notUsed) {
705
    ChildWorkflowExecutionCanceledEventAttributes updatedAttr =
1✔
706
        a.toBuilder()
1✔
707
            .setInitiatedEventId(data.initiatedEventId)
1✔
708
            .setStartedEventId(data.startedEventId)
1✔
709
            .build();
1✔
710
    HistoryEvent event =
711
        HistoryEvent.newBuilder()
1✔
712
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_CANCELED)
1✔
713
            .setChildWorkflowExecutionCanceledEventAttributes(updatedAttr)
1✔
714
            .build();
1✔
715
    ctx.addEvent(event);
1✔
716
  }
1✔
717

718
  private static void initiateChildWorkflow(
719
      RequestContext ctx,
720
      ChildWorkflowData data,
721
      StartChildWorkflowExecutionCommandAttributes d,
722
      long workflowTaskCompletedEventId) {
723
    StartChildWorkflowExecutionInitiatedEventAttributes.Builder a =
724
        StartChildWorkflowExecutionInitiatedEventAttributes.newBuilder()
1✔
725
            .setControl(d.getControl())
1✔
726
            .setInput(d.getInput())
1✔
727
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1✔
728
            .setNamespace(d.getNamespace().isEmpty() ? ctx.getNamespace() : d.getNamespace())
1✔
729
            .setWorkflowExecutionTimeout(d.getWorkflowExecutionTimeout())
1✔
730
            .setWorkflowRunTimeout(d.getWorkflowRunTimeout())
1✔
731
            .setWorkflowTaskTimeout(d.getWorkflowTaskTimeout())
1✔
732
            .setTaskQueue(d.getTaskQueue())
1✔
733
            .setWorkflowId(d.getWorkflowId())
1✔
734
            .setWorkflowIdReusePolicy(d.getWorkflowIdReusePolicy())
1✔
735
            .setWorkflowType(d.getWorkflowType())
1✔
736
            .setCronSchedule(d.getCronSchedule())
1✔
737
            .setParentClosePolicy(d.getParentClosePolicy());
1✔
738
    if (d.hasHeader()) {
1✔
739
      a.setHeader(d.getHeader());
1✔
740
    }
741
    if (d.hasMemo()) {
1✔
742
      a.setMemo(d.getMemo());
1✔
743
    }
744
    if (d.hasRetryPolicy()) {
1✔
745
      a.setRetryPolicy(d.getRetryPolicy());
1✔
746
    }
747
    HistoryEvent event =
748
        HistoryEvent.newBuilder()
1✔
749
            .setEventType(EventType.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED)
1✔
750
            .setStartChildWorkflowExecutionInitiatedEventAttributes(a)
1✔
751
            .build();
1✔
752
    long initiatedEventId = ctx.addEvent(event);
1✔
753
    ctx.onCommit(
1✔
754
        (historySize) -> {
755
          data.initiatedEventId = initiatedEventId;
1✔
756
          data.initiatedEvent = a.build();
1✔
757
          StartWorkflowExecutionRequest.Builder startChild =
758
              StartWorkflowExecutionRequest.newBuilder()
1✔
759
                  .setRequestId(UUID.randomUUID().toString())
1✔
760
                  .setNamespace(d.getNamespace().isEmpty() ? ctx.getNamespace() : d.getNamespace())
1✔
761
                  .setWorkflowExecutionTimeout(d.getWorkflowExecutionTimeout())
1✔
762
                  .setWorkflowRunTimeout(d.getWorkflowRunTimeout())
1✔
763
                  .setWorkflowTaskTimeout(d.getWorkflowTaskTimeout())
1✔
764
                  .setTaskQueue(d.getTaskQueue())
1✔
765
                  .setWorkflowId(d.getWorkflowId())
1✔
766
                  .setWorkflowIdReusePolicy(d.getWorkflowIdReusePolicy())
1✔
767
                  .setWorkflowType(d.getWorkflowType())
1✔
768
                  .setCronSchedule(d.getCronSchedule());
1✔
769
          if (d.hasHeader()) {
1✔
770
            startChild.setHeader(d.getHeader());
1✔
771
          }
772
          if (d.hasSearchAttributes()) {
1✔
773
            startChild.setSearchAttributes(d.getSearchAttributes());
1✔
774
          }
775
          if (d.hasMemo()) {
1✔
776
            startChild.setMemo(d.getMemo());
1✔
777
          }
778
          if (d.hasRetryPolicy()) {
1✔
779
            startChild.setRetryPolicy(d.getRetryPolicy());
1✔
780
          }
781
          if (d.hasInput()) {
1✔
782
            startChild.setInput(d.getInput());
1✔
783
          }
784
          addStartChildTask(ctx, data, initiatedEventId, startChild.build());
1✔
785
        });
1✔
786
  }
1✔
787

788
  private static void addStartChildTask(
789
      RequestContext ctx,
790
      ChildWorkflowData data,
791
      long initiatedEventId,
792
      StartWorkflowExecutionRequest startChild) {
793
    ForkJoinPool.commonPool()
1✔
794
        .execute(
1✔
795
            () -> {
796
              try {
797
                data.service.startWorkflowExecutionImpl(
1✔
798
                    startChild,
799
                    java.time.Duration.ZERO,
800
                    Optional.of(ctx.getWorkflowMutableState()),
1✔
801
                    OptionalLong.of(data.initiatedEventId),
1✔
802
                    null);
803
              } catch (StatusRuntimeException e) {
1✔
804
                if (e.getStatus().getCode() == Status.Code.ALREADY_EXISTS) {
1✔
805
                  StartChildWorkflowExecutionFailedEventAttributes failRequest =
806
                      StartChildWorkflowExecutionFailedEventAttributes.newBuilder()
1✔
807
                          .setInitiatedEventId(initiatedEventId)
1✔
808
                          .setCause(
1✔
809
                              StartChildWorkflowExecutionFailedCause
810
                                  .START_CHILD_WORKFLOW_EXECUTION_FAILED_CAUSE_WORKFLOW_ALREADY_EXISTS)
811
                          .build();
1✔
812
                  try {
813
                    ctx.getWorkflowMutableState()
1✔
814
                        .failStartChildWorkflow(data.initiatedEvent.getWorkflowId(), failRequest);
1✔
815
                  } catch (Throwable ee) {
×
816
                    log.error("Unexpected failure inserting failStart for a child workflow", ee);
×
817
                  }
1✔
818
                } else {
1✔
819
                  log.error("Unexpected failure starting a child workflow", e);
×
820
                }
821
              } catch (Exception e) {
×
822
                log.error("Unexpected failure starting a child workflow", e);
×
823
              }
1✔
824
            });
1✔
825
  }
1✔
826

827
  private static void startWorkflow(
828
      RequestContext ctx, WorkflowData data, StartWorkflowExecutionRequest request, long notUsed) {
829
    if (Durations.compare(request.getWorkflowExecutionTimeout(), Durations.ZERO) < 0) {
1✔
830
      throw Status.INVALID_ARGUMENT
×
831
          .withDescription("negative workflowExecution timeout")
×
832
          .asRuntimeException();
×
833
    }
834
    if (Durations.compare(request.getWorkflowRunTimeout(), Durations.ZERO) < 0) {
1✔
835
      throw Status.INVALID_ARGUMENT
×
836
          .withDescription("negative workflowRun timeout")
×
837
          .asRuntimeException();
×
838
    }
839
    if (Durations.compare(request.getWorkflowTaskTimeout(), Durations.ZERO) < 0) {
1✔
840
      throw Status.INVALID_ARGUMENT
×
841
          .withDescription("negative workflowTaskTimeoutSeconds")
×
842
          .asRuntimeException();
×
843
    }
844

845
    WorkflowExecutionStartedEventAttributes.Builder a =
846
        WorkflowExecutionStartedEventAttributes.newBuilder()
1✔
847
            .setWorkflowType(request.getWorkflowType())
1✔
848
            .setWorkflowRunTimeout(request.getWorkflowRunTimeout())
1✔
849
            .setWorkflowTaskTimeout(request.getWorkflowTaskTimeout())
1✔
850
            .setWorkflowExecutionTimeout(request.getWorkflowExecutionTimeout())
1✔
851
            .setIdentity(request.getIdentity())
1✔
852
            .setInput(request.getInput())
1✔
853
            .setTaskQueue(request.getTaskQueue())
1✔
854
            .setAttempt(1);
1✔
855
    data.retryState.ifPresent(
1✔
856
        testServiceRetryState -> a.setAttempt(testServiceRetryState.getAttempt()));
1✔
857
    a.setFirstExecutionRunId(data.firstExecutionRunId);
1✔
858
    a.setOriginalExecutionRunId(data.originalExecutionRunId);
1✔
859
    data.continuedExecutionRunId.ifPresent(a::setContinuedExecutionRunId);
1✔
860
    if (data.lastCompletionResult != null) {
1✔
861
      a.setLastCompletionResult(data.lastCompletionResult);
1✔
862
    }
863
    data.lastFailure.ifPresent(a::setContinuedFailure);
1✔
864
    if (request.hasMemo()) {
1✔
865
      a.setMemo(request.getMemo());
1✔
866
    }
867
    if (request.hasSearchAttributes()) {
1✔
868
      a.setSearchAttributes((request.getSearchAttributes()));
1✔
869
    }
870
    if (request.hasHeader()) {
1✔
871
      a.setHeader(request.getHeader());
1✔
872
    }
873
    String cronSchedule = request.getCronSchedule();
1✔
874
    if (!cronSchedule.trim().isEmpty()) {
1✔
875
      try {
876
        CronUtils.parseCron(cronSchedule);
1✔
877
        a.setCronSchedule(cronSchedule);
1✔
878
      } catch (Exception e) {
×
879
        throw Status.INVALID_ARGUMENT
×
880
            .withDescription("Invalid cron expression \"" + cronSchedule + "\": " + e.getMessage())
×
881
            .withCause(e)
×
882
            .asRuntimeException();
×
883
      }
1✔
884
    }
885
    Optional<TestWorkflowMutableState> parent = ctx.getWorkflowMutableState().getParent();
1✔
886
    if (parent.isPresent()) {
1✔
887
      ExecutionId parentExecutionId = parent.get().getExecutionId();
1✔
888
      a.setParentWorkflowNamespace(parentExecutionId.getNamespace());
1✔
889
      a.setParentWorkflowExecution(parentExecutionId.getExecution());
1✔
890
    }
891
    HistoryEvent event =
892
        HistoryEvent.newBuilder()
1✔
893
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED)
1✔
894
            .setWorkflowExecutionStartedEventAttributes(a)
1✔
895
            .build();
1✔
896
    ctx.addEvent(event);
1✔
897
  }
1✔
898

899
  private static void completeWorkflow(
900
      RequestContext ctx,
901
      WorkflowData data,
902
      CompleteWorkflowExecutionCommandAttributes d,
903
      long workflowTaskCompletedEventId) {
904
    WorkflowExecutionCompletedEventAttributes.Builder a =
905
        WorkflowExecutionCompletedEventAttributes.newBuilder()
1✔
906
            .setResult(d.getResult())
1✔
907
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
908
    HistoryEvent event =
909
        HistoryEvent.newBuilder()
1✔
910
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED)
1✔
911
            .setWorkflowExecutionCompletedEventAttributes(a)
1✔
912
            .build();
1✔
913
    ctx.addEvent(event);
1✔
914
  }
1✔
915

916
  private static void continueAsNewWorkflow(
917
      RequestContext ctx,
918
      WorkflowData data,
919
      ContinueAsNewWorkflowExecutionCommandAttributes d,
920
      long workflowTaskCompletedEventId) {
921
    StartWorkflowExecutionRequest sr = ctx.getWorkflowMutableState().getStartRequest();
1✔
922
    WorkflowExecutionContinuedAsNewEventAttributes.Builder a =
923
        WorkflowExecutionContinuedAsNewEventAttributes.newBuilder();
1✔
924
    a.setInput(d.getInput());
1✔
925
    if (d.hasHeader()) {
1✔
926
      a.setHeader(d.getHeader());
1✔
927
    }
928
    if (Durations.compare(d.getWorkflowRunTimeout(), Durations.ZERO) > 0) {
1✔
929
      a.setWorkflowRunTimeout(d.getWorkflowRunTimeout());
1✔
930
    } else {
931
      a.setWorkflowRunTimeout(sr.getWorkflowRunTimeout());
1✔
932
    }
933
    if (d.hasTaskQueue()) {
1✔
934
      a.setTaskQueue(d.getTaskQueue());
1✔
935
    } else {
936
      a.setTaskQueue(sr.getTaskQueue());
1✔
937
    }
938
    if (d.hasWorkflowType()) {
1✔
939
      a.setWorkflowType(d.getWorkflowType());
1✔
940
    } else {
941
      a.setWorkflowType(sr.getWorkflowType());
1✔
942
    }
943
    if (Durations.compare(d.getWorkflowTaskTimeout(), Durations.ZERO) > 0) {
1✔
944
      a.setWorkflowTaskTimeout(d.getWorkflowTaskTimeout());
1✔
945
    } else {
946
      a.setWorkflowTaskTimeout(sr.getWorkflowTaskTimeout());
1✔
947
    }
948
    a.setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
949
    a.setBackoffStartInterval(d.getBackoffStartInterval());
1✔
950
    if (d.hasLastCompletionResult()) {
1✔
951
      a.setLastCompletionResult(d.getLastCompletionResult());
1✔
952
    }
953
    if (d.hasFailure()) {
1✔
954
      a.setFailure(d.getFailure());
1✔
955
    }
956
    a.setNewExecutionRunId(UUID.randomUUID().toString());
1✔
957
    HistoryEvent event =
958
        HistoryEvent.newBuilder()
1✔
959
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW)
1✔
960
            .setWorkflowExecutionContinuedAsNewEventAttributes(a)
1✔
961
            .build();
1✔
962
    ctx.addEvent(event);
1✔
963
  }
1✔
964

965
  private static void failWorkflow(
966
      RequestContext ctx,
967
      WorkflowData data,
968
      FailWorkflowExecutionCommandAttributes d,
969
      long workflowTaskCompletedEventId) {
970
    WorkflowExecutionFailedEventAttributes.Builder a =
971
        WorkflowExecutionFailedEventAttributes.newBuilder()
1✔
972
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
973
    if (d.hasFailure()) {
1✔
974
      a.setFailure(d.getFailure());
1✔
975
    }
976
    HistoryEvent event =
977
        HistoryEvent.newBuilder()
1✔
978
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_FAILED)
1✔
979
            .setWorkflowExecutionFailedEventAttributes(a)
1✔
980
            .build();
1✔
981
    ctx.addEvent(event);
1✔
982
  }
1✔
983

984
  private static void timeoutWorkflow(
985
      RequestContext ctx, WorkflowData data, RetryState retryState, long notUsed) {
986
    WorkflowExecutionTimedOutEventAttributes.Builder a =
987
        WorkflowExecutionTimedOutEventAttributes.newBuilder().setRetryState(retryState);
1✔
988
    HistoryEvent event =
989
        HistoryEvent.newBuilder()
1✔
990
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT)
1✔
991
            .setWorkflowExecutionTimedOutEventAttributes(a)
1✔
992
            .build();
1✔
993
    ctx.addEvent(event);
1✔
994
  }
1✔
995

996
  private static void cancelWorkflow(
997
      RequestContext ctx,
998
      WorkflowData data,
999
      CancelWorkflowExecutionCommandAttributes d,
1000
      long workflowTaskCompletedEventId) {
1001
    WorkflowExecutionCanceledEventAttributes.Builder a =
1002
        WorkflowExecutionCanceledEventAttributes.newBuilder()
1✔
1003
            .setDetails(d.getDetails())
1✔
1004
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
1005
    HistoryEvent event =
1006
        HistoryEvent.newBuilder()
1✔
1007
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED)
1✔
1008
            .setWorkflowExecutionCanceledEventAttributes(a)
1✔
1009
            .build();
1✔
1010
    ctx.addEvent(event);
1✔
1011
  }
1✔
1012

1013
  private static void terminateWorkflow(
1014
      RequestContext ctx,
1015
      WorkflowData data,
1016
      TerminateWorkflowExecutionRequest d,
1017
      long workflowTaskCompletedEventId) {
1018
    WorkflowExecutionTerminatedEventAttributes.Builder a =
1019
        WorkflowExecutionTerminatedEventAttributes.newBuilder()
1✔
1020
            .setDetails(d.getDetails())
1✔
1021
            .setIdentity(d.getIdentity())
1✔
1022
            .setReason(d.getReason());
1✔
1023
    HistoryEvent event =
1024
        HistoryEvent.newBuilder()
1✔
1025
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED)
1✔
1026
            .setWorkflowExecutionTerminatedEventAttributes(a)
1✔
1027
            .build();
1✔
1028
    ctx.addEvent(event);
1✔
1029
  }
1✔
1030

1031
  private static void requestWorkflowCancellation(
1032
      RequestContext ctx,
1033
      WorkflowData data,
1034
      RequestCancelWorkflowExecutionRequest cancelRequest,
1035
      long notUsed) {
1036
    WorkflowExecutionCancelRequestedEventAttributes.Builder a =
1037
        WorkflowExecutionCancelRequestedEventAttributes.newBuilder()
1✔
1038
            .setIdentity(cancelRequest.getIdentity());
1✔
1039
    HistoryEvent cancelRequested =
1040
        HistoryEvent.newBuilder()
1✔
1041
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED)
1✔
1042
            .setWorkflowExecutionCancelRequestedEventAttributes(a)
1✔
1043
            .build();
1✔
1044
    ctx.addEvent(cancelRequested);
1✔
1045
  }
1✔
1046

1047
  private static void scheduleActivityTask(
1048
      RequestContext ctx,
1049
      ActivityTaskData data,
1050
      ScheduleActivityTaskCommandAttributes d,
1051
      long workflowTaskCompletedEventId) {
1052
    RetryPolicy retryPolicy = ensureDefaultFieldsForActivityRetryPolicy(d.getRetryPolicy());
1✔
1053
    Duration expirationInterval = d.getScheduleToCloseTimeout();
1✔
1054
    Timestamp expirationTime = Timestamps.add(data.store.currentTime(), expirationInterval);
1✔
1055
    TestServiceRetryState retryState = new TestServiceRetryState(retryPolicy, expirationTime);
1✔
1056

1057
    ActivityTaskScheduledEventAttributes.Builder a =
1058
        ActivityTaskScheduledEventAttributes.newBuilder()
1✔
1059
            .setInput(d.getInput())
1✔
1060
            .setActivityId(d.getActivityId())
1✔
1061
            .setActivityType(d.getActivityType())
1✔
1062
            .setHeartbeatTimeout(d.getHeartbeatTimeout())
1✔
1063
            .setRetryPolicy(retryPolicy)
1✔
1064
            .setScheduleToCloseTimeout(d.getScheduleToCloseTimeout())
1✔
1065
            .setScheduleToStartTimeout(d.getScheduleToStartTimeout())
1✔
1066
            .setStartToCloseTimeout(d.getStartToCloseTimeout())
1✔
1067
            .setTaskQueue(d.getTaskQueue())
1✔
1068
            .setHeader(d.getHeader())
1✔
1069
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
1070

1071
    // Cannot set it in onCommit as it is used in the processScheduleActivityTask
1072
    data.scheduledEvent = a.build();
1✔
1073
    HistoryEvent event =
1074
        HistoryEvent.newBuilder()
1✔
1075
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED)
1✔
1076
            .setActivityTaskScheduledEventAttributes(a)
1✔
1077
            .build();
1✔
1078
    long scheduledEventId = ctx.addEvent(event);
1✔
1079

1080
    PollActivityTaskQueueResponse.Builder taskResponse =
1081
        PollActivityTaskQueueResponse.newBuilder()
1✔
1082
            .setWorkflowNamespace(ctx.getNamespace())
1✔
1083
            .setWorkflowType(data.startWorkflowExecutionRequest.getWorkflowType())
1✔
1084
            .setActivityType(d.getActivityType())
1✔
1085
            .setWorkflowExecution(ctx.getExecution())
1✔
1086
            .setActivityId(d.getActivityId())
1✔
1087
            .setInput(d.getInput())
1✔
1088
            .setHeartbeatTimeout(d.getHeartbeatTimeout())
1✔
1089
            .setScheduleToCloseTimeout(d.getScheduleToCloseTimeout())
1✔
1090
            .setStartToCloseTimeout(d.getStartToCloseTimeout())
1✔
1091
            .setScheduledTime(ctx.currentTime())
1✔
1092
            .setCurrentAttemptScheduledTime(ctx.currentTime())
1✔
1093
            .setHeader(d.getHeader())
1✔
1094
            .setAttempt(1);
1✔
1095

1096
    TaskQueueId taskQueueId = new TaskQueueId(ctx.getNamespace(), d.getTaskQueue().getName());
1✔
1097
    ActivityTask activityTask = new ActivityTask(taskQueueId, taskResponse);
1✔
1098
    ctx.addActivityTask(activityTask);
1✔
1099
    ctx.onCommit(
1✔
1100
        (historySize) -> {
1101
          data.scheduledEventId = scheduledEventId;
1✔
1102
          data.activityTask = activityTask;
1✔
1103
          data.retryState = retryState;
1✔
1104
        });
1✔
1105
  }
1✔
1106

1107
  private static void requestActivityCancellation(
1108
      RequestContext ctx,
1109
      ActivityTaskData data,
1110
      RequestCancelActivityTaskCommandAttributes d,
1111
      long workflowTaskCompletedEventId) {
1112
    ActivityTaskCancelRequestedEventAttributes.Builder a =
1113
        ActivityTaskCancelRequestedEventAttributes.newBuilder()
1✔
1114
            .setScheduledEventId(d.getScheduledEventId())
1✔
1115
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
1116
    HistoryEvent event =
1117
        HistoryEvent.newBuilder()
1✔
1118
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_CANCEL_REQUESTED)
1✔
1119
            .setActivityTaskCancelRequestedEventAttributes(a)
1✔
1120
            .build();
1✔
1121
    ctx.addEvent(event);
1✔
1122
  }
1✔
1123

1124
  private static void scheduleWorkflowTask(
1125
      RequestContext ctx, WorkflowTaskData data, Object notUsedRequest, long notUsed) {
1126
    StartWorkflowExecutionRequest request = data.startRequest;
1✔
1127
    long scheduledEventId;
1128
    TaskQueue taskQueue = request.getTaskQueue();
1✔
1129
    WorkflowTaskScheduledEventAttributes a =
1130
        WorkflowTaskScheduledEventAttributes.newBuilder()
1✔
1131
            .setStartToCloseTimeout(request.getWorkflowTaskTimeout())
1✔
1132
            .setTaskQueue(taskQueue)
1✔
1133
            .setAttempt(++data.attempt)
1✔
1134
            .build();
1✔
1135
    HistoryEvent event =
1136
        HistoryEvent.newBuilder()
1✔
1137
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED)
1✔
1138
            .setWorkflowTaskScheduledEventAttributes(a)
1✔
1139
            .build();
1✔
1140
    scheduledEventId = ctx.addEvent(event);
1✔
1141
    PollWorkflowTaskQueueResponse.Builder workflowTaskResponse =
1142
        PollWorkflowTaskQueueResponse.newBuilder();
1✔
1143
    workflowTaskResponse.setWorkflowExecution(ctx.getExecution());
1✔
1144
    workflowTaskResponse.setWorkflowType(request.getWorkflowType());
1✔
1145
    workflowTaskResponse.setAttempt(data.attempt);
1✔
1146
    workflowTaskResponse.setScheduledTime(ctx.currentTime());
1✔
1147
    workflowTaskResponse.setWorkflowExecutionTaskQueue(taskQueue);
1✔
1148
    TaskQueueId taskQueueId = new TaskQueueId(ctx.getNamespace(), taskQueue.getName());
1✔
1149
    WorkflowTask workflowTask = new WorkflowTask(taskQueueId, workflowTaskResponse);
1✔
1150
    ctx.setWorkflowTaskForMatching(workflowTask);
1✔
1151
    ctx.onCommit(
1✔
1152
        (historySize) -> {
1153
          data.scheduledEventId = scheduledEventId;
1✔
1154
          data.workflowTask = workflowTaskResponse;
1✔
1155
          // Move buffered update request to new workflow task
1156
          data.updateRequest.putAll(data.updateRequestBuffer);
1✔
1157
          data.updateRequestBuffer.clear();
1✔
1158
        });
1✔
1159
  }
1✔
1160

1161
  private static void convertQueryWorkflowTaskToReal(
1162
      RequestContext ctx, WorkflowTaskData data, Object notUsedRequest, long notUsed) {
1163
    StartWorkflowExecutionRequest request = data.startRequest;
×
1164
    WorkflowTaskScheduledEventAttributes a =
1165
        WorkflowTaskScheduledEventAttributes.newBuilder()
×
1166
            .setStartToCloseTimeout(request.getWorkflowTaskTimeout())
×
1167
            .setTaskQueue(request.getTaskQueue())
×
1168
            .setAttempt(data.attempt)
×
1169
            .build();
×
1170
    HistoryEvent event =
1171
        HistoryEvent.newBuilder()
×
1172
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED)
×
1173
            .setWorkflowTaskScheduledEventAttributes(a)
×
1174
            .build();
×
1175
    long scheduledEventId = ctx.addEvent(event);
×
1176
    ctx.onCommit((historySize) -> data.scheduledEventId = scheduledEventId);
×
1177
  }
×
1178

1179
  private static void scheduleQueryWorkflowTask(
1180
      RequestContext ctx,
1181
      WorkflowTaskData data,
1182
      TestWorkflowMutableStateImpl.ConsistentQuery query,
1183
      long notUsed) {
1184
    ctx.lockTimer("scheduleQueryWorkflowTask");
×
1185
    StartWorkflowExecutionRequest request = data.startRequest;
×
1186
    PollWorkflowTaskQueueResponse.Builder workflowTaskResponse =
1187
        PollWorkflowTaskQueueResponse.newBuilder();
×
1188
    StickyExecutionAttributes stickyAttributes =
×
1189
        ctx.getWorkflowMutableState().getStickyExecutionAttributes();
×
1190
    String taskQueue =
1191
        stickyAttributes == null
×
1192
            ? request.getTaskQueue().getName()
×
1193
            : stickyAttributes.getWorkerTaskQueue().getName();
×
1194
    workflowTaskResponse.setWorkflowExecution(ctx.getExecution());
×
1195
    workflowTaskResponse.setWorkflowType(request.getWorkflowType());
×
1196
    workflowTaskResponse.setAttempt(++data.attempt);
×
1197
    TaskQueueId taskQueueId = new TaskQueueId(ctx.getNamespace(), taskQueue);
×
1198
    WorkflowTask workflowTask = new WorkflowTask(taskQueueId, workflowTaskResponse);
×
1199
    ctx.setWorkflowTaskForMatching(workflowTask);
×
1200
    ctx.onCommit(
×
1201
        (historySize) -> {
1202
          if (data.lastSuccessfulStartedEventId > 0) {
×
1203
            workflowTaskResponse.setPreviousStartedEventId(data.lastSuccessfulStartedEventId);
×
1204
          }
1205
          data.scheduledEventId = NO_EVENT_ID;
×
1206
          data.workflowTask = workflowTaskResponse;
×
1207
          if (query != null) {
×
1208
            data.consistentQueryRequests.put(query.getKey(), query);
×
1209
          }
1210
        });
×
1211
  }
×
1212

1213
  private static void queryWhileScheduled(
1214
      RequestContext ctx,
1215
      WorkflowTaskData data,
1216
      TestWorkflowMutableStateImpl.ConsistentQuery query,
1217
      long notUsed) {
1218
    data.consistentQueryRequests.put(query.getKey(), query);
1✔
1219
  }
1✔
1220

1221
  private static void bufferQuery(
1222
      RequestContext ctx,
1223
      WorkflowTaskData data,
1224
      TestWorkflowMutableStateImpl.ConsistentQuery query,
1225
      long notUsed) {
1226
    data.queryBuffer.put(query.getKey(), query);
1✔
1227
  }
1✔
1228

1229
  private static void bufferUpdate(
1230
      RequestContext ctx, WorkflowTaskData data, UpdateWorkflowExecution update, long notUsed) {
1231
    if (data.getUpdateRequest(update.getId()).isPresent()) {
×
1232
      throw Status.INTERNAL
×
1233
          .withDescription("Update ID already exists: " + update.getId())
×
1234
          .asRuntimeException();
×
1235
    }
1236
    data.updateRequestBuffer.put(update.getId(), update);
×
1237
  }
×
1238

1239
  private static void addUpdate(
1240
      RequestContext ctx, WorkflowTaskData data, UpdateWorkflowExecution update, long notUsed) {
1241
    if (data.getUpdateRequest(update.getId()).isPresent()) {
1✔
1242
      throw Status.INTERNAL
×
1243
          .withDescription("Update ID already exists: " + update.getId())
×
1244
          .asRuntimeException();
×
1245
    }
1246
    data.updateRequest.put(update.getId(), update);
1✔
1247
  }
1✔
1248

1249
  private static void startWorkflowTask(
1250
      RequestContext ctx,
1251
      WorkflowTaskData data,
1252
      PollWorkflowTaskQueueRequest request,
1253
      long notUsed) {
1254
    WorkflowTaskStartedEventAttributes a =
1255
        WorkflowTaskStartedEventAttributes.newBuilder()
1✔
1256
            .setIdentity(request.getIdentity())
1✔
1257
            .setScheduledEventId(data.scheduledEventId)
1✔
1258
            .build();
1✔
1259
    HistoryEvent event =
1260
        HistoryEvent.newBuilder()
1✔
1261
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_STARTED)
1✔
1262
            .setWorkflowTaskStartedEventAttributes(a)
1✔
1263
            .build();
1✔
1264
    long startedEventId = ctx.addEvent(event);
1✔
1265
    startWorkflowTaskImpl(ctx, data, request, startedEventId, false);
1✔
1266
  }
1✔
1267

1268
  private static void startQueryOnlyWorkflowTask(
1269
      RequestContext ctx,
1270
      WorkflowTaskData data,
1271
      PollWorkflowTaskQueueRequest request,
1272
      long notUsed) {
1273
    startWorkflowTaskImpl(ctx, data, request, NO_EVENT_ID, true);
×
1274
  }
×
1275

1276
  private static void startWorkflowTaskImpl(
1277
      RequestContext ctx,
1278
      WorkflowTaskData data,
1279
      PollWorkflowTaskQueueRequest request,
1280
      long startedEventId,
1281
      boolean queryOnly) {
1282
    ctx.onCommit(
1✔
1283
        (historySize) -> {
1284
          PollWorkflowTaskQueueResponse.Builder task = data.workflowTask;
1✔
1285
          task.setStartedEventId(data.scheduledEventId + 1);
1✔
1286
          WorkflowTaskToken taskToken = new WorkflowTaskToken(ctx.getExecutionId(), historySize);
1✔
1287
          task.setTaskToken(taskToken.toBytes());
1✔
1288
          GetWorkflowExecutionHistoryRequest getRequest =
1289
              GetWorkflowExecutionHistoryRequest.newBuilder()
1✔
1290
                  .setNamespace(request.getNamespace())
1✔
1291
                  .setExecution(ctx.getExecution())
1✔
1292
                  .build();
1✔
1293
          List<HistoryEvent> events;
1294
          events =
1✔
1295
              data.store
1296
                  .getWorkflowExecutionHistory(ctx.getExecutionId(), getRequest, null)
1✔
1297
                  .getHistory()
1✔
1298
                  .getEventsList();
1✔
1299
          long lastEventId = events.get(events.size() - 1).getEventId();
1✔
1300
          if (ctx.getWorkflowMutableState().getStickyExecutionAttributes() != null) {
1✔
1301
            events = events.subList((int) data.lastSuccessfulStartedEventId, events.size());
1✔
1302
          }
1303
          if (queryOnly && !data.workflowCompleted) {
1✔
1304
            events = new ArrayList<>(events); // convert list to mutable
×
1305
            // Add "fake" workflow task scheduled and started if workflow is not closed
1306
            WorkflowTaskScheduledEventAttributes scheduledAttributes =
1307
                WorkflowTaskScheduledEventAttributes.newBuilder()
×
1308
                    .setStartToCloseTimeout(data.startRequest.getWorkflowTaskTimeout())
×
1309
                    .setTaskQueue(request.getTaskQueue())
×
1310
                    .setAttempt(data.attempt)
×
1311
                    .build();
×
1312
            HistoryEvent scheduledEvent =
1313
                HistoryEvent.newBuilder()
×
1314
                    .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED)
×
1315
                    .setEventId(lastEventId + 1)
×
1316
                    .setWorkflowTaskScheduledEventAttributes(scheduledAttributes)
×
1317
                    .build();
×
1318
            events.add(scheduledEvent);
×
1319
            WorkflowTaskStartedEventAttributes startedAttributes =
1320
                WorkflowTaskStartedEventAttributes.newBuilder()
×
1321
                    .setIdentity(request.getIdentity())
×
1322
                    .setScheduledEventId(lastEventId + 1)
×
1323
                    .build();
×
1324
            HistoryEvent startedEvent =
1325
                HistoryEvent.newBuilder()
×
1326
                    .setEventId(lastEventId + 1)
×
1327
                    .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_STARTED)
×
1328
                    .setWorkflowTaskStartedEventAttributes(startedAttributes)
×
1329
                    .build();
×
1330
            events.add(startedEvent);
×
1331
            task.setStartedEventId(lastEventId + 2);
×
1332
          }
1333
          // get it from previous started event id.
1334
          task.setHistory(History.newBuilder().addAllEvents(events));
1✔
1335
          // Transfer the queries
1336
          Map<String, TestWorkflowMutableStateImpl.ConsistentQuery> queries =
1✔
1337
              data.consistentQueryRequests;
1338
          for (Map.Entry<String, TestWorkflowMutableStateImpl.ConsistentQuery> queryEntry :
1339
              queries.entrySet()) {
1✔
1340
            QueryWorkflowRequest queryWorkflowRequest = queryEntry.getValue().getRequest();
1✔
1341
            task.putQueries(queryEntry.getKey(), queryWorkflowRequest.getQuery());
1✔
1342
          }
1✔
1343
          // Transfer the messages
1344
          Map<String, UpdateWorkflowExecution> updates = data.updateRequest;
1✔
1345
          for (Map.Entry<String, UpdateWorkflowExecution> update : updates.entrySet()) {
1✔
1346
            UpdateWorkflowExecutionRequest updateRequest = update.getValue().getRequest();
1✔
1347
            Message updateMessage =
1348
                Message.newBuilder()
1✔
1349
                    .setId(update.getKey() + "/request")
1✔
1350
                    .setProtocolInstanceId(update.getKey())
1✔
1351
                    .setEventId(data.scheduledEventId)
1✔
1352
                    .setBody(Any.pack(updateRequest.getRequest()))
1✔
1353
                    .build();
1✔
1354
            task.addMessages(updateMessage);
1✔
1355
          }
1✔
1356
          if (data.lastSuccessfulStartedEventId > 0) {
1✔
1357
            task.setPreviousStartedEventId(data.lastSuccessfulStartedEventId);
1✔
1358
          }
1359
          if (!queryOnly) {
1✔
1360
            data.startedEventId = startedEventId;
1✔
1361
          }
1362
        });
1✔
1363
  }
1✔
1364

1365
  private static void startActivityTask(
1366
      RequestContext ctx,
1367
      ActivityTaskData data,
1368
      PollActivityTaskQueueRequest request,
1369
      long notUsed) {
1370
    ActivityTaskStartedEventAttributes.Builder a =
1371
        ActivityTaskStartedEventAttributes.newBuilder()
1✔
1372
            .setIdentity(request.getIdentity())
1✔
1373
            .setScheduledEventId(data.scheduledEventId);
1✔
1374
    a.setAttempt(data.getAttempt());
1✔
1375
    // Setting timestamp here as the default logic will set it to the time when it is added to the
1376
    // history. But in the case of retry it happens only after an activity completion.
1377
    Timestamp timestamp = data.store.currentTime();
1✔
1378
    HistoryEvent event =
1379
        HistoryEvent.newBuilder()
1✔
1380
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_STARTED)
1✔
1381
            .setEventTime(timestamp)
1✔
1382
            .setActivityTaskStartedEventAttributes(a)
1✔
1383
            .build();
1✔
1384
    long startedEventId;
1385
    startedEventId = NO_EVENT_ID;
1✔
1386
    ctx.onCommit(
1✔
1387
        (historySize) -> {
1388
          data.startedEventId = startedEventId;
1✔
1389
          data.startedEvent = event;
1✔
1390
          PollActivityTaskQueueResponse.Builder task = data.activityTask.getTask();
1✔
1391
          task.setTaskToken(
1✔
1392
              new ActivityTaskToken(ctx.getExecutionId(), data.scheduledEventId, task.getAttempt())
1✔
1393
                  .toBytes());
1✔
1394
          task.setStartedTime(timestamp);
1✔
1395
        });
1✔
1396
  }
1✔
1397

1398
  private static void completeWorkflowTask(
1399
      RequestContext ctx,
1400
      WorkflowTaskData data,
1401
      RespondWorkflowTaskCompletedRequest request,
1402
      long notUsed) {
1403
    WorkflowTaskCompletedEventAttributes.Builder a =
1404
        WorkflowTaskCompletedEventAttributes.newBuilder()
1✔
1405
            .setIdentity(request.getIdentity())
1✔
1406
            .setBinaryChecksum(request.getBinaryChecksum())
1✔
1407
            .setMeteringMetadata(request.getMeteringMetadata())
1✔
1408
            .setSdkMetadata(request.getSdkMetadata())
1✔
1409
            .setScheduledEventId(data.scheduledEventId);
1✔
1410
    HistoryEvent event =
1411
        HistoryEvent.newBuilder()
1✔
1412
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_COMPLETED)
1✔
1413
            .setWorkflowTaskCompletedEventAttributes(a)
1✔
1414
            .build();
1✔
1415
    ctx.addEvent(event);
1✔
1416
    ctx.onCommit(
1✔
1417
        (historySize) -> {
1418
          data.lastSuccessfulStartedEventId = data.startedEventId;
1✔
1419
          data.clear();
1✔
1420
        });
1✔
1421
  }
1✔
1422

1423
  private static void completeQuery(
1424
      RequestContext ctx,
1425
      WorkflowTaskData data,
1426
      RespondWorkflowTaskCompletedRequest request,
1427
      long notUsed) {
1428
    Map<String, WorkflowQueryResult> responses = request.getQueryResultsMap();
×
1429
    for (Map.Entry<String, WorkflowQueryResult> resultEntry : responses.entrySet()) {
×
1430
      TestWorkflowMutableStateImpl.ConsistentQuery query =
×
1431
          data.consistentQueryRequests.remove(resultEntry.getKey());
×
1432
      if (query != null) {
×
1433
        WorkflowQueryResult value = resultEntry.getValue();
×
1434
        CompletableFuture<QueryWorkflowResponse> result = query.getResult();
×
1435
        switch (value.getResultType()) {
×
1436
          case QUERY_RESULT_TYPE_ANSWERED:
1437
            QueryWorkflowResponse response =
1438
                QueryWorkflowResponse.newBuilder().setQueryResult(value.getAnswer()).build();
×
1439
            result.complete(response);
×
1440
            break;
×
1441
          case QUERY_RESULT_TYPE_FAILED:
1442
            result.completeExceptionally(
×
1443
                StatusUtils.newException(
×
1444
                    Status.INTERNAL.withDescription(value.getErrorMessage()),
×
1445
                    QueryFailedFailure.getDefaultInstance(),
×
1446
                    QueryFailedFailure.getDescriptor()));
×
1447
            break;
×
1448
          default:
1449
            throw Status.INVALID_ARGUMENT
×
1450
                .withDescription("Invalid query result type: " + value.getResultType())
×
1451
                .asRuntimeException();
×
1452
        }
1453
      }
1454
    }
×
1455
    ctx.onCommit(
×
1456
        (historySize) -> {
1457
          data.clear();
×
1458
          ctx.unlockTimer("completeQuery");
×
1459
        });
×
1460
  }
×
1461

1462
  private static void failQueryWorkflowTask(
1463
      RequestContext ctx, WorkflowTaskData data, Object unused, long notUsed) {
1464
    data.consistentQueryRequests
×
1465
        .entrySet()
×
1466
        .removeIf(entry -> entry.getValue().getResult().isCancelled());
×
1467
    if (!data.consistentQueryRequests.isEmpty()) {
×
1468
      ctx.setNeedWorkflowTask(true);
×
1469
    }
1470
    ctx.unlockTimer("failQueryWorkflowTask");
×
1471
  }
×
1472

1473
  private static void failWorkflowTask(
1474
      RequestContext ctx,
1475
      WorkflowTaskData data,
1476
      RespondWorkflowTaskFailedRequest request,
1477
      long notUsed) {
1478
    WorkflowTaskFailedEventAttributes.Builder a =
1479
        WorkflowTaskFailedEventAttributes.newBuilder()
1✔
1480
            .setIdentity(request.getIdentity())
1✔
1481
            .setStartedEventId(data.startedEventId)
1✔
1482
            .setScheduledEventId(data.scheduledEventId)
1✔
1483
            .setCause(request.getCause());
1✔
1484
    if (request.hasFailure()) {
1✔
1485
      a.setFailure(request.getFailure());
1✔
1486
    }
1487
    HistoryEvent event =
1488
        HistoryEvent.newBuilder()
1✔
1489
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_FAILED)
1✔
1490
            .setWorkflowTaskFailedEventAttributes(a)
1✔
1491
            .build();
1✔
1492
    ctx.addEvent(event);
1✔
1493
    ctx.setNeedWorkflowTask(true);
1✔
1494
  }
1✔
1495

1496
  private static void timeoutWorkflowTask(
1497
      RequestContext ctx, WorkflowTaskData data, Object ignored, long notUsed) {
1498
    WorkflowTaskTimedOutEventAttributes.Builder a =
1499
        WorkflowTaskTimedOutEventAttributes.newBuilder()
1✔
1500
            .setStartedEventId(data.startedEventId)
1✔
1501
            .setTimeoutType(TimeoutType.TIMEOUT_TYPE_START_TO_CLOSE)
1✔
1502
            .setScheduledEventId(data.scheduledEventId);
1✔
1503
    HistoryEvent event =
1504
        HistoryEvent.newBuilder()
1✔
1505
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT)
1✔
1506
            .setWorkflowTaskTimedOutEventAttributes(a)
1✔
1507
            .build();
1✔
1508
    ctx.addEvent(event);
1✔
1509
    ctx.setNeedWorkflowTask(true);
1✔
1510
  }
1✔
1511

1512
  private static void needsWorkflowTask(
1513
      RequestContext requestContext,
1514
      WorkflowTaskData workflowTaskData,
1515
      Object notUsedRequest,
1516
      long notUsed) {
1517
    requestContext.setNeedWorkflowTask(true);
1✔
1518
  }
1✔
1519

1520
  private static void completeActivityTask(
1521
      RequestContext ctx, ActivityTaskData data, Object request, long notUsed) {
1522
    data.startedEventId = ctx.addEvent(data.startedEvent);
1✔
1523
    if (request instanceof RespondActivityTaskCompletedRequest) {
1✔
1524
      completeActivityTaskByTaskToken(ctx, data, (RespondActivityTaskCompletedRequest) request);
1✔
1525
    } else if (request instanceof RespondActivityTaskCompletedByIdRequest) {
×
1526
      completeActivityTaskById(ctx, data, (RespondActivityTaskCompletedByIdRequest) request);
×
1527
    } else {
1528
      throw new IllegalArgumentException("Unknown request: " + request);
×
1529
    }
1530
  }
1✔
1531

1532
  private static void completeActivityTaskByTaskToken(
1533
      RequestContext ctx, ActivityTaskData data, RespondActivityTaskCompletedRequest request) {
1534
    ActivityTaskCompletedEventAttributes.Builder a =
1535
        ActivityTaskCompletedEventAttributes.newBuilder()
1✔
1536
            .setIdentity(request.getIdentity())
1✔
1537
            .setScheduledEventId(data.scheduledEventId)
1✔
1538
            .setResult(request.getResult())
1✔
1539
            .setStartedEventId(data.startedEventId);
1✔
1540
    HistoryEvent event =
1541
        HistoryEvent.newBuilder()
1✔
1542
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_COMPLETED)
1✔
1543
            .setActivityTaskCompletedEventAttributes(a)
1✔
1544
            .build();
1✔
1545
    ctx.addEvent(event);
1✔
1546
  }
1✔
1547

1548
  private static void completeActivityTaskById(
1549
      RequestContext ctx, ActivityTaskData data, RespondActivityTaskCompletedByIdRequest request) {
1550
    ActivityTaskCompletedEventAttributes.Builder a =
1551
        ActivityTaskCompletedEventAttributes.newBuilder()
×
1552
            .setIdentity(request.getIdentity())
×
1553
            .setScheduledEventId(data.scheduledEventId)
×
1554
            .setResult(request.getResult())
×
1555
            .setStartedEventId(data.startedEventId);
×
1556
    HistoryEvent event =
1557
        HistoryEvent.newBuilder()
×
1558
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_COMPLETED)
×
1559
            .setActivityTaskCompletedEventAttributes(a)
×
1560
            .build();
×
1561
    ctx.addEvent(event);
×
1562
  }
×
1563

1564
  private static State failActivityTask(
1565
      RequestContext ctx, ActivityTaskData data, Object request, long notUsed) {
1566
    if (request instanceof RespondActivityTaskFailedRequest) {
1✔
1567
      RespondActivityTaskFailedRequest req = (RespondActivityTaskFailedRequest) request;
1✔
1568
      return failActivityTaskByRequestType(ctx, data, req.getFailure(), req.getIdentity());
1✔
1569
    } else if (request instanceof RespondActivityTaskFailedByIdRequest) {
×
1570
      RespondActivityTaskFailedByIdRequest req = (RespondActivityTaskFailedByIdRequest) request;
×
1571
      return failActivityTaskByRequestType(ctx, data, req.getFailure(), req.getIdentity());
×
1572
    } else {
1573
      throw new IllegalArgumentException("Unknown request: " + request);
×
1574
    }
1575
  }
1576

1577
  private static State failActivityTaskByRequestType(
1578
      RequestContext ctx, ActivityTaskData data, Failure failure, String identity) {
1579
    if (!failure.hasApplicationFailureInfo()) {
1✔
1580
      throw new IllegalArgumentException(
×
1581
          "Failure must have ApplicationFailureInfo. Got: " + failure);
1582
    }
1583
    RetryState retryState = attemptActivityRetry(ctx, Optional.of(failure), data);
1✔
1584
    if (retryState == RetryState.RETRY_STATE_IN_PROGRESS) {
1✔
1585
      return INITIATED;
1✔
1586
    }
1587
    data.startedEventId = ctx.addEvent(data.startedEvent);
1✔
1588
    ActivityTaskFailedEventAttributes.Builder attributes =
1589
        ActivityTaskFailedEventAttributes.newBuilder()
1✔
1590
            .setIdentity(identity)
1✔
1591
            .setScheduledEventId(data.scheduledEventId)
1✔
1592
            .setFailure(failure)
1✔
1593
            .setRetryState(retryState)
1✔
1594
            .setStartedEventId(data.startedEventId);
1✔
1595
    HistoryEvent event =
1596
        HistoryEvent.newBuilder()
1✔
1597
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_FAILED)
1✔
1598
            .setActivityTaskFailedEventAttributes(attributes)
1✔
1599
            .build();
1✔
1600
    ctx.addEvent(event);
1✔
1601
    return FAILED;
1✔
1602
  }
1603

1604
  private static State timeoutActivityTask(
1605
      RequestContext ctx, ActivityTaskData data, TimeoutType timeoutType, long notUsed) {
1606
    Optional<Failure> previousFailure = data.retryState.getPreviousRunFailure();
1✔
1607

1608
    // chaining with the previous run failure if we are preparing the final failure
1609
    Failure failure =
1✔
1610
        newTimeoutFailure(timeoutType, Optional.ofNullable(data.heartbeatDetails), previousFailure);
1✔
1611

1612
    RetryState retryState;
1613
    switch (timeoutType) {
1✔
1614
      case TIMEOUT_TYPE_SCHEDULE_TO_START:
1615
      case TIMEOUT_TYPE_SCHEDULE_TO_CLOSE:
1616
        // ScheduleToStart (queue timeout) is not retryable. Instead of the retry, a customer should
1617
        // set a larger ScheduleToStart timeout.
1618
        // ScheduleToClose timeout is final and failure is created with TIMEOUT retry state
1619
        retryState = RetryState.RETRY_STATE_TIMEOUT;
1✔
1620
        break;
1✔
1621
      case TIMEOUT_TYPE_START_TO_CLOSE:
1622
      case TIMEOUT_TYPE_HEARTBEAT:
1623
        // not chaining with the previous run failure if we are preparing the failure to be stored
1624
        // for the next iteration
1625
        Optional<Failure> lastFailure =
1✔
1626
            Optional.of(
1✔
1627
                newTimeoutFailure(
1✔
1628
                    timeoutType,
1629
                    // we move heartbeatDetails to the new top level (this cause is used for
1630
                    // scheduleToClose only)
1631
                    Optional.empty(),
1✔
1632
                    // prune to don't have too deep nesting of failures
1633
                    Optional.empty()));
1✔
1634

1635
        retryState = attemptActivityRetry(ctx, lastFailure, data);
1✔
1636
        if (retryState == RetryState.RETRY_STATE_IN_PROGRESS) {
1✔
1637
          return INITIATED;
1✔
1638
        } else if (retryState == RetryState.RETRY_STATE_TIMEOUT) {
1✔
1639
          // if retryState = RETRY_STATE_TIMEOUT, it means scheduleToClose timeout happened inside
1640
          // attemptActivityRetry();
1641
          // start to close timeout would return as "max attempts reached".
1642

1643
          Preconditions.checkState(
1✔
1644
              timeoutType == TimeoutType.TIMEOUT_TYPE_START_TO_CLOSE
1645
                  || timeoutType == TimeoutType.TIMEOUT_TYPE_HEARTBEAT,
1646
              "Unexpected timeout type: %s. We should end up here only in case of HEARTBEAT_TIMEOUT or START_TO_CLOSE_TIMEOUT",
1647
              timeoutType);
1648

1649
          // heartbeat is preserved as the cause for the scheduleToClose timeout
1650
          // But we effectively omit startToClose timeout with scheduleToClose timeout
1651
          Optional<Failure> cause =
1652
              timeoutType == TimeoutType.TIMEOUT_TYPE_HEARTBEAT ? lastFailure : previousFailure;
1✔
1653

1654
          failure =
1✔
1655
              newTimeoutFailure(
1✔
1656
                  TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE,
1657
                  Optional.ofNullable(data.heartbeatDetails),
1✔
1658
                  cause);
1659
        }
1✔
1660
        break;
1661
      default:
1662
        throw new IllegalStateException(
×
1663
            "Not implemented behavior for timeout type: " + timeoutType);
1664
    }
1665

1666
    ActivityTaskTimedOutEventAttributes.Builder a =
1667
        ActivityTaskTimedOutEventAttributes.newBuilder()
1✔
1668
            .setScheduledEventId(data.scheduledEventId)
1✔
1669
            .setRetryState(retryState)
1✔
1670
            .setStartedEventId(data.startedEventId)
1✔
1671
            .setFailure(failure);
1✔
1672
    HistoryEvent event =
1673
        HistoryEvent.newBuilder()
1✔
1674
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_TIMED_OUT)
1✔
1675
            .setActivityTaskTimedOutEventAttributes(a)
1✔
1676
            .build();
1✔
1677
    ctx.addEvent(event);
1✔
1678
    return TIMED_OUT;
1✔
1679
  }
1680

1681
  private static Failure newTimeoutFailure(
1682
      TimeoutType timeoutType, Optional<Payloads> lastHeartbeatDetails, Optional<Failure> cause) {
1683
    TimeoutFailureInfo.Builder info = TimeoutFailureInfo.newBuilder().setTimeoutType(timeoutType);
1✔
1684
    if (lastHeartbeatDetails.isPresent()) {
1✔
1685
      info.setLastHeartbeatDetails(lastHeartbeatDetails.get());
1✔
1686
    }
1687
    Failure.Builder result = Failure.newBuilder().setTimeoutFailureInfo(info);
1✔
1688
    if (cause.isPresent()) {
1✔
1689
      result.setCause(cause.get());
1✔
1690
    }
1691
    return result.build();
1✔
1692
  }
1693

1694
  private static RetryState attemptActivityRetry(
1695
      RequestContext ctx, Optional<Failure> failure, ActivityTaskData data) {
1696
    if (data.retryState == null) {
1✔
1697
      throw new IllegalStateException("RetryPolicy is always present");
×
1698
    }
1699
    Optional<ApplicationFailureInfo> info = failure.map(Failure::getApplicationFailureInfo);
1✔
1700
    if (info.isPresent()) {
1✔
1701
      if (info.get().getNonRetryable()) {
1✔
1702
        return RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE;
1✔
1703
      }
1704
    }
1705
    TestServiceRetryState nextAttempt = data.retryState.getNextAttempt(failure);
1✔
1706
    TestServiceRetryState.BackoffInterval backoffInterval =
1✔
1707
        data.retryState.getBackoffIntervalInSeconds(
1✔
1708
            info.map(ApplicationFailureInfo::getType), data.store.currentTime());
1✔
1709
    if (backoffInterval.getRetryState() == RetryState.RETRY_STATE_IN_PROGRESS) {
1✔
1710
      data.nextBackoffInterval = ProtobufTimeUtils.toProtoDuration(backoffInterval.getInterval());
1✔
1711
      PollActivityTaskQueueResponse.Builder task = data.activityTask.getTask();
1✔
1712
      if (data.heartbeatDetails != null) {
1✔
1713
        task.setHeartbeatDetails(data.heartbeatDetails);
1✔
1714
      }
1715
      ctx.onCommit(
1✔
1716
          (historySize) -> {
1717
            data.retryState = nextAttempt;
1✔
1718
            task.setAttempt(nextAttempt.getAttempt());
1✔
1719
            task.setCurrentAttemptScheduledTime(ctx.currentTime());
1✔
1720
          });
1✔
1721
    } else {
1✔
1722
      data.nextBackoffInterval = Durations.ZERO;
1✔
1723
    }
1724
    return backoffInterval.getRetryState();
1✔
1725
  }
1726

1727
  private static void reportActivityTaskCancellation(
1728
      RequestContext ctx, ActivityTaskData data, Object request, long notUsed) {
1729
    Payloads details = null;
1✔
1730
    if (request instanceof RespondActivityTaskCanceledRequest) {
1✔
1731
      {
1732
        RespondActivityTaskCanceledRequest cr = (RespondActivityTaskCanceledRequest) request;
1✔
1733

1734
        details = cr.hasDetails() ? cr.getDetails() : null;
1✔
1735
      }
1✔
1736
    } else if (request instanceof RespondActivityTaskCanceledByIdRequest) {
1✔
1737
      {
1738
        RespondActivityTaskCanceledByIdRequest cr =
×
1739
            (RespondActivityTaskCanceledByIdRequest) request;
1740
        details = cr.hasDetails() ? cr.getDetails() : null;
×
1741
      }
×
1742
    } else if (request != null) {
1✔
1743
      throw Status.INTERNAL
×
1744
          .withDescription("Unexpected request type: " + request)
×
1745
          .asRuntimeException();
×
1746
    }
1747
    ActivityTaskCanceledEventAttributes.Builder a =
1748
        ActivityTaskCanceledEventAttributes.newBuilder()
1✔
1749
            .setScheduledEventId(data.scheduledEventId)
1✔
1750
            .setStartedEventId(data.startedEventId);
1✔
1751
    if (details != null) {
1✔
1752
      a.setDetails(details);
×
1753
    }
1754
    HistoryEvent event =
1755
        HistoryEvent.newBuilder()
1✔
1756
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_CANCELED)
1✔
1757
            .setActivityTaskCanceledEventAttributes(a)
1✔
1758
            .build();
1✔
1759
    ctx.addEvent(event);
1✔
1760
  }
1✔
1761

1762
  private static void heartbeatActivityTask(
1763
      RequestContext nullCtx, ActivityTaskData data, Payloads details, long notUsed) {
1764
    data.heartbeatDetails = details;
1✔
1765
  }
1✔
1766

1767
  private static void acceptUpdate(
1768
      RequestContext ctx,
1769
      UpdateWorkflowExecutionData data,
1770
      Message msg,
1771
      long workflowTaskCompletedEventId) {
1772
    try {
1773
      Acceptance acceptance = msg.getBody().unpack(Acceptance.class);
1✔
1774

1775
      WorkflowExecutionUpdateAcceptedEventAttributes acceptedAttribute =
1776
          WorkflowExecutionUpdateAcceptedEventAttributes.newBuilder()
1✔
1777
              .setAcceptedRequestSequencingEventId(workflowTaskCompletedEventId - 1)
1✔
1778
              .setProtocolInstanceId(msg.getProtocolInstanceId())
1✔
1779
              .setAcceptedRequestMessageId(acceptance.getAcceptedRequestMessageId())
1✔
1780
              .setAcceptedRequest(data.initialRequest)
1✔
1781
              .build();
1✔
1782

1783
      HistoryEvent event =
1784
          HistoryEvent.newBuilder()
1✔
1785
              .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED)
1✔
1786
              .setWorkflowExecutionUpdateAcceptedEventAttributes(acceptedAttribute)
1✔
1787
              .build();
1✔
1788
      // If the workflow is finished we can't write more events
1789
      // to history so if the message was processed after the workflow
1790
      // was closed there is nothing we can do.
1791
      // The real server also has this same problem
1792
      if (!ctx.getWorkflowMutableState().isTerminalState()) {
1✔
1793
        ctx.addEvent(event);
1✔
1794
      }
1795

1796
      UpdateWorkflowExecutionResponse response =
1797
          UpdateWorkflowExecutionResponse.newBuilder()
1✔
1798
              .setUpdateRef(
1✔
1799
                  UpdateRef.newBuilder()
1✔
1800
                      .setWorkflowExecution(ctx.getExecution())
1✔
1801
                      .setUpdateId(data.id))
1✔
1802
              .build();
1✔
1803

1804
      data.acceptance.complete(response);
1✔
1805
    } catch (InvalidProtocolBufferException e) {
×
1806
      throw new RuntimeException(e);
×
1807
    }
1✔
1808
  }
1✔
1809

1810
  private static void completeUpdate(
1811
      RequestContext ctx,
1812
      UpdateWorkflowExecutionData data,
1813
      Message msg,
1814
      long workflowTaskCompletedEventId) {
1815
    try {
1816
      Response response = msg.getBody().unpack(Response.class);
1✔
1817

1818
      WorkflowExecutionUpdateCompletedEventAttributes completedAttribute =
1819
          WorkflowExecutionUpdateCompletedEventAttributes.newBuilder()
1✔
1820
              .setMeta(response.getMeta())
1✔
1821
              .setOutcome(response.getOutcome())
1✔
1822
              .build();
1✔
1823

1824
      HistoryEvent event =
1825
          HistoryEvent.newBuilder()
1✔
1826
              .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_COMPLETED)
1✔
1827
              .setWorkflowExecutionUpdateCompletedEventAttributes(completedAttribute)
1✔
1828
              .build();
1✔
1829
      // If the workflow is finished we can't write more events
1830
      // to history so if the message was processed after the workflow
1831
      // was closed there is nothing we can do.
1832
      // The real server also has this same problem
1833
      if (!ctx.getWorkflowMutableState().isTerminalState()) {
1✔
1834
        ctx.addEvent(event);
1✔
1835
      }
1836

1837
      UpdateWorkflowExecutionResponse updateResponse =
1838
          UpdateWorkflowExecutionResponse.newBuilder()
1✔
1839
              .setUpdateRef(
1✔
1840
                  UpdateRef.newBuilder()
1✔
1841
                      .setWorkflowExecution(ctx.getExecution())
1✔
1842
                      .setUpdateId(data.id))
1✔
1843
              .setOutcome(response.getOutcome())
1✔
1844
              .build();
1✔
1845

1846
      data.complete.complete(updateResponse);
1✔
1847
    } catch (InvalidProtocolBufferException e) {
×
1848
      throw new RuntimeException(e);
×
1849
    }
1✔
1850
  }
1✔
1851

1852
  private static void startTimer(
1853
      RequestContext ctx,
1854
      TimerData data,
1855
      StartTimerCommandAttributes d,
1856
      long workflowTaskCompletedEventId) {
1857
    TimerStartedEventAttributes.Builder a =
1858
        TimerStartedEventAttributes.newBuilder()
1✔
1859
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1✔
1860
            .setStartToFireTimeout(d.getStartToFireTimeout())
1✔
1861
            .setTimerId(d.getTimerId());
1✔
1862
    HistoryEvent event =
1863
        HistoryEvent.newBuilder()
1✔
1864
            .setEventType(EventType.EVENT_TYPE_TIMER_STARTED)
1✔
1865
            .setTimerStartedEventAttributes(a)
1✔
1866
            .build();
1✔
1867
    long startedEventId = ctx.addEvent(event);
1✔
1868
    ctx.onCommit(
1✔
1869
        (historySize) -> {
1870
          data.startedEvent = a.build();
1✔
1871
          data.startedEventId = startedEventId;
1✔
1872
        });
1✔
1873
  }
1✔
1874

1875
  private static void fireTimer(RequestContext ctx, TimerData data, Object ignored, long notUsed) {
1876
    TimerFiredEventAttributes.Builder a =
1877
        TimerFiredEventAttributes.newBuilder()
1✔
1878
            .setTimerId(data.startedEvent.getTimerId())
1✔
1879
            .setStartedEventId(data.startedEventId);
1✔
1880
    HistoryEvent event =
1881
        HistoryEvent.newBuilder()
1✔
1882
            .setEventType(EventType.EVENT_TYPE_TIMER_FIRED)
1✔
1883
            .setTimerFiredEventAttributes(a)
1✔
1884
            .build();
1✔
1885
    ctx.addEvent(event);
1✔
1886
  }
1✔
1887

1888
  private static void cancelTimer(
1889
      RequestContext ctx,
1890
      TimerData data,
1891
      CancelTimerCommandAttributes d,
1892
      long workflowTaskCompletedEventId) {
1893
    TimerCanceledEventAttributes.Builder a =
1894
        TimerCanceledEventAttributes.newBuilder()
1✔
1895
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1✔
1896
            .setTimerId(d.getTimerId())
1✔
1897
            .setStartedEventId(data.startedEventId);
1✔
1898
    HistoryEvent event =
1899
        HistoryEvent.newBuilder()
1✔
1900
            .setEventType(EventType.EVENT_TYPE_TIMER_CANCELED)
1✔
1901
            .setTimerCanceledEventAttributes(a)
1✔
1902
            .build();
1✔
1903
    ctx.addEvent(event);
1✔
1904
  }
1✔
1905

1906
  private static void initiateExternalSignal(
1907
      RequestContext ctx,
1908
      SignalExternalData data,
1909
      SignalExternalWorkflowExecutionCommandAttributes d,
1910
      long workflowTaskCompletedEventId) {
1911
    SignalExternalWorkflowExecutionInitiatedEventAttributes.Builder a =
1912
        SignalExternalWorkflowExecutionInitiatedEventAttributes.newBuilder()
1✔
1913
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1✔
1914
            .setControl(d.getControl())
1✔
1915
            .setInput(d.getInput())
1✔
1916
            .setNamespace(d.getNamespace())
1✔
1917
            .setChildWorkflowOnly(d.getChildWorkflowOnly())
1✔
1918
            .setSignalName(d.getSignalName())
1✔
1919
            .setWorkflowExecution(d.getExecution());
1✔
1920

1921
    HistoryEvent event =
1922
        HistoryEvent.newBuilder()
1✔
1923
            .setEventType(EventType.EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED)
1✔
1924
            .setSignalExternalWorkflowExecutionInitiatedEventAttributes(a)
1✔
1925
            .build();
1✔
1926
    long initiatedEventId = ctx.addEvent(event);
1✔
1927
    ctx.onCommit(
1✔
1928
        (historySize) -> {
1929
          data.initiatedEventId = initiatedEventId;
1✔
1930
          data.initiatedEvent = a.build();
1✔
1931
        });
1✔
1932
  }
1✔
1933

1934
  private static void failExternalSignal(
1935
      RequestContext ctx,
1936
      SignalExternalData data,
1937
      SignalExternalWorkflowExecutionFailedCause cause,
1938
      long notUsed) {
1939
    SignalExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent = data.initiatedEvent;
1✔
1940
    SignalExternalWorkflowExecutionFailedEventAttributes.Builder a =
1941
        SignalExternalWorkflowExecutionFailedEventAttributes.newBuilder()
1✔
1942
            .setInitiatedEventId(data.initiatedEventId)
1✔
1943
            .setWorkflowExecution(initiatedEvent.getWorkflowExecution())
1✔
1944
            .setControl(initiatedEvent.getControl())
1✔
1945
            .setCause(cause)
1✔
1946
            .setNamespace(initiatedEvent.getNamespace());
1✔
1947
    HistoryEvent event =
1948
        HistoryEvent.newBuilder()
1✔
1949
            .setEventType(EventType.EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED)
1✔
1950
            .setSignalExternalWorkflowExecutionFailedEventAttributes(a)
1✔
1951
            .build();
1✔
1952
    ctx.addEvent(event);
1✔
1953
  }
1✔
1954

1955
  private static void completeExternalSignal(
1956
      RequestContext ctx, SignalExternalData data, String runId, long notUsed) {
1957
    SignalExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent = data.initiatedEvent;
1✔
1958
    WorkflowExecution signaledExecution =
1✔
1959
        initiatedEvent.getWorkflowExecution().toBuilder().setRunId(runId).build();
1✔
1960
    ExternalWorkflowExecutionSignaledEventAttributes.Builder a =
1961
        ExternalWorkflowExecutionSignaledEventAttributes.newBuilder()
1✔
1962
            .setInitiatedEventId(data.initiatedEventId)
1✔
1963
            .setWorkflowExecution(signaledExecution)
1✔
1964
            .setControl(initiatedEvent.getControl())
1✔
1965
            .setNamespace(initiatedEvent.getNamespace());
1✔
1966
    HistoryEvent event =
1967
        HistoryEvent.newBuilder()
1✔
1968
            .setEventType(EventType.EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_SIGNALED)
1✔
1969
            .setExternalWorkflowExecutionSignaledEventAttributes(a)
1✔
1970
            .build();
1✔
1971
    ctx.addEvent(event);
1✔
1972
  }
1✔
1973

1974
  private static void initiateExternalCancellation(
1975
      RequestContext ctx,
1976
      CancelExternalData data,
1977
      RequestCancelExternalWorkflowExecutionCommandAttributes d,
1978
      long workflowTaskCompletedEventId) {
1979
    RequestCancelExternalWorkflowExecutionInitiatedEventAttributes.Builder a =
1980
        RequestCancelExternalWorkflowExecutionInitiatedEventAttributes.newBuilder()
1✔
1981
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1✔
1982
            .setControl(d.getControl())
1✔
1983
            .setNamespace(d.getNamespace())
1✔
1984
            .setChildWorkflowOnly(d.getChildWorkflowOnly())
1✔
1985
            .setWorkflowExecution(
1✔
1986
                WorkflowExecution.newBuilder()
1✔
1987
                    .setWorkflowId(d.getWorkflowId())
1✔
1988
                    .setRunId(d.getRunId())
1✔
1989
                    .build());
1✔
1990

1991
    HistoryEvent event =
1992
        HistoryEvent.newBuilder()
1✔
1993
            .setEventType(EventType.EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED)
1✔
1994
            .setRequestCancelExternalWorkflowExecutionInitiatedEventAttributes(a)
1✔
1995
            .build();
1✔
1996
    long initiatedEventId = ctx.addEvent(event);
1✔
1997
    ctx.onCommit(
1✔
1998
        (historySize) -> {
1999
          data.initiatedEventId = initiatedEventId;
1✔
2000
          data.initiatedEvent = a.build();
1✔
2001
        });
1✔
2002
  }
1✔
2003

2004
  private static void reportExternalCancellationRequested(
2005
      RequestContext ctx, CancelExternalData data, String runId, long notUsed) {
2006
    RequestCancelExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent =
1✔
2007
        data.initiatedEvent;
2008
    ExternalWorkflowExecutionCancelRequestedEventAttributes.Builder a =
2009
        ExternalWorkflowExecutionCancelRequestedEventAttributes.newBuilder()
1✔
2010
            .setInitiatedEventId(data.initiatedEventId)
1✔
2011
            .setWorkflowExecution(
1✔
2012
                WorkflowExecution.newBuilder()
1✔
2013
                    .setRunId(runId)
1✔
2014
                    .setWorkflowId(initiatedEvent.getWorkflowExecution().getWorkflowId())
1✔
2015
                    .build())
1✔
2016
            .setNamespace(initiatedEvent.getNamespace());
1✔
2017
    HistoryEvent event =
2018
        HistoryEvent.newBuilder()
1✔
2019
            .setEventType(EventType.EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_CANCEL_REQUESTED)
1✔
2020
            .setExternalWorkflowExecutionCancelRequestedEventAttributes(a)
1✔
2021
            .build();
1✔
2022
    ctx.addEvent(event);
1✔
2023
  }
1✔
2024

2025
  private static void failExternalCancellation(
2026
      RequestContext ctx,
2027
      CancelExternalData data,
2028
      CancelExternalWorkflowExecutionFailedCause cause,
2029
      long notUsed) {
2030
    RequestCancelExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent =
×
2031
        data.initiatedEvent;
2032
    RequestCancelExternalWorkflowExecutionFailedEventAttributes.Builder a =
2033
        RequestCancelExternalWorkflowExecutionFailedEventAttributes.newBuilder()
×
2034
            .setInitiatedEventId(data.initiatedEventId)
×
2035
            .setWorkflowExecution(initiatedEvent.getWorkflowExecution())
×
2036
            .setControl(initiatedEvent.getControl())
×
2037
            .setCause(cause)
×
2038
            .setNamespace(initiatedEvent.getNamespace());
×
2039
    HistoryEvent event =
2040
        HistoryEvent.newBuilder()
×
2041
            .setEventType(EventType.EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_FAILED)
×
2042
            .setRequestCancelExternalWorkflowExecutionFailedEventAttributes(a)
×
2043
            .build();
×
2044
    ctx.addEvent(event);
×
2045
  }
×
2046

2047
  // Mimics the default activity retry policy of a standard Temporal server.
2048
  static RetryPolicy ensureDefaultFieldsForActivityRetryPolicy(RetryPolicy originalPolicy) {
2049
    Duration initialInterval =
2050
        Durations.compare(originalPolicy.getInitialInterval(), Durations.ZERO) == 0
1✔
2051
            ? DEFAULT_ACTIVITY_RETRY_INITIAL_INTERVAL
1✔
2052
            : originalPolicy.getInitialInterval();
1✔
2053

2054
    return RetryPolicy.newBuilder()
1✔
2055
        .setInitialInterval(initialInterval)
1✔
2056
        .addAllNonRetryableErrorTypes(originalPolicy.getNonRetryableErrorTypesList())
1✔
2057
        .setMaximumInterval(
1✔
2058
            Durations.compare(originalPolicy.getMaximumInterval(), Durations.ZERO) == 0
1✔
2059
                ? Durations.fromMillis(
1✔
2060
                    DEFAULT_ACTIVITY_MAXIMUM_INTERVAL_COEFFICIENT
2061
                        * Durations.toMillis(initialInterval))
1✔
2062
                : originalPolicy.getMaximumInterval())
1✔
2063
        .setBackoffCoefficient(
1✔
2064
            originalPolicy.getBackoffCoefficient() == 0
1✔
2065
                ? DEFAULT_ACTIVITY_RETRY_BACKOFF_COEFFICIENT
1✔
2066
                : originalPolicy.getBackoffCoefficient())
1✔
2067
        .setMaximumAttempts(
1✔
2068
            originalPolicy.getMaximumAttempts() == 0
1✔
2069
                ? DEFAULT_ACTIVITY_RETRY_MAXIMUM_ATTEMPTS
1✔
2070
                : originalPolicy.getMaximumAttempts())
1✔
2071
        .build();
1✔
2072
  }
2073
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc