• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 1989

27 Sep 2023 06:06PM UTC coverage: 60.177% (-0.05%) from 60.226%
1989

push

buildkite

web-flow
fixed bug: Added alreadyStarted workflow case (#853)

If we get a WorkflowExecutionAlreadyStartedError while starting the workflow in the new domain, we do not need to start the workflow in the new domain again, and we should not throw an error. Instead, we report the status as "Workflow already started".

8 of 8 new or added lines in 2 files covered. (100.0%)

11326 of 18821 relevant lines covered (60.18%)

0.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.27
/src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java
1
/*
2
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
 *
4
 *  Modifications copyright (C) 2017 Uber Technologies, Inc.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.internal.replay;
19

20
import static com.uber.cadence.worker.NonDeterministicWorkflowPolicy.FailWorkflow;
21

22
import com.uber.cadence.EventType;
23
import com.uber.cadence.GetWorkflowExecutionHistoryRequest;
24
import com.uber.cadence.GetWorkflowExecutionHistoryResponse;
25
import com.uber.cadence.History;
26
import com.uber.cadence.HistoryEvent;
27
import com.uber.cadence.PollForDecisionTaskResponse;
28
import com.uber.cadence.QueryResultType;
29
import com.uber.cadence.TimerFiredEventAttributes;
30
import com.uber.cadence.WorkflowExecutionSignaledEventAttributes;
31
import com.uber.cadence.WorkflowExecutionStartedEventAttributes;
32
import com.uber.cadence.WorkflowQuery;
33
import com.uber.cadence.WorkflowQueryResult;
34
import com.uber.cadence.WorkflowType;
35
import com.uber.cadence.common.RetryOptions;
36
import com.uber.cadence.internal.common.OptionsUtils;
37
import com.uber.cadence.internal.common.RpcRetryer;
38
import com.uber.cadence.internal.metrics.MetricsTag;
39
import com.uber.cadence.internal.metrics.MetricsType;
40
import com.uber.cadence.internal.replay.HistoryHelper.DecisionEvents;
41
import com.uber.cadence.internal.replay.HistoryHelper.DecisionEventsIterator;
42
import com.uber.cadence.internal.worker.DecisionTaskWithHistoryIterator;
43
import com.uber.cadence.internal.worker.LocalActivityWorker;
44
import com.uber.cadence.internal.worker.SingleWorkerOptions;
45
import com.uber.cadence.internal.worker.WorkflowExecutionException;
46
import com.uber.cadence.serviceclient.IWorkflowService;
47
import com.uber.cadence.workflow.Functions;
48
import com.uber.m3.tally.Scope;
49
import com.uber.m3.tally.Stopwatch;
50
import com.uber.m3.util.ImmutableMap;
51
import java.time.Duration;
52
import java.util.ArrayList;
53
import java.util.Arrays;
54
import java.util.Iterator;
55
import java.util.List;
56
import java.util.Map;
57
import java.util.Objects;
58
import java.util.concurrent.CancellationException;
59
import java.util.concurrent.TimeUnit;
60
import java.util.concurrent.atomic.AtomicReference;
61
import java.util.concurrent.locks.Lock;
62
import java.util.concurrent.locks.ReentrantLock;
63
import java.util.function.BiFunction;
64
import java.util.function.Consumer;
65
import java.util.stream.Collectors;
66
import org.apache.thrift.TException;
67
import org.slf4j.Logger;
68
import org.slf4j.LoggerFactory;
69

70
/**
71
 * Implements decider that relies on replay of a workflow code. An instance of this class is created
72
 * per decision.
73
 */
74
class ReplayDecider implements Decider {
1✔
75

76
  private static final Logger log = LoggerFactory.getLogger(ReplayDecider.class);
1✔
77

78
  private static final int MAXIMUM_PAGE_SIZE = 10000;
79

80
  private final DecisionsHelper decisionsHelper;
81
  private final DecisionContextImpl context;
82
  private final IWorkflowService service;
83
  private final ReplayWorkflow workflow;
84
  private boolean cancelRequested;
85
  private boolean completed;
86
  private WorkflowExecutionException failure;
87
  private long wakeUpTime;
88
  private Consumer<Exception> timerCancellationHandler;
89
  private final Scope metricsScope;
90
  private final long wfStartTimeNanos;
91
  private final WorkflowExecutionStartedEventAttributes startedEvent;
92
  private final Lock lock = new ReentrantLock();
1✔
93
  private final Consumer<HistoryEvent> localActivityCompletionSink;
94

95
  ReplayDecider(
96
      IWorkflowService service,
97
      String domain,
98
      WorkflowType workflowType,
99
      ReplayWorkflow workflow,
100
      DecisionsHelper decisionsHelper,
101
      SingleWorkerOptions options,
102
      BiFunction<LocalActivityWorker.Task, Duration, Boolean> laTaskPoller) {
1✔
103
    this.service = service;
1✔
104
    this.workflow = workflow;
1✔
105
    this.decisionsHelper = decisionsHelper;
1✔
106
    this.metricsScope =
1✔
107
        options
108
            .getMetricsScope()
1✔
109
            .tagged(ImmutableMap.of(MetricsTag.WORKFLOW_TYPE, workflowType.getName()));
1✔
110
    PollForDecisionTaskResponse decisionTask = decisionsHelper.getTask();
1✔
111

112
    startedEvent =
1✔
113
        decisionTask.getHistory().getEvents().get(0).getWorkflowExecutionStartedEventAttributes();
1✔
114
    if (startedEvent == null) {
1✔
115
      throw new IllegalArgumentException(
1✔
116
          "First event in the history is not WorkflowExecutionStarted");
117
    }
118
    wfStartTimeNanos = decisionTask.getHistory().getEvents().get(0).getTimestamp();
1✔
119

120
    context =
1✔
121
        new DecisionContextImpl(
122
            decisionsHelper, domain, decisionTask, startedEvent, options, laTaskPoller, this);
123
    localActivityCompletionSink =
1✔
124
        historyEvent -> {
125
          lock.lock();
1✔
126
          try {
127
            processEvent(historyEvent);
1✔
128
          } finally {
129
            lock.unlock();
1✔
130
          }
131
        };
1✔
132
  }
1✔
133

134
  Lock getLock() {
135
    return lock;
1✔
136
  }
137

138
  private void handleWorkflowExecutionStarted(HistoryEvent event) {
139
    workflow.start(event, context);
1✔
140
  }
1✔
141

142
  private void processEvent(HistoryEvent event) {
143
    EventType eventType = event.getEventType();
1✔
144
    switch (eventType) {
1✔
145
      case ActivityTaskCanceled:
146
        context.handleActivityTaskCanceled(event);
1✔
147
        break;
1✔
148
      case ActivityTaskCompleted:
149
        context.handleActivityTaskCompleted(event);
1✔
150
        break;
1✔
151
      case ActivityTaskFailed:
152
        context.handleActivityTaskFailed(event);
1✔
153
        break;
1✔
154
      case ActivityTaskStarted:
155
        decisionsHelper.handleActivityTaskStarted(event);
1✔
156
        break;
1✔
157
      case ActivityTaskTimedOut:
158
        context.handleActivityTaskTimedOut(event);
1✔
159
        break;
1✔
160
      case ExternalWorkflowExecutionCancelRequested:
161
        context.handleChildWorkflowExecutionCancelRequested(event);
×
162
        decisionsHelper.handleExternalWorkflowExecutionCancelRequested(event);
×
163
        break;
×
164
      case ChildWorkflowExecutionCanceled:
165
        context.handleChildWorkflowExecutionCanceled(event);
1✔
166
        break;
1✔
167
      case ChildWorkflowExecutionCompleted:
168
        context.handleChildWorkflowExecutionCompleted(event);
1✔
169
        break;
1✔
170
      case ChildWorkflowExecutionFailed:
171
        context.handleChildWorkflowExecutionFailed(event);
1✔
172
        break;
1✔
173
      case ChildWorkflowExecutionStarted:
174
        context.handleChildWorkflowExecutionStarted(event);
1✔
175
        break;
1✔
176
      case ChildWorkflowExecutionTerminated:
177
        context.handleChildWorkflowExecutionTerminated(event);
×
178
        break;
×
179
      case ChildWorkflowExecutionTimedOut:
180
        context.handleChildWorkflowExecutionTimedOut(event);
1✔
181
        break;
1✔
182
      case DecisionTaskCompleted:
183
        // NOOP
184
        break;
×
185
      case DecisionTaskScheduled:
186
        // NOOP
187
        break;
1✔
188
      case DecisionTaskStarted:
189
        throw new IllegalArgumentException("not expected");
×
190
      case DecisionTaskTimedOut:
191
        // Handled in the processEvent(event)
192
        break;
1✔
193
      case ExternalWorkflowExecutionSignaled:
194
        context.handleExternalWorkflowExecutionSignaled(event);
1✔
195
        break;
1✔
196
      case StartChildWorkflowExecutionFailed:
197
        context.handleStartChildWorkflowExecutionFailed(event);
1✔
198
        break;
1✔
199
      case TimerFired:
200
        handleTimerFired(event);
1✔
201
        break;
1✔
202
      case WorkflowExecutionCancelRequested:
203
        handleWorkflowExecutionCancelRequested(event);
1✔
204
        break;
1✔
205
      case WorkflowExecutionSignaled:
206
        handleWorkflowExecutionSignaled(event);
1✔
207
        break;
1✔
208
      case WorkflowExecutionStarted:
209
        handleWorkflowExecutionStarted(event);
1✔
210
        break;
1✔
211
      case WorkflowExecutionTerminated:
212
        // NOOP
213
        break;
×
214
      case WorkflowExecutionTimedOut:
215
        // NOOP
216
        break;
×
217
      case ActivityTaskScheduled:
218
        decisionsHelper.handleActivityTaskScheduled(event);
1✔
219
        break;
1✔
220
      case ActivityTaskCancelRequested:
221
        decisionsHelper.handleActivityTaskCancelRequested(event);
1✔
222
        break;
1✔
223
      case RequestCancelActivityTaskFailed:
224
        decisionsHelper.handleRequestCancelActivityTaskFailed(event);
×
225
        break;
×
226
      case MarkerRecorded:
227
        context.handleMarkerRecorded(event);
1✔
228
        break;
1✔
229
      case WorkflowExecutionCompleted:
230
        break;
1✔
231
      case WorkflowExecutionFailed:
232
        break;
1✔
233
      case WorkflowExecutionCanceled:
234
        break;
×
235
      case WorkflowExecutionContinuedAsNew:
236
        break;
×
237
      case TimerStarted:
238
        decisionsHelper.handleTimerStarted(event);
1✔
239
        break;
1✔
240
      case TimerCanceled:
241
        context.handleTimerCanceled(event);
1✔
242
        break;
1✔
243
      case SignalExternalWorkflowExecutionInitiated:
244
        decisionsHelper.handleSignalExternalWorkflowExecutionInitiated(event);
1✔
245
        break;
1✔
246
      case SignalExternalWorkflowExecutionFailed:
247
        context.handleSignalExternalWorkflowExecutionFailed(event);
1✔
248
        break;
1✔
249
      case RequestCancelExternalWorkflowExecutionInitiated:
250
        decisionsHelper.handleRequestCancelExternalWorkflowExecutionInitiated(event);
×
251
        break;
×
252
      case RequestCancelExternalWorkflowExecutionFailed:
253
        decisionsHelper.handleRequestCancelExternalWorkflowExecutionFailed(event);
×
254
        break;
×
255
      case StartChildWorkflowExecutionInitiated:
256
        decisionsHelper.handleStartChildWorkflowExecutionInitiated(event);
1✔
257
        break;
1✔
258
      case CancelTimerFailed:
259
        decisionsHelper.handleCancelTimerFailed(event);
×
260
        break;
×
261
      case DecisionTaskFailed:
262
        context.handleDecisionTaskFailed(event);
1✔
263
        break;
1✔
264
      case UpsertWorkflowSearchAttributes:
265
        context.handleUpsertSearchAttributes(event);
1✔
266
        break;
267
    }
268
  }
1✔
269

270
  private void eventLoop() {
271
    if (completed) {
1✔
272
      return;
×
273
    }
274
    try {
275
      completed = workflow.eventLoop();
1✔
276
    } catch (Error e) {
1✔
277
      throw e;
1✔
278
    } catch (WorkflowExecutionException e) {
1✔
279
      failure = e;
1✔
280
      completed = true;
1✔
281
    } catch (CancellationException e) {
1✔
282
      if (!cancelRequested) {
1✔
283
        failure = workflow.mapUnexpectedException(e);
1✔
284
      }
285
      completed = true;
1✔
286
    } catch (Throwable e) {
1✔
287
      // can cast as Error is caught above.
288
      failure = workflow.mapUnexpectedException((Exception) e);
1✔
289
      completed = true;
1✔
290
    }
1✔
291
  }
1✔
292

293
  private void mayBeCompleteWorkflow() {
294
    if (completed) {
1✔
295
      completeWorkflow();
1✔
296
    } else {
297
      updateTimers();
1✔
298
    }
299
  }
1✔
300

301
  private void completeWorkflow() {
302
    if (failure != null) {
1✔
303
      decisionsHelper.failWorkflowExecution(failure);
1✔
304
      metricsScope.counter(MetricsType.WORKFLOW_FAILED_COUNTER).inc(1);
1✔
305
    } else if (cancelRequested) {
1✔
306
      decisionsHelper.cancelWorkflowExecution();
1✔
307
      metricsScope.counter(MetricsType.WORKFLOW_CANCELLED_COUNTER).inc(1);
1✔
308
    } else {
309
      ContinueAsNewWorkflowExecutionParameters continueAsNewOnCompletion =
1✔
310
          context.getContinueAsNewOnCompletion();
1✔
311
      if (continueAsNewOnCompletion != null) {
1✔
312
        decisionsHelper.continueAsNewWorkflowExecution(continueAsNewOnCompletion);
1✔
313
        metricsScope.counter(MetricsType.WORKFLOW_CONTINUE_AS_NEW_COUNTER).inc(1);
1✔
314
      } else {
315
        byte[] workflowOutput = workflow.getOutput();
1✔
316
        decisionsHelper.completeWorkflowExecution(workflowOutput);
1✔
317
        metricsScope.counter(MetricsType.WORKFLOW_COMPLETED_COUNTER).inc(1);
1✔
318
      }
319
    }
320

321
    long nanoTime = TimeUnit.NANOSECONDS.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
1✔
322
    com.uber.m3.util.Duration d = com.uber.m3.util.Duration.ofNanos(nanoTime - wfStartTimeNanos);
1✔
323
    metricsScope.timer(MetricsType.WORKFLOW_E2E_LATENCY).record(d);
1✔
324
  }
1✔
325

326
  private void updateTimers() {
327
    long nextWakeUpTime = workflow.getNextWakeUpTime();
1✔
328
    if (nextWakeUpTime == 0) {
1✔
329
      if (timerCancellationHandler != null) {
1✔
330
        timerCancellationHandler.accept(null);
1✔
331
        timerCancellationHandler = null;
1✔
332
      }
333
      wakeUpTime = nextWakeUpTime;
1✔
334
      return;
1✔
335
    }
336
    if (wakeUpTime == nextWakeUpTime && timerCancellationHandler != null) {
1✔
337
      return; // existing timer
1✔
338
    }
339
    long delayMilliseconds = nextWakeUpTime - context.currentTimeMillis();
1✔
340
    if (delayMilliseconds < 0) {
1✔
341
      throw new IllegalStateException("Negative delayMilliseconds=" + delayMilliseconds);
×
342
    }
343
    // Round up to the nearest second as we don't want to deliver a timer
344
    // earlier than requested.
345
    long delaySeconds =
1✔
346
        OptionsUtils.roundUpToSeconds(Duration.ofMillis(delayMilliseconds)).getSeconds();
1✔
347
    if (timerCancellationHandler != null) {
1✔
348
      timerCancellationHandler.accept(null);
1✔
349
      timerCancellationHandler = null;
1✔
350
    }
351
    wakeUpTime = nextWakeUpTime;
1✔
352
    timerCancellationHandler =
1✔
353
        context.createTimer(
1✔
354
            delaySeconds,
355
            (t) -> {
356
              // Intentionally left empty.
357
              // Timer ensures that decision is scheduled at the time workflow can make progress.
358
              // But no specific timer related action is necessary as Workflow.sleep is just a
359
              // Workflow.await with a time based condition.
360
            });
1✔
361
  }
1✔
362

363
  private void handleWorkflowExecutionCancelRequested(HistoryEvent event) {
364
    context.setCancelRequested(true);
1✔
365
    String cause = event.getWorkflowExecutionCancelRequestedEventAttributes().getCause();
1✔
366
    workflow.cancel(cause);
1✔
367
    cancelRequested = true;
1✔
368
  }
1✔
369

370
  private void handleTimerFired(HistoryEvent event) {
371
    TimerFiredEventAttributes attributes = event.getTimerFiredEventAttributes();
1✔
372
    String timerId = attributes.getTimerId();
1✔
373
    if (timerId.equals(DecisionsHelper.FORCE_IMMEDIATE_DECISION_TIMER)) {
1✔
374
      return;
×
375
    }
376
    context.handleTimerFired(attributes);
1✔
377
  }
1✔
378

379
  private void handleWorkflowExecutionSignaled(HistoryEvent event) {
380
    assert (event.getEventType() == EventType.WorkflowExecutionSignaled);
1✔
381
    final WorkflowExecutionSignaledEventAttributes signalAttributes =
1✔
382
        event.getWorkflowExecutionSignaledEventAttributes();
1✔
383
    if (completed) {
1✔
384
      throw new IllegalStateException("Signal received after workflow is closed.");
×
385
    }
386
    this.workflow.handleSignal(
1✔
387
        signalAttributes.getSignalName(), signalAttributes.getInput(), event.getEventId());
1✔
388
  }
1✔
389

390
  @Override
391
  public DecisionResult decide(PollForDecisionTaskResponse decisionTask) throws Throwable {
392
    lock.lock();
1✔
393
    try {
394
      AtomicReference<Map<String, WorkflowQueryResult>> queryResults = new AtomicReference<>();
1✔
395
      boolean forceCreateNewDecisionTask =
1✔
396
          decideImpl(
1✔
397
              decisionTask, () -> queryResults.set(getQueryResults(decisionTask.getQueries())));
1✔
398
      return new DecisionResult(
1✔
399
          decisionsHelper.getDecisions(), queryResults.get(), forceCreateNewDecisionTask);
1✔
400
    } finally {
401
      lock.unlock();
1✔
402
    }
403
  }
404

405
  private Map<String, WorkflowQueryResult> getQueryResults(Map<String, WorkflowQuery> queries) {
406
    if (queries == null) {
1✔
407
      return null;
×
408
    }
409

410
    return queries
1✔
411
        .entrySet()
1✔
412
        .stream()
1✔
413
        .collect(Collectors.toMap(q -> q.getKey(), q -> queryWorkflow(q.getValue())));
1✔
414
  }
415

416
  private WorkflowQueryResult queryWorkflow(WorkflowQuery query) {
417
    try {
418
      return new WorkflowQueryResult()
×
419
          .setResultType(QueryResultType.ANSWERED)
×
420
          .setAnswer(workflow.query(query));
×
421
    } catch (Throwable e) {
×
422
      return new WorkflowQueryResult()
×
423
          .setResultType(QueryResultType.FAILED)
×
424
          .setErrorMessage(e.getMessage());
×
425
    }
426
  }
427

428
  // Returns boolean to indicate whether we need to force create new decision task for local
429
  // activity heartbeating.
430
  private boolean decideImpl(PollForDecisionTaskResponse decisionTask, Functions.Proc query)
431
      throws Throwable {
432
    boolean forceCreateNewDecisionTask = false;
1✔
433
    try {
434
      long startTime = System.currentTimeMillis();
1✔
435
      DecisionTaskWithHistoryIterator decisionTaskWithHistoryIterator =
1✔
436
          new DecisionTaskWithHistoryIteratorImpl(
437
              decisionTask, Duration.ofSeconds(startedEvent.getTaskStartToCloseTimeoutSeconds()));
1✔
438
      HistoryHelper historyHelper =
1✔
439
          new HistoryHelper(
440
              decisionTaskWithHistoryIterator, context.getReplayCurrentTimeMilliseconds());
1✔
441
      DecisionEventsIterator iterator = historyHelper.getIterator();
1✔
442
      if ((decisionsHelper.getNextDecisionEventId()
1✔
443
              != historyHelper.getPreviousStartedEventId()
1✔
444
                  + 2) // getNextDecisionEventId() skips over completed.
445
          && (decisionsHelper.getNextDecisionEventId() != 0
1✔
446
              && historyHelper.getPreviousStartedEventId() != 0)
1✔
447
          && (decisionTask.getHistory().getEventsSize() > 0)) {
×
448
        throw new IllegalStateException(
×
449
            String.format(
×
450
                "ReplayDecider expects next event id at %d. History's previous started event id is %d",
451
                decisionsHelper.getNextDecisionEventId(),
×
452
                historyHelper.getPreviousStartedEventId()));
×
453
      }
454

455
      while (iterator.hasNext()) {
1✔
456
        DecisionEvents decision = iterator.next();
1✔
457
        context.setReplaying(decision.isReplay());
1✔
458
        context.setReplayCurrentTimeMilliseconds(decision.getReplayCurrentTimeMilliseconds());
1✔
459

460
        decisionsHelper.handleDecisionTaskStartedEvent(decision);
1✔
461
        // Markers must be cached first as their data is needed when processing events.
462
        for (HistoryEvent event : decision.getMarkers()) {
1✔
463
          if (!event
1✔
464
              .getMarkerRecordedEventAttributes()
1✔
465
              .getMarkerName()
1✔
466
              .equals(ClockDecisionContext.LOCAL_ACTIVITY_MARKER_NAME)) {
1✔
467
            processEvent(event);
1✔
468
          }
469
        }
1✔
470

471
        for (HistoryEvent event : decision.getEvents()) {
1✔
472
          processEvent(event);
1✔
473
        }
1✔
474

475
        forceCreateNewDecisionTask =
1✔
476
            processEventLoop(
1✔
477
                startTime,
478
                startedEvent.getTaskStartToCloseTimeoutSeconds(),
1✔
479
                decision,
480
                decisionTask.getQuery() != null);
1✔
481

482
        mayBeCompleteWorkflow();
1✔
483
        if (decision.isReplay()) {
1✔
484
          decisionsHelper.notifyDecisionSent();
1✔
485
        }
486
        // Updates state machines with results of the previous decisions
487
        for (HistoryEvent event : decision.getDecisionEvents()) {
1✔
488
          processEvent(event);
1✔
489
        }
1✔
490
        // Reset state to before running the event loop
491
        decisionsHelper.handleDecisionTaskStartedEvent(decision);
1✔
492
      }
1✔
493
      if (forceCreateNewDecisionTask) {
1✔
494
        metricsScope.counter(MetricsType.DECISION_TASK_FORCE_COMPLETED).inc(1);
1✔
495
      }
496
      return forceCreateNewDecisionTask;
1✔
497
    } catch (Error e) {
1✔
498
      if (this.workflow.getWorkflowImplementationOptions().getNonDeterministicWorkflowPolicy()
1✔
499
          == FailWorkflow) {
500
        // fail workflow
501
        failure = workflow.mapError(e);
1✔
502
        completed = true;
1✔
503
        completeWorkflow();
1✔
504
        return false;
1✔
505
      } else {
506
        metricsScope.counter(MetricsType.DECISION_TASK_ERROR_COUNTER).inc(1);
1✔
507
        // fail decision, not a workflow
508
        throw e;
1✔
509
      }
510
    } finally {
511
      if (query != null) {
1✔
512
        query.apply();
1✔
513
      }
514
      if (completed) {
1✔
515
        close();
1✔
516
      }
517
    }
518
  }
519

520
  private boolean processEventLoop(
521
      long startTime, int decisionTimeoutSecs, DecisionEvents decision, boolean isQuery)
522
      throws Throwable {
523
    eventLoop();
1✔
524

525
    if (decision.isReplay() || isQuery) {
1✔
526
      return replayLocalActivities(decision);
1✔
527
    } else {
528
      return executeLocalActivities(startTime, decisionTimeoutSecs);
1✔
529
    }
530
  }
531

532
  private boolean replayLocalActivities(DecisionEvents decision) throws Throwable {
533
    List<HistoryEvent> localActivityMarkers = new ArrayList<>();
1✔
534
    for (HistoryEvent event : decision.getMarkers()) {
1✔
535
      if (event
1✔
536
          .getMarkerRecordedEventAttributes()
1✔
537
          .getMarkerName()
1✔
538
          .equals(ClockDecisionContext.LOCAL_ACTIVITY_MARKER_NAME)) {
1✔
539
        localActivityMarkers.add(event);
1✔
540
      }
541
    }
1✔
542

543
    if (localActivityMarkers.isEmpty()) {
1✔
544
      return false;
1✔
545
    }
546

547
    int processed = 0;
1✔
548
    while (context.numPendingLaTasks() > 0) {
1✔
549
      int numTasks = context.numPendingLaTasks();
1✔
550
      for (HistoryEvent event : localActivityMarkers) {
1✔
551
        processEvent(event);
1✔
552
      }
1✔
553

554
      eventLoop();
1✔
555

556
      processed += numTasks;
1✔
557
      if (processed == localActivityMarkers.size()) {
1✔
558
        return false;
1✔
559
      }
560
    }
1✔
561
    return false;
1✔
562
  }
563

564
  // Return whether we would need a new decision task immediately.
565
  private boolean executeLocalActivities(long startTime, int decisionTimeoutSecs) {
566
    Duration maxProcessingTime = Duration.ofSeconds((long) (0.8 * decisionTimeoutSecs));
1✔
567

568
    while (context.numPendingLaTasks() > 0) {
1✔
569
      Duration processingTime = Duration.ofMillis(System.currentTimeMillis() - startTime);
1✔
570
      Duration maxWaitAllowed = maxProcessingTime.minus(processingTime);
1✔
571

572
      boolean started = context.startUnstartedLaTasks(maxWaitAllowed);
1✔
573
      if (!started) {
1✔
574
        // We were not able to send the current batch of la tasks before deadline.
575
        // Return true to indicate that we need a new decision task immediately.
576
        return true;
×
577
      }
578

579
      try {
580
        context.awaitTaskCompletion(maxWaitAllowed);
1✔
581
      } catch (InterruptedException e) {
×
582
        return true;
×
583
      }
1✔
584

585
      eventLoop();
1✔
586

587
      if (context.numPendingLaTasks() == 0) {
1✔
588
        return false;
1✔
589
      }
590

591
      // Break local activity processing loop if we almost reach decision task timeout.
592
      processingTime = Duration.ofMillis(System.currentTimeMillis() - startTime);
1✔
593
      if (processingTime.compareTo(maxProcessingTime) > 0) {
1✔
594
        return true;
1✔
595
      }
596
    }
1✔
597
    return false;
1✔
598
  }
599

600
  int getDecisionTimeoutSeconds() {
601
    return startedEvent.getTaskStartToCloseTimeoutSeconds();
1✔
602
  }
603

604
  @Override
605
  public void close() {
606
    lock.lock();
1✔
607
    try {
608
      workflow.close();
1✔
609
    } finally {
610
      lock.unlock();
1✔
611
    }
612
  }
1✔
613

614
  @Override
615
  public byte[] query(PollForDecisionTaskResponse response, WorkflowQuery query) throws Throwable {
616
    lock.lock();
1✔
617
    try {
618
      AtomicReference<byte[]> result = new AtomicReference<>();
1✔
619
      decideImpl(response, () -> result.set(workflow.query(query)));
1✔
620
      return result.get();
1✔
621
    } finally {
622
      lock.unlock();
1✔
623
    }
624
  }
625

626
  public Consumer<HistoryEvent> getLocalActivityCompletionSink() {
627
    return localActivityCompletionSink;
1✔
628
  }
629

630
  private class DecisionTaskWithHistoryIteratorImpl implements DecisionTaskWithHistoryIterator {
631

632
    private final Duration retryServiceOperationInitialInterval = Duration.ofMillis(200);
1✔
633
    private final Duration retryServiceOperationMaxInterval = Duration.ofSeconds(4);
1✔
634
    private final Duration paginationStart = Duration.ofMillis(System.currentTimeMillis());
1✔
635
    private Duration decisionTaskStartToCloseTimeout;
636

637
    private final Duration decisionTaskRemainingTime() {
638
      Duration passed = Duration.ofMillis(System.currentTimeMillis()).minus(paginationStart);
×
639
      return decisionTaskStartToCloseTimeout.minus(passed);
×
640
    }
641

642
    private final PollForDecisionTaskResponse task;
643
    private Iterator<HistoryEvent> current;
644
    private byte[] nextPageToken;
645

646
    DecisionTaskWithHistoryIteratorImpl(
647
        PollForDecisionTaskResponse task, Duration decisionTaskStartToCloseTimeout) {
1✔
648
      this.task = Objects.requireNonNull(task);
1✔
649
      this.decisionTaskStartToCloseTimeout =
1✔
650
          Objects.requireNonNull(decisionTaskStartToCloseTimeout);
1✔
651

652
      History history = task.getHistory();
1✔
653
      current = history.getEventsIterator();
1✔
654
      nextPageToken = task.getNextPageToken();
1✔
655
    }
1✔
656

657
    @Override
658
    public PollForDecisionTaskResponse getDecisionTask() {
659
      lock.lock();
1✔
660
      try {
661
        return task;
1✔
662
      } finally {
663
        lock.unlock();
1✔
664
      }
665
    }
666

667
    @Override
668
    public Iterator<HistoryEvent> getHistory() {
669
      return new Iterator<HistoryEvent>() {
1✔
670
        @Override
671
        public boolean hasNext() {
672
          return current.hasNext() || nextPageToken != null;
1✔
673
        }
674

675
        @Override
676
        public HistoryEvent next() {
677
          if (current.hasNext()) {
1✔
678
            return current.next();
1✔
679
          }
680

681
          Duration decisionTaskRemainingTime = decisionTaskRemainingTime();
×
682
          if (decisionTaskRemainingTime.isNegative() || decisionTaskRemainingTime.isZero()) {
×
683
            throw new Error(
×
684
                "Decision task timed out while querying history. If this happens consistently please consider "
685
                    + "increase decision task timeout or reduce history size.");
686
          }
687

688
          metricsScope.counter(MetricsType.WORKFLOW_GET_HISTORY_COUNTER).inc(1);
×
689
          Stopwatch sw = metricsScope.timer(MetricsType.WORKFLOW_GET_HISTORY_LATENCY).start();
×
690
          RetryOptions retryOptions =
×
691
              new RetryOptions.Builder()
692
                  .setExpiration(decisionTaskRemainingTime)
×
693
                  .setInitialInterval(retryServiceOperationInitialInterval)
×
694
                  .setMaximumInterval(retryServiceOperationMaxInterval)
×
695
                  .build();
×
696

697
          GetWorkflowExecutionHistoryRequest request = new GetWorkflowExecutionHistoryRequest();
×
698
          request
×
699
              .setDomain(context.getDomain())
×
700
              .setExecution(task.getWorkflowExecution())
×
701
              .setMaximumPageSize(MAXIMUM_PAGE_SIZE)
×
702
              .setNextPageToken(nextPageToken);
×
703

704
          try {
705
            GetWorkflowExecutionHistoryResponse r =
×
706
                RpcRetryer.retryWithResult(
×
707
                    retryOptions, () -> service.GetWorkflowExecutionHistory(request));
×
708
            current = r.getHistory().getEventsIterator();
×
709
            nextPageToken = r.getNextPageToken();
×
710
            metricsScope.counter(MetricsType.WORKFLOW_GET_HISTORY_SUCCEED_COUNTER).inc(1);
×
711
            sw.stop();
×
712
          } catch (TException e) {
×
713
            metricsScope.counter(MetricsType.WORKFLOW_GET_HISTORY_FAILED_COUNTER).inc(1);
×
714
            throw new Error(e);
×
715
          }
×
716
          if (!current.hasNext()) {
×
717
            log.error(
×
718
                "GetWorkflowExecutionHistory returns an empty history, maybe a bug in server, workflowID:"
719
                    + request.execution.workflowId
720
                    + ", runID:"
721
                    + request.execution.runId
722
                    + ", domain:"
723
                    + request.domain
724
                    + " token:"
725
                    + Arrays.toString(request.getNextPageToken()));
×
726
            throw new Error(
×
727
                "GetWorkflowExecutionHistory return empty history, maybe a bug in server");
728
          }
729
          return current.next();
×
730
        }
731
      };
732
    }
733
  }
734
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc