• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

basis-company / nats.php / 23489465477

24 Mar 2026 12:29PM UTC coverage: 93.321% (-1.9%) from 95.179%
23489465477

push

github

nekufa
ephemeral consumer configuration fix

5 of 6 new or added lines in 3 files covered. (83.33%)

46 existing lines in 3 files now uncovered.

1537 of 1647 relevant lines covered (93.32%)

75.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.5
/src/Consumer/Consumer.php
1
<?php
2

3
declare(strict_types=1);
4

5
namespace Basis\Nats\Consumer;
6

7
use Basis\Nats\Client;
8
use Basis\Nats\Consumer\Configuration;
9
use Basis\Nats\Queue;
10
use Basis\Nats\Message\Payload;
11
use Basis\Nats\Message\Publish;
12
use Throwable;
13

14
class Consumer
15
{
16
    /** Cached answer of exists(); null while the existence has not been determined yet. */
    private ?bool $exists = null;

    /** Raised by interrupt(); handle() checks it to stop consuming. */
    private bool $interrupt = false;

    /** Seconds handle() sleeps after an empty fetch (only applied when an empty handler is supplied). */
    private float $delay = 1;

    /** Pull request expiry in seconds; getQueue() converts it to nanoseconds, zero switches to no_wait. */
    private float $expires = 0.1;

    /** Number of messages requested per pull and fetched per loop iteration. */
    private int $batch = 1;

    /** Upper bound on the number of fetch iterations performed by handle(). */
    private int $iterations = PHP_INT_MAX;

    /** Resolved lazily by getConfiguration() or injected through setConfiguration(). */
    private ?Configuration $configuration = null;
23

24
    /**
     * @param Client      $client Client used for API calls and subscriptions.
     * @param string      $stream Name of the stream the consumer is bound to.
     * @param string|null $name   Consumer name; when null, getName() falls back to the
     *                            name held by the configuration (if one is set).
     */
    public function __construct(
        public readonly Client $client,
        private readonly string $stream,
        private readonly ?string $name = null,
    ) {}
30

31
    /**
     * Creates the consumer through the API when shouldCreateConsumer() says so.
     *
     * Ephemeral configurations are sent to CONSUMER.CREATE.<stream> and the name
     * returned by the API is written back into the configuration. All other
     * configurations are sent to CONSUMER.DURABLE.CREATE.<stream>.<name>.
     * After a successful request the cached existence flag is set to true.
     *
     * @param bool $ifNotExists Forwarded to shouldCreateConsumer(), which decides
     *                          whether a create request is issued.
     *
     * @return self
     */
    public function create(bool $ifNotExists = true): self
    {
        if (!$this->shouldCreateConsumer($ifNotExists)) {
            return $this;
        }

        $configuration = $this->getConfiguration();
        $ephemeral = $configuration->isEphemeral();

        $command = $ephemeral
            ? 'CONSUMER.CREATE.' . $this->getStream()
            : 'CONSUMER.DURABLE.CREATE.' . $this->getStream() . '.' . $this->getName();

        $result = $this->client->api($command, $configuration->toArray());

        if ($ephemeral) {
            // the name of an ephemeral consumer is only known from the API response
            $configuration->setName($result->name);
        }

        $this->exists = true;

        return $this;
    }
51

52
    /**
     * Sends CONSUMER.DELETE.<stream>.<name> to the API and marks the consumer
     * as non-existent in the local cache.
     *
     * @return self
     */
    public function delete(): self
    {
        $command = 'CONSUMER.DELETE.' . $this->getStream() . '.' . $this->getName();
        $this->client->api($command);

        $this->exists = false;

        return $this;
    }
59

60
    /**
     * Tells whether the consumer exists on its stream.
     *
     * A previously determined answer is returned from the cache. Otherwise the
     * stream is looked up; when the stream itself does not exist the result is
     * false, else the consumer name is searched in the stream's consumer names
     * and that result is cached.
     *
     * @return bool
     */
    public function exists(): bool
    {
        if ($this->exists !== null) {
            return $this->exists;
        }

        $stream = $this->client->getApi()->getStream($this->getStream());
        if (!$stream->exists()) {
            // The negative result is not cached in this branch, so a later call
            // checks the stream again.
            return false;
        }

        // Strict comparison: a loose in_array() compares numeric-looking names
        // numerically (e.g. "0123" would match "123").
        return $this->exists = in_array($this->getName(), $stream->getConsumerNames(), true);
    }
72

73
    /**
     * Returns the consumer configuration, resolving it on first access.
     *
     * When the consumer already exists, the configuration is built from the
     * values reported by info(); otherwise a fresh Configuration for the
     * current stream and name is created.
     *
     * @return Configuration
     */
    public function getConfiguration(): Configuration
    {
        if ($this->configuration !== null) {
            return $this->configuration;
        }

        $this->configuration = $this->exists()
            ? Configuration::fromObject($this->info()->getValues())
            : new Configuration($this->getStream(), $this->getName());

        return $this->configuration;
    }
84

85
    /**
     * Returns the consumer name.
     *
     * The name given to the constructor wins. Without one, the name stored in
     * the configuration is used (this is how an ephemeral consumer gets its
     * name after creation); null is returned when neither is available.
     *
     * @return string|null
     */
    public function getName(): ?string
    {
        return $this->name ?? $this->configuration?->getName();
    }
93

94
    /**
     * @return string Name of the stream passed to the constructor.
     */
    public function getStream(): string
    {
        return $this->stream;
    }
98

99
    /**
     * @return int Batch size used for pull requests and for each fetch in handle().
     */
    public function getBatching(): int
    {
        return $this->batch;
    }
103

104
    /**
     * @return float Seconds handle() sleeps after an empty fetch.
     */
    public function getDelay(): float
    {
        return $this->delay;
    }
108

109
    /**
     * @return float Pull request expiry in seconds (converted to nanoseconds by getQueue()).
     */
    public function getExpires(): float
    {
        return $this->expires;
    }
113

114
    /**
     * @return int Maximum number of fetch iterations handle() performs.
     */
    public function getIterations(): int
    {
        return $this->iterations;
    }
118

119
    /**
     * Builds a queue that pulls messages from this consumer.
     *
     * Subscribes to a random "handler.<hex>" subject and attaches a publish
     * message to the resulting queue (via setLauncher) that requests the next
     * batch from $JS.API.CONSUMER.MSG.NEXT.<stream>.<name>, using the
     * subscription subject as reply-to.
     *
     * @return Queue
     */
    public function getQueue(): Queue
    {
        $replySubject = 'handler.' . bin2hex(random_bytes(4));
        $queue = $this->client->subscribe($replySubject);

        $request = ['batch' => $this->getBatching()];

        // seconds -> nanoseconds; a zero expiry is replaced by a no_wait request
        $expiresNs = intval(1_000_000_000 * $this->getExpires());
        if ($expiresNs !== 0) {
            $request['expires'] = $expiresNs;
        } else {
            $request['no_wait'] = true;
        }

        $nextSubject = '$JS.API.CONSUMER.MSG.NEXT.' . $this->getStream() . '.' . $this->getName();

        $queue->setLauncher(new Publish([
            'payload' => Payload::parse($request),
            'replyTo' => $queue->subject,
            'subject' => $nextSubject,
        ]));

        return $queue;
    }
145

146
    /**
     * Pulls messages from the consumer and dispatches them to the handlers.
     *
     * The consumer is created if needed, then batches are fetched until the
     * iteration limit is reached or interrupt() has been called.
     *
     * - Non-empty messages are passed to $messageHandler($payload, $replyTo).
     *   With $ack enabled they are acked on success; when the handler throws
     *   they are nacked and the exception is rethrown.
     * - Messages with an empty payload are passed to
     *   $emptyHandler($payload, $replyTo), unless their KV-Operation header is
     *   DEL or PURGE; they are not counted as processed.
     * - When a fetch returns no messages at all, $emptyHandler() is called
     *   without arguments and, if iterations remain, the loop sleeps for the
     *   configured delay.
     *
     * The subscription opened for the queue is always released, including when
     * a handler throws.
     *
     * @param callable      $messageHandler Receives each non-empty payload and its reply-to subject.
     * @param callable|null $emptyHandler   Optional callback for empty payloads and empty fetches.
     * @param bool          $ack            Whether to ack/nack messages automatically.
     *
     * @return int Number of non-empty messages handed to $messageHandler.
     *
     * @throws Throwable Whatever $messageHandler throws is propagated.
     */
    public function handle(callable $messageHandler, ?callable $emptyHandler = null, bool $ack = true): int
    {
        $queue = $this->create()->getQueue();
        $iterations = $this->getIterations();
        $processed = 0;

        try {
            while (!$this->interrupt && $iterations--) {
                $messages = $queue->fetchAll($this->getBatching());
                foreach ($messages as $message) {
                    $payload = $message->payload;
                    if ($payload->isEmpty()) {
                        if (
                            $emptyHandler
                            && !in_array($payload->getHeader('KV-Operation'), ['DEL', 'PURGE'], true)
                        ) {
                            $emptyHandler($payload, $message->replyTo);
                        }
                        continue;
                    }
                    $processed++;
                    try {
                        $messageHandler($payload, $message->replyTo);
                        if ($ack) {
                            $message->ack();
                        }
                    } catch (Throwable $e) {
                        if ($ack) {
                            $message->nack();
                        }
                        throw $e;
                    }
                    if ($this->interrupt) {
                        // an interrupt raised while handling a message is consumed here
                        $this->interrupt = false;
                        break 2;
                    }
                }
                if (!count($messages) && $emptyHandler) {
                    $emptyHandler();
                    if ($iterations) {
                        usleep((int) floor($this->getDelay() * 1_000_000));
                    }
                }
            }
        } finally {
            // release the subscription even when a handler throws, otherwise it leaks
            $this->client->unsubscribe($queue);
        }

        return $processed;
    }
190

191
    /**
     * Requests CONSUMER.INFO.<stream>.<name> from the API.
     *
     * @return mixed The API response as returned by the client.
     */
    public function info()
    {
        $command = 'CONSUMER.INFO.' . $this->getStream() . '.' . $this->getName();

        return $this->client->api($command);
    }
195

196
    /**
     * Raises the interrupt flag that handle() checks to stop consuming.
     */
    public function interrupt()
    {
        $this->interrupt = true;
    }
200

201
    /**
     * Sets the batch size used for pull requests and fetches.
     *
     * @param int $batch Number of messages per batch.
     *
     * @return self
     */
    public function setBatching(int $batch): self
    {
        $this->batch = $batch;

        return $this;
    }
207

208
    /**
     * Replaces the configuration that getConfiguration() would otherwise resolve lazily.
     *
     * @param Configuration $configuration Configuration to use from now on.
     */
    public function setConfiguration(Configuration $configuration)
    {
        $this->configuration = $configuration;
    }
212

213
    /**
     * Sets the pause applied by handle() after an empty fetch.
     *
     * @param float $delay Delay in seconds.
     *
     * @return self
     */
    public function setDelay(float $delay): self
    {
        $this->delay = $delay;

        return $this;
    }
219

220
    /**
     * Sets the pull request expiry.
     *
     * @param float $expires Expiry in seconds; a value that converts to zero
     *                       nanoseconds makes getQueue() send a no_wait request.
     *
     * @return self
     */
    public function setExpires(float $expires): self
    {
        $this->expires = $expires;

        return $this;
    }
226

227
    /**
     * Limits the number of fetch iterations performed by handle().
     *
     * @param int $iterations Maximum number of iterations.
     *
     * @return self
     */
    public function setIterations(int $iterations): self
    {
        $this->iterations = $iterations;

        return $this;
    }
233

234
    /**
     * Decides whether create() has to issue a create request.
     *
     * A request is needed when:
     * - the caller asked for unconditional creation ($ifNotExists is false), or
     * - the configuration is ephemeral and has no name yet, or
     * - the consumer does not exist.
     *
     * @param bool $ifNotExists True to create only when the consumer is missing,
     *                          false to create regardless of its existence.
     *
     * @return bool
     */
    private function shouldCreateConsumer(bool $ifNotExists): bool
    {
        // Previously this flag was accepted but never consulted.
        if (!$ifNotExists) {
            return true;
        }

        $configuration = $this->getConfiguration();
        if ($configuration->isEphemeral() && $configuration->getName() === null) {
            return true;
        }

        return !$this->exists();
    }
239
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc