temporalio / sdk-java / build #284

23 Jul 2024 10:09PM UTC · push · committed via GitHub (web-flow)
Reintroduce slot supplier & add many tests (#2143)

Coverage: 77.304% (-0.06% from 77.364%)

593 of 752 new or added lines in 37 files covered (78.86%)
22 existing lines in 10 files now uncovered
19554 of 25295 relevant lines covered (77.3%)
0.77 hits per line

Source file: /temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java
File coverage: 94.51%
/*
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
 *
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this material except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.temporal.internal.worker;

import static io.temporal.serviceclient.MetricsTag.METRICS_TAGS_CALL_OPTIONS_KEY;
import static io.temporal.serviceclient.MetricsTag.TASK_FAILURE_TYPE;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.protobuf.ByteString;
import com.uber.m3.tally.Scope;
import com.uber.m3.tally.Stopwatch;
import com.uber.m3.util.ImmutableMap;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.enums.v1.TaskQueueKind;
import io.temporal.api.enums.v1.WorkflowTaskFailedCause;
import io.temporal.api.workflowservice.v1.*;
import io.temporal.internal.logging.LoggerTag;
import io.temporal.internal.retryer.GrpcRetryer;
import io.temporal.serviceclient.MetricsTag;
import io.temporal.serviceclient.RpcRetryOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.MetricsType;
import io.temporal.worker.WorkerMetricsTag;
import io.temporal.worker.WorkflowTaskDispatchHandle;
import io.temporal.worker.tuning.SlotReleaseReason;
import io.temporal.worker.tuning.SlotSupplier;
import io.temporal.worker.tuning.WorkflowSlotInfo;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

final class WorkflowWorker implements SuspendableWorker {
  private static final Logger log = LoggerFactory.getLogger(WorkflowWorker.class);

  private final WorkflowRunLockManager runLocks;

  private final WorkflowServiceStubs service;
  private final String namespace;
  private final String taskQueue;
  private final SingleWorkerOptions options;
  private final WorkflowExecutorCache cache;
  private final WorkflowTaskHandler handler;
  private final String stickyTaskQueueName;
  private final PollerOptions pollerOptions;
  private final Scope workerMetricsScope;
  private final GrpcRetryer grpcRetryer;
  private final EagerActivityDispatcher eagerActivityDispatcher;
  private final TrackingSlotSupplier<WorkflowSlotInfo> slotSupplier;

  private PollTaskExecutor<WorkflowTask> pollTaskExecutor;

  // TODO this ideally should be volatile or final (and NoopWorker should go away)
  //  Currently the implementation looks safe without volatile, but it's brittle.
  @Nonnull private SuspendableWorker poller = new NoopWorker();

  private StickyQueueBalancer stickyQueueBalancer;

  public WorkflowWorker(
      @Nonnull WorkflowServiceStubs service,
      @Nonnull String namespace,
      @Nonnull String taskQueue,
      @Nullable String stickyTaskQueueName,
      @Nonnull SingleWorkerOptions options,
      @Nonnull WorkflowRunLockManager runLocks,
      @Nonnull WorkflowExecutorCache cache,
      @Nonnull WorkflowTaskHandler handler,
      @Nonnull EagerActivityDispatcher eagerActivityDispatcher,
      @Nonnull SlotSupplier<WorkflowSlotInfo> slotSupplier) {
    this.service = Objects.requireNonNull(service);
    this.namespace = Objects.requireNonNull(namespace);
    this.taskQueue = Objects.requireNonNull(taskQueue);
    this.options = Objects.requireNonNull(options);
    this.stickyTaskQueueName = stickyTaskQueueName;
    this.pollerOptions = getPollerOptions(options);
    this.workerMetricsScope =
        MetricsTag.tagged(options.getMetricsScope(), WorkerMetricsTag.WorkerType.WORKFLOW_WORKER);
    this.runLocks = Objects.requireNonNull(runLocks);
    this.cache = Objects.requireNonNull(cache);
    this.handler = Objects.requireNonNull(handler);
    this.grpcRetryer = new GrpcRetryer(service.getServerCapabilities());
    this.eagerActivityDispatcher = eagerActivityDispatcher;
    this.slotSupplier = new TrackingSlotSupplier<>(slotSupplier, this.workerMetricsScope);
  }

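  // The constructor wraps the caller-provided SlotSupplier in a TrackingSlotSupplier,
  // which layers metrics over reserve/release. A minimal sketch of the underlying
  // reservation idea, assuming only java.util.concurrent; the names below are
  // illustrative, not the SDK's SlotSupplier contract.
  static final class FixedCapacitySlotsSketch {
    private final java.util.concurrent.Semaphore permits;

    FixedCapacitySlotsSketch(int capacity) {
      this.permits = new java.util.concurrent.Semaphore(capacity);
    }

    // Non-blocking reservation, mirroring tryReserveSlot() returning an Optional permit.
    // The returned Runnable gives the slot back, as the SlotReleaseReason paths do above.
    Optional<Runnable> tryReserve() {
      return permits.tryAcquire() ? Optional.of(permits::release) : Optional.empty();
    }
  }
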
  @Override
  public boolean start() {
    if (handler.isAnyTypeSupported()) {
      pollTaskExecutor =
          new PollTaskExecutor<>(
              namespace,
              taskQueue,
              options.getIdentity(),
              new TaskHandlerImpl(handler),
              pollerOptions,
              this.slotSupplier.maximumSlots().orElse(Integer.MAX_VALUE),
              true);
      stickyQueueBalancer =
          new StickyQueueBalancer(
              options.getPollerOptions().getPollThreadCount(), stickyTaskQueueName != null);

      poller =
          new Poller<>(
              options.getIdentity(),
              new WorkflowPollTask(
                  service,
                  namespace,
                  taskQueue,
                  stickyTaskQueueName,
                  options.getIdentity(),
                  options.getBuildId(),
                  options.isUsingBuildIdForVersioning(),
                  slotSupplier,
                  stickyQueueBalancer,
                  workerMetricsScope,
                  service.getServerCapabilities()),
              pollTaskExecutor,
              pollerOptions,
              workerMetricsScope);
      poller.start();

      workerMetricsScope.counter(MetricsType.WORKER_START_COUNTER).inc(1);

      return true;
    } else {
      return false;
    }
  }

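  // start() wires a Poller (long-polling the server) into a PollTaskExecutor (a bounded
  // pool running TaskHandlerImpl). A minimal sketch of that producer-consumer shape,
  // assuming a plain BlockingQueue as the task source; names are illustrative only.
  private static void pollLoopSketch(
      java.util.concurrent.BlockingQueue<Runnable> tasks,
      java.util.concurrent.ExecutorService executor)
      throws InterruptedException {
    while (!Thread.currentThread().isInterrupted()) {
      Runnable task = tasks.take(); // blocks, like a long poll against the task queue
      executor.execute(task); // hand off to a worker thread, then resume polling
    }
  }
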
  @Override
  public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
    String supplierName = this + "#executorSlots";

    boolean stickyQueueBalancerDrainEnabled =
        !interruptTasks
            && !options.getDrainStickyTaskQueueTimeout().isZero()
            && stickyTaskQueueName != null
            && stickyQueueBalancer != null;

    return CompletableFuture.completedFuture(null)
        .thenCompose(
            ignore ->
                stickyQueueBalancerDrainEnabled
                    ? shutdownManager.waitForStickyQueueBalancer(
                        stickyQueueBalancer, options.getDrainStickyTaskQueueTimeout())
                    : CompletableFuture.completedFuture(null))
        .thenCompose(ignore -> poller.shutdown(shutdownManager, interruptTasks))
        .thenCompose(
            ignore ->
                !interruptTasks
                    ? shutdownManager.waitForSupplierPermitsReleasedUnlimited(
                        slotSupplier, supplierName)
                    : CompletableFuture.completedFuture(null))
        .thenCompose(
            ignore ->
                pollTaskExecutor != null
                    ? pollTaskExecutor.shutdown(shutdownManager, interruptTasks)
                    : CompletableFuture.completedFuture(null))
        .exceptionally(
            e -> {
              log.error("Unexpected exception during shutdown", e);
              return null;
            });
  }

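  // shutdown() drains in a fixed order: sticky-queue balancer, poller, slot permits,
  // then the task executor. A standalone sketch of the same staged pattern, assuming
  // only java.util.concurrent; the stage wiring is illustrative only.
  private static CompletableFuture<Void> stagedShutdownSketch(
      java.util.List<java.util.function.Supplier<CompletableFuture<Void>>> stages) {
    CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
    for (java.util.function.Supplier<CompletableFuture<Void>> stage : stages) {
      // Each stage starts only after the previous one has finished draining.
      chain = chain.thenCompose(ignore -> stage.get());
    }
    // Shutdown must not throw; swallow-and-continue mirrors the method above.
    return chain.exceptionally(e -> null);
  }
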
  @Override
  public void awaitTermination(long timeout, TimeUnit unit) {
    long timeoutMillis = ShutdownManager.awaitTermination(poller, unit.toMillis(timeout));
    // Relies on the fact that pollTaskExecutor is the last one to be shut down;
    // no need to wait separately for intermediate steps.
    ShutdownManager.awaitTermination(pollTaskExecutor, timeoutMillis);
  }

  @Override
  public void suspendPolling() {
    poller.suspendPolling();
  }

  @Override
  public void resumePolling() {
    poller.resumePolling();
  }

  @Override
  public boolean isSuspended() {
    return poller.isSuspended();
  }

  @Override
  public boolean isShutdown() {
    return poller.isShutdown();
  }

  @Override
  public boolean isTerminated() {
    return poller.isTerminated() && (pollTaskExecutor == null || pollTaskExecutor.isTerminated());
  }

  @Override
  public WorkerLifecycleState getLifecycleState() {
    return poller.getLifecycleState();
  }

  private PollerOptions getPollerOptions(SingleWorkerOptions options) {
    PollerOptions pollerOptions = options.getPollerOptions();
    if (pollerOptions.getPollThreadNamePrefix() == null) {
      pollerOptions =
          PollerOptions.newBuilder(pollerOptions)
              .setPollThreadNamePrefix(
                  WorkerThreadsNameHelper.getWorkflowPollerThreadPrefix(namespace, taskQueue))
              .build();
    }
    return pollerOptions;
  }

  @Nullable
  public WorkflowTaskDispatchHandle reserveWorkflowExecutor() {
    // to avoid pollTaskExecutor becoming null inside the lambda, we cache it here
    final PollTaskExecutor<WorkflowTask> executor = pollTaskExecutor;
    if (executor == null || isSuspended()) {
      return null;
    }
    return slotSupplier
        .tryReserveSlot(
            new SlotReservationData(taskQueue, options.getIdentity(), options.getBuildId()))
        .map(
            slotPermit ->
                new WorkflowTaskDispatchHandle(
                    workflowTask -> {
                      String queueName =
                          workflowTask.getResponse().getWorkflowExecutionTaskQueue().getName();
                      TaskQueueKind queueKind =
                          workflowTask.getResponse().getWorkflowExecutionTaskQueue().getKind();
                      Preconditions.checkArgument(
                          this.taskQueue.equals(queueName)
                              || TaskQueueKind.TASK_QUEUE_KIND_STICKY.equals(queueKind)
                                  && this.stickyTaskQueueName.equals(queueName),
                          "Got a WFT for a wrong queue %s, expected %s or %s",
                          queueName,
                          this.taskQueue,
                          this.stickyTaskQueueName);
                      try {
                        pollTaskExecutor.process(workflowTask);
                        return true;
                      } catch (RejectedExecutionException e) {
                        return false;
                      }
                    },
                    slotSupplier,
                    slotPermit))
        .orElse(null);
  }

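  // reserveWorkflowExecutor() implements reserve-then-dispatch-or-release: a slot permit
  // is taken up front, and the returned handle either consumes it by dispatching a task
  // or must give it back. A hedged call-site sketch, assuming a close()-style release on
  // the handle; this is illustrative, not a documented SDK call site:
  //
  //   WorkflowTaskDispatchHandle handle = worker.reserveWorkflowExecutor();
  //   if (handle == null) {
  //     return; // no executor yet, worker suspended, or no free slot: fall back to polling
  //   }
  //   try {
  //     handle.dispatch(workflowTask); // false would mean the executor rejected the task
  //   } finally {
  //     handle.close(); // releases the permit if it was not consumed by a dispatch
  //   }
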
  @Override
  public String toString() {
    return String.format(
        "WorkflowWorker{identity=%s, namespace=%s, taskQueue=%s}",
        options.getIdentity(), namespace, taskQueue);
  }

  private class TaskHandlerImpl implements PollTaskExecutor.TaskHandler<WorkflowTask> {

    final WorkflowTaskHandler handler;

    private TaskHandlerImpl(WorkflowTaskHandler handler) {
      this.handler = handler;
    }

    @Override
    public void handle(WorkflowTask task) throws Exception {
      PollWorkflowTaskQueueResponse workflowTaskResponse = task.getResponse();
      WorkflowExecution workflowExecution = workflowTaskResponse.getWorkflowExecution();
      String runId = workflowExecution.getRunId();
      String workflowType = workflowTaskResponse.getWorkflowType().getName();

      Scope workflowTypeScope =
          workerMetricsScope.tagged(ImmutableMap.of(MetricsTag.WORKFLOW_TYPE, workflowType));

      MDC.put(LoggerTag.WORKFLOW_ID, workflowExecution.getWorkflowId());
      MDC.put(LoggerTag.WORKFLOW_TYPE, workflowType);
      MDC.put(LoggerTag.RUN_ID, runId);

      boolean locked = false;

      Stopwatch swTotal =
          workflowTypeScope.timer(MetricsType.WORKFLOW_TASK_EXECUTION_TOTAL_LATENCY).start();
      SlotReleaseReason releaseReason = SlotReleaseReason.taskComplete();
      try {
        if (!Strings.isNullOrEmpty(stickyTaskQueueName)) {
          // Serialize workflow task processing for a particular workflow run.
          // This is used to make sure that query tasks and real workflow tasks
          // are serialized when sticky is on.
          //
          // Acquire the lock with a timeout to avoid having lots of workflow tasks for the same
          // run id waiting for a lock and consuming threads in case the lock is unavailable.
          //
          // Throws InterruptedException, which is propagated. That's the correct way to handle
          // it here.
          //
          // TODO 1: 5 seconds is chosen as a half of normal workflow task timeout.
          //   This value should be dynamically configured.
          // TODO 2: Does the "consider increasing workflow task timeout" advice in this exception
          //   make any sense?
          //   This MAYBE makes sense only if a previous workflow task timed out, it's still in
          //   progress on the worker and the next workflow task got picked up by the same exact
          //   worker from the general non-sticky task queue.
          //   Even in this case, this advice looks misleading, something else is going on
          //   (like an extreme network latency).
          locked = runLocks.tryLock(runId, 5, TimeUnit.SECONDS);

          if (!locked) {
            throw new UnableToAcquireLockException(
                "Workflow lock for the run id hasn't been released by one of previous execution attempts, "
                    + "consider increasing workflow task timeout.");
          }
        }

        Optional<PollWorkflowTaskQueueResponse> nextWFTResponse = Optional.of(workflowTaskResponse);
        do {
          PollWorkflowTaskQueueResponse currentTask = nextWFTResponse.get();
          nextWFTResponse = Optional.empty();
          WorkflowTaskHandler.Result result = handleTask(currentTask, workflowTypeScope);
          try {
            RespondWorkflowTaskCompletedRequest taskCompleted = result.getTaskCompleted();
            RespondWorkflowTaskFailedRequest taskFailed = result.getTaskFailed();
            RespondQueryTaskCompletedRequest queryCompleted = result.getQueryCompleted();

            if (taskCompleted != null) {
              RespondWorkflowTaskCompletedRequest.Builder requestBuilder =
                  taskCompleted.toBuilder();
              try (EagerActivitySlotsReservation activitySlotsReservation =
                  new EagerActivitySlotsReservation(eagerActivityDispatcher)) {
                activitySlotsReservation.applyToRequest(requestBuilder);
                RespondWorkflowTaskCompletedResponse response =
                    sendTaskCompleted(
                        currentTask.getTaskToken(),
                        requestBuilder,
                        result.getRequestRetryOptions(),
                        workflowTypeScope);
                // If we were processing a speculative WFT, the server may instruct us that the
                // task was dropped by resetting our event ID.
                long resetEventId = response.getResetHistoryEventId();
                if (resetEventId != 0) {
                  result.getResetEventIdHandle().apply(resetEventId);
                }
                nextWFTResponse =
                    response.hasWorkflowTask()
                        ? Optional.of(response.getWorkflowTask())
                        : Optional.empty();
                // TODO we don't have to do this under the runId lock
                activitySlotsReservation.handleResponse(response);
              }
            } else if (taskFailed != null) {
              sendTaskFailed(
                  currentTask.getTaskToken(),
                  taskFailed.toBuilder(),
                  result.getRequestRetryOptions(),
                  workflowTypeScope);
            } else if (queryCompleted != null) {
              sendDirectQueryCompletedResponse(
                  currentTask.getTaskToken(), queryCompleted.toBuilder(), workflowTypeScope);
            }
          } catch (Exception e) {
            logExceptionDuringResultReporting(e, currentTask, result);
            releaseReason = SlotReleaseReason.error(e);
            // if we failed to report the workflow task completion back to the server,
            // our cached version of the workflow may be more advanced than the server is aware of.
            // We should discard this execution and perform a clean replay based on what the server
            // knows next time.
            cache.invalidate(
                workflowExecution, workflowTypeScope, "Failed result reporting to the server", e);
            throw e;
          }

          if (result.getTaskFailed() != null) {
            Scope workflowTaskFailureScope = workflowTypeScope;
            if (result
                .getTaskFailed()
                .getCause()
                .equals(
                    WorkflowTaskFailedCause.WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR)) {
              workflowTaskFailureScope =
                  workflowTaskFailureScope.tagged(
                      ImmutableMap.of(TASK_FAILURE_TYPE, "NonDeterminismError"));
            } else {
              workflowTaskFailureScope =
                  workflowTaskFailureScope.tagged(
                      ImmutableMap.of(TASK_FAILURE_TYPE, "WorkflowError"));
            }
            // we don't trigger the counter in case of the legacy query
            // (which never has taskFailed set)
            workflowTaskFailureScope
                .counter(MetricsType.WORKFLOW_TASK_EXECUTION_FAILURE_COUNTER)
                .inc(1);
          }
          if (nextWFTResponse.isPresent()) {
            workflowTypeScope.counter(MetricsType.WORKFLOW_TASK_HEARTBEAT_COUNTER).inc(1);
          }
        } while (nextWFTResponse.isPresent());
      } finally {
        swTotal.stop();
        task.getCompletionCallback().apply(releaseReason);
        MDC.remove(LoggerTag.WORKFLOW_ID);
        MDC.remove(LoggerTag.WORKFLOW_TYPE);
        MDC.remove(LoggerTag.RUN_ID);

        if (locked) {
          runLocks.unlock(runId);
        }
      }
    }

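    // handle() serializes tasks per run id via runLocks.tryLock(runId, timeout). A minimal
    // sketch of such a per-key lock manager, assuming only java.util.concurrent; this is
    // illustrative, not the actual WorkflowRunLockManager implementation.
    final class PerRunLocksSketch {
      private final java.util.concurrent.ConcurrentHashMap<
              String, java.util.concurrent.locks.ReentrantLock>
          locks = new java.util.concurrent.ConcurrentHashMap<>();

      boolean tryLock(String runId, long timeout, TimeUnit unit) throws InterruptedException {
        // One lock per run id; computeIfAbsent makes lock creation race-free.
        return locks
            .computeIfAbsent(runId, id -> new java.util.concurrent.locks.ReentrantLock())
            .tryLock(timeout, unit);
      }

      void unlock(String runId) {
        locks.get(runId).unlock();
        // A production version would also evict idle entries to bound the map's growth.
      }
    }
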
    @Override
    public Throwable wrapFailure(WorkflowTask task, Throwable failure) {
      WorkflowExecution execution = task.getResponse().getWorkflowExecution();
      return new RuntimeException(
          "Failure processing workflow task. WorkflowId="
              + execution.getWorkflowId()
              + ", RunId="
              + execution.getRunId()
              + ", Attempt="
              + task.getResponse().getAttempt(),
          failure);
    }

    private WorkflowTaskHandler.Result handleTask(
        PollWorkflowTaskQueueResponse task, Scope workflowTypeMetricsScope) throws Exception {
      Stopwatch sw =
          workflowTypeMetricsScope.timer(MetricsType.WORKFLOW_TASK_EXECUTION_LATENCY).start();
      try {
        return handler.handleWorkflowTask(task);
      } catch (Throwable e) {
        // any more detailed logging we could do here is already done inside `handler`
        workflowTypeMetricsScope
            .counter(MetricsType.WORKFLOW_TASK_EXECUTION_FAILURE_COUNTER)
            .inc(1);
        workflowTypeMetricsScope.counter(MetricsType.WORKFLOW_TASK_NO_COMPLETION_COUNTER).inc(1);
        throw e;
      } finally {
        sw.stop();
      }
    }

    private RespondWorkflowTaskCompletedResponse sendTaskCompleted(
        ByteString taskToken,
        RespondWorkflowTaskCompletedRequest.Builder taskCompleted,
        RpcRetryOptions retryOptions,
        Scope workflowTypeMetricsScope) {
      GrpcRetryer.GrpcRetryerOptions grpcRetryOptions =
          new GrpcRetryer.GrpcRetryerOptions(
              RpcRetryOptions.newBuilder().buildWithDefaultsFrom(retryOptions), null);

      taskCompleted
          .setIdentity(options.getIdentity())
          .setNamespace(namespace)
          .setTaskToken(taskToken);
      if (service.getServerCapabilities().get().getBuildIdBasedVersioning()) {
        taskCompleted.setWorkerVersionStamp(options.workerVersionStamp());
      } else {
        taskCompleted.setBinaryChecksum(options.getBuildId());
      }

      return grpcRetryer.retryWithResult(
          () ->
              service
                  .blockingStub()
                  .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, workflowTypeMetricsScope)
                  .respondWorkflowTaskCompleted(taskCompleted.build()),
          grpcRetryOptions);
    }

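    // sendTaskCompleted delegates retries to grpcRetryer.retryWithResult. A minimal sketch
    // of that retry-with-backoff shape, assuming fixed attempts and a doubling delay; the
    // parameters are illustrative, not the SDK's RpcRetryOptions defaults.
    private <T> T retryWithResultSketch(java.util.function.Supplier<T> call, int maxAttempts)
        throws InterruptedException {
      long backoffMillis = 100;
      RuntimeException lastFailure = null;
      for (int attempt = 1; attempt <= maxAttempts; attempt++) {
        try {
          return call.get();
        } catch (RuntimeException e) { // gRPC failures surface as unchecked exceptions
          lastFailure = e;
          Thread.sleep(backoffMillis);
          backoffMillis *= 2; // exponential backoff between attempts
        }
      }
      throw lastFailure;
    }
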
    private void sendTaskFailed(
        ByteString taskToken,
        RespondWorkflowTaskFailedRequest.Builder taskFailed,
        RpcRetryOptions retryOptions,
        Scope workflowTypeMetricsScope) {
      GrpcRetryer.GrpcRetryerOptions grpcRetryOptions =
          new GrpcRetryer.GrpcRetryerOptions(
              RpcRetryOptions.newBuilder().buildWithDefaultsFrom(retryOptions), null);

      taskFailed.setIdentity(options.getIdentity()).setNamespace(namespace).setTaskToken(taskToken);

      if (service.getServerCapabilities().get().getBuildIdBasedVersioning()) {
        taskFailed.setWorkerVersion(options.workerVersionStamp());
      }

      grpcRetryer.retry(
          () ->
              service
                  .blockingStub()
                  .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, workflowTypeMetricsScope)
                  .respondWorkflowTaskFailed(taskFailed.build()),
          grpcRetryOptions);
    }

    private void sendDirectQueryCompletedResponse(
        ByteString taskToken,
        RespondQueryTaskCompletedRequest.Builder queryCompleted,
        Scope workflowTypeMetricsScope) {
      queryCompleted.setTaskToken(taskToken).setNamespace(namespace);
      // Do not retry query response
      service
          .blockingStub()
          .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, workflowTypeMetricsScope)
          .respondQueryTaskCompleted(queryCompleted.build());
    }

    private void logExceptionDuringResultReporting(
        Exception e, PollWorkflowTaskQueueResponse currentTask, WorkflowTaskHandler.Result result) {
      if (log.isDebugEnabled()) {
        log.debug(
            "Failure during reporting of workflow progress to the server. If seen continuously the workflow might be stuck. WorkflowId={}, RunId={}, startedEventId={}, WFTResult={}",
            currentTask.getWorkflowExecution().getWorkflowId(),
            currentTask.getWorkflowExecution().getRunId(),
            currentTask.getStartedEventId(),
            result,
            e);
      } else {
        log.warn(
            "Failure while reporting workflow progress to the server. If seen continuously the workflow might be stuck. WorkflowId={}, RunId={}, startedEventId={}",
            currentTask.getWorkflowExecution().getWorkflowId(),
            currentTask.getWorkflowExecution().getRunId(),
            currentTask.getStartedEventId(),
            e);
      }
    }
  }
}