• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.js / 14248085491

03 Apr 2025 04:30PM UTC coverage: 84.536% (+0.002%) from 84.534%
14248085491

push

github

web-flow
Additional hardening of the object store client (#256)

* Additional hardening of the object store client

- made obj put publish messages sequentially - as current server can drop requests when the producer is too fast. This will degrade put performance.

- changed the internal push consumer used on get, to have an idle_heartbeat, if this fires, the get has stalled, and the ordered consumer should reset.

enabled flow control on the ordered consumer, this prevents slow consumers when the client is getting very large objects

Signed-off-by: Alberto Ricart <alberto@synadia.com>

* - hardening of the PullConsumer implementation, by default ordered consumers set flow control and idle_heartbeats. This enables the ordered consumer to self-recreate on `heartbeats_missed`, simplifying watcher/lister configurations.

- objectstore doesn't require listening to heartbeats_missed, since the pull consumer handles it automagically.

- objectstore watch added a check in watch for `heartbeat` notifications - if the watcher is only listing history this is a hint to stop as all records have been processed.

Signed-off-by: Alberto Ricart <alberto@synadia.com>

---------

Signed-off-by: Alberto Ricart <alberto@synadia.com>

2370 of 3217 branches covered (73.67%)

Branch coverage included in aggregate %.

10 of 23 new or added lines in 3 files covered. (43.48%)

4552 existing lines in 36 files now uncovered.

10170 of 11617 relevant lines covered (87.54%)

768618.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.56
/core/src/headers.ts
1
/*
2
 * Copyright 2020-2023 The NATS Authors
3
 * Licensed under the Apache License, Version 2.0 (the "License");
4
 * you may not use this file except in compliance with the License.
5
 * You may obtain a copy of the License at
6
 *
7
 * http://www.apache.org/licenses/LICENSE-2.0
8
 *
9
 * Unless required by applicable law or agreed to in writing, software
10
 * distributed under the License is distributed on an "AS IS" BASIS,
11
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
 * See the License for the specific language governing permissions and
13
 * limitations under the License.
14
 */
15

16
// Heavily inspired by Golang's https://golang.org/src/net/http/header.go
17

18
import { TD, TE } from "./encoders.ts";
55✔
19
import type { MsgHdrs } from "./core.ts";
20
import { Match } from "./core.ts";
55✔
21
import { InvalidArgumentError } from "./errors.ts";
55✔
22

23
// https://www.ietf.org/rfc/rfc822.txt
24
// 3.1.2.  STRUCTURE OF HEADER FIELDS
25
//
26
// Once a field has been unfolded, it may be viewed as being com-
27
// posed of a field-name followed by a colon (":"), followed by a
28
// field-body, and  terminated  by  a  carriage-return/line-feed.
29
// The  field-name must be composed of printable ASCII characters
30
// (i.e., characters that  have  values  between  33.  and  126.,
31
// decimal, except colon).  The field-body may be composed of any
32
// ASCII characters, except CR or LF.  (While CR and/or LF may be
33
// present  in the actual text, they are removed by the action of
34
// unfolding the field.)
UNCOV
35
export function canonicalMIMEHeaderKey(k: string): string {
54✔
UNCOV
36
  const a = 97;
3,228✔
UNCOV
37
  const A = 65;
3,228✔
UNCOV
38
  const Z = 90;
3,228✔
UNCOV
39
  const z = 122;
3,228✔
UNCOV
40
  const dash = 45;
3,228✔
UNCOV
41
  const colon = 58;
3,228✔
UNCOV
42
  const start = 33;
3,228✔
UNCOV
43
  const end = 126;
3,228✔
UNCOV
44
  const toLower = a - A;
3,228✔
45

UNCOV
46
  let upper = true;
3,228✔
UNCOV
47
  const buf: number[] = new Array(k.length);
3,228✔
UNCOV
48
  for (let i = 0; i < k.length; i++) {
3,228✔
UNCOV
49
    let c = k.charCodeAt(i);
48,376✔
50
    if (c === colon || c < start || c > end) {
5,507✔
51
      throw InvalidArgumentError.format(
5,510✔
52
        "header",
5,510✔
53
        `'${k[i]}' is not a valid character in a header name`,
5,510✔
54
      );
55
    }
5,510✔
UNCOV
56
    if (upper && a <= c && c <= z) {
42,154✔
UNCOV
57
      c -= toLower;
42,620✔
UNCOV
58
    } else if (!upper && A <= c && c <= Z) {
41,235✔
UNCOV
59
      c += toLower;
85,271✔
UNCOV
60
    }
85,271✔
UNCOV
61
    buf[i] = c;
53,375✔
UNCOV
62
    upper = c == dash;
53,375✔
UNCOV
63
  }
53,375✔
UNCOV
64
  return String.fromCharCode(...buf);
3,697✔
UNCOV
65
}
3,228✔
66

UNCOV
67
export function headers(code = 0, description = ""): MsgHdrs {
54✔
68
  if ((code === 0 && description !== "") || (code > 0 && description === "")) {
261✔
69
    throw InvalidArgumentError.format("description", "is required");
263✔
70
  }
263✔
UNCOV
71
  return new MsgHdrsImpl(code, description);
120,882✔
UNCOV
72
}
120,656✔
73

74
const HEADER = "NATS/1.0";
55✔
75

76
export class MsgHdrsImpl implements MsgHdrs {
55✔
UNCOV
77
  _code: number;
54✔
UNCOV
78
  headers: Map<string, string[]>;
121,561✔
UNCOV
79
  _description: string;
54✔
80

UNCOV
81
  constructor(code = 0, description = "") {
54✔
UNCOV
82
    this._code = code;
121,561✔
UNCOV
83
    this._description = description;
121,561✔
UNCOV
84
    this.headers = new Map();
121,561✔
UNCOV
85
  }
121,561✔
86

87
  [Symbol.iterator](): IterableIterator<[string, string[]]> {
88✔
88
    return this.headers.entries();
34✔
89
  }
34✔
90

91
  size(): number {
33✔
92
    return this.headers.size;
46✔
93
  }
46✔
94

95
  equals(mh: MsgHdrsImpl): boolean {
33✔
96
    if (
41✔
97
      mh && this.headers.size === mh.headers.size &&
41✔
98
      this._code === mh._code
41✔
99
    ) {
41✔
100
      for (const [k, v] of this.headers) {
48✔
101
        const a = mh.values(k);
48✔
102
        if (v.length !== a.length) {
48✔
103
          return false;
49✔
104
        }
49✔
105
        const vv = [...v].sort();
162✔
106
        const aa = [...a].sort();
162✔
107
        for (let i = 0; i < vv.length; i++) {
48✔
108
          if (vv[i] !== aa[i]) {
56✔
109
            return false;
57✔
110
          }
57✔
111
        }
56✔
112
      }
53✔
113
      return true;
53✔
114
    }
53✔
115
    return false;
42✔
116
  }
41✔
117

UNCOV
118
  static decode(a: Uint8Array): MsgHdrsImpl {
54✔
UNCOV
119
    const mh = new MsgHdrsImpl();
950✔
UNCOV
120
    const s = TD.decode(a);
950✔
UNCOV
121
    const lines = s.split("\r\n");
950✔
UNCOV
122
    const h = lines[0];
950✔
UNCOV
123
    if (h !== HEADER) {
950✔
124
      // malformed headers could add extra space without adding a code or description
UNCOV
125
      let str = h.replace(HEADER, "").trim();
1,206✔
UNCOV
126
      if (str.length > 0) {
1,206✔
UNCOV
127
        mh._code = parseInt(str, 10);
1,223✔
128
        if (isNaN(mh._code)) {
×
129
          mh._code = 0;
×
130
        }
×
UNCOV
131
        const scode = mh._code.toString();
1,223✔
UNCOV
132
        str = str.replace(scode, "");
1,223✔
UNCOV
133
        mh._description = str.trim();
1,223✔
UNCOV
134
      }
1,223✔
UNCOV
135
    }
1,206✔
UNCOV
136
    if (lines.length >= 1) {
950✔
UNCOV
137
      lines.slice(1).map((s) => {
950✔
UNCOV
138
        if (s) {
5,454✔
UNCOV
139
          const idx = s.indexOf(":");
8,168✔
UNCOV
140
          if (idx > -1) {
8,168✔
UNCOV
141
            const k = s.slice(0, idx);
8,381✔
UNCOV
142
            const v = s.slice(idx + 1).trim();
8,381✔
UNCOV
143
            mh.append(k, v);
8,381✔
UNCOV
144
          }
8,381✔
UNCOV
145
        }
8,168✔
UNCOV
146
      });
950✔
UNCOV
147
    }
950✔
UNCOV
148
    return mh;
950✔
UNCOV
149
  }
950✔
150

UNCOV
151
  toString(): string {
54✔
UNCOV
152
    if (this.headers.size === 0 && this._code === 0) {
120,642✔
UNCOV
153
      return "";
240,844✔
UNCOV
154
    }
240,844✔
UNCOV
155
    let s = HEADER;
223,579✔
156
    if (this._code > 0 && this._description !== "") {
246!
157
      s += ` ${this._code} ${this._description}`;
248✔
158
    }
248✔
UNCOV
159
    for (const [k, v] of this.headers) {
223,452✔
UNCOV
160
      for (let i = 0; i < v.length; i++) {
223,862✔
UNCOV
161
        s = `${s}\r\n${k}: ${v[i]}`;
223,862✔
UNCOV
162
      }
223,862✔
UNCOV
163
    }
223,862✔
UNCOV
164
    return `${s}\r\n\r\n`;
223,839✔
UNCOV
165
  }
120,646✔
166

UNCOV
167
  encode(): Uint8Array {
54✔
UNCOV
168
    return TE.encode(this.toString());
120,645✔
UNCOV
169
  }
120,645✔
170

UNCOV
171
  static validHeaderValue(k: string): string {
54✔
UNCOV
172
    const inv = /[\r\n]/;
3,214✔
173
    if (inv.test(k)) {
491✔
174
      throw InvalidArgumentError.format(
493✔
175
        "header",
493✔
176
        "values cannot contain \\r or \\n",
493✔
177
      );
178
    }
493✔
UNCOV
179
    return k.trim();
3,670✔
UNCOV
180
  }
3,214✔
181

UNCOV
182
  keys(): string[] {
54✔
UNCOV
183
    const keys = [];
5,541✔
UNCOV
184
    for (const sk of this.headers.keys()) {
5,541✔
UNCOV
185
      keys.push(sk);
18,880✔
UNCOV
186
    }
18,880✔
UNCOV
187
    return keys;
5,541✔
UNCOV
188
  }
5,541✔
189

UNCOV
190
  findKeys(k: string, match: Match = Match.Exact): string[] {
54✔
UNCOV
191
    const keys = this.keys();
5,539✔
UNCOV
192
    switch (match) {
5,539✔
UNCOV
193
      case Match.Exact:
5,539✔
UNCOV
194
        return keys.filter((v) => {
6,464✔
UNCOV
195
          return v === k;
19,785✔
UNCOV
196
        });
6,464✔
197
      case Match.CanonicalMIME:
977!
198
        k = canonicalMIMEHeaderKey(k);
985✔
199
        return keys.filter((v) => {
985✔
200
          return v === k;
990✔
201
        });
985✔
202
      default: {
1,965!
203
        const lci = k.toLowerCase();
988✔
204
        return keys.filter((v) => {
988✔
205
          return lci === v.toLowerCase();
998✔
206
        });
988✔
207
      }
988✔
UNCOV
208
    }
5,539✔
UNCOV
209
  }
5,539✔
210

UNCOV
211
  get(k: string, match = Match.Exact): string {
54✔
UNCOV
212
    const keys = this.findKeys(k, match);
741✔
UNCOV
213
    if (keys.length) {
741✔
UNCOV
214
      const v = this.headers.get(keys[0]);
1,293✔
UNCOV
215
      if (v) {
1,293✔
216
        return Array.isArray(v) ? v[0] : v;
×
UNCOV
217
      }
1,293✔
UNCOV
218
    }
1,293!
UNCOV
219
    return "";
856✔
UNCOV
220
  }
730✔
221

UNCOV
222
  last(k: string, match = Match.Exact): string {
52✔
UNCOV
223
    const keys = this.findKeys(k, match);
1,219✔
UNCOV
224
    if (keys.length) {
1,219✔
UNCOV
225
      const v = this.headers.get(keys[0]);
1,222✔
UNCOV
226
      if (v) {
1,222✔
227
        return Array.isArray(v) ? v[v.length - 1] : v;
×
UNCOV
228
      }
1,222✔
UNCOV
229
    }
1,222!
230
    return "";
38✔
231
  }
37✔
232

233
  has(k: string, match: Match = Match.Exact): boolean {
33✔
234
    return this.findKeys(k, match).length > 0;
46✔
235
  }
46✔
236

UNCOV
237
  set(k: string, v: string, match: Match = Match.Exact): void {
54✔
UNCOV
238
    this.delete(k, match);
485✔
UNCOV
239
    this.append(k, v, match);
485✔
UNCOV
240
  }
485✔
241

UNCOV
242
  append(k: string, v: string, match: Match = Match.Exact): void {
54✔
243
    // validate the key
UNCOV
244
    const ck = canonicalMIMEHeaderKey(k);
3,217✔
245
    if (match === Match.CanonicalMIME) {
494✔
246
      k = ck;
498✔
247
    }
498✔
248
    // if we get non-sensical ignores/etc, we should try
249
    // to do the right thing and use the first key that matches
UNCOV
250
    const keys = this.findKeys(k, match);
3,675✔
UNCOV
251
    k = keys.length > 0 ? keys[0] : k;
879✔
252

UNCOV
253
    const value = MsgHdrsImpl.validHeaderValue(v);
3,217✔
UNCOV
254
    let a = this.headers.get(k);
3,217✔
UNCOV
255
    if (!a) {
3,217✔
UNCOV
256
      a = [];
4,040✔
UNCOV
257
      this.headers.set(k, a);
4,040✔
UNCOV
258
    }
4,040✔
UNCOV
259
    a.push(value);
3,673✔
UNCOV
260
  }
3,217✔
261

UNCOV
262
  values(k: string, match: Match = Match.Exact): string[] {
34✔
UNCOV
263
    const buf: string[] = [];
58✔
UNCOV
264
    const keys = this.findKeys(k, match);
58✔
UNCOV
265
    keys.forEach((v) => {
58✔
UNCOV
266
      const values = this.headers.get(v);
83✔
UNCOV
267
      if (values) {
83✔
UNCOV
268
        buf.push(...values);
83✔
UNCOV
269
      }
83✔
UNCOV
270
    });
58✔
UNCOV
271
    return buf;
58✔
UNCOV
272
  }
58✔
273

UNCOV
274
  delete(k: string, match: Match = Match.Exact): void {
54✔
UNCOV
275
    const keys = this.findKeys(k, match);
488✔
276
    keys.forEach((v) => {
267✔
277
      this.headers.delete(v);
274✔
278
    });
267✔
UNCOV
279
  }
488✔
280

281
  get hasError(): boolean {
33✔
282
    return this._code >= 300;
38✔
283
  }
38✔
284

285
  get status(): string {
33✔
286
    return `${this._code} ${this._description}`.trim();
39✔
287
  }
39✔
288

UNCOV
289
  toRecord(): Record<string, string[]> {
34✔
UNCOV
290
    const data = {} as Record<string, string[]>;
36✔
UNCOV
291
    this.keys().forEach((v) => {
36✔
UNCOV
292
      data[v] = this.values(v);
39✔
UNCOV
293
    });
36✔
UNCOV
294
    return data;
36✔
UNCOV
295
  }
36✔
296

UNCOV
297
  get code(): number {
54✔
UNCOV
298
    return this._code;
1,034✔
UNCOV
299
  }
1,034✔
300

UNCOV
301
  get description(): string {
53✔
UNCOV
302
    return this._description;
301✔
UNCOV
303
  }
301✔
304

UNCOV
305
  static fromRecord(r: Record<string, string[]>): MsgHdrs {
34✔
UNCOV
306
    const h = new MsgHdrsImpl();
36✔
UNCOV
307
    for (const k in r) {
36✔
UNCOV
308
      h.headers.set(k, r[k]);
38✔
UNCOV
309
    }
38✔
UNCOV
310
    return h;
36✔
UNCOV
311
  }
36✔
312
}
55✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc