• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #278

08 Jul 2024 04:42PM UTC coverage: 77.565% (+0.1%) from 77.469%
#278

push

github

web-flow
Revert configurable slot provider (#2134)

* Revert "Resource based tuner (#2110)"

This reverts commit 8a2d5cdcc.

* Revert "Slot supplier interface & fixed-size implementation (#2014)"

This reverts commit d2a06fc6f.

* Fix merge conflict

* Keep Publish Test Report step

* Add tests for worker slots

* Fix white space

* One other whitespace change

117 of 133 new or added lines in 17 files covered. (87.97%)

5 existing lines in 5 files now uncovered.

19088 of 24609 relevant lines covered (77.57%)

0.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.84
/temporal-sdk/src/main/java/io/temporal/internal/worker/ShutdownManager.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.worker;
22

23
import java.io.Closeable;
24
import java.time.Duration;
25
import java.util.concurrent.*;
26
import javax.annotation.Nullable;
27
import org.slf4j.Logger;
28
import org.slf4j.LoggerFactory;
29

30
public class ShutdownManager implements Closeable {
1✔
31
  private static final Logger log = LoggerFactory.getLogger(ShutdownManager.class);
1✔
32

33
  private final ScheduledExecutorService scheduledExecutorService =
1✔
34
      Executors.newSingleThreadScheduledExecutor(
1✔
35
          new ExecutorThreadFactory(
36
              WorkerThreadsNameHelper.SHUTDOWN_MANAGER_THREAD_NAME_PREFIX, null));
37

38
  private static final int CHECK_PERIOD_MS = 250;
39

40
  /** executorToShutdown.shutdownNow() -> timed wait for a graceful termination */
41
  public CompletableFuture<Void> shutdownExecutorNow(
42
      ExecutorService executorToShutdown, String executorName, Duration timeout) {
43
    executorToShutdown.shutdownNow();
1✔
44
    return limitedWait(executorToShutdown, executorName, timeout);
1✔
45
  }
46

47
  /** executorToShutdown.shutdownNow() -&gt; unlimited wait for termination */
48
  public CompletableFuture<Void> shutdownExecutorNowUntimed(
49
      ExecutorService executorToShutdown, String executorName) {
50
    executorToShutdown.shutdownNow();
1✔
51
    return untimedWait(executorToShutdown, executorName);
1✔
52
  }
53

54
  /**
55
   * executorToShutdown.shutdown() -&gt; timed wait for graceful termination -&gt;
56
   * executorToShutdown.shutdownNow()
57
   */
58
  public CompletableFuture<Void> shutdownExecutor(
59
      ExecutorService executorToShutdown, String executorName, Duration timeout) {
60
    executorToShutdown.shutdown();
1✔
61
    return limitedWait(executorToShutdown, executorName, timeout);
1✔
62
  }
63

64
  /** executorToShutdown.shutdown() -&gt; unlimited wait for graceful termination */
65
  public CompletableFuture<Void> shutdownExecutorUntimed(
66
      ExecutorService executorToShutdown, String executorName) {
67
    executorToShutdown.shutdown();
1✔
68
    return untimedWait(executorToShutdown, executorName);
1✔
69
  }
70

71
  public CompletableFuture<Void> waitForSemaphorePermitsReleaseUntimed(
72
      Semaphore semaphore, int initialSemaphorePermits, String semaphoreName) {
73
    CompletableFuture<Void> future = new CompletableFuture<>();
1✔
74
    scheduledExecutorService.submit(
1✔
75
        new SemaphoreReportingDelayShutdown(
76
            semaphore, initialSemaphorePermits, semaphoreName, future));
77
    return future;
1✔
78
  }
79

80
  /**
81
   * waitForStickyQueueBalancer -&gt; disableNormalPoll -&gt; timed wait for graceful completion of
82
   * sticky workflows
83
   */
84
  public CompletableFuture<Void> waitForStickyQueueBalancer(
85
      StickyQueueBalancer balancer, Duration timeout) {
86
    CompletableFuture<Void> future = new CompletableFuture<>();
1✔
87
    balancer.disableNormalPoll();
1✔
88
    scheduledExecutorService.schedule(
1✔
89
        () -> {
90
          future.complete(null);
1✔
91
        },
1✔
92
        timeout.toMillis(),
1✔
93
        TimeUnit.MILLISECONDS);
94
    return future;
1✔
95
  }
96

97
  /**
98
   * Wait for {@code executorToShutdown} to terminate. Only completes the returned CompletableFuture
99
   * when the executor is terminated.
100
   */
101
  private CompletableFuture<Void> untimedWait(
102
      ExecutorService executorToShutdown, String executorName) {
103
    CompletableFuture<Void> future = new CompletableFuture<>();
1✔
104
    scheduledExecutorService.submit(
1✔
105
        new ExecutorReportingDelayShutdown(executorToShutdown, executorName, future));
106
    return future;
1✔
107
  }
108

109
  /**
110
   * Wait for {@code executorToShutdown} to terminate for a defined interval, shutdownNow after
111
   * that. Always completes the returned CompletableFuture on termination of the executor or on a
112
   * timeout, whatever happens earlier.
113
   */
114
  private CompletableFuture<Void> limitedWait(
115
      ExecutorService executorToShutdown, String executorName, Duration timeout) {
116
    int attempts = (int) Math.ceil((double) timeout.toMillis() / CHECK_PERIOD_MS);
1✔
117

118
    CompletableFuture<Void> future = new CompletableFuture<>();
1✔
119
    scheduledExecutorService.submit(
1✔
120
        new ExecutorLimitedWaitShutdown(executorToShutdown, attempts, executorName, future));
121
    return future;
1✔
122
  }
123

124
  @Override
125
  public void close() {
126
    scheduledExecutorService.shutdownNow();
1✔
127
  }
1✔
128

129
  private abstract class LimitedWaitShutdown implements Runnable {
130
    private final CompletableFuture<Void> promise;
131
    private final int maxAttempts;
132
    private int attempt;
133

134
    public LimitedWaitShutdown(int maxAttempts, CompletableFuture<Void> promise) {
1✔
135
      this.promise = promise;
1✔
136
      this.maxAttempts = maxAttempts;
1✔
137
    }
1✔
138

139
    @Override
140
    public void run() {
141
      if (isTerminated()) {
1✔
142
        onSuccessfulTermination();
1✔
143
        promise.complete(null);
1✔
144
        return;
1✔
145
      }
146
      attempt++;
1✔
147
      if (attempt > maxAttempts) {
1✔
148
        onAttemptExhaustion();
1✔
149
        // we don't want to complicate shutdown with dealing of exceptions and errors of all sorts,
150
        // so just log and complete the promise
151
        promise.complete(null);
1✔
152
        return;
1✔
153
      }
154
      scheduledExecutorService.schedule(this, CHECK_PERIOD_MS, TimeUnit.MILLISECONDS);
1✔
155
    }
1✔
156

157
    abstract boolean isTerminated();
158

159
    abstract void onAttemptExhaustion();
160

161
    abstract void onSuccessfulTermination();
162
  }
163

164
  private class ExecutorLimitedWaitShutdown extends LimitedWaitShutdown {
165
    private final ExecutorService executorToShutdown;
166
    private final String executorName;
167

168
    public ExecutorLimitedWaitShutdown(
169
        ExecutorService executorToShutdown,
170
        int maxAttempts,
171
        String executorName,
172
        CompletableFuture<Void> promise) {
1✔
173
      super(maxAttempts, promise);
1✔
174
      this.executorToShutdown = executorToShutdown;
1✔
175
      this.executorName = executorName;
1✔
176
    }
1✔
177

178
    @Override
179
    boolean isTerminated() {
180
      return executorToShutdown.isTerminated();
1✔
181
    }
182

183
    @Override
184
    void onAttemptExhaustion() {
185
      log.warn(
1✔
186
          "Wait for a graceful shutdown of {} timed out, fallback to shutdownNow()", executorName);
187
      executorToShutdown.shutdownNow();
1✔
188
    }
1✔
189

190
    @Override
191
    void onSuccessfulTermination() {}
1✔
192
  }
193

194
  private abstract class ReportingDelayShutdown implements Runnable {
195
    // measures in attempts count, not in ms
196
    private static final int BLOCKED_REPORTING_THRESHOLD = 60;
197
    private static final int BLOCKED_REPORTING_PERIOD = 20;
198

199
    private final CompletableFuture<Void> promise;
200
    private int attempt;
201

202
    public ReportingDelayShutdown(CompletableFuture<Void> promise) {
1✔
203
      this.promise = promise;
1✔
204
    }
1✔
205

206
    @Override
207
    public void run() {
208
      if (isTerminated()) {
1✔
209
        if (attempt > BLOCKED_REPORTING_THRESHOLD) {
1✔
210
          onSlowSuccessfulTermination();
×
211
        } else {
212
          onSuccessfulTermination();
1✔
213
        }
214
        promise.complete(null);
1✔
215
        return;
1✔
216
      }
217
      attempt++;
1✔
218
      // log a problem after BLOCKED_REPORTING_THRESHOLD attempts only
219
      if (attempt >= BLOCKED_REPORTING_THRESHOLD) {
1✔
220
        // and repeat every BLOCKED_REPORTING_PERIOD attempts
221
        if (((float) (attempt - BLOCKED_REPORTING_THRESHOLD) % BLOCKED_REPORTING_PERIOD) < 0.001) {
×
222
          onSlowTermination();
×
223
        }
224
      }
225
      scheduledExecutorService.schedule(this, CHECK_PERIOD_MS, TimeUnit.MILLISECONDS);
1✔
226
    }
1✔
227

228
    abstract boolean isTerminated();
229

230
    abstract void onSlowTermination();
231

232
    abstract void onSuccessfulTermination();
233

234
    /** Called only if {@link #onSlowTermination()} was called before */
235
    abstract void onSlowSuccessfulTermination();
236
  }
237

238
  private class ExecutorReportingDelayShutdown extends ReportingDelayShutdown {
239
    private final ExecutorService executorToShutdown;
240
    private final String executorName;
241

242
    public ExecutorReportingDelayShutdown(
243
        ExecutorService executorToShutdown, String executorName, CompletableFuture<Void> promise) {
1✔
244
      super(promise);
1✔
245
      this.executorToShutdown = executorToShutdown;
1✔
246
      this.executorName = executorName;
1✔
247
    }
1✔
248

249
    @Override
250
    boolean isTerminated() {
251
      return executorToShutdown.isTerminated();
1✔
252
    }
253

254
    @Override
255
    void onSlowTermination() {
256
      log.warn(
×
257
          "Graceful shutdown of {} is blocked by one of the long currently processing tasks",
258
          executorName);
259
    }
×
260

261
    @Override
262
    void onSuccessfulTermination() {}
1✔
263

264
    @Override
265
    void onSlowSuccessfulTermination() {
266
      log.warn("{} successfully terminated", executorName);
×
267
    }
×
268
  }
269

270
  private class SemaphoreReportingDelayShutdown extends ReportingDelayShutdown {
271
    private final Semaphore semaphore;
272
    private final int initialSemaphorePermits;
273
    private final String semaphoreName;
274

275
    public SemaphoreReportingDelayShutdown(
276
        Semaphore semaphore,
277
        int initialSemaphorePermits,
278
        String semaphoreName,
279
        CompletableFuture<Void> promise) {
1✔
280
      super(promise);
1✔
281
      this.semaphore = semaphore;
1✔
282
      this.initialSemaphorePermits = initialSemaphorePermits;
1✔
283
      this.semaphoreName = semaphoreName;
1✔
284
    }
1✔
285

286
    @Override
287
    boolean isTerminated() {
288
      return semaphore.availablePermits() == initialSemaphorePermits;
1✔
289
    }
290

291
    @Override
292
    void onSlowTermination() {
NEW
293
      log.warn("Wait for release of slots of {} takes a long time", semaphoreName);
×
294
    }
×
295

296
    @Override
297
    void onSuccessfulTermination() {}
1✔
298

299
    @Override
300
    void onSlowSuccessfulTermination() {
NEW
301
      log.warn("All slots of {} were successfully released", semaphoreName);
×
302
    }
×
303
  }
304

305
  public static long awaitTermination(@Nullable ExecutorService s, long timeoutMillis) {
306
    if (s == null) {
1✔
307
      return timeoutMillis;
1✔
308
    }
309
    return runAndGetRemainingTimeoutMs(
1✔
310
        timeoutMillis,
311
        () -> {
312
          try {
313
            boolean ignored = s.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
1✔
314
          } catch (InterruptedException e) {
×
315
            Thread.currentThread().interrupt();
×
316
          }
1✔
317
        });
1✔
318
  }
319

320
  public static long runAndGetRemainingTimeoutMs(long initialTimeoutMs, Runnable toRun) {
321
    long startedNs = System.nanoTime();
1✔
322
    try {
323
      toRun.run();
1✔
324
    } catch (Throwable e) {
×
325
      log.warn("Exception during waiting for termination", e);
×
326
    }
1✔
327
    long remainingTimeoutMs =
1✔
328
        initialTimeoutMs - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedNs);
1✔
329
    return remainingTimeoutMs < 0 ? 0 : remainingTimeoutMs;
1✔
330
  }
331

332
  public static long awaitTermination(@Nullable Shutdownable s, long timeoutMillis) {
333
    if (s == null) {
1✔
334
      return timeoutMillis;
1✔
335
    }
336
    return runAndGetRemainingTimeoutMs(
1✔
337
        timeoutMillis, () -> s.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS));
1✔
338
  }
339
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc