• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

basis-company / nats.php / 14576076578

21 Apr 2025 03:23PM UTC coverage: 94.815% (+0.02%) from 94.798%
14576076578

push

github

web-flow
one subscription for all requests [#106]

* Only one subscription for all requests

* Only one subscription for all requests

* Only one subscription for all requests

* Only one subscription for all requests

* Only one subscription for all requests

* Use one subscription for all requests

* Use one subscription for all requests

* Small process method improvement

41 of 44 new or added lines in 1 file covered. (93.18%)

1 existing line in 1 file now uncovered.

1408 of 1485 relevant lines covered (94.81%)

17.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.19
/src/Client.php
1
<?php
2

3
declare(strict_types=1);
4

5
namespace Basis\Nats;
6

7
use Basis\Nats\Message\Msg;
8
use Basis\Nats\Message\Payload;
9
use Basis\Nats\Message\Publish;
10
use Basis\Nats\Message\Subscribe;
11
use Basis\Nats\Message\Unsubscribe;
12
use Basis\Nats\Service\Service;
13
use Exception;
14
use LogicException;
15
use Psr\Log\LoggerInterface;
16

17
class Client
18
{
19
    public readonly Api $api;
20

21
    private string $name = '';
22

23
    /** @var array<callable|Queue> */
24
    private array $handlers = [];
25
    private array $subscriptions = [];
26

27
    private array $services = [];
28

29
    private bool $skipInvalidMessages = false;
30

31
    private string $requestsSubject = '';
32

33
    private bool $requestsSubscribed = false;
34

35
    private int $nextRid = 0;
36

37
    private string $requestsSid = '';
38

39
    /**
     * Builds the client, its API facade and the identifiers used for request replies.
     *
     * @param Configuration        $configuration Settings read by this client (for example the default timeout).
     * @param LoggerInterface|null $logger        Optional logger; also passed to a connection created here.
     * @param Connection|null      $connection    Connection to use; a new one is created when none is given.
     */
    public function __construct(
        public readonly Configuration $configuration = new Configuration(),
        public ?LoggerInterface $logger = null,
        public ?Connection $connection = null,
    ) {
        $this->api = new Api($this);
        if (!$connection) {
            $this->connection = new Connection(client: $this, logger: $logger);
        }

        // Random suffix gives this client its own reply subject prefix.
        $this->requestsSubject = '_REQS' . bin2hex(random_bytes(16));
        // Call the method with its declared casing. The previous `getnextRid()` only worked
        // because PHP resolves method names case-insensitively.
        $this->requestsSid = '_REQS' . $this->getNextRid();
    }
52

53
    /**
     * Sends a command to the `$JS.API.` subject space.
     *
     * With a callback the call goes through request() and this client is returned.
     * Without one it goes through dispatch() and the dispatched result is returned
     * (or null when that result is falsy).
     *
     * @param mixed         $command  Command appended to the `$JS.API.` prefix.
     * @param array         $args     Arguments, JSON-encoded as an object.
     * @param callable|null $callback Optional handler for the reply.
     *
     * @throws Exception When the result carries a truthy `error` member.
     */
    public function api($command, array $args = [], ?callable $callback = null): ?object
    {
        $subject = "\$JS.API.$command";
        $options = json_encode((object)$args);

        if ($callback) {
            return $this->request($subject, $options, $callback);
        }

        $result = $this->dispatch($subject, $options);

        $error = $result->error ?? null;
        if ($error) {
            throw new Exception($error->description, $error->err_code);
        }

        return $result ?: null;
    }
74

75
    /**
     * Sends a request and waits for its reply, returning the value handed to the reply handler.
     *
     * @param string     $name    Subject to send the request to.
     * @param mixed      $payload Request payload.
     * @param float|null $timeout Seconds to wait; falls back to the configured timeout when null.
     *
     * @throws LogicException When no reply was handled before the deadline.
     */
    public function dispatch(string $name, mixed $payload, ?float $timeout = null)
    {
        $timeout ??= $this->configuration->timeout;
        $deadline = microtime(true) + $timeout;

        $done = false;
        $response = null;

        $this->request($name, $payload, function ($result) use (&$done, &$response) {
            $done = true;
            $response = $result;
        });

        // request() already processed once; keep polling until the reply shows up or time runs out.
        while (!$done && microtime(true) < $deadline) {
            $this->process();
        }

        if (!$done) {
            throw new LogicException("Processing timeout");
        }

        return $response;
    }
102

103
    /**
     * Returns the Api instance created in the constructor.
     */
    public function getApi(): Api
    {
        return $this->api;
    }
107

108
    /**
     * Delegates to the connection's ping() and returns its result.
     */
    public function ping(): bool
    {
        return $this->connection->ping();
    }
112

113
    /**
     * Sends a Publish message over the connection.
     *
     * @param string      $name    Subject to publish to.
     * @param mixed       $payload Payload; converted through Payload::parse().
     * @param string|null $replyTo Optional reply subject attached to the message.
     *
     * @return self This client, for chaining.
     */
    public function publish(string $name, mixed $payload, ?string $replyTo = null): self
    {
        $message = new Publish([
            'payload' => Payload::parse($payload),
            'replyTo' => $replyTo,
            'subject' => $name,
        ]);

        $this->connection->sendMessage($message);

        return $this;
    }
123

124
    /**
     * Publishes a request and registers a handler for its reply.
     *
     * The reply subject comes from getNextReplyTo(); the handler is stored under that
     * subject, and one process() call is made with the configured timeout.
     *
     * @param string   $name    Subject to send the request to.
     * @param mixed    $payload Request payload.
     * @param callable $handler Invoked with the reply when it is processed.
     *
     * @return self This client, for chaining.
     */
    public function request(string $name, mixed $payload, callable $handler): self
    {
        // Make sure the shared reply subscription exists before anything is published.
        $this->subscribeRequests();

        $inbox = $this->getNextReplyTo();
        $this->handlers[$inbox] = $handler;

        $this->publish($name, $payload, $inbox)
            ->process($this->configuration->timeout);

        return $this;
    }
137

138
    /**
     * Subscribes to a subject without a queue group.
     *
     * @param string        $name    Subject to subscribe to.
     * @param callable|null $handler Message handler; when omitted a Queue is returned instead.
     *
     * @return self|Queue This client when a handler was given, otherwise the created Queue.
     */
    public function subscribe(string $name, ?callable $handler = null): self|Queue
    {
        return $this->doSubscribe(subject: $name, group: null, handler: $handler);
    }
142

143
    /**
     * Subscribes to a subject as a member of a queue group.
     *
     * @param string        $name    Subject to subscribe to.
     * @param string        $group   Queue group name sent with the subscription.
     * @param callable|null $handler Message handler; when omitted a Queue is returned instead.
     *
     * @return self|Queue This client when a handler was given, otherwise the created Queue.
     */
    public function subscribeQueue(string $name, string $group, ?callable $handler = null): self|Queue
    {
        return $this->doSubscribe(subject: $name, group: $group, handler: $handler);
    }
147

148
    /**
     * Removes every subscription registered for the given subject.
     *
     * For each match the subscription record is dropped, an Unsubscribe message is sent
     * for its sid, and the handler stored under that sid is removed.
     *
     * @param string|Queue $name Subject name, or a Queue whose `subject` is used.
     *
     * @return self This client, for chaining.
     */
    public function unsubscribe(string|Queue $name): self
    {
        $subject = $name instanceof Queue ? $name->subject : $name;

        foreach ($this->subscriptions as $index => $subscription) {
            // Strict comparison: with `==`, numeric-looking subjects such as "10" and "1e1"
            // compare equal and the wrong subscription would be removed.
            // NOTE(review): assumes Queue::$subject is a string — confirm against Queue.
            if ($subscription['name'] !== $subject) {
                continue;
            }

            unset($this->subscriptions[$index]);
            $this->connection->sendMessage(new Unsubscribe(['sid' => $subscription['sid']]));
            unset($this->handlers[$subscription['sid']]);
        }

        return $this;
    }
163

164
    /**
     * Returns the current subscription records, each holding a `name` and a `sid`.
     *
     * Keys can have gaps, because removed entries are unset rather than reindexed.
     */
    public function getSubscriptions(): array
    {
        return $this->subscriptions;
    }
168

169
    /**
     * Forwards the delay settings to the configuration.
     *
     * @param float  $delay Delay value passed to Configuration::setDelay().
     * @param string $mode  Delay mode; defaults to Configuration::DELAY_CONSTANT.
     *
     * @return self This client, for chaining.
     */
    public function setDelay(float $delay, string $mode = Configuration::DELAY_CONSTANT): self
    {
        $this->configuration->setDelay($delay, $mode);

        return $this;
    }
174

175
    /**
     * Replaces the logger held by this client; null clears it.
     *
     * @return self This client, for chaining.
     */
    public function setLogger(?LoggerInterface $logger): self
    {
        $this->logger = $logger;

        return $this;
    }
180

181
    /**
     * Reads one message from the connection and routes it to a registered handler.
     *
     * Handlers keyed by the message subject (request replies) are tried first and are
     * removed after they run; handlers keyed by the message sid stay registered.
     *
     * @param null|int|float $timeout Passed to the connection's getMessage().
     * @param bool           $reply   Whether a handler's result may be sent back as a reply.
     *
     * @return mixed The handler result, null for a skipped message, or the received
     *               value itself when it is not a Msg.
     *
     * @throws LogicException When a Msg has no handler and invalid messages are not skipped.
     */
    public function process(null|int|float $timeout = 0, bool $reply = true): mixed
    {
        $message = $this->connection->getMessage($timeout);

        if (!$message instanceof Msg) {
            return $message;
        }

        $subject = $message->subject;
        if (array_key_exists($subject, $this->handlers)) {
            $result = $this->processMsg($this->handlers[$subject], $message, $reply);
            // Subject-keyed handlers are one-shot.
            unset($this->handlers[$subject]);

            return $result;
        }

        if (array_key_exists($message->sid, $this->handlers)) {
            return $this->processMsg($this->handlers[$message->sid], $message, $reply);
        }

        if ($this->skipInvalidMessages) {
            return null;
        }

        throw new LogicException("No handler for message $message->sid or $message->subject");
    }
202

203
    /**
     * Registers a handler under a fresh random sid and sends the Subscribe message.
     *
     * @param string        $subject Subject to subscribe to.
     * @param string|null   $group   Queue group, or null for none.
     * @param callable|null $handler Message handler; when null a Queue is created and used instead.
     *
     * @return self|Queue The created Queue when no handler was given, otherwise this client.
     */
    private function doSubscribe(string $subject, ?string $group, ?callable $handler = null): self|Queue
    {
        $sid = bin2hex(random_bytes(4));

        // Without a handler, messages are collected by a Queue that is returned to the caller.
        $queue = $handler === null ? new Queue($this, $subject) : null;
        $this->handlers[$sid] = $queue ?? $handler;

        $subscribe = new Subscribe([
            'sid' => $sid,
            'subject' => $subject,
            'group' => $group,
        ]);
        $this->connection->sendMessage($subscribe);

        $this->subscriptions[] = ['name' => $subject, 'sid' => $sid];

        return $queue ?? $this;
    }
228

229
    /**
     * Returns the client name; an empty string until setName() is called.
     */
    public function getName(): string
    {
        return $this->name;
    }
233

234
    /**
     * Stores the client name.
     *
     * @return self This client, for chaining.
     */
    public function setName(string $name): self
    {
        $this->name = $name;

        return $this;
    }
239

240
    /**
     * Forwards the timeout value to the connection.
     *
     * @return self This client, for chaining.
     */
    public function setTimeout(float $value): self
    {
        $this->connection->setTimeout($value);

        return $this;
    }
245

246
    /**
     * Chooses whether process() returns null for a message without a handler
     * instead of throwing.
     *
     * @return self This client, for chaining.
     */
    public function skipInvalidMessages(bool $skipInvalidMessages): self
    {
        $this->skipInvalidMessages = $skipInvalidMessages;

        return $this;
    }
251

252
    /**
     * Drops the request-reply subscription and every subscription registered on this client.
     *
     * @return self This client, for chaining.
     */
    public function unsubscribeAll(): self
    {
        $this->unsubscribeRequests();

        foreach ($this->subscriptions as $key => $entry) {
            $sid = $entry['sid'];

            unset($this->subscriptions[$key]);
            $this->connection->sendMessage(new Unsubscribe(['sid' => $sid]));
            unset($this->handlers[$sid]);
        }

        return $this;
    }
264

265
    /**
     * Unsubscribes everything, closes the connection and clears the reference to it.
     *
     * Does nothing when there is no connection.
     *
     * @return self This client, for chaining.
     */
    public function disconnect(): self
    {
        if (!$this->connection) {
            return $this;
        }

        $this->unsubscribeAll();
        $this->connection->close();
        $this->connection = null;

        return $this;
    }
274

275
    /**
     * Returns the Service registered under the given name, creating it on first use.
     *
     * The description and version are only used when the service is created.
     */
    public function service(string $name, string $description, string $version): Service
    {
        return $this->services[$name] ??= new Service($this, $name, $description, $version);
    }
283

284
    /**
     * Increments the request counter and returns the new value as a string.
     */
    private function getNextRid(): string
    {
        return (string) ++$this->nextRid;
    }
290

291
    /**
     * Builds the next reply subject: the requests subject, a dot, and a fresh request id.
     */
    private function getNextReplyTo(): string
    {
        return sprintf('%s.%s', $this->requestsSubject, $this->getNextRid());
    }
295

296
    /**
     * Sends the single wildcard subscription that receives all request replies,
     * unless it has already been sent.
     */
    private function subscribeRequests(): void
    {
        if ($this->requestsSubscribed) {
            return;
        }

        $subscribe = new Subscribe([
            'sid' => $this->requestsSid,
            'subject' => $this->requestsSubject . '.*',
        ]);
        $this->connection->sendMessage($subscribe);

        $this->requestsSubscribed = true;
    }
307

308
    /**
     * Removes the shared request-reply subscription if it is currently active.
     *
     * The previous version had the condition inverted: it sent an Unsubscribe only when
     * the subscription had never been made, and then marked it as active. A live
     * subscription was therefore never removed, and a later request() would skip
     * subscribing and never receive its reply.
     */
    private function unsubscribeRequests(): void
    {
        if (!$this->requestsSubscribed) {
            return;
        }

        $this->connection->sendMessage(new Unsubscribe(['sid' => $this->requestsSid]));

        // Cleared so the next request() subscribes again.
        $this->requestsSubscribed = false;
    }
315

316
    /**
     * Hands a message to its handler.
     *
     * A Queue handler receives the whole message and is itself returned. Any other
     * handler is called with the payload and reply subject; its result is sent back
     * through the message when replying is enabled and a reply subject is present.
     *
     * @param callable|Queue $handler Handler registered for the message.
     * @param Msg            $message Message being processed.
     * @param bool           $reply   Whether the handler result may be sent as a reply.
     *
     * @return mixed The Queue, or the callable handler's result.
     */
    private function processMsg($handler, Msg $message, bool $reply): mixed
    {
        if ($handler instanceof Queue) {
            $handler->handle($message);

            return $handler;
        }

        $result = $handler($message->payload, $message->replyTo);

        if ($reply && $message->replyTo) {
            $message->reply($result);
        }

        return $result;
    }
329
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc