• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

basis-company / nats.php / 23217248396

17 Mar 2026 09:28PM UTC coverage: 95.082% (+0.03%) from 95.05%
23217248396

push

github

nekufa
connection get message default timeout

7 of 8 new or added lines in 1 file covered. (87.5%)

33 existing lines in 3 files now uncovered.

1450 of 1525 relevant lines covered (95.08%)

77.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.62
/src/Message/Msg.php
1
<?php
2

3
declare(strict_types=1);
4

5
namespace Basis\Nats\Message;
6

7
use Basis\Nats\Client;
8
use Exception;
9
use LogicException;
10

11
class Msg extends Prototype
12
{
13
    public int $length;
14
    public Payload $payload;
15
    public string $sid;
16
    public string $subject;
17

18
    public ?int $hlength = null;
19
    public ?int $timestampNanos = null;
20
    public ?string $replyTo = null;
21

22
    private ?Client $client = null;
23

24
    public static function create(string $data): self
272✔
25
    {
26
        $args = explode(' ', $data, 5);
272✔
27
        $values = [];
272✔
28
        foreach ($args as $k => $v) {
272✔
29
            if ($v === '') {
272✔
30
                unset($args[$k]);
8✔
31
            }
32
        }
33
        $args = array_values($args);
272✔
34

35
        switch (count($args)) {
272✔
36
            case 3:
272✔
37
                $values = array_combine(['subject', 'sid', 'length'], $args);
268✔
38
                break;
268✔
39

40
            case 4:
72✔
41
                $values = array_combine(['subject', 'sid', 'replyTo', 'length'], $args);
64✔
42
                if (is_numeric($values['replyTo'])) {
64✔
43
                    $values = array_combine(['subject', 'sid', 'hlength', 'length'], $args);
20✔
44
                }
45
                break;
64✔
46

47
            case 5:
12✔
48
                $values = array_combine(['subject', 'sid', 'replyTo', 'hlength', 'length'], $args);
8✔
49
                break;
8✔
50

51
            default:
52
                throw new Exception("Invalid Msg: $data");
4✔
53
        }
54

55
        foreach (['length', 'hlength'] as $key) {
268✔
56
            if (array_key_exists($key, $values)) {
268✔
57
                $values[$key] = (int) $values[$key];
268✔
58
            }
59
        }
60

61
        $values = self::tryParseMessageTime($values);
268✔
62

63
        return new self($values);
268✔
64
    }
65

66
    public function ack(): void
48✔
67
    {
68
        $this->reply(new Ack([
48✔
69
            'subject' => $this->replyTo
48✔
70
        ]));
48✔
71
    }
72

73
    public function getClient(): ?Client
4✔
74
    {
75
        return $this->client;
4✔
76
    }
77

78
    public function nack(float $delay = 0): void
8✔
79
    {
80
        $this->reply(new Nak([
8✔
81
            'subject' => $this->replyTo,
8✔
82
            'delay' => $delay,
8✔
83
        ]));
8✔
84
    }
85

86
    public function parse($payload): self
260✔
87
    {
88
        $headers = [];
260✔
89
        if ($this->hlength) {
260✔
90
            $headerString = substr($payload, 0, $this->hlength);
24✔
91
            foreach (explode("\r\n", $headerString) as $row) {
24✔
92
                if (!$row) {
24✔
93
                    continue;
24✔
94
                }
95
                if (strpos($row, 'NATS/') !== false) {
24✔
96
                    $parts = explode(' ', $row, 3);
24✔
97
                    if (count($parts) == 1) {
24✔
98
                        // empty header
99
                        continue;
8✔
100
                    }
101
                    [$nats, $code, $message] = $parts;
20✔
102
                    $headers['Status-Code'] = trim($code);
20✔
103
                    $headers['Status-Message'] = trim($message);
20✔
104
                } elseif (strpos($row, ':') !== false) {
21✔
105
                    [$key, $value] = explode(':', $row, 2);
21✔
106
                    $headers[trim($key)] = trim($value);
21✔
107
                } else {
108
                    throw new Exception("Invalid header row: " . $row);
×
109
                }
110
            }
111
            $payload = substr($payload, $this->hlength);
24✔
112
        }
113
        $this->payload = new Payload(
260✔
114
            $payload,
260✔
115
            $headers,
260✔
116
            $this->subject,
260✔
117
            $this->timestampNanos
260✔
118
        );
260✔
119

120
        return $this;
260✔
121
    }
122

123
    public function progress(): void
4✔
124
    {
125
        $this->reply(new Progress([
4✔
126
            'subject' => $this->replyTo,
4✔
127
        ]));
4✔
128
    }
129

130
    /**
131
     * Terminally reject this message so the server never redelivers it.
132
     *
133
     * Unlike nack() which schedules a redelivery, term() tells the server that
134
     * this message is permanently unprocessable (e.g. malformed payload, schema
135
     * mismatch) and should be removed from the consumer's pending set immediately.
136
     *
137
     * @param string $reason Optional human-readable reason for the termination.
138
     */
139
    public function term(string $reason = ''): void
4✔
140
    {
141
        $this->reply(new Term([
4✔
142
            'subject' => $this->replyTo,
4✔
143
            'reason' => $reason,
4✔
144
        ]));
4✔
145
    }
146

147
    public function render(): string
4✔
148
    {
149
        return 'MSG ' . json_encode($this);
4✔
150
    }
151

152
    public function reply($data): void
64✔
153
    {
154
        if (!$this->replyTo) {
64✔
UNCOV
155
            throw new LogicException("Invalid replyTo property");
×
156
        }
157
        if ($data instanceof Prototype) {
64✔
158
            $this->client->connection->sendMessage($data);
52✔
159
        } else {
160
            $this->client->publish($this->replyTo, $data);
12✔
161
        }
162
    }
163

164
    public function setClient($client): void
264✔
165
    {
166
        $this->client = $client;
264✔
167
    }
168

UNCOV
169
    public function __toString(): string
×
170
    {
UNCOV
171
        return $this->payload->body;
×
172
    }
173

174
    private static function tryParseMessageTime(array $values): array
268✔
175
    {
176
        if (
177
            !array_key_exists('replyTo', $values) || !str_starts_with($values['replyTo'], '$JS.ACK')
268✔
178
        ) {
179
            # This is not a JetStream message
180
            return $values;
268✔
181
        }
182

183
        # old format
184
        # "$JS.ACK.<stream>.<consumer>.<redeliveryCount><streamSeq><deliverySequence>.<timestamp>.<pending>"
185
        # new format
186
        # $JS.ACK.<domain>.<accounthash>.<stream>.<consumer>.<redeliveryCount>.<streamSeq>.<deliverySequence>.<timestamp>.<pending>.<random>
187
        $tokens = explode('.', $values['replyTo']);
56✔
188
        if (count($tokens) === 9) {
56✔
189
            # if it is an old format we will add two missing items to process tokens in the same way
190
            array_splice($tokens, 2, 0, ['', '']);
56✔
191
        }
192

193
        if (count($tokens) < 11) {
56✔
194
            # Looks like invalid format was given
UNCOV
195
            return $values;
×
196
        }
197

198
        $values['timestampNanos'] = (int) $tokens[9];
56✔
199

200
        return $values;
56✔
201
    }
202
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc