• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

basis-company / nats.php / 21854883870

10 Feb 2026 06:54AM UTC coverage: 94.916% (+0.006%) from 94.91%
21854883870

push

github

nekufa
stream validation exception fix

1 of 1 new or added line in 1 file covered. (100.0%)

20 existing lines in 1 file now uncovered.

1419 of 1495 relevant lines covered (94.92%)

18.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.63
/src/Connection.php
1
<?php
2

3
declare(strict_types=1);
4

5
namespace Basis\Nats;
6

7
use Basis\Nats\Message\Connect;
8
use Basis\Nats\Message\Factory;
9
use Basis\Nats\Message\Info;
10
use Basis\Nats\Message\Msg;
11
use Basis\Nats\Message\Ok;
12
use Basis\Nats\Message\Ping;
13
use Basis\Nats\Message\Publish;
14
use Basis\Nats\Message\Pong;
15
use Basis\Nats\Message\Prototype as Message;
16
use Basis\Nats\Message\Subscribe;
17
use LogicException;
18
use Psr\Log\LoggerInterface;
19
use Throwable;
20
use Exception;
21

22
class Connection
23
{
24
    private $socket;
25
    private $context;
26

27
    private float $activityAt = 0;
28
    private float $pingAt = 0;
29
    private float $pongAt = 0;
30
    private float $prolongateTill = 0;
31
    private int $packetSize = 1024;
32

33
    private ?Authenticator $authenticator;
34
    private Configuration $config;
35
    private Connect $connectMessage;
36
    private Info $infoMessage;
37

38
    public function __construct(
63✔
39
        private Client $client,
40
        public ?LoggerInterface $logger = null,
41
    ) {
42
        $this->authenticator = Authenticator::create($client->configuration);
63✔
43
        $this->config = $client->configuration;
63✔
44
    }
45

46
    public function getConnectMessage(): Connect
1✔
47
    {
48
        return $this->connectMessage;
1✔
49
    }
50

51
    public function getInfoMessage(): Info
2✔
52
    {
53
        return $this->infoMessage;
2✔
54
    }
55

56
    public function getMessage(null|int|float $timeout = 0): ?Message
62✔
57
    {
58
        $now = microtime(true);
62✔
59
        $max = $now + $timeout;
62✔
60
        $iteration = 0;
62✔
61

62
        while (true) {
62✔
63
            if (!is_resource($this->socket) || feof($this->socket)) {
62✔
64
                throw new LogicException('supplied resource is not a valid stream resource');
1✔
65
            }
66
            $message = null;
62✔
67
            $line = stream_get_line($this->socket, 1024, "\r\n");
62✔
68
            $now = microtime(true);
62✔
69
            if ($line) {
62✔
70
                $message = Factory::create($line);
62✔
71
                $this->activityAt = $now;
62✔
72
                if ($message instanceof Msg) {
62✔
73
                    $payload = $this->getPayload($message->length);
62✔
74
                    $message->parse($payload);
62✔
75
                    $message->setClient($this->client);
62✔
76
                    $this->logger?->debug('receive ' . $line . $payload);
62✔
77
                    return $message;
62✔
78
                }
79
                $this->logger?->debug('receive ' . $line);
62✔
80
                if ($message instanceof Ok) {
62✔
UNCOV
81
                    continue;
×
82
                } elseif ($message instanceof Ping) {
62✔
UNCOV
83
                    $this->sendMessage(new Pong([]));
×
84
                } elseif ($message instanceof Pong) {
62✔
85
                    $this->pongAt = $now;
7✔
86
                } elseif ($message instanceof Info) {
62✔
87
                    if (isset($message->tls_verify) && $message->tls_verify && !$this->config->tlsHandshakeFirst) {
62✔
88
                        $this->enableTls(true);
4✔
89
                    } elseif (isset($message->tls_required) && $message->tls_required && !$this->config->tlsHandshakeFirst) {
62✔
UNCOV
90
                        $this->enableTls(false);
×
91
                    }
92
                    return $message;
62✔
93
                }
94
            } elseif ($this->activityAt && $this->activityAt + $this->config->timeout < $now) {
62✔
95
                if ($this->pingAt && $this->pingAt + $this->config->pingInterval < $now) {
10✔
UNCOV
96
                    if ($this->prolongateTill && $this->prolongateTill < $now) {
×
UNCOV
97
                        $this->sendMessage(new Ping());
×
98
                    }
99
                }
100
            }
101
            if ($now > $max) {
62✔
102
                break;
14✔
103
            }
104
            if ($message && $now < $max) {
62✔
105
                $this->logger?->debug('sleep', compact('max', 'now'));
7✔
106
                $this->config->delay($iteration++);
7✔
107
            }
108
        }
109

110
        if ($this->activityAt && $this->activityAt + $this->config->timeout < $now) {
14✔
111
            if ($this->pongAt && $this->pongAt + $this->config->pingInterval < $now) {
4✔
UNCOV
112
                if ($this->prolongateTill && $this->prolongateTill < $now) {
×
UNCOV
113
                    $this->processException(new LogicException('Socket read timeout'));
×
114
                }
115
            }
116
        }
117

118
        return null;
14✔
119
    }
120

121
    public function ping(): bool
12✔
122
    {
123
        $this->sendMessage(new Ping());
12✔
124
        $this->getMessage($this->config->timeout);
8✔
125

126
        return $this->pingAt <= $this->pongAt;
7✔
127
    }
128

129
    public function sendMessage(Message $message)
62✔
130
    {
131
        $this->init();
62✔
132

133
        $line = $message->render() . "\r\n";
62✔
134
        $length = strlen($line);
62✔
135
        $total = 0;
62✔
136

137
        $this->logger?->debug('send ' . $line);
62✔
138

139
        while ($total < $length) {
62✔
140
            try {
141
                $written = @fwrite($this->socket, substr($line, $total, $this->packetSize));
62✔
142
                if ($written === false) {
62✔
143
                    throw new LogicException('Error sending data');
×
144
                }
145
                if ($written === 0) {
62✔
UNCOV
146
                    throw new LogicException('Broken pipe or closed connection');
×
147
                }
148
                $total += $written;
62✔
149

150
                if ($length == $total) {
62✔
151
                    break;
62✔
152
                }
153
            } catch (Throwable $e) {
1✔
154
                $this->processException($e);
1✔
155
                $line = $message->render() . "\r\n";
1✔
156
            }
157
        }
158

159
        unset($line);
62✔
160

161
        if ($message instanceof Publish) {
62✔
162
            if (strpos($message->subject, '$JS.API.CONSUMER.MSG.NEXT.') === 0) {
62✔
163
                $prolongate = $message->payload->expires / 1_000_000_000;
13✔
164
                $this->prolongateTill = microtime(true) + $prolongate;
13✔
165
            }
166
        }
167
        if ($message instanceof Ping) {
62✔
168
            $this->pingAt = microtime(true);
8✔
169
        }
170
    }
171

UNCOV
172
    public function setLogger(?LoggerInterface $logger)
×
173
    {
UNCOV
174
        $this->logger = $logger;
×
175
    }
176

177
    public function setTimeout(float $value)
62✔
178
    {
179
        $this->init();
62✔
180
        $seconds = (int) floor($value);
62✔
181
        $milliseconds = (int) (1000 * ($value - $seconds));
62✔
182

183
        stream_set_timeout($this->socket, $seconds, $milliseconds);
62✔
184
    }
185

186
    protected function init()
62✔
187
    {
188
        if ($this->socket) {
62✔
189
            return $this;
62✔
190
        }
191

192
        $config = $this->config;
62✔
193
        $dsn = "$config->host:$config->port";
62✔
194
        $flags = STREAM_CLIENT_CONNECT;
62✔
195
        $this->context = stream_context_create();
62✔
196
        $this->socket = @stream_socket_client($dsn, $error, $errorMessage, $config->timeout, $flags, $this->context);
62✔
197

198
        if ($error || !$this->socket) {
62✔
199
            throw new Exception($errorMessage ?: "Connection error", $error);
1✔
200
        }
201

202
        $this->setTimeout($config->timeout);
62✔
203

204
        if ($config->tlsHandshakeFirst) {
62✔
UNCOV
205
            $this->enableTls(true);
×
206
        }
207

208
        $this->connectMessage = new Connect($config->getOptions());
62✔
209

210
        if ($this->client->getName()) {
62✔
211
            $this->connectMessage->name = $this->client->getName();
1✔
212
        }
213

214
        $this->infoMessage = $this->getMessage($config->timeout);
62✔
215
        assert($this->infoMessage instanceof Info);
216

217
        if (isset($this->infoMessage->nonce) && $this->authenticator) {
62✔
218
            $this->connectMessage->sig = $this->authenticator->sign($this->infoMessage->nonce);
1✔
219
            $this->connectMessage->nkey = $this->authenticator->getPublicKey();
1✔
220
        }
221

222
        $this->sendMessage($this->connectMessage);
62✔
223
    }
224

225
    protected function enableTls(bool $requireClientCert): void
4✔
226
    {
227
        if ($requireClientCert) {
4✔
228
            if (!empty($this->config->tlsKeyFile)) {
4✔
229
                if (!file_exists($this->config->tlsKeyFile)) {
3✔
230
                    throw new Exception("tlsKeyFile file does not exist: " . $this->config->tlsKeyFile);
1✔
231
                }
232
                stream_context_set_option($this->context, 'ssl', 'local_pk', $this->config->tlsKeyFile);
2✔
233
            }
234
            if (!empty($this->config->tlsCertFile)) {
3✔
235
                if (!file_exists($this->config->tlsCertFile)) {
2✔
236
                    throw new Exception("tlsCertFile file does not exist: " . $this->config->tlsCertFile);
1✔
237
                }
238
                stream_context_set_option($this->context, 'ssl', 'local_cert', $this->config->tlsCertFile);
1✔
239
            }
240
        }
241

242
        if (!empty($this->config->tlsCaFile)) {
2✔
243
            if (!file_exists($this->config->tlsCaFile)) {
2✔
244
                throw new Exception("tlsCaFile file does not exist: " . $this->config->tlsCaFile);
1✔
245
            }
246
            stream_context_set_option($this->context, 'ssl', 'cafile', $this->config->tlsCaFile);
1✔
247
        }
248

249
        if (!stream_socket_enable_crypto($this->socket, true, STREAM_CRYPTO_METHOD_TLSv1_2_CLIENT)) {
1✔
UNCOV
250
            throw new Exception('Failed to connect: Error enabling TLS');
×
251
        }
252
    }
253

254
    protected function getPayload(int $length): string
62✔
255
    {
256
        $payload = '';
62✔
257
        $iteration = 0;
62✔
258
        while (strlen($payload) < $length) {
62✔
259
            $payloadLine = stream_get_line($this->socket, $length, '');
62✔
260
            if (!$payloadLine) {
62✔
261
                if ($iteration > 16) {
×
262
                    break;
×
263
                }
UNCOV
264
                $this->config->delay($iteration++);
×
265
                continue;
×
266
            }
267
            if (strlen($payloadLine) != $length) {
62✔
UNCOV
268
                $this->logger?->debug(
×
UNCOV
269
                    'got ' . strlen($payloadLine) . '/' . $length . ': ' . $payloadLine
×
UNCOV
270
                );
×
271
            }
272
            $payload .= $payloadLine;
62✔
273
        }
274
        return $payload;
62✔
275
    }
276

277
    private function processException(Throwable $e)
1✔
278
    {
279
        $this->logger?->error($e->getMessage(), ['exception' => $e]);
1✔
280

281
        if (!$this->config->reconnect) {
1✔
UNCOV
282
            throw $e;
×
283
        }
284

285
        $iteration = 0;
1✔
286

287
        while (true) {
1✔
288
            try {
289
                $this->socket = null;
1✔
290
                $this->init();
1✔
UNCOV
291
            } catch (Throwable $e) {
×
UNCOV
292
                $this->config->delay($iteration++);
×
UNCOV
293
                continue;
×
294
            }
295
            break;
1✔
296
        }
297

298
        foreach ($this->client->getSubscriptions() as $subscription) {
1✔
299
            $this->sendMessage(new Subscribe([
1✔
300
                'sid' => $subscription['sid'],
1✔
301
                'subject' => $subscription['name'],
1✔
302
            ]));
1✔
303
        }
304

305
        if ($this->client->requestsSubscribed()) {
1✔
306
            $this->client->subscribeRequests(true);
1✔
307
        }
308
    }
309

310
    public function setPacketSize(int $size): void
1✔
311
    {
312
        $this->packetSize = $size;
1✔
313
    }
314

315
    public function close(): void
62✔
316
    {
317
        if ($this->socket) {
62✔
318
            fclose($this->socket);
62✔
319
            $this->socket = null;
62✔
320
        }
321
    }
322
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc