• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

basis-company / nats.php / 27822845282

19 Jun 2026 11:24AM UTC coverage: 94.8% (-0.2%) from 94.967%
27822845282

push

github

nekufa
Fix issue 92.

6 of 8 new or added lines in 1 file covered. (75.0%)

1 existing line in 1 file now uncovered.

1568 of 1654 relevant lines covered (94.8%)

21.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.12
/src/Queue.php
1
<?php
2

3
declare(strict_types=1);
4

5
namespace Basis\Nats;
6

7
use Basis\Nats\Message\Msg;
8
use Basis\Nats\Message\Publish;
9
use Exception;
10

11
class Queue
12
{
13
    private array $queue = [];
14
    private float $timeout;
15
    private ?Publish $launcher = null;
16
    private ?Msg $lastMessage = null;
17

18
    public function __construct(
19✔
19
        public readonly Client $client,
20
        public readonly string $subject,
21
    ) {
22
        $this->timeout = $client->configuration->timeout;
19✔
23
    }
24

25
    public function fetch(): ?Msg
2✔
26
    {
27
        $messages = $this->fetchAll(1);
2✔
28
        return array_shift($messages);
2✔
29
    }
30

31
    public function fetchAll(int $limit = 0): array
18✔
32
    {
33
        if ($this->launcher) {
18✔
34
            $this->client->connection->sendMessage($this->launcher);
17✔
35
        }
36
        $max = microtime(true) + $this->timeout;
18✔
37
        while (true) {
18✔
38
            $now = microtime(true);
18✔
39
            if ($limit && count($this->queue) >= $limit) {
18✔
40
                // optional limit reached
41
                break;
15✔
42
            }
43

44
            $now = microtime(true);
18✔
45
            $processingTimeout = $this->timeout ? $max - $now : 0;
18✔
46
            if ($processingTimeout < 0) {
18✔
47
                // optional timeout reached
48
                break;
×
49
            }
50

51
            if ($this->client->process($processingTimeout) !== $this) {
18✔
52
                // stop when clients got message for another handler or there are no more messages
53
                break;
9✔
54
            }
55

56
            if (
57
                $this->lastMessage?->payload->isEmpty()
16✔
58
                && $this->lastMessage->payload->getHeader('Status-Code') === '404'
16✔
59
            ) {
UNCOV
60
                break;
×
61
            }
62
        }
63

64
        $result = [];
18✔
65
        while (count($this->queue) && (!$limit || count($result) < $limit)) {
18✔
66
            $message = array_shift($this->queue);
16✔
67
            $result[] = $message;
16✔
68
        }
69

70
        return $result;
18✔
71
    }
72

73
    public function handle(Msg $message): void
16✔
74
    {
75
        $this->queue[] = $this->lastMessage = $message;
16✔
76
    }
77

78
    public function next(float $timeout = 0): Msg
1✔
79
    {
80
        $start = microtime(true);
1✔
81
        while (true) {
1✔
82
            $message = $this->fetch();
1✔
83
            if ($message) {
1✔
84
                return $message;
1✔
85
            }
86
            if ($timeout && ($start + $timeout < microtime(true))) {
1✔
87
                throw new Exception("Subject $this->subject is empty");
1✔
88
            }
89
        }
90
    }
91

92
    public function setLauncher(Publish $message): void
17✔
93
    {
94
        $this->launcher = $message;
17✔
95
    }
96

97
    public function setTimeout(float $value): void
6✔
98
    {
99
        $this->timeout = $value;
6✔
100
    }
101
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc