• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #171

pending completion
#171

push

github-actions

web-flow
Update README.md (#1765)

Fix typo in spring-boot-autoconfigure readme

17817 of 21804 relevant lines covered (81.71%)

0.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.07
/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.testservice;
22

23
import static io.temporal.internal.testservice.StateMachines.Action.CANCEL;
24
import static io.temporal.internal.testservice.StateMachines.Action.COMPLETE;
25
import static io.temporal.internal.testservice.StateMachines.Action.CONTINUE_AS_NEW;
26
import static io.temporal.internal.testservice.StateMachines.Action.FAIL;
27
import static io.temporal.internal.testservice.StateMachines.Action.INITIATE;
28
import static io.temporal.internal.testservice.StateMachines.Action.QUERY;
29
import static io.temporal.internal.testservice.StateMachines.Action.REQUEST_CANCELLATION;
30
import static io.temporal.internal.testservice.StateMachines.Action.START;
31
import static io.temporal.internal.testservice.StateMachines.Action.TERMINATE;
32
import static io.temporal.internal.testservice.StateMachines.Action.TIME_OUT;
33
import static io.temporal.internal.testservice.StateMachines.Action.UPDATE;
34
import static io.temporal.internal.testservice.StateMachines.Action.UPDATE_WORKFLOW_EXECUTION;
35
import static io.temporal.internal.testservice.StateMachines.State.CANCELED;
36
import static io.temporal.internal.testservice.StateMachines.State.CANCELLATION_REQUESTED;
37
import static io.temporal.internal.testservice.StateMachines.State.COMPLETED;
38
import static io.temporal.internal.testservice.StateMachines.State.CONTINUED_AS_NEW;
39
import static io.temporal.internal.testservice.StateMachines.State.FAILED;
40
import static io.temporal.internal.testservice.StateMachines.State.INITIATED;
41
import static io.temporal.internal.testservice.StateMachines.State.NONE;
42
import static io.temporal.internal.testservice.StateMachines.State.STARTED;
43
import static io.temporal.internal.testservice.StateMachines.State.TERMINATED;
44
import static io.temporal.internal.testservice.StateMachines.State.TIMED_OUT;
45

46
import com.google.common.base.Preconditions;
47
import com.google.protobuf.Any;
48
import com.google.protobuf.Duration;
49
import com.google.protobuf.InvalidProtocolBufferException;
50
import com.google.protobuf.Timestamp;
51
import com.google.protobuf.util.Durations;
52
import com.google.protobuf.util.Timestamps;
53
import io.grpc.Status;
54
import io.grpc.StatusRuntimeException;
55
import io.temporal.api.command.v1.CancelTimerCommandAttributes;
56
import io.temporal.api.command.v1.CancelWorkflowExecutionCommandAttributes;
57
import io.temporal.api.command.v1.CompleteWorkflowExecutionCommandAttributes;
58
import io.temporal.api.command.v1.ContinueAsNewWorkflowExecutionCommandAttributes;
59
import io.temporal.api.command.v1.FailWorkflowExecutionCommandAttributes;
60
import io.temporal.api.command.v1.RequestCancelActivityTaskCommandAttributes;
61
import io.temporal.api.command.v1.RequestCancelExternalWorkflowExecutionCommandAttributes;
62
import io.temporal.api.command.v1.ScheduleActivityTaskCommandAttributes;
63
import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes;
64
import io.temporal.api.command.v1.StartChildWorkflowExecutionCommandAttributes;
65
import io.temporal.api.command.v1.StartTimerCommandAttributes;
66
import io.temporal.api.common.v1.Payloads;
67
import io.temporal.api.common.v1.RetryPolicy;
68
import io.temporal.api.common.v1.WorkflowExecution;
69
import io.temporal.api.enums.v1.CancelExternalWorkflowExecutionFailedCause;
70
import io.temporal.api.enums.v1.EventType;
71
import io.temporal.api.enums.v1.RetryState;
72
import io.temporal.api.enums.v1.SignalExternalWorkflowExecutionFailedCause;
73
import io.temporal.api.enums.v1.StartChildWorkflowExecutionFailedCause;
74
import io.temporal.api.enums.v1.TimeoutType;
75
import io.temporal.api.errordetails.v1.QueryFailedFailure;
76
import io.temporal.api.failure.v1.ApplicationFailureInfo;
77
import io.temporal.api.failure.v1.Failure;
78
import io.temporal.api.failure.v1.TimeoutFailureInfo;
79
import io.temporal.api.history.v1.*;
80
import io.temporal.api.protocol.v1.Message;
81
import io.temporal.api.query.v1.WorkflowQueryResult;
82
import io.temporal.api.taskqueue.v1.StickyExecutionAttributes;
83
import io.temporal.api.taskqueue.v1.TaskQueue;
84
import io.temporal.api.update.v1.*;
85
import io.temporal.api.workflowservice.v1.*;
86
import io.temporal.internal.common.ProtobufTimeUtils;
87
import io.temporal.internal.testservice.TestWorkflowMutableStateImpl.UpdateWorkflowExecution;
88
import io.temporal.internal.testservice.TestWorkflowStore.ActivityTask;
89
import io.temporal.internal.testservice.TestWorkflowStore.TaskQueueId;
90
import io.temporal.internal.testservice.TestWorkflowStore.WorkflowTask;
91
import io.temporal.serviceclient.StatusUtils;
92
import io.temporal.workflow.Functions;
93
import java.util.*;
94
import java.util.concurrent.CompletableFuture;
95
import java.util.concurrent.ForkJoinPool;
96
import javax.annotation.Nonnull;
97
import org.slf4j.Logger;
98
import org.slf4j.LoggerFactory;
99

100
class StateMachines {
×
101

102
  private static final Logger log = LoggerFactory.getLogger(StateMachines.class);
1✔
103

104
  public static final long DEFAULT_WORKFLOW_EXECUTION_TIMEOUT_MILLISECONDS =
105
      10L * 365 * 24 * 3600 * 1000;
106
  public static final long DEFAULT_WORKFLOW_TASK_TIMEOUT_MILLISECONDS = 10L * 1000;
107
  public static final long MAX_WORKFLOW_TASK_TIMEOUT_MILLISECONDS = 60L * 1000;
108
  static final Duration DEFAULT_ACTIVITY_RETRY_INITIAL_INTERVAL = Durations.fromSeconds(1);
1✔
109
  static final double DEFAULT_ACTIVITY_RETRY_BACKOFF_COEFFICIENT = 2.0;
110
  static final int DEFAULT_ACTIVITY_RETRY_MAXIMUM_ATTEMPTS = 0;
111
  static final int DEFAULT_ACTIVITY_MAXIMUM_INTERVAL_COEFFICIENT = 100;
112
  static final int NO_EVENT_ID = -1;
113

114
  /**
   * States shared by all state machines built in this class. Every machine registers its first
   * transition from {@link #NONE}.
   */
  enum State {
    NONE,
    INITIATED,
    STARTED,
    FAILED,
    TIMED_OUT,
    CANCELLATION_REQUESTED,
    CANCELED,
    COMPLETED,
    CONTINUED_AS_NEW,
    TERMINATED
  }
126

127
  /** Actions that can be applied to a state machine to trigger one of its transitions. */
  enum Action {
    INITIATE,
    START,
    FAIL,
    TIME_OUT,
    REQUEST_CANCELLATION,
    CANCEL,
    TERMINATE,
    UPDATE,
    COMPLETE,
    CONTINUE_AS_NEW,
    QUERY,
    UPDATE_WORKFLOW_EXECUTION
  }
141

142
  static final class WorkflowData {
143
    Optional<TestServiceRetryState> retryState;
144
    Duration backoffStartInterval;
145
    String cronSchedule;
146
    Payloads lastCompletionResult;
147
    Optional<Failure> lastFailure;
148
    /**
149
     * @see WorkflowExecutionStartedEventAttributes#getFirstExecutionRunId()
150
     */
151
    final @Nonnull String firstExecutionRunId;
152
    /**
153
     * @see WorkflowExecutionStartedEventAttributes#getOriginalExecutionRunId()
154
     */
155
    final @Nonnull String originalExecutionRunId;
156

157
    /** RunId that was continued by this run as a result of Retry or Continue-As-New. */
158
    Optional<String> continuedExecutionRunId;
159

160
    Functions.Proc runTimerCancellationHandle;
161

162
    WorkflowData(
163
        Optional<TestServiceRetryState> retryState,
164
        Duration backoffStartInterval,
165
        String cronSchedule,
166
        Payloads lastCompletionResult,
167
        Optional<Failure> lastFailure,
168
        @Nonnull String firstExecutionRunId,
169
        @Nonnull String originalExecutionRunId,
170
        Optional<String> continuedExecutionRunId) {
1✔
171
      this.retryState = retryState;
1✔
172
      this.backoffStartInterval = backoffStartInterval;
1✔
173
      this.cronSchedule = cronSchedule;
1✔
174
      this.lastCompletionResult = lastCompletionResult;
1✔
175
      this.firstExecutionRunId =
1✔
176
          Preconditions.checkNotNull(firstExecutionRunId, "firstExecutionRunId");
1✔
177
      this.originalExecutionRunId =
1✔
178
          Preconditions.checkNotNull(originalExecutionRunId, "originalExecutionRunId");
1✔
179
      this.continuedExecutionRunId = continuedExecutionRunId;
1✔
180
      this.lastFailure = Objects.requireNonNull(lastFailure);
1✔
181
    }
1✔
182

183
    @Override
184
    public String toString() {
185
      return "WorkflowData{"
×
186
          + "retryState="
187
          + retryState
188
          + ", backoffStartInterval="
189
          + backoffStartInterval
190
          + ", cronSchedule='"
191
          + cronSchedule
192
          + '\''
193
          + ", lastCompletionResult="
194
          + lastCompletionResult
195
          + ", firstExecutionRunId='"
196
          + firstExecutionRunId
197
          + '\''
198
          + ", originalExecutionRunId='"
199
          + originalExecutionRunId
200
          + '\''
201
          + ", continuedExecutionRunId="
202
          + continuedExecutionRunId
203
          + '}';
204
    }
205
  }
206

207
  static final class WorkflowTaskData {
208

209
    final TestWorkflowStore store;
210

211
    boolean workflowCompleted;
212

213
    /** id of the last started event which completed successfully */
214
    long lastSuccessfulStartedEventId;
215

216
    final StartWorkflowExecutionRequest startRequest;
217

218
    long startedEventId = NO_EVENT_ID;
1✔
219

220
    PollWorkflowTaskQueueResponse.Builder workflowTask;
221

222
    /**
223
     * Events that are added during execution of a workflow task. They have to be buffered to be
224
     * added after the events generated by a workflow task. Without this the determinism will be
225
     * broken on replay.
226
     */
227
    final List<RequestContext> bufferedEvents = new ArrayList<>();
1✔
228

229
    /**
230
     * Update requests that are added during execution of a workflow task. They have to be buffered
231
     * to be added to the next workflow task.
232
     */
233
    final Map<String, UpdateWorkflowExecution> updateRequestBuffer = new LinkedHashMap<>();
1✔
234

235
    final Map<String, UpdateWorkflowExecution> updateRequest = new LinkedHashMap<>();
1✔
236

237
    long scheduledEventId = NO_EVENT_ID;
1✔
238

239
    int attempt = 0;
1✔
240

241
    /** Query requests received during workflow task processing (after start) */
242
    final Map<String, TestWorkflowMutableStateImpl.ConsistentQuery> queryBuffer = new HashMap<>();
1✔
243

244
    final Map<String, TestWorkflowMutableStateImpl.ConsistentQuery> consistentQueryRequests =
1✔
245
        new HashMap<>();
246

247
    WorkflowTaskData(TestWorkflowStore store, StartWorkflowExecutionRequest startRequest) {
1✔
248
      this.store = store;
1✔
249
      this.startRequest = startRequest;
1✔
250
    }
1✔
251

252
    void clear() {
253
      startedEventId = NO_EVENT_ID;
1✔
254
      workflowTask = null;
1✔
255
      scheduledEventId = NO_EVENT_ID;
1✔
256
      attempt = 0;
1✔
257
    }
1✔
258

259
    Optional<UpdateWorkflowExecution> getUpdateRequest(String protocolInstanceId) {
260
      return Optional.ofNullable(
1✔
261
          updateRequest.getOrDefault(
1✔
262
              protocolInstanceId, updateRequestBuffer.get(protocolInstanceId)));
1✔
263
    }
264

265
    @Override
266
    public String toString() {
267
      return "WorkflowTaskData{"
×
268
          + "store="
269
          + store
270
          + ", workflowCompleted="
271
          + workflowCompleted
272
          + ", lastSuccessfulStartedEventId="
273
          + lastSuccessfulStartedEventId
274
          + ", startRequest="
275
          + startRequest
276
          + ", startedEventId="
277
          + startedEventId
278
          + ", workflowTask="
279
          + workflowTask
280
          + ", bufferedEvents="
281
          + bufferedEvents
282
          + ", scheduledEventId="
283
          + scheduledEventId
284
          + ", attempt="
285
          + attempt
286
          + ", queryBuffer="
287
          + queryBuffer
288
          + ", consistentQueryRequests="
289
          + consistentQueryRequests
290
          + ", updateRequest="
291
          + updateRequest
292
          + ", updateRequestBuffer="
293
          + updateRequestBuffer
294
          + '}';
295
    }
296
  }
297

298
  static final class ActivityTaskData {
299

300
    StartWorkflowExecutionRequest startWorkflowExecutionRequest;
301
    ActivityTaskScheduledEventAttributes scheduledEvent;
302
    ActivityTask activityTask;
303

304
    final TestWorkflowStore store;
305

306
    long scheduledEventId = NO_EVENT_ID;
1✔
307
    long startedEventId = NO_EVENT_ID;
1✔
308
    public HistoryEvent startedEvent;
309
    Payloads heartbeatDetails;
310
    long lastHeartbeatTime;
311
    TestServiceRetryState retryState;
312
    Duration nextBackoffInterval;
313
    String identity;
314

315
    ActivityTaskData(
316
        TestWorkflowStore store, StartWorkflowExecutionRequest startWorkflowExecutionRequest) {
1✔
317
      this.store = store;
1✔
318
      this.startWorkflowExecutionRequest = startWorkflowExecutionRequest;
1✔
319
    }
1✔
320

321
    @Override
322
    public String toString() {
323
      return "ActivityTaskData{"
×
324
          + "startWorkflowExecutionRequest="
325
          + startWorkflowExecutionRequest
326
          + ", scheduledEvent="
327
          + scheduledEvent
328
          + ", activityTask="
329
          + activityTask
330
          + ", store="
331
          + store
332
          + ", scheduledEventId="
333
          + scheduledEventId
334
          + ", startedEventId="
335
          + startedEventId
336
          + ", startedEvent="
337
          + startedEvent
338
          + ", heartbeatDetails="
339
          + heartbeatDetails
340
          + ", lastHeartbeatTime="
341
          + lastHeartbeatTime
342
          + ", retryState="
343
          + retryState
344
          + ", nextBackoffInterval="
345
          + nextBackoffInterval
346
          + '}';
347
    }
348

349
    public int getAttempt() {
350
      return retryState != null ? retryState.getAttempt() : 1;
1✔
351
    }
352
  }
353

354
  static final class SignalExternalData {
1✔
355
    long initiatedEventId = NO_EVENT_ID;
1✔
356
    public SignalExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent;
357

358
    @Override
359
    public String toString() {
360
      return "SignalExternalData{"
×
361
          + "initiatedEventId="
362
          + initiatedEventId
363
          + ", initiatedEvent="
364
          + initiatedEvent
365
          + '}';
366
    }
367
  }
368

369
  static final class CancelExternalData {
1✔
370
    long initiatedEventId = NO_EVENT_ID;
1✔
371
    public RequestCancelExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent;
372

373
    @Override
374
    public String toString() {
375
      return "CancelExternalData{"
×
376
          + "initiatedEventId="
377
          + initiatedEventId
378
          + ", initiatedEvent="
379
          + initiatedEvent
380
          + '}';
381
    }
382
  }
383

384
  static final class ChildWorkflowData {
385

386
    final TestWorkflowService service;
387
    StartChildWorkflowExecutionInitiatedEventAttributes initiatedEvent;
388
    long initiatedEventId;
389
    long startedEventId;
390
    WorkflowExecution execution;
391

392
    public ChildWorkflowData(TestWorkflowService service) {
1✔
393
      this.service = service;
1✔
394
    }
1✔
395

396
    @Override
397
    public String toString() {
398
      return "ChildWorkflowData{"
×
399
          + "service="
400
          + service
401
          + ", initiatedEvent="
402
          + initiatedEvent
403
          + ", initiatedEventId="
404
          + initiatedEventId
405
          + ", startedEventId="
406
          + startedEventId
407
          + ", execution="
408
          + execution
409
          + '}';
410
    }
411
  }
412

413
  static final class TimerData {
1✔
414
    TimerStartedEventAttributes startedEvent;
415
    public long startedEventId;
416

417
    @Override
418
    public String toString() {
419
      return "TimerData{"
×
420
          + "startedEvent="
421
          + startedEvent
422
          + ", startedEventId="
423
          + startedEventId
424
          + '}';
425
    }
426
  }
427

428
  /** Represents an accepted update workflow execution request */
429
  static final class UpdateWorkflowExecutionData {
430
    final String id;
431
    final CompletableFuture<UpdateWorkflowExecutionResponse> acceptance;
432
    final CompletableFuture<UpdateWorkflowExecutionResponse> complete;
433

434
    public UpdateWorkflowExecutionData(
435
        String id,
436
        CompletableFuture<UpdateWorkflowExecutionResponse> acceptance,
437
        CompletableFuture<UpdateWorkflowExecutionResponse> complete) {
1✔
438
      this.id = id;
1✔
439
      this.acceptance = acceptance;
1✔
440
      this.complete = complete;
1✔
441
    }
1✔
442

443
    @Override
444
    public String toString() {
445
      return "UpdateWorkflowExecutionData{" + "ID=" + id + '}';
×
446
    }
447
  }
448

449
  static StateMachine<WorkflowData> newWorkflowStateMachine(WorkflowData data) {
450
    return new StateMachine<>(data)
1✔
451
        .add(NONE, START, STARTED, StateMachines::startWorkflow)
1✔
452
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::completeWorkflow)
1✔
453
        .add(STARTED, CONTINUE_AS_NEW, CONTINUED_AS_NEW, StateMachines::continueAsNewWorkflow)
1✔
454
        .add(STARTED, FAIL, FAILED, StateMachines::failWorkflow)
1✔
455
        .add(STARTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutWorkflow)
1✔
456
        .add(
1✔
457
            STARTED,
458
            REQUEST_CANCELLATION,
459
            CANCELLATION_REQUESTED,
460
            StateMachines::requestWorkflowCancellation)
461
        .add(STARTED, TERMINATE, TERMINATED, StateMachines::terminateWorkflow)
1✔
462
        .add(
1✔
463
            CANCELLATION_REQUESTED,
464
            REQUEST_CANCELLATION,
465
            CANCELLATION_REQUESTED,
466
            StateMachines::noop)
467
        .add(CANCELLATION_REQUESTED, COMPLETE, COMPLETED, StateMachines::completeWorkflow)
1✔
468
        .add(CANCELLATION_REQUESTED, CANCEL, CANCELED, StateMachines::cancelWorkflow)
1✔
469
        .add(CANCELLATION_REQUESTED, TERMINATE, TERMINATED, StateMachines::terminateWorkflow)
1✔
470
        .add(CANCELLATION_REQUESTED, FAIL, FAILED, StateMachines::failWorkflow)
1✔
471
        .add(CANCELLATION_REQUESTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutWorkflow);
1✔
472
  }
473

474
  static StateMachine<WorkflowTaskData> newWorkflowTaskStateMachine(
475
      TestWorkflowStore store, StartWorkflowExecutionRequest startRequest) {
476
    return new StateMachine<>(new WorkflowTaskData(store, startRequest))
1✔
477
        .add(NONE, INITIATE, INITIATED, StateMachines::scheduleWorkflowTask)
1✔
478
        // TODO(maxim): Uncomment once the server supports consistent query only workflow tasks
479
        //        .add(NONE, QUERY, INITIATED_QUERY_ONLY, StateMachines::scheduleQueryWorkflowTask)
480
        //        .add(INITIATED_QUERY_ONLY, QUERY, INITIATED_QUERY_ONLY,
481
        // StateMachines::queryWhileScheduled)
482
        //        .add(
483
        //            INITIATED_QUERY_ONLY,
484
        //            INITIATE,
485
        //            INITIATED,
486
        //            StateMachines::convertQueryWorkflowTaskToReal)
487
        //        .add(
488
        //            INITIATED_QUERY_ONLY,
489
        //            START,
490
        //            STARTED_QUERY_ONLY,
491
        //            StateMachines::startQueryOnlyWorkflowTask)
492
        //        .add(STARTED_QUERY_ONLY, INITIATE, STARTED_QUERY_ONLY,
493
        // StateMachines::needsWorkflowTask)
494
        //        .add(STARTED_QUERY_ONLY, QUERY, STARTED_QUERY_ONLY,
495
        // StateMachines::needsWorkflowTaskDueToQuery)
496
        //        .add(STARTED_QUERY_ONLY, FAIL, NONE, StateMachines::failQueryWorkflowTask)
497
        //        .add(STARTED_QUERY_ONLY, TIME_OUT, NONE, StateMachines::failQueryWorkflowTask)
498
        //        .add(STARTED_QUERY_ONLY, COMPLETE, NONE, StateMachines::completeQuery)
499
        .add(STARTED, QUERY, STARTED, StateMachines::bufferQuery)
1✔
500
        .add(STARTED, UPDATE_WORKFLOW_EXECUTION, STARTED, StateMachines::bufferUpdate)
1✔
501
        .add(INITIATED, INITIATE, INITIATED, StateMachines::noop)
1✔
502
        .add(INITIATED, QUERY, INITIATED, StateMachines::queryWhileScheduled)
1✔
503
        .add(INITIATED, UPDATE_WORKFLOW_EXECUTION, INITIATED, StateMachines::addUpdate)
1✔
504
        .add(INITIATED, START, STARTED, StateMachines::startWorkflowTask)
1✔
505
        .add(STARTED, COMPLETE, NONE, StateMachines::completeWorkflowTask)
1✔
506
        .add(STARTED, FAIL, NONE, StateMachines::failWorkflowTask)
1✔
507
        .add(STARTED, TIME_OUT, NONE, StateMachines::timeoutWorkflowTask)
1✔
508
        .add(STARTED, INITIATE, STARTED, StateMachines::needsWorkflowTask);
1✔
509
  }
510

511
  public static StateMachine<ActivityTaskData> newActivityStateMachine(
512
      TestWorkflowStore store, StartWorkflowExecutionRequest workflowStartedEvent) {
513
    return new StateMachine<>(new ActivityTaskData(store, workflowStartedEvent))
1✔
514
        .add(NONE, INITIATE, INITIATED, StateMachines::scheduleActivityTask)
1✔
515
        .add(INITIATED, START, STARTED, StateMachines::startActivityTask)
1✔
516
        .add(INITIATED, TIME_OUT, TIMED_OUT, StateMachines::timeoutActivityTask)
1✔
517
        .add(
1✔
518
            INITIATED,
519
            REQUEST_CANCELLATION,
520
            CANCELLATION_REQUESTED,
521
            StateMachines::requestActivityCancellation)
522
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::completeActivityTask)
1✔
523
        // Transitions to initiated in case of a retry
524
        .add(STARTED, FAIL, new State[] {FAILED, INITIATED}, StateMachines::failActivityTask)
1✔
525
        // Transitions to initiated in case of a retry
526
        .add(
1✔
527
            STARTED,
528
            TIME_OUT,
529
            new State[] {TIMED_OUT, INITIATED},
530
            StateMachines::timeoutActivityTask)
531
        .add(STARTED, UPDATE, STARTED, StateMachines::heartbeatActivityTask)
1✔
532
        .add(
1✔
533
            STARTED,
534
            REQUEST_CANCELLATION,
535
            CANCELLATION_REQUESTED,
536
            StateMachines::requestActivityCancellation)
537
        .add(
1✔
538
            CANCELLATION_REQUESTED, CANCEL, CANCELED, StateMachines::reportActivityTaskCancellation)
539
        .add(CANCELLATION_REQUESTED, COMPLETE, COMPLETED, StateMachines::completeActivityTask)
1✔
540
        .add(
1✔
541
            CANCELLATION_REQUESTED,
542
            UPDATE,
543
            CANCELLATION_REQUESTED,
544
            StateMachines::heartbeatActivityTask)
545
        .add(CANCELLATION_REQUESTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutActivityTask)
1✔
546
        .add(CANCELLATION_REQUESTED, FAIL, FAILED, StateMachines::failActivityTask);
1✔
547
  }
548

549
  public static StateMachine<ChildWorkflowData> newChildWorkflowStateMachine(
550
      TestWorkflowService service) {
551
    return new StateMachine<>(new ChildWorkflowData(service))
1✔
552
        .add(NONE, INITIATE, INITIATED, StateMachines::initiateChildWorkflow)
1✔
553
        .add(INITIATED, START, STARTED, StateMachines::childWorkflowStarted)
1✔
554
        .add(INITIATED, FAIL, FAILED, StateMachines::startChildWorkflowFailed)
1✔
555
        .add(INITIATED, TIME_OUT, TIMED_OUT, StateMachines::timeoutChildWorkflow)
1✔
556
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::childWorkflowCompleted)
1✔
557
        .add(STARTED, FAIL, FAILED, StateMachines::childWorkflowFailed)
1✔
558
        .add(STARTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutChildWorkflow)
1✔
559
        .add(STARTED, CANCEL, CANCELED, StateMachines::childWorkflowCanceled);
1✔
560
  }
561

562
  public static StateMachine<UpdateWorkflowExecutionData> newUpdateWorkflowExecution(
563
      String updateId,
564
      CompletableFuture<UpdateWorkflowExecutionResponse> acceptance,
565
      CompletableFuture<UpdateWorkflowExecutionResponse> complete) {
566
    return new StateMachine<>(new UpdateWorkflowExecutionData(updateId, acceptance, complete))
1✔
567
        .add(NONE, START, STARTED, StateMachines::acceptUpdate)
1✔
568
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::completeUpdate);
1✔
569
  }
570

571
  public static StateMachine<TimerData> newTimerStateMachine() {
572
    return new StateMachine<>(new TimerData())
1✔
573
        .add(NONE, START, STARTED, StateMachines::startTimer)
1✔
574
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::fireTimer)
1✔
575
        .add(STARTED, CANCEL, CANCELED, StateMachines::cancelTimer);
1✔
576
  }
577

578
  public static StateMachine<SignalExternalData> newSignalExternalStateMachine() {
579
    return new StateMachine<>(new SignalExternalData())
1✔
580
        .add(NONE, INITIATE, INITIATED, StateMachines::initiateExternalSignal)
1✔
581
        .add(INITIATED, FAIL, FAILED, StateMachines::failExternalSignal)
1✔
582
        .add(INITIATED, COMPLETE, COMPLETED, StateMachines::completeExternalSignal);
1✔
583
  }
584

585
  public static StateMachine<CancelExternalData> newCancelExternalStateMachine() {
586
    return new StateMachine<>(new CancelExternalData())
1✔
587
        .add(NONE, INITIATE, INITIATED, StateMachines::initiateExternalCancellation)
1✔
588
        .add(INITIATED, FAIL, FAILED, StateMachines::failExternalCancellation)
1✔
589
        .add(INITIATED, START, STARTED, StateMachines::reportExternalCancellationRequested);
1✔
590
  }
591

592
  /**
   * Transition callback that deliberately does nothing. It is registered for transitions that only
   * need to be accepted, such as a repeated cancellation request.
   */
  private static <T, A> void noop(RequestContext ctx, T data, A ignored, long notUsed) {
    // Intentionally empty.
  }
1✔
593

594
  private static void timeoutChildWorkflow(
595
      RequestContext ctx, ChildWorkflowData data, RetryState retryState, long notUsed) {
596
    StartChildWorkflowExecutionInitiatedEventAttributes ie = data.initiatedEvent;
1✔
597
    ChildWorkflowExecutionTimedOutEventAttributes a =
598
        ChildWorkflowExecutionTimedOutEventAttributes.newBuilder()
1✔
599
            .setNamespace(ie.getNamespace())
1✔
600
            .setStartedEventId(data.startedEventId)
1✔
601
            .setWorkflowExecution(data.execution)
1✔
602
            .setWorkflowType(ie.getWorkflowType())
1✔
603
            .setRetryState(retryState)
1✔
604
            .setInitiatedEventId(data.initiatedEventId)
1✔
605
            .build();
1✔
606
    HistoryEvent event =
607
        HistoryEvent.newBuilder()
1✔
608
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TIMED_OUT)
1✔
609
            .setChildWorkflowExecutionTimedOutEventAttributes(a)
1✔
610
            .build();
1✔
611
    ctx.addEvent(event);
1✔
612
  }
1✔
613

614
  private static void startChildWorkflowFailed(
615
      RequestContext ctx,
616
      ChildWorkflowData data,
617
      StartChildWorkflowExecutionFailedEventAttributes a,
618
      long notUsed) {
619
    StartChildWorkflowExecutionFailedEventAttributes.Builder updatedAttr =
1✔
620
        a.toBuilder()
1✔
621
            .setInitiatedEventId(data.initiatedEventId)
1✔
622
            .setWorkflowType(data.initiatedEvent.getWorkflowType())
1✔
623
            .setWorkflowId(data.initiatedEvent.getWorkflowId());
1✔
624
    if (!data.initiatedEvent.getNamespace().isEmpty()) {
1✔
625
      updatedAttr.setNamespace(data.initiatedEvent.getNamespace());
1✔
626
    }
627
    HistoryEvent event =
628
        HistoryEvent.newBuilder()
1✔
629
            .setEventType(EventType.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_FAILED)
1✔
630
            .setStartChildWorkflowExecutionFailedEventAttributes(updatedAttr.build())
1✔
631
            .build();
1✔
632
    ctx.addEvent(event);
1✔
633
  }
1✔
634

635
  private static void childWorkflowStarted(
636
      RequestContext ctx,
637
      ChildWorkflowData data,
638
      ChildWorkflowExecutionStartedEventAttributes a,
639
      long notUsed) {
640
    ChildWorkflowExecutionStartedEventAttributes updatedAttr =
1✔
641
        a.toBuilder().setInitiatedEventId(data.initiatedEventId).build();
1✔
642
    HistoryEvent event =
643
        HistoryEvent.newBuilder()
1✔
644
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_STARTED)
1✔
645
            .setChildWorkflowExecutionStartedEventAttributes(updatedAttr)
1✔
646
            .build();
1✔
647
    long startedEventId = ctx.addEvent(event);
1✔
648
    ctx.onCommit(
1✔
649
        (historySize) -> {
650
          data.startedEventId = startedEventId;
1✔
651
          data.execution = updatedAttr.getWorkflowExecution();
1✔
652
        });
1✔
653
  }
1✔
654

655
  private static void childWorkflowCompleted(
656
      RequestContext ctx,
657
      ChildWorkflowData data,
658
      ChildWorkflowExecutionCompletedEventAttributes a,
659
      long notUsed) {
660
    ChildWorkflowExecutionCompletedEventAttributes updatedAttr =
1✔
661
        a.toBuilder()
1✔
662
            .setInitiatedEventId(data.initiatedEventId)
1✔
663
            .setStartedEventId(data.startedEventId)
1✔
664
            .build();
1✔
665
    HistoryEvent event =
666
        HistoryEvent.newBuilder()
1✔
667
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_COMPLETED)
1✔
668
            .setChildWorkflowExecutionCompletedEventAttributes(updatedAttr)
1✔
669
            .build();
1✔
670
    ctx.addEvent(event);
1✔
671
  }
1✔
672

673
  private static void childWorkflowFailed(
674
      RequestContext ctx,
675
      ChildWorkflowData data,
676
      ChildWorkflowExecutionFailedEventAttributes a,
677
      long notUsed) {
678
    ChildWorkflowExecutionFailedEventAttributes.Builder updatedAttr =
1✔
679
        a.toBuilder()
1✔
680
            .setInitiatedEventId(data.initiatedEventId)
1✔
681
            .setStartedEventId(data.startedEventId)
1✔
682
            .setWorkflowExecution(data.execution)
1✔
683
            .setWorkflowType(data.initiatedEvent.getWorkflowType());
1✔
684
    if (!data.initiatedEvent.getNamespace().isEmpty()) {
1✔
685
      updatedAttr.setNamespace(data.initiatedEvent.getNamespace());
1✔
686
    }
687
    HistoryEvent event =
688
        HistoryEvent.newBuilder()
1✔
689
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_FAILED)
1✔
690
            .setChildWorkflowExecutionFailedEventAttributes(updatedAttr.build())
1✔
691
            .build();
1✔
692
    ctx.addEvent(event);
1✔
693
  }
1✔
694

695
  private static void childWorkflowCanceled(
696
      RequestContext ctx,
697
      ChildWorkflowData data,
698
      ChildWorkflowExecutionCanceledEventAttributes a,
699
      long notUsed) {
700
    ChildWorkflowExecutionCanceledEventAttributes updatedAttr =
1✔
701
        a.toBuilder()
1✔
702
            .setInitiatedEventId(data.initiatedEventId)
1✔
703
            .setStartedEventId(data.startedEventId)
1✔
704
            .build();
1✔
705
    HistoryEvent event =
706
        HistoryEvent.newBuilder()
1✔
707
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_CANCELED)
1✔
708
            .setChildWorkflowExecutionCanceledEventAttributes(updatedAttr)
1✔
709
            .build();
1✔
710
    ctx.addEvent(event);
1✔
711
  }
1✔
712

713
  private static void initiateChildWorkflow(
714
      RequestContext ctx,
715
      ChildWorkflowData data,
716
      StartChildWorkflowExecutionCommandAttributes d,
717
      long workflowTaskCompletedEventId) {
718
    StartChildWorkflowExecutionInitiatedEventAttributes.Builder a =
719
        StartChildWorkflowExecutionInitiatedEventAttributes.newBuilder()
1✔
720
            .setControl(d.getControl())
1✔
721
            .setInput(d.getInput())
1✔
722
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1✔
723
            .setNamespace(d.getNamespace().isEmpty() ? ctx.getNamespace() : d.getNamespace())
1✔
724
            .setWorkflowExecutionTimeout(d.getWorkflowExecutionTimeout())
1✔
725
            .setWorkflowRunTimeout(d.getWorkflowRunTimeout())
1✔
726
            .setWorkflowTaskTimeout(d.getWorkflowTaskTimeout())
1✔
727
            .setTaskQueue(d.getTaskQueue())
1✔
728
            .setWorkflowId(d.getWorkflowId())
1✔
729
            .setWorkflowIdReusePolicy(d.getWorkflowIdReusePolicy())
1✔
730
            .setWorkflowType(d.getWorkflowType())
1✔
731
            .setCronSchedule(d.getCronSchedule())
1✔
732
            .setParentClosePolicy(d.getParentClosePolicy());
1✔
733
    if (d.hasHeader()) {
1✔
734
      a.setHeader(d.getHeader());
1✔
735
    }
736
    if (d.hasMemo()) {
1✔
737
      a.setMemo(d.getMemo());
1✔
738
    }
739
    if (d.hasRetryPolicy()) {
1✔
740
      a.setRetryPolicy(d.getRetryPolicy());
1✔
741
    }
742
    HistoryEvent event =
743
        HistoryEvent.newBuilder()
1✔
744
            .setEventType(EventType.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED)
1✔
745
            .setStartChildWorkflowExecutionInitiatedEventAttributes(a)
1✔
746
            .build();
1✔
747
    long initiatedEventId = ctx.addEvent(event);
1✔
748
    ctx.onCommit(
1✔
749
        (historySize) -> {
750
          data.initiatedEventId = initiatedEventId;
1✔
751
          data.initiatedEvent = a.build();
1✔
752
          StartWorkflowExecutionRequest.Builder startChild =
753
              StartWorkflowExecutionRequest.newBuilder()
1✔
754
                  .setRequestId(UUID.randomUUID().toString())
1✔
755
                  .setNamespace(d.getNamespace().isEmpty() ? ctx.getNamespace() : d.getNamespace())
1✔
756
                  .setWorkflowExecutionTimeout(d.getWorkflowExecutionTimeout())
1✔
757
                  .setWorkflowRunTimeout(d.getWorkflowRunTimeout())
1✔
758
                  .setWorkflowTaskTimeout(d.getWorkflowTaskTimeout())
1✔
759
                  .setTaskQueue(d.getTaskQueue())
1✔
760
                  .setWorkflowId(d.getWorkflowId())
1✔
761
                  .setWorkflowIdReusePolicy(d.getWorkflowIdReusePolicy())
1✔
762
                  .setWorkflowType(d.getWorkflowType())
1✔
763
                  .setCronSchedule(d.getCronSchedule());
1✔
764
          if (d.hasHeader()) {
1✔
765
            startChild.setHeader(d.getHeader());
1✔
766
          }
767
          if (d.hasSearchAttributes()) {
1✔
768
            startChild.setSearchAttributes(d.getSearchAttributes());
1✔
769
          }
770
          if (d.hasMemo()) {
1✔
771
            startChild.setMemo(d.getMemo());
1✔
772
          }
773
          if (d.hasRetryPolicy()) {
1✔
774
            startChild.setRetryPolicy(d.getRetryPolicy());
1✔
775
          }
776
          if (d.hasInput()) {
1✔
777
            startChild.setInput(d.getInput());
1✔
778
          }
779
          addStartChildTask(ctx, data, initiatedEventId, startChild.build());
1✔
780
        });
1✔
781
  }
1✔
782

783
  private static void addStartChildTask(
784
      RequestContext ctx,
785
      ChildWorkflowData data,
786
      long initiatedEventId,
787
      StartWorkflowExecutionRequest startChild) {
788
    ForkJoinPool.commonPool()
1✔
789
        .execute(
1✔
790
            () -> {
791
              try {
792
                data.service.startWorkflowExecutionImpl(
1✔
793
                    startChild,
794
                    java.time.Duration.ZERO,
795
                    Optional.of(ctx.getWorkflowMutableState()),
1✔
796
                    OptionalLong.of(data.initiatedEventId),
1✔
797
                    null);
798
              } catch (StatusRuntimeException e) {
1✔
799
                if (e.getStatus().getCode() == Status.Code.ALREADY_EXISTS) {
1✔
800
                  StartChildWorkflowExecutionFailedEventAttributes failRequest =
801
                      StartChildWorkflowExecutionFailedEventAttributes.newBuilder()
1✔
802
                          .setInitiatedEventId(initiatedEventId)
1✔
803
                          .setCause(
1✔
804
                              StartChildWorkflowExecutionFailedCause
805
                                  .START_CHILD_WORKFLOW_EXECUTION_FAILED_CAUSE_WORKFLOW_ALREADY_EXISTS)
806
                          .build();
1✔
807
                  try {
808
                    ctx.getWorkflowMutableState()
1✔
809
                        .failStartChildWorkflow(data.initiatedEvent.getWorkflowId(), failRequest);
1✔
810
                  } catch (Throwable ee) {
×
811
                    log.error("Unexpected failure inserting failStart for a child workflow", ee);
×
812
                  }
1✔
813
                } else {
1✔
814
                  log.error("Unexpected failure starting a child workflow", e);
×
815
                }
816
              } catch (Exception e) {
×
817
                log.error("Unexpected failure starting a child workflow", e);
×
818
              }
1✔
819
            });
1✔
820
  }
1✔
821

822
  private static void startWorkflow(
823
      RequestContext ctx, WorkflowData data, StartWorkflowExecutionRequest request, long notUsed) {
824
    if (Durations.compare(request.getWorkflowExecutionTimeout(), Durations.ZERO) < 0) {
1✔
825
      throw Status.INVALID_ARGUMENT
×
826
          .withDescription("negative workflowExecution timeout")
×
827
          .asRuntimeException();
×
828
    }
829
    if (Durations.compare(request.getWorkflowRunTimeout(), Durations.ZERO) < 0) {
1✔
830
      throw Status.INVALID_ARGUMENT
×
831
          .withDescription("negative workflowRun timeout")
×
832
          .asRuntimeException();
×
833
    }
834
    if (Durations.compare(request.getWorkflowTaskTimeout(), Durations.ZERO) < 0) {
1✔
835
      throw Status.INVALID_ARGUMENT
×
836
          .withDescription("negative workflowTaskTimeoutSeconds")
×
837
          .asRuntimeException();
×
838
    }
839

840
    WorkflowExecutionStartedEventAttributes.Builder a =
841
        WorkflowExecutionStartedEventAttributes.newBuilder()
1✔
842
            .setWorkflowType(request.getWorkflowType())
1✔
843
            .setWorkflowRunTimeout(request.getWorkflowRunTimeout())
1✔
844
            .setWorkflowTaskTimeout(request.getWorkflowTaskTimeout())
1✔
845
            .setWorkflowExecutionTimeout(request.getWorkflowExecutionTimeout())
1✔
846
            .setIdentity(request.getIdentity())
1✔
847
            .setInput(request.getInput())
1✔
848
            .setTaskQueue(request.getTaskQueue())
1✔
849
            .setAttempt(1);
1✔
850
    data.retryState.ifPresent(
1✔
851
        testServiceRetryState -> a.setAttempt(testServiceRetryState.getAttempt()));
1✔
852
    a.setFirstExecutionRunId(data.firstExecutionRunId);
1✔
853
    a.setOriginalExecutionRunId(data.originalExecutionRunId);
1✔
854
    data.continuedExecutionRunId.ifPresent(a::setContinuedExecutionRunId);
1✔
855
    if (data.lastCompletionResult != null) {
1✔
856
      a.setLastCompletionResult(data.lastCompletionResult);
1✔
857
    }
858
    data.lastFailure.ifPresent(a::setContinuedFailure);
1✔
859
    if (request.hasMemo()) {
1✔
860
      a.setMemo(request.getMemo());
1✔
861
    }
862
    if (request.hasSearchAttributes()) {
1✔
863
      a.setSearchAttributes((request.getSearchAttributes()));
1✔
864
    }
865
    if (request.hasHeader()) {
1✔
866
      a.setHeader(request.getHeader());
1✔
867
    }
868
    String cronSchedule = request.getCronSchedule();
1✔
869
    if (!cronSchedule.trim().isEmpty()) {
1✔
870
      try {
871
        CronUtils.parseCron(cronSchedule);
1✔
872
        a.setCronSchedule(cronSchedule);
1✔
873
      } catch (Exception e) {
×
874
        throw Status.INVALID_ARGUMENT
×
875
            .withDescription("Invalid cron expression \"" + cronSchedule + "\": " + e.getMessage())
×
876
            .withCause(e)
×
877
            .asRuntimeException();
×
878
      }
1✔
879
    }
880
    Optional<TestWorkflowMutableState> parent = ctx.getWorkflowMutableState().getParent();
1✔
881
    if (parent.isPresent()) {
1✔
882
      ExecutionId parentExecutionId = parent.get().getExecutionId();
1✔
883
      a.setParentWorkflowNamespace(parentExecutionId.getNamespace());
1✔
884
      a.setParentWorkflowExecution(parentExecutionId.getExecution());
1✔
885
    }
886
    HistoryEvent event =
887
        HistoryEvent.newBuilder()
1✔
888
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED)
1✔
889
            .setWorkflowExecutionStartedEventAttributes(a)
1✔
890
            .build();
1✔
891
    ctx.addEvent(event);
1✔
892
  }
1✔
893

894
  private static void completeWorkflow(
895
      RequestContext ctx,
896
      WorkflowData data,
897
      CompleteWorkflowExecutionCommandAttributes d,
898
      long workflowTaskCompletedEventId) {
899
    WorkflowExecutionCompletedEventAttributes.Builder a =
900
        WorkflowExecutionCompletedEventAttributes.newBuilder()
1✔
901
            .setResult(d.getResult())
1✔
902
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
903
    HistoryEvent event =
904
        HistoryEvent.newBuilder()
1✔
905
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED)
1✔
906
            .setWorkflowExecutionCompletedEventAttributes(a)
1✔
907
            .build();
1✔
908
    ctx.addEvent(event);
1✔
909
  }
1✔
910

911
  private static void continueAsNewWorkflow(
912
      RequestContext ctx,
913
      WorkflowData data,
914
      ContinueAsNewWorkflowExecutionCommandAttributes d,
915
      long workflowTaskCompletedEventId) {
916
    StartWorkflowExecutionRequest sr = ctx.getWorkflowMutableState().getStartRequest();
1✔
917
    WorkflowExecutionContinuedAsNewEventAttributes.Builder a =
918
        WorkflowExecutionContinuedAsNewEventAttributes.newBuilder();
1✔
919
    a.setInput(d.getInput());
1✔
920
    if (d.hasHeader()) {
1✔
921
      a.setHeader(d.getHeader());
1✔
922
    }
923
    if (Durations.compare(d.getWorkflowRunTimeout(), Durations.ZERO) > 0) {
1✔
924
      a.setWorkflowRunTimeout(d.getWorkflowRunTimeout());
1✔
925
    } else {
926
      a.setWorkflowRunTimeout(sr.getWorkflowRunTimeout());
1✔
927
    }
928
    if (d.hasTaskQueue()) {
1✔
929
      a.setTaskQueue(d.getTaskQueue());
1✔
930
    } else {
931
      a.setTaskQueue(sr.getTaskQueue());
1✔
932
    }
933
    if (d.hasWorkflowType()) {
1✔
934
      a.setWorkflowType(d.getWorkflowType());
1✔
935
    } else {
936
      a.setWorkflowType(sr.getWorkflowType());
1✔
937
    }
938
    if (Durations.compare(d.getWorkflowTaskTimeout(), Durations.ZERO) > 0) {
1✔
939
      a.setWorkflowTaskTimeout(d.getWorkflowTaskTimeout());
1✔
940
    } else {
941
      a.setWorkflowTaskTimeout(sr.getWorkflowTaskTimeout());
1✔
942
    }
943
    a.setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
944
    a.setBackoffStartInterval(d.getBackoffStartInterval());
1✔
945
    if (d.hasLastCompletionResult()) {
1✔
946
      a.setLastCompletionResult(d.getLastCompletionResult());
1✔
947
    }
948
    if (d.hasFailure()) {
1✔
949
      a.setFailure(d.getFailure());
1✔
950
    }
951
    a.setNewExecutionRunId(UUID.randomUUID().toString());
1✔
952
    HistoryEvent event =
953
        HistoryEvent.newBuilder()
1✔
954
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW)
1✔
955
            .setWorkflowExecutionContinuedAsNewEventAttributes(a)
1✔
956
            .build();
1✔
957
    ctx.addEvent(event);
1✔
958
  }
1✔
959

960
  private static void failWorkflow(
961
      RequestContext ctx,
962
      WorkflowData data,
963
      FailWorkflowExecutionCommandAttributes d,
964
      long workflowTaskCompletedEventId) {
965
    WorkflowExecutionFailedEventAttributes.Builder a =
966
        WorkflowExecutionFailedEventAttributes.newBuilder()
1✔
967
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
968
    if (d.hasFailure()) {
1✔
969
      a.setFailure(d.getFailure());
1✔
970
    }
971
    HistoryEvent event =
972
        HistoryEvent.newBuilder()
1✔
973
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_FAILED)
1✔
974
            .setWorkflowExecutionFailedEventAttributes(a)
1✔
975
            .build();
1✔
976
    ctx.addEvent(event);
1✔
977
  }
1✔
978

979
  private static void timeoutWorkflow(
980
      RequestContext ctx, WorkflowData data, RetryState retryState, long notUsed) {
981
    WorkflowExecutionTimedOutEventAttributes.Builder a =
982
        WorkflowExecutionTimedOutEventAttributes.newBuilder().setRetryState(retryState);
1✔
983
    HistoryEvent event =
984
        HistoryEvent.newBuilder()
1✔
985
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT)
1✔
986
            .setWorkflowExecutionTimedOutEventAttributes(a)
1✔
987
            .build();
1✔
988
    ctx.addEvent(event);
1✔
989
  }
1✔
990

991
  private static void cancelWorkflow(
992
      RequestContext ctx,
993
      WorkflowData data,
994
      CancelWorkflowExecutionCommandAttributes d,
995
      long workflowTaskCompletedEventId) {
996
    WorkflowExecutionCanceledEventAttributes.Builder a =
997
        WorkflowExecutionCanceledEventAttributes.newBuilder()
1✔
998
            .setDetails(d.getDetails())
1✔
999
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
1000
    HistoryEvent event =
1001
        HistoryEvent.newBuilder()
1✔
1002
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED)
1✔
1003
            .setWorkflowExecutionCanceledEventAttributes(a)
1✔
1004
            .build();
1✔
1005
    ctx.addEvent(event);
1✔
1006
  }
1✔
1007

1008
  private static void terminateWorkflow(
1009
      RequestContext ctx,
1010
      WorkflowData data,
1011
      TerminateWorkflowExecutionRequest d,
1012
      long workflowTaskCompletedEventId) {
1013
    WorkflowExecutionTerminatedEventAttributes.Builder a =
1014
        WorkflowExecutionTerminatedEventAttributes.newBuilder()
1✔
1015
            .setDetails(d.getDetails())
1✔
1016
            .setIdentity(d.getIdentity())
1✔
1017
            .setReason(d.getReason());
1✔
1018
    HistoryEvent event =
1019
        HistoryEvent.newBuilder()
1✔
1020
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED)
1✔
1021
            .setWorkflowExecutionTerminatedEventAttributes(a)
1✔
1022
            .build();
1✔
1023
    ctx.addEvent(event);
1✔
1024
  }
1✔
1025

1026
  private static void requestWorkflowCancellation(
1027
      RequestContext ctx,
1028
      WorkflowData data,
1029
      RequestCancelWorkflowExecutionRequest cancelRequest,
1030
      long notUsed) {
1031
    WorkflowExecutionCancelRequestedEventAttributes.Builder a =
1032
        WorkflowExecutionCancelRequestedEventAttributes.newBuilder()
1✔
1033
            .setIdentity(cancelRequest.getIdentity());
1✔
1034
    HistoryEvent cancelRequested =
1035
        HistoryEvent.newBuilder()
1✔
1036
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED)
1✔
1037
            .setWorkflowExecutionCancelRequestedEventAttributes(a)
1✔
1038
            .build();
1✔
1039
    ctx.addEvent(cancelRequested);
1✔
1040
  }
1✔
1041

1042
  private static void scheduleActivityTask(
1043
      RequestContext ctx,
1044
      ActivityTaskData data,
1045
      ScheduleActivityTaskCommandAttributes d,
1046
      long workflowTaskCompletedEventId) {
1047
    RetryPolicy retryPolicy = ensureDefaultFieldsForActivityRetryPolicy(d.getRetryPolicy());
1✔
1048
    Duration expirationInterval = d.getScheduleToCloseTimeout();
1✔
1049
    Timestamp expirationTime = Timestamps.add(data.store.currentTime(), expirationInterval);
1✔
1050
    TestServiceRetryState retryState = new TestServiceRetryState(retryPolicy, expirationTime);
1✔
1051

1052
    ActivityTaskScheduledEventAttributes.Builder a =
1053
        ActivityTaskScheduledEventAttributes.newBuilder()
1✔
1054
            .setInput(d.getInput())
1✔
1055
            .setActivityId(d.getActivityId())
1✔
1056
            .setActivityType(d.getActivityType())
1✔
1057
            .setHeartbeatTimeout(d.getHeartbeatTimeout())
1✔
1058
            .setRetryPolicy(retryPolicy)
1✔
1059
            .setScheduleToCloseTimeout(d.getScheduleToCloseTimeout())
1✔
1060
            .setScheduleToStartTimeout(d.getScheduleToStartTimeout())
1✔
1061
            .setStartToCloseTimeout(d.getStartToCloseTimeout())
1✔
1062
            .setTaskQueue(d.getTaskQueue())
1✔
1063
            .setHeader(d.getHeader())
1✔
1064
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
1065

1066
    // Cannot set it in onCommit as it is used in the processScheduleActivityTask
1067
    data.scheduledEvent = a.build();
1✔
1068
    HistoryEvent event =
1069
        HistoryEvent.newBuilder()
1✔
1070
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED)
1✔
1071
            .setActivityTaskScheduledEventAttributes(a)
1✔
1072
            .build();
1✔
1073
    long scheduledEventId = ctx.addEvent(event);
1✔
1074

1075
    PollActivityTaskQueueResponse.Builder taskResponse =
1076
        PollActivityTaskQueueResponse.newBuilder()
1✔
1077
            .setWorkflowNamespace(ctx.getNamespace())
1✔
1078
            .setWorkflowType(data.startWorkflowExecutionRequest.getWorkflowType())
1✔
1079
            .setActivityType(d.getActivityType())
1✔
1080
            .setWorkflowExecution(ctx.getExecution())
1✔
1081
            .setActivityId(d.getActivityId())
1✔
1082
            .setInput(d.getInput())
1✔
1083
            .setHeartbeatTimeout(d.getHeartbeatTimeout())
1✔
1084
            .setScheduleToCloseTimeout(d.getScheduleToCloseTimeout())
1✔
1085
            .setStartToCloseTimeout(d.getStartToCloseTimeout())
1✔
1086
            .setScheduledTime(ctx.currentTime())
1✔
1087
            .setCurrentAttemptScheduledTime(ctx.currentTime())
1✔
1088
            .setHeader(d.getHeader())
1✔
1089
            .setAttempt(1);
1✔
1090

1091
    TaskQueueId taskQueueId = new TaskQueueId(ctx.getNamespace(), d.getTaskQueue().getName());
1✔
1092
    ActivityTask activityTask = new ActivityTask(taskQueueId, taskResponse);
1✔
1093
    ctx.addActivityTask(activityTask);
1✔
1094
    ctx.onCommit(
1✔
1095
        (historySize) -> {
1096
          data.scheduledEventId = scheduledEventId;
1✔
1097
          data.activityTask = activityTask;
1✔
1098
          data.retryState = retryState;
1✔
1099
        });
1✔
1100
  }
1✔
1101

1102
  private static void requestActivityCancellation(
1103
      RequestContext ctx,
1104
      ActivityTaskData data,
1105
      RequestCancelActivityTaskCommandAttributes d,
1106
      long workflowTaskCompletedEventId) {
1107
    ActivityTaskCancelRequestedEventAttributes.Builder a =
1108
        ActivityTaskCancelRequestedEventAttributes.newBuilder()
1✔
1109
            .setScheduledEventId(d.getScheduledEventId())
1✔
1110
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
1111
    HistoryEvent event =
1112
        HistoryEvent.newBuilder()
1✔
1113
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_CANCEL_REQUESTED)
1✔
1114
            .setActivityTaskCancelRequestedEventAttributes(a)
1✔
1115
            .build();
1✔
1116
    ctx.addEvent(event);
1✔
1117
  }
1✔
1118

1119
  private static void scheduleWorkflowTask(
1120
      RequestContext ctx, WorkflowTaskData data, Object notUsedRequest, long notUsed) {
1121
    StartWorkflowExecutionRequest request = data.startRequest;
1✔
1122
    long scheduledEventId;
1123
    TaskQueue taskQueue = request.getTaskQueue();
1✔
1124
    WorkflowTaskScheduledEventAttributes a =
1125
        WorkflowTaskScheduledEventAttributes.newBuilder()
1✔
1126
            .setStartToCloseTimeout(request.getWorkflowTaskTimeout())
1✔
1127
            .setTaskQueue(taskQueue)
1✔
1128
            .setAttempt(++data.attempt)
1✔
1129
            .build();
1✔
1130
    HistoryEvent event =
1131
        HistoryEvent.newBuilder()
1✔
1132
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED)
1✔
1133
            .setWorkflowTaskScheduledEventAttributes(a)
1✔
1134
            .build();
1✔
1135
    scheduledEventId = ctx.addEvent(event);
1✔
1136
    PollWorkflowTaskQueueResponse.Builder workflowTaskResponse =
1137
        PollWorkflowTaskQueueResponse.newBuilder();
1✔
1138
    workflowTaskResponse.setWorkflowExecution(ctx.getExecution());
1✔
1139
    workflowTaskResponse.setWorkflowType(request.getWorkflowType());
1✔
1140
    workflowTaskResponse.setAttempt(data.attempt);
1✔
1141
    workflowTaskResponse.setScheduledTime(ctx.currentTime());
1✔
1142
    workflowTaskResponse.setWorkflowExecutionTaskQueue(taskQueue);
1✔
1143
    TaskQueueId taskQueueId = new TaskQueueId(ctx.getNamespace(), taskQueue.getName());
1✔
1144
    WorkflowTask workflowTask = new WorkflowTask(taskQueueId, workflowTaskResponse);
1✔
1145
    ctx.setWorkflowTaskForMatching(workflowTask);
1✔
1146
    ctx.onCommit(
1✔
1147
        (historySize) -> {
1148
          data.scheduledEventId = scheduledEventId;
1✔
1149
          data.workflowTask = workflowTaskResponse;
1✔
1150
          // Move buffered update request to new workflow task
1151
          data.updateRequest.putAll(data.updateRequestBuffer);
1✔
1152
          data.updateRequestBuffer.clear();
1✔
1153
        });
1✔
1154
  }
1✔
1155

1156
  private static void convertQueryWorkflowTaskToReal(
1157
      RequestContext ctx, WorkflowTaskData data, Object notUsedRequest, long notUsed) {
1158
    StartWorkflowExecutionRequest request = data.startRequest;
×
1159
    WorkflowTaskScheduledEventAttributes a =
1160
        WorkflowTaskScheduledEventAttributes.newBuilder()
×
1161
            .setStartToCloseTimeout(request.getWorkflowTaskTimeout())
×
1162
            .setTaskQueue(request.getTaskQueue())
×
1163
            .setAttempt(data.attempt)
×
1164
            .build();
×
1165
    HistoryEvent event =
1166
        HistoryEvent.newBuilder()
×
1167
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED)
×
1168
            .setWorkflowTaskScheduledEventAttributes(a)
×
1169
            .build();
×
1170
    long scheduledEventId = ctx.addEvent(event);
×
1171
    ctx.onCommit((historySize) -> data.scheduledEventId = scheduledEventId);
×
1172
  }
×
1173

1174
  private static void scheduleQueryWorkflowTask(
1175
      RequestContext ctx,
1176
      WorkflowTaskData data,
1177
      TestWorkflowMutableStateImpl.ConsistentQuery query,
1178
      long notUsed) {
1179
    ctx.lockTimer("scheduleQueryWorkflowTask");
×
1180
    StartWorkflowExecutionRequest request = data.startRequest;
×
1181
    PollWorkflowTaskQueueResponse.Builder workflowTaskResponse =
1182
        PollWorkflowTaskQueueResponse.newBuilder();
×
1183
    StickyExecutionAttributes stickyAttributes =
×
1184
        ctx.getWorkflowMutableState().getStickyExecutionAttributes();
×
1185
    String taskQueue =
1186
        stickyAttributes == null
×
1187
            ? request.getTaskQueue().getName()
×
1188
            : stickyAttributes.getWorkerTaskQueue().getName();
×
1189
    workflowTaskResponse.setWorkflowExecution(ctx.getExecution());
×
1190
    workflowTaskResponse.setWorkflowType(request.getWorkflowType());
×
1191
    workflowTaskResponse.setAttempt(++data.attempt);
×
1192
    TaskQueueId taskQueueId = new TaskQueueId(ctx.getNamespace(), taskQueue);
×
1193
    WorkflowTask workflowTask = new WorkflowTask(taskQueueId, workflowTaskResponse);
×
1194
    ctx.setWorkflowTaskForMatching(workflowTask);
×
1195
    ctx.onCommit(
×
1196
        (historySize) -> {
1197
          if (data.lastSuccessfulStartedEventId > 0) {
×
1198
            workflowTaskResponse.setPreviousStartedEventId(data.lastSuccessfulStartedEventId);
×
1199
          }
1200
          data.scheduledEventId = NO_EVENT_ID;
×
1201
          data.workflowTask = workflowTaskResponse;
×
1202
          if (query != null) {
×
1203
            data.consistentQueryRequests.put(query.getKey(), query);
×
1204
          }
1205
        });
×
1206
  }
×
1207

1208
  private static void queryWhileScheduled(
1209
      RequestContext ctx,
1210
      WorkflowTaskData data,
1211
      TestWorkflowMutableStateImpl.ConsistentQuery query,
1212
      long notUsed) {
1213
    data.consistentQueryRequests.put(query.getKey(), query);
1✔
1214
  }
1✔
1215

1216
  private static void bufferQuery(
1217
      RequestContext ctx,
1218
      WorkflowTaskData data,
1219
      TestWorkflowMutableStateImpl.ConsistentQuery query,
1220
      long notUsed) {
1221
    data.queryBuffer.put(query.getKey(), query);
1✔
1222
  }
1✔
1223

1224
  private static void bufferUpdate(
1225
      RequestContext ctx, WorkflowTaskData data, UpdateWorkflowExecution update, long notUsed) {
1226
    if (data.getUpdateRequest(update.getId()).isPresent()) {
1✔
1227
      throw Status.INTERNAL
×
1228
          .withDescription("Update ID already exists: " + update.getId())
×
1229
          .asRuntimeException();
×
1230
    }
1231
    data.updateRequestBuffer.put(update.getId(), update);
1✔
1232
  }
1✔
1233

1234
  private static void addUpdate(
1235
      RequestContext ctx, WorkflowTaskData data, UpdateWorkflowExecution update, long notUsed) {
1236
    if (data.getUpdateRequest(update.getId()).isPresent()) {
1✔
1237
      throw Status.INTERNAL
×
1238
          .withDescription("Update ID already exists: " + update.getId())
×
1239
          .asRuntimeException();
×
1240
    }
1241
    data.updateRequest.put(update.getId(), update);
1✔
1242
  }
1✔
1243

1244
  private static void startWorkflowTask(
1245
      RequestContext ctx,
1246
      WorkflowTaskData data,
1247
      PollWorkflowTaskQueueRequest request,
1248
      long notUsed) {
1249
    WorkflowTaskStartedEventAttributes a =
1250
        WorkflowTaskStartedEventAttributes.newBuilder()
1✔
1251
            .setIdentity(request.getIdentity())
1✔
1252
            .setScheduledEventId(data.scheduledEventId)
1✔
1253
            .build();
1✔
1254
    HistoryEvent event =
1255
        HistoryEvent.newBuilder()
1✔
1256
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_STARTED)
1✔
1257
            .setWorkflowTaskStartedEventAttributes(a)
1✔
1258
            .build();
1✔
1259
    long startedEventId = ctx.addEvent(event);
1✔
1260
    startWorkflowTaskImpl(ctx, data, request, startedEventId, false);
1✔
1261
  }
1✔
1262

1263
  private static void startQueryOnlyWorkflowTask(
1264
      RequestContext ctx,
1265
      WorkflowTaskData data,
1266
      PollWorkflowTaskQueueRequest request,
1267
      long notUsed) {
1268
    startWorkflowTaskImpl(ctx, data, request, NO_EVENT_ID, true);
×
1269
  }
×
1270

1271
  private static void startWorkflowTaskImpl(
1272
      RequestContext ctx,
1273
      WorkflowTaskData data,
1274
      PollWorkflowTaskQueueRequest request,
1275
      long startedEventId,
1276
      boolean queryOnly) {
1277
    ctx.onCommit(
1✔
1278
        (historySize) -> {
1279
          PollWorkflowTaskQueueResponse.Builder task = data.workflowTask;
1✔
1280
          task.setStartedEventId(data.scheduledEventId + 1);
1✔
1281
          WorkflowTaskToken taskToken = new WorkflowTaskToken(ctx.getExecutionId(), historySize);
1✔
1282
          task.setTaskToken(taskToken.toBytes());
1✔
1283
          GetWorkflowExecutionHistoryRequest getRequest =
1284
              GetWorkflowExecutionHistoryRequest.newBuilder()
1✔
1285
                  .setNamespace(request.getNamespace())
1✔
1286
                  .setExecution(ctx.getExecution())
1✔
1287
                  .build();
1✔
1288
          List<HistoryEvent> events;
1289
          events =
1✔
1290
              data.store
1291
                  .getWorkflowExecutionHistory(ctx.getExecutionId(), getRequest, null)
1✔
1292
                  .getHistory()
1✔
1293
                  .getEventsList();
1✔
1294
          long lastEventId = events.get(events.size() - 1).getEventId();
1✔
1295
          if (ctx.getWorkflowMutableState().getStickyExecutionAttributes() != null) {
1✔
1296
            events = events.subList((int) data.lastSuccessfulStartedEventId, events.size());
1✔
1297
          }
1298
          if (queryOnly && !data.workflowCompleted) {
1✔
1299
            events = new ArrayList<>(events); // convert list to mutable
×
1300
            // Add "fake" workflow task scheduled and started if workflow is not closed
1301
            WorkflowTaskScheduledEventAttributes scheduledAttributes =
1302
                WorkflowTaskScheduledEventAttributes.newBuilder()
×
1303
                    .setStartToCloseTimeout(data.startRequest.getWorkflowTaskTimeout())
×
1304
                    .setTaskQueue(request.getTaskQueue())
×
1305
                    .setAttempt(data.attempt)
×
1306
                    .build();
×
1307
            HistoryEvent scheduledEvent =
1308
                HistoryEvent.newBuilder()
×
1309
                    .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED)
×
1310
                    .setEventId(lastEventId + 1)
×
1311
                    .setWorkflowTaskScheduledEventAttributes(scheduledAttributes)
×
1312
                    .build();
×
1313
            events.add(scheduledEvent);
×
1314
            WorkflowTaskStartedEventAttributes startedAttributes =
1315
                WorkflowTaskStartedEventAttributes.newBuilder()
×
1316
                    .setIdentity(request.getIdentity())
×
1317
                    .setScheduledEventId(lastEventId + 1)
×
1318
                    .build();
×
1319
            HistoryEvent startedEvent =
1320
                HistoryEvent.newBuilder()
×
1321
                    .setEventId(lastEventId + 1)
×
1322
                    .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_STARTED)
×
1323
                    .setWorkflowTaskStartedEventAttributes(startedAttributes)
×
1324
                    .build();
×
1325
            events.add(startedEvent);
×
1326
            task.setStartedEventId(lastEventId + 2);
×
1327
          }
1328
          // get it from previous started event id.
1329
          task.setHistory(History.newBuilder().addAllEvents(events));
1✔
1330
          // Transfer the queries
1331
          Map<String, TestWorkflowMutableStateImpl.ConsistentQuery> queries =
1✔
1332
              data.consistentQueryRequests;
1333
          for (Map.Entry<String, TestWorkflowMutableStateImpl.ConsistentQuery> queryEntry :
1334
              queries.entrySet()) {
1✔
1335
            QueryWorkflowRequest queryWorkflowRequest = queryEntry.getValue().getRequest();
1✔
1336
            task.putQueries(queryEntry.getKey(), queryWorkflowRequest.getQuery());
1✔
1337
          }
1✔
1338
          // Transfer the messages
1339
          Map<String, UpdateWorkflowExecution> updates = data.updateRequest;
1✔
1340
          for (Map.Entry<String, UpdateWorkflowExecution> update : updates.entrySet()) {
1✔
1341
            UpdateWorkflowExecutionRequest updateRequest = update.getValue().getRequest();
1✔
1342
            Message updateMessage =
1343
                Message.newBuilder()
1✔
1344
                    .setId(update.getKey() + "/request")
1✔
1345
                    .setProtocolInstanceId(update.getKey())
1✔
1346
                    .setEventId(data.scheduledEventId)
1✔
1347
                    .setBody(Any.pack(updateRequest.getRequest()))
1✔
1348
                    .build();
1✔
1349
            task.addMessages(updateMessage);
1✔
1350
          }
1✔
1351
          if (data.lastSuccessfulStartedEventId > 0) {
1✔
1352
            task.setPreviousStartedEventId(data.lastSuccessfulStartedEventId);
1✔
1353
          }
1354
          if (!queryOnly) {
1✔
1355
            data.startedEventId = startedEventId;
1✔
1356
          }
1357
        });
1✔
1358
  }
1✔
1359

1360
  private static void startActivityTask(
1361
      RequestContext ctx,
1362
      ActivityTaskData data,
1363
      PollActivityTaskQueueRequest request,
1364
      long notUsed) {
1365
    ActivityTaskStartedEventAttributes.Builder a =
1366
        ActivityTaskStartedEventAttributes.newBuilder()
1✔
1367
            .setIdentity(request.getIdentity())
1✔
1368
            .setScheduledEventId(data.scheduledEventId);
1✔
1369
    a.setAttempt(data.getAttempt());
1✔
1370
    // Setting timestamp here as the default logic will set it to the time when it is added to the
1371
    // history. But in the case of retry it happens only after an activity completion.
1372
    Timestamp timestamp = data.store.currentTime();
1✔
1373
    HistoryEvent event =
1374
        HistoryEvent.newBuilder()
1✔
1375
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_STARTED)
1✔
1376
            .setEventTime(timestamp)
1✔
1377
            .setActivityTaskStartedEventAttributes(a)
1✔
1378
            .build();
1✔
1379
    long startedEventId;
1380
    startedEventId = NO_EVENT_ID;
1✔
1381
    ctx.onCommit(
1✔
1382
        (historySize) -> {
1383
          data.startedEventId = startedEventId;
1✔
1384
          data.startedEvent = event;
1✔
1385
          PollActivityTaskQueueResponse.Builder task = data.activityTask.getTask();
1✔
1386
          task.setTaskToken(
1✔
1387
              new ActivityTaskToken(ctx.getExecutionId(), data.scheduledEventId, task.getAttempt())
1✔
1388
                  .toBytes());
1✔
1389
          task.setStartedTime(timestamp);
1✔
1390
        });
1✔
1391
  }
1✔
1392

1393
  private static void completeWorkflowTask(
1394
      RequestContext ctx,
1395
      WorkflowTaskData data,
1396
      RespondWorkflowTaskCompletedRequest request,
1397
      long notUsed) {
1398
    WorkflowTaskCompletedEventAttributes.Builder a =
1399
        WorkflowTaskCompletedEventAttributes.newBuilder()
1✔
1400
            .setIdentity(request.getIdentity())
1✔
1401
            .setBinaryChecksum(request.getBinaryChecksum())
1✔
1402
            .setMeteringMetadata(request.getMeteringMetadata())
1✔
1403
            .setScheduledEventId(data.scheduledEventId);
1✔
1404
    HistoryEvent event =
1405
        HistoryEvent.newBuilder()
1✔
1406
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_COMPLETED)
1✔
1407
            .setWorkflowTaskCompletedEventAttributes(a)
1✔
1408
            .build();
1✔
1409
    ctx.addEvent(event);
1✔
1410
    ctx.onCommit(
1✔
1411
        (historySize) -> {
1412
          data.lastSuccessfulStartedEventId = data.startedEventId;
1✔
1413
          data.clear();
1✔
1414
        });
1✔
1415
  }
1✔
1416

1417
  private static void completeQuery(
1418
      RequestContext ctx,
1419
      WorkflowTaskData data,
1420
      RespondWorkflowTaskCompletedRequest request,
1421
      long notUsed) {
1422
    Map<String, WorkflowQueryResult> responses = request.getQueryResultsMap();
×
1423
    for (Map.Entry<String, WorkflowQueryResult> resultEntry : responses.entrySet()) {
×
1424
      TestWorkflowMutableStateImpl.ConsistentQuery query =
×
1425
          data.consistentQueryRequests.remove(resultEntry.getKey());
×
1426
      if (query != null) {
×
1427
        WorkflowQueryResult value = resultEntry.getValue();
×
1428
        CompletableFuture<QueryWorkflowResponse> result = query.getResult();
×
1429
        switch (value.getResultType()) {
×
1430
          case QUERY_RESULT_TYPE_ANSWERED:
1431
            QueryWorkflowResponse response =
1432
                QueryWorkflowResponse.newBuilder().setQueryResult(value.getAnswer()).build();
×
1433
            result.complete(response);
×
1434
            break;
×
1435
          case QUERY_RESULT_TYPE_FAILED:
1436
            result.completeExceptionally(
×
1437
                StatusUtils.newException(
×
1438
                    Status.INTERNAL.withDescription(value.getErrorMessage()),
×
1439
                    QueryFailedFailure.getDefaultInstance(),
×
1440
                    QueryFailedFailure.getDescriptor()));
×
1441
            break;
×
1442
          default:
1443
            throw Status.INVALID_ARGUMENT
×
1444
                .withDescription("Invalid query result type: " + value.getResultType())
×
1445
                .asRuntimeException();
×
1446
        }
1447
      }
1448
    }
×
1449
    ctx.onCommit(
×
1450
        (historySize) -> {
1451
          data.clear();
×
1452
          ctx.unlockTimer("completeQuery");
×
1453
        });
×
1454
  }
×
1455

1456
  private static void failQueryWorkflowTask(
1457
      RequestContext ctx, WorkflowTaskData data, Object unused, long notUsed) {
1458
    data.consistentQueryRequests
×
1459
        .entrySet()
×
1460
        .removeIf(entry -> entry.getValue().getResult().isCancelled());
×
1461
    if (!data.consistentQueryRequests.isEmpty()) {
×
1462
      ctx.setNeedWorkflowTask(true);
×
1463
    }
1464
    ctx.unlockTimer("failQueryWorkflowTask");
×
1465
  }
×
1466

1467
  private static void failWorkflowTask(
1468
      RequestContext ctx,
1469
      WorkflowTaskData data,
1470
      RespondWorkflowTaskFailedRequest request,
1471
      long notUsed) {
1472
    WorkflowTaskFailedEventAttributes.Builder a =
1473
        WorkflowTaskFailedEventAttributes.newBuilder()
1✔
1474
            .setIdentity(request.getIdentity())
1✔
1475
            .setStartedEventId(data.startedEventId)
1✔
1476
            .setScheduledEventId(data.scheduledEventId)
1✔
1477
            .setCause(request.getCause());
1✔
1478
    if (request.hasFailure()) {
1✔
1479
      a.setFailure(request.getFailure());
1✔
1480
    }
1481
    HistoryEvent event =
1482
        HistoryEvent.newBuilder()
1✔
1483
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_FAILED)
1✔
1484
            .setWorkflowTaskFailedEventAttributes(a)
1✔
1485
            .build();
1✔
1486
    ctx.addEvent(event);
1✔
1487
    ctx.setNeedWorkflowTask(true);
1✔
1488
  }
1✔
1489

1490
  private static void timeoutWorkflowTask(
1491
      RequestContext ctx, WorkflowTaskData data, Object ignored, long notUsed) {
1492
    WorkflowTaskTimedOutEventAttributes.Builder a =
1493
        WorkflowTaskTimedOutEventAttributes.newBuilder()
1✔
1494
            .setStartedEventId(data.startedEventId)
1✔
1495
            .setTimeoutType(TimeoutType.TIMEOUT_TYPE_START_TO_CLOSE)
1✔
1496
            .setScheduledEventId(data.scheduledEventId);
1✔
1497
    HistoryEvent event =
1498
        HistoryEvent.newBuilder()
1✔
1499
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT)
1✔
1500
            .setWorkflowTaskTimedOutEventAttributes(a)
1✔
1501
            .build();
1✔
1502
    ctx.addEvent(event);
1✔
1503
    ctx.setNeedWorkflowTask(true);
1✔
1504
  }
1✔
1505

1506
  private static void needsWorkflowTask(
1507
      RequestContext requestContext,
1508
      WorkflowTaskData workflowTaskData,
1509
      Object notUsedRequest,
1510
      long notUsed) {
1511
    requestContext.setNeedWorkflowTask(true);
1✔
1512
  }
1✔
1513

1514
  private static void completeActivityTask(
1515
      RequestContext ctx, ActivityTaskData data, Object request, long notUsed) {
1516
    data.startedEventId = ctx.addEvent(data.startedEvent);
1✔
1517
    if (request instanceof RespondActivityTaskCompletedRequest) {
1✔
1518
      completeActivityTaskByTaskToken(ctx, data, (RespondActivityTaskCompletedRequest) request);
1✔
1519
    } else if (request instanceof RespondActivityTaskCompletedByIdRequest) {
×
1520
      completeActivityTaskById(ctx, data, (RespondActivityTaskCompletedByIdRequest) request);
×
1521
    } else {
1522
      throw new IllegalArgumentException("Unknown request: " + request);
×
1523
    }
1524
  }
1✔
1525

1526
  private static void completeActivityTaskByTaskToken(
1527
      RequestContext ctx, ActivityTaskData data, RespondActivityTaskCompletedRequest request) {
1528
    ActivityTaskCompletedEventAttributes.Builder a =
1529
        ActivityTaskCompletedEventAttributes.newBuilder()
1✔
1530
            .setIdentity(request.getIdentity())
1✔
1531
            .setScheduledEventId(data.scheduledEventId)
1✔
1532
            .setResult(request.getResult())
1✔
1533
            .setStartedEventId(data.startedEventId);
1✔
1534
    HistoryEvent event =
1535
        HistoryEvent.newBuilder()
1✔
1536
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_COMPLETED)
1✔
1537
            .setActivityTaskCompletedEventAttributes(a)
1✔
1538
            .build();
1✔
1539
    ctx.addEvent(event);
1✔
1540
  }
1✔
1541

1542
  private static void completeActivityTaskById(
1543
      RequestContext ctx, ActivityTaskData data, RespondActivityTaskCompletedByIdRequest request) {
1544
    ActivityTaskCompletedEventAttributes.Builder a =
1545
        ActivityTaskCompletedEventAttributes.newBuilder()
×
1546
            .setIdentity(request.getIdentity())
×
1547
            .setScheduledEventId(data.scheduledEventId)
×
1548
            .setResult(request.getResult())
×
1549
            .setStartedEventId(data.startedEventId);
×
1550
    HistoryEvent event =
1551
        HistoryEvent.newBuilder()
×
1552
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_COMPLETED)
×
1553
            .setActivityTaskCompletedEventAttributes(a)
×
1554
            .build();
×
1555
    ctx.addEvent(event);
×
1556
  }
×
1557

1558
  private static State failActivityTask(
1559
      RequestContext ctx, ActivityTaskData data, Object request, long notUsed) {
1560
    if (request instanceof RespondActivityTaskFailedRequest) {
1✔
1561
      RespondActivityTaskFailedRequest req = (RespondActivityTaskFailedRequest) request;
1✔
1562
      return failActivityTaskByRequestType(ctx, data, req.getFailure(), req.getIdentity());
1✔
1563
    } else if (request instanceof RespondActivityTaskFailedByIdRequest) {
×
1564
      RespondActivityTaskFailedByIdRequest req = (RespondActivityTaskFailedByIdRequest) request;
×
1565
      return failActivityTaskByRequestType(ctx, data, req.getFailure(), req.getIdentity());
×
1566
    } else {
1567
      throw new IllegalArgumentException("Unknown request: " + request);
×
1568
    }
1569
  }
1570

1571
  private static State failActivityTaskByRequestType(
1572
      RequestContext ctx, ActivityTaskData data, Failure failure, String identity) {
1573
    if (!failure.hasApplicationFailureInfo()) {
1✔
1574
      throw new IllegalArgumentException(
×
1575
          "Failure must have ApplicationFailureInfo. Got: " + failure);
1576
    }
1577
    RetryState retryState = attemptActivityRetry(ctx, Optional.of(failure), data);
1✔
1578
    if (retryState == RetryState.RETRY_STATE_IN_PROGRESS) {
1✔
1579
      return INITIATED;
1✔
1580
    }
1581
    data.startedEventId = ctx.addEvent(data.startedEvent);
1✔
1582
    ActivityTaskFailedEventAttributes.Builder attributes =
1583
        ActivityTaskFailedEventAttributes.newBuilder()
1✔
1584
            .setIdentity(identity)
1✔
1585
            .setScheduledEventId(data.scheduledEventId)
1✔
1586
            .setFailure(failure)
1✔
1587
            .setRetryState(retryState)
1✔
1588
            .setStartedEventId(data.startedEventId);
1✔
1589
    HistoryEvent event =
1590
        HistoryEvent.newBuilder()
1✔
1591
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_FAILED)
1✔
1592
            .setActivityTaskFailedEventAttributes(attributes)
1✔
1593
            .build();
1✔
1594
    ctx.addEvent(event);
1✔
1595
    return FAILED;
1✔
1596
  }
1597

1598
  private static State timeoutActivityTask(
1599
      RequestContext ctx, ActivityTaskData data, TimeoutType timeoutType, long notUsed) {
1600
    Optional<Failure> previousFailure = data.retryState.getPreviousRunFailure();
1✔
1601

1602
    // chaining with the previous run failure if we are preparing the final failure
1603
    Failure failure =
1✔
1604
        newTimeoutFailure(timeoutType, Optional.ofNullable(data.heartbeatDetails), previousFailure);
1✔
1605

1606
    RetryState retryState;
1607
    switch (timeoutType) {
1✔
1608
      case TIMEOUT_TYPE_SCHEDULE_TO_START:
1609
      case TIMEOUT_TYPE_SCHEDULE_TO_CLOSE:
1610
        // ScheduleToStart (queue timeout) is not retryable. Instead of the retry, a customer should
1611
        // set a larger ScheduleToStart timeout.
1612
        // ScheduleToClose timeout is final and failure is created with TIMEOUT retry state
1613
        retryState = RetryState.RETRY_STATE_TIMEOUT;
1✔
1614
        break;
1✔
1615
      case TIMEOUT_TYPE_START_TO_CLOSE:
1616
      case TIMEOUT_TYPE_HEARTBEAT:
1617
        // not chaining with the previous run failure if we are preparing the failure to be stored
1618
        // for the next iteration
1619
        Optional<Failure> lastFailure =
1✔
1620
            Optional.of(
1✔
1621
                newTimeoutFailure(
1✔
1622
                    timeoutType,
1623
                    // we move heartbeatDetails to the new top level (this cause is used for
1624
                    // scheduleToClose only)
1625
                    Optional.empty(),
1✔
1626
                    // prune to don't have too deep nesting of failures
1627
                    Optional.empty()));
1✔
1628

1629
        retryState = attemptActivityRetry(ctx, lastFailure, data);
1✔
1630
        if (retryState == RetryState.RETRY_STATE_IN_PROGRESS) {
1✔
1631
          return INITIATED;
1✔
1632
        } else if (retryState == RetryState.RETRY_STATE_TIMEOUT) {
1✔
1633
          // if retryState = RETRY_STATE_TIMEOUT, it means scheduleToClose timeout happened inside
1634
          // attemptActivityRetry();
1635
          // start to close timeout would return as "max attempts reached".
1636

1637
          Preconditions.checkState(
1✔
1638
              timeoutType == TimeoutType.TIMEOUT_TYPE_START_TO_CLOSE
1639
                  || timeoutType == TimeoutType.TIMEOUT_TYPE_HEARTBEAT,
1640
              "Unexpected timeout type: %s. We should end up here only in case of HEARTBEAT_TIMEOUT or START_TO_CLOSE_TIMEOUT",
1641
              timeoutType);
1642

1643
          // heartbeat is preserved as the cause for the scheduleToClose timeout
1644
          // But we effectively omit startToClose timeout with scheduleToClose timeout
1645
          Optional<Failure> cause =
1646
              timeoutType == TimeoutType.TIMEOUT_TYPE_HEARTBEAT ? lastFailure : previousFailure;
1✔
1647

1648
          failure =
1✔
1649
              newTimeoutFailure(
1✔
1650
                  TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE,
1651
                  Optional.ofNullable(data.heartbeatDetails),
1✔
1652
                  cause);
1653
        }
1✔
1654
        break;
1655
      default:
1656
        throw new IllegalStateException(
×
1657
            "Not implemented behavior for timeout type: " + timeoutType);
1658
    }
1659

1660
    ActivityTaskTimedOutEventAttributes.Builder a =
1661
        ActivityTaskTimedOutEventAttributes.newBuilder()
1✔
1662
            .setScheduledEventId(data.scheduledEventId)
1✔
1663
            .setRetryState(retryState)
1✔
1664
            .setStartedEventId(data.startedEventId)
1✔
1665
            .setFailure(failure);
1✔
1666
    HistoryEvent event =
1667
        HistoryEvent.newBuilder()
1✔
1668
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_TIMED_OUT)
1✔
1669
            .setActivityTaskTimedOutEventAttributes(a)
1✔
1670
            .build();
1✔
1671
    ctx.addEvent(event);
1✔
1672
    return TIMED_OUT;
1✔
1673
  }
1674

1675
  private static Failure newTimeoutFailure(
1676
      TimeoutType timeoutType, Optional<Payloads> lastHeartbeatDetails, Optional<Failure> cause) {
1677
    TimeoutFailureInfo.Builder info = TimeoutFailureInfo.newBuilder().setTimeoutType(timeoutType);
1✔
1678
    if (lastHeartbeatDetails.isPresent()) {
1✔
1679
      info.setLastHeartbeatDetails(lastHeartbeatDetails.get());
1✔
1680
    }
1681
    Failure.Builder result = Failure.newBuilder().setTimeoutFailureInfo(info);
1✔
1682
    if (cause.isPresent()) {
1✔
1683
      result.setCause(cause.get());
1✔
1684
    }
1685
    return result.build();
1✔
1686
  }
1687

1688
  private static RetryState attemptActivityRetry(
1689
      RequestContext ctx, Optional<Failure> failure, ActivityTaskData data) {
1690
    if (data.retryState == null) {
1✔
1691
      throw new IllegalStateException("RetryPolicy is always present");
×
1692
    }
1693
    Optional<ApplicationFailureInfo> info = failure.map(Failure::getApplicationFailureInfo);
1✔
1694
    if (info.isPresent()) {
1✔
1695
      if (info.get().getNonRetryable()) {
1✔
1696
        return RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE;
1✔
1697
      }
1698
    }
1699
    TestServiceRetryState nextAttempt = data.retryState.getNextAttempt(failure);
1✔
1700
    TestServiceRetryState.BackoffInterval backoffInterval =
1✔
1701
        data.retryState.getBackoffIntervalInSeconds(
1✔
1702
            info.map(ApplicationFailureInfo::getType), data.store.currentTime());
1✔
1703
    if (backoffInterval.getRetryState() == RetryState.RETRY_STATE_IN_PROGRESS) {
1✔
1704
      data.nextBackoffInterval = ProtobufTimeUtils.toProtoDuration(backoffInterval.getInterval());
1✔
1705
      PollActivityTaskQueueResponse.Builder task = data.activityTask.getTask();
1✔
1706
      if (data.heartbeatDetails != null) {
1✔
1707
        task.setHeartbeatDetails(data.heartbeatDetails);
1✔
1708
      }
1709
      ctx.onCommit(
1✔
1710
          (historySize) -> {
1711
            data.retryState = nextAttempt;
1✔
1712
            task.setAttempt(nextAttempt.getAttempt());
1✔
1713
            task.setCurrentAttemptScheduledTime(ctx.currentTime());
1✔
1714
          });
1✔
1715
    } else {
1✔
1716
      data.nextBackoffInterval = Durations.ZERO;
1✔
1717
    }
1718
    return backoffInterval.getRetryState();
1✔
1719
  }
1720

1721
  private static void reportActivityTaskCancellation(
1722
      RequestContext ctx, ActivityTaskData data, Object request, long notUsed) {
1723
    Payloads details = null;
1✔
1724
    if (request instanceof RespondActivityTaskCanceledRequest) {
1✔
1725
      {
1726
        RespondActivityTaskCanceledRequest cr = (RespondActivityTaskCanceledRequest) request;
1✔
1727

1728
        details = cr.hasDetails() ? cr.getDetails() : null;
1✔
1729
      }
1✔
1730
    } else if (request instanceof RespondActivityTaskCanceledByIdRequest) {
1✔
1731
      {
1732
        RespondActivityTaskCanceledByIdRequest cr =
×
1733
            (RespondActivityTaskCanceledByIdRequest) request;
1734
        details = cr.hasDetails() ? cr.getDetails() : null;
×
1735
      }
×
1736
    } else if (request != null) {
1✔
1737
      throw Status.INTERNAL
×
1738
          .withDescription("Unexpected request type: " + request)
×
1739
          .asRuntimeException();
×
1740
    }
1741
    ActivityTaskCanceledEventAttributes.Builder a =
1742
        ActivityTaskCanceledEventAttributes.newBuilder()
1✔
1743
            .setScheduledEventId(data.scheduledEventId)
1✔
1744
            .setStartedEventId(data.startedEventId);
1✔
1745
    if (details != null) {
1✔
1746
      a.setDetails(details);
×
1747
    }
1748
    HistoryEvent event =
1749
        HistoryEvent.newBuilder()
1✔
1750
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_CANCELED)
1✔
1751
            .setActivityTaskCanceledEventAttributes(a)
1✔
1752
            .build();
1✔
1753
    ctx.addEvent(event);
1✔
1754
  }
1✔
1755

1756
  private static void heartbeatActivityTask(
1757
      RequestContext nullCtx, ActivityTaskData data, Payloads details, long notUsed) {
1758
    data.heartbeatDetails = details;
1✔
1759
  }
1✔
1760

1761
  private static void acceptUpdate(
1762
      RequestContext ctx,
1763
      UpdateWorkflowExecutionData data,
1764
      Message msg,
1765
      long workflowTaskCompletedEventId) {
1766
    try {
1767
      Acceptance acceptance = msg.getBody().unpack(Acceptance.class);
1✔
1768

1769
      WorkflowExecutionUpdateAcceptedEventAttributes acceptedAttribute =
1770
          WorkflowExecutionUpdateAcceptedEventAttributes.newBuilder()
1✔
1771
              .setAcceptedRequestSequencingEventId(workflowTaskCompletedEventId - 1)
1✔
1772
              .setProtocolInstanceId(msg.getProtocolInstanceId())
1✔
1773
              .setAcceptedRequestMessageId(acceptance.getAcceptedRequestMessageId())
1✔
1774
              .setAcceptedRequest(acceptance.getAcceptedRequest())
1✔
1775
              .build();
1✔
1776

1777
      HistoryEvent event =
1778
          HistoryEvent.newBuilder()
1✔
1779
              .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED)
1✔
1780
              .setWorkflowExecutionUpdateAcceptedEventAttributes(acceptedAttribute)
1✔
1781
              .build();
1✔
1782
      // If the workflow is finished we can't write more events
1783
      // to history so if the message was processed after the workflow
1784
      // was closed there is nothing we can do.
1785
      // The real server also has this same problem
1786
      if (!ctx.getWorkflowMutableState().isTerminalState()) {
1✔
1787
        ctx.addEvent(event);
1✔
1788
      }
1789

1790
      UpdateWorkflowExecutionResponse response =
1791
          UpdateWorkflowExecutionResponse.newBuilder()
1✔
1792
              .setUpdateRef(
1✔
1793
                  UpdateRef.newBuilder()
1✔
1794
                      .setWorkflowExecution(ctx.getExecution())
1✔
1795
                      .setUpdateId(data.id))
1✔
1796
              .setOutcome(Outcome.getDefaultInstance())
1✔
1797
              .build();
1✔
1798

1799
      data.acceptance.complete(response);
1✔
1800
    } catch (InvalidProtocolBufferException e) {
×
1801
      throw new RuntimeException(e);
×
1802
    }
1✔
1803
  }
1✔
1804

1805
  private static void completeUpdate(
1806
      RequestContext ctx,
1807
      UpdateWorkflowExecutionData data,
1808
      Message msg,
1809
      long workflowTaskCompletedEventId) {
1810
    try {
1811
      Response response = msg.getBody().unpack(Response.class);
1✔
1812

1813
      WorkflowExecutionUpdateCompletedEventAttributes completedAttribute =
1814
          WorkflowExecutionUpdateCompletedEventAttributes.newBuilder()
1✔
1815
              .setMeta(response.getMeta())
1✔
1816
              .setOutcome(response.getOutcome())
1✔
1817
              .build();
1✔
1818

1819
      HistoryEvent event =
1820
          HistoryEvent.newBuilder()
1✔
1821
              .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_COMPLETED)
1✔
1822
              .setWorkflowExecutionUpdateCompletedEventAttributes(completedAttribute)
1✔
1823
              .build();
1✔
1824
      // If the workflow is finished we can't write more events
1825
      // to history so if the message was processed after the workflow
1826
      // was closed there is nothing we can do.
1827
      // The real server also has this same problem
1828
      if (!ctx.getWorkflowMutableState().isTerminalState()) {
1✔
1829
        ctx.addEvent(event);
1✔
1830
      }
1831

1832
      UpdateWorkflowExecutionResponse updateResponse =
1833
          UpdateWorkflowExecutionResponse.newBuilder()
1✔
1834
              .setUpdateRef(
1✔
1835
                  UpdateRef.newBuilder()
1✔
1836
                      .setWorkflowExecution(ctx.getExecution())
1✔
1837
                      .setUpdateId(data.id))
1✔
1838
              .setOutcome(response.getOutcome())
1✔
1839
              .build();
1✔
1840

1841
      data.complete.complete(updateResponse);
1✔
1842
    } catch (InvalidProtocolBufferException e) {
×
1843
      throw new RuntimeException(e);
×
1844
    }
1✔
1845
  }
1✔
1846

1847
  private static void startTimer(
1848
      RequestContext ctx,
1849
      TimerData data,
1850
      StartTimerCommandAttributes d,
1851
      long workflowTaskCompletedEventId) {
1852
    TimerStartedEventAttributes.Builder a =
1853
        TimerStartedEventAttributes.newBuilder()
1✔
1854
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1✔
1855
            .setStartToFireTimeout(d.getStartToFireTimeout())
1✔
1856
            .setTimerId(d.getTimerId());
1✔
1857
    HistoryEvent event =
1858
        HistoryEvent.newBuilder()
1✔
1859
            .setEventType(EventType.EVENT_TYPE_TIMER_STARTED)
1✔
1860
            .setTimerStartedEventAttributes(a)
1✔
1861
            .build();
1✔
1862
    long startedEventId = ctx.addEvent(event);
1✔
1863
    ctx.onCommit(
1✔
1864
        (historySize) -> {
1865
          data.startedEvent = a.build();
1✔
1866
          data.startedEventId = startedEventId;
1✔
1867
        });
1✔
1868
  }
1✔
1869

1870
  private static void fireTimer(RequestContext ctx, TimerData data, Object ignored, long notUsed) {
1871
    TimerFiredEventAttributes.Builder a =
1872
        TimerFiredEventAttributes.newBuilder()
1✔
1873
            .setTimerId(data.startedEvent.getTimerId())
1✔
1874
            .setStartedEventId(data.startedEventId);
1✔
1875
    HistoryEvent event =
1876
        HistoryEvent.newBuilder()
1✔
1877
            .setEventType(EventType.EVENT_TYPE_TIMER_FIRED)
1✔
1878
            .setTimerFiredEventAttributes(a)
1✔
1879
            .build();
1✔
1880
    ctx.addEvent(event);
1✔
1881
  }
1✔
1882

1883
  private static void cancelTimer(
1884
      RequestContext ctx,
1885
      TimerData data,
1886
      CancelTimerCommandAttributes d,
1887
      long workflowTaskCompletedEventId) {
1888
    TimerCanceledEventAttributes.Builder a =
1889
        TimerCanceledEventAttributes.newBuilder()
1✔
1890
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1✔
1891
            .setTimerId(d.getTimerId())
1✔
1892
            .setStartedEventId(data.startedEventId);
1✔
1893
    HistoryEvent event =
1894
        HistoryEvent.newBuilder()
1✔
1895
            .setEventType(EventType.EVENT_TYPE_TIMER_CANCELED)
1✔
1896
            .setTimerCanceledEventAttributes(a)
1✔
1897
            .build();
1✔
1898
    ctx.addEvent(event);
1✔
1899
  }
1✔
1900

1901
  private static void initiateExternalSignal(
1902
      RequestContext ctx,
1903
      SignalExternalData data,
1904
      SignalExternalWorkflowExecutionCommandAttributes d,
1905
      long workflowTaskCompletedEventId) {
1906
    SignalExternalWorkflowExecutionInitiatedEventAttributes.Builder a =
1907
        SignalExternalWorkflowExecutionInitiatedEventAttributes.newBuilder()
1✔
1908
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1✔
1909
            .setControl(d.getControl())
1✔
1910
            .setInput(d.getInput())
1✔
1911
            .setNamespace(d.getNamespace())
1✔
1912
            .setChildWorkflowOnly(d.getChildWorkflowOnly())
1✔
1913
            .setSignalName(d.getSignalName())
1✔
1914
            .setWorkflowExecution(d.getExecution());
1✔
1915

1916
    HistoryEvent event =
1917
        HistoryEvent.newBuilder()
1✔
1918
            .setEventType(EventType.EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED)
1✔
1919
            .setSignalExternalWorkflowExecutionInitiatedEventAttributes(a)
1✔
1920
            .build();
1✔
1921
    long initiatedEventId = ctx.addEvent(event);
1✔
1922
    ctx.onCommit(
1✔
1923
        (historySize) -> {
1924
          data.initiatedEventId = initiatedEventId;
1✔
1925
          data.initiatedEvent = a.build();
1✔
1926
        });
1✔
1927
  }
1✔
1928

1929
  private static void failExternalSignal(
1930
      RequestContext ctx,
1931
      SignalExternalData data,
1932
      SignalExternalWorkflowExecutionFailedCause cause,
1933
      long notUsed) {
1934
    SignalExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent = data.initiatedEvent;
1✔
1935
    SignalExternalWorkflowExecutionFailedEventAttributes.Builder a =
1936
        SignalExternalWorkflowExecutionFailedEventAttributes.newBuilder()
1✔
1937
            .setInitiatedEventId(data.initiatedEventId)
1✔
1938
            .setWorkflowExecution(initiatedEvent.getWorkflowExecution())
1✔
1939
            .setControl(initiatedEvent.getControl())
1✔
1940
            .setCause(cause)
1✔
1941
            .setNamespace(initiatedEvent.getNamespace());
1✔
1942
    HistoryEvent event =
1943
        HistoryEvent.newBuilder()
1✔
1944
            .setEventType(EventType.EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED)
1✔
1945
            .setSignalExternalWorkflowExecutionFailedEventAttributes(a)
1✔
1946
            .build();
1✔
1947
    ctx.addEvent(event);
1✔
1948
  }
1✔
1949

1950
  private static void completeExternalSignal(
1951
      RequestContext ctx, SignalExternalData data, String runId, long notUsed) {
1952
    SignalExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent = data.initiatedEvent;
1✔
1953
    WorkflowExecution signaledExecution =
1✔
1954
        initiatedEvent.getWorkflowExecution().toBuilder().setRunId(runId).build();
1✔
1955
    ExternalWorkflowExecutionSignaledEventAttributes.Builder a =
1956
        ExternalWorkflowExecutionSignaledEventAttributes.newBuilder()
1✔
1957
            .setInitiatedEventId(data.initiatedEventId)
1✔
1958
            .setWorkflowExecution(signaledExecution)
1✔
1959
            .setControl(initiatedEvent.getControl())
1✔
1960
            .setNamespace(initiatedEvent.getNamespace());
1✔
1961
    HistoryEvent event =
1962
        HistoryEvent.newBuilder()
1✔
1963
            .setEventType(EventType.EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_SIGNALED)
1✔
1964
            .setExternalWorkflowExecutionSignaledEventAttributes(a)
1✔
1965
            .build();
1✔
1966
    ctx.addEvent(event);
1✔
1967
  }
1✔
1968

1969
  private static void initiateExternalCancellation(
1970
      RequestContext ctx,
1971
      CancelExternalData data,
1972
      RequestCancelExternalWorkflowExecutionCommandAttributes d,
1973
      long workflowTaskCompletedEventId) {
1974
    RequestCancelExternalWorkflowExecutionInitiatedEventAttributes.Builder a =
1975
        RequestCancelExternalWorkflowExecutionInitiatedEventAttributes.newBuilder()
1✔
1976
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1✔
1977
            .setControl(d.getControl())
1✔
1978
            .setNamespace(d.getNamespace())
1✔
1979
            .setChildWorkflowOnly(d.getChildWorkflowOnly())
1✔
1980
            .setWorkflowExecution(
1✔
1981
                WorkflowExecution.newBuilder()
1✔
1982
                    .setWorkflowId(d.getWorkflowId())
1✔
1983
                    .setRunId(d.getRunId())
1✔
1984
                    .build());
1✔
1985

1986
    HistoryEvent event =
1987
        HistoryEvent.newBuilder()
1✔
1988
            .setEventType(EventType.EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED)
1✔
1989
            .setRequestCancelExternalWorkflowExecutionInitiatedEventAttributes(a)
1✔
1990
            .build();
1✔
1991
    long initiatedEventId = ctx.addEvent(event);
1✔
1992
    ctx.onCommit(
1✔
1993
        (historySize) -> {
1994
          data.initiatedEventId = initiatedEventId;
1✔
1995
          data.initiatedEvent = a.build();
1✔
1996
        });
1✔
1997
  }
1✔
1998

1999
  private static void reportExternalCancellationRequested(
2000
      RequestContext ctx, CancelExternalData data, String runId, long notUsed) {
2001
    RequestCancelExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent =
1✔
2002
        data.initiatedEvent;
2003
    ExternalWorkflowExecutionCancelRequestedEventAttributes.Builder a =
2004
        ExternalWorkflowExecutionCancelRequestedEventAttributes.newBuilder()
1✔
2005
            .setInitiatedEventId(data.initiatedEventId)
1✔
2006
            .setWorkflowExecution(
1✔
2007
                WorkflowExecution.newBuilder()
1✔
2008
                    .setRunId(runId)
1✔
2009
                    .setWorkflowId(initiatedEvent.getWorkflowExecution().getWorkflowId())
1✔
2010
                    .build())
1✔
2011
            .setNamespace(initiatedEvent.getNamespace());
1✔
2012
    HistoryEvent event =
2013
        HistoryEvent.newBuilder()
1✔
2014
            .setEventType(EventType.EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_CANCEL_REQUESTED)
1✔
2015
            .setExternalWorkflowExecutionCancelRequestedEventAttributes(a)
1✔
2016
            .build();
1✔
2017
    ctx.addEvent(event);
1✔
2018
  }
1✔
2019

2020
  private static void failExternalCancellation(
2021
      RequestContext ctx,
2022
      CancelExternalData data,
2023
      CancelExternalWorkflowExecutionFailedCause cause,
2024
      long notUsed) {
2025
    RequestCancelExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent =
×
2026
        data.initiatedEvent;
2027
    RequestCancelExternalWorkflowExecutionFailedEventAttributes.Builder a =
2028
        RequestCancelExternalWorkflowExecutionFailedEventAttributes.newBuilder()
×
2029
            .setInitiatedEventId(data.initiatedEventId)
×
2030
            .setWorkflowExecution(initiatedEvent.getWorkflowExecution())
×
2031
            .setControl(initiatedEvent.getControl())
×
2032
            .setCause(cause)
×
2033
            .setNamespace(initiatedEvent.getNamespace());
×
2034
    HistoryEvent event =
2035
        HistoryEvent.newBuilder()
×
2036
            .setEventType(EventType.EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_FAILED)
×
2037
            .setRequestCancelExternalWorkflowExecutionFailedEventAttributes(a)
×
2038
            .build();
×
2039
    ctx.addEvent(event);
×
2040
  }
×
2041

2042
  // Mimics the default activity retry policy of a standard Temporal server.
2043
  static RetryPolicy ensureDefaultFieldsForActivityRetryPolicy(RetryPolicy originalPolicy) {
2044
    Duration initialInterval =
2045
        Durations.compare(originalPolicy.getInitialInterval(), Durations.ZERO) == 0
1✔
2046
            ? DEFAULT_ACTIVITY_RETRY_INITIAL_INTERVAL
1✔
2047
            : originalPolicy.getInitialInterval();
1✔
2048

2049
    return RetryPolicy.newBuilder()
1✔
2050
        .setInitialInterval(initialInterval)
1✔
2051
        .addAllNonRetryableErrorTypes(originalPolicy.getNonRetryableErrorTypesList())
1✔
2052
        .setMaximumInterval(
1✔
2053
            Durations.compare(originalPolicy.getMaximumInterval(), Durations.ZERO) == 0
1✔
2054
                ? Durations.fromMillis(
1✔
2055
                    DEFAULT_ACTIVITY_MAXIMUM_INTERVAL_COEFFICIENT
2056
                        * Durations.toMillis(initialInterval))
1✔
2057
                : originalPolicy.getMaximumInterval())
1✔
2058
        .setBackoffCoefficient(
1✔
2059
            originalPolicy.getBackoffCoefficient() == 0
1✔
2060
                ? DEFAULT_ACTIVITY_RETRY_BACKOFF_COEFFICIENT
1✔
2061
                : originalPolicy.getBackoffCoefficient())
1✔
2062
        .setMaximumAttempts(
1✔
2063
            originalPolicy.getMaximumAttempts() == 0
1✔
2064
                ? DEFAULT_ACTIVITY_RETRY_MAXIMUM_ATTEMPTS
1✔
2065
                : originalPolicy.getMaximumAttempts())
1✔
2066
        .build();
1✔
2067
  }
2068
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc