• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.js / 17808376885

17 Sep 2025 07:22PM UTC coverage: 84.865% (+1.1%) from 83.791%
17808376885

push

github

web-flow
chore: expose some internal types downstream (#318)

* chore: bump `@nats-io/jetstream` to `3.2.0-2` across modules

some internal types exposed to allow support for counters.
fixed some linter issues related to deno 2.25.x

Signed-off-by: Alberto Ricart <alberto@synadia.com>

1979 of 2556 branches covered (77.43%)

Branch coverage included in aggregate %.

7 of 7 new or added lines in 2 files covered. (100.0%)

65 existing lines in 7 files now uncovered.

8484 of 9773 relevant lines covered (86.81%)

886438.2 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.6
/jetstream/src/jsmsg.ts
1
/*
2
 * Copyright 2021-2024 The NATS Authors
3
 * Licensed under the Apache License, Version 2.0 (the "License");
4
 * you may not use this file except in compliance with the License.
5
 * You may obtain a copy of the License at
6
 *
7
 * http://www.apache.org/licenses/LICENSE-2.0
8
 *
9
 * Unless required by applicable law or agreed to in writing, software
10
 * distributed under the License is distributed on an "AS IS" BASIS,
11
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
 * See the License for the specific language governing permissions and
13
 * limitations under the License.
14
 */
15

16
import type {
17
  Msg,
18
  MsgHdrs,
19
  MsgImpl,
20
  ProtocolHandler,
21
  RequestOptions,
22
} from "@nats-io/nats-core/internal";
23
import {
19✔
24
  DataBuffer,
19✔
25
  deferred,
19✔
26
  millis,
19✔
27
  nanos,
19✔
28
  RequestOne,
19✔
29
} from "@nats-io/nats-core/internal";
19✔
30
import type { DeliveryInfo, PullOptions } from "./jsapi_types.ts";
31

32
// Protocol payloads sent on a message's reply subject. Each constant holds
// the ASCII bytes of the token shown in its comment.

/** ASCII bytes for `+ACK`. */
export const ACK = Uint8Array.of(43, 65, 67, 75);
/** ASCII bytes for `-NAK`. */
const NAK = Uint8Array.of(45, 78, 65, 75);
/** ASCII bytes for `+WPI`. */
const WPI = Uint8Array.of(43, 87, 80, 73);
/** ASCII bytes for `+NXT`. */
const NXT = Uint8Array.of(43, 78, 88, 84);
/** ASCII bytes for `+TERM`. */
const TERM = Uint8Array.of(43, 84, 69, 82, 77);
/** A single ASCII space, used to separate a token from its arguments. */
const SPACE = Uint8Array.of(32);
19✔
38

39
/**
 * Represents a message stored in JetStream
 */
export type JsMsg = {
  /**
   * True if the message was redelivered
   */
  redelivered: boolean;
  /**
   * The delivery info for the message
   */
  info: DeliveryInfo;
  /**
   * The sequence number for the message
   */
  seq: number;
  /**
   * Any headers associated with the message
   */
  headers: MsgHdrs | undefined;
  /**
   * The message's data
   */
  data: Uint8Array;
  /**
   * The subject on which the message was published
   */
  subject: string;
  /**
   * @ignore
   */
  sid: number;

  /**
   * The time the message was received
   */
  time: Date;

  /**
   * The time the message was received as an ISO formatted date string
   */
  timestamp: string;

  /**
   * Represents the message timestamp in nanoseconds as a BigInt.
   */
  timestampNanos: bigint;

  /**
   * Indicate to the JetStream server that the message was processed
   * successfully.
   */
  ack(): void;

  /**
   * Indicate to the JetStream server that processing of the message
   * failed, and that it should be resent after the specified number of
   * milliseconds.
   * @param millis optional redelivery delay in milliseconds
   */
  nak(millis?: number): void;

  /**
   * Indicate to the JetStream server that processing of the message
   * is ongoing, and that the ack wait timer for the message should be
   * reset, preventing a redelivery.
   */
  working(): void;

  /**
   * !! this is an experimental feature - and could be removed
   *
   * next() combines ack() and pull(). It requires the subject of a
   * subscription that will process the next message (it can be the same
   * subscription). Because the request can specify how long it is kept
   * open, this functionality doesn't work well with iterators: errors
   * (408s) are expected and needed to re-trigger a pull when a request
   * times out, but in an iterator such an error closes the iterator,
   * requiring the subscription to be reset.
   */
  next(subj: string, ro?: Partial<PullOptions>): void;

  /**
   * Indicate to the JetStream server that processing of the message
   * failed and that the message should not be sent to the consumer again.
   * @param reason is a string describing why the message was termed. Note
   * that `reason` is only available on servers 2.11.0 or better.
   */
  term(reason?: string): void;

  /**
   * Indicate to the JetStream server that the message was processed
   * successfully and that the JetStream server should acknowledge back
   * that the acknowledgement was received.
   * @param opts are optional options (currently only a timeout value; if
   * not specified, the timeout specified in the JetStreamOptions when
   * creating the JetStream context is used).
   */
  ackAck(opts?: Partial<{ timeout: number }>): Promise<boolean>;

  /**
   * Convenience method to parse the message payload as JSON. This method
   * will throw an exception if there's a parsing error.
   */
  json<T>(): T;

  /**
   * Convenience method to parse the message payload as string. This method
   * may throw an exception if there's a conversion error.
   */
  string(): string;
};
152

153
/**
 * Wraps a core message in a {@link JsMsg}.
 *
 * @param m the message to wrap
 * @param ackTimeout timeout handed to the `JsMsgImpl` constructor; defaults
 * to 5000 (presumably milliseconds - confirm against `RequestOne`)
 * @returns a new `JsMsgImpl` backed by `m`
 */
export function toJsMsg(m: Msg, ackTimeout = 5000): JsMsg {
  return new JsMsgImpl(m, ackTimeout);
}
5,085✔
156

157
/**
 * Splits an ack reply subject into tokens normalized to the current layout
 * and validates that it looks like a JetStream ack subject.
 *
 * old (9 tokens):
 * $JS.ACK.<stream>.<consumer>.<deliveryCount>.<streamSeq>.<deliverySequence>.<timestamp>.<pending>
 * new:
 * $JS.ACK.<domain>.<accounthash>.<stream>.<consumer>.<deliveryCount>.<streamSeq>.<deliverySequence>.<timestamp>.<pending>.<random>
 *
 * @param s the reply subject of a message
 * @returns the tokens, always indexed per the new layout
 * @throws Error if there are fewer than 11 tokens after normalization or the
 * subject doesn't start with `$JS.ACK`
 */
function ackTokens(s: string): string[] {
  const tokens = s.split(".");
  if (tokens.length === 9) {
    // old layout: insert a placeholder domain ("_") and an empty account
    // hash so that the indexes match the new layout
    tokens.splice(2, 0, "_", "");
  }

  if (
    (tokens.length < 11) || tokens[0] !== "$JS" || tokens[1] !== "ACK"
  ) {
    throw new Error(`unable to parse delivery info - not a jetstream message`);
  }
  return tokens;
}

/**
 * Parses the delivery info encoded in the reply subject of a JetStream
 * message.
 *
 * Numeric fields are parsed with `parseInt`. The nanosecond timestamp
 * normally exceeds `Number.MAX_SAFE_INTEGER`, so `timestampNanos` here may be
 * rounded; `parseTimestampNanos` returns the exact value as a bigint.
 *
 * @param s the reply subject of the message
 * @returns the parsed delivery info
 * @throws Error if `s` is not a JetStream ack subject
 */
export function parseInfo(s: string): DeliveryInfo {
  const tokens = ackTokens(s);

  const di = {} as DeliveryInfo;
  // if domain is "_", replace with blank
  di.domain = tokens[2] === "_" ? "" : tokens[2];
  di.account_hash = tokens[3];
  di.stream = tokens[4];
  di.consumer = tokens[5];
  di.deliveryCount = parseInt(tokens[6], 10);
  di.redelivered = di.deliveryCount > 1;
  di.streamSequence = parseInt(tokens[7], 10);
  di.deliverySequence = parseInt(tokens[8], 10);
  di.timestampNanos = parseInt(tokens[9], 10);
  di.pending = parseInt(tokens[10], 10);
  return di;
}

/**
 * Extracts the message timestamp from a JetStream ack reply subject without
 * losing precision.
 *
 * @param s the reply subject of the message
 * @returns the timestamp token as a bigint
 * @throws Error if `s` is not a JetStream ack subject
 */
function parseTimestampNanos(s: string): bigint {
  return BigInt(ackTokens(s)[9]);
}
×
201

202
/**
 * {@link JsMsg} implementation backed by a core `Msg`. Delivery metadata is
 * read from the message's reply subject, and acknowledgements are sent by
 * responding on that subject.
 */
export class JsMsgImpl implements JsMsg {
  /** The wrapped core message. */
  msg: Msg;
  /** Delivery info parsed from the reply subject; set on first `info` read. */
  di?: DeliveryInfo;
  /** True once a final acknowledgement (anything but `+WPI`) has been sent. */
  didAck: boolean;
  /** Default timeout used by `ackAck` when the caller doesn't provide one. */
  timeout: number;

  /**
   * @param msg the core message to wrap
   * @param timeout default timeout for `ackAck`
   */
  constructor(msg: Msg, timeout: number) {
    this.msg = msg;
    this.didAck = false;
    this.timeout = timeout;
  }

  get subject(): string {
    return this.msg.subject;
  }

  get sid(): number {
    return this.msg.sid;
  }

  get data(): Uint8Array {
    return this.msg.data;
  }

  // NOTE(review): the non-null assertion hides that the wrapped message may
  // have no headers (JsMsg declares `MsgHdrs | undefined`).
  get headers(): MsgHdrs {
    return this.msg.headers!;
  }

  /** Delivery info parsed from the reply subject, cached after first use. */
  get info(): DeliveryInfo {
    if (!this.di) {
      this.di = parseInfo(this.reply);
    }
    return this.di;
  }

  get redelivered(): boolean {
    return this.info.deliveryCount > 1;
  }

  /** The reply subject of the wrapped message, or "" if it has none. */
  get reply(): string {
    return this.msg.reply || "";
  }

  get seq(): number {
    return this.info.streamSequence;
  }

  get time(): Date {
    const ms = millis(this.info.timestampNanos);
    return new Date(ms);
  }

  get timestamp(): string {
    return this.time.toISOString();
  }

  /** Timestamp token of the reply subject as a bigint (parsed on each read). */
  get timestampNanos(): bigint {
    return parseTimestampNanos(this.reply);
  }

  /**
   * Responds with the given ack payload unless a final acknowledgement was
   * already sent, in which case the call is a no-op.
   * @param payload the protocol payload to send
   */
  doAck(payload: Uint8Array): void {
    if (!this.didAck) {
      // all acks are final with the exception of +WPI
      this.didAck = !this.isWIP(payload);
      this.msg.respond(payload);
    }
  }

  /** Returns true if `p` is exactly the 4-byte `+WPI` payload. */
  isWIP(p: Uint8Array): boolean {
    return p.length === 4 && p[0] === WPI[0] && p[1] === WPI[1] &&
      p[2] === WPI[2] && p[3] === WPI[3];
  }

  /**
   * Sends `+ACK` as a request and waits for a response to it.
   *
   * this has to dig into the internals as the message has access
   * to the protocol but not the high-level client.
   *
   * @param opts optional timeout; a missing (or 0) timeout falls back to the
   * timeout given to the constructor. The object is only read, never
   * modified.
   * @returns resolves true when the request completes; resolves false if the
   * message was already acknowledged or has no reply subject; rejects with
   * the error raised while waiting for the request.
   */
  async ackAck(opts?: Partial<{ timeout: number }>): Promise<boolean> {
    const d = deferred<boolean>();
    if (this.didAck) {
      d.resolve(false);
      return d;
    }
    this.didAck = true;
    if (!this.msg.reply) {
      d.resolve(false);
      return d;
    }

    // resolve the timeout locally - writing it back into `opts` would leak
    // the default into the caller's object and fail on frozen objects
    const timeout = opts?.timeout || this.timeout;
    const mi = this.msg as MsgImpl;
    const proto = mi.publisher as unknown as ProtocolHandler;
    const trace = !(proto.options?.noAsyncTraces || false);
    const r = new RequestOne(proto.muxSubscriptions, this.msg.reply, {
      timeout,
    }, trace);
    proto.request(r);
    try {
      proto.publish(
        this.msg.reply,
        ACK,
        {
          reply: `${proto.muxSubscriptions.baseInbox}${r.token}`,
        },
      );
    } catch (err) {
      // a cancelled request is still awaited below
      r.cancel(err as Error);
    }
    try {
      await Promise.race([r.timer, r.deferred]);
      d.resolve(true);
    } catch (err) {
      r.cancel(err as Error);
      d.reject(err);
    }
    return d;
  }

  ack(): void {
    this.doAck(ACK);
  }

  /**
   * Sends `-NAK`. When `millis` is truthy the payload carries a JSON
   * `delay` computed with `nanos(millis)`.
   * @param millis optional redelivery delay
   */
  nak(millis?: number): void {
    let payload: Uint8Array = NAK;
    if (millis) {
      payload = new TextEncoder().encode(
        `-NAK ${JSON.stringify({ delay: nanos(millis) })}`,
      );
    }
    this.doAck(payload);
  }

  working(): void {
    this.doAck(WPI);
  }

  /**
   * Responds with `+NXT` followed by a JSON encoded pull request, using
   * `subj` (when not empty) as the reply subject of the response.
   * Note that this responds directly: it neither checks nor sets `didAck`.
   * @param subj subject set as the reply of the response
   * @param opts pull options; `batch` defaults to 1, `no_wait` to false, and
   * a positive `expires` is converted with `nanos`
   */
  next(subj: string, opts: Partial<PullOptions> = { batch: 1 }): void {
    const args: Partial<PullOptions> = {};
    args.batch = opts.batch || 1;
    args.no_wait = opts.no_wait || false;
    if (opts.expires && opts.expires > 0) {
      args.expires = nanos(opts.expires);
    }
    const data = new TextEncoder().encode(JSON.stringify(args));
    const payload = DataBuffer.concat(NXT, SPACE, data);
    const reqOpts = subj ? { reply: subj } as RequestOptions : undefined;
    this.msg.respond(payload, reqOpts);
  }

  /**
   * Sends `+TERM`, followed by a space and `reason` when one is given.
   * @param reason optional description of why the message was terminated
   */
  term(reason = ""): void {
    // declared as plain Uint8Array so that both the constant and the encoder
    // output are assignable without a cast
    let payload: Uint8Array = TERM;
    if (reason?.length > 0) {
      payload = new TextEncoder().encode(`+TERM ${reason}`);
    }
    this.doAck(payload);
  }

  json<T = unknown>(): T {
    return this.msg.json();
  }

  string(): string {
    return this.msg.string();
  }
}
19✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc