• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

basis-company / nats.php / 14576076578

21 Apr 2025 03:23PM UTC coverage: 94.815% (+0.02%) from 94.798%
14576076578

push

github

web-flow
one subscription for all requests [#106]

* Only one subscription for all requests

* Only one subscription for all requests

* Only one subscription for all requests

* Only one subscription for all requests

* Only one subscription for all requests

* Use one subscription for all requests

* Use one subscription for all requests

* Small process method improvement

41 of 44 new or added lines in 1 file covered. (93.18%)

1 existing line in 1 file now uncovered.

1408 of 1485 relevant lines covered (94.81%)

17.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.28
/src/Connection.php
1
<?php
2

3
declare(strict_types=1);
4

5
namespace Basis\Nats;
6

7
use Basis\Nats\Message\Connect;
8
use Basis\Nats\Message\Factory;
9
use Basis\Nats\Message\Info;
10
use Basis\Nats\Message\Msg;
11
use Basis\Nats\Message\Ok;
12
use Basis\Nats\Message\Ping;
13
use Basis\Nats\Message\Publish;
14
use Basis\Nats\Message\Pong;
15
use Basis\Nats\Message\Prototype as Message;
16
use Basis\Nats\Message\Subscribe;
17
use LogicException;
18
use Psr\Log\LoggerInterface;
19
use Throwable;
20
use Exception;
21

22
class Connection
23
{
24
    /** @var resource|false|null Stream socket returned by stream_socket_client(); reset to null on close/reconnect. */
    private $socket;
    /** @var resource|null Stream context created in init(); TLS options are set on it by enableTls(). */
    private $context;

    /** microtime(true) of the last protocol line received in getMessage(); 0 until something was read. */
    private float $activityAt = 0;
    /** microtime(true) of the last Ping written by sendMessage(). */
    private float $pingAt = 0;
    /** microtime(true) of the last Pong read by getMessage(). */
    private float $pongAt = 0;
    /** Timestamp set when a `$JS.API.CONSUMER.MSG.NEXT.` request is published: now + the request's `expires` (ns). */
    private float $prolongateTill = 0;
    /** Maximum number of bytes handed to a single fwrite() call in sendMessage(). */
    private int $packetSize = 1024;

    /** Authenticator built from the client configuration; used to sign the server nonce during init(). */
    private ?Authenticator $authenticator;
    /** Client configuration (host, port, timeouts, TLS files, reconnect flag, delay strategy). */
    private Configuration $config;
    /** CONNECT message built during init(). */
    private Connect $connectMessage;
    /** INFO message received from the server during init(). */
    private Info $infoMessage;
37

38
    /**
     * Keep a reference to the owning client and derive the configuration and
     * authenticator from it. No socket is opened here; that happens in init().
     *
     * @param Client               $client Owning client; its configuration drives the connection.
     * @param LoggerInterface|null $logger Optional logger for protocol traffic and errors.
     */
    public function __construct(
        private Client $client,
        public ?LoggerInterface $logger = null,
    ) {
        $this->config = $client->configuration;
        $this->authenticator = Authenticator::create($this->config);
    }
45

46
    /**
     * Return the CONNECT message built by init().
     *
     * The property is only assigned inside init(), so reading it before the
     * connection has been initialised fails (uninitialised typed property).
     */
    public function getConnectMessage(): Connect
    {
        return $this->connectMessage;
    }
50

51
    /**
     * Return the INFO message received from the server during init().
     *
     * The property is only assigned inside init(), so reading it before the
     * connection has been initialised fails (uninitialised typed property).
     */
    public function getInfoMessage(): Info
    {
        return $this->infoMessage;
    }
55

56
    /**
     * Read protocol lines from the socket until a message can be returned or
     * the deadline passes.
     *
     * Handling per message type:
     *  - Msg:  payload is read, parsed, the client is attached, message returned.
     *  - Info: TLS is enabled when the server asks for it (unless the TLS
     *          handshake was already done first), message returned.
     *  - Ok:   skipped, reading continues immediately.
     *  - Ping: answered with a Pong, reading continues.
     *  - Pong: the pong timestamp is recorded, reading continues.
     *
     * The socket itself is not opened here; callers go through init() first.
     *
     * @param null|int|float $timeout Seconds to keep reading, measured with microtime(true).
     *
     * @return Message|null The Msg or Info that was read, or null once the deadline passed.
     */
    public function getMessage(null|int|float $timeout = 0): ?Message
    {
        $now = microtime(true);
        $max = $now + $timeout;
        $iteration = 0;

        for (;;) {
            $message = null;
            // A protocol header line: at most 1024 bytes, terminated by CRLF.
            $line = stream_get_line($this->socket, 1024, "\r\n");
            $now = microtime(true);

            if ($line) {
                $message = Factory::create($line);
                $this->activityAt = $now;

                if ($message instanceof Msg) {
                    $payload = $this->getPayload($message->length);
                    $message->parse($payload);
                    $message->setClient($this->client);
                    $this->logger?->debug('receive ' . $line . $payload);

                    return $message;
                }

                $this->logger?->debug('receive ' . $line);

                if ($message instanceof Ok) {
                    // Acknowledgements carry nothing; go straight to the next read.
                    continue;
                } elseif ($message instanceof Ping) {
                    $this->sendMessage(new Pong([]));
                } elseif ($message instanceof Pong) {
                    $this->pongAt = $now;
                } elseif ($message instanceof Info) {
                    // When the handshake was done up front there is nothing left to upgrade.
                    if (!$this->config->tlsHandshakeFirst) {
                        if (!empty($message->tls_verify)) {
                            $this->enableTls(true);
                        } elseif (!empty($message->tls_required)) {
                            $this->enableTls(false);
                        }
                    }

                    return $message;
                }
            } elseif (
                $this->activityAt && $this->activityAt + $this->config->timeout < $now
                && $this->pingAt && $this->pingAt + $this->config->pingInterval < $now
                && $this->prolongateTill && $this->prolongateTill < $now
            ) {
                // Nothing read, and the activity, ping and prolongation windows have all elapsed.
                $this->sendMessage(new Ping());
            }

            if ($now > $max) {
                break;
            }

            if ($message && $now < $max) {
                $this->logger?->debug('sleep', ['max' => $max, 'now' => $now]);
                $this->config->delay($iteration++);
            }
        }

        // Deadline reached: treat a silent peer (no recent activity, no recent
        // pong, no pending long-poll request) as a read timeout.
        if (
            $this->activityAt && $this->activityAt + $this->config->timeout < $now
            && $this->pongAt && $this->pongAt + $this->config->pingInterval < $now
            && $this->prolongateTill && $this->prolongateTill < $now
        ) {
            $this->processException(new LogicException('Socket read timeout'));
        }

        return null;
    }
117

118
    /**
     * Send a Ping and read from the socket for up to the configured timeout.
     *
     * @return bool True when the last recorded Pong is not older than the last sent Ping.
     */
    public function ping(): bool
    {
        $this->sendMessage(new Ping());
        $this->getMessage($this->config->timeout);

        return $this->pongAt >= $this->pingAt;
    }
125

126
    /**
     * Render a message and write it to the socket, connecting first if needed.
     *
     * The rendered line is written in chunks of at most $packetSize bytes.
     * A failed or zero-length write is passed to processException(), which
     * either rethrows or reconnects. After a reconnect the message is written
     * again from its first byte, because the bytes already sent belonged to
     * the dead connection.
     *
     * Side effects after a successful write:
     *  - publishing to `$JS.API.CONSUMER.MSG.NEXT.*` moves $prolongateTill to
     *    now + the request's `expires` (nanoseconds converted to seconds);
     *  - sending a Ping records $pingAt.
     *
     * @param Message $message Message to send.
     *
     * @throws Throwable Whatever processException() rethrows when reconnecting is disabled.
     */
    public function sendMessage(Message $message)
    {
        $this->init();

        $line = $message->render() . "\r\n";
        $length = strlen($line);
        $total = 0;

        $this->logger?->debug('send ' . $line);

        while ($total < $length) {
            try {
                $written = @fwrite($this->socket, substr($line, $total, $this->packetSize));
                if ($written === false) {
                    throw new LogicException('Error sending data');
                }
                if ($written === 0) {
                    throw new LogicException('Broken pipe or closed connection');
                }
                $total += $written;
            } catch (Throwable $e) {
                $this->processException($e);
                // The connection was re-established: start over so the new
                // socket receives a complete frame, not the tail of the old one.
                $line = $message->render() . "\r\n";
                $length = strlen($line);
                $total = 0;
            }
        }

        unset($line);

        if ($message instanceof Publish && str_starts_with($message->subject, '$JS.API.CONSUMER.MSG.NEXT.')) {
            // `expires` is expressed in nanoseconds.
            $prolongate = $message->payload->expires / 1_000_000_000;
            $this->prolongateTill = microtime(true) + $prolongate;
        }

        if ($message instanceof Ping) {
            $this->pingAt = microtime(true);
        }
    }
168

169
    /**
     * Replace the logger used for protocol traffic and error reporting.
     *
     * @param LoggerInterface|null $logger New logger, or null to disable logging.
     */
    public function setLogger(?LoggerInterface $logger)
    {
        $this->logger = $logger;
    }
173

174
    /**
     * Set the read/write timeout of the socket, connecting first if needed.
     *
     * stream_set_timeout() takes whole seconds plus a *microseconds* part, so
     * the fractional part of $value is scaled by 1,000,000 (not 1,000).
     *
     * @param float $value Timeout in seconds; the fraction is honoured with microsecond resolution.
     */
    public function setTimeout(float $value)
    {
        $this->init();

        $seconds = (int) floor($value);
        // Rounding absorbs float noise (e.g. 2.3 - 2 = 0.2999…); the clamp keeps
        // the microseconds part below one full second.
        $microseconds = min(999_999, (int) round(1_000_000 * ($value - $seconds)));

        stream_set_timeout($this->socket, $seconds, $microseconds);
    }
182

183
    /**
     * Open the socket and perform the initial handshake, unless a socket is
     * already open.
     *
     * Steps: connect, apply the configured timeout, optionally do the TLS
     * handshake first, build the CONNECT message, read the server INFO, sign
     * the nonce when the server sent one and an authenticator is available,
     * and finally send CONNECT.
     *
     * When any step after the socket has been opened fails, the socket is
     * closed again, so a later call starts from scratch and does not reuse a
     * half-initialised connection.
     *
     * @return $this
     *
     * @throws Exception When the socket cannot be opened or no INFO message is received.
     * @throws Throwable Any failure raised by the handshake steps.
     */
    protected function init()
    {
        if ($this->socket) {
            return $this;
        }

        $config = $this->config;
        $dsn = "$config->host:$config->port";
        $flags = STREAM_CLIENT_CONNECT;
        $this->context = stream_context_create();
        $this->socket = @stream_socket_client($dsn, $error, $errorMessage, $config->timeout, $flags, $this->context);

        if ($error || !$this->socket) {
            throw new Exception($errorMessage ?: "Connection error", $error);
        }

        try {
            $this->setTimeout($config->timeout);

            if ($config->tlsHandshakeFirst) {
                $this->enableTls(true);
            }

            $this->connectMessage = new Connect($config->getOptions());

            if ($this->client->getName()) {
                $this->connectMessage->name = $this->client->getName();
            }

            // Validate before assigning: the property is typed, so a timeout
            // (null) or an unexpected message would otherwise surface as a
            // TypeError with no hint about the cause.
            $info = $this->getMessage($config->timeout);
            if (!$info instanceof Info) {
                throw new Exception('Connection error: INFO message was not received');
            }
            $this->infoMessage = $info;

            if (isset($this->infoMessage->nonce) && $this->authenticator) {
                $this->connectMessage->sig = $this->authenticator->sign($this->infoMessage->nonce);
                $this->connectMessage->nkey = $this->authenticator->getPublicKey();
            }

            $this->sendMessage($this->connectMessage);
        } catch (Throwable $e) {
            // Do not keep a socket on which the handshake never completed.
            $this->close();

            throw $e;
        }

        return $this;
    }
221

222
    /**
     * Configure the SSL context from the configured files and switch the
     * socket to TLS 1.2.
     *
     * The key and certificate files are only applied when a client
     * certificate is requested; the CA file is applied in both cases. Each
     * configured path must exist. Files are checked in the order key,
     * certificate, CA.
     *
     * @param bool $requireClientCert Whether to present the configured client key/certificate.
     *
     * @throws Exception When a configured file is missing or the TLS handshake fails.
     */
    protected function enableTls(bool $requireClientCert): void
    {
        // [configuration property, ssl context option, error prefix], in validation order.
        $files = [];
        if ($requireClientCert) {
            $files[] = ['tlsKeyFile', 'local_pk', "tlsKeyFile file does not exist: "];
            $files[] = ['tlsCertFile', 'local_cert', "tlsCertFile file does not exist: "];
        }
        $files[] = ['tlsCaFile', 'cafile', "tlsCaFile file does not exist: "];

        foreach ($files as [$property, $option, $missing]) {
            if (empty($this->config->{$property})) {
                continue;
            }
            if (!file_exists($this->config->{$property})) {
                throw new Exception($missing . $this->config->{$property});
            }
            stream_context_set_option($this->context, 'ssl', $option, $this->config->{$property});
        }

        $enabled = stream_socket_enable_crypto($this->socket, true, STREAM_CRYPTO_METHOD_TLSv1_2_CLIENT);
        if (!$enabled) {
            throw new Exception('Failed to connect: Error enabling TLS');
        }
    }
250

251
    /**
     * Read exactly $length payload bytes from the socket.
     *
     * Each read asks only for the bytes still missing, so a partial read is
     * never followed by a read that runs past the payload into the next
     * protocol frame. An empty read is retried after the configured delay;
     * once more than 16 retries have been used the loop gives up and whatever
     * was collected so far is returned.
     *
     * @param int $length Number of payload bytes announced by the message header.
     *
     * @return string The payload; shorter than $length only when the retries were exhausted.
     */
    protected function getPayload(int $length): string
    {
        $payload = '';
        $iteration = 0;

        while (($remaining = $length - strlen($payload)) > 0) {
            $chunk = stream_get_line($this->socket, $remaining, '');

            // Compare explicitly: a payload chunk of "0" is valid data and
            // must not be mistaken for an empty read.
            if ($chunk === false || $chunk === '') {
                if ($iteration > 16) {
                    break;
                }
                $this->config->delay($iteration++);
                continue;
            }

            if (strlen($chunk) !== $length) {
                $this->logger?->debug(
                    'got ' . strlen($chunk) . '/' . $length . ': ' . $chunk
                );
            }

            $payload .= $chunk;
        }

        return $payload;
    }
273

274
    /**
     * Log a connection failure and, when reconnecting is enabled, rebuild the
     * connection and replay the client's subscriptions.
     *
     * Reconnection is retried without an upper bound; the configured delay is
     * applied after every failed attempt.
     *
     * @param Throwable $e The failure that interrupted the connection.
     *
     * @throws Throwable The given exception, when the configuration disables reconnecting.
     */
    private function processException(Throwable $e)
    {
        $this->logger?->error($e->getMessage(), ['exception' => $e]);

        if (!$this->config->reconnect) {
            throw $e;
        }

        for ($attempt = 0; ; $attempt++) {
            try {
                // Dropping the old socket makes init() open a new one.
                $this->socket = null;
                $this->init();
                break;
            } catch (Throwable $failure) {
                $this->config->delay($attempt);
            }
        }

        // The new connection knows nothing about earlier subscriptions.
        foreach ($this->client->getSubscriptions() as $subscription) {
            $subscribe = new Subscribe([
                'sid' => $subscription['sid'],
                'subject' => $subscription['name'],
            ]);
            $this->sendMessage($subscribe);
        }
    }
302

303
    /**
     * Set the maximum number of bytes written to the socket per fwrite() call.
     *
     * @param int $size Chunk size in bytes used by sendMessage().
     */
    public function setPacketSize(int $size): void
    {
        $this->packetSize = $size;
    }
307

308
    /**
     * Close the socket if one is open and forget it, so that the next
     * init() call opens a new connection.
     */
    public function close(): void
    {
        if (!$this->socket) {
            return;
        }

        fclose($this->socket);
        $this->socket = null;
    }
315
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc