• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats.js / 13159645374

05 Feb 2025 02:30PM UTC coverage: 82.694% (+13.1%) from 69.574%
13159645374

push

github

web-flow
Add heartbeat handling to key iteration (#203)

* Add heartbeat detection to listing keys and history. In cases where the stream is purged as the client is making progress it could stall it as client is relying on numpending to signal out of the processing. By detecting a heartbeat, that means the server didn't have anything to send in the last 5s, providing a hint that we can use to signal that the operation is done.

Introduce a heartbeat case to handle key iteration status updates. This ensures proper key fetching and stops the iteration appropriately upon receiving a heartbeat.

* history for kv has the same issue - if values are purged in flight, the iteration may hang.

Signed-off-by: Alberto Ricart <alberto@synadia.com>

---------

Signed-off-by: Alberto Ricart <alberto@synadia.com>

2259 of 3072 branches covered (73.54%)

Branch coverage included in aggregate %.

4 of 22 new or added lines in 1 file covered. (18.18%)

2912 existing lines in 30 files now uncovered.

9634 of 11310 relevant lines covered (85.18%)

788186.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.5
/core/src/msg.ts
1
/*
2
 * Copyright 2020-2024 The NATS Authors
3
 * Licensed under the Apache License, Version 2.0 (the "License");
4
 * you may not use this file except in compliance with the License.
5
 * You may obtain a copy of the License at
6
 *
7
 * http://www.apache.org/licenses/LICENSE-2.0
8
 *
9
 * Unless required by applicable law or agreed to in writing, software
10
 * distributed under the License is distributed on an "AS IS" BASIS,
11
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
 * See the License for the specific language governing permissions and
13
 * limitations under the License.
14
 */
15
import { MsgHdrsImpl } from "./headers.ts";
55✔
16
import type { MsgArg } from "./parser.ts";
17
import { Empty, TD } from "./encoders.ts";
55✔
18
import type {
19
  Msg,
20
  MsgHdrs,
21
  Payload,
22
  Publisher,
23
  RequestInfo,
24
  ReviverFn,
25
} from "./core.ts";
26

27
export class MsgImpl implements Msg {
55✔
UNCOV
28
  _headers?: MsgHdrs;
54✔
UNCOV
29
  _msg: MsgArg;
395,821✔
UNCOV
30
  _rdata: Uint8Array;
395,821✔
UNCOV
31
  _reply!: string;
395,821✔
UNCOV
32
  _subject!: string;
395,821✔
UNCOV
33
  publisher: Publisher;
54✔
34

UNCOV
35
  constructor(msg: MsgArg, data: Uint8Array, publisher: Publisher) {
54✔
UNCOV
36
    this._msg = msg;
395,821✔
UNCOV
37
    this._rdata = data;
395,821✔
UNCOV
38
    this.publisher = publisher;
395,821✔
UNCOV
39
  }
395,821✔
40

UNCOV
41
  get subject(): string {
54✔
UNCOV
42
    if (this._subject) {
116,081✔
UNCOV
43
      return this._subject;
117,735✔
UNCOV
44
    }
117,735✔
UNCOV
45
    this._subject = TD.decode(this._msg.subject);
245,313✔
UNCOV
46
    return this._subject;
245,313✔
UNCOV
47
  }
129,256✔
48

UNCOV
49
  get reply(): string {
54✔
UNCOV
50
    if (this._reply) {
12,100✔
UNCOV
51
      return this._reply;
30,262✔
UNCOV
52
    }
30,262✔
UNCOV
53
    this._reply = TD.decode(this._msg.reply);
29,232✔
UNCOV
54
    return this._reply;
29,232✔
UNCOV
55
  }
12,510✔
56

UNCOV
57
  get sid(): number {
51✔
UNCOV
58
    return this._msg.sid;
54✔
UNCOV
59
  }
54✔
60

UNCOV
61
  get headers(): MsgHdrs | undefined {
53✔
UNCOV
62
    if (this._msg.hdr > -1 && !this._headers) {
1,763✔
UNCOV
63
      const buf = this._rdata.subarray(0, this._msg.hdr);
2,285✔
UNCOV
64
      this._headers = MsgHdrsImpl.decode(buf);
2,285✔
UNCOV
65
    }
2,285✔
UNCOV
66
    return this._headers;
1,763✔
UNCOV
67
  }
1,763✔
68

UNCOV
69
  get data(): Uint8Array {
54✔
70
    if (!this._rdata) {
×
71
      return new Uint8Array(0);
×
72
    }
×
UNCOV
73
    return this._msg.hdr > -1
210,218,223✔
UNCOV
74
      ? this._rdata.subarray(this._msg.hdr)
210,218,223✔
UNCOV
75
      : this._rdata;
210,218,223✔
UNCOV
76
  }
210,218,223✔
77

78
  // eslint-ignore-next-line @typescript-eslint/no-explicit-any
UNCOV
79
  respond(
52✔
UNCOV
80
    data: Payload = Empty,
52✔
UNCOV
81
    opts?: { headers?: MsgHdrs; reply?: string },
52✔
UNCOV
82
  ): boolean {
52✔
UNCOV
83
    if (this.reply) {
3,835✔
UNCOV
84
      this.publisher.publish(this.reply, data, opts);
4,005✔
UNCOV
85
      return true;
4,005✔
UNCOV
86
    }
4,005!
87
    return false;
213✔
88
  }
208✔
89

UNCOV
90
  size(): number {
51✔
UNCOV
91
    const subj = this._msg.subject.length;
1,052✔
92
    const reply = this._msg.reply?.length || 0;
×
93
    const payloadAndHeaders = this._msg.size === -1 ? 0 : this._msg.size;
×
UNCOV
94
    return subj + reply + payloadAndHeaders;
1,052✔
UNCOV
95
  }
1,052✔
96

UNCOV
97
  json<T = unknown>(reviver?: ReviverFn): T {
53✔
UNCOV
98
    return JSON.parse(this.string(), reviver);
153✔
UNCOV
99
  }
153✔
100

UNCOV
101
  string(): string {
54✔
UNCOV
102
    return TD.decode(this.data);
222✔
UNCOV
103
  }
222✔
104

105
  requestInfo(): RequestInfo | null {
33✔
106
    const v = this.headers?.get("Nats-Request-Info");
34✔
107
    if (v) {
34✔
108
      return JSON.parse(
34✔
109
        v,
34✔
110
        function (this: unknown, key: string, value: unknown): unknown {
34✔
111
          if ((key === "start" || key === "stop") && value !== "") {
×
112
            return new Date(Date.parse(value as string));
×
113
          }
×
114
          return value;
37✔
115
        },
34✔
116
      ) as RequestInfo;
117
    }
34!
118
    return null;
×
119
  }
×
120
}
55✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc