• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #153

pending completion
#153

push

github-actions

web-flow
Eager Workflow Task Dispatch (#1674)

Issue #1646

Signed-off-by: Dmitry Spikhalskiy <dmitry@spikhalskiy.com>

213 of 213 new or added lines in 22 files covered. (100.0%)

16682 of 20566 relevant lines covered (81.11%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.75
/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityWorker.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.worker;
22

23
import static io.temporal.serviceclient.MetricsTag.METRICS_TAGS_CALL_OPTIONS_KEY;
24

25
import com.google.protobuf.ByteString;
26
import com.uber.m3.tally.Scope;
27
import com.uber.m3.tally.Stopwatch;
28
import com.uber.m3.util.Duration;
29
import com.uber.m3.util.ImmutableMap;
30
import io.temporal.api.command.v1.ScheduleActivityTaskCommandAttributesOrBuilder;
31
import io.temporal.api.common.v1.WorkflowExecution;
32
import io.temporal.api.workflowservice.v1.*;
33
import io.temporal.internal.common.ProtobufTimeUtils;
34
import io.temporal.internal.logging.LoggerTag;
35
import io.temporal.internal.retryer.GrpcRetryer;
36
import io.temporal.internal.worker.ActivityTaskHandler.Result;
37
import io.temporal.serviceclient.MetricsTag;
38
import io.temporal.serviceclient.WorkflowServiceStubs;
39
import io.temporal.serviceclient.rpcretry.DefaultStubServiceOperationRpcRetryOptions;
40
import io.temporal.worker.MetricsType;
41
import io.temporal.worker.WorkerMetricsTag;
42
import java.util.Objects;
43
import java.util.concurrent.CompletableFuture;
44
import java.util.concurrent.Semaphore;
45
import java.util.concurrent.TimeUnit;
46
import javax.annotation.Nonnull;
47
import org.slf4j.Logger;
48
import org.slf4j.LoggerFactory;
49
import org.slf4j.MDC;
50

51
final class ActivityWorker implements SuspendableWorker {
52
  private static final Logger log = LoggerFactory.getLogger(ActivityWorker.class);
1✔
53

54
  // Starts out as a NoopWorker; start() replaces it with a real Poller when the handler
  // supports at least one activity type. All lifecycle methods delegate to this instance.
  private SuspendableWorker poller = new NoopWorker();
1✔
55
  // Assigned only inside start(); also used by the eager activity dispatcher to process tasks.
  private PollTaskExecutor<ActivityTask> pollTaskExecutor;
56

57
  private final ActivityTaskHandler handler;
58
  private final WorkflowServiceStubs service;
59
  private final String namespace;
60
  private final String taskQueue;
61
  private final SingleWorkerOptions options;
62
  private final double taskQueueActivitiesPerSecond;
63
  private final PollerOptions pollerOptions;
64
  // options.getMetricsScope() tagged with WorkerType.ACTIVITY_WORKER.
  private final Scope workerMetricsScope;
65
  private final GrpcRetryer grpcRetryer;
66
  // Retry options applied to every Respond* call issued by TaskHandlerImpl#sendReply.
  private final GrpcRetryer.GrpcRetryerOptions replyGrpcRetryerOptions;
67
  // Created with options.getTaskExecutorThreadPoolSize() permits; handed to ActivityPollTask and
  // acquired/released by the eager activity dispatcher.
  private final Semaphore executorSlotsSemaphore;
68

69
  /**
   * Creates an activity worker for a single namespace / task queue pair. Nothing is polled until
   * {@link #start()} is called.
   *
   * @param service service stubs used for the reply calls and to obtain server capabilities
   * @param namespace namespace set on every reply request
   * @param taskQueue task queue this worker serves
   * @param taskQueueActivitiesPerSecond value passed through to the {@code ActivityPollTask}
   * @param options worker options (identity, poller options, metrics scope, executor pool size)
   * @param handler handler that executes the activity tasks
   * @throws NullPointerException if any of the {@code @Nonnull} arguments is null
   */
  public ActivityWorker(
70
      @Nonnull WorkflowServiceStubs service,
71
      @Nonnull String namespace,
72
      @Nonnull String taskQueue,
73
      double taskQueueActivitiesPerSecond,
74
      @Nonnull SingleWorkerOptions options,
75
      @Nonnull ActivityTaskHandler handler) {
1✔
76
    this.service = Objects.requireNonNull(service);
1✔
77
    this.namespace = Objects.requireNonNull(namespace);
1✔
78
    this.taskQueue = Objects.requireNonNull(taskQueue);
1✔
79
    this.handler = Objects.requireNonNull(handler);
1✔
80
    this.taskQueueActivitiesPerSecond = taskQueueActivitiesPerSecond;
1✔
81
    this.options = Objects.requireNonNull(options);
1✔
82
    // getPollerOptions reads the namespace and taskQueue fields, so it must run after they are
    // assigned above.
    this.pollerOptions = getPollerOptions(options);
1✔
83
    this.workerMetricsScope =
1✔
84
        MetricsTag.tagged(options.getMetricsScope(), WorkerMetricsTag.WorkerType.ACTIVITY_WORKER);
1✔
85
    this.grpcRetryer = new GrpcRetryer(service.getServerCapabilities());
1✔
86
    this.replyGrpcRetryerOptions =
1✔
87
        new GrpcRetryer.GrpcRetryerOptions(
88
            DefaultStubServiceOperationRpcRetryOptions.INSTANCE, null);
89
    // One permit per task executor thread.
    this.executorSlotsSemaphore = new Semaphore(options.getTaskExecutorThreadPoolSize());
1✔
90
  }
1✔
91

92
  /**
   * Starts polling if the handler supports at least one activity type.
   *
   * <p>On that path it creates the {@code PollTaskExecutor} and the {@code Poller}, starts the
   * poller and increments {@code MetricsType.WORKER_START_COUNTER}. Otherwise nothing is created
   * and {@code poller} remains the initial {@code NoopWorker}.
   *
   * @return {@code true} if a poller was created and started, {@code false} otherwise
   */
  @Override
93
  public boolean start() {
94
    if (handler.isAnyTypeSupported()) {
1✔
95
      this.pollTaskExecutor =
1✔
96
          new PollTaskExecutor<>(
97
              namespace,
98
              taskQueue,
99
              options.getIdentity(),
1✔
100
              new TaskHandlerImpl(handler),
101
              pollerOptions,
102
              options.getTaskExecutorThreadPoolSize(),
1✔
103
              workerMetricsScope,
104
              true);
105
      poller =
1✔
106
          new Poller<>(
107
              options.getIdentity(),
1✔
108
              new ActivityPollTask(
109
                  service,
110
                  namespace,
111
                  taskQueue,
112
                  options.getIdentity(),
1✔
113
                  taskQueueActivitiesPerSecond,
114
                  executorSlotsSemaphore,
115
                  workerMetricsScope),
116
              this.pollTaskExecutor,
117
              pollerOptions,
118
              workerMetricsScope);
119
      poller.start();
1✔
120
      workerMetricsScope.counter(MetricsType.WORKER_START_COUNTER).inc(1);
1✔
121
      return true;
1✔
122
    } else {
123
      return false;
1✔
124
    }
125
  }
126

127
  /**
   * Delegates shutdown to the current {@code poller}.
   *
   * @return the future returned by the poller's {@code shutdown}
   */
  @Override
128
  public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
129
    return poller.shutdown(shutdownManager, interruptTasks);
1✔
130
  }
131

132
  /** Delegates to the current {@code poller}'s {@code awaitTermination}. */
  @Override
133
  public void awaitTermination(long timeout, TimeUnit unit) {
134
    poller.awaitTermination(timeout, unit);
1✔
135
  }
1✔
136

137
  /** Delegates to the current {@code poller}'s {@code suspendPolling}. */
  @Override
138
  public void suspendPolling() {
139
    poller.suspendPolling();
1✔
140
  }
1✔
141

142
  /** Delegates to the current {@code poller}'s {@code resumePolling}. */
  @Override
143
  public void resumePolling() {
144
    poller.resumePolling();
1✔
145
  }
1✔
146

147
  /** @return whatever the current {@code poller} reports from {@code isShutdown()} */
  @Override
148
  public boolean isShutdown() {
149
    return poller.isShutdown();
×
150
  }
151

152
  /** @return whatever the current {@code poller} reports from {@code isTerminated()} */
  @Override
153
  public boolean isTerminated() {
154
    return poller.isTerminated();
×
155
  }
156

157
  /** @return whatever the current {@code poller} reports from {@code isSuspended()} */
  @Override
158
  public boolean isSuspended() {
159
    return poller.isSuspended();
1✔
160
  }
161

162
  /**
   * @return the lifecycle state reported by the current {@code poller}; the eager activity
   *     dispatcher only reserves slots while this is {@code WorkerLifecycleState.ACTIVE}
   */
  @Override
163
  public WorkerLifecycleState getLifecycleState() {
164
    return poller.getLifecycleState();
1✔
165
  }
166

167
  /**
   * @return a new {@code EagerActivityDispatcherImpl} bound to this worker; a fresh instance is
   *     created on every call, all of them operate on this worker's shared slot semaphore
   */
  public EagerActivityDispatcher getEagerActivityDispatcher() {
168
    return new EagerActivityDispatcherImpl();
1✔
169
  }
170

171
  /**
   * Returns the poller options from {@code options}, filling in a poll thread name prefix when
   * none was configured.
   *
   * <p>The default prefix is built from the {@code namespace} and {@code taskQueue} fields, so
   * those must already be assigned when this is called.
   *
   * @param options worker options to read the poller options from
   * @return the configured poller options, or a copy with the default thread name prefix set
   */
  private PollerOptions getPollerOptions(SingleWorkerOptions options) {
172
    PollerOptions pollerOptions = options.getPollerOptions();
1✔
173
    if (pollerOptions.getPollThreadNamePrefix() == null) {
1✔
174
      pollerOptions =
1✔
175
          PollerOptions.newBuilder(pollerOptions)
1✔
176
              .setPollThreadNamePrefix(
1✔
177
                  WorkerThreadsNameHelper.getActivityPollerThreadPrefix(namespace, taskQueue))
1✔
178
              .build();
1✔
179
    }
180
    return pollerOptions;
1✔
181
  }
182

183
  private class TaskHandlerImpl implements PollTaskExecutor.TaskHandler<ActivityTask> {
184

185
    final ActivityTaskHandler handler;
186

187
    /**
     * @param handler activity task handler that {@code handleActivity} invokes for each task
     */
    private TaskHandlerImpl(ActivityTaskHandler handler) {
1✔
188
      this.handler = handler;
1✔
189
    }
1✔
190

191
    /**
     * Processes one activity task: tags the metrics scope with the activity and workflow type,
     * populates the logging MDC, runs {@code handleActivity}, and then always clears the MDC.
     *
     * <p>The task's completion callback is applied in the {@code finally} block unless the
     * handler returned a result that requests manual completion.
     *
     * @param task the activity task to process
     * @throws Exception whatever {@code handleActivity} throws; additionally, if the result
     *     carries a task failure whose cause is an {@link Error}, that {@code Error} is rethrown
     */
    @Override
192
    public void handle(ActivityTask task) throws Exception {
193
      PollActivityTaskQueueResponseOrBuilder pollResponse = task.getResponse();
1✔
194
      Scope metricsScope =
1✔
195
          workerMetricsScope.tagged(
1✔
196
              ImmutableMap.of(
1✔
197
                  MetricsTag.ACTIVITY_TYPE,
198
                  pollResponse.getActivityType().getName(),
1✔
199
                  MetricsTag.WORKFLOW_TYPE,
200
                  pollResponse.getWorkflowType().getName()));
1✔
201

202
      MDC.put(LoggerTag.ACTIVITY_ID, pollResponse.getActivityId());
1✔
203
      MDC.put(LoggerTag.ACTIVITY_TYPE, pollResponse.getActivityType().getName());
1✔
204
      MDC.put(LoggerTag.WORKFLOW_ID, pollResponse.getWorkflowExecution().getWorkflowId());
1✔
205
      MDC.put(LoggerTag.WORKFLOW_TYPE, pollResponse.getWorkflowType().getName());
1✔
206
      MDC.put(LoggerTag.RUN_ID, pollResponse.getWorkflowExecution().getRunId());
1✔
207

208
      ActivityTaskHandler.Result result = null;
1✔
209
      try {
210
        result = handleActivity(task, metricsScope);
1✔
211
      } finally {
212
        MDC.remove(LoggerTag.ACTIVITY_ID);
1✔
213
        MDC.remove(LoggerTag.ACTIVITY_TYPE);
1✔
214
        MDC.remove(LoggerTag.WORKFLOW_ID);
1✔
215
        MDC.remove(LoggerTag.WORKFLOW_TYPE);
1✔
216
        MDC.remove(LoggerTag.RUN_ID);
1✔
217
        if (
1✔
218
        // handleActivity threw an exception (not a normal scenario)
219
        result == null
220
            // completed synchronously or manual completion hasn't been requested
221
            || !result.isManualCompletion()) {
1✔
222
          task.getCompletionCallback().apply();
1✔
223
        }
224
      }
225

      // result is non-null here: if handleActivity had thrown, the exception would have
      // propagated out of the try/finally above and this line would not be reached.
226
      if (result.getTaskFailed() != null && result.getTaskFailed().getFailure() instanceof Error) {
1✔
227
        // don't just swallow Errors, we need to propagate it to the top
228
        throw (Error) result.getTaskFailed().getFailure();
×
229
      }
230
    }
1✔
231

232
    /**
     * Executes the activity through {@code handler.handle} and reports the result to the server.
     *
     * <p>Records {@code ACTIVITY_SCHEDULE_TO_START_LATENCY} from the poll response's started time
     * and current attempt scheduled time, times the handler call with {@code
     * ACTIVITY_EXEC_LATENCY}, and records {@code ACTIVITY_SUCCEED_E2E_LATENCY} when the result is
     * a completion.
     *
     * @param task the activity task to execute
     * @param metricsScope scope used for the timers and attached to the reply call
     * @return the handler's result, after the reply (if any) has been sent
     */
    private ActivityTaskHandler.Result handleActivity(ActivityTask task, Scope metricsScope) {
233
      PollActivityTaskQueueResponseOrBuilder pollResponse = task.getResponse();
1✔
234
      ByteString taskToken = pollResponse.getTaskToken();
1✔
235
      metricsScope
1✔
236
          .timer(MetricsType.ACTIVITY_SCHEDULE_TO_START_LATENCY)
1✔
237
          .record(
1✔
238
              ProtobufTimeUtils.toM3Duration(
1✔
239
                  pollResponse.getStartedTime(), pollResponse.getCurrentAttemptScheduledTime()));
1✔
240

241
      ActivityTaskHandler.Result result;
242

243
      Stopwatch sw = metricsScope.timer(MetricsType.ACTIVITY_EXEC_LATENCY).start();
1✔
244
      try {
245
        result = handler.handle(task, metricsScope, false);
1✔
246
      } catch (Throwable ex) {
×
247
        // handler.handle is expected to never throw an exception and to return a result
248
        // that can be used for a workflow callback; if this method throws, it's a bug.
249
        log.error("[BUG] Code that expected to never throw an exception threw an exception", ex);
×
250
        throw ex;
×
251
      } finally {
252
        sw.stop();
1✔
253
      }
254

255
      try {
256
        sendReply(taskToken, result, metricsScope);
1✔
257
      } catch (Exception e) {
1✔
258
        logExceptionDuringResultReporting(e, pollResponse, result);
1✔
259
        // TODO this class doesn't report activity success and failure metrics now, instead it's
260
        //  located inside an activity handler. We should lift it up to this level,
261
        //  so we can increment a failure counter instead of success if send result failed.
262
        //  This will also align the behavior of ActivityWorker with WorkflowWorker.
263
        throw e;
1✔
264
      }
1✔
265

266
      if (result.getTaskCompleted() != null) {
1✔
267
        Duration e2eDuration =
1✔
268
            ProtobufTimeUtils.toM3DurationSinceNow(pollResponse.getScheduledTime());
1✔
269
        metricsScope.timer(MetricsType.ACTIVITY_SUCCEED_E2E_LATENCY).record(e2eDuration);
1✔
270
      }
271

272
      return result;
1✔
273
    }
274

275
    /**
     * Wraps a processing failure in a {@link RuntimeException} whose message carries the
     * workflow id, run id, activity type and activity id of the task.
     *
     * @param t the task that was being processed
     * @param failure the original failure, preserved as the cause
     * @return the wrapping exception
     */
    @Override
276
    public Throwable wrapFailure(ActivityTask t, Throwable failure) {
277
      PollActivityTaskQueueResponseOrBuilder response = t.getResponse();
1✔
278
      WorkflowExecution execution = response.getWorkflowExecution();
1✔
279
      return new RuntimeException(
1✔
280
          "Failure processing activity response. WorkflowId="
281
              + execution.getWorkflowId()
1✔
282
              + ", RunId="
283
              + execution.getRunId()
1✔
284
              + ", ActivityType="
285
              + response.getActivityType().getName()
1✔
286
              + ", ActivityId="
287
              + response.getActivityId(),
1✔
288
          failure);
289
    }
290

291
    /**
     * Reports the activity result to the server.
     *
     * <p>Exactly one call is made depending on which part of {@code response} is set, checked in
     * this order: completed, failed, canceled. Each request is stamped with the task token, this
     * worker's identity and namespace, and is sent through {@code grpcRetryer} with {@code
     * replyGrpcRetryerOptions}. If none of the three is set, nothing is sent.
     *
     * @param taskToken token of the task being answered
     * @param response handler result to report
     * @param metricsScope scope attached to the gRPC call via {@code
     *     METRICS_TAGS_CALL_OPTIONS_KEY}
     */
    private void sendReply(
292
        ByteString taskToken, ActivityTaskHandler.Result response, Scope metricsScope) {
293
      RespondActivityTaskCompletedRequest taskCompleted = response.getTaskCompleted();
1✔
294
      if (taskCompleted != null) {
1✔
295
        RespondActivityTaskCompletedRequest request =
1✔
296
            taskCompleted.toBuilder()
1✔
297
                .setTaskToken(taskToken)
1✔
298
                .setIdentity(options.getIdentity())
1✔
299
                .setNamespace(namespace)
1✔
300
                .build();
1✔
301

302
        grpcRetryer.retry(
1✔
303
            () ->
304
                service
1✔
305
                    .blockingStub()
1✔
306
                    .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
1✔
307
                    .respondActivityTaskCompleted(request),
1✔
308
            replyGrpcRetryerOptions);
1✔
309
      } else {
1✔
310
        Result.TaskFailedResult taskFailed = response.getTaskFailed();
1✔
311
        if (taskFailed != null) {
1✔
312
          RespondActivityTaskFailedRequest request =
1✔
313
              taskFailed.getTaskFailedRequest().toBuilder()
1✔
314
                  .setTaskToken(taskToken)
1✔
315
                  .setIdentity(options.getIdentity())
1✔
316
                  .setNamespace(namespace)
1✔
317
                  .build();
1✔
318

319
          grpcRetryer.retry(
1✔
320
              () ->
321
                  service
1✔
322
                      .blockingStub()
1✔
323
                      .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
1✔
324
                      .respondActivityTaskFailed(request),
1✔
325
              replyGrpcRetryerOptions);
1✔
326
        } else {
1✔
327
          RespondActivityTaskCanceledRequest taskCanceled = response.getTaskCanceled();
1✔
328
          if (taskCanceled != null) {
1✔
329
            RespondActivityTaskCanceledRequest request =
×
330
                taskCanceled.toBuilder()
×
331
                    .setTaskToken(taskToken)
×
332
                    .setIdentity(options.getIdentity())
×
333
                    .setNamespace(namespace)
×
334
                    .build();
×
335

336
            grpcRetryer.retry(
×
337
                () ->
338
                    service
×
339
                        .blockingStub()
×
340
                        .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
×
341
                        .respondActivityTaskCanceled(request),
×
342
                replyGrpcRetryerOptions);
×
343
          }
344
        }
345
      }
346
      // Manual activity completion: none of completed/failed/canceled is set, nothing to send
347
    }
1✔
348

349
    /**
     * Logs a failure to report an activity result to the server.
     *
     * <p>At debug level the message also includes the activity result; otherwise a warning
     * without the result is logged. The MDC keys put here are not removed by this method; in the
     * call path visible in this class they are cleared by the {@code finally} block of {@code
     * handle}.
     *
     * @param e the exception thrown while sending the reply
     * @param pollResponse the poll response identifying the activity and workflow
     * @param result the handler result that could not be reported
     */
    private void logExceptionDuringResultReporting(
350
        Exception e,
351
        PollActivityTaskQueueResponseOrBuilder pollResponse,
352
        ActivityTaskHandler.Result result) {
353
      MDC.put(LoggerTag.ACTIVITY_ID, pollResponse.getActivityId());
1✔
354
      MDC.put(LoggerTag.ACTIVITY_TYPE, pollResponse.getActivityType().getName());
1✔
355
      MDC.put(LoggerTag.WORKFLOW_ID, pollResponse.getWorkflowExecution().getWorkflowId());
1✔
356
      MDC.put(LoggerTag.RUN_ID, pollResponse.getWorkflowExecution().getRunId());
1✔
357

358
      if (log.isDebugEnabled()) {
1✔
359
        log.debug(
×
360
            "Failure during reporting of activity result to the server. ActivityId = {}, ActivityType = {}, WorkflowId={}, WorkflowType={}, RunId={}, ActivityResult={}",
361
            pollResponse.getActivityId(),
×
362
            pollResponse.getActivityType().getName(),
×
363
            pollResponse.getWorkflowExecution().getWorkflowId(),
×
364
            pollResponse.getWorkflowType().getName(),
×
365
            pollResponse.getWorkflowExecution().getRunId(),
×
366
            result,
367
            e);
368
      } else {
369
        log.warn(
1✔
370
            "Failure during reporting of activity result to the server. ActivityId = {}, ActivityType = {}, WorkflowId={}, WorkflowType={}, RunId={}",
371
            pollResponse.getActivityId(),
1✔
372
            pollResponse.getActivityType().getName(),
1✔
373
            pollResponse.getWorkflowExecution().getWorkflowId(),
1✔
374
            pollResponse.getWorkflowType().getName(),
1✔
375
            pollResponse.getWorkflowExecution().getRunId(),
1✔
376
            e);
377
      }
378
    }
1✔
379
  }
380

381
  private final class EagerActivityDispatcherImpl implements EagerActivityDispatcher {
1✔
382
    /**
     * Tries to reserve one executor slot for an eagerly dispatched activity.
     *
     * <p>A permit is acquired only if the worker's lifecycle state is {@code ACTIVE} and the
     * command targets this worker's task queue; the checks short-circuit, so the semaphore is not
     * touched when either of them fails.
     *
     * @param commandAttributes attributes of the schedule-activity command being considered
     * @return {@code true} if a slot was reserved, {@code false} otherwise
     */
    @Override
383
    public boolean tryReserveActivitySlot(
384
        ScheduleActivityTaskCommandAttributesOrBuilder commandAttributes) {
385
      return WorkerLifecycleState.ACTIVE.equals(ActivityWorker.this.getLifecycleState())
1✔
386
          && Objects.equals(
1✔
387
              commandAttributes.getTaskQueue().getName(), ActivityWorker.this.taskQueue)
1✔
388
          && ActivityWorker.this.executorSlotsSemaphore.tryAcquire();
1✔
389
    }
390

391
    /**
     * Returns previously reserved slots to the worker's executor slot semaphore.
     *
     * @param slotCounts number of permits to release
     */
    @Override
392
    public void releaseActivitySlotReservations(int slotCounts) {
393
      ActivityWorker.this.executorSlotsSemaphore.release(slotCounts);
1✔
394
    }
1✔
395

396
    /**
     * Hands an eagerly received activity to the worker's {@code pollTaskExecutor}. The task's
     * completion callback releases one permit of the executor slot semaphore.
     *
     * <p>NOTE(review): {@code pollTaskExecutor} is only assigned in {@code start()}; presumably
     * callers reserve a slot via {@code tryReserveActivitySlot} first, which requires the worker
     * to be {@code ACTIVE} — confirm against callers.
     *
     * @param activity the activity task response to execute
     */
    @Override
397
    public void dispatchActivity(PollActivityTaskQueueResponse activity) {
398
      ActivityWorker.this.pollTaskExecutor.process(
×
399
          new ActivityTask(activity, ActivityWorker.this.executorSlotsSemaphore::release));
×
400
    }
×
401
  }
402
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc