• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #333

16 Oct 2024 07:28PM UTC coverage: 78.65% (+0.6%) from 78.085%
#333

push

github

web-flow
Fix code coverage (#2275)

22670 of 28824 relevant lines covered (78.65%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.31
/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.testservice;
22

23
import static io.temporal.internal.common.LinkConverter.*;
24
import static io.temporal.internal.testservice.StateMachines.Action.CANCEL;
25
import static io.temporal.internal.testservice.StateMachines.Action.COMPLETE;
26
import static io.temporal.internal.testservice.StateMachines.Action.CONTINUE_AS_NEW;
27
import static io.temporal.internal.testservice.StateMachines.Action.FAIL;
28
import static io.temporal.internal.testservice.StateMachines.Action.INITIATE;
29
import static io.temporal.internal.testservice.StateMachines.Action.QUERY;
30
import static io.temporal.internal.testservice.StateMachines.Action.REQUEST_CANCELLATION;
31
import static io.temporal.internal.testservice.StateMachines.Action.START;
32
import static io.temporal.internal.testservice.StateMachines.Action.TERMINATE;
33
import static io.temporal.internal.testservice.StateMachines.Action.TIME_OUT;
34
import static io.temporal.internal.testservice.StateMachines.Action.UPDATE;
35
import static io.temporal.internal.testservice.StateMachines.Action.UPDATE_WORKFLOW_EXECUTION;
36
import static io.temporal.internal.testservice.StateMachines.State.CANCELED;
37
import static io.temporal.internal.testservice.StateMachines.State.CANCELLATION_REQUESTED;
38
import static io.temporal.internal.testservice.StateMachines.State.COMPLETED;
39
import static io.temporal.internal.testservice.StateMachines.State.CONTINUED_AS_NEW;
40
import static io.temporal.internal.testservice.StateMachines.State.FAILED;
41
import static io.temporal.internal.testservice.StateMachines.State.INITIATED;
42
import static io.temporal.internal.testservice.StateMachines.State.NONE;
43
import static io.temporal.internal.testservice.StateMachines.State.STARTED;
44
import static io.temporal.internal.testservice.StateMachines.State.TERMINATED;
45
import static io.temporal.internal.testservice.StateMachines.State.TIMED_OUT;
46

47
import com.google.common.base.Preconditions;
48
import com.google.common.base.Strings;
49
import com.google.protobuf.*;
50
import com.google.protobuf.util.Durations;
51
import com.google.protobuf.util.Timestamps;
52
import io.grpc.Status;
53
import io.grpc.StatusRuntimeException;
54
import io.temporal.api.command.v1.*;
55
import io.temporal.api.common.v1.*;
56
import io.temporal.api.enums.v1.*;
57
import io.temporal.api.errordetails.v1.QueryFailedFailure;
58
import io.temporal.api.failure.v1.ApplicationFailureInfo;
59
import io.temporal.api.failure.v1.Failure;
60
import io.temporal.api.failure.v1.NexusOperationFailureInfo;
61
import io.temporal.api.failure.v1.TimeoutFailureInfo;
62
import io.temporal.api.history.v1.*;
63
import io.temporal.api.nexus.v1.*;
64
import io.temporal.api.nexus.v1.Link;
65
import io.temporal.api.protocol.v1.Message;
66
import io.temporal.api.query.v1.WorkflowQueryResult;
67
import io.temporal.api.taskqueue.v1.StickyExecutionAttributes;
68
import io.temporal.api.taskqueue.v1.TaskQueue;
69
import io.temporal.api.update.v1.*;
70
import io.temporal.api.update.v1.Request;
71
import io.temporal.api.update.v1.Response;
72
import io.temporal.api.workflowservice.v1.*;
73
import io.temporal.internal.common.ProtobufTimeUtils;
74
import io.temporal.internal.testservice.TestWorkflowMutableStateImpl.UpdateWorkflowExecution;
75
import io.temporal.internal.testservice.TestWorkflowStore.ActivityTask;
76
import io.temporal.internal.testservice.TestWorkflowStore.TaskQueueId;
77
import io.temporal.internal.testservice.TestWorkflowStore.WorkflowTask;
78
import io.temporal.serviceclient.StatusUtils;
79
import io.temporal.workflow.Functions;
80
import java.util.*;
81
import java.util.concurrent.CompletableFuture;
82
import java.util.concurrent.ForkJoinPool;
83
import javax.annotation.Nonnull;
84
import org.slf4j.Logger;
85
import org.slf4j.LoggerFactory;
86

87
class StateMachines {
×
88

89
  private static final Logger log = LoggerFactory.getLogger(StateMachines.class);
1✔
90

91
  public static final long DEFAULT_WORKFLOW_EXECUTION_TIMEOUT_MILLISECONDS =
92
      10L * 365 * 24 * 3600 * 1000;
93
  public static final long DEFAULT_WORKFLOW_TASK_TIMEOUT_MILLISECONDS = 10L * 1000;
94
  public static final long MAX_WORKFLOW_TASK_TIMEOUT_MILLISECONDS = 60L * 1000;
95
  static final Duration DEFAULT_ACTIVITY_RETRY_INITIAL_INTERVAL = Durations.fromSeconds(1);
1✔
96
  static final double DEFAULT_ACTIVITY_RETRY_BACKOFF_COEFFICIENT = 2.0;
97
  static final int DEFAULT_ACTIVITY_RETRY_MAXIMUM_ATTEMPTS = 0;
98
  static final int DEFAULT_ACTIVITY_MAXIMUM_INTERVAL_COEFFICIENT = 100;
99
  static final int NO_EVENT_ID = -1;
100

101
  enum State {
1✔
102
    NONE,
1✔
103
    INITIATED,
1✔
104
    STARTED,
1✔
105
    FAILED,
1✔
106
    TIMED_OUT,
1✔
107
    CANCELLATION_REQUESTED,
1✔
108
    CANCELED,
1✔
109
    COMPLETED,
1✔
110
    CONTINUED_AS_NEW,
1✔
111
    TERMINATED,
1✔
112
  }
113

114
  enum Action {
1✔
115
    INITIATE,
1✔
116
    START,
1✔
117
    FAIL,
1✔
118
    TIME_OUT,
1✔
119
    REQUEST_CANCELLATION,
1✔
120
    CANCEL,
1✔
121
    TERMINATE,
1✔
122
    UPDATE,
1✔
123
    COMPLETE,
1✔
124
    CONTINUE_AS_NEW,
1✔
125
    QUERY,
1✔
126
    UPDATE_WORKFLOW_EXECUTION,
1✔
127
  }
128

129
  static final class WorkflowData {
130
    Optional<TestServiceRetryState> retryState;
131
    Duration backoffStartInterval;
132
    String cronSchedule;
133
    Payloads lastCompletionResult;
134
    Optional<Failure> lastFailure;
135
    /**
136
     * @see WorkflowExecutionStartedEventAttributes#getFirstExecutionRunId()
137
     */
138
    final @Nonnull String firstExecutionRunId;
139
    /**
140
     * @see WorkflowExecutionStartedEventAttributes#getOriginalExecutionRunId()
141
     */
142
    final @Nonnull String originalExecutionRunId;
143

144
    /** RunId that was continued by this run as a result of Retry or Continue-As-New. */
145
    Optional<String> continuedExecutionRunId;
146

147
    Functions.Proc runTimerCancellationHandle;
148

149
    WorkflowData(
150
        Optional<TestServiceRetryState> retryState,
151
        Duration backoffStartInterval,
152
        String cronSchedule,
153
        Payloads lastCompletionResult,
154
        Optional<Failure> lastFailure,
155
        @Nonnull String firstExecutionRunId,
156
        @Nonnull String originalExecutionRunId,
157
        Optional<String> continuedExecutionRunId) {
1✔
158
      this.retryState = retryState;
1✔
159
      this.backoffStartInterval = backoffStartInterval;
1✔
160
      this.cronSchedule = cronSchedule;
1✔
161
      this.lastCompletionResult = lastCompletionResult;
1✔
162
      this.firstExecutionRunId =
1✔
163
          Preconditions.checkNotNull(firstExecutionRunId, "firstExecutionRunId");
1✔
164
      this.originalExecutionRunId =
1✔
165
          Preconditions.checkNotNull(originalExecutionRunId, "originalExecutionRunId");
1✔
166
      this.continuedExecutionRunId = continuedExecutionRunId;
1✔
167
      this.lastFailure = Objects.requireNonNull(lastFailure);
1✔
168
    }
1✔
169

170
    @Override
171
    public String toString() {
172
      return "WorkflowData{"
×
173
          + "retryState="
174
          + retryState
175
          + ", backoffStartInterval="
176
          + backoffStartInterval
177
          + ", cronSchedule='"
178
          + cronSchedule
179
          + '\''
180
          + ", lastCompletionResult="
181
          + lastCompletionResult
182
          + ", firstExecutionRunId='"
183
          + firstExecutionRunId
184
          + '\''
185
          + ", originalExecutionRunId='"
186
          + originalExecutionRunId
187
          + '\''
188
          + ", continuedExecutionRunId="
189
          + continuedExecutionRunId
190
          + '}';
191
    }
192
  }
193

194
  static final class WorkflowTaskData {
195

196
    final TestWorkflowStore store;
197

198
    boolean workflowCompleted;
199

200
    /** id of the last started event which completed successfully */
201
    long lastSuccessfulStartedEventId;
202

203
    final StartWorkflowExecutionRequest startRequest;
204

205
    long startedEventId = NO_EVENT_ID;
1✔
206

207
    PollWorkflowTaskQueueResponse.Builder workflowTask;
208

209
    /**
210
     * Events that are added during execution of a workflow task. They have to be buffered to be
211
     * added after the events generated by a workflow task. Without this the determinism will be
212
     * broken on replay.
213
     */
214
    final List<RequestContext> bufferedEvents = new ArrayList<>();
1✔
215

216
    /**
217
     * Update requests that are added during execution of a workflow task. They have to be buffered
218
     * to be added to the next workflow task.
219
     */
220
    final Map<String, UpdateWorkflowExecution> updateRequestBuffer = new LinkedHashMap<>();
1✔
221

222
    final Map<String, UpdateWorkflowExecution> updateRequest = new LinkedHashMap<>();
1✔
223

224
    long scheduledEventId = NO_EVENT_ID;
1✔
225

226
    int attempt = 0;
1✔
227

228
    /** Query requests received during workflow task processing (after start) */
229
    final Map<String, TestWorkflowMutableStateImpl.ConsistentQuery> queryBuffer = new HashMap<>();
1✔
230

231
    final Map<String, TestWorkflowMutableStateImpl.ConsistentQuery> consistentQueryRequests =
1✔
232
        new HashMap<>();
233

234
    WorkflowTaskData(TestWorkflowStore store, StartWorkflowExecutionRequest startRequest) {
1✔
235
      this.store = store;
1✔
236
      this.startRequest = startRequest;
1✔
237
    }
1✔
238

239
    void clear() {
240
      startedEventId = NO_EVENT_ID;
1✔
241
      workflowTask = null;
1✔
242
      scheduledEventId = NO_EVENT_ID;
1✔
243
      attempt = 0;
1✔
244
    }
1✔
245

246
    Optional<UpdateWorkflowExecution> getUpdateRequest(String protocolInstanceId) {
247
      return Optional.ofNullable(
1✔
248
          updateRequest.getOrDefault(
1✔
249
              protocolInstanceId, updateRequestBuffer.get(protocolInstanceId)));
1✔
250
    }
251

252
    @Override
253
    public String toString() {
254
      return "WorkflowTaskData{"
×
255
          + "store="
256
          + store
257
          + ", workflowCompleted="
258
          + workflowCompleted
259
          + ", lastSuccessfulStartedEventId="
260
          + lastSuccessfulStartedEventId
261
          + ", startRequest="
262
          + startRequest
263
          + ", startedEventId="
264
          + startedEventId
265
          + ", workflowTask="
266
          + workflowTask
267
          + ", bufferedEvents="
268
          + bufferedEvents
269
          + ", scheduledEventId="
270
          + scheduledEventId
271
          + ", attempt="
272
          + attempt
273
          + ", queryBuffer="
274
          + queryBuffer
275
          + ", consistentQueryRequests="
276
          + consistentQueryRequests
277
          + ", updateRequest="
278
          + updateRequest
279
          + ", updateRequestBuffer="
280
          + updateRequestBuffer
281
          + '}';
282
    }
283
  }
284

285
  static final class ActivityTaskData {
286

287
    StartWorkflowExecutionRequest startWorkflowExecutionRequest;
288
    ActivityTaskScheduledEventAttributes scheduledEvent;
289
    ActivityTask activityTask;
290

291
    final TestWorkflowStore store;
292

293
    long scheduledEventId = NO_EVENT_ID;
1✔
294
    long startedEventId = NO_EVENT_ID;
1✔
295
    public HistoryEvent startedEvent;
296
    Payloads heartbeatDetails;
297
    long lastHeartbeatTime;
298
    TestServiceRetryState retryState;
299
    Duration nextBackoffInterval;
300
    String identity;
301

302
    ActivityTaskData(
303
        TestWorkflowStore store, StartWorkflowExecutionRequest startWorkflowExecutionRequest) {
1✔
304
      this.store = store;
1✔
305
      this.startWorkflowExecutionRequest = startWorkflowExecutionRequest;
1✔
306
    }
1✔
307

308
    @Override
309
    public String toString() {
310
      return "ActivityTaskData{"
×
311
          + "startWorkflowExecutionRequest="
312
          + startWorkflowExecutionRequest
313
          + ", scheduledEvent="
314
          + scheduledEvent
315
          + ", activityTask="
316
          + activityTask
317
          + ", store="
318
          + store
319
          + ", scheduledEventId="
320
          + scheduledEventId
321
          + ", startedEventId="
322
          + startedEventId
323
          + ", startedEvent="
324
          + startedEvent
325
          + ", heartbeatDetails="
326
          + heartbeatDetails
327
          + ", lastHeartbeatTime="
328
          + lastHeartbeatTime
329
          + ", retryState="
330
          + retryState
331
          + ", nextBackoffInterval="
332
          + nextBackoffInterval
333
          + '}';
334
    }
335

336
    public int getAttempt() {
337
      return retryState != null ? retryState.getAttempt() : 1;
1✔
338
    }
339
  }
340

341
  static final class NexusOperationData {
342
    // Timeout for an individual Start or Cancel Operation request.
343
    final Duration requestTimeout = Durations.fromSeconds(10);
1✔
344

345
    String operationId = "";
1✔
346
    Endpoint endpoint;
347
    NexusOperationScheduledEventAttributes scheduledEvent;
348
    TestWorkflowStore.NexusTask nexusTask;
349
    RetryPolicy retryPolicy = defaultNexusRetryPolicy();
1✔
350

351
    long scheduledEventId = NO_EVENT_ID;
1✔
352
    Timestamp cancelRequestedTime;
353

354
    TestServiceRetryState retryState;
355
    boolean isBackingOff = false;
1✔
356
    Duration nextBackoffInterval;
357
    Timestamp lastAttemptCompleteTime;
358
    Timestamp nextAttemptScheduleTime;
359
    String identity;
360

361
    public NexusOperationData(Endpoint endpoint) {
1✔
362
      this.endpoint = endpoint;
1✔
363
    }
1✔
364

365
    public int getAttempt() {
366
      return retryState != null ? retryState.getAttempt() : 1;
1✔
367
    }
368

369
    @Override
370
    public String toString() {
371
      return "NexusOperationData{"
×
372
          + ", nexusEndpoint="
373
          + endpoint
374
          + ", scheduledEvent="
375
          + scheduledEvent
376
          + ", nexusTask="
377
          + nexusTask
378
          + ", scheduledEventId="
379
          + scheduledEventId
380
          + ", retryState="
381
          + retryState
382
          + ", nextBackoffInterval="
383
          + nextBackoffInterval
384
          + '}';
385
    }
386
  }
387

388
  static final class SignalExternalData {
1✔
389
    long initiatedEventId = NO_EVENT_ID;
1✔
390
    public SignalExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent;
391

392
    @Override
393
    public String toString() {
394
      return "SignalExternalData{"
×
395
          + "initiatedEventId="
396
          + initiatedEventId
397
          + ", initiatedEvent="
398
          + initiatedEvent
399
          + '}';
400
    }
401
  }
402

403
  static final class CancelExternalData {
1✔
404
    long initiatedEventId = NO_EVENT_ID;
1✔
405
    public RequestCancelExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent;
406

407
    @Override
408
    public String toString() {
409
      return "CancelExternalData{"
×
410
          + "initiatedEventId="
411
          + initiatedEventId
412
          + ", initiatedEvent="
413
          + initiatedEvent
414
          + '}';
415
    }
416
  }
417

418
  static final class ChildWorkflowData {
419

420
    final TestWorkflowService service;
421
    StartChildWorkflowExecutionInitiatedEventAttributes initiatedEvent;
422
    long initiatedEventId;
423
    long startedEventId;
424
    WorkflowExecution execution;
425

426
    public ChildWorkflowData(TestWorkflowService service) {
1✔
427
      this.service = service;
1✔
428
    }
1✔
429

430
    @Override
431
    public String toString() {
432
      return "ChildWorkflowData{"
×
433
          + "service="
434
          + service
435
          + ", initiatedEvent="
436
          + initiatedEvent
437
          + ", initiatedEventId="
438
          + initiatedEventId
439
          + ", startedEventId="
440
          + startedEventId
441
          + ", execution="
442
          + execution
443
          + '}';
444
    }
445
  }
446

447
  static final class TimerData {
1✔
448
    TimerStartedEventAttributes startedEvent;
449
    public long startedEventId;
450

451
    @Override
452
    public String toString() {
453
      return "TimerData{"
×
454
          + "startedEvent="
455
          + startedEvent
456
          + ", startedEventId="
457
          + startedEventId
458
          + '}';
459
    }
460
  }
461

462
  /** Represents an accepted update workflow execution request */
463
  static final class UpdateWorkflowExecutionData {
464
    final String id;
465
    final CompletableFuture<Boolean> accepted;
466
    final CompletableFuture<Outcome> outcome;
467
    final Request initialRequest;
468

469
    public UpdateWorkflowExecutionData(
470
        String id,
471
        Request initialRequest,
472
        CompletableFuture<Boolean> accepted,
473
        CompletableFuture<Outcome> outcome) {
1✔
474
      this.id = id;
1✔
475
      this.initialRequest = initialRequest;
1✔
476
      this.accepted = accepted;
1✔
477
      this.outcome = outcome;
1✔
478
    }
1✔
479

480
    @Override
481
    public String toString() {
482
      return "UpdateWorkflowExecutionData{" + "ID=" + id + '}';
×
483
    }
484
  }
485

486
  static StateMachine<WorkflowData> newWorkflowStateMachine(WorkflowData data) {
487
    return new StateMachine<>(data)
1✔
488
        .add(NONE, START, STARTED, StateMachines::startWorkflow)
1✔
489
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::completeWorkflow)
1✔
490
        .add(STARTED, CONTINUE_AS_NEW, CONTINUED_AS_NEW, StateMachines::continueAsNewWorkflow)
1✔
491
        .add(STARTED, FAIL, FAILED, StateMachines::failWorkflow)
1✔
492
        .add(STARTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutWorkflow)
1✔
493
        .add(
1✔
494
            STARTED,
495
            REQUEST_CANCELLATION,
496
            CANCELLATION_REQUESTED,
497
            StateMachines::requestWorkflowCancellation)
498
        .add(STARTED, TERMINATE, TERMINATED, StateMachines::terminateWorkflow)
1✔
499
        .add(
1✔
500
            CANCELLATION_REQUESTED,
501
            REQUEST_CANCELLATION,
502
            CANCELLATION_REQUESTED,
503
            StateMachines::noop)
504
        .add(CANCELLATION_REQUESTED, COMPLETE, COMPLETED, StateMachines::completeWorkflow)
1✔
505
        .add(CANCELLATION_REQUESTED, CANCEL, CANCELED, StateMachines::cancelWorkflow)
1✔
506
        .add(CANCELLATION_REQUESTED, TERMINATE, TERMINATED, StateMachines::terminateWorkflow)
1✔
507
        .add(CANCELLATION_REQUESTED, FAIL, FAILED, StateMachines::failWorkflow)
1✔
508
        .add(CANCELLATION_REQUESTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutWorkflow);
1✔
509
  }
510

511
  static StateMachine<WorkflowTaskData> newWorkflowTaskStateMachine(
512
      TestWorkflowStore store, StartWorkflowExecutionRequest startRequest) {
513
    return new StateMachine<>(new WorkflowTaskData(store, startRequest))
1✔
514
        .add(NONE, INITIATE, INITIATED, StateMachines::scheduleWorkflowTask)
1✔
515
        // TODO(maxim): Uncomment once the server supports consistent query only workflow tasks
516
        //        .add(NONE, QUERY, INITIATED_QUERY_ONLY, StateMachines::scheduleQueryWorkflowTask)
517
        //        .add(INITIATED_QUERY_ONLY, QUERY, INITIATED_QUERY_ONLY,
518
        // StateMachines::queryWhileScheduled)
519
        //        .add(
520
        //            INITIATED_QUERY_ONLY,
521
        //            INITIATE,
522
        //            INITIATED,
523
        //            StateMachines::convertQueryWorkflowTaskToReal)
524
        //        .add(
525
        //            INITIATED_QUERY_ONLY,
526
        //            START,
527
        //            STARTED_QUERY_ONLY,
528
        //            StateMachines::startQueryOnlyWorkflowTask)
529
        //        .add(STARTED_QUERY_ONLY, INITIATE, STARTED_QUERY_ONLY,
530
        // StateMachines::needsWorkflowTask)
531
        //        .add(STARTED_QUERY_ONLY, QUERY, STARTED_QUERY_ONLY,
532
        // StateMachines::needsWorkflowTaskDueToQuery)
533
        //        .add(STARTED_QUERY_ONLY, FAIL, NONE, StateMachines::failQueryWorkflowTask)
534
        //        .add(STARTED_QUERY_ONLY, TIME_OUT, NONE, StateMachines::failQueryWorkflowTask)
535
        //        .add(STARTED_QUERY_ONLY, COMPLETE, NONE, StateMachines::completeQuery)
536
        .add(STARTED, QUERY, STARTED, StateMachines::bufferQuery)
1✔
537
        .add(STARTED, UPDATE_WORKFLOW_EXECUTION, STARTED, StateMachines::bufferUpdate)
1✔
538
        .add(INITIATED, INITIATE, INITIATED, StateMachines::noop)
1✔
539
        .add(INITIATED, QUERY, INITIATED, StateMachines::queryWhileScheduled)
1✔
540
        .add(INITIATED, UPDATE_WORKFLOW_EXECUTION, INITIATED, StateMachines::addUpdate)
1✔
541
        .add(INITIATED, START, STARTED, StateMachines::startWorkflowTask)
1✔
542
        .add(STARTED, COMPLETE, NONE, StateMachines::completeWorkflowTask)
1✔
543
        .add(STARTED, FAIL, NONE, StateMachines::failWorkflowTask)
1✔
544
        .add(STARTED, TIME_OUT, NONE, StateMachines::timeoutWorkflowTask)
1✔
545
        .add(STARTED, INITIATE, STARTED, StateMachines::needsWorkflowTask);
1✔
546
  }
547

548
  public static StateMachine<ActivityTaskData> newActivityStateMachine(
549
      TestWorkflowStore store, StartWorkflowExecutionRequest workflowStartedEvent) {
550
    return new StateMachine<>(new ActivityTaskData(store, workflowStartedEvent))
1✔
551
        .add(NONE, INITIATE, INITIATED, StateMachines::scheduleActivityTask)
1✔
552
        .add(INITIATED, START, STARTED, StateMachines::startActivityTask)
1✔
553
        .add(INITIATED, TIME_OUT, TIMED_OUT, StateMachines::timeoutActivityTask)
1✔
554
        .add(
1✔
555
            INITIATED,
556
            REQUEST_CANCELLATION,
557
            CANCELLATION_REQUESTED,
558
            StateMachines::requestActivityCancellation)
559
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::completeActivityTask)
1✔
560
        // Transitions to initiated in case of a retry
561
        .add(STARTED, FAIL, new State[] {FAILED, INITIATED}, StateMachines::failActivityTask)
1✔
562
        // Transitions to initiated in case of a retry
563
        .add(
1✔
564
            STARTED,
565
            TIME_OUT,
566
            new State[] {TIMED_OUT, INITIATED},
567
            StateMachines::timeoutActivityTask)
568
        .add(STARTED, UPDATE, STARTED, StateMachines::heartbeatActivityTask)
1✔
569
        .add(
1✔
570
            STARTED,
571
            REQUEST_CANCELLATION,
572
            CANCELLATION_REQUESTED,
573
            StateMachines::requestActivityCancellation)
574
        .add(
1✔
575
            CANCELLATION_REQUESTED, CANCEL, CANCELED, StateMachines::reportActivityTaskCancellation)
576
        .add(CANCELLATION_REQUESTED, COMPLETE, COMPLETED, StateMachines::completeActivityTask)
1✔
577
        .add(
1✔
578
            CANCELLATION_REQUESTED,
579
            UPDATE,
580
            CANCELLATION_REQUESTED,
581
            StateMachines::heartbeatActivityTask)
582
        .add(CANCELLATION_REQUESTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutActivityTask)
1✔
583
        .add(CANCELLATION_REQUESTED, FAIL, FAILED, StateMachines::failActivityTask);
1✔
584
  }
585

586
  public static StateMachine<ChildWorkflowData> newChildWorkflowStateMachine(
587
      TestWorkflowService service) {
588
    return new StateMachine<>(new ChildWorkflowData(service))
1✔
589
        .add(NONE, INITIATE, INITIATED, StateMachines::initiateChildWorkflow)
1✔
590
        .add(INITIATED, START, STARTED, StateMachines::childWorkflowStarted)
1✔
591
        .add(INITIATED, FAIL, FAILED, StateMachines::startChildWorkflowFailed)
1✔
592
        .add(INITIATED, TIME_OUT, TIMED_OUT, StateMachines::timeoutChildWorkflow)
1✔
593
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::childWorkflowCompleted)
1✔
594
        .add(STARTED, FAIL, FAILED, StateMachines::childWorkflowFailed)
1✔
595
        .add(STARTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutChildWorkflow)
1✔
596
        .add(STARTED, CANCEL, CANCELED, StateMachines::childWorkflowCanceled);
1✔
597
  }
598

599
  public static StateMachine<UpdateWorkflowExecutionData> newUpdateWorkflowExecution(
600
      String updateId,
601
      Request initialRequest,
602
      CompletableFuture<Boolean> accepted,
603
      CompletableFuture<Outcome> outcome) {
604
    return new StateMachine<>(
1✔
605
            new UpdateWorkflowExecutionData(updateId, initialRequest, accepted, outcome))
606
        .add(NONE, START, STARTED, StateMachines::acceptUpdate)
1✔
607
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::completeUpdate);
1✔
608
  }
609

610
  public static StateMachine<NexusOperationData> newNexusOperation(Endpoint endpoint) {
611
    return new StateMachine<>(new NexusOperationData(endpoint))
1✔
612
        .add(NONE, INITIATE, INITIATED, StateMachines::scheduleNexusOperation)
1✔
613
        .add(INITIATED, START, STARTED, StateMachines::startNexusOperation)
1✔
614
        .add(INITIATED, TIME_OUT, TIMED_OUT, StateMachines::timeoutNexusOperation)
1✔
615
        // TODO: properly support cancel before start
616
        // .add(
617
        //     INITIATED,
618
        //     REQUEST_CANCELLATION,
619
        //     INITIATED,
620
        //     StateMachines::requestCancelNexusOperation)
621
        .add(INITIATED, CANCEL, CANCELED, StateMachines::reportNexusOperationCancellation)
1✔
622
        // Transitions directly from INITIATED to COMPLETE for sync completions
623
        .add(INITIATED, COMPLETE, COMPLETED, StateMachines::completeNexusOperation)
1✔
624
        // Transitions to INITIATED in case of a retry
625
        .add(INITIATED, FAIL, new State[] {FAILED, INITIATED}, StateMachines::failNexusOperation)
1✔
626
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::completeNexusOperation)
1✔
627
        // Transitions back to STARTED in case of a retry
628
        .add(STARTED, FAIL, new State[] {FAILED, STARTED}, StateMachines::failNexusOperation)
1✔
629
        .add(STARTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutNexusOperation)
1✔
630
        .add(STARTED, REQUEST_CANCELLATION, STARTED, StateMachines::requestCancelNexusOperation)
1✔
631
        .add(STARTED, CANCEL, CANCELED, StateMachines::reportNexusOperationCancellation);
1✔
632
  }
633

634
  public static StateMachine<TimerData> newTimerStateMachine() {
635
    return new StateMachine<>(new TimerData())
1✔
636
        .add(NONE, START, STARTED, StateMachines::startTimer)
1✔
637
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::fireTimer)
1✔
638
        .add(STARTED, CANCEL, CANCELED, StateMachines::cancelTimer);
1✔
639
  }
640

641
  public static StateMachine<SignalExternalData> newSignalExternalStateMachine() {
642
    return new StateMachine<>(new SignalExternalData())
1✔
643
        .add(NONE, INITIATE, INITIATED, StateMachines::initiateExternalSignal)
1✔
644
        .add(INITIATED, FAIL, FAILED, StateMachines::failExternalSignal)
1✔
645
        .add(INITIATED, COMPLETE, COMPLETED, StateMachines::completeExternalSignal);
1✔
646
  }
647

648
  public static StateMachine<CancelExternalData> newCancelExternalStateMachine() {
649
    return new StateMachine<>(new CancelExternalData())
1✔
650
        .add(NONE, INITIATE, INITIATED, StateMachines::initiateExternalCancellation)
1✔
651
        .add(INITIATED, FAIL, FAILED, StateMachines::failExternalCancellation)
1✔
652
        .add(INITIATED, START, STARTED, StateMachines::reportExternalCancellationRequested);
1✔
653
  }
654

655
  private static <T, A> void noop(RequestContext ctx, T data, A a, long notUsed) {}
1✔
656

657
  private static void scheduleNexusOperation(
658
      RequestContext ctx,
659
      NexusOperationData data,
660
      ScheduleNexusOperationCommandAttributes attr,
661
      long workflowTaskCompletedId) {
662
    Duration expirationInterval = attr.getScheduleToCloseTimeout();
1✔
663
    Timestamp expirationTime =
664
        (attr.hasScheduleToCloseTimeout())
1✔
665
            ? Timestamps.add(ctx.currentTime(), expirationInterval)
1✔
666
            : Timestamp.getDefaultInstance();
1✔
667
    TestServiceRetryState retryState = new TestServiceRetryState(data.retryPolicy, expirationTime);
1✔
668

669
    NexusOperationScheduledEventAttributes.Builder a =
670
        NexusOperationScheduledEventAttributes.newBuilder()
1✔
671
            .setEndpoint(attr.getEndpoint())
1✔
672
            .setEndpointId(data.endpoint.getId())
1✔
673
            .setService(attr.getService())
1✔
674
            .setOperation(attr.getOperation())
1✔
675
            .setInput(attr.getInput())
1✔
676
            .setScheduleToCloseTimeout(attr.getScheduleToCloseTimeout())
1✔
677
            .putAllNexusHeader(attr.getNexusHeaderMap())
1✔
678
            .setRequestId(UUID.randomUUID().toString())
1✔
679
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedId);
1✔
680

681
    data.scheduledEvent = a.build();
1✔
682
    HistoryEvent event =
683
        HistoryEvent.newBuilder()
1✔
684
            .setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED)
1✔
685
            .setNexusOperationScheduledEventAttributes(a)
1✔
686
            .build();
1✔
687

688
    long scheduledEventId = ctx.addEvent(event);
1✔
689
    NexusOperationRef ref = new NexusOperationRef(ctx.getExecutionId(), scheduledEventId);
1✔
690
    NexusTaskToken taskToken = new NexusTaskToken(ref, data.getAttempt(), false);
1✔
691

692
    Link link =
693
        workflowEventToNexusLink(
1✔
694
            io.temporal.api.common.v1.Link.WorkflowEvent.newBuilder()
1✔
695
                .setNamespace(ctx.getNamespace())
1✔
696
                .setWorkflowId(ctx.getExecution().getWorkflowId())
1✔
697
                .setRunId(ctx.getExecution().getRunId())
1✔
698
                .setEventRef(
1✔
699
                    io.temporal.api.common.v1.Link.WorkflowEvent.EventReference.newBuilder()
1✔
700
                        .setEventId(scheduledEventId)
1✔
701
                        .setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED))
1✔
702
                .build());
1✔
703

704
    PollNexusTaskQueueResponse.Builder pollResponse =
705
        PollNexusTaskQueueResponse.newBuilder()
1✔
706
            .setTaskToken(taskToken.toBytes())
1✔
707
            .setRequest(
1✔
708
                io.temporal.api.nexus.v1.Request.newBuilder()
1✔
709
                    .setScheduledTime(ctx.currentTime())
1✔
710
                    .putAllHeader(attr.getNexusHeaderMap())
1✔
711
                    .setStartOperation(
1✔
712
                        StartOperationRequest.newBuilder()
1✔
713
                            .setService(attr.getService())
1✔
714
                            .setOperation(attr.getOperation())
1✔
715
                            .setPayload(attr.getInput())
1✔
716
                            .addLinks(link)
1✔
717
                            .setCallback("http://test-env/operations")
1✔
718
                            .setRequestId(UUID.randomUUID().toString())
1✔
719
                            // The test server uses this to lookup the operation
720
                            .putCallbackHeader(
1✔
721
                                "operation-reference", ref.toBytes().toStringUtf8())));
1✔
722

723
    TaskQueueId taskQueueId =
1✔
724
        new TaskQueueId(
725
            ctx.getNamespace(), data.endpoint.getSpec().getTarget().getWorker().getTaskQueue());
1✔
726
    Timestamp taskDeadline = Timestamps.add(ctx.currentTime(), data.requestTimeout);
1✔
727
    TestWorkflowStore.NexusTask task =
1✔
728
        new TestWorkflowStore.NexusTask(taskQueueId, pollResponse, taskDeadline);
729

730
    // Test server only supports worker targets, so just push directly to Nexus task queue without
731
    // invoking Nexus client.
732
    ctx.addNexusTask(task);
1✔
733
    ctx.onCommit(
1✔
734
        historySize -> {
735
          data.scheduledEventId = scheduledEventId;
1✔
736
          data.nexusTask = task;
1✔
737
          data.retryState = retryState;
1✔
738
        });
1✔
739
  }
1✔
740

741
  private static void startNexusOperation(
742
      RequestContext ctx,
743
      NexusOperationData data,
744
      StartOperationResponse.Async resp,
745
      long notUsed) {
746
    HistoryEvent.Builder event =
747
        HistoryEvent.newBuilder()
1✔
748
            .setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_STARTED)
1✔
749
            .setNexusOperationStartedEventAttributes(
1✔
750
                NexusOperationStartedEventAttributes.newBuilder()
1✔
751
                    .setOperationId(resp.getOperationId())
1✔
752
                    .setScheduledEventId(data.scheduledEventId)
1✔
753
                    .setRequestId(data.scheduledEvent.getRequestId()));
1✔
754

755
    for (Link l : resp.getLinksList()) {
1✔
756
      if (!l.getType()
1✔
757
          .equals(io.temporal.api.common.v1.Link.WorkflowEvent.getDescriptor().getFullName())) {
1✔
758
        continue;
×
759
      }
760
      event.addLinks(nexusLinkToWorkflowEvent(l));
1✔
761
    }
1✔
762

763
    ctx.addEvent(event.build());
1✔
764
    ctx.onCommit(historySize -> data.operationId = resp.getOperationId());
1✔
765
  }
1✔
766

767
  private static void completeNexusOperation(
768
      RequestContext ctx, NexusOperationData data, Payload result, long notUsed) {
769
    ctx.addEvent(
1✔
770
        HistoryEvent.newBuilder()
1✔
771
            .setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_COMPLETED)
1✔
772
            .setNexusOperationCompletedEventAttributes(
1✔
773
                NexusOperationCompletedEventAttributes.newBuilder()
1✔
774
                    .setRequestId(data.scheduledEvent.getRequestId())
1✔
775
                    .setScheduledEventId(data.scheduledEventId)
1✔
776
                    .setResult(result))
1✔
777
            .build());
1✔
778
  }
1✔
779

780
  private static void timeoutNexusOperation(
781
      RequestContext ctx, NexusOperationData data, TimeoutType timeoutType, long notUsed) {
782
    if (timeoutType != TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE) {
1✔
783
      throw new IllegalArgumentException(
×
784
          "Timeout type not supported for Nexus operations: " + timeoutType);
785
    }
786

787
    Failure failure =
788
        Failure.newBuilder()
1✔
789
            .setMessage("nexus operation completed unsuccessfully")
1✔
790
            .setNexusOperationExecutionFailureInfo(
1✔
791
                NexusOperationFailureInfo.newBuilder()
1✔
792
                    .setEndpoint(data.scheduledEvent.getEndpoint())
1✔
793
                    .setService(data.scheduledEvent.getService())
1✔
794
                    .setOperation(data.scheduledEvent.getOperation())
1✔
795
                    .setOperationId(data.operationId)
1✔
796
                    .setScheduledEventId(data.scheduledEventId))
1✔
797
            .setCause(
1✔
798
                Failure.newBuilder()
1✔
799
                    .setMessage("operation timed out")
1✔
800
                    .setTimeoutFailureInfo(
1✔
801
                        TimeoutFailureInfo.newBuilder().setTimeoutType(timeoutType)))
1✔
802
            .build();
1✔
803

804
    ctx.addEvent(
1✔
805
        HistoryEvent.newBuilder()
1✔
806
            .setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_TIMED_OUT)
1✔
807
            .setNexusOperationTimedOutEventAttributes(
1✔
808
                NexusOperationTimedOutEventAttributes.newBuilder()
1✔
809
                    .setRequestId(data.scheduledEvent.getRequestId())
1✔
810
                    .setScheduledEventId(data.scheduledEventId)
1✔
811
                    .setFailure(failure))
1✔
812
            .build());
1✔
813
  }
1✔
814

815
  private static State failNexusOperation(
816
      RequestContext ctx, NexusOperationData data, Failure failure, long notUsed) {
817
    RetryState retryState = attemptNexusOperationRetry(ctx, Optional.of(failure), data);
1✔
818
    if (retryState == RetryState.RETRY_STATE_IN_PROGRESS
1✔
819
        || retryState == RetryState.RETRY_STATE_TIMEOUT) {
820
      // RETRY_STATE_TIMEOUT indicates that the next attempt schedule time would exceed the
821
      // operation's schedule-to-close timeout, so do not fail the operation here and allow
822
      // it to be timed out by the timer set in
823
      // io.temporal.internal.testservice.TestWorkflowMutableStateImpl.timeoutNexusOperation
824
      return (Strings.isNullOrEmpty(data.operationId)) ? INITIATED : STARTED;
1✔
825
    }
826

827
    Failure wrapped =
828
        Failure.newBuilder()
1✔
829
            .setMessage("nexus operation completed unsuccessfully")
1✔
830
            .setNexusOperationExecutionFailureInfo(
1✔
831
                NexusOperationFailureInfo.newBuilder()
1✔
832
                    .setEndpoint(data.scheduledEvent.getEndpoint())
1✔
833
                    .setService(data.scheduledEvent.getService())
1✔
834
                    .setOperation(data.scheduledEvent.getOperation())
1✔
835
                    .setOperationId(data.operationId)
1✔
836
                    .setScheduledEventId(data.scheduledEventId))
1✔
837
            .setCause(failure)
1✔
838
            .build();
1✔
839

840
    ctx.addEvent(
1✔
841
        HistoryEvent.newBuilder()
1✔
842
            .setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_FAILED)
1✔
843
            .setNexusOperationFailedEventAttributes(
1✔
844
                NexusOperationFailedEventAttributes.newBuilder()
1✔
845
                    .setRequestId(data.scheduledEvent.getRequestId())
1✔
846
                    .setScheduledEventId(data.scheduledEventId)
1✔
847
                    .setFailure(wrapped))
1✔
848
            .build());
1✔
849
    return FAILED;
1✔
850
  }
851

852
  private static RetryState attemptNexusOperationRetry(
853
      RequestContext ctx, Optional<Failure> failure, NexusOperationData data) {
854
    Optional<ApplicationFailureInfo> info = failure.map(Failure::getApplicationFailureInfo);
1✔
855
    Optional<java.time.Duration> nextRetryDelay = Optional.empty();
1✔
856
    if (info.isPresent()) {
1✔
857
      if (info.get().getNonRetryable()) {
1✔
858
        return RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE;
1✔
859
      }
860
      if (info.get().hasNextRetryDelay()) {
1✔
861
        nextRetryDelay =
×
862
            Optional.of(ProtobufTimeUtils.toJavaDuration(info.get().getNextRetryDelay()));
×
863
      }
864
    }
865

866
    TestServiceRetryState nextAttempt = data.retryState.getNextAttempt(failure);
1✔
867
    TestServiceRetryState.BackoffInterval backoffInterval =
1✔
868
        data.retryState.getBackoffIntervalInSeconds(
1✔
869
            info.map(ApplicationFailureInfo::getType), ctx.currentTime(), nextRetryDelay);
1✔
870
    if (backoffInterval.getRetryState() == RetryState.RETRY_STATE_IN_PROGRESS) {
1✔
871
      data.nextBackoffInterval = ProtobufTimeUtils.toProtoDuration(backoffInterval.getInterval());
1✔
872
      PollNexusTaskQueueResponse.Builder task = data.nexusTask.getTask();
1✔
873
      ctx.onCommit(
1✔
874
          (historySize) -> {
875
            data.retryState = nextAttempt;
1✔
876
            data.isBackingOff = true;
1✔
877
            data.lastAttemptCompleteTime = ctx.currentTime();
1✔
878
            data.nextAttemptScheduleTime =
1✔
879
                Timestamps.add(ProtobufTimeUtils.getCurrentProtoTime(), data.nextBackoffInterval);
1✔
880
            task.setTaskToken(
1✔
881
                new NexusTaskToken(
882
                        ctx.getExecutionId(),
1✔
883
                        data.scheduledEventId,
884
                        nextAttempt.getAttempt(),
1✔
885
                        task.getRequest().hasCancelOperation())
1✔
886
                    .toBytes());
1✔
887
          });
1✔
888
    } else {
1✔
889
      data.nextBackoffInterval = Durations.ZERO;
1✔
890
    }
891
    return backoffInterval.getRetryState();
1✔
892
  }
893

894
  private static void requestCancelNexusOperation(
895
      RequestContext ctx,
896
      NexusOperationData data,
897
      RequestCancelNexusOperationCommandAttributes attr,
898
      long workflowTaskCompletedId) {
899
    ctx.addEvent(
1✔
900
        HistoryEvent.newBuilder()
1✔
901
            .setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED)
1✔
902
            .setNexusOperationCancelRequestedEventAttributes(
1✔
903
                NexusOperationCancelRequestedEventAttributes.newBuilder()
1✔
904
                    .setScheduledEventId(data.scheduledEventId)
1✔
905
                    .setWorkflowTaskCompletedEventId(workflowTaskCompletedId))
1✔
906
            .build());
1✔
907

908
    NexusTaskToken taskToken =
1✔
909
        new NexusTaskToken(ctx.getExecutionId(), data.scheduledEventId, data.getAttempt(), true);
1✔
910

911
    PollNexusTaskQueueResponse.Builder pollResponse =
912
        PollNexusTaskQueueResponse.newBuilder()
1✔
913
            .setTaskToken(taskToken.toBytes())
1✔
914
            .setRequest(
1✔
915
                io.temporal.api.nexus.v1.Request.newBuilder()
1✔
916
                    .setCancelOperation(
1✔
917
                        CancelOperationRequest.newBuilder()
1✔
918
                            .setOperationId(data.operationId)
1✔
919
                            .setOperation(data.scheduledEvent.getOperation())
1✔
920
                            .setService(data.scheduledEvent.getService())));
1✔
921

922
    TaskQueueId taskQueueId =
1✔
923
        new TaskQueueId(
924
            ctx.getNamespace(), data.endpoint.getSpec().getTarget().getWorker().getTaskQueue());
1✔
925
    Timestamp taskDeadline = Timestamps.add(ctx.currentTime(), data.requestTimeout);
1✔
926
    TestWorkflowStore.NexusTask cancelTask =
1✔
927
        new TestWorkflowStore.NexusTask(taskQueueId, pollResponse, taskDeadline);
928

929
    // Test server only supports worker targets, so just push directly to Nexus task queue without
930
    // invoking Nexus client.
931
    ctx.addNexusTask(cancelTask);
1✔
932
    ctx.onCommit(
1✔
933
        historySize -> {
934
          data.nexusTask = cancelTask;
1✔
935
          data.cancelRequestedTime = ctx.currentTime();
1✔
936
          data.isBackingOff = false;
1✔
937
        });
1✔
938
  }
1✔
939

940
  private static void reportNexusOperationCancellation(
941
      RequestContext ctx, NexusOperationData data, Failure failure, long notUsed) {
942
    Failure.Builder wrapped =
943
        Failure.newBuilder()
1✔
944
            .setMessage("nexus operation completed unsuccessfully")
1✔
945
            .setNexusOperationExecutionFailureInfo(
1✔
946
                NexusOperationFailureInfo.newBuilder()
1✔
947
                    .setEndpoint(data.scheduledEvent.getEndpoint())
1✔
948
                    .setService(data.scheduledEvent.getService())
1✔
949
                    .setOperation(data.scheduledEvent.getOperation())
1✔
950
                    .setOperationId(data.operationId)
1✔
951
                    .setScheduledEventId(data.scheduledEventId));
1✔
952
    if (failure != null) {
1✔
953
      wrapped.setCause(failure);
1✔
954
    }
955
    ctx.addEvent(
1✔
956
        HistoryEvent.newBuilder()
1✔
957
            .setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_CANCELED)
1✔
958
            .setNexusOperationCanceledEventAttributes(
1✔
959
                NexusOperationCanceledEventAttributes.newBuilder()
1✔
960
                    .setScheduledEventId(data.scheduledEventId)
1✔
961
                    .setRequestId(data.scheduledEvent.getRequestId())
1✔
962
                    .setFailure(wrapped))
1✔
963
            .build());
1✔
964
  }
1✔
965

966
  private static void timeoutChildWorkflow(
967
      RequestContext ctx, ChildWorkflowData data, RetryState retryState, long notUsed) {
968
    StartChildWorkflowExecutionInitiatedEventAttributes ie = data.initiatedEvent;
1✔
969
    ChildWorkflowExecutionTimedOutEventAttributes a =
970
        ChildWorkflowExecutionTimedOutEventAttributes.newBuilder()
1✔
971
            .setNamespace(ie.getNamespace())
1✔
972
            .setStartedEventId(data.startedEventId)
1✔
973
            .setWorkflowExecution(data.execution)
1✔
974
            .setWorkflowType(ie.getWorkflowType())
1✔
975
            .setRetryState(retryState)
1✔
976
            .setInitiatedEventId(data.initiatedEventId)
1✔
977
            .build();
1✔
978
    HistoryEvent event =
979
        HistoryEvent.newBuilder()
1✔
980
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TIMED_OUT)
1✔
981
            .setChildWorkflowExecutionTimedOutEventAttributes(a)
1✔
982
            .build();
1✔
983
    ctx.addEvent(event);
1✔
984
  }
1✔
985

986
  private static void startChildWorkflowFailed(
987
      RequestContext ctx,
988
      ChildWorkflowData data,
989
      StartChildWorkflowExecutionFailedEventAttributes a,
990
      long notUsed) {
991
    StartChildWorkflowExecutionFailedEventAttributes.Builder updatedAttr =
1✔
992
        a.toBuilder()
1✔
993
            .setInitiatedEventId(data.initiatedEventId)
1✔
994
            .setWorkflowType(data.initiatedEvent.getWorkflowType())
1✔
995
            .setWorkflowId(data.initiatedEvent.getWorkflowId());
1✔
996
    if (!data.initiatedEvent.getNamespace().isEmpty()) {
1✔
997
      updatedAttr.setNamespace(data.initiatedEvent.getNamespace());
1✔
998
    }
999
    HistoryEvent event =
1000
        HistoryEvent.newBuilder()
1✔
1001
            .setEventType(EventType.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_FAILED)
1✔
1002
            .setStartChildWorkflowExecutionFailedEventAttributes(updatedAttr.build())
1✔
1003
            .build();
1✔
1004
    ctx.addEvent(event);
1✔
1005
  }
1✔
1006

1007
  private static void childWorkflowStarted(
1008
      RequestContext ctx,
1009
      ChildWorkflowData data,
1010
      ChildWorkflowExecutionStartedEventAttributes a,
1011
      long notUsed) {
1012
    ChildWorkflowExecutionStartedEventAttributes updatedAttr =
1✔
1013
        a.toBuilder().setInitiatedEventId(data.initiatedEventId).build();
1✔
1014
    HistoryEvent event =
1015
        HistoryEvent.newBuilder()
1✔
1016
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_STARTED)
1✔
1017
            .setChildWorkflowExecutionStartedEventAttributes(updatedAttr)
1✔
1018
            .build();
1✔
1019
    long startedEventId = ctx.addEvent(event);
1✔
1020
    ctx.onCommit(
1✔
1021
        (historySize) -> {
1022
          data.startedEventId = startedEventId;
1✔
1023
          data.execution = updatedAttr.getWorkflowExecution();
1✔
1024
        });
1✔
1025
  }
1✔
1026

1027
  private static void childWorkflowCompleted(
1028
      RequestContext ctx,
1029
      ChildWorkflowData data,
1030
      ChildWorkflowExecutionCompletedEventAttributes a,
1031
      long notUsed) {
1032
    ChildWorkflowExecutionCompletedEventAttributes updatedAttr =
1✔
1033
        a.toBuilder()
1✔
1034
            .setInitiatedEventId(data.initiatedEventId)
1✔
1035
            .setStartedEventId(data.startedEventId)
1✔
1036
            .build();
1✔
1037
    HistoryEvent event =
1038
        HistoryEvent.newBuilder()
1✔
1039
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_COMPLETED)
1✔
1040
            .setChildWorkflowExecutionCompletedEventAttributes(updatedAttr)
1✔
1041
            .build();
1✔
1042
    ctx.addEvent(event);
1✔
1043
  }
1✔
1044

1045
  private static void childWorkflowFailed(
1046
      RequestContext ctx,
1047
      ChildWorkflowData data,
1048
      ChildWorkflowExecutionFailedEventAttributes a,
1049
      long notUsed) {
1050
    ChildWorkflowExecutionFailedEventAttributes.Builder updatedAttr =
1✔
1051
        a.toBuilder()
1✔
1052
            .setInitiatedEventId(data.initiatedEventId)
1✔
1053
            .setStartedEventId(data.startedEventId)
1✔
1054
            .setWorkflowExecution(data.execution)
1✔
1055
            .setWorkflowType(data.initiatedEvent.getWorkflowType());
1✔
1056
    if (!data.initiatedEvent.getNamespace().isEmpty()) {
1✔
1057
      updatedAttr.setNamespace(data.initiatedEvent.getNamespace());
1✔
1058
    }
1059
    HistoryEvent event =
1060
        HistoryEvent.newBuilder()
1✔
1061
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_FAILED)
1✔
1062
            .setChildWorkflowExecutionFailedEventAttributes(updatedAttr.build())
1✔
1063
            .build();
1✔
1064
    ctx.addEvent(event);
1✔
1065
  }
1✔
1066

1067
  private static void childWorkflowCanceled(
1068
      RequestContext ctx,
1069
      ChildWorkflowData data,
1070
      ChildWorkflowExecutionCanceledEventAttributes a,
1071
      long notUsed) {
1072
    ChildWorkflowExecutionCanceledEventAttributes updatedAttr =
1✔
1073
        a.toBuilder()
1✔
1074
            .setInitiatedEventId(data.initiatedEventId)
1✔
1075
            .setStartedEventId(data.startedEventId)
1✔
1076
            .build();
1✔
1077
    HistoryEvent event =
1078
        HistoryEvent.newBuilder()
1✔
1079
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_CANCELED)
1✔
1080
            .setChildWorkflowExecutionCanceledEventAttributes(updatedAttr)
1✔
1081
            .build();
1✔
1082
    ctx.addEvent(event);
1✔
1083
  }
1✔
1084

1085
  private static void initiateChildWorkflow(
1086
      RequestContext ctx,
1087
      ChildWorkflowData data,
1088
      StartChildWorkflowExecutionCommandAttributes d,
1089
      long workflowTaskCompletedEventId) {
1090
    StartChildWorkflowExecutionInitiatedEventAttributes.Builder a =
1091
        StartChildWorkflowExecutionInitiatedEventAttributes.newBuilder()
1✔
1092
            .setControl(d.getControl())
1✔
1093
            .setInput(d.getInput())
1✔
1094
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1✔
1095
            .setNamespace(d.getNamespace().isEmpty() ? ctx.getNamespace() : d.getNamespace())
1✔
1096
            .setWorkflowExecutionTimeout(d.getWorkflowExecutionTimeout())
1✔
1097
            .setWorkflowRunTimeout(d.getWorkflowRunTimeout())
1✔
1098
            .setWorkflowTaskTimeout(d.getWorkflowTaskTimeout())
1✔
1099
            .setTaskQueue(d.getTaskQueue())
1✔
1100
            .setWorkflowId(d.getWorkflowId())
1✔
1101
            .setWorkflowIdReusePolicy(d.getWorkflowIdReusePolicy())
1✔
1102
            .setWorkflowType(d.getWorkflowType())
1✔
1103
            .setCronSchedule(d.getCronSchedule())
1✔
1104
            .setParentClosePolicy(d.getParentClosePolicy());
1✔
1105
    if (d.hasHeader()) {
1✔
1106
      a.setHeader(d.getHeader());
1✔
1107
    }
1108
    if (d.hasMemo()) {
1✔
1109
      a.setMemo(d.getMemo());
1✔
1110
    }
1111
    if (d.hasRetryPolicy()) {
1✔
1112
      a.setRetryPolicy(d.getRetryPolicy());
1✔
1113
    }
1114
    HistoryEvent event =
1115
        HistoryEvent.newBuilder()
1✔
1116
            .setEventType(EventType.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED)
1✔
1117
            .setStartChildWorkflowExecutionInitiatedEventAttributes(a)
1✔
1118
            .build();
1✔
1119
    long initiatedEventId = ctx.addEvent(event);
1✔
1120
    ctx.onCommit(
1✔
1121
        (historySize) -> {
1122
          data.initiatedEventId = initiatedEventId;
1✔
1123
          data.initiatedEvent = a.build();
1✔
1124
          StartWorkflowExecutionRequest.Builder startChild =
1125
              StartWorkflowExecutionRequest.newBuilder()
1✔
1126
                  .setRequestId(UUID.randomUUID().toString())
1✔
1127
                  .setNamespace(d.getNamespace().isEmpty() ? ctx.getNamespace() : d.getNamespace())
1✔
1128
                  .setWorkflowExecutionTimeout(d.getWorkflowExecutionTimeout())
1✔
1129
                  .setWorkflowRunTimeout(d.getWorkflowRunTimeout())
1✔
1130
                  .setWorkflowTaskTimeout(d.getWorkflowTaskTimeout())
1✔
1131
                  .setTaskQueue(d.getTaskQueue())
1✔
1132
                  .setWorkflowId(d.getWorkflowId())
1✔
1133
                  .setWorkflowIdReusePolicy(d.getWorkflowIdReusePolicy())
1✔
1134
                  .setWorkflowType(d.getWorkflowType())
1✔
1135
                  .setCronSchedule(d.getCronSchedule());
1✔
1136
          if (d.hasHeader()) {
1✔
1137
            startChild.setHeader(d.getHeader());
1✔
1138
          }
1139
          if (d.hasSearchAttributes()) {
1✔
1140
            startChild.setSearchAttributes(d.getSearchAttributes());
1✔
1141
          }
1142
          if (d.hasMemo()) {
1✔
1143
            startChild.setMemo(d.getMemo());
1✔
1144
          }
1145
          if (d.hasRetryPolicy()) {
1✔
1146
            startChild.setRetryPolicy(d.getRetryPolicy());
1✔
1147
          }
1148
          if (d.hasInput()) {
1✔
1149
            startChild.setInput(d.getInput());
1✔
1150
          }
1151
          addStartChildTask(ctx, data, initiatedEventId, startChild.build());
1✔
1152
        });
1✔
1153
  }
1✔
1154

1155
  private static void addStartChildTask(
1156
      RequestContext ctx,
1157
      ChildWorkflowData data,
1158
      long initiatedEventId,
1159
      StartWorkflowExecutionRequest startChild) {
1160
    ForkJoinPool.commonPool()
1✔
1161
        .execute(
1✔
1162
            () -> {
1163
              try {
1164
                data.service.startWorkflowExecutionImpl(
1✔
1165
                    startChild,
1166
                    java.time.Duration.ZERO,
1167
                    Optional.of(ctx.getWorkflowMutableState()),
1✔
1168
                    OptionalLong.of(data.initiatedEventId),
1✔
1169
                    null);
1170
              } catch (StatusRuntimeException e) {
1✔
1171
                if (e.getStatus().getCode() == Status.Code.ALREADY_EXISTS) {
1✔
1172
                  StartChildWorkflowExecutionFailedEventAttributes failRequest =
1173
                      StartChildWorkflowExecutionFailedEventAttributes.newBuilder()
1✔
1174
                          .setInitiatedEventId(initiatedEventId)
1✔
1175
                          .setCause(
1✔
1176
                              StartChildWorkflowExecutionFailedCause
1177
                                  .START_CHILD_WORKFLOW_EXECUTION_FAILED_CAUSE_WORKFLOW_ALREADY_EXISTS)
1178
                          .build();
1✔
1179
                  try {
1180
                    ctx.getWorkflowMutableState()
1✔
1181
                        .failStartChildWorkflow(data.initiatedEvent.getWorkflowId(), failRequest);
1✔
1182
                  } catch (Throwable ee) {
×
1183
                    log.error("Unexpected failure inserting failStart for a child workflow", ee);
×
1184
                  }
1✔
1185
                } else {
1✔
1186
                  log.error("Unexpected failure starting a child workflow", e);
×
1187
                }
1188
              } catch (Exception e) {
×
1189
                log.error("Unexpected failure starting a child workflow", e);
×
1190
              }
1✔
1191
            });
1✔
1192
  }
1✔
1193

1194
  private static void startWorkflow(
1195
      RequestContext ctx, WorkflowData data, StartWorkflowExecutionRequest request, long notUsed) {
1196
    if (Durations.compare(request.getWorkflowExecutionTimeout(), Durations.ZERO) < 0) {
1✔
1197
      throw Status.INVALID_ARGUMENT
×
1198
          .withDescription("negative workflowExecution timeout")
×
1199
          .asRuntimeException();
×
1200
    }
1201
    if (Durations.compare(request.getWorkflowRunTimeout(), Durations.ZERO) < 0) {
1✔
1202
      throw Status.INVALID_ARGUMENT
×
1203
          .withDescription("negative workflowRun timeout")
×
1204
          .asRuntimeException();
×
1205
    }
1206
    if (Durations.compare(request.getWorkflowTaskTimeout(), Durations.ZERO) < 0) {
1✔
1207
      throw Status.INVALID_ARGUMENT
×
1208
          .withDescription("negative workflowTaskTimeoutSeconds")
×
1209
          .asRuntimeException();
×
1210
    }
1211
    if (request.hasWorkflowStartDelay() && !request.getCronSchedule().trim().isEmpty()) {
1✔
1212
      throw Status.INVALID_ARGUMENT
×
1213
          .withDescription("CronSchedule and WorkflowStartDelay may not be used together.")
×
1214
          .asRuntimeException();
×
1215
    }
1216
    if (request.getCompletionCallbacksCount() > 0
1✔
1217
        && !request.getCompletionCallbacksList().stream().allMatch(Callback::hasNexus)) {
1✔
1218
      throw Status.INVALID_ARGUMENT
×
1219
          .withDescription("non-Nexus completion callbacks are not supported.")
×
1220
          .asRuntimeException();
×
1221
    }
1222

1223
    WorkflowExecutionStartedEventAttributes.Builder a =
1224
        WorkflowExecutionStartedEventAttributes.newBuilder()
1✔
1225
            .setWorkflowType(request.getWorkflowType())
1✔
1226
            .setWorkflowRunTimeout(request.getWorkflowRunTimeout())
1✔
1227
            .setWorkflowTaskTimeout(request.getWorkflowTaskTimeout())
1✔
1228
            .setWorkflowExecutionTimeout(request.getWorkflowExecutionTimeout())
1✔
1229
            .setIdentity(request.getIdentity())
1✔
1230
            .setInput(request.getInput())
1✔
1231
            .setTaskQueue(request.getTaskQueue())
1✔
1232
            .addAllCompletionCallbacks(request.getCompletionCallbacksList())
1✔
1233
            .setAttempt(1);
1✔
1234
    if (request.hasRetryPolicy()) {
1✔
1235
      a.setRetryPolicy(request.getRetryPolicy());
1✔
1236
    }
1237
    data.retryState.ifPresent(
1✔
1238
        testServiceRetryState -> a.setAttempt(testServiceRetryState.getAttempt()));
1✔
1239
    a.setFirstExecutionRunId(data.firstExecutionRunId);
1✔
1240
    a.setOriginalExecutionRunId(data.originalExecutionRunId);
1✔
1241
    data.continuedExecutionRunId.ifPresent(a::setContinuedExecutionRunId);
1✔
1242
    if (data.lastCompletionResult != null) {
1✔
1243
      a.setLastCompletionResult(data.lastCompletionResult);
1✔
1244
    }
1245
    if (request.hasWorkflowStartDelay()) {
1✔
1246
      a.setFirstWorkflowTaskBackoff(request.getWorkflowStartDelay());
1✔
1247
    }
1248
    data.lastFailure.ifPresent(a::setContinuedFailure);
1✔
1249
    if (request.hasMemo()) {
1✔
1250
      a.setMemo(request.getMemo());
1✔
1251
    }
1252
    if (request.hasSearchAttributes()) {
1✔
1253
      a.setSearchAttributes((request.getSearchAttributes()));
1✔
1254
    }
1255
    if (request.hasHeader()) {
1✔
1256
      a.setHeader(request.getHeader());
1✔
1257
    }
1258
    String cronSchedule = request.getCronSchedule();
1✔
1259
    if (!cronSchedule.trim().isEmpty()) {
1✔
1260
      try {
1261
        CronUtils.parseCron(cronSchedule);
1✔
1262
        a.setCronSchedule(cronSchedule);
1✔
1263
      } catch (Exception e) {
×
1264
        throw Status.INVALID_ARGUMENT
×
1265
            .withDescription("Invalid cron expression \"" + cronSchedule + "\": " + e.getMessage())
×
1266
            .withCause(e)
×
1267
            .asRuntimeException();
×
1268
      }
1✔
1269
    }
1270
    Optional<TestWorkflowMutableState> parent = ctx.getWorkflowMutableState().getParent();
1✔
1271
    if (parent.isPresent()) {
1✔
1272
      ExecutionId parentExecutionId = parent.get().getExecutionId();
1✔
1273
      a.setParentWorkflowNamespace(parentExecutionId.getNamespace());
1✔
1274
      a.setParentWorkflowExecution(parentExecutionId.getExecution());
1✔
1275
    }
1276
    HistoryEvent.Builder event =
1277
        HistoryEvent.newBuilder()
1✔
1278
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED)
1✔
1279
            .setWorkflowExecutionStartedEventAttributes(a);
1✔
1280
    if (request.getLinksCount() > 0) {
1✔
1281
      event.addAllLinks(request.getLinksList());
1✔
1282
    }
1283
    ctx.addEvent(event.build());
1✔
1284
  }
1✔
1285

1286
  private static void completeWorkflow(
1287
      RequestContext ctx,
1288
      WorkflowData data,
1289
      CompleteWorkflowExecutionCommandAttributes d,
1290
      long workflowTaskCompletedEventId) {
1291
    WorkflowExecutionCompletedEventAttributes.Builder a =
1292
        WorkflowExecutionCompletedEventAttributes.newBuilder()
1✔
1293
            .setResult(d.getResult())
1✔
1294
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
1295
    HistoryEvent event =
1296
        HistoryEvent.newBuilder()
1✔
1297
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED)
1✔
1298
            .setWorkflowExecutionCompletedEventAttributes(a)
1✔
1299
            .build();
1✔
1300
    ctx.addEvent(event);
1✔
1301
  }
1✔
1302

1303
  private static void continueAsNewWorkflow(
1304
      RequestContext ctx,
1305
      WorkflowData data,
1306
      ContinueAsNewWorkflowExecutionCommandAttributes d,
1307
      long workflowTaskCompletedEventId) {
1308
    StartWorkflowExecutionRequest sr = ctx.getWorkflowMutableState().getStartRequest();
1✔
1309
    WorkflowExecutionContinuedAsNewEventAttributes.Builder a =
1310
        WorkflowExecutionContinuedAsNewEventAttributes.newBuilder();
1✔
1311
    a.setInput(d.getInput());
1✔
1312
    if (d.hasHeader()) {
1✔
1313
      a.setHeader(d.getHeader());
1✔
1314
    }
1315
    if (Durations.compare(d.getWorkflowRunTimeout(), Durations.ZERO) > 0) {
1✔
1316
      a.setWorkflowRunTimeout(d.getWorkflowRunTimeout());
1✔
1317
    } else {
1318
      a.setWorkflowRunTimeout(sr.getWorkflowRunTimeout());
1✔
1319
    }
1320
    if (d.hasTaskQueue() && !d.getTaskQueue().getName().isEmpty()) {
1✔
1321
      a.setTaskQueue(d.getTaskQueue());
1✔
1322
    } else {
1323
      a.setTaskQueue(sr.getTaskQueue());
1✔
1324
    }
1325
    if (d.hasWorkflowType() && !d.getWorkflowType().getName().isEmpty()) {
1✔
1326
      a.setWorkflowType(d.getWorkflowType());
1✔
1327
    } else {
1328
      a.setWorkflowType(sr.getWorkflowType());
1✔
1329
    }
1330
    if (Durations.compare(d.getWorkflowTaskTimeout(), Durations.ZERO) > 0) {
1✔
1331
      a.setWorkflowTaskTimeout(d.getWorkflowTaskTimeout());
1✔
1332
    } else {
1333
      a.setWorkflowTaskTimeout(sr.getWorkflowTaskTimeout());
1✔
1334
    }
1335
    a.setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
1336
    a.setBackoffStartInterval(d.getBackoffStartInterval());
1✔
1337
    if (d.hasLastCompletionResult()) {
1✔
1338
      a.setLastCompletionResult(d.getLastCompletionResult());
1✔
1339
    }
1340
    if (d.hasFailure()) {
1✔
1341
      a.setFailure(d.getFailure());
1✔
1342
    }
1343
    a.setNewExecutionRunId(UUID.randomUUID().toString());
1✔
1344
    HistoryEvent event =
1345
        HistoryEvent.newBuilder()
1✔
1346
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW)
1✔
1347
            .setWorkflowExecutionContinuedAsNewEventAttributes(a)
1✔
1348
            .build();
1✔
1349
    ctx.addEvent(event);
1✔
1350
  }
1✔
1351

1352
  private static void failWorkflow(
1353
      RequestContext ctx,
1354
      WorkflowData data,
1355
      FailWorkflowExecutionCommandAttributes d,
1356
      long workflowTaskCompletedEventId) {
1357
    WorkflowExecutionFailedEventAttributes.Builder a =
1358
        WorkflowExecutionFailedEventAttributes.newBuilder()
1✔
1359
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
1360
    if (d.hasFailure()) {
1✔
1361
      a.setFailure(d.getFailure());
1✔
1362
    }
1363
    HistoryEvent event =
1364
        HistoryEvent.newBuilder()
1✔
1365
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_FAILED)
1✔
1366
            .setWorkflowExecutionFailedEventAttributes(a)
1✔
1367
            .build();
1✔
1368
    ctx.addEvent(event);
1✔
1369
  }
1✔
1370

1371
  private static void timeoutWorkflow(
1372
      RequestContext ctx, WorkflowData data, RetryState retryState, long notUsed) {
1373
    WorkflowExecutionTimedOutEventAttributes.Builder a =
1374
        WorkflowExecutionTimedOutEventAttributes.newBuilder().setRetryState(retryState);
1✔
1375
    HistoryEvent event =
1376
        HistoryEvent.newBuilder()
1✔
1377
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT)
1✔
1378
            .setWorkflowExecutionTimedOutEventAttributes(a)
1✔
1379
            .build();
1✔
1380
    ctx.addEvent(event);
1✔
1381
  }
1✔
1382

1383
  private static void cancelWorkflow(
1384
      RequestContext ctx,
1385
      WorkflowData data,
1386
      CancelWorkflowExecutionCommandAttributes d,
1387
      long workflowTaskCompletedEventId) {
1388
    WorkflowExecutionCanceledEventAttributes.Builder a =
1389
        WorkflowExecutionCanceledEventAttributes.newBuilder()
1✔
1390
            .setDetails(d.getDetails())
1✔
1391
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
1392
    HistoryEvent event =
1393
        HistoryEvent.newBuilder()
1✔
1394
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED)
1✔
1395
            .setWorkflowExecutionCanceledEventAttributes(a)
1✔
1396
            .build();
1✔
1397
    ctx.addEvent(event);
1✔
1398
  }
1✔
1399

1400
  private static void terminateWorkflow(
1401
      RequestContext ctx,
1402
      WorkflowData data,
1403
      TerminateWorkflowExecutionRequest d,
1404
      long workflowTaskCompletedEventId) {
1405
    WorkflowExecutionTerminatedEventAttributes.Builder a =
1406
        WorkflowExecutionTerminatedEventAttributes.newBuilder()
1✔
1407
            .setDetails(d.getDetails())
1✔
1408
            .setIdentity(d.getIdentity())
1✔
1409
            .setReason(d.getReason());
1✔
1410
    HistoryEvent event =
1411
        HistoryEvent.newBuilder()
1✔
1412
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED)
1✔
1413
            .setWorkflowExecutionTerminatedEventAttributes(a)
1✔
1414
            .build();
1✔
1415
    ctx.addEvent(event);
1✔
1416
  }
1✔
1417

1418
  private static void requestWorkflowCancellation(
1419
      RequestContext ctx,
1420
      WorkflowData data,
1421
      RequestCancelWorkflowExecutionRequest cancelRequest,
1422
      long notUsed) {
1423
    WorkflowExecutionCancelRequestedEventAttributes.Builder a =
1424
        WorkflowExecutionCancelRequestedEventAttributes.newBuilder()
1✔
1425
            .setIdentity(cancelRequest.getIdentity());
1✔
1426
    HistoryEvent cancelRequested =
1427
        HistoryEvent.newBuilder()
1✔
1428
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED)
1✔
1429
            .setWorkflowExecutionCancelRequestedEventAttributes(a)
1✔
1430
            .build();
1✔
1431
    ctx.addEvent(cancelRequested);
1✔
1432
  }
1✔
1433

1434
  private static void scheduleActivityTask(
1435
      RequestContext ctx,
1436
      ActivityTaskData data,
1437
      ScheduleActivityTaskCommandAttributes d,
1438
      long workflowTaskCompletedEventId) {
1439
    RetryPolicy retryPolicy = ensureDefaultFieldsForActivityRetryPolicy(d.getRetryPolicy());
1✔
1440
    Duration expirationInterval = d.getScheduleToCloseTimeout();
1✔
1441
    Timestamp expirationTime = Timestamps.add(data.store.currentTime(), expirationInterval);
1✔
1442
    TestServiceRetryState retryState = new TestServiceRetryState(retryPolicy, expirationTime);
1✔
1443

1444
    ActivityTaskScheduledEventAttributes.Builder a =
1445
        ActivityTaskScheduledEventAttributes.newBuilder()
1✔
1446
            .setInput(d.getInput())
1✔
1447
            .setActivityId(d.getActivityId())
1✔
1448
            .setActivityType(d.getActivityType())
1✔
1449
            .setHeartbeatTimeout(d.getHeartbeatTimeout())
1✔
1450
            .setRetryPolicy(retryPolicy)
1✔
1451
            .setScheduleToCloseTimeout(d.getScheduleToCloseTimeout())
1✔
1452
            .setScheduleToStartTimeout(d.getScheduleToStartTimeout())
1✔
1453
            .setStartToCloseTimeout(d.getStartToCloseTimeout())
1✔
1454
            .setTaskQueue(d.getTaskQueue())
1✔
1455
            .setHeader(d.getHeader())
1✔
1456
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
1457

1458
    // Cannot set it in onCommit as it is used in the processScheduleActivityTask
1459
    data.scheduledEvent = a.build();
1✔
1460
    HistoryEvent event =
1461
        HistoryEvent.newBuilder()
1✔
1462
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED)
1✔
1463
            .setActivityTaskScheduledEventAttributes(a)
1✔
1464
            .build();
1✔
1465
    long scheduledEventId = ctx.addEvent(event);
1✔
1466

1467
    PollActivityTaskQueueResponse.Builder taskResponse =
1468
        PollActivityTaskQueueResponse.newBuilder()
1✔
1469
            .setWorkflowNamespace(ctx.getNamespace())
1✔
1470
            .setWorkflowType(data.startWorkflowExecutionRequest.getWorkflowType())
1✔
1471
            .setActivityType(d.getActivityType())
1✔
1472
            .setWorkflowExecution(ctx.getExecution())
1✔
1473
            .setActivityId(d.getActivityId())
1✔
1474
            .setInput(d.getInput())
1✔
1475
            .setHeartbeatTimeout(d.getHeartbeatTimeout())
1✔
1476
            .setScheduleToCloseTimeout(d.getScheduleToCloseTimeout())
1✔
1477
            .setStartToCloseTimeout(d.getStartToCloseTimeout())
1✔
1478
            .setScheduledTime(ctx.currentTime())
1✔
1479
            .setCurrentAttemptScheduledTime(ctx.currentTime())
1✔
1480
            .setHeader(d.getHeader())
1✔
1481
            .setAttempt(1);
1✔
1482

1483
    TaskQueueId taskQueueId = new TaskQueueId(ctx.getNamespace(), d.getTaskQueue().getName());
1✔
1484
    ActivityTask activityTask = new ActivityTask(taskQueueId, taskResponse);
1✔
1485
    ctx.addActivityTask(activityTask);
1✔
1486
    ctx.onCommit(
1✔
1487
        (historySize) -> {
1488
          data.scheduledEventId = scheduledEventId;
1✔
1489
          data.activityTask = activityTask;
1✔
1490
          data.retryState = retryState;
1✔
1491
        });
1✔
1492
  }
1✔
1493

1494
  private static void requestActivityCancellation(
1495
      RequestContext ctx,
1496
      ActivityTaskData data,
1497
      RequestCancelActivityTaskCommandAttributes d,
1498
      long workflowTaskCompletedEventId) {
1499
    ActivityTaskCancelRequestedEventAttributes.Builder a =
1500
        ActivityTaskCancelRequestedEventAttributes.newBuilder()
1✔
1501
            .setScheduledEventId(d.getScheduledEventId())
1✔
1502
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
1503
    HistoryEvent event =
1504
        HistoryEvent.newBuilder()
1✔
1505
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_CANCEL_REQUESTED)
1✔
1506
            .setActivityTaskCancelRequestedEventAttributes(a)
1✔
1507
            .build();
1✔
1508
    ctx.addEvent(event);
1✔
1509
  }
1✔
1510

1511
  private static void scheduleWorkflowTask(
1512
      RequestContext ctx, WorkflowTaskData data, Object notUsedRequest, long notUsed) {
1513
    StartWorkflowExecutionRequest request = data.startRequest;
1✔
1514
    long scheduledEventId;
1515
    TaskQueue taskQueue = request.getTaskQueue();
1✔
1516
    WorkflowTaskScheduledEventAttributes a =
1517
        WorkflowTaskScheduledEventAttributes.newBuilder()
1✔
1518
            .setStartToCloseTimeout(request.getWorkflowTaskTimeout())
1✔
1519
            .setTaskQueue(taskQueue)
1✔
1520
            .setAttempt(++data.attempt)
1✔
1521
            .build();
1✔
1522
    HistoryEvent event =
1523
        HistoryEvent.newBuilder()
1✔
1524
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED)
1✔
1525
            .setWorkflowTaskScheduledEventAttributes(a)
1✔
1526
            .build();
1✔
1527
    scheduledEventId = ctx.addEvent(event);
1✔
1528
    PollWorkflowTaskQueueResponse.Builder workflowTaskResponse =
1529
        PollWorkflowTaskQueueResponse.newBuilder();
1✔
1530
    workflowTaskResponse.setWorkflowExecution(ctx.getExecution());
1✔
1531
    workflowTaskResponse.setWorkflowType(request.getWorkflowType());
1✔
1532
    workflowTaskResponse.setAttempt(data.attempt);
1✔
1533
    workflowTaskResponse.setScheduledTime(ctx.currentTime());
1✔
1534
    workflowTaskResponse.setWorkflowExecutionTaskQueue(taskQueue);
1✔
1535
    TaskQueueId taskQueueId = new TaskQueueId(ctx.getNamespace(), taskQueue.getName());
1✔
1536
    WorkflowTask workflowTask = new WorkflowTask(taskQueueId, workflowTaskResponse);
1✔
1537
    ctx.setWorkflowTaskForMatching(workflowTask);
1✔
1538
    ctx.onCommit(
1✔
1539
        (historySize) -> {
1540
          data.scheduledEventId = scheduledEventId;
1✔
1541
          data.workflowTask = workflowTaskResponse;
1✔
1542
          // Move buffered update request to new workflow task
1543
          data.updateRequest.putAll(data.updateRequestBuffer);
1✔
1544
          data.updateRequestBuffer.clear();
1✔
1545
        });
1✔
1546
  }
1✔
1547

1548
  private static void convertQueryWorkflowTaskToReal(
1549
      RequestContext ctx, WorkflowTaskData data, Object notUsedRequest, long notUsed) {
1550
    StartWorkflowExecutionRequest request = data.startRequest;
×
1551
    WorkflowTaskScheduledEventAttributes a =
1552
        WorkflowTaskScheduledEventAttributes.newBuilder()
×
1553
            .setStartToCloseTimeout(request.getWorkflowTaskTimeout())
×
1554
            .setTaskQueue(request.getTaskQueue())
×
1555
            .setAttempt(data.attempt)
×
1556
            .build();
×
1557
    HistoryEvent event =
1558
        HistoryEvent.newBuilder()
×
1559
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED)
×
1560
            .setWorkflowTaskScheduledEventAttributes(a)
×
1561
            .build();
×
1562
    long scheduledEventId = ctx.addEvent(event);
×
1563
    ctx.onCommit((historySize) -> data.scheduledEventId = scheduledEventId);
×
1564
  }
×
1565

1566
  private static void scheduleQueryWorkflowTask(
1567
      RequestContext ctx,
1568
      WorkflowTaskData data,
1569
      TestWorkflowMutableStateImpl.ConsistentQuery query,
1570
      long notUsed) {
1571
    ctx.lockTimer("scheduleQueryWorkflowTask");
×
1572
    StartWorkflowExecutionRequest request = data.startRequest;
×
1573
    PollWorkflowTaskQueueResponse.Builder workflowTaskResponse =
1574
        PollWorkflowTaskQueueResponse.newBuilder();
×
1575
    StickyExecutionAttributes stickyAttributes =
×
1576
        ctx.getWorkflowMutableState().getStickyExecutionAttributes();
×
1577
    String taskQueue =
1578
        stickyAttributes == null
×
1579
            ? request.getTaskQueue().getName()
×
1580
            : stickyAttributes.getWorkerTaskQueue().getName();
×
1581
    workflowTaskResponse.setWorkflowExecution(ctx.getExecution());
×
1582
    workflowTaskResponse.setWorkflowType(request.getWorkflowType());
×
1583
    workflowTaskResponse.setAttempt(++data.attempt);
×
1584
    TaskQueueId taskQueueId = new TaskQueueId(ctx.getNamespace(), taskQueue);
×
1585
    WorkflowTask workflowTask = new WorkflowTask(taskQueueId, workflowTaskResponse);
×
1586
    ctx.setWorkflowTaskForMatching(workflowTask);
×
1587
    ctx.onCommit(
×
1588
        (historySize) -> {
1589
          if (data.lastSuccessfulStartedEventId > 0) {
×
1590
            workflowTaskResponse.setPreviousStartedEventId(data.lastSuccessfulStartedEventId);
×
1591
          }
1592
          data.scheduledEventId = NO_EVENT_ID;
×
1593
          data.workflowTask = workflowTaskResponse;
×
1594
          if (query != null) {
×
1595
            data.consistentQueryRequests.put(query.getKey(), query);
×
1596
          }
1597
        });
×
1598
  }
×
1599

1600
  private static void queryWhileScheduled(
1601
      RequestContext ctx,
1602
      WorkflowTaskData data,
1603
      TestWorkflowMutableStateImpl.ConsistentQuery query,
1604
      long notUsed) {
1605
    data.consistentQueryRequests.put(query.getKey(), query);
1✔
1606
  }
1✔
1607

1608
  private static void bufferQuery(
1609
      RequestContext ctx,
1610
      WorkflowTaskData data,
1611
      TestWorkflowMutableStateImpl.ConsistentQuery query,
1612
      long notUsed) {
1613
    data.queryBuffer.put(query.getKey(), query);
1✔
1614
  }
1✔
1615

1616
  private static void bufferUpdate(
1617
      RequestContext ctx, WorkflowTaskData data, UpdateWorkflowExecution update, long notUsed) {
1618
    if (data.getUpdateRequest(update.getId()).isPresent()) {
1✔
1619
      throw Status.INTERNAL
×
1620
          .withDescription("Update ID already exists: " + update.getId())
×
1621
          .asRuntimeException();
×
1622
    }
1623
    data.updateRequestBuffer.put(update.getId(), update);
1✔
1624
  }
1✔
1625

1626
  private static void addUpdate(
1627
      RequestContext ctx, WorkflowTaskData data, UpdateWorkflowExecution update, long notUsed) {
1628
    if (data.getUpdateRequest(update.getId()).isPresent()) {
1✔
1629
      throw Status.INTERNAL
×
1630
          .withDescription("Update ID already exists: " + update.getId())
×
1631
          .asRuntimeException();
×
1632
    }
1633
    data.updateRequest.put(update.getId(), update);
1✔
1634
  }
1✔
1635

1636
  private static void startWorkflowTask(
1637
      RequestContext ctx,
1638
      WorkflowTaskData data,
1639
      PollWorkflowTaskQueueRequest request,
1640
      long notUsed) {
1641
    WorkflowTaskStartedEventAttributes a =
1642
        WorkflowTaskStartedEventAttributes.newBuilder()
1✔
1643
            .setIdentity(request.getIdentity())
1✔
1644
            .setScheduledEventId(data.scheduledEventId)
1✔
1645
            .build();
1✔
1646
    HistoryEvent event =
1647
        HistoryEvent.newBuilder()
1✔
1648
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_STARTED)
1✔
1649
            .setWorkflowTaskStartedEventAttributes(a)
1✔
1650
            .build();
1✔
1651
    long startedEventId = ctx.addEvent(event);
1✔
1652
    startWorkflowTaskImpl(ctx, data, request, startedEventId, false);
1✔
1653
  }
1✔
1654

1655
  private static void startQueryOnlyWorkflowTask(
1656
      RequestContext ctx,
1657
      WorkflowTaskData data,
1658
      PollWorkflowTaskQueueRequest request,
1659
      long notUsed) {
1660
    startWorkflowTaskImpl(ctx, data, request, NO_EVENT_ID, true);
×
1661
  }
×
1662

1663
  private static void startWorkflowTaskImpl(
1664
      RequestContext ctx,
1665
      WorkflowTaskData data,
1666
      PollWorkflowTaskQueueRequest request,
1667
      long startedEventId,
1668
      boolean queryOnly) {
1669
    ctx.onCommit(
1✔
1670
        (historySize) -> {
1671
          PollWorkflowTaskQueueResponse.Builder task = data.workflowTask;
1✔
1672
          task.setStartedEventId(data.scheduledEventId + 1);
1✔
1673
          WorkflowTaskToken taskToken = new WorkflowTaskToken(ctx.getExecutionId(), historySize);
1✔
1674
          task.setTaskToken(taskToken.toBytes());
1✔
1675
          GetWorkflowExecutionHistoryRequest getRequest =
1676
              GetWorkflowExecutionHistoryRequest.newBuilder()
1✔
1677
                  .setNamespace(request.getNamespace())
1✔
1678
                  .setExecution(ctx.getExecution())
1✔
1679
                  .build();
1✔
1680
          List<HistoryEvent> events;
1681
          events =
1✔
1682
              data.store
1683
                  .getWorkflowExecutionHistory(ctx.getExecutionId(), getRequest, null)
1✔
1684
                  .getHistory()
1✔
1685
                  .getEventsList();
1✔
1686
          long lastEventId = events.get(events.size() - 1).getEventId();
1✔
1687
          if (ctx.getWorkflowMutableState().getStickyExecutionAttributes() != null) {
1✔
1688
            events = events.subList((int) data.lastSuccessfulStartedEventId, events.size());
1✔
1689
          }
1690
          if (queryOnly && !data.workflowCompleted) {
1✔
1691
            events = new ArrayList<>(events); // convert list to mutable
×
1692
            // Add "fake" workflow task scheduled and started if workflow is not closed
1693
            WorkflowTaskScheduledEventAttributes scheduledAttributes =
1694
                WorkflowTaskScheduledEventAttributes.newBuilder()
×
1695
                    .setStartToCloseTimeout(data.startRequest.getWorkflowTaskTimeout())
×
1696
                    .setTaskQueue(request.getTaskQueue())
×
1697
                    .setAttempt(data.attempt)
×
1698
                    .build();
×
1699
            HistoryEvent scheduledEvent =
1700
                HistoryEvent.newBuilder()
×
1701
                    .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED)
×
1702
                    .setEventId(lastEventId + 1)
×
1703
                    .setWorkflowTaskScheduledEventAttributes(scheduledAttributes)
×
1704
                    .build();
×
1705
            events.add(scheduledEvent);
×
1706
            WorkflowTaskStartedEventAttributes startedAttributes =
1707
                WorkflowTaskStartedEventAttributes.newBuilder()
×
1708
                    .setIdentity(request.getIdentity())
×
1709
                    .setScheduledEventId(lastEventId + 1)
×
1710
                    .build();
×
1711
            HistoryEvent startedEvent =
1712
                HistoryEvent.newBuilder()
×
1713
                    .setEventId(lastEventId + 1)
×
1714
                    .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_STARTED)
×
1715
                    .setWorkflowTaskStartedEventAttributes(startedAttributes)
×
1716
                    .build();
×
1717
            events.add(startedEvent);
×
1718
            task.setStartedEventId(lastEventId + 2);
×
1719
          }
1720
          // get it from previous started event id.
1721
          task.setHistory(History.newBuilder().addAllEvents(events));
1✔
1722
          // Transfer the queries
1723
          Map<String, TestWorkflowMutableStateImpl.ConsistentQuery> queries =
1✔
1724
              data.consistentQueryRequests;
1725
          for (Map.Entry<String, TestWorkflowMutableStateImpl.ConsistentQuery> queryEntry :
1726
              queries.entrySet()) {
1✔
1727
            QueryWorkflowRequest queryWorkflowRequest = queryEntry.getValue().getRequest();
1✔
1728
            task.putQueries(queryEntry.getKey(), queryWorkflowRequest.getQuery());
1✔
1729
          }
1✔
1730
          // Transfer the messages
1731
          Map<String, UpdateWorkflowExecution> updates = data.updateRequest;
1✔
1732
          for (Map.Entry<String, UpdateWorkflowExecution> update : updates.entrySet()) {
1✔
1733
            UpdateWorkflowExecutionRequest updateRequest = update.getValue().getRequest();
1✔
1734
            Message updateMessage =
1735
                Message.newBuilder()
1✔
1736
                    .setId(update.getKey() + "/request")
1✔
1737
                    .setProtocolInstanceId(update.getKey())
1✔
1738
                    .setEventId(data.scheduledEventId)
1✔
1739
                    .setBody(Any.pack(updateRequest.getRequest()))
1✔
1740
                    .build();
1✔
1741
            task.addMessages(updateMessage);
1✔
1742
          }
1✔
1743
          if (data.lastSuccessfulStartedEventId > 0) {
1✔
1744
            task.setPreviousStartedEventId(data.lastSuccessfulStartedEventId);
1✔
1745
          }
1746
          if (!queryOnly) {
1✔
1747
            data.startedEventId = startedEventId;
1✔
1748
          }
1749
        });
1✔
1750
  }
1✔
1751

1752
  private static void startActivityTask(
1753
      RequestContext ctx,
1754
      ActivityTaskData data,
1755
      PollActivityTaskQueueRequest request,
1756
      long notUsed) {
1757
    ActivityTaskStartedEventAttributes.Builder a =
1758
        ActivityTaskStartedEventAttributes.newBuilder()
1✔
1759
            .setIdentity(request.getIdentity())
1✔
1760
            .setScheduledEventId(data.scheduledEventId);
1✔
1761
    a.setAttempt(data.getAttempt());
1✔
1762
    // Setting timestamp here as the default logic will set it to the time when it is added to the
1763
    // history. But in the case of retry it happens only after an activity completion.
1764
    Timestamp timestamp = data.store.currentTime();
1✔
1765
    HistoryEvent event =
1766
        HistoryEvent.newBuilder()
1✔
1767
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_STARTED)
1✔
1768
            .setEventTime(timestamp)
1✔
1769
            .setActivityTaskStartedEventAttributes(a)
1✔
1770
            .build();
1✔
1771
    long startedEventId;
1772
    startedEventId = NO_EVENT_ID;
1✔
1773
    ctx.onCommit(
1✔
1774
        (historySize) -> {
1775
          data.startedEventId = startedEventId;
1✔
1776
          data.startedEvent = event;
1✔
1777
          PollActivityTaskQueueResponse.Builder task = data.activityTask.getTask();
1✔
1778
          task.setTaskToken(
1✔
1779
              new ActivityTaskToken(ctx.getExecutionId(), data.scheduledEventId, task.getAttempt())
1✔
1780
                  .toBytes());
1✔
1781
          task.setStartedTime(timestamp);
1✔
1782
        });
1✔
1783
  }
1✔
1784

1785
  private static void completeWorkflowTask(
1786
      RequestContext ctx,
1787
      WorkflowTaskData data,
1788
      RespondWorkflowTaskCompletedRequest request,
1789
      long notUsed) {
1790
    WorkflowTaskCompletedEventAttributes.Builder a =
1791
        WorkflowTaskCompletedEventAttributes.newBuilder()
1✔
1792
            .setIdentity(request.getIdentity())
1✔
1793
            .setBinaryChecksum(request.getBinaryChecksum())
1✔
1794
            .setMeteringMetadata(request.getMeteringMetadata())
1✔
1795
            .setSdkMetadata(request.getSdkMetadata())
1✔
1796
            .setScheduledEventId(data.scheduledEventId);
1✔
1797
    HistoryEvent event =
1798
        HistoryEvent.newBuilder()
1✔
1799
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_COMPLETED)
1✔
1800
            .setWorkflowTaskCompletedEventAttributes(a)
1✔
1801
            .build();
1✔
1802
    ctx.addEvent(event);
1✔
1803
    ctx.onCommit(
1✔
1804
        (historySize) -> {
1805
          data.lastSuccessfulStartedEventId = data.startedEventId;
1✔
1806
          data.clear();
1✔
1807
        });
1✔
1808
  }
1✔
1809

1810
  private static void completeQuery(
1811
      RequestContext ctx,
1812
      WorkflowTaskData data,
1813
      RespondWorkflowTaskCompletedRequest request,
1814
      long notUsed) {
1815
    Map<String, WorkflowQueryResult> responses = request.getQueryResultsMap();
×
1816
    for (Map.Entry<String, WorkflowQueryResult> resultEntry : responses.entrySet()) {
×
1817
      TestWorkflowMutableStateImpl.ConsistentQuery query =
×
1818
          data.consistentQueryRequests.remove(resultEntry.getKey());
×
1819
      if (query != null) {
×
1820
        WorkflowQueryResult value = resultEntry.getValue();
×
1821
        CompletableFuture<QueryWorkflowResponse> result = query.getResult();
×
1822
        switch (value.getResultType()) {
×
1823
          case QUERY_RESULT_TYPE_ANSWERED:
1824
            QueryWorkflowResponse response =
1825
                QueryWorkflowResponse.newBuilder().setQueryResult(value.getAnswer()).build();
×
1826
            result.complete(response);
×
1827
            break;
×
1828
          case QUERY_RESULT_TYPE_FAILED:
1829
            result.completeExceptionally(
×
1830
                StatusUtils.newException(
×
1831
                    Status.INTERNAL.withDescription(value.getErrorMessage()),
×
1832
                    QueryFailedFailure.getDefaultInstance(),
×
1833
                    QueryFailedFailure.getDescriptor()));
×
1834
            break;
×
1835
          default:
1836
            throw Status.INVALID_ARGUMENT
×
1837
                .withDescription("Invalid query result type: " + value.getResultType())
×
1838
                .asRuntimeException();
×
1839
        }
1840
      }
1841
    }
×
1842
    ctx.onCommit(
×
1843
        (historySize) -> {
1844
          data.clear();
×
1845
          ctx.unlockTimer("completeQuery");
×
1846
        });
×
1847
  }
×
1848

1849
  private static void failQueryWorkflowTask(
1850
      RequestContext ctx, WorkflowTaskData data, Object unused, long notUsed) {
1851
    data.consistentQueryRequests
×
1852
        .entrySet()
×
1853
        .removeIf(entry -> entry.getValue().getResult().isCancelled());
×
1854
    if (!data.consistentQueryRequests.isEmpty()) {
×
1855
      ctx.setNeedWorkflowTask(true);
×
1856
    }
1857
    ctx.unlockTimer("failQueryWorkflowTask");
×
1858
  }
×
1859

1860
  private static void failWorkflowTask(
1861
      RequestContext ctx,
1862
      WorkflowTaskData data,
1863
      RespondWorkflowTaskFailedRequest request,
1864
      long notUsed) {
1865
    WorkflowTaskFailedEventAttributes.Builder a =
1866
        WorkflowTaskFailedEventAttributes.newBuilder()
1✔
1867
            .setIdentity(request.getIdentity())
1✔
1868
            .setStartedEventId(data.startedEventId)
1✔
1869
            .setScheduledEventId(data.scheduledEventId)
1✔
1870
            .setCause(request.getCause());
1✔
1871
    if (request.hasFailure()) {
1✔
1872
      a.setFailure(request.getFailure());
1✔
1873
    }
1874
    HistoryEvent event =
1875
        HistoryEvent.newBuilder()
1✔
1876
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_FAILED)
1✔
1877
            .setWorkflowTaskFailedEventAttributes(a)
1✔
1878
            .build();
1✔
1879
    ctx.addEvent(event);
1✔
1880
    ctx.setNeedWorkflowTask(true);
1✔
1881
  }
1✔
1882

1883
  private static void timeoutWorkflowTask(
1884
      RequestContext ctx, WorkflowTaskData data, Object ignored, long notUsed) {
1885
    WorkflowTaskTimedOutEventAttributes.Builder a =
1886
        WorkflowTaskTimedOutEventAttributes.newBuilder()
1✔
1887
            .setStartedEventId(data.startedEventId)
1✔
1888
            .setTimeoutType(TimeoutType.TIMEOUT_TYPE_START_TO_CLOSE)
1✔
1889
            .setScheduledEventId(data.scheduledEventId);
1✔
1890
    HistoryEvent event =
1891
        HistoryEvent.newBuilder()
1✔
1892
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT)
1✔
1893
            .setWorkflowTaskTimedOutEventAttributes(a)
1✔
1894
            .build();
1✔
1895
    ctx.addEvent(event);
1✔
1896
    ctx.setNeedWorkflowTask(true);
1✔
1897
  }
1✔
1898

1899
  private static void needsWorkflowTask(
1900
      RequestContext requestContext,
1901
      WorkflowTaskData workflowTaskData,
1902
      Object notUsedRequest,
1903
      long notUsed) {
1904
    requestContext.setNeedWorkflowTask(true);
1✔
1905
  }
1✔
1906

1907
  private static void completeActivityTask(
1908
      RequestContext ctx, ActivityTaskData data, Object request, long notUsed) {
1909
    data.startedEventId = ctx.addEvent(data.startedEvent);
1✔
1910
    if (request instanceof RespondActivityTaskCompletedRequest) {
1✔
1911
      completeActivityTaskByTaskToken(ctx, data, (RespondActivityTaskCompletedRequest) request);
1✔
1912
    } else if (request instanceof RespondActivityTaskCompletedByIdRequest) {
×
1913
      completeActivityTaskById(ctx, data, (RespondActivityTaskCompletedByIdRequest) request);
×
1914
    } else {
1915
      throw new IllegalArgumentException("Unknown request: " + request);
×
1916
    }
1917
  }
1✔
1918

1919
  private static void completeActivityTaskByTaskToken(
1920
      RequestContext ctx, ActivityTaskData data, RespondActivityTaskCompletedRequest request) {
1921
    ActivityTaskCompletedEventAttributes.Builder a =
1922
        ActivityTaskCompletedEventAttributes.newBuilder()
1✔
1923
            .setIdentity(request.getIdentity())
1✔
1924
            .setScheduledEventId(data.scheduledEventId)
1✔
1925
            .setResult(request.getResult())
1✔
1926
            .setStartedEventId(data.startedEventId);
1✔
1927
    HistoryEvent event =
1928
        HistoryEvent.newBuilder()
1✔
1929
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_COMPLETED)
1✔
1930
            .setActivityTaskCompletedEventAttributes(a)
1✔
1931
            .build();
1✔
1932
    ctx.addEvent(event);
1✔
1933
  }
1✔
1934

1935
  private static void completeActivityTaskById(
1936
      RequestContext ctx, ActivityTaskData data, RespondActivityTaskCompletedByIdRequest request) {
1937
    ActivityTaskCompletedEventAttributes.Builder a =
1938
        ActivityTaskCompletedEventAttributes.newBuilder()
×
1939
            .setIdentity(request.getIdentity())
×
1940
            .setScheduledEventId(data.scheduledEventId)
×
1941
            .setResult(request.getResult())
×
1942
            .setStartedEventId(data.startedEventId);
×
1943
    HistoryEvent event =
1944
        HistoryEvent.newBuilder()
×
1945
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_COMPLETED)
×
1946
            .setActivityTaskCompletedEventAttributes(a)
×
1947
            .build();
×
1948
    ctx.addEvent(event);
×
1949
  }
×
1950

1951
  private static State failActivityTask(
1952
      RequestContext ctx, ActivityTaskData data, Object request, long notUsed) {
1953
    if (request instanceof RespondActivityTaskFailedRequest) {
1✔
1954
      RespondActivityTaskFailedRequest req = (RespondActivityTaskFailedRequest) request;
1✔
1955
      return failActivityTaskByRequestType(ctx, data, req.getFailure(), req.getIdentity());
1✔
1956
    } else if (request instanceof RespondActivityTaskFailedByIdRequest) {
×
1957
      RespondActivityTaskFailedByIdRequest req = (RespondActivityTaskFailedByIdRequest) request;
×
1958
      return failActivityTaskByRequestType(ctx, data, req.getFailure(), req.getIdentity());
×
1959
    } else {
1960
      throw new IllegalArgumentException("Unknown request: " + request);
×
1961
    }
1962
  }
1963

1964
  private static State failActivityTaskByRequestType(
1965
      RequestContext ctx, ActivityTaskData data, Failure failure, String identity) {
1966
    if (!failure.hasApplicationFailureInfo()) {
1✔
1967
      throw new IllegalArgumentException(
×
1968
          "Failure must have ApplicationFailureInfo. Got: " + failure);
1969
    }
1970
    RetryState retryState = attemptActivityRetry(ctx, Optional.of(failure), data);
1✔
1971
    if (retryState == RetryState.RETRY_STATE_IN_PROGRESS) {
1✔
1972
      return INITIATED;
1✔
1973
    }
1974
    data.startedEventId = ctx.addEvent(data.startedEvent);
1✔
1975
    ActivityTaskFailedEventAttributes.Builder attributes =
1976
        ActivityTaskFailedEventAttributes.newBuilder()
1✔
1977
            .setIdentity(identity)
1✔
1978
            .setScheduledEventId(data.scheduledEventId)
1✔
1979
            .setFailure(failure)
1✔
1980
            .setRetryState(retryState)
1✔
1981
            .setStartedEventId(data.startedEventId);
1✔
1982
    HistoryEvent event =
1983
        HistoryEvent.newBuilder()
1✔
1984
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_FAILED)
1✔
1985
            .setActivityTaskFailedEventAttributes(attributes)
1✔
1986
            .build();
1✔
1987
    ctx.addEvent(event);
1✔
1988
    return FAILED;
1✔
1989
  }
1990

1991
  private static State timeoutActivityTask(
1992
      RequestContext ctx, ActivityTaskData data, TimeoutType timeoutType, long notUsed) {
1993
    Optional<Failure> previousFailure = data.retryState.getPreviousRunFailure();
1✔
1994

1995
    // chaining with the previous run failure if we are preparing the final failure
1996
    Failure failure =
1✔
1997
        newTimeoutFailure(timeoutType, Optional.ofNullable(data.heartbeatDetails), previousFailure);
1✔
1998

1999
    RetryState retryState;
2000
    switch (timeoutType) {
1✔
2001
      case TIMEOUT_TYPE_SCHEDULE_TO_START:
2002
      case TIMEOUT_TYPE_SCHEDULE_TO_CLOSE:
2003
        // ScheduleToStart (queue timeout) is not retryable. Instead of the retry, a customer should
2004
        // set a larger ScheduleToStart timeout.
2005
        // ScheduleToClose timeout is final and failure is created with TIMEOUT retry state
2006
        retryState = RetryState.RETRY_STATE_TIMEOUT;
1✔
2007
        break;
1✔
2008
      case TIMEOUT_TYPE_START_TO_CLOSE:
2009
      case TIMEOUT_TYPE_HEARTBEAT:
2010
        // not chaining with the previous run failure if we are preparing the failure to be stored
2011
        // for the next iteration
2012
        Optional<Failure> lastFailure =
1✔
2013
            Optional.of(
1✔
2014
                newTimeoutFailure(
1✔
2015
                    timeoutType,
2016
                    // we move heartbeatDetails to the new top level (this cause is used for
2017
                    // scheduleToClose only)
2018
                    Optional.empty(),
1✔
2019
                    // prune to don't have too deep nesting of failures
2020
                    Optional.empty()));
1✔
2021

2022
        retryState = attemptActivityRetry(ctx, lastFailure, data);
1✔
2023
        if (retryState == RetryState.RETRY_STATE_IN_PROGRESS) {
1✔
2024
          return INITIATED;
1✔
2025
        } else if (retryState == RetryState.RETRY_STATE_TIMEOUT) {
1✔
2026
          // if retryState = RETRY_STATE_TIMEOUT, it means scheduleToClose timeout happened inside
2027
          // attemptActivityRetry();
2028
          // start to close timeout would return as "max attempts reached".
2029

2030
          Preconditions.checkState(
1✔
2031
              timeoutType == TimeoutType.TIMEOUT_TYPE_START_TO_CLOSE
2032
                  || timeoutType == TimeoutType.TIMEOUT_TYPE_HEARTBEAT,
2033
              "Unexpected timeout type: %s. We should end up here only in case of HEARTBEAT_TIMEOUT or START_TO_CLOSE_TIMEOUT",
2034
              timeoutType);
2035

2036
          // heartbeat is preserved as the cause for the scheduleToClose timeout
2037
          // But we effectively omit startToClose timeout with scheduleToClose timeout
2038
          Optional<Failure> cause =
2039
              timeoutType == TimeoutType.TIMEOUT_TYPE_HEARTBEAT ? lastFailure : previousFailure;
1✔
2040

2041
          failure =
1✔
2042
              newTimeoutFailure(
1✔
2043
                  TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE,
2044
                  Optional.ofNullable(data.heartbeatDetails),
1✔
2045
                  cause);
2046
        }
1✔
2047
        break;
2048
      default:
2049
        throw new IllegalStateException(
×
2050
            "Not implemented behavior for timeout type: " + timeoutType);
2051
    }
2052

2053
    ActivityTaskTimedOutEventAttributes.Builder a =
2054
        ActivityTaskTimedOutEventAttributes.newBuilder()
1✔
2055
            .setScheduledEventId(data.scheduledEventId)
1✔
2056
            .setRetryState(retryState)
1✔
2057
            .setStartedEventId(data.startedEventId)
1✔
2058
            .setFailure(failure);
1✔
2059
    HistoryEvent event =
2060
        HistoryEvent.newBuilder()
1✔
2061
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_TIMED_OUT)
1✔
2062
            .setActivityTaskTimedOutEventAttributes(a)
1✔
2063
            .build();
1✔
2064
    ctx.addEvent(event);
1✔
2065
    return TIMED_OUT;
1✔
2066
  }
2067

2068
  private static Failure newTimeoutFailure(
2069
      TimeoutType timeoutType, Optional<Payloads> lastHeartbeatDetails, Optional<Failure> cause) {
2070
    TimeoutFailureInfo.Builder info = TimeoutFailureInfo.newBuilder().setTimeoutType(timeoutType);
1✔
2071
    if (lastHeartbeatDetails.isPresent()) {
1✔
2072
      info.setLastHeartbeatDetails(lastHeartbeatDetails.get());
1✔
2073
    }
2074
    Failure.Builder result = Failure.newBuilder().setTimeoutFailureInfo(info);
1✔
2075
    if (cause.isPresent()) {
1✔
2076
      result.setCause(cause.get());
1✔
2077
    }
2078
    return result.build();
1✔
2079
  }
2080

2081
  private static RetryState attemptActivityRetry(
2082
      RequestContext ctx, Optional<Failure> failure, ActivityTaskData data) {
2083
    if (data.retryState == null) {
1✔
2084
      throw new IllegalStateException("RetryPolicy is always present");
×
2085
    }
2086
    Optional<ApplicationFailureInfo> info = failure.map(Failure::getApplicationFailureInfo);
1✔
2087
    Optional<java.time.Duration> nextRetryDelay = Optional.empty();
1✔
2088

2089
    if (info.isPresent()) {
1✔
2090
      if (info.get().getNonRetryable()) {
1✔
2091
        return RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE;
1✔
2092
      }
2093
      if (info.get().hasNextRetryDelay()) {
1✔
2094
        nextRetryDelay =
1✔
2095
            Optional.ofNullable(ProtobufTimeUtils.toJavaDuration(info.get().getNextRetryDelay()));
1✔
2096
      }
2097
    }
2098

2099
    TestServiceRetryState nextAttempt = data.retryState.getNextAttempt(failure);
1✔
2100
    TestServiceRetryState.BackoffInterval backoffInterval =
1✔
2101
        data.retryState.getBackoffIntervalInSeconds(
1✔
2102
            info.map(ApplicationFailureInfo::getType), data.store.currentTime(), nextRetryDelay);
1✔
2103
    if (backoffInterval.getRetryState() == RetryState.RETRY_STATE_IN_PROGRESS) {
1✔
2104
      data.nextBackoffInterval = ProtobufTimeUtils.toProtoDuration(backoffInterval.getInterval());
1✔
2105
      PollActivityTaskQueueResponse.Builder task = data.activityTask.getTask();
1✔
2106
      if (data.heartbeatDetails != null) {
1✔
2107
        task.setHeartbeatDetails(data.heartbeatDetails);
1✔
2108
      }
2109
      ctx.onCommit(
1✔
2110
          (historySize) -> {
2111
            data.retryState = nextAttempt;
1✔
2112
            task.setAttempt(nextAttempt.getAttempt());
1✔
2113
            task.setCurrentAttemptScheduledTime(ctx.currentTime());
1✔
2114
          });
1✔
2115
    } else {
1✔
2116
      data.nextBackoffInterval = Durations.ZERO;
1✔
2117
    }
2118
    return backoffInterval.getRetryState();
1✔
2119
  }
2120

2121
  private static void reportActivityTaskCancellation(
2122
      RequestContext ctx, ActivityTaskData data, Object request, long notUsed) {
2123
    Payloads details = null;
1✔
2124
    if (request instanceof RespondActivityTaskCanceledRequest) {
1✔
2125
      {
2126
        RespondActivityTaskCanceledRequest cr = (RespondActivityTaskCanceledRequest) request;
1✔
2127

2128
        details = cr.hasDetails() ? cr.getDetails() : null;
1✔
2129
      }
1✔
2130
    } else if (request instanceof RespondActivityTaskCanceledByIdRequest) {
1✔
2131
      {
2132
        RespondActivityTaskCanceledByIdRequest cr =
×
2133
            (RespondActivityTaskCanceledByIdRequest) request;
2134
        details = cr.hasDetails() ? cr.getDetails() : null;
×
2135
      }
×
2136
    } else if (request != null) {
1✔
2137
      throw Status.INTERNAL
×
2138
          .withDescription("Unexpected request type: " + request)
×
2139
          .asRuntimeException();
×
2140
    }
2141
    ActivityTaskCanceledEventAttributes.Builder a =
2142
        ActivityTaskCanceledEventAttributes.newBuilder()
1✔
2143
            .setScheduledEventId(data.scheduledEventId)
1✔
2144
            .setStartedEventId(data.startedEventId);
1✔
2145
    if (details != null) {
1✔
2146
      a.setDetails(details);
×
2147
    }
2148
    HistoryEvent event =
2149
        HistoryEvent.newBuilder()
1✔
2150
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_CANCELED)
1✔
2151
            .setActivityTaskCanceledEventAttributes(a)
1✔
2152
            .build();
1✔
2153
    ctx.addEvent(event);
1✔
2154
  }
1✔
2155

2156
  private static void heartbeatActivityTask(
2157
      RequestContext nullCtx, ActivityTaskData data, Payloads details, long notUsed) {
2158
    data.heartbeatDetails = details;
1✔
2159
  }
1✔
2160

2161
  private static void acceptUpdate(
2162
      RequestContext ctx,
2163
      UpdateWorkflowExecutionData data,
2164
      Message msg,
2165
      long workflowTaskCompletedEventId) {
2166
    try {
2167
      Acceptance acceptance = msg.getBody().unpack(Acceptance.class);
1✔
2168

2169
      WorkflowExecutionUpdateAcceptedEventAttributes acceptedAttribute =
2170
          WorkflowExecutionUpdateAcceptedEventAttributes.newBuilder()
1✔
2171
              .setAcceptedRequestSequencingEventId(workflowTaskCompletedEventId - 1)
1✔
2172
              .setProtocolInstanceId(msg.getProtocolInstanceId())
1✔
2173
              .setAcceptedRequestMessageId(acceptance.getAcceptedRequestMessageId())
1✔
2174
              .setAcceptedRequest(data.initialRequest)
1✔
2175
              .build();
1✔
2176

2177
      HistoryEvent event =
2178
          HistoryEvent.newBuilder()
1✔
2179
              .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED)
1✔
2180
              .setWorkflowExecutionUpdateAcceptedEventAttributes(acceptedAttribute)
1✔
2181
              .build();
1✔
2182
      // If the workflow is finished we can't write more events
2183
      // to history so if the message was processed after the workflow
2184
      // was closed there is nothing we can do.
2185
      // The real server also has this same problem
2186
      if (!ctx.getWorkflowMutableState().isTerminalState()) {
1✔
2187
        ctx.addEvent(event);
1✔
2188
      }
2189
      ctx.onCommit(
1✔
2190
          (int historySize) -> {
2191
            data.accepted.complete(true);
1✔
2192
          });
1✔
2193
    } catch (InvalidProtocolBufferException e) {
×
2194
      throw new RuntimeException(e);
×
2195
    }
1✔
2196
  }
1✔
2197

2198
  private static void completeUpdate(
2199
      RequestContext ctx,
2200
      UpdateWorkflowExecutionData data,
2201
      Message msg,
2202
      long workflowTaskCompletedEventId) {
2203
    try {
2204
      Response response = msg.getBody().unpack(Response.class);
1✔
2205

2206
      WorkflowExecutionUpdateCompletedEventAttributes completedAttribute =
2207
          WorkflowExecutionUpdateCompletedEventAttributes.newBuilder()
1✔
2208
              .setMeta(response.getMeta())
1✔
2209
              .setOutcome(response.getOutcome())
1✔
2210
              .build();
1✔
2211

2212
      HistoryEvent event =
2213
          HistoryEvent.newBuilder()
1✔
2214
              .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_COMPLETED)
1✔
2215
              .setWorkflowExecutionUpdateCompletedEventAttributes(completedAttribute)
1✔
2216
              .build();
1✔
2217
      // If the workflow is finished we can't write more events
2218
      // to history so if the message was processed after the workflow
2219
      // was closed there is nothing we can do.
2220
      // The real server also has this same problem
2221
      if (!ctx.getWorkflowMutableState().isTerminalState()) {
1✔
2222
        ctx.addEvent(event);
1✔
2223
      }
2224
      ctx.onCommit(
1✔
2225
          (int historySize) -> {
2226
            data.outcome.complete(response.getOutcome());
1✔
2227
          });
1✔
2228
    } catch (InvalidProtocolBufferException e) {
×
2229
      throw new RuntimeException(e);
×
2230
    }
1✔
2231
  }
1✔
2232

2233
  private static void startTimer(
2234
      RequestContext ctx,
2235
      TimerData data,
2236
      StartTimerCommandAttributes d,
2237
      long workflowTaskCompletedEventId) {
2238
    TimerStartedEventAttributes.Builder a =
2239
        TimerStartedEventAttributes.newBuilder()
1✔
2240
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1✔
2241
            .setStartToFireTimeout(d.getStartToFireTimeout())
1✔
2242
            .setTimerId(d.getTimerId());
1✔
2243
    HistoryEvent event =
2244
        HistoryEvent.newBuilder()
1✔
2245
            .setEventType(EventType.EVENT_TYPE_TIMER_STARTED)
1✔
2246
            .setTimerStartedEventAttributes(a)
1✔
2247
            .build();
1✔
2248
    long startedEventId = ctx.addEvent(event);
1✔
2249
    ctx.onCommit(
1✔
2250
        (historySize) -> {
2251
          data.startedEvent = a.build();
1✔
2252
          data.startedEventId = startedEventId;
1✔
2253
        });
1✔
2254
  }
1✔
2255

2256
  private static void fireTimer(RequestContext ctx, TimerData data, Object ignored, long notUsed) {
2257
    TimerFiredEventAttributes.Builder a =
2258
        TimerFiredEventAttributes.newBuilder()
1✔
2259
            .setTimerId(data.startedEvent.getTimerId())
1✔
2260
            .setStartedEventId(data.startedEventId);
1✔
2261
    HistoryEvent event =
2262
        HistoryEvent.newBuilder()
1✔
2263
            .setEventType(EventType.EVENT_TYPE_TIMER_FIRED)
1✔
2264
            .setTimerFiredEventAttributes(a)
1✔
2265
            .build();
1✔
2266
    ctx.addEvent(event);
1✔
2267
  }
1✔
2268

2269
  private static void cancelTimer(
2270
      RequestContext ctx,
2271
      TimerData data,
2272
      CancelTimerCommandAttributes d,
2273
      long workflowTaskCompletedEventId) {
2274
    TimerCanceledEventAttributes.Builder a =
2275
        TimerCanceledEventAttributes.newBuilder()
1✔
2276
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1✔
2277
            .setTimerId(d.getTimerId())
1✔
2278
            .setStartedEventId(data.startedEventId);
1✔
2279
    HistoryEvent event =
2280
        HistoryEvent.newBuilder()
1✔
2281
            .setEventType(EventType.EVENT_TYPE_TIMER_CANCELED)
1✔
2282
            .setTimerCanceledEventAttributes(a)
1✔
2283
            .build();
1✔
2284
    ctx.addEvent(event);
1✔
2285
  }
1✔
2286

2287
  private static void initiateExternalSignal(
2288
      RequestContext ctx,
2289
      SignalExternalData data,
2290
      SignalExternalWorkflowExecutionCommandAttributes d,
2291
      long workflowTaskCompletedEventId) {
2292
    SignalExternalWorkflowExecutionInitiatedEventAttributes.Builder a =
2293
        SignalExternalWorkflowExecutionInitiatedEventAttributes.newBuilder()
1✔
2294
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1✔
2295
            .setControl(d.getControl())
1✔
2296
            .setInput(d.getInput())
1✔
2297
            .setNamespace(d.getNamespace())
1✔
2298
            .setChildWorkflowOnly(d.getChildWorkflowOnly())
1✔
2299
            .setSignalName(d.getSignalName())
1✔
2300
            .setWorkflowExecution(d.getExecution());
1✔
2301

2302
    HistoryEvent event =
2303
        HistoryEvent.newBuilder()
1✔
2304
            .setEventType(EventType.EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED)
1✔
2305
            .setSignalExternalWorkflowExecutionInitiatedEventAttributes(a)
1✔
2306
            .build();
1✔
2307
    long initiatedEventId = ctx.addEvent(event);
1✔
2308
    ctx.onCommit(
1✔
2309
        (historySize) -> {
2310
          data.initiatedEventId = initiatedEventId;
1✔
2311
          data.initiatedEvent = a.build();
1✔
2312
        });
1✔
2313
  }
1✔
2314

2315
  private static void failExternalSignal(
2316
      RequestContext ctx,
2317
      SignalExternalData data,
2318
      SignalExternalWorkflowExecutionFailedCause cause,
2319
      long notUsed) {
2320
    SignalExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent = data.initiatedEvent;
1✔
2321
    SignalExternalWorkflowExecutionFailedEventAttributes.Builder a =
2322
        SignalExternalWorkflowExecutionFailedEventAttributes.newBuilder()
1✔
2323
            .setInitiatedEventId(data.initiatedEventId)
1✔
2324
            .setWorkflowExecution(initiatedEvent.getWorkflowExecution())
1✔
2325
            .setControl(initiatedEvent.getControl())
1✔
2326
            .setCause(cause)
1✔
2327
            .setNamespace(initiatedEvent.getNamespace());
1✔
2328
    HistoryEvent event =
2329
        HistoryEvent.newBuilder()
1✔
2330
            .setEventType(EventType.EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED)
1✔
2331
            .setSignalExternalWorkflowExecutionFailedEventAttributes(a)
1✔
2332
            .build();
1✔
2333
    ctx.addEvent(event);
1✔
2334
  }
1✔
2335

2336
  private static void completeExternalSignal(
2337
      RequestContext ctx, SignalExternalData data, String runId, long notUsed) {
2338
    SignalExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent = data.initiatedEvent;
1✔
2339
    WorkflowExecution signaledExecution =
1✔
2340
        initiatedEvent.getWorkflowExecution().toBuilder().setRunId(runId).build();
1✔
2341
    ExternalWorkflowExecutionSignaledEventAttributes.Builder a =
2342
        ExternalWorkflowExecutionSignaledEventAttributes.newBuilder()
1✔
2343
            .setInitiatedEventId(data.initiatedEventId)
1✔
2344
            .setWorkflowExecution(signaledExecution)
1✔
2345
            .setControl(initiatedEvent.getControl())
1✔
2346
            .setNamespace(initiatedEvent.getNamespace());
1✔
2347
    HistoryEvent event =
2348
        HistoryEvent.newBuilder()
1✔
2349
            .setEventType(EventType.EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_SIGNALED)
1✔
2350
            .setExternalWorkflowExecutionSignaledEventAttributes(a)
1✔
2351
            .build();
1✔
2352
    ctx.addEvent(event);
1✔
2353
  }
1✔
2354

2355
  private static void initiateExternalCancellation(
2356
      RequestContext ctx,
2357
      CancelExternalData data,
2358
      RequestCancelExternalWorkflowExecutionCommandAttributes d,
2359
      long workflowTaskCompletedEventId) {
2360
    RequestCancelExternalWorkflowExecutionInitiatedEventAttributes.Builder a =
2361
        RequestCancelExternalWorkflowExecutionInitiatedEventAttributes.newBuilder()
1✔
2362
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1✔
2363
            .setControl(d.getControl())
1✔
2364
            .setNamespace(d.getNamespace())
1✔
2365
            .setChildWorkflowOnly(d.getChildWorkflowOnly())
1✔
2366
            .setWorkflowExecution(
1✔
2367
                WorkflowExecution.newBuilder()
1✔
2368
                    .setWorkflowId(d.getWorkflowId())
1✔
2369
                    .setRunId(d.getRunId())
1✔
2370
                    .build());
1✔
2371

2372
    HistoryEvent event =
2373
        HistoryEvent.newBuilder()
1✔
2374
            .setEventType(EventType.EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED)
1✔
2375
            .setRequestCancelExternalWorkflowExecutionInitiatedEventAttributes(a)
1✔
2376
            .build();
1✔
2377
    long initiatedEventId = ctx.addEvent(event);
1✔
2378
    ctx.onCommit(
1✔
2379
        (historySize) -> {
2380
          data.initiatedEventId = initiatedEventId;
1✔
2381
          data.initiatedEvent = a.build();
1✔
2382
        });
1✔
2383
  }
1✔
2384

2385
  private static void reportExternalCancellationRequested(
2386
      RequestContext ctx, CancelExternalData data, String runId, long notUsed) {
2387
    RequestCancelExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent =
1✔
2388
        data.initiatedEvent;
2389
    ExternalWorkflowExecutionCancelRequestedEventAttributes.Builder a =
2390
        ExternalWorkflowExecutionCancelRequestedEventAttributes.newBuilder()
1✔
2391
            .setInitiatedEventId(data.initiatedEventId)
1✔
2392
            .setWorkflowExecution(
1✔
2393
                WorkflowExecution.newBuilder()
1✔
2394
                    .setRunId(runId)
1✔
2395
                    .setWorkflowId(initiatedEvent.getWorkflowExecution().getWorkflowId())
1✔
2396
                    .build())
1✔
2397
            .setNamespace(initiatedEvent.getNamespace());
1✔
2398
    HistoryEvent event =
2399
        HistoryEvent.newBuilder()
1✔
2400
            .setEventType(EventType.EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_CANCEL_REQUESTED)
1✔
2401
            .setExternalWorkflowExecutionCancelRequestedEventAttributes(a)
1✔
2402
            .build();
1✔
2403
    ctx.addEvent(event);
1✔
2404
  }
1✔
2405

2406
  private static void failExternalCancellation(
2407
      RequestContext ctx,
2408
      CancelExternalData data,
2409
      CancelExternalWorkflowExecutionFailedCause cause,
2410
      long notUsed) {
2411
    RequestCancelExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent =
×
2412
        data.initiatedEvent;
2413
    RequestCancelExternalWorkflowExecutionFailedEventAttributes.Builder a =
2414
        RequestCancelExternalWorkflowExecutionFailedEventAttributes.newBuilder()
×
2415
            .setInitiatedEventId(data.initiatedEventId)
×
2416
            .setWorkflowExecution(initiatedEvent.getWorkflowExecution())
×
2417
            .setControl(initiatedEvent.getControl())
×
2418
            .setCause(cause)
×
2419
            .setNamespace(initiatedEvent.getNamespace());
×
2420
    HistoryEvent event =
2421
        HistoryEvent.newBuilder()
×
2422
            .setEventType(EventType.EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_FAILED)
×
2423
            .setRequestCancelExternalWorkflowExecutionFailedEventAttributes(a)
×
2424
            .build();
×
2425
    ctx.addEvent(event);
×
2426
  }
×
2427

2428
  // Mimics the default activity retry policy of a standard Temporal server.
2429
  static RetryPolicy ensureDefaultFieldsForActivityRetryPolicy(RetryPolicy originalPolicy) {
2430
    Duration initialInterval =
2431
        Durations.compare(originalPolicy.getInitialInterval(), Durations.ZERO) == 0
1✔
2432
            ? DEFAULT_ACTIVITY_RETRY_INITIAL_INTERVAL
1✔
2433
            : originalPolicy.getInitialInterval();
1✔
2434

2435
    return RetryPolicy.newBuilder()
1✔
2436
        .setInitialInterval(initialInterval)
1✔
2437
        .addAllNonRetryableErrorTypes(originalPolicy.getNonRetryableErrorTypesList())
1✔
2438
        .setMaximumInterval(
1✔
2439
            Durations.compare(originalPolicy.getMaximumInterval(), Durations.ZERO) == 0
1✔
2440
                ? Durations.fromMillis(
1✔
2441
                    DEFAULT_ACTIVITY_MAXIMUM_INTERVAL_COEFFICIENT
2442
                        * Durations.toMillis(initialInterval))
1✔
2443
                : originalPolicy.getMaximumInterval())
1✔
2444
        .setBackoffCoefficient(
1✔
2445
            originalPolicy.getBackoffCoefficient() == 0
1✔
2446
                ? DEFAULT_ACTIVITY_RETRY_BACKOFF_COEFFICIENT
1✔
2447
                : originalPolicy.getBackoffCoefficient())
1✔
2448
        .setMaximumAttempts(
1✔
2449
            originalPolicy.getMaximumAttempts() == 0
1✔
2450
                ? DEFAULT_ACTIVITY_RETRY_MAXIMUM_ATTEMPTS
1✔
2451
                : originalPolicy.getMaximumAttempts())
1✔
2452
        .build();
1✔
2453
  }
2454

2455
  static RetryPolicy defaultNexusRetryPolicy() {
2456
    return RetryPolicy.newBuilder()
1✔
2457
        .addAllNonRetryableErrorTypes(
1✔
2458
            Arrays.asList(
1✔
2459
                "BAD_REQUEST", "INVALID_ARGUMENT", "NOT_FOUND", "DEADLINE_EXCEEDED", "CANCELLED"))
2460
        .setInitialInterval(Durations.fromSeconds(1))
1✔
2461
        .setMaximumInterval(Durations.fromHours(1))
1✔
2462
        .setBackoffCoefficient(2.0)
1✔
2463
        .build();
1✔
2464
  }
2465
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc