• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #302

15 Aug 2024 04:56PM UTC coverage: 77.456% (-0.3%) from 77.71%
#302

push

github

web-flow
Implement test server support for sync Nexus operation commands (#2176)

* Implement test server support for sync Nexus operations

* Nexus operations command implementations

* test cleanup

* cleanup

* tests

334 of 524 new or added lines in 9 files covered. (63.74%)

9 existing lines in 3 files now uncovered.

20298 of 26206 relevant lines covered (77.46%)

0.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.46
/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.testservice;
22

23
import static io.temporal.internal.testservice.StateMachines.Action.CANCEL;
24
import static io.temporal.internal.testservice.StateMachines.Action.COMPLETE;
25
import static io.temporal.internal.testservice.StateMachines.Action.CONTINUE_AS_NEW;
26
import static io.temporal.internal.testservice.StateMachines.Action.FAIL;
27
import static io.temporal.internal.testservice.StateMachines.Action.INITIATE;
28
import static io.temporal.internal.testservice.StateMachines.Action.QUERY;
29
import static io.temporal.internal.testservice.StateMachines.Action.REQUEST_CANCELLATION;
30
import static io.temporal.internal.testservice.StateMachines.Action.START;
31
import static io.temporal.internal.testservice.StateMachines.Action.TERMINATE;
32
import static io.temporal.internal.testservice.StateMachines.Action.TIME_OUT;
33
import static io.temporal.internal.testservice.StateMachines.Action.UPDATE;
34
import static io.temporal.internal.testservice.StateMachines.Action.UPDATE_WORKFLOW_EXECUTION;
35
import static io.temporal.internal.testservice.StateMachines.State.CANCELED;
36
import static io.temporal.internal.testservice.StateMachines.State.CANCELLATION_REQUESTED;
37
import static io.temporal.internal.testservice.StateMachines.State.COMPLETED;
38
import static io.temporal.internal.testservice.StateMachines.State.CONTINUED_AS_NEW;
39
import static io.temporal.internal.testservice.StateMachines.State.FAILED;
40
import static io.temporal.internal.testservice.StateMachines.State.INITIATED;
41
import static io.temporal.internal.testservice.StateMachines.State.NONE;
42
import static io.temporal.internal.testservice.StateMachines.State.STARTED;
43
import static io.temporal.internal.testservice.StateMachines.State.TERMINATED;
44
import static io.temporal.internal.testservice.StateMachines.State.TIMED_OUT;
45

46
import com.google.common.base.Preconditions;
47
import com.google.protobuf.*;
48
import com.google.protobuf.util.Durations;
49
import com.google.protobuf.util.Timestamps;
50
import io.grpc.Status;
51
import io.grpc.StatusRuntimeException;
52
import io.temporal.api.command.v1.*;
53
import io.temporal.api.common.v1.Payload;
54
import io.temporal.api.common.v1.Payloads;
55
import io.temporal.api.common.v1.RetryPolicy;
56
import io.temporal.api.common.v1.WorkflowExecution;
57
import io.temporal.api.enums.v1.*;
58
import io.temporal.api.errordetails.v1.QueryFailedFailure;
59
import io.temporal.api.failure.v1.ApplicationFailureInfo;
60
import io.temporal.api.failure.v1.Failure;
61
import io.temporal.api.failure.v1.NexusOperationFailureInfo;
62
import io.temporal.api.failure.v1.TimeoutFailureInfo;
63
import io.temporal.api.history.v1.*;
64
import io.temporal.api.nexus.v1.Endpoint;
65
import io.temporal.api.nexus.v1.StartOperationRequest;
66
import io.temporal.api.nexus.v1.StartOperationResponse;
67
import io.temporal.api.nexus.v1.UnsuccessfulOperationError;
68
import io.temporal.api.protocol.v1.Message;
69
import io.temporal.api.query.v1.WorkflowQueryResult;
70
import io.temporal.api.taskqueue.v1.StickyExecutionAttributes;
71
import io.temporal.api.taskqueue.v1.TaskQueue;
72
import io.temporal.api.update.v1.*;
73
import io.temporal.api.workflowservice.v1.*;
74
import io.temporal.internal.common.ProtobufTimeUtils;
75
import io.temporal.internal.testservice.TestWorkflowMutableStateImpl.UpdateWorkflowExecution;
76
import io.temporal.internal.testservice.TestWorkflowStore.ActivityTask;
77
import io.temporal.internal.testservice.TestWorkflowStore.TaskQueueId;
78
import io.temporal.internal.testservice.TestWorkflowStore.WorkflowTask;
79
import io.temporal.serviceclient.StatusUtils;
80
import io.temporal.workflow.Functions;
81
import java.util.*;
82
import java.util.concurrent.CompletableFuture;
83
import java.util.concurrent.ForkJoinPool;
84
import java.util.stream.Collectors;
85
import javax.annotation.Nonnull;
86
import org.slf4j.Logger;
87
import org.slf4j.LoggerFactory;
88

89
class StateMachines {
×
90

91
  private static final Logger log = LoggerFactory.getLogger(StateMachines.class);
1✔
92

93
  public static final long DEFAULT_WORKFLOW_EXECUTION_TIMEOUT_MILLISECONDS =
94
      10L * 365 * 24 * 3600 * 1000;
95
  public static final long DEFAULT_WORKFLOW_TASK_TIMEOUT_MILLISECONDS = 10L * 1000;
96
  public static final long MAX_WORKFLOW_TASK_TIMEOUT_MILLISECONDS = 60L * 1000;
97
  static final Duration DEFAULT_ACTIVITY_RETRY_INITIAL_INTERVAL = Durations.fromSeconds(1);
1✔
98
  static final double DEFAULT_ACTIVITY_RETRY_BACKOFF_COEFFICIENT = 2.0;
99
  static final int DEFAULT_ACTIVITY_RETRY_MAXIMUM_ATTEMPTS = 0;
100
  static final int DEFAULT_ACTIVITY_MAXIMUM_INTERVAL_COEFFICIENT = 100;
101
  static final int NO_EVENT_ID = -1;
102

103
  /**
   * States shared by every state machine built in this class. Declaration order is preserved so
   * that ordinals stay stable.
   */
  enum State {
    // Not yet created, or returned to idle.
    NONE,

    // In flight.
    INITIATED,
    STARTED,

    // Closed or closing.
    FAILED,
    TIMED_OUT,
    CANCELLATION_REQUESTED,
    CANCELED,
    COMPLETED,
    CONTINUED_AS_NEW,
    TERMINATED
  }
115

116
  /**
   * Actions that drive transitions of the state machines built in this class. Declaration order is
   * preserved so that ordinals stay stable.
   */
  enum Action {
    INITIATE,
    START,
    FAIL,
    TIME_OUT,
    REQUEST_CANCELLATION,
    CANCEL,
    TERMINATE,
    UPDATE,
    COMPLETE,
    CONTINUE_AS_NEW,
    QUERY,
    UPDATE_WORKFLOW_EXECUTION
  }
130

131
  static final class WorkflowData {
132
    Optional<TestServiceRetryState> retryState;
133
    Duration backoffStartInterval;
134
    String cronSchedule;
135
    Payloads lastCompletionResult;
136
    Optional<Failure> lastFailure;
137
    /**
138
     * @see WorkflowExecutionStartedEventAttributes#getFirstExecutionRunId()
139
     */
140
    final @Nonnull String firstExecutionRunId;
141
    /**
142
     * @see WorkflowExecutionStartedEventAttributes#getOriginalExecutionRunId()
143
     */
144
    final @Nonnull String originalExecutionRunId;
145

146
    /** RunId that was continued by this run as a result of Retry or Continue-As-New. */
147
    Optional<String> continuedExecutionRunId;
148

149
    Functions.Proc runTimerCancellationHandle;
150

151
    WorkflowData(
152
        Optional<TestServiceRetryState> retryState,
153
        Duration backoffStartInterval,
154
        String cronSchedule,
155
        Payloads lastCompletionResult,
156
        Optional<Failure> lastFailure,
157
        @Nonnull String firstExecutionRunId,
158
        @Nonnull String originalExecutionRunId,
159
        Optional<String> continuedExecutionRunId) {
1✔
160
      this.retryState = retryState;
1✔
161
      this.backoffStartInterval = backoffStartInterval;
1✔
162
      this.cronSchedule = cronSchedule;
1✔
163
      this.lastCompletionResult = lastCompletionResult;
1✔
164
      this.firstExecutionRunId =
1✔
165
          Preconditions.checkNotNull(firstExecutionRunId, "firstExecutionRunId");
1✔
166
      this.originalExecutionRunId =
1✔
167
          Preconditions.checkNotNull(originalExecutionRunId, "originalExecutionRunId");
1✔
168
      this.continuedExecutionRunId = continuedExecutionRunId;
1✔
169
      this.lastFailure = Objects.requireNonNull(lastFailure);
1✔
170
    }
1✔
171

172
    @Override
173
    public String toString() {
174
      return "WorkflowData{"
×
175
          + "retryState="
176
          + retryState
177
          + ", backoffStartInterval="
178
          + backoffStartInterval
179
          + ", cronSchedule='"
180
          + cronSchedule
181
          + '\''
182
          + ", lastCompletionResult="
183
          + lastCompletionResult
184
          + ", firstExecutionRunId='"
185
          + firstExecutionRunId
186
          + '\''
187
          + ", originalExecutionRunId='"
188
          + originalExecutionRunId
189
          + '\''
190
          + ", continuedExecutionRunId="
191
          + continuedExecutionRunId
192
          + '}';
193
    }
194
  }
195

196
  static final class WorkflowTaskData {
197

198
    final TestWorkflowStore store;
199

200
    boolean workflowCompleted;
201

202
    /** id of the last started event which completed successfully */
203
    long lastSuccessfulStartedEventId;
204

205
    final StartWorkflowExecutionRequest startRequest;
206

207
    long startedEventId = NO_EVENT_ID;
1✔
208

209
    PollWorkflowTaskQueueResponse.Builder workflowTask;
210

211
    /**
212
     * Events that are added during execution of a workflow task. They have to be buffered to be
213
     * added after the events generated by a workflow task. Without this the determinism will be
214
     * broken on replay.
215
     */
216
    final List<RequestContext> bufferedEvents = new ArrayList<>();
1✔
217

218
    /**
219
     * Update requests that are added during execution of a workflow task. They have to be buffered
220
     * to be added to the next workflow task.
221
     */
222
    final Map<String, UpdateWorkflowExecution> updateRequestBuffer = new LinkedHashMap<>();
1✔
223

224
    final Map<String, UpdateWorkflowExecution> updateRequest = new LinkedHashMap<>();
1✔
225

226
    long scheduledEventId = NO_EVENT_ID;
1✔
227

228
    int attempt = 0;
1✔
229

230
    /** Query requests received during workflow task processing (after start) */
231
    final Map<String, TestWorkflowMutableStateImpl.ConsistentQuery> queryBuffer = new HashMap<>();
1✔
232

233
    final Map<String, TestWorkflowMutableStateImpl.ConsistentQuery> consistentQueryRequests =
1✔
234
        new HashMap<>();
235

236
    WorkflowTaskData(TestWorkflowStore store, StartWorkflowExecutionRequest startRequest) {
1✔
237
      this.store = store;
1✔
238
      this.startRequest = startRequest;
1✔
239
    }
1✔
240

241
    void clear() {
242
      startedEventId = NO_EVENT_ID;
1✔
243
      workflowTask = null;
1✔
244
      scheduledEventId = NO_EVENT_ID;
1✔
245
      attempt = 0;
1✔
246
    }
1✔
247

248
    Optional<UpdateWorkflowExecution> getUpdateRequest(String protocolInstanceId) {
249
      return Optional.ofNullable(
1✔
250
          updateRequest.getOrDefault(
1✔
251
              protocolInstanceId, updateRequestBuffer.get(protocolInstanceId)));
1✔
252
    }
253

254
    @Override
255
    public String toString() {
256
      return "WorkflowTaskData{"
×
257
          + "store="
258
          + store
259
          + ", workflowCompleted="
260
          + workflowCompleted
261
          + ", lastSuccessfulStartedEventId="
262
          + lastSuccessfulStartedEventId
263
          + ", startRequest="
264
          + startRequest
265
          + ", startedEventId="
266
          + startedEventId
267
          + ", workflowTask="
268
          + workflowTask
269
          + ", bufferedEvents="
270
          + bufferedEvents
271
          + ", scheduledEventId="
272
          + scheduledEventId
273
          + ", attempt="
274
          + attempt
275
          + ", queryBuffer="
276
          + queryBuffer
277
          + ", consistentQueryRequests="
278
          + consistentQueryRequests
279
          + ", updateRequest="
280
          + updateRequest
281
          + ", updateRequestBuffer="
282
          + updateRequestBuffer
283
          + '}';
284
    }
285
  }
286

287
  static final class ActivityTaskData {
288

289
    StartWorkflowExecutionRequest startWorkflowExecutionRequest;
290
    ActivityTaskScheduledEventAttributes scheduledEvent;
291
    ActivityTask activityTask;
292

293
    final TestWorkflowStore store;
294

295
    long scheduledEventId = NO_EVENT_ID;
1✔
296
    long startedEventId = NO_EVENT_ID;
1✔
297
    public HistoryEvent startedEvent;
298
    Payloads heartbeatDetails;
299
    long lastHeartbeatTime;
300
    TestServiceRetryState retryState;
301
    Duration nextBackoffInterval;
302
    String identity;
303

304
    ActivityTaskData(
305
        TestWorkflowStore store, StartWorkflowExecutionRequest startWorkflowExecutionRequest) {
1✔
306
      this.store = store;
1✔
307
      this.startWorkflowExecutionRequest = startWorkflowExecutionRequest;
1✔
308
    }
1✔
309

310
    @Override
311
    public String toString() {
312
      return "ActivityTaskData{"
×
313
          + "startWorkflowExecutionRequest="
314
          + startWorkflowExecutionRequest
315
          + ", scheduledEvent="
316
          + scheduledEvent
317
          + ", activityTask="
318
          + activityTask
319
          + ", store="
320
          + store
321
          + ", scheduledEventId="
322
          + scheduledEventId
323
          + ", startedEventId="
324
          + startedEventId
325
          + ", startedEvent="
326
          + startedEvent
327
          + ", heartbeatDetails="
328
          + heartbeatDetails
329
          + ", lastHeartbeatTime="
330
          + lastHeartbeatTime
331
          + ", retryState="
332
          + retryState
333
          + ", nextBackoffInterval="
334
          + nextBackoffInterval
335
          + '}';
336
    }
337

338
    public int getAttempt() {
339
      return retryState != null ? retryState.getAttempt() : 1;
1✔
340
    }
341
  }
342

343
  static final class NexusOperationData {
344

345
    String operationId;
346
    Endpoint endpoint;
347
    NexusOperationScheduledEventAttributes scheduledEvent;
348
    TestWorkflowStore.NexusTask nexusTask;
349

350
    long scheduledEventId = NO_EVENT_ID;
1✔
351
    // TODO(pj): consider refactoring cancellation into its own state machine as part of async work
352
    boolean cancelRequested = false;
1✔
353

354
    TestServiceRetryState retryState;
355
    long lastAttemptCompleteTime;
356
    Duration nextBackoffInterval;
357
    long nextAttemptScheduleTime;
358
    String identity;
359

360
    public NexusOperationData(Endpoint endpoint) {
1✔
361
      this.operationId = UUID.randomUUID().toString();
1✔
362
      this.endpoint = endpoint;
1✔
363
    }
1✔
364

365
    public int getAttempt() {
366
      return retryState != null ? retryState.getAttempt() : 1;
1✔
367
    }
368

369
    @Override
370
    public String toString() {
371
      return "NexusOperationData{"
1✔
372
          + ", nexusEndpoint="
373
          + endpoint
374
          + ", scheduledEvent="
375
          + scheduledEvent
376
          + ", nexusTask="
377
          + nexusTask
378
          + ", scheduledEventId="
379
          + scheduledEventId
380
          + ", retryState="
381
          + retryState
382
          + ", nextBackoffInterval="
383
          + nextBackoffInterval
384
          + '}';
385
    }
386
  }
387

388
  static final class SignalExternalData {
1✔
389
    long initiatedEventId = NO_EVENT_ID;
1✔
390
    public SignalExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent;
391

392
    @Override
393
    public String toString() {
394
      return "SignalExternalData{"
×
395
          + "initiatedEventId="
396
          + initiatedEventId
397
          + ", initiatedEvent="
398
          + initiatedEvent
399
          + '}';
400
    }
401
  }
402

403
  static final class CancelExternalData {
1✔
404
    long initiatedEventId = NO_EVENT_ID;
1✔
405
    public RequestCancelExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent;
406

407
    @Override
408
    public String toString() {
409
      return "CancelExternalData{"
×
410
          + "initiatedEventId="
411
          + initiatedEventId
412
          + ", initiatedEvent="
413
          + initiatedEvent
414
          + '}';
415
    }
416
  }
417

418
  static final class ChildWorkflowData {
419

420
    final TestWorkflowService service;
421
    StartChildWorkflowExecutionInitiatedEventAttributes initiatedEvent;
422
    long initiatedEventId;
423
    long startedEventId;
424
    WorkflowExecution execution;
425

426
    public ChildWorkflowData(TestWorkflowService service) {
1✔
427
      this.service = service;
1✔
428
    }
1✔
429

430
    @Override
431
    public String toString() {
432
      return "ChildWorkflowData{"
×
433
          + "service="
434
          + service
435
          + ", initiatedEvent="
436
          + initiatedEvent
437
          + ", initiatedEventId="
438
          + initiatedEventId
439
          + ", startedEventId="
440
          + startedEventId
441
          + ", execution="
442
          + execution
443
          + '}';
444
    }
445
  }
446

447
  static final class TimerData {
1✔
448
    TimerStartedEventAttributes startedEvent;
449
    public long startedEventId;
450

451
    @Override
452
    public String toString() {
453
      return "TimerData{"
×
454
          + "startedEvent="
455
          + startedEvent
456
          + ", startedEventId="
457
          + startedEventId
458
          + '}';
459
    }
460
  }
461

462
  /** Represents an accepted update workflow execution request */
463
  static final class UpdateWorkflowExecutionData {
464
    final String id;
465
    final CompletableFuture<Boolean> accepted;
466
    final CompletableFuture<Outcome> outcome;
467
    final Request initialRequest;
468

469
    public UpdateWorkflowExecutionData(
470
        String id,
471
        Request initialRequest,
472
        CompletableFuture<Boolean> accepted,
473
        CompletableFuture<Outcome> outcome) {
1✔
474
      this.id = id;
1✔
475
      this.initialRequest = initialRequest;
1✔
476
      this.accepted = accepted;
1✔
477
      this.outcome = outcome;
1✔
478
    }
1✔
479

480
    @Override
481
    public String toString() {
482
      return "UpdateWorkflowExecutionData{" + "ID=" + id + '}';
×
483
    }
484
  }
485

486
  static StateMachine<WorkflowData> newWorkflowStateMachine(WorkflowData data) {
487
    return new StateMachine<>(data)
1✔
488
        .add(NONE, START, STARTED, StateMachines::startWorkflow)
1✔
489
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::completeWorkflow)
1✔
490
        .add(STARTED, CONTINUE_AS_NEW, CONTINUED_AS_NEW, StateMachines::continueAsNewWorkflow)
1✔
491
        .add(STARTED, FAIL, FAILED, StateMachines::failWorkflow)
1✔
492
        .add(STARTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutWorkflow)
1✔
493
        .add(
1✔
494
            STARTED,
495
            REQUEST_CANCELLATION,
496
            CANCELLATION_REQUESTED,
497
            StateMachines::requestWorkflowCancellation)
498
        .add(STARTED, TERMINATE, TERMINATED, StateMachines::terminateWorkflow)
1✔
499
        .add(
1✔
500
            CANCELLATION_REQUESTED,
501
            REQUEST_CANCELLATION,
502
            CANCELLATION_REQUESTED,
503
            StateMachines::noop)
504
        .add(CANCELLATION_REQUESTED, COMPLETE, COMPLETED, StateMachines::completeWorkflow)
1✔
505
        .add(CANCELLATION_REQUESTED, CANCEL, CANCELED, StateMachines::cancelWorkflow)
1✔
506
        .add(CANCELLATION_REQUESTED, TERMINATE, TERMINATED, StateMachines::terminateWorkflow)
1✔
507
        .add(CANCELLATION_REQUESTED, FAIL, FAILED, StateMachines::failWorkflow)
1✔
508
        .add(CANCELLATION_REQUESTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutWorkflow);
1✔
509
  }
510

511
  static StateMachine<WorkflowTaskData> newWorkflowTaskStateMachine(
512
      TestWorkflowStore store, StartWorkflowExecutionRequest startRequest) {
513
    return new StateMachine<>(new WorkflowTaskData(store, startRequest))
1✔
514
        .add(NONE, INITIATE, INITIATED, StateMachines::scheduleWorkflowTask)
1✔
515
        // TODO(maxim): Uncomment once the server supports consistent query only workflow tasks
516
        //        .add(NONE, QUERY, INITIATED_QUERY_ONLY, StateMachines::scheduleQueryWorkflowTask)
517
        //        .add(INITIATED_QUERY_ONLY, QUERY, INITIATED_QUERY_ONLY,
518
        // StateMachines::queryWhileScheduled)
519
        //        .add(
520
        //            INITIATED_QUERY_ONLY,
521
        //            INITIATE,
522
        //            INITIATED,
523
        //            StateMachines::convertQueryWorkflowTaskToReal)
524
        //        .add(
525
        //            INITIATED_QUERY_ONLY,
526
        //            START,
527
        //            STARTED_QUERY_ONLY,
528
        //            StateMachines::startQueryOnlyWorkflowTask)
529
        //        .add(STARTED_QUERY_ONLY, INITIATE, STARTED_QUERY_ONLY,
530
        // StateMachines::needsWorkflowTask)
531
        //        .add(STARTED_QUERY_ONLY, QUERY, STARTED_QUERY_ONLY,
532
        // StateMachines::needsWorkflowTaskDueToQuery)
533
        //        .add(STARTED_QUERY_ONLY, FAIL, NONE, StateMachines::failQueryWorkflowTask)
534
        //        .add(STARTED_QUERY_ONLY, TIME_OUT, NONE, StateMachines::failQueryWorkflowTask)
535
        //        .add(STARTED_QUERY_ONLY, COMPLETE, NONE, StateMachines::completeQuery)
536
        .add(STARTED, QUERY, STARTED, StateMachines::bufferQuery)
1✔
537
        .add(STARTED, UPDATE_WORKFLOW_EXECUTION, STARTED, StateMachines::bufferUpdate)
1✔
538
        .add(INITIATED, INITIATE, INITIATED, StateMachines::noop)
1✔
539
        .add(INITIATED, QUERY, INITIATED, StateMachines::queryWhileScheduled)
1✔
540
        .add(INITIATED, UPDATE_WORKFLOW_EXECUTION, INITIATED, StateMachines::addUpdate)
1✔
541
        .add(INITIATED, START, STARTED, StateMachines::startWorkflowTask)
1✔
542
        .add(STARTED, COMPLETE, NONE, StateMachines::completeWorkflowTask)
1✔
543
        .add(STARTED, FAIL, NONE, StateMachines::failWorkflowTask)
1✔
544
        .add(STARTED, TIME_OUT, NONE, StateMachines::timeoutWorkflowTask)
1✔
545
        .add(STARTED, INITIATE, STARTED, StateMachines::needsWorkflowTask);
1✔
546
  }
547

548
  public static StateMachine<ActivityTaskData> newActivityStateMachine(
549
      TestWorkflowStore store, StartWorkflowExecutionRequest workflowStartedEvent) {
550
    return new StateMachine<>(new ActivityTaskData(store, workflowStartedEvent))
1✔
551
        .add(NONE, INITIATE, INITIATED, StateMachines::scheduleActivityTask)
1✔
552
        .add(INITIATED, START, STARTED, StateMachines::startActivityTask)
1✔
553
        .add(INITIATED, TIME_OUT, TIMED_OUT, StateMachines::timeoutActivityTask)
1✔
554
        .add(
1✔
555
            INITIATED,
556
            REQUEST_CANCELLATION,
557
            CANCELLATION_REQUESTED,
558
            StateMachines::requestActivityCancellation)
559
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::completeActivityTask)
1✔
560
        // Transitions to initiated in case of a retry
561
        .add(STARTED, FAIL, new State[] {FAILED, INITIATED}, StateMachines::failActivityTask)
1✔
562
        // Transitions to initiated in case of a retry
563
        .add(
1✔
564
            STARTED,
565
            TIME_OUT,
566
            new State[] {TIMED_OUT, INITIATED},
567
            StateMachines::timeoutActivityTask)
568
        .add(STARTED, UPDATE, STARTED, StateMachines::heartbeatActivityTask)
1✔
569
        .add(
1✔
570
            STARTED,
571
            REQUEST_CANCELLATION,
572
            CANCELLATION_REQUESTED,
573
            StateMachines::requestActivityCancellation)
574
        .add(
1✔
575
            CANCELLATION_REQUESTED, CANCEL, CANCELED, StateMachines::reportActivityTaskCancellation)
576
        .add(CANCELLATION_REQUESTED, COMPLETE, COMPLETED, StateMachines::completeActivityTask)
1✔
577
        .add(
1✔
578
            CANCELLATION_REQUESTED,
579
            UPDATE,
580
            CANCELLATION_REQUESTED,
581
            StateMachines::heartbeatActivityTask)
582
        .add(CANCELLATION_REQUESTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutActivityTask)
1✔
583
        .add(CANCELLATION_REQUESTED, FAIL, FAILED, StateMachines::failActivityTask);
1✔
584
  }
585

586
  public static StateMachine<ChildWorkflowData> newChildWorkflowStateMachine(
587
      TestWorkflowService service) {
588
    return new StateMachine<>(new ChildWorkflowData(service))
1✔
589
        .add(NONE, INITIATE, INITIATED, StateMachines::initiateChildWorkflow)
1✔
590
        .add(INITIATED, START, STARTED, StateMachines::childWorkflowStarted)
1✔
591
        .add(INITIATED, FAIL, FAILED, StateMachines::startChildWorkflowFailed)
1✔
592
        .add(INITIATED, TIME_OUT, TIMED_OUT, StateMachines::timeoutChildWorkflow)
1✔
593
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::childWorkflowCompleted)
1✔
594
        .add(STARTED, FAIL, FAILED, StateMachines::childWorkflowFailed)
1✔
595
        .add(STARTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutChildWorkflow)
1✔
596
        .add(STARTED, CANCEL, CANCELED, StateMachines::childWorkflowCanceled);
1✔
597
  }
598

599
  public static StateMachine<UpdateWorkflowExecutionData> newUpdateWorkflowExecution(
600
      String updateId,
601
      Request initialRequest,
602
      CompletableFuture<Boolean> accepted,
603
      CompletableFuture<Outcome> outcome) {
604
    return new StateMachine<>(
1✔
605
            new UpdateWorkflowExecutionData(updateId, initialRequest, accepted, outcome))
606
        .add(NONE, START, STARTED, StateMachines::acceptUpdate)
1✔
607
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::completeUpdate);
1✔
608
  }
609

610
  public static StateMachine<NexusOperationData> newNexusOperation(Endpoint endpoint) {
611
    return new StateMachine<>(new NexusOperationData(endpoint))
1✔
612
        .add(NONE, INITIATE, INITIATED, StateMachines::scheduleNexusOperation)
1✔
613
        .add(INITIATED, START, STARTED, StateMachines::startNexusOperation)
1✔
614
        .add(INITIATED, TIME_OUT, TIMED_OUT, StateMachines::timeoutNexusOperation)
1✔
615
        .add(
1✔
616
            INITIATED,
617
            REQUEST_CANCELLATION,
618
            CANCELLATION_REQUESTED,
619
            StateMachines::requestCancelNexusOperation)
620
        // Transitions directly from INITIATED to COMPLETE for sync completions
621
        .add(INITIATED, COMPLETE, COMPLETED, StateMachines::completeNexusOperation)
1✔
622
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::completeNexusOperation)
1✔
623
        .add(CANCELLATION_REQUESTED, COMPLETE, COMPLETED, StateMachines::completeNexusOperation)
1✔
624
        // Transitions to initiated in case of a retry
625
        .add(STARTED, FAIL, new State[] {FAILED, INITIATED}, StateMachines::failNexusOperation)
1✔
626
        // Transitions to initiated in case of a retry
627
        .add(
1✔
628
            STARTED,
629
            TIME_OUT,
630
            new State[] {TIMED_OUT, INITIATED},
631
            StateMachines::timeoutNexusOperation)
632
        .add(
1✔
633
            STARTED,
634
            REQUEST_CANCELLATION,
635
            CANCELLATION_REQUESTED,
636
            StateMachines::requestCancelNexusOperation)
637
        .add(
1✔
638
            CANCELLATION_REQUESTED,
639
            CANCEL,
640
            CANCELED,
641
            StateMachines::reportNexusOperationCancellation)
642
        .add(CANCELLATION_REQUESTED, COMPLETE, COMPLETED, StateMachines::completeNexusOperation)
1✔
643
        .add(CANCELLATION_REQUESTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutNexusOperation)
1✔
644
        .add(CANCELLATION_REQUESTED, FAIL, FAILED, StateMachines::failNexusOperation);
1✔
645
  }
646

647
  public static StateMachine<TimerData> newTimerStateMachine() {
648
    return new StateMachine<>(new TimerData())
1✔
649
        .add(NONE, START, STARTED, StateMachines::startTimer)
1✔
650
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::fireTimer)
1✔
651
        .add(STARTED, CANCEL, CANCELED, StateMachines::cancelTimer);
1✔
652
  }
653

654
  public static StateMachine<SignalExternalData> newSignalExternalStateMachine() {
655
    return new StateMachine<>(new SignalExternalData())
1✔
656
        .add(NONE, INITIATE, INITIATED, StateMachines::initiateExternalSignal)
1✔
657
        .add(INITIATED, FAIL, FAILED, StateMachines::failExternalSignal)
1✔
658
        .add(INITIATED, COMPLETE, COMPLETED, StateMachines::completeExternalSignal);
1✔
659
  }
660

661
  public static StateMachine<CancelExternalData> newCancelExternalStateMachine() {
662
    return new StateMachine<>(new CancelExternalData())
1✔
663
        .add(NONE, INITIATE, INITIATED, StateMachines::initiateExternalCancellation)
1✔
664
        .add(INITIATED, FAIL, FAILED, StateMachines::failExternalCancellation)
1✔
665
        .add(INITIATED, START, STARTED, StateMachines::reportExternalCancellationRequested);
1✔
666
  }
667

668
  /** Transition callback that does nothing; used where only the state change matters. */
  private static <T, A> void noop(RequestContext ctx, T data, A a, long notUsed) {
    // Intentionally empty.
  }
1✔
669

670
  private static void scheduleNexusOperation(
671
      RequestContext ctx,
672
      NexusOperationData data,
673
      ScheduleNexusOperationCommandAttributes attr,
674
      long workflowTaskCompletedId) {
675
    RetryPolicy retryPolicy = getDefaultNexusOperationRetryPolicy();
1✔
676
    Duration expirationInterval = attr.getScheduleToCloseTimeout();
1✔
677
    Timestamp expirationTime = Timestamps.add(ctx.currentTime(), expirationInterval);
1✔
678
    TestServiceRetryState retryState = new TestServiceRetryState(retryPolicy, expirationTime);
1✔
679

680
    NexusOperationScheduledEventAttributes.Builder a =
681
        NexusOperationScheduledEventAttributes.newBuilder()
1✔
682
            .setEndpoint(attr.getEndpoint())
1✔
683
            .setEndpointId(data.endpoint.getId())
1✔
684
            .setService(attr.getService())
1✔
685
            .setOperation(attr.getOperation())
1✔
686
            .setInput(attr.getInput())
1✔
687
            .setScheduleToCloseTimeout(attr.getScheduleToCloseTimeout())
1✔
688
            .putAllNexusHeader(attr.getNexusHeaderMap())
1✔
689
            .setRequestId(UUID.randomUUID().toString())
1✔
690
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedId);
1✔
691

692
    data.scheduledEvent = a.build();
1✔
693
    HistoryEvent event =
694
        HistoryEvent.newBuilder()
1✔
695
            .setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED)
1✔
696
            .setNexusOperationScheduledEventAttributes(a)
1✔
697
            .build();
1✔
698

699
    long scheduledEventId = ctx.addEvent(event);
1✔
700
    NexusTaskToken taskToken =
1✔
701
        new NexusTaskToken(ctx.getExecutionId(), scheduledEventId, data.getAttempt());
1✔
702

703
    PollNexusTaskQueueResponse.Builder pollResponse =
704
        PollNexusTaskQueueResponse.newBuilder()
1✔
705
            .setTaskToken(taskToken.toBytes())
1✔
706
            .setRequest(
1✔
707
                io.temporal.api.nexus.v1.Request.newBuilder()
1✔
708
                    .setScheduledTime(ctx.currentTime())
1✔
709
                    .putAllHeader(attr.getNexusHeaderMap())
1✔
710
                    .setStartOperation(
1✔
711
                        StartOperationRequest.newBuilder()
1✔
712
                            .setService(attr.getService())
1✔
713
                            .setOperation(attr.getOperation())
1✔
714
                            .setPayload(attr.getInput())
1✔
715
                            .setCallback(""))); // TODO(pj): support async operations
1✔
716

717
    TaskQueueId taskQueueId =
1✔
718
        new TaskQueueId(
719
            ctx.getNamespace(), data.endpoint.getSpec().getTarget().getWorker().getTaskQueue());
1✔
720
    TestWorkflowStore.NexusTask task = new TestWorkflowStore.NexusTask(taskQueueId, pollResponse);
1✔
721

722
    // Test server only supports worker targets, so just push directly to Nexus task queue without
723
    // invoking Nexus client.
724
    ctx.addNexusTask(task);
1✔
725
    ctx.onCommit(
1✔
726
        historySize -> {
727
          data.scheduledEventId = scheduledEventId;
1✔
728
          data.nexusTask = task;
1✔
729
          data.retryState = retryState;
1✔
730
        });
1✔
731
  }
1✔
732

733
  private static void startNexusOperation(
734
      RequestContext ctx,
735
      NexusOperationData data,
736
      RespondNexusTaskCompletedRequest request,
737
      long notUsed) {
738
    // TODO(pj): support async operations
NEW
739
  }
×
740

741
  private static void completeNexusOperation(
742
      RequestContext ctx, NexusOperationData data, Object request, long notUsed) {
743
    if (request instanceof RespondNexusTaskCompletedRequest) {
1✔
744
      handleSyncStartOperation(
1✔
745
          ctx,
746
          data,
747
          ((RespondNexusTaskCompletedRequest) request).getResponse().getStartOperation());
1✔
748
    } else {
749
      // TODO(pj): support async completion
NEW
750
      throw new IllegalArgumentException("Unknown request: " + request);
×
751
    }
752
  }
1✔
753

754
  private static void handleSyncStartOperation(
755
      RequestContext ctx, NexusOperationData data, StartOperationResponse response) {
756
    if (response.hasSyncSuccess()) {
1✔
757
      handleSyncSuccess(ctx, data, response.getSyncSuccess());
1✔
NEW
758
    } else if (response.hasOperationError()) {
×
NEW
759
      handleUnsuccessfulOperationError(ctx, data, response.getOperationError());
×
760
    } else {
NEW
761
      throw new IllegalArgumentException(
×
762
          "Unable to process StartOperationResponse. Expected SyncSuccess or OperationError.");
763
    }
764
  }
1✔
765

766
  private static void handleSyncSuccess(
767
      RequestContext ctx, NexusOperationData data, StartOperationResponse.Sync response) {
768
    ctx.addEvent(
1✔
769
        HistoryEvent.newBuilder()
1✔
770
            .setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_COMPLETED)
1✔
771
            .setNexusOperationCompletedEventAttributes(
1✔
772
                NexusOperationCompletedEventAttributes.newBuilder()
1✔
773
                    .setRequestId(data.scheduledEvent.getRequestId())
1✔
774
                    .setScheduledEventId(data.scheduledEventId)
1✔
775
                    .setResult(response.getPayload()))
1✔
776
            .build());
1✔
777
  }
1✔
778

779
  private static void handleUnsuccessfulOperationError(
780
      RequestContext ctx, NexusOperationData data, UnsuccessfulOperationError err) {
781

782
    Failure f =
NEW
783
        Failure.newBuilder()
×
NEW
784
            .setMessage("nexus operation completed unsuccessfully")
×
NEW
785
            .setNexusOperationExecutionFailureInfo(
×
NEW
786
                NexusOperationFailureInfo.newBuilder()
×
NEW
787
                    .setEndpoint(data.endpoint.getSpec().getName())
×
NEW
788
                    .setService(data.scheduledEvent.getService())
×
NEW
789
                    .setOperation(data.scheduledEvent.getOperation())
×
NEW
790
                    .setOperationId(data.operationId)
×
NEW
791
                    .setScheduledEventId(data.scheduledEventId))
×
NEW
792
            .setCause(nexusFailureToApplicationFailure(err.getFailure()))
×
NEW
793
            .build();
×
794

795
    HistoryEvent event;
NEW
796
    if (data.cancelRequested) {
×
797
      event =
NEW
798
          HistoryEvent.newBuilder()
×
NEW
799
              .setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_CANCELED)
×
NEW
800
              .setNexusOperationCanceledEventAttributes(
×
NEW
801
                  NexusOperationCanceledEventAttributes.newBuilder()
×
NEW
802
                      .setRequestId(data.scheduledEvent.getRequestId())
×
NEW
803
                      .setScheduledEventId(data.scheduledEventId)
×
NEW
804
                      .setFailure(f))
×
NEW
805
              .build();
×
806
    } else {
807
      event =
NEW
808
          HistoryEvent.newBuilder()
×
NEW
809
              .setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_FAILED)
×
NEW
810
              .setNexusOperationFailedEventAttributes(
×
NEW
811
                  NexusOperationFailedEventAttributes.newBuilder()
×
NEW
812
                      .setRequestId(data.scheduledEvent.getRequestId())
×
NEW
813
                      .setScheduledEventId(data.scheduledEventId)
×
NEW
814
                      .setFailure(f))
×
NEW
815
              .build();
×
816
    }
817

NEW
818
    ctx.addEvent(event);
×
NEW
819
  }
×
820

821
  private static State timeoutNexusOperation(
822
      RequestContext ctx, NexusOperationData data, TimeoutType timeoutType, long notUsed) {
823
    if (timeoutType != TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE) {
1✔
NEW
824
      throw new IllegalArgumentException(
×
825
          "Timeout type not supported for Nexus operations: " + timeoutType);
826
    }
827

828
    Optional<Failure> previousFailure = data.retryState.getPreviousRunFailure();
1✔
829

830
    // chaining with the previous run failure if we are preparing the final failure
831
    Failure failure = newTimeoutFailure(timeoutType, Optional.empty(), previousFailure);
1✔
832

833
    // not chaining with the previous run failure if we are preparing the failure to be stored
834
    // for the next iteration
835
    Optional<Failure> lastFailure =
1✔
836
        Optional.of(newTimeoutFailure(timeoutType, Optional.empty(), Optional.empty()));
1✔
837
    RetryState retryState = attemptNexusOperationRetry(ctx, lastFailure, data);
1✔
838
    if (retryState == RetryState.RETRY_STATE_IN_PROGRESS) {
1✔
NEW
839
      return INITIATED;
×
840
    }
841

842
    ctx.addEvent(
1✔
843
        HistoryEvent.newBuilder()
1✔
844
            .setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_TIMED_OUT)
1✔
845
            .setNexusOperationTimedOutEventAttributes(
1✔
846
                NexusOperationTimedOutEventAttributes.newBuilder()
1✔
847
                    .setRequestId(data.scheduledEvent.getRequestId())
1✔
848
                    .setScheduledEventId(data.scheduledEventId)
1✔
849
                    .setFailure(failure))
1✔
850
            .build());
1✔
851

852
    return TIMED_OUT;
1✔
853
  }
854

855
  private static State failNexusOperation(
856
      RequestContext ctx,
857
      NexusOperationData data,
858
      RespondNexusTaskFailedRequest request,
859
      long notUsed) {
NEW
860
    if (!request.hasError()) {
×
NEW
861
      throw new IllegalArgumentException(
×
862
          "Nexus handler error not set on RespondNexusTaskFailedRequest");
863
    }
864

NEW
865
    Failure failure = nexusFailureToApplicationFailure(request.getError().getFailure());
×
NEW
866
    RetryState retryState = attemptNexusOperationRetry(ctx, Optional.of(failure), data);
×
NEW
867
    if (retryState == RetryState.RETRY_STATE_IN_PROGRESS) {
×
NEW
868
      return INITIATED;
×
869
    }
870

NEW
871
    ctx.addEvent(
×
NEW
872
        HistoryEvent.newBuilder()
×
NEW
873
            .setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_FAILED)
×
NEW
874
            .setNexusOperationFailedEventAttributes(
×
NEW
875
                NexusOperationFailedEventAttributes.newBuilder()
×
NEW
876
                    .setRequestId(data.scheduledEvent.getRequestId())
×
NEW
877
                    .setScheduledEventId(data.scheduledEventId)
×
NEW
878
                    .setFailure(failure))
×
NEW
879
            .build());
×
NEW
880
    return FAILED;
×
881
  }
882

883
  private static RetryState attemptNexusOperationRetry(
884
      RequestContext ctx, Optional<Failure> failure, NexusOperationData data) {
885
    Optional<ApplicationFailureInfo> info = failure.map(Failure::getApplicationFailureInfo);
1✔
886
    Optional<java.time.Duration> nextRetryDelay = Optional.empty();
1✔
887
    if (info.isPresent()) {
1✔
888
      if (info.get().getNonRetryable()) {
1✔
NEW
889
        return RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE;
×
890
      }
891
      if (info.get().hasNextRetryDelay()) {
1✔
NEW
892
        nextRetryDelay =
×
NEW
893
            Optional.of(ProtobufTimeUtils.toJavaDuration(info.get().getNextRetryDelay()));
×
894
      }
895
    }
896

897
    TestServiceRetryState nextAttempt = data.retryState.getNextAttempt(failure);
1✔
898
    TestServiceRetryState.BackoffInterval backoffInterval =
1✔
899
        data.retryState.getBackoffIntervalInSeconds(
1✔
900
            info.map(ApplicationFailureInfo::getType), ctx.currentTime(), nextRetryDelay);
1✔
901
    if (backoffInterval.getRetryState() == RetryState.RETRY_STATE_IN_PROGRESS) {
1✔
NEW
902
      data.nextBackoffInterval = ProtobufTimeUtils.toProtoDuration(backoffInterval.getInterval());
×
NEW
903
      PollNexusTaskQueueResponse.Builder task = data.nexusTask.getTask();
×
NEW
904
      ctx.onCommit(
×
905
          (historySize) -> {
NEW
906
            data.retryState = nextAttempt;
×
NEW
907
            data.nextAttemptScheduleTime = ctx.currentTime().getSeconds();
×
NEW
908
          });
×
NEW
909
    } else {
×
910
      data.nextBackoffInterval = Durations.ZERO;
1✔
911
    }
912
    return backoffInterval.getRetryState();
1✔
913
  }
914

915
  private static Failure nexusFailureToApplicationFailure(
916
      io.temporal.api.nexus.v1.Failure failure) {
NEW
917
    return Failure.newBuilder()
×
NEW
918
        .setMessage(failure.getMessage())
×
NEW
919
        .setApplicationFailureInfo(
×
NEW
920
            ApplicationFailureInfo.newBuilder()
×
NEW
921
                .setType("NexusOperationFailure")
×
NEW
922
                .setNonRetryable(true)
×
NEW
923
                .setDetails(
×
NEW
924
                    Payloads.newBuilder()
×
NEW
925
                        .addPayloads(
×
NEW
926
                            Payload.newBuilder()
×
NEW
927
                                .putAllMetadata(
×
NEW
928
                                    failure.getMetadataMap().entrySet().stream()
×
NEW
929
                                        .collect(
×
NEW
930
                                            Collectors.toMap(
×
931
                                                Map.Entry::getKey,
NEW
932
                                                e -> ByteString.copyFromUtf8(e.getValue()))))
×
NEW
933
                                .setData(failure.getDetails()))))
×
NEW
934
        .build();
×
935
  }
936

937
  private static void requestCancelNexusOperation(
938
      RequestContext ctx,
939
      NexusOperationData data,
940
      RequestCancelNexusOperationCommandAttributes attr,
941
      long workflowTaskCompletedId) {
942
    data.cancelRequested = true;
1✔
943
    ctx.addEvent(
1✔
944
        HistoryEvent.newBuilder()
1✔
945
            .setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED)
1✔
946
            .setNexusOperationCancelRequestedEventAttributes(
1✔
947
                NexusOperationCancelRequestedEventAttributes.newBuilder()
1✔
948
                    .setScheduledEventId(attr.getScheduledEventId())
1✔
949
                    .setWorkflowTaskCompletedEventId(workflowTaskCompletedId))
1✔
950
            .build());
1✔
951
  }
1✔
952

953
  private static void reportNexusOperationCancellation(
954
      RequestContext ctx, NexusOperationData data, Object request, long notUsed) {
955
    ctx.addEvent(
1✔
956
        HistoryEvent.newBuilder()
1✔
957
            .setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_CANCELED)
1✔
958
            .setNexusOperationCanceledEventAttributes(
1✔
959
                NexusOperationCanceledEventAttributes.newBuilder()
1✔
960
                    .setScheduledEventId(data.scheduledEventId)
1✔
961
                    .setRequestId(data.scheduledEvent.getRequestId()))
1✔
962
            .build());
1✔
963
  }
1✔
964

965
  private static void timeoutChildWorkflow(
966
      RequestContext ctx, ChildWorkflowData data, RetryState retryState, long notUsed) {
967
    StartChildWorkflowExecutionInitiatedEventAttributes ie = data.initiatedEvent;
1✔
968
    ChildWorkflowExecutionTimedOutEventAttributes a =
969
        ChildWorkflowExecutionTimedOutEventAttributes.newBuilder()
1✔
970
            .setNamespace(ie.getNamespace())
1✔
971
            .setStartedEventId(data.startedEventId)
1✔
972
            .setWorkflowExecution(data.execution)
1✔
973
            .setWorkflowType(ie.getWorkflowType())
1✔
974
            .setRetryState(retryState)
1✔
975
            .setInitiatedEventId(data.initiatedEventId)
1✔
976
            .build();
1✔
977
    HistoryEvent event =
978
        HistoryEvent.newBuilder()
1✔
979
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TIMED_OUT)
1✔
980
            .setChildWorkflowExecutionTimedOutEventAttributes(a)
1✔
981
            .build();
1✔
982
    ctx.addEvent(event);
1✔
983
  }
1✔
984

985
  private static void startChildWorkflowFailed(
986
      RequestContext ctx,
987
      ChildWorkflowData data,
988
      StartChildWorkflowExecutionFailedEventAttributes a,
989
      long notUsed) {
990
    StartChildWorkflowExecutionFailedEventAttributes.Builder updatedAttr =
1✔
991
        a.toBuilder()
1✔
992
            .setInitiatedEventId(data.initiatedEventId)
1✔
993
            .setWorkflowType(data.initiatedEvent.getWorkflowType())
1✔
994
            .setWorkflowId(data.initiatedEvent.getWorkflowId());
1✔
995
    if (!data.initiatedEvent.getNamespace().isEmpty()) {
1✔
996
      updatedAttr.setNamespace(data.initiatedEvent.getNamespace());
1✔
997
    }
998
    HistoryEvent event =
999
        HistoryEvent.newBuilder()
1✔
1000
            .setEventType(EventType.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_FAILED)
1✔
1001
            .setStartChildWorkflowExecutionFailedEventAttributes(updatedAttr.build())
1✔
1002
            .build();
1✔
1003
    ctx.addEvent(event);
1✔
1004
  }
1✔
1005

1006
  private static void childWorkflowStarted(
1007
      RequestContext ctx,
1008
      ChildWorkflowData data,
1009
      ChildWorkflowExecutionStartedEventAttributes a,
1010
      long notUsed) {
1011
    ChildWorkflowExecutionStartedEventAttributes updatedAttr =
1✔
1012
        a.toBuilder().setInitiatedEventId(data.initiatedEventId).build();
1✔
1013
    HistoryEvent event =
1014
        HistoryEvent.newBuilder()
1✔
1015
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_STARTED)
1✔
1016
            .setChildWorkflowExecutionStartedEventAttributes(updatedAttr)
1✔
1017
            .build();
1✔
1018
    long startedEventId = ctx.addEvent(event);
1✔
1019
    ctx.onCommit(
1✔
1020
        (historySize) -> {
1021
          data.startedEventId = startedEventId;
1✔
1022
          data.execution = updatedAttr.getWorkflowExecution();
1✔
1023
        });
1✔
1024
  }
1✔
1025

1026
  private static void childWorkflowCompleted(
1027
      RequestContext ctx,
1028
      ChildWorkflowData data,
1029
      ChildWorkflowExecutionCompletedEventAttributes a,
1030
      long notUsed) {
1031
    ChildWorkflowExecutionCompletedEventAttributes updatedAttr =
1✔
1032
        a.toBuilder()
1✔
1033
            .setInitiatedEventId(data.initiatedEventId)
1✔
1034
            .setStartedEventId(data.startedEventId)
1✔
1035
            .build();
1✔
1036
    HistoryEvent event =
1037
        HistoryEvent.newBuilder()
1✔
1038
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_COMPLETED)
1✔
1039
            .setChildWorkflowExecutionCompletedEventAttributes(updatedAttr)
1✔
1040
            .build();
1✔
1041
    ctx.addEvent(event);
1✔
1042
  }
1✔
1043

1044
  private static void childWorkflowFailed(
1045
      RequestContext ctx,
1046
      ChildWorkflowData data,
1047
      ChildWorkflowExecutionFailedEventAttributes a,
1048
      long notUsed) {
1049
    ChildWorkflowExecutionFailedEventAttributes.Builder updatedAttr =
1✔
1050
        a.toBuilder()
1✔
1051
            .setInitiatedEventId(data.initiatedEventId)
1✔
1052
            .setStartedEventId(data.startedEventId)
1✔
1053
            .setWorkflowExecution(data.execution)
1✔
1054
            .setWorkflowType(data.initiatedEvent.getWorkflowType());
1✔
1055
    if (!data.initiatedEvent.getNamespace().isEmpty()) {
1✔
1056
      updatedAttr.setNamespace(data.initiatedEvent.getNamespace());
1✔
1057
    }
1058
    HistoryEvent event =
1059
        HistoryEvent.newBuilder()
1✔
1060
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_FAILED)
1✔
1061
            .setChildWorkflowExecutionFailedEventAttributes(updatedAttr.build())
1✔
1062
            .build();
1✔
1063
    ctx.addEvent(event);
1✔
1064
  }
1✔
1065

1066
  private static void childWorkflowCanceled(
1067
      RequestContext ctx,
1068
      ChildWorkflowData data,
1069
      ChildWorkflowExecutionCanceledEventAttributes a,
1070
      long notUsed) {
1071
    ChildWorkflowExecutionCanceledEventAttributes updatedAttr =
1✔
1072
        a.toBuilder()
1✔
1073
            .setInitiatedEventId(data.initiatedEventId)
1✔
1074
            .setStartedEventId(data.startedEventId)
1✔
1075
            .build();
1✔
1076
    HistoryEvent event =
1077
        HistoryEvent.newBuilder()
1✔
1078
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_CANCELED)
1✔
1079
            .setChildWorkflowExecutionCanceledEventAttributes(updatedAttr)
1✔
1080
            .build();
1✔
1081
    ctx.addEvent(event);
1✔
1082
  }
1✔
1083

1084
  private static void initiateChildWorkflow(
1085
      RequestContext ctx,
1086
      ChildWorkflowData data,
1087
      StartChildWorkflowExecutionCommandAttributes d,
1088
      long workflowTaskCompletedEventId) {
1089
    StartChildWorkflowExecutionInitiatedEventAttributes.Builder a =
1090
        StartChildWorkflowExecutionInitiatedEventAttributes.newBuilder()
1✔
1091
            .setControl(d.getControl())
1✔
1092
            .setInput(d.getInput())
1✔
1093
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1✔
1094
            .setNamespace(d.getNamespace().isEmpty() ? ctx.getNamespace() : d.getNamespace())
1✔
1095
            .setWorkflowExecutionTimeout(d.getWorkflowExecutionTimeout())
1✔
1096
            .setWorkflowRunTimeout(d.getWorkflowRunTimeout())
1✔
1097
            .setWorkflowTaskTimeout(d.getWorkflowTaskTimeout())
1✔
1098
            .setTaskQueue(d.getTaskQueue())
1✔
1099
            .setWorkflowId(d.getWorkflowId())
1✔
1100
            .setWorkflowIdReusePolicy(d.getWorkflowIdReusePolicy())
1✔
1101
            .setWorkflowType(d.getWorkflowType())
1✔
1102
            .setCronSchedule(d.getCronSchedule())
1✔
1103
            .setParentClosePolicy(d.getParentClosePolicy());
1✔
1104
    if (d.hasHeader()) {
1✔
1105
      a.setHeader(d.getHeader());
1✔
1106
    }
1107
    if (d.hasMemo()) {
1✔
1108
      a.setMemo(d.getMemo());
1✔
1109
    }
1110
    if (d.hasRetryPolicy()) {
1✔
1111
      a.setRetryPolicy(d.getRetryPolicy());
1✔
1112
    }
1113
    HistoryEvent event =
1114
        HistoryEvent.newBuilder()
1✔
1115
            .setEventType(EventType.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED)
1✔
1116
            .setStartChildWorkflowExecutionInitiatedEventAttributes(a)
1✔
1117
            .build();
1✔
1118
    long initiatedEventId = ctx.addEvent(event);
1✔
1119
    ctx.onCommit(
1✔
1120
        (historySize) -> {
1121
          data.initiatedEventId = initiatedEventId;
1✔
1122
          data.initiatedEvent = a.build();
1✔
1123
          StartWorkflowExecutionRequest.Builder startChild =
1124
              StartWorkflowExecutionRequest.newBuilder()
1✔
1125
                  .setRequestId(UUID.randomUUID().toString())
1✔
1126
                  .setNamespace(d.getNamespace().isEmpty() ? ctx.getNamespace() : d.getNamespace())
1✔
1127
                  .setWorkflowExecutionTimeout(d.getWorkflowExecutionTimeout())
1✔
1128
                  .setWorkflowRunTimeout(d.getWorkflowRunTimeout())
1✔
1129
                  .setWorkflowTaskTimeout(d.getWorkflowTaskTimeout())
1✔
1130
                  .setTaskQueue(d.getTaskQueue())
1✔
1131
                  .setWorkflowId(d.getWorkflowId())
1✔
1132
                  .setWorkflowIdReusePolicy(d.getWorkflowIdReusePolicy())
1✔
1133
                  .setWorkflowType(d.getWorkflowType())
1✔
1134
                  .setCronSchedule(d.getCronSchedule());
1✔
1135
          if (d.hasHeader()) {
1✔
1136
            startChild.setHeader(d.getHeader());
1✔
1137
          }
1138
          if (d.hasSearchAttributes()) {
1✔
1139
            startChild.setSearchAttributes(d.getSearchAttributes());
1✔
1140
          }
1141
          if (d.hasMemo()) {
1✔
1142
            startChild.setMemo(d.getMemo());
1✔
1143
          }
1144
          if (d.hasRetryPolicy()) {
1✔
1145
            startChild.setRetryPolicy(d.getRetryPolicy());
1✔
1146
          }
1147
          if (d.hasInput()) {
1✔
1148
            startChild.setInput(d.getInput());
1✔
1149
          }
1150
          addStartChildTask(ctx, data, initiatedEventId, startChild.build());
1✔
1151
        });
1✔
1152
  }
1✔
1153

1154
  private static void addStartChildTask(
1155
      RequestContext ctx,
1156
      ChildWorkflowData data,
1157
      long initiatedEventId,
1158
      StartWorkflowExecutionRequest startChild) {
1159
    ForkJoinPool.commonPool()
1✔
1160
        .execute(
1✔
1161
            () -> {
1162
              try {
1163
                data.service.startWorkflowExecutionImpl(
1✔
1164
                    startChild,
1165
                    java.time.Duration.ZERO,
1166
                    Optional.of(ctx.getWorkflowMutableState()),
1✔
1167
                    OptionalLong.of(data.initiatedEventId),
1✔
1168
                    null);
1169
              } catch (StatusRuntimeException e) {
1✔
1170
                if (e.getStatus().getCode() == Status.Code.ALREADY_EXISTS) {
1✔
1171
                  StartChildWorkflowExecutionFailedEventAttributes failRequest =
1172
                      StartChildWorkflowExecutionFailedEventAttributes.newBuilder()
1✔
1173
                          .setInitiatedEventId(initiatedEventId)
1✔
1174
                          .setCause(
1✔
1175
                              StartChildWorkflowExecutionFailedCause
1176
                                  .START_CHILD_WORKFLOW_EXECUTION_FAILED_CAUSE_WORKFLOW_ALREADY_EXISTS)
1177
                          .build();
1✔
1178
                  try {
1179
                    ctx.getWorkflowMutableState()
1✔
1180
                        .failStartChildWorkflow(data.initiatedEvent.getWorkflowId(), failRequest);
1✔
1181
                  } catch (Throwable ee) {
×
1182
                    log.error("Unexpected failure inserting failStart for a child workflow", ee);
×
1183
                  }
1✔
1184
                } else {
1✔
1185
                  log.error("Unexpected failure starting a child workflow", e);
×
1186
                }
1187
              } catch (Exception e) {
×
1188
                log.error("Unexpected failure starting a child workflow", e);
×
1189
              }
1✔
1190
            });
1✔
1191
  }
1✔
1192

1193
  private static void startWorkflow(
1194
      RequestContext ctx, WorkflowData data, StartWorkflowExecutionRequest request, long notUsed) {
1195
    if (Durations.compare(request.getWorkflowExecutionTimeout(), Durations.ZERO) < 0) {
1✔
1196
      throw Status.INVALID_ARGUMENT
×
1197
          .withDescription("negative workflowExecution timeout")
×
1198
          .asRuntimeException();
×
1199
    }
1200
    if (Durations.compare(request.getWorkflowRunTimeout(), Durations.ZERO) < 0) {
1✔
1201
      throw Status.INVALID_ARGUMENT
×
1202
          .withDescription("negative workflowRun timeout")
×
1203
          .asRuntimeException();
×
1204
    }
1205
    if (Durations.compare(request.getWorkflowTaskTimeout(), Durations.ZERO) < 0) {
1✔
1206
      throw Status.INVALID_ARGUMENT
×
1207
          .withDescription("negative workflowTaskTimeoutSeconds")
×
1208
          .asRuntimeException();
×
1209
    }
1210
    if (request.hasWorkflowStartDelay() && !request.getCronSchedule().trim().isEmpty()) {
1✔
1211
      throw Status.INVALID_ARGUMENT
×
1212
          .withDescription("CronSchedule and WorkflowStartDelay may not be used together.")
×
1213
          .asRuntimeException();
×
1214
    }
1215

1216
    WorkflowExecutionStartedEventAttributes.Builder a =
1217
        WorkflowExecutionStartedEventAttributes.newBuilder()
1✔
1218
            .setWorkflowType(request.getWorkflowType())
1✔
1219
            .setWorkflowRunTimeout(request.getWorkflowRunTimeout())
1✔
1220
            .setWorkflowTaskTimeout(request.getWorkflowTaskTimeout())
1✔
1221
            .setWorkflowExecutionTimeout(request.getWorkflowExecutionTimeout())
1✔
1222
            .setIdentity(request.getIdentity())
1✔
1223
            .setInput(request.getInput())
1✔
1224
            .setTaskQueue(request.getTaskQueue())
1✔
1225
            .setAttempt(1);
1✔
1226
    if (request.hasRetryPolicy()) {
1✔
1227
      a.setRetryPolicy(request.getRetryPolicy());
1✔
1228
    }
1229
    data.retryState.ifPresent(
1✔
1230
        testServiceRetryState -> a.setAttempt(testServiceRetryState.getAttempt()));
1✔
1231
    a.setFirstExecutionRunId(data.firstExecutionRunId);
1✔
1232
    a.setOriginalExecutionRunId(data.originalExecutionRunId);
1✔
1233
    data.continuedExecutionRunId.ifPresent(a::setContinuedExecutionRunId);
1✔
1234
    if (data.lastCompletionResult != null) {
1✔
1235
      a.setLastCompletionResult(data.lastCompletionResult);
1✔
1236
    }
1237
    if (request.hasWorkflowStartDelay()) {
1✔
1238
      a.setFirstWorkflowTaskBackoff(request.getWorkflowStartDelay());
1✔
1239
    }
1240
    data.lastFailure.ifPresent(a::setContinuedFailure);
1✔
1241
    if (request.hasMemo()) {
1✔
1242
      a.setMemo(request.getMemo());
1✔
1243
    }
1244
    if (request.hasSearchAttributes()) {
1✔
1245
      a.setSearchAttributes((request.getSearchAttributes()));
1✔
1246
    }
1247
    if (request.hasHeader()) {
1✔
1248
      a.setHeader(request.getHeader());
1✔
1249
    }
1250
    String cronSchedule = request.getCronSchedule();
1✔
1251
    if (!cronSchedule.trim().isEmpty()) {
1✔
1252
      try {
1253
        CronUtils.parseCron(cronSchedule);
1✔
1254
        a.setCronSchedule(cronSchedule);
1✔
1255
      } catch (Exception e) {
×
1256
        throw Status.INVALID_ARGUMENT
×
1257
            .withDescription("Invalid cron expression \"" + cronSchedule + "\": " + e.getMessage())
×
1258
            .withCause(e)
×
1259
            .asRuntimeException();
×
1260
      }
1✔
1261
    }
1262
    Optional<TestWorkflowMutableState> parent = ctx.getWorkflowMutableState().getParent();
1✔
1263
    if (parent.isPresent()) {
1✔
1264
      ExecutionId parentExecutionId = parent.get().getExecutionId();
1✔
1265
      a.setParentWorkflowNamespace(parentExecutionId.getNamespace());
1✔
1266
      a.setParentWorkflowExecution(parentExecutionId.getExecution());
1✔
1267
    }
1268
    HistoryEvent event =
1269
        HistoryEvent.newBuilder()
1✔
1270
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED)
1✔
1271
            .setWorkflowExecutionStartedEventAttributes(a)
1✔
1272
            .build();
1✔
1273
    ctx.addEvent(event);
1✔
1274
  }
1✔
1275

1276
  private static void completeWorkflow(
1277
      RequestContext ctx,
1278
      WorkflowData data,
1279
      CompleteWorkflowExecutionCommandAttributes d,
1280
      long workflowTaskCompletedEventId) {
1281
    WorkflowExecutionCompletedEventAttributes.Builder a =
1282
        WorkflowExecutionCompletedEventAttributes.newBuilder()
1✔
1283
            .setResult(d.getResult())
1✔
1284
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
1285
    HistoryEvent event =
1286
        HistoryEvent.newBuilder()
1✔
1287
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED)
1✔
1288
            .setWorkflowExecutionCompletedEventAttributes(a)
1✔
1289
            .build();
1✔
1290
    ctx.addEvent(event);
1✔
1291
  }
1✔
1292

1293
  private static void continueAsNewWorkflow(
1294
      RequestContext ctx,
1295
      WorkflowData data,
1296
      ContinueAsNewWorkflowExecutionCommandAttributes d,
1297
      long workflowTaskCompletedEventId) {
1298
    StartWorkflowExecutionRequest sr = ctx.getWorkflowMutableState().getStartRequest();
1✔
1299
    WorkflowExecutionContinuedAsNewEventAttributes.Builder a =
1300
        WorkflowExecutionContinuedAsNewEventAttributes.newBuilder();
1✔
1301
    a.setInput(d.getInput());
1✔
1302
    if (d.hasHeader()) {
1✔
1303
      a.setHeader(d.getHeader());
1✔
1304
    }
1305
    if (Durations.compare(d.getWorkflowRunTimeout(), Durations.ZERO) > 0) {
1✔
1306
      a.setWorkflowRunTimeout(d.getWorkflowRunTimeout());
1✔
1307
    } else {
1308
      a.setWorkflowRunTimeout(sr.getWorkflowRunTimeout());
1✔
1309
    }
1310
    if (d.hasTaskQueue() && !d.getTaskQueue().getName().isEmpty()) {
1✔
1311
      a.setTaskQueue(d.getTaskQueue());
1✔
1312
    } else {
1313
      a.setTaskQueue(sr.getTaskQueue());
1✔
1314
    }
1315
    if (d.hasWorkflowType() && !d.getWorkflowType().getName().isEmpty()) {
1✔
1316
      a.setWorkflowType(d.getWorkflowType());
1✔
1317
    } else {
1318
      a.setWorkflowType(sr.getWorkflowType());
1✔
1319
    }
1320
    if (Durations.compare(d.getWorkflowTaskTimeout(), Durations.ZERO) > 0) {
1✔
1321
      a.setWorkflowTaskTimeout(d.getWorkflowTaskTimeout());
1✔
1322
    } else {
1323
      a.setWorkflowTaskTimeout(sr.getWorkflowTaskTimeout());
1✔
1324
    }
1325
    a.setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
1326
    a.setBackoffStartInterval(d.getBackoffStartInterval());
1✔
1327
    if (d.hasLastCompletionResult()) {
1✔
1328
      a.setLastCompletionResult(d.getLastCompletionResult());
1✔
1329
    }
1330
    if (d.hasFailure()) {
1✔
1331
      a.setFailure(d.getFailure());
1✔
1332
    }
1333
    a.setNewExecutionRunId(UUID.randomUUID().toString());
1✔
1334
    HistoryEvent event =
1335
        HistoryEvent.newBuilder()
1✔
1336
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW)
1✔
1337
            .setWorkflowExecutionContinuedAsNewEventAttributes(a)
1✔
1338
            .build();
1✔
1339
    ctx.addEvent(event);
1✔
1340
  }
1✔
1341

1342
  private static void failWorkflow(
1343
      RequestContext ctx,
1344
      WorkflowData data,
1345
      FailWorkflowExecutionCommandAttributes d,
1346
      long workflowTaskCompletedEventId) {
1347
    WorkflowExecutionFailedEventAttributes.Builder a =
1348
        WorkflowExecutionFailedEventAttributes.newBuilder()
1✔
1349
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
1350
    if (d.hasFailure()) {
1✔
1351
      a.setFailure(d.getFailure());
1✔
1352
    }
1353
    HistoryEvent event =
1354
        HistoryEvent.newBuilder()
1✔
1355
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_FAILED)
1✔
1356
            .setWorkflowExecutionFailedEventAttributes(a)
1✔
1357
            .build();
1✔
1358
    ctx.addEvent(event);
1✔
1359
  }
1✔
1360

1361
  private static void timeoutWorkflow(
1362
      RequestContext ctx, WorkflowData data, RetryState retryState, long notUsed) {
1363
    WorkflowExecutionTimedOutEventAttributes.Builder a =
1364
        WorkflowExecutionTimedOutEventAttributes.newBuilder().setRetryState(retryState);
1✔
1365
    HistoryEvent event =
1366
        HistoryEvent.newBuilder()
1✔
1367
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT)
1✔
1368
            .setWorkflowExecutionTimedOutEventAttributes(a)
1✔
1369
            .build();
1✔
1370
    ctx.addEvent(event);
1✔
1371
  }
1✔
1372

1373
  private static void cancelWorkflow(
1374
      RequestContext ctx,
1375
      WorkflowData data,
1376
      CancelWorkflowExecutionCommandAttributes d,
1377
      long workflowTaskCompletedEventId) {
1378
    WorkflowExecutionCanceledEventAttributes.Builder a =
1379
        WorkflowExecutionCanceledEventAttributes.newBuilder()
1✔
1380
            .setDetails(d.getDetails())
1✔
1381
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
1382
    HistoryEvent event =
1383
        HistoryEvent.newBuilder()
1✔
1384
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED)
1✔
1385
            .setWorkflowExecutionCanceledEventAttributes(a)
1✔
1386
            .build();
1✔
1387
    ctx.addEvent(event);
1✔
1388
  }
1✔
1389

1390
  private static void terminateWorkflow(
1391
      RequestContext ctx,
1392
      WorkflowData data,
1393
      TerminateWorkflowExecutionRequest d,
1394
      long workflowTaskCompletedEventId) {
1395
    WorkflowExecutionTerminatedEventAttributes.Builder a =
1396
        WorkflowExecutionTerminatedEventAttributes.newBuilder()
1✔
1397
            .setDetails(d.getDetails())
1✔
1398
            .setIdentity(d.getIdentity())
1✔
1399
            .setReason(d.getReason());
1✔
1400
    HistoryEvent event =
1401
        HistoryEvent.newBuilder()
1✔
1402
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED)
1✔
1403
            .setWorkflowExecutionTerminatedEventAttributes(a)
1✔
1404
            .build();
1✔
1405
    ctx.addEvent(event);
1✔
1406
  }
1✔
1407

1408
  private static void requestWorkflowCancellation(
1409
      RequestContext ctx,
1410
      WorkflowData data,
1411
      RequestCancelWorkflowExecutionRequest cancelRequest,
1412
      long notUsed) {
1413
    WorkflowExecutionCancelRequestedEventAttributes.Builder a =
1414
        WorkflowExecutionCancelRequestedEventAttributes.newBuilder()
1✔
1415
            .setIdentity(cancelRequest.getIdentity());
1✔
1416
    HistoryEvent cancelRequested =
1417
        HistoryEvent.newBuilder()
1✔
1418
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED)
1✔
1419
            .setWorkflowExecutionCancelRequestedEventAttributes(a)
1✔
1420
            .build();
1✔
1421
    ctx.addEvent(cancelRequested);
1✔
1422
  }
1✔
1423

1424
  private static void scheduleActivityTask(
1425
      RequestContext ctx,
1426
      ActivityTaskData data,
1427
      ScheduleActivityTaskCommandAttributes d,
1428
      long workflowTaskCompletedEventId) {
1429
    RetryPolicy retryPolicy = ensureDefaultFieldsForActivityRetryPolicy(d.getRetryPolicy());
1✔
1430
    Duration expirationInterval = d.getScheduleToCloseTimeout();
1✔
1431
    Timestamp expirationTime = Timestamps.add(data.store.currentTime(), expirationInterval);
1✔
1432
    TestServiceRetryState retryState = new TestServiceRetryState(retryPolicy, expirationTime);
1✔
1433

1434
    ActivityTaskScheduledEventAttributes.Builder a =
1435
        ActivityTaskScheduledEventAttributes.newBuilder()
1✔
1436
            .setInput(d.getInput())
1✔
1437
            .setActivityId(d.getActivityId())
1✔
1438
            .setActivityType(d.getActivityType())
1✔
1439
            .setHeartbeatTimeout(d.getHeartbeatTimeout())
1✔
1440
            .setRetryPolicy(retryPolicy)
1✔
1441
            .setScheduleToCloseTimeout(d.getScheduleToCloseTimeout())
1✔
1442
            .setScheduleToStartTimeout(d.getScheduleToStartTimeout())
1✔
1443
            .setStartToCloseTimeout(d.getStartToCloseTimeout())
1✔
1444
            .setTaskQueue(d.getTaskQueue())
1✔
1445
            .setHeader(d.getHeader())
1✔
1446
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
1447

1448
    // Cannot set it in onCommit as it is used in the processScheduleActivityTask
1449
    data.scheduledEvent = a.build();
1✔
1450
    HistoryEvent event =
1451
        HistoryEvent.newBuilder()
1✔
1452
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED)
1✔
1453
            .setActivityTaskScheduledEventAttributes(a)
1✔
1454
            .build();
1✔
1455
    long scheduledEventId = ctx.addEvent(event);
1✔
1456

1457
    PollActivityTaskQueueResponse.Builder taskResponse =
1458
        PollActivityTaskQueueResponse.newBuilder()
1✔
1459
            .setWorkflowNamespace(ctx.getNamespace())
1✔
1460
            .setWorkflowType(data.startWorkflowExecutionRequest.getWorkflowType())
1✔
1461
            .setActivityType(d.getActivityType())
1✔
1462
            .setWorkflowExecution(ctx.getExecution())
1✔
1463
            .setActivityId(d.getActivityId())
1✔
1464
            .setInput(d.getInput())
1✔
1465
            .setHeartbeatTimeout(d.getHeartbeatTimeout())
1✔
1466
            .setScheduleToCloseTimeout(d.getScheduleToCloseTimeout())
1✔
1467
            .setStartToCloseTimeout(d.getStartToCloseTimeout())
1✔
1468
            .setScheduledTime(ctx.currentTime())
1✔
1469
            .setCurrentAttemptScheduledTime(ctx.currentTime())
1✔
1470
            .setHeader(d.getHeader())
1✔
1471
            .setAttempt(1);
1✔
1472

1473
    TaskQueueId taskQueueId = new TaskQueueId(ctx.getNamespace(), d.getTaskQueue().getName());
1✔
1474
    ActivityTask activityTask = new ActivityTask(taskQueueId, taskResponse);
1✔
1475
    ctx.addActivityTask(activityTask);
1✔
1476
    ctx.onCommit(
1✔
1477
        (historySize) -> {
1478
          data.scheduledEventId = scheduledEventId;
1✔
1479
          data.activityTask = activityTask;
1✔
1480
          data.retryState = retryState;
1✔
1481
        });
1✔
1482
  }
1✔
1483

1484
  private static void requestActivityCancellation(
1485
      RequestContext ctx,
1486
      ActivityTaskData data,
1487
      RequestCancelActivityTaskCommandAttributes d,
1488
      long workflowTaskCompletedEventId) {
1489
    ActivityTaskCancelRequestedEventAttributes.Builder a =
1490
        ActivityTaskCancelRequestedEventAttributes.newBuilder()
1✔
1491
            .setScheduledEventId(d.getScheduledEventId())
1✔
1492
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
1493
    HistoryEvent event =
1494
        HistoryEvent.newBuilder()
1✔
1495
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_CANCEL_REQUESTED)
1✔
1496
            .setActivityTaskCancelRequestedEventAttributes(a)
1✔
1497
            .build();
1✔
1498
    ctx.addEvent(event);
1✔
1499
  }
1✔
1500

1501
  private static void scheduleWorkflowTask(
1502
      RequestContext ctx, WorkflowTaskData data, Object notUsedRequest, long notUsed) {
1503
    StartWorkflowExecutionRequest request = data.startRequest;
1✔
1504
    long scheduledEventId;
1505
    TaskQueue taskQueue = request.getTaskQueue();
1✔
1506
    WorkflowTaskScheduledEventAttributes a =
1507
        WorkflowTaskScheduledEventAttributes.newBuilder()
1✔
1508
            .setStartToCloseTimeout(request.getWorkflowTaskTimeout())
1✔
1509
            .setTaskQueue(taskQueue)
1✔
1510
            .setAttempt(++data.attempt)
1✔
1511
            .build();
1✔
1512
    HistoryEvent event =
1513
        HistoryEvent.newBuilder()
1✔
1514
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED)
1✔
1515
            .setWorkflowTaskScheduledEventAttributes(a)
1✔
1516
            .build();
1✔
1517
    scheduledEventId = ctx.addEvent(event);
1✔
1518
    PollWorkflowTaskQueueResponse.Builder workflowTaskResponse =
1519
        PollWorkflowTaskQueueResponse.newBuilder();
1✔
1520
    workflowTaskResponse.setWorkflowExecution(ctx.getExecution());
1✔
1521
    workflowTaskResponse.setWorkflowType(request.getWorkflowType());
1✔
1522
    workflowTaskResponse.setAttempt(data.attempt);
1✔
1523
    workflowTaskResponse.setScheduledTime(ctx.currentTime());
1✔
1524
    workflowTaskResponse.setWorkflowExecutionTaskQueue(taskQueue);
1✔
1525
    TaskQueueId taskQueueId = new TaskQueueId(ctx.getNamespace(), taskQueue.getName());
1✔
1526
    WorkflowTask workflowTask = new WorkflowTask(taskQueueId, workflowTaskResponse);
1✔
1527
    ctx.setWorkflowTaskForMatching(workflowTask);
1✔
1528
    ctx.onCommit(
1✔
1529
        (historySize) -> {
1530
          data.scheduledEventId = scheduledEventId;
1✔
1531
          data.workflowTask = workflowTaskResponse;
1✔
1532
          // Move buffered update request to new workflow task
1533
          data.updateRequest.putAll(data.updateRequestBuffer);
1✔
1534
          data.updateRequestBuffer.clear();
1✔
1535
        });
1✔
1536
  }
1✔
1537

1538
  private static void convertQueryWorkflowTaskToReal(
1539
      RequestContext ctx, WorkflowTaskData data, Object notUsedRequest, long notUsed) {
1540
    StartWorkflowExecutionRequest request = data.startRequest;
×
1541
    WorkflowTaskScheduledEventAttributes a =
1542
        WorkflowTaskScheduledEventAttributes.newBuilder()
×
1543
            .setStartToCloseTimeout(request.getWorkflowTaskTimeout())
×
1544
            .setTaskQueue(request.getTaskQueue())
×
1545
            .setAttempt(data.attempt)
×
1546
            .build();
×
1547
    HistoryEvent event =
1548
        HistoryEvent.newBuilder()
×
1549
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED)
×
1550
            .setWorkflowTaskScheduledEventAttributes(a)
×
1551
            .build();
×
1552
    long scheduledEventId = ctx.addEvent(event);
×
1553
    ctx.onCommit((historySize) -> data.scheduledEventId = scheduledEventId);
×
1554
  }
×
1555

1556
  private static void scheduleQueryWorkflowTask(
1557
      RequestContext ctx,
1558
      WorkflowTaskData data,
1559
      TestWorkflowMutableStateImpl.ConsistentQuery query,
1560
      long notUsed) {
1561
    ctx.lockTimer("scheduleQueryWorkflowTask");
×
1562
    StartWorkflowExecutionRequest request = data.startRequest;
×
1563
    PollWorkflowTaskQueueResponse.Builder workflowTaskResponse =
1564
        PollWorkflowTaskQueueResponse.newBuilder();
×
1565
    StickyExecutionAttributes stickyAttributes =
×
1566
        ctx.getWorkflowMutableState().getStickyExecutionAttributes();
×
1567
    String taskQueue =
1568
        stickyAttributes == null
×
1569
            ? request.getTaskQueue().getName()
×
1570
            : stickyAttributes.getWorkerTaskQueue().getName();
×
1571
    workflowTaskResponse.setWorkflowExecution(ctx.getExecution());
×
1572
    workflowTaskResponse.setWorkflowType(request.getWorkflowType());
×
1573
    workflowTaskResponse.setAttempt(++data.attempt);
×
1574
    TaskQueueId taskQueueId = new TaskQueueId(ctx.getNamespace(), taskQueue);
×
1575
    WorkflowTask workflowTask = new WorkflowTask(taskQueueId, workflowTaskResponse);
×
1576
    ctx.setWorkflowTaskForMatching(workflowTask);
×
1577
    ctx.onCommit(
×
1578
        (historySize) -> {
1579
          if (data.lastSuccessfulStartedEventId > 0) {
×
1580
            workflowTaskResponse.setPreviousStartedEventId(data.lastSuccessfulStartedEventId);
×
1581
          }
1582
          data.scheduledEventId = NO_EVENT_ID;
×
1583
          data.workflowTask = workflowTaskResponse;
×
1584
          if (query != null) {
×
1585
            data.consistentQueryRequests.put(query.getKey(), query);
×
1586
          }
1587
        });
×
1588
  }
×
1589

1590
  private static void queryWhileScheduled(
1591
      RequestContext ctx,
1592
      WorkflowTaskData data,
1593
      TestWorkflowMutableStateImpl.ConsistentQuery query,
1594
      long notUsed) {
1595
    data.consistentQueryRequests.put(query.getKey(), query);
1✔
1596
  }
1✔
1597

1598
  private static void bufferQuery(
1599
      RequestContext ctx,
1600
      WorkflowTaskData data,
1601
      TestWorkflowMutableStateImpl.ConsistentQuery query,
1602
      long notUsed) {
1603
    data.queryBuffer.put(query.getKey(), query);
1✔
1604
  }
1✔
1605

1606
  private static void bufferUpdate(
1607
      RequestContext ctx, WorkflowTaskData data, UpdateWorkflowExecution update, long notUsed) {
1608
    if (data.getUpdateRequest(update.getId()).isPresent()) {
1✔
1609
      throw Status.INTERNAL
×
1610
          .withDescription("Update ID already exists: " + update.getId())
×
1611
          .asRuntimeException();
×
1612
    }
1613
    data.updateRequestBuffer.put(update.getId(), update);
1✔
1614
  }
1✔
1615

1616
  private static void addUpdate(
1617
      RequestContext ctx, WorkflowTaskData data, UpdateWorkflowExecution update, long notUsed) {
1618
    if (data.getUpdateRequest(update.getId()).isPresent()) {
1✔
1619
      throw Status.INTERNAL
×
1620
          .withDescription("Update ID already exists: " + update.getId())
×
1621
          .asRuntimeException();
×
1622
    }
1623
    data.updateRequest.put(update.getId(), update);
1✔
1624
  }
1✔
1625

1626
  private static void startWorkflowTask(
1627
      RequestContext ctx,
1628
      WorkflowTaskData data,
1629
      PollWorkflowTaskQueueRequest request,
1630
      long notUsed) {
1631
    WorkflowTaskStartedEventAttributes a =
1632
        WorkflowTaskStartedEventAttributes.newBuilder()
1✔
1633
            .setIdentity(request.getIdentity())
1✔
1634
            .setScheduledEventId(data.scheduledEventId)
1✔
1635
            .build();
1✔
1636
    HistoryEvent event =
1637
        HistoryEvent.newBuilder()
1✔
1638
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_STARTED)
1✔
1639
            .setWorkflowTaskStartedEventAttributes(a)
1✔
1640
            .build();
1✔
1641
    long startedEventId = ctx.addEvent(event);
1✔
1642
    startWorkflowTaskImpl(ctx, data, request, startedEventId, false);
1✔
1643
  }
1✔
1644

1645
  private static void startQueryOnlyWorkflowTask(
1646
      RequestContext ctx,
1647
      WorkflowTaskData data,
1648
      PollWorkflowTaskQueueRequest request,
1649
      long notUsed) {
1650
    startWorkflowTaskImpl(ctx, data, request, NO_EVENT_ID, true);
×
1651
  }
×
1652

1653
  private static void startWorkflowTaskImpl(
1654
      RequestContext ctx,
1655
      WorkflowTaskData data,
1656
      PollWorkflowTaskQueueRequest request,
1657
      long startedEventId,
1658
      boolean queryOnly) {
1659
    ctx.onCommit(
1✔
1660
        (historySize) -> {
1661
          PollWorkflowTaskQueueResponse.Builder task = data.workflowTask;
1✔
1662
          task.setStartedEventId(data.scheduledEventId + 1);
1✔
1663
          WorkflowTaskToken taskToken = new WorkflowTaskToken(ctx.getExecutionId(), historySize);
1✔
1664
          task.setTaskToken(taskToken.toBytes());
1✔
1665
          GetWorkflowExecutionHistoryRequest getRequest =
1666
              GetWorkflowExecutionHistoryRequest.newBuilder()
1✔
1667
                  .setNamespace(request.getNamespace())
1✔
1668
                  .setExecution(ctx.getExecution())
1✔
1669
                  .build();
1✔
1670
          List<HistoryEvent> events;
1671
          events =
1✔
1672
              data.store
1673
                  .getWorkflowExecutionHistory(ctx.getExecutionId(), getRequest, null)
1✔
1674
                  .getHistory()
1✔
1675
                  .getEventsList();
1✔
1676
          long lastEventId = events.get(events.size() - 1).getEventId();
1✔
1677
          if (ctx.getWorkflowMutableState().getStickyExecutionAttributes() != null) {
1✔
1678
            events = events.subList((int) data.lastSuccessfulStartedEventId, events.size());
1✔
1679
          }
1680
          if (queryOnly && !data.workflowCompleted) {
1✔
1681
            events = new ArrayList<>(events); // convert list to mutable
×
1682
            // Add "fake" workflow task scheduled and started if workflow is not closed
1683
            WorkflowTaskScheduledEventAttributes scheduledAttributes =
1684
                WorkflowTaskScheduledEventAttributes.newBuilder()
×
1685
                    .setStartToCloseTimeout(data.startRequest.getWorkflowTaskTimeout())
×
1686
                    .setTaskQueue(request.getTaskQueue())
×
1687
                    .setAttempt(data.attempt)
×
1688
                    .build();
×
1689
            HistoryEvent scheduledEvent =
1690
                HistoryEvent.newBuilder()
×
1691
                    .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED)
×
1692
                    .setEventId(lastEventId + 1)
×
1693
                    .setWorkflowTaskScheduledEventAttributes(scheduledAttributes)
×
1694
                    .build();
×
1695
            events.add(scheduledEvent);
×
1696
            WorkflowTaskStartedEventAttributes startedAttributes =
1697
                WorkflowTaskStartedEventAttributes.newBuilder()
×
1698
                    .setIdentity(request.getIdentity())
×
1699
                    .setScheduledEventId(lastEventId + 1)
×
1700
                    .build();
×
1701
            HistoryEvent startedEvent =
1702
                HistoryEvent.newBuilder()
×
1703
                    .setEventId(lastEventId + 1)
×
1704
                    .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_STARTED)
×
1705
                    .setWorkflowTaskStartedEventAttributes(startedAttributes)
×
1706
                    .build();
×
1707
            events.add(startedEvent);
×
1708
            task.setStartedEventId(lastEventId + 2);
×
1709
          }
1710
          // get it from previous started event id.
1711
          task.setHistory(History.newBuilder().addAllEvents(events));
1✔
1712
          // Transfer the queries
1713
          Map<String, TestWorkflowMutableStateImpl.ConsistentQuery> queries =
1✔
1714
              data.consistentQueryRequests;
1715
          for (Map.Entry<String, TestWorkflowMutableStateImpl.ConsistentQuery> queryEntry :
1716
              queries.entrySet()) {
1✔
1717
            QueryWorkflowRequest queryWorkflowRequest = queryEntry.getValue().getRequest();
1✔
1718
            task.putQueries(queryEntry.getKey(), queryWorkflowRequest.getQuery());
1✔
1719
          }
1✔
1720
          // Transfer the messages
1721
          Map<String, UpdateWorkflowExecution> updates = data.updateRequest;
1✔
1722
          for (Map.Entry<String, UpdateWorkflowExecution> update : updates.entrySet()) {
1✔
1723
            UpdateWorkflowExecutionRequest updateRequest = update.getValue().getRequest();
1✔
1724
            Message updateMessage =
1725
                Message.newBuilder()
1✔
1726
                    .setId(update.getKey() + "/request")
1✔
1727
                    .setProtocolInstanceId(update.getKey())
1✔
1728
                    .setEventId(data.scheduledEventId)
1✔
1729
                    .setBody(Any.pack(updateRequest.getRequest()))
1✔
1730
                    .build();
1✔
1731
            task.addMessages(updateMessage);
1✔
1732
          }
1✔
1733
          if (data.lastSuccessfulStartedEventId > 0) {
1✔
1734
            task.setPreviousStartedEventId(data.lastSuccessfulStartedEventId);
1✔
1735
          }
1736
          if (!queryOnly) {
1✔
1737
            data.startedEventId = startedEventId;
1✔
1738
          }
1739
        });
1✔
1740
  }
1✔
1741

1742
  private static void startActivityTask(
1743
      RequestContext ctx,
1744
      ActivityTaskData data,
1745
      PollActivityTaskQueueRequest request,
1746
      long notUsed) {
1747
    ActivityTaskStartedEventAttributes.Builder a =
1748
        ActivityTaskStartedEventAttributes.newBuilder()
1✔
1749
            .setIdentity(request.getIdentity())
1✔
1750
            .setScheduledEventId(data.scheduledEventId);
1✔
1751
    a.setAttempt(data.getAttempt());
1✔
1752
    // Setting timestamp here as the default logic will set it to the time when it is added to the
1753
    // history. But in the case of retry it happens only after an activity completion.
1754
    Timestamp timestamp = data.store.currentTime();
1✔
1755
    HistoryEvent event =
1756
        HistoryEvent.newBuilder()
1✔
1757
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_STARTED)
1✔
1758
            .setEventTime(timestamp)
1✔
1759
            .setActivityTaskStartedEventAttributes(a)
1✔
1760
            .build();
1✔
1761
    long startedEventId;
1762
    startedEventId = NO_EVENT_ID;
1✔
1763
    ctx.onCommit(
1✔
1764
        (historySize) -> {
1765
          data.startedEventId = startedEventId;
1✔
1766
          data.startedEvent = event;
1✔
1767
          PollActivityTaskQueueResponse.Builder task = data.activityTask.getTask();
1✔
1768
          task.setTaskToken(
1✔
1769
              new ActivityTaskToken(ctx.getExecutionId(), data.scheduledEventId, task.getAttempt())
1✔
1770
                  .toBytes());
1✔
1771
          task.setStartedTime(timestamp);
1✔
1772
        });
1✔
1773
  }
1✔
1774

1775
  private static void completeWorkflowTask(
1776
      RequestContext ctx,
1777
      WorkflowTaskData data,
1778
      RespondWorkflowTaskCompletedRequest request,
1779
      long notUsed) {
1780
    WorkflowTaskCompletedEventAttributes.Builder a =
1781
        WorkflowTaskCompletedEventAttributes.newBuilder()
1✔
1782
            .setIdentity(request.getIdentity())
1✔
1783
            .setBinaryChecksum(request.getBinaryChecksum())
1✔
1784
            .setMeteringMetadata(request.getMeteringMetadata())
1✔
1785
            .setSdkMetadata(request.getSdkMetadata())
1✔
1786
            .setScheduledEventId(data.scheduledEventId);
1✔
1787
    HistoryEvent event =
1788
        HistoryEvent.newBuilder()
1✔
1789
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_COMPLETED)
1✔
1790
            .setWorkflowTaskCompletedEventAttributes(a)
1✔
1791
            .build();
1✔
1792
    ctx.addEvent(event);
1✔
1793
    ctx.onCommit(
1✔
1794
        (historySize) -> {
1795
          data.lastSuccessfulStartedEventId = data.startedEventId;
1✔
1796
          data.clear();
1✔
1797
        });
1✔
1798
  }
1✔
1799

1800
  private static void completeQuery(
1801
      RequestContext ctx,
1802
      WorkflowTaskData data,
1803
      RespondWorkflowTaskCompletedRequest request,
1804
      long notUsed) {
1805
    Map<String, WorkflowQueryResult> responses = request.getQueryResultsMap();
×
1806
    for (Map.Entry<String, WorkflowQueryResult> resultEntry : responses.entrySet()) {
×
1807
      TestWorkflowMutableStateImpl.ConsistentQuery query =
×
1808
          data.consistentQueryRequests.remove(resultEntry.getKey());
×
1809
      if (query != null) {
×
1810
        WorkflowQueryResult value = resultEntry.getValue();
×
1811
        CompletableFuture<QueryWorkflowResponse> result = query.getResult();
×
1812
        switch (value.getResultType()) {
×
1813
          case QUERY_RESULT_TYPE_ANSWERED:
1814
            QueryWorkflowResponse response =
1815
                QueryWorkflowResponse.newBuilder().setQueryResult(value.getAnswer()).build();
×
1816
            result.complete(response);
×
1817
            break;
×
1818
          case QUERY_RESULT_TYPE_FAILED:
1819
            result.completeExceptionally(
×
1820
                StatusUtils.newException(
×
1821
                    Status.INTERNAL.withDescription(value.getErrorMessage()),
×
1822
                    QueryFailedFailure.getDefaultInstance(),
×
1823
                    QueryFailedFailure.getDescriptor()));
×
1824
            break;
×
1825
          default:
1826
            throw Status.INVALID_ARGUMENT
×
1827
                .withDescription("Invalid query result type: " + value.getResultType())
×
1828
                .asRuntimeException();
×
1829
        }
1830
      }
1831
    }
×
1832
    ctx.onCommit(
×
1833
        (historySize) -> {
1834
          data.clear();
×
1835
          ctx.unlockTimer("completeQuery");
×
1836
        });
×
1837
  }
×
1838

1839
  private static void failQueryWorkflowTask(
1840
      RequestContext ctx, WorkflowTaskData data, Object unused, long notUsed) {
1841
    data.consistentQueryRequests
×
1842
        .entrySet()
×
1843
        .removeIf(entry -> entry.getValue().getResult().isCancelled());
×
1844
    if (!data.consistentQueryRequests.isEmpty()) {
×
1845
      ctx.setNeedWorkflowTask(true);
×
1846
    }
1847
    ctx.unlockTimer("failQueryWorkflowTask");
×
1848
  }
×
1849

1850
  private static void failWorkflowTask(
1851
      RequestContext ctx,
1852
      WorkflowTaskData data,
1853
      RespondWorkflowTaskFailedRequest request,
1854
      long notUsed) {
1855
    WorkflowTaskFailedEventAttributes.Builder a =
1856
        WorkflowTaskFailedEventAttributes.newBuilder()
1✔
1857
            .setIdentity(request.getIdentity())
1✔
1858
            .setStartedEventId(data.startedEventId)
1✔
1859
            .setScheduledEventId(data.scheduledEventId)
1✔
1860
            .setCause(request.getCause());
1✔
1861
    if (request.hasFailure()) {
1✔
1862
      a.setFailure(request.getFailure());
1✔
1863
    }
1864
    HistoryEvent event =
1865
        HistoryEvent.newBuilder()
1✔
1866
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_FAILED)
1✔
1867
            .setWorkflowTaskFailedEventAttributes(a)
1✔
1868
            .build();
1✔
1869
    ctx.addEvent(event);
1✔
1870
    ctx.setNeedWorkflowTask(true);
1✔
1871
  }
1✔
1872

1873
  private static void timeoutWorkflowTask(
1874
      RequestContext ctx, WorkflowTaskData data, Object ignored, long notUsed) {
1875
    WorkflowTaskTimedOutEventAttributes.Builder a =
1876
        WorkflowTaskTimedOutEventAttributes.newBuilder()
1✔
1877
            .setStartedEventId(data.startedEventId)
1✔
1878
            .setTimeoutType(TimeoutType.TIMEOUT_TYPE_START_TO_CLOSE)
1✔
1879
            .setScheduledEventId(data.scheduledEventId);
1✔
1880
    HistoryEvent event =
1881
        HistoryEvent.newBuilder()
1✔
1882
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT)
1✔
1883
            .setWorkflowTaskTimedOutEventAttributes(a)
1✔
1884
            .build();
1✔
1885
    ctx.addEvent(event);
1✔
1886
    ctx.setNeedWorkflowTask(true);
1✔
1887
  }
1✔
1888

1889
  private static void needsWorkflowTask(
1890
      RequestContext requestContext,
1891
      WorkflowTaskData workflowTaskData,
1892
      Object notUsedRequest,
1893
      long notUsed) {
1894
    requestContext.setNeedWorkflowTask(true);
1✔
1895
  }
1✔
1896

1897
  private static void completeActivityTask(
1898
      RequestContext ctx, ActivityTaskData data, Object request, long notUsed) {
1899
    data.startedEventId = ctx.addEvent(data.startedEvent);
1✔
1900
    if (request instanceof RespondActivityTaskCompletedRequest) {
1✔
1901
      completeActivityTaskByTaskToken(ctx, data, (RespondActivityTaskCompletedRequest) request);
1✔
1902
    } else if (request instanceof RespondActivityTaskCompletedByIdRequest) {
×
1903
      completeActivityTaskById(ctx, data, (RespondActivityTaskCompletedByIdRequest) request);
×
1904
    } else {
1905
      throw new IllegalArgumentException("Unknown request: " + request);
×
1906
    }
1907
  }
1✔
1908

1909
  private static void completeActivityTaskByTaskToken(
1910
      RequestContext ctx, ActivityTaskData data, RespondActivityTaskCompletedRequest request) {
1911
    ActivityTaskCompletedEventAttributes.Builder a =
1912
        ActivityTaskCompletedEventAttributes.newBuilder()
1✔
1913
            .setIdentity(request.getIdentity())
1✔
1914
            .setScheduledEventId(data.scheduledEventId)
1✔
1915
            .setResult(request.getResult())
1✔
1916
            .setStartedEventId(data.startedEventId);
1✔
1917
    HistoryEvent event =
1918
        HistoryEvent.newBuilder()
1✔
1919
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_COMPLETED)
1✔
1920
            .setActivityTaskCompletedEventAttributes(a)
1✔
1921
            .build();
1✔
1922
    ctx.addEvent(event);
1✔
1923
  }
1✔
1924

1925
  private static void completeActivityTaskById(
1926
      RequestContext ctx, ActivityTaskData data, RespondActivityTaskCompletedByIdRequest request) {
1927
    ActivityTaskCompletedEventAttributes.Builder a =
1928
        ActivityTaskCompletedEventAttributes.newBuilder()
×
1929
            .setIdentity(request.getIdentity())
×
1930
            .setScheduledEventId(data.scheduledEventId)
×
1931
            .setResult(request.getResult())
×
1932
            .setStartedEventId(data.startedEventId);
×
1933
    HistoryEvent event =
1934
        HistoryEvent.newBuilder()
×
1935
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_COMPLETED)
×
1936
            .setActivityTaskCompletedEventAttributes(a)
×
1937
            .build();
×
1938
    ctx.addEvent(event);
×
1939
  }
×
1940

1941
  private static State failActivityTask(
1942
      RequestContext ctx, ActivityTaskData data, Object request, long notUsed) {
1943
    if (request instanceof RespondActivityTaskFailedRequest) {
1✔
1944
      RespondActivityTaskFailedRequest req = (RespondActivityTaskFailedRequest) request;
1✔
1945
      return failActivityTaskByRequestType(ctx, data, req.getFailure(), req.getIdentity());
1✔
1946
    } else if (request instanceof RespondActivityTaskFailedByIdRequest) {
×
1947
      RespondActivityTaskFailedByIdRequest req = (RespondActivityTaskFailedByIdRequest) request;
×
1948
      return failActivityTaskByRequestType(ctx, data, req.getFailure(), req.getIdentity());
×
1949
    } else {
1950
      throw new IllegalArgumentException("Unknown request: " + request);
×
1951
    }
1952
  }
1953

1954
  private static State failActivityTaskByRequestType(
1955
      RequestContext ctx, ActivityTaskData data, Failure failure, String identity) {
1956
    if (!failure.hasApplicationFailureInfo()) {
1✔
1957
      throw new IllegalArgumentException(
×
1958
          "Failure must have ApplicationFailureInfo. Got: " + failure);
1959
    }
1960
    RetryState retryState = attemptActivityRetry(ctx, Optional.of(failure), data);
1✔
1961
    if (retryState == RetryState.RETRY_STATE_IN_PROGRESS) {
1✔
1962
      return INITIATED;
1✔
1963
    }
1964
    data.startedEventId = ctx.addEvent(data.startedEvent);
1✔
1965
    ActivityTaskFailedEventAttributes.Builder attributes =
1966
        ActivityTaskFailedEventAttributes.newBuilder()
1✔
1967
            .setIdentity(identity)
1✔
1968
            .setScheduledEventId(data.scheduledEventId)
1✔
1969
            .setFailure(failure)
1✔
1970
            .setRetryState(retryState)
1✔
1971
            .setStartedEventId(data.startedEventId);
1✔
1972
    HistoryEvent event =
1973
        HistoryEvent.newBuilder()
1✔
1974
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_FAILED)
1✔
1975
            .setActivityTaskFailedEventAttributes(attributes)
1✔
1976
            .build();
1✔
1977
    ctx.addEvent(event);
1✔
1978
    return FAILED;
1✔
1979
  }
1980

1981
  private static State timeoutActivityTask(
1982
      RequestContext ctx, ActivityTaskData data, TimeoutType timeoutType, long notUsed) {
1983
    Optional<Failure> previousFailure = data.retryState.getPreviousRunFailure();
1✔
1984

1985
    // chaining with the previous run failure if we are preparing the final failure
1986
    Failure failure =
1✔
1987
        newTimeoutFailure(timeoutType, Optional.ofNullable(data.heartbeatDetails), previousFailure);
1✔
1988

1989
    RetryState retryState;
1990
    switch (timeoutType) {
1✔
1991
      case TIMEOUT_TYPE_SCHEDULE_TO_START:
1992
      case TIMEOUT_TYPE_SCHEDULE_TO_CLOSE:
1993
        // ScheduleToStart (queue timeout) is not retryable. Instead of the retry, a customer should
1994
        // set a larger ScheduleToStart timeout.
1995
        // ScheduleToClose timeout is final and failure is created with TIMEOUT retry state
1996
        retryState = RetryState.RETRY_STATE_TIMEOUT;
1✔
1997
        break;
1✔
1998
      case TIMEOUT_TYPE_START_TO_CLOSE:
1999
      case TIMEOUT_TYPE_HEARTBEAT:
2000
        // not chaining with the previous run failure if we are preparing the failure to be stored
2001
        // for the next iteration
2002
        Optional<Failure> lastFailure =
1✔
2003
            Optional.of(
1✔
2004
                newTimeoutFailure(
1✔
2005
                    timeoutType,
2006
                    // we move heartbeatDetails to the new top level (this cause is used for
2007
                    // scheduleToClose only)
2008
                    Optional.empty(),
1✔
2009
                    // prune to don't have too deep nesting of failures
2010
                    Optional.empty()));
1✔
2011

2012
        retryState = attemptActivityRetry(ctx, lastFailure, data);
1✔
2013
        if (retryState == RetryState.RETRY_STATE_IN_PROGRESS) {
1✔
2014
          return INITIATED;
1✔
2015
        } else if (retryState == RetryState.RETRY_STATE_TIMEOUT) {
1✔
2016
          // if retryState = RETRY_STATE_TIMEOUT, it means scheduleToClose timeout happened inside
2017
          // attemptActivityRetry();
2018
          // start to close timeout would return as "max attempts reached".
2019

2020
          Preconditions.checkState(
1✔
2021
              timeoutType == TimeoutType.TIMEOUT_TYPE_START_TO_CLOSE
2022
                  || timeoutType == TimeoutType.TIMEOUT_TYPE_HEARTBEAT,
2023
              "Unexpected timeout type: %s. We should end up here only in case of HEARTBEAT_TIMEOUT or START_TO_CLOSE_TIMEOUT",
2024
              timeoutType);
2025

2026
          // heartbeat is preserved as the cause for the scheduleToClose timeout
2027
          // But we effectively omit startToClose timeout with scheduleToClose timeout
2028
          Optional<Failure> cause =
2029
              timeoutType == TimeoutType.TIMEOUT_TYPE_HEARTBEAT ? lastFailure : previousFailure;
1✔
2030

2031
          failure =
1✔
2032
              newTimeoutFailure(
1✔
2033
                  TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE,
2034
                  Optional.ofNullable(data.heartbeatDetails),
1✔
2035
                  cause);
2036
        }
1✔
2037
        break;
2038
      default:
2039
        throw new IllegalStateException(
×
2040
            "Not implemented behavior for timeout type: " + timeoutType);
2041
    }
2042

2043
    ActivityTaskTimedOutEventAttributes.Builder a =
2044
        ActivityTaskTimedOutEventAttributes.newBuilder()
1✔
2045
            .setScheduledEventId(data.scheduledEventId)
1✔
2046
            .setRetryState(retryState)
1✔
2047
            .setStartedEventId(data.startedEventId)
1✔
2048
            .setFailure(failure);
1✔
2049
    HistoryEvent event =
2050
        HistoryEvent.newBuilder()
1✔
2051
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_TIMED_OUT)
1✔
2052
            .setActivityTaskTimedOutEventAttributes(a)
1✔
2053
            .build();
1✔
2054
    ctx.addEvent(event);
1✔
2055
    return TIMED_OUT;
1✔
2056
  }
2057

2058
  private static Failure newTimeoutFailure(
2059
      TimeoutType timeoutType, Optional<Payloads> lastHeartbeatDetails, Optional<Failure> cause) {
2060
    TimeoutFailureInfo.Builder info = TimeoutFailureInfo.newBuilder().setTimeoutType(timeoutType);
1✔
2061
    if (lastHeartbeatDetails.isPresent()) {
1✔
2062
      info.setLastHeartbeatDetails(lastHeartbeatDetails.get());
1✔
2063
    }
2064
    Failure.Builder result = Failure.newBuilder().setTimeoutFailureInfo(info);
1✔
2065
    if (cause.isPresent()) {
1✔
2066
      result.setCause(cause.get());
1✔
2067
    }
2068
    return result.build();
1✔
2069
  }
2070

2071
  private static RetryState attemptActivityRetry(
2072
      RequestContext ctx, Optional<Failure> failure, ActivityTaskData data) {
2073
    if (data.retryState == null) {
1✔
2074
      throw new IllegalStateException("RetryPolicy is always present");
×
2075
    }
2076
    Optional<ApplicationFailureInfo> info = failure.map(Failure::getApplicationFailureInfo);
1✔
2077
    Optional<java.time.Duration> nextRetryDelay = Optional.empty();
1✔
2078

2079
    if (info.isPresent()) {
1✔
2080
      if (info.get().getNonRetryable()) {
1✔
2081
        return RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE;
1✔
2082
      }
2083
      if (info.get().hasNextRetryDelay()) {
1✔
2084
        nextRetryDelay =
1✔
2085
            Optional.ofNullable(ProtobufTimeUtils.toJavaDuration(info.get().getNextRetryDelay()));
1✔
2086
      }
2087
    }
2088

2089
    TestServiceRetryState nextAttempt = data.retryState.getNextAttempt(failure);
1✔
2090
    TestServiceRetryState.BackoffInterval backoffInterval =
1✔
2091
        data.retryState.getBackoffIntervalInSeconds(
1✔
2092
            info.map(ApplicationFailureInfo::getType), data.store.currentTime(), nextRetryDelay);
1✔
2093
    if (backoffInterval.getRetryState() == RetryState.RETRY_STATE_IN_PROGRESS) {
1✔
2094
      data.nextBackoffInterval = ProtobufTimeUtils.toProtoDuration(backoffInterval.getInterval());
1✔
2095
      PollActivityTaskQueueResponse.Builder task = data.activityTask.getTask();
1✔
2096
      if (data.heartbeatDetails != null) {
1✔
2097
        task.setHeartbeatDetails(data.heartbeatDetails);
1✔
2098
      }
2099
      ctx.onCommit(
1✔
2100
          (historySize) -> {
2101
            data.retryState = nextAttempt;
1✔
2102
            task.setAttempt(nextAttempt.getAttempt());
1✔
2103
            task.setCurrentAttemptScheduledTime(ctx.currentTime());
1✔
2104
          });
1✔
2105
    } else {
1✔
2106
      data.nextBackoffInterval = Durations.ZERO;
1✔
2107
    }
2108
    return backoffInterval.getRetryState();
1✔
2109
  }
2110

2111
  private static void reportActivityTaskCancellation(
2112
      RequestContext ctx, ActivityTaskData data, Object request, long notUsed) {
2113
    Payloads details = null;
1✔
2114
    if (request instanceof RespondActivityTaskCanceledRequest) {
1✔
2115
      {
2116
        RespondActivityTaskCanceledRequest cr = (RespondActivityTaskCanceledRequest) request;
1✔
2117

2118
        details = cr.hasDetails() ? cr.getDetails() : null;
1✔
2119
      }
1✔
2120
    } else if (request instanceof RespondActivityTaskCanceledByIdRequest) {
1✔
2121
      {
2122
        RespondActivityTaskCanceledByIdRequest cr =
×
2123
            (RespondActivityTaskCanceledByIdRequest) request;
2124
        details = cr.hasDetails() ? cr.getDetails() : null;
×
2125
      }
×
2126
    } else if (request != null) {
1✔
2127
      throw Status.INTERNAL
×
2128
          .withDescription("Unexpected request type: " + request)
×
2129
          .asRuntimeException();
×
2130
    }
2131
    ActivityTaskCanceledEventAttributes.Builder a =
2132
        ActivityTaskCanceledEventAttributes.newBuilder()
1✔
2133
            .setScheduledEventId(data.scheduledEventId)
1✔
2134
            .setStartedEventId(data.startedEventId);
1✔
2135
    if (details != null) {
1✔
2136
      a.setDetails(details);
×
2137
    }
2138
    HistoryEvent event =
2139
        HistoryEvent.newBuilder()
1✔
2140
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_CANCELED)
1✔
2141
            .setActivityTaskCanceledEventAttributes(a)
1✔
2142
            .build();
1✔
2143
    ctx.addEvent(event);
1✔
2144
  }
1✔
2145

2146
  private static void heartbeatActivityTask(
2147
      RequestContext nullCtx, ActivityTaskData data, Payloads details, long notUsed) {
2148
    data.heartbeatDetails = details;
1✔
2149
  }
1✔
2150

2151
  private static void acceptUpdate(
2152
      RequestContext ctx,
2153
      UpdateWorkflowExecutionData data,
2154
      Message msg,
2155
      long workflowTaskCompletedEventId) {
2156
    try {
2157
      Acceptance acceptance = msg.getBody().unpack(Acceptance.class);
1✔
2158

2159
      WorkflowExecutionUpdateAcceptedEventAttributes acceptedAttribute =
2160
          WorkflowExecutionUpdateAcceptedEventAttributes.newBuilder()
1✔
2161
              .setAcceptedRequestSequencingEventId(workflowTaskCompletedEventId - 1)
1✔
2162
              .setProtocolInstanceId(msg.getProtocolInstanceId())
1✔
2163
              .setAcceptedRequestMessageId(acceptance.getAcceptedRequestMessageId())
1✔
2164
              .setAcceptedRequest(data.initialRequest)
1✔
2165
              .build();
1✔
2166

2167
      HistoryEvent event =
2168
          HistoryEvent.newBuilder()
1✔
2169
              .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED)
1✔
2170
              .setWorkflowExecutionUpdateAcceptedEventAttributes(acceptedAttribute)
1✔
2171
              .build();
1✔
2172
      // If the workflow is finished we can't write more events
2173
      // to history so if the message was processed after the workflow
2174
      // was closed there is nothing we can do.
2175
      // The real server also has this same problem
2176
      if (!ctx.getWorkflowMutableState().isTerminalState()) {
1✔
2177
        ctx.addEvent(event);
1✔
2178
      }
2179
      ctx.onCommit(
1✔
2180
          (int historySize) -> {
2181
            data.accepted.complete(true);
1✔
2182
          });
1✔
2183
    } catch (InvalidProtocolBufferException e) {
×
2184
      throw new RuntimeException(e);
×
2185
    }
1✔
2186
  }
1✔
2187

2188
  private static void completeUpdate(
2189
      RequestContext ctx,
2190
      UpdateWorkflowExecutionData data,
2191
      Message msg,
2192
      long workflowTaskCompletedEventId) {
2193
    try {
2194
      Response response = msg.getBody().unpack(Response.class);
1✔
2195

2196
      WorkflowExecutionUpdateCompletedEventAttributes completedAttribute =
2197
          WorkflowExecutionUpdateCompletedEventAttributes.newBuilder()
1✔
2198
              .setMeta(response.getMeta())
1✔
2199
              .setOutcome(response.getOutcome())
1✔
2200
              .build();
1✔
2201

2202
      HistoryEvent event =
2203
          HistoryEvent.newBuilder()
1✔
2204
              .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_COMPLETED)
1✔
2205
              .setWorkflowExecutionUpdateCompletedEventAttributes(completedAttribute)
1✔
2206
              .build();
1✔
2207
      // If the workflow is finished we can't write more events
2208
      // to history so if the message was processed after the workflow
2209
      // was closed there is nothing we can do.
2210
      // The real server also has this same problem
2211
      if (!ctx.getWorkflowMutableState().isTerminalState()) {
1✔
2212
        ctx.addEvent(event);
1✔
2213
      }
2214
      ctx.onCommit(
1✔
2215
          (int historySize) -> {
2216
            data.outcome.complete(response.getOutcome());
1✔
2217
          });
1✔
2218
    } catch (InvalidProtocolBufferException e) {
×
2219
      throw new RuntimeException(e);
×
2220
    }
1✔
2221
  }
1✔
2222

2223
  private static void startTimer(
2224
      RequestContext ctx,
2225
      TimerData data,
2226
      StartTimerCommandAttributes d,
2227
      long workflowTaskCompletedEventId) {
2228
    TimerStartedEventAttributes.Builder a =
2229
        TimerStartedEventAttributes.newBuilder()
1✔
2230
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1✔
2231
            .setStartToFireTimeout(d.getStartToFireTimeout())
1✔
2232
            .setTimerId(d.getTimerId());
1✔
2233
    HistoryEvent event =
2234
        HistoryEvent.newBuilder()
1✔
2235
            .setEventType(EventType.EVENT_TYPE_TIMER_STARTED)
1✔
2236
            .setTimerStartedEventAttributes(a)
1✔
2237
            .build();
1✔
2238
    long startedEventId = ctx.addEvent(event);
1✔
2239
    ctx.onCommit(
1✔
2240
        (historySize) -> {
2241
          data.startedEvent = a.build();
1✔
2242
          data.startedEventId = startedEventId;
1✔
2243
        });
1✔
2244
  }
1✔
2245

2246
  private static void fireTimer(RequestContext ctx, TimerData data, Object ignored, long notUsed) {
2247
    TimerFiredEventAttributes.Builder a =
2248
        TimerFiredEventAttributes.newBuilder()
1✔
2249
            .setTimerId(data.startedEvent.getTimerId())
1✔
2250
            .setStartedEventId(data.startedEventId);
1✔
2251
    HistoryEvent event =
2252
        HistoryEvent.newBuilder()
1✔
2253
            .setEventType(EventType.EVENT_TYPE_TIMER_FIRED)
1✔
2254
            .setTimerFiredEventAttributes(a)
1✔
2255
            .build();
1✔
2256
    ctx.addEvent(event);
1✔
2257
  }
1✔
2258

2259
  private static void cancelTimer(
2260
      RequestContext ctx,
2261
      TimerData data,
2262
      CancelTimerCommandAttributes d,
2263
      long workflowTaskCompletedEventId) {
2264
    TimerCanceledEventAttributes.Builder a =
2265
        TimerCanceledEventAttributes.newBuilder()
1✔
2266
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1✔
2267
            .setTimerId(d.getTimerId())
1✔
2268
            .setStartedEventId(data.startedEventId);
1✔
2269
    HistoryEvent event =
2270
        HistoryEvent.newBuilder()
1✔
2271
            .setEventType(EventType.EVENT_TYPE_TIMER_CANCELED)
1✔
2272
            .setTimerCanceledEventAttributes(a)
1✔
2273
            .build();
1✔
2274
    ctx.addEvent(event);
1✔
2275
  }
1✔
2276

2277
  private static void initiateExternalSignal(
2278
      RequestContext ctx,
2279
      SignalExternalData data,
2280
      SignalExternalWorkflowExecutionCommandAttributes d,
2281
      long workflowTaskCompletedEventId) {
2282
    SignalExternalWorkflowExecutionInitiatedEventAttributes.Builder a =
2283
        SignalExternalWorkflowExecutionInitiatedEventAttributes.newBuilder()
1✔
2284
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1✔
2285
            .setControl(d.getControl())
1✔
2286
            .setInput(d.getInput())
1✔
2287
            .setNamespace(d.getNamespace())
1✔
2288
            .setChildWorkflowOnly(d.getChildWorkflowOnly())
1✔
2289
            .setSignalName(d.getSignalName())
1✔
2290
            .setWorkflowExecution(d.getExecution());
1✔
2291

2292
    HistoryEvent event =
2293
        HistoryEvent.newBuilder()
1✔
2294
            .setEventType(EventType.EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED)
1✔
2295
            .setSignalExternalWorkflowExecutionInitiatedEventAttributes(a)
1✔
2296
            .build();
1✔
2297
    long initiatedEventId = ctx.addEvent(event);
1✔
2298
    ctx.onCommit(
1✔
2299
        (historySize) -> {
2300
          data.initiatedEventId = initiatedEventId;
1✔
2301
          data.initiatedEvent = a.build();
1✔
2302
        });
1✔
2303
  }
1✔
2304

2305
  private static void failExternalSignal(
2306
      RequestContext ctx,
2307
      SignalExternalData data,
2308
      SignalExternalWorkflowExecutionFailedCause cause,
2309
      long notUsed) {
2310
    SignalExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent = data.initiatedEvent;
1✔
2311
    SignalExternalWorkflowExecutionFailedEventAttributes.Builder a =
2312
        SignalExternalWorkflowExecutionFailedEventAttributes.newBuilder()
1✔
2313
            .setInitiatedEventId(data.initiatedEventId)
1✔
2314
            .setWorkflowExecution(initiatedEvent.getWorkflowExecution())
1✔
2315
            .setControl(initiatedEvent.getControl())
1✔
2316
            .setCause(cause)
1✔
2317
            .setNamespace(initiatedEvent.getNamespace());
1✔
2318
    HistoryEvent event =
2319
        HistoryEvent.newBuilder()
1✔
2320
            .setEventType(EventType.EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED)
1✔
2321
            .setSignalExternalWorkflowExecutionFailedEventAttributes(a)
1✔
2322
            .build();
1✔
2323
    ctx.addEvent(event);
1✔
2324
  }
1✔
2325

2326
  private static void completeExternalSignal(
2327
      RequestContext ctx, SignalExternalData data, String runId, long notUsed) {
2328
    SignalExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent = data.initiatedEvent;
1✔
2329
    WorkflowExecution signaledExecution =
1✔
2330
        initiatedEvent.getWorkflowExecution().toBuilder().setRunId(runId).build();
1✔
2331
    ExternalWorkflowExecutionSignaledEventAttributes.Builder a =
2332
        ExternalWorkflowExecutionSignaledEventAttributes.newBuilder()
1✔
2333
            .setInitiatedEventId(data.initiatedEventId)
1✔
2334
            .setWorkflowExecution(signaledExecution)
1✔
2335
            .setControl(initiatedEvent.getControl())
1✔
2336
            .setNamespace(initiatedEvent.getNamespace());
1✔
2337
    HistoryEvent event =
2338
        HistoryEvent.newBuilder()
1✔
2339
            .setEventType(EventType.EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_SIGNALED)
1✔
2340
            .setExternalWorkflowExecutionSignaledEventAttributes(a)
1✔
2341
            .build();
1✔
2342
    ctx.addEvent(event);
1✔
2343
  }
1✔
2344

2345
  private static void initiateExternalCancellation(
2346
      RequestContext ctx,
2347
      CancelExternalData data,
2348
      RequestCancelExternalWorkflowExecutionCommandAttributes d,
2349
      long workflowTaskCompletedEventId) {
2350
    RequestCancelExternalWorkflowExecutionInitiatedEventAttributes.Builder a =
2351
        RequestCancelExternalWorkflowExecutionInitiatedEventAttributes.newBuilder()
1✔
2352
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1✔
2353
            .setControl(d.getControl())
1✔
2354
            .setNamespace(d.getNamespace())
1✔
2355
            .setChildWorkflowOnly(d.getChildWorkflowOnly())
1✔
2356
            .setWorkflowExecution(
1✔
2357
                WorkflowExecution.newBuilder()
1✔
2358
                    .setWorkflowId(d.getWorkflowId())
1✔
2359
                    .setRunId(d.getRunId())
1✔
2360
                    .build());
1✔
2361

2362
    HistoryEvent event =
2363
        HistoryEvent.newBuilder()
1✔
2364
            .setEventType(EventType.EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED)
1✔
2365
            .setRequestCancelExternalWorkflowExecutionInitiatedEventAttributes(a)
1✔
2366
            .build();
1✔
2367
    long initiatedEventId = ctx.addEvent(event);
1✔
2368
    ctx.onCommit(
1✔
2369
        (historySize) -> {
2370
          data.initiatedEventId = initiatedEventId;
1✔
2371
          data.initiatedEvent = a.build();
1✔
2372
        });
1✔
2373
  }
1✔
2374

2375
  private static void reportExternalCancellationRequested(
2376
      RequestContext ctx, CancelExternalData data, String runId, long notUsed) {
2377
    RequestCancelExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent =
1✔
2378
        data.initiatedEvent;
2379
    ExternalWorkflowExecutionCancelRequestedEventAttributes.Builder a =
2380
        ExternalWorkflowExecutionCancelRequestedEventAttributes.newBuilder()
1✔
2381
            .setInitiatedEventId(data.initiatedEventId)
1✔
2382
            .setWorkflowExecution(
1✔
2383
                WorkflowExecution.newBuilder()
1✔
2384
                    .setRunId(runId)
1✔
2385
                    .setWorkflowId(initiatedEvent.getWorkflowExecution().getWorkflowId())
1✔
2386
                    .build())
1✔
2387
            .setNamespace(initiatedEvent.getNamespace());
1✔
2388
    HistoryEvent event =
2389
        HistoryEvent.newBuilder()
1✔
2390
            .setEventType(EventType.EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_CANCEL_REQUESTED)
1✔
2391
            .setExternalWorkflowExecutionCancelRequestedEventAttributes(a)
1✔
2392
            .build();
1✔
2393
    ctx.addEvent(event);
1✔
2394
  }
1✔
2395

2396
  private static void failExternalCancellation(
2397
      RequestContext ctx,
2398
      CancelExternalData data,
2399
      CancelExternalWorkflowExecutionFailedCause cause,
2400
      long notUsed) {
2401
    RequestCancelExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent =
×
2402
        data.initiatedEvent;
2403
    RequestCancelExternalWorkflowExecutionFailedEventAttributes.Builder a =
2404
        RequestCancelExternalWorkflowExecutionFailedEventAttributes.newBuilder()
×
2405
            .setInitiatedEventId(data.initiatedEventId)
×
2406
            .setWorkflowExecution(initiatedEvent.getWorkflowExecution())
×
2407
            .setControl(initiatedEvent.getControl())
×
2408
            .setCause(cause)
×
2409
            .setNamespace(initiatedEvent.getNamespace());
×
2410
    HistoryEvent event =
2411
        HistoryEvent.newBuilder()
×
2412
            .setEventType(EventType.EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_FAILED)
×
2413
            .setRequestCancelExternalWorkflowExecutionFailedEventAttributes(a)
×
2414
            .build();
×
2415
    ctx.addEvent(event);
×
2416
  }
×
2417

2418
  // Mimics the default activity retry policy of a standard Temporal server.
2419
  static RetryPolicy ensureDefaultFieldsForActivityRetryPolicy(RetryPolicy originalPolicy) {
2420
    Duration initialInterval =
2421
        Durations.compare(originalPolicy.getInitialInterval(), Durations.ZERO) == 0
1✔
2422
            ? DEFAULT_ACTIVITY_RETRY_INITIAL_INTERVAL
1✔
2423
            : originalPolicy.getInitialInterval();
1✔
2424

2425
    return RetryPolicy.newBuilder()
1✔
2426
        .setInitialInterval(initialInterval)
1✔
2427
        .addAllNonRetryableErrorTypes(originalPolicy.getNonRetryableErrorTypesList())
1✔
2428
        .setMaximumInterval(
1✔
2429
            Durations.compare(originalPolicy.getMaximumInterval(), Durations.ZERO) == 0
1✔
2430
                ? Durations.fromMillis(
1✔
2431
                    DEFAULT_ACTIVITY_MAXIMUM_INTERVAL_COEFFICIENT
2432
                        * Durations.toMillis(initialInterval))
1✔
2433
                : originalPolicy.getMaximumInterval())
1✔
2434
        .setBackoffCoefficient(
1✔
2435
            originalPolicy.getBackoffCoefficient() == 0
1✔
2436
                ? DEFAULT_ACTIVITY_RETRY_BACKOFF_COEFFICIENT
1✔
2437
                : originalPolicy.getBackoffCoefficient())
1✔
2438
        .setMaximumAttempts(
1✔
2439
            originalPolicy.getMaximumAttempts() == 0
1✔
2440
                ? DEFAULT_ACTIVITY_RETRY_MAXIMUM_ATTEMPTS
1✔
2441
                : originalPolicy.getMaximumAttempts())
1✔
2442
        .build();
1✔
2443
  }
2444

2445
  static RetryPolicy getDefaultNexusOperationRetryPolicy() {
2446
    return RetryPolicy.newBuilder()
1✔
2447
        .addAllNonRetryableErrorTypes(
1✔
2448
            Arrays.asList(
1✔
2449
                "BAD_REQUEST", "INVALID_ARGUMENT", "NOT_FOUND", "DEADLINE_EXCEEDED", "CANCELLED"))
2450
        .setInitialInterval(Durations.fromSeconds(1))
1✔
2451
        .setMaximumInterval(Durations.fromSeconds(10))
1✔
2452
        .setBackoffCoefficient(2.0)
1✔
2453
        .setMaximumAttempts(10)
1✔
2454
        .build();
1✔
2455
  }
2456
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc