• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

uber / cadence-java-client / 2490

13 Aug 2024 05:39PM CUT coverage: 61.99% (-0.03%) from 62.021%
2490

push

buildkite

web-flow
Removing fossa as it is migrated to snyk (#919)

12090 of 19503 relevant lines covered (61.99%)

0.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.27
/src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java
1
/*
2
 *  Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3
 *
4
 *  Modifications copyright (C) 2017 Uber Technologies, Inc.
5
 *
6
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not
7
 *  use this file except in compliance with the License. A copy of the License is
8
 *  located at
9
 *
10
 *  http://aws.amazon.com/apache2.0
11
 *
12
 *  or in the "license" file accompanying this file. This file is distributed on
13
 *  an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
 *  express or implied. See the License for the specific language governing
15
 *  permissions and limitations under the License.
16
 */
17

18
package com.uber.cadence.internal.replay;
19

20
import static com.uber.cadence.worker.NonDeterministicWorkflowPolicy.FailWorkflow;
21

22
import com.uber.cadence.EventType;
23
import com.uber.cadence.GetWorkflowExecutionHistoryRequest;
24
import com.uber.cadence.GetWorkflowExecutionHistoryResponse;
25
import com.uber.cadence.History;
26
import com.uber.cadence.HistoryEvent;
27
import com.uber.cadence.PollForDecisionTaskResponse;
28
import com.uber.cadence.QueryResultType;
29
import com.uber.cadence.TimerFiredEventAttributes;
30
import com.uber.cadence.WorkflowExecutionSignaledEventAttributes;
31
import com.uber.cadence.WorkflowExecutionStartedEventAttributes;
32
import com.uber.cadence.WorkflowQuery;
33
import com.uber.cadence.WorkflowQueryResult;
34
import com.uber.cadence.WorkflowType;
35
import com.uber.cadence.common.RetryOptions;
36
import com.uber.cadence.internal.common.OptionsUtils;
37
import com.uber.cadence.internal.common.RpcRetryer;
38
import com.uber.cadence.internal.metrics.MetricsTag;
39
import com.uber.cadence.internal.metrics.MetricsType;
40
import com.uber.cadence.internal.replay.HistoryHelper.DecisionEvents;
41
import com.uber.cadence.internal.replay.HistoryHelper.DecisionEventsIterator;
42
import com.uber.cadence.internal.worker.DecisionTaskWithHistoryIterator;
43
import com.uber.cadence.internal.worker.LocalActivityWorker;
44
import com.uber.cadence.internal.worker.SingleWorkerOptions;
45
import com.uber.cadence.internal.worker.WorkflowExecutionException;
46
import com.uber.cadence.serviceclient.IWorkflowService;
47
import com.uber.cadence.workflow.Functions;
48
import com.uber.m3.tally.Scope;
49
import com.uber.m3.tally.Stopwatch;
50
import com.uber.m3.util.ImmutableMap;
51
import java.time.Duration;
52
import java.util.ArrayList;
53
import java.util.Arrays;
54
import java.util.Iterator;
55
import java.util.List;
56
import java.util.Map;
57
import java.util.Objects;
58
import java.util.concurrent.CancellationException;
59
import java.util.concurrent.TimeUnit;
60
import java.util.concurrent.atomic.AtomicReference;
61
import java.util.concurrent.locks.Lock;
62
import java.util.concurrent.locks.ReentrantLock;
63
import java.util.function.BiFunction;
64
import java.util.function.Consumer;
65
import java.util.stream.Collectors;
66
import org.apache.thrift.TException;
67
import org.slf4j.Logger;
68
import org.slf4j.LoggerFactory;
69

70
/**
71
 * Implements decider that relies on replay of a workflow code. An instance of this class is created
72
 * per decision.
73
 */
74
class ReplayDecider implements Decider {
1✔
75

76
  private static final Logger log = LoggerFactory.getLogger(ReplayDecider.class);
1✔
77

78
  private static final int MAXIMUM_PAGE_SIZE = 10000;
79

80
  private final DecisionsHelper decisionsHelper;
81
  private final DecisionContextImpl context;
82
  private final IWorkflowService service;
83
  private final ReplayWorkflow workflow;
84
  private boolean cancelRequested;
85
  private boolean completed;
86
  private WorkflowExecutionException failure;
87
  private long wakeUpTime;
88
  private Consumer<Exception> timerCancellationHandler;
89
  private final Scope metricsScope;
90
  private final long wfStartTimeNanos;
91
  private final WorkflowExecutionStartedEventAttributes startedEvent;
92
  private final Lock lock = new ReentrantLock();
1✔
93
  private final Consumer<HistoryEvent> localActivityCompletionSink;
94

95
  ReplayDecider(
96
      IWorkflowService service,
97
      String domain,
98
      WorkflowType workflowType,
99
      ReplayWorkflow workflow,
100
      DecisionsHelper decisionsHelper,
101
      SingleWorkerOptions options,
102
      BiFunction<LocalActivityWorker.Task, Duration, Boolean> laTaskPoller) {
1✔
103
    this.service = service;
1✔
104
    this.workflow = workflow;
1✔
105
    this.decisionsHelper = decisionsHelper;
1✔
106
    this.metricsScope =
1✔
107
        options
108
            .getMetricsScope()
1✔
109
            .tagged(ImmutableMap.of(MetricsTag.WORKFLOW_TYPE, workflowType.getName()));
1✔
110
    PollForDecisionTaskResponse decisionTask = decisionsHelper.getTask();
1✔
111

112
    startedEvent =
1✔
113
        decisionTask.getHistory().getEvents().get(0).getWorkflowExecutionStartedEventAttributes();
1✔
114
    if (startedEvent == null) {
1✔
115
      throw new IllegalArgumentException(
1✔
116
          "First event in the history is not WorkflowExecutionStarted");
117
    }
118
    wfStartTimeNanos = decisionTask.getHistory().getEvents().get(0).getTimestamp();
1✔
119

120
    context =
1✔
121
        new DecisionContextImpl(
122
            decisionsHelper, domain, decisionTask, startedEvent, options, laTaskPoller, this);
123
    localActivityCompletionSink =
1✔
124
        historyEvent -> {
125
          lock.lock();
1✔
126
          try {
127
            processEvent(historyEvent);
1✔
128
          } finally {
129
            lock.unlock();
1✔
130
          }
131
        };
1✔
132
  }
1✔
133

134
  Lock getLock() {
135
    return lock;
1✔
136
  }
137

138
  private void handleWorkflowExecutionStarted(HistoryEvent event) {
139
    workflow.start(event, context);
1✔
140
  }
1✔
141

142
  private void processEvent(HistoryEvent event) {
143
    EventType eventType = event.getEventType();
1✔
144
    switch (eventType) {
1✔
145
      case ActivityTaskCanceled:
146
        context.handleActivityTaskCanceled(event);
1✔
147
        break;
1✔
148
      case ActivityTaskCompleted:
149
        context.handleActivityTaskCompleted(event);
1✔
150
        break;
1✔
151
      case ActivityTaskFailed:
152
        context.handleActivityTaskFailed(event);
1✔
153
        break;
1✔
154
      case ActivityTaskStarted:
155
        decisionsHelper.handleActivityTaskStarted(event);
1✔
156
        break;
1✔
157
      case ActivityTaskTimedOut:
158
        context.handleActivityTaskTimedOut(event);
1✔
159
        break;
1✔
160
      case ExternalWorkflowExecutionCancelRequested:
161
        context.handleChildWorkflowExecutionCancelRequested(event);
×
162
        decisionsHelper.handleExternalWorkflowExecutionCancelRequested(event);
×
163
        break;
×
164
      case ChildWorkflowExecutionCanceled:
165
        context.handleChildWorkflowExecutionCanceled(event);
1✔
166
        break;
1✔
167
      case ChildWorkflowExecutionCompleted:
168
        context.handleChildWorkflowExecutionCompleted(event);
1✔
169
        break;
1✔
170
      case ChildWorkflowExecutionFailed:
171
        context.handleChildWorkflowExecutionFailed(event);
1✔
172
        break;
1✔
173
      case ChildWorkflowExecutionStarted:
174
        context.handleChildWorkflowExecutionStarted(event);
1✔
175
        break;
1✔
176
      case ChildWorkflowExecutionTerminated:
177
        context.handleChildWorkflowExecutionTerminated(event);
×
178
        break;
×
179
      case ChildWorkflowExecutionTimedOut:
180
        context.handleChildWorkflowExecutionTimedOut(event);
1✔
181
        break;
1✔
182
      case DecisionTaskCompleted:
183
        // NOOP
184
        break;
×
185
      case DecisionTaskScheduled:
186
        // NOOP
187
        break;
1✔
188
      case DecisionTaskStarted:
189
        throw new IllegalArgumentException("not expected");
×
190
      case DecisionTaskTimedOut:
191
        // Handled in the processEvent(event)
192
        break;
1✔
193
      case ExternalWorkflowExecutionSignaled:
194
        context.handleExternalWorkflowExecutionSignaled(event);
1✔
195
        break;
1✔
196
      case StartChildWorkflowExecutionFailed:
197
        context.handleStartChildWorkflowExecutionFailed(event);
1✔
198
        break;
1✔
199
      case TimerFired:
200
        handleTimerFired(event);
1✔
201
        break;
1✔
202
      case WorkflowExecutionCancelRequested:
203
        handleWorkflowExecutionCancelRequested(event);
1✔
204
        break;
1✔
205
      case WorkflowExecutionSignaled:
206
        handleWorkflowExecutionSignaled(event);
1✔
207
        break;
1✔
208
      case WorkflowExecutionStarted:
209
        handleWorkflowExecutionStarted(event);
1✔
210
        break;
1✔
211
      case WorkflowExecutionTerminated:
212
        // NOOP
213
        break;
×
214
      case WorkflowExecutionTimedOut:
215
        // NOOP
216
        break;
×
217
      case ActivityTaskScheduled:
218
        decisionsHelper.handleActivityTaskScheduled(event);
1✔
219
        break;
1✔
220
      case ActivityTaskCancelRequested:
221
        decisionsHelper.handleActivityTaskCancelRequested(event);
1✔
222
        break;
1✔
223
      case RequestCancelActivityTaskFailed:
224
        decisionsHelper.handleRequestCancelActivityTaskFailed(event);
×
225
        break;
×
226
      case MarkerRecorded:
227
        context.handleMarkerRecorded(event);
1✔
228
        break;
1✔
229
      case WorkflowExecutionCompleted:
230
        break;
1✔
231
      case WorkflowExecutionFailed:
232
        break;
1✔
233
      case WorkflowExecutionCanceled:
234
        break;
×
235
      case WorkflowExecutionContinuedAsNew:
236
        break;
×
237
      case TimerStarted:
238
        decisionsHelper.handleTimerStarted(event);
1✔
239
        break;
1✔
240
      case TimerCanceled:
241
        context.handleTimerCanceled(event);
1✔
242
        break;
1✔
243
      case SignalExternalWorkflowExecutionInitiated:
244
        decisionsHelper.handleSignalExternalWorkflowExecutionInitiated(event);
1✔
245
        break;
1✔
246
      case SignalExternalWorkflowExecutionFailed:
247
        context.handleSignalExternalWorkflowExecutionFailed(event);
1✔
248
        break;
1✔
249
      case RequestCancelExternalWorkflowExecutionInitiated:
250
        decisionsHelper.handleRequestCancelExternalWorkflowExecutionInitiated(event);
×
251
        break;
×
252
      case RequestCancelExternalWorkflowExecutionFailed:
253
        decisionsHelper.handleRequestCancelExternalWorkflowExecutionFailed(event);
×
254
        break;
×
255
      case StartChildWorkflowExecutionInitiated:
256
        decisionsHelper.handleStartChildWorkflowExecutionInitiated(event);
1✔
257
        break;
1✔
258
      case CancelTimerFailed:
259
        decisionsHelper.handleCancelTimerFailed(event);
×
260
        break;
×
261
      case DecisionTaskFailed:
262
        context.handleDecisionTaskFailed(event);
1✔
263
        break;
1✔
264
      case UpsertWorkflowSearchAttributes:
265
        context.handleUpsertSearchAttributes(event);
1✔
266
        break;
267
    }
268
  }
1✔
269

270
  private void eventLoop() {
271
    if (completed) {
1✔
272
      return;
×
273
    }
274
    try {
275
      completed = workflow.eventLoop();
1✔
276
    } catch (Error e) {
1✔
277
      throw e;
1✔
278
    } catch (WorkflowExecutionException e) {
1✔
279
      failure = e;
1✔
280
      completed = true;
1✔
281
    } catch (CancellationException e) {
1✔
282
      if (!cancelRequested) {
1✔
283
        failure = workflow.mapUnexpectedException(e);
1✔
284
      }
285
      completed = true;
1✔
286
    } catch (Throwable e) {
1✔
287
      // can cast as Error is caught above.
288
      failure = workflow.mapUnexpectedException((Exception) e);
1✔
289
      completed = true;
1✔
290
    }
1✔
291
  }
1✔
292

293
  private void mayBeCompleteWorkflow() {
294
    if (completed) {
1✔
295
      completeWorkflow();
1✔
296
    } else {
297
      updateTimers();
1✔
298
    }
299
  }
1✔
300

301
  private void completeWorkflow() {
302
    if (failure != null) {
1✔
303
      decisionsHelper.failWorkflowExecution(failure);
1✔
304
      metricsScope.counter(MetricsType.WORKFLOW_FAILED_COUNTER).inc(1);
1✔
305
    } else if (cancelRequested) {
1✔
306
      decisionsHelper.cancelWorkflowExecution();
1✔
307
      metricsScope.counter(MetricsType.WORKFLOW_CANCELLED_COUNTER).inc(1);
1✔
308
    } else {
309
      ContinueAsNewWorkflowExecutionParameters continueAsNewOnCompletion =
1✔
310
          context.getContinueAsNewOnCompletion();
1✔
311
      if (continueAsNewOnCompletion != null) {
1✔
312
        decisionsHelper.continueAsNewWorkflowExecution(continueAsNewOnCompletion);
1✔
313
        metricsScope.counter(MetricsType.WORKFLOW_CONTINUE_AS_NEW_COUNTER).inc(1);
1✔
314
      } else {
315
        byte[] workflowOutput = workflow.getOutput();
1✔
316
        decisionsHelper.completeWorkflowExecution(workflowOutput);
1✔
317
        metricsScope.counter(MetricsType.WORKFLOW_COMPLETED_COUNTER).inc(1);
1✔
318
      }
319
    }
320

321
    long nanoTime = TimeUnit.NANOSECONDS.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
1✔
322
    com.uber.m3.util.Duration d = com.uber.m3.util.Duration.ofNanos(nanoTime - wfStartTimeNanos);
1✔
323
    metricsScope.timer(MetricsType.WORKFLOW_E2E_LATENCY).record(d);
1✔
324
  }
1✔
325

326
  private void updateTimers() {
327
    long nextWakeUpTime = workflow.getNextWakeUpTime();
1✔
328
    if (nextWakeUpTime == 0) {
1✔
329
      if (timerCancellationHandler != null) {
1✔
330
        timerCancellationHandler.accept(null);
1✔
331
        timerCancellationHandler = null;
1✔
332
      }
333
      wakeUpTime = nextWakeUpTime;
1✔
334
      return;
1✔
335
    }
336
    if (wakeUpTime == nextWakeUpTime && timerCancellationHandler != null) {
1✔
337
      return; // existing timer
1✔
338
    }
339
    long delayMilliseconds = nextWakeUpTime - context.currentTimeMillis();
1✔
340
    if (delayMilliseconds < 0) {
1✔
341
      throw new IllegalStateException("Negative delayMilliseconds=" + delayMilliseconds);
×
342
    }
343
    // Round up to the nearest second as we don't want to deliver a timer
344
    // earlier than requested.
345
    long delaySeconds =
1✔
346
        OptionsUtils.roundUpToSeconds(Duration.ofMillis(delayMilliseconds)).getSeconds();
1✔
347
    if (timerCancellationHandler != null) {
1✔
348
      timerCancellationHandler.accept(null);
1✔
349
      timerCancellationHandler = null;
1✔
350
    }
351
    wakeUpTime = nextWakeUpTime;
1✔
352
    timerCancellationHandler =
1✔
353
        context.createTimer(
1✔
354
            delaySeconds,
355
            (t) -> {
356
              // Intentionally left empty.
357
              // Timer ensures that decision is scheduled at the time workflow can make progress.
358
              // But no specific timer related action is necessary as Workflow.sleep is just a
359
              // Workflow.await with a time based condition.
360
            });
1✔
361
  }
1✔
362

363
  private void handleWorkflowExecutionCancelRequested(HistoryEvent event) {
364
    context.setCancelRequested(true);
1✔
365
    String cause = event.getWorkflowExecutionCancelRequestedEventAttributes().getCause();
1✔
366
    workflow.cancel(cause);
1✔
367
    cancelRequested = true;
1✔
368
  }
1✔
369

370
  private void handleTimerFired(HistoryEvent event) {
371
    TimerFiredEventAttributes attributes = event.getTimerFiredEventAttributes();
1✔
372
    String timerId = attributes.getTimerId();
1✔
373
    if (timerId.equals(DecisionsHelper.FORCE_IMMEDIATE_DECISION_TIMER)) {
1✔
374
      return;
×
375
    }
376
    context.handleTimerFired(attributes);
1✔
377
  }
1✔
378

379
  private void handleWorkflowExecutionSignaled(HistoryEvent event) {
380
    assert (event.getEventType() == EventType.WorkflowExecutionSignaled);
1✔
381
    final WorkflowExecutionSignaledEventAttributes signalAttributes =
1✔
382
        event.getWorkflowExecutionSignaledEventAttributes();
1✔
383
    if (completed) {
1✔
384
      throw new IllegalStateException("Signal received after workflow is closed.");
×
385
    }
386
    this.workflow.handleSignal(
1✔
387
        signalAttributes.getSignalName(), signalAttributes.getInput(), event.getEventId());
1✔
388
  }
1✔
389

390
  @Override
391
  public DecisionResult decide(PollForDecisionTaskResponse decisionTask) throws Throwable {
392
    lock.lock();
1✔
393
    try {
394
      AtomicReference<Map<String, WorkflowQueryResult>> queryResults = new AtomicReference<>();
1✔
395
      boolean forceCreateNewDecisionTask =
1✔
396
          decideImpl(
1✔
397
              decisionTask, () -> queryResults.set(getQueryResults(decisionTask.getQueries())));
1✔
398
      return new DecisionResult(
1✔
399
          decisionsHelper.getDecisions(), queryResults.get(), forceCreateNewDecisionTask);
1✔
400
    } finally {
401
      lock.unlock();
1✔
402
    }
403
  }
404

405
  private Map<String, WorkflowQueryResult> getQueryResults(Map<String, WorkflowQuery> queries) {
406
    if (queries == null) {
1✔
407
      return null;
×
408
    }
409

410
    return queries
1✔
411
        .entrySet()
1✔
412
        .stream()
1✔
413
        .collect(Collectors.toMap(q -> q.getKey(), q -> queryWorkflow(q.getValue())));
1✔
414
  }
415

416
  private WorkflowQueryResult queryWorkflow(WorkflowQuery query) {
417
    try {
418
      return new WorkflowQueryResult()
×
419
          .setResultType(QueryResultType.ANSWERED)
×
420
          .setAnswer(workflow.query(query));
×
421
    } catch (Throwable e) {
×
422
      return new WorkflowQueryResult()
×
423
          .setResultType(QueryResultType.FAILED)
×
424
          .setErrorMessage(e.getMessage());
×
425
    }
426
  }
427

428
  // Returns boolean to indicate whether we need to force create new decision task for local
429
  // activity heartbeating.
430
  private boolean decideImpl(PollForDecisionTaskResponse decisionTask, Functions.Proc query)
431
      throws Throwable {
432
    boolean forceCreateNewDecisionTask = false;
1✔
433
    try {
434
      long startTime = System.currentTimeMillis();
1✔
435
      DecisionTaskWithHistoryIterator decisionTaskWithHistoryIterator =
1✔
436
          new DecisionTaskWithHistoryIteratorImpl(
437
              decisionTask, Duration.ofSeconds(startedEvent.getTaskStartToCloseTimeoutSeconds()));
1✔
438
      HistoryHelper historyHelper =
1✔
439
          new HistoryHelper(
440
              decisionTaskWithHistoryIterator, context.getReplayCurrentTimeMilliseconds());
1✔
441
      DecisionEventsIterator iterator = historyHelper.getIterator();
1✔
442
      if ((decisionsHelper.getNextDecisionEventId()
1✔
443
              != historyHelper.getPreviousStartedEventId()
1✔
444
                  + 2) // getNextDecisionEventId() skips over completed.
445
          && (decisionsHelper.getNextDecisionEventId() != 0
1✔
446
              && historyHelper.getPreviousStartedEventId() != 0)
1✔
447
          && (decisionTask.getHistory().getEventsSize() > 0)) {
×
448
        throw new IllegalStateException(
×
449
            String.format(
×
450
                "ReplayDecider expects next event id at %d. History's previous started event id is %d",
451
                decisionsHelper.getNextDecisionEventId(),
×
452
                historyHelper.getPreviousStartedEventId()));
×
453
      }
454

455
      while (iterator.hasNext()) {
1✔
456
        DecisionEvents decision = iterator.next();
1✔
457
        context.setReplaying(decision.isReplay());
1✔
458
        context.setReplayCurrentTimeMilliseconds(decision.getReplayCurrentTimeMilliseconds());
1✔
459

460
        decisionsHelper.handleDecisionTaskStartedEvent(decision);
1✔
461
        // Markers must be cached first as their data is needed when processing events.
462
        for (HistoryEvent event : decision.getMarkers()) {
1✔
463
          if (!event
1✔
464
              .getMarkerRecordedEventAttributes()
1✔
465
              .getMarkerName()
1✔
466
              .equals(ClockDecisionContext.LOCAL_ACTIVITY_MARKER_NAME)) {
1✔
467
            processEvent(event);
1✔
468
          }
469
        }
1✔
470

471
        for (HistoryEvent event : decision.getEvents()) {
1✔
472
          processEvent(event);
1✔
473
        }
1✔
474

475
        forceCreateNewDecisionTask =
1✔
476
            processEventLoop(
1✔
477
                startTime,
478
                startedEvent.getTaskStartToCloseTimeoutSeconds(),
1✔
479
                decision,
480
                decisionTask.getQuery() != null);
1✔
481

482
        mayBeCompleteWorkflow();
1✔
483
        if (decision.isReplay()) {
1✔
484
          decisionsHelper.notifyDecisionSent();
1✔
485
        }
486
        // Updates state machines with results of the previous decisions
487
        for (HistoryEvent event : decision.getDecisionEvents()) {
1✔
488
          processEvent(event);
1✔
489
        }
1✔
490
        // Reset state to before running the event loop
491
        decisionsHelper.handleDecisionTaskStartedEvent(decision);
1✔
492
      }
1✔
493
      if (forceCreateNewDecisionTask) {
1✔
494
        metricsScope.counter(MetricsType.DECISION_TASK_FORCE_COMPLETED).inc(1);
1✔
495
      }
496
      return forceCreateNewDecisionTask;
1✔
497
    } catch (Error e) {
1✔
498
      if (this.workflow.getWorkflowImplementationOptions().getNonDeterministicWorkflowPolicy()
1✔
499
          == FailWorkflow) {
500
        // fail workflow
501
        failure = workflow.mapError(e);
1✔
502
        completed = true;
1✔
503
        completeWorkflow();
1✔
504
        return false;
1✔
505
      } else {
506
        metricsScope.counter(MetricsType.DECISION_TASK_ERROR_COUNTER).inc(1);
1✔
507
        // fail decision, not a workflow
508
        throw e;
1✔
509
      }
510
    } finally {
511
      if (query != null) {
1✔
512
        query.apply();
1✔
513
      }
514
      if (completed) {
1✔
515
        close();
1✔
516
      }
517
    }
518
  }
519

520
  private boolean processEventLoop(
521
      long startTime, int decisionTimeoutSecs, DecisionEvents decision, boolean isQuery)
522
      throws Throwable {
523
    eventLoop();
1✔
524

525
    if (decision.isReplay() || isQuery) {
1✔
526
      return replayLocalActivities(decision);
1✔
527
    } else {
528
      return executeLocalActivities(startTime, decisionTimeoutSecs);
1✔
529
    }
530
  }
531

532
  private boolean replayLocalActivities(DecisionEvents decision) throws Throwable {
533
    List<HistoryEvent> localActivityMarkers = new ArrayList<>();
1✔
534
    for (HistoryEvent event : decision.getMarkers()) {
1✔
535
      if (event
1✔
536
          .getMarkerRecordedEventAttributes()
1✔
537
          .getMarkerName()
1✔
538
          .equals(ClockDecisionContext.LOCAL_ACTIVITY_MARKER_NAME)) {
1✔
539
        localActivityMarkers.add(event);
1✔
540
      }
541
    }
1✔
542

543
    if (localActivityMarkers.isEmpty()) {
1✔
544
      return false;
1✔
545
    }
546

547
    int processed = 0;
1✔
548
    while (context.numPendingLaTasks() > 0) {
1✔
549
      int numTasks = context.numPendingLaTasks();
1✔
550
      for (HistoryEvent event : localActivityMarkers) {
1✔
551
        processEvent(event);
1✔
552
      }
1✔
553

554
      eventLoop();
1✔
555

556
      processed += numTasks;
1✔
557
      if (processed == localActivityMarkers.size()) {
1✔
558
        return false;
1✔
559
      }
560
    }
1✔
561
    return false;
1✔
562
  }
563

564
  // Return whether we would need a new decision task immediately.
565
  private boolean executeLocalActivities(long startTime, int decisionTimeoutSecs) {
566
    Duration maxProcessingTime = Duration.ofSeconds((long) (0.8 * decisionTimeoutSecs));
1✔
567

568
    while (context.numPendingLaTasks() > 0) {
1✔
569
      Duration processingTime = Duration.ofMillis(System.currentTimeMillis() - startTime);
1✔
570
      Duration maxWaitAllowed = maxProcessingTime.minus(processingTime);
1✔
571

572
      boolean started = context.startUnstartedLaTasks(maxWaitAllowed);
1✔
573
      if (!started) {
1✔
574
        // We were not able to send the current batch of la tasks before deadline.
575
        // Return true to indicate that we need a new decision task immediately.
576
        return true;
×
577
      }
578

579
      try {
580
        context.awaitTaskCompletion(maxWaitAllowed);
1✔
581
      } catch (InterruptedException e) {
×
582
        return true;
×
583
      }
1✔
584

585
      eventLoop();
1✔
586

587
      if (context.numPendingLaTasks() == 0) {
1✔
588
        return false;
1✔
589
      }
590

591
      // Break local activity processing loop if we almost reach decision task timeout.
592
      processingTime = Duration.ofMillis(System.currentTimeMillis() - startTime);
1✔
593
      if (processingTime.compareTo(maxProcessingTime) > 0) {
1✔
594
        return true;
1✔
595
      }
596
    }
1✔
597
    return false;
1✔
598
  }
599

600
  int getDecisionTimeoutSeconds() {
601
    return startedEvent.getTaskStartToCloseTimeoutSeconds();
1✔
602
  }
603

604
  @Override
605
  public void close() {
606
    lock.lock();
1✔
607
    try {
608
      workflow.close();
1✔
609
    } finally {
610
      lock.unlock();
1✔
611
    }
612
  }
1✔
613

614
  @Override
615
  public byte[] query(PollForDecisionTaskResponse response, WorkflowQuery query) throws Throwable {
616
    lock.lock();
1✔
617
    try {
618
      AtomicReference<byte[]> result = new AtomicReference<>();
1✔
619
      decideImpl(response, () -> result.set(workflow.query(query)));
1✔
620
      return result.get();
1✔
621
    } finally {
622
      lock.unlock();
1✔
623
    }
624
  }
625

626
  public Consumer<HistoryEvent> getLocalActivityCompletionSink() {
627
    return localActivityCompletionSink;
1✔
628
  }
629

630
  private class DecisionTaskWithHistoryIteratorImpl implements DecisionTaskWithHistoryIterator {
631

632
    private final Duration retryServiceOperationInitialInterval = Duration.ofMillis(200);
1✔
633
    private final Duration retryServiceOperationMaxInterval = Duration.ofSeconds(4);
1✔
634
    private final Duration paginationStart = Duration.ofMillis(System.currentTimeMillis());
1✔
635
    private Duration decisionTaskStartToCloseTimeout;
636

637
    private final Duration decisionTaskRemainingTime() {
638
      Duration passed = Duration.ofMillis(System.currentTimeMillis()).minus(paginationStart);
×
639
      return decisionTaskStartToCloseTimeout.minus(passed);
×
640
    }
641

642
    private final PollForDecisionTaskResponse task;
643
    private Iterator<HistoryEvent> current;
644
    private byte[] nextPageToken;
645

646
    DecisionTaskWithHistoryIteratorImpl(
647
        PollForDecisionTaskResponse task, Duration decisionTaskStartToCloseTimeout) {
1✔
648
      this.task = Objects.requireNonNull(task);
1✔
649
      this.decisionTaskStartToCloseTimeout =
1✔
650
          Objects.requireNonNull(decisionTaskStartToCloseTimeout);
1✔
651

652
      History history = task.getHistory();
1✔
653
      current = history.getEventsIterator();
1✔
654
      nextPageToken = task.getNextPageToken();
1✔
655
    }
1✔
656

657
    @Override
658
    public PollForDecisionTaskResponse getDecisionTask() {
659
      lock.lock();
1✔
660
      try {
661
        return task;
1✔
662
      } finally {
663
        lock.unlock();
1✔
664
      }
665
    }
666

667
    @Override
668
    public Iterator<HistoryEvent> getHistory() {
669
      return new Iterator<HistoryEvent>() {
1✔
670
        @Override
671
        public boolean hasNext() {
672
          return current.hasNext() || nextPageToken != null;
1✔
673
        }
674

675
        @Override
676
        public HistoryEvent next() {
677
          if (current.hasNext()) {
1✔
678
            return current.next();
1✔
679
          }
680

681
          Duration decisionTaskRemainingTime = decisionTaskRemainingTime();
×
682
          if (decisionTaskRemainingTime.isNegative() || decisionTaskRemainingTime.isZero()) {
×
683
            throw new Error(
×
684
                "Decision task timed out while querying history. If this happens consistently please consider "
685
                    + "increase decision task timeout or reduce history size.");
686
          }
687

688
          metricsScope.counter(MetricsType.WORKFLOW_GET_HISTORY_COUNTER).inc(1);
×
689
          Stopwatch sw = metricsScope.timer(MetricsType.WORKFLOW_GET_HISTORY_LATENCY).start();
×
690
          RetryOptions retryOptions =
×
691
              new RetryOptions.Builder()
692
                  .setExpiration(decisionTaskRemainingTime)
×
693
                  .setInitialInterval(retryServiceOperationInitialInterval)
×
694
                  .setMaximumInterval(retryServiceOperationMaxInterval)
×
695
                  .build();
×
696

697
          GetWorkflowExecutionHistoryRequest request = new GetWorkflowExecutionHistoryRequest();
×
698
          request
×
699
              .setDomain(context.getDomain())
×
700
              .setExecution(task.getWorkflowExecution())
×
701
              .setMaximumPageSize(MAXIMUM_PAGE_SIZE)
×
702
              .setNextPageToken(nextPageToken);
×
703

704
          try {
705
            GetWorkflowExecutionHistoryResponse r =
×
706
                RpcRetryer.retryWithResult(
×
707
                    retryOptions, () -> service.GetWorkflowExecutionHistory(request));
×
708
            current = r.getHistory().getEventsIterator();
×
709
            nextPageToken = r.getNextPageToken();
×
710
            metricsScope.counter(MetricsType.WORKFLOW_GET_HISTORY_SUCCEED_COUNTER).inc(1);
×
711
            sw.stop();
×
712
          } catch (TException e) {
×
713
            metricsScope.counter(MetricsType.WORKFLOW_GET_HISTORY_FAILED_COUNTER).inc(1);
×
714
            throw new Error(e);
×
715
          }
×
716
          if (!current.hasNext()) {
×
717
            log.error(
×
718
                "GetWorkflowExecutionHistory returns an empty history, maybe a bug in server, workflowID:"
719
                    + request.execution.workflowId
720
                    + ", runID:"
721
                    + request.execution.runId
722
                    + ", domain:"
723
                    + request.domain
724
                    + " token:"
725
                    + Arrays.toString(request.getNextPageToken()));
×
726
            throw new Error(
×
727
                "GetWorkflowExecutionHistory return empty history, maybe a bug in server");
728
          }
729
          return current.next();
×
730
        }
731
      };
732
    }
733
  }
734
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc