• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.js / 13091454488

01 Feb 2025 06:48PM UTC coverage: 82.681% (-0.05%) from 82.727%
13091454488

push

github

web-flow
fix(jetstream): direct batch get missing `next_by_subj` filter (#201)

Signed-off-by: Alberto Ricart <alberto@synadia.com>

2272 of 3100 branches covered (73.29%)

Branch coverage included in aggregate %.

9625 of 11289 relevant lines covered (85.26%)

787241.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.4
/core/src/queued_iterator.ts
1
/*
2
 * Copyright 2020-2022 The NATS Authors
3
 * Licensed under the Apache License, Version 2.0 (the "License");
4
 * you may not use this file except in compliance with the License.
5
 * You may obtain a copy of the License at
6
 *
7
 * http://www.apache.org/licenses/LICENSE-2.0
8
 *
9
 * Unless required by applicable law or agreed to in writing, software
10
 * distributed under the License is distributed on an "AS IS" BASIS,
11
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
 * See the License for the specific language governing permissions and
13
 * limitations under the License.
14
 */
15
import type { Deferred } from "./util.ts";
16
import { deferred } from "./util.ts";
55✔
17
import type { QueuedIterator } from "./core.ts";
18
import type { CallbackFn, Dispatcher } from "./core.ts";
19
import { InvalidOperationError } from "./errors.ts";
55✔
20

21
export class QueuedIteratorImpl<T> implements QueuedIterator<T>, Dispatcher<T> {
55✔
22
  inflight: number;
54✔
23
  processed: number;
2,264✔
24
  // this is updated by the protocol
25
  received: number;
2,264✔
26
  noIterator: boolean;
2,264✔
27
  iterClosed: Deferred<void | Error>;
2,264✔
28
  done: boolean;
2,264✔
29
  signal: Deferred<void>;
2,264✔
30
  yields: (T | CallbackFn)[];
2,264✔
31
  filtered: number;
2,264✔
32
  pendingFiltered: number;
2,264✔
33
  ctx?: unknown;
2,264✔
34
  _data?: unknown; //data is for use by extenders in any way they like
2,264✔
35
  err?: Error;
2,264✔
36
  time: number;
2,264✔
37
  profile: boolean;
2,264✔
38
  yielding: boolean;
2,264✔
39
  didBreak: boolean;
54✔
40

41
  constructor() {
54✔
42
    this.inflight = 0;
2,264✔
43
    this.filtered = 0;
2,264✔
44
    this.pendingFiltered = 0;
2,264✔
45
    this.processed = 0;
2,264✔
46
    this.received = 0;
2,264✔
47
    this.noIterator = false;
2,264✔
48
    this.done = false;
2,264✔
49
    this.signal = deferred<void>();
2,264✔
50
    this.yields = [];
2,264✔
51
    this.iterClosed = deferred<void | Error>();
2,264✔
52
    this.time = 0;
2,264✔
53
    this.yielding = false;
2,264✔
54
    this.didBreak = false;
2,264✔
55
    this.profile = false;
2,264✔
56
  }
2,264✔
57

58
  [Symbol.asyncIterator](): AsyncIterator<T> {
109✔
59
    return this.iterate();
882✔
60
  }
882✔
61

62
  push(v: T | CallbackFn): void {
54✔
63
    if (this.done) {
264,436✔
64
      return;
264,736✔
65
    }
264,736✔
66
    // if they `break` from a `for await`, any signaling that is pushed via
67
    // a function is not handled this can prevent closed promises from
68
    // resolving downstream.
69
    if (this.didBreak) {
7,071✔
70
      if (typeof v === "function") {
7,246✔
71
        const cb = v as CallbackFn;
7,246✔
72
        try {
7,246✔
73
          cb();
7,246✔
74
        } catch (_) {
×
75
          // ignored
76
        }
×
77
      }
7,246✔
78
      return;
7,246✔
79
    }
7,246✔
80
    if (typeof v === "function") {
264,436✔
81
      this.pendingFiltered++;
266,311✔
82
    }
266,311✔
83
    this.yields.push(v);
528,343✔
84
    this.signal.resolve();
528,343✔
85
  }
264,436✔
86

87
  async *iterate(): AsyncIterableIterator<T> {
54✔
88
    if (this.noIterator) {
690✔
89
      throw new InvalidOperationError(
693✔
90
        "iterator cannot be used when a callback is registered",
693✔
91
      );
92
    }
693✔
93
    if (this.yielding) {
169!
94
      throw new InvalidOperationError("iterator is already yielding");
170✔
95
    }
170✔
96
    this.yielding = true;
1,517✔
97
    try {
1,517✔
98
      while (true) {
882✔
99
        if (this.yields.length === 0) {
3,354✔
100
          await this.signal;
5,484✔
101
        }
6,828✔
102
        if (this.err) {
900!
103
          throw this.err;
907✔
104
        }
907✔
105
        const yields = this.yields;
5,527✔
106
        this.inflight = yields.length;
5,527✔
107
        this.yields = [];
5,527✔
108
        for (let i = 0; i < yields.length; i++) {
3,354✔
109
          if (typeof yields[i] === "function") {
266,412✔
110
            this.pendingFiltered--;
267,653✔
111
            const fn = yields[i] as CallbackFn;
267,653✔
112
            try {
267,653✔
113
              fn();
267,653✔
114
            } catch (err) {
266,523!
115
              // failed on the invocation - fail the iterator
116
              // so they know to fix the callback
117
              throw err;
266,529✔
118
            }
266,529✔
119
            // fn could have also set an error
120
            if (this.err) {
266,758!
121
              throw this.err;
266,783✔
122
            }
266,783✔
123
            continue;
268,825✔
124
          }
268,825✔
125

126
          this.processed++;
784,971✔
127
          this.inflight--;
784,971✔
128
          const start = this.profile ? Date.now() : 0;
182!
129
          yield yields[i] as T;
266,412✔
130
          this.time = this.profile ? Date.now() - start : 0;
182!
131
        }
266,412✔
132
        // yielding could have paused and microtask
133
        // could have added messages. Prevent allocations
134
        // if possible
135
        if (this.done) {
4,863✔
136
          break;
7,459✔
137
        } else if (this.yields.length === 0) {
6,885✔
138
          yields.length = 0;
10,059✔
139
          this.yields = yields;
10,059✔
140
          this.signal = deferred();
10,059✔
141
        }
10,059✔
142
      }
3,354✔
143
    } finally {
882✔
144
      // the iterator used break/return
145
      this.didBreak = true;
1,679✔
146
      this.stop();
1,679✔
147
    }
1,679✔
148
  }
882✔
149

150
  stop(err?: Error): void {
54✔
151
    if (this.done) {
2,324✔
152
      return;
3,242✔
153
    }
3,242✔
154
    this.err = err;
4,346✔
155
    this.done = true;
4,346✔
156
    this.signal.resolve();
4,346✔
157
    this.iterClosed.resolve(err);
4,346✔
158
  }
2,324✔
159

160
  getProcessed(): number {
52✔
161
    return this.noIterator ? this.received : this.processed;
111✔
162
  }
118✔
163

164
  getPending(): number {
51✔
165
    return this.yields.length + this.inflight - this.pendingFiltered;
206✔
166
  }
206✔
167

168
  getReceived(): number {
52✔
169
    return this.received - this.filtered;
203✔
170
  }
203✔
171
}
55✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc