temporalio / sdk-java / build #278

08 Jul 2024 04:42PM UTC coverage: 77.565% (+0.1%) from 77.469%
Event: push · CI: github · Committer: web-flow

Revert configurable slot provider (#2134)

* Revert "Resource based tuner (#2110)"

This reverts commit 8a2d5cdcc.

* Revert "Slot supplier interface & fixed-size implementation (#2014)"

This reverts commit d2a06fc6f.

* Fix merge conflict

* Keep Publish Test Report step

* Add tests for worker slots

* Fix white space

* One other whitespace change

117 of 133 new or added lines in 17 files covered. (87.97%)

5 existing lines in 5 files now uncovered.

19088 of 24609 relevant lines covered (77.57%)

0.78 hits per line
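(For reference: 19088 / 24609 ≈ 0.7757, i.e. the 77.57% above; "hits per line" is the average number of recorded hits across those 24609 relevant lines.)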

Source File (94.37% covered):
/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java

/*
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
 *
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this material except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.temporal.internal.worker;

import static io.temporal.serviceclient.MetricsTag.METRICS_TAGS_CALL_OPTIONS_KEY;
import static io.temporal.serviceclient.MetricsTag.TASK_FAILURE_TYPE;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.protobuf.ByteString;
import com.uber.m3.tally.Scope;
import com.uber.m3.tally.Stopwatch;
import com.uber.m3.util.ImmutableMap;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.enums.v1.TaskQueueKind;
import io.temporal.api.enums.v1.WorkflowTaskFailedCause;
import io.temporal.api.workflowservice.v1.*;
import io.temporal.internal.logging.LoggerTag;
import io.temporal.internal.retryer.GrpcRetryer;
import io.temporal.serviceclient.MetricsTag;
import io.temporal.serviceclient.RpcRetryOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.MetricsType;
import io.temporal.worker.WorkerMetricsTag;
import io.temporal.worker.WorkflowTaskDispatchHandle;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

final class WorkflowWorker implements SuspendableWorker {
  private static final Logger log = LoggerFactory.getLogger(WorkflowWorker.class);

  private final WorkflowRunLockManager runLocks;

  private final WorkflowServiceStubs service;
  private final String namespace;
  private final String taskQueue;
  private final SingleWorkerOptions options;
  private final WorkflowExecutorCache cache;
  private final WorkflowTaskHandler handler;
  private final String stickyTaskQueueName;
  private final PollerOptions pollerOptions;
  private final Scope workerMetricsScope;
  private final GrpcRetryer grpcRetryer;
  private final EagerActivityDispatcher eagerActivityDispatcher;
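  // Both derived from options.getTaskExecutorThreadPoolSize(): the semaphore starts with one
  // permit per executor slot and bounds how many workflow tasks (polled or eagerly dispatched)
  // can be in flight at once.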
  private final int executorSlots;
  private final Semaphore executorSlotsSemaphore;

  private PollTaskExecutor<WorkflowTask> pollTaskExecutor;

  // TODO this ideally should be volatile or final (and NoopWorker should go away)
  //  Currently the implementation looks safe without volatile, but it's brittle.
  @Nonnull private SuspendableWorker poller = new NoopWorker();

  private StickyQueueBalancer stickyQueueBalancer;

  public WorkflowWorker(
      @Nonnull WorkflowServiceStubs service,
      @Nonnull String namespace,
      @Nonnull String taskQueue,
      @Nullable String stickyTaskQueueName,
      @Nonnull SingleWorkerOptions options,
      @Nonnull WorkflowRunLockManager runLocks,
      @Nonnull WorkflowExecutorCache cache,
      @Nonnull WorkflowTaskHandler handler,
      @Nonnull EagerActivityDispatcher eagerActivityDispatcher) {
    this.service = Objects.requireNonNull(service);
    this.namespace = Objects.requireNonNull(namespace);
    this.taskQueue = Objects.requireNonNull(taskQueue);
    this.options = Objects.requireNonNull(options);
    this.stickyTaskQueueName = stickyTaskQueueName;
    this.pollerOptions = getPollerOptions(options);
    this.workerMetricsScope =
        MetricsTag.tagged(options.getMetricsScope(), WorkerMetricsTag.WorkerType.WORKFLOW_WORKER);
    this.runLocks = Objects.requireNonNull(runLocks);
    this.cache = Objects.requireNonNull(cache);
    this.handler = Objects.requireNonNull(handler);
    this.grpcRetryer = new GrpcRetryer(service.getServerCapabilities());
    this.eagerActivityDispatcher = eagerActivityDispatcher;
    this.executorSlots = options.getTaskExecutorThreadPoolSize();
    this.executorSlotsSemaphore = new Semaphore(executorSlots);
  }

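  // Starts polling only if the handler has at least one registered workflow type; otherwise the
  // worker stays a no-op and start() returns false.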
  @Override
  public boolean start() {
    if (handler.isAnyTypeSupported()) {
      pollTaskExecutor =
          new PollTaskExecutor<>(
              namespace,
              taskQueue,
              options.getIdentity(),
              new TaskHandlerImpl(handler),
              pollerOptions,
              options.getTaskExecutorThreadPoolSize(),
              workerMetricsScope,
              true);
      stickyQueueBalancer =
          new StickyQueueBalancer(
              options.getPollerOptions().getPollThreadCount(), stickyTaskQueueName != null);

      poller =
          new Poller<>(
              options.getIdentity(),
              new WorkflowPollTask(
                  service,
                  namespace,
                  taskQueue,
                  stickyTaskQueueName,
                  options.getIdentity(),
                  options.getBuildId(),
                  options.isUsingBuildIdForVersioning(),
                  executorSlotsSemaphore,
                  stickyQueueBalancer,
                  workerMetricsScope,
                  service.getServerCapabilities()),
              pollTaskExecutor,
              pollerOptions,
              workerMetricsScope);
      poller.start();

      workerMetricsScope.counter(MetricsType.WORKER_START_COUNTER).inc(1);

      return true;
    } else {
      return false;
    }
  }

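  // Shutdown is staged: optionally drain the sticky queue (graceful shutdown only), stop the
  // poller, wait for in-flight tasks to return all executor slots (again, graceful only), and
  // finally shut down the task executor itself.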
  @Override
  public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
    String semaphoreName = this + "#executorSlotsSemaphore";

    boolean stickyQueueBalancerDrainEnabled =
        !interruptTasks
            && !options.getDrainStickyTaskQueueTimeout().isZero()
            && stickyTaskQueueName != null
            && stickyQueueBalancer != null;

    return CompletableFuture.completedFuture(null)
        .thenCompose(
            ignore ->
                stickyQueueBalancerDrainEnabled
                    ? shutdownManager.waitForStickyQueueBalancer(
                        stickyQueueBalancer, options.getDrainStickyTaskQueueTimeout())
                    : CompletableFuture.completedFuture(null))
        .thenCompose(ignore -> poller.shutdown(shutdownManager, interruptTasks))
        .thenCompose(
            ignore ->
                !interruptTasks
                    ? shutdownManager.waitForSemaphorePermitsReleaseUntimed(
                        executorSlotsSemaphore, executorSlots, semaphoreName)
                    : CompletableFuture.completedFuture(null))
        .thenCompose(
            ignore ->
                pollTaskExecutor != null
                    ? pollTaskExecutor.shutdown(shutdownManager, interruptTasks)
                    : CompletableFuture.completedFuture(null))
        .exceptionally(
            e -> {
              log.error("Unexpected exception during shutdown", e);
              return null;
            });
  }

  @Override
  public void awaitTermination(long timeout, TimeUnit unit) {
    long timeoutMillis = ShutdownManager.awaitTermination(poller, unit.toMillis(timeout));
    // Relies on the fact that pollTaskExecutor is the last component to be shut down; there is
    // no need to wait separately for the intermediate steps.
    ShutdownManager.awaitTermination(pollTaskExecutor, timeoutMillis);
  }

  @Override
  public void suspendPolling() {
    poller.suspendPolling();
  }

  @Override
  public void resumePolling() {
    poller.resumePolling();
  }

  @Override
  public boolean isSuspended() {
    return poller.isSuspended();
  }

  @Override
  public boolean isShutdown() {
    return poller.isShutdown();
  }

  @Override
  public boolean isTerminated() {
    return poller.isTerminated() && (pollTaskExecutor == null || pollTaskExecutor.isTerminated());
  }

  @Override
  public WorkerLifecycleState getLifecycleState() {
    return poller.getLifecycleState();
  }

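  // Defaults the poll thread name prefix (via WorkerThreadsNameHelper) when the user hasn't
  // configured one, so poller threads are identifiable by namespace and task queue.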
  private PollerOptions getPollerOptions(SingleWorkerOptions options) {
    PollerOptions pollerOptions = options.getPollerOptions();
    if (pollerOptions.getPollThreadNamePrefix() == null) {
      pollerOptions =
          PollerOptions.newBuilder(pollerOptions)
              .setPollThreadNamePrefix(
                  WorkerThreadsNameHelper.getWorkflowPollerThreadPrefix(namespace, taskQueue))
              .build();
    }
    return pollerOptions;
  }

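  // Entry point for eager workflow task dispatch: tries to reserve an executor slot up front and,
  // on success, returns a handle that can feed a workflow task directly to this worker without
  // going through polling. Returns null if the worker is suspended or no slot is free.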
  @Nullable
  public WorkflowTaskDispatchHandle reserveWorkflowExecutor() {
    // cache pollTaskExecutor locally so it can't become null inside the lambda
    final PollTaskExecutor<WorkflowTask> executor = pollTaskExecutor;
    return executor != null && !isSuspended() && executorSlotsSemaphore.tryAcquire()
        ? new WorkflowTaskDispatchHandle(
            workflowTask -> {
              String queueName =
                  workflowTask.getResponse().getWorkflowExecutionTaskQueue().getName();
              TaskQueueKind queueKind =
                  workflowTask.getResponse().getWorkflowExecutionTaskQueue().getKind();
              Preconditions.checkArgument(
                  this.taskQueue.equals(queueName)
                      || TaskQueueKind.TASK_QUEUE_KIND_STICKY.equals(queueKind)
                          && this.stickyTaskQueueName.equals(queueName),
                  "Got a WFT for a wrong queue %s, expected %s or %s",
                  queueName,
                  this.taskQueue,
                  this.stickyTaskQueueName);
              try {
                executor.process(workflowTask);
                return true;
              } catch (RejectedExecutionException e) {
                return false;
              }
            },
            executorSlotsSemaphore)
        : null;
  }

  @Override
  public String toString() {
    return String.format(
        "WorkflowWorker{identity=%s, namespace=%s, taskQueue=%s}",
        options.getIdentity(), namespace, taskQueue);
  }

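  // Glue between PollTaskExecutor and the workflow task processing logic: handles one polled
  // workflow task (plus any follow-up task returned in the completion response), reports the
  // result to the server, and maintains per-run locking, metrics, and the MDC logging context.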
  private class TaskHandlerImpl implements PollTaskExecutor.TaskHandler<WorkflowTask> {

    final WorkflowTaskHandler handler;

    private TaskHandlerImpl(WorkflowTaskHandler handler) {
      this.handler = handler;
    }

    @Override
    public void handle(WorkflowTask task) throws Exception {
      PollWorkflowTaskQueueResponse workflowTaskResponse = task.getResponse();
      WorkflowExecution workflowExecution = workflowTaskResponse.getWorkflowExecution();
      String runId = workflowExecution.getRunId();
      String workflowType = workflowTaskResponse.getWorkflowType().getName();

      Scope workflowTypeScope =
          workerMetricsScope.tagged(ImmutableMap.of(MetricsTag.WORKFLOW_TYPE, workflowType));

      MDC.put(LoggerTag.WORKFLOW_ID, workflowExecution.getWorkflowId());
      MDC.put(LoggerTag.WORKFLOW_TYPE, workflowType);
      MDC.put(LoggerTag.RUN_ID, runId);

      boolean locked = false;

      Stopwatch swTotal =
          workflowTypeScope.timer(MetricsType.WORKFLOW_TASK_EXECUTION_TOTAL_LATENCY).start();
      try {
        if (!Strings.isNullOrEmpty(stickyTaskQueueName)) {
          // Serialize workflow task processing for a particular workflow run.
          // This is used to make sure that query tasks and real workflow tasks
          // are serialized when sticky is on.
          //
          // Acquiring the lock with a timeout avoids having lots of workflow tasks for the same
          // run id waiting for the lock and consuming threads when the lock is unavailable.
          //
          // Throws InterruptedException, which is propagated. That is the correct way to handle
          // it here.
          //
          // TODO 1: 5 seconds is chosen as a half of normal workflow task timeout.
          //   This value should be dynamically configured.
          // TODO 2: Does the "consider increasing workflow task timeout" advice in this exception
          //   make any sense?
          //   This MAYBE makes sense only if a previous workflow task timed out, it's still in
          //   progress on the worker and the next workflow task got picked up by the same exact
          //   worker from the general non-sticky task queue.
          //   Even in this case, this advice looks misleading, something else is going on
          //   (like an extreme network latency).
          locked = runLocks.tryLock(runId, 5, TimeUnit.SECONDS);

          if (!locked) {
            throw new UnableToAcquireLockException(
                "Workflow lock for the run id hasn't been released by one of previous execution attempts, "
                    + "consider increasing workflow task timeout.");
          }
        }

        Optional<PollWorkflowTaskQueueResponse> nextWFTResponse = Optional.of(workflowTaskResponse);
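        // The completion response may carry the next workflow task for this run directly
        // (workflow task heartbeat), so keep processing until the server stops returning one.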
        do {
          PollWorkflowTaskQueueResponse currentTask = nextWFTResponse.get();
          nextWFTResponse = Optional.empty();
          WorkflowTaskHandler.Result result = handleTask(currentTask, workflowTypeScope);
          try {
            RespondWorkflowTaskCompletedRequest taskCompleted = result.getTaskCompleted();
            RespondWorkflowTaskFailedRequest taskFailed = result.getTaskFailed();
            RespondQueryTaskCompletedRequest queryCompleted = result.getQueryCompleted();

            if (taskCompleted != null) {
              RespondWorkflowTaskCompletedRequest.Builder requestBuilder =
                  taskCompleted.toBuilder();
              try (EagerActivitySlotsReservation activitySlotsReservation =
                  new EagerActivitySlotsReservation(eagerActivityDispatcher)) {
                activitySlotsReservation.applyToRequest(requestBuilder);
                RespondWorkflowTaskCompletedResponse response =
                    sendTaskCompleted(
                        currentTask.getTaskToken(),
                        requestBuilder,
                        result.getRequestRetryOptions(),
                        workflowTypeScope);
                // If we were processing a speculative WFT, the server may instruct us that the
                // task was dropped by resetting our event ID.
                long resetEventId = response.getResetHistoryEventId();
                if (resetEventId != 0) {
                  result.getResetEventIdHandle().apply(resetEventId);
                }
                nextWFTResponse =
                    response.hasWorkflowTask()
                        ? Optional.of(response.getWorkflowTask())
                        : Optional.empty();
                // TODO we don't have to do this under the runId lock
                activitySlotsReservation.handleResponse(response);
              }
            } else if (taskFailed != null) {
              sendTaskFailed(
                  currentTask.getTaskToken(),
                  taskFailed.toBuilder(),
                  result.getRequestRetryOptions(),
                  workflowTypeScope);
            } else if (queryCompleted != null) {
              sendDirectQueryCompletedResponse(
                  currentTask.getTaskToken(), queryCompleted.toBuilder(), workflowTypeScope);
            }
          } catch (Exception e) {
            logExceptionDuringResultReporting(e, currentTask, result);
            // If we failed to report the workflow task completion back to the server, our cached
            // version of the workflow may be more advanced than the server is aware of. We should
            // discard this execution and perform a clean replay based on what the server knows
            // next time.
            cache.invalidate(
                workflowExecution, workflowTypeScope, "Failed result reporting to the server", e);
            throw e;
          }

          if (result.getTaskFailed() != null) {
            Scope workflowTaskFailureScope = workflowTypeScope;
            if (result
                .getTaskFailed()
                .getCause()
                .equals(
                    WorkflowTaskFailedCause.WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR)) {
              workflowTaskFailureScope =
                  workflowTaskFailureScope.tagged(
                      ImmutableMap.of(TASK_FAILURE_TYPE, "NonDeterminismError"));
            } else {
              workflowTaskFailureScope =
                  workflowTaskFailureScope.tagged(
                      ImmutableMap.of(TASK_FAILURE_TYPE, "WorkflowError"));
            }
            // We don't trigger the counter in case of the legacy query
            // (which never has taskFailed set).
            workflowTaskFailureScope
                .counter(MetricsType.WORKFLOW_TASK_EXECUTION_FAILURE_COUNTER)
                .inc(1);
          }
          if (nextWFTResponse.isPresent()) {
            workflowTypeScope.counter(MetricsType.WORKFLOW_TASK_HEARTBEAT_COUNTER).inc(1);
          }
        } while (nextWFTResponse.isPresent());
      } finally {
        swTotal.stop();
        MDC.remove(LoggerTag.WORKFLOW_ID);
        MDC.remove(LoggerTag.WORKFLOW_TYPE);
        MDC.remove(LoggerTag.RUN_ID);

        task.getCompletionCallback().apply();

        if (locked) {
          runLocks.unlock(runId);
        }
      }
    }

    @Override
    public Throwable wrapFailure(WorkflowTask task, Throwable failure) {
      WorkflowExecution execution = task.getResponse().getWorkflowExecution();
      return new RuntimeException(
          "Failure processing workflow task. WorkflowId="
              + execution.getWorkflowId()
              + ", RunId="
              + execution.getRunId()
              + ", Attempt="
              + task.getResponse().getAttempt(),
          failure);
    }

    private WorkflowTaskHandler.Result handleTask(
        PollWorkflowTaskQueueResponse task, Scope workflowTypeMetricsScope) throws Exception {
      Stopwatch sw =
          workflowTypeMetricsScope.timer(MetricsType.WORKFLOW_TASK_EXECUTION_LATENCY).start();
      try {
        return handler.handleWorkflowTask(task);
      } catch (Throwable e) {
        // Any more detailed logging we could do here is already done inside `handler`.
        workflowTypeMetricsScope
            .counter(MetricsType.WORKFLOW_TASK_EXECUTION_FAILURE_COUNTER)
            .inc(1);
        workflowTypeMetricsScope.counter(MetricsType.WORKFLOW_TASK_NO_COMPLETION_COUNTER).inc(1);
        throw e;
      } finally {
        sw.stop();
      }
    }

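    // The three senders below report the task result to the server. Completions and failures go
    // through GrpcRetryer with retry options derived from the task result; direct query responses
    // are deliberately never retried.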
    private RespondWorkflowTaskCompletedResponse sendTaskCompleted(
        ByteString taskToken,
        RespondWorkflowTaskCompletedRequest.Builder taskCompleted,
        RpcRetryOptions retryOptions,
        Scope workflowTypeMetricsScope) {
      GrpcRetryer.GrpcRetryerOptions grpcRetryOptions =
          new GrpcRetryer.GrpcRetryerOptions(
              RpcRetryOptions.newBuilder().buildWithDefaultsFrom(retryOptions), null);

      taskCompleted
          .setIdentity(options.getIdentity())
          .setNamespace(namespace)
          .setTaskToken(taskToken);
      if (service.getServerCapabilities().get().getBuildIdBasedVersioning()) {
        taskCompleted.setWorkerVersionStamp(options.workerVersionStamp());
      } else {
        taskCompleted.setBinaryChecksum(options.getBuildId());
      }

      return grpcRetryer.retryWithResult(
          () ->
              service
                  .blockingStub()
                  .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, workflowTypeMetricsScope)
                  .respondWorkflowTaskCompleted(taskCompleted.build()),
          grpcRetryOptions);
    }

    private void sendTaskFailed(
        ByteString taskToken,
        RespondWorkflowTaskFailedRequest.Builder taskFailed,
        RpcRetryOptions retryOptions,
        Scope workflowTypeMetricsScope) {
      GrpcRetryer.GrpcRetryerOptions grpcRetryOptions =
          new GrpcRetryer.GrpcRetryerOptions(
              RpcRetryOptions.newBuilder().buildWithDefaultsFrom(retryOptions), null);

      taskFailed.setIdentity(options.getIdentity()).setNamespace(namespace).setTaskToken(taskToken);

      if (service.getServerCapabilities().get().getBuildIdBasedVersioning()) {
        taskFailed.setWorkerVersion(options.workerVersionStamp());
      }

      grpcRetryer.retry(
          () ->
              service
                  .blockingStub()
                  .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, workflowTypeMetricsScope)
                  .respondWorkflowTaskFailed(taskFailed.build()),
          grpcRetryOptions);
    }

    private void sendDirectQueryCompletedResponse(
        ByteString taskToken,
        RespondQueryTaskCompletedRequest.Builder queryCompleted,
        Scope workflowTypeMetricsScope) {
      queryCompleted.setTaskToken(taskToken).setNamespace(namespace);
      // Do not retry query response
      service
          .blockingStub()
          .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, workflowTypeMetricsScope)
          .respondQueryTaskCompleted(queryCompleted.build());
    }

    private void logExceptionDuringResultReporting(
        Exception e, PollWorkflowTaskQueueResponse currentTask, WorkflowTaskHandler.Result result) {
      if (log.isDebugEnabled()) {
        log.debug(
            "Failure during reporting of workflow progress to the server. If seen continuously the workflow might be stuck. WorkflowId={}, RunId={}, startedEventId={}, WFTResult={}",
            currentTask.getWorkflowExecution().getWorkflowId(),
            currentTask.getWorkflowExecution().getRunId(),
            currentTask.getStartedEventId(),
            result,
            e);
      } else {
        log.warn(
            "Failure while reporting workflow progress to the server. If seen continuously the workflow might be stuck. WorkflowId={}, RunId={}, startedEventId={}",
            currentTask.getWorkflowExecution().getWorkflowId(),
            currentTask.getWorkflowExecution().getRunId(),
            currentTask.getStartedEventId(),
            e);
      }
    }
  }
}