• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #244

10 Apr 2024 08:19PM UTC coverage: 77.465% (-0.08%) from 77.549%
#244

push

github

web-flow
Slot supplier interface & fixed-size implementation (#2014)

https://github.com/temporalio/proposals/blob/master/all-sdk/autotuning.md

286 of 388 new or added lines in 25 files covered. (73.71%)

3 existing lines in 3 files now uncovered.

19116 of 24677 relevant lines covered (77.46%)

0.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.07
/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.worker;
22

23
import static io.temporal.serviceclient.MetricsTag.METRICS_TAGS_CALL_OPTIONS_KEY;
24
import static io.temporal.serviceclient.MetricsTag.TASK_FAILURE_TYPE;
25

26
import com.google.common.base.Preconditions;
27
import com.google.common.base.Strings;
28
import com.google.protobuf.ByteString;
29
import com.uber.m3.tally.Scope;
30
import com.uber.m3.tally.Stopwatch;
31
import com.uber.m3.util.ImmutableMap;
32
import io.temporal.api.common.v1.WorkflowExecution;
33
import io.temporal.api.enums.v1.TaskQueueKind;
34
import io.temporal.api.enums.v1.WorkflowTaskFailedCause;
35
import io.temporal.api.workflowservice.v1.*;
36
import io.temporal.internal.logging.LoggerTag;
37
import io.temporal.internal.retryer.GrpcRetryer;
38
import io.temporal.serviceclient.MetricsTag;
39
import io.temporal.serviceclient.RpcRetryOptions;
40
import io.temporal.serviceclient.WorkflowServiceStubs;
41
import io.temporal.worker.MetricsType;
42
import io.temporal.worker.WorkerMetricsTag;
43
import io.temporal.worker.WorkflowTaskDispatchHandle;
44
import io.temporal.worker.tuning.WorkflowSlotInfo;
45
import java.util.Objects;
46
import java.util.Optional;
47
import java.util.concurrent.CompletableFuture;
48
import java.util.concurrent.RejectedExecutionException;
49
import java.util.concurrent.TimeUnit;
50
import javax.annotation.Nonnull;
51
import javax.annotation.Nullable;
52
import org.slf4j.Logger;
53
import org.slf4j.LoggerFactory;
54
import org.slf4j.MDC;
55

56
final class WorkflowWorker implements SuspendableWorker {
57
  private static final Logger log = LoggerFactory.getLogger(WorkflowWorker.class);
1✔
58

59
  private final WorkflowRunLockManager runLocks;
60

61
  private final WorkflowServiceStubs service;
62
  private final String namespace;
63
  private final String taskQueue;
64
  private final SingleWorkerOptions options;
65
  private final WorkflowExecutorCache cache;
66
  private final WorkflowTaskHandler handler;
67
  private final String stickyTaskQueueName;
68
  private final PollerOptions pollerOptions;
69
  private final Scope workerMetricsScope;
70
  private final GrpcRetryer grpcRetryer;
71
  private final EagerActivityDispatcher eagerActivityDispatcher;
72
  private final TrackingSlotSupplier<WorkflowSlotInfo> slotSupplier;
73

74
  private PollTaskExecutor<WorkflowTask> pollTaskExecutor;
75

76
  // TODO this ideally should be volatile or final (and NoopWorker should go away)
77
  //  Currently the implementation looks safe without volatile, but it's brittle.
78
  @Nonnull private SuspendableWorker poller = new NoopWorker();
1✔
79

80
  private StickyQueueBalancer stickyQueueBalancer;
81

82
  /**
   * Creates a workflow worker for one namespace / task queue pair. Nothing is polled until {@link
   * #start()} is called.
   *
   * @param service service stubs used for the blocking gRPC calls and server capabilities
   * @param namespace namespace the worker operates in
   * @param taskQueue main task queue name
   * @param stickyTaskQueueName sticky task queue name, or {@code null} when sticky is not used
   * @param options worker options (identity, build id, poller options, metrics scope, ...)
   * @param runLocks lock manager used to serialize task processing per run id
   * @param cache workflow executor cache, invalidated when result reporting fails
   * @param handler handler that processes each workflow task
   * @param eagerActivityDispatcher dispatcher used when reserving eager activity slots
   * @param slotSupplier slot supplier used for workflow task slot reservations
   * @throws NullPointerException if {@code service}, {@code namespace}, {@code taskQueue}, {@code
   *     options}, {@code slotSupplier}, {@code runLocks}, {@code cache} or {@code handler} is null
   */
  public WorkflowWorker(
      @Nonnull WorkflowServiceStubs service,
      @Nonnull String namespace,
      @Nonnull String taskQueue,
      @Nullable String stickyTaskQueueName,
      @Nonnull SingleWorkerOptions options,
      @Nonnull WorkflowRunLockManager runLocks,
      @Nonnull WorkflowExecutorCache cache,
      @Nonnull WorkflowTaskHandler handler,
      @Nonnull EagerActivityDispatcher eagerActivityDispatcher,
      @Nonnull TrackingSlotSupplier<WorkflowSlotInfo> slotSupplier) {
    this.service = Objects.requireNonNull(service);
    this.namespace = Objects.requireNonNull(namespace);
    this.taskQueue = Objects.requireNonNull(taskQueue);
    this.options = Objects.requireNonNull(options);
    this.stickyTaskQueueName = stickyTaskQueueName;
    // getPollerOptions reads this.namespace and this.taskQueue, so it must run after they are set.
    this.pollerOptions = getPollerOptions(options);
    this.workerMetricsScope =
        MetricsTag.tagged(options.getMetricsScope(), WorkerMetricsTag.WorkerType.WORKFLOW_WORKER);
    // Validate explicitly before the first dereference, like every other non-null collaborator.
    this.slotSupplier = Objects.requireNonNull(slotSupplier);
    this.slotSupplier.setMetricsScope(workerMetricsScope);
    this.runLocks = Objects.requireNonNull(runLocks);
    this.cache = Objects.requireNonNull(cache);
    this.handler = Objects.requireNonNull(handler);
    this.grpcRetryer = new GrpcRetryer(service.getServerCapabilities());
    this.eagerActivityDispatcher = eagerActivityDispatcher;
  }
1✔
109

110
  @Override
111
  public boolean start() {
112
    if (handler.isAnyTypeSupported()) {
1✔
113
      pollTaskExecutor =
1✔
114
          new PollTaskExecutor<>(
115
              namespace,
116
              taskQueue,
117
              options.getIdentity(),
1✔
118
              new TaskHandlerImpl(handler),
119
              pollerOptions,
120
              this.slotSupplier.maximumSlots(),
1✔
121
              true);
122
      stickyQueueBalancer =
1✔
123
          new StickyQueueBalancer(
124
              options.getPollerOptions().getPollThreadCount(), stickyTaskQueueName != null);
1✔
125

126
      poller =
1✔
127
          new Poller<>(
128
              options.getIdentity(),
1✔
129
              new WorkflowPollTask(
130
                  service,
131
                  namespace,
132
                  taskQueue,
133
                  stickyTaskQueueName,
134
                  options.getIdentity(),
1✔
135
                  options.getBuildId(),
1✔
136
                  options.isUsingBuildIdForVersioning(),
1✔
137
                  slotSupplier,
138
                  stickyQueueBalancer,
139
                  workerMetricsScope,
140
                  service.getServerCapabilities()),
1✔
141
              pollTaskExecutor,
142
              pollerOptions,
143
              workerMetricsScope);
144
      poller.start();
1✔
145

146
      workerMetricsScope.counter(MetricsType.WORKER_START_COUNTER).inc(1);
1✔
147

148
      return true;
1✔
149
    } else {
150
      return false;
1✔
151
    }
152
  }
153

154
  @Override
155
  public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
156
    String supplierName = this + "#executorSlots";
1✔
157

158
    boolean stickyQueueBalancerDrainEnabled =
1✔
159
        !interruptTasks
160
            && !options.getDrainStickyTaskQueueTimeout().isZero()
1✔
161
            && stickyTaskQueueName != null
162
            && stickyQueueBalancer != null;
163

164
    return CompletableFuture.completedFuture(null)
1✔
165
        .thenCompose(
1✔
166
            ignore ->
167
                stickyQueueBalancerDrainEnabled
1✔
168
                    ? shutdownManager.waitForStickyQueueBalancer(
1✔
169
                        stickyQueueBalancer, options.getDrainStickyTaskQueueTimeout())
1✔
170
                    : CompletableFuture.completedFuture(null))
1✔
171
        .thenCompose(ignore -> poller.shutdown(shutdownManager, interruptTasks))
1✔
172
        .thenCompose(
1✔
173
            ignore ->
174
                !interruptTasks
1✔
175
                    ? shutdownManager.waitForSupplierPermitsReleasedUnlimited(
1✔
176
                        slotSupplier, supplierName)
177
                    : CompletableFuture.completedFuture(null))
1✔
178
        .thenCompose(
1✔
179
            ignore ->
180
                pollTaskExecutor != null
1✔
181
                    ? pollTaskExecutor.shutdown(shutdownManager, interruptTasks)
1✔
182
                    : CompletableFuture.completedFuture(null))
1✔
183
        .exceptionally(
1✔
184
            e -> {
185
              log.error("Unexpected exception during shutdown", e);
×
186
              return null;
×
187
            });
188
  }
189

190
  @Override
191
  public void awaitTermination(long timeout, TimeUnit unit) {
192
    long timeoutMillis = ShutdownManager.awaitTermination(poller, unit.toMillis(timeout));
1✔
193
    // relies on the fact that the pollTaskExecutor is the last one to be shutdown, no need to
194
    // wait separately for intermediate steps
195
    ShutdownManager.awaitTermination(pollTaskExecutor, timeoutMillis);
1✔
196
  }
1✔
197

198
  @Override
199
  public void suspendPolling() {
200
    poller.suspendPolling();
1✔
201
  }
1✔
202

203
  @Override
204
  public void resumePolling() {
205
    poller.resumePolling();
1✔
206
  }
1✔
207

208
  @Override
209
  public boolean isSuspended() {
210
    return poller.isSuspended();
1✔
211
  }
212

213
  @Override
214
  public boolean isShutdown() {
215
    return poller.isShutdown();
×
216
  }
217

218
  @Override
219
  public boolean isTerminated() {
220
    return poller.isTerminated() && (pollTaskExecutor == null || pollTaskExecutor.isTerminated());
1✔
221
  }
222

223
  @Override
224
  public WorkerLifecycleState getLifecycleState() {
225
    return poller.getLifecycleState();
×
226
  }
227

228
  private PollerOptions getPollerOptions(SingleWorkerOptions options) {
229
    PollerOptions pollerOptions = options.getPollerOptions();
1✔
230
    if (pollerOptions.getPollThreadNamePrefix() == null) {
1✔
231
      pollerOptions =
1✔
232
          PollerOptions.newBuilder(pollerOptions)
1✔
233
              .setPollThreadNamePrefix(
1✔
234
                  WorkerThreadsNameHelper.getWorkflowPollerThreadPrefix(namespace, taskQueue))
1✔
235
              .build();
1✔
236
    }
237
    return pollerOptions;
1✔
238
  }
239

240
  @Nullable
241
  public WorkflowTaskDispatchHandle reserveWorkflowExecutor() {
242
    // to avoid pollTaskExecutor becoming null inside the lambda, we cache it here
243
    final PollTaskExecutor<WorkflowTask> executor = pollTaskExecutor;
1✔
244
    if (executor == null || isSuspended()) {
1✔
245
      return null;
1✔
246
    }
247
    return slotSupplier
1✔
248
        .tryReserveSlot(
1✔
249
            new SlotReservationData(taskQueue, options.getIdentity(), options.getBuildId()))
1✔
250
        .map(
1✔
251
            slotPermit ->
252
                new WorkflowTaskDispatchHandle(
1✔
253
                    workflowTask -> {
254
                      String queueName =
1✔
255
                          workflowTask.getResponse().getWorkflowExecutionTaskQueue().getName();
1✔
256
                      TaskQueueKind queueKind =
1✔
257
                          workflowTask.getResponse().getWorkflowExecutionTaskQueue().getKind();
1✔
258
                      Preconditions.checkArgument(
1✔
259
                          this.taskQueue.equals(queueName)
1✔
NEW
260
                              || TaskQueueKind.TASK_QUEUE_KIND_STICKY.equals(queueKind)
×
261
                                  && this.stickyTaskQueueName.equals(queueName),
1✔
262
                          "Got a WFT for a wrong queue %s, expected %s or %s",
263
                          queueName,
264
                          this.taskQueue,
265
                          this.stickyTaskQueueName);
266
                      try {
267
                        pollTaskExecutor.process(workflowTask);
1✔
268
                        return true;
1✔
NEW
269
                      } catch (RejectedExecutionException e) {
×
NEW
270
                        return false;
×
271
                      }
272
                    },
273
                    slotSupplier,
274
                    slotPermit))
275
        .orElse(null);
1✔
276
  }
277

278
  @Override
279
  public String toString() {
280
    return String.format(
1✔
281
        "WorkflowWorker{identity=%s, namespace=%s, taskQueue=%s}",
282
        options.getIdentity(), namespace, taskQueue);
1✔
283
  }
284

285
  private class TaskHandlerImpl implements PollTaskExecutor.TaskHandler<WorkflowTask> {
286

287
    final WorkflowTaskHandler handler;
288

289
    /**
     * @param handler workflow task handler that {@code handleTask} delegates to
     */
    private TaskHandlerImpl(WorkflowTaskHandler handler) {
      this.handler = handler;
    }
1✔
292

293
    @Override
294
    public void handle(WorkflowTask task) throws Exception {
295
      PollWorkflowTaskQueueResponse workflowTaskResponse = task.getResponse();
1✔
296
      WorkflowExecution workflowExecution = workflowTaskResponse.getWorkflowExecution();
1✔
297
      String runId = workflowExecution.getRunId();
1✔
298
      String workflowType = workflowTaskResponse.getWorkflowType().getName();
1✔
299

300
      Scope workflowTypeScope =
1✔
301
          workerMetricsScope.tagged(ImmutableMap.of(MetricsTag.WORKFLOW_TYPE, workflowType));
1✔
302

303
      MDC.put(LoggerTag.WORKFLOW_ID, workflowExecution.getWorkflowId());
1✔
304
      MDC.put(LoggerTag.WORKFLOW_TYPE, workflowType);
1✔
305
      MDC.put(LoggerTag.RUN_ID, runId);
1✔
306

307
      boolean locked = false;
1✔
308

309
      Stopwatch swTotal =
1✔
310
          workflowTypeScope.timer(MetricsType.WORKFLOW_TASK_EXECUTION_TOTAL_LATENCY).start();
1✔
311
      try {
312
        if (!Strings.isNullOrEmpty(stickyTaskQueueName)) {
1✔
313
          // Serialize workflow task processing for a particular workflow run.
314
          // This is used to make sure that query tasks and real workflow tasks
315
          // are serialized when sticky is on.
316
          //
317
          // Acquiring a lock with a timeout to avoid having lots of workflow tasks for the same run
318
          // id waiting for a lock and consuming threads in case if lock is unavailable.
319
          //
320
          // Throws interrupted exception which is propagated. It's a correct way to handle it here.
321
          //
322
          // TODO 1: 5 seconds is chosen as a half of normal workflow task timeout.
323
          //   This value should be dynamically configured.
324
          // TODO 2: Does "consider increasing workflow task timeout" advice in this exception makes
325
          //   any sense?
326
          //   This MAYBE makes sense only if a previous workflow task timed out, it's still in
327
          //   progress on the worker and the next workflow task got picked up by the same exact
328
          //   worker from the general non-sticky task queue.
329
          //   Even in this case, this advice looks misleading, something else is going on
330
          //   (like an extreme network latency).
331
          locked = runLocks.tryLock(runId, 5, TimeUnit.SECONDS);
1✔
332

333
          if (!locked) {
1✔
334
            throw new UnableToAcquireLockException(
1✔
335
                "Workflow lock for the run id hasn't been released by one of previous execution attempts, "
336
                    + "consider increasing workflow task timeout.");
337
          }
338
        }
339

340
        Optional<PollWorkflowTaskQueueResponse> nextWFTResponse = Optional.of(workflowTaskResponse);
1✔
341
        do {
342
          PollWorkflowTaskQueueResponse currentTask = nextWFTResponse.get();
1✔
343
          nextWFTResponse = Optional.empty();
1✔
344
          WorkflowTaskHandler.Result result = handleTask(currentTask, workflowTypeScope);
1✔
345
          try {
346
            RespondWorkflowTaskCompletedRequest taskCompleted = result.getTaskCompleted();
1✔
347
            RespondWorkflowTaskFailedRequest taskFailed = result.getTaskFailed();
1✔
348
            RespondQueryTaskCompletedRequest queryCompleted = result.getQueryCompleted();
1✔
349

350
            if (taskCompleted != null) {
1✔
351
              RespondWorkflowTaskCompletedRequest.Builder requestBuilder =
1✔
352
                  taskCompleted.toBuilder();
1✔
353
              try (EagerActivitySlotsReservation activitySlotsReservation =
1✔
354
                  new EagerActivitySlotsReservation(eagerActivityDispatcher)) {
1✔
355
                activitySlotsReservation.applyToRequest(requestBuilder);
1✔
356
                RespondWorkflowTaskCompletedResponse response =
1✔
357
                    sendTaskCompleted(
1✔
358
                        currentTask.getTaskToken(),
1✔
359
                        requestBuilder,
360
                        result.getRequestRetryOptions(),
1✔
361
                        workflowTypeScope);
362
                // If we were processing a speculative WFT the server may instruct us that the task
363
                // was dropped by resting out event ID.
364
                long resetEventId = response.getResetHistoryEventId();
1✔
365
                if (resetEventId != 0) {
1✔
366
                  result.getEventIdSetHandle().apply(resetEventId);
×
367
                }
368
                nextWFTResponse =
369
                    response.hasWorkflowTask()
1✔
370
                        ? Optional.of(response.getWorkflowTask())
1✔
371
                        : Optional.empty();
1✔
372
                // TODO we don't have to do this under the runId lock
373
                activitySlotsReservation.handleResponse(response);
1✔
374
              }
375
            } else if (taskFailed != null) {
1✔
376
              sendTaskFailed(
1✔
377
                  currentTask.getTaskToken(),
1✔
378
                  taskFailed.toBuilder(),
1✔
379
                  result.getRequestRetryOptions(),
1✔
380
                  workflowTypeScope);
381
            } else if (queryCompleted != null) {
1✔
382
              sendDirectQueryCompletedResponse(
1✔
383
                  currentTask.getTaskToken(), queryCompleted.toBuilder(), workflowTypeScope);
1✔
384
            }
385
          } catch (Exception e) {
1✔
386
            logExceptionDuringResultReporting(e, currentTask, result);
1✔
387
            // if we failed to report the workflow task completion back to the server,
388
            // our cached version of the workflow may be more advanced than the server is aware of.
389
            // We should discard this execution and perform a clean replay based on what server
390
            // knows next time.
391
            cache.invalidate(
1✔
392
                workflowExecution, workflowTypeScope, "Failed result reporting to the server", e);
393
            throw e;
1✔
394
          }
1✔
395

396
          if (result.getTaskFailed() != null) {
1✔
397
            Scope workflowTaskFailureScope = workflowTypeScope;
1✔
398
            if (result
1✔
399
                .getTaskFailed()
1✔
400
                .getCause()
1✔
401
                .equals(
1✔
402
                    WorkflowTaskFailedCause.WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR)) {
403
              workflowTaskFailureScope =
1✔
404
                  workflowTaskFailureScope.tagged(
1✔
405
                      ImmutableMap.of(TASK_FAILURE_TYPE, "NonDeterminismError"));
1✔
406
            } else {
407
              workflowTaskFailureScope =
1✔
408
                  workflowTaskFailureScope.tagged(
1✔
409
                      ImmutableMap.of(TASK_FAILURE_TYPE, "WorkflowError"));
1✔
410
            }
411
            // we don't trigger the counter in case of the legacy query
412
            // (which never has taskFailed set)
413
            workflowTaskFailureScope
1✔
414
                .counter(MetricsType.WORKFLOW_TASK_EXECUTION_FAILURE_COUNTER)
1✔
415
                .inc(1);
1✔
416
          }
417
          if (nextWFTResponse.isPresent()) {
1✔
418
            workflowTypeScope.counter(MetricsType.WORKFLOW_TASK_HEARTBEAT_COUNTER).inc(1);
1✔
419
          }
420
        } while (nextWFTResponse.isPresent());
1✔
421
      } finally {
422
        swTotal.stop();
1✔
423
        MDC.remove(LoggerTag.WORKFLOW_ID);
1✔
424
        MDC.remove(LoggerTag.WORKFLOW_TYPE);
1✔
425
        MDC.remove(LoggerTag.RUN_ID);
1✔
426

427
        task.getCompletionCallback().apply();
1✔
428

429
        if (locked) {
1✔
430
          runLocks.unlock(runId);
1✔
431
        }
432
      }
433
    }
1✔
434

435
    @Override
436
    public Throwable wrapFailure(WorkflowTask task, Throwable failure) {
437
      WorkflowExecution execution = task.getResponse().getWorkflowExecution();
1✔
438
      return new RuntimeException(
1✔
439
          "Failure processing workflow task. WorkflowId="
440
              + execution.getWorkflowId()
1✔
441
              + ", RunId="
442
              + execution.getRunId()
1✔
443
              + ", Attempt="
444
              + task.getResponse().getAttempt(),
1✔
445
          failure);
446
    }
447

448
    private WorkflowTaskHandler.Result handleTask(
449
        PollWorkflowTaskQueueResponse task, Scope workflowTypeMetricsScope) throws Exception {
450
      Stopwatch sw =
1✔
451
          workflowTypeMetricsScope.timer(MetricsType.WORKFLOW_TASK_EXECUTION_LATENCY).start();
1✔
452
      try {
453
        return handler.handleWorkflowTask(task);
1✔
454
      } catch (Throwable e) {
1✔
455
        // more detailed logging that we can do here is already done inside `handler`
456
        workflowTypeMetricsScope
1✔
457
            .counter(MetricsType.WORKFLOW_TASK_EXECUTION_FAILURE_COUNTER)
1✔
458
            .inc(1);
1✔
459
        workflowTypeMetricsScope.counter(MetricsType.WORKFLOW_TASK_NO_COMPLETION_COUNTER).inc(1);
1✔
460
        throw e;
1✔
461
      } finally {
462
        sw.stop();
1✔
463
      }
464
    }
465

466
    private RespondWorkflowTaskCompletedResponse sendTaskCompleted(
467
        ByteString taskToken,
468
        RespondWorkflowTaskCompletedRequest.Builder taskCompleted,
469
        RpcRetryOptions retryOptions,
470
        Scope workflowTypeMetricsScope) {
471
      GrpcRetryer.GrpcRetryerOptions grpcRetryOptions =
1✔
472
          new GrpcRetryer.GrpcRetryerOptions(
473
              RpcRetryOptions.newBuilder().buildWithDefaultsFrom(retryOptions), null);
1✔
474

475
      taskCompleted
1✔
476
          .setIdentity(options.getIdentity())
1✔
477
          .setNamespace(namespace)
1✔
478
          .setTaskToken(taskToken);
1✔
479
      if (service.getServerCapabilities().get().getBuildIdBasedVersioning()) {
1✔
480
        taskCompleted.setWorkerVersionStamp(options.workerVersionStamp());
×
481
      } else {
482
        taskCompleted.setBinaryChecksum(options.getBuildId());
1✔
483
      }
484

485
      return grpcRetryer.retryWithResult(
1✔
486
          () ->
487
              service
1✔
488
                  .blockingStub()
1✔
489
                  .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, workflowTypeMetricsScope)
1✔
490
                  .respondWorkflowTaskCompleted(taskCompleted.build()),
1✔
491
          grpcRetryOptions);
492
    }
493

494
    private void sendTaskFailed(
495
        ByteString taskToken,
496
        RespondWorkflowTaskFailedRequest.Builder taskFailed,
497
        RpcRetryOptions retryOptions,
498
        Scope workflowTypeMetricsScope) {
499
      GrpcRetryer.GrpcRetryerOptions grpcRetryOptions =
1✔
500
          new GrpcRetryer.GrpcRetryerOptions(
501
              RpcRetryOptions.newBuilder().buildWithDefaultsFrom(retryOptions), null);
1✔
502

503
      taskFailed.setIdentity(options.getIdentity()).setNamespace(namespace).setTaskToken(taskToken);
1✔
504

505
      if (service.getServerCapabilities().get().getBuildIdBasedVersioning()) {
1✔
506
        taskFailed.setWorkerVersion(options.workerVersionStamp());
×
507
      }
508

509
      grpcRetryer.retry(
1✔
510
          () ->
511
              service
1✔
512
                  .blockingStub()
1✔
513
                  .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, workflowTypeMetricsScope)
1✔
514
                  .respondWorkflowTaskFailed(taskFailed.build()),
1✔
515
          grpcRetryOptions);
516
    }
1✔
517

518
    private void sendDirectQueryCompletedResponse(
519
        ByteString taskToken,
520
        RespondQueryTaskCompletedRequest.Builder queryCompleted,
521
        Scope workflowTypeMetricsScope) {
522
      queryCompleted.setTaskToken(taskToken).setNamespace(namespace);
1✔
523
      // Do not retry query response
524
      service
1✔
525
          .blockingStub()
1✔
526
          .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, workflowTypeMetricsScope)
1✔
527
          .respondQueryTaskCompleted(queryCompleted.build());
1✔
528
    }
1✔
529

530
    private void logExceptionDuringResultReporting(
531
        Exception e, PollWorkflowTaskQueueResponse currentTask, WorkflowTaskHandler.Result result) {
532
      if (log.isDebugEnabled()) {
1✔
533
        log.debug(
×
534
            "Failure during reporting of workflow progress to the server. If seen continuously the workflow might be stuck. WorkflowId={}, RunId={}, startedEventId={}, WFTResult={}",
535
            currentTask.getWorkflowExecution().getWorkflowId(),
×
536
            currentTask.getWorkflowExecution().getRunId(),
×
537
            currentTask.getStartedEventId(),
×
538
            result,
539
            e);
540
      } else {
541
        log.warn(
1✔
542
            "Failure while reporting workflow progress to the server. If seen continuously the workflow might be stuck. WorkflowId={}, RunId={}, startedEventId={}",
543
            currentTask.getWorkflowExecution().getWorkflowId(),
1✔
544
            currentTask.getWorkflowExecution().getRunId(),
1✔
545
            currentTask.getStartedEventId(),
1✔
546
            e);
547
      }
548
    }
1✔
549
  }
550
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc