• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

orisai / scheduler / 24056471189

06 Apr 2026 11:33PM UTC coverage: 97.997% (-1.6%) from 99.561%
24056471189

push

github

mabar
Maintenance mode

308 of 318 new or added lines in 15 files covered. (96.86%)

33 existing lines in 4 files now uncovered.

2593 of 2646 relevant lines covered (98.0%)

61.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.26
/src/Command/WorkerCommand.php
1
<?php declare(strict_types = 1);
2

3
namespace Orisai\Scheduler\Command;
4

5
use Closure;
6
use DateTimeImmutable;
7
use Orisai\Clock\Adapter\ClockAdapterFactory;
8
use Orisai\Clock\Clock;
9
use Orisai\Clock\SystemClock;
10
use Orisai\Exceptions\Logic\InvalidState;
11
use Psr\Clock\ClockInterface;
12
use Symfony\Component\Console\Command\Command;
13
use Symfony\Component\Console\Input\InputInterface;
14
use Symfony\Component\Console\Input\InputOption;
15
use Symfony\Component\Console\Output\OutputInterface;
16
use Symfony\Component\Process\PhpExecutableFinder;
17
use Symfony\Component\Process\Process;
18
use function array_merge;
19
use function assert;
20
use function function_exists;
21
use function is_bool;
22
use function pcntl_async_signals;
23
use function pcntl_signal;
24
use function pcntl_signal_get_handler;
25
use function trim;
26
use function usleep;
27
use const SIGINT;
28
use const SIGTERM;
29

30
/**
31
 * @infection-ignore-all
32
 */
33
final class WorkerCommand extends Command
34
{
35

36
        private Clock $clock;
37

38
        private bool $shouldStop = false;
39

40
        private ?int $testRuns = null;
41

42
        /** @var Closure(): void|null */
43
        private ?Closure $testCb = null;
44

45
        private string $script = 'bin/console';
46

47
        private string $command = 'scheduler:run';
48

49
        public function __construct(?ClockInterface $clock = null)
50
        {
51
                parent::__construct();
88✔
52
                $this->clock = ClockAdapterFactory::create($clock ?? new SystemClock());
88✔
53
        }
54

55
        public function setExecutable(string $script, string $command = 'scheduler:run'): void
56
        {
57
                $this->script = $script;
8✔
58
                $this->command = $command;
8✔
59
        }
60

61
        public static function getDefaultName(): string
62
        {
63
                return 'scheduler:worker';
88✔
64
        }
65

66
        public static function getDefaultDescription(): string
67
        {
68
                return 'Start the scheduler worker';
88✔
69
        }
70

71
        protected function configure(): void
72
        {
73
                parent::configure();
88✔
74
                $this->addOption(
88✔
75
                        'script',
88✔
76
                        's',
88✔
77
                        InputOption::VALUE_REQUIRED,
88✔
78
                        'Executable file for executing console commands',
88✔
79
                );
88✔
80
                $this->addOption(
88✔
81
                        'command',
88✔
82
                        'c',
88✔
83
                        InputOption::VALUE_REQUIRED,
88✔
84
                        'Name of executed command',
88✔
85
                );
88✔
86
                $this->addOption(
88✔
87
                        'force',
88✔
88
                        null,
88✔
89
                        InputOption::VALUE_NONE,
88✔
90
                        'Force run in non-interactive environment',
88✔
91
                );
88✔
92
        }
93

94
        protected function execute(InputInterface $input, OutputInterface $output): int
95
        {
96
                $previousHandlers = $this->registerSignalHandlers();
88✔
97

98
                $finder = new PhpExecutableFinder();
88✔
99
                $binary = $finder->find(false);
88✔
100
                $script = $input->getOption('script') ?? $this->script;
88✔
101
                $command = $input->getOption('command') ?? $this->command;
88✔
102
                $force = $input->getOption('force');
88✔
103
                assert(is_bool($force));
88✔
104

105
                if ($binary === false) {
88✔
106
                        throw InvalidState::create()
×
107
                                ->withMessage('PHP executable could not be found, subprocess cannot be executed.');
×
108
                }
109

110
                $phpCommand = array_merge([$binary], $finder->findArguments(), [$script, $command]);
88✔
111

112
                if (!$force && !$input->isInteractive()) {
88✔
113
                        $output->writeln(
8✔
114
                                '<error>CLI is non-interactive. If you are sure you can terminate the worker and want to'
8✔
115
                                . ' bypass limitation, then use the --force option.</error>',
8✔
116
                        );
8✔
117

118
                        return self::FAILURE;
8✔
119
                }
120

121
                $output->writeln('<info>Running scheduled tasks every minute.</info>');
80✔
122

123
                $lastExecutionStartedAt = $this->nullSeconds($this->clock->now()->modify('-1 minute'));
80✔
124
                $executions = [];
80✔
125
                while (true) {
80✔
126
                        usleep(100_000);
80✔
127

128
                        if ($this->shouldStop) {
80✔
129
                                break;
24✔
130
                        }
131

132
                        $currentTime = $this->clock->now();
72✔
133

134
                        if (
135
                                (int) $currentTime->format('s') === 0
72✔
136
                                && $this->nullSeconds($currentTime)->format('U') !== $lastExecutionStartedAt->format('U')
72✔
137
                                && $this->testRuns !== 0
72✔
138
                        ) {
139
                                $executions[] = $execution = new Process($phpCommand);
56✔
140

141
                                // @codeCoverageIgnoreStart
142
                                if (Process::isTtySupported()) {
143
                                        $execution->setTty(true);
144
                                } elseif (Process::isPtySupported()) {
145
                                        $execution->setPty(true);
146
                                }
147

148
                                // @codeCoverageIgnoreEnd
149

150
                                $execution->start();
56✔
151
                                $lastExecutionStartedAt = $this->nullSeconds($this->clock->now());
56✔
152

153
                                if ($this->testRuns !== null) {
56✔
154
                                        $this->testRuns--;
56✔
155
                                        assert($this->testCb !== null);
56✔
156
                                        ($this->testCb)();
56✔
157
                                }
158
                        }
159

160
                        foreach ($executions as $key => $execution) {
72✔
161
                                $this->writeOutput($output, $execution);
56✔
162

163
                                if (!$execution->isRunning()) {
56✔
164
                                        $this->writeOutput($output, $execution); // Process may write right before finish
40✔
165
                                        unset($executions[$key]);
40✔
166
                                }
167
                        }
168

169
                        if ($this->testRuns === 0 && $executions === []) {
72✔
170
                                break;
56✔
171
                        }
172
                }
173

174
                // Wait for running subprocesses to finish
175
                while ($executions !== []) {
80✔
176
                        foreach ($executions as $key => $execution) {
16✔
177
                                $this->writeOutput($output, $execution);
16✔
178

179
                                if (!$execution->isRunning()) {
16✔
180
                                        $this->writeOutput($output, $execution);
16✔
181
                                        unset($executions[$key]);
16✔
182
                                }
183
                        }
184

185
                        usleep(100_000);
16✔
186
                }
187

188
                if ($this->shouldStop) {
80✔
189
                        $output->writeln('<info>Scheduler worker stopped.</info>');
24✔
190
                }
191

192
                $this->restoreSignalHandlers($previousHandlers);
80✔
193

194
                return self::SUCCESS;
80✔
195
        }
196

197
        /**
198
         * @return array{callable|int, callable|int}|null
199
         */
200
        private function registerSignalHandlers(): ?array
201
        {
202
                if (!function_exists('pcntl_async_signals')) {
88✔
NEW
203
                        return null;
×
204
                }
205

206
                pcntl_async_signals(true);
88✔
207

208
                $previousTermHandler = pcntl_signal_get_handler(SIGTERM);
88✔
209
                $previousIntHandler = pcntl_signal_get_handler(SIGINT);
88✔
210

211
                $handler = function (): void {
88✔
212
                        if ($this->shouldStop) {
8✔
213
                                // Second signal - force exit, tested via real subprocess in SignalHandlingTest
214
                                exit(1); // @codeCoverageIgnore
215
                        }
216

217
                        $this->shouldStop = true;
8✔
218
                };
88✔
219

220
                pcntl_signal(SIGTERM, $handler);
88✔
221
                pcntl_signal(SIGINT, $handler);
88✔
222

223
                return [$previousTermHandler, $previousIntHandler];
88✔
224
        }
225

226
        /**
227
         * @param array{callable|int, callable|int}|null $previousHandlers
228
         */
229
        private function restoreSignalHandlers(?array $previousHandlers): void
230
        {
231
                if ($previousHandlers === null) {
80✔
NEW
232
                        return;
×
233
                }
234

235
                pcntl_signal(SIGTERM, $previousHandlers[0]);
80✔
236
                pcntl_signal(SIGINT, $previousHandlers[1]);
80✔
237
        }
238

239
        private function writeOutput(OutputInterface $output, Process $process): void
240
        {
241
                $stdout = trim($process->getIncrementalOutput());
56✔
242
                if ($stdout !== '') {
56✔
243
                        $output->writeln($stdout);
37✔
244
                }
245

246
                $stderr = trim($process->getIncrementalErrorOutput());
56✔
247
                if ($stderr !== '') {
56✔
248
                        $output->writeln($stderr);
3✔
249
                }
250
        }
251

252
        private function nullSeconds(DateTimeImmutable $dt): DateTimeImmutable
253
        {
254
                return $dt->setTime(
80✔
255
                        (int) $dt->format('H'),
80✔
256
                        (int) $dt->format('i'),
80✔
257
                );
80✔
258
        }
259

260
        /**
261
         * @param Closure(): void $cb
262
         * @param-later-invoked-callable $cb
263
         *
264
         * @internal
265
         */
266
        public function enableTestMode(int $runs, Closure $cb): void
267
        {
268
                $this->testRuns = $runs;
88✔
269
                $this->testCb = $cb;
88✔
270
        }
271

272
        /**
273
         * @internal
274
         */
275
        public function requestStop(): void
276
        {
277
                $this->shouldStop = true;
16✔
278
        }
279

280
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc