• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 1991

27 Sep 2023 06:06PM UTC coverage: 60.278% (+0.05%) from 60.226%
1991

push

buildkite

web-flow
fixed bug: Added alreadyStarted workflow case (#853)

If we get WorkflowExecutionAlreadyStartedError while starting workflow in new domain, then we don't require startWorkflowInNew domain again as well as not throw error. Hence, we will inform by giving status as "Workflow already started".

8 of 8 new or added lines in 2 files covered. (100.0%)

11345 of 18821 relevant lines covered (60.28%)

0.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.16
/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableStateImpl.java
1
/*
2
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
 *
4
 *  Modifications copyright (C) 2017 Uber Technologies, Inc.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.internal.testservice;
19

20
import com.cronutils.model.Cron;
21
import com.cronutils.model.CronType;
22
import com.cronutils.model.definition.CronDefinition;
23
import com.cronutils.model.definition.CronDefinitionBuilder;
24
import com.cronutils.model.time.ExecutionTime;
25
import com.cronutils.parser.CronParser;
26
import com.google.common.base.Strings;
27
import com.google.common.base.Throwables;
28
import com.uber.cadence.ActivityTaskScheduledEventAttributes;
29
import com.uber.cadence.BadRequestError;
30
import com.uber.cadence.CancelTimerDecisionAttributes;
31
import com.uber.cadence.CancelTimerFailedEventAttributes;
32
import com.uber.cadence.CancelWorkflowExecutionDecisionAttributes;
33
import com.uber.cadence.ChildWorkflowExecutionCanceledEventAttributes;
34
import com.uber.cadence.ChildWorkflowExecutionCompletedEventAttributes;
35
import com.uber.cadence.ChildWorkflowExecutionFailedEventAttributes;
36
import com.uber.cadence.ChildWorkflowExecutionStartedEventAttributes;
37
import com.uber.cadence.ChildWorkflowExecutionTimedOutEventAttributes;
38
import com.uber.cadence.CompleteWorkflowExecutionDecisionAttributes;
39
import com.uber.cadence.ContinueAsNewWorkflowExecutionDecisionAttributes;
40
import com.uber.cadence.Decision;
41
import com.uber.cadence.DecisionTaskFailedCause;
42
import com.uber.cadence.EntityNotExistsError;
43
import com.uber.cadence.EventType;
44
import com.uber.cadence.FailWorkflowExecutionDecisionAttributes;
45
import com.uber.cadence.HistoryEvent;
46
import com.uber.cadence.InternalServiceError;
47
import com.uber.cadence.MarkerRecordedEventAttributes;
48
import com.uber.cadence.PollForActivityTaskRequest;
49
import com.uber.cadence.PollForActivityTaskResponse;
50
import com.uber.cadence.PollForDecisionTaskRequest;
51
import com.uber.cadence.PollForDecisionTaskResponse;
52
import com.uber.cadence.QueryConsistencyLevel;
53
import com.uber.cadence.QueryFailedError;
54
import com.uber.cadence.QueryRejectCondition;
55
import com.uber.cadence.QueryRejected;
56
import com.uber.cadence.QueryResultType;
57
import com.uber.cadence.QueryTaskCompletedType;
58
import com.uber.cadence.QueryWorkflowRequest;
59
import com.uber.cadence.QueryWorkflowResponse;
60
import com.uber.cadence.RecordActivityTaskHeartbeatResponse;
61
import com.uber.cadence.RecordMarkerDecisionAttributes;
62
import com.uber.cadence.RequestCancelActivityTaskDecisionAttributes;
63
import com.uber.cadence.RequestCancelActivityTaskFailedEventAttributes;
64
import com.uber.cadence.RequestCancelExternalWorkflowExecutionDecisionAttributes;
65
import com.uber.cadence.RequestCancelWorkflowExecutionRequest;
66
import com.uber.cadence.RespondActivityTaskCanceledByIDRequest;
67
import com.uber.cadence.RespondActivityTaskCanceledRequest;
68
import com.uber.cadence.RespondActivityTaskCompletedByIDRequest;
69
import com.uber.cadence.RespondActivityTaskCompletedRequest;
70
import com.uber.cadence.RespondActivityTaskFailedByIDRequest;
71
import com.uber.cadence.RespondActivityTaskFailedRequest;
72
import com.uber.cadence.RespondDecisionTaskCompletedRequest;
73
import com.uber.cadence.RespondDecisionTaskFailedRequest;
74
import com.uber.cadence.RespondQueryTaskCompletedRequest;
75
import com.uber.cadence.RetryPolicy;
76
import com.uber.cadence.ScheduleActivityTaskDecisionAttributes;
77
import com.uber.cadence.SignalExternalWorkflowExecutionDecisionAttributes;
78
import com.uber.cadence.SignalExternalWorkflowExecutionFailedCause;
79
import com.uber.cadence.SignalWorkflowExecutionRequest;
80
import com.uber.cadence.StartChildWorkflowExecutionDecisionAttributes;
81
import com.uber.cadence.StartChildWorkflowExecutionFailedEventAttributes;
82
import com.uber.cadence.StartTimerDecisionAttributes;
83
import com.uber.cadence.StartWorkflowExecutionRequest;
84
import com.uber.cadence.StickyExecutionAttributes;
85
import com.uber.cadence.TimeoutType;
86
import com.uber.cadence.UpsertWorkflowSearchAttributesDecisionAttributes;
87
import com.uber.cadence.UpsertWorkflowSearchAttributesEventAttributes;
88
import com.uber.cadence.WorkflowExecution;
89
import com.uber.cadence.WorkflowExecutionAlreadyCompletedError;
90
import com.uber.cadence.WorkflowExecutionCloseStatus;
91
import com.uber.cadence.WorkflowExecutionContinuedAsNewEventAttributes;
92
import com.uber.cadence.WorkflowExecutionSignaledEventAttributes;
93
import com.uber.cadence.WorkflowQuery;
94
import com.uber.cadence.WorkflowQueryResult;
95
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
96
import com.uber.cadence.internal.testservice.StateMachines.Action;
97
import com.uber.cadence.internal.testservice.StateMachines.ActivityTaskData;
98
import com.uber.cadence.internal.testservice.StateMachines.ChildWorkflowData;
99
import com.uber.cadence.internal.testservice.StateMachines.DecisionTaskData;
100
import com.uber.cadence.internal.testservice.StateMachines.SignalExternalData;
101
import com.uber.cadence.internal.testservice.StateMachines.State;
102
import com.uber.cadence.internal.testservice.StateMachines.TimerData;
103
import com.uber.cadence.internal.testservice.StateMachines.WorkflowData;
104
import com.uber.cadence.internal.testservice.TestWorkflowStore.TaskListId;
105
import java.io.ByteArrayInputStream;
106
import java.io.ByteArrayOutputStream;
107
import java.io.DataInputStream;
108
import java.io.DataOutputStream;
109
import java.io.IOException;
110
import java.time.Duration;
111
import java.time.Instant;
112
import java.time.ZoneOffset;
113
import java.time.ZonedDateTime;
114
import java.util.ArrayList;
115
import java.util.HashMap;
116
import java.util.List;
117
import java.util.Map;
118
import java.util.Objects;
119
import java.util.Optional;
120
import java.util.OptionalLong;
121
import java.util.UUID;
122
import java.util.concurrent.CompletableFuture;
123
import java.util.concurrent.ConcurrentHashMap;
124
import java.util.concurrent.ExecutionException;
125
import java.util.concurrent.ForkJoinPool;
126
import java.util.concurrent.TimeUnit;
127
import java.util.concurrent.locks.Lock;
128
import java.util.concurrent.locks.ReentrantLock;
129
import java.util.function.LongSupplier;
130
import org.apache.thrift.TException;
131
import org.slf4j.Logger;
132
import org.slf4j.LoggerFactory;
133

134
class TestWorkflowMutableStateImpl implements TestWorkflowMutableState {
135

136
  @FunctionalInterface
137
  private interface UpdateProcedure {
138

139
    void apply(RequestContext ctx)
140
        throws InternalServiceError, BadRequestError, EntityNotExistsError;
141
  }
142

143
  private static final Logger log = LoggerFactory.getLogger(TestWorkflowMutableStateImpl.class);
1✔
144

145
  private final Lock lock = new ReentrantLock();
1✔
146
  private final SelfAdvancingTimer selfAdvancingTimer;
147
  private final LongSupplier clock;
148
  private final ExecutionId executionId;
149
  private final Optional<TestWorkflowMutableState> parent;
150
  private final OptionalLong parentChildInitiatedEventId;
151
  private final TestWorkflowStore store;
152
  private final TestWorkflowService service;
153
  private final StartWorkflowExecutionRequest startRequest;
154
  private long nextEventId;
155
  private final List<RequestContext> concurrentToDecision = new ArrayList<>();
1✔
156
  private final Map<String, StateMachine<ActivityTaskData>> activities = new HashMap<>();
1✔
157
  private final Map<Long, StateMachine<ChildWorkflowData>> childWorkflows = new HashMap<>();
1✔
158
  private final Map<String, StateMachine<TimerData>> timers = new HashMap<>();
1✔
159
  private final Map<String, StateMachine<SignalExternalData>> externalSignals = new HashMap<>();
1✔
160
  private StateMachine<WorkflowData> workflow;
161
  private volatile StateMachine<DecisionTaskData> decision;
162
  private long lastNonFailedDecisionStartEventId;
163
  private final Map<String, CompletableFuture<QueryWorkflowResponse>> queries =
1✔
164
      new ConcurrentHashMap<>();
165
  private final Map<String, PollForDecisionTaskResponse> queryRequests = new ConcurrentHashMap<>();
1✔
166
  private final Map<String, WorkflowQuery> pendingQueries = new ConcurrentHashMap<>();
1✔
167
  private final Optional<String> continuedExecutionRunId;
168
  public StickyExecutionAttributes stickyExecutionAttributes;
169

170
  /**
171
   * @param retryState present if workflow is a retry
172
   * @param backoffStartIntervalInSeconds
173
   * @param parentChildInitiatedEventId id of the child initiated event in the parent history
174
   */
175
  TestWorkflowMutableStateImpl(
176
      StartWorkflowExecutionRequest startRequest,
177
      Optional<RetryState> retryState,
178
      int backoffStartIntervalInSeconds,
179
      byte[] lastCompletionResult,
180
      Optional<TestWorkflowMutableState> parent,
181
      OptionalLong parentChildInitiatedEventId,
182
      Optional<String> continuedExecutionRunId,
183
      TestWorkflowService service,
184
      TestWorkflowStore store) {
1✔
185
    this.startRequest = startRequest;
1✔
186
    this.parent = parent;
1✔
187
    this.parentChildInitiatedEventId = parentChildInitiatedEventId;
1✔
188
    this.continuedExecutionRunId = continuedExecutionRunId;
1✔
189
    this.service = service;
1✔
190
    String runId = UUID.randomUUID().toString();
1✔
191
    this.executionId =
1✔
192
        new ExecutionId(startRequest.getDomain(), startRequest.getWorkflowId(), runId);
1✔
193
    this.store = store;
1✔
194
    selfAdvancingTimer = store.getTimer();
1✔
195
    this.clock = selfAdvancingTimer.getClock();
1✔
196
    WorkflowData data =
1✔
197
        new WorkflowData(
198
            retryState,
199
            backoffStartIntervalInSeconds,
200
            startRequest.getCronSchedule(),
1✔
201
            lastCompletionResult,
202
            runId, // Test service doesn't support reset. Thus originalRunId is always the same as
203
            // runId.
204
            continuedExecutionRunId);
205
    this.workflow = StateMachines.newWorkflowStateMachine(data);
1✔
206
  }
1✔
207

208
  private void update(UpdateProcedure updater)
209
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
210
          BadRequestError {
211
    StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace();
1✔
212
    update(false, updater, stackTraceElements[2].getMethodName());
1✔
213
  }
1✔
214

215
  private void completeDecisionUpdate(UpdateProcedure updater, StickyExecutionAttributes attributes)
216
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
217
          BadRequestError {
218
    StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace();
1✔
219
    stickyExecutionAttributes = attributes;
1✔
220
    update(true, updater, stackTraceElements[2].getMethodName());
1✔
221
  }
1✔
222

223
  private void update(boolean completeDecisionUpdate, UpdateProcedure updater, String caller)
224
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
225
          BadRequestError {
226
    String callerInfo = "Decision Update from " + caller;
1✔
227
    lock.lock();
1✔
228
    LockHandle lockHandle = selfAdvancingTimer.lockTimeSkipping(callerInfo);
1✔
229

230
    try {
231
      checkCompleted();
1✔
232
      boolean concurrentDecision =
1✔
233
          !completeDecisionUpdate
234
              && (decision != null && decision.getState() == StateMachines.State.STARTED);
1✔
235

236
      RequestContext ctx = new RequestContext(clock, this, nextEventId);
1✔
237
      updater.apply(ctx);
1✔
238
      setPendingQueries(ctx);
1✔
239
      if (concurrentDecision && workflow.getState() != State.TIMED_OUT) {
1✔
240
        concurrentToDecision.add(ctx);
1✔
241
        ctx.fireCallbacks(0);
1✔
242
        store.applyTimersAndLocks(ctx);
1✔
243
      } else {
244
        nextEventId = ctx.commitChanges(store);
1✔
245
      }
246
    } catch (InternalServiceError
1✔
247
        | EntityNotExistsError
248
        | WorkflowExecutionAlreadyCompletedError
249
        | BadRequestError e) {
250
      throw e;
1✔
251
    } catch (Exception e) {
×
252
      throw new InternalServiceError(Throwables.getStackTraceAsString(e));
×
253
    } finally {
254
      lockHandle.unlock();
1✔
255
      lock.unlock();
1✔
256
    }
257
  }
1✔
258

259
  private void setPendingQueries(RequestContext ctx) {
260
    TestWorkflowStore.DecisionTask decisionTask = ctx.getDecisionTask();
1✔
261
    if (decisionTask != null) {
1✔
262
      decisionTask.getTask().setQueries(new HashMap<>(pendingQueries));
1✔
263
    }
264
  }
1✔
265

266
  @Override
267
  public ExecutionId getExecutionId() {
268
    return executionId;
1✔
269
  }
270

271
  @Override
272
  public Optional<WorkflowExecutionCloseStatus> getCloseStatus() {
273
    switch (workflow.getState()) {
1✔
274
      case NONE:
275
      case INITIATED:
276
      case STARTED:
277
      case CANCELLATION_REQUESTED:
278
        return Optional.empty();
1✔
279
      case FAILED:
280
        return Optional.of(WorkflowExecutionCloseStatus.FAILED);
1✔
281
      case TIMED_OUT:
282
        return Optional.of(WorkflowExecutionCloseStatus.TIMED_OUT);
×
283
      case CANCELED:
284
        return Optional.of(WorkflowExecutionCloseStatus.CANCELED);
×
285
      case COMPLETED:
286
        return Optional.of(WorkflowExecutionCloseStatus.COMPLETED);
1✔
287
      case CONTINUED_AS_NEW:
288
        return Optional.of(WorkflowExecutionCloseStatus.CONTINUED_AS_NEW);
×
289
    }
290
    throw new IllegalStateException("unreachable");
×
291
  }
292

293
  @Override
294
  public StartWorkflowExecutionRequest getStartRequest() {
295
    return startRequest;
1✔
296
  }
297

298
  @Override
299
  public StickyExecutionAttributes getStickyExecutionAttributes() {
300
    return stickyExecutionAttributes;
1✔
301
  }
302

303
  @Override
304
  public Optional<TestWorkflowMutableState> getParent() {
305
    return parent;
1✔
306
  }
307

308
  @Override
309
  public void startDecisionTask(
310
      PollForDecisionTaskResponse task, PollForDecisionTaskRequest pollRequest)
311
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
312
          BadRequestError {
313
    if (task.getQuery() == null) {
1✔
314
      update(
1✔
315
          ctx -> {
316
            long scheduledEventId = decision.getData().scheduledEventId;
1✔
317
            decision.action(StateMachines.Action.START, ctx, pollRequest, 0);
1✔
318
            ctx.addTimer(
1✔
319
                startRequest.getTaskStartToCloseTimeoutSeconds(),
1✔
320
                () -> timeoutDecisionTask(scheduledEventId),
1✔
321
                "DecisionTask StartToCloseTimeout");
322
          });
1✔
323
    }
324
  }
1✔
325

326
  @Override
327
  public void completeDecisionTask(int historySize, RespondDecisionTaskCompletedRequest request)
328
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
329
          BadRequestError {
330
    List<Decision> decisions = request.getDecisions();
1✔
331
    completeDecisionUpdate(
1✔
332
        ctx -> {
333
          if (request.getQueryResultsSize() > 0) {
1✔
334
            request
×
335
                .getQueryResults()
×
336
                .forEach(
×
337
                    (queryId, queryResult) -> {
338
                      completeQuery(queryId, queryResult);
×
339
                      pendingQueries.remove(queryId);
×
340
                    });
×
341
          }
342

343
          if (ctx.getInitialEventId() != historySize + 1) {
1✔
344
            throw new BadRequestError(
1✔
345
                "Expired decision: expectedHistorySize="
346
                    + historySize
347
                    + ","
348
                    + " actualHistorySize="
349
                    + ctx.getInitialEventId());
1✔
350
          }
351
          long decisionTaskCompletedId = ctx.getNextEventId() - 1;
1✔
352
          // Fail the decision if there are new events and the decision tries to complete the
353
          // workflow
354
          if (!concurrentToDecision.isEmpty() && hasCompleteDecision(request.getDecisions())) {
1✔
355
            RespondDecisionTaskFailedRequest failedRequest =
1✔
356
                new RespondDecisionTaskFailedRequest()
357
                    .setCause(DecisionTaskFailedCause.UNHANDLED_DECISION)
1✔
358
                    .setIdentity(request.getIdentity());
1✔
359
            decision.action(Action.FAIL, ctx, failedRequest, decisionTaskCompletedId);
1✔
360
            for (RequestContext deferredCtx : this.concurrentToDecision) {
1✔
361
              ctx.add(deferredCtx);
1✔
362
            }
1✔
363
            this.concurrentToDecision.clear();
1✔
364

365
            // Reset sticky execution attributes on failure
366
            stickyExecutionAttributes = null;
1✔
367
            scheduleDecision(ctx);
1✔
368
            return;
1✔
369
          }
370
          if (decision == null) {
1✔
371
            throw new EntityNotExistsError("No outstanding decision");
×
372
          }
373
          decision.action(StateMachines.Action.COMPLETE, ctx, request, 0);
1✔
374
          for (Decision d : decisions) {
1✔
375
            processDecision(ctx, d, request.getIdentity(), decisionTaskCompletedId);
1✔
376
          }
1✔
377
          for (RequestContext deferredCtx : this.concurrentToDecision) {
1✔
378
            ctx.add(deferredCtx);
1✔
379
          }
1✔
380
          lastNonFailedDecisionStartEventId = this.decision.getData().startedEventId;
1✔
381
          this.decision = null;
1✔
382
          boolean completed =
1✔
383
              workflow.getState() == StateMachines.State.COMPLETED
1✔
384
                  || workflow.getState() == StateMachines.State.FAILED
1✔
385
                  || workflow.getState() == StateMachines.State.CANCELED;
1✔
386
          if (!completed
1✔
387
              && ((ctx.isNeedDecision() || !this.concurrentToDecision.isEmpty())
1✔
388
                  || request.isForceCreateNewDecisionTask())) {
1✔
389
            scheduleDecision(ctx);
1✔
390
          }
391
          this.concurrentToDecision.clear();
1✔
392
          ctx.unlockTimer();
1✔
393
        },
1✔
394
        request.getStickyAttributes());
1✔
395
  }
1✔
396

397
  private void completeQuery(String queryId, WorkflowQueryResult queryResult) {
398
    CompletableFuture<QueryWorkflowResponse> future = queries.get(queryId);
×
399
    if (future == null) {
×
400
      throw new RuntimeException("Unknown query id: " + queryId);
×
401
    }
402
    if (queryResult.getResultType() == QueryResultType.ANSWERED) {
×
403
      future.complete(new QueryWorkflowResponse().setQueryResult(queryResult.getAnswer()));
×
404
    } else {
405
      future.completeExceptionally(
×
406
          new QueryFailedError().setMessage(queryResult.getErrorMessage()));
×
407
    }
408
  }
×
409

410
  private boolean hasCompleteDecision(List<Decision> decisions) {
411
    for (Decision d : decisions) {
1✔
412
      if (WorkflowExecutionUtils.isWorkflowExecutionCompleteDecision(d)) {
1✔
413
        return true;
1✔
414
      }
415
    }
1✔
416
    return false;
1✔
417
  }
418

419
  private void processDecision(
420
      RequestContext ctx, Decision d, String identity, long decisionTaskCompletedId)
421
      throws BadRequestError, InternalServiceError {
422
    switch (d.getDecisionType()) {
1✔
423
      case CompleteWorkflowExecution:
424
        processCompleteWorkflowExecution(
1✔
425
            ctx,
426
            d.getCompleteWorkflowExecutionDecisionAttributes(),
1✔
427
            decisionTaskCompletedId,
428
            identity);
429
        break;
1✔
430
      case FailWorkflowExecution:
431
        processFailWorkflowExecution(
1✔
432
            ctx, d.getFailWorkflowExecutionDecisionAttributes(), decisionTaskCompletedId, identity);
1✔
433
        break;
1✔
434
      case CancelWorkflowExecution:
435
        processCancelWorkflowExecution(
1✔
436
            ctx, d.getCancelWorkflowExecutionDecisionAttributes(), decisionTaskCompletedId);
1✔
437
        break;
1✔
438
      case ContinueAsNewWorkflowExecution:
439
        processContinueAsNewWorkflowExecution(
1✔
440
            ctx,
441
            d.getContinueAsNewWorkflowExecutionDecisionAttributes(),
1✔
442
            decisionTaskCompletedId,
443
            identity);
444
        break;
1✔
445
      case ScheduleActivityTask:
446
        processScheduleActivityTask(
1✔
447
            ctx, d.getScheduleActivityTaskDecisionAttributes(), decisionTaskCompletedId);
1✔
448
        break;
1✔
449
      case RequestCancelActivityTask:
450
        processRequestCancelActivityTask(
1✔
451
            ctx, d.getRequestCancelActivityTaskDecisionAttributes(), decisionTaskCompletedId);
1✔
452
        break;
1✔
453
      case StartTimer:
454
        processStartTimer(ctx, d.getStartTimerDecisionAttributes(), decisionTaskCompletedId);
1✔
455
        break;
1✔
456
      case CancelTimer:
457
        processCancelTimer(ctx, d.getCancelTimerDecisionAttributes(), decisionTaskCompletedId);
1✔
458
        break;
1✔
459
      case StartChildWorkflowExecution:
460
        processStartChildWorkflow(
1✔
461
            ctx, d.getStartChildWorkflowExecutionDecisionAttributes(), decisionTaskCompletedId);
1✔
462
        break;
1✔
463
      case SignalExternalWorkflowExecution:
464
        processSignalExternalWorkflowExecution(
1✔
465
            ctx, d.getSignalExternalWorkflowExecutionDecisionAttributes(), decisionTaskCompletedId);
1✔
466
        break;
1✔
467
      case RecordMarker:
468
        processRecordMarker(ctx, d.getRecordMarkerDecisionAttributes(), decisionTaskCompletedId);
1✔
469
        break;
1✔
470
      case RequestCancelExternalWorkflowExecution:
471
        processRequestCancelExternalWorkflowExecution(
1✔
472
            ctx, d.getRequestCancelExternalWorkflowExecutionDecisionAttributes());
1✔
473
        break;
1✔
474
      case UpsertWorkflowSearchAttributes:
475
        processUpsertWorkflowSearchAttributes(
1✔
476
            ctx, d.getUpsertWorkflowSearchAttributesDecisionAttributes(), decisionTaskCompletedId);
1✔
477
        break;
478
    }
479
  }
1✔
480

481
  private void processRequestCancelExternalWorkflowExecution(
482
      RequestContext ctx, RequestCancelExternalWorkflowExecutionDecisionAttributes attr) {
483
    ForkJoinPool.commonPool()
1✔
484
        .execute(
1✔
485
            () -> {
486
              RequestCancelWorkflowExecutionRequest request =
1✔
487
                  new RequestCancelWorkflowExecutionRequest();
488
              WorkflowExecution workflowExecution = new WorkflowExecution();
1✔
489
              workflowExecution.setWorkflowId(attr.workflowId);
1✔
490
              request.setWorkflowExecution(workflowExecution);
1✔
491
              request.setDomain(ctx.getDomain());
1✔
492
              try {
493
                service.RequestCancelWorkflowExecution(request);
1✔
494
              } catch (Exception e) {
×
495
                log.error("Failure to request cancel external workflow", e);
×
496
              }
1✔
497
            });
1✔
498
  }
1✔
499

500
  private void processRecordMarker(
501
      RequestContext ctx, RecordMarkerDecisionAttributes attr, long decisionTaskCompletedId)
502
      throws BadRequestError {
503
    if (!attr.isSetMarkerName()) {
1✔
504
      throw new BadRequestError("marker name is required");
×
505
    }
506

507
    MarkerRecordedEventAttributes marker =
1✔
508
        new MarkerRecordedEventAttributes()
509
            .setMarkerName(attr.getMarkerName())
1✔
510
            .setHeader(attr.getHeader())
1✔
511
            .setDetails(attr.getDetails())
1✔
512
            .setDecisionTaskCompletedEventId(decisionTaskCompletedId);
1✔
513
    HistoryEvent event =
1✔
514
        new HistoryEvent()
515
            .setEventType(EventType.MarkerRecorded)
1✔
516
            .setMarkerRecordedEventAttributes(marker);
1✔
517
    ctx.addEvent(event);
1✔
518
  }
1✔
519

520
  private void processCancelTimer(
521
      RequestContext ctx, CancelTimerDecisionAttributes d, long decisionTaskCompletedId)
522
      throws InternalServiceError, BadRequestError {
523
    String timerId = d.getTimerId();
1✔
524
    StateMachine<TimerData> timer = timers.get(timerId);
1✔
525
    if (timer == null) {
1✔
526
      CancelTimerFailedEventAttributes failedAttr =
×
527
          new CancelTimerFailedEventAttributes()
528
              .setTimerId(timerId)
×
529
              .setCause("TIMER_ID_UNKNOWN")
×
530
              .setDecisionTaskCompletedEventId(decisionTaskCompletedId);
×
531
      HistoryEvent cancellationFailed =
×
532
          new HistoryEvent()
533
              .setEventType(EventType.CancelTimerFailed)
×
534
              .setCancelTimerFailedEventAttributes(failedAttr);
×
535
      ctx.addEvent(cancellationFailed);
×
536
      return;
×
537
    }
538
    timer.action(StateMachines.Action.CANCEL, ctx, d, decisionTaskCompletedId);
1✔
539
    timers.remove(timerId);
1✔
540
  }
1✔
541

542
  private void processRequestCancelActivityTask(
543
      RequestContext ctx,
544
      RequestCancelActivityTaskDecisionAttributes a,
545
      long decisionTaskCompletedId)
546
      throws InternalServiceError, BadRequestError {
547
    String activityId = a.getActivityId();
1✔
548
    StateMachine<?> activity = activities.get(activityId);
1✔
549
    if (activity == null) {
1✔
550
      RequestCancelActivityTaskFailedEventAttributes failedAttr =
×
551
          new RequestCancelActivityTaskFailedEventAttributes()
552
              .setActivityId(activityId)
×
553
              .setCause("ACTIVITY_ID_UNKNOWN")
×
554
              .setDecisionTaskCompletedEventId(decisionTaskCompletedId);
×
555
      HistoryEvent cancellationFailed =
×
556
          new HistoryEvent()
557
              .setEventType(EventType.RequestCancelActivityTaskFailed)
×
558
              .setRequestCancelActivityTaskFailedEventAttributes(failedAttr);
×
559
      ctx.addEvent(cancellationFailed);
×
560
      return;
×
561
    }
562
    State beforeState = activity.getState();
1✔
563
    activity.action(StateMachines.Action.REQUEST_CANCELLATION, ctx, a, decisionTaskCompletedId);
1✔
564
    if (beforeState == StateMachines.State.INITIATED) {
1✔
565
      activity.action(StateMachines.Action.CANCEL, ctx, null, 0);
×
566
      activities.remove(activityId);
×
567
      ctx.setNeedDecision(true);
×
568
    }
569
  }
1✔
570

571
  private void processScheduleActivityTask(
572
      RequestContext ctx, ScheduleActivityTaskDecisionAttributes a, long decisionTaskCompletedId)
573
      throws BadRequestError, InternalServiceError {
574
    validateScheduleActivityTask(a);
1✔
575
    String activityId = a.getActivityId();
1✔
576
    StateMachine<ActivityTaskData> activity = activities.get(activityId);
1✔
577
    if (activity != null) {
1✔
578
      throw new BadRequestError("Already open activity with " + activityId);
×
579
    }
580
    activity = StateMachines.newActivityStateMachine(store, this.startRequest);
1✔
581
    activities.put(activityId, activity);
1✔
582
    activity.action(StateMachines.Action.INITIATE, ctx, a, decisionTaskCompletedId);
1✔
583
    ActivityTaskScheduledEventAttributes scheduledEvent = activity.getData().scheduledEvent;
1✔
584
    ctx.addTimer(
1✔
585
        scheduledEvent.getScheduleToCloseTimeoutSeconds(),
1✔
586
        () -> timeoutActivity(activityId, TimeoutType.SCHEDULE_TO_CLOSE),
1✔
587
        "Activity ScheduleToCloseTimeout");
588
    ctx.addTimer(
1✔
589
        scheduledEvent.getScheduleToStartTimeoutSeconds(),
1✔
590
        () -> timeoutActivity(activityId, TimeoutType.SCHEDULE_TO_START),
1✔
591
        "Activity ScheduleToStartTimeout");
592
    ctx.lockTimer();
1✔
593
  }
1✔
594

595
  private void validateScheduleActivityTask(ScheduleActivityTaskDecisionAttributes a)
596
      throws BadRequestError {
597
    if (a == null) {
1✔
598
      throw new BadRequestError("ScheduleActivityTaskDecisionAttributes is not set on decision.");
×
599
    }
600

601
    if (a.getTaskList() == null || a.getTaskList().getName().isEmpty()) {
1✔
602
      throw new BadRequestError("TaskList is not set on decision.");
×
603
    }
604
    if (a.getActivityId() == null || a.getActivityId().isEmpty()) {
1✔
605
      throw new BadRequestError("ActivityId is not set on decision.");
×
606
    }
607
    if (a.getActivityType() == null
1✔
608
        || a.getActivityType().getName() == null
1✔
609
        || a.getActivityType().getName().isEmpty()) {
1✔
610
      throw new BadRequestError("ActivityType is not set on decision.");
×
611
    }
612
    if (a.getStartToCloseTimeoutSeconds() <= 0) {
1✔
613
      throw new BadRequestError("A valid StartToCloseTimeoutSeconds is not set on decision.");
×
614
    }
615
    if (a.getScheduleToStartTimeoutSeconds() <= 0) {
1✔
616
      throw new BadRequestError("A valid ScheduleToStartTimeoutSeconds is not set on decision.");
×
617
    }
618
    if (a.getScheduleToCloseTimeoutSeconds() <= 0) {
1✔
619
      throw new BadRequestError("A valid ScheduleToCloseTimeoutSeconds is not set on decision.");
×
620
    }
621
    if (a.getHeartbeatTimeoutSeconds() < 0) {
1✔
622
      throw new BadRequestError("Ac valid HeartbeatTimeoutSeconds is not set on decision.");
×
623
    }
624
  }
1✔
625

626
  private void processStartChildWorkflow(
627
      RequestContext ctx,
628
      StartChildWorkflowExecutionDecisionAttributes a,
629
      long decisionTaskCompletedId)
630
      throws BadRequestError, InternalServiceError {
631
    validateStartChildExecutionAttributes(a);
1✔
632
    StateMachine<ChildWorkflowData> child = StateMachines.newChildWorkflowStateMachine(service);
1✔
633
    childWorkflows.put(ctx.getNextEventId(), child);
1✔
634
    child.action(StateMachines.Action.INITIATE, ctx, a, decisionTaskCompletedId);
1✔
635
    ctx.lockTimer();
1✔
636
  }
1✔
637

638
  /** Clone of the validateStartChildExecutionAttributes from historyEngine.go */
639
  private void validateStartChildExecutionAttributes(
640
      StartChildWorkflowExecutionDecisionAttributes a) throws BadRequestError {
641
    if (a == null) {
1✔
642
      throw new BadRequestError(
×
643
          "StartChildWorkflowExecutionDecisionAttributes is not set on decision.");
644
    }
645

646
    if (a.getWorkflowId().isEmpty()) {
1✔
647
      throw new BadRequestError("Required field WorkflowID is not set on decision.");
×
648
    }
649

650
    if (a.getWorkflowType() == null || a.getWorkflowType().getName().isEmpty()) {
1✔
651
      throw new BadRequestError("Required field WorkflowType is not set on decision.");
×
652
    }
653

654
    // Inherit tasklist from parent workflow execution if not provided on decision
655
    if (a.getTaskList() == null || a.getTaskList().getName().isEmpty()) {
1✔
656
      a.setTaskList(startRequest.getTaskList());
×
657
    }
658

659
    // Inherit workflow timeout from parent workflow execution if not provided on decision
660
    if (a.getExecutionStartToCloseTimeoutSeconds() <= 0) {
1✔
661
      a.setExecutionStartToCloseTimeoutSeconds(
×
662
          startRequest.getExecutionStartToCloseTimeoutSeconds());
×
663
    }
664

665
    // Inherit decision task timeout from parent workflow execution if not provided on decision
666
    if (a.getTaskStartToCloseTimeoutSeconds() <= 0) {
1✔
667
      a.setTaskStartToCloseTimeoutSeconds(startRequest.getTaskStartToCloseTimeoutSeconds());
×
668
    }
669

670
    RetryPolicy retryPolicy = a.getRetryPolicy();
1✔
671
    if (retryPolicy != null) {
1✔
672
      RetryState.validateRetryPolicy(retryPolicy);
1✔
673
    }
674
  }
1✔
675

676
  private void processSignalExternalWorkflowExecution(
677
      RequestContext ctx,
678
      SignalExternalWorkflowExecutionDecisionAttributes a,
679
      long decisionTaskCompletedId)
680
      throws InternalServiceError, BadRequestError {
681
    String signalId = UUID.randomUUID().toString();
1✔
682
    StateMachine<SignalExternalData> signalStateMachine =
683
        StateMachines.newSignalExternalStateMachine();
1✔
684
    externalSignals.put(signalId, signalStateMachine);
1✔
685
    signalStateMachine.action(StateMachines.Action.INITIATE, ctx, a, decisionTaskCompletedId);
1✔
686
    ForkJoinPool.commonPool()
1✔
687
        .execute(
1✔
688
            () -> {
689
              try {
690
                service.signalExternalWorkflowExecution(signalId, a, this);
1✔
691
              } catch (Exception e) {
×
692
                log.error("Failure signalling an external workflow execution", e);
×
693
              }
1✔
694
            });
1✔
695
    ctx.lockTimer();
1✔
696
  }
1✔
697

698
  @Override
699
  public void completeSignalExternalWorkflowExecution(String signalId, String runId)
700
      throws EntityNotExistsError, InternalServiceError, WorkflowExecutionAlreadyCompletedError,
701
          BadRequestError {
702
    update(
1✔
703
        ctx -> {
704
          StateMachine<SignalExternalData> signal = getSignal(signalId);
1✔
705
          signal.action(Action.COMPLETE, ctx, runId, 0);
1✔
706
          scheduleDecision(ctx);
1✔
707
          ctx.unlockTimer();
1✔
708
        });
1✔
709
  }
1✔
710

711
  @Override
712
  public void failSignalExternalWorkflowExecution(
713
      String signalId, SignalExternalWorkflowExecutionFailedCause cause)
714
      throws EntityNotExistsError, InternalServiceError, WorkflowExecutionAlreadyCompletedError,
715
          BadRequestError {
716
    update(
1✔
717
        ctx -> {
718
          StateMachine<SignalExternalData> signal = getSignal(signalId);
1✔
719
          signal.action(Action.FAIL, ctx, cause, 0);
1✔
720
          scheduleDecision(ctx);
1✔
721
          ctx.unlockTimer();
1✔
722
        });
1✔
723
  }
1✔
724

725
  private StateMachine<SignalExternalData> getSignal(String signalId) throws EntityNotExistsError {
726
    StateMachine<SignalExternalData> signal = externalSignals.get(signalId);
1✔
727
    if (signal == null) {
1✔
728
      throw new EntityNotExistsError("unknown signalId: " + signalId);
×
729
    }
730
    return signal;
1✔
731
  }
732

733
  // TODO: insert a single decision failure into the history
734
  @Override
735
  public void failDecisionTask(RespondDecisionTaskFailedRequest request)
736
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
737
          BadRequestError {
738
    completeDecisionUpdate(
1✔
739
        ctx -> {
740
          decision.action(Action.FAIL, ctx, request, 0);
1✔
741
          scheduleDecision(ctx);
1✔
742
        },
1✔
743
        null); // reset sticky attributes to null
744
  }
1✔
745

746
  // TODO: insert a single decision timeout into the history
747
  private void timeoutDecisionTask(long scheduledEventId) {
748
    try {
749
      completeDecisionUpdate(
1✔
750
          ctx -> {
751
            if (decision == null
1✔
752
                || decision.getData().scheduledEventId != scheduledEventId
1✔
753
                || decision.getState() == State.COMPLETED) {
1✔
754
              // timeout for a previous decision
755
              return;
1✔
756
            }
757
            decision.action(StateMachines.Action.TIME_OUT, ctx, TimeoutType.START_TO_CLOSE, 0);
1✔
758
            scheduleDecision(ctx);
1✔
759
          },
1✔
760
          null); // reset sticky attributes to null
761
    } catch (EntityNotExistsError e) {
×
762
      // Expected as timers are not removed
763
    } catch (Exception e) {
1✔
764
      // Cannot fail to timer threads
765
      log.error("Failure trying to timeout a decision scheduledEventId=" + scheduledEventId, e);
1✔
766
    }
1✔
767
  }
1✔
768

769
  @Override
770
  public void childWorkflowStarted(ChildWorkflowExecutionStartedEventAttributes a)
771
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
772
          BadRequestError {
773
    update(
1✔
774
        ctx -> {
775
          StateMachine<ChildWorkflowData> child = getChildWorkflow(a.getInitiatedEventId());
1✔
776
          child.action(StateMachines.Action.START, ctx, a, 0);
1✔
777
          scheduleDecision(ctx);
1✔
778
          // No need to lock until completion as child workflow might skip
779
          // time as well
780
          ctx.unlockTimer();
1✔
781
        });
1✔
782
  }
1✔
783

784
  @Override
785
  public void childWorkflowFailed(String activityId, ChildWorkflowExecutionFailedEventAttributes a)
786
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
787
          BadRequestError {
788
    update(
1✔
789
        ctx -> {
790
          StateMachine<ChildWorkflowData> child = getChildWorkflow(a.getInitiatedEventId());
1✔
791
          child.action(StateMachines.Action.FAIL, ctx, a, 0);
1✔
792
          childWorkflows.remove(a.getInitiatedEventId());
1✔
793
          scheduleDecision(ctx);
1✔
794
          ctx.unlockTimer();
1✔
795
        });
1✔
796
  }
1✔
797

798
  @Override
799
  public void childWorkflowTimedOut(
800
      String activityId, ChildWorkflowExecutionTimedOutEventAttributes a)
801
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
802
          BadRequestError {
803
    update(
1✔
804
        ctx -> {
805
          StateMachine<ChildWorkflowData> child = getChildWorkflow(a.getInitiatedEventId());
1✔
806
          child.action(Action.TIME_OUT, ctx, a.getTimeoutType(), 0);
1✔
807
          childWorkflows.remove(a.getInitiatedEventId());
1✔
808
          scheduleDecision(ctx);
1✔
809
          ctx.unlockTimer();
1✔
810
        });
1✔
811
  }
1✔
812

813
  @Override
814
  public void failStartChildWorkflow(
815
      String childId, StartChildWorkflowExecutionFailedEventAttributes a)
816
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
817
          BadRequestError {
818
    update(
1✔
819
        ctx -> {
820
          StateMachine<ChildWorkflowData> child = getChildWorkflow(a.getInitiatedEventId());
1✔
821
          child.action(StateMachines.Action.FAIL, ctx, a, 0);
1✔
822
          childWorkflows.remove(a.getInitiatedEventId());
1✔
823
          scheduleDecision(ctx);
1✔
824
          ctx.unlockTimer();
1✔
825
        });
1✔
826
  }
1✔
827

828
  @Override
829
  public void childWorkflowCompleted(
830
      String activityId, ChildWorkflowExecutionCompletedEventAttributes a)
831
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
832
          BadRequestError {
833
    update(
1✔
834
        ctx -> {
835
          StateMachine<ChildWorkflowData> child = getChildWorkflow(a.getInitiatedEventId());
1✔
836
          child.action(StateMachines.Action.COMPLETE, ctx, a, 0);
1✔
837
          childWorkflows.remove(a.getInitiatedEventId());
1✔
838
          scheduleDecision(ctx);
1✔
839
          ctx.unlockTimer();
1✔
840
        });
1✔
841
  }
1✔
842

843
  @Override
844
  public void childWorkflowCanceled(
845
      String activityId, ChildWorkflowExecutionCanceledEventAttributes a)
846
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
847
          BadRequestError {
848
    update(
1✔
849
        ctx -> {
850
          StateMachine<ChildWorkflowData> child = getChildWorkflow(a.getInitiatedEventId());
1✔
851
          child.action(StateMachines.Action.CANCEL, ctx, a, 0);
1✔
852
          childWorkflows.remove(a.getInitiatedEventId());
1✔
853
          scheduleDecision(ctx);
1✔
854
          ctx.unlockTimer();
1✔
855
        });
1✔
856
  }
1✔
857

858
  private void processStartTimer(
859
      RequestContext ctx, StartTimerDecisionAttributes a, long decisionTaskCompletedId)
860
      throws BadRequestError, InternalServiceError {
861
    String timerId = a.getTimerId();
1✔
862
    if (timerId == null) {
1✔
863
      throw new BadRequestError("A valid TimerId is not set on StartTimerDecision");
×
864
    }
865
    StateMachine<TimerData> timer = timers.get(timerId);
1✔
866
    if (timer != null) {
1✔
867
      throw new BadRequestError("Already open timer with " + timerId);
×
868
    }
869
    timer = StateMachines.newTimerStateMachine();
1✔
870
    timers.put(timerId, timer);
1✔
871
    timer.action(StateMachines.Action.START, ctx, a, decisionTaskCompletedId);
1✔
872
    ctx.addTimer(a.getStartToFireTimeoutSeconds(), () -> fireTimer(timerId), "fire timer");
1✔
873
  }
1✔
874

875
  private void fireTimer(String timerId) {
876
    StateMachine<TimerData> timer;
877
    lock.lock();
1✔
878
    try {
879
      {
880
        timer = timers.get(timerId);
1✔
881
        if (timer == null || workflow.getState() != State.STARTED) {
1✔
882
          return; // cancelled already
1✔
883
        }
884
      }
885
    } finally {
886
      lock.unlock();
1✔
887
    }
888
    try {
889
      update(
1✔
890
          ctx -> {
891
            timer.action(StateMachines.Action.COMPLETE, ctx, null, 0);
1✔
892
            timers.remove(timerId);
1✔
893
            scheduleDecision(ctx);
1✔
894
          });
1✔
895
    } catch (BadRequestError
×
896
        | InternalServiceError
897
        | EntityNotExistsError
898
        | WorkflowExecutionAlreadyCompletedError e) {
899
      // Cannot fail to timer threads
900
      log.error("Failure firing a timer", e);
×
901
    }
1✔
902
  }
1✔
903

904
  private void processFailWorkflowExecution(
905
      RequestContext ctx,
906
      FailWorkflowExecutionDecisionAttributes d,
907
      long decisionTaskCompletedId,
908
      String identity)
909
      throws InternalServiceError, BadRequestError {
910
    WorkflowData data = workflow.getData();
1✔
911
    if (data.retryState.isPresent()) {
1✔
912
      RetryState rs = data.retryState.get();
1✔
913
      int backoffIntervalSeconds =
1✔
914
          rs.getBackoffIntervalInSeconds(d.getReason(), store.currentTimeMillis());
1✔
915
      if (backoffIntervalSeconds > 0) {
1✔
916
        ContinueAsNewWorkflowExecutionDecisionAttributes continueAsNewAttr =
1✔
917
            new ContinueAsNewWorkflowExecutionDecisionAttributes()
918
                .setInput(startRequest.getInput())
1✔
919
                .setWorkflowType(startRequest.getWorkflowType())
1✔
920
                .setExecutionStartToCloseTimeoutSeconds(
1✔
921
                    startRequest.getExecutionStartToCloseTimeoutSeconds())
1✔
922
                .setTaskStartToCloseTimeoutSeconds(startRequest.getTaskStartToCloseTimeoutSeconds())
1✔
923
                .setTaskList(startRequest.getTaskList())
1✔
924
                .setBackoffStartIntervalInSeconds(backoffIntervalSeconds)
1✔
925
                .setRetryPolicy(startRequest.getRetryPolicy());
1✔
926
        workflow.action(Action.CONTINUE_AS_NEW, ctx, continueAsNewAttr, decisionTaskCompletedId);
1✔
927
        HistoryEvent event = ctx.getEvents().get(ctx.getEvents().size() - 1);
1✔
928
        WorkflowExecutionContinuedAsNewEventAttributes continuedAsNewEventAttributes =
1✔
929
            event.getWorkflowExecutionContinuedAsNewEventAttributes();
1✔
930

931
        Optional<RetryState> continuedRetryState = Optional.of(rs.getNextAttempt());
1✔
932
        String runId =
1✔
933
            service.continueAsNew(
1✔
934
                startRequest,
935
                continuedAsNewEventAttributes,
936
                continuedRetryState,
937
                identity,
938
                getExecutionId(),
1✔
939
                parent,
940
                parentChildInitiatedEventId);
941
        continuedAsNewEventAttributes.setNewExecutionRunId(runId);
1✔
942
        return;
1✔
943
      }
944
    }
945

946
    if (!Strings.isNullOrEmpty(data.cronSchedule)) {
1✔
947
      startNewCronRun(ctx, decisionTaskCompletedId, identity, data, data.lastCompletionResult);
1✔
948
      return;
1✔
949
    }
950

951
    workflow.action(StateMachines.Action.FAIL, ctx, d, decisionTaskCompletedId);
1✔
952
    if (parent.isPresent()) {
1✔
953
      ctx.lockTimer(); // unlocked by the parent
1✔
954
      ChildWorkflowExecutionFailedEventAttributes a =
1✔
955
          new ChildWorkflowExecutionFailedEventAttributes()
956
              .setInitiatedEventId(parentChildInitiatedEventId.getAsLong())
1✔
957
              .setDetails(d.getDetails())
1✔
958
              .setReason(d.getReason())
1✔
959
              .setWorkflowType(startRequest.getWorkflowType())
1✔
960
              .setDomain(ctx.getDomain())
1✔
961
              .setWorkflowExecution(ctx.getExecution());
1✔
962
      ForkJoinPool.commonPool()
1✔
963
          .execute(
1✔
964
              () -> {
965
                try {
966
                  parent
1✔
967
                      .get()
1✔
968
                      .childWorkflowFailed(ctx.getExecutionId().getWorkflowId().getWorkflowId(), a);
1✔
969
                } catch (EntityNotExistsError | WorkflowExecutionAlreadyCompletedError e) {
×
970
                  // Parent might already close
971
                } catch (BadRequestError | InternalServiceError e) {
×
972
                  log.error("Failure reporting child completion", e);
×
973
                }
1✔
974
              });
1✔
975
    }
976
  }
1✔
977

978
  private void processCompleteWorkflowExecution(
979
      RequestContext ctx,
980
      CompleteWorkflowExecutionDecisionAttributes d,
981
      long decisionTaskCompletedId,
982
      String identity)
983
      throws InternalServiceError, BadRequestError {
984
    WorkflowData data = workflow.getData();
1✔
985
    if (!Strings.isNullOrEmpty(data.cronSchedule)) {
1✔
986
      startNewCronRun(ctx, decisionTaskCompletedId, identity, data, d.getResult());
1✔
987
      return;
1✔
988
    }
989

990
    workflow.action(StateMachines.Action.COMPLETE, ctx, d, decisionTaskCompletedId);
1✔
991
    if (parent.isPresent()) {
1✔
992
      ctx.lockTimer(); // unlocked by the parent
1✔
993
      ChildWorkflowExecutionCompletedEventAttributes a =
1✔
994
          new ChildWorkflowExecutionCompletedEventAttributes()
995
              .setInitiatedEventId(parentChildInitiatedEventId.getAsLong())
1✔
996
              .setResult(d.getResult())
1✔
997
              .setDomain(ctx.getDomain())
1✔
998
              .setWorkflowExecution(ctx.getExecution())
1✔
999
              .setWorkflowType(startRequest.getWorkflowType());
1✔
1000
      ForkJoinPool.commonPool()
1✔
1001
          .execute(
1✔
1002
              () -> {
1003
                try {
1004
                  parent
1✔
1005
                      .get()
1✔
1006
                      .childWorkflowCompleted(
1✔
1007
                          ctx.getExecutionId().getWorkflowId().getWorkflowId(), a);
1✔
1008
                } catch (EntityNotExistsError | WorkflowExecutionAlreadyCompletedError e) {
×
1009
                  // Parent might already close
1010
                } catch (BadRequestError | InternalServiceError e) {
×
1011
                  log.error("Failure reporting child completion", e);
×
1012
                }
1✔
1013
              });
1✔
1014
    }
1015
  }
1✔
1016

1017
  private void startNewCronRun(
1018
      RequestContext ctx,
1019
      long decisionTaskCompletedId,
1020
      String identity,
1021
      WorkflowData data,
1022
      byte[] lastCompletionResult)
1023
      throws InternalServiceError, BadRequestError {
1024
    CronDefinition cronDefinition = CronDefinitionBuilder.instanceDefinitionFor(CronType.UNIX);
1✔
1025
    CronParser parser = new CronParser(cronDefinition);
1✔
1026
    Cron cron = parser.parse(data.cronSchedule);
1✔
1027

1028
    Instant i = Instant.ofEpochMilli(store.currentTimeMillis());
1✔
1029
    ZonedDateTime now = ZonedDateTime.ofInstant(i, ZoneOffset.UTC);
1✔
1030

1031
    ExecutionTime executionTime = ExecutionTime.forCron(cron);
1✔
1032
    Optional<Duration> backoff = executionTime.timeToNextExecution(now);
1✔
1033
    int backoffIntervalSeconds = (int) backoff.get().getSeconds();
1✔
1034

1035
    if (backoffIntervalSeconds == 0) {
1✔
1036
      backoff = executionTime.timeToNextExecution(now.plusSeconds(1));
1✔
1037
      backoffIntervalSeconds = (int) backoff.get().getSeconds() + 1;
1✔
1038
    }
1039

1040
    ContinueAsNewWorkflowExecutionDecisionAttributes continueAsNewAttr =
1✔
1041
        new ContinueAsNewWorkflowExecutionDecisionAttributes()
1042
            .setInput(startRequest.getInput())
1✔
1043
            .setWorkflowType(startRequest.getWorkflowType())
1✔
1044
            .setExecutionStartToCloseTimeoutSeconds(
1✔
1045
                startRequest.getExecutionStartToCloseTimeoutSeconds())
1✔
1046
            .setTaskStartToCloseTimeoutSeconds(startRequest.getTaskStartToCloseTimeoutSeconds())
1✔
1047
            .setTaskList(startRequest.getTaskList())
1✔
1048
            .setBackoffStartIntervalInSeconds(backoffIntervalSeconds)
1✔
1049
            .setRetryPolicy(startRequest.getRetryPolicy())
1✔
1050
            .setLastCompletionResult(lastCompletionResult);
1✔
1051
    workflow.action(Action.CONTINUE_AS_NEW, ctx, continueAsNewAttr, decisionTaskCompletedId);
1✔
1052
    HistoryEvent event = ctx.getEvents().get(ctx.getEvents().size() - 1);
1✔
1053
    WorkflowExecutionContinuedAsNewEventAttributes continuedAsNewEventAttributes =
1✔
1054
        event.getWorkflowExecutionContinuedAsNewEventAttributes();
1✔
1055

1056
    String runId =
1✔
1057
        service.continueAsNew(
1✔
1058
            startRequest,
1059
            continuedAsNewEventAttributes,
1060
            Optional.empty(),
1✔
1061
            identity,
1062
            getExecutionId(),
1✔
1063
            parent,
1064
            parentChildInitiatedEventId);
1065
    continuedAsNewEventAttributes.setNewExecutionRunId(runId);
1✔
1066
  }
1✔
1067

1068
  private void processCancelWorkflowExecution(
1069
      RequestContext ctx, CancelWorkflowExecutionDecisionAttributes d, long decisionTaskCompletedId)
1070
      throws InternalServiceError, BadRequestError {
1071
    workflow.action(StateMachines.Action.CANCEL, ctx, d, decisionTaskCompletedId);
1✔
1072
    if (parent.isPresent()) {
1✔
1073
      ctx.lockTimer(); // unlocked by the parent
1✔
1074
      ChildWorkflowExecutionCanceledEventAttributes a =
1✔
1075
          new ChildWorkflowExecutionCanceledEventAttributes()
1076
              .setInitiatedEventId(parentChildInitiatedEventId.getAsLong())
1✔
1077
              .setDetails(d.getDetails())
1✔
1078
              .setDomain(ctx.getDomain())
1✔
1079
              .setWorkflowExecution(ctx.getExecution())
1✔
1080
              .setWorkflowType(startRequest.getWorkflowType());
1✔
1081
      ForkJoinPool.commonPool()
1✔
1082
          .execute(
1✔
1083
              () -> {
1084
                try {
1085
                  parent
1✔
1086
                      .get()
1✔
1087
                      .childWorkflowCanceled(
1✔
1088
                          ctx.getExecutionId().getWorkflowId().getWorkflowId(), a);
1✔
1089
                } catch (EntityNotExistsError | WorkflowExecutionAlreadyCompletedError e) {
×
1090
                  // Parent might already close
1091
                } catch (BadRequestError | InternalServiceError e) {
×
1092
                  log.error("Failure reporting child completion", e);
×
1093
                }
1✔
1094
              });
1✔
1095
    }
1096
  }
1✔
1097

1098
  private void processContinueAsNewWorkflowExecution(
1099
      RequestContext ctx,
1100
      ContinueAsNewWorkflowExecutionDecisionAttributes d,
1101
      long decisionTaskCompletedId,
1102
      String identity)
1103
      throws InternalServiceError, BadRequestError {
1104
    workflow.action(Action.CONTINUE_AS_NEW, ctx, d, decisionTaskCompletedId);
1✔
1105
    HistoryEvent event = ctx.getEvents().get(ctx.getEvents().size() - 1);
1✔
1106
    String runId =
1✔
1107
        service.continueAsNew(
1✔
1108
            startRequest,
1109
            event.getWorkflowExecutionContinuedAsNewEventAttributes(),
1✔
1110
            workflow.getData().retryState,
1✔
1111
            identity,
1112
            getExecutionId(),
1✔
1113
            parent,
1114
            parentChildInitiatedEventId);
1115
    event.getWorkflowExecutionContinuedAsNewEventAttributes().setNewExecutionRunId(runId);
1✔
1116
  }
1✔
1117

1118
  private void processUpsertWorkflowSearchAttributes(
1119
      RequestContext ctx,
1120
      UpsertWorkflowSearchAttributesDecisionAttributes attr,
1121
      long decisionTaskCompletedId)
1122
      throws BadRequestError, InternalServiceError {
1123
    UpsertWorkflowSearchAttributesEventAttributes upsertEventAttr =
1✔
1124
        new UpsertWorkflowSearchAttributesEventAttributes()
1125
            .setSearchAttributes(attr.getSearchAttributes())
1✔
1126
            .setDecisionTaskCompletedEventId(decisionTaskCompletedId);
1✔
1127
    HistoryEvent event =
1✔
1128
        new HistoryEvent()
1129
            .setEventType(EventType.UpsertWorkflowSearchAttributes)
1✔
1130
            .setUpsertWorkflowSearchAttributesEventAttributes(upsertEventAttr);
1✔
1131
    ctx.addEvent(event);
1✔
1132
  }
1✔
1133

1134
  @Override
1135
  public void startWorkflow(
1136
      boolean continuedAsNew, Optional<SignalWorkflowExecutionRequest> signalWithStartSignal)
1137
      throws InternalServiceError, BadRequestError {
1138
    try {
1139
      update(
1✔
1140
          ctx -> {
1141
            workflow.action(StateMachines.Action.START, ctx, startRequest, 0);
1✔
1142
            if (signalWithStartSignal.isPresent()) {
1✔
1143
              addExecutionSignaledEvent(ctx, signalWithStartSignal.get());
1✔
1144
            }
1145
            int backoffStartIntervalInSeconds = workflow.getData().backoffStartIntervalInSeconds;
1✔
1146
            if (backoffStartIntervalInSeconds > 0) {
1✔
1147
              ctx.addTimer(
1✔
1148
                  backoffStartIntervalInSeconds,
1149
                  () -> {
1150
                    try {
1151
                      update(ctx1 -> scheduleDecision(ctx1));
1✔
1152
                    } catch (EntityNotExistsError e) {
×
1153
                      // Expected as timers are not removed
1154
                    } catch (Exception e) {
×
1155
                      // Cannot fail to timer threads
1156
                      log.error("Failure trying to add task for an delayed workflow retry", e);
×
1157
                    }
1✔
1158
                  },
1✔
1159
                  "delayedFirstDecision");
1160
            } else {
1161
              scheduleDecision(ctx);
1✔
1162
            }
1163

1164
            int executionTimeoutTimerDelay = startRequest.getExecutionStartToCloseTimeoutSeconds();
1✔
1165
            if (backoffStartIntervalInSeconds > 0) {
1✔
1166
              executionTimeoutTimerDelay =
1✔
1167
                  executionTimeoutTimerDelay + backoffStartIntervalInSeconds;
1168
            }
1169
            ctx.addTimer(
1✔
1170
                executionTimeoutTimerDelay, this::timeoutWorkflow, "workflow execution timeout");
1171
          });
1✔
1172
    } catch (EntityNotExistsError | WorkflowExecutionAlreadyCompletedError e) {
×
1173
      throw new InternalServiceError(Throwables.getStackTraceAsString(e));
×
1174
    }
1✔
1175
    if (!continuedAsNew && parent.isPresent()) {
1✔
1176
      ChildWorkflowExecutionStartedEventAttributes a =
1✔
1177
          new ChildWorkflowExecutionStartedEventAttributes()
1178
              .setInitiatedEventId(parentChildInitiatedEventId.getAsLong())
1✔
1179
              .setWorkflowExecution(getExecutionId().getExecution())
1✔
1180
              .setDomain(getExecutionId().getDomain())
1✔
1181
              .setWorkflowType(startRequest.getWorkflowType());
1✔
1182
      ForkJoinPool.commonPool()
1✔
1183
          .execute(
1✔
1184
              () -> {
1185
                try {
1186
                  parent.get().childWorkflowStarted(a);
1✔
1187
                } catch (EntityNotExistsError | WorkflowExecutionAlreadyCompletedError e) {
×
1188
                  // Not a problem. Parent might just close by now.
1189
                } catch (BadRequestError | InternalServiceError e) {
×
1190
                  log.error("Failure reporting child completion", e);
×
1191
                }
1✔
1192
              });
1✔
1193
    }
1194
  }
1✔
1195

1196
  private void scheduleDecision(RequestContext ctx) throws InternalServiceError, BadRequestError {
1197
    if (decision != null) {
1✔
1198
      if (decision.getState() == StateMachines.State.INITIATED) {
1✔
1199
        return; // No need to schedule again
1✔
1200
      }
1201
      if (decision.getState() == StateMachines.State.STARTED) {
1✔
1202
        ctx.setNeedDecision(true);
1✔
1203
        return;
1✔
1204
      }
1205
      if (decision.getState() == StateMachines.State.FAILED
1✔
1206
          || decision.getState() == StateMachines.State.COMPLETED
1✔
1207
          || decision.getState() == State.TIMED_OUT) {
1✔
1208
        decision.action(StateMachines.Action.INITIATE, ctx, startRequest, 0);
1✔
1209
        ctx.lockTimer();
1✔
1210
        return;
1✔
1211
      }
1212
      throw new InternalServiceError("unexpected decision state: " + decision.getState());
×
1213
    }
1214
    this.decision = StateMachines.newDecisionStateMachine(lastNonFailedDecisionStartEventId, store);
1✔
1215
    decision.action(StateMachines.Action.INITIATE, ctx, startRequest, 0);
1✔
1216
    ctx.lockTimer();
1✔
1217
  }
1✔
1218

1219
  @Override
1220
  public void startActivityTask(
1221
      PollForActivityTaskResponse task, PollForActivityTaskRequest pollRequest)
1222
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
1223
          BadRequestError {
1224
    update(
1✔
1225
        ctx -> {
1226
          String activityId = task.getActivityId();
1✔
1227
          StateMachine<ActivityTaskData> activity = getActivity(activityId);
1✔
1228
          activity.action(StateMachines.Action.START, ctx, pollRequest, 0);
1✔
1229
          ActivityTaskData data = activity.getData();
1✔
1230
          int startToCloseTimeout = data.scheduledEvent.getStartToCloseTimeoutSeconds();
1✔
1231
          int heartbeatTimeout = data.scheduledEvent.getHeartbeatTimeoutSeconds();
1✔
1232
          if (startToCloseTimeout > 0) {
1✔
1233
            ctx.addTimer(
1✔
1234
                startToCloseTimeout,
1235
                () -> timeoutActivity(activityId, TimeoutType.START_TO_CLOSE),
1✔
1236
                "Activity StartToCloseTimeout");
1237
          }
1238
          updateHeartbeatTimer(ctx, activityId, activity, startToCloseTimeout, heartbeatTimeout);
1✔
1239
        });
1✔
1240
  }
1✔
1241

1242
  private void checkCompleted()
1243
      throws EntityNotExistsError, WorkflowExecutionAlreadyCompletedError {
1244
    State workflowState = workflow.getState();
1✔
1245
    if (isTerminalState(workflowState)) {
1✔
1246
      throw new WorkflowExecutionAlreadyCompletedError(
1✔
1247
          "Workflow is already completed: " + workflowState);
1248
    }
1249
  }
1✔
1250

1251
  private boolean isTerminalState(State workflowState) {
1252
    return workflowState == State.COMPLETED
1✔
1253
        || workflowState == State.TIMED_OUT
1254
        || workflowState == State.FAILED
1255
        || workflowState == State.CANCELED
1256
        || workflowState == State.CONTINUED_AS_NEW;
1257
  }
1258

1259
  private void updateHeartbeatTimer(
1260
      RequestContext ctx,
1261
      String activityId,
1262
      StateMachine<ActivityTaskData> activity,
1263
      int startToCloseTimeout,
1264
      int heartbeatTimeout) {
1265
    if (heartbeatTimeout > 0 && heartbeatTimeout < startToCloseTimeout) {
1✔
1266
      activity.getData().lastHeartbeatTime = clock.getAsLong();
1✔
1267
      ctx.addTimer(
1✔
1268
          heartbeatTimeout,
1269
          () -> timeoutActivity(activityId, TimeoutType.HEARTBEAT),
1✔
1270
          "Activity Heartbeat Timeout");
1271
    }
1272
  }
1✔
1273

1274
  @Override
1275
  public void completeActivityTask(String activityId, RespondActivityTaskCompletedRequest request)
1276
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
1277
          BadRequestError {
1278
    update(
1✔
1279
        ctx -> {
1280
          StateMachine<?> activity = getActivity(activityId);
1✔
1281
          activity.action(StateMachines.Action.COMPLETE, ctx, request, 0);
1✔
1282
          activities.remove(activityId);
1✔
1283
          scheduleDecision(ctx);
1✔
1284
          ctx.unlockTimer();
1✔
1285
        });
1✔
1286
  }
1✔
1287

1288
  @Override
1289
  public void completeActivityTaskById(
1290
      String activityId, RespondActivityTaskCompletedByIDRequest request)
1291
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
1292
          BadRequestError {
1293
    update(
1✔
1294
        ctx -> {
1295
          StateMachine<?> activity = getActivity(activityId);
1✔
1296
          activity.action(StateMachines.Action.COMPLETE, ctx, request, 0);
1✔
1297
          activities.remove(activityId);
1✔
1298
          scheduleDecision(ctx);
1✔
1299
          ctx.unlockTimer();
1✔
1300
        });
1✔
1301
  }
1✔
1302

1303
  @Override
1304
  public void failActivityTask(String activityId, RespondActivityTaskFailedRequest request)
1305
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
1306
          BadRequestError {
1307
    update(
1✔
1308
        ctx -> {
1309
          StateMachine<ActivityTaskData> activity = getActivity(activityId);
1✔
1310
          activity.action(StateMachines.Action.FAIL, ctx, request, 0);
1✔
1311
          if (isTerminalState(activity.getState())) {
1✔
1312
            activities.remove(activityId);
1✔
1313
            scheduleDecision(ctx);
1✔
1314
          } else {
1315
            addActivityRetryTimer(ctx, activity);
1✔
1316
          }
1317
          // Allow time skipping when waiting for retry
1318
          ctx.unlockTimer();
1✔
1319
        });
1✔
1320
  }
1✔
1321

1322
  private void addActivityRetryTimer(RequestContext ctx, StateMachine<ActivityTaskData> activity) {
1323
    ActivityTaskData data = activity.getData();
1✔
1324
    int attempt = data.retryState.getAttempt();
1✔
1325
    ctx.addTimer(
1✔
1326
        data.nextBackoffIntervalSeconds,
1327
        () -> {
1328
          // Timers are not removed, so skip if it is not for this attempt.
1329
          if (activity.getState() != State.INITIATED && data.retryState.getAttempt() != attempt) {
1✔
1330
            return;
×
1331
          }
1332
          selfAdvancingTimer.lockTimeSkipping(
1✔
1333
              "activityRetryTimer " + activity.getData().scheduledEvent.getActivityId());
1✔
1334
          boolean unlockTimer = false;
1✔
1335
          try {
1336
            update(ctx1 -> ctx1.addActivityTask(data.activityTask));
1✔
1337
          } catch (EntityNotExistsError | WorkflowExecutionAlreadyCompletedError e) {
×
1338
            unlockTimer = true;
×
1339
            // Expected as timers are not removed
1340
          } catch (Exception e) {
×
1341
            unlockTimer = true;
×
1342
            // Cannot fail to timer threads
1343
            log.error("Failure trying to add task for an activity retry", e);
×
1344
          } finally {
1345
            if (unlockTimer) {
1✔
1346
              // Allow time skipping when waiting for an activity retry
1347
              selfAdvancingTimer.unlockTimeSkipping(
×
1348
                  "activityRetryTimer " + activity.getData().scheduledEvent.getActivityId());
×
1349
            }
1350
          }
1351
        },
1✔
1352
        "Activity Retry");
1353
  }
1✔
1354

1355
  @Override
1356
  public void failActivityTaskById(String activityId, RespondActivityTaskFailedByIDRequest request)
1357
      throws EntityNotExistsError, InternalServiceError, WorkflowExecutionAlreadyCompletedError,
1358
          BadRequestError {
1359
    update(
×
1360
        ctx -> {
1361
          StateMachine<ActivityTaskData> activity = getActivity(activityId);
×
1362
          activity.action(StateMachines.Action.FAIL, ctx, request, 0);
×
1363
          if (isTerminalState(activity.getState())) {
×
1364
            activities.remove(activityId);
×
1365
            scheduleDecision(ctx);
×
1366
          } else {
1367
            addActivityRetryTimer(ctx, activity);
×
1368
          }
1369
          ctx.unlockTimer();
×
1370
        });
×
1371
  }
×
1372

1373
  @Override
1374
  public void cancelActivityTask(String activityId, RespondActivityTaskCanceledRequest request)
1375
      throws EntityNotExistsError, InternalServiceError, WorkflowExecutionAlreadyCompletedError,
1376
          BadRequestError {
1377
    update(
1✔
1378
        ctx -> {
1379
          StateMachine<?> activity = getActivity(activityId);
1✔
1380
          activity.action(StateMachines.Action.CANCEL, ctx, request, 0);
1✔
1381
          activities.remove(activityId);
1✔
1382
          scheduleDecision(ctx);
1✔
1383
          ctx.unlockTimer();
1✔
1384
        });
1✔
1385
  }
1✔
1386

1387
  @Override
1388
  public void cancelActivityTaskById(
1389
      String activityId, RespondActivityTaskCanceledByIDRequest request)
1390
      throws EntityNotExistsError, InternalServiceError, WorkflowExecutionAlreadyCompletedError,
1391
          BadRequestError {
1392
    update(
×
1393
        ctx -> {
1394
          StateMachine<?> activity = getActivity(activityId);
×
1395
          activity.action(StateMachines.Action.CANCEL, ctx, request, 0);
×
1396
          activities.remove(activityId);
×
1397
          scheduleDecision(ctx);
×
1398
          ctx.unlockTimer();
×
1399
        });
×
1400
  }
×
1401

1402
  @Override
1403
  public RecordActivityTaskHeartbeatResponse heartbeatActivityTask(
1404
      String activityId, byte[] details)
1405
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError {
1406
    RecordActivityTaskHeartbeatResponse result = new RecordActivityTaskHeartbeatResponse();
1✔
1407
    try {
1408
      update(
1✔
1409
          ctx -> {
1410
            StateMachine<ActivityTaskData> activity = getActivity(activityId);
1✔
1411
            activity.action(StateMachines.Action.UPDATE, ctx, details, 0);
1✔
1412
            if (activity.getState() == StateMachines.State.CANCELLATION_REQUESTED) {
1✔
1413
              result.setCancelRequested(true);
1✔
1414
            }
1415
            ActivityTaskData data = activity.getData();
1✔
1416
            data.lastHeartbeatTime = clock.getAsLong();
1✔
1417
            int startToCloseTimeout = data.scheduledEvent.getStartToCloseTimeoutSeconds();
1✔
1418
            int heartbeatTimeout = data.scheduledEvent.getHeartbeatTimeoutSeconds();
1✔
1419
            updateHeartbeatTimer(ctx, activityId, activity, startToCloseTimeout, heartbeatTimeout);
1✔
1420
          });
1✔
1421

1422
    } catch (InternalServiceError
×
1423
        | EntityNotExistsError
1424
        | WorkflowExecutionAlreadyCompletedError e) {
1425
      throw e;
×
1426
    } catch (Exception e) {
×
1427
      throw new InternalServiceError(Throwables.getStackTraceAsString(e));
×
1428
    }
1✔
1429
    return result;
1✔
1430
  }
1431

1432
  private void timeoutActivity(String activityId, TimeoutType timeoutType) {
1433
    boolean unlockTimer = true;
1✔
1434
    try {
1435
      update(
1✔
1436
          ctx -> {
1437
            StateMachine<ActivityTaskData> activity = getActivity(activityId);
1✔
1438
            if (timeoutType == TimeoutType.SCHEDULE_TO_START
1✔
1439
                && activity.getState() != StateMachines.State.INITIATED) {
1✔
1440
              throw new EntityNotExistsError("Not in INITIATED");
×
1441
            }
1442
            if (timeoutType == TimeoutType.HEARTBEAT) {
1✔
1443
              // Deal with timers which are never cancelled
1444
              long heartbeatTimeout =
1✔
1445
                  TimeUnit.SECONDS.toMillis(
1✔
1446
                      activity.getData().scheduledEvent.getHeartbeatTimeoutSeconds());
1✔
1447
              if (clock.getAsLong() - activity.getData().lastHeartbeatTime < heartbeatTimeout) {
1✔
1448
                throw new EntityNotExistsError("Not heartbeat timeout");
×
1449
              }
1450
            }
1451
            activity.action(StateMachines.Action.TIME_OUT, ctx, timeoutType, 0);
1✔
1452
            if (isTerminalState(activity.getState())) {
1✔
1453
              activities.remove(activityId);
1✔
1454
              scheduleDecision(ctx);
1✔
1455
            } else {
1456
              addActivityRetryTimer(ctx, activity);
1✔
1457
            }
1458
          });
1✔
1459
    } catch (EntityNotExistsError | WorkflowExecutionAlreadyCompletedError e) {
1✔
1460
      // Expected as timers are not removed
1461
      unlockTimer = false;
1✔
1462
    } catch (Exception e) {
×
1463
      // Cannot fail to timer threads
1464
      log.error("Failure trying to timeout an activity", e);
×
1465
    } finally {
1466
      if (unlockTimer) {
1✔
1467
        selfAdvancingTimer.unlockTimeSkipping("timeoutActivity " + activityId);
1✔
1468
      }
1469
    }
1470
  }
1✔
1471

1472
  private void timeoutWorkflow() {
1473
    lock.lock();
1✔
1474
    try {
1475
      {
1476
        if (isTerminalState(workflow.getState())) {
1✔
1477
          return;
1✔
1478
        }
1479
      }
1480
    } finally {
1481
      lock.unlock();
1✔
1482
    }
1483
    try {
1484
      update(
1✔
1485
          ctx -> {
1486
            if (isTerminalState(workflow.getState())) {
1✔
1487
              return;
×
1488
            }
1489
            workflow.action(StateMachines.Action.TIME_OUT, ctx, TimeoutType.START_TO_CLOSE, 0);
1✔
1490
            if (parent != null) {
1✔
1491
              ctx.lockTimer(); // unlocked by the parent
1✔
1492
            }
1493
            ForkJoinPool.commonPool().execute(() -> reportWorkflowTimeoutToParent(ctx));
1✔
1494
          });
1✔
1495
    } catch (BadRequestError
×
1496
        | InternalServiceError
1497
        | EntityNotExistsError
1498
        | WorkflowExecutionAlreadyCompletedError e) {
1499
      // Cannot fail to timer threads
1500
      log.error("Failure trying to timeout a workflow", e);
×
1501
    }
1✔
1502
  }
1✔
1503

1504
  private void reportWorkflowTimeoutToParent(RequestContext ctx) {
1505
    if (!parent.isPresent()) {
1✔
1506
      return;
1✔
1507
    }
1508
    try {
1509
      ChildWorkflowExecutionTimedOutEventAttributes a =
1✔
1510
          new ChildWorkflowExecutionTimedOutEventAttributes()
1511
              .setInitiatedEventId(parentChildInitiatedEventId.getAsLong())
1✔
1512
              .setTimeoutType(TimeoutType.START_TO_CLOSE)
1✔
1513
              .setWorkflowType(startRequest.getWorkflowType())
1✔
1514
              .setDomain(ctx.getDomain())
1✔
1515
              .setWorkflowExecution(ctx.getExecution());
1✔
1516
      parent.get().childWorkflowTimedOut(ctx.getExecutionId().getWorkflowId().getWorkflowId(), a);
1✔
1517
    } catch (EntityNotExistsError | WorkflowExecutionAlreadyCompletedError e) {
×
1518
      // Parent might already close
1519
    } catch (BadRequestError | InternalServiceError e) {
×
1520
      log.error("Failure reporting child timing out", e);
×
1521
    }
1✔
1522
  }
1✔
1523

1524
  @Override
1525
  public void signal(SignalWorkflowExecutionRequest signalRequest)
1526
      throws EntityNotExistsError, InternalServiceError, WorkflowExecutionAlreadyCompletedError,
1527
          BadRequestError {
1528
    update(
1✔
1529
        ctx -> {
1530
          addExecutionSignaledEvent(ctx, signalRequest);
1✔
1531
          scheduleDecision(ctx);
1✔
1532
        });
1✔
1533
  }
1✔
1534

1535
  @Override
1536
  public void signalFromWorkflow(SignalExternalWorkflowExecutionDecisionAttributes a)
1537
      throws EntityNotExistsError, InternalServiceError, WorkflowExecutionAlreadyCompletedError,
1538
          BadRequestError {
1539
    update(
1✔
1540
        ctx -> {
1541
          addExecutionSignaledByExternalEvent(ctx, a);
1✔
1542
          scheduleDecision(ctx);
1✔
1543
        });
1✔
1544
  }
1✔
1545

1546
  @Override
1547
  public void requestCancelWorkflowExecution(RequestCancelWorkflowExecutionRequest cancelRequest)
1548
      throws EntityNotExistsError, InternalServiceError, WorkflowExecutionAlreadyCompletedError,
1549
          BadRequestError {
1550
    update(
1✔
1551
        ctx -> {
1552
          workflow.action(StateMachines.Action.REQUEST_CANCELLATION, ctx, cancelRequest, 0);
1✔
1553
          scheduleDecision(ctx);
1✔
1554
        });
1✔
1555
  }
1✔
1556

1557
  @Override
1558
  public QueryWorkflowResponse query(QueryWorkflowRequest queryRequest) throws TException {
1559
    QueryId queryId = new QueryId(executionId);
1✔
1560

1561
    Optional<WorkflowExecutionCloseStatus> optCloseStatus = getCloseStatus();
1✔
1562
    if (optCloseStatus.isPresent() && queryRequest.getQueryRejectCondition() != null) {
1✔
1563
      WorkflowExecutionCloseStatus closeStatus = optCloseStatus.get();
1✔
1564
      boolean rejectNotOpen =
1✔
1565
          queryRequest.getQueryRejectCondition() == QueryRejectCondition.NOT_OPEN;
1✔
1566
      boolean rejectNotCompletedCleanly =
1✔
1567
          queryRequest.getQueryRejectCondition() == QueryRejectCondition.NOT_COMPLETED_CLEANLY
1✔
1568
              && closeStatus != WorkflowExecutionCloseStatus.COMPLETED;
1569
      if (rejectNotOpen || rejectNotCompletedCleanly) {
1✔
1570
        return new QueryWorkflowResponse()
1✔
1571
            .setQueryRejected(new QueryRejected().setCloseStatus(closeStatus));
1✔
1572
      }
1573
    }
1574

1575
    CompletableFuture<QueryWorkflowResponse> result = new CompletableFuture<>();
1✔
1576
    queries.put(queryId.getQueryId(), result);
1✔
1577

1578
    if (queryRequest.getQueryConsistencyLevel() == QueryConsistencyLevel.STRONG) {
1✔
1579
      pendingQueries.put(queryId.getQueryId(), queryRequest.getQuery());
×
1580
    } else {
1581
      PollForDecisionTaskResponse task =
1✔
1582
          new PollForDecisionTaskResponse()
1583
              .setTaskToken(queryId.toBytes())
1✔
1584
              .setWorkflowExecution(executionId.getExecution())
1✔
1585
              .setWorkflowType(startRequest.getWorkflowType())
1✔
1586
              .setQuery(queryRequest.getQuery())
1✔
1587
              .setWorkflowExecutionTaskList(startRequest.getTaskList());
1✔
1588
      TaskListId taskListId =
1✔
1589
          new TaskListId(
1590
              queryRequest.getDomain(),
1✔
1591
              stickyExecutionAttributes == null
1✔
1592
                  ? startRequest.getTaskList().getName()
1✔
1593
                  : stickyExecutionAttributes.getWorkerTaskList().getName());
1✔
1594
      queryRequests.put(queryId.getQueryId(), task);
1✔
1595
      store.sendQueryTask(executionId, taskListId, task);
1✔
1596
    }
1597

1598
    try {
1599
      return result.get();
1✔
1600
    } catch (InterruptedException e) {
×
1601
      return new QueryWorkflowResponse();
×
1602
    } catch (ExecutionException e) {
1✔
1603
      Throwable cause = e.getCause();
1✔
1604
      if (cause instanceof TException) {
1✔
1605
        throw (TException) cause;
1✔
1606
      }
1607
      throw new InternalServiceError(Throwables.getStackTraceAsString(cause));
×
1608
    }
1609
  }
1610

1611
  @Override
1612
  public void completeQuery(QueryId queryId, RespondQueryTaskCompletedRequest completeRequest)
1613
      throws EntityNotExistsError {
1614
    CompletableFuture<QueryWorkflowResponse> result = queries.get(queryId.getQueryId());
1✔
1615
    if (result == null) {
1✔
1616
      throw new EntityNotExistsError("Unknown query id: " + queryId.getQueryId());
×
1617
    }
1618
    if (completeRequest.getCompletedType() == QueryTaskCompletedType.COMPLETED) {
1✔
1619
      QueryWorkflowResponse response =
1✔
1620
          new QueryWorkflowResponse().setQueryResult(completeRequest.getQueryResult());
1✔
1621
      result.complete(response);
1✔
1622
    } else if (stickyExecutionAttributes != null) {
1✔
1623
      stickyExecutionAttributes = null;
×
1624
      PollForDecisionTaskResponse task = queryRequests.remove(queryId.getQueryId());
×
1625

1626
      TaskListId taskListId =
×
1627
          new TaskListId(startRequest.getDomain(), startRequest.getTaskList().getName());
×
1628
      store.sendQueryTask(executionId, taskListId, task);
×
1629
    } else {
×
1630
      QueryFailedError error = new QueryFailedError().setMessage(completeRequest.getErrorMessage());
1✔
1631
      result.completeExceptionally(error);
1✔
1632
    }
1633
  }
1✔
1634

1635
  private void addExecutionSignaledEvent(
1636
      RequestContext ctx, SignalWorkflowExecutionRequest signalRequest) {
1637
    WorkflowExecutionSignaledEventAttributes a =
1✔
1638
        new WorkflowExecutionSignaledEventAttributes()
1639
            .setInput(startRequest.getInput())
1✔
1640
            .setIdentity(signalRequest.getIdentity())
1✔
1641
            .setInput(signalRequest.getInput())
1✔
1642
            .setSignalName(signalRequest.getSignalName());
1✔
1643
    HistoryEvent executionSignaled =
1✔
1644
        new HistoryEvent()
1645
            .setEventType(EventType.WorkflowExecutionSignaled)
1✔
1646
            .setWorkflowExecutionSignaledEventAttributes(a);
1✔
1647
    ctx.addEvent(executionSignaled);
1✔
1648
  }
1✔
1649

1650
  private void addExecutionSignaledByExternalEvent(
1651
      RequestContext ctx, SignalExternalWorkflowExecutionDecisionAttributes d) {
1652
    WorkflowExecutionSignaledEventAttributes a =
1✔
1653
        new WorkflowExecutionSignaledEventAttributes()
1654
            .setInput(startRequest.getInput())
1✔
1655
            .setInput(d.getInput())
1✔
1656
            .setSignalName(d.getSignalName());
1✔
1657
    HistoryEvent executionSignaled =
1✔
1658
        new HistoryEvent()
1659
            .setEventType(EventType.WorkflowExecutionSignaled)
1✔
1660
            .setWorkflowExecutionSignaledEventAttributes(a);
1✔
1661
    ctx.addEvent(executionSignaled);
1✔
1662
  }
1✔
1663

1664
  private StateMachine<ActivityTaskData> getActivity(String activityId)
1665
      throws EntityNotExistsError {
1666
    StateMachine<ActivityTaskData> activity = activities.get(activityId);
1✔
1667
    if (activity == null) {
1✔
1668
      throw new EntityNotExistsError("unknown activityId: " + activityId);
1✔
1669
    }
1670
    return activity;
1✔
1671
  }
1672

1673
  private StateMachine<ChildWorkflowData> getChildWorkflow(long initiatedEventId)
1674
      throws InternalServiceError {
1675
    StateMachine<ChildWorkflowData> child = childWorkflows.get(initiatedEventId);
1✔
1676
    if (child == null) {
1✔
1677
      throw new InternalServiceError("unknown initiatedEventId: " + initiatedEventId);
×
1678
    }
1679
    return child;
1✔
1680
  }
1681

1682
  static class QueryId {
1683

1684
    private final ExecutionId executionId;
1685
    private final String queryId;
1686

1687
    QueryId(ExecutionId executionId) {
1✔
1688
      this.executionId = Objects.requireNonNull(executionId);
1✔
1689
      this.queryId = UUID.randomUUID().toString();
1✔
1690
    }
1✔
1691

1692
    private QueryId(ExecutionId executionId, String queryId) {
1✔
1693
      this.executionId = Objects.requireNonNull(executionId);
1✔
1694
      this.queryId = queryId;
1✔
1695
    }
1✔
1696

1697
    public ExecutionId getExecutionId() {
1698
      return executionId;
1✔
1699
    }
1700

1701
    String getQueryId() {
1702
      return queryId;
1✔
1703
    }
1704

1705
    byte[] toBytes() throws InternalServiceError {
1706
      ByteArrayOutputStream bout = new ByteArrayOutputStream();
1✔
1707
      DataOutputStream out = new DataOutputStream(bout);
1✔
1708
      addBytes(out);
1✔
1709
      return bout.toByteArray();
1✔
1710
    }
1711

1712
    void addBytes(DataOutputStream out) throws InternalServiceError {
1713
      try {
1714
        executionId.addBytes(out);
1✔
1715
        out.writeUTF(queryId);
1✔
1716
      } catch (IOException e) {
×
1717
        throw new InternalServiceError(Throwables.getStackTraceAsString(e));
×
1718
      }
1✔
1719
    }
1✔
1720

1721
    static QueryId fromBytes(byte[] serialized) throws InternalServiceError {
1722
      ByteArrayInputStream bin = new ByteArrayInputStream(serialized);
1✔
1723
      DataInputStream in = new DataInputStream(bin);
1✔
1724
      try {
1725
        ExecutionId executionId = ExecutionId.readFromBytes(in);
1✔
1726
        String queryId = in.readUTF();
1✔
1727
        return new QueryId(executionId, queryId);
1✔
1728
      } catch (IOException e) {
×
1729
        throw new InternalServiceError(Throwables.getStackTraceAsString(e));
×
1730
      }
1731
    }
1732
  }
1733
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc