• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.js / 13041776921

29 Jan 2025 10:36PM UTC coverage: 82.727% (+0.02%) from 82.71%
13041776921

push

github

web-flow
fix: typo in the documentation of the list() method of the Objm class (#198)

2271 of 3098 branches covered (73.31%)

Branch coverage included in aggregate %.

9631 of 11289 relevant lines covered (85.31%)

787245.13 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.95
/core/src/protocol.ts
1
/*
2
 * Copyright 2018-2024 The NATS Authors
3
 * Licensed under the Apache License, Version 2.0 (the "License");
4
 * you may not use this file except in compliance with the License.
5
 * You may obtain a copy of the License at
6
 *
7
 * http://www.apache.org/licenses/LICENSE-2.0
8
 *
9
 * Unless required by applicable law or agreed to in writing, software
10
 * distributed under the License is distributed on an "AS IS" BASIS,
11
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
 * See the License for the specific language governing permissions and
13
 * limitations under the License.
14
 */
15
import { decode, Empty, encode, TE } from "./encoders.ts";
55✔
16
import type { Transport } from "./transport.ts";
17
import { CR_LF, CRLF, getResolveFn, newTransport } from "./transport.ts";
55✔
18
import type { Deferred, Delay, Timeout } from "./util.ts";
19
import { deferred, delay, extend, timeout } from "./util.ts";
55✔
20
import { DataBuffer } from "./databuffer.ts";
55✔
21
import type { ServerImpl } from "./servers.ts";
22
import { Servers } from "./servers.ts";
55✔
23
import { QueuedIteratorImpl } from "./queued_iterator.ts";
55✔
24
import type { MsgHdrsImpl } from "./headers.ts";
25
import { MuxSubscription } from "./muxsubscription.ts";
55✔
26
import type { PH } from "./heartbeats.ts";
27
import { Heartbeat } from "./heartbeats.ts";
55✔
28
import type { MsgArg, ParserEvent } from "./parser.ts";
29
import { Kind, Parser } from "./parser.ts";
55✔
30
import { MsgImpl } from "./msg.ts";
55✔
31
import { Features, parseSemVer } from "./semver.ts";
55✔
32
import type {
33
  ConnectionOptions,
34
  Dispatcher,
35
  Msg,
36
  Payload,
37
  Publisher,
38
  PublishOptions,
39
  Request,
40
  Server,
41
  ServerInfo,
42
  Status,
43
  Subscription,
44
  SubscriptionOptions,
45
} from "./core.ts";
46

47
import {
55✔
48
  DEFAULT_MAX_PING_OUT,
55✔
49
  DEFAULT_PING_INTERVAL,
55✔
50
  DEFAULT_RECONNECT_TIME_WAIT,
55✔
51
} from "./options.ts";
55✔
52
import { errors, InvalidArgumentError } from "./errors.ts";
55✔
53

54
import type {
55
  AuthorizationError,
56
  PermissionViolationError,
57
  UserAuthenticationExpiredError,
58
} from "./errors.ts";
59

60
// Default value for ProtocolHandler.pendingLimit (32 KiB): sendCommand
// flushes inline once the outbound buffer reaches this many bytes.
const FLUSH_THRESHOLD = 32 * 1024;

// Matches a server INFO line; capture group 1 is everything after "INFO"
// up to (but not including) the terminating CRLF.
export const INFO = /^INFO\s+([^\r\n]+)\r\n/i;

// Pre-encoded protocol commands, built once at module load.
const PONG_CMD = encode("PONG\r\n");
const PING_CMD = encode("PING\r\n");
55✔
66

67
/**
 * Options object that is serialized (via JSON.stringify) into the CONNECT
 * protocol message sent to the server.
 *
 * Fields left `undefined` are omitted from the serialized JSON.
 */
export class Connect {
  echo?: boolean;
  no_responders?: boolean;
  protocol: number;
  verbose?: boolean;
  pedantic?: boolean;
  jwt?: string;
  nkey?: string;
  sig?: string;
  user?: string;
  pass?: string;
  auth_token?: string;
  tls_required?: boolean;
  name?: string;
  lang: string;
  version: string;
  headers?: boolean;

  /**
   * @param transport version and language identifiers reported to the server
   * @param opts connection options the CONNECT fields are derived from
   * @param nonce optional nonce forwarded to `opts.authenticator`
   */
  constructor(
    transport: { version: string; lang: string },
    opts: ConnectionOptions,
    nonce?: string,
  ) {
    const { version, lang } = transport;

    this.protocol = 1;
    this.version = version;
    this.lang = lang;
    // echo and tls_required are only ever sent when they deviate from the
    // default; otherwise they stay undefined and are not serialized
    this.echo = opts.noEcho ? false : undefined;
    this.verbose = opts.verbose;
    this.pedantic = opts.pedantic;
    this.tls_required = opts.tls ? true : undefined;
    this.name = opts.name;

    // when an authenticator function is configured, whatever it returns
    // (if anything) is merged onto this object
    let creds = {};
    if (opts && typeof opts.authenticator === "function") {
      creds = opts.authenticator(nonce) || {};
    }
    extend(this, creds);
  }
}
55✔
106

107
/**
 * Fires a callback once each time a pending count climbs above a threshold.
 * After firing it stays silent until the count drops back to the threshold
 * or below, at which point it is re-armed.
 */
class SlowNotifier {
  slow: number;
  cb: (pending: number) => void;
  notified = false;

  /**
   * @param slow threshold; counts strictly greater than this trigger `cb`
   * @param cb invoked with the pending count that crossed the threshold
   */
  constructor(slow: number, cb: (pending: number) => void) {
    this.slow = slow;
    this.cb = cb;
  }

  /**
   * Evaluates `pending` against the threshold, notifying at most once per
   * crossing.
   */
  maybeNotify(pending: number): void {
    if (pending <= this.slow) {
      // back at or under the threshold - re-arm
      this.notified = false;
      return;
    }
    if (this.notified) {
      // already reported this crossing
      return;
    }
    this.cb(pending);
    this.notified = true;
  }
}
55✔
131

132
/**
 * A subscription registered with a ProtocolHandler.
 *
 * Messages are either pushed into the inherited queued iterator or, when
 * `opts.callback` is a function, handed to that callback (the options are
 * copied onto the instance by `extend`, so a supplied `callback` replaces
 * the default `callback` method below).
 *
 * NOTE(review): `done`, `noIterator`, `iterClosed`, `push`, `stop` and
 * `getPending` are inherited from QueuedIteratorImpl, which is not visible
 * here - confirm their exact semantics there.
 */
export class SubscriptionImpl extends QueuedIteratorImpl<Msg>
  implements Subscription {
  sid!: number;
  queue?: string;
  draining: boolean;
  max?: number;
  subject: string;
  drained?: Promise<void>;
  protocol: ProtocolHandler;
  timer?: Timeout<void>;
  info?: unknown;
  cleanupFn?: (sub: Subscription, info?: unknown) => void;
  closed: Deferred<void | Error>;
  requestSubject?: string;
  slow?: SlowNotifier;

  /**
   * @param protocol the handler that owns this subscription
   * @param subject subject subscribed to
   * @param opts subscription options; all of them are copied onto `this`
   */
  constructor(
    protocol: ProtocolHandler,
    subject: string,
    opts: SubscriptionOptions = {},
  ) {
    super();
    extend(this, opts);
    this.protocol = protocol;
    this.subject = subject;
    this.draining = false;
    this.noIterator = typeof opts.callback === "function";
    this.closed = deferred<void | Error>();

    const asyncTraces = !protocol.options?.noAsyncTraces;

    if (opts.timeout) {
      const timer = timeout<void>(opts.timeout, asyncTraces);
      this.timer = timer;
      timer
        .then(() => {
          // timer was cancelled
          this.timer = undefined;
        })
        .catch((err) => {
          // timer fired
          this.stop(err);
          if (this.noIterator) {
            this.callback(err, {} as Msg);
          }
        });
    }

    if (!this.noIterator) {
      // the consumer left the iterator (break/return) - make sure the
      // subscription is cleaned up even if they never called unsubscribe
      this.iterClosed.then((err: void | Error) => {
        this.closed.resolve(err);
        this.unsubscribe();
      });
    }
  }

  /**
   * Installs, or clears when `fn` is omitted, a slow-consumer notification.
   *
   * @param slow pending count above which `fn` is invoked
   * @param fn notification callback
   * @throws Error when `fn` is given on a callback-based subscription
   */
  setSlowNotificationFn(slow: number, fn?: (pending: number) => void): void {
    this.slow = undefined;
    if (!fn) {
      return;
    }
    if (this.noIterator) {
      throw new Error("callbacks don't support slow notifications");
    }
    this.slow = new SlowNotifier(slow, fn);
  }

  /**
   * Default message sink: cancels any pending timeout, then either stops
   * the iterator with `err` or enqueues `msg` (checking the slow-consumer
   * threshold afterwards).
   */
  callback(err: Error | null, msg: Msg) {
    this.cancelTimeout();
    if (err) {
      this.stop(err);
    } else {
      this.push(msg);
      if (this.slow) {
        this.slow.maybeNotify(this.getPending());
      }
    }
  }

  /**
   * Closes the subscription unless it is already closed: stops the
   * iterator, runs `cleanupFn` (errors from it are ignored) and resolves
   * `closed` with `err`.
   */
  close(err?: Error): void {
    if (this.isClosed()) {
      return;
    }
    this.cancelTimeout();

    const finish = () => {
      this.stop();
      if (this.cleanupFn) {
        try {
          this.cleanupFn(this, this.info);
        } catch (_err) {
          // ignoring
        }
      }
      this.closed.resolve(err);
    };

    if (this.noIterator) {
      finish();
    } else {
      // iterator mode: the finisher is enqueued rather than run immediately
      this.push(finish);
    }
  }

  /** Delegates to the protocol handler's unsubscribe for this subscription. */
  unsubscribe(max?: number): void {
    this.protocol.unsubscribe(this, max);
  }

  /** Cancels and forgets the timeout timer, if one is set. */
  cancelTimeout(): void {
    const timer = this.timer;
    if (timer) {
      timer.cancel();
      this.timer = undefined;
    }
  }

  /**
   * Sends an unsubscribe, flushes, and then cancels the subscription.
   * Repeated calls return the same promise.
   *
   * @returns a promise for the drain; rejected immediately when the
   *   connection or the subscription is already closed
   */
  drain(): Promise<void> {
    if (this.protocol.isClosed()) {
      return Promise.reject(new errors.ClosedConnectionError());
    }
    if (this.isClosed()) {
      return Promise.reject(
        new errors.InvalidOperationError("subscription is already closed"),
      );
    }
    if (this.drained) {
      return this.drained;
    }

    this.draining = true;
    this.protocol.unsub(this);
    // the subscription is cancelled whether or not the flush succeeds
    const cancel = () => {
      this.protocol.subscriptions.cancel(this);
    };
    this.drained = this.protocol.flush(deferred<void>())
      .then(() => {
        cancel();
      })
      .catch(() => {
        cancel();
      });
    return this.drained;
  }

  /** @returns true once drain() has started on this subscription */
  isDraining(): boolean {
    return this.draining;
  }

  /** @returns the inherited `done` flag */
  isClosed(): boolean {
    return this.done;
  }

  /** @returns the subject this subscription was created with */
  getSubject(): string {
    return this.subject;
  }

  /** @returns the `max` value, if one was set */
  getMax(): number | undefined {
    return this.max;
  }

  /** @returns the subscription id (sid) assigned by Subscriptions */
  getID(): number {
    return this.sid;
  }
}
55✔
283

284
/**
 * Registry of active subscriptions keyed by subscription id (sid), plus a
 * reference to the mux subscription when one is set.
 */
export class Subscriptions {
  mux: SubscriptionImpl | null;
  subs: Map<number, SubscriptionImpl>;
  sidCounter: number;

  constructor() {
    this.sidCounter = 0;
    this.mux = null;
    this.subs = new Map<number, SubscriptionImpl>();
  }

  /** @returns the number of registered subscriptions */
  size(): number {
    return this.subs.size;
  }

  /** Assigns the next sid to `s`, registers it and returns it. */
  add(s: SubscriptionImpl): SubscriptionImpl {
    this.sidCounter++;
    s.sid = this.sidCounter;
    this.subs.set(s.sid, s);
    return s;
  }

  /** Stores `s` (or null) as the mux subscription and returns it. */
  setMux(s: SubscriptionImpl | null): SubscriptionImpl | null {
    this.mux = s;
    return s;
  }

  /** @returns the mux subscription, or null when none is set */
  getMux(): SubscriptionImpl | null {
    return this.mux;
  }

  /** @returns the subscription registered under `sid`, if any */
  get(sid: number): SubscriptionImpl | undefined {
    return this.subs.get(sid);
  }

  /** Re-registers `s` under a freshly allocated sid, dropping its old one. */
  resub(s: SubscriptionImpl): SubscriptionImpl {
    this.sidCounter++;
    this.subs.delete(s.sid);
    s.sid = this.sidCounter;
    this.subs.set(s.sid, s);
    return s;
  }

  /** @returns a snapshot array of all registered subscriptions */
  all(): (SubscriptionImpl)[] {
    return [...this.subs.values()];
  }

  /** Closes `s` and removes it from the registry; no-op for a falsy `s`. */
  cancel(s: SubscriptionImpl): void {
    if (!s) {
      return;
    }
    s.close();
    this.subs.delete(s.sid);
  }

  /**
   * Routes a permission violation to the subscription it concerns: the
   * subscription receives the error through its callback, is closed with
   * the error, and is removed from the registry.
   *
   * @returns true when a matching subscription was found and it is not the
   *   mux subscription; false otherwise
   */
  handleError(err: PermissionViolationError): boolean {
    const candidates = this.all();
    let match: SubscriptionImpl | undefined;
    switch (err.operation) {
      case "subscription":
        match = candidates.find((s) =>
          s.subject === err.subject && s.queue === err.queue
        );
        break;
      case "publish":
        // we have a no mux subscription
        match = candidates.find((s) => s.requestSubject === err.subject);
        break;
    }
    if (!match) {
      return false;
    }
    match.callback(err, {} as Msg);
    match.close(err);
    this.subs.delete(match.sid);
    return match !== this.mux;
  }

  /** Closes every registered subscription (entries stay in the map). */
  close() {
    for (const sub of this.subs.values()) {
      sub.close();
    }
  }
}
55✔
367

368
export class ProtocolHandler implements Dispatcher<ParserEvent> {
55✔
369
  connected: boolean;
54✔
370
  connectedOnce: boolean;
732✔
371
  infoReceived: boolean;
732✔
372
  info?: ServerInfo;
732✔
373
  muxSubscriptions: MuxSubscription;
732✔
374
  options: ConnectionOptions;
732✔
375
  outbound: DataBuffer;
732✔
376
  pongs: Array<Deferred<void>>;
732✔
377
  subscriptions: Subscriptions;
732✔
378
  transport!: Transport;
732✔
379
  noMorePublishing: boolean;
732✔
380
  connectError?: (err?: Error) => void;
732✔
381
  publisher: Publisher;
732✔
382
  _closed: boolean;
732✔
383
  closed: Deferred<Error | void>;
732✔
384
  listeners: QueuedIteratorImpl<Status>[];
732✔
385
  heartbeats: Heartbeat;
732✔
386
  parser: Parser;
732✔
387
  outMsgs: number;
732✔
388
  inMsgs: number;
732✔
389
  outBytes: number;
732✔
390
  inBytes: number;
732✔
391
  pendingLimit: number;
732✔
392
  lastError?: Error;
732✔
393
  abortReconnect: boolean;
732✔
394
  whyClosed: string;
732✔
395

396
  servers: Servers;
732✔
397
  server!: ServerImpl;
732✔
398
  features: Features;
732✔
399
  connectPromise: Promise<void> | null;
732✔
400
  dialDelay: Delay | null;
732✔
401
  raceTimer?: Timeout<void>;
54✔
402

403
  /**
   * Initializes handler state; no network activity happens here.
   *
   * @param options connection options (servers, ping settings, ...)
   * @param publisher stored on the handler as `publisher`
   */
  constructor(options: ConnectionOptions, publisher: Publisher) {
    this.options = options;
    this.publisher = publisher;

    // lifecycle flags
    this._closed = false;
    this.connected = false;
    this.connectedOnce = false;
    this.infoReceived = false;
    this.noMorePublishing = false;
    this.abortReconnect = false;
    this.whyClosed = "";

    // statistics
    this.outMsgs = 0;
    this.inMsgs = 0;
    this.outBytes = 0;
    this.inBytes = 0;

    // bookkeeping
    this.listeners = [];
    this.pongs = [];
    this.subscriptions = new Subscriptions();
    this.muxSubscriptions = new MuxSubscription();
    this.outbound = new DataBuffer();
    //@ts-ignore: options.pendingLimit is hidden
    this.pendingLimit = options.pendingLimit || FLUSH_THRESHOLD;
    // placeholder until the server's INFO reports the real version
    this.features = new Features({ major: 0, minor: 0, micro: 0 });
    this.connectPromise = null;
    this.dialDelay = null;

    // a single server string is normalized to a one-element list
    const { servers } = options;
    const serverList = typeof servers === "string" ? [servers] : servers;
    this.servers = new Servers(serverList, {
      randomize: !options.noRandomize,
    });

    this.closed = deferred<Error | void>();
    this.parser = new Parser(this);

    const pingInterval = this.options.pingInterval || DEFAULT_PING_INTERVAL;
    const maxPingOut = this.options.maxPingOut || DEFAULT_MAX_PING_OUT;
    this.heartbeats = new Heartbeat(this as PH, pingInterval, maxPingOut);
  }
732✔
445

446
  /**
   * Discards buffered outbound data, rejects every pending pong, and
   * installs a fresh parser, ready for a new INFO.
   */
  resetOutbound(): void {
    this.outbound.reset();

    const pending = this.pongs;
    this.pongs = [];
    // reject the pongs - the disconnect from here shouldn't have a trace
    // because that confuses API consumers
    const err = new errors.RequestError("connection disconnected");
    err.stack = "";
    for (const p of pending) {
      p.reject(err);
    }

    this.parser = new Parser(this);
    this.infoReceived = false;
  }
848✔
460

461
  /** Pushes `status` to every registered status listener. */
  dispatchStatus(status: Status): void {
    for (const listener of this.listeners) {
      listener.push(status);
    }
  }
1,757✔
466

467
  /**
   * Resets connection state and creates a new transport for a dial attempt.
   *
   * @returns a deferred placed at the front of the pong queue; dial() waits
   *   on it to learn that the server answered
   */
  private prepare(): Deferred<void> {
    if (this.transport) {
      this.transport.discard();
    }
    this.info = undefined;
    this.resetOutbound();

    const firstPong = deferred<void>();
    // provide at least one catch - as pong rejection can happen before it is expected
    firstPong.catch(() => {});
    this.pongs.unshift(firstPong);

    // lets an error seen while connecting fail the pending dial
    this.connectError = (err?: Error) => {
      firstPong.reject(err);
    };

    this.transport = newTransport();
    this.transport.closed().then(async (_err?) => {
      this.connected = false;
      if (this.isClosed()) {
        return;
      }
      // if the transport gave an error use that, otherwise
      // we may have received a protocol error
      await this.disconnected(this.transport.closeError || this.lastError);
    });

    return firstPong;
  }
848✔
498

499
  /**
   * Reports a "staleConnection" status to listeners and then asks the
   * transport to disconnect.
   */
  public disconnect(): void {
    const status: Status = { type: "staleConnection" };
    this.dispatchStatus(status);
    this.transport.disconnect();
  }
43✔
503

504
  /**
   * When currently connected, reports "forceReconnect" and disconnects the
   * transport; otherwise does nothing.
   *
   * @returns an already-resolved promise in both cases
   */
  public reconnect(): Promise<void> {
    if (this.connected) {
      this.dispatchStatus({ type: "forceReconnect" });
      this.transport.disconnect();
    }
    return Promise.resolve();
  }
44✔
513

514
  /**
   * Handles the loss of the transport: reports "disconnect", then either
   * runs the dial loop (when `options.reconnect` is set) or closes the
   * handler.
   *
   * Failures from close() are deliberately swallowed. The previous code
   * called `.catch()` without a handler, which does not suppress anything.
   *
   * @param err the error that caused the disconnect, if known
   */
  async disconnected(err?: Error): Promise<void> {
    this.dispatchStatus(
      {
        type: "disconnect",
        server: this.servers.getCurrentServer().toString(),
      },
    );

    if (!this.options.reconnect) {
      await this.close(err).catch(() => {
        // best effort - nothing more can be done at this point
      });
      return;
    }

    await this.dialLoop()
      .then(() => {
        this.dispatchStatus(
          {
            type: "reconnect",
            server: this.servers.getCurrentServer().toString(),
          },
        );
        // if we are here we reconnected, but we have an authentication
        // that expired, we need to clean it up, otherwise we'll queue up
        // two of these, and the default for the client will be to
        // close, rather than attempt again - possibly they have an
        // authenticator that dynamically updates
        if (this.lastError instanceof errors.UserAuthenticationExpiredError) {
          this.lastError = undefined;
        }
      })
      .catch((err) => {
        // reconnecting failed - close with that error; the close is not
        // awaited, so its rejection must be handled here
        this.close(err).catch(() => {
          // best effort
        });
      });
  }
104✔
546

547
  /**
   * Performs one connection attempt against `srv`.
   *
   * Phase 1 connects the transport (raced against a timeout of
   * `options.timeout` or 20000) and starts the background read loop that
   * feeds the parser. Phase 2 waits for the first pong (or the same timer)
   * and then marks the handler connected.
   *
   * @throws whatever failed the attempt, after closing the transport with it
   */
  async dial(srv: Server): Promise<void> {
    const pong = this.prepare();

    try {
      this.raceTimer = timeout(this.options.timeout || 20000);
      const connecting = this.transport.connect(srv, this.options);
      await Promise.race([connecting, this.raceTimer]);

      // background reader - runs until the transport iterator ends
      const readLoop = async (): Promise<void> => {
        try {
          for await (const chunk of this.transport) {
            this.parser.parse(chunk);
          }
        } catch (err) {
          console.log("reader closed", err);
        }
      };
      readLoop().then();
    } catch (err) {
      // a failed connect surfaces through the pong awaited below
      pong.reject(err);
    }

    try {
      await Promise.race([this.raceTimer, pong]);
      this.raceTimer?.cancel();
      this.connected = true;
      this.connectError = undefined;
      this.sendSubscriptions();
      this.connectedOnce = true;
      this.server.didConnect = true;
      this.server.reconnects = 0;
      this.flushPending();
      this.heartbeats.start();
    } catch (err) {
      this.raceTimer?.cancel();
      await this.transport.close(err as Error);
      throw err;
    }
  }
848✔
583

584
  /**
   * Resolves `srv` into its alternatives and dials each one in order,
   * returning as soon as one connects.
   *
   * @throws the error from the last alternative tried (null when resolve
   *   produced no alternatives)
   */
  async _doDial(srv: Server): Promise<void> {
    const candidates = await srv.resolve({
      fn: getResolveFn(),
      debug: this.options.debug,
      randomize: !this.options.noRandomize,
      resolve: this.options.resolve,
    });

    let failure: Error | null = null;
    for (const candidate of candidates) {
      try {
        this.dispatchStatus({ type: "reconnecting" });
        await this.dial(candidate);
        // if here we connected
        return;
      } catch (err) {
        failure = err as Error;
      }
    }
    // if we are here, we failed, and we have no additional
    // alternatives for this server
    throw failure;
  }
698✔
611

612
  /**
   * Starts the dial loop unless one is already in flight; concurrent
   * callers share the same promise.
   *
   * @returns the promise of the in-flight (or newly started) dial loop
   */
  dialLoop(): Promise<void> {
    if (this.connectPromise !== null) {
      return this.connectPromise;
    }
    const inFlight = this.dodialLoop();
    this.connectPromise = inFlight;
    // clear the slot once the loop settles, either way; the empty handlers
    // keep this bookkeeping chain from rejecting
    inFlight
      .then(() => {})
      .catch(() => {})
      .finally(() => {
        this.connectPromise = null;
      });
    return inFlight;
  }
775✔
624

625
  /**
   * Repeatedly selects a server and dials it until a connection succeeds
   * or no server is left to try.
   *
   * A server is only dialed when it was never tried, or when at least the
   * reconnect wait has elapsed since its last attempt; otherwise the loop
   * sleeps for the remaining time.
   *
   * @throws the last dial error, else `this.lastError`, else a
   *   ConnectionError("connection refused"), once no server can be
   *   selected or reconnecting was aborted
   */
  async dodialLoop(): Promise<void> {
    let lastError: Error | undefined;
    while (true) {
      if (this._closed) {
        // if we are disconnected, and close is called, the client
        // still tries to reconnect - to match the reconnect policy
        // in the case of close, want to stop.
        this.servers.clear();
      }
      const wait = this.options.reconnectDelayHandler
        ? this.options.reconnectDelayHandler()
        : DEFAULT_RECONNECT_TIME_WAIT;

      const srv = this.selectServer();
      if (!srv || this.abortReconnect) {
        throw lastError || this.lastError ||
          new errors.ConnectionError("connection refused");
      }

      const now = Date.now();
      const due = srv.lastConnect === 0 || srv.lastConnect + wait <= now;
      if (!due) {
        // tried too recently - sleep for what remains of the wait
        const remaining = Math.min(wait, srv.lastConnect + wait - now);
        this.dialDelay = delay(remaining);
        await this.dialDelay;
        continue;
      }

      srv.lastConnect = Date.now();
      try {
        await this._doDial(srv);
        return;
      } catch (err) {
        lastError = err as Error;
        if (!this.connectedOnce) {
          if (this.options.waitOnFirstConnect) {
            // keep retrying the same server until a first connect succeeds
            continue;
          }
          this.servers.removeCurrentServer();
        }
        srv.reconnects++;
        const mra = this.options.maxReconnectAttempts || 0;
        // -1 means no limit on reconnect attempts
        if (mra !== -1 && srv.reconnects >= mra) {
          this.servers.removeCurrentServer();
        }
      }
    }
  }
769✔
675

676
  /**
   * Creates a handler and runs its dial loop.
   *
   * @returns the handler once the dial loop has completed
   * @throws whatever the dial loop rejects with
   */
  public static async connect(
    options: ConnectionOptions,
    publisher: Publisher,
  ): Promise<ProtocolHandler> {
    const handler = new ProtocolHandler(options, publisher);
    await handler.dialLoop();
    return handler;
  }
732✔
684

685
  /**
   * Maps a server error string to an error object, trying the specific
   * parsers in order (permission violation, authentication expired,
   * authorization) before falling back to a generic ProtocolError.
   */
  static toError(s: string): Error {
    return errors.PermissionViolationError.parse(s) ||
      errors.UserAuthenticationExpiredError.parse(s) ||
      errors.AuthorizationError.parse(s) ||
      new errors.ProtocolError(s);
  }
87✔
700

701
  /**
   * Accounts for an inbound message and delivers it to the subscription
   * identified by `msg.sid`. Messages without a registered subscription
   * are dropped.
   */
  processMsg(msg: MsgArg, data: Uint8Array) {
    this.inMsgs++;
    this.inBytes += data.length;

    // no subscription has ever been registered
    if (!this.subscriptions.sidCounter) {
      return;
    }
    const sub = this.subscriptions.get(msg.sid);
    if (!sub) {
      return;
    }

    sub.received++;
    if (sub.callback) {
      sub.callback(null, new MsgImpl(msg, data, this));
    }
    // the subscription reached its message limit
    if (sub.max !== undefined && sub.received >= sub.max) {
      sub.unsubscribe();
    }
  }
395,436✔
722

723
  /**
   * Handles an error payload from the server: strips surrounding single
   * quotes, converts it to an error object, routes permission violations
   * to the affected subscription(s), then reports and records the error.
   */
  processError(m: Uint8Array) {
    let text = decode(m);
    if (text.startsWith("'") && text.endsWith("'")) {
      text = text.slice(1, -1);
    }
    const err = ProtocolHandler.toError(text);

    if (err.constructor === errors.PermissionViolationError) {
      const pe = err as PermissionViolationError;
      const mux = this.subscriptions.getMux();
      const isMuxPermission = mux ? pe.subject === mux.subject : false;
      this.subscriptions.handleError(pe);
      this.muxSubscriptions.handleError(isMuxPermission, pe);
      if (isMuxPermission) {
        // remove the permission - enable it to be recreated
        this.subscriptions.setMux(null);
      }
    }

    this.dispatchStatus({ type: "error", error: err });
    this.handleError(err);
  }
90✔
747

748
  /**
   * Applies handler-level consequences of a server error: authentication
   * and authorization errors go through handleAuthError, and every error
   * except a permission violation is remembered as `lastError`.
   */
  handleError(err: Error) {
    if (
      err instanceof errors.UserAuthenticationExpiredError ||
      err instanceof errors.AuthorizationError
    ) {
      this.handleAuthError(err);
    }

    if (err instanceof errors.PermissionViolationError) {
      return;
    }
    this.lastError = err;
  }
90✔
760

761
  /**
   * Reacts to an authentication or authorization error.
   *
   * When the previous error was also one of these and
   * `options.ignoreAuthErrorAbort` is exactly `false`, reconnecting is
   * aborted. The error is then either handed to the in-progress dial
   * (`connectError`) or, when none is pending, the connection is dropped.
   */
  handleAuthError(err: UserAuthenticationExpiredError | AuthorizationError) {
    const previousWasAuth =
      this.lastError instanceof errors.UserAuthenticationExpiredError ||
      this.lastError instanceof errors.AuthorizationError;
    if (previousWasAuth && this.options.ignoreAuthErrorAbort === false) {
      this.abortReconnect = true;
    }

    if (!this.connectError) {
      this.disconnect();
      return;
    }
    this.connectError(err);
  }
63✔
775

776
  /** Answers a server PING by sending the pre-encoded PONG command. */
  processPing() {
    const reply = PONG_CMD;
    this.transport.send(reply);
  }
90✔
779

780
  /**
   * Resolves the oldest pending pong deferred, if there is one. Pongs are
   * matched to waiters in FIFO order.
   */
  processPong() {
    const waiter = this.pongs.shift();
    if (!waiter) {
      return;
    }
    waiter.resolve();
  }
1,062✔
786

787
  /**
   * Handles an INFO payload from the server.
   *
   * Every INFO updates `this.info` and, unless `ignoreClusterUpdates` is
   * set, the known server list. The first INFO on a connection also
   * updates the feature set and sends CONNECT followed by PING. Server
   * list changes and lame duck mode (`ldm`) are reported to listeners.
   *
   * @param m the JSON-encoded INFO payload
   */
  processInfo(m: Uint8Array) {
    const info = JSON.parse(decode(m));
    this.info = info;
    const updates = this.options && this.options.ignoreClusterUpdates
      ? undefined
      : this.servers.update(info, this.transport.isEncrypted());

    if (!this.infoReceived) {
      this.features.update(parseSemVer(info.version));
      this.infoReceived = true;
      if (this.transport.isEncrypted()) {
        this.servers.updateTLSName();
      }
      // send connect
      const { version, lang } = this.transport;
      try {
        const c = new Connect(
          { version, lang },
          this.options,
          info.nonce,
        );

        if (info.headers) {
          c.headers = true;
          c.no_responders = true;
        }
        const cs = JSON.stringify(c);
        this.transport.send(
          encode(`CONNECT ${cs}${CR_LF}`),
        );
        this.transport.send(PING_CMD);
      } catch (err) {
        // if we are dying here, this is likely an authenticator blowing up.
        // The close is not awaited, so its rejection has to be handled here;
        // calling `.catch()` without a handler would leave it unhandled.
        this.close(err as Error).catch(() => {
          // best effort
        });
      }
    }

    if (updates) {
      const { added, deleted } = updates;

      this.dispatchStatus({ type: "update", added, deleted });
    }

    const ldm = info.ldm !== undefined ? info.ldm : false;
    if (ldm) {
      this.dispatchStatus(
        {
          type: "ldm",
          server: this.servers.getCurrentServer().toString(),
        },
      );
    }
  }
791✔
837

838
  /**
   * Dispatcher entry point: routes a parser event to the matching
   * process* method. OK events, and any other kind, are ignored.
   */
  push(e: ParserEvent): void {
    switch (e.kind) {
      case Kind.MSG:
        this.processMsg(e.msg!, e.data!);
        return;
      case Kind.OK:
        return;
      case Kind.ERR:
        this.processError(e.data!);
        return;
      case Kind.PING:
        this.processPing();
        return;
      case Kind.PONG:
        this.processPong();
        return;
      case Kind.INFO:
        this.processInfo(e.data!);
        return;
    }
  }
397,274✔
861

862
  /**
   * Appends a command and its payload chunks to the outbound buffer.
   *
   * If the buffer was empty beforehand, a flush is scheduled as a
   * microtask, so commands issued in the same tick are batched. Otherwise
   * the buffer is flushed inline once it reaches `pendingLimit`.
   *
   * @param cmd protocol line, as a string (encoded here) or bytes
   * @param payloads additional chunks written after `cmd`
   */
  sendCommand(cmd: string | Uint8Array, ...payloads: Uint8Array[]) {
    const pendingBefore = this.outbound.length();
    const head = typeof cmd === "string" ? encode(cmd) : cmd;
    this.outbound.fill(head, ...payloads);

    if (pendingBefore === 0) {
      queueMicrotask(() => {
        this.flushPending();
      });
      return;
    }
    if (this.outbound.size() >= this.pendingLimit) {
      // flush inline
      this.flushPending();
    }
  }
398,641✔
881

882
  /**
   * Queues a PUB (or HPUB, when headers are present) command.
   *
   * Two changes from the previous implementation: the "headers are not
   * available on this server" error is now actually thrown (it was built
   * and discarded), and the caller's `options` object is no longer
   * mutated.
   *
   * @param subject subject to publish to
   * @param payload message body as a string or bytes; defaults to empty
   * @param options optional reply subject and headers
   * @throws TypeError when `payload` is neither a string nor a Uint8Array
   * @throws InvalidArgumentError when headers are given but the server's
   *   INFO says it does not support them, or when headers plus payload
   *   exceed the server's `max_payload`
   */
  publish(
    subject: string,
    payload: Payload = Empty,
    options?: PublishOptions,
  ): void {
    let data: Uint8Array;
    if (payload instanceof Uint8Array) {
      data = payload;
    } else if (typeof payload === "string") {
      data = TE.encode(payload);
    } else {
      throw new TypeError(
        "payload types can be strings or Uint8Array",
      );
    }

    // read the options without writing defaults back into the caller's object
    const reply = options?.reply || "";
    const hdrs = options?.headers as MsgHdrsImpl | undefined;

    let headers = Empty;
    let hlen = 0;
    let len = data.length;
    if (hdrs) {
      if (this.info && !this.info.headers) {
        throw InvalidArgumentError.format(
          "headers",
          "are not available on this server",
        );
      }
      headers = hdrs.encode();
      hlen = headers.length;
      // the size reported to the server covers headers and payload
      len = data.length + hlen;
    }

    if (this.info && len > this.info.max_payload) {
      throw InvalidArgumentError.format("payload", "max_payload size exceeded");
    }
    this.outBytes += len;
    this.outMsgs++;

    if (hdrs) {
      const proto = reply
        ? `HPUB ${subject} ${reply} ${hlen} ${len}\r\n`
        : `HPUB ${subject} ${hlen} ${len}\r\n`;
      this.sendCommand(proto, headers, data, CRLF);
    } else {
      const proto = reply
        ? `PUB ${subject} ${reply} ${len}\r\n`
        : `PUB ${subject} ${len}\r\n`;
      this.sendCommand(proto, data, CRLF);
    }
  }
396,717✔
940

941
  /**
   * Registers a request with the mux subscriptions, first making sure the
   * shared mux subscription exists.
   *
   * @param r - the request to track
   * @returns the same request that was passed in
   */
  request(r: Request): Request {
    // the mux subscription must be in place before the request is tracked
    this.initMux();
    this.muxSubscriptions.add(r);
    return r;
  }
122,596✔
946

947
  /**
   * Adds the subscription to the local registry and sends its SUB command
   * (plus an UNSUB with a limit when the subscription has `max` set).
   *
   * @param s - the subscription to register
   * @returns the subscription that was passed in
   */
  subscribe(s: SubscriptionImpl): Subscription {
    // register locally first, then emit the protocol command(s)
    this.subscriptions.add(s);
    this._subunsub(s);
    return s;
  }
1,369✔
952

953
  /**
   * Sends the SUB command for a subscription, including the queue group
   * name when the subscription has one.
   *
   * @param s - the subscription to announce
   */
  _sub(s: SubscriptionImpl): void {
    const proto = s.queue
      ? `SUB ${s.subject} ${s.queue} ${s.sid}\r\n`
      : `SUB ${s.subject} ${s.sid}\r\n`;
    this.sendCommand(proto);
  }
1,383✔
960

961
  /**
   * Sends the SUB command for the subscription and, when it carries a
   * non-zero `max`, follows up with an unsubscribe using that limit.
   *
   * @param s - the subscription to announce
   * @returns the subscription that was passed in
   */
  _subunsub(s: SubscriptionImpl): SubscriptionImpl {
    this._sub(s);
    const limit = s.max;
    if (limit) {
      this.unsubscribe(s, limit);
    }
    return s;
  }
1,369✔
968

969
  /**
   * Sends an UNSUB for the subscription and cancels it locally when it has
   * no limit, or has already received at least `max` messages.
   *
   * @param s - the subscription to unsubscribe
   * @param max - optional message limit forwarded to `unsub`
   */
  unsubscribe(s: SubscriptionImpl, max?: number): void {
    this.unsub(s, max);
    // checked after unsub() because unsub() may have updated s.max
    const done = s.max === undefined || s.received >= s.max;
    if (done) {
      this.subscriptions.cancel(s);
    }
  }
594✔
975

976
  /**
   * Sends the UNSUB command for the subscription and records the limit on
   * it. Does nothing when no subscription is given or the protocol is closed.
   *
   * @param s - the subscription to unsubscribe
   * @param max - optional message limit; a falsy value sends a plain UNSUB
   */
  unsub(s: SubscriptionImpl, max?: number): void {
    if (!s || this.isClosed()) {
      return;
    }
    const proto = max ? `UNSUB ${s.sid} ${max}\r\n` : `UNSUB ${s.sid}\r\n`;
    this.sendCommand(proto);
    s.max = max;
  }
716✔
987

988
  /**
   * Moves an existing subscription to a new subject: unsubscribes it,
   * updates its subject, refreshes the registry entry, and subscribes again.
   * Does nothing when no subscription is given or the protocol is closed.
   *
   * @param s - the subscription to move
   * @param subject - the new subject
   */
  resub(s: SubscriptionImpl, subject: string): void {
    if (s && !this.isClosed()) {
      this.unsub(s);
      s.subject = subject;
      this.subscriptions.resub(s);
      // no auto-unsub is re-applied here because the number of messages
      // already "processed" isn't really known
      this._sub(s);
    }
  }
65✔
999

1000
  /**
   * Queues a PING, pushes pending outbound data to the transport, and
   * returns the deferred that was recorded in `pongs`.
   *
   * @param p - optional deferred to track; a new one is created when omitted
   * @returns the tracked deferred (presumably settled when the matching PONG
   * is processed — the resolving code is not in this section)
   */
  flush(p?: Deferred<void>): Promise<void> {
    const pending = p || deferred<void>();
    this.pongs.push(pending);
    this.outbound.fill(PING_CMD);
    this.flushPending();
    return pending;
  }
381✔
1009

1010
  /**
   * Builds the SUB command for every registered subscription and writes
   * them to the transport in a single send. Nothing is sent when there are
   * no subscriptions.
   */
  sendSubscriptions(): void {
    const protos: string[] = [];
    this.subscriptions.all().forEach((entry) => {
      const sub = entry as SubscriptionImpl;
      protos.push(
        sub.queue
          ? `SUB ${sub.subject} ${sub.queue} ${sub.sid}${CR_LF}`
          : `SUB ${sub.subject} ${sub.sid}${CR_LF}`,
      );
    });
    if (!protos.length) {
      return;
    }
    // written straight to the transport rather than through sendCommand
    this.transport.send(encode(protos.join("")));
  }
747✔
1024

1025
  /**
   * Shuts the protocol down. Cancels heartbeats, reports `err` to a pending
   * `connectError` callback, closes the mux and regular subscriptions, stops
   * every listener and waits for their iterators to close, marks the
   * protocol closed, closes the transport, cancels outstanding timers, and
   * finally resolves `closed` with `err`. Returns immediately when already
   * closed.
   *
   * @param err - optional error describing why the protocol is closing
   */
  async close(err?: Error): Promise<void> {
    if (this._closed) {
      return;
    }
    // keep a stack trace of the caller that triggered the close
    this.whyClosed = new Error("close trace").stack || "";
    this.heartbeats.cancel();
    if (this.connectError) {
      this.connectError(err);
      this.connectError = undefined;
    }
    this.muxSubscriptions.close();
    this.subscriptions.close();

    const closing = [];
    for (const listener of this.listeners) {
      if (listener) {
        listener.stop();
        closing.push(listener.iterClosed);
      }
    }
    if (closing.length) {
      await Promise.all(closing);
    }

    this._closed = true;
    await this.transport.close(err);
    this.raceTimer?.cancel();
    this.dialDelay?.cancel();
    this.closed.resolve(err);
  }
741✔
1054

1055
  /**
   * Reports whether the protocol has been closed.
   *
   * @returns the current value of the internal `_closed` flag
   */
  isClosed(): boolean {
    return this._closed;
  }
641,320✔
1058

1059
  /**
   * Drains every registered subscription, then marks the protocol as no
   * longer publishing, flushes, and closes.
   *
   * @returns a promise that settles once `close()` has completed
   */
  async drain(): Promise<void> {
    const draining: Promise<void>[] = [];
    this.subscriptions.all().forEach((sub: Subscription) => {
      draining.push(sub.drain());
    });
    try {
      // wait for every drain to settle, whether it succeeded or failed
      await Promise.allSettled(draining);
    } catch {
      // nothing we can do here
    } finally {
      this.noMorePublishing = true;
      await this.flush();
    }
    return this.close();
  }
46✔
1075

1076
  /**
   * Writes any buffered outbound data to the transport. Does nothing until
   * server info has been received and the connection is established, or
   * when the buffer is empty.
   */
  private flushPending() {
    const ready = this.infoReceived && this.connected;
    if (!ready) {
      return;
    }

    if (!this.outbound.size()) {
      return;
    }
    const chunk = this.outbound.drain();
    this.transport.send(chunk);
  }
30,131✔
1086

1087
  /**
   * Creates and registers the shared mux subscription if there isn't one
   * yet. The subscription listens on a wildcard under the inbox returned by
   * `muxSubscriptions.init` and dispatches through the mux dispatcher.
   */
  private initMux(): void {
    if (this.subscriptions.getMux()) {
      return;
    }
    const prefix = this.muxSubscriptions.init(
      this.options.inboxPrefix,
    );
    // dot is already part of mux
    const muxSub = new SubscriptionImpl(this, `${prefix}*`);
    muxSub.callback = this.muxSubscriptions.dispatcher();
    this.subscriptions.setMux(muxSub);
    this.subscribe(muxSub);
  }
122,596✔
1100

1101
  /**
   * Asks the server pool for the next server and, when one is available,
   * records it as the current server.
   *
   * @returns the selected server, or `undefined` when the pool returns none
   * (in which case the current server is left unchanged)
   */
  private selectServer(): ServerImpl | undefined {
    const candidate = this.servers.selectServer();
    if (candidate !== undefined) {
      // Place in client context.
      this.server = candidate;
      return this.server;
    }
    return undefined;
  }
1,014✔
1110

1111
  /**
   * Returns the server currently recorded on this protocol instance.
   *
   * @returns the value of `this.server`, which may be `undefined`
   */
  getServer(): ServerImpl | undefined {
    return this.server;
  }
×
1114
}
55✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc