• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.js / 17831411252

18 Sep 2025 02:09PM UTC coverage: 83.819% (-1.0%) from 84.865%
17831411252

push

github

web-flow
feat(jsapi): add Raft group and traffic account metadata to cluster types (#319)

- Added additional fields to the ClusterInfo type.

Signed-off-by: Alberto Ricart <alberto@synadia.com>

2407 of 3260 branches covered (73.83%)

Branch coverage included in aggregate %.

10258 of 11850 relevant lines covered (86.57%)

754254.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.02
/core/src/queued_iterator.ts
1
/*
2
 * Copyright 2020-2022 The NATS Authors
3
 * Licensed under the Apache License, Version 2.0 (the "License");
4
 * you may not use this file except in compliance with the License.
5
 * You may obtain a copy of the License at
6
 *
7
 * http://www.apache.org/licenses/LICENSE-2.0
8
 *
9
 * Unless required by applicable law or agreed to in writing, software
10
 * distributed under the License is distributed on an "AS IS" BASIS,
11
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
 * See the License for the specific language governing permissions and
13
 * limitations under the License.
14
 */
15
import type { Deferred } from "./util.ts";
16
import { deferred } from "./util.ts";
55✔
17
import type { QueuedIterator } from "./core.ts";
18
import type { CallbackFn, Dispatcher } from "./core.ts";
19
import { InvalidOperationError } from "./errors.ts";
55✔
20

21
/**
 * Buffers entries pushed by a producer and hands them to a single consumer
 * through an async iterator.
 *
 * An entry is either a value of type `T`, which is yielded to the consumer,
 * or a callback, which is invoked in queue order and never yielded. Callback
 * entries are excluded from the pending count.
 */
export class QueuedIteratorImpl<T> implements QueuedIterator<T>, Dispatcher<T> {
  /** Entries of the batch currently being drained that were not yet consumed. */
  inflight: number;
  /** Number of values yielded to the consumer so far. */
  processed: number;
  // this is updated by the protocol
  received: number;
  /** When true, `iterate()` refuses to run and `getProcessed()` reports `received`. */
  noIterator: boolean;
  /** Resolved by `stop()` with the error (if any) that closed the iterator. */
  iterClosed: Deferred<void | Error>;
  /** Set once `stop()` has run; later pushes are ignored. */
  done: boolean;
  /** Resolved whenever there is something for the draining loop to look at. */
  signal: Deferred<void>;
  /** Entries queued by `push()` that the draining loop has not picked up yet. */
  yields: (T | CallbackFn)[];
  /** Subtracted from `received` by `getReceived()`. */
  filtered: number;
  /** Callback entries that are queued or in the current batch but not yet invoked. */
  pendingFiltered: number;
  ctx?: unknown;
  _data?: unknown; //data is for use by extenders in any way they like
  /** Error handed to `stop()`; rethrown by the draining loop. */
  err?: Error;
  /**
   * When `profile` is on, milliseconds between handing the last value to the
   * consumer and the generator being resumed; otherwise 0.
   */
  time: number;
  /** Enables the `time` measurement. */
  profile: boolean;
  /** Set when `iterate()` starts draining; a second `iterate()` is rejected. */
  yielding: boolean;
  /** Set when the draining loop exits for any reason. */
  didBreak: boolean;

  constructor() {
    this.inflight = 0;
    this.filtered = 0;
    this.pendingFiltered = 0;
    this.processed = 0;
    this.received = 0;
    this.noIterator = false;
    this.done = false;
    this.signal = deferred<void>();
    this.yields = [];
    this.iterClosed = deferred<void | Error>();
    this.time = 0;
    this.yielding = false;
    this.didBreak = false;
    this.profile = false;
  }

  [Symbol.asyncIterator](): AsyncIterator<T> {
    return this.iterate();
  }

  /**
   * Queues a value or a callback for the consumer.
   *
   * - After `stop()` the entry is dropped.
   * - After the draining loop has exited, values are dropped and callbacks
   *   are invoked immediately; errors thrown by them are ignored.
   * - Otherwise the entry is appended and the draining loop is signalled.
   *
   * @param v value to yield, or callback to invoke in queue order
   */
  push(v: T | CallbackFn): void {
    if (this.done) {
      return;
    }
    // If the consumer used `break` in a `for await`, nothing drains the queue
    // anymore. Callbacks carry signaling, so run them right away; otherwise
    // promises waiting on them downstream would never resolve.
    if (this.didBreak) {
      if (typeof v === "function") {
        const cb = v as CallbackFn;
        try {
          cb();
        } catch (_) {
          // ignored
        }
      }
      return;
    }
    if (typeof v === "function") {
      this.pendingFiltered++;
    }
    this.yields.push(v);
    this.signal.resolve();
  }

  /**
   * Drains the queue: yields values and invokes callbacks in push order until
   * `stop()` is called or the consumer leaves the loop.
   *
   * @throws InvalidOperationError if `noIterator` is set or a previous call
   *   already started draining
   * @throws the error passed to `stop()`, or whatever a queued callback throws
   */
  async *iterate(): AsyncIterableIterator<T> {
    if (this.noIterator) {
      throw new InvalidOperationError(
        "iterator cannot be used when a callback is registered",
      );
    }
    if (this.yielding) {
      throw new InvalidOperationError("iterator is already yielding");
    }
    this.yielding = true;
    try {
      while (true) {
        if (this.yields.length === 0) {
          await this.signal;
        }
        if (this.err) {
          throw this.err;
        }
        // take ownership of the current batch; pushes made while we are
        // suspended in `yield` land in a fresh array
        const yields = this.yields;
        this.inflight = yields.length;
        this.yields = [];
        for (let i = 0; i < yields.length; i++) {
          const entry = yields[i];
          // Every entry leaves the batch here, callback or value, so that
          // getPending() stays accurate while the batch is being drained.
          this.inflight--;
          if (typeof entry === "function") {
            this.pendingFiltered--;
            // a throwing callback fails the iterator so the caller knows to
            // fix the callback
            (entry as CallbackFn)();
            // fn could have also set an error
            if (this.err) {
              throw this.err;
            }
            continue;
          }

          this.processed++;
          const start = this.profile ? Date.now() : 0;
          yield entry as T;
          this.time = this.profile ? Date.now() - start : 0;
        }
        // While suspended in `yield`, other tasks may have pushed more
        // entries. If none did, recycle the drained array to avoid an
        // allocation and arm a fresh signal.
        if (this.done) {
          break;
        } else if (this.yields.length === 0) {
          yields.length = 0;
          this.yields = yields;
          this.signal = deferred();
        }
      }
    } finally {
      // the iterator used break/return
      this.didBreak = true;
      this.stop();
    }
  }

  /**
   * Closes the iterator. Only the first call has any effect.
   *
   * @param err optional error; rethrown by the draining loop and used to
   *   resolve `iterClosed`
   */
  stop(err?: Error): void {
    if (this.done) {
      return;
    }
    this.err = err;
    this.done = true;
    this.signal.resolve();
    this.iterClosed.resolve(err);
  }

  /** @returns `received` when `noIterator` is set, otherwise `processed` */
  getProcessed(): number {
    return this.noIterator ? this.received : this.processed;
  }

  /** @returns number of queued or in-batch values, excluding callback entries */
  getPending(): number {
    return this.yields.length + this.inflight - this.pendingFiltered;
  }

  /** @returns `received` minus `filtered` */
  getReceived(): number {
    return this.received - this.filtered;
  }
}
55✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc