• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

basis-company / nats.php / 23218288118

17 Mar 2026 09:57PM UTC coverage: 94.977% (-0.1%) from 95.104%
23218288118

push

github

nekufa
pong msg should be returned from connection #119

1 of 1 new or added line in 1 file covered. (100.0%)

5 existing lines in 2 files now uncovered.

1456 of 1533 relevant lines covered (94.98%)

79.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.81
/src/Connection.php
1
<?php
2

3
declare(strict_types=1);
4

5
namespace Basis\Nats;
6

7
use Basis\Nats\Message\Connect;
8
use Basis\Nats\Message\Factory;
9
use Basis\Nats\Message\Info;
10
use Basis\Nats\Message\Msg;
11
use Basis\Nats\Message\Ok;
12
use Basis\Nats\Message\Ping;
13
use Basis\Nats\Message\Publish;
14
use Basis\Nats\Message\Pong;
15
use Basis\Nats\Message\Prototype as Message;
16
use Basis\Nats\Message\Subscribe;
17
use LogicException;
18
use Psr\Log\LoggerInterface;
19
use Throwable;
20
use Exception;
21

22
class Connection
23
{
24
    private $socket;
25
    private $context;
26

27
    private float $activityAt = 0;
28
    private float $pingAt = 0;
29
    private float $pongAt = 0;
30
    private float $prolongateTill = 0;
31
    private int $packetSize = 1024;
32

33
    private ?Authenticator $authenticator;
34
    private Configuration $config;
35
    private Connect $connectMessage;
36
    private Info $infoMessage;
37

38
    /**
     * Binds the connection to its owning client.
     *
     * The client's configuration is cached locally and is also handed to
     * Authenticator::create() to build the (nullable) authenticator.
     * No socket is opened here.
     *
     * @param Client               $client Owning client; its configuration is read once.
     * @param LoggerInterface|null $logger Optional logger for protocol traffic and errors.
     */
    public function __construct(
        private Client $client,
        public ?LoggerInterface $logger = null,
    ) {
        $this->config = $client->configuration;
        $this->authenticator = Authenticator::create($this->config);
    }
45

46
    /**
     * Returns the CONNECT message built by init().
     *
     * The property is typed and has no default, so reading it before init()
     * has assigned it raises an Error.
     */
    public function getConnectMessage(): Connect
    {
        return $this->connectMessage;
    }
50

51
    /**
     * Returns the INFO message stored by init().
     *
     * The property is typed and has no default, so reading it before init()
     * has assigned it raises an Error.
     */
    public function getInfoMessage(): Info
    {
        return $this->infoMessage;
    }
55

56
    /**
     * Waits for and returns the next message read from the socket.
     *
     * Timeout semantics:
     *  - null: use the configured timeout;
     *  - 0:    poll once per loop with a 100 microsecond select and no deadline;
     *  - > 0:  wait until that many seconds have elapsed.
     *
     * Handling per message type:
     *  - Msg:  payload is read and parsed, the client is attached, message returned;
     *  - Ok:   skipped, the loop keeps reading;
     *  - Ping: a Pong is sent back, then the loop backs off via config->delay();
     *  - Pong: pongAt is recorded and the message is returned;
     *  - Info: TLS is enabled when the server asks for it (unless the handshake
     *          was already done first), then the message is returned.
     *
     * @param null|int|float $timeout Seconds to wait, see above.
     *
     * @return Message|null The message, or null when select() reports nothing
     *                      readable / fails or the deadline passes.
     *
     * @throws LogicException When the socket is not a resource or is at EOF.
     */
    public function getMessage(null|int|float $timeout = 0): ?Message
    {
        // null means use config timeout, 0 means non-blocking check
        $timeout ??= $this->config->timeout;

        $now = microtime(true);
        $deadline = $timeout > 0 ? $now + $timeout : PHP_FLOAT_MAX;
        $attempt = 0;

        while (true) {
            $socketUsable = is_resource($this->socket) && !feof($this->socket);
            if (!$socketUsable) {
                throw new LogicException('supplied resource is not a valid stream resource');
            }

            $remaining = $deadline - microtime(true);
            if ($remaining <= 0) {
                break;
            }

            // Work out how long stream_select may block.
            if ($timeout == 0) {
                // Non-blocking check: 0.1ms is the minimal reasonable poll.
                $waitSeconds = 0;
                $waitMicroseconds = 100;
            } else {
                $waitSeconds = (int) floor($remaining);
                $waitMicroseconds = (int) (($remaining - $waitSeconds) * 1_000_000);
            }

            $readable = [$this->socket];
            $writable = null;
            $exceptional = null;
            $ready = stream_select($readable, $writable, $exceptional, $waitSeconds, $waitMicroseconds);

            // false (error) and 0 (nothing readable in time) both end the wait.
            if (!$ready) {
                break;
            }

            $message = null;
            $line = stream_get_line($this->socket, 1024, "\r\n");
            $now = microtime(true);

            if ($line) {
                $message = Factory::create($line);
                $this->activityAt = $now;

                if ($message instanceof Msg) {
                    $payload = $this->getPayload($message->length);
                    $message->parse($payload);
                    $message->setClient($this->client);
                    $this->logger?->debug('receive ' . $line . $payload);

                    return $message;
                }

                $this->logger?->debug('receive ' . $line);

                if ($message instanceof Ok) {
                    continue;
                }

                if ($message instanceof Ping) {
                    // Answer the server and fall through to the back-off below.
                    $this->sendMessage(new Pong([]));
                } elseif ($message instanceof Pong) {
                    $this->pongAt = $now;

                    return $message;
                } elseif ($message instanceof Info) {
                    if (!$this->config->tlsHandshakeFirst) {
                        if (!empty($message->tls_verify)) {
                            $this->enableTls(true);
                        } elseif (!empty($message->tls_required)) {
                            $this->enableTls(false);
                        }
                    }

                    return $message;
                }
            } elseif (
                $this->activityAt
                && $this->activityAt + $this->config->timeout < $now
                && $this->pingAt
                && $this->pingAt + $this->config->pingInterval < $now
                && $this->prolongateTill
                && $this->prolongateTill < $now
            ) {
                // Nothing read and the connection has been idle: probe the server.
                $this->sendMessage(new Ping());
            }

            if ($message && $now < $deadline) {
                $this->logger?->debug('sleep', ['max' => $deadline, 'now' => $now]);
                $this->config->delay($attempt++);
            }
        }

        // $now still holds the time of the last read (or of the call start).
        $readTimedOut = $this->activityAt
            && $this->activityAt + $this->config->timeout < $now
            && $this->pongAt
            && $this->pongAt + $this->config->pingInterval < $now
            && $this->prolongateTill
            && $this->prolongateTill < $now;

        if ($readTimedOut) {
            $this->processException(new LogicException('Socket read timeout'));
        }

        return null;
    }
149

150
    /**
     * Sends a Ping and waits up to the configured timeout for one message.
     *
     * sendMessage() stamps pingAt, and getMessage() stamps pongAt when a Pong
     * is read; whatever message getMessage() returns is discarded here.
     *
     * @return bool True when the last recorded Pong is not older than the Ping
     *              that was just sent.
     */
    public function ping(): bool
    {
        $this->sendMessage(new Ping());
        $this->getMessage($this->config->timeout);

        return $this->pongAt >= $this->pingAt;
    }
157

158
    /**
     * Renders a message and writes it to the socket, terminated by CRLF.
     *
     * The line is written in slices of at most $packetSize bytes. A failed or
     * zero-length write is passed to processException(), which either rethrows
     * (reconnect disabled) or re-establishes the connection. After a
     * reconnect the message is re-rendered and sent again from its first byte:
     * resuming at the old offset would hand the new connection only the tail of
     * a message and corrupt the protocol stream.
     *
     * Side effects after a successful write:
     *  - Publish to a '$JS.API.CONSUMER.MSG.NEXT.' subject: prolongateTill is
     *    moved to now + payload->expires / 1e9 (presumably expires is in
     *    nanoseconds - confirm against the payload definition);
     *  - Ping: pingAt is set to the current time.
     *
     * @param Message $message Message to send.
     *
     * @throws Throwable Whatever processException() rethrows when reconnecting
     *                   is disabled.
     */
    public function sendMessage(Message $message)
    {
        $this->init();

        $line = $message->render() . "\r\n";
        $length = strlen($line);
        $total = 0;

        $this->logger?->debug('send ' . $line);

        while ($total < $length) {
            try {
                $written = @fwrite($this->socket, substr($line, $total, $this->packetSize));
                if ($written === false) {
                    throw new LogicException('Error sending data');
                }
                if ($written === 0) {
                    throw new LogicException('Broken pipe or closed connection');
                }
                $total += $written;
            } catch (Throwable $e) {
                $this->processException($e);

                // We are on a fresh connection now: start the message over.
                $line = $message->render() . "\r\n";
                $length = strlen($line);
                $total = 0;
            }
        }

        if ($message instanceof Publish && str_starts_with($message->subject, '$JS.API.CONSUMER.MSG.NEXT.')) {
            $prolongate = $message->payload->expires / 1_000_000_000;
            $this->prolongateTill = microtime(true) + $prolongate;
        }

        if ($message instanceof Ping) {
            $this->pingAt = microtime(true);
        }
    }
200

201
    /**
     * Replaces the logger used for protocol traffic and error reporting.
     *
     * @param LoggerInterface|null $logger New logger, or null to disable logging.
     */
    public function setLogger(?LoggerInterface $logger)
    {
        $this->logger = $logger;
    }
205

206
    /**
     * Applies a read/write timeout to the socket, connecting first if needed.
     *
     * The value is split into whole seconds and the remaining microseconds,
     * which is the form stream_set_timeout() expects.
     *
     * @param float $value Timeout in seconds; the fractional part is honoured.
     */
    public function setTimeout(float $value)
    {
        $this->init();

        $wholeSeconds = (int) floor($value);
        $fraction = $value - $wholeSeconds;

        stream_set_timeout($this->socket, $wholeSeconds, (int) ($fraction * 1000000));
    }
214

215
    /**
     * Opens the socket and performs the connect handshake, once.
     *
     * When a socket already exists the method returns immediately. Otherwise,
     * in this order, it:
     *  1. connects to host:port with the configured timeout;
     *  2. applies the stream timeout;
     *  3. enables TLS straight away when tlsHandshakeFirst is set;
     *  4. builds the Connect message (adding the client name when there is one);
     *  5. reads the server's first message into infoMessage;
     *  6. signs the nonce, when one is offered and an authenticator exists;
     *  7. sends the Connect message.
     *
     * NOTE(review): infoMessage is a typed Info property, so step 5 fails with
     * a TypeError when getMessage() returns null or another message type,
     * before the assert() below is reached.
     *
     * @return $this|null $this when already connected, nothing otherwise.
     *
     * @throws Exception When the socket cannot be opened.
     */
    protected function init()
    {
        if ($this->socket) {
            return $this;
        }

        $config = $this->config;
        $address = $config->host . ':' . $config->port;

        $this->context = stream_context_create();
        $this->socket = @stream_socket_client(
            $address,
            $error,
            $errorMessage,
            $config->timeout,
            STREAM_CLIENT_CONNECT,
            $this->context,
        );

        if ($error || !$this->socket) {
            throw new Exception($errorMessage ?: "Connection error", $error);
        }

        $this->setTimeout($config->timeout);

        if ($config->tlsHandshakeFirst) {
            $this->enableTls(true);
        }

        $this->connectMessage = new Connect($config->getOptions());

        $clientName = $this->client->getName();
        if ($clientName) {
            $this->connectMessage->name = $clientName;
        }

        $this->infoMessage = $this->getMessage($config->timeout);
        assert($this->infoMessage instanceof Info);

        if (isset($this->infoMessage->nonce) && $this->authenticator) {
            $nonce = $this->infoMessage->nonce;
            $this->connectMessage->sig = $this->authenticator->sign($nonce);
            $this->connectMessage->nkey = $this->authenticator->getPublicKey();
        }

        $this->sendMessage($this->connectMessage);
    }
253

254
    /**
     * Upgrades the open socket to TLS 1.2.
     *
     * Configured certificate files are checked for existence and registered
     * on the stream context before the handshake. The client key and
     * certificate are only considered when $requireClientCert is true; the CA
     * file is always considered. Unset (empty) settings are skipped.
     *
     * @param bool $requireClientCert Whether to register the client key/cert.
     *
     * @throws Exception When a configured file is missing or the TLS handshake
     *                   fails.
     */
    protected function enableTls(bool $requireClientCert): void
    {
        // Configuration property => ssl context option, in validation order.
        $sslFiles = [];
        if ($requireClientCert) {
            $sslFiles['tlsKeyFile'] = 'local_pk';
            $sslFiles['tlsCertFile'] = 'local_cert';
        }
        $sslFiles['tlsCaFile'] = 'cafile';

        foreach ($sslFiles as $property => $sslOption) {
            if (empty($this->config->{$property})) {
                continue;
            }

            $path = $this->config->{$property};
            if (!file_exists($path)) {
                throw new Exception($property . ' file does not exist: ' . $path);
            }

            stream_context_set_option($this->context, 'ssl', $sslOption, $path);
        }

        $encrypted = stream_socket_enable_crypto($this->socket, true, STREAM_CRYPTO_METHOD_TLSv1_2_CLIENT);
        if (!$encrypted) {
            throw new Exception('Failed to connect: Error enabling TLS');
        }
    }
282

283
    /**
     * Reads exactly $length payload bytes from the socket, where possible.
     *
     * A single read may return fewer bytes than requested, so the payload is
     * assembled chunk by chunk. Each read asks only for the bytes still
     * missing, so it never consumes data that belongs to whatever follows the
     * payload on the stream. An empty read is retried with config->delay();
     * after more than 16 empty reads the loop gives up.
     *
     * @param int $length Number of payload bytes announced by the message.
     *
     * @return string The payload; shorter than $length only when the retry
     *                budget ran out.
     */
    protected function getPayload(int $length): string
    {
        $payload = '';
        $iteration = 0;

        while (($remaining = $length - strlen($payload)) > 0) {
            $chunk = stream_get_line($this->socket, $remaining, '');

            // Compare explicitly: the chunk "0" is valid data but falsy.
            if ($chunk === false || $chunk === '') {
                if ($iteration > 16) {
                    break;
                }
                $this->config->delay($iteration++);
                continue;
            }

            if (strlen($chunk) != $length) {
                // Partial read: log what arrived against the announced size.
                $this->logger?->debug(
                    'got ' . strlen($chunk) . '/' . $length . ': ' . $chunk
                );
            }

            $payload .= $chunk;
        }

        return $payload;
    }
305

306
    /**
     * Logs a connection failure and, when allowed, reconnects.
     *
     * With reconnect disabled the exception is rethrown unchanged. Otherwise
     * the socket is dropped and init() is retried, with config->delay()
     * between attempts, until it succeeds; there is no upper bound on the
     * number of attempts. Once connected, every subscription known to the
     * client is sent again and the request subscription is renewed if the
     * client had one.
     *
     * @param Throwable $e The failure that triggered recovery.
     *
     * @throws Throwable The given exception when reconnecting is disabled.
     */
    private function processException(Throwable $e)
    {
        $this->logger?->error($e->getMessage(), ['exception' => $e]);

        if (!$this->config->reconnect) {
            throw $e;
        }

        $attempt = 0;
        $connected = false;

        do {
            // Forget the broken socket so init() opens a new one.
            $this->socket = null;

            try {
                $this->init();
                $connected = true;
            } catch (Throwable) {
                $this->config->delay($attempt++);
            }
        } while (!$connected);

        foreach ($this->client->getSubscriptions() as $subscription) {
            $resubscribe = new Subscribe([
                'sid' => $subscription['sid'],
                'subject' => $subscription['name'],
            ]);
            $this->sendMessage($resubscribe);
        }

        if ($this->client->requestsSubscribed()) {
            $this->client->subscribeRequests(true);
        }
    }
338

339
    /**
     * Sets the maximum number of bytes sendMessage() passes to one fwrite().
     *
     * @param int $size Slice size in bytes.
     */
    public function setPacketSize(int $size): void
    {
        $this->packetSize = $size;
    }
343

344
    /**
     * Closes the socket, if one is open, and forgets it.
     *
     * Calling this without an open socket is a no-op.
     */
    public function close(): void
    {
        if (!$this->socket) {
            return;
        }

        fclose($this->socket);
        $this->socket = null;
    }
351
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc