• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

basis-company / nats.php / 27822845282

19 Jun 2026 11:24AM UTC coverage: 94.8% (-0.2%) from 94.967%
27822845282

push

github

nekufa
Fix issue 92.

6 of 8 new or added lines in 1 file covered. (75.0%)

1 existing line in 1 file now uncovered.

1568 of 1654 relevant lines covered (94.8%)

21.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.77
/src/Connection.php
1
<?php
2

3
declare(strict_types=1);
4

5
namespace Basis\Nats;
6

7
use Basis\Nats\Message\Connect;
8
use Basis\Nats\Message\Factory;
9
use Basis\Nats\Message\Info;
10
use Basis\Nats\Message\Msg;
11
use Basis\Nats\Message\Ok;
12
use Basis\Nats\Message\Ping;
13
use Basis\Nats\Message\Publish;
14
use Basis\Nats\Message\Pong;
15
use Basis\Nats\Message\Prototype as Message;
16
use Basis\Nats\Message\Subscribe;
17
use LogicException;
18
use Psr\Log\LoggerInterface;
19
use Throwable;
20
use Exception;
21

22
class Connection
23
{
24
    /** @var resource|false|null Stream returned by stream_socket_client(); reset to null on close/reconnect. */
    private $socket;

    /** @var resource|null Stream context created in init() and given the TLS options in enableTls(). */
    private $context;

    /** Timestamp (microtime) of the last line received from the socket. */
    private float $activityAt = 0.0;

    /** Timestamp (microtime) of the last Ping sent. */
    private float $pingAt = 0.0;

    /** Timestamp (microtime) of the last Pong received. */
    private float $pongAt = 0.0;

    /** Timestamp computed from the `expires` value of the last consumer "MSG.NEXT" publish. */
    private float $prolongateTill = 0.0;

    /** Maximum number of bytes handed to a single fwrite() call. */
    private int $packetSize = 1024;

    private ?Authenticator $authenticator;
    private Configuration $config;

    /** Assigned in init(); uninitialised until then. */
    private Connect $connectMessage;

    /** Assigned in init(); uninitialised until then. */
    private Info $infoMessage;
37

38
    /**
     * Binds the connection to its client and caches the client's configuration.
     *
     * The constructor itself performs no socket I/O.
     */
    public function __construct(
        private Client $client,
        public ?LoggerInterface $logger = null,
    ) {
        $configuration = $client->configuration;

        $this->config = $configuration;
        $this->authenticator = Authenticator::create($configuration);
    }
45

46
    /**
     * Returns the Connect message stored on this connection.
     *
     * The property is only assigned inside init().
     */
    public function getConnectMessage(): Connect
    {
        return $this->connectMessage;
    }
50

51
    /**
     * Returns the Info message stored on this connection.
     *
     * The property is only assigned inside init(), after the server's Info was read.
     */
    public function getInfoMessage(): Info
    {
        return $this->infoMessage;
    }
55

56
    /**
     * Reads the next protocol message from the socket.
     *
     * Ok messages are skipped and a Ping is answered with a Pong; Msg, Pong and
     * Info messages are returned to the caller. An Info message advertising TLS
     * triggers enableTls() unless the configuration asks for a TLS-first handshake.
     *
     * @param null|int|float $timeout Seconds to wait. null uses the configured timeout;
     *                                zero (int or float) performs a non-blocking poll.
     *
     * @return Message|null The received message, or null when the wait ended without one.
     *
     * @throws LogicException When the socket is not a valid stream resource or is at EOF.
     */
    public function getMessage(null|int|float $timeout = 0): ?Message
    {
        // null means use config timeout, 0 means non-blocking check
        if ($timeout === null) {
            $timeout = $this->config->timeout;
        }

        // Compare numerically: a strict `=== 0` does not match the float 0.0, which
        // would then be handled as an unbounded wait that spins on stream_select().
        $nonBlocking = (float) $timeout === 0.0;

        $now = microtime(true);
        $max = $timeout > 0 ? $now + $timeout : PHP_FLOAT_MAX;
        $iteration = 0;

        while (true) {
            if (!is_resource($this->socket) || feof($this->socket)) {
                throw new LogicException('supplied resource is not a valid stream resource');
            }

            $remainingTimeout = $max - microtime(true);
            if ($remainingTimeout <= 0) {
                break;
            }

            $read = [$this->socket];
            $write = null;
            $except = null;

            // Calculate timeout for stream_select
            if ($nonBlocking) {
                // Non-blocking check - zero timeout only tests whether data is available
                $seconds = 0;
                $microseconds = 0;
            } else {
                $seconds = (int) floor($remainingTimeout);
                $microseconds = (int) (($remainingTimeout - $seconds) * 1_000_000);
            }

            $result = stream_select($read, $write, $except, $seconds, $microseconds);

            if ($result === false || $result === 0) {
                // For a non-blocking check, exit immediately
                if ($nonBlocking) {
                    break;
                }
                // For blocking calls, continue waiting
                continue;
            }

            $message = null;
            $line = stream_get_line($this->socket, 1024, "\r\n");
            $now = microtime(true);
            if ($line) {
                $message = Factory::create($line);
                $this->activityAt = $now;
                if ($message instanceof Msg) {
                    // The header line carries the payload length; the body follows it.
                    $payload = $this->getPayload($message->length);
                    $message->parse($payload);
                    $message->setClient($this->client);
                    $this->logger?->debug('receive ' . $line . $payload);
                    return $message;
                }
                $this->logger?->debug('receive ' . $line);
                if ($message instanceof Ok) {
                    continue;
                } elseif ($message instanceof Ping) {
                    $this->sendMessage(new Pong([]));
                } elseif ($message instanceof Pong) {
                    $this->pongAt = $now;
                    return $message;
                } elseif ($message instanceof Info) {
                    if (isset($message->tls_verify) && $message->tls_verify && !$this->config->tlsHandshakeFirst) {
                        $this->enableTls(true);
                    } elseif (isset($message->tls_required) && $message->tls_required && !$this->config->tlsHandshakeFirst) {
                        $this->enableTls(false);
                    }
                    return $message;
                }
            } elseif ($this->activityAt && $this->activityAt + $this->config->timeout < $now) {
                // Nothing readable and the connection has been idle longer than the timeout:
                // send a Ping once the ping interval and the prolongation window have passed.
                if ($this->pingAt && $this->pingAt + $this->config->pingInterval < $now) {
                    if ($this->prolongateTill && $this->prolongateTill < $now) {
                        $this->sendMessage(new Ping());
                    }
                }
            }
            if ($message && $now < $max) {
                $this->logger?->debug('sleep', compact('max', 'now'));
                $this->config->delay($iteration++);
            }
        }

        // NOTE(review): $now is the timestamp taken at entry or at the last read; it is not
        // refreshed when the loop ends on a select timeout — confirm this is intended.
        if ($this->activityAt && $this->activityAt + $this->config->timeout < $now) {
            if ($this->pongAt && $this->pongAt + $this->config->pingInterval < $now) {
                if ($this->prolongateTill && $this->prolongateTill < $now) {
                    $this->processException(new LogicException('Socket read timeout'));
                }
            }
        }

        return null;
    }
154

155
    /**
     * Sends a Ping, then waits up to the configured timeout for one incoming message.
     *
     * @return bool true when the recorded pong time is not earlier than the recorded ping time.
     */
    public function ping(): bool
    {
        $this->sendMessage(new Ping());

        $waitFor = $this->config->timeout;
        $this->getMessage($waitFor);

        return $this->pongAt >= $this->pingAt;
    }
162

163
    /**
     * Renders a message and writes it to the socket in chunks of at most $packetSize bytes.
     *
     * A write failure is passed to processException(), which either rethrows or
     * reconnects; after a reconnect the message is sent again from its first byte.
     *
     * Side effects: a Publish to a `$JS.API.CONSUMER.MSG.NEXT.` subject updates
     * $prolongateTill from the payload's `expires` value, and a Ping updates $pingAt.
     *
     * @param Message $message Message to send.
     *
     * @throws Throwable When writing fails and reconnecting is disabled in the configuration.
     */
    public function sendMessage(Message $message): void
    {
        $this->init();

        $line = $message->render() . "\r\n";
        $length = strlen($line);
        $total = 0;

        $this->logger?->debug('send ' . $line);

        while ($total < $length) {
            try {
                $written = @fwrite($this->socket, substr($line, $total, $this->packetSize));
                if ($written === false) {
                    throw new LogicException('Error sending data');
                }
                if ($written === 0) {
                    throw new LogicException('Broken pipe or closed connection');
                }
                $total += $written;
            } catch (Throwable $e) {
                $this->processException($e);

                // We are now on a fresh connection: bytes written before the failure
                // never formed a complete message there, so restart from offset zero
                // with a freshly rendered line instead of resuming mid-message.
                $line = $message->render() . "\r\n";
                $length = strlen($line);
                $total = 0;
            }
        }

        if ($message instanceof Publish) {
            if (str_starts_with($message->subject, '$JS.API.CONSUMER.MSG.NEXT.')) {
                // `expires` is divided by 1e9 before being added to a microtime() value in seconds.
                $prolongate = $message->payload->expires / 1_000_000_000;
                $this->prolongateTill = microtime(true) + $prolongate;
            }
        }
        if ($message instanceof Ping) {
            $this->pingAt = microtime(true);
        }
    }
205

206
    /**
     * Replaces the logger used for debug and error output.
     *
     * @param LoggerInterface|null $logger New logger, or null to stop logging.
     */
    public function setLogger(?LoggerInterface $logger): void
    {
        $this->logger = $logger;
    }
210

211
    /**
     * Applies a stream timeout to the socket, connecting first if necessary.
     *
     * @param float $value Timeout in seconds; the fractional part is passed on as microseconds.
     */
    public function setTimeout(float $value): void
    {
        $this->init();

        $wholeSeconds = (int) floor($value);
        $fraction = $value - $wholeSeconds;

        stream_set_timeout($this->socket, $wholeSeconds, (int) (1000000 * $fraction));
    }
219

220
    /**
     * Opens the socket and performs the initial handshake, unless already connected.
     *
     * Steps: connect to host:port, apply the configured stream timeout, optionally
     * enable TLS first, build the Connect message, read the server's Info message,
     * sign the nonce when an authenticator is available, and send Connect.
     *
     * If any step after the socket was opened fails, the socket is closed and reset
     * so that a later call starts a fresh handshake rather than reusing a connection
     * on which Connect was never sent.
     *
     * @throws Exception When the connection cannot be opened, no message arrives within
     *                   the configured timeout, or the first message is not an Info message.
     */
    protected function init(): void
    {
        if ($this->socket) {
            return;
        }

        $config = $this->config;
        $dsn = "$config->host:$config->port";
        $flags = STREAM_CLIENT_CONNECT;
        $this->context = stream_context_create();
        $this->socket = @stream_socket_client($dsn, $error, $errorMessage, $config->timeout, $flags, $this->context);

        if ($error || !$this->socket) {
            throw new Exception($errorMessage ?: "Connection error", $error);
        }

        try {
            $this->setTimeout($config->timeout);

            if ($config->tlsHandshakeFirst) {
                $this->enableTls(true);
            }

            $this->connectMessage = new Connect($config->getOptions());

            if ($this->client->getName()) {
                $this->connectMessage->name = $this->client->getName();
            }

            $infoMessage = $this->getMessage($config->timeout);
            if (is_null($infoMessage)) {
                throw new Exception("Timeout waiting for message from server.");
            }
            if (!$infoMessage instanceof Info) {
                throw new Exception("Received unexpected message type: " . $infoMessage::class);
            }
            $this->infoMessage = $infoMessage;

            if (isset($this->infoMessage->nonce) && $this->authenticator) {
                $this->connectMessage->sig = $this->authenticator->sign($this->infoMessage->nonce);
                $this->connectMessage->nkey = $this->authenticator->getPublicKey();
            }

            $this->sendMessage($this->connectMessage);
        } catch (Throwable $e) {
            // Drop the half-initialised connection; otherwise the guard at the top of
            // this method would treat it as ready on the next call.
            if (is_resource($this->socket)) {
                fclose($this->socket);
            }
            $this->socket = null;

            throw $e;
        }
    }
264

265
    /**
     * Copies the configured TLS files into the stream context and switches the socket to TLS.
     *
     * @param bool $requireClientCert When true, the configured key and certificate files
     *                                are also applied (as `local_pk` and `local_cert`).
     *
     * @throws Exception When a configured file does not exist or enabling crypto fails.
     */
    protected function enableTls(bool $requireClientCert): void
    {
        // Client private key: only applied when a client certificate is required.
        if ($requireClientCert && !empty($this->config->tlsKeyFile)) {
            if (!file_exists($this->config->tlsKeyFile)) {
                throw new Exception("tlsKeyFile file does not exist: " . $this->config->tlsKeyFile);
            }
            stream_context_set_option($this->context, 'ssl', 'local_pk', $this->config->tlsKeyFile);
        }

        // Client certificate: same condition as the key.
        if ($requireClientCert && !empty($this->config->tlsCertFile)) {
            if (!file_exists($this->config->tlsCertFile)) {
                throw new Exception("tlsCertFile file does not exist: " . $this->config->tlsCertFile);
            }
            stream_context_set_option($this->context, 'ssl', 'local_cert', $this->config->tlsCertFile);
        }

        // CA file: applied whenever it is configured.
        if (!empty($this->config->tlsCaFile)) {
            if (!file_exists($this->config->tlsCaFile)) {
                throw new Exception("tlsCaFile file does not exist: " . $this->config->tlsCaFile);
            }
            stream_context_set_option($this->context, 'ssl', 'cafile', $this->config->tlsCaFile);
        }

        $secured = stream_socket_enable_crypto($this->socket, true, STREAM_CRYPTO_METHOD_TLSv1_2_CLIENT);
        if (!$secured) {
            throw new Exception('Failed to connect: Error enabling TLS');
        }
    }
293

294
    /**
     * Reads a message body of the given size from the socket.
     *
     * Each read asks only for the bytes still missing, so a body that arrives in
     * several chunks never consumes data that follows it on the stream. Empty reads
     * are retried through the configured delay; after more than 16 empty reads the
     * method gives up and returns what it has.
     *
     * @param int $length Expected body size in bytes.
     *
     * @return string The bytes read; shorter than $length only if reading was abandoned.
     */
    protected function getPayload(int $length): string
    {
        $payload = '';
        $iteration = 0;
        while (($remaining = $length - strlen($payload)) > 0) {
            $payloadLine = stream_get_line($this->socket, $remaining, '');
            // Compare strictly: a chunk consisting of "0" is valid data, not "nothing read".
            if ($payloadLine === false || $payloadLine === '') {
                if ($iteration > 16) {
                    break;
                }
                $this->config->delay($iteration++);
                continue;
            }
            if (strlen($payloadLine) !== $length) {
                $this->logger?->debug(
                    'got ' . strlen($payloadLine) . '/' . $length . ': ' . $payloadLine
                );
            }
            $payload .= $payloadLine;
        }
        return $payload;
    }
316

317
    /**
     * Logs a failure and, when reconnecting is enabled, re-establishes the connection.
     *
     * Reconnecting retries init() until it succeeds, calling the configured delay
     * between attempts, then re-sends a Subscribe for every client subscription and
     * re-subscribes the request handler if the client reports it was subscribed.
     *
     * @param Throwable $e The failure to handle.
     *
     * @throws Throwable The given exception, when reconnecting is disabled.
     */
    private function processException(Throwable $e): void
    {
        $this->logger?->error($e->getMessage(), ['exception' => $e]);

        if (!$this->config->reconnect) {
            throw $e;
        }

        $attempt = 0;
        $connected = false;

        do {
            try {
                // Clearing the socket makes init() open a new connection.
                $this->socket = null;
                $this->init();
                $connected = true;
            } catch (Throwable $failure) {
                $this->config->delay($attempt++);
            }
        } while (!$connected);

        foreach ($this->client->getSubscriptions() as $entry) {
            $resubscribe = new Subscribe([
                'sid' => $entry['sid'],
                'subject' => $entry['name'],
            ]);
            $this->sendMessage($resubscribe);
        }

        if ($this->client->requestsSubscribed()) {
            $this->client->subscribeRequests(true);
        }
    }
349

350
    /**
     * Sets the maximum number of bytes written to the socket per fwrite() call.
     *
     * @param int $size Chunk size in bytes.
     */
    public function setPacketSize(int $size): void
    {
        $this->packetSize = $size;
    }
354

355
    /**
     * Closes the socket if one is open and forgets it; does nothing otherwise.
     */
    public function close(): void
    {
        if (!$this->socket) {
            return;
        }

        fclose($this->socket);
        $this->socket = null;
    }
362
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc