• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 1737

pending completion
1737

push

buildkite

GitHub
[TestEnv] populate cronschedule in history start event and correct isCron in the list workflow response (#790)

Issue

TestEnv doesn't populate cronSchedule in the history start event for cron workflows. This blocks testing of interceptors.

Changes

populate cronschedule on the start event
correct isCron in list workflows

5 of 5 new or added lines in 2 files covered. (100.0%)

11119 of 18396 relevant lines covered (60.44%)

0.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.55
/src/main/java/com/uber/cadence/internal/testservice/StateMachines.java
1
/*
2
 *  Modifications Copyright (c) 2017-2020 Uber Technologies Inc.
3
 *  Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
4
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.internal.testservice;
19

20
import static com.uber.cadence.internal.testservice.StateMachines.Action.CANCEL;
21
import static com.uber.cadence.internal.testservice.StateMachines.Action.COMPLETE;
22
import static com.uber.cadence.internal.testservice.StateMachines.Action.CONTINUE_AS_NEW;
23
import static com.uber.cadence.internal.testservice.StateMachines.Action.FAIL;
24
import static com.uber.cadence.internal.testservice.StateMachines.Action.INITIATE;
25
import static com.uber.cadence.internal.testservice.StateMachines.Action.REQUEST_CANCELLATION;
26
import static com.uber.cadence.internal.testservice.StateMachines.Action.START;
27
import static com.uber.cadence.internal.testservice.StateMachines.Action.TIME_OUT;
28
import static com.uber.cadence.internal.testservice.StateMachines.Action.UPDATE;
29
import static com.uber.cadence.internal.testservice.StateMachines.State.CANCELED;
30
import static com.uber.cadence.internal.testservice.StateMachines.State.CANCELLATION_REQUESTED;
31
import static com.uber.cadence.internal.testservice.StateMachines.State.COMPLETED;
32
import static com.uber.cadence.internal.testservice.StateMachines.State.CONTINUED_AS_NEW;
33
import static com.uber.cadence.internal.testservice.StateMachines.State.FAILED;
34
import static com.uber.cadence.internal.testservice.StateMachines.State.INITIATED;
35
import static com.uber.cadence.internal.testservice.StateMachines.State.NONE;
36
import static com.uber.cadence.internal.testservice.StateMachines.State.STARTED;
37
import static com.uber.cadence.internal.testservice.StateMachines.State.TIMED_OUT;
38

39
import com.uber.cadence.ActivityTaskCancelRequestedEventAttributes;
40
import com.uber.cadence.ActivityTaskCanceledEventAttributes;
41
import com.uber.cadence.ActivityTaskCompletedEventAttributes;
42
import com.uber.cadence.ActivityTaskFailedEventAttributes;
43
import com.uber.cadence.ActivityTaskScheduledEventAttributes;
44
import com.uber.cadence.ActivityTaskStartedEventAttributes;
45
import com.uber.cadence.ActivityTaskTimedOutEventAttributes;
46
import com.uber.cadence.BadRequestError;
47
import com.uber.cadence.CancelTimerDecisionAttributes;
48
import com.uber.cadence.CancelWorkflowExecutionDecisionAttributes;
49
import com.uber.cadence.ChildWorkflowExecutionCanceledEventAttributes;
50
import com.uber.cadence.ChildWorkflowExecutionCompletedEventAttributes;
51
import com.uber.cadence.ChildWorkflowExecutionFailedCause;
52
import com.uber.cadence.ChildWorkflowExecutionFailedEventAttributes;
53
import com.uber.cadence.ChildWorkflowExecutionStartedEventAttributes;
54
import com.uber.cadence.ChildWorkflowExecutionTimedOutEventAttributes;
55
import com.uber.cadence.CompleteWorkflowExecutionDecisionAttributes;
56
import com.uber.cadence.ContinueAsNewWorkflowExecutionDecisionAttributes;
57
import com.uber.cadence.DecisionTaskCompletedEventAttributes;
58
import com.uber.cadence.DecisionTaskFailedEventAttributes;
59
import com.uber.cadence.DecisionTaskScheduledEventAttributes;
60
import com.uber.cadence.DecisionTaskStartedEventAttributes;
61
import com.uber.cadence.DecisionTaskTimedOutEventAttributes;
62
import com.uber.cadence.EntityNotExistsError;
63
import com.uber.cadence.EventType;
64
import com.uber.cadence.ExternalWorkflowExecutionSignaledEventAttributes;
65
import com.uber.cadence.FailWorkflowExecutionDecisionAttributes;
66
import com.uber.cadence.GetWorkflowExecutionHistoryRequest;
67
import com.uber.cadence.History;
68
import com.uber.cadence.HistoryEvent;
69
import com.uber.cadence.InternalServiceError;
70
import com.uber.cadence.PollForActivityTaskRequest;
71
import com.uber.cadence.PollForActivityTaskResponse;
72
import com.uber.cadence.PollForDecisionTaskRequest;
73
import com.uber.cadence.PollForDecisionTaskResponse;
74
import com.uber.cadence.RequestCancelActivityTaskDecisionAttributes;
75
import com.uber.cadence.RequestCancelWorkflowExecutionRequest;
76
import com.uber.cadence.RespondActivityTaskCanceledByIDRequest;
77
import com.uber.cadence.RespondActivityTaskCanceledRequest;
78
import com.uber.cadence.RespondActivityTaskCompletedByIDRequest;
79
import com.uber.cadence.RespondActivityTaskCompletedRequest;
80
import com.uber.cadence.RespondActivityTaskFailedByIDRequest;
81
import com.uber.cadence.RespondActivityTaskFailedRequest;
82
import com.uber.cadence.RespondDecisionTaskCompletedRequest;
83
import com.uber.cadence.RespondDecisionTaskFailedRequest;
84
import com.uber.cadence.RetryPolicy;
85
import com.uber.cadence.ScheduleActivityTaskDecisionAttributes;
86
import com.uber.cadence.SignalExternalWorkflowExecutionDecisionAttributes;
87
import com.uber.cadence.SignalExternalWorkflowExecutionFailedCause;
88
import com.uber.cadence.SignalExternalWorkflowExecutionFailedEventAttributes;
89
import com.uber.cadence.SignalExternalWorkflowExecutionInitiatedEventAttributes;
90
import com.uber.cadence.StartChildWorkflowExecutionDecisionAttributes;
91
import com.uber.cadence.StartChildWorkflowExecutionFailedEventAttributes;
92
import com.uber.cadence.StartChildWorkflowExecutionInitiatedEventAttributes;
93
import com.uber.cadence.StartTimerDecisionAttributes;
94
import com.uber.cadence.StartWorkflowExecutionRequest;
95
import com.uber.cadence.TimeoutType;
96
import com.uber.cadence.TimerCanceledEventAttributes;
97
import com.uber.cadence.TimerFiredEventAttributes;
98
import com.uber.cadence.TimerStartedEventAttributes;
99
import com.uber.cadence.WorkflowExecution;
100
import com.uber.cadence.WorkflowExecutionAlreadyStartedError;
101
import com.uber.cadence.WorkflowExecutionCancelRequestedEventAttributes;
102
import com.uber.cadence.WorkflowExecutionCanceledEventAttributes;
103
import com.uber.cadence.WorkflowExecutionCompletedEventAttributes;
104
import com.uber.cadence.WorkflowExecutionContinuedAsNewEventAttributes;
105
import com.uber.cadence.WorkflowExecutionFailedEventAttributes;
106
import com.uber.cadence.WorkflowExecutionStartedEventAttributes;
107
import com.uber.cadence.WorkflowExecutionTimedOutEventAttributes;
108
import com.uber.cadence.internal.testservice.TestWorkflowStore.ActivityTask;
109
import com.uber.cadence.internal.testservice.TestWorkflowStore.DecisionTask;
110
import com.uber.cadence.internal.testservice.TestWorkflowStore.TaskListId;
111
import java.util.List;
112
import java.util.Optional;
113
import java.util.OptionalLong;
114
import java.util.concurrent.ForkJoinPool;
115
import java.util.concurrent.TimeUnit;
116
import org.slf4j.Logger;
117
import org.slf4j.LoggerFactory;
118

119
class StateMachines {
×
120

121
  private static final Logger log = LoggerFactory.getLogger(StateMachines.class);
1✔
122

123
  private static final int NO_EVENT_ID = -1;
124
  private static final String TIMEOUT_ERROR_REASON = "cadenceInternal:Timeout";
125

126
  /**
   * States used by the state machines assembled in this class.
   *
   * <p>Every factory method in this class registers its first transition out of {@link #NONE}.
   * The declaration order is kept as is so that ordinals do not change.
   */
  enum State {
    /** Source state of the first transition of every machine. */
    NONE,
    INITIATED,
    STARTED,
    FAILED,
    TIMED_OUT,
    CANCELLATION_REQUESTED,
    CANCELED,
    COMPLETED,
    CONTINUED_AS_NEW
  }
137

138
  /**
   * Actions that trigger transitions in the state machines assembled in this class.
   *
   * <p>The declaration order is kept as is so that ordinals do not change.
   */
  enum Action {
    INITIATE,
    START,
    FAIL,
    TIME_OUT,
    REQUEST_CANCELLATION,
    CANCEL,
    /** Registered by the activity machine as a transition that stays in the same state. */
    UPDATE,
    COMPLETE,
    CONTINUE_AS_NEW
  }
149

150
  static final class WorkflowData {
151
    Optional<RetryState> retryState = Optional.empty();
1✔
152
    int backoffStartIntervalInSeconds;
153
    String cronSchedule;
154
    byte[] lastCompletionResult;
155
    String originalExecutionRunId;
156
    Optional<String> continuedExecutionRunId;
157

158
    WorkflowData(
159
        Optional<RetryState> retryState,
160
        int backoffStartIntervalInSeconds,
161
        String cronSchedule,
162
        byte[] lastCompletionResult,
163
        String originalExecutionRunId,
164
        Optional<String> continuedExecutionRunId) {
1✔
165
      this.retryState = retryState;
1✔
166
      this.backoffStartIntervalInSeconds = backoffStartIntervalInSeconds;
1✔
167
      this.cronSchedule = cronSchedule;
1✔
168
      this.lastCompletionResult = lastCompletionResult;
1✔
169
      this.originalExecutionRunId = originalExecutionRunId;
1✔
170
      this.continuedExecutionRunId = continuedExecutionRunId;
1✔
171
    }
1✔
172
  }
173

174
  static final class DecisionTaskData {
175

176
    final long previousStartedEventId;
177

178
    final TestWorkflowStore store;
179

180
    long startedEventId = NO_EVENT_ID;
1✔
181

182
    PollForDecisionTaskResponse decisionTask;
183

184
    long scheduledEventId = NO_EVENT_ID;
1✔
185

186
    int attempt;
187

188
    DecisionTaskData(long previousStartedEventId, TestWorkflowStore store) {
1✔
189
      this.previousStartedEventId = previousStartedEventId;
1✔
190
      this.store = store;
1✔
191
    }
1✔
192
  }
193

194
  static final class ActivityTaskData {
195

196
    StartWorkflowExecutionRequest startWorkflowExecutionRequest;
197
    ActivityTaskScheduledEventAttributes scheduledEvent;
198
    ActivityTask activityTask;
199

200
    final TestWorkflowStore store;
201

202
    long scheduledEventId = NO_EVENT_ID;
1✔
203
    long startedEventId = NO_EVENT_ID;
1✔
204
    public HistoryEvent startedEvent;
205
    byte[] heartbeatDetails;
206
    long lastHeartbeatTime;
207
    RetryState retryState;
208
    long nextBackoffIntervalSeconds;
209

210
    ActivityTaskData(
211
        TestWorkflowStore store, StartWorkflowExecutionRequest startWorkflowExecutionRequest) {
1✔
212
      this.store = store;
1✔
213
      this.startWorkflowExecutionRequest = startWorkflowExecutionRequest;
1✔
214
    }
1✔
215
  }
216

217
  static final class SignalExternalData {
1✔
218

219
    long initiatedEventId = NO_EVENT_ID;
1✔
220
    public SignalExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent;
221
  }
222

223
  static final class ChildWorkflowData {
224

225
    final TestWorkflowService service;
226
    StartChildWorkflowExecutionInitiatedEventAttributes initiatedEvent;
227
    long initiatedEventId;
228
    long startedEventId;
229
    WorkflowExecution execution;
230

231
    public ChildWorkflowData(TestWorkflowService service) {
1✔
232
      this.service = service;
1✔
233
    }
1✔
234
  }
235

236
  static final class TimerData {
1✔
237

238
    TimerStartedEventAttributes startedEvent;
239
    public long startedEventId;
240
  }
241

242
  static StateMachine<WorkflowData> newWorkflowStateMachine(WorkflowData data) {
243
    return new StateMachine<>(data)
1✔
244
        .add(NONE, START, STARTED, StateMachines::startWorkflow)
1✔
245
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::completeWorkflow)
1✔
246
        .add(STARTED, CONTINUE_AS_NEW, CONTINUED_AS_NEW, StateMachines::continueAsNewWorkflow)
1✔
247
        .add(STARTED, FAIL, FAILED, StateMachines::failWorkflow)
1✔
248
        .add(STARTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutWorkflow)
1✔
249
        .add(
1✔
250
            STARTED,
251
            REQUEST_CANCELLATION,
252
            CANCELLATION_REQUESTED,
253
            StateMachines::requestWorkflowCancellation)
254
        .add(CANCELLATION_REQUESTED, COMPLETE, COMPLETED, StateMachines::completeWorkflow)
1✔
255
        .add(CANCELLATION_REQUESTED, CANCEL, CANCELED, StateMachines::cancelWorkflow)
1✔
256
        .add(CANCELLATION_REQUESTED, FAIL, FAILED, StateMachines::failWorkflow)
1✔
257
        .add(CANCELLATION_REQUESTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutWorkflow);
1✔
258
  }
259

260
  static StateMachine<DecisionTaskData> newDecisionStateMachine(
261
      long previousStartedEventId, TestWorkflowStore store) {
262
    return new StateMachine<>(new DecisionTaskData(previousStartedEventId, store))
1✔
263
        .add(NONE, INITIATE, INITIATED, StateMachines::scheduleDecisionTask)
1✔
264
        .add(INITIATED, START, STARTED, StateMachines::startDecisionTask)
1✔
265
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::completeDecisionTask)
1✔
266
        .add(STARTED, FAIL, FAILED, StateMachines::failDecisionTask)
1✔
267
        .add(STARTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutDecisionTask)
1✔
268
        .add(TIMED_OUT, INITIATE, INITIATED, StateMachines::scheduleDecisionTask)
1✔
269
        .add(FAILED, INITIATE, INITIATED, StateMachines::scheduleDecisionTask);
1✔
270
  }
271

272
  public static StateMachine<ActivityTaskData> newActivityStateMachine(
273
      TestWorkflowStore store, StartWorkflowExecutionRequest workflowStartedEvent) {
274
    return new StateMachine<>(new ActivityTaskData(store, workflowStartedEvent))
1✔
275
        .add(NONE, INITIATE, INITIATED, StateMachines::scheduleActivityTask)
1✔
276
        .add(INITIATED, START, STARTED, StateMachines::startActivityTask)
1✔
277
        .add(INITIATED, TIME_OUT, TIMED_OUT, StateMachines::timeoutActivityTask)
1✔
278
        .add(
1✔
279
            INITIATED,
280
            REQUEST_CANCELLATION,
281
            CANCELLATION_REQUESTED,
282
            StateMachines::requestActivityCancellation)
283
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::completeActivityTask)
1✔
284
        // Transitions to initiated in case of the a retry
285
        .add(STARTED, FAIL, new State[] {FAILED, INITIATED}, StateMachines::failActivityTask)
1✔
286
        // Transitions to initiated in case of a retry
287
        .add(
1✔
288
            STARTED,
289
            TIME_OUT,
290
            new State[] {TIMED_OUT, INITIATED},
291
            StateMachines::timeoutActivityTask)
292
        .add(STARTED, UPDATE, STARTED, StateMachines::heartbeatActivityTask)
1✔
293
        .add(
1✔
294
            STARTED,
295
            REQUEST_CANCELLATION,
296
            CANCELLATION_REQUESTED,
297
            StateMachines::requestActivityCancellation)
298
        .add(
1✔
299
            CANCELLATION_REQUESTED, CANCEL, CANCELED, StateMachines::reportActivityTaskCancellation)
300
        .add(CANCELLATION_REQUESTED, COMPLETE, COMPLETED, StateMachines::completeActivityTask)
1✔
301
        .add(
1✔
302
            CANCELLATION_REQUESTED,
303
            UPDATE,
304
            CANCELLATION_REQUESTED,
305
            StateMachines::heartbeatActivityTask)
306
        .add(CANCELLATION_REQUESTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutActivityTask)
1✔
307
        .add(CANCELLATION_REQUESTED, FAIL, FAILED, StateMachines::failActivityTask);
1✔
308
  }
309

310
  public static StateMachine<ChildWorkflowData> newChildWorkflowStateMachine(
311
      TestWorkflowService service) {
312
    return new StateMachine<>(new ChildWorkflowData(service))
1✔
313
        .add(NONE, INITIATE, INITIATED, StateMachines::initiateChildWorkflow)
1✔
314
        .add(INITIATED, START, STARTED, StateMachines::childWorkflowStarted)
1✔
315
        .add(INITIATED, FAIL, FAILED, StateMachines::startChildWorkflowFailed)
1✔
316
        .add(INITIATED, TIME_OUT, TIMED_OUT, StateMachines::timeoutChildWorkflow)
1✔
317
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::childWorkflowCompleted)
1✔
318
        .add(STARTED, FAIL, FAILED, StateMachines::childWorkflowFailed)
1✔
319
        .add(STARTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutChildWorkflow)
1✔
320
        .add(STARTED, CANCEL, CANCELED, StateMachines::childWorkflowCanceled);
1✔
321
  }
322

323
  public static StateMachine<TimerData> newTimerStateMachine() {
324
    return new StateMachine<>(new TimerData())
1✔
325
        .add(NONE, START, STARTED, StateMachines::startTimer)
1✔
326
        .add(STARTED, COMPLETE, COMPLETED, StateMachines::fireTimer)
1✔
327
        .add(STARTED, CANCEL, CANCELED, StateMachines::cancelTimer);
1✔
328
  }
329

330
  public static StateMachine<SignalExternalData> newSignalExternalStateMachine() {
331
    return new StateMachine<>(new SignalExternalData())
1✔
332
        .add(NONE, INITIATE, INITIATED, StateMachines::initiateExternalSignal)
1✔
333
        .add(INITIATED, FAIL, FAILED, StateMachines::failExternalSignal)
1✔
334
        .add(INITIATED, COMPLETE, COMPLETED, StateMachines::completeExternalSignal);
1✔
335
  }
336

337
  private static void timeoutChildWorkflow(
338
      RequestContext ctx, ChildWorkflowData data, TimeoutType timeoutType, long notUsed) {
339
    StartChildWorkflowExecutionInitiatedEventAttributes ie = data.initiatedEvent;
1✔
340
    ChildWorkflowExecutionTimedOutEventAttributes a =
1✔
341
        new ChildWorkflowExecutionTimedOutEventAttributes()
342
            .setDomain(ie.getDomain())
1✔
343
            .setStartedEventId(data.startedEventId)
1✔
344
            .setWorkflowExecution(data.execution)
1✔
345
            .setWorkflowType(ie.getWorkflowType())
1✔
346
            .setTimeoutType(timeoutType)
1✔
347
            .setInitiatedEventId(data.initiatedEventId);
1✔
348
    HistoryEvent event =
1✔
349
        new HistoryEvent()
350
            .setEventType(EventType.ChildWorkflowExecutionTimedOut)
1✔
351
            .setChildWorkflowExecutionTimedOutEventAttributes(a);
1✔
352
    ctx.addEvent(event);
1✔
353
  }
1✔
354

355
  private static void startChildWorkflowFailed(
356
      RequestContext ctx,
357
      ChildWorkflowData data,
358
      StartChildWorkflowExecutionFailedEventAttributes a,
359
      long notUsed) {
360
    a.setInitiatedEventId(data.initiatedEventId);
1✔
361
    a.setWorkflowType(data.initiatedEvent.getWorkflowType());
1✔
362
    a.setWorkflowId(data.initiatedEvent.getWorkflowId());
1✔
363
    if (data.initiatedEvent.isSetDomain()) {
1✔
364
      a.setDomain(data.initiatedEvent.getDomain());
1✔
365
    }
366
    HistoryEvent event =
1✔
367
        new HistoryEvent()
368
            .setEventType(EventType.StartChildWorkflowExecutionFailed)
1✔
369
            .setStartChildWorkflowExecutionFailedEventAttributes(a);
1✔
370
    ctx.addEvent(event);
1✔
371
  }
1✔
372

373
  private static void childWorkflowStarted(
374
      RequestContext ctx,
375
      ChildWorkflowData data,
376
      ChildWorkflowExecutionStartedEventAttributes a,
377
      long notUsed) {
378
    a.setInitiatedEventId(data.initiatedEventId);
1✔
379
    HistoryEvent event =
1✔
380
        new HistoryEvent()
381
            .setEventType(EventType.ChildWorkflowExecutionStarted)
1✔
382
            .setChildWorkflowExecutionStartedEventAttributes(a);
1✔
383
    long startedEventId = ctx.addEvent(event);
1✔
384
    ctx.onCommit(
1✔
385
        (historySize) -> {
386
          data.startedEventId = startedEventId;
1✔
387
          data.execution = a.getWorkflowExecution();
1✔
388
        });
1✔
389
  }
1✔
390

391
  private static void childWorkflowCompleted(
392
      RequestContext ctx,
393
      ChildWorkflowData data,
394
      ChildWorkflowExecutionCompletedEventAttributes a,
395
      long notUsed) {
396
    a.setInitiatedEventId(data.initiatedEventId).setStartedEventId(data.startedEventId);
1✔
397
    HistoryEvent event =
1✔
398
        new HistoryEvent()
399
            .setEventType(EventType.ChildWorkflowExecutionCompleted)
1✔
400
            .setChildWorkflowExecutionCompletedEventAttributes(a);
1✔
401
    ctx.addEvent(event);
1✔
402
  }
1✔
403

404
  private static void childWorkflowFailed(
405
      RequestContext ctx,
406
      ChildWorkflowData data,
407
      ChildWorkflowExecutionFailedEventAttributes a,
408
      long notUsed) {
409
    a.setInitiatedEventId(data.initiatedEventId);
1✔
410
    a.setStartedEventId(data.startedEventId);
1✔
411
    a.setWorkflowExecution(data.execution);
1✔
412
    a.setWorkflowType(data.initiatedEvent.getWorkflowType());
1✔
413
    if (data.initiatedEvent.domain != null) {
1✔
414
      a.setDomain(data.initiatedEvent.domain);
1✔
415
    }
416
    HistoryEvent event =
1✔
417
        new HistoryEvent()
418
            .setEventType(EventType.ChildWorkflowExecutionFailed)
1✔
419
            .setChildWorkflowExecutionFailedEventAttributes(a);
1✔
420
    ctx.addEvent(event);
1✔
421
  }
1✔
422

423
  private static void childWorkflowCanceled(
424
      RequestContext ctx,
425
      ChildWorkflowData data,
426
      ChildWorkflowExecutionCanceledEventAttributes a,
427
      long notUsed) {
428
    a.setInitiatedEventId(data.initiatedEventId);
1✔
429
    a.setStartedEventId(data.startedEventId);
1✔
430
    HistoryEvent event =
1✔
431
        new HistoryEvent()
432
            .setEventType(EventType.ChildWorkflowExecutionCanceled)
1✔
433
            .setChildWorkflowExecutionCanceledEventAttributes(a);
1✔
434
    ctx.addEvent(event);
1✔
435
  }
1✔
436

437
  private static void initiateChildWorkflow(
438
      RequestContext ctx,
439
      ChildWorkflowData data,
440
      StartChildWorkflowExecutionDecisionAttributes d,
441
      long decisionTaskCompletedEventId) {
442
    StartChildWorkflowExecutionInitiatedEventAttributes a =
1✔
443
        new StartChildWorkflowExecutionInitiatedEventAttributes()
444
            .setControl(d.getControl())
1✔
445
            .setInput(d.getInput())
1✔
446
            .setDecisionTaskCompletedEventId(decisionTaskCompletedEventId)
1✔
447
            .setDomain(d.getDomain() == null ? ctx.getDomain() : d.getDomain())
1✔
448
            .setExecutionStartToCloseTimeoutSeconds(d.getExecutionStartToCloseTimeoutSeconds())
1✔
449
            .setTaskStartToCloseTimeoutSeconds(d.getTaskStartToCloseTimeoutSeconds())
1✔
450
            .setTaskList(d.getTaskList())
1✔
451
            .setWorkflowId(d.getWorkflowId())
1✔
452
            .setWorkflowIdReusePolicy(d.getWorkflowIdReusePolicy())
1✔
453
            .setWorkflowType(d.getWorkflowType())
1✔
454
            .setRetryPolicy(d.getRetryPolicy())
1✔
455
            .setCronSchedule(d.getCronSchedule())
1✔
456
            .setHeader(d.getHeader())
1✔
457
            .setParentClosePolicy(d.getParentClosePolicy());
1✔
458
    HistoryEvent event =
1✔
459
        new HistoryEvent()
460
            .setEventType(EventType.StartChildWorkflowExecutionInitiated)
1✔
461
            .setStartChildWorkflowExecutionInitiatedEventAttributes(a);
1✔
462
    long initiatedEventId = ctx.addEvent(event);
1✔
463
    ctx.onCommit(
1✔
464
        (historySize) -> {
465
          data.initiatedEventId = initiatedEventId;
1✔
466
          data.initiatedEvent = a;
1✔
467
          StartWorkflowExecutionRequest startChild =
1✔
468
              new StartWorkflowExecutionRequest()
469
                  .setDomain(d.getDomain() == null ? ctx.getDomain() : d.getDomain())
1✔
470
                  .setExecutionStartToCloseTimeoutSeconds(
1✔
471
                      d.getExecutionStartToCloseTimeoutSeconds())
1✔
472
                  .setTaskStartToCloseTimeoutSeconds(d.getTaskStartToCloseTimeoutSeconds())
1✔
473
                  .setTaskList(d.getTaskList())
1✔
474
                  .setWorkflowId(d.getWorkflowId())
1✔
475
                  .setWorkflowIdReusePolicy(d.getWorkflowIdReusePolicy())
1✔
476
                  .setWorkflowType(d.getWorkflowType())
1✔
477
                  .setRetryPolicy(d.getRetryPolicy())
1✔
478
                  .setCronSchedule(d.getCronSchedule())
1✔
479
                  .setHeader(d.getHeader());
1✔
480
          if (d.isSetInput()) {
1✔
481
            startChild.setInput(d.getInput());
1✔
482
          }
483
          addStartChildTask(ctx, data, initiatedEventId, startChild);
1✔
484
        });
1✔
485
  }
1✔
486

487
  private static void addStartChildTask(
488
      RequestContext ctx,
489
      ChildWorkflowData data,
490
      long initiatedEventId,
491
      StartWorkflowExecutionRequest startChild) {
492
    ForkJoinPool.commonPool()
1✔
493
        .execute(
1✔
494
            () -> {
495
              try {
496
                data.service.startWorkflowExecutionImpl(
1✔
497
                    startChild,
498
                    0,
499
                    Optional.of(ctx.getWorkflowMutableState()),
1✔
500
                    OptionalLong.of(data.initiatedEventId),
1✔
501
                    Optional.empty());
1✔
502
              } catch (WorkflowExecutionAlreadyStartedError workflowExecutionAlreadyStartedError) {
1✔
503
                StartChildWorkflowExecutionFailedEventAttributes failRequest =
1✔
504
                    new StartChildWorkflowExecutionFailedEventAttributes()
505
                        .setInitiatedEventId(initiatedEventId)
1✔
506
                        .setCause(ChildWorkflowExecutionFailedCause.WORKFLOW_ALREADY_RUNNING);
1✔
507
                try {
508
                  ctx.getWorkflowMutableState()
1✔
509
                      .failStartChildWorkflow(data.initiatedEvent.getWorkflowId(), failRequest);
1✔
510
                } catch (Throwable e) {
×
511
                  log.error("Unexpected failure inserting failStart for a child workflow", e);
×
512
                }
1✔
513
              } catch (Exception e) {
×
514
                log.error("Unexpected failure starting a child workflow", e);
×
515
              }
1✔
516
            });
1✔
517
  }
1✔
518

519
  private static void startWorkflow(
520
      RequestContext ctx, WorkflowData data, StartWorkflowExecutionRequest request, long notUsed)
521
      throws BadRequestError {
522
    WorkflowExecutionStartedEventAttributes a = new WorkflowExecutionStartedEventAttributes();
1✔
523
    if (request.isSetIdentity()) {
1✔
524
      a.setIdentity(request.getIdentity());
1✔
525
    }
526
    if (!request.isSetTaskStartToCloseTimeoutSeconds()) {
1✔
527
      throw new BadRequestError("missing taskStartToCloseTimeoutSeconds");
×
528
    }
529
    a.setTaskStartToCloseTimeoutSeconds(request.getTaskStartToCloseTimeoutSeconds());
1✔
530
    if (!request.isSetWorkflowType()) {
1✔
531
      throw new BadRequestError("missing workflowType");
×
532
    }
533
    a.setWorkflowType(request.getWorkflowType());
1✔
534
    if (!request.isSetTaskList()) {
1✔
535
      throw new BadRequestError("missing taskList");
×
536
    }
537
    a.setTaskList(request.getTaskList());
1✔
538
    if (!request.isSetExecutionStartToCloseTimeoutSeconds()) {
1✔
539
      throw new BadRequestError("missing executionStartToCloseTimeoutSeconds");
×
540
    }
541
    a.setExecutionStartToCloseTimeoutSeconds(request.getExecutionStartToCloseTimeoutSeconds());
1✔
542
    if (request.isSetInput()) {
1✔
543
      a.setInput(request.getInput());
1✔
544
    }
545
    if (data.retryState.isPresent()) {
1✔
546
      a.setAttempt(data.retryState.get().getAttempt());
1✔
547
    }
548
    a.setOriginalExecutionRunId(data.originalExecutionRunId);
1✔
549
    if (data.continuedExecutionRunId.isPresent()) {
1✔
550
      a.setContinuedExecutionRunId(data.continuedExecutionRunId.get());
1✔
551
    }
552
    a.setCronSchedule(data.cronSchedule);
1✔
553
    a.setLastCompletionResult(data.lastCompletionResult);
1✔
554
    a.setMemo(request.getMemo());
1✔
555
    a.setSearchAttributes((request.getSearchAttributes()));
1✔
556
    a.setHeader(request.getHeader());
1✔
557
    Optional<TestWorkflowMutableState> parent = ctx.getWorkflowMutableState().getParent();
1✔
558
    if (parent.isPresent()) {
1✔
559
      ExecutionId parentExecutionId = parent.get().getExecutionId();
1✔
560
      a.setParentWorkflowDomain(parentExecutionId.getDomain());
1✔
561
      a.setParentWorkflowExecution(parentExecutionId.getExecution());
1✔
562
    }
563
    HistoryEvent event =
1✔
564
        new HistoryEvent()
565
            .setEventType(EventType.WorkflowExecutionStarted)
1✔
566
            .setWorkflowExecutionStartedEventAttributes(a);
1✔
567
    ctx.addEvent(event);
1✔
568
  }
1✔
569

570
  private static void completeWorkflow(
571
      RequestContext ctx,
572
      WorkflowData data,
573
      CompleteWorkflowExecutionDecisionAttributes d,
574
      long decisionTaskCompletedEventId) {
575
    WorkflowExecutionCompletedEventAttributes a =
1✔
576
        new WorkflowExecutionCompletedEventAttributes()
577
            .setResult(d.getResult())
1✔
578
            .setDecisionTaskCompletedEventId(decisionTaskCompletedEventId);
1✔
579
    HistoryEvent event =
1✔
580
        new HistoryEvent()
581
            .setEventType(EventType.WorkflowExecutionCompleted)
1✔
582
            .setWorkflowExecutionCompletedEventAttributes(a);
1✔
583
    ctx.addEvent(event);
1✔
584
  }
1✔
585

586
  private static void continueAsNewWorkflow(
587
      RequestContext ctx,
588
      WorkflowData data,
589
      ContinueAsNewWorkflowExecutionDecisionAttributes d,
590
      long decisionTaskCompletedEventId) {
591
    StartWorkflowExecutionRequest sr = ctx.getWorkflowMutableState().getStartRequest();
1✔
592
    WorkflowExecutionContinuedAsNewEventAttributes a =
1✔
593
        new WorkflowExecutionContinuedAsNewEventAttributes();
594
    a.setInput(d.getInput());
1✔
595
    if (d.isSetExecutionStartToCloseTimeoutSeconds()) {
1✔
596
      a.setExecutionStartToCloseTimeoutSeconds(d.getExecutionStartToCloseTimeoutSeconds());
1✔
597
    } else {
598
      a.setExecutionStartToCloseTimeoutSeconds(sr.getExecutionStartToCloseTimeoutSeconds());
×
599
    }
600
    if (d.isSetTaskList()) {
1✔
601
      a.setTaskList(d.getTaskList());
1✔
602
    } else {
603
      a.setTaskList(sr.getTaskList());
×
604
    }
605
    if (d.isSetWorkflowType()) {
1✔
606
      a.setWorkflowType(d.getWorkflowType());
1✔
607
    } else {
608
      a.setWorkflowType(sr.getWorkflowType());
×
609
    }
610
    if (d.isSetTaskStartToCloseTimeoutSeconds()) {
1✔
611
      a.setTaskStartToCloseTimeoutSeconds(d.getTaskStartToCloseTimeoutSeconds());
1✔
612
    } else {
613
      a.setTaskStartToCloseTimeoutSeconds(sr.getTaskStartToCloseTimeoutSeconds());
×
614
    }
615
    a.setDecisionTaskCompletedEventId(decisionTaskCompletedEventId);
1✔
616
    a.setBackoffStartIntervalInSeconds(d.getBackoffStartIntervalInSeconds());
1✔
617
    a.setLastCompletionResult(d.getLastCompletionResult());
1✔
618
    HistoryEvent event =
1✔
619
        new HistoryEvent()
620
            .setEventType(EventType.WorkflowExecutionContinuedAsNew)
1✔
621
            .setWorkflowExecutionContinuedAsNewEventAttributes(a);
1✔
622
    ctx.addEvent(event);
1✔
623
  }
1✔
624

625
  private static void failWorkflow(
626
      RequestContext ctx,
627
      WorkflowData data,
628
      FailWorkflowExecutionDecisionAttributes d,
629
      long decisionTaskCompletedEventId) {
630
    WorkflowExecutionFailedEventAttributes a =
1✔
631
        new WorkflowExecutionFailedEventAttributes()
632
            .setReason(d.getReason())
1✔
633
            .setDetails(d.getDetails())
1✔
634
            .setDecisionTaskCompletedEventId(decisionTaskCompletedEventId);
1✔
635
    HistoryEvent event =
1✔
636
        new HistoryEvent()
637
            .setEventType(EventType.WorkflowExecutionFailed)
1✔
638
            .setWorkflowExecutionFailedEventAttributes(a);
1✔
639
    ctx.addEvent(event);
1✔
640
  }
1✔
641

642
  private static void timeoutWorkflow(
643
      RequestContext ctx, WorkflowData data, TimeoutType timeoutType, long notUsed) {
644
    WorkflowExecutionTimedOutEventAttributes a =
1✔
645
        new WorkflowExecutionTimedOutEventAttributes().setTimeoutType(timeoutType);
1✔
646
    HistoryEvent event =
1✔
647
        new HistoryEvent()
648
            .setEventType(EventType.WorkflowExecutionTimedOut)
1✔
649
            .setWorkflowExecutionTimedOutEventAttributes(a);
1✔
650
    ctx.addEvent(event);
1✔
651
  }
1✔
652

653
  private static void cancelWorkflow(
654
      RequestContext ctx,
655
      WorkflowData data,
656
      CancelWorkflowExecutionDecisionAttributes d,
657
      long decisionTaskCompletedEventId) {
658
    WorkflowExecutionCanceledEventAttributes a =
1✔
659
        new WorkflowExecutionCanceledEventAttributes()
660
            .setDetails(d.getDetails())
1✔
661
            .setDecisionTaskCompletedEventId(decisionTaskCompletedEventId);
1✔
662
    HistoryEvent event =
1✔
663
        new HistoryEvent()
664
            .setEventType(EventType.WorkflowExecutionCanceled)
1✔
665
            .setWorkflowExecutionCanceledEventAttributes(a);
1✔
666
    ctx.addEvent(event);
1✔
667
  }
1✔
668

669
  private static void requestWorkflowCancellation(
670
      RequestContext ctx,
671
      WorkflowData data,
672
      RequestCancelWorkflowExecutionRequest cancelRequest,
673
      long notUsed) {
674
    WorkflowExecutionCancelRequestedEventAttributes a =
1✔
675
        new WorkflowExecutionCancelRequestedEventAttributes()
676
            .setIdentity(cancelRequest.getIdentity());
1✔
677
    HistoryEvent cancelRequested =
1✔
678
        new HistoryEvent()
679
            .setEventType(EventType.WorkflowExecutionCancelRequested)
1✔
680
            .setWorkflowExecutionCancelRequestedEventAttributes(a);
1✔
681
    ctx.addEvent(cancelRequested);
1✔
682
  }
1✔
683

684
  private static void scheduleActivityTask(
685
      RequestContext ctx,
686
      ActivityTaskData data,
687
      ScheduleActivityTaskDecisionAttributes d,
688
      long decisionTaskCompletedEventId)
689
      throws BadRequestError {
690
    int scheduleToCloseTimeoutSeconds = d.getScheduleToCloseTimeoutSeconds();
1✔
691
    int scheduleToStartTimeoutSeconds = d.getScheduleToStartTimeoutSeconds();
1✔
692
    RetryState retryState;
693
    RetryPolicy retryPolicy = d.getRetryPolicy();
1✔
694
    if (retryPolicy != null) {
1✔
695
      long expirationInterval =
1✔
696
          TimeUnit.SECONDS.toMillis(retryPolicy.getExpirationIntervalInSeconds());
1✔
697
      long expirationTime = data.store.currentTimeMillis() + expirationInterval;
1✔
698
      retryState = new RetryState(retryPolicy, expirationTime);
1✔
699
      // Override activity timeouts to allow retry policy to run up to its expiration.
700
      int overriddenTimeout;
701
      if (retryPolicy.getExpirationIntervalInSeconds() > 0) {
1✔
702
        overriddenTimeout = retryPolicy.getExpirationIntervalInSeconds();
1✔
703
      } else {
704
        overriddenTimeout =
1✔
705
            data.startWorkflowExecutionRequest.getExecutionStartToCloseTimeoutSeconds();
1✔
706
      }
707
      scheduleToCloseTimeoutSeconds = overriddenTimeout;
1✔
708
      scheduleToStartTimeoutSeconds = overriddenTimeout;
1✔
709
    } else {
1✔
710
      retryState = null;
1✔
711
    }
712

713
    ActivityTaskScheduledEventAttributes a =
1✔
714
        new ActivityTaskScheduledEventAttributes()
715
            .setInput(d.getInput())
1✔
716
            .setActivityId(d.getActivityId())
1✔
717
            .setActivityType(d.getActivityType())
1✔
718
            .setDomain(d.getDomain() == null ? ctx.getDomain() : d.getDomain())
1✔
719
            .setHeartbeatTimeoutSeconds(d.getHeartbeatTimeoutSeconds())
1✔
720
            .setScheduleToCloseTimeoutSeconds(scheduleToCloseTimeoutSeconds)
1✔
721
            .setScheduleToStartTimeoutSeconds(scheduleToStartTimeoutSeconds)
1✔
722
            .setStartToCloseTimeoutSeconds(d.getStartToCloseTimeoutSeconds())
1✔
723
            .setTaskList(d.getTaskList())
1✔
724
            .setRetryPolicy(retryPolicy)
1✔
725
            .setHeader(d.getHeader())
1✔
726
            .setDecisionTaskCompletedEventId(decisionTaskCompletedEventId);
1✔
727
    data.scheduledEvent =
1✔
728
        a; // Cannot set it in onCommit as it is used in the processScheduleActivityTask
729
    HistoryEvent event =
1✔
730
        new HistoryEvent()
731
            .setEventType(EventType.ActivityTaskScheduled)
1✔
732
            .setActivityTaskScheduledEventAttributes(a);
1✔
733
    long scheduledEventId = ctx.addEvent(event);
1✔
734

735
    PollForActivityTaskResponse taskResponse =
1✔
736
        new PollForActivityTaskResponse()
737
            .setWorkflowDomain(ctx.getDomain())
1✔
738
            .setWorkflowType(data.startWorkflowExecutionRequest.workflowType)
1✔
739
            .setActivityType(d.getActivityType())
1✔
740
            .setWorkflowExecution(ctx.getExecution())
1✔
741
            .setActivityId(d.getActivityId())
1✔
742
            .setInput(d.getInput())
1✔
743
            .setHeartbeatTimeoutSeconds(d.getHeartbeatTimeoutSeconds())
1✔
744
            .setScheduleToCloseTimeoutSeconds(scheduleToCloseTimeoutSeconds)
1✔
745
            .setStartToCloseTimeoutSeconds(d.getStartToCloseTimeoutSeconds())
1✔
746
            .setScheduledTimestamp(ctx.currentTimeInNanoseconds())
1✔
747
            .setScheduledTimestampOfThisAttempt(ctx.currentTimeInNanoseconds())
1✔
748
            .setHeader(d.getHeader())
1✔
749
            .setAttempt(0);
1✔
750

751
    TaskListId taskListId = new TaskListId(ctx.getDomain(), d.getTaskList().getName());
1✔
752
    ActivityTask activityTask = new ActivityTask(taskListId, taskResponse);
1✔
753
    ctx.addActivityTask(activityTask);
1✔
754
    ctx.onCommit(
1✔
755
        (historySize) -> {
756
          data.scheduledEventId = scheduledEventId;
1✔
757
          data.activityTask = activityTask;
1✔
758
          data.retryState = retryState;
1✔
759
        });
1✔
760
  }
1✔
761

762
  private static void requestActivityCancellation(
763
      RequestContext ctx,
764
      ActivityTaskData data,
765
      RequestCancelActivityTaskDecisionAttributes d,
766
      long decisionTaskCompletedEventId) {
767
    ActivityTaskCancelRequestedEventAttributes a =
1✔
768
        new ActivityTaskCancelRequestedEventAttributes()
769
            .setActivityId(d.getActivityId())
1✔
770
            .setDecisionTaskCompletedEventId(decisionTaskCompletedEventId);
1✔
771
    HistoryEvent event =
1✔
772
        new HistoryEvent()
773
            .setEventType(EventType.ActivityTaskCancelRequested)
1✔
774
            .setActivityTaskCancelRequestedEventAttributes(a);
1✔
775
    ctx.addEvent(event);
1✔
776
  }
1✔
777

778
  private static void scheduleDecisionTask(
779
      RequestContext ctx,
780
      DecisionTaskData data,
781
      StartWorkflowExecutionRequest request,
782
      long notUsed) {
783
    DecisionTaskScheduledEventAttributes a =
1✔
784
        new DecisionTaskScheduledEventAttributes()
785
            .setStartToCloseTimeoutSeconds(request.getTaskStartToCloseTimeoutSeconds())
1✔
786
            .setTaskList(request.getTaskList())
1✔
787
            .setAttempt(data.attempt);
1✔
788
    HistoryEvent event =
1✔
789
        new HistoryEvent()
790
            .setEventType(EventType.DecisionTaskScheduled)
1✔
791
            .setDecisionTaskScheduledEventAttributes(a);
1✔
792
    long scheduledEventId = ctx.addEvent(event);
1✔
793
    PollForDecisionTaskResponse decisionTaskResponse = new PollForDecisionTaskResponse();
1✔
794
    if (data.previousStartedEventId > 0) {
1✔
795
      decisionTaskResponse.setPreviousStartedEventId(data.previousStartedEventId);
1✔
796
    }
797
    decisionTaskResponse.setWorkflowExecution(ctx.getExecution());
1✔
798
    decisionTaskResponse.setWorkflowType(request.getWorkflowType());
1✔
799
    decisionTaskResponse.setAttempt(data.attempt);
1✔
800
    TaskListId taskListId = new TaskListId(ctx.getDomain(), request.getTaskList().getName());
1✔
801
    DecisionTask decisionTask = new DecisionTask(taskListId, decisionTaskResponse);
1✔
802
    ctx.setDecisionTask(decisionTask);
1✔
803
    ctx.onCommit(
1✔
804
        (historySize) -> {
805
          data.scheduledEventId = scheduledEventId;
1✔
806
          data.decisionTask = decisionTaskResponse;
1✔
807
        });
1✔
808
  }
1✔
809

810
  private static void startDecisionTask(
811
      RequestContext ctx, DecisionTaskData data, PollForDecisionTaskRequest request, long notUsed) {
812
    DecisionTaskStartedEventAttributes a =
1✔
813
        new DecisionTaskStartedEventAttributes()
814
            .setIdentity(request.getIdentity())
1✔
815
            .setScheduledEventId(data.scheduledEventId);
1✔
816
    HistoryEvent event =
1✔
817
        new HistoryEvent()
818
            .setEventType(EventType.DecisionTaskStarted)
1✔
819
            .setDecisionTaskStartedEventAttributes(a);
1✔
820
    long startedEventId = ctx.addEvent(event);
1✔
821
    ctx.onCommit(
1✔
822
        (historySize) -> {
823
          data.decisionTask.setStartedEventId(startedEventId);
1✔
824
          DecisionTaskToken taskToken = new DecisionTaskToken(ctx.getExecutionId(), historySize);
1✔
825
          data.decisionTask.setTaskToken(taskToken.toBytes());
1✔
826
          GetWorkflowExecutionHistoryRequest getRequest =
1✔
827
              new GetWorkflowExecutionHistoryRequest()
828
                  .setDomain(request.getDomain())
1✔
829
                  .setExecution(ctx.getExecution());
1✔
830
          List<HistoryEvent> events;
831
          try {
832
            events =
1✔
833
                data.store
834
                    .getWorkflowExecutionHistory(ctx.getExecutionId(), getRequest)
1✔
835
                    .getHistory()
1✔
836
                    .getEvents();
1✔
837

838
            if (ctx.getWorkflowMutableState().getStickyExecutionAttributes() != null) {
1✔
839
              events = events.subList((int) data.previousStartedEventId, events.size());
1✔
840
            }
841
            // get it from pervious started event id.
842
          } catch (EntityNotExistsError entityNotExistsError) {
×
843
            throw new InternalServiceError(entityNotExistsError.toString());
×
844
          }
1✔
845
          data.decisionTask.setHistory(new History().setEvents(events));
1✔
846
          data.startedEventId = startedEventId;
1✔
847
          data.attempt++;
1✔
848
        });
1✔
849
  }
1✔
850

851
  private static void startActivityTask(
852
      RequestContext ctx, ActivityTaskData data, PollForActivityTaskRequest request, long notUsed) {
853
    ActivityTaskStartedEventAttributes a =
1✔
854
        new ActivityTaskStartedEventAttributes()
855
            .setIdentity(request.getIdentity())
1✔
856
            .setScheduledEventId(data.scheduledEventId);
1✔
857
    if (data.retryState != null) {
1✔
858
      a.setAttempt(data.retryState.getAttempt());
1✔
859
    }
860
    // Setting timestamp here as the default logic will set it to the time when it is added to the
861
    // history. But in the case of retry it happens only after an activity completion.
862
    long timestamp = TimeUnit.MILLISECONDS.toNanos(data.store.currentTimeMillis());
1✔
863
    HistoryEvent event =
1✔
864
        new HistoryEvent()
865
            .setEventType(EventType.ActivityTaskStarted)
1✔
866
            .setTimestamp(timestamp)
1✔
867
            .setActivityTaskStartedEventAttributes(a);
1✔
868
    long startedEventId;
869
    if (data.retryState == null) {
1✔
870
      startedEventId = ctx.addEvent(event);
1✔
871
    } else {
872
      startedEventId = NO_EVENT_ID;
1✔
873
    }
874
    ctx.onCommit(
1✔
875
        (historySize) -> {
876
          data.startedEventId = startedEventId;
1✔
877
          data.startedEvent = event;
1✔
878
          PollForActivityTaskResponse task = data.activityTask.getTask();
1✔
879
          task.setTaskToken(new ActivityId(ctx.getExecutionId(), task.getActivityId()).toBytes());
1✔
880
          task.setStartedTimestamp(timestamp);
1✔
881
        });
1✔
882
  }
1✔
883

884
  private static void completeDecisionTask(
885
      RequestContext ctx,
886
      DecisionTaskData data,
887
      RespondDecisionTaskCompletedRequest request,
888
      long notUsed) {
889
    DecisionTaskCompletedEventAttributes a =
1✔
890
        new DecisionTaskCompletedEventAttributes()
891
            .setIdentity(request.getIdentity())
1✔
892
            .setScheduledEventId(data.scheduledEventId);
1✔
893
    HistoryEvent event =
1✔
894
        new HistoryEvent()
895
            .setEventType(EventType.DecisionTaskCompleted)
1✔
896
            .setDecisionTaskCompletedEventAttributes(a);
1✔
897
    ctx.addEvent(event);
1✔
898
    ctx.onCommit((historySize) -> data.attempt = 0);
1✔
899
  }
1✔
900

901
  private static void failDecisionTask(
902
      RequestContext ctx,
903
      DecisionTaskData data,
904
      RespondDecisionTaskFailedRequest request,
905
      long notUsed) {
906
    DecisionTaskFailedEventAttributes a =
1✔
907
        new DecisionTaskFailedEventAttributes()
908
            .setIdentity(request.getIdentity())
1✔
909
            .setCause(request.getCause())
1✔
910
            .setDetails(request.getDetails())
1✔
911
            .setStartedEventId(data.startedEventId)
1✔
912
            .setScheduledEventId(data.scheduledEventId);
1✔
913
    HistoryEvent event =
1✔
914
        new HistoryEvent()
915
            .setEventType(EventType.DecisionTaskFailed)
1✔
916
            .setDecisionTaskFailedEventAttributes(a);
1✔
917
    ctx.addEvent(event);
1✔
918
  }
1✔
919

920
  private static void timeoutDecisionTask(
921
      RequestContext ctx, DecisionTaskData data, Object ignored, long notUsed) {
922
    DecisionTaskTimedOutEventAttributes a =
1✔
923
        new DecisionTaskTimedOutEventAttributes()
924
            .setStartedEventId(data.startedEventId)
1✔
925
            .setTimeoutType(TimeoutType.START_TO_CLOSE)
1✔
926
            .setScheduledEventId(data.scheduledEventId);
1✔
927
    HistoryEvent event =
1✔
928
        new HistoryEvent()
929
            .setEventType(EventType.DecisionTaskTimedOut)
1✔
930
            .setDecisionTaskTimedOutEventAttributes(a);
1✔
931
    ctx.addEvent(event);
1✔
932
  }
1✔
933

934
  private static void completeActivityTask(
935
      RequestContext ctx, ActivityTaskData data, Object request, long notUsed) {
936
    if (data.retryState != null) {
1✔
937
      ctx.addEvent(data.startedEvent);
1✔
938
    }
939
    if (request instanceof RespondActivityTaskCompletedRequest) {
1✔
940
      completeActivityTaskByTaskToken(ctx, data, (RespondActivityTaskCompletedRequest) request);
1✔
941
    } else if (request instanceof RespondActivityTaskCompletedByIDRequest) {
1✔
942
      completeActivityTaskById(ctx, data, (RespondActivityTaskCompletedByIDRequest) request);
1✔
943
    } else {
944
      throw new IllegalArgumentException("Unknown request: " + request);
×
945
    }
946
  }
1✔
947

948
  private static void completeActivityTaskByTaskToken(
949
      RequestContext ctx, ActivityTaskData data, RespondActivityTaskCompletedRequest request) {
950
    ActivityTaskCompletedEventAttributes a =
1✔
951
        new ActivityTaskCompletedEventAttributes()
952
            .setIdentity(request.getIdentity())
1✔
953
            .setScheduledEventId(data.scheduledEventId)
1✔
954
            .setResult(request.getResult())
1✔
955
            .setIdentity(request.getIdentity())
1✔
956
            .setStartedEventId(data.startedEventId);
1✔
957
    HistoryEvent event =
1✔
958
        new HistoryEvent()
959
            .setEventType(EventType.ActivityTaskCompleted)
1✔
960
            .setActivityTaskCompletedEventAttributes(a);
1✔
961
    ctx.addEvent(event);
1✔
962
  }
1✔
963

964
  private static void completeActivityTaskById(
965
      RequestContext ctx, ActivityTaskData data, RespondActivityTaskCompletedByIDRequest request) {
966
    ActivityTaskCompletedEventAttributes a =
1✔
967
        new ActivityTaskCompletedEventAttributes()
968
            .setIdentity(request.getIdentity())
1✔
969
            .setScheduledEventId(data.scheduledEventId)
1✔
970
            .setResult(request.getResult())
1✔
971
            .setIdentity(request.getIdentity())
1✔
972
            .setStartedEventId(data.startedEventId);
1✔
973
    HistoryEvent event =
1✔
974
        new HistoryEvent()
975
            .setEventType(EventType.ActivityTaskCompleted)
1✔
976
            .setActivityTaskCompletedEventAttributes(a);
1✔
977
    ctx.addEvent(event);
1✔
978
  }
1✔
979

980
  private static State failActivityTask(
981
      RequestContext ctx, ActivityTaskData data, Object request, long notUsed) {
982
    if (request instanceof RespondActivityTaskFailedRequest) {
1✔
983
      return failActivityTaskByTaskToken(ctx, data, (RespondActivityTaskFailedRequest) request);
1✔
984
    } else if (request instanceof RespondActivityTaskFailedByIDRequest) {
×
985
      return failActivityTaskById(ctx, data, (RespondActivityTaskFailedByIDRequest) request);
×
986
    } else {
987
      throw new IllegalArgumentException("Unknown request: " + request);
×
988
    }
989
  }
990

991
  private static State failActivityTaskByTaskToken(
992
      RequestContext ctx, ActivityTaskData data, RespondActivityTaskFailedRequest request) {
993
    if (attemptActivityRetry(ctx, request.getReason(), data)) {
1✔
994
      return INITIATED;
1✔
995
    }
996
    ActivityTaskFailedEventAttributes a =
1✔
997
        new ActivityTaskFailedEventAttributes()
998
            .setIdentity(request.getIdentity())
1✔
999
            .setScheduledEventId(data.scheduledEventId)
1✔
1000
            .setDetails(request.getDetails())
1✔
1001
            .setReason(request.getReason())
1✔
1002
            .setIdentity(request.getIdentity())
1✔
1003
            .setStartedEventId(data.startedEventId);
1✔
1004
    HistoryEvent event =
1✔
1005
        new HistoryEvent()
1006
            .setEventType(EventType.ActivityTaskFailed)
1✔
1007
            .setActivityTaskFailedEventAttributes(a);
1✔
1008
    ctx.addEvent(event);
1✔
1009
    return FAILED;
1✔
1010
  }
1011

1012
  private static State failActivityTaskById(
1013
      RequestContext ctx, ActivityTaskData data, RespondActivityTaskFailedByIDRequest request) {
1014
    if (attemptActivityRetry(ctx, request.getReason(), data)) {
×
1015
      return INITIATED;
×
1016
    }
1017
    ActivityTaskFailedEventAttributes a =
×
1018
        new ActivityTaskFailedEventAttributes()
1019
            .setIdentity(request.getIdentity())
×
1020
            .setScheduledEventId(data.scheduledEventId)
×
1021
            .setDetails(request.getDetails())
×
1022
            .setReason(request.getReason())
×
1023
            .setIdentity(request.getIdentity())
×
1024
            .setStartedEventId(data.startedEventId);
×
1025
    HistoryEvent event =
×
1026
        new HistoryEvent()
1027
            .setEventType(EventType.ActivityTaskFailed)
×
1028
            .setActivityTaskFailedEventAttributes(a);
×
1029
    ctx.addEvent(event);
×
1030
    return FAILED;
×
1031
  }
1032

1033
  private static State timeoutActivityTask(
1034
      RequestContext ctx, ActivityTaskData data, TimeoutType timeoutType, long notUsed) {
1035
    // ScheduleToStart (queue timeout) is not retriable. Instead of the retry, a customer should set
1036
    // a larger ScheduleToStart timeout.
1037
    if (timeoutType != TimeoutType.SCHEDULE_TO_START
1✔
1038
        && attemptActivityRetry(ctx, TIMEOUT_ERROR_REASON, data)) {
1✔
1039
      return INITIATED;
1✔
1040
    }
1041
    ActivityTaskTimedOutEventAttributes a =
1✔
1042
        new ActivityTaskTimedOutEventAttributes()
1043
            .setScheduledEventId(data.scheduledEventId)
1✔
1044
            .setDetails(data.heartbeatDetails)
1✔
1045
            .setTimeoutType(timeoutType)
1✔
1046
            .setStartedEventId(data.startedEventId);
1✔
1047
    HistoryEvent event =
1✔
1048
        new HistoryEvent()
1049
            .setEventType(EventType.ActivityTaskTimedOut)
1✔
1050
            .setActivityTaskTimedOutEventAttributes(a);
1✔
1051
    ctx.addEvent(event);
1✔
1052
    return TIMED_OUT;
1✔
1053
  }
1054

1055
  private static boolean attemptActivityRetry(
1056
      RequestContext ctx, String errorReason, ActivityTaskData data) {
1057
    if (data.retryState != null) {
1✔
1058
      RetryState nextAttempt = data.retryState.getNextAttempt();
1✔
1059
      data.nextBackoffIntervalSeconds =
1✔
1060
          data.retryState.getBackoffIntervalInSeconds(errorReason, data.store.currentTimeMillis());
1✔
1061
      if (data.nextBackoffIntervalSeconds > 0) {
1✔
1062
        PollForActivityTaskResponse task = data.activityTask.getTask();
1✔
1063
        task.setHeartbeatDetails(data.heartbeatDetails);
1✔
1064
        ctx.onCommit(
1✔
1065
            (historySize) -> {
1066
              data.retryState = nextAttempt;
1✔
1067
              task.setAttempt(nextAttempt.getAttempt());
1✔
1068
              task.setScheduledTimestampOfThisAttempt(ctx.currentTimeInNanoseconds());
1✔
1069
            });
1✔
1070
        return true;
1✔
1071
      } else {
1072
        data.startedEventId = ctx.addEvent(data.startedEvent);
1✔
1073
      }
1074
    }
1075
    return false;
1✔
1076
  }
1077

1078
  private static void reportActivityTaskCancellation(
1079
      RequestContext ctx, ActivityTaskData data, Object request, long notUsed) {
1080
    byte[] details = null;
1✔
1081
    if (request instanceof RespondActivityTaskCanceledRequest) {
1✔
1082
      details = ((RespondActivityTaskCanceledRequest) request).getDetails();
1✔
1083
    } else if (request instanceof RespondActivityTaskCanceledByIDRequest) {
×
1084
      details = ((RespondActivityTaskCanceledByIDRequest) request).getDetails();
×
1085
    }
1086
    ActivityTaskCanceledEventAttributes a =
1✔
1087
        new ActivityTaskCanceledEventAttributes()
1088
            .setScheduledEventId(data.scheduledEventId)
1✔
1089
            .setStartedEventId(data.startedEventId);
1✔
1090
    if (details != null) {
1✔
1091
      a.setDetails(details);
1✔
1092
    }
1093
    HistoryEvent event =
1✔
1094
        new HistoryEvent()
1095
            .setEventType(EventType.ActivityTaskCanceled)
1✔
1096
            .setActivityTaskCanceledEventAttributes(a);
1✔
1097
    ctx.addEvent(event);
1✔
1098
  }
1✔
1099

1100
  private static void heartbeatActivityTask(
1101
      RequestContext nullCtx, ActivityTaskData data, byte[] details, long notUsed) {
1102
    data.heartbeatDetails = details;
1✔
1103
  }
1✔
1104

1105
  private static void startTimer(
1106
      RequestContext ctx,
1107
      TimerData data,
1108
      StartTimerDecisionAttributes d,
1109
      long decisionTaskCompletedEventId) {
1110
    TimerStartedEventAttributes a =
1✔
1111
        new TimerStartedEventAttributes()
1112
            .setDecisionTaskCompletedEventId(decisionTaskCompletedEventId)
1✔
1113
            .setStartToFireTimeoutSeconds(d.getStartToFireTimeoutSeconds())
1✔
1114
            .setTimerId(d.getTimerId());
1✔
1115
    HistoryEvent event =
1✔
1116
        new HistoryEvent().setEventType(EventType.TimerStarted).setTimerStartedEventAttributes(a);
1✔
1117
    long startedEventId = ctx.addEvent(event);
1✔
1118
    ctx.onCommit(
1✔
1119
        (historySize) -> {
1120
          data.startedEvent = a;
1✔
1121
          data.startedEventId = startedEventId;
1✔
1122
        });
1✔
1123
  }
1✔
1124

1125
  private static void fireTimer(RequestContext ctx, TimerData data, Object ignored, long notUsed) {
1126
    TimerFiredEventAttributes a =
1✔
1127
        new TimerFiredEventAttributes()
1128
            .setTimerId(data.startedEvent.getTimerId())
1✔
1129
            .setStartedEventId(data.startedEventId);
1✔
1130
    HistoryEvent event =
1✔
1131
        new HistoryEvent().setEventType(EventType.TimerFired).setTimerFiredEventAttributes(a);
1✔
1132
    ctx.addEvent(event);
1✔
1133
  }
1✔
1134

1135
  private static void cancelTimer(
1136
      RequestContext ctx,
1137
      TimerData data,
1138
      CancelTimerDecisionAttributes d,
1139
      long decisionTaskCompletedEventId) {
1140
    TimerCanceledEventAttributes a =
1✔
1141
        new TimerCanceledEventAttributes()
1142
            .setDecisionTaskCompletedEventId(decisionTaskCompletedEventId)
1✔
1143
            .setTimerId(d.getTimerId())
1✔
1144
            .setStartedEventId(data.startedEventId);
1✔
1145
    HistoryEvent event =
1✔
1146
        new HistoryEvent().setEventType(EventType.TimerCanceled).setTimerCanceledEventAttributes(a);
1✔
1147
    ctx.addEvent(event);
1✔
1148
  }
1✔
1149

1150
  private static void initiateExternalSignal(
1151
      RequestContext ctx,
1152
      SignalExternalData data,
1153
      SignalExternalWorkflowExecutionDecisionAttributes d,
1154
      long decisionTaskCompletedEventId) {
1155
    SignalExternalWorkflowExecutionInitiatedEventAttributes a =
1✔
1156
        new SignalExternalWorkflowExecutionInitiatedEventAttributes();
1157
    a.setDecisionTaskCompletedEventId(decisionTaskCompletedEventId);
1✔
1158
    if (d.isSetControl()) {
1✔
1159
      a.setControl(d.getControl());
1✔
1160
    }
1161
    if (d.isSetInput()) {
1✔
1162
      a.setInput(d.getInput());
1✔
1163
    }
1164
    if (d.isSetDomain()) {
1✔
1165
      a.setDomain(d.getDomain());
1✔
1166
    }
1167
    if (d.isSetChildWorkflowOnly()) {
1✔
1168
      a.setChildWorkflowOnly(d.isChildWorkflowOnly());
×
1169
    }
1170
    a.setSignalName(d.getSignalName());
1✔
1171
    a.setWorkflowExecution(d.getExecution());
1✔
1172
    HistoryEvent event =
1✔
1173
        new HistoryEvent()
1174
            .setEventType(EventType.SignalExternalWorkflowExecutionInitiated)
1✔
1175
            .setSignalExternalWorkflowExecutionInitiatedEventAttributes(a);
1✔
1176
    long initiatedEventId = ctx.addEvent(event);
1✔
1177
    ctx.onCommit(
1✔
1178
        (historySize) -> {
1179
          data.initiatedEventId = initiatedEventId;
1✔
1180
          data.initiatedEvent = a;
1✔
1181
        });
1✔
1182
  }
1✔
1183

1184
  private static void failExternalSignal(
1185
      RequestContext ctx,
1186
      SignalExternalData data,
1187
      SignalExternalWorkflowExecutionFailedCause cause,
1188
      long notUsed) {
1189
    SignalExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent = data.initiatedEvent;
1✔
1190
    SignalExternalWorkflowExecutionFailedEventAttributes a =
1✔
1191
        new SignalExternalWorkflowExecutionFailedEventAttributes()
1192
            .setInitiatedEventId(data.initiatedEventId)
1✔
1193
            .setWorkflowExecution(initiatedEvent.getWorkflowExecution())
1✔
1194
            .setControl(initiatedEvent.getControl())
1✔
1195
            .setCause(cause)
1✔
1196
            .setDomain(initiatedEvent.getDomain());
1✔
1197
    HistoryEvent event =
1✔
1198
        new HistoryEvent()
1199
            .setEventType(EventType.SignalExternalWorkflowExecutionFailed)
1✔
1200
            .setSignalExternalWorkflowExecutionFailedEventAttributes(a);
1✔
1201
    ctx.addEvent(event);
1✔
1202
  }
1✔
1203

1204
  private static void completeExternalSignal(
1205
      RequestContext ctx, SignalExternalData data, String runId, long notUsed) {
1206
    SignalExternalWorkflowExecutionInitiatedEventAttributes initiatedEvent = data.initiatedEvent;
1✔
1207
    WorkflowExecution signaledExecution =
1✔
1208
        initiatedEvent.getWorkflowExecution().deepCopy().setRunId(runId);
1✔
1209
    ExternalWorkflowExecutionSignaledEventAttributes a =
1✔
1210
        new ExternalWorkflowExecutionSignaledEventAttributes()
1211
            .setInitiatedEventId(data.initiatedEventId)
1✔
1212
            .setWorkflowExecution(signaledExecution)
1✔
1213
            .setControl(initiatedEvent.getControl())
1✔
1214
            .setDomain(initiatedEvent.getDomain());
1✔
1215
    HistoryEvent event =
1✔
1216
        new HistoryEvent()
1217
            .setEventType(EventType.ExternalWorkflowExecutionSignaled)
1✔
1218
            .setExternalWorkflowExecutionSignaledEventAttributes(a);
1✔
1219
    ctx.addEvent(event);
1✔
1220
  }
1✔
1221
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc