• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #160

pending completion
#160

push

github-actions

web-flow
Wait for worker slots to be fully released in the graceful worker shutdown (#1679)

95 of 95 new or added lines in 5 files covered. (100.0%)

17058 of 20907 relevant lines covered (81.59%)

0.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.24
/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityWorker.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.worker;
22

23
import static io.temporal.serviceclient.MetricsTag.METRICS_TAGS_CALL_OPTIONS_KEY;
24

25
import com.google.protobuf.ByteString;
26
import com.uber.m3.tally.Scope;
27
import com.uber.m3.tally.Stopwatch;
28
import com.uber.m3.util.Duration;
29
import com.uber.m3.util.ImmutableMap;
30
import io.temporal.api.command.v1.ScheduleActivityTaskCommandAttributesOrBuilder;
31
import io.temporal.api.common.v1.WorkflowExecution;
32
import io.temporal.api.workflowservice.v1.*;
33
import io.temporal.internal.common.ProtobufTimeUtils;
34
import io.temporal.internal.logging.LoggerTag;
35
import io.temporal.internal.retryer.GrpcRetryer;
36
import io.temporal.internal.worker.ActivityTaskHandler.Result;
37
import io.temporal.serviceclient.MetricsTag;
38
import io.temporal.serviceclient.WorkflowServiceStubs;
39
import io.temporal.serviceclient.rpcretry.DefaultStubServiceOperationRpcRetryOptions;
40
import io.temporal.worker.MetricsType;
41
import io.temporal.worker.WorkerMetricsTag;
42
import java.util.Objects;
43
import java.util.concurrent.CompletableFuture;
44
import java.util.concurrent.Semaphore;
45
import java.util.concurrent.TimeUnit;
46
import javax.annotation.Nonnull;
47
import org.slf4j.Logger;
48
import org.slf4j.LoggerFactory;
49
import org.slf4j.MDC;
50

51
final class ActivityWorker implements SuspendableWorker {
52
  private static final Logger log = LoggerFactory.getLogger(ActivityWorker.class);
53

54
  private SuspendableWorker poller = new NoopWorker();
55
  private PollTaskExecutor<ActivityTask> pollTaskExecutor;
56

57
  private final ActivityTaskHandler handler;
58
  private final WorkflowServiceStubs service;
59
  private final String namespace;
60
  private final String taskQueue;
61
  private final SingleWorkerOptions options;
62
  private final double taskQueueActivitiesPerSecond;
63
  private final PollerOptions pollerOptions;
64
  private final Scope workerMetricsScope;
65
  private final GrpcRetryer grpcRetryer;
66
  private final GrpcRetryer.GrpcRetryerOptions replyGrpcRetryerOptions;
67
  private final int executorSlots;
68
  private final Semaphore executorSlotsSemaphore;
69

70
  public ActivityWorker(
71
      @Nonnull WorkflowServiceStubs service,
72
      @Nonnull String namespace,
73
      @Nonnull String taskQueue,
74
      double taskQueueActivitiesPerSecond,
75
      @Nonnull SingleWorkerOptions options,
76
      @Nonnull ActivityTaskHandler handler) {
77
    this.service = Objects.requireNonNull(service);
78
    this.namespace = Objects.requireNonNull(namespace);
79
    this.taskQueue = Objects.requireNonNull(taskQueue);
80
    this.handler = Objects.requireNonNull(handler);
81
    this.taskQueueActivitiesPerSecond = taskQueueActivitiesPerSecond;
82
    this.options = Objects.requireNonNull(options);
83
    this.pollerOptions = getPollerOptions(options);
84
    this.workerMetricsScope =
85
        MetricsTag.tagged(options.getMetricsScope(), WorkerMetricsTag.WorkerType.ACTIVITY_WORKER);
86
    this.grpcRetryer = new GrpcRetryer(service.getServerCapabilities());
87
    this.replyGrpcRetryerOptions =
88
        new GrpcRetryer.GrpcRetryerOptions(
89
            DefaultStubServiceOperationRpcRetryOptions.INSTANCE, null);
90
    this.executorSlots = options.getTaskExecutorThreadPoolSize();
91
    this.executorSlotsSemaphore = new Semaphore(executorSlots);
92
  }
93

94
  @Override
95
  public boolean start() {
96
    if (handler.isAnyTypeSupported()) {
97
      this.pollTaskExecutor =
98
          new PollTaskExecutor<>(
99
              namespace,
100
              taskQueue,
101
              options.getIdentity(),
102
              new TaskHandlerImpl(handler),
103
              pollerOptions,
104
              options.getTaskExecutorThreadPoolSize(),
105
              workerMetricsScope,
106
              true);
107
      poller =
108
          new Poller<>(
109
              options.getIdentity(),
110
              new ActivityPollTask(
111
                  service,
112
                  namespace,
113
                  taskQueue,
114
                  options.getIdentity(),
115
                  taskQueueActivitiesPerSecond,
116
                  executorSlotsSemaphore,
117
                  workerMetricsScope),
118
              this.pollTaskExecutor,
119
              pollerOptions,
120
              workerMetricsScope);
121
      poller.start();
122
      workerMetricsScope.counter(MetricsType.WORKER_START_COUNTER).inc(1);
123
      return true;
124
    } else {
125
      return false;
126
    }
127
  }
128

129
  @Override
130
  public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
131
    String semaphoreName = this + "#executorSlotsSemaphore";
132
    return poller
133
        .shutdown(shutdownManager, interruptTasks)
134
        .thenCompose(
135
            ignore ->
136
                !interruptTasks
137
                    ? shutdownManager.waitForSemaphorePermitsReleaseUntimed(
138
                        executorSlotsSemaphore, executorSlots, semaphoreName)
139
                    : CompletableFuture.completedFuture(null))
140
        .thenCompose(
141
            ignore ->
142
                pollTaskExecutor != null
143
                    ? pollTaskExecutor.shutdown(shutdownManager, interruptTasks)
144
                    : CompletableFuture.completedFuture(null))
145
        .exceptionally(
146
            e -> {
147
              log.error("Unexpected exception during shutdown", e);
148
              return null;
149
            });
150
  }
151

152
  @Override
153
  public void awaitTermination(long timeout, TimeUnit unit) {
154
    long timeoutMillis = ShutdownManager.awaitTermination(poller, unit.toMillis(timeout));
155
    // relies on the fact that the pollTaskExecutor is the last one to be shutdown, no need to
156
    // wait separately for intermediate steps
157
    ShutdownManager.awaitTermination(pollTaskExecutor, timeoutMillis);
158
  }
159

160
  @Override
161
  public void suspendPolling() {
162
    poller.suspendPolling();
163
  }
164

165
  @Override
166
  public void resumePolling() {
167
    poller.resumePolling();
168
  }
169

170
  @Override
171
  public boolean isShutdown() {
172
    return poller.isShutdown();
173
  }
174

175
  @Override
176
  public boolean isTerminated() {
177
    return poller.isTerminated() && (pollTaskExecutor == null || pollTaskExecutor.isTerminated());
178
  }
179

180
  @Override
181
  public boolean isSuspended() {
182
    return poller.isSuspended();
183
  }
184

185
  @Override
186
  public WorkerLifecycleState getLifecycleState() {
187
    return poller.getLifecycleState();
188
  }
189

190
  public EagerActivityDispatcher getEagerActivityDispatcher() {
191
    return new EagerActivityDispatcherImpl();
192
  }
193

194
  private PollerOptions getPollerOptions(SingleWorkerOptions options) {
195
    PollerOptions pollerOptions = options.getPollerOptions();
196
    if (pollerOptions.getPollThreadNamePrefix() == null) {
197
      pollerOptions =
198
          PollerOptions.newBuilder(pollerOptions)
199
              .setPollThreadNamePrefix(
200
                  WorkerThreadsNameHelper.getActivityPollerThreadPrefix(namespace, taskQueue))
201
              .build();
202
    }
203
    return pollerOptions;
204
  }
205

206
  @Override
207
  public String toString() {
208
    return String.format(
209
        "ActivityWorker{identity=%s, namespace=%s, taskQueue=%s}",
210
        options.getIdentity(), namespace, taskQueue);
211
  }
212

213
  private class TaskHandlerImpl implements PollTaskExecutor.TaskHandler<ActivityTask> {
214

215
    final ActivityTaskHandler handler;
216

217
    /**
     * @param handler activity task handler that {@code handleActivity} delegates execution to
     */
    private TaskHandlerImpl(ActivityTaskHandler handler) {
      this.handler = handler;
    }
220

221
    @Override
222
    public void handle(ActivityTask task) throws Exception {
223
      PollActivityTaskQueueResponseOrBuilder pollResponse = task.getResponse();
224
      Scope metricsScope =
225
          workerMetricsScope.tagged(
226
              ImmutableMap.of(
227
                  MetricsTag.ACTIVITY_TYPE,
228
                  pollResponse.getActivityType().getName(),
229
                  MetricsTag.WORKFLOW_TYPE,
230
                  pollResponse.getWorkflowType().getName()));
231

232
      MDC.put(LoggerTag.ACTIVITY_ID, pollResponse.getActivityId());
233
      MDC.put(LoggerTag.ACTIVITY_TYPE, pollResponse.getActivityType().getName());
234
      MDC.put(LoggerTag.WORKFLOW_ID, pollResponse.getWorkflowExecution().getWorkflowId());
235
      MDC.put(LoggerTag.WORKFLOW_TYPE, pollResponse.getWorkflowType().getName());
236
      MDC.put(LoggerTag.RUN_ID, pollResponse.getWorkflowExecution().getRunId());
237

238
      ActivityTaskHandler.Result result = null;
239
      try {
240
        result = handleActivity(task, metricsScope);
241
      } finally {
242
        MDC.remove(LoggerTag.ACTIVITY_ID);
243
        MDC.remove(LoggerTag.ACTIVITY_TYPE);
244
        MDC.remove(LoggerTag.WORKFLOW_ID);
245
        MDC.remove(LoggerTag.WORKFLOW_TYPE);
246
        MDC.remove(LoggerTag.RUN_ID);
247
        if (
248
        // handleActivity throw an exception (not a normal scenario)
249
        result == null
250
            // completed synchronously or manual completion hasn't been requested
251
            || !result.isManualCompletion()) {
252
          task.getCompletionCallback().apply();
253
        }
254
      }
255

256
      if (result.getTaskFailed() != null && result.getTaskFailed().getFailure() instanceof Error) {
257
        // don't just swallow Errors, we need to propagate it to the top
258
        throw (Error) result.getTaskFailed().getFailure();
259
      }
260
    }
261

262
    private ActivityTaskHandler.Result handleActivity(ActivityTask task, Scope metricsScope) {
263
      PollActivityTaskQueueResponseOrBuilder pollResponse = task.getResponse();
264
      ByteString taskToken = pollResponse.getTaskToken();
265
      metricsScope
266
          .timer(MetricsType.ACTIVITY_SCHEDULE_TO_START_LATENCY)
267
          .record(
268
              ProtobufTimeUtils.toM3Duration(
269
                  pollResponse.getStartedTime(), pollResponse.getCurrentAttemptScheduledTime()));
270

271
      ActivityTaskHandler.Result result;
272

273
      Stopwatch sw = metricsScope.timer(MetricsType.ACTIVITY_EXEC_LATENCY).start();
274
      try {
275
        result = handler.handle(task, metricsScope, false);
276
      } catch (Throwable ex) {
277
        // handler.handle if expected to never throw an exception and return result
278
        // that can be used for a workflow callback if this method throws, it's a bug.
279
        log.error("[BUG] Code that expected to never throw an exception threw an exception", ex);
280
        throw ex;
281
      } finally {
282
        sw.stop();
283
      }
284

285
      try {
286
        sendReply(taskToken, result, metricsScope);
287
      } catch (Exception e) {
288
        logExceptionDuringResultReporting(e, pollResponse, result);
289
        // TODO this class doesn't report activity success and failure metrics now, instead it's
290
        //  located inside an activity handler. We should lift it up to this level,
291
        //  so we can increment a failure counter instead of success if send result failed.
292
        //  This will also align the behavior of ActivityWorker with WorkflowWorker.
293
        throw e;
294
      }
295

296
      if (result.getTaskCompleted() != null) {
297
        Duration e2eDuration =
298
            ProtobufTimeUtils.toM3DurationSinceNow(pollResponse.getScheduledTime());
299
        metricsScope.timer(MetricsType.ACTIVITY_SUCCEED_E2E_LATENCY).record(e2eDuration);
300
      }
301

302
      return result;
303
    }
304

305
    @Override
306
    public Throwable wrapFailure(ActivityTask t, Throwable failure) {
307
      PollActivityTaskQueueResponseOrBuilder response = t.getResponse();
308
      WorkflowExecution execution = response.getWorkflowExecution();
309
      return new RuntimeException(
310
          "Failure processing activity response. WorkflowId="
311
              + execution.getWorkflowId()
312
              + ", RunId="
313
              + execution.getRunId()
314
              + ", ActivityType="
315
              + response.getActivityType().getName()
316
              + ", ActivityId="
317
              + response.getActivityId(),
318
          failure);
319
    }
320

321
    private void sendReply(
322
        ByteString taskToken, ActivityTaskHandler.Result response, Scope metricsScope) {
323
      RespondActivityTaskCompletedRequest taskCompleted = response.getTaskCompleted();
324
      if (taskCompleted != null) {
325
        RespondActivityTaskCompletedRequest request =
326
            taskCompleted.toBuilder()
327
                .setTaskToken(taskToken)
328
                .setIdentity(options.getIdentity())
329
                .setNamespace(namespace)
330
                .build();
331

332
        grpcRetryer.retry(
333
            () ->
334
                service
335
                    .blockingStub()
336
                    .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
337
                    .respondActivityTaskCompleted(request),
338
            replyGrpcRetryerOptions);
339
      } else {
340
        Result.TaskFailedResult taskFailed = response.getTaskFailed();
341
        if (taskFailed != null) {
342
          RespondActivityTaskFailedRequest request =
343
              taskFailed.getTaskFailedRequest().toBuilder()
344
                  .setTaskToken(taskToken)
345
                  .setIdentity(options.getIdentity())
346
                  .setNamespace(namespace)
347
                  .build();
348

349
          grpcRetryer.retry(
350
              () ->
351
                  service
352
                      .blockingStub()
353
                      .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
354
                      .respondActivityTaskFailed(request),
355
              replyGrpcRetryerOptions);
356
        } else {
357
          RespondActivityTaskCanceledRequest taskCanceled = response.getTaskCanceled();
358
          if (taskCanceled != null) {
359
            RespondActivityTaskCanceledRequest request =
360
                taskCanceled.toBuilder()
361
                    .setTaskToken(taskToken)
362
                    .setIdentity(options.getIdentity())
363
                    .setNamespace(namespace)
364
                    .build();
365

366
            grpcRetryer.retry(
367
                () ->
368
                    service
369
                        .blockingStub()
370
                        .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
371
                        .respondActivityTaskCanceled(request),
372
                replyGrpcRetryerOptions);
373
          }
374
        }
375
      }
376
      // Manual activity completion
377
    }
378

379
    private void logExceptionDuringResultReporting(
380
        Exception e,
381
        PollActivityTaskQueueResponseOrBuilder pollResponse,
382
        ActivityTaskHandler.Result result) {
383
      MDC.put(LoggerTag.ACTIVITY_ID, pollResponse.getActivityId());
384
      MDC.put(LoggerTag.ACTIVITY_TYPE, pollResponse.getActivityType().getName());
385
      MDC.put(LoggerTag.WORKFLOW_ID, pollResponse.getWorkflowExecution().getWorkflowId());
386
      MDC.put(LoggerTag.RUN_ID, pollResponse.getWorkflowExecution().getRunId());
387

388
      if (log.isDebugEnabled()) {
389
        log.debug(
390
            "Failure during reporting of activity result to the server. ActivityId = {}, ActivityType = {}, WorkflowId={}, WorkflowType={}, RunId={}, ActivityResult={}",
391
            pollResponse.getActivityId(),
392
            pollResponse.getActivityType().getName(),
393
            pollResponse.getWorkflowExecution().getWorkflowId(),
394
            pollResponse.getWorkflowType().getName(),
395
            pollResponse.getWorkflowExecution().getRunId(),
396
            result,
397
            e);
398
      } else {
399
        log.warn(
400
            "Failure during reporting of activity result to the server. ActivityId = {}, ActivityType = {}, WorkflowId={}, WorkflowType={}, RunId={}",
401
            pollResponse.getActivityId(),
402
            pollResponse.getActivityType().getName(),
403
            pollResponse.getWorkflowExecution().getWorkflowId(),
404
            pollResponse.getWorkflowType().getName(),
405
            pollResponse.getWorkflowExecution().getRunId(),
406
            e);
407
      }
408
    }
409
  }
410

411
  private final class EagerActivityDispatcherImpl implements EagerActivityDispatcher {
412
    @Override
413
    public boolean tryReserveActivitySlot(
414
        ScheduleActivityTaskCommandAttributesOrBuilder commandAttributes) {
415
      return WorkerLifecycleState.ACTIVE.equals(ActivityWorker.this.getLifecycleState())
416
          && Objects.equals(
417
              commandAttributes.getTaskQueue().getName(), ActivityWorker.this.taskQueue)
418
          && ActivityWorker.this.executorSlotsSemaphore.tryAcquire();
419
    }
420

421
    @Override
422
    public void releaseActivitySlotReservations(int slotCounts) {
423
      ActivityWorker.this.executorSlotsSemaphore.release(slotCounts);
424
    }
425

426
    @Override
427
    public void dispatchActivity(PollActivityTaskQueueResponse activity) {
428
      ActivityWorker.this.pollTaskExecutor.process(
429
          new ActivityTask(activity, ActivityWorker.this.executorSlotsSemaphore::release));
430
    }
431
  }
432
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc