temporalio / sdk-java — build #169 (push, via github-actions / web-flow), pending completion
Commit: Remove use of deprecated API (#1758)

4 of 4 new or added lines in 1 file covered (100.0%)
17345 of 21558 relevant lines covered (80.46%)
0.8 hits per line

Source File

92.23% covered — /temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java
/*
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
 *
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this material except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.temporal.internal.worker;

import static io.temporal.serviceclient.MetricsTag.METRICS_TAGS_CALL_OPTIONS_KEY;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.protobuf.ByteString;
import com.uber.m3.tally.Scope;
import com.uber.m3.tally.Stopwatch;
import com.uber.m3.util.ImmutableMap;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.enums.v1.TaskQueueKind;
import io.temporal.api.workflowservice.v1.*;
import io.temporal.internal.logging.LoggerTag;
import io.temporal.internal.retryer.GrpcRetryer;
import io.temporal.serviceclient.MetricsTag;
import io.temporal.serviceclient.RpcRetryOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.MetricsType;
import io.temporal.worker.WorkerMetricsTag;
import io.temporal.worker.WorkflowTaskDispatchHandle;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

final class WorkflowWorker implements SuspendableWorker {
  private static final Logger log = LoggerFactory.getLogger(WorkflowWorker.class);

  private final WorkflowRunLockManager runLocks;

  private final WorkflowServiceStubs service;
  private final String namespace;
  private final String taskQueue;
  private final SingleWorkerOptions options;
  private final WorkflowExecutorCache cache;
  private final WorkflowTaskHandler handler;
  private final String stickyTaskQueueName;
  private final PollerOptions pollerOptions;
  private final Scope workerMetricsScope;
  private final GrpcRetryer grpcRetryer;
  private final EagerActivityDispatcher eagerActivityDispatcher;
  private final int executorSlots;
  private final Semaphore executorSlotsSemaphore;

  private PollTaskExecutor<WorkflowTask> pollTaskExecutor;

  // TODO this ideally should be volatile or final (and NoopWorker should go away)
  //  Currently the implementation looks safe without volatile, but it's brittle.
  @Nonnull private SuspendableWorker poller = new NoopWorker();

  public WorkflowWorker(
      @Nonnull WorkflowServiceStubs service,
      @Nonnull String namespace,
      @Nonnull String taskQueue,
      @Nullable String stickyTaskQueueName,
      @Nonnull SingleWorkerOptions options,
      @Nonnull WorkflowRunLockManager runLocks,
      @Nonnull WorkflowExecutorCache cache,
      @Nonnull WorkflowTaskHandler handler,
      @Nonnull EagerActivityDispatcher eagerActivityDispatcher) {
    this.service = Objects.requireNonNull(service);
    this.namespace = Objects.requireNonNull(namespace);
    this.taskQueue = Objects.requireNonNull(taskQueue);
    this.options = Objects.requireNonNull(options);
    this.stickyTaskQueueName = stickyTaskQueueName;
    this.pollerOptions = getPollerOptions(options);
    this.workerMetricsScope =
        MetricsTag.tagged(options.getMetricsScope(), WorkerMetricsTag.WorkerType.WORKFLOW_WORKER);
    this.runLocks = Objects.requireNonNull(runLocks);
    this.cache = Objects.requireNonNull(cache);
    this.handler = Objects.requireNonNull(handler);
    this.grpcRetryer = new GrpcRetryer(service.getServerCapabilities());
    this.eagerActivityDispatcher = eagerActivityDispatcher;
    this.executorSlots = options.getTaskExecutorThreadPoolSize();
    this.executorSlotsSemaphore = new Semaphore(executorSlots);
  }

  @Override
  public boolean start() {
    if (handler.isAnyTypeSupported()) {
      pollTaskExecutor =
          new PollTaskExecutor<>(
              namespace,
              taskQueue,
              options.getIdentity(),
              new TaskHandlerImpl(handler),
              pollerOptions,
              options.getTaskExecutorThreadPoolSize(),
              workerMetricsScope,
              true);
      StickyQueueBalancer stickyQueueBalancer =
          new StickyQueueBalancer(
              options.getPollerOptions().getPollThreadCount(), stickyTaskQueueName != null);

      poller =
          new Poller<>(
              options.getIdentity(),
              new WorkflowPollTask(
                  service,
                  namespace,
                  taskQueue,
                  stickyTaskQueueName,
                  options.getIdentity(),
                  options.getBinaryChecksum(),
                  executorSlotsSemaphore,
                  stickyQueueBalancer,
                  workerMetricsScope),
              pollTaskExecutor,
              pollerOptions,
              workerMetricsScope);
      poller.start();

      workerMetricsScope.counter(MetricsType.WORKER_START_COUNTER).inc(1);

      return true;
    } else {
      return false;
    }
  }

  @Override
  public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
    String semaphoreName = this + "#executorSlotsSemaphore";
    return poller
        .shutdown(shutdownManager, interruptTasks)
        .thenCompose(
            ignore ->
                !interruptTasks
                    ? shutdownManager.waitForSemaphorePermitsReleaseUntimed(
                        executorSlotsSemaphore, executorSlots, semaphoreName)
                    : CompletableFuture.completedFuture(null))
        .thenCompose(
            ignore ->
                pollTaskExecutor != null
                    ? pollTaskExecutor.shutdown(shutdownManager, interruptTasks)
                    : CompletableFuture.completedFuture(null))
        .exceptionally(
            e -> {
              log.error("Unexpected exception during shutdown", e);
              return null;
            });
  }

  @Override
  public void awaitTermination(long timeout, TimeUnit unit) {
    long timeoutMillis = ShutdownManager.awaitTermination(poller, unit.toMillis(timeout));
    // Relies on the fact that the pollTaskExecutor is the last component to be shut down,
    // so there is no need to wait separately for intermediate steps.
    ShutdownManager.awaitTermination(pollTaskExecutor, timeoutMillis);
  }

  @Override
  public void suspendPolling() {
    poller.suspendPolling();
  }

  @Override
  public void resumePolling() {
    poller.resumePolling();
  }

  @Override
  public boolean isSuspended() {
    return poller.isSuspended();
  }

  @Override
  public boolean isShutdown() {
    return poller.isShutdown();
  }

  @Override
  public boolean isTerminated() {
    return poller.isTerminated() && (pollTaskExecutor == null || pollTaskExecutor.isTerminated());
  }

  @Override
  public WorkerLifecycleState getLifecycleState() {
    return poller.getLifecycleState();
  }

  private PollerOptions getPollerOptions(SingleWorkerOptions options) {
    PollerOptions pollerOptions = options.getPollerOptions();
    if (pollerOptions.getPollThreadNamePrefix() == null) {
      pollerOptions =
          PollerOptions.newBuilder(pollerOptions)
              .setPollThreadNamePrefix(
                  WorkerThreadsNameHelper.getWorkflowPollerThreadPrefix(namespace, taskQueue))
              .build();
    }
    return pollerOptions;
  }

  @Nullable
  public WorkflowTaskDispatchHandle reserveWorkflowExecutor() {
    // Cache pollTaskExecutor locally so the reference used inside the lambda cannot become null.
    final PollTaskExecutor<WorkflowTask> executor = pollTaskExecutor;
    return executor != null && !isSuspended() && executorSlotsSemaphore.tryAcquire()
        ? new WorkflowTaskDispatchHandle(
            workflowTask -> {
              String queueName =
                  workflowTask.getResponse().getWorkflowExecutionTaskQueue().getName();
              TaskQueueKind queueKind =
                  workflowTask.getResponse().getWorkflowExecutionTaskQueue().getKind();
              Preconditions.checkArgument(
                  this.taskQueue.equals(queueName)
                      || TaskQueueKind.TASK_QUEUE_KIND_STICKY.equals(queueKind)
                          && this.stickyTaskQueueName.equals(queueName),
                  "Got a WFT for a wrong queue %s, expected %s or %s",
                  queueName,
                  this.taskQueue,
                  this.stickyTaskQueueName);
              try {
                pollTaskExecutor.process(workflowTask);
                return true;
              } catch (RejectedExecutionException e) {
                return false;
              }
            },
            executorSlotsSemaphore)
        : null;
  }

  @Override
  public String toString() {
    return String.format(
        "WorkflowWorker{identity=%s, namespace=%s, taskQueue=%s}",
        options.getIdentity(), namespace, taskQueue);
  }

  private class TaskHandlerImpl implements PollTaskExecutor.TaskHandler<WorkflowTask> {

    final WorkflowTaskHandler handler;

    private TaskHandlerImpl(WorkflowTaskHandler handler) {
      this.handler = handler;
    }

    @Override
    public void handle(WorkflowTask task) throws Exception {
      PollWorkflowTaskQueueResponse workflowTaskResponse = task.getResponse();
      WorkflowExecution workflowExecution = workflowTaskResponse.getWorkflowExecution();
      String runId = workflowExecution.getRunId();
      String workflowType = workflowTaskResponse.getWorkflowType().getName();

      Scope workflowTypeScope =
          workerMetricsScope.tagged(ImmutableMap.of(MetricsTag.WORKFLOW_TYPE, workflowType));

      MDC.put(LoggerTag.WORKFLOW_ID, workflowExecution.getWorkflowId());
      MDC.put(LoggerTag.WORKFLOW_TYPE, workflowType);
      MDC.put(LoggerTag.RUN_ID, runId);

      boolean locked = false;
      if (!Strings.isNullOrEmpty(stickyTaskQueueName)) {
        // Serialize workflow task processing for a particular workflow run.
        // This is used to make sure that query tasks and real workflow tasks
        // are serialized when sticky is on.
        //
        // Acquire the lock with a timeout to avoid having lots of workflow tasks for the same run
        // id waiting for the lock and consuming threads while it is unavailable.
        //
        // An InterruptedException thrown here is propagated, which is the correct way to handle it.
        //
        // TODO 1: 5 seconds is chosen as half of the normal workflow task timeout.
        //   This value should be dynamically configured.
        // TODO 2: Does the "consider increasing workflow task timeout" advice in this exception
        //   make any sense?
        //   This MAYBE makes sense only if a previous workflow task timed out, is still in
        //   progress on the worker, and the next workflow task got picked up by the same exact
        //   worker from the general non-sticky task queue.
        //   Even in this case the advice looks misleading; something else is going on
        //   (like extreme network latency).
        locked = runLocks.tryLock(runId, 5, TimeUnit.SECONDS);

        if (!locked) {
          throw new UnableToAcquireLockException(
              "Workflow lock for the run id hasn't been released by one of previous execution attempts, "
                  + "consider increasing workflow task timeout.");
        }
      }

      Stopwatch swTotal =
          workflowTypeScope.timer(MetricsType.WORKFLOW_TASK_EXECUTION_TOTAL_LATENCY).start();
      try {
        Optional<PollWorkflowTaskQueueResponse> nextWFTResponse = Optional.of(workflowTaskResponse);
        do {
          PollWorkflowTaskQueueResponse currentTask = nextWFTResponse.get();
          nextWFTResponse = Optional.empty();
          WorkflowTaskHandler.Result result = handleTask(currentTask, workflowTypeScope);
          try {
            RespondWorkflowTaskCompletedRequest taskCompleted = result.getTaskCompleted();
            RespondWorkflowTaskFailedRequest taskFailed = result.getTaskFailed();
            RespondQueryTaskCompletedRequest queryCompleted = result.getQueryCompleted();

            if (taskCompleted != null) {
              RespondWorkflowTaskCompletedRequest.Builder requestBuilder =
                  taskCompleted.toBuilder();
              try (EagerActivitySlotsReservation activitySlotsReservation =
                  new EagerActivitySlotsReservation(eagerActivityDispatcher)) {
                activitySlotsReservation.applyToRequest(requestBuilder);
                RespondWorkflowTaskCompletedResponse response =
                    sendTaskCompleted(
                        currentTask.getTaskToken(),
                        requestBuilder,
                        result.getRequestRetryOptions(),
                        workflowTypeScope);
                // If we were processing a speculative WFT, the server may instruct us that the
                // task was dropped by resetting our event ID.
                long resetEventId = response.getResetHistoryEventId();
                if (resetEventId != 0) {
                  result.getEventIdSetHandle().apply(resetEventId);
                }
                nextWFTResponse =
                    response.hasWorkflowTask()
                        ? Optional.of(response.getWorkflowTask())
                        : Optional.empty();
                // TODO we don't have to do this under the runId lock
                activitySlotsReservation.handleResponse(response);
              }
            } else if (taskFailed != null) {
              sendTaskFailed(
                  currentTask.getTaskToken(),
                  taskFailed.toBuilder(),
                  result.getRequestRetryOptions(),
                  workflowTypeScope);
            } else if (queryCompleted != null) {
              sendDirectQueryCompletedResponse(
                  currentTask.getTaskToken(), queryCompleted.toBuilder(), workflowTypeScope);
            }
          } catch (Exception e) {
            logExceptionDuringResultReporting(e, currentTask, result);
            workflowTypeScope.counter(MetricsType.WORKFLOW_TASK_EXECUTION_FAILURE_COUNTER).inc(1);
            // If we failed to report the workflow task completion back to the server,
            // our cached version of the workflow may be more advanced than the server is aware of.
            // We should discard this execution and perform a clean replay based on what the server
            // knows next time.
            cache.invalidate(
                workflowExecution, workflowTypeScope, "Failed result reporting to the server", e);
            throw e;
          }

          // this should be after sendReply, otherwise we may log
          // WORKFLOW_TASK_EXECUTION_FAILURE_COUNTER twice if sendReply throws
          if (result.getTaskFailed() != null) {
            // we don't trigger the counter in case of the legacy query
            // (which never has taskFailed set)
            workflowTypeScope.counter(MetricsType.WORKFLOW_TASK_EXECUTION_FAILURE_COUNTER).inc(1);
          }
          if (nextWFTResponse.isPresent()) {
            workflowTypeScope.counter(MetricsType.WORKFLOW_TASK_HEARTBEAT_COUNTER).inc(1);
          }
        } while (nextWFTResponse.isPresent());
      } finally {
        swTotal.stop();
        MDC.remove(LoggerTag.WORKFLOW_ID);
        MDC.remove(LoggerTag.WORKFLOW_TYPE);
        MDC.remove(LoggerTag.RUN_ID);

        task.getCompletionCallback().apply();

        if (locked) {
          runLocks.unlock(runId);
        }
      }
    }

    @Override
    public Throwable wrapFailure(WorkflowTask task, Throwable failure) {
      WorkflowExecution execution = task.getResponse().getWorkflowExecution();
      return new RuntimeException(
          "Failure processing workflow task. WorkflowId="
              + execution.getWorkflowId()
              + ", RunId="
              + execution.getRunId()
              + ", Attempt="
              + task.getResponse().getAttempt(),
          failure);
    }

    private WorkflowTaskHandler.Result handleTask(
        PollWorkflowTaskQueueResponse task, Scope workflowTypeMetricsScope) throws Exception {
      Stopwatch sw =
          workflowTypeMetricsScope.timer(MetricsType.WORKFLOW_TASK_EXECUTION_LATENCY).start();
      try {
        return handler.handleWorkflowTask(task);
      } catch (Throwable e) {
        // more detailed logging that we can do here is already done inside `handler`
        workflowTypeMetricsScope
            .counter(MetricsType.WORKFLOW_TASK_EXECUTION_FAILURE_COUNTER)
            .inc(1);
        workflowTypeMetricsScope.counter(MetricsType.WORKFLOW_TASK_NO_COMPLETION_COUNTER).inc(1);
        throw e;
      } finally {
        sw.stop();
      }
    }

    private RespondWorkflowTaskCompletedResponse sendTaskCompleted(
        ByteString taskToken,
        RespondWorkflowTaskCompletedRequest.Builder taskCompleted,
        RpcRetryOptions retryOptions,
        Scope workflowTypeMetricsScope) {
      GrpcRetryer.GrpcRetryerOptions grpcRetryOptions =
          new GrpcRetryer.GrpcRetryerOptions(
              RpcRetryOptions.newBuilder().buildWithDefaultsFrom(retryOptions), null);

      taskCompleted
          .setIdentity(options.getIdentity())
          .setNamespace(namespace)
          .setBinaryChecksum(options.getBinaryChecksum())
          .setTaskToken(taskToken);

      return grpcRetryer.retryWithResult(
          () ->
              service
                  .blockingStub()
                  .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, workflowTypeMetricsScope)
                  .respondWorkflowTaskCompleted(taskCompleted.build()),
          grpcRetryOptions);
    }

    private void sendTaskFailed(
        ByteString taskToken,
        RespondWorkflowTaskFailedRequest.Builder taskFailed,
        RpcRetryOptions retryOptions,
        Scope workflowTypeMetricsScope) {
      GrpcRetryer.GrpcRetryerOptions grpcRetryOptions =
          new GrpcRetryer.GrpcRetryerOptions(
              RpcRetryOptions.newBuilder().buildWithDefaultsFrom(retryOptions), null);

      taskFailed.setIdentity(options.getIdentity()).setNamespace(namespace).setTaskToken(taskToken);

      grpcRetryer.retry(
          () ->
              service
                  .blockingStub()
                  .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, workflowTypeMetricsScope)
                  .respondWorkflowTaskFailed(taskFailed.build()),
          grpcRetryOptions);
    }

    private void sendDirectQueryCompletedResponse(
        ByteString taskToken,
        RespondQueryTaskCompletedRequest.Builder queryCompleted,
        Scope workflowTypeMetricsScope) {
      queryCompleted.setTaskToken(taskToken).setNamespace(namespace);
      // Do not retry query response
      service
          .blockingStub()
          .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, workflowTypeMetricsScope)
          .respondQueryTaskCompleted(queryCompleted.build());
    }

    private void logExceptionDuringResultReporting(
        Exception e, PollWorkflowTaskQueueResponse currentTask, WorkflowTaskHandler.Result result) {
      if (log.isDebugEnabled()) {
        log.debug(
            "Failure during reporting of workflow progress to the server. If seen continuously the workflow might be stuck. WorkflowId={}, RunId={}, startedEventId={}, WFTResult={}",
            currentTask.getWorkflowExecution().getWorkflowId(),
            currentTask.getWorkflowExecution().getRunId(),
            currentTask.getStartedEventId(),
            result,
            e);
      } else {
        log.warn(
            "Failure while reporting workflow progress to the server. If seen continuously the workflow might be stuck. WorkflowId={}, RunId={}, startedEventId={}",
            currentTask.getWorkflowExecution().getWorkflowId(),
            currentTask.getWorkflowExecution().getRunId(),
            currentTask.getStartedEventId(),
            e);
      }
    }
  }
}
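
For orientation, the sketch below shows how the lifecycle surface of this class (start, suspendPolling/resumePolling, shutdown, awaitTermination) fits together. It is a minimal, hypothetical illustration only: WorkflowWorker and ShutdownManager are package-private SDK internals that are normally constructed and driven by the SDK itself, so the helper class and its runAndStop method are assumptions for illustration and are not part of the file above.

package io.temporal.internal.worker;

import java.util.concurrent.TimeUnit;

// Hypothetical helper, placed in the same package so the package-private WorkflowWorker is visible.
final class WorkflowWorkerLifecycleSketch {

  static void runAndStop(WorkflowWorker worker, ShutdownManager shutdownManager) {
    // start() returns false when the handler supports no workflow types; in that case
    // no poller is created and there is nothing to shut down.
    if (!worker.start()) {
      return;
    }

    // Polling can be paused and resumed without tearing the worker down.
    worker.suspendPolling();
    if (worker.isSuspended()) {
      worker.resumePolling();
    }

    // Graceful shutdown: with interruptTasks = false the worker first waits for all executor
    // slots to be released (in-flight workflow tasks drain) before shutting down the task
    // executor; afterwards wait for the poller and executor to terminate.
    worker.shutdown(shutdownManager, false).join();
    worker.awaitTermination(30, TimeUnit.SECONDS);
  }
}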