• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 2594

04 Nov 2024 05:49PM UTC coverage: 76.079% (+0.03%) from 76.054%
2594

Pull #937

buildkite

natemort
Fix flakiness in ManualActivityCompletionWorkflowTest

The operations here are flaky because we're starting an activity and then attempting to cancel/complete/fail it in parallel. We need the second activity to block until the first one has started, and there's no way to orchestrate that within the workflow. The best we can do is make the activities block until the ActivityTask is available.
Pull Request #937: Fix flakiness in ManualActivityCompletionWorkflowTest

14748 of 19385 relevant lines covered (76.08%)

0.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.33
/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableStateImpl.java
1
/*
2
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
 *
4
 *  Modifications copyright (C) 2017 Uber Technologies, Inc.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.internal.testservice;
19

20
import com.cronutils.model.Cron;
21
import com.cronutils.model.CronType;
22
import com.cronutils.model.definition.CronDefinition;
23
import com.cronutils.model.definition.CronDefinitionBuilder;
24
import com.cronutils.model.time.ExecutionTime;
25
import com.cronutils.parser.CronParser;
26
import com.google.common.base.Strings;
27
import com.google.common.base.Throwables;
28
import com.uber.cadence.ActivityTaskScheduledEventAttributes;
29
import com.uber.cadence.BadRequestError;
30
import com.uber.cadence.CancelTimerDecisionAttributes;
31
import com.uber.cadence.CancelTimerFailedEventAttributes;
32
import com.uber.cadence.CancelWorkflowExecutionDecisionAttributes;
33
import com.uber.cadence.ChildWorkflowExecutionCanceledEventAttributes;
34
import com.uber.cadence.ChildWorkflowExecutionCompletedEventAttributes;
35
import com.uber.cadence.ChildWorkflowExecutionFailedEventAttributes;
36
import com.uber.cadence.ChildWorkflowExecutionStartedEventAttributes;
37
import com.uber.cadence.ChildWorkflowExecutionTimedOutEventAttributes;
38
import com.uber.cadence.CompleteWorkflowExecutionDecisionAttributes;
39
import com.uber.cadence.ContinueAsNewWorkflowExecutionDecisionAttributes;
40
import com.uber.cadence.Decision;
41
import com.uber.cadence.DecisionTaskFailedCause;
42
import com.uber.cadence.EntityNotExistsError;
43
import com.uber.cadence.EventType;
44
import com.uber.cadence.FailWorkflowExecutionDecisionAttributes;
45
import com.uber.cadence.HistoryEvent;
46
import com.uber.cadence.InternalServiceError;
47
import com.uber.cadence.MarkerRecordedEventAttributes;
48
import com.uber.cadence.PollForActivityTaskRequest;
49
import com.uber.cadence.PollForActivityTaskResponse;
50
import com.uber.cadence.PollForDecisionTaskRequest;
51
import com.uber.cadence.PollForDecisionTaskResponse;
52
import com.uber.cadence.QueryConsistencyLevel;
53
import com.uber.cadence.QueryFailedError;
54
import com.uber.cadence.QueryRejectCondition;
55
import com.uber.cadence.QueryRejected;
56
import com.uber.cadence.QueryResultType;
57
import com.uber.cadence.QueryTaskCompletedType;
58
import com.uber.cadence.QueryWorkflowRequest;
59
import com.uber.cadence.QueryWorkflowResponse;
60
import com.uber.cadence.RecordActivityTaskHeartbeatResponse;
61
import com.uber.cadence.RecordMarkerDecisionAttributes;
62
import com.uber.cadence.RequestCancelActivityTaskDecisionAttributes;
63
import com.uber.cadence.RequestCancelActivityTaskFailedEventAttributes;
64
import com.uber.cadence.RequestCancelExternalWorkflowExecutionDecisionAttributes;
65
import com.uber.cadence.RequestCancelWorkflowExecutionRequest;
66
import com.uber.cadence.RespondActivityTaskCanceledByIDRequest;
67
import com.uber.cadence.RespondActivityTaskCanceledRequest;
68
import com.uber.cadence.RespondActivityTaskCompletedByIDRequest;
69
import com.uber.cadence.RespondActivityTaskCompletedRequest;
70
import com.uber.cadence.RespondActivityTaskFailedByIDRequest;
71
import com.uber.cadence.RespondActivityTaskFailedRequest;
72
import com.uber.cadence.RespondDecisionTaskCompletedRequest;
73
import com.uber.cadence.RespondDecisionTaskFailedRequest;
74
import com.uber.cadence.RespondQueryTaskCompletedRequest;
75
import com.uber.cadence.RetryPolicy;
76
import com.uber.cadence.ScheduleActivityTaskDecisionAttributes;
77
import com.uber.cadence.SignalExternalWorkflowExecutionDecisionAttributes;
78
import com.uber.cadence.SignalExternalWorkflowExecutionFailedCause;
79
import com.uber.cadence.SignalWorkflowExecutionRequest;
80
import com.uber.cadence.StartChildWorkflowExecutionDecisionAttributes;
81
import com.uber.cadence.StartChildWorkflowExecutionFailedEventAttributes;
82
import com.uber.cadence.StartTimerDecisionAttributes;
83
import com.uber.cadence.StartWorkflowExecutionRequest;
84
import com.uber.cadence.StickyExecutionAttributes;
85
import com.uber.cadence.TimeoutType;
86
import com.uber.cadence.UpsertWorkflowSearchAttributesDecisionAttributes;
87
import com.uber.cadence.UpsertWorkflowSearchAttributesEventAttributes;
88
import com.uber.cadence.WorkflowExecution;
89
import com.uber.cadence.WorkflowExecutionAlreadyCompletedError;
90
import com.uber.cadence.WorkflowExecutionCloseStatus;
91
import com.uber.cadence.WorkflowExecutionContinuedAsNewEventAttributes;
92
import com.uber.cadence.WorkflowExecutionSignaledEventAttributes;
93
import com.uber.cadence.WorkflowQuery;
94
import com.uber.cadence.WorkflowQueryResult;
95
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
96
import com.uber.cadence.internal.testservice.StateMachines.Action;
97
import com.uber.cadence.internal.testservice.StateMachines.ActivityTaskData;
98
import com.uber.cadence.internal.testservice.StateMachines.ChildWorkflowData;
99
import com.uber.cadence.internal.testservice.StateMachines.DecisionTaskData;
100
import com.uber.cadence.internal.testservice.StateMachines.SignalExternalData;
101
import com.uber.cadence.internal.testservice.StateMachines.State;
102
import com.uber.cadence.internal.testservice.StateMachines.TimerData;
103
import com.uber.cadence.internal.testservice.StateMachines.WorkflowData;
104
import com.uber.cadence.internal.testservice.TestWorkflowStore.TaskListId;
105
import java.io.ByteArrayInputStream;
106
import java.io.ByteArrayOutputStream;
107
import java.io.DataInputStream;
108
import java.io.DataOutputStream;
109
import java.io.IOException;
110
import java.time.Duration;
111
import java.time.Instant;
112
import java.time.ZoneOffset;
113
import java.time.ZonedDateTime;
114
import java.util.ArrayList;
115
import java.util.HashMap;
116
import java.util.List;
117
import java.util.Map;
118
import java.util.Objects;
119
import java.util.Optional;
120
import java.util.OptionalLong;
121
import java.util.UUID;
122
import java.util.concurrent.CompletableFuture;
123
import java.util.concurrent.ConcurrentHashMap;
124
import java.util.concurrent.ExecutionException;
125
import java.util.concurrent.ForkJoinPool;
126
import java.util.concurrent.TimeUnit;
127
import java.util.concurrent.locks.Lock;
128
import java.util.concurrent.locks.ReentrantLock;
129
import java.util.function.LongSupplier;
130
import org.apache.thrift.TException;
131
import org.slf4j.Logger;
132
import org.slf4j.LoggerFactory;
133

134
class TestWorkflowMutableStateImpl implements TestWorkflowMutableState {
135

136
  @FunctionalInterface
137
  private interface UpdateProcedure {
138

139
    void apply(RequestContext ctx)
140
        throws InternalServiceError, BadRequestError, EntityNotExistsError;
141
  }
142

143
  private static final Logger log = LoggerFactory.getLogger(TestWorkflowMutableStateImpl.class);
1✔
144

145
  private final Lock lock = new ReentrantLock();
1✔
146
  private final SelfAdvancingTimer selfAdvancingTimer;
147
  private final LongSupplier clock;
148
  private final ExecutionId executionId;
149
  private final Optional<TestWorkflowMutableState> parent;
150
  private final OptionalLong parentChildInitiatedEventId;
151
  private final TestWorkflowStore store;
152
  private final TestWorkflowService service;
153
  private final StartWorkflowExecutionRequest startRequest;
154
  private long nextEventId;
155
  private final List<RequestContext> concurrentToDecision = new ArrayList<>();
1✔
156
  private final Map<String, StateMachine<ActivityTaskData>> activities = new HashMap<>();
1✔
157
  private final Map<Long, StateMachine<ChildWorkflowData>> childWorkflows = new HashMap<>();
1✔
158
  private final Map<String, StateMachine<TimerData>> timers = new HashMap<>();
1✔
159
  private final Map<String, StateMachine<SignalExternalData>> externalSignals = new HashMap<>();
1✔
160
  private StateMachine<WorkflowData> workflow;
161
  private volatile StateMachine<DecisionTaskData> decision;
162
  private long lastNonFailedDecisionStartEventId;
163
  private final Map<String, CompletableFuture<QueryWorkflowResponse>> queries =
1✔
164
      new ConcurrentHashMap<>();
165
  private final Map<String, PollForDecisionTaskResponse> queryRequests = new ConcurrentHashMap<>();
1✔
166
  private final Map<String, WorkflowQuery> pendingQueries = new ConcurrentHashMap<>();
1✔
167
  private final Optional<String> continuedExecutionRunId;
168
  public StickyExecutionAttributes stickyExecutionAttributes;
169

170
  /**
171
   * @param retryState present if workflow is a retry
172
   * @param backoffStartIntervalInSeconds
173
   * @param parentChildInitiatedEventId id of the child initiated event in the parent history
174
   */
175
  TestWorkflowMutableStateImpl(
176
      StartWorkflowExecutionRequest startRequest,
177
      Optional<RetryState> retryState,
178
      int backoffStartIntervalInSeconds,
179
      byte[] lastCompletionResult,
180
      Optional<TestWorkflowMutableState> parent,
181
      OptionalLong parentChildInitiatedEventId,
182
      Optional<String> continuedExecutionRunId,
183
      TestWorkflowService service,
184
      TestWorkflowStore store) {
1✔
185
    this.startRequest = startRequest;
1✔
186
    this.parent = parent;
1✔
187
    this.parentChildInitiatedEventId = parentChildInitiatedEventId;
1✔
188
    this.continuedExecutionRunId = continuedExecutionRunId;
1✔
189
    this.service = service;
1✔
190
    String runId = UUID.randomUUID().toString();
1✔
191
    this.executionId =
1✔
192
        new ExecutionId(startRequest.getDomain(), startRequest.getWorkflowId(), runId);
1✔
193
    this.store = store;
1✔
194
    selfAdvancingTimer = store.getTimer();
1✔
195
    this.clock = selfAdvancingTimer.getClock();
1✔
196
    WorkflowData data =
1✔
197
        new WorkflowData(
198
            retryState,
199
            backoffStartIntervalInSeconds,
200
            startRequest.getCronSchedule(),
1✔
201
            lastCompletionResult,
202
            runId, // Test service doesn't support reset. Thus originalRunId is always the same as
203
            // runId.
204
            continuedExecutionRunId);
205
    this.workflow = StateMachines.newWorkflowStateMachine(data);
1✔
206
  }
1✔
207

208
  private void update(UpdateProcedure updater)
209
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
210
          BadRequestError {
211
    StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace();
1✔
212
    update(false, updater, stackTraceElements[2].getMethodName());
1✔
213
  }
1✔
214

215
  private void completeDecisionUpdate(UpdateProcedure updater, StickyExecutionAttributes attributes)
216
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
217
          BadRequestError {
218
    StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace();
1✔
219
    stickyExecutionAttributes = attributes;
1✔
220
    update(true, updater, stackTraceElements[2].getMethodName());
1✔
221
  }
1✔
222

223
  private void update(boolean completeDecisionUpdate, UpdateProcedure updater, String caller)
224
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
225
          BadRequestError {
226
    String callerInfo = "Decision Update from " + caller;
1✔
227
    lock.lock();
1✔
228
    LockHandle lockHandle = selfAdvancingTimer.lockTimeSkipping(callerInfo);
1✔
229

230
    try {
231
      checkCompleted();
1✔
232
      boolean concurrentDecision =
1✔
233
          !completeDecisionUpdate
234
              && (decision != null && decision.getState() == StateMachines.State.STARTED);
1✔
235

236
      RequestContext ctx = new RequestContext(clock, this, nextEventId);
1✔
237
      updater.apply(ctx);
1✔
238
      setPendingQueries(ctx);
1✔
239
      if (concurrentDecision && workflow.getState() != State.TIMED_OUT) {
1✔
240
        concurrentToDecision.add(ctx);
1✔
241
        ctx.fireCallbacks(0);
1✔
242
        store.applyTimersAndLocks(ctx);
1✔
243
      } else {
244
        nextEventId = ctx.commitChanges(store);
1✔
245
      }
246
    } catch (InternalServiceError
1✔
247
        | EntityNotExistsError
248
        | WorkflowExecutionAlreadyCompletedError
249
        | BadRequestError e) {
250
      throw e;
1✔
251
    } catch (Exception e) {
×
252
      throw new InternalServiceError(Throwables.getStackTraceAsString(e));
×
253
    } finally {
254
      lockHandle.unlock();
1✔
255
      lock.unlock();
1✔
256
    }
257
  }
1✔
258

259
  private void setPendingQueries(RequestContext ctx) {
260
    TestWorkflowStore.DecisionTask decisionTask = ctx.getDecisionTask();
1✔
261
    if (decisionTask != null) {
1✔
262
      decisionTask.getTask().setQueries(new HashMap<>(pendingQueries));
1✔
263
    }
264
  }
1✔
265

266
  @Override
267
  public ExecutionId getExecutionId() {
268
    return executionId;
1✔
269
  }
270

271
  @Override
272
  public Optional<WorkflowExecutionCloseStatus> getCloseStatus() {
273
    switch (workflow.getState()) {
1✔
274
      case NONE:
275
      case INITIATED:
276
      case STARTED:
277
      case CANCELLATION_REQUESTED:
278
        return Optional.empty();
1✔
279
      case FAILED:
280
        return Optional.of(WorkflowExecutionCloseStatus.FAILED);
1✔
281
      case TIMED_OUT:
282
        return Optional.of(WorkflowExecutionCloseStatus.TIMED_OUT);
×
283
      case CANCELED:
284
        return Optional.of(WorkflowExecutionCloseStatus.CANCELED);
×
285
      case COMPLETED:
286
        return Optional.of(WorkflowExecutionCloseStatus.COMPLETED);
1✔
287
      case CONTINUED_AS_NEW:
288
        return Optional.of(WorkflowExecutionCloseStatus.CONTINUED_AS_NEW);
×
289
    }
290
    throw new IllegalStateException("unreachable");
×
291
  }
292

293
  @Override
294
  public StartWorkflowExecutionRequest getStartRequest() {
295
    return startRequest;
1✔
296
  }
297

298
  @Override
299
  public StickyExecutionAttributes getStickyExecutionAttributes() {
300
    return stickyExecutionAttributes;
1✔
301
  }
302

303
  @Override
304
  public Optional<TestWorkflowMutableState> getParent() {
305
    return parent;
1✔
306
  }
307

308
  @Override
309
  public void startDecisionTask(
310
      PollForDecisionTaskResponse task, PollForDecisionTaskRequest pollRequest)
311
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
312
          BadRequestError {
313
    if (task.getQuery() == null) {
1✔
314
      update(
1✔
315
          ctx -> {
316
            long scheduledEventId = decision.getData().scheduledEventId;
1✔
317
            decision.action(StateMachines.Action.START, ctx, pollRequest, 0);
1✔
318
            ctx.addTimer(
1✔
319
                startRequest.getTaskStartToCloseTimeoutSeconds(),
1✔
320
                () -> timeoutDecisionTask(scheduledEventId),
1✔
321
                "DecisionTask StartToCloseTimeout");
322
          });
1✔
323
    }
324
  }
1✔
325

326
  @Override
327
  public void completeDecisionTask(int historySize, RespondDecisionTaskCompletedRequest request)
328
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
329
          BadRequestError {
330
    List<Decision> decisions = request.getDecisions();
1✔
331
    completeDecisionUpdate(
1✔
332
        ctx -> {
333
          if (request.getQueryResultsSize() > 0) {
1✔
334
            request
×
335
                .getQueryResults()
×
336
                .forEach(
×
337
                    (queryId, queryResult) -> {
338
                      completeQuery(queryId, queryResult);
×
339
                      pendingQueries.remove(queryId);
×
340
                    });
×
341
          }
342

343
          if (ctx.getInitialEventId() != historySize + 1) {
1✔
344
            throw new BadRequestError(
1✔
345
                "Expired decision: expectedHistorySize="
346
                    + historySize
347
                    + ","
348
                    + " actualHistorySize="
349
                    + ctx.getInitialEventId());
1✔
350
          }
351
          long decisionTaskCompletedId = ctx.getNextEventId() - 1;
1✔
352
          // Fail the decision if there are new events and the decision tries to complete the
353
          // workflow
354
          if (!concurrentToDecision.isEmpty() && hasCompleteDecision(request.getDecisions())) {
1✔
355
            RespondDecisionTaskFailedRequest failedRequest =
1✔
356
                new RespondDecisionTaskFailedRequest()
357
                    .setCause(DecisionTaskFailedCause.UNHANDLED_DECISION)
1✔
358
                    .setIdentity(request.getIdentity());
1✔
359
            decision.action(Action.FAIL, ctx, failedRequest, decisionTaskCompletedId);
1✔
360
            for (RequestContext deferredCtx : this.concurrentToDecision) {
1✔
361
              ctx.add(deferredCtx);
1✔
362
            }
1✔
363
            this.concurrentToDecision.clear();
1✔
364

365
            // Reset sticky execution attributes on failure
366
            stickyExecutionAttributes = null;
1✔
367
            scheduleDecision(ctx);
1✔
368
            return;
1✔
369
          }
370
          if (decision == null) {
1✔
371
            throw new EntityNotExistsError("No outstanding decision");
×
372
          }
373
          decision.action(StateMachines.Action.COMPLETE, ctx, request, 0);
1✔
374
          for (Decision d : decisions) {
1✔
375
            processDecision(ctx, d, request.getIdentity(), decisionTaskCompletedId);
1✔
376
          }
1✔
377
          for (RequestContext deferredCtx : this.concurrentToDecision) {
1✔
378
            ctx.add(deferredCtx);
1✔
379
          }
1✔
380
          lastNonFailedDecisionStartEventId = this.decision.getData().startedEventId;
1✔
381
          this.decision = null;
1✔
382
          boolean completed =
1✔
383
              workflow.getState() == StateMachines.State.COMPLETED
1✔
384
                  || workflow.getState() == StateMachines.State.FAILED
1✔
385
                  || workflow.getState() == StateMachines.State.CANCELED;
1✔
386
          if (!completed
1✔
387
              && ((ctx.isNeedDecision() || !this.concurrentToDecision.isEmpty())
1✔
388
                  || request.isForceCreateNewDecisionTask())) {
1✔
389
            scheduleDecision(ctx);
1✔
390
          }
391
          this.concurrentToDecision.clear();
1✔
392
          ctx.unlockTimer();
1✔
393
        },
1✔
394
        request.getStickyAttributes());
1✔
395
  }
1✔
396

397
  private void completeQuery(String queryId, WorkflowQueryResult queryResult) {
398
    CompletableFuture<QueryWorkflowResponse> future = queries.get(queryId);
×
399
    if (future == null) {
×
400
      throw new RuntimeException("Unknown query id: " + queryId);
×
401
    }
402
    if (queryResult.getResultType() == QueryResultType.ANSWERED) {
×
403
      future.complete(new QueryWorkflowResponse().setQueryResult(queryResult.getAnswer()));
×
404
    } else {
405
      future.completeExceptionally(
×
406
          new QueryFailedError().setMessage(queryResult.getErrorMessage()));
×
407
    }
408
  }
×
409

410
  private boolean hasCompleteDecision(List<Decision> decisions) {
411
    for (Decision d : decisions) {
1✔
412
      if (WorkflowExecutionUtils.isWorkflowExecutionCompleteDecision(d)) {
1✔
413
        return true;
1✔
414
      }
415
    }
1✔
416
    return false;
1✔
417
  }
418

419
  private void processDecision(
420
      RequestContext ctx, Decision d, String identity, long decisionTaskCompletedId)
421
      throws BadRequestError, InternalServiceError {
422
    switch (d.getDecisionType()) {
1✔
423
      case CompleteWorkflowExecution:
424
        processCompleteWorkflowExecution(
1✔
425
            ctx,
426
            d.getCompleteWorkflowExecutionDecisionAttributes(),
1✔
427
            decisionTaskCompletedId,
428
            identity);
429
        break;
1✔
430
      case FailWorkflowExecution:
431
        processFailWorkflowExecution(
1✔
432
            ctx, d.getFailWorkflowExecutionDecisionAttributes(), decisionTaskCompletedId, identity);
1✔
433
        break;
1✔
434
      case CancelWorkflowExecution:
435
        processCancelWorkflowExecution(
1✔
436
            ctx, d.getCancelWorkflowExecutionDecisionAttributes(), decisionTaskCompletedId);
1✔
437
        break;
1✔
438
      case ContinueAsNewWorkflowExecution:
439
        processContinueAsNewWorkflowExecution(
1✔
440
            ctx,
441
            d.getContinueAsNewWorkflowExecutionDecisionAttributes(),
1✔
442
            decisionTaskCompletedId,
443
            identity);
444
        break;
1✔
445
      case ScheduleActivityTask:
446
        processScheduleActivityTask(
1✔
447
            ctx, d.getScheduleActivityTaskDecisionAttributes(), decisionTaskCompletedId);
1✔
448
        break;
1✔
449
      case RequestCancelActivityTask:
450
        processRequestCancelActivityTask(
1✔
451
            ctx, d.getRequestCancelActivityTaskDecisionAttributes(), decisionTaskCompletedId);
1✔
452
        break;
1✔
453
      case StartTimer:
454
        processStartTimer(ctx, d.getStartTimerDecisionAttributes(), decisionTaskCompletedId);
1✔
455
        break;
1✔
456
      case CancelTimer:
457
        processCancelTimer(ctx, d.getCancelTimerDecisionAttributes(), decisionTaskCompletedId);
1✔
458
        break;
1✔
459
      case StartChildWorkflowExecution:
460
        processStartChildWorkflow(
1✔
461
            ctx, d.getStartChildWorkflowExecutionDecisionAttributes(), decisionTaskCompletedId);
1✔
462
        break;
1✔
463
      case SignalExternalWorkflowExecution:
464
        processSignalExternalWorkflowExecution(
1✔
465
            ctx, d.getSignalExternalWorkflowExecutionDecisionAttributes(), decisionTaskCompletedId);
1✔
466
        break;
1✔
467
      case RecordMarker:
468
        processRecordMarker(ctx, d.getRecordMarkerDecisionAttributes(), decisionTaskCompletedId);
1✔
469
        break;
1✔
470
      case RequestCancelExternalWorkflowExecution:
471
        processRequestCancelExternalWorkflowExecution(
1✔
472
            ctx, d.getRequestCancelExternalWorkflowExecutionDecisionAttributes());
1✔
473
        break;
1✔
474
      case UpsertWorkflowSearchAttributes:
475
        processUpsertWorkflowSearchAttributes(
1✔
476
            ctx, d.getUpsertWorkflowSearchAttributesDecisionAttributes(), decisionTaskCompletedId);
1✔
477
        break;
478
    }
479
  }
1✔
480

481
  private void processRequestCancelExternalWorkflowExecution(
482
      RequestContext ctx, RequestCancelExternalWorkflowExecutionDecisionAttributes attr) {
483
    ForkJoinPool.commonPool()
1✔
484
        .execute(
1✔
485
            () -> {
486
              RequestCancelWorkflowExecutionRequest request =
1✔
487
                  new RequestCancelWorkflowExecutionRequest();
488
              WorkflowExecution workflowExecution = new WorkflowExecution();
1✔
489
              workflowExecution.setWorkflowId(attr.workflowId);
1✔
490
              request.setWorkflowExecution(workflowExecution);
1✔
491
              request.setDomain(ctx.getDomain());
1✔
492
              try {
493
                service.RequestCancelWorkflowExecution(request);
1✔
494
              } catch (Exception e) {
×
495
                log.error("Failure to request cancel external workflow", e);
×
496
              }
1✔
497
            });
1✔
498
  }
1✔
499

500
  private void processRecordMarker(
501
      RequestContext ctx, RecordMarkerDecisionAttributes attr, long decisionTaskCompletedId)
502
      throws BadRequestError {
503
    if (!attr.isSetMarkerName()) {
1✔
504
      throw new BadRequestError("marker name is required");
×
505
    }
506

507
    MarkerRecordedEventAttributes marker =
1✔
508
        new MarkerRecordedEventAttributes()
509
            .setMarkerName(attr.getMarkerName())
1✔
510
            .setHeader(attr.getHeader())
1✔
511
            .setDetails(attr.getDetails())
1✔
512
            .setDecisionTaskCompletedEventId(decisionTaskCompletedId);
1✔
513
    HistoryEvent event =
1✔
514
        new HistoryEvent()
515
            .setEventType(EventType.MarkerRecorded)
1✔
516
            .setMarkerRecordedEventAttributes(marker);
1✔
517
    ctx.addEvent(event);
1✔
518
  }
1✔
519

520
  private void processCancelTimer(
521
      RequestContext ctx, CancelTimerDecisionAttributes d, long decisionTaskCompletedId)
522
      throws InternalServiceError, BadRequestError {
523
    String timerId = d.getTimerId();
1✔
524
    StateMachine<TimerData> timer = timers.get(timerId);
1✔
525
    if (timer == null) {
1✔
526
      CancelTimerFailedEventAttributes failedAttr =
×
527
          new CancelTimerFailedEventAttributes()
528
              .setTimerId(timerId)
×
529
              .setCause("TIMER_ID_UNKNOWN")
×
530
              .setDecisionTaskCompletedEventId(decisionTaskCompletedId);
×
531
      HistoryEvent cancellationFailed =
×
532
          new HistoryEvent()
533
              .setEventType(EventType.CancelTimerFailed)
×
534
              .setCancelTimerFailedEventAttributes(failedAttr);
×
535
      ctx.addEvent(cancellationFailed);
×
536
      return;
×
537
    }
538
    timer.action(StateMachines.Action.CANCEL, ctx, d, decisionTaskCompletedId);
1✔
539
    timers.remove(timerId);
1✔
540
  }
1✔
541

542
  private void processRequestCancelActivityTask(
543
      RequestContext ctx,
544
      RequestCancelActivityTaskDecisionAttributes a,
545
      long decisionTaskCompletedId)
546
      throws InternalServiceError, BadRequestError {
547
    String activityId = a.getActivityId();
1✔
548
    StateMachine<?> activity = activities.get(activityId);
1✔
549
    if (activity == null) {
1✔
550
      RequestCancelActivityTaskFailedEventAttributes failedAttr =
×
551
          new RequestCancelActivityTaskFailedEventAttributes()
552
              .setActivityId(activityId)
×
553
              .setCause("ACTIVITY_ID_UNKNOWN")
×
554
              .setDecisionTaskCompletedEventId(decisionTaskCompletedId);
×
555
      HistoryEvent cancellationFailed =
×
556
          new HistoryEvent()
557
              .setEventType(EventType.RequestCancelActivityTaskFailed)
×
558
              .setRequestCancelActivityTaskFailedEventAttributes(failedAttr);
×
559
      ctx.addEvent(cancellationFailed);
×
560
      return;
×
561
    }
562
    State beforeState = activity.getState();
1✔
563
    activity.action(StateMachines.Action.REQUEST_CANCELLATION, ctx, a, decisionTaskCompletedId);
1✔
564
    if (beforeState == StateMachines.State.INITIATED) {
1✔
565
      activity.action(StateMachines.Action.CANCEL, ctx, null, 0);
×
566
      activities.remove(activityId);
×
567
      ctx.setNeedDecision(true);
×
568
    }
569
  }
1✔
570

571
  private void processScheduleActivityTask(
572
      RequestContext ctx, ScheduleActivityTaskDecisionAttributes a, long decisionTaskCompletedId)
573
      throws BadRequestError, InternalServiceError {
574
    validateScheduleActivityTask(a);
1✔
575
    String activityId = a.getActivityId();
1✔
576
    StateMachine<ActivityTaskData> activity = activities.get(activityId);
1✔
577
    if (activity != null) {
1✔
578
      throw new BadRequestError("Already open activity with " + activityId);
×
579
    }
580
    activity = StateMachines.newActivityStateMachine(store, this.startRequest);
1✔
581
    activities.put(activityId, activity);
1✔
582
    activity.action(StateMachines.Action.INITIATE, ctx, a, decisionTaskCompletedId);
1✔
583
    ActivityTaskScheduledEventAttributes scheduledEvent = activity.getData().scheduledEvent;
1✔
584
    ctx.addTimer(
1✔
585
        scheduledEvent.getScheduleToCloseTimeoutSeconds(),
1✔
586
        () -> timeoutActivity(activityId, TimeoutType.SCHEDULE_TO_CLOSE),
1✔
587
        "Activity ScheduleToCloseTimeout");
588
    ctx.addTimer(
1✔
589
        scheduledEvent.getScheduleToStartTimeoutSeconds(),
1✔
590
        () -> timeoutActivity(activityId, TimeoutType.SCHEDULE_TO_START),
1✔
591
        "Activity ScheduleToStartTimeout");
592
    ctx.lockTimer();
1✔
593
  }
1✔
594

595
  private void validateScheduleActivityTask(ScheduleActivityTaskDecisionAttributes a)
596
      throws BadRequestError {
597
    if (a == null) {
1✔
598
      throw new BadRequestError("ScheduleActivityTaskDecisionAttributes is not set on decision.");
×
599
    }
600

601
    if (a.getTaskList() == null || a.getTaskList().getName().isEmpty()) {
1✔
602
      throw new BadRequestError("TaskList is not set on decision.");
×
603
    }
604
    if (a.getActivityId() == null || a.getActivityId().isEmpty()) {
1✔
605
      throw new BadRequestError("ActivityId is not set on decision.");
×
606
    }
607
    if (a.getActivityType() == null
1✔
608
        || a.getActivityType().getName() == null
1✔
609
        || a.getActivityType().getName().isEmpty()) {
1✔
610
      throw new BadRequestError("ActivityType is not set on decision.");
×
611
    }
612
    if (a.getStartToCloseTimeoutSeconds() <= 0) {
1✔
613
      throw new BadRequestError("A valid StartToCloseTimeoutSeconds is not set on decision.");
×
614
    }
615
    if (a.getScheduleToStartTimeoutSeconds() <= 0) {
1✔
616
      throw new BadRequestError("A valid ScheduleToStartTimeoutSeconds is not set on decision.");
×
617
    }
618
    if (a.getScheduleToCloseTimeoutSeconds() <= 0) {
1✔
619
      throw new BadRequestError("A valid ScheduleToCloseTimeoutSeconds is not set on decision.");
×
620
    }
621
    if (a.getHeartbeatTimeoutSeconds() < 0) {
1✔
622
      throw new BadRequestError("Ac valid HeartbeatTimeoutSeconds is not set on decision.");
×
623
    }
624
  }
1✔
625

626
  private void processStartChildWorkflow(
627
      RequestContext ctx,
628
      StartChildWorkflowExecutionDecisionAttributes a,
629
      long decisionTaskCompletedId)
630
      throws BadRequestError, InternalServiceError {
631
    validateStartChildExecutionAttributes(a);
1✔
632
    StateMachine<ChildWorkflowData> child = StateMachines.newChildWorkflowStateMachine(service);
1✔
633
    childWorkflows.put(ctx.getNextEventId(), child);
1✔
634
    child.action(StateMachines.Action.INITIATE, ctx, a, decisionTaskCompletedId);
1✔
635
    ctx.lockTimer();
1✔
636
  }
1✔
637

638
  /** Clone of the validateStartChildExecutionAttributes from historyEngine.go */
639
  private void validateStartChildExecutionAttributes(
640
      StartChildWorkflowExecutionDecisionAttributes a) throws BadRequestError {
641
    if (a == null) {
1✔
642
      throw new BadRequestError(
×
643
          "StartChildWorkflowExecutionDecisionAttributes is not set on decision.");
644
    }
645

646
    if (a.getWorkflowId().isEmpty()) {
1✔
647
      throw new BadRequestError("Required field WorkflowID is not set on decision.");
×
648
    }
649

650
    if (a.getWorkflowType() == null || a.getWorkflowType().getName().isEmpty()) {
1✔
651
      throw new BadRequestError("Required field WorkflowType is not set on decision.");
×
652
    }
653

654
    // Inherit tasklist from parent workflow execution if not provided on decision
655
    if (a.getTaskList() == null || a.getTaskList().getName().isEmpty()) {
1✔
656
      a.setTaskList(startRequest.getTaskList());
×
657
    }
658

659
    // Inherit workflow timeout from parent workflow execution if not provided on decision
660
    if (a.getExecutionStartToCloseTimeoutSeconds() <= 0) {
1✔
661
      a.setExecutionStartToCloseTimeoutSeconds(
×
662
          startRequest.getExecutionStartToCloseTimeoutSeconds());
×
663
    }
664

665
    // Inherit decision task timeout from parent workflow execution if not provided on decision
666
    if (a.getTaskStartToCloseTimeoutSeconds() <= 0) {
1✔
667
      a.setTaskStartToCloseTimeoutSeconds(startRequest.getTaskStartToCloseTimeoutSeconds());
×
668
    }
669

670
    RetryPolicy retryPolicy = a.getRetryPolicy();
1✔
671
    if (retryPolicy != null) {
1✔
672
      RetryState.validateRetryPolicy(retryPolicy);
1✔
673
    }
674
  }
1✔
675

676
  private void processSignalExternalWorkflowExecution(
677
      RequestContext ctx,
678
      SignalExternalWorkflowExecutionDecisionAttributes a,
679
      long decisionTaskCompletedId)
680
      throws InternalServiceError, BadRequestError {
681
    String signalId = UUID.randomUUID().toString();
1✔
682
    StateMachine<SignalExternalData> signalStateMachine =
683
        StateMachines.newSignalExternalStateMachine();
1✔
684
    externalSignals.put(signalId, signalStateMachine);
1✔
685
    signalStateMachine.action(StateMachines.Action.INITIATE, ctx, a, decisionTaskCompletedId);
1✔
686
    ForkJoinPool.commonPool()
1✔
687
        .execute(
1✔
688
            () -> {
689
              try {
690
                service.signalExternalWorkflowExecution(signalId, a, this);
1✔
691
              } catch (Exception e) {
×
692
                log.error("Failure signalling an external workflow execution", e);
×
693
              }
1✔
694
            });
1✔
695
    ctx.lockTimer();
1✔
696
  }
1✔
697

698
  @Override
699
  public void completeSignalExternalWorkflowExecution(String signalId, String runId)
700
      throws EntityNotExistsError, InternalServiceError, WorkflowExecutionAlreadyCompletedError,
701
          BadRequestError {
702
    update(
1✔
703
        ctx -> {
704
          StateMachine<SignalExternalData> signal = getSignal(signalId);
1✔
705
          signal.action(Action.COMPLETE, ctx, runId, 0);
1✔
706
          scheduleDecision(ctx);
1✔
707
          ctx.unlockTimer();
1✔
708
        });
1✔
709
  }
1✔
710

711
  @Override
712
  public void failSignalExternalWorkflowExecution(
713
      String signalId, SignalExternalWorkflowExecutionFailedCause cause)
714
      throws EntityNotExistsError, InternalServiceError, WorkflowExecutionAlreadyCompletedError,
715
          BadRequestError {
716
    update(
1✔
717
        ctx -> {
718
          StateMachine<SignalExternalData> signal = getSignal(signalId);
1✔
719
          signal.action(Action.FAIL, ctx, cause, 0);
1✔
720
          scheduleDecision(ctx);
1✔
721
          ctx.unlockTimer();
1✔
722
        });
1✔
723
  }
1✔
724

725
  private StateMachine<SignalExternalData> getSignal(String signalId) throws EntityNotExistsError {
726
    StateMachine<SignalExternalData> signal = externalSignals.get(signalId);
1✔
727
    if (signal == null) {
1✔
728
      throw new EntityNotExistsError("unknown signalId: " + signalId);
×
729
    }
730
    return signal;
1✔
731
  }
732

733
  // TODO: insert a single decision failure into the history
734
  @Override
735
  public void failDecisionTask(RespondDecisionTaskFailedRequest request)
736
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
737
          BadRequestError {
738
    completeDecisionUpdate(
1✔
739
        ctx -> {
740
          decision.action(Action.FAIL, ctx, request, 0);
1✔
741
          scheduleDecision(ctx);
1✔
742
        },
1✔
743
        null); // reset sticky attributes to null
744
  }
1✔
745

746
  // TODO: insert a single decision timeout into the history
747
  private void timeoutDecisionTask(long scheduledEventId) {
748
    try {
749
      completeDecisionUpdate(
1✔
750
          ctx -> {
751
            if (decision == null
1✔
752
                || decision.getData().scheduledEventId != scheduledEventId
1✔
753
                || decision.getState() == State.COMPLETED) {
1✔
754
              // timeout for a previous decision
755
              return;
1✔
756
            }
757
            decision.action(StateMachines.Action.TIME_OUT, ctx, TimeoutType.START_TO_CLOSE, 0);
1✔
758
            scheduleDecision(ctx);
1✔
759
          },
1✔
760
          null); // reset sticky attributes to null
761
    } catch (EntityNotExistsError e) {
×
762
      // Expected as timers are not removed
763
    } catch (Exception e) {
1✔
764
      // Cannot fail to timer threads
765
      log.error("Failure trying to timeout a decision scheduledEventId=" + scheduledEventId, e);
1✔
766
    }
1✔
767
  }
1✔
768

769
  @Override
770
  public void childWorkflowStarted(ChildWorkflowExecutionStartedEventAttributes a)
771
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
772
          BadRequestError {
773
    update(
1✔
774
        ctx -> {
775
          StateMachine<ChildWorkflowData> child = getChildWorkflow(a.getInitiatedEventId());
1✔
776
          child.action(StateMachines.Action.START, ctx, a, 0);
1✔
777
          scheduleDecision(ctx);
1✔
778
          // No need to lock until completion as child workflow might skip
779
          // time as well
780
          ctx.unlockTimer();
1✔
781
        });
1✔
782
  }
1✔
783

784
  @Override
785
  public void childWorkflowFailed(String activityId, ChildWorkflowExecutionFailedEventAttributes a)
786
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
787
          BadRequestError {
788
    update(
1✔
789
        ctx -> {
790
          StateMachine<ChildWorkflowData> child = getChildWorkflow(a.getInitiatedEventId());
1✔
791
          child.action(StateMachines.Action.FAIL, ctx, a, 0);
1✔
792
          childWorkflows.remove(a.getInitiatedEventId());
1✔
793
          scheduleDecision(ctx);
1✔
794
          ctx.unlockTimer();
1✔
795
        });
1✔
796
  }
1✔
797

798
  @Override
799
  public void childWorkflowTimedOut(
800
      String activityId, ChildWorkflowExecutionTimedOutEventAttributes a)
801
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
802
          BadRequestError {
803
    update(
1✔
804
        ctx -> {
805
          StateMachine<ChildWorkflowData> child = getChildWorkflow(a.getInitiatedEventId());
1✔
806
          child.action(Action.TIME_OUT, ctx, a.getTimeoutType(), 0);
1✔
807
          childWorkflows.remove(a.getInitiatedEventId());
1✔
808
          scheduleDecision(ctx);
1✔
809
          ctx.unlockTimer();
1✔
810
        });
1✔
811
  }
1✔
812

813
  @Override
814
  public void failStartChildWorkflow(
815
      String childId, StartChildWorkflowExecutionFailedEventAttributes a)
816
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
817
          BadRequestError {
818
    update(
1✔
819
        ctx -> {
820
          StateMachine<ChildWorkflowData> child = getChildWorkflow(a.getInitiatedEventId());
1✔
821
          child.action(StateMachines.Action.FAIL, ctx, a, 0);
1✔
822
          childWorkflows.remove(a.getInitiatedEventId());
1✔
823
          scheduleDecision(ctx);
1✔
824
          ctx.unlockTimer();
1✔
825
        });
1✔
826
  }
1✔
827

828
  @Override
829
  public void childWorkflowCompleted(
830
      String activityId, ChildWorkflowExecutionCompletedEventAttributes a)
831
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
832
          BadRequestError {
833
    update(
1✔
834
        ctx -> {
835
          StateMachine<ChildWorkflowData> child = getChildWorkflow(a.getInitiatedEventId());
1✔
836
          child.action(StateMachines.Action.COMPLETE, ctx, a, 0);
1✔
837
          childWorkflows.remove(a.getInitiatedEventId());
1✔
838
          scheduleDecision(ctx);
1✔
839
          ctx.unlockTimer();
1✔
840
        });
1✔
841
  }
1✔
842

843
  @Override
844
  public void childWorkflowCanceled(
845
      String activityId, ChildWorkflowExecutionCanceledEventAttributes a)
846
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
847
          BadRequestError {
848
    update(
1✔
849
        ctx -> {
850
          StateMachine<ChildWorkflowData> child = getChildWorkflow(a.getInitiatedEventId());
1✔
851
          child.action(StateMachines.Action.CANCEL, ctx, a, 0);
1✔
852
          childWorkflows.remove(a.getInitiatedEventId());
1✔
853
          scheduleDecision(ctx);
1✔
854
          ctx.unlockTimer();
1✔
855
        });
1✔
856
  }
1✔
857

858
  private void processStartTimer(
859
      RequestContext ctx, StartTimerDecisionAttributes a, long decisionTaskCompletedId)
860
      throws BadRequestError, InternalServiceError {
861
    String timerId = a.getTimerId();
1✔
862
    if (timerId == null) {
1✔
863
      throw new BadRequestError("A valid TimerId is not set on StartTimerDecision");
×
864
    }
865
    StateMachine<TimerData> timer = timers.get(timerId);
1✔
866
    if (timer != null) {
1✔
867
      throw new BadRequestError("Already open timer with " + timerId);
×
868
    }
869
    timer = StateMachines.newTimerStateMachine();
1✔
870
    timers.put(timerId, timer);
1✔
871
    timer.action(StateMachines.Action.START, ctx, a, decisionTaskCompletedId);
1✔
872
    ctx.addTimer(a.getStartToFireTimeoutSeconds(), () -> fireTimer(timerId), "fire timer");
1✔
873
  }
1✔
874

875
  private void fireTimer(String timerId) {
876
    StateMachine<TimerData> timer;
877
    lock.lock();
1✔
878
    try {
879
      {
880
        timer = timers.get(timerId);
1✔
881
        if (timer == null || workflow.getState() != State.STARTED) {
1✔
882
          return; // cancelled already
1✔
883
        }
884
      }
885
    } finally {
886
      lock.unlock();
1✔
887
    }
888
    try {
889
      update(
1✔
890
          ctx -> {
891
            timer.action(StateMachines.Action.COMPLETE, ctx, null, 0);
1✔
892
            timers.remove(timerId);
1✔
893
            scheduleDecision(ctx);
1✔
894
          });
1✔
895
    } catch (BadRequestError
×
896
        | InternalServiceError
897
        | EntityNotExistsError
898
        | WorkflowExecutionAlreadyCompletedError e) {
899
      // Cannot fail to timer threads
900
      log.error("Failure firing a timer", e);
×
901
    }
1✔
902
  }
1✔
903

904
  private void processFailWorkflowExecution(
905
      RequestContext ctx,
906
      FailWorkflowExecutionDecisionAttributes d,
907
      long decisionTaskCompletedId,
908
      String identity)
909
      throws InternalServiceError, BadRequestError {
910
    WorkflowData data = workflow.getData();
1✔
911
    if (data.retryState.isPresent()) {
1✔
912
      RetryState rs = data.retryState.get();
1✔
913
      int backoffIntervalSeconds =
1✔
914
          rs.getBackoffIntervalInSeconds(d.getReason(), store.currentTimeMillis());
1✔
915
      if (backoffIntervalSeconds > 0) {
1✔
916
        ContinueAsNewWorkflowExecutionDecisionAttributes continueAsNewAttr =
1✔
917
            new ContinueAsNewWorkflowExecutionDecisionAttributes()
918
                .setInput(startRequest.getInput())
1✔
919
                .setWorkflowType(startRequest.getWorkflowType())
1✔
920
                .setExecutionStartToCloseTimeoutSeconds(
1✔
921
                    startRequest.getExecutionStartToCloseTimeoutSeconds())
1✔
922
                .setTaskStartToCloseTimeoutSeconds(startRequest.getTaskStartToCloseTimeoutSeconds())
1✔
923
                .setTaskList(startRequest.getTaskList())
1✔
924
                .setBackoffStartIntervalInSeconds(backoffIntervalSeconds)
1✔
925
                .setRetryPolicy(startRequest.getRetryPolicy());
1✔
926
        workflow.action(Action.CONTINUE_AS_NEW, ctx, continueAsNewAttr, decisionTaskCompletedId);
1✔
927
        HistoryEvent event = ctx.getEvents().get(ctx.getEvents().size() - 1);
1✔
928
        WorkflowExecutionContinuedAsNewEventAttributes continuedAsNewEventAttributes =
1✔
929
            event.getWorkflowExecutionContinuedAsNewEventAttributes();
1✔
930

931
        Optional<RetryState> continuedRetryState = Optional.of(rs.getNextAttempt());
1✔
932
        String runId =
1✔
933
            service.continueAsNew(
1✔
934
                startRequest,
935
                continuedAsNewEventAttributes,
936
                continuedRetryState,
937
                identity,
938
                getExecutionId(),
1✔
939
                parent,
940
                parentChildInitiatedEventId);
941
        continuedAsNewEventAttributes.setNewExecutionRunId(runId);
1✔
942
        return;
1✔
943
      }
944
    }
945

946
    if (!Strings.isNullOrEmpty(data.cronSchedule)) {
1✔
947
      startNewCronRun(ctx, decisionTaskCompletedId, identity, data, data.lastCompletionResult);
1✔
948
      return;
1✔
949
    }
950

951
    workflow.action(StateMachines.Action.FAIL, ctx, d, decisionTaskCompletedId);
1✔
952
    if (parent.isPresent()) {
1✔
953
      ctx.lockTimer(); // unlocked by the parent
1✔
954
      ChildWorkflowExecutionFailedEventAttributes a =
1✔
955
          new ChildWorkflowExecutionFailedEventAttributes()
956
              .setInitiatedEventId(parentChildInitiatedEventId.getAsLong())
1✔
957
              .setDetails(d.getDetails())
1✔
958
              .setReason(d.getReason())
1✔
959
              .setWorkflowType(startRequest.getWorkflowType())
1✔
960
              .setDomain(ctx.getDomain())
1✔
961
              .setWorkflowExecution(ctx.getExecution());
1✔
962
      ForkJoinPool.commonPool()
1✔
963
          .execute(
1✔
964
              () -> {
965
                try {
966
                  parent
1✔
967
                      .get()
1✔
968
                      .childWorkflowFailed(ctx.getExecutionId().getWorkflowId().getWorkflowId(), a);
1✔
969
                } catch (EntityNotExistsError | WorkflowExecutionAlreadyCompletedError e) {
×
970
                  // Parent might already close
971
                } catch (BadRequestError | InternalServiceError e) {
×
972
                  log.error("Failure reporting child completion", e);
×
973
                }
1✔
974
              });
1✔
975
    }
976
  }
1✔
977

978
  private void processCompleteWorkflowExecution(
979
      RequestContext ctx,
980
      CompleteWorkflowExecutionDecisionAttributes d,
981
      long decisionTaskCompletedId,
982
      String identity)
983
      throws InternalServiceError, BadRequestError {
984
    WorkflowData data = workflow.getData();
1✔
985
    if (!Strings.isNullOrEmpty(data.cronSchedule)) {
1✔
986
      startNewCronRun(ctx, decisionTaskCompletedId, identity, data, d.getResult());
1✔
987
      return;
1✔
988
    }
989

990
    workflow.action(StateMachines.Action.COMPLETE, ctx, d, decisionTaskCompletedId);
1✔
991
    if (parent.isPresent()) {
1✔
992
      ctx.lockTimer(); // unlocked by the parent
1✔
993
      ChildWorkflowExecutionCompletedEventAttributes a =
1✔
994
          new ChildWorkflowExecutionCompletedEventAttributes()
995
              .setInitiatedEventId(parentChildInitiatedEventId.getAsLong())
1✔
996
              .setResult(d.getResult())
1✔
997
              .setDomain(ctx.getDomain())
1✔
998
              .setWorkflowExecution(ctx.getExecution())
1✔
999
              .setWorkflowType(startRequest.getWorkflowType());
1✔
1000
      ForkJoinPool.commonPool()
1✔
1001
          .execute(
1✔
1002
              () -> {
1003
                try {
1004
                  parent
1✔
1005
                      .get()
1✔
1006
                      .childWorkflowCompleted(
1✔
1007
                          ctx.getExecutionId().getWorkflowId().getWorkflowId(), a);
1✔
1008
                } catch (EntityNotExistsError | WorkflowExecutionAlreadyCompletedError e) {
×
1009
                  // Parent might already close
1010
                } catch (BadRequestError | InternalServiceError e) {
×
1011
                  log.error("Failure reporting child completion", e);
×
1012
                }
1✔
1013
              });
1✔
1014
    }
1015
  }
1✔
1016

1017
  private void startNewCronRun(
1018
      RequestContext ctx,
1019
      long decisionTaskCompletedId,
1020
      String identity,
1021
      WorkflowData data,
1022
      byte[] lastCompletionResult)
1023
      throws InternalServiceError, BadRequestError {
1024
    CronDefinition cronDefinition = CronDefinitionBuilder.instanceDefinitionFor(CronType.UNIX);
1✔
1025
    CronParser parser = new CronParser(cronDefinition);
1✔
1026
    Cron cron = parser.parse(data.cronSchedule);
1✔
1027

1028
    Instant i = Instant.ofEpochMilli(store.currentTimeMillis());
1✔
1029
    ZonedDateTime now = ZonedDateTime.ofInstant(i, ZoneOffset.UTC);
1✔
1030

1031
    ExecutionTime executionTime = ExecutionTime.forCron(cron);
1✔
1032
    Optional<Duration> backoff = executionTime.timeToNextExecution(now);
1✔
1033
    int backoffIntervalSeconds = (int) backoff.get().getSeconds();
1✔
1034

1035
    if (backoffIntervalSeconds == 0) {
1✔
1036
      backoff = executionTime.timeToNextExecution(now.plusSeconds(1));
1✔
1037
      backoffIntervalSeconds = (int) backoff.get().getSeconds() + 1;
1✔
1038
    }
1039

1040
    ContinueAsNewWorkflowExecutionDecisionAttributes continueAsNewAttr =
1✔
1041
        new ContinueAsNewWorkflowExecutionDecisionAttributes()
1042
            .setInput(startRequest.getInput())
1✔
1043
            .setWorkflowType(startRequest.getWorkflowType())
1✔
1044
            .setExecutionStartToCloseTimeoutSeconds(
1✔
1045
                startRequest.getExecutionStartToCloseTimeoutSeconds())
1✔
1046
            .setTaskStartToCloseTimeoutSeconds(startRequest.getTaskStartToCloseTimeoutSeconds())
1✔
1047
            .setTaskList(startRequest.getTaskList())
1✔
1048
            .setBackoffStartIntervalInSeconds(backoffIntervalSeconds)
1✔
1049
            .setRetryPolicy(startRequest.getRetryPolicy())
1✔
1050
            .setLastCompletionResult(lastCompletionResult);
1✔
1051
    workflow.action(Action.CONTINUE_AS_NEW, ctx, continueAsNewAttr, decisionTaskCompletedId);
1✔
1052
    HistoryEvent event = ctx.getEvents().get(ctx.getEvents().size() - 1);
1✔
1053
    WorkflowExecutionContinuedAsNewEventAttributes continuedAsNewEventAttributes =
1✔
1054
        event.getWorkflowExecutionContinuedAsNewEventAttributes();
1✔
1055

1056
    String runId =
1✔
1057
        service.continueAsNew(
1✔
1058
            startRequest,
1059
            continuedAsNewEventAttributes,
1060
            Optional.empty(),
1✔
1061
            identity,
1062
            getExecutionId(),
1✔
1063
            parent,
1064
            parentChildInitiatedEventId);
1065
    continuedAsNewEventAttributes.setNewExecutionRunId(runId);
1✔
1066
  }
1✔
1067

1068
  private void processCancelWorkflowExecution(
1069
      RequestContext ctx, CancelWorkflowExecutionDecisionAttributes d, long decisionTaskCompletedId)
1070
      throws InternalServiceError, BadRequestError {
1071
    workflow.action(StateMachines.Action.CANCEL, ctx, d, decisionTaskCompletedId);
1✔
1072
    if (parent.isPresent()) {
1✔
1073
      ctx.lockTimer(); // unlocked by the parent
1✔
1074
      ChildWorkflowExecutionCanceledEventAttributes a =
1✔
1075
          new ChildWorkflowExecutionCanceledEventAttributes()
1076
              .setInitiatedEventId(parentChildInitiatedEventId.getAsLong())
1✔
1077
              .setDetails(d.getDetails())
1✔
1078
              .setDomain(ctx.getDomain())
1✔
1079
              .setWorkflowExecution(ctx.getExecution())
1✔
1080
              .setWorkflowType(startRequest.getWorkflowType());
1✔
1081
      ForkJoinPool.commonPool()
1✔
1082
          .execute(
1✔
1083
              () -> {
1084
                try {
1085
                  parent
1✔
1086
                      .get()
1✔
1087
                      .childWorkflowCanceled(
1✔
1088
                          ctx.getExecutionId().getWorkflowId().getWorkflowId(), a);
1✔
1089
                } catch (EntityNotExistsError | WorkflowExecutionAlreadyCompletedError e) {
×
1090
                  // Parent might already close
1091
                } catch (BadRequestError | InternalServiceError e) {
×
1092
                  log.error("Failure reporting child completion", e);
×
1093
                }
1✔
1094
              });
1✔
1095
    }
1096
  }
1✔
1097

1098
  private void processContinueAsNewWorkflowExecution(
1099
      RequestContext ctx,
1100
      ContinueAsNewWorkflowExecutionDecisionAttributes d,
1101
      long decisionTaskCompletedId,
1102
      String identity)
1103
      throws InternalServiceError, BadRequestError {
1104
    workflow.action(Action.CONTINUE_AS_NEW, ctx, d, decisionTaskCompletedId);
1✔
1105
    HistoryEvent event = ctx.getEvents().get(ctx.getEvents().size() - 1);
1✔
1106
    String runId =
1✔
1107
        service.continueAsNew(
1✔
1108
            startRequest,
1109
            event.getWorkflowExecutionContinuedAsNewEventAttributes(),
1✔
1110
            workflow.getData().retryState,
1✔
1111
            identity,
1112
            getExecutionId(),
1✔
1113
            parent,
1114
            parentChildInitiatedEventId);
1115
    event.getWorkflowExecutionContinuedAsNewEventAttributes().setNewExecutionRunId(runId);
1✔
1116
  }
1✔
1117

1118
  private void processUpsertWorkflowSearchAttributes(
1119
      RequestContext ctx,
1120
      UpsertWorkflowSearchAttributesDecisionAttributes attr,
1121
      long decisionTaskCompletedId)
1122
      throws BadRequestError, InternalServiceError {
1123
    UpsertWorkflowSearchAttributesEventAttributes upsertEventAttr =
1✔
1124
        new UpsertWorkflowSearchAttributesEventAttributes()
1125
            .setSearchAttributes(attr.getSearchAttributes())
1✔
1126
            .setDecisionTaskCompletedEventId(decisionTaskCompletedId);
1✔
1127
    HistoryEvent event =
1✔
1128
        new HistoryEvent()
1129
            .setEventType(EventType.UpsertWorkflowSearchAttributes)
1✔
1130
            .setUpsertWorkflowSearchAttributesEventAttributes(upsertEventAttr);
1✔
1131
    ctx.addEvent(event);
1✔
1132
  }
1✔
1133

1134
  @Override
1135
  public void startWorkflow(
1136
      boolean continuedAsNew, Optional<SignalWorkflowExecutionRequest> signalWithStartSignal)
1137
      throws InternalServiceError, BadRequestError {
1138
    try {
1139
      update(
1✔
1140
          ctx -> {
1141
            workflow.action(StateMachines.Action.START, ctx, startRequest, 0);
1✔
1142
            if (signalWithStartSignal.isPresent()) {
1✔
1143
              addExecutionSignaledEvent(ctx, signalWithStartSignal.get());
1✔
1144
            }
1145
            int backoffStartIntervalInSeconds = workflow.getData().backoffStartIntervalInSeconds;
1✔
1146
            if (backoffStartIntervalInSeconds > 0) {
1✔
1147
              ctx.addTimer(
1✔
1148
                  backoffStartIntervalInSeconds,
1149
                  () -> {
1150
                    try {
1151
                      update(ctx1 -> scheduleDecision(ctx1));
1✔
1152
                    } catch (EntityNotExistsError e) {
×
1153
                      // Expected as timers are not removed
1154
                    } catch (Exception e) {
×
1155
                      // Cannot fail to timer threads
1156
                      log.error("Failure trying to add task for an delayed workflow retry", e);
×
1157
                    }
1✔
1158
                  },
1✔
1159
                  "delayedFirstDecision");
1160
            } else {
1161
              scheduleDecision(ctx);
1✔
1162
            }
1163

1164
            int executionTimeoutTimerDelay = startRequest.getExecutionStartToCloseTimeoutSeconds();
1✔
1165
            if (backoffStartIntervalInSeconds > 0) {
1✔
1166
              executionTimeoutTimerDelay =
1✔
1167
                  executionTimeoutTimerDelay + backoffStartIntervalInSeconds;
1168
            }
1169
            ctx.addTimer(
1✔
1170
                executionTimeoutTimerDelay, this::timeoutWorkflow, "workflow execution timeout");
1171
          });
1✔
1172
    } catch (EntityNotExistsError | WorkflowExecutionAlreadyCompletedError e) {
×
1173
      throw new InternalServiceError(Throwables.getStackTraceAsString(e));
×
1174
    }
1✔
1175
    if (!continuedAsNew && parent.isPresent()) {
1✔
1176
      ChildWorkflowExecutionStartedEventAttributes a =
1✔
1177
          new ChildWorkflowExecutionStartedEventAttributes()
1178
              .setInitiatedEventId(parentChildInitiatedEventId.getAsLong())
1✔
1179
              .setWorkflowExecution(getExecutionId().getExecution())
1✔
1180
              .setDomain(getExecutionId().getDomain())
1✔
1181
              .setWorkflowType(startRequest.getWorkflowType());
1✔
1182
      try {
1183
        parent.get().childWorkflowStarted(a);
1✔
1184
      } catch (EntityNotExistsError | WorkflowExecutionAlreadyCompletedError e) {
×
1185
        // Not a problem. Parent might just close by now.
1186
      } catch (BadRequestError | InternalServiceError e) {
×
1187
        log.error("Failure reporting child completion", e);
×
1188
      }
1✔
1189
    }
1190
  }
1✔
1191

1192
  private void scheduleDecision(RequestContext ctx) throws InternalServiceError, BadRequestError {
1193
    if (decision != null) {
1✔
1194
      if (decision.getState() == StateMachines.State.INITIATED) {
1✔
1195
        return; // No need to schedule again
1✔
1196
      }
1197
      if (decision.getState() == StateMachines.State.STARTED) {
1✔
1198
        ctx.setNeedDecision(true);
1✔
1199
        return;
1✔
1200
      }
1201
      if (decision.getState() == StateMachines.State.FAILED
1✔
1202
          || decision.getState() == StateMachines.State.COMPLETED
1✔
1203
          || decision.getState() == State.TIMED_OUT) {
1✔
1204
        decision.action(StateMachines.Action.INITIATE, ctx, startRequest, 0);
1✔
1205
        ctx.lockTimer();
1✔
1206
        return;
1✔
1207
      }
1208
      throw new InternalServiceError("unexpected decision state: " + decision.getState());
×
1209
    }
1210
    this.decision = StateMachines.newDecisionStateMachine(lastNonFailedDecisionStartEventId, store);
1✔
1211
    decision.action(StateMachines.Action.INITIATE, ctx, startRequest, 0);
1✔
1212
    ctx.lockTimer();
1✔
1213
  }
1✔
1214

1215
  @Override
1216
  public void startActivityTask(
1217
      PollForActivityTaskResponse task, PollForActivityTaskRequest pollRequest)
1218
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
1219
          BadRequestError {
1220
    update(
1✔
1221
        ctx -> {
1222
          String activityId = task.getActivityId();
1✔
1223
          StateMachine<ActivityTaskData> activity = getActivity(activityId);
1✔
1224
          activity.action(StateMachines.Action.START, ctx, pollRequest, 0);
1✔
1225
          ActivityTaskData data = activity.getData();
1✔
1226
          int startToCloseTimeout = data.scheduledEvent.getStartToCloseTimeoutSeconds();
1✔
1227
          int heartbeatTimeout = data.scheduledEvent.getHeartbeatTimeoutSeconds();
1✔
1228
          if (startToCloseTimeout > 0) {
1✔
1229
            ctx.addTimer(
1✔
1230
                startToCloseTimeout,
1231
                () -> timeoutActivity(activityId, TimeoutType.START_TO_CLOSE),
1✔
1232
                "Activity StartToCloseTimeout");
1233
          }
1234
          updateHeartbeatTimer(ctx, activityId, activity, startToCloseTimeout, heartbeatTimeout);
1✔
1235
        });
1✔
1236
  }
1✔
1237

1238
  private void checkCompleted()
1239
      throws EntityNotExistsError, WorkflowExecutionAlreadyCompletedError {
1240
    State workflowState = workflow.getState();
1✔
1241
    if (isTerminalState(workflowState)) {
1✔
1242
      throw new WorkflowExecutionAlreadyCompletedError(
1✔
1243
          "Workflow is already completed: " + workflowState);
1244
    }
1245
  }
1✔
1246

1247
  private boolean isTerminalState(State workflowState) {
1248
    return workflowState == State.COMPLETED
1✔
1249
        || workflowState == State.TIMED_OUT
1250
        || workflowState == State.FAILED
1251
        || workflowState == State.CANCELED
1252
        || workflowState == State.CONTINUED_AS_NEW;
1253
  }
1254

1255
  private void updateHeartbeatTimer(
1256
      RequestContext ctx,
1257
      String activityId,
1258
      StateMachine<ActivityTaskData> activity,
1259
      int startToCloseTimeout,
1260
      int heartbeatTimeout) {
1261
    if (heartbeatTimeout > 0 && heartbeatTimeout < startToCloseTimeout) {
1✔
1262
      activity.getData().lastHeartbeatTime = clock.getAsLong();
1✔
1263
      ctx.addTimer(
1✔
1264
          heartbeatTimeout,
1265
          () -> timeoutActivity(activityId, TimeoutType.HEARTBEAT),
1✔
1266
          "Activity Heartbeat Timeout");
1267
    }
1268
  }
1✔
1269

1270
  @Override
1271
  public void completeActivityTask(String activityId, RespondActivityTaskCompletedRequest request)
1272
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
1273
          BadRequestError {
1274
    update(
1✔
1275
        ctx -> {
1276
          StateMachine<?> activity = getActivity(activityId);
1✔
1277
          activity.action(StateMachines.Action.COMPLETE, ctx, request, 0);
1✔
1278
          activities.remove(activityId);
1✔
1279
          scheduleDecision(ctx);
1✔
1280
          ctx.unlockTimer();
1✔
1281
        });
1✔
1282
  }
1✔
1283

1284
  @Override
1285
  public void completeActivityTaskById(
1286
      String activityId, RespondActivityTaskCompletedByIDRequest request)
1287
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
1288
          BadRequestError {
1289
    update(
1✔
1290
        ctx -> {
1291
          StateMachine<?> activity = getActivity(activityId);
1✔
1292
          activity.action(StateMachines.Action.COMPLETE, ctx, request, 0);
1✔
1293
          activities.remove(activityId);
1✔
1294
          scheduleDecision(ctx);
1✔
1295
          ctx.unlockTimer();
1✔
1296
        });
1✔
1297
  }
1✔
1298

1299
  @Override
1300
  public void failActivityTask(String activityId, RespondActivityTaskFailedRequest request)
1301
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
1302
          BadRequestError {
1303
    update(
1✔
1304
        ctx -> {
1305
          StateMachine<ActivityTaskData> activity = getActivity(activityId);
1✔
1306
          activity.action(StateMachines.Action.FAIL, ctx, request, 0);
1✔
1307
          if (isTerminalState(activity.getState())) {
1✔
1308
            activities.remove(activityId);
1✔
1309
            scheduleDecision(ctx);
1✔
1310
          } else {
1311
            addActivityRetryTimer(ctx, activity);
1✔
1312
          }
1313
          // Allow time skipping when waiting for retry
1314
          ctx.unlockTimer();
1✔
1315
        });
1✔
1316
  }
1✔
1317

1318
  private void addActivityRetryTimer(RequestContext ctx, StateMachine<ActivityTaskData> activity) {
1319
    ActivityTaskData data = activity.getData();
1✔
1320
    int attempt = data.retryState.getAttempt();
1✔
1321
    ctx.addTimer(
1✔
1322
        data.nextBackoffIntervalSeconds,
1323
        () -> {
1324
          // Timers are not removed, so skip if it is not for this attempt.
1325
          if (activity.getState() != State.INITIATED && data.retryState.getAttempt() != attempt) {
1✔
1326
            return;
×
1327
          }
1328
          selfAdvancingTimer.lockTimeSkipping(
1✔
1329
              "activityRetryTimer " + activity.getData().scheduledEvent.getActivityId());
1✔
1330
          boolean unlockTimer = false;
1✔
1331
          try {
1332
            update(ctx1 -> ctx1.addActivityTask(data.activityTask));
1✔
1333
          } catch (EntityNotExistsError | WorkflowExecutionAlreadyCompletedError e) {
×
1334
            unlockTimer = true;
×
1335
            // Expected as timers are not removed
1336
          } catch (Exception e) {
×
1337
            unlockTimer = true;
×
1338
            // Cannot fail to timer threads
1339
            log.error("Failure trying to add task for an activity retry", e);
×
1340
          } finally {
1341
            if (unlockTimer) {
1✔
1342
              // Allow time skipping when waiting for an activity retry
1343
              selfAdvancingTimer.unlockTimeSkipping(
×
1344
                  "activityRetryTimer " + activity.getData().scheduledEvent.getActivityId());
×
1345
            }
1346
          }
1347
        },
1✔
1348
        "Activity Retry");
1349
  }
1✔
1350

1351
  @Override
1352
  public void failActivityTaskById(String activityId, RespondActivityTaskFailedByIDRequest request)
1353
      throws EntityNotExistsError, InternalServiceError, WorkflowExecutionAlreadyCompletedError,
1354
          BadRequestError {
1355
    update(
1✔
1356
        ctx -> {
1357
          StateMachine<ActivityTaskData> activity = getActivity(activityId);
1✔
1358
          activity.action(StateMachines.Action.FAIL, ctx, request, 0);
1✔
1359
          if (isTerminalState(activity.getState())) {
1✔
1360
            activities.remove(activityId);
1✔
1361
            scheduleDecision(ctx);
1✔
1362
          } else {
1363
            addActivityRetryTimer(ctx, activity);
×
1364
          }
1365
          ctx.unlockTimer();
1✔
1366
        });
1✔
1367
  }
1✔
1368

1369
  @Override
1370
  public void cancelActivityTask(String activityId, RespondActivityTaskCanceledRequest request)
1371
      throws EntityNotExistsError, InternalServiceError, WorkflowExecutionAlreadyCompletedError,
1372
          BadRequestError {
1373
    update(
1✔
1374
        ctx -> {
1375
          StateMachine<?> activity = getActivity(activityId);
1✔
1376
          activity.action(StateMachines.Action.CANCEL, ctx, request, 0);
1✔
1377
          activities.remove(activityId);
1✔
1378
          scheduleDecision(ctx);
1✔
1379
          ctx.unlockTimer();
1✔
1380
        });
1✔
1381
  }
1✔
1382

1383
  @Override
1384
  public void cancelActivityTaskById(
1385
      String activityId, RespondActivityTaskCanceledByIDRequest request)
1386
      throws EntityNotExistsError, InternalServiceError, WorkflowExecutionAlreadyCompletedError,
1387
          BadRequestError {
1388
    update(
1✔
1389
        ctx -> {
1390
          StateMachine<?> activity = getActivity(activityId);
1✔
1391
          activity.action(StateMachines.Action.CANCEL, ctx, request, 0);
1✔
1392
          activities.remove(activityId);
1✔
1393
          scheduleDecision(ctx);
1✔
1394
          ctx.unlockTimer();
1✔
1395
        });
1✔
1396
  }
1✔
1397

1398
  @Override
1399
  public RecordActivityTaskHeartbeatResponse heartbeatActivityTask(
1400
      String activityId, byte[] details)
1401
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError {
1402
    RecordActivityTaskHeartbeatResponse result = new RecordActivityTaskHeartbeatResponse();
1✔
1403
    try {
1404
      update(
1✔
1405
          ctx -> {
1406
            StateMachine<ActivityTaskData> activity = getActivity(activityId);
1✔
1407
            activity.action(StateMachines.Action.UPDATE, ctx, details, 0);
1✔
1408
            if (activity.getState() == StateMachines.State.CANCELLATION_REQUESTED) {
1✔
1409
              result.setCancelRequested(true);
1✔
1410
            }
1411
            ActivityTaskData data = activity.getData();
1✔
1412
            data.lastHeartbeatTime = clock.getAsLong();
1✔
1413
            int startToCloseTimeout = data.scheduledEvent.getStartToCloseTimeoutSeconds();
1✔
1414
            int heartbeatTimeout = data.scheduledEvent.getHeartbeatTimeoutSeconds();
1✔
1415
            updateHeartbeatTimer(ctx, activityId, activity, startToCloseTimeout, heartbeatTimeout);
1✔
1416
          });
1✔
1417

1418
    } catch (InternalServiceError
×
1419
        | EntityNotExistsError
1420
        | WorkflowExecutionAlreadyCompletedError e) {
1421
      throw e;
×
1422
    } catch (Exception e) {
×
1423
      throw new InternalServiceError(Throwables.getStackTraceAsString(e));
×
1424
    }
1✔
1425
    return result;
1✔
1426
  }
1427

1428
  private void timeoutActivity(String activityId, TimeoutType timeoutType) {
1429
    boolean unlockTimer = true;
1✔
1430
    try {
1431
      update(
1✔
1432
          ctx -> {
1433
            StateMachine<ActivityTaskData> activity = getActivity(activityId);
1✔
1434
            if (timeoutType == TimeoutType.SCHEDULE_TO_START
1✔
1435
                && activity.getState() != StateMachines.State.INITIATED) {
1✔
1436
              throw new EntityNotExistsError("Not in INITIATED");
×
1437
            }
1438
            if (timeoutType == TimeoutType.HEARTBEAT) {
1✔
1439
              // Deal with timers which are never cancelled
1440
              long heartbeatTimeout =
1✔
1441
                  TimeUnit.SECONDS.toMillis(
1✔
1442
                      activity.getData().scheduledEvent.getHeartbeatTimeoutSeconds());
1✔
1443
              if (clock.getAsLong() - activity.getData().lastHeartbeatTime < heartbeatTimeout) {
1✔
1444
                throw new EntityNotExistsError("Not heartbeat timeout");
×
1445
              }
1446
            }
1447
            activity.action(StateMachines.Action.TIME_OUT, ctx, timeoutType, 0);
1✔
1448
            if (isTerminalState(activity.getState())) {
1✔
1449
              activities.remove(activityId);
1✔
1450
              scheduleDecision(ctx);
1✔
1451
            } else {
1452
              addActivityRetryTimer(ctx, activity);
1✔
1453
            }
1454
          });
1✔
1455
    } catch (EntityNotExistsError | WorkflowExecutionAlreadyCompletedError e) {
1✔
1456
      // Expected as timers are not removed
1457
      unlockTimer = false;
1✔
1458
    } catch (Exception e) {
×
1459
      // Cannot fail to timer threads
1460
      log.error("Failure trying to timeout an activity", e);
×
1461
    } finally {
1462
      if (unlockTimer) {
1✔
1463
        selfAdvancingTimer.unlockTimeSkipping("timeoutActivity " + activityId);
1✔
1464
      }
1465
    }
1466
  }
1✔
1467

1468
  private void timeoutWorkflow() {
1469
    lock.lock();
1✔
1470
    try {
1471
      {
1472
        if (isTerminalState(workflow.getState())) {
1✔
1473
          return;
1✔
1474
        }
1475
      }
1476
    } finally {
1477
      lock.unlock();
1✔
1478
    }
1479
    try {
1480
      update(
1✔
1481
          ctx -> {
1482
            if (isTerminalState(workflow.getState())) {
1✔
1483
              return;
×
1484
            }
1485
            workflow.action(StateMachines.Action.TIME_OUT, ctx, TimeoutType.START_TO_CLOSE, 0);
1✔
1486
            if (parent != null) {
1✔
1487
              ctx.lockTimer(); // unlocked by the parent
1✔
1488
            }
1489
            ForkJoinPool.commonPool().execute(() -> reportWorkflowTimeoutToParent(ctx));
1✔
1490
          });
1✔
1491
    } catch (BadRequestError
×
1492
        | InternalServiceError
1493
        | EntityNotExistsError
1494
        | WorkflowExecutionAlreadyCompletedError e) {
1495
      // Cannot fail to timer threads
1496
      log.error("Failure trying to timeout a workflow", e);
×
1497
    }
1✔
1498
  }
1✔
1499

1500
  private void reportWorkflowTimeoutToParent(RequestContext ctx) {
1501
    if (!parent.isPresent()) {
1✔
1502
      return;
1✔
1503
    }
1504
    try {
1505
      ChildWorkflowExecutionTimedOutEventAttributes a =
1✔
1506
          new ChildWorkflowExecutionTimedOutEventAttributes()
1507
              .setInitiatedEventId(parentChildInitiatedEventId.getAsLong())
1✔
1508
              .setTimeoutType(TimeoutType.START_TO_CLOSE)
1✔
1509
              .setWorkflowType(startRequest.getWorkflowType())
1✔
1510
              .setDomain(ctx.getDomain())
1✔
1511
              .setWorkflowExecution(ctx.getExecution());
1✔
1512
      parent.get().childWorkflowTimedOut(ctx.getExecutionId().getWorkflowId().getWorkflowId(), a);
1✔
1513
    } catch (EntityNotExistsError | WorkflowExecutionAlreadyCompletedError e) {
×
1514
      // Parent might already close
1515
    } catch (BadRequestError | InternalServiceError e) {
×
1516
      log.error("Failure reporting child timing out", e);
×
1517
    }
1✔
1518
  }
1✔
1519

1520
  @Override
1521
  public void signal(SignalWorkflowExecutionRequest signalRequest)
1522
      throws EntityNotExistsError, InternalServiceError, WorkflowExecutionAlreadyCompletedError,
1523
          BadRequestError {
1524
    update(
1✔
1525
        ctx -> {
1526
          addExecutionSignaledEvent(ctx, signalRequest);
1✔
1527
          scheduleDecision(ctx);
1✔
1528
        });
1✔
1529
  }
1✔
1530

1531
  @Override
1532
  public void signalFromWorkflow(SignalExternalWorkflowExecutionDecisionAttributes a)
1533
      throws EntityNotExistsError, InternalServiceError, WorkflowExecutionAlreadyCompletedError,
1534
          BadRequestError {
1535
    update(
1✔
1536
        ctx -> {
1537
          addExecutionSignaledByExternalEvent(ctx, a);
1✔
1538
          scheduleDecision(ctx);
1✔
1539
        });
1✔
1540
  }
1✔
1541

1542
  @Override
1543
  public void requestCancelWorkflowExecution(RequestCancelWorkflowExecutionRequest cancelRequest)
1544
      throws EntityNotExistsError, InternalServiceError, WorkflowExecutionAlreadyCompletedError,
1545
          BadRequestError {
1546
    update(
1✔
1547
        ctx -> {
1548
          workflow.action(StateMachines.Action.REQUEST_CANCELLATION, ctx, cancelRequest, 0);
1✔
1549
          scheduleDecision(ctx);
1✔
1550
        });
1✔
1551
  }
1✔
1552

1553
  @Override
1554
  public QueryWorkflowResponse query(QueryWorkflowRequest queryRequest) throws TException {
1555
    QueryId queryId = new QueryId(executionId);
1✔
1556

1557
    Optional<WorkflowExecutionCloseStatus> optCloseStatus = getCloseStatus();
1✔
1558
    if (optCloseStatus.isPresent() && queryRequest.getQueryRejectCondition() != null) {
1✔
1559
      WorkflowExecutionCloseStatus closeStatus = optCloseStatus.get();
1✔
1560
      boolean rejectNotOpen =
1✔
1561
          queryRequest.getQueryRejectCondition() == QueryRejectCondition.NOT_OPEN;
1✔
1562
      boolean rejectNotCompletedCleanly =
1✔
1563
          queryRequest.getQueryRejectCondition() == QueryRejectCondition.NOT_COMPLETED_CLEANLY
1✔
1564
              && closeStatus != WorkflowExecutionCloseStatus.COMPLETED;
1565
      if (rejectNotOpen || rejectNotCompletedCleanly) {
1✔
1566
        return new QueryWorkflowResponse()
1✔
1567
            .setQueryRejected(new QueryRejected().setCloseStatus(closeStatus));
1✔
1568
      }
1569
    }
1570

1571
    CompletableFuture<QueryWorkflowResponse> result = new CompletableFuture<>();
1✔
1572
    queries.put(queryId.getQueryId(), result);
1✔
1573

1574
    if (queryRequest.getQueryConsistencyLevel() == QueryConsistencyLevel.STRONG) {
1✔
1575
      pendingQueries.put(queryId.getQueryId(), queryRequest.getQuery());
×
1576
    } else {
1577
      PollForDecisionTaskResponse task =
1✔
1578
          new PollForDecisionTaskResponse()
1579
              .setTaskToken(queryId.toBytes())
1✔
1580
              .setWorkflowExecution(executionId.getExecution())
1✔
1581
              .setWorkflowType(startRequest.getWorkflowType())
1✔
1582
              .setQuery(queryRequest.getQuery())
1✔
1583
              .setWorkflowExecutionTaskList(startRequest.getTaskList());
1✔
1584
      TaskListId taskListId =
1✔
1585
          new TaskListId(
1586
              queryRequest.getDomain(),
1✔
1587
              stickyExecutionAttributes == null
1✔
1588
                  ? startRequest.getTaskList().getName()
1✔
1589
                  : stickyExecutionAttributes.getWorkerTaskList().getName());
1✔
1590
      queryRequests.put(queryId.getQueryId(), task);
1✔
1591
      store.sendQueryTask(executionId, taskListId, task);
1✔
1592
    }
1593

1594
    try {
1595
      return result.get();
1✔
1596
    } catch (InterruptedException e) {
×
1597
      return new QueryWorkflowResponse();
×
1598
    } catch (ExecutionException e) {
×
1599
      Throwable cause = e.getCause();
×
1600
      if (cause instanceof TException) {
×
1601
        throw (TException) cause;
×
1602
      }
1603
      throw new InternalServiceError(Throwables.getStackTraceAsString(cause));
×
1604
    }
1605
  }
1606

1607
  @Override
1608
  public void completeQuery(QueryId queryId, RespondQueryTaskCompletedRequest completeRequest)
1609
      throws EntityNotExistsError {
1610
    CompletableFuture<QueryWorkflowResponse> result = queries.get(queryId.getQueryId());
1✔
1611
    if (result == null) {
1✔
1612
      throw new EntityNotExistsError("Unknown query id: " + queryId.getQueryId());
×
1613
    }
1614
    if (completeRequest.getCompletedType() == QueryTaskCompletedType.COMPLETED) {
1✔
1615
      QueryWorkflowResponse response =
1✔
1616
          new QueryWorkflowResponse().setQueryResult(completeRequest.getQueryResult());
1✔
1617
      result.complete(response);
1✔
1618
    } else if (stickyExecutionAttributes != null) {
1✔
1619
      stickyExecutionAttributes = null;
×
1620
      PollForDecisionTaskResponse task = queryRequests.remove(queryId.getQueryId());
×
1621

1622
      TaskListId taskListId =
×
1623
          new TaskListId(startRequest.getDomain(), startRequest.getTaskList().getName());
×
1624
      store.sendQueryTask(executionId, taskListId, task);
×
1625
    } else {
×
1626
      QueryFailedError error = new QueryFailedError().setMessage(completeRequest.getErrorMessage());
×
1627
      result.completeExceptionally(error);
×
1628
    }
1629
  }
1✔
1630

1631
  private void addExecutionSignaledEvent(
1632
      RequestContext ctx, SignalWorkflowExecutionRequest signalRequest) {
1633
    WorkflowExecutionSignaledEventAttributes a =
1✔
1634
        new WorkflowExecutionSignaledEventAttributes()
1635
            .setInput(startRequest.getInput())
1✔
1636
            .setIdentity(signalRequest.getIdentity())
1✔
1637
            .setInput(signalRequest.getInput())
1✔
1638
            .setSignalName(signalRequest.getSignalName());
1✔
1639
    HistoryEvent executionSignaled =
1✔
1640
        new HistoryEvent()
1641
            .setEventType(EventType.WorkflowExecutionSignaled)
1✔
1642
            .setWorkflowExecutionSignaledEventAttributes(a);
1✔
1643
    ctx.addEvent(executionSignaled);
1✔
1644
  }
1✔
1645

1646
  private void addExecutionSignaledByExternalEvent(
1647
      RequestContext ctx, SignalExternalWorkflowExecutionDecisionAttributes d) {
1648
    WorkflowExecutionSignaledEventAttributes a =
1✔
1649
        new WorkflowExecutionSignaledEventAttributes()
1650
            .setInput(startRequest.getInput())
1✔
1651
            .setInput(d.getInput())
1✔
1652
            .setSignalName(d.getSignalName());
1✔
1653
    HistoryEvent executionSignaled =
1✔
1654
        new HistoryEvent()
1655
            .setEventType(EventType.WorkflowExecutionSignaled)
1✔
1656
            .setWorkflowExecutionSignaledEventAttributes(a);
1✔
1657
    ctx.addEvent(executionSignaled);
1✔
1658
  }
1✔
1659

1660
  private StateMachine<ActivityTaskData> getActivity(String activityId)
1661
      throws EntityNotExistsError {
1662
    StateMachine<ActivityTaskData> activity = activities.get(activityId);
1✔
1663
    if (activity == null) {
1✔
1664
      throw new EntityNotExistsError("unknown activityId: " + activityId);
1✔
1665
    }
1666
    return activity;
1✔
1667
  }
1668

1669
  private StateMachine<ChildWorkflowData> getChildWorkflow(long initiatedEventId)
1670
      throws InternalServiceError {
1671
    StateMachine<ChildWorkflowData> child = childWorkflows.get(initiatedEventId);
1✔
1672
    if (child == null) {
1✔
1673
      throw new InternalServiceError("unknown initiatedEventId: " + initiatedEventId);
×
1674
    }
1675
    return child;
1✔
1676
  }
1677

1678
  static class QueryId {
1679

1680
    private final ExecutionId executionId;
1681
    private final String queryId;
1682

1683
    QueryId(ExecutionId executionId) {
1✔
1684
      this.executionId = Objects.requireNonNull(executionId);
1✔
1685
      this.queryId = UUID.randomUUID().toString();
1✔
1686
    }
1✔
1687

1688
    private QueryId(ExecutionId executionId, String queryId) {
1✔
1689
      this.executionId = Objects.requireNonNull(executionId);
1✔
1690
      this.queryId = queryId;
1✔
1691
    }
1✔
1692

1693
    public ExecutionId getExecutionId() {
1694
      return executionId;
1✔
1695
    }
1696

1697
    String getQueryId() {
1698
      return queryId;
1✔
1699
    }
1700

1701
    byte[] toBytes() throws InternalServiceError {
1702
      ByteArrayOutputStream bout = new ByteArrayOutputStream();
1✔
1703
      DataOutputStream out = new DataOutputStream(bout);
1✔
1704
      addBytes(out);
1✔
1705
      return bout.toByteArray();
1✔
1706
    }
1707

1708
    void addBytes(DataOutputStream out) throws InternalServiceError {
1709
      try {
1710
        executionId.addBytes(out);
1✔
1711
        out.writeUTF(queryId);
1✔
1712
      } catch (IOException e) {
×
1713
        throw new InternalServiceError(Throwables.getStackTraceAsString(e));
×
1714
      }
1✔
1715
    }
1✔
1716

1717
    static QueryId fromBytes(byte[] serialized) throws InternalServiceError {
1718
      ByteArrayInputStream bin = new ByteArrayInputStream(serialized);
1✔
1719
      DataInputStream in = new DataInputStream(bin);
1✔
1720
      try {
1721
        ExecutionId executionId = ExecutionId.readFromBytes(in);
1✔
1722
        String queryId = in.readUTF();
1✔
1723
        return new QueryId(executionId, queryId);
1✔
1724
      } catch (IOException e) {
×
1725
        throw new InternalServiceError(Throwables.getStackTraceAsString(e));
×
1726
      }
1727
    }
1728
  }
1729
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc