• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

basis-company / nats.php / 21856134653

10 Feb 2026 07:44AM UTC coverage: 95.05% (+0.07%) from 94.983%
21856134653

push

github

nekufa
verbose tests

1421 of 1495 relevant lines covered (95.05%)

72.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.56
/src/Queue.php
1
<?php
2

3
declare(strict_types=1);
4

5
namespace Basis\Nats;
6

7
use Basis\Nats\Message\Msg;
8
use Basis\Nats\Message\Publish;
9
use Exception;
10

11
/**
 * In-memory FIFO buffer for messages delivered to a single subject.
 *
 * Messages are appended through handle(); fetch(), fetchAll() and next()
 * ask the client to process incoming traffic and then hand the buffered
 * messages back to the caller in arrival order.
 */
class Queue
{
    /** @var list<Msg> Messages received through handle() that were not yet returned. */
    private array $queue = [];

    /**
     * Maximum time a single fetchAll() call keeps waiting, in seconds
     * (it is added to microtime(true)). A value of 0 disables the deadline.
     */
    private float $timeout;

    /** Optional message sent over the client connection at the start of every fetchAll(). */
    private ?Publish $launcher = null;

    /** Most recent message given to handle(); fetchAll() inspects it for the 404 marker. */
    private ?Msg $lastMessage = null;

    /**
     * @param Client $client  Client used to send the launcher and to process incoming data;
     *                        its configured timeout becomes the initial queue timeout.
     * @param string $subject Subject name, used in the exception message raised by next().
     */
    public function __construct(
        public readonly Client $client,
        public readonly string $subject,
    ) {
        $this->timeout = $client->configuration->timeout;
    }

    /**
     * Fetches at most one message.
     *
     * @return Msg|null The oldest buffered message, or null when fetchAll(1) produced nothing.
     */
    public function fetch(): ?Msg
    {
        return $this->fetchAll(1)[0] ?? null;
    }

    /**
     * Sends the launcher (if any), lets the client process incoming data until a
     * stop condition is met, then removes and returns the buffered messages.
     *
     * Processing stops when the buffer holds $limit messages, when the timeout
     * has elapsed, when Client::process() returns something other than this
     * queue, or when the last handled message has an empty payload with a
     * 'Status-Code' header of '404'.
     *
     * NOTE(review): handle() buffers every message it receives, so such a 404
     * marker is also part of the returned list unless it is filtered elsewhere
     * -- confirm against Client::process().
     *
     * @param int $limit Maximum number of messages to return; 0 returns everything
     *                   buffered. A negative value returns an empty list.
     *
     * @return list<Msg> Messages in arrival order; messages beyond $limit stay buffered.
     */
    public function fetchAll(int $limit = 0): array
    {
        if ($this->launcher !== null) {
            $this->client->connection->sendMessage($this->launcher);
        }

        $deadline = microtime(true) + $this->timeout;
        while (true) {
            if ($limit && count($this->queue) >= $limit) {
                // optional limit reached
                break;
            }

            // With a zero timeout there is no deadline and the client is always asked with 0.
            $processingTimeout = $this->timeout ? $deadline - microtime(true) : 0;
            if ($processingTimeout < 0) {
                // optional timeout reached
                break;
            }

            if ($this->client->process($processingTimeout) !== $this) {
                // stop when clients got message for another handler or there are no more messages
                break;
            }

            if (
                $this->lastMessage?->payload->isEmpty()
                && $this->lastMessage->payload->getHeader('Status-Code') === '404'
            ) {
                // presumably the server's "nothing to deliver" reply -- no point in waiting further
                break;
            }
        }

        // Detach the requested prefix in one step instead of shifting message by message:
        // null takes the whole buffer (no limit), a negative limit takes nothing.
        $length = $limit === 0 ? null : max($limit, 0);

        return array_splice($this->queue, 0, $length);
    }

    /**
     * Appends a delivered message to the buffer and remembers it as the last one seen.
     *
     * @param Msg $message Message to buffer.
     *
     * @return void
     */
    public function handle(Msg $message)
    {
        $this->lastMessage = $message;
        $this->queue[] = $message;
    }

    /**
     * Repeats fetch() until a message is available.
     *
     * The deadline is only evaluated between fetch attempts, so the call can
     * last longer than $timeout by the duration of the attempt in progress.
     *
     * @param float $timeout Seconds after which an unsuccessful attempt raises;
     *                       0 keeps retrying indefinitely.
     *
     * @return Msg The first message obtained.
     *
     * @throws Exception When $timeout is non-zero and has elapsed without a message.
     */
    public function next(float $timeout = 0): Msg
    {
        $start = microtime(true);
        while (true) {
            $message = $this->fetch();
            if ($message !== null) {
                return $message;
            }
            if ($timeout && ($start + $timeout < microtime(true))) {
                throw new Exception("Subject $this->subject is empty");
            }
        }
    }

    /**
     * Sets the message that fetchAll() sends before it starts processing.
     *
     * @param Publish $message Message passed to the connection's sendMessage() on each fetchAll().
     */
    public function setLauncher(Publish $message): void
    {
        $this->launcher = $message;
    }

    /**
     * Overrides the timeout taken from the client configuration.
     *
     * @param float $value New fetchAll() timeout in seconds; 0 disables the deadline.
     */
    public function setTimeout(float $value): void
    {
        $this->timeout = $value;
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc