• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #160

pending completion
#160

push

github-actions

web-flow
Wait for worker slots to be fully released in the graceful worker shutdown (#1679)

95 of 95 new or added lines in 5 files covered. (100.0%)

17058 of 20907 relevant lines covered (81.59%)

0.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.61
/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.worker;
22

23
import static io.temporal.serviceclient.MetricsTag.METRICS_TAGS_CALL_OPTIONS_KEY;
24

25
import com.google.common.base.Preconditions;
26
import com.google.common.base.Strings;
27
import com.google.protobuf.ByteString;
28
import com.uber.m3.tally.Scope;
29
import com.uber.m3.tally.Stopwatch;
30
import com.uber.m3.util.ImmutableMap;
31
import io.temporal.api.common.v1.WorkflowExecution;
32
import io.temporal.api.enums.v1.TaskQueueKind;
33
import io.temporal.api.workflowservice.v1.*;
34
import io.temporal.internal.logging.LoggerTag;
35
import io.temporal.internal.retryer.GrpcRetryer;
36
import io.temporal.serviceclient.MetricsTag;
37
import io.temporal.serviceclient.RpcRetryOptions;
38
import io.temporal.serviceclient.WorkflowServiceStubs;
39
import io.temporal.worker.MetricsType;
40
import io.temporal.worker.WorkerMetricsTag;
41
import io.temporal.worker.WorkflowTaskDispatchHandle;
42
import java.util.Objects;
43
import java.util.Optional;
44
import java.util.concurrent.CompletableFuture;
45
import java.util.concurrent.RejectedExecutionException;
46
import java.util.concurrent.Semaphore;
47
import java.util.concurrent.TimeUnit;
48
import javax.annotation.Nonnull;
49
import javax.annotation.Nullable;
50
import org.slf4j.Logger;
51
import org.slf4j.LoggerFactory;
52
import org.slf4j.MDC;
53

54
final class WorkflowWorker implements SuspendableWorker {
55
  private static final Logger log = LoggerFactory.getLogger(WorkflowWorker.class);
1✔
56

57
  private final WorkflowRunLockManager runLocks;
58

59
  private final WorkflowServiceStubs service;
60
  private final String namespace;
61
  private final String taskQueue;
62
  private final SingleWorkerOptions options;
63
  private final WorkflowExecutorCache cache;
64
  private final WorkflowTaskHandler handler;
65
  private final String stickyTaskQueueName;
66
  private final PollerOptions pollerOptions;
67
  private final Scope workerMetricsScope;
68
  private final GrpcRetryer grpcRetryer;
69
  private final EagerActivityDispatcher eagerActivityDispatcher;
70
  private final int executorSlots;
71
  private final Semaphore executorSlotsSemaphore;
72

73
  private PollTaskExecutor<WorkflowTask> pollTaskExecutor;
74

75
  // TODO this ideally should be volatile or final (and NoopWorker should go away)
76
  //  Currently the implementation looks safe without volatile, but it's brittle.
77
  @Nonnull private SuspendableWorker poller = new NoopWorker();
1✔
78

79
  public WorkflowWorker(
80
      @Nonnull WorkflowServiceStubs service,
81
      @Nonnull String namespace,
82
      @Nonnull String taskQueue,
83
      @Nullable String stickyTaskQueueName,
84
      @Nonnull SingleWorkerOptions options,
85
      @Nonnull WorkflowRunLockManager runLocks,
86
      @Nonnull WorkflowExecutorCache cache,
87
      @Nonnull WorkflowTaskHandler handler,
88
      @Nonnull EagerActivityDispatcher eagerActivityDispatcher) {
1✔
89
    this.service = Objects.requireNonNull(service);
1✔
90
    this.namespace = Objects.requireNonNull(namespace);
1✔
91
    this.taskQueue = Objects.requireNonNull(taskQueue);
1✔
92
    this.options = Objects.requireNonNull(options);
1✔
93
    this.stickyTaskQueueName = stickyTaskQueueName;
1✔
94
    this.pollerOptions = getPollerOptions(options);
1✔
95
    this.workerMetricsScope =
1✔
96
        MetricsTag.tagged(options.getMetricsScope(), WorkerMetricsTag.WorkerType.WORKFLOW_WORKER);
1✔
97
    this.runLocks = Objects.requireNonNull(runLocks);
1✔
98
    this.cache = Objects.requireNonNull(cache);
1✔
99
    this.handler = Objects.requireNonNull(handler);
1✔
100
    this.grpcRetryer = new GrpcRetryer(service.getServerCapabilities());
1✔
101
    this.eagerActivityDispatcher = eagerActivityDispatcher;
1✔
102
    this.executorSlots = options.getTaskExecutorThreadPoolSize();
1✔
103
    this.executorSlotsSemaphore = new Semaphore(executorSlots);
1✔
104
  }
1✔
105

106
  @Override
107
  public boolean start() {
108
    if (handler.isAnyTypeSupported()) {
1✔
109
      pollTaskExecutor =
1✔
110
          new PollTaskExecutor<>(
111
              namespace,
112
              taskQueue,
113
              options.getIdentity(),
1✔
114
              new TaskHandlerImpl(handler),
115
              pollerOptions,
116
              options.getTaskExecutorThreadPoolSize(),
1✔
117
              workerMetricsScope,
118
              true);
119
      StickyQueueBalancer stickyQueueBalancer =
1✔
120
          new StickyQueueBalancer(
121
              options.getPollerOptions().getPollThreadCount(), stickyTaskQueueName != null);
1✔
122

123
      poller =
1✔
124
          new Poller<>(
125
              options.getIdentity(),
1✔
126
              new WorkflowPollTask(
127
                  service,
128
                  namespace,
129
                  taskQueue,
130
                  stickyTaskQueueName,
131
                  options.getIdentity(),
1✔
132
                  options.getBinaryChecksum(),
1✔
133
                  executorSlotsSemaphore,
134
                  stickyQueueBalancer,
135
                  workerMetricsScope),
136
              pollTaskExecutor,
137
              pollerOptions,
138
              workerMetricsScope);
139
      poller.start();
1✔
140

141
      workerMetricsScope.counter(MetricsType.WORKER_START_COUNTER).inc(1);
1✔
142

143
      return true;
1✔
144
    } else {
145
      return false;
1✔
146
    }
147
  }
148

149
  @Override
150
  public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
151
    String semaphoreName = this + "#executorSlotsSemaphore";
1✔
152
    return poller
1✔
153
        .shutdown(shutdownManager, interruptTasks)
1✔
154
        .thenCompose(
1✔
155
            ignore ->
156
                !interruptTasks
1✔
157
                    ? shutdownManager.waitForSemaphorePermitsReleaseUntimed(
1✔
158
                        executorSlotsSemaphore, executorSlots, semaphoreName)
159
                    : CompletableFuture.completedFuture(null))
1✔
160
        .thenCompose(
1✔
161
            ignore ->
162
                pollTaskExecutor != null
1✔
163
                    ? pollTaskExecutor.shutdown(shutdownManager, interruptTasks)
1✔
164
                    : CompletableFuture.completedFuture(null))
1✔
165
        .exceptionally(
1✔
166
            e -> {
167
              log.error("Unexpected exception during shutdown", e);
×
168
              return null;
×
169
            });
170
  }
171

172
  @Override
173
  public void awaitTermination(long timeout, TimeUnit unit) {
174
    long timeoutMillis = ShutdownManager.awaitTermination(poller, unit.toMillis(timeout));
1✔
175
    // relies on the fact that the pollTaskExecutor is the last one to be shutdown, no need to
176
    // wait separately for intermediate steps
177
    ShutdownManager.awaitTermination(pollTaskExecutor, timeoutMillis);
1✔
178
  }
1✔
179

180
  @Override
181
  public void suspendPolling() {
182
    poller.suspendPolling();
1✔
183
  }
1✔
184

185
  @Override
186
  public void resumePolling() {
187
    poller.resumePolling();
1✔
188
  }
1✔
189

190
  @Override
191
  public boolean isSuspended() {
192
    return poller.isSuspended();
1✔
193
  }
194

195
  @Override
196
  public boolean isShutdown() {
197
    return poller.isShutdown();
×
198
  }
199

200
  @Override
201
  public boolean isTerminated() {
202
    return poller.isTerminated() && (pollTaskExecutor == null || pollTaskExecutor.isTerminated());
×
203
  }
204

205
  @Override
206
  public WorkerLifecycleState getLifecycleState() {
207
    return poller.getLifecycleState();
×
208
  }
209

210
  private PollerOptions getPollerOptions(SingleWorkerOptions options) {
211
    PollerOptions pollerOptions = options.getPollerOptions();
1✔
212
    if (pollerOptions.getPollThreadNamePrefix() == null) {
1✔
213
      pollerOptions =
1✔
214
          PollerOptions.newBuilder(pollerOptions)
1✔
215
              .setPollThreadNamePrefix(
1✔
216
                  WorkerThreadsNameHelper.getWorkflowPollerThreadPrefix(namespace, taskQueue))
1✔
217
              .build();
1✔
218
    }
219
    return pollerOptions;
1✔
220
  }
221

222
  @Nullable
223
  public WorkflowTaskDispatchHandle reserveWorkflowExecutor() {
224
    // to avoid pollTaskExecutor to become null inside the lambda, we are caching it here
225
    final PollTaskExecutor<WorkflowTask> executor = pollTaskExecutor;
1✔
226
    return executor != null && !isSuspended() && executorSlotsSemaphore.tryAcquire()
1✔
227
        ? new WorkflowTaskDispatchHandle(
1✔
228
            workflowTask -> {
229
              String queueName =
1✔
230
                  workflowTask.getResponse().getWorkflowExecutionTaskQueue().getName();
1✔
231
              TaskQueueKind queueKind =
1✔
232
                  workflowTask.getResponse().getWorkflowExecutionTaskQueue().getKind();
1✔
233
              Preconditions.checkArgument(
1✔
234
                  this.taskQueue.equals(queueName)
1✔
235
                      || TaskQueueKind.TASK_QUEUE_KIND_STICKY.equals(queueKind)
×
236
                          && this.stickyTaskQueueName.equals(queueName),
1✔
237
                  "Got a WFT for a wrong queue %s, expected %s or %s",
238
                  queueName,
239
                  this.taskQueue,
240
                  this.stickyTaskQueueName);
241
              try {
242
                pollTaskExecutor.process(workflowTask);
1✔
243
                return true;
1✔
244
              } catch (RejectedExecutionException e) {
×
245
                return false;
×
246
              }
247
            },
248
            executorSlotsSemaphore)
249
        : null;
1✔
250
  }
251

252
  @Override
253
  public String toString() {
254
    return String.format(
1✔
255
        "WorkflowWorker{identity=%s, namespace=%s, taskQueue=%s}",
256
        options.getIdentity(), namespace, taskQueue);
1✔
257
  }
258

259
  private class TaskHandlerImpl implements PollTaskExecutor.TaskHandler<WorkflowTask> {
260

261
    final WorkflowTaskHandler handler;
262

263
    /**
     * Wraps the given workflow task handler.
     *
     * @param workflowTaskHandler handler that workflow tasks are delegated to
     */
    private TaskHandlerImpl(WorkflowTaskHandler workflowTaskHandler) {
      this.handler = workflowTaskHandler;
    }
1✔
266

267
    @Override
268
    public void handle(WorkflowTask task) throws Exception {
269
      PollWorkflowTaskQueueResponse workflowTaskResponse = task.getResponse();
1✔
270
      WorkflowExecution workflowExecution = workflowTaskResponse.getWorkflowExecution();
1✔
271
      String runId = workflowExecution.getRunId();
1✔
272
      String workflowType = workflowTaskResponse.getWorkflowType().getName();
1✔
273

274
      Scope workflowTypeScope =
1✔
275
          workerMetricsScope.tagged(ImmutableMap.of(MetricsTag.WORKFLOW_TYPE, workflowType));
1✔
276

277
      MDC.put(LoggerTag.WORKFLOW_ID, workflowExecution.getWorkflowId());
1✔
278
      MDC.put(LoggerTag.WORKFLOW_TYPE, workflowType);
1✔
279
      MDC.put(LoggerTag.RUN_ID, runId);
1✔
280

281
      boolean locked = false;
1✔
282
      if (!Strings.isNullOrEmpty(stickyTaskQueueName)) {
1✔
283
        // Serialize workflow task processing for a particular workflow run.
284
        // This is used to make sure that query tasks and real workflow tasks
285
        // are serialized when sticky is on.
286
        //
287
        // Acquiring a lock with a timeout to avoid having lots of workflow tasks for the same run
288
        // id waiting for a lock and consuming threads in case if lock is unavailable.
289
        //
290
        // Throws interrupted exception which is propagated. It's a correct way to handle it here.
291
        //
292
        // TODO 1: 5 seconds is chosen as a half of normal workflow task timeout.
293
        //   This value should be dynamically configured.
294
        // TODO 2: Does "consider increasing workflow task timeout" advice in this exception makes
295
        //   any sense?
296
        //   This MAYBE makes sense only if a previous workflow task timed out, it's still in
297
        //   progress on the worker and the next workflow task got picked up by the same exact
298
        //   worker from the general non-sticky task queue.
299
        //   Even in this case, this advice looks misleading, something else is going on
300
        //   (like an extreme network latency).
301
        locked = runLocks.tryLock(runId, 5, TimeUnit.SECONDS);
1✔
302

303
        if (!locked) {
1✔
304
          throw new UnableToAcquireLockException(
×
305
              "Workflow lock for the run id hasn't been released by one of previous execution attempts, "
306
                  + "consider increasing workflow task timeout.");
307
        }
308
      }
309

310
      Stopwatch swTotal =
1✔
311
          workflowTypeScope.timer(MetricsType.WORKFLOW_TASK_EXECUTION_TOTAL_LATENCY).start();
1✔
312
      try {
313
        Optional<PollWorkflowTaskQueueResponse> nextWFTResponse = Optional.of(workflowTaskResponse);
1✔
314
        do {
315
          PollWorkflowTaskQueueResponse currentTask = nextWFTResponse.get();
1✔
316
          nextWFTResponse = Optional.empty();
1✔
317
          WorkflowTaskHandler.Result result = handleTask(currentTask, workflowTypeScope);
1✔
318
          try {
319
            RespondWorkflowTaskCompletedRequest taskCompleted = result.getTaskCompleted();
1✔
320
            RespondWorkflowTaskFailedRequest taskFailed = result.getTaskFailed();
1✔
321
            RespondQueryTaskCompletedRequest queryCompleted = result.getQueryCompleted();
1✔
322

323
            if (taskCompleted != null) {
1✔
324
              RespondWorkflowTaskCompletedRequest.Builder requestBuilder =
1✔
325
                  taskCompleted.toBuilder();
1✔
326
              try (EagerActivitySlotsReservation activitySlotsReservation =
1✔
327
                  new EagerActivitySlotsReservation(eagerActivityDispatcher)) {
1✔
328
                activitySlotsReservation.applyToRequest(requestBuilder);
1✔
329
                RespondWorkflowTaskCompletedResponse response =
1✔
330
                    sendTaskCompleted(
1✔
331
                        currentTask.getTaskToken(),
1✔
332
                        requestBuilder,
333
                        result.getRequestRetryOptions(),
1✔
334
                        workflowTypeScope);
335
                nextWFTResponse =
336
                    response.hasWorkflowTask()
1✔
337
                        ? Optional.of(response.getWorkflowTask())
×
338
                        : Optional.empty();
1✔
339
                // TODO we don't have to do this under the runId lock
340
                activitySlotsReservation.handleResponse(response);
1✔
341
              }
342
            } else if (taskFailed != null) {
1✔
343
              sendTaskFailed(
1✔
344
                  currentTask.getTaskToken(),
1✔
345
                  taskFailed.toBuilder(),
1✔
346
                  result.getRequestRetryOptions(),
1✔
347
                  workflowTypeScope);
348
            } else if (queryCompleted != null) {
1✔
349
              sendDirectQueryCompletedResponse(
1✔
350
                  currentTask.getTaskToken(), queryCompleted.toBuilder(), workflowTypeScope);
1✔
351
            }
352
          } catch (Exception e) {
1✔
353
            logExceptionDuringResultReporting(e, currentTask, result);
1✔
354
            workflowTypeScope.counter(MetricsType.WORKFLOW_TASK_EXECUTION_FAILURE_COUNTER).inc(1);
1✔
355
            // if we failed to report the workflow task completion back to the server,
356
            // our cached version of the workflow may be more advanced than the server is aware of.
357
            // We should discard this execution and perform a clean replay based on what server
358
            // knows next time.
359
            cache.invalidate(
1✔
360
                workflowExecution, workflowTypeScope, "Failed result reporting to the server", e);
361
            throw e;
1✔
362
          }
1✔
363

364
          // this should be after sendReply, otherwise we may log
365
          // WORKFLOW_TASK_EXECUTION_FAILURE_COUNTER twice if sendReply throws
366
          if (result.getTaskFailed() != null) {
1✔
367
            // we don't trigger the counter in case of the legacy query
368
            // (which never has taskFailed set)
369
            workflowTypeScope.counter(MetricsType.WORKFLOW_TASK_EXECUTION_FAILURE_COUNTER).inc(1);
1✔
370
          }
371
          if (nextWFTResponse.isPresent()) {
1✔
372
            workflowTypeScope.counter(MetricsType.WORKFLOW_TASK_HEARTBEAT_COUNTER).inc(1);
×
373
          }
374
        } while (nextWFTResponse.isPresent());
1✔
375
      } finally {
376
        swTotal.stop();
1✔
377
        MDC.remove(LoggerTag.WORKFLOW_ID);
1✔
378
        MDC.remove(LoggerTag.WORKFLOW_TYPE);
1✔
379
        MDC.remove(LoggerTag.RUN_ID);
1✔
380

381
        task.getCompletionCallback().apply();
1✔
382

383
        if (locked) {
1✔
384
          runLocks.unlock(runId);
1✔
385
        }
386
      }
387
    }
1✔
388

389
    @Override
390
    public Throwable wrapFailure(WorkflowTask task, Throwable failure) {
391
      WorkflowExecution execution = task.getResponse().getWorkflowExecution();
1✔
392
      return new RuntimeException(
1✔
393
          "Failure processing workflow task. WorkflowId="
394
              + execution.getWorkflowId()
1✔
395
              + ", RunId="
396
              + execution.getRunId()
1✔
397
              + ", Attempt="
398
              + task.getResponse().getAttempt(),
1✔
399
          failure);
400
    }
401

402
    private WorkflowTaskHandler.Result handleTask(
403
        PollWorkflowTaskQueueResponse task, Scope workflowTypeMetricsScope) throws Exception {
404
      Stopwatch sw =
1✔
405
          workflowTypeMetricsScope.timer(MetricsType.WORKFLOW_TASK_EXECUTION_LATENCY).start();
1✔
406
      try {
407
        return handler.handleWorkflowTask(task);
1✔
408
      } catch (Throwable e) {
1✔
409
        // more detailed logging that we can do here is already done inside `handler`
410
        workflowTypeMetricsScope
1✔
411
            .counter(MetricsType.WORKFLOW_TASK_EXECUTION_FAILURE_COUNTER)
1✔
412
            .inc(1);
1✔
413
        workflowTypeMetricsScope.counter(MetricsType.WORKFLOW_TASK_NO_COMPLETION_COUNTER).inc(1);
1✔
414
        throw e;
1✔
415
      } finally {
416
        sw.stop();
1✔
417
      }
418
    }
419

420
    private RespondWorkflowTaskCompletedResponse sendTaskCompleted(
421
        ByteString taskToken,
422
        RespondWorkflowTaskCompletedRequest.Builder taskCompleted,
423
        RpcRetryOptions retryOptions,
424
        Scope workflowTypeMetricsScope) {
425
      GrpcRetryer.GrpcRetryerOptions grpcRetryOptions =
1✔
426
          new GrpcRetryer.GrpcRetryerOptions(
427
              RpcRetryOptions.newBuilder().buildWithDefaultsFrom(retryOptions), null);
1✔
428

429
      taskCompleted
1✔
430
          .setIdentity(options.getIdentity())
1✔
431
          .setNamespace(namespace)
1✔
432
          .setBinaryChecksum(options.getBinaryChecksum())
1✔
433
          .setTaskToken(taskToken);
1✔
434

435
      return grpcRetryer.retryWithResult(
1✔
436
          () ->
437
              service
1✔
438
                  .blockingStub()
1✔
439
                  .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, workflowTypeMetricsScope)
1✔
440
                  .respondWorkflowTaskCompleted(taskCompleted.build()),
1✔
441
          grpcRetryOptions);
442
    }
443

444
    private void sendTaskFailed(
445
        ByteString taskToken,
446
        RespondWorkflowTaskFailedRequest.Builder taskFailed,
447
        RpcRetryOptions retryOptions,
448
        Scope workflowTypeMetricsScope) {
449
      GrpcRetryer.GrpcRetryerOptions grpcRetryOptions =
1✔
450
          new GrpcRetryer.GrpcRetryerOptions(
451
              RpcRetryOptions.newBuilder().buildWithDefaultsFrom(retryOptions), null);
1✔
452

453
      taskFailed.setIdentity(options.getIdentity()).setNamespace(namespace).setTaskToken(taskToken);
1✔
454

455
      grpcRetryer.retry(
1✔
456
          () ->
457
              service
1✔
458
                  .blockingStub()
1✔
459
                  .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, workflowTypeMetricsScope)
1✔
460
                  .respondWorkflowTaskFailed(taskFailed.build()),
1✔
461
          grpcRetryOptions);
462
    }
1✔
463

464
    private void sendDirectQueryCompletedResponse(
465
        ByteString taskToken,
466
        RespondQueryTaskCompletedRequest.Builder queryCompleted,
467
        Scope workflowTypeMetricsScope) {
468
      queryCompleted.setTaskToken(taskToken).setNamespace(namespace);
1✔
469
      // Do not retry query response
470
      service
1✔
471
          .blockingStub()
1✔
472
          .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, workflowTypeMetricsScope)
1✔
473
          .respondQueryTaskCompleted(queryCompleted.build());
1✔
474
    }
1✔
475

476
    private void logExceptionDuringResultReporting(
477
        Exception e, PollWorkflowTaskQueueResponse currentTask, WorkflowTaskHandler.Result result) {
478
      if (log.isDebugEnabled()) {
1✔
479
        log.debug(
×
480
            "Failure during reporting of workflow progress to the server. If seen continuously the workflow might be stuck. WorkflowId={}, RunId={}, startedEventId={}, WFTResult={}",
481
            currentTask.getWorkflowExecution().getWorkflowId(),
×
482
            currentTask.getWorkflowExecution().getRunId(),
×
483
            currentTask.getStartedEventId(),
×
484
            result,
485
            e);
486
      } else {
487
        log.warn(
1✔
488
            "Failure while reporting workflow progress to the server. If seen continuously the workflow might be stuck. WorkflowId={}, RunId={}, startedEventId={}",
489
            currentTask.getWorkflowExecution().getWorkflowId(),
1✔
490
            currentTask.getWorkflowExecution().getRunId(),
1✔
491
            currentTask.getStartedEventId(),
1✔
492
            e);
493
      }
494
    }
1✔
495
  }
496
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc