temporalio / sdk-java / build #331 (push, via GitHub web-flow)

10 Oct 2024 05:57PM UTC. Coverage: 78.105% (-0.008%) from 78.113%

Commit: Call shutdown RPC on worker shutdown (#2264)
(Call shutdownWorker on worker shutdown)

38 of 44 new or added lines in 2 files covered (86.36%).
14 existing lines in 3 files now uncovered.
21350 of 27335 relevant lines covered (78.1%).
0.78 hits per line.

Source File
/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java: 94.82% covered
/*
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
 *
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this material except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.temporal.internal.worker;

import static io.temporal.serviceclient.MetricsTag.METRICS_TAGS_CALL_OPTIONS_KEY;
import static io.temporal.serviceclient.MetricsTag.TASK_FAILURE_TYPE;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.protobuf.ByteString;
import com.uber.m3.tally.Scope;
import com.uber.m3.tally.Stopwatch;
import com.uber.m3.util.ImmutableMap;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.enums.v1.TaskQueueKind;
import io.temporal.api.enums.v1.WorkflowTaskFailedCause;
import io.temporal.api.workflowservice.v1.*;
import io.temporal.internal.logging.LoggerTag;
import io.temporal.internal.retryer.GrpcRetryer;
import io.temporal.serviceclient.MetricsTag;
import io.temporal.serviceclient.RpcRetryOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.MetricsType;
import io.temporal.worker.WorkerMetricsTag;
import io.temporal.worker.WorkflowTaskDispatchHandle;
import io.temporal.worker.tuning.SlotReleaseReason;
import io.temporal.worker.tuning.SlotSupplier;
import io.temporal.worker.tuning.WorkflowSlotInfo;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

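/**
 * Polls workflow tasks for a single task queue (and its sticky counterpart when one is
 * configured), dispatches them to a {@link WorkflowTaskHandler}, and reports results back to
 * the server. Start, shutdown, suspend, and resume are delegated to the underlying poller and
 * {@link PollTaskExecutor}.
 */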
final class WorkflowWorker implements SuspendableWorker {
  private static final String GRACEFUL_SHUTDOWN_MESSAGE = "graceful shutdown";
  private static final Logger log = LoggerFactory.getLogger(WorkflowWorker.class);

  private final WorkflowRunLockManager runLocks;

  private final WorkflowServiceStubs service;
  private final String namespace;
  private final String taskQueue;
  private final SingleWorkerOptions options;
  private final WorkflowExecutorCache cache;
  private final WorkflowTaskHandler handler;
  private final String stickyTaskQueueName;
  private final PollerOptions pollerOptions;
  private final Scope workerMetricsScope;
  private final GrpcRetryer grpcRetryer;
  private final EagerActivityDispatcher eagerActivityDispatcher;
  private final TrackingSlotSupplier<WorkflowSlotInfo> slotSupplier;

  private PollTaskExecutor<WorkflowTask> pollTaskExecutor;

  // TODO this ideally should be volatile or final (and NoopWorker should go away)
  //  Currently the implementation looks safe without volatile, but it's brittle.
  @Nonnull private SuspendableWorker poller = new NoopWorker();

  private StickyQueueBalancer stickyQueueBalancer;

  public WorkflowWorker(
      @Nonnull WorkflowServiceStubs service,
      @Nonnull String namespace,
      @Nonnull String taskQueue,
      @Nullable String stickyTaskQueueName,
      @Nonnull SingleWorkerOptions options,
      @Nonnull WorkflowRunLockManager runLocks,
      @Nonnull WorkflowExecutorCache cache,
      @Nonnull WorkflowTaskHandler handler,
      @Nonnull EagerActivityDispatcher eagerActivityDispatcher,
      @Nonnull SlotSupplier<WorkflowSlotInfo> slotSupplier) {
    this.service = Objects.requireNonNull(service);
    this.namespace = Objects.requireNonNull(namespace);
    this.taskQueue = Objects.requireNonNull(taskQueue);
    this.options = Objects.requireNonNull(options);
    this.stickyTaskQueueName = stickyTaskQueueName;
    this.pollerOptions = getPollerOptions(options);
    this.workerMetricsScope =
        MetricsTag.tagged(options.getMetricsScope(), WorkerMetricsTag.WorkerType.WORKFLOW_WORKER);
    this.runLocks = Objects.requireNonNull(runLocks);
    this.cache = Objects.requireNonNull(cache);
    this.handler = Objects.requireNonNull(handler);
    this.grpcRetryer = new GrpcRetryer(service.getServerCapabilities());
    this.eagerActivityDispatcher = eagerActivityDispatcher;
    this.slotSupplier = new TrackingSlotSupplier<>(slotSupplier, this.workerMetricsScope);
  }

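  // start() is a no-op (returns false) when the handler has no registered workflow types;
  // otherwise it builds the task executor, the sticky queue balancer, and the poller, and
  // begins polling.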
  @Override
  public boolean start() {
    if (handler.isAnyTypeSupported()) {
      pollTaskExecutor =
          new PollTaskExecutor<>(
              namespace,
              taskQueue,
              options.getIdentity(),
              new TaskHandlerImpl(handler),
              pollerOptions,
              this.slotSupplier.maximumSlots().orElse(Integer.MAX_VALUE),
              true);
      stickyQueueBalancer =
          new StickyQueueBalancer(
              options.getPollerOptions().getPollThreadCount(), stickyTaskQueueName != null);

      poller =
          new Poller<>(
              options.getIdentity(),
              new WorkflowPollTask(
                  service,
                  namespace,
                  taskQueue,
                  stickyTaskQueueName,
                  options.getIdentity(),
                  options.getBuildId(),
                  options.isUsingBuildIdForVersioning(),
                  slotSupplier,
                  stickyQueueBalancer,
                  workerMetricsScope,
                  service.getServerCapabilities()),
              pollTaskExecutor,
              pollerOptions,
              workerMetricsScope);
      poller.start();

      workerMetricsScope.counter(MetricsType.WORKER_START_COUNTER).inc(1);

      return true;
    } else {
      return false;
    }
  }

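  // Shutdown proceeds in stages: optionally drain the sticky queue balancer, shut down the
  // poller, then (for a graceful shutdown with a sticky queue) notify the server via the
  // shutdownWorker RPC so sticky tasks can be redistributed, while separately waiting for
  // outstanding slot permits to be released before shutting down the task executor.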
  @Override
  public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
    String supplierName = this + "#executorSlots";

    boolean stickyQueueBalancerDrainEnabled =
        !interruptTasks
            && !options.getDrainStickyTaskQueueTimeout().isZero()
            && stickyTaskQueueName != null
            && stickyQueueBalancer != null;

    CompletableFuture<Void> pollerShutdown =
        CompletableFuture.completedFuture(null)
            .thenCompose(
                ignore ->
                    stickyQueueBalancerDrainEnabled
                        ? shutdownManager.waitForStickyQueueBalancer(
                            stickyQueueBalancer, options.getDrainStickyTaskQueueTimeout())
                        : CompletableFuture.completedFuture(null))
            .thenCompose(ignore -> poller.shutdown(shutdownManager, interruptTasks));
    return CompletableFuture.allOf(
        pollerShutdown.thenCompose(
            ignore -> {
              if (!interruptTasks && stickyTaskQueueName != null) {
                return shutdownManager.waitOnWorkerShutdownRequest(
                    service
                        .futureStub()
                        .shutdownWorker(
                            ShutdownWorkerRequest.newBuilder()
                                .setIdentity(options.getIdentity())
                                .setNamespace(namespace)
                                .setStickyTaskQueue(stickyTaskQueueName)
                                .setReason(GRACEFUL_SHUTDOWN_MESSAGE)
                                .build()));
              }
              return CompletableFuture.completedFuture(null);
            }),
        pollerShutdown
            .thenCompose(
                ignore ->
                    !interruptTasks
                        ? shutdownManager.waitForSupplierPermitsReleasedUnlimited(
                            slotSupplier, supplierName)
                        : CompletableFuture.completedFuture(null))
            .thenCompose(
                ignore ->
                    pollTaskExecutor != null
                        ? pollTaskExecutor.shutdown(shutdownManager, interruptTasks)
                        : CompletableFuture.completedFuture(null))
            .exceptionally(
                e -> {
                  log.error("Unexpected exception during shutdown", e);
                  return null;
                }));
  }

  @Override
  public void awaitTermination(long timeout, TimeUnit unit) {
    long timeoutMillis = ShutdownManager.awaitTermination(poller, unit.toMillis(timeout));
    // relies on the fact that the pollTaskExecutor is the last one to be shut down; no need to
    // wait separately for intermediate steps
    ShutdownManager.awaitTermination(pollTaskExecutor, timeoutMillis);
  }

  @Override
  public void suspendPolling() {
    poller.suspendPolling();
  }

  @Override
  public void resumePolling() {
    poller.resumePolling();
  }

  @Override
  public boolean isSuspended() {
    return poller.isSuspended();
  }

  @Override
  public boolean isShutdown() {
    return poller.isShutdown();
  }

  @Override
  public boolean isTerminated() {
    return poller.isTerminated() && (pollTaskExecutor == null || pollTaskExecutor.isTerminated());
  }

  @Override
  public WorkerLifecycleState getLifecycleState() {
    return poller.getLifecycleState();
  }

  private PollerOptions getPollerOptions(SingleWorkerOptions options) {
    PollerOptions pollerOptions = options.getPollerOptions();
    if (pollerOptions.getPollThreadNamePrefix() == null) {
      pollerOptions =
          PollerOptions.newBuilder(pollerOptions)
              .setPollThreadNamePrefix(
                  WorkerThreadsNameHelper.getWorkflowPollerThreadPrefix(namespace, taskQueue))
              .build();
    }
    return pollerOptions;
  }

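  // Used for eager workflow task dispatch: reserves an executor slot up front so a workflow
  // task returned inline (rather than through polling) can be processed immediately. Returns
  // null when the worker is not started, is suspended, or has no free slot.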
  @Nullable
  public WorkflowTaskDispatchHandle reserveWorkflowExecutor() {
    // to avoid pollTaskExecutor becoming null inside the lambda, we cache it here
    final PollTaskExecutor<WorkflowTask> executor = pollTaskExecutor;
    if (executor == null || isSuspended()) {
      return null;
    }
    return slotSupplier
        .tryReserveSlot(
            new SlotReservationData(taskQueue, options.getIdentity(), options.getBuildId()))
        .map(
            slotPermit ->
                new WorkflowTaskDispatchHandle(
                    workflowTask -> {
                      String queueName =
                          workflowTask.getResponse().getWorkflowExecutionTaskQueue().getName();
                      TaskQueueKind queueKind =
                          workflowTask.getResponse().getWorkflowExecutionTaskQueue().getKind();
                      Preconditions.checkArgument(
                          this.taskQueue.equals(queueName)
                              || TaskQueueKind.TASK_QUEUE_KIND_STICKY.equals(queueKind)
                                  && this.stickyTaskQueueName.equals(queueName),
                          "Got a WFT for a wrong queue %s, expected %s or %s",
                          queueName,
                          this.taskQueue,
                          this.stickyTaskQueueName);
                      try {
                        executor.process(workflowTask);
                        return true;
                      } catch (RejectedExecutionException e) {
                        return false;
                      }
                    },
                    slotSupplier,
                    slotPermit))
        .orElse(null);
  }

  @Override
  public String toString() {
    return String.format(
        "WorkflowWorker{identity=%s, namespace=%s, taskQueue=%s}",
        options.getIdentity(), namespace, taskQueue);
  }

  private class TaskHandlerImpl implements PollTaskExecutor.TaskHandler<WorkflowTask> {

    final WorkflowTaskHandler handler;

    private TaskHandlerImpl(WorkflowTaskHandler handler) {
      this.handler = handler;
    }

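    // handle() runs one workflow task to completion and then loops: when the server's
    // completion response carries another workflow task (a heartbeat continuation), it is
    // processed under the same run lock before the slot is released.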
    @Override
    public void handle(WorkflowTask task) throws Exception {
      PollWorkflowTaskQueueResponse workflowTaskResponse = task.getResponse();
      WorkflowExecution workflowExecution = workflowTaskResponse.getWorkflowExecution();
      String runId = workflowExecution.getRunId();
      String workflowType = workflowTaskResponse.getWorkflowType().getName();

      Scope workflowTypeScope =
          workerMetricsScope.tagged(ImmutableMap.of(MetricsTag.WORKFLOW_TYPE, workflowType));

      MDC.put(LoggerTag.WORKFLOW_ID, workflowExecution.getWorkflowId());
      MDC.put(LoggerTag.WORKFLOW_TYPE, workflowType);
      MDC.put(LoggerTag.RUN_ID, runId);

      boolean locked = false;

      Stopwatch swTotal =
          workflowTypeScope.timer(MetricsType.WORKFLOW_TASK_EXECUTION_TOTAL_LATENCY).start();
      SlotReleaseReason releaseReason = SlotReleaseReason.taskComplete();
      try {
        if (!Strings.isNullOrEmpty(stickyTaskQueueName)) {
          // Serialize workflow task processing for a particular workflow run.
          // This is used to make sure that query tasks and real workflow tasks
          // are serialized when sticky is on.
          //
          // Acquiring a lock with a timeout to avoid having lots of workflow tasks for the same
          // run id waiting for a lock and consuming threads in case the lock is unavailable.
          //
          // Throws an interrupted exception, which is propagated. That's the correct way to
          // handle it here.
          //
          // TODO 1: 5 seconds is chosen as half of the normal workflow task timeout.
          //   This value should be dynamically configured.
          // TODO 2: Does the "consider increasing workflow task timeout" advice in this
          //   exception make any sense?
          //   This MAYBE makes sense only if a previous workflow task timed out, it's still in
          //   progress on the worker and the next workflow task got picked up by the same exact
          //   worker from the general non-sticky task queue.
          //   Even in this case, this advice looks misleading, something else is going on
          //   (like an extreme network latency).
          locked = runLocks.tryLock(runId, 5, TimeUnit.SECONDS);

          if (!locked) {
            throw new UnableToAcquireLockException(
                "Workflow lock for the run id hasn't been released by one of previous execution attempts, "
                    + "consider increasing workflow task timeout.");
          }
        }

        Optional<PollWorkflowTaskQueueResponse> nextWFTResponse = Optional.of(workflowTaskResponse);
        do {
          PollWorkflowTaskQueueResponse currentTask = nextWFTResponse.get();
          nextWFTResponse = Optional.empty();
          WorkflowTaskHandler.Result result = handleTask(currentTask, workflowTypeScope);
          try {
            RespondWorkflowTaskCompletedRequest taskCompleted = result.getTaskCompleted();
            RespondWorkflowTaskFailedRequest taskFailed = result.getTaskFailed();
            RespondQueryTaskCompletedRequest queryCompleted = result.getQueryCompleted();

            if (taskCompleted != null) {
              RespondWorkflowTaskCompletedRequest.Builder requestBuilder =
                  taskCompleted.toBuilder();
              try (EagerActivitySlotsReservation activitySlotsReservation =
                  new EagerActivitySlotsReservation(eagerActivityDispatcher)) {
                activitySlotsReservation.applyToRequest(requestBuilder);
                RespondWorkflowTaskCompletedResponse response =
                    sendTaskCompleted(
                        currentTask.getTaskToken(),
                        requestBuilder,
                        result.getRequestRetryOptions(),
                        workflowTypeScope);
                // If we were processing a speculative WFT, the server may instruct us that the
                // task was dropped by resetting our event ID.
                long resetEventId = response.getResetHistoryEventId();
                if (resetEventId != 0) {
                  result.getResetEventIdHandle().apply(resetEventId);
                }
                nextWFTResponse =
                    response.hasWorkflowTask()
                        ? Optional.of(response.getWorkflowTask())
                        : Optional.empty();
                // TODO we don't have to do this under the runId lock
                activitySlotsReservation.handleResponse(response);
              }
            } else if (taskFailed != null) {
              sendTaskFailed(
                  currentTask.getTaskToken(),
                  taskFailed.toBuilder(),
                  result.getRequestRetryOptions(),
                  workflowTypeScope);
            } else if (queryCompleted != null) {
              sendDirectQueryCompletedResponse(
                  currentTask.getTaskToken(), queryCompleted.toBuilder(), workflowTypeScope);
            }
          } catch (Exception e) {
            logExceptionDuringResultReporting(e, currentTask, result);
            releaseReason = SlotReleaseReason.error(e);
            // If we failed to report the workflow task completion back to the server,
            // our cached version of the workflow may be more advanced than the server is aware
            // of. We should discard this execution and perform a clean replay based on what the
            // server knows next time.
            cache.invalidate(
                workflowExecution, workflowTypeScope, "Failed result reporting to the server", e);
            throw e;
          }

          if (result.getTaskFailed() != null) {
            Scope workflowTaskFailureScope = workflowTypeScope;
            if (result
                .getTaskFailed()
                .getCause()
                .equals(
                    WorkflowTaskFailedCause.WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR)) {
              workflowTaskFailureScope =
                  workflowTaskFailureScope.tagged(
                      ImmutableMap.of(TASK_FAILURE_TYPE, "NonDeterminismError"));
            } else {
              workflowTaskFailureScope =
                  workflowTaskFailureScope.tagged(
                      ImmutableMap.of(TASK_FAILURE_TYPE, "WorkflowError"));
            }
            // we don't trigger the counter in case of the legacy query
            // (which never has taskFailed set)
            workflowTaskFailureScope
                .counter(MetricsType.WORKFLOW_TASK_EXECUTION_FAILURE_COUNTER)
                .inc(1);
          }
          if (nextWFTResponse.isPresent()) {
            workflowTypeScope.counter(MetricsType.WORKFLOW_TASK_HEARTBEAT_COUNTER).inc(1);
          }
        } while (nextWFTResponse.isPresent());
      } finally {
        swTotal.stop();
        task.getCompletionCallback().apply(releaseReason);
        MDC.remove(LoggerTag.WORKFLOW_ID);
        MDC.remove(LoggerTag.WORKFLOW_TYPE);
        MDC.remove(LoggerTag.RUN_ID);

        if (locked) {
          runLocks.unlock(runId);
        }
      }
    }

    @Override
    public Throwable wrapFailure(WorkflowTask task, Throwable failure) {
      WorkflowExecution execution = task.getResponse().getWorkflowExecution();
      return new RuntimeException(
          "Failure processing workflow task. WorkflowId="
              + execution.getWorkflowId()
              + ", RunId="
              + execution.getRunId()
              + ", Attempt="
              + task.getResponse().getAttempt(),
          failure);
    }

    private WorkflowTaskHandler.Result handleTask(
        PollWorkflowTaskQueueResponse task, Scope workflowTypeMetricsScope) throws Exception {
      Stopwatch sw =
          workflowTypeMetricsScope.timer(MetricsType.WORKFLOW_TASK_EXECUTION_LATENCY).start();
      try {
        return handler.handleWorkflowTask(task);
      } catch (Throwable e) {
        // more detailed logging than we can do here is already done inside `handler`
        workflowTypeMetricsScope
            .counter(MetricsType.WORKFLOW_TASK_EXECUTION_FAILURE_COUNTER)
            .inc(1);
        workflowTypeMetricsScope.counter(MetricsType.WORKFLOW_TASK_NO_COMPLETION_COUNTER).inc(1);
        throw e;
      } finally {
        sw.stop();
      }
    }

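    // Result reporting: completion and failure responses are retried through GrpcRetryer with
    // per-result retry options layered over the defaults; direct query responses are
    // deliberately not retried.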
    private RespondWorkflowTaskCompletedResponse sendTaskCompleted(
        ByteString taskToken,
        RespondWorkflowTaskCompletedRequest.Builder taskCompleted,
        RpcRetryOptions retryOptions,
        Scope workflowTypeMetricsScope) {
      GrpcRetryer.GrpcRetryerOptions grpcRetryOptions =
          new GrpcRetryer.GrpcRetryerOptions(
              RpcRetryOptions.newBuilder().buildWithDefaultsFrom(retryOptions), null);

      taskCompleted
          .setIdentity(options.getIdentity())
          .setNamespace(namespace)
          .setTaskToken(taskToken);
      if (service.getServerCapabilities().get().getBuildIdBasedVersioning()) {
        taskCompleted.setWorkerVersionStamp(options.workerVersionStamp());
      } else {
        taskCompleted.setBinaryChecksum(options.getBuildId());
      }

      return grpcRetryer.retryWithResult(
          () ->
              service
                  .blockingStub()
                  .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, workflowTypeMetricsScope)
                  .respondWorkflowTaskCompleted(taskCompleted.build()),
          grpcRetryOptions);
    }

    private void sendTaskFailed(
        ByteString taskToken,
        RespondWorkflowTaskFailedRequest.Builder taskFailed,
        RpcRetryOptions retryOptions,
        Scope workflowTypeMetricsScope) {
      GrpcRetryer.GrpcRetryerOptions grpcRetryOptions =
          new GrpcRetryer.GrpcRetryerOptions(
              RpcRetryOptions.newBuilder().buildWithDefaultsFrom(retryOptions), null);

      taskFailed.setIdentity(options.getIdentity()).setNamespace(namespace).setTaskToken(taskToken);

      if (service.getServerCapabilities().get().getBuildIdBasedVersioning()) {
        taskFailed.setWorkerVersion(options.workerVersionStamp());
      }

      grpcRetryer.retry(
          () ->
              service
                  .blockingStub()
                  .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, workflowTypeMetricsScope)
                  .respondWorkflowTaskFailed(taskFailed.build()),
          grpcRetryOptions);
    }

    private void sendDirectQueryCompletedResponse(
        ByteString taskToken,
        RespondQueryTaskCompletedRequest.Builder queryCompleted,
        Scope workflowTypeMetricsScope) {
      queryCompleted.setTaskToken(taskToken).setNamespace(namespace);
      // Do not retry query response
      service
          .blockingStub()
          .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, workflowTypeMetricsScope)
          .respondQueryTaskCompleted(queryCompleted.build());
    }

    private void logExceptionDuringResultReporting(
        Exception e, PollWorkflowTaskQueueResponse currentTask, WorkflowTaskHandler.Result result) {
      if (log.isDebugEnabled()) {
        log.debug(
            "Failure during reporting of workflow progress to the server. If seen continuously the workflow might be stuck. WorkflowId={}, RunId={}, startedEventId={}, WFTResult={}",
            currentTask.getWorkflowExecution().getWorkflowId(),
            currentTask.getWorkflowExecution().getRunId(),
            currentTask.getStartedEventId(),
            result,
            e);
      } else {
        log.warn(
            "Failure while reporting workflow progress to the server. If seen continuously the workflow might be stuck. WorkflowId={}, RunId={}, startedEventId={}",
            currentTask.getWorkflowExecution().getWorkflowId(),
            currentTask.getWorkflowExecution().getRunId(),
            currentTask.getStartedEventId(),
            e);
      }
    }
  }
}
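
For context on how the graceful-shutdown path covered by this commit is reached in practice: applications do not use this internal class directly; WorkflowWorker.shutdown is invoked through the SDK's public WorkerFactory API. Below is a minimal sketch, assuming a Temporal server reachable at the local default endpoint; the task queue name and the Example* workflow types are illustrative placeholders, not part of this file.

import io.temporal.client.WorkflowClient;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.util.concurrent.TimeUnit;

public class GracefulShutdownSketch {

  // Placeholder workflow definition, only to make the sketch self-contained.
  @WorkflowInterface
  public interface ExampleWorkflow {
    @WorkflowMethod
    String run(String input);
  }

  public static class ExampleWorkflowImpl implements ExampleWorkflow {
    @Override
    public String run(String input) {
      return "done: " + input;
    }
  }

  public static void main(String[] args) {
    // Assumes a Temporal server on the local default endpoint (localhost:7233).
    WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
    WorkflowClient client = WorkflowClient.newInstance(service);

    WorkerFactory factory = WorkerFactory.newInstance(client);
    Worker worker = factory.newWorker("example-task-queue"); // placeholder queue name
    worker.registerWorkflowImplementationTypes(ExampleWorkflowImpl.class);
    factory.start();

    // ... process workflow tasks ...

    // Graceful shutdown: pollers stop, in-flight workflow tasks are allowed to finish,
    // and (per #2264) WorkflowWorker issues the shutdownWorker RPC so the server can
    // redistribute sticky workflow tasks to other workers.
    factory.shutdown();
    factory.awaitTermination(30, TimeUnit.SECONDS);
    service.shutdown();
  }
}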