• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.js / 14248085491

03 Apr 2025 04:30PM UTC coverage: 84.536% (+0.002%) from 84.534%
14248085491

push

github

web-flow
Additional hardening of the object store client (#256)

* Additional hardening of the object store client

- made obj put publish messages sequentially - as current server can drop requests when the producer is too fast. This will degrade put performance.

- changed the internal push consumer used on get, to have an idle_heartbeat, if this fires, the get has stalled, and the ordered consumer should reset.

enabled flow control on the ordered consumer, this prevents slow consumers when the client is getting very large objects

Signed-off-by: Alberto Ricart <alberto@synadia.com>

* - hardening of the PullConsumer implementation, by default ordered consumers set flow control and idle_heartbeats. This enables the ordered consumer to self-recreate on `heartbeats_missed`, simplifying watcher/lister configurations.

- objectstore doesn't require listening to heartbeats_missed, since the pull consumer handles it automagically.

- objectstore watch added a check in watch for `heartbeat` notifications - if the watcher is only listing history this is a hint to stop as all records have been processed.

Signed-off-by: Alberto Ricart <alberto@synadia.com>

---------

Signed-off-by: Alberto Ricart <alberto@synadia.com>

2370 of 3217 branches covered (73.67%)

Branch coverage included in aggregate %.

10 of 23 new or added lines in 3 files covered. (43.48%)

4552 existing lines in 36 files now uncovered.

10170 of 11617 relevant lines covered (87.54%)

768618.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.6
/jetstream/src/jsmsg.ts
1
/*
2
 * Copyright 2021-2024 The NATS Authors
3
 * Licensed under the Apache License, Version 2.0 (the "License");
4
 * you may not use this file except in compliance with the License.
5
 * You may obtain a copy of the License at
6
 *
7
 * http://www.apache.org/licenses/LICENSE-2.0
8
 *
9
 * Unless required by applicable law or agreed to in writing, software
10
 * distributed under the License is distributed on an "AS IS" BASIS,
11
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
 * See the License for the specific language governing permissions and
13
 * limitations under the License.
14
 */
15

16
import type {
17
  Msg,
18
  MsgHdrs,
19
  MsgImpl,
20
  ProtocolHandler,
21
  RequestOptions,
22
} from "@nats-io/nats-core/internal";
23
import {
20✔
24
  DataBuffer,
20✔
25
  deferred,
20✔
26
  millis,
20✔
27
  nanos,
20✔
28
  RequestOne,
20✔
29
} from "@nats-io/nats-core/internal";
20✔
30
import type { DeliveryInfo, PullOptions } from "./jsapi_types.ts";
31

32
export const ACK = Uint8Array.of(43, 65, 67, 75);
20✔
33
const NAK = Uint8Array.of(45, 78, 65, 75);
20✔
34
const WPI = Uint8Array.of(43, 87, 80, 73);
20✔
35
const NXT = Uint8Array.of(43, 78, 88, 84);
20✔
36
const TERM = Uint8Array.of(43, 84, 69, 82, 77);
20✔
37
const SPACE = Uint8Array.of(32);
20✔
38

39
/**
40
 * Represents a message stored in JetStream
41
 */
42
export type JsMsg = {
43
  /**
44
   * True if the message was redelivered
45
   */
46
  redelivered: boolean;
47
  /**
48
   * The delivery info for the message
49
   */
50
  info: DeliveryInfo;
51
  /**
52
   * The sequence number for the message
53
   */
54
  seq: number;
55
  /**
56
   * Any headers associated with the message
57
   */
58
  headers: MsgHdrs | undefined;
59
  /**
60
   * The message's data
61
   */
62
  data: Uint8Array;
63
  /**
64
   * The subject on which the message was published
65
   */
66
  subject: string;
67
  /**
68
   * @ignore
69
   */
70
  sid: number;
71

72
  /**
73
   * The time the message was received
74
   */
75
  time: Date;
76

77
  /**
78
   * The time the message was received as an ISO formatted date string
79
   */
80
  timestamp: string;
81

82
  /**
83
   * Represents the message timestamp in nanoseconds as a BigInt.
84
   */
85
  timestampNanos: bigint;
86

87
  /**
88
   * Indicate to the JetStream server that the message was processed
89
   * successfully.
90
   */
91
  ack(): void;
92

93
  /**
94
   * Indicate to the JetStream server that processing of the message
95
   * failed, and that it should be resent after the specified number of
96
   * milliseconds.
97
   * @param millis
98
   */
99
  nak(millis?: number): void;
100

101
  /**
102
   * Indicate to the JetStream server that processing of the message
103
   * is on going, and that the ack wait timer for the message should be
104
   * reset preventing a redelivery.
105
   */
106
  working(): void;
107

108
  /**
109
   * !! this is an experimental feature - and could be removed
110
   *
111
   * next() combines ack() and pull(), requires the subject for a
112
   * subscription processing to process a message is provided
113
   * (can be the same) however, because the ability to specify
114
   * how long to keep the request open can be specified, this
115
   * functionality doesn't work well with iterators, as an error
116
   * (408s) are expected and needed to re-trigger a pull in case
117
   * there was a timeout. In an iterator, the error will close
118
   * the iterator, requiring a subscription to be reset.
119
   */
120
  next(subj: string, ro?: Partial<PullOptions>): void;
121

122
  /**
123
   * Indicate to the JetStream server that processing of the message
124
   * failed and that the message should not be sent to the consumer again.
125
   * @param reason is a string describing why the message was termed. Note
126
   * that `reason` is only available on servers 2.11.0 or better.
127
   */
128
  term(reason?: string): void;
129

130
  /**
131
   * Indicate to the JetStream server that the message was processed
132
   * successfully and that the JetStream server should acknowledge back
133
   * that the acknowledgement was received.
134
   * @param opts are optional options (currently only a timeout value
135
   * if not specified uses the timeout specified in the JetStreamOptions
136
   * when creating the JetStream context.
137
   */
138
  ackAck(opts?: Partial<{ timeout: number }>): Promise<boolean>;
139

140
  /**
141
   * Convenience method to parse the message payload as JSON. This method
142
   * will throw an exception if there's a parsing error;
143
   */
144
  json<T>(): T;
145

146
  /**
147
   * Convenience method to parse the message payload as string. This method
148
   * may throw an exception if there's a conversion error
149
   */
150
  string(): string;
151
};
152

153
export function toJsMsg(m: Msg, ackTimeout = 5000): JsMsg {
20✔
154
  return new JsMsgImpl(m, ackTimeout);
5,144✔
155
}
5,144✔
156

157
export function parseInfo(s: string): DeliveryInfo {
20✔
158
  const tokens = s.split(".");
3,985✔
159
  if (tokens.length === 9) {
3,985✔
160
    tokens.splice(2, 0, "_", "");
7,528✔
161
  }
7,528✔
162

UNCOV
163
  if (
3,575✔
UNCOV
164
    (tokens.length < 11) || tokens[0] !== "$JS" || tokens[1] !== "ACK"
3,575✔
UNCOV
165
  ) {
3,575!
UNCOV
166
    throw new Error(`unable to parse delivery info - not a jetstream message`);
7,132✔
UNCOV
167
  }
7,132✔
168

169
  // old
170
  // "$JS.ACK.<stream>.<consumer>.<deliveryCount><streamSeq><deliverySequence>.<timestamp>.<pending>"
171
  // new
172
  // $JS.ACK.<domain>.<accounthash>.<stream>.<consumer>.<deliveryCount>.<streamSeq>.<deliverySequence>.<timestamp>.<pending>.<random>
173
  const di = {} as DeliveryInfo;
7,533✔
174
  // if domain is "_", replace with blank
UNCOV
175
  di.domain = tokens[2] === "_" ? "" : tokens[2];
3,575!
176
  di.account_hash = tokens[3];
3,985✔
177
  di.stream = tokens[4];
3,985✔
178
  di.consumer = tokens[5];
3,985✔
179
  di.deliveryCount = parseInt(tokens[6], 10);
3,985✔
180
  di.redelivered = di.deliveryCount > 1;
3,985✔
181
  di.streamSequence = parseInt(tokens[7], 10);
3,985✔
182
  di.deliverySequence = parseInt(tokens[8], 10);
3,985✔
183
  di.timestampNanos = parseInt(tokens[9], 10);
3,985✔
184
  di.pending = parseInt(tokens[10], 10);
3,985✔
185
  return di;
3,985✔
186
}
3,985✔
187

188
function parseTimestampNanos(s: string): bigint {
×
189
  const tokens = s.split(".");
×
190
  if (tokens.length === 9) {
×
191
    tokens.splice(2, 0, "_", "");
×
192
  }
×
193

194
  if (
×
195
    (tokens.length < 11) || tokens[0] !== "$JS" || tokens[1] !== "ACK"
×
196
  ) {
×
197
    throw new Error(`unable to parse delivery info - not a jetstream message`);
×
198
  }
×
199
  return BigInt(tokens[9]);
×
200
}
×
201

202
export class JsMsgImpl implements JsMsg {
20✔
203
  msg: Msg;
20✔
204
  di?: DeliveryInfo;
5,144✔
205
  didAck: boolean;
5,144✔
206
  timeout: number;
20✔
207

208
  constructor(msg: Msg, timeout: number) {
20✔
209
    this.msg = msg;
5,144✔
210
    this.didAck = false;
5,144✔
211
    this.timeout = timeout;
5,144✔
212
  }
5,144✔
213

UNCOV
214
  get subject(): string {
19✔
UNCOV
215
    return this.msg.subject;
275✔
UNCOV
216
  }
275✔
217

UNCOV
218
  get sid(): number {
18✔
UNCOV
219
    return this.msg.sid;
19✔
UNCOV
220
  }
19✔
221

222
  get data(): Uint8Array {
20✔
223
    return this.msg.data;
497✔
224
  }
497✔
225

UNCOV
226
  get headers(): MsgHdrs {
19✔
UNCOV
227
    return this.msg.headers!;
98✔
UNCOV
228
  }
98✔
229

230
  get info(): DeliveryInfo {
20✔
231
    if (!this.di) {
5,895✔
232
      this.di = parseInfo(this.reply);
9,844✔
233
    }
9,844✔
234
    return this.di;
5,895✔
235
  }
5,895✔
236

UNCOV
237
  get redelivered(): boolean {
18✔
UNCOV
238
    return this.info.deliveryCount > 1;
24✔
UNCOV
239
  }
24✔
240

241
  get reply(): string {
20✔
242
    return this.msg.reply || "";
×
243
  }
3,970✔
244

UNCOV
245
  get seq(): number {
19✔
UNCOV
246
    return this.info.streamSequence;
1,177✔
UNCOV
247
  }
1,177✔
248

UNCOV
249
  get time(): Date {
18✔
UNCOV
250
    const ms = millis(this.info.timestampNanos);
20✔
UNCOV
251
    return new Date(ms);
20✔
UNCOV
252
  }
20✔
253

UNCOV
254
  get timestamp(): string {
18✔
UNCOV
255
    return this.time.toISOString();
19✔
UNCOV
256
  }
19✔
257

258
  get timestampNanos(): bigint {
×
259
    return parseTimestampNanos(this.reply);
×
260
  }
×
261

UNCOV
262
  doAck(payload: Uint8Array) {
18✔
UNCOV
263
    if (!this.didAck) {
3,576✔
264
      // all acks are final with the exception of +WPI
UNCOV
265
      this.didAck = !this.isWIP(payload);
3,576✔
UNCOV
266
      this.msg.respond(payload);
3,576✔
UNCOV
267
    }
3,576✔
UNCOV
268
  }
3,576✔
269

UNCOV
270
  isWIP(p: Uint8Array) {
18✔
UNCOV
271
    return p.length === 4 && p[0] === WPI[0] && p[1] === WPI[1] &&
3,576✔
UNCOV
272
      p[2] === WPI[2] && p[3] === WPI[3];
3,576✔
UNCOV
273
  }
3,576✔
274

275
  // this has to dig into the internals as the message has access
276
  // to the protocol but not the high-level client.
UNCOV
277
  async ackAck(opts?: Partial<{ timeout: number }>): Promise<boolean> {
18✔
UNCOV
278
    const d = deferred<boolean>();
34✔
UNCOV
279
    if (!this.didAck) {
34✔
UNCOV
280
      this.didAck = true;
48✔
UNCOV
281
      if (this.msg.reply) {
48✔
UNCOV
282
        opts = opts || {};
48✔
UNCOV
283
        opts.timeout = opts.timeout || this.timeout;
48✔
UNCOV
284
        const mi = this.msg as MsgImpl;
48✔
UNCOV
285
        const proto = mi.publisher as unknown as ProtocolHandler;
48✔
UNCOV
286
        const trace = !(proto.options?.noAsyncTraces || false);
48✔
UNCOV
287
        const r = new RequestOne(proto.muxSubscriptions, this.msg.reply, {
48✔
UNCOV
288
          timeout: opts.timeout,
48✔
UNCOV
289
        }, trace);
48✔
UNCOV
290
        proto.request(r);
48✔
UNCOV
291
        try {
48✔
UNCOV
292
          proto.publish(
48✔
UNCOV
293
            this.msg.reply,
48✔
UNCOV
294
            ACK,
48✔
UNCOV
295
            {
48✔
UNCOV
296
              reply: `${proto.muxSubscriptions.baseInbox}${r.token}`,
48✔
UNCOV
297
            },
48✔
298
          );
299
        } catch (err) {
×
300
          r.cancel(err as Error);
×
301
        }
×
UNCOV
302
        try {
48✔
UNCOV
303
          await Promise.race([r.timer, r.deferred]);
192✔
UNCOV
304
          d.resolve(true);
58✔
UNCOV
305
        } catch (err) {
48✔
UNCOV
306
          r.cancel(err as Error);
52✔
UNCOV
307
          d.reject(err);
52✔
UNCOV
308
        }
52✔
309
      } else {
×
310
        d.resolve(false);
×
311
      }
×
UNCOV
312
    } else {
34✔
UNCOV
313
      d.resolve(false);
36✔
UNCOV
314
    }
36✔
UNCOV
315
    return d;
34✔
UNCOV
316
  }
34✔
317

UNCOV
318
  ack() {
18✔
UNCOV
319
    this.doAck(ACK);
3,470✔
UNCOV
320
  }
3,470✔
321

UNCOV
322
  nak(millis?: number) {
18✔
UNCOV
323
    let payload: Uint8Array | string = NAK;
121✔
UNCOV
324
    if (millis) {
121✔
UNCOV
325
      payload = new TextEncoder().encode(
122✔
UNCOV
326
        `-NAK ${JSON.stringify({ delay: nanos(millis) })}`,
366✔
327
      );
UNCOV
328
    }
122✔
UNCOV
329
    this.doAck(payload);
121✔
UNCOV
330
  }
121✔
331

UNCOV
332
  working() {
18✔
UNCOV
333
    this.doAck(WPI);
19✔
UNCOV
334
  }
19✔
335

336
  next(subj: string, opts: Partial<PullOptions> = { batch: 1 }) {
×
337
    const args: Partial<PullOptions> = {};
×
338
    args.batch = opts.batch || 1;
×
339
    args.no_wait = opts.no_wait || false;
×
340
    if (opts.expires && opts.expires > 0) {
×
341
      args.expires = nanos(opts.expires);
×
342
    }
×
343
    const data = new TextEncoder().encode(JSON.stringify(args));
×
344
    const payload = DataBuffer.concat(NXT, SPACE, data);
×
345
    const reqOpts = subj ? { reply: subj } as RequestOptions : undefined;
×
346
    this.msg.respond(payload, reqOpts);
×
347
  }
×
348

UNCOV
349
  term(reason = "") {
18✔
UNCOV
350
    let term = TERM;
20✔
UNCOV
351
    if (reason?.length > 0) {
20✔
UNCOV
352
      term = new TextEncoder().encode(`+TERM ${reason}`) as Uint8Array<
21✔
353
        ArrayBuffer
354
      >;
UNCOV
355
    }
21✔
UNCOV
356
    this.doAck(term);
20✔
UNCOV
357
  }
20✔
358

359
  json<T = unknown>(): T {
19✔
360
    return this.msg.json();
64✔
361
  }
64✔
362

UNCOV
363
  string(): string {
19✔
UNCOV
364
    return this.msg.string();
54✔
UNCOV
365
  }
54✔
366
}
20✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc