• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.js / 14248085491

03 Apr 2025 04:30PM UTC coverage: 84.536% (+0.002%) from 84.534%
14248085491

push

github

web-flow
Additional hardening of the object store client (#256)

* Additional hardening of the object store client

- made obj put publish messages sequentially - as current server can drop requests when the producer is too fast. This will degrade put performance.

- changed the internal push consumer used on get to have an idle_heartbeat; if this fires, the get has stalled and the ordered consumer should reset.

- enabled flow control on the ordered consumer; this prevents slow consumers when the client is getting very large objects

Signed-off-by: Alberto Ricart <alberto@synadia.com>

* - hardening of the PullConsumer implementation, by default ordered consumers set flow control and idle_heartbeats. This enables the ordered consumer to self-recreate on `heartbeats_missed`, simplifying watcher/lister configurations.

- objectstore doesn't require listening to heartbeats_missed, since the pull consumer handles it automagically.

- objectstore watch now checks for `heartbeat` notifications - if the watcher is only listing history, this is a hint to stop, as all records have been processed.

Signed-off-by: Alberto Ricart <alberto@synadia.com>

---------

Signed-off-by: Alberto Ricart <alberto@synadia.com>

2370 of 3217 branches covered (73.67%)

Branch coverage included in aggregate %.

10 of 23 new or added lines in 3 files covered. (43.48%)

4552 existing lines in 36 files now uncovered.

10170 of 11617 relevant lines covered (87.54%)

768618.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.43
/jetstream/src/jsbaseclient_api.ts
1
/*
2
 * Copyright 2021-2023 The NATS Authors
3
 * Licensed under the Apache License, Version 2.0 (the "License");
4
 * you may not use this file except in compliance with the License.
5
 * You may obtain a copy of the License at
6
 *
7
 * http://www.apache.org/licenses/LICENSE-2.0
8
 *
9
 * Unless required by applicable law or agreed to in writing, software
10
 * distributed under the License is distributed on an "AS IS" BASIS,
11
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
 * See the License for the specific language governing permissions and
13
 * limitations under the License.
14
 */
15

16
import {
20✔
17
  backoff,
20✔
18
  delay,
20✔
19
  Empty,
20✔
20
  errors,
20✔
21
  extend,
20✔
22
  RequestError,
20✔
23
} from "@nats-io/nats-core/internal";
20✔
24
import type {
25
  Msg,
26
  NatsConnection,
27
  NatsConnectionImpl,
28
  RequestOptions,
29
} from "@nats-io/nats-core/internal";
30
import type { ApiResponse } from "./jsapi_types.ts";
31
import type { JetStreamOptions } from "./types.ts";
32
import {
20✔
33
  ConsumerNotFoundError,
20✔
34
  JetStreamApiCodes,
20✔
35
  JetStreamApiError,
20✔
36
  JetStreamNotEnabled,
20✔
37
  StreamNotFoundError,
20✔
38
} from "./jserrors.ts";
20✔
39

40
// Seed value for `apiPrefix` in the options object built by defaultJsOptions().
const defaultPrefix = "$JS.API";
20✔
41
// Seed value for `timeout` in the options object built by defaultJsOptions()
// (presumably milliseconds — TODO confirm against the request API).
const defaultTimeout = 5000;
20✔
42

43
/**
 * Builds a JetStreamOptions object, starting from the module defaults
 * (`apiPrefix` = defaultPrefix, `timeout` = defaultTimeout) and layering the
 * supplied options on top via `extend`.
 *
 * When `domain` is set, it is translated into an `apiPrefix` of the form
 * `$JS.<domain>.API` (replacing any supplied `apiPrefix`) and `domain` itself
 * is dropped from the result.
 *
 * The supplied object is not modified; all adjustments are made on a shallow
 * copy.
 *
 * @param opts optional caller-supplied options
 * @returns the object produced by `extend` from the defaults and the options
 */
export function defaultJsOptions(opts?: JetStreamOptions): JetStreamOptions {
  // Shallow copy: the domain -> apiPrefix rewrite below must not leak back
  // into the object owned by the caller.
  const o: JetStreamOptions = Object.assign({}, opts);
  if (o.domain) {
    o.apiPrefix = `$JS.${o.domain}.API`;
    delete o.domain;
  }
  return extend({ apiPrefix: defaultPrefix, timeout: defaultTimeout }, o);
}
3,093✔
51

52
/**
 * Shape that findStream() reads from the `<prefix>.STREAM.NAMES` response:
 * a list of stream names.
 */
export type StreamNames = {
53
  streams: string[];
54
};
55

56
/**
 * Request payload that findStream() sends to `<prefix>.STREAM.NAMES`:
 * the subject to look up.
 */
export type StreamNameBySubject = {
57
  subject: string;
58
};
59

60
/**
 * Base class for clients that issue JetStream API requests over a NATS
 * connection. It holds the connection, the resolved options (API prefix and
 * timeout), and provides the request/response plumbing that maps JetStream
 * API error payloads to typed errors.
 */
export class BaseApiClientImpl {
  nc: NatsConnectionImpl;
  opts: JetStreamOptions;
  prefix: string;
  timeout: number;

  /**
   * @param nc the connection used for all requests
   * @param opts optional JetStream options; defaults are filled in by
   *   defaultJsOptions()
   * @throws InvalidArgumentError (via _parseOpts) if the API prefix is empty
   */
  constructor(nc: NatsConnection, opts?: JetStreamOptions) {
    this.nc = nc as NatsConnectionImpl;
    this.opts = defaultJsOptions(opts);
    this._parseOpts();
    this.prefix = this.opts.apiPrefix!;
    this.timeout = this.opts.timeout!;
  }

  /**
   * @returns a shallow copy of the options in use by this client
   */
  getOptions(): JetStreamOptions {
    return Object.assign({}, this.opts);
  }

  /**
   * Normalizes `opts.apiPrefix`: a single trailing "." is removed, and the
   * result is stored back on `this.opts`.
   *
   * @throws InvalidArgumentError if the prefix is missing or empty, including
   *   the case where nothing is left after removing the trailing "."
   */
  _parseOpts(): void {
    let prefix = this.opts.apiPrefix ?? "";
    if (prefix.endsWith(".")) {
      prefix = prefix.slice(0, -1);
    }
    // Validate after stripping so that a prefix of "." is rejected as well
    // instead of silently becoming an empty prefix.
    if (prefix.length === 0) {
      throw errors.InvalidArgumentError.format("prefix", "cannot be empty");
    }
    this.opts.apiPrefix = prefix;
  }

  /**
   * Sends a request on `subj` and returns the parsed JetStream response.
   *
   * @param subj subject to send the request on
   * @param data payload; when truthy it is JSON-encoded, otherwise an empty
   *   payload is sent
   * @param opts request options. The `timeout` is always replaced by this
   *   client's timeout. `retries` is the number of attempts (default 1);
   *   `-1` means retry up to Number.MAX_SAFE_INTEGER times. The supplied
   *   object is not modified.
   * @returns the value produced by parseJsResponse()
   * @throws JetStreamNotEnabled when the final failure is a "no responders"
   *   RequestError; otherwise the error raised by the request or by
   *   parseJsResponse()
   */
  async _request(
    subj: string,
    data: unknown = null,
    opts?: Partial<RequestOptions> & { retries?: number },
  ): Promise<unknown> {
    // Build a fresh options object rather than writing the timeout into the
    // object owned by the caller.
    const ro = { ...opts, timeout: this.timeout };

    const payload: Uint8Array = data
      ? new TextEncoder().encode(JSON.stringify(data))
      : Empty;

    let retries = ro.retries || 1;
    // -1 requests (effectively) unlimited retries; anything else is clamped
    // so that at least one attempt is always made.
    retries = retries === -1 ? Number.MAX_SAFE_INTEGER : Math.max(1, retries);
    const bo = backoff();

    for (let i = 0; i < retries; i++) {
      try {
        const m = await this.nc.request(subj, payload, ro as RequestOptions);
        return this.parseJsResponse(m);
      } catch (err) {
        const noResponders = err instanceof RequestError &&
          err.isNoResponders();
        const retriable = err instanceof errors.TimeoutError || noResponders;
        if (retriable && i + 1 < retries) {
          // Timeouts and "no responders" are retried after a backoff while
          // attempts remain.
          await delay(bo.backoff(i));
        } else {
          throw noResponders
            ? new JetStreamNotEnabled("jetstream is not enabled", {
              cause: err,
            })
            : err;
        }
      }
    }
  }

  /**
   * Looks up the name of the stream that matches `subject`.
   *
   * @param subject subject to look up
   * @returns the single matching stream name
   * @throws StreamNotFoundError when the response does not contain exactly
   *   one stream name
   */
  async findStream(subject: string): Promise<string> {
    const q: StreamNameBySubject = { subject };
    const r = await this._request(`${this.prefix}.STREAM.NAMES`, q);
    const names = r as StreamNames;
    if (!names.streams || names.streams.length !== 1) {
      throw StreamNotFoundError.fromMessage("no stream matches subject");
    }
    return names.streams[0];
  }

  /**
   * @returns the connection this client was created with
   */
  getConnection(): NatsConnection {
    return this.nc;
  }

  /**
   * Decodes the message payload as JSON and returns it, unless the payload
   * carries an `error` field, in which case a typed error is thrown.
   *
   * @param m the response message
   * @returns the parsed JSON value
   * @throws ConsumerNotFoundError, StreamNotFoundError or JetStreamNotEnabled
   *   for the corresponding `err_code`; JetStreamApiError for any other
   *   error payload
   */
  parseJsResponse(m: Msg): unknown {
    const v = JSON.parse(new TextDecoder().decode(m.data));
    const r = v as ApiResponse;
    if (r.error) {
      switch (r.error.err_code) {
        case JetStreamApiCodes.ConsumerNotFound:
          throw new ConsumerNotFoundError(r.error);
        case JetStreamApiCodes.StreamNotFound:
          throw new StreamNotFoundError(r.error);
        case JetStreamApiCodes.JetStreamNotEnabledForAccount: {
          // Wrap the API error so callers see a JetStreamNotEnabled whose
          // cause is the original API error.
          const jserr = new JetStreamApiError(r.error);
          throw new JetStreamNotEnabled(jserr.message, { cause: jserr });
        }
        default:
          throw new JetStreamApiError(r.error);
      }
    }
    return v;
  }
}
20✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc