• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

orisai / scheduler / 24164245551

08 Apr 2026 11:36PM UTC coverage: 97.876% (-1.7%) from 99.561%
24164245551

push

github

mabar
Maintenance mode, run registry

406 of 414 new or added lines in 17 files covered. (98.07%)

30 existing lines in 4 files now uncovered.

2719 of 2778 relevant lines covered (97.88%)

66.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.99
/src/Command/WorkerCommand.php
1
<?php declare(strict_types = 1);
2

3
namespace Orisai\Scheduler\Command;
4

5
use Closure;
6
use DateTimeImmutable;
7
use Orisai\Clock\Adapter\ClockAdapterFactory;
8
use Orisai\Clock\Clock;
9
use Orisai\Clock\SystemClock;
10
use Orisai\Exceptions\Logic\InvalidState;
11
use Psr\Clock\ClockInterface;
12
use Symfony\Component\Console\Command\Command;
13
use Symfony\Component\Console\Input\InputInterface;
14
use Symfony\Component\Console\Input\InputOption;
15
use Symfony\Component\Console\Output\OutputInterface;
16
use Symfony\Component\Process\PhpExecutableFinder;
17
use Symfony\Component\Process\Process;
18
use function array_merge;
19
use function assert;
20
use function function_exists;
21
use function is_bool;
22
use function pcntl_async_signals;
23
use function pcntl_signal;
24
use function pcntl_signal_get_handler;
25
use function trim;
26
use function usleep;
27
use const SIGINT;
28
use const SIGTERM;
29

30
/**
31
 * @infection-ignore-all
32
 */
33
final class WorkerCommand extends Command
34
{
35

36
        // Time source for the worker loop; assigned in the constructor through ClockAdapterFactory.
        private Clock $clock;
37

38
        // Set to true by the signal handler and by requestStop(); execute() checks it on every loop iteration.
        private bool $shouldStop = false;
39

40
        // Remaining runs in test mode (decremented after each started run); null when test mode is not enabled.
        private ?int $testRuns = null;
41

42
        // Test-mode callback, invoked right after a run is started while $testRuns is not null.
        /** @var Closure(): void|null */
43
        private ?Closure $testCb = null;
44

45
        // Script passed to the PHP binary when the --script option is not given.
        private string $script = 'bin/console';
46

47
        // Command name passed to the script when the --command option is not given.
        private string $command = 'scheduler:run';
48

49
        /**
         * @param ClockInterface|null $clock Time source for the worker; a SystemClock is created when omitted
         */
        public function __construct(?ClockInterface $clock = null)
        {
                parent::__construct();

                $source = $clock;
                if ($source === null) {
                        $source = new SystemClock();
                }

                $this->clock = ClockAdapterFactory::create($source);
        }
54

55
        /**
         * Overrides the defaults used when the --script / --command options are not passed.
         *
         * @param string $script  Executable file handed to the PHP binary
         * @param string $command Name of the console command the script should run
         */
        public function setExecutable(string $script, string $command = 'scheduler:run'): void
        {
                [$this->script, $this->command] = [$script, $command];
        }
60

61
        /**
         * @return string Name under which the command is registered
         */
        public static function getDefaultName(): string
        {
                $name = 'scheduler:worker';

                return $name;
        }
65

66
        /**
         * @return string Short description of the command
         */
        public static function getDefaultDescription(): string
        {
                $description = 'Start the scheduler worker';

                return $description;
        }
70

71
        /**
         * Declares the --script (-s), --command (-c) and --force options.
         */
        protected function configure(): void
        {
                parent::configure();

                // name, shortcut, mode, description - registered in this order
                $definitions = [
                        ['script', 's', InputOption::VALUE_REQUIRED, 'Executable file for executing console commands'],
                        ['command', 'c', InputOption::VALUE_REQUIRED, 'Name of executed command'],
                        ['force', null, InputOption::VALUE_NONE, 'Force run in non-interactive environment'],
                ];

                foreach ($definitions as [$name, $shortcut, $mode, $description]) {
                        $this->addOption($name, $shortcut, $mode, $description);
                }
        }
93

94
        /**
         * Runs the worker with SIGTERM/SIGINT handlers installed for the duration of the call.
         *
         * The previous handlers are restored on every exit path: normal completion, the non-interactive
         * refusal (FAILURE) and any exception thrown while the worker is running.
         *
         * @return int self::SUCCESS when the loop ends, self::FAILURE when refused in a non-interactive CLI
         */
        protected function execute(InputInterface $input, OutputInterface $output): int
        {
                $previousHandlers = $this->registerSignalHandlers($output);

                try {
                        return $this->runWorker($input, $output);
                } finally {
                        // Must run even on early return or exception so the worker's handlers never outlive the command
                        $this->restoreSignalHandlers($previousHandlers);
                }
        }

        /**
         * Starts "<php binary> <script> <command>" as a subprocess when the clock hits second 0 of a minute
         * that has not been executed yet, relays subprocess output, and waits for running subprocesses
         * before returning.
         *
         * @return int self::SUCCESS when the loop ends, self::FAILURE when refused in a non-interactive CLI
         */
        private function runWorker(InputInterface $input, OutputInterface $output): int
        {
                $finder = new PhpExecutableFinder();
                $binary = $finder->find(false);
                $script = $input->getOption('script') ?? $this->script;
                $command = $input->getOption('command') ?? $this->command;
                $force = $input->getOption('force');
                assert(is_bool($force));

                // @codeCoverageIgnoreStart
                if ($binary === false) {
                        throw InvalidState::create()
                                ->withMessage('PHP executable could not be found, subprocess cannot be executed.');
                }

                // @codeCoverageIgnoreEnd

                $phpCommand = array_merge([$binary], $finder->findArguments(), [$script, $command]);

                if (!$force && !$input->isInteractive()) {
                        $output->writeln(
                                '<error>CLI is non-interactive. If you are sure you can terminate the worker and want to'
                                . ' bypass limitation, then use the --force option.</error>',
                        );

                        return self::FAILURE;
                }

                $output->writeln('<info>Running scheduled tasks every minute.</info>');

                // Initialized to the previous minute so the first second-0 tick is never treated as already executed
                $lastExecutionStartedAt = $this->nullSeconds($this->clock->now()->modify('-1 minute'));
                $executions = [];
                while (true) {
                        usleep(100_000);

                        if ($this->shouldStop) {
                                break;
                        }

                        $currentTime = $this->clock->now();

                        if (
                                (int) $currentTime->format('s') === 0
                                && $this->nullSeconds($currentTime)->format('U') !== $lastExecutionStartedAt->format('U')
                                && $this->testRuns !== 0
                        ) {
                                $executions[] = $execution = new Process($phpCommand);

                                // @codeCoverageIgnoreStart
                                if (Process::isTtySupported()) {
                                        $execution->setTty(true);
                                } elseif (Process::isPtySupported()) {
                                        $execution->setPty(true);
                                }

                                // @codeCoverageIgnoreEnd

                                $execution->start();
                                $lastExecutionStartedAt = $this->nullSeconds($this->clock->now());

                                if ($this->testRuns !== null) {
                                        $this->testRuns--;
                                        assert($this->testCb !== null);
                                        ($this->testCb)();
                                }
                        }

                        $this->processExecutions($executions, $output);

                        // Test mode only: leave once all requested runs were started and have finished
                        if ($this->testRuns === 0 && $executions === []) {
                                break;
                        }
                }

                // Wait for running subprocesses to finish
                while ($executions !== []) {
                        $this->processExecutions($executions, $output);

                        usleep(100_000);
                }

                if ($this->shouldStop) {
                        $output->writeln('<info>Scheduler worker stopped.</info>');
                }

                return self::SUCCESS;
        }
185

186
        /**
         * Relays pending output of every tracked subprocess and drops the ones that have finished.
         *
         * @param array<Process> $executions Tracked subprocesses; finished ones are removed in place
         * @param-out array<Process> $executions
         */
        private function processExecutions(array &$executions, OutputInterface $output): void
        {
                foreach ($executions as $index => $process) {
                        $this->writeOutput($output, $process);

                        if ($process->isRunning()) {
                                continue;
                        }

                        // Process may write right before finish, so flush once more before forgetting it
                        $this->writeOutput($output, $process);
                        unset($executions[$index]);
                }
        }
201

202
        /**
         * Installs a shared SIGTERM/SIGINT handler that requests a stop on the first signal
         * and exits with code 1 when a stop was already requested.
         *
         * @return array{(callable(): mixed)|int, (callable(): mixed)|int}|null Previous [SIGTERM, SIGINT] handlers,
         *                                                                      null when pcntl_async_signals() is missing
         */
        private function registerSignalHandlers(OutputInterface $output): ?array
        {
                if (!function_exists('pcntl_async_signals')) {
                        return null;
                }

                pcntl_async_signals(true);

                // Captured before installing ours so they can be handed back to restoreSignalHandlers()
                $previous = [
                        pcntl_signal_get_handler(SIGTERM),
                        pcntl_signal_get_handler(SIGINT),
                ];

                $onSignal = function (): void {
                        // @codeCoverageIgnoreStart
                        // Tested via real subprocess in SignalHandlingTest, but coverage is not collected from subprocesses
                        if ($this->shouldStop) {
                                exit(1);
                        }

                        // @codeCoverageIgnoreEnd
                        $this->shouldStop = true;
                };

                foreach ([SIGTERM, SIGINT] as $signal) {
                        pcntl_signal($signal, $onSignal);
                }

                return $previous;
        }
232

233
        /**
         * Re-installs the handlers returned by registerSignalHandlers(); does nothing when given null.
         *
         * @param array{(callable(): mixed)|int, (callable(): mixed)|int}|null $previousHandlers [SIGTERM, SIGINT]
         */
        private function restoreSignalHandlers(?array $previousHandlers): void
        {
                if ($previousHandlers === null) {
                        return; // @codeCoverageIgnore
                }

                [$termHandler, $intHandler] = $previousHandlers;

                pcntl_signal(SIGTERM, $termHandler);
                pcntl_signal(SIGINT, $intHandler);
        }
245

246
        /**
         * Writes the subprocess output produced since the last call: stdout first, then stderr.
         * Each stream is trimmed and skipped when nothing is left.
         */
        private function writeOutput(OutputInterface $output, Process $process): void
        {
                // Each stream is read immediately before it is written, stdout before stderr
                $streams = [
                        static fn (): string => $process->getIncrementalOutput(),
                        static fn (): string => $process->getIncrementalErrorOutput(),
                ];

                foreach ($streams as $read) {
                        $text = trim($read());

                        if ($text === '') {
                                continue;
                        }

                        $output->writeln($text);
                }
        }
258

259
        /**
         * Returns a copy of the given time with the same hour and minute, set through setTime()
         * without passing seconds.
         */
        private function nullSeconds(DateTimeImmutable $dt): DateTimeImmutable
        {
                $hour = (int) $dt->format('H');
                $minute = (int) $dt->format('i');

                return $dt->setTime($hour, $minute);
        }
266

267
        /**
         * Limits the worker to a fixed number of runs and registers a callback invoked after each started run.
         *
         * @param int $runs Number of runs to start before the worker loop ends
         * @param Closure(): void $cb
         * @param-later-invoked-callable $cb
         *
         * @internal
         */
        public function enableTestMode(int $runs, Closure $cb): void
        {
                [$this->testRuns, $this->testCb] = [$runs, $cb];
        }
278

279
        /**
         * Sets the stop flag that the worker loop checks on each iteration.
         *
         * @internal
         */
        public function requestStop(): void
        {
                $this->shouldStop = true;
        }
286

287
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc