• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.deno / 13316451319

13 Feb 2025 08:24PM UTC coverage: 85.018% (-0.1%) from 85.119%
13316451319

push

github

web-flow
Merge pull request #748 from nats-io/1_29_2

[BUMP] ci server version, client version

1900 of 2293 branches covered (82.86%)

Branch coverage included in aggregate %.

2 of 2 new or added lines in 2 files covered. (100.0%)

24 existing lines in 3 files now uncovered.

11163 of 13072 relevant lines covered (85.4%)

834069.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.77
/jetstream/jsbaseclient_api.ts
1
/*
1✔
2
 * Copyright 2021-2023 The NATS Authors
3
 * Licensed under the Apache License, Version 2.0 (the "License");
4
 * you may not use this file except in compliance with the License.
5
 * You may obtain a copy of the License at
6
 *
7
 * http://www.apache.org/licenses/LICENSE-2.0
8
 *
9
 * Unless required by applicable law or agreed to in writing, software
10
 * distributed under the License is distributed on an "AS IS" BASIS,
11
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
 * See the License for the specific language governing permissions and
13
 * limitations under the License.
14
 */
53✔
15

16
import { Empty } from "../nats-base-client/encoders.ts";
53✔
17
import { Codec, JSONCodec } from "../nats-base-client/codec.ts";
53✔
18
import { backoff, delay, extend } from "../nats-base-client/util.ts";
53✔
19
import { NatsConnectionImpl } from "../nats-base-client/nats.ts";
20
import { checkJsErrorCode } from "./jsutil.ts";
53✔
21
import {
53✔
22
  ErrorCode,
53✔
23
  JetStreamOptions,
24
  Msg,
25
  NatsConnection,
26
  NatsError,
27
  RequestOptions,
28
} from "../nats-base-client/core.ts";
53✔
29
import { ApiResponse } from "./jsapi_types.ts";
30

31
/** API subject prefix used when the caller supplies neither a prefix nor a domain. */
const defaultPrefix = "$JS.API";
/** Timeout used when the caller does not supply one (unit not shown here; presumably milliseconds — confirm). */
const defaultTimeout = 5000;

/**
 * Layers the supplied JetStream options over the defaults for `apiPrefix`
 * and `timeout`.
 *
 * When `opts.domain` is truthy the API prefix becomes `$JS.<domain>.API`,
 * replacing any `apiPrefix` the caller supplied, and `domain` is left out
 * of the result.
 *
 * The argument is not modified: the rewrite happens on a shallow copy, so
 * an options object can be reused by the caller without losing `domain`.
 *
 * @param opts - caller supplied options; may be omitted
 * @returns the object produced by `extend` from the defaults and the
 *   (copied) caller options — `extend` is expected to copy the properties
 *   of its later arguments onto the first; confirm in util.ts
 */
export function defaultJsOptions(opts?: JetStreamOptions): JetStreamOptions {
  // shallow copy: spreading undefined/null yields an empty object
  const o = { ...opts } as JetStreamOptions;
  if (o.domain) {
    o.apiPrefix = `$JS.${o.domain}.API`;
    delete o.domain;
  }
  return extend({ apiPrefix: defaultPrefix, timeout: defaultTimeout }, o);
}
4,080✔
42

43
/**
 * Shape through which `BaseApiClient.findStream` reads the response of its
 * `STREAM.NAMES` API request.
 */
export interface StreamNames {
  /** Stream names carried by the response. */
  streams: Array<string>;
}
46

47
/**
 * Request payload `BaseApiClient.findStream` sends with its `STREAM.NAMES`
 * API request.
 */
export interface StreamNameBySubject {
  /** Subject passed by the caller of `findStream`. */
  subject: string;
}
50

51
/**
 * Base class for JetStream API clients. It stores the connection and the
 * resolved JetStream options, and offers helpers for issuing JSON encoded
 * API requests and decoding their responses.
 */
export class BaseApiClient {
  /** Connection used to issue requests. */
  nc: NatsConnectionImpl;
  /** Options as resolved by `defaultJsOptions` and normalized by `_parseOpts`. */
  opts: JetStreamOptions;
  /** API subject prefix, without a trailing dot. */
  prefix: string;
  /** Timeout applied to every request sent through `_request`. */
  timeout: number;
  /** Codec used to encode request payloads and decode responses. */
  jc: Codec<unknown>;

  /**
   * @param nc - connection the client sends its requests on
   * @param opts - optional JetStream options; defaults are filled in by
   *   `defaultJsOptions`
   * @throws Error when the resolved API prefix is empty
   */
  constructor(nc: NatsConnection, opts?: JetStreamOptions) {
    this.nc = nc as NatsConnectionImpl;
    this.opts = defaultJsOptions(opts);
    this._parseOpts();
    this.prefix = this.opts.apiPrefix!;
    this.timeout = this.opts.timeout!;
    this.jc = JSONCodec();
  }

  /**
   * @returns a shallow copy of the client's options
   */
  getOptions(): JetStreamOptions {
    return Object.assign({}, this.opts);
  }

  /**
   * Normalizes `opts.apiPrefix`: a single trailing `.` is removed and the
   * result is stored back on `opts`.
   *
   * @throws Error `invalid empty prefix` when the prefix is missing or
   *   is empty once the trailing dot has been removed
   */
  _parseOpts() {
    let prefix = this.opts.apiPrefix;
    if (!prefix) {
      throw new Error("invalid empty prefix");
    }
    if (prefix.endsWith(".")) {
      prefix = prefix.slice(0, -1);
    }
    // a prefix consisting only of "." would otherwise yield subjects
    // that start with a dot
    if (prefix.length === 0) {
      throw new Error("invalid empty prefix");
    }
    this.opts.apiPrefix = prefix;
  }

  /**
   * Sends an API request and returns the decoded response.
   *
   * The request always uses the client's `timeout`; a `timeout` supplied
   * in `opts` is ignored. The caller's `opts` object is not modified.
   *
   * A failed attempt is retried, after a backoff delay, only when the
   * error code is `"503"` or `ErrorCode.Timeout` and attempts remain.
   *
   * @param subj - subject to send the request to
   * @param data - payload; encoded with the client's codec when truthy,
   *   otherwise an empty payload is sent
   * @param opts - request options; `retries` is the maximum number of
   *   attempts, `-1` retries without a practical limit, and a missing,
   *   zero or other negative value results in a single attempt
   * @returns the decoded response (see `parseJsResponse`)
   * @throws the error of the last attempt, or the error raised by
   *   `parseJsResponse`
   */
  async _request(
    subj: string,
    data: unknown = null,
    opts?: Partial<RequestOptions> & { retries?: number },
  ): Promise<unknown> {
    // copy so the timeout override does not leak into the caller's object
    const ro = { ...opts, timeout: this.timeout };
    const payload: Uint8Array = data ? this.jc.encode(data) : Empty;

    let retries = ro.retries || 1;
    retries = retries === -1 ? Number.MAX_SAFE_INTEGER : retries;
    const bo = backoff();

    // every path through the loop body either returns, throws or retries,
    // so at least one attempt is always made and nothing falls through
    for (let i = 0;; i++) {
      try {
        const m = await this.nc.request(subj, payload, ro as RequestOptions);
        return this.parseJsResponse(m);
      } catch (err) {
        const ne = err as NatsError;
        const retryable = ne.code === "503" || ne.code === ErrorCode.Timeout;
        if (!retryable || i + 1 >= retries) {
          throw err;
        }
        await delay(bo.backoff(i));
      }
    }
  }

  /**
   * Looks up the stream for a subject by sending a `STREAM.NAMES` request.
   *
   * @param subject - subject to look up
   * @returns the stream name when the response lists exactly one stream
   * @throws Error `no stream matches subject` when the response lists no
   *   stream or more than one
   */
  async findStream(subject: string): Promise<string> {
    const q: StreamNameBySubject = { subject };
    const r = await this._request(`${this.prefix}.STREAM.NAMES`, q);
    const names = r as StreamNames;
    if (!names.streams || names.streams.length !== 1) {
      throw new Error("no stream matches subject");
    }
    return names.streams[0];
  }

  /**
   * @returns the connection this client was created with
   */
  getConnection(): NatsConnection {
    return this.nc;
  }

  /**
   * Decodes the payload of an API response.
   *
   * When the decoded value carries an `error`, `checkJsErrorCode` is
   * consulted with its code and description; a non-null result is thrown
   * with the API error attached as `api_error`.
   *
   * @param m - response message
   * @returns the decoded payload
   * @throws the error produced by `checkJsErrorCode`, when it is not null
   */
  parseJsResponse(m: Msg): unknown {
    const v = this.jc.decode(m.data);
    const r = v as ApiResponse;
    if (r.error) {
      const err = checkJsErrorCode(r.error.code, r.error.description);
      if (err !== null) {
        err.api_error = r.error;
        throw err;
      }
    }
    return v;
  }
}
53✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc