• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.js / 16152724641

08 Jul 2025 07:36PM UTC coverage: 70.4% (-14.3%) from 84.715%
16152724641

push

github

aricart
chore: bump versions across modules and update dependencies

- Updated versions for NATS modules, including core, JetStream, KV, object store, services, and transport.
- Upgraded `@types/node`, `shx`, `typescript`, and `js-sha256` dependencies.

Signed-off-by: Alberto Ricart <alberto@synadia.com>

1496 of 2225 branches covered (67.24%)

Branch coverage included in aggregate %.

2 of 2 new or added lines in 2 files covered. (100.0%)

248 existing lines in 7 files now uncovered.

7882 of 11096 relevant lines covered (71.03%)

236642.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.89
/jetstream/src/jsclient.ts
1
/*
2
 * Copyright 2022-2024 The NATS Authors
3
 * Licensed under the Apache License, Version 2.0 (the "License");
4
 * you may not use this file except in compliance with the License.
5
 * You may obtain a copy of the License at
6
 *
7
 * http://www.apache.org/licenses/LICENSE-2.0
8
 *
9
 * Unless required by applicable law or agreed to in writing, software
10
 * distributed under the License is distributed on an "AS IS" BASIS,
11
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
 * See the License for the specific language governing permissions and
13
 * limitations under the License.
14
 */
15

16
import { BaseApiClientImpl } from "./jsbaseclient_api.ts";
20✔
17
import { ConsumerAPIImpl } from "./jsmconsumer_api.ts";
20✔
18
import {
20✔
19
  backoff,
20✔
20
  delay,
20✔
21
  Empty,
20✔
22
  QueuedIteratorImpl,
20✔
23
} from "@nats-io/nats-core/internal";
20✔
24

25
import { ConsumersImpl, StreamAPIImpl, StreamsImpl } from "./jsmstream_api.ts";
20✔
26

27
import type {
28
  Advisory,
29
  AdvisoryKind,
30
  ConsumerAPI,
31
  Consumers,
32
  DirectStreamAPI,
33
  JetStreamClient,
34
  JetStreamManager,
35
  JetStreamManagerOptions,
36
  JetStreamOptions,
37
  JetStreamPublishOptions,
38
  PubAck,
39
  StreamAPI,
40
  Streams,
41
} from "./types.ts";
42
import { errors, headers, RequestError } from "@nats-io/nats-core/internal";
20✔
43

44
import type {
45
  Msg,
46
  NatsConnection,
47
  Payload,
48
  RequestOptions,
49
} from "@nats-io/nats-core/internal";
50
import { PubHeaders } from "./jsapi_types.ts";
20✔
51
import type {
52
  AccountInfoResponse,
53
  ApiResponse,
54
  JetStreamAccountStats,
55
} from "./jsapi_types.ts";
56
import { JetStreamError, JetStreamNotEnabled } from "./jserrors.ts";
20✔
57
import { DirectStreamAPIImpl } from "./jsm_direct.ts";
20✔
58

59
/**
 * Normalizes the argument into a {@link JetStreamClient}.
 *
 * The argument is probed for an `nc` property: when that property is
 * `undefined` the argument is treated as a NatsConnection and handed to
 * {@link jetstream}; otherwise the argument itself is returned, typed as a
 * JetStreamClient.
 *
 * @param nc a NatsConnection or an existing JetStreamClient
 * @returns the argument itself, or a client created from it by `jetstream()`
 */
export function toJetStreamClient(
  nc: NatsConnection | JetStreamClient,
): JetStreamClient {
  //@ts-ignore: see if we have a nc
  const wrapped = nc.nc;
  return typeof wrapped === "undefined"
    ? jetstream(nc as NatsConnection)
    : nc as JetStreamClient;
}
138✔
68

69
/**
 * Creates a {@link JetStreamClient} on top of the specified NatsConnection.
 *
 * @param nc the connection handed to the client's constructor
 * @param opts options handed to the client's constructor; an empty object
 * when omitted
 * @returns a newly constructed {@link JetStreamClientImpl}
 */
export function jetstream(
  nc: NatsConnection,
  opts: JetStreamManagerOptions = {},
): JetStreamClient {
  const client: JetStreamClient = new JetStreamClientImpl(nc, opts);
  return client;
}
371✔
80

81
/**
 * Returns a {@link JetStreamManager} supported by the specified NatsConnection.
 *
 * Unless `opts.checkAPI` is explicitly `false`, the manager's
 * `getAccountInfo()` is awaited before the manager is returned; if that
 * request rejects, the rejection propagates to the caller unchanged.
 *
 * @param nc the connection handed to the manager's constructor
 * @param opts options handed to the manager's constructor; an empty object
 * when omitted
 * @returns the constructed {@link JetStreamManagerImpl}
 */
export async function jetstreamManager(
  nc: NatsConnection,
  opts: JetStreamOptions | JetStreamManagerOptions = {},
): Promise<JetStreamManager> {
  const adm = new JetStreamManagerImpl(nc, opts);
  // only an explicit `false` skips the probe - an absent checkAPI still checks
  if ((opts as JetStreamManagerOptions).checkAPI !== false) {
    // no try/catch: a catch that only rethrows adds nothing, the rejection
    // already surfaces through the returned promise
    await adm.getAccountInfo();
  }
  return adm;
}
543✔
100

101
/**
 * {@link JetStreamManager} implementation. Bundles the stream, consumer and
 * direct APIs, each constructed with the same connection and options that
 * are passed to the base client.
 */
export class JetStreamManagerImpl extends BaseApiClientImpl
  implements JetStreamManager {
  streams: StreamAPI;
  consumers: ConsumerAPI;
  direct: DirectStreamAPI;

  /**
   * @param nc the connection shared by the base client and the three APIs
   * @param opts options shared by the base client and the three APIs
   */
  constructor(nc: NatsConnection, opts?: JetStreamOptions) {
    super(nc, opts);
    this.streams = new StreamAPIImpl(nc, opts);
    this.consumers = new ConsumerAPIImpl(nc, opts);
    this.direct = new DirectStreamAPIImpl(nc, opts);
  }

  /**
   * Requests `<prefix>.INFO` and returns the response as account stats.
   */
  async getAccountInfo(): Promise<JetStreamAccountStats> {
    const r = await this._request(`${this.prefix}.INFO`);
    return r as AccountInfoResponse;
  }

  /**
   * Returns a {@link JetStreamClient} created from this manager's connection
   * and the options reported by `getOptions()`.
   */
  jetstream(): JetStreamClient {
    return jetstream(this.nc, this.getOptions());
  }

  /**
   * Subscribes to `$JS.EVENT.ADVISORY.>` and returns an iterator of the
   * advisories received on it.
   *
   * Each advisory's `kind` is the last dot-separated segment of the parsed
   * payload's `type`; `data` is the parsed payload itself. A subscription
   * error or a payload that fails to parse stops the iterator with that
   * error.
   *
   * NOTE(review): the subscription handle is not retained here, so nothing
   * in this method unsubscribes once the iterator is done - confirm whether
   * that is handled elsewhere.
   */
  advisories(): AsyncIterable<Advisory> {
    const iter = new QueuedIteratorImpl<Advisory>();
    this.nc.subscribe(`$JS.EVENT.ADVISORY.>`, {
      callback: (err, msg) => {
        if (err) {
          // throwing from inside the subscription callback never reaches the
          // code iterating the advisories; report the failure through the
          // iterator, the same way parse failures are reported below
          iter.stop(err);
          return;
        }
        try {
          const d = this.parseJsResponse(msg) as ApiResponse;
          const chunks = d.type.split(".");
          const kind = chunks[chunks.length - 1];
          iter.push({ kind: kind as AdvisoryKind, data: d });
        } catch (err) {
          iter.stop(err as Error);
        }
      },
    });

    return iter;
  }
}
20✔
144

145
/**
 * {@link JetStreamClient} implementation. Exposes consumer and stream
 * accessors built on top of the consumer/stream APIs, and publishes messages
 * that are acknowledged with a {@link PubAck}.
 */
export class JetStreamClientImpl extends BaseApiClientImpl
  implements JetStreamClient {
  consumers: Consumers;
  streams: Streams;
  consumerAPI: ConsumerAPI;
  streamAPI: StreamAPIImpl;

  /**
   * @param nc the connection shared by the base client and both APIs
   * @param opts options shared by the base client and both APIs
   */
  constructor(nc: NatsConnection, opts?: JetStreamOptions) {
    super(nc, opts);
    this.consumerAPI = new ConsumerAPIImpl(nc, opts);
    this.streamAPI = new StreamAPIImpl(nc, opts);
    this.consumers = new ConsumersImpl(this.consumerAPI);
    this.streams = new StreamsImpl(this.streamAPI);
  }

  /**
   * Creates a {@link JetStreamManager} on this client's connection, reusing
   * this client's options.
   *
   * @param checkAPI overrides the `checkAPI` option; when omitted the value
   * from this client's options is used
   */
  jetstreamManager(checkAPI?: boolean): Promise<JetStreamManager> {
    if (checkAPI === undefined) {
      checkAPI = (this.opts as JetStreamManagerOptions).checkAPI;
    }
    // copy so the override never lands on this client's own options
    const opts = Object.assign(
      {},
      this.opts,
      { checkAPI },
    ) as JetStreamManagerOptions;
    return jetstreamManager(this.nc, opts);
  }

  /** The API prefix used by this client. */
  get apiPrefix(): string {
    return this.prefix;
  }

  /**
   * Publishes a message with a request and returns the parsed {@link PubAck}.
   *
   * The `msgID`, `expect.*` and `ttl` options are translated into publish
   * headers. When `opts.headers` is supplied those headers are set directly
   * on that object, otherwise on a fresh headers instance.
   *
   * The request is attempted `opts.retries` times (at least once). An attempt
   * that fails with a timeout or a no-responders error is retried after a
   * backoff delay while attempts remain; any other failure is thrown
   * immediately.
   *
   * @param subj subject to publish to
   * @param data message payload; defaults to `Empty`
   * @param opts publish options; the object itself is not modified
   * @returns the ack, with `duplicate` normalized to `false` when not set
   * @throws JetStreamNotEnabled when the final failure is a no-responders
   * error (the original error is attached as `cause`)
   * @throws JetStreamError when the ack reports an empty stream name
   */
  async publish(
    subj: string,
    data: Payload = Empty,
    opts?: Partial<JetStreamPublishOptions>,
  ): Promise<PubAck> {
    // work on a shallow copy; spreading `undefined` yields an empty object
    opts = { ...opts };
    opts.expect = opts.expect || {};
    const mh = opts.headers || headers();

    if (opts.msgID) {
      mh.set(PubHeaders.MsgIdHdr, opts.msgID);
    }
    if (opts.expect.lastMsgID) {
      mh.set(PubHeaders.ExpectedLastMsgIdHdr, opts.expect.lastMsgID);
    }
    if (opts.expect.streamName) {
      mh.set(PubHeaders.ExpectedStreamHdr, opts.expect.streamName);
    }
    // typeof checks (not truthiness) so that a sequence of 0 is still sent
    if (typeof opts.expect.lastSequence === "number") {
      mh.set(PubHeaders.ExpectedLastSeqHdr, `${opts.expect.lastSequence}`);
    }
    if (typeof opts.expect.lastSubjectSequence === "number") {
      mh.set(
        PubHeaders.ExpectedLastSubjectSequenceHdr,
        `${opts.expect.lastSubjectSequence}`,
      );
    }
    if (opts.expect.lastSubjectSequenceSubject) {
      mh.set(
        PubHeaders.ExpectedLastSubjectSequenceSubjectHdr,
        opts.expect.lastSubjectSequenceSubject,
      );
    }
    if (opts.ttl) {
      mh.set(
        PubHeaders.MessageTTL,
        `${opts.ttl}`,
      );
    }

    const to = opts.timeout || this.timeout;
    const ro = {} as RequestOptions;
    if (to) {
      ro.timeout = to;
    }
    ro.headers = mh;

    const { retries } = opts as {
      retries: number;
    };
    // clamp to one attempt: a negative count would otherwise skip the loop
    // and leave no response to parse
    const attempts = Math.max(1, retries || 1);
    const bo = backoff();

    let r: Msg | null = null;
    for (let i = 0; i < attempts; i++) {
      try {
        r = await this.nc.request(subj, data, ro);
        // if here we succeeded
        break;
      } catch (err) {
        const re = err instanceof RequestError ? err as RequestError : null;
        if (
          (err instanceof errors.TimeoutError || re?.isNoResponders()) &&
          i + 1 < attempts
        ) {
          await delay(bo.backoff(i));
        } else {
          throw re?.isNoResponders()
            ? new JetStreamNotEnabled(`jetstream is not enabled`, {
              cause: err,
            })
            : err;
        }
      }
    }

    // the loop runs at least once and only falls through after a successful
    // request, so `r` is set here
    const pa = this.parseJsResponse(r!) as PubAck;
    if (pa.stream === "") {
      throw new JetStreamError("invalid ack response");
    }
    pa.duplicate = pa.duplicate || false;
    return pa;
  }
}
20✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc