• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.js / 13041776921

29 Jan 2025 10:36PM UTC coverage: 82.727% (+0.02%) from 82.71%
13041776921

push

github

web-flow
fix: typo in the documentation of the list() method of the Objm class (#198)

2271 of 3098 branches covered (73.31%)

Branch coverage included in aggregate %.

9631 of 11289 relevant lines covered (85.31%)

787245.13 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.03
/jetstream/src/pushconsumer.ts
1
/*
2
 * Copyright 2024 Synadia Communications, Inc
3
 * Licensed under the Apache License, Version 2.0 (the "License");
4
 * you may not use this file except in compliance with the License.
5
 * You may obtain a copy of the License at
6
 *
7
 * http://www.apache.org/licenses/LICENSE-2.0
8
 *
9
 * Unless required by applicable law or agreed to in writing, software
10
 * distributed under the License is distributed on an "AS IS" BASIS,
11
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
 * See the License for the specific language governing permissions and
13
 * limitations under the License.
14
 */
15

16
import { toJsMsg } from "./jsmsg.ts";
20✔
17
import type { JsMsg } from "./jsmsg.ts";
18
import { AckPolicy, DeliverPolicy } from "./jsapi_types.ts";
20✔
19
import type { ConsumerConfig, ConsumerInfo } from "./jsapi_types.ts";
20
import type { ConsumerNotification } from "./types.ts";
21

22
import type {
23
  ConsumerAPI,
24
  ConsumerCallbackFn,
25
  ConsumerMessages,
26
  PushConsumer,
27
  PushConsumerOptions,
28
} from "./types.ts";
29
import type { ConsumerAPIImpl } from "./jsmconsumer_api.ts";
30
import {
20✔
31
  backoff,
20✔
32
  createInbox,
20✔
33
  delay,
20✔
34
  errors,
20✔
35
  IdleHeartbeatMonitor,
20✔
36
  millis,
20✔
37
  nanos,
20✔
38
  nuid,
20✔
39
  QueuedIteratorImpl,
20✔
40
} from "@nats-io/nats-core/internal";
20✔
41
import type {
42
  CallbackFn,
43
  Delay,
44
  QueuedIterator,
45
  Status,
46
  Subscription,
47
  SubscriptionImpl,
48
} from "@nats-io/nats-core/internal";
49
import { JetStreamStatus } from "./jserrors.ts";
20✔
50

51
export class PushConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
20✔
52
  implements ConsumerMessages {
53
  consumer: PushConsumerImpl;
20✔
54
  sub!: Subscription;
128✔
55
  monitor: IdleHeartbeatMonitor | null;
128✔
56
  listeners: QueuedIterator<ConsumerNotification>[];
128✔
57
  abortOnMissingResource: boolean;
128✔
58
  callback: ConsumerCallbackFn | null;
128✔
59
  ordered: boolean;
128✔
60
  cursor!: { stream_seq: number; deliver_seq: number };
128✔
61
  namePrefix: string | null;
128✔
62
  deliverPrefix: string | null;
128✔
63
  serial: number;
128✔
64
  createFails!: number;
128✔
65
  statusIterator!: QueuedIteratorImpl<Status>;
128✔
66
  cancelables: Delay[];
20✔
67

68
  constructor(
20✔
69
    c: PushConsumerImpl,
20✔
70
    userOptions: Partial<PushConsumerOptions> = {},
20✔
71
    internalOptions: Partial<PushConsumerInternalOptions> = {},
20✔
72
  ) {
20✔
73
    super();
128✔
74
    this.consumer = c;
128✔
75
    this.monitor = null;
128✔
76
    this.listeners = [];
128✔
77
    this.cancelables = [];
128✔
78
    this.abortOnMissingResource =
128✔
79
      userOptions.abort_on_missing_resource === true;
128✔
80
    this.callback = userOptions.callback || null;
73✔
81
    this.noIterator = this.callback !== null;
128✔
82
    this.namePrefix = null;
128✔
83
    this.deliverPrefix = null;
128✔
84
    this.ordered = internalOptions.ordered === true;
128✔
85
    this.serial = 1;
128✔
86
    if (this.ordered) {
128✔
87
      this.namePrefix = internalOptions.name_prefix ?? `oc_${nuid.next()}`;
×
88
      // this already should be set
89
      this.deliverPrefix = internalOptions.deliver_prefix ??
×
90
        createInbox(this.consumer.api.nc.options.inboxPrefix);
×
91
      this.cursor = { stream_seq: 1, deliver_seq: 0 };
544✔
92
      const startSeq = c._info.config.opt_start_seq || 0;
136✔
93
      this.cursor.stream_seq = startSeq > 0 ? startSeq - 1 : 0;
115!
94
      this.createFails = 0;
136✔
95
    }
136✔
96

97
    this.start();
128✔
98
  }
128✔
99

100
  reset() {
18✔
101
    const { name } = this.consumer._info?.config;
20✔
102
    if (name) {
20✔
103
      this.consumer.api.delete(this.consumer.stream, name)
20✔
104
        .catch(() => {
20✔
105
          // ignored
106
        });
20✔
107
    }
20✔
108

109
    const config = this.getConsumerOpts();
20✔
110
    // reset delivery seq
111
    this.cursor.deliver_seq = 0;
20✔
112
    // if they do a consumer info, they get the new one.
113
    this.consumer.name = config.name!;
20✔
114
    // sync the serial - if they stop and restart, it will go forward
115
    this.consumer.serial = this.serial;
20✔
116
    // remap the subscription
117
    this.consumer.api.nc._resub(this.sub, config.deliver_subject!);
20✔
118
    // create the consumer
119
    this.consumer.api.add(
20✔
120
      this.consumer.stream,
20✔
121
      config,
20✔
122
    ).then((ci) => {
20✔
123
      this.createFails = 0;
22✔
124
      this.consumer._info = ci;
22✔
125
      this.notify({ type: "ordered_consumer_recreated", name: ci.name });
88✔
126
    }).catch((err) => {
×
127
      this.createFails++;
×
128
      if (err.message === "stream not found") {
×
129
        this.notify({
×
130
          type: "stream_not_found",
×
131
          name: this.consumer.stream,
×
132
          consumerCreateFails: this.createFails,
×
133
        });
×
134
        if (this.abortOnMissingResource) {
×
135
          this.stop(err);
×
136
          return;
×
137
        }
×
138
      }
×
139
      // we have attempted to create 30 times, never succeeded
140
      if (this.createFails >= 30 && this.received === 0) {
×
141
        this.stop(err);
×
142
      }
×
143
      const bo = backoff();
×
144

145
      const c = delay(bo.backoff(this.createFails));
×
146
      c.then(() => {
×
147
        if (!this.done) {
×
148
          this.reset();
×
149
        }
×
150
      }).catch(() => {})
×
151
        .finally(() => {
×
152
          const idx = this.cancelables.indexOf(c);
×
153
          if (idx !== -1) {
×
154
            this.cancelables = this.cancelables.splice(idx, idx);
×
155
          }
×
156
        });
×
157
      this.cancelables.push(c);
×
158
    });
×
159
  }
20✔
160

161
  getConsumerOpts(): ConsumerConfig {
18✔
162
    const src = Object.assign({}, this.consumer._info.config);
20✔
163
    this.serial++;
20✔
164
    const name = `${this.namePrefix}_${this.serial}`;
20✔
165

166
    return Object.assign(src, {
20✔
167
      name,
20✔
168
      deliver_policy: DeliverPolicy.StartSequence,
20✔
169
      opt_start_seq: this.cursor.stream_seq + 1,
20✔
170
      ack_policy: AckPolicy.None,
20✔
171
      inactive_threshold: nanos(5 * 60 * 1000),
20✔
172
      num_replicas: 1,
20✔
173
      flow_control: true,
20✔
174
      idle_heartbeat: nanos(30 * 1000),
20✔
175
      deliver_subject: `${this.deliverPrefix}.${this.serial}`,
20✔
176
    });
20✔
177
  }
20✔
178

179
  closed(): Promise<void | Error> {
20✔
180
    return this.iterClosed;
311✔
181
  }
311✔
182
  close(): Promise<void | Error> {
18✔
183
    this.stop();
23✔
184
    return this.iterClosed;
23✔
185
  }
23✔
186

187
  override stop(err?: Error) {
20✔
188
    if (this.done) {
229✔
189
      return;
310✔
190
    }
310✔
191
    this.statusIterator?.stop();
189!
192
    this.monitor?.cancel();
189!
193
    this.monitor = null;
229✔
194
    // if we have delays, stop them
195
    this.cancelables.forEach((c) => {
×
196
      c.cancel();
×
197
    });
×
198
    Promise.all(this.cancelables)
229✔
199
      .then(() => {
229✔
200
        this.cancelables = [];
357✔
201
      })
×
202
      .catch(() => {})
×
203
      .finally(() => {
×
204
        this._push(() => {
357✔
205
          super.stop(err);
481✔
206
          this.listeners.forEach((n) => {
137✔
207
            n.stop();
142✔
208
          });
137✔
209
        });
357✔
210
      });
229✔
211
  }
229✔
212

213
  _push(r: JsMsg | CallbackFn) {
20✔
214
    if (!this.callback) {
1,578✔
215
      super.push(r);
3,044✔
216
    } else {
1,578✔
217
      const fn = typeof r === "function" ? r as CallbackFn : null;
2,030✔
218
      try {
2,030✔
219
        if (!fn) {
2,030✔
220
          const m = r as JsMsg;
2,311✔
221
          this.received++;
2,311✔
222
          this.callback(m);
2,311✔
223
          this.processed++;
2,311✔
224
        } else {
2,030✔
225
          fn();
2,200✔
226
        }
2,200✔
227
      } catch (err) {
×
228
        this.stop(err as Error);
×
229
      }
×
230
    }
2,030✔
231
  }
1,937✔
232
  status(): AsyncIterable<ConsumerNotification> {
18✔
233
    const iter = new QueuedIteratorImpl<ConsumerNotification>();
23✔
234
    this.listeners.push(iter);
23✔
235
    return iter;
23✔
236
  }
23✔
237

238
  start(): void {
20✔
239
    const {
128✔
240
      deliver_subject: subject,
128✔
241
      deliver_group: queue,
128✔
242
      idle_heartbeat: hbNanos,
128✔
243
    } = this.consumer._info.config;
128✔
244
    if (!subject) {
×
245
      // this shouldn't happen - the push consumer should be validated
246
      throw new Error("bad consumer info");
×
247
    }
×
248

249
    if (hbNanos) {
107!
250
      const ms = millis(hbNanos);
116✔
251
      this.monitor = new IdleHeartbeatMonitor(
61✔
252
        ms,
61✔
253
        (count): boolean => {
61✔
254
          this.notify({ type: "heartbeats_missed", count });
260✔
255
          return false;
65✔
256
        },
61✔
257
        { maxOut: 2 },
293✔
258
      );
259

260
      (async () => {
116✔
261
        this.statusIterator = this.consumer.api.nc
179✔
262
          .status() as QueuedIteratorImpl<Status>;
179✔
263
        for await (const s of this.statusIterator) {
109!
264
          switch (s.type) {
113✔
265
            case "disconnect":
113✔
266
              this.monitor?.cancel();
114✔
267
              break;
114✔
268
            case "reconnect":
113✔
269
              this.monitor?.restart();
114✔
270
              break;
114✔
271
            default:
113✔
272
              // ignored
273
          }
113✔
274
        }
113✔
275
      })();
116✔
276
    }
116✔
277

278
    this.sub = this.consumer.api.nc.subscribe(subject, {
128✔
279
      queue,
128✔
280
      callback: (err, msg) => {
128✔
281
        if (err) {
×
282
          this.stop(err);
×
283
          return;
×
284
        }
×
285
        this.monitor?.work();
1,627!
286

287
        // need to make sure to catch all protocol messages even
288
        const isProtocol = this.ordered
1,333✔
289
          ? msg.subject.indexOf(this?.deliverPrefix!) === 0
1,333!
290
          : msg.subject === subject;
1,333✔
291

292
        if (isProtocol) {
1,627!
293
          if (msg.subject !== (this.sub as SubscriptionImpl).subject) {
×
294
            // this is a stale message - was not sent to the current inbox
295
            return;
×
296
          }
×
297

298
          const status = new JetStreamStatus(msg);
1,637✔
299
          if (status.isFlowControlRequest()) {
1,342!
300
            this._push(() => {
1,351✔
301
              msg.respond();
1,355✔
302
              this.notify({ type: "flow_control" });
4,065✔
303
            });
1,351✔
304
            return;
1,351✔
305
          }
1,351✔
306

307
          if (status.isIdleHeartbeat()) {
1,642✔
308
            const lastConsumerSequence = parseInt(
×
309
              msg.headers?.get("Nats-Last-Consumer") || "0",
×
310
            );
311
            const lastStreamSequence = parseInt(
×
312
              msg.headers?.get("Nats-Last-Stream") ?? "0",
×
313
            );
314
            this.notify({
1,642✔
315
              type: "heartbeat",
1,642✔
316
              lastStreamSequence,
1,642✔
317
              lastConsumerSequence,
1,642✔
318
            });
1,642✔
319
            return;
1,642✔
320
          }
1,642!
321

322
          const code = status.code;
×
323
          const description = status.description;
×
324

325
          if (status.isConsumerDeleted()) {
×
326
            this.notify({ type: "consumer_deleted", code, description });
×
327
          }
×
328
          if (this.abortOnMissingResource) {
×
329
            this._push(() => {
×
330
              this.stop(status.toError());
×
331
            });
×
332
            return;
×
333
          }
×
334
        } else {
1,627✔
335
          const m = toJsMsg(msg);
3,327✔
336
          if (this.ordered) {
3,327✔
337
            const dseq = m.info.deliverySequence;
3,344✔
338
            if (dseq !== this.cursor.deliver_seq + 1) {
2,622!
339
              this.reset();
2,624✔
340
              return;
2,624✔
341
            }
2,624✔
342
            this.cursor.deliver_seq = dseq;
3,359✔
343
            this.cursor.stream_seq = m.info.streamSequence;
3,359✔
344
          }
3,359✔
345
          this._push(m);
4,597✔
346
        }
4,597✔
347
      },
1,817✔
348
    });
128✔
349

350
    this.sub.closed.then(() => {
128✔
351
      // for ordered consumer we cannot break the iterator
352
      this._push(() => {
236✔
353
        this.stop();
318✔
354
      });
236✔
355
    });
128✔
356

357
    this.closed().then(() => {
128✔
358
      this.sub?.unsubscribe();
235✔
359
    });
128✔
360
  }
128✔
361

362
  notify(n: ConsumerNotification) {
19✔
363
    if (this.listeners.length > 0) {
33!
364
      (() => {
45✔
365
        this.listeners.forEach((l) => {
57✔
366
          const qi = l as QueuedIteratorImpl<ConsumerNotification>;
69✔
367
          if (!qi.done) {
69✔
368
            qi.push(n);
69✔
369
          }
69✔
370
        });
57✔
371
      })();
45✔
372
    }
45✔
373
  }
35✔
374
}
20✔
375

376
export type PushConsumerInternalOptions = PushConsumerOptions & {
377
  bound: boolean;
378
  ordered: boolean;
379
  name_prefix: string;
380
  deliver_prefix: string;
381
};
382

383
export class PushConsumerImpl implements PushConsumer {
20✔
384
  api: ConsumerAPIImpl;
20✔
385
  _info: ConsumerInfo;
131✔
386
  stream: string;
131✔
387
  name: string;
131✔
388
  bound: boolean;
131✔
389
  ordered: boolean;
131✔
390
  started: boolean;
131✔
391
  serial: number;
131✔
392
  opts: Partial<PushConsumerInternalOptions>;
20✔
393

394
  constructor(
20✔
395
    api: ConsumerAPI,
20✔
396
    info: ConsumerInfo,
20✔
397
    opts: Partial<PushConsumerInternalOptions> = {},
20✔
398
  ) {
20✔
399
    this.api = api as ConsumerAPIImpl;
131✔
400
    this._info = info;
131✔
401
    this.stream = info.stream_name;
131✔
402
    this.name = info.name;
131✔
403
    this.bound = opts.bound === true;
131✔
404
    this.started = false;
131✔
405
    this.opts = opts;
131✔
406
    this.serial = 0;
131✔
407
    this.ordered = opts.ordered || false;
53!
408

409
    if (this.ordered) {
131✔
410
      this.serial = 1;
140✔
411
    }
140✔
412
  }
131✔
413

414
  consume(
20✔
415
    userOptions: Partial<PushConsumerOptions> = {},
20✔
416
  ): Promise<ConsumerMessages> {
20✔
417
    if (this.started) {
54!
418
      return Promise.reject(
55✔
419
        new errors.InvalidOperationError("consumer already started"),
55✔
420
      );
421
    }
55✔
422

423
    if (!this._info.config.deliver_subject) {
×
424
      return Promise.reject(
×
425
        new Error("deliver_subject is not set, not a push consumer"),
×
426
      );
427
    }
✔
428
    if (!this._info.config.deliver_group && this._info.push_bound) {
54!
429
      return Promise.reject(
55✔
430
        new errors.InvalidOperationError("consumer is already bound"),
55✔
431
      );
432
    }
55✔
433
    const v = new PushConsumerMessagesImpl(this, userOptions, this.opts);
164✔
434
    this.started = true;
164✔
435
    v.closed().then(() => {
164✔
436
      this.started = false;
271✔
437
    });
164✔
438
    return Promise.resolve(v);
164✔
439
  }
130✔
440

441
  delete(): Promise<boolean> {
18✔
442
    if (this.bound) {
21✔
443
      return Promise.reject(
22✔
444
        new errors.InvalidOperationError("bound consumers cannot delete"),
22✔
445
      );
446
    }
22✔
447
    const { stream_name, name } = this._info;
23✔
448
    return this.api.delete(stream_name, name);
23✔
449
  }
21✔
450

451
  async info(cached?: boolean): Promise<ConsumerInfo> {
20✔
452
    if (this.bound) {
34!
453
      return Promise.reject(
35✔
454
        new errors.InvalidOperationError("bound consumers cannot info"),
35✔
455
      );
456
    }
35✔
457
    if (cached) {
105✔
458
      return Promise.resolve(this._info);
152✔
459
    }
152!
460
    // FIXME: this can possibly return a stale ci if this is an ordered
461
    //   consumer, and the consumer reset while we awaited the info...
462
    const info = await this.api.info(this.stream, this.name);
133✔
463
    this._info = info;
131✔
464
    return info;
131✔
465
  }
94✔
466

467
  isPullConsumer(): boolean {
18✔
468
    return false;
19✔
469
  }
19✔
470

471
  isPushConsumer(): boolean {
18✔
472
    return true;
19✔
473
  }
19✔
474
}
20✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc