• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 19188141845

07 Nov 2025 01:16PM UTC coverage: 84.586% (-1.4%) from 86.033%
19188141845

push

github

web-flow
Add meta snapshot metrics to jsz monitoring (#7524)

Exposes snapshot related metrics under /jsz

```js
"meta_cluster": {
    "pending": 0,
    "snapshot": {
        "pending_entries": 1,
        "pending_size": 1314,
        "last_time": "2025-11-06T18:14:40.659678019Z", # UTC
        "last_duration": 161096363
     }
}
```

Signed-off-by: Waldemar Quevedo <wally@nats.io>

73649 of 87070 relevant lines covered (84.59%)

340562.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.1
/server/util.go
1
// Copyright 2012-2024 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bytes"
18
        "context"
19
        "encoding/json"
20
        "errors"
21
        "fmt"
22
        "math"
23
        "net"
24
        "net/url"
25
        "reflect"
26
        "runtime"
27
        "strconv"
28
        "strings"
29
        "time"
30
)
31

32
// refCountedUrlSet maps a URL string (the key) to its reference count (the
// value). It is used to handle gossiped URLs such as connect_urls, etc..
type refCountedUrlSet map[string]int
35

36
// ASCII codes delimiting the decimal digit range 0-9.
const (
	asciiZero = 48 // '0'
	asciiNine = 57 // '9'
)
41

42
func versionComponents(version string) (major, minor, patch int, err error) {
7,646✔
43
        m := semVerRe.FindStringSubmatch(version)
7,646✔
44
        if len(m) == 0 {
7,648✔
45
                return 0, 0, 0, errors.New("invalid semver")
2✔
46
        }
2✔
47
        major, err = strconv.Atoi(m[1])
7,644✔
48
        if err != nil {
7,644✔
49
                return -1, -1, -1, err
×
50
        }
×
51
        minor, err = strconv.Atoi(m[2])
7,644✔
52
        if err != nil {
7,644✔
53
                return -1, -1, -1, err
×
54
        }
×
55
        patch, err = strconv.Atoi(m[3])
7,644✔
56
        if err != nil {
7,644✔
57
                return -1, -1, -1, err
×
58
        }
×
59
        return major, minor, patch, err
7,644✔
60
}
61

62
func versionAtLeastCheckError(version string, emajor, eminor, epatch int) (bool, error) {
7,324✔
63
        major, minor, patch, err := versionComponents(version)
7,324✔
64
        if err != nil {
7,326✔
65
                return false, err
2✔
66
        }
2✔
67
        if major > emajor ||
7,322✔
68
                (major == emajor && minor > eminor) ||
7,322✔
69
                (major == emajor && minor == eminor && patch >= epatch) {
14,641✔
70
                return true, nil
7,319✔
71
        }
7,319✔
72
        return false, err
3✔
73
}
74

75
func versionAtLeast(version string, emajor, eminor, epatch int) bool {
7,315✔
76
        res, _ := versionAtLeastCheckError(version, emajor, eminor, epatch)
7,315✔
77
        return res
7,315✔
78
}
7,315✔
79

80
// parseSize converts a slice of ASCII decimal digits into an int.
// It returns -1 to signal an error, that is when `d` is empty, is longer
// than maxParseSizeLen bytes, or contains a byte that is not a digit.
func parseSize(d []byte) (n int) {
	const maxParseSizeLen = 9 //999M

	end := len(d)
	if end == 0 || end > maxParseSizeLen {
		return -1
	}

	var (
		pos   int
		digit byte
	)

	// Note: Use `goto` here to avoid for loop in order
	// to have the function be inlined.
	// See: https://github.com/golang/go/issues/14768
next:
	// Byte arithmetic wraps around, so any byte outside of '0'..'9'
	// yields a value greater than 9 here.
	digit = d[pos] - '0'
	if digit > 9 {
		return -1
	}
	n = n*10 + int(digit)

	if pos++; pos < end {
		goto next
	}
	return n
}
110

111
// parseInt64 converts a slice of ASCII decimal digits into a non-negative
// int64. It returns -1 to signal an error, that is when `d` is empty,
// contains a byte that is not a digit, or encodes a value that does not fit
// in an int64.
func parseInt64(d []byte) (n int64) {
	if len(d) == 0 {
		return -1
	}
	for _, dec := range d {
		if dec < '0' || dec > '9' {
			return -1
		}
		digit := int64(dec - '0')
		// n*10 + digit would wrap around past math.MaxInt64: report an
		// error instead of returning a corrupted (possibly negative) value.
		if n > (math.MaxInt64-digit)/10 {
			return -1
		}
		n = n*10 + digit
	}
	return n
}
125

126
// secondsToDuration converts a number of seconds, expressed as a float, into
// a time.Duration. The fractional nanoseconds are truncated by the conversion.
func secondsToDuration(seconds float64) time.Duration {
	return time.Duration(seconds * float64(time.Second))
}
11,672✔
131

132
// parseHostPort splits a host/port string into its host and port. The
// `defaultPort` is used when `hostPort` has no port, or when the port
// is 0 or -1.
//
// On error, the returned host is empty and the port is -1. An empty
// `hostPort` produces the error "no hostport specified".
func parseHostPort(hostPort string, defaultPort int) (host string, port int, err error) {
	if hostPort == "" {
		return "", -1, errors.New("no hostport specified")
	}

	h, p, splitErr := net.SplitHostPort(hostPort)
	if addrErr, ok := splitErr.(*net.AddrError); ok && strings.Contains(addrErr.Err, "missing port") {
		// No port in the given string: try again with the default one appended.
		h, p, splitErr = net.SplitHostPort(fmt.Sprintf("%s:%d", hostPort, defaultPort))
	}
	if splitErr != nil {
		return "", -1, splitErr
	}

	n, convErr := strconv.Atoi(strings.TrimSpace(p))
	if convErr != nil {
		return "", -1, convErr
	}
	if n == 0 || n == -1 {
		n = defaultPort
	}
	return strings.TrimSpace(h), n, nil
}
155

156
// urlsAreEqual returns true if URL u1 represents the same URL as u2,
// false otherwise. The two values are compared with reflect.DeepEqual.
func urlsAreEqual(u1, u2 *url.URL) bool {
	same := reflect.DeepEqual(u1, u2)
	return same
}
268,259✔
161

162
// comma produces a string form of the given number in base 10 with
// commas after every three orders of magnitude.
//
// e.g. comma(834142) -> 834,142
//
// This function was copied from the github.com/dustin/go-humanize
// package and is Copyright Dustin Sallings <dustin@spy.net>
func comma(v int64) string {
	// Min int64 can't be negated to a usable value, so it has to be special cased.
	if v == math.MinInt64 {
		return "-9,223,372,036,854,775,808"
	}

	prefix := ""
	if v < 0 {
		prefix, v = "-", -v
	}

	// An int64 has at most 7 groups of three digits. The groups are filled
	// from the right, least significant group first.
	var groups [7]string
	idx := len(groups) - 1
	for ; v > 999; v, idx = v/1000, idx-1 {
		chunk := strconv.FormatInt(v%1000, 10)
		// Inner groups are always 3 digits wide: left-pad with zeros.
		if pad := 3 - len(chunk); pad > 0 {
			chunk = "000"[:pad] + chunk
		}
		groups[idx] = chunk
	}
	// The most significant group is not padded.
	groups[idx] = strconv.FormatInt(v, 10)
	return prefix + strings.Join(groups[idx:], ",")
}
199

200
// Adds urlStr to the given map. If the string was already present, simply
201
// bumps the reference count.
202
// Returns true only if it was added for the first time.
203
func (m refCountedUrlSet) addUrl(urlStr string) bool {
18,539✔
204
        m[urlStr]++
18,539✔
205
        return m[urlStr] == 1
18,539✔
206
}
18,539✔
207

208
// Removes urlStr from the given map. If the string is not present, nothing
209
// is done and false is returned.
210
// If the string was present, its reference count is decreased. Returns true
211
// if this was the last reference, false otherwise.
212
func (m refCountedUrlSet) removeUrl(urlStr string) bool {
14,820✔
213
        removed := false
14,820✔
214
        if ref, ok := m[urlStr]; ok {
28,476✔
215
                if ref == 1 {
27,306✔
216
                        removed = true
13,650✔
217
                        delete(m, urlStr)
13,650✔
218
                } else {
13,656✔
219
                        m[urlStr]--
6✔
220
                }
6✔
221
        }
222
        return removed
14,820✔
223
}
224

225
// Returns the unique URLs in this map as a slice
226
func (m refCountedUrlSet) getAsStringSlice() []string {
32,565✔
227
        a := make([]string, 0, len(m))
32,565✔
228
        for u := range m {
72,107✔
229
                a = append(a, u)
39,542✔
230
        }
39,542✔
231
        return a
32,565✔
232
}
233

234
// natsListenConfig provides a common configuration to match the one used by
// net.Listen() but with our own defaults.
//
// Go 1.13 introduced default-on TCP keepalives with aggressive timings and
// there's no sane portable way in Go with stdlib to split the initial timer
// from the retry timer. Linux/BSD defaults are 2hrs/75s and Go sets both
// to 15s; the issue re making them independently tunable has been open since
// 2014 and this code here is being written in 2020.
//
// The NATS protocol has its own L7 PING/PONG keepalive system and the Go
// defaults are inappropriate for IoT deployment scenarios.
//
// Replace any NATS-protocol calls to net.Listen(...) with
// natsListenConfig.Listen(ctx,...) or use natsListen(); leave calls for HTTP
// monitoring, etc, on the default.
var natsListenConfig = &net.ListenConfig{KeepAlive: -1}
249

250
// natsListen() is the same as net.Listen() except that TCP keepalives are
251
// disabled (to match Go's behavior before Go 1.13).
252
func natsListen(network, address string) (net.Listener, error) {
16,429✔
253
        return natsListenConfig.Listen(context.Background(), network, address)
16,429✔
254
}
16,429✔
255

256
// natsDialTimeout is the same as net.DialTimeout() except the TCP keepalives
// are disabled (to match Go's behavior before Go 1.13).
func natsDialTimeout(network, address string, timeout time.Duration) (net.Conn, error) {
	dialer := &net.Dialer{Timeout: timeout, KeepAlive: -1}
	return dialer.Dial(network, address)
}
119,631✔
265

266
// redactURLList() returns a list of URL pointers where each item will either
// be the same pointer if the URL does not contain a password (or is nil), or
// a pointer to a new object, with the password replaced by "xxxxx", if there
// is a password.
// When no URL in the list has a password, the given list itself is returned.
// The intended use-case is for logging lists of URLs safely.
func redactURLList(unredacted []*url.URL) []*url.URL {
	redacted := make([]*url.URL, len(unredacted))
	// In the common case of no passwords, the new slice never leaves this
	// function, which should make things easier for the GC.
	changed := false
	for i, u := range unredacted {
		if u == nil {
			// The slot in the new slice is already nil.
			continue
		}
		if _, hasPwd := u.User.Password(); hasPwd {
			// Work on a shallow copy so the caller's URL keeps its password.
			clone := *u
			clone.User = url.UserPassword(clone.User.Username(), "xxxxx")
			redacted[i] = &clone
			changed = true
		} else {
			redacted[i] = u
		}
	}
	if !changed {
		return unredacted
	}
	return redacted
}
294

295
// redactURLString() attempts to redact a URL string. The string is returned
// unchanged if it has no '@' character or if it cannot be parsed as a URL.
func redactURLString(raw string) string {
	if strings.IndexByte(raw, '@') < 0 {
		return raw
	}
	if u, err := url.Parse(raw); err == nil {
		return u.Redacted()
	}
	return raw
}
306

307
// getURLsAsString returns a slice made of the Host field of each URL in the
// given slice, in the same order. The URLs are dereferenced, so the slice
// must not contain nil entries.
func getURLsAsString(urls []*url.URL) []string {
	hosts := make([]string, len(urls))
	for i := range urls {
		hosts[i] = urls[i].Host
	}
	return hosts
}
315

316
// copyBytes makes a new slice of the same size as `src` and copies its
// content into it.
// If `src` is nil or its length is 0, then this returns `nil`.
func copyBytes(src []byte) []byte {
	var dst []byte
	if n := len(src); n > 0 {
		dst = make([]byte, n)
		copy(dst, src)
	}
	return dst
}
326

327
// copyStrings makes a new slice of the same size as `src` and copies its
// content into it.
// If `src` is nil, then this returns `nil`. An empty but non-nil `src`
// produces an empty, non-nil slice.
func copyStrings(src []string) []string {
	var dst []string
	if src != nil {
		dst = make([]string, len(src))
		copy(dst, src)
	}
	return dst
}
337

338
// Returns a byte slice for the INFO protocol.
339
func generateInfoJSON(info *Info) []byte {
84,344✔
340
        b, _ := json.Marshal(info)
84,344✔
341
        pcs := [][]byte{[]byte("INFO"), b, []byte(CR_LF)}
84,344✔
342
        return bytes.Join(pcs, []byte(" "))
84,344✔
343
}
84,344✔
344

345
// parallelTaskQueue starts a number of goroutines and returns a channel
// which functions can be sent to for queued parallel execution. The
// goroutines will stop running when the returned channel is closed and
// all queued tasks have completed.
//
// The passed in mp limits concurrency: at most mp goroutines are started,
// and never more than GOMAXPROCS. A value <= 0 will default to GOMAXPROCS.
// The returned channel is buffered with one slot per goroutine.
func parallelTaskQueue(mp int) chan<- func() {
	// GOMAXPROCS called with a value < 1 only reports the current setting.
	// mp is an upper bound on the number of workers, so it must only ever
	// lower the worker count, never raise it above GOMAXPROCS.
	if rmp := runtime.GOMAXPROCS(-1); mp <= 0 || mp > rmp {
		mp = rmp
	}
	tq := make(chan func(), mp)
	for i := 0; i < mp; i++ {
		go func() {
			// Exits once tq is closed and drained.
			for fn := range tq {
				fn()
			}
		}()
	}
	return tq
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc