• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.js / 14248085491

03 Apr 2025 04:30PM UTC coverage: 84.536% (+0.002%) from 84.534%
14248085491

push

github

web-flow
Additional hardening of the object store client (#256)

* Additional hardening of the object store client

- made obj put publish messages sequentially - as current server can drop requests when the producer is too fast. This will degrade put performance.

- changed the internal push consumer used on get to have an idle_heartbeat; if it fires, the get has stalled and the ordered consumer should reset.

- enabled flow control on the ordered consumer; this prevents slow consumers when the client is getting very large objects.

Signed-off-by: Alberto Ricart <alberto@synadia.com>

* - hardening of the PullConsumer implementation, by default ordered consumers set flow control and idle_heartbeats. This enables the ordered consumer to self-recreate on `heartbeats_missed`, simplifying watcher/lister configurations.

- objectstore doesn't require listening to heartbeats_missed, since the pull consumer handles it automagically.

- objectstore watch added a check in watch for `heartbeat` notifications - if the watcher is only listing history this is a hint to stop as all records have been processed.

Signed-off-by: Alberto Ricart <alberto@synadia.com>

---------

Signed-off-by: Alberto Ricart <alberto@synadia.com>

2370 of 3217 branches covered (73.67%)

Branch coverage included in aggregate %.

10 of 23 new or added lines in 3 files covered. (43.48%)

4552 existing lines in 36 files now uncovered.

10170 of 11617 relevant lines covered (87.54%)

768618.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.86
/core/src/parser.ts
1
// deno-lint-ignore-file no-undef
2
/*
3
 * Copyright 2020-2021 The NATS Authors
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 * http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
import { DenoBuffer } from "./denobuffer.ts";
55✔
17
import { TD } from "./encoders.ts";
55✔
18
import type { Dispatcher } from "./core.ts";
19

20
/**
 * Numeric tags identifying the kind of event the parser emits.
 * One tag exists per protocol operation the parser recognizes.
 */
export const Kind = {
  OK: 0,
  ERR: 1,
  MSG: 2,
  INFO: 3,
  PING: 4,
  PONG: 5,
} as const;

/** Union of the numeric values held by the `Kind` object. */
export type Kind = (typeof Kind)[keyof typeof Kind];
30

31
/**
 * Event pushed to the parser's dispatcher for every complete protocol
 * operation found in the input.
 */
export interface ParserEvent {
  /** Which operation was parsed. */
  kind: Kind;
  /** Parsed message arguments; the parser sets this for `Kind.MSG` events. */
  msg?: MsgArg;
  /**
   * Raw bytes attached to the event: the payload for `Kind.MSG`, the
   * argument text for `Kind.ERR` and `Kind.INFO`. The parser does not set
   * it for `Kind.OK`, `Kind.PING` or `Kind.PONG`.
   */
  data?: Uint8Array;
}
36

37
/**
 * Builds a short human-readable description of a parser event in the form
 * `"<KIND>: <text>"`.
 *
 * The text part is the decoded `data` for `ERR` and `INFO` events and is
 * empty for every other kind.
 *
 * @param e the event to describe
 * @returns the formatted description
 */
export function describe(e: ParserEvent): string {
  const names = new Map<Kind, string>([
    [Kind.MSG, "MSG"],
    [Kind.OK, "OK"],
    [Kind.ERR, "ERR"],
    [Kind.PING, "PING"],
    [Kind.PONG, "PONG"],
    [Kind.INFO, "INFO"],
  ]);

  const name = names.get(e.kind);
  // only ERR and INFO carry textual data worth rendering
  const carriesText = e.kind === Kind.ERR || e.kind === Kind.INFO;
  const text = carriesText ? TD.decode(e.data) : "";

  return `${name}: ${text}`;
}
×
64

65
/**
 * Arguments parsed from the control line of a message operation.
 */
export interface MsgArg {
  /** Subject bytes (first token of the control line). */
  subject: Uint8Array;
  /** Reply subject bytes, when the control line carries one. */
  reply?: Uint8Array;
  /** Subscription id; -1 until parsed or when it is not a valid number. */
  sid: number;
  /** Header size; stays -1 for messages without headers. */
  hdr: number;
  /** Total payload size in bytes; -1 until parsed or when invalid. */
  size: number;
}

/**
 * Creates a MsgArg whose numeric fields are set to the -1 "not parsed"
 * sentinel. `subject` and `reply` are left unset until the control line
 * has been processed.
 */
function newMsgArg(): MsgArg {
  return { sid: -1, hdr: -1, size: -1 } as MsgArg;
}
396,860✔
81

82
// ASCII codes of the characters "0" and "9"; they bound the decimal digits.
const ASCII_0 = 0x30;
const ASCII_9 = 0x39;
55✔
84

85
// This is an almost verbatim port of the Go NATS parser
86
// https://github.com/nats-io/nats.go/blob/master/parser.go
87
export class Parser {
55✔
UNCOV
88
  /** Receives every event produced by `parse()`. */
  dispatcher: Dispatcher<ParserEvent>;
  /** Current state of the protocol state machine. */
  state: State;
  /** Index in the buffer being parsed where the current argument/payload starts. */
  as: number;
  /** Number of trailing bytes (the CR) to leave out of the current argument. */
  drop: number;
  /** -1 while parsing an `M…` operation, 0 while parsing an `H…` operation. */
  hdr: number;
  /** Arguments of the message currently being parsed. */
  ma!: MsgArg;
  /** Holds a control-line argument that spans more than one `parse()` call. */
  argBuf?: DenoBuffer;
  /** Holds a message payload that spans more than one `parse()` call. */
  msgBuf?: DenoBuffer;

  /**
   * Creates a parser positioned at the start of an operation.
   *
   * @param dispatcher sink for the events found while parsing
   */
  constructor(dispatcher: Dispatcher<ParserEvent>) {
    this.dispatcher = dispatcher;
    this.state = State.OP_START;
    this.as = 0;
    this.drop = 0;
    this.hdr = 0;
  }
1,626✔
104

UNCOV
105
  /**
   * Runs a chunk of protocol bytes through the state machine and pushes a
   * `ParserEvent` to the dispatcher for every complete operation found
   * (+OK, -ERR, INFO, PING, PONG and MSG/HMSG).
   *
   * A chunk may end anywhere inside an operation. When it ends inside a
   * control-line argument or a payload, the bytes seen so far are saved in
   * `argBuf` / `msgBuf` and parsing resumes on the next call.
   *
   * @param buf the bytes to parse
   * @throws Error (built by `fail()`) when a byte is not valid for the
   *   current state, or when message arguments cannot be processed
   */
  parse(buf: Uint8Array): void {
    let i: number;
    for (i = 0; i < buf.length; i++) {
      const b = buf[i];
      switch (this.state) {
        case State.OP_START:
          switch (b) {
            case cc.M:
            case cc.m:
              this.state = State.OP_M;
              // -1 marks a message without headers
              this.hdr = -1;
              this.ma = newMsgArg();
              break;
            case cc.H:
            case cc.h:
              this.state = State.OP_H;
              // >= 0 routes argument processing to the header variant
              this.hdr = 0;
              this.ma = newMsgArg();
              break;
            case cc.P:
            case cc.p:
              this.state = State.OP_P;
              break;
            case cc.PLUS:
              this.state = State.OP_PLUS;
              break;
            case cc.MINUS:
              this.state = State.OP_MINUS;
              break;
            case cc.I:
            case cc.i:
              this.state = State.OP_I;
              break;
            default:
              throw this.fail(buf.subarray(i));
          }
          break;
        case State.OP_H:
          switch (b) {
            case cc.M:
            case cc.m:
              this.state = State.OP_M;
              break;
            default:
              throw this.fail(buf.subarray(i));
          }
          break;
        case State.OP_M:
          switch (b) {
            case cc.S:
            case cc.s:
              this.state = State.OP_MS;
              break;
            default:
              throw this.fail(buf.subarray(i));
          }
          break;
        case State.OP_MS:
          switch (b) {
            case cc.G:
            case cc.g:
              this.state = State.OP_MSG;
              break;
            default:
              throw this.fail(buf.subarray(i));
          }
          break;
        case State.OP_MSG:
          switch (b) {
            case cc.SPACE:
            case cc.TAB:
              this.state = State.OP_MSG_SPC;
              break;
            default:
              throw this.fail(buf.subarray(i));
          }
          break;
        case State.OP_MSG_SPC:
          switch (b) {
            case cc.SPACE:
            case cc.TAB:
              // skip any additional separators
              continue;
            default:
              this.state = State.MSG_ARG;
              this.as = i;
          }
          break;
        case State.MSG_ARG:
          switch (b) {
            case cc.CR:
              // the CR is not part of the argument
              this.drop = 1;
              break;
            case cc.NL: {
              // argBuf is only set when the control line was split across chunks
              const arg: Uint8Array = this.argBuf
                ? this.argBuf.bytes()
                : buf.subarray(this.as, i - this.drop);
              this.processMsgArgs(arg);
              this.drop = 0;
              this.as = i + 1;
              this.state = State.MSG_PAYLOAD;

              // jump ahead with the index. If this overruns
              // what is left we fall out and process a split buffer.
              i = this.as + this.ma.size - 1;
              break;
            }
            default:
              if (this.argBuf) {
                this.argBuf.writeByte(b);
              }
          }
          break;
        case State.MSG_PAYLOAD:
          if (this.msgBuf) {
            // the payload started in an earlier chunk
            if (this.msgBuf.length >= this.ma.size) {
              const data = this.msgBuf.bytes({ copy: false });
              this.dispatcher.push(
                { kind: Kind.MSG, msg: this.ma, data: data },
              );
              this.argBuf = undefined;
              this.msgBuf = undefined;
              this.state = State.MSG_END;
            } else {
              // copy as much of the missing payload as this chunk holds
              let toCopy = this.ma.size - this.msgBuf.length;
              const avail = buf.length - i;

              if (avail < toCopy) {
                toCopy = avail;
              }

              if (toCopy > 0) {
                this.msgBuf.write(buf.subarray(i, i + toCopy));
                // -1 because the loop increment moves past the last copied byte
                i = (i + toCopy) - 1;
              } else {
                this.msgBuf.writeByte(b);
              }
            }
          } else if (i - this.as >= this.ma.size) {
            // the whole payload is inside this chunk
            this.dispatcher.push(
              { kind: Kind.MSG, msg: this.ma, data: buf.subarray(this.as, i) },
            );
            this.argBuf = undefined;
            this.msgBuf = undefined;
            this.state = State.MSG_END;
          }
          break;
        case State.MSG_END:
          switch (b) {
            case cc.NL:
              this.drop = 0;
              this.as = i + 1;
              this.state = State.OP_START;
              break;
            default:
              // ignore everything up to the terminating NL
              continue;
          }
          break;
        case State.OP_PLUS:
          switch (b) {
            case cc.O:
            case cc.o:
              this.state = State.OP_PLUS_O;
              break;
            default:
              throw this.fail(buf.subarray(i));
          }
          break;
        case State.OP_PLUS_O:
          switch (b) {
            case cc.K:
            case cc.k:
              this.state = State.OP_PLUS_OK;
              break;
            default:
              throw this.fail(buf.subarray(i));
          }
          break;
        case State.OP_PLUS_OK:
          switch (b) {
            case cc.NL:
              this.dispatcher.push({ kind: Kind.OK });
              this.drop = 0;
              this.state = State.OP_START;
              break;
          }
          break;
        case State.OP_MINUS:
          switch (b) {
            case cc.E:
            case cc.e:
              this.state = State.OP_MINUS_E;
              break;
            default:
              throw this.fail(buf.subarray(i));
          }
          break;
        case State.OP_MINUS_E:
          switch (b) {
            case cc.R:
            case cc.r:
              this.state = State.OP_MINUS_ER;
              break;
            default:
              throw this.fail(buf.subarray(i));
          }
          break;
        case State.OP_MINUS_ER:
          switch (b) {
            case cc.R:
            case cc.r:
              this.state = State.OP_MINUS_ERR;
              break;
            default:
              throw this.fail(buf.subarray(i));
          }
          break;
        case State.OP_MINUS_ERR:
          switch (b) {
            case cc.SPACE:
            case cc.TAB:
              this.state = State.OP_MINUS_ERR_SPC;
              break;
            default:
              throw this.fail(buf.subarray(i));
          }
          break;
        case State.OP_MINUS_ERR_SPC:
          switch (b) {
            case cc.SPACE:
            case cc.TAB:
              continue;
            default:
              this.state = State.MINUS_ERR_ARG;
              this.as = i;
          }
          break;
        case State.MINUS_ERR_ARG:
          switch (b) {
            case cc.CR:
              this.drop = 1;
              break;
            case cc.NL: {
              let arg: Uint8Array;
              if (this.argBuf) {
                arg = this.argBuf.bytes();
                this.argBuf = undefined;
              } else {
                arg = buf.subarray(this.as, i - this.drop);
              }
              this.dispatcher.push({ kind: Kind.ERR, data: arg });
              this.drop = 0;
              this.as = i + 1;
              this.state = State.OP_START;
              break;
            }
            default:
              if (this.argBuf) {
                // same single-byte append as the MSG_ARG and INFO_ARG states
                this.argBuf.writeByte(b);
              }
          }
          break;
        case State.OP_P:
          switch (b) {
            case cc.I:
            case cc.i:
              this.state = State.OP_PI;
              break;
            case cc.O:
            case cc.o:
              this.state = State.OP_PO;
              break;
            default:
              throw this.fail(buf.subarray(i));
          }
          break;
        case State.OP_PO:
          switch (b) {
            case cc.N:
            case cc.n:
              this.state = State.OP_PON;
              break;
            default:
              throw this.fail(buf.subarray(i));
          }
          break;
        case State.OP_PON:
          switch (b) {
            case cc.G:
            case cc.g:
              this.state = State.OP_PONG;
              break;
            default:
              throw this.fail(buf.subarray(i));
          }
          break;
        case State.OP_PONG:
          switch (b) {
            case cc.NL:
              this.dispatcher.push({ kind: Kind.PONG });
              this.drop = 0;
              this.state = State.OP_START;
              break;
          }
          break;
        case State.OP_PI:
          switch (b) {
            case cc.N:
            case cc.n:
              this.state = State.OP_PIN;
              break;
            default:
              throw this.fail(buf.subarray(i));
          }
          break;
        case State.OP_PIN:
          switch (b) {
            case cc.G:
            case cc.g:
              this.state = State.OP_PING;
              break;
            default:
              throw this.fail(buf.subarray(i));
          }
          break;
        case State.OP_PING:
          switch (b) {
            case cc.NL:
              this.dispatcher.push({ kind: Kind.PING });
              this.drop = 0;
              this.state = State.OP_START;
              break;
          }
          break;
        case State.OP_I:
          switch (b) {
            case cc.N:
            case cc.n:
              this.state = State.OP_IN;
              break;
            default:
              throw this.fail(buf.subarray(i));
          }
          break;
        case State.OP_IN:
          switch (b) {
            case cc.F:
            case cc.f:
              this.state = State.OP_INF;
              break;
            default:
              throw this.fail(buf.subarray(i));
          }
          break;
        case State.OP_INF:
          switch (b) {
            case cc.O:
            case cc.o:
              this.state = State.OP_INFO;
              break;
            default:
              throw this.fail(buf.subarray(i));
          }
          break;
        case State.OP_INFO:
          switch (b) {
            case cc.SPACE:
            case cc.TAB:
              this.state = State.OP_INFO_SPC;
              break;
            default:
              throw this.fail(buf.subarray(i));
          }
          break;
        case State.OP_INFO_SPC:
          switch (b) {
            case cc.SPACE:
            case cc.TAB:
              continue;
            default:
              this.state = State.INFO_ARG;
              this.as = i;
          }
          break;
        case State.INFO_ARG:
          switch (b) {
            case cc.CR:
              this.drop = 1;
              break;
            case cc.NL: {
              let arg: Uint8Array;
              if (this.argBuf) {
                arg = this.argBuf.bytes();
                this.argBuf = undefined;
              } else {
                arg = buf.subarray(this.as, i - this.drop);
              }
              this.dispatcher.push({ kind: Kind.INFO, data: arg });
              this.drop = 0;
              this.as = i + 1;
              this.state = State.OP_START;
              break;
            }
            default:
              if (this.argBuf) {
                this.argBuf.writeByte(b);
              }
          }
          break;
        default:
          throw this.fail(buf.subarray(i));
      }
    }

    // The chunk ended inside a control-line argument: keep what was seen so
    // far (minus a trailing CR) so the next call can append to it.
    if (
      (this.state === State.MSG_ARG || this.state === State.MINUS_ERR_ARG ||
        this.state === State.INFO_ARG) && !this.argBuf
    ) {
      this.argBuf = new DenoBuffer(buf.subarray(this.as, i - this.drop));
    }

    // The chunk ended inside a payload: keep the partial payload, and make
    // sure the message arguments no longer point into `buf`.
    if (this.state === State.MSG_PAYLOAD && !this.msgBuf) {
      if (!this.argBuf) {
        this.cloneMsgArg();
      }
      this.msgBuf = new DenoBuffer(buf.subarray(this.as));
    }
  }
21,651✔
532

UNCOV
533
  /**
   * Copies the subject and reply of the current message arguments into a
   * freshly allocated array and re-points `this.ma` at the copies, so they
   * no longer reference the buffer they were sliced from.
   *
   * Also sets `this.argBuf`. `parse()` only calls this method while
   * `argBuf` is unset.
   */
  cloneMsgArg() {
    const { subject, reply } = this.ma;
    const subjectLen = subject.length;
    const replyLen = reply ? reply.length : 0;

    // subject and reply are stored back to back in one allocation
    const copy = new Uint8Array(subjectLen + replyLen);
    copy.set(subject);
    if (reply) {
      copy.set(reply, subjectLen);
    }

    this.argBuf = new DenoBuffer(copy);
    this.ma.subject = copy.subarray(0, subjectLen);
    if (reply) {
      this.ma.reply = copy.subarray(subjectLen);
    }
  }
1,214✔
547

UNCOV
548
  /**
   * Splits a message control line into its whitespace-separated fields and
   * stores them in `this.ma`. Accepted layouts are
   * `<subject> <sid> <size>` and `<subject> <sid> <reply> <size>`.
   *
   * When `this.hdr >= 0` the work is delegated to `processHeaderMsgArgs()`.
   *
   * @param arg the control-line bytes following the operation name
   * @throws Error when the field count is wrong, or the sid or size is not
   *   a non-negative decimal number
   */
  processMsgArgs(arg: Uint8Array): void {
    if (this.hdr >= 0) {
      return this.processHeaderMsgArgs(arg);
    }

    const tokens: Uint8Array[] = [];
    let tokenStart = -1;
    for (let pos = 0; pos < arg.length; pos++) {
      const ch = arg[pos];
      const isSeparator = ch === cc.SPACE || ch === cc.TAB ||
        ch === cc.CR || ch === cc.NL;

      if (!isSeparator) {
        // first byte of a new token
        if (tokenStart < 0) {
          tokenStart = pos;
        }
        continue;
      }
      if (tokenStart >= 0) {
        tokens.push(arg.subarray(tokenStart, pos));
        tokenStart = -1;
      }
    }
    // a token that runs to the end of the line
    if (tokenStart >= 0) {
      tokens.push(arg.subarray(tokenStart));
    }

    const count = tokens.length;
    if (count !== 3 && count !== 4) {
      throw this.fail(arg, "processMsgArgs Parse Error");
    }

    const hasReply = count === 4;
    this.ma.subject = tokens[0];
    this.ma.sid = this.protoParseInt(tokens[1]);
    this.ma.reply = hasReply ? tokens[2] : undefined;
    this.ma.size = this.protoParseInt(hasReply ? tokens[3] : tokens[2]);

    if (this.ma.sid < 0) {
      throw this.fail(arg, "processMsgArgs Bad or Missing Sid Error");
    }
    if (this.ma.size < 0) {
      throw this.fail(arg, "processMsgArgs Bad or Missing Size Error");
    }
  }
396,854✔
601

602
  /**
   * Builds (but does not throw) the error describing a parse failure.
   *
   * @param data the bytes to include, decoded, in the message
   * @param label optional prefix; `"parse error"` is used when empty
   * @returns an Error whose message is `"<label> [<state>]: <decoded data>"`
   */
  fail(data: Uint8Array, label = ""): Error {
    const prefix = label
      ? `${label} [${this.state}]`
      : `parse error [${this.state}]`;

    return new Error(`${prefix}: ${TD.decode(data)}`);
  }
93✔
611

UNCOV
612
  /**
   * Splits the control line of a message with headers into its
   * whitespace-separated fields and stores them in `this.ma`. Accepted
   * layouts are `<subject> <sid> <hdr> <size>` and
   * `<subject> <sid> <reply> <hdr> <size>`.
   *
   * @param arg the control-line bytes following the operation name
   * @throws Error when the field count is wrong, when sid, header size or
   *   total size is not a non-negative decimal number, or when the header
   *   size exceeds the total size
   */
  processHeaderMsgArgs(arg: Uint8Array): void {
    const args: Uint8Array[] = [];
    let start = -1;
    for (let i = 0; i < arg.length; i++) {
      const b = arg[i];
      switch (b) {
        case cc.SPACE:
        case cc.TAB:
        case cc.CR:
        case cc.NL:
          // a separator closes the token in progress, if any
          if (start >= 0) {
            args.push(arg.subarray(start, i));
            start = -1;
          }
          break;
        default:
          if (start < 0) {
            start = i;
          }
      }
    }
    // a token that runs to the end of the line
    if (start >= 0) {
      args.push(arg.subarray(start));
    }

    switch (args.length) {
      case 4:
        this.ma.subject = args[0];
        this.ma.sid = this.protoParseInt(args[1]);
        this.ma.reply = undefined;
        this.ma.hdr = this.protoParseInt(args[2]);
        this.ma.size = this.protoParseInt(args[3]);
        break;
      case 5:
        this.ma.subject = args[0];
        this.ma.sid = this.protoParseInt(args[1]);
        this.ma.reply = args[2];
        this.ma.hdr = this.protoParseInt(args[3]);
        this.ma.size = this.protoParseInt(args[4]);
        break;
      default:
        throw this.fail(arg, "processHeaderMsgArgs Parse Error");
    }

    if (this.ma.sid < 0) {
      throw this.fail(arg, "processHeaderMsgArgs Bad or Missing Sid Error");
    }
    if (this.ma.hdr < 0) {
      throw this.fail(
        arg,
        "processHeaderMsgArgs Bad or Missing Header Size Error",
      );
    }
    // Validate the total size before comparing it with the header size.
    // Otherwise an invalid size (-1) always trips the `hdr > size` test and
    // is reported as a header-size problem.
    if (this.ma.size < 0) {
      throw this.fail(arg, "processHeaderMsgArgs Bad or Missing Size Error");
    }
    if (this.ma.hdr > this.ma.size) {
      throw this.fail(
        arg,
        "processHeaderMsgArgs Bad or Missing Header Size Error",
      );
    }
  }
1,001✔
669

UNCOV
670
  /**
   * Parses an unsigned decimal number from ASCII digit bytes.
   *
   * @param a the bytes to parse
   * @returns the parsed value, or -1 when `a` is empty or contains a byte
   *   that is not an ASCII digit
   */
  protoParseInt(a: Uint8Array): number {
    if (a.length === 0) {
      return -1;
    }

    let value = 0;
    for (const code of a) {
      const isDigit = code >= ASCII_0 && code <= ASCII_9;
      if (!isDigit) {
        return -1;
      }
      value = value * 10 + (code - ASCII_0);
    }
    return value;
  }
794,585✔
683
}
55✔
684

685
/**
 * States of the parser's state machine. The `OP_*` states track progress
 * through an operation name one character at a time. The `*_ARG` states
 * collect the rest of a control line. `MSG_PAYLOAD` and `MSG_END` cover a
 * message body and its terminator.
 */
export const State = {
  OP_START: 0,
  OP_PLUS: 1,
  OP_PLUS_O: 2,
  OP_PLUS_OK: 3,
  OP_MINUS: 4,
  OP_MINUS_E: 5,
  OP_MINUS_ER: 6,
  OP_MINUS_ERR: 7,
  OP_MINUS_ERR_SPC: 8,
  MINUS_ERR_ARG: 9,
  OP_M: 10,
  OP_MS: 11,
  OP_MSG: 12,
  OP_MSG_SPC: 13,
  MSG_ARG: 14,
  MSG_PAYLOAD: 15,
  MSG_END: 16,
  OP_H: 17,
  OP_P: 18,
  OP_PI: 19,
  OP_PIN: 20,
  OP_PING: 21,
  OP_PO: 22,
  OP_PON: 23,
  OP_PONG: 24,
  OP_I: 25,
  OP_IN: 26,
  OP_INF: 27,
  OP_INFO: 28,
  OP_INFO_SPC: 29,
  INFO_ARG: 30,
} as const;

/** Union of the numeric values held by the `State` object. */
export type State = (typeof State)[keyof typeof State];
720

721
/** Returns the UTF-16 code of the first character of `ch`. */
const codeOf = (ch: string): number => ch.charCodeAt(0);

/**
 * Character codes the parser compares input bytes against: the letters
 * used by the protocol operation names (both cases) plus the separator and
 * line-ending characters.
 */
export const cc = {
  CR: codeOf("\r"),
  E: codeOf("E"),
  e: codeOf("e"),
  F: codeOf("F"),
  f: codeOf("f"),
  G: codeOf("G"),
  g: codeOf("g"),
  H: codeOf("H"),
  h: codeOf("h"),
  I: codeOf("I"),
  i: codeOf("i"),
  K: codeOf("K"),
  k: codeOf("k"),
  M: codeOf("M"),
  m: codeOf("m"),
  MINUS: codeOf("-"),
  N: codeOf("N"),
  n: codeOf("n"),
  NL: codeOf("\n"),
  O: codeOf("O"),
  o: codeOf("o"),
  P: codeOf("P"),
  p: codeOf("p"),
  PLUS: codeOf("+"),
  R: codeOf("R"),
  r: codeOf("r"),
  S: codeOf("S"),
  s: codeOf("s"),
  SPACE: codeOf(" "),
  TAB: codeOf("\t"),
} as const;

/** Union of the value types held by the `cc` object. */
export type cc = (typeof cc)[keyof typeof cc];
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc