• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #341

24 Oct 2024 04:18PM UTC coverage: 78.746% (+0.04%) from 78.708%
#341

push

github

web-flow
Add failure_reason to nexus_task_execution_failed (#2274)

Add failure_reason to nexus_task_execution_failed

22 of 26 new or added lines in 2 files covered. (84.62%)

48 existing lines in 7 files now uncovered.

22726 of 28860 relevant lines covered (78.75%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.62
/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusWorker.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.worker;
22

23
import static io.temporal.serviceclient.MetricsTag.METRICS_TAGS_CALL_OPTIONS_KEY;
24
import static io.temporal.serviceclient.MetricsTag.TASK_FAILURE_TYPE;
25

26
import com.google.protobuf.ByteString;
27
import com.uber.m3.tally.Scope;
28
import com.uber.m3.tally.Stopwatch;
29
import com.uber.m3.util.Duration;
30
import com.uber.m3.util.ImmutableMap;
31
import io.temporal.api.nexus.v1.HandlerError;
32
import io.temporal.api.nexus.v1.Request;
33
import io.temporal.api.nexus.v1.Response;
34
import io.temporal.api.workflowservice.v1.*;
35
import io.temporal.internal.common.ProtobufTimeUtils;
36
import io.temporal.internal.logging.LoggerTag;
37
import io.temporal.internal.retryer.GrpcRetryer;
38
import io.temporal.serviceclient.MetricsTag;
39
import io.temporal.serviceclient.WorkflowServiceStubs;
40
import io.temporal.serviceclient.rpcretry.DefaultStubServiceOperationRpcRetryOptions;
41
import io.temporal.worker.MetricsType;
42
import io.temporal.worker.WorkerMetricsTag;
43
import io.temporal.worker.tuning.*;
44
import java.util.Collections;
45
import java.util.Objects;
46
import java.util.concurrent.CompletableFuture;
47
import java.util.concurrent.TimeUnit;
48
import java.util.concurrent.TimeoutException;
49
import javax.annotation.Nonnull;
50
import org.slf4j.Logger;
51
import org.slf4j.LoggerFactory;
52
import org.slf4j.MDC;
53

54
final class NexusWorker implements SuspendableWorker {
55
  private static final Logger log = LoggerFactory.getLogger(NexusWorker.class);
1✔
56

57
  private SuspendableWorker poller = new NoopWorker();
1✔
58
  private PollTaskExecutor<NexusTask> pollTaskExecutor;
59

60
  private final NexusTaskHandler handler;
61
  private final WorkflowServiceStubs service;
62
  private final String namespace;
63
  private final String taskQueue;
64
  private final SingleWorkerOptions options;
65
  private final PollerOptions pollerOptions;
66
  private final Scope workerMetricsScope;
67
  private final GrpcRetryer grpcRetryer;
68
  private final GrpcRetryer.GrpcRetryerOptions replyGrpcRetryerOptions;
69
  private final TrackingSlotSupplier<NexusSlotInfo> slotSupplier;
70

71
  public NexusWorker(
72
      @Nonnull WorkflowServiceStubs service,
73
      @Nonnull String namespace,
74
      @Nonnull String taskQueue,
75
      @Nonnull SingleWorkerOptions options,
76
      @Nonnull NexusTaskHandler handler,
77
      @Nonnull SlotSupplier<NexusSlotInfo> slotSupplier) {
1✔
78
    this.service = Objects.requireNonNull(service);
1✔
79
    this.namespace = Objects.requireNonNull(namespace);
1✔
80
    this.taskQueue = Objects.requireNonNull(taskQueue);
1✔
81
    this.handler = Objects.requireNonNull(handler);
1✔
82
    this.options = Objects.requireNonNull(options);
1✔
83
    this.pollerOptions = getPollerOptions(options);
1✔
84
    this.workerMetricsScope =
1✔
85
        MetricsTag.tagged(options.getMetricsScope(), WorkerMetricsTag.WorkerType.NEXUS_WORKER);
1✔
86
    this.grpcRetryer = new GrpcRetryer(service.getServerCapabilities());
1✔
87
    this.replyGrpcRetryerOptions =
1✔
88
        new GrpcRetryer.GrpcRetryerOptions(
89
            DefaultStubServiceOperationRpcRetryOptions.INSTANCE, null);
90

91
    this.slotSupplier = new TrackingSlotSupplier<>(slotSupplier, this.workerMetricsScope);
1✔
92
  }
1✔
93

94
  @Override
95
  public boolean start() {
96
    if (handler.start()) {
1✔
97
      this.pollTaskExecutor =
1✔
98
          new PollTaskExecutor<>(
99
              namespace,
100
              taskQueue,
101
              options.getIdentity(),
1✔
102
              new TaskHandlerImpl(handler),
103
              pollerOptions,
104
              slotSupplier.maximumSlots().orElse(Integer.MAX_VALUE),
1✔
105
              true);
106
      poller =
1✔
107
          new Poller<>(
108
              options.getIdentity(),
1✔
109
              new NexusPollTask(
110
                  service,
111
                  namespace,
112
                  taskQueue,
113
                  options.getIdentity(),
1✔
114
                  options.getBuildId(),
1✔
115
                  options.isUsingBuildIdForVersioning(),
1✔
116
                  this.slotSupplier,
117
                  workerMetricsScope,
118
                  service.getServerCapabilities()),
1✔
119
              this.pollTaskExecutor,
120
              pollerOptions,
121
              workerMetricsScope);
122
      poller.start();
1✔
123
      workerMetricsScope.counter(MetricsType.WORKER_START_COUNTER).inc(1);
1✔
124
      return true;
1✔
125
    } else {
126
      return false;
1✔
127
    }
128
  }
129

130
  @Override
131
  public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
132
    String supplierName = this + "#executorSlots";
1✔
133
    return poller
1✔
134
        .shutdown(shutdownManager, interruptTasks)
1✔
135
        .thenCompose(
1✔
136
            ignore ->
137
                !interruptTasks
1✔
138
                    ? shutdownManager.waitForSupplierPermitsReleasedUnlimited(
1✔
139
                        slotSupplier, supplierName)
140
                    : CompletableFuture.completedFuture(null))
1✔
141
        .thenCompose(
1✔
142
            ignore ->
143
                pollTaskExecutor != null
1✔
144
                    ? pollTaskExecutor.shutdown(shutdownManager, interruptTasks)
1✔
145
                    : CompletableFuture.completedFuture(null))
1✔
146
        .exceptionally(
1✔
147
            e -> {
148
              log.error("Unexpected exception during shutdown", e);
×
149
              return null;
×
150
            });
151
  }
152

153
  @Override
154
  public void awaitTermination(long timeout, TimeUnit unit) {
155
    long timeoutMillis = ShutdownManager.awaitTermination(poller, unit.toMillis(timeout));
1✔
156
    ShutdownManager.awaitTermination(pollTaskExecutor, timeoutMillis);
1✔
157
  }
1✔
158

159
  @Override
160
  public void suspendPolling() {
161
    poller.suspendPolling();
1✔
162
  }
1✔
163

164
  @Override
165
  public void resumePolling() {
166
    poller.resumePolling();
1✔
167
  }
1✔
168

169
  @Override
170
  public boolean isShutdown() {
171
    return poller.isShutdown();
×
172
  }
173

174
  @Override
175
  public boolean isTerminated() {
176
    return poller.isTerminated() && (pollTaskExecutor == null || pollTaskExecutor.isTerminated());
1✔
177
  }
178

179
  @Override
180
  public boolean isSuspended() {
181
    return poller.isSuspended();
1✔
182
  }
183

184
  @Override
185
  public WorkerLifecycleState getLifecycleState() {
186
    return poller.getLifecycleState();
×
187
  }
188

189
  private PollerOptions getPollerOptions(SingleWorkerOptions options) {
190
    PollerOptions pollerOptions = options.getPollerOptions();
1✔
191
    if (pollerOptions.getPollThreadNamePrefix() == null) {
1✔
192
      pollerOptions =
1✔
193
          PollerOptions.newBuilder(pollerOptions)
1✔
194
              .setPollThreadNamePrefix(
1✔
195
                  WorkerThreadsNameHelper.getNexusPollerThreadPrefix(namespace, taskQueue))
1✔
196
              .build();
1✔
197
    }
198
    return pollerOptions;
1✔
199
  }
200

201
  @Override
202
  public String toString() {
203
    return String.format(
1✔
204
        "NexusWorker{identity=%s, namespace=%s, taskQueue=%s}",
205
        options.getIdentity(), namespace, taskQueue);
1✔
206
  }
207

208
  private class TaskHandlerImpl implements PollTaskExecutor.TaskHandler<NexusTask> {
209

210
    final NexusTaskHandler handler;
211

212
    private TaskHandlerImpl(NexusTaskHandler handler) {
1✔
213
      this.handler = handler;
1✔
214
    }
1✔
215

216
    private String getNexusTaskService(PollNexusTaskQueueResponseOrBuilder pollResponse) {
217
      Request request = pollResponse.getRequest();
1✔
218
      if (request.hasStartOperation()) {
1✔
219
        return request.getStartOperation().getService();
1✔
220
      } else if (request.hasCancelOperation()) {
1✔
221
        return request.getCancelOperation().getService();
1✔
222
      }
223
      return "";
×
224
    }
225

226
    private String getNexusTaskOperation(PollNexusTaskQueueResponseOrBuilder pollResponse) {
227
      Request request = pollResponse.getRequest();
1✔
228
      if (request.hasStartOperation()) {
1✔
229
        return request.getStartOperation().getOperation();
1✔
230
      } else if (request.hasCancelOperation()) {
1✔
231
        return request.getCancelOperation().getOperation();
1✔
232
      }
233
      return "";
×
234
    }
235

236
    @Override
237
    public void handle(NexusTask task) {
238
      PollNexusTaskQueueResponseOrBuilder pollResponse = task.getResponse();
1✔
239
      // Extract service and operation from the request and set them as MDC and metrics
240
      // scope tags. If the request does not have a service or operation, do not set the tags.
241
      // If we don't know how to handle the task, we will fail the task further down the line.
242
      Scope metricsScope = workerMetricsScope;
1✔
243
      String service = getNexusTaskService(pollResponse);
1✔
244
      if (!service.isEmpty()) {
1✔
245
        MDC.put(LoggerTag.NEXUS_SERVICE, service);
1✔
246
        metricsScope = metricsScope.tagged(ImmutableMap.of(MetricsTag.NEXUS_SERVICE, service));
1✔
247
      }
248
      String operation = getNexusTaskOperation(pollResponse);
1✔
249
      if (!operation.isEmpty()) {
1✔
250
        MDC.put(LoggerTag.NEXUS_OPERATION, operation);
1✔
251
        metricsScope = metricsScope.tagged(ImmutableMap.of(MetricsTag.NEXUS_OPERATION, operation));
1✔
252
      }
253
      slotSupplier.markSlotUsed(
1✔
254
          new NexusSlotInfo(
255
              service, operation, taskQueue, options.getIdentity(), options.getBuildId()),
1✔
256
          task.getPermit());
1✔
257

258
      try {
259
        handleNexusTask(task, metricsScope);
1✔
260
      } finally {
261
        task.getCompletionCallback().apply();
1✔
262
        MDC.remove(LoggerTag.NEXUS_SERVICE);
1✔
263
        MDC.remove(LoggerTag.NEXUS_OPERATION);
1✔
264
      }
265
    }
1✔
266

267
    @Override
268
    public Throwable wrapFailure(NexusTask task, Throwable failure) {
269
      PollNexusTaskQueueResponseOrBuilder response = task.getResponse();
1✔
270
      return new RuntimeException(
1✔
271
          "Failure processing nexus response: " + response.getRequest().toString(), failure);
1✔
272
    }
273

274
    private void handleNexusTask(NexusTask task, Scope metricsScope) {
275
      PollNexusTaskQueueResponseOrBuilder pollResponse = task.getResponse();
1✔
276
      ByteString taskToken = pollResponse.getTaskToken();
1✔
277

278
      NexusTaskHandler.Result result;
279

280
      Stopwatch sw = metricsScope.timer(MetricsType.NEXUS_EXEC_LATENCY).start();
1✔
281
      try {
282
        result = handler.handle(task, metricsScope);
1✔
283
        if (result.getHandlerError() != null) {
1✔
284
          metricsScope
1✔
285
              .tagged(
1✔
286
                  Collections.singletonMap(
1✔
287
                      TASK_FAILURE_TYPE,
288
                      "handler_error_" + result.getHandlerError().getErrorType()))
1✔
289
              .counter(MetricsType.NEXUS_EXEC_FAILED_COUNTER)
1✔
290
              .inc(1);
1✔
291
        } else if (result.getResponse().hasStartOperation()
1✔
292
            && result.getResponse().getStartOperation().hasOperationError()) {
1✔
293
          String operationState =
1✔
294
              result.getResponse().getStartOperation().getOperationError().getOperationState();
1✔
295
          metricsScope
1✔
296
              .tagged(Collections.singletonMap(TASK_FAILURE_TYPE, "operation_" + operationState))
1✔
297
              .counter(MetricsType.NEXUS_EXEC_FAILED_COUNTER)
1✔
298
              .inc(1);
1✔
299
        }
300
      } catch (TimeoutException e) {
1✔
301
        log.warn("Nexus task timed out while processing", e);
1✔
302
        metricsScope
1✔
303
            .tagged(Collections.singletonMap(TASK_FAILURE_TYPE, "timeout"))
1✔
304
            .counter(MetricsType.NEXUS_EXEC_FAILED_COUNTER)
1✔
305
            .inc(1);
1✔
306
        return;
1✔
307
      } catch (Throwable e) {
×
NEW
308
        metricsScope
×
NEW
309
            .tagged(Collections.singletonMap(TASK_FAILURE_TYPE, "internal_sdk_error"))
×
NEW
310
            .counter(MetricsType.NEXUS_EXEC_FAILED_COUNTER)
×
NEW
311
            .inc(1);
×
312
        // handler.handle if expected to never throw an exception and return result
313
        // that can be used for a workflow callback if this method throws, it's a bug.
314
        log.error("[BUG] Code that expected to never throw an exception threw an exception", e);
×
315
        throw e;
×
316
      } finally {
317
        sw.stop();
1✔
318
      }
319

320
      try {
321
        sendReply(taskToken, result, metricsScope);
1✔
322
      } catch (Exception e) {
1✔
323
        logExceptionDuringResultReporting(e, pollResponse, result);
1✔
324
        throw e;
1✔
325
      }
1✔
326

327
      Duration e2eDuration =
1✔
328
          ProtobufTimeUtils.toM3DurationSinceNow(pollResponse.getRequest().getScheduledTime());
1✔
329
      metricsScope.timer(MetricsType.NEXUS_TASK_E2E_LATENCY).record(e2eDuration);
1✔
330
    }
1✔
331

332
    private void logExceptionDuringResultReporting(
333
        Exception e,
334
        PollNexusTaskQueueResponseOrBuilder pollResponse,
335
        NexusTaskHandler.Result result) {
336
      if (log.isDebugEnabled()) {
1✔
337
        log.debug(
×
338
            "Failure during reporting of nexus task result to the server. TaskResult={}",
339
            result,
340
            e);
341
      } else {
342
        log.warn("Failure during reporting of nexus task result to the server.", e);
1✔
343
      }
344
    }
1✔
345

346
    private void sendReply(
347
        ByteString taskToken, NexusTaskHandler.Result response, Scope metricsScope) {
348
      Response taskResponse = response.getResponse();
1✔
349
      if (taskResponse != null) {
1✔
350
        RespondNexusTaskCompletedRequest request =
351
            RespondNexusTaskCompletedRequest.newBuilder()
1✔
352
                .setTaskToken(taskToken)
1✔
353
                .setIdentity(options.getIdentity())
1✔
354
                .setNamespace(namespace)
1✔
355
                .setResponse(taskResponse)
1✔
356
                .build();
1✔
357

358
        grpcRetryer.retry(
1✔
359
            () ->
360
                service
1✔
361
                    .blockingStub()
1✔
362
                    .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
1✔
363
                    .respondNexusTaskCompleted(request),
1✔
364
            replyGrpcRetryerOptions);
1✔
365
      } else {
1✔
366
        HandlerError taskFailed = response.getHandlerError();
1✔
367
        if (taskFailed != null) {
1✔
368
          RespondNexusTaskFailedRequest request =
369
              RespondNexusTaskFailedRequest.newBuilder()
1✔
370
                  .setTaskToken(taskToken)
1✔
371
                  .setIdentity(options.getIdentity())
1✔
372
                  .setNamespace(namespace)
1✔
373
                  .setError(taskFailed)
1✔
374
                  .build();
1✔
375

376
          grpcRetryer.retry(
1✔
377
              () ->
378
                  service
1✔
379
                      .blockingStub()
1✔
380
                      .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
1✔
381
                      .respondNexusTaskFailed(request),
1✔
382
              replyGrpcRetryerOptions);
1✔
383
        } else {
1✔
384
          throw new IllegalArgumentException("[BUG] Either response or failure must be set");
×
385
        }
386
      }
387
    }
1✔
388
  }
389
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc