• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #330

10 Oct 2024 04:35PM CUT coverage: 78.113% (-0.1%) from 78.238%
#330

push

github

web-flow
Test server support for bidi links (#2258)

* Test server support for bidi links

* typo

* license

* feedback

* link validation

* describe fields

* link validation

103 of 163 new or added lines in 3 files covered. (63.19%)

11 existing lines in 4 files now uncovered.

21332 of 27309 relevant lines covered (78.11%)

0.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.73
/temporal-sdk/src/main/java/io/temporal/internal/worker/ShutdownManager.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.worker;
22

23
import java.io.Closeable;
24
import java.time.Duration;
25
import java.util.concurrent.*;
26
import javax.annotation.Nullable;
27
import org.slf4j.Logger;
28
import org.slf4j.LoggerFactory;
29

30
public class ShutdownManager implements Closeable {
1✔
31
  private static final Logger log = LoggerFactory.getLogger(ShutdownManager.class);
1✔
32

33
  private final ScheduledExecutorService scheduledExecutorService =
1✔
34
      Executors.newSingleThreadScheduledExecutor(
1✔
35
          new ExecutorThreadFactory(
36
              WorkerThreadsNameHelper.SHUTDOWN_MANAGER_THREAD_NAME_PREFIX, null));
37

38
  private static final int CHECK_PERIOD_MS = 250;
39

40
  /** executorToShutdown.shutdownNow() -> timed wait for a graceful termination */
41
  public CompletableFuture<Void> shutdownExecutorNow(
42
      ExecutorService executorToShutdown, String executorName, Duration timeout) {
43
    executorToShutdown.shutdownNow();
1✔
44
    return limitedWait(executorToShutdown, executorName, timeout);
1✔
45
  }
46

47
  /** executorToShutdown.shutdownNow() -&gt; unlimited wait for termination */
48
  public CompletableFuture<Void> shutdownExecutorNowUntimed(
49
      ExecutorService executorToShutdown, String executorName) {
50
    executorToShutdown.shutdownNow();
1✔
51
    return untimedWait(executorToShutdown, executorName);
1✔
52
  }
53

54
  /**
55
   * executorToShutdown.shutdown() -&gt; timed wait for graceful termination -&gt;
56
   * executorToShutdown.shutdownNow()
57
   */
58
  public CompletableFuture<Void> shutdownExecutor(
59
      ExecutorService executorToShutdown, String executorName, Duration timeout) {
60
    executorToShutdown.shutdown();
1✔
61
    return limitedWait(executorToShutdown, executorName, timeout);
1✔
62
  }
63

64
  /** executorToShutdown.shutdown() -&gt; unlimited wait for graceful termination */
65
  public CompletableFuture<Void> shutdownExecutorUntimed(
66
      ExecutorService executorToShutdown, String executorName) {
67
    executorToShutdown.shutdown();
1✔
68
    return untimedWait(executorToShutdown, executorName);
1✔
69
  }
70

71
  public CompletableFuture<Void> waitForSupplierPermitsReleasedUnlimited(
72
      TrackingSlotSupplier<?> slotSupplier, String name) {
73
    CompletableFuture<Void> future = new CompletableFuture<>();
1✔
74
    scheduledExecutorService.submit(new SlotSupplierDelayShutdown(slotSupplier, name, future));
1✔
75
    return future;
1✔
76
  }
77

78
  /**
79
   * waitForStickyQueueBalancer -&gt; disableNormalPoll -&gt; timed wait for graceful completion of
80
   * sticky workflows
81
   */
82
  public CompletableFuture<Void> waitForStickyQueueBalancer(
83
      StickyQueueBalancer balancer, Duration timeout) {
84
    CompletableFuture<Void> future = new CompletableFuture<>();
1✔
85
    balancer.disableNormalPoll();
1✔
86
    scheduledExecutorService.schedule(
1✔
87
        () -> {
88
          future.complete(null);
1✔
89
        },
1✔
90
        timeout.toMillis(),
1✔
91
        TimeUnit.MILLISECONDS);
92
    return future;
1✔
93
  }
94

95
  /**
96
   * Wait for {@code executorToShutdown} to terminate. Only completes the returned CompletableFuture
97
   * when the executor is terminated.
98
   */
99
  private CompletableFuture<Void> untimedWait(
100
      ExecutorService executorToShutdown, String executorName) {
101
    CompletableFuture<Void> future = new CompletableFuture<>();
1✔
102
    scheduledExecutorService.submit(
1✔
103
        new ExecutorReportingDelayShutdown(executorToShutdown, executorName, future));
104
    return future;
1✔
105
  }
106

107
  /**
108
   * Wait for {@code executorToShutdown} to terminate for a defined interval, shutdownNow after
109
   * that. Always completes the returned CompletableFuture on termination of the executor or on a
110
   * timeout, whatever happens earlier.
111
   */
112
  private CompletableFuture<Void> limitedWait(
113
      ExecutorService executorToShutdown, String executorName, Duration timeout) {
114
    int attempts = (int) Math.ceil((double) timeout.toMillis() / CHECK_PERIOD_MS);
1✔
115

116
    CompletableFuture<Void> future = new CompletableFuture<>();
1✔
117
    scheduledExecutorService.submit(
1✔
118
        new ExecutorLimitedWaitShutdown(executorToShutdown, attempts, executorName, future));
119
    return future;
1✔
120
  }
121

122
  @Override
123
  public void close() {
124
    scheduledExecutorService.shutdownNow();
1✔
125
  }
1✔
126

127
  private abstract class LimitedWaitShutdown implements Runnable {
128
    private final CompletableFuture<Void> promise;
129
    private final int maxAttempts;
130
    private int attempt;
131

132
    public LimitedWaitShutdown(int maxAttempts, CompletableFuture<Void> promise) {
1✔
133
      this.promise = promise;
1✔
134
      this.maxAttempts = maxAttempts;
1✔
135
    }
1✔
136

137
    @Override
138
    public void run() {
139
      if (isTerminated()) {
1✔
140
        onSuccessfulTermination();
1✔
141
        promise.complete(null);
1✔
142
        return;
1✔
143
      }
144
      attempt++;
1✔
145
      if (attempt > maxAttempts) {
1✔
146
        onAttemptExhaustion();
1✔
147
        // we don't want to complicate shutdown with dealing of exceptions and errors of all sorts,
148
        // so just log and complete the promise
149
        promise.complete(null);
1✔
150
        return;
1✔
151
      }
152
      scheduledExecutorService.schedule(this, CHECK_PERIOD_MS, TimeUnit.MILLISECONDS);
1✔
153
    }
1✔
154

155
    abstract boolean isTerminated();
156

157
    abstract void onAttemptExhaustion();
158

159
    abstract void onSuccessfulTermination();
160
  }
161

162
  private class ExecutorLimitedWaitShutdown extends LimitedWaitShutdown {
163
    private final ExecutorService executorToShutdown;
164
    private final String executorName;
165

166
    public ExecutorLimitedWaitShutdown(
167
        ExecutorService executorToShutdown,
168
        int maxAttempts,
169
        String executorName,
170
        CompletableFuture<Void> promise) {
1✔
171
      super(maxAttempts, promise);
1✔
172
      this.executorToShutdown = executorToShutdown;
1✔
173
      this.executorName = executorName;
1✔
174
    }
1✔
175

176
    @Override
177
    boolean isTerminated() {
178
      return executorToShutdown.isTerminated();
1✔
179
    }
180

181
    @Override
182
    void onAttemptExhaustion() {
183
      log.warn(
1✔
184
          "Wait for a graceful shutdown of {} timed out, fallback to shutdownNow()", executorName);
185
      executorToShutdown.shutdownNow();
1✔
186
    }
1✔
187

188
    @Override
189
    void onSuccessfulTermination() {}
1✔
190
  }
191

192
  private abstract class ReportingDelayShutdown implements Runnable {
193
    // measures in attempts count, not in ms
194
    private static final int BLOCKED_REPORTING_THRESHOLD = 60;
195
    private static final int BLOCKED_REPORTING_PERIOD = 20;
196

197
    private final CompletableFuture<Void> promise;
198
    private int attempt;
199

200
    public ReportingDelayShutdown(CompletableFuture<Void> promise) {
1✔
201
      this.promise = promise;
1✔
202
    }
1✔
203

204
    @Override
205
    public void run() {
206
      if (isTerminated()) {
1✔
207
        if (attempt > BLOCKED_REPORTING_THRESHOLD) {
1✔
208
          onSlowSuccessfulTermination();
×
209
        } else {
210
          onSuccessfulTermination();
1✔
211
        }
212
        promise.complete(null);
1✔
213
        return;
1✔
214
      }
215
      attempt++;
1✔
216
      // log a problem after BLOCKED_REPORTING_THRESHOLD attempts only
217
      if (attempt >= BLOCKED_REPORTING_THRESHOLD) {
1✔
218
        // and repeat every BLOCKED_REPORTING_PERIOD attempts
UNCOV
219
        if (((float) (attempt - BLOCKED_REPORTING_THRESHOLD) % BLOCKED_REPORTING_PERIOD) < 0.001) {
×
UNCOV
220
          onSlowTermination();
×
221
        }
222
      }
223
      scheduledExecutorService.schedule(this, CHECK_PERIOD_MS, TimeUnit.MILLISECONDS);
1✔
224
    }
1✔
225

226
    abstract boolean isTerminated();
227

228
    abstract void onSlowTermination();
229

230
    abstract void onSuccessfulTermination();
231

232
    /** Called only if {@link #onSlowTermination()} was called before */
233
    abstract void onSlowSuccessfulTermination();
234
  }
235

236
  private class ExecutorReportingDelayShutdown extends ReportingDelayShutdown {
237
    private final ExecutorService executorToShutdown;
238
    private final String executorName;
239

240
    public ExecutorReportingDelayShutdown(
241
        ExecutorService executorToShutdown, String executorName, CompletableFuture<Void> promise) {
1✔
242
      super(promise);
1✔
243
      this.executorToShutdown = executorToShutdown;
1✔
244
      this.executorName = executorName;
1✔
245
    }
1✔
246

247
    @Override
248
    boolean isTerminated() {
249
      return executorToShutdown.isTerminated();
1✔
250
    }
251

252
    @Override
253
    void onSlowTermination() {
UNCOV
254
      log.warn(
×
255
          "Graceful shutdown of {} is blocked by one of the long currently processing tasks",
256
          executorName);
UNCOV
257
    }
×
258

259
    @Override
260
    void onSuccessfulTermination() {}
1✔
261

262
    @Override
263
    void onSlowSuccessfulTermination() {
264
      log.warn("{} successfully terminated", executorName);
×
265
    }
×
266
  }
267

268
  private class SlotSupplierDelayShutdown extends ReportingDelayShutdown {
269
    private final TrackingSlotSupplier<?> slotSupplier;
270
    private final String name;
271

272
    public SlotSupplierDelayShutdown(
273
        TrackingSlotSupplier<?> supplier, String name, CompletableFuture<Void> promise) {
1✔
274
      super(promise);
1✔
275
      this.slotSupplier = supplier;
1✔
276
      this.name = name;
1✔
277
    }
1✔
278

279
    @Override
280
    boolean isTerminated() {
281
      return slotSupplier.getIssuedSlots() == 0;
1✔
282
    }
283

284
    @Override
285
    void onSlowTermination() {
286
      log.warn("Wait for release of slots of {} takes a long time", name);
×
287
    }
×
288

289
    @Override
290
    void onSuccessfulTermination() {}
1✔
291

292
    @Override
293
    void onSlowSuccessfulTermination() {
294
      log.warn("All slots of {} were successfully released", name);
×
295
    }
×
296
  }
297

298
  public static long awaitTermination(@Nullable ExecutorService s, long timeoutMillis) {
299
    if (s == null) {
1✔
300
      return timeoutMillis;
1✔
301
    }
302
    return runAndGetRemainingTimeoutMs(
1✔
303
        timeoutMillis,
304
        () -> {
305
          try {
306
            boolean ignored = s.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
1✔
307
          } catch (InterruptedException e) {
×
308
            Thread.currentThread().interrupt();
×
309
          }
1✔
310
        });
1✔
311
  }
312

313
  public static long runAndGetRemainingTimeoutMs(long initialTimeoutMs, Runnable toRun) {
314
    long startedNs = System.nanoTime();
1✔
315
    try {
316
      toRun.run();
1✔
317
    } catch (Throwable e) {
×
318
      log.warn("Exception during waiting for termination", e);
×
319
    }
1✔
320
    long remainingTimeoutMs =
1✔
321
        initialTimeoutMs - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedNs);
1✔
322
    return remainingTimeoutMs < 0 ? 0 : remainingTimeoutMs;
1✔
323
  }
324

325
  public static long awaitTermination(@Nullable Shutdownable s, long timeoutMillis) {
326
    if (s == null) {
1✔
327
      return timeoutMillis;
1✔
328
    }
329
    return runAndGetRemainingTimeoutMs(
1✔
330
        timeoutMillis, () -> s.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS));
1✔
331
  }
332
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc