• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #244

10 Apr 2024 08:19PM UTC coverage: 77.465% (-0.08%) from 77.549%
#244

push

github

web-flow
Slot supplier interface & fixed-size implementation (#2014)

https://github.com/temporalio/proposals/blob/master/all-sdk/autotuning.md

286 of 388 new or added lines in 25 files covered. (73.71%)

3 existing lines in 3 files now uncovered.

19116 of 24677 relevant lines covered (77.46%)

0.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.85
/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.worker;
22

23
import static io.temporal.internal.worker.LocalActivityResult.failed;
24
import static io.temporal.internal.worker.LocalActivityResult.processingFailed;
25

26
import com.google.common.base.Preconditions;
27
import com.google.common.util.concurrent.SimpleTimeLimiter;
28
import com.google.common.util.concurrent.TimeLimiter;
29
import com.uber.m3.tally.Scope;
30
import com.uber.m3.tally.Stopwatch;
31
import com.uber.m3.util.ImmutableMap;
32
import io.grpc.Deadline;
33
import io.temporal.api.enums.v1.RetryState;
34
import io.temporal.api.enums.v1.TimeoutType;
35
import io.temporal.api.failure.v1.Failure;
36
import io.temporal.api.failure.v1.TimeoutFailureInfo;
37
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
38
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponseOrBuilder;
39
import io.temporal.common.RetryOptions;
40
import io.temporal.failure.ApplicationFailure;
41
import io.temporal.internal.activity.ActivityPollResponseToInfo;
42
import io.temporal.internal.common.ProtobufTimeUtils;
43
import io.temporal.internal.common.RetryOptionsUtils;
44
import io.temporal.internal.logging.LoggerTag;
45
import io.temporal.internal.statemachines.ExecuteLocalActivityParameters;
46
import io.temporal.serviceclient.MetricsTag;
47
import io.temporal.worker.MetricsType;
48
import io.temporal.worker.WorkerMetricsTag;
49
import io.temporal.worker.tuning.*;
50
import io.temporal.workflow.Functions;
51
import java.time.Duration;
52
import java.util.Objects;
53
import java.util.concurrent.*;
54
import javax.annotation.Nonnull;
55
import javax.annotation.Nullable;
56
import org.slf4j.Logger;
57
import org.slf4j.LoggerFactory;
58
import org.slf4j.MDC;
59

60
final class LocalActivityWorker implements Startable, Shutdownable {
61
  private static final Logger log = LoggerFactory.getLogger(LocalActivityWorker.class);
1✔
62

63
  private final ActivityTaskHandler handler;
64
  private final String namespace;
65
  private final String taskQueue;
66

67
  private final SingleWorkerOptions options;
68

69
  private final LocalActivityDispatcherImpl laScheduler;
70

71
  private final PollerOptions pollerOptions;
72
  private final Scope workerMetricsScope;
73

74
  private ScheduledExecutorService scheduledExecutor;
75
  private PollTaskExecutor<LocalActivityAttemptTask> activityAttemptTaskExecutor;
76
  private final TrackingSlotSupplier<LocalActivitySlotInfo> slotSupplier;
77

78
  public LocalActivityWorker(
79
      @Nonnull String namespace,
80
      @Nonnull String taskQueue,
81
      @Nonnull SingleWorkerOptions options,
82
      @Nonnull ActivityTaskHandler handler,
83
      @Nonnull TrackingSlotSupplier<LocalActivitySlotInfo> slotSupplier) {
1✔
84
    this.namespace = Objects.requireNonNull(namespace);
1✔
85
    this.taskQueue = Objects.requireNonNull(taskQueue);
1✔
86
    this.handler = handler;
1✔
87
    this.slotSupplier = Objects.requireNonNull(slotSupplier);
1✔
88
    this.laScheduler = new LocalActivityDispatcherImpl(slotSupplier);
1✔
89
    this.options = Objects.requireNonNull(options);
1✔
90
    this.pollerOptions = getPollerOptions(options);
1✔
91
    this.workerMetricsScope =
1✔
92
        MetricsTag.tagged(
1✔
93
            options.getMetricsScope(), WorkerMetricsTag.WorkerType.LOCAL_ACTIVITY_WORKER);
1✔
94
    this.slotSupplier.setMetricsScope(this.workerMetricsScope);
1✔
95
  }
1✔
96

97
  private void submitRetry(
98
      @Nonnull LocalActivityExecutionContext executionContext,
99
      @Nonnull PollActivityTaskQueueResponse.Builder activityTask) {
100
    submitAttempt(executionContext, activityTask);
1✔
101
  }
1✔
102

103
  private void submitAttempt(
104
      @Nonnull LocalActivityExecutionContext executionContext,
105
      @Nonnull PollActivityTaskQueueResponse.Builder activityTask) {
106
    @Nullable Duration scheduleToStartTimeout = executionContext.getScheduleToStartTimeout();
1✔
107
    @Nullable ScheduledFuture<?> scheduleToStartFuture = null;
1✔
108
    if (scheduleToStartTimeout != null) {
1✔
109
      scheduleToStartFuture =
1✔
110
          scheduledExecutor.schedule(
1✔
111
              new FinalTimeoutHandler(TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_START, executionContext),
112
              scheduleToStartTimeout.toMillis(),
1✔
113
              TimeUnit.MILLISECONDS);
114
    }
115

116
    activityTask.setCurrentAttemptScheduledTime(ProtobufTimeUtils.getCurrentProtoTime());
1✔
117
    LocalActivityAttemptTask task =
1✔
118
        new LocalActivityAttemptTask(executionContext, activityTask, scheduleToStartFuture);
119
    activityAttemptTaskExecutor.process(task);
1✔
120
  }
1✔
121

122
  /**
123
   * @param executionContext execution context of the activity
124
   * @param activityTask activity task
125
   * @param attemptThrowable exception happened during the activity attempt. Can be null.
126
   * @return decision to retry or not with a retry state, backoff or delay to the next attempt if
127
   *     applicable
128
   */
129
  @Nonnull
130
  private RetryDecision shouldRetry(
131
      LocalActivityExecutionContext executionContext,
132
      PollActivityTaskQueueResponseOrBuilder activityTask,
133
      @Nullable Throwable attemptThrowable) {
134
    int currentAttempt = activityTask.getAttempt();
1✔
135

136
    if (isNonRetryableApplicationFailure(attemptThrowable)) {
1✔
137
      return new RetryDecision(RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE, null);
1✔
138
    }
139

140
    if (attemptThrowable instanceof Error) {
1✔
141
      // TODO Error inside Local Activity shouldn't be failing the local activity call.
142
      //  Instead we should fail Workflow Task. Implement a special flag for that in the result.
143
      //          task.callback(executionFailed(activityHandlerResult,
144
      // RetryState.RETRY_STATE_MAXIMUM_ATTEMPTS_REACHED, currentAttempt));
145
      // don't just swallow Error from activities, propagate it to the top
146
      throw (Error) attemptThrowable;
1✔
147
    }
148

149
    if (isRetryPolicyNotSet(activityTask)) {
1✔
150
      return new RetryDecision(RetryState.RETRY_STATE_RETRY_POLICY_NOT_SET, null);
×
151
    }
152

153
    RetryOptions retryOptions = RetryOptionsUtils.toRetryOptions(activityTask.getRetryPolicy());
1✔
154

155
    if (RetryOptionsUtils.isNotRetryable(retryOptions, attemptThrowable)) {
1✔
156
      return new RetryDecision(RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE, null);
×
157
    }
158

159
    if (RetryOptionsUtils.areAttemptsReached(retryOptions, currentAttempt)) {
1✔
160
      return new RetryDecision(RetryState.RETRY_STATE_MAXIMUM_ATTEMPTS_REACHED, null);
1✔
161
    }
162

163
    long sleepMillis = retryOptions.calculateSleepTime(currentAttempt);
1✔
164
    Duration sleep = Duration.ofMillis(sleepMillis);
1✔
165
    if (RetryOptionsUtils.isDeadlineReached(
1✔
166
        executionContext.getScheduleToCloseDeadline(), sleepMillis)) {
1✔
167
      return new RetryDecision(RetryState.RETRY_STATE_TIMEOUT, null);
1✔
168
    }
169

170
    if (sleep.compareTo(executionContext.getLocalRetryThreshold()) > 0) {
1✔
171
      // RETRY_STATE_IN_PROGRESS shows that it's not the end for this local activity execution from
172
      // the workflow point of view. It's also not conflicting with any other situations and
173
      // uniquely identifies the reach of the local retries and a need to schedule a timer.
174
      return new RetryDecision(RetryState.RETRY_STATE_IN_PROGRESS, sleep);
1✔
175
    }
176

177
    return new RetryDecision(sleep);
1✔
178
  }
179

180
  /**
181
   * @param executionContext execution context of the activity
182
   * @param backoff delay time in milliseconds to the next attempt
183
   * @param failure if supplied, it will be used to override {@link
184
   *     LocalActivityExecutionContext#getLastAttemptFailure()}
185
   */
186
  private void scheduleNextAttempt(
187
      LocalActivityExecutionContext executionContext,
188
      @Nonnull Duration backoff,
189
      @Nullable Failure failure) {
190
    PollActivityTaskQueueResponse.Builder nextActivityTask =
1✔
191
        executionContext.getNextAttemptActivityTask(failure);
1✔
192
    Deadline.after(backoff.toMillis(), TimeUnit.MILLISECONDS)
1✔
193
        .runOnExpiration(
1✔
194
            new LocalActivityRetryHandler(executionContext, nextActivityTask), scheduledExecutor);
195
  }
1✔
196

197
  private class LocalActivityDispatcherImpl implements LocalActivityDispatcher {
198
    /**
199
     * Retries always get a green light, but we have a backpressure for new tasks if the queue fills
200
     * up with not picked up new executions
201
     */
202
    private final TrackingSlotSupplier<LocalActivitySlotInfo> slotSupplier;
203

204
    /**
     * Creates a dispatcher that reserves slots from {@code slotSupplier}.
     *
     * <p>We allow submitters to block and wait till the workflow task heartbeat to allow the
     * worker to tolerate spikes of short local activity executions.
     *
     * @param slotSupplier supplier used to reserve slots for new executions
     */
    public LocalActivityDispatcherImpl(TrackingSlotSupplier<LocalActivitySlotInfo> slotSupplier) {
      this.slotSupplier = slotSupplier;
    }
1✔
209

210
    @Override
211
    public boolean dispatch(
212
        @Nonnull ExecuteLocalActivityParameters params,
213
        @Nonnull Functions.Proc1<LocalActivityResult> resultCallback,
214
        @Nullable Deadline acceptanceDeadline) {
215
      WorkerLifecycleState lifecycleState = getLifecycleState();
1✔
216
      switch (lifecycleState) {
1✔
217
        case NOT_STARTED:
218
          throw new IllegalStateException(
×
219
              "Local Activity Worker is not started, no activities were registered");
220
        case SHUTDOWN:
221
          throw new IllegalStateException("Local Activity Worker is shutdown");
×
222
        case TERMINATED:
223
          throw new IllegalStateException("Local Activity Worker is terminated");
×
224
        case SUSPENDED:
225
          throw new IllegalStateException(
×
226
              "[BUG] Local Activity Worker is suspended. Suspension is not supported for Local Activity Worker");
227
      }
228

229
      Preconditions.checkArgument(
1✔
230
          handler.isTypeSupported(params.getActivityType().getName()),
1✔
231
          "Activity type %s is not supported by the local activity worker",
232
          params.getActivityType().getName());
1✔
233

234
      long passedFromOriginalSchedulingMs =
235
          System.currentTimeMillis() - params.getOriginalScheduledTimestamp();
1✔
236
      Duration scheduleToCloseTimeout = params.getScheduleToCloseTimeout();
1✔
237
      Deadline scheduleToCloseDeadline = null;
1✔
238
      if (scheduleToCloseTimeout != null) {
1✔
239
        scheduleToCloseDeadline =
1✔
240
            Deadline.after(
1✔
241
                scheduleToCloseTimeout.toMillis() - passedFromOriginalSchedulingMs,
1✔
242
                TimeUnit.MILLISECONDS);
243
      }
244

245
      LocalActivityExecutionContext executionContext =
1✔
246
          new LocalActivityExecutionContext(
247
              params, resultCallback, scheduleToCloseDeadline, slotSupplier);
248

249
      PollActivityTaskQueueResponse.Builder activityTask = executionContext.getInitialTask();
1✔
250

251
      boolean retryIsNotAllowed =
1✔
252
          failIfRetryIsNotAllowedByNewPolicy(executionContext, activityTask);
1✔
253
      if (retryIsNotAllowed) {
1✔
254
        return true;
1✔
255
      }
256

257
      return submitANewExecution(executionContext, activityTask, acceptanceDeadline);
1✔
258
    }
259

260
    private boolean submitANewExecution(
261
        @Nonnull LocalActivityExecutionContext executionContext,
262
        @Nonnull PollActivityTaskQueueResponse.Builder activityTask,
263
        @Nullable Deadline heartbeatDeadline) {
264
      long acceptanceTimeoutMs = 0;
1✔
265
      boolean timeoutIsScheduleToStart = false;
1✔
266
      if (heartbeatDeadline != null) {
1✔
267
        acceptanceTimeoutMs = heartbeatDeadline.timeRemaining(TimeUnit.MILLISECONDS);
1✔
268
      }
269
      Duration scheduleToStartTimeout = executionContext.getScheduleToStartTimeout();
1✔
270
      if (scheduleToStartTimeout != null) {
1✔
271
        long scheduleToStartTimeoutMs = scheduleToStartTimeout.toMillis();
1✔
272
        if (scheduleToStartTimeoutMs > 0 && scheduleToStartTimeoutMs < acceptanceTimeoutMs) {
1✔
273
          acceptanceTimeoutMs = scheduleToStartTimeoutMs;
1✔
274
          timeoutIsScheduleToStart = true;
1✔
275
        }
276
      }
277
      try {
278
        SlotPermit permit = null;
1✔
279
        SlotReservationData reservationCtx =
1✔
280
            new SlotReservationData(taskQueue, options.getIdentity(), options.getBuildId());
1✔
281
        if (acceptanceTimeoutMs <= 0) {
1✔
282
          permit = slotSupplier.reserveSlot(reservationCtx);
1✔
283
        } else {
284
          try {
285
            TimeLimiter timeLimiter = SimpleTimeLimiter.create(Executors.newCachedThreadPool());
1✔
286
            timeLimiter.callWithTimeout(
1✔
287
                () -> slotSupplier.reserveSlot(reservationCtx),
1✔
288
                acceptanceTimeoutMs,
289
                TimeUnit.MILLISECONDS);
290
          } catch (TimeoutException e) {
1✔
291
            // In the event that we timed out waiting for a permit *because of schedule to start* we
292
            // still want to proceed with the "attempt" with a null permit, which will then
293
            // immediately fail with the s2s timeout.
294
            if (!timeoutIsScheduleToStart) {
1✔
NEW
295
              log.warn(
×
296
                  "LocalActivity queue is full and submitting timed out for activity {} with acceptanceTimeoutMs: {}",
NEW
297
                  activityTask.getActivityId(),
×
NEW
298
                  acceptanceTimeoutMs);
×
NEW
299
              return false;
×
300
            }
301
          }
1✔
302
        }
303

304
        executionContext.setPermit(permit);
1✔
305
        // we should publish scheduleToClose before submission, so the handlers always see a full
306
        // state of executionContext
307
        @Nullable Deadline scheduleToCloseDeadline = executionContext.getScheduleToCloseDeadline();
1✔
308
        if (scheduleToCloseDeadline != null) {
1✔
309
          ScheduledFuture<?> scheduleToCloseFuture =
1✔
310
              scheduledExecutor.schedule(
1✔
311
                  new FinalTimeoutHandler(
312
                      TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE, executionContext),
313
                  scheduleToCloseDeadline.timeRemaining(TimeUnit.MILLISECONDS),
1✔
314
                  TimeUnit.MILLISECONDS);
315
          executionContext.setScheduleToCloseFuture(scheduleToCloseFuture);
1✔
316
        }
317
        submitAttempt(executionContext, activityTask);
1✔
318
        log.trace("LocalActivity queued: {}", activityTask.getActivityId());
1✔
319
        return true;
1✔
320
      } catch (InterruptedException e) {
×
321
        Thread.currentThread().interrupt();
×
322
        return false;
×
NEW
323
      } catch (Exception e) {
×
NEW
324
        log.warn("Error while trying to reserve a slot for local activity", e.getCause());
×
NEW
325
        return false;
×
326
      }
327
    }
328

329
    /**
330
     * @param attemptTask local activity retry attempt task specifying the retry we are about to
331
     *     schedule
332
     * @return true if the retry attempt specified by {@code task} is not allowed by the current
333
     *     retry policy and the error was submitted in the callback, false otherwise
334
     */
335
    private boolean failIfRetryIsNotAllowedByNewPolicy(
336
        LocalActivityExecutionContext executionContext,
337
        PollActivityTaskQueueResponseOrBuilder attemptTask) {
338
      final Failure previousExecutionFailure = executionContext.getPreviousExecutionFailure();
1✔
339
      if (previousExecutionFailure != null) {
1✔
340
        // This is not an original local execution, it's a continuation using a workflow timer.
341
        // We should verify if the RetryOptions currently supplied in the workflow still allow the
342
        // retry.
343
        // If not, we need to recreate the same structure of an error like it would happen before we
344
        // started to sleep on the timer, at the end of the previous local execution.
345
        RetryState retryState =
1✔
346
            shouldStillRetry(executionContext, attemptTask, previousExecutionFailure);
1✔
347
        if (!RetryState.RETRY_STATE_IN_PROGRESS.equals(retryState)) {
1✔
348
          Failure failure;
349
          if (RetryState.RETRY_STATE_TIMEOUT.equals(retryState)) {
1✔
350
            if (previousExecutionFailure.hasTimeoutFailureInfo()
×
351
                && TimeoutType.TIMEOUT_TYPE_START_TO_CLOSE.equals(
×
352
                    previousExecutionFailure.getTimeoutFailureInfo().getTimeoutType())) {
×
353
              // This scenario should behave the same way as a startToClose timeout happening and
354
              // encountering
355
              // RetryState#TIMEOUT during calculation of the next attempt (which is effectively a
356
              // scheduleToClose
357
              // timeout).
358
              // See how StartToCloseTimeoutHandler or
359
              // io.temporal.internal.testservice.StateMachines#timeoutActivityTask
360
              // discard startToClose in this case and replaces it with scheduleToClose
361
              failure =
×
362
                  newTimeoutFailure(
×
363
                      TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE,
364
                      previousExecutionFailure.getCause());
×
365
            } else {
366
              failure =
×
367
                  newTimeoutFailure(
×
368
                      TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE, previousExecutionFailure);
369
            }
370
          } else {
371
            failure = previousExecutionFailure;
1✔
372
          }
373

374
          executionContext.callback(
1✔
375
              failed(
1✔
376
                  executionContext.getActivityId(),
1✔
377
                  attemptTask.getAttempt(),
1✔
378
                  retryState,
379
                  failure,
380
                  null));
381
          return true;
1✔
382
        }
383
      }
384
      return false;
1✔
385
    }
386

387
    /**
388
     * @param executionContext execution context of the activity
389
     * @param activityTask activity task
390
     * @param previousLocalExecutionFailure failure happened during previous local activity
391
     *     execution. Can be null.
392
     * @return decision to retry or not with a retry state, backoff or delay to the next attempt if
393
     *     applicable
394
     */
395
    @Nonnull
396
    private RetryState shouldStillRetry(
397
        LocalActivityExecutionContext executionContext,
398
        PollActivityTaskQueueResponseOrBuilder activityTask,
399
        @Nullable Failure previousLocalExecutionFailure) {
400
      int currentAttempt = activityTask.getAttempt();
1✔
401

402
      if (isRetryPolicyNotSet(activityTask)) {
1✔
403
        return RetryState.RETRY_STATE_RETRY_POLICY_NOT_SET;
×
404
      }
405

406
      RetryOptions retryOptions = RetryOptionsUtils.toRetryOptions(activityTask.getRetryPolicy());
1✔
407

408
      if (previousLocalExecutionFailure != null
1✔
409
          && previousLocalExecutionFailure.hasApplicationFailureInfo()
1✔
410
          && RetryOptionsUtils.isNotRetryable(
×
411
              retryOptions, previousLocalExecutionFailure.getApplicationFailureInfo().getType())) {
×
412
        return RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE;
×
413
      }
414

415
      // The current attempt didn't happen yet in this check, that's why -1
416
      if (RetryOptionsUtils.areAttemptsReached(retryOptions, currentAttempt - 1)) {
1✔
417
        return RetryState.RETRY_STATE_MAXIMUM_ATTEMPTS_REACHED;
1✔
418
      }
419

420
      long sleepMillis = retryOptions.calculateSleepTime(currentAttempt);
1✔
421
      if (RetryOptionsUtils.isDeadlineReached(
1✔
422
          executionContext.getScheduleToCloseDeadline(), sleepMillis)) {
1✔
423
        return RetryState.RETRY_STATE_TIMEOUT;
×
424
      }
425

426
      return RetryState.RETRY_STATE_IN_PROGRESS;
1✔
427
    }
428
  }
429

430
  private class AttemptTaskHandlerImpl
431
      implements PollTaskExecutor.TaskHandler<LocalActivityAttemptTask> {
432

433
    private final ActivityTaskHandler handler;
434

435
    /**
     * Creates an attempt handler.
     *
     * @param handler activity task handler that executes each attempt
     */
    private AttemptTaskHandlerImpl(ActivityTaskHandler handler) {
      this.handler = handler;
    }
1✔
438

439
    @Override
440
    public void handle(LocalActivityAttemptTask attemptTask) throws Exception {
441
      // cancel scheduleToStart timeout if not already fired
442
      @Nullable ScheduledFuture<?> scheduleToStartFuture = attemptTask.getScheduleToStartFuture();
1✔
443
      boolean scheduleToStartFired =
1✔
444
          scheduleToStartFuture != null && !scheduleToStartFuture.cancel(false);
1✔
445

446
      LocalActivityExecutionContext executionContext = attemptTask.getExecutionContext();
1✔
447
      executionContext.newAttempt();
1✔
448
      PollActivityTaskQueueResponseOrBuilder activityTask = attemptTask.getAttemptTask();
1✔
449

450
      // if an activity was already completed by any mean like scheduleToClose or scheduleToStart,
451
      // discard this attempt, this execution is completed.
452
      // The scheduleToStartFired check here is a bit overkill, but allows to catch an edge case
453
      // where
454
      // scheduleToStart is already fired, but didn't report a completion yet.
455
      boolean shouldDiscardTheAttempt = scheduleToStartFired || executionContext.isCompleted();
1✔
456
      if (shouldDiscardTheAttempt) {
1✔
457
        return;
×
458
      }
459

460
      Scope metricsScope =
1✔
461
          workerMetricsScope.tagged(
1✔
462
              ImmutableMap.of(
1✔
463
                  MetricsTag.ACTIVITY_TYPE,
464
                  activityTask.getActivityType().getName(),
1✔
465
                  MetricsTag.WORKFLOW_TYPE,
466
                  activityTask.getWorkflowType().getName()));
1✔
467

468
      MDC.put(LoggerTag.ACTIVITY_ID, activityTask.getActivityId());
1✔
469
      MDC.put(LoggerTag.ACTIVITY_TYPE, activityTask.getActivityType().getName());
1✔
470
      MDC.put(LoggerTag.WORKFLOW_ID, activityTask.getWorkflowExecution().getWorkflowId());
1✔
471
      MDC.put(LoggerTag.WORKFLOW_TYPE, activityTask.getWorkflowType().getName());
1✔
472
      MDC.put(LoggerTag.RUN_ID, activityTask.getWorkflowExecution().getRunId());
1✔
473

474
      slotSupplier.markSlotUsed(
1✔
475
          new LocalActivitySlotInfo(
476
              ActivityPollResponseToInfo.toActivityInfoImpl(
1✔
477
                  activityTask, namespace, taskQueue, true),
1✔
478
              options.getIdentity(),
1✔
479
              options.getBuildId()),
1✔
480
          executionContext.getPermit());
1✔
481

482
      try {
483
        ScheduledFuture<?> startToCloseTimeoutFuture = null;
1✔
484

485
        if (activityTask.hasStartToCloseTimeout()) {
1✔
486
          startToCloseTimeoutFuture =
1✔
487
              scheduledExecutor.schedule(
1✔
488
                  new StartToCloseTimeoutHandler(attemptTask),
489
                  ProtobufTimeUtils.toJavaDuration(
1✔
490
                          attemptTask.getAttemptTask().getStartToCloseTimeout())
1✔
491
                      .toMillis(),
1✔
492
                  TimeUnit.MILLISECONDS);
493
        }
494

495
        metricsScope.counter(MetricsType.LOCAL_ACTIVITY_TOTAL_COUNTER).inc(1);
1✔
496

497
        ActivityTaskHandler.Result activityHandlerResult;
498
        Stopwatch sw = metricsScope.timer(MetricsType.LOCAL_ACTIVITY_EXECUTION_LATENCY).start();
1✔
499
        try {
500
          activityHandlerResult =
1✔
501
              handler.handle(new ActivityTask(activityTask, () -> {}), metricsScope, true);
1✔
502
        } finally {
503
          sw.stop();
1✔
504
        }
505

506
        // Cancel startToCloseTimeoutFuture if it's not yet fired.
507
        boolean startToCloseTimeoutFired =
1✔
508
            startToCloseTimeoutFuture != null && !startToCloseTimeoutFuture.cancel(false);
1✔
509

510
        // We make sure that the result handling code following this statement is mutual exclusive
511
        // with the startToClose timeout handler.
512
        // If startToClose fired, scheduling of the next attempt is taken care by the
513
        // StartToCloseTimeoutHandler.
514
        // If execution is already completed, this attempt handling shouldn't proceed, nothing to do
515
        // with result. The typical scenario may be fired scheduleToClose.
516
        boolean shouldDiscardTheResult = startToCloseTimeoutFired || executionContext.isCompleted();
1✔
517
        if (shouldDiscardTheResult) {
1✔
518
          return;
1✔
519
        }
520

521
        handleResult(activityHandlerResult, attemptTask, metricsScope);
1✔
522
      } catch (Throwable ex) {
1✔
523
        // handleLocalActivity is expected to never throw an exception and return a result
524
        // that can be used for a workflow callback if this method throws, it's a bug.
525
        log.error("[BUG] Code that expected to never throw an exception threw an exception", ex);
1✔
526
        executionContext.callback(
1✔
527
            processingFailed(activityTask.getActivityId(), activityTask.getAttempt(), ex));
1✔
528
        throw ex;
1✔
529
      } finally {
530
        MDC.remove(LoggerTag.ACTIVITY_ID);
1✔
531
        MDC.remove(LoggerTag.ACTIVITY_TYPE);
1✔
532
        MDC.remove(LoggerTag.WORKFLOW_ID);
1✔
533
        MDC.remove(LoggerTag.WORKFLOW_TYPE);
1✔
534
        MDC.remove(LoggerTag.RUN_ID);
1✔
535
      }
536
    }
1✔
537

538
    private void handleResult(
539
        ActivityTaskHandler.Result activityHandlerResult,
540
        LocalActivityAttemptTask attemptTask,
541
        Scope metricsScope) {
542
      LocalActivityExecutionContext executionContext = attemptTask.getExecutionContext();
1✔
543
      PollActivityTaskQueueResponseOrBuilder activityTask = attemptTask.getAttemptTask();
1✔
544
      int currentAttempt = activityTask.getAttempt();
1✔
545

546
      // Success
547
      if (activityHandlerResult.getTaskCompleted() != null) {
1✔
548
        boolean completedByThisInvocation =
1✔
549
            executionContext.callback(
1✔
550
                LocalActivityResult.completed(activityHandlerResult, currentAttempt));
1✔
551
        if (completedByThisInvocation) {
1✔
552
          // We report this metric only if the execution was completed by us right now, not by any
553
          // timeout earlier.
554
          // Completion by another attempt is not possible by another attempt earlier where we
555
          // checked if startToClose fired.
556
          com.uber.m3.util.Duration e2eDuration =
557
              com.uber.m3.util.Duration.ofMillis(
1✔
558
                  System.currentTimeMillis() - executionContext.getOriginalScheduledTimestamp());
1✔
559
          metricsScope.timer(MetricsType.LOCAL_ACTIVITY_SUCCEED_E2E_LATENCY).record(e2eDuration);
1✔
560
        }
561
        return;
1✔
562
      }
563

564
      // Cancellation
565
      if (activityHandlerResult.getTaskCanceled() != null) {
1✔
566
        executionContext.callback(
×
567
            LocalActivityResult.cancelled(activityHandlerResult, currentAttempt));
×
568
        return;
×
569
      }
570

571
      // Failure
572
      Preconditions.checkState(
1✔
573
          activityHandlerResult.getTaskFailed() != null,
1✔
574
          "One of taskCompleted, taskCanceled or taskFailed must be set");
575

576
      Failure executionFailure =
1✔
577
          activityHandlerResult.getTaskFailed().getTaskFailedRequest().getFailure();
1✔
578
      Throwable executionThrowable = activityHandlerResult.getTaskFailed().getFailure();
1✔
579

580
      RetryDecision retryDecision =
1✔
581
          shouldRetry(
1✔
582
              executionContext, activityTask, activityHandlerResult.getTaskFailed().getFailure());
1✔
583

584
      if (retryDecision.doNextAttempt()) {
1✔
585
        scheduleNextAttempt(
1✔
586
            executionContext,
587
            Objects.requireNonNull(
1✔
588
                retryDecision.nextAttemptBackoff, "nextAttemptBackoff is expected to not be null"),
1✔
589
            executionFailure);
590
      } else if (retryDecision.failWorkflowTask()) {
1✔
591
        executionContext.callback(
×
592
            processingFailed(executionContext.getActivityId(), currentAttempt, executionThrowable));
×
593
      } else {
594
        executionContext.callback(
1✔
595
            failed(
1✔
596
                executionContext.getActivityId(),
1✔
597
                currentAttempt,
598
                retryDecision.retryState,
1✔
599
                executionFailure,
600
                retryDecision.nextAttemptBackoff));
1✔
601
      }
602
    }
1✔
603

604
    @Override
605
    public Throwable wrapFailure(LocalActivityAttemptTask task, Throwable failure) {
606
      return new RuntimeException("Failure processing local activity task.", failure);
1✔
607
    }
608
  }
609

610
  private class LocalActivityRetryHandler implements Runnable {
611
    private final @Nonnull LocalActivityExecutionContext executionContext;
612
    private final @Nonnull PollActivityTaskQueueResponse.Builder activityTask;
613

614
    private LocalActivityRetryHandler(
615
        @Nonnull LocalActivityExecutionContext executionContext,
616
        @Nonnull PollActivityTaskQueueResponse.Builder activityTask) {
1✔
617
      this.executionContext = Objects.requireNonNull(executionContext, "executionContext");
1✔
618
      this.activityTask = Objects.requireNonNull(activityTask, "activityTask");
1✔
619
    }
1✔
620

621
    @Override
622
    public void run() {
623
      submitRetry(executionContext, activityTask);
1✔
624
    }
1✔
625
  }
626

627
  /** Used to perform both scheduleToStart and scheduleToClose timeouts. */
628
  private static class FinalTimeoutHandler implements Runnable {
629
    private final LocalActivityExecutionContext executionContext;
630
    private final TimeoutType timeoutType;
631

632
    public FinalTimeoutHandler(
633
        TimeoutType timeoutType, LocalActivityExecutionContext executionContext) {
1✔
634
      this.executionContext = executionContext;
1✔
635
      this.timeoutType = timeoutType;
1✔
636
    }
1✔
637

638
    @Override
639
    public void run() {
640
      executionContext.callback(
1✔
641
          failed(
1✔
642
              executionContext.getActivityId(),
1✔
643
              executionContext.getCurrentAttempt(),
1✔
644
              RetryState.RETRY_STATE_TIMEOUT,
645
              newTimeoutFailure(timeoutType, executionContext.getLastAttemptFailure()),
1✔
646
              null));
647
    }
1✔
648
  }
649

650
  private class StartToCloseTimeoutHandler implements Runnable {
651
    private final LocalActivityAttemptTask attemptTask;
652

653
    private StartToCloseTimeoutHandler(LocalActivityAttemptTask attemptTask) {
1✔
654
      this.attemptTask = attemptTask;
1✔
655
    }
1✔
656

657
    @Override
658
    public void run() {
659
      LocalActivityExecutionContext executionContext = attemptTask.getExecutionContext();
1✔
660
      PollActivityTaskQueueResponseOrBuilder activityTask = attemptTask.getAttemptTask();
1✔
661
      String activityId = activityTask.getActivityId();
1✔
662

663
      int timingOutAttempt = activityTask.getAttempt();
1✔
664

665
      RetryDecision retryDecision = shouldRetry(executionContext, activityTask, null);
1✔
666
      if (retryDecision.doNextAttempt()) {
1✔
667
        scheduleNextAttempt(
1✔
668
            executionContext,
669
            Objects.requireNonNull(
1✔
670
                retryDecision.nextAttemptBackoff, "nextAttemptBackoff is expected to not be null"),
1✔
671
            newTimeoutFailure(TimeoutType.TIMEOUT_TYPE_START_TO_CLOSE, null));
1✔
672
      } else {
673
        // RetryState.RETRY_STATE_TIMEOUT happens only when scheduleToClose is fired
674
        // scheduleToClose timeout is effectively replacing the original startToClose
675
        TimeoutType timeoutType =
676
            RetryState.RETRY_STATE_TIMEOUT.equals(retryDecision.retryState)
1✔
677
                ? TimeoutType.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE
1✔
678
                : TimeoutType.TIMEOUT_TYPE_START_TO_CLOSE;
1✔
679
        executionContext.callback(
1✔
680
            failed(
1✔
681
                activityId,
682
                timingOutAttempt,
683
                retryDecision.retryState,
1✔
684
                newTimeoutFailure(timeoutType, executionContext.getLastAttemptFailure()),
1✔
685
                retryDecision.nextAttemptBackoff));
1✔
686
      }
687
    }
1✔
688
  }
689

690
  @Override
691
  public boolean start() {
692
    if (handler.isAnyTypeSupported()) {
1✔
693
      this.scheduledExecutor =
1✔
694
          Executors.newSingleThreadScheduledExecutor(
1✔
695
              r -> {
696
                Thread thread = new Thread(r);
1✔
697
                thread.setName(
1✔
698
                    WorkerThreadsNameHelper.getLocalActivitySchedulerThreadPrefix(
1✔
699
                        namespace, taskQueue));
700
                return thread;
1✔
701
              });
702

703
      this.activityAttemptTaskExecutor =
1✔
704
          new PollTaskExecutor<>(
705
              namespace,
706
              taskQueue,
707
              options.getIdentity(),
1✔
708
              new AttemptTaskHandlerImpl(handler),
709
              pollerOptions,
710
              slotSupplier.maximumSlots(),
1✔
711
              false);
712

713
      this.workerMetricsScope.counter(MetricsType.WORKER_START_COUNTER).inc(1);
1✔
714
      return true;
1✔
715
    } else {
716
      return false;
1✔
717
    }
718
  }
719

720
  @Override
721
  public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
722
    if (activityAttemptTaskExecutor != null && !activityAttemptTaskExecutor.isShutdown()) {
1✔
723
      return activityAttemptTaskExecutor
1✔
724
          .shutdown(shutdownManager, interruptTasks)
1✔
725
          .thenCompose(
1✔
726
              r ->
727
                  shutdownManager.shutdownExecutor(
1✔
728
                      scheduledExecutor, this + "#scheduledExecutor", Duration.ofSeconds(1)))
1✔
729
          .exceptionally(
1✔
730
              e -> {
731
                log.error("[BUG] Unexpected exception during shutdown", e);
×
732
                return null;
×
733
              });
734
    } else {
735
      return CompletableFuture.completedFuture(null);
1✔
736
    }
737
  }
738

739
  @Override
740
  public void awaitTermination(long timeout, TimeUnit unit) {
741
    long timeoutMillis = unit.toMillis(timeout);
1✔
742
    ShutdownManager.awaitTermination(scheduledExecutor, timeoutMillis);
1✔
743
  }
1✔
744

745
  @Override
746
  public boolean isShutdown() {
747
    return activityAttemptTaskExecutor != null && activityAttemptTaskExecutor.isShutdown();
×
748
  }
749

750
  @Override
751
  public boolean isTerminated() {
752
    return activityAttemptTaskExecutor != null
1✔
753
        && activityAttemptTaskExecutor.isTerminated()
×
754
        && scheduledExecutor.isTerminated();
1✔
755
  }
756

757
  @Override
758
  public WorkerLifecycleState getLifecycleState() {
759
    if (activityAttemptTaskExecutor == null) {
1✔
760
      return WorkerLifecycleState.NOT_STARTED;
×
761
    }
762
    if (activityAttemptTaskExecutor.isShutdown()) {
1✔
763
      // return TERMINATED only if both pollExecutor and taskExecutor are terminated
764
      if (activityAttemptTaskExecutor.isTerminated() && scheduledExecutor.isTerminated()) {
×
765
        return WorkerLifecycleState.TERMINATED;
×
766
      } else {
767
        return WorkerLifecycleState.SHUTDOWN;
×
768
      }
769
    }
770
    return WorkerLifecycleState.ACTIVE;
1✔
771
  }
772

773
  private PollerOptions getPollerOptions(SingleWorkerOptions options) {
774
    PollerOptions pollerOptions = options.getPollerOptions();
1✔
775
    if (pollerOptions.getPollThreadNamePrefix() == null) {
1✔
776
      pollerOptions =
1✔
777
          PollerOptions.newBuilder(pollerOptions)
1✔
778
              .setPollThreadNamePrefix(
1✔
779
                  WorkerThreadsNameHelper.getLocalActivityPollerThreadPrefix(namespace, taskQueue))
1✔
780
              .build();
1✔
781
    }
782
    return pollerOptions;
1✔
783
  }
784

785
  public LocalActivityDispatcher getLocalActivityScheduler() {
786
    return laScheduler;
1✔
787
  }
788

789
  private static Failure newTimeoutFailure(TimeoutType timeoutType, @Nullable Failure cause) {
790
    TimeoutFailureInfo.Builder info = TimeoutFailureInfo.newBuilder().setTimeoutType(timeoutType);
1✔
791
    Failure.Builder result = Failure.newBuilder().setTimeoutFailureInfo(info);
1✔
792
    if (cause != null) {
1✔
793
      result.setCause(cause);
1✔
794
    }
795
    return result.build();
1✔
796
  }
797

798
  private static boolean isRetryPolicyNotSet(
799
      PollActivityTaskQueueResponseOrBuilder pollActivityTask) {
800
    return !pollActivityTask.hasScheduleToCloseTimeout()
1✔
801
        && (!pollActivityTask.hasRetryPolicy()
1✔
802
            || pollActivityTask.getRetryPolicy().getMaximumAttempts() <= 0);
1✔
803
  }
804

805
  private static boolean isNonRetryableApplicationFailure(@Nullable Throwable executionThrowable) {
806
    return executionThrowable instanceof ApplicationFailure
1✔
807
        && ((ApplicationFailure) executionThrowable).isNonRetryable();
1✔
808
  }
809

810
  private static class RetryDecision {
811
    private final @Nullable RetryState retryState;
812
    private final @Nullable Duration nextAttemptBackoff;
813

814
    // No next local attempts
815
    public RetryDecision(@Nonnull RetryState retryState, @Nullable Duration nextAttemptBackoff) {
1✔
816
      this.retryState = retryState;
1✔
817
      this.nextAttemptBackoff = nextAttemptBackoff;
1✔
818
    }
1✔
819

820
    // Do the next attempt
821
    public RetryDecision(@Nonnull Duration nextAttemptBackoff) {
1✔
822
      this.retryState = null;
1✔
823
      this.nextAttemptBackoff = Objects.requireNonNull(nextAttemptBackoff);
1✔
824
    }
1✔
825

826
    public boolean doNextAttempt() {
827
      return retryState == null;
1✔
828
    }
829

830
    public boolean failWorkflowTask() {
831
      return RetryState.RETRY_STATE_INTERNAL_SERVER_ERROR.equals(retryState);
1✔
832
    }
833
  }
834
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc