• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

basis-company / nats.php / 24232858496

10 Apr 2026 07:57AM UTC coverage: 94.961%. Remained the same
24232858496

push

github

web-flow
Fix logging and improve type hints (#128)

* Declare struct types in Service
* Fix logger issue when logger is null
* Use strict comparison
* Add more return type-hint

36 of 37 new or added lines in 10 files covered. (97.3%)

3 existing lines in 1 file now uncovered.

1564 of 1647 relevant lines covered (94.96%)

84.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.5
/src/Connection.php
1
<?php
2

3
declare(strict_types=1);
4

5
namespace Basis\Nats;
6

7
use Basis\Nats\Message\Connect;
8
use Basis\Nats\Message\Factory;
9
use Basis\Nats\Message\Info;
10
use Basis\Nats\Message\Msg;
11
use Basis\Nats\Message\Ok;
12
use Basis\Nats\Message\Ping;
13
use Basis\Nats\Message\Publish;
14
use Basis\Nats\Message\Pong;
15
use Basis\Nats\Message\Prototype as Message;
16
use Basis\Nats\Message\Subscribe;
17
use LogicException;
18
use Psr\Log\LoggerInterface;
19
use Throwable;
20
use Exception;
21

22
class Connection
23
{
24
    /** @var resource|null Stream returned by stream_socket_client(); null while disconnected. */
    private $socket;

    /** @var resource|null Stream context created for the socket; TLS options are set on it. */
    private $context;

    /** Timestamp (microtime) of the last non-empty line read from the socket. */
    private float $activityAt = 0.0;

    /** Timestamp (microtime) of the last PING written by this connection. */
    private float $pingAt = 0.0;

    /** Timestamp (microtime) of the last PONG read from the socket. */
    private float $pongAt = 0.0;

    /** Timestamp until which a pending `$JS.API.CONSUMER.MSG.NEXT.` request extends the read window. */
    private float $prolongateTill = 0.0;

    /** Maximum number of bytes handed to a single fwrite() call. */
    private int $packetSize = 1024;

    private ?Authenticator $authenticator;
    private Configuration $config;

    /** CONNECT message built during init(). */
    private Connect $connectMessage;

    /** INFO message received from the server during init(). */
    private Info $infoMessage;
37

38
    /**
     * Stores the client and its configuration and builds the authenticator
     * from that configuration. No socket is opened here.
     *
     * @param Client               $client Owning client; its configuration is used for this connection.
     * @param LoggerInterface|null $logger Optional logger for protocol traffic and errors.
     */
    public function __construct(
        private Client $client,
        public ?LoggerInterface $logger = null,
    ) {
        $this->config = $client->configuration;
        $this->authenticator = Authenticator::create($this->config);
    }
45

46
    /**
     * Returns the CONNECT message of this connection.
     *
     * The property is only assigned inside init(), so it is uninitialised
     * until the connection has been opened.
     */
    public function getConnectMessage(): Connect
    {
        return $this->connectMessage;
    }
50

51
    /**
     * Returns the INFO message received from the server.
     *
     * The property is only assigned inside init(), so it is uninitialised
     * until the connection has been opened.
     */
    public function getInfoMessage(): Info
    {
        return $this->infoMessage;
    }
55

56
    /**
     * Waits for and returns the next message read from the socket.
     *
     * `Ok` messages are skipped. A `Ping` is answered with a `Pong`, after
     * which reading continues following a configured delay. `Msg` (with its
     * payload parsed and the client attached), `Pong` and `Info` messages are
     * returned to the caller.
     *
     * @param null|int|float $timeout Seconds to wait. null uses the configured
     *                                timeout; zero (int or float) only consumes
     *                                data that is already readable; any other
     *                                non-positive value waits without a deadline.
     *
     * @return Message|null The received message, or null when none arrived in time.
     *
     * @throws LogicException When the socket is not a valid stream or has reached EOF.
     */
    public function getMessage(null|int|float $timeout = 0): ?Message
    {
        // null means use config timeout, 0 means non-blocking check
        if ($timeout === null) {
            $timeout = $this->config->timeout;
        }

        // Compare by value, not by type: a float 0.0 must be treated exactly
        // like the integer 0, otherwise it falls into the blocking branch
        // with no deadline.
        $nonBlocking = (float) $timeout === 0.0;

        $now = microtime(true);
        $max = $timeout > 0 ? $now + $timeout : PHP_FLOAT_MAX;
        $iteration = 0;

        while (true) {
            if (!is_resource($this->socket) || feof($this->socket)) {
                throw new LogicException('supplied resource is not a valid stream resource');
            }

            $remainingTimeout = $max - microtime(true);
            if ($remainingTimeout <= 0) {
                break;
            }

            $read = [$this->socket];
            $write = null;
            $except = null;

            // Calculate timeout for stream_select
            if ($nonBlocking) {
                // Non-blocking check - use stream_select with 0 timeout to check if data is available
                $seconds = 0;
                $microseconds = 0;
            } else {
                // Wait in slices of at most one minute. Without a deadline the
                // remaining time is close to PHP_FLOAT_MAX, which cannot be
                // represented as an int; the loop simply selects again after
                // every slice, so the overall deadline is unchanged.
                $wait = min($remainingTimeout, 60.0);
                $seconds = (int) floor($wait);
                $microseconds = (int) (($wait - $seconds) * 1_000_000);
            }

            $result = stream_select($read, $write, $except, $seconds, $microseconds);

            if ($result === false || $result === 0) {
                // For non-blocking check (timeout=0), exit immediately
                if ($nonBlocking) {
                    break;
                }
                // For blocking calls, continue waiting
                continue;
            }

            $message = null;
            $line = stream_get_line($this->socket, 1024, "\r\n");
            $now = microtime(true);
            if ($line) {
                $message = Factory::create($line);
                $this->activityAt = $now;
                if ($message instanceof Msg) {
                    // The control line only announces the payload size; the
                    // payload itself is read separately.
                    $payload = $this->getPayload($message->length);
                    $message->parse($payload);
                    $message->setClient($this->client);
                    $this->logger?->debug('receive ' . $line . $payload);
                    return $message;
                }
                $this->logger?->debug('receive ' . $line);
                if ($message instanceof Ok) {
                    continue;
                } elseif ($message instanceof Ping) {
                    $this->sendMessage(new Pong([]));
                } elseif ($message instanceof Pong) {
                    $this->pongAt = $now;
                    return $message;
                } elseif ($message instanceof Info) {
                    // Upgrade to TLS when the server asks for it, unless the
                    // handshake was already performed before INFO.
                    if (isset($message->tls_verify) && $message->tls_verify && !$this->config->tlsHandshakeFirst) {
                        $this->enableTls(true);
                    } elseif (isset($message->tls_required) && $message->tls_required && !$this->config->tlsHandshakeFirst) {
                        $this->enableTls(false);
                    }
                    return $message;
                }
            } elseif ($this->activityAt && $this->activityAt + $this->config->timeout < $now) {
                // Nothing was read and the connection has been idle longer
                // than the configured timeout: probe the server with a PING
                // once the ping interval and any prolongation have elapsed.
                if ($this->pingAt && $this->pingAt + $this->config->pingInterval < $now) {
                    if ($this->prolongateTill && $this->prolongateTill < $now) {
                        $this->sendMessage(new Ping());
                    }
                }
            }
            if ($message && $now < $max) {
                $this->logger?->debug('sleep', compact('max', 'now'));
                $this->config->delay($iteration++);
            }
        }

        // The wait ended without a message: treat a connection that has been
        // idle past the timeout, the ping interval and any prolongation as a
        // read timeout.
        if ($this->activityAt && $this->activityAt + $this->config->timeout < $now) {
            if ($this->pongAt && $this->pongAt + $this->config->pingInterval < $now) {
                if ($this->prolongateTill && $this->prolongateTill < $now) {
                    $this->processException(new LogicException('Socket read timeout'));
                }
            }
        }

        return null;
    }
154

155
    /**
     * Sends a PING and waits up to the configured timeout for a message.
     *
     * @return bool True when the latest PONG timestamp is not older than the
     *              latest PING timestamp.
     */
    public function ping(): bool
    {
        $this->sendMessage(new Ping());
        $this->getMessage($this->config->timeout);

        $answered = $this->pongAt >= $this->pingAt;

        return $answered;
    }
162

163
    /**
     * Renders a message and writes it to the socket in chunks of at most
     * `packetSize` bytes, opening the connection first if necessary.
     *
     * A failed write is passed to processException(), which either rethrows
     * or reconnects. After a reconnect the message is sent again from its
     * first byte.
     *
     * @param Message $message Message to send.
     *
     * @throws Throwable Whatever processException() rethrows for a failed write.
     */
    public function sendMessage(Message $message): void
    {
        $this->init();

        $line = $message->render() . "\r\n";
        $length = strlen($line);
        $total = 0;

        $this->logger?->debug('send ' . $line);

        while ($total < $length) {
            try {
                $written = @fwrite($this->socket, substr($line, $total, $this->packetSize));
                if ($written === false) {
                    throw new LogicException('Error sending data');
                }
                if ($written === 0) {
                    throw new LogicException('Broken pipe or closed connection');
                }
                $total += $written;
            } catch (Throwable $e) {
                $this->processException($e);

                // processException() returned, so a new connection is open.
                // Bytes written to the old socket are gone with it: render the
                // message again and restart from offset zero, otherwise the
                // new connection would receive only the tail of the frame.
                $line = $message->render() . "\r\n";
                $length = strlen($line);
                $total = 0;
            }
        }

        if ($message instanceof Publish) {
            if (str_starts_with($message->subject, '$JS.API.CONSUMER.MSG.NEXT.')) {
                // `expires` is divided by 1e9 to obtain seconds; the read
                // window is extended until that moment.
                $prolongate = $message->payload->expires / 1_000_000_000;
                $this->prolongateTill = microtime(true) + $prolongate;
            }
        }
        if ($message instanceof Ping) {
            $this->pingAt = microtime(true);
        }
    }
205

NEW
206
    /**
     * Replaces the logger used by this connection.
     *
     * @param LoggerInterface|null $logger New logger, or null to disable logging.
     */
    public function setLogger(?LoggerInterface $logger): void
    {
        $this->logger = $logger;
    }
210

211
    /**
     * Applies a stream timeout to the socket, opening the connection first
     * if necessary.
     *
     * @param float $value Timeout in seconds; the fractional part is passed on as microseconds.
     */
    public function setTimeout(float $value): void
    {
        $this->init();

        $wholeSeconds = (int) floor($value);
        $fraction = $value - $wholeSeconds;

        stream_set_timeout($this->socket, $wholeSeconds, (int) (1000000 * $fraction));
    }
219

220
    /**
     * Opens the socket and performs the handshake, unless a socket is
     * already set.
     *
     * The handshake applies the stream timeout, optionally enables TLS before
     * INFO, reads the server INFO message, signs the nonce when an
     * authenticator is available, and sends CONNECT.
     *
     * @return $this
     *
     * @throws Exception When the socket cannot be opened or the server does
     *                   not answer with INFO.
     * @throws Throwable Any failure raised while performing the handshake.
     */
    protected function init()
    {
        if ($this->socket) {
            return $this;
        }

        $config = $this->config;
        $dsn = "$config->host:$config->port";
        $flags = STREAM_CLIENT_CONNECT;
        $this->context = stream_context_create();
        $this->socket = @stream_socket_client($dsn, $error, $errorMessage, $config->timeout, $flags, $this->context);

        if ($error || !$this->socket) {
            throw new Exception($errorMessage ?: "Connection error", $error);
        }

        try {
            $this->setTimeout($config->timeout);

            if ($config->tlsHandshakeFirst) {
                $this->enableTls(true);
            }

            $this->connectMessage = new Connect($config->getOptions());

            if ($this->client->getName()) {
                $this->connectMessage->name = $this->client->getName();
            }

            // Validate before assigning: the property is typed as Info, so a
            // timeout (null) or an unexpected message would otherwise surface
            // as an unrelated TypeError.
            $info = $this->getMessage($config->timeout);
            if (!$info instanceof Info) {
                throw new Exception('Connection error: no INFO message received from server');
            }
            $this->infoMessage = $info;

            if (isset($this->infoMessage->nonce) && $this->authenticator) {
                $this->connectMessage->sig = $this->authenticator->sign($this->infoMessage->nonce);
                $this->connectMessage->nkey = $this->authenticator->getPublicKey();
            }

            $this->sendMessage($this->connectMessage);
        } catch (Throwable $e) {
            // Drop the half-initialised socket. Leaving it set would make the
            // next init() call return early on a connection that never
            // completed its handshake.
            if (is_resource($this->socket)) {
                fclose($this->socket);
            }
            $this->socket = null;

            throw $e;
        }

        return $this;
    }
258

259
    /**
     * Copies the configured TLS files into the stream context and switches
     * the socket to TLS.
     *
     * @param bool $requireClientCert When true, the client key and certificate
     *                                files are applied in addition to the CA file.
     *
     * @throws Exception When a configured file does not exist or TLS cannot be enabled.
     */
    protected function enableTls(bool $requireClientCert): void
    {
        // SSL context option => configuration property, in the order they are applied.
        $sslOptions = [];
        if ($requireClientCert) {
            $sslOptions['local_pk'] = 'tlsKeyFile';
            $sslOptions['local_cert'] = 'tlsCertFile';
        }
        $sslOptions['cafile'] = 'tlsCaFile';

        foreach ($sslOptions as $option => $property) {
            if (empty($this->config->{$property})) {
                continue;
            }
            if (!file_exists($this->config->{$property})) {
                throw new Exception($property . " file does not exist: " . $this->config->{$property});
            }
            stream_context_set_option($this->context, 'ssl', $option, $this->config->{$property});
        }

        $enabled = stream_socket_enable_crypto($this->socket, true, STREAM_CRYPTO_METHOD_TLSv1_2_CLIENT);
        if (!$enabled) {
            throw new Exception('Failed to connect: Error enabling TLS');
        }
    }
287

288
    /**
     * Reads a message payload of the given size from the socket.
     *
     * Reads are repeated until the requested number of bytes has been
     * collected. Empty reads are retried with the configured delay; after
     * more than 16 empty reads the loop gives up and whatever was collected
     * is returned, which may be shorter than requested.
     *
     * @param int $length Payload size in bytes.
     *
     * @return string The payload bytes read.
     */
    protected function getPayload(int $length): string
    {
        $payload = '';
        $iteration = 0;
        while (($remaining = $length - strlen($payload)) > 0) {
            // Ask only for the bytes still missing, so a read that follows a
            // partial chunk cannot run past the end of the payload.
            $payloadLine = stream_get_line($this->socket, $remaining, '');

            // Explicit comparison: the chunk "0" is valid payload data but
            // evaluates as falsy.
            if ($payloadLine === false || $payloadLine === '') {
                if ($iteration > 16) {
                    break;
                }
                $this->config->delay($iteration++);
                continue;
            }
            if (strlen($payloadLine) !== $length) {
                $this->logger?->debug(
                    'got ' . strlen($payloadLine) . '/' . $length . ': ' . $payloadLine
                );
            }
            $payload .= $payloadLine;
        }
        return $payload;
    }
310

311
    /**
     * Logs a connection failure and, when reconnecting is enabled, rebuilds
     * the connection and restores the client's subscriptions.
     *
     * Reconnection is retried, with the configured delay between attempts,
     * until init() succeeds.
     *
     * @param Throwable $e The failure being handled.
     *
     * @throws Throwable The given exception, when reconnecting is disabled.
     */
    private function processException(Throwable $e): void
    {
        $this->logger?->error($e->getMessage(), ['exception' => $e]);

        if (!$this->config->reconnect) {
            throw $e;
        }

        $attempt = 0;
        $connected = false;

        do {
            // Forget the current socket so that init() opens a new one.
            $this->socket = null;
            try {
                $this->init();
                $connected = true;
            } catch (Throwable $failure) {
                $this->config->delay($attempt++);
            }
        } while (!$connected);

        // Register every known subscription on the new connection.
        foreach ($this->client->getSubscriptions() as $subscription) {
            $options = [
                'sid' => $subscription['sid'],
                'subject' => $subscription['name'],
            ];
            $this->sendMessage(new Subscribe($options));
        }

        if ($this->client->requestsSubscribed()) {
            $this->client->subscribeRequests(true);
        }
    }
343

344
    /**
     * Sets the maximum number of bytes written to the socket per fwrite() call.
     *
     * @param int $size Chunk size in bytes.
     */
    public function setPacketSize(int $size): void
    {
        $this->packetSize = $size;
    }
348

349
    /**
     * Closes the socket if one is set; otherwise does nothing.
     */
    public function close(): void
    {
        if (!$this->socket) {
            return;
        }

        fclose($this->socket);
        $this->socket = null;
    }
356
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc