• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 2458

13 Aug 2024 05:39PM CUT coverage: 62.021% (+0.01%) from 62.011%
2458

push

buildkite

web-flow
Removing fossa as it is migrated to snyk (#919)

12096 of 19503 relevant lines covered (62.02%)

0.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.5
/src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableStateImpl.java
1
/*
2
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
 *
4
 *  Modifications copyright (C) 2017 Uber Technologies, Inc.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.internal.testservice;
19

20
import com.cronutils.model.Cron;
21
import com.cronutils.model.CronType;
22
import com.cronutils.model.definition.CronDefinition;
23
import com.cronutils.model.definition.CronDefinitionBuilder;
24
import com.cronutils.model.time.ExecutionTime;
25
import com.cronutils.parser.CronParser;
26
import com.google.common.base.Strings;
27
import com.google.common.base.Throwables;
28
import com.uber.cadence.ActivityTaskScheduledEventAttributes;
29
import com.uber.cadence.BadRequestError;
30
import com.uber.cadence.CancelTimerDecisionAttributes;
31
import com.uber.cadence.CancelTimerFailedEventAttributes;
32
import com.uber.cadence.CancelWorkflowExecutionDecisionAttributes;
33
import com.uber.cadence.ChildWorkflowExecutionCanceledEventAttributes;
34
import com.uber.cadence.ChildWorkflowExecutionCompletedEventAttributes;
35
import com.uber.cadence.ChildWorkflowExecutionFailedEventAttributes;
36
import com.uber.cadence.ChildWorkflowExecutionStartedEventAttributes;
37
import com.uber.cadence.ChildWorkflowExecutionTimedOutEventAttributes;
38
import com.uber.cadence.CompleteWorkflowExecutionDecisionAttributes;
39
import com.uber.cadence.ContinueAsNewWorkflowExecutionDecisionAttributes;
40
import com.uber.cadence.Decision;
41
import com.uber.cadence.DecisionTaskFailedCause;
42
import com.uber.cadence.EntityNotExistsError;
43
import com.uber.cadence.EventType;
44
import com.uber.cadence.FailWorkflowExecutionDecisionAttributes;
45
import com.uber.cadence.HistoryEvent;
46
import com.uber.cadence.InternalServiceError;
47
import com.uber.cadence.MarkerRecordedEventAttributes;
48
import com.uber.cadence.PollForActivityTaskRequest;
49
import com.uber.cadence.PollForActivityTaskResponse;
50
import com.uber.cadence.PollForDecisionTaskRequest;
51
import com.uber.cadence.PollForDecisionTaskResponse;
52
import com.uber.cadence.QueryConsistencyLevel;
53
import com.uber.cadence.QueryFailedError;
54
import com.uber.cadence.QueryRejectCondition;
55
import com.uber.cadence.QueryRejected;
56
import com.uber.cadence.QueryResultType;
57
import com.uber.cadence.QueryTaskCompletedType;
58
import com.uber.cadence.QueryWorkflowRequest;
59
import com.uber.cadence.QueryWorkflowResponse;
60
import com.uber.cadence.RecordActivityTaskHeartbeatResponse;
61
import com.uber.cadence.RecordMarkerDecisionAttributes;
62
import com.uber.cadence.RequestCancelActivityTaskDecisionAttributes;
63
import com.uber.cadence.RequestCancelActivityTaskFailedEventAttributes;
64
import com.uber.cadence.RequestCancelExternalWorkflowExecutionDecisionAttributes;
65
import com.uber.cadence.RequestCancelWorkflowExecutionRequest;
66
import com.uber.cadence.RespondActivityTaskCanceledByIDRequest;
67
import com.uber.cadence.RespondActivityTaskCanceledRequest;
68
import com.uber.cadence.RespondActivityTaskCompletedByIDRequest;
69
import com.uber.cadence.RespondActivityTaskCompletedRequest;
70
import com.uber.cadence.RespondActivityTaskFailedByIDRequest;
71
import com.uber.cadence.RespondActivityTaskFailedRequest;
72
import com.uber.cadence.RespondDecisionTaskCompletedRequest;
73
import com.uber.cadence.RespondDecisionTaskFailedRequest;
74
import com.uber.cadence.RespondQueryTaskCompletedRequest;
75
import com.uber.cadence.RetryPolicy;
76
import com.uber.cadence.ScheduleActivityTaskDecisionAttributes;
77
import com.uber.cadence.SignalExternalWorkflowExecutionDecisionAttributes;
78
import com.uber.cadence.SignalExternalWorkflowExecutionFailedCause;
79
import com.uber.cadence.SignalWorkflowExecutionRequest;
80
import com.uber.cadence.StartChildWorkflowExecutionDecisionAttributes;
81
import com.uber.cadence.StartChildWorkflowExecutionFailedEventAttributes;
82
import com.uber.cadence.StartTimerDecisionAttributes;
83
import com.uber.cadence.StartWorkflowExecutionRequest;
84
import com.uber.cadence.StickyExecutionAttributes;
85
import com.uber.cadence.TimeoutType;
86
import com.uber.cadence.UpsertWorkflowSearchAttributesDecisionAttributes;
87
import com.uber.cadence.UpsertWorkflowSearchAttributesEventAttributes;
88
import com.uber.cadence.WorkflowExecution;
89
import com.uber.cadence.WorkflowExecutionAlreadyCompletedError;
90
import com.uber.cadence.WorkflowExecutionCloseStatus;
91
import com.uber.cadence.WorkflowExecutionContinuedAsNewEventAttributes;
92
import com.uber.cadence.WorkflowExecutionSignaledEventAttributes;
93
import com.uber.cadence.WorkflowQuery;
94
import com.uber.cadence.WorkflowQueryResult;
95
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
96
import com.uber.cadence.internal.testservice.StateMachines.Action;
97
import com.uber.cadence.internal.testservice.StateMachines.ActivityTaskData;
98
import com.uber.cadence.internal.testservice.StateMachines.ChildWorkflowData;
99
import com.uber.cadence.internal.testservice.StateMachines.DecisionTaskData;
100
import com.uber.cadence.internal.testservice.StateMachines.SignalExternalData;
101
import com.uber.cadence.internal.testservice.StateMachines.State;
102
import com.uber.cadence.internal.testservice.StateMachines.TimerData;
103
import com.uber.cadence.internal.testservice.StateMachines.WorkflowData;
104
import com.uber.cadence.internal.testservice.TestWorkflowStore.TaskListId;
105
import java.io.ByteArrayInputStream;
106
import java.io.ByteArrayOutputStream;
107
import java.io.DataInputStream;
108
import java.io.DataOutputStream;
109
import java.io.IOException;
110
import java.time.Duration;
111
import java.time.Instant;
112
import java.time.ZoneOffset;
113
import java.time.ZonedDateTime;
114
import java.util.ArrayList;
115
import java.util.HashMap;
116
import java.util.List;
117
import java.util.Map;
118
import java.util.Objects;
119
import java.util.Optional;
120
import java.util.OptionalLong;
121
import java.util.UUID;
122
import java.util.concurrent.CompletableFuture;
123
import java.util.concurrent.ConcurrentHashMap;
124
import java.util.concurrent.ExecutionException;
125
import java.util.concurrent.ForkJoinPool;
126
import java.util.concurrent.TimeUnit;
127
import java.util.concurrent.locks.Lock;
128
import java.util.concurrent.locks.ReentrantLock;
129
import java.util.function.LongSupplier;
130
import org.apache.thrift.TException;
131
import org.slf4j.Logger;
132
import org.slf4j.LoggerFactory;
133

134
class TestWorkflowMutableStateImpl implements TestWorkflowMutableState {
135

136
  @FunctionalInterface
137
  private interface UpdateProcedure {
138

139
    void apply(RequestContext ctx)
140
        throws InternalServiceError, BadRequestError, EntityNotExistsError;
141
  }
142

143
  private static final Logger log = LoggerFactory.getLogger(TestWorkflowMutableStateImpl.class);
1✔
144

145
  private final Lock lock = new ReentrantLock();
1✔
146
  private final SelfAdvancingTimer selfAdvancingTimer;
147
  private final LongSupplier clock;
148
  private final ExecutionId executionId;
149
  private final Optional<TestWorkflowMutableState> parent;
150
  private final OptionalLong parentChildInitiatedEventId;
151
  private final TestWorkflowStore store;
152
  private final TestWorkflowService service;
153
  private final StartWorkflowExecutionRequest startRequest;
154
  private long nextEventId;
155
  private final List<RequestContext> concurrentToDecision = new ArrayList<>();
1✔
156
  private final Map<String, StateMachine<ActivityTaskData>> activities = new HashMap<>();
1✔
157
  private final Map<Long, StateMachine<ChildWorkflowData>> childWorkflows = new HashMap<>();
1✔
158
  private final Map<String, StateMachine<TimerData>> timers = new HashMap<>();
1✔
159
  private final Map<String, StateMachine<SignalExternalData>> externalSignals = new HashMap<>();
1✔
160
  private StateMachine<WorkflowData> workflow;
161
  private volatile StateMachine<DecisionTaskData> decision;
162
  private long lastNonFailedDecisionStartEventId;
163
  private final Map<String, CompletableFuture<QueryWorkflowResponse>> queries =
1✔
164
      new ConcurrentHashMap<>();
165
  private final Map<String, PollForDecisionTaskResponse> queryRequests = new ConcurrentHashMap<>();
1✔
166
  private final Map<String, WorkflowQuery> pendingQueries = new ConcurrentHashMap<>();
1✔
167
  private final Optional<String> continuedExecutionRunId;
168
  public StickyExecutionAttributes stickyExecutionAttributes;
169

170
  /**
171
   * @param retryState present if workflow is a retry
172
   * @param backoffStartIntervalInSeconds
173
   * @param parentChildInitiatedEventId id of the child initiated event in the parent history
174
   */
175
  TestWorkflowMutableStateImpl(
176
      StartWorkflowExecutionRequest startRequest,
177
      Optional<RetryState> retryState,
178
      int backoffStartIntervalInSeconds,
179
      byte[] lastCompletionResult,
180
      Optional<TestWorkflowMutableState> parent,
181
      OptionalLong parentChildInitiatedEventId,
182
      Optional<String> continuedExecutionRunId,
183
      TestWorkflowService service,
184
      TestWorkflowStore store) {
1✔
185
    this.startRequest = startRequest;
1✔
186
    this.parent = parent;
1✔
187
    this.parentChildInitiatedEventId = parentChildInitiatedEventId;
1✔
188
    this.continuedExecutionRunId = continuedExecutionRunId;
1✔
189
    this.service = service;
1✔
190
    String runId = UUID.randomUUID().toString();
1✔
191
    this.executionId =
1✔
192
        new ExecutionId(startRequest.getDomain(), startRequest.getWorkflowId(), runId);
1✔
193
    this.store = store;
1✔
194
    selfAdvancingTimer = store.getTimer();
1✔
195
    this.clock = selfAdvancingTimer.getClock();
1✔
196
    WorkflowData data =
1✔
197
        new WorkflowData(
198
            retryState,
199
            backoffStartIntervalInSeconds,
200
            startRequest.getCronSchedule(),
1✔
201
            lastCompletionResult,
202
            runId, // Test service doesn't support reset. Thus originalRunId is always the same as
203
            // runId.
204
            continuedExecutionRunId);
205
    this.workflow = StateMachines.newWorkflowStateMachine(data);
1✔
206
  }
1✔
207

208
  private void update(UpdateProcedure updater)
209
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
210
          BadRequestError {
211
    StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace();
1✔
212
    update(false, updater, stackTraceElements[2].getMethodName());
1✔
213
  }
1✔
214

215
  private void completeDecisionUpdate(UpdateProcedure updater, StickyExecutionAttributes attributes)
216
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
217
          BadRequestError {
218
    StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace();
1✔
219
    stickyExecutionAttributes = attributes;
1✔
220
    update(true, updater, stackTraceElements[2].getMethodName());
1✔
221
  }
1✔
222

223
  private void update(boolean completeDecisionUpdate, UpdateProcedure updater, String caller)
224
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
225
          BadRequestError {
226
    String callerInfo = "Decision Update from " + caller;
1✔
227
    lock.lock();
1✔
228
    LockHandle lockHandle = selfAdvancingTimer.lockTimeSkipping(callerInfo);
1✔
229

230
    try {
231
      checkCompleted();
1✔
232
      boolean concurrentDecision =
1✔
233
          !completeDecisionUpdate
234
              && (decision != null && decision.getState() == StateMachines.State.STARTED);
1✔
235

236
      RequestContext ctx = new RequestContext(clock, this, nextEventId);
1✔
237
      updater.apply(ctx);
1✔
238
      setPendingQueries(ctx);
1✔
239
      if (concurrentDecision && workflow.getState() != State.TIMED_OUT) {
1✔
240
        concurrentToDecision.add(ctx);
1✔
241
        ctx.fireCallbacks(0);
1✔
242
        store.applyTimersAndLocks(ctx);
1✔
243
      } else {
244
        nextEventId = ctx.commitChanges(store);
1✔
245
      }
246
    } catch (InternalServiceError
1✔
247
        | EntityNotExistsError
248
        | WorkflowExecutionAlreadyCompletedError
249
        | BadRequestError e) {
250
      throw e;
1✔
251
    } catch (Exception e) {
×
252
      throw new InternalServiceError(Throwables.getStackTraceAsString(e));
×
253
    } finally {
254
      lockHandle.unlock();
1✔
255
      lock.unlock();
1✔
256
    }
257
  }
1✔
258

259
  private void setPendingQueries(RequestContext ctx) {
260
    TestWorkflowStore.DecisionTask decisionTask = ctx.getDecisionTask();
1✔
261
    if (decisionTask != null) {
1✔
262
      decisionTask.getTask().setQueries(new HashMap<>(pendingQueries));
1✔
263
    }
264
  }
1✔
265

266
  @Override
267
  public ExecutionId getExecutionId() {
268
    return executionId;
1✔
269
  }
270

271
  @Override
272
  public Optional<WorkflowExecutionCloseStatus> getCloseStatus() {
273
    switch (workflow.getState()) {
1✔
274
      case NONE:
275
      case INITIATED:
276
      case STARTED:
277
      case CANCELLATION_REQUESTED:
278
        return Optional.empty();
1✔
279
      case FAILED:
280
        return Optional.of(WorkflowExecutionCloseStatus.FAILED);
1✔
281
      case TIMED_OUT:
282
        return Optional.of(WorkflowExecutionCloseStatus.TIMED_OUT);
×
283
      case CANCELED:
284
        return Optional.of(WorkflowExecutionCloseStatus.CANCELED);
×
285
      case COMPLETED:
286
        return Optional.of(WorkflowExecutionCloseStatus.COMPLETED);
1✔
287
      case CONTINUED_AS_NEW:
288
        return Optional.of(WorkflowExecutionCloseStatus.CONTINUED_AS_NEW);
×
289
    }
290
    throw new IllegalStateException("unreachable");
×
291
  }
292

293
  @Override
294
  public StartWorkflowExecutionRequest getStartRequest() {
295
    return startRequest;
1✔
296
  }
297

298
  @Override
299
  public StickyExecutionAttributes getStickyExecutionAttributes() {
300
    return stickyExecutionAttributes;
1✔
301
  }
302

303
  @Override
304
  public Optional<TestWorkflowMutableState> getParent() {
305
    return parent;
1✔
306
  }
307

308
  @Override
309
  public void startDecisionTask(
310
      PollForDecisionTaskResponse task, PollForDecisionTaskRequest pollRequest)
311
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
312
          BadRequestError {
313
    if (task.getQuery() == null) {
1✔
314
      update(
1✔
315
          ctx -> {
316
            long scheduledEventId = decision.getData().scheduledEventId;
1✔
317
            decision.action(StateMachines.Action.START, ctx, pollRequest, 0);
1✔
318
            ctx.addTimer(
1✔
319
                startRequest.getTaskStartToCloseTimeoutSeconds(),
1✔
320
                () -> timeoutDecisionTask(scheduledEventId),
1✔
321
                "DecisionTask StartToCloseTimeout");
322
          });
1✔
323
    }
324
  }
1✔
325

326
  @Override
327
  public void completeDecisionTask(int historySize, RespondDecisionTaskCompletedRequest request)
328
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
329
          BadRequestError {
330
    List<Decision> decisions = request.getDecisions();
1✔
331
    completeDecisionUpdate(
1✔
332
        ctx -> {
333
          if (request.getQueryResultsSize() > 0) {
1✔
334
            request
×
335
                .getQueryResults()
×
336
                .forEach(
×
337
                    (queryId, queryResult) -> {
338
                      completeQuery(queryId, queryResult);
×
339
                      pendingQueries.remove(queryId);
×
340
                    });
×
341
          }
342

343
          if (ctx.getInitialEventId() != historySize + 1) {
1✔
344
            throw new BadRequestError(
1✔
345
                "Expired decision: expectedHistorySize="
346
                    + historySize
347
                    + ","
348
                    + " actualHistorySize="
349
                    + ctx.getInitialEventId());
1✔
350
          }
351
          long decisionTaskCompletedId = ctx.getNextEventId() - 1;
1✔
352
          // Fail the decision if there are new events and the decision tries to complete the
353
          // workflow
354
          if (!concurrentToDecision.isEmpty() && hasCompleteDecision(request.getDecisions())) {
1✔
355
            RespondDecisionTaskFailedRequest failedRequest =
1✔
356
                new RespondDecisionTaskFailedRequest()
357
                    .setCause(DecisionTaskFailedCause.UNHANDLED_DECISION)
1✔
358
                    .setIdentity(request.getIdentity());
1✔
359
            decision.action(Action.FAIL, ctx, failedRequest, decisionTaskCompletedId);
1✔
360
            for (RequestContext deferredCtx : this.concurrentToDecision) {
1✔
361
              ctx.add(deferredCtx);
1✔
362
            }
1✔
363
            this.concurrentToDecision.clear();
1✔
364

365
            // Reset sticky execution attributes on failure
366
            stickyExecutionAttributes = null;
1✔
367
            scheduleDecision(ctx);
1✔
368
            return;
1✔
369
          }
370
          if (decision == null) {
1✔
371
            throw new EntityNotExistsError("No outstanding decision");
×
372
          }
373
          decision.action(StateMachines.Action.COMPLETE, ctx, request, 0);
1✔
374
          for (Decision d : decisions) {
1✔
375
            processDecision(ctx, d, request.getIdentity(), decisionTaskCompletedId);
1✔
376
          }
1✔
377
          for (RequestContext deferredCtx : this.concurrentToDecision) {
1✔
378
            ctx.add(deferredCtx);
1✔
379
          }
1✔
380
          lastNonFailedDecisionStartEventId = this.decision.getData().startedEventId;
1✔
381
          this.decision = null;
1✔
382
          boolean completed =
1✔
383
              workflow.getState() == StateMachines.State.COMPLETED
1✔
384
                  || workflow.getState() == StateMachines.State.FAILED
1✔
385
                  || workflow.getState() == StateMachines.State.CANCELED;
1✔
386
          if (!completed
1✔
387
              && ((ctx.isNeedDecision() || !this.concurrentToDecision.isEmpty())
1✔
388
                  || request.isForceCreateNewDecisionTask())) {
1✔
389
            scheduleDecision(ctx);
1✔
390
          }
391
          this.concurrentToDecision.clear();
1✔
392
          ctx.unlockTimer();
1✔
393
        },
1✔
394
        request.getStickyAttributes());
1✔
395
  }
1✔
396

397
  private void completeQuery(String queryId, WorkflowQueryResult queryResult) {
398
    CompletableFuture<QueryWorkflowResponse> future = queries.get(queryId);
×
399
    if (future == null) {
×
400
      throw new RuntimeException("Unknown query id: " + queryId);
×
401
    }
402
    if (queryResult.getResultType() == QueryResultType.ANSWERED) {
×
403
      future.complete(new QueryWorkflowResponse().setQueryResult(queryResult.getAnswer()));
×
404
    } else {
405
      future.completeExceptionally(
×
406
          new QueryFailedError().setMessage(queryResult.getErrorMessage()));
×
407
    }
408
  }
×
409

410
  private boolean hasCompleteDecision(List<Decision> decisions) {
411
    for (Decision d : decisions) {
1✔
412
      if (WorkflowExecutionUtils.isWorkflowExecutionCompleteDecision(d)) {
1✔
413
        return true;
1✔
414
      }
415
    }
1✔
416
    return false;
1✔
417
  }
418

419
  private void processDecision(
420
      RequestContext ctx, Decision d, String identity, long decisionTaskCompletedId)
421
      throws BadRequestError, InternalServiceError {
422
    switch (d.getDecisionType()) {
1✔
423
      case CompleteWorkflowExecution:
424
        processCompleteWorkflowExecution(
1✔
425
            ctx,
426
            d.getCompleteWorkflowExecutionDecisionAttributes(),
1✔
427
            decisionTaskCompletedId,
428
            identity);
429
        break;
1✔
430
      case FailWorkflowExecution:
431
        processFailWorkflowExecution(
1✔
432
            ctx, d.getFailWorkflowExecutionDecisionAttributes(), decisionTaskCompletedId, identity);
1✔
433
        break;
1✔
434
      case CancelWorkflowExecution:
435
        processCancelWorkflowExecution(
1✔
436
            ctx, d.getCancelWorkflowExecutionDecisionAttributes(), decisionTaskCompletedId);
1✔
437
        break;
1✔
438
      case ContinueAsNewWorkflowExecution:
439
        processContinueAsNewWorkflowExecution(
1✔
440
            ctx,
441
            d.getContinueAsNewWorkflowExecutionDecisionAttributes(),
1✔
442
            decisionTaskCompletedId,
443
            identity);
444
        break;
1✔
445
      case ScheduleActivityTask:
446
        processScheduleActivityTask(
1✔
447
            ctx, d.getScheduleActivityTaskDecisionAttributes(), decisionTaskCompletedId);
1✔
448
        break;
1✔
449
      case RequestCancelActivityTask:
450
        processRequestCancelActivityTask(
1✔
451
            ctx, d.getRequestCancelActivityTaskDecisionAttributes(), decisionTaskCompletedId);
1✔
452
        break;
1✔
453
      case StartTimer:
454
        processStartTimer(ctx, d.getStartTimerDecisionAttributes(), decisionTaskCompletedId);
1✔
455
        break;
1✔
456
      case CancelTimer:
457
        processCancelTimer(ctx, d.getCancelTimerDecisionAttributes(), decisionTaskCompletedId);
1✔
458
        break;
1✔
459
      case StartChildWorkflowExecution:
460
        processStartChildWorkflow(
1✔
461
            ctx, d.getStartChildWorkflowExecutionDecisionAttributes(), decisionTaskCompletedId);
1✔
462
        break;
1✔
463
      case SignalExternalWorkflowExecution:
464
        processSignalExternalWorkflowExecution(
1✔
465
            ctx, d.getSignalExternalWorkflowExecutionDecisionAttributes(), decisionTaskCompletedId);
1✔
466
        break;
1✔
467
      case RecordMarker:
468
        processRecordMarker(ctx, d.getRecordMarkerDecisionAttributes(), decisionTaskCompletedId);
1✔
469
        break;
1✔
470
      case RequestCancelExternalWorkflowExecution:
471
        processRequestCancelExternalWorkflowExecution(
1✔
472
            ctx, d.getRequestCancelExternalWorkflowExecutionDecisionAttributes());
1✔
473
        break;
1✔
474
      case UpsertWorkflowSearchAttributes:
475
        processUpsertWorkflowSearchAttributes(
1✔
476
            ctx, d.getUpsertWorkflowSearchAttributesDecisionAttributes(), decisionTaskCompletedId);
1✔
477
        break;
478
    }
479
  }
1✔
480

481
  private void processRequestCancelExternalWorkflowExecution(
482
      RequestContext ctx, RequestCancelExternalWorkflowExecutionDecisionAttributes attr) {
483
    ForkJoinPool.commonPool()
1✔
484
        .execute(
1✔
485
            () -> {
486
              RequestCancelWorkflowExecutionRequest request =
1✔
487
                  new RequestCancelWorkflowExecutionRequest();
488
              WorkflowExecution workflowExecution = new WorkflowExecution();
1✔
489
              workflowExecution.setWorkflowId(attr.workflowId);
1✔
490
              request.setWorkflowExecution(workflowExecution);
1✔
491
              request.setDomain(ctx.getDomain());
1✔
492
              try {
493
                service.RequestCancelWorkflowExecution(request);
1✔
494
              } catch (Exception e) {
×
495
                log.error("Failure to request cancel external workflow", e);
×
496
              }
1✔
497
            });
1✔
498
  }
1✔
499

500
  private void processRecordMarker(
501
      RequestContext ctx, RecordMarkerDecisionAttributes attr, long decisionTaskCompletedId)
502
      throws BadRequestError {
503
    if (!attr.isSetMarkerName()) {
1✔
504
      throw new BadRequestError("marker name is required");
×
505
    }
506

507
    MarkerRecordedEventAttributes marker =
1✔
508
        new MarkerRecordedEventAttributes()
509
            .setMarkerName(attr.getMarkerName())
1✔
510
            .setHeader(attr.getHeader())
1✔
511
            .setDetails(attr.getDetails())
1✔
512
            .setDecisionTaskCompletedEventId(decisionTaskCompletedId);
1✔
513
    HistoryEvent event =
1✔
514
        new HistoryEvent()
515
            .setEventType(EventType.MarkerRecorded)
1✔
516
            .setMarkerRecordedEventAttributes(marker);
1✔
517
    ctx.addEvent(event);
1✔
518
  }
1✔
519

520
  private void processCancelTimer(
521
      RequestContext ctx, CancelTimerDecisionAttributes d, long decisionTaskCompletedId)
522
      throws InternalServiceError, BadRequestError {
523
    String timerId = d.getTimerId();
1✔
524
    StateMachine<TimerData> timer = timers.get(timerId);
1✔
525
    if (timer == null) {
1✔
526
      CancelTimerFailedEventAttributes failedAttr =
×
527
          new CancelTimerFailedEventAttributes()
528
              .setTimerId(timerId)
×
529
              .setCause("TIMER_ID_UNKNOWN")
×
530
              .setDecisionTaskCompletedEventId(decisionTaskCompletedId);
×
531
      HistoryEvent cancellationFailed =
×
532
          new HistoryEvent()
533
              .setEventType(EventType.CancelTimerFailed)
×
534
              .setCancelTimerFailedEventAttributes(failedAttr);
×
535
      ctx.addEvent(cancellationFailed);
×
536
      return;
×
537
    }
538
    timer.action(StateMachines.Action.CANCEL, ctx, d, decisionTaskCompletedId);
1✔
539
    timers.remove(timerId);
1✔
540
  }
1✔
541

542
  private void processRequestCancelActivityTask(
543
      RequestContext ctx,
544
      RequestCancelActivityTaskDecisionAttributes a,
545
      long decisionTaskCompletedId)
546
      throws InternalServiceError, BadRequestError {
547
    String activityId = a.getActivityId();
1✔
548
    StateMachine<?> activity = activities.get(activityId);
1✔
549
    if (activity == null) {
1✔
550
      RequestCancelActivityTaskFailedEventAttributes failedAttr =
×
551
          new RequestCancelActivityTaskFailedEventAttributes()
552
              .setActivityId(activityId)
×
553
              .setCause("ACTIVITY_ID_UNKNOWN")
×
554
              .setDecisionTaskCompletedEventId(decisionTaskCompletedId);
×
555
      HistoryEvent cancellationFailed =
×
556
          new HistoryEvent()
557
              .setEventType(EventType.RequestCancelActivityTaskFailed)
×
558
              .setRequestCancelActivityTaskFailedEventAttributes(failedAttr);
×
559
      ctx.addEvent(cancellationFailed);
×
560
      return;
×
561
    }
562
    State beforeState = activity.getState();
1✔
563
    activity.action(StateMachines.Action.REQUEST_CANCELLATION, ctx, a, decisionTaskCompletedId);
1✔
564
    if (beforeState == StateMachines.State.INITIATED) {
1✔
565
      activity.action(StateMachines.Action.CANCEL, ctx, null, 0);
×
566
      activities.remove(activityId);
×
567
      ctx.setNeedDecision(true);
×
568
    }
569
  }
1✔
570

571
  private void processScheduleActivityTask(
572
      RequestContext ctx, ScheduleActivityTaskDecisionAttributes a, long decisionTaskCompletedId)
573
      throws BadRequestError, InternalServiceError {
574
    validateScheduleActivityTask(a);
1✔
575
    String activityId = a.getActivityId();
1✔
576
    StateMachine<ActivityTaskData> activity = activities.get(activityId);
1✔
577
    if (activity != null) {
1✔
578
      throw new BadRequestError("Already open activity with " + activityId);
×
579
    }
580
    activity = StateMachines.newActivityStateMachine(store, this.startRequest);
1✔
581
    activities.put(activityId, activity);
1✔
582
    activity.action(StateMachines.Action.INITIATE, ctx, a, decisionTaskCompletedId);
1✔
583
    ActivityTaskScheduledEventAttributes scheduledEvent = activity.getData().scheduledEvent;
1✔
584
    ctx.addTimer(
1✔
585
        scheduledEvent.getScheduleToCloseTimeoutSeconds(),
1✔
586
        () -> timeoutActivity(activityId, TimeoutType.SCHEDULE_TO_CLOSE),
1✔
587
        "Activity ScheduleToCloseTimeout");
588
    ctx.addTimer(
1✔
589
        scheduledEvent.getScheduleToStartTimeoutSeconds(),
1✔
590
        () -> timeoutActivity(activityId, TimeoutType.SCHEDULE_TO_START),
1✔
591
        "Activity ScheduleToStartTimeout");
592
    ctx.lockTimer();
1✔
593
  }
1✔
594

595
  private void validateScheduleActivityTask(ScheduleActivityTaskDecisionAttributes a)
596
      throws BadRequestError {
597
    if (a == null) {
1✔
598
      throw new BadRequestError("ScheduleActivityTaskDecisionAttributes is not set on decision.");
×
599
    }
600

601
    if (a.getTaskList() == null || a.getTaskList().getName().isEmpty()) {
1✔
602
      throw new BadRequestError("TaskList is not set on decision.");
×
603
    }
604
    if (a.getActivityId() == null || a.getActivityId().isEmpty()) {
1✔
605
      throw new BadRequestError("ActivityId is not set on decision.");
×
606
    }
607
    if (a.getActivityType() == null
1✔
608
        || a.getActivityType().getName() == null
1✔
609
        || a.getActivityType().getName().isEmpty()) {
1✔
610
      throw new BadRequestError("ActivityType is not set on decision.");
×
611
    }
612
    if (a.getStartToCloseTimeoutSeconds() <= 0) {
1✔
613
      throw new BadRequestError("A valid StartToCloseTimeoutSeconds is not set on decision.");
×
614
    }
615
    if (a.getScheduleToStartTimeoutSeconds() <= 0) {
1✔
616
      throw new BadRequestError("A valid ScheduleToStartTimeoutSeconds is not set on decision.");
×
617
    }
618
    if (a.getScheduleToCloseTimeoutSeconds() <= 0) {
1✔
619
      throw new BadRequestError("A valid ScheduleToCloseTimeoutSeconds is not set on decision.");
×
620
    }
621
    if (a.getHeartbeatTimeoutSeconds() < 0) {
1✔
622
      throw new BadRequestError("Ac valid HeartbeatTimeoutSeconds is not set on decision.");
×
623
    }
624
  }
1✔
625

626
  private void processStartChildWorkflow(
627
      RequestContext ctx,
628
      StartChildWorkflowExecutionDecisionAttributes a,
629
      long decisionTaskCompletedId)
630
      throws BadRequestError, InternalServiceError {
631
    validateStartChildExecutionAttributes(a);
1✔
632
    StateMachine<ChildWorkflowData> child = StateMachines.newChildWorkflowStateMachine(service);
1✔
633
    childWorkflows.put(ctx.getNextEventId(), child);
1✔
634
    child.action(StateMachines.Action.INITIATE, ctx, a, decisionTaskCompletedId);
1✔
635
    ctx.lockTimer();
1✔
636
  }
1✔
637

638
  /** Clone of the validateStartChildExecutionAttributes from historyEngine.go */
639
  private void validateStartChildExecutionAttributes(
640
      StartChildWorkflowExecutionDecisionAttributes a) throws BadRequestError {
641
    if (a == null) {
1✔
642
      throw new BadRequestError(
×
643
          "StartChildWorkflowExecutionDecisionAttributes is not set on decision.");
644
    }
645

646
    if (a.getWorkflowId().isEmpty()) {
1✔
647
      throw new BadRequestError("Required field WorkflowID is not set on decision.");
×
648
    }
649

650
    if (a.getWorkflowType() == null || a.getWorkflowType().getName().isEmpty()) {
1✔
651
      throw new BadRequestError("Required field WorkflowType is not set on decision.");
×
652
    }
653

654
    // Inherit tasklist from parent workflow execution if not provided on decision
655
    if (a.getTaskList() == null || a.getTaskList().getName().isEmpty()) {
1✔
656
      a.setTaskList(startRequest.getTaskList());
×
657
    }
658

659
    // Inherit workflow timeout from parent workflow execution if not provided on decision
660
    if (a.getExecutionStartToCloseTimeoutSeconds() <= 0) {
1✔
661
      a.setExecutionStartToCloseTimeoutSeconds(
×
662
          startRequest.getExecutionStartToCloseTimeoutSeconds());
×
663
    }
664

665
    // Inherit decision task timeout from parent workflow execution if not provided on decision
666
    if (a.getTaskStartToCloseTimeoutSeconds() <= 0) {
1✔
667
      a.setTaskStartToCloseTimeoutSeconds(startRequest.getTaskStartToCloseTimeoutSeconds());
×
668
    }
669

670
    RetryPolicy retryPolicy = a.getRetryPolicy();
1✔
671
    if (retryPolicy != null) {
1✔
672
      RetryState.validateRetryPolicy(retryPolicy);
1✔
673
    }
674
  }
1✔
675

676
  private void processSignalExternalWorkflowExecution(
677
      RequestContext ctx,
678
      SignalExternalWorkflowExecutionDecisionAttributes a,
679
      long decisionTaskCompletedId)
680
      throws InternalServiceError, BadRequestError {
681
    String signalId = UUID.randomUUID().toString();
1✔
682
    StateMachine<SignalExternalData> signalStateMachine =
683
        StateMachines.newSignalExternalStateMachine();
1✔
684
    externalSignals.put(signalId, signalStateMachine);
1✔
685
    signalStateMachine.action(StateMachines.Action.INITIATE, ctx, a, decisionTaskCompletedId);
1✔
686
    ForkJoinPool.commonPool()
1✔
687
        .execute(
1✔
688
            () -> {
689
              try {
690
                service.signalExternalWorkflowExecution(signalId, a, this);
1✔
691
              } catch (Exception e) {
×
692
                log.error("Failure signalling an external workflow execution", e);
×
693
              }
1✔
694
            });
1✔
695
    ctx.lockTimer();
1✔
696
  }
1✔
697

698
  @Override
699
  public void completeSignalExternalWorkflowExecution(String signalId, String runId)
700
      throws EntityNotExistsError, InternalServiceError, WorkflowExecutionAlreadyCompletedError,
701
          BadRequestError {
702
    update(
1✔
703
        ctx -> {
704
          StateMachine<SignalExternalData> signal = getSignal(signalId);
1✔
705
          signal.action(Action.COMPLETE, ctx, runId, 0);
1✔
706
          scheduleDecision(ctx);
1✔
707
          ctx.unlockTimer();
1✔
708
        });
1✔
709
  }
1✔
710

711
  @Override
712
  public void failSignalExternalWorkflowExecution(
713
      String signalId, SignalExternalWorkflowExecutionFailedCause cause)
714
      throws EntityNotExistsError, InternalServiceError, WorkflowExecutionAlreadyCompletedError,
715
          BadRequestError {
716
    update(
1✔
717
        ctx -> {
718
          StateMachine<SignalExternalData> signal = getSignal(signalId);
1✔
719
          signal.action(Action.FAIL, ctx, cause, 0);
1✔
720
          scheduleDecision(ctx);
1✔
721
          ctx.unlockTimer();
1✔
722
        });
1✔
723
  }
1✔
724

725
  private StateMachine<SignalExternalData> getSignal(String signalId) throws EntityNotExistsError {
726
    StateMachine<SignalExternalData> signal = externalSignals.get(signalId);
1✔
727
    if (signal == null) {
1✔
728
      throw new EntityNotExistsError("unknown signalId: " + signalId);
×
729
    }
730
    return signal;
1✔
731
  }
732

733
  // TODO: insert a single decision failure into the history
734
  @Override
735
  public void failDecisionTask(RespondDecisionTaskFailedRequest request)
736
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
737
          BadRequestError {
738
    completeDecisionUpdate(
1✔
739
        ctx -> {
740
          decision.action(Action.FAIL, ctx, request, 0);
1✔
741
          scheduleDecision(ctx);
1✔
742
        },
1✔
743
        null); // reset sticky attributes to null
744
  }
1✔
745

746
  // TODO: insert a single decision timeout into the history
747
  private void timeoutDecisionTask(long scheduledEventId) {
748
    try {
749
      completeDecisionUpdate(
1✔
750
          ctx -> {
751
            if (decision == null
1✔
752
                || decision.getData().scheduledEventId != scheduledEventId
1✔
753
                || decision.getState() == State.COMPLETED) {
1✔
754
              // timeout for a previous decision
755
              return;
1✔
756
            }
757
            decision.action(StateMachines.Action.TIME_OUT, ctx, TimeoutType.START_TO_CLOSE, 0);
1✔
758
            scheduleDecision(ctx);
1✔
759
          },
1✔
760
          null); // reset sticky attributes to null
761
    } catch (EntityNotExistsError e) {
×
762
      // Expected as timers are not removed
763
    } catch (Exception e) {
1✔
764
      // Cannot fail to timer threads
765
      log.error("Failure trying to timeout a decision scheduledEventId=" + scheduledEventId, e);
1✔
766
    }
1✔
767
  }
1✔
768

769
  @Override
770
  public void childWorkflowStarted(ChildWorkflowExecutionStartedEventAttributes a)
771
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
772
          BadRequestError {
773
    update(
1✔
774
        ctx -> {
775
          StateMachine<ChildWorkflowData> child = getChildWorkflow(a.getInitiatedEventId());
1✔
776
          child.action(StateMachines.Action.START, ctx, a, 0);
1✔
777
          scheduleDecision(ctx);
1✔
778
          // No need to lock until completion as child workflow might skip
779
          // time as well
780
          ctx.unlockTimer();
1✔
781
        });
1✔
782
  }
1✔
783

784
  @Override
785
  public void childWorkflowFailed(String activityId, ChildWorkflowExecutionFailedEventAttributes a)
786
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
787
          BadRequestError {
788
    update(
1✔
789
        ctx -> {
790
          StateMachine<ChildWorkflowData> child = getChildWorkflow(a.getInitiatedEventId());
1✔
791
          child.action(StateMachines.Action.FAIL, ctx, a, 0);
1✔
792
          childWorkflows.remove(a.getInitiatedEventId());
1✔
793
          scheduleDecision(ctx);
1✔
794
          ctx.unlockTimer();
1✔
795
        });
1✔
796
  }
1✔
797

798
  @Override
799
  public void childWorkflowTimedOut(
800
      String activityId, ChildWorkflowExecutionTimedOutEventAttributes a)
801
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
802
          BadRequestError {
803
    update(
1✔
804
        ctx -> {
805
          StateMachine<ChildWorkflowData> child = getChildWorkflow(a.getInitiatedEventId());
1✔
806
          child.action(Action.TIME_OUT, ctx, a.getTimeoutType(), 0);
1✔
807
          childWorkflows.remove(a.getInitiatedEventId());
1✔
808
          scheduleDecision(ctx);
1✔
809
          ctx.unlockTimer();
1✔
810
        });
1✔
811
  }
1✔
812

813
  @Override
814
  public void failStartChildWorkflow(
815
      String childId, StartChildWorkflowExecutionFailedEventAttributes a)
816
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
817
          BadRequestError {
818
    update(
1✔
819
        ctx -> {
820
          StateMachine<ChildWorkflowData> child = getChildWorkflow(a.getInitiatedEventId());
1✔
821
          child.action(StateMachines.Action.FAIL, ctx, a, 0);
1✔
822
          childWorkflows.remove(a.getInitiatedEventId());
1✔
823
          scheduleDecision(ctx);
1✔
824
          ctx.unlockTimer();
1✔
825
        });
1✔
826
  }
1✔
827

828
  @Override
829
  public void childWorkflowCompleted(
830
      String activityId, ChildWorkflowExecutionCompletedEventAttributes a)
831
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
832
          BadRequestError {
833
    update(
1✔
834
        ctx -> {
835
          StateMachine<ChildWorkflowData> child = getChildWorkflow(a.getInitiatedEventId());
1✔
836
          child.action(StateMachines.Action.COMPLETE, ctx, a, 0);
1✔
837
          childWorkflows.remove(a.getInitiatedEventId());
1✔
838
          scheduleDecision(ctx);
1✔
839
          ctx.unlockTimer();
1✔
840
        });
1✔
841
  }
1✔
842

843
  @Override
844
  public void childWorkflowCanceled(
845
      String activityId, ChildWorkflowExecutionCanceledEventAttributes a)
846
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
847
          BadRequestError {
848
    update(
1✔
849
        ctx -> {
850
          StateMachine<ChildWorkflowData> child = getChildWorkflow(a.getInitiatedEventId());
1✔
851
          child.action(StateMachines.Action.CANCEL, ctx, a, 0);
1✔
852
          childWorkflows.remove(a.getInitiatedEventId());
1✔
853
          scheduleDecision(ctx);
1✔
854
          ctx.unlockTimer();
1✔
855
        });
1✔
856
  }
1✔
857

858
  private void processStartTimer(
859
      RequestContext ctx, StartTimerDecisionAttributes a, long decisionTaskCompletedId)
860
      throws BadRequestError, InternalServiceError {
861
    String timerId = a.getTimerId();
1✔
862
    if (timerId == null) {
1✔
863
      throw new BadRequestError("A valid TimerId is not set on StartTimerDecision");
×
864
    }
865
    StateMachine<TimerData> timer = timers.get(timerId);
1✔
866
    if (timer != null) {
1✔
867
      throw new BadRequestError("Already open timer with " + timerId);
×
868
    }
869
    timer = StateMachines.newTimerStateMachine();
1✔
870
    timers.put(timerId, timer);
1✔
871
    timer.action(StateMachines.Action.START, ctx, a, decisionTaskCompletedId);
1✔
872
    ctx.addTimer(a.getStartToFireTimeoutSeconds(), () -> fireTimer(timerId), "fire timer");
1✔
873
  }
1✔
874

875
  private void fireTimer(String timerId) {
876
    StateMachine<TimerData> timer;
877
    lock.lock();
1✔
878
    try {
879
      {
880
        timer = timers.get(timerId);
1✔
881
        if (timer == null || workflow.getState() != State.STARTED) {
1✔
882
          return; // cancelled already
1✔
883
        }
884
      }
885
    } finally {
886
      lock.unlock();
1✔
887
    }
888
    try {
889
      update(
1✔
890
          ctx -> {
891
            timer.action(StateMachines.Action.COMPLETE, ctx, null, 0);
1✔
892
            timers.remove(timerId);
1✔
893
            scheduleDecision(ctx);
1✔
894
          });
1✔
895
    } catch (BadRequestError
×
896
        | InternalServiceError
897
        | EntityNotExistsError
898
        | WorkflowExecutionAlreadyCompletedError e) {
899
      // Cannot fail to timer threads
900
      log.error("Failure firing a timer", e);
×
901
    }
1✔
902
  }
1✔
903

904
  private void processFailWorkflowExecution(
905
      RequestContext ctx,
906
      FailWorkflowExecutionDecisionAttributes d,
907
      long decisionTaskCompletedId,
908
      String identity)
909
      throws InternalServiceError, BadRequestError {
910
    WorkflowData data = workflow.getData();
1✔
911
    if (data.retryState.isPresent()) {
1✔
912
      RetryState rs = data.retryState.get();
1✔
913
      int backoffIntervalSeconds =
1✔
914
          rs.getBackoffIntervalInSeconds(d.getReason(), store.currentTimeMillis());
1✔
915
      if (backoffIntervalSeconds > 0) {
1✔
916
        ContinueAsNewWorkflowExecutionDecisionAttributes continueAsNewAttr =
1✔
917
            new ContinueAsNewWorkflowExecutionDecisionAttributes()
918
                .setInput(startRequest.getInput())
1✔
919
                .setWorkflowType(startRequest.getWorkflowType())
1✔
920
                .setExecutionStartToCloseTimeoutSeconds(
1✔
921
                    startRequest.getExecutionStartToCloseTimeoutSeconds())
1✔
922
                .setTaskStartToCloseTimeoutSeconds(startRequest.getTaskStartToCloseTimeoutSeconds())
1✔
923
                .setTaskList(startRequest.getTaskList())
1✔
924
                .setBackoffStartIntervalInSeconds(backoffIntervalSeconds)
1✔
925
                .setRetryPolicy(startRequest.getRetryPolicy());
1✔
926
        workflow.action(Action.CONTINUE_AS_NEW, ctx, continueAsNewAttr, decisionTaskCompletedId);
1✔
927
        HistoryEvent event = ctx.getEvents().get(ctx.getEvents().size() - 1);
1✔
928
        WorkflowExecutionContinuedAsNewEventAttributes continuedAsNewEventAttributes =
1✔
929
            event.getWorkflowExecutionContinuedAsNewEventAttributes();
1✔
930

931
        Optional<RetryState> continuedRetryState = Optional.of(rs.getNextAttempt());
1✔
932
        String runId =
1✔
933
            service.continueAsNew(
1✔
934
                startRequest,
935
                continuedAsNewEventAttributes,
936
                continuedRetryState,
937
                identity,
938
                getExecutionId(),
1✔
939
                parent,
940
                parentChildInitiatedEventId);
941
        continuedAsNewEventAttributes.setNewExecutionRunId(runId);
1✔
942
        return;
1✔
943
      }
944
    }
945

946
    if (!Strings.isNullOrEmpty(data.cronSchedule)) {
1✔
947
      startNewCronRun(ctx, decisionTaskCompletedId, identity, data, data.lastCompletionResult);
1✔
948
      return;
1✔
949
    }
950

951
    workflow.action(StateMachines.Action.FAIL, ctx, d, decisionTaskCompletedId);
1✔
952
    if (parent.isPresent()) {
1✔
953
      ctx.lockTimer(); // unlocked by the parent
1✔
954
      ChildWorkflowExecutionFailedEventAttributes a =
1✔
955
          new ChildWorkflowExecutionFailedEventAttributes()
956
              .setInitiatedEventId(parentChildInitiatedEventId.getAsLong())
1✔
957
              .setDetails(d.getDetails())
1✔
958
              .setReason(d.getReason())
1✔
959
              .setWorkflowType(startRequest.getWorkflowType())
1✔
960
              .setDomain(ctx.getDomain())
1✔
961
              .setWorkflowExecution(ctx.getExecution());
1✔
962
      ForkJoinPool.commonPool()
1✔
963
          .execute(
1✔
964
              () -> {
965
                try {
966
                  parent
1✔
967
                      .get()
1✔
968
                      .childWorkflowFailed(ctx.getExecutionId().getWorkflowId().getWorkflowId(), a);
1✔
969
                } catch (EntityNotExistsError | WorkflowExecutionAlreadyCompletedError e) {
×
970
                  // Parent might already close
971
                } catch (BadRequestError | InternalServiceError e) {
×
972
                  log.error("Failure reporting child completion", e);
×
973
                }
1✔
974
              });
1✔
975
    }
976
  }
1✔
977

978
  private void processCompleteWorkflowExecution(
979
      RequestContext ctx,
980
      CompleteWorkflowExecutionDecisionAttributes d,
981
      long decisionTaskCompletedId,
982
      String identity)
983
      throws InternalServiceError, BadRequestError {
984
    WorkflowData data = workflow.getData();
1✔
985
    if (!Strings.isNullOrEmpty(data.cronSchedule)) {
1✔
986
      startNewCronRun(ctx, decisionTaskCompletedId, identity, data, d.getResult());
1✔
987
      return;
1✔
988
    }
989

990
    workflow.action(StateMachines.Action.COMPLETE, ctx, d, decisionTaskCompletedId);
1✔
991
    if (parent.isPresent()) {
1✔
992
      ctx.lockTimer(); // unlocked by the parent
1✔
993
      ChildWorkflowExecutionCompletedEventAttributes a =
1✔
994
          new ChildWorkflowExecutionCompletedEventAttributes()
995
              .setInitiatedEventId(parentChildInitiatedEventId.getAsLong())
1✔
996
              .setResult(d.getResult())
1✔
997
              .setDomain(ctx.getDomain())
1✔
998
              .setWorkflowExecution(ctx.getExecution())
1✔
999
              .setWorkflowType(startRequest.getWorkflowType());
1✔
1000
      ForkJoinPool.commonPool()
1✔
1001
          .execute(
1✔
1002
              () -> {
1003
                try {
1004
                  parent
1✔
1005
                      .get()
1✔
1006
                      .childWorkflowCompleted(
1✔
1007
                          ctx.getExecutionId().getWorkflowId().getWorkflowId(), a);
1✔
1008
                } catch (EntityNotExistsError | WorkflowExecutionAlreadyCompletedError e) {
×
1009
                  // Parent might already close
1010
                } catch (BadRequestError | InternalServiceError e) {
×
1011
                  log.error("Failure reporting child completion", e);
×
1012
                }
1✔
1013
              });
1✔
1014
    }
1015
  }
1✔
1016

1017
  private void startNewCronRun(
1018
      RequestContext ctx,
1019
      long decisionTaskCompletedId,
1020
      String identity,
1021
      WorkflowData data,
1022
      byte[] lastCompletionResult)
1023
      throws InternalServiceError, BadRequestError {
1024
    CronDefinition cronDefinition = CronDefinitionBuilder.instanceDefinitionFor(CronType.UNIX);
1✔
1025
    CronParser parser = new CronParser(cronDefinition);
1✔
1026
    Cron cron = parser.parse(data.cronSchedule);
1✔
1027

1028
    Instant i = Instant.ofEpochMilli(store.currentTimeMillis());
1✔
1029
    ZonedDateTime now = ZonedDateTime.ofInstant(i, ZoneOffset.UTC);
1✔
1030

1031
    ExecutionTime executionTime = ExecutionTime.forCron(cron);
1✔
1032
    Optional<Duration> backoff = executionTime.timeToNextExecution(now);
1✔
1033
    int backoffIntervalSeconds = (int) backoff.get().getSeconds();
1✔
1034

1035
    if (backoffIntervalSeconds == 0) {
1✔
1036
      backoff = executionTime.timeToNextExecution(now.plusSeconds(1));
1✔
1037
      backoffIntervalSeconds = (int) backoff.get().getSeconds() + 1;
1✔
1038
    }
1039

1040
    ContinueAsNewWorkflowExecutionDecisionAttributes continueAsNewAttr =
1✔
1041
        new ContinueAsNewWorkflowExecutionDecisionAttributes()
1042
            .setInput(startRequest.getInput())
1✔
1043
            .setWorkflowType(startRequest.getWorkflowType())
1✔
1044
            .setExecutionStartToCloseTimeoutSeconds(
1✔
1045
                startRequest.getExecutionStartToCloseTimeoutSeconds())
1✔
1046
            .setTaskStartToCloseTimeoutSeconds(startRequest.getTaskStartToCloseTimeoutSeconds())
1✔
1047
            .setTaskList(startRequest.getTaskList())
1✔
1048
            .setBackoffStartIntervalInSeconds(backoffIntervalSeconds)
1✔
1049
            .setRetryPolicy(startRequest.getRetryPolicy())
1✔
1050
            .setLastCompletionResult(lastCompletionResult);
1✔
1051
    workflow.action(Action.CONTINUE_AS_NEW, ctx, continueAsNewAttr, decisionTaskCompletedId);
1✔
1052
    HistoryEvent event = ctx.getEvents().get(ctx.getEvents().size() - 1);
1✔
1053
    WorkflowExecutionContinuedAsNewEventAttributes continuedAsNewEventAttributes =
1✔
1054
        event.getWorkflowExecutionContinuedAsNewEventAttributes();
1✔
1055

1056
    String runId =
1✔
1057
        service.continueAsNew(
1✔
1058
            startRequest,
1059
            continuedAsNewEventAttributes,
1060
            Optional.empty(),
1✔
1061
            identity,
1062
            getExecutionId(),
1✔
1063
            parent,
1064
            parentChildInitiatedEventId);
1065
    continuedAsNewEventAttributes.setNewExecutionRunId(runId);
1✔
1066
  }
1✔
1067

1068
  private void processCancelWorkflowExecution(
1069
      RequestContext ctx, CancelWorkflowExecutionDecisionAttributes d, long decisionTaskCompletedId)
1070
      throws InternalServiceError, BadRequestError {
1071
    workflow.action(StateMachines.Action.CANCEL, ctx, d, decisionTaskCompletedId);
1✔
1072
    if (parent.isPresent()) {
1✔
1073
      ctx.lockTimer(); // unlocked by the parent
1✔
1074
      ChildWorkflowExecutionCanceledEventAttributes a =
1✔
1075
          new ChildWorkflowExecutionCanceledEventAttributes()
1076
              .setInitiatedEventId(parentChildInitiatedEventId.getAsLong())
1✔
1077
              .setDetails(d.getDetails())
1✔
1078
              .setDomain(ctx.getDomain())
1✔
1079
              .setWorkflowExecution(ctx.getExecution())
1✔
1080
              .setWorkflowType(startRequest.getWorkflowType());
1✔
1081
      ForkJoinPool.commonPool()
1✔
1082
          .execute(
1✔
1083
              () -> {
1084
                try {
1085
                  parent
1✔
1086
                      .get()
1✔
1087
                      .childWorkflowCanceled(
1✔
1088
                          ctx.getExecutionId().getWorkflowId().getWorkflowId(), a);
1✔
1089
                } catch (EntityNotExistsError | WorkflowExecutionAlreadyCompletedError e) {
×
1090
                  // Parent might already close
1091
                } catch (BadRequestError | InternalServiceError e) {
×
1092
                  log.error("Failure reporting child completion", e);
×
1093
                }
1✔
1094
              });
1✔
1095
    }
1096
  }
1✔
1097

1098
  private void processContinueAsNewWorkflowExecution(
1099
      RequestContext ctx,
1100
      ContinueAsNewWorkflowExecutionDecisionAttributes d,
1101
      long decisionTaskCompletedId,
1102
      String identity)
1103
      throws InternalServiceError, BadRequestError {
1104
    workflow.action(Action.CONTINUE_AS_NEW, ctx, d, decisionTaskCompletedId);
1✔
1105
    HistoryEvent event = ctx.getEvents().get(ctx.getEvents().size() - 1);
1✔
1106
    String runId =
1✔
1107
        service.continueAsNew(
1✔
1108
            startRequest,
1109
            event.getWorkflowExecutionContinuedAsNewEventAttributes(),
1✔
1110
            workflow.getData().retryState,
1✔
1111
            identity,
1112
            getExecutionId(),
1✔
1113
            parent,
1114
            parentChildInitiatedEventId);
1115
    event.getWorkflowExecutionContinuedAsNewEventAttributes().setNewExecutionRunId(runId);
1✔
1116
  }
1✔
1117

1118
  private void processUpsertWorkflowSearchAttributes(
1119
      RequestContext ctx,
1120
      UpsertWorkflowSearchAttributesDecisionAttributes attr,
1121
      long decisionTaskCompletedId)
1122
      throws BadRequestError, InternalServiceError {
1123
    UpsertWorkflowSearchAttributesEventAttributes upsertEventAttr =
1✔
1124
        new UpsertWorkflowSearchAttributesEventAttributes()
1125
            .setSearchAttributes(attr.getSearchAttributes())
1✔
1126
            .setDecisionTaskCompletedEventId(decisionTaskCompletedId);
1✔
1127
    HistoryEvent event =
1✔
1128
        new HistoryEvent()
1129
            .setEventType(EventType.UpsertWorkflowSearchAttributes)
1✔
1130
            .setUpsertWorkflowSearchAttributesEventAttributes(upsertEventAttr);
1✔
1131
    ctx.addEvent(event);
1✔
1132
  }
1✔
1133

1134
  @Override
1135
  public void startWorkflow(
1136
      boolean continuedAsNew, Optional<SignalWorkflowExecutionRequest> signalWithStartSignal)
1137
      throws InternalServiceError, BadRequestError {
1138
    try {
1139
      update(
1✔
1140
          ctx -> {
1141
            workflow.action(StateMachines.Action.START, ctx, startRequest, 0);
1✔
1142
            if (signalWithStartSignal.isPresent()) {
1✔
1143
              addExecutionSignaledEvent(ctx, signalWithStartSignal.get());
1✔
1144
            }
1145
            int backoffStartIntervalInSeconds = workflow.getData().backoffStartIntervalInSeconds;
1✔
1146
            if (backoffStartIntervalInSeconds > 0) {
1✔
1147
              ctx.addTimer(
1✔
1148
                  backoffStartIntervalInSeconds,
1149
                  () -> {
1150
                    try {
1151
                      update(ctx1 -> scheduleDecision(ctx1));
1✔
1152
                    } catch (EntityNotExistsError e) {
×
1153
                      // Expected as timers are not removed
1154
                    } catch (Exception e) {
×
1155
                      // Cannot fail to timer threads
1156
                      log.error("Failure trying to add task for an delayed workflow retry", e);
×
1157
                    }
1✔
1158
                  },
1✔
1159
                  "delayedFirstDecision");
1160
            } else {
1161
              scheduleDecision(ctx);
1✔
1162
            }
1163

1164
            int executionTimeoutTimerDelay = startRequest.getExecutionStartToCloseTimeoutSeconds();
1✔
1165
            if (backoffStartIntervalInSeconds > 0) {
1✔
1166
              executionTimeoutTimerDelay =
1✔
1167
                  executionTimeoutTimerDelay + backoffStartIntervalInSeconds;
1168
            }
1169
            ctx.addTimer(
1✔
1170
                executionTimeoutTimerDelay, this::timeoutWorkflow, "workflow execution timeout");
1171
          });
1✔
1172
    } catch (EntityNotExistsError | WorkflowExecutionAlreadyCompletedError e) {
×
1173
      throw new InternalServiceError(Throwables.getStackTraceAsString(e));
×
1174
    }
1✔
1175
    if (!continuedAsNew && parent.isPresent()) {
1✔
1176
      ChildWorkflowExecutionStartedEventAttributes a =
1✔
1177
          new ChildWorkflowExecutionStartedEventAttributes()
1178
              .setInitiatedEventId(parentChildInitiatedEventId.getAsLong())
1✔
1179
              .setWorkflowExecution(getExecutionId().getExecution())
1✔
1180
              .setDomain(getExecutionId().getDomain())
1✔
1181
              .setWorkflowType(startRequest.getWorkflowType());
1✔
1182
      ForkJoinPool.commonPool()
1✔
1183
          .execute(
1✔
1184
              () -> {
1185
                try {
1186
                  parent.get().childWorkflowStarted(a);
1✔
1187
                } catch (EntityNotExistsError | WorkflowExecutionAlreadyCompletedError e) {
×
1188
                  // Not a problem. Parent might just close by now.
1189
                } catch (BadRequestError | InternalServiceError e) {
×
1190
                  log.error("Failure reporting child completion", e);
×
1191
                }
1✔
1192
              });
1✔
1193
    }
1194
  }
1✔
1195

1196
  private void scheduleDecision(RequestContext ctx) throws InternalServiceError, BadRequestError {
1197
    if (decision != null) {
1✔
1198
      if (decision.getState() == StateMachines.State.INITIATED) {
1✔
1199
        return; // No need to schedule again
1✔
1200
      }
1201
      if (decision.getState() == StateMachines.State.STARTED) {
1✔
1202
        ctx.setNeedDecision(true);
1✔
1203
        return;
1✔
1204
      }
1205
      if (decision.getState() == StateMachines.State.FAILED
1✔
1206
          || decision.getState() == StateMachines.State.COMPLETED
1✔
1207
          || decision.getState() == State.TIMED_OUT) {
1✔
1208
        decision.action(StateMachines.Action.INITIATE, ctx, startRequest, 0);
1✔
1209
        ctx.lockTimer();
1✔
1210
        return;
1✔
1211
      }
1212
      throw new InternalServiceError("unexpected decision state: " + decision.getState());
×
1213
    }
1214
    this.decision = StateMachines.newDecisionStateMachine(lastNonFailedDecisionStartEventId, store);
1✔
1215
    decision.action(StateMachines.Action.INITIATE, ctx, startRequest, 0);
1✔
1216
    ctx.lockTimer();
1✔
1217
  }
1✔
1218

1219
  @Override
1220
  public void startActivityTask(
1221
      PollForActivityTaskResponse task, PollForActivityTaskRequest pollRequest)
1222
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
1223
          BadRequestError {
1224
    update(
1✔
1225
        ctx -> {
1226
          String activityId = task.getActivityId();
1✔
1227
          StateMachine<ActivityTaskData> activity = getActivity(activityId);
1✔
1228
          activity.action(StateMachines.Action.START, ctx, pollRequest, 0);
1✔
1229
          ActivityTaskData data = activity.getData();
1✔
1230
          int startToCloseTimeout = data.scheduledEvent.getStartToCloseTimeoutSeconds();
1✔
1231
          int heartbeatTimeout = data.scheduledEvent.getHeartbeatTimeoutSeconds();
1✔
1232
          if (startToCloseTimeout > 0) {
1✔
1233
            ctx.addTimer(
1✔
1234
                startToCloseTimeout,
1235
                () -> timeoutActivity(activityId, TimeoutType.START_TO_CLOSE),
1✔
1236
                "Activity StartToCloseTimeout");
1237
          }
1238
          updateHeartbeatTimer(ctx, activityId, activity, startToCloseTimeout, heartbeatTimeout);
1✔
1239
        });
1✔
1240
  }
1✔
1241

1242
  private void checkCompleted()
1243
      throws EntityNotExistsError, WorkflowExecutionAlreadyCompletedError {
1244
    State workflowState = workflow.getState();
1✔
1245
    if (isTerminalState(workflowState)) {
1✔
1246
      throw new WorkflowExecutionAlreadyCompletedError(
1✔
1247
          "Workflow is already completed: " + workflowState);
1248
    }
1249
  }
1✔
1250

1251
  private boolean isTerminalState(State workflowState) {
1252
    return workflowState == State.COMPLETED
1✔
1253
        || workflowState == State.TIMED_OUT
1254
        || workflowState == State.FAILED
1255
        || workflowState == State.CANCELED
1256
        || workflowState == State.CONTINUED_AS_NEW;
1257
  }
1258

1259
  private void updateHeartbeatTimer(
1260
      RequestContext ctx,
1261
      String activityId,
1262
      StateMachine<ActivityTaskData> activity,
1263
      int startToCloseTimeout,
1264
      int heartbeatTimeout) {
1265
    if (heartbeatTimeout > 0 && heartbeatTimeout < startToCloseTimeout) {
1✔
1266
      activity.getData().lastHeartbeatTime = clock.getAsLong();
1✔
1267
      ctx.addTimer(
1✔
1268
          heartbeatTimeout,
1269
          () -> timeoutActivity(activityId, TimeoutType.HEARTBEAT),
1✔
1270
          "Activity Heartbeat Timeout");
1271
    }
1272
  }
1✔
1273

1274
  @Override
1275
  public void completeActivityTask(String activityId, RespondActivityTaskCompletedRequest request)
1276
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
1277
          BadRequestError {
1278
    update(
1✔
1279
        ctx -> {
1280
          StateMachine<?> activity = getActivity(activityId);
1✔
1281
          activity.action(StateMachines.Action.COMPLETE, ctx, request, 0);
1✔
1282
          activities.remove(activityId);
1✔
1283
          scheduleDecision(ctx);
1✔
1284
          ctx.unlockTimer();
1✔
1285
        });
1✔
1286
  }
1✔
1287

1288
  @Override
1289
  public void completeActivityTaskById(
1290
      String activityId, RespondActivityTaskCompletedByIDRequest request)
1291
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
1292
          BadRequestError {
1293
    update(
1✔
1294
        ctx -> {
1295
          StateMachine<?> activity = getActivity(activityId);
1✔
1296
          activity.action(StateMachines.Action.COMPLETE, ctx, request, 0);
1✔
1297
          activities.remove(activityId);
1✔
1298
          scheduleDecision(ctx);
1✔
1299
          ctx.unlockTimer();
1✔
1300
        });
1✔
1301
  }
1✔
1302

1303
  @Override
1304
  public void failActivityTask(String activityId, RespondActivityTaskFailedRequest request)
1305
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError,
1306
          BadRequestError {
1307
    update(
1✔
1308
        ctx -> {
1309
          StateMachine<ActivityTaskData> activity = getActivity(activityId);
1✔
1310
          activity.action(StateMachines.Action.FAIL, ctx, request, 0);
1✔
1311
          if (isTerminalState(activity.getState())) {
1✔
1312
            activities.remove(activityId);
1✔
1313
            scheduleDecision(ctx);
1✔
1314
          } else {
1315
            addActivityRetryTimer(ctx, activity);
1✔
1316
          }
1317
          // Allow time skipping when waiting for retry
1318
          ctx.unlockTimer();
1✔
1319
        });
1✔
1320
  }
1✔
1321

1322
  private void addActivityRetryTimer(RequestContext ctx, StateMachine<ActivityTaskData> activity) {
1323
    ActivityTaskData data = activity.getData();
1✔
1324
    int attempt = data.retryState.getAttempt();
1✔
1325
    ctx.addTimer(
1✔
1326
        data.nextBackoffIntervalSeconds,
1327
        () -> {
1328
          // Timers are not removed, so skip if it is not for this attempt.
1329
          if (activity.getState() != State.INITIATED && data.retryState.getAttempt() != attempt) {
1✔
1330
            return;
×
1331
          }
1332
          selfAdvancingTimer.lockTimeSkipping(
1✔
1333
              "activityRetryTimer " + activity.getData().scheduledEvent.getActivityId());
1✔
1334
          boolean unlockTimer = false;
1✔
1335
          try {
1336
            update(ctx1 -> ctx1.addActivityTask(data.activityTask));
1✔
1337
          } catch (EntityNotExistsError | WorkflowExecutionAlreadyCompletedError e) {
×
1338
            unlockTimer = true;
×
1339
            // Expected as timers are not removed
1340
          } catch (Exception e) {
×
1341
            unlockTimer = true;
×
1342
            // Cannot fail to timer threads
1343
            log.error("Failure trying to add task for an activity retry", e);
×
1344
          } finally {
1345
            if (unlockTimer) {
1✔
1346
              // Allow time skipping when waiting for an activity retry
1347
              selfAdvancingTimer.unlockTimeSkipping(
×
1348
                  "activityRetryTimer " + activity.getData().scheduledEvent.getActivityId());
×
1349
            }
1350
          }
1351
        },
1✔
1352
        "Activity Retry");
1353
  }
1✔
1354

1355
  @Override
1356
  public void failActivityTaskById(String activityId, RespondActivityTaskFailedByIDRequest request)
1357
      throws EntityNotExistsError, InternalServiceError, WorkflowExecutionAlreadyCompletedError,
1358
          BadRequestError {
1359
    update(
×
1360
        ctx -> {
1361
          StateMachine<ActivityTaskData> activity = getActivity(activityId);
×
1362
          activity.action(StateMachines.Action.FAIL, ctx, request, 0);
×
1363
          if (isTerminalState(activity.getState())) {
×
1364
            activities.remove(activityId);
×
1365
            scheduleDecision(ctx);
×
1366
          } else {
1367
            addActivityRetryTimer(ctx, activity);
×
1368
          }
1369
          ctx.unlockTimer();
×
1370
        });
×
1371
  }
×
1372

1373
  @Override
1374
  public void cancelActivityTask(String activityId, RespondActivityTaskCanceledRequest request)
1375
      throws EntityNotExistsError, InternalServiceError, WorkflowExecutionAlreadyCompletedError,
1376
          BadRequestError {
1377
    update(
1✔
1378
        ctx -> {
1379
          StateMachine<?> activity = getActivity(activityId);
1✔
1380
          activity.action(StateMachines.Action.CANCEL, ctx, request, 0);
1✔
1381
          activities.remove(activityId);
1✔
1382
          scheduleDecision(ctx);
1✔
1383
          ctx.unlockTimer();
1✔
1384
        });
1✔
1385
  }
1✔
1386

1387
  @Override
1388
  public void cancelActivityTaskById(
1389
      String activityId, RespondActivityTaskCanceledByIDRequest request)
1390
      throws EntityNotExistsError, InternalServiceError, WorkflowExecutionAlreadyCompletedError,
1391
          BadRequestError {
1392
    update(
×
1393
        ctx -> {
1394
          StateMachine<?> activity = getActivity(activityId);
×
1395
          activity.action(StateMachines.Action.CANCEL, ctx, request, 0);
×
1396
          activities.remove(activityId);
×
1397
          scheduleDecision(ctx);
×
1398
          ctx.unlockTimer();
×
1399
        });
×
1400
  }
×
1401

1402
  @Override
1403
  public RecordActivityTaskHeartbeatResponse heartbeatActivityTask(
1404
      String activityId, byte[] details)
1405
      throws InternalServiceError, EntityNotExistsError, WorkflowExecutionAlreadyCompletedError {
1406
    RecordActivityTaskHeartbeatResponse result = new RecordActivityTaskHeartbeatResponse();
1✔
1407
    try {
1408
      update(
1✔
1409
          ctx -> {
1410
            StateMachine<ActivityTaskData> activity = getActivity(activityId);
1✔
1411
            activity.action(StateMachines.Action.UPDATE, ctx, details, 0);
1✔
1412
            if (activity.getState() == StateMachines.State.CANCELLATION_REQUESTED) {
1✔
1413
              result.setCancelRequested(true);
1✔
1414
            }
1415
            ActivityTaskData data = activity.getData();
1✔
1416
            data.lastHeartbeatTime = clock.getAsLong();
1✔
1417
            int startToCloseTimeout = data.scheduledEvent.getStartToCloseTimeoutSeconds();
1✔
1418
            int heartbeatTimeout = data.scheduledEvent.getHeartbeatTimeoutSeconds();
1✔
1419
            updateHeartbeatTimer(ctx, activityId, activity, startToCloseTimeout, heartbeatTimeout);
1✔
1420
          });
1✔
1421

1422
    } catch (InternalServiceError
×
1423
        | EntityNotExistsError
1424
        | WorkflowExecutionAlreadyCompletedError e) {
1425
      throw e;
×
1426
    } catch (Exception e) {
×
1427
      throw new InternalServiceError(Throwables.getStackTraceAsString(e));
×
1428
    }
1✔
1429
    return result;
1✔
1430
  }
1431

1432
  private void timeoutActivity(String activityId, TimeoutType timeoutType) {
1433
    boolean unlockTimer = true;
1✔
1434
    try {
1435
      update(
1✔
1436
          ctx -> {
1437
            StateMachine<ActivityTaskData> activity = getActivity(activityId);
1✔
1438
            if (timeoutType == TimeoutType.SCHEDULE_TO_START
1✔
1439
                && activity.getState() != StateMachines.State.INITIATED) {
1✔
1440
              throw new EntityNotExistsError("Not in INITIATED");
×
1441
            }
1442
            if (timeoutType == TimeoutType.HEARTBEAT) {
1✔
1443
              // Deal with timers which are never cancelled
1444
              long heartbeatTimeout =
1✔
1445
                  TimeUnit.SECONDS.toMillis(
1✔
1446
                      activity.getData().scheduledEvent.getHeartbeatTimeoutSeconds());
1✔
1447
              if (clock.getAsLong() - activity.getData().lastHeartbeatTime < heartbeatTimeout) {
1✔
1448
                throw new EntityNotExistsError("Not heartbeat timeout");
×
1449
              }
1450
            }
1451
            activity.action(StateMachines.Action.TIME_OUT, ctx, timeoutType, 0);
1✔
1452
            if (isTerminalState(activity.getState())) {
1✔
1453
              activities.remove(activityId);
1✔
1454
              scheduleDecision(ctx);
1✔
1455
            } else {
1456
              addActivityRetryTimer(ctx, activity);
1✔
1457
            }
1458
          });
1✔
1459
    } catch (EntityNotExistsError | WorkflowExecutionAlreadyCompletedError e) {
1✔
1460
      // Expected as timers are not removed
1461
      unlockTimer = false;
1✔
1462
    } catch (Exception e) {
×
1463
      // Cannot fail to timer threads
1464
      log.error("Failure trying to timeout an activity", e);
×
1465
    } finally {
1466
      if (unlockTimer) {
1✔
1467
        selfAdvancingTimer.unlockTimeSkipping("timeoutActivity " + activityId);
1✔
1468
      }
1469
    }
1470
  }
1✔
1471

1472
  private void timeoutWorkflow() {
1473
    lock.lock();
1✔
1474
    try {
1475
      {
1476
        if (isTerminalState(workflow.getState())) {
1✔
1477
          return;
1✔
1478
        }
1479
      }
1480
    } finally {
1481
      lock.unlock();
1✔
1482
    }
1483
    try {
1484
      update(
1✔
1485
          ctx -> {
1486
            if (isTerminalState(workflow.getState())) {
1✔
1487
              return;
×
1488
            }
1489
            workflow.action(StateMachines.Action.TIME_OUT, ctx, TimeoutType.START_TO_CLOSE, 0);
1✔
1490
            if (parent != null) {
1✔
1491
              ctx.lockTimer(); // unlocked by the parent
1✔
1492
            }
1493
            ForkJoinPool.commonPool().execute(() -> reportWorkflowTimeoutToParent(ctx));
1✔
1494
          });
1✔
1495
    } catch (BadRequestError
×
1496
        | InternalServiceError
1497
        | EntityNotExistsError
1498
        | WorkflowExecutionAlreadyCompletedError e) {
1499
      // Cannot fail to timer threads
1500
      log.error("Failure trying to timeout a workflow", e);
×
1501
    }
1✔
1502
  }
1✔
1503

1504
  private void reportWorkflowTimeoutToParent(RequestContext ctx) {
1505
    if (!parent.isPresent()) {
1✔
1506
      return;
1✔
1507
    }
1508
    try {
1509
      ChildWorkflowExecutionTimedOutEventAttributes a =
1✔
1510
          new ChildWorkflowExecutionTimedOutEventAttributes()
1511
              .setInitiatedEventId(parentChildInitiatedEventId.getAsLong())
1✔
1512
              .setTimeoutType(TimeoutType.START_TO_CLOSE)
1✔
1513
              .setWorkflowType(startRequest.getWorkflowType())
1✔
1514
              .setDomain(ctx.getDomain())
1✔
1515
              .setWorkflowExecution(ctx.getExecution());
1✔
1516
      parent.get().childWorkflowTimedOut(ctx.getExecutionId().getWorkflowId().getWorkflowId(), a);
1✔
1517
    } catch (EntityNotExistsError | WorkflowExecutionAlreadyCompletedError e) {
×
1518
      // Parent might already close
1519
    } catch (BadRequestError | InternalServiceError e) {
×
1520
      log.error("Failure reporting child timing out", e);
×
1521
    }
1✔
1522
  }
1✔
1523

1524
  @Override
1525
  public void signal(SignalWorkflowExecutionRequest signalRequest)
1526
      throws EntityNotExistsError, InternalServiceError, WorkflowExecutionAlreadyCompletedError,
1527
          BadRequestError {
1528
    update(
1✔
1529
        ctx -> {
1530
          addExecutionSignaledEvent(ctx, signalRequest);
1✔
1531
          scheduleDecision(ctx);
1✔
1532
        });
1✔
1533
  }
1✔
1534

1535
  @Override
1536
  public void signalFromWorkflow(SignalExternalWorkflowExecutionDecisionAttributes a)
1537
      throws EntityNotExistsError, InternalServiceError, WorkflowExecutionAlreadyCompletedError,
1538
          BadRequestError {
1539
    update(
1✔
1540
        ctx -> {
1541
          addExecutionSignaledByExternalEvent(ctx, a);
1✔
1542
          scheduleDecision(ctx);
1✔
1543
        });
1✔
1544
  }
1✔
1545

1546
  @Override
1547
  public void requestCancelWorkflowExecution(RequestCancelWorkflowExecutionRequest cancelRequest)
1548
      throws EntityNotExistsError, InternalServiceError, WorkflowExecutionAlreadyCompletedError,
1549
          BadRequestError {
1550
    update(
1✔
1551
        ctx -> {
1552
          workflow.action(StateMachines.Action.REQUEST_CANCELLATION, ctx, cancelRequest, 0);
1✔
1553
          scheduleDecision(ctx);
1✔
1554
        });
1✔
1555
  }
1✔
1556

1557
  @Override
1558
  public QueryWorkflowResponse query(QueryWorkflowRequest queryRequest) throws TException {
1559
    QueryId queryId = new QueryId(executionId);
1✔
1560

1561
    Optional<WorkflowExecutionCloseStatus> optCloseStatus = getCloseStatus();
1✔
1562
    if (optCloseStatus.isPresent() && queryRequest.getQueryRejectCondition() != null) {
1✔
1563
      WorkflowExecutionCloseStatus closeStatus = optCloseStatus.get();
1✔
1564
      boolean rejectNotOpen =
1✔
1565
          queryRequest.getQueryRejectCondition() == QueryRejectCondition.NOT_OPEN;
1✔
1566
      boolean rejectNotCompletedCleanly =
1✔
1567
          queryRequest.getQueryRejectCondition() == QueryRejectCondition.NOT_COMPLETED_CLEANLY
1✔
1568
              && closeStatus != WorkflowExecutionCloseStatus.COMPLETED;
1569
      if (rejectNotOpen || rejectNotCompletedCleanly) {
1✔
1570
        return new QueryWorkflowResponse()
1✔
1571
            .setQueryRejected(new QueryRejected().setCloseStatus(closeStatus));
1✔
1572
      }
1573
    }
1574

1575
    CompletableFuture<QueryWorkflowResponse> result = new CompletableFuture<>();
1✔
1576
    queries.put(queryId.getQueryId(), result);
1✔
1577

1578
    if (queryRequest.getQueryConsistencyLevel() == QueryConsistencyLevel.STRONG) {
1✔
1579
      pendingQueries.put(queryId.getQueryId(), queryRequest.getQuery());
×
1580
    } else {
1581
      PollForDecisionTaskResponse task =
1✔
1582
          new PollForDecisionTaskResponse()
1583
              .setTaskToken(queryId.toBytes())
1✔
1584
              .setWorkflowExecution(executionId.getExecution())
1✔
1585
              .setWorkflowType(startRequest.getWorkflowType())
1✔
1586
              .setQuery(queryRequest.getQuery())
1✔
1587
              .setWorkflowExecutionTaskList(startRequest.getTaskList());
1✔
1588
      TaskListId taskListId =
1✔
1589
          new TaskListId(
1590
              queryRequest.getDomain(),
1✔
1591
              stickyExecutionAttributes == null
1✔
1592
                  ? startRequest.getTaskList().getName()
1✔
1593
                  : stickyExecutionAttributes.getWorkerTaskList().getName());
1✔
1594
      queryRequests.put(queryId.getQueryId(), task);
1✔
1595
      store.sendQueryTask(executionId, taskListId, task);
1✔
1596
    }
1597

1598
    try {
1599
      return result.get();
1✔
1600
    } catch (InterruptedException e) {
×
1601
      return new QueryWorkflowResponse();
×
1602
    } catch (ExecutionException e) {
×
1603
      Throwable cause = e.getCause();
×
1604
      if (cause instanceof TException) {
×
1605
        throw (TException) cause;
×
1606
      }
1607
      throw new InternalServiceError(Throwables.getStackTraceAsString(cause));
×
1608
    }
1609
  }
1610

1611
  @Override
1612
  public void completeQuery(QueryId queryId, RespondQueryTaskCompletedRequest completeRequest)
1613
      throws EntityNotExistsError {
1614
    CompletableFuture<QueryWorkflowResponse> result = queries.get(queryId.getQueryId());
1✔
1615
    if (result == null) {
1✔
1616
      throw new EntityNotExistsError("Unknown query id: " + queryId.getQueryId());
×
1617
    }
1618
    if (completeRequest.getCompletedType() == QueryTaskCompletedType.COMPLETED) {
1✔
1619
      QueryWorkflowResponse response =
1✔
1620
          new QueryWorkflowResponse().setQueryResult(completeRequest.getQueryResult());
1✔
1621
      result.complete(response);
1✔
1622
    } else if (stickyExecutionAttributes != null) {
1✔
1623
      stickyExecutionAttributes = null;
×
1624
      PollForDecisionTaskResponse task = queryRequests.remove(queryId.getQueryId());
×
1625

1626
      TaskListId taskListId =
×
1627
          new TaskListId(startRequest.getDomain(), startRequest.getTaskList().getName());
×
1628
      store.sendQueryTask(executionId, taskListId, task);
×
1629
    } else {
×
1630
      QueryFailedError error = new QueryFailedError().setMessage(completeRequest.getErrorMessage());
×
1631
      result.completeExceptionally(error);
×
1632
    }
1633
  }
1✔
1634

1635
  private void addExecutionSignaledEvent(
1636
      RequestContext ctx, SignalWorkflowExecutionRequest signalRequest) {
1637
    WorkflowExecutionSignaledEventAttributes a =
1✔
1638
        new WorkflowExecutionSignaledEventAttributes()
1639
            .setInput(startRequest.getInput())
1✔
1640
            .setIdentity(signalRequest.getIdentity())
1✔
1641
            .setInput(signalRequest.getInput())
1✔
1642
            .setSignalName(signalRequest.getSignalName());
1✔
1643
    HistoryEvent executionSignaled =
1✔
1644
        new HistoryEvent()
1645
            .setEventType(EventType.WorkflowExecutionSignaled)
1✔
1646
            .setWorkflowExecutionSignaledEventAttributes(a);
1✔
1647
    ctx.addEvent(executionSignaled);
1✔
1648
  }
1✔
1649

1650
  private void addExecutionSignaledByExternalEvent(
1651
      RequestContext ctx, SignalExternalWorkflowExecutionDecisionAttributes d) {
1652
    WorkflowExecutionSignaledEventAttributes a =
1✔
1653
        new WorkflowExecutionSignaledEventAttributes()
1654
            .setInput(startRequest.getInput())
1✔
1655
            .setInput(d.getInput())
1✔
1656
            .setSignalName(d.getSignalName());
1✔
1657
    HistoryEvent executionSignaled =
1✔
1658
        new HistoryEvent()
1659
            .setEventType(EventType.WorkflowExecutionSignaled)
1✔
1660
            .setWorkflowExecutionSignaledEventAttributes(a);
1✔
1661
    ctx.addEvent(executionSignaled);
1✔
1662
  }
1✔
1663

1664
  private StateMachine<ActivityTaskData> getActivity(String activityId)
1665
      throws EntityNotExistsError {
1666
    StateMachine<ActivityTaskData> activity = activities.get(activityId);
1✔
1667
    if (activity == null) {
1✔
1668
      throw new EntityNotExistsError("unknown activityId: " + activityId);
1✔
1669
    }
1670
    return activity;
1✔
1671
  }
1672

1673
  private StateMachine<ChildWorkflowData> getChildWorkflow(long initiatedEventId)
1674
      throws InternalServiceError {
1675
    StateMachine<ChildWorkflowData> child = childWorkflows.get(initiatedEventId);
1✔
1676
    if (child == null) {
1✔
1677
      throw new InternalServiceError("unknown initiatedEventId: " + initiatedEventId);
×
1678
    }
1679
    return child;
1✔
1680
  }
1681

1682
  static class QueryId {
1683

1684
    private final ExecutionId executionId;
1685
    private final String queryId;
1686

1687
    QueryId(ExecutionId executionId) {
1✔
1688
      this.executionId = Objects.requireNonNull(executionId);
1✔
1689
      this.queryId = UUID.randomUUID().toString();
1✔
1690
    }
1✔
1691

1692
    private QueryId(ExecutionId executionId, String queryId) {
1✔
1693
      this.executionId = Objects.requireNonNull(executionId);
1✔
1694
      this.queryId = queryId;
1✔
1695
    }
1✔
1696

1697
    public ExecutionId getExecutionId() {
1698
      return executionId;
1✔
1699
    }
1700

1701
    String getQueryId() {
1702
      return queryId;
1✔
1703
    }
1704

1705
    byte[] toBytes() throws InternalServiceError {
1706
      ByteArrayOutputStream bout = new ByteArrayOutputStream();
1✔
1707
      DataOutputStream out = new DataOutputStream(bout);
1✔
1708
      addBytes(out);
1✔
1709
      return bout.toByteArray();
1✔
1710
    }
1711

1712
    void addBytes(DataOutputStream out) throws InternalServiceError {
1713
      try {
1714
        executionId.addBytes(out);
1✔
1715
        out.writeUTF(queryId);
1✔
1716
      } catch (IOException e) {
×
1717
        throw new InternalServiceError(Throwables.getStackTraceAsString(e));
×
1718
      }
1✔
1719
    }
1✔
1720

1721
    static QueryId fromBytes(byte[] serialized) throws InternalServiceError {
1722
      ByteArrayInputStream bin = new ByteArrayInputStream(serialized);
1✔
1723
      DataInputStream in = new DataInputStream(bin);
1✔
1724
      try {
1725
        ExecutionId executionId = ExecutionId.readFromBytes(in);
1✔
1726
        String queryId = in.readUTF();
1✔
1727
        return new QueryId(executionId, queryId);
1✔
1728
      } catch (IOException e) {
×
1729
        throw new InternalServiceError(Throwables.getStackTraceAsString(e));
×
1730
      }
1731
    }
1732
  }
1733
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc