• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.js / 13845550996

13 Mar 2025 10:08PM UTC coverage: 82.565% (-0.03%) from 82.598%
13845550996

push

github

web-flow
fix(node): websocket under node doesn't emit close+error (only error) so redials didn't happen (#219)

Introduce a `cleanup` method for socket event handling, ensuring consistent resource cleanup across close/error scenarios. Additionally, implement a test case to verify websocket reconnect functionality after disconnection.

Signed-off-by: Alberto Ricart <alberto@synadia.com>

2276 of 3117 branches covered (73.02%)

Branch coverage included in aggregate %.

2 of 14 new or added lines in 3 files covered. (14.29%)

392 existing lines in 14 files now uncovered.

9838 of 11555 relevant lines covered (85.14%)

769151.57 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.4
/core/src/queued_iterator.ts
1
/*
2
 * Copyright 2020-2022 The NATS Authors
3
 * Licensed under the Apache License, Version 2.0 (the "License");
4
 * you may not use this file except in compliance with the License.
5
 * You may obtain a copy of the License at
6
 *
7
 * http://www.apache.org/licenses/LICENSE-2.0
8
 *
9
 * Unless required by applicable law or agreed to in writing, software
10
 * distributed under the License is distributed on an "AS IS" BASIS,
11
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
 * See the License for the specific language governing permissions and
13
 * limitations under the License.
14
 */
15
import type { Deferred } from "./util.ts";
16
import { deferred } from "./util.ts";
55✔
17
import type { QueuedIterator } from "./core.ts";
18
import type { CallbackFn, Dispatcher } from "./core.ts";
19
import { InvalidOperationError } from "./errors.ts";
55✔
20

21
export class QueuedIteratorImpl<T> implements QueuedIterator<T>, Dispatcher<T> {
55✔
22
  inflight: number;
54✔
23
  processed: number;
2,292✔
24
  // this is updated by the protocol
25
  received: number;
2,292✔
26
  noIterator: boolean;
2,292✔
27
  iterClosed: Deferred<void | Error>;
2,292✔
28
  done: boolean;
2,292✔
29
  signal: Deferred<void>;
2,292✔
30
  yields: (T | CallbackFn)[];
2,292✔
31
  filtered: number;
2,292✔
32
  pendingFiltered: number;
2,292✔
33
  ctx?: unknown;
2,292✔
34
  _data?: unknown; //data is for use by extenders in any way they like
2,292✔
35
  err?: Error;
2,292✔
36
  time: number;
2,292✔
37
  profile: boolean;
2,292✔
38
  yielding: boolean;
2,292✔
39
  didBreak: boolean;
54✔
40

41
  constructor() {
54✔
42
    this.inflight = 0;
2,292✔
43
    this.filtered = 0;
2,292✔
44
    this.pendingFiltered = 0;
2,292✔
45
    this.processed = 0;
2,292✔
46
    this.received = 0;
2,292✔
47
    this.noIterator = false;
2,292✔
48
    this.done = false;
2,292✔
49
    this.signal = deferred<void>();
2,292✔
50
    this.yields = [];
2,292✔
51
    this.iterClosed = deferred<void | Error>();
2,292✔
52
    this.time = 0;
2,292✔
53
    this.yielding = false;
2,292✔
54
    this.didBreak = false;
2,292✔
55
    this.profile = false;
2,292✔
56
  }
2,292✔
57

58
  [Symbol.asyncIterator](): AsyncIterator<T> {
109✔
59
    return this.iterate();
910✔
60
  }
910✔
61

62
  push(v: T | CallbackFn): void {
54✔
63
    if (this.done) {
264,440✔
64
      return;
264,740✔
65
    }
264,740✔
66
    // if they `break` from a `for await`, any signaling that is pushed via
67
    // a function is not handled this can prevent closed promises from
68
    // resolving downstream.
69
    if (this.didBreak) {
7,078✔
70
      if (typeof v === "function") {
7,253✔
71
        const cb = v as CallbackFn;
7,253✔
72
        try {
7,253✔
73
          cb();
7,253✔
74
        } catch (_) {
×
75
          // ignored
76
        }
×
77
      }
7,253✔
78
      return;
7,253✔
79
    }
7,253✔
80
    if (typeof v === "function") {
264,440✔
81
      this.pendingFiltered++;
266,315✔
82
    }
266,315✔
83
    this.yields.push(v);
528,351✔
84
    this.signal.resolve();
528,351✔
85
  }
264,440✔
86

87
  async *iterate(): AsyncIterableIterator<T> {
54✔
88
    if (this.noIterator) {
690✔
89
      throw new InvalidOperationError(
693✔
90
        "iterator cannot be used when a callback is registered",
693✔
91
      );
92
    }
693✔
93
    if (this.yielding) {
169!
94
      throw new InvalidOperationError("iterator is already yielding");
170✔
95
    }
170✔
96
    this.yielding = true;
1,545✔
97
    try {
1,545✔
98
      while (true) {
910✔
99
        if (this.yields.length === 0) {
3,401✔
100
          await this.signal;
5,559✔
101
        }
6,911✔
UNCOV
102
        if (this.err) {
883!
UNCOV
103
          throw this.err;
890✔
UNCOV
104
        }
890✔
105
        const yields = this.yields;
5,565✔
106
        this.inflight = yields.length;
5,565✔
107
        this.yields = [];
5,565✔
108
        for (let i = 0; i < yields.length; i++) {
3,401✔
109
          if (typeof yields[i] === "function") {
266,463✔
110
            this.pendingFiltered--;
267,704✔
111
            const fn = yields[i] as CallbackFn;
267,704✔
112
            try {
267,704✔
113
              fn();
267,704✔
114
            } catch (err) {
266,518!
115
              // failed on the invocation - fail the iterator
116
              // so they know to fix the callback
117
              throw err;
266,524✔
118
            }
266,524✔
119
            // fn could have also set an error
UNCOV
120
            if (this.err) {
266,753!
UNCOV
121
              throw this.err;
266,778✔
UNCOV
122
            }
266,778✔
123
            continue;
268,876✔
124
          }
268,876✔
125

126
          this.processed++;
785,023✔
127
          this.inflight--;
785,023✔
UNCOV
128
          const start = this.profile ? Date.now() : 0;
182!
129
          yield yields[i] as T;
266,463✔
UNCOV
130
          this.time = this.profile ? Date.now() - start : 0;
182!
131
        }
266,463✔
132
        // yielding could have paused and microtask
133
        // could have added messages. Prevent allocations
134
        // if possible
135
        if (this.done) {
4,918✔
136
          break;
7,533✔
137
        } else if (this.yields.length === 0) {
6,931✔
138
          yields.length = 0;
10,096✔
139
          this.yields = yields;
10,096✔
140
          this.signal = deferred();
10,096✔
141
        }
10,096✔
142
      }
3,401✔
143
    } finally {
910✔
144
      // the iterator used break/return
145
      this.didBreak = true;
1,735✔
146
      this.stop();
1,735✔
147
    }
1,735✔
148
  }
910✔
149

150
  stop(err?: Error): void {
54✔
151
    if (this.done) {
2,383✔
152
      return;
3,332✔
153
    }
3,332✔
154
    this.err = err;
4,433✔
155
    this.done = true;
4,433✔
156
    this.signal.resolve();
4,433✔
157
    this.iterClosed.resolve(err);
4,433✔
158
  }
2,383✔
159

160
  getProcessed(): number {
52✔
161
    return this.noIterator ? this.received : this.processed;
111✔
162
  }
118✔
163

164
  getPending(): number {
51✔
165
    return this.yields.length + this.inflight - this.pendingFiltered;
206✔
166
  }
206✔
167

168
  getReceived(): number {
52✔
169
    return this.received - this.filtered;
203✔
170
  }
203✔
171
}
55✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc