• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.js / 13864023451

14 Mar 2025 07:32PM UTC coverage: 70.268% (-5.2%) from 75.444%
13864023451

push

github

web-flow
Add support for message timestamp in nanoseconds (#220)

Introduced a new property `timestampNanos` to represent message timestamps as a BigInt for higher precision.

Signed-off-by: Alberto Ricart <alberto@synadia.com>

1507 of 2310 branches covered (65.24%)

Branch coverage included in aggregate %.

0 of 15 new or added lines in 1 file covered. (0.0%)

646 existing lines in 23 files now uncovered.

8263 of 11594 relevant lines covered (71.27%)

222724.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.43
/jetstream/src/jsbaseclient_api.ts
1
/*
2
 * Copyright 2021-2023 The NATS Authors
3
 * Licensed under the Apache License, Version 2.0 (the "License");
4
 * you may not use this file except in compliance with the License.
5
 * You may obtain a copy of the License at
6
 *
7
 * http://www.apache.org/licenses/LICENSE-2.0
8
 *
9
 * Unless required by applicable law or agreed to in writing, software
10
 * distributed under the License is distributed on an "AS IS" BASIS,
11
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
 * See the License for the specific language governing permissions and
13
 * limitations under the License.
14
 */
15

16
import {
20✔
17
  backoff,
20✔
18
  delay,
20✔
19
  Empty,
20✔
20
  errors,
20✔
21
  extend,
20✔
22
  RequestError,
20✔
23
} from "@nats-io/nats-core/internal";
20✔
24
import type {
25
  Msg,
26
  NatsConnection,
27
  NatsConnectionImpl,
28
  RequestOptions,
29
} from "@nats-io/nats-core/internal";
30
import type { ApiResponse } from "./jsapi_types.ts";
31
import type { JetStreamOptions } from "./types.ts";
32
import {
20✔
33
  ConsumerNotFoundError,
20✔
34
  JetStreamApiCodes,
20✔
35
  JetStreamApiError,
20✔
36
  JetStreamNotEnabled,
20✔
37
  StreamNotFoundError,
20✔
38
} from "./jserrors.ts";
20✔
39

40
/**
 * API subject prefix that `defaultJsOptions` supplies as the `apiPrefix`
 * default.
 */
const defaultPrefix = "$JS.API";
/**
 * Request timeout that `defaultJsOptions` supplies as the `timeout` default.
 * NOTE(review): presumably milliseconds - confirm against `RequestOptions`.
 */
const defaultTimeout = 5000;
20✔
42

43
/**
 * Builds the effective JetStream options by passing the module defaults
 * (`apiPrefix` = `defaultPrefix`, `timeout` = `defaultTimeout`) and the
 * caller's options to `extend`.
 *
 * When `opts.domain` is truthy, `apiPrefix` is set to `$JS.<domain>.API`
 * (replacing any `apiPrefix` the caller supplied) and `domain` is removed
 * from the options handed to `extend`.
 *
 * The argument itself is never modified: the domain rewrite is applied to a
 * shallow copy, so an options object can safely be reused by the caller.
 *
 * @param opts optional user options; a missing/nullish value means "defaults only".
 * @returns the result of `extend`ing the defaults with the (copied) options.
 */
export function defaultJsOptions(opts?: JetStreamOptions): JetStreamOptions {
  // Spreading `undefined` yields `{}`, matching the previous `opts || {}`.
  const resolved: JetStreamOptions = { ...opts };
  if (resolved.domain) {
    resolved.apiPrefix = `$JS.${resolved.domain}.API`;
    delete resolved.domain;
  }
  return extend(
    { apiPrefix: defaultPrefix, timeout: defaultTimeout },
    resolved,
  );
}
2,997✔
51

52
/**
 * Shape that `BaseApiClientImpl.findStream` expects from a
 * `<prefix>.STREAM.NAMES` request.
 */
export type StreamNames = {
  /** Names of the streams reported in the response. */
  streams: Array<string>;
};
55

56
/**
 * Payload that `BaseApiClientImpl.findStream` sends with a
 * `<prefix>.STREAM.NAMES` request to look a stream up by subject.
 */
export type StreamNameBySubject = {
  /** Subject whose owning stream is being looked up. */
  subject: string;
};
59

60
/**
 * Base client for JetStream API requests. It holds the connection and the
 * normalized options, issues JSON request/reply calls with optional retries,
 * and converts API error responses into typed errors.
 */
export class BaseApiClientImpl {
  /** Connection used to issue the API requests. */
  nc: NatsConnectionImpl;
  /** Normalized options (defaults applied, trailing dot stripped from the prefix). */
  opts: JetStreamOptions;
  /** API subject prefix, without a trailing dot. */
  prefix: string;
  /** Timeout applied to every request made through `_request`. */
  timeout: number;

  /**
   * @param nc connection to use for API requests.
   * @param opts optional JetStream options; run through `defaultJsOptions`.
   * @throws the error built by `errors.InvalidArgumentError.format` when the
   *   resulting API prefix is empty.
   */
  constructor(nc: NatsConnection, opts?: JetStreamOptions) {
    this.nc = nc as NatsConnectionImpl;
    this.opts = defaultJsOptions(opts);
    this._parseOpts();
    this.prefix = this.opts.apiPrefix!;
    this.timeout = this.opts.timeout!;
  }

  /**
   * @returns a shallow copy of the normalized options, so callers cannot
   *   modify the client's own options object.
   */
  getOptions(): JetStreamOptions {
    return Object.assign({}, this.opts);
  }

  /**
   * Validates `opts.apiPrefix` and strips a single trailing "." from it.
   *
   * @throws the error built by `errors.InvalidArgumentError.format` when the
   *   prefix is missing or empty, including a prefix that is only "." and
   *   would be empty once the dot is stripped.
   */
  _parseOpts(): void {
    const raw = this.opts.apiPrefix;
    const prefix = raw?.endsWith(".") ? raw.slice(0, -1) : raw;
    if (!prefix) {
      throw errors.InvalidArgumentError.format("prefix", "cannot be empty");
    }
    this.opts.apiPrefix = prefix;
  }

  /**
   * Sends a request to `subj` and returns the parsed JSON response.
   *
   * @param subj subject to send the request to.
   * @param data payload; when truthy it is JSON-encoded, otherwise an empty
   *   payload is sent.
   * @param opts request options. `timeout` is always replaced by the
   *   client's timeout. `retries` is the maximum number of attempts
   *   (default 1); -1 means retry without a practical limit. The object
   *   passed in is not modified.
   * @returns the parsed response body (see `parseJsResponse`).
   * @throws JetStreamNotEnabled when the final failure is a no-responders
   *   `RequestError`; otherwise rethrows the underlying error, including the
   *   typed API errors raised by `parseJsResponse`.
   */
  async _request(
    subj: string,
    data: unknown = null,
    opts?: Partial<RequestOptions> & { retries?: number },
  ): Promise<unknown> {
    // Work on a copy so the caller's options object is left untouched.
    const reqOpts = { ...opts, timeout: this.timeout } as RequestOptions;

    const payload: Uint8Array = data
      ? new TextEncoder().encode(JSON.stringify(data))
      : Empty;

    // 0/undefined/NaN fall back to one attempt; -1 is the "unlimited"
    // sentinel; any other value below 1 is clamped so that at least one
    // request is always sent instead of silently resolving `undefined`.
    const requested = opts?.retries || 1;
    const attempts = requested === -1
      ? Number.MAX_SAFE_INTEGER
      : Math.max(1, requested);

    const bo = backoff();
    for (let i = 0; i < attempts; i++) {
      try {
        const m = await this.nc.request(subj, payload, reqOpts);
        return this.parseJsResponse(m);
      } catch (err) {
        const noResponders = err instanceof RequestError &&
          err.isNoResponders();
        const retryable = err instanceof errors.TimeoutError || noResponders;
        if (retryable && i + 1 < attempts) {
          // Only timeouts and no-responders are retried, after a backoff.
          await delay(bo.backoff(i));
          continue;
        }
        throw noResponders
          ? new JetStreamNotEnabled("jetstream is not enabled", {
            cause: err,
          })
          : err;
      }
    }
    // Not reachable: `attempts` is at least 1 and the last attempt either
    // returns or throws.
  }

  /**
   * Looks up the single stream that matches `subject` by sending a
   * `<prefix>.STREAM.NAMES` request.
   *
   * @param subject subject to look up.
   * @returns the name of the matching stream.
   * @throws StreamNotFoundError when the response does not list exactly one
   *   stream.
   */
  async findStream(subject: string): Promise<string> {
    const q: StreamNameBySubject = { subject };
    const r = await this._request(`${this.prefix}.STREAM.NAMES`, q);
    const names = r as StreamNames;
    if (!names.streams || names.streams.length !== 1) {
      throw StreamNotFoundError.fromMessage("no stream matches subject");
    }
    return names.streams[0];
  }

  /** @returns the connection this client was created with. */
  getConnection(): NatsConnection {
    return this.nc;
  }

  /**
   * Decodes a response message as JSON and raises a typed error when the
   * body carries an `error` member.
   *
   * @param m response message whose `data` holds the JSON body.
   * @returns the parsed body when it contains no `error`.
   * @throws ConsumerNotFoundError, StreamNotFoundError, JetStreamNotEnabled
   *   (with the JetStreamApiError as its cause), or JetStreamApiError,
   *   selected by `error.err_code`.
   */
  parseJsResponse(m: Msg): unknown {
    const v = JSON.parse(new TextDecoder().decode(m.data));
    const r = v as ApiResponse;
    if (r.error) {
      switch (r.error.err_code) {
        case JetStreamApiCodes.ConsumerNotFound:
          throw new ConsumerNotFoundError(r.error);
        case JetStreamApiCodes.StreamNotFound:
          throw new StreamNotFoundError(r.error);
        case JetStreamApiCodes.JetStreamNotEnabledForAccount: {
          const jserr = new JetStreamApiError(r.error);
          throw new JetStreamNotEnabled(jserr.message, { cause: jserr });
        }
        default:
          throw new JetStreamApiError(r.error);
      }
    }
    return v;
  }
}
20✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc