• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.js / 13091454488

01 Feb 2025 06:48PM UTC coverage: 82.681% (-0.05%) from 82.727%
13091454488

push

github

web-flow
fix(jetstream): direct batch get missing `next_by_subj` filter (#201)

Signed-off-by: Alberto Ricart <alberto@synadia.com>

2272 of 3100 branches covered (73.29%)

Branch coverage included in aggregate %.

9625 of 11289 relevant lines covered (85.26%)

787241.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.43
/jetstream/src/jsbaseclient_api.ts
1
/*
2
 * Copyright 2021-2023 The NATS Authors
3
 * Licensed under the Apache License, Version 2.0 (the "License");
4
 * you may not use this file except in compliance with the License.
5
 * You may obtain a copy of the License at
6
 *
7
 * http://www.apache.org/licenses/LICENSE-2.0
8
 *
9
 * Unless required by applicable law or agreed to in writing, software
10
 * distributed under the License is distributed on an "AS IS" BASIS,
11
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
 * See the License for the specific language governing permissions and
13
 * limitations under the License.
14
 */
15

16
import {
20✔
17
  backoff,
20✔
18
  delay,
20✔
19
  Empty,
20✔
20
  errors,
20✔
21
  extend,
20✔
22
  RequestError,
20✔
23
} from "@nats-io/nats-core/internal";
20✔
24
import type {
25
  Msg,
26
  NatsConnection,
27
  NatsConnectionImpl,
28
  RequestOptions,
29
} from "@nats-io/nats-core/internal";
30
import type { ApiResponse } from "./jsapi_types.ts";
31
import type { JetStreamOptions } from "./types.ts";
32
import {
20✔
33
  ConsumerNotFoundError,
20✔
34
  JetStreamApiCodes,
20✔
35
  JetStreamApiError,
20✔
36
  JetStreamNotEnabled,
20✔
37
  StreamNotFoundError,
20✔
38
} from "./jserrors.ts";
20✔
39

40
const defaultPrefix = "$JS.API";
20✔
41
const defaultTimeout = 5000;
20✔
42

43
export function defaultJsOptions(opts?: JetStreamOptions): JetStreamOptions {
20✔
44
  opts = opts || {} as JetStreamOptions;
1,981✔
45
  if (opts.domain) {
2,608✔
46
    opts.apiPrefix = `$JS.${opts.domain}.API`;
2,611✔
47
    delete opts.domain;
2,611✔
48
  }
2,611✔
49
  return extend({ apiPrefix: defaultPrefix, timeout: defaultTimeout }, opts);
11,616✔
50
}
2,904✔
51

52
export type StreamNames = {
53
  streams: string[];
54
};
55

56
export type StreamNameBySubject = {
57
  subject: string;
58
};
59

60
export class BaseApiClientImpl {
20✔
61
  nc: NatsConnectionImpl;
20✔
62
  opts: JetStreamOptions;
2,901✔
63
  prefix: string;
2,901✔
64
  timeout: number;
20✔
65

66
  constructor(nc: NatsConnection, opts?: JetStreamOptions) {
20✔
67
    this.nc = nc as NatsConnectionImpl;
2,901✔
68
    this.opts = defaultJsOptions(opts);
2,901✔
69
    this._parseOpts();
2,901✔
70
    this.prefix = this.opts.apiPrefix!;
2,901✔
71
    this.timeout = this.opts.timeout!;
2,901✔
72
  }
2,901✔
73

74
  getOptions(): JetStreamOptions {
19✔
75
    return Object.assign({}, this.opts);
83✔
76
  }
83✔
77

78
  _parseOpts() {
20✔
79
    let prefix = this.opts.apiPrefix;
2,901✔
80
    if (!prefix || prefix.length === 0) {
1,978✔
81
      throw errors.InvalidArgumentError.format("prefix", "cannot be empty");
1,979✔
82
    }
1,979✔
83
    const c = prefix[prefix.length - 1];
4,860✔
84
    if (c === ".") {
1,978!
85
      prefix = prefix.substr(0, prefix.length - 1);
1,981✔
86
    }
1,981✔
87
    this.opts.apiPrefix = prefix;
4,860✔
88
  }
2,901✔
89

90
  async _request(
20✔
91
    subj: string,
20✔
92
    data: unknown = null,
20✔
93
    opts?: Partial<RequestOptions> & { retries?: number },
20✔
94
  ): Promise<unknown> {
20✔
95
    opts = opts || {} as RequestOptions;
2,408✔
96
    opts.timeout = this.timeout;
2,408✔
97

98
    let a: Uint8Array = Empty;
2,408✔
99
    if (data) {
2,408✔
100
      a = new TextEncoder().encode(JSON.stringify(data));
3,814✔
101
    }
3,814✔
102

103
    let { retries } = opts as {
2,408✔
104
      retries: number;
105
    };
106

107
    retries = retries || 1;
2,408✔
108
    retries = retries === -1 ? Number.MAX_SAFE_INTEGER : retries;
×
109
    const bo = backoff();
2,408✔
110

111
    for (let i = 0; i < retries; i++) {
2,408✔
112
      try {
2,408✔
113
        const m = await this.nc.request(
2,408✔
114
          subj,
2,408✔
115
          a,
2,408✔
116
          opts as RequestOptions,
2,408✔
117
        );
118
        return this.parseJsResponse(m);
4,106✔
119
      } catch (err) {
2,408✔
120
        const re = err instanceof RequestError ? err as RequestError : null;
1,776!
121
        if (
×
122
          (err instanceof errors.TimeoutError || re?.isNoResponders()) &&
×
123
          i + 1 < retries
×
124
        ) {
×
125
          await delay(bo.backoff(i));
×
126
        } else {
×
127
          throw re?.isNoResponders()
1,776!
128
            ? new JetStreamNotEnabled("jetstream is not enabled", {
1,776✔
129
              cause: err,
1,777✔
130
            })
1,776✔
131
            : err;
1,776✔
132
        }
2,653✔
133
      }
2,653✔
134
    }
2,408!
135
  }
×
136

137
  async findStream(subject: string): Promise<string> {
18✔
138
    const q = { subject } as StreamNameBySubject;
63✔
139
    const r = await this._request(`${this.prefix}.STREAM.NAMES`, q);
21✔
140
    const names = r as StreamNames;
21✔
141
    if (!names.streams || names.streams.length !== 1) {
21✔
142
      throw StreamNotFoundError.fromMessage("no stream matches subject");
22✔
143
    }
22✔
144
    return names.streams[0];
23✔
145
  }
21✔
146

147
  getConnection(): NatsConnection {
×
148
    return this.nc;
×
149
  }
×
150

151
  parseJsResponse(m: Msg): unknown {
20✔
152
    const v = JSON.parse(new TextDecoder().decode(m.data));
122,345✔
153
    const r = v as ApiResponse;
122,345✔
154
    if (r.error) {
122,345✔
155
      switch (r.error.err_code) {
122,602✔
156
        case JetStreamApiCodes.ConsumerNotFound:
108,775✔
157
          throw new ConsumerNotFoundError(r.error);
108,792✔
158
        case JetStreamApiCodes.StreamNotFound:
122,602✔
159
          throw new StreamNotFoundError(r.error);
122,740✔
160
        case JetStreamApiCodes.JetStreamNotEnabledForAccount: {
217,551✔
161
          const jserr = new JetStreamApiError(r.error);
108,776✔
162
          throw new JetStreamNotEnabled(jserr.message, { cause: jserr });
326,328✔
163
        }
108,776✔
164
        default:
122,602✔
165
          throw new JetStreamApiError(r.error);
122,703✔
166
      }
122,602✔
167
    }
122,602✔
168
    return v;
244,412✔
169
  }
122,345✔
170
}
20✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc