• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #331

10 Oct 2024 05:57PM UTC coverage: 78.105% (-0.008%) from 78.113%
#331

push

github

web-flow
Call shutdown RPC on worker shutdown (#2264)

Call shutdownWorker on worker shutdown

38 of 44 new or added lines in 2 files covered. (86.36%)

14 existing lines in 3 files now uncovered.

21350 of 27335 relevant lines covered (78.1%)

0.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.4
/temporal-sdk/src/main/java/io/temporal/internal/worker/ShutdownManager.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.worker;
22

23
import com.google.common.util.concurrent.ListenableFuture;
24
import io.grpc.Status;
25
import io.grpc.StatusRuntimeException;
26
import io.temporal.api.workflowservice.v1.ShutdownWorkerResponse;
27
import java.io.Closeable;
28
import java.time.Duration;
29
import java.util.concurrent.*;
30
import javax.annotation.Nullable;
31
import org.slf4j.Logger;
32
import org.slf4j.LoggerFactory;
33

34
public class ShutdownManager implements Closeable {
1✔
35
  /** Logger shared by this class and its nested polling tasks. */
  private static final Logger log = LoggerFactory.getLogger(ShutdownManager.class);
1✔
36

37
  // Single-threaded scheduler on which the termination-polling tasks and the shutdown-request
  // listener of this class are run; it is stopped by close().
  private final ScheduledExecutorService scheduledExecutorService =
1✔
38
      Executors.newSingleThreadScheduledExecutor(
1✔
39
          new ExecutorThreadFactory(
40
              WorkerThreadsNameHelper.SHUTDOWN_MANAGER_THREAD_NAME_PREFIX, null));
41

42
  /** Delay in milliseconds between two consecutive termination checks of a polling task. */
  private static final int CHECK_PERIOD_MS = 250;
43

44
  /** executorToShutdown.shutdownNow() -> timed wait for a graceful termination */
45
  public CompletableFuture<Void> shutdownExecutorNow(
46
      ExecutorService executorToShutdown, String executorName, Duration timeout) {
47
    executorToShutdown.shutdownNow();
1✔
48
    return limitedWait(executorToShutdown, executorName, timeout);
1✔
49
  }
50

51
  /** executorToShutdown.shutdownNow() -&gt; unlimited wait for termination */
52
  public CompletableFuture<Void> shutdownExecutorNowUntimed(
53
      ExecutorService executorToShutdown, String executorName) {
54
    executorToShutdown.shutdownNow();
1✔
55
    return untimedWait(executorToShutdown, executorName);
1✔
56
  }
57

58
  /**
59
   * executorToShutdown.shutdown() -&gt; timed wait for graceful termination -&gt;
60
   * executorToShutdown.shutdownNow()
61
   */
62
  public CompletableFuture<Void> shutdownExecutor(
63
      ExecutorService executorToShutdown, String executorName, Duration timeout) {
64
    executorToShutdown.shutdown();
1✔
65
    return limitedWait(executorToShutdown, executorName, timeout);
1✔
66
  }
67

68
  /** executorToShutdown.shutdown() -&gt; unlimited wait for graceful termination */
69
  public CompletableFuture<Void> shutdownExecutorUntimed(
70
      ExecutorService executorToShutdown, String executorName) {
71
    executorToShutdown.shutdown();
1✔
72
    return untimedWait(executorToShutdown, executorName);
1✔
73
  }
74

75
  public CompletableFuture<Void> waitForSupplierPermitsReleasedUnlimited(
76
      TrackingSlotSupplier<?> slotSupplier, String name) {
77
    CompletableFuture<Void> future = new CompletableFuture<>();
1✔
78
    scheduledExecutorService.submit(new SlotSupplierDelayShutdown(slotSupplier, name, future));
1✔
79
    return future;
1✔
80
  }
81

82
  /**
83
   * waitForStickyQueueBalancer -&gt; disableNormalPoll -&gt; timed wait for graceful completion of
84
   * sticky workflows
85
   */
86
  public CompletableFuture<Void> waitForStickyQueueBalancer(
87
      StickyQueueBalancer balancer, Duration timeout) {
88
    CompletableFuture<Void> future = new CompletableFuture<>();
1✔
89
    balancer.disableNormalPoll();
1✔
90
    scheduledExecutorService.schedule(
1✔
91
        () -> {
92
          future.complete(null);
1✔
93
        },
1✔
94
        timeout.toMillis(),
1✔
95
        TimeUnit.MILLISECONDS);
96
    return future;
1✔
97
  }
98

99
  /**
100
   * Wait for {@code executorToShutdown} to terminate. Only completes the returned CompletableFuture
101
   * when the executor is terminated.
102
   */
103
  private CompletableFuture<Void> untimedWait(
104
      ExecutorService executorToShutdown, String executorName) {
105
    CompletableFuture<Void> future = new CompletableFuture<>();
1✔
106
    scheduledExecutorService.submit(
1✔
107
        new ExecutorReportingDelayShutdown(executorToShutdown, executorName, future));
108
    return future;
1✔
109
  }
110

111
  /**
112
   * Wait for {@code executorToShutdown} to terminate for a defined interval, shutdownNow after
113
   * that. Always completes the returned CompletableFuture on termination of the executor or on a
114
   * timeout, whatever happens earlier.
115
   */
116
  private CompletableFuture<Void> limitedWait(
117
      ExecutorService executorToShutdown, String executorName, Duration timeout) {
118
    int attempts = (int) Math.ceil((double) timeout.toMillis() / CHECK_PERIOD_MS);
1✔
119

120
    CompletableFuture<Void> future = new CompletableFuture<>();
1✔
121
    scheduledExecutorService.submit(
1✔
122
        new ExecutorLimitedWaitShutdown(executorToShutdown, attempts, executorName, future));
123
    return future;
1✔
124
  }
125

126
  /**
127
   * Wait for {@code shutdownRequest} to finish. shutdownRequest is considered best effort, so we do
128
   * not fail the shutdown if it fails.
129
   */
130
  public CompletableFuture<Void> waitOnWorkerShutdownRequest(
131
      ListenableFuture<ShutdownWorkerResponse> shutdownRequest) {
132
    CompletableFuture<Void> future = new CompletableFuture<>();
1✔
133
    shutdownRequest.addListener(
1✔
134
        () -> {
135
          try {
136
            shutdownRequest.get();
1✔
NEW
137
          } catch (StatusRuntimeException e) {
×
138
            // If the server does not support shutdown, ignore the exception
NEW
139
            if (Status.Code.UNIMPLEMENTED.equals(e.getStatus().getCode())) {
×
NEW
140
              return;
×
141
            }
NEW
142
            log.warn("failed to call shutdown worker", e);
×
143
          } catch (Exception e) {
1✔
144
            log.warn("failed to call shutdown worker", e);
1✔
145
          } finally {
146
            future.complete(null);
1✔
147
          }
148
        },
1✔
149
        scheduledExecutorService);
150
    return future;
1✔
151
  }
152

153
  @Override
154
  public void close() {
155
    scheduledExecutorService.shutdownNow();
1✔
156
  }
1✔
157

158
  private abstract class LimitedWaitShutdown implements Runnable {
159
    private final CompletableFuture<Void> promise;
160
    private final int maxAttempts;
161
    private int attempt;
162

163
    public LimitedWaitShutdown(int maxAttempts, CompletableFuture<Void> promise) {
1✔
164
      this.promise = promise;
1✔
165
      this.maxAttempts = maxAttempts;
1✔
166
    }
1✔
167

168
    @Override
169
    public void run() {
170
      if (isTerminated()) {
1✔
171
        onSuccessfulTermination();
1✔
172
        promise.complete(null);
1✔
173
        return;
1✔
174
      }
175
      attempt++;
1✔
176
      if (attempt > maxAttempts) {
1✔
177
        onAttemptExhaustion();
1✔
178
        // we don't want to complicate shutdown with dealing of exceptions and errors of all sorts,
179
        // so just log and complete the promise
180
        promise.complete(null);
1✔
181
        return;
1✔
182
      }
183
      scheduledExecutorService.schedule(this, CHECK_PERIOD_MS, TimeUnit.MILLISECONDS);
1✔
184
    }
1✔
185

186
    abstract boolean isTerminated();
187

188
    abstract void onAttemptExhaustion();
189

190
    abstract void onSuccessfulTermination();
191
  }
192

193
  private class ExecutorLimitedWaitShutdown extends LimitedWaitShutdown {
194
    private final ExecutorService executorToShutdown;
195
    private final String executorName;
196

197
    public ExecutorLimitedWaitShutdown(
198
        ExecutorService executorToShutdown,
199
        int maxAttempts,
200
        String executorName,
201
        CompletableFuture<Void> promise) {
1✔
202
      super(maxAttempts, promise);
1✔
203
      this.executorToShutdown = executorToShutdown;
1✔
204
      this.executorName = executorName;
1✔
205
    }
1✔
206

207
    @Override
208
    boolean isTerminated() {
209
      return executorToShutdown.isTerminated();
1✔
210
    }
211

212
    @Override
213
    void onAttemptExhaustion() {
214
      log.warn(
1✔
215
          "Wait for a graceful shutdown of {} timed out, fallback to shutdownNow()", executorName);
216
      executorToShutdown.shutdownNow();
1✔
217
    }
1✔
218

219
    @Override
220
    void onSuccessfulTermination() {}
1✔
221
  }
222

223
  private abstract class ReportingDelayShutdown implements Runnable {
224
    // measures in attempts count, not in ms
225
    private static final int BLOCKED_REPORTING_THRESHOLD = 60;
226
    private static final int BLOCKED_REPORTING_PERIOD = 20;
227

228
    private final CompletableFuture<Void> promise;
229
    private int attempt;
230

231
    public ReportingDelayShutdown(CompletableFuture<Void> promise) {
1✔
232
      this.promise = promise;
1✔
233
    }
1✔
234

235
    @Override
236
    public void run() {
237
      if (isTerminated()) {
1✔
238
        if (attempt > BLOCKED_REPORTING_THRESHOLD) {
1✔
239
          onSlowSuccessfulTermination();
1✔
240
        } else {
241
          onSuccessfulTermination();
1✔
242
        }
243
        promise.complete(null);
1✔
244
        return;
1✔
245
      }
246
      attempt++;
1✔
247
      // log a problem after BLOCKED_REPORTING_THRESHOLD attempts only
248
      if (attempt >= BLOCKED_REPORTING_THRESHOLD) {
1✔
249
        // and repeat every BLOCKED_REPORTING_PERIOD attempts
250
        if (((float) (attempt - BLOCKED_REPORTING_THRESHOLD) % BLOCKED_REPORTING_PERIOD) < 0.001) {
1✔
251
          onSlowTermination();
1✔
252
        }
253
      }
254
      scheduledExecutorService.schedule(this, CHECK_PERIOD_MS, TimeUnit.MILLISECONDS);
1✔
255
    }
1✔
256

257
    abstract boolean isTerminated();
258

259
    abstract void onSlowTermination();
260

261
    abstract void onSuccessfulTermination();
262

263
    /** Called only if {@link #onSlowTermination()} was called before */
264
    abstract void onSlowSuccessfulTermination();
265
  }
266

267
  private class ExecutorReportingDelayShutdown extends ReportingDelayShutdown {
268
    private final ExecutorService executorToShutdown;
269
    private final String executorName;
270

271
    public ExecutorReportingDelayShutdown(
272
        ExecutorService executorToShutdown, String executorName, CompletableFuture<Void> promise) {
1✔
273
      super(promise);
1✔
274
      this.executorToShutdown = executorToShutdown;
1✔
275
      this.executorName = executorName;
1✔
276
    }
1✔
277

278
    @Override
279
    boolean isTerminated() {
280
      return executorToShutdown.isTerminated();
1✔
281
    }
282

283
    @Override
284
    void onSlowTermination() {
285
      log.warn(
1✔
286
          "Graceful shutdown of {} is blocked by one of the long currently processing tasks",
287
          executorName);
288
    }
1✔
289

290
    @Override
291
    void onSuccessfulTermination() {}
1✔
292

293
    @Override
294
    void onSlowSuccessfulTermination() {
295
      log.warn("{} successfully terminated", executorName);
1✔
296
    }
1✔
297
  }
298

299
  private class SlotSupplierDelayShutdown extends ReportingDelayShutdown {
300
    private final TrackingSlotSupplier<?> slotSupplier;
301
    private final String name;
302

303
    public SlotSupplierDelayShutdown(
304
        TrackingSlotSupplier<?> supplier, String name, CompletableFuture<Void> promise) {
1✔
305
      super(promise);
1✔
306
      this.slotSupplier = supplier;
1✔
307
      this.name = name;
1✔
308
    }
1✔
309

310
    @Override
311
    boolean isTerminated() {
312
      return slotSupplier.getIssuedSlots() == 0;
1✔
313
    }
314

315
    @Override
316
    void onSlowTermination() {
317
      log.warn("Wait for release of slots of {} takes a long time", name);
×
318
    }
×
319

320
    @Override
321
    void onSuccessfulTermination() {}
1✔
322

323
    @Override
324
    void onSlowSuccessfulTermination() {
325
      log.warn("All slots of {} were successfully released", name);
×
326
    }
×
327
  }
328

329
  public static long awaitTermination(@Nullable ExecutorService s, long timeoutMillis) {
330
    if (s == null) {
1✔
331
      return timeoutMillis;
1✔
332
    }
333
    return runAndGetRemainingTimeoutMs(
1✔
334
        timeoutMillis,
335
        () -> {
336
          try {
337
            boolean ignored = s.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
1✔
338
          } catch (InterruptedException e) {
×
339
            Thread.currentThread().interrupt();
×
340
          }
1✔
341
        });
1✔
342
  }
343

344
  public static long runAndGetRemainingTimeoutMs(long initialTimeoutMs, Runnable toRun) {
345
    long startedNs = System.nanoTime();
1✔
346
    try {
347
      toRun.run();
1✔
348
    } catch (Throwable e) {
×
349
      log.warn("Exception during waiting for termination", e);
×
350
    }
1✔
351
    long remainingTimeoutMs =
1✔
352
        initialTimeoutMs - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedNs);
1✔
353
    return remainingTimeoutMs < 0 ? 0 : remainingTimeoutMs;
1✔
354
  }
355

356
  public static long awaitTermination(@Nullable Shutdownable s, long timeoutMillis) {
357
    if (s == null) {
1✔
358
      return timeoutMillis;
1✔
359
    }
360
    return runAndGetRemainingTimeoutMs(
1✔
361
        timeoutMillis, () -> s.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS));
1✔
362
  }
363
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc