• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #343

31 Oct 2024 06:31PM UTC coverage: 75.148% (-3.6%) from 78.794%
#343

push

github

web-flow
Fix jacoco coverage (#2304)

5139 of 8240 branches covered (62.37%)

Branch coverage included in aggregate %.

22841 of 28993 relevant lines covered (78.78%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.22
/temporal-test-server/src/main/java/io/temporal/internal/testservice/StateMachines.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.testservice;
22

23
import static io.temporal.internal.common.LinkConverter.*;
24
import static io.temporal.internal.testservice.StateMachines.Action.CANCEL;
25
import static io.temporal.internal.testservice.StateMachines.Action.COMPLETE;
26
import static io.temporal.internal.testservice.StateMachines.Action.CONTINUE_AS_NEW;
27
import static io.temporal.internal.testservice.StateMachines.Action.FAIL;
28
import static io.temporal.internal.testservice.StateMachines.Action.INITIATE;
29
import static io.temporal.internal.testservice.StateMachines.Action.QUERY;
30
import static io.temporal.internal.testservice.StateMachines.Action.REQUEST_CANCELLATION;
31
import static io.temporal.internal.testservice.StateMachines.Action.START;
32
import static io.temporal.internal.testservice.StateMachines.Action.TERMINATE;
33
import static io.temporal.internal.testservice.StateMachines.Action.TIME_OUT;
34
import static io.temporal.internal.testservice.StateMachines.Action.UPDATE;
35
import static io.temporal.internal.testservice.StateMachines.Action.UPDATE_WORKFLOW_EXECUTION;
36
import static io.temporal.internal.testservice.StateMachines.State.CANCELED;
37
import static io.temporal.internal.testservice.StateMachines.State.CANCELLATION_REQUESTED;
38
import static io.temporal.internal.testservice.StateMachines.State.COMPLETED;
39
import static io.temporal.internal.testservice.StateMachines.State.CONTINUED_AS_NEW;
40
import static io.temporal.internal.testservice.StateMachines.State.FAILED;
41
import static io.temporal.internal.testservice.StateMachines.State.INITIATED;
42
import static io.temporal.internal.testservice.StateMachines.State.NONE;
43
import static io.temporal.internal.testservice.StateMachines.State.STARTED;
44
import static io.temporal.internal.testservice.StateMachines.State.TERMINATED;
45
import static io.temporal.internal.testservice.StateMachines.State.TIMED_OUT;
46

47
import com.google.common.base.Preconditions;
48
import com.google.common.base.Strings;
49
import com.google.protobuf.*;
50
import com.google.protobuf.util.Durations;
51
import com.google.protobuf.util.Timestamps;
52
import io.grpc.Status;
53
import io.grpc.StatusRuntimeException;
54
import io.temporal.api.command.v1.*;
55
import io.temporal.api.common.v1.*;
56
import io.temporal.api.enums.v1.*;
57
import io.temporal.api.errordetails.v1.QueryFailedFailure;
58
import io.temporal.api.failure.v1.ApplicationFailureInfo;
59
import io.temporal.api.failure.v1.Failure;
60
import io.temporal.api.failure.v1.NexusOperationFailureInfo;
61
import io.temporal.api.failure.v1.TimeoutFailureInfo;
62
import io.temporal.api.history.v1.*;
63
import io.temporal.api.nexus.v1.*;
64
import io.temporal.api.nexus.v1.Link;
65
import io.temporal.api.protocol.v1.Message;
66
import io.temporal.api.query.v1.WorkflowQueryResult;
67
import io.temporal.api.taskqueue.v1.StickyExecutionAttributes;
68
import io.temporal.api.taskqueue.v1.TaskQueue;
69
import io.temporal.api.update.v1.*;
70
import io.temporal.api.update.v1.Request;
71
import io.temporal.api.update.v1.Response;
72
import io.temporal.api.workflowservice.v1.*;
73
import io.temporal.internal.common.ProtobufTimeUtils;
74
import io.temporal.internal.testservice.TestWorkflowMutableStateImpl.UpdateWorkflowExecution;
75
import io.temporal.internal.testservice.TestWorkflowStore.ActivityTask;
76
import io.temporal.internal.testservice.TestWorkflowStore.TaskQueueId;
77
import io.temporal.internal.testservice.TestWorkflowStore.WorkflowTask;
78
import io.temporal.serviceclient.StatusUtils;
79
import io.temporal.workflow.Functions;
80
import java.util.*;
81
import java.util.concurrent.CompletableFuture;
82
import java.util.concurrent.ForkJoinPool;
83
import javax.annotation.Nonnull;
84
import org.slf4j.Logger;
85
import org.slf4j.LoggerFactory;
86

87
class StateMachines {
×
88

89
  private static final Logger log = LoggerFactory.getLogger(StateMachines.class);
1✔
90

91
  public static final long DEFAULT_WORKFLOW_EXECUTION_TIMEOUT_MILLISECONDS =
92
      10L * 365 * 24 * 3600 * 1000;
93
  public static final long DEFAULT_WORKFLOW_TASK_TIMEOUT_MILLISECONDS = 10L * 1000;
94
  public static final long MAX_WORKFLOW_TASK_TIMEOUT_MILLISECONDS = 60L * 1000;
95
  static final Duration DEFAULT_ACTIVITY_RETRY_INITIAL_INTERVAL = Durations.fromSeconds(1);
1✔
96
  static final double DEFAULT_ACTIVITY_RETRY_BACKOFF_COEFFICIENT = 2.0;
97
  static final int DEFAULT_ACTIVITY_RETRY_MAXIMUM_ATTEMPTS = 0;
98
  static final int DEFAULT_ACTIVITY_MAXIMUM_INTERVAL_COEFFICIENT = 100;
99
  static final int NO_EVENT_ID = -1;
100

101
  /** States used by the state machines that are assembled in this class. */
  enum State {
    NONE,
    INITIATED,
    STARTED,
    FAILED,
    TIMED_OUT,
    CANCELLATION_REQUESTED,
    CANCELED,
    COMPLETED,
    CONTINUED_AS_NEW,
    TERMINATED,
  }
113

114
  /** Actions that trigger transitions in the state machines that are assembled in this class. */
  enum Action {
    INITIATE,
    START,
    FAIL,
    TIME_OUT,
    REQUEST_CANCELLATION,
    CANCEL,
    TERMINATE,
    UPDATE,
    COMPLETE,
    CONTINUE_AS_NEW,
    QUERY,
    UPDATE_WORKFLOW_EXECUTION,
  }
128

129
  static final class WorkflowData {
130
    Optional<TestServiceRetryState> retryState;
131
    Duration backoffStartInterval;
132
    String cronSchedule;
133
    Payloads lastCompletionResult;
134
    Optional<Failure> lastFailure;
135

136
    /**
137
     * @see WorkflowExecutionStartedEventAttributes#getFirstExecutionRunId()
138
     */
139
    final @Nonnull String firstExecutionRunId;
140

141
    /**
142
     * @see WorkflowExecutionStartedEventAttributes#getOriginalExecutionRunId()
143
     */
144
    final @Nonnull String originalExecutionRunId;
145

146
    /** RunId that was continued by this run as a result of Retry or Continue-As-New. */
147
    Optional<String> continuedExecutionRunId;
148

149
    Functions.Proc runTimerCancellationHandle;
150

151
    WorkflowData(
152
        Optional<TestServiceRetryState> retryState,
153
        Duration backoffStartInterval,
154
        String cronSchedule,
155
        Payloads lastCompletionResult,
156
        Optional<Failure> lastFailure,
157
        @Nonnull String firstExecutionRunId,
158
        @Nonnull String originalExecutionRunId,
159
        Optional<String> continuedExecutionRunId) {
1✔
160
      this.retryState = retryState;
1✔
161
      this.backoffStartInterval = backoffStartInterval;
1✔
162
      this.cronSchedule = cronSchedule;
1✔
163
      this.lastCompletionResult = lastCompletionResult;
1✔
164
      this.firstExecutionRunId =
1✔
165
          Preconditions.checkNotNull(firstExecutionRunId, "firstExecutionRunId");
1✔
166
      this.originalExecutionRunId =
1✔
167
          Preconditions.checkNotNull(originalExecutionRunId, "originalExecutionRunId");
1✔
168
      this.continuedExecutionRunId = continuedExecutionRunId;
1✔
169
      this.lastFailure = Objects.requireNonNull(lastFailure);
1✔
170
    }
1✔
171

172
    @Override
173
    public String toString() {
174
      return "WorkflowData{"
×
175
          + "retryState="
176
          + retryState
177
          + ", backoffStartInterval="
178
          + backoffStartInterval
179
          + ", cronSchedule='"
180
          + cronSchedule
181
          + '\''
182
          + ", lastCompletionResult="
183
          + lastCompletionResult
184
          + ", firstExecutionRunId='"
185
          + firstExecutionRunId
186
          + '\''
187
          + ", originalExecutionRunId='"
188
          + originalExecutionRunId
189
          + '\''
190
          + ", continuedExecutionRunId="
191
          + continuedExecutionRunId
192
          + '}';
193
    }
194
  }
195

196
  static final class WorkflowTaskData {
197

198
    final TestWorkflowStore store;
199

200
    boolean workflowCompleted;
201

202
    /** id of the last started event which completed successfully */
203
    long lastSuccessfulStartedEventId;
204

205
    final StartWorkflowExecutionRequest startRequest;
206

207
    long startedEventId = NO_EVENT_ID;
1✔
208

209
    PollWorkflowTaskQueueResponse.Builder workflowTask;
210

211
    /**
212
     * Events that are added during execution of a workflow task. They have to be buffered to be
213
     * added after the events generated by a workflow task. Without this the determinism will be
214
     * broken on replay.
215
     */
216
    final List<RequestContext> bufferedEvents = new ArrayList<>();
1✔
217

218
    /**
219
     * Update requests that are added during execution of a workflow task. They have to be buffered
220
     * to be added to the next workflow task.
221
     */
222
    final Map<String, UpdateWorkflowExecution> updateRequestBuffer = new LinkedHashMap<>();
1✔
223

224
    final Map<String, UpdateWorkflowExecution> updateRequest = new LinkedHashMap<>();
1✔
225

226
    long scheduledEventId = NO_EVENT_ID;
1✔
227

228
    int attempt = 0;
1✔
229

230
    /** Query requests received during workflow task processing (after start) */
231
    final Map<String, TestWorkflowMutableStateImpl.ConsistentQuery> queryBuffer = new HashMap<>();
1✔
232

233
    final Map<String, TestWorkflowMutableStateImpl.ConsistentQuery> consistentQueryRequests =
1✔
234
        new HashMap<>();
235

236
    WorkflowTaskData(TestWorkflowStore store, StartWorkflowExecutionRequest startRequest) {
1✔
237
      this.store = store;
1✔
238
      this.startRequest = startRequest;
1✔
239
    }
1✔
240

241
    void clear() {
242
      startedEventId = NO_EVENT_ID;
1✔
243
      workflowTask = null;
1✔
244
      scheduledEventId = NO_EVENT_ID;
1✔
245
      attempt = 0;
1✔
246
    }
1✔
247

248
    Optional<UpdateWorkflowExecution> getUpdateRequest(String protocolInstanceId) {
249
      return Optional.ofNullable(
1✔
250
          updateRequest.getOrDefault(
1✔
251
              protocolInstanceId, updateRequestBuffer.get(protocolInstanceId)));
1✔
252
    }
253

254
    @Override
255
    public String toString() {
256
      return "WorkflowTaskData{"
×
257
          + "store="
258
          + store
259
          + ", workflowCompleted="
260
          + workflowCompleted
261
          + ", lastSuccessfulStartedEventId="
262
          + lastSuccessfulStartedEventId
263
          + ", startRequest="
264
          + startRequest
265
          + ", startedEventId="
266
          + startedEventId
267
          + ", workflowTask="
268
          + workflowTask
269
          + ", bufferedEvents="
270
          + bufferedEvents
271
          + ", scheduledEventId="
272
          + scheduledEventId
273
          + ", attempt="
274
          + attempt
275
          + ", queryBuffer="
276
          + queryBuffer
277
          + ", consistentQueryRequests="
278
          + consistentQueryRequests
279
          + ", updateRequest="
280
          + updateRequest
281
          + ", updateRequestBuffer="
282
          + updateRequestBuffer
283
          + '}';
284
    }
285
  }
286

287
  static final class ActivityTaskData {
288

289
    StartWorkflowExecutionRequest startWorkflowExecutionRequest;
290
    ActivityTaskScheduledEventAttributes scheduledEvent;
291
    ActivityTask activityTask;
292

293
    final TestWorkflowStore store;
294

295
    long scheduledEventId = NO_EVENT_ID;
1✔
296
    long startedEventId = NO_EVENT_ID;
1✔
297
    public HistoryEvent startedEvent;
298
    Payloads heartbeatDetails;
299
    long lastHeartbeatTime;
300
    TestServiceRetryState retryState;
301
    Duration nextBackoffInterval;
302
    String identity;
303

304
    ActivityTaskData(
305
        TestWorkflowStore store, StartWorkflowExecutionRequest startWorkflowExecutionRequest) {
1✔
306
      this.store = store;
1✔
307
      this.startWorkflowExecutionRequest = startWorkflowExecutionRequest;
1✔
308
    }
1✔
309

310
    @Override
311
    public String toString() {
312
      return "ActivityTaskData{"
×
313
          + "startWorkflowExecutionRequest="
314
          + startWorkflowExecutionRequest
315
          + ", scheduledEvent="
316
          + scheduledEvent
317
          + ", activityTask="
318
          + activityTask
319
          + ", store="
320
          + store
321
          + ", scheduledEventId="
322
          + scheduledEventId
323
          + ", startedEventId="
324
          + startedEventId
325
          + ", startedEvent="
326
          + startedEvent
327
          + ", heartbeatDetails="
328
          + heartbeatDetails
329
          + ", lastHeartbeatTime="
330
          + lastHeartbeatTime
331
          + ", retryState="
332
          + retryState
333
          + ", nextBackoffInterval="
334
          + nextBackoffInterval
335
          + '}';
336
    }
337

338
    public int getAttempt() {
339
      return retryState != null ? retryState.getAttempt() : 1;
1✔
340
    }
341
  }
342

343
  static final class NexusOperationData {
344
    // Timeout for an individual Start or Cancel Operation request.
345
    final Duration requestTimeout = Durations.fromSeconds(10);
1✔
346

347
    String operationId = "";
1✔
348
    Endpoint endpoint;
349
    NexusOperationScheduledEventAttributes scheduledEvent;
350
    TestWorkflowStore.NexusTask nexusTask;
351
    RetryPolicy retryPolicy = defaultNexusRetryPolicy();
1✔
352

353
    long scheduledEventId = NO_EVENT_ID;
1✔
354
    Timestamp cancelRequestedTime;
355

356
    TestServiceRetryState retryState;
357
    boolean isBackingOff = false;
1✔
358
    Duration nextBackoffInterval;
359
    Timestamp lastAttemptCompleteTime;
360
    Timestamp nextAttemptScheduleTime;
361
    String identity;
362

363
    public NexusOperationData(Endpoint endpoint) {
1✔
364
      this.endpoint = endpoint;
1✔
365
    }
1✔
366

367
    public int getAttempt() {
368
      return retryState != null ? retryState.getAttempt() : 1;
1✔
369
    }
370

371
    @Override
372
    public String toString() {
373
      return "NexusOperationData{"
×
374
          + ", nexusEndpoint="
375
          + endpoint
376
          + ", scheduledEvent="
377
          + scheduledEvent
378
          + ", nexusTask="
379
          + nexusTask
380
          + ", scheduledEventId="
381
          + scheduledEventId
382
          + ", retryState="
383
          + retryState
384
          + ", nextBackoffInterval="
385
          + nextBackoffInterval
386
          + '}';
387
    }
388
  }
389

390
  static final class SignalExternalData {
1✔
391
    long initiatedEventId = NO_EVENT_ID;
1✔
392
    public SignalExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent;
393

394
    @Override
395
    public String toString() {
396
      return "SignalExternalData{"
×
397
          + "initiatedEventId="
398
          + initiatedEventId
399
          + ", initiatedEvent="
400
          + initiatedEvent
401
          + '}';
402
    }
403
  }
404

405
  static final class CancelExternalData {
1✔
406
    long initiatedEventId = NO_EVENT_ID;
1✔
407
    public RequestCancelExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent;
408

409
    @Override
410
    public String toString() {
411
      return "CancelExternalData{"
×
412
          + "initiatedEventId="
413
          + initiatedEventId
414
          + ", initiatedEvent="
415
          + initiatedEvent
416
          + '}';
417
    }
418
  }
419

420
  static final class ChildWorkflowData {
421

422
    final TestWorkflowService service;
423
    StartChildWorkflowExecutionInitiatedEventAttributes initiatedEvent;
424
    long initiatedEventId;
425
    long startedEventId;
426
    WorkflowExecution execution;
427

428
    public ChildWorkflowData(TestWorkflowService service) {
1✔
429
      this.service = service;
1✔
430
    }
1✔
431

432
    @Override
433
    public String toString() {
434
      return "ChildWorkflowData{"
×
435
          + "service="
436
          + service
437
          + ", initiatedEvent="
438
          + initiatedEvent
439
          + ", initiatedEventId="
440
          + initiatedEventId
441
          + ", startedEventId="
442
          + startedEventId
443
          + ", execution="
444
          + execution
445
          + '}';
446
    }
447
  }
448

449
  static final class TimerData {
1✔
450
    TimerStartedEventAttributes startedEvent;
451
    public long startedEventId;
452

453
    @Override
454
    public String toString() {
455
      return "TimerData{"
×
456
          + "startedEvent="
457
          + startedEvent
458
          + ", startedEventId="
459
          + startedEventId
460
          + '}';
461
    }
462
  }
463

464
  /** Represents an accepted update workflow execution request */
465
  static final class UpdateWorkflowExecutionData {
466
    final String id;
467
    final CompletableFuture<Boolean> accepted;
468
    final CompletableFuture<Outcome> outcome;
469
    final Request initialRequest;
470

471
    public UpdateWorkflowExecutionData(
472
        String id,
473
        Request initialRequest,
474
        CompletableFuture<Boolean> accepted,
475
        CompletableFuture<Outcome> outcome) {
1✔
476
      this.id = id;
1✔
477
      this.initialRequest = initialRequest;
1✔
478
      this.accepted = accepted;
1✔
479
      this.outcome = outcome;
1✔
480
    }
1✔
481

482
    @Override
483
    public String toString() {
484
      return "UpdateWorkflowExecutionData{" + "ID=" + id + '}';
×
485
    }
486
  }
487

488
  static StateMachine<WorkflowData> newWorkflowStateMachine(WorkflowData data) {
489
    return new StateMachine<>(data)
1✔
490
        .add(NONE, START, STARTED, StateMachines::startWorkflow)
1✔
491
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::completeWorkflow)
1✔
492
        .add(STARTED, CONTINUE_AS_NEW, CONTINUED_AS_NEW, StateMachines::continueAsNewWorkflow)
1✔
493
        .add(STARTED, FAIL, FAILED, StateMachines::failWorkflow)
1✔
494
        .add(STARTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutWorkflow)
1✔
495
        .add(
1✔
496
            STARTED,
497
            REQUEST_CANCELLATION,
498
            CANCELLATION_REQUESTED,
499
            StateMachines::requestWorkflowCancellation)
500
        .add(STARTED, TERMINATE, TERMINATED, StateMachines::terminateWorkflow)
1✔
501
        .add(
1✔
502
            CANCELLATION_REQUESTED,
503
            REQUEST_CANCELLATION,
504
            CANCELLATION_REQUESTED,
505
            StateMachines::noop)
506
        .add(CANCELLATION_REQUESTED, COMPLETE, COMPLETED, StateMachines::completeWorkflow)
1✔
507
        .add(CANCELLATION_REQUESTED, CANCEL, CANCELED, StateMachines::cancelWorkflow)
1✔
508
        .add(CANCELLATION_REQUESTED, TERMINATE, TERMINATED, StateMachines::terminateWorkflow)
1✔
509
        .add(CANCELLATION_REQUESTED, FAIL, FAILED, StateMachines::failWorkflow)
1✔
510
        .add(CANCELLATION_REQUESTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutWorkflow);
1✔
511
  }
512

513
  static StateMachine<WorkflowTaskData> newWorkflowTaskStateMachine(
514
      TestWorkflowStore store, StartWorkflowExecutionRequest startRequest) {
515
    return new StateMachine<>(new WorkflowTaskData(store, startRequest))
1✔
516
        .add(NONE, INITIATE, INITIATED, StateMachines::scheduleWorkflowTask)
1✔
517
        // TODO(maxim): Uncomment once the server supports consistent query only workflow tasks
518
        //        .add(NONE, QUERY, INITIATED_QUERY_ONLY, StateMachines::scheduleQueryWorkflowTask)
519
        //        .add(INITIATED_QUERY_ONLY, QUERY, INITIATED_QUERY_ONLY,
520
        // StateMachines::queryWhileScheduled)
521
        //        .add(
522
        //            INITIATED_QUERY_ONLY,
523
        //            INITIATE,
524
        //            INITIATED,
525
        //            StateMachines::convertQueryWorkflowTaskToReal)
526
        //        .add(
527
        //            INITIATED_QUERY_ONLY,
528
        //            START,
529
        //            STARTED_QUERY_ONLY,
530
        //            StateMachines::startQueryOnlyWorkflowTask)
531
        //        .add(STARTED_QUERY_ONLY, INITIATE, STARTED_QUERY_ONLY,
532
        // StateMachines::needsWorkflowTask)
533
        //        .add(STARTED_QUERY_ONLY, QUERY, STARTED_QUERY_ONLY,
534
        // StateMachines::needsWorkflowTaskDueToQuery)
535
        //        .add(STARTED_QUERY_ONLY, FAIL, NONE, StateMachines::failQueryWorkflowTask)
536
        //        .add(STARTED_QUERY_ONLY, TIME_OUT, NONE, StateMachines::failQueryWorkflowTask)
537
        //        .add(STARTED_QUERY_ONLY, COMPLETE, NONE, StateMachines::completeQuery)
538
        .add(STARTED, QUERY, STARTED, StateMachines::bufferQuery)
1✔
539
        .add(STARTED, UPDATE_WORKFLOW_EXECUTION, STARTED, StateMachines::bufferUpdate)
1✔
540
        .add(INITIATED, INITIATE, INITIATED, StateMachines::noop)
1✔
541
        .add(INITIATED, QUERY, INITIATED, StateMachines::queryWhileScheduled)
1✔
542
        .add(INITIATED, UPDATE_WORKFLOW_EXECUTION, INITIATED, StateMachines::addUpdate)
1✔
543
        .add(INITIATED, START, STARTED, StateMachines::startWorkflowTask)
1✔
544
        .add(STARTED, COMPLETE, NONE, StateMachines::completeWorkflowTask)
1✔
545
        .add(STARTED, FAIL, NONE, StateMachines::failWorkflowTask)
1✔
546
        .add(STARTED, TIME_OUT, NONE, StateMachines::timeoutWorkflowTask)
1✔
547
        .add(STARTED, INITIATE, STARTED, StateMachines::needsWorkflowTask);
1✔
548
  }
549

550
  public static StateMachine<ActivityTaskData> newActivityStateMachine(
551
      TestWorkflowStore store, StartWorkflowExecutionRequest workflowStartedEvent) {
552
    return new StateMachine<>(new ActivityTaskData(store, workflowStartedEvent))
1✔
553
        .add(NONE, INITIATE, INITIATED, StateMachines::scheduleActivityTask)
1✔
554
        .add(INITIATED, START, STARTED, StateMachines::startActivityTask)
1✔
555
        .add(INITIATED, TIME_OUT, TIMED_OUT, StateMachines::timeoutActivityTask)
1✔
556
        .add(
1✔
557
            INITIATED,
558
            REQUEST_CANCELLATION,
559
            CANCELLATION_REQUESTED,
560
            StateMachines::requestActivityCancellation)
561
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::completeActivityTask)
1✔
562
        // Transitions to initiated in case of a retry
563
        .add(STARTED, FAIL, new State[] {FAILED, INITIATED}, StateMachines::failActivityTask)
1✔
564
        // Transitions to initiated in case of a retry
565
        .add(
1✔
566
            STARTED,
567
            TIME_OUT,
568
            new State[] {TIMED_OUT, INITIATED},
569
            StateMachines::timeoutActivityTask)
570
        .add(STARTED, UPDATE, STARTED, StateMachines::heartbeatActivityTask)
1✔
571
        .add(
1✔
572
            STARTED,
573
            REQUEST_CANCELLATION,
574
            CANCELLATION_REQUESTED,
575
            StateMachines::requestActivityCancellation)
576
        .add(
1✔
577
            CANCELLATION_REQUESTED, CANCEL, CANCELED, StateMachines::reportActivityTaskCancellation)
578
        .add(CANCELLATION_REQUESTED, COMPLETE, COMPLETED, StateMachines::completeActivityTask)
1✔
579
        .add(
1✔
580
            CANCELLATION_REQUESTED,
581
            UPDATE,
582
            CANCELLATION_REQUESTED,
583
            StateMachines::heartbeatActivityTask)
584
        .add(CANCELLATION_REQUESTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutActivityTask)
1✔
585
        .add(CANCELLATION_REQUESTED, FAIL, FAILED, StateMachines::failActivityTask);
1✔
586
  }
587

588
  public static StateMachine<ChildWorkflowData> newChildWorkflowStateMachine(
589
      TestWorkflowService service) {
590
    return new StateMachine<>(new ChildWorkflowData(service))
1✔
591
        .add(NONE, INITIATE, INITIATED, StateMachines::initiateChildWorkflow)
1✔
592
        .add(INITIATED, START, STARTED, StateMachines::childWorkflowStarted)
1✔
593
        .add(INITIATED, FAIL, FAILED, StateMachines::startChildWorkflowFailed)
1✔
594
        .add(INITIATED, TIME_OUT, TIMED_OUT, StateMachines::timeoutChildWorkflow)
1✔
595
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::childWorkflowCompleted)
1✔
596
        .add(STARTED, FAIL, FAILED, StateMachines::childWorkflowFailed)
1✔
597
        .add(STARTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutChildWorkflow)
1✔
598
        .add(STARTED, CANCEL, CANCELED, StateMachines::childWorkflowCanceled);
1✔
599
  }
600

601
  public static StateMachine<UpdateWorkflowExecutionData> newUpdateWorkflowExecution(
602
      String updateId,
603
      Request initialRequest,
604
      CompletableFuture<Boolean> accepted,
605
      CompletableFuture<Outcome> outcome) {
606
    return new StateMachine<>(
1✔
607
            new UpdateWorkflowExecutionData(updateId, initialRequest, accepted, outcome))
608
        .add(NONE, START, STARTED, StateMachines::acceptUpdate)
1✔
609
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::completeUpdate);
1✔
610
  }
611

612
  public static StateMachine<NexusOperationData> newNexusOperation(Endpoint endpoint) {
613
    return new StateMachine<>(new NexusOperationData(endpoint))
1✔
614
        .add(NONE, INITIATE, INITIATED, StateMachines::scheduleNexusOperation)
1✔
615
        .add(INITIATED, START, STARTED, StateMachines::startNexusOperation)
1✔
616
        .add(INITIATED, TIME_OUT, TIMED_OUT, StateMachines::timeoutNexusOperation)
1✔
617
        // TODO: properly support cancel before start
618
        // .add(
619
        //     INITIATED,
620
        //     REQUEST_CANCELLATION,
621
        //     INITIATED,
622
        //     StateMachines::requestCancelNexusOperation)
623
        .add(INITIATED, CANCEL, CANCELED, StateMachines::reportNexusOperationCancellation)
1✔
624
        // Transitions directly from INITIATED to COMPLETE for sync completions
625
        .add(INITIATED, COMPLETE, COMPLETED, StateMachines::completeNexusOperation)
1✔
626
        // Transitions to INITIATED in case of a retry
627
        .add(INITIATED, FAIL, new State[] {FAILED, INITIATED}, StateMachines::failNexusOperation)
1✔
628
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::completeNexusOperation)
1✔
629
        // Transitions back to STARTED in case of a retry
630
        .add(STARTED, FAIL, new State[] {FAILED, STARTED}, StateMachines::failNexusOperation)
1✔
631
        .add(STARTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutNexusOperation)
1✔
632
        .add(STARTED, REQUEST_CANCELLATION, STARTED, StateMachines::requestCancelNexusOperation)
1✔
633
        .add(STARTED, CANCEL, CANCELED, StateMachines::reportNexusOperationCancellation);
1✔
634
  }
635

636
  public static StateMachine<TimerData> newTimerStateMachine() {
637
    return new StateMachine<>(new TimerData())
1✔
638
        .add(NONE, START, STARTED, StateMachines::startTimer)
1✔
639
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::fireTimer)
1✔
640
        .add(STARTED, CANCEL, CANCELED, StateMachines::cancelTimer);
1✔
641
  }
642

643
  public static StateMachine<SignalExternalData> newSignalExternalStateMachine() {
644
    return new StateMachine<>(new SignalExternalData())
1✔
645
        .add(NONE, INITIATE, INITIATED, StateMachines::initiateExternalSignal)
1✔
646
        .add(INITIATED, FAIL, FAILED, StateMachines::failExternalSignal)
1✔
647
        .add(INITIATED, COMPLETE, COMPLETED, StateMachines::completeExternalSignal);
1✔
648
  }
649

650
  public static StateMachine<CancelExternalData> newCancelExternalStateMachine() {
651
    return new StateMachine<>(new CancelExternalData())
1✔
652
        .add(NONE, INITIATE, INITIATED, StateMachines::initiateExternalCancellation)
1✔
653
        .add(INITIATED, FAIL, FAILED, StateMachines::failExternalCancellation)
1✔
654
        .add(INITIATED, START, STARTED, StateMachines::reportExternalCancellationRequested);
1✔
655
  }
656

657
  private static <T, A> void noop(RequestContext ctx, T data, A a, long notUsed) {}
1✔
658

659
  private static void scheduleNexusOperation(
660
      RequestContext ctx,
661
      NexusOperationData data,
662
      ScheduleNexusOperationCommandAttributes attr,
663
      long workflowTaskCompletedId) {
664
    Duration expirationInterval = attr.getScheduleToCloseTimeout();
1✔
665
    Timestamp expirationTime =
666
        (attr.hasScheduleToCloseTimeout()
1✔
667
                && Durations.toMillis(attr.getScheduleToCloseTimeout()) > 0)
1✔
668
            ? Timestamps.add(ctx.currentTime(), expirationInterval)
1✔
669
            : Timestamp.getDefaultInstance();
1✔
670
    TestServiceRetryState retryState = new TestServiceRetryState(data.retryPolicy, expirationTime);
1✔
671

672
    NexusOperationScheduledEventAttributes.Builder a =
673
        NexusOperationScheduledEventAttributes.newBuilder()
1✔
674
            .setEndpoint(attr.getEndpoint())
1✔
675
            .setEndpointId(data.endpoint.getId())
1✔
676
            .setService(attr.getService())
1✔
677
            .setOperation(attr.getOperation())
1✔
678
            .setInput(attr.getInput())
1✔
679
            .setScheduleToCloseTimeout(attr.getScheduleToCloseTimeout())
1✔
680
            .putAllNexusHeader(attr.getNexusHeaderMap())
1✔
681
            .setRequestId(UUID.randomUUID().toString())
1✔
682
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedId);
1✔
683

684
    data.scheduledEvent = a.build();
1✔
685
    HistoryEvent event =
686
        HistoryEvent.newBuilder()
1✔
687
            .setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED)
1✔
688
            .setNexusOperationScheduledEventAttributes(a)
1✔
689
            .build();
1✔
690

691
    long scheduledEventId = ctx.addEvent(event);
1✔
692
    NexusOperationRef ref = new NexusOperationRef(ctx.getExecutionId(), scheduledEventId);
1✔
693
    NexusTaskToken taskToken = new NexusTaskToken(ref, data.getAttempt(), false);
1✔
694

695
    Link link =
696
        workflowEventToNexusLink(
1✔
697
            io.temporal.api.common.v1.Link.WorkflowEvent.newBuilder()
1✔
698
                .setNamespace(ctx.getNamespace())
1✔
699
                .setWorkflowId(ctx.getExecution().getWorkflowId())
1✔
700
                .setRunId(ctx.getExecution().getRunId())
1✔
701
                .setEventRef(
1✔
702
                    io.temporal.api.common.v1.Link.WorkflowEvent.EventReference.newBuilder()
1✔
703
                        .setEventId(scheduledEventId)
1✔
704
                        .setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED))
1✔
705
                .build());
1✔
706

707
    PollNexusTaskQueueResponse.Builder pollResponse =
708
        PollNexusTaskQueueResponse.newBuilder()
1✔
709
            .setTaskToken(taskToken.toBytes())
1✔
710
            .setRequest(
1✔
711
                io.temporal.api.nexus.v1.Request.newBuilder()
1✔
712
                    .setScheduledTime(ctx.currentTime())
1✔
713
                    .putAllHeader(attr.getNexusHeaderMap())
1✔
714
                    .setStartOperation(
1✔
715
                        StartOperationRequest.newBuilder()
1✔
716
                            .setService(attr.getService())
1✔
717
                            .setOperation(attr.getOperation())
1✔
718
                            .setPayload(attr.getInput())
1✔
719
                            .addLinks(link)
1✔
720
                            .setCallback("http://test-env/operations")
1✔
721
                            .setRequestId(UUID.randomUUID().toString())
1✔
722
                            // The test server uses this to lookup the operation
723
                            .putCallbackHeader(
1✔
724
                                "operation-reference", ref.toBytes().toStringUtf8())));
1✔
725

726
    TaskQueueId taskQueueId =
1✔
727
        new TaskQueueId(
728
            ctx.getNamespace(), data.endpoint.getSpec().getTarget().getWorker().getTaskQueue());
1✔
729
    Timestamp taskDeadline = Timestamps.add(ctx.currentTime(), data.requestTimeout);
1✔
730
    TestWorkflowStore.NexusTask task =
1✔
731
        new TestWorkflowStore.NexusTask(taskQueueId, pollResponse, taskDeadline);
732

733
    // Test server only supports worker targets, so just push directly to Nexus task queue without
734
    // invoking Nexus client.
735
    ctx.addNexusTask(task);
1✔
736
    ctx.onCommit(
1✔
737
        historySize -> {
738
          data.scheduledEventId = scheduledEventId;
1✔
739
          data.nexusTask = task;
1✔
740
          data.retryState = retryState;
1✔
741
        });
1✔
742
  }
1✔
743

744
  private static void startNexusOperation(
745
      RequestContext ctx,
746
      NexusOperationData data,
747
      StartOperationResponse.Async resp,
748
      long notUsed) {
749
    HistoryEvent.Builder event =
750
        HistoryEvent.newBuilder()
1✔
751
            .setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_STARTED)
1✔
752
            .setNexusOperationStartedEventAttributes(
1✔
753
                NexusOperationStartedEventAttributes.newBuilder()
1✔
754
                    .setOperationId(resp.getOperationId())
1✔
755
                    .setScheduledEventId(data.scheduledEventId)
1✔
756
                    .setRequestId(data.scheduledEvent.getRequestId()));
1✔
757

758
    for (Link l : resp.getLinksList()) {
1✔
759
      if (!l.getType()
1✔
760
          .equals(io.temporal.api.common.v1.Link.WorkflowEvent.getDescriptor().getFullName())) {
1!
761
        continue;
×
762
      }
763
      event.addLinks(nexusLinkToWorkflowEvent(l));
1✔
764
    }
1✔
765

766
    ctx.addEvent(event.build());
1✔
767
    ctx.onCommit(historySize -> data.operationId = resp.getOperationId());
1✔
768
  }
1✔
769

770
  private static void completeNexusOperation(
771
      RequestContext ctx, NexusOperationData data, Payload result, long notUsed) {
772
    ctx.addEvent(
1✔
773
        HistoryEvent.newBuilder()
1✔
774
            .setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_COMPLETED)
1✔
775
            .setNexusOperationCompletedEventAttributes(
1✔
776
                NexusOperationCompletedEventAttributes.newBuilder()
1✔
777
                    .setRequestId(data.scheduledEvent.getRequestId())
1✔
778
                    .setScheduledEventId(data.scheduledEventId)
1✔
779
                    .setResult(result))
1✔
780
            .build());
1✔
781
  }
1✔
782

783
  private static void timeoutNexusOperation(
784
      RequestContext ctx, NexusOperationData data, TimeoutType timeoutType, long notUsed) {
785
    if (timeoutType != TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE) {
1!
786
      throw new IllegalArgumentException(
×
787
          "Timeout type not supported for Nexus operations: " + timeoutType);
788
    }
789

790
    Failure failure =
791
        Failure.newBuilder()
1✔
792
            .setMessage("nexus operation completed unsuccessfully")
1✔
793
            .setNexusOperationExecutionFailureInfo(
1✔
794
                NexusOperationFailureInfo.newBuilder()
1✔
795
                    .setEndpoint(data.scheduledEvent.getEndpoint())
1✔
796
                    .setService(data.scheduledEvent.getService())
1✔
797
                    .setOperation(data.scheduledEvent.getOperation())
1✔
798
                    .setOperationId(data.operationId)
1✔
799
                    .setScheduledEventId(data.scheduledEventId))
1✔
800
            .setCause(
1✔
801
                Failure.newBuilder()
1✔
802
                    .setMessage("operation timed out")
1✔
803
                    .setTimeoutFailureInfo(
1✔
804
                        TimeoutFailureInfo.newBuilder().setTimeoutType(timeoutType)))
1✔
805
            .build();
1✔
806

807
    ctx.addEvent(
1✔
808
        HistoryEvent.newBuilder()
1✔
809
            .setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_TIMED_OUT)
1✔
810
            .setNexusOperationTimedOutEventAttributes(
1✔
811
                NexusOperationTimedOutEventAttributes.newBuilder()
1✔
812
                    .setRequestId(data.scheduledEvent.getRequestId())
1✔
813
                    .setScheduledEventId(data.scheduledEventId)
1✔
814
                    .setFailure(failure))
1✔
815
            .build());
1✔
816
  }
1✔
817

818
  private static State failNexusOperation(
819
      RequestContext ctx, NexusOperationData data, Failure failure, long notUsed) {
820
    RetryState retryState = attemptNexusOperationRetry(ctx, Optional.of(failure), data);
1✔
821
    if (retryState == RetryState.RETRY_STATE_IN_PROGRESS
1!
822
        || retryState == RetryState.RETRY_STATE_TIMEOUT) {
823
      // RETRY_STATE_TIMEOUT indicates that the next attempt schedule time would exceed the
824
      // operation's schedule-to-close timeout, so do not fail the operation here and allow
825
      // it to be timed out by the timer set in
826
      // io.temporal.internal.testservice.TestWorkflowMutableStateImpl.timeoutNexusOperation
827
      return (Strings.isNullOrEmpty(data.operationId)) ? INITIATED : STARTED;
1✔
828
    }
829

830
    Failure wrapped =
831
        Failure.newBuilder()
1✔
832
            .setMessage("nexus operation completed unsuccessfully")
1✔
833
            .setNexusOperationExecutionFailureInfo(
1✔
834
                NexusOperationFailureInfo.newBuilder()
1✔
835
                    .setEndpoint(data.scheduledEvent.getEndpoint())
1✔
836
                    .setService(data.scheduledEvent.getService())
1✔
837
                    .setOperation(data.scheduledEvent.getOperation())
1✔
838
                    .setOperationId(data.operationId)
1✔
839
                    .setScheduledEventId(data.scheduledEventId))
1✔
840
            .setCause(failure)
1✔
841
            .build();
1✔
842

843
    ctx.addEvent(
1✔
844
        HistoryEvent.newBuilder()
1✔
845
            .setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_FAILED)
1✔
846
            .setNexusOperationFailedEventAttributes(
1✔
847
                NexusOperationFailedEventAttributes.newBuilder()
1✔
848
                    .setRequestId(data.scheduledEvent.getRequestId())
1✔
849
                    .setScheduledEventId(data.scheduledEventId)
1✔
850
                    .setFailure(wrapped))
1✔
851
            .build());
1✔
852
    return FAILED;
1✔
853
  }
854

855
  private static RetryState attemptNexusOperationRetry(
856
      RequestContext ctx, Optional<Failure> failure, NexusOperationData data) {
857
    Optional<ApplicationFailureInfo> info = failure.map(Failure::getApplicationFailureInfo);
1✔
858
    Optional<java.time.Duration> nextRetryDelay = Optional.empty();
1✔
859
    if (info.isPresent()) {
1!
860
      if (info.get().getNonRetryable()) {
1✔
861
        return RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE;
1✔
862
      }
863
      if (info.get().hasNextRetryDelay()) {
1!
864
        nextRetryDelay =
×
865
            Optional.of(ProtobufTimeUtils.toJavaDuration(info.get().getNextRetryDelay()));
×
866
      }
867
    }
868

869
    TestServiceRetryState nextAttempt = data.retryState.getNextAttempt(failure);
1✔
870
    TestServiceRetryState.BackoffInterval backoffInterval =
1✔
871
        data.retryState.getBackoffIntervalInSeconds(
1✔
872
            info.map(ApplicationFailureInfo::getType), ctx.currentTime(), nextRetryDelay);
1✔
873
    if (backoffInterval.getRetryState() == RetryState.RETRY_STATE_IN_PROGRESS) {
1✔
874
      data.nextBackoffInterval = ProtobufTimeUtils.toProtoDuration(backoffInterval.getInterval());
1✔
875
      PollNexusTaskQueueResponse.Builder task = data.nexusTask.getTask();
1✔
876
      ctx.onCommit(
1✔
877
          (historySize) -> {
878
            data.retryState = nextAttempt;
1✔
879
            data.isBackingOff = true;
1✔
880
            data.lastAttemptCompleteTime = ctx.currentTime();
1✔
881
            data.nextAttemptScheduleTime =
1✔
882
                Timestamps.add(ProtobufTimeUtils.getCurrentProtoTime(), data.nextBackoffInterval);
1✔
883
            task.setTaskToken(
1✔
884
                new NexusTaskToken(
885
                        ctx.getExecutionId(),
1✔
886
                        data.scheduledEventId,
887
                        nextAttempt.getAttempt(),
1✔
888
                        task.getRequest().hasCancelOperation())
1✔
889
                    .toBytes());
1✔
890
          });
1✔
891
    } else {
1✔
892
      data.nextBackoffInterval = Durations.ZERO;
1✔
893
    }
894
    return backoffInterval.getRetryState();
1✔
895
  }
896

897
  private static void requestCancelNexusOperation(
898
      RequestContext ctx,
899
      NexusOperationData data,
900
      RequestCancelNexusOperationCommandAttributes attr,
901
      long workflowTaskCompletedId) {
902
    ctx.addEvent(
1✔
903
        HistoryEvent.newBuilder()
1✔
904
            .setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED)
1✔
905
            .setNexusOperationCancelRequestedEventAttributes(
1✔
906
                NexusOperationCancelRequestedEventAttributes.newBuilder()
1✔
907
                    .setScheduledEventId(data.scheduledEventId)
1✔
908
                    .setWorkflowTaskCompletedEventId(workflowTaskCompletedId))
1✔
909
            .build());
1✔
910

911
    NexusTaskToken taskToken =
1✔
912
        new NexusTaskToken(ctx.getExecutionId(), data.scheduledEventId, data.getAttempt(), true);
1✔
913

914
    PollNexusTaskQueueResponse.Builder pollResponse =
915
        PollNexusTaskQueueResponse.newBuilder()
1✔
916
            .setTaskToken(taskToken.toBytes())
1✔
917
            .setRequest(
1✔
918
                io.temporal.api.nexus.v1.Request.newBuilder()
1✔
919
                    .setCancelOperation(
1✔
920
                        CancelOperationRequest.newBuilder()
1✔
921
                            .setOperationId(data.operationId)
1✔
922
                            .setOperation(data.scheduledEvent.getOperation())
1✔
923
                            .setService(data.scheduledEvent.getService())));
1✔
924

925
    TaskQueueId taskQueueId =
1✔
926
        new TaskQueueId(
927
            ctx.getNamespace(), data.endpoint.getSpec().getTarget().getWorker().getTaskQueue());
1✔
928
    Timestamp taskDeadline = Timestamps.add(ctx.currentTime(), data.requestTimeout);
1✔
929
    TestWorkflowStore.NexusTask cancelTask =
1✔
930
        new TestWorkflowStore.NexusTask(taskQueueId, pollResponse, taskDeadline);
931

932
    // Test server only supports worker targets, so just push directly to Nexus task queue without
933
    // invoking Nexus client.
934
    ctx.addNexusTask(cancelTask);
1✔
935
    ctx.onCommit(
1✔
936
        historySize -> {
937
          data.nexusTask = cancelTask;
1✔
938
          data.cancelRequestedTime = ctx.currentTime();
1✔
939
          data.isBackingOff = false;
1✔
940
        });
1✔
941
  }
1✔
942

943
  private static void reportNexusOperationCancellation(
944
      RequestContext ctx, NexusOperationData data, Failure failure, long notUsed) {
945
    Failure.Builder wrapped =
946
        Failure.newBuilder()
1✔
947
            .setMessage("nexus operation completed unsuccessfully")
1✔
948
            .setNexusOperationExecutionFailureInfo(
1✔
949
                NexusOperationFailureInfo.newBuilder()
1✔
950
                    .setEndpoint(data.scheduledEvent.getEndpoint())
1✔
951
                    .setService(data.scheduledEvent.getService())
1✔
952
                    .setOperation(data.scheduledEvent.getOperation())
1✔
953
                    .setOperationId(data.operationId)
1✔
954
                    .setScheduledEventId(data.scheduledEventId));
1✔
955
    if (failure != null) {
1!
956
      wrapped.setCause(failure);
1✔
957
    }
958
    ctx.addEvent(
1✔
959
        HistoryEvent.newBuilder()
1✔
960
            .setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_CANCELED)
1✔
961
            .setNexusOperationCanceledEventAttributes(
1✔
962
                NexusOperationCanceledEventAttributes.newBuilder()
1✔
963
                    .setScheduledEventId(data.scheduledEventId)
1✔
964
                    .setRequestId(data.scheduledEvent.getRequestId())
1✔
965
                    .setFailure(wrapped))
1✔
966
            .build());
1✔
967
  }
1✔
968

969
  private static void timeoutChildWorkflow(
970
      RequestContext ctx, ChildWorkflowData data, RetryState retryState, long notUsed) {
971
    StartChildWorkflowExecutionInitiatedEventAttributes ie = data.initiatedEvent;
1✔
972
    ChildWorkflowExecutionTimedOutEventAttributes a =
973
        ChildWorkflowExecutionTimedOutEventAttributes.newBuilder()
1✔
974
            .setNamespace(ie.getNamespace())
1✔
975
            .setStartedEventId(data.startedEventId)
1✔
976
            .setWorkflowExecution(data.execution)
1✔
977
            .setWorkflowType(ie.getWorkflowType())
1✔
978
            .setRetryState(retryState)
1✔
979
            .setInitiatedEventId(data.initiatedEventId)
1✔
980
            .build();
1✔
981
    HistoryEvent event =
982
        HistoryEvent.newBuilder()
1✔
983
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TIMED_OUT)
1✔
984
            .setChildWorkflowExecutionTimedOutEventAttributes(a)
1✔
985
            .build();
1✔
986
    ctx.addEvent(event);
1✔
987
  }
1✔
988

989
  private static void startChildWorkflowFailed(
990
      RequestContext ctx,
991
      ChildWorkflowData data,
992
      StartChildWorkflowExecutionFailedEventAttributes a,
993
      long notUsed) {
994
    StartChildWorkflowExecutionFailedEventAttributes.Builder updatedAttr =
1✔
995
        a.toBuilder()
1✔
996
            .setInitiatedEventId(data.initiatedEventId)
1✔
997
            .setWorkflowType(data.initiatedEvent.getWorkflowType())
1✔
998
            .setWorkflowId(data.initiatedEvent.getWorkflowId());
1✔
999
    if (!data.initiatedEvent.getNamespace().isEmpty()) {
1!
1000
      updatedAttr.setNamespace(data.initiatedEvent.getNamespace());
1✔
1001
    }
1002
    HistoryEvent event =
1003
        HistoryEvent.newBuilder()
1✔
1004
            .setEventType(EventType.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_FAILED)
1✔
1005
            .setStartChildWorkflowExecutionFailedEventAttributes(updatedAttr.build())
1✔
1006
            .build();
1✔
1007
    ctx.addEvent(event);
1✔
1008
  }
1✔
1009

1010
  private static void childWorkflowStarted(
1011
      RequestContext ctx,
1012
      ChildWorkflowData data,
1013
      ChildWorkflowExecutionStartedEventAttributes a,
1014
      long notUsed) {
1015
    ChildWorkflowExecutionStartedEventAttributes updatedAttr =
1✔
1016
        a.toBuilder().setInitiatedEventId(data.initiatedEventId).build();
1✔
1017
    HistoryEvent event =
1018
        HistoryEvent.newBuilder()
1✔
1019
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_STARTED)
1✔
1020
            .setChildWorkflowExecutionStartedEventAttributes(updatedAttr)
1✔
1021
            .build();
1✔
1022
    long startedEventId = ctx.addEvent(event);
1✔
1023
    ctx.onCommit(
1✔
1024
        (historySize) -> {
1025
          data.startedEventId = startedEventId;
1✔
1026
          data.execution = updatedAttr.getWorkflowExecution();
1✔
1027
        });
1✔
1028
  }
1✔
1029

1030
  private static void childWorkflowCompleted(
1031
      RequestContext ctx,
1032
      ChildWorkflowData data,
1033
      ChildWorkflowExecutionCompletedEventAttributes a,
1034
      long notUsed) {
1035
    ChildWorkflowExecutionCompletedEventAttributes updatedAttr =
1✔
1036
        a.toBuilder()
1✔
1037
            .setInitiatedEventId(data.initiatedEventId)
1✔
1038
            .setStartedEventId(data.startedEventId)
1✔
1039
            .build();
1✔
1040
    HistoryEvent event =
1041
        HistoryEvent.newBuilder()
1✔
1042
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_COMPLETED)
1✔
1043
            .setChildWorkflowExecutionCompletedEventAttributes(updatedAttr)
1✔
1044
            .build();
1✔
1045
    ctx.addEvent(event);
1✔
1046
  }
1✔
1047

1048
  private static void childWorkflowFailed(
1049
      RequestContext ctx,
1050
      ChildWorkflowData data,
1051
      ChildWorkflowExecutionFailedEventAttributes a,
1052
      long notUsed) {
1053
    ChildWorkflowExecutionFailedEventAttributes.Builder updatedAttr =
1✔
1054
        a.toBuilder()
1✔
1055
            .setInitiatedEventId(data.initiatedEventId)
1✔
1056
            .setStartedEventId(data.startedEventId)
1✔
1057
            .setWorkflowExecution(data.execution)
1✔
1058
            .setWorkflowType(data.initiatedEvent.getWorkflowType());
1✔
1059
    if (!data.initiatedEvent.getNamespace().isEmpty()) {
1!
1060
      updatedAttr.setNamespace(data.initiatedEvent.getNamespace());
1✔
1061
    }
1062
    HistoryEvent event =
1063
        HistoryEvent.newBuilder()
1✔
1064
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_FAILED)
1✔
1065
            .setChildWorkflowExecutionFailedEventAttributes(updatedAttr.build())
1✔
1066
            .build();
1✔
1067
    ctx.addEvent(event);
1✔
1068
  }
1✔
1069

1070
  private static void childWorkflowCanceled(
1071
      RequestContext ctx,
1072
      ChildWorkflowData data,
1073
      ChildWorkflowExecutionCanceledEventAttributes a,
1074
      long notUsed) {
1075
    ChildWorkflowExecutionCanceledEventAttributes updatedAttr =
1✔
1076
        a.toBuilder()
1✔
1077
            .setInitiatedEventId(data.initiatedEventId)
1✔
1078
            .setStartedEventId(data.startedEventId)
1✔
1079
            .build();
1✔
1080
    HistoryEvent event =
1081
        HistoryEvent.newBuilder()
1✔
1082
            .setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_CANCELED)
1✔
1083
            .setChildWorkflowExecutionCanceledEventAttributes(updatedAttr)
1✔
1084
            .build();
1✔
1085
    ctx.addEvent(event);
1✔
1086
  }
1✔
1087

1088
  private static void initiateChildWorkflow(
1089
      RequestContext ctx,
1090
      ChildWorkflowData data,
1091
      StartChildWorkflowExecutionCommandAttributes d,
1092
      long workflowTaskCompletedEventId) {
1093
    StartChildWorkflowExecutionInitiatedEventAttributes.Builder a =
1094
        StartChildWorkflowExecutionInitiatedEventAttributes.newBuilder()
1✔
1095
            .setControl(d.getControl())
1✔
1096
            .setInput(d.getInput())
1✔
1097
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1✔
1098
            .setNamespace(d.getNamespace().isEmpty() ? ctx.getNamespace() : d.getNamespace())
1!
1099
            .setWorkflowExecutionTimeout(d.getWorkflowExecutionTimeout())
1✔
1100
            .setWorkflowRunTimeout(d.getWorkflowRunTimeout())
1✔
1101
            .setWorkflowTaskTimeout(d.getWorkflowTaskTimeout())
1✔
1102
            .setTaskQueue(d.getTaskQueue())
1✔
1103
            .setWorkflowId(d.getWorkflowId())
1✔
1104
            .setWorkflowIdReusePolicy(d.getWorkflowIdReusePolicy())
1✔
1105
            .setWorkflowType(d.getWorkflowType())
1✔
1106
            .setCronSchedule(d.getCronSchedule())
1✔
1107
            .setParentClosePolicy(d.getParentClosePolicy());
1✔
1108
    if (d.hasHeader()) {
1!
1109
      a.setHeader(d.getHeader());
1✔
1110
    }
1111
    if (d.hasMemo()) {
1✔
1112
      a.setMemo(d.getMemo());
1✔
1113
    }
1114
    if (d.hasRetryPolicy()) {
1✔
1115
      a.setRetryPolicy(d.getRetryPolicy());
1✔
1116
    }
1117
    HistoryEvent event =
1118
        HistoryEvent.newBuilder()
1✔
1119
            .setEventType(EventType.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED)
1✔
1120
            .setStartChildWorkflowExecutionInitiatedEventAttributes(a)
1✔
1121
            .build();
1✔
1122
    long initiatedEventId = ctx.addEvent(event);
1✔
1123
    ctx.onCommit(
1✔
1124
        (historySize) -> {
1125
          data.initiatedEventId = initiatedEventId;
1✔
1126
          data.initiatedEvent = a.build();
1✔
1127
          StartWorkflowExecutionRequest.Builder startChild =
1128
              StartWorkflowExecutionRequest.newBuilder()
1✔
1129
                  .setRequestId(UUID.randomUUID().toString())
1✔
1130
                  .setNamespace(d.getNamespace().isEmpty() ? ctx.getNamespace() : d.getNamespace())
1!
1131
                  .setWorkflowExecutionTimeout(d.getWorkflowExecutionTimeout())
1✔
1132
                  .setWorkflowRunTimeout(d.getWorkflowRunTimeout())
1✔
1133
                  .setWorkflowTaskTimeout(d.getWorkflowTaskTimeout())
1✔
1134
                  .setTaskQueue(d.getTaskQueue())
1✔
1135
                  .setWorkflowId(d.getWorkflowId())
1✔
1136
                  .setWorkflowIdReusePolicy(d.getWorkflowIdReusePolicy())
1✔
1137
                  .setWorkflowType(d.getWorkflowType())
1✔
1138
                  .setCronSchedule(d.getCronSchedule());
1✔
1139
          if (d.hasHeader()) {
1!
1140
            startChild.setHeader(d.getHeader());
1✔
1141
          }
1142
          if (d.hasSearchAttributes()) {
1✔
1143
            startChild.setSearchAttributes(d.getSearchAttributes());
1✔
1144
          }
1145
          if (d.hasMemo()) {
1✔
1146
            startChild.setMemo(d.getMemo());
1✔
1147
          }
1148
          if (d.hasRetryPolicy()) {
1✔
1149
            startChild.setRetryPolicy(d.getRetryPolicy());
1✔
1150
          }
1151
          if (d.hasInput()) {
1✔
1152
            startChild.setInput(d.getInput());
1✔
1153
          }
1154
          addStartChildTask(ctx, data, initiatedEventId, startChild.build());
1✔
1155
        });
1✔
1156
  }
1✔
1157

1158
  private static void addStartChildTask(
1159
      RequestContext ctx,
1160
      ChildWorkflowData data,
1161
      long initiatedEventId,
1162
      StartWorkflowExecutionRequest startChild) {
1163
    ForkJoinPool.commonPool()
1✔
1164
        .execute(
1✔
1165
            () -> {
1166
              try {
1167
                data.service.startWorkflowExecutionImpl(
1✔
1168
                    startChild,
1169
                    java.time.Duration.ZERO,
1170
                    Optional.of(ctx.getWorkflowMutableState()),
1✔
1171
                    OptionalLong.of(data.initiatedEventId),
1✔
1172
                    null);
1173
              } catch (StatusRuntimeException e) {
1✔
1174
                if (e.getStatus().getCode() == Status.Code.ALREADY_EXISTS) {
1!
1175
                  StartChildWorkflowExecutionFailedEventAttributes failRequest =
1176
                      StartChildWorkflowExecutionFailedEventAttributes.newBuilder()
1✔
1177
                          .setInitiatedEventId(initiatedEventId)
1✔
1178
                          .setCause(
1✔
1179
                              StartChildWorkflowExecutionFailedCause
1180
                                  .START_CHILD_WORKFLOW_EXECUTION_FAILED_CAUSE_WORKFLOW_ALREADY_EXISTS)
1181
                          .build();
1✔
1182
                  try {
1183
                    ctx.getWorkflowMutableState()
1✔
1184
                        .failStartChildWorkflow(data.initiatedEvent.getWorkflowId(), failRequest);
1✔
1185
                  } catch (Throwable ee) {
×
1186
                    log.error("Unexpected failure inserting failStart for a child workflow", ee);
×
1187
                  }
1✔
1188
                } else {
1✔
1189
                  log.error("Unexpected failure starting a child workflow", e);
×
1190
                }
1191
              } catch (Exception e) {
×
1192
                log.error("Unexpected failure starting a child workflow", e);
×
1193
              }
1✔
1194
            });
1✔
1195
  }
1✔
1196

1197
  private static void startWorkflow(
1198
      RequestContext ctx, WorkflowData data, StartWorkflowExecutionRequest request, long notUsed) {
1199
    if (Durations.compare(request.getWorkflowExecutionTimeout(), Durations.ZERO) < 0) {
1!
1200
      throw Status.INVALID_ARGUMENT
×
1201
          .withDescription("negative workflowExecution timeout")
×
1202
          .asRuntimeException();
×
1203
    }
1204
    if (Durations.compare(request.getWorkflowRunTimeout(), Durations.ZERO) < 0) {
1!
1205
      throw Status.INVALID_ARGUMENT
×
1206
          .withDescription("negative workflowRun timeout")
×
1207
          .asRuntimeException();
×
1208
    }
1209
    if (Durations.compare(request.getWorkflowTaskTimeout(), Durations.ZERO) < 0) {
1!
1210
      throw Status.INVALID_ARGUMENT
×
1211
          .withDescription("negative workflowTaskTimeoutSeconds")
×
1212
          .asRuntimeException();
×
1213
    }
1214
    if (request.hasWorkflowStartDelay() && !request.getCronSchedule().trim().isEmpty()) {
1!
1215
      throw Status.INVALID_ARGUMENT
×
1216
          .withDescription("CronSchedule and WorkflowStartDelay may not be used together.")
×
1217
          .asRuntimeException();
×
1218
    }
1219
    if (request.getCompletionCallbacksCount() > 0
1✔
1220
        && !request.getCompletionCallbacksList().stream().allMatch(Callback::hasNexus)) {
1!
1221
      throw Status.INVALID_ARGUMENT
×
1222
          .withDescription("non-Nexus completion callbacks are not supported.")
×
1223
          .asRuntimeException();
×
1224
    }
1225

1226
    WorkflowExecutionStartedEventAttributes.Builder a =
1227
        WorkflowExecutionStartedEventAttributes.newBuilder()
1✔
1228
            .setWorkflowType(request.getWorkflowType())
1✔
1229
            .setWorkflowRunTimeout(request.getWorkflowRunTimeout())
1✔
1230
            .setWorkflowTaskTimeout(request.getWorkflowTaskTimeout())
1✔
1231
            .setWorkflowExecutionTimeout(request.getWorkflowExecutionTimeout())
1✔
1232
            .setIdentity(request.getIdentity())
1✔
1233
            .setInput(request.getInput())
1✔
1234
            .setTaskQueue(request.getTaskQueue())
1✔
1235
            .addAllCompletionCallbacks(request.getCompletionCallbacksList())
1✔
1236
            .setAttempt(1);
1✔
1237
    if (request.hasRetryPolicy()) {
1✔
1238
      a.setRetryPolicy(request.getRetryPolicy());
1✔
1239
    }
1240
    data.retryState.ifPresent(
1✔
1241
        testServiceRetryState -> a.setAttempt(testServiceRetryState.getAttempt()));
1✔
1242
    a.setFirstExecutionRunId(data.firstExecutionRunId);
1✔
1243
    a.setOriginalExecutionRunId(data.originalExecutionRunId);
1✔
1244
    data.continuedExecutionRunId.ifPresent(a::setContinuedExecutionRunId);
1✔
1245
    if (data.lastCompletionResult != null) {
1✔
1246
      a.setLastCompletionResult(data.lastCompletionResult);
1✔
1247
    }
1248
    if (request.hasWorkflowStartDelay()) {
1✔
1249
      a.setFirstWorkflowTaskBackoff(request.getWorkflowStartDelay());
1✔
1250
    }
1251
    data.lastFailure.ifPresent(a::setContinuedFailure);
1✔
1252
    if (request.hasMemo()) {
1✔
1253
      a.setMemo(request.getMemo());
1✔
1254
    }
1255
    if (request.hasSearchAttributes()) {
1✔
1256
      a.setSearchAttributes((request.getSearchAttributes()));
1✔
1257
    }
1258
    if (request.hasHeader()) {
1✔
1259
      a.setHeader(request.getHeader());
1✔
1260
    }
1261
    String cronSchedule = request.getCronSchedule();
1✔
1262
    if (!cronSchedule.trim().isEmpty()) {
1✔
1263
      try {
1264
        CronUtils.parseCron(cronSchedule);
1✔
1265
        a.setCronSchedule(cronSchedule);
1✔
1266
      } catch (Exception e) {
×
1267
        throw Status.INVALID_ARGUMENT
×
1268
            .withDescription("Invalid cron expression \"" + cronSchedule + "\": " + e.getMessage())
×
1269
            .withCause(e)
×
1270
            .asRuntimeException();
×
1271
      }
1✔
1272
    }
1273
    Optional<TestWorkflowMutableState> parent = ctx.getWorkflowMutableState().getParent();
1✔
1274
    if (parent.isPresent()) {
1✔
1275
      ExecutionId parentExecutionId = parent.get().getExecutionId();
1✔
1276
      a.setParentWorkflowNamespace(parentExecutionId.getNamespace());
1✔
1277
      a.setParentWorkflowExecution(parentExecutionId.getExecution());
1✔
1278
    }
1279
    HistoryEvent.Builder event =
1280
        HistoryEvent.newBuilder()
1✔
1281
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED)
1✔
1282
            .setWorkflowExecutionStartedEventAttributes(a);
1✔
1283
    if (request.getLinksCount() > 0) {
1✔
1284
      event.addAllLinks(request.getLinksList());
1✔
1285
    }
1286
    ctx.addEvent(event.build());
1✔
1287
  }
1✔
1288

1289
  private static void completeWorkflow(
1290
      RequestContext ctx,
1291
      WorkflowData data,
1292
      CompleteWorkflowExecutionCommandAttributes d,
1293
      long workflowTaskCompletedEventId) {
1294
    WorkflowExecutionCompletedEventAttributes.Builder a =
1295
        WorkflowExecutionCompletedEventAttributes.newBuilder()
1✔
1296
            .setResult(d.getResult())
1✔
1297
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
1298
    HistoryEvent event =
1299
        HistoryEvent.newBuilder()
1✔
1300
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED)
1✔
1301
            .setWorkflowExecutionCompletedEventAttributes(a)
1✔
1302
            .build();
1✔
1303
    ctx.addEvent(event);
1✔
1304
  }
1✔
1305

1306
  private static void continueAsNewWorkflow(
1307
      RequestContext ctx,
1308
      WorkflowData data,
1309
      ContinueAsNewWorkflowExecutionCommandAttributes d,
1310
      long workflowTaskCompletedEventId) {
1311
    StartWorkflowExecutionRequest sr = ctx.getWorkflowMutableState().getStartRequest();
1✔
1312
    WorkflowExecutionContinuedAsNewEventAttributes.Builder a =
1313
        WorkflowExecutionContinuedAsNewEventAttributes.newBuilder();
1✔
1314
    a.setInput(d.getInput());
1✔
1315
    if (d.hasHeader()) {
1✔
1316
      a.setHeader(d.getHeader());
1✔
1317
    }
1318
    if (Durations.compare(d.getWorkflowRunTimeout(), Durations.ZERO) > 0) {
1✔
1319
      a.setWorkflowRunTimeout(d.getWorkflowRunTimeout());
1✔
1320
    } else {
1321
      a.setWorkflowRunTimeout(sr.getWorkflowRunTimeout());
1✔
1322
    }
1323
    if (d.hasTaskQueue() && !d.getTaskQueue().getName().isEmpty()) {
1!
1324
      a.setTaskQueue(d.getTaskQueue());
1✔
1325
    } else {
1326
      a.setTaskQueue(sr.getTaskQueue());
1✔
1327
    }
1328
    if (d.hasWorkflowType() && !d.getWorkflowType().getName().isEmpty()) {
1!
1329
      a.setWorkflowType(d.getWorkflowType());
1✔
1330
    } else {
1331
      a.setWorkflowType(sr.getWorkflowType());
1✔
1332
    }
1333
    if (Durations.compare(d.getWorkflowTaskTimeout(), Durations.ZERO) > 0) {
1✔
1334
      a.setWorkflowTaskTimeout(d.getWorkflowTaskTimeout());
1✔
1335
    } else {
1336
      a.setWorkflowTaskTimeout(sr.getWorkflowTaskTimeout());
1✔
1337
    }
1338
    a.setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
1339
    a.setBackoffStartInterval(d.getBackoffStartInterval());
1✔
1340
    if (d.hasLastCompletionResult()) {
1✔
1341
      a.setLastCompletionResult(d.getLastCompletionResult());
1✔
1342
    }
1343
    if (d.hasFailure()) {
1✔
1344
      a.setFailure(d.getFailure());
1✔
1345
    }
1346
    a.setNewExecutionRunId(UUID.randomUUID().toString());
1✔
1347
    HistoryEvent event =
1348
        HistoryEvent.newBuilder()
1✔
1349
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW)
1✔
1350
            .setWorkflowExecutionContinuedAsNewEventAttributes(a)
1✔
1351
            .build();
1✔
1352
    ctx.addEvent(event);
1✔
1353
  }
1✔
1354

1355
  private static void failWorkflow(
1356
      RequestContext ctx,
1357
      WorkflowData data,
1358
      FailWorkflowExecutionCommandAttributes d,
1359
      long workflowTaskCompletedEventId) {
1360
    WorkflowExecutionFailedEventAttributes.Builder a =
1361
        WorkflowExecutionFailedEventAttributes.newBuilder()
1✔
1362
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
1363
    if (d.hasFailure()) {
1!
1364
      a.setFailure(d.getFailure());
1✔
1365
    }
1366
    HistoryEvent event =
1367
        HistoryEvent.newBuilder()
1✔
1368
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_FAILED)
1✔
1369
            .setWorkflowExecutionFailedEventAttributes(a)
1✔
1370
            .build();
1✔
1371
    ctx.addEvent(event);
1✔
1372
  }
1✔
1373

1374
  private static void timeoutWorkflow(
1375
      RequestContext ctx, WorkflowData data, RetryState retryState, long notUsed) {
1376
    WorkflowExecutionTimedOutEventAttributes.Builder a =
1377
        WorkflowExecutionTimedOutEventAttributes.newBuilder().setRetryState(retryState);
1✔
1378
    HistoryEvent event =
1379
        HistoryEvent.newBuilder()
1✔
1380
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT)
1✔
1381
            .setWorkflowExecutionTimedOutEventAttributes(a)
1✔
1382
            .build();
1✔
1383
    ctx.addEvent(event);
1✔
1384
  }
1✔
1385

1386
  private static void cancelWorkflow(
1387
      RequestContext ctx,
1388
      WorkflowData data,
1389
      CancelWorkflowExecutionCommandAttributes d,
1390
      long workflowTaskCompletedEventId) {
1391
    WorkflowExecutionCanceledEventAttributes.Builder a =
1392
        WorkflowExecutionCanceledEventAttributes.newBuilder()
1✔
1393
            .setDetails(d.getDetails())
1✔
1394
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
1395
    HistoryEvent event =
1396
        HistoryEvent.newBuilder()
1✔
1397
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED)
1✔
1398
            .setWorkflowExecutionCanceledEventAttributes(a)
1✔
1399
            .build();
1✔
1400
    ctx.addEvent(event);
1✔
1401
  }
1✔
1402

1403
  private static void terminateWorkflow(
1404
      RequestContext ctx,
1405
      WorkflowData data,
1406
      TerminateWorkflowExecutionRequest d,
1407
      long workflowTaskCompletedEventId) {
1408
    WorkflowExecutionTerminatedEventAttributes.Builder a =
1409
        WorkflowExecutionTerminatedEventAttributes.newBuilder()
1✔
1410
            .setDetails(d.getDetails())
1✔
1411
            .setIdentity(d.getIdentity())
1✔
1412
            .setReason(d.getReason());
1✔
1413
    HistoryEvent event =
1414
        HistoryEvent.newBuilder()
1✔
1415
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED)
1✔
1416
            .setWorkflowExecutionTerminatedEventAttributes(a)
1✔
1417
            .build();
1✔
1418
    ctx.addEvent(event);
1✔
1419
  }
1✔
1420

1421
  private static void requestWorkflowCancellation(
1422
      RequestContext ctx,
1423
      WorkflowData data,
1424
      RequestCancelWorkflowExecutionRequest cancelRequest,
1425
      long notUsed) {
1426
    WorkflowExecutionCancelRequestedEventAttributes.Builder a =
1427
        WorkflowExecutionCancelRequestedEventAttributes.newBuilder()
1✔
1428
            .setIdentity(cancelRequest.getIdentity());
1✔
1429
    HistoryEvent cancelRequested =
1430
        HistoryEvent.newBuilder()
1✔
1431
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED)
1✔
1432
            .setWorkflowExecutionCancelRequestedEventAttributes(a)
1✔
1433
            .build();
1✔
1434
    ctx.addEvent(cancelRequested);
1✔
1435
  }
1✔
1436

1437
  private static void scheduleActivityTask(
1438
      RequestContext ctx,
1439
      ActivityTaskData data,
1440
      ScheduleActivityTaskCommandAttributes d,
1441
      long workflowTaskCompletedEventId) {
1442
    RetryPolicy retryPolicy = ensureDefaultFieldsForActivityRetryPolicy(d.getRetryPolicy());
1✔
1443
    Duration expirationInterval = d.getScheduleToCloseTimeout();
1✔
1444
    Timestamp expirationTime = Timestamps.add(data.store.currentTime(), expirationInterval);
1✔
1445
    TestServiceRetryState retryState = new TestServiceRetryState(retryPolicy, expirationTime);
1✔
1446

1447
    ActivityTaskScheduledEventAttributes.Builder a =
1448
        ActivityTaskScheduledEventAttributes.newBuilder()
1✔
1449
            .setInput(d.getInput())
1✔
1450
            .setActivityId(d.getActivityId())
1✔
1451
            .setActivityType(d.getActivityType())
1✔
1452
            .setHeartbeatTimeout(d.getHeartbeatTimeout())
1✔
1453
            .setRetryPolicy(retryPolicy)
1✔
1454
            .setScheduleToCloseTimeout(d.getScheduleToCloseTimeout())
1✔
1455
            .setScheduleToStartTimeout(d.getScheduleToStartTimeout())
1✔
1456
            .setStartToCloseTimeout(d.getStartToCloseTimeout())
1✔
1457
            .setTaskQueue(d.getTaskQueue())
1✔
1458
            .setHeader(d.getHeader())
1✔
1459
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
1460

1461
    // Cannot set it in onCommit as it is used in the processScheduleActivityTask
1462
    data.scheduledEvent = a.build();
1✔
1463
    HistoryEvent event =
1464
        HistoryEvent.newBuilder()
1✔
1465
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED)
1✔
1466
            .setActivityTaskScheduledEventAttributes(a)
1✔
1467
            .build();
1✔
1468
    long scheduledEventId = ctx.addEvent(event);
1✔
1469

1470
    PollActivityTaskQueueResponse.Builder taskResponse =
1471
        PollActivityTaskQueueResponse.newBuilder()
1✔
1472
            .setWorkflowNamespace(ctx.getNamespace())
1✔
1473
            .setWorkflowType(data.startWorkflowExecutionRequest.getWorkflowType())
1✔
1474
            .setActivityType(d.getActivityType())
1✔
1475
            .setWorkflowExecution(ctx.getExecution())
1✔
1476
            .setActivityId(d.getActivityId())
1✔
1477
            .setInput(d.getInput())
1✔
1478
            .setHeartbeatTimeout(d.getHeartbeatTimeout())
1✔
1479
            .setScheduleToCloseTimeout(d.getScheduleToCloseTimeout())
1✔
1480
            .setStartToCloseTimeout(d.getStartToCloseTimeout())
1✔
1481
            .setScheduledTime(ctx.currentTime())
1✔
1482
            .setCurrentAttemptScheduledTime(ctx.currentTime())
1✔
1483
            .setHeader(d.getHeader())
1✔
1484
            .setAttempt(1);
1✔
1485

1486
    TaskQueueId taskQueueId = new TaskQueueId(ctx.getNamespace(), d.getTaskQueue().getName());
1✔
1487
    ActivityTask activityTask = new ActivityTask(taskQueueId, taskResponse);
1✔
1488
    ctx.addActivityTask(activityTask);
1✔
1489
    ctx.onCommit(
1✔
1490
        (historySize) -> {
1491
          data.scheduledEventId = scheduledEventId;
1✔
1492
          data.activityTask = activityTask;
1✔
1493
          data.retryState = retryState;
1✔
1494
        });
1✔
1495
  }
1✔
1496

1497
  private static void requestActivityCancellation(
1498
      RequestContext ctx,
1499
      ActivityTaskData data,
1500
      RequestCancelActivityTaskCommandAttributes d,
1501
      long workflowTaskCompletedEventId) {
1502
    ActivityTaskCancelRequestedEventAttributes.Builder a =
1503
        ActivityTaskCancelRequestedEventAttributes.newBuilder()
1✔
1504
            .setScheduledEventId(d.getScheduledEventId())
1✔
1505
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId);
1✔
1506
    HistoryEvent event =
1507
        HistoryEvent.newBuilder()
1✔
1508
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_CANCEL_REQUESTED)
1✔
1509
            .setActivityTaskCancelRequestedEventAttributes(a)
1✔
1510
            .build();
1✔
1511
    ctx.addEvent(event);
1✔
1512
  }
1✔
1513

1514
  private static void scheduleWorkflowTask(
1515
      RequestContext ctx, WorkflowTaskData data, Object notUsedRequest, long notUsed) {
1516
    StartWorkflowExecutionRequest request = data.startRequest;
1✔
1517
    long scheduledEventId;
1518
    TaskQueue taskQueue = request.getTaskQueue();
1✔
1519
    WorkflowTaskScheduledEventAttributes a =
1520
        WorkflowTaskScheduledEventAttributes.newBuilder()
1✔
1521
            .setStartToCloseTimeout(request.getWorkflowTaskTimeout())
1✔
1522
            .setTaskQueue(taskQueue)
1✔
1523
            .setAttempt(++data.attempt)
1✔
1524
            .build();
1✔
1525
    HistoryEvent event =
1526
        HistoryEvent.newBuilder()
1✔
1527
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED)
1✔
1528
            .setWorkflowTaskScheduledEventAttributes(a)
1✔
1529
            .build();
1✔
1530
    scheduledEventId = ctx.addEvent(event);
1✔
1531
    PollWorkflowTaskQueueResponse.Builder workflowTaskResponse =
1532
        PollWorkflowTaskQueueResponse.newBuilder();
1✔
1533
    workflowTaskResponse.setWorkflowExecution(ctx.getExecution());
1✔
1534
    workflowTaskResponse.setWorkflowType(request.getWorkflowType());
1✔
1535
    workflowTaskResponse.setAttempt(data.attempt);
1✔
1536
    workflowTaskResponse.setScheduledTime(ctx.currentTime());
1✔
1537
    workflowTaskResponse.setWorkflowExecutionTaskQueue(taskQueue);
1✔
1538
    TaskQueueId taskQueueId = new TaskQueueId(ctx.getNamespace(), taskQueue.getName());
1✔
1539
    WorkflowTask workflowTask = new WorkflowTask(taskQueueId, workflowTaskResponse);
1✔
1540
    ctx.setWorkflowTaskForMatching(workflowTask);
1✔
1541
    ctx.onCommit(
1✔
1542
        (historySize) -> {
1543
          data.scheduledEventId = scheduledEventId;
1✔
1544
          data.workflowTask = workflowTaskResponse;
1✔
1545
          // Move buffered update request to new workflow task
1546
          data.updateRequest.putAll(data.updateRequestBuffer);
1✔
1547
          data.updateRequestBuffer.clear();
1✔
1548
        });
1✔
1549
  }
1✔
1550

1551
  private static void convertQueryWorkflowTaskToReal(
1552
      RequestContext ctx, WorkflowTaskData data, Object notUsedRequest, long notUsed) {
1553
    StartWorkflowExecutionRequest request = data.startRequest;
×
1554
    WorkflowTaskScheduledEventAttributes a =
1555
        WorkflowTaskScheduledEventAttributes.newBuilder()
×
1556
            .setStartToCloseTimeout(request.getWorkflowTaskTimeout())
×
1557
            .setTaskQueue(request.getTaskQueue())
×
1558
            .setAttempt(data.attempt)
×
1559
            .build();
×
1560
    HistoryEvent event =
1561
        HistoryEvent.newBuilder()
×
1562
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED)
×
1563
            .setWorkflowTaskScheduledEventAttributes(a)
×
1564
            .build();
×
1565
    long scheduledEventId = ctx.addEvent(event);
×
1566
    ctx.onCommit((historySize) -> data.scheduledEventId = scheduledEventId);
×
1567
  }
×
1568

1569
  private static void scheduleQueryWorkflowTask(
1570
      RequestContext ctx,
1571
      WorkflowTaskData data,
1572
      TestWorkflowMutableStateImpl.ConsistentQuery query,
1573
      long notUsed) {
1574
    ctx.lockTimer("scheduleQueryWorkflowTask");
×
1575
    StartWorkflowExecutionRequest request = data.startRequest;
×
1576
    PollWorkflowTaskQueueResponse.Builder workflowTaskResponse =
1577
        PollWorkflowTaskQueueResponse.newBuilder();
×
1578
    StickyExecutionAttributes stickyAttributes =
×
1579
        ctx.getWorkflowMutableState().getStickyExecutionAttributes();
×
1580
    String taskQueue =
1581
        stickyAttributes == null
×
1582
            ? request.getTaskQueue().getName()
×
1583
            : stickyAttributes.getWorkerTaskQueue().getName();
×
1584
    workflowTaskResponse.setWorkflowExecution(ctx.getExecution());
×
1585
    workflowTaskResponse.setWorkflowType(request.getWorkflowType());
×
1586
    workflowTaskResponse.setAttempt(++data.attempt);
×
1587
    TaskQueueId taskQueueId = new TaskQueueId(ctx.getNamespace(), taskQueue);
×
1588
    WorkflowTask workflowTask = new WorkflowTask(taskQueueId, workflowTaskResponse);
×
1589
    ctx.setWorkflowTaskForMatching(workflowTask);
×
1590
    ctx.onCommit(
×
1591
        (historySize) -> {
1592
          if (data.lastSuccessfulStartedEventId > 0) {
×
1593
            workflowTaskResponse.setPreviousStartedEventId(data.lastSuccessfulStartedEventId);
×
1594
          }
1595
          data.scheduledEventId = NO_EVENT_ID;
×
1596
          data.workflowTask = workflowTaskResponse;
×
1597
          if (query != null) {
×
1598
            data.consistentQueryRequests.put(query.getKey(), query);
×
1599
          }
1600
        });
×
1601
  }
×
1602

1603
  private static void queryWhileScheduled(
1604
      RequestContext ctx,
1605
      WorkflowTaskData data,
1606
      TestWorkflowMutableStateImpl.ConsistentQuery query,
1607
      long notUsed) {
1608
    data.consistentQueryRequests.put(query.getKey(), query);
1✔
1609
  }
1✔
1610

1611
  private static void bufferQuery(
1612
      RequestContext ctx,
1613
      WorkflowTaskData data,
1614
      TestWorkflowMutableStateImpl.ConsistentQuery query,
1615
      long notUsed) {
1616
    data.queryBuffer.put(query.getKey(), query);
1✔
1617
  }
1✔
1618

1619
  private static void bufferUpdate(
1620
      RequestContext ctx, WorkflowTaskData data, UpdateWorkflowExecution update, long notUsed) {
1621
    if (data.getUpdateRequest(update.getId()).isPresent()) {
1!
1622
      throw Status.INTERNAL
×
1623
          .withDescription("Update ID already exists: " + update.getId())
×
1624
          .asRuntimeException();
×
1625
    }
1626
    data.updateRequestBuffer.put(update.getId(), update);
1✔
1627
  }
1✔
1628

1629
  private static void addUpdate(
1630
      RequestContext ctx, WorkflowTaskData data, UpdateWorkflowExecution update, long notUsed) {
1631
    if (data.getUpdateRequest(update.getId()).isPresent()) {
1!
1632
      throw Status.INTERNAL
×
1633
          .withDescription("Update ID already exists: " + update.getId())
×
1634
          .asRuntimeException();
×
1635
    }
1636
    data.updateRequest.put(update.getId(), update);
1✔
1637
  }
1✔
1638

1639
  private static void startWorkflowTask(
1640
      RequestContext ctx,
1641
      WorkflowTaskData data,
1642
      PollWorkflowTaskQueueRequest request,
1643
      long notUsed) {
1644
    WorkflowTaskStartedEventAttributes a =
1645
        WorkflowTaskStartedEventAttributes.newBuilder()
1✔
1646
            .setIdentity(request.getIdentity())
1✔
1647
            .setScheduledEventId(data.scheduledEventId)
1✔
1648
            .build();
1✔
1649
    HistoryEvent event =
1650
        HistoryEvent.newBuilder()
1✔
1651
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_STARTED)
1✔
1652
            .setWorkflowTaskStartedEventAttributes(a)
1✔
1653
            .build();
1✔
1654
    long startedEventId = ctx.addEvent(event);
1✔
1655
    startWorkflowTaskImpl(ctx, data, request, startedEventId, false);
1✔
1656
  }
1✔
1657

1658
  private static void startQueryOnlyWorkflowTask(
1659
      RequestContext ctx,
1660
      WorkflowTaskData data,
1661
      PollWorkflowTaskQueueRequest request,
1662
      long notUsed) {
1663
    startWorkflowTaskImpl(ctx, data, request, NO_EVENT_ID, true);
×
1664
  }
×
1665

1666
  private static void startWorkflowTaskImpl(
1667
      RequestContext ctx,
1668
      WorkflowTaskData data,
1669
      PollWorkflowTaskQueueRequest request,
1670
      long startedEventId,
1671
      boolean queryOnly) {
1672
    ctx.onCommit(
1✔
1673
        (historySize) -> {
1674
          PollWorkflowTaskQueueResponse.Builder task = data.workflowTask;
1✔
1675
          task.setStartedEventId(data.scheduledEventId + 1);
1✔
1676
          WorkflowTaskToken taskToken = new WorkflowTaskToken(ctx.getExecutionId(), historySize);
1✔
1677
          task.setTaskToken(taskToken.toBytes());
1✔
1678
          GetWorkflowExecutionHistoryRequest getRequest =
1679
              GetWorkflowExecutionHistoryRequest.newBuilder()
1✔
1680
                  .setNamespace(request.getNamespace())
1✔
1681
                  .setExecution(ctx.getExecution())
1✔
1682
                  .build();
1✔
1683
          List<HistoryEvent> events;
1684
          events =
1✔
1685
              data.store
1686
                  .getWorkflowExecutionHistory(ctx.getExecutionId(), getRequest, null)
1✔
1687
                  .getHistory()
1✔
1688
                  .getEventsList();
1✔
1689
          long lastEventId = events.get(events.size() - 1).getEventId();
1✔
1690
          if (ctx.getWorkflowMutableState().getStickyExecutionAttributes() != null) {
1✔
1691
            events = events.subList((int) data.lastSuccessfulStartedEventId, events.size());
1✔
1692
          }
1693
          if (queryOnly && !data.workflowCompleted) {
1!
1694
            events = new ArrayList<>(events); // convert list to mutable
×
1695
            // Add "fake" workflow task scheduled and started if workflow is not closed
1696
            WorkflowTaskScheduledEventAttributes scheduledAttributes =
1697
                WorkflowTaskScheduledEventAttributes.newBuilder()
×
1698
                    .setStartToCloseTimeout(data.startRequest.getWorkflowTaskTimeout())
×
1699
                    .setTaskQueue(request.getTaskQueue())
×
1700
                    .setAttempt(data.attempt)
×
1701
                    .build();
×
1702
            HistoryEvent scheduledEvent =
1703
                HistoryEvent.newBuilder()
×
1704
                    .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED)
×
1705
                    .setEventId(lastEventId + 1)
×
1706
                    .setWorkflowTaskScheduledEventAttributes(scheduledAttributes)
×
1707
                    .build();
×
1708
            events.add(scheduledEvent);
×
1709
            WorkflowTaskStartedEventAttributes startedAttributes =
1710
                WorkflowTaskStartedEventAttributes.newBuilder()
×
1711
                    .setIdentity(request.getIdentity())
×
1712
                    .setScheduledEventId(lastEventId + 1)
×
1713
                    .build();
×
1714
            HistoryEvent startedEvent =
1715
                HistoryEvent.newBuilder()
×
1716
                    .setEventId(lastEventId + 1)
×
1717
                    .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_STARTED)
×
1718
                    .setWorkflowTaskStartedEventAttributes(startedAttributes)
×
1719
                    .build();
×
1720
            events.add(startedEvent);
×
1721
            task.setStartedEventId(lastEventId + 2);
×
1722
          }
1723
          // get it from previous started event id.
1724
          task.setHistory(History.newBuilder().addAllEvents(events));
1✔
1725
          // Transfer the queries
1726
          Map<String, TestWorkflowMutableStateImpl.ConsistentQuery> queries =
1✔
1727
              data.consistentQueryRequests;
1728
          for (Map.Entry<String, TestWorkflowMutableStateImpl.ConsistentQuery> queryEntry :
1729
              queries.entrySet()) {
1✔
1730
            QueryWorkflowRequest queryWorkflowRequest = queryEntry.getValue().getRequest();
1✔
1731
            task.putQueries(queryEntry.getKey(), queryWorkflowRequest.getQuery());
1✔
1732
          }
1✔
1733
          // Transfer the messages
1734
          Map<String, UpdateWorkflowExecution> updates = data.updateRequest;
1✔
1735
          for (Map.Entry<String, UpdateWorkflowExecution> update : updates.entrySet()) {
1✔
1736
            UpdateWorkflowExecutionRequest updateRequest = update.getValue().getRequest();
1✔
1737
            Message updateMessage =
1738
                Message.newBuilder()
1✔
1739
                    .setId(update.getKey() + "/request")
1✔
1740
                    .setProtocolInstanceId(update.getKey())
1✔
1741
                    .setEventId(data.scheduledEventId)
1✔
1742
                    .setBody(Any.pack(updateRequest.getRequest()))
1✔
1743
                    .build();
1✔
1744
            task.addMessages(updateMessage);
1✔
1745
          }
1✔
1746
          if (data.lastSuccessfulStartedEventId > 0) {
1✔
1747
            task.setPreviousStartedEventId(data.lastSuccessfulStartedEventId);
1✔
1748
          }
1749
          if (!queryOnly) {
1!
1750
            data.startedEventId = startedEventId;
1✔
1751
          }
1752
        });
1✔
1753
  }
1✔
1754

1755
  private static void startActivityTask(
1756
      RequestContext ctx,
1757
      ActivityTaskData data,
1758
      PollActivityTaskQueueRequest request,
1759
      long notUsed) {
1760
    ActivityTaskStartedEventAttributes.Builder a =
1761
        ActivityTaskStartedEventAttributes.newBuilder()
1✔
1762
            .setIdentity(request.getIdentity())
1✔
1763
            .setScheduledEventId(data.scheduledEventId);
1✔
1764
    a.setAttempt(data.getAttempt());
1✔
1765
    // Setting timestamp here as the default logic will set it to the time when it is added to the
1766
    // history. But in the case of retry it happens only after an activity completion.
1767
    Timestamp timestamp = data.store.currentTime();
1✔
1768
    HistoryEvent event =
1769
        HistoryEvent.newBuilder()
1✔
1770
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_STARTED)
1✔
1771
            .setEventTime(timestamp)
1✔
1772
            .setActivityTaskStartedEventAttributes(a)
1✔
1773
            .build();
1✔
1774
    long startedEventId;
1775
    startedEventId = NO_EVENT_ID;
1✔
1776
    ctx.onCommit(
1✔
1777
        (historySize) -> {
1778
          data.startedEventId = startedEventId;
1✔
1779
          data.startedEvent = event;
1✔
1780
          PollActivityTaskQueueResponse.Builder task = data.activityTask.getTask();
1✔
1781
          task.setTaskToken(
1✔
1782
              new ActivityTaskToken(ctx.getExecutionId(), data.scheduledEventId, task.getAttempt())
1✔
1783
                  .toBytes());
1✔
1784
          task.setStartedTime(timestamp);
1✔
1785
        });
1✔
1786
  }
1✔
1787

1788
  private static void completeWorkflowTask(
1789
      RequestContext ctx,
1790
      WorkflowTaskData data,
1791
      RespondWorkflowTaskCompletedRequest request,
1792
      long notUsed) {
1793
    WorkflowTaskCompletedEventAttributes.Builder a =
1794
        WorkflowTaskCompletedEventAttributes.newBuilder()
1✔
1795
            .setIdentity(request.getIdentity())
1✔
1796
            .setBinaryChecksum(request.getBinaryChecksum())
1✔
1797
            .setMeteringMetadata(request.getMeteringMetadata())
1✔
1798
            .setSdkMetadata(request.getSdkMetadata())
1✔
1799
            .setScheduledEventId(data.scheduledEventId);
1✔
1800
    HistoryEvent event =
1801
        HistoryEvent.newBuilder()
1✔
1802
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_COMPLETED)
1✔
1803
            .setWorkflowTaskCompletedEventAttributes(a)
1✔
1804
            .build();
1✔
1805
    ctx.addEvent(event);
1✔
1806
    ctx.onCommit(
1✔
1807
        (historySize) -> {
1808
          data.lastSuccessfulStartedEventId = data.startedEventId;
1✔
1809
          data.clear();
1✔
1810
        });
1✔
1811
  }
1✔
1812

1813
  private static void completeQuery(
1814
      RequestContext ctx,
1815
      WorkflowTaskData data,
1816
      RespondWorkflowTaskCompletedRequest request,
1817
      long notUsed) {
1818
    Map<String, WorkflowQueryResult> responses = request.getQueryResultsMap();
×
1819
    for (Map.Entry<String, WorkflowQueryResult> resultEntry : responses.entrySet()) {
×
1820
      TestWorkflowMutableStateImpl.ConsistentQuery query =
×
1821
          data.consistentQueryRequests.remove(resultEntry.getKey());
×
1822
      if (query != null) {
×
1823
        WorkflowQueryResult value = resultEntry.getValue();
×
1824
        CompletableFuture<QueryWorkflowResponse> result = query.getResult();
×
1825
        switch (value.getResultType()) {
×
1826
          case QUERY_RESULT_TYPE_ANSWERED:
1827
            QueryWorkflowResponse response =
1828
                QueryWorkflowResponse.newBuilder().setQueryResult(value.getAnswer()).build();
×
1829
            result.complete(response);
×
1830
            break;
×
1831
          case QUERY_RESULT_TYPE_FAILED:
1832
            result.completeExceptionally(
×
1833
                StatusUtils.newException(
×
1834
                    Status.INTERNAL.withDescription(value.getErrorMessage()),
×
1835
                    QueryFailedFailure.getDefaultInstance(),
×
1836
                    QueryFailedFailure.getDescriptor()));
×
1837
            break;
×
1838
          default:
1839
            throw Status.INVALID_ARGUMENT
×
1840
                .withDescription("Invalid query result type: " + value.getResultType())
×
1841
                .asRuntimeException();
×
1842
        }
1843
      }
1844
    }
×
1845
    ctx.onCommit(
×
1846
        (historySize) -> {
1847
          data.clear();
×
1848
          ctx.unlockTimer("completeQuery");
×
1849
        });
×
1850
  }
×
1851

1852
  private static void failQueryWorkflowTask(
1853
      RequestContext ctx, WorkflowTaskData data, Object unused, long notUsed) {
1854
    data.consistentQueryRequests
×
1855
        .entrySet()
×
1856
        .removeIf(entry -> entry.getValue().getResult().isCancelled());
×
1857
    if (!data.consistentQueryRequests.isEmpty()) {
×
1858
      ctx.setNeedWorkflowTask(true);
×
1859
    }
1860
    ctx.unlockTimer("failQueryWorkflowTask");
×
1861
  }
×
1862

1863
  private static void failWorkflowTask(
1864
      RequestContext ctx,
1865
      WorkflowTaskData data,
1866
      RespondWorkflowTaskFailedRequest request,
1867
      long notUsed) {
1868
    WorkflowTaskFailedEventAttributes.Builder a =
1869
        WorkflowTaskFailedEventAttributes.newBuilder()
1✔
1870
            .setIdentity(request.getIdentity())
1✔
1871
            .setStartedEventId(data.startedEventId)
1✔
1872
            .setScheduledEventId(data.scheduledEventId)
1✔
1873
            .setCause(request.getCause());
1✔
1874
    if (request.hasFailure()) {
1✔
1875
      a.setFailure(request.getFailure());
1✔
1876
    }
1877
    HistoryEvent event =
1878
        HistoryEvent.newBuilder()
1✔
1879
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_FAILED)
1✔
1880
            .setWorkflowTaskFailedEventAttributes(a)
1✔
1881
            .build();
1✔
1882
    ctx.addEvent(event);
1✔
1883
    ctx.setNeedWorkflowTask(true);
1✔
1884
  }
1✔
1885

1886
  private static void timeoutWorkflowTask(
1887
      RequestContext ctx, WorkflowTaskData data, Object ignored, long notUsed) {
1888
    WorkflowTaskTimedOutEventAttributes.Builder a =
1889
        WorkflowTaskTimedOutEventAttributes.newBuilder()
1✔
1890
            .setStartedEventId(data.startedEventId)
1✔
1891
            .setTimeoutType(TimeoutType.TIMEOUT_TYPE_START_TO_CLOSE)
1✔
1892
            .setScheduledEventId(data.scheduledEventId);
1✔
1893
    HistoryEvent event =
1894
        HistoryEvent.newBuilder()
1✔
1895
            .setEventType(EventType.EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT)
1✔
1896
            .setWorkflowTaskTimedOutEventAttributes(a)
1✔
1897
            .build();
1✔
1898
    ctx.addEvent(event);
1✔
1899
    ctx.setNeedWorkflowTask(true);
1✔
1900
  }
1✔
1901

1902
  private static void needsWorkflowTask(
1903
      RequestContext requestContext,
1904
      WorkflowTaskData workflowTaskData,
1905
      Object notUsedRequest,
1906
      long notUsed) {
1907
    requestContext.setNeedWorkflowTask(true);
1✔
1908
  }
1✔
1909

1910
  private static void completeActivityTask(
1911
      RequestContext ctx, ActivityTaskData data, Object request, long notUsed) {
1912
    data.startedEventId = ctx.addEvent(data.startedEvent);
1✔
1913
    if (request instanceof RespondActivityTaskCompletedRequest) {
1!
1914
      completeActivityTaskByTaskToken(ctx, data, (RespondActivityTaskCompletedRequest) request);
1✔
1915
    } else if (request instanceof RespondActivityTaskCompletedByIdRequest) {
×
1916
      completeActivityTaskById(ctx, data, (RespondActivityTaskCompletedByIdRequest) request);
×
1917
    } else {
1918
      throw new IllegalArgumentException("Unknown request: " + request);
×
1919
    }
1920
  }
1✔
1921

1922
  private static void completeActivityTaskByTaskToken(
1923
      RequestContext ctx, ActivityTaskData data, RespondActivityTaskCompletedRequest request) {
1924
    ActivityTaskCompletedEventAttributes.Builder a =
1925
        ActivityTaskCompletedEventAttributes.newBuilder()
1✔
1926
            .setIdentity(request.getIdentity())
1✔
1927
            .setScheduledEventId(data.scheduledEventId)
1✔
1928
            .setResult(request.getResult())
1✔
1929
            .setStartedEventId(data.startedEventId);
1✔
1930
    HistoryEvent event =
1931
        HistoryEvent.newBuilder()
1✔
1932
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_COMPLETED)
1✔
1933
            .setActivityTaskCompletedEventAttributes(a)
1✔
1934
            .build();
1✔
1935
    ctx.addEvent(event);
1✔
1936
  }
1✔
1937

1938
  private static void completeActivityTaskById(
1939
      RequestContext ctx, ActivityTaskData data, RespondActivityTaskCompletedByIdRequest request) {
1940
    ActivityTaskCompletedEventAttributes.Builder a =
1941
        ActivityTaskCompletedEventAttributes.newBuilder()
×
1942
            .setIdentity(request.getIdentity())
×
1943
            .setScheduledEventId(data.scheduledEventId)
×
1944
            .setResult(request.getResult())
×
1945
            .setStartedEventId(data.startedEventId);
×
1946
    HistoryEvent event =
1947
        HistoryEvent.newBuilder()
×
1948
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_COMPLETED)
×
1949
            .setActivityTaskCompletedEventAttributes(a)
×
1950
            .build();
×
1951
    ctx.addEvent(event);
×
1952
  }
×
1953

1954
  private static State failActivityTask(
1955
      RequestContext ctx, ActivityTaskData data, Object request, long notUsed) {
1956
    if (request instanceof RespondActivityTaskFailedRequest) {
1!
1957
      RespondActivityTaskFailedRequest req = (RespondActivityTaskFailedRequest) request;
1✔
1958
      return failActivityTaskByRequestType(ctx, data, req.getFailure(), req.getIdentity());
1✔
1959
    } else if (request instanceof RespondActivityTaskFailedByIdRequest) {
×
1960
      RespondActivityTaskFailedByIdRequest req = (RespondActivityTaskFailedByIdRequest) request;
×
1961
      return failActivityTaskByRequestType(ctx, data, req.getFailure(), req.getIdentity());
×
1962
    } else {
1963
      throw new IllegalArgumentException("Unknown request: " + request);
×
1964
    }
1965
  }
1966

1967
  private static State failActivityTaskByRequestType(
1968
      RequestContext ctx, ActivityTaskData data, Failure failure, String identity) {
1969
    if (!failure.hasApplicationFailureInfo()) {
1!
1970
      throw new IllegalArgumentException(
×
1971
          "Failure must have ApplicationFailureInfo. Got: " + failure);
1972
    }
1973
    RetryState retryState = attemptActivityRetry(ctx, Optional.of(failure), data);
1✔
1974
    if (retryState == RetryState.RETRY_STATE_IN_PROGRESS) {
1✔
1975
      return INITIATED;
1✔
1976
    }
1977
    data.startedEventId = ctx.addEvent(data.startedEvent);
1✔
1978
    ActivityTaskFailedEventAttributes.Builder attributes =
1979
        ActivityTaskFailedEventAttributes.newBuilder()
1✔
1980
            .setIdentity(identity)
1✔
1981
            .setScheduledEventId(data.scheduledEventId)
1✔
1982
            .setFailure(failure)
1✔
1983
            .setRetryState(retryState)
1✔
1984
            .setStartedEventId(data.startedEventId);
1✔
1985
    HistoryEvent event =
1986
        HistoryEvent.newBuilder()
1✔
1987
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_FAILED)
1✔
1988
            .setActivityTaskFailedEventAttributes(attributes)
1✔
1989
            .build();
1✔
1990
    ctx.addEvent(event);
1✔
1991
    return FAILED;
1✔
1992
  }
1993

1994
  private static State timeoutActivityTask(
1995
      RequestContext ctx, ActivityTaskData data, TimeoutType timeoutType, long notUsed) {
1996
    Optional<Failure> previousFailure = data.retryState.getPreviousRunFailure();
1✔
1997

1998
    // chaining with the previous run failure if we are preparing the final failure
1999
    Failure failure =
1✔
2000
        newTimeoutFailure(timeoutType, Optional.ofNullable(data.heartbeatDetails), previousFailure);
1✔
2001

2002
    RetryState retryState;
2003
    switch (timeoutType) {
1!
2004
      case TIMEOUT_TYPE_SCHEDULE_TO_START:
2005
      case TIMEOUT_TYPE_SCHEDULE_TO_CLOSE:
2006
        // ScheduleToStart (queue timeout) is not retryable. Instead of the retry, a customer should
2007
        // set a larger ScheduleToStart timeout.
2008
        // ScheduleToClose timeout is final and failure is created with TIMEOUT retry state
2009
        retryState = RetryState.RETRY_STATE_TIMEOUT;
1✔
2010
        break;
1✔
2011
      case TIMEOUT_TYPE_START_TO_CLOSE:
2012
      case TIMEOUT_TYPE_HEARTBEAT:
2013
        // not chaining with the previous run failure if we are preparing the failure to be stored
2014
        // for the next iteration
2015
        Optional<Failure> lastFailure =
1✔
2016
            Optional.of(
1✔
2017
                newTimeoutFailure(
1✔
2018
                    timeoutType,
2019
                    // we move heartbeatDetails to the new top level (this cause is used for
2020
                    // scheduleToClose only)
2021
                    Optional.empty(),
1✔
2022
                    // prune to don't have too deep nesting of failures
2023
                    Optional.empty()));
1✔
2024

2025
        retryState = attemptActivityRetry(ctx, lastFailure, data);
1✔
2026
        if (retryState == RetryState.RETRY_STATE_IN_PROGRESS) {
1✔
2027
          return INITIATED;
1✔
2028
        } else if (retryState == RetryState.RETRY_STATE_TIMEOUT) {
1✔
2029
          // if retryState = RETRY_STATE_TIMEOUT, it means scheduleToClose timeout happened inside
2030
          // attemptActivityRetry();
2031
          // start to close timeout would return as "max attempts reached".
2032

2033
          Preconditions.checkState(
1!
2034
              timeoutType == TimeoutType.TIMEOUT_TYPE_START_TO_CLOSE
2035
                  || timeoutType == TimeoutType.TIMEOUT_TYPE_HEARTBEAT,
2036
              "Unexpected timeout type: %s. We should end up here only in case of HEARTBEAT_TIMEOUT or START_TO_CLOSE_TIMEOUT",
2037
              timeoutType);
2038

2039
          // heartbeat is preserved as the cause for the scheduleToClose timeout
2040
          // But we effectively omit startToClose timeout with scheduleToClose timeout
2041
          Optional<Failure> cause =
2042
              timeoutType == TimeoutType.TIMEOUT_TYPE_HEARTBEAT ? lastFailure : previousFailure;
1✔
2043

2044
          failure =
1✔
2045
              newTimeoutFailure(
1✔
2046
                  TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE,
2047
                  Optional.ofNullable(data.heartbeatDetails),
1✔
2048
                  cause);
2049
        }
1✔
2050
        break;
2051
      default:
2052
        throw new IllegalStateException(
×
2053
            "Not implemented behavior for timeout type: " + timeoutType);
2054
    }
2055

2056
    ActivityTaskTimedOutEventAttributes.Builder a =
2057
        ActivityTaskTimedOutEventAttributes.newBuilder()
1✔
2058
            .setScheduledEventId(data.scheduledEventId)
1✔
2059
            .setRetryState(retryState)
1✔
2060
            .setStartedEventId(data.startedEventId)
1✔
2061
            .setFailure(failure);
1✔
2062
    HistoryEvent event =
2063
        HistoryEvent.newBuilder()
1✔
2064
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_TIMED_OUT)
1✔
2065
            .setActivityTaskTimedOutEventAttributes(a)
1✔
2066
            .build();
1✔
2067
    ctx.addEvent(event);
1✔
2068
    return TIMED_OUT;
1✔
2069
  }
2070

2071
  private static Failure newTimeoutFailure(
2072
      TimeoutType timeoutType, Optional<Payloads> lastHeartbeatDetails, Optional<Failure> cause) {
2073
    TimeoutFailureInfo.Builder info = TimeoutFailureInfo.newBuilder().setTimeoutType(timeoutType);
1✔
2074
    if (lastHeartbeatDetails.isPresent()) {
1✔
2075
      info.setLastHeartbeatDetails(lastHeartbeatDetails.get());
1✔
2076
    }
2077
    Failure.Builder result = Failure.newBuilder().setTimeoutFailureInfo(info);
1✔
2078
    if (cause.isPresent()) {
1✔
2079
      result.setCause(cause.get());
1✔
2080
    }
2081
    return result.build();
1✔
2082
  }
2083

2084
  private static RetryState attemptActivityRetry(
2085
      RequestContext ctx, Optional<Failure> failure, ActivityTaskData data) {
2086
    if (data.retryState == null) {
1!
2087
      throw new IllegalStateException("RetryPolicy is always present");
×
2088
    }
2089
    Optional<ApplicationFailureInfo> info = failure.map(Failure::getApplicationFailureInfo);
1✔
2090
    Optional<java.time.Duration> nextRetryDelay = Optional.empty();
1✔
2091

2092
    if (info.isPresent()) {
1!
2093
      if (info.get().getNonRetryable()) {
1✔
2094
        return RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE;
1✔
2095
      }
2096
      if (info.get().hasNextRetryDelay()) {
1✔
2097
        nextRetryDelay =
1✔
2098
            Optional.ofNullable(ProtobufTimeUtils.toJavaDuration(info.get().getNextRetryDelay()));
1✔
2099
      }
2100
    }
2101

2102
    TestServiceRetryState nextAttempt = data.retryState.getNextAttempt(failure);
1✔
2103
    TestServiceRetryState.BackoffInterval backoffInterval =
1✔
2104
        data.retryState.getBackoffIntervalInSeconds(
1✔
2105
            info.map(ApplicationFailureInfo::getType), data.store.currentTime(), nextRetryDelay);
1✔
2106
    if (backoffInterval.getRetryState() == RetryState.RETRY_STATE_IN_PROGRESS) {
1✔
2107
      data.nextBackoffInterval = ProtobufTimeUtils.toProtoDuration(backoffInterval.getInterval());
1✔
2108
      PollActivityTaskQueueResponse.Builder task = data.activityTask.getTask();
1✔
2109
      if (data.heartbeatDetails != null) {
1✔
2110
        task.setHeartbeatDetails(data.heartbeatDetails);
1✔
2111
      }
2112
      ctx.onCommit(
1✔
2113
          (historySize) -> {
2114
            data.retryState = nextAttempt;
1✔
2115
            task.setAttempt(nextAttempt.getAttempt());
1✔
2116
            task.setCurrentAttemptScheduledTime(ctx.currentTime());
1✔
2117
          });
1✔
2118
    } else {
1✔
2119
      data.nextBackoffInterval = Durations.ZERO;
1✔
2120
    }
2121
    return backoffInterval.getRetryState();
1✔
2122
  }
2123

2124
  private static void reportActivityTaskCancellation(
2125
      RequestContext ctx, ActivityTaskData data, Object request, long notUsed) {
2126
    Payloads details = null;
1✔
2127
    if (request instanceof RespondActivityTaskCanceledRequest) {
1✔
2128
      {
2129
        RespondActivityTaskCanceledRequest cr = (RespondActivityTaskCanceledRequest) request;
1✔
2130

2131
        details = cr.hasDetails() ? cr.getDetails() : null;
1!
2132
      }
1✔
2133
    } else if (request instanceof RespondActivityTaskCanceledByIdRequest) {
1!
2134
      {
2135
        RespondActivityTaskCanceledByIdRequest cr =
×
2136
            (RespondActivityTaskCanceledByIdRequest) request;
2137
        details = cr.hasDetails() ? cr.getDetails() : null;
×
2138
      }
×
2139
    } else if (request != null) {
1!
2140
      throw Status.INTERNAL
×
2141
          .withDescription("Unexpected request type: " + request)
×
2142
          .asRuntimeException();
×
2143
    }
2144
    ActivityTaskCanceledEventAttributes.Builder a =
2145
        ActivityTaskCanceledEventAttributes.newBuilder()
1✔
2146
            .setScheduledEventId(data.scheduledEventId)
1✔
2147
            .setStartedEventId(data.startedEventId);
1✔
2148
    if (details != null) {
1!
2149
      a.setDetails(details);
×
2150
    }
2151
    HistoryEvent event =
2152
        HistoryEvent.newBuilder()
1✔
2153
            .setEventType(EventType.EVENT_TYPE_ACTIVITY_TASK_CANCELED)
1✔
2154
            .setActivityTaskCanceledEventAttributes(a)
1✔
2155
            .build();
1✔
2156
    ctx.addEvent(event);
1✔
2157
  }
1✔
2158

2159
  private static void heartbeatActivityTask(
2160
      RequestContext nullCtx, ActivityTaskData data, Payloads details, long notUsed) {
2161
    data.heartbeatDetails = details;
1✔
2162
  }
1✔
2163

2164
  private static void acceptUpdate(
2165
      RequestContext ctx,
2166
      UpdateWorkflowExecutionData data,
2167
      Message msg,
2168
      long workflowTaskCompletedEventId) {
2169
    try {
2170
      Acceptance acceptance = msg.getBody().unpack(Acceptance.class);
1✔
2171

2172
      WorkflowExecutionUpdateAcceptedEventAttributes acceptedAttribute =
2173
          WorkflowExecutionUpdateAcceptedEventAttributes.newBuilder()
1✔
2174
              .setAcceptedRequestSequencingEventId(workflowTaskCompletedEventId - 1)
1✔
2175
              .setProtocolInstanceId(msg.getProtocolInstanceId())
1✔
2176
              .setAcceptedRequestMessageId(acceptance.getAcceptedRequestMessageId())
1✔
2177
              .setAcceptedRequest(data.initialRequest)
1✔
2178
              .build();
1✔
2179

2180
      HistoryEvent event =
2181
          HistoryEvent.newBuilder()
1✔
2182
              .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED)
1✔
2183
              .setWorkflowExecutionUpdateAcceptedEventAttributes(acceptedAttribute)
1✔
2184
              .build();
1✔
2185
      // If the workflow is finished we can't write more events
2186
      // to history so if the message was processed after the workflow
2187
      // was closed there is nothing we can do.
2188
      // The real server also has this same problem
2189
      if (!ctx.getWorkflowMutableState().isTerminalState()) {
1!
2190
        ctx.addEvent(event);
1✔
2191
      }
2192
      ctx.onCommit(
1✔
2193
          (int historySize) -> {
2194
            data.accepted.complete(true);
1✔
2195
          });
1✔
2196
    } catch (InvalidProtocolBufferException e) {
×
2197
      throw new RuntimeException(e);
×
2198
    }
1✔
2199
  }
1✔
2200

2201
  private static void completeUpdate(
2202
      RequestContext ctx,
2203
      UpdateWorkflowExecutionData data,
2204
      Message msg,
2205
      long workflowTaskCompletedEventId) {
2206
    try {
2207
      Response response = msg.getBody().unpack(Response.class);
1✔
2208

2209
      WorkflowExecutionUpdateCompletedEventAttributes completedAttribute =
2210
          WorkflowExecutionUpdateCompletedEventAttributes.newBuilder()
1✔
2211
              .setMeta(response.getMeta())
1✔
2212
              .setOutcome(response.getOutcome())
1✔
2213
              .build();
1✔
2214

2215
      HistoryEvent event =
2216
          HistoryEvent.newBuilder()
1✔
2217
              .setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_COMPLETED)
1✔
2218
              .setWorkflowExecutionUpdateCompletedEventAttributes(completedAttribute)
1✔
2219
              .build();
1✔
2220
      // If the workflow is finished we can't write more events
2221
      // to history so if the message was processed after the workflow
2222
      // was closed there is nothing we can do.
2223
      // The real server also has this same problem
2224
      if (!ctx.getWorkflowMutableState().isTerminalState()) {
1!
2225
        ctx.addEvent(event);
1✔
2226
      }
2227
      ctx.onCommit(
1✔
2228
          (int historySize) -> {
2229
            data.outcome.complete(response.getOutcome());
1✔
2230
          });
1✔
2231
    } catch (InvalidProtocolBufferException e) {
×
2232
      throw new RuntimeException(e);
×
2233
    }
1✔
2234
  }
1✔
2235

2236
  private static void startTimer(
2237
      RequestContext ctx,
2238
      TimerData data,
2239
      StartTimerCommandAttributes d,
2240
      long workflowTaskCompletedEventId) {
2241
    TimerStartedEventAttributes.Builder a =
2242
        TimerStartedEventAttributes.newBuilder()
1✔
2243
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1✔
2244
            .setStartToFireTimeout(d.getStartToFireTimeout())
1✔
2245
            .setTimerId(d.getTimerId());
1✔
2246
    HistoryEvent event =
2247
        HistoryEvent.newBuilder()
1✔
2248
            .setEventType(EventType.EVENT_TYPE_TIMER_STARTED)
1✔
2249
            .setTimerStartedEventAttributes(a)
1✔
2250
            .build();
1✔
2251
    long startedEventId = ctx.addEvent(event);
1✔
2252
    ctx.onCommit(
1✔
2253
        (historySize) -> {
2254
          data.startedEvent = a.build();
1✔
2255
          data.startedEventId = startedEventId;
1✔
2256
        });
1✔
2257
  }
1✔
2258

2259
  private static void fireTimer(RequestContext ctx, TimerData data, Object ignored, long notUsed) {
2260
    TimerFiredEventAttributes.Builder a =
2261
        TimerFiredEventAttributes.newBuilder()
1✔
2262
            .setTimerId(data.startedEvent.getTimerId())
1✔
2263
            .setStartedEventId(data.startedEventId);
1✔
2264
    HistoryEvent event =
2265
        HistoryEvent.newBuilder()
1✔
2266
            .setEventType(EventType.EVENT_TYPE_TIMER_FIRED)
1✔
2267
            .setTimerFiredEventAttributes(a)
1✔
2268
            .build();
1✔
2269
    ctx.addEvent(event);
1✔
2270
  }
1✔
2271

2272
  private static void cancelTimer(
2273
      RequestContext ctx,
2274
      TimerData data,
2275
      CancelTimerCommandAttributes d,
2276
      long workflowTaskCompletedEventId) {
2277
    TimerCanceledEventAttributes.Builder a =
2278
        TimerCanceledEventAttributes.newBuilder()
1✔
2279
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1✔
2280
            .setTimerId(d.getTimerId())
1✔
2281
            .setStartedEventId(data.startedEventId);
1✔
2282
    HistoryEvent event =
2283
        HistoryEvent.newBuilder()
1✔
2284
            .setEventType(EventType.EVENT_TYPE_TIMER_CANCELED)
1✔
2285
            .setTimerCanceledEventAttributes(a)
1✔
2286
            .build();
1✔
2287
    ctx.addEvent(event);
1✔
2288
  }
1✔
2289

2290
  private static void initiateExternalSignal(
2291
      RequestContext ctx,
2292
      SignalExternalData data,
2293
      SignalExternalWorkflowExecutionCommandAttributes d,
2294
      long workflowTaskCompletedEventId) {
2295
    SignalExternalWorkflowExecutionInitiatedEventAttributes.Builder a =
2296
        SignalExternalWorkflowExecutionInitiatedEventAttributes.newBuilder()
1✔
2297
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1✔
2298
            .setControl(d.getControl())
1✔
2299
            .setInput(d.getInput())
1✔
2300
            .setNamespace(d.getNamespace())
1✔
2301
            .setChildWorkflowOnly(d.getChildWorkflowOnly())
1✔
2302
            .setSignalName(d.getSignalName())
1✔
2303
            .setWorkflowExecution(d.getExecution());
1✔
2304

2305
    HistoryEvent event =
2306
        HistoryEvent.newBuilder()
1✔
2307
            .setEventType(EventType.EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED)
1✔
2308
            .setSignalExternalWorkflowExecutionInitiatedEventAttributes(a)
1✔
2309
            .build();
1✔
2310
    long initiatedEventId = ctx.addEvent(event);
1✔
2311
    ctx.onCommit(
1✔
2312
        (historySize) -> {
2313
          data.initiatedEventId = initiatedEventId;
1✔
2314
          data.initiatedEvent = a.build();
1✔
2315
        });
1✔
2316
  }
1✔
2317

2318
  private static void failExternalSignal(
2319
      RequestContext ctx,
2320
      SignalExternalData data,
2321
      SignalExternalWorkflowExecutionFailedCause cause,
2322
      long notUsed) {
2323
    SignalExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent = data.initiatedEvent;
1✔
2324
    SignalExternalWorkflowExecutionFailedEventAttributes.Builder a =
2325
        SignalExternalWorkflowExecutionFailedEventAttributes.newBuilder()
1✔
2326
            .setInitiatedEventId(data.initiatedEventId)
1✔
2327
            .setWorkflowExecution(initiatedEvent.getWorkflowExecution())
1✔
2328
            .setControl(initiatedEvent.getControl())
1✔
2329
            .setCause(cause)
1✔
2330
            .setNamespace(initiatedEvent.getNamespace());
1✔
2331
    HistoryEvent event =
2332
        HistoryEvent.newBuilder()
1✔
2333
            .setEventType(EventType.EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED)
1✔
2334
            .setSignalExternalWorkflowExecutionFailedEventAttributes(a)
1✔
2335
            .build();
1✔
2336
    ctx.addEvent(event);
1✔
2337
  }
1✔
2338

2339
  private static void completeExternalSignal(
2340
      RequestContext ctx, SignalExternalData data, String runId, long notUsed) {
2341
    SignalExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent = data.initiatedEvent;
1✔
2342
    WorkflowExecution signaledExecution =
1✔
2343
        initiatedEvent.getWorkflowExecution().toBuilder().setRunId(runId).build();
1✔
2344
    ExternalWorkflowExecutionSignaledEventAttributes.Builder a =
2345
        ExternalWorkflowExecutionSignaledEventAttributes.newBuilder()
1✔
2346
            .setInitiatedEventId(data.initiatedEventId)
1✔
2347
            .setWorkflowExecution(signaledExecution)
1✔
2348
            .setControl(initiatedEvent.getControl())
1✔
2349
            .setNamespace(initiatedEvent.getNamespace());
1✔
2350
    HistoryEvent event =
2351
        HistoryEvent.newBuilder()
1✔
2352
            .setEventType(EventType.EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_SIGNALED)
1✔
2353
            .setExternalWorkflowExecutionSignaledEventAttributes(a)
1✔
2354
            .build();
1✔
2355
    ctx.addEvent(event);
1✔
2356
  }
1✔
2357

2358
  private static void initiateExternalCancellation(
2359
      RequestContext ctx,
2360
      CancelExternalData data,
2361
      RequestCancelExternalWorkflowExecutionCommandAttributes d,
2362
      long workflowTaskCompletedEventId) {
2363
    RequestCancelExternalWorkflowExecutionInitiatedEventAttributes.Builder a =
2364
        RequestCancelExternalWorkflowExecutionInitiatedEventAttributes.newBuilder()
1✔
2365
            .setWorkflowTaskCompletedEventId(workflowTaskCompletedEventId)
1✔
2366
            .setControl(d.getControl())
1✔
2367
            .setNamespace(d.getNamespace())
1✔
2368
            .setChildWorkflowOnly(d.getChildWorkflowOnly())
1✔
2369
            .setWorkflowExecution(
1✔
2370
                WorkflowExecution.newBuilder()
1✔
2371
                    .setWorkflowId(d.getWorkflowId())
1✔
2372
                    .setRunId(d.getRunId())
1✔
2373
                    .build());
1✔
2374

2375
    HistoryEvent event =
2376
        HistoryEvent.newBuilder()
1✔
2377
            .setEventType(EventType.EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED)
1✔
2378
            .setRequestCancelExternalWorkflowExecutionInitiatedEventAttributes(a)
1✔
2379
            .build();
1✔
2380
    long initiatedEventId = ctx.addEvent(event);
1✔
2381
    ctx.onCommit(
1✔
2382
        (historySize) -> {
2383
          data.initiatedEventId = initiatedEventId;
1✔
2384
          data.initiatedEvent = a.build();
1✔
2385
        });
1✔
2386
  }
1✔
2387

2388
  private static void reportExternalCancellationRequested(
2389
      RequestContext ctx, CancelExternalData data, String runId, long notUsed) {
2390
    RequestCancelExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent =
1✔
2391
        data.initiatedEvent;
2392
    ExternalWorkflowExecutionCancelRequestedEventAttributes.Builder a =
2393
        ExternalWorkflowExecutionCancelRequestedEventAttributes.newBuilder()
1✔
2394
            .setInitiatedEventId(data.initiatedEventId)
1✔
2395
            .setWorkflowExecution(
1✔
2396
                WorkflowExecution.newBuilder()
1✔
2397
                    .setRunId(runId)
1✔
2398
                    .setWorkflowId(initiatedEvent.getWorkflowExecution().getWorkflowId())
1✔
2399
                    .build())
1✔
2400
            .setNamespace(initiatedEvent.getNamespace());
1✔
2401
    HistoryEvent event =
2402
        HistoryEvent.newBuilder()
1✔
2403
            .setEventType(EventType.EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_CANCEL_REQUESTED)
1✔
2404
            .setExternalWorkflowExecutionCancelRequestedEventAttributes(a)
1✔
2405
            .build();
1✔
2406
    ctx.addEvent(event);
1✔
2407
  }
1✔
2408

2409
  private static void failExternalCancellation(
2410
      RequestContext ctx,
2411
      CancelExternalData data,
2412
      CancelExternalWorkflowExecutionFailedCause cause,
2413
      long notUsed) {
2414
    RequestCancelExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent =
×
2415
        data.initiatedEvent;
2416
    RequestCancelExternalWorkflowExecutionFailedEventAttributes.Builder a =
2417
        RequestCancelExternalWorkflowExecutionFailedEventAttributes.newBuilder()
×
2418
            .setInitiatedEventId(data.initiatedEventId)
×
2419
            .setWorkflowExecution(initiatedEvent.getWorkflowExecution())
×
2420
            .setControl(initiatedEvent.getControl())
×
2421
            .setCause(cause)
×
2422
            .setNamespace(initiatedEvent.getNamespace());
×
2423
    HistoryEvent event =
2424
        HistoryEvent.newBuilder()
×
2425
            .setEventType(EventType.EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_FAILED)
×
2426
            .setRequestCancelExternalWorkflowExecutionFailedEventAttributes(a)
×
2427
            .build();
×
2428
    ctx.addEvent(event);
×
2429
  }
×
2430

2431
  // Mimics the default activity retry policy of a standard Temporal server.
2432
  static RetryPolicy ensureDefaultFieldsForActivityRetryPolicy(RetryPolicy originalPolicy) {
2433
    Duration initialInterval =
2434
        Durations.compare(originalPolicy.getInitialInterval(), Durations.ZERO) == 0
1✔
2435
            ? DEFAULT_ACTIVITY_RETRY_INITIAL_INTERVAL
1✔
2436
            : originalPolicy.getInitialInterval();
1✔
2437

2438
    return RetryPolicy.newBuilder()
1✔
2439
        .setInitialInterval(initialInterval)
1✔
2440
        .addAllNonRetryableErrorTypes(originalPolicy.getNonRetryableErrorTypesList())
1✔
2441
        .setMaximumInterval(
1✔
2442
            Durations.compare(originalPolicy.getMaximumInterval(), Durations.ZERO) == 0
1✔
2443
                ? Durations.fromMillis(
1✔
2444
                    DEFAULT_ACTIVITY_MAXIMUM_INTERVAL_COEFFICIENT
2445
                        * Durations.toMillis(initialInterval))
1✔
2446
                : originalPolicy.getMaximumInterval())
1✔
2447
        .setBackoffCoefficient(
1✔
2448
            originalPolicy.getBackoffCoefficient() == 0
1✔
2449
                ? DEFAULT_ACTIVITY_RETRY_BACKOFF_COEFFICIENT
1✔
2450
                : originalPolicy.getBackoffCoefficient())
1✔
2451
        .setMaximumAttempts(
1✔
2452
            originalPolicy.getMaximumAttempts() == 0
1✔
2453
                ? DEFAULT_ACTIVITY_RETRY_MAXIMUM_ATTEMPTS
1✔
2454
                : originalPolicy.getMaximumAttempts())
1✔
2455
        .build();
1✔
2456
  }
2457

2458
  static RetryPolicy defaultNexusRetryPolicy() {
2459
    return RetryPolicy.newBuilder()
1✔
2460
        .addAllNonRetryableErrorTypes(
1✔
2461
            Arrays.asList(
1✔
2462
                "BAD_REQUEST", "INVALID_ARGUMENT", "NOT_FOUND", "DEADLINE_EXCEEDED", "CANCELLED"))
2463
        .setInitialInterval(Durations.fromSeconds(1))
1✔
2464
        .setMaximumInterval(Durations.fromHours(1))
1✔
2465
        .setBackoffCoefficient(2.0)
1✔
2466
        .build();
1✔
2467
  }
2468
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc