• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

orisai / scheduler / 24056544130

06 Apr 2026 11:33PM UTC coverage: 97.997% (-1.6%) from 99.561%
24056544130

push

github

mabar
Maintenance mode

308 of 318 new or added lines in 15 files covered. (96.86%)

33 existing lines in 4 files now uncovered.

2593 of 2646 relevant lines covered (98.0%)

61.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.27
/src/Executor/ProcessJobExecutor.php
1
<?php declare(strict_types = 1);
2

3
namespace Orisai\Scheduler\Executor;
4

5
use Closure;
6
use DateTimeImmutable;
7
use DateTimeZone;
8
use Generator;
9
use JsonException;
10
use Orisai\Clock\Adapter\ClockAdapterFactory;
11
use Orisai\Clock\Clock;
12
use Orisai\Clock\SystemClock;
13
use Orisai\Exceptions\Logic\InvalidState;
14
use Orisai\Exceptions\Message;
15
use Orisai\Scheduler\Exception\JobProcessFailure;
16
use Orisai\Scheduler\Exception\RunFailure;
17
use Orisai\Scheduler\Job\JobSchedule;
18
use Orisai\Scheduler\Status\JobInfo;
19
use Orisai\Scheduler\Status\JobResult;
20
use Orisai\Scheduler\Status\JobResultState;
21
use Orisai\Scheduler\Status\JobSummary;
22
use Orisai\Scheduler\Status\RunParameters;
23
use Orisai\Scheduler\Status\RunSummary;
24
use Psr\Clock\ClockInterface;
25
use Psr\Log\LoggerInterface;
26
use Psr\Log\NullLogger;
27
use Symfony\Component\Process\PhpExecutableFinder;
28
use Symfony\Component\Process\Process;
29
use function array_merge;
30
use function assert;
31
use function is_array;
32
use function json_decode;
33
use function json_encode;
34
use function trim;
35
use const JSON_THROW_ON_ERROR;
36

37
/**
38
 * @infection-ignore-all
39
 */
40
final class ProcessJobExecutor implements JobExecutor
41
{
42

43
        private Clock $clock;
44

45
        private LoggerInterface $logger;
46

47
        private string $script = 'bin/console';
48

49
        private string $command = 'scheduler:run-job';
50

51
        /**
         * Both dependencies are optional: a missing clock falls back to SystemClock
         * and a missing logger falls back to NullLogger.
         */
        public function __construct(?ClockInterface $clock = null, ?LoggerInterface $logger = null)
        {
                if ($logger === null) {
                        $logger = new NullLogger();
                }

                if ($clock === null) {
                        $clock = new SystemClock();
                }

                $this->logger = $logger;
                // The factory adapts any PSR-20 clock to the Clock type stored by this class
                $this->clock = ClockAdapterFactory::create($clock);
        }
56

57
        /**
         * Overrides the script and the command used to launch each job subprocess.
         *
         * Subprocesses are started as `<php> <script> <command> <job id> --json --parameters <json>`.
         *
         * @param string $script  script passed to the PHP binary (default 'bin/console')
         * @param string $command command passed to the script (default 'scheduler:run-job')
         */
        public function setExecutable(string $script, string $command = 'scheduler:run-job'): void
        {
                $this->command = $command;
                $this->script = $script;
        }
62

63
        /**
         * Always true for this executor: runJobs() accepts a ShutdownCheck, stops starting
         * jobs once it reports a shutdown and stops subprocesses still running after the grace period.
         */
        public function supportsJobShutdown(): bool
        {
                return true;
        }
67

68
        /**
         * Runs every scheduled job in its own PHP subprocess and yields a summary per job.
         *
         * This is a generator, so nothing (including $beforeRunCallback) happens before the first iteration.
         *
         * Flow of a single run:
         * - jobs are started once the clock reaches their second relative to $runStart,
         * - finished subprocesses are collected and their JSON result is turned into a JobSummary,
         * - subprocesses whose stdout is not valid JSON are remembered and reported via RunFailure
         *   after the whole run finished,
         * - when $shutdownCheck reports a shutdown, jobs which were not started yet yield a maintenance
         *   summary, running subprocesses get the grace period to finish and are stopped afterwards.
         *
         * @param array<int, array<int|string, JobSchedule>> $jobSchedulesBySecond schedules indexed by run second and job id
         * @param Closure $beforeRunCallback called without arguments before the first job is started
         * @param Closure $afterRunCallback  called with the RunSummary after the last job was collected
         * @return Generator<int, JobSummary, void, RunSummary>
         * @throws InvalidState when PHP executable cannot be found
         * @throws RunFailure   when at least one subprocess did not write a valid JSON result to stdout
         */
        public function runJobs(
                array $jobSchedulesBySecond,
                DateTimeImmutable $runStart,
                Closure $beforeRunCallback,
                Closure $afterRunCallback,
                ?ShutdownCheck $shutdownCheck = null
        ): Generator
        {
                $finder = new PhpExecutableFinder();
                $binary = $finder->find(false);
                if ($binary === false) {
                        throw InvalidState::create()
                                ->withMessage('PHP executable could not be found, subprocess cannot be executed.');
                }

                $phpCommand = array_merge([$binary], $finder->findArguments());

                $beforeRunCallback();

                $jobExecutions = [];
                $jobSummaries = [];
                $suppressedExceptions = [];
                $maintenanceActive = false;

                // Timestamps are floats with sub-second precision ('U.u' format)
                $shutdownDetectedAt = null;
                $lastShutdownCheckAt = 0.0;

                $lastExecutedSecond = -1;
                while ($jobExecutions !== [] || $jobSchedulesBySecond !== []) {
                        // Check for shutdown (throttled to every 100ms), only until it is detected for the first time
                        if ($shutdownCheck !== null && $shutdownDetectedAt === null) {
                                $now = (float) $this->clock->now()->format('U.u');
                                if ($now - $lastShutdownCheckAt >= 0.1) {
                                        $lastShutdownCheckAt = $now;

                                        if ($shutdownCheck->shouldShutdown()) {
                                                $shutdownDetectedAt = $now;
                                                $maintenanceActive = true;

                                                // Create maintenance summaries for jobs not yet started
                                                foreach ($jobSchedulesBySecond as $second => $schedules) {
                                                        foreach ($schedules as $id => $jobSchedule) {
                                                                yield $jobSummaries[] = $this->createMaintenanceSummary(
                                                                        $id,
                                                                        $jobSchedule,
                                                                        $second,
                                                                        $runStart,
                                                                );
                                                        }
                                                }

                                                // Nothing else may be started, only running subprocesses remain
                                                $jobSchedulesBySecond = [];
                                        }
                                }
                        }

                        // Force-kill remaining processes after grace period
                        if ($shutdownDetectedAt !== null && $shutdownCheck !== null && $jobExecutions !== []) {
                                $elapsed = (float) $this->clock->now()->format('U.u') - $shutdownDetectedAt;
                                if ($elapsed >= $shutdownCheck->getGracePeriodSeconds()) {
                                        foreach ($jobExecutions as $i => [$execution, $jobSchedule, $jobId]) {
                                                assert($execution instanceof Process);
                                                if ($execution->isRunning()) {
                                                        $execution->stop(10);
                                                }

                                                unset($jobExecutions[$i]);

                                                $stdout = trim($execution->getOutput());

                                                try {
                                                        // Process may have finished (or printed its result) just before being stopped
                                                        $decoded = json_decode($stdout, true, 512, JSON_THROW_ON_ERROR);
                                                        assert(is_array($decoded));

                                                        yield $jobSummaries[] = $this->createSummary($decoded, $jobSchedule);
                                                } catch (JsonException $e) {
                                                        // Process was killed before producing output
                                                        // Run second of a started job is not tracked in $jobExecutions, 0 is reported instead
                                                        yield $jobSummaries[] = $this->createMaintenanceSummary(
                                                                $jobId,
                                                                $jobSchedule,
                                                                0,
                                                                $runStart,
                                                        );
                                                }
                                        }

                                        // Schedules were cleared on shutdown and all executions were just collected
                                        break;
                                }
                        }

                        // If we have scheduled jobs and are at right second, execute them
                        if ($jobSchedulesBySecond !== []) {
                                $shouldRunSecond = $this->clock->now()->getTimestamp() - $runStart->getTimestamp();

                                // Catch up on every second since the last iteration, so no second is skipped
                                while ($lastExecutedSecond < $shouldRunSecond) {
                                        $currentSecond = $lastExecutedSecond + 1;
                                        if (isset($jobSchedulesBySecond[$currentSecond])) {
                                                $jobExecutions = $this->startJobs(
                                                        $phpCommand,
                                                        $jobSchedulesBySecond[$currentSecond],
                                                        $jobExecutions,
                                                        new RunParameters($currentSecond, false),
                                                );
                                                unset($jobSchedulesBySecond[$currentSecond]);
                                        }

                                        $lastExecutedSecond = $currentSecond;
                                }
                        }

                        // Check running jobs
                        foreach ($jobExecutions as $i => [$execution, $jobSchedule, $jobId]) {
                                assert($execution instanceof Process);
                                if ($execution->isRunning()) {
                                        continue;
                                }

                                unset($jobExecutions[$i]);

                                $stdout = trim($execution->getOutput());
                                $stderr = trim($execution->getErrorOutput());

                                try {
                                        $decoded = json_decode($stdout, true, 512, JSON_THROW_ON_ERROR);
                                        assert(is_array($decoded));
                                } catch (JsonException $e) {
                                        // Failure is reported after all jobs finished, other jobs must not be affected
                                        $suppressedExceptions[] = $this->createSubprocessFail(
                                                $execution,
                                                $stdout,
                                                $stderr,
                                        );

                                        continue;
                                }

                                // Subprocess reports any output the job itself printed under the 'stdout' key
                                $unexpectedStdout = $decoded['stdout'];
                                if ($unexpectedStdout !== '') {
                                        $this->logUnexpectedStdout($execution, $jobId, $unexpectedStdout);
                                }

                                if ($stderr !== '') {
                                        $this->logUnexpectedStderr($execution, $jobId, $stderr);
                                }

                                yield $jobSummaries[] = $this->createSummary($decoded, $jobSchedule);
                        }

                        // Nothing to do, wait
                        $this->clock->sleep(0, 1);
                }

                $summary = new RunSummary($runStart, $this->clock->now(), $jobSummaries, $maintenanceActive);

                $afterRunCallback($summary);

                if ($suppressedExceptions !== []) {
                        throw RunFailure::create($summary, $suppressedExceptions);
                }

                return $summary;
        }
230

231
        /**
         * Starts one subprocess per given job schedule and appends it to the running executions.
         *
         * Each subprocess is started as
         * `<php command> <script> <command> <job id> --json --parameters <json encoded parameters>`.
         *
         * @param list<string> $phpCommand
         * @param array<int|string, JobSchedule> $jobSchedules
         * @param array<int, array{Process, JobSchedule, int|string}> $jobExecutions
         * @return array<int, array{Process, JobSchedule, int|string}>
         */
        private function startJobs(
                array $phpCommand,
                array $jobSchedules,
                array $jobExecutions,
                RunParameters $parameters
        ): array
        {
                if ($jobSchedules === []) {
                        return $jobExecutions;
                }

                // Parameters are identical for all jobs started together, encode them just once
                $encodedParameters = json_encode($parameters->toArray(), JSON_THROW_ON_ERROR);

                foreach ($jobSchedules as $id => $jobSchedule) {
                        $execution = new Process(
                                array_merge($phpCommand, [
                                        $this->script,
                                        $this->command,
                                        $id,
                                        '--json',
                                        '--parameters',
                                        $encodedParameters,
                                ]),
                        );
                        $execution->start();

                        // Schedule and id are kept with the process so its result can be matched to the job
                        $jobExecutions[] = [$execution, $jobSchedule, $id];
                }

                return $jobExecutions;
        }
262

263
        /**
         * Builds a job summary from the decoded JSON result of a job subprocess.
         *
         * Time zone and expression are taken from the schedule, everything else from the subprocess result.
         *
         * @param array<mixed> $raw
         * @throws InvalidState when a time reported by the subprocess cannot be parsed
         */
        private function createSummary(array $raw, JobSchedule $jobSchedule): JobSummary
        {
                return new JobSummary(
                        new JobInfo(
                                $raw['info']['id'],
                                $raw['info']['name'],
                                $raw['info']['expression'],
                                $raw['info']['repeatAfterSeconds'],
                                $raw['info']['runSecond'],
                                $this->createDateTime($raw['info']['start'][0], $raw['info']['start'][1]),
                                $jobSchedule->getTimeZone(),
                                $raw['info']['forcedRun'],
                        ),
                        new JobResult(
                                $jobSchedule->getExpression(),
                                $this->createDateTime($raw['result']['end'][0], $raw['result']['end'][1]),
                                JobResultState::from($raw['result']['state']),
                        ),
                );
        }

        /**
         * Creates a date time from a 'U.u' (seconds.microseconds) timestamp and moves it to the given time zone.
         *
         * @throws InvalidState when the timestamp is not in 'U.u' format
         */
        private function createDateTime(string $timestamp, string $timezone): DateTimeImmutable
        {
                $dateTime = DateTimeImmutable::createFromFormat('U.u', $timestamp);
                // createFromFormat() returns false instead of throwing, calling a method on it would be a fatal error
                if ($dateTime === false) {
                        throw InvalidState::create()
                                ->withMessage("Job subprocess reported time '$timestamp', which is not in the 'U.u' format.");
                }

                return $dateTime->setTimezone(new DateTimeZone($timezone));
        }
288

289
        /**
         * Creates a summary with the maintenance result state for a job without a subprocess result.
         *
         * Job start is reported as the run start and job end as the current time of the clock,
         * both moved to the time zone of the schedule, if it has any.
         *
         * @param int|string $id
         */
        private function createMaintenanceSummary(
                $id,
                JobSchedule $jobSchedule,
                int $runSecond,
                DateTimeImmutable $runStart
        ): JobSummary
        {
                $timezone = $jobSchedule->getTimeZone();

                $start = $runStart;
                $end = $this->clock->now();
                if ($timezone !== null) {
                        $start = $start->setTimezone($timezone);
                        $end = $end->setTimezone($timezone);
                }

                return new JobSummary(
                        new JobInfo(
                                $id,
                                $jobSchedule->getJob()->getName(),
                                $jobSchedule->getExpression()->getExpression(),
                                $jobSchedule->getRepeatAfterSeconds(),
                                $runSecond,
                                $start,
                                $timezone,
                                false,
                        ),
                        new JobResult(
                                $jobSchedule->getExpression(),
                                $end,
                                JobResultState::maintenance(),
                        ),
                );
        }
324

325
        /**
         * Describes a subprocess whose stdout could not be decoded as a job result.
         *
         * The message carries command line, exit code and both output streams of the subprocess.
         */
        private function createSubprocessFail(Process $execution, string $output, string $errorOutput): JobProcessFailure
        {
                $commandLine = $execution->getCommandLine();
                $exitCode = (string) $execution->getExitCode();

                return JobProcessFailure::create()->withMessage(
                        Message::create()
                                ->withContext("Running job via command {$commandLine}")
                                ->withProblem('Job subprocess did not correctly write job result to stdout.')
                                ->with('Tip', 'Check the documentation for troubleshooting guide.')
                                ->with('Exit code', $exitCode)
                                ->with('stdout', $output)
                                ->with('stderr', $errorOutput),
                );
        }
338

339
        /**
         * Logs a warning about stderr output of a subprocess which otherwise returned a valid result.
         *
         * @param int|string $jobId
         */
        private function logUnexpectedStderr(Process $execution, $jobId, string $stderr): void
        {
                $context = [
                        'id' => $jobId,
                        'command' => $execution->getCommandLine(),
                        'exitCode' => $execution->getExitCode(),
                        'stderr' => $stderr,
                ];

                $this->logger->warning("Subprocess running job '$jobId' produced unexpected stderr output.", $context);
        }
351

352
        /**
         * Logs a warning about stdout output reported by a subprocess next to its valid result.
         *
         * @param int|string $jobId
         */
        private function logUnexpectedStdout(Process $execution, $jobId, string $stdout): void
        {
                $context = [
                        'id' => $jobId,
                        'command' => $execution->getCommandLine(),
                        'exitCode' => $execution->getExitCode(),
                        'stdout' => $stdout,
                ];

                $this->logger->warning("Subprocess running job '$jobId' produced unexpected stdout output.", $context);
        }
364

365
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc