temporalio / sdk-java — build #153 (push via github-actions, pending completion)

Commit: Eager Workflow Task Dispatch (#1674) — Issue #1646
Committed by web-flow. Signed-off-by: Dmitry Spikhalskiy <dmitry@spikhalskiy.com>

213 of 213 new or added lines in 22 files covered (100.0%).
16682 of 20566 relevant lines covered (81.11%).
0.81 hits per line.

Source File

/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java — 93.01% of lines covered
/*
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
 *
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this material except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.temporal.internal.worker;

import static io.temporal.serviceclient.MetricsTag.METRICS_TAGS_CALL_OPTIONS_KEY;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.protobuf.ByteString;
import com.uber.m3.tally.Scope;
import com.uber.m3.tally.Stopwatch;
import com.uber.m3.util.ImmutableMap;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.enums.v1.TaskQueueKind;
import io.temporal.api.workflowservice.v1.*;
import io.temporal.internal.logging.LoggerTag;
import io.temporal.internal.retryer.GrpcRetryer;
import io.temporal.serviceclient.MetricsTag;
import io.temporal.serviceclient.RpcRetryOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.MetricsType;
import io.temporal.worker.WorkerMetricsTag;
import io.temporal.worker.WorkflowTaskDispatchHandle;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

final class WorkflowWorker implements SuspendableWorker {
  private static final Logger log = LoggerFactory.getLogger(WorkflowWorker.class);

  private final WorkflowRunLockManager runLocks;

  private final WorkflowServiceStubs service;
  private final String namespace;
  private final String taskQueueName;
  private final SingleWorkerOptions options;
  private final WorkflowExecutorCache cache;
  private final WorkflowTaskHandler handler;
  private final String stickyTaskQueueName;
  private final PollerOptions pollerOptions;
  private final Scope workerMetricsScope;
  private final GrpcRetryer grpcRetryer;
  private final EagerActivityDispatcher eagerActivityDispatcher;
  private final Semaphore wftExecutorSlotsSemaphore;

  private PollTaskExecutor<WorkflowTask> pollTaskExecutor;

  // TODO this ideally should be volatile or final (and NoopWorker should go away)
  //  Currently the implementation looks safe without volatile, but it's brittle.
  @Nonnull private SuspendableWorker poller = new NoopWorker();

  public WorkflowWorker(
      @Nonnull WorkflowServiceStubs service,
      @Nonnull String namespace,
      @Nonnull String taskQueueName,
      @Nullable String stickyTaskQueueName,
      @Nonnull SingleWorkerOptions options,
      @Nonnull WorkflowRunLockManager runLocks,
      @Nonnull WorkflowExecutorCache cache,
      @Nonnull WorkflowTaskHandler handler,
      @Nonnull EagerActivityDispatcher eagerActivityDispatcher) {
    this.service = Objects.requireNonNull(service);
    this.namespace = Objects.requireNonNull(namespace);
    this.taskQueueName = Objects.requireNonNull(taskQueueName);
    this.options = Objects.requireNonNull(options);
    this.stickyTaskQueueName = stickyTaskQueueName;
    this.pollerOptions = getPollerOptions(options);
    this.workerMetricsScope =
        MetricsTag.tagged(options.getMetricsScope(), WorkerMetricsTag.WorkerType.WORKFLOW_WORKER);
    this.runLocks = Objects.requireNonNull(runLocks);
    this.cache = Objects.requireNonNull(cache);
    this.handler = Objects.requireNonNull(handler);
    this.grpcRetryer = new GrpcRetryer(service.getServerCapabilities());
    this.eagerActivityDispatcher = eagerActivityDispatcher;
    this.wftExecutorSlotsSemaphore = new Semaphore(options.getTaskExecutorThreadPoolSize());
  }

  @Override
  public boolean start() {
    if (handler.isAnyTypeSupported()) {
      pollTaskExecutor =
          new PollTaskExecutor<>(
              namespace,
              taskQueueName,
              options.getIdentity(),
              new TaskHandlerImpl(handler),
              pollerOptions,
              options.getTaskExecutorThreadPoolSize(),
              workerMetricsScope,
              true);
      StickyQueueBalancer stickyQueueBalancer =
          new StickyQueueBalancer(
              options.getPollerOptions().getPollThreadCount(), stickyTaskQueueName != null);

      poller =
          new Poller<>(
              options.getIdentity(),
              new WorkflowPollTask(
                  service,
                  namespace,
                  taskQueueName,
                  stickyTaskQueueName,
                  options.getIdentity(),
                  options.getBinaryChecksum(),
                  wftExecutorSlotsSemaphore,
                  stickyQueueBalancer,
                  workerMetricsScope),
              pollTaskExecutor,
              pollerOptions,
              workerMetricsScope);
      poller.start();

      workerMetricsScope.counter(MetricsType.WORKER_START_COUNTER).inc(1);

      return true;
    } else {
      return false;
    }
  }

  @Override
  public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
    return poller.shutdown(shutdownManager, interruptTasks);
    // TODO wait for returning all slots to semaphore to make sure all eager dispatch handlers are
    // respected
  }

  @Override
  public void awaitTermination(long timeout, TimeUnit unit) {
    ShutdownManager.awaitTermination(poller, unit.toMillis(timeout));
    // TODO wait for returning all slots to semaphore to make sure all eager dispatch handlers are
    // respected
  }

  @Override
  public void suspendPolling() {
    poller.suspendPolling();
  }

  @Override
  public void resumePolling() {
    poller.resumePolling();
  }

  @Override
  public boolean isSuspended() {
    return poller.isSuspended();
  }

  @Override
  public boolean isShutdown() {
    return poller.isShutdown();
  }

  @Override
  public boolean isTerminated() {
    return poller.isTerminated();
  }

  @Override
  public WorkerLifecycleState getLifecycleState() {
    return poller.getLifecycleState();
  }

  private PollerOptions getPollerOptions(SingleWorkerOptions options) {
    PollerOptions pollerOptions = options.getPollerOptions();
    if (pollerOptions.getPollThreadNamePrefix() == null) {
      pollerOptions =
          PollerOptions.newBuilder(pollerOptions)
              .setPollThreadNamePrefix(
                  WorkerThreadsNameHelper.getWorkflowPollerThreadPrefix(namespace, taskQueueName))
              .build();
    }
    return pollerOptions;
  }

  @Nullable
  public WorkflowTaskDispatchHandle reserveWorkflowExecutor() {
    // to avoid pollTaskExecutor to become null inside the lambda, we are caching it here
    final PollTaskExecutor<WorkflowTask> executor = pollTaskExecutor;
    return executor != null && !isSuspended() && wftExecutorSlotsSemaphore.tryAcquire()
        ? new WorkflowTaskDispatchHandle(
            workflowTask -> {
              String queueName =
                  workflowTask.getResponse().getWorkflowExecutionTaskQueue().getName();
              TaskQueueKind queueKind =
                  workflowTask.getResponse().getWorkflowExecutionTaskQueue().getKind();
              Preconditions.checkArgument(
                  this.taskQueueName.equals(queueName)
                      || TaskQueueKind.TASK_QUEUE_KIND_STICKY.equals(queueKind)
                          && this.stickyTaskQueueName.equals(queueName),
                  "Got a WFT for a wrong queue %s, expected %s or %s",
                  queueName,
                  this.taskQueueName,
                  this.stickyTaskQueueName);
              try {
                pollTaskExecutor.process(workflowTask);
                return true;
              } catch (RejectedExecutionException e) {
                return false;
              }
            },
            wftExecutorSlotsSemaphore)
        : null;
  }

  private class TaskHandlerImpl implements PollTaskExecutor.TaskHandler<WorkflowTask> {

    final WorkflowTaskHandler handler;

    private TaskHandlerImpl(WorkflowTaskHandler handler) {
      this.handler = handler;
    }

    @Override
    public void handle(WorkflowTask task) throws Exception {
      PollWorkflowTaskQueueResponse workflowTaskResponse = task.getResponse();
      WorkflowExecution workflowExecution = workflowTaskResponse.getWorkflowExecution();
      String runId = workflowExecution.getRunId();
      String workflowType = workflowTaskResponse.getWorkflowType().getName();

      Scope workflowTypeScope =
          workerMetricsScope.tagged(ImmutableMap.of(MetricsTag.WORKFLOW_TYPE, workflowType));

      MDC.put(LoggerTag.WORKFLOW_ID, workflowExecution.getWorkflowId());
      MDC.put(LoggerTag.WORKFLOW_TYPE, workflowType);
      MDC.put(LoggerTag.RUN_ID, runId);

      boolean locked = false;
      if (!Strings.isNullOrEmpty(stickyTaskQueueName)) {
        // Serialize workflow task processing for a particular workflow run.
        // This is used to make sure that query tasks and real workflow tasks
        // are serialized when sticky is on.
        //
        // Acquiring a lock with a timeout to avoid having lots of workflow tasks for the same run
        // id waiting for a lock and consuming threads in case if lock is unavailable.
        //
        // Throws interrupted exception which is propagated. It's a correct way to handle it here.
        //
        // TODO 1: 5 seconds is chosen as a half of normal workflow task timeout.
        //   This value should be dynamically configured.
        // TODO 2: Does "consider increasing workflow task timeout" advice in this exception makes
        //   any sense?
        //   This MAYBE makes sense only if a previous workflow task timed out, it's still in
        //   progress on the worker and the next workflow task got picked up by the same exact
        //   worker from the general non-sticky task queue.
        //   Even in this case, this advice looks misleading, something else is going on
        //   (like an extreme network latency).
        locked = runLocks.tryLock(runId, 5, TimeUnit.SECONDS);

        if (!locked) {
          throw new UnableToAcquireLockException(
              "Workflow lock for the run id hasn't been released by one of previous execution attempts, "
                  + "consider increasing workflow task timeout.");
        }
      }

      Stopwatch swTotal =
          workflowTypeScope.timer(MetricsType.WORKFLOW_TASK_EXECUTION_TOTAL_LATENCY).start();
      try {
        Optional<PollWorkflowTaskQueueResponse> nextWFTResponse = Optional.of(workflowTaskResponse);
        do {
          PollWorkflowTaskQueueResponse currentTask = nextWFTResponse.get();
          nextWFTResponse = Optional.empty();
          WorkflowTaskHandler.Result result = handleTask(currentTask, workflowTypeScope);
          try {
            RespondWorkflowTaskCompletedRequest taskCompleted = result.getTaskCompleted();
            RespondWorkflowTaskFailedRequest taskFailed = result.getTaskFailed();
            RespondQueryTaskCompletedRequest queryCompleted = result.getQueryCompleted();

            if (taskCompleted != null) {
              RespondWorkflowTaskCompletedRequest.Builder requestBuilder =
                  taskCompleted.toBuilder();
              try (EagerActivitySlotsReservation activitySlotsReservation =
                  new EagerActivitySlotsReservation(eagerActivityDispatcher)) {
                activitySlotsReservation.applyToRequest(requestBuilder);
                RespondWorkflowTaskCompletedResponse response =
                    sendTaskCompleted(
                        currentTask.getTaskToken(),
                        requestBuilder,
                        result.getRequestRetryOptions(),
                        workflowTypeScope);
                nextWFTResponse =
                    response.hasWorkflowTask()
                        ? Optional.of(response.getWorkflowTask())
                        : Optional.empty();
                // TODO we don't have to do this under the runId lock
                activitySlotsReservation.handleResponse(response);
              }
            } else if (taskFailed != null) {
              sendTaskFailed(
                  currentTask.getTaskToken(),
                  taskFailed.toBuilder(),
                  result.getRequestRetryOptions(),
                  workflowTypeScope);
            } else if (queryCompleted != null) {
              sendDirectQueryCompletedResponse(
                  currentTask.getTaskToken(), queryCompleted.toBuilder(), workflowTypeScope);
            }
          } catch (Exception e) {
            logExceptionDuringResultReporting(e, currentTask, result);
            workflowTypeScope.counter(MetricsType.WORKFLOW_TASK_EXECUTION_FAILURE_COUNTER).inc(1);
            // if we failed to report the workflow task completion back to the server,
            // our cached version of the workflow may be more advanced than the server is aware of.
            // We should discard this execution and perform a clean replay based on what server
            // knows next time.
            cache.invalidate(
                workflowExecution, workflowTypeScope, "Failed result reporting to the server", e);
            throw e;
          }

          // this should be after sendReply, otherwise we may log
          // WORKFLOW_TASK_EXECUTION_FAILURE_COUNTER twice if sendReply throws
          if (result.getTaskFailed() != null) {
            // we don't trigger the counter in case of the legacy query
            // (which never has taskFailed set)
            workflowTypeScope.counter(MetricsType.WORKFLOW_TASK_EXECUTION_FAILURE_COUNTER).inc(1);
          }
          if (nextWFTResponse.isPresent()) {
            workflowTypeScope.counter(MetricsType.WORKFLOW_TASK_HEARTBEAT_COUNTER).inc(1);
          }
        } while (nextWFTResponse.isPresent());
      } finally {
        swTotal.stop();
        MDC.remove(LoggerTag.WORKFLOW_ID);
        MDC.remove(LoggerTag.WORKFLOW_TYPE);
        MDC.remove(LoggerTag.RUN_ID);

        task.getCompletionCallback().apply();

        if (locked) {
          runLocks.unlock(runId);
        }
      }
    }

    @Override
    public Throwable wrapFailure(WorkflowTask task, Throwable failure) {
      WorkflowExecution execution = task.getResponse().getWorkflowExecution();
      return new RuntimeException(
          "Failure processing workflow task. WorkflowId="
              + execution.getWorkflowId()
              + ", RunId="
              + execution.getRunId()
              + ", Attempt="
              + task.getResponse().getAttempt(),
          failure);
    }

    private WorkflowTaskHandler.Result handleTask(
        PollWorkflowTaskQueueResponse task, Scope workflowTypeMetricsScope) throws Exception {
      Stopwatch sw =
          workflowTypeMetricsScope.timer(MetricsType.WORKFLOW_TASK_EXECUTION_LATENCY).start();
      try {
        return handler.handleWorkflowTask(task);
      } catch (Throwable e) {
        // more detailed logging that we can do here is already done inside `handler`
        workflowTypeMetricsScope
            .counter(MetricsType.WORKFLOW_TASK_EXECUTION_FAILURE_COUNTER)
            .inc(1);
        workflowTypeMetricsScope.counter(MetricsType.WORKFLOW_TASK_NO_COMPLETION_COUNTER).inc(1);
        throw e;
      } finally {
        sw.stop();
      }
    }

    private RespondWorkflowTaskCompletedResponse sendTaskCompleted(
        ByteString taskToken,
        RespondWorkflowTaskCompletedRequest.Builder taskCompleted,
        RpcRetryOptions retryOptions,
        Scope workflowTypeMetricsScope) {
      GrpcRetryer.GrpcRetryerOptions grpcRetryOptions =
          new GrpcRetryer.GrpcRetryerOptions(
              RpcRetryOptions.newBuilder().buildWithDefaultsFrom(retryOptions), null);

      taskCompleted
          .setIdentity(options.getIdentity())
          .setNamespace(namespace)
          .setBinaryChecksum(options.getBinaryChecksum())
          .setTaskToken(taskToken);

      return grpcRetryer.retryWithResult(
          () ->
              service
                  .blockingStub()
                  .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, workflowTypeMetricsScope)
                  .respondWorkflowTaskCompleted(taskCompleted.build()),
          grpcRetryOptions);
    }

    private void sendTaskFailed(
        ByteString taskToken,
        RespondWorkflowTaskFailedRequest.Builder taskFailed,
        RpcRetryOptions retryOptions,
        Scope workflowTypeMetricsScope) {
      GrpcRetryer.GrpcRetryerOptions grpcRetryOptions =
          new GrpcRetryer.GrpcRetryerOptions(
              RpcRetryOptions.newBuilder().buildWithDefaultsFrom(retryOptions), null);

      taskFailed.setIdentity(options.getIdentity()).setNamespace(namespace).setTaskToken(taskToken);

      grpcRetryer.retry(
          () ->
              service
                  .blockingStub()
                  .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, workflowTypeMetricsScope)
                  .respondWorkflowTaskFailed(taskFailed.build()),
          grpcRetryOptions);
    }

    private void sendDirectQueryCompletedResponse(
        ByteString taskToken,
        RespondQueryTaskCompletedRequest.Builder queryCompleted,
        Scope workflowTypeMetricsScope) {
      queryCompleted.setTaskToken(taskToken).setNamespace(namespace);
      // Do not retry query response
      service
          .blockingStub()
          .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, workflowTypeMetricsScope)
          .respondQueryTaskCompleted(queryCompleted.build());
    }

    private void logExceptionDuringResultReporting(
        Exception e, PollWorkflowTaskQueueResponse currentTask, WorkflowTaskHandler.Result result) {
      if (log.isDebugEnabled()) {
        log.debug(
            "Failure during reporting of workflow progress to the server. If seen continuously the workflow might be stuck. WorkflowId={}, RunId={}, startedEventId={}, WFTResult={}",
            currentTask.getWorkflowExecution().getWorkflowId(),
            currentTask.getWorkflowExecution().getRunId(),
            currentTask.getStartedEventId(),
            result,
            e);
      } else {
        log.warn(
            "Failure while reporting workflow progress to the server. If seen continuously the workflow might be stuck. WorkflowId={}, RunId={}, startedEventId={}",
            currentTask.getWorkflowExecution().getWorkflowId(),
            currentTask.getWorkflowExecution().getRunId(),
            currentTask.getStartedEventId(),
            e);
      }
    }
  }
}
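
Note on the pattern behind reserveWorkflowExecutor(): the same Semaphore (wftExecutorSlotsSemaphore) bounds both polled and eagerly dispatched workflow tasks, and reserveWorkflowExecutor() hands out a WorkflowTaskDispatchHandle only when a slot can be acquired up front, so an eager dispatch can never oversubscribe the task executor. Below is a minimal, self-contained sketch of that "reserve a slot, then dispatch or release" idea; SlotReservingDispatcher and DispatchHandle are hypothetical illustration names, not Temporal SDK API, and the slot-release placement here is simplified relative to the worker above.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;

// Hypothetical sketch: a fixed number of executor slots guarded by a semaphore.
// A caller first reserves a slot; if reservation fails it falls back to the
// normal (poll-based) path instead of submitting work eagerly.
final class SlotReservingDispatcher {
  private final Semaphore slots;
  private final ExecutorService executor;

  SlotReservingDispatcher(int maxConcurrentTasks) {
    this.slots = new Semaphore(maxConcurrentTasks);
    this.executor = Executors.newFixedThreadPool(maxConcurrentTasks);
  }

  /** Returns a handle if an executor slot could be reserved, or null otherwise. */
  DispatchHandle reserve() {
    return slots.tryAcquire() ? new DispatchHandle() : null;
  }

  final class DispatchHandle implements AutoCloseable {
    private boolean used;

    /** Runs the task on the reserved slot; returns false if the executor rejected it. */
    boolean dispatch(Runnable task) {
      try {
        executor.execute(
            () -> {
              try {
                task.run();
              } finally {
                slots.release(); // free the slot once the task finishes
              }
            });
        used = true;
        return true;
      } catch (RejectedExecutionException e) {
        return false; // slot stays reserved; close() will release it
      }
    }

    /** Releases the reserved slot if the handle was never successfully used. */
    @Override
    public void close() {
      if (!used) {
        slots.release();
      }
    }
  }
}

A caller that wants to hand work directly to a local worker would do: DispatchHandle h = dispatcher.reserve(); if (h == null) fall back to polling; otherwise try h.dispatch(task) and close the handle in a finally block so an unused reservation is always returned.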