• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.js / 13159645374

05 Feb 2025 02:30PM UTC coverage: 82.694% (+13.1%) from 69.574%
13159645374

push

github

web-flow
Add heartbeat handling to key iteration (#203)

* Add heartbeat detection to listing keys and history. In cases where the stream is purged as the client is making progress it could stall it as client is relying on numpending to signal out of the processing. By detecting a heartbeat, that means the server didn't have anything to send in the last 5s, providing a hint that we can use to signal that the operation is done.

Introduce a heartbeat case to handle key iteration status updates. This ensures proper key fetching and stops the iteration appropriately upon receiving a heartbeat.

* history for kv has the same issue - if values are purged in flight, the iteration may hang.

Signed-off-by: Alberto Ricart <alberto@synadia.com>

---------

Signed-off-by: Alberto Ricart <alberto@synadia.com>

2259 of 3072 branches covered (73.54%)

Branch coverage included in aggregate %.

4 of 22 new or added lines in 1 file covered. (18.18%)

2912 existing lines in 30 files now uncovered.

9634 of 11310 relevant lines covered (85.18%)

788186.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.43
/jetstream/src/jsbaseclient_api.ts
1
/*
2
 * Copyright 2021-2023 The NATS Authors
3
 * Licensed under the Apache License, Version 2.0 (the "License");
4
 * you may not use this file except in compliance with the License.
5
 * You may obtain a copy of the License at
6
 *
7
 * http://www.apache.org/licenses/LICENSE-2.0
8
 *
9
 * Unless required by applicable law or agreed to in writing, software
10
 * distributed under the License is distributed on an "AS IS" BASIS,
11
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
 * See the License for the specific language governing permissions and
13
 * limitations under the License.
14
 */
15

16
import {
20✔
17
  backoff,
20✔
18
  delay,
20✔
19
  Empty,
20✔
20
  errors,
20✔
21
  extend,
20✔
22
  RequestError,
20✔
23
} from "@nats-io/nats-core/internal";
20✔
24
import type {
25
  Msg,
26
  NatsConnection,
27
  NatsConnectionImpl,
28
  RequestOptions,
29
} from "@nats-io/nats-core/internal";
30
import type { ApiResponse } from "./jsapi_types.ts";
31
import type { JetStreamOptions } from "./types.ts";
32
import {
20✔
33
  ConsumerNotFoundError,
20✔
34
  JetStreamApiCodes,
20✔
35
  JetStreamApiError,
20✔
36
  JetStreamNotEnabled,
20✔
37
  StreamNotFoundError,
20✔
38
} from "./jserrors.ts";
20✔
39

40
const defaultPrefix = "$JS.API";
20✔
41
const defaultTimeout = 5000;
20✔
42

43
export function defaultJsOptions(opts?: JetStreamOptions): JetStreamOptions {
20✔
UNCOV
44
  opts = opts || {} as JetStreamOptions;
1,981✔
UNCOV
45
  if (opts.domain) {
2,608✔
UNCOV
46
    opts.apiPrefix = `$JS.${opts.domain}.API`;
2,611✔
UNCOV
47
    delete opts.domain;
2,611✔
UNCOV
48
  }
2,611✔
49
  return extend({ apiPrefix: defaultPrefix, timeout: defaultTimeout }, opts);
11,616✔
50
}
2,904✔
51

52
export type StreamNames = {
53
  streams: string[];
54
};
55

56
export type StreamNameBySubject = {
57
  subject: string;
58
};
59

60
export class BaseApiClientImpl {
20✔
61
  nc: NatsConnectionImpl;
20✔
62
  opts: JetStreamOptions;
2,901✔
63
  prefix: string;
2,901✔
64
  timeout: number;
20✔
65

66
  constructor(nc: NatsConnection, opts?: JetStreamOptions) {
20✔
67
    this.nc = nc as NatsConnectionImpl;
2,901✔
68
    this.opts = defaultJsOptions(opts);
2,901✔
69
    this._parseOpts();
2,901✔
70
    this.prefix = this.opts.apiPrefix!;
2,901✔
71
    this.timeout = this.opts.timeout!;
2,901✔
72
  }
2,901✔
73

74
  getOptions(): JetStreamOptions {
19✔
75
    return Object.assign({}, this.opts);
83✔
76
  }
83✔
77

78
  _parseOpts() {
20✔
79
    let prefix = this.opts.apiPrefix;
2,901✔
UNCOV
80
    if (!prefix || prefix.length === 0) {
1,978✔
UNCOV
81
      throw errors.InvalidArgumentError.format("prefix", "cannot be empty");
1,979✔
UNCOV
82
    }
1,979✔
83
    const c = prefix[prefix.length - 1];
4,860✔
UNCOV
84
    if (c === ".") {
1,978!
UNCOV
85
      prefix = prefix.substr(0, prefix.length - 1);
1,981✔
UNCOV
86
    }
1,981✔
87
    this.opts.apiPrefix = prefix;
4,860✔
88
  }
2,901✔
89

90
  async _request(
20✔
91
    subj: string,
20✔
92
    data: unknown = null,
20✔
93
    opts?: Partial<RequestOptions> & { retries?: number },
20✔
94
  ): Promise<unknown> {
20✔
95
    opts = opts || {} as RequestOptions;
2,408✔
96
    opts.timeout = this.timeout;
2,408✔
97

98
    let a: Uint8Array = Empty;
2,408✔
99
    if (data) {
2,408✔
100
      a = new TextEncoder().encode(JSON.stringify(data));
3,814✔
101
    }
3,814✔
102

103
    let { retries } = opts as {
2,408✔
104
      retries: number;
105
    };
106

107
    retries = retries || 1;
2,408✔
108
    retries = retries === -1 ? Number.MAX_SAFE_INTEGER : retries;
×
109
    const bo = backoff();
2,408✔
110

111
    for (let i = 0; i < retries; i++) {
2,408✔
112
      try {
2,408✔
113
        const m = await this.nc.request(
2,408✔
114
          subj,
2,408✔
115
          a,
2,408✔
116
          opts as RequestOptions,
2,408✔
117
        );
118
        return this.parseJsResponse(m);
4,106✔
119
      } catch (err) {
2,408✔
UNCOV
120
        const re = err instanceof RequestError ? err as RequestError : null;
1,777!
121
        if (
×
122
          (err instanceof errors.TimeoutError || re?.isNoResponders()) &&
×
123
          i + 1 < retries
×
124
        ) {
×
125
          await delay(bo.backoff(i));
×
126
        } else {
×
UNCOV
127
          throw re?.isNoResponders()
1,777!
UNCOV
128
            ? new JetStreamNotEnabled("jetstream is not enabled", {
1,777✔
UNCOV
129
              cause: err,
1,778✔
UNCOV
130
            })
1,777✔
UNCOV
131
            : err;
1,777✔
132
        }
2,654✔
133
      }
2,654✔
134
    }
2,408!
135
  }
×
136

UNCOV
137
  async findStream(subject: string): Promise<string> {
18✔
UNCOV
138
    const q = { subject } as StreamNameBySubject;
63✔
UNCOV
139
    const r = await this._request(`${this.prefix}.STREAM.NAMES`, q);
21✔
UNCOV
140
    const names = r as StreamNames;
21✔
UNCOV
141
    if (!names.streams || names.streams.length !== 1) {
21✔
UNCOV
142
      throw StreamNotFoundError.fromMessage("no stream matches subject");
22✔
UNCOV
143
    }
22✔
UNCOV
144
    return names.streams[0];
23✔
UNCOV
145
  }
21✔
146

147
  getConnection(): NatsConnection {
×
148
    return this.nc;
×
149
  }
×
150

151
  parseJsResponse(m: Msg): unknown {
20✔
152
    const v = JSON.parse(new TextDecoder().decode(m.data));
122,345✔
153
    const r = v as ApiResponse;
122,345✔
154
    if (r.error) {
122,345✔
155
      switch (r.error.err_code) {
122,603✔
UNCOV
156
        case JetStreamApiCodes.ConsumerNotFound:
108,776✔
UNCOV
157
          throw new ConsumerNotFoundError(r.error);
108,794✔
158
        case JetStreamApiCodes.StreamNotFound:
122,603✔
159
          throw new StreamNotFoundError(r.error);
122,741✔
UNCOV
160
        case JetStreamApiCodes.JetStreamNotEnabledForAccount: {
217,553✔
UNCOV
161
          const jserr = new JetStreamApiError(r.error);
108,777✔
UNCOV
162
          throw new JetStreamNotEnabled(jserr.message, { cause: jserr });
326,331✔
UNCOV
163
        }
108,777✔
164
        default:
122,603✔
165
          throw new JetStreamApiError(r.error);
122,704✔
166
      }
122,603✔
167
    }
122,603✔
168
    return v;
244,411✔
169
  }
122,345✔
170
}
20✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc