• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #103

pending completion
#103

push

github-actions

web-flow
Implement retry of local activities for over local retry threshold duration (#1542)

Issue #1261

244 of 244 new or added lines in 16 files covered. (100.0%)

16122 of 19841 relevant lines covered (81.26%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.15
/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.worker;
22

23
import static io.temporal.internal.worker.LocalActivityResult.failed;
24

25
import com.google.common.base.Preconditions;
26
import com.uber.m3.tally.Scope;
27
import com.uber.m3.tally.Stopwatch;
28
import com.uber.m3.util.ImmutableMap;
29
import io.grpc.Deadline;
30
import io.temporal.api.enums.v1.RetryState;
31
import io.temporal.api.enums.v1.TimeoutType;
32
import io.temporal.api.failure.v1.Failure;
33
import io.temporal.api.failure.v1.TimeoutFailureInfo;
34
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
35
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponseOrBuilder;
36
import io.temporal.common.RetryOptions;
37
import io.temporal.failure.ApplicationFailure;
38
import io.temporal.failure.FailureConverter;
39
import io.temporal.internal.common.ProtobufTimeUtils;
40
import io.temporal.internal.common.RetryOptionsUtils;
41
import io.temporal.internal.logging.LoggerTag;
42
import io.temporal.internal.statemachines.ExecuteLocalActivityParameters;
43
import io.temporal.serviceclient.MetricsTag;
44
import io.temporal.worker.MetricsType;
45
import io.temporal.worker.WorkerMetricsTag;
46
import io.temporal.workflow.Functions;
47
import java.time.Duration;
48
import java.util.Objects;
49
import java.util.concurrent.*;
50
import javax.annotation.Nonnull;
51
import javax.annotation.Nullable;
52
import org.slf4j.Logger;
53
import org.slf4j.LoggerFactory;
54
import org.slf4j.MDC;
55

56
final class LocalActivityWorker implements SuspendableWorker {
57
  private static final Logger log = LoggerFactory.getLogger(LocalActivityWorker.class);
1✔
58

59
  // RETRY_STATE_IN_PROGRESS shows that, from the workflow point of view,
60
  // this local activity execution has not ended yet.
61
  // It does not conflict with any other outcome
62
  // and uniquely identifies that the local retry threshold was reached
63
  // and that a timer needs to be scheduled for the next attempt.
64
  private static final RetryState LOCAL_RETRY_LIMIT_RETRY_STATE =
1✔
65
      RetryState.RETRY_STATE_IN_PROGRESS;
66

67
  private final ActivityTaskHandler handler;
68
  private final String namespace;
69
  private final String taskQueue;
70
  private final ScheduledExecutorService scheduledExecutor;
71

72
  private final SingleWorkerOptions options;
73

74
  private static final int QUEUE_SIZE = 1000;
75
  private final BlockingQueue<LocalActivityAttemptTask> pendingTasks =
1✔
76
      new ArrayBlockingQueue<>(QUEUE_SIZE);
77
  private final LocalActivityPollTask laPollTask;
78
  private final LocalActivityDispatcherImpl laScheduler;
79

80
  private final PollerOptions pollerOptions;
81
  private final Scope workerMetricsScope;
82

83
  @Nonnull private SuspendableWorker poller = new NoopSuspendableWorker();
1✔
84

85
  public LocalActivityWorker(
86
      @Nonnull String namespace,
87
      @Nonnull String taskQueue,
88
      @Nonnull SingleWorkerOptions options,
89
      @Nonnull ActivityTaskHandler handler) {
1✔
90
    this.namespace = Objects.requireNonNull(namespace);
1✔
91
    this.taskQueue = Objects.requireNonNull(taskQueue);
1✔
92
    this.scheduledExecutor =
1✔
93
        Executors.newSingleThreadScheduledExecutor(
1✔
94
            r -> {
95
              Thread thread = new Thread(r);
1✔
96
              thread.setName(
1✔
97
                  WorkerThreadsNameHelper.getLocalActivitySchedulerThreadPrefix(
1✔
98
                      namespace, taskQueue));
99
              return thread;
1✔
100
            });
101
    this.handler = handler;
1✔
102
    this.laPollTask = new LocalActivityPollTask(pendingTasks);
1✔
103
    this.laScheduler = new LocalActivityDispatcherImpl();
1✔
104
    this.options = Objects.requireNonNull(options);
1✔
105
    this.pollerOptions = getPollerOptions(options);
1✔
106
    this.workerMetricsScope =
1✔
107
        MetricsTag.tagged(
1✔
108
            options.getMetricsScope(), WorkerMetricsTag.WorkerType.LOCAL_ACTIVITY_WORKER);
1✔
109
  }
1✔
110

111
  /**
   * Creates a {@link Poller} wired with {@code laPollTask} and a {@link PollTaskExecutor} that
   * uses {@link TaskHandlerImpl}, starts it, and increments the worker start counter. Does nothing
   * when {@code handler.isAnyTypeSupported()} is false; {@code poller} then stays the {@link
   * NoopSuspendableWorker} assigned at field initialization.
   */
  @Override
112
  public void start() {
113
    if (handler.isAnyTypeSupported()) {
1✔
114
      PollTaskExecutor<LocalActivityAttemptTask> pollTaskExecutor =
1✔
115
          new PollTaskExecutor<>(
116
              namespace,
117
              taskQueue,
118
              options.getIdentity(),
1✔
119
              new TaskHandlerImpl(handler),
120
              pollerOptions,
121
              options.getTaskExecutorThreadPoolSize(),
1✔
122
              workerMetricsScope);
123
      poller =
1✔
124
          new Poller<>(
125
              options.getIdentity(),
1✔
126
              laPollTask,
127
              pollTaskExecutor,
128
              pollerOptions,
129
              workerMetricsScope);
130
      poller.start();
1✔
131
      workerMetricsScope.counter(MetricsType.WORKER_START_COUNTER).inc(1);
1✔
132
    }
133
  }
1✔
134

135
  /**
   * Records the current time as the attempt's scheduled time and offers the attempt to {@code
   * pendingTasks}. If the execution context has a scheduleToStartTimeout, the offer waits up to
   * that long for the queue to accept the task; otherwise the task is offered without waiting.
   *
   * @param task attempt to queue for execution
   * @return true if the task was queued; false if the queue did not accept it or the calling
   *     thread was interrupted while waiting (the interrupt flag is restored in that case)
   */
  private boolean submitAnAttempt(LocalActivityAttemptTask task) {
136
    try {
137
      @Nullable
138
      Duration scheduleToStartTimeout = task.getExecutionContext().getScheduleToStartTimeout();
1✔
139
      task.getAttemptTask().setCurrentAttemptScheduledTime(ProtobufTimeUtils.getCurrentProtoTime());
1✔
140
      boolean accepted =
141
          scheduleToStartTimeout != null
1✔
142
              ? pendingTasks.offer(task, scheduleToStartTimeout.toMillis(), TimeUnit.MILLISECONDS)
1✔
143
              : pendingTasks.offer(task);
1✔
144
      if (accepted) {
1✔
145
        log.trace("LocalActivity queued: {}", task.getActivityId());
1✔
146
      } else {
147
        log.trace(
×
148
            "LocalActivity queue submitting timed out for activity {}, scheduleToStartTimeout: {}",
149
            task.getActivityId(),
×
150
            scheduleToStartTimeout);
151
      }
152
      return accepted;
1✔
153
    } catch (InterruptedException e) {
×
154
      Thread.currentThread().interrupt();
×
155
      return false;
×
156
    }
157
  }
158

159
  /**
   * Decides what to do after an attempt failed or timed out. The checks are applied in this order:
   * non-retryable {@link ApplicationFailure}, {@link Error} (rethrown), retry policy not set,
   * throwable reported as not retryable by the retry options, maximum attempts reached,
   * schedule-to-close deadline check given the backoff, and finally whether the backoff exceeds
   * the local retry threshold. If none applies, the next attempt is to be made locally after the
   * backoff.
   *
160
   * @param executionContext execution context of the activity
161
   * @param activityTask activity task
162
   * @param attemptThrowable exception happened during the activity attempt. Can be null.
163
   * @return decision to retry or not with a retry state, backoff or delay to the next attempt if
164
   *     applicable
   * @throws Error if {@code attemptThrowable} is an {@link Error}; it is rethrown as is
165
   */
166
  @Nonnull
167
  private RetryDecision shouldRetry(
168
      LocalActivityExecutionContext executionContext,
169
      PollActivityTaskQueueResponseOrBuilder activityTask,
170
      @Nullable Throwable attemptThrowable) {
171
    int currentAttempt = activityTask.getAttempt();
1✔
172

173
    if (isNonRetryableApplicationFailure(attemptThrowable)) {
1✔
174
      return new RetryDecision(RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE, null);
1✔
175
    }
176

177
    if (attemptThrowable instanceof Error) {
1✔
178
      // TODO Error inside Local Activity shouldn't be failing the local activity call.
179
      //  Instead we should fail Workflow Task. Implement a special flag for that in the result.
180
      //          task.callback(executionFailed(activityHandlerResult,
181
      // RetryState.RETRY_STATE_MAXIMUM_ATTEMPTS_REACHED, currentAttempt));
182
      // don't just swallow Error from activities, propagate it to the top
183
      throw (Error) attemptThrowable;
×
184
    }
185

186
    if (isRetryPolicyNotSet(activityTask)) {
1✔
187
      return new RetryDecision(RetryState.RETRY_STATE_RETRY_POLICY_NOT_SET, null);
×
188
    }
189

190
    RetryOptions retryOptions = RetryOptionsUtils.toRetryOptions(activityTask.getRetryPolicy());
1✔
191

192
    if (RetryOptionsUtils.isNotRetryable(retryOptions, attemptThrowable)) {
1✔
193
      return new RetryDecision(RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE, null);
×
194
    }
195

196
    if (RetryOptionsUtils.areAttemptsReached(retryOptions, currentAttempt)) {
1✔
197
      return new RetryDecision(RetryState.RETRY_STATE_MAXIMUM_ATTEMPTS_REACHED, null);
1✔
198
    }
199

200
    long sleepMillis = retryOptions.calculateSleepTime(currentAttempt);
1✔
201
    Duration sleep = Duration.ofMillis(sleepMillis);
1✔
202
    if (RetryOptionsUtils.isDeadlineReached(
1✔
203
        executionContext.getScheduleToCloseDeadline(), sleepMillis)) {
1✔
204
      return new RetryDecision(RetryState.RETRY_STATE_TIMEOUT, null);
1✔
205
    }
206

207
    // The backoff is longer than the local retry threshold: no next local attempt is made.
    // The decision carries LOCAL_RETRY_LIMIT_RETRY_STATE together with the backoff.
    if (sleep.compareTo(executionContext.getLocalRetryThreshold()) > 0) {
1✔
208
      return new RetryDecision(LOCAL_RETRY_LIMIT_RETRY_STATE, sleep);
1✔
209
    }
210

211
    return new RetryDecision(sleep);
1✔
212
  }
213

214
  /**
   * Builds the task for the next attempt and arranges for a {@link LocalActivityRetryHandler} to
   * run on {@code scheduledExecutor} once {@code backoff} has elapsed.
   *
215
   * @param executionContext execution context of the activity
216
   * @param backoff delay before the next attempt is submitted
217
   * @param failure if supplied, it will be used to override {@link
218
   *     LocalActivityExecutionContext#getLastAttemptFailure()}
219
   */
220
  private void scheduleNextAttempt(
221
      LocalActivityExecutionContext executionContext,
222
      @Nonnull Duration backoff,
223
      @Nullable Failure failure) {
224
    PollActivityTaskQueueResponse.Builder nextActivityTask =
1✔
225
        executionContext.getNextAttemptActivityTask(failure);
1✔
226
    LocalActivityAttemptTask task =
1✔
227
        new LocalActivityAttemptTask(executionContext, nextActivityTask);
228
    Deadline.after(backoff.toMillis(), TimeUnit.MILLISECONDS)
1✔
229
        .runOnExpiration(new LocalActivityRetryHandler(task), scheduledExecutor);
1✔
230
  }
1✔
231

232
  private class LocalActivityDispatcherImpl implements LocalActivityDispatcher {
1✔
233
    /**
     * Starts a local activity execution: builds its execution context and first attempt task and
     * submits the attempt to the pending queue.
     *
     * <p>If a scheduleToClose timeout is set, the deadline is computed from the timeout minus the
     * time already elapsed since {@code params.getOriginalScheduledTimestamp()}. Once the attempt
     * is accepted, a {@link ScheduleToCloseTimeoutHandler} is scheduled for the time remaining
     * until that deadline and its future is stored on the execution context.
     *
     * @param params parameters of the local activity to execute
     * @param resultCallback callback passed to the execution context for reporting the result
     * @return true if the attempt was queued, or if {@code failIfRetryIsNotAllowedByNewPolicy}
     *     already reported a failure through the callback; false if the queue did not accept it
     */
    @Override
234
    public boolean dispatch(
235
        ExecuteLocalActivityParameters params,
236
        Functions.Proc1<LocalActivityResult> resultCallback) {
237
      long passedFromOriginalSchedulingMs =
238
          System.currentTimeMillis() - params.getOriginalScheduledTimestamp();
1✔
239
      Duration scheduleToCloseTimeout = params.getScheduleToCloseTimeout();
1✔
240
      Deadline scheduleToCloseDeadline = null;
1✔
241
      if (scheduleToCloseTimeout != null) {
1✔
242
        scheduleToCloseDeadline =
1✔
243
            Deadline.after(
1✔
244
                scheduleToCloseTimeout.toMillis() - passedFromOriginalSchedulingMs,
1✔
245
                TimeUnit.MILLISECONDS);
246
      }
247

248
      LocalActivityExecutionContext executionContext =
1✔
249
          new LocalActivityExecutionContext(params, resultCallback, scheduleToCloseDeadline);
250

251
      PollActivityTaskQueueResponse.Builder activityTask = executionContext.getInitialTask();
1✔
252
      LocalActivityAttemptTask task = new LocalActivityAttemptTask(executionContext, activityTask);
1✔
253

254
      boolean retryIsNotAllowed = failIfRetryIsNotAllowedByNewPolicy(task);
1✔
255
      if (retryIsNotAllowed) {
1✔
256
        return true;
1✔
257
      }
258

259
      boolean accepted = submitAnAttempt(task);
1✔
260
      if (accepted) {
1✔
261
        if (scheduleToCloseDeadline != null) {
1✔
262
          ScheduledFuture<?> scheduledScheduleToClose =
1✔
263
              scheduledExecutor.schedule(
1✔
264
                  new ScheduleToCloseTimeoutHandler(executionContext),
265
                  scheduleToCloseDeadline.timeRemaining(TimeUnit.MILLISECONDS),
1✔
266
                  TimeUnit.MILLISECONDS);
267
          executionContext.setScheduleToCloseFuture(scheduledScheduleToClose);
1✔
268
        }
269
      }
270

271
      return accepted;
1✔
272
    }
273

274
    /**
275
     * @param task local activity retry attempt task specifying the retry we are about to schedule
276
     * @return true if the retry attempt specified by {@code task} is not allowed by the current
277
     *     retry policy and the error was submitted in the callback, false otherwise
278
     */
279
    private boolean failIfRetryIsNotAllowedByNewPolicy(LocalActivityAttemptTask task) {
280
      LocalActivityExecutionContext executionContext = task.getExecutionContext();
1✔
281
      final Failure previousExecutionFailure = executionContext.getPreviousExecutionFailure();
1✔
282
      if (previousExecutionFailure != null) {
1✔
283
        // This is not an original local execution, it's a continuation using a workflow timer.
284
        // We should verify if the RetryOptions currently supplied in the workflow still allow the
285
        // retry.
286
        // If not, we need to recreate the same structure of an error like it would happen before we
287
        // started to sleep on the timer, at the end of the previous local execution.
288
        RetryState retryState =
1✔
289
            shouldStillRetry(executionContext, task.getAttemptTask(), previousExecutionFailure);
1✔
290
        if (!RetryState.RETRY_STATE_IN_PROGRESS.equals(retryState)) {
1✔
291
          Failure failure;
292
          if (RetryState.RETRY_STATE_TIMEOUT.equals(retryState)) {
1✔
293
            if (previousExecutionFailure.hasTimeoutFailureInfo()
×
294
                && TimeoutType.TIMEOUT_TYPE_START_TO_CLOSE.equals(
×
295
                    previousExecutionFailure.getTimeoutFailureInfo().getTimeoutType())) {
×
296
              // This scenario should behave the same way as a startToClose timeout happening and
297
              // encountering
298
              // RetryState#TIMEOUT during calculation of the next attempt (which is effectively a
299
              // scheduleToClose
300
              // timeout).
301
              // See how StartToCloseTimeoutHandler or
302
              // io.temporal.internal.testservice.StateMachines#timeoutActivityTask
303
              // discard startToClose in this case and replace it with scheduleToClose.
304
              failure =
×
305
                  newTimeoutFailure(
×
306
                      TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE,
307
                      previousExecutionFailure.getCause());
×
308
            } else {
309
              failure =
×
310
                  newTimeoutFailure(
×
311
                      TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE, previousExecutionFailure);
312
            }
313
          } else {
314
            failure = previousExecutionFailure;
1✔
315
          }
316

317
          executionContext.callback(
1✔
318
              failed(
1✔
319
                  task.getActivityId(),
1✔
320
                  task.getAttemptTask().getAttempt(),
1✔
321
                  retryState,
322
                  failure,
323
                  null));
324
          return true;
1✔
325
        }
326
      }
327
      return false;
1✔
328
    }
329

330
    /**
     * Checks whether the retry policy carried by {@code activityTask} still allows the attempt
     * that is about to be made. Unlike {@code shouldRetry}, this returns only a {@link RetryState}
     * and works from a {@link Failure} rather than a {@link Throwable}.
     *
331
     * @param executionContext execution context of the activity
332
     * @param activityTask activity task
333
     * @param previousLocalExecutionFailure failure happened during previous local activity
334
     *     execution. Can be null.
335
     * @return {@link RetryState#RETRY_STATE_IN_PROGRESS} if the attempt may proceed, otherwise the
336
     *     retry state describing why it must not
337
     */
338
    @Nonnull
339
    private RetryState shouldStillRetry(
340
        LocalActivityExecutionContext executionContext,
341
        PollActivityTaskQueueResponseOrBuilder activityTask,
342
        @Nullable Failure previousLocalExecutionFailure) {
343
      int currentAttempt = activityTask.getAttempt();
1✔
344

345
      if (isRetryPolicyNotSet(activityTask)) {
1✔
346
        return RetryState.RETRY_STATE_RETRY_POLICY_NOT_SET;
×
347
      }
348

349
      RetryOptions retryOptions = RetryOptionsUtils.toRetryOptions(activityTask.getRetryPolicy());
1✔
350

351
      if (previousLocalExecutionFailure != null
1✔
352
          && previousLocalExecutionFailure.hasApplicationFailureInfo()
1✔
353
          && RetryOptionsUtils.isNotRetryable(
×
354
              retryOptions, previousLocalExecutionFailure.getApplicationFailureInfo().getType())) {
×
355
        return RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE;
×
356
      }
357

358
      // The current attempt didn't happen yet in this check, that's why -1
359
      if (RetryOptionsUtils.areAttemptsReached(retryOptions, currentAttempt - 1)) {
1✔
360
        return RetryState.RETRY_STATE_MAXIMUM_ATTEMPTS_REACHED;
1✔
361
      }
362

363
      long sleepMillis = retryOptions.calculateSleepTime(currentAttempt);
1✔
364
      if (RetryOptionsUtils.isDeadlineReached(
1✔
365
          executionContext.getScheduleToCloseDeadline(), sleepMillis)) {
1✔
366
        return RetryState.RETRY_STATE_TIMEOUT;
×
367
      }
368

369
      return RetryState.RETRY_STATE_IN_PROGRESS;
1✔
370
    }
371
  }
372

373
  private class TaskHandlerImpl implements PollTaskExecutor.TaskHandler<LocalActivityAttemptTask> {
374

375
    private final ActivityTaskHandler handler;
376

377
    private TaskHandlerImpl(ActivityTaskHandler handler) {
1✔
378
      this.handler = handler;
1✔
379
    }
1✔
380

381
    @Override
382
    public void handle(LocalActivityAttemptTask attemptTask) throws Exception {
383
      LocalActivityExecutionContext executionContext = attemptTask.getExecutionContext();
1✔
384
      PollActivityTaskQueueResponseOrBuilder activityTask = attemptTask.getAttemptTask();
1✔
385
      String activityId = activityTask.getActivityId();
1✔
386

387
      Scope metricsScope =
1✔
388
          workerMetricsScope.tagged(
1✔
389
              ImmutableMap.of(
1✔
390
                  MetricsTag.ACTIVITY_TYPE,
391
                  activityTask.getActivityType().getName(),
1✔
392
                  MetricsTag.WORKFLOW_TYPE,
393
                  activityTask.getWorkflowType().getName()));
1✔
394

395
      int currentAttempt = activityTask.getAttempt();
1✔
396

397
      MDC.put(LoggerTag.ACTIVITY_ID, activityId);
1✔
398
      MDC.put(LoggerTag.ACTIVITY_TYPE, activityTask.getActivityType().getName());
1✔
399
      MDC.put(LoggerTag.WORKFLOW_ID, activityTask.getWorkflowExecution().getWorkflowId());
1✔
400
      MDC.put(LoggerTag.WORKFLOW_TYPE, activityTask.getWorkflowType().getName());
1✔
401
      MDC.put(LoggerTag.RUN_ID, activityTask.getWorkflowExecution().getRunId());
1✔
402
      try {
403
        ScheduledFuture<?> startToCloseTimeoutFuture = null;
1✔
404

405
        if (activityTask.hasStartToCloseTimeout()) {
1✔
406
          startToCloseTimeoutFuture =
1✔
407
              scheduledExecutor.schedule(
1✔
408
                  new StartToCloseTimeoutHandler(attemptTask),
409
                  ProtobufTimeUtils.toJavaDuration(
1✔
410
                          attemptTask.getAttemptTask().getStartToCloseTimeout())
1✔
411
                      .toMillis(),
1✔
412
                  TimeUnit.MILLISECONDS);
413
        }
414

415
        metricsScope.counter(MetricsType.LOCAL_ACTIVITY_TOTAL_COUNTER).inc(1);
1✔
416

417
        ActivityTaskHandler.Result activityHandlerResult;
418
        Stopwatch sw = metricsScope.timer(MetricsType.LOCAL_ACTIVITY_EXECUTION_LATENCY).start();
1✔
419
        try {
420
          activityHandlerResult =
1✔
421
              handler.handle(new ActivityTask(activityTask, () -> {}), metricsScope, true);
1✔
422
        } finally {
423
          sw.stop();
1✔
424
        }
425

426
        // Making sure that the result handling code following this statement is mutual exclusive
427
        // with the start to close timeout handler.
428
        boolean startToCloseTimeoutFired =
1✔
429
            startToCloseTimeoutFuture != null && !startToCloseTimeoutFuture.cancel(false);
1✔
430

431
        if (startToCloseTimeoutFired) {
1✔
432
          // If start to close timeout fired, the result of this activity execution should be
433
          // discarded.
434
          // Scheduling of the next attempt is taken care by the StartToCloseTimeoutHandler.
435
          return;
1✔
436
        }
437

438
        if (activityHandlerResult.getTaskCompleted() != null) {
1✔
439
          com.uber.m3.util.Duration e2eDuration =
440
              com.uber.m3.util.Duration.ofMillis(
1✔
441
                  System.currentTimeMillis() - executionContext.getOriginalScheduledTimestamp());
1✔
442
          metricsScope.timer(MetricsType.LOCAL_ACTIVITY_SUCCEED_E2E_LATENCY).record(e2eDuration);
1✔
443
          executionContext.callback(
1✔
444
              LocalActivityResult.completed(activityHandlerResult, currentAttempt));
1✔
445
          return;
1✔
446
        }
447

448
        if (activityHandlerResult.getTaskCanceled() != null) {
1✔
449
          executionContext.callback(
×
450
              LocalActivityResult.cancelled(activityHandlerResult, currentAttempt));
×
451
          return;
×
452
        }
453

454
        Preconditions.checkState(
1✔
455
            activityHandlerResult.getTaskFailed() != null,
1✔
456
            "One of taskCompleted, taskCanceled or taskFailed must be set");
457

458
        Failure executionFailure =
1✔
459
            activityHandlerResult.getTaskFailed().getTaskFailedRequest().getFailure();
1✔
460

461
        RetryDecision retryDecision =
1✔
462
            shouldRetry(
1✔
463
                executionContext, activityTask, activityHandlerResult.getTaskFailed().getFailure());
1✔
464
        if (retryDecision.doNextAttempt()) {
1✔
465
          scheduleNextAttempt(
1✔
466
              executionContext,
467
              Objects.requireNonNull(
1✔
468
                  retryDecision.nextAttemptBackoff,
1✔
469
                  "nextAttemptBackoff is expected to not be null"),
470
              executionFailure);
471
        } else {
472
          executionContext.callback(
1✔
473
              failed(
1✔
474
                  activityId,
475
                  currentAttempt,
476
                  retryDecision.retryState,
1✔
477
                  executionFailure,
478
                  retryDecision.nextAttemptBackoff));
1✔
479
        }
480

481
      } catch (Throwable ex) {
×
482
        // handleLocalActivity is expected to never throw an exception and to return a result
483
        // that can be used for a workflow callback. If this method throws, it's a bug.
484
        log.error("[BUG] Code that expected to never throw an exception threw an exception", ex);
×
485
        Failure failure = FailureConverter.exceptionToFailure(ex);
×
486
        executionContext.callback(
×
487
            failed(
×
488
                activityId,
489
                currentAttempt,
490
                RetryState.RETRY_STATE_INTERNAL_SERVER_ERROR,
491
                failure,
492
                null));
493
        throw ex;
×
494
      } finally {
495
        MDC.remove(LoggerTag.ACTIVITY_ID);
1✔
496
        MDC.remove(LoggerTag.ACTIVITY_TYPE);
1✔
497
        MDC.remove(LoggerTag.WORKFLOW_ID);
1✔
498
        MDC.remove(LoggerTag.WORKFLOW_TYPE);
1✔
499
        MDC.remove(LoggerTag.RUN_ID);
1✔
500
      }
501
    }
1✔
502

503
    @Override
504
    public Throwable wrapFailure(LocalActivityAttemptTask task, Throwable failure) {
505
      return new RuntimeException("Failure processing local activity task.", failure);
×
506
    }
507
  }
508

509
  /**
   * Runnable that submits a prepared retry attempt to the pending queue. It is created by {@code
   * scheduleNextAttempt} and run on {@code scheduledExecutor} when the backoff expires.
   */
  private class LocalActivityRetryHandler implements Runnable {
510
    private final LocalActivityAttemptTask localActivityAttemptTask;
511

512
    private LocalActivityRetryHandler(LocalActivityAttemptTask localActivityAttemptTask) {
1✔
513
      this.localActivityAttemptTask = localActivityAttemptTask;
1✔
514
    }
1✔
515

516
    @Override
517
    public void run() {
518
      // NOTE(review): the boolean result of submitAnAttempt is discarded here, so a rejected
      // submission is not reported from this handler - confirm this is intended.
      submitAnAttempt(localActivityAttemptTask);
1✔
519
    }
1✔
520
  }
521

522
  /**
   * Runnable scheduled by the dispatcher for the time remaining until the scheduleToClose
   * deadline. When run, it reports the execution as failed with {@code RETRY_STATE_TIMEOUT} and a
   * {@code TIMEOUT_TYPE_SCHEDULE_TO_CLOSE} failure whose cause is the last attempt failure taken
   * from the execution context (no cause is set if that failure is null).
   */
  private static class ScheduleToCloseTimeoutHandler implements Runnable {
523
    private final LocalActivityExecutionContext executionContext;
524

525
    private ScheduleToCloseTimeoutHandler(LocalActivityExecutionContext executionContext) {
1✔
526
      this.executionContext = executionContext;
1✔
527
    }
1✔
528

529
    @Override
530
    public void run() {
531
      executionContext.callback(
1✔
532
          failed(
1✔
533
              executionContext.getActivityId(),
1✔
534
              executionContext.getCurrentAttempt(),
1✔
535
              RetryState.RETRY_STATE_TIMEOUT,
536
              newTimeoutFailure(
1✔
537
                  TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE,
538
                  executionContext.getLastAttemptFailure()),
1✔
539
              null));
540
    }
1✔
541
  }
542

543
  /**
   * Runnable scheduled by {@code TaskHandlerImpl#handle} for the attempt's startToClose timeout.
   * When run, it asks {@code shouldRetry} (with no throwable) what to do next: either the next
   * attempt is scheduled with a {@code TIMEOUT_TYPE_START_TO_CLOSE} failure, or the execution is
   * reported as failed through the callback with a timeout failure.
   */
  private class StartToCloseTimeoutHandler implements Runnable {
544
    private final LocalActivityAttemptTask attemptTask;
545

546
    private StartToCloseTimeoutHandler(LocalActivityAttemptTask attemptTask) {
1✔
547
      this.attemptTask = attemptTask;
1✔
548
    }
1✔
549

550
    @Override
551
    public void run() {
552
      LocalActivityExecutionContext executionContext = attemptTask.getExecutionContext();
1✔
553
      PollActivityTaskQueueResponseOrBuilder activityTask = attemptTask.getAttemptTask();
1✔
554
      String activityId = activityTask.getActivityId();
1✔
555

556
      int timingOutAttempt = activityTask.getAttempt();
1✔
557

558
      RetryDecision retryDecision = shouldRetry(executionContext, activityTask, null);
1✔
559
      if (retryDecision.doNextAttempt()) {
1✔
560
        scheduleNextAttempt(
1✔
561
            executionContext,
562
            Objects.requireNonNull(
1✔
563
                retryDecision.nextAttemptBackoff, "nextAttemptBackoff is expected to not be null"),
1✔
564
            newTimeoutFailure(TimeoutType.TIMEOUT_TYPE_START_TO_CLOSE, null));
1✔
565
      } else {
566
        // RetryState.RETRY_STATE_TIMEOUT is produced only by the scheduleToClose deadline check,
567
        // so in that case the scheduleToClose timeout effectively replaces the original
        // startToClose.
568
        TimeoutType timeoutType =
569
            RetryState.RETRY_STATE_TIMEOUT.equals(retryDecision.retryState)
1✔
570
                ? TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE
1✔
571
                : TimeoutType.TIMEOUT_TYPE_START_TO_CLOSE;
1✔
572
        executionContext.callback(
1✔
573
            failed(
1✔
574
                activityId,
575
                timingOutAttempt,
576
                retryDecision.retryState,
1✔
577
                newTimeoutFailure(timeoutType, executionContext.getLastAttemptFailure()),
1✔
578
                retryDecision.nextAttemptBackoff));
1✔
579
      }
580
    }
1✔
581
  }
582

583
  public boolean isAnyTypeSupported() {
584
    return handler.isAnyTypeSupported();
×
585
  }
586

587
  @Override
588
  public boolean isStarted() {
589
    return poller.isStarted();
×
590
  }
591

592
  @Override
593
  public boolean isShutdown() {
594
    return poller.isShutdown();
×
595
  }
596

597
  @Override
598
  public boolean isTerminated() {
599
    return poller.isTerminated() && scheduledExecutor.isTerminated();
×
600
  }
601

602
  /**
   * Shuts down the poller first and, once that completes, shuts down {@code scheduledExecutor}
   * through the {@code shutdownManager} (passing a one-second {@link Duration}, presumably the
   * time allowed for the executor to stop - confirm against ShutdownManager). An exception raised
   * anywhere in the chain is logged and swallowed, so the returned future then completes with
   * {@code null} instead of exceptionally.
   */
  @Override
603
  public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
604
    return poller
1✔
605
        .shutdown(shutdownManager, interruptTasks)
1✔
606
        .thenCompose(
1✔
607
            r ->
608
                shutdownManager.shutdownExecutor(
1✔
609
                    scheduledExecutor, this + "#scheduledExecutor", Duration.ofSeconds(1)))
1✔
610
        .exceptionally(
1✔
611
            e -> {
612
              log.error("[BUG] Unexpected exception during shutdown", e);
×
613
              return null;
×
614
            });
615
  }
616

617
  /**
   * Waits for the poller and then for {@code scheduledExecutor} to terminate. The value returned
   * by the first wait is used as the timeout of the second one (presumably the time remaining
   * from the original timeout - confirm against ShutdownManager#awaitTermination).
   */
  @Override
618
  public void awaitTermination(long timeout, TimeUnit unit) {
619
    long timeoutMillis = unit.toMillis(timeout);
1✔
620
    timeoutMillis = ShutdownManager.awaitTermination(poller, timeoutMillis);
1✔
621
    ShutdownManager.awaitTermination(scheduledExecutor, timeoutMillis);
1✔
622
  }
1✔
623

624
  @Override
625
  public void suspendPolling() {
626
    poller.suspendPolling();
1✔
627
  }
1✔
628

629
  @Override
630
  public void resumePolling() {
631
    poller.resumePolling();
1✔
632
  }
1✔
633

634
  @Override
635
  public boolean isSuspended() {
636
    return poller.isSuspended();
1✔
637
  }
638

639
  /**
   * Returns the poller options from {@code options}. If they have no poll thread name prefix, a
   * copy is returned with the prefix set to the local activity poller prefix built from {@code
   * namespace} and {@code taskQueue}. The constructor calls this after those two fields are
   * assigned.
   */
  private PollerOptions getPollerOptions(SingleWorkerOptions options) {
640
    PollerOptions pollerOptions = options.getPollerOptions();
1✔
641
    if (pollerOptions.getPollThreadNamePrefix() == null) {
1✔
642
      pollerOptions =
1✔
643
          PollerOptions.newBuilder(pollerOptions)
1✔
644
              .setPollThreadNamePrefix(
1✔
645
                  WorkerThreadsNameHelper.getLocalActivityPollerThreadPrefix(namespace, taskQueue))
1✔
646
              .build();
1✔
647
    }
648
    return pollerOptions;
1✔
649
  }
650

651
  public LocalActivityDispatcher getLocalActivityScheduler() {
652
    return laScheduler;
1✔
653
  }
654

655
  /**
   * Builds a {@link Failure} carrying a {@link TimeoutFailureInfo} of the given type.
   *
   * @param timeoutType timeout type to put into the failure info
   * @param cause failure to attach as the cause; no cause is set when null
   * @return the built failure
   */
  private static Failure newTimeoutFailure(TimeoutType timeoutType, @Nullable Failure cause) {
656
    TimeoutFailureInfo.Builder info = TimeoutFailureInfo.newBuilder().setTimeoutType(timeoutType);
1✔
657
    Failure.Builder result = Failure.newBuilder().setTimeoutFailureInfo(info);
1✔
658
    if (cause != null) {
1✔
659
      result.setCause(cause);
1✔
660
    }
661
    return result.build();
1✔
662
  }
663

664
  /**
   * Returns true when the task has no scheduleToClose timeout and, in addition, either has no
   * retry policy or has a retry policy whose maximum attempts value is zero or negative.
   * NOTE(review): presumably this guards against retries that nothing would ever bound - confirm.
   */
  private static boolean isRetryPolicyNotSet(
665
      PollActivityTaskQueueResponseOrBuilder pollActivityTask) {
666
    return !pollActivityTask.hasScheduleToCloseTimeout()
1✔
667
        && (!pollActivityTask.hasRetryPolicy()
1✔
668
            || pollActivityTask.getRetryPolicy().getMaximumAttempts() <= 0);
1✔
669
  }
670

671
  /**
   * Returns true only if {@code executionThrowable} is an {@link ApplicationFailure} whose {@code
   * isNonRetryable()} is true. A null throwable yields false.
   */
  private static boolean isNonRetryableApplicationFailure(@Nullable Throwable executionThrowable) {
672
    return executionThrowable instanceof ApplicationFailure
1✔
673
        && ((ApplicationFailure) executionThrowable).isNonRetryable();
1✔
674
  }
675

676
  /**
   * Outcome of a retry evaluation. A null {@code retryState} means the next attempt should be made
   * locally after {@code nextAttemptBackoff}. A non-null {@code retryState} means no further local
   * attempt is made; {@code nextAttemptBackoff} may still be set in that case (the first
   * constructor accepts it as nullable).
   */
  private static class RetryDecision {
677
    private final @Nullable RetryState retryState;
678
    private final @Nullable Duration nextAttemptBackoff;
679

680
    // No next local attempts
681
    public RetryDecision(@Nonnull RetryState retryState, @Nullable Duration nextAttemptBackoff) {
1✔
682
      this.retryState = retryState;
1✔
683
      this.nextAttemptBackoff = nextAttemptBackoff;
1✔
684
    }
1✔
685

686
    // Do the next attempt
687
    public RetryDecision(@Nonnull Duration nextAttemptBackoff) {
1✔
688
      this.retryState = null;
1✔
689
      this.nextAttemptBackoff = Objects.requireNonNull(nextAttemptBackoff);
1✔
690
    }
1✔
691

692
    /** Returns true if the next attempt should be made locally, i.e. no retry state was set. */
    public boolean doNextAttempt() {
693
      return retryState == null;
1✔
694
    }
695
  }
696
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc