• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.js / 13159645374

05 Feb 2025 02:30PM UTC coverage: 82.694% (+13.1%) from 69.574%
13159645374

push

github

web-flow
Add heartbeat handling to key iteration (#203)

* Add heartbeat detection to listing keys and history. If the stream is purged while the client is making progress, the client could stall, because it relies on numpending to signal the end of processing. Receiving a heartbeat means the server had nothing to send in the last 5s, which provides a hint that we can use to signal that the operation is done.

Introduce a heartbeat case to handle key iteration status updates. This ensures proper key fetching and stops the iteration appropriately upon receiving a heartbeat.

* history for kv has the same issue - if values are purged in flight, the iteration may hang.

Signed-off-by: Alberto Ricart <alberto@synadia.com>

---------

Signed-off-by: Alberto Ricart <alberto@synadia.com>

2259 of 3072 branches covered (73.54%)

Branch coverage included in aggregate %.

4 of 22 new or added lines in 1 file covered. (18.18%)

2912 existing lines in 30 files now uncovered.

9634 of 11310 relevant lines covered (85.18%)

788186.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.07
/core/src/parser.ts
1
// deno-lint-ignore-file no-undef
2
/*
3
 * Copyright 2020-2021 The NATS Authors
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 * http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
import { DenoBuffer } from "./denobuffer.ts";
55✔
17
import { TD } from "./encoders.ts";
55✔
18
import type { Dispatcher } from "./core.ts";
19

20
/**
 * Kinds of protocol events the parser pushes to its dispatcher.
 *
 * The numeric values are spelled out so the mapping is visible at a
 * glance; they match the values the implicit numbering would assign.
 */
export enum Kind {
  OK = 0,
  ERR = 1,
  MSG = 2,
  INFO = 3,
  PING = 4,
  PONG = 5,
}
28

29
/**
 * A single event produced by the parser.
 */
export interface ParserEvent {
  /** Which protocol operation this event represents. */
  kind: Kind;
  /** Parsed message arguments; the parser sets this on `Kind.MSG` events. */
  msg?: MsgArg;
  /**
   * Raw bytes for the event: the payload for `Kind.MSG`, the argument
   * text for `Kind.ERR` and `Kind.INFO`.
   */
  data?: Uint8Array;
}
34

35
/**
 * Renders a parser event as `"<KIND>: <data>"` for debugging.
 *
 * The label is the name of the `Kind` member, looked up through the
 * numeric enum's reverse mapping. Only `ERR` and `INFO` events have
 * their `data` decoded with `TD`; every other kind is rendered with an
 * empty data part.
 *
 * @param e the event to describe
 * @returns the formatted description
 */
export function describe(e: ParserEvent): string {
  const carriesText = e.kind === Kind.ERR || e.kind === Kind.INFO;
  const text = carriesText ? TD.decode(e.data) : "";
  // Kind[...] yields undefined for an unknown kind, which the template
  // renders as "undefined".
  const label: string = Kind[e.kind];
  return `${label}: ${text}`;
}
×
62

63
/**
 * Arguments parsed from the control line of a MSG / HMSG operation.
 */
export interface MsgArg {
  /** Subject bytes. */
  subject: Uint8Array;
  /** Reply subject bytes, when the control line carried one. */
  reply?: Uint8Array;
  /** Subscription id; -1 until parsed. */
  sid: number;
  /** Header size; -1 until parsed (only header messages set it). */
  hdr: number;
  /** Total payload size in bytes; -1 until parsed. */
  size: number;
}
70

71
/**
 * Creates a blank `MsgArg` whose numeric fields are all -1, meaning
 * "not parsed yet". `subject` is left unset until the control line is
 * processed.
 */
function newMsgArg(): MsgArg {
  return { sid: -1, hdr: -1, size: -1 } as MsgArg;
}
395,874✔
79

80
// Byte values of the ASCII digits '0' and '9'; they bound the range
// accepted when parsing protocol integers.
const ASCII_0 = 0x30;
const ASCII_9 = 0x39;
55✔
82

83
/**
 * Splits a protocol argument line into tokens separated by runs of
 * SPACE, TAB, CR or NL. Tokens are subarray views over `arg`; nothing
 * is copied.
 */
function splitProtoArgs(arg: Uint8Array): Uint8Array[] {
  const tokens: Uint8Array[] = [];
  let start = -1;
  for (let i = 0; i < arg.length; i++) {
    switch (arg[i]) {
      case cc.SPACE:
      case cc.TAB:
      case cc.CR:
      case cc.NL:
        if (start >= 0) {
          tokens.push(arg.subarray(start, i));
          start = -1;
        }
        break;
      default:
        if (start < 0) {
          start = i;
        }
    }
  }
  if (start >= 0) {
    tokens.push(arg.subarray(start));
  }
  return tokens;
}

/**
 * Incremental, byte-at-a-time parser for the NATS wire protocol.
 *
 * This is an almost verbatim port of the Go NATS parser
 * https://github.com/nats-io/nats.go/blob/master/parser.go
 *
 * Feed network chunks to {@link Parser.parse}; each complete operation
 * is pushed to the dispatcher as a `ParserEvent`. Operations may be
 * split across chunks: partial argument lines are carried over in
 * `argBuf` and partial payloads in `msgBuf`.
 */
export class Parser {
  /** Sink that receives every parsed event. */
  dispatcher: Dispatcher<ParserEvent>;
  /** Current state of the state machine. */
  state: State;
  /** Index in the current chunk where the current argument/payload starts. */
  as: number;
  /** Number of trailing bytes (a CR) to exclude from the argument slice. */
  drop: number;
  /** -1 while parsing MSG, 0 while parsing HMSG; selects the arg parser. */
  hdr: number;
  /** Arguments of the message currently being parsed. */
  ma!: MsgArg;
  /** Carries an argument line (or cloned subject/reply) across chunks. */
  argBuf?: DenoBuffer;
  /** Accumulates a payload that spans several chunks. */
  msgBuf?: DenoBuffer;

  /**
   * @param dispatcher receiver of the parsed events
   */
  constructor(dispatcher: Dispatcher<ParserEvent>) {
    this.dispatcher = dispatcher;
    this.state = State.OP_START;
    this.as = 0;
    this.drop = 0;
    this.hdr = 0;
  }

  /**
   * Consumes one chunk of protocol bytes, pushing an event for every
   * operation completed within it. State is retained between calls so
   * an operation may continue in the next chunk.
   *
   * @param buf the bytes to parse
   * @throws Error (built by {@link Parser.fail}) on a byte that is not
   *   valid in the current state or on a malformed MSG/HMSG control line
   */
  parse(buf: Uint8Array): void {
    let i: number;
    for (i = 0; i < buf.length; i++) {
      const b = buf[i];
      switch (this.state) {
        case State.OP_START:
          switch (b) {
            case cc.M:
            case cc.m:
              this.state = State.OP_M;
              this.hdr = -1;
              this.ma = newMsgArg();
              break;
            case cc.H:
            case cc.h:
              this.state = State.OP_H;
              this.hdr = 0;
              this.ma = newMsgArg();
              break;
            case cc.P:
            case cc.p:
              this.state = State.OP_P;
              break;
            case cc.PLUS:
              this.state = State.OP_PLUS;
              break;
            case cc.MINUS:
              this.state = State.OP_MINUS;
              break;
            case cc.I:
            case cc.i:
              this.state = State.OP_I;
              break;
            default:
              throw this.fail(buf.subarray(i));
          }
          break;
        case State.OP_H:
          switch (b) {
            case cc.M:
            case cc.m:
              this.state = State.OP_M;
              break;
            default:
              throw this.fail(buf.subarray(i));
          }
          break;
        case State.OP_M:
          switch (b) {
            case cc.S:
            case cc.s:
              this.state = State.OP_MS;
              break;
            default:
              throw this.fail(buf.subarray(i));
          }
          break;
        case State.OP_MS:
          switch (b) {
            case cc.G:
            case cc.g:
              this.state = State.OP_MSG;
              break;
            default:
              throw this.fail(buf.subarray(i));
          }
          break;
        case State.OP_MSG:
          switch (b) {
            case cc.SPACE:
            case cc.TAB:
              this.state = State.OP_MSG_SPC;
              break;
            default:
              throw this.fail(buf.subarray(i));
          }
          break;
        case State.OP_MSG_SPC:
          switch (b) {
            case cc.SPACE:
            case cc.TAB:
              // skip extra separators before the arguments
              continue;
            default:
              this.state = State.MSG_ARG;
              this.as = i;
          }
          break;
        case State.MSG_ARG:
          switch (b) {
            case cc.CR:
              this.drop = 1;
              break;
            case cc.NL: {
              // A carried-over argBuf holds the whole line when the
              // control line was split across chunks.
              const arg: Uint8Array = this.argBuf
                ? this.argBuf.bytes()
                : buf.subarray(this.as, i - this.drop);
              this.processMsgArgs(arg);
              this.drop = 0;
              this.as = i + 1;
              this.state = State.MSG_PAYLOAD;

              // jump ahead with the index. If this overruns
              // what is left we fall out and process a split buffer.
              i = this.as + this.ma.size - 1;
              break;
            }
            default:
              if (this.argBuf) {
                this.argBuf.writeByte(b);
              }
          }
          break;
        case State.MSG_PAYLOAD:
          if (this.msgBuf) {
            if (this.msgBuf.length >= this.ma.size) {
              const data = this.msgBuf.bytes({ copy: false });
              this.dispatcher.push(
                { kind: Kind.MSG, msg: this.ma, data: data },
              );
              this.argBuf = undefined;
              this.msgBuf = undefined;
              this.state = State.MSG_END;
            } else {
              // Both operands are >= 1 here: the payload is still
              // incomplete and the loop guarantees i < buf.length.
              const toCopy = Math.min(
                this.ma.size - this.msgBuf.length,
                buf.length - i,
              );
              this.msgBuf.write(buf.subarray(i, i + toCopy));
              // land on the last copied byte; the loop's i++ moves past it
              i = (i + toCopy) - 1;
            }
          } else if (i - this.as >= this.ma.size) {
            this.dispatcher.push(
              { kind: Kind.MSG, msg: this.ma, data: buf.subarray(this.as, i) },
            );
            this.argBuf = undefined;
            this.msgBuf = undefined;
            this.state = State.MSG_END;
          }
          break;
        case State.MSG_END:
          switch (b) {
            case cc.NL:
              this.drop = 0;
              this.as = i + 1;
              this.state = State.OP_START;
              break;
            default:
              // anything before the terminating NL is ignored
              continue;
          }
          break;
        case State.OP_PLUS:
          switch (b) {
            case cc.O:
            case cc.o:
              this.state = State.OP_PLUS_O;
              break;
            default:
              throw this.fail(buf.subarray(i));
          }
          break;
        case State.OP_PLUS_O:
          switch (b) {
            case cc.K:
            case cc.k:
              this.state = State.OP_PLUS_OK;
              break;
            default:
              throw this.fail(buf.subarray(i));
          }
          break;
        case State.OP_PLUS_OK:
          // bytes other than NL are skipped in this state
          if (b === cc.NL) {
            this.dispatcher.push({ kind: Kind.OK });
            this.drop = 0;
            this.state = State.OP_START;
          }
          break;
        case State.OP_MINUS:
          switch (b) {
            case cc.E:
            case cc.e:
              this.state = State.OP_MINUS_E;
              break;
            default:
              throw this.fail(buf.subarray(i));
          }
          break;
        case State.OP_MINUS_E:
          switch (b) {
            case cc.R:
            case cc.r:
              this.state = State.OP_MINUS_ER;
              break;
            default:
              throw this.fail(buf.subarray(i));
          }
          break;
        case State.OP_MINUS_ER:
          switch (b) {
            case cc.R:
            case cc.r:
              this.state = State.OP_MINUS_ERR;
              break;
            default:
              throw this.fail(buf.subarray(i));
          }
          break;
        case State.OP_MINUS_ERR:
          switch (b) {
            case cc.SPACE:
            case cc.TAB:
              this.state = State.OP_MINUS_ERR_SPC;
              break;
            default:
              throw this.fail(buf.subarray(i));
          }
          break;
        case State.OP_MINUS_ERR_SPC:
          switch (b) {
            case cc.SPACE:
            case cc.TAB:
              continue;
            default:
              this.state = State.MINUS_ERR_ARG;
              this.as = i;
          }
          break;
        case State.MINUS_ERR_ARG:
          switch (b) {
            case cc.CR:
              this.drop = 1;
              break;
            case cc.NL: {
              let arg: Uint8Array;
              if (this.argBuf) {
                arg = this.argBuf.bytes();
                this.argBuf = undefined;
              } else {
                arg = buf.subarray(this.as, i - this.drop);
              }
              this.dispatcher.push({ kind: Kind.ERR, data: arg });
              this.drop = 0;
              this.as = i + 1;
              this.state = State.OP_START;
              break;
            }
            default:
              if (this.argBuf) {
                // writeByte, as in the other *_ARG states, avoids
                // allocating a one-element array per byte
                this.argBuf.writeByte(b);
              }
          }
          break;
        case State.OP_P:
          switch (b) {
            case cc.I:
            case cc.i:
              this.state = State.OP_PI;
              break;
            case cc.O:
            case cc.o:
              this.state = State.OP_PO;
              break;
            default:
              throw this.fail(buf.subarray(i));
          }
          break;
        case State.OP_PO:
          switch (b) {
            case cc.N:
            case cc.n:
              this.state = State.OP_PON;
              break;
            default:
              throw this.fail(buf.subarray(i));
          }
          break;
        case State.OP_PON:
          switch (b) {
            case cc.G:
            case cc.g:
              this.state = State.OP_PONG;
              break;
            default:
              throw this.fail(buf.subarray(i));
          }
          break;
        case State.OP_PONG:
          // bytes other than NL are skipped in this state
          if (b === cc.NL) {
            this.dispatcher.push({ kind: Kind.PONG });
            this.drop = 0;
            this.state = State.OP_START;
          }
          break;
        case State.OP_PI:
          switch (b) {
            case cc.N:
            case cc.n:
              this.state = State.OP_PIN;
              break;
            default:
              throw this.fail(buf.subarray(i));
          }
          break;
        case State.OP_PIN:
          switch (b) {
            case cc.G:
            case cc.g:
              this.state = State.OP_PING;
              break;
            default:
              throw this.fail(buf.subarray(i));
          }
          break;
        case State.OP_PING:
          // bytes other than NL are skipped in this state
          if (b === cc.NL) {
            this.dispatcher.push({ kind: Kind.PING });
            this.drop = 0;
            this.state = State.OP_START;
          }
          break;
        case State.OP_I:
          switch (b) {
            case cc.N:
            case cc.n:
              this.state = State.OP_IN;
              break;
            default:
              throw this.fail(buf.subarray(i));
          }
          break;
        case State.OP_IN:
          switch (b) {
            case cc.F:
            case cc.f:
              this.state = State.OP_INF;
              break;
            default:
              throw this.fail(buf.subarray(i));
          }
          break;
        case State.OP_INF:
          switch (b) {
            case cc.O:
            case cc.o:
              this.state = State.OP_INFO;
              break;
            default:
              throw this.fail(buf.subarray(i));
          }
          break;
        case State.OP_INFO:
          switch (b) {
            case cc.SPACE:
            case cc.TAB:
              this.state = State.OP_INFO_SPC;
              break;
            default:
              throw this.fail(buf.subarray(i));
          }
          break;
        case State.OP_INFO_SPC:
          switch (b) {
            case cc.SPACE:
            case cc.TAB:
              continue;
            default:
              this.state = State.INFO_ARG;
              this.as = i;
          }
          break;
        case State.INFO_ARG:
          switch (b) {
            case cc.CR:
              this.drop = 1;
              break;
            case cc.NL: {
              let arg: Uint8Array;
              if (this.argBuf) {
                arg = this.argBuf.bytes();
                this.argBuf = undefined;
              } else {
                arg = buf.subarray(this.as, i - this.drop);
              }
              this.dispatcher.push({ kind: Kind.INFO, data: arg });
              this.drop = 0;
              this.as = i + 1;
              this.state = State.OP_START;
              break;
            }
            default:
              if (this.argBuf) {
                this.argBuf.writeByte(b);
              }
          }
          break;
        default:
          throw this.fail(buf.subarray(i));
      }
    }

    // The chunk ended in the middle of an argument line: stash what we
    // have so the next chunk can append to it.
    if (
      (this.state === State.MSG_ARG || this.state === State.MINUS_ERR_ARG ||
        this.state === State.INFO_ARG) && !this.argBuf
    ) {
      this.argBuf = new DenoBuffer(buf.subarray(this.as, i - this.drop));
    }

    // The chunk ended in the middle of a payload: detach the message
    // arguments from `buf` and start accumulating the payload.
    if (this.state === State.MSG_PAYLOAD && !this.msgBuf) {
      if (!this.argBuf) {
        this.cloneMsgArg();
      }
      this.msgBuf = new DenoBuffer(buf.subarray(this.as));
    }
  }

  /**
   * Copies the current message's subject and reply into a freshly
   * allocated array and re-points `ma.subject` / `ma.reply` at it, so
   * they no longer reference the chunk passed to `parse`. Also sets
   * `argBuf` to a buffer built from that copy.
   */
  cloneMsgArg(): void {
    const s = this.ma.subject.length;
    const r = this.ma.reply ? this.ma.reply.length : 0;
    const buf = new Uint8Array(s + r);
    buf.set(this.ma.subject);
    if (this.ma.reply) {
      buf.set(this.ma.reply, s);
    }
    this.argBuf = new DenoBuffer(buf);
    this.ma.subject = buf.subarray(0, s);
    if (this.ma.reply) {
      this.ma.reply = buf.subarray(s);
    }
  }

  /**
   * Parses a MSG control line (`subject sid [reply] size`) into
   * `this.ma`. Delegates to {@link Parser.processHeaderMsgArgs} when the
   * current operation is an HMSG (`this.hdr >= 0`).
   *
   * @param arg the control line without the leading operation name
   * @throws Error when the token count is wrong or sid/size is not a
   *   non-negative integer
   */
  processMsgArgs(arg: Uint8Array): void {
    if (this.hdr >= 0) {
      return this.processHeaderMsgArgs(arg);
    }

    const args = splitProtoArgs(arg);

    switch (args.length) {
      case 3:
        this.ma.subject = args[0];
        this.ma.sid = this.protoParseInt(args[1]);
        this.ma.reply = undefined;
        this.ma.size = this.protoParseInt(args[2]);
        break;
      case 4:
        this.ma.subject = args[0];
        this.ma.sid = this.protoParseInt(args[1]);
        this.ma.reply = args[2];
        this.ma.size = this.protoParseInt(args[3]);
        break;
      default:
        throw this.fail(arg, "processMsgArgs Parse Error");
    }

    if (this.ma.sid < 0) {
      throw this.fail(arg, "processMsgArgs Bad or Missing Sid Error");
    }
    if (this.ma.size < 0) {
      throw this.fail(arg, "processMsgArgs Bad or Missing Size Error");
    }
  }

  /**
   * Builds (does not throw) an Error describing a parse failure. The
   * message contains the label, the numeric parser state and the
   * offending bytes decoded with `TD`.
   *
   * @param data the bytes that could not be parsed
   * @param label optional prefix; defaults to `parse error`
   * @returns the Error for the caller to throw
   */
  fail(data: Uint8Array, label = ""): Error {
    if (!label) {
      label = `parse error [${this.state}]`;
    } else {
      label = `${label} [${this.state}]`;
    }

    return new Error(`${label}: ${TD.decode(data)}`);
  }

  /**
   * Parses an HMSG control line (`subject sid [reply] hdr size`) into
   * `this.ma`.
   *
   * @param arg the control line without the leading operation name
   * @throws Error when the token count is wrong, when sid or size is not
   *   a non-negative integer, or when the header size is negative or
   *   larger than the total size
   */
  processHeaderMsgArgs(arg: Uint8Array): void {
    const args = splitProtoArgs(arg);

    switch (args.length) {
      case 4:
        this.ma.subject = args[0];
        this.ma.sid = this.protoParseInt(args[1]);
        this.ma.reply = undefined;
        this.ma.hdr = this.protoParseInt(args[2]);
        this.ma.size = this.protoParseInt(args[3]);
        break;
      case 5:
        this.ma.subject = args[0];
        this.ma.sid = this.protoParseInt(args[1]);
        this.ma.reply = args[2];
        this.ma.hdr = this.protoParseInt(args[3]);
        this.ma.size = this.protoParseInt(args[4]);
        break;
      default:
        throw this.fail(arg, "processHeaderMsgArgs Parse Error");
    }

    if (this.ma.sid < 0) {
      throw this.fail(arg, "processHeaderMsgArgs Bad or Missing Sid Error");
    }
    // Validate the total size before comparing the header size against
    // it; otherwise a bad size (-1) is misreported as a header size error.
    if (this.ma.size < 0) {
      throw this.fail(arg, "processHeaderMsgArgs Bad or Missing Size Error");
    }
    if (this.ma.hdr < 0 || this.ma.hdr > this.ma.size) {
      throw this.fail(
        arg,
        "processHeaderMsgArgs Bad or Missing Header Size Error",
      );
    }
  }

  /**
   * Parses an unsigned decimal integer from ASCII bytes.
   *
   * @param a the digit bytes
   * @returns the parsed value, or -1 when `a` is empty or contains a
   *   byte that is not an ASCII digit
   */
  protoParseInt(a: Uint8Array): number {
    if (a.length === 0) {
      return -1;
    }
    let n = 0;
    for (let i = 0; i < a.length; i++) {
      if (a[i] < ASCII_0 || a[i] > ASCII_9) {
        return -1;
      }
      n = n * 10 + (a[i] - ASCII_0);
    }
    return n;
  }
}
55✔
682

683
/**
 * States of the protocol parser's state machine.
 *
 * The numeric values are written out because they appear in the error
 * messages built by `Parser.fail` (`[<state>]`); they match the values
 * the implicit numbering would assign.
 */
export enum State {
  OP_START = 0,
  // "+OK"
  OP_PLUS = 1,
  OP_PLUS_O = 2,
  OP_PLUS_OK = 3,
  // "-ERR <text>"
  OP_MINUS = 4,
  OP_MINUS_E = 5,
  OP_MINUS_ER = 6,
  OP_MINUS_ERR = 7,
  OP_MINUS_ERR_SPC = 8,
  MINUS_ERR_ARG = 9,
  // "MSG <args>" followed by the payload
  OP_M = 10,
  OP_MS = 11,
  OP_MSG = 12,
  OP_MSG_SPC = 13,
  MSG_ARG = 14,
  MSG_PAYLOAD = 15,
  MSG_END = 16,
  // "HMSG": after the leading H the parser continues with the MSG states
  OP_H = 17,
  // "PING" / "PONG"
  OP_P = 18,
  OP_PI = 19,
  OP_PIN = 20,
  OP_PING = 21,
  OP_PO = 22,
  OP_PON = 23,
  OP_PONG = 24,
  // "INFO <json>"
  OP_I = 25,
  OP_IN = 26,
  OP_INF = 27,
  OP_INFO = 28,
  OP_INFO_SPC = 29,
  INFO_ARG = 30,
}
716

717
/**
 * Byte values of the characters the parser compares against. Each
 * member holds the ASCII code of the character it is named after.
 */
enum cc {
  CR = 13, // "\r"
  E = 69,
  e = 101,
  F = 70,
  f = 102,
  G = 71,
  g = 103,
  H = 72,
  h = 104,
  I = 73,
  i = 105,
  K = 75,
  k = 107,
  M = 77,
  m = 109,
  MINUS = 45, // "-"
  N = 78,
  n = 110,
  NL = 10, // "\n"
  O = 79,
  o = 111,
  P = 80,
  p = 112,
  PLUS = 43, // "+"
  R = 82,
  r = 114,
  S = 83,
  s = 115,
  SPACE = 32, // " "
  TAB = 9, // "\t"
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc