• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #278

08 Jul 2024 04:42PM UTC coverage: 77.565% (+0.1%) from 77.469%
#278

push

github

web-flow
Revert configurable slot provider (#2134)

* Revert "Resource based tuner (#2110)"

This reverts commit 8a2d5cdcc.

* Revert "Slot supplier interface & fixed-size implementation (#2014)"

This reverts commit d2a06fc6f.

* Fix merge conflict

* Keep Publish Test Report step

* Add tests for worker slots

* Fix white space

* One other whitespace change

117 of 133 new or added lines in 17 files covered. (87.97%)

5 existing lines in 5 files now uncovered.

19088 of 24609 relevant lines covered (77.57%)

0.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.76
/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.worker;
22

23
import static io.temporal.internal.worker.LocalActivityResult.failed;
24
import static io.temporal.internal.worker.LocalActivityResult.processingFailed;
25

26
import com.google.common.base.Preconditions;
27
import com.uber.m3.tally.Scope;
28
import com.uber.m3.tally.Stopwatch;
29
import com.uber.m3.util.ImmutableMap;
30
import io.grpc.Deadline;
31
import io.temporal.api.enums.v1.RetryState;
32
import io.temporal.api.enums.v1.TimeoutType;
33
import io.temporal.api.failure.v1.Failure;
34
import io.temporal.api.failure.v1.TimeoutFailureInfo;
35
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
36
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponseOrBuilder;
37
import io.temporal.common.RetryOptions;
38
import io.temporal.failure.ApplicationFailure;
39
import io.temporal.internal.common.ProtobufTimeUtils;
40
import io.temporal.internal.common.RetryOptionsUtils;
41
import io.temporal.internal.logging.LoggerTag;
42
import io.temporal.internal.statemachines.ExecuteLocalActivityParameters;
43
import io.temporal.serviceclient.MetricsTag;
44
import io.temporal.worker.MetricsType;
45
import io.temporal.worker.WorkerMetricsTag;
46
import io.temporal.workflow.Functions;
47
import java.time.Duration;
48
import java.util.Objects;
49
import java.util.Optional;
50
import java.util.concurrent.*;
51
import javax.annotation.Nonnull;
52
import javax.annotation.Nullable;
53
import org.slf4j.Logger;
54
import org.slf4j.LoggerFactory;
55
import org.slf4j.MDC;
56

57
final class LocalActivityWorker implements Startable, Shutdownable {
58
  private static final Logger log = LoggerFactory.getLogger(LocalActivityWorker.class);
1✔
59

60
  private final ActivityTaskHandler handler;
61
  private final String namespace;
62
  private final String taskQueue;
63

64
  private final SingleWorkerOptions options;
65

66
  private final LocalActivityDispatcherImpl laScheduler;
67

68
  private final PollerOptions pollerOptions;
69
  private final Scope workerMetricsScope;
70

71
  private ScheduledExecutorService scheduledExecutor;
72
  private PollTaskExecutor<LocalActivityAttemptTask> activityAttemptTaskExecutor;
73

74
  /**
   * Creates a local activity worker for the given namespace and task queue.
   *
   * <p>The dispatcher used for new executions is created with twice as many backpressure permits
   * as {@code options.getTaskExecutorThreadPoolSize()}.
   *
   * @param namespace namespace of the worker, must not be null
   * @param taskQueue task queue of the worker, must not be null
   * @param options worker options; the thread pool size and the metrics scope are read here, must
   *     not be null
   * @param handler handler that executes the activity attempts
   * @throws NullPointerException if {@code namespace}, {@code taskQueue} or {@code options} is
   *     null
   */
  public LocalActivityWorker(
      @Nonnull String namespace,
      @Nonnull String taskQueue,
      @Nonnull SingleWorkerOptions options,
      @Nonnull ActivityTaskHandler handler) {
    this.namespace = Objects.requireNonNull(namespace, "namespace");
    this.taskQueue = Objects.requireNonNull(taskQueue, "taskQueue");
    // Validate options before the first dereference below; checking it afterwards would make the
    // null check unreachable for a null argument.
    this.options = Objects.requireNonNull(options, "options");
    this.handler = handler;
    this.laScheduler = new LocalActivityDispatcherImpl(2 * options.getTaskExecutorThreadPoolSize());
    this.pollerOptions = getPollerOptions(options);
    this.workerMetricsScope =
        MetricsTag.tagged(
            options.getMetricsScope(), WorkerMetricsTag.WorkerType.LOCAL_ACTIVITY_WORKER);
  }
1✔
89

90
  private void submitRetry(
91
      @Nonnull LocalActivityExecutionContext executionContext,
92
      @Nonnull PollActivityTaskQueueResponse.Builder activityTask) {
93
    submitAttempt(executionContext, activityTask, null);
1✔
94
  }
1✔
95

96
  private void submitAttempt(
97
      @Nonnull LocalActivityExecutionContext executionContext,
98
      @Nonnull PollActivityTaskQueueResponse.Builder activityTask,
99
      @Nullable Functions.Proc leftQueueCallback) {
100
    @Nullable Duration scheduleToStartTimeout = executionContext.getScheduleToStartTimeout();
1✔
101
    @Nullable ScheduledFuture<?> scheduleToStartFuture = null;
1✔
102
    if (scheduleToStartTimeout != null) {
1✔
103
      scheduleToStartFuture =
1✔
104
          scheduledExecutor.schedule(
1✔
105
              new FinalTimeoutHandler(TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_START, executionContext),
106
              scheduleToStartTimeout.toMillis(),
1✔
107
              TimeUnit.MILLISECONDS);
108
    }
109

110
    activityTask.setCurrentAttemptScheduledTime(ProtobufTimeUtils.getCurrentProtoTime());
1✔
111
    LocalActivityAttemptTask task =
1✔
112
        new LocalActivityAttemptTask(
113
            executionContext, activityTask, leftQueueCallback, scheduleToStartFuture);
114
    activityAttemptTaskExecutor.process(task);
1✔
115
  }
1✔
116

117
  /**
118
   * @param executionContext execution context of the activity
119
   * @param activityTask activity task
120
   * @param attemptThrowable exception happened during the activity attempt. Can be null.
121
   * @return decision to retry or not with a retry state, backoff or delay to the next attempt if
122
   *     applicable
123
   */
124
  @Nonnull
125
  private RetryDecision shouldRetry(
126
      LocalActivityExecutionContext executionContext,
127
      PollActivityTaskQueueResponseOrBuilder activityTask,
128
      @Nullable Throwable attemptThrowable) {
129
    int currentAttempt = activityTask.getAttempt();
1✔
130

131
    if (isNonRetryableApplicationFailure(attemptThrowable)) {
1✔
132
      return new RetryDecision(RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE, null);
1✔
133
    }
134

135
    if (attemptThrowable instanceof Error) {
1✔
136
      // TODO Error inside Local Activity shouldn't be failing the local activity call.
137
      //  Instead we should fail Workflow Task. Implement a special flag for that in the result.
138
      //          task.callback(executionFailed(activityHandlerResult,
139
      // RetryState.RETRY_STATE_MAXIMUM_ATTEMPTS_REACHED, currentAttempt));
140
      // don't just swallow Error from activities, propagate it to the top
141
      throw (Error) attemptThrowable;
1✔
142
    }
143

144
    if (isRetryPolicyNotSet(activityTask)) {
1✔
145
      return new RetryDecision(RetryState.RETRY_STATE_RETRY_POLICY_NOT_SET, null);
×
146
    }
147

148
    RetryOptions retryOptions = RetryOptionsUtils.toRetryOptions(activityTask.getRetryPolicy());
1✔
149

150
    if (RetryOptionsUtils.isNotRetryable(retryOptions, attemptThrowable)) {
1✔
151
      return new RetryDecision(RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE, null);
×
152
    }
153

154
    if (RetryOptionsUtils.areAttemptsReached(retryOptions, currentAttempt)) {
1✔
155
      return new RetryDecision(RetryState.RETRY_STATE_MAXIMUM_ATTEMPTS_REACHED, null);
1✔
156
    }
157

158
    Optional<Duration> nextRetryDelay = getNextRetryDelay(attemptThrowable);
1✔
159
    long sleepMillis = retryOptions.calculateSleepTime(currentAttempt);
1✔
160
    Duration sleep = nextRetryDelay.orElse(Duration.ofMillis(sleepMillis));
1✔
161
    if (RetryOptionsUtils.isDeadlineReached(
1✔
162
        executionContext.getScheduleToCloseDeadline(), sleepMillis)) {
1✔
163
      return new RetryDecision(RetryState.RETRY_STATE_TIMEOUT, null);
1✔
164
    }
165

166
    if (sleep.compareTo(executionContext.getLocalRetryThreshold()) > 0) {
1✔
167
      // RETRY_STATE_IN_PROGRESS shows that it's not the end for this local activity execution from
168
      // the workflow point of view. It's also not conflicting with any other situations and
169
      // uniquely identifies the reach of the local retries and a need to schedule a timer.
170
      return new RetryDecision(RetryState.RETRY_STATE_IN_PROGRESS, sleep);
1✔
171
    }
172

173
    return new RetryDecision(sleep);
1✔
174
  }
175

176
  /**
177
   * @param executionContext execution context of the activity
178
   * @param backoff delay time in milliseconds to the next attempt
179
   * @param failure if supplied, it will be used to override {@link
180
   *     LocalActivityExecutionContext#getLastAttemptFailure()}
181
   */
182
  private void scheduleNextAttempt(
183
      LocalActivityExecutionContext executionContext,
184
      @Nonnull Duration backoff,
185
      @Nullable Failure failure) {
186
    PollActivityTaskQueueResponse.Builder nextActivityTask =
1✔
187
        executionContext.getNextAttemptActivityTask(failure);
1✔
188
    Deadline.after(backoff.toMillis(), TimeUnit.MILLISECONDS)
1✔
189
        .runOnExpiration(
1✔
190
            new LocalActivityRetryHandler(executionContext, nextActivityTask), scheduledExecutor);
191
  }
1✔
192

193
  private class LocalActivityDispatcherImpl implements LocalActivityDispatcher {
194
    /**
195
     * Retries always get a green light, but we have a backpressure for new tasks if the queue fills
196
     * up with not picked up new executions
197
     */
198
    private final Semaphore newExecutionsBackpressureSemaphore;
199

200
    public LocalActivityDispatcherImpl(int semaphorePermits) {
1✔
201
      // number of permits for this semaphore is not that important, because we allow submitter to
202
      // block and wait till the workflow task heartbeat to allow the worker to tolerate spikes of
203
      // short local activity executions.
204
      this.newExecutionsBackpressureSemaphore = new Semaphore(semaphorePermits);
1✔
205
    }
1✔
206

207
    @Override
208
    public boolean dispatch(
209
        @Nonnull ExecuteLocalActivityParameters params,
210
        @Nonnull Functions.Proc1<LocalActivityResult> resultCallback,
211
        @Nullable Deadline acceptanceDeadline) {
212
      WorkerLifecycleState lifecycleState = getLifecycleState();
1✔
213
      switch (lifecycleState) {
1✔
214
        case NOT_STARTED:
215
          throw new IllegalStateException(
×
216
              "Local Activity Worker is not started, no activities were registered");
217
        case SHUTDOWN:
218
          throw new IllegalStateException("Local Activity Worker is shutdown");
×
219
        case TERMINATED:
220
          throw new IllegalStateException("Local Activity Worker is terminated");
×
221
        case SUSPENDED:
222
          throw new IllegalStateException(
×
223
              "[BUG] Local Activity Worker is suspended. Suspension is not supported for Local Activity Worker");
224
      }
225

226
      Preconditions.checkArgument(
1✔
227
          handler.isTypeSupported(params.getActivityType().getName()),
1✔
228
          "Activity type %s is not supported by the local activity worker",
229
          params.getActivityType().getName());
1✔
230

231
      long passedFromOriginalSchedulingMs =
232
          System.currentTimeMillis() - params.getOriginalScheduledTimestamp();
1✔
233
      Duration scheduleToCloseTimeout = params.getScheduleToCloseTimeout();
1✔
234
      Deadline scheduleToCloseDeadline = null;
1✔
235
      if (scheduleToCloseTimeout != null) {
1✔
236
        scheduleToCloseDeadline =
1✔
237
            Deadline.after(
1✔
238
                scheduleToCloseTimeout.toMillis() - passedFromOriginalSchedulingMs,
1✔
239
                TimeUnit.MILLISECONDS);
240
      }
241

242
      LocalActivityExecutionContext executionContext =
1✔
243
          new LocalActivityExecutionContext(params, resultCallback, scheduleToCloseDeadline);
244

245
      PollActivityTaskQueueResponse.Builder activityTask = executionContext.getInitialTask();
1✔
246

247
      boolean retryIsNotAllowed =
1✔
248
          failIfRetryIsNotAllowedByNewPolicy(executionContext, activityTask);
1✔
249
      if (retryIsNotAllowed) {
1✔
250
        return true;
1✔
251
      }
252

253
      return submitANewExecution(executionContext, activityTask, acceptanceDeadline);
1✔
254
    }
255

256
    private boolean submitANewExecution(
257
        @Nonnull LocalActivityExecutionContext executionContext,
258
        @Nonnull PollActivityTaskQueueResponse.Builder activityTask,
259
        @Nullable Deadline acceptanceDeadline) {
260
      try {
261
        boolean accepted;
262
        if (acceptanceDeadline == null) {
1✔
NEW
263
          newExecutionsBackpressureSemaphore.acquire();
×
NEW
264
          accepted = true;
×
265
        } else {
266
          long acceptanceTimeoutMs = acceptanceDeadline.timeRemaining(TimeUnit.MILLISECONDS);
1✔
267
          if (acceptanceTimeoutMs > 0) {
1✔
268
            accepted =
1✔
269
                newExecutionsBackpressureSemaphore.tryAcquire(
1✔
270
                    acceptanceTimeoutMs, TimeUnit.MILLISECONDS);
271
          } else {
272
            accepted = newExecutionsBackpressureSemaphore.tryAcquire();
1✔
273
          }
274
          if (!accepted) {
1✔
NEW
275
            log.warn(
×
276
                "LocalActivity queue is full and submitting timed out for activity {} with acceptanceTimeoutMs: {}",
NEW
277
                activityTask.getActivityId(),
×
NEW
278
                acceptanceTimeoutMs);
×
279
          }
280
        }
281

282
        if (accepted) {
1✔
283
          // we should publish scheduleToClose before submission, so the handlers always see a full
284
          // state of executionContext
285
          @Nullable
286
          Deadline scheduleToCloseDeadline = executionContext.getScheduleToCloseDeadline();
1✔
287
          if (scheduleToCloseDeadline != null) {
1✔
288
            ScheduledFuture<?> scheduleToCloseFuture =
1✔
289
                scheduledExecutor.schedule(
1✔
290
                    new FinalTimeoutHandler(
291
                        TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE, executionContext),
292
                    scheduleToCloseDeadline.timeRemaining(TimeUnit.MILLISECONDS),
1✔
293
                    TimeUnit.MILLISECONDS);
294
            executionContext.setScheduleToCloseFuture(scheduleToCloseFuture);
1✔
295
          }
296
          submitAttempt(
1✔
297
              executionContext, activityTask, newExecutionsBackpressureSemaphore::release);
1✔
298
          log.trace("LocalActivity queued: {}", activityTask.getActivityId());
1✔
299
        }
300
        return accepted;
1✔
UNCOV
301
      } catch (InterruptedException e) {
×
302
        Thread.currentThread().interrupt();
×
303
        return false;
×
304
      }
305
    }
306

307
    /**
308
     * @param attemptTask local activity retry attempt task specifying the retry we are about to
309
     *     schedule
310
     * @return true if the retry attempt specified by {@code task} is not allowed by the current
311
     *     retry policy and the error was submitted in the callback, false otherwise
312
     */
313
    private boolean failIfRetryIsNotAllowedByNewPolicy(
314
        LocalActivityExecutionContext executionContext,
315
        PollActivityTaskQueueResponseOrBuilder attemptTask) {
316
      final Failure previousExecutionFailure = executionContext.getPreviousExecutionFailure();
1✔
317
      if (previousExecutionFailure != null) {
1✔
318
        // This is not an original local execution, it's a continuation using a workflow timer.
319
        // We should verify if the RetryOptions currently supplied in the workflow still allow the
320
        // retry.
321
        // If not, we need to recreate the same structure of an error like it would happen before we
322
        // started to sleep on the timer, at the end of the previous local execution.
323
        RetryState retryState =
1✔
324
            shouldStillRetry(executionContext, attemptTask, previousExecutionFailure);
1✔
325
        if (!RetryState.RETRY_STATE_IN_PROGRESS.equals(retryState)) {
1✔
326
          Failure failure;
327
          if (RetryState.RETRY_STATE_TIMEOUT.equals(retryState)) {
1✔
328
            if (previousExecutionFailure.hasTimeoutFailureInfo()
×
329
                && TimeoutType.TIMEOUT_TYPE_START_TO_CLOSE.equals(
×
330
                    previousExecutionFailure.getTimeoutFailureInfo().getTimeoutType())) {
×
331
              // This scenario should behave the same way as a startToClose timeout happening and
332
              // encountering
333
              // RetryState#TIMEOUT during calculation of the next attempt (which is effectively a
334
              // scheduleToClose
335
              // timeout).
336
              // See how StartToCloseTimeoutHandler or
337
              // io.temporal.internal.testservice.StateMachines#timeoutActivityTask
338
              // discard startToClose in this case and replaces it with scheduleToClose
339
              failure =
×
340
                  newTimeoutFailure(
×
341
                      TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE,
342
                      previousExecutionFailure.getCause());
×
343
            } else {
344
              failure =
×
345
                  newTimeoutFailure(
×
346
                      TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE, previousExecutionFailure);
347
            }
348
          } else {
349
            failure = previousExecutionFailure;
1✔
350
          }
351

352
          executionContext.callback(
1✔
353
              failed(
1✔
354
                  executionContext.getActivityId(),
1✔
355
                  attemptTask.getAttempt(),
1✔
356
                  retryState,
357
                  failure,
358
                  null));
359
          return true;
1✔
360
        }
361
      }
362
      return false;
1✔
363
    }
364

365
    /**
366
     * @param executionContext execution context of the activity
367
     * @param activityTask activity task
368
     * @param previousLocalExecutionFailure failure happened during previous local activity
369
     *     execution. Can be null.
370
     * @return decision to retry or not with a retry state, backoff or delay to the next attempt if
371
     *     applicable
372
     */
373
    @Nonnull
374
    private RetryState shouldStillRetry(
375
        LocalActivityExecutionContext executionContext,
376
        PollActivityTaskQueueResponseOrBuilder activityTask,
377
        @Nullable Failure previousLocalExecutionFailure) {
378
      int currentAttempt = activityTask.getAttempt();
1✔
379

380
      if (isRetryPolicyNotSet(activityTask)) {
1✔
381
        return RetryState.RETRY_STATE_RETRY_POLICY_NOT_SET;
×
382
      }
383

384
      RetryOptions retryOptions = RetryOptionsUtils.toRetryOptions(activityTask.getRetryPolicy());
1✔
385

386
      if (previousLocalExecutionFailure != null
1✔
387
          && previousLocalExecutionFailure.hasApplicationFailureInfo()
1✔
388
          && RetryOptionsUtils.isNotRetryable(
×
389
              retryOptions, previousLocalExecutionFailure.getApplicationFailureInfo().getType())) {
×
390
        return RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE;
×
391
      }
392

393
      // The current attempt didn't happen yet in this check, that's why -1
394
      if (RetryOptionsUtils.areAttemptsReached(retryOptions, currentAttempt - 1)) {
1✔
395
        return RetryState.RETRY_STATE_MAXIMUM_ATTEMPTS_REACHED;
1✔
396
      }
397

398
      long sleepMillis = retryOptions.calculateSleepTime(currentAttempt);
1✔
399
      if (RetryOptionsUtils.isDeadlineReached(
1✔
400
          executionContext.getScheduleToCloseDeadline(), sleepMillis)) {
1✔
401
        return RetryState.RETRY_STATE_TIMEOUT;
×
402
      }
403

404
      return RetryState.RETRY_STATE_IN_PROGRESS;
1✔
405
    }
406
  }
407

408
  private class AttemptTaskHandlerImpl
409
      implements PollTaskExecutor.TaskHandler<LocalActivityAttemptTask> {
410

411
    private final ActivityTaskHandler handler;
412

413
    private AttemptTaskHandlerImpl(ActivityTaskHandler handler) {
1✔
414
      this.handler = handler;
1✔
415
    }
1✔
416

417
    @Override
418
    public void handle(LocalActivityAttemptTask attemptTask) throws Exception {
419
      attemptTask.markAsTakenFromQueue();
1✔
420

421
      // cancel scheduleToStart timeout if not already fired
422
      @Nullable ScheduledFuture<?> scheduleToStartFuture = attemptTask.getScheduleToStartFuture();
1✔
423
      boolean scheduleToStartFired =
1✔
424
          scheduleToStartFuture != null && !scheduleToStartFuture.cancel(false);
1✔
425

426
      LocalActivityExecutionContext executionContext = attemptTask.getExecutionContext();
1✔
427
      PollActivityTaskQueueResponseOrBuilder activityTask = attemptTask.getAttemptTask();
1✔
428

429
      // if an activity was already completed by any mean like scheduleToClose or scheduleToStart,
430
      // discard this attempt, this execution is completed.
431
      // The scheduleToStartFired check here is a bit overkill, but allows to catch an edge case
432
      // where
433
      // scheduleToStart is already fired, but didn't report a completion yet.
434
      boolean shouldDiscardTheAttempt = scheduleToStartFired || executionContext.isCompleted();
1✔
435
      if (shouldDiscardTheAttempt) {
1✔
436
        return;
×
437
      }
438

439
      Scope metricsScope =
1✔
440
          workerMetricsScope.tagged(
1✔
441
              ImmutableMap.of(
1✔
442
                  MetricsTag.ACTIVITY_TYPE,
443
                  activityTask.getActivityType().getName(),
1✔
444
                  MetricsTag.WORKFLOW_TYPE,
445
                  activityTask.getWorkflowType().getName()));
1✔
446

447
      MDC.put(LoggerTag.ACTIVITY_ID, activityTask.getActivityId());
1✔
448
      MDC.put(LoggerTag.ACTIVITY_TYPE, activityTask.getActivityType().getName());
1✔
449
      MDC.put(LoggerTag.WORKFLOW_ID, activityTask.getWorkflowExecution().getWorkflowId());
1✔
450
      MDC.put(LoggerTag.WORKFLOW_TYPE, activityTask.getWorkflowType().getName());
1✔
451
      MDC.put(LoggerTag.RUN_ID, activityTask.getWorkflowExecution().getRunId());
1✔
452
      try {
453
        ScheduledFuture<?> startToCloseTimeoutFuture = null;
1✔
454

455
        if (activityTask.hasStartToCloseTimeout()) {
1✔
456
          startToCloseTimeoutFuture =
1✔
457
              scheduledExecutor.schedule(
1✔
458
                  new StartToCloseTimeoutHandler(attemptTask),
459
                  ProtobufTimeUtils.toJavaDuration(
1✔
460
                          attemptTask.getAttemptTask().getStartToCloseTimeout())
1✔
461
                      .toMillis(),
1✔
462
                  TimeUnit.MILLISECONDS);
463
        }
464

465
        metricsScope.counter(MetricsType.LOCAL_ACTIVITY_TOTAL_COUNTER).inc(1);
1✔
466

467
        ActivityTaskHandler.Result activityHandlerResult;
468
        Stopwatch sw = metricsScope.timer(MetricsType.LOCAL_ACTIVITY_EXECUTION_LATENCY).start();
1✔
469
        try {
470
          activityHandlerResult =
1✔
471
              handler.handle(new ActivityTask(activityTask, () -> {}), metricsScope, true);
1✔
472
        } finally {
473
          sw.stop();
1✔
474
        }
475

476
        // Cancel startToCloseTimeoutFuture if it's not yet fired.
477
        boolean startToCloseTimeoutFired =
1✔
478
            startToCloseTimeoutFuture != null && !startToCloseTimeoutFuture.cancel(false);
1✔
479

480
        // We make sure that the result handling code following this statement is mutual exclusive
481
        // with the startToClose timeout handler.
482
        // If startToClose fired, scheduling of the next attempt is taken care by the
483
        // StartToCloseTimeoutHandler.
484
        // If execution is already completed, this attempt handling shouldn't proceed, nothing to do
485
        // with result. The typical scenario may be fired scheduleToClose.
486
        boolean shouldDiscardTheResult = startToCloseTimeoutFired || executionContext.isCompleted();
1✔
487
        if (shouldDiscardTheResult) {
1✔
488
          return;
1✔
489
        }
490

491
        handleResult(activityHandlerResult, attemptTask, metricsScope);
1✔
492
      } catch (Throwable ex) {
1✔
493
        // handleLocalActivity is expected to never throw an exception and return a result
494
        // that can be used for a workflow callback if this method throws, it's a bug.
495
        log.error("[BUG] Code that expected to never throw an exception threw an exception", ex);
1✔
496
        executionContext.callback(
1✔
497
            processingFailed(activityTask.getActivityId(), activityTask.getAttempt(), ex));
1✔
498
        throw ex;
1✔
499
      } finally {
500
        MDC.remove(LoggerTag.ACTIVITY_ID);
1✔
501
        MDC.remove(LoggerTag.ACTIVITY_TYPE);
1✔
502
        MDC.remove(LoggerTag.WORKFLOW_ID);
1✔
503
        MDC.remove(LoggerTag.WORKFLOW_TYPE);
1✔
504
        MDC.remove(LoggerTag.RUN_ID);
1✔
505
      }
506
    }
1✔
507

508
    private void handleResult(
509
        ActivityTaskHandler.Result activityHandlerResult,
510
        LocalActivityAttemptTask attemptTask,
511
        Scope metricsScope) {
512
      LocalActivityExecutionContext executionContext = attemptTask.getExecutionContext();
1✔
513
      PollActivityTaskQueueResponseOrBuilder activityTask = attemptTask.getAttemptTask();
1✔
514
      int currentAttempt = activityTask.getAttempt();
1✔
515

516
      // Success
517
      if (activityHandlerResult.getTaskCompleted() != null) {
1✔
518
        boolean completedByThisInvocation =
1✔
519
            executionContext.callback(
1✔
520
                LocalActivityResult.completed(activityHandlerResult, currentAttempt));
1✔
521
        if (completedByThisInvocation) {
1✔
522
          // We report this metric only if the execution was completed by us right now, not by any
523
          // timeout earlier.
524
          // Completion by another attempt is not possible by another attempt earlier where we
525
          // checked if startToClose fired.
526
          com.uber.m3.util.Duration e2eDuration =
527
              com.uber.m3.util.Duration.ofMillis(
1✔
528
                  System.currentTimeMillis() - executionContext.getOriginalScheduledTimestamp());
1✔
529
          metricsScope.timer(MetricsType.LOCAL_ACTIVITY_SUCCEED_E2E_LATENCY).record(e2eDuration);
1✔
530
        }
531
        return;
1✔
532
      }
533

534
      // Cancellation
535
      if (activityHandlerResult.getTaskCanceled() != null) {
1✔
536
        executionContext.callback(
×
537
            LocalActivityResult.cancelled(activityHandlerResult, currentAttempt));
×
538
        return;
×
539
      }
540

541
      // Failure
542
      Preconditions.checkState(
1✔
543
          activityHandlerResult.getTaskFailed() != null,
1✔
544
          "One of taskCompleted, taskCanceled or taskFailed must be set");
545

546
      Failure executionFailure =
1✔
547
          activityHandlerResult.getTaskFailed().getTaskFailedRequest().getFailure();
1✔
548
      Throwable executionThrowable = activityHandlerResult.getTaskFailed().getFailure();
1✔
549

550
      RetryDecision retryDecision =
1✔
551
          shouldRetry(
1✔
552
              executionContext, activityTask, activityHandlerResult.getTaskFailed().getFailure());
1✔
553

554
      if (retryDecision.doNextAttempt()) {
1✔
555
        scheduleNextAttempt(
1✔
556
            executionContext,
557
            Objects.requireNonNull(
1✔
558
                retryDecision.nextAttemptBackoff, "nextAttemptBackoff is expected to not be null"),
1✔
559
            executionFailure);
560
      } else if (retryDecision.failWorkflowTask()) {
1✔
561
        executionContext.callback(
×
562
            processingFailed(executionContext.getActivityId(), currentAttempt, executionThrowable));
×
563
      } else {
564
        executionContext.callback(
1✔
565
            failed(
1✔
566
                executionContext.getActivityId(),
1✔
567
                currentAttempt,
568
                retryDecision.retryState,
1✔
569
                executionFailure,
570
                retryDecision.nextAttemptBackoff));
1✔
571
      }
572
    }
1✔
573

574
    @Override
575
    public Throwable wrapFailure(LocalActivityAttemptTask task, Throwable failure) {
576
      return new RuntimeException("Failure processing local activity task.", failure);
1✔
577
    }
578
  }
579

580
  private class LocalActivityRetryHandler implements Runnable {
581
    private final @Nonnull LocalActivityExecutionContext executionContext;
582
    private final @Nonnull PollActivityTaskQueueResponse.Builder activityTask;
583

584
    private LocalActivityRetryHandler(
585
        @Nonnull LocalActivityExecutionContext executionContext,
586
        @Nonnull PollActivityTaskQueueResponse.Builder activityTask) {
1✔
587
      this.executionContext = Objects.requireNonNull(executionContext, "executionContext");
1✔
588
      this.activityTask = Objects.requireNonNull(activityTask, "activityTask");
1✔
589
    }
1✔
590

591
    @Override
592
    public void run() {
593
      submitRetry(executionContext, activityTask);
1✔
594
    }
1✔
595
  }
596

597
  /** Used to perform both scheduleToStart and scheduleToClose timeouts. */
598
  private static class FinalTimeoutHandler implements Runnable {
599
    private final LocalActivityExecutionContext executionContext;
600
    private final TimeoutType timeoutType;
601

602
    public FinalTimeoutHandler(
603
        TimeoutType timeoutType, LocalActivityExecutionContext executionContext) {
1✔
604
      this.executionContext = executionContext;
1✔
605
      this.timeoutType = timeoutType;
1✔
606
    }
1✔
607

608
    @Override
609
    public void run() {
610
      executionContext.callback(
1✔
611
          failed(
1✔
612
              executionContext.getActivityId(),
1✔
613
              executionContext.getCurrentAttempt(),
1✔
614
              RetryState.RETRY_STATE_TIMEOUT,
615
              newTimeoutFailure(timeoutType, executionContext.getLastAttemptFailure()),
1✔
616
              null));
617
    }
1✔
618
  }
619

620
  private class StartToCloseTimeoutHandler implements Runnable {
621
    private final LocalActivityAttemptTask attemptTask;
622

623
    private StartToCloseTimeoutHandler(LocalActivityAttemptTask attemptTask) {
1✔
624
      this.attemptTask = attemptTask;
1✔
625
    }
1✔
626

627
    @Override
628
    public void run() {
629
      LocalActivityExecutionContext executionContext = attemptTask.getExecutionContext();
1✔
630
      PollActivityTaskQueueResponseOrBuilder activityTask = attemptTask.getAttemptTask();
1✔
631
      String activityId = activityTask.getActivityId();
1✔
632

633
      int timingOutAttempt = activityTask.getAttempt();
1✔
634

635
      RetryDecision retryDecision = shouldRetry(executionContext, activityTask, null);
1✔
636
      if (retryDecision.doNextAttempt()) {
1✔
637
        scheduleNextAttempt(
1✔
638
            executionContext,
639
            Objects.requireNonNull(
1✔
640
                retryDecision.nextAttemptBackoff, "nextAttemptBackoff is expected to not be null"),
1✔
641
            newTimeoutFailure(TimeoutType.TIMEOUT_TYPE_START_TO_CLOSE, null));
1✔
642
      } else {
643
        // RetryState.RETRY_STATE_TIMEOUT happens only when scheduleToClose is fired
644
        // scheduleToClose timeout is effectively replacing the original startToClose
645
        TimeoutType timeoutType =
646
            RetryState.RETRY_STATE_TIMEOUT.equals(retryDecision.retryState)
1✔
647
                ? TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE
1✔
648
                : TimeoutType.TIMEOUT_TYPE_START_TO_CLOSE;
1✔
649
        executionContext.callback(
1✔
650
            failed(
1✔
651
                activityId,
652
                timingOutAttempt,
653
                retryDecision.retryState,
1✔
654
                newTimeoutFailure(timeoutType, executionContext.getLastAttemptFailure()),
1✔
655
                retryDecision.nextAttemptBackoff));
1✔
656
      }
657
    }
1✔
658
  }
659

660
  @Override
661
  public boolean start() {
662
    if (handler.isAnyTypeSupported()) {
1✔
663
      this.scheduledExecutor =
1✔
664
          Executors.newSingleThreadScheduledExecutor(
1✔
665
              r -> {
666
                Thread thread = new Thread(r);
1✔
667
                thread.setName(
1✔
668
                    WorkerThreadsNameHelper.getLocalActivitySchedulerThreadPrefix(
1✔
669
                        namespace, taskQueue));
670
                return thread;
1✔
671
              });
672

673
      this.activityAttemptTaskExecutor =
1✔
674
          new PollTaskExecutor<>(
675
              namespace,
676
              taskQueue,
677
              options.getIdentity(),
1✔
678
              new AttemptTaskHandlerImpl(handler),
679
              pollerOptions,
680
              options.getTaskExecutorThreadPoolSize(),
1✔
681
              workerMetricsScope,
682
              false);
683

684
      this.workerMetricsScope.counter(MetricsType.WORKER_START_COUNTER).inc(1);
1✔
685
      return true;
1✔
686
    } else {
687
      return false;
1✔
688
    }
689
  }
690

691
  @Override
692
  public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
693
    if (activityAttemptTaskExecutor != null && !activityAttemptTaskExecutor.isShutdown()) {
1✔
694
      return activityAttemptTaskExecutor
1✔
695
          .shutdown(shutdownManager, interruptTasks)
1✔
696
          .thenCompose(
1✔
697
              r ->
698
                  shutdownManager.shutdownExecutor(
1✔
699
                      scheduledExecutor, this + "#scheduledExecutor", Duration.ofSeconds(1)))
1✔
700
          .exceptionally(
1✔
701
              e -> {
702
                log.error("[BUG] Unexpected exception during shutdown", e);
×
703
                return null;
×
704
              });
705
    } else {
706
      return CompletableFuture.completedFuture(null);
1✔
707
    }
708
  }
709

710
  @Override
711
  public void awaitTermination(long timeout, TimeUnit unit) {
712
    long timeoutMillis = unit.toMillis(timeout);
1✔
713
    ShutdownManager.awaitTermination(scheduledExecutor, timeoutMillis);
1✔
714
  }
1✔
715

716
  @Override
717
  public boolean isShutdown() {
718
    return activityAttemptTaskExecutor != null && activityAttemptTaskExecutor.isShutdown();
×
719
  }
720

721
  @Override
722
  public boolean isTerminated() {
723
    return activityAttemptTaskExecutor != null
1✔
724
        && activityAttemptTaskExecutor.isTerminated()
×
725
        && scheduledExecutor.isTerminated();
1✔
726
  }
727

728
  @Override
729
  public WorkerLifecycleState getLifecycleState() {
730
    if (activityAttemptTaskExecutor == null) {
1✔
731
      return WorkerLifecycleState.NOT_STARTED;
×
732
    }
733
    if (activityAttemptTaskExecutor.isShutdown()) {
1✔
734
      // return TERMINATED only if both pollExecutor and taskExecutor are terminated
735
      if (activityAttemptTaskExecutor.isTerminated() && scheduledExecutor.isTerminated()) {
×
736
        return WorkerLifecycleState.TERMINATED;
×
737
      } else {
738
        return WorkerLifecycleState.SHUTDOWN;
×
739
      }
740
    }
741
    return WorkerLifecycleState.ACTIVE;
1✔
742
  }
743

744
  private PollerOptions getPollerOptions(SingleWorkerOptions options) {
745
    PollerOptions pollerOptions = options.getPollerOptions();
1✔
746
    if (pollerOptions.getPollThreadNamePrefix() == null) {
1✔
747
      pollerOptions =
1✔
748
          PollerOptions.newBuilder(pollerOptions)
1✔
749
              .setPollThreadNamePrefix(
1✔
750
                  WorkerThreadsNameHelper.getLocalActivityPollerThreadPrefix(namespace, taskQueue))
1✔
751
              .build();
1✔
752
    }
753
    return pollerOptions;
1✔
754
  }
755

756
  public LocalActivityDispatcher getLocalActivityScheduler() {
757
    return laScheduler;
1✔
758
  }
759

760
  private static Failure newTimeoutFailure(TimeoutType timeoutType, @Nullable Failure cause) {
761
    TimeoutFailureInfo.Builder info = TimeoutFailureInfo.newBuilder().setTimeoutType(timeoutType);
1✔
762
    Failure.Builder result = Failure.newBuilder().setTimeoutFailureInfo(info);
1✔
763
    if (cause != null) {
1✔
764
      result.setCause(cause);
1✔
765
    }
766
    return result.build();
1✔
767
  }
768

769
  private static boolean isRetryPolicyNotSet(
770
      PollActivityTaskQueueResponseOrBuilder pollActivityTask) {
771
    return !pollActivityTask.hasScheduleToCloseTimeout()
1✔
772
        && (!pollActivityTask.hasRetryPolicy()
1✔
773
            || pollActivityTask.getRetryPolicy().getMaximumAttempts() <= 0);
1✔
774
  }
775

776
  private static boolean isNonRetryableApplicationFailure(@Nullable Throwable executionThrowable) {
777
    return executionThrowable instanceof ApplicationFailure
1✔
778
        && ((ApplicationFailure) executionThrowable).isNonRetryable();
1✔
779
  }
780

781
  private static Optional<Duration> getNextRetryDelay(@Nullable Throwable executionThrowable) {
782
    if (executionThrowable instanceof ApplicationFailure) {
1✔
783
      return Optional.ofNullable(((ApplicationFailure) executionThrowable).getNextRetryDelay());
1✔
784
    }
785
    return Optional.empty();
1✔
786
  }
787

788
  private static class RetryDecision {
789
    private final @Nullable RetryState retryState;
790
    private final @Nullable Duration nextAttemptBackoff;
791

792
    // No next local attempts
793
    public RetryDecision(@Nonnull RetryState retryState, @Nullable Duration nextAttemptBackoff) {
1✔
794
      this.retryState = retryState;
1✔
795
      this.nextAttemptBackoff = nextAttemptBackoff;
1✔
796
    }
1✔
797

798
    // Do the next attempt
799
    public RetryDecision(@Nonnull Duration nextAttemptBackoff) {
1✔
800
      this.retryState = null;
1✔
801
      this.nextAttemptBackoff = Objects.requireNonNull(nextAttemptBackoff);
1✔
802
    }
1✔
803

804
    public boolean doNextAttempt() {
805
      return retryState == null;
1✔
806
    }
807

808
    public boolean failWorkflowTask() {
809
      return RetryState.RETRY_STATE_INTERNAL_SERVER_ERROR.equals(retryState);
1✔
810
    }
811
  }
812
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc