• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #284

23 Jul 2024 10:09PM UTC coverage: 77.304% (-0.06%) from 77.364%
#284

push

github

web-flow
Reintroduce slot supplier & add many tests (#2143)

593 of 752 new or added lines in 37 files covered. (78.86%)

22 existing lines in 10 files now uncovered.

19554 of 25295 relevant lines covered (77.3%)

0.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.9
/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityWorker.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.worker;
22

23
import static io.temporal.serviceclient.MetricsTag.METRICS_TAGS_CALL_OPTIONS_KEY;
24

25
import com.google.protobuf.ByteString;
26
import com.uber.m3.tally.Scope;
27
import com.uber.m3.tally.Stopwatch;
28
import com.uber.m3.util.Duration;
29
import com.uber.m3.util.ImmutableMap;
30
import io.temporal.api.command.v1.ScheduleActivityTaskCommandAttributesOrBuilder;
31
import io.temporal.api.common.v1.WorkflowExecution;
32
import io.temporal.api.workflowservice.v1.*;
33
import io.temporal.internal.activity.ActivityPollResponseToInfo;
34
import io.temporal.internal.common.ProtobufTimeUtils;
35
import io.temporal.internal.logging.LoggerTag;
36
import io.temporal.internal.retryer.GrpcRetryer;
37
import io.temporal.internal.worker.ActivityTaskHandler.Result;
38
import io.temporal.serviceclient.MetricsTag;
39
import io.temporal.serviceclient.WorkflowServiceStubs;
40
import io.temporal.serviceclient.rpcretry.DefaultStubServiceOperationRpcRetryOptions;
41
import io.temporal.worker.MetricsType;
42
import io.temporal.worker.WorkerMetricsTag;
43
import io.temporal.worker.tuning.*;
44
import java.util.Objects;
45
import java.util.Optional;
46
import java.util.concurrent.CompletableFuture;
47
import java.util.concurrent.TimeUnit;
48
import javax.annotation.Nonnull;
49
import org.slf4j.Logger;
50
import org.slf4j.LoggerFactory;
51
import org.slf4j.MDC;
52

53
final class ActivityWorker implements SuspendableWorker {
54
  private static final Logger log = LoggerFactory.getLogger(ActivityWorker.class);
1✔
55

56
  private SuspendableWorker poller = new NoopWorker();
1✔
57
  private PollTaskExecutor<ActivityTask> pollTaskExecutor;
58

59
  private final ActivityTaskHandler handler;
60
  private final WorkflowServiceStubs service;
61
  private final String namespace;
62
  private final String taskQueue;
63
  private final SingleWorkerOptions options;
64
  private final double taskQueueActivitiesPerSecond;
65
  private final PollerOptions pollerOptions;
66
  private final Scope workerMetricsScope;
67
  private final GrpcRetryer grpcRetryer;
68
  private final GrpcRetryer.GrpcRetryerOptions replyGrpcRetryerOptions;
69
  private final TrackingSlotSupplier<ActivitySlotInfo> slotSupplier;
70

71
  public ActivityWorker(
72
      @Nonnull WorkflowServiceStubs service,
73
      @Nonnull String namespace,
74
      @Nonnull String taskQueue,
75
      double taskQueueActivitiesPerSecond,
76
      @Nonnull SingleWorkerOptions options,
77
      @Nonnull ActivityTaskHandler handler,
78
      @Nonnull SlotSupplier<ActivitySlotInfo> slotSupplier) {
1✔
79
    this.service = Objects.requireNonNull(service);
1✔
80
    this.namespace = Objects.requireNonNull(namespace);
1✔
81
    this.taskQueue = Objects.requireNonNull(taskQueue);
1✔
82
    this.handler = Objects.requireNonNull(handler);
1✔
83
    this.taskQueueActivitiesPerSecond = taskQueueActivitiesPerSecond;
1✔
84
    this.options = Objects.requireNonNull(options);
1✔
85
    this.pollerOptions = getPollerOptions(options);
1✔
86
    this.workerMetricsScope =
1✔
87
        MetricsTag.tagged(options.getMetricsScope(), WorkerMetricsTag.WorkerType.ACTIVITY_WORKER);
1✔
88
    this.grpcRetryer = new GrpcRetryer(service.getServerCapabilities());
1✔
89
    this.replyGrpcRetryerOptions =
1✔
90
        new GrpcRetryer.GrpcRetryerOptions(
91
            DefaultStubServiceOperationRpcRetryOptions.INSTANCE, null);
92

93
    this.slotSupplier = new TrackingSlotSupplier<>(slotSupplier, this.workerMetricsScope);
1✔
94
  }
1✔
95

96
  @Override
97
  public boolean start() {
98
    if (handler.isAnyTypeSupported()) {
1✔
99
      this.pollTaskExecutor =
1✔
100
          new PollTaskExecutor<>(
101
              namespace,
102
              taskQueue,
103
              options.getIdentity(),
1✔
104
              new TaskHandlerImpl(handler),
105
              pollerOptions,
106
              slotSupplier.maximumSlots().orElse(Integer.MAX_VALUE),
1✔
107
              true);
108
      poller =
1✔
109
          new Poller<>(
110
              options.getIdentity(),
1✔
111
              new ActivityPollTask(
112
                  service,
113
                  namespace,
114
                  taskQueue,
115
                  options.getIdentity(),
1✔
116
                  options.getBuildId(),
1✔
117
                  options.isUsingBuildIdForVersioning(),
1✔
118
                  taskQueueActivitiesPerSecond,
119
                  this.slotSupplier,
120
                  workerMetricsScope,
121
                  service.getServerCapabilities()),
1✔
122
              this.pollTaskExecutor,
123
              pollerOptions,
124
              workerMetricsScope);
125
      poller.start();
1✔
126
      workerMetricsScope.counter(MetricsType.WORKER_START_COUNTER).inc(1);
1✔
127
      return true;
1✔
128
    } else {
129
      return false;
1✔
130
    }
131
  }
132

133
  @Override
134
  public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
135
    String supplierName = this + "#executorSlots";
1✔
136
    return poller
1✔
137
        .shutdown(shutdownManager, interruptTasks)
1✔
138
        .thenCompose(
1✔
139
            ignore ->
140
                !interruptTasks
1✔
141
                    ? shutdownManager.waitForSupplierPermitsReleasedUnlimited(
1✔
142
                        slotSupplier, supplierName)
143
                    : CompletableFuture.completedFuture(null))
1✔
144
        .thenCompose(
1✔
145
            ignore ->
146
                pollTaskExecutor != null
1✔
147
                    ? pollTaskExecutor.shutdown(shutdownManager, interruptTasks)
1✔
148
                    : CompletableFuture.completedFuture(null))
1✔
149
        .exceptionally(
1✔
150
            e -> {
151
              log.error("Unexpected exception during shutdown", e);
×
152
              return null;
×
153
            });
154
  }
155

156
  @Override
157
  public void awaitTermination(long timeout, TimeUnit unit) {
158
    long timeoutMillis = ShutdownManager.awaitTermination(poller, unit.toMillis(timeout));
1✔
159
    // relies on the fact that the pollTaskExecutor is the last one to be shut down, no need to
160
    // wait separately for intermediate steps
161
    ShutdownManager.awaitTermination(pollTaskExecutor, timeoutMillis);
1✔
162
  }
1✔
163

164
  @Override
165
  public void suspendPolling() {
166
    poller.suspendPolling();
1✔
167
  }
1✔
168

169
  @Override
170
  public void resumePolling() {
171
    poller.resumePolling();
1✔
172
  }
1✔
173

174
  @Override
175
  public boolean isShutdown() {
176
    return poller.isShutdown();
×
177
  }
178

179
  @Override
180
  public boolean isTerminated() {
181
    return poller.isTerminated() && (pollTaskExecutor == null || pollTaskExecutor.isTerminated());
1✔
182
  }
183

184
  @Override
185
  public boolean isSuspended() {
186
    return poller.isSuspended();
1✔
187
  }
188

189
  @Override
190
  public WorkerLifecycleState getLifecycleState() {
191
    return poller.getLifecycleState();
1✔
192
  }
193

194
  public EagerActivityDispatcher getEagerActivityDispatcher() {
195
    return new EagerActivityDispatcherImpl();
1✔
196
  }
197

198
  private PollerOptions getPollerOptions(SingleWorkerOptions options) {
199
    PollerOptions pollerOptions = options.getPollerOptions();
1✔
200
    if (pollerOptions.getPollThreadNamePrefix() == null) {
1✔
201
      pollerOptions =
1✔
202
          PollerOptions.newBuilder(pollerOptions)
1✔
203
              .setPollThreadNamePrefix(
1✔
204
                  WorkerThreadsNameHelper.getActivityPollerThreadPrefix(namespace, taskQueue))
1✔
205
              .build();
1✔
206
    }
207
    return pollerOptions;
1✔
208
  }
209

210
  @Override
211
  public String toString() {
212
    return String.format(
1✔
213
        "ActivityWorker{identity=%s, namespace=%s, taskQueue=%s}",
214
        options.getIdentity(), namespace, taskQueue);
1✔
215
  }
216

217
  private class TaskHandlerImpl implements PollTaskExecutor.TaskHandler<ActivityTask> {
218

219
    final ActivityTaskHandler handler;
220

221
    private TaskHandlerImpl(ActivityTaskHandler handler) {
1✔
222
      this.handler = handler;
1✔
223
    }
1✔
224

225
    @Override
226
    public void handle(ActivityTask task) throws Exception {
227
      PollActivityTaskQueueResponseOrBuilder pollResponse = task.getResponse();
1✔
228

229
      slotSupplier.markSlotUsed(
1✔
230
          new ActivitySlotInfo(
231
              ActivityPollResponseToInfo.toActivityInfoImpl(
1✔
232
                  pollResponse, namespace, taskQueue, false),
1✔
233
              options.getIdentity(),
1✔
234
              options.getBuildId()),
1✔
235
          task.getPermit());
1✔
236

237
      Scope metricsScope =
1✔
238
          workerMetricsScope.tagged(
1✔
239
              ImmutableMap.of(
1✔
240
                  MetricsTag.ACTIVITY_TYPE,
241
                  pollResponse.getActivityType().getName(),
1✔
242
                  MetricsTag.WORKFLOW_TYPE,
243
                  pollResponse.getWorkflowType().getName()));
1✔
244

245
      MDC.put(LoggerTag.ACTIVITY_ID, pollResponse.getActivityId());
1✔
246
      MDC.put(LoggerTag.ACTIVITY_TYPE, pollResponse.getActivityType().getName());
1✔
247
      MDC.put(LoggerTag.WORKFLOW_ID, pollResponse.getWorkflowExecution().getWorkflowId());
1✔
248
      MDC.put(LoggerTag.WORKFLOW_TYPE, pollResponse.getWorkflowType().getName());
1✔
249
      MDC.put(LoggerTag.RUN_ID, pollResponse.getWorkflowExecution().getRunId());
1✔
250

251
      ActivityTaskHandler.Result result = null;
1✔
252
      try {
253
        result = handleActivity(task, metricsScope);
1✔
254
      } finally {
255
        MDC.remove(LoggerTag.ACTIVITY_ID);
1✔
256
        MDC.remove(LoggerTag.ACTIVITY_TYPE);
1✔
257
        MDC.remove(LoggerTag.WORKFLOW_ID);
1✔
258
        MDC.remove(LoggerTag.WORKFLOW_TYPE);
1✔
259
        MDC.remove(LoggerTag.RUN_ID);
1✔
260
        if (
1✔
261
        // handleActivity threw an exception (not a normal scenario)
262
        result == null
263
            // completed synchronously or manual completion hasn't been requested
264
            || !result.isManualCompletion()) {
1✔
265
          task.getCompletionCallback().apply();
1✔
266
        }
267
      }
268

269
      if (result.getTaskFailed() != null && result.getTaskFailed().getFailure() instanceof Error) {
1✔
270
        // don't just swallow Errors, we need to propagate it to the top
271
        throw (Error) result.getTaskFailed().getFailure();
×
272
      }
273
    }
1✔
274

275
    private ActivityTaskHandler.Result handleActivity(ActivityTask task, Scope metricsScope) {
276
      PollActivityTaskQueueResponseOrBuilder pollResponse = task.getResponse();
1✔
277
      ByteString taskToken = pollResponse.getTaskToken();
1✔
278
      metricsScope
1✔
279
          .timer(MetricsType.ACTIVITY_SCHEDULE_TO_START_LATENCY)
1✔
280
          .record(
1✔
281
              ProtobufTimeUtils.toM3Duration(
1✔
282
                  pollResponse.getStartedTime(), pollResponse.getCurrentAttemptScheduledTime()));
1✔
283

284
      ActivityTaskHandler.Result result;
285

286
      Stopwatch sw = metricsScope.timer(MetricsType.ACTIVITY_EXEC_LATENCY).start();
1✔
287
      try {
288
        result = handler.handle(task, metricsScope, false);
1✔
289
      } catch (Throwable ex) {
×
290
        // handler.handle is expected to never throw an exception and return result
291
        // that can be used for a workflow callback. If this method throws, it's a bug.
292
        log.error("[BUG] Code that expected to never throw an exception threw an exception", ex);
×
293
        throw ex;
×
294
      } finally {
295
        sw.stop();
1✔
296
      }
297

298
      try {
299
        sendReply(taskToken, result, metricsScope);
1✔
300
      } catch (Exception e) {
1✔
301
        logExceptionDuringResultReporting(e, pollResponse, result);
1✔
302
        // TODO this class doesn't report activity success and failure metrics now, instead it's
303
        //  located inside an activity handler. We should lift it up to this level,
304
        //  so we can increment a failure counter instead of success if send result failed.
305
        //  This will also align the behavior of ActivityWorker with WorkflowWorker.
306
        throw e;
1✔
307
      }
1✔
308

309
      if (result.getTaskCompleted() != null) {
1✔
310
        Duration e2eDuration =
1✔
311
            ProtobufTimeUtils.toM3DurationSinceNow(pollResponse.getScheduledTime());
1✔
312
        metricsScope.timer(MetricsType.ACTIVITY_SUCCEED_E2E_LATENCY).record(e2eDuration);
1✔
313
      }
314

315
      return result;
1✔
316
    }
317

318
    @Override
319
    public Throwable wrapFailure(ActivityTask t, Throwable failure) {
320
      PollActivityTaskQueueResponseOrBuilder response = t.getResponse();
1✔
321
      WorkflowExecution execution = response.getWorkflowExecution();
1✔
322
      return new RuntimeException(
1✔
323
          "Failure processing activity response. WorkflowId="
324
              + execution.getWorkflowId()
1✔
325
              + ", RunId="
326
              + execution.getRunId()
1✔
327
              + ", ActivityType="
328
              + response.getActivityType().getName()
1✔
329
              + ", ActivityId="
330
              + response.getActivityId(),
1✔
331
          failure);
332
    }
333

334
    private void sendReply(
335
        ByteString taskToken, ActivityTaskHandler.Result response, Scope metricsScope) {
336
      RespondActivityTaskCompletedRequest taskCompleted = response.getTaskCompleted();
1✔
337
      if (taskCompleted != null) {
1✔
338
        RespondActivityTaskCompletedRequest request =
1✔
339
            taskCompleted.toBuilder()
1✔
340
                .setTaskToken(taskToken)
1✔
341
                .setIdentity(options.getIdentity())
1✔
342
                .setNamespace(namespace)
1✔
343
                .setWorkerVersion(options.workerVersionStamp())
1✔
344
                .build();
1✔
345

346
        grpcRetryer.retry(
1✔
347
            () ->
348
                service
1✔
349
                    .blockingStub()
1✔
350
                    .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
1✔
351
                    .respondActivityTaskCompleted(request),
1✔
352
            replyGrpcRetryerOptions);
1✔
353
      } else {
1✔
354
        Result.TaskFailedResult taskFailed = response.getTaskFailed();
1✔
355
        if (taskFailed != null) {
1✔
356
          RespondActivityTaskFailedRequest request =
1✔
357
              taskFailed.getTaskFailedRequest().toBuilder()
1✔
358
                  .setTaskToken(taskToken)
1✔
359
                  .setIdentity(options.getIdentity())
1✔
360
                  .setNamespace(namespace)
1✔
361
                  .setWorkerVersion(options.workerVersionStamp())
1✔
362
                  .build();
1✔
363

364
          grpcRetryer.retry(
1✔
365
              () ->
366
                  service
1✔
367
                      .blockingStub()
1✔
368
                      .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
1✔
369
                      .respondActivityTaskFailed(request),
1✔
370
              replyGrpcRetryerOptions);
1✔
371
        } else {
1✔
372
          RespondActivityTaskCanceledRequest taskCanceled = response.getTaskCanceled();
1✔
373
          if (taskCanceled != null) {
1✔
374
            RespondActivityTaskCanceledRequest request =
1✔
375
                taskCanceled.toBuilder()
1✔
376
                    .setTaskToken(taskToken)
1✔
377
                    .setIdentity(options.getIdentity())
1✔
378
                    .setNamespace(namespace)
1✔
379
                    .setWorkerVersion(options.workerVersionStamp())
1✔
380
                    .build();
1✔
381

382
            grpcRetryer.retry(
1✔
383
                () ->
384
                    service
1✔
385
                        .blockingStub()
1✔
386
                        .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
1✔
387
                        .respondActivityTaskCanceled(request),
1✔
388
                replyGrpcRetryerOptions);
1✔
389
          }
390
        }
391
      }
392
      // Otherwise (no completed/failed/canceled result): manual activity completion, no reply sent
393
    }
1✔
394

395
    private void logExceptionDuringResultReporting(
396
        Exception e,
397
        PollActivityTaskQueueResponseOrBuilder pollResponse,
398
        ActivityTaskHandler.Result result) {
399
      MDC.put(LoggerTag.ACTIVITY_ID, pollResponse.getActivityId());
1✔
400
      MDC.put(LoggerTag.ACTIVITY_TYPE, pollResponse.getActivityType().getName());
1✔
401
      MDC.put(LoggerTag.WORKFLOW_ID, pollResponse.getWorkflowExecution().getWorkflowId());
1✔
402
      MDC.put(LoggerTag.RUN_ID, pollResponse.getWorkflowExecution().getRunId());
1✔
403

404
      if (log.isDebugEnabled()) {
1✔
405
        log.debug(
×
406
            "Failure during reporting of activity result to the server. ActivityId = {}, ActivityType = {}, WorkflowId={}, WorkflowType={}, RunId={}, ActivityResult={}",
407
            pollResponse.getActivityId(),
×
408
            pollResponse.getActivityType().getName(),
×
409
            pollResponse.getWorkflowExecution().getWorkflowId(),
×
410
            pollResponse.getWorkflowType().getName(),
×
411
            pollResponse.getWorkflowExecution().getRunId(),
×
412
            result,
413
            e);
414
      } else {
415
        log.warn(
1✔
416
            "Failure during reporting of activity result to the server. ActivityId = {}, ActivityType = {}, WorkflowId={}, WorkflowType={}, RunId={}",
417
            pollResponse.getActivityId(),
1✔
418
            pollResponse.getActivityType().getName(),
1✔
419
            pollResponse.getWorkflowExecution().getWorkflowId(),
1✔
420
            pollResponse.getWorkflowType().getName(),
1✔
421
            pollResponse.getWorkflowExecution().getRunId(),
1✔
422
            e);
423
      }
424
    }
1✔
425
  }
426

427
  private final class EagerActivityDispatcherImpl implements EagerActivityDispatcher {
1✔
428
    @Override
429
    public Optional<SlotPermit> tryReserveActivitySlot(
430
        ScheduleActivityTaskCommandAttributesOrBuilder commandAttributes) {
431
      if (!WorkerLifecycleState.ACTIVE.equals(ActivityWorker.this.getLifecycleState())
1✔
432
          || !Objects.equals(
1✔
433
              commandAttributes.getTaskQueue().getName(), ActivityWorker.this.taskQueue)) {
1✔
434
        return Optional.empty();
1✔
435
      }
436
      return ActivityWorker.this.slotSupplier.tryReserveSlot(
1✔
437
          new SlotReservationData(
438
              ActivityWorker.this.taskQueue, options.getIdentity(), options.getBuildId()));
1✔
439
    }
440

441
    @Override
442
    public void releaseActivitySlotReservations(Iterable<SlotPermit> permits) {
443
      for (SlotPermit permit : permits) {
1✔
444
        ActivityWorker.this.slotSupplier.releaseSlot(SlotReleaseReason.neverUsed(), permit);
1✔
445
      }
1✔
446
    }
1✔
447

448
    @Override
449
    public void dispatchActivity(PollActivityTaskQueueResponse activity, SlotPermit permit) {
450
      ActivityWorker.this.pollTaskExecutor.process(
×
451
          new ActivityTask(
452
              activity,
453
              permit,
454
              () ->
NEW
455
                  ActivityWorker.this.slotSupplier.releaseSlot(
×
NEW
456
                      SlotReleaseReason.taskComplete(), permit)));
×
UNCOV
457
    }
×
458
  }
459
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc