• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

keradus / PHP-CS-Fixer / 17319949156

29 Aug 2025 09:20AM UTC coverage: 94.696% (-0.05%) from 94.744%
17319949156

push

github

keradus
CS

28333 of 29920 relevant lines covered (94.7%)

45.63 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

13.89
/src/Runner/Parallel/Process.php
1
<?php
2

3
declare(strict_types=1);
4

5
/*
6
 * This file is part of PHP CS Fixer.
7
 *
8
 * (c) Fabien Potencier <fabien@symfony.com>
9
 *     Dariusz RumiƄski <dariusz.ruminski@gmail.com>
10
 *
11
 * This source file is subject to the MIT license that is bundled
12
 * with this source code in the file LICENSE.
13
 */
14

15
namespace PhpCsFixer\Runner\Parallel;
16

17
use React\ChildProcess\Process as ReactProcess;
18
use React\EventLoop\LoopInterface;
19
use React\EventLoop\TimerInterface;
20
use React\Stream\ReadableStreamInterface;
21
use React\Stream\WritableStreamInterface;
22

23
/**
24
 * Represents single process that is handled within parallel run.
25
 * Inspired by:
26
 *   - https://github.com/phpstan/phpstan-src/blob/9ce425bca5337039fb52c0acf96a20a2b8ace490/src/Parallel/Process.php
27
 *   - https://github.com/phpstan/phpstan-src/blob/1477e752b4b5893f323b6d2c43591e68b3d85003/src/Process/ProcessHelper.php.
28
 *
29
 * @author Greg Korba <greg@codito.dev>
30
 *
31
 * @internal
32
 *
33
 * @no-named-arguments Parameter names are not covered by the backward compatibility promise.
34
 */
35
final class Process
36
{
37
    // Properties required for process instantiation
38
    private string $command;
39
    private LoopInterface $loop;
40
    private int $timeoutSeconds;
41

42
    // Properties required for process execution
43
    private ?ReactProcess $process = null;
44
    private ?WritableStreamInterface $in = null;
45

46
    /** @var resource */
47
    private $stdErr;
48

49
    /** @var resource */
50
    private $stdOut;
51

52
    /** @var callable(array<array-key, mixed>): void */
53
    private $onData;
54

55
    /** @var callable(\Throwable): void */
56
    private $onError;
57

58
    private ?TimerInterface $timer = null;
59

60
    public function __construct(string $command, LoopInterface $loop, int $timeoutSeconds)
61
    {
62
        $this->command = $command;
1✔
63
        $this->loop = $loop;
1✔
64
        $this->timeoutSeconds = $timeoutSeconds;
1✔
65
    }
66

67
    /**
68
     * @param callable(array<array-key, mixed> $json): void  $onData  callback to be called when data is received from the parallelisation operator
69
     * @param callable(\Throwable $exception): void          $onError callback to be called when an exception occurs
70
     * @param callable(?int $exitCode, string $output): void $onExit  callback to be called when the process exits
71
     */
72
    public function start(callable $onData, callable $onError, callable $onExit): void
73
    {
74
        $stdOut = tmpfile();
×
75
        if (false === $stdOut) {
×
76
            throw new ParallelisationException('Failed creating temp file for stdOut.');
×
77
        }
78
        $this->stdOut = $stdOut;
×
79

80
        $stdErr = tmpfile();
×
81
        if (false === $stdErr) {
×
82
            throw new ParallelisationException('Failed creating temp file for stdErr.');
×
83
        }
84
        $this->stdErr = $stdErr;
×
85

86
        $this->onData = $onData;
×
87
        $this->onError = $onError;
×
88

89
        $this->process = new ReactProcess($this->command, null, null, [
×
90
            1 => $this->stdOut,
×
91
            2 => $this->stdErr,
×
92
        ]);
×
93
        $this->process->start($this->loop);
×
94
        $this->process->on('exit', function ($exitCode) use ($onExit): void {
×
95
            $this->cancelTimer();
×
96

97
            $output = '';
×
98
            rewind($this->stdOut);
×
99
            $stdOut = stream_get_contents($this->stdOut);
×
100
            if (\is_string($stdOut)) {
×
101
                $output .= $stdOut;
×
102
            }
103

104
            rewind($this->stdErr);
×
105
            $stdErr = stream_get_contents($this->stdErr);
×
106
            if (\is_string($stdErr)) {
×
107
                $output .= $stdErr;
×
108
            }
109

110
            $onExit($exitCode, $output);
×
111

112
            fclose($this->stdOut);
×
113
            fclose($this->stdErr);
×
114
        });
×
115
    }
116

117
    /**
118
     * Handles requests from parallelisation operator to its worker (spawned process).
119
     *
120
     * @param array<array-key, mixed> $data
121
     */
122
    public function request(array $data): void
123
    {
124
        $this->cancelTimer(); // Configured process timeout actually means "chunk timeout" (each request resets timer)
1✔
125

126
        if (null === $this->in) {
1✔
127
            throw new ParallelisationException(
1✔
128
                'Process not connected with parallelisation operator, ensure `bindConnection()` was called'
1✔
129
            );
1✔
130
        }
131

132
        $this->in->write($data);
×
133
        $this->timer = $this->loop->addTimer($this->timeoutSeconds, function (): void {
×
134
            ($this->onError)(
×
135
                new \Exception(
×
136
                    \sprintf(
×
137
                        'Child process timed out after %d seconds. Try making it longer using `ParallelConfig`.',
×
138
                        $this->timeoutSeconds
×
139
                    )
×
140
                )
×
141
            );
×
142
        });
×
143
    }
144

145
    public function quit(): void
146
    {
147
        $this->cancelTimer();
×
148
        if (null === $this->process || !$this->process->isRunning()) {
×
149
            return;
×
150
        }
151

152
        foreach ($this->process->pipes as $pipe) {
×
153
            $pipe->close();
×
154
        }
155

156
        if (null === $this->in) {
×
157
            return;
×
158
        }
159

160
        $this->in->end();
×
161
    }
162

163
    public function bindConnection(ReadableStreamInterface $out, WritableStreamInterface $in): void
164
    {
165
        $this->in = $in;
×
166

167
        $in->on('error', function (\Throwable $error): void {
×
168
            ($this->onError)($error);
×
169
        });
×
170

171
        $out->on('data', function (array $json): void {
×
172
            $this->cancelTimer();
×
173

174
            // Pass everything to the parallelisation operator, it should decide how to handle the data
175
            ($this->onData)($json);
×
176
        });
×
177
        $out->on('error', function (\Throwable $error): void {
×
178
            ($this->onError)($error);
×
179
        });
×
180
    }
181

182
    private function cancelTimer(): void
183
    {
184
        if (null === $this->timer) {
1✔
185
            return;
1✔
186
        }
187

188
        $this->loop->cancelTimer($this->timer);
×
189
        $this->timer = null;
×
190
    }
191
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc