• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #280

08 Jul 2024 07:05PM UTC coverage: 77.577% (-0.01%) from 77.589%
#280

push

github

web-flow
Release v1.24.1 (#2140)

19091 of 24609 relevant lines covered (77.58%)

0.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.52
/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowPollTask.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.worker;
22

23
import static io.temporal.serviceclient.MetricsTag.METRICS_TAGS_CALL_OPTIONS_KEY;
24

25
import com.uber.m3.tally.Scope;
26
import com.uber.m3.util.ImmutableMap;
27
import io.temporal.api.common.v1.WorkerVersionCapabilities;
28
import io.temporal.api.enums.v1.TaskQueueKind;
29
import io.temporal.api.taskqueue.v1.TaskQueue;
30
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse;
31
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueRequest;
32
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse;
33
import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc;
34
import io.temporal.internal.common.ProtobufTimeUtils;
35
import io.temporal.serviceclient.MetricsTag;
36
import io.temporal.serviceclient.WorkflowServiceStubs;
37
import io.temporal.worker.MetricsType;
38
import java.util.Objects;
39
import java.util.concurrent.Semaphore;
40
import java.util.function.Supplier;
41
import javax.annotation.Nonnull;
42
import javax.annotation.Nullable;
43
import org.slf4j.Logger;
44
import org.slf4j.LoggerFactory;
45

46
final class WorkflowPollTask implements Poller.PollTask<WorkflowTask> {
47
  private static final Logger log = LoggerFactory.getLogger(WorkflowPollTask.class);
1✔
48

49
  private final Semaphore workflowTaskExecutorSemaphore;
50
  private final StickyQueueBalancer stickyQueueBalancer;
51
  private final Scope metricsScope;
52
  private final Scope stickyMetricsScope;
53
  private final WorkflowServiceGrpc.WorkflowServiceBlockingStub serviceStub;
54
  private final PollWorkflowTaskQueueRequest pollRequest;
55
  private final PollWorkflowTaskQueueRequest stickyPollRequest;
56

57
  public WorkflowPollTask(
58
      @Nonnull WorkflowServiceStubs service,
59
      @Nonnull String namespace,
60
      @Nonnull String taskQueue,
61
      @Nullable String stickyTaskQueue,
62
      @Nonnull String identity,
63
      @Nullable String buildId,
64
      boolean useBuildIdForVersioning,
65
      @Nonnull Semaphore workflowTaskExecutorSemaphore,
66
      @Nonnull StickyQueueBalancer stickyQueueBalancer,
67
      @Nonnull Scope workerMetricsScope,
68
      @Nonnull Supplier<GetSystemInfoResponse.Capabilities> serverCapabilities) {
1✔
69
    this.workflowTaskExecutorSemaphore = Objects.requireNonNull(workflowTaskExecutorSemaphore);
1✔
70
    this.stickyQueueBalancer = Objects.requireNonNull(stickyQueueBalancer);
1✔
71
    this.metricsScope = Objects.requireNonNull(workerMetricsScope);
1✔
72
    this.stickyMetricsScope =
1✔
73
        workerMetricsScope.tagged(
1✔
74
            new ImmutableMap.Builder<String, String>(1)
75
                .put(MetricsTag.TASK_QUEUE, String.format("%s:%s", taskQueue, "sticky"))
1✔
76
                .build());
1✔
77
    this.serviceStub =
1✔
78
        Objects.requireNonNull(service)
1✔
79
            .blockingStub()
1✔
80
            .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope);
1✔
81

82
    PollWorkflowTaskQueueRequest.Builder pollRequestBuilder =
83
        PollWorkflowTaskQueueRequest.newBuilder()
1✔
84
            .setNamespace(Objects.requireNonNull(namespace))
1✔
85
            .setIdentity(Objects.requireNonNull(identity));
1✔
86

87
    if (serverCapabilities.get().getBuildIdBasedVersioning()) {
1✔
88
      pollRequestBuilder.setWorkerVersionCapabilities(
×
89
          WorkerVersionCapabilities.newBuilder()
×
90
              .setBuildId(buildId)
×
91
              .setUseVersioning(useBuildIdForVersioning)
×
92
              .build());
×
93
    } else {
94
      pollRequestBuilder.setBinaryChecksum(buildId);
1✔
95
    }
96

97
    this.pollRequest =
1✔
98
        pollRequestBuilder
99
            .setTaskQueue(
1✔
100
                TaskQueue.newBuilder()
1✔
101
                    .setName(taskQueue)
1✔
102
                    // For matching performance optimizations of Temporal Server it's important to
103
                    // know if the poll comes for a sticky or a normal queue. Because sticky queues
104
                    // have only 1 partition, no forwarding is needed.
105
                    .setKind(TaskQueueKind.TASK_QUEUE_KIND_NORMAL)
1✔
106
                    .build())
1✔
107
            .build();
1✔
108

109
    this.stickyPollRequest =
1✔
110
        pollRequestBuilder
111
            .setTaskQueue(
1✔
112
                TaskQueue.newBuilder()
1✔
113
                    .setName(stickyTaskQueue)
1✔
114
                    .setKind(TaskQueueKind.TASK_QUEUE_KIND_STICKY)
1✔
115
                    .setNormalName(taskQueue)
1✔
116
                    .build())
1✔
117
            .build();
1✔
118
  }
1✔
119

120
  @Override
121
  public WorkflowTask poll() {
122
    boolean isSuccessful = false;
1✔
123
    try {
124
      workflowTaskExecutorSemaphore.acquire();
1✔
125
    } catch (InterruptedException e) {
×
126
      Thread.currentThread().interrupt();
×
127
      return null;
×
128
    }
1✔
129

130
    TaskQueueKind taskQueueKind = stickyQueueBalancer.makePoll();
1✔
131
    boolean isSticky = TaskQueueKind.TASK_QUEUE_KIND_STICKY.equals(taskQueueKind);
1✔
132
    PollWorkflowTaskQueueRequest request = isSticky ? stickyPollRequest : pollRequest;
1✔
133
    Scope scope = isSticky ? stickyMetricsScope : metricsScope;
1✔
134

135
    log.trace("poll request begin: {}", request);
1✔
136
    try {
137
      PollWorkflowTaskQueueResponse response = doPoll(request, scope);
1✔
138
      if (response == null) {
1✔
139
        return null;
1✔
140
      }
141
      isSuccessful = true;
1✔
142
      stickyQueueBalancer.finishPoll(taskQueueKind, response.getBacklogCountHint());
1✔
143
      return new WorkflowTask(response, workflowTaskExecutorSemaphore::release);
1✔
144
    } finally {
145
      if (!isSuccessful) {
1✔
146
        workflowTaskExecutorSemaphore.release();
1✔
147
        stickyQueueBalancer.finishPoll(taskQueueKind, 0);
1✔
148
      }
149
    }
150
  }
151

152
  @Nullable
153
  private PollWorkflowTaskQueueResponse doPoll(
154
      PollWorkflowTaskQueueRequest request, Scope metricsScope) {
155
    PollWorkflowTaskQueueResponse response = serviceStub.pollWorkflowTaskQueue(request);
1✔
156

157
    if (log.isTraceEnabled()) {
1✔
158
      log.trace(
×
159
          "poll request returned workflow task: taskQueue={}, workflowType={}, workflowExecution={}, startedEventId={}, previousStartedEventId={}{}",
160
          request.getTaskQueue().getName(),
×
161
          response.getWorkflowType(),
×
162
          response.getWorkflowExecution(),
×
163
          response.getStartedEventId(),
×
164
          response.getPreviousStartedEventId(),
×
165
          response.hasQuery() ? ", queryType=" + response.getQuery().getQueryType() : "");
×
166
    }
167

168
    if (response == null || response.getTaskToken().isEmpty()) {
1✔
169
      metricsScope.counter(MetricsType.WORKFLOW_TASK_QUEUE_POLL_EMPTY_COUNTER).inc(1);
1✔
170
      return null;
1✔
171
    }
172
    metricsScope.counter(MetricsType.WORKFLOW_TASK_QUEUE_POLL_SUCCEED_COUNTER).inc(1);
1✔
173
    metricsScope
1✔
174
        .timer(MetricsType.WORKFLOW_TASK_SCHEDULE_TO_START_LATENCY)
1✔
175
        .record(
1✔
176
            ProtobufTimeUtils.toM3Duration(response.getStartedTime(), response.getScheduledTime()));
1✔
177
    return response;
1✔
178
  }
179
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc