• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

TAKETODAY / today-infrastructure / 8308118446

16 Mar 2024 01:26PM UTC coverage: 78.393% (+0.02%) from 78.374%
8308118446

push

github

TAKETODAY
:white_check_mark:

63592 of 86119 branches covered (73.84%)

Branch coverage included in aggregate %.

157000 of 195273 relevant lines covered (80.4%)

3.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

49.58
today-context/src/main/java/cn/taketoday/scheduling/concurrent/ConcurrentTaskScheduler.java
1
/*
2
 * Copyright 2017 - 2024 the original author or authors.
3
 *
4
 * This program is free software: you can redistribute it and/or modify
5
 * it under the terms of the GNU General Public License as published by
6
 * the Free Software Foundation, either version 3 of the License, or
7
 * (at your option) any later version.
8
 *
9
 * This program is distributed in the hope that it will be useful,
10
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
 * GNU General Public License for more details.
13
 *
14
 * You should have received a copy of the GNU General Public License
15
 * along with this program. If not, see [https://www.gnu.org/licenses/]
16
 */
17

18
package cn.taketoday.scheduling.concurrent;
19

20
import java.time.Clock;
21
import java.time.Duration;
22
import java.time.Instant;
23
import java.util.Date;
24
import java.util.concurrent.Callable;
25
import java.util.concurrent.Executor;
26
import java.util.concurrent.Executors;
27
import java.util.concurrent.Future;
28
import java.util.concurrent.RejectedExecutionException;
29
import java.util.concurrent.ScheduledExecutorService;
30
import java.util.concurrent.ScheduledFuture;
31
import java.util.concurrent.TimeUnit;
32

33
import cn.taketoday.core.task.TaskRejectedException;
34
import cn.taketoday.lang.Assert;
35
import cn.taketoday.lang.Nullable;
36
import cn.taketoday.scheduling.SchedulingTaskExecutor;
37
import cn.taketoday.scheduling.TaskScheduler;
38
import cn.taketoday.scheduling.Trigger;
39
import cn.taketoday.scheduling.TriggerContext;
40
import cn.taketoday.scheduling.support.TaskUtils;
41
import cn.taketoday.util.ClassUtils;
42
import cn.taketoday.util.ErrorHandler;
43
import cn.taketoday.util.concurrent.ListenableFuture;
44
import jakarta.enterprise.concurrent.LastExecution;
45
import jakarta.enterprise.concurrent.ManagedScheduledExecutorService;
46

47
/**
48
 * Adapter that takes a {@code java.util.concurrent.ScheduledExecutorService} and
49
 * exposes a Framework {@link TaskScheduler} for it.
50
 * Extends {@link ConcurrentTaskExecutor} in order to implement the
51
 * {@link cn.taketoday.scheduling.SchedulingTaskExecutor} interface as well.
52
 *
53
 * <p>Autodetects a JSR-236 {@link jakarta.enterprise.concurrent.ManagedScheduledExecutorService}
54
 * in order to use it for trigger-based scheduling if possible, instead of
55
 * local trigger management which ends up delegating to regular delay-based scheduling
56
 * against the {@code java.util.concurrent.ScheduledExecutorService} API. For JSR-236 style
57
 * lookup in a Jakarta EE environment, consider using {@link DefaultManagedTaskScheduler}.
58
 *
59
 * <p>Note that there is a pre-built {@link ThreadPoolTaskScheduler} that allows for
60
 * defining a {@link java.util.concurrent.ScheduledThreadPoolExecutor} in bean style,
61
 * exposing it as a Framework {@link TaskScheduler} directly.
62
 * This is a convenient alternative to a raw ScheduledThreadPoolExecutor definition with
63
 * a separate definition of the present adapter class.
64
 *
65
 * @author Juergen Hoeller
66
 * @author Mark Fisher
67
 * @author <a href="https://github.com/TAKETODAY">Harry Yang</a>
68
 * @see ScheduledExecutorService
69
 * @see java.util.concurrent.ScheduledThreadPoolExecutor
70
 * @see Executors
71
 * @see DefaultManagedTaskScheduler
72
 * @see ThreadPoolTaskScheduler
73
 * @since 4.0
74
 */
75
public class ConcurrentTaskScheduler extends ConcurrentTaskExecutor implements TaskScheduler {
76

77
  private static final TimeUnit NANO = TimeUnit.NANOSECONDS;
2✔
78

79
  @Nullable
80
  private static final Class<?> managedScheduledExecutorServiceClass = ClassUtils.load(
5✔
81
          "jakarta.enterprise.concurrent.ManagedScheduledExecutorService",
82
          ConcurrentTaskScheduler.class.getClassLoader()
1✔
83
  );
84

85
  @Nullable
86
  private ScheduledExecutorService scheduledExecutor;
87

88
  private boolean enterpriseConcurrentScheduler = false;
3✔
89

90
  @Nullable
91
  private ErrorHandler errorHandler;
92

93
  private Clock clock = Clock.systemDefaultZone();
3✔
94

95
  /**
96
   * Create a new ConcurrentTaskScheduler,
97
   * using a single thread executor as default.
98
   *
99
   * @see java.util.concurrent.Executors#newSingleThreadScheduledExecutor()
100
   */
101
  public ConcurrentTaskScheduler() {
102
    super();
×
103
    this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
×
104
    this.enterpriseConcurrentScheduler = false;
×
105
  }
×
106

107
  /**
108
   * Create a new ConcurrentTaskScheduler, using the given
109
   * {@link java.util.concurrent.ScheduledExecutorService} as shared delegate.
110
   * <p>Autodetects a JSR-236 {@link jakarta.enterprise.concurrent.ManagedScheduledExecutorService}
111
   * in order to use it for trigger-based scheduling if possible,
112
   * instead of Infra local trigger management.
113
   *
114
   * @param scheduledExecutor the {@link java.util.concurrent.ScheduledExecutorService}
115
   * to delegate to for {@link SchedulingTaskExecutor}
116
   * as well as {@link TaskScheduler} invocations
117
   */
118
  public ConcurrentTaskScheduler(@Nullable ScheduledExecutorService scheduledExecutor) {
119
    super(scheduledExecutor);
3✔
120
    if (scheduledExecutor != null) {
2✔
121
      initScheduledExecutor(scheduledExecutor);
3✔
122
    }
123
  }
1✔
124

125
  /**
126
   * Create a new ConcurrentTaskScheduler, using the given {@link java.util.concurrent.Executor}
127
   * and {@link java.util.concurrent.ScheduledExecutorService} as delegates.
128
   * <p>Autodetects a JSR-236 {@link jakarta.enterprise.concurrent.ManagedScheduledExecutorService}
129
   * in order to use it for trigger-based scheduling if possible,
130
   * instead of Infra local trigger management.
131
   *
132
   * @param concurrentExecutor the {@link java.util.concurrent.Executor} to delegate to
133
   * for {@link SchedulingTaskExecutor} invocations
134
   * @param scheduledExecutor the {@link java.util.concurrent.ScheduledExecutorService}
135
   * to delegate to for {@link TaskScheduler} invocations
136
   */
137
  public ConcurrentTaskScheduler(Executor concurrentExecutor, ScheduledExecutorService scheduledExecutor) {
138
    super(concurrentExecutor);
×
139
    initScheduledExecutor(scheduledExecutor);
×
140
  }
×
141

142
  private void initScheduledExecutor(ScheduledExecutorService scheduledExecutor) {
143
    this.scheduledExecutor = scheduledExecutor;
3✔
144
    this.enterpriseConcurrentScheduler = (managedScheduledExecutorServiceClass != null &&
5✔
145
            managedScheduledExecutorServiceClass.isInstance(scheduledExecutor));
4!
146
  }
1✔
147

148
  /**
149
   * Specify the {@link java.util.concurrent.ScheduledExecutorService} to delegate to.
150
   * <p>Autodetects a JSR-236 {@link jakarta.enterprise.concurrent.ManagedScheduledExecutorService}
151
   * in order to use it for trigger-based scheduling if possible,
152
   * instead of Infra local trigger management.
153
   * <p>Note: This will only apply to {@link TaskScheduler} invocations.
154
   * If you want the given executor to apply to
155
   * {@link cn.taketoday.scheduling.SchedulingTaskExecutor} invocations
156
   * as well, pass the same executor reference to {@link #setConcurrentExecutor}.
157
   *
158
   * @see #setConcurrentExecutor
159
   */
160
  public void setScheduledExecutor(ScheduledExecutorService scheduledExecutor) {
161
    initScheduledExecutor(scheduledExecutor);
×
162
  }
×
163

164
  private ScheduledExecutorService getScheduledExecutor() {
165
    if (this.scheduledExecutor == null) {
3✔
166
      throw new IllegalStateException("No ScheduledExecutor is configured");
5✔
167
    }
168
    return this.scheduledExecutor;
3✔
169
  }
170

171
  /**
172
   * Provide an {@link ErrorHandler} strategy.
173
   */
174
  public void setErrorHandler(ErrorHandler errorHandler) {
175
    Assert.notNull(errorHandler, "ErrorHandler is required");
3✔
176
    this.errorHandler = errorHandler;
3✔
177
  }
1✔
178

179
  /**
180
   * Set the clock to use for scheduling purposes.
181
   * <p>The default clock is the system clock for the default time zone.
182
   *
183
   * @see Clock#systemDefaultZone()
184
   */
185
  public void setClock(Clock clock) {
186
    Assert.notNull(clock, "Clock is required");
×
187
    this.clock = clock;
×
188
  }
×
189

190
  @Override
191
  public Clock getClock() {
192
    return this.clock;
×
193
  }
194

195
  @Override
196
  public void execute(Runnable task) {
197
    super.execute(TaskUtils.decorateTaskWithErrorHandler(task, this.errorHandler, false));
7✔
198
  }
1✔
199

200
  @Override
201
  public Future<?> submit(Runnable task) {
202
    return super.submit(TaskUtils.decorateTaskWithErrorHandler(task, this.errorHandler, false));
8✔
203
  }
204

205
  @Override
206
  public <T> Future<T> submit(Callable<T> task) {
207
    return super.submit(new DelegatingErrorHandlingCallable<>(task, this.errorHandler));
9✔
208
  }
209

210
  @Override
211
  public ListenableFuture<?> submitListenable(Runnable task) {
212
    return super.submitListenable(TaskUtils.decorateTaskWithErrorHandler(task, this.errorHandler, false));
8✔
213
  }
214

215
  @Override
216
  public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
217
    return super.submitListenable(new DelegatingErrorHandlingCallable<>(task, this.errorHandler));
9✔
218
  }
219

220
  @Override
221
  @Nullable
222
  public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
223
    ScheduledExecutorService scheduleExecutorToUse = getScheduledExecutor();
3✔
224
    try {
225
      if (this.enterpriseConcurrentScheduler) {
3!
226
        return new EnterpriseConcurrentTriggerScheduler().schedule(decorateTask(task, true), trigger);
×
227
      }
228
      else {
229
        ErrorHandler errorHandler =
230
                (this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));
6!
231
        return new ReschedulingRunnable(
5✔
232
                decorateTaskIfNecessary(task), trigger, this.clock, scheduleExecutorToUse, errorHandler)
7✔
233
                .schedule();
1✔
234
      }
235
    }
236
    catch (RejectedExecutionException ex) {
×
237
      throw new TaskRejectedException(scheduleExecutorToUse, task, ex);
×
238
    }
239
  }
240

241
  @Override
242
  public ScheduledFuture<?> schedule(Runnable task, Instant startTime) {
243
    ScheduledExecutorService scheduleExecutorToUse = getScheduledExecutor();
3✔
244
    Duration delay = Duration.between(this.clock.instant(), startTime);
6✔
245
    try {
246
      return scheduleExecutorToUse.schedule(decorateTask(task, false), NANO.convert(delay), NANO);
11✔
247
    }
248
    catch (RejectedExecutionException ex) {
×
249
      throw new TaskRejectedException(scheduleExecutorToUse, task, ex);
×
250
    }
251
  }
252

253
  @Override
254
  public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Instant startTime, Duration period) {
255
    ScheduledExecutorService scheduleExecutorToUse = getScheduledExecutor();
3✔
256
    Duration initialDelay = Duration.between(this.clock.instant(), startTime);
6✔
257
    try {
258
      return scheduleExecutorToUse.scheduleAtFixedRate(decorateTask(task, true),
9✔
259
              NANO.convert(initialDelay), NANO.convert(period), NANO);
5✔
260
    }
261
    catch (RejectedExecutionException ex) {
×
262
      throw new TaskRejectedException(scheduleExecutorToUse, task, ex);
×
263
    }
264
  }
265

266
  @Override
267
  public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Duration period) {
268
    ScheduledExecutorService scheduleExecutorToUse = getScheduledExecutor();
3✔
269
    try {
270
      return scheduleExecutorToUse.scheduleAtFixedRate(decorateTask(task, true),
10✔
271
              0, NANO.convert(period), NANO);
2✔
272
    }
273
    catch (RejectedExecutionException ex) {
×
274
      throw new TaskRejectedException(scheduleExecutorToUse, task, ex);
×
275
    }
276
  }
277

278
  @Override
279
  public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Instant startTime, Duration delay) {
280
    ScheduledExecutorService scheduleExecutorToUse = getScheduledExecutor();
3✔
281
    Duration initialDelay = Duration.between(this.clock.instant(), startTime);
6✔
282
    try {
283
      return scheduleExecutorToUse.scheduleWithFixedDelay(decorateTask(task, true),
9✔
284
              NANO.convert(initialDelay), NANO.convert(delay), NANO);
5✔
285
    }
286
    catch (RejectedExecutionException ex) {
×
287
      throw new TaskRejectedException(scheduleExecutorToUse, task, ex);
×
288
    }
289
  }
290

291
  @Override
292
  public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Duration delay) {
293
    ScheduledExecutorService scheduleExecutorToUse = getScheduledExecutor();
×
294
    try {
295
      return scheduleExecutorToUse.scheduleWithFixedDelay(decorateTask(task, true),
×
296
              0, NANO.convert(delay), NANO);
×
297
    }
298
    catch (RejectedExecutionException ex) {
×
299
      throw new TaskRejectedException(scheduleExecutorToUse, task, ex);
×
300
    }
301
  }
302

303
  private Runnable decorateTask(Runnable task, boolean isRepeatingTask) {
304
    Runnable result = TaskUtils.decorateTaskWithErrorHandler(task, this.errorHandler, isRepeatingTask);
6✔
305
    result = decorateTaskIfNecessary(result);
4✔
306
    if (this.enterpriseConcurrentScheduler) {
3!
307
      result = ManagedTaskBuilder.buildManagedTask(result, task.toString());
×
308
    }
309
    return result;
2✔
310
  }
311

312
  /**
313
   * Delegate that adapts an Infra Trigger to a JSR-236 Trigger.
314
   * Separated into an inner class in order to avoid a hard dependency on the JSR-236 API.
315
   */
316
  private class EnterpriseConcurrentTriggerScheduler {
×
317

318
    public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
319
      ManagedScheduledExecutorService executor = (ManagedScheduledExecutorService) getScheduledExecutor();
×
320
      return executor.schedule(task, new TriggerAdapter(trigger));
×
321
    }
322

323
    private static class TriggerAdapter implements jakarta.enterprise.concurrent.Trigger {
324

325
      private final Trigger adaptee;
326

327
      public TriggerAdapter(Trigger adaptee) {
×
328
        this.adaptee = adaptee;
×
329
      }
×
330

331
      @Override
332
      @Nullable
333
      public Date getNextRunTime(@Nullable LastExecution le, Date taskScheduledTime) {
334
        Instant instant = this.adaptee.nextExecution(new LastExecutionAdapter(le));
×
335
        return (instant != null ? Date.from(instant) : null);
×
336
      }
337

338
      @Override
339
      public boolean skipRun(LastExecution lastExecutionInfo, Date scheduledRunTime) {
340
        return false;
×
341
      }
342

343
      private static class LastExecutionAdapter implements TriggerContext {
344

345
        @Nullable
346
        private final LastExecution le;
347

348
        public LastExecutionAdapter(@Nullable LastExecution le) {
×
349
          this.le = le;
×
350
        }
×
351

352
        @Override
353
        public Instant lastScheduledExecution() {
354
          return this.le != null ? toInstant(this.le.getScheduledStart()) : null;
×
355
        }
356

357
        @Override
358
        public Instant lastActualExecution() {
359
          return (this.le != null ? toInstant(this.le.getRunStart()) : null);
×
360
        }
361

362
        @Override
363
        public Instant lastCompletion() {
364
          return (this.le != null ? toInstant(this.le.getRunEnd()) : null);
×
365
        }
366

367
        @Nullable
368
        private static Instant toInstant(@Nullable Date date) {
369
          return (date != null ? date.toInstant() : null);
×
370
        }
371
      }
372
    }
373
  }
374

375
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc