• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

temporalio / sdk-java / #335

21 Oct 2024 02:49PM UTC coverage: 78.67% (-0.02%) from 78.687%
#335

push

github

web-flow
Avoid warning from un-accessed operation promise (#2280)

1 of 1 new or added line in 1 file covered. (100.0%)

14 existing lines in 5 files now uncovered.

22679 of 28828 relevant lines covered (78.67%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.5
/temporal-sdk/src/main/java/io/temporal/internal/worker/ShutdownManager.java
1
/*
2
 * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3
 *
4
 * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5
 *
6
 * Modifications copyright (C) 2017 Uber Technologies, Inc.
7
 *
8
 * Licensed under the Apache License, Version 2.0 (the "License");
9
 * you may not use this material except in compliance with the License.
10
 * You may obtain a copy of the License at
11
 *
12
 *   http://www.apache.org/licenses/LICENSE-2.0
13
 *
14
 * Unless required by applicable law or agreed to in writing, software
15
 * distributed under the License is distributed on an "AS IS" BASIS,
16
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17
 * See the License for the specific language governing permissions and
18
 * limitations under the License.
19
 */
20

21
package io.temporal.internal.worker;
22

23
import static io.temporal.internal.common.GrpcUtils.isChannelShutdownException;
24

25
import com.google.common.util.concurrent.ListenableFuture;
26
import io.grpc.Status;
27
import io.grpc.StatusRuntimeException;
28
import io.temporal.api.workflowservice.v1.ShutdownWorkerResponse;
29
import java.io.Closeable;
30
import java.time.Duration;
31
import java.util.concurrent.*;
32
import javax.annotation.Nullable;
33
import org.slf4j.Logger;
34
import org.slf4j.LoggerFactory;
35

36
public class ShutdownManager implements Closeable {
1✔
37
  private static final Logger log = LoggerFactory.getLogger(ShutdownManager.class);
1✔
38

39
  private final ScheduledExecutorService scheduledExecutorService =
1✔
40
      Executors.newSingleThreadScheduledExecutor(
1✔
41
          new ExecutorThreadFactory(
42
              WorkerThreadsNameHelper.SHUTDOWN_MANAGER_THREAD_NAME_PREFIX, null));
43

44
  private static final int CHECK_PERIOD_MS = 250;
45

46
  /** executorToShutdown.shutdownNow() -> timed wait for a graceful termination */
47
  public CompletableFuture<Void> shutdownExecutorNow(
48
      ExecutorService executorToShutdown, String executorName, Duration timeout) {
49
    executorToShutdown.shutdownNow();
1✔
50
    return limitedWait(executorToShutdown, executorName, timeout);
1✔
51
  }
52

53
  /** executorToShutdown.shutdownNow() -&gt; unlimited wait for termination */
54
  public CompletableFuture<Void> shutdownExecutorNowUntimed(
55
      ExecutorService executorToShutdown, String executorName) {
56
    executorToShutdown.shutdownNow();
1✔
57
    return untimedWait(executorToShutdown, executorName);
1✔
58
  }
59

60
  /**
61
   * executorToShutdown.shutdown() -&gt; timed wait for graceful termination -&gt;
62
   * executorToShutdown.shutdownNow()
63
   */
64
  public CompletableFuture<Void> shutdownExecutor(
65
      ExecutorService executorToShutdown, String executorName, Duration timeout) {
66
    executorToShutdown.shutdown();
1✔
67
    return limitedWait(executorToShutdown, executorName, timeout);
1✔
68
  }
69

70
  /** executorToShutdown.shutdown() -&gt; unlimited wait for graceful termination */
71
  public CompletableFuture<Void> shutdownExecutorUntimed(
72
      ExecutorService executorToShutdown, String executorName) {
73
    executorToShutdown.shutdown();
1✔
74
    return untimedWait(executorToShutdown, executorName);
1✔
75
  }
76

77
  public CompletableFuture<Void> waitForSupplierPermitsReleasedUnlimited(
78
      TrackingSlotSupplier<?> slotSupplier, String name) {
79
    CompletableFuture<Void> future = new CompletableFuture<>();
1✔
80
    scheduledExecutorService.submit(new SlotSupplierDelayShutdown(slotSupplier, name, future));
1✔
81
    return future;
1✔
82
  }
83

84
  /**
85
   * waitForStickyQueueBalancer -&gt; disableNormalPoll -&gt; timed wait for graceful completion of
86
   * sticky workflows
87
   */
88
  public CompletableFuture<Void> waitForStickyQueueBalancer(
89
      StickyQueueBalancer balancer, Duration timeout) {
90
    CompletableFuture<Void> future = new CompletableFuture<>();
1✔
91
    balancer.disableNormalPoll();
1✔
92
    scheduledExecutorService.schedule(
1✔
93
        () -> {
94
          future.complete(null);
1✔
95
        },
1✔
96
        timeout.toMillis(),
1✔
97
        TimeUnit.MILLISECONDS);
98
    return future;
1✔
99
  }
100

101
  /**
102
   * Wait for {@code executorToShutdown} to terminate. Only completes the returned CompletableFuture
103
   * when the executor is terminated.
104
   */
105
  private CompletableFuture<Void> untimedWait(
106
      ExecutorService executorToShutdown, String executorName) {
107
    CompletableFuture<Void> future = new CompletableFuture<>();
1✔
108
    scheduledExecutorService.submit(
1✔
109
        new ExecutorReportingDelayShutdown(executorToShutdown, executorName, future));
110
    return future;
1✔
111
  }
112

113
  /**
114
   * Wait for {@code executorToShutdown} to terminate for a defined interval, shutdownNow after
115
   * that. Always completes the returned CompletableFuture on termination of the executor or on a
116
   * timeout, whatever happens earlier.
117
   */
118
  private CompletableFuture<Void> limitedWait(
119
      ExecutorService executorToShutdown, String executorName, Duration timeout) {
120
    int attempts = (int) Math.ceil((double) timeout.toMillis() / CHECK_PERIOD_MS);
1✔
121

122
    CompletableFuture<Void> future = new CompletableFuture<>();
1✔
123
    scheduledExecutorService.submit(
1✔
124
        new ExecutorLimitedWaitShutdown(executorToShutdown, attempts, executorName, future));
125
    return future;
1✔
126
  }
127

128
  /**
129
   * Wait for {@code shutdownRequest} to finish. shutdownRequest is considered best effort, so we do
130
   * not fail the shutdown if it fails.
131
   */
132
  public CompletableFuture<Void> waitOnWorkerShutdownRequest(
133
      ListenableFuture<ShutdownWorkerResponse> shutdownRequest) {
134
    CompletableFuture<Void> future = new CompletableFuture<>();
1✔
135
    shutdownRequest.addListener(
1✔
136
        () -> {
137
          try {
138
            shutdownRequest.get();
1✔
139
          } catch (Exception e) {
1✔
140
            if (e instanceof ExecutionException) {
1✔
141
              e = (Exception) e.getCause();
1✔
142
            }
143
            if (e instanceof StatusRuntimeException) {
1✔
144
              // If the server does not support shutdown, ignore the exception
145
              if (Status.Code.UNIMPLEMENTED.equals(
1✔
146
                      ((StatusRuntimeException) e).getStatus().getCode())
1✔
147
                  || isChannelShutdownException((StatusRuntimeException) e)) {
1✔
148
                return;
1✔
149
              }
150
            }
151
            log.warn("failed to call shutdown worker", e);
×
152
          } finally {
153
            future.complete(null);
1✔
154
          }
155
        },
1✔
156
        scheduledExecutorService);
157
    return future;
1✔
158
  }
159

160
  @Override
161
  public void close() {
162
    scheduledExecutorService.shutdownNow();
1✔
163
  }
1✔
164

165
  private abstract class LimitedWaitShutdown implements Runnable {
166
    private final CompletableFuture<Void> promise;
167
    private final int maxAttempts;
168
    private int attempt;
169

170
    public LimitedWaitShutdown(int maxAttempts, CompletableFuture<Void> promise) {
1✔
171
      this.promise = promise;
1✔
172
      this.maxAttempts = maxAttempts;
1✔
173
    }
1✔
174

175
    @Override
176
    public void run() {
177
      if (isTerminated()) {
1✔
178
        onSuccessfulTermination();
1✔
179
        promise.complete(null);
1✔
180
        return;
1✔
181
      }
182
      attempt++;
1✔
183
      if (attempt > maxAttempts) {
1✔
184
        onAttemptExhaustion();
1✔
185
        // we don't want to complicate shutdown with dealing of exceptions and errors of all sorts,
186
        // so just log and complete the promise
187
        promise.complete(null);
1✔
188
        return;
1✔
189
      }
190
      scheduledExecutorService.schedule(this, CHECK_PERIOD_MS, TimeUnit.MILLISECONDS);
1✔
191
    }
1✔
192

193
    abstract boolean isTerminated();
194

195
    abstract void onAttemptExhaustion();
196

197
    abstract void onSuccessfulTermination();
198
  }
199

200
  private class ExecutorLimitedWaitShutdown extends LimitedWaitShutdown {
201
    private final ExecutorService executorToShutdown;
202
    private final String executorName;
203

204
    public ExecutorLimitedWaitShutdown(
205
        ExecutorService executorToShutdown,
206
        int maxAttempts,
207
        String executorName,
208
        CompletableFuture<Void> promise) {
1✔
209
      super(maxAttempts, promise);
1✔
210
      this.executorToShutdown = executorToShutdown;
1✔
211
      this.executorName = executorName;
1✔
212
    }
1✔
213

214
    @Override
215
    boolean isTerminated() {
216
      return executorToShutdown.isTerminated();
1✔
217
    }
218

219
    @Override
220
    void onAttemptExhaustion() {
221
      log.warn(
1✔
222
          "Wait for a graceful shutdown of {} timed out, fallback to shutdownNow()", executorName);
223
      executorToShutdown.shutdownNow();
1✔
224
    }
1✔
225

226
    @Override
227
    void onSuccessfulTermination() {}
1✔
228
  }
229

230
  private abstract class ReportingDelayShutdown implements Runnable {
231
    // measures in attempts count, not in ms
232
    private static final int BLOCKED_REPORTING_THRESHOLD = 60;
233
    private static final int BLOCKED_REPORTING_PERIOD = 20;
234

235
    private final CompletableFuture<Void> promise;
236
    private int attempt;
237

238
    public ReportingDelayShutdown(CompletableFuture<Void> promise) {
1✔
239
      this.promise = promise;
1✔
240
    }
1✔
241

242
    @Override
243
    public void run() {
244
      if (isTerminated()) {
1✔
245
        if (attempt > BLOCKED_REPORTING_THRESHOLD) {
1✔
246
          onSlowSuccessfulTermination();
×
247
        } else {
248
          onSuccessfulTermination();
1✔
249
        }
250
        promise.complete(null);
1✔
251
        return;
1✔
252
      }
253
      attempt++;
1✔
254
      // log a problem after BLOCKED_REPORTING_THRESHOLD attempts only
255
      if (attempt >= BLOCKED_REPORTING_THRESHOLD) {
1✔
256
        // and repeat every BLOCKED_REPORTING_PERIOD attempts
UNCOV
257
        if (((float) (attempt - BLOCKED_REPORTING_THRESHOLD) % BLOCKED_REPORTING_PERIOD) < 0.001) {
×
UNCOV
258
          onSlowTermination();
×
259
        }
260
      }
261
      scheduledExecutorService.schedule(this, CHECK_PERIOD_MS, TimeUnit.MILLISECONDS);
1✔
262
    }
1✔
263

264
    abstract boolean isTerminated();
265

266
    abstract void onSlowTermination();
267

268
    abstract void onSuccessfulTermination();
269

270
    /** Called only if {@link #onSlowTermination()} was called before */
271
    abstract void onSlowSuccessfulTermination();
272
  }
273

274
  private class ExecutorReportingDelayShutdown extends ReportingDelayShutdown {
275
    private final ExecutorService executorToShutdown;
276
    private final String executorName;
277

278
    public ExecutorReportingDelayShutdown(
279
        ExecutorService executorToShutdown, String executorName, CompletableFuture<Void> promise) {
1✔
280
      super(promise);
1✔
281
      this.executorToShutdown = executorToShutdown;
1✔
282
      this.executorName = executorName;
1✔
283
    }
1✔
284

285
    @Override
286
    boolean isTerminated() {
287
      return executorToShutdown.isTerminated();
1✔
288
    }
289

290
    @Override
291
    void onSlowTermination() {
UNCOV
292
      log.warn(
×
293
          "Graceful shutdown of {} is blocked by one of the long currently processing tasks",
294
          executorName);
UNCOV
295
    }
×
296

297
    @Override
298
    void onSuccessfulTermination() {}
1✔
299

300
    @Override
301
    void onSlowSuccessfulTermination() {
302
      log.warn("{} successfully terminated", executorName);
×
303
    }
×
304
  }
305

306
  private class SlotSupplierDelayShutdown extends ReportingDelayShutdown {
307
    private final TrackingSlotSupplier<?> slotSupplier;
308
    private final String name;
309

310
    public SlotSupplierDelayShutdown(
311
        TrackingSlotSupplier<?> supplier, String name, CompletableFuture<Void> promise) {
1✔
312
      super(promise);
1✔
313
      this.slotSupplier = supplier;
1✔
314
      this.name = name;
1✔
315
    }
1✔
316

317
    @Override
318
    boolean isTerminated() {
319
      return slotSupplier.getIssuedSlots() == 0;
1✔
320
    }
321

322
    @Override
323
    void onSlowTermination() {
324
      log.warn("Wait for release of slots of {} takes a long time", name);
×
325
    }
×
326

327
    @Override
328
    void onSuccessfulTermination() {}
1✔
329

330
    @Override
331
    void onSlowSuccessfulTermination() {
332
      log.warn("All slots of {} were successfully released", name);
×
333
    }
×
334
  }
335

336
  public static long awaitTermination(@Nullable ExecutorService s, long timeoutMillis) {
337
    if (s == null) {
1✔
338
      return timeoutMillis;
1✔
339
    }
340
    return runAndGetRemainingTimeoutMs(
1✔
341
        timeoutMillis,
342
        () -> {
343
          try {
344
            boolean ignored = s.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
1✔
345
          } catch (InterruptedException e) {
×
346
            Thread.currentThread().interrupt();
×
347
          }
1✔
348
        });
1✔
349
  }
350

351
  public static long runAndGetRemainingTimeoutMs(long initialTimeoutMs, Runnable toRun) {
352
    long startedNs = System.nanoTime();
1✔
353
    try {
354
      toRun.run();
1✔
355
    } catch (Throwable e) {
×
356
      log.warn("Exception during waiting for termination", e);
×
357
    }
1✔
358
    long remainingTimeoutMs =
1✔
359
        initialTimeoutMs - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedNs);
1✔
360
    return remainingTimeoutMs < 0 ? 0 : remainingTimeoutMs;
1✔
361
  }
362

363
  public static long awaitTermination(@Nullable Shutdownable s, long timeoutMillis) {
364
    if (s == null) {
1✔
365
      return timeoutMillis;
1✔
366
    }
367
    return runAndGetRemainingTimeoutMs(
1✔
368
        timeoutMillis, () -> s.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS));
1✔
369
  }
370
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc