• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

TAKETODAY / today-infrastructure / 18487778446

14 Oct 2025 06:36AM UTC coverage: 83.339% (-0.06%) from 83.395%
18487778446

Pull #289

github

web-flow
Merge cc8b92d97 into a95a0c299
Pull Request #289: Netty HTTP 处理重构

60804 of 78045 branches covered (77.91%)

Branch coverage included in aggregate %.

143855 of 167528 relevant lines covered (85.87%)

3.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.24
today-context/src/main/java/infra/scheduling/concurrent/ThreadPoolTaskScheduler.java
1
/*
2
 * Copyright 2017 - 2025 the original author or authors.
3
 *
4
 * This program is free software: you can redistribute it and/or modify
5
 * it under the terms of the GNU General Public License as published by
6
 * the Free Software Foundation, either version 3 of the License, or
7
 * (at your option) any later version.
8
 *
9
 * This program is distributed in the hope that it will be useful,
10
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
 * GNU General Public License for more details.
13
 *
14
 * You should have received a copy of the GNU General Public License
15
 * along with this program. If not, see [https://www.gnu.org/licenses/]
16
 */
17

18
package infra.scheduling.concurrent;
19

20
import org.jspecify.annotations.Nullable;
21

22
import java.time.Clock;
23
import java.time.Duration;
24
import java.time.Instant;
25
import java.util.concurrent.Callable;
26
import java.util.concurrent.Delayed;
27
import java.util.concurrent.ExecutionException;
28
import java.util.concurrent.Executor;
29
import java.util.concurrent.ExecutorService;
30
import java.util.concurrent.RejectedExecutionException;
31
import java.util.concurrent.RejectedExecutionHandler;
32
import java.util.concurrent.RunnableScheduledFuture;
33
import java.util.concurrent.ScheduledExecutorService;
34
import java.util.concurrent.ScheduledFuture;
35
import java.util.concurrent.ScheduledThreadPoolExecutor;
36
import java.util.concurrent.ThreadFactory;
37
import java.util.concurrent.TimeUnit;
38
import java.util.concurrent.TimeoutException;
39

40
import infra.core.task.AsyncTaskExecutor;
41
import infra.core.task.TaskDecorator;
42
import infra.core.task.TaskRejectedException;
43
import infra.lang.Assert;
44
import infra.scheduling.SchedulingTaskExecutor;
45
import infra.scheduling.TaskScheduler;
46
import infra.scheduling.Trigger;
47
import infra.scheduling.support.TaskUtils;
48
import infra.util.ConcurrentReferenceHashMap;
49
import infra.util.ErrorHandler;
50
import infra.util.concurrent.Future;
51
import infra.util.concurrent.ListenableFutureTask;
52

53
/**
54
 * A standard implementation of Infra {@link TaskScheduler} interface, wrapping
55
 * a native {@link java.util.concurrent.ScheduledThreadPoolExecutor} and providing
56
 * all applicable configuration options for it. The default number of scheduler
57
 * threads is 1; a higher number can be configured through {@link #setPoolSize}.
58
 *
59
 * <p>This is Infra traditional scheduler variant, staying as close as possible to
60
 * {@link java.util.concurrent.ScheduledExecutorService} semantics. Task execution happens
61
 * on the scheduler thread(s) rather than on separate execution threads. As a consequence,
62
 * a {@link ScheduledFuture} handle (e.g. from {@link #schedule(Runnable, Instant)})
63
 * represents the actual completion of the provided task (or series of repeated tasks).
64
 *
65
 * @author Juergen Hoeller
66
 * @author Mark Fisher
67
 * @author <a href="https://github.com/TAKETODAY">Harry Yang</a>
68
 * @see #setPoolSize
69
 * @see #setRemoveOnCancelPolicy
70
 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
71
 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
72
 * @see #setThreadFactory
73
 * @see #setErrorHandler
74
 * @since 4.0
75
 */
76
@SuppressWarnings("serial")
77
public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport implements AsyncTaskExecutor, SchedulingTaskExecutor, TaskScheduler {
2✔
78

79
  private static final TimeUnit NANO = TimeUnit.NANOSECONDS;
3✔
80

81
  private volatile int poolSize = 1;
3✔
82

83
  private volatile boolean removeOnCancelPolicy;
84

85
  private volatile boolean continueExistingPeriodicTasksAfterShutdownPolicy;
86

87
  private volatile boolean executeExistingDelayedTasksAfterShutdownPolicy = true;
3✔
88

89
  @Nullable
90
  private TaskDecorator taskDecorator;
91

92
  @Nullable
93
  private volatile ErrorHandler errorHandler;
94

95
  private Clock clock = Clock.systemDefaultZone();
3✔
96

97
  @Nullable
98
  private ScheduledExecutorService scheduledExecutor;
99

100
  // Underlying ScheduledFutureTask to user-level ListenableFuture handle, if any
101
  private final ConcurrentReferenceHashMap<Object, Future<?>> listenableFutureMap =
8✔
102
          new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK);
103

104
  /**
105
   * Set the ScheduledExecutorService's pool size.
106
   * Default is 1.
107
   * <p><b>This setting can be modified at runtime, for example through JMX.</b>
108
   */
109
  public void setPoolSize(int poolSize) {
110
    Assert.isTrue(poolSize > 0, "'poolSize' must be 1 or higher");
6!
111
    if (scheduledExecutor instanceof ScheduledThreadPoolExecutor tpe) {
6!
112
      tpe.setCorePoolSize(poolSize);
×
113
    }
114
    this.poolSize = poolSize;
3✔
115
  }
1✔
116

117
  /**
118
   * Set the remove-on-cancel mode on {@link ScheduledThreadPoolExecutor}.
119
   * <p>Default is {@code false}. If set to {@code true}, the target executor will be
120
   * switched into remove-on-cancel mode (if possible).
121
   * <p><b>This setting can be modified at runtime, for example through JMX.</b>
122
   *
123
   * @see ScheduledThreadPoolExecutor#setRemoveOnCancelPolicy
124
   */
125
  public void setRemoveOnCancelPolicy(boolean flag) {
126
    if (this.scheduledExecutor instanceof ScheduledThreadPoolExecutor stpe) {
×
127
      stpe.setRemoveOnCancelPolicy(flag);
×
128
    }
129
    this.removeOnCancelPolicy = flag;
×
130
  }
×
131

132
  /**
133
   * Set whether to continue existing periodic tasks even when this executor has been shutdown.
134
   * <p>Default is {@code false}. If set to {@code true}, the target executor will be
135
   * switched into continuing periodic tasks (if possible).
136
   * <p><b>This setting can be modified at runtime, for example through JMX.</b>
137
   *
138
   * @see ScheduledThreadPoolExecutor#setContinueExistingPeriodicTasksAfterShutdownPolicy
139
   */
140
  public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean flag) {
141
    if (this.scheduledExecutor instanceof ScheduledThreadPoolExecutor tpe) {
×
142
      tpe.setContinueExistingPeriodicTasksAfterShutdownPolicy(flag);
×
143
    }
144
    this.continueExistingPeriodicTasksAfterShutdownPolicy = flag;
×
145
  }
×
146

147
  /**
148
   * Set whether to execute existing delayed tasks even when this executor has been shutdown.
149
   * <p>Default is {@code true}. If set to {@code false}, the target executor will be
150
   * switched into dropping remaining tasks (if possible).
151
   * <p><b>This setting can be modified at runtime, for example through JMX.</b>
152
   *
153
   * @see ScheduledThreadPoolExecutor#setExecuteExistingDelayedTasksAfterShutdownPolicy
154
   */
155
  public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean flag) {
156
    if (this.scheduledExecutor instanceof ScheduledThreadPoolExecutor tpe) {
×
157
      tpe.setExecuteExistingDelayedTasksAfterShutdownPolicy(flag);
×
158
    }
159
    this.executeExistingDelayedTasksAfterShutdownPolicy = flag;
×
160
  }
×
161

162
  /**
163
   * Specify a custom {@link TaskDecorator} to be applied to any {@link Runnable}
164
   * about to be executed.
165
   * <p>Note that such a decorator is not being applied to the user-supplied
166
   * {@code Runnable}/{@code Callable} but rather to the scheduled execution
167
   * callback (a wrapper around the user-supplied task).
168
   * <p>The primary use case is to set some execution context around the task's
169
   * invocation, or to provide some monitoring/statistics for task execution.
170
   */
171
  public void setTaskDecorator(TaskDecorator taskDecorator) {
172
    this.taskDecorator = taskDecorator;
3✔
173
  }
1✔
174

175
  /**
176
   * Set a custom {@link ErrorHandler} strategy.
177
   */
178
  public void setErrorHandler(ErrorHandler errorHandler) {
179
    this.errorHandler = errorHandler;
3✔
180
  }
1✔
181

182
  /**
183
   * Set the clock to use for scheduling purposes.
184
   * <p>The default clock is the system clock for the default time zone.
185
   *
186
   * @see Clock#systemDefaultZone()
187
   */
188
  public void setClock(Clock clock) {
189
    Assert.notNull(clock, "Clock is required");
×
190
    this.clock = clock;
×
191
  }
×
192

193
  @Override
194
  public Clock getClock() {
195
    return this.clock;
3✔
196
  }
197

198
  @Override
199
  protected ExecutorService initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedHandler) {
200
    ScheduledExecutorService executor = createExecutor(this.poolSize, threadFactory, rejectedHandler);
7✔
201
    if (executor instanceof ScheduledThreadPoolExecutor tpExecutor) {
6!
202
      if (this.removeOnCancelPolicy) {
3!
203
        tpExecutor.setRemoveOnCancelPolicy(true);
×
204
      }
205
      if (this.continueExistingPeriodicTasksAfterShutdownPolicy) {
3!
206
        tpExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(true);
×
207
      }
208
      if (!this.executeExistingDelayedTasksAfterShutdownPolicy) {
3!
209
        tpExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
×
210
      }
211
    }
212
    this.scheduledExecutor = executor;
3✔
213
    return executor;
2✔
214
  }
215

216
  /**
217
   * Create a new {@link ScheduledExecutorService} instance.
218
   * <p>The default implementation creates a {@link ScheduledThreadPoolExecutor}.
219
   * Can be overridden in subclasses to provide custom {@link ScheduledExecutorService} instances.
220
   *
221
   * @param poolSize the specified pool size
222
   * @param threadFactory the ThreadFactory to use
223
   * @param rejectedHandler the RejectedExecutionHandler to use
224
   * @return a new ScheduledExecutorService instance
225
   * @see #afterPropertiesSet()
226
   * @see java.util.concurrent.ScheduledThreadPoolExecutor
227
   */
228
  protected ScheduledExecutorService createExecutor(int poolSize,
229
          ThreadFactory threadFactory, RejectedExecutionHandler rejectedHandler) {
230

231
    return new ScheduledThreadPoolExecutor(poolSize, threadFactory, rejectedHandler) {
17✔
232
      @Override
233
      protected void beforeExecute(Thread thread, Runnable task) {
234
        ThreadPoolTaskScheduler.this.beforeExecute(thread, task);
5✔
235
      }
1✔
236

237
      @Override
238
      protected void afterExecute(Runnable task, Throwable ex) {
239
        ThreadPoolTaskScheduler.this.afterExecute(task, ex);
5✔
240
      }
1✔
241

242
      @Override
243
      protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
244
        return decorateTaskIfNecessary(task);
5✔
245
      }
246

247
      @Override
248
      protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) {
249
        return decorateTaskIfNecessary(task);
×
250
      }
251
    };
252
  }
253

254
  /**
255
   * Return the underlying ScheduledExecutorService for native access.
256
   *
257
   * @return the underlying ScheduledExecutorService (never {@code null})
258
   * @throws IllegalStateException if the ThreadPoolTaskScheduler hasn't been initialized yet
259
   */
260
  public ScheduledExecutorService getScheduledExecutor() throws IllegalStateException {
261
    Assert.state(scheduledExecutor != null, "ThreadPoolTaskScheduler not initialized");
7!
262
    return scheduledExecutor;
3✔
263
  }
264

265
  /**
266
   * Return the underlying ScheduledThreadPoolExecutor, if available.
267
   *
268
   * @return the underlying ScheduledExecutorService (never {@code null})
269
   * @throws IllegalStateException if the ThreadPoolTaskScheduler hasn't been initialized yet
270
   * or if the underlying ScheduledExecutorService isn't a ScheduledThreadPoolExecutor
271
   * @see #getScheduledExecutor()
272
   */
273
  public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() throws IllegalStateException {
274
    Assert.state(scheduledExecutor instanceof ScheduledThreadPoolExecutor,
×
275
            "No ScheduledThreadPoolExecutor available");
276
    return (ScheduledThreadPoolExecutor) scheduledExecutor;
×
277
  }
278

279
  /**
280
   * Return the current pool size.
281
   * <p>Requires an underlying {@link ScheduledThreadPoolExecutor}.
282
   *
283
   * @see #getScheduledThreadPoolExecutor()
284
   * @see java.util.concurrent.ScheduledThreadPoolExecutor#getPoolSize()
285
   */
286
  public int getPoolSize() {
287
    if (scheduledExecutor == null) {
3!
288
      // Not initialized yet: assume initial pool size.
289
      return poolSize;
3✔
290
    }
291
    return getScheduledThreadPoolExecutor().getPoolSize();
×
292
  }
293

294
  /**
295
   * Return the number of currently active threads.
296
   * <p>Requires an underlying {@link ScheduledThreadPoolExecutor}.
297
   *
298
   * @see #getScheduledThreadPoolExecutor()
299
   * @see java.util.concurrent.ScheduledThreadPoolExecutor#getActiveCount()
300
   */
301
  public int getActiveCount() {
302
    if (scheduledExecutor == null) {
×
303
      // Not initialized yet: assume no active threads.
304
      return 0;
×
305
    }
306
    return getScheduledThreadPoolExecutor().getActiveCount();
×
307
  }
308

309
  /**
310
   * Return the current setting for the remove-on-cancel mode.
311
   * <p>Requires an underlying {@link ScheduledThreadPoolExecutor}.
312
   */
313
  public boolean isRemoveOnCancelPolicy() {
314
    if (scheduledExecutor == null) {
×
315
      // Not initialized yet: return our setting for the time being.
316
      return removeOnCancelPolicy;
×
317
    }
318
    return getScheduledThreadPoolExecutor().getRemoveOnCancelPolicy();
×
319
  }
320

321
  // SchedulingTaskExecutor implementation
322

323
  @Override
324
  public void execute(Runnable task) {
325
    Executor executor = getScheduledExecutor();
3✔
326
    try {
327
      executor.execute(errorHandlingTask(task, false));
6✔
328
    }
329
    catch (RejectedExecutionException ex) {
×
330
      throw new TaskRejectedException(executor, task, ex);
×
331
    }
1✔
332
  }
1✔
333

334
  @Override
335
  public Future<Void> submit(Runnable task) {
336
    ExecutorService executor = getScheduledExecutor();
3✔
337
    try {
338
      var future = Future.<Void>forFutureTask(errorHandlingTask(task, false), executor);
7✔
339
      executeAndTrack(executor, future);
4✔
340
      return future;
2✔
341
    }
342
    catch (RejectedExecutionException ex) {
×
343
      throw new TaskRejectedException(executor, task, ex);
×
344
    }
345
  }
346

347
  @Override
348
  public <T> Future<T> submit(Callable<T> task) {
349
    ExecutorService executor = getScheduledExecutor();
3✔
350
    try {
351
      var future = Future.forFutureTask(new DelegatingErrorHandlingCallable<>(task, this.errorHandler), executor);
9✔
352
      executeAndTrack(executor, future);
4✔
353
      return future;
2✔
354
    }
355
    catch (RejectedExecutionException ex) {
×
356
      throw new TaskRejectedException(executor, task, ex);
×
357
    }
358
  }
359

360
  private void executeAndTrack(ExecutorService executor, ListenableFutureTask<?> task) {
361
    var scheduledFuture = executor.submit(task);
4✔
362
    listenableFutureMap.put(scheduledFuture, task);
6✔
363
    task.onCompleted(f -> listenableFutureMap.remove(scheduledFuture));
12✔
364
  }
1✔
365

366
  @Override
367
  protected void cancelRemainingTask(Runnable task) {
368
    super.cancelRemainingTask(task);
3✔
369
    // Cancel associated user-level ListenableFuture handle as well
370
    Future<?> future = this.listenableFutureMap.get(task);
6✔
371
    if (future != null) {
2✔
372
      future.cancel(true);
4✔
373
    }
374
  }
1✔
375

376
  // TaskScheduler implementation
377

378
  @Override
379
  @Nullable
380
  public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
381
    ScheduledExecutorService executor = getScheduledExecutor();
3✔
382
    try {
383
      ErrorHandler errorHandler = this.errorHandler;
3✔
384
      if (errorHandler == null) {
2!
385
        errorHandler = TaskUtils.getDefaultErrorHandler(true);
3✔
386
      }
387
      return new ReschedulingRunnable(task, trigger, this.clock, executor, errorHandler).schedule();
11✔
388
    }
389
    catch (RejectedExecutionException ex) {
×
390
      throw new TaskRejectedException(executor, task, ex);
×
391
    }
392
  }
393

394
  @Override
395
  public ScheduledFuture<?> schedule(Runnable task, Instant startTime) {
396
    ScheduledExecutorService executor = getScheduledExecutor();
3✔
397
    Duration delay = Duration.between(this.clock.instant(), startTime);
6✔
398
    try {
399
      return executor.schedule(errorHandlingTask(task, false), NANO.convert(delay), NANO);
11✔
400
    }
401
    catch (RejectedExecutionException ex) {
×
402
      throw new TaskRejectedException(executor, task, ex);
×
403
    }
404
  }
405

406
  @Override
407
  public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Instant startTime, Duration period) {
408
    ScheduledExecutorService executor = getScheduledExecutor();
3✔
409
    Duration initialDelay = Duration.between(this.clock.instant(), startTime);
6✔
410
    try {
411
      return executor.scheduleAtFixedRate(errorHandlingTask(task, true),
9✔
412
              NANO.convert(initialDelay), NANO.convert(period), NANO);
5✔
413
    }
414
    catch (RejectedExecutionException ex) {
×
415
      throw new TaskRejectedException(executor, task, ex);
×
416
    }
417
  }
418

419
  @Override
420
  public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Duration period) {
421
    ScheduledExecutorService executor = getScheduledExecutor();
3✔
422
    try {
423
      return executor.scheduleAtFixedRate(errorHandlingTask(task, true),
10✔
424
              0, NANO.convert(period), NANO);
2✔
425
    }
426
    catch (RejectedExecutionException ex) {
×
427
      throw new TaskRejectedException(executor, task, ex);
×
428
    }
429
  }
430

431
  @Override
432
  public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Instant startTime, Duration delay) {
433
    ScheduledExecutorService executor = getScheduledExecutor();
3✔
434
    Duration initialDelay = Duration.between(this.clock.instant(), startTime);
6✔
435
    try {
436
      return executor.scheduleWithFixedDelay(errorHandlingTask(task, true),
9✔
437
              NANO.convert(initialDelay), NANO.convert(delay), NANO);
5✔
438
    }
439
    catch (RejectedExecutionException ex) {
×
440
      throw new TaskRejectedException(executor, task, ex);
×
441
    }
442
  }
443

444
  @Override
445
  public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Duration delay) {
446
    ScheduledExecutorService executor = getScheduledExecutor();
×
447
    try {
448
      return executor.scheduleWithFixedDelay(errorHandlingTask(task, true),
×
449
              0, NANO.convert(delay), NANO);
×
450
    }
451
    catch (RejectedExecutionException ex) {
×
452
      throw new TaskRejectedException(executor, task, ex);
×
453
    }
454
  }
455

456
  private <V> RunnableScheduledFuture<V> decorateTaskIfNecessary(RunnableScheduledFuture<V> future) {
457
    return taskDecorator != null ? new DelegatingRunnableScheduledFuture<>(future, taskDecorator) : future;
12✔
458
  }
459

460
  private Runnable errorHandlingTask(Runnable task, boolean isRepeatingTask) {
461
    return TaskUtils.decorateTaskWithErrorHandler(task, this.errorHandler, isRepeatingTask);
6✔
462
  }
463

464
  private static class DelegatingRunnableScheduledFuture<V> implements RunnableScheduledFuture<V> {
465

466
    private final RunnableScheduledFuture<V> future;
467

468
    private final Runnable decoratedRunnable;
469

470
    public DelegatingRunnableScheduledFuture(RunnableScheduledFuture<V> future, TaskDecorator taskDecorator) {
2✔
471
      this.future = future;
3✔
472
      this.decoratedRunnable = taskDecorator.decorate(this.future);
6✔
473
    }
1✔
474

475
    @Override
476
    public void run() {
477
      this.decoratedRunnable.run();
3✔
478
    }
1✔
479

480
    @Override
481
    public boolean cancel(boolean mayInterruptIfRunning) {
482
      return this.future.cancel(mayInterruptIfRunning);
5✔
483
    }
484

485
    @Override
486
    public boolean isCancelled() {
487
      return this.future.isCancelled();
4✔
488
    }
489

490
    @Override
491
    public boolean isDone() {
492
      return this.future.isDone();
4✔
493
    }
494

495
    @Override
496
    public V get() throws InterruptedException, ExecutionException {
497
      return this.future.get();
×
498
    }
499

500
    @Override
501
    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
502
      return this.future.get(timeout, unit);
6✔
503
    }
504

505
    @Override
506
    public boolean isPeriodic() {
507
      return this.future.isPeriodic();
×
508
    }
509

510
    @Override
511
    public long getDelay(TimeUnit unit) {
512
      return this.future.getDelay(unit);
5✔
513
    }
514

515
    @Override
516
    public int compareTo(Delayed o) {
517
      return this.future.compareTo(o);
5✔
518
    }
519

520
  }
521

522
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc