• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.js / 13159645374

05 Feb 2025 02:30PM UTC coverage: 82.694% (+13.1%) from 69.574%
13159645374

push

github

web-flow
Add heartbeat handling to key iteration (#203)

* Add heartbeat detection to listing keys and history. In cases where the stream is purged as the client is making progress it could stall it as client is relying on numpending to signal out of the processing. By detecting a heartbeat, that means the server didn't have anything to send in the last 5s, providing a hint that we can use to signal that the operation is done.

Introduce a heartbeat case to handle key iteration status updates. This ensures proper key fetching and stops the iteration appropriately upon receiving a heartbeat.

* history for kv has the same issue - if values are purged in flight, the iteration may hang.

Signed-off-by: Alberto Ricart <alberto@synadia.com>

---------

Signed-off-by: Alberto Ricart <alberto@synadia.com>

2259 of 3072 branches covered (73.54%)

Branch coverage included in aggregate %.

4 of 22 new or added lines in 1 file covered. (18.18%)

2912 existing lines in 30 files now uncovered.

9634 of 11310 relevant lines covered (85.18%)

788186.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.79
/jetstream/src/jsmsg.ts
1
/*
2
 * Copyright 2021-2024 The NATS Authors
3
 * Licensed under the Apache License, Version 2.0 (the "License");
4
 * you may not use this file except in compliance with the License.
5
 * You may obtain a copy of the License at
6
 *
7
 * http://www.apache.org/licenses/LICENSE-2.0
8
 *
9
 * Unless required by applicable law or agreed to in writing, software
10
 * distributed under the License is distributed on an "AS IS" BASIS,
11
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
 * See the License for the specific language governing permissions and
13
 * limitations under the License.
14
 */
15

16
import type {
17
  Msg,
18
  MsgHdrs,
19
  MsgImpl,
20
  ProtocolHandler,
21
  RequestOptions,
22
} from "@nats-io/nats-core/internal";
23
import {
20✔
24
  DataBuffer,
20✔
25
  deferred,
20✔
26
  millis,
20✔
27
  nanos,
20✔
28
  RequestOne,
20✔
29
} from "@nats-io/nats-core/internal";
20✔
30
import type { DeliveryInfo, PullOptions } from "./jsapi_types.ts";
31

32
export const ACK = Uint8Array.of(43, 65, 67, 75);
20✔
33
const NAK = Uint8Array.of(45, 78, 65, 75);
20✔
34
const WPI = Uint8Array.of(43, 87, 80, 73);
20✔
35
const NXT = Uint8Array.of(43, 78, 88, 84);
20✔
36
const TERM = Uint8Array.of(43, 84, 69, 82, 77);
20✔
37
const SPACE = Uint8Array.of(32);
20✔
38

39
/**
40
 * Represents a message stored in JetStream
41
 */
42
export type JsMsg = {
43
  /**
44
   * True if the message was redelivered
45
   */
46
  redelivered: boolean;
47
  /**
48
   * The delivery info for the message
49
   */
50
  info: DeliveryInfo;
51
  /**
52
   * The sequence number for the message
53
   */
54
  seq: number;
55
  /**
56
   * Any headers associated with the message
57
   */
58
  headers: MsgHdrs | undefined;
59
  /**
60
   * The message's data
61
   */
62
  data: Uint8Array;
63
  /**
64
   * The subject on which the message was published
65
   */
66
  subject: string;
67
  /**
68
   * @ignore
69
   */
70
  sid: number;
71

72
  /**
73
   * The time the message was received
74
   */
75
  time: Date;
76

77
  /**
78
   * The time the message was received as an ISO formatted date string
79
   */
80
  timestamp: string;
81

82
  /**
83
   * Indicate to the JetStream server that the message was processed
84
   * successfully.
85
   */
86
  ack(): void;
87

88
  /**
89
   * Indicate to the JetStream server that processing of the message
90
   * failed, and that it should be resent after the specified number of
91
   * milliseconds.
92
   * @param millis
93
   */
94
  nak(millis?: number): void;
95

96
  /**
97
   * Indicate to the JetStream server that processing of the message
98
   * is on going, and that the ack wait timer for the message should be
99
   * reset preventing a redelivery.
100
   */
101
  working(): void;
102

103
  /**
104
   * !! this is an experimental feature - and could be removed
105
   *
106
   * next() combines ack() and pull(), requires the subject for a
107
   * subscription processing to process a message is provided
108
   * (can be the same) however, because the ability to specify
109
   * how long to keep the request open can be specified, this
110
   * functionality doesn't work well with iterators, as an error
111
   * (408s) are expected and needed to re-trigger a pull in case
112
   * there was a timeout. In an iterator, the error will close
113
   * the iterator, requiring a subscription to be reset.
114
   */
115
  next(subj: string, ro?: Partial<PullOptions>): void;
116

117
  /**
118
   * Indicate to the JetStream server that processing of the message
119
   * failed and that the message should not be sent to the consumer again.
120
   * @param reason is a string describing why the message was termed. Note
121
   * that `reason` is only available on servers 2.11.0 or better.
122
   */
123
  term(reason?: string): void;
124

125
  /**
126
   * Indicate to the JetStream server that the message was processed
127
   * successfully and that the JetStream server should acknowledge back
128
   * that the acknowledgement was received.
129
   * @param opts are optional options (currently only a timeout value
130
   * if not specified uses the timeout specified in the JetStreamOptions
131
   * when creating the JetStream context.
132
   */
133
  ackAck(opts?: Partial<{ timeout: number }>): Promise<boolean>;
134

135
  /**
136
   * Convenience method to parse the message payload as JSON. This method
137
   * will throw an exception if there's a parsing error;
138
   */
139
  json<T>(): T;
140

141
  /**
142
   * Convenience method to parse the message payload as string. This method
143
   * may throw an exception if there's a conversion error
144
   */
145
  string(): string;
146
};
147

148
export function toJsMsg(m: Msg, ackTimeout = 5000): JsMsg {
20✔
149
  return new JsMsgImpl(m, ackTimeout);
5,126✔
150
}
5,126✔
151

152
export function parseInfo(s: string): DeliveryInfo {
20✔
153
  const tokens = s.split(".");
3,982✔
154
  if (tokens.length === 9) {
3,982✔
155
    tokens.splice(2, 0, "_", "");
7,522✔
156
  }
7,522✔
157

UNCOV
158
  if (
3,572✔
UNCOV
159
    (tokens.length < 11) || tokens[0] !== "$JS" || tokens[1] !== "ACK"
3,572✔
UNCOV
160
  ) {
3,572!
UNCOV
161
    throw new Error(`unable to parse delivery info - not a jetstream message`);
7,126✔
UNCOV
162
  }
7,126✔
163

164
  // old
165
  // "$JS.ACK.<stream>.<consumer>.<deliveryCount><streamSeq><deliverySequence>.<timestamp>.<pending>"
166
  // new
167
  // $JS.ACK.<domain>.<accounthash>.<stream>.<consumer>.<deliveryCount>.<streamSeq>.<deliverySequence>.<timestamp>.<pending>.<random>
168
  const di = {} as DeliveryInfo;
7,527✔
169
  // if domain is "_", replace with blank
UNCOV
170
  di.domain = tokens[2] === "_" ? "" : tokens[2];
3,572!
171
  di.account_hash = tokens[3];
3,982✔
172
  di.stream = tokens[4];
3,982✔
173
  di.consumer = tokens[5];
3,982✔
174
  di.deliveryCount = parseInt(tokens[6], 10);
3,982✔
175
  di.redelivered = di.deliveryCount > 1;
3,982✔
176
  di.streamSequence = parseInt(tokens[7], 10);
3,982✔
177
  di.deliverySequence = parseInt(tokens[8], 10);
3,982✔
178
  di.timestampNanos = parseInt(tokens[9], 10);
3,982✔
179
  di.pending = parseInt(tokens[10], 10);
3,982✔
180
  return di;
3,982✔
181
}
3,982✔
182

183
export class JsMsgImpl implements JsMsg {
20✔
184
  msg: Msg;
20✔
185
  di?: DeliveryInfo;
5,126✔
186
  didAck: boolean;
5,126✔
187
  timeout: number;
20✔
188

189
  constructor(msg: Msg, timeout: number) {
20✔
190
    this.msg = msg;
5,126✔
191
    this.didAck = false;
5,126✔
192
    this.timeout = timeout;
5,126✔
193
  }
5,126✔
194

UNCOV
195
  get subject(): string {
19✔
UNCOV
196
    return this.msg.subject;
272✔
UNCOV
197
  }
272✔
198

UNCOV
199
  get sid(): number {
18✔
UNCOV
200
    return this.msg.sid;
19✔
UNCOV
201
  }
19✔
202

203
  get data(): Uint8Array {
20✔
204
    return this.msg.data;
497✔
205
  }
497✔
206

UNCOV
207
  get headers(): MsgHdrs {
19✔
UNCOV
208
    return this.msg.headers!;
98✔
UNCOV
209
  }
98✔
210

211
  get info(): DeliveryInfo {
20✔
212
    if (!this.di) {
5,889✔
213
      this.di = parseInfo(this.reply);
9,835✔
214
    }
9,835✔
215
    return this.di;
5,889✔
216
  }
5,889✔
217

UNCOV
218
  get redelivered(): boolean {
18✔
UNCOV
219
    return this.info.deliveryCount > 1;
24✔
UNCOV
220
  }
24✔
221

222
  get reply(): string {
20✔
223
    return this.msg.reply || "";
×
224
  }
3,967✔
225

UNCOV
226
  get seq(): number {
19✔
UNCOV
227
    return this.info.streamSequence;
1,177✔
UNCOV
228
  }
1,177✔
229

UNCOV
230
  get time(): Date {
18✔
UNCOV
231
    const ms = millis(this.info.timestampNanos);
20✔
UNCOV
232
    return new Date(ms);
20✔
UNCOV
233
  }
20✔
234

UNCOV
235
  get timestamp(): string {
18✔
UNCOV
236
    return this.time.toISOString();
19✔
UNCOV
237
  }
19✔
238

UNCOV
239
  doAck(payload: Uint8Array) {
18✔
UNCOV
240
    if (!this.didAck) {
3,569✔
241
      // all acks are final with the exception of +WPI
UNCOV
242
      this.didAck = !this.isWIP(payload);
3,569✔
UNCOV
243
      this.msg.respond(payload);
3,569✔
UNCOV
244
    }
3,569✔
UNCOV
245
  }
3,569✔
246

UNCOV
247
  isWIP(p: Uint8Array) {
18✔
UNCOV
248
    return p.length === 4 && p[0] === WPI[0] && p[1] === WPI[1] &&
3,569✔
UNCOV
249
      p[2] === WPI[2] && p[3] === WPI[3];
3,569✔
UNCOV
250
  }
3,569✔
251

252
  // this has to dig into the internals as the message has access
253
  // to the protocol but not the high-level client.
UNCOV
254
  async ackAck(opts?: Partial<{ timeout: number }>): Promise<boolean> {
18✔
UNCOV
255
    const d = deferred<boolean>();
34✔
UNCOV
256
    if (!this.didAck) {
34✔
UNCOV
257
      this.didAck = true;
48✔
UNCOV
258
      if (this.msg.reply) {
48✔
UNCOV
259
        opts = opts || {};
48✔
UNCOV
260
        opts.timeout = opts.timeout || this.timeout;
48✔
UNCOV
261
        const mi = this.msg as MsgImpl;
48✔
UNCOV
262
        const proto = mi.publisher as unknown as ProtocolHandler;
48✔
UNCOV
263
        const trace = !(proto.options?.noAsyncTraces || false);
48✔
UNCOV
264
        const r = new RequestOne(proto.muxSubscriptions, this.msg.reply, {
48✔
UNCOV
265
          timeout: opts.timeout,
48✔
UNCOV
266
        }, trace);
48✔
UNCOV
267
        proto.request(r);
48✔
UNCOV
268
        try {
48✔
UNCOV
269
          proto.publish(
48✔
UNCOV
270
            this.msg.reply,
48✔
UNCOV
271
            ACK,
48✔
UNCOV
272
            {
48✔
UNCOV
273
              reply: `${proto.muxSubscriptions.baseInbox}${r.token}`,
48✔
UNCOV
274
            },
48✔
275
          );
276
        } catch (err) {
×
277
          r.cancel(err as Error);
×
278
        }
×
UNCOV
279
        try {
48✔
UNCOV
280
          await Promise.race([r.timer, r.deferred]);
192✔
UNCOV
281
          d.resolve(true);
58✔
UNCOV
282
        } catch (err) {
48✔
UNCOV
283
          r.cancel(err as Error);
52✔
UNCOV
284
          d.reject(err);
52✔
UNCOV
285
        }
52✔
286
      } else {
×
287
        d.resolve(false);
×
288
      }
×
UNCOV
289
    } else {
34✔
UNCOV
290
      d.resolve(false);
36✔
UNCOV
291
    }
36✔
UNCOV
292
    return d;
34✔
UNCOV
293
  }
34✔
294

UNCOV
295
  ack() {
18✔
UNCOV
296
    this.doAck(ACK);
3,464✔
UNCOV
297
  }
3,464✔
298

UNCOV
299
  nak(millis?: number) {
18✔
UNCOV
300
    let payload: Uint8Array | string = NAK;
121✔
UNCOV
301
    if (millis) {
121✔
UNCOV
302
      payload = new TextEncoder().encode(
122✔
UNCOV
303
        `-NAK ${JSON.stringify({ delay: nanos(millis) })}`,
366✔
304
      );
UNCOV
305
    }
122✔
UNCOV
306
    this.doAck(payload);
121✔
UNCOV
307
  }
121✔
308

UNCOV
309
  working() {
18✔
UNCOV
310
    this.doAck(WPI);
19✔
UNCOV
311
  }
19✔
312

313
  next(subj: string, opts: Partial<PullOptions> = { batch: 1 }) {
×
314
    const args: Partial<PullOptions> = {};
×
315
    args.batch = opts.batch || 1;
×
316
    args.no_wait = opts.no_wait || false;
×
317
    if (opts.expires && opts.expires > 0) {
×
318
      args.expires = nanos(opts.expires);
×
319
    }
×
320
    const data = new TextEncoder().encode(JSON.stringify(args));
×
321
    const payload = DataBuffer.concat(NXT, SPACE, data);
×
322
    const reqOpts = subj ? { reply: subj } as RequestOptions : undefined;
×
323
    this.msg.respond(payload, reqOpts);
×
324
  }
×
325

UNCOV
326
  term(reason = "") {
18✔
UNCOV
327
    let term = TERM;
19✔
328
    if (reason?.length > 0) {
×
329
      term = new TextEncoder().encode(`+TERM ${reason}`);
×
330
    }
×
UNCOV
331
    this.doAck(term);
19✔
UNCOV
332
  }
19✔
333

334
  json<T = unknown>(): T {
19✔
335
    return this.msg.json();
64✔
336
  }
64✔
337

UNCOV
338
  string(): string {
19✔
UNCOV
339
    return this.msg.string();
54✔
UNCOV
340
  }
54✔
341
}
20✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc