• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.js / 14209026827

02 Apr 2025 12:47AM UTC coverage: 84.534% (-0.02%) from 84.558%
14209026827

push

github

web-flow
Fix conditional logic for retry mechanism in jsclient.ts (#254)

Updated the conditional statement to ensure proper grouping and evaluate the timeout and no-responders error check accurately. This fixes a potential issue where retries may not trigger as intended.

Signed-off-by: Alberto Ricart <alberto@synadia.com>

2356 of 3206 branches covered (73.49%)

Branch coverage included in aggregate %.

2 of 2 new or added lines in 1 file covered. (100.0%)

1592 existing lines in 26 files now uncovered.

10161 of 11601 relevant lines covered (87.59%)

767944.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.4
/core/src/queued_iterator.ts
1
/*
2
 * Copyright 2020-2022 The NATS Authors
3
 * Licensed under the Apache License, Version 2.0 (the "License");
4
 * you may not use this file except in compliance with the License.
5
 * You may obtain a copy of the License at
6
 *
7
 * http://www.apache.org/licenses/LICENSE-2.0
8
 *
9
 * Unless required by applicable law or agreed to in writing, software
10
 * distributed under the License is distributed on an "AS IS" BASIS,
11
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
 * See the License for the specific language governing permissions and
13
 * limitations under the License.
14
 */
15
import type { Deferred } from "./util.ts";
16
import { deferred } from "./util.ts";
55✔
17
import type { QueuedIterator } from "./core.ts";
18
import type { CallbackFn, Dispatcher } from "./core.ts";
19
import { InvalidOperationError } from "./errors.ts";
55✔
20

21
/**
 * A push-driven queue that can be consumed as an async iterator.
 *
 * Producers call {@link QueuedIteratorImpl.push} with either a value of type
 * `T` or a callback. Values are handed to the consumer in order by
 * {@link QueuedIteratorImpl.iterate}; callbacks are invoked in-line, at the
 * position in the stream where they were queued, and are never yielded.
 */
export class QueuedIteratorImpl<T> implements QueuedIterator<T>, Dispatcher<T> {
  /** Entries of the batch currently being drained that were not yet consumed. */
  inflight: number;
  /** Number of values yielded to the consumer so far. */
  processed: number;
  // this is updated by the protocol
  received: number;
  /** When true, `iterate()` refuses to run (a callback is registered instead). */
  noIterator: boolean;
  /** Resolved by `stop()` with the error (if any) the iterator was stopped with. */
  iterClosed: Deferred<void | Error>;
  /** Set once `stop()` ran; further pushes are dropped. */
  done: boolean;
  /** Resolved whenever there is something for the iterator to look at. */
  signal: Deferred<void>;
  /** Queue of values and callbacks not yet picked up by the iterator. */
  yields: (T | CallbackFn)[];
  /** Subtracted from `received` by `getReceived()`; not modified in this class. */
  filtered: number;
  /** Number of queued callbacks that have not been invoked yet. */
  pendingFiltered: number;
  ctx?: unknown;
  _data?: unknown; //data is for use by extenders in any way they like
  /** Error recorded by `stop(err)`; rethrown by the iterator. */
  err?: Error;
  /**
   * When `profile` is set, the milliseconds elapsed between yielding the last
   * value and the consumer asking for the next one; otherwise 0.
   */
  time: number;
  /** Enables the `time` measurement. */
  profile: boolean;
  /** Set once `iterate()` started; guards against a second concurrent iterator. */
  yielding: boolean;
  /** Set when the iterator body exits (break, return, throw, or completion). */
  didBreak: boolean;

  constructor() {
    this.inflight = 0;
    this.filtered = 0;
    this.pendingFiltered = 0;
    this.processed = 0;
    this.received = 0;
    this.noIterator = false;
    this.done = false;
    this.signal = deferred<void>();
    this.yields = [];
    this.iterClosed = deferred<void | Error>();
    this.time = 0;
    this.yielding = false;
    this.didBreak = false;
    this.profile = false;
  }

  /** Makes the instance usable in `for await`; delegates to `iterate()`. */
  [Symbol.asyncIterator](): AsyncIterator<T> {
    return this.iterate();
  }

  /**
   * Queues a value or a callback for the iterator.
   *
   * @param v a value to yield, or a function to invoke in stream order.
   *
   * Nothing is queued once the iterator is done. If the consumer already left
   * the iteration, values are dropped but callbacks are still invoked.
   */
  push(v: T | CallbackFn): void {
    if (this.done) {
      return;
    }
    // If the consumer did `break` out of a `for await`, nobody drains the
    // queue anymore. Callbacks carry signaling (e.g. resolving "closed"
    // promises downstream), so run them right away instead of losing them.
    if (this.didBreak) {
      if (typeof v === "function") {
        const cb = v as CallbackFn;
        try {
          cb();
        } catch (_) {
          // best effort - there is no consumer left to report the failure to
        }
      }
      return;
    }
    if (typeof v === "function") {
      this.pendingFiltered++;
    }
    this.yields.push(v);
    this.signal.resolve();
  }

  /**
   * Drains the queue, yielding values and invoking callbacks in order.
   *
   * @returns an async generator over the pushed values.
   * @throws InvalidOperationError if `noIterator` is set or if an iteration
   * was already started on this instance.
   * @throws whatever error was passed to `stop(err)`, or whatever a queued
   * callback threw.
   */
  async *iterate(): AsyncIterableIterator<T> {
    if (this.noIterator) {
      throw new InvalidOperationError(
        "iterator cannot be used when a callback is registered",
      );
    }
    if (this.yielding) {
      throw new InvalidOperationError("iterator is already yielding");
    }
    this.yielding = true;
    try {
      while (true) {
        if (this.yields.length === 0) {
          await this.signal;
        }
        if (this.err) {
          throw this.err;
        }
        // take ownership of the current batch; pushes that happen while we
        // are suspended on a `yield` land in the fresh array
        const yields = this.yields;
        this.inflight = yields.length;
        this.yields = [];
        for (let i = 0; i < yields.length; i++) {
          if (typeof yields[i] === "function") {
            // The callback was counted both in `inflight` (batch size) and in
            // `pendingFiltered`; release both so getPending() stays accurate.
            this.pendingFiltered--;
            this.inflight--;
            const fn = yields[i] as CallbackFn;
            // a throwing callback fails the iterator so they know to fix it
            fn();
            // fn could have also set an error
            if (this.err) {
              throw this.err;
            }
            continue;
          }

          this.processed++;
          this.inflight--;
          const start = this.profile ? Date.now() : 0;
          yield yields[i] as T;
          this.time = this.profile ? Date.now() - start : 0;
        }
        // yielding could have paused and microtask
        // could have added messages. Prevent allocations
        // if possible
        if (this.done) {
          break;
        } else if (this.yields.length === 0) {
          // nothing arrived meanwhile: recycle the drained array and re-arm
          // the signal so the next loop turn waits for a push
          yields.length = 0;
          this.yields = yields;
          this.signal = deferred();
        }
      }
    } finally {
      // the iterator used break/return
      this.didBreak = true;
      this.stop();
    }
  }

  /**
   * Marks the iterator as done and wakes it up so it can finish.
   *
   * @param err optional error; the iterator rethrows it and `iterClosed`
   * resolves with it. Calls after the first one are ignored.
   */
  stop(err?: Error): void {
    if (this.done) {
      return;
    }
    this.err = err;
    this.done = true;
    this.signal.resolve();
    this.iterClosed.resolve(err);
  }

  /** @returns `received` when `noIterator` is set, otherwise `processed`. */
  getProcessed(): number {
    return this.noIterator ? this.received : this.processed;
  }

  /** @returns the number of queued values (callbacks excluded) not yet yielded. */
  getPending(): number {
    return this.yields.length + this.inflight - this.pendingFiltered;
  }

  /** @returns `received` minus `filtered`. */
  getReceived(): number {
    return this.received - this.filtered;
  }
}
55✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc