• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.js / 14248085491

03 Apr 2025 04:30PM UTC coverage: 84.536% (+0.002%) from 84.534%
14248085491

push

github

web-flow
Additional hardening of the object store client (#256)

* Additional hardening of the object store client

- made obj put publish messages sequentially - as current server can drop requests when the producer is too fast. This will degrade put performance.

- changed the internal push consumer used on get, to have an idle_heartbeat, if this fires, the get has stalled, and the ordered consumer should reset.

enabled flow control on the ordered consumer, this prevents slow consumers when the client is getting very large objects

Signed-off-by: Alberto Ricart <alberto@synadia.com>

* - hardening of the PullConsumer implementation, by default ordered consumers set flow control and idle_heartbeats. This enables the ordered consumer to self-recreate on `heartbeats_missed`, simplifying watcher/lister configurations.

- objectstore doesn't require listening to heartbeats_missed, since the pull consumer handles it automagically.

- objectstore watch added a check in watch for `heartbeat` notifications - if the watcher is only listing history this is a hint to stop as all records have been processed.

Signed-off-by: Alberto Ricart <alberto@synadia.com>

---------

Signed-off-by: Alberto Ricart <alberto@synadia.com>

2370 of 3217 branches covered (73.67%)

Branch coverage included in aggregate %.

10 of 23 new or added lines in 3 files covered. (43.48%)

4552 existing lines in 36 files now uncovered.

10170 of 11617 relevant lines covered (87.54%)

768618.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.34
/core/src/protocol.ts
1
/*
2
 * Copyright 2018-2024 The NATS Authors
3
 * Licensed under the Apache License, Version 2.0 (the "License");
4
 * you may not use this file except in compliance with the License.
5
 * You may obtain a copy of the License at
6
 *
7
 * http://www.apache.org/licenses/LICENSE-2.0
8
 *
9
 * Unless required by applicable law or agreed to in writing, software
10
 * distributed under the License is distributed on an "AS IS" BASIS,
11
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
 * See the License for the specific language governing permissions and
13
 * limitations under the License.
14
 */
15
import { decode, Empty, encode, TE } from "./encoders.ts";
55✔
16
import type { Transport } from "./transport.ts";
17
import { CR_LF, CRLF, getResolveFn, newTransport } from "./transport.ts";
55✔
18
import type { Deferred, Delay, Timeout } from "./util.ts";
19
import { deferred, delay, extend, timeout } from "./util.ts";
55✔
20
import { DataBuffer } from "./databuffer.ts";
55✔
21
import type { ServerImpl } from "./servers.ts";
22
import { Servers } from "./servers.ts";
55✔
23
import { QueuedIteratorImpl } from "./queued_iterator.ts";
55✔
24
import type { MsgHdrsImpl } from "./headers.ts";
25
import { MuxSubscription } from "./muxsubscription.ts";
55✔
26
import type { PH } from "./heartbeats.ts";
27
import { Heartbeat } from "./heartbeats.ts";
55✔
28
import type { MsgArg, ParserEvent } from "./parser.ts";
29
import { Kind, Parser } from "./parser.ts";
55✔
30
import { MsgImpl } from "./msg.ts";
55✔
31
import { Features, parseSemVer } from "./semver.ts";
55✔
32
import type {
33
  ConnectionOptions,
34
  Dispatcher,
35
  Msg,
36
  Payload,
37
  Publisher,
38
  PublishOptions,
39
  Request,
40
  Server,
41
  ServerInfo,
42
  Status,
43
  Subscription,
44
  SubscriptionOptions,
45
} from "./core.ts";
46

47
import {
55✔
48
  DEFAULT_MAX_PING_OUT,
55✔
49
  DEFAULT_PING_INTERVAL,
55✔
50
  DEFAULT_RECONNECT_TIME_WAIT,
55✔
51
} from "./options.ts";
55✔
52
import { errors, InvalidArgumentError } from "./errors.ts";
55✔
53

54
import type {
55
  AuthorizationError,
56
  PermissionViolationError,
57
  UserAuthenticationExpiredError,
58
} from "./errors.ts";
59

60
const FLUSH_THRESHOLD = 1024 * 32;
55✔
61

62
export const INFO = /^INFO\s+([^\r\n]+)\r\n/i;
55✔
63

64
const PONG_CMD = encode("PONG\r\n");
55✔
65
const PING_CMD = encode("PING\r\n");
55✔
66

67
export class Connect {
55✔
UNCOV
68
  echo?: boolean;
54✔
UNCOV
69
  no_responders?: boolean;
790✔
UNCOV
70
  protocol: number;
790✔
UNCOV
71
  verbose?: boolean;
790✔
UNCOV
72
  pedantic?: boolean;
790✔
UNCOV
73
  jwt?: string;
790✔
UNCOV
74
  nkey?: string;
790✔
UNCOV
75
  sig?: string;
790✔
UNCOV
76
  user?: string;
790✔
UNCOV
77
  pass?: string;
790✔
UNCOV
78
  auth_token?: string;
790✔
UNCOV
79
  tls_required?: boolean;
790✔
UNCOV
80
  name?: string;
790✔
UNCOV
81
  lang: string;
790✔
UNCOV
82
  version: string;
790✔
UNCOV
83
  headers?: boolean;
54✔
84

UNCOV
85
  constructor(
54✔
UNCOV
86
    transport: { version: string; lang: string },
54✔
UNCOV
87
    opts: ConnectionOptions,
54✔
UNCOV
88
    nonce?: string,
54✔
UNCOV
89
  ) {
54✔
UNCOV
90
    this.protocol = 1;
790✔
UNCOV
91
    this.version = transport.version;
790✔
UNCOV
92
    this.lang = transport.lang;
790✔
93
    this.echo = opts.noEcho ? false : undefined;
×
UNCOV
94
    this.verbose = opts.verbose;
790✔
UNCOV
95
    this.pedantic = opts.pedantic;
790✔
96
    this.tls_required = opts.tls ? true : undefined;
339✔
UNCOV
97
    this.name = opts.name;
790✔
98

99
    const creds =
339✔
100
      (opts && typeof opts.authenticator === "function"
339✔
101
        ? opts.authenticator(nonce)
339!
102
        : {}) || {};
339✔
UNCOV
103
    extend(this, creds);
790✔
UNCOV
104
  }
790✔
105
}
55✔
106

107
class SlowNotifier {
55✔
108
  slow: number;
33✔
109
  cb: (pending: number) => void;
34✔
110
  notified: boolean;
33✔
111

112
  constructor(slow: number, cb: (pending: number) => void) {
33✔
113
    this.slow = slow;
34✔
114
    this.cb = cb;
34✔
115
    this.notified = false;
34✔
116
  }
34✔
117

118
  maybeNotify(pending: number): void {
33✔
119
    // if we are below the threshold reset the ability to notify
120
    if (pending <= this.slow) {
50✔
121
      this.notified = false;
64✔
122
    } else {
50✔
123
      if (!this.notified) {
53✔
124
        // crossed the threshold, notify and silence.
125
        this.cb(pending);
55✔
126
        this.notified = true;
55✔
127
      }
55✔
128
    }
53✔
129
  }
50✔
130
}
55✔
131

132
export class SubscriptionImpl extends QueuedIteratorImpl<Msg>
55✔
133
  implements Subscription {
UNCOV
134
  sid!: number;
54✔
UNCOV
135
  queue?: string;
1,463✔
UNCOV
136
  draining: boolean;
1,463✔
UNCOV
137
  max?: number;
1,463✔
UNCOV
138
  subject: string;
1,463✔
UNCOV
139
  drained?: Promise<void>;
1,463✔
UNCOV
140
  protocol: ProtocolHandler;
1,463✔
UNCOV
141
  timer?: Timeout<void>;
1,463✔
UNCOV
142
  info?: unknown;
1,463✔
UNCOV
143
  cleanupFn?: (sub: Subscription, info?: unknown) => void;
1,463✔
UNCOV
144
  closed: Deferred<void | Error>;
1,463✔
UNCOV
145
  requestSubject?: string;
1,463✔
UNCOV
146
  slow?: SlowNotifier;
54✔
147

UNCOV
148
  constructor(
54✔
UNCOV
149
    protocol: ProtocolHandler,
54✔
UNCOV
150
    subject: string,
54✔
UNCOV
151
    opts: SubscriptionOptions = {},
54✔
UNCOV
152
  ) {
54✔
UNCOV
153
    super();
1,463✔
UNCOV
154
    extend(this, opts);
1,463✔
UNCOV
155
    this.protocol = protocol;
1,463✔
UNCOV
156
    this.subject = subject;
1,463✔
UNCOV
157
    this.draining = false;
1,463✔
UNCOV
158
    this.noIterator = typeof opts.callback === "function";
1,463✔
UNCOV
159
    this.closed = deferred<void | Error>();
1,463✔
160

UNCOV
161
    const asyncTraces = !(protocol.options?.noAsyncTraces || false);
1,463✔
162

UNCOV
163
    if (opts.timeout) {
947✔
UNCOV
164
      this.timer = timeout<void>(opts.timeout, asyncTraces);
1,008✔
165
      this.timer
×
166
        .then(() => {
×
167
          // timer was cancelled
168
          this.timer = undefined;
×
169
        })
×
170
        .catch((err) => {
×
171
          // timer fired
172
          this.stop(err);
311✔
173
          if (this.noIterator) {
311✔
174
            this.callback(err, {} as Msg);
314✔
175
          }
314✔
176
        });
307✔
UNCOV
177
    }
1,008✔
UNCOV
178
    if (!this.noIterator) {
1,463✔
179
      // cleanup - they used break or return from the iterator
180
      // make sure we clean up, if they didn't call unsub
UNCOV
181
      this.iterClosed.then((err: void | Error) => {
1,583✔
UNCOV
182
        this.closed.resolve(err);
1,644✔
UNCOV
183
        this.unsubscribe();
1,644✔
UNCOV
184
      });
1,583✔
UNCOV
185
    }
2,023✔
UNCOV
186
  }
1,463✔
187

188
  setSlowNotificationFn(slow: number, fn?: (pending: number) => void): void {
33✔
189
    this.slow = undefined;
34✔
190
    if (fn) {
34✔
191
      if (this.noIterator) {
×
192
        throw new Error("callbacks don't support slow notifications");
×
193
      }
×
194
      this.slow = new SlowNotifier(slow, fn);
34✔
195
    }
34✔
196
  }
34✔
197

UNCOV
198
  callback(err: Error | null, msg: Msg) {
52✔
UNCOV
199
    this.cancelTimeout();
256,322✔
200
    err ? this.stop(err) : this.push(msg);
256,296✔
201
    if (!err && this.slow) {
256,296!
202
      this.slow.maybeNotify(this.getPending());
256,313✔
203
    }
256,313✔
UNCOV
204
  }
256,322✔
205

UNCOV
206
  close(err?: Error): void {
54✔
UNCOV
207
    if (!this.isClosed()) {
1,592✔
UNCOV
208
      this.cancelTimeout();
2,993✔
UNCOV
209
      const fn = () => {
2,993✔
UNCOV
210
        this.stop();
3,887✔
211
        if (this.cleanupFn) {
×
212
          try {
×
213
            this.cleanupFn(this, this.info);
×
214
          } catch (_err) {
×
215
            // ignoring
216
          }
×
217
        }
×
UNCOV
218
        this.closed.resolve(err);
3,887✔
UNCOV
219
      };
2,993✔
220

UNCOV
221
      if (this.noIterator) {
2,993✔
UNCOV
222
        fn();
4,701✔
UNCOV
223
      } else {
3,855✔
UNCOV
224
        this.push(fn);
4,410✔
UNCOV
225
      }
4,410✔
UNCOV
226
    }
2,993✔
UNCOV
227
  }
1,592✔
228

UNCOV
229
  unsubscribe(max?: number): void {
53✔
UNCOV
230
    this.protocol.unsubscribe(this, max);
578✔
UNCOV
231
  }
578✔
232

UNCOV
233
  cancelTimeout(): void {
54✔
UNCOV
234
    if (this.timer) {
257,206✔
UNCOV
235
      this.timer.cancel();
257,263✔
UNCOV
236
      this.timer = undefined;
257,263✔
UNCOV
237
    }
257,263✔
UNCOV
238
  }
257,725✔
239

UNCOV
240
  drain(): Promise<void> {
53✔
241
    if (this.protocol.isClosed()) {
71✔
242
      return Promise.reject(new errors.ClosedConnectionError());
72✔
243
    }
72✔
UNCOV
244
    if (this.isClosed()) {
142✔
UNCOV
245
      return Promise.reject(
146✔
UNCOV
246
        new errors.InvalidOperationError("subscription is already closed"),
146✔
247
      );
UNCOV
248
    }
146✔
UNCOV
249
    if (!this.drained) {
234✔
UNCOV
250
      this.draining = true;
268✔
UNCOV
251
      this.protocol.unsub(this);
268✔
UNCOV
252
      this.drained = this.protocol.flush(deferred<void>())
268✔
UNCOV
253
        .then(() => {
268✔
UNCOV
254
          this.protocol.subscriptions.cancel(this);
370✔
255
        })
×
256
        .catch(() => {
×
257
          this.protocol.subscriptions.cancel(this);
×
258
        });
×
UNCOV
259
    }
268✔
UNCOV
260
    return this.drained;
269✔
UNCOV
261
  }
166✔
262

263
  isDraining(): boolean {
33✔
264
    return this.draining;
4,981✔
265
  }
4,981✔
266

UNCOV
267
  isClosed(): boolean {
54✔
UNCOV
268
    return this.done;
1,709✔
UNCOV
269
  }
1,709✔
270

UNCOV
271
  getSubject(): string {
51✔
UNCOV
272
    return this.subject;
68✔
UNCOV
273
  }
68✔
274

275
  getMax(): number | undefined {
×
276
    return this.max;
×
277
  }
×
278

279
  getID(): number {
33✔
280
    return this.sid;
37✔
281
  }
37✔
282
}
55✔
283

284
export class Subscriptions {
55✔
UNCOV
285
  mux: SubscriptionImpl | null;
54✔
UNCOV
286
  subs: Map<number, SubscriptionImpl>;
746✔
UNCOV
287
  sidCounter: number;
54✔
288

UNCOV
289
  constructor() {
54✔
UNCOV
290
    this.sidCounter = 0;
746✔
UNCOV
291
    this.mux = null;
746✔
UNCOV
292
    this.subs = new Map<number, SubscriptionImpl>();
746✔
UNCOV
293
  }
746✔
294

UNCOV
295
  size(): number {
52✔
UNCOV
296
    return this.subs.size;
96✔
UNCOV
297
  }
96✔
298

UNCOV
299
  add(s: SubscriptionImpl): SubscriptionImpl {
54✔
UNCOV
300
    this.sidCounter++;
1,463✔
UNCOV
301
    s.sid = this.sidCounter;
1,463✔
UNCOV
302
    this.subs.set(s.sid, s as SubscriptionImpl);
1,463✔
UNCOV
303
    return s;
1,463✔
UNCOV
304
  }
1,463✔
305

UNCOV
306
  setMux(s: SubscriptionImpl | null): SubscriptionImpl | null {
54✔
UNCOV
307
    this.mux = s;
510✔
UNCOV
308
    return s;
510✔
UNCOV
309
  }
510✔
310

UNCOV
311
  getMux(): SubscriptionImpl | null {
54✔
UNCOV
312
    return this.mux;
123,188✔
UNCOV
313
  }
123,188✔
314

UNCOV
315
  get(sid: number): SubscriptionImpl | undefined {
54✔
UNCOV
316
    return this.subs.get(sid);
396,428✔
UNCOV
317
  }
396,428✔
318

UNCOV
319
  resub(s: SubscriptionImpl): SubscriptionImpl {
51✔
UNCOV
320
    this.sidCounter++;
66✔
UNCOV
321
    this.subs.delete(s.sid);
66✔
UNCOV
322
    s.sid = this.sidCounter;
66✔
UNCOV
323
    this.subs.set(s.sid, s);
66✔
UNCOV
324
    return s;
66✔
UNCOV
325
  }
66✔
326

UNCOV
327
  all(): (SubscriptionImpl)[] {
54✔
UNCOV
328
    return Array.from(this.subs.values());
820✔
UNCOV
329
  }
820✔
330

UNCOV
331
  cancel(s: SubscriptionImpl): void {
54✔
UNCOV
332
    if (s) {
679✔
UNCOV
333
      s.close();
679✔
UNCOV
334
      this.subs.delete(s.sid);
679✔
UNCOV
335
    }
679✔
UNCOV
336
  }
679✔
337

UNCOV
338
  handleError(err: PermissionViolationError): boolean {
34✔
UNCOV
339
    const subs = this.all();
56✔
UNCOV
340
    let sub;
56✔
UNCOV
341
    if (err.operation === "subscription") {
56✔
UNCOV
342
      sub = subs.find((s) => {
85✔
UNCOV
343
        return s.subject === err.subject && s.queue === err.queue;
120✔
UNCOV
344
      });
85✔
345
    } else if (err.operation === "publish") {
70!
346
      // we have a no mux subscription
347
      sub = subs.find((s) => {
78✔
348
        return s.requestSubject === err.subject;
86✔
349
      });
78✔
350
    }
78✔
UNCOV
351
    if (sub) {
56✔
UNCOV
352
      sub.callback(err, {} as Msg);
72✔
UNCOV
353
      sub.close(err);
72✔
UNCOV
354
      this.subs.delete(sub.sid);
72✔
UNCOV
355
      return sub !== this.mux;
72✔
UNCOV
356
    }
72!
357

358
    return false;
57✔
359
  }
53✔
360

UNCOV
361
  close() {
54✔
UNCOV
362
    this.subs.forEach((sub) => {
734✔
UNCOV
363
      sub.close();
1,627✔
UNCOV
364
    });
734✔
UNCOV
365
  }
734✔
366
}
55✔
367

368
export class ProtocolHandler implements Dispatcher<ParserEvent> {
55✔
UNCOV
369
  connected: boolean;
54✔
UNCOV
370
  connectedOnce: boolean;
744✔
UNCOV
371
  infoReceived: boolean;
744✔
UNCOV
372
  info?: ServerInfo;
744✔
UNCOV
373
  muxSubscriptions: MuxSubscription;
744✔
UNCOV
374
  options: ConnectionOptions;
744✔
UNCOV
375
  outbound: DataBuffer;
744✔
UNCOV
376
  pongs: Array<Deferred<void>>;
744✔
UNCOV
377
  subscriptions: Subscriptions;
744✔
UNCOV
378
  transport!: Transport;
744✔
UNCOV
379
  noMorePublishing: boolean;
744✔
UNCOV
380
  connectError?: (err?: Error) => void;
744✔
UNCOV
381
  publisher: Publisher;
744✔
UNCOV
382
  _closed: boolean;
744✔
UNCOV
383
  closed: Deferred<Error | void>;
744✔
UNCOV
384
  listeners: QueuedIteratorImpl<Status>[];
744✔
UNCOV
385
  heartbeats: Heartbeat;
744✔
UNCOV
386
  parser: Parser;
744✔
UNCOV
387
  outMsgs: number;
744✔
UNCOV
388
  inMsgs: number;
744✔
UNCOV
389
  outBytes: number;
744✔
UNCOV
390
  inBytes: number;
744✔
UNCOV
391
  pendingLimit: number;
744✔
UNCOV
392
  lastError?: Error;
744✔
UNCOV
393
  abortReconnect: boolean;
744✔
UNCOV
394
  whyClosed: string;
744✔
395

UNCOV
396
  servers: Servers;
744✔
UNCOV
397
  server!: ServerImpl;
744✔
UNCOV
398
  features: Features;
744✔
UNCOV
399
  connectPromise: Promise<void> | null;
744✔
UNCOV
400
  dialDelay: Delay | null;
744✔
UNCOV
401
  raceTimer?: Timeout<void>;
54✔
402

UNCOV
403
  constructor(options: ConnectionOptions, publisher: Publisher) {
54✔
UNCOV
404
    this._closed = false;
744✔
UNCOV
405
    this.connected = false;
744✔
UNCOV
406
    this.connectedOnce = false;
744✔
UNCOV
407
    this.infoReceived = false;
744✔
UNCOV
408
    this.noMorePublishing = false;
744✔
UNCOV
409
    this.abortReconnect = false;
744✔
UNCOV
410
    this.listeners = [];
744✔
UNCOV
411
    this.pendingLimit = FLUSH_THRESHOLD;
744✔
UNCOV
412
    this.outMsgs = 0;
744✔
UNCOV
413
    this.inMsgs = 0;
744✔
UNCOV
414
    this.outBytes = 0;
744✔
UNCOV
415
    this.inBytes = 0;
744✔
UNCOV
416
    this.options = options;
744✔
UNCOV
417
    this.publisher = publisher;
744✔
UNCOV
418
    this.subscriptions = new Subscriptions();
744✔
UNCOV
419
    this.muxSubscriptions = new MuxSubscription();
744✔
UNCOV
420
    this.outbound = new DataBuffer();
744✔
UNCOV
421
    this.pongs = [];
744✔
UNCOV
422
    this.whyClosed = "";
744✔
423
    //@ts-ignore: options.pendingLimit is hidden
UNCOV
424
    this.pendingLimit = options.pendingLimit || this.pendingLimit;
744✔
UNCOV
425
    this.features = new Features({ major: 0, minor: 0, micro: 0 });
3,720✔
UNCOV
426
    this.connectPromise = null;
744✔
UNCOV
427
    this.dialDelay = null;
744✔
428

429
    const servers = typeof options.servers === "string"
×
430
      ? [options.servers]
×
431
      : options.servers;
×
432

UNCOV
433
    this.servers = new Servers(servers, {
744✔
UNCOV
434
      randomize: !options.noRandomize,
744✔
UNCOV
435
    });
744✔
UNCOV
436
    this.closed = deferred<Error | void>();
744✔
UNCOV
437
    this.parser = new Parser(this);
744✔
438

439
    this.heartbeats = new Heartbeat(
×
440
      this as PH,
×
441
      this.options.pingInterval || DEFAULT_PING_INTERVAL,
×
442
      this.options.maxPingOut || DEFAULT_MAX_PING_OUT,
×
443
    );
UNCOV
444
  }
744✔
445

UNCOV
446
  resetOutbound(): void {
54✔
UNCOV
447
    this.outbound.reset();
858✔
UNCOV
448
    const pongs = this.pongs;
858✔
UNCOV
449
    this.pongs = [];
858✔
450
    // reject the pongs - the disconnect from here shouldn't have a trace
451
    // because that confuses API consumers
UNCOV
452
    const err = new errors.RequestError("connection disconnected");
858✔
UNCOV
453
    err.stack = "";
858✔
UNCOV
454
    pongs.forEach((p) => {
717✔
UNCOV
455
      p.reject(err);
800✔
UNCOV
456
    });
717✔
UNCOV
457
    this.parser = new Parser(this);
858✔
UNCOV
458
    this.infoReceived = false;
858✔
UNCOV
459
  }
858✔
460

UNCOV
461
  dispatchStatus(status: Status): void {
54✔
UNCOV
462
    this.listeners.forEach((q) => {
1,641✔
UNCOV
463
      q.push(status);
1,901✔
UNCOV
464
    });
1,641✔
UNCOV
465
  }
1,779✔
466

UNCOV
467
  private prepare(): Deferred<void> {
54✔
UNCOV
468
    if (this.transport) {
789!
UNCOV
469
      this.transport.discard();
903✔
UNCOV
470
    }
903✔
UNCOV
471
    this.info = undefined;
858✔
UNCOV
472
    this.resetOutbound();
858✔
473

UNCOV
474
    const pong = deferred<void>();
858✔
UNCOV
475
    pong.catch(() => {
717✔
476
      // provide at least one catch - as pong rejection can happen before it is expected
UNCOV
477
    });
717✔
UNCOV
478
    this.pongs.unshift(pong);
858✔
479

480
    this.connectError = (err?: Error) => {
405✔
481
      pong.reject(err);
440✔
482
    };
405✔
483

UNCOV
484
    this.transport = newTransport();
858✔
UNCOV
485
    this.transport.closed()
789✔
UNCOV
486
      .then(async (_err?) => {
789✔
UNCOV
487
        this.connected = false;
848✔
UNCOV
488
        if (!this.isClosed()) {
848✔
489
          // if the transport gave an error use that, otherwise
490
          // we may have received a protocol error
UNCOV
491
          await this.disconnected(this.transport.closeError || this.lastError);
897✔
UNCOV
492
          return;
897✔
UNCOV
493
        }
897✔
UNCOV
494
      });
789✔
495

UNCOV
496
    return pong;
858✔
UNCOV
497
  }
858✔
498

499
  public disconnect(): void {
33✔
500
    this.dispatchStatus({ type: "staleConnection" });
129✔
501
    this.transport.disconnect();
43✔
502
  }
43✔
503

504
  public reconnect(): Promise<void> {
33✔
505
    if (this.connected) {
44✔
506
      this.dispatchStatus({
44✔
507
        type: "forceReconnect",
44✔
508
      });
44✔
509
      this.transport.disconnect();
44✔
510
    }
44✔
511
    return Promise.resolve();
44✔
512
  }
44✔
513

UNCOV
514
  async disconnected(err?: Error): Promise<void> {
52✔
UNCOV
515
    this.dispatchStatus(
104✔
UNCOV
516
      {
104✔
UNCOV
517
        type: "disconnect",
104✔
UNCOV
518
        server: this.servers.getCurrentServer().toString(),
104✔
UNCOV
519
      },
104✔
520
    );
UNCOV
521
    if (this.options.reconnect) {
104✔
UNCOV
522
      await this.dialLoop()
192✔
UNCOV
523
        .then(() => {
192✔
UNCOV
524
          this.dispatchStatus(
225✔
UNCOV
525
            {
225✔
UNCOV
526
              type: "reconnect",
225✔
UNCOV
527
              server: this.servers.getCurrentServer().toString(),
225✔
UNCOV
528
            },
225✔
529
          );
530
          // if we are here we reconnected, but we have an authentication
531
          // that expired, we need to clean it up, otherwise we'll queue up
532
          // two of these, and the default for the client will be to
533
          // close, rather than attempt again - possibly they have an
534
          // authenticator that dynamically updates
535
          if (this.lastError instanceof errors.UserAuthenticationExpiredError) {
200!
536
            this.lastError = undefined;
204✔
537
          }
204✔
538
        })
170✔
539
        .catch((err) => {
170✔
540
          this.close(err).catch();
180✔
541
        });
170✔
542
    } else {
130!
543
      await this.close(err).catch();
139✔
544
    }
139✔
UNCOV
545
  }
104✔
546

UNCOV
547
  async dial(srv: Server): Promise<void> {
54✔
UNCOV
548
    const pong = this.prepare();
858✔
UNCOV
549
    try {
858✔
UNCOV
550
      this.raceTimer = timeout(this.options.timeout || 20000);
858✔
UNCOV
551
      const cp = this.transport.connect(srv, this.options);
858✔
UNCOV
552
      await Promise.race([cp, this.raceTimer]);
3,432✔
UNCOV
553
      (async () => {
1,451✔
UNCOV
554
        try {
2,182✔
UNCOV
555
          for await (const b of this.transport) {
2,182✔
UNCOV
556
            this.parser.parse(b);
22,973✔
UNCOV
557
          }
22,973✔
558
        } catch (err) {
×
559
          console.log("reader closed", err);
×
560
        }
×
UNCOV
561
      })().then();
1,451✔
UNCOV
562
    } catch (err) {
717!
UNCOV
563
      pong.reject(err);
789✔
UNCOV
564
    }
789✔
565

UNCOV
566
    try {
1,229✔
UNCOV
567
      await Promise.race([this.raceTimer, pong]);
4,916✔
UNCOV
568
      this.raceTimer?.cancel();
858✔
UNCOV
569
      this.connected = true;
858✔
UNCOV
570
      this.connectError = undefined;
858✔
UNCOV
571
      this.sendSubscriptions();
858✔
UNCOV
572
      this.connectedOnce = true;
858✔
UNCOV
573
      this.server.didConnect = true;
858✔
UNCOV
574
      this.server.reconnects = 0;
858✔
UNCOV
575
      this.flushPending();
858✔
UNCOV
576
      this.heartbeats.start();
858✔
UNCOV
577
    } catch (err) {
717!
UNCOV
578
      this.raceTimer?.cancel();
815✔
UNCOV
579
      await this.transport.close(err as Error);
815✔
UNCOV
580
      throw err;
815✔
UNCOV
581
    }
815✔
UNCOV
582
  }
858✔
583

UNCOV
584
  async _doDial(srv: Server): Promise<void> {
54✔
UNCOV
585
    const { resolve } = this.options;
849✔
UNCOV
586
    const alts = await srv.resolve({
849✔
UNCOV
587
      fn: getResolveFn(),
849✔
UNCOV
588
      debug: this.options.debug,
849✔
UNCOV
589
      randomize: !this.options.noRandomize,
849✔
UNCOV
590
      resolve,
849✔
UNCOV
591
    });
849✔
592

UNCOV
593
    let lastErr: Error | null = null;
849✔
UNCOV
594
    for (const a of alts) {
849✔
UNCOV
595
      try {
1,221✔
UNCOV
596
        lastErr = null;
1,221✔
UNCOV
597
        this.dispatchStatus(
1,221✔
UNCOV
598
          { type: "reconnecting" },
3,663✔
599
        );
UNCOV
600
        await this.dial(a);
1,221✔
601
        // if here we connected
UNCOV
602
        return;
1,788✔
UNCOV
603
      } catch (err) {
1,080!
UNCOV
604
        lastErr = err as Error;
1,179✔
UNCOV
605
      }
1,179✔
UNCOV
606
    }
1,221!
607
    // if we are here, we failed, and we have no additional
608
    // alternatives for this server
UNCOV
609
    throw lastErr;
798✔
UNCOV
610
  }
708✔
611

UNCOV
612
  dialLoop(): Promise<void> {
54✔
UNCOV
613
    if (this.connectPromise === null) {
787✔
UNCOV
614
      this.connectPromise = this.dodialLoop();
1,084✔
615
      this.connectPromise
633✔
616
        .then(() => {})
633✔
617
        .catch(() => {})
633✔
618
        .finally(() => {
633✔
UNCOV
619
          this.connectPromise = null;
1,811✔
UNCOV
620
        });
1,084✔
UNCOV
621
    }
1,084✔
UNCOV
622
    return this.connectPromise;
787✔
UNCOV
623
  }
787✔
624

UNCOV
625
  async dodialLoop(): Promise<void> {
54✔
UNCOV
626
    let lastError: Error | undefined;
781✔
UNCOV
627
    while (true) {
781✔
628
      if (this._closed) {
861!
629
        // if we are disconnected, and close is called, the client
630
        // still tries to reconnect - to match the reconnect policy
631
        // in the case of close, want to stop.
632
        this.servers.clear();
865✔
633
      }
865✔
634
      const wait = this.options.reconnectDelayHandler
×
635
        ? this.options.reconnectDelayHandler()
×
636
        : DEFAULT_RECONNECT_TIME_WAIT;
×
UNCOV
637
      let maxWait = wait;
1,687✔
UNCOV
638
      const srv = this.selectServer();
1,687✔
639
      if (!srv || this.abortReconnect) {
861!
640
        if (lastError) {
883✔
641
          throw lastError;
903✔
642
        } else if (this.lastError) {
×
643
          throw this.lastError;
×
644
        } else {
×
645
          throw new errors.ConnectionError("connection refused");
885✔
646
        }
885✔
647
      }
883✔
UNCOV
648
      const now = Date.now();
2,196✔
UNCOV
649
      if (srv.lastConnect === 0 || srv.lastConnect + wait <= now) {
1,618!
UNCOV
650
        srv.lastConnect = Date.now();
2,415✔
UNCOV
651
        try {
2,415✔
UNCOV
652
          await this._doDial(srv);
2,415✔
UNCOV
653
          break;
2,982✔
UNCOV
654
        } catch (err) {
2,131!
UNCOV
655
          lastError = err as Error;
2,221✔
656
          if (!this.connectedOnce) {
1,312!
657
            if (this.options.waitOnFirstConnect) {
1,346✔
658
              continue;
1,367✔
659
            }
1,367✔
660
            this.servers.removeCurrentServer();
1,359✔
661
          }
1,359✔
UNCOV
662
          srv.reconnects++;
2,288✔
663
          const mra = this.options.maxReconnectAttempts || 0;
×
664
          if (mra !== -1 && srv.reconnects >= mra) {
1,312!
665
            this.servers.removeCurrentServer();
1,319✔
666
          }
1,319✔
UNCOV
667
        }
2,221✔
UNCOV
668
      } else {
1,618!
UNCOV
669
        maxWait = Math.min(maxWait, srv.lastConnect + wait - now);
1,774✔
UNCOV
670
        this.dialDelay = delay(maxWait);
1,774✔
UNCOV
671
        await this.dialDelay;
1,774✔
UNCOV
672
      }
1,774✔
UNCOV
673
    }
1,687✔
UNCOV
674
  }
781✔
675

UNCOV
676
  public static async connect(
54✔
UNCOV
677
    options: ConnectionOptions,
54✔
UNCOV
678
    publisher: Publisher,
54✔
UNCOV
679
  ): Promise<ProtocolHandler> {
54✔
UNCOV
680
    const h = new ProtocolHandler(options, publisher);
744✔
UNCOV
681
    await h.dialLoop();
744✔
UNCOV
682
    return h;
995✔
UNCOV
683
  }
744✔
684

UNCOV
685
  static toError(s: string): Error {
34✔
UNCOV
686
    let err: Error | null = errors.PermissionViolationError.parse(s);
90✔
UNCOV
687
    if (err) {
90✔
UNCOV
688
      return err;
110✔
UNCOV
689
    }
110!
690
    err = errors.UserAuthenticationExpiredError.parse(s);
170✔
691
    if (err) {
136✔
692
      return err;
141✔
693
    }
141✔
694
    err = errors.AuthorizationError.parse(s);
196✔
695
    if (err) {
167✔
696
      return err;
192✔
697
    }
192✔
698
    return new errors.ProtocolError(s);
140✔
699
  }
87✔
700

UNCOV
701
  processMsg(msg: MsgArg, data: Uint8Array) {
54✔
UNCOV
702
    this.inMsgs++;
396,421✔
UNCOV
703
    this.inBytes += data.length;
396,421✔
704
    if (!this.subscriptions.sidCounter) {
×
705
      return;
×
706
    }
×
707

UNCOV
708
    const sub = this.subscriptions.get(msg.sid) as SubscriptionImpl;
396,421✔
UNCOV
709
    if (!sub) {
383,191!
UNCOV
710
      return;
383,211✔
UNCOV
711
    }
383,211✔
UNCOV
712
    sub.received += 1;
779,540✔
713

UNCOV
714
    if (sub.callback) {
779,540✔
UNCOV
715
      sub.callback(null, new MsgImpl(msg, data, this));
779,540✔
UNCOV
716
    }
779,540✔
717

UNCOV
718
    if (sub.max !== undefined && sub.received >= sub.max) {
383,191!
UNCOV
719
      sub.unsubscribe();
383,236✔
UNCOV
720
    }
383,236✔
UNCOV
721
  }
396,421✔
722

UNCOV
723
  processError(m: Uint8Array) {
34✔
UNCOV
724
    let s = decode(m);
90✔
UNCOV
725
    if (s.startsWith("'") && s.endsWith("'")) {
90✔
UNCOV
726
      s = s.slice(1, s.length - 1);
90✔
UNCOV
727
    }
90✔
UNCOV
728
    const err = ProtocolHandler.toError(s);
90✔
729

UNCOV
730
    switch (err.constructor) {
90✔
UNCOV
731
      case errors.PermissionViolationError: {
200✔
UNCOV
732
        const pe = err as PermissionViolationError;
110✔
UNCOV
733
        const mux = this.subscriptions.getMux();
110✔
734
        const isMuxPermission = mux ? pe.subject === mux.subject : false;
107!
UNCOV
735
        this.subscriptions.handleError(pe);
110✔
UNCOV
736
        this.muxSubscriptions.handleError(isMuxPermission, pe);
110✔
737
        if (isMuxPermission) {
107!
738
          // remove the permission - enable it to be recreated
739
          this.subscriptions.setMux(null);
108✔
740
        }
108✔
UNCOV
741
      }
110✔
UNCOV
742
    }
90✔
743

UNCOV
744
    this.dispatchStatus({ type: "error", error: err });
360✔
UNCOV
745
    this.handleError(err);
90✔
UNCOV
746
  }
90✔
747

UNCOV
748
  handleError(err: Error) {
34✔
749
    if (
87✔
750
      err instanceof errors.UserAuthenticationExpiredError ||
87✔
751
      err instanceof errors.AuthorizationError
87✔
752
    ) {
87!
753
      this.handleAuthError(err);
117✔
754
    }
117✔
755

756
    if (!(err instanceof errors.PermissionViolationError)) {
87!
757
      this.lastError = err;
121✔
758
    }
121✔
UNCOV
759
  }
90✔
760

761
  handleAuthError(err: UserAuthenticationExpiredError | AuthorizationError) {
33✔
762
    if (
63✔
763
      (this.lastError instanceof errors.UserAuthenticationExpiredError ||
63✔
764
        this.lastError instanceof errors.AuthorizationError) &&
63✔
765
      this.options.ignoreAuthErrorAbort === false
63✔
766
    ) {
63✔
767
      this.abortReconnect = true;
67✔
768
    }
67✔
769
    if (this.connectError) {
63✔
770
      this.connectError(err);
87✔
771
    } else {
63✔
772
      this.disconnect();
69✔
773
    }
69✔
774
  }
63✔
775

UNCOV
776
  processPing() {
54✔
UNCOV
777
    this.transport.send(PONG_CMD);
97✔
UNCOV
778
  }
97✔
779

UNCOV
780
  processPong() {
54✔
UNCOV
781
    const cb = this.pongs.shift();
1,075✔
UNCOV
782
    if (cb) {
1,075✔
UNCOV
783
      cb.resolve();
1,075✔
UNCOV
784
    }
1,075✔
UNCOV
785
  }
1,075✔
786

UNCOV
787
  processInfo(m: Uint8Array) {
54✔
UNCOV
788
    const info = JSON.parse(decode(m));
803✔
UNCOV
789
    this.info = info;
803✔
790
    const updates = this.options && this.options.ignoreClusterUpdates
350!
791
      ? undefined
350✔
792
      : this.servers.update(info, this.transport.isEncrypted());
350✔
UNCOV
793
    if (!this.infoReceived) {
803✔
UNCOV
794
      this.features.update(parseSemVer(info.version));
1,396✔
UNCOV
795
      this.infoReceived = true;
1,396✔
UNCOV
796
      if (this.transport.isEncrypted()) {
1,255!
UNCOV
797
        this.servers.updateTLSName();
1,275✔
UNCOV
798
      }
1,275✔
799
      // send connect
UNCOV
800
      const { version, lang } = this.transport;
1,396✔
UNCOV
801
      try {
1,396✔
UNCOV
802
        const c = new Connect(
1,396✔
UNCOV
803
          { version, lang },
5,584✔
UNCOV
804
          this.options,
1,396✔
UNCOV
805
          info.nonce,
1,396✔
806
        );
807

UNCOV
808
        if (info.headers) {
1,396✔
UNCOV
809
          c.headers = true;
1,993✔
UNCOV
810
          c.no_responders = true;
1,993✔
UNCOV
811
        }
1,993✔
UNCOV
812
        const cs = JSON.stringify(c);
1,696✔
UNCOV
813
        this.transport.send(
1,696✔
UNCOV
814
          encode(`CONNECT ${cs}${CR_LF}`),
1,696✔
815
        );
UNCOV
816
        this.transport.send(PING_CMD);
1,696✔
817
      } catch (err) {
651!
818
        // if we are dying here, this is likely some an authenticator blowing up
819
        this.close(err as Error).catch();
652✔
820
      }
652✔
UNCOV
821
    }
1,396✔
UNCOV
822
    if (updates) {
803✔
UNCOV
823
      const { added, deleted } = updates;
1,118✔
824

UNCOV
825
      this.dispatchStatus({ type: "update", added, deleted });
5,590✔
UNCOV
826
    }
1,118✔
827
    const ldm = info.ldm !== undefined ? info.ldm : false;
350!
828
    if (ldm) {
350!
829
      this.dispatchStatus(
351✔
830
        {
351✔
831
          type: "ldm",
351✔
832
          server: this.servers.getCurrentServer().toString(),
351✔
833
        },
351✔
834
      );
835
    }
351✔
UNCOV
836
  }
803✔
837

UNCOV
838
  push(e: ParserEvent): void {
54✔
UNCOV
839
    switch (e.kind) {
398,290✔
UNCOV
840
      case Kind.MSG: {
1,192,947✔
UNCOV
841
        const { msg, data } = e;
794,657✔
UNCOV
842
        this.processMsg(msg!, data!);
794,657✔
UNCOV
843
        break;
794,657✔
UNCOV
844
      }
794,657✔
845
      case Kind.OK:
×
846
        break;
×
UNCOV
847
      case Kind.ERR:
268,689!
UNCOV
848
        this.processError(e.data!);
268,745✔
UNCOV
849
        break;
268,745✔
UNCOV
850
      case Kind.PING:
398,290✔
UNCOV
851
        this.processPing();
398,333✔
UNCOV
852
        break;
398,333✔
UNCOV
853
      case Kind.PONG:
398,290✔
UNCOV
854
        this.processPong();
399,311✔
UNCOV
855
        break;
399,311✔
UNCOV
856
      case Kind.INFO:
398,290✔
UNCOV
857
        this.processInfo(e.data!);
399,039✔
UNCOV
858
        break;
399,039✔
UNCOV
859
    }
398,290✔
UNCOV
860
  }
398,290✔
861

UNCOV
862
  sendCommand(cmd: string | Uint8Array, ...payloads: Uint8Array[]) {
54✔
UNCOV
863
    const len = this.outbound.length();
399,408✔
UNCOV
864
    let buf: Uint8Array;
399,408✔
UNCOV
865
    if (typeof cmd === "string") {
399,408✔
UNCOV
866
      buf = encode(cmd);
399,408✔
867
    } else {
×
868
      buf = cmd as Uint8Array;
×
869
    }
×
UNCOV
870
    this.outbound.fill(buf, ...payloads);
399,408✔
871

UNCOV
872
    if (len === 0) {
399,408✔
UNCOV
873
      queueMicrotask(() => {
698,669✔
UNCOV
874
        this.flushPending();
729,901✔
UNCOV
875
      });
698,669✔
UNCOV
876
    } else if (this.outbound.size() >= this.pendingLimit) {
653,009!
877
      // flush inline
UNCOV
878
      this.flushPending();
1,031,246✔
UNCOV
879
    }
1,031,246✔
UNCOV
880
  }
399,408✔
881

UNCOV
882
  publish(
54✔
UNCOV
883
    subject: string,
54✔
UNCOV
884
    payload: Payload = Empty,
54✔
UNCOV
885
    options?: PublishOptions,
54✔
UNCOV
886
  ): void {
54✔
UNCOV
887
    let data;
397,348✔
UNCOV
888
    if (payload instanceof Uint8Array) {
397,348✔
UNCOV
889
      data = payload;
792,509✔
UNCOV
890
    } else if (typeof payload === "string") {
397,348✔
UNCOV
891
      data = TE.encode(payload);
399,481✔
892
    } else {
×
893
      throw new TypeError(
×
894
        "payload types can be strings or Uint8Array",
×
895
      );
896
    }
×
897

UNCOV
898
    let len = data.length;
397,348✔
UNCOV
899
    options = options || {};
397,348✔
UNCOV
900
    options.reply = options.reply || "";
397,348✔
901

UNCOV
902
    let headers = Empty;
397,348✔
UNCOV
903
    let hlen = 0;
397,348✔
UNCOV
904
    if (options.headers) {
397,348✔
905
      if (this.info && !this.info.headers) {
×
906
        InvalidArgumentError.format(
×
907
          "headers",
×
908
          "are not available on this server",
×
909
        );
910
      }
×
UNCOV
911
      const hdrs = options.headers as MsgHdrsImpl;
517,737✔
UNCOV
912
      headers = hdrs.encode();
517,737✔
UNCOV
913
      hlen = headers.length;
517,737✔
UNCOV
914
      len = data.length + hlen;
517,737✔
UNCOV
915
    }
517,737✔
916

917
    if (this.info && len > this.info.max_payload) {
267,635!
918
      throw InvalidArgumentError.format("payload", "max_payload size exceeded");
267,638✔
919
    }
267,638✔
UNCOV
920
    this.outBytes += len;
664,947✔
UNCOV
921
    this.outMsgs++;
664,947✔
922

UNCOV
923
    let proto: string;
664,947✔
UNCOV
924
    if (options.headers) {
397,348✔
UNCOV
925
      if (options.reply) {
517,638!
UNCOV
926
        proto = `HPUB ${subject} ${options.reply} ${hlen} ${len}\r\n`;
517,641✔
927
      } else {
267,645!
UNCOV
928
        proto = `HPUB ${subject} ${hlen} ${len}\r\n`;
267,751✔
UNCOV
929
      }
267,751✔
UNCOV
930
      this.sendCommand(proto, headers, data, CRLF);
517,737✔
UNCOV
931
    } else {
397,348✔
UNCOV
932
      if (options.reply) {
674,250✔
UNCOV
933
        proto = `PUB ${subject} ${options.reply} ${len}\r\n`;
945,733✔
UNCOV
934
      } else {
941,881✔
UNCOV
935
        proto = `PUB ${subject} ${len}\r\n`;
1,214,839✔
UNCOV
936
      }
1,214,839✔
UNCOV
937
      this.sendCommand(proto, data, CRLF);
674,250✔
UNCOV
938
    }
674,250✔
UNCOV
939
  }
397,348✔
940

UNCOV
941
  request(r: Request): Request {
54✔
UNCOV
942
    this.initMux();
123,159✔
UNCOV
943
    this.muxSubscriptions.add(r);
123,159✔
UNCOV
944
    return r;
123,159✔
UNCOV
945
  }
123,159✔
946

UNCOV
947
  subscribe(s: SubscriptionImpl): Subscription {
54✔
UNCOV
948
    this.subscriptions.add(s);
1,461✔
UNCOV
949
    this._subunsub(s);
1,461✔
UNCOV
950
    return s;
1,461✔
UNCOV
951
  }
1,461✔
952

UNCOV
953
  _sub(s: SubscriptionImpl): void {
54✔
UNCOV
954
    if (s.queue) {
1,288!
UNCOV
955
      this.sendCommand(`SUB ${s.subject} ${s.queue} ${s.sid}\r\n`);
1,357✔
UNCOV
956
    } else {
1,288✔
UNCOV
957
      this.sendCommand(`SUB ${s.subject} ${s.sid}\r\n`);
2,643✔
UNCOV
958
    }
2,643✔
UNCOV
959
  }
1,476✔
960

UNCOV
961
  _subunsub(s: SubscriptionImpl): SubscriptionImpl {
54✔
UNCOV
962
    this._sub(s);
1,461✔
UNCOV
963
    if (s.max) {
1,074!
UNCOV
964
      this.unsubscribe(s, s.max);
1,135✔
UNCOV
965
    }
1,135✔
UNCOV
966
    return s;
1,461✔
UNCOV
967
  }
1,461✔
968

UNCOV
969
  unsubscribe(s: SubscriptionImpl, max?: number): void {
53✔
UNCOV
970
    this.unsub(s, max);
639✔
UNCOV
971
    if (s.max === undefined || s.received >= s.max) {
618!
UNCOV
972
      this.subscriptions.cancel(s);
1,140✔
UNCOV
973
    }
1,140✔
UNCOV
974
  }
639✔
975

UNCOV
976
  unsub(s: SubscriptionImpl, max?: number): void {
54✔
UNCOV
977
    if (!s || this.isClosed()) {
693✔
UNCOV
978
      return;
1,095✔
UNCOV
979
    }
1,095✔
UNCOV
980
    if (max) {
672!
UNCOV
981
      this.sendCommand(`UNSUB ${s.sid} ${max}\r\n`);
940✔
UNCOV
982
    } else {
876✔
UNCOV
983
      this.sendCommand(`UNSUB ${s.sid}\r\n`);
1,778✔
UNCOV
984
    }
1,778✔
UNCOV
985
    s.max = max;
1,327✔
UNCOV
986
  }
762✔
987

UNCOV
988
  resub(s: SubscriptionImpl, subject: string): void {
51✔
989
    if (!s || this.isClosed()) {
×
990
      return;
×
991
    }
×
UNCOV
992
    this.unsub(s);
66✔
UNCOV
993
    s.subject = subject;
66✔
UNCOV
994
    this.subscriptions.resub(s);
66✔
995
    // we don't auto-unsub here because we don't
996
    // really know "processed"
UNCOV
997
    this._sub(s);
66✔
UNCOV
998
  }
66✔
999

UNCOV
1000
  flush(p?: Deferred<void>): Promise<void> {
53✔
UNCOV
1001
    if (!p) {
313✔
UNCOV
1002
      p = deferred<void>();
529✔
UNCOV
1003
    }
529✔
UNCOV
1004
    this.pongs.push(p);
382✔
UNCOV
1005
    this.outbound.fill(PING_CMD);
382✔
UNCOV
1006
    this.flushPending();
382✔
UNCOV
1007
    return p;
382✔
UNCOV
1008
  }
382✔
1009

UNCOV
1010
  sendSubscriptions(): void {
54✔
UNCOV
1011
    const cmds: string[] = [];
759✔
UNCOV
1012
    this.subscriptions.all().forEach((s) => {
690✔
UNCOV
1013
      const sub = s as SubscriptionImpl;
702✔
1014
      if (sub.queue) {
×
1015
        cmds.push(`SUB ${sub.subject} ${sub.queue} ${sub.sid}${CR_LF}`);
×
1016
      } else {
×
UNCOV
1017
        cmds.push(`SUB ${sub.subject} ${sub.sid}${CR_LF}`);
702✔
UNCOV
1018
      }
702✔
UNCOV
1019
    });
690✔
UNCOV
1020
    if (cmds.length) {
690!
UNCOV
1021
      this.transport.send(encode(cmds.join("")));
695✔
UNCOV
1022
    }
695✔
UNCOV
1023
  }
759✔
1024

UNCOV
1025
  async close(err?: Error): Promise<void> {
54✔
UNCOV
1026
    if (this._closed) {
613!
UNCOV
1027
      return;
894✔
UNCOV
1028
    }
894✔
1029
    this.whyClosed = new Error("close trace").stack || "";
×
UNCOV
1030
    this.heartbeats.cancel();
753✔
1031
    if (this.connectError) {
301!
1032
      this.connectError(err);
312✔
1033
      this.connectError = undefined;
312✔
1034
    }
312✔
UNCOV
1035
    this.muxSubscriptions.close();
1,296✔
UNCOV
1036
    this.subscriptions.close();
1,296✔
UNCOV
1037
    const proms = [];
1,296✔
UNCOV
1038
    for (let i = 0; i < this.listeners.length; i++) {
723✔
UNCOV
1039
      const qi = this.listeners[i];
790✔
UNCOV
1040
      if (qi) {
790✔
UNCOV
1041
        qi.stop();
790✔
UNCOV
1042
        proms.push(qi.iterClosed);
790✔
UNCOV
1043
      }
790✔
UNCOV
1044
    }
790✔
UNCOV
1045
    if (proms.length) {
723✔
UNCOV
1046
      await Promise.all(proms);
788✔
UNCOV
1047
    }
788✔
UNCOV
1048
    this._closed = true;
1,296✔
UNCOV
1049
    await this.transport.close(err);
1,296✔
UNCOV
1050
    this.raceTimer?.cancel();
753✔
UNCOV
1051
    this.dialDelay?.cancel();
684!
UNCOV
1052
    this.closed.resolve(err);
753✔
UNCOV
1053
  }
753✔
1054

UNCOV
1055
  isClosed(): boolean {
54✔
UNCOV
1056
    return this._closed;
643,251✔
UNCOV
1057
  }
643,251✔
1058

UNCOV
1059
  async drain(): Promise<void> {
51✔
UNCOV
1060
    const subs = this.subscriptions.all();
66✔
UNCOV
1061
    const promises: Promise<void>[] = [];
66✔
UNCOV
1062
    subs.forEach((sub: Subscription) => {
66✔
UNCOV
1063
      promises.push(sub.drain());
90✔
UNCOV
1064
    });
66✔
UNCOV
1065
    try {
66✔
UNCOV
1066
      await Promise.allSettled(promises);
66✔
1067
    } catch {
×
1068
      // nothing we can do here
1069
    } finally {
×
UNCOV
1070
      this.noMorePublishing = true;
66✔
UNCOV
1071
      await this.flush();
66!
1072
    }
46✔
1073
    return this.close();
46✔
1074
  }
46✔
1075

UNCOV
1076
  private flushPending() {
54✔
UNCOV
1077
    if (!this.infoReceived || !this.connected) {
18,478!
UNCOV
1078
      return;
18,481✔
UNCOV
1079
    }
18,481✔
1080

UNCOV
1081
    if (this.outbound.size()) {
42,849✔
UNCOV
1082
      const d = this.outbound.drain();
74,154✔
UNCOV
1083
      this.transport.send(d);
74,154✔
UNCOV
1084
    }
74,154✔
UNCOV
1085
  }
42,849✔
1086

UNCOV
1087
  private initMux(): void {
54✔
UNCOV
1088
    const mux = this.subscriptions.getMux();
123,159✔
UNCOV
1089
    if (!mux) {
123,159✔
UNCOV
1090
      const inbox = this.muxSubscriptions.init(
123,614✔
UNCOV
1091
        this.options.inboxPrefix,
123,614✔
1092
      );
1093
      // dot is already part of mux
UNCOV
1094
      const sub = new SubscriptionImpl(this, `${inbox}*`);
123,614✔
UNCOV
1095
      sub.callback = this.muxSubscriptions.dispatcher();
123,614✔
UNCOV
1096
      this.subscriptions.setMux(sub);
123,614✔
UNCOV
1097
      this.subscribe(sub);
123,614✔
UNCOV
1098
    }
123,614✔
UNCOV
1099
  }
123,159✔
1100

UNCOV
1101
  private selectServer(): ServerImpl | undefined {
54✔
UNCOV
1102
    const server = this.servers.selectServer();
1,027✔
1103
    if (server === undefined) {
564!
1104
      return undefined;
583✔
1105
    }
583✔
1106
    // Place in client context.
UNCOV
1107
    this.server = server;
1,539✔
UNCOV
1108
    return this.server;
1,539✔
UNCOV
1109
  }
1,027✔
1110

1111
  getServer(): ServerImpl | undefined {
×
1112
    return this.server;
×
1113
  }
×
1114
}
55✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc