• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

orisai / scheduler / 24634959708

19 Apr 2026 05:31PM UTC coverage: 95.05% (-2.8%) from 97.882%
24634959708

push

github

mabar
Move events from subprocesses to main process, fire after job callbacks for locked jobs and maintenance, parse subprocess events via new protocol

171 of 223 new or added lines in 4 files covered. (76.68%)

54 existing lines in 3 files now uncovered.

3053 of 3212 relevant lines covered (95.05%)

76.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.82
/src/Executor/ProcessJobExecutor.php
1
<?php declare(strict_types = 1);
2

3
namespace Orisai\Scheduler\Executor;
4

5
use Closure;
6
use DateTimeImmutable;
7
use DateTimeZone;
8
use Generator;
9
use JsonException;
10
use Orisai\Clock\Adapter\ClockAdapterFactory;
11
use Orisai\Clock\Clock;
12
use Orisai\Clock\SystemClock;
13
use Orisai\Exceptions\Logic\InvalidState;
14
use Orisai\Exceptions\Message;
15
use Orisai\Scheduler\Exception\JobProcessFailure;
16
use Orisai\Scheduler\Exception\RunFailure;
17
use Orisai\Scheduler\Job\JobSchedule;
18
use Orisai\Scheduler\Maintenance\CreatesMaintenanceJobSummary;
19
use Orisai\Scheduler\Status\JobInfo;
20
use Orisai\Scheduler\Status\JobResult;
21
use Orisai\Scheduler\Status\JobResultState;
22
use Orisai\Scheduler\Status\JobSummary;
23
use Orisai\Scheduler\Status\RunParameters;
24
use Orisai\Scheduler\Status\RunSummary;
25
use Psr\Clock\ClockInterface;
26
use Psr\Log\LoggerInterface;
27
use Psr\Log\NullLogger;
28
use Symfony\Component\Process\PhpExecutableFinder;
29
use Symfony\Component\Process\Process;
30
use function array_merge;
31
use function assert;
32
use function is_array;
33
use function json_decode;
34
use function json_encode;
35
use function strlen;
36
use function strpos;
37
use function substr;
38
use function trim;
39
use const JSON_THROW_ON_ERROR;
40

41
/**
42
 * @infection-ignore-all
43
 */
44
final class ProcessJobExecutor implements JobExecutor
45
{
46

47
        use CreatesMaintenanceJobSummary;
48

49
        private Clock $clock;
50

51
        private LoggerInterface $logger;
52

53
        private string $script = 'bin/console';
54

55
        private string $command = 'scheduler:run-job';
56

57
        public function __construct(?ClockInterface $clock = null, ?LoggerInterface $logger = null)
58
        {
59
                $this->clock = ClockAdapterFactory::create($clock ?? new SystemClock());
280✔
60
                $this->logger = $logger ?? new NullLogger();
280✔
61
        }
62

63
        public function setExecutable(string $script, string $command = 'scheduler:run-job'): void
64
        {
65
                $this->script = $script;
160✔
66
                $this->command = $command;
160✔
67
        }
68

69
        public function runJobs(
70
                array $jobSchedulesBySecond,
71
                DateTimeImmutable $runStart,
72
                Closure $beforeRunCallback,
73
                Closure $afterRunCallback,
74
                ?ShutdownCheck $shutdownCheck = null,
75
                ?Closure $onJobEvent = null
76
        ): Generator
77
        {
78
                $finder = new PhpExecutableFinder();
200✔
79
                $binary = $finder->find(false);
200✔
80
                // @codeCoverageIgnoreStart
81
                if ($binary === false) {
82
                        throw InvalidState::create()
83
                                ->withMessage('PHP executable could not be found, subprocess cannot be executed.');
84
                }
85

86
                // @codeCoverageIgnoreEnd
87
                $phpCommand = array_merge([$binary], $finder->findArguments());
200✔
88

89
                $beforeRunCallback();
200✔
90

91
                /** @var array<int, SubprocessExecutionState> $jobExecutions */
92
                $jobExecutions = [];
200✔
93
                $jobSummaries = [];
200✔
94
                $suppressedExceptions = [];
200✔
95
                $maintenanceActive = false;
200✔
96

97
                $shutdownDetectedAt = null;
200✔
98
                $lastShutdownCheckAt = 0.0;
200✔
99
                $lastRefreshAt = 0.0;
200✔
100

101
                $lastExecutedSecond = -1;
200✔
102
                while ($jobExecutions !== [] || $jobSchedulesBySecond !== []) {
200✔
103
                        // Refresh registry (throttled to every 30 seconds)
104
                        if ($shutdownCheck !== null && $shutdownDetectedAt === null) {
168✔
105
                                $now = (float) $this->clock->now()->format('U.u');
72✔
106
                                if ($now - $lastRefreshAt >= 30.0) {
72✔
107
                                        $lastRefreshAt = $now;
24✔
108
                                        $shutdownCheck->refresh();
24✔
109
                                }
110
                        }
111

112
                        // Check for shutdown (throttled to every 1 second)
113
                        if ($shutdownCheck !== null && $shutdownDetectedAt === null) {
168✔
114
                                $now = (float) $this->clock->now()->format('U.u');
72✔
115
                                if ($now - $lastShutdownCheckAt >= 1.0) {
72✔
116
                                        $lastShutdownCheckAt = $now;
72✔
117

118
                                        if ($shutdownCheck->shouldShutdown()) {
72✔
119
                                                $shutdownDetectedAt = $now;
72✔
120
                                                $maintenanceActive = true;
72✔
121

122
                                                // Create maintenance summaries for jobs not yet started
123
                                                foreach ($jobSchedulesBySecond as $second => $schedules) {
72✔
124
                                                        foreach ($schedules as $id => $jobSchedule) {
16✔
125
                                                                yield $jobSummaries[] = $this->createMaintenanceJobSummary(
16✔
126
                                                                        $id,
16✔
127
                                                                        $jobSchedule,
16✔
128
                                                                        $second,
16✔
129
                                                                        $runStart,
16✔
130
                                                                );
16✔
131
                                                        }
132
                                                }
133

134
                                                $jobSchedulesBySecond = [];
72✔
135
                                        }
136
                                }
137
                        }
138

139
                        // Force-kill remaining processes after grace period
140
                        if ($shutdownDetectedAt !== null && $shutdownCheck !== null && $jobExecutions !== []) {
168✔
141
                                $elapsed = (float) $this->clock->now()->format('U.u') - $shutdownDetectedAt;
56✔
142
                                if ($elapsed >= $shutdownCheck->getGracePeriodSeconds()) {
56✔
143
                                        foreach ($jobExecutions as $i => $state) {
40✔
144
                                                if ($state->process->isRunning()) {
40✔
145
                                                        $state->process->stop(10);
40✔
146
                                                }
147

148
                                                // Final drain of subprocess output after stop()
149
                                                $this->pollSubprocess($state, $onJobEvent);
40✔
150

151
                                                unset($jobExecutions[$i]);
40✔
152

153
                                                $summary = $this->tryCollectJobSummary($state);
40✔
154
                                                if ($summary === null) {
40✔
155
                                                        // Subprocess was killed before emitting `finished`. Reuse the
156
                                                        // JobInfo from `started` (if received) to preserve executionId
157
                                                        // pairing with the already-fired beforeJob callback.
158
                                                        $summary = $state->startedInfo !== null
24✔
159
                                                                ? $this->createJobSummaryFromStartedInfo(
8✔
160
                                                                        $state->startedInfo,
8✔
161
                                                                        $state->schedule,
8✔
162
                                                                        JobResultState::maintenance(),
8✔
163
                                                                )
8✔
164
                                                                : $this->createMaintenanceJobSummary(
18✔
165
                                                                        $state->id,
18✔
166
                                                                        $state->schedule,
18✔
167
                                                                        0,
18✔
168
                                                                        $runStart,
18✔
169
                                                                );
18✔
170
                                                }
171

172
                                                $this->logUnexpectedOutputIfAny($state);
40✔
173

174
                                                yield $jobSummaries[] = $summary;
40✔
175
                                        }
176

177
                                        break;
40✔
178
                                }
179
                        }
180

181
                        // If we have scheduled jobs and are at right second, execute them
182
                        if ($jobSchedulesBySecond !== []) {
168✔
183
                                $shouldRunSecond = $this->clock->now()->getTimestamp() - $runStart->getTimestamp();
152✔
184

185
                                while ($lastExecutedSecond < $shouldRunSecond) {
152✔
186
                                        $currentSecond = $lastExecutedSecond + 1;
152✔
187
                                        if (isset($jobSchedulesBySecond[$currentSecond])) {
152✔
188
                                                $jobExecutions = $this->startJobs(
152✔
189
                                                        $phpCommand,
152✔
190
                                                        $jobSchedulesBySecond[$currentSecond],
152✔
191
                                                        $jobExecutions,
152✔
192
                                                        new RunParameters($currentSecond, false),
152✔
193
                                                );
152✔
194
                                                unset($jobSchedulesBySecond[$currentSecond]);
152✔
195
                                        }
196

197
                                        $lastExecutedSecond = $currentSecond;
152✔
198
                                }
199
                        }
200

201
                        // Check running jobs
202
                        foreach ($jobExecutions as $i => $state) {
168✔
203
                                // Poll subprocess for new framework events (dispatches `started`)
204
                                $this->pollSubprocess($state, $onJobEvent);
152✔
205

206
                                if ($state->process->isRunning()) {
152✔
207
                                        continue;
152✔
208
                                }
209

210
                                // Subprocess exited — drain any remaining output (finished event may
211
                                // arrive right before exit and not be visible until after isRunning() flipped).
212
                                $this->pollSubprocess($state, $onJobEvent);
112✔
213

214
                                unset($jobExecutions[$i]);
112✔
215

216
                                $summary = $this->tryCollectJobSummary($state);
112✔
217
                                if ($summary === null) {
112✔
218
                                        // Check shutdown directly - SIGINT may have killed the subprocess before
219
                                        // the throttled shutdown check had a chance to set $shutdownDetectedAt
220
                                        if ($shutdownCheck !== null && $shutdownCheck->shouldShutdown()) {
32✔
221
                                                // Reuse startedInfo if received (preserves executionId pairing).
222
                                                $summary = $state->startedInfo !== null
8✔
NEW
223
                                                        ? $this->createJobSummaryFromStartedInfo(
×
NEW
224
                                                                $state->startedInfo,
×
NEW
225
                                                                $state->schedule,
×
NEW
226
                                                                JobResultState::maintenance(),
×
NEW
227
                                                        )
×
228
                                                        : $this->createMaintenanceJobSummary(
8✔
229
                                                                $state->id,
8✔
230
                                                                $state->schedule,
8✔
231
                                                                0,
8✔
232
                                                                $runStart,
8✔
233
                                                        );
8✔
234
                                                $maintenanceActive = true;
8✔
235
                                        } elseif ($state->startedInfo !== null) {
24✔
236
                                                // Subprocess crashed after `started` but before `finished`. beforeJob
237
                                                // was already fired for this job — yield a synthetic fail summary so
238
                                                // afterJob fires and the pairing invariant holds. Also surface the
239
                                                // subprocess-level failure via RunFailure.
240
                                                $summary = $this->createJobSummaryFromStartedInfo(
8✔
241
                                                        $state->startedInfo,
8✔
242
                                                        $state->schedule,
8✔
243
                                                        JobResultState::fail(),
8✔
244
                                                );
8✔
245
                                                $suppressedExceptions[] = $this->createSubprocessFail(
8✔
246
                                                        $state->process,
8✔
247
                                                        trim($state->process->getOutput()),
8✔
248
                                                        trim($state->process->getErrorOutput()),
8✔
249
                                                );
8✔
250
                                        } else {
251
                                                // Subprocess died before emitting any event. beforeJob never fired
252
                                                // either, so skipping yield keeps the invariant intact.
253
                                                $suppressedExceptions[] = $this->createSubprocessFail(
16✔
254
                                                        $state->process,
16✔
255
                                                        trim($state->process->getOutput()),
16✔
256
                                                        trim($state->process->getErrorOutput()),
16✔
257
                                                );
16✔
258

259
                                                continue;
24✔
260
                                        }
261
                                } elseif ($state->failureEvent !== null) {
80✔
262
                                        // Job threw in the subprocess and had no errorHandler — surface it as a
263
                                        // suppressed exception so runPromise throws RunFailure (parity with
264
                                        // BasicJobExecutor behavior, parity with pre-events-protocol behavior).
265
                                        $suppressedExceptions[] = $this->createUnhandledJobFailure(
16✔
266
                                                $state->process,
16✔
267
                                                $state->failureEvent,
16✔
268
                                        );
16✔
269
                                }
270

271
                                $this->logUnexpectedOutputIfAny($state);
96✔
272

273
                                yield $jobSummaries[] = $summary;
96✔
274
                        }
275

276
                        // Nothing to do, wait
277
                        $this->clock->sleep(0, 1);
168✔
278
                }
279

280
                $summary = new RunSummary($runStart, $this->clock->now(), $jobSummaries, $maintenanceActive);
200✔
281

282
                $afterRunCallback($summary);
200✔
283

284
                if ($suppressedExceptions !== []) {
200✔
285
                        throw RunFailure::create($summary, $suppressedExceptions);
40✔
286
                }
287

288
                return $summary;
160✔
289
        }
290

291
        /**
292
         * @param list<string> $phpCommand
293
         * @param array<int|string, JobSchedule> $jobSchedules
294
         * @param array<int, SubprocessExecutionState> $jobExecutions
295
         * @return array<int, SubprocessExecutionState>
296
         */
297
        private function startJobs(
298
                array $phpCommand,
299
                array $jobSchedules,
300
                array $jobExecutions,
301
                RunParameters $parameters
302
        ): array
303
        {
304
                foreach ($jobSchedules as $id => $jobSchedule) {
152✔
305
                        $execution = new Process(
152✔
306
                                array_merge($phpCommand, [
152✔
307
                                        $this->script,
152✔
308
                                        $this->command,
152✔
309
                                        $id,
152✔
310
                                        '--events',
152✔
311
                                        '--parameters',
152✔
312
                                        json_encode($parameters->toArray(), JSON_THROW_ON_ERROR),
152✔
313
                                ]),
152✔
314
                        );
152✔
315
                        $execution->start();
152✔
316

317
                        $jobExecutions[] = new SubprocessExecutionState($execution, $jobSchedule, $id);
152✔
318
                }
319

320
                return $jobExecutions;
152✔
321
        }
322

323
        /**
324
         * Reads incremental subprocess stdout, parses marker-prefixed JSON lines as
325
         * framework events, and dispatches `started` events to $onJobEvent. Non-marker
326
         * lines accumulate in the state's unexpectedStdout buffer.
327
         *
328
         * @param (Closure(int|string, JobSchedule, int<0, max>, JobInfo): void)|null $onJobEvent
329
         */
330
        private function pollSubprocess(SubprocessExecutionState $state, ?Closure $onJobEvent): void
331
        {
332
                $state->unparsedBuffer .= $state->process->getIncrementalOutput();
152✔
333

334
                $marker = SubprocessEventProtocol::EventMarker;
152✔
335
                $markerLen = strlen($marker);
152✔
336

337
                while (($nlPos = strpos($state->unparsedBuffer, "\n")) !== false) {
152✔
338
                        $line = substr($state->unparsedBuffer, 0, $nlPos);
117✔
339
                        $state->unparsedBuffer = substr($state->unparsedBuffer, $nlPos + 1);
117✔
340

341
                        if ($line === '') {
117✔
NEW
342
                                continue;
×
343
                        }
344

345
                        if (strpos($line, $marker) !== 0) {
117✔
346
                                $state->unexpectedStdout .= $line . "\n";
5✔
347

348
                                continue;
5✔
349
                        }
350

351
                        $json = substr($line, $markerLen);
112✔
352
                        try {
353
                                $event = json_decode($json, true, 512, JSON_THROW_ON_ERROR);
112✔
NEW
354
                        } catch (JsonException $e) {
×
355
                                // Malformed framework event — treat the whole line as unexpected output
NEW
356
                                $state->unexpectedStdout .= $line . "\n";
×
357

NEW
358
                                continue;
×
359
                        }
360

361
                        assert(is_array($event));
112✔
362

363
                        $type = $event['type'] ?? null;
112✔
364
                        if ($type === SubprocessEventProtocol::TypeStarted) {
112✔
365
                                if (!$state->startedDispatched) {
112✔
366
                                        $info = $this->buildJobInfo($event['info'], $state->schedule);
112✔
367
                                        $state->startedInfo = $info;
112✔
368
                                        if ($onJobEvent !== null) {
112✔
369
                                                $onJobEvent($state->id, $state->schedule, $info->getRunSecond(), $info);
112✔
370
                                        }
371

372
                                        $state->startedDispatched = true;
112✔
373
                                }
374
                        } elseif ($type === SubprocessEventProtocol::TypeFinished) {
96✔
375
                                $state->finishedEvent = $event;
96✔
376
                        } elseif ($type === SubprocessEventProtocol::TypeFailure) {
16✔
377
                                $state->failureEvent = $event;
16✔
378
                        }
379
                }
380
        }
381

382
        private function tryCollectJobSummary(SubprocessExecutionState $state): ?JobSummary
383
        {
384
                $stderr = trim($state->process->getErrorOutput());
152✔
385
                if ($stderr !== '') {
152✔
386
                        $this->logUnexpectedStderr($state->process, $state->id, $stderr);
27✔
387
                }
388

389
                if ($state->finishedEvent === null) {
152✔
390
                        return null;
56✔
391
                }
392

393
                return $this->createSummary($state->finishedEvent, $state->schedule);
96✔
394
        }
395

396
        private function logUnexpectedOutputIfAny(SubprocessExecutionState $state): void
397
        {
398
                $captured = '';
136✔
399
                if ($state->finishedEvent !== null && isset($state->finishedEvent['stdout'])) {
136✔
400
                        $captured = (string) $state->finishedEvent['stdout'];
96✔
401
                }
402

403
                $combined = $state->unexpectedStdout . $captured;
136✔
404
                if ($combined !== '') {
136✔
405
                        $this->logUnexpectedStdout($state->process, $state->id, $combined);
16✔
406
                }
407
        }
408

409
        /**
410
         * Builds a synthetic JobSummary reusing the JobInfo from the subprocess's
411
         * `started` event. Used when the subprocess exited without emitting `finished`
412
         * (maintenance-killed or crashed) — reusing the JobInfo preserves the
413
         * `executionId` pairing between beforeJob and afterJob user callbacks.
414
         */
415
        private function createJobSummaryFromStartedInfo(
416
                JobInfo $startedInfo,
417
                JobSchedule $jobSchedule,
418
                JobResultState $state
419
        ): JobSummary
420
        {
421
                $timezone = $jobSchedule->getTimeZone();
16✔
422
                $now = $timezone !== null
16✔
NEW
423
                        ? $this->clock->now()->setTimezone($timezone)
×
424
                        : $this->clock->now();
16✔
425

426
                $result = new JobResult($jobSchedule->getExpression(), $now, $state);
16✔
427

428
                return new JobSummary($startedInfo, $result);
16✔
429
        }
430

431
        /**
432
         * @param array<mixed> $rawInfo
433
         */
434
        private function buildJobInfo(array $rawInfo, JobSchedule $jobSchedule): JobInfo
435
        {
436
                return new JobInfo(
112✔
437
                        $rawInfo['id'],
112✔
438
                        $rawInfo['name'],
112✔
439
                        $rawInfo['expression'],
112✔
440
                        $rawInfo['repeatAfterSeconds'],
112✔
441
                        $rawInfo['runSecond'],
112✔
442
                        DateTimeImmutable::createFromFormat('U.u', $rawInfo['start'][0])
112✔
443
                                ->setTimezone(new DateTimeZone($rawInfo['start'][1])),
112✔
444
                        $jobSchedule->getTimeZone(),
112✔
445
                        $rawInfo['forcedRun'],
112✔
446
                );
112✔
447
        }
448

449
        /**
450
         * @param array<mixed> $raw
451
         */
452
        private function createSummary(array $raw, JobSchedule $jobSchedule): JobSummary
453
        {
454
                return new JobSummary(
96✔
455
                        $this->buildJobInfo($raw['info'], $jobSchedule),
96✔
456
                        new JobResult(
96✔
457
                                $jobSchedule->getExpression(),
96✔
458
                                DateTimeImmutable::createFromFormat('U.u', $raw['result']['end'][0])
96✔
459
                                        ->setTimezone(new DateTimeZone($raw['result']['end'][1])),
96✔
460
                                JobResultState::from($raw['result']['state']),
96✔
461
                                $raw['result']['lockExpired'] ?? false,
96✔
462
                        ),
96✔
463
                );
96✔
464
        }
465

466
        private function createSubprocessFail(Process $execution, string $output, string $errorOutput): JobProcessFailure
467
        {
468
                $message = Message::create()
24✔
469
                        ->withContext("Running job via command {$execution->getCommandLine()}")
24✔
470
                        ->withProblem('Job subprocess did not correctly write job result to stdout.')
24✔
471
                        ->with('Tip', 'Check the documentation for troubleshooting guide.')
24✔
472
                        ->with('Exit code', (string) $execution->getExitCode())
24✔
473
                        ->with('stdout', $output)
24✔
474
                        ->with('stderr', $errorOutput);
24✔
475

476
                return JobProcessFailure::create()
24✔
477
                        ->withMessage($message);
24✔
478
        }
479

480
        /**
481
         * @param array<mixed> $failureEvent
482
         */
483
        private function createUnhandledJobFailure(Process $execution, array $failureEvent): JobProcessFailure
484
        {
485
                $class = isset($failureEvent['class']) ? (string) $failureEvent['class'] : 'Throwable';
16✔
486
                $text = isset($failureEvent['message']) ? (string) $failureEvent['message'] : '';
16✔
487

488
                $message = Message::create()
16✔
489
                        ->withContext("Running job via command {$execution->getCommandLine()}")
16✔
490
                        ->withProblem("Job threw an unhandled $class: $text")
16✔
491
                        ->with('Tip', 'Register an error handler on the scheduler to handle job failures gracefully.');
16✔
492

493
                return JobProcessFailure::create()
16✔
494
                        ->withMessage($message);
16✔
495
        }
496

497
        /**
498
         * @param int|string $jobId
499
         */
500
        private function logUnexpectedStderr(Process $execution, $jobId, string $stderr): void
501
        {
502
                $this->logger->warning("Subprocess running job '$jobId' produced unexpected stderr output.", [
27✔
503
                        'id' => $jobId,
27✔
504
                        'command' => $execution->getCommandLine(),
27✔
505
                        'exitCode' => $execution->getExitCode(),
27✔
506
                        'stderr' => $stderr,
27✔
507
                ]);
27✔
508
        }
509

510
        /**
511
         * @param int|string $jobId
512
         */
513
        private function logUnexpectedStdout(Process $execution, $jobId, string $stdout): void
514
        {
515
                $this->logger->warning("Subprocess running job '$jobId' produced unexpected stdout output.", [
16✔
516
                        'id' => $jobId,
16✔
517
                        'command' => $execution->getCommandLine(),
16✔
518
                        'exitCode' => $execution->getExitCode(),
16✔
519
                        'stdout' => $stdout,
16✔
520
                ]);
16✔
521
        }
522

523
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc