• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.js / 16152724641

08 Jul 2025 07:36PM UTC coverage: 70.4% (-14.3%) from 84.715%
16152724641

push

github

aricart
chore: bump versions across modules and update dependencies

- Updated versions for NATS modules, including core, JetStream, KV, object store, services, and transport.
- Upgraded `@types/node`, `shx`, `typescript`, and `js-sha256` dependencies.

Signed-off-by: Alberto Ricart <alberto@synadia.com>

1496 of 2225 branches covered (67.24%)

Branch coverage included in aggregate %.

2 of 2 new or added lines in 2 files covered. (100.0%)

248 existing lines in 7 files now uncovered.

7882 of 11096 relevant lines covered (71.03%)

236642.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.19
/jetstream/src/pushconsumer.ts
1
/*
2
 * Copyright 2024 Synadia Communications, Inc
3
 * Licensed under the Apache License, Version 2.0 (the "License");
4
 * you may not use this file except in compliance with the License.
5
 * You may obtain a copy of the License at
6
 *
7
 * http://www.apache.org/licenses/LICENSE-2.0
8
 *
9
 * Unless required by applicable law or agreed to in writing, software
10
 * distributed under the License is distributed on an "AS IS" BASIS,
11
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
 * See the License for the specific language governing permissions and
13
 * limitations under the License.
14
 */
15

16
import { toJsMsg } from "./jsmsg.ts";
20✔
17
import type { JsMsg } from "./jsmsg.ts";
18
import { AckPolicy, DeliverPolicy } from "./jsapi_types.ts";
20✔
19
import type { ConsumerConfig, ConsumerInfo } from "./jsapi_types.ts";
20
import type { ConsumerNotification } from "./types.ts";
21

22
import type {
23
  ConsumerAPI,
24
  ConsumerCallbackFn,
25
  ConsumerMessages,
26
  PushConsumer,
27
  PushConsumerOptions,
28
} from "./types.ts";
29
import type { ConsumerAPIImpl } from "./jsmconsumer_api.ts";
30
import {
20✔
31
  backoff,
20✔
32
  createInbox,
20✔
33
  delay,
20✔
34
  errors,
20✔
35
  IdleHeartbeatMonitor,
20✔
36
  millis,
20✔
37
  nanos,
20✔
38
  nuid,
20✔
39
  QueuedIteratorImpl,
20✔
40
} from "@nats-io/nats-core/internal";
20✔
41
import type {
42
  CallbackFn,
43
  Delay,
44
  QueuedIterator,
45
  Status,
46
  Subscription,
47
} from "@nats-io/nats-core/internal";
48
import { JetStreamStatus } from "./jserrors.ts";
20✔
49

50
/**
 * Message stream of a push consumer.
 *
 * On construction it subscribes to the consumer's `deliver_subject` and
 * forwards every stream message either to the user supplied `callback` or to
 * the iterator inherited from `QueuedIteratorImpl`. Status messages received
 * on the deliver subject (flow control, idle heartbeats, consumer deleted...)
 * are handled internally and surfaced as `ConsumerNotification`s to the
 * iterators handed out by {@link status}.
 *
 * When created with `internalOptions.ordered === true`, the instance tracks
 * the last stream/delivery sequence it saw and re-creates the server side
 * consumer (see {@link reset}) whenever a delivery gap is detected or
 * heartbeats are missed.
 */
export class PushConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
  implements ConsumerMessages {
  consumer: PushConsumerImpl;
  sub!: Subscription;
  monitor: IdleHeartbeatMonitor | null;
  listeners: QueuedIterator<ConsumerNotification>[];
  abortOnMissingResource: boolean;
  callback: ConsumerCallbackFn | null;
  ordered: boolean;
  // only initialized for ordered consumers
  cursor!: { stream_seq: number; deliver_seq: number };
  namePrefix: string | null;
  deliverPrefix: string | null;
  serial: number;
  // only initialized for ordered consumers
  createFails!: number;
  // only initialized when the consumer has an idle heartbeat configured
  statusIterator!: QueuedIteratorImpl<Status>;
  // pending backoff delays scheduled by reset() - cancelled on stop()
  cancelables: Delay[];

  /**
   * @param c the push consumer these messages belong to
   * @param userOptions user facing options (`callback`,
   *   `abort_on_missing_resource`)
   * @param internalOptions library options (`ordered`, `name_prefix`,
   *   `deliver_prefix`)
   * @throws Error if the consumer info has no `deliver_subject` (raised by
   *   {@link start})
   */
  constructor(
    c: PushConsumerImpl,
    userOptions: Partial<PushConsumerOptions> = {},
    internalOptions: Partial<PushConsumerInternalOptions> = {},
  ) {
    super();
    this.consumer = c;
    this.monitor = null;
    this.listeners = [];
    this.cancelables = [];
    this.abortOnMissingResource =
      userOptions.abort_on_missing_resource === true;
    this.callback = userOptions.callback || null;
    // with a callback, messages are dispatched directly by _push()
    this.noIterator = this.callback !== null;
    this.namePrefix = null;
    this.deliverPrefix = null;
    this.ordered = internalOptions.ordered === true;
    this.serial = 1;
    if (this.ordered) {
      this.namePrefix = internalOptions.name_prefix ?? `oc_${nuid.next()}`;
      // this already should be set
      this.deliverPrefix = internalOptions.deliver_prefix ??
        createInbox(this.consumer.api.nc.options.inboxPrefix);
      this.cursor = { stream_seq: 1, deliver_seq: 0 };
      // the cursor holds the last sequence seen, so it sits one before the
      // requested start sequence (getConsumerOpts() adds the 1 back)
      const startSeq = c._info.config.opt_start_seq || 0;
      this.cursor.stream_seq = startSeq > 0 ? startSeq - 1 : 0;
      this.createFails = 0;
    }

    this.start();
  }

  /**
   * Re-creates the ordered consumer: requests deletion of the current server
   * consumer (best effort), remaps the subscription to a new deliver
   * subject and adds a new consumer starting right after the last stream
   * sequence recorded in the cursor.
   *
   * If adding the consumer fails, the attempt is retried after a backoff
   * delay, unless the stream is missing and `abortOnMissingResource` is set,
   * or 30 attempts failed without ever receiving a message - in both cases
   * the iterator is stopped with the error.
   */
  reset() {
    const name = this.consumer._info?.config?.name;
    if (name) {
      this.consumer.api.delete(this.consumer.stream, name)
        .catch(() => {
          // ignored
        });
    }

    const config = this.getConsumerOpts();
    // reset delivery seq
    this.cursor.deliver_seq = 0;
    // if they do a consumer info, they get the new one.
    this.consumer.name = config.name!;
    // sync the serial - if they stop and restart, it will go forward
    this.consumer.serial = this.serial;
    // remap the subscription
    this.consumer.api.nc._resub(this.sub, config.deliver_subject!);
    // create the consumer
    this.consumer.api.add(
      this.consumer.stream,
      config,
    ).then((ci) => {
      this.createFails = 0;
      this.consumer._info = ci;
      this.notify({ type: "ordered_consumer_recreated", name: ci.name });
    }).catch((err) => {
      this.createFails++;
      if (err.message === "stream not found") {
        this.notify({
          type: "stream_not_found",
          name: this.consumer.stream,
          consumerCreateFails: this.createFails,
        });
        if (this.abortOnMissingResource) {
          this.stop(err);
          return;
        }
      }
      // we have attempted to create 30 times, never succeeded
      if (this.createFails >= 30 && this.received === 0) {
        this.stop(err);
        // we gave up - don't schedule yet another attempt
        return;
      }
      const bo = backoff();

      const c = delay(bo.backoff(this.createFails));
      c.then(() => {
        if (!this.done) {
          this.reset();
        }
      }).catch(() => {})
        .finally(() => {
          // the delay settled, drop only this entry from the pending list
          const idx = this.cancelables.indexOf(c);
          if (idx !== -1) {
            this.cancelables.splice(idx, 1);
          }
        });
      this.cancelables.push(c);
    });
  }

  /**
   * Builds the configuration for the next incarnation of the ordered
   * consumer. Increments `serial`, which is used to derive both the new
   * consumer name and the new deliver subject.
   *
   * @returns a copy of the current consumer config with the ordered
   *   consumer settings applied
   */
  getConsumerOpts(): ConsumerConfig {
    const src = Object.assign({}, this.consumer._info.config);
    this.serial++;
    const name = `${this.namePrefix}_${this.serial}`;

    return Object.assign(src, {
      name,
      deliver_policy: DeliverPolicy.StartSequence,
      // resume right after the last stream sequence we processed
      opt_start_seq: this.cursor.stream_seq + 1,
      ack_policy: AckPolicy.None,
      inactive_threshold: nanos(5 * 60 * 1000),
      num_replicas: 1,
      flow_control: true,
      idle_heartbeat: nanos(30 * 1000),
      deliver_subject: `${this.deliverPrefix}.${this.serial}`,
    });
  }

  /**
   * @returns the promise tracking the closing of the iterator
   */
  closed(): Promise<void | Error> {
    return this.iterClosed;
  }

  /**
   * Stops the messages and returns the same promise as {@link closed}.
   */
  close(): Promise<void | Error> {
    this.stop();
    return this.iterClosed;
  }

  /**
   * Stops the status iterator and the heartbeat monitor, cancels any pending
   * backoff delays, and once those settled, enqueues the actual stop of the
   * iterator and of the notification listeners via {@link _push}.
   * Does nothing if the iterator is already done.
   *
   * @param err optional error to stop the iterator with
   */
  override stop(err?: Error) {
    if (this.done) {
      return;
    }
    this.statusIterator?.stop();
    this.monitor?.cancel();
    this.monitor = null;
    // if we have delays, stop them
    this.cancelables.forEach((c) => {
      c.cancel();
    });
    Promise.all(this.cancelables)
      .then(() => {
        this.cancelables = [];
      })
      .catch(() => {})
      .finally(() => {
        this._push(() => {
          super.stop(err);
          this.listeners.forEach((n) => {
            n.stop();
          });
        });
      });
  }

  /**
   * Dispatches a message or a function. Without a user callback the value is
   * pushed into the iterator. With a callback, messages are handed to the
   * callback (updating `received`/`processed`) and functions are invoked
   * right away; anything thrown while doing so stops the messages with that
   * error.
   *
   * @param r the message to deliver, or a function to run
   */
  _push(r: JsMsg | CallbackFn) {
    if (!this.callback) {
      super.push(r);
    } else {
      const fn = typeof r === "function" ? r as CallbackFn : null;
      try {
        if (!fn) {
          const m = r as JsMsg;
          this.received++;
          this.callback(m);
          this.processed++;
        } else {
          fn();
        }
      } catch (err) {
        this.stop(err as Error);
      }
    }
  }

  /**
   * Registers and returns a new iterator that receives every
   * `ConsumerNotification` emitted from now on. The iterator is stopped when
   * the messages stop.
   */
  status(): AsyncIterable<ConsumerNotification> {
    const iter = new QueuedIteratorImpl<ConsumerNotification>();
    this.listeners.push(iter);
    return iter;
  }

  /**
   * Wires everything up: the optional idle heartbeat monitor, the
   * subscription on the deliver subject and the cleanup handlers that tie
   * the lifetime of the subscription and of the iterator together.
   *
   * @throws Error if the consumer info has no `deliver_subject`
   */
  start(): void {
    const {
      deliver_subject: subject,
      deliver_group: queue,
      idle_heartbeat: hbNanos,
    } = this.consumer._info.config;
    if (!subject) {
      // this shouldn't happen - the push consumer should be validated
      throw new Error("bad consumer info");
    }

    if (hbNanos) {
      const ms = millis(hbNanos);
      this.monitor = new IdleHeartbeatMonitor(
        ms,
        (count): boolean => {
          this.notify({ type: "heartbeats_missed", count });
          if (this.ordered) {
            this.reset();
          }
          return false;
        },
        { maxOut: 2 },
      );

      // follow the connection status: cancel the monitor on disconnect and
      // restart it on reconnect
      (async () => {
        this.statusIterator = this.consumer.api.nc
          .status() as QueuedIteratorImpl<Status>;
        for await (const s of this.statusIterator) {
          switch (s.type) {
            case "disconnect":
              this.monitor?.cancel();
              break;
            case "reconnect":
              this.monitor?.restart();
              break;
            default:
              // ignored
          }
        }
      })();
    }

    this.sub = this.consumer.api.nc.subscribe(subject, {
      queue,
      callback: (err, msg) => {
        if (err) {
          this.stop(err);
          return;
        }
        // any inbound message counts as activity for the heartbeat monitor
        this.monitor?.work();

        // need to make sure to catch all protocol messages even
        // those sent to a previous deliver subject of an ordered consumer
        const isProtocol = this.ordered
          ? msg.subject.indexOf(this.deliverPrefix!) === 0
          : msg.subject === subject;

        if (isProtocol) {
          if (msg.subject !== this.sub.getSubject()) {
            // this is a stale message - was not sent to the current inbox
            return;
          }

          const status = new JetStreamStatus(msg);
          if (status.isFlowControlRequest()) {
            this._push(() => {
              msg.respond();
              this.notify({ type: "flow_control" });
            });
            return;
          }

          if (status.isIdleHeartbeat()) {
            // `||` (not `??`) so that an empty header value also maps to 0
            const lastConsumerSequence = parseInt(
              msg.headers?.get("Nats-Last-Consumer") || "0",
              10,
            );
            const lastStreamSequence = parseInt(
              msg.headers?.get("Nats-Last-Stream") || "0",
              10,
            );
            this.notify({
              type: "heartbeat",
              lastStreamSequence,
              lastConsumerSequence,
            });
            return;
          }

          const code = status.code;
          const description = status.description;

          if (status.isConsumerDeleted()) {
            this.notify({ type: "consumer_deleted", code, description });
          }
          // any other status stops the messages if the client asked to abort
          if (this.abortOnMissingResource) {
            this._push(() => {
              this.stop(status.toError());
            });
            return;
          }
        } else {
          const m = toJsMsg(msg);
          if (this.ordered) {
            const dseq = m.info.deliverySequence;
            // a gap in the delivery sequence means we missed messages,
            // re-create the consumer from the last known stream sequence
            if (dseq !== this.cursor.deliver_seq + 1) {
              this.reset();
              return;
            }
            this.cursor.deliver_seq = dseq;
            this.cursor.stream_seq = m.info.streamSequence;
          }
          this._push(m);
        }
      },
    });

    this.sub.closed.then(() => {
      // for ordered consumer we cannot break the iterator
      this._push(() => {
        this.stop();
      });
    });

    this.closed().then(() => {
      this.sub?.unsubscribe();
    });
  }

  /**
   * Pushes the notification to every listener returned by {@link status}
   * that is not done yet.
   *
   * @param n the notification to broadcast
   */
  notify(n: ConsumerNotification) {
    for (const l of this.listeners) {
      const qi = l as QueuedIteratorImpl<ConsumerNotification>;
      if (!qi.done) {
        qi.push(n);
      }
    }
  }
}
20✔
377

378
/**
 * Options used internally by the library when creating push consumers.
 * Extends the user facing {@link PushConsumerOptions}.
 */
export type PushConsumerInternalOptions = PushConsumerOptions & {
  /** When `true`, `PushConsumerImpl` rejects `delete()` and `info()`. */
  bound: boolean;
  /** When `true`, the consumer is handled as an ordered consumer. */
  ordered: boolean;
  /** Prefix for the names of the consumers created by an ordered consumer. */
  name_prefix: string;
  /** Prefix for the deliver subjects used by an ordered consumer. */
  deliver_prefix: string;
};
384

385
/**
 * Client side handle of a JetStream push consumer. Keeps the last known
 * `ConsumerInfo` and hands out the message stream through {@link consume}.
 */
export class PushConsumerImpl implements PushConsumer {
  api: ConsumerAPIImpl;
  _info: ConsumerInfo;
  stream: string;
  name: string;
  bound: boolean;
  ordered: boolean;
  started: boolean;
  serial: number;
  opts: Partial<PushConsumerInternalOptions>;

  /**
   * @param api consumer API used to talk to the server
   * @param info the consumer info describing this consumer
   * @param opts internal options (`bound`, `ordered`, ...), also forwarded
   *   to the messages created by {@link consume}
   */
  constructor(
    api: ConsumerAPI,
    info: ConsumerInfo,
    opts: Partial<PushConsumerInternalOptions> = {},
  ) {
    this.api = api as ConsumerAPIImpl;
    this._info = info;
    this.stream = info.stream_name;
    this.name = info.name;
    this.opts = opts;
    this.started = false;
    this.bound = opts.bound === true;
    this.ordered = opts.ordered || false;
    // ordered consumers start counting at 1, all others at 0
    this.serial = this.ordered ? 1 : 0;
  }

  /**
   * Starts consuming messages.
   *
   * @param userOptions user options, shallow-copied before use
   * @returns a promise resolving to the messages; it rejects if the consumer
   *   was already started, if it has no `deliver_subject`, or if it is
   *   reported as `push_bound` while having no `deliver_group`
   */
  consume(
    userOptions: Partial<PushConsumerOptions> = {},
  ): Promise<ConsumerMessages> {
    userOptions = { ...userOptions };

    if (this.started) {
      const alreadyStarted = new errors.InvalidOperationError(
        "consumer already started",
      );
      return Promise.reject(alreadyStarted);
    }

    const { config, push_bound: pushBound } = this._info;
    if (!config.deliver_subject) {
      const notPush = new Error(
        "deliver_subject is not set, not a push consumer",
      );
      return Promise.reject(notPush);
    }
    if (pushBound && !config.deliver_group) {
      const alreadyBound = new errors.InvalidOperationError(
        "consumer is already bound",
      );
      return Promise.reject(alreadyBound);
    }

    const messages = new PushConsumerMessagesImpl(
      this,
      userOptions,
      this.opts,
    );
    this.started = true;
    // once the messages close, the consumer can be started again
    messages.closed().then(() => {
      this.started = false;
    });
    return Promise.resolve(messages);
  }

  /**
   * Deletes the consumer through the consumer API.
   *
   * @returns the result of the API call; rejects for bound consumers
   */
  delete(): Promise<boolean> {
    if (this.bound) {
      const refused = new errors.InvalidOperationError(
        "bound consumers cannot delete",
      );
      return Promise.reject(refused);
    }
    return this.api.delete(this._info.stream_name, this._info.name);
  }

  /**
   * Returns the consumer info.
   *
   * @param cached when truthy, the stored info is returned without asking
   *   the server
   * @returns the stored or freshly fetched info (a fetched info also
   *   replaces the stored one); rejects for bound consumers
   */
  async info(cached?: boolean): Promise<ConsumerInfo> {
    if (this.bound) {
      const refused = new errors.InvalidOperationError(
        "bound consumers cannot info",
      );
      return Promise.reject(refused);
    }
    if (cached) {
      return Promise.resolve(this._info);
    }
    // FIXME: this can possibly return a stale ci if this is an ordered
    //   consumer, and the consumer reset while we awaited the info...
    const fresh = await this.api.info(this.stream, this.name);
    this._info = fresh;
    return fresh;
  }

  /** Always `false`: this is not a pull consumer. */
  isPullConsumer(): boolean {
    return false;
  }

  /** Always `true`: this is a push consumer. */
  isPushConsumer(): boolean {
    return true;
  }
}
20✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc