• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #278

08 Jul 2024 04:42PM UTC coverage: 77.565% (+0.1%) from 77.469%
#278

push

github

web-flow
Revert configurable slot provider (#2134)

* Revert "Resource based tuner (#2110)"

This reverts commit 8a2d5cdcc.

* Revert "Slot supplier interface & fixed-size implementation (#2014)"

This reverts commit d2a06fc6f.

* Fix merge conflict

* Keep Publish Test Report step

* Add tests for worker slots

* Fix white space

* One other whitespace change

117 of 133 new or added lines in 17 files covered. (87.97%)

5 existing lines in 5 files now uncovered.

19088 of 24609 relevant lines covered (77.57%)

0.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.0
/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityWorker.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.worker;
22

23
import static io.temporal.serviceclient.MetricsTag.METRICS_TAGS_CALL_OPTIONS_KEY;
24

25
import com.google.protobuf.ByteString;
26
import com.uber.m3.tally.Scope;
27
import com.uber.m3.tally.Stopwatch;
28
import com.uber.m3.util.Duration;
29
import com.uber.m3.util.ImmutableMap;
30
import io.temporal.api.command.v1.ScheduleActivityTaskCommandAttributesOrBuilder;
31
import io.temporal.api.common.v1.WorkflowExecution;
32
import io.temporal.api.workflowservice.v1.*;
33
import io.temporal.internal.common.ProtobufTimeUtils;
34
import io.temporal.internal.logging.LoggerTag;
35
import io.temporal.internal.retryer.GrpcRetryer;
36
import io.temporal.internal.worker.ActivityTaskHandler.Result;
37
import io.temporal.serviceclient.MetricsTag;
38
import io.temporal.serviceclient.WorkflowServiceStubs;
39
import io.temporal.serviceclient.rpcretry.DefaultStubServiceOperationRpcRetryOptions;
40
import io.temporal.worker.MetricsType;
41
import io.temporal.worker.WorkerMetricsTag;
42
import java.util.Objects;
43
import java.util.concurrent.CompletableFuture;
44
import java.util.concurrent.Semaphore;
45
import java.util.concurrent.TimeUnit;
46
import javax.annotation.Nonnull;
47
import org.slf4j.Logger;
48
import org.slf4j.LoggerFactory;
49
import org.slf4j.MDC;
50

51
final class ActivityWorker implements SuspendableWorker {
52
  private static final Logger log = LoggerFactory.getLogger(ActivityWorker.class);
1✔
53

54
  private SuspendableWorker poller = new NoopWorker();
1✔
55
  private PollTaskExecutor<ActivityTask> pollTaskExecutor;
56

57
  private final ActivityTaskHandler handler;
58
  private final WorkflowServiceStubs service;
59
  private final String namespace;
60
  private final String taskQueue;
61
  private final SingleWorkerOptions options;
62
  private final double taskQueueActivitiesPerSecond;
63
  private final PollerOptions pollerOptions;
64
  private final Scope workerMetricsScope;
65
  private final GrpcRetryer grpcRetryer;
66
  private final GrpcRetryer.GrpcRetryerOptions replyGrpcRetryerOptions;
67
  private final int executorSlots;
68
  private final Semaphore executorSlotsSemaphore;
69

70
  /**
   * Creates an activity worker bound to one namespace and task queue. Nothing is polled until
   * {@link #start()} is called; until then {@code poller} stays the no-op worker assigned in the
   * field initializer.
   *
   * @param service service stubs used for polling and for reporting activity results
   * @param namespace namespace set on every reply request
   * @param taskQueue task queue this worker serves
   * @param taskQueueActivitiesPerSecond value handed unchanged to the poll task created by {@link
   *     #start()}
   * @param options per-worker options; supplies identity, metrics scope, poller options and the
   *     executor thread pool size
   * @param handler handler that executes the polled activity tasks
   * @throws NullPointerException if {@code service}, {@code namespace}, {@code taskQueue}, {@code
   *     handler} or {@code options} is null
   */
  public ActivityWorker(
71
      @Nonnull WorkflowServiceStubs service,
72
      @Nonnull String namespace,
73
      @Nonnull String taskQueue,
74
      double taskQueueActivitiesPerSecond,
75
      @Nonnull SingleWorkerOptions options,
76
      @Nonnull ActivityTaskHandler handler) {
1✔
77
    this.service = Objects.requireNonNull(service);
1✔
78
    this.namespace = Objects.requireNonNull(namespace);
1✔
79
    this.taskQueue = Objects.requireNonNull(taskQueue);
1✔
80
    this.handler = Objects.requireNonNull(handler);
1✔
81
    this.taskQueueActivitiesPerSecond = taskQueueActivitiesPerSecond;
1✔
82
    this.options = Objects.requireNonNull(options);
1✔
83
    // getPollerOptions reads this.namespace and this.taskQueue, so it must run after both are set
    this.pollerOptions = getPollerOptions(options);
1✔
84
    this.workerMetricsScope =
1✔
85
        MetricsTag.tagged(options.getMetricsScope(), WorkerMetricsTag.WorkerType.ACTIVITY_WORKER);
1✔
86
    this.grpcRetryer = new GrpcRetryer(service.getServerCapabilities());
1✔
87
    this.replyGrpcRetryerOptions =
1✔
88
        new GrpcRetryer.GrpcRetryerOptions(
89
            DefaultStubServiceOperationRpcRetryOptions.INSTANCE, null);
90
    // One permit per executor thread; the same semaphore is later passed to the poll task and is
    // used by the eager activity dispatcher to reserve and release slots.
    this.executorSlots = options.getTaskExecutorThreadPoolSize();
1✔
91
    this.executorSlotsSemaphore = new Semaphore(executorSlots);
1✔
92
  }
1✔
93

94
  /**
   * Starts polling if the handler supports at least one activity type. In that case it creates the
   * task executor and the poller, starts the poller and increments the worker start counter.
   *
   * @return {@code true} if the poller was created and started; {@code false} if {@code
   *     handler.isAnyTypeSupported()} is false, in which case nothing is created
   */
  @Override
95
  public boolean start() {
96
    if (handler.isAnyTypeSupported()) {
1✔
97
      this.pollTaskExecutor =
1✔
98
          new PollTaskExecutor<>(
99
              namespace,
100
              taskQueue,
101
              options.getIdentity(),
1✔
102
              new TaskHandlerImpl(handler),
103
              pollerOptions,
104
              options.getTaskExecutorThreadPoolSize(),
1✔
105
              workerMetricsScope,
106
              true);
107
      // Replaces the no-op poller assigned in the field initializer
      poller =
1✔
108
          new Poller<>(
109
              options.getIdentity(),
1✔
110
              new ActivityPollTask(
111
                  service,
112
                  namespace,
113
                  taskQueue,
114
                  options.getIdentity(),
1✔
115
                  options.getBuildId(),
1✔
116
                  options.isUsingBuildIdForVersioning(),
1✔
117
                  taskQueueActivitiesPerSecond,
118
                  executorSlotsSemaphore,
119
                  workerMetricsScope,
120
                  service.getServerCapabilities()),
1✔
121
              this.pollTaskExecutor,
122
              pollerOptions,
123
              workerMetricsScope);
124
      poller.start();
1✔
125
      workerMetricsScope.counter(MetricsType.WORKER_START_COUNTER).inc(1);
1✔
126
      return true;
1✔
127
    } else {
128
      return false;
1✔
129
    }
130
  }
131

132
  /**
   * Shuts the worker down in stages: first the poller, then (only when tasks are not being
   * interrupted) a wait until all {@code executorSlots} semaphore permits have been released, and
   * finally the task executor if one was created by {@link #start()}.
   *
   * @param shutdownManager manager that performs the semaphore wait and is passed to each stage
   * @param interruptTasks when {@code true}, the wait for semaphore permits is skipped; the flag
   *     is also forwarded to the poller and the task executor
   * @return a future for the whole chain; an exception from any stage is logged and the future
   *     then completes normally with {@code null}
   */
  @Override
133
  public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
134
    String semaphoreName = this + "#executorSlotsSemaphore";
1✔
135
    return poller
1✔
136
        .shutdown(shutdownManager, interruptTasks)
1✔
137
        .thenCompose(
1✔
138
            ignore ->
139
                !interruptTasks
1✔
140
                    ? shutdownManager.waitForSemaphorePermitsReleaseUntimed(
1✔
141
                        executorSlotsSemaphore, executorSlots, semaphoreName)
142
                    : CompletableFuture.completedFuture(null))
1✔
143
        .thenCompose(
1✔
144
            ignore ->
145
                // pollTaskExecutor is null when start() never created it
                pollTaskExecutor != null
1✔
146
                    ? pollTaskExecutor.shutdown(shutdownManager, interruptTasks)
1✔
147
                    : CompletableFuture.completedFuture(null))
1✔
148
        .exceptionally(
1✔
149
            e -> {
150
              log.error("Unexpected exception during shutdown", e);
×
151
              return null;
×
152
            });
153
  }
154

155
  /**
   * Waits for the poller and then for the task executor to terminate. The value returned by the
   * first wait is used as the timeout of the second one (presumably the time remaining from the
   * original budget; confirm against {@code ShutdownManager.awaitTermination}).
   *
   * @param timeout maximum time to wait, in {@code unit}
   * @param unit unit of {@code timeout}; converted to milliseconds before waiting
   */
  @Override
156
  public void awaitTermination(long timeout, TimeUnit unit) {
157
    long timeoutMillis = ShutdownManager.awaitTermination(poller, unit.toMillis(timeout));
1✔
158
    // relies on the fact that the pollTaskExecutor is the last one to be shutdown, no need to
159
    // wait separately for intermediate steps
160
    ShutdownManager.awaitTermination(pollTaskExecutor, timeoutMillis);
1✔
161
  }
1✔
162

163
  /**
   * Suspends polling by delegating to the current poller (the no-op worker until {@link #start()}
   * has replaced it).
   */
  @Override
164
  public void suspendPolling() {
165
    poller.suspendPolling();
1✔
166
  }
1✔
167

168
  /**
   * Resumes polling by delegating to the current poller (the no-op worker until {@link #start()}
   * has replaced it).
   */
  @Override
169
  public void resumePolling() {
170
    poller.resumePolling();
1✔
171
  }
1✔
172

173
  /**
   * Reports the shutdown state of the poller only; the task executor is not consulted.
   *
   * @return whatever {@code poller.isShutdown()} returns
   */
  @Override
174
  public boolean isShutdown() {
175
    return poller.isShutdown();
×
176
  }
177

178
  /**
   * Reports whether both stages of the worker have terminated.
   *
   * @return {@code true} when the poller is terminated and the task executor is either absent
   *     (never created by {@link #start()}) or terminated as well
   */
  @Override
179
  public boolean isTerminated() {
180
    return poller.isTerminated() && (pollTaskExecutor == null || pollTaskExecutor.isTerminated());
1✔
181
  }
182

183
  /**
   * Reports whether polling is suspended, as seen by the current poller.
   *
   * @return whatever {@code poller.isSuspended()} returns
   */
  @Override
184
  public boolean isSuspended() {
185
    return poller.isSuspended();
1✔
186
  }
187

188
  /**
   * Returns the lifecycle state of the current poller. The eager activity dispatcher compares this
   * value with {@code WorkerLifecycleState.ACTIVE} before reserving a slot.
   *
   * @return whatever {@code poller.getLifecycleState()} returns
   */
  @Override
189
  public WorkerLifecycleState getLifecycleState() {
190
    return poller.getLifecycleState();
1✔
191
  }
192

193
  /**
   * Creates a dispatcher for eagerly scheduled activities that is backed by this worker's slot
   * semaphore and task executor.
   *
   * @return a new {@code EagerActivityDispatcherImpl} on every call
   */
  public EagerActivityDispatcher getEagerActivityDispatcher() {
194
    return new EagerActivityDispatcherImpl();
1✔
195
  }
196

197
  /**
   * Resolves the poller options for this worker. When the configured options carry no poll thread
   * name prefix, a copy is built with a prefix derived from the {@code namespace} and {@code
   * taskQueue} fields; otherwise the configured options are returned unchanged.
   *
   * @param options worker options to read the poller options from
   * @return poller options that always have a poll thread name prefix set
   */
  private PollerOptions getPollerOptions(SingleWorkerOptions options) {
198
    PollerOptions pollerOptions = options.getPollerOptions();
1✔
199
    if (pollerOptions.getPollThreadNamePrefix() == null) {
1✔
200
      pollerOptions =
1✔
201
          PollerOptions.newBuilder(pollerOptions)
1✔
202
              .setPollThreadNamePrefix(
1✔
203
                  WorkerThreadsNameHelper.getActivityPollerThreadPrefix(namespace, taskQueue))
1✔
204
              .build();
1✔
205
    }
206
    return pollerOptions;
1✔
207
  }
208

209
  /**
   * Describes this worker by identity, namespace and task queue. {@link #shutdown} uses this text
   * as the prefix of the semaphore name it passes to the shutdown manager.
   *
   * @return a string of the form {@code ActivityWorker{identity=..., namespace=..., taskQueue=...}}
   */
  @Override
210
  public String toString() {
211
    return String.format(
1✔
212
        "ActivityWorker{identity=%s, namespace=%s, taskQueue=%s}",
213
        options.getIdentity(), namespace, taskQueue);
1✔
214
  }
215

216
  private class TaskHandlerImpl implements PollTaskExecutor.TaskHandler<ActivityTask> {
217

218
    final ActivityTaskHandler handler;
219

220
    /**
     * Creates a task handler that delegates activity execution to {@code handler}.
     *
     * @param handler handler invoked for every activity task; stored as-is without a null check
     */
    private TaskHandlerImpl(ActivityTaskHandler handler) {
1✔
221
      this.handler = handler;
1✔
222
    }
1✔
223

224
    /**
     * Processes one activity task. Tags the worker metrics scope with the activity and workflow
     * type, puts the activity/workflow identifiers into the logging MDC while {@code
     * handleActivity} runs, and in the {@code finally} block removes them again and invokes the
     * task's completion callback unless the result asks for manual completion.
     *
     * @param task the activity task to process
     * @throws Exception whatever {@code handleActivity} throws
     * @throws Error the failure carried by a failed result, when that failure is an {@link Error}
     */
    @Override
225
    public void handle(ActivityTask task) throws Exception {
226
      PollActivityTaskQueueResponseOrBuilder pollResponse = task.getResponse();
1✔
227
      Scope metricsScope =
1✔
228
          workerMetricsScope.tagged(
1✔
229
              ImmutableMap.of(
1✔
230
                  MetricsTag.ACTIVITY_TYPE,
231
                  pollResponse.getActivityType().getName(),
1✔
232
                  MetricsTag.WORKFLOW_TYPE,
233
                  pollResponse.getWorkflowType().getName()));
1✔
234

235
      MDC.put(LoggerTag.ACTIVITY_ID, pollResponse.getActivityId());
1✔
236
      MDC.put(LoggerTag.ACTIVITY_TYPE, pollResponse.getActivityType().getName());
1✔
237
      MDC.put(LoggerTag.WORKFLOW_ID, pollResponse.getWorkflowExecution().getWorkflowId());
1✔
238
      MDC.put(LoggerTag.WORKFLOW_TYPE, pollResponse.getWorkflowType().getName());
1✔
239
      MDC.put(LoggerTag.RUN_ID, pollResponse.getWorkflowExecution().getRunId());
1✔
240

241
      // Stays null only if handleActivity throws; the finally block relies on that
      ActivityTaskHandler.Result result = null;
1✔
242
      try {
243
        result = handleActivity(task, metricsScope);
1✔
244
      } finally {
245
        MDC.remove(LoggerTag.ACTIVITY_ID);
1✔
246
        MDC.remove(LoggerTag.ACTIVITY_TYPE);
1✔
247
        MDC.remove(LoggerTag.WORKFLOW_ID);
1✔
248
        MDC.remove(LoggerTag.WORKFLOW_TYPE);
1✔
249
        MDC.remove(LoggerTag.RUN_ID);
1✔
250
        if (
1✔
251
        // handleActivity threw an exception (not a normal scenario)
252
        result == null
253
            // completed synchronously or manual completion hasn't been requested
254
            || !result.isManualCompletion()) {
1✔
255
          task.getCompletionCallback().apply();
1✔
256
        }
257
      }
258

259
      // result is non-null here: if handleActivity had thrown, the exception would have propagated
      if (result.getTaskFailed() != null && result.getTaskFailed().getFailure() instanceof Error) {
1✔
260
        // don't just swallow Errors, we need to propagate it to the top
261
        throw (Error) result.getTaskFailed().getFailure();
×
262
      }
263
    }
1✔
264

265
    /**
     * Executes the activity through the handler and reports its result to the server.
     *
     * <p>Steps: records the schedule-to-start latency timer from the started time and the current
     * attempt scheduled time of the poll response; runs {@code handler.handle} under the execution
     * latency stopwatch; sends the reply via {@code sendReply}; and, when the result carries a
     * completed request, records the succeed end-to-end latency based on the scheduled time.
     *
     * @param task the activity task to execute
     * @param metricsScope scope used for the timers and attached to the reply call
     * @return the result produced by the handler
     * @throws RuntimeException or {@link Error} rethrown unchanged after logging, from either the
     *     handler or the reply
     */
    private ActivityTaskHandler.Result handleActivity(ActivityTask task, Scope metricsScope) {
266
      PollActivityTaskQueueResponseOrBuilder pollResponse = task.getResponse();
1✔
267
      ByteString taskToken = pollResponse.getTaskToken();
1✔
268
      metricsScope
1✔
269
          .timer(MetricsType.ACTIVITY_SCHEDULE_TO_START_LATENCY)
1✔
270
          .record(
1✔
271
              ProtobufTimeUtils.toM3Duration(
1✔
272
                  pollResponse.getStartedTime(), pollResponse.getCurrentAttemptScheduledTime()));
1✔
273

274
      ActivityTaskHandler.Result result;
275

276
      Stopwatch sw = metricsScope.timer(MetricsType.ACTIVITY_EXEC_LATENCY).start();
1✔
277
      try {
278
        result = handler.handle(task, metricsScope, false);
1✔
279
      } catch (Throwable ex) {
×
280
        // handler.handle is expected to never throw an exception and to return a result
281
        // that can be used for a workflow callback. If this method throws, it's a bug.
282
        log.error("[BUG] Code that expected to never throw an exception threw an exception", ex);
×
283
        throw ex;
×
284
      } finally {
285
        // The stopwatch is stopped whether the handler returned or threw
        sw.stop();
1✔
286
      }
287

288
      try {
289
        sendReply(taskToken, result, metricsScope);
1✔
290
      } catch (Exception e) {
1✔
291
        logExceptionDuringResultReporting(e, pollResponse, result);
1✔
292
        // TODO this class doesn't report activity success and failure metrics now, instead it's
293
        //  located inside an activity handler. We should lift it up to this level,
294
        //  so we can increment a failure counter instead of success if send result failed.
295
        //  This will also align the behavior of ActivityWorker with WorkflowWorker.
296
        throw e;
1✔
297
      }
1✔
298

299
      if (result.getTaskCompleted() != null) {
1✔
300
        Duration e2eDuration =
1✔
301
            ProtobufTimeUtils.toM3DurationSinceNow(pollResponse.getScheduledTime());
1✔
302
        metricsScope.timer(MetricsType.ACTIVITY_SUCCEED_E2E_LATENCY).record(e2eDuration);
1✔
303
      }
304

305
      return result;
1✔
306
    }
307

308
    /**
     * Wraps a processing failure in a {@link RuntimeException} whose message identifies the
     * activity: workflow id, run id, activity type and activity id taken from the poll response.
     *
     * @param t the task that was being processed
     * @param failure the original failure; becomes the cause of the returned exception
     * @return a new {@code RuntimeException}; it is returned, not thrown
     */
    @Override
309
    public Throwable wrapFailure(ActivityTask t, Throwable failure) {
310
      PollActivityTaskQueueResponseOrBuilder response = t.getResponse();
1✔
311
      WorkflowExecution execution = response.getWorkflowExecution();
1✔
312
      return new RuntimeException(
1✔
313
          "Failure processing activity response. WorkflowId="
314
              + execution.getWorkflowId()
1✔
315
              + ", RunId="
316
              + execution.getRunId()
1✔
317
              + ", ActivityType="
318
              + response.getActivityType().getName()
1✔
319
              + ", ActivityId="
320
              + response.getActivityId(),
1✔
321
          failure);
322
    }
323

324
    /**
     * Reports the activity result to the server. Exactly one request is sent, chosen in this
     * order: the completed request, else the failed request, else the canceled request. When none
     * of the three is present nothing is sent.
     *
     * <p>Before sending, the chosen request is rebuilt with the task token, this worker's identity,
     * the namespace and the worker version stamp. The call goes through {@code grpcRetryer} with
     * {@code replyGrpcRetryerOptions} and carries {@code metricsScope} as a call option.
     *
     * @param taskToken token of the activity task being answered
     * @param response handler result holding at most one of the three requests
     * @param metricsScope scope attached to the gRPC call
     */
    private void sendReply(
325
        ByteString taskToken, ActivityTaskHandler.Result response, Scope metricsScope) {
326
      RespondActivityTaskCompletedRequest taskCompleted = response.getTaskCompleted();
1✔
327
      if (taskCompleted != null) {
1✔
328
        RespondActivityTaskCompletedRequest request =
1✔
329
            taskCompleted.toBuilder()
1✔
330
                .setTaskToken(taskToken)
1✔
331
                .setIdentity(options.getIdentity())
1✔
332
                .setNamespace(namespace)
1✔
333
                .setWorkerVersion(options.workerVersionStamp())
1✔
334
                .build();
1✔
335

336
        grpcRetryer.retry(
1✔
337
            () ->
338
                service
1✔
339
                    .blockingStub()
1✔
340
                    .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
1✔
341
                    .respondActivityTaskCompleted(request),
1✔
342
            replyGrpcRetryerOptions);
1✔
343
      } else {
1✔
344
        Result.TaskFailedResult taskFailed = response.getTaskFailed();
1✔
345
        if (taskFailed != null) {
1✔
346
          RespondActivityTaskFailedRequest request =
1✔
347
              taskFailed.getTaskFailedRequest().toBuilder()
1✔
348
                  .setTaskToken(taskToken)
1✔
349
                  .setIdentity(options.getIdentity())
1✔
350
                  .setNamespace(namespace)
1✔
351
                  .setWorkerVersion(options.workerVersionStamp())
1✔
352
                  .build();
1✔
353

354
          grpcRetryer.retry(
1✔
355
              () ->
356
                  service
1✔
357
                      .blockingStub()
1✔
358
                      .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
1✔
359
                      .respondActivityTaskFailed(request),
1✔
360
              replyGrpcRetryerOptions);
1✔
361
        } else {
1✔
362
          RespondActivityTaskCanceledRequest taskCanceled = response.getTaskCanceled();
1✔
363
          if (taskCanceled != null) {
1✔
364
            RespondActivityTaskCanceledRequest request =
1✔
365
                taskCanceled.toBuilder()
1✔
366
                    .setTaskToken(taskToken)
1✔
367
                    .setIdentity(options.getIdentity())
1✔
368
                    .setNamespace(namespace)
1✔
369
                    .setWorkerVersion(options.workerVersionStamp())
1✔
370
                    .build();
1✔
371

372
            grpcRetryer.retry(
1✔
373
                () ->
374
                    service
1✔
375
                        .blockingStub()
1✔
376
                        .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
1✔
377
                        .respondActivityTaskCanceled(request),
1✔
378
                replyGrpcRetryerOptions);
1✔
379
          }
380
        }
381
      }
382
      // None of the three requests was set: manual activity completion, nothing is sent from here
383
    }
1✔
384

385
    /**
     * Logs a failure to report an activity result to the server. With debug logging enabled the
     * message is logged at debug level and includes the result; otherwise it is logged at warn
     * level without the result. In both cases the exception is passed as the last argument.
     *
     * <p>NOTE(review): the MDC entries put here are not removed by this method. On the only call
     * path visible in this file ({@code handle} -> {@code handleActivity}) the same keys are
     * already set and are removed by the {@code finally} block of {@code handle}.
     *
     * @param e the exception thrown while sending the reply
     * @param pollResponse poll response of the activity, used for the identifiers in the message
     * @param result the handler result that could not be reported
     */
    private void logExceptionDuringResultReporting(
386
        Exception e,
387
        PollActivityTaskQueueResponseOrBuilder pollResponse,
388
        ActivityTaskHandler.Result result) {
389
      MDC.put(LoggerTag.ACTIVITY_ID, pollResponse.getActivityId());
1✔
390
      MDC.put(LoggerTag.ACTIVITY_TYPE, pollResponse.getActivityType().getName());
1✔
391
      MDC.put(LoggerTag.WORKFLOW_ID, pollResponse.getWorkflowExecution().getWorkflowId());
1✔
392
      MDC.put(LoggerTag.RUN_ID, pollResponse.getWorkflowExecution().getRunId());
1✔
393

394
      if (log.isDebugEnabled()) {
1✔
395
        log.debug(
×
396
            "Failure during reporting of activity result to the server. ActivityId = {}, ActivityType = {}, WorkflowId={}, WorkflowType={}, RunId={}, ActivityResult={}",
397
            pollResponse.getActivityId(),
×
398
            pollResponse.getActivityType().getName(),
×
399
            pollResponse.getWorkflowExecution().getWorkflowId(),
×
400
            pollResponse.getWorkflowType().getName(),
×
401
            pollResponse.getWorkflowExecution().getRunId(),
×
402
            result,
403
            e);
404
      } else {
405
        log.warn(
1✔
406
            "Failure during reporting of activity result to the server. ActivityId = {}, ActivityType = {}, WorkflowId={}, WorkflowType={}, RunId={}",
407
            pollResponse.getActivityId(),
1✔
408
            pollResponse.getActivityType().getName(),
1✔
409
            pollResponse.getWorkflowExecution().getWorkflowId(),
1✔
410
            pollResponse.getWorkflowType().getName(),
1✔
411
            pollResponse.getWorkflowExecution().getRunId(),
1✔
412
            e);
413
      }
414
    }
1✔
415
  }
416

417
  private final class EagerActivityDispatcherImpl implements EagerActivityDispatcher {
1✔
418
    /**
     * Tries to reserve one executor slot for an eagerly dispatched activity without blocking.
     *
     * <p>The checks short-circuit in order, so a semaphore permit is only taken when the worker's
     * lifecycle state is {@code ACTIVE} and the command targets this worker's task queue.
     *
     * @param commandAttributes attributes of the schedule-activity command; only its task queue
     *     name is read
     * @return {@code true} if a permit was acquired; {@code false} if any check failed or no permit
     *     was available
     */
    @Override
419
    public boolean tryReserveActivitySlot(
420
        ScheduleActivityTaskCommandAttributesOrBuilder commandAttributes) {
421
      return WorkerLifecycleState.ACTIVE.equals(ActivityWorker.this.getLifecycleState())
1✔
422
          && Objects.equals(
1✔
423
              commandAttributes.getTaskQueue().getName(), ActivityWorker.this.taskQueue)
1✔
424
          && ActivityWorker.this.executorSlotsSemaphore.tryAcquire();
1✔
425
    }
426

427
    /**
     * Returns previously reserved slots to the worker's executor slot semaphore.
     *
     * @param slotCounts number of permits to release; passed straight to {@link
     *     Semaphore#release(int)}, which rejects negative values
     */
    @Override
428
    public void releaseActivitySlotReservations(int slotCounts) {
429
      ActivityWorker.this.executorSlotsSemaphore.release(slotCounts);
1✔
430
    }
1✔
431

432
    /**
     * Hands an eagerly received activity to the worker's task executor. The activity is wrapped in
     * an {@code ActivityTask} together with a callback that releases one semaphore permit
     * (presumably the task's completion callback, which would give back the slot taken by {@code
     * tryReserveActivitySlot}; confirm against the {@code ActivityTask} constructor).
     *
     * <p>NOTE(review): {@code pollTaskExecutor} is dereferenced without a null check; it is only
     * assigned in {@code ActivityWorker.start()}.
     *
     * @param activity the activity task received for eager execution
     */
    @Override
433
    public void dispatchActivity(PollActivityTaskQueueResponse activity) {
434
      ActivityWorker.this.pollTaskExecutor.process(
×
NEW
435
          new ActivityTask(activity, ActivityWorker.this.executorSlotsSemaphore::release));
×
UNCOV
436
    }
×
437
  }
438
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc