• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #284

23 Jul 2024 10:09PM UTC coverage: 77.304% (-0.06%) from 77.364%
#284

push

github

web-flow
Reintroduce slot supplier & add many tests (#2143)

593 of 752 new or added lines in 37 files covered. (78.86%)

22 existing lines in 10 files now uncovered.

19554 of 25295 relevant lines covered (77.3%)

0.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.35
/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowPollTask.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.worker;
22

23
import static io.temporal.serviceclient.MetricsTag.METRICS_TAGS_CALL_OPTIONS_KEY;
24

25
import com.uber.m3.tally.Scope;
26
import com.uber.m3.util.ImmutableMap;
27
import io.temporal.api.common.v1.WorkerVersionCapabilities;
28
import io.temporal.api.enums.v1.TaskQueueKind;
29
import io.temporal.api.taskqueue.v1.TaskQueue;
30
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse;
31
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueRequest;
32
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse;
33
import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc;
34
import io.temporal.internal.common.ProtobufTimeUtils;
35
import io.temporal.serviceclient.MetricsTag;
36
import io.temporal.serviceclient.WorkflowServiceStubs;
37
import io.temporal.worker.MetricsType;
38
import io.temporal.worker.tuning.*;
39
import java.util.Objects;
40
import java.util.function.Supplier;
41
import javax.annotation.Nonnull;
42
import javax.annotation.Nullable;
43
import org.slf4j.Logger;
44
import org.slf4j.LoggerFactory;
45

46
final class WorkflowPollTask implements Poller.PollTask<WorkflowTask> {
47
  private static final Logger log = LoggerFactory.getLogger(WorkflowPollTask.class);
1✔
48

49
  private final TrackingSlotSupplier<WorkflowSlotInfo> slotSupplier;
50
  private final StickyQueueBalancer stickyQueueBalancer;
51
  private final Scope metricsScope;
52
  private final Scope stickyMetricsScope;
53
  private final WorkflowServiceGrpc.WorkflowServiceBlockingStub serviceStub;
54
  private final PollWorkflowTaskQueueRequest pollRequest;
55
  private final PollWorkflowTaskQueueRequest stickyPollRequest;
56

57
  public WorkflowPollTask(
58
      @Nonnull WorkflowServiceStubs service,
59
      @Nonnull String namespace,
60
      @Nonnull String taskQueue,
61
      @Nullable String stickyTaskQueue,
62
      @Nonnull String identity,
63
      @Nullable String buildId,
64
      boolean useBuildIdForVersioning,
65
      @Nonnull TrackingSlotSupplier<WorkflowSlotInfo> slotSupplier,
66
      @Nonnull StickyQueueBalancer stickyQueueBalancer,
67
      @Nonnull Scope workerMetricsScope,
68
      @Nonnull Supplier<GetSystemInfoResponse.Capabilities> serverCapabilities) {
1✔
69
    this.slotSupplier = Objects.requireNonNull(slotSupplier);
1✔
70
    this.stickyQueueBalancer = Objects.requireNonNull(stickyQueueBalancer);
1✔
71
    this.metricsScope = Objects.requireNonNull(workerMetricsScope);
1✔
72
    this.stickyMetricsScope =
1✔
73
        workerMetricsScope.tagged(
1✔
74
            new ImmutableMap.Builder<String, String>(1)
75
                .put(MetricsTag.TASK_QUEUE, String.format("%s:%s", taskQueue, "sticky"))
1✔
76
                .build());
1✔
77
    this.serviceStub =
1✔
78
        Objects.requireNonNull(service)
1✔
79
            .blockingStub()
1✔
80
            .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope);
1✔
81

82
    PollWorkflowTaskQueueRequest.Builder pollRequestBuilder =
83
        PollWorkflowTaskQueueRequest.newBuilder()
1✔
84
            .setNamespace(Objects.requireNonNull(namespace))
1✔
85
            .setIdentity(Objects.requireNonNull(identity));
1✔
86

87
    if (serverCapabilities.get().getBuildIdBasedVersioning()) {
1✔
88
      pollRequestBuilder.setWorkerVersionCapabilities(
×
89
          WorkerVersionCapabilities.newBuilder()
×
90
              .setBuildId(buildId)
×
91
              .setUseVersioning(useBuildIdForVersioning)
×
92
              .build());
×
93
    } else {
94
      pollRequestBuilder.setBinaryChecksum(buildId);
1✔
95
    }
96

97
    this.pollRequest =
1✔
98
        pollRequestBuilder
99
            .setTaskQueue(
1✔
100
                TaskQueue.newBuilder()
1✔
101
                    .setName(taskQueue)
1✔
102
                    // For matching performance optimizations of Temporal Server it's important to
103
                    // know if the poll comes for a sticky or a normal queue. Because sticky queues
104
                    // have only 1 partition, no forwarding is needed.
105
                    .setKind(TaskQueueKind.TASK_QUEUE_KIND_NORMAL)
1✔
106
                    .build())
1✔
107
            .build();
1✔
108

109
    this.stickyPollRequest =
1✔
110
        pollRequestBuilder
111
            .setTaskQueue(
1✔
112
                TaskQueue.newBuilder()
1✔
113
                    .setName(stickyTaskQueue)
1✔
114
                    .setKind(TaskQueueKind.TASK_QUEUE_KIND_STICKY)
1✔
115
                    .setNormalName(taskQueue)
1✔
116
                    .build())
1✔
117
            .build();
1✔
118
  }
1✔
119

120
  @Override
121
  public WorkflowTask poll() {
122
    boolean isSuccessful = false;
1✔
123
    SlotPermit permit;
124
    try {
125
      permit =
1✔
126
          slotSupplier.reserveSlot(
1✔
127
              new SlotReservationData(
128
                  pollRequest.getTaskQueue().getName(),
1✔
129
                  pollRequest.getIdentity(),
1✔
130
                  pollRequest.getWorkerVersionCapabilities().getBuildId()));
1✔
131
    } catch (InterruptedException e) {
1✔
132
      Thread.currentThread().interrupt();
1✔
133
      return null;
1✔
NEW
134
    } catch (Exception e) {
×
NEW
135
      log.warn("Error while trying to reserve a slot for workflow task", e.getCause());
×
NEW
136
      return null;
×
137
    }
1✔
138

139
    TaskQueueKind taskQueueKind = stickyQueueBalancer.makePoll();
1✔
140
    boolean isSticky = TaskQueueKind.TASK_QUEUE_KIND_STICKY.equals(taskQueueKind);
1✔
141
    PollWorkflowTaskQueueRequest request = isSticky ? stickyPollRequest : pollRequest;
1✔
142
    Scope scope = isSticky ? stickyMetricsScope : metricsScope;
1✔
143

144
    log.trace("poll request begin: {}", request);
1✔
145
    try {
146
      PollWorkflowTaskQueueResponse response = doPoll(request, scope);
1✔
147
      if (response == null) {
1✔
148
        return null;
1✔
149
      }
150
      isSuccessful = true;
1✔
151
      stickyQueueBalancer.finishPoll(taskQueueKind, response.getBacklogCountHint());
1✔
152
      slotSupplier.markSlotUsed(new WorkflowSlotInfo(response, pollRequest), permit);
1✔
153
      return new WorkflowTask(response, (rr) -> slotSupplier.releaseSlot(rr, permit));
1✔
154
    } finally {
155
      if (!isSuccessful) {
1✔
156
        slotSupplier.releaseSlot(SlotReleaseReason.neverUsed(), permit);
1✔
157
        stickyQueueBalancer.finishPoll(taskQueueKind, 0);
1✔
158
      }
159
    }
160
  }
161

162
  @Nullable
163
  private PollWorkflowTaskQueueResponse doPoll(
164
      PollWorkflowTaskQueueRequest request, Scope metricsScope) {
165
    PollWorkflowTaskQueueResponse response = serviceStub.pollWorkflowTaskQueue(request);
1✔
166

167
    if (log.isTraceEnabled()) {
1✔
168
      log.trace(
×
169
          "poll request returned workflow task: taskQueue={}, workflowType={}, workflowExecution={}, startedEventId={}, previousStartedEventId={}{}",
170
          request.getTaskQueue().getName(),
×
171
          response.getWorkflowType(),
×
172
          response.getWorkflowExecution(),
×
173
          response.getStartedEventId(),
×
174
          response.getPreviousStartedEventId(),
×
175
          response.hasQuery() ? ", queryType=" + response.getQuery().getQueryType() : "");
×
176
    }
177

178
    if (response == null || response.getTaskToken().isEmpty()) {
1✔
179
      metricsScope.counter(MetricsType.WORKFLOW_TASK_QUEUE_POLL_EMPTY_COUNTER).inc(1);
1✔
180
      return null;
1✔
181
    }
182
    metricsScope.counter(MetricsType.WORKFLOW_TASK_QUEUE_POLL_SUCCEED_COUNTER).inc(1);
1✔
183
    metricsScope
1✔
184
        .timer(MetricsType.WORKFLOW_TASK_SCHEDULE_TO_START_LATENCY)
1✔
185
        .record(
1✔
186
            ProtobufTimeUtils.toM3Duration(response.getStartedTime(), response.getScheduledTime()));
1✔
187
    return response;
1✔
188
  }
189
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc