• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #341

24 Oct 2024 04:18PM UTC coverage: 78.746% (+0.04%) from 78.708%
#341

push

github

web-flow
Add failure_reason to nexus_task_execution_failed (#2274)

Add failure_reason to nexus_task_execution_failed

22 of 26 new or added lines in 2 files covered. (84.62%)

48 existing lines in 7 files now uncovered.

22726 of 28860 relevant lines covered (78.75%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.43
/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusPollTask.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.worker;
22

23
import static io.temporal.serviceclient.MetricsTag.METRICS_TAGS_CALL_OPTIONS_KEY;
24

25
import com.google.protobuf.Timestamp;
26
import com.uber.m3.tally.Scope;
27
import io.temporal.api.common.v1.WorkerVersionCapabilities;
28
import io.temporal.api.taskqueue.v1.TaskQueue;
29
import io.temporal.api.workflowservice.v1.*;
30
import io.temporal.internal.common.ProtobufTimeUtils;
31
import io.temporal.serviceclient.WorkflowServiceStubs;
32
import io.temporal.worker.MetricsType;
33
import io.temporal.worker.tuning.*;
34
import java.util.Objects;
35
import java.util.function.Supplier;
36
import javax.annotation.Nonnull;
37
import javax.annotation.Nullable;
38
import org.slf4j.Logger;
39
import org.slf4j.LoggerFactory;
40

41
final class NexusPollTask implements Poller.PollTask<NexusTask> {
42
  private static final Logger log = LoggerFactory.getLogger(NexusPollTask.class);
1✔
43

44
  private final WorkflowServiceStubs service;
45
  private final TrackingSlotSupplier<NexusSlotInfo> slotSupplier;
46
  private final Scope metricsScope;
47
  private final PollNexusTaskQueueRequest pollRequest;
48

49
  public NexusPollTask(
50
      @Nonnull WorkflowServiceStubs service,
51
      @Nonnull String namespace,
52
      @Nonnull String taskQueue,
53
      @Nonnull String identity,
54
      @Nullable String buildId,
55
      boolean useBuildIdForVersioning,
56
      @Nonnull TrackingSlotSupplier<NexusSlotInfo> slotSupplier,
57
      @Nonnull Scope metricsScope,
58
      @Nonnull Supplier<GetSystemInfoResponse.Capabilities> serverCapabilities) {
1✔
59
    this.service = Objects.requireNonNull(service);
1✔
60
    this.slotSupplier = slotSupplier;
1✔
61
    this.metricsScope = Objects.requireNonNull(metricsScope);
1✔
62

63
    PollNexusTaskQueueRequest.Builder pollRequest =
64
        PollNexusTaskQueueRequest.newBuilder()
1✔
65
            .setNamespace(namespace)
1✔
66
            .setIdentity(identity)
1✔
67
            .setTaskQueue(TaskQueue.newBuilder().setName(taskQueue));
1✔
68

69
    if (serverCapabilities.get().getBuildIdBasedVersioning()) {
1✔
70
      pollRequest.setWorkerVersionCapabilities(
×
71
          WorkerVersionCapabilities.newBuilder()
×
72
              .setBuildId(buildId)
×
73
              .setUseVersioning(useBuildIdForVersioning)
×
74
              .build());
×
75
    }
76
    this.pollRequest = pollRequest.build();
1✔
77
  }
1✔
78

79
  @Override
80
  public NexusTask poll() {
81
    if (log.isTraceEnabled()) {
1✔
82
      log.trace("poll request begin: " + pollRequest);
×
83
    }
84
    PollNexusTaskQueueResponse response;
85
    SlotPermit permit;
86
    boolean isSuccessful = false;
1✔
87

88
    try {
89
      permit =
1✔
90
          slotSupplier.reserveSlot(
1✔
91
              new SlotReservationData(
92
                  pollRequest.getTaskQueue().getName(),
1✔
93
                  pollRequest.getIdentity(),
1✔
94
                  pollRequest.getWorkerVersionCapabilities().getBuildId()));
1✔
95
    } catch (InterruptedException e) {
×
96
      Thread.currentThread().interrupt();
×
97
      return null;
×
98
    } catch (Exception e) {
×
99
      log.warn("Error while trying to reserve a slot for a nexus task", e.getCause());
×
100
      return null;
×
101
    }
1✔
102

103
    try {
104
      response =
1✔
105
          service
106
              .blockingStub()
1✔
107
              .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
1✔
108
              .pollNexusTaskQueue(pollRequest);
1✔
109

110
      if (response == null || response.getTaskToken().isEmpty()) {
1✔
UNCOV
111
        metricsScope.counter(MetricsType.NEXUS_POLL_NO_TASK_COUNTER).inc(1);
×
UNCOV
112
        return null;
×
113
      }
114

115
      Timestamp startedTime = ProtobufTimeUtils.getCurrentProtoTime();
1✔
116
      metricsScope
1✔
117
          .timer(MetricsType.NEXUS_SCHEDULE_TO_START_LATENCY)
1✔
118
          .record(
1✔
119
              ProtobufTimeUtils.toM3Duration(
1✔
120
                  startedTime, response.getRequest().getScheduledTime()));
1✔
121

122
      isSuccessful = true;
1✔
123
      return new NexusTask(
1✔
124
          response,
125
          permit,
126
          () -> slotSupplier.releaseSlot(SlotReleaseReason.taskComplete(), permit));
1✔
127
    } finally {
128
      if (!isSuccessful) slotSupplier.releaseSlot(SlotReleaseReason.neverUsed(), permit);
1✔
129
    }
130
  }
131
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc