• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

PHP-CS-Fixer / PHP-CS-Fixer / 21223212585

21 Jan 2026 07:38PM UTC coverage: 92.939% (-0.04%) from 92.982%
21223212585

push

github

web-flow
refactor: add base key-existence check for cross-process communication in parallel runner (#9360)

9 of 25 new or added lines in 2 files covered. (36.0%)

5 existing lines in 2 files now uncovered.

29208 of 31427 relevant lines covered (92.94%)

44.09 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.16
/src/Console/Command/WorkerCommand.php
1
<?php
2

3
declare(strict_types=1);
4

5
/*
6
 * This file is part of PHP CS Fixer.
7
 *
8
 * (c) Fabien Potencier <fabien@symfony.com>
9
 *     Dariusz Rumiński <dariusz.ruminski@gmail.com>
10
 *
11
 * This source file is subject to the MIT license that is bundled
12
 * with this source code in the file LICENSE.
13
 */
14

15
namespace PhpCsFixer\Console\Command;
16

17
use Clue\React\NDJson\Decoder;
18
use Clue\React\NDJson\Encoder;
19
use PhpCsFixer\Cache\NullCacheManager;
20
use PhpCsFixer\Config;
21
use PhpCsFixer\Console\ConfigurationResolver;
22
use PhpCsFixer\Error\ErrorsManager;
23
use PhpCsFixer\Runner\Event\FileProcessed;
24
use PhpCsFixer\Runner\Parallel\ParallelAction;
25
use PhpCsFixer\Runner\Parallel\ParallelConfigFactory;
26
use PhpCsFixer\Runner\Parallel\ParallelisationException;
27
use PhpCsFixer\Runner\Runner;
28
use PhpCsFixer\ToolInfoInterface;
29
use React\EventLoop\StreamSelectLoop;
30
use React\Socket\ConnectionInterface;
31
use React\Socket\TcpConnector;
32
use Symfony\Component\Console\Attribute\AsCommand;
33
use Symfony\Component\Console\Command\Command;
34
use Symfony\Component\Console\Input\InputInterface;
35
use Symfony\Component\Console\Input\InputOption;
36
use Symfony\Component\Console\Output\ConsoleOutputInterface;
37
use Symfony\Component\Console\Output\OutputInterface;
38
use Symfony\Component\EventDispatcher\EventDispatcher;
39
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
40

41
/**
42
 * @author Greg Korba <greg@codito.dev>
43
 *
44
 * @internal
45
 *
46
 * @no-named-arguments Parameter names are not covered by the backward compatibility promise.
47
 */
48
#[AsCommand(name: 'worker', description: 'Internal command for running fixers in parallel', hidden: true)]
49
final class WorkerCommand extends Command
50
{
51
    /** @var string Prefix used before JSON-encoded error printed in the worker's process */
52
    public const ERROR_PREFIX = 'WORKER_ERROR::';
53

54
    /** @TODO PHP 8.0 - remove the property */
55
    protected static $defaultName = 'worker';
56

57
    /** @TODO PHP 8.0 - remove the property */
58
    protected static $defaultDescription = 'Internal command for running fixers in parallel';
59

60
    private ToolInfoInterface $toolInfo;
61
    private ConfigurationResolver $configurationResolver;
62
    private ErrorsManager $errorsManager;
63
    private EventDispatcherInterface $eventDispatcher;
64

65
    /** @var list<FileProcessed> */
66
    private array $events;
67

68
    /**
     * Creates the command and the collaborators it owns for its whole lifetime.
     *
     * @param ToolInfoInterface $toolInfo kept on the instance and later passed to the ConfigurationResolver
     */
    public function __construct(ToolInfoInterface $toolInfo)
    {
        parent::__construct();

        // Injected dependency.
        $this->toolInfo = $toolInfo;

        // Self-owned collaborators: the dispatcher collects file events, the manager collects fixing errors.
        $this->eventDispatcher = new EventDispatcher();
        $this->errorsManager = new ErrorsManager();

        // The worker is an internal command, so it is hidden.
        $this->setHidden(true);
    }
77

78
    /**
     * Declares the options understood by the worker process.
     *
     * `port` and `identifier` are read by execute() to reach the parallelisation server;
     * the remaining options are read by createRunner() and passed to the ConfigurationResolver.
     */
    protected function configure(): void
    {
        $boolValues = ConfigurationResolver::BOOL_VALUES;

        // Options describing how to reach the parallelisation server.
        $connectionOptions = [
            new InputOption('port', null, InputOption::VALUE_REQUIRED, 'Specifies parallelisation server\'s port.'),
            new InputOption('identifier', null, InputOption::VALUE_REQUIRED, 'Specifies parallelisation process\' identifier.'),
        ];

        // Options describing how files should be fixed.
        $fixingOptions = [
            new InputOption('allow-risky', '', InputOption::VALUE_REQUIRED, HelpCommand::getDescriptionWithAllowedValues('Are risky fixers allowed (%s).', $boolValues), null, $boolValues),
            new InputOption('config', '', InputOption::VALUE_REQUIRED, 'The path to a config file.'),
            new InputOption('dry-run', '', InputOption::VALUE_NONE, 'Only shows which files would have been modified.'),
            new InputOption('rules', '', InputOption::VALUE_REQUIRED, 'List of rules that should be run against configured paths.'),
            new InputOption('using-cache', '', InputOption::VALUE_REQUIRED, HelpCommand::getDescriptionWithAllowedValues('Should cache be used (%s).', $boolValues), null, $boolValues),
            new InputOption('cache-file', '', InputOption::VALUE_REQUIRED, 'The path to the cache file.'),
            new InputOption('diff', '', InputOption::VALUE_NONE, 'Prints diff for each file.'),
            new InputOption('stop-on-violation', '', InputOption::VALUE_NONE, 'Stop execution on first violation.'),
        ];

        // Declaration order is kept: connection options first, then fixing options.
        $this->setDefinition(array_merge($connectionOptions, $fixingOptions));
    }
95

96
    /**
     * Runs the worker: connects to the parallelisation operator, fixes the files it is asked to and reports back.
     *
     * Protocol, as implemented below:
     *  - once connected, the worker sends WORKER_HELLO together with its identifier,
     *  - every RUNNER_REQUEST_ANALYSIS message is answered with one WORKER_RESULT per listed file,
     *    followed by WORKER_GET_FILE_CHUNK to ask for more work,
     *  - RUNNER_THANK_YOU stops the event loop, which ends the command.
     *
     * Inside the stream callbacks a ParallelisationException is raised for malformed messages or an unexpected
     * number of runner events/results, and a \LogicException for an action that is not handled here.
     *
     * @return int always Command::SUCCESS once the loop has finished; a failed connection is only reported
     *             on the error output (see the @TODO on the rejection handler)
     *
     * @throws ParallelisationException when `--identifier` is missing, `--port` is not numeric,
     *                                  or the runner cannot be created
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $errorOutput = $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output;
        $identifier = $input->getOption('identifier');
        $port = $input->getOption('port');

        if (null === $identifier || !is_numeric($port)) {
            throw new ParallelisationException('Missing parallelisation options');
        }

        try {
            $runner = $this->createRunner($input);
        } catch (\Throwable $e) {
            // Keep the original failure as "previous", so the root cause is not lost.
            throw new ParallelisationException('Unable to create runner: '.$e->getMessage(), 0, $e);
        }

        $loop = new StreamSelectLoop();
        $tcpConnector = new TcpConnector($loop);
        $tcpConnector
            ->connect(\sprintf('127.0.0.1:%d', $port))
            ->then(
                /** @codeCoverageIgnore */
                function (ConnectionInterface $connection) use ($loop, $runner, $identifier): void {
                    $out = new Encoder($connection, \JSON_INVALID_UTF8_IGNORE);
                    $in = new Decoder($connection, true, 512, \JSON_INVALID_UTF8_IGNORE);

                    // [REACT] Initialise connection with the parallelisation operator
                    $out->write(['action' => ParallelAction::WORKER_HELLO, 'identifier' => $identifier]);

                    // Stream-level errors are forwarded to the operator instead of being handled locally.
                    $handleError = static function (\Throwable $error) use ($out): void {
                        $out->write([
                            'action' => ParallelAction::WORKER_ERROR_REPORT,
                            'class' => \get_class($error),
                            'message' => $error->getMessage(),
                            'file' => $error->getFile(),
                            'line' => $error->getLine(),
                            'code' => $error->getCode(),
                            'trace' => $error->getTraceAsString(),
                        ]);
                    };
                    $out->on('error', $handleError);
                    $in->on('error', $handleError);

                    // [REACT] Listen for messages from the parallelisation operator (analysis requests)
                    $in->on('data', function (array $json) use ($loop, $runner, $out): void {
                        // The payload comes from another process, so it is validated with real checks:
                        // assert() may be disabled via zend.assertions and would then let a malformed message through.
                        if (!isset($json['action'])) {
                            throw new ParallelisationException('Message received from the parallelisation operator does not contain an action.');
                        }

                        $action = $json['action'];

                        // Parallelisation operator does not have more to do, let's close the connection
                        if (ParallelAction::RUNNER_THANK_YOU === $action) {
                            // no payload to validate

                            $loop->stop();

                            return;
                        }

                        if (ParallelAction::RUNNER_REQUEST_ANALYSIS !== $action) {
                            // At this point we only expect analysis requests, if any other action happen, we need to fix the code.
                            throw new \LogicException(\sprintf('Unexpected action ParallelAction::%s.', $action));
                        }

                        // Without this check a request lacking "files" would only emit warnings and then ask
                        // for the next chunk, silently dropping the work.
                        if (!is_iterable($json['files'] ?? null)) {
                            throw new ParallelisationException('Analysis request received from the parallelisation operator does not contain a list of files.');
                        }

                        /** @var iterable<int, string> $files */
                        $files = $json['files'];

                        foreach ($files as $path) {
                            // Reset events because we want to collect only those coming from analysed files chunk
                            $this->events = [];
                            $runner->setFileIterator(new \ArrayIterator([new \SplFileInfo($path)]));
                            $analysisResult = $runner->fix();

                            // Exactly one event is expected per file, as its hash and status are read from it below.
                            if (1 !== \count($this->events)) {
                                throw new ParallelisationException('Runner did not report a fixing event or reported too many.');
                            }

                            // Zero results is allowed here; array_pop() then yields null for "fixInfo".
                            if (1 < \count($analysisResult)) {
                                throw new ParallelisationException('Runner returned more analysis results than expected.');
                            }

                            $out->write([
                                'action' => ParallelAction::WORKER_RESULT,
                                'errors' => $this->errorsManager->forPath($path),
                                'file' => $path,
                                'fileHash' => $this->events[0]->getFileHash(),
                                'fixInfo' => array_pop($analysisResult),
                                'memoryUsage' => memory_get_peak_usage(true),
                                'status' => $this->events[0]->getStatus(),
                            ]);
                        }

                        // Request another file chunk (if available, the parallelisation operator will request new "run" action)
                        $out->write(['action' => ParallelAction::WORKER_GET_FILE_CHUNK]);
                    });
                },
                static function (\Throwable $error) use ($errorOutput): void {
                    // @TODO Verify onRejected behaviour → https://github.com/PHP-CS-Fixer/PHP-CS-Fixer/pull/7777#discussion_r1590399285
                    $errorOutput->writeln($error->getMessage());
                },
            )
        ;

        $loop->run();

        return Command::SUCCESS;
    }
206

207
    /**
     * Builds the Runner used to fix the files requested by the parallelisation server.
     *
     * Side effects: registers a FileProcessed listener that appends each event to $this->events,
     * and stores the ConfigurationResolver on the instance.
     *
     * @throws \RuntimeException when a real config file (anything but the "ignore config file" marker)
     *                           is passed together with `--rules`
     */
    private function createRunner(InputInterface $input): Runner
    {
        $configOption = $input->getOption('config');
        $rulesOption = $input->getOption('rules');

        $usesConfigFile = null !== $configOption && ConfigurationResolver::IGNORE_CONFIG_FILE !== $configOption;

        if ($usesConfigFile && null !== $rulesOption) {
            throw new \RuntimeException('Passing both `--config` and `--rules` options is not allowed');
        }

        // Fixing a single file has no single source of truth for its status, so it is gathered from events.
        $this->eventDispatcher->addListener(FileProcessed::NAME, function (FileProcessed $fileProcessed): void {
            $this->events[] = $fileProcessed;
        });

        $resolverOptions = [
            'allow-risky' => $input->getOption('allow-risky'),
            'config' => $configOption,
            'dry-run' => $input->getOption('dry-run'),
            'rules' => $rulesOption,
            'path' => [],
            // IMPORTANT! The worker only receives files that already passed filtering, hence PATH_MODE_OVERRIDE.
            'path-mode' => ConfigurationResolver::PATH_MODE_OVERRIDE,
            'using-cache' => $input->getOption('using-cache'),
            'cache-file' => $input->getOption('cache-file'),
            'diff' => $input->getOption('diff'),
            'stop-on-violation' => $input->getOption('stop-on-violation'),
        ];

        $resolver = new ConfigurationResolver(
            new Config(),
            $resolverOptions,
            getcwd(), // @phpstan-ignore argument.type
            $this->toolInfo,
        );
        $this->configurationResolver = $resolver;

        return new Runner(
            null, // Paths are not known yet; they arrive when the parallelisation server sends a chunk
            $resolver->getFixers(),
            $resolver->getDiffer(),
            $this->eventDispatcher,
            $this->errorsManager,
            $resolver->getLinter(),
            $resolver->isDryRun(),
            new NullCacheManager(), // IMPORTANT! Cache is read and written by the main process, so the worker uses a null cache.
            $resolver->getDirectory(),
            $resolver->shouldStopOnViolation(),
            ParallelConfigFactory::sequential(), // IMPORTANT! Worker must run in sequential mode.
            null,
            $resolver->getConfigFile(),
            $resolver->getRuleCustomisationPolicy(),
        );
    }
256
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc