• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.js / 14248085491

03 Apr 2025 04:30PM UTC coverage: 84.536% (+0.002%) from 84.534%
14248085491

push

github

web-flow
Additional hardening of the object store client (#256)

* Additional hardening of the object store client

- Made object store `put` publish its messages sequentially, because the current server can drop requests when the producer is too fast. This will degrade put performance.

- Changed the internal push consumer used by `get` to have an idle_heartbeat; if this fires, the get has stalled and the ordered consumer should reset.

- Enabled flow control on the ordered consumer; this prevents slow consumers when the client is getting very large objects.

Signed-off-by: Alberto Ricart <alberto@synadia.com>

* - hardening of the PullConsumer implementation, by default ordered consumers set flow control and idle_heartbeats. This enables the ordered consumer to self-recreate on `heartbeats_missed`, simplifying watcher/lister configurations.

- objectstore doesn't require listening to heartbeats_missed, since the pull consumer handles it automagically.

- Object store `watch` now checks for `heartbeat` notifications: if the watcher is only listing history, a heartbeat is a hint to stop, as all records have been processed.

Signed-off-by: Alberto Ricart <alberto@synadia.com>

---------

Signed-off-by: Alberto Ricart <alberto@synadia.com>

2370 of 3217 branches covered (73.67%)

Branch coverage included in aggregate %.

10 of 23 new or added lines in 3 files covered. (43.48%)

4552 existing lines in 36 files now uncovered.

10170 of 11617 relevant lines covered (87.54%)

768618.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.35
/core/src/queued_iterator.ts
1
/*
2
 * Copyright 2020-2022 The NATS Authors
3
 * Licensed under the Apache License, Version 2.0 (the "License");
4
 * you may not use this file except in compliance with the License.
5
 * You may obtain a copy of the License at
6
 *
7
 * http://www.apache.org/licenses/LICENSE-2.0
8
 *
9
 * Unless required by applicable law or agreed to in writing, software
10
 * distributed under the License is distributed on an "AS IS" BASIS,
11
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
 * See the License for the specific language governing permissions and
13
 * limitations under the License.
14
 */
15
import type { Deferred } from "./util.ts";
16
import { deferred } from "./util.ts";
55✔
17
import type { QueuedIterator } from "./core.ts";
18
import type { CallbackFn, Dispatcher } from "./core.ts";
19
import { InvalidOperationError } from "./errors.ts";
55✔
20

21
/**
 * Queue-backed async iterator.
 *
 * Producers call {@link QueuedIteratorImpl.push} with either a value of type
 * `T` or a callback. A single consumer drains the queue through
 * {@link QueuedIteratorImpl.iterate} (or `for await`): values are yielded in
 * insertion order, and callbacks are invoked in-line at their queue position
 * and never yielded.
 */
export class QueuedIteratorImpl<T> implements QueuedIterator<T>, Dispatcher<T> {
  /**
   * Entries of the batch currently being drained by `iterate()` that have
   * not been handled yet (values not yet yielded, callbacks not yet invoked).
   */
  inflight: number;
  /** Number of values handed to the consumer by `iterate()`. */
  processed: number;
  // this is updated by the protocol
  received: number;
  /** When true, `iterate()` refuses to run (a callback is registered instead). */
  noIterator: boolean;
  /** Resolved by `stop()` with the error (if any) passed to it. */
  iterClosed: Deferred<void | Error>;
  /** Set by `stop()`; once true, `push()` ignores everything. */
  done: boolean;
  /** Resolved whenever there is something for `iterate()` to look at. */
  signal: Deferred<void>;
  /** Queued entries (values and callbacks) not yet picked up by `iterate()`. */
  yields: (T | CallbackFn)[];
  // not written by this class; subtracted from `received` in getReceived()
  filtered: number;
  /** Callbacks that were queued via `push()` and have not been invoked yet. */
  pendingFiltered: number;
  ctx?: unknown;
  _data?: unknown; //data is for use by extenders in any way they like
  /** Error supplied to `stop()`; rethrown by `iterate()`. */
  err?: Error;
  /**
   * When `profile` is set: milliseconds between yielding the most recent
   * value and the consumer asking for the next one. Otherwise 0.
   */
  time: number;
  profile: boolean;
  /** True once `iterate()` has started; guards against a second iteration. */
  yielding: boolean;
  /** True once `iterate()` has exited for any reason. */
  didBreak: boolean;

  constructor() {
    this.inflight = 0;
    this.filtered = 0;
    this.pendingFiltered = 0;
    this.processed = 0;
    this.received = 0;
    this.noIterator = false;
    this.done = false;
    this.signal = deferred<void>();
    this.yields = [];
    this.iterClosed = deferred<void | Error>();
    this.time = 0;
    this.yielding = false;
    this.didBreak = false;
    this.profile = false;
  }

  [Symbol.asyncIterator](): AsyncIterator<T> {
    return this.iterate();
  }

  /**
   * Queues a value or a callback for the consumer.
   *
   * - After `stop()` the entry is dropped.
   * - After the consumer has left the iteration, values are dropped but
   *   callbacks are still invoked immediately (errors ignored).
   *
   * @param v value to yield, or a callback to run at this queue position
   */
  push(v: T | CallbackFn): void {
    if (this.done) {
      return;
    }
    // if they `break` from a `for await`, any signaling that is pushed via
    // a function is not handled this can prevent closed promises from
    // resolving downstream.
    if (this.didBreak) {
      if (typeof v === "function") {
        const cb = v as CallbackFn;
        try {
          cb();
        } catch (_) {
          // ignored
        }
      }
      return;
    }
    if (typeof v === "function") {
      this.pendingFiltered++;
    }
    this.yields.push(v);
    this.signal.resolve();
  }

  /**
   * Drains the queue, yielding values and invoking callbacks in order.
   *
   * @throws InvalidOperationError if `noIterator` is set or an iteration was
   *   already started
   * @throws the error given to `stop()`, or whatever a queued callback throws
   */
  async *iterate(): AsyncIterableIterator<T> {
    if (this.noIterator) {
      throw new InvalidOperationError(
        "iterator cannot be used when a callback is registered",
      );
    }
    if (this.yielding) {
      throw new InvalidOperationError("iterator is already yielding");
    }
    this.yielding = true;
    try {
      while (true) {
        if (this.yields.length === 0) {
          await this.signal;
        }
        if (this.err) {
          throw this.err;
        }
        // take ownership of the current batch; pushes that happen while we
        // are suspended in `yield` accumulate in a fresh array
        const yields = this.yields;
        this.inflight = yields.length;
        this.yields = [];
        for (let i = 0; i < yields.length; i++) {
          if (typeof yields[i] === "function") {
            // `inflight` was seeded with the full batch length, callbacks
            // included, so a consumed callback must leave both counters;
            // otherwise getPending() over-reports for the rest of the batch
            this.pendingFiltered--;
            this.inflight--;
            const fn = yields[i] as CallbackFn;
            // a throwing callback fails the iterator so they know to fix it
            fn();
            // fn could have also set an error
            if (this.err) {
              throw this.err;
            }
            continue;
          }

          this.processed++;
          this.inflight--;
          const start = this.profile ? Date.now() : 0;
          yield yields[i] as T;
          this.time = this.profile ? Date.now() - start : 0;
        }
        // yielding could have paused and microtask
        // could have added messages. Prevent allocations
        // if possible
        if (this.done) {
          break;
        } else if (this.yields.length === 0) {
          // nothing arrived meanwhile: recycle the drained array and re-arm
          yields.length = 0;
          this.yields = yields;
          this.signal = deferred();
        }
      }
    } finally {
      // runs on every exit path: consumer break/return, a thrown error,
      // or the normal `done` exit above
      this.didBreak = true;
      this.stop();
    }
  }

  /**
   * Marks the iterator as done, wakes a waiting `iterate()` and resolves
   * `iterClosed`. Only the first call has any effect.
   *
   * @param err optional error; `iterate()` rethrows it and `iterClosed`
   *   resolves with it
   */
  stop(err?: Error): void {
    if (this.done) {
      return;
    }
    this.err = err;
    this.done = true;
    this.signal.resolve();
    this.iterClosed.resolve(err);
  }

  /** @returns `received` when `noIterator` is set, otherwise `processed`. */
  getProcessed(): number {
    return this.noIterator ? this.received : this.processed;
  }

  /**
   * @returns queued plus in-flight entries, excluding callbacks that have
   *   not run yet
   */
  getPending(): number {
    return this.yields.length + this.inflight - this.pendingFiltered;
  }

  /** @returns `received` minus `filtered`. */
  getReceived(): number {
    return this.received - this.filtered;
  }
}
55✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc