• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

basis-company / nats.php / 23217248396

17 Mar 2026 09:28PM UTC coverage: 95.082% (+0.03%) from 95.05%
23217248396

push

github

nekufa
connection get message default timeout

7 of 8 new or added lines in 1 file covered. (87.5%)

33 existing lines in 3 files now uncovered.

1450 of 1525 relevant lines covered (95.08%)

77.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.74
/src/Connection.php
1
<?php
2

3
declare(strict_types=1);
4

5
namespace Basis\Nats;
6

7
use Basis\Nats\Message\Connect;
8
use Basis\Nats\Message\Factory;
9
use Basis\Nats\Message\Info;
10
use Basis\Nats\Message\Msg;
11
use Basis\Nats\Message\Ok;
12
use Basis\Nats\Message\Ping;
13
use Basis\Nats\Message\Publish;
14
use Basis\Nats\Message\Pong;
15
use Basis\Nats\Message\Prototype as Message;
16
use Basis\Nats\Message\Subscribe;
17
use LogicException;
18
use Psr\Log\LoggerInterface;
19
use Throwable;
20
use Exception;
21

22
class Connection
23
{
24
    private $socket;
25
    private $context;
26

27
    private float $activityAt = 0;
28
    private float $pingAt = 0;
29
    private float $pongAt = 0;
30
    private float $prolongateTill = 0;
31
    private int $packetSize = 1024;
32

33
    private ?Authenticator $authenticator;
34
    private Configuration $config;
35
    private Connect $connectMessage;
36
    private Info $infoMessage;
37

38
    public function __construct(
264✔
39
        private Client $client,
40
        public ?LoggerInterface $logger = null,
41
    ) {
42
        $this->authenticator = Authenticator::create($client->configuration);
264✔
43
        $this->config = $client->configuration;
264✔
44
    }
45

46
    public function getConnectMessage(): Connect
4✔
47
    {
48
        return $this->connectMessage;
4✔
49
    }
50

51
    public function getInfoMessage(): Info
8✔
52
    {
53
        return $this->infoMessage;
8✔
54
    }
55

56
    public function getMessage(null|int|float $timeout = 0): ?Message
260✔
57
    {
58
        // null means use config timeout, 0 means non-blocking check
59
        if ($timeout === null) {
260✔
NEW
60
            $timeout = $this->config->timeout;
×
61
        }
62
        
63
        $now = microtime(true);
260✔
64
        $max = $timeout > 0 ? $now + $timeout : PHP_FLOAT_MAX;
260✔
65
        $iteration = 0;
260✔
66

67
        while (true) {
260✔
68
            if (!is_resource($this->socket) || feof($this->socket)) {
260✔
69
                throw new LogicException('supplied resource is not a valid stream resource');
4✔
70
            }
71

72
            $remainingTimeout = $max - microtime(true);
260✔
73
            if ($remainingTimeout <= 0) {
260✔
UNCOV
74
                break;
×
75
            }
76

77
            $read = [$this->socket];
260✔
78
            $write = null;
260✔
79
            $except = null;
260✔
80

81
            // Calculate timeout for stream_select
82
            if ($timeout == 0) {
260✔
83
                // Non-blocking check - just poll once
84
                $seconds = 0;
12✔
85
                $microseconds = 0;
12✔
86
            } else {
87
                $seconds = (int) floor($remainingTimeout);
260✔
88
                $microseconds = (int) (($remainingTimeout - $seconds) * 1_000_000);
260✔
89
            }
90

91
            $result = stream_select($read, $write, $except, $seconds, $microseconds);
260✔
92

93
            if ($result === false || $result === 0) {
260✔
94
                break;
69✔
95
            }
96

97
            $message = null;
260✔
98
            $line = stream_get_line($this->socket, 1024, "\r\n");
260✔
99
            $now = microtime(true);
260✔
100
            if ($line) {
260✔
101
                $message = Factory::create($line);
260✔
102
                $this->activityAt = $now;
260✔
103
                if ($message instanceof Msg) {
260✔
104
                    $payload = $this->getPayload($message->length);
260✔
105
                    $message->parse($payload);
260✔
106
                    $message->setClient($this->client);
260✔
107
                    $this->logger?->debug('receive ' . $line . $payload);
260✔
108
                    return $message;
260✔
109
                }
110
                $this->logger?->debug('receive ' . $line);
260✔
111
                if ($message instanceof Ok) {
260✔
112
                    continue;
260✔
113
                } elseif ($message instanceof Ping) {
260✔
UNCOV
114
                    $this->sendMessage(new Pong([]));
1✔
115
                } elseif ($message instanceof Pong) {
260✔
116
                    $this->pongAt = $now;
28✔
117
                } elseif ($message instanceof Info) {
260✔
118
                    if (isset($message->tls_verify) && $message->tls_verify && !$this->config->tlsHandshakeFirst) {
260✔
119
                        $this->enableTls(true);
16✔
120
                    } elseif (isset($message->tls_required) && $message->tls_required && !$this->config->tlsHandshakeFirst) {
260✔
UNCOV
121
                        $this->enableTls(false);
×
122
                    }
123
                    return $message;
260✔
124
                }
125
            } elseif ($this->activityAt && $this->activityAt + $this->config->timeout < $now) {
120✔
126
                if ($this->pingAt && $this->pingAt + $this->config->pingInterval < $now) {
28✔
UNCOV
127
                    if ($this->prolongateTill && $this->prolongateTill < $now) {
×
UNCOV
128
                        $this->sendMessage(new Ping());
×
129
                    }
130
                }
131
            }
132
            if ($message && $now < $max) {
144✔
133
                $this->logger?->debug('sleep', compact('max', 'now'));
29✔
134
                $this->config->delay($iteration++);
29✔
135
            }
136
        }
137

138
        if ($this->activityAt && $this->activityAt + $this->config->timeout < $now) {
69✔
139
            if ($this->pongAt && $this->pongAt + $this->config->pingInterval < $now) {
4✔
UNCOV
140
                if ($this->prolongateTill && $this->prolongateTill < $now) {
×
UNCOV
141
                    $this->processException(new LogicException('Socket read timeout'));
×
142
                }
143
            }
144
        }
145

146
        return null;
69✔
147
    }
148

149
    public function ping(): bool
48✔
150
    {
151
        $this->sendMessage(new Ping());
48✔
152
        $this->getMessage($this->config->timeout);
32✔
153

154
        return $this->pingAt <= $this->pongAt;
28✔
155
    }
156

157
    public function sendMessage(Message $message)
260✔
158
    {
159
        $this->init();
260✔
160

161
        $line = $message->render() . "\r\n";
260✔
162
        $length = strlen($line);
260✔
163
        $total = 0;
260✔
164

165
        $this->logger?->debug('send ' . $line);
260✔
166

167
        while ($total < $length) {
260✔
168
            try {
169
                $written = @fwrite($this->socket, substr($line, $total, $this->packetSize));
260✔
170
                if ($written === false) {
260✔
UNCOV
171
                    throw new LogicException('Error sending data');
×
172
                }
173
                if ($written === 0) {
260✔
UNCOV
174
                    throw new LogicException('Broken pipe or closed connection');
×
175
                }
176
                $total += $written;
260✔
177

178
                if ($length == $total) {
260✔
179
                    break;
260✔
180
                }
181
            } catch (Throwable $e) {
4✔
182
                $this->processException($e);
4✔
183
                $line = $message->render() . "\r\n";
4✔
184
            }
185
        }
186

187
        unset($line);
260✔
188

189
        if ($message instanceof Publish) {
260✔
190
            if (strpos($message->subject, '$JS.API.CONSUMER.MSG.NEXT.') === 0) {
260✔
191
                $prolongate = $message->payload->expires / 1_000_000_000;
64✔
192
                $this->prolongateTill = microtime(true) + $prolongate;
64✔
193
            }
194
        }
195
        if ($message instanceof Ping) {
260✔
196
            $this->pingAt = microtime(true);
32✔
197
        }
198
    }
199

UNCOV
200
    public function setLogger(?LoggerInterface $logger)
×
201
    {
UNCOV
202
        $this->logger = $logger;
×
203
    }
204

205
    public function setTimeout(float $value)
260✔
206
    {
207
        $this->init();
260✔
208
        $seconds = (int) floor($value);
260✔
209
        $microseconds = (int) (1000000 * ($value - $seconds));
260✔
210

211
        stream_set_timeout($this->socket, $seconds, $microseconds);
260✔
212
    }
213

214
    protected function init()
260✔
215
    {
216
        if ($this->socket) {
260✔
217
            return $this;
260✔
218
        }
219

220
        $config = $this->config;
260✔
221
        $dsn = "$config->host:$config->port";
260✔
222
        $flags = STREAM_CLIENT_CONNECT;
260✔
223
        $this->context = stream_context_create();
260✔
224
        $this->socket = @stream_socket_client($dsn, $error, $errorMessage, $config->timeout, $flags, $this->context);
260✔
225

226
        if ($error || !$this->socket) {
260✔
227
            throw new Exception($errorMessage ?: "Connection error", $error);
4✔
228
        }
229

230
        $this->setTimeout($config->timeout);
260✔
231

232
        if ($config->tlsHandshakeFirst) {
260✔
UNCOV
233
            $this->enableTls(true);
×
234
        }
235

236
        $this->connectMessage = new Connect($config->getOptions());
260✔
237

238
        if ($this->client->getName()) {
260✔
239
            $this->connectMessage->name = $this->client->getName();
4✔
240
        }
241

242
        $this->infoMessage = $this->getMessage($config->timeout);
260✔
243
        assert($this->infoMessage instanceof Info);
244

245
        if (isset($this->infoMessage->nonce) && $this->authenticator) {
260✔
246
            $this->connectMessage->sig = $this->authenticator->sign($this->infoMessage->nonce);
4✔
247
            $this->connectMessage->nkey = $this->authenticator->getPublicKey();
4✔
248
        }
249

250
        $this->sendMessage($this->connectMessage);
260✔
251
    }
252

253
    protected function enableTls(bool $requireClientCert): void
16✔
254
    {
255
        if ($requireClientCert) {
16✔
256
            if (!empty($this->config->tlsKeyFile)) {
16✔
257
                if (!file_exists($this->config->tlsKeyFile)) {
12✔
258
                    throw new Exception("tlsKeyFile file does not exist: " . $this->config->tlsKeyFile);
4✔
259
                }
260
                stream_context_set_option($this->context, 'ssl', 'local_pk', $this->config->tlsKeyFile);
8✔
261
            }
262
            if (!empty($this->config->tlsCertFile)) {
12✔
263
                if (!file_exists($this->config->tlsCertFile)) {
8✔
264
                    throw new Exception("tlsCertFile file does not exist: " . $this->config->tlsCertFile);
4✔
265
                }
266
                stream_context_set_option($this->context, 'ssl', 'local_cert', $this->config->tlsCertFile);
4✔
267
            }
268
        }
269

270
        if (!empty($this->config->tlsCaFile)) {
8✔
271
            if (!file_exists($this->config->tlsCaFile)) {
8✔
272
                throw new Exception("tlsCaFile file does not exist: " . $this->config->tlsCaFile);
4✔
273
            }
274
            stream_context_set_option($this->context, 'ssl', 'cafile', $this->config->tlsCaFile);
4✔
275
        }
276

277
        if (!stream_socket_enable_crypto($this->socket, true, STREAM_CRYPTO_METHOD_TLSv1_2_CLIENT)) {
4✔
278
            throw new Exception('Failed to connect: Error enabling TLS');
×
279
        }
280
    }
281

282
    protected function getPayload(int $length): string
260✔
283
    {
284
        $payload = '';
260✔
285
        $iteration = 0;
260✔
286
        while (strlen($payload) < $length) {
260✔
287
            $payloadLine = stream_get_line($this->socket, $length, '');
260✔
288
            if (!$payloadLine) {
260✔
UNCOV
289
                if ($iteration > 16) {
×
UNCOV
290
                    break;
×
291
                }
UNCOV
292
                $this->config->delay($iteration++);
×
UNCOV
293
                continue;
×
294
            }
295
            if (strlen($payloadLine) != $length) {
260✔
UNCOV
296
                $this->logger?->debug(
×
UNCOV
297
                    'got ' . strlen($payloadLine) . '/' . $length . ': ' . $payloadLine
×
UNCOV
298
                );
×
299
            }
300
            $payload .= $payloadLine;
260✔
301
        }
302
        return $payload;
260✔
303
    }
304

305
    private function processException(Throwable $e)
4✔
306
    {
307
        $this->logger?->error($e->getMessage(), ['exception' => $e]);
4✔
308

309
        if (!$this->config->reconnect) {
4✔
UNCOV
310
            throw $e;
×
311
        }
312

313
        $iteration = 0;
4✔
314

315
        while (true) {
4✔
316
            try {
317
                $this->socket = null;
4✔
318
                $this->init();
4✔
UNCOV
319
            } catch (Throwable $e) {
×
UNCOV
320
                $this->config->delay($iteration++);
×
UNCOV
321
                continue;
×
322
            }
323
            break;
4✔
324
        }
325

326
        foreach ($this->client->getSubscriptions() as $subscription) {
4✔
327
            $this->sendMessage(new Subscribe([
4✔
328
                'sid' => $subscription['sid'],
4✔
329
                'subject' => $subscription['name'],
4✔
330
            ]));
4✔
331
        }
332

333
        if ($this->client->requestsSubscribed()) {
4✔
334
            $this->client->subscribeRequests(true);
4✔
335
        }
336
    }
337

338
    public function setPacketSize(int $size): void
4✔
339
    {
340
        $this->packetSize = $size;
4✔
341
    }
342

343
    public function close(): void
260✔
344
    {
345
        if ($this->socket) {
260✔
346
            fclose($this->socket);
260✔
347
            $this->socket = null;
260✔
348
        }
349
    }
350
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc