• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #333

16 Oct 2024 07:28PM UTC coverage: 78.65% (+0.6%) from 78.085%
#333

push

github

web-flow
Fix code coverage (#2275)

22670 of 28824 relevant lines covered (78.65%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.54
/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.worker;
22

23
import static io.temporal.internal.worker.LocalActivityResult.failed;
24
import static io.temporal.internal.worker.LocalActivityResult.processingFailed;
25

26
import com.google.common.base.Preconditions;
27
import com.uber.m3.tally.Scope;
28
import com.uber.m3.tally.Stopwatch;
29
import com.uber.m3.util.ImmutableMap;
30
import io.grpc.Deadline;
31
import io.temporal.api.enums.v1.RetryState;
32
import io.temporal.api.enums.v1.TimeoutType;
33
import io.temporal.api.failure.v1.Failure;
34
import io.temporal.api.failure.v1.TimeoutFailureInfo;
35
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
36
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponseOrBuilder;
37
import io.temporal.common.RetryOptions;
38
import io.temporal.failure.ApplicationFailure;
39
import io.temporal.internal.activity.ActivityPollResponseToInfo;
40
import io.temporal.internal.common.ProtobufTimeUtils;
41
import io.temporal.internal.common.RetryOptionsUtils;
42
import io.temporal.internal.logging.LoggerTag;
43
import io.temporal.internal.statemachines.ExecuteLocalActivityParameters;
44
import io.temporal.serviceclient.MetricsTag;
45
import io.temporal.worker.MetricsType;
46
import io.temporal.worker.WorkerMetricsTag;
47
import io.temporal.worker.tuning.*;
48
import io.temporal.workflow.Functions;
49
import java.time.Duration;
50
import java.util.Objects;
51
import java.util.Optional;
52
import java.util.concurrent.*;
53
import javax.annotation.Nonnull;
54
import javax.annotation.Nullable;
55
import org.slf4j.Logger;
56
import org.slf4j.LoggerFactory;
57
import org.slf4j.MDC;
58

59
final class LocalActivityWorker implements Startable, Shutdownable {
60
  private static final Logger log = LoggerFactory.getLogger(LocalActivityWorker.class);
1✔
61

62
  private final ActivityTaskHandler handler;
63
  private final String namespace;
64
  private final String taskQueue;
65

66
  private final SingleWorkerOptions options;
67

68
  private final LocalActivityDispatcherImpl laScheduler;
69

70
  private final PollerOptions pollerOptions;
71
  private final Scope workerMetricsScope;
72

73
  private ScheduledExecutorService scheduledExecutor;
74
  private PollTaskExecutor<LocalActivityAttemptTask> activityAttemptTaskExecutor;
75
  private final TrackingSlotSupplier<LocalActivitySlotInfo> slotSupplier;
76
  private final LocalActivitySlotSupplierQueue slotQueue;
77

78
  public LocalActivityWorker(
79
      @Nonnull String namespace,
80
      @Nonnull String taskQueue,
81
      @Nonnull SingleWorkerOptions options,
82
      @Nonnull ActivityTaskHandler handler,
83
      @Nonnull SlotSupplier<LocalActivitySlotInfo> slotSupplier) {
1✔
84
    this.namespace = Objects.requireNonNull(namespace);
1✔
85
    this.taskQueue = Objects.requireNonNull(taskQueue);
1✔
86
    this.handler = handler;
1✔
87
    this.options = Objects.requireNonNull(options);
1✔
88
    this.pollerOptions = getPollerOptions(options);
1✔
89
    this.workerMetricsScope =
1✔
90
        MetricsTag.tagged(
1✔
91
            options.getMetricsScope(), WorkerMetricsTag.WorkerType.LOCAL_ACTIVITY_WORKER);
1✔
92
    this.slotSupplier =
1✔
93
        new TrackingSlotSupplier<>(Objects.requireNonNull(slotSupplier), this.workerMetricsScope);
1✔
94
    this.slotQueue =
1✔
95
        new LocalActivitySlotSupplierQueue(
96
            this.slotSupplier, (t) -> activityAttemptTaskExecutor.process(t));
1✔
97
    this.laScheduler = new LocalActivityDispatcherImpl();
1✔
98
  }
1✔
99

100
  private void submitRetry(
101
      @Nonnull LocalActivityExecutionContext executionContext,
102
      @Nonnull PollActivityTaskQueueResponse.Builder activityTask) {
103
    submitAttempt(executionContext, activityTask, true);
1✔
104
  }
1✔
105

106
  private void submitAttempt(
107
      @Nonnull LocalActivityExecutionContext executionContext,
108
      @Nonnull PollActivityTaskQueueResponse.Builder activityTask,
109
      boolean isRetry) {
110
    @Nullable Duration scheduleToStartTimeout = executionContext.getScheduleToStartTimeout();
1✔
111
    @Nullable final ScheduledFuture<?> scheduleToStartFuture;
112
    if (scheduleToStartTimeout != null) {
1✔
113
      scheduleToStartFuture =
1✔
114
          scheduledExecutor.schedule(
1✔
115
              new FinalTimeoutHandler(TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_START, executionContext),
116
              scheduleToStartTimeout.toMillis(),
1✔
117
              TimeUnit.MILLISECONDS);
118
    } else {
119
      scheduleToStartFuture = null;
1✔
120
    }
121

122
    SlotReservationData reservationDat =
1✔
123
        new SlotReservationData(taskQueue, options.getIdentity(), options.getBuildId());
1✔
124
    activityTask.setCurrentAttemptScheduledTime(ProtobufTimeUtils.getCurrentProtoTime());
1✔
125
    final LocalActivityAttemptTask task =
1✔
126
        new LocalActivityAttemptTask(executionContext, activityTask, scheduleToStartFuture);
127
    slotQueue.submitAttempt(reservationDat, isRetry, task);
1✔
128
  }
1✔
129

130
  /**
131
   * @param executionContext execution context of the activity
132
   * @param activityTask activity task
133
   * @param attemptThrowable exception happened during the activity attempt. Can be null.
134
   * @return decision to retry or not with a retry state, backoff or delay to the next attempt if
135
   *     applicable
136
   */
137
  @Nonnull
138
  private RetryDecision shouldRetry(
139
      LocalActivityExecutionContext executionContext,
140
      PollActivityTaskQueueResponseOrBuilder activityTask,
141
      @Nullable Throwable attemptThrowable) {
142
    int currentAttempt = activityTask.getAttempt();
1✔
143

144
    if (isNonRetryableApplicationFailure(attemptThrowable)) {
1✔
145
      return new RetryDecision(RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE, null);
1✔
146
    }
147

148
    if (attemptThrowable instanceof Error) {
1✔
149
      // TODO Error inside Local Activity shouldn't be failing the local activity call.
150
      //  Instead we should fail Workflow Task. Implement a special flag for that in the result.
151
      //          task.callback(executionFailed(activityHandlerResult,
152
      // RetryState.RETRY_STATE_MAXIMUM_ATTEMPTS_REACHED, currentAttempt));
153
      // don't just swallow Error from activities, propagate it to the top
154
      throw (Error) attemptThrowable;
1✔
155
    }
156

157
    if (isRetryPolicyNotSet(activityTask)) {
1✔
158
      return new RetryDecision(RetryState.RETRY_STATE_RETRY_POLICY_NOT_SET, null);
×
159
    }
160

161
    RetryOptions retryOptions = RetryOptionsUtils.toRetryOptions(activityTask.getRetryPolicy());
1✔
162

163
    if (RetryOptionsUtils.isNotRetryable(retryOptions, attemptThrowable)) {
1✔
164
      return new RetryDecision(RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE, null);
×
165
    }
166

167
    if (RetryOptionsUtils.areAttemptsReached(retryOptions, currentAttempt)) {
1✔
168
      return new RetryDecision(RetryState.RETRY_STATE_MAXIMUM_ATTEMPTS_REACHED, null);
1✔
169
    }
170

171
    Optional<Duration> nextRetryDelay = getNextRetryDelay(attemptThrowable);
1✔
172
    long sleepMillis = retryOptions.calculateSleepTime(currentAttempt);
1✔
173
    Duration sleep = nextRetryDelay.orElse(Duration.ofMillis(sleepMillis));
1✔
174
    if (RetryOptionsUtils.isDeadlineReached(
1✔
175
        executionContext.getScheduleToCloseDeadline(), sleepMillis)) {
1✔
176
      return new RetryDecision(RetryState.RETRY_STATE_TIMEOUT, null);
1✔
177
    }
178

179
    if (sleep.compareTo(executionContext.getLocalRetryThreshold()) > 0) {
1✔
180
      // RETRY_STATE_IN_PROGRESS shows that it's not the end for this local activity execution from
181
      // the workflow point of view. It's also not conflicting with any other situations and
182
      // uniquely identifies the reach of the local retries and a need to schedule a timer.
183
      return new RetryDecision(RetryState.RETRY_STATE_IN_PROGRESS, sleep);
1✔
184
    }
185

186
    return new RetryDecision(sleep);
1✔
187
  }
188

189
  /**
190
   * @param executionContext execution context of the activity
191
   * @param backoff delay time in milliseconds to the next attempt
192
   * @param failure if supplied, it will be used to override {@link
193
   *     LocalActivityExecutionContext#getLastAttemptFailure()}
194
   */
195
  private void scheduleNextAttempt(
196
      LocalActivityExecutionContext executionContext,
197
      @Nonnull Duration backoff,
198
      @Nullable Failure failure) {
199
    PollActivityTaskQueueResponse.Builder nextActivityTask =
1✔
200
        executionContext.getNextAttemptActivityTask(failure);
1✔
201
    Deadline.after(backoff.toMillis(), TimeUnit.MILLISECONDS)
1✔
202
        .runOnExpiration(
1✔
203
            new LocalActivityRetryHandler(executionContext, nextActivityTask), scheduledExecutor);
204
  }
1✔
205

206
  private class LocalActivityDispatcherImpl implements LocalActivityDispatcher {
1✔
207

208
    @Override
209
    public boolean dispatch(
210
        @Nonnull ExecuteLocalActivityParameters params,
211
        @Nonnull Functions.Proc1<LocalActivityResult> resultCallback,
212
        @Nullable Deadline acceptanceDeadline) {
213
      WorkerLifecycleState lifecycleState = getLifecycleState();
1✔
214
      switch (lifecycleState) {
1✔
215
        case NOT_STARTED:
216
          throw new IllegalStateException(
×
217
              "Local Activity Worker is not started, no activities were registered");
218
        case SHUTDOWN:
219
          throw new IllegalStateException("Local Activity Worker is shutdown");
×
220
        case TERMINATED:
221
          throw new IllegalStateException("Local Activity Worker is terminated");
×
222
        case SUSPENDED:
223
          throw new IllegalStateException(
×
224
              "[BUG] Local Activity Worker is suspended. Suspension is not supported for Local Activity Worker");
225
      }
226

227
      Preconditions.checkArgument(
1✔
228
          handler.isTypeSupported(params.getActivityType().getName()),
1✔
229
          "Activity type %s is not supported by the local activity worker",
230
          params.getActivityType().getName());
1✔
231

232
      long passedFromOriginalSchedulingMs =
233
          System.currentTimeMillis() - params.getOriginalScheduledTimestamp();
1✔
234
      Duration scheduleToCloseTimeout = params.getScheduleToCloseTimeout();
1✔
235
      Deadline scheduleToCloseDeadline = null;
1✔
236
      if (scheduleToCloseTimeout != null) {
1✔
237
        scheduleToCloseDeadline =
1✔
238
            Deadline.after(
1✔
239
                scheduleToCloseTimeout.toMillis() - passedFromOriginalSchedulingMs,
1✔
240
                TimeUnit.MILLISECONDS);
241
      }
242

243
      LocalActivityExecutionContext executionContext =
1✔
244
          new LocalActivityExecutionContext(params, resultCallback, scheduleToCloseDeadline);
245

246
      PollActivityTaskQueueResponse.Builder activityTask = executionContext.getInitialTask();
1✔
247

248
      boolean retryIsNotAllowed =
1✔
249
          failIfRetryIsNotAllowedByNewPolicy(executionContext, activityTask);
1✔
250
      if (retryIsNotAllowed) {
1✔
251
        return true;
1✔
252
      }
253

254
      return submitANewExecution(executionContext, activityTask, acceptanceDeadline);
1✔
255
    }
256

257
    private boolean submitANewExecution(
258
        @Nonnull LocalActivityExecutionContext executionContext,
259
        @Nonnull PollActivityTaskQueueResponse.Builder activityTask,
260
        @Nullable Deadline acceptanceDeadline) {
261
      try {
262
        Long acceptanceTimeoutMs =
263
            acceptanceDeadline != null
1✔
264
                ? acceptanceDeadline.timeRemaining(TimeUnit.MILLISECONDS)
1✔
265
                : null;
1✔
266
        boolean accepted = slotQueue.waitOnBackpressure(acceptanceTimeoutMs);
1✔
267
        if (!accepted) {
1✔
268
          log.warn(
×
269
              "LocalActivity queue is full and submitting timed out for activity {} with acceptanceTimeoutMs: {}",
270
              activityTask.getActivityId(),
×
271
              acceptanceTimeoutMs);
272
        }
273

274
        if (accepted) {
1✔
275
          // we should publish scheduleToClose before submission, so the handlers always see a full
276
          // state of executionContext
277
          @Nullable
278
          Deadline scheduleToCloseDeadline = executionContext.getScheduleToCloseDeadline();
1✔
279
          if (scheduleToCloseDeadline != null) {
1✔
280
            ScheduledFuture<?> scheduleToCloseFuture =
1✔
281
                scheduledExecutor.schedule(
1✔
282
                    new FinalTimeoutHandler(
283
                        TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE, executionContext),
284
                    scheduleToCloseDeadline.timeRemaining(TimeUnit.MILLISECONDS),
1✔
285
                    TimeUnit.MILLISECONDS);
286
            executionContext.setScheduleToCloseFuture(scheduleToCloseFuture);
1✔
287
          }
288
          submitAttempt(executionContext, activityTask, false);
1✔
289
          log.trace("LocalActivity queued: {}", activityTask.getActivityId());
1✔
290
        }
291
        return accepted;
1✔
292
      } catch (InterruptedException e) {
×
293
        Thread.currentThread().interrupt();
×
294
        return false;
×
295
      }
296
    }
297

298
    /**
299
     * @param attemptTask local activity retry attempt task specifying the retry we are about to
300
     *     schedule
301
     * @return true if the retry attempt specified by {@code task} is not allowed by the current
302
     *     retry policy and the error was submitted in the callback, false otherwise
303
     */
304
    private boolean failIfRetryIsNotAllowedByNewPolicy(
305
        LocalActivityExecutionContext executionContext,
306
        PollActivityTaskQueueResponseOrBuilder attemptTask) {
307
      final Failure previousExecutionFailure = executionContext.getPreviousExecutionFailure();
1✔
308
      if (previousExecutionFailure != null) {
1✔
309
        // This is not an original local execution, it's a continuation using a workflow timer.
310
        // We should verify if the RetryOptions currently supplied in the workflow still allow the
311
        // retry.
312
        // If not, we need to recreate the same structure of an error like it would happen before we
313
        // started to sleep on the timer, at the end of the previous local execution.
314
        RetryState retryState =
1✔
315
            shouldStillRetry(executionContext, attemptTask, previousExecutionFailure);
1✔
316
        if (!RetryState.RETRY_STATE_IN_PROGRESS.equals(retryState)) {
1✔
317
          Failure failure;
318
          if (RetryState.RETRY_STATE_TIMEOUT.equals(retryState)) {
1✔
319
            if (previousExecutionFailure.hasTimeoutFailureInfo()
×
320
                && TimeoutType.TIMEOUT_TYPE_START_TO_CLOSE.equals(
×
321
                    previousExecutionFailure.getTimeoutFailureInfo().getTimeoutType())) {
×
322
              // This scenario should behave the same way as a startToClose timeout happening and
323
              // encountering
324
              // RetryState#TIMEOUT during calculation of the next attempt (which is effectively a
325
              // scheduleToClose
326
              // timeout).
327
              // See how StartToCloseTimeoutHandler or
328
              // io.temporal.internal.testservice.StateMachines#timeoutActivityTask
329
              // discard startToClose in this case and replaces it with scheduleToClose
330
              failure =
×
331
                  newTimeoutFailure(
×
332
                      TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE,
333
                      previousExecutionFailure.getCause());
×
334
            } else {
335
              failure =
×
336
                  newTimeoutFailure(
×
337
                      TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE, previousExecutionFailure);
338
            }
339
          } else {
340
            failure = previousExecutionFailure;
1✔
341
          }
342

343
          executionContext.callback(
1✔
344
              failed(
1✔
345
                  executionContext.getActivityId(),
1✔
346
                  attemptTask.getAttempt(),
1✔
347
                  retryState,
348
                  failure,
349
                  null));
350
          return true;
1✔
351
        }
352
      }
353
      return false;
1✔
354
    }
355

356
    /**
357
     * @param executionContext execution context of the activity
358
     * @param activityTask activity task
359
     * @param previousLocalExecutionFailure failure happened during previous local activity
360
     *     execution. Can be null.
361
     * @return decision to retry or not with a retry state, backoff or delay to the next attempt if
362
     *     applicable
363
     */
364
    @Nonnull
365
    private RetryState shouldStillRetry(
366
        LocalActivityExecutionContext executionContext,
367
        PollActivityTaskQueueResponseOrBuilder activityTask,
368
        @Nullable Failure previousLocalExecutionFailure) {
369
      int currentAttempt = activityTask.getAttempt();
1✔
370

371
      if (isRetryPolicyNotSet(activityTask)) {
1✔
372
        return RetryState.RETRY_STATE_RETRY_POLICY_NOT_SET;
×
373
      }
374

375
      RetryOptions retryOptions = RetryOptionsUtils.toRetryOptions(activityTask.getRetryPolicy());
1✔
376

377
      if (previousLocalExecutionFailure != null
1✔
378
          && previousLocalExecutionFailure.hasApplicationFailureInfo()
1✔
379
          && RetryOptionsUtils.isNotRetryable(
×
380
              retryOptions, previousLocalExecutionFailure.getApplicationFailureInfo().getType())) {
×
381
        return RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE;
×
382
      }
383

384
      // The current attempt didn't happen yet in this check, that's why -1
385
      if (RetryOptionsUtils.areAttemptsReached(retryOptions, currentAttempt - 1)) {
1✔
386
        return RetryState.RETRY_STATE_MAXIMUM_ATTEMPTS_REACHED;
1✔
387
      }
388

389
      long sleepMillis = retryOptions.calculateSleepTime(currentAttempt);
1✔
390
      if (RetryOptionsUtils.isDeadlineReached(
1✔
391
          executionContext.getScheduleToCloseDeadline(), sleepMillis)) {
1✔
392
        return RetryState.RETRY_STATE_TIMEOUT;
×
393
      }
394

395
      return RetryState.RETRY_STATE_IN_PROGRESS;
1✔
396
    }
397
  }
398

399
  private class AttemptTaskHandlerImpl
400
      implements PollTaskExecutor.TaskHandler<LocalActivityAttemptTask> {
401

402
    private final ActivityTaskHandler handler;
403

404
    private AttemptTaskHandlerImpl(ActivityTaskHandler handler) {
1✔
405
      this.handler = handler;
1✔
406
    }
1✔
407

408
    @Override
409
    public void handle(LocalActivityAttemptTask attemptTask) throws Exception {
410
      SlotReleaseReason reason = SlotReleaseReason.taskComplete();
1✔
411

412
      // cancel scheduleToStart timeout if not already fired
413
      @Nullable ScheduledFuture<?> scheduleToStartFuture = attemptTask.getScheduleToStartFuture();
1✔
414
      boolean scheduleToStartFired =
1✔
415
          scheduleToStartFuture != null && !scheduleToStartFuture.cancel(false);
1✔
416

417
      LocalActivityExecutionContext executionContext = attemptTask.getExecutionContext();
1✔
418
      executionContext.newAttempt();
1✔
419
      PollActivityTaskQueueResponseOrBuilder activityTask = attemptTask.getAttemptTask();
1✔
420

421
      try {
422
        // if an activity was already completed by any mean like scheduleToClose or scheduleToStart,
423
        // discard this attempt, this execution is completed.
424
        // The scheduleToStartFired check here is a bit overkill, but allows to catch an edge case
425
        // where scheduleToStart is already fired, but didn't report a completion yet.
426
        boolean shouldDiscardTheAttempt = scheduleToStartFired || executionContext.isCompleted();
1✔
427
        if (shouldDiscardTheAttempt) {
1✔
428
          return;
×
429
        }
430

431
        Scope metricsScope =
1✔
432
            workerMetricsScope.tagged(
1✔
433
                ImmutableMap.of(
1✔
434
                    MetricsTag.ACTIVITY_TYPE,
435
                    activityTask.getActivityType().getName(),
1✔
436
                    MetricsTag.WORKFLOW_TYPE,
437
                    activityTask.getWorkflowType().getName()));
1✔
438

439
        MDC.put(LoggerTag.ACTIVITY_ID, activityTask.getActivityId());
1✔
440
        MDC.put(LoggerTag.ACTIVITY_TYPE, activityTask.getActivityType().getName());
1✔
441
        MDC.put(LoggerTag.WORKFLOW_ID, activityTask.getWorkflowExecution().getWorkflowId());
1✔
442
        MDC.put(LoggerTag.WORKFLOW_TYPE, activityTask.getWorkflowType().getName());
1✔
443
        MDC.put(LoggerTag.RUN_ID, activityTask.getWorkflowExecution().getRunId());
1✔
444
        MDC.put(LoggerTag.ATTEMPT, Integer.toString(activityTask.getAttempt()));
1✔
445

446
        slotSupplier.markSlotUsed(
1✔
447
            new LocalActivitySlotInfo(
448
                ActivityPollResponseToInfo.toActivityInfoImpl(
1✔
449
                    activityTask, namespace, taskQueue, true),
1✔
450
                options.getIdentity(),
1✔
451
                options.getBuildId()),
1✔
452
            executionContext.getPermit());
1✔
453

454
        ScheduledFuture<?> startToCloseTimeoutFuture = null;
1✔
455

456
        if (activityTask.hasStartToCloseTimeout()) {
1✔
457
          startToCloseTimeoutFuture =
1✔
458
              scheduledExecutor.schedule(
1✔
459
                  new StartToCloseTimeoutHandler(attemptTask),
460
                  ProtobufTimeUtils.toJavaDuration(
1✔
461
                          attemptTask.getAttemptTask().getStartToCloseTimeout())
1✔
462
                      .toMillis(),
1✔
463
                  TimeUnit.MILLISECONDS);
464
        }
465

466
        metricsScope.counter(MetricsType.LOCAL_ACTIVITY_TOTAL_COUNTER).inc(1);
1✔
467

468
        ActivityTaskHandler.Result activityHandlerResult;
469
        Stopwatch sw = metricsScope.timer(MetricsType.LOCAL_ACTIVITY_EXECUTION_LATENCY).start();
1✔
470
        try {
471
          activityHandlerResult =
1✔
472
              handler.handle(
1✔
473
                  new ActivityTask(activityTask, executionContext.getPermit(), () -> {}),
1✔
474
                  metricsScope,
475
                  true);
476
        } finally {
477
          sw.stop();
1✔
478
        }
479

480
        // Cancel startToCloseTimeoutFuture if it's not yet fired.
481
        boolean startToCloseTimeoutFired =
1✔
482
            startToCloseTimeoutFuture != null && !startToCloseTimeoutFuture.cancel(false);
1✔
483

484
        // We make sure that the result handling code following this statement is mutual exclusive
485
        // with the startToClose timeout handler.
486
        // If startToClose fired, scheduling of the next attempt is taken care by the
487
        // StartToCloseTimeoutHandler.
488
        // If execution is already completed, this attempt handling shouldn't proceed, nothing to do
489
        // with result. The typical scenario may be fired scheduleToClose.
490
        boolean shouldDiscardTheResult = startToCloseTimeoutFired || executionContext.isCompleted();
1✔
491
        if (shouldDiscardTheResult) {
1✔
492
          return;
1✔
493
        }
494

495
        reason = handleResult(activityHandlerResult, attemptTask, metricsScope);
1✔
496
      } catch (Throwable ex) {
1✔
497
        // handleLocalActivity is expected to never throw an exception and return a result
498
        // that can be used for a workflow callback if this method throws, it's a bug.
499
        log.error("[BUG] Code that expected to never throw an exception threw an exception", ex);
1✔
500
        executionContext.callback(
1✔
501
            processingFailed(activityTask.getActivityId(), activityTask.getAttempt(), ex));
1✔
502
        throw ex;
1✔
503
      } finally {
504
        slotSupplier.releaseSlot(reason, executionContext.getPermit());
1✔
505
        MDC.remove(LoggerTag.ACTIVITY_ID);
1✔
506
        MDC.remove(LoggerTag.ACTIVITY_TYPE);
1✔
507
        MDC.remove(LoggerTag.WORKFLOW_ID);
1✔
508
        MDC.remove(LoggerTag.WORKFLOW_TYPE);
1✔
509
        MDC.remove(LoggerTag.RUN_ID);
1✔
510
        MDC.remove(LoggerTag.ATTEMPT);
1✔
511
      }
512
    }
1✔
513

514
    private SlotReleaseReason handleResult(
515
        ActivityTaskHandler.Result activityHandlerResult,
516
        LocalActivityAttemptTask attemptTask,
517
        Scope metricsScope) {
518
      LocalActivityExecutionContext executionContext = attemptTask.getExecutionContext();
1✔
519
      PollActivityTaskQueueResponseOrBuilder activityTask = attemptTask.getAttemptTask();
1✔
520
      int currentAttempt = activityTask.getAttempt();
1✔
521
      SlotReleaseReason releaseReason = SlotReleaseReason.taskComplete();
1✔
522

523
      // Success
524
      if (activityHandlerResult.getTaskCompleted() != null) {
1✔
525
        boolean completedByThisInvocation =
1✔
526
            executionContext.callback(
1✔
527
                LocalActivityResult.completed(activityHandlerResult, currentAttempt));
1✔
528
        if (completedByThisInvocation) {
1✔
529
          // We report this metric only if the execution was completed by us right now, not by any
530
          // timeout earlier.
531
          // Completion by another attempt is not possible by another attempt earlier where we
532
          // checked if startToClose fired.
533
          com.uber.m3.util.Duration e2eDuration =
534
              com.uber.m3.util.Duration.ofMillis(
1✔
535
                  System.currentTimeMillis() - executionContext.getOriginalScheduledTimestamp());
1✔
536
          metricsScope.timer(MetricsType.LOCAL_ACTIVITY_SUCCEED_E2E_LATENCY).record(e2eDuration);
1✔
537
        }
538
        return releaseReason;
1✔
539
      }
540

541
      // Cancellation
542
      if (activityHandlerResult.getTaskCanceled() != null) {
1✔
543
        executionContext.callback(
×
544
            LocalActivityResult.cancelled(activityHandlerResult, currentAttempt));
×
545
        return releaseReason;
×
546
      }
547

548
      // Failure
549
      Preconditions.checkState(
1✔
550
          activityHandlerResult.getTaskFailed() != null,
1✔
551
          "One of taskCompleted, taskCanceled or taskFailed must be set");
552

553
      Failure executionFailure =
1✔
554
          activityHandlerResult.getTaskFailed().getTaskFailedRequest().getFailure();
1✔
555
      Throwable executionThrowable = activityHandlerResult.getTaskFailed().getFailure();
1✔
556

557
      RetryDecision retryDecision =
1✔
558
          shouldRetry(
1✔
559
              executionContext, activityTask, activityHandlerResult.getTaskFailed().getFailure());
1✔
560

561
      if (retryDecision.doNextAttempt()) {
1✔
562
        releaseReason = SlotReleaseReason.willRetry();
1✔
563
        scheduleNextAttempt(
1✔
564
            executionContext,
565
            Objects.requireNonNull(
1✔
566
                retryDecision.nextAttemptBackoff, "nextAttemptBackoff is expected to not be null"),
1✔
567
            executionFailure);
568
      } else if (retryDecision.failWorkflowTask()) {
1✔
569
        releaseReason = SlotReleaseReason.error(new Exception(executionThrowable));
×
570
        executionContext.callback(
×
571
            processingFailed(executionContext.getActivityId(), currentAttempt, executionThrowable));
×
572
      } else {
573
        executionContext.callback(
1✔
574
            failed(
1✔
575
                executionContext.getActivityId(),
1✔
576
                currentAttempt,
577
                retryDecision.retryState,
1✔
578
                executionFailure,
579
                retryDecision.nextAttemptBackoff));
1✔
580
      }
581
      return releaseReason;
1✔
582
    }
583

584
    @Override
585
    public Throwable wrapFailure(LocalActivityAttemptTask task, Throwable failure) {
586
      return new RuntimeException("Failure processing local activity task.", failure);
1✔
587
    }
588
  }
589

590
  private class LocalActivityRetryHandler implements Runnable {
591
    private final @Nonnull LocalActivityExecutionContext executionContext;
592
    private final @Nonnull PollActivityTaskQueueResponse.Builder activityTask;
593

594
    private LocalActivityRetryHandler(
595
        @Nonnull LocalActivityExecutionContext executionContext,
596
        @Nonnull PollActivityTaskQueueResponse.Builder activityTask) {
1✔
597
      this.executionContext = Objects.requireNonNull(executionContext, "executionContext");
1✔
598
      this.activityTask = Objects.requireNonNull(activityTask, "activityTask");
1✔
599
    }
1✔
600

601
    @Override
602
    public void run() {
603
      submitRetry(executionContext, activityTask);
1✔
604
    }
1✔
605
  }
606

607
  /** Used to perform both scheduleToStart and scheduleToClose timeouts. */
608
  private static class FinalTimeoutHandler implements Runnable {
609
    private final LocalActivityExecutionContext executionContext;
610
    private final TimeoutType timeoutType;
611

612
    public FinalTimeoutHandler(
613
        TimeoutType timeoutType, LocalActivityExecutionContext executionContext) {
1✔
614
      this.executionContext = executionContext;
1✔
615
      this.timeoutType = timeoutType;
1✔
616
    }
1✔
617

618
    @Override
619
    public void run() {
620
      executionContext.callback(
1✔
621
          failed(
1✔
622
              executionContext.getActivityId(),
1✔
623
              executionContext.getCurrentAttempt(),
1✔
624
              RetryState.RETRY_STATE_TIMEOUT,
625
              newTimeoutFailure(timeoutType, executionContext.getLastAttemptFailure()),
1✔
626
              null));
627
    }
1✔
628
  }
629

630
  private class StartToCloseTimeoutHandler implements Runnable {
631
    private final LocalActivityAttemptTask attemptTask;
632

633
    private StartToCloseTimeoutHandler(LocalActivityAttemptTask attemptTask) {
1✔
634
      this.attemptTask = attemptTask;
1✔
635
    }
1✔
636

637
    @Override
638
    public void run() {
639
      LocalActivityExecutionContext executionContext = attemptTask.getExecutionContext();
1✔
640
      PollActivityTaskQueueResponseOrBuilder activityTask = attemptTask.getAttemptTask();
1✔
641
      String activityId = activityTask.getActivityId();
1✔
642

643
      int timingOutAttempt = activityTask.getAttempt();
1✔
644

645
      RetryDecision retryDecision = shouldRetry(executionContext, activityTask, null);
1✔
646
      if (retryDecision.doNextAttempt()) {
1✔
647
        scheduleNextAttempt(
1✔
648
            executionContext,
649
            Objects.requireNonNull(
1✔
650
                retryDecision.nextAttemptBackoff, "nextAttemptBackoff is expected to not be null"),
1✔
651
            newTimeoutFailure(TimeoutType.TIMEOUT_TYPE_START_TO_CLOSE, null));
1✔
652
      } else {
653
        // RetryState.RETRY_STATE_TIMEOUT happens only when scheduleToClose is fired
654
        // scheduleToClose timeout is effectively replacing the original startToClose
655
        TimeoutType timeoutType =
656
            RetryState.RETRY_STATE_TIMEOUT.equals(retryDecision.retryState)
1✔
657
                ? TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE
1✔
658
                : TimeoutType.TIMEOUT_TYPE_START_TO_CLOSE;
1✔
659
        executionContext.callback(
1✔
660
            failed(
1✔
661
                activityId,
662
                timingOutAttempt,
663
                retryDecision.retryState,
1✔
664
                newTimeoutFailure(timeoutType, executionContext.getLastAttemptFailure()),
1✔
665
                retryDecision.nextAttemptBackoff));
1✔
666
      }
667
    }
1✔
668
  }
669

670
  @Override
671
  public boolean start() {
672
    if (handler.isAnyTypeSupported()) {
1✔
673
      this.scheduledExecutor =
1✔
674
          Executors.newSingleThreadScheduledExecutor(
1✔
675
              r -> {
676
                Thread thread = new Thread(r);
1✔
677
                thread.setName(
1✔
678
                    WorkerThreadsNameHelper.getLocalActivitySchedulerThreadPrefix(
1✔
679
                        namespace, taskQueue));
680
                return thread;
1✔
681
              });
682

683
      this.activityAttemptTaskExecutor =
1✔
684
          new PollTaskExecutor<>(
685
              namespace,
686
              taskQueue,
687
              options.getIdentity(),
1✔
688
              new AttemptTaskHandlerImpl(handler),
689
              pollerOptions,
690
              slotSupplier.maximumSlots().orElse(Integer.MAX_VALUE),
1✔
691
              false);
692

693
      this.workerMetricsScope.counter(MetricsType.WORKER_START_COUNTER).inc(1);
1✔
694
      this.slotQueue.start();
1✔
695
      return true;
1✔
696
    } else {
697
      return false;
1✔
698
    }
699
  }
700

701
  @Override
702
  public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
703
    if (activityAttemptTaskExecutor != null && !activityAttemptTaskExecutor.isShutdown()) {
1✔
704
      return slotQueue
1✔
705
          .shutdown(shutdownManager, interruptTasks)
1✔
706
          .thenCompose(r -> activityAttemptTaskExecutor.shutdown(shutdownManager, interruptTasks))
1✔
707
          .thenCompose(
1✔
708
              r ->
709
                  shutdownManager.shutdownExecutor(
1✔
710
                      scheduledExecutor, this + "#scheduledExecutor", Duration.ofSeconds(1)))
1✔
711
          .exceptionally(
1✔
712
              e -> {
713
                log.error("[BUG] Unexpected exception during shutdown", e);
×
714
                return null;
×
715
              });
716
    } else {
717
      return CompletableFuture.completedFuture(null);
1✔
718
    }
719
  }
720

721
  @Override
722
  public void awaitTermination(long timeout, TimeUnit unit) {
723
    long timeoutMillis = unit.toMillis(timeout);
1✔
724
    long remainingTimeout = ShutdownManager.awaitTermination(scheduledExecutor, timeoutMillis);
1✔
725
    ShutdownManager.awaitTermination(slotQueue, remainingTimeout);
1✔
726
  }
1✔
727

728
  @Override
729
  public boolean isShutdown() {
730
    return activityAttemptTaskExecutor != null
×
731
        && activityAttemptTaskExecutor.isShutdown()
×
732
        && slotQueue.isShutdown();
×
733
  }
734

735
  @Override
736
  public boolean isTerminated() {
737
    return activityAttemptTaskExecutor != null
×
738
        && activityAttemptTaskExecutor.isTerminated()
×
739
        && scheduledExecutor.isTerminated()
×
740
        && slotQueue.isTerminated();
×
741
  }
742

743
  @Override
744
  public WorkerLifecycleState getLifecycleState() {
745
    if (activityAttemptTaskExecutor == null) {
1✔
746
      return WorkerLifecycleState.NOT_STARTED;
×
747
    }
748
    if (activityAttemptTaskExecutor.isShutdown()) {
1✔
749
      // return TERMINATED only if both pollExecutor and taskExecutor are terminated
750
      if (activityAttemptTaskExecutor.isTerminated() && scheduledExecutor.isTerminated()) {
×
751
        return WorkerLifecycleState.TERMINATED;
×
752
      } else {
753
        return WorkerLifecycleState.SHUTDOWN;
×
754
      }
755
    }
756
    return WorkerLifecycleState.ACTIVE;
1✔
757
  }
758

759
  private PollerOptions getPollerOptions(SingleWorkerOptions options) {
760
    PollerOptions pollerOptions = options.getPollerOptions();
1✔
761
    if (pollerOptions.getPollThreadNamePrefix() == null) {
1✔
762
      pollerOptions =
1✔
763
          PollerOptions.newBuilder(pollerOptions)
1✔
764
              .setPollThreadNamePrefix(
1✔
765
                  WorkerThreadsNameHelper.getLocalActivityPollerThreadPrefix(namespace, taskQueue))
1✔
766
              .build();
1✔
767
    }
768
    return pollerOptions;
1✔
769
  }
770

771
  public LocalActivityDispatcher getLocalActivityScheduler() {
772
    return laScheduler;
1✔
773
  }
774

775
  private static Failure newTimeoutFailure(TimeoutType timeoutType, @Nullable Failure cause) {
776
    TimeoutFailureInfo.Builder info = TimeoutFailureInfo.newBuilder().setTimeoutType(timeoutType);
1✔
777
    Failure.Builder result = Failure.newBuilder().setTimeoutFailureInfo(info);
1✔
778
    if (cause != null) {
1✔
779
      result.setCause(cause);
1✔
780
    }
781
    return result.build();
1✔
782
  }
783

784
  private static boolean isRetryPolicyNotSet(
785
      PollActivityTaskQueueResponseOrBuilder pollActivityTask) {
786
    return !pollActivityTask.hasScheduleToCloseTimeout()
1✔
787
        && (!pollActivityTask.hasRetryPolicy()
1✔
788
            || pollActivityTask.getRetryPolicy().getMaximumAttempts() <= 0);
1✔
789
  }
790

791
  private static boolean isNonRetryableApplicationFailure(@Nullable Throwable executionThrowable) {
792
    return executionThrowable instanceof ApplicationFailure
1✔
793
        && ((ApplicationFailure) executionThrowable).isNonRetryable();
1✔
794
  }
795

796
  private static Optional<Duration> getNextRetryDelay(@Nullable Throwable executionThrowable) {
797
    if (executionThrowable instanceof ApplicationFailure) {
1✔
798
      return Optional.ofNullable(((ApplicationFailure) executionThrowable).getNextRetryDelay());
1✔
799
    }
800
    return Optional.empty();
1✔
801
  }
802

803
  private static class RetryDecision {
804
    private final @Nullable RetryState retryState;
805
    private final @Nullable Duration nextAttemptBackoff;
806

807
    // No next local attempts
808
    public RetryDecision(@Nonnull RetryState retryState, @Nullable Duration nextAttemptBackoff) {
1✔
809
      this.retryState = retryState;
1✔
810
      this.nextAttemptBackoff = nextAttemptBackoff;
1✔
811
    }
1✔
812

813
    // Do the next attempt
814
    public RetryDecision(@Nonnull Duration nextAttemptBackoff) {
1✔
815
      this.retryState = null;
1✔
816
      this.nextAttemptBackoff = Objects.requireNonNull(nextAttemptBackoff);
1✔
817
    }
1✔
818

819
    public boolean doNextAttempt() {
820
      return retryState == null;
1✔
821
    }
822

823
    public boolean failWorkflowTask() {
824
      return RetryState.RETRY_STATE_INTERNAL_SERVER_ERROR.equals(retryState);
1✔
825
    }
826
  }
827
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc