• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

TAKETODAY / today-infrastructure / 8308118446

16 Mar 2024 01:26PM UTC coverage: 78.393% (+0.02%) from 78.374%
8308118446

push

github

TAKETODAY
:white_check_mark:

63592 of 86119 branches covered (73.84%)

Branch coverage included in aggregate %.

157000 of 195273 relevant lines covered (80.4%)

3.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.19
today-context/src/main/java/cn/taketoday/scheduling/concurrent/ThreadPoolTaskScheduler.java
1
/*
2
 * Copyright 2017 - 2024 the original author or authors.
3
 *
4
 * This program is free software: you can redistribute it and/or modify
5
 * it under the terms of the GNU General Public License as published by
6
 * the Free Software Foundation, either version 3 of the License, or
7
 * (at your option) any later version.
8
 *
9
 * This program is distributed in the hope that it will be useful,
10
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
 * GNU General Public License for more details.
13
 *
14
 * You should have received a copy of the GNU General Public License
15
 * along with this program. If not, see [https://www.gnu.org/licenses/]
16
 */
17

18
package cn.taketoday.scheduling.concurrent;
19

20
import java.time.Clock;
21
import java.time.Duration;
22
import java.time.Instant;
23
import java.util.concurrent.Callable;
24
import java.util.concurrent.Delayed;
25
import java.util.concurrent.ExecutionException;
26
import java.util.concurrent.Executor;
27
import java.util.concurrent.ExecutorService;
28
import java.util.concurrent.Future;
29
import java.util.concurrent.RejectedExecutionException;
30
import java.util.concurrent.RejectedExecutionHandler;
31
import java.util.concurrent.RunnableScheduledFuture;
32
import java.util.concurrent.ScheduledExecutorService;
33
import java.util.concurrent.ScheduledFuture;
34
import java.util.concurrent.ScheduledThreadPoolExecutor;
35
import java.util.concurrent.ThreadFactory;
36
import java.util.concurrent.TimeUnit;
37
import java.util.concurrent.TimeoutException;
38

39
import cn.taketoday.core.task.AsyncListenableTaskExecutor;
40
import cn.taketoday.core.task.TaskDecorator;
41
import cn.taketoday.core.task.TaskRejectedException;
42
import cn.taketoday.lang.Assert;
43
import cn.taketoday.lang.Nullable;
44
import cn.taketoday.scheduling.SchedulingTaskExecutor;
45
import cn.taketoday.scheduling.TaskScheduler;
46
import cn.taketoday.scheduling.Trigger;
47
import cn.taketoday.scheduling.support.TaskUtils;
48
import cn.taketoday.util.ConcurrentReferenceHashMap;
49
import cn.taketoday.util.ErrorHandler;
50
import cn.taketoday.util.concurrent.ListenableFuture;
51
import cn.taketoday.util.concurrent.ListenableFutureTask;
52

53
/**
54
 * A standard implementation of Infra {@link TaskScheduler} interface, wrapping
55
 * a native {@link java.util.concurrent.ScheduledThreadPoolExecutor} and providing
56
 * all applicable configuration options for it.
57
 *
58
 * @author Juergen Hoeller
59
 * @author Mark Fisher
60
 * @author <a href="https://github.com/TAKETODAY">Harry Yang</a>
61
 * @see #setPoolSize
62
 * @see #setRemoveOnCancelPolicy
63
 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
64
 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
65
 * @see #setThreadFactory
66
 * @see #setErrorHandler
67
 * @since 4.0
68
 */
69
@SuppressWarnings("serial")
70
public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport
2✔
71
        implements AsyncListenableTaskExecutor, SchedulingTaskExecutor, TaskScheduler {
72

73
  private static final TimeUnit NANO = TimeUnit.NANOSECONDS;
3✔
74

75
  private volatile int poolSize = 1;
3✔
76

77
  private volatile boolean removeOnCancelPolicy;
78

79
  private volatile boolean continueExistingPeriodicTasksAfterShutdownPolicy;
80

81
  private volatile boolean executeExistingDelayedTasksAfterShutdownPolicy = true;
3✔
82

83
  @Nullable
84
  private TaskDecorator taskDecorator;
85

86
  @Nullable
87
  private volatile ErrorHandler errorHandler;
88

89
  private Clock clock = Clock.systemDefaultZone();
3✔
90

91
  @Nullable
92
  private ScheduledExecutorService scheduledExecutor;
93

94
  // Underlying ScheduledFutureTask to user-level ListenableFuture handle, if any
95
  private final ConcurrentReferenceHashMap<Object, ListenableFuture<?>> listenableFutureMap =
8✔
96
          new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK);
97

98
  /**
99
   * Set the ScheduledExecutorService's pool size.
100
   * Default is 1.
101
   * <p><b>This setting can be modified at runtime, for example through JMX.</b>
102
   */
103
  public void setPoolSize(int poolSize) {
104
    Assert.isTrue(poolSize > 0, "'poolSize' must be 1 or higher");
6!
105
    if (this.scheduledExecutor instanceof ScheduledThreadPoolExecutor threadPoolExecutor) {
6!
106
      threadPoolExecutor.setCorePoolSize(poolSize);
×
107
    }
108
    this.poolSize = poolSize;
3✔
109
  }
1✔
110

111
  /**
112
   * Set the remove-on-cancel mode on {@link ScheduledThreadPoolExecutor}.
113
   * <p>Default is {@code false}. If set to {@code true}, the target executor will be
114
   * switched into remove-on-cancel mode (if possible).
115
   * <p><b>This setting can be modified at runtime, for example through JMX.</b>
116
   *
117
   * @see ScheduledThreadPoolExecutor#setRemoveOnCancelPolicy
118
   */
119
  public void setRemoveOnCancelPolicy(boolean flag) {
120
    if (this.scheduledExecutor instanceof ScheduledThreadPoolExecutor threadPoolExecutor) {
×
121
      threadPoolExecutor.setRemoveOnCancelPolicy(flag);
×
122
    }
123
    this.removeOnCancelPolicy = flag;
×
124
  }
×
125

126
  /**
127
   * Set whether to continue existing periodic tasks even when this executor has been shutdown.
128
   * <p>Default is {@code false}. If set to {@code true}, the target executor will be
129
   * switched into continuing periodic tasks (if possible).
130
   * <p><b>This setting can be modified at runtime, for example through JMX.</b>
131
   *
132
   * @see ScheduledThreadPoolExecutor#setContinueExistingPeriodicTasksAfterShutdownPolicy
133
   */
134
  public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean flag) {
135
    if (this.scheduledExecutor instanceof ScheduledThreadPoolExecutor threadPoolExecutor) {
×
136
      threadPoolExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(flag);
×
137
    }
138
    this.continueExistingPeriodicTasksAfterShutdownPolicy = flag;
×
139
  }
×
140

141
  /**
142
   * Set whether to execute existing delayed tasks even when this executor has been shutdown.
143
   * <p>Default is {@code true}. If set to {@code false}, the target executor will be
144
   * switched into dropping remaining tasks (if possible).
145
   * <p><b>This setting can be modified at runtime, for example through JMX.</b>
146
   *
147
   * @see ScheduledThreadPoolExecutor#setExecuteExistingDelayedTasksAfterShutdownPolicy
148
   */
149
  public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean flag) {
150
    if (this.scheduledExecutor instanceof ScheduledThreadPoolExecutor threadPoolExecutor) {
×
151
      threadPoolExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(flag);
×
152
    }
153
    this.executeExistingDelayedTasksAfterShutdownPolicy = flag;
×
154
  }
×
155

156
  /**
157
   * Specify a custom {@link TaskDecorator} to be applied to any {@link Runnable}
158
   * about to be executed.
159
   * <p>Note that such a decorator is not being applied to the user-supplied
160
   * {@code Runnable}/{@code Callable} but rather to the scheduled execution
161
   * callback (a wrapper around the user-supplied task).
162
   * <p>The primary use case is to set some execution context around the task's
163
   * invocation, or to provide some monitoring/statistics for task execution.
164
   */
165
  public void setTaskDecorator(TaskDecorator taskDecorator) {
166
    this.taskDecorator = taskDecorator;
3✔
167
  }
1✔
168

169
  /**
170
   * Set a custom {@link ErrorHandler} strategy.
171
   */
172
  public void setErrorHandler(ErrorHandler errorHandler) {
173
    this.errorHandler = errorHandler;
3✔
174
  }
1✔
175

176
  /**
177
   * Set the clock to use for scheduling purposes.
178
   * <p>The default clock is the system clock for the default time zone.
179
   *
180
   * @see Clock#systemDefaultZone()
181
   */
182
  public void setClock(Clock clock) {
183
    Assert.notNull(clock, "Clock is required");
×
184
    this.clock = clock;
×
185
  }
×
186

187
  @Override
188
  public Clock getClock() {
189
    return this.clock;
3✔
190
  }
191

192
  @Override
193
  protected ExecutorService initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
194
    this.scheduledExecutor = createExecutor(this.poolSize, threadFactory, rejectedExecutionHandler);
8✔
195

196
    if (this.scheduledExecutor instanceof ScheduledThreadPoolExecutor tpExecutor) {
9!
197
      if (this.removeOnCancelPolicy) {
3!
198
        tpExecutor.setRemoveOnCancelPolicy(true);
×
199
      }
200
      if (this.continueExistingPeriodicTasksAfterShutdownPolicy) {
3!
201
        tpExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(true);
×
202
      }
203
      if (!this.executeExistingDelayedTasksAfterShutdownPolicy) {
3!
204
        tpExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
×
205
      }
206
    }
207

208
    return this.scheduledExecutor;
3✔
209
  }
210

211
  /**
212
   * Create a new {@link ScheduledExecutorService} instance.
213
   * <p>The default implementation creates a {@link ScheduledThreadPoolExecutor}.
214
   * Can be overridden in subclasses to provide custom {@link ScheduledExecutorService} instances.
215
   *
216
   * @param poolSize the specified pool size
217
   * @param threadFactory the ThreadFactory to use
218
   * @param rejectedExecutionHandler the RejectedExecutionHandler to use
219
   * @return a new ScheduledExecutorService instance
220
   * @see #afterPropertiesSet()
221
   * @see java.util.concurrent.ScheduledThreadPoolExecutor
222
   */
223
  protected ScheduledExecutorService createExecutor(
224
          int poolSize, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
225

226
    return new ScheduledThreadPoolExecutor(poolSize, threadFactory, rejectedExecutionHandler) {
17✔
227
      @Override
228
      protected void beforeExecute(Thread thread, Runnable task) {
229
        ThreadPoolTaskScheduler.this.beforeExecute(thread, task);
5✔
230
      }
1✔
231

232
      @Override
233
      protected void afterExecute(Runnable task, Throwable ex) {
234
        ThreadPoolTaskScheduler.this.afterExecute(task, ex);
5✔
235
      }
1✔
236

237
      @Override
238
      protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
239
        return decorateTaskIfNecessary(task);
5✔
240
      }
241

242
      @Override
243
      protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) {
244
        return decorateTaskIfNecessary(task);
5✔
245
      }
246
    };
247
  }
248

249
  /**
250
   * Return the underlying ScheduledExecutorService for native access.
251
   *
252
   * @return the underlying ScheduledExecutorService (never {@code null})
253
   * @throws IllegalStateException if the ThreadPoolTaskScheduler hasn't been initialized yet
254
   */
255
  public ScheduledExecutorService getScheduledExecutor() throws IllegalStateException {
256
    Assert.state(scheduledExecutor != null, "ThreadPoolTaskScheduler not initialized");
7!
257
    return scheduledExecutor;
3✔
258
  }
259

260
  /**
261
   * Return the underlying ScheduledThreadPoolExecutor, if available.
262
   *
263
   * @return the underlying ScheduledExecutorService (never {@code null})
264
   * @throws IllegalStateException if the ThreadPoolTaskScheduler hasn't been initialized yet
265
   * or if the underlying ScheduledExecutorService isn't a ScheduledThreadPoolExecutor
266
   * @see #getScheduledExecutor()
267
   */
268
  public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() throws IllegalStateException {
269
    Assert.state(scheduledExecutor instanceof ScheduledThreadPoolExecutor,
×
270
            "No ScheduledThreadPoolExecutor available");
271
    return (ScheduledThreadPoolExecutor) scheduledExecutor;
×
272
  }
273

274
  /**
275
   * Return the current pool size.
276
   * <p>Requires an underlying {@link ScheduledThreadPoolExecutor}.
277
   *
278
   * @see #getScheduledThreadPoolExecutor()
279
   * @see java.util.concurrent.ScheduledThreadPoolExecutor#getPoolSize()
280
   */
281
  public int getPoolSize() {
282
    if (scheduledExecutor == null) {
3!
283
      // Not initialized yet: assume initial pool size.
284
      return poolSize;
3✔
285
    }
286
    return getScheduledThreadPoolExecutor().getPoolSize();
×
287
  }
288

289
  /**
290
   * Return the number of currently active threads.
291
   * <p>Requires an underlying {@link ScheduledThreadPoolExecutor}.
292
   *
293
   * @see #getScheduledThreadPoolExecutor()
294
   * @see java.util.concurrent.ScheduledThreadPoolExecutor#getActiveCount()
295
   */
296
  public int getActiveCount() {
297
    if (scheduledExecutor == null) {
×
298
      // Not initialized yet: assume no active threads.
299
      return 0;
×
300
    }
301
    return getScheduledThreadPoolExecutor().getActiveCount();
×
302
  }
303

304
  /**
305
   * Return the current setting for the remove-on-cancel mode.
306
   * <p>Requires an underlying {@link ScheduledThreadPoolExecutor}.
307
   */
308
  public boolean isRemoveOnCancelPolicy() {
309
    if (scheduledExecutor == null) {
×
310
      // Not initialized yet: return our setting for the time being.
311
      return removeOnCancelPolicy;
×
312
    }
313
    return getScheduledThreadPoolExecutor().getRemoveOnCancelPolicy();
×
314
  }
315

316
  // SchedulingTaskExecutor implementation
317

318
  @Override
319
  public void execute(Runnable task) {
320
    Executor executor = getScheduledExecutor();
3✔
321
    try {
322
      executor.execute(errorHandlingTask(task, false));
6✔
323
    }
324
    catch (RejectedExecutionException ex) {
×
325
      throw new TaskRejectedException(executor, task, ex);
×
326
    }
1✔
327
  }
1✔
328

329
  @Override
330
  public Future<?> submit(Runnable task) {
331
    ExecutorService executor = getScheduledExecutor();
3✔
332
    try {
333
      return executor.submit(errorHandlingTask(task, false));
7✔
334
    }
335
    catch (RejectedExecutionException ex) {
×
336
      throw new TaskRejectedException(executor, task, ex);
×
337
    }
338
  }
339

340
  @Override
341
  public <T> Future<T> submit(Callable<T> task) {
342
    ExecutorService executor = getScheduledExecutor();
3✔
343
    try {
344
      return executor.submit(new DelegatingErrorHandlingCallable<>(task, this.errorHandler));
9✔
345
    }
346
    catch (RejectedExecutionException ex) {
×
347
      throw new TaskRejectedException(executor, task, ex);
×
348
    }
349
  }
350

351
  @Override
352
  public ListenableFuture<?> submitListenable(Runnable task) {
353
    ExecutorService executor = getScheduledExecutor();
3✔
354
    try {
355
      ListenableFutureTask<Object> listenableFuture = new ListenableFutureTask<>(task, null);
6✔
356
      executeAndTrack(executor, listenableFuture);
4✔
357
      return listenableFuture;
2✔
358
    }
359
    catch (RejectedExecutionException ex) {
×
360
      throw new TaskRejectedException(executor, task, ex);
×
361
    }
362
  }
363

364
  @Override
365
  public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
366
    ExecutorService executor = getScheduledExecutor();
3✔
367
    try {
368
      ListenableFutureTask<T> listenableFuture = new ListenableFutureTask<>(task);
5✔
369
      executeAndTrack(executor, listenableFuture);
4✔
370
      return listenableFuture;
2✔
371
    }
372
    catch (RejectedExecutionException ex) {
×
373
      throw new TaskRejectedException(executor, task, ex);
×
374
    }
375
  }
376

377
  private void executeAndTrack(ExecutorService executor, ListenableFutureTask<?> listenableFuture) {
378
    Future<?> scheduledFuture = executor.submit(errorHandlingTask(listenableFuture, false));
7✔
379
    listenableFutureMap.put(scheduledFuture, listenableFuture);
6✔
380
    listenableFuture.addListener(future -> listenableFutureMap.remove(scheduledFuture));
12✔
381
  }
1✔
382

383
  @Override
384
  protected void cancelRemainingTask(Runnable task) {
385
    super.cancelRemainingTask(task);
3✔
386
    // Cancel associated user-level ListenableFuture handle as well
387
    ListenableFuture<?> listenableFuture = this.listenableFutureMap.get(task);
6✔
388
    if (listenableFuture != null) {
2✔
389
      listenableFuture.cancel(true);
4✔
390
    }
391
  }
1✔
392

393
  // TaskScheduler implementation
394

395
  @Override
396
  @Nullable
397
  public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
398
    ScheduledExecutorService executor = getScheduledExecutor();
3✔
399
    try {
400
      ErrorHandler errorHandler = this.errorHandler;
3✔
401
      if (errorHandler == null) {
2!
402
        errorHandler = TaskUtils.getDefaultErrorHandler(true);
3✔
403
      }
404
      return new ReschedulingRunnable(task, trigger, this.clock, executor, errorHandler).schedule();
11✔
405
    }
406
    catch (RejectedExecutionException ex) {
×
407
      throw new TaskRejectedException(executor, task, ex);
×
408
    }
409
  }
410

411
  @Override
412
  public ScheduledFuture<?> schedule(Runnable task, Instant startTime) {
413
    ScheduledExecutorService executor = getScheduledExecutor();
3✔
414
    Duration delay = Duration.between(this.clock.instant(), startTime);
6✔
415
    try {
416
      return executor.schedule(errorHandlingTask(task, false), NANO.convert(delay), NANO);
11✔
417
    }
418
    catch (RejectedExecutionException ex) {
×
419
      throw new TaskRejectedException(executor, task, ex);
×
420
    }
421
  }
422

423
  @Override
424
  public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Instant startTime, Duration period) {
425
    ScheduledExecutorService executor = getScheduledExecutor();
3✔
426
    Duration initialDelay = Duration.between(this.clock.instant(), startTime);
6✔
427
    try {
428
      return executor.scheduleAtFixedRate(errorHandlingTask(task, true),
9✔
429
              NANO.convert(initialDelay), NANO.convert(period), NANO);
5✔
430
    }
431
    catch (RejectedExecutionException ex) {
×
432
      throw new TaskRejectedException(executor, task, ex);
×
433
    }
434
  }
435

436
  @Override
437
  public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Duration period) {
438
    ScheduledExecutorService executor = getScheduledExecutor();
3✔
439
    try {
440
      return executor.scheduleAtFixedRate(errorHandlingTask(task, true),
10✔
441
              0, NANO.convert(period), NANO);
2✔
442
    }
443
    catch (RejectedExecutionException ex) {
×
444
      throw new TaskRejectedException(executor, task, ex);
×
445
    }
446
  }
447

448
  @Override
449
  public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Instant startTime, Duration delay) {
450
    ScheduledExecutorService executor = getScheduledExecutor();
3✔
451
    Duration initialDelay = Duration.between(this.clock.instant(), startTime);
6✔
452
    try {
453
      return executor.scheduleWithFixedDelay(errorHandlingTask(task, true),
9✔
454
              NANO.convert(initialDelay), NANO.convert(delay), NANO);
5✔
455
    }
456
    catch (RejectedExecutionException ex) {
×
457
      throw new TaskRejectedException(executor, task, ex);
×
458
    }
459
  }
460

461
  @Override
462
  public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Duration delay) {
463
    ScheduledExecutorService executor = getScheduledExecutor();
×
464
    try {
465
      return executor.scheduleWithFixedDelay(errorHandlingTask(task, true),
×
466
              0, NANO.convert(delay), NANO);
×
467
    }
468
    catch (RejectedExecutionException ex) {
×
469
      throw new TaskRejectedException(executor, task, ex);
×
470
    }
471
  }
472

473
  private <V> RunnableScheduledFuture<V> decorateTaskIfNecessary(RunnableScheduledFuture<V> future) {
474
    return taskDecorator != null ? new DelegatingRunnableScheduledFuture<>(future, taskDecorator) : future;
12✔
475
  }
476

477
  private Runnable errorHandlingTask(Runnable task, boolean isRepeatingTask) {
478
    return TaskUtils.decorateTaskWithErrorHandler(task, this.errorHandler, isRepeatingTask);
6✔
479
  }
480

481
  private static class DelegatingRunnableScheduledFuture<V> implements RunnableScheduledFuture<V> {
482

483
    private final RunnableScheduledFuture<V> future;
484

485
    private final Runnable decoratedRunnable;
486

487
    public DelegatingRunnableScheduledFuture(RunnableScheduledFuture<V> future, TaskDecorator taskDecorator) {
2✔
488
      this.future = future;
3✔
489
      this.decoratedRunnable = taskDecorator.decorate(this.future);
6✔
490
    }
1✔
491

492
    @Override
493
    public void run() {
494
      this.decoratedRunnable.run();
3✔
495
    }
1✔
496

497
    @Override
498
    public boolean cancel(boolean mayInterruptIfRunning) {
499
      return this.future.cancel(mayInterruptIfRunning);
5✔
500
    }
501

502
    @Override
503
    public boolean isCancelled() {
504
      return this.future.isCancelled();
4✔
505
    }
506

507
    @Override
508
    public boolean isDone() {
509
      return this.future.isDone();
4✔
510
    }
511

512
    @Override
513
    public V get() throws InterruptedException, ExecutionException {
514
      return this.future.get();
×
515
    }
516

517
    @Override
518
    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
519
      return this.future.get(timeout, unit);
6✔
520
    }
521

522
    @Override
523
    public boolean isPeriodic() {
524
      return this.future.isPeriodic();
×
525
    }
526

527
    @Override
528
    public long getDelay(TimeUnit unit) {
529
      return this.future.getDelay(unit);
5✔
530
    }
531

532
    @Override
533
    public int compareTo(Delayed o) {
534
      return this.future.compareTo(o);
5✔
535
    }
536

537
  }
538

539
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc