• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #333

16 Oct 2024 07:28PM UTC coverage: 78.65% (+0.6%) from 78.085%
#333

push

github

web-flow
Fix code coverage (#2275)

22670 of 28824 relevant lines covered (78.65%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.42
/temporal-sdk/src/main/java/io/temporal/worker/tuning/ResourceBasedSlotSupplier.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.worker.tuning;
22

23
import io.temporal.common.Experimental;
24
import java.time.Duration;
25
import java.time.Instant;
26
import java.util.Optional;
27

28
/** Implements a {@link SlotSupplier} based on resource usage for a particular slot type. */
29
@Experimental
30
public class ResourceBasedSlotSupplier<SI extends SlotInfo> implements SlotSupplier<SI> {
31

32
  private final ResourceBasedController resourceController;
33
  private final ResourceBasedSlotOptions options;
34
  private Instant lastSlotIssuedAt = Instant.EPOCH;
1✔
35

36
  /**
37
   * Construct a slot supplier for workflow tasks with the given resource controller and options.
38
   *
39
   * <p>The resource controller must be the same among all slot suppliers in a worker. If you want
40
   * to use resource-based tuning for all slot suppliers, prefer {@link ResourceBasedTuner}.
41
   */
42
  public static ResourceBasedSlotSupplier<WorkflowSlotInfo> createForWorkflow(
43
      ResourceBasedController resourceBasedController, ResourceBasedSlotOptions options) {
44
    return new ResourceBasedSlotSupplier<>(
1✔
45
        WorkflowSlotInfo.class, resourceBasedController, options);
46
  }
47

48
  /**
49
   * Construct a slot supplier for activity tasks with the given resource controller and options.
50
   *
51
   * <p>The resource controller must be the same among all slot suppliers in a worker. If you want
52
   * to use resource-based tuning for all slot suppliers, prefer {@link ResourceBasedTuner}.
53
   */
54
  public static ResourceBasedSlotSupplier<ActivitySlotInfo> createForActivity(
55
      ResourceBasedController resourceBasedController, ResourceBasedSlotOptions options) {
56
    return new ResourceBasedSlotSupplier<>(
1✔
57
        ActivitySlotInfo.class, resourceBasedController, options);
58
  }
59

60
  /**
61
   * Construct a slot supplier for local activities with the given resource controller and options.
62
   *
63
   * <p>The resource controller must be the same among all slot suppliers in a worker. If you want
64
   * to use resource-based tuning for all slot suppliers, prefer {@link ResourceBasedTuner}.
65
   */
66
  public static ResourceBasedSlotSupplier<LocalActivitySlotInfo> createForLocalActivity(
67
      ResourceBasedController resourceBasedController, ResourceBasedSlotOptions options) {
68
    return new ResourceBasedSlotSupplier<>(
1✔
69
        LocalActivitySlotInfo.class, resourceBasedController, options);
70
  }
71

72
  /**
73
   * Construct a slot supplier for nexus tasks with the given resource controller and options.
74
   *
75
   * <p>The resource controller must be the same among all slot suppliers in a worker. If you want
76
   * to use resource-based tuning for all slot suppliers, prefer {@link ResourceBasedTuner}.
77
   */
78
  public static ResourceBasedSlotSupplier<NexusSlotInfo> createForNexus(
79
      ResourceBasedController resourceBasedController, ResourceBasedSlotOptions options) {
80
    return new ResourceBasedSlotSupplier<>(NexusSlotInfo.class, resourceBasedController, options);
1✔
81
  }
82

83
  private ResourceBasedSlotSupplier(
84
      Class<SI> clazz,
85
      ResourceBasedController resourceBasedController,
86
      ResourceBasedSlotOptions options) {
1✔
87
    this.resourceController = resourceBasedController;
1✔
88
    // Merge default options for any unset fields
89
    if (WorkflowSlotInfo.class.isAssignableFrom(clazz)) {
1✔
90
      this.options =
1✔
91
          ResourceBasedSlotOptions.newBuilder()
1✔
92
              .setMinimumSlots(
1✔
93
                  options.getMinimumSlots() == 0
1✔
94
                      ? ResourceBasedTuner.DEFAULT_WORKFLOW_SLOT_OPTIONS.getMinimumSlots()
×
95
                      : options.getMinimumSlots())
1✔
96
              .setMaximumSlots(
1✔
97
                  options.getMaximumSlots() == 0
1✔
98
                      ? ResourceBasedTuner.DEFAULT_WORKFLOW_SLOT_OPTIONS.getMaximumSlots()
×
99
                      : options.getMaximumSlots())
1✔
100
              .setRampThrottle(
1✔
101
                  options.getRampThrottle() == null
1✔
102
                      ? ResourceBasedTuner.DEFAULT_WORKFLOW_SLOT_OPTIONS.getRampThrottle()
×
103
                      : options.getRampThrottle())
1✔
104
              .build();
1✔
105
    } else if (ActivitySlotInfo.class.isAssignableFrom(clazz)
1✔
106
        || LocalActivitySlotInfo.class.isAssignableFrom(clazz)) {
1✔
107
      this.options =
1✔
108
          ResourceBasedSlotOptions.newBuilder()
1✔
109
              .setMinimumSlots(
1✔
110
                  options.getMinimumSlots() == 0
1✔
111
                      ? ResourceBasedTuner.DEFAULT_ACTIVITY_SLOT_OPTIONS.getMinimumSlots()
×
112
                      : options.getMinimumSlots())
1✔
113
              .setMaximumSlots(
1✔
114
                  options.getMaximumSlots() == 0
1✔
115
                      ? ResourceBasedTuner.DEFAULT_ACTIVITY_SLOT_OPTIONS.getMaximumSlots()
×
116
                      : options.getMaximumSlots())
1✔
117
              .setRampThrottle(
1✔
118
                  options.getRampThrottle() == null
1✔
119
                      ? ResourceBasedTuner.DEFAULT_ACTIVITY_SLOT_OPTIONS.getRampThrottle()
×
120
                      : options.getRampThrottle())
1✔
121
              .build();
1✔
122
    } else {
123
      this.options =
1✔
124
          ResourceBasedSlotOptions.newBuilder()
1✔
125
              .setMinimumSlots(
1✔
126
                  options.getMinimumSlots() == 0
1✔
127
                      ? ResourceBasedTuner.DEFAULT_NEXUS_SLOT_OPTIONS.getMinimumSlots()
×
128
                      : options.getMinimumSlots())
1✔
129
              .setMaximumSlots(
1✔
130
                  options.getMaximumSlots() == 0
1✔
131
                      ? ResourceBasedTuner.DEFAULT_NEXUS_SLOT_OPTIONS.getMaximumSlots()
×
132
                      : options.getMaximumSlots())
1✔
133
              .setRampThrottle(
1✔
134
                  options.getRampThrottle() == null
1✔
135
                      ? ResourceBasedTuner.DEFAULT_NEXUS_SLOT_OPTIONS.getRampThrottle()
×
136
                      : options.getRampThrottle())
1✔
137
              .build();
1✔
138
    }
139
  }
1✔
140

141
  @Override
142
  public SlotPermit reserveSlot(SlotReserveContext<SI> ctx) throws InterruptedException {
143
    while (true) {
144
      if (ctx.getNumIssuedSlots() < options.getMinimumSlots()) {
1✔
145
        return new SlotPermit();
1✔
146
      } else {
147
        Duration mustWaitFor;
148
        try {
149
          mustWaitFor = options.getRampThrottle().minus(timeSinceLastSlotIssued());
1✔
150
        } catch (ArithmeticException e) {
×
151
          mustWaitFor = Duration.ZERO;
×
152
        }
1✔
153
        if (mustWaitFor.compareTo(Duration.ZERO) > 0) {
1✔
154
          Thread.sleep(mustWaitFor.toMillis());
1✔
155
        }
156

157
        Optional<SlotPermit> permit = tryReserveSlot(ctx);
1✔
158
        if (permit.isPresent()) {
1✔
159
          return permit.get();
1✔
160
        } else {
161
          Thread.sleep(10);
1✔
162
        }
163
      }
1✔
164
    }
165
  }
166

167
  @Override
168
  public Optional<SlotPermit> tryReserveSlot(SlotReserveContext<SI> ctx) {
169
    int numIssued = ctx.getNumIssuedSlots();
1✔
170
    if (numIssued < options.getMinimumSlots()
1✔
171
        || (timeSinceLastSlotIssued().compareTo(options.getRampThrottle()) > 0
1✔
172
            && numIssued < options.getMaximumSlots()
1✔
173
            && resourceController.pidDecision())) {
1✔
174
      lastSlotIssuedAt = Instant.now();
1✔
175
      return Optional.of(new SlotPermit());
1✔
176
    }
177
    return Optional.empty();
1✔
178
  }
179

180
  @Override
181
  public void markSlotUsed(SlotMarkUsedContext<SI> ctx) {}
1✔
182

183
  @Override
184
  public void releaseSlot(SlotReleaseContext<SI> ctx) {}
1✔
185

186
  public ResourceBasedController getResourceController() {
187
    return resourceController;
1✔
188
  }
189

190
  private Duration timeSinceLastSlotIssued() {
191
    return Duration.between(lastSlotIssuedAt, Instant.now());
1✔
192
  }
193
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc