• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

basis-company / nats.php / 23489983957

24 Mar 2026 12:42PM UTC coverage: 94.961% (+1.6%) from 93.321%
23489983957

push

github

nekufa
configuration restore tests

1564 of 1647 relevant lines covered (94.96%)

84.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.56
/src/Queue.php
1
<?php
2

3
declare(strict_types=1);
4

5
namespace Basis\Nats;
6

7
use Basis\Nats\Message\Msg;
8
use Basis\Nats\Message\Publish;
9
use Exception;
10

11
class Queue
12
{
13
    private array $queue = [];
14
    private float $timeout;
15
    private ?Publish $launcher = null;
16
    private ?Msg $lastMessage = null;
17

18
    public function __construct(
76✔
19
        public readonly Client $client,
20
        public readonly string $subject,
21
    ) {
22
        $this->timeout = $client->configuration->timeout;
76✔
23
    }
24

25
    public function fetch(): ?Msg
8✔
26
    {
27
        $messages = $this->fetchAll(1);
8✔
28
        return array_shift($messages);
8✔
29
    }
30

31
    public function fetchAll(int $limit = 0): array
72✔
32
    {
33
        if ($this->launcher) {
72✔
34
            $this->client->connection->sendMessage($this->launcher);
68✔
35
        }
36
        $max = microtime(true) + $this->timeout;
72✔
37
        while (true) {
72✔
38
            $now = microtime(true);
72✔
39
            if ($limit && count($this->queue) >= $limit) {
72✔
40
                // optional limit reached
41
                break;
60✔
42
            }
43

44
            $now = microtime(true);
72✔
45
            $processingTimeout = $this->timeout ? $max - $now : 0;
72✔
46
            if ($processingTimeout < 0) {
72✔
47
                // optional timeout reached
48
                break;
×
49
            }
50

51
            if ($this->client->process($processingTimeout) !== $this) {
72✔
52
                // stop when clients got message for another handler or there are no more messages
53
                break;
33✔
54
            }
55

56
            if (
57
                $this->lastMessage?->payload->isEmpty()
64✔
58
                && $this->lastMessage->payload->getHeader('Status-Code') === '404'
64✔
59
            ) {
60
                break;
3✔
61
            }
62
        }
63

64
        $result = [];
72✔
65
        while (count($this->queue) && (!$limit || count($result) < $limit)) {
72✔
66
            $message = array_shift($this->queue);
64✔
67
            $result[] = $message;
64✔
68
        }
69

70
        return $result;
72✔
71
    }
72

73
    public function handle(Msg $message)
64✔
74
    {
75
        $this->queue[] = $this->lastMessage = $message;
64✔
76
    }
77

78
    public function next(float $timeout = 0): Msg
4✔
79
    {
80
        $start = microtime(true);
4✔
81
        while (true) {
4✔
82
            $message = $this->fetch();
4✔
83
            if ($message) {
4✔
84
                return $message;
4✔
85
            }
86
            if ($timeout && ($start + $timeout < microtime(true))) {
4✔
87
                throw new Exception("Subject $this->subject is empty");
4✔
88
            }
89
        }
90
    }
91

92
    public function setLauncher(Publish $message): void
68✔
93
    {
94
        $this->launcher = $message;
68✔
95
    }
96

97
    public function setTimeout(float $value): void
24✔
98
    {
99
        $this->timeout = $value;
24✔
100
    }
101
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc