• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #284

23 Jul 2024 10:09PM UTC coverage: 77.304% (-0.06%) from 77.364%
#284

push

github

web-flow
Reintroduce slot supplier & add many tests (#2143)

593 of 752 new or added lines in 37 files covered. (78.86%)

22 existing lines in 10 files now uncovered.

19554 of 25295 relevant lines covered (77.3%)

0.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.3
/temporal-sdk/src/main/java/io/temporal/worker/tuning/ResourceBasedSlotSupplier.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.worker.tuning;
22

23
import io.temporal.common.Experimental;
24
import java.time.Duration;
25
import java.time.Instant;
26
import java.util.Optional;
27

28
/** Implements a {@link SlotSupplier} based on resource usage for a particular slot type. */
29
@Experimental
30
public class ResourceBasedSlotSupplier<SI extends SlotInfo> implements SlotSupplier<SI> {
31

32
  // Controller whose pidDecision() is consulted before issuing any slot above the minimum.
  private final ResourceBasedController resourceController;
33
  // Effective options: the caller-supplied values with unset fields replaced by defaults
  // (merged once in the constructor).
  private final ResourceBasedSlotOptions options;
34
  // Time at which tryReserveSlot last handed out a permit; EPOCH until the first one is issued.
  // NOTE(review): plain mutable field with no synchronization visible in this class — confirm
  // whether reserveSlot/tryReserveSlot can be invoked from more than one thread.
  private Instant lastSlotIssuedAt = Instant.EPOCH;
1✔
35

36
  /**
37
   * Construct a slot supplier for workflow tasks with the given resource controller and options.
38
   *
39
   * <p>The resource controller must be the same among all slot suppliers in a worker. If you want
40
   * to use resource-based tuning for all slot suppliers, prefer {@link ResourceBasedTuner}.
   *
   * @param resourceBasedController the controller consulted when deciding whether to issue slots
   * @param options slot options; a minimum or maximum of 0, or a null ramp throttle, is replaced
   *     by the matching value from {@code ResourceBasedTuner.DEFAULT_WORKFLOW_SLOT_OPTIONS}
   * @return a new supplier typed for {@link WorkflowSlotInfo}
41
   */
42
  public static ResourceBasedSlotSupplier<WorkflowSlotInfo> createForWorkflow(
43
      ResourceBasedController resourceBasedController, ResourceBasedSlotOptions options) {
44
    return new ResourceBasedSlotSupplier<>(
1✔
45
        WorkflowSlotInfo.class, resourceBasedController, options);
46
  }
47

48
  /**
49
   * Construct a slot supplier for activity tasks with the given resource controller and options.
50
   *
51
   * <p>The resource controller must be the same among all slot suppliers in a worker. If you want
52
   * to use resource-based tuning for all slot suppliers, prefer {@link ResourceBasedTuner}.
   *
   * @param resourceBasedController the controller consulted when deciding whether to issue slots
   * @param options slot options; a minimum or maximum of 0, or a null ramp throttle, is replaced
   *     by a default in the constructor (for non-workflow slot types these come from {@code
   *     ResourceBasedTuner.DEFAULT_ACTIVITY_SLOT_OPTIONS})
   * @return a new supplier typed for {@link ActivitySlotInfo}
53
   */
54
  public static ResourceBasedSlotSupplier<ActivitySlotInfo> createForActivity(
55
      ResourceBasedController resourceBasedController, ResourceBasedSlotOptions options) {
56
    return new ResourceBasedSlotSupplier<>(
1✔
57
        ActivitySlotInfo.class, resourceBasedController, options);
58
  }
59

60
  /**
61
   * Construct a slot supplier for local activities with the given resource controller and options.
62
   *
63
   * <p>The resource controller must be the same among all slot suppliers in a worker. If you want
64
   * to use resource-based tuning for all slot suppliers, prefer {@link ResourceBasedTuner}.
   *
   * @param resourceBasedController the controller consulted when deciding whether to issue slots
   * @param options slot options; a minimum or maximum of 0, or a null ramp throttle, is replaced
   *     by a default in the constructor (for non-workflow slot types these come from {@code
   *     ResourceBasedTuner.DEFAULT_ACTIVITY_SLOT_OPTIONS})
   * @return a new supplier typed for {@link LocalActivitySlotInfo}
65
   */
66
  public static ResourceBasedSlotSupplier<LocalActivitySlotInfo> createForLocalActivity(
67
      ResourceBasedController resourceBasedController, ResourceBasedSlotOptions options) {
68
    return new ResourceBasedSlotSupplier<>(
1✔
69
        LocalActivitySlotInfo.class, resourceBasedController, options);
70
  }
71

72
  /**
   * Stores the controller and builds the effective options by filling unset fields with defaults.
   *
   * <p>A field counts as unset when the minimum or maximum slot count is 0 or the ramp throttle
   * is null. Workflow slot types take their defaults from {@code
   * ResourceBasedTuner.DEFAULT_WORKFLOW_SLOT_OPTIONS}; every other slot type takes them from
   * {@code ResourceBasedTuner.DEFAULT_ACTIVITY_SLOT_OPTIONS}.
   *
   * @param clazz slot info class, used only to choose which set of defaults applies
   * @param resourceBasedController controller stored for later {@code pidDecision()} calls
   * @param options caller-supplied options; NOTE(review): dereferenced without a null check, so a
   *     null argument fails with a NullPointerException here — confirm callers never pass null
   */
  private ResourceBasedSlotSupplier(
73
      Class<SI> clazz,
74
      ResourceBasedController resourceBasedController,
75
      ResourceBasedSlotOptions options) {
1✔
76
    this.resourceController = resourceBasedController;
1✔
77
    // Merge default options for any unset fields
    // NOTE(review): the coverage marks below show the default-fallback arms of each ternary as
    // uncovered — consider a test that passes options with unset fields.
78
    if (WorkflowSlotInfo.class.isAssignableFrom(clazz)) {
1✔
79
      this.options =
1✔
80
          ResourceBasedSlotOptions.newBuilder()
1✔
81
              .setMinimumSlots(
1✔
82
                  options.getMinimumSlots() == 0
1✔
NEW
83
                      ? ResourceBasedTuner.DEFAULT_WORKFLOW_SLOT_OPTIONS.getMinimumSlots()
×
84
                      : options.getMinimumSlots())
1✔
85
              .setMaximumSlots(
1✔
86
                  options.getMaximumSlots() == 0
1✔
NEW
87
                      ? ResourceBasedTuner.DEFAULT_WORKFLOW_SLOT_OPTIONS.getMaximumSlots()
×
88
                      : options.getMaximumSlots())
1✔
89
              .setRampThrottle(
1✔
90
                  options.getRampThrottle() == null
1✔
NEW
91
                      ? ResourceBasedTuner.DEFAULT_WORKFLOW_SLOT_OPTIONS.getRampThrottle()
×
92
                      : options.getRampThrottle())
1✔
93
              .build();
1✔
94
    } else {
      // Any slot type that is not a WorkflowSlotInfo falls back to the activity defaults.
95
      this.options =
1✔
96
          ResourceBasedSlotOptions.newBuilder()
1✔
97
              .setMinimumSlots(
1✔
98
                  options.getMinimumSlots() == 0
1✔
NEW
99
                      ? ResourceBasedTuner.DEFAULT_ACTIVITY_SLOT_OPTIONS.getMinimumSlots()
×
100
                      : options.getMinimumSlots())
1✔
101
              .setMaximumSlots(
1✔
102
                  options.getMaximumSlots() == 0
1✔
NEW
103
                      ? ResourceBasedTuner.DEFAULT_ACTIVITY_SLOT_OPTIONS.getMaximumSlots()
×
104
                      : options.getMaximumSlots())
1✔
105
              .setRampThrottle(
1✔
106
                  options.getRampThrottle() == null
1✔
NEW
107
                      ? ResourceBasedTuner.DEFAULT_ACTIVITY_SLOT_OPTIONS.getRampThrottle()
×
108
                      : options.getRampThrottle())
1✔
109
              .build();
1✔
110
    }
111
  }
1✔
112

113
  /**
   * Blocks the calling thread until a slot permit can be returned.
   *
   * <p>While fewer than the configured minimum number of slots are issued, a permit is returned
   * immediately. Otherwise the method sleeps for whatever remains of the ramp throttle since the
   * last issued slot, then calls {@link #tryReserveSlot}; if that yields nothing it sleeps 10 ms
   * and starts over.
   *
   * @param ctx reservation context; only {@code getNumIssuedSlots()} is read here
   * @return a new permit, either from the minimum-slot fast path or from {@link #tryReserveSlot}
   * @throws InterruptedException if the thread is interrupted during one of the sleeps
   */
  @Override
114
  public SlotPermit reserveSlot(SlotReserveContext<SI> ctx) throws InterruptedException {
115
    while (true) {
116
      if (ctx.getNumIssuedSlots() < options.getMinimumSlots()) {
1✔
117
        // NOTE(review): this fast path does not update lastSlotIssuedAt, whereas the equivalent
        // branch in tryReserveSlot does — confirm the difference is intended.
        return new SlotPermit();
1✔
118
      } else {
119
        Duration mustWaitFor;
120
        try {
121
          mustWaitFor = options.getRampThrottle().minus(timeSinceLastSlotIssued());
1✔
NEW
122
        } catch (ArithmeticException e) {
×
NEW
123
          // Duration.minus reports numeric overflow this way; treat it as "no wait needed".
          mustWaitFor = Duration.ZERO;
×
124
        }
1✔
125
        if (mustWaitFor.compareTo(Duration.ZERO) > 0) {
1✔
126
          // toMillis() drops any sub-millisecond remainder, so this sleep can end slightly before
          // the throttle has fully elapsed; the strict '>' check in tryReserveSlot may then fail
          // once and fall through to the 10 ms retry below.
          Thread.sleep(mustWaitFor.toMillis());
1✔
127
        }
128

129
        Optional<SlotPermit> permit = tryReserveSlot(ctx);
1✔
130
        if (permit.isPresent()) {
1✔
131
          return permit.get();
1✔
132
        } else {
133
          // Nothing available yet: pause briefly before re-evaluating from the top of the loop.
          Thread.sleep(10);
1✔
134
        }
135
      }
1✔
136
    }
137
  }
138

139
  /**
   * Attempts to reserve a slot without blocking.
   *
   * <p>A permit is issued when either (a) fewer than the minimum number of slots are issued, or
   * (b) strictly more than the ramp throttle has elapsed since the last issued slot, fewer than
   * the maximum number of slots are issued, and {@code resourceController.pidDecision()} returns
   * true. Because the conditions short-circuit, {@code pidDecision()} is only consulted when the
   * cheaper checks in (b) have already passed. Issuing a permit records the current time as the
   * last-issued timestamp.
   *
   * @param ctx reservation context; only {@code getNumIssuedSlots()} is read here
   * @return a new permit if one of the conditions above holds, otherwise an empty Optional
   */
  @Override
140
  public Optional<SlotPermit> tryReserveSlot(SlotReserveContext<SI> ctx) {
141
    int numIssued = ctx.getNumIssuedSlots();
1✔
142
    if (numIssued < options.getMinimumSlots()
1✔
143
        || (timeSinceLastSlotIssued().compareTo(options.getRampThrottle()) > 0
1✔
144
            && numIssued < options.getMaximumSlots()
1✔
145
            && resourceController.pidDecision())) {
1✔
146
      lastSlotIssuedAt = Instant.now();
1✔
147
      return Optional.of(new SlotPermit());
1✔
148
    }
149
    return Optional.empty();
1✔
150
  }
151

152
  /** No-op: this implementation does nothing when a slot is marked as used. */
  @Override
153
  public void markSlotUsed(SlotMarkUsedContext<SI> ctx) {}
1✔
154

155
  /** No-op: this implementation does nothing when a slot is released. */
  @Override
156
  public void releaseSlot(SlotReleaseContext<SI> ctx) {}
1✔
157

158
  /**
   * Returns the resource controller this supplier was constructed with.
   *
   * @return the controller passed to the factory method that created this supplier
   */
  public ResourceBasedController getResourceController() {
159
    return resourceController;
1✔
160
  }
161

162
  /**
   * Computes the time elapsed between the last-issued timestamp and now.
   *
   * <p>Before any slot has been issued the timestamp is {@code Instant.EPOCH}, so the result is
   * the full time since the epoch.
   *
   * @return the duration from {@code lastSlotIssuedAt} to {@code Instant.now()}
   */
  private Duration timeSinceLastSlotIssued() {
163
    return Duration.between(lastSlotIssuedAt, Instant.now());
1✔
164
  }
165
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc