• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #330

10 Oct 2024 04:35PM UTC coverage: 78.113% (-0.1%) from 78.238%
#330

push

github

web-flow
Test server support for bidi links (#2258)

* Test server support for bidi links

* typo

* license

* feedback

* link validation

* describe fields

* link validation

103 of 163 new or added lines in 3 files covered. (63.19%)

11 existing lines in 4 files now uncovered.

21332 of 27309 relevant lines covered (78.11%)

0.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.0
/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.testservice;
22

23
import static io.temporal.internal.testservice.LinkConverter.*;
24
import static io.temporal.internal.testservice.StateMachines.Action.CANCEL;
25
import static io.temporal.internal.testservice.StateMachines.Action.COMPLETE;
26
import static io.temporal.internal.testservice.StateMachines.Action.CONTINUE_AS_NEW;
27
import static io.temporal.internal.testservice.StateMachines.Action.FAIL;
28
import static io.temporal.internal.testservice.StateMachines.Action.INITIATE;
29
import static io.temporal.internal.testservice.StateMachines.Action.QUERY;
30
import static io.temporal.internal.testservice.StateMachines.Action.REQUEST_CANCELLATION;
31
import static io.temporal.internal.testservice.StateMachines.Action.START;
32
import static io.temporal.internal.testservice.StateMachines.Action.TERMINATE;
33
import static io.temporal.internal.testservice.StateMachines.Action.TIME_OUT;
34
import static io.temporal.internal.testservice.StateMachines.Action.UPDATE;
35
import static io.temporal.internal.testservice.StateMachines.Action.UPDATE_WORKFLOW_EXECUTION;
36
import static io.temporal.internal.testservice.StateMachines.State.CANCELED;
37
import static io.temporal.internal.testservice.StateMachines.State.CANCELLATION_REQUESTED;
38
import static io.temporal.internal.testservice.StateMachines.State.COMPLETED;
39
import static io.temporal.internal.testservice.StateMachines.State.CONTINUED_AS_NEW;
40
import static io.temporal.internal.testservice.StateMachines.State.FAILED;
41
import static io.temporal.internal.testservice.StateMachines.State.INITIATED;
42
import static io.temporal.internal.testservice.StateMachines.State.NONE;
43
import static io.temporal.internal.testservice.StateMachines.State.STARTED;
44
import static io.temporal.internal.testservice.StateMachines.State.TERMINATED;
45
import static io.temporal.internal.testservice.StateMachines.State.TIMED_OUT;
46

47
import com.google.common.base.Preconditions;
48
import com.google.common.base.Strings;
49
import com.google.protobuf.*;
50
import com.google.protobuf.util.Durations;
51
import com.google.protobuf.util.Timestamps;
52
import io.grpc.Status;
53
import io.grpc.StatusRuntimeException;
54
import io.temporal.api.command.v1.*;
55
import io.temporal.api.common.v1.*;
56
import io.temporal.api.enums.v1.*;
57
import io.temporal.api.errordetails.v1.QueryFailedFailure;
58
import io.temporal.api.failure.v1.ApplicationFailureInfo;
59
import io.temporal.api.failure.v1.Failure;
60
import io.temporal.api.failure.v1.NexusOperationFailureInfo;
61
import io.temporal.api.failure.v1.TimeoutFailureInfo;
62
import io.temporal.api.history.v1.*;
63
import io.temporal.api.nexus.v1.*;
64
import io.temporal.api.nexus.v1.Link;
65
import io.temporal.api.protocol.v1.Message;
66
import io.temporal.api.query.v1.WorkflowQueryResult;
67
import io.temporal.api.taskqueue.v1.StickyExecutionAttributes;
68
import io.temporal.api.taskqueue.v1.TaskQueue;
69
import io.temporal.api.update.v1.*;
70
import io.temporal.api.update.v1.Request;
71
import io.temporal.api.update.v1.Response;
72
import io.temporal.api.workflowservice.v1.*;
73
import io.temporal.internal.common.ProtobufTimeUtils;
74
import io.temporal.internal.testservice.TestWorkflowMutableStateImpl.UpdateWorkflowExecution;
75
import io.temporal.internal.testservice.TestWorkflowStore.ActivityTask;
76
import io.temporal.internal.testservice.TestWorkflowStore.TaskQueueId;
77
import io.temporal.internal.testservice.TestWorkflowStore.WorkflowTask;
78
import io.temporal.serviceclient.StatusUtils;
79
import io.temporal.workflow.Functions;
80
import java.util.*;
81
import java.util.concurrent.CompletableFuture;
82
import java.util.concurrent.ForkJoinPool;
83
import javax.annotation.Nonnull;
84
import org.slf4j.Logger;
85
import org.slf4j.LoggerFactory;
86

87
class StateMachines {
×
88

89
  private static final Logger log = LoggerFactory.getLogger(StateMachines.class);
1✔
90

91
  public static final long DEFAULT_WORKFLOW_EXECUTION_TIMEOUT_MILLISECONDS =
92
      10L * 365 * 24 * 3600 * 1000;
93
  public static final long DEFAULT_WORKFLOW_TASK_TIMEOUT_MILLISECONDS = 10L * 1000;
94
  public static final long MAX_WORKFLOW_TASK_TIMEOUT_MILLISECONDS = 60L * 1000;
95
  static final Duration DEFAULT_ACTIVITY_RETRY_INITIAL_INTERVAL = Durations.fromSeconds(1);
1✔
96
  static final double DEFAULT_ACTIVITY_RETRY_BACKOFF_COEFFICIENT = 2.0;
97
  static final int DEFAULT_ACTIVITY_RETRY_MAXIMUM_ATTEMPTS = 0;
98
  static final int DEFAULT_ACTIVITY_MAXIMUM_INTERVAL_COEFFICIENT = 100;
99
  static final int NO_EVENT_ID = -1;
100

101
  /**
   * States used by the state machines built in this class. Every machine starts in {@link #NONE};
   * which of the remaining states are reachable depends on the transitions each factory registers.
   * The declaration order is preserved, so ordinals are unchanged.
   */
  enum State {
    NONE,
    INITIATED,
    STARTED,
    FAILED,
    TIMED_OUT,
    CANCELLATION_REQUESTED,
    CANCELED,
    COMPLETED,
    CONTINUED_AS_NEW,
    TERMINATED
  }
113

114
  /**
   * Actions that drive transitions of the state machines built in this class. Each factory
   * registers the (state, action) pairs it accepts. The declaration order is preserved, so
   * ordinals are unchanged.
   */
  enum Action {
    INITIATE,
    START,
    FAIL,
    TIME_OUT,
    REQUEST_CANCELLATION,
    CANCEL,
    TERMINATE,
    UPDATE,
    COMPLETE,
    CONTINUE_AS_NEW,
    QUERY,
    UPDATE_WORKFLOW_EXECUTION
  }
128

129
  static final class WorkflowData {
130
    Optional<TestServiceRetryState> retryState;
131
    Duration backoffStartInterval;
132
    String cronSchedule;
133
    Payloads lastCompletionResult;
134
    Optional<Failure> lastFailure;
135
    /**
136
     * @see WorkflowExecutionStartedEventAttributes#getFirstExecutionRunId()
137
     */
138
    final @Nonnull String firstExecutionRunId;
139
    /**
140
     * @see WorkflowExecutionStartedEventAttributes#getOriginalExecutionRunId()
141
     */
142
    final @Nonnull String originalExecutionRunId;
143

144
    /** RunId that was continued by this run as a result of Retry or Continue-As-New. */
145
    Optional<String> continuedExecutionRunId;
146

147
    Functions.Proc runTimerCancellationHandle;
148

149
    WorkflowData(
150
        Optional<TestServiceRetryState> retryState,
151
        Duration backoffStartInterval,
152
        String cronSchedule,
153
        Payloads lastCompletionResult,
154
        Optional<Failure> lastFailure,
155
        @Nonnull String firstExecutionRunId,
156
        @Nonnull String originalExecutionRunId,
157
        Optional<String> continuedExecutionRunId) {
1✔
158
      this.retryState = retryState;
1✔
159
      this.backoffStartInterval = backoffStartInterval;
1✔
160
      this.cronSchedule = cronSchedule;
1✔
161
      this.lastCompletionResult = lastCompletionResult;
1✔
162
      this.firstExecutionRunId =
1✔
163
          Preconditions.checkNotNull(firstExecutionRunId, "firstExecutionRunId");
1✔
164
      this.originalExecutionRunId =
1✔
165
          Preconditions.checkNotNull(originalExecutionRunId, "originalExecutionRunId");
1✔
166
      this.continuedExecutionRunId = continuedExecutionRunId;
1✔
167
      this.lastFailure = Objects.requireNonNull(lastFailure);
1✔
168
    }
1✔
169

170
    @Override
171
    public String toString() {
172
      return "WorkflowData{"
×
173
          + "retryState="
174
          + retryState
175
          + ", backoffStartInterval="
176
          + backoffStartInterval
177
          + ", cronSchedule='"
178
          + cronSchedule
179
          + '\''
180
          + ", lastCompletionResult="
181
          + lastCompletionResult
182
          + ", firstExecutionRunId='"
183
          + firstExecutionRunId
184
          + '\''
185
          + ", originalExecutionRunId='"
186
          + originalExecutionRunId
187
          + '\''
188
          + ", continuedExecutionRunId="
189
          + continuedExecutionRunId
190
          + '}';
191
    }
192
  }
193

194
  static final class WorkflowTaskData {
195

196
    final TestWorkflowStore store;
197

198
    boolean workflowCompleted;
199

200
    /** id of the last started event which completed successfully */
201
    long lastSuccessfulStartedEventId;
202

203
    final StartWorkflowExecutionRequest startRequest;
204

205
    long startedEventId = NO_EVENT_ID;
1✔
206

207
    PollWorkflowTaskQueueResponse.Builder workflowTask;
208

209
    /**
210
     * Events that are added during execution of a workflow task. They have to be buffered to be
211
     * added after the events generated by a workflow task. Without this the determinism will be
212
     * broken on replay.
213
     */
214
    final List<RequestContext> bufferedEvents = new ArrayList<>();
1✔
215

216
    /**
217
     * Update requests that are added during execution of a workflow task. They have to be buffered
218
     * to be added to the next workflow task.
219
     */
220
    final Map<String, UpdateWorkflowExecution> updateRequestBuffer = new LinkedHashMap<>();
1✔
221

222
    final Map<String, UpdateWorkflowExecution> updateRequest = new LinkedHashMap<>();
1✔
223

224
    long scheduledEventId = NO_EVENT_ID;
1✔
225

226
    int attempt = 0;
1✔
227

228
    /** Query requests received during workflow task processing (after start) */
229
    final Map<String, TestWorkflowMutableStateImpl.ConsistentQuery> queryBuffer = new HashMap<>();
1✔
230

231
    final Map<String, TestWorkflowMutableStateImpl.ConsistentQuery> consistentQueryRequests =
1✔
232
        new HashMap<>();
233

234
    WorkflowTaskData(TestWorkflowStore store, StartWorkflowExecutionRequest startRequest) {
1✔
235
      this.store = store;
1✔
236
      this.startRequest = startRequest;
1✔
237
    }
1✔
238

239
    void clear() {
240
      startedEventId = NO_EVENT_ID;
1✔
241
      workflowTask = null;
1✔
242
      scheduledEventId = NO_EVENT_ID;
1✔
243
      attempt = 0;
1✔
244
    }
1✔
245

246
    Optional<UpdateWorkflowExecution> getUpdateRequest(String protocolInstanceId) {
247
      return Optional.ofNullable(
1✔
248
          updateRequest.getOrDefault(
1✔
249
              protocolInstanceId, updateRequestBuffer.get(protocolInstanceId)));
1✔
250
    }
251

252
    @Override
253
    public String toString() {
254
      return "WorkflowTaskData{"
×
255
          + "store="
256
          + store
257
          + ", workflowCompleted="
258
          + workflowCompleted
259
          + ", lastSuccessfulStartedEventId="
260
          + lastSuccessfulStartedEventId
261
          + ", startRequest="
262
          + startRequest
263
          + ", startedEventId="
264
          + startedEventId
265
          + ", workflowTask="
266
          + workflowTask
267
          + ", bufferedEvents="
268
          + bufferedEvents
269
          + ", scheduledEventId="
270
          + scheduledEventId
271
          + ", attempt="
272
          + attempt
273
          + ", queryBuffer="
274
          + queryBuffer
275
          + ", consistentQueryRequests="
276
          + consistentQueryRequests
277
          + ", updateRequest="
278
          + updateRequest
279
          + ", updateRequestBuffer="
280
          + updateRequestBuffer
281
          + '}';
282
    }
283
  }
284

285
  static final class ActivityTaskData {
286

287
    StartWorkflowExecutionRequest startWorkflowExecutionRequest;
288
    ActivityTaskScheduledEventAttributes scheduledEvent;
289
    ActivityTask activityTask;
290

291
    final TestWorkflowStore store;
292

293
    long scheduledEventId = NO_EVENT_ID;
1✔
294
    long startedEventId = NO_EVENT_ID;
1✔
295
    public HistoryEvent startedEvent;
296
    Payloads heartbeatDetails;
297
    long lastHeartbeatTime;
298
    TestServiceRetryState retryState;
299
    Duration nextBackoffInterval;
300
    String identity;
301

302
    ActivityTaskData(
303
        TestWorkflowStore store, StartWorkflowExecutionRequest startWorkflowExecutionRequest) {
1✔
304
      this.store = store;
1✔
305
      this.startWorkflowExecutionRequest = startWorkflowExecutionRequest;
1✔
306
    }
1✔
307

308
    @Override
309
    public String toString() {
310
      return "ActivityTaskData{"
×
311
          + "startWorkflowExecutionRequest="
312
          + startWorkflowExecutionRequest
313
          + ", scheduledEvent="
314
          + scheduledEvent
315
          + ", activityTask="
316
          + activityTask
317
          + ", store="
318
          + store
319
          + ", scheduledEventId="
320
          + scheduledEventId
321
          + ", startedEventId="
322
          + startedEventId
323
          + ", startedEvent="
324
          + startedEvent
325
          + ", heartbeatDetails="
326
          + heartbeatDetails
327
          + ", lastHeartbeatTime="
328
          + lastHeartbeatTime
329
          + ", retryState="
330
          + retryState
331
          + ", nextBackoffInterval="
332
          + nextBackoffInterval
333
          + '}';
334
    }
335

336
    public int getAttempt() {
337
      return retryState != null ? retryState.getAttempt() : 1;
1✔
338
    }
339
  }
340

341
  static final class NexusOperationData {
342
    // Timeout for an individual Start or Cancel Operation request.
343
    final Duration requestTimeout = Durations.fromSeconds(10);
1✔
344

345
    String operationId = "";
1✔
346
    Endpoint endpoint;
347
    NexusOperationScheduledEventAttributes scheduledEvent;
348
    TestWorkflowStore.NexusTask nexusTask;
349
    RetryPolicy retryPolicy = defaultNexusRetryPolicy();
1✔
350

351
    long scheduledEventId = NO_EVENT_ID;
1✔
352
    Timestamp cancelRequestedTime;
353

354
    TestServiceRetryState retryState;
355
    boolean isBackingOff = false;
1✔
356
    Duration nextBackoffInterval;
357
    Timestamp lastAttemptCompleteTime;
358
    Timestamp nextAttemptScheduleTime;
359
    String identity;
360

361
    public NexusOperationData(Endpoint endpoint) {
1✔
362
      this.endpoint = endpoint;
1✔
363
    }
1✔
364

365
    public int getAttempt() {
366
      return retryState != null ? retryState.getAttempt() : 1;
1✔
367
    }
368

369
    @Override
370
    public String toString() {
371
      return "NexusOperationData{"
×
372
          + ", nexusEndpoint="
373
          + endpoint
374
          + ", scheduledEvent="
375
          + scheduledEvent
376
          + ", nexusTask="
377
          + nexusTask
378
          + ", scheduledEventId="
379
          + scheduledEventId
380
          + ", retryState="
381
          + retryState
382
          + ", nextBackoffInterval="
383
          + nextBackoffInterval
384
          + '}';
385
    }
386
  }
387

388
  static final class SignalExternalData {
1✔
389
    long initiatedEventId = NO_EVENT_ID;
1✔
390
    public SignalExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent;
391

392
    @Override
393
    public String toString() {
394
      return "SignalExternalData{"
×
395
          + "initiatedEventId="
396
          + initiatedEventId
397
          + ", initiatedEvent="
398
          + initiatedEvent
399
          + '}';
400
    }
401
  }
402

403
  static final class CancelExternalData {
1✔
404
    long initiatedEventId = NO_EVENT_ID;
1✔
405
    public RequestCancelExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent;
406

407
    @Override
408
    public String toString() {
409
      return "CancelExternalData{"
×
410
          + "initiatedEventId="
411
          + initiatedEventId
412
          + ", initiatedEvent="
413
          + initiatedEvent
414
          + '}';
415
    }
416
  }
417

418
  static final class ChildWorkflowData {
419

420
    final TestWorkflowService service;
421
    StartChildWorkflowExecutionInitiatedEventAttributes initiatedEvent;
422
    long initiatedEventId;
423
    long startedEventId;
424
    WorkflowExecution execution;
425

426
    public ChildWorkflowData(TestWorkflowService service) {
1✔
427
      this.service = service;
1✔
428
    }
1✔
429

430
    @Override
431
    public String toString() {
432
      return "ChildWorkflowData{"
×
433
          + "service="
434
          + service
435
          + ", initiatedEvent="
436
          + initiatedEvent
437
          + ", initiatedEventId="
438
          + initiatedEventId
439
          + ", startedEventId="
440
          + startedEventId
441
          + ", execution="
442
          + execution
443
          + '}';
444
    }
445
  }
446

447
  static final class TimerData {
1✔
448
    TimerStartedEventAttributes startedEvent;
449
    public long startedEventId;
450

451
    @Override
452
    public String toString() {
453
      return "TimerData{"
×
454
          + "startedEvent="
455
          + startedEvent
456
          + ", startedEventId="
457
          + startedEventId
458
          + '}';
459
    }
460
  }
461

462
  /** Represents an accepted update workflow execution request */
463
  static final class UpdateWorkflowExecutionData {
464
    final String id;
465
    final CompletableFuture<Boolean> accepted;
466
    final CompletableFuture<Outcome> outcome;
467
    final Request initialRequest;
468

469
    public UpdateWorkflowExecutionData(
470
        String id,
471
        Request initialRequest,
472
        CompletableFuture<Boolean> accepted,
473
        CompletableFuture<Outcome> outcome) {
1✔
474
      this.id = id;
1✔
475
      this.initialRequest = initialRequest;
1✔
476
      this.accepted = accepted;
1✔
477
      this.outcome = outcome;
1✔
478
    }
1✔
479

480
    @Override
481
    public String toString() {
482
      return "UpdateWorkflowExecutionData{" + "ID=" + id + '}';
×
483
    }
484
  }
485

486
  static StateMachine<WorkflowData> newWorkflowStateMachine(WorkflowData data) {
487
    return new StateMachine<>(data)
1✔
488
        .add(NONE, START, STARTED, StateMachines::startWorkflow)
1✔
489
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::completeWorkflow)
1✔
490
        .add(STARTED, CONTINUE_AS_NEW, CONTINUED_AS_NEW, StateMachines::continueAsNewWorkflow)
1✔
491
        .add(STARTED, FAIL, FAILED, StateMachines::failWorkflow)
1✔
492
        .add(STARTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutWorkflow)
1✔
493
        .add(
1✔
494
            STARTED,
495
            REQUEST_CANCELLATION,
496
            CANCELLATION_REQUESTED,
497
            StateMachines::requestWorkflowCancellation)
498
        .add(STARTED, TERMINATE, TERMINATED, StateMachines::terminateWorkflow)
1✔
499
        .add(
1✔
500
            CANCELLATION_REQUESTED,
501
            REQUEST_CANCELLATION,
502
            CANCELLATION_REQUESTED,
503
            StateMachines::noop)
504
        .add(CANCELLATION_REQUESTED, COMPLETE, COMPLETED, StateMachines::completeWorkflow)
1✔
505
        .add(CANCELLATION_REQUESTED, CANCEL, CANCELED, StateMachines::cancelWorkflow)
1✔
506
        .add(CANCELLATION_REQUESTED, TERMINATE, TERMINATED, StateMachines::terminateWorkflow)
1✔
507
        .add(CANCELLATION_REQUESTED, FAIL, FAILED, StateMachines::failWorkflow)
1✔
508
        .add(CANCELLATION_REQUESTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutWorkflow);
1✔
509
  }
510

511
  static StateMachine<WorkflowTaskData> newWorkflowTaskStateMachine(
512
      TestWorkflowStore store, StartWorkflowExecutionRequest startRequest) {
513
    return new StateMachine<>(new WorkflowTaskData(store, startRequest))
1✔
514
        .add(NONE, INITIATE, INITIATED, StateMachines::scheduleWorkflowTask)
1✔
515
        // TODO(maxim): Uncomment once the server supports consistent query only workflow tasks
516
        //        .add(NONE, QUERY, INITIATED_QUERY_ONLY, StateMachines::scheduleQueryWorkflowTask)
517
        //        .add(INITIATED_QUERY_ONLY, QUERY, INITIATED_QUERY_ONLY,
518
        // StateMachines::queryWhileScheduled)
519
        //        .add(
520
        //            INITIATED_QUERY_ONLY,
521
        //            INITIATE,
522
        //            INITIATED,
523
        //            StateMachines::convertQueryWorkflowTaskToReal)
524
        //        .add(
525
        //            INITIATED_QUERY_ONLY,
526
        //            START,
527
        //            STARTED_QUERY_ONLY,
528
        //            StateMachines::startQueryOnlyWorkflowTask)
529
        //        .add(STARTED_QUERY_ONLY, INITIATE, STARTED_QUERY_ONLY,
530
        // StateMachines::needsWorkflowTask)
531
        //        .add(STARTED_QUERY_ONLY, QUERY, STARTED_QUERY_ONLY,
532
        // StateMachines::needsWorkflowTaskDueToQuery)
533
        //        .add(STARTED_QUERY_ONLY, FAIL, NONE, StateMachines::failQueryWorkflowTask)
534
        //        .add(STARTED_QUERY_ONLY, TIME_OUT, NONE, StateMachines::failQueryWorkflowTask)
535
        //        .add(STARTED_QUERY_ONLY, COMPLETE, NONE, StateMachines::completeQuery)
536
        .add(STARTED, QUERY, STARTED, StateMachines::bufferQuery)
1✔
537
        .add(STARTED, UPDATE_WORKFLOW_EXECUTION, STARTED, StateMachines::bufferUpdate)
1✔
538
        .add(INITIATED, INITIATE, INITIATED, StateMachines::noop)
1✔
539
        .add(INITIATED, QUERY, INITIATED, StateMachines::queryWhileScheduled)
1✔
540
        .add(INITIATED, UPDATE_WORKFLOW_EXECUTION, INITIATED, StateMachines::addUpdate)
1✔
541
        .add(INITIATED, START, STARTED, StateMachines::startWorkflowTask)
1✔
542
        .add(STARTED, COMPLETE, NONE, StateMachines::completeWorkflowTask)
1✔
543
        .add(STARTED, FAIL, NONE, StateMachines::failWorkflowTask)
1✔
544
        .add(STARTED, TIME_OUT, NONE, StateMachines::timeoutWorkflowTask)
1✔
545
        .add(STARTED, INITIATE, STARTED, StateMachines::needsWorkflowTask);
1✔
546
  }
547

548
  public static StateMachine<ActivityTaskData> newActivityStateMachine(
549
      TestWorkflowStore store, StartWorkflowExecutionRequest workflowStartedEvent) {
550
    return new StateMachine<>(new ActivityTaskData(store, workflowStartedEvent))
1✔
551
        .add(NONE, INITIATE, INITIATED, StateMachines::scheduleActivityTask)
1✔
552
        .add(INITIATED, START, STARTED, StateMachines::startActivityTask)
1✔
553
        .add(INITIATED, TIME_OUT, TIMED_OUT, StateMachines::timeoutActivityTask)
1✔
554
        .add(
1✔
555
            INITIATED,
556
            REQUEST_CANCELLATION,
557
            CANCELLATION_REQUESTED,
558
            StateMachines::requestActivityCancellation)
559
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::completeActivityTask)
1✔
560
        // Transitions to initiated in case of a retry
561
        .add(STARTED, FAIL, new State[] {FAILED, INITIATED}, StateMachines::failActivityTask)
1✔
562
        // Transitions to initiated in case of a retry
563
        .add(
1✔
564
            STARTED,
565
            TIME_OUT,
566
            new State[] {TIMED_OUT, INITIATED},
567
            StateMachines::timeoutActivityTask)
568
        .add(STARTED, UPDATE, STARTED, StateMachines::heartbeatActivityTask)
1✔
569
        .add(
1✔
570
            STARTED,
571
            REQUEST_CANCELLATION,
572
            CANCELLATION_REQUESTED,
573
            StateMachines::requestActivityCancellation)
574
        .add(
1✔
575
            CANCELLATION_REQUESTED, CANCEL, CANCELED, StateMachines::reportActivityTaskCancellation)
576
        .add(CANCELLATION_REQUESTED, COMPLETE, COMPLETED, StateMachines::completeActivityTask)
1✔
577
        .add(
1✔
578
            CANCELLATION_REQUESTED,
579
            UPDATE,
580
            CANCELLATION_REQUESTED,
581
            StateMachines::heartbeatActivityTask)
582
        .add(CANCELLATION_REQUESTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutActivityTask)
1✔
583
        .add(CANCELLATION_REQUESTED, FAIL, FAILED, StateMachines::failActivityTask);
1✔
584
  }
585

586
  public static StateMachine<ChildWorkflowData> newChildWorkflowStateMachine(
587
      TestWorkflowService service) {
588
    return new StateMachine<>(new ChildWorkflowData(service))
1✔
589
        .add(NONE, INITIATE, INITIATED, StateMachines::initiateChildWorkflow)
1✔
590
        .add(INITIATED, START, STARTED, StateMachines::childWorkflowStarted)
1✔
591
        .add(INITIATED, FAIL, FAILED, StateMachines::startChildWorkflowFailed)
1✔
592
        .add(INITIATED, TIME_OUT, TIMED_OUT, StateMachines::timeoutChildWorkflow)
1✔
593
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::childWorkflowCompleted)
1✔
594
        .add(STARTED, FAIL, FAILED, StateMachines::childWorkflowFailed)
1✔
595
        .add(STARTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutChildWorkflow)
1✔
596
        .add(STARTED, CANCEL, CANCELED, StateMachines::childWorkflowCanceled);
1✔
597
  }
598

599
  public static StateMachine<UpdateWorkflowExecutionData> newUpdateWorkflowExecution(
600
      String updateId,
601
      Request initialRequest,
602
      CompletableFuture<Boolean> accepted,
603
      CompletableFuture<Outcome> outcome) {
604
    return new StateMachine<>(
1✔
605
            new UpdateWorkflowExecutionData(updateId, initialRequest, accepted, outcome))
606
        .add(NONE, START, STARTED, StateMachines::acceptUpdate)
1✔
607
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::completeUpdate);
1✔
608
  }
609

610
  public static StateMachine<NexusOperationData> newNexusOperation(Endpoint endpoint) {
611
    return new StateMachine<>(new NexusOperationData(endpoint))
1✔
612
        .add(NONE, INITIATE, INITIATED, StateMachines::scheduleNexusOperation)
1✔
613
        .add(INITIATED, START, STARTED, StateMachines::startNexusOperation)
1✔
614
        .add(INITIATED, TIME_OUT, TIMED_OUT, StateMachines::timeoutNexusOperation)
1✔
615
        // TODO: properly support cancel before start
616
        // .add(
617
        //     INITIATED,
618
        //     REQUEST_CANCELLATION,
619
        //     INITIATED,
620
        //     StateMachines::requestCancelNexusOperation)
621
        .add(INITIATED, CANCEL, CANCELED, StateMachines::reportNexusOperationCancellation)
1✔
622
        // Transitions directly from INITIATED to COMPLETE for sync completions
623
        .add(INITIATED, COMPLETE, COMPLETED, StateMachines::completeNexusOperation)
1✔
624
        // Transitions to INITIATED in case of a retry
625
        .add(INITIATED, FAIL, new State[] {FAILED, INITIATED}, StateMachines::failNexusOperation)
1✔
626
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::completeNexusOperation)
1✔
627
        // Transitions back to STARTED in case of a retry
628
        .add(STARTED, FAIL, new State[] {FAILED, STARTED}, StateMachines::failNexusOperation)
1✔
629
        .add(STARTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutNexusOperation)
1✔
630
        .add(STARTED, REQUEST_CANCELLATION, STARTED, StateMachines::requestCancelNexusOperation)
1✔
631
        .add(STARTED, CANCEL, CANCELED, StateMachines::reportNexusOperationCancellation);
1✔
632
  }
633

634
  public static StateMachine<TimerData> newTimerStateMachine() {
635
    return new StateMachine<>(new TimerData())
1✔
636
        .add(NONE, START, STARTED, StateMachines::startTimer)
1✔
637
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::fireTimer)
1✔
638
        .add(STARTED, CANCEL, CANCELED, StateMachines::cancelTimer);
1✔
639
  }
640

641
  public static StateMachine<SignalExternalData> newSignalExternalStateMachine() {
642
    return new StateMachine<>(new SignalExternalData())
1✔
643
        .add(NONE, INITIATE, INITIATED, StateMachines::initiateExternalSignal)
1✔
644
        .add(INITIATED, FAIL, FAILED, StateMachines::failExternalSignal)
1✔
645
        .add(INITIATED, COMPLETE, COMPLETED, StateMachines::completeExternalSignal);
1✔
646
  }
647

648
  public static StateMachine<CancelExternalData> newCancelExternalStateMachine() {
649
    return new StateMachine<>(new CancelExternalData())
1✔
650
        .add(NONE, INITIATE, INITIATED, StateMachines::initiateExternalCancellation)
1✔
651
        .add(INITIATED, FAIL, FAILED, StateMachines::failExternalCancellation)
1✔
652
        .add(INITIATED, START, STARTED, StateMachines::reportExternalCancellationRequested);
1✔
653
  }
654

655
  /**
   * Transition callback that does nothing; registered for transitions that need no side effects.
   * All parameters are ignored.
   */
  private static <T, A> void noop(RequestContext ctx, T data, A a, long notUsed) {
    // Intentionally empty.
  }
1✔
656

657
  private static void scheduleNexusOperation(
658
      RequestContext ctx,
659
      NexusOperationData data,
660
      ScheduleNexusOperationCommandAttributes attr,
661
      long workflowTaskCompletedId) {
662
    Duration expirationInterval = attr.getScheduleToCloseTimeout();
1✔
663
    Timestamp expirationTime =
664
        (attr.hasScheduleToCloseTimeout())
1✔
665
            ? Timestamps.add(ctx.currentTime(), expirationInterval)
1✔
666
            : Timestamp.getDefaultInstance();
1✔
667
    TestServiceRetryState retryState = new TestServiceRetryState(data.retryPolicy, expirationTime);
1✔
668

669
    NexusOperationScheduledEventAttributes.Builder a =
670
        NexusOperationScheduledEventAttributes.newBuilder()
1✔
671
            .setEndpoint(attr.getEndpoint())
1✔
672
            .setEndpointId(data.endpoint.getId())
1✔
673
            .setService(attr.getService())
1✔
674
            .setOperation(attr.getOperation())
1✔
675
            .setInput(attr.getInput())
1✔
676
            .setScheduleToCloseTimeout(attr.getScheduleToCloseTimeout())
1✔
677
            .putAllNexusHeader(attr.getNexusHeaderMap())
1✔
678
            .setRequestId(UUID.randomUUID().toString())
1✔
679
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedId);
1✔
680

681
    data.scheduledEvent = a.build();
1✔
682
    HistoryEvent event =
683
        HistoryEvent.newBuilder()
1✔
684
            .setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED)
1✔
685
            .setNexusOperationScheduledEventAttributes(a)
1✔
686
            .build();
1✔
687

688
    long scheduledEventId = ctx.addEvent(event);
1✔
689
    NexusOperationRef ref = new NexusOperationRef(ctx.getExecutionId(), scheduledEventId);
1✔
690
    NexusTaskToken taskToken = new NexusTaskToken(ref, data.getAttempt(), false);
1✔
691

692
    Link link =
693
        workflowEventToNexusLink(
1✔
694
            io.temporal.api.common.v1.Link.WorkflowEvent.newBuilder()
1✔
695
                .setNamespace(ctx.getNamespace())
1✔
696
                .setWorkflowId(ctx.getExecution().getWorkflowId())
1✔
697
                .setRunId(ctx.getExecution().getRunId())
1✔
698
                .setEventRef(
1✔
699
                    io.temporal.api.common.v1.Link.WorkflowEvent.EventReference.newBuilder()
1✔
700
                        .setEventId(scheduledEventId)
1✔
701
                        .setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED))
1✔
702
                .build());
1✔
703

704
    PollNexusTaskQueueResponse.Builder pollResponse =
705
        PollNexusTaskQueueResponse.newBuilder()
1✔
706
            .setTaskToken(taskToken.toBytes())
1✔
707
            .setRequest(
1✔
708
                io.temporal.api.nexus.v1.Request.newBuilder()
1✔
709
                    .setScheduledTime(ctx.currentTime())
1✔
710
                    .putAllHeader(attr.getNexusHeaderMap())
1✔
711
                    .setStartOperation(
1✔
712
                        StartOperationRequest.newBuilder()
1✔
713
                            .setService(attr.getService())
1✔
714
                            .setOperation(attr.getOperation())
1✔
715
                            .setPayload(attr.getInput())
1✔
716
                            .addLinks(link)
1✔
717
                            .setCallback("http://test-env/operations")
1✔
718
                            // The test server uses this to lookup the operation
719
                            .putCallbackHeader(
1✔
720
                                "operation-reference", ref.toBytes().toStringUtf8())));
1✔
721

722
    TaskQueueId taskQueueId =
1✔
723
        new TaskQueueId(
724
            ctx.getNamespace(), data.endpoint.getSpec().getTarget().getWorker().getTaskQueue());
1✔
725
    Timestamp taskDeadline = Timestamps.add(ctx.currentTime(), data.requestTimeout);
1✔
726
    TestWorkflowStore.NexusTask task =
1✔
727
        new TestWorkflowStore.NexusTask(taskQueueId, pollResponse, taskDeadline);
728

729
    // Test server only supports worker targets, so just push directly to Nexus task queue without
730
    // invoking Nexus client.
731
    ctx.addNexusTask(task);
1✔
732
    ctx.onCommit(
1✔
733
        historySize -> {
734
          data.scheduledEventId = scheduledEventId;
1✔
735
          data.nexusTask = task;
1✔
736
          data.retryState = retryState;
1✔
737
        });
1✔
738
  }
1✔
739

740
  private static void startNexusOperation(
741
      RequestContext ctx,
742
      NexusOperationData data,
743
      StartOperationResponse.Async resp,
744
      long notUsed) {
745
    HistoryEvent.Builder event =
746
        HistoryEvent.newBuilder()
1✔
747
            .setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_STARTED)
1✔
748
            .setNexusOperationStartedEventAttributes(
1✔
749
                NexusOperationStartedEventAttributes.newBuilder()
1✔
750
                    .setOperationId(resp.getOperationId())
1✔
751
                    .setScheduledEventId(data.scheduledEventId)
1✔
752
                    .setRequestId(data.scheduledEvent.getRequestId()));
1✔
753

754
    for (Link l : resp.getLinksList()) {
1✔
NEW
755
      if (!l.getType()
×
NEW
756
          .equals(io.temporal.api.common.v1.Link.WorkflowEvent.getDescriptor().getFullName())) {
×
NEW
757
        continue;
×
758
      }
NEW
759
      event.addLinks(nexusLinkToWorkflowEvent(l));
×
NEW
760
    }
×
761

762
    ctx.addEvent(event.build());
1✔
763
    ctx.onCommit(historySize -> data.operationId = resp.getOperationId());
1✔
764
  }
1✔
765

766
  private static void completeNexusOperation(
767
      RequestContext ctx, NexusOperationData data, Payload result, long notUsed) {
768
    ctx.addEvent(
1✔
769
        HistoryEvent.newBuilder()
1✔
770
            .setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_COMPLETED)
1✔
771
            .setNexusOperationCompletedEventAttributes(
1✔
772
                NexusOperationCompletedEventAttributes.newBuilder()
1✔
773
                    .setRequestId(data.scheduledEvent.getRequestId())
1✔
774
                    .setScheduledEventId(data.scheduledEventId)
1✔
775
                    .setResult(result))
1✔
776
            .build());
1✔
777
  }
1✔
778

779
  private static void timeoutNexusOperation(
780
      RequestContext ctx, NexusOperationData data, TimeoutType timeoutType, long notUsed) {
781
    if (timeoutType != TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE) {
1✔
782
      throw new IllegalArgumentException(
×
783
          "Timeout type not supported for Nexus operations: " + timeoutType);
784
    }
785

786
    Failure failure =
787
        Failure.newBuilder()
1✔
788
            .setMessage("nexus operation completed unsuccessfully")
1✔
789
            .setNexusOperationExecutionFailureInfo(
1✔
790
                NexusOperationFailureInfo.newBuilder()
1✔
791
                    .setEndpoint(data.scheduledEvent.getEndpoint())
1✔
792
                    .setService(data.scheduledEvent.getService())
1✔
793
                    .setOperation(data.scheduledEvent.getOperation())
1✔
794
                    .setOperationId(data.operationId)
1✔
795
                    .setScheduledEventId(data.scheduledEventId))
1✔
796
            .setCause(
1✔
797
                Failure.newBuilder()
1✔
798
                    .setMessage("operation timed out")
1✔
799
                    .setTimeoutFailureInfo(
1✔
800
                        TimeoutFailureInfo.newBuilder().setTimeoutType(timeoutType)))
1✔
801
            .build();
1✔
802

803
    ctx.addEvent(
1✔
804
        HistoryEvent.newBuilder()
1✔
805
            .setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_TIMED_OUT)
1✔
806
            .setNexusOperationTimedOutEventAttributes(
1✔
807
                NexusOperationTimedOutEventAttributes.newBuilder()
1✔
808
                    .setRequestId(data.scheduledEvent.getRequestId())
1✔
809
                    .setScheduledEventId(data.scheduledEventId)
1✔
810
                    .setFailure(failure))
1✔
811
            .build());
1✔
812
  }
1✔
813

814
  private static State failNexusOperation(
815
      RequestContext ctx, NexusOperationData data, Failure failure, long notUsed) {
816
    RetryState retryState = attemptNexusOperationRetry(ctx, Optional.of(failure), data);
1✔
817
    if (retryState == RetryState.RETRY_STATE_IN_PROGRESS
1✔
818
        || retryState == RetryState.RETRY_STATE_TIMEOUT) {
819
      // RETRY_STATE_TIMEOUT indicates that the next attempt schedule time would exceed the
820
      // operation's schedule-to-close timeout, so do not fail the operation here and allow
821
      // it to be timed out by the timer set in
822
      // io.temporal.internal.testservice.TestWorkflowMutableStateImpl.timeoutNexusOperation
823
      return (Strings.isNullOrEmpty(data.operationId)) ? INITIATED : STARTED;
1✔
824
    }
825

826
    Failure wrapped =
827
        Failure.newBuilder()
1✔
828
            .setMessage("nexus operation completed unsuccessfully")
1✔
829
            .setNexusOperationExecutionFailureInfo(
1✔
830
                NexusOperationFailureInfo.newBuilder()
1✔
831
                    .setEndpoint(data.scheduledEvent.getEndpoint())
1✔
832
                    .setService(data.scheduledEvent.getService())
1✔
833
                    .setOperation(data.scheduledEvent.getOperation())
1✔
834
                    .setOperationId(data.operationId)
1✔
835
                    .setScheduledEventId(data.scheduledEventId))
1✔
836
            .setCause(failure)
1✔
837
            .build();
1✔
838

839
    ctx.addEvent(
1✔
840
        HistoryEvent.newBuilder()
1✔
841
            .setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_FAILED)
1✔
842
            .setNexusOperationFailedEventAttributes(
1✔
843
                NexusOperationFailedEventAttributes.newBuilder()
1✔
844
                    .setRequestId(data.scheduledEvent.getRequestId())
1✔
845
                    .setScheduledEventId(data.scheduledEventId)
1✔
846
                    .setFailure(wrapped))
1✔
847
            .build());
1✔
848
    return FAILED;
1✔
849
  }
850

851
  private static RetryState attemptNexusOperationRetry(
852
      RequestContext ctx, Optional<Failure> failure, NexusOperationData data) {
853
    Optional<ApplicationFailureInfo> info = failure.map(Failure::getApplicationFailureInfo);
1✔
854
    Optional<java.time.Duration> nextRetryDelay = Optional.empty();
1✔
855
    if (info.isPresent()) {
1✔
856
      if (info.get().getNonRetryable()) {
1✔
857
        return RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE;
1✔
858
      }
859
      if (info.get().hasNextRetryDelay()) {
1✔
860
        nextRetryDelay =
×
861
            Optional.of(ProtobufTimeUtils.toJavaDuration(info.get().getNextRetryDelay()));
×
862
      }
863
    }
864

865
    TestServiceRetryState nextAttempt = data.retryState.getNextAttempt(failure);
1✔
866
    TestServiceRetryState.BackoffInterval backoffInterval =
1✔
867
        data.retryState.getBackoffIntervalInSeconds(
1✔
868
            info.map(ApplicationFailureInfo::getType), ctx.currentTime(), nextRetryDelay);
1✔
869
    if (backoffInterval.getRetryState() == RetryState.RETRY_STATE_IN_PROGRESS) {
1✔
870
      data.nextBackoffInterval = ProtobufTimeUtils.toProtoDuration(backoffInterval.getInterval());
1✔
871
      PollNexusTaskQueueResponse.Builder task = data.nexusTask.getTask();
1✔
872
      ctx.onCommit(
1✔
873
          (historySize) -> {
874
            data.retryState = nextAttempt;
1✔
875
            data.isBackingOff = true;
1✔
876
            data.lastAttemptCompleteTime = ctx.currentTime();
1✔
877
            data.nextAttemptScheduleTime =
1✔
878
                Timestamps.add(ProtobufTimeUtils.getCurrentProtoTime(), data.nextBackoffInterval);
1✔
879
            task.setTaskToken(
1✔
880
                new NexusTaskToken(
881
                        ctx.getExecutionId(),
1✔
882
                        data.scheduledEventId,
883
                        nextAttempt.getAttempt(),
1✔
884
                        task.getRequest().hasCancelOperation())
1✔
885
                    .toBytes());
1✔
886
          });
1✔
887
    } else {
1✔
888
      data.nextBackoffInterval = Durations.ZERO;
1✔
889
    }
890
    return backoffInterval.getRetryState();
1✔
891
  }
892

893
  private static void requestCancelNexusOperation(
894
      RequestContext ctx,
895
      NexusOperationData data,
896
      RequestCancelNexusOperationCommandAttributes attr,
897
      long workflowTaskCompletedId) {
898
    ctx.addEvent(
1✔
899
        HistoryEvent.newBuilder()
1✔
900
            .setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED)
1✔
901
            .setNexusOperationCancelRequestedEventAttributes(
1✔
902
                NexusOperationCancelRequestedEventAttributes.newBuilder()
1✔
903
                    .setScheduledEventId(data.scheduledEventId)
1✔
904
                    .setWorkflowTaskCompletedEventId(workflowTaskCompletedId))
1✔
905
            .build());
1✔
906

907
    NexusTaskToken taskToken =
1✔
908
        new NexusTaskToken(ctx.getExecutionId(), data.scheduledEventId, data.getAttempt(), true);
1✔
909

910
    PollNexusTaskQueueResponse.Builder pollResponse =
911
        PollNexusTaskQueueResponse.newBuilder()
1✔
912
            .setTaskToken(taskToken.toBytes())
1✔
913
            .setRequest(
1✔
914
                io.temporal.api.nexus.v1.Request.newBuilder()
1✔
915
                    .setCancelOperation(
1✔
916
                        CancelOperationRequest.newBuilder()
1✔
917
                            .setOperationId(data.operationId)
1✔
918
                            .setOperation(data.scheduledEvent.getOperation())
1✔
919
                            .setService(data.scheduledEvent.getService())));
1✔
920

921
    TaskQueueId taskQueueId =
1✔
922
        new TaskQueueId(
923
            ctx.getNamespace(), data.endpoint.getSpec().getTarget().getWorker().getTaskQueue());
1✔
924
    Timestamp taskDeadline = Timestamps.add(ctx.currentTime(), data.requestTimeout);
1✔
925
    TestWorkflowStore.NexusTask cancelTask =
1✔
926
        new TestWorkflowStore.NexusTask(taskQueueId, pollResponse, taskDeadline);
927

928
    // Test server only supports worker targets, so just push directly to Nexus task queue without
929
    // invoking Nexus client.
930
    ctx.addNexusTask(cancelTask);
1✔
931
    ctx.onCommit(
1✔
932
        historySize -> {
933
          data.nexusTask = cancelTask;
1✔
934
          data.cancelRequestedTime = ctx.currentTime();
1✔
935
          data.isBackingOff = false;
1✔
936
        });
1✔
937
  }
1✔
938

939
  private static void reportNexusOperationCancellation(
940
      RequestContext ctx, NexusOperationData data, Failure failure, long notUsed) {
941
    Failure.Builder wrapped =
942
        Failure.newBuilder()
1✔
943
            .setMessage("nexus operation completed unsuccessfully")
1✔
944
            .setNexusOperationExecutionFailureInfo(
1✔
945
                NexusOperationFailureInfo.newBuilder()
1✔
946
                    .setEndpoint(data.scheduledEvent.getEndpoint())
1✔
947
                    .setService(data.scheduledEvent.getService())
1✔
948
                    .setOperation(data.scheduledEvent.getOperation())
1✔
949
                    .setOperationId(data.operationId)
1✔
950
                    .setScheduledEventId(data.scheduledEventId));
1✔
951
    if (failure != null) {
1✔
952
      wrapped.setCause(failure);
1✔
953
    }
954
    ctx.addEvent(
1✔
955
        HistoryEvent.newBuilder()
1✔
956
            .setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_CANCELED)
1✔
957
            .setNexusOperationCanceledEventAttributes(
1✔
958
                NexusOperationCanceledEventAttributes.newBuilder()
1✔
959
                    .setScheduledEventId(data.scheduledEventId)
1✔
960
                    .setRequestId(data.scheduledEvent.getRequestId())
1✔
961
                    .setFailure(wrapped))
1✔
962
            .build());
1✔
963
  }
1✔
964

965
  private static void timeoutChildWorkflow(
966
      RequestContext ctx, ChildWorkflowData data, RetryState retryState, long notUsed) {
967
    StartChildWorkflowExecutionInitiatedEventAttributes ie = data.initiatedEvent;
1✔
968
    ChildWorkflowExecutionTimedOutEventAttributes a =
969
        ChildWorkflowExecutionTimedOutEventAttributes.newBuilder()
1✔
970
            .setNamespace(ie.getNamespace())
1✔
971
            .setStartedEventId(data.startedEventId)
1✔
972
            .setWorkflowExecution(data.execution)
1✔
973
            .setWorkflowType(ie.getWorkflowType())
1✔
974
            .setRetryState(retryState)
1✔
975
            .setInitiatedEventId(data.initiatedEventId)
1✔
976
            .build();
1✔
977
    HistoryEvent event =
978
        HistoryEvent.newBuilder()
1✔
979
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TIMED_OUT)
1✔
980
            .setChildWorkflowExecutionTimedOutEventAttributes(a)
1✔
981
            .build();
1✔
982
    ctx.addEvent(event);
1✔
983
  }
1✔
984

985
  private static void startChildWorkflowFailed(
986
      RequestContext ctx,
987
      ChildWorkflowData data,
988
      StartChildWorkflowExecutionFailedEventAttributes a,
989
      long notUsed) {
990
    StartChildWorkflowExecutionFailedEventAttributes.Builder updatedAttr =
1✔
991
        a.toBuilder()
1✔
992
            .setInitiatedEventId(data.initiatedEventId)
1✔
993
            .setWorkflowType(data.initiatedEvent.getWorkflowType())
1✔
994
            .setWorkflowId(data.initiatedEvent.getWorkflowId());
1✔
995
    if (!data.initiatedEvent.getNamespace().isEmpty()) {
1✔
996
      updatedAttr.setNamespace(data.initiatedEvent.getNamespace());
1✔
997
    }
998
    HistoryEvent event =
999
        HistoryEvent.newBuilder()
1✔
1000
            .setEventType(EventType.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_FAILED)
1✔
1001
            .setStartChildWorkflowExecutionFailedEventAttributes(updatedAttr.build())
1✔
1002
            .build();
1✔
1003
    ctx.addEvent(event);
1✔
1004
  }
1✔
1005

1006
  private static void childWorkflowStarted(
1007
      RequestContext ctx,
1008
      ChildWorkflowData data,
1009
      ChildWorkflowExecutionStartedEventAttributes a,
1010
      long notUsed) {
1011
    ChildWorkflowExecutionStartedEventAttributes updatedAttr =
1✔
1012
        a.toBuilder().setInitiatedEventId(data.initiatedEventId).build();
1✔
1013
    HistoryEvent event =
1014
        HistoryEvent.newBuilder()
1✔
1015
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_STARTED)
1✔
1016
            .setChildWorkflowExecutionStartedEventAttributes(updatedAttr)
1✔
1017
            .build();
1✔
1018
    long startedEventId = ctx.addEvent(event);
1✔
1019
    ctx.onCommit(
1✔
1020
        (historySize) -> {
1021
          data.startedEventId = startedEventId;
1✔
1022
          data.execution = updatedAttr.getWorkflowExecution();
1✔
1023
        });
1✔
1024
  }
1✔
1025

1026
  private static void childWorkflowCompleted(
1027
      RequestContext ctx,
1028
      ChildWorkflowData data,
1029
      ChildWorkflowExecutionCompletedEventAttributes a,
1030
      long notUsed) {
1031
    ChildWorkflowExecutionCompletedEventAttributes updatedAttr =
1✔
1032
        a.toBuilder()
1✔
1033
            .setInitiatedEventId(data.initiatedEventId)
1✔
1034
            .setStartedEventId(data.startedEventId)
1✔
1035
            .build();
1✔
1036
    HistoryEvent event =
1037
        HistoryEvent.newBuilder()
1✔
1038
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_COMPLETED)
1✔
1039
            .setChildWorkflowExecutionCompletedEventAttributes(updatedAttr)
1✔
1040
            .build();
1✔
1041
    ctx.addEvent(event);
1✔
1042
  }
1✔
1043

1044
  private static void childWorkflowFailed(
1045
      RequestContext ctx,
1046
      ChildWorkflowData data,
1047
      ChildWorkflowExecutionFailedEventAttributes a,
1048
      long notUsed) {
1049
    ChildWorkflowExecutionFailedEventAttributes.Builder updatedAttr =
1✔
1050
        a.toBuilder()
1✔
1051
            .setInitiatedEventId(data.initiatedEventId)
1✔
1052
            .setStartedEventId(data.startedEventId)
1✔
1053
            .setWorkflowExecution(data.execution)
1✔
1054
            .setWorkflowType(data.initiatedEvent.getWorkflowType());
1✔
1055
    if (!data.initiatedEvent.getNamespace().isEmpty()) {
1✔
1056
      updatedAttr.setNamespace(data.initiatedEvent.getNamespace());
1✔
1057
    }
1058
    HistoryEvent event =
1059
        HistoryEvent.newBuilder()
1✔
1060
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_FAILED)
1✔
1061
            .setChildWorkflowExecutionFailedEventAttributes(updatedAttr.build())
1✔
1062
            .build();
1✔
1063
    ctx.addEvent(event);
1✔
1064
  }
1✔
1065

1066
  private static void childWorkflowCanceled(
1067
      RequestContext ctx,
1068
      ChildWorkflowData data,
1069
      ChildWorkflowExecutionCanceledEventAttributes a,
1070
      long notUsed) {
1071
    ChildWorkflowExecutionCanceledEventAttributes updatedAttr =
1✔
1072
        a.toBuilder()
1✔
1073
            .setInitiatedEventId(data.initiatedEventId)
1✔
1074
            .setStartedEventId(data.startedEventId)
1✔
1075
            .build();
1✔
1076
    HistoryEvent event =
1077
        HistoryEvent.newBuilder()
1✔
1078
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_CANCELED)
1✔
1079
            .setChildWorkflowExecutionCanceledEventAttributes(updatedAttr)
1✔
1080
            .build();
1✔
1081
    ctx.addEvent(event);
1✔
1082
  }
1✔
1083

1084
  private static void initiateChildWorkflow(
1085
      RequestContext ctx,
1086
      ChildWorkflowData data,
1087
      StartChildWorkflowExecutionCommandAttributes d,
1088
      long workflowTaskCompletedEventId) {
1089
    StartChildWorkflowExecutionInitiatedEventAttributes.Builder a =
1090
        StartChildWorkflowExecutionInitiatedEventAttributes.newBuilder()
1✔
1091
            .setControl(d.getControl())
1✔
1092
            .setInput(d.getInput())
1✔
1093
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1✔
1094
            .setNamespace(d.getNamespace().isEmpty() ? ctx.getNamespace() : d.getNamespace())
1✔
1095
            .setWorkflowExecutionTimeout(d.getWorkflowExecutionTimeout())
1✔
1096
            .setWorkflowRunTimeout(d.getWorkflowRunTimeout())
1✔
1097
            .setWorkflowTaskTimeout(d.getWorkflowTaskTimeout())
1✔
1098
            .setTaskQueue(d.getTaskQueue())
1✔
1099
            .setWorkflowId(d.getWorkflowId())
1✔
1100
            .setWorkflowIdReusePolicy(d.getWorkflowIdReusePolicy())
1✔
1101
            .setWorkflowType(d.getWorkflowType())
1✔
1102
            .setCronSchedule(d.getCronSchedule())
1✔
1103
            .setParentClosePolicy(d.getParentClosePolicy());
1✔
1104
    if (d.hasHeader()) {
1✔
1105
      a.setHeader(d.getHeader());
1✔
1106
    }
1107
    if (d.hasMemo()) {
1✔
1108
      a.setMemo(d.getMemo());
1✔
1109
    }
1110
    if (d.hasRetryPolicy()) {
1✔
1111
      a.setRetryPolicy(d.getRetryPolicy());
1✔
1112
    }
1113
    HistoryEvent event =
1114
        HistoryEvent.newBuilder()
1✔
1115
            .setEventType(EventType.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED)
1✔
1116
            .setStartChildWorkflowExecutionInitiatedEventAttributes(a)
1✔
1117
            .build();
1✔
1118
    long initiatedEventId = ctx.addEvent(event);
1✔
1119
    ctx.onCommit(
1✔
1120
        (historySize) -> {
1121
          data.initiatedEventId = initiatedEventId;
1✔
1122
          data.initiatedEvent = a.build();
1✔
1123
          StartWorkflowExecutionRequest.Builder startChild =
1124
              StartWorkflowExecutionRequest.newBuilder()
1✔
1125
                  .setRequestId(UUID.randomUUID().toString())
1✔
1126
                  .setNamespace(d.getNamespace().isEmpty() ? ctx.getNamespace() : d.getNamespace())
1✔
1127
                  .setWorkflowExecutionTimeout(d.getWorkflowExecutionTimeout())
1✔
1128
                  .setWorkflowRunTimeout(d.getWorkflowRunTimeout())
1✔
1129
                  .setWorkflowTaskTimeout(d.getWorkflowTaskTimeout())
1✔
1130
                  .setTaskQueue(d.getTaskQueue())
1✔
1131
                  .setWorkflowId(d.getWorkflowId())
1✔
1132
                  .setWorkflowIdReusePolicy(d.getWorkflowIdReusePolicy())
1✔
1133
                  .setWorkflowType(d.getWorkflowType())
1✔
1134
                  .setCronSchedule(d.getCronSchedule());
1✔
1135
          if (d.hasHeader()) {
1✔
1136
            startChild.setHeader(d.getHeader());
1✔
1137
          }
1138
          if (d.hasSearchAttributes()) {
1✔
1139
            startChild.setSearchAttributes(d.getSearchAttributes());
1✔
1140
          }
1141
          if (d.hasMemo()) {
1✔
1142
            startChild.setMemo(d.getMemo());
1✔
1143
          }
1144
          if (d.hasRetryPolicy()) {
1✔
1145
            startChild.setRetryPolicy(d.getRetryPolicy());
1✔
1146
          }
1147
          if (d.hasInput()) {
1✔
1148
            startChild.setInput(d.getInput());
1✔
1149
          }
1150
          addStartChildTask(ctx, data, initiatedEventId, startChild.build());
1✔
1151
        });
1✔
1152
  }
1✔
1153

1154
  private static void addStartChildTask(
1155
      RequestContext ctx,
1156
      ChildWorkflowData data,
1157
      long initiatedEventId,
1158
      StartWorkflowExecutionRequest startChild) {
1159
    ForkJoinPool.commonPool()
1✔
1160
        .execute(
1✔
1161
            () -> {
1162
              try {
1163
                data.service.startWorkflowExecutionImpl(
1✔
1164
                    startChild,
1165
                    java.time.Duration.ZERO,
1166
                    Optional.of(ctx.getWorkflowMutableState()),
1✔
1167
                    OptionalLong.of(data.initiatedEventId),
1✔
1168
                    null);
1169
              } catch (StatusRuntimeException e) {
1✔
1170
                if (e.getStatus().getCode() == Status.Code.ALREADY_EXISTS) {
1✔
1171
                  StartChildWorkflowExecutionFailedEventAttributes failRequest =
1172
                      StartChildWorkflowExecutionFailedEventAttributes.newBuilder()
1✔
1173
                          .setInitiatedEventId(initiatedEventId)
1✔
1174
                          .setCause(
1✔
1175
                              StartChildWorkflowExecutionFailedCause
1176
                                  .START_CHILD_WORKFLOW_EXECUTION_FAILED_CAUSE_WORKFLOW_ALREADY_EXISTS)
1177
                          .build();
1✔
1178
                  try {
1179
                    ctx.getWorkflowMutableState()
1✔
1180
                        .failStartChildWorkflow(data.initiatedEvent.getWorkflowId(), failRequest);
1✔
1181
                  } catch (Throwable ee) {
×
1182
                    log.error("Unexpected failure inserting failStart for a child workflow", ee);
×
1183
                  }
1✔
1184
                } else {
1✔
1185
                  log.error("Unexpected failure starting a child workflow", e);
×
1186
                }
1187
              } catch (Exception e) {
×
1188
                log.error("Unexpected failure starting a child workflow", e);
×
1189
              }
1✔
1190
            });
1✔
1191
  }
1✔
1192

1193
  private static void startWorkflow(
1194
      RequestContext ctx, WorkflowData data, StartWorkflowExecutionRequest request, long notUsed) {
1195
    if (Durations.compare(request.getWorkflowExecutionTimeout(), Durations.ZERO) < 0) {
1✔
1196
      throw Status.INVALID_ARGUMENT
×
1197
          .withDescription("negative workflowExecution timeout")
×
1198
          .asRuntimeException();
×
1199
    }
1200
    if (Durations.compare(request.getWorkflowRunTimeout(), Durations.ZERO) < 0) {
1✔
1201
      throw Status.INVALID_ARGUMENT
×
1202
          .withDescription("negative workflowRun timeout")
×
1203
          .asRuntimeException();
×
1204
    }
1205
    if (Durations.compare(request.getWorkflowTaskTimeout(), Durations.ZERO) < 0) {
1✔
1206
      throw Status.INVALID_ARGUMENT
×
1207
          .withDescription("negative workflowTaskTimeoutSeconds")
×
1208
          .asRuntimeException();
×
1209
    }
1210
    if (request.hasWorkflowStartDelay() && !request.getCronSchedule().trim().isEmpty()) {
1✔
1211
      throw Status.INVALID_ARGUMENT
×
1212
          .withDescription("CronSchedule and WorkflowStartDelay may not be used together.")
×
1213
          .asRuntimeException();
×
1214
    }
1215
    if (request.getCompletionCallbacksCount() > 0
1✔
1216
        && !request.getCompletionCallbacksList().stream().allMatch(Callback::hasNexus)) {
1✔
1217
      throw Status.INVALID_ARGUMENT
×
1218
          .withDescription("non-Nexus completion callbacks are not supported.")
×
1219
          .asRuntimeException();
×
1220
    }
1221

1222
    WorkflowExecutionStartedEventAttributes.Builder a =
1223
        WorkflowExecutionStartedEventAttributes.newBuilder()
1✔
1224
            .setWorkflowType(request.getWorkflowType())
1✔
1225
            .setWorkflowRunTimeout(request.getWorkflowRunTimeout())
1✔
1226
            .setWorkflowTaskTimeout(request.getWorkflowTaskTimeout())
1✔
1227
            .setWorkflowExecutionTimeout(request.getWorkflowExecutionTimeout())
1✔
1228
            .setIdentity(request.getIdentity())
1✔
1229
            .setInput(request.getInput())
1✔
1230
            .setTaskQueue(request.getTaskQueue())
1✔
1231
            .addAllCompletionCallbacks(request.getCompletionCallbacksList())
1✔
1232
            .setAttempt(1);
1✔
1233
    if (request.hasRetryPolicy()) {
1✔
1234
      a.setRetryPolicy(request.getRetryPolicy());
1✔
1235
    }
1236
    data.retryState.ifPresent(
1✔
1237
        testServiceRetryState -> a.setAttempt(testServiceRetryState.getAttempt()));
1✔
1238
    a.setFirstExecutionRunId(data.firstExecutionRunId);
1✔
1239
    a.setOriginalExecutionRunId(data.originalExecutionRunId);
1✔
1240
    data.continuedExecutionRunId.ifPresent(a::setContinuedExecutionRunId);
1✔
1241
    if (data.lastCompletionResult != null) {
1✔
1242
      a.setLastCompletionResult(data.lastCompletionResult);
1✔
1243
    }
1244
    if (request.hasWorkflowStartDelay()) {
1✔
1245
      a.setFirstWorkflowTaskBackoff(request.getWorkflowStartDelay());
1✔
1246
    }
1247
    data.lastFailure.ifPresent(a::setContinuedFailure);
1✔
1248
    if (request.hasMemo()) {
1✔
1249
      a.setMemo(request.getMemo());
1✔
1250
    }
1251
    if (request.hasSearchAttributes()) {
1✔
1252
      a.setSearchAttributes((request.getSearchAttributes()));
1✔
1253
    }
1254
    if (request.hasHeader()) {
1✔
1255
      a.setHeader(request.getHeader());
1✔
1256
    }
1257
    String cronSchedule = request.getCronSchedule();
1✔
1258
    if (!cronSchedule.trim().isEmpty()) {
1✔
1259
      try {
1260
        CronUtils.parseCron(cronSchedule);
1✔
1261
        a.setCronSchedule(cronSchedule);
1✔
1262
      } catch (Exception e) {
×
1263
        throw Status.INVALID_ARGUMENT
×
1264
            .withDescription("Invalid cron expression \"" + cronSchedule + "\": " + e.getMessage())
×
1265
            .withCause(e)
×
1266
            .asRuntimeException();
×
1267
      }
1✔
1268
    }
1269
    Optional<TestWorkflowMutableState> parent = ctx.getWorkflowMutableState().getParent();
1✔
1270
    if (parent.isPresent()) {
1✔
1271
      ExecutionId parentExecutionId = parent.get().getExecutionId();
1✔
1272
      a.setParentWorkflowNamespace(parentExecutionId.getNamespace());
1✔
1273
      a.setParentWorkflowExecution(parentExecutionId.getExecution());
1✔
1274
    }
1275
    HistoryEvent.Builder event =
1276
        HistoryEvent.newBuilder()
1✔
1277
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED)
1✔
1278
            .setWorkflowExecutionStartedEventAttributes(a);
1✔
1279
    if (request.getLinksCount() > 0) {
1✔
1280
      event.addAllLinks(request.getLinksList());
1✔
1281
    }
1282
    ctx.addEvent(event.build());
1✔
1283
  }
1✔
1284

1285
  private static void completeWorkflow(
1286
      RequestContext ctx,
1287
      WorkflowData data,
1288
      CompleteWorkflowExecutionCommandAttributes d,
1289
      long workflowTaskCompletedEventId) {
1290
    WorkflowExecutionCompletedEventAttributes.Builder a =
1291
        WorkflowExecutionCompletedEventAttributes.newBuilder()
1✔
1292
            .setResult(d.getResult())
1✔
1293
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
1294
    HistoryEvent event =
1295
        HistoryEvent.newBuilder()
1✔
1296
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED)
1✔
1297
            .setWorkflowExecutionCompletedEventAttributes(a)
1✔
1298
            .build();
1✔
1299
    ctx.addEvent(event);
1✔
1300
  }
1✔
1301

1302
  private static void continueAsNewWorkflow(
1303
      RequestContext ctx,
1304
      WorkflowData data,
1305
      ContinueAsNewWorkflowExecutionCommandAttributes d,
1306
      long workflowTaskCompletedEventId) {
1307
    StartWorkflowExecutionRequest sr = ctx.getWorkflowMutableState().getStartRequest();
1✔
1308
    WorkflowExecutionContinuedAsNewEventAttributes.Builder a =
1309
        WorkflowExecutionContinuedAsNewEventAttributes.newBuilder();
1✔
1310
    a.setInput(d.getInput());
1✔
1311
    if (d.hasHeader()) {
1✔
1312
      a.setHeader(d.getHeader());
1✔
1313
    }
1314
    if (Durations.compare(d.getWorkflowRunTimeout(), Durations.ZERO) > 0) {
1✔
1315
      a.setWorkflowRunTimeout(d.getWorkflowRunTimeout());
1✔
1316
    } else {
1317
      a.setWorkflowRunTimeout(sr.getWorkflowRunTimeout());
1✔
1318
    }
1319
    if (d.hasTaskQueue() && !d.getTaskQueue().getName().isEmpty()) {
1✔
1320
      a.setTaskQueue(d.getTaskQueue());
1✔
1321
    } else {
1322
      a.setTaskQueue(sr.getTaskQueue());
1✔
1323
    }
1324
    if (d.hasWorkflowType() && !d.getWorkflowType().getName().isEmpty()) {
1✔
1325
      a.setWorkflowType(d.getWorkflowType());
1✔
1326
    } else {
1327
      a.setWorkflowType(sr.getWorkflowType());
1✔
1328
    }
1329
    if (Durations.compare(d.getWorkflowTaskTimeout(), Durations.ZERO) > 0) {
1✔
1330
      a.setWorkflowTaskTimeout(d.getWorkflowTaskTimeout());
1✔
1331
    } else {
1332
      a.setWorkflowTaskTimeout(sr.getWorkflowTaskTimeout());
1✔
1333
    }
1334
    a.setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
1335
    a.setBackoffStartInterval(d.getBackoffStartInterval());
1✔
1336
    if (d.hasLastCompletionResult()) {
1✔
1337
      a.setLastCompletionResult(d.getLastCompletionResult());
1✔
1338
    }
1339
    if (d.hasFailure()) {
1✔
1340
      a.setFailure(d.getFailure());
1✔
1341
    }
1342
    a.setNewExecutionRunId(UUID.randomUUID().toString());
1✔
1343
    HistoryEvent event =
1344
        HistoryEvent.newBuilder()
1✔
1345
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW)
1✔
1346
            .setWorkflowExecutionContinuedAsNewEventAttributes(a)
1✔
1347
            .build();
1✔
1348
    ctx.addEvent(event);
1✔
1349
  }
1✔
1350

1351
  private static void failWorkflow(
1352
      RequestContext ctx,
1353
      WorkflowData data,
1354
      FailWorkflowExecutionCommandAttributes d,
1355
      long workflowTaskCompletedEventId) {
1356
    WorkflowExecutionFailedEventAttributes.Builder a =
1357
        WorkflowExecutionFailedEventAttributes.newBuilder()
1✔
1358
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
1359
    if (d.hasFailure()) {
1✔
1360
      a.setFailure(d.getFailure());
1✔
1361
    }
1362
    HistoryEvent event =
1363
        HistoryEvent.newBuilder()
1✔
1364
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_FAILED)
1✔
1365
            .setWorkflowExecutionFailedEventAttributes(a)
1✔
1366
            .build();
1✔
1367
    ctx.addEvent(event);
1✔
1368
  }
1✔
1369

1370
  private static void timeoutWorkflow(
1371
      RequestContext ctx, WorkflowData data, RetryState retryState, long notUsed) {
1372
    WorkflowExecutionTimedOutEventAttributes.Builder a =
1373
        WorkflowExecutionTimedOutEventAttributes.newBuilder().setRetryState(retryState);
1✔
1374
    HistoryEvent event =
1375
        HistoryEvent.newBuilder()
1✔
1376
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT)
1✔
1377
            .setWorkflowExecutionTimedOutEventAttributes(a)
1✔
1378
            .build();
1✔
1379
    ctx.addEvent(event);
1✔
1380
  }
1✔
1381

1382
  private static void cancelWorkflow(
1383
      RequestContext ctx,
1384
      WorkflowData data,
1385
      CancelWorkflowExecutionCommandAttributes d,
1386
      long workflowTaskCompletedEventId) {
1387
    WorkflowExecutionCanceledEventAttributes.Builder a =
1388
        WorkflowExecutionCanceledEventAttributes.newBuilder()
1✔
1389
            .setDetails(d.getDetails())
1✔
1390
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
1391
    HistoryEvent event =
1392
        HistoryEvent.newBuilder()
1✔
1393
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED)
1✔
1394
            .setWorkflowExecutionCanceledEventAttributes(a)
1✔
1395
            .build();
1✔
1396
    ctx.addEvent(event);
1✔
1397
  }
1✔
1398

1399
  private static void terminateWorkflow(
1400
      RequestContext ctx,
1401
      WorkflowData data,
1402
      TerminateWorkflowExecutionRequest d,
1403
      long workflowTaskCompletedEventId) {
1404
    WorkflowExecutionTerminatedEventAttributes.Builder a =
1405
        WorkflowExecutionTerminatedEventAttributes.newBuilder()
1✔
1406
            .setDetails(d.getDetails())
1✔
1407
            .setIdentity(d.getIdentity())
1✔
1408
            .setReason(d.getReason());
1✔
1409
    HistoryEvent event =
1410
        HistoryEvent.newBuilder()
1✔
1411
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED)
1✔
1412
            .setWorkflowExecutionTerminatedEventAttributes(a)
1✔
1413
            .build();
1✔
1414
    ctx.addEvent(event);
1✔
1415
  }
1✔
1416

1417
  private static void requestWorkflowCancellation(
1418
      RequestContext ctx,
1419
      WorkflowData data,
1420
      RequestCancelWorkflowExecutionRequest cancelRequest,
1421
      long notUsed) {
1422
    WorkflowExecutionCancelRequestedEventAttributes.Builder a =
1423
        WorkflowExecutionCancelRequestedEventAttributes.newBuilder()
1✔
1424
            .setIdentity(cancelRequest.getIdentity());
1✔
1425
    HistoryEvent cancelRequested =
1426
        HistoryEvent.newBuilder()
1✔
1427
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED)
1✔
1428
            .setWorkflowExecutionCancelRequestedEventAttributes(a)
1✔
1429
            .build();
1✔
1430
    ctx.addEvent(cancelRequested);
1✔
1431
  }
1✔
1432

1433
  private static void scheduleActivityTask(
1434
      RequestContext ctx,
1435
      ActivityTaskData data,
1436
      ScheduleActivityTaskCommandAttributes d,
1437
      long workflowTaskCompletedEventId) {
1438
    RetryPolicy retryPolicy = ensureDefaultFieldsForActivityRetryPolicy(d.getRetryPolicy());
1✔
1439
    Duration expirationInterval = d.getScheduleToCloseTimeout();
1✔
1440
    Timestamp expirationTime = Timestamps.add(data.store.currentTime(), expirationInterval);
1✔
1441
    TestServiceRetryState retryState = new TestServiceRetryState(retryPolicy, expirationTime);
1✔
1442

1443
    ActivityTaskScheduledEventAttributes.Builder a =
1444
        ActivityTaskScheduledEventAttributes.newBuilder()
1✔
1445
            .setInput(d.getInput())
1✔
1446
            .setActivityId(d.getActivityId())
1✔
1447
            .setActivityType(d.getActivityType())
1✔
1448
            .setHeartbeatTimeout(d.getHeartbeatTimeout())
1✔
1449
            .setRetryPolicy(retryPolicy)
1✔
1450
            .setScheduleToCloseTimeout(d.getScheduleToCloseTimeout())
1✔
1451
            .setScheduleToStartTimeout(d.getScheduleToStartTimeout())
1✔
1452
            .setStartToCloseTimeout(d.getStartToCloseTimeout())
1✔
1453
            .setTaskQueue(d.getTaskQueue())
1✔
1454
            .setHeader(d.getHeader())
1✔
1455
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
1456

1457
    // Cannot set it in onCommit as it is used in the processScheduleActivityTask
1458
    data.scheduledEvent = a.build();
1✔
1459
    HistoryEvent event =
1460
        HistoryEvent.newBuilder()
1✔
1461
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED)
1✔
1462
            .setActivityTaskScheduledEventAttributes(a)
1✔
1463
            .build();
1✔
1464
    long scheduledEventId = ctx.addEvent(event);
1✔
1465

1466
    PollActivityTaskQueueResponse.Builder taskResponse =
1467
        PollActivityTaskQueueResponse.newBuilder()
1✔
1468
            .setWorkflowNamespace(ctx.getNamespace())
1✔
1469
            .setWorkflowType(data.startWorkflowExecutionRequest.getWorkflowType())
1✔
1470
            .setActivityType(d.getActivityType())
1✔
1471
            .setWorkflowExecution(ctx.getExecution())
1✔
1472
            .setActivityId(d.getActivityId())
1✔
1473
            .setInput(d.getInput())
1✔
1474
            .setHeartbeatTimeout(d.getHeartbeatTimeout())
1✔
1475
            .setScheduleToCloseTimeout(d.getScheduleToCloseTimeout())
1✔
1476
            .setStartToCloseTimeout(d.getStartToCloseTimeout())
1✔
1477
            .setScheduledTime(ctx.currentTime())
1✔
1478
            .setCurrentAttemptScheduledTime(ctx.currentTime())
1✔
1479
            .setHeader(d.getHeader())
1✔
1480
            .setAttempt(1);
1✔
1481

1482
    TaskQueueId taskQueueId = new TaskQueueId(ctx.getNamespace(), d.getTaskQueue().getName());
1✔
1483
    ActivityTask activityTask = new ActivityTask(taskQueueId, taskResponse);
1✔
1484
    ctx.addActivityTask(activityTask);
1✔
1485
    ctx.onCommit(
1✔
1486
        (historySize) -> {
1487
          data.scheduledEventId = scheduledEventId;
1✔
1488
          data.activityTask = activityTask;
1✔
1489
          data.retryState = retryState;
1✔
1490
        });
1✔
1491
  }
1✔
1492

1493
  private static void requestActivityCancellation(
1494
      RequestContext ctx,
1495
      ActivityTaskData data,
1496
      RequestCancelActivityTaskCommandAttributes d,
1497
      long workflowTaskCompletedEventId) {
1498
    ActivityTaskCancelRequestedEventAttributes.Builder a =
1499
        ActivityTaskCancelRequestedEventAttributes.newBuilder()
1✔
1500
            .setScheduledEventId(d.getScheduledEventId())
1✔
1501
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
1502
    HistoryEvent event =
1503
        HistoryEvent.newBuilder()
1✔
1504
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_CANCEL_REQUESTED)
1✔
1505
            .setActivityTaskCancelRequestedEventAttributes(a)
1✔
1506
            .build();
1✔
1507
    ctx.addEvent(event);
1✔
1508
  }
1✔
1509

1510
  private static void scheduleWorkflowTask(
1511
      RequestContext ctx, WorkflowTaskData data, Object notUsedRequest, long notUsed) {
1512
    StartWorkflowExecutionRequest request = data.startRequest;
1✔
1513
    long scheduledEventId;
1514
    TaskQueue taskQueue = request.getTaskQueue();
1✔
1515
    WorkflowTaskScheduledEventAttributes a =
1516
        WorkflowTaskScheduledEventAttributes.newBuilder()
1✔
1517
            .setStartToCloseTimeout(request.getWorkflowTaskTimeout())
1✔
1518
            .setTaskQueue(taskQueue)
1✔
1519
            .setAttempt(++data.attempt)
1✔
1520
            .build();
1✔
1521
    HistoryEvent event =
1522
        HistoryEvent.newBuilder()
1✔
1523
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED)
1✔
1524
            .setWorkflowTaskScheduledEventAttributes(a)
1✔
1525
            .build();
1✔
1526
    scheduledEventId = ctx.addEvent(event);
1✔
1527
    PollWorkflowTaskQueueResponse.Builder workflowTaskResponse =
1528
        PollWorkflowTaskQueueResponse.newBuilder();
1✔
1529
    workflowTaskResponse.setWorkflowExecution(ctx.getExecution());
1✔
1530
    workflowTaskResponse.setWorkflowType(request.getWorkflowType());
1✔
1531
    workflowTaskResponse.setAttempt(data.attempt);
1✔
1532
    workflowTaskResponse.setScheduledTime(ctx.currentTime());
1✔
1533
    workflowTaskResponse.setWorkflowExecutionTaskQueue(taskQueue);
1✔
1534
    TaskQueueId taskQueueId = new TaskQueueId(ctx.getNamespace(), taskQueue.getName());
1✔
1535
    WorkflowTask workflowTask = new WorkflowTask(taskQueueId, workflowTaskResponse);
1✔
1536
    ctx.setWorkflowTaskForMatching(workflowTask);
1✔
1537
    ctx.onCommit(
1✔
1538
        (historySize) -> {
1539
          data.scheduledEventId = scheduledEventId;
1✔
1540
          data.workflowTask = workflowTaskResponse;
1✔
1541
          // Move buffered update request to new workflow task
1542
          data.updateRequest.putAll(data.updateRequestBuffer);
1✔
1543
          data.updateRequestBuffer.clear();
1✔
1544
        });
1✔
1545
  }
1✔
1546

1547
  private static void convertQueryWorkflowTaskToReal(
1548
      RequestContext ctx, WorkflowTaskData data, Object notUsedRequest, long notUsed) {
1549
    StartWorkflowExecutionRequest request = data.startRequest;
×
1550
    WorkflowTaskScheduledEventAttributes a =
1551
        WorkflowTaskScheduledEventAttributes.newBuilder()
×
1552
            .setStartToCloseTimeout(request.getWorkflowTaskTimeout())
×
1553
            .setTaskQueue(request.getTaskQueue())
×
1554
            .setAttempt(data.attempt)
×
1555
            .build();
×
1556
    HistoryEvent event =
1557
        HistoryEvent.newBuilder()
×
1558
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED)
×
1559
            .setWorkflowTaskScheduledEventAttributes(a)
×
1560
            .build();
×
1561
    long scheduledEventId = ctx.addEvent(event);
×
1562
    ctx.onCommit((historySize) -> data.scheduledEventId = scheduledEventId);
×
1563
  }
×
1564

1565
  private static void scheduleQueryWorkflowTask(
1566
      RequestContext ctx,
1567
      WorkflowTaskData data,
1568
      TestWorkflowMutableStateImpl.ConsistentQuery query,
1569
      long notUsed) {
1570
    ctx.lockTimer("scheduleQueryWorkflowTask");
×
1571
    StartWorkflowExecutionRequest request = data.startRequest;
×
1572
    PollWorkflowTaskQueueResponse.Builder workflowTaskResponse =
1573
        PollWorkflowTaskQueueResponse.newBuilder();
×
1574
    StickyExecutionAttributes stickyAttributes =
×
1575
        ctx.getWorkflowMutableState().getStickyExecutionAttributes();
×
1576
    String taskQueue =
1577
        stickyAttributes == null
×
1578
            ? request.getTaskQueue().getName()
×
1579
            : stickyAttributes.getWorkerTaskQueue().getName();
×
1580
    workflowTaskResponse.setWorkflowExecution(ctx.getExecution());
×
1581
    workflowTaskResponse.setWorkflowType(request.getWorkflowType());
×
1582
    workflowTaskResponse.setAttempt(++data.attempt);
×
1583
    TaskQueueId taskQueueId = new TaskQueueId(ctx.getNamespace(), taskQueue);
×
1584
    WorkflowTask workflowTask = new WorkflowTask(taskQueueId, workflowTaskResponse);
×
1585
    ctx.setWorkflowTaskForMatching(workflowTask);
×
1586
    ctx.onCommit(
×
1587
        (historySize) -> {
1588
          if (data.lastSuccessfulStartedEventId > 0) {
×
1589
            workflowTaskResponse.setPreviousStartedEventId(data.lastSuccessfulStartedEventId);
×
1590
          }
1591
          data.scheduledEventId = NO_EVENT_ID;
×
1592
          data.workflowTask = workflowTaskResponse;
×
1593
          if (query != null) {
×
1594
            data.consistentQueryRequests.put(query.getKey(), query);
×
1595
          }
1596
        });
×
1597
  }
×
1598

1599
  private static void queryWhileScheduled(
1600
      RequestContext ctx,
1601
      WorkflowTaskData data,
1602
      TestWorkflowMutableStateImpl.ConsistentQuery query,
1603
      long notUsed) {
1604
    data.consistentQueryRequests.put(query.getKey(), query);
1✔
1605
  }
1✔
1606

1607
  private static void bufferQuery(
1608
      RequestContext ctx,
1609
      WorkflowTaskData data,
1610
      TestWorkflowMutableStateImpl.ConsistentQuery query,
1611
      long notUsed) {
1612
    data.queryBuffer.put(query.getKey(), query);
1✔
1613
  }
1✔
1614

1615
  private static void bufferUpdate(
1616
      RequestContext ctx, WorkflowTaskData data, UpdateWorkflowExecution update, long notUsed) {
1617
    if (data.getUpdateRequest(update.getId()).isPresent()) {
1✔
1618
      throw Status.INTERNAL
×
1619
          .withDescription("Update ID already exists: " + update.getId())
×
1620
          .asRuntimeException();
×
1621
    }
1622
    data.updateRequestBuffer.put(update.getId(), update);
1✔
1623
  }
1✔
1624

1625
  private static void addUpdate(
1626
      RequestContext ctx, WorkflowTaskData data, UpdateWorkflowExecution update, long notUsed) {
1627
    if (data.getUpdateRequest(update.getId()).isPresent()) {
1✔
1628
      throw Status.INTERNAL
×
1629
          .withDescription("Update ID already exists: " + update.getId())
×
1630
          .asRuntimeException();
×
1631
    }
1632
    data.updateRequest.put(update.getId(), update);
1✔
1633
  }
1✔
1634

1635
  private static void startWorkflowTask(
1636
      RequestContext ctx,
1637
      WorkflowTaskData data,
1638
      PollWorkflowTaskQueueRequest request,
1639
      long notUsed) {
1640
    WorkflowTaskStartedEventAttributes a =
1641
        WorkflowTaskStartedEventAttributes.newBuilder()
1✔
1642
            .setIdentity(request.getIdentity())
1✔
1643
            .setScheduledEventId(data.scheduledEventId)
1✔
1644
            .build();
1✔
1645
    HistoryEvent event =
1646
        HistoryEvent.newBuilder()
1✔
1647
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_STARTED)
1✔
1648
            .setWorkflowTaskStartedEventAttributes(a)
1✔
1649
            .build();
1✔
1650
    long startedEventId = ctx.addEvent(event);
1✔
1651
    startWorkflowTaskImpl(ctx, data, request, startedEventId, false);
1✔
1652
  }
1✔
1653

1654
  private static void startQueryOnlyWorkflowTask(
1655
      RequestContext ctx,
1656
      WorkflowTaskData data,
1657
      PollWorkflowTaskQueueRequest request,
1658
      long notUsed) {
1659
    startWorkflowTaskImpl(ctx, data, request, NO_EVENT_ID, true);
×
1660
  }
×
1661

1662
  private static void startWorkflowTaskImpl(
1663
      RequestContext ctx,
1664
      WorkflowTaskData data,
1665
      PollWorkflowTaskQueueRequest request,
1666
      long startedEventId,
1667
      boolean queryOnly) {
1668
    ctx.onCommit(
1✔
1669
        (historySize) -> {
1670
          PollWorkflowTaskQueueResponse.Builder task = data.workflowTask;
1✔
1671
          task.setStartedEventId(data.scheduledEventId + 1);
1✔
1672
          WorkflowTaskToken taskToken = new WorkflowTaskToken(ctx.getExecutionId(), historySize);
1✔
1673
          task.setTaskToken(taskToken.toBytes());
1✔
1674
          GetWorkflowExecutionHistoryRequest getRequest =
1675
              GetWorkflowExecutionHistoryRequest.newBuilder()
1✔
1676
                  .setNamespace(request.getNamespace())
1✔
1677
                  .setExecution(ctx.getExecution())
1✔
1678
                  .build();
1✔
1679
          List<HistoryEvent> events;
1680
          events =
1✔
1681
              data.store
1682
                  .getWorkflowExecutionHistory(ctx.getExecutionId(), getRequest, null)
1✔
1683
                  .getHistory()
1✔
1684
                  .getEventsList();
1✔
1685
          long lastEventId = events.get(events.size() - 1).getEventId();
1✔
1686
          if (ctx.getWorkflowMutableState().getStickyExecutionAttributes() != null) {
1✔
1687
            events = events.subList((int) data.lastSuccessfulStartedEventId, events.size());
1✔
1688
          }
1689
          if (queryOnly && !data.workflowCompleted) {
1✔
1690
            events = new ArrayList<>(events); // convert list to mutable
×
1691
            // Add "fake" workflow task scheduled and started if workflow is not closed
1692
            WorkflowTaskScheduledEventAttributes scheduledAttributes =
1693
                WorkflowTaskScheduledEventAttributes.newBuilder()
×
1694
                    .setStartToCloseTimeout(data.startRequest.getWorkflowTaskTimeout())
×
1695
                    .setTaskQueue(request.getTaskQueue())
×
1696
                    .setAttempt(data.attempt)
×
1697
                    .build();
×
1698
            HistoryEvent scheduledEvent =
1699
                HistoryEvent.newBuilder()
×
1700
                    .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED)
×
1701
                    .setEventId(lastEventId + 1)
×
1702
                    .setWorkflowTaskScheduledEventAttributes(scheduledAttributes)
×
1703
                    .build();
×
1704
            events.add(scheduledEvent);
×
1705
            WorkflowTaskStartedEventAttributes startedAttributes =
1706
                WorkflowTaskStartedEventAttributes.newBuilder()
×
1707
                    .setIdentity(request.getIdentity())
×
1708
                    .setScheduledEventId(lastEventId + 1)
×
1709
                    .build();
×
1710
            HistoryEvent startedEvent =
1711
                HistoryEvent.newBuilder()
×
1712
                    .setEventId(lastEventId + 1)
×
1713
                    .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_STARTED)
×
1714
                    .setWorkflowTaskStartedEventAttributes(startedAttributes)
×
1715
                    .build();
×
1716
            events.add(startedEvent);
×
1717
            task.setStartedEventId(lastEventId + 2);
×
1718
          }
1719
          // get it from previous started event id.
1720
          task.setHistory(History.newBuilder().addAllEvents(events));
1✔
1721
          // Transfer the queries
1722
          Map<String, TestWorkflowMutableStateImpl.ConsistentQuery> queries =
1✔
1723
              data.consistentQueryRequests;
1724
          for (Map.Entry<String, TestWorkflowMutableStateImpl.ConsistentQuery> queryEntry :
1725
              queries.entrySet()) {
1✔
1726
            QueryWorkflowRequest queryWorkflowRequest = queryEntry.getValue().getRequest();
1✔
1727
            task.putQueries(queryEntry.getKey(), queryWorkflowRequest.getQuery());
1✔
1728
          }
1✔
1729
          // Transfer the messages
1730
          Map<String, UpdateWorkflowExecution> updates = data.updateRequest;
1✔
1731
          for (Map.Entry<String, UpdateWorkflowExecution> update : updates.entrySet()) {
1✔
1732
            UpdateWorkflowExecutionRequest updateRequest = update.getValue().getRequest();
1✔
1733
            Message updateMessage =
1734
                Message.newBuilder()
1✔
1735
                    .setId(update.getKey() + "/request")
1✔
1736
                    .setProtocolInstanceId(update.getKey())
1✔
1737
                    .setEventId(data.scheduledEventId)
1✔
1738
                    .setBody(Any.pack(updateRequest.getRequest()))
1✔
1739
                    .build();
1✔
1740
            task.addMessages(updateMessage);
1✔
1741
          }
1✔
1742
          if (data.lastSuccessfulStartedEventId > 0) {
1✔
1743
            task.setPreviousStartedEventId(data.lastSuccessfulStartedEventId);
1✔
1744
          }
1745
          if (!queryOnly) {
1✔
1746
            data.startedEventId = startedEventId;
1✔
1747
          }
1748
        });
1✔
1749
  }
1✔
1750

1751
  private static void startActivityTask(
1752
      RequestContext ctx,
1753
      ActivityTaskData data,
1754
      PollActivityTaskQueueRequest request,
1755
      long notUsed) {
1756
    ActivityTaskStartedEventAttributes.Builder a =
1757
        ActivityTaskStartedEventAttributes.newBuilder()
1✔
1758
            .setIdentity(request.getIdentity())
1✔
1759
            .setScheduledEventId(data.scheduledEventId);
1✔
1760
    a.setAttempt(data.getAttempt());
1✔
1761
    // Setting timestamp here as the default logic will set it to the time when it is added to the
1762
    // history. But in the case of retry it happens only after an activity completion.
1763
    Timestamp timestamp = data.store.currentTime();
1✔
1764
    HistoryEvent event =
1765
        HistoryEvent.newBuilder()
1✔
1766
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_STARTED)
1✔
1767
            .setEventTime(timestamp)
1✔
1768
            .setActivityTaskStartedEventAttributes(a)
1✔
1769
            .build();
1✔
1770
    long startedEventId;
1771
    startedEventId = NO_EVENT_ID;
1✔
1772
    ctx.onCommit(
1✔
1773
        (historySize) -> {
1774
          data.startedEventId = startedEventId;
1✔
1775
          data.startedEvent = event;
1✔
1776
          PollActivityTaskQueueResponse.Builder task = data.activityTask.getTask();
1✔
1777
          task.setTaskToken(
1✔
1778
              new ActivityTaskToken(ctx.getExecutionId(), data.scheduledEventId, task.getAttempt())
1✔
1779
                  .toBytes());
1✔
1780
          task.setStartedTime(timestamp);
1✔
1781
        });
1✔
1782
  }
1✔
1783

1784
  private static void completeWorkflowTask(
1785
      RequestContext ctx,
1786
      WorkflowTaskData data,
1787
      RespondWorkflowTaskCompletedRequest request,
1788
      long notUsed) {
1789
    WorkflowTaskCompletedEventAttributes.Builder a =
1790
        WorkflowTaskCompletedEventAttributes.newBuilder()
1✔
1791
            .setIdentity(request.getIdentity())
1✔
1792
            .setBinaryChecksum(request.getBinaryChecksum())
1✔
1793
            .setMeteringMetadata(request.getMeteringMetadata())
1✔
1794
            .setSdkMetadata(request.getSdkMetadata())
1✔
1795
            .setScheduledEventId(data.scheduledEventId);
1✔
1796
    HistoryEvent event =
1797
        HistoryEvent.newBuilder()
1✔
1798
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_COMPLETED)
1✔
1799
            .setWorkflowTaskCompletedEventAttributes(a)
1✔
1800
            .build();
1✔
1801
    ctx.addEvent(event);
1✔
1802
    ctx.onCommit(
1✔
1803
        (historySize) -> {
1804
          data.lastSuccessfulStartedEventId = data.startedEventId;
1✔
1805
          data.clear();
1✔
1806
        });
1✔
1807
  }
1✔
1808

1809
  private static void completeQuery(
1810
      RequestContext ctx,
1811
      WorkflowTaskData data,
1812
      RespondWorkflowTaskCompletedRequest request,
1813
      long notUsed) {
1814
    Map<String, WorkflowQueryResult> responses = request.getQueryResultsMap();
×
1815
    for (Map.Entry<String, WorkflowQueryResult> resultEntry : responses.entrySet()) {
×
1816
      TestWorkflowMutableStateImpl.ConsistentQuery query =
×
1817
          data.consistentQueryRequests.remove(resultEntry.getKey());
×
1818
      if (query != null) {
×
1819
        WorkflowQueryResult value = resultEntry.getValue();
×
1820
        CompletableFuture<QueryWorkflowResponse> result = query.getResult();
×
1821
        switch (value.getResultType()) {
×
1822
          case QUERY_RESULT_TYPE_ANSWERED:
1823
            QueryWorkflowResponse response =
1824
                QueryWorkflowResponse.newBuilder().setQueryResult(value.getAnswer()).build();
×
1825
            result.complete(response);
×
1826
            break;
×
1827
          case QUERY_RESULT_TYPE_FAILED:
1828
            result.completeExceptionally(
×
1829
                StatusUtils.newException(
×
1830
                    Status.INTERNAL.withDescription(value.getErrorMessage()),
×
1831
                    QueryFailedFailure.getDefaultInstance(),
×
1832
                    QueryFailedFailure.getDescriptor()));
×
1833
            break;
×
1834
          default:
1835
            throw Status.INVALID_ARGUMENT
×
1836
                .withDescription("Invalid query result type: " + value.getResultType())
×
1837
                .asRuntimeException();
×
1838
        }
1839
      }
1840
    }
×
1841
    ctx.onCommit(
×
1842
        (historySize) -> {
1843
          data.clear();
×
1844
          ctx.unlockTimer("completeQuery");
×
1845
        });
×
1846
  }
×
1847

1848
  private static void failQueryWorkflowTask(
1849
      RequestContext ctx, WorkflowTaskData data, Object unused, long notUsed) {
1850
    data.consistentQueryRequests
×
1851
        .entrySet()
×
1852
        .removeIf(entry -> entry.getValue().getResult().isCancelled());
×
1853
    if (!data.consistentQueryRequests.isEmpty()) {
×
1854
      ctx.setNeedWorkflowTask(true);
×
1855
    }
1856
    ctx.unlockTimer("failQueryWorkflowTask");
×
1857
  }
×
1858

1859
  private static void failWorkflowTask(
1860
      RequestContext ctx,
1861
      WorkflowTaskData data,
1862
      RespondWorkflowTaskFailedRequest request,
1863
      long notUsed) {
1864
    WorkflowTaskFailedEventAttributes.Builder a =
1865
        WorkflowTaskFailedEventAttributes.newBuilder()
1✔
1866
            .setIdentity(request.getIdentity())
1✔
1867
            .setStartedEventId(data.startedEventId)
1✔
1868
            .setScheduledEventId(data.scheduledEventId)
1✔
1869
            .setCause(request.getCause());
1✔
1870
    if (request.hasFailure()) {
1✔
1871
      a.setFailure(request.getFailure());
1✔
1872
    }
1873
    HistoryEvent event =
1874
        HistoryEvent.newBuilder()
1✔
1875
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_FAILED)
1✔
1876
            .setWorkflowTaskFailedEventAttributes(a)
1✔
1877
            .build();
1✔
1878
    ctx.addEvent(event);
1✔
1879
    ctx.setNeedWorkflowTask(true);
1✔
1880
  }
1✔
1881

1882
  private static void timeoutWorkflowTask(
1883
      RequestContext ctx, WorkflowTaskData data, Object ignored, long notUsed) {
1884
    WorkflowTaskTimedOutEventAttributes.Builder a =
1885
        WorkflowTaskTimedOutEventAttributes.newBuilder()
1✔
1886
            .setStartedEventId(data.startedEventId)
1✔
1887
            .setTimeoutType(TimeoutType.TIMEOUT_TYPE_START_TO_CLOSE)
1✔
1888
            .setScheduledEventId(data.scheduledEventId);
1✔
1889
    HistoryEvent event =
1890
        HistoryEvent.newBuilder()
1✔
1891
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT)
1✔
1892
            .setWorkflowTaskTimedOutEventAttributes(a)
1✔
1893
            .build();
1✔
1894
    ctx.addEvent(event);
1✔
1895
    ctx.setNeedWorkflowTask(true);
1✔
1896
  }
1✔
1897

1898
  private static void needsWorkflowTask(
1899
      RequestContext requestContext,
1900
      WorkflowTaskData workflowTaskData,
1901
      Object notUsedRequest,
1902
      long notUsed) {
1903
    requestContext.setNeedWorkflowTask(true);
1✔
1904
  }
1✔
1905

1906
  private static void completeActivityTask(
1907
      RequestContext ctx, ActivityTaskData data, Object request, long notUsed) {
1908
    data.startedEventId = ctx.addEvent(data.startedEvent);
1✔
1909
    if (request instanceof RespondActivityTaskCompletedRequest) {
1✔
1910
      completeActivityTaskByTaskToken(ctx, data, (RespondActivityTaskCompletedRequest) request);
1✔
1911
    } else if (request instanceof RespondActivityTaskCompletedByIdRequest) {
×
1912
      completeActivityTaskById(ctx, data, (RespondActivityTaskCompletedByIdRequest) request);
×
1913
    } else {
1914
      throw new IllegalArgumentException("Unknown request: " + request);
×
1915
    }
1916
  }
1✔
1917

1918
  private static void completeActivityTaskByTaskToken(
1919
      RequestContext ctx, ActivityTaskData data, RespondActivityTaskCompletedRequest request) {
1920
    ActivityTaskCompletedEventAttributes.Builder a =
1921
        ActivityTaskCompletedEventAttributes.newBuilder()
1✔
1922
            .setIdentity(request.getIdentity())
1✔
1923
            .setScheduledEventId(data.scheduledEventId)
1✔
1924
            .setResult(request.getResult())
1✔
1925
            .setStartedEventId(data.startedEventId);
1✔
1926
    HistoryEvent event =
1927
        HistoryEvent.newBuilder()
1✔
1928
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_COMPLETED)
1✔
1929
            .setActivityTaskCompletedEventAttributes(a)
1✔
1930
            .build();
1✔
1931
    ctx.addEvent(event);
1✔
1932
  }
1✔
1933

1934
  private static void completeActivityTaskById(
1935
      RequestContext ctx, ActivityTaskData data, RespondActivityTaskCompletedByIdRequest request) {
1936
    ActivityTaskCompletedEventAttributes.Builder a =
1937
        ActivityTaskCompletedEventAttributes.newBuilder()
×
1938
            .setIdentity(request.getIdentity())
×
1939
            .setScheduledEventId(data.scheduledEventId)
×
1940
            .setResult(request.getResult())
×
1941
            .setStartedEventId(data.startedEventId);
×
1942
    HistoryEvent event =
1943
        HistoryEvent.newBuilder()
×
1944
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_COMPLETED)
×
1945
            .setActivityTaskCompletedEventAttributes(a)
×
1946
            .build();
×
1947
    ctx.addEvent(event);
×
1948
  }
×
1949

1950
  private static State failActivityTask(
1951
      RequestContext ctx, ActivityTaskData data, Object request, long notUsed) {
1952
    if (request instanceof RespondActivityTaskFailedRequest) {
1✔
1953
      RespondActivityTaskFailedRequest req = (RespondActivityTaskFailedRequest) request;
1✔
1954
      return failActivityTaskByRequestType(ctx, data, req.getFailure(), req.getIdentity());
1✔
1955
    } else if (request instanceof RespondActivityTaskFailedByIdRequest) {
×
1956
      RespondActivityTaskFailedByIdRequest req = (RespondActivityTaskFailedByIdRequest) request;
×
1957
      return failActivityTaskByRequestType(ctx, data, req.getFailure(), req.getIdentity());
×
1958
    } else {
1959
      throw new IllegalArgumentException("Unknown request: " + request);
×
1960
    }
1961
  }
1962

1963
  private static State failActivityTaskByRequestType(
1964
      RequestContext ctx, ActivityTaskData data, Failure failure, String identity) {
1965
    if (!failure.hasApplicationFailureInfo()) {
1✔
1966
      throw new IllegalArgumentException(
×
1967
          "Failure must have ApplicationFailureInfo. Got: " + failure);
1968
    }
1969
    RetryState retryState = attemptActivityRetry(ctx, Optional.of(failure), data);
1✔
1970
    if (retryState == RetryState.RETRY_STATE_IN_PROGRESS) {
1✔
1971
      return INITIATED;
1✔
1972
    }
1973
    data.startedEventId = ctx.addEvent(data.startedEvent);
1✔
1974
    ActivityTaskFailedEventAttributes.Builder attributes =
1975
        ActivityTaskFailedEventAttributes.newBuilder()
1✔
1976
            .setIdentity(identity)
1✔
1977
            .setScheduledEventId(data.scheduledEventId)
1✔
1978
            .setFailure(failure)
1✔
1979
            .setRetryState(retryState)
1✔
1980
            .setStartedEventId(data.startedEventId);
1✔
1981
    HistoryEvent event =
1982
        HistoryEvent.newBuilder()
1✔
1983
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_FAILED)
1✔
1984
            .setActivityTaskFailedEventAttributes(attributes)
1✔
1985
            .build();
1✔
1986
    ctx.addEvent(event);
1✔
1987
    return FAILED;
1✔
1988
  }
1989

1990
  private static State timeoutActivityTask(
1991
      RequestContext ctx, ActivityTaskData data, TimeoutType timeoutType, long notUsed) {
1992
    Optional<Failure> previousFailure = data.retryState.getPreviousRunFailure();
1✔
1993

1994
    // chaining with the previous run failure if we are preparing the final failure
1995
    Failure failure =
1✔
1996
        newTimeoutFailure(timeoutType, Optional.ofNullable(data.heartbeatDetails), previousFailure);
1✔
1997

1998
    RetryState retryState;
1999
    switch (timeoutType) {
1✔
2000
      case TIMEOUT_TYPE_SCHEDULE_TO_START:
2001
      case TIMEOUT_TYPE_SCHEDULE_TO_CLOSE:
2002
        // ScheduleToStart (queue timeout) is not retryable. Instead of the retry, a customer should
2003
        // set a larger ScheduleToStart timeout.
2004
        // ScheduleToClose timeout is final and failure is created with TIMEOUT retry state
2005
        retryState = RetryState.RETRY_STATE_TIMEOUT;
1✔
2006
        break;
1✔
2007
      case TIMEOUT_TYPE_START_TO_CLOSE:
2008
      case TIMEOUT_TYPE_HEARTBEAT:
2009
        // not chaining with the previous run failure if we are preparing the failure to be stored
2010
        // for the next iteration
2011
        Optional<Failure> lastFailure =
1✔
2012
            Optional.of(
1✔
2013
                newTimeoutFailure(
1✔
2014
                    timeoutType,
2015
                    // we move heartbeatDetails to the new top level (this cause is used for
2016
                    // scheduleToClose only)
2017
                    Optional.empty(),
1✔
2018
                    // prune to don't have too deep nesting of failures
2019
                    Optional.empty()));
1✔
2020

2021
        retryState = attemptActivityRetry(ctx, lastFailure, data);
1✔
2022
        if (retryState == RetryState.RETRY_STATE_IN_PROGRESS) {
1✔
2023
          return INITIATED;
1✔
2024
        } else if (retryState == RetryState.RETRY_STATE_TIMEOUT) {
1✔
2025
          // if retryState = RETRY_STATE_TIMEOUT, it means scheduleToClose timeout happened inside
2026
          // attemptActivityRetry();
2027
          // start to close timeout would return as "max attempts reached".
2028

2029
          Preconditions.checkState(
1✔
2030
              timeoutType == TimeoutType.TIMEOUT_TYPE_START_TO_CLOSE
2031
                  || timeoutType == TimeoutType.TIMEOUT_TYPE_HEARTBEAT,
2032
              "Unexpected timeout type: %s. We should end up here only in case of HEARTBEAT_TIMEOUT or START_TO_CLOSE_TIMEOUT",
2033
              timeoutType);
2034

2035
          // heartbeat is preserved as the cause for the scheduleToClose timeout
2036
          // But we effectively omit startToClose timeout with scheduleToClose timeout
2037
          Optional<Failure> cause =
2038
              timeoutType == TimeoutType.TIMEOUT_TYPE_HEARTBEAT ? lastFailure : previousFailure;
1✔
2039

2040
          failure =
1✔
2041
              newTimeoutFailure(
1✔
2042
                  TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE,
2043
                  Optional.ofNullable(data.heartbeatDetails),
1✔
2044
                  cause);
2045
        }
1✔
2046
        break;
2047
      default:
2048
        throw new IllegalStateException(
×
2049
            "Not implemented behavior for timeout type: " + timeoutType);
2050
    }
2051

2052
    ActivityTaskTimedOutEventAttributes.Builder a =
2053
        ActivityTaskTimedOutEventAttributes.newBuilder()
1✔
2054
            .setScheduledEventId(data.scheduledEventId)
1✔
2055
            .setRetryState(retryState)
1✔
2056
            .setStartedEventId(data.startedEventId)
1✔
2057
            .setFailure(failure);
1✔
2058
    HistoryEvent event =
2059
        HistoryEvent.newBuilder()
1✔
2060
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_TIMED_OUT)
1✔
2061
            .setActivityTaskTimedOutEventAttributes(a)
1✔
2062
            .build();
1✔
2063
    ctx.addEvent(event);
1✔
2064
    return TIMED_OUT;
1✔
2065
  }
2066

2067
  private static Failure newTimeoutFailure(
2068
      TimeoutType timeoutType, Optional<Payloads> lastHeartbeatDetails, Optional<Failure> cause) {
2069
    TimeoutFailureInfo.Builder info = TimeoutFailureInfo.newBuilder().setTimeoutType(timeoutType);
1✔
2070
    if (lastHeartbeatDetails.isPresent()) {
1✔
2071
      info.setLastHeartbeatDetails(lastHeartbeatDetails.get());
1✔
2072
    }
2073
    Failure.Builder result = Failure.newBuilder().setTimeoutFailureInfo(info);
1✔
2074
    if (cause.isPresent()) {
1✔
2075
      result.setCause(cause.get());
1✔
2076
    }
2077
    return result.build();
1✔
2078
  }
2079

2080
  private static RetryState attemptActivityRetry(
2081
      RequestContext ctx, Optional<Failure> failure, ActivityTaskData data) {
2082
    if (data.retryState == null) {
1✔
2083
      throw new IllegalStateException("RetryPolicy is always present");
×
2084
    }
2085
    Optional<ApplicationFailureInfo> info = failure.map(Failure::getApplicationFailureInfo);
1✔
2086
    Optional<java.time.Duration> nextRetryDelay = Optional.empty();
1✔
2087

2088
    if (info.isPresent()) {
1✔
2089
      if (info.get().getNonRetryable()) {
1✔
2090
        return RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE;
1✔
2091
      }
2092
      if (info.get().hasNextRetryDelay()) {
1✔
2093
        nextRetryDelay =
1✔
2094
            Optional.ofNullable(ProtobufTimeUtils.toJavaDuration(info.get().getNextRetryDelay()));
1✔
2095
      }
2096
    }
2097

2098
    TestServiceRetryState nextAttempt = data.retryState.getNextAttempt(failure);
1✔
2099
    TestServiceRetryState.BackoffInterval backoffInterval =
1✔
2100
        data.retryState.getBackoffIntervalInSeconds(
1✔
2101
            info.map(ApplicationFailureInfo::getType), data.store.currentTime(), nextRetryDelay);
1✔
2102
    if (backoffInterval.getRetryState() == RetryState.RETRY_STATE_IN_PROGRESS) {
1✔
2103
      data.nextBackoffInterval = ProtobufTimeUtils.toProtoDuration(backoffInterval.getInterval());
1✔
2104
      PollActivityTaskQueueResponse.Builder task = data.activityTask.getTask();
1✔
2105
      if (data.heartbeatDetails != null) {
1✔
2106
        task.setHeartbeatDetails(data.heartbeatDetails);
1✔
2107
      }
2108
      ctx.onCommit(
1✔
2109
          (historySize) -> {
2110
            data.retryState = nextAttempt;
1✔
2111
            task.setAttempt(nextAttempt.getAttempt());
1✔
2112
            task.setCurrentAttemptScheduledTime(ctx.currentTime());
1✔
2113
          });
1✔
2114
    } else {
1✔
2115
      data.nextBackoffInterval = Durations.ZERO;
1✔
2116
    }
2117
    return backoffInterval.getRetryState();
1✔
2118
  }
2119

2120
  private static void reportActivityTaskCancellation(
2121
      RequestContext ctx, ActivityTaskData data, Object request, long notUsed) {
2122
    Payloads details = null;
1✔
2123
    if (request instanceof RespondActivityTaskCanceledRequest) {
1✔
2124
      {
2125
        RespondActivityTaskCanceledRequest cr = (RespondActivityTaskCanceledRequest) request;
1✔
2126

2127
        details = cr.hasDetails() ? cr.getDetails() : null;
1✔
2128
      }
1✔
2129
    } else if (request instanceof RespondActivityTaskCanceledByIdRequest) {
1✔
2130
      {
2131
        RespondActivityTaskCanceledByIdRequest cr =
×
2132
            (RespondActivityTaskCanceledByIdRequest) request;
2133
        details = cr.hasDetails() ? cr.getDetails() : null;
×
2134
      }
×
2135
    } else if (request != null) {
1✔
2136
      throw Status.INTERNAL
×
2137
          .withDescription("Unexpected request type: " + request)
×
2138
          .asRuntimeException();
×
2139
    }
2140
    ActivityTaskCanceledEventAttributes.Builder a =
2141
        ActivityTaskCanceledEventAttributes.newBuilder()
1✔
2142
            .setScheduledEventId(data.scheduledEventId)
1✔
2143
            .setStartedEventId(data.startedEventId);
1✔
2144
    if (details != null) {
1✔
2145
      a.setDetails(details);
×
2146
    }
2147
    HistoryEvent event =
2148
        HistoryEvent.newBuilder()
1✔
2149
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_CANCELED)
1✔
2150
            .setActivityTaskCanceledEventAttributes(a)
1✔
2151
            .build();
1✔
2152
    ctx.addEvent(event);
1✔
2153
  }
1✔
2154

2155
  private static void heartbeatActivityTask(
2156
      RequestContext nullCtx, ActivityTaskData data, Payloads details, long notUsed) {
2157
    data.heartbeatDetails = details;
1✔
2158
  }
1✔
2159

2160
  private static void acceptUpdate(
2161
      RequestContext ctx,
2162
      UpdateWorkflowExecutionData data,
2163
      Message msg,
2164
      long workflowTaskCompletedEventId) {
2165
    try {
2166
      Acceptance acceptance = msg.getBody().unpack(Acceptance.class);
1✔
2167

2168
      WorkflowExecutionUpdateAcceptedEventAttributes acceptedAttribute =
2169
          WorkflowExecutionUpdateAcceptedEventAttributes.newBuilder()
1✔
2170
              .setAcceptedRequestSequencingEventId(workflowTaskCompletedEventId - 1)
1✔
2171
              .setProtocolInstanceId(msg.getProtocolInstanceId())
1✔
2172
              .setAcceptedRequestMessageId(acceptance.getAcceptedRequestMessageId())
1✔
2173
              .setAcceptedRequest(data.initialRequest)
1✔
2174
              .build();
1✔
2175

2176
      HistoryEvent event =
2177
          HistoryEvent.newBuilder()
1✔
2178
              .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED)
1✔
2179
              .setWorkflowExecutionUpdateAcceptedEventAttributes(acceptedAttribute)
1✔
2180
              .build();
1✔
2181
      // If the workflow is finished we can't write more events
2182
      // to history so if the message was processed after the workflow
2183
      // was closed there is nothing we can do.
2184
      // The real server also has this same problem
2185
      if (!ctx.getWorkflowMutableState().isTerminalState()) {
1✔
2186
        ctx.addEvent(event);
1✔
2187
      }
2188
      ctx.onCommit(
1✔
2189
          (int historySize) -> {
2190
            data.accepted.complete(true);
1✔
2191
          });
1✔
2192
    } catch (InvalidProtocolBufferException e) {
×
2193
      throw new RuntimeException(e);
×
2194
    }
1✔
2195
  }
1✔
2196

2197
  private static void completeUpdate(
2198
      RequestContext ctx,
2199
      UpdateWorkflowExecutionData data,
2200
      Message msg,
2201
      long workflowTaskCompletedEventId) {
2202
    try {
2203
      Response response = msg.getBody().unpack(Response.class);
1✔
2204

2205
      WorkflowExecutionUpdateCompletedEventAttributes completedAttribute =
2206
          WorkflowExecutionUpdateCompletedEventAttributes.newBuilder()
1✔
2207
              .setMeta(response.getMeta())
1✔
2208
              .setOutcome(response.getOutcome())
1✔
2209
              .build();
1✔
2210

2211
      HistoryEvent event =
2212
          HistoryEvent.newBuilder()
1✔
2213
              .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_COMPLETED)
1✔
2214
              .setWorkflowExecutionUpdateCompletedEventAttributes(completedAttribute)
1✔
2215
              .build();
1✔
2216
      // If the workflow is finished we can't write more events
2217
      // to history so if the message was processed after the workflow
2218
      // was closed there is nothing we can do.
2219
      // The real server also has this same problem
2220
      if (!ctx.getWorkflowMutableState().isTerminalState()) {
1✔
2221
        ctx.addEvent(event);
1✔
2222
      }
2223
      ctx.onCommit(
1✔
2224
          (int historySize) -> {
2225
            data.outcome.complete(response.getOutcome());
1✔
2226
          });
1✔
2227
    } catch (InvalidProtocolBufferException e) {
×
2228
      throw new RuntimeException(e);
×
2229
    }
1✔
2230
  }
1✔
2231

2232
  private static void startTimer(
2233
      RequestContext ctx,
2234
      TimerData data,
2235
      StartTimerCommandAttributes d,
2236
      long workflowTaskCompletedEventId) {
2237
    TimerStartedEventAttributes.Builder a =
2238
        TimerStartedEventAttributes.newBuilder()
1✔
2239
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1✔
2240
            .setStartToFireTimeout(d.getStartToFireTimeout())
1✔
2241
            .setTimerId(d.getTimerId());
1✔
2242
    HistoryEvent event =
2243
        HistoryEvent.newBuilder()
1✔
2244
            .setEventType(EventType.EVENT_TYPE_TIMER_STARTED)
1✔
2245
            .setTimerStartedEventAttributes(a)
1✔
2246
            .build();
1✔
2247
    long startedEventId = ctx.addEvent(event);
1✔
2248
    ctx.onCommit(
1✔
2249
        (historySize) -> {
2250
          data.startedEvent = a.build();
1✔
2251
          data.startedEventId = startedEventId;
1✔
2252
        });
1✔
2253
  }
1✔
2254

2255
  private static void fireTimer(RequestContext ctx, TimerData data, Object ignored, long notUsed) {
2256
    TimerFiredEventAttributes.Builder a =
2257
        TimerFiredEventAttributes.newBuilder()
1✔
2258
            .setTimerId(data.startedEvent.getTimerId())
1✔
2259
            .setStartedEventId(data.startedEventId);
1✔
2260
    HistoryEvent event =
2261
        HistoryEvent.newBuilder()
1✔
2262
            .setEventType(EventType.EVENT_TYPE_TIMER_FIRED)
1✔
2263
            .setTimerFiredEventAttributes(a)
1✔
2264
            .build();
1✔
2265
    ctx.addEvent(event);
1✔
2266
  }
1✔
2267

2268
  private static void cancelTimer(
2269
      RequestContext ctx,
2270
      TimerData data,
2271
      CancelTimerCommandAttributes d,
2272
      long workflowTaskCompletedEventId) {
2273
    TimerCanceledEventAttributes.Builder a =
2274
        TimerCanceledEventAttributes.newBuilder()
1✔
2275
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1✔
2276
            .setTimerId(d.getTimerId())
1✔
2277
            .setStartedEventId(data.startedEventId);
1✔
2278
    HistoryEvent event =
2279
        HistoryEvent.newBuilder()
1✔
2280
            .setEventType(EventType.EVENT_TYPE_TIMER_CANCELED)
1✔
2281
            .setTimerCanceledEventAttributes(a)
1✔
2282
            .build();
1✔
2283
    ctx.addEvent(event);
1✔
2284
  }
1✔
2285

2286
  private static void initiateExternalSignal(
2287
      RequestContext ctx,
2288
      SignalExternalData data,
2289
      SignalExternalWorkflowExecutionCommandAttributes d,
2290
      long workflowTaskCompletedEventId) {
2291
    SignalExternalWorkflowExecutionInitiatedEventAttributes.Builder a =
2292
        SignalExternalWorkflowExecutionInitiatedEventAttributes.newBuilder()
1✔
2293
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1✔
2294
            .setControl(d.getControl())
1✔
2295
            .setInput(d.getInput())
1✔
2296
            .setNamespace(d.getNamespace())
1✔
2297
            .setChildWorkflowOnly(d.getChildWorkflowOnly())
1✔
2298
            .setSignalName(d.getSignalName())
1✔
2299
            .setWorkflowExecution(d.getExecution());
1✔
2300

2301
    HistoryEvent event =
2302
        HistoryEvent.newBuilder()
1✔
2303
            .setEventType(EventType.EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED)
1✔
2304
            .setSignalExternalWorkflowExecutionInitiatedEventAttributes(a)
1✔
2305
            .build();
1✔
2306
    long initiatedEventId = ctx.addEvent(event);
1✔
2307
    ctx.onCommit(
1✔
2308
        (historySize) -> {
2309
          data.initiatedEventId = initiatedEventId;
1✔
2310
          data.initiatedEvent = a.build();
1✔
2311
        });
1✔
2312
  }
1✔
2313

2314
  private static void failExternalSignal(
2315
      RequestContext ctx,
2316
      SignalExternalData data,
2317
      SignalExternalWorkflowExecutionFailedCause cause,
2318
      long notUsed) {
2319
    SignalExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent = data.initiatedEvent;
1✔
2320
    SignalExternalWorkflowExecutionFailedEventAttributes.Builder a =
2321
        SignalExternalWorkflowExecutionFailedEventAttributes.newBuilder()
1✔
2322
            .setInitiatedEventId(data.initiatedEventId)
1✔
2323
            .setWorkflowExecution(initiatedEvent.getWorkflowExecution())
1✔
2324
            .setControl(initiatedEvent.getControl())
1✔
2325
            .setCause(cause)
1✔
2326
            .setNamespace(initiatedEvent.getNamespace());
1✔
2327
    HistoryEvent event =
2328
        HistoryEvent.newBuilder()
1✔
2329
            .setEventType(EventType.EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED)
1✔
2330
            .setSignalExternalWorkflowExecutionFailedEventAttributes(a)
1✔
2331
            .build();
1✔
2332
    ctx.addEvent(event);
1✔
2333
  }
1✔
2334

2335
  private static void completeExternalSignal(
2336
      RequestContext ctx, SignalExternalData data, String runId, long notUsed) {
2337
    SignalExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent = data.initiatedEvent;
1✔
2338
    WorkflowExecution signaledExecution =
1✔
2339
        initiatedEvent.getWorkflowExecution().toBuilder().setRunId(runId).build();
1✔
2340
    ExternalWorkflowExecutionSignaledEventAttributes.Builder a =
2341
        ExternalWorkflowExecutionSignaledEventAttributes.newBuilder()
1✔
2342
            .setInitiatedEventId(data.initiatedEventId)
1✔
2343
            .setWorkflowExecution(signaledExecution)
1✔
2344
            .setControl(initiatedEvent.getControl())
1✔
2345
            .setNamespace(initiatedEvent.getNamespace());
1✔
2346
    HistoryEvent event =
2347
        HistoryEvent.newBuilder()
1✔
2348
            .setEventType(EventType.EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_SIGNALED)
1✔
2349
            .setExternalWorkflowExecutionSignaledEventAttributes(a)
1✔
2350
            .build();
1✔
2351
    ctx.addEvent(event);
1✔
2352
  }
1✔
2353

2354
  private static void initiateExternalCancellation(
2355
      RequestContext ctx,
2356
      CancelExternalData data,
2357
      RequestCancelExternalWorkflowExecutionCommandAttributes d,
2358
      long workflowTaskCompletedEventId) {
2359
    RequestCancelExternalWorkflowExecutionInitiatedEventAttributes.Builder a =
2360
        RequestCancelExternalWorkflowExecutionInitiatedEventAttributes.newBuilder()
1✔
2361
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1✔
2362
            .setControl(d.getControl())
1✔
2363
            .setNamespace(d.getNamespace())
1✔
2364
            .setChildWorkflowOnly(d.getChildWorkflowOnly())
1✔
2365
            .setWorkflowExecution(
1✔
2366
                WorkflowExecution.newBuilder()
1✔
2367
                    .setWorkflowId(d.getWorkflowId())
1✔
2368
                    .setRunId(d.getRunId())
1✔
2369
                    .build());
1✔
2370

2371
    HistoryEvent event =
2372
        HistoryEvent.newBuilder()
1✔
2373
            .setEventType(EventType.EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED)
1✔
2374
            .setRequestCancelExternalWorkflowExecutionInitiatedEventAttributes(a)
1✔
2375
            .build();
1✔
2376
    long initiatedEventId = ctx.addEvent(event);
1✔
2377
    ctx.onCommit(
1✔
2378
        (historySize) -> {
2379
          data.initiatedEventId = initiatedEventId;
1✔
2380
          data.initiatedEvent = a.build();
1✔
2381
        });
1✔
2382
  }
1✔
2383

2384
  private static void reportExternalCancellationRequested(
2385
      RequestContext ctx, CancelExternalData data, String runId, long notUsed) {
2386
    RequestCancelExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent =
1✔
2387
        data.initiatedEvent;
2388
    ExternalWorkflowExecutionCancelRequestedEventAttributes.Builder a =
2389
        ExternalWorkflowExecutionCancelRequestedEventAttributes.newBuilder()
1✔
2390
            .setInitiatedEventId(data.initiatedEventId)
1✔
2391
            .setWorkflowExecution(
1✔
2392
                WorkflowExecution.newBuilder()
1✔
2393
                    .setRunId(runId)
1✔
2394
                    .setWorkflowId(initiatedEvent.getWorkflowExecution().getWorkflowId())
1✔
2395
                    .build())
1✔
2396
            .setNamespace(initiatedEvent.getNamespace());
1✔
2397
    HistoryEvent event =
2398
        HistoryEvent.newBuilder()
1✔
2399
            .setEventType(EventType.EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_CANCEL_REQUESTED)
1✔
2400
            .setExternalWorkflowExecutionCancelRequestedEventAttributes(a)
1✔
2401
            .build();
1✔
2402
    ctx.addEvent(event);
1✔
2403
  }
1✔
2404

2405
  private static void failExternalCancellation(
2406
      RequestContext ctx,
2407
      CancelExternalData data,
2408
      CancelExternalWorkflowExecutionFailedCause cause,
2409
      long notUsed) {
2410
    RequestCancelExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent =
×
2411
        data.initiatedEvent;
2412
    RequestCancelExternalWorkflowExecutionFailedEventAttributes.Builder a =
2413
        RequestCancelExternalWorkflowExecutionFailedEventAttributes.newBuilder()
×
2414
            .setInitiatedEventId(data.initiatedEventId)
×
2415
            .setWorkflowExecution(initiatedEvent.getWorkflowExecution())
×
2416
            .setControl(initiatedEvent.getControl())
×
2417
            .setCause(cause)
×
2418
            .setNamespace(initiatedEvent.getNamespace());
×
2419
    HistoryEvent event =
2420
        HistoryEvent.newBuilder()
×
2421
            .setEventType(EventType.EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_FAILED)
×
2422
            .setRequestCancelExternalWorkflowExecutionFailedEventAttributes(a)
×
2423
            .build();
×
2424
    ctx.addEvent(event);
×
2425
  }
×
2426

2427
  // Mimics the default activity retry policy of a standard Temporal server.
2428
  static RetryPolicy ensureDefaultFieldsForActivityRetryPolicy(RetryPolicy originalPolicy) {
2429
    Duration initialInterval =
2430
        Durations.compare(originalPolicy.getInitialInterval(), Durations.ZERO) == 0
1✔
2431
            ? DEFAULT_ACTIVITY_RETRY_INITIAL_INTERVAL
1✔
2432
            : originalPolicy.getInitialInterval();
1✔
2433

2434
    return RetryPolicy.newBuilder()
1✔
2435
        .setInitialInterval(initialInterval)
1✔
2436
        .addAllNonRetryableErrorTypes(originalPolicy.getNonRetryableErrorTypesList())
1✔
2437
        .setMaximumInterval(
1✔
2438
            Durations.compare(originalPolicy.getMaximumInterval(), Durations.ZERO) == 0
1✔
2439
                ? Durations.fromMillis(
1✔
2440
                    DEFAULT_ACTIVITY_MAXIMUM_INTERVAL_COEFFICIENT
2441
                        * Durations.toMillis(initialInterval))
1✔
2442
                : originalPolicy.getMaximumInterval())
1✔
2443
        .setBackoffCoefficient(
1✔
2444
            originalPolicy.getBackoffCoefficient() == 0
1✔
2445
                ? DEFAULT_ACTIVITY_RETRY_BACKOFF_COEFFICIENT
1✔
2446
                : originalPolicy.getBackoffCoefficient())
1✔
2447
        .setMaximumAttempts(
1✔
2448
            originalPolicy.getMaximumAttempts() == 0
1✔
2449
                ? DEFAULT_ACTIVITY_RETRY_MAXIMUM_ATTEMPTS
1✔
2450
                : originalPolicy.getMaximumAttempts())
1✔
2451
        .build();
1✔
2452
  }
2453

2454
  static RetryPolicy defaultNexusRetryPolicy() {
2455
    return RetryPolicy.newBuilder()
1✔
2456
        .addAllNonRetryableErrorTypes(
1✔
2457
            Arrays.asList("INVALID_ARGUMENT", "NOT_FOUND", "DEADLINE_EXCEEDED", "CANCELLED"))
1✔
2458
        .setInitialInterval(Durations.fromSeconds(1))
1✔
2459
        .setMaximumInterval(Durations.fromHours(1))
1✔
2460
        .setBackoffCoefficient(2.0)
1✔
2461
        .build();
1✔
2462
  }
2463
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc