• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #160

pending completion
#160

push

github-actions

web-flow
Wait for worker slots to be fully released in the graceful worker shutdown (#1679)

95 of 95 new or added lines in 5 files covered. (100.0%)

17058 of 20907 relevant lines covered (81.59%)

0.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.98
/temporal-sdk/src/main/java/io/temporal/internal/worker/ShutdownManager.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.worker;
22

23
import java.io.Closeable;
24
import java.time.Duration;
25
import java.util.concurrent.*;
26
import javax.annotation.Nullable;
27
import org.slf4j.Logger;
28
import org.slf4j.LoggerFactory;
29

30
public class ShutdownManager implements Closeable {
1✔
31
  private static final Logger log = LoggerFactory.getLogger(ShutdownManager.class);
1✔
32

33
  private final ScheduledExecutorService scheduledExecutorService =
1✔
34
      Executors.newSingleThreadScheduledExecutor(
1✔
35
          new ExecutorThreadFactory(
36
              WorkerThreadsNameHelper.SHUTDOWN_MANAGER_THREAD_NAME_PREFIX, null));
37

38
  private static final int CHECK_PERIOD_MS = 250;
39

40
  /** executorToShutdown.shutdownNow() -> timed wait for a graceful termination */
41
  public CompletableFuture<Void> shutdownExecutorNow(
42
      ExecutorService executorToShutdown, String executorName, Duration timeout) {
43
    executorToShutdown.shutdownNow();
1✔
44
    return limitedWait(executorToShutdown, executorName, timeout);
1✔
45
  }
46

47
  /** executorToShutdown.shutdownNow() -&gt; unlimited wait for termination */
48
  public CompletableFuture<Void> shutdownExecutorNowUntimed(
49
      ExecutorService executorToShutdown, String executorName) {
50
    executorToShutdown.shutdownNow();
1✔
51
    return untimedWait(executorToShutdown, executorName);
1✔
52
  }
53

54
  /**
55
   * executorToShutdown.shutdown() -&gt; timed wait for graceful termination -&gt;
56
   * executorToShutdown.shutdownNow()
57
   */
58
  public CompletableFuture<Void> shutdownExecutor(
59
      ExecutorService executorToShutdown, String executorName, Duration timeout) {
60
    executorToShutdown.shutdown();
1✔
61
    return limitedWait(executorToShutdown, executorName, timeout);
1✔
62
  }
63

64
  /** executorToShutdown.shutdown() -&gt; unlimited wait for graceful termination */
65
  public CompletableFuture<Void> shutdownExecutorUntimed(
66
      ExecutorService executorToShutdown, String executorName) {
67
    executorToShutdown.shutdown();
1✔
68
    return untimedWait(executorToShutdown, executorName);
1✔
69
  }
70

71
  public CompletableFuture<Void> waitForSemaphorePermitsReleaseUntimed(
72
      Semaphore semaphore, int initialSemaphorePermits, String semaphoreName) {
73
    CompletableFuture<Void> future = new CompletableFuture<>();
1✔
74
    scheduledExecutorService.submit(
1✔
75
        new SemaphoreReportingDelayShutdown(
76
            semaphore, initialSemaphorePermits, semaphoreName, future));
77
    return future;
1✔
78
  }
79

80
  /**
81
   * Wait for {@code executorToShutdown} to terminate. Only completes the returned CompletableFuture
82
   * when the executor is terminated.
83
   */
84
  private CompletableFuture<Void> untimedWait(
85
      ExecutorService executorToShutdown, String executorName) {
86
    CompletableFuture<Void> future = new CompletableFuture<>();
1✔
87
    scheduledExecutorService.submit(
1✔
88
        new ExecutorReportingDelayShutdown(executorToShutdown, executorName, future));
89
    return future;
1✔
90
  }
91

92
  /**
93
   * Wait for {@code executorToShutdown} to terminate for a defined interval, shutdownNow after
94
   * that. Always completes the returned CompletableFuture on termination of the executor or on a
95
   * timeout, whatever happens earlier.
96
   */
97
  private CompletableFuture<Void> limitedWait(
98
      ExecutorService executorToShutdown, String executorName, Duration timeout) {
99
    int attempts = (int) Math.ceil((double) timeout.toMillis() / CHECK_PERIOD_MS);
1✔
100

101
    CompletableFuture<Void> future = new CompletableFuture<>();
1✔
102
    scheduledExecutorService.submit(
1✔
103
        new ExecutorLimitedWaitShutdown(executorToShutdown, attempts, executorName, future));
104
    return future;
1✔
105
  }
106

107
  @Override
108
  public void close() {
109
    scheduledExecutorService.shutdownNow();
1✔
110
  }
1✔
111

112
  private abstract class LimitedWaitShutdown implements Runnable {
113
    private final CompletableFuture<Void> promise;
114
    private final int maxAttempts;
115
    private int attempt;
116

117
    public LimitedWaitShutdown(int maxAttempts, CompletableFuture<Void> promise) {
1✔
118
      this.promise = promise;
1✔
119
      this.maxAttempts = maxAttempts;
1✔
120
    }
1✔
121

122
    @Override
123
    public void run() {
124
      if (isTerminated()) {
1✔
125
        onSuccessfulTermination();
1✔
126
        promise.complete(null);
1✔
127
        return;
1✔
128
      }
129
      attempt++;
1✔
130
      if (attempt > maxAttempts) {
1✔
131
        onAttemptExhaustion();
1✔
132
        // we don't want to complicate shutdown with dealing of exceptions and errors of all sorts,
133
        // so just log and complete the promise
134
        promise.complete(null);
1✔
135
        return;
1✔
136
      }
137
      scheduledExecutorService.schedule(this, CHECK_PERIOD_MS, TimeUnit.MILLISECONDS);
1✔
138
    }
1✔
139

140
    abstract boolean isTerminated();
141

142
    abstract void onAttemptExhaustion();
143

144
    abstract void onSuccessfulTermination();
145
  }
146

147
  private class ExecutorLimitedWaitShutdown extends LimitedWaitShutdown {
148
    private final ExecutorService executorToShutdown;
149
    private final String executorName;
150

151
    public ExecutorLimitedWaitShutdown(
152
        ExecutorService executorToShutdown,
153
        int maxAttempts,
154
        String executorName,
155
        CompletableFuture<Void> promise) {
1✔
156
      super(maxAttempts, promise);
1✔
157
      this.executorToShutdown = executorToShutdown;
1✔
158
      this.executorName = executorName;
1✔
159
    }
1✔
160

161
    @Override
162
    boolean isTerminated() {
163
      return executorToShutdown.isTerminated();
1✔
164
    }
165

166
    @Override
167
    void onAttemptExhaustion() {
168
      log.warn(
1✔
169
          "Wait for a graceful shutdown of {} timed out, fallback to shutdownNow()", executorName);
170
      executorToShutdown.shutdownNow();
1✔
171
    }
1✔
172

173
    @Override
174
    void onSuccessfulTermination() {}
1✔
175
  }
176

177
  private abstract class ReportingDelayShutdown implements Runnable {
178
    // measures in attempts count, not in ms
179
    private static final int BLOCKED_REPORTING_THRESHOLD = 60;
180
    private static final int BLOCKED_REPORTING_PERIOD = 20;
181

182
    private final CompletableFuture<Void> promise;
183
    private int attempt;
184

185
    public ReportingDelayShutdown(CompletableFuture<Void> promise) {
1✔
186
      this.promise = promise;
1✔
187
    }
1✔
188

189
    @Override
190
    public void run() {
191
      if (isTerminated()) {
1✔
192
        if (attempt > BLOCKED_REPORTING_THRESHOLD) {
1✔
193
          onSlowSuccessfulTermination();
×
194
        } else {
195
          onSuccessfulTermination();
1✔
196
        }
197
        promise.complete(null);
1✔
198
        return;
1✔
199
      }
200
      attempt++;
1✔
201
      // log a problem after BLOCKED_REPORTING_THRESHOLD attempts only
202
      if (attempt >= BLOCKED_REPORTING_THRESHOLD) {
1✔
203
        // and repeat every BLOCKED_REPORTING_PERIOD attempts
204
        if (((float) (attempt - BLOCKED_REPORTING_THRESHOLD) % BLOCKED_REPORTING_PERIOD) < 0.001) {
×
205
          onSlowTermination();
×
206
        }
207
      }
208
      scheduledExecutorService.schedule(this, CHECK_PERIOD_MS, TimeUnit.MILLISECONDS);
1✔
209
    }
1✔
210

211
    abstract boolean isTerminated();
212

213
    abstract void onSlowTermination();
214

215
    abstract void onSuccessfulTermination();
216

217
    /** Called only if {@link #onSlowTermination()} was called before */
218
    abstract void onSlowSuccessfulTermination();
219
  }
220

221
  private class ExecutorReportingDelayShutdown extends ReportingDelayShutdown {
222
    private final ExecutorService executorToShutdown;
223
    private final String executorName;
224

225
    public ExecutorReportingDelayShutdown(
226
        ExecutorService executorToShutdown, String executorName, CompletableFuture<Void> promise) {
1✔
227
      super(promise);
1✔
228
      this.executorToShutdown = executorToShutdown;
1✔
229
      this.executorName = executorName;
1✔
230
    }
1✔
231

232
    @Override
233
    boolean isTerminated() {
234
      return executorToShutdown.isTerminated();
1✔
235
    }
236

237
    @Override
238
    void onSlowTermination() {
239
      log.warn(
×
240
          "Graceful shutdown of {} is blocked by one of the long currently processing tasks",
241
          executorName);
242
    }
×
243

244
    @Override
245
    void onSuccessfulTermination() {}
1✔
246

247
    @Override
248
    void onSlowSuccessfulTermination() {
249
      log.warn("{} successfully terminated", executorName);
×
250
    }
×
251
  }
252

253
  private class SemaphoreReportingDelayShutdown extends ReportingDelayShutdown {
254
    private final Semaphore semaphore;
255
    private final int initialSemaphorePermits;
256
    private final String semaphoreName;
257

258
    public SemaphoreReportingDelayShutdown(
259
        Semaphore semaphore,
260
        int initialSemaphorePermits,
261
        String semaphoreName,
262
        CompletableFuture<Void> promise) {
1✔
263
      super(promise);
1✔
264
      this.semaphore = semaphore;
1✔
265
      this.initialSemaphorePermits = initialSemaphorePermits;
1✔
266
      this.semaphoreName = semaphoreName;
1✔
267
    }
1✔
268

269
    @Override
270
    boolean isTerminated() {
271
      return semaphore.availablePermits() == initialSemaphorePermits;
1✔
272
    }
273

274
    @Override
275
    void onSlowTermination() {
276
      log.warn("Wait for release of slots of {} takes a long time", semaphoreName);
×
277
    }
×
278

279
    @Override
280
    void onSuccessfulTermination() {}
1✔
281

282
    @Override
283
    void onSlowSuccessfulTermination() {
284
      log.warn("All slots of {} were successfully released", semaphoreName);
×
285
    }
×
286
  }
287

288
  public static long awaitTermination(@Nullable ExecutorService s, long timeoutMillis) {
289
    if (s == null) {
1✔
290
      return timeoutMillis;
1✔
291
    }
292
    return runAndGetRemainingTimeoutMs(
1✔
293
        timeoutMillis,
294
        () -> {
295
          try {
296
            boolean ignored = s.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
1✔
297
          } catch (InterruptedException e) {
×
298
            Thread.currentThread().interrupt();
×
299
          }
1✔
300
        });
1✔
301
  }
302

303
  public static long runAndGetRemainingTimeoutMs(long initialTimeoutMs, Runnable toRun) {
304
    long startedNs = System.nanoTime();
1✔
305
    try {
306
      toRun.run();
1✔
307
    } catch (Throwable e) {
×
308
      log.warn("Exception during waiting for termination", e);
×
309
    }
1✔
310
    long remainingTimeoutMs =
1✔
311
        initialTimeoutMs - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedNs);
1✔
312
    return remainingTimeoutMs < 0 ? 0 : remainingTimeoutMs;
1✔
313
  }
314

315
  public static long awaitTermination(@Nullable Shutdownable s, long timeoutMillis) {
316
    if (s == null) {
1✔
317
      return timeoutMillis;
1✔
318
    }
319
    return runAndGetRemainingTimeoutMs(
1✔
320
        timeoutMillis, () -> s.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS));
1✔
321
  }
322
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc