• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #284

23 Jul 2024 10:09PM UTC coverage: 77.304% (-0.06%) from 77.364%
#284

push

github

web-flow
Reintroduce slot supplier & add many tests (#2143)

593 of 752 new or added lines in 37 files covered. (78.86%)

22 existing lines in 10 files now uncovered.

19554 of 25295 relevant lines covered (77.3%)

0.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.85
/temporal-sdk/src/main/java/io/temporal/internal/worker/TrackingSlotSupplier.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.worker;
22

23
import com.uber.m3.tally.Scope;
24
import io.temporal.worker.MetricsType;
25
import io.temporal.worker.tuning.*;
26
import java.util.Collections;
27
import java.util.Map;
28
import java.util.Optional;
29
import java.util.concurrent.ConcurrentHashMap;
30
import java.util.concurrent.atomic.AtomicInteger;
31

32
/**
33
 * Wraps a slot supplier and supplements it with additional tracking information that is useful to
34
 * provide to all implementations. This type is used internally rather than {@link SlotSupplier}
35
 * directly.
36
 *
37
 * @param <SI> The slot info type
38
 */
39
public class TrackingSlotSupplier<SI extends SlotInfo> {
40
  private final SlotSupplier<SI> inner;
41
  private final AtomicInteger issuedSlots = new AtomicInteger();
1✔
42
  private final Map<SlotPermit, SI> usedSlots = new ConcurrentHashMap<>();
1✔
43
  private final Scope metricsScope;
44

45
  public TrackingSlotSupplier(SlotSupplier<SI> inner, Scope metricsScope) {
1✔
46
    this.inner = inner;
1✔
47
    this.metricsScope = metricsScope;
1✔
48
    publishSlotsMetric();
1✔
49
  }
1✔
50

51
  public SlotPermit reserveSlot(SlotReservationData dat) throws InterruptedException {
52
    SlotPermit p = inner.reserveSlot(createCtx(dat));
1✔
53
    issuedSlots.incrementAndGet();
1✔
54
    return p;
1✔
55
  }
56

57
  public Optional<SlotPermit> tryReserveSlot(SlotReservationData dat) {
58
    Optional<SlotPermit> p = inner.tryReserveSlot(createCtx(dat));
1✔
59
    if (p.isPresent()) {
1✔
60
      issuedSlots.incrementAndGet();
1✔
61
    }
62
    return p;
1✔
63
  }
64

65
  public void markSlotUsed(SI slotInfo, SlotPermit permit) {
66
    if (permit == null) {
1✔
NEW
67
      throw new IllegalArgumentException(
×
68
          "Permit cannot be null when marking slot as used. This is an SDK bug.");
69
    }
70
    if (usedSlots.put(permit, slotInfo) != null) {
1✔
NEW
71
      throw new IllegalStateException("Slot is being marked used twice. This is an SDK bug.");
×
72
    }
73
    inner.markSlotUsed(new SlotMarkUsedContextImpl(slotInfo, permit));
1✔
74
    publishSlotsMetric();
1✔
75
  }
1✔
76

77
  public void releaseSlot(SlotReleaseReason reason, SlotPermit permit) {
78
    if (permit == null) {
1✔
NEW
79
      throw new IllegalArgumentException(
×
80
          "Permit cannot be null when releasing a slot. This is an SDK bug.");
81
    }
82
    SI slotInfo = usedSlots.get(permit);
1✔
83
    inner.releaseSlot(new SlotReleaseContextImpl(reason, permit, slotInfo));
1✔
84
    issuedSlots.decrementAndGet();
1✔
85
    usedSlots.remove(permit);
1✔
86
    publishSlotsMetric();
1✔
87
  }
1✔
88

89
  public Optional<Integer> maximumSlots() {
90
    return inner.getMaximumSlots();
1✔
91
  }
92

93
  public int getIssuedSlots() {
94
    return issuedSlots.get();
1✔
95
  }
96

97
  Map<SlotPermit, SI> getUsedSlots() {
98
    return usedSlots;
1✔
99
  }
100

101
  private void publishSlotsMetric() {
102
    if (maximumSlots().isPresent()) {
1✔
103
      this.metricsScope
1✔
104
          .gauge(MetricsType.WORKER_TASK_SLOTS_AVAILABLE)
1✔
105
          .update(maximumSlots().get() - usedSlots.size());
1✔
106
    }
107
    this.metricsScope.gauge(MetricsType.WORKER_TASK_SLOTS_USED).update(usedSlots.size());
1✔
108
  }
1✔
109

110
  private SlotReserveContext<SI> createCtx(SlotReservationData dat) {
111
    return new SlotReserveContextImpl(
1✔
112
        dat.taskQueue,
113
        Collections.unmodifiableMap(usedSlots),
1✔
114
        dat.workerIdentity,
115
        dat.workerBuildId,
116
        issuedSlots);
117
  }
118

119
  private class SlotReserveContextImpl implements SlotReserveContext<SI> {
120
    private final String taskQueue;
121
    private final Map<SlotPermit, SI> usedSlots;
122
    private final String workerIdentity;
123
    private final String workerBuildId;
124
    private final AtomicInteger issuedSlots;
125

126
    private SlotReserveContextImpl(
127
        String taskQueue,
128
        Map<SlotPermit, SI> usedSlots,
129
        String workerIdentity,
130
        String workerBuildId,
131
        AtomicInteger issuedSlots) {
1✔
132
      this.taskQueue = taskQueue;
1✔
133
      this.usedSlots = usedSlots;
1✔
134
      this.workerIdentity = workerIdentity;
1✔
135
      this.workerBuildId = workerBuildId;
1✔
136
      this.issuedSlots = issuedSlots;
1✔
137
    }
1✔
138

139
    @Override
140
    public String getTaskQueue() {
141
      return taskQueue;
1✔
142
    }
143

144
    @Override
145
    public Map<SlotPermit, SI> getUsedSlots() {
146
      return usedSlots;
1✔
147
    }
148

149
    @Override
150
    public String getWorkerIdentity() {
NEW
151
      return workerIdentity;
×
152
    }
153

154
    @Override
155
    public String getWorkerBuildId() {
NEW
156
      return workerBuildId;
×
157
    }
158

159
    @Override
160
    public int getNumIssuedSlots() {
161
      return issuedSlots.get();
1✔
162
    }
163
  }
164

165
  private class SlotMarkUsedContextImpl implements SlotMarkUsedContext<SI> {
166
    private final SI slotInfo;
167
    private final SlotPermit slotPermit;
168

169
    protected SlotMarkUsedContextImpl(SI slotInfo, SlotPermit slotPermit) {
1✔
170
      this.slotInfo = slotInfo;
1✔
171
      this.slotPermit = slotPermit;
1✔
172
    }
1✔
173

174
    @Override
175
    public SI getSlotInfo() {
NEW
176
      return slotInfo;
×
177
    }
178

179
    @Override
180
    public SlotPermit getSlotPermit() {
NEW
181
      return slotPermit;
×
182
    }
183
  }
184

185
  private class SlotReleaseContextImpl implements SlotReleaseContext<SI> {
186
    private final SlotPermit slotPermit;
187
    private final SlotReleaseReason reason;
188
    private final SI slotInfo;
189

190
    protected SlotReleaseContextImpl(SlotReleaseReason reason, SlotPermit slotPermit, SI slotInfo) {
1✔
191
      this.slotPermit = slotPermit;
1✔
192
      this.reason = reason;
1✔
193
      this.slotInfo = slotInfo;
1✔
194
    }
1✔
195

196
    @Override
197
    public SlotReleaseReason getSlotReleaseReason() {
NEW
198
      return reason;
×
199
    }
200

201
    @Override
202
    public SlotPermit getSlotPermit() {
NEW
203
      return slotPermit;
×
204
    }
205

206
    @Override
207
    public SI getSlotInfo() {
NEW
208
      return slotInfo;
×
209
    }
210
  }
211
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc