• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

orisai / scheduler / 24056471189

06 Apr 2026 11:33PM UTC coverage: 97.997% (-1.6%) from 99.561%
24056471189

push

github

mabar
Maintenance mode

308 of 318 new or added lines in 15 files covered. (96.86%)

33 existing lines in 4 files now uncovered.

2593 of 2646 relevant lines covered (98.0%)

61.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.49
/src/ManagedScheduler.php
1
<?php declare(strict_types = 1);
2

3
namespace Orisai\Scheduler;
4

5
use Closure;
6
use DateTimeImmutable;
7
use Generator;
8
use LogicException;
9
use Orisai\Clock\Adapter\ClockAdapterFactory;
10
use Orisai\Clock\Clock;
11
use Orisai\Clock\SystemClock;
12
use Orisai\Exceptions\Logic\InvalidArgument;
13
use Orisai\Exceptions\Message;
14
use Orisai\Scheduler\Exception\JobFailure;
15
use Orisai\Scheduler\Executor\BasicJobExecutor;
16
use Orisai\Scheduler\Executor\JobExecutor;
17
use Orisai\Scheduler\Executor\ShutdownCheck;
18
use Orisai\Scheduler\Job\JobLock;
19
use Orisai\Scheduler\Job\JobSchedule;
20
use Orisai\Scheduler\Maintenance\MaintenanceManager;
21
use Orisai\Scheduler\Manager\JobManager;
22
use Orisai\Scheduler\Status\JobInfo;
23
use Orisai\Scheduler\Status\JobResult;
24
use Orisai\Scheduler\Status\JobResultState;
25
use Orisai\Scheduler\Status\JobSummary;
26
use Orisai\Scheduler\Status\PlannedJobInfo;
27
use Orisai\Scheduler\Status\RunInfo;
28
use Orisai\Scheduler\Status\RunParameters;
29
use Orisai\Scheduler\Status\RunSummary;
30
use Psr\Clock\ClockInterface;
31
use Psr\Log\LoggerInterface;
32
use Psr\Log\NullLogger;
33
use Symfony\Component\Lock\LockFactory;
34
use Symfony\Component\Lock\Store\InMemoryStore;
35
use Throwable;
36
use function bin2hex;
37
use function iterator_to_array;
38
use function random_bytes;
39
use function time;
40

41
class ManagedScheduler implements Scheduler
42
{
43

44
        private JobManager $jobManager;
45

46
        /** @var Closure(Throwable, JobInfo, JobResult): (void)|null */
47
        private ?Closure $errorHandler;
48

49
        private LockFactory $lockFactory;
50

51
        private JobExecutor $executor;
52

53
        private Clock $clock;
54

55
        private LoggerInterface $logger;
56

57
        private ?MaintenanceManager $maintenanceManager;
58

59
        /** @var list<Closure(JobInfo, JobResult): void> */
60
        private array $lockedJobCallbacks = [];
61

62
        /** @var list<Closure(JobInfo): void> */
63
        private array $beforeJobCallbacks = [];
64

65
        /** @var list<Closure(JobInfo, JobResult): void> */
66
        private array $afterJobCallbacks = [];
67

68
        /** @var list<Closure(RunInfo): void> */
69
        private array $beforeRunCallbacks = [];
70

71
        /** @var list<Closure(RunSummary): void> */
72
        private array $afterRunCallbacks = [];
73

74
        /**
         * Wires up the scheduler, filling in a default for every optional collaborator left out.
         *
         * Defaults used: a lock factory over an in-memory store, the system clock, a null logger and
         * a BasicJobExecutor whose callback runs jobs through this scheduler as non-forced runs.
         *
         * @param Closure(Throwable, JobInfo, JobResult): (void)|null $errorHandler
         * @param-later-invoked-callable $errorHandler
         * @throws LogicException When a maintenance manager is given but the executor does not support job shutdown.
         */
        public function __construct(
                JobManager $jobManager,
                ?Closure $errorHandler = null,
                ?LockFactory $lockFactory = null,
                ?JobExecutor $executor = null,
                ?ClockInterface $clock = null,
                ?LoggerInterface $logger = null,
                ?MaintenanceManager $maintenanceManager = null
        )
        {
                $this->jobManager = $jobManager;
                $this->errorHandler = $errorHandler;

                if ($lockFactory === null) {
                        $lockFactory = new LockFactory(new InMemoryStore());
                }

                $this->lockFactory = $lockFactory;

                $this->clock = ClockAdapterFactory::create($clock ?? new SystemClock());

                if ($logger === null) {
                        $logger = new NullLogger();
                }

                $this->logger = $logger;

                if ($executor === null) {
                        // Callback the default executor uses to run one job at a given second
                        $runJob = function ($id, JobSchedule $jobSchedule, int $runSecond): array {
                                $parameters = new RunParameters($runSecond, false);

                                return $this->runInternal($id, $jobSchedule, $parameters);
                        };

                        $executor = new BasicJobExecutor($this->clock, $runJob);
                }

                $this->executor = $executor;

                if ($maintenanceManager !== null) {
                        // Maintenance has to be able to stop jobs that are already running
                        if (!$this->executor->supportsJobShutdown()) {
                                throw new LogicException(
                                        'MaintenanceManager requires an executor that supports job shutdown (e.g. ProcessJobExecutor).'
                                        . ' BasicJobExecutor cannot terminate running jobs.',
                                );
                        }
                }

                $this->maintenanceManager = $maintenanceManager;
        }
112

113
        /**
         * Returns the job schedules exactly as the job manager reports them.
         */
        public function getJobSchedules(): array
        {
                $schedules = $this->jobManager->getJobSchedules();

                return $schedules;
        }
117

118
        /**
         * Runs one registered job and returns its summary.
         *
         * When $force is false the job only runs if its expression is due at the current clock time
         * (converted to the schedule's time zone when it has one); otherwise null is returned.
         * When no $parameters are given, they default to second 0 with the given $force flag.
         *
         * @throws InvalidArgument When no job is registered under the given ID.
         * @throws JobFailure When the internal run hands back a throwable.
         */
        public function runJob($id, bool $force = true, ?RunParameters $parameters = null): ?JobSummary
        {
                $jobSchedule = $this->jobManager->getJobSchedule($id);

                if ($parameters === null) {
                        $parameters = new RunParameters(0, $force);
                }

                if ($jobSchedule === null) {
                        $tip = "Inspect keys in 'Scheduler->getJobSchedules()' or run command 'scheduler:list' to find correct job ID.";
                        $message = Message::create()
                                ->withContext("Running job with ID '$id'")
                                ->withProblem('Job is not registered by scheduler.')
                                ->with('Tip', $tip);

                        throw InvalidArgument::create()->withMessage($message);
                }

                $expression = $jobSchedule->getExpression();
                $checkedAt = $this->getCurrentTime($jobSchedule);

                // Intentionally ignores repeat after seconds
                if (!$force) {
                        if (!$expression->isDue($checkedAt)) {
                                return null;
                        }
                }

                [$summary, $throwable] = $this->runInternal($id, $jobSchedule, $parameters);

                if ($throwable === null) {
                        return $summary;
                }

                throw JobFailure::create($summary, $throwable);
        }
156

157
        /**
         * Buckets schedules by the second of the minute at which they should be started.
         *
         * A schedule whose repeat interval is 0 is placed in second 0 only; any other schedule is
         * placed at 0, N, 2N, ... for every multiple of its interval N not exceeding 59.
         *
         * @param array<int|string, JobSchedule> $jobSchedules
         * @return array<int, array<int|string, JobSchedule>>
         */
        private function groupJobSchedulesBySecond(array $jobSchedules): array
        {
                $bySecond = [];

                foreach ($jobSchedules as $id => $jobSchedule) {
                        $interval = $jobSchedule->getRepeatAfterSeconds();

                        if ($interval === 0) {
                                $bySecond[0][$id] = $jobSchedule;

                                continue;
                        }

                        $second = 0;
                        while ($second <= 59) {
                                $bySecond[$second][$id] = $jobSchedule;
                                $second += $interval;
                        }
                }

                return $bySecond;
        }
178

179
        /**
         * Exposes the maintenance manager given at construction, or null when none was configured.
         */
        public function getMaintenanceManager(): ?MaintenanceManager
        {
                $manager = $this->maintenanceManager;

                return $manager;
        }
183

184
        /**
         * Runs every job whose expression is due at the current clock time.
         *
         * Yields whatever the underlying generator yields and returns its return value.
         * With a maintenance manager configured, the run is registered under a generated ID and
         * deregistered again when the generator finishes (normally or not). If maintenance is
         * active at the start, no job is executed and the maintenance summary is produced instead.
         */
        public function runPromise(): Generator
        {
                $manager = $this->maintenanceManager;
                $runStart = $this->clock->now();
                $runId = time() . '-' . bin2hex(random_bytes(3));

                if ($manager !== null) {
                        $manager->registerRun($runId);
                }

                try {
                        $dueSchedules = [];
                        foreach ($this->jobManager->getJobSchedules() as $id => $schedule) {
                                $timeZone = $schedule->getTimeZone();
                                $checkedAt = $timeZone === null
                                        ? $runStart
                                        : $runStart->setTimezone($timeZone);

                                if (!$schedule->getExpression()->isDue($checkedAt)) {
                                        continue;
                                }

                                $dueSchedules[$id] = $schedule;
                        }

                        // Maintenance is checked once, before any job gets started
                        if ($manager !== null && $manager->isMaintenance()) {
                                $generator = $this->createMaintenanceRunSummary($runStart, $dueSchedules);
                        } else {
                                $shutdownCheck = $this->createShutdownCheck();

                                $generator = $this->executor->runJobs(
                                        $this->groupJobSchedulesBySecond($dueSchedules),
                                        $runStart,
                                        $this->getBeforeRunCallback($runStart, $dueSchedules),
                                        $this->getAfterRunCallback(),
                                        $shutdownCheck,
                                );
                        }

                        // Delegates all yields and passes the inner generator's return value through
                        return yield from $generator;
                } finally {
                        if ($manager !== null) {
                                $manager->deregisterRun($runId);
                        }
                }
        }
234

235
        /**
         * Builds a ShutdownCheck backed by the maintenance manager.
         *
         * The check's callback reports true once the manager has a shutdown request or is in
         * maintenance; the manager's grace period is passed along with it. Returns null when the
         * executor does not support job shutdown or no maintenance manager is configured.
         */
        private function createShutdownCheck(): ?ShutdownCheck
        {
                $canShutDown = $this->executor->supportsJobShutdown();
                $manager = $this->maintenanceManager;

                if (!$canShutDown || $manager === null) {
                        return null;
                }

                $shouldStop = static function () use ($manager): bool {
                        return $manager->isShutdownRequested() || $manager->isMaintenance();
                };

                return new ShutdownCheck($shouldStop, $manager->getGracePeriodSeconds());
        }
248

249
        /**
         * Produces the outcome of a run that was skipped because of maintenance.
         *
         * No job is executed. For each given schedule a summary with the maintenance result state is
         * yielded, using the run start (in the schedule's time zone, if any) as both start and end.
         * After-run callbacks are then invoked with the final run summary, which is also returned.
         *
         * @param array<int|string, JobSchedule> $jobSchedules
         * @return Generator<int, JobSummary, void, RunSummary>
         */
        private function createMaintenanceRunSummary(DateTimeImmutable $runStart, array $jobSchedules): Generator
        {
                $skipped = [];

                foreach ($jobSchedules as $id => $jobSchedule) {
                        $expression = $jobSchedule->getExpression();
                        $timezone = $jobSchedule->getTimeZone();
                        $moment = $timezone === null
                                ? $runStart
                                : $runStart->setTimezone($timezone);

                        $info = new JobInfo(
                                $id,
                                $jobSchedule->getJob()->getName(),
                                $expression->getExpression(),
                                $jobSchedule->getRepeatAfterSeconds(),
                                0,
                                $moment,
                                $timezone,
                                false,
                        );
                        $result = new JobResult($expression, $moment, JobResultState::maintenance());

                        $jobSummary = new JobSummary($info, $result);
                        $skipped[] = $jobSummary;

                        yield $jobSummary;
                }

                $runSummary = new RunSummary($runStart, $this->clock->now(), $skipped, true);

                foreach ($this->afterRunCallbacks as $afterRun) {
                        $afterRun($runSummary);
                }

                return $runSummary;
        }
291

292
        /**
         * Creates the callback to be invoked before a run starts.
         *
         * When invoked, the callback does nothing if no before-run callbacks are registered at that
         * moment; otherwise it describes every given schedule as a planned job and passes the
         * resulting run info to each registered callback.
         *
         * @param array<int|string, JobSchedule> $jobSchedules
         * @return Closure(): void
         */
        private function getBeforeRunCallback(DateTimeImmutable $runStart, array $jobSchedules): Closure
        {
                $notify = function () use ($runStart, $jobSchedules): void {
                        // Skip building the run info when nobody is listening
                        if ($this->beforeRunCallbacks === []) {
                                return;
                        }

                        $planned = [];
                        foreach ($jobSchedules as $id => $jobSchedule) {
                                $timezone = $jobSchedule->getTimeZone();
                                $plannedStart = $timezone === null
                                        ? $runStart
                                        : $runStart->setTimezone($timezone);

                                $planned[] = new PlannedJobInfo(
                                        $id,
                                        $jobSchedule->getJob()->getName(),
                                        $jobSchedule->getExpression()->getExpression(),
                                        $jobSchedule->getRepeatAfterSeconds(),
                                        $plannedStart,
                                        $timezone,
                                );
                        }

                        $runInfo = new RunInfo($runStart, $planned);

                        foreach ($this->beforeRunCallbacks as $beforeRun) {
                                $beforeRun($runInfo);
                        }
                };

                return $notify;
        }
327

328
        /**
         * Creates the callback that passes a finished run's summary to every after-run callback
         * registered at the time it is invoked.
         *
         * @return Closure(RunSummary): void
         */
        private function getAfterRunCallback(): Closure
        {
                $notify = function (RunSummary $runSummary): void {
                        foreach ($this->afterRunCallbacks as $afterRun) {
                                $afterRun($runSummary);
                        }
                };

                return $notify;
        }
339

340
        /**
         * Executes a complete run synchronously and returns its summary.
         */
        public function run(): RunSummary
        {
                $promise = $this->runPromise();

                // Draining the generator is what makes the run actually execute
                iterator_to_array($promise);

                $summary = $promise->getReturn();

                return $summary;
        }
348

349
        /**
         * Runs a single job under its lock and reports what happened.
         *
         * If the lock cannot be acquired, the job is not run: locked-job callbacks are notified and
         * a summary with the lock result state is returned. Otherwise before-job callbacks, the job
         * itself and after-job callbacks run in that order, and the lock is released afterwards
         * even if a callback throws.
         *
         * The second element of the returned pair is the throwable raised by the job, or null when
         * the job succeeded, the lock was not acquired, or an error handler took the failure over.
         *
         * @param string|int  $id
         * @return array{JobSummary, Throwable|null}
         */
        private function runInternal($id, JobSchedule $jobSchedule, RunParameters $runParameters): array
        {
                $job = $jobSchedule->getJob();
                $expression = $jobSchedule->getExpression();

                $info = new JobInfo(
                        $id,
                        $job->getName(),
                        $expression->getExpression(),
                        $jobSchedule->getRepeatAfterSeconds(),
                        $runParameters->getSecond(),
                        $this->getCurrentTime($jobSchedule),
                        $jobSchedule->getTimeZone(),
                        $runParameters->isForcedRun(),
                );

                $lock = $this->lockFactory->createLock("Orisai.Scheduler.Job/$id");

                if (!$lock->acquire()) {
                        // The lock is already held - report it instead of running the job
                        $lockedResult = new JobResult($expression, $info->getStart(), JobResultState::lock());

                        foreach ($this->lockedJobCallbacks as $onLocked) {
                                $onLocked($info, $lockedResult);
                        }

                        return [new JobSummary($info, $lockedResult), null];
                }

                $failure = null;
                try {
                        foreach ($this->beforeJobCallbacks as $beforeJob) {
                                $beforeJob($info);
                        }

                        try {
                                $job->run(new JobLock($lock));
                        } catch (Throwable $caught) {
                                // Kept for later so that the result and callbacks are still produced
                                $failure = $caught;
                        }

                        if ($lock->isExpired()) {
                                $this->logger->warning(
                                        "Lock of job '$id' expired before the job finished.",
                                        ['id' => $id],
                                );
                        }

                        $end = $this->getCurrentTime($jobSchedule);
                        $state = $failure === null
                                ? JobResultState::done()
                                : JobResultState::fail();
                        $result = new JobResult($expression, $end, $state);

                        foreach ($this->afterJobCallbacks as $afterJob) {
                                $afterJob($info, $result);
                        }

                        if ($failure !== null && $this->errorHandler !== null) {
                                // The handler takes the failure over, so it is not handed back to the caller
                                ($this->errorHandler)($failure, $info, $result);
                                $failure = null;
                        }
                } finally {
                        $lock->release();
                }

                return [new JobSummary($info, $result), $failure];
        }
425

426
        /**
         * Reads the clock and, when the schedule declares a time zone, converts the result to it.
         */
        private function getCurrentTime(JobSchedule $schedule): DateTimeImmutable
        {
                $current = $this->clock->now();
                $zone = $schedule->getTimeZone();

                if ($zone === null) {
                        return $current;
                }

                return $current->setTimezone($zone);
        }
435

436
        /**
         * Registers a callback invoked when a job is not run because its lock could not be acquired.
         *
         * @param Closure(JobInfo, JobResult): void $callback
         * @param-later-invoked-callable $callback
         */
        public function addLockedJobCallback(Closure $callback): void
        {
                $this->lockedJobCallbacks = [...$this->lockedJobCallbacks, $callback];
        }
444

445
        /**
         * Registers a callback invoked right before a job is run.
         *
         * @param Closure(JobInfo): void $callback
         * @param-later-invoked-callable $callback
         */
        public function addBeforeJobCallback(Closure $callback): void
        {
                $this->beforeJobCallbacks = [...$this->beforeJobCallbacks, $callback];
        }
453

454
        /**
         * Registers a callback invoked after a job has run, whether it succeeded or failed.
         *
         * @param Closure(JobInfo, JobResult): void $callback
         * @param-later-invoked-callable $callback
         */
        public function addAfterJobCallback(Closure $callback): void
        {
                $this->afterJobCallbacks = [...$this->afterJobCallbacks, $callback];
        }
462

463
        /**
         * Registers a callback that receives the run info before a run starts.
         *
         * @param Closure(RunInfo): void $callback
         * @param-later-invoked-callable $callback
         */
        public function addBeforeRunCallback(Closure $callback): void
        {
                $this->beforeRunCallbacks = [...$this->beforeRunCallbacks, $callback];
        }
471

472
        /**
         * Registers a callback that receives the run summary once a run has finished.
         *
         * @param Closure(RunSummary): void $callback
         * @param-later-invoked-callable $callback
         */
        public function addAfterRunCallback(Closure $callback): void
        {
                $this->afterRunCallbacks = [...$this->afterRunCallbacks, $callback];
        }
480

481
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc