• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

TAKETODAY / today-infrastructure / 8308118446

16 Mar 2024 01:26PM UTC coverage: 78.393% (+0.02%) from 78.374%
8308118446

push

github

TAKETODAY
:white_check_mark:

63592 of 86119 branches covered (73.84%)

Branch coverage included in aggregate %.

157000 of 195273 relevant lines covered (80.4%)

3.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

45.28
today-context/src/main/java/cn/taketoday/scheduling/concurrent/SimpleAsyncTaskScheduler.java
1
/*
2
 * Copyright 2017 - 2024 the original author or authors.
3
 *
4
 * This program is free software: you can redistribute it and/or modify
5
 * it under the terms of the GNU General Public License as published by
6
 * the Free Software Foundation, either version 3 of the License, or
7
 * (at your option) any later version.
8
 *
9
 * This program is distributed in the hope that it will be useful,
10
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
 * GNU General Public License for more details.
13
 *
14
 * You should have received a copy of the GNU General Public License
15
 * along with this program. If not, see [https://www.gnu.org/licenses/]
16
 */
17

18
package cn.taketoday.scheduling.concurrent;
19

20
import java.time.Clock;
21
import java.time.Duration;
22
import java.time.Instant;
23
import java.util.concurrent.Callable;
24
import java.util.concurrent.Executor;
25
import java.util.concurrent.Future;
26
import java.util.concurrent.RejectedExecutionException;
27
import java.util.concurrent.ScheduledExecutorService;
28
import java.util.concurrent.ScheduledFuture;
29
import java.util.concurrent.ScheduledThreadPoolExecutor;
30
import java.util.concurrent.TimeUnit;
31

32
import cn.taketoday.context.ApplicationContext;
33
import cn.taketoday.context.ApplicationContextAware;
34
import cn.taketoday.context.ApplicationListener;
35
import cn.taketoday.context.SmartLifecycle;
36
import cn.taketoday.context.event.ContextClosedEvent;
37
import cn.taketoday.core.task.SimpleAsyncTaskExecutor;
38
import cn.taketoday.core.task.TaskRejectedException;
39
import cn.taketoday.lang.Assert;
40
import cn.taketoday.lang.Nullable;
41
import cn.taketoday.logging.LoggerFactory;
42
import cn.taketoday.scheduling.TaskScheduler;
43
import cn.taketoday.scheduling.Trigger;
44
import cn.taketoday.scheduling.support.DelegatingErrorHandlingRunnable;
45
import cn.taketoday.scheduling.support.TaskUtils;
46
import cn.taketoday.util.ErrorHandler;
47
import cn.taketoday.util.concurrent.ListenableFuture;
48

49
/**
50
 * A simple implementation of Infra {@link TaskScheduler} interface, using
51
 * a single scheduler thread and executing every scheduled task in an individual
52
 * separate thread. This is an attractive choice with virtual threads on JDK 21,
53
 * expecting common usage with {@link #setVirtualThreads setVirtualThreads(true)}.
54
 *
55
 * <p><b>NOTE: Scheduling with a fixed delay enforces execution on the single
56
 * scheduler thread, in order to provide traditional fixed-delay semantics!</b>
57
 * Prefer the use of fixed rates or cron triggers instead which are a better fit
58
 * with this thread-per-task scheduler variant.
59
 *
60
 * <p>Supports a graceful shutdown through {@link #setTaskTerminationTimeout},
61
 * at the expense of task tracking overhead per execution thread at runtime.
62
 * Supports limiting concurrent threads through {@link #setConcurrencyLimit}.
63
 * By default, the number of concurrent task executions is unlimited.
64
 * This allows for dynamic concurrency of scheduled task executions, in contrast
65
 * to {@link ThreadPoolTaskScheduler} which requires a fixed pool size.
66
 *
67
 * <p><b>NOTE: This implementation does not reuse threads!</b> Consider a
68
 * thread-pooling TaskScheduler implementation instead, in particular for
69
 * scheduling a large number of short-lived tasks. Alternatively, on JDK 21,
70
 * consider setting {@link #setVirtualThreads} to {@code true}.
71
 *
72
 * <p>Extends {@link SimpleAsyncTaskExecutor} and can serve as a fully capable
73
 * replacement for it, e.g. as a single shared instance serving as a
74
 * {@link cn.taketoday.core.task.TaskExecutor} as well as a {@link TaskScheduler}.
75
 * This is generally not the case with other executor/scheduler implementations
76
 * which tend to have specific constraints for the scheduler thread pool,
77
 * requiring a separate thread pool for general executor purposes in practice.
78
 *
79
 * <p>As an alternative to the built-in thread-per-task capability, this scheduler
80
 * can also be configured with a separate target executor for scheduled task
81
 * execution through {@link #setTargetTaskExecutor}: e.g. pointing to a shared
82
 * {@link ThreadPoolTaskExecutor} bean. This is still rather different from a
83
 * {@link ThreadPoolTaskScheduler} setup since it always uses a single scheduler
84
 * thread while dynamically dispatching to the target thread pool which may have
85
 * a dynamic core/max pool size range, participating in a shared concurrency limit.
86
 *
87
 * @author Juergen Hoeller
88
 * @author <a href="https://github.com/TAKETODAY">Harry Yang</a>
89
 * @see #setVirtualThreads
90
 * @see #setTaskTerminationTimeout
91
 * @see #setConcurrencyLimit
92
 * @see SimpleAsyncTaskExecutor
93
 * @see ThreadPoolTaskScheduler
94
 * @since 4.0
95
 */
96
@SuppressWarnings("serial")
97
public class SimpleAsyncTaskScheduler extends SimpleAsyncTaskExecutor implements TaskScheduler,
2✔
98
        ApplicationContextAware, SmartLifecycle, ApplicationListener<ContextClosedEvent> {
99

100
  /**
101
   * The default phase for an executor {@link SmartLifecycle}: {@code Integer.MAX_VALUE / 2}.
102
   *
103
   * @see #getPhase()
104
   * @see ExecutorConfigurationSupport#DEFAULT_PHASE
105
   */
106
  public static final int DEFAULT_PHASE = ExecutorConfigurationSupport.DEFAULT_PHASE;
107

108
  private static final TimeUnit NANO = TimeUnit.NANOSECONDS;
3✔
109

110
  private final ScheduledExecutorService scheduledExecutor = createScheduledExecutor();
4✔
111

112
  private final ExecutorLifecycleDelegate lifecycleDelegate = new ExecutorLifecycleDelegate(this.scheduledExecutor);
7✔
113

114
  @Nullable
115
  private ErrorHandler errorHandler;
116

117
  private Clock clock = Clock.systemDefaultZone();
3✔
118

119
  private int phase = DEFAULT_PHASE;
4✔
120

121
  @Nullable
122
  private Executor targetTaskExecutor;
123

124
  @Nullable
125
  private ApplicationContext applicationContext;
126

127
  /**
128
   * Provide an {@link ErrorHandler} strategy.
129
   */
130
  public void setErrorHandler(ErrorHandler errorHandler) {
131
    Assert.notNull(errorHandler, "ErrorHandler is required");
3✔
132
    this.errorHandler = errorHandler;
3✔
133
  }
1✔
134

135
  /**
136
   * Set the clock to use for scheduling purposes.
137
   * <p>The default clock is the system clock for the default time zone.
138
   *
139
   * @see Clock#systemDefaultZone()
140
   */
141
  public void setClock(Clock clock) {
142
    Assert.notNull(clock, "Clock is required");
×
143
    this.clock = clock;
×
144
  }
×
145

146
  @Override
147
  public Clock getClock() {
148
    return this.clock;
×
149
  }
150

151
  /**
152
   * Specify the lifecycle phase for pausing and resuming this executor.
153
   * The default is {@link #DEFAULT_PHASE}.
154
   *
155
   * @see SmartLifecycle#getPhase()
156
   */
157
  public void setPhase(int phase) {
158
    this.phase = phase;
×
159
  }
×
160

161
  /**
162
   * Return the lifecycle phase for pausing and resuming this executor.
163
   *
164
   * @see #setPhase
165
   */
166
  @Override
167
  public int getPhase() {
168
    return this.phase;
3✔
169
  }
170

171
  /**
172
   * Specify a custom target {@link Executor} to delegate to for
173
   * the individual execution of scheduled tasks. This can for example
174
   * be set to a separate thread pool for executing scheduled tasks,
175
   * whereas this scheduler keeps using its single scheduler thread.
176
   * <p>If not set, the regular {@link SimpleAsyncTaskExecutor}
177
   * arrangements kicks in with a new thread per task.
178
   */
179
  public void setTargetTaskExecutor(Executor targetTaskExecutor) {
180
    this.targetTaskExecutor = (targetTaskExecutor == this ? null : targetTaskExecutor);
×
181
  }
×
182

183
  @Override
184
  public void setApplicationContext(ApplicationContext applicationContext) {
185
    this.applicationContext = applicationContext;
3✔
186
  }
1✔
187

188
  private ScheduledExecutorService createScheduledExecutor() {
189
    return new ScheduledThreadPoolExecutor(1, this::newThread) {
20✔
190
      @Override
191
      protected void beforeExecute(Thread thread, Runnable task) {
192
        lifecycleDelegate.beforeExecute(thread);
5✔
193
      }
1✔
194

195
      @Override
196
      protected void afterExecute(Runnable task, Throwable ex) {
197
        lifecycleDelegate.afterExecute();
4✔
198
      }
1✔
199
    };
200
  }
201

202
  @Override
203
  protected void doExecute(Runnable task) {
204
    if (this.targetTaskExecutor != null) {
3!
205
      this.targetTaskExecutor.execute(task);
×
206
    }
207
    else {
208
      super.doExecute(task);
3✔
209
    }
210
  }
1✔
211

212
  private Runnable taskOnSchedulerThread(Runnable task) {
213
    return new DelegatingErrorHandlingRunnable(task,
×
214
            (this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true)));
×
215
  }
216

217
  private Runnable scheduledTask(Runnable task) {
218
    return () -> execute(new DelegatingErrorHandlingRunnable(task, this::shutdownAwareErrorHandler));
13✔
219
  }
220

221
  private void shutdownAwareErrorHandler(Throwable ex) {
222
    if (this.errorHandler != null) {
3!
223
      this.errorHandler.handleError(ex);
5✔
224
    }
225
    else if (this.scheduledExecutor.isTerminated()) {
×
226
      LoggerFactory.getLogger(getClass()).debug("Ignoring scheduled task exception after shutdown", ex);
×
227
    }
228
    else {
229
      TaskUtils.getDefaultErrorHandler(true).handleError(ex);
×
230
    }
231
  }
1✔
232

233
  @Override
234
  public void execute(Runnable task) {
235
    super.execute(TaskUtils.decorateTaskWithErrorHandler(task, this.errorHandler, false));
7✔
236
  }
1✔
237

238
  @Override
239
  public Future<?> submit(Runnable task) {
240
    return super.submit(TaskUtils.decorateTaskWithErrorHandler(task, this.errorHandler, false));
8✔
241
  }
242

243
  @Override
244
  public <T> Future<T> submit(Callable<T> task) {
245
    return super.submit(new DelegatingErrorHandlingCallable<>(task, this.errorHandler));
9✔
246
  }
247

248
  @Override
249
  public ListenableFuture<?> submitListenable(Runnable task) {
250
    return super.submitListenable(TaskUtils.decorateTaskWithErrorHandler(task, this.errorHandler, false));
8✔
251
  }
252

253
  @Override
254
  public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
255
    return super.submitListenable(new DelegatingErrorHandlingCallable<>(task, this.errorHandler));
9✔
256
  }
257

258
  @Override
259
  @Nullable
260
  public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
261
    try {
262
      Runnable delegate = scheduledTask(task);
4✔
263
      ErrorHandler errorHandler =
264
              (this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));
6!
265
      return new ReschedulingRunnable(
11✔
266
              delegate, trigger, this.clock, this.scheduledExecutor, errorHandler).schedule();
1✔
267
    }
268
    catch (RejectedExecutionException ex) {
×
269
      throw new TaskRejectedException(this.scheduledExecutor, task, ex);
×
270
    }
271
  }
272

273
  @Override
274
  public ScheduledFuture<?> schedule(Runnable task, Instant startTime) {
275
    Duration delay = Duration.between(this.clock.instant(), startTime);
6✔
276
    try {
277
      return this.scheduledExecutor.schedule(scheduledTask(task), NANO.convert(delay), NANO);
11✔
278
    }
279
    catch (RejectedExecutionException ex) {
×
280
      throw new TaskRejectedException(this.scheduledExecutor, task, ex);
×
281
    }
282
  }
283

284
  @Override
285
  public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Instant startTime, Duration period) {
286
    Duration initialDelay = Duration.between(this.clock.instant(), startTime);
×
287
    try {
288
      return this.scheduledExecutor.scheduleAtFixedRate(scheduledTask(task),
×
289
              NANO.convert(initialDelay), NANO.convert(period), NANO);
×
290
    }
291
    catch (RejectedExecutionException ex) {
×
292
      throw new TaskRejectedException(this.scheduledExecutor, task, ex);
×
293
    }
294
  }
295

296
  @Override
297
  public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Duration period) {
298
    try {
299
      return this.scheduledExecutor.scheduleAtFixedRate(scheduledTask(task),
×
300
              0, NANO.convert(period), NANO);
×
301
    }
302
    catch (RejectedExecutionException ex) {
×
303
      throw new TaskRejectedException(this.scheduledExecutor, task, ex);
×
304
    }
305
  }
306

307
  @Override
308
  public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Instant startTime, Duration delay) {
309
    Duration initialDelay = Duration.between(this.clock.instant(), startTime);
×
310
    try {
311
      // Blocking task on scheduler thread for fixed delay semantics
312
      return this.scheduledExecutor.scheduleWithFixedDelay(taskOnSchedulerThread(task),
×
313
              NANO.convert(initialDelay), NANO.convert(delay), NANO);
×
314
    }
315
    catch (RejectedExecutionException ex) {
×
316
      throw new TaskRejectedException(this.scheduledExecutor, task, ex);
×
317
    }
318
  }
319

320
  @Override
321
  public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Duration delay) {
322
    try {
323
      // Blocking task on scheduler thread for fixed delay semantics
324
      return this.scheduledExecutor.scheduleWithFixedDelay(taskOnSchedulerThread(task),
×
325
              0, NANO.convert(delay), NANO);
×
326
    }
327
    catch (RejectedExecutionException ex) {
×
328
      throw new TaskRejectedException(this.scheduledExecutor, task, ex);
×
329
    }
330
  }
331

332
  @Override
333
  public void start() {
334
    this.lifecycleDelegate.start();
×
335
  }
×
336

337
  @Override
338
  public void stop() {
339
    this.lifecycleDelegate.stop();
×
340
  }
×
341

342
  @Override
343
  public void stop(Runnable callback) {
344
    this.lifecycleDelegate.stop(callback);
×
345
  }
×
346

347
  @Override
348
  public boolean isRunning() {
349
    return this.lifecycleDelegate.isRunning();
4✔
350
  }
351

352
  @Override
353
  public void onApplicationEvent(ContextClosedEvent event) {
354
    if (event.getApplicationContext() == this.applicationContext) {
5!
355
      this.scheduledExecutor.shutdown();
3✔
356
    }
357
  }
1✔
358

359
  @Override
360
  public void close() {
361
    for (Runnable remainingTask : this.scheduledExecutor.shutdownNow()) {
8!
362
      if (remainingTask instanceof Future<?> future) {
×
363
        future.cancel(true);
×
364
      }
365
    }
×
366
    super.close();
2✔
367
  }
1✔
368

369
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc