• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #201

16 Oct 2023 03:47PM UTC coverage: 77.389% (+0.02%) from 77.368%
#201

push

github-actions

web-flow
Apply data converter context in more places (#1896)

Add data converter context to memo, lastFailure and schedules

23 of 23 new or added lines in 5 files covered. (100.0%)

18718 of 24187 relevant lines covered (77.39%)

0.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.9
/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.testservice;
22

23
import static io.temporal.internal.testservice.StateMachines.Action.CANCEL;
24
import static io.temporal.internal.testservice.StateMachines.Action.COMPLETE;
25
import static io.temporal.internal.testservice.StateMachines.Action.CONTINUE_AS_NEW;
26
import static io.temporal.internal.testservice.StateMachines.Action.FAIL;
27
import static io.temporal.internal.testservice.StateMachines.Action.INITIATE;
28
import static io.temporal.internal.testservice.StateMachines.Action.QUERY;
29
import static io.temporal.internal.testservice.StateMachines.Action.REQUEST_CANCELLATION;
30
import static io.temporal.internal.testservice.StateMachines.Action.START;
31
import static io.temporal.internal.testservice.StateMachines.Action.TERMINATE;
32
import static io.temporal.internal.testservice.StateMachines.Action.TIME_OUT;
33
import static io.temporal.internal.testservice.StateMachines.Action.UPDATE;
34
import static io.temporal.internal.testservice.StateMachines.Action.UPDATE_WORKFLOW_EXECUTION;
35
import static io.temporal.internal.testservice.StateMachines.State.CANCELED;
36
import static io.temporal.internal.testservice.StateMachines.State.CANCELLATION_REQUESTED;
37
import static io.temporal.internal.testservice.StateMachines.State.COMPLETED;
38
import static io.temporal.internal.testservice.StateMachines.State.CONTINUED_AS_NEW;
39
import static io.temporal.internal.testservice.StateMachines.State.FAILED;
40
import static io.temporal.internal.testservice.StateMachines.State.INITIATED;
41
import static io.temporal.internal.testservice.StateMachines.State.NONE;
42
import static io.temporal.internal.testservice.StateMachines.State.STARTED;
43
import static io.temporal.internal.testservice.StateMachines.State.TERMINATED;
44
import static io.temporal.internal.testservice.StateMachines.State.TIMED_OUT;
45

46
import com.google.common.base.Preconditions;
47
import com.google.protobuf.Any;
48
import com.google.protobuf.Duration;
49
import com.google.protobuf.InvalidProtocolBufferException;
50
import com.google.protobuf.Timestamp;
51
import com.google.protobuf.util.Durations;
52
import com.google.protobuf.util.Timestamps;
53
import io.grpc.Status;
54
import io.grpc.StatusRuntimeException;
55
import io.temporal.api.command.v1.CancelTimerCommandAttributes;
56
import io.temporal.api.command.v1.CancelWorkflowExecutionCommandAttributes;
57
import io.temporal.api.command.v1.CompleteWorkflowExecutionCommandAttributes;
58
import io.temporal.api.command.v1.ContinueAsNewWorkflowExecutionCommandAttributes;
59
import io.temporal.api.command.v1.FailWorkflowExecutionCommandAttributes;
60
import io.temporal.api.command.v1.RequestCancelActivityTaskCommandAttributes;
61
import io.temporal.api.command.v1.RequestCancelExternalWorkflowExecutionCommandAttributes;
62
import io.temporal.api.command.v1.ScheduleActivityTaskCommandAttributes;
63
import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes;
64
import io.temporal.api.command.v1.StartChildWorkflowExecutionCommandAttributes;
65
import io.temporal.api.command.v1.StartTimerCommandAttributes;
66
import io.temporal.api.common.v1.Payloads;
67
import io.temporal.api.common.v1.RetryPolicy;
68
import io.temporal.api.common.v1.WorkflowExecution;
69
import io.temporal.api.enums.v1.CancelExternalWorkflowExecutionFailedCause;
70
import io.temporal.api.enums.v1.EventType;
71
import io.temporal.api.enums.v1.RetryState;
72
import io.temporal.api.enums.v1.SignalExternalWorkflowExecutionFailedCause;
73
import io.temporal.api.enums.v1.StartChildWorkflowExecutionFailedCause;
74
import io.temporal.api.enums.v1.TimeoutType;
75
import io.temporal.api.errordetails.v1.QueryFailedFailure;
76
import io.temporal.api.failure.v1.ApplicationFailureInfo;
77
import io.temporal.api.failure.v1.Failure;
78
import io.temporal.api.failure.v1.TimeoutFailureInfo;
79
import io.temporal.api.history.v1.*;
80
import io.temporal.api.protocol.v1.Message;
81
import io.temporal.api.query.v1.WorkflowQueryResult;
82
import io.temporal.api.taskqueue.v1.StickyExecutionAttributes;
83
import io.temporal.api.taskqueue.v1.TaskQueue;
84
import io.temporal.api.update.v1.*;
85
import io.temporal.api.workflowservice.v1.*;
86
import io.temporal.internal.common.ProtobufTimeUtils;
87
import io.temporal.internal.testservice.TestWorkflowMutableStateImpl.UpdateWorkflowExecution;
88
import io.temporal.internal.testservice.TestWorkflowStore.ActivityTask;
89
import io.temporal.internal.testservice.TestWorkflowStore.TaskQueueId;
90
import io.temporal.internal.testservice.TestWorkflowStore.WorkflowTask;
91
import io.temporal.serviceclient.StatusUtils;
92
import io.temporal.workflow.Functions;
93
import java.util.*;
94
import java.util.concurrent.CompletableFuture;
95
import java.util.concurrent.ForkJoinPool;
96
import javax.annotation.Nonnull;
97
import org.slf4j.Logger;
98
import org.slf4j.LoggerFactory;
99

100
class StateMachines {
×
101

102
  private static final Logger log = LoggerFactory.getLogger(StateMachines.class);
1✔
103

104
  public static final long DEFAULT_WORKFLOW_EXECUTION_TIMEOUT_MILLISECONDS =
105
      10L * 365 * 24 * 3600 * 1000;
106
  public static final long DEFAULT_WORKFLOW_TASK_TIMEOUT_MILLISECONDS = 10L * 1000;
107
  public static final long MAX_WORKFLOW_TASK_TIMEOUT_MILLISECONDS = 60L * 1000;
108
  static final Duration DEFAULT_ACTIVITY_RETRY_INITIAL_INTERVAL = Durations.fromSeconds(1);
1✔
109
  static final double DEFAULT_ACTIVITY_RETRY_BACKOFF_COEFFICIENT = 2.0;
110
  static final int DEFAULT_ACTIVITY_RETRY_MAXIMUM_ATTEMPTS = 0;
111
  static final int DEFAULT_ACTIVITY_MAXIMUM_INTERVAL_COEFFICIENT = 100;
112
  static final int NO_EVENT_ID = -1;
113

114
  /** States used by the state machines that the factory methods of this class build. */
  enum State {
    NONE,
    INITIATED,
    STARTED,
    FAILED,
    TIMED_OUT,
    CANCELLATION_REQUESTED,
    CANCELED,
    COMPLETED,
    CONTINUED_AS_NEW,
    TERMINATED
  }
126

127
  /** Actions that trigger transitions in the state machines built by this class. */
  enum Action {
    INITIATE,
    START,
    FAIL,
    TIME_OUT,
    REQUEST_CANCELLATION,
    CANCEL,
    TERMINATE,
    UPDATE,
    COMPLETE,
    CONTINUE_AS_NEW,
    QUERY,
    UPDATE_WORKFLOW_EXECUTION
  }
141

142
  static final class WorkflowData {
143
    Optional<TestServiceRetryState> retryState;
144
    Duration backoffStartInterval;
145
    String cronSchedule;
146
    Payloads lastCompletionResult;
147
    Optional<Failure> lastFailure;
148
    /**
149
     * @see WorkflowExecutionStartedEventAttributes#getFirstExecutionRunId()
150
     */
151
    final @Nonnull String firstExecutionRunId;
152
    /**
153
     * @see WorkflowExecutionStartedEventAttributes#getOriginalExecutionRunId()
154
     */
155
    final @Nonnull String originalExecutionRunId;
156

157
    /** RunId that was continued by this run as a result of Retry or Continue-As-New. */
158
    Optional<String> continuedExecutionRunId;
159

160
    Functions.Proc runTimerCancellationHandle;
161

162
    WorkflowData(
163
        Optional<TestServiceRetryState> retryState,
164
        Duration backoffStartInterval,
165
        String cronSchedule,
166
        Payloads lastCompletionResult,
167
        Optional<Failure> lastFailure,
168
        @Nonnull String firstExecutionRunId,
169
        @Nonnull String originalExecutionRunId,
170
        Optional<String> continuedExecutionRunId) {
1✔
171
      this.retryState = retryState;
1✔
172
      this.backoffStartInterval = backoffStartInterval;
1✔
173
      this.cronSchedule = cronSchedule;
1✔
174
      this.lastCompletionResult = lastCompletionResult;
1✔
175
      this.firstExecutionRunId =
1✔
176
          Preconditions.checkNotNull(firstExecutionRunId, "firstExecutionRunId");
1✔
177
      this.originalExecutionRunId =
1✔
178
          Preconditions.checkNotNull(originalExecutionRunId, "originalExecutionRunId");
1✔
179
      this.continuedExecutionRunId = continuedExecutionRunId;
1✔
180
      this.lastFailure = Objects.requireNonNull(lastFailure);
1✔
181
    }
1✔
182

183
    @Override
184
    public String toString() {
185
      return "WorkflowData{"
×
186
          + "retryState="
187
          + retryState
188
          + ", backoffStartInterval="
189
          + backoffStartInterval
190
          + ", cronSchedule='"
191
          + cronSchedule
192
          + '\''
193
          + ", lastCompletionResult="
194
          + lastCompletionResult
195
          + ", firstExecutionRunId='"
196
          + firstExecutionRunId
197
          + '\''
198
          + ", originalExecutionRunId='"
199
          + originalExecutionRunId
200
          + '\''
201
          + ", continuedExecutionRunId="
202
          + continuedExecutionRunId
203
          + '}';
204
    }
205
  }
206

207
  static final class WorkflowTaskData {
208

209
    final TestWorkflowStore store;
210

211
    boolean workflowCompleted;
212

213
    /** id of the last started event which completed successfully */
214
    long lastSuccessfulStartedEventId;
215

216
    final StartWorkflowExecutionRequest startRequest;
217

218
    long startedEventId = NO_EVENT_ID;
1✔
219

220
    PollWorkflowTaskQueueResponse.Builder workflowTask;
221

222
    /**
223
     * Events that are added during execution of a workflow task. They have to be buffered to be
224
     * added after the events generated by a workflow task. Without this the determinism will be
225
     * broken on replay.
226
     */
227
    final List<RequestContext> bufferedEvents = new ArrayList<>();
1✔
228

229
    /**
230
     * Update requests that are added during execution of a workflow task. They have to be buffered
231
     * to be added to the next workflow task.
232
     */
233
    final Map<String, UpdateWorkflowExecution> updateRequestBuffer = new LinkedHashMap<>();
1✔
234

235
    final Map<String, UpdateWorkflowExecution> updateRequest = new LinkedHashMap<>();
1✔
236

237
    long scheduledEventId = NO_EVENT_ID;
1✔
238

239
    int attempt = 0;
1✔
240

241
    /** Query requests received during workflow task processing (after start) */
242
    final Map<String, TestWorkflowMutableStateImpl.ConsistentQuery> queryBuffer = new HashMap<>();
1✔
243

244
    final Map<String, TestWorkflowMutableStateImpl.ConsistentQuery> consistentQueryRequests =
1✔
245
        new HashMap<>();
246

247
    WorkflowTaskData(TestWorkflowStore store, StartWorkflowExecutionRequest startRequest) {
1✔
248
      this.store = store;
1✔
249
      this.startRequest = startRequest;
1✔
250
    }
1✔
251

252
    void clear() {
253
      startedEventId = NO_EVENT_ID;
1✔
254
      workflowTask = null;
1✔
255
      scheduledEventId = NO_EVENT_ID;
1✔
256
      attempt = 0;
1✔
257
    }
1✔
258

259
    Optional<UpdateWorkflowExecution> getUpdateRequest(String protocolInstanceId) {
260
      return Optional.ofNullable(
1✔
261
          updateRequest.getOrDefault(
1✔
262
              protocolInstanceId, updateRequestBuffer.get(protocolInstanceId)));
1✔
263
    }
264

265
    @Override
266
    public String toString() {
267
      return "WorkflowTaskData{"
×
268
          + "store="
269
          + store
270
          + ", workflowCompleted="
271
          + workflowCompleted
272
          + ", lastSuccessfulStartedEventId="
273
          + lastSuccessfulStartedEventId
274
          + ", startRequest="
275
          + startRequest
276
          + ", startedEventId="
277
          + startedEventId
278
          + ", workflowTask="
279
          + workflowTask
280
          + ", bufferedEvents="
281
          + bufferedEvents
282
          + ", scheduledEventId="
283
          + scheduledEventId
284
          + ", attempt="
285
          + attempt
286
          + ", queryBuffer="
287
          + queryBuffer
288
          + ", consistentQueryRequests="
289
          + consistentQueryRequests
290
          + ", updateRequest="
291
          + updateRequest
292
          + ", updateRequestBuffer="
293
          + updateRequestBuffer
294
          + '}';
295
    }
296
  }
297

298
  static final class ActivityTaskData {
299

300
    StartWorkflowExecutionRequest startWorkflowExecutionRequest;
301
    ActivityTaskScheduledEventAttributes scheduledEvent;
302
    ActivityTask activityTask;
303

304
    final TestWorkflowStore store;
305

306
    long scheduledEventId = NO_EVENT_ID;
1✔
307
    long startedEventId = NO_EVENT_ID;
1✔
308
    public HistoryEvent startedEvent;
309
    Payloads heartbeatDetails;
310
    long lastHeartbeatTime;
311
    TestServiceRetryState retryState;
312
    Duration nextBackoffInterval;
313
    String identity;
314

315
    ActivityTaskData(
316
        TestWorkflowStore store, StartWorkflowExecutionRequest startWorkflowExecutionRequest) {
1✔
317
      this.store = store;
1✔
318
      this.startWorkflowExecutionRequest = startWorkflowExecutionRequest;
1✔
319
    }
1✔
320

321
    @Override
322
    public String toString() {
323
      return "ActivityTaskData{"
×
324
          + "startWorkflowExecutionRequest="
325
          + startWorkflowExecutionRequest
326
          + ", scheduledEvent="
327
          + scheduledEvent
328
          + ", activityTask="
329
          + activityTask
330
          + ", store="
331
          + store
332
          + ", scheduledEventId="
333
          + scheduledEventId
334
          + ", startedEventId="
335
          + startedEventId
336
          + ", startedEvent="
337
          + startedEvent
338
          + ", heartbeatDetails="
339
          + heartbeatDetails
340
          + ", lastHeartbeatTime="
341
          + lastHeartbeatTime
342
          + ", retryState="
343
          + retryState
344
          + ", nextBackoffInterval="
345
          + nextBackoffInterval
346
          + '}';
347
    }
348

349
    public int getAttempt() {
350
      return retryState != null ? retryState.getAttempt() : 1;
1✔
351
    }
352
  }
353

354
  static final class SignalExternalData {
1✔
355
    long initiatedEventId = NO_EVENT_ID;
1✔
356
    public SignalExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent;
357

358
    @Override
359
    public String toString() {
360
      return "SignalExternalData{"
×
361
          + "initiatedEventId="
362
          + initiatedEventId
363
          + ", initiatedEvent="
364
          + initiatedEvent
365
          + '}';
366
    }
367
  }
368

369
  static final class CancelExternalData {
1✔
370
    long initiatedEventId = NO_EVENT_ID;
1✔
371
    public RequestCancelExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent;
372

373
    @Override
374
    public String toString() {
375
      return "CancelExternalData{"
×
376
          + "initiatedEventId="
377
          + initiatedEventId
378
          + ", initiatedEvent="
379
          + initiatedEvent
380
          + '}';
381
    }
382
  }
383

384
  static final class ChildWorkflowData {
385

386
    final TestWorkflowService service;
387
    StartChildWorkflowExecutionInitiatedEventAttributes initiatedEvent;
388
    long initiatedEventId;
389
    long startedEventId;
390
    WorkflowExecution execution;
391

392
    public ChildWorkflowData(TestWorkflowService service) {
1✔
393
      this.service = service;
1✔
394
    }
1✔
395

396
    @Override
397
    public String toString() {
398
      return "ChildWorkflowData{"
×
399
          + "service="
400
          + service
401
          + ", initiatedEvent="
402
          + initiatedEvent
403
          + ", initiatedEventId="
404
          + initiatedEventId
405
          + ", startedEventId="
406
          + startedEventId
407
          + ", execution="
408
          + execution
409
          + '}';
410
    }
411
  }
412

413
  static final class TimerData {
1✔
414
    TimerStartedEventAttributes startedEvent;
415
    public long startedEventId;
416

417
    @Override
418
    public String toString() {
419
      return "TimerData{"
×
420
          + "startedEvent="
421
          + startedEvent
422
          + ", startedEventId="
423
          + startedEventId
424
          + '}';
425
    }
426
  }
427

428
  /** Represents an accepted update workflow execution request */
429
  static final class UpdateWorkflowExecutionData {
430
    final String id;
431
    final CompletableFuture<UpdateWorkflowExecutionResponse> acceptance;
432
    final CompletableFuture<UpdateWorkflowExecutionResponse> complete;
433
    final Request initialRequest;
434

435
    public UpdateWorkflowExecutionData(
436
        String id,
437
        Request initialRequest,
438
        CompletableFuture<UpdateWorkflowExecutionResponse> acceptance,
439
        CompletableFuture<UpdateWorkflowExecutionResponse> complete) {
1✔
440
      this.id = id;
1✔
441
      this.initialRequest = initialRequest;
1✔
442
      this.acceptance = acceptance;
1✔
443
      this.complete = complete;
1✔
444
    }
1✔
445

446
    @Override
447
    public String toString() {
448
      return "UpdateWorkflowExecutionData{" + "ID=" + id + '}';
×
449
    }
450
  }
451

452
  static StateMachine<WorkflowData> newWorkflowStateMachine(WorkflowData data) {
453
    return new StateMachine<>(data)
1✔
454
        .add(NONE, START, STARTED, StateMachines::startWorkflow)
1✔
455
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::completeWorkflow)
1✔
456
        .add(STARTED, CONTINUE_AS_NEW, CONTINUED_AS_NEW, StateMachines::continueAsNewWorkflow)
1✔
457
        .add(STARTED, FAIL, FAILED, StateMachines::failWorkflow)
1✔
458
        .add(STARTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutWorkflow)
1✔
459
        .add(
1✔
460
            STARTED,
461
            REQUEST_CANCELLATION,
462
            CANCELLATION_REQUESTED,
463
            StateMachines::requestWorkflowCancellation)
464
        .add(STARTED, TERMINATE, TERMINATED, StateMachines::terminateWorkflow)
1✔
465
        .add(
1✔
466
            CANCELLATION_REQUESTED,
467
            REQUEST_CANCELLATION,
468
            CANCELLATION_REQUESTED,
469
            StateMachines::noop)
470
        .add(CANCELLATION_REQUESTED, COMPLETE, COMPLETED, StateMachines::completeWorkflow)
1✔
471
        .add(CANCELLATION_REQUESTED, CANCEL, CANCELED, StateMachines::cancelWorkflow)
1✔
472
        .add(CANCELLATION_REQUESTED, TERMINATE, TERMINATED, StateMachines::terminateWorkflow)
1✔
473
        .add(CANCELLATION_REQUESTED, FAIL, FAILED, StateMachines::failWorkflow)
1✔
474
        .add(CANCELLATION_REQUESTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutWorkflow);
1✔
475
  }
476

477
  static StateMachine<WorkflowTaskData> newWorkflowTaskStateMachine(
478
      TestWorkflowStore store, StartWorkflowExecutionRequest startRequest) {
479
    return new StateMachine<>(new WorkflowTaskData(store, startRequest))
1✔
480
        .add(NONE, INITIATE, INITIATED, StateMachines::scheduleWorkflowTask)
1✔
481
        // TODO(maxim): Uncomment once the server supports consistent query only workflow tasks
482
        //        .add(NONE, QUERY, INITIATED_QUERY_ONLY, StateMachines::scheduleQueryWorkflowTask)
483
        //        .add(INITIATED_QUERY_ONLY, QUERY, INITIATED_QUERY_ONLY,
484
        // StateMachines::queryWhileScheduled)
485
        //        .add(
486
        //            INITIATED_QUERY_ONLY,
487
        //            INITIATE,
488
        //            INITIATED,
489
        //            StateMachines::convertQueryWorkflowTaskToReal)
490
        //        .add(
491
        //            INITIATED_QUERY_ONLY,
492
        //            START,
493
        //            STARTED_QUERY_ONLY,
494
        //            StateMachines::startQueryOnlyWorkflowTask)
495
        //        .add(STARTED_QUERY_ONLY, INITIATE, STARTED_QUERY_ONLY,
496
        // StateMachines::needsWorkflowTask)
497
        //        .add(STARTED_QUERY_ONLY, QUERY, STARTED_QUERY_ONLY,
498
        // StateMachines::needsWorkflowTaskDueToQuery)
499
        //        .add(STARTED_QUERY_ONLY, FAIL, NONE, StateMachines::failQueryWorkflowTask)
500
        //        .add(STARTED_QUERY_ONLY, TIME_OUT, NONE, StateMachines::failQueryWorkflowTask)
501
        //        .add(STARTED_QUERY_ONLY, COMPLETE, NONE, StateMachines::completeQuery)
502
        .add(STARTED, QUERY, STARTED, StateMachines::bufferQuery)
1✔
503
        .add(STARTED, UPDATE_WORKFLOW_EXECUTION, STARTED, StateMachines::bufferUpdate)
1✔
504
        .add(INITIATED, INITIATE, INITIATED, StateMachines::noop)
1✔
505
        .add(INITIATED, QUERY, INITIATED, StateMachines::queryWhileScheduled)
1✔
506
        .add(INITIATED, UPDATE_WORKFLOW_EXECUTION, INITIATED, StateMachines::addUpdate)
1✔
507
        .add(INITIATED, START, STARTED, StateMachines::startWorkflowTask)
1✔
508
        .add(STARTED, COMPLETE, NONE, StateMachines::completeWorkflowTask)
1✔
509
        .add(STARTED, FAIL, NONE, StateMachines::failWorkflowTask)
1✔
510
        .add(STARTED, TIME_OUT, NONE, StateMachines::timeoutWorkflowTask)
1✔
511
        .add(STARTED, INITIATE, STARTED, StateMachines::needsWorkflowTask);
1✔
512
  }
513

514
  public static StateMachine<ActivityTaskData> newActivityStateMachine(
515
      TestWorkflowStore store, StartWorkflowExecutionRequest workflowStartedEvent) {
516
    return new StateMachine<>(new ActivityTaskData(store, workflowStartedEvent))
1✔
517
        .add(NONE, INITIATE, INITIATED, StateMachines::scheduleActivityTask)
1✔
518
        .add(INITIATED, START, STARTED, StateMachines::startActivityTask)
1✔
519
        .add(INITIATED, TIME_OUT, TIMED_OUT, StateMachines::timeoutActivityTask)
1✔
520
        .add(
1✔
521
            INITIATED,
522
            REQUEST_CANCELLATION,
523
            CANCELLATION_REQUESTED,
524
            StateMachines::requestActivityCancellation)
525
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::completeActivityTask)
1✔
526
        // Transitions to initiated in case of a retry
527
        .add(STARTED, FAIL, new State[] {FAILED, INITIATED}, StateMachines::failActivityTask)
1✔
528
        // Transitions to initiated in case of a retry
529
        .add(
1✔
530
            STARTED,
531
            TIME_OUT,
532
            new State[] {TIMED_OUT, INITIATED},
533
            StateMachines::timeoutActivityTask)
534
        .add(STARTED, UPDATE, STARTED, StateMachines::heartbeatActivityTask)
1✔
535
        .add(
1✔
536
            STARTED,
537
            REQUEST_CANCELLATION,
538
            CANCELLATION_REQUESTED,
539
            StateMachines::requestActivityCancellation)
540
        .add(
1✔
541
            CANCELLATION_REQUESTED, CANCEL, CANCELED, StateMachines::reportActivityTaskCancellation)
542
        .add(CANCELLATION_REQUESTED, COMPLETE, COMPLETED, StateMachines::completeActivityTask)
1✔
543
        .add(
1✔
544
            CANCELLATION_REQUESTED,
545
            UPDATE,
546
            CANCELLATION_REQUESTED,
547
            StateMachines::heartbeatActivityTask)
548
        .add(CANCELLATION_REQUESTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutActivityTask)
1✔
549
        .add(CANCELLATION_REQUESTED, FAIL, FAILED, StateMachines::failActivityTask);
1✔
550
  }
551

552
  public static StateMachine<ChildWorkflowData> newChildWorkflowStateMachine(
553
      TestWorkflowService service) {
554
    return new StateMachine<>(new ChildWorkflowData(service))
1✔
555
        .add(NONE, INITIATE, INITIATED, StateMachines::initiateChildWorkflow)
1✔
556
        .add(INITIATED, START, STARTED, StateMachines::childWorkflowStarted)
1✔
557
        .add(INITIATED, FAIL, FAILED, StateMachines::startChildWorkflowFailed)
1✔
558
        .add(INITIATED, TIME_OUT, TIMED_OUT, StateMachines::timeoutChildWorkflow)
1✔
559
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::childWorkflowCompleted)
1✔
560
        .add(STARTED, FAIL, FAILED, StateMachines::childWorkflowFailed)
1✔
561
        .add(STARTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutChildWorkflow)
1✔
562
        .add(STARTED, CANCEL, CANCELED, StateMachines::childWorkflowCanceled);
1✔
563
  }
564

565
  public static StateMachine<UpdateWorkflowExecutionData> newUpdateWorkflowExecution(
566
      String updateId,
567
      Request initialRequest,
568
      CompletableFuture<UpdateWorkflowExecutionResponse> acceptance,
569
      CompletableFuture<UpdateWorkflowExecutionResponse> complete) {
570
    return new StateMachine<>(
1✔
571
            new UpdateWorkflowExecutionData(updateId, initialRequest, acceptance, complete))
572
        .add(NONE, START, STARTED, StateMachines::acceptUpdate)
1✔
573
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::completeUpdate);
1✔
574
  }
575

576
  public static StateMachine<TimerData> newTimerStateMachine() {
577
    return new StateMachine<>(new TimerData())
1✔
578
        .add(NONE, START, STARTED, StateMachines::startTimer)
1✔
579
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::fireTimer)
1✔
580
        .add(STARTED, CANCEL, CANCELED, StateMachines::cancelTimer);
1✔
581
  }
582

583
  public static StateMachine<SignalExternalData> newSignalExternalStateMachine() {
584
    return new StateMachine<>(new SignalExternalData())
1✔
585
        .add(NONE, INITIATE, INITIATED, StateMachines::initiateExternalSignal)
1✔
586
        .add(INITIATED, FAIL, FAILED, StateMachines::failExternalSignal)
1✔
587
        .add(INITIATED, COMPLETE, COMPLETED, StateMachines::completeExternalSignal);
1✔
588
  }
589

590
  public static StateMachine<CancelExternalData> newCancelExternalStateMachine() {
591
    return new StateMachine<>(new CancelExternalData())
1✔
592
        .add(NONE, INITIATE, INITIATED, StateMachines::initiateExternalCancellation)
1✔
593
        .add(INITIATED, FAIL, FAILED, StateMachines::failExternalCancellation)
1✔
594
        .add(INITIATED, START, STARTED, StateMachines::reportExternalCancellationRequested);
1✔
595
  }
596

597
  /** Transition callback that does nothing; used where a transition needs no side effect. */
  private static <T, A> void noop(RequestContext ctx, T data, A a, long notUsed) {
    // Intentionally empty.
  }
1✔
598

599
  private static void timeoutChildWorkflow(
600
      RequestContext ctx, ChildWorkflowData data, RetryState retryState, long notUsed) {
601
    StartChildWorkflowExecutionInitiatedEventAttributes ie = data.initiatedEvent;
1✔
602
    ChildWorkflowExecutionTimedOutEventAttributes a =
603
        ChildWorkflowExecutionTimedOutEventAttributes.newBuilder()
1✔
604
            .setNamespace(ie.getNamespace())
1✔
605
            .setStartedEventId(data.startedEventId)
1✔
606
            .setWorkflowExecution(data.execution)
1✔
607
            .setWorkflowType(ie.getWorkflowType())
1✔
608
            .setRetryState(retryState)
1✔
609
            .setInitiatedEventId(data.initiatedEventId)
1✔
610
            .build();
1✔
611
    HistoryEvent event =
612
        HistoryEvent.newBuilder()
1✔
613
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TIMED_OUT)
1✔
614
            .setChildWorkflowExecutionTimedOutEventAttributes(a)
1✔
615
            .build();
1✔
616
    ctx.addEvent(event);
1✔
617
  }
1✔
618

619
  private static void startChildWorkflowFailed(
620
      RequestContext ctx,
621
      ChildWorkflowData data,
622
      StartChildWorkflowExecutionFailedEventAttributes a,
623
      long notUsed) {
624
    StartChildWorkflowExecutionFailedEventAttributes.Builder updatedAttr =
1✔
625
        a.toBuilder()
1✔
626
            .setInitiatedEventId(data.initiatedEventId)
1✔
627
            .setWorkflowType(data.initiatedEvent.getWorkflowType())
1✔
628
            .setWorkflowId(data.initiatedEvent.getWorkflowId());
1✔
629
    if (!data.initiatedEvent.getNamespace().isEmpty()) {
1✔
630
      updatedAttr.setNamespace(data.initiatedEvent.getNamespace());
1✔
631
    }
632
    HistoryEvent event =
633
        HistoryEvent.newBuilder()
1✔
634
            .setEventType(EventType.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_FAILED)
1✔
635
            .setStartChildWorkflowExecutionFailedEventAttributes(updatedAttr.build())
1✔
636
            .build();
1✔
637
    ctx.addEvent(event);
1✔
638
  }
1✔
639

640
  private static void childWorkflowStarted(
641
      RequestContext ctx,
642
      ChildWorkflowData data,
643
      ChildWorkflowExecutionStartedEventAttributes a,
644
      long notUsed) {
645
    ChildWorkflowExecutionStartedEventAttributes updatedAttr =
1✔
646
        a.toBuilder().setInitiatedEventId(data.initiatedEventId).build();
1✔
647
    HistoryEvent event =
648
        HistoryEvent.newBuilder()
1✔
649
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_STARTED)
1✔
650
            .setChildWorkflowExecutionStartedEventAttributes(updatedAttr)
1✔
651
            .build();
1✔
652
    long startedEventId = ctx.addEvent(event);
1✔
653
    ctx.onCommit(
1✔
654
        (historySize) -> {
655
          data.startedEventId = startedEventId;
1✔
656
          data.execution = updatedAttr.getWorkflowExecution();
1✔
657
        });
1✔
658
  }
1✔
659

660
  private static void childWorkflowCompleted(
661
      RequestContext ctx,
662
      ChildWorkflowData data,
663
      ChildWorkflowExecutionCompletedEventAttributes a,
664
      long notUsed) {
665
    ChildWorkflowExecutionCompletedEventAttributes updatedAttr =
1✔
666
        a.toBuilder()
1✔
667
            .setInitiatedEventId(data.initiatedEventId)
1✔
668
            .setStartedEventId(data.startedEventId)
1✔
669
            .build();
1✔
670
    HistoryEvent event =
671
        HistoryEvent.newBuilder()
1✔
672
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_COMPLETED)
1✔
673
            .setChildWorkflowExecutionCompletedEventAttributes(updatedAttr)
1✔
674
            .build();
1✔
675
    ctx.addEvent(event);
1✔
676
  }
1✔
677

678
  private static void childWorkflowFailed(
679
      RequestContext ctx,
680
      ChildWorkflowData data,
681
      ChildWorkflowExecutionFailedEventAttributes a,
682
      long notUsed) {
683
    ChildWorkflowExecutionFailedEventAttributes.Builder updatedAttr =
1✔
684
        a.toBuilder()
1✔
685
            .setInitiatedEventId(data.initiatedEventId)
1✔
686
            .setStartedEventId(data.startedEventId)
1✔
687
            .setWorkflowExecution(data.execution)
1✔
688
            .setWorkflowType(data.initiatedEvent.getWorkflowType());
1✔
689
    if (!data.initiatedEvent.getNamespace().isEmpty()) {
1✔
690
      updatedAttr.setNamespace(data.initiatedEvent.getNamespace());
1✔
691
    }
692
    HistoryEvent event =
693
        HistoryEvent.newBuilder()
1✔
694
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_FAILED)
1✔
695
            .setChildWorkflowExecutionFailedEventAttributes(updatedAttr.build())
1✔
696
            .build();
1✔
697
    ctx.addEvent(event);
1✔
698
  }
1✔
699

700
  private static void childWorkflowCanceled(
701
      RequestContext ctx,
702
      ChildWorkflowData data,
703
      ChildWorkflowExecutionCanceledEventAttributes a,
704
      long notUsed) {
705
    ChildWorkflowExecutionCanceledEventAttributes updatedAttr =
1✔
706
        a.toBuilder()
1✔
707
            .setInitiatedEventId(data.initiatedEventId)
1✔
708
            .setStartedEventId(data.startedEventId)
1✔
709
            .build();
1✔
710
    HistoryEvent event =
711
        HistoryEvent.newBuilder()
1✔
712
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_CANCELED)
1✔
713
            .setChildWorkflowExecutionCanceledEventAttributes(updatedAttr)
1✔
714
            .build();
1✔
715
    ctx.addEvent(event);
1✔
716
  }
1✔
717

718
  private static void initiateChildWorkflow(
719
      RequestContext ctx,
720
      ChildWorkflowData data,
721
      StartChildWorkflowExecutionCommandAttributes d,
722
      long workflowTaskCompletedEventId) {
723
    StartChildWorkflowExecutionInitiatedEventAttributes.Builder a =
724
        StartChildWorkflowExecutionInitiatedEventAttributes.newBuilder()
1✔
725
            .setControl(d.getControl())
1✔
726
            .setInput(d.getInput())
1✔
727
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1✔
728
            .setNamespace(d.getNamespace().isEmpty() ? ctx.getNamespace() : d.getNamespace())
1✔
729
            .setWorkflowExecutionTimeout(d.getWorkflowExecutionTimeout())
1✔
730
            .setWorkflowRunTimeout(d.getWorkflowRunTimeout())
1✔
731
            .setWorkflowTaskTimeout(d.getWorkflowTaskTimeout())
1✔
732
            .setTaskQueue(d.getTaskQueue())
1✔
733
            .setWorkflowId(d.getWorkflowId())
1✔
734
            .setWorkflowIdReusePolicy(d.getWorkflowIdReusePolicy())
1✔
735
            .setWorkflowType(d.getWorkflowType())
1✔
736
            .setCronSchedule(d.getCronSchedule())
1✔
737
            .setParentClosePolicy(d.getParentClosePolicy());
1✔
738
    if (d.hasHeader()) {
1✔
739
      a.setHeader(d.getHeader());
1✔
740
    }
741
    if (d.hasMemo()) {
1✔
742
      a.setMemo(d.getMemo());
1✔
743
    }
744
    if (d.hasRetryPolicy()) {
1✔
745
      a.setRetryPolicy(d.getRetryPolicy());
1✔
746
    }
747
    HistoryEvent event =
748
        HistoryEvent.newBuilder()
1✔
749
            .setEventType(EventType.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED)
1✔
750
            .setStartChildWorkflowExecutionInitiatedEventAttributes(a)
1✔
751
            .build();
1✔
752
    long initiatedEventId = ctx.addEvent(event);
1✔
753
    ctx.onCommit(
1✔
754
        (historySize) -> {
755
          data.initiatedEventId = initiatedEventId;
1✔
756
          data.initiatedEvent = a.build();
1✔
757
          StartWorkflowExecutionRequest.Builder startChild =
758
              StartWorkflowExecutionRequest.newBuilder()
1✔
759
                  .setRequestId(UUID.randomUUID().toString())
1✔
760
                  .setNamespace(d.getNamespace().isEmpty() ? ctx.getNamespace() : d.getNamespace())
1✔
761
                  .setWorkflowExecutionTimeout(d.getWorkflowExecutionTimeout())
1✔
762
                  .setWorkflowRunTimeout(d.getWorkflowRunTimeout())
1✔
763
                  .setWorkflowTaskTimeout(d.getWorkflowTaskTimeout())
1✔
764
                  .setTaskQueue(d.getTaskQueue())
1✔
765
                  .setWorkflowId(d.getWorkflowId())
1✔
766
                  .setWorkflowIdReusePolicy(d.getWorkflowIdReusePolicy())
1✔
767
                  .setWorkflowType(d.getWorkflowType())
1✔
768
                  .setCronSchedule(d.getCronSchedule());
1✔
769
          if (d.hasHeader()) {
1✔
770
            startChild.setHeader(d.getHeader());
1✔
771
          }
772
          if (d.hasSearchAttributes()) {
1✔
773
            startChild.setSearchAttributes(d.getSearchAttributes());
1✔
774
          }
775
          if (d.hasMemo()) {
1✔
776
            startChild.setMemo(d.getMemo());
1✔
777
          }
778
          if (d.hasRetryPolicy()) {
1✔
779
            startChild.setRetryPolicy(d.getRetryPolicy());
1✔
780
          }
781
          if (d.hasInput()) {
1✔
782
            startChild.setInput(d.getInput());
1✔
783
          }
784
          addStartChildTask(ctx, data, initiatedEventId, startChild.build());
1✔
785
        });
1✔
786
  }
1✔
787

788
  private static void addStartChildTask(
789
      RequestContext ctx,
790
      ChildWorkflowData data,
791
      long initiatedEventId,
792
      StartWorkflowExecutionRequest startChild) {
793
    ForkJoinPool.commonPool()
1✔
794
        .execute(
1✔
795
            () -> {
796
              try {
797
                data.service.startWorkflowExecutionImpl(
1✔
798
                    startChild,
799
                    java.time.Duration.ZERO,
800
                    Optional.of(ctx.getWorkflowMutableState()),
1✔
801
                    OptionalLong.of(data.initiatedEventId),
1✔
802
                    null);
803
              } catch (StatusRuntimeException e) {
1✔
804
                if (e.getStatus().getCode() == Status.Code.ALREADY_EXISTS) {
1✔
805
                  StartChildWorkflowExecutionFailedEventAttributes failRequest =
806
                      StartChildWorkflowExecutionFailedEventAttributes.newBuilder()
1✔
807
                          .setInitiatedEventId(initiatedEventId)
1✔
808
                          .setCause(
1✔
809
                              StartChildWorkflowExecutionFailedCause
810
                                  .START_CHILD_WORKFLOW_EXECUTION_FAILED_CAUSE_WORKFLOW_ALREADY_EXISTS)
811
                          .build();
1✔
812
                  try {
813
                    ctx.getWorkflowMutableState()
1✔
814
                        .failStartChildWorkflow(data.initiatedEvent.getWorkflowId(), failRequest);
1✔
815
                  } catch (Throwable ee) {
×
816
                    log.error("Unexpected failure inserting failStart for a child workflow", ee);
×
817
                  }
1✔
818
                } else {
1✔
819
                  log.error("Unexpected failure starting a child workflow", e);
×
820
                }
821
              } catch (Exception e) {
×
822
                log.error("Unexpected failure starting a child workflow", e);
×
823
              }
1✔
824
            });
1✔
825
  }
1✔
826

827
  private static void startWorkflow(
828
      RequestContext ctx, WorkflowData data, StartWorkflowExecutionRequest request, long notUsed) {
829
    if (Durations.compare(request.getWorkflowExecutionTimeout(), Durations.ZERO) < 0) {
1✔
830
      throw Status.INVALID_ARGUMENT
×
831
          .withDescription("negative workflowExecution timeout")
×
832
          .asRuntimeException();
×
833
    }
834
    if (Durations.compare(request.getWorkflowRunTimeout(), Durations.ZERO) < 0) {
1✔
835
      throw Status.INVALID_ARGUMENT
×
836
          .withDescription("negative workflowRun timeout")
×
837
          .asRuntimeException();
×
838
    }
839
    if (Durations.compare(request.getWorkflowTaskTimeout(), Durations.ZERO) < 0) {
1✔
840
      throw Status.INVALID_ARGUMENT
×
841
          .withDescription("negative workflowTaskTimeoutSeconds")
×
842
          .asRuntimeException();
×
843
    }
844
    if (request.hasWorkflowStartDelay() && !request.getCronSchedule().trim().isEmpty()) {
1✔
845
      throw Status.INVALID_ARGUMENT
×
846
          .withDescription("CronSchedule and WorkflowStartDelay may not be used together.")
×
847
          .asRuntimeException();
×
848
    }
849

850
    WorkflowExecutionStartedEventAttributes.Builder a =
851
        WorkflowExecutionStartedEventAttributes.newBuilder()
1✔
852
            .setWorkflowType(request.getWorkflowType())
1✔
853
            .setWorkflowRunTimeout(request.getWorkflowRunTimeout())
1✔
854
            .setWorkflowTaskTimeout(request.getWorkflowTaskTimeout())
1✔
855
            .setWorkflowExecutionTimeout(request.getWorkflowExecutionTimeout())
1✔
856
            .setIdentity(request.getIdentity())
1✔
857
            .setInput(request.getInput())
1✔
858
            .setTaskQueue(request.getTaskQueue())
1✔
859
            .setAttempt(1);
1✔
860
    data.retryState.ifPresent(
1✔
861
        testServiceRetryState -> a.setAttempt(testServiceRetryState.getAttempt()));
1✔
862
    a.setFirstExecutionRunId(data.firstExecutionRunId);
1✔
863
    a.setOriginalExecutionRunId(data.originalExecutionRunId);
1✔
864
    data.continuedExecutionRunId.ifPresent(a::setContinuedExecutionRunId);
1✔
865
    if (data.lastCompletionResult != null) {
1✔
866
      a.setLastCompletionResult(data.lastCompletionResult);
1✔
867
    }
868
    if (request.hasWorkflowStartDelay()) {
1✔
869
      a.setFirstWorkflowTaskBackoff(request.getWorkflowStartDelay());
1✔
870
    }
871
    data.lastFailure.ifPresent(a::setContinuedFailure);
1✔
872
    if (request.hasMemo()) {
1✔
873
      a.setMemo(request.getMemo());
1✔
874
    }
875
    if (request.hasSearchAttributes()) {
1✔
876
      a.setSearchAttributes((request.getSearchAttributes()));
1✔
877
    }
878
    if (request.hasHeader()) {
1✔
879
      a.setHeader(request.getHeader());
1✔
880
    }
881
    String cronSchedule = request.getCronSchedule();
1✔
882
    if (!cronSchedule.trim().isEmpty()) {
1✔
883
      try {
884
        CronUtils.parseCron(cronSchedule);
1✔
885
        a.setCronSchedule(cronSchedule);
1✔
886
      } catch (Exception e) {
×
887
        throw Status.INVALID_ARGUMENT
×
888
            .withDescription("Invalid cron expression \"" + cronSchedule + "\": " + e.getMessage())
×
889
            .withCause(e)
×
890
            .asRuntimeException();
×
891
      }
1✔
892
    }
893
    Optional<TestWorkflowMutableState> parent = ctx.getWorkflowMutableState().getParent();
1✔
894
    if (parent.isPresent()) {
1✔
895
      ExecutionId parentExecutionId = parent.get().getExecutionId();
1✔
896
      a.setParentWorkflowNamespace(parentExecutionId.getNamespace());
1✔
897
      a.setParentWorkflowExecution(parentExecutionId.getExecution());
1✔
898
    }
899
    HistoryEvent event =
900
        HistoryEvent.newBuilder()
1✔
901
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED)
1✔
902
            .setWorkflowExecutionStartedEventAttributes(a)
1✔
903
            .build();
1✔
904
    ctx.addEvent(event);
1✔
905
  }
1✔
906

907
  private static void completeWorkflow(
908
      RequestContext ctx,
909
      WorkflowData data,
910
      CompleteWorkflowExecutionCommandAttributes d,
911
      long workflowTaskCompletedEventId) {
912
    WorkflowExecutionCompletedEventAttributes.Builder a =
913
        WorkflowExecutionCompletedEventAttributes.newBuilder()
1✔
914
            .setResult(d.getResult())
1✔
915
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
916
    HistoryEvent event =
917
        HistoryEvent.newBuilder()
1✔
918
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED)
1✔
919
            .setWorkflowExecutionCompletedEventAttributes(a)
1✔
920
            .build();
1✔
921
    ctx.addEvent(event);
1✔
922
  }
1✔
923

924
  private static void continueAsNewWorkflow(
925
      RequestContext ctx,
926
      WorkflowData data,
927
      ContinueAsNewWorkflowExecutionCommandAttributes d,
928
      long workflowTaskCompletedEventId) {
929
    StartWorkflowExecutionRequest sr = ctx.getWorkflowMutableState().getStartRequest();
1✔
930
    WorkflowExecutionContinuedAsNewEventAttributes.Builder a =
931
        WorkflowExecutionContinuedAsNewEventAttributes.newBuilder();
1✔
932
    a.setInput(d.getInput());
1✔
933
    if (d.hasHeader()) {
1✔
934
      a.setHeader(d.getHeader());
1✔
935
    }
936
    if (Durations.compare(d.getWorkflowRunTimeout(), Durations.ZERO) > 0) {
1✔
937
      a.setWorkflowRunTimeout(d.getWorkflowRunTimeout());
1✔
938
    } else {
939
      a.setWorkflowRunTimeout(sr.getWorkflowRunTimeout());
1✔
940
    }
941
    if (d.hasTaskQueue()) {
1✔
942
      a.setTaskQueue(d.getTaskQueue());
1✔
943
    } else {
944
      a.setTaskQueue(sr.getTaskQueue());
1✔
945
    }
946
    if (d.hasWorkflowType()) {
1✔
947
      a.setWorkflowType(d.getWorkflowType());
1✔
948
    } else {
949
      a.setWorkflowType(sr.getWorkflowType());
1✔
950
    }
951
    if (Durations.compare(d.getWorkflowTaskTimeout(), Durations.ZERO) > 0) {
1✔
952
      a.setWorkflowTaskTimeout(d.getWorkflowTaskTimeout());
1✔
953
    } else {
954
      a.setWorkflowTaskTimeout(sr.getWorkflowTaskTimeout());
1✔
955
    }
956
    a.setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
957
    a.setBackoffStartInterval(d.getBackoffStartInterval());
1✔
958
    if (d.hasLastCompletionResult()) {
1✔
959
      a.setLastCompletionResult(d.getLastCompletionResult());
1✔
960
    }
961
    if (d.hasFailure()) {
1✔
962
      a.setFailure(d.getFailure());
1✔
963
    }
964
    a.setNewExecutionRunId(UUID.randomUUID().toString());
1✔
965
    HistoryEvent event =
966
        HistoryEvent.newBuilder()
1✔
967
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW)
1✔
968
            .setWorkflowExecutionContinuedAsNewEventAttributes(a)
1✔
969
            .build();
1✔
970
    ctx.addEvent(event);
1✔
971
  }
1✔
972

973
  private static void failWorkflow(
974
      RequestContext ctx,
975
      WorkflowData data,
976
      FailWorkflowExecutionCommandAttributes d,
977
      long workflowTaskCompletedEventId) {
978
    WorkflowExecutionFailedEventAttributes.Builder a =
979
        WorkflowExecutionFailedEventAttributes.newBuilder()
1✔
980
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
981
    if (d.hasFailure()) {
1✔
982
      a.setFailure(d.getFailure());
1✔
983
    }
984
    HistoryEvent event =
985
        HistoryEvent.newBuilder()
1✔
986
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_FAILED)
1✔
987
            .setWorkflowExecutionFailedEventAttributes(a)
1✔
988
            .build();
1✔
989
    ctx.addEvent(event);
1✔
990
  }
1✔
991

992
  private static void timeoutWorkflow(
993
      RequestContext ctx, WorkflowData data, RetryState retryState, long notUsed) {
994
    WorkflowExecutionTimedOutEventAttributes.Builder a =
995
        WorkflowExecutionTimedOutEventAttributes.newBuilder().setRetryState(retryState);
1✔
996
    HistoryEvent event =
997
        HistoryEvent.newBuilder()
1✔
998
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT)
1✔
999
            .setWorkflowExecutionTimedOutEventAttributes(a)
1✔
1000
            .build();
1✔
1001
    ctx.addEvent(event);
1✔
1002
  }
1✔
1003

1004
  private static void cancelWorkflow(
1005
      RequestContext ctx,
1006
      WorkflowData data,
1007
      CancelWorkflowExecutionCommandAttributes d,
1008
      long workflowTaskCompletedEventId) {
1009
    WorkflowExecutionCanceledEventAttributes.Builder a =
1010
        WorkflowExecutionCanceledEventAttributes.newBuilder()
1✔
1011
            .setDetails(d.getDetails())
1✔
1012
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
1013
    HistoryEvent event =
1014
        HistoryEvent.newBuilder()
1✔
1015
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED)
1✔
1016
            .setWorkflowExecutionCanceledEventAttributes(a)
1✔
1017
            .build();
1✔
1018
    ctx.addEvent(event);
1✔
1019
  }
1✔
1020

1021
  private static void terminateWorkflow(
1022
      RequestContext ctx,
1023
      WorkflowData data,
1024
      TerminateWorkflowExecutionRequest d,
1025
      long workflowTaskCompletedEventId) {
1026
    WorkflowExecutionTerminatedEventAttributes.Builder a =
1027
        WorkflowExecutionTerminatedEventAttributes.newBuilder()
1✔
1028
            .setDetails(d.getDetails())
1✔
1029
            .setIdentity(d.getIdentity())
1✔
1030
            .setReason(d.getReason());
1✔
1031
    HistoryEvent event =
1032
        HistoryEvent.newBuilder()
1✔
1033
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED)
1✔
1034
            .setWorkflowExecutionTerminatedEventAttributes(a)
1✔
1035
            .build();
1✔
1036
    ctx.addEvent(event);
1✔
1037
  }
1✔
1038

1039
  private static void requestWorkflowCancellation(
1040
      RequestContext ctx,
1041
      WorkflowData data,
1042
      RequestCancelWorkflowExecutionRequest cancelRequest,
1043
      long notUsed) {
1044
    WorkflowExecutionCancelRequestedEventAttributes.Builder a =
1045
        WorkflowExecutionCancelRequestedEventAttributes.newBuilder()
1✔
1046
            .setIdentity(cancelRequest.getIdentity());
1✔
1047
    HistoryEvent cancelRequested =
1048
        HistoryEvent.newBuilder()
1✔
1049
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED)
1✔
1050
            .setWorkflowExecutionCancelRequestedEventAttributes(a)
1✔
1051
            .build();
1✔
1052
    ctx.addEvent(cancelRequested);
1✔
1053
  }
1✔
1054

1055
  private static void scheduleActivityTask(
1056
      RequestContext ctx,
1057
      ActivityTaskData data,
1058
      ScheduleActivityTaskCommandAttributes d,
1059
      long workflowTaskCompletedEventId) {
1060
    RetryPolicy retryPolicy = ensureDefaultFieldsForActivityRetryPolicy(d.getRetryPolicy());
1✔
1061
    Duration expirationInterval = d.getScheduleToCloseTimeout();
1✔
1062
    Timestamp expirationTime = Timestamps.add(data.store.currentTime(), expirationInterval);
1✔
1063
    TestServiceRetryState retryState = new TestServiceRetryState(retryPolicy, expirationTime);
1✔
1064

1065
    ActivityTaskScheduledEventAttributes.Builder a =
1066
        ActivityTaskScheduledEventAttributes.newBuilder()
1✔
1067
            .setInput(d.getInput())
1✔
1068
            .setActivityId(d.getActivityId())
1✔
1069
            .setActivityType(d.getActivityType())
1✔
1070
            .setHeartbeatTimeout(d.getHeartbeatTimeout())
1✔
1071
            .setRetryPolicy(retryPolicy)
1✔
1072
            .setScheduleToCloseTimeout(d.getScheduleToCloseTimeout())
1✔
1073
            .setScheduleToStartTimeout(d.getScheduleToStartTimeout())
1✔
1074
            .setStartToCloseTimeout(d.getStartToCloseTimeout())
1✔
1075
            .setTaskQueue(d.getTaskQueue())
1✔
1076
            .setHeader(d.getHeader())
1✔
1077
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
1078

1079
    // Cannot set it in onCommit as it is used in the processScheduleActivityTask
1080
    data.scheduledEvent = a.build();
1✔
1081
    HistoryEvent event =
1082
        HistoryEvent.newBuilder()
1✔
1083
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED)
1✔
1084
            .setActivityTaskScheduledEventAttributes(a)
1✔
1085
            .build();
1✔
1086
    long scheduledEventId = ctx.addEvent(event);
1✔
1087

1088
    PollActivityTaskQueueResponse.Builder taskResponse =
1089
        PollActivityTaskQueueResponse.newBuilder()
1✔
1090
            .setWorkflowNamespace(ctx.getNamespace())
1✔
1091
            .setWorkflowType(data.startWorkflowExecutionRequest.getWorkflowType())
1✔
1092
            .setActivityType(d.getActivityType())
1✔
1093
            .setWorkflowExecution(ctx.getExecution())
1✔
1094
            .setActivityId(d.getActivityId())
1✔
1095
            .setInput(d.getInput())
1✔
1096
            .setHeartbeatTimeout(d.getHeartbeatTimeout())
1✔
1097
            .setScheduleToCloseTimeout(d.getScheduleToCloseTimeout())
1✔
1098
            .setStartToCloseTimeout(d.getStartToCloseTimeout())
1✔
1099
            .setScheduledTime(ctx.currentTime())
1✔
1100
            .setCurrentAttemptScheduledTime(ctx.currentTime())
1✔
1101
            .setHeader(d.getHeader())
1✔
1102
            .setAttempt(1);
1✔
1103

1104
    TaskQueueId taskQueueId = new TaskQueueId(ctx.getNamespace(), d.getTaskQueue().getName());
1✔
1105
    ActivityTask activityTask = new ActivityTask(taskQueueId, taskResponse);
1✔
1106
    ctx.addActivityTask(activityTask);
1✔
1107
    ctx.onCommit(
1✔
1108
        (historySize) -> {
1109
          data.scheduledEventId = scheduledEventId;
1✔
1110
          data.activityTask = activityTask;
1✔
1111
          data.retryState = retryState;
1✔
1112
        });
1✔
1113
  }
1✔
1114

1115
  private static void requestActivityCancellation(
1116
      RequestContext ctx,
1117
      ActivityTaskData data,
1118
      RequestCancelActivityTaskCommandAttributes d,
1119
      long workflowTaskCompletedEventId) {
1120
    ActivityTaskCancelRequestedEventAttributes.Builder a =
1121
        ActivityTaskCancelRequestedEventAttributes.newBuilder()
1✔
1122
            .setScheduledEventId(d.getScheduledEventId())
1✔
1123
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
1124
    HistoryEvent event =
1125
        HistoryEvent.newBuilder()
1✔
1126
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_CANCEL_REQUESTED)
1✔
1127
            .setActivityTaskCancelRequestedEventAttributes(a)
1✔
1128
            .build();
1✔
1129
    ctx.addEvent(event);
1✔
1130
  }
1✔
1131

1132
  private static void scheduleWorkflowTask(
1133
      RequestContext ctx, WorkflowTaskData data, Object notUsedRequest, long notUsed) {
1134
    StartWorkflowExecutionRequest request = data.startRequest;
1✔
1135
    long scheduledEventId;
1136
    TaskQueue taskQueue = request.getTaskQueue();
1✔
1137
    WorkflowTaskScheduledEventAttributes a =
1138
        WorkflowTaskScheduledEventAttributes.newBuilder()
1✔
1139
            .setStartToCloseTimeout(request.getWorkflowTaskTimeout())
1✔
1140
            .setTaskQueue(taskQueue)
1✔
1141
            .setAttempt(++data.attempt)
1✔
1142
            .build();
1✔
1143
    HistoryEvent event =
1144
        HistoryEvent.newBuilder()
1✔
1145
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED)
1✔
1146
            .setWorkflowTaskScheduledEventAttributes(a)
1✔
1147
            .build();
1✔
1148
    scheduledEventId = ctx.addEvent(event);
1✔
1149
    PollWorkflowTaskQueueResponse.Builder workflowTaskResponse =
1150
        PollWorkflowTaskQueueResponse.newBuilder();
1✔
1151
    workflowTaskResponse.setWorkflowExecution(ctx.getExecution());
1✔
1152
    workflowTaskResponse.setWorkflowType(request.getWorkflowType());
1✔
1153
    workflowTaskResponse.setAttempt(data.attempt);
1✔
1154
    workflowTaskResponse.setScheduledTime(ctx.currentTime());
1✔
1155
    workflowTaskResponse.setWorkflowExecutionTaskQueue(taskQueue);
1✔
1156
    TaskQueueId taskQueueId = new TaskQueueId(ctx.getNamespace(), taskQueue.getName());
1✔
1157
    WorkflowTask workflowTask = new WorkflowTask(taskQueueId, workflowTaskResponse);
1✔
1158
    ctx.setWorkflowTaskForMatching(workflowTask);
1✔
1159
    ctx.onCommit(
1✔
1160
        (historySize) -> {
1161
          data.scheduledEventId = scheduledEventId;
1✔
1162
          data.workflowTask = workflowTaskResponse;
1✔
1163
          // Move buffered update request to new workflow task
1164
          data.updateRequest.putAll(data.updateRequestBuffer);
1✔
1165
          data.updateRequestBuffer.clear();
1✔
1166
        });
1✔
1167
  }
1✔
1168

1169
  private static void convertQueryWorkflowTaskToReal(
1170
      RequestContext ctx, WorkflowTaskData data, Object notUsedRequest, long notUsed) {
1171
    StartWorkflowExecutionRequest request = data.startRequest;
×
1172
    WorkflowTaskScheduledEventAttributes a =
1173
        WorkflowTaskScheduledEventAttributes.newBuilder()
×
1174
            .setStartToCloseTimeout(request.getWorkflowTaskTimeout())
×
1175
            .setTaskQueue(request.getTaskQueue())
×
1176
            .setAttempt(data.attempt)
×
1177
            .build();
×
1178
    HistoryEvent event =
1179
        HistoryEvent.newBuilder()
×
1180
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED)
×
1181
            .setWorkflowTaskScheduledEventAttributes(a)
×
1182
            .build();
×
1183
    long scheduledEventId = ctx.addEvent(event);
×
1184
    ctx.onCommit((historySize) -> data.scheduledEventId = scheduledEventId);
×
1185
  }
×
1186

1187
  private static void scheduleQueryWorkflowTask(
1188
      RequestContext ctx,
1189
      WorkflowTaskData data,
1190
      TestWorkflowMutableStateImpl.ConsistentQuery query,
1191
      long notUsed) {
1192
    ctx.lockTimer("scheduleQueryWorkflowTask");
×
1193
    StartWorkflowExecutionRequest request = data.startRequest;
×
1194
    PollWorkflowTaskQueueResponse.Builder workflowTaskResponse =
1195
        PollWorkflowTaskQueueResponse.newBuilder();
×
1196
    StickyExecutionAttributes stickyAttributes =
×
1197
        ctx.getWorkflowMutableState().getStickyExecutionAttributes();
×
1198
    String taskQueue =
1199
        stickyAttributes == null
×
1200
            ? request.getTaskQueue().getName()
×
1201
            : stickyAttributes.getWorkerTaskQueue().getName();
×
1202
    workflowTaskResponse.setWorkflowExecution(ctx.getExecution());
×
1203
    workflowTaskResponse.setWorkflowType(request.getWorkflowType());
×
1204
    workflowTaskResponse.setAttempt(++data.attempt);
×
1205
    TaskQueueId taskQueueId = new TaskQueueId(ctx.getNamespace(), taskQueue);
×
1206
    WorkflowTask workflowTask = new WorkflowTask(taskQueueId, workflowTaskResponse);
×
1207
    ctx.setWorkflowTaskForMatching(workflowTask);
×
1208
    ctx.onCommit(
×
1209
        (historySize) -> {
1210
          if (data.lastSuccessfulStartedEventId > 0) {
×
1211
            workflowTaskResponse.setPreviousStartedEventId(data.lastSuccessfulStartedEventId);
×
1212
          }
1213
          data.scheduledEventId = NO_EVENT_ID;
×
1214
          data.workflowTask = workflowTaskResponse;
×
1215
          if (query != null) {
×
1216
            data.consistentQueryRequests.put(query.getKey(), query);
×
1217
          }
1218
        });
×
1219
  }
×
1220

1221
  private static void queryWhileScheduled(
1222
      RequestContext ctx,
1223
      WorkflowTaskData data,
1224
      TestWorkflowMutableStateImpl.ConsistentQuery query,
1225
      long notUsed) {
1226
    data.consistentQueryRequests.put(query.getKey(), query);
1✔
1227
  }
1✔
1228

1229
  private static void bufferQuery(
1230
      RequestContext ctx,
1231
      WorkflowTaskData data,
1232
      TestWorkflowMutableStateImpl.ConsistentQuery query,
1233
      long notUsed) {
1234
    data.queryBuffer.put(query.getKey(), query);
1✔
1235
  }
1✔
1236

1237
  private static void bufferUpdate(
1238
      RequestContext ctx, WorkflowTaskData data, UpdateWorkflowExecution update, long notUsed) {
1239
    if (data.getUpdateRequest(update.getId()).isPresent()) {
1✔
1240
      throw Status.INTERNAL
×
1241
          .withDescription("Update ID already exists: " + update.getId())
×
1242
          .asRuntimeException();
×
1243
    }
1244
    data.updateRequestBuffer.put(update.getId(), update);
1✔
1245
  }
1✔
1246

1247
  private static void addUpdate(
1248
      RequestContext ctx, WorkflowTaskData data, UpdateWorkflowExecution update, long notUsed) {
1249
    if (data.getUpdateRequest(update.getId()).isPresent()) {
1✔
1250
      throw Status.INTERNAL
×
1251
          .withDescription("Update ID already exists: " + update.getId())
×
1252
          .asRuntimeException();
×
1253
    }
1254
    data.updateRequest.put(update.getId(), update);
1✔
1255
  }
1✔
1256

1257
  private static void startWorkflowTask(
1258
      RequestContext ctx,
1259
      WorkflowTaskData data,
1260
      PollWorkflowTaskQueueRequest request,
1261
      long notUsed) {
1262
    WorkflowTaskStartedEventAttributes a =
1263
        WorkflowTaskStartedEventAttributes.newBuilder()
1✔
1264
            .setIdentity(request.getIdentity())
1✔
1265
            .setScheduledEventId(data.scheduledEventId)
1✔
1266
            .build();
1✔
1267
    HistoryEvent event =
1268
        HistoryEvent.newBuilder()
1✔
1269
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_STARTED)
1✔
1270
            .setWorkflowTaskStartedEventAttributes(a)
1✔
1271
            .build();
1✔
1272
    long startedEventId = ctx.addEvent(event);
1✔
1273
    startWorkflowTaskImpl(ctx, data, request, startedEventId, false);
1✔
1274
  }
1✔
1275

1276
  private static void startQueryOnlyWorkflowTask(
1277
      RequestContext ctx,
1278
      WorkflowTaskData data,
1279
      PollWorkflowTaskQueueRequest request,
1280
      long notUsed) {
1281
    startWorkflowTaskImpl(ctx, data, request, NO_EVENT_ID, true);
×
1282
  }
×
1283

1284
  private static void startWorkflowTaskImpl(
1285
      RequestContext ctx,
1286
      WorkflowTaskData data,
1287
      PollWorkflowTaskQueueRequest request,
1288
      long startedEventId,
1289
      boolean queryOnly) {
1290
    ctx.onCommit(
1✔
1291
        (historySize) -> {
1292
          PollWorkflowTaskQueueResponse.Builder task = data.workflowTask;
1✔
1293
          task.setStartedEventId(data.scheduledEventId + 1);
1✔
1294
          WorkflowTaskToken taskToken = new WorkflowTaskToken(ctx.getExecutionId(), historySize);
1✔
1295
          task.setTaskToken(taskToken.toBytes());
1✔
1296
          GetWorkflowExecutionHistoryRequest getRequest =
1297
              GetWorkflowExecutionHistoryRequest.newBuilder()
1✔
1298
                  .setNamespace(request.getNamespace())
1✔
1299
                  .setExecution(ctx.getExecution())
1✔
1300
                  .build();
1✔
1301
          List<HistoryEvent> events;
1302
          events =
1✔
1303
              data.store
1304
                  .getWorkflowExecutionHistory(ctx.getExecutionId(), getRequest, null)
1✔
1305
                  .getHistory()
1✔
1306
                  .getEventsList();
1✔
1307
          long lastEventId = events.get(events.size() - 1).getEventId();
1✔
1308
          if (ctx.getWorkflowMutableState().getStickyExecutionAttributes() != null) {
1✔
1309
            events = events.subList((int) data.lastSuccessfulStartedEventId, events.size());
1✔
1310
          }
1311
          if (queryOnly && !data.workflowCompleted) {
1✔
1312
            events = new ArrayList<>(events); // convert list to mutable
×
1313
            // Add "fake" workflow task scheduled and started if workflow is not closed
1314
            WorkflowTaskScheduledEventAttributes scheduledAttributes =
1315
                WorkflowTaskScheduledEventAttributes.newBuilder()
×
1316
                    .setStartToCloseTimeout(data.startRequest.getWorkflowTaskTimeout())
×
1317
                    .setTaskQueue(request.getTaskQueue())
×
1318
                    .setAttempt(data.attempt)
×
1319
                    .build();
×
1320
            HistoryEvent scheduledEvent =
1321
                HistoryEvent.newBuilder()
×
1322
                    .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED)
×
1323
                    .setEventId(lastEventId + 1)
×
1324
                    .setWorkflowTaskScheduledEventAttributes(scheduledAttributes)
×
1325
                    .build();
×
1326
            events.add(scheduledEvent);
×
1327
            WorkflowTaskStartedEventAttributes startedAttributes =
1328
                WorkflowTaskStartedEventAttributes.newBuilder()
×
1329
                    .setIdentity(request.getIdentity())
×
1330
                    .setScheduledEventId(lastEventId + 1)
×
1331
                    .build();
×
1332
            HistoryEvent startedEvent =
1333
                HistoryEvent.newBuilder()
×
1334
                    .setEventId(lastEventId + 1)
×
1335
                    .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_STARTED)
×
1336
                    .setWorkflowTaskStartedEventAttributes(startedAttributes)
×
1337
                    .build();
×
1338
            events.add(startedEvent);
×
1339
            task.setStartedEventId(lastEventId + 2);
×
1340
          }
1341
          // get it from previous started event id.
1342
          task.setHistory(History.newBuilder().addAllEvents(events));
1✔
1343
          // Transfer the queries
1344
          Map<String, TestWorkflowMutableStateImpl.ConsistentQuery> queries =
1✔
1345
              data.consistentQueryRequests;
1346
          for (Map.Entry<String, TestWorkflowMutableStateImpl.ConsistentQuery> queryEntry :
1347
              queries.entrySet()) {
1✔
1348
            QueryWorkflowRequest queryWorkflowRequest = queryEntry.getValue().getRequest();
1✔
1349
            task.putQueries(queryEntry.getKey(), queryWorkflowRequest.getQuery());
1✔
1350
          }
1✔
1351
          // Transfer the messages
1352
          Map<String, UpdateWorkflowExecution> updates = data.updateRequest;
1✔
1353
          for (Map.Entry<String, UpdateWorkflowExecution> update : updates.entrySet()) {
1✔
1354
            UpdateWorkflowExecutionRequest updateRequest = update.getValue().getRequest();
1✔
1355
            Message updateMessage =
1356
                Message.newBuilder()
1✔
1357
                    .setId(update.getKey() + "/request")
1✔
1358
                    .setProtocolInstanceId(update.getKey())
1✔
1359
                    .setEventId(data.scheduledEventId)
1✔
1360
                    .setBody(Any.pack(updateRequest.getRequest()))
1✔
1361
                    .build();
1✔
1362
            task.addMessages(updateMessage);
1✔
1363
          }
1✔
1364
          if (data.lastSuccessfulStartedEventId > 0) {
1✔
1365
            task.setPreviousStartedEventId(data.lastSuccessfulStartedEventId);
1✔
1366
          }
1367
          if (!queryOnly) {
1✔
1368
            data.startedEventId = startedEventId;
1✔
1369
          }
1370
        });
1✔
1371
  }
1✔
1372

1373
  private static void startActivityTask(
1374
      RequestContext ctx,
1375
      ActivityTaskData data,
1376
      PollActivityTaskQueueRequest request,
1377
      long notUsed) {
1378
    ActivityTaskStartedEventAttributes.Builder a =
1379
        ActivityTaskStartedEventAttributes.newBuilder()
1✔
1380
            .setIdentity(request.getIdentity())
1✔
1381
            .setScheduledEventId(data.scheduledEventId);
1✔
1382
    a.setAttempt(data.getAttempt());
1✔
1383
    // Setting timestamp here as the default logic will set it to the time when it is added to the
1384
    // history. But in the case of retry it happens only after an activity completion.
1385
    Timestamp timestamp = data.store.currentTime();
1✔
1386
    HistoryEvent event =
1387
        HistoryEvent.newBuilder()
1✔
1388
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_STARTED)
1✔
1389
            .setEventTime(timestamp)
1✔
1390
            .setActivityTaskStartedEventAttributes(a)
1✔
1391
            .build();
1✔
1392
    long startedEventId;
1393
    startedEventId = NO_EVENT_ID;
1✔
1394
    ctx.onCommit(
1✔
1395
        (historySize) -> {
1396
          data.startedEventId = startedEventId;
1✔
1397
          data.startedEvent = event;
1✔
1398
          PollActivityTaskQueueResponse.Builder task = data.activityTask.getTask();
1✔
1399
          task.setTaskToken(
1✔
1400
              new ActivityTaskToken(ctx.getExecutionId(), data.scheduledEventId, task.getAttempt())
1✔
1401
                  .toBytes());
1✔
1402
          task.setStartedTime(timestamp);
1✔
1403
        });
1✔
1404
  }
1✔
1405

1406
  private static void completeWorkflowTask(
1407
      RequestContext ctx,
1408
      WorkflowTaskData data,
1409
      RespondWorkflowTaskCompletedRequest request,
1410
      long notUsed) {
1411
    WorkflowTaskCompletedEventAttributes.Builder a =
1412
        WorkflowTaskCompletedEventAttributes.newBuilder()
1✔
1413
            .setIdentity(request.getIdentity())
1✔
1414
            .setBinaryChecksum(request.getBinaryChecksum())
1✔
1415
            .setMeteringMetadata(request.getMeteringMetadata())
1✔
1416
            .setSdkMetadata(request.getSdkMetadata())
1✔
1417
            .setScheduledEventId(data.scheduledEventId);
1✔
1418
    HistoryEvent event =
1419
        HistoryEvent.newBuilder()
1✔
1420
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_COMPLETED)
1✔
1421
            .setWorkflowTaskCompletedEventAttributes(a)
1✔
1422
            .build();
1✔
1423
    ctx.addEvent(event);
1✔
1424
    ctx.onCommit(
1✔
1425
        (historySize) -> {
1426
          data.lastSuccessfulStartedEventId = data.startedEventId;
1✔
1427
          data.clear();
1✔
1428
        });
1✔
1429
  }
1✔
1430

1431
  private static void completeQuery(
1432
      RequestContext ctx,
1433
      WorkflowTaskData data,
1434
      RespondWorkflowTaskCompletedRequest request,
1435
      long notUsed) {
1436
    Map<String, WorkflowQueryResult> responses = request.getQueryResultsMap();
×
1437
    for (Map.Entry<String, WorkflowQueryResult> resultEntry : responses.entrySet()) {
×
1438
      TestWorkflowMutableStateImpl.ConsistentQuery query =
×
1439
          data.consistentQueryRequests.remove(resultEntry.getKey());
×
1440
      if (query != null) {
×
1441
        WorkflowQueryResult value = resultEntry.getValue();
×
1442
        CompletableFuture<QueryWorkflowResponse> result = query.getResult();
×
1443
        switch (value.getResultType()) {
×
1444
          case QUERY_RESULT_TYPE_ANSWERED:
1445
            QueryWorkflowResponse response =
1446
                QueryWorkflowResponse.newBuilder().setQueryResult(value.getAnswer()).build();
×
1447
            result.complete(response);
×
1448
            break;
×
1449
          case QUERY_RESULT_TYPE_FAILED:
1450
            result.completeExceptionally(
×
1451
                StatusUtils.newException(
×
1452
                    Status.INTERNAL.withDescription(value.getErrorMessage()),
×
1453
                    QueryFailedFailure.getDefaultInstance(),
×
1454
                    QueryFailedFailure.getDescriptor()));
×
1455
            break;
×
1456
          default:
1457
            throw Status.INVALID_ARGUMENT
×
1458
                .withDescription("Invalid query result type: " + value.getResultType())
×
1459
                .asRuntimeException();
×
1460
        }
1461
      }
1462
    }
×
1463
    ctx.onCommit(
×
1464
        (historySize) -> {
1465
          data.clear();
×
1466
          ctx.unlockTimer("completeQuery");
×
1467
        });
×
1468
  }
×
1469

1470
  private static void failQueryWorkflowTask(
1471
      RequestContext ctx, WorkflowTaskData data, Object unused, long notUsed) {
1472
    data.consistentQueryRequests
×
1473
        .entrySet()
×
1474
        .removeIf(entry -> entry.getValue().getResult().isCancelled());
×
1475
    if (!data.consistentQueryRequests.isEmpty()) {
×
1476
      ctx.setNeedWorkflowTask(true);
×
1477
    }
1478
    ctx.unlockTimer("failQueryWorkflowTask");
×
1479
  }
×
1480

1481
  private static void failWorkflowTask(
1482
      RequestContext ctx,
1483
      WorkflowTaskData data,
1484
      RespondWorkflowTaskFailedRequest request,
1485
      long notUsed) {
1486
    WorkflowTaskFailedEventAttributes.Builder a =
1487
        WorkflowTaskFailedEventAttributes.newBuilder()
1✔
1488
            .setIdentity(request.getIdentity())
1✔
1489
            .setStartedEventId(data.startedEventId)
1✔
1490
            .setScheduledEventId(data.scheduledEventId)
1✔
1491
            .setCause(request.getCause());
1✔
1492
    if (request.hasFailure()) {
1✔
1493
      a.setFailure(request.getFailure());
1✔
1494
    }
1495
    HistoryEvent event =
1496
        HistoryEvent.newBuilder()
1✔
1497
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_FAILED)
1✔
1498
            .setWorkflowTaskFailedEventAttributes(a)
1✔
1499
            .build();
1✔
1500
    ctx.addEvent(event);
1✔
1501
    ctx.setNeedWorkflowTask(true);
1✔
1502
  }
1✔
1503

1504
  private static void timeoutWorkflowTask(
1505
      RequestContext ctx, WorkflowTaskData data, Object ignored, long notUsed) {
1506
    WorkflowTaskTimedOutEventAttributes.Builder a =
1507
        WorkflowTaskTimedOutEventAttributes.newBuilder()
1✔
1508
            .setStartedEventId(data.startedEventId)
1✔
1509
            .setTimeoutType(TimeoutType.TIMEOUT_TYPE_START_TO_CLOSE)
1✔
1510
            .setScheduledEventId(data.scheduledEventId);
1✔
1511
    HistoryEvent event =
1512
        HistoryEvent.newBuilder()
1✔
1513
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT)
1✔
1514
            .setWorkflowTaskTimedOutEventAttributes(a)
1✔
1515
            .build();
1✔
1516
    ctx.addEvent(event);
1✔
1517
    ctx.setNeedWorkflowTask(true);
1✔
1518
  }
1✔
1519

1520
  private static void needsWorkflowTask(
1521
      RequestContext requestContext,
1522
      WorkflowTaskData workflowTaskData,
1523
      Object notUsedRequest,
1524
      long notUsed) {
1525
    requestContext.setNeedWorkflowTask(true);
1✔
1526
  }
1✔
1527

1528
  private static void completeActivityTask(
1529
      RequestContext ctx, ActivityTaskData data, Object request, long notUsed) {
1530
    data.startedEventId = ctx.addEvent(data.startedEvent);
1✔
1531
    if (request instanceof RespondActivityTaskCompletedRequest) {
1✔
1532
      completeActivityTaskByTaskToken(ctx, data, (RespondActivityTaskCompletedRequest) request);
1✔
1533
    } else if (request instanceof RespondActivityTaskCompletedByIdRequest) {
×
1534
      completeActivityTaskById(ctx, data, (RespondActivityTaskCompletedByIdRequest) request);
×
1535
    } else {
1536
      throw new IllegalArgumentException("Unknown request: " + request);
×
1537
    }
1538
  }
1✔
1539

1540
  private static void completeActivityTaskByTaskToken(
1541
      RequestContext ctx, ActivityTaskData data, RespondActivityTaskCompletedRequest request) {
1542
    ActivityTaskCompletedEventAttributes.Builder a =
1543
        ActivityTaskCompletedEventAttributes.newBuilder()
1✔
1544
            .setIdentity(request.getIdentity())
1✔
1545
            .setScheduledEventId(data.scheduledEventId)
1✔
1546
            .setResult(request.getResult())
1✔
1547
            .setStartedEventId(data.startedEventId);
1✔
1548
    HistoryEvent event =
1549
        HistoryEvent.newBuilder()
1✔
1550
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_COMPLETED)
1✔
1551
            .setActivityTaskCompletedEventAttributes(a)
1✔
1552
            .build();
1✔
1553
    ctx.addEvent(event);
1✔
1554
  }
1✔
1555

1556
  private static void completeActivityTaskById(
1557
      RequestContext ctx, ActivityTaskData data, RespondActivityTaskCompletedByIdRequest request) {
1558
    ActivityTaskCompletedEventAttributes.Builder a =
1559
        ActivityTaskCompletedEventAttributes.newBuilder()
×
1560
            .setIdentity(request.getIdentity())
×
1561
            .setScheduledEventId(data.scheduledEventId)
×
1562
            .setResult(request.getResult())
×
1563
            .setStartedEventId(data.startedEventId);
×
1564
    HistoryEvent event =
1565
        HistoryEvent.newBuilder()
×
1566
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_COMPLETED)
×
1567
            .setActivityTaskCompletedEventAttributes(a)
×
1568
            .build();
×
1569
    ctx.addEvent(event);
×
1570
  }
×
1571

1572
  private static State failActivityTask(
1573
      RequestContext ctx, ActivityTaskData data, Object request, long notUsed) {
1574
    if (request instanceof RespondActivityTaskFailedRequest) {
1✔
1575
      RespondActivityTaskFailedRequest req = (RespondActivityTaskFailedRequest) request;
1✔
1576
      return failActivityTaskByRequestType(ctx, data, req.getFailure(), req.getIdentity());
1✔
1577
    } else if (request instanceof RespondActivityTaskFailedByIdRequest) {
×
1578
      RespondActivityTaskFailedByIdRequest req = (RespondActivityTaskFailedByIdRequest) request;
×
1579
      return failActivityTaskByRequestType(ctx, data, req.getFailure(), req.getIdentity());
×
1580
    } else {
1581
      throw new IllegalArgumentException("Unknown request: " + request);
×
1582
    }
1583
  }
1584

1585
  private static State failActivityTaskByRequestType(
1586
      RequestContext ctx, ActivityTaskData data, Failure failure, String identity) {
1587
    if (!failure.hasApplicationFailureInfo()) {
1✔
1588
      throw new IllegalArgumentException(
×
1589
          "Failure must have ApplicationFailureInfo. Got: " + failure);
1590
    }
1591
    RetryState retryState = attemptActivityRetry(ctx, Optional.of(failure), data);
1✔
1592
    if (retryState == RetryState.RETRY_STATE_IN_PROGRESS) {
1✔
1593
      return INITIATED;
1✔
1594
    }
1595
    data.startedEventId = ctx.addEvent(data.startedEvent);
1✔
1596
    ActivityTaskFailedEventAttributes.Builder attributes =
1597
        ActivityTaskFailedEventAttributes.newBuilder()
1✔
1598
            .setIdentity(identity)
1✔
1599
            .setScheduledEventId(data.scheduledEventId)
1✔
1600
            .setFailure(failure)
1✔
1601
            .setRetryState(retryState)
1✔
1602
            .setStartedEventId(data.startedEventId);
1✔
1603
    HistoryEvent event =
1604
        HistoryEvent.newBuilder()
1✔
1605
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_FAILED)
1✔
1606
            .setActivityTaskFailedEventAttributes(attributes)
1✔
1607
            .build();
1✔
1608
    ctx.addEvent(event);
1✔
1609
    return FAILED;
1✔
1610
  }
1611

1612
  private static State timeoutActivityTask(
1613
      RequestContext ctx, ActivityTaskData data, TimeoutType timeoutType, long notUsed) {
1614
    Optional<Failure> previousFailure = data.retryState.getPreviousRunFailure();
1✔
1615

1616
    // chaining with the previous run failure if we are preparing the final failure
1617
    Failure failure =
1✔
1618
        newTimeoutFailure(timeoutType, Optional.ofNullable(data.heartbeatDetails), previousFailure);
1✔
1619

1620
    RetryState retryState;
1621
    switch (timeoutType) {
1✔
1622
      case TIMEOUT_TYPE_SCHEDULE_TO_START:
1623
      case TIMEOUT_TYPE_SCHEDULE_TO_CLOSE:
1624
        // ScheduleToStart (queue timeout) is not retryable. Instead of the retry, a customer should
1625
        // set a larger ScheduleToStart timeout.
1626
        // ScheduleToClose timeout is final and failure is created with TIMEOUT retry state
1627
        retryState = RetryState.RETRY_STATE_TIMEOUT;
1✔
1628
        break;
1✔
1629
      case TIMEOUT_TYPE_START_TO_CLOSE:
1630
      case TIMEOUT_TYPE_HEARTBEAT:
1631
        // not chaining with the previous run failure if we are preparing the failure to be stored
1632
        // for the next iteration
1633
        Optional<Failure> lastFailure =
1✔
1634
            Optional.of(
1✔
1635
                newTimeoutFailure(
1✔
1636
                    timeoutType,
1637
                    // we move heartbeatDetails to the new top level (this cause is used for
1638
                    // scheduleToClose only)
1639
                    Optional.empty(),
1✔
1640
                    // prune to don't have too deep nesting of failures
1641
                    Optional.empty()));
1✔
1642

1643
        retryState = attemptActivityRetry(ctx, lastFailure, data);
1✔
1644
        if (retryState == RetryState.RETRY_STATE_IN_PROGRESS) {
1✔
1645
          return INITIATED;
1✔
1646
        } else if (retryState == RetryState.RETRY_STATE_TIMEOUT) {
1✔
1647
          // if retryState = RETRY_STATE_TIMEOUT, it means scheduleToClose timeout happened inside
1648
          // attemptActivityRetry();
1649
          // start to close timeout would return as "max attempts reached".
1650

1651
          Preconditions.checkState(
1✔
1652
              timeoutType == TimeoutType.TIMEOUT_TYPE_START_TO_CLOSE
1653
                  || timeoutType == TimeoutType.TIMEOUT_TYPE_HEARTBEAT,
1654
              "Unexpected timeout type: %s. We should end up here only in case of HEARTBEAT_TIMEOUT or START_TO_CLOSE_TIMEOUT",
1655
              timeoutType);
1656

1657
          // heartbeat is preserved as the cause for the scheduleToClose timeout
1658
          // But we effectively omit startToClose timeout with scheduleToClose timeout
1659
          Optional<Failure> cause =
1660
              timeoutType == TimeoutType.TIMEOUT_TYPE_HEARTBEAT ? lastFailure : previousFailure;
1✔
1661

1662
          failure =
1✔
1663
              newTimeoutFailure(
1✔
1664
                  TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE,
1665
                  Optional.ofNullable(data.heartbeatDetails),
1✔
1666
                  cause);
1667
        }
1✔
1668
        break;
1669
      default:
1670
        throw new IllegalStateException(
×
1671
            "Not implemented behavior for timeout type: " + timeoutType);
1672
    }
1673

1674
    ActivityTaskTimedOutEventAttributes.Builder a =
1675
        ActivityTaskTimedOutEventAttributes.newBuilder()
1✔
1676
            .setScheduledEventId(data.scheduledEventId)
1✔
1677
            .setRetryState(retryState)
1✔
1678
            .setStartedEventId(data.startedEventId)
1✔
1679
            .setFailure(failure);
1✔
1680
    HistoryEvent event =
1681
        HistoryEvent.newBuilder()
1✔
1682
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_TIMED_OUT)
1✔
1683
            .setActivityTaskTimedOutEventAttributes(a)
1✔
1684
            .build();
1✔
1685
    ctx.addEvent(event);
1✔
1686
    return TIMED_OUT;
1✔
1687
  }
1688

1689
  private static Failure newTimeoutFailure(
1690
      TimeoutType timeoutType, Optional<Payloads> lastHeartbeatDetails, Optional<Failure> cause) {
1691
    TimeoutFailureInfo.Builder info = TimeoutFailureInfo.newBuilder().setTimeoutType(timeoutType);
1✔
1692
    if (lastHeartbeatDetails.isPresent()) {
1✔
1693
      info.setLastHeartbeatDetails(lastHeartbeatDetails.get());
1✔
1694
    }
1695
    Failure.Builder result = Failure.newBuilder().setTimeoutFailureInfo(info);
1✔
1696
    if (cause.isPresent()) {
1✔
1697
      result.setCause(cause.get());
1✔
1698
    }
1699
    return result.build();
1✔
1700
  }
1701

1702
  private static RetryState attemptActivityRetry(
1703
      RequestContext ctx, Optional<Failure> failure, ActivityTaskData data) {
1704
    if (data.retryState == null) {
1✔
1705
      throw new IllegalStateException("RetryPolicy is always present");
×
1706
    }
1707
    Optional<ApplicationFailureInfo> info = failure.map(Failure::getApplicationFailureInfo);
1✔
1708
    if (info.isPresent()) {
1✔
1709
      if (info.get().getNonRetryable()) {
1✔
1710
        return RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE;
1✔
1711
      }
1712
    }
1713
    TestServiceRetryState nextAttempt = data.retryState.getNextAttempt(failure);
1✔
1714
    TestServiceRetryState.BackoffInterval backoffInterval =
1✔
1715
        data.retryState.getBackoffIntervalInSeconds(
1✔
1716
            info.map(ApplicationFailureInfo::getType), data.store.currentTime());
1✔
1717
    if (backoffInterval.getRetryState() == RetryState.RETRY_STATE_IN_PROGRESS) {
1✔
1718
      data.nextBackoffInterval = ProtobufTimeUtils.toProtoDuration(backoffInterval.getInterval());
1✔
1719
      PollActivityTaskQueueResponse.Builder task = data.activityTask.getTask();
1✔
1720
      if (data.heartbeatDetails != null) {
1✔
1721
        task.setHeartbeatDetails(data.heartbeatDetails);
1✔
1722
      }
1723
      ctx.onCommit(
1✔
1724
          (historySize) -> {
1725
            data.retryState = nextAttempt;
1✔
1726
            task.setAttempt(nextAttempt.getAttempt());
1✔
1727
            task.setCurrentAttemptScheduledTime(ctx.currentTime());
1✔
1728
          });
1✔
1729
    } else {
1✔
1730
      data.nextBackoffInterval = Durations.ZERO;
1✔
1731
    }
1732
    return backoffInterval.getRetryState();
1✔
1733
  }
1734

1735
  private static void reportActivityTaskCancellation(
1736
      RequestContext ctx, ActivityTaskData data, Object request, long notUsed) {
1737
    Payloads details = null;
1✔
1738
    if (request instanceof RespondActivityTaskCanceledRequest) {
1✔
1739
      {
1740
        RespondActivityTaskCanceledRequest cr = (RespondActivityTaskCanceledRequest) request;
1✔
1741

1742
        details = cr.hasDetails() ? cr.getDetails() : null;
1✔
1743
      }
1✔
1744
    } else if (request instanceof RespondActivityTaskCanceledByIdRequest) {
1✔
1745
      {
1746
        RespondActivityTaskCanceledByIdRequest cr =
×
1747
            (RespondActivityTaskCanceledByIdRequest) request;
1748
        details = cr.hasDetails() ? cr.getDetails() : null;
×
1749
      }
×
1750
    } else if (request != null) {
1✔
1751
      throw Status.INTERNAL
×
1752
          .withDescription("Unexpected request type: " + request)
×
1753
          .asRuntimeException();
×
1754
    }
1755
    ActivityTaskCanceledEventAttributes.Builder a =
1756
        ActivityTaskCanceledEventAttributes.newBuilder()
1✔
1757
            .setScheduledEventId(data.scheduledEventId)
1✔
1758
            .setStartedEventId(data.startedEventId);
1✔
1759
    if (details != null) {
1✔
1760
      a.setDetails(details);
×
1761
    }
1762
    HistoryEvent event =
1763
        HistoryEvent.newBuilder()
1✔
1764
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_CANCELED)
1✔
1765
            .setActivityTaskCanceledEventAttributes(a)
1✔
1766
            .build();
1✔
1767
    ctx.addEvent(event);
1✔
1768
  }
1✔
1769

1770
  private static void heartbeatActivityTask(
1771
      RequestContext nullCtx, ActivityTaskData data, Payloads details, long notUsed) {
1772
    data.heartbeatDetails = details;
1✔
1773
  }
1✔
1774

1775
  private static void acceptUpdate(
1776
      RequestContext ctx,
1777
      UpdateWorkflowExecutionData data,
1778
      Message msg,
1779
      long workflowTaskCompletedEventId) {
1780
    try {
1781
      Acceptance acceptance = msg.getBody().unpack(Acceptance.class);
1✔
1782

1783
      WorkflowExecutionUpdateAcceptedEventAttributes acceptedAttribute =
1784
          WorkflowExecutionUpdateAcceptedEventAttributes.newBuilder()
1✔
1785
              .setAcceptedRequestSequencingEventId(workflowTaskCompletedEventId - 1)
1✔
1786
              .setProtocolInstanceId(msg.getProtocolInstanceId())
1✔
1787
              .setAcceptedRequestMessageId(acceptance.getAcceptedRequestMessageId())
1✔
1788
              .setAcceptedRequest(data.initialRequest)
1✔
1789
              .build();
1✔
1790

1791
      HistoryEvent event =
1792
          HistoryEvent.newBuilder()
1✔
1793
              .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED)
1✔
1794
              .setWorkflowExecutionUpdateAcceptedEventAttributes(acceptedAttribute)
1✔
1795
              .build();
1✔
1796
      // If the workflow is finished we can't write more events
1797
      // to history so if the message was processed after the workflow
1798
      // was closed there is nothing we can do.
1799
      // The real server also has this same problem
1800
      if (!ctx.getWorkflowMutableState().isTerminalState()) {
1✔
1801
        ctx.addEvent(event);
1✔
1802
      }
1803

1804
      UpdateWorkflowExecutionResponse response =
1805
          UpdateWorkflowExecutionResponse.newBuilder()
1✔
1806
              .setUpdateRef(
1✔
1807
                  UpdateRef.newBuilder()
1✔
1808
                      .setWorkflowExecution(ctx.getExecution())
1✔
1809
                      .setUpdateId(data.id))
1✔
1810
              .build();
1✔
1811

1812
      data.acceptance.complete(response);
1✔
1813
    } catch (InvalidProtocolBufferException e) {
×
1814
      throw new RuntimeException(e);
×
1815
    }
1✔
1816
  }
1✔
1817

1818
  private static void completeUpdate(
1819
      RequestContext ctx,
1820
      UpdateWorkflowExecutionData data,
1821
      Message msg,
1822
      long workflowTaskCompletedEventId) {
1823
    try {
1824
      Response response = msg.getBody().unpack(Response.class);
1✔
1825

1826
      WorkflowExecutionUpdateCompletedEventAttributes completedAttribute =
1827
          WorkflowExecutionUpdateCompletedEventAttributes.newBuilder()
1✔
1828
              .setMeta(response.getMeta())
1✔
1829
              .setOutcome(response.getOutcome())
1✔
1830
              .build();
1✔
1831

1832
      HistoryEvent event =
1833
          HistoryEvent.newBuilder()
1✔
1834
              .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_COMPLETED)
1✔
1835
              .setWorkflowExecutionUpdateCompletedEventAttributes(completedAttribute)
1✔
1836
              .build();
1✔
1837
      // If the workflow is finished we can't write more events
1838
      // to history so if the message was processed after the workflow
1839
      // was closed there is nothing we can do.
1840
      // The real server also has this same problem
1841
      if (!ctx.getWorkflowMutableState().isTerminalState()) {
1✔
1842
        ctx.addEvent(event);
1✔
1843
      }
1844

1845
      UpdateWorkflowExecutionResponse updateResponse =
1846
          UpdateWorkflowExecutionResponse.newBuilder()
1✔
1847
              .setUpdateRef(
1✔
1848
                  UpdateRef.newBuilder()
1✔
1849
                      .setWorkflowExecution(ctx.getExecution())
1✔
1850
                      .setUpdateId(data.id))
1✔
1851
              .setOutcome(response.getOutcome())
1✔
1852
              .build();
1✔
1853

1854
      data.complete.complete(updateResponse);
1✔
1855
    } catch (InvalidProtocolBufferException e) {
×
1856
      throw new RuntimeException(e);
×
1857
    }
1✔
1858
  }
1✔
1859

1860
  private static void startTimer(
1861
      RequestContext ctx,
1862
      TimerData data,
1863
      StartTimerCommandAttributes d,
1864
      long workflowTaskCompletedEventId) {
1865
    TimerStartedEventAttributes.Builder a =
1866
        TimerStartedEventAttributes.newBuilder()
1✔
1867
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1✔
1868
            .setStartToFireTimeout(d.getStartToFireTimeout())
1✔
1869
            .setTimerId(d.getTimerId());
1✔
1870
    HistoryEvent event =
1871
        HistoryEvent.newBuilder()
1✔
1872
            .setEventType(EventType.EVENT_TYPE_TIMER_STARTED)
1✔
1873
            .setTimerStartedEventAttributes(a)
1✔
1874
            .build();
1✔
1875
    long startedEventId = ctx.addEvent(event);
1✔
1876
    ctx.onCommit(
1✔
1877
        (historySize) -> {
1878
          data.startedEvent = a.build();
1✔
1879
          data.startedEventId = startedEventId;
1✔
1880
        });
1✔
1881
  }
1✔
1882

1883
  private static void fireTimer(RequestContext ctx, TimerData data, Object ignored, long notUsed) {
1884
    TimerFiredEventAttributes.Builder a =
1885
        TimerFiredEventAttributes.newBuilder()
1✔
1886
            .setTimerId(data.startedEvent.getTimerId())
1✔
1887
            .setStartedEventId(data.startedEventId);
1✔
1888
    HistoryEvent event =
1889
        HistoryEvent.newBuilder()
1✔
1890
            .setEventType(EventType.EVENT_TYPE_TIMER_FIRED)
1✔
1891
            .setTimerFiredEventAttributes(a)
1✔
1892
            .build();
1✔
1893
    ctx.addEvent(event);
1✔
1894
  }
1✔
1895

1896
  private static void cancelTimer(
1897
      RequestContext ctx,
1898
      TimerData data,
1899
      CancelTimerCommandAttributes d,
1900
      long workflowTaskCompletedEventId) {
1901
    TimerCanceledEventAttributes.Builder a =
1902
        TimerCanceledEventAttributes.newBuilder()
1✔
1903
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1✔
1904
            .setTimerId(d.getTimerId())
1✔
1905
            .setStartedEventId(data.startedEventId);
1✔
1906
    HistoryEvent event =
1907
        HistoryEvent.newBuilder()
1✔
1908
            .setEventType(EventType.EVENT_TYPE_TIMER_CANCELED)
1✔
1909
            .setTimerCanceledEventAttributes(a)
1✔
1910
            .build();
1✔
1911
    ctx.addEvent(event);
1✔
1912
  }
1✔
1913

1914
  private static void initiateExternalSignal(
1915
      RequestContext ctx,
1916
      SignalExternalData data,
1917
      SignalExternalWorkflowExecutionCommandAttributes d,
1918
      long workflowTaskCompletedEventId) {
1919
    SignalExternalWorkflowExecutionInitiatedEventAttributes.Builder a =
1920
        SignalExternalWorkflowExecutionInitiatedEventAttributes.newBuilder()
1✔
1921
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1✔
1922
            .setControl(d.getControl())
1✔
1923
            .setInput(d.getInput())
1✔
1924
            .setNamespace(d.getNamespace())
1✔
1925
            .setChildWorkflowOnly(d.getChildWorkflowOnly())
1✔
1926
            .setSignalName(d.getSignalName())
1✔
1927
            .setWorkflowExecution(d.getExecution());
1✔
1928

1929
    HistoryEvent event =
1930
        HistoryEvent.newBuilder()
1✔
1931
            .setEventType(EventType.EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED)
1✔
1932
            .setSignalExternalWorkflowExecutionInitiatedEventAttributes(a)
1✔
1933
            .build();
1✔
1934
    long initiatedEventId = ctx.addEvent(event);
1✔
1935
    ctx.onCommit(
1✔
1936
        (historySize) -> {
1937
          data.initiatedEventId = initiatedEventId;
1✔
1938
          data.initiatedEvent = a.build();
1✔
1939
        });
1✔
1940
  }
1✔
1941

1942
  private static void failExternalSignal(
1943
      RequestContext ctx,
1944
      SignalExternalData data,
1945
      SignalExternalWorkflowExecutionFailedCause cause,
1946
      long notUsed) {
1947
    SignalExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent = data.initiatedEvent;
1✔
1948
    SignalExternalWorkflowExecutionFailedEventAttributes.Builder a =
1949
        SignalExternalWorkflowExecutionFailedEventAttributes.newBuilder()
1✔
1950
            .setInitiatedEventId(data.initiatedEventId)
1✔
1951
            .setWorkflowExecution(initiatedEvent.getWorkflowExecution())
1✔
1952
            .setControl(initiatedEvent.getControl())
1✔
1953
            .setCause(cause)
1✔
1954
            .setNamespace(initiatedEvent.getNamespace());
1✔
1955
    HistoryEvent event =
1956
        HistoryEvent.newBuilder()
1✔
1957
            .setEventType(EventType.EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED)
1✔
1958
            .setSignalExternalWorkflowExecutionFailedEventAttributes(a)
1✔
1959
            .build();
1✔
1960
    ctx.addEvent(event);
1✔
1961
  }
1✔
1962

1963
  private static void completeExternalSignal(
1964
      RequestContext ctx, SignalExternalData data, String runId, long notUsed) {
1965
    SignalExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent = data.initiatedEvent;
1✔
1966
    WorkflowExecution signaledExecution =
1✔
1967
        initiatedEvent.getWorkflowExecution().toBuilder().setRunId(runId).build();
1✔
1968
    ExternalWorkflowExecutionSignaledEventAttributes.Builder a =
1969
        ExternalWorkflowExecutionSignaledEventAttributes.newBuilder()
1✔
1970
            .setInitiatedEventId(data.initiatedEventId)
1✔
1971
            .setWorkflowExecution(signaledExecution)
1✔
1972
            .setControl(initiatedEvent.getControl())
1✔
1973
            .setNamespace(initiatedEvent.getNamespace());
1✔
1974
    HistoryEvent event =
1975
        HistoryEvent.newBuilder()
1✔
1976
            .setEventType(EventType.EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_SIGNALED)
1✔
1977
            .setExternalWorkflowExecutionSignaledEventAttributes(a)
1✔
1978
            .build();
1✔
1979
    ctx.addEvent(event);
1✔
1980
  }
1✔
1981

1982
  private static void initiateExternalCancellation(
1983
      RequestContext ctx,
1984
      CancelExternalData data,
1985
      RequestCancelExternalWorkflowExecutionCommandAttributes d,
1986
      long workflowTaskCompletedEventId) {
1987
    RequestCancelExternalWorkflowExecutionInitiatedEventAttributes.Builder a =
1988
        RequestCancelExternalWorkflowExecutionInitiatedEventAttributes.newBuilder()
1✔
1989
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1✔
1990
            .setControl(d.getControl())
1✔
1991
            .setNamespace(d.getNamespace())
1✔
1992
            .setChildWorkflowOnly(d.getChildWorkflowOnly())
1✔
1993
            .setWorkflowExecution(
1✔
1994
                WorkflowExecution.newBuilder()
1✔
1995
                    .setWorkflowId(d.getWorkflowId())
1✔
1996
                    .setRunId(d.getRunId())
1✔
1997
                    .build());
1✔
1998

1999
    HistoryEvent event =
2000
        HistoryEvent.newBuilder()
1✔
2001
            .setEventType(EventType.EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED)
1✔
2002
            .setRequestCancelExternalWorkflowExecutionInitiatedEventAttributes(a)
1✔
2003
            .build();
1✔
2004
    long initiatedEventId = ctx.addEvent(event);
1✔
2005
    ctx.onCommit(
1✔
2006
        (historySize) -> {
2007
          data.initiatedEventId = initiatedEventId;
1✔
2008
          data.initiatedEvent = a.build();
1✔
2009
        });
1✔
2010
  }
1✔
2011

2012
  private static void reportExternalCancellationRequested(
2013
      RequestContext ctx, CancelExternalData data, String runId, long notUsed) {
2014
    RequestCancelExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent =
1✔
2015
        data.initiatedEvent;
2016
    ExternalWorkflowExecutionCancelRequestedEventAttributes.Builder a =
2017
        ExternalWorkflowExecutionCancelRequestedEventAttributes.newBuilder()
1✔
2018
            .setInitiatedEventId(data.initiatedEventId)
1✔
2019
            .setWorkflowExecution(
1✔
2020
                WorkflowExecution.newBuilder()
1✔
2021
                    .setRunId(runId)
1✔
2022
                    .setWorkflowId(initiatedEvent.getWorkflowExecution().getWorkflowId())
1✔
2023
                    .build())
1✔
2024
            .setNamespace(initiatedEvent.getNamespace());
1✔
2025
    HistoryEvent event =
2026
        HistoryEvent.newBuilder()
1✔
2027
            .setEventType(EventType.EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_CANCEL_REQUESTED)
1✔
2028
            .setExternalWorkflowExecutionCancelRequestedEventAttributes(a)
1✔
2029
            .build();
1✔
2030
    ctx.addEvent(event);
1✔
2031
  }
1✔
2032

2033
  private static void failExternalCancellation(
2034
      RequestContext ctx,
2035
      CancelExternalData data,
2036
      CancelExternalWorkflowExecutionFailedCause cause,
2037
      long notUsed) {
2038
    RequestCancelExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent =
×
2039
        data.initiatedEvent;
2040
    RequestCancelExternalWorkflowExecutionFailedEventAttributes.Builder a =
2041
        RequestCancelExternalWorkflowExecutionFailedEventAttributes.newBuilder()
×
2042
            .setInitiatedEventId(data.initiatedEventId)
×
2043
            .setWorkflowExecution(initiatedEvent.getWorkflowExecution())
×
2044
            .setControl(initiatedEvent.getControl())
×
2045
            .setCause(cause)
×
2046
            .setNamespace(initiatedEvent.getNamespace());
×
2047
    HistoryEvent event =
2048
        HistoryEvent.newBuilder()
×
2049
            .setEventType(EventType.EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_FAILED)
×
2050
            .setRequestCancelExternalWorkflowExecutionFailedEventAttributes(a)
×
2051
            .build();
×
2052
    ctx.addEvent(event);
×
2053
  }
×
2054

2055
  // Mimics the default activity retry policy of a standard Temporal server.
2056
  static RetryPolicy ensureDefaultFieldsForActivityRetryPolicy(RetryPolicy originalPolicy) {
2057
    Duration initialInterval =
2058
        Durations.compare(originalPolicy.getInitialInterval(), Durations.ZERO) == 0
1✔
2059
            ? DEFAULT_ACTIVITY_RETRY_INITIAL_INTERVAL
1✔
2060
            : originalPolicy.getInitialInterval();
1✔
2061

2062
    return RetryPolicy.newBuilder()
1✔
2063
        .setInitialInterval(initialInterval)
1✔
2064
        .addAllNonRetryableErrorTypes(originalPolicy.getNonRetryableErrorTypesList())
1✔
2065
        .setMaximumInterval(
1✔
2066
            Durations.compare(originalPolicy.getMaximumInterval(), Durations.ZERO) == 0
1✔
2067
                ? Durations.fromMillis(
1✔
2068
                    DEFAULT_ACTIVITY_MAXIMUM_INTERVAL_COEFFICIENT
2069
                        * Durations.toMillis(initialInterval))
1✔
2070
                : originalPolicy.getMaximumInterval())
1✔
2071
        .setBackoffCoefficient(
1✔
2072
            originalPolicy.getBackoffCoefficient() == 0
1✔
2073
                ? DEFAULT_ACTIVITY_RETRY_BACKOFF_COEFFICIENT
1✔
2074
                : originalPolicy.getBackoffCoefficient())
1✔
2075
        .setMaximumAttempts(
1✔
2076
            originalPolicy.getMaximumAttempts() == 0
1✔
2077
                ? DEFAULT_ACTIVITY_RETRY_MAXIMUM_ATTEMPTS
1✔
2078
                : originalPolicy.getMaximumAttempts())
1✔
2079
        .build();
1✔
2080
  }
2081
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc