• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

basis-company / nats.php / 23202798826

17 Mar 2026 03:42PM UTC coverage: 95.05%. Remained the same
23202798826

push

github

web-flow
Update composer.json license field to be an SPDX Identifier (#126)

1421 of 1495 relevant lines covered (95.05%)

72.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.83
/src/Connection.php
1
<?php
2

3
declare(strict_types=1);
4

5
namespace Basis\Nats;
6

7
use Basis\Nats\Message\Connect;
8
use Basis\Nats\Message\Factory;
9
use Basis\Nats\Message\Info;
10
use Basis\Nats\Message\Msg;
11
use Basis\Nats\Message\Ok;
12
use Basis\Nats\Message\Ping;
13
use Basis\Nats\Message\Publish;
14
use Basis\Nats\Message\Pong;
15
use Basis\Nats\Message\Prototype as Message;
16
use Basis\Nats\Message\Subscribe;
17
use LogicException;
18
use Psr\Log\LoggerInterface;
19
use Throwable;
20
use Exception;
21

22
class Connection
23
{
24
    /** @var resource|false|null Stream returned by stream_socket_client(); reset to null on close and before a reconnect. */
    private $socket;

    /** @var resource|null Stream context created in init(); receives the 'ssl' options set by enableTls(). */
    private $context;

    /** Timestamp (microtime) of the last protocol line received. */
    private float $activityAt = 0.0;

    /** Timestamp (microtime) of the last Ping written by sendMessage(). */
    private float $pingAt = 0.0;

    /** Timestamp (microtime) of the last Pong received. */
    private float $pongAt = 0.0;

    /** Timestamp (microtime) set when a `$JS.API.CONSUMER.MSG.NEXT.` publish is sent; read by the idle checks in getMessage(). */
    private float $prolongateTill = 0.0;

    /** Maximum number of bytes handed to a single fwrite() call. */
    private int $packetSize = 1024;

    private ?Authenticator $authenticator;
    private Configuration $config;

    /** CONNECT message built in init(). */
    private Connect $connectMessage;

    /** INFO message received from the server in init(). */
    private Info $infoMessage;
37

38
    /**
     * Stores the owning client and derives the configuration and the
     * authenticator from the client's configuration.
     *
     * No socket is opened here; the connection is established by init().
     *
     * @param Client $client Client whose configuration drives this connection.
     * @param LoggerInterface|null $logger Optional logger for debug/error output.
     */
    public function __construct(
        private Client $client,
        public ?LoggerInterface $logger = null,
    ) {
        $configuration = $client->configuration;

        $this->authenticator = Authenticator::create($configuration);
        $this->config = $configuration;
    }
45

46
    /**
     * Returns the CONNECT message prepared for the server.
     *
     * The property is only assigned inside init(), so it is unavailable
     * until the connection has been initialised.
     */
    public function getConnectMessage(): Connect
    {
        return $this->connectMessage;
    }
50

51
    /**
     * Returns the INFO message received from the server.
     *
     * The property is only assigned inside init(), so it is unavailable
     * until the connection has been initialised.
     */
    public function getInfoMessage(): Info
    {
        return $this->infoMessage;
    }
55

56
    /**
     * Reads protocol lines from the socket until a Msg or Info message
     * arrives or the deadline passes.
     *
     * Ok lines are skipped, a Ping is answered with a Pong, and a Pong only
     * updates the pong timestamp; none of these are returned to the caller.
     * An Info message may switch the stream to TLS before being returned.
     *
     * @param null|int|float $timeout Seconds added to the current time to form the read deadline.
     *
     * @return Message|null The received Msg/Info message, or null once the deadline has passed.
     *
     * @throws LogicException When the socket is not a valid resource or has reached EOF.
     */
    public function getMessage(null|int|float $timeout = 0): ?Message
    {
        $now = microtime(true);
        $max = $now + $timeout;
        $iteration = 0;

        for (;;) {
            if (!is_resource($this->socket) || feof($this->socket)) {
                throw new LogicException('supplied resource is not a valid stream resource');
            }

            $message = null;
            $line = stream_get_line($this->socket, 1024, "\r\n");
            $now = microtime(true);

            if ($line) {
                $message = Factory::create($line);
                $this->activityAt = $now;

                if ($message instanceof Msg) {
                    // A Msg header is followed by a payload of the announced length.
                    $payload = $this->getPayload($message->length);
                    $message->parse($payload);
                    $message->setClient($this->client);
                    $this->logger?->debug('receive ' . $line . $payload);

                    return $message;
                }

                $this->logger?->debug('receive ' . $line);

                if ($message instanceof Ok) {
                    // Acknowledgements carry nothing; read the next line right away.
                    continue;
                }

                if ($message instanceof Ping) {
                    $this->sendMessage(new Pong([]));
                } elseif ($message instanceof Pong) {
                    $this->pongAt = $now;
                } elseif ($message instanceof Info) {
                    // TLS is negotiated here only when it was not already done before the handshake.
                    if (!$this->config->tlsHandshakeFirst) {
                        if (!empty($message->tls_verify)) {
                            $this->enableTls(true);
                        } elseif (!empty($message->tls_required)) {
                            $this->enableTls(false);
                        }
                    }

                    return $message;
                }
            } elseif (
                $this->activityAt
                && $this->activityAt + $this->config->timeout < $now
                && $this->pingAt
                && $this->pingAt + $this->config->pingInterval < $now
                && $this->prolongateTill
                && $this->prolongateTill < $now
            ) {
                // Nothing was read and every idle threshold has elapsed: probe the server.
                $this->sendMessage(new Ping());
            }

            if ($now > $max) {
                break;
            }

            if ($message && $now < $max) {
                $this->logger?->debug('sleep', ['max' => $max, 'now' => $now]);
                $this->config->delay($iteration++);
            }
        }

        if (
            $this->activityAt
            && $this->activityAt + $this->config->timeout < $now
            && $this->pongAt
            && $this->pongAt + $this->config->pingInterval < $now
            && $this->prolongateTill
            && $this->prolongateTill < $now
        ) {
            // Deadline passed with no recent activity or pong: hand over to the reconnect logic.
            $this->processException(new LogicException('Socket read timeout'));
        }

        return null;
    }
120

121
    /**
     * Sends a Ping and reads from the socket for up to the configured timeout.
     *
     * @return bool True when the last recorded Pong is not older than the last Ping sent.
     */
    public function ping(): bool
    {
        $this->sendMessage(new Ping());
        $this->getMessage($this->config->timeout);

        return $this->pongAt >= $this->pingAt;
    }
128

129
    /**
     * Renders a message and writes it to the socket in chunks of at most
     * `packetSize` bytes.
     *
     * A failed or zero-length write is passed to processException(), which
     * either rethrows or reconnects. After a reconnect the message is
     * rendered again and sent from its first byte, because the bytes already
     * written went to the previous connection.
     *
     * Side effects after a successful write: a Publish to a subject starting
     * with `$JS.API.CONSUMER.MSG.NEXT.` moves `prolongateTill` forward, and a
     * Ping records `pingAt`.
     *
     * @param Message $message Message to render and send.
     *
     * @throws Throwable Whatever processException() rethrows when reconnecting is disabled.
     */
    public function sendMessage(Message $message)
    {
        $this->init();

        $line = $message->render() . "\r\n";
        $length = strlen($line);
        $total = 0;

        $this->logger?->debug('send ' . $line);

        while ($total < $length) {
            try {
                $written = @fwrite($this->socket, substr($line, $total, $this->packetSize));
                if ($written === false) {
                    throw new LogicException('Error sending data');
                }
                if ($written === 0) {
                    throw new LogicException('Broken pipe or closed connection');
                }
                $total += $written;
            } catch (Throwable $e) {
                $this->processException($e);

                // We are on a new connection now: restart the transfer instead of
                // resuming at the old offset, which would send a truncated message.
                $line = $message->render() . "\r\n";
                $length = strlen($line);
                $total = 0;
            }
        }

        if ($message instanceof Publish && str_starts_with($message->subject, '$JS.API.CONSUMER.MSG.NEXT.')) {
            // NOTE(review): the divisor suggests `expires` is in nanoseconds — confirm against the payload definition.
            $prolongate = $message->payload->expires / 1_000_000_000;
            $this->prolongateTill = microtime(true) + $prolongate;
        }

        if ($message instanceof Ping) {
            $this->pingAt = microtime(true);
        }
    }
171

172
    /**
     * Replaces the logger used for debug and error output.
     *
     * @param LoggerInterface|null $logger New logger, or null to disable logging.
     */
    public function setLogger(?LoggerInterface $logger)
    {
        $this->logger = $logger;
    }
176

177
    /**
     * Applies a read timeout to the socket, connecting first if necessary.
     *
     * @param float $value Timeout in seconds; the fractional part is honoured.
     */
    public function setTimeout(float $value)
    {
        $this->init();

        $seconds = (int) floor($value);

        // stream_set_timeout() expects the sub-second part in MICROseconds.
        // Clamp so that rounding can never produce a full extra second.
        $microseconds = min(999_999, (int) round(1_000_000 * ($value - $seconds)));

        stream_set_timeout($this->socket, $seconds, $microseconds);
    }
185

186
    /**
     * Opens the socket and performs the connection handshake, unless a
     * socket is already present.
     *
     * Steps: connect, apply the configured timeout, optionally enable TLS
     * before the handshake, build the CONNECT message, read the server's
     * INFO, sign the nonce when an authenticator is available, and send
     * CONNECT.
     *
     * If any step after the socket is opened fails, the socket is closed and
     * cleared before the error is rethrown. This stops a later call from
     * reusing a connection whose handshake never completed.
     *
     * @return $this|null $this when a socket already exists; nothing is returned after a fresh connect.
     *
     * @throws Exception When the socket cannot be opened or the server does not answer with INFO.
     * @throws Throwable Any error raised by the TLS setup or the handshake.
     */
    protected function init()
    {
        if ($this->socket) {
            return $this;
        }

        $config = $this->config;
        $dsn = "$config->host:$config->port";
        $flags = STREAM_CLIENT_CONNECT;
        $this->context = stream_context_create();
        $this->socket = @stream_socket_client($dsn, $error, $errorMessage, $config->timeout, $flags, $this->context);

        if ($error || !$this->socket) {
            throw new Exception($errorMessage ?: "Connection error", $error);
        }

        try {
            $this->setTimeout($config->timeout);

            if ($config->tlsHandshakeFirst) {
                $this->enableTls(true);
            }

            $this->connectMessage = new Connect($config->getOptions());

            if ($this->client->getName()) {
                $this->connectMessage->name = $this->client->getName();
            }

            // Validate before assigning: the property is typed, so a timeout (null)
            // would otherwise surface as an opaque TypeError.
            $info = $this->getMessage($config->timeout);
            if (!$info instanceof Info) {
                throw new Exception('Connection error: no INFO message received from server');
            }
            $this->infoMessage = $info;

            if (isset($this->infoMessage->nonce) && $this->authenticator) {
                $this->connectMessage->sig = $this->authenticator->sign($this->infoMessage->nonce);
                $this->connectMessage->nkey = $this->authenticator->getPublicKey();
            }

            $this->sendMessage($this->connectMessage);
        } catch (Throwable $e) {
            // Drop the half-initialised socket so the next init() starts from scratch.
            if (is_resource($this->socket)) {
                @fclose($this->socket);
            }
            $this->socket = null;

            throw $e;
        }
    }
224

225
    /**
     * Copies the configured TLS file paths into the stream context and
     * switches the socket to TLS.
     *
     * The key and certificate files are only applied when a client
     * certificate is required; the CA file is applied in either case.
     * Unset or empty paths are skipped.
     *
     * @param bool $requireClientCert Whether the client key/certificate options should be set.
     *
     * @throws Exception When a configured file does not exist or TLS cannot be enabled.
     */
    protected function enableTls(bool $requireClientCert): void
    {
        // Configuration property => 'ssl' context option, in the order they are checked.
        $sslOptions = [];
        if ($requireClientCert) {
            $sslOptions['tlsKeyFile'] = 'local_pk';
            $sslOptions['tlsCertFile'] = 'local_cert';
        }
        $sslOptions['tlsCaFile'] = 'cafile';

        foreach ($sslOptions as $property => $option) {
            if (empty($this->config->{$property})) {
                continue;
            }

            $path = $this->config->{$property};
            if (!file_exists($path)) {
                throw new Exception($property . ' file does not exist: ' . $path);
            }

            stream_context_set_option($this->context, 'ssl', $option, $path);
        }

        $enabled = stream_socket_enable_crypto($this->socket, true, STREAM_CRYPTO_METHOD_TLSv1_2_CLIENT);
        if (!$enabled) {
            throw new Exception('Failed to connect: Error enabling TLS');
        }
    }
253

254
    /**
     * Reads a message payload of the given size from the socket.
     *
     * The payload may arrive in several chunks. Each read asks only for the
     * bytes still missing, so data belonging to the next protocol line is
     * never consumed. An empty read is retried with the configured delay;
     * after more than 16 empty reads the loop gives up and returns whatever
     * has been collected so far.
     *
     * @param int $length Number of payload bytes announced by the message header.
     *
     * @return string The payload; shorter than $length only if reading was abandoned.
     */
    protected function getPayload(int $length): string
    {
        $payload = '';
        $iteration = 0;

        while (($remaining = $length - strlen($payload)) > 0) {
            $chunk = stream_get_line($this->socket, $remaining, '');

            // Compare strictly: a chunk of "0" is valid data, not an empty read.
            if ($chunk === false || $chunk === '') {
                if ($iteration > 16) {
                    break;
                }
                $this->config->delay($iteration++);
                continue;
            }

            if (strlen($chunk) != $length) {
                $this->logger?->debug(
                    'got ' . strlen($chunk) . '/' . $length . ': ' . $chunk
                );
            }

            $payload .= $chunk;
        }

        return $payload;
    }
276

277
    /**
     * Logs a connection error and, when reconnecting is enabled, rebuilds
     * the connection and restores the client's subscriptions.
     *
     * Reconnection is retried without an upper bound, waiting via the
     * configured delay between attempts.
     *
     * @param Throwable $e The error that interrupted the connection.
     *
     * @throws Throwable The given error, when reconnecting is disabled.
     */
    private function processException(Throwable $e)
    {
        $this->logger?->error($e->getMessage(), ['exception' => $e]);

        if (!$this->config->reconnect) {
            throw $e;
        }

        $attempt = 0;
        $connected = false;

        do {
            try {
                // Clearing the socket forces init() to open a new connection.
                $this->socket = null;
                $this->init();
                $connected = true;
            } catch (Throwable $failure) {
                $this->config->delay($attempt++);
            }
        } while (!$connected);

        // Re-register every subscription on the new connection.
        foreach ($this->client->getSubscriptions() as $subscription) {
            $subscribe = new Subscribe([
                'sid' => $subscription['sid'],
                'subject' => $subscription['name'],
            ]);
            $this->sendMessage($subscribe);
        }

        if ($this->client->requestsSubscribed()) {
            $this->client->subscribeRequests(true);
        }
    }
309

310
    /**
     * Sets the maximum number of bytes written to the socket per fwrite() call.
     *
     * @param int $size Chunk size in bytes.
     */
    public function setPacketSize(int $size): void
    {
        $this->packetSize = $size;
    }
314

315
    /**
     * Closes the socket, if one is open, and forgets it so that the next
     * operation reconnects.
     */
    public function close(): void
    {
        if (!$this->socket) {
            return;
        }

        fclose($this->socket);
        $this->socket = null;
    }
322
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc