• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.js / 13091454488

01 Feb 2025 06:48PM UTC coverage: 82.681% (-0.05%) from 82.727%
13091454488

push

github

web-flow
fix(jetstream): direct batch get missing `next_by_subj` filter (#201)

Signed-off-by: Alberto Ricart <alberto@synadia.com>

2272 of 3100 branches covered (73.29%)

Branch coverage included in aggregate %.

9625 of 11289 relevant lines covered (85.26%)

787241.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.79
/jetstream/src/jsmsg.ts
1
/*
2
 * Copyright 2021-2024 The NATS Authors
3
 * Licensed under the Apache License, Version 2.0 (the "License");
4
 * you may not use this file except in compliance with the License.
5
 * You may obtain a copy of the License at
6
 *
7
 * http://www.apache.org/licenses/LICENSE-2.0
8
 *
9
 * Unless required by applicable law or agreed to in writing, software
10
 * distributed under the License is distributed on an "AS IS" BASIS,
11
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
 * See the License for the specific language governing permissions and
13
 * limitations under the License.
14
 */
15

16
import type {
17
  Msg,
18
  MsgHdrs,
19
  MsgImpl,
20
  ProtocolHandler,
21
  RequestOptions,
22
} from "@nats-io/nats-core/internal";
23
import {
20✔
24
  DataBuffer,
20✔
25
  deferred,
20✔
26
  millis,
20✔
27
  nanos,
20✔
28
  RequestOne,
20✔
29
} from "@nats-io/nats-core/internal";
20✔
30
import type { DeliveryInfo, PullOptions } from "./jsapi_types.ts";
31

32
export const ACK = Uint8Array.of(43, 65, 67, 75);
20✔
33
const NAK = Uint8Array.of(45, 78, 65, 75);
20✔
34
const WPI = Uint8Array.of(43, 87, 80, 73);
20✔
35
const NXT = Uint8Array.of(43, 78, 88, 84);
20✔
36
const TERM = Uint8Array.of(43, 84, 69, 82, 77);
20✔
37
const SPACE = Uint8Array.of(32);
20✔
38

39
/**
40
 * Represents a message stored in JetStream
41
 */
42
export type JsMsg = {
43
  /**
44
   * True if the message was redelivered
45
   */
46
  redelivered: boolean;
47
  /**
48
   * The delivery info for the message
49
   */
50
  info: DeliveryInfo;
51
  /**
52
   * The sequence number for the message
53
   */
54
  seq: number;
55
  /**
56
   * Any headers associated with the message
57
   */
58
  headers: MsgHdrs | undefined;
59
  /**
60
   * The message's data
61
   */
62
  data: Uint8Array;
63
  /**
64
   * The subject on which the message was published
65
   */
66
  subject: string;
67
  /**
68
   * @ignore
69
   */
70
  sid: number;
71

72
  /**
73
   * The time the message was received
74
   */
75
  time: Date;
76

77
  /**
78
   * The time the message was received as an ISO formatted date string
79
   */
80
  timestamp: string;
81

82
  /**
83
   * Indicate to the JetStream server that the message was processed
84
   * successfully.
85
   */
86
  ack(): void;
87

88
  /**
89
   * Indicate to the JetStream server that processing of the message
90
   * failed, and that it should be resent after the specified number of
91
   * milliseconds.
92
   * @param millis
93
   */
94
  nak(millis?: number): void;
95

96
  /**
97
   * Indicate to the JetStream server that processing of the message
98
   * is on going, and that the ack wait timer for the message should be
99
   * reset preventing a redelivery.
100
   */
101
  working(): void;
102

103
  /**
104
   * !! this is an experimental feature - and could be removed
105
   *
106
   * next() combines ack() and pull(), requires the subject for a
107
   * subscription processing to process a message is provided
108
   * (can be the same) however, because the ability to specify
109
   * how long to keep the request open can be specified, this
110
   * functionality doesn't work well with iterators, as an error
111
   * (408s) are expected and needed to re-trigger a pull in case
112
   * there was a timeout. In an iterator, the error will close
113
   * the iterator, requiring a subscription to be reset.
114
   */
115
  next(subj: string, ro?: Partial<PullOptions>): void;
116

117
  /**
118
   * Indicate to the JetStream server that processing of the message
119
   * failed and that the message should not be sent to the consumer again.
120
   * @param reason is a string describing why the message was termed. Note
121
   * that `reason` is only available on servers 2.11.0 or better.
122
   */
123
  term(reason?: string): void;
124

125
  /**
126
   * Indicate to the JetStream server that the message was processed
127
   * successfully and that the JetStream server should acknowledge back
128
   * that the acknowledgement was received.
129
   * @param opts are optional options (currently only a timeout value
130
   * if not specified uses the timeout specified in the JetStreamOptions
131
   * when creating the JetStream context.
132
   */
133
  ackAck(opts?: Partial<{ timeout: number }>): Promise<boolean>;
134

135
  /**
136
   * Convenience method to parse the message payload as JSON. This method
137
   * will throw an exception if there's a parsing error;
138
   */
139
  json<T>(): T;
140

141
  /**
142
   * Convenience method to parse the message payload as string. This method
143
   * may throw an exception if there's a conversion error
144
   */
145
  string(): string;
146
};
147

148
export function toJsMsg(m: Msg, ackTimeout = 5000): JsMsg {
20✔
149
  return new JsMsgImpl(m, ackTimeout);
5,126✔
150
}
5,126✔
151

152
export function parseInfo(s: string): DeliveryInfo {
20✔
153
  const tokens = s.split(".");
3,982✔
154
  if (tokens.length === 9) {
3,982✔
155
    tokens.splice(2, 0, "_", "");
7,522✔
156
  }
7,522✔
157

158
  if (
3,572✔
159
    (tokens.length < 11) || tokens[0] !== "$JS" || tokens[1] !== "ACK"
3,572✔
160
  ) {
3,572!
161
    throw new Error(`unable to parse delivery info - not a jetstream message`);
7,126✔
162
  }
7,126✔
163

164
  // old
165
  // "$JS.ACK.<stream>.<consumer>.<deliveryCount><streamSeq><deliverySequence>.<timestamp>.<pending>"
166
  // new
167
  // $JS.ACK.<domain>.<accounthash>.<stream>.<consumer>.<deliveryCount>.<streamSeq>.<deliverySequence>.<timestamp>.<pending>.<random>
168
  const di = {} as DeliveryInfo;
7,527✔
169
  // if domain is "_", replace with blank
170
  di.domain = tokens[2] === "_" ? "" : tokens[2];
3,572!
171
  di.account_hash = tokens[3];
3,982✔
172
  di.stream = tokens[4];
3,982✔
173
  di.consumer = tokens[5];
3,982✔
174
  di.deliveryCount = parseInt(tokens[6], 10);
3,982✔
175
  di.redelivered = di.deliveryCount > 1;
3,982✔
176
  di.streamSequence = parseInt(tokens[7], 10);
3,982✔
177
  di.deliverySequence = parseInt(tokens[8], 10);
3,982✔
178
  di.timestampNanos = parseInt(tokens[9], 10);
3,982✔
179
  di.pending = parseInt(tokens[10], 10);
3,982✔
180
  return di;
3,982✔
181
}
3,982✔
182

183
export class JsMsgImpl implements JsMsg {
20✔
184
  msg: Msg;
20✔
185
  di?: DeliveryInfo;
5,126✔
186
  didAck: boolean;
5,126✔
187
  timeout: number;
20✔
188

189
  constructor(msg: Msg, timeout: number) {
20✔
190
    this.msg = msg;
5,126✔
191
    this.didAck = false;
5,126✔
192
    this.timeout = timeout;
5,126✔
193
  }
5,126✔
194

195
  get subject(): string {
19✔
196
    return this.msg.subject;
272✔
197
  }
272✔
198

199
  get sid(): number {
18✔
200
    return this.msg.sid;
19✔
201
  }
19✔
202

203
  get data(): Uint8Array {
20✔
204
    return this.msg.data;
497✔
205
  }
497✔
206

207
  get headers(): MsgHdrs {
19✔
208
    return this.msg.headers!;
98✔
209
  }
98✔
210

211
  get info(): DeliveryInfo {
20✔
212
    if (!this.di) {
5,889✔
213
      this.di = parseInfo(this.reply);
9,835✔
214
    }
9,835✔
215
    return this.di;
5,889✔
216
  }
5,889✔
217

218
  get redelivered(): boolean {
18✔
219
    return this.info.deliveryCount > 1;
24✔
220
  }
24✔
221

222
  get reply(): string {
20✔
223
    return this.msg.reply || "";
×
224
  }
3,967✔
225

226
  get seq(): number {
19✔
227
    return this.info.streamSequence;
1,177✔
228
  }
1,177✔
229

230
  get time(): Date {
18✔
231
    const ms = millis(this.info.timestampNanos);
20✔
232
    return new Date(ms);
20✔
233
  }
20✔
234

235
  get timestamp(): string {
18✔
236
    return this.time.toISOString();
19✔
237
  }
19✔
238

239
  doAck(payload: Uint8Array) {
18✔
240
    if (!this.didAck) {
3,569✔
241
      // all acks are final with the exception of +WPI
242
      this.didAck = !this.isWIP(payload);
3,569✔
243
      this.msg.respond(payload);
3,569✔
244
    }
3,569✔
245
  }
3,569✔
246

247
  isWIP(p: Uint8Array) {
18✔
248
    return p.length === 4 && p[0] === WPI[0] && p[1] === WPI[1] &&
3,569✔
249
      p[2] === WPI[2] && p[3] === WPI[3];
3,569✔
250
  }
3,569✔
251

252
  // this has to dig into the internals as the message has access
253
  // to the protocol but not the high-level client.
254
  async ackAck(opts?: Partial<{ timeout: number }>): Promise<boolean> {
18✔
255
    const d = deferred<boolean>();
34✔
256
    if (!this.didAck) {
34✔
257
      this.didAck = true;
48✔
258
      if (this.msg.reply) {
48✔
259
        opts = opts || {};
48✔
260
        opts.timeout = opts.timeout || this.timeout;
48✔
261
        const mi = this.msg as MsgImpl;
48✔
262
        const proto = mi.publisher as unknown as ProtocolHandler;
48✔
263
        const trace = !(proto.options?.noAsyncTraces || false);
48✔
264
        const r = new RequestOne(proto.muxSubscriptions, this.msg.reply, {
48✔
265
          timeout: opts.timeout,
48✔
266
        }, trace);
48✔
267
        proto.request(r);
48✔
268
        try {
48✔
269
          proto.publish(
48✔
270
            this.msg.reply,
48✔
271
            ACK,
48✔
272
            {
48✔
273
              reply: `${proto.muxSubscriptions.baseInbox}${r.token}`,
48✔
274
            },
48✔
275
          );
276
        } catch (err) {
×
277
          r.cancel(err as Error);
×
278
        }
×
279
        try {
48✔
280
          await Promise.race([r.timer, r.deferred]);
192✔
281
          d.resolve(true);
58✔
282
        } catch (err) {
48✔
283
          r.cancel(err as Error);
52✔
284
          d.reject(err);
52✔
285
        }
52✔
286
      } else {
×
287
        d.resolve(false);
×
288
      }
×
289
    } else {
34✔
290
      d.resolve(false);
36✔
291
    }
36✔
292
    return d;
34✔
293
  }
34✔
294

295
  ack() {
18✔
296
    this.doAck(ACK);
3,464✔
297
  }
3,464✔
298

299
  nak(millis?: number) {
18✔
300
    let payload: Uint8Array | string = NAK;
121✔
301
    if (millis) {
121✔
302
      payload = new TextEncoder().encode(
122✔
303
        `-NAK ${JSON.stringify({ delay: nanos(millis) })}`,
366✔
304
      );
305
    }
122✔
306
    this.doAck(payload);
121✔
307
  }
121✔
308

309
  working() {
18✔
310
    this.doAck(WPI);
19✔
311
  }
19✔
312

313
  next(subj: string, opts: Partial<PullOptions> = { batch: 1 }) {
×
314
    const args: Partial<PullOptions> = {};
×
315
    args.batch = opts.batch || 1;
×
316
    args.no_wait = opts.no_wait || false;
×
317
    if (opts.expires && opts.expires > 0) {
×
318
      args.expires = nanos(opts.expires);
×
319
    }
×
320
    const data = new TextEncoder().encode(JSON.stringify(args));
×
321
    const payload = DataBuffer.concat(NXT, SPACE, data);
×
322
    const reqOpts = subj ? { reply: subj } as RequestOptions : undefined;
×
323
    this.msg.respond(payload, reqOpts);
×
324
  }
×
325

326
  term(reason = "") {
18✔
327
    let term = TERM;
19✔
328
    if (reason?.length > 0) {
×
329
      term = new TextEncoder().encode(`+TERM ${reason}`);
×
330
    }
×
331
    this.doAck(term);
19✔
332
  }
19✔
333

334
  json<T = unknown>(): T {
19✔
335
    return this.msg.json();
64✔
336
  }
64✔
337

338
  string(): string {
19✔
339
    return this.msg.string();
54✔
340
  }
54✔
341
}
20✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc