• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #244

10 Apr 2024 08:19PM UTC coverage: 77.465% (-0.08%) from 77.549%
#244

push

github

web-flow
Slot supplier interface & fixed-size implementation (#2014)

https://github.com/temporalio/proposals/blob/master/all-sdk/autotuning.md

286 of 388 new or added lines in 25 files covered. (73.71%)

3 existing lines in 3 files now uncovered.

19116 of 24677 relevant lines covered (77.46%)

0.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.97
/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityPollTask.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.worker;
22

23
import static io.temporal.serviceclient.MetricsTag.METRICS_TAGS_CALL_OPTIONS_KEY;
24

25
import com.google.protobuf.DoubleValue;
26
import com.uber.m3.tally.Scope;
27
import io.temporal.api.common.v1.WorkerVersionCapabilities;
28
import io.temporal.api.taskqueue.v1.TaskQueue;
29
import io.temporal.api.taskqueue.v1.TaskQueueMetadata;
30
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse;
31
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueRequest;
32
import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse;
33
import io.temporal.internal.activity.ActivityPollResponseToInfo;
34
import io.temporal.internal.common.ProtobufTimeUtils;
35
import io.temporal.serviceclient.WorkflowServiceStubs;
36
import io.temporal.worker.MetricsType;
37
import io.temporal.worker.tuning.*;
38
import java.util.Objects;
39
import java.util.function.Supplier;
40
import javax.annotation.Nonnull;
41
import javax.annotation.Nullable;
42
import org.slf4j.Logger;
43
import org.slf4j.LoggerFactory;
44

45
final class ActivityPollTask implements Poller.PollTask<ActivityTask> {
46
  private static final Logger log = LoggerFactory.getLogger(ActivityPollTask.class);
1✔
47

48
  private final WorkflowServiceStubs service;
49
  private final TrackingSlotSupplier<ActivitySlotInfo> slotSupplier;
50
  private final Scope metricsScope;
51
  private final PollActivityTaskQueueRequest pollRequest;
52

53
  public ActivityPollTask(
54
      @Nonnull WorkflowServiceStubs service,
55
      @Nonnull String namespace,
56
      @Nonnull String taskQueue,
57
      @Nonnull String identity,
58
      @Nullable String buildId,
59
      boolean useBuildIdForVersioning,
60
      double activitiesPerSecond,
61
      @Nonnull TrackingSlotSupplier<ActivitySlotInfo> slotSupplier,
62
      @Nonnull Scope metricsScope,
63
      @Nonnull Supplier<GetSystemInfoResponse.Capabilities> serverCapabilities) {
1✔
64
    this.service = Objects.requireNonNull(service);
1✔
65
    this.slotSupplier = slotSupplier;
1✔
66
    this.metricsScope = Objects.requireNonNull(metricsScope);
1✔
67

68
    PollActivityTaskQueueRequest.Builder pollRequest =
69
        PollActivityTaskQueueRequest.newBuilder()
1✔
70
            .setNamespace(namespace)
1✔
71
            .setIdentity(identity)
1✔
72
            .setTaskQueue(TaskQueue.newBuilder().setName(taskQueue));
1✔
73
    if (activitiesPerSecond > 0) {
1✔
74
      pollRequest.setTaskQueueMetadata(
×
75
          TaskQueueMetadata.newBuilder()
×
76
              .setMaxTasksPerSecond(DoubleValue.newBuilder().setValue(activitiesPerSecond).build())
×
77
              .build());
×
78
    }
79

80
    if (serverCapabilities.get().getBuildIdBasedVersioning()) {
1✔
81
      pollRequest.setWorkerVersionCapabilities(
×
82
          WorkerVersionCapabilities.newBuilder()
×
83
              .setBuildId(buildId)
×
84
              .setUseVersioning(useBuildIdForVersioning)
×
85
              .build());
×
86
    }
87
    this.pollRequest = pollRequest.build();
1✔
88
  }
1✔
89

90
  @Override
91
  public ActivityTask poll() {
92
    if (log.isTraceEnabled()) {
1✔
93
      log.trace("poll request begin: " + pollRequest);
×
94
    }
95
    PollActivityTaskQueueResponse response;
96
    SlotPermit permit;
97
    boolean isSuccessful = false;
1✔
98

99
    try {
100
      permit =
1✔
101
          slotSupplier.reserveSlot(
1✔
102
              new SlotReservationData(
103
                  pollRequest.getTaskQueue().getName(),
1✔
104
                  pollRequest.getIdentity(),
1✔
105
                  pollRequest.getWorkerVersionCapabilities().getBuildId()));
1✔
106
    } catch (InterruptedException e) {
1✔
107
      Thread.currentThread().interrupt();
1✔
108
      return null;
1✔
NEW
109
    } catch (Exception e) {
×
NEW
110
      log.warn("Error while trying to reserve a slot for an activity", e.getCause());
×
NEW
111
      return null;
×
112
    }
1✔
113

114
    try {
115
      response =
1✔
116
          service
117
              .blockingStub()
1✔
118
              .withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope)
1✔
119
              .pollActivityTaskQueue(pollRequest);
1✔
120

121
      if (response == null || response.getTaskToken().isEmpty()) {
1✔
122
        metricsScope.counter(MetricsType.ACTIVITY_POLL_NO_TASK_COUNTER).inc(1);
1✔
123
        return null;
1✔
124
      }
125
      metricsScope
1✔
126
          .timer(MetricsType.ACTIVITY_SCHEDULE_TO_START_LATENCY)
1✔
127
          .record(
1✔
128
              ProtobufTimeUtils.toM3Duration(
1✔
129
                  response.getStartedTime(), response.getCurrentAttemptScheduledTime()));
1✔
130
      isSuccessful = true;
1✔
131
      slotSupplier.markSlotUsed(
1✔
132
          new ActivitySlotInfo(
133
              ActivityPollResponseToInfo.toActivityInfoImpl(
1✔
134
                  response,
135
                  pollRequest.getNamespace(),
1✔
136
                  pollRequest.getTaskQueue().getNormalName(),
1✔
137
                  false),
138
              pollRequest.getIdentity(),
1✔
139
              pollRequest.getWorkerVersionCapabilities().getBuildId()),
1✔
140
          permit);
141
      return new ActivityTask(
1✔
142
          response, () -> slotSupplier.releaseSlot(SlotReleaseReason.taskComplete(), permit));
1✔
143
    } finally {
144
      if (!isSuccessful) slotSupplier.releaseSlot(SlotReleaseReason.neverUsed(), permit);
1✔
145
    }
146
  }
147
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc