• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.js / 14248085491

03 Apr 2025 04:30PM UTC coverage: 84.536% (+0.002%) from 84.534%
14248085491

push

github

web-flow
Additional hardening of the object store client (#256)

* Additional hardening of the object store client

- made obj put publish messages sequentially - as current server can drop requests when the producer is too fast. This will degrade put performance.

- changed the internal push consumer used on get, to have an idle_heartbeat, if this fires, the get has stalled, and the ordered consumer should reset.

enabled flow control on the ordered consumer, this prevents slow consumers when the client is getting very large objects

Signed-off-by: Alberto Ricart <alberto@synadia.com>

* - hardening of the PullConsumer implementation, by default ordered consumers set flow control and idle_heartbeats. This enables the ordered consumer to self-recreate on `heartbeats_missed`, simplifying watcher/lister configurations.

- objectstore doesn't require listening to heartbeats_missed, since the pull consumer handles it automagically.

- objectstore watch added a check in watch for `heartbeat` notifications - if the watcher is only listing history this is a hint to stop as all records have been processed.

Signed-off-by: Alberto Ricart <alberto@synadia.com>

---------

Signed-off-by: Alberto Ricart <alberto@synadia.com>

2370 of 3217 branches covered (73.67%)

Branch coverage included in aggregate %.

10 of 23 new or added lines in 3 files covered. (43.48%)

4552 existing lines in 36 files now uncovered.

10170 of 11617 relevant lines covered (87.54%)

768618.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.65
/core/src/msg.ts
1
/*
2
 * Copyright 2020-2024 The NATS Authors
3
 * Licensed under the Apache License, Version 2.0 (the "License");
4
 * you may not use this file except in compliance with the License.
5
 * You may obtain a copy of the License at
6
 *
7
 * http://www.apache.org/licenses/LICENSE-2.0
8
 *
9
 * Unless required by applicable law or agreed to in writing, software
10
 * distributed under the License is distributed on an "AS IS" BASIS,
11
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
 * See the License for the specific language governing permissions and
13
 * limitations under the License.
14
 */
15
import { MsgHdrsImpl } from "./headers.ts";
55✔
16
import type { MsgArg } from "./parser.ts";
17
import { Empty, TD } from "./encoders.ts";
55✔
18
import type {
19
  Msg,
20
  MsgHdrs,
21
  Payload,
22
  Publisher,
23
  RequestInfo,
24
  ReviverFn,
25
} from "./core.ts";
26

27
export class MsgImpl implements Msg {
55✔
UNCOV
28
  _headers?: MsgHdrs;
54✔
UNCOV
29
  _msg: MsgArg;
396,805✔
UNCOV
30
  _rdata: Uint8Array;
396,805✔
UNCOV
31
  _reply!: string;
396,805✔
UNCOV
32
  _subject!: string;
396,805✔
UNCOV
33
  publisher: Publisher;
54✔
34

UNCOV
35
  constructor(msg: MsgArg, data: Uint8Array, publisher: Publisher) {
54✔
UNCOV
36
    this._msg = msg;
396,805✔
UNCOV
37
    this._rdata = data;
396,805✔
UNCOV
38
    this.publisher = publisher;
396,805✔
UNCOV
39
  }
396,805✔
40

UNCOV
41
  get subject(): string {
54✔
UNCOV
42
    if (this._subject) {
129,842✔
UNCOV
43
      return this._subject;
131,526✔
UNCOV
44
    }
131,526✔
UNCOV
45
    this._subject = TD.decode(this._msg.subject);
259,682✔
UNCOV
46
    return this._subject;
259,682✔
UNCOV
47
  }
129,895✔
48

UNCOV
49
  get reply(): string {
54✔
UNCOV
50
    if (this._reply) {
12,289✔
UNCOV
51
      return this._reply;
30,476✔
UNCOV
52
    }
30,476✔
UNCOV
53
    this._reply = TD.decode(this._msg.reply);
29,448✔
UNCOV
54
    return this._reply;
29,448✔
UNCOV
55
  }
12,529✔
56

UNCOV
57
  get sid(): number {
51✔
UNCOV
58
    return this._msg.sid;
54✔
UNCOV
59
  }
54✔
60

UNCOV
61
  get headers(): MsgHdrs | undefined {
54✔
UNCOV
62
    if (this._msg.hdr > -1 && !this._headers) {
2,972✔
UNCOV
63
      const buf = this._rdata.subarray(0, this._msg.hdr);
3,859✔
UNCOV
64
      this._headers = MsgHdrsImpl.decode(buf);
3,859✔
UNCOV
65
    }
3,859✔
UNCOV
66
    return this._headers;
2,972✔
UNCOV
67
  }
2,972✔
68

UNCOV
69
  get data(): Uint8Array {
54✔
70
    if (!this._rdata) {
×
71
      return new Uint8Array(0);
×
72
    }
×
UNCOV
73
    return this._msg.hdr > -1
210,219,657✔
UNCOV
74
      ? this._rdata.subarray(this._msg.hdr)
210,219,657✔
UNCOV
75
      : this._rdata;
210,219,657✔
UNCOV
76
  }
210,219,657✔
77

78
  // eslint-ignore-next-line @typescript-eslint/no-explicit-any
UNCOV
79
  respond(
53✔
UNCOV
80
    data: Payload = Empty,
53✔
UNCOV
81
    opts?: { headers?: MsgHdrs; reply?: string },
53✔
UNCOV
82
  ): boolean {
53✔
UNCOV
83
    if (this.reply) {
3,844✔
UNCOV
84
      this.publisher.publish(this.reply, data, opts);
4,014✔
UNCOV
85
      return true;
4,014✔
UNCOV
86
    }
4,014!
87
    return false;
213✔
88
  }
208✔
89

UNCOV
90
  size(): number {
51✔
UNCOV
91
    const subj = this._msg.subject.length;
1,052✔
92
    const reply = this._msg.reply?.length || 0;
×
93
    const payloadAndHeaders = this._msg.size === -1 ? 0 : this._msg.size;
×
UNCOV
94
    return subj + reply + payloadAndHeaders;
1,052✔
UNCOV
95
  }
1,052✔
96

UNCOV
97
  json<T = unknown>(reviver?: ReviverFn): T {
53✔
UNCOV
98
    return JSON.parse(this.string(), reviver);
162✔
UNCOV
99
  }
162✔
100

UNCOV
101
  string(): string {
54✔
UNCOV
102
    return TD.decode(this.data);
231✔
UNCOV
103
  }
231✔
104

105
  requestInfo(): RequestInfo | null {
33✔
106
    const v = this.headers?.get("Nats-Request-Info");
34✔
107
    if (v) {
34✔
108
      return JSON.parse(
34✔
109
        v,
34✔
110
        function (this: unknown, key: string, value: unknown): unknown {
34✔
111
          if ((key === "start" || key === "stop") && value !== "") {
×
112
            return new Date(Date.parse(value as string));
×
113
          }
×
114
          return value;
37✔
115
        },
34✔
116
      ) as RequestInfo;
117
    }
34!
118
    return null;
×
119
  }
×
120
}
55✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc