• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 24949216239

24 Apr 2026 08:34AM UTC coverage: 80.645% (-2.4%) from 83.05%
24949216239

push

github

web-flow
(2.14) [ADDED] `RemoteLeafOpts.IgnoreDiscoveredServers` option (#8067)

For a given leafnode remote, if this is set to true, this remote will
ignore any server leafnode URLs returned by the hub, allowing the user
to fully manage the servers this remote can connect to.

Resolves #8002

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>

74685 of 92610 relevant lines covered (80.64%)

632737.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.37
/server/monitor.go
1
// Copyright 2013-2026 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bytes"
18
        "cmp"
19
        "crypto/sha256"
20
        "crypto/tls"
21
        "crypto/x509"
22
        "encoding/hex"
23
        "encoding/json"
24
        "expvar"
25
        "fmt"
26
        "maps"
27
        "math"
28
        "net"
29
        "net/http"
30
        "net/url"
31
        "os"
32
        "path/filepath"
33
        "runtime"
34
        "runtime/debug"
35
        "runtime/pprof"
36
        "slices"
37
        "sort"
38
        "strconv"
39
        "strings"
40
        "sync/atomic"
41
        "time"
42

43
        "github.com/nats-io/jwt/v2"
44
        "github.com/nats-io/nats-server/v2/server/pse"
45
)
46

47
// Connz represents detailed information on current client connections.
// It is the result returned by Server.Connz() and carries both the
// requested page of connections and the pagination values that were used.
48
type Connz struct {
49
        // ID is the server ID, copied from the server info.
        ID       string      `json:"server_id"`
50
        // Now is the UTC time at which the report was generated.
        Now      time.Time   `json:"now"`
51
        // NumConns is the number of entries actually returned in Conns.
        NumConns int         `json:"num_connections"`
52
        // Total is the connection count for the requested state (and account)
        // before pagination. Connz() does not reduce it for the user, MQTT
        // client ID or subject filters.
        Total    int         `json:"total"`
53
        // Offset is the effective pagination offset (never negative).
        Offset   int         `json:"offset"`
54
        // Limit is the effective page size after defaults were applied.
        Limit    int         `json:"limit"`
55
        // Conns is the requested page of connections.
        Conns    []*ConnInfo `json:"connections"`
56
}
57

58
// ConnzOptions are the options passed to Connz(). Passing a nil pointer
// selects open connections, sorted by connection ID, using the default limit.
59
type ConnzOptions struct {
60
        // Sort indicates how the results will be sorted. Check SortOpt for possible values.
61
        // ByCid and ByStart both sort by ascending connection ID; ByUptime and ByReason use
        // their sorter's natural order, and all other options are sorted in reverse (descending).
        // ByStop and ByReason are only accepted when State is ConnClosed.
62
        Sort SortOpt `json:"sort"`
63

64
        // Username indicates if user names should be included in the results.
65
        Username bool `json:"auth"`
66

67
        // Subscriptions indicates if subscriptions should be included in the results.
68
        Subscriptions bool `json:"subscriptions"`
69

70
        // SubscriptionsDetail indicates if subscription details should be included in the results
71
        // It takes precedence over Subscriptions when both are set.
        SubscriptionsDetail bool `json:"subscriptions_detail"`
72

73
        // Offset is used for pagination. Connz() only returns connections starting at this
74
        // offset from the global results. Negative values are treated as 0.
75
        Offset int `json:"offset"`
76

77
        // Limit is the maximum number of connections that should be returned by Connz().
        // Values <= 0 select DefaultConnListSize.
78
        Limit int `json:"limit"`
79

80
        // Filter for this explicit client connection. A non-zero CID forces a limit of 1.
81
        CID uint64 `json:"cid"`
82

83
        // Filter for this explicit client connection based on the MQTT client ID
84
        MQTTClient string `json:"mqtt_client"`
85

86
        // Filter by connection state. The zero value is ConnOpen.
87
        State ConnState `json:"state"`
88

89
        // The below options only apply if auth is true.
        // NOTE(review): Connz() itself applies the user and account filters
        // regardless of Username; confirm where this restriction is enforced.
90

91
        // Filter by username.
92
        User string `json:"user"`
93

94
        // Filter by account.
95
        Account string `json:"acc"`
96

97
        // Filter by subject interest. Connz() rejects this filter unless Account
        // is also set, and ignores it when it equals the fwcs constant.
98
        FilterSubject string `json:"filter_subject"`
99
}
100

101
// ConnState is for filtering connections by state: open, closed, or both.
102
type ConnState int
103

104
const (
105
        // ConnOpen filters on open clients. It is the zero value of ConnState.
106
        ConnOpen = ConnState(iota)
107
        // ConnClosed filters on closed clients.
108
        ConnClosed
109
        // ConnAll returns all clients, open and closed.
110
        ConnAll
111
)
112

113
// ConnInfo has detailed information on a per connection basis.
// For open connections it is populated by ConnInfo.fill() and, for the
// optional subscription and auth related fields, by Server.Connz().
114
type ConnInfo struct {
115
        Cid            uint64         `json:"cid"`
116
        Kind           string         `json:"kind,omitempty"`
117
        Type           string         `json:"type,omitempty"`
118
        // IP and Port are only filled in when the client's port is non-zero.
        IP             string         `json:"ip"`
119
        Port           int            `json:"port"`
120
        Start          time.Time      `json:"start"`
121
        LastActivity   time.Time      `json:"last_activity"`
122
        // Stop and Reason are not set by fill().
        // NOTE(review): presumably populated when a connection is closed - confirm.
        Stop           *time.Time     `json:"stop,omitempty"`
123
        Reason         string         `json:"reason,omitempty"`
124
        // RTT is the string form of the round trip time duration.
        RTT            string         `json:"rtt,omitempty"`
125
        // Uptime and Idle are formatted durations measured from Start and
        // LastActivity to the time the report was generated.
        Uptime         string         `json:"uptime"`
126
        Idle           string         `json:"idle"`
127
        // Pending is taken from the client's outbound pending bytes counter.
        Pending        int            `json:"pending_bytes"`
128
        InMsgs         int64          `json:"in_msgs"`
129
        OutMsgs        int64          `json:"out_msgs"`
130
        InBytes        int64          `json:"in_bytes"`
131
        OutBytes       int64          `json:"out_bytes"`
132
        Stalls         int64          `json:"stalls,omitempty"`
133
        // NumSubs is the number of subscriptions held by the client.
        NumSubs        uint32         `json:"subscriptions"`
134
        Name           string         `json:"name,omitempty"`
135
        Lang           string         `json:"lang,omitempty"`
136
        Version        string         `json:"version,omitempty"`
137
        // The TLS fields are only filled in for TLS connections whose handshake
        // has completed. TLSPeerCerts additionally requires auth to be requested.
        TLSVersion     string         `json:"tls_version,omitempty"`
138
        TLSCipher      string         `json:"tls_cipher_suite,omitempty"`
139
        TLSPeerCerts   []*TLSPeerCert `json:"tls_peer_certs,omitempty"`
140
        TLSFirst       bool           `json:"tls_first,omitempty"`
141
        // AuthorizedUser, Account, JWT, IssuerKey, NameTag and Tags are only
        // filled in by Connz() when auth is requested. Account is left empty
        // for the global account.
        AuthorizedUser string         `json:"authorized_user,omitempty"`
142
        Account        string         `json:"account,omitempty"`
143
        // Connz() fills in at most one of Subs and SubsDetail; the detailed
        // form takes precedence when both are requested.
        Subs           []string       `json:"subscriptions_list,omitempty"`
144
        SubsDetail     []SubDetail    `json:"subscriptions_list_detail,omitempty"`
145
        JWT            string         `json:"jwt,omitempty"`
146
        IssuerKey      string         `json:"issuer_key,omitempty"`
147
        NameTag        string         `json:"name_tag,omitempty"`
148
        Tags           jwt.TagList    `json:"tags,omitempty"`
149
        MQTTClient     string         `json:"mqtt_client,omitempty"` // This is the MQTT client id
150
        // Proxy is nil unless the client has a proxy key.
        Proxy          *ProxyInfo     `json:"proxy,omitempty"`
151

152
        // Internal
153
        rtt int64 // Round trip time in nanoseconds, kept as a number for fast sorting
154
}
155

156
// ProxyInfo represents the information about this proxied connection.
// It is only created for clients that have a non-empty proxy key.
157
type ProxyInfo struct {
158
        // Key is the proxy key recorded on the client connection.
        Key string `json:"key"`
159
}
160

161
// TLSPeerCert contains basic information about a TLS peer certificate
162
type TLSPeerCert struct {
163
        // Subject is the string form of the certificate subject.
        Subject          string `json:"subject,omitempty"`
164
        // SubjectPKISha256 is the hex encoded SHA-256 of the raw subject public key info.
        SubjectPKISha256 string `json:"spki_sha256,omitempty"`
165
        // CertSha256 is the hex encoded SHA-256 of the raw certificate.
        CertSha256       string `json:"cert_sha256,omitempty"`
166
}
167

168
// DefaultConnListSize is the default size of the connection list. Connz()
// uses it as the page size whenever no positive limit is requested.
169
const DefaultConnListSize = 1024
170

171
// DefaultSubListSize is the default size of the subscriptions list.
172
const DefaultSubListSize = 1024
173

// defaultStackBufSize is not used in this part of the file.
// NOTE(review): presumably the initial buffer size for stack dumps - confirm.
174
const defaultStackBufSize = 10000
175

176
func newSubsDetailList(client *client) []SubDetail {
6✔
177
        subsDetail := make([]SubDetail, 0, len(client.subs))
6✔
178
        for _, sub := range client.subs {
131✔
179
                subsDetail = append(subsDetail, newClientSubDetail(sub))
125✔
180
        }
125✔
181
        return subsDetail
6✔
182
}
183

184
func newSubsList(client *client) []string {
15✔
185
        subs := make([]string, 0, len(client.subs))
15✔
186
        for _, sub := range client.subs {
162✔
187
                subs = append(subs, string(sub.subject))
147✔
188
        }
147✔
189
        return subs
15✔
190
}
191

192
// Connz returns a Connz struct containing information about connections.
193
func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {
1,435✔
194
        var (
1,435✔
195
                sortOpt = ByCid
1,435✔
196
                auth    bool
1,435✔
197
                subs    bool
1,435✔
198
                subsDet bool
1,435✔
199
                offset  int
1,435✔
200
                limit   = DefaultConnListSize
1,435✔
201
                cid     = uint64(0)
1,435✔
202
                state   = ConnOpen
1,435✔
203
                user    string
1,435✔
204
                acc     string
1,435✔
205
                a       *Account
1,435✔
206
                filter  string
1,435✔
207
                mqttCID string
1,435✔
208
        )
1,435✔
209

1,435✔
210
        if opts != nil {
2,806✔
211
                // If no sort option given or sort is by uptime, then sort by cid
1,371✔
212
                if opts.Sort != _EMPTY_ {
1,405✔
213
                        sortOpt = opts.Sort
34✔
214
                        if !sortOpt.IsValid() {
36✔
215
                                return nil, fmt.Errorf("invalid sorting option: %s", sortOpt)
2✔
216
                        }
2✔
217
                }
218

219
                // Auth specifics.
220
                auth = opts.Username
1,369✔
221
                user = opts.User
1,369✔
222
                acc = opts.Account
1,369✔
223
                mqttCID = opts.MQTTClient
1,369✔
224

1,369✔
225
                subs = opts.Subscriptions
1,369✔
226
                subsDet = opts.SubscriptionsDetail
1,369✔
227
                offset = opts.Offset
1,369✔
228
                if offset < 0 {
1,371✔
229
                        offset = 0
2✔
230
                }
2✔
231
                limit = opts.Limit
1,369✔
232
                if limit <= 0 {
2,729✔
233
                        limit = DefaultConnListSize
1,360✔
234
                }
1,360✔
235
                // state
236
                state = opts.State
1,369✔
237

1,369✔
238
                // ByStop only makes sense on closed connections
1,369✔
239
                if sortOpt == ByStop && state != ConnClosed {
1,370✔
240
                        return nil, fmt.Errorf("sort by stop only valid on closed connections")
1✔
241
                }
1✔
242
                // ByReason is the same.
243
                if sortOpt == ByReason && state != ConnClosed {
1,369✔
244
                        return nil, fmt.Errorf("sort by reason only valid on closed connections")
1✔
245
                }
1✔
246
                // If searching by CID
247
                if opts.CID > 0 {
1,397✔
248
                        cid = opts.CID
30✔
249
                        limit = 1
30✔
250
                }
30✔
251
                // If filtering by subject.
252
                if opts.FilterSubject != _EMPTY_ && opts.FilterSubject != fwcs {
1,367✔
253
                        if acc == _EMPTY_ {
×
254
                                return nil, fmt.Errorf("filter by subject only valid with account filtering")
×
255
                        }
×
256
                        filter = opts.FilterSubject
×
257
                }
258
        }
259

260
        c := &Connz{
1,431✔
261
                Offset: offset,
1,431✔
262
                Limit:  limit,
1,431✔
263
                Now:    time.Now().UTC(),
1,431✔
264
        }
1,431✔
265

1,431✔
266
        // Open clients
1,431✔
267
        var openClients []*client
1,431✔
268
        // Hold for closed clients if requested.
1,431✔
269
        var closedClients []*closedClient
1,431✔
270

1,431✔
271
        var clist map[uint64]*client
1,431✔
272

1,431✔
273
        if acc != _EMPTY_ {
1,452✔
274
                var err error
21✔
275
                a, err = s.lookupAccount(acc)
21✔
276
                if err != nil {
22✔
277
                        return c, nil
1✔
278
                }
1✔
279
                a.mu.RLock()
20✔
280
                clist = make(map[uint64]*client, a.numLocalConnections())
20✔
281
                for c := range a.clients {
42✔
282
                        if c.kind == CLIENT || c.kind == LEAF {
40✔
283
                                clist[c.cid] = c
18✔
284
                        }
18✔
285
                }
286
                a.mu.RUnlock()
20✔
287
        }
288

289
        // Walk the open client list with server lock held.
290
        s.mu.RLock()
1,430✔
291
        // Default to all client unless filled in above.
1,430✔
292
        if clist == nil {
2,840✔
293
                clist = make(map[uint64]*client, len(s.clients)+len(s.leafs))
1,410✔
294
                maps.Copy(clist, s.clients)
1,410✔
295
                maps.Copy(clist, s.leafs)
1,410✔
296
        }
1,410✔
297

298
        // copy the server id for monitoring
299
        c.ID = s.info.ID
1,430✔
300

1,430✔
301
        // Number of total clients. The resulting ConnInfo array
1,430✔
302
        // may be smaller if pagination is used.
1,430✔
303
        switch state {
1,430✔
304
        case ConnOpen:
339✔
305
                c.Total = len(clist)
339✔
306
        case ConnClosed:
1,085✔
307
                closedClients = s.closed.closedClients()
1,085✔
308
                c.Total = len(closedClients)
1,085✔
309
        case ConnAll:
6✔
310
                c.Total = len(clist)
6✔
311
                closedClients = s.closed.closedClients()
6✔
312
                c.Total += len(closedClients)
6✔
313
        }
314

315
        // We may need to filter these connections.
316
        if acc != _EMPTY_ && len(closedClients) > 0 {
1,433✔
317
                var ccc []*closedClient
3✔
318
                for _, cc := range closedClients {
21✔
319
                        if cc.acc != acc {
30✔
320
                                continue
12✔
321
                        }
322
                        ccc = append(ccc, cc)
6✔
323
                }
324
                c.Total -= (len(closedClients) - len(ccc))
3✔
325
                closedClients = ccc
3✔
326
        }
327

328
        totalClients := c.Total
1,430✔
329
        if cid > 0 { // Meaning we only want 1.
1,460✔
330
                totalClients = 1
30✔
331
        }
30✔
332
        if state == ConnOpen || state == ConnAll {
1,775✔
333
                openClients = make([]*client, 0, totalClients)
345✔
334
        }
345✔
335

336
        // Data structures for results.
337
        var conns []ConnInfo // Limits allocs for actual ConnInfos.
1,430✔
338
        var pconns ConnInfos
1,430✔
339

1,430✔
340
        switch state {
1,430✔
341
        case ConnOpen:
339✔
342
                conns = make([]ConnInfo, totalClients)
339✔
343
                pconns = make(ConnInfos, totalClients)
339✔
344
        case ConnClosed:
1,085✔
345
                pconns = make(ConnInfos, totalClients)
1,085✔
346
        case ConnAll:
6✔
347
                conns = make([]ConnInfo, cap(openClients))
6✔
348
                pconns = make(ConnInfos, totalClients)
6✔
349
        }
350

351
        // Search by individual CID.
352
        if cid > 0 {
1,460✔
353
                // Let's first check if user also selects on ConnOpen or ConnAll
30✔
354
                // and look for opened connections.
30✔
355
                if state == ConnOpen || state == ConnAll {
42✔
356
                        if client := s.clients[cid]; client != nil {
18✔
357
                                openClients = append(openClients, client)
6✔
358
                                closedClients = nil
6✔
359
                        }
6✔
360
                }
361
                // If we did not find, and the user selected for ConnClosed or ConnAll,
362
                // look for closed connections.
363
                if len(openClients) == 0 && (state == ConnClosed || state == ConnAll) {
50✔
364
                        copyClosed := closedClients
20✔
365
                        closedClients = nil
20✔
366
                        for _, cc := range copyClosed {
98✔
367
                                if cc.Cid == cid {
95✔
368
                                        closedClients = []*closedClient{cc}
17✔
369
                                        break
17✔
370
                                }
371
                        }
372
                }
373
        } else {
1,400✔
374
                // Gather all open clients.
1,400✔
375
                if state == ConnOpen || state == ConnAll {
1,733✔
376
                        for _, client := range clist {
874✔
377
                                // If we have an account specified we need to filter.
541✔
378
                                if acc != _EMPTY_ && (client.acc == nil || client.acc.Name != acc) {
541✔
379
                                        continue
×
380
                                }
381
                                // Do user filtering second
382
                                if user != _EMPTY_ && client.getRawAuthUserLock() != user {
586✔
383
                                        continue
45✔
384
                                }
385
                                // Do mqtt client ID filtering next
386
                                if mqttCID != _EMPTY_ && client.getMQTTClientID() != mqttCID {
496✔
387
                                        continue
×
388
                                }
389
                                openClients = append(openClients, client)
496✔
390
                        }
391
                }
392
        }
393
        s.mu.RUnlock()
1,430✔
394

1,430✔
395
        // Filter by subject now if needed. We do this outside of server lock.
1,430✔
396
        if filter != _EMPTY_ {
1,430✔
397
                var oc []*client
×
398
                for _, c := range openClients {
×
399
                        c.mu.Lock()
×
400
                        for _, sub := range c.subs {
×
401
                                if SubjectsCollide(filter, string(sub.subject)) {
×
402
                                        oc = append(oc, c)
×
403
                                        break
×
404
                                }
405
                        }
406
                        c.mu.Unlock()
×
407
                        openClients = oc
×
408
                }
409
        }
410

411
        // Just return with empty array if nothing here.
412
        if len(openClients) == 0 && len(closedClients) == 0 {
1,576✔
413
                c.Conns = ConnInfos{}
146✔
414
                return c, nil
146✔
415
        }
146✔
416

417
        // Now whip through and generate ConnInfo entries
418
        // Open Clients
419
        i := 0
1,284✔
420
        for _, client := range openClients {
1,786✔
421
                client.mu.Lock()
502✔
422
                ci := &conns[i]
502✔
423
                ci.fill(client, client.nc, c.Now, auth)
502✔
424
                // Fill in subscription data if requested.
502✔
425
                if len(client.subs) > 0 {
764✔
426
                        if subsDet {
264✔
427
                                ci.SubsDetail = newSubsDetailList(client)
2✔
428
                        } else if subs {
269✔
429
                                ci.Subs = newSubsList(client)
7✔
430
                        }
7✔
431
                }
432
                // Fill in user if auth requested.
433
                if auth {
546✔
434
                        ci.AuthorizedUser = client.getRawAuthUser()
44✔
435
                        if name := client.acc.GetName(); name != globalAccountName {
79✔
436
                                ci.Account = name
35✔
437
                        }
35✔
438
                        ci.JWT = client.opts.JWT
44✔
439
                        ci.IssuerKey = issuerForClient(client)
44✔
440
                        ci.Tags = client.tags
44✔
441
                        ci.NameTag = client.acc.getNameTag()
44✔
442
                }
443
                client.mu.Unlock()
502✔
444
                pconns[i] = ci
502✔
445
                i++
502✔
446
        }
447
        // Closed Clients
448
        var needCopy bool
1,284✔
449
        if subs || auth {
1,854✔
450
                needCopy = true
570✔
451
        }
570✔
452
        for _, cc := range closedClients {
105,950✔
453
                // If we have an account specified we need to filter.
104,666✔
454
                if acc != _EMPTY_ && cc.acc != acc {
104,666✔
455
                        continue
×
456
                }
457
                // Do user filtering second
458
                if user != _EMPTY_ && cc.user != user {
104,701✔
459
                        continue
35✔
460
                }
461
                // Do mqtt client ID filtering next
462
                if mqttCID != _EMPTY_ && cc.MQTTClient != mqttCID {
104,631✔
463
                        continue
×
464
                }
465
                // Copy if needed for any changes to the ConnInfo
466
                if needCopy {
156,547✔
467
                        cx := *cc
51,916✔
468
                        cc = &cx
51,916✔
469
                }
51,916✔
470
                // Fill in subscription data if requested.
471
                if len(cc.subs) > 0 {
104,681✔
472
                        if subsDet {
52✔
473
                                cc.SubsDetail = cc.subs
2✔
474
                        } else if subs {
52✔
475
                                cc.Subs = make([]string, 0, len(cc.subs))
2✔
476
                                for _, sub := range cc.subs {
4✔
477
                                        cc.Subs = append(cc.Subs, sub.Subject)
2✔
478
                                }
2✔
479
                        }
480
                }
481
                // Fill in user if auth requested.
482
                if auth {
104,645✔
483
                        cc.AuthorizedUser = cc.user
14✔
484
                        if cc.acc != _EMPTY_ && (cc.acc != globalAccountName) {
28✔
485
                                cc.Account = cc.acc
14✔
486
                                if acc, err := s.LookupAccount(cc.acc); err == nil {
28✔
487
                                        cc.NameTag = acc.getNameTag()
14✔
488
                                }
14✔
489
                        }
490
                }
491
                pconns[i] = &cc.ConnInfo
104,631✔
492
                i++
104,631✔
493
        }
494

495
        // This will trip if we have filtered out client connections.
496
        if len(pconns) != i {
1,300✔
497
                pconns = pconns[:i]
16✔
498
                totalClients = i
16✔
499
        }
16✔
500

501
        switch sortOpt {
1,284✔
502
        case ByCid, ByStart:
1,258✔
503
                sort.Sort(SortByCid{pconns})
1,258✔
504
        case BySubs:
2✔
505
                sort.Sort(sort.Reverse(SortBySubs{pconns}))
2✔
506
        case ByPending:
2✔
507
                sort.Sort(sort.Reverse(SortByPending{pconns}))
2✔
508
        case ByOutMsgs:
2✔
509
                sort.Sort(sort.Reverse(SortByOutMsgs{pconns}))
2✔
510
        case ByInMsgs:
2✔
511
                sort.Sort(sort.Reverse(SortByInMsgs{pconns}))
2✔
512
        case ByOutBytes:
2✔
513
                sort.Sort(sort.Reverse(SortByOutBytes{pconns}))
2✔
514
        case ByInBytes:
2✔
515
                sort.Sort(sort.Reverse(SortByInBytes{pconns}))
2✔
516
        case ByLast:
2✔
517
                sort.Sort(sort.Reverse(SortByLast{pconns}))
2✔
518
        case ByIdle:
2✔
519
                sort.Sort(sort.Reverse(SortByIdle{pconns, c.Now}))
2✔
520
        case ByUptime:
4✔
521
                sort.Sort(SortByUptime{pconns, time.Now()})
4✔
522
        case ByStop:
2✔
523
                sort.Sort(sort.Reverse(SortByStop{pconns}))
2✔
524
        case ByReason:
2✔
525
                sort.Sort(SortByReason{pconns})
2✔
526
        case ByRTT:
2✔
527
                sort.Sort(sort.Reverse(SortByRTT{pconns}))
2✔
528
        }
529

530
        minoff := c.Offset
1,284✔
531
        maxoff := c.Offset + c.Limit
1,284✔
532

1,284✔
533
        maxIndex := totalClients
1,284✔
534

1,284✔
535
        // Make sure these are sane.
1,284✔
536
        if minoff > maxIndex {
1,284✔
537
                minoff = maxIndex
×
538
        }
×
539
        if maxoff > maxIndex {
2,542✔
540
                maxoff = maxIndex
1,258✔
541
        }
1,258✔
542

543
        // Now pare down to the requested size.
544
        // TODO(dlc) - for very large number of connections we
545
        // could save the whole list in a hash, send hash on first
546
        // request and allow users to use has for subsequent pages.
547
        // Low TTL, say < 1sec.
548
        c.Conns = pconns[minoff:maxoff]
1,284✔
549
        c.NumConns = len(c.Conns)
1,284✔
550

1,284✔
551
        return c, nil
1,284✔
552
}
553

554
// Fills in the ConnInfo from the client.
555
// client should be locked.
556
func (ci *ConnInfo) fill(client *client, nc net.Conn, now time.Time, auth bool) {
12,548✔
557
        // For fast sort if required.
12,548✔
558
        rtt := client.getRTT()
12,548✔
559
        ci.rtt = int64(rtt)
12,548✔
560

12,548✔
561
        ci.Cid = client.cid
12,548✔
562
        ci.MQTTClient = client.getMQTTClientID()
12,548✔
563
        ci.Kind = client.kindString()
12,548✔
564
        ci.Type = client.clientTypeString()
12,548✔
565
        ci.Start = client.start
12,548✔
566
        ci.LastActivity = client.last
12,548✔
567
        ci.Uptime = myUptime(now.Sub(client.start))
12,548✔
568
        ci.Idle = myUptime(now.Sub(client.last))
12,548✔
569
        ci.RTT = rtt.String()
12,548✔
570
        ci.OutMsgs = client.outMsgs
12,548✔
571
        ci.OutBytes = client.outBytes
12,548✔
572
        ci.NumSubs = uint32(len(client.subs))
12,548✔
573
        ci.Pending = int(client.out.pb)
12,548✔
574
        ci.Name = client.opts.Name
12,548✔
575
        ci.Lang = client.opts.Lang
12,548✔
576
        ci.Version = client.opts.Version
12,548✔
577
        // inMsgs and inBytes are updated outside of the client's lock, so
12,548✔
578
        // we need to use atomic here.
12,548✔
579
        ci.InMsgs = atomic.LoadInt64(&client.inMsgs)
12,548✔
580
        ci.InBytes = atomic.LoadInt64(&client.inBytes)
12,548✔
581
        ci.Stalls = atomic.LoadInt64(&client.stalls)
12,548✔
582
        ci.Proxy = createProxyInfo(client)
12,548✔
583

12,548✔
584
        // If the connection is gone, too bad, we won't set TLSVersion and TLSCipher.
12,548✔
585
        // Exclude clients that are still doing handshake so we don't block in
12,548✔
586
        // ConnectionState().
12,548✔
587
        if client.flags.isSet(handshakeComplete) && nc != nil {
12,869✔
588
                if conn, ok := nc.(*tls.Conn); ok {
642✔
589
                        cs := conn.ConnectionState()
321✔
590
                        ci.TLSVersion = tlsVersion(cs.Version)
321✔
591
                        ci.TLSCipher = tls.CipherSuiteName(cs.CipherSuite)
321✔
592
                        if auth && len(cs.PeerCertificates) > 0 {
323✔
593
                                ci.TLSPeerCerts = makePeerCerts(cs.PeerCertificates)
2✔
594
                        }
2✔
595
                        ci.TLSFirst = client.flags.isSet(didTLSFirst)
321✔
596
                }
597
        }
598

599
        if client.port != 0 {
24,872✔
600
                ci.Port = int(client.port)
12,324✔
601
                ci.IP = client.host
12,324✔
602
        }
12,324✔
603
}
604

605
// If this client came from a trusted proxy, this will return a ProxyInfo
606
// to be used in ConnInfo or LeafInfo.
607
//
608
// Client lock must be held on entry.
609
func createProxyInfo(c *client) *ProxyInfo {
12,679✔
610
        if c.proxyKey == _EMPTY_ {
25,346✔
611
                return nil
12,667✔
612
        }
12,667✔
613
        return &ProxyInfo{Key: c.proxyKey}
12✔
614
}
615

616
func makePeerCerts(pc []*x509.Certificate) []*TLSPeerCert {
2✔
617
        res := make([]*TLSPeerCert, len(pc))
2✔
618
        for i, c := range pc {
4✔
619
                tmp := sha256.Sum256(c.RawSubjectPublicKeyInfo)
2✔
620
                ssha := hex.EncodeToString(tmp[:])
2✔
621
                tmp = sha256.Sum256(c.Raw)
2✔
622
                csha := hex.EncodeToString(tmp[:])
2✔
623
                res[i] = &TLSPeerCert{Subject: c.Subject.String(), SubjectPKISha256: ssha, CertSha256: csha}
2✔
624
        }
2✔
625
        return res
2✔
626
}
627

628
// Assume lock is held
629
func (c *client) getRTT() time.Duration {
708,447✔
630
        if c.rtt == 0 {
1,398,309✔
631
                // If a real client, go ahead and send ping now to get a value
689,862✔
632
                // for RTT. For tests and telnet, or if client is closing, etc skip.
689,862✔
633
                if c.opts.Lang != _EMPTY_ {
689,862✔
634
                        c.sendRTTPingLocked()
×
635
                }
×
636
                return 0
689,862✔
637
        }
638
        var rtt time.Duration
18,585✔
639
        if c.rtt > time.Microsecond && c.rtt < time.Millisecond {
34,984✔
640
                rtt = c.rtt.Truncate(time.Microsecond)
16,399✔
641
        } else {
18,585✔
642
                rtt = c.rtt.Truncate(time.Nanosecond)
2,186✔
643
        }
2,186✔
644
        return rtt
18,585✔
645
}
646

647
// decodeBool reads the query parameter param from the request and parses it
// as a boolean, accepting the forms understood by strconv.ParseBool.
//
// A missing or empty parameter yields (false, nil). If the value cannot be
// parsed, a 400 Bad Request with a short explanation is written to w and the
// parse error is returned; the caller must not write to w again in that case.
func decodeBool(w http.ResponseWriter, r *http.Request, param string) (bool, error) {
	str := r.URL.Query().Get(param)
	if len(str) == 0 {
		return false, nil
	}
	val, err := strconv.ParseBool(str)
	if err != nil {
		w.WriteHeader(http.StatusBadRequest)
		// Format straight into the response instead of building an
		// intermediate string and byte slice.
		fmt.Fprintf(w, "Error decoding boolean for '%s': %v", param, err)
		return false, err
	}
	return val, nil
}
660

661
// decodeUint64 reads the query parameter param from the request and parses
// it as a base 10 unsigned 64-bit integer.
//
// A missing or empty parameter yields (0, nil). If the value cannot be
// parsed, a 400 Bad Request with a short explanation is written to w and the
// parse error is returned; the caller must not write to w again in that case.
func decodeUint64(w http.ResponseWriter, r *http.Request, param string) (uint64, error) {
	str := r.URL.Query().Get(param)
	if len(str) == 0 {
		return 0, nil
	}
	val, err := strconv.ParseUint(str, 10, 64)
	if err != nil {
		w.WriteHeader(http.StatusBadRequest)
		// Format straight into the response instead of building an
		// intermediate string and byte slice.
		fmt.Fprintf(w, "Error decoding uint64 for '%s': %v", param, err)
		return 0, err
	}
	return val, nil
}
674

675
// decodeInt reads the query parameter param from the request and parses it
// as a base 10 int. Negative values are accepted.
//
// A missing or empty parameter yields (0, nil). If the value cannot be
// parsed, a 400 Bad Request with a short explanation is written to w and the
// parse error is returned; the caller must not write to w again in that case.
func decodeInt(w http.ResponseWriter, r *http.Request, param string) (int, error) {
	str := r.URL.Query().Get(param)
	if len(str) == 0 {
		return 0, nil
	}
	val, err := strconv.Atoi(str)
	if err != nil {
		w.WriteHeader(http.StatusBadRequest)
		// Format straight into the response instead of building an
		// intermediate string and byte slice.
		fmt.Fprintf(w, "Error decoding int for '%s': %v", param, err)
		return 0, err
	}
	return val, nil
}
688

689
func decodeState(w http.ResponseWriter, r *http.Request) (ConnState, error) {
1,286✔
690
        str := r.URL.Query().Get("state")
1,286✔
691
        if str == _EMPTY_ {
1,493✔
692
                return ConnOpen, nil
207✔
693
        }
207✔
694
        switch strings.ToLower(str) {
1,079✔
695
        case "open":
12✔
696
                return ConnOpen, nil
12✔
697
        case "closed":
1,063✔
698
                return ConnClosed, nil
1,063✔
699
        case "any", "all":
3✔
700
                return ConnAll, nil
3✔
701
        }
702
        // We do not understand intended state here.
703
        w.WriteHeader(http.StatusBadRequest)
1✔
704
        err := fmt.Errorf("Error decoding state for %s", str)
1✔
705
        w.Write([]byte(err.Error()))
1✔
706
        return 0, err
1✔
707
}
708

709
func decodeSubs(w http.ResponseWriter, r *http.Request) (subs bool, subsDet bool, err error) {
1,594✔
710
        subsDet = strings.ToLower(r.URL.Query().Get("subs")) == "detail"
1,594✔
711
        if !subsDet {
3,183✔
712
                subs, err = decodeBool(w, r, "subs")
1,589✔
713
        }
1,589✔
714
        return
1,594✔
715
}
716

717
// HandleConnz process HTTP requests for connection information.
718
func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) {
1,290✔
719
        sortOpt := SortOpt(r.URL.Query().Get("sort"))
1,290✔
720
        auth, err := decodeBool(w, r, "auth")
1,290✔
721
        if err != nil {
1,291✔
722
                return
1✔
723
        }
1✔
724
        subs, subsDet, err := decodeSubs(w, r)
1,289✔
725
        if err != nil {
1,290✔
726
                return
1✔
727
        }
1✔
728
        offset, err := decodeInt(w, r, "offset")
1,288✔
729
        if err != nil {
1,289✔
730
                return
1✔
731
        }
1✔
732
        limit, err := decodeInt(w, r, "limit")
1,287✔
733
        if err != nil {
1,288✔
734
                return
1✔
735
        }
1✔
736
        cid, err := decodeUint64(w, r, "cid")
1,286✔
737
        if err != nil {
1,286✔
738
                return
×
739
        }
×
740
        state, err := decodeState(w, r)
1,286✔
741
        if err != nil {
1,287✔
742
                return
1✔
743
        }
1✔
744

745
        user := r.URL.Query().Get("user")
1,285✔
746
        acc := r.URL.Query().Get("acc")
1,285✔
747
        mqttCID := r.URL.Query().Get("mqtt_client")
1,285✔
748

1,285✔
749
        connzOpts := &ConnzOptions{
1,285✔
750
                Sort:                sortOpt,
1,285✔
751
                Username:            auth,
1,285✔
752
                Subscriptions:       subs,
1,285✔
753
                SubscriptionsDetail: subsDet,
1,285✔
754
                Offset:              offset,
1,285✔
755
                Limit:               limit,
1,285✔
756
                CID:                 cid,
1,285✔
757
                MQTTClient:          mqttCID,
1,285✔
758
                State:               state,
1,285✔
759
                User:                user,
1,285✔
760
                Account:             acc,
1,285✔
761
        }
1,285✔
762

1,285✔
763
        s.mu.Lock()
1,285✔
764
        s.httpReqStats[ConnzPath]++
1,285✔
765
        s.mu.Unlock()
1,285✔
766

1,285✔
767
        c, err := s.Connz(connzOpts)
1,285✔
768
        if err != nil {
1,286✔
769
                w.WriteHeader(http.StatusBadRequest)
1✔
770
                w.Write([]byte(err.Error()))
1✔
771
                return
1✔
772
        }
1✔
773
        b, err := json.MarshalIndent(c, "", "  ")
1,284✔
774
        if err != nil {
1,284✔
775
                s.Errorf("Error marshaling response to /connz request: %v", err)
×
776
        }
×
777

778
        // Handle response
779
        ResponseHandler(w, r, b)
1,284✔
780
}
781

782
// Routez represents detailed information on current client connections.
783
type Routez struct {
784
        ID        string             `json:"server_id"`
785
        Name      string             `json:"server_name"`
786
        Now       time.Time          `json:"now"`
787
        Import    *SubjectPermission `json:"import,omitempty"`
788
        Export    *SubjectPermission `json:"export,omitempty"`
789
        NumRoutes int                `json:"num_routes"`
790
        Routes    []*RouteInfo       `json:"routes"`
791
}
792

793
// RoutezOptions controls how much subscription information Server.Routez
// attaches to each route.
type RoutezOptions struct {
	// Subscriptions asks Routez to include each route's subscriptions.
	// It is only consulted when SubscriptionsDetail is false.
	Subscriptions bool `json:"subscriptions"`

	// SubscriptionsDetail asks Routez to include detailed subscription
	// records for each route.
	SubscriptionsDetail bool `json:"subscriptions_detail"`
}
800

801
// RouteInfo has detailed information on a per connection basis.
802
type RouteInfo struct {
803
        Rid          uint64             `json:"rid"`
804
        RemoteID     string             `json:"remote_id"`
805
        RemoteName   string             `json:"remote_name"`
806
        DidSolicit   bool               `json:"did_solicit"`
807
        IsConfigured bool               `json:"is_configured"`
808
        IP           string             `json:"ip"`
809
        Port         int                `json:"port"`
810
        Start        time.Time          `json:"start"`
811
        LastActivity time.Time          `json:"last_activity"`
812
        RTT          string             `json:"rtt,omitempty"`
813
        Uptime       string             `json:"uptime"`
814
        Idle         string             `json:"idle"`
815
        Import       *SubjectPermission `json:"import,omitempty"`
816
        Export       *SubjectPermission `json:"export,omitempty"`
817
        Pending      int                `json:"pending_size"`
818
        InMsgs       int64              `json:"in_msgs"`
819
        OutMsgs      int64              `json:"out_msgs"`
820
        InBytes      int64              `json:"in_bytes"`
821
        OutBytes     int64              `json:"out_bytes"`
822
        NumSubs      uint32             `json:"subscriptions"`
823
        Subs         []string           `json:"subscriptions_list,omitempty"`
824
        SubsDetail   []SubDetail        `json:"subscriptions_list_detail,omitempty"`
825
        Account      string             `json:"account,omitempty"`
826
        Compression  string             `json:"compression,omitempty"`
827
}
828

829
// Routez returns a Routez struct containing information about routes.
830
func (s *Server) Routez(routezOpts *RoutezOptions) (*Routez, error) {
758,173✔
831
        rs := &Routez{
758,173✔
832
                Now:    time.Now().UTC(),
758,173✔
833
                Routes: []*RouteInfo{},
758,173✔
834
        }
758,173✔
835

758,173✔
836
        if routezOpts == nil {
1,516,038✔
837
                routezOpts = &RoutezOptions{}
757,865✔
838
        }
757,865✔
839

840
        s.mu.Lock()
758,173✔
841
        rs.NumRoutes = s.numRoutes()
758,173✔
842

758,173✔
843
        // copy the server id for monitoring
758,173✔
844
        rs.ID = s.info.ID
758,173✔
845

758,173✔
846
        // Check for defined permissions for all connected routes.
758,173✔
847
        if perms := s.getOpts().Cluster.Permissions; perms != nil {
758,175✔
848
                rs.Import = perms.Import
2✔
849
                rs.Export = perms.Export
2✔
850
        }
2✔
851
        rs.Name = s.info.Name
758,173✔
852

758,173✔
853
        addRoute := func(r *client) {
1,445,596✔
854
                r.mu.Lock()
687,423✔
855
                ri := &RouteInfo{
687,423✔
856
                        Rid:          r.cid,
687,423✔
857
                        RemoteID:     r.route.remoteID,
687,423✔
858
                        RemoteName:   r.route.remoteName,
687,423✔
859
                        DidSolicit:   r.route.didSolicit,
687,423✔
860
                        IsConfigured: r.route.routeType == Explicit,
687,423✔
861
                        InMsgs:       atomic.LoadInt64(&r.inMsgs),
687,423✔
862
                        OutMsgs:      r.outMsgs,
687,423✔
863
                        InBytes:      atomic.LoadInt64(&r.inBytes),
687,423✔
864
                        OutBytes:     r.outBytes,
687,423✔
865
                        NumSubs:      uint32(len(r.subs)),
687,423✔
866
                        Import:       r.opts.Import,
687,423✔
867
                        Pending:      int(r.out.pb),
687,423✔
868
                        Export:       r.opts.Export,
687,423✔
869
                        RTT:          r.getRTT().String(),
687,423✔
870
                        Start:        r.start,
687,423✔
871
                        LastActivity: r.last,
687,423✔
872
                        Uptime:       myUptime(rs.Now.Sub(r.start)),
687,423✔
873
                        Idle:         myUptime(rs.Now.Sub(r.last)),
687,423✔
874
                        Account:      string(r.route.accName),
687,423✔
875
                        Compression:  r.route.compression,
687,423✔
876
                }
687,423✔
877

687,423✔
878
                if len(r.subs) > 0 {
687,466✔
879
                        if routezOpts.SubscriptionsDetail {
47✔
880
                                ri.SubsDetail = newSubsDetailList(r)
4✔
881
                        } else if routezOpts.Subscriptions {
51✔
882
                                ri.Subs = newSubsList(r)
8✔
883
                        }
8✔
884
                }
885

886
                switch conn := r.nc.(type) {
687,423✔
887
                case *net.TCPConn, *tls.Conn:
676,554✔
888
                        addr := conn.RemoteAddr().(*net.TCPAddr)
676,554✔
889
                        ri.Port = addr.Port
676,554✔
890
                        ri.IP = addr.IP.String()
676,554✔
891
                }
892
                r.mu.Unlock()
687,423✔
893
                rs.Routes = append(rs.Routes, ri)
687,423✔
894
        }
895

896
        // Walk the list
897
        s.forEachRoute(func(r *client) {
1,445,596✔
898
                addRoute(r)
687,423✔
899
        })
687,423✔
900
        s.mu.Unlock()
758,173✔
901
        return rs, nil
758,173✔
902
}
903

904
// HandleRoutez process HTTP requests for route information.
905
func (s *Server) HandleRoutez(w http.ResponseWriter, r *http.Request) {
286✔
906
        subs, subsDetail, err := decodeSubs(w, r)
286✔
907
        if err != nil {
287✔
908
                return
1✔
909
        }
1✔
910

911
        opts := RoutezOptions{Subscriptions: subs, SubscriptionsDetail: subsDetail}
285✔
912

285✔
913
        s.mu.Lock()
285✔
914
        s.httpReqStats[RoutezPath]++
285✔
915
        s.mu.Unlock()
285✔
916

285✔
917
        // As of now, no error is ever returned.
285✔
918
        rs, _ := s.Routez(&opts)
285✔
919
        b, err := json.MarshalIndent(rs, "", "  ")
285✔
920
        if err != nil {
285✔
921
                s.Errorf("Error marshaling response to /routez request: %v", err)
×
922
        }
×
923

924
        // Handle response
925
        ResponseHandler(w, r, b)
285✔
926
}
927

928
// Subsz represents detail information on current connections.
929
type Subsz struct {
930
        ID  string    `json:"server_id"`
931
        Now time.Time `json:"now"`
932
        *SublistStats
933
        Total  int         `json:"total"`
934
        Offset int         `json:"offset"`
935
        Limit  int         `json:"limit"`
936
        Subs   []SubDetail `json:"subscriptions_list,omitempty"`
937
}
938

939
// SubszOptions are the options passed to Subsz. They cover pagination,
// whether details are returned, and filtering by account or test subject.
type SubszOptions struct {
	// Offset is the pagination offset: Subsz only returns subscriptions
	// starting at this position in the full result list. Negative values are
	// treated as 0.
	Offset int `json:"offset"`

	// Limit is the maximum number of subscriptions Subsz returns. Values
	// less than or equal to 0 select DefaultSubListSize.
	Limit int `json:"limit"`

	// Subscriptions requests that subscription details be included in the
	// results.
	Subscriptions bool `json:"subscriptions"`

	// Account restricts the report to the account with this name.
	Account string `json:"account,omitempty"`

	// Test restricts the details to subscriptions that would receive a
	// message published on this subject. It must be a literal subject,
	// since it stands for a publish subject.
	Test string `json:"test,omitempty"`
}
959

960
// SubDetail carries verbose information about a single subscription.
type SubDetail struct {
	// Account is the name of the account of the subscription's client.
	Account string `json:"account,omitempty"`
	// AccountTag is the name tag of that account.
	AccountTag string `json:"account_tag,omitempty"`
	// Subject is the subscription subject.
	Subject string `json:"subject"`
	// Queue is the queue group of the subscription, if any.
	Queue string `json:"qgroup,omitempty"`
	// Sid is the subscription ID.
	Sid string `json:"sid"`
	// Msgs is the subscription's message counter.
	Msgs int64 `json:"msgs"`
	// Max is the subscription's max value; omitted when zero.
	Max int64 `json:"max,omitempty"`
	// Cid is the connection ID of the client owning the subscription.
	Cid uint64 `json:"cid"`
}
971

972
// Subscription client should be locked and guaranteed to be present.
973
func newSubDetail(sub *subscription) SubDetail {
149,800✔
974
        sd := newClientSubDetail(sub)
149,800✔
975
        sd.Account = sub.client.acc.GetName()
149,800✔
976
        sd.AccountTag = sub.client.acc.getNameTag()
149,800✔
977
        return sd
149,800✔
978
}
149,800✔
979

980
// For subs details under clients.
981
func newClientSubDetail(sub *subscription) SubDetail {
149,935✔
982
        return SubDetail{
149,935✔
983
                Subject: string(sub.subject),
149,935✔
984
                Queue:   string(sub.queue),
149,935✔
985
                Sid:     string(sub.sid),
149,935✔
986
                Msgs:    sub.nm,
149,935✔
987
                Max:     sub.max,
149,935✔
988
                Cid:     sub.client.cid,
149,935✔
989
        }
149,935✔
990
}
149,935✔
991

992
// Subsz returns a Subsz struct containing subjects statistics
993
func (s *Server) Subsz(opts *SubszOptions) (*Subsz, error) {
208✔
994
        var (
208✔
995
                subdetail bool
208✔
996
                test      bool
208✔
997
                offset    int
208✔
998
                testSub   string
208✔
999
                filterAcc string
208✔
1000
                limit     = DefaultSubListSize
208✔
1001
        )
208✔
1002

208✔
1003
        if opts != nil {
402✔
1004
                subdetail = opts.Subscriptions
194✔
1005
                offset = opts.Offset
194✔
1006
                if offset < 0 {
194✔
1007
                        offset = 0
×
1008
                }
×
1009
                limit = opts.Limit
194✔
1010
                if limit <= 0 {
380✔
1011
                        limit = DefaultSubListSize
186✔
1012
                }
186✔
1013
                if opts.Test != _EMPTY_ {
203✔
1014
                        testSub = opts.Test
9✔
1015
                        test = true
9✔
1016
                        if !IsValidLiteralSubject(testSub) {
13✔
1017
                                return nil, fmt.Errorf("invalid test subject, must be valid publish subject: %s", testSub)
4✔
1018
                        }
4✔
1019
                }
1020
                if opts.Account != _EMPTY_ {
196✔
1021
                        filterAcc = opts.Account
6✔
1022
                }
6✔
1023
        }
1024

1025
        slStats := &SublistStats{}
204✔
1026

204✔
1027
        // FIXME(dlc) - Make account aware.
204✔
1028
        sz := &Subsz{
204✔
1029
                ID:           s.info.ID,
204✔
1030
                Now:          time.Now().UTC(),
204✔
1031
                SublistStats: slStats,
204✔
1032
                Total:        0,
204✔
1033
                Offset:       offset,
204✔
1034
                Limit:        limit,
204✔
1035
                Subs:         nil,
204✔
1036
        }
204✔
1037

204✔
1038
        if subdetail {
222✔
1039
                var raw [4096]*subscription
18✔
1040
                subs := raw[:0]
18✔
1041
                s.accounts.Range(func(k, v any) bool {
58✔
1042
                        acc := v.(*Account)
40✔
1043
                        if filterAcc != _EMPTY_ && acc.GetName() != filterAcc {
58✔
1044
                                return true
18✔
1045
                        }
18✔
1046
                        slStats.add(acc.sl.Stats())
22✔
1047
                        acc.sl.localSubs(&subs, false)
22✔
1048
                        return true
22✔
1049
                })
1050

1051
                details := make([]SubDetail, 0, len(subs))
18✔
1052
                i := 0
18✔
1053
                // TODO(dlc) - may be inefficient and could just do normal match when total subs is large and filtering.
18✔
1054
                for _, sub := range subs {
1,483✔
1055
                        // Check for filter
1,465✔
1056
                        if test && !matchLiteral(testSub, string(sub.subject)) {
1,577✔
1057
                                continue
112✔
1058
                        }
1059
                        if sub.client == nil {
1,353✔
1060
                                continue
×
1061
                        }
1062
                        sub.client.mu.Lock()
1,353✔
1063
                        details = append(details, newSubDetail(sub))
1,353✔
1064
                        sub.client.mu.Unlock()
1,353✔
1065
                        i++
1,353✔
1066
                }
1067
                minoff := sz.Offset
18✔
1068
                maxoff := sz.Offset + sz.Limit
18✔
1069

18✔
1070
                maxIndex := i
18✔
1071

18✔
1072
                // Make sure these are sane.
18✔
1073
                if minoff > maxIndex {
18✔
1074
                        minoff = maxIndex
×
1075
                }
×
1076
                if maxoff > maxIndex {
30✔
1077
                        maxoff = maxIndex
12✔
1078
                }
12✔
1079
                sz.Subs = details[minoff:maxoff]
18✔
1080
                sz.Total = len(details)
18✔
1081
        } else {
186✔
1082
                s.accounts.Range(func(k, v any) bool {
394✔
1083
                        acc := v.(*Account)
208✔
1084
                        if filterAcc != _EMPTY_ && acc.GetName() != filterAcc {
208✔
1085
                                return true
×
1086
                        }
×
1087
                        slStats.add(acc.sl.Stats())
208✔
1088
                        return true
208✔
1089
                })
1090
        }
1091

1092
        return sz, nil
204✔
1093
}
1094

1095
// HandleSubsz processes HTTP requests for subjects stats.
1096
func (s *Server) HandleSubsz(w http.ResponseWriter, r *http.Request) {
177✔
1097
        s.mu.Lock()
177✔
1098
        s.httpReqStats[SubszPath]++
177✔
1099
        s.mu.Unlock()
177✔
1100

177✔
1101
        subs, err := decodeBool(w, r, "subs")
177✔
1102
        if err != nil {
177✔
1103
                return
×
1104
        }
×
1105
        offset, err := decodeInt(w, r, "offset")
177✔
1106
        if err != nil {
177✔
1107
                return
×
1108
        }
×
1109
        limit, err := decodeInt(w, r, "limit")
177✔
1110
        if err != nil {
177✔
1111
                return
×
1112
        }
×
1113
        testSub := r.URL.Query().Get("test")
177✔
1114
        // Filtered account.
177✔
1115
        filterAcc := r.URL.Query().Get("acc")
177✔
1116

177✔
1117
        subszOpts := &SubszOptions{
177✔
1118
                Subscriptions: subs,
177✔
1119
                Offset:        offset,
177✔
1120
                Limit:         limit,
177✔
1121
                Account:       filterAcc,
177✔
1122
                Test:          testSub,
177✔
1123
        }
177✔
1124

177✔
1125
        st, err := s.Subsz(subszOpts)
177✔
1126
        if err != nil {
181✔
1127
                w.WriteHeader(http.StatusBadRequest)
4✔
1128
                w.Write([]byte(err.Error()))
4✔
1129
                return
4✔
1130
        }
4✔
1131

1132
        var b []byte
173✔
1133
        b, err = json.MarshalIndent(st, "", "  ")
173✔
1134
        if err != nil {
173✔
1135
                s.Errorf("Error marshaling response to /subscriptionsz request: %v", err)
×
1136
        }
×
1137

1138
        // Handle response
1139
        ResponseHandler(w, r, b)
173✔
1140
}
1141

1142
// HandleStacksz processes HTTP requests for getting stacks
1143
func (s *Server) HandleStacksz(w http.ResponseWriter, r *http.Request) {
60✔
1144
        // Do not get any lock here that would prevent getting the stacks
60✔
1145
        // if we were to have a deadlock somewhere.
60✔
1146
        var defaultBuf [defaultStackBufSize]byte
60✔
1147
        size := defaultStackBufSize
60✔
1148
        buf := defaultBuf[:size]
60✔
1149
        n := 0
60✔
1150
        for {
254✔
1151
                n = runtime.Stack(buf, true)
194✔
1152
                if n < size {
254✔
1153
                        break
60✔
1154
                }
1155
                size *= 2
134✔
1156
                buf = make([]byte, size)
134✔
1157
        }
1158
        // Handle response
1159
        ResponseHandler(w, r, buf[:n])
60✔
1160
}
1161

1162
// IpqueueszStatusIPQ is the status reported for a single queue by
// Server.Ipqueuesz.
type IpqueueszStatusIPQ struct {
	// Pending is the value returned by the queue's len method.
	Pending int `json:"pending"`

	// InProgress is the value returned by the queue's inProgress method;
	// omitted from JSON when zero.
	InProgress int `json:"in_progress,omitempty"`
}
1166

1167
// IpqueueszStatus is the result of Server.Ipqueuesz: it maps a queue name to
// the status reported for that queue.
type IpqueueszStatus map[string]IpqueueszStatusIPQ
1168

1169
func (s *Server) Ipqueuesz(opts *IpqueueszOptions) *IpqueueszStatus {
1✔
1170
        all, qfilter := opts.All, opts.Filter
1✔
1171
        queues := IpqueueszStatus{}
1✔
1172
        s.ipQueues.Range(func(k, v any) bool {
8✔
1173
                var pending, inProgress int
7✔
1174
                name := k.(string)
7✔
1175
                queue, ok := v.(interface {
7✔
1176
                        len() int
7✔
1177
                        inProgress() int64
7✔
1178
                })
7✔
1179
                if ok {
14✔
1180
                        pending = queue.len()
7✔
1181
                        inProgress = int(queue.inProgress())
7✔
1182
                }
7✔
1183
                if !all && (pending == 0 && inProgress == 0) {
7✔
1184
                        return true
×
1185
                } else if qfilter != _EMPTY_ && !strings.Contains(name, qfilter) {
7✔
1186
                        return true
×
1187
                }
×
1188
                queues[name] = IpqueueszStatusIPQ{Pending: pending, InProgress: inProgress}
7✔
1189
                return true
7✔
1190
        })
1191
        return &queues
1✔
1192
}
1193

1194
func (s *Server) HandleIPQueuesz(w http.ResponseWriter, r *http.Request) {
1✔
1195
        all, err := decodeBool(w, r, "all")
1✔
1196
        if err != nil {
1✔
1197
                return
×
1198
        }
×
1199
        qfilter := r.URL.Query().Get("queues")
1✔
1200

1✔
1201
        queues := s.Ipqueuesz(&IpqueueszOptions{
1✔
1202
                All:    all,
1✔
1203
                Filter: qfilter,
1✔
1204
        })
1✔
1205

1✔
1206
        b, _ := json.MarshalIndent(queues, "", "   ")
1✔
1207
        ResponseHandler(w, r, b)
1✔
1208
}
1209

1210
// Varz will output server information on the monitoring port at /varz.
1211
type Varz struct {
1212
        ID                    string                 `json:"server_id"`                         // ID is the unique server ID generated at start
1213
        Name                  string                 `json:"server_name"`                       // Name is the configured server name, equals ID when not set
1214
        Version               string                 `json:"version"`                           // Version is the version of the running server
1215
        Proto                 int                    `json:"proto"`                             // Proto is the protocol version this server supports
1216
        GitCommit             string                 `json:"git_commit,omitempty"`              // GitCommit is the git repository commit hash that the build corresponds with
1217
        GoVersion             string                 `json:"go"`                                // GoVersion is the version of Go used to build this binary
1218
        Host                  string                 `json:"host"`                              // Host is the hostname the server runs on
1219
        Port                  int                    `json:"port"`                              // Port is the port the server listens on for client connections
1220
        AuthRequired          bool                   `json:"auth_required,omitempty"`           // AuthRequired indicates if users are required to authenticate to join the server
1221
        TLSRequired           bool                   `json:"tls_required,omitempty"`            // TLSRequired indicates if connections must use TLS when connecting to this server
1222
        TLSVerify             bool                   `json:"tls_verify,omitempty"`              // TLSVerify indicates if full TLS verification will be performed
1223
        TLSOCSPPeerVerify     bool                   `json:"tls_ocsp_peer_verify,omitempty"`    // TLSOCSPPeerVerify indicates if the OCSP protocol will be used to verify peers
1224
        IP                    string                 `json:"ip,omitempty"`                      // IP is the IP address the server listens on if set
1225
        ClientConnectURLs     []string               `json:"connect_urls,omitempty"`            // ClientConnectURLs is the list of URLs NATS clients can use to connect to this server
1226
        WSConnectURLs         []string               `json:"ws_connect_urls,omitempty"`         // WSConnectURLs is the list of URLs websocket clients can use to connect to this server
1227
        MaxConn               int                    `json:"max_connections"`                   // MaxConn is the maximum amount of connections the server can accept
1228
        MaxSubs               int                    `json:"max_subscriptions,omitempty"`       // MaxSubs is the maximum amount of subscriptions the server can manage
1229
        PingInterval          time.Duration          `json:"ping_interval"`                     // PingInterval is the interval the server will send PING messages during periods of inactivity on a connection
1230
        MaxPingsOut           int                    `json:"ping_max"`                          // MaxPingsOut is the number of unanswered PINGs after which the connection will be considered stale
1231
        HTTPHost              string                 `json:"http_host"`                         // HTTPHost is the HTTP host monitoring connections are accepted on
1232
        HTTPPort              int                    `json:"http_port"`                         // HTTPPort is the port monitoring connections are accepted on
1233
        HTTPBasePath          string                 `json:"http_base_path"`                    // HTTPBasePath is the path prefix for access to monitor endpoints
1234
        HTTPSPort             int                    `json:"https_port"`                        // HTTPSPort is the HTTPS host monitoring connections are accepted on`
1235
        AuthTimeout           float64                `json:"auth_timeout"`                      // AuthTimeout is the amount of seconds connections have to complete authentication
1236
        MaxControlLine        int32                  `json:"max_control_line"`                  // MaxControlLine is the amount of bytes a signal control message may be
1237
        MaxPayload            int                    `json:"max_payload"`                       // MaxPayload is the maximum amount of bytes a message may have as payload
1238
        MaxPending            int64                  `json:"max_pending"`                       // MaxPending is the maximum amount of unprocessed bytes a connection may have
1239
        Cluster               ClusterOptsVarz        `json:"cluster,omitempty"`                 // Cluster is the Cluster state
1240
        Gateway               GatewayOptsVarz        `json:"gateway,omitempty"`                 // Gateway is the Super Cluster state
1241
        LeafNode              LeafNodeOptsVarz       `json:"leaf,omitempty"`                    // LeafNode is the Leafnode state
1242
        MQTT                  MQTTOptsVarz           `json:"mqtt,omitempty"`                    // MQTT is the MQTT state
1243
        Websocket             WebsocketOptsVarz      `json:"websocket,omitempty"`               // Websocket is the Websocket client state
1244
        JetStream             JetStreamVarz          `json:"jetstream,omitempty"`               // JetStream is the JetStream state
1245
        TLSTimeout            float64                `json:"tls_timeout"`                       // TLSTimeout is how long TLS operations have to complete
1246
        WriteDeadline         time.Duration          `json:"write_deadline"`                    // WriteDeadline is the maximum time writes to sockets have to complete
1247
        WriteTimeout          string                 `json:"write_timeout,omitempty"`           // WriteTimeout is the closure policy for write deadline errors
1248
        Start                 time.Time              `json:"start"`                             // Start is time when the server was started
1249
        Now                   time.Time              `json:"now"`                               // Now is the current time of the server
1250
        Uptime                string                 `json:"uptime"`                            // Uptime is how long the server has been running
1251
        Mem                   int64                  `json:"mem"`                               // Mem is the resident memory allocation
1252
        Cores                 int                    `json:"cores"`                             // Cores is the number of cores the process has access to
1253
        MaxProcs              int                    `json:"gomaxprocs"`                        // MaxProcs is the configured GOMAXPROCS value
1254
        MemLimit              int64                  `json:"gomemlimit,omitempty"`              // MemLimit is the configured GOMEMLIMIT value
1255
        CPU                   float64                `json:"cpu"`                               // CPU is the current total CPU usage
1256
        Connections           int                    `json:"connections"`                       // Connections is the current connected connections
1257
        TotalConnections      uint64                 `json:"total_connections"`                 // TotalConnections is the total connections the server have ever handled
1258
        Routes                int                    `json:"routes"`                            // Routes is the number of connected route servers
1259
        Remotes               int                    `json:"remotes"`                           // Remotes is the configured route remote endpoints
1260
        Leafs                 int                    `json:"leafnodes"`                         // Leafs is the number connected leafnode clients
1261
        InMsgs                int64                  `json:"in_msgs"`                           // InMsgs is the number of messages this server received
1262
        OutMsgs               int64                  `json:"out_msgs"`                          // OutMsgs is the number of message this server sent
1263
        InBytes               int64                  `json:"in_bytes"`                          // InBytes is the number of bytes this server received
1264
        OutBytes              int64                  `json:"out_bytes"`                         // OutMsgs is the number of bytes this server sent
1265
        SlowConsumers         int64                  `json:"slow_consumers"`                    // SlowConsumers is the total count of clients that were disconnected since start due to being slow consumers
1266
        StaleConnections      int64                  `json:"stale_connections"`                 // StaleConnections is the total count of stale connections that were detected
1267
        StalledClients        int64                  `json:"stalled_clients"`                   // StalledClients is the total number of times that clients have been stalled.
1268
        Subscriptions         uint32                 `json:"subscriptions"`                     // Subscriptions is the count of active subscriptions
1269
        HTTPReqStats          map[string]uint64      `json:"http_req_stats"`                    // HTTPReqStats is the number of requests each HTTP endpoint received
1270
        ConfigLoadTime        time.Time              `json:"config_load_time"`                  // ConfigLoadTime is the time the configuration was loaded or reloaded
1271
        ConfigDigest          string                 `json:"config_digest"`                     // ConfigDigest is a calculated hash of the current configuration
1272
        Tags                  jwt.TagList            `json:"tags,omitempty"`                    // Tags are the tags assigned to the server in configuration
1273
        Metadata              map[string]string      `json:"metadata,omitempty"`                // Metadata is the metadata assigned to the server in configuration
1274
        FeatureFlags          map[string]bool        `json:"feature_flags,omitempty"`           // FeatureFlags is the feature flags enabled/disabled in configuration
1275
        TrustedOperatorsJwt   []string               `json:"trusted_operators_jwt,omitempty"`   // TrustedOperatorsJwt is the JWTs for all trusted operators
1276
        TrustedOperatorsClaim []*jwt.OperatorClaims  `json:"trusted_operators_claim,omitempty"` // TrustedOperatorsClaim is the decoded claims for each trusted operator
1277
        SystemAccount         string                 `json:"system_account,omitempty"`          // SystemAccount is the name of the System account
1278
        PinnedAccountFail     uint64                 `json:"pinned_account_fails,omitempty"`    // PinnedAccountFail is how often user logon fails due to the issuer account not being pinned.
1279
        OCSPResponseCache     *OCSPResponseCacheVarz `json:"ocsp_peer_cache,omitempty"`         // OCSPResponseCache is the state of the OCSP cache
1280
        SlowConsumersStats    *SlowConsumersStats    `json:"slow_consumer_stats"`               // SlowConsumersStats are statistics about all detected Slow Consumer
1281
        StaleConnectionStats  *StaleConnectionStats  `json:"stale_connection_stats,omitempty"`  // StaleConnectionStats are statistics about all detected Stale Connections
1282
        Proxies               *ProxiesOptsVarz       `json:"proxies,omitempty"`                 // Proxies hold information about network proxy devices
1283
        TLSCertNotAfter       time.Time              `json:"tls_cert_not_after,omitzero"`       // TLSCertNotAfter is the expiration date of the TLS certificate of this server
1284
}
1285

1286
// JetStreamVarz contains basic runtime information about jetstream
1287
type JetStreamVarz struct {
1288
        Config *JetStreamConfig `json:"config,omitempty"` // Config is the active JetStream configuration
1289
        Stats  *JetStreamStats  `json:"stats,omitempty"`  // Stats is the statistics for the JetStream server
1290
        Meta   *MetaClusterInfo `json:"meta,omitempty"`   // Meta is information about the JetStream metalayer
1291
        Limits *JSLimitOpts     `json:"limits,omitempty"` // Limits are the configured JetStream limits
1292
}
1293

1294
// ClusterOptsVarz contains monitoring cluster information.
type ClusterOptsVarz struct {
	Name            string        `json:"name,omitempty"`              // Name is the configured cluster name
	Host            string        `json:"addr,omitempty"`              // Host is the host the cluster listens on for connections
	Port            int           `json:"cluster_port,omitempty"`      // Port is the port the cluster listens on for connections
	AuthTimeout     float64       `json:"auth_timeout,omitempty"`      // AuthTimeout is the time cluster connections have to complete authentication
	URLs            []string      `json:"urls,omitempty"`              // URLs is the list of cluster URLs
	TLSTimeout      float64       `json:"tls_timeout,omitempty"`       // TLSTimeout is how long TLS operations have to complete
	TLSRequired     bool          `json:"tls_required,omitempty"`      // TLSRequired indicates if TLS is required for connections
	TLSVerify       bool          `json:"tls_verify,omitempty"`        // TLSVerify indicates if full verification of TLS connections is performed
	PoolSize        int           `json:"pool_size,omitempty"`         // PoolSize is the configured route connection pool size
	WriteDeadline   time.Duration `json:"write_deadline,omitempty"`    // WriteDeadline is the maximum time writes to sockets have to complete
	WriteTimeout    string        `json:"write_timeout,omitempty"`     // WriteTimeout is the closure policy for write deadline errors
	TLSCertNotAfter time.Time     `json:"tls_cert_not_after,omitzero"` // TLSCertNotAfter is the expiration date of the TLS certificate
}
1309

1310
// GatewayOptsVarz contains monitoring gateway information
1311
type GatewayOptsVarz struct {
1312
        Name            string                  `json:"name,omitempty"`              // Name is the configured cluster name
1313
        Host            string                  `json:"host,omitempty"`              // Host is the host the gateway listens on for connections
1314
        Port            int                     `json:"port,omitempty"`              // Port is the post gateway connections listens on
1315
        AuthTimeout     float64                 `json:"auth_timeout,omitempty"`      // AuthTimeout is the time cluster connections have to complete authentication
1316
        TLSTimeout      float64                 `json:"tls_timeout,omitempty"`       // TLSTimeout is how long TLS operations have to complete
1317
        TLSRequired     bool                    `json:"tls_required,omitempty"`      // TLSRequired indicates if TLS is required for connections
1318
        TLSVerify       bool                    `json:"tls_verify,omitempty"`        // TLSVerify indicates if full verification of TLS connections is performed
1319
        Advertise       string                  `json:"advertise,omitempty"`         // Advertise is the URL advertised to remote gateway clients
1320
        ConnectRetries  int                     `json:"connect_retries,omitempty"`   // ConnectRetries is how many connection attempts the route will make
1321
        Gateways        []RemoteGatewayOptsVarz `json:"gateways,omitempty"`          // Gateways is state of configured gateway remotes
1322
        RejectUnknown   bool                    `json:"reject_unknown,omitempty"`    // RejectUnknown indicates if unknown cluster connections will be rejected
1323
        WriteDeadline   time.Duration           `json:"write_deadline,omitempty"`    // WriteDeadline is the maximum time writes to sockets have to complete
1324
        WriteTimeout    string                  `json:"write_timeout,omitempty"`     // WriteTimeout is the closure policy for write deadline errors
1325
        TLSCertNotAfter time.Time               `json:"tls_cert_not_after,omitzero"` // TLSCertNotAfter is the expiration date of the TLS certificaet
1326
}
1327

1328
// RemoteGatewayOptsVarz contains monitoring remote gateway information.
type RemoteGatewayOptsVarz struct {
	Name       string   `json:"name"`                  // Name is the name of the remote gateway
	TLSTimeout float64  `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete
	URLs       []string `json:"urls,omitempty"`        // URLs is the list of Gateway URLs
}
1334

1335
// LeafNodeOptsVarz contains monitoring leaf node information
1336
type LeafNodeOptsVarz struct {
1337
        Host              string               `json:"host,omitempty"`                 // Host is the host the server listens on
1338
        Port              int                  `json:"port,omitempty"`                 // Port is the port the server listens on
1339
        AuthTimeout       float64              `json:"auth_timeout,omitempty"`         // AuthTimeout is the time Leafnode connections have to complete authentication
1340
        TLSTimeout        float64              `json:"tls_timeout,omitempty"`          // TLSTimeout is how long TLS operations have to complete
1341
        TLSRequired       bool                 `json:"tls_required,omitempty"`         // TLSRequired indicates if TLS is required for connections
1342
        TLSVerify         bool                 `json:"tls_verify,omitempty"`           // TLSVerify indicates if full verification of TLS connections is performed
1343
        Remotes           []RemoteLeafOptsVarz `json:"remotes,omitempty"`              // Remotes is state of configured Leafnode remotes
1344
        TLSOCSPPeerVerify bool                 `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if OCSP verification will be performed
1345
        WriteDeadline     time.Duration        `json:"write_deadline,omitempty"`       // WriteDeadline is the maximum time writes to sockets have to complete
1346
        WriteTimeout      string               `json:"write_timeout,omitempty"`        // WriteTimeout is the closure policy for write deadline errors
1347
        TLSCertNotAfter   time.Time            `json:"tls_cert_not_after,omitzero"`    // TLSCertNotAfter is the expiration date of the TLS certificate
1348
}
1349

1350
// DenyRules contains lists of subjects not allowed to be imported/exported.
type DenyRules struct {
	Exports []string `json:"exports,omitempty"` // Exports are denied exports
	Imports []string `json:"imports,omitempty"` // Imports are denied imports
}
1355

1356
// RemoteLeafOptsVarz contains monitoring remote leaf node information
1357
type RemoteLeafOptsVarz struct {
1358
        LocalAccount      string     `json:"local_account,omitempty"`        // LocalAccount is the local account this leaf is logged into
1359
        TLSTimeout        float64    `json:"tls_timeout,omitempty"`          // TLSTimeout is how long TLS operations have to complete
1360
        URLs              []string   `json:"urls,omitempty"`                 // URLs is the list of URLs for the remote Leafnode connection
1361
        Deny              *DenyRules `json:"deny,omitempty"`                 // Deny is the configured import and exports that the Leafnode may not access
1362
        TLSOCSPPeerVerify bool       `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if OCSP verification will be done
1363
}
1364

1365
// MQTTOptsVarz contains monitoring MQTT information.
type MQTTOptsVarz struct {
	Host              string        `json:"host,omitempty"`                 // Host is the host the server listens on
	Port              int           `json:"port,omitempty"`                 // Port is the port the server listens on
	NoAuthUser        string        `json:"no_auth_user,omitempty"`         // NoAuthUser is the user that will be used for unauthenticated connections
	AuthTimeout       float64       `json:"auth_timeout,omitempty"`         // AuthTimeout is how long authentication has to complete
	TLSMap            bool          `json:"tls_map,omitempty"`              // TLSMap indicates if TLS Mapping is enabled
	TLSTimeout        float64       `json:"tls_timeout,omitempty"`          // TLSTimeout is how long TLS operations have to complete
	TLSPinnedCerts    []string      `json:"tls_pinned_certs,omitempty"`     // TLSPinnedCerts is the list of certificates pinned to this connection
	JsDomain          string        `json:"js_domain,omitempty"`            // JsDomain is the JetStream domain used for MQTT state
	AckWait           time.Duration `json:"ack_wait,omitempty"`             // AckWait is how long the internal JetStream state store will allow acks to complete
	MaxAckPending     uint16        `json:"max_ack_pending,omitempty"`      // MaxAckPending is how many outstanding acks the internal JetStream state store will allow
	TLSOCSPPeerVerify bool          `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if OCSP verification will be done
	TLSCertNotAfter   time.Time     `json:"tls_cert_not_after,omitzero"`    // TLSCertNotAfter is the expiration date of the TLS certificate
}
1380

1381
// WebsocketOptsVarz contains monitoring websocket information.
type WebsocketOptsVarz struct {
	Host              string        `json:"host,omitempty"`                 // Host is the host the server listens on
	Port              int           `json:"port,omitempty"`                 // Port is the port the server listens on
	Advertise         string        `json:"advertise,omitempty"`            // Advertise is the connection URL the server advertises
	NoAuthUser        string        `json:"no_auth_user,omitempty"`         // NoAuthUser is the user that will be used for unauthenticated connections
	JWTCookie         string        `json:"jwt_cookie,omitempty"`           // JWTCookie is the name of a cookie the server will read for the connection JWT
	HandshakeTimeout  time.Duration `json:"handshake_timeout,omitempty"`    // HandshakeTimeout is how long the connection has to complete the websocket setup
	AuthTimeout       float64       `json:"auth_timeout,omitempty"`         // AuthTimeout is how long authentication has to complete
	NoTLS             bool          `json:"no_tls,omitempty"`               // NoTLS indicates if TLS is disabled
	TLSMap            bool          `json:"tls_map,omitempty"`              // TLSMap indicates if TLS Mapping is enabled
	TLSPinnedCerts    []string      `json:"tls_pinned_certs,omitempty"`     // TLSPinnedCerts is the list of certificates pinned to this connection
	SameOrigin        bool          `json:"same_origin,omitempty"`          // SameOrigin indicates if same origin connections are allowed
	AllowedOrigins    []string      `json:"allowed_origins,omitempty"`      // AllowedOrigins list of configured trusted origins
	Compression       bool          `json:"compression,omitempty"`          // Compression indicates if compression is supported
	TLSOCSPPeerVerify bool          `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if OCSP verification will be done
	TLSCertNotAfter   time.Time     `json:"tls_cert_not_after,omitzero"`    // TLSCertNotAfter is the expiration date of the TLS certificate
}
1399

1400
// OCSPResponseCacheVarz contains OCSP response cache information.
type OCSPResponseCacheVarz struct {
	Type      string `json:"cache_type,omitempty"`               // Type is the kind of cache being used
	Hits      int64  `json:"cache_hits,omitempty"`               // Hits is how many times the cache was able to answer a request
	Misses    int64  `json:"cache_misses,omitempty"`             // Misses is how many times the cache failed to answer a request
	Responses int64  `json:"cached_responses,omitempty"`         // Responses is how many responses are currently stored in the cache
	Revokes   int64  `json:"cached_revoked_responses,omitempty"` // Revokes is how many of the stored cache entries are revokes
	Goods     int64  `json:"cached_good_responses,omitempty"`    // Goods is how many of the stored cache entries are good responses
	Unknowns  int64  `json:"cached_unknown_responses,omitempty"` // Unknowns is how many of the stored cache entries are unknown responses
}
1410

1411
// ProxiesOptsVarz contains proxies information
1412
type ProxiesOptsVarz struct {
1413
        Trusted []*ProxyOptsVarz `json:"trusted,omitempty"` // Trusted holds a list of trusted proxies
1414
}
1415

1416
// ProxyOptsVarz contains proxy information.
type ProxyOptsVarz struct {
	Key string `json:"key"` // Key is the public key of the proxy
}
1420

1421
// VarzOptions are the options passed to Varz().
// Currently, there are no options defined.
type VarzOptions struct{}
1424

1425
// SlowConsumersStats contains information about the slow consumers from different types of connections.
type SlowConsumersStats struct {
	Clients  uint64 `json:"clients"`  // Clients is how many Clients were slow consumers
	Routes   uint64 `json:"routes"`   // Routes is how many Routes were slow consumers
	Gateways uint64 `json:"gateways"` // Gateways is how many Gateways were slow consumers
	Leafs    uint64 `json:"leafs"`    // Leafs is how many Leafnodes were slow consumers
}
1432

1433
// StaleConnectionStats contains information about the stale connections from different types of connections.
type StaleConnectionStats struct {
	Clients  uint64 `json:"clients"`  // Clients is how many Client connections became stale connections
	Routes   uint64 `json:"routes"`   // Routes is how many Route connections became stale connections
	Gateways uint64 `json:"gateways"` // Gateways is how many Gateway connections became stale connections
	Leafs    uint64 `json:"leafs"`    // Leafs is how many Leafnode connections became stale connections
}
1440

1441
// myUptime formats d as a compact uptime string such as "1y2d3h4m5s".
// Sub-second precision is truncated, units above the largest non-zero one
// are omitted, and a year is counted as 365 days.
func myUptime(d time.Duration) string {
	// Just use total seconds for uptime, and display days / years
	tsecs := d / time.Second
	tmins := tsecs / 60
	thrs := tmins / 60
	tdays := thrs / 24
	tyrs := tdays / 365

	// The %d verb prints the raw integer, not Duration's String() form.
	switch {
	case tyrs > 0:
		return fmt.Sprintf("%dy%dd%dh%dm%ds", tyrs, tdays%365, thrs%24, tmins%60, tsecs%60)
	case tdays > 0:
		return fmt.Sprintf("%dd%dh%dm%ds", tdays, thrs%24, tmins%60, tsecs%60)
	case thrs > 0:
		return fmt.Sprintf("%dh%dm%ds", thrs, tmins%60, tsecs%60)
	case tmins > 0:
		return fmt.Sprintf("%dm%ds", tmins, tsecs%60)
	}
	return fmt.Sprintf("%ds", tsecs)
}
1463

1464
// tlsCertNotAfter returns the expiration time (NotAfter) of the first
// certificate configured in config. It returns the zero time when config is
// nil, has no certificates, the first certificate has neither a parsed leaf
// nor any raw certificate data, or the raw leaf fails to parse.
func tlsCertNotAfter(config *tls.Config) time.Time {
	if config == nil || len(config.Certificates) == 0 {
		return time.Time{}
	}
	cert := &config.Certificates[0]
	// Use the already parsed leaf when it is available.
	if cert.Leaf != nil {
		return cert.Leaf.NotAfter
	}
	// Guard against an empty chain: indexing Certificate[0] would panic.
	if len(cert.Certificate) == 0 {
		return time.Time{}
	}
	leaf, err := x509.ParseCertificate(cert.Certificate[0])
	if err != nil {
		return time.Time{}
	}
	return leaf.NotAfter
}
1479

1480
// HandleRoot will show basic info and links to others handlers.
1481
func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) {
4✔
1482
        // This feels dumb to me, but is required: https://code.google.com/p/go/issues/detail?id=4799
4✔
1483
        if r.URL.Path != s.httpBasePath {
4✔
1484
                http.NotFound(w, r)
×
1485
                return
×
1486
        }
×
1487
        s.mu.Lock()
4✔
1488
        s.httpReqStats[RootPath]++
4✔
1489
        s.mu.Unlock()
4✔
1490

4✔
1491
        // Calculate source url. If git set go directly to that tag, otherwise just main.
4✔
1492
        var srcUrl string
4✔
1493
        if gitCommit == _EMPTY_ {
8✔
1494
                srcUrl = "https://github.com/nats-io/nats-server"
4✔
1495
        } else if serverVersion != _EMPTY_ {
4✔
1496
                srcUrl = fmt.Sprintf("https://github.com/nats-io/nats-server/tree/%s", serverVersion)
×
1497
        } else {
×
1498
                srcUrl = fmt.Sprintf("https://github.com/nats-io/nats-server/tree/%s", gitCommit)
×
1499
        }
×
1500

1501
        fmt.Fprintf(w, `<html lang="en">
4✔
1502
        <head>
4✔
1503
        <link rel="shortcut icon" href="https://nats.io/favicon.ico">
4✔
1504
        <style type="text/css">
4✔
1505
                body { font-family: ui-sans-serif,system-ui,-apple-system,BlinkMacSystemFont,"Segoe UI",Roboto,"Helvetica Neue",Arial,"Noto Sans",sans-serif; font-size: 18; font-weight: light-bold; margin-left: 32px }
4✔
1506
                a { display:block; margin-left: 7px; padding-bottom: 6px; color: rgb(72 72 92); text-decoration: none }
4✔
1507
                a:hover { font-weight: 600; color: rgb(59 50 202) }
4✔
1508
                a.help { display:inline; font-weight: 600; color: rgb(59 50 202); font-size: 20}
4✔
1509
                a.last { padding-bottom: 16px }
4✔
1510
                a.version { font-size: 14; font-weight: 400; width: 312px; text-align: right; margin-top: -2rem }
4✔
1511
                a.version:hover { color: rgb(22 22 32) }
4✔
1512
                .endpoint { font-size: 12px; color: #999; font-family: monospace; display: none }
4✔
1513
                a:hover .endpoint { display: inline }
4✔
1514

4✔
1515
        </style>
4✔
1516
        </head>
4✔
1517
        <body>
4✔
1518
   <svg xmlns="http://www.w3.org/2000/svg" role="img" width="325" height="110" viewBox="-4.14 -3.89 436.28 119.03"><style>.st1{fill:#fff}.st2{fill:#34a574}</style><path fill="#27aae1" d="M4.3 84.6h42.2L70.7 107V84.6H103v-80H4.3v80zm15.9-61.3h18.5l35.6 33.2V23.3h11.8v42.9H68.2L32 32.4v33.8H20.2V23.3z"/><path d="M32 32.4l36.2 33.8h17.9V23.3H74.3v33.2L38.7 23.3H20.2v42.9H32z" class="st1"/><path d="M159.8 30.7L147 49h25.6z" class="st2"/><path d="M111.3 84.6H210v-80h-98.7v80zm41-61.5H168l30.8 43.2h-14.1l-5.8-8.3h-38.1l-5.8 8.3h-13.5l30.8-43.2z" class="st2"/><path d="M140.8 57.9h38.1l5.8 8.3h14.1L168 23.1h-15.7l-30.8 43.2H135l5.8-8.4zm19-27.2L172.6 49H147l12.8-18.3z" class="st1"/><path fill="#375c93" d="M218.3 84.6H317v-80h-98.7v80zm15.5-61.3h66.7V33h-27.2v33.2h-12.2V33h-27.3v-9.7z"/><path d="M261.1 66.2h12.2V33h27.2v-9.7h-66.7V33h27.3z" class="st1"/><path fill="#8dc63f" d="M325.3 4.6v80H424v-80h-98.7zm76.5 56.7c-3.2 3.2-10.2 5.7-26.8 5.7-12.3 0-24.1-1.9-30.7-4.7v-10c6.3 2.8 20.1 5.5 30.7 5.5 9.3 0 15.8-.3 17.5-2.1.6-.6.7-1.3.7-2 0-.8-.2-1.3-.7-1.8-1-1-2.6-1.7-17.4-2.1-15.7-.4-23.4-2-27-5.6-1.7-1.7-2.6-4.4-2.6-7.5 0-3.3.6-6.2 3.3-8.9 3.6-3.6 10.7-5.3 25.1-5.3 10.8 0 21.6 1.7 27.3 4v10.1c-6.5-2.8-17.8-4.8-27.2-4.8-10.4 0-14.8.6-16.2 2-.5.5-.8 1.1-.8 1.9 0 .9.2 1.5.7 2 1.3 1.3 6.1 1.7 17.3 1.9 16.4.4 23.5 1.8 27 5.2 1.8 1.8 2.8 4.7 2.8 7.7.1 3.2-.6 6.4-3 8.8z"/><path d="M375.2 39.5c-11.2-.2-16-.6-17.3-1.9-.5-.5-.7-1.1-.7-2 0-.8.3-1.4.8-1.9 1.3-1.3 5.8-2 16.2-2 9.4 0 20.7 2 27.2 4.8v-10c-5.7-2.3-16.6-4-27.3-4-14.5 0-21.6 1.8-25.1 5.3-2.7 2.7-3.3 5.6-3.3 8.9 0 3.1 1 5.8 2.6 7.5 3.6 3.6 11.3 5.2 27 5.6 14.8.4 16.4 1.1 17.4 2.1.5.5.7 1 .7 1.8 0 .7-.1 1.3-.7 2-1.8 1.8-8.3 2.1-17.5 2.1-10.6 0-24.3-2.6-30.7-5.5v10.1c6.6 2.8 18.4 4.7 30.7 4.7 16.6 0 23.6-2.5 26.8-5.7 2.4-2.4 3.1-5.6 3.1-8.9 0-3.1-1-5.9-2.8-7.7-3.6-3.5-10.7-4.9-27.1-5.3z" class="st1"/></svg>
4✔
1519

4✔
1520
        <a href=%s class='version'>v%s</a>
4✔
1521

4✔
1522
        </div>
4✔
1523
        <br/>
4✔
1524
        <a href=.%s>General<span class="endpoint"> %s</span></a>
4✔
1525
        <a href=.%s>JetStream<span class="endpoint"> %s</span></a>
4✔
1526
        <a href=.%s>Connections<span class="endpoint"> %s</span></a>
4✔
1527
        <a href=.%s>Accounts<span class="endpoint"> %s</span></a>
4✔
1528
        <a href=.%s>Account Stats<span class="endpoint"> %s</span></a>
4✔
1529
        <a href=.%s>Subscriptions<span class="endpoint"> %s</span></a>
4✔
1530
        <a href=.%s>Routes<span class="endpoint"> %s</span></a>
4✔
1531
        <a href=.%s>LeafNodes<span class="endpoint"> %s</span></a>
4✔
1532
        <a href=.%s>Gateways<span class="endpoint"> %s</span></a>
4✔
1533
        <a href=.%s>Raft Groups<span class="endpoint"> %s</span></a>
4✔
1534
        <a href=.%s>Health Probe<span class="endpoint"> %s</span></a>
4✔
1535
        <a href=.%s class=last>Expvar<span class="endpoint"> %s</span></a>
4✔
1536
    <a href=https://docs.nats.io/running-a-nats-service/nats_admin/monitoring class="help">Help</a>
4✔
1537
  </body>
4✔
1538
</html>`,
4✔
1539
                srcUrl,
4✔
1540
                VERSION,
4✔
1541
                s.basePath(VarzPath), VarzPath,
4✔
1542
                s.basePath(JszPath), JszPath,
4✔
1543
                s.basePath(ConnzPath), ConnzPath,
4✔
1544
                s.basePath(AccountzPath), AccountzPath,
4✔
1545
                s.basePath(AccountStatzPath), AccountStatzPath,
4✔
1546
                s.basePath(SubszPath), SubszPath,
4✔
1547
                s.basePath(RoutezPath), RoutezPath,
4✔
1548
                s.basePath(LeafzPath), LeafzPath,
4✔
1549
                s.basePath(GatewayzPath), GatewayzPath,
4✔
1550
                s.basePath(RaftzPath), RaftzPath,
4✔
1551
                s.basePath(HealthzPath), HealthzPath,
4✔
1552
                s.basePath(ExpvarzPath), ExpvarzPath,
4✔
1553
        )
4✔
1554
}
1555

1556
func (s *Server) updateJszVarz(js *jetStream, v *JetStreamVarz, doConfig bool) {
46✔
1557
        if doConfig {
69✔
1558
                js.mu.RLock()
23✔
1559
                // We want to snapshot the config since it will then be available outside
23✔
1560
                // of the js lock. So make a copy first, then point to this copy.
23✔
1561
                cfg := js.config
23✔
1562
                v.Config = &cfg
23✔
1563
                js.mu.RUnlock()
23✔
1564
        }
23✔
1565
        v.Stats = js.usageStats()
46✔
1566
        v.Limits = &s.getOpts().JetStreamLimits
46✔
1567
        if mg := js.getMetaGroup(); mg != nil {
60✔
1568
                if ci := s.raftNodeToClusterInfo(mg); ci != nil {
28✔
1569
                        v.Meta = &MetaClusterInfo{Name: ci.Name, Leader: ci.Leader, Peer: getHash(ci.Leader), Size: mg.ClusterSize()}
14✔
1570
                        if ci.Leader == s.info.Name {
16✔
1571
                                v.Meta.Replicas = ci.Replicas
2✔
1572
                        }
2✔
1573
                        if ipq := s.jsAPIRoutedReqs; ipq != nil {
28✔
1574
                                v.Meta.PendingRequests = ipq.len()
14✔
1575
                        }
14✔
1576
                        if ipq := s.jsAPIRoutedInfoReqs; ipq != nil {
28✔
1577
                                v.Meta.PendingInfos = ipq.len()
14✔
1578
                        }
14✔
1579
                        v.Meta.Pending = v.Meta.PendingRequests + v.Meta.PendingInfos
14✔
1580
                        v.Meta.Snapshot = s.metaClusterSnapshotStats(js, mg)
14✔
1581
                }
1582
        }
1583
}
1584

1585
// Varz returns a Varz struct containing the server information.
1586
func (s *Server) Varz(varzOpts *VarzOptions) (*Varz, error) {
5,000✔
1587
        var rss, vss int64
5,000✔
1588
        var pcpu float64
5,000✔
1589

5,000✔
1590
        // We want to do that outside of the lock.
5,000✔
1591
        pse.ProcUsage(&pcpu, &rss, &vss)
5,000✔
1592

5,000✔
1593
        s.mu.RLock()
5,000✔
1594
        // We need to create a new instance of Varz (with no reference
5,000✔
1595
        // whatsoever to anything stored in the server) since the user
5,000✔
1596
        // has access to the returned value.
5,000✔
1597
        v := s.createVarz(pcpu, rss)
5,000✔
1598
        s.mu.RUnlock()
5,000✔
1599

5,000✔
1600
        if js := s.getJetStream(); js != nil {
5,017✔
1601
                s.updateJszVarz(js, &v.JetStream, true)
17✔
1602
        }
17✔
1603

1604
        return v, nil
5,000✔
1605
}
1606

1607
// Returns a Varz instance.
1608
// Server lock is held on entry.
1609
func (s *Server) createVarz(pcpu float64, rss int64) *Varz {
5,047✔
1610
        info := s.info
5,047✔
1611
        opts := s.getOpts()
5,047✔
1612
        c := &opts.Cluster
5,047✔
1613
        gw := &opts.Gateway
5,047✔
1614
        ln := &opts.LeafNode
5,047✔
1615
        mqtt := &opts.MQTT
5,047✔
1616
        ws := &opts.Websocket
5,047✔
1617
        clustTlsReq := c.TLSConfig != nil
5,047✔
1618
        gatewayTlsReq := gw.TLSConfig != nil
5,047✔
1619
        leafTlsReq := ln.TLSConfig != nil
5,047✔
1620
        leafTlsVerify := leafTlsReq && ln.TLSConfig.ClientAuth == tls.RequireAndVerifyClientCert
5,047✔
1621
        leafTlsOCSPPeerVerify := s.ocspPeerVerify && leafTlsReq && ln.tlsConfigOpts.OCSPPeerConfig != nil && ln.tlsConfigOpts.OCSPPeerConfig.Verify
5,047✔
1622
        mqttTlsOCSPPeerVerify := s.ocspPeerVerify && mqtt.TLSConfig != nil && mqtt.tlsConfigOpts.OCSPPeerConfig != nil && mqtt.tlsConfigOpts.OCSPPeerConfig.Verify
5,047✔
1623
        wsTlsOCSPPeerVerify := s.ocspPeerVerify && ws.TLSConfig != nil && ws.tlsConfigOpts.OCSPPeerConfig != nil && ws.tlsConfigOpts.OCSPPeerConfig.Verify
5,047✔
1624
        varz := &Varz{
5,047✔
1625
                ID:           info.ID,
5,047✔
1626
                Version:      info.Version,
5,047✔
1627
                Proto:        info.Proto,
5,047✔
1628
                GitCommit:    info.GitCommit,
5,047✔
1629
                GoVersion:    info.GoVersion,
5,047✔
1630
                Name:         info.Name,
5,047✔
1631
                Host:         info.Host,
5,047✔
1632
                Port:         info.Port,
5,047✔
1633
                IP:           info.IP,
5,047✔
1634
                HTTPHost:     opts.HTTPHost,
5,047✔
1635
                HTTPPort:     opts.HTTPPort,
5,047✔
1636
                HTTPBasePath: opts.HTTPBasePath,
5,047✔
1637
                HTTPSPort:    opts.HTTPSPort,
5,047✔
1638
                Cluster: ClusterOptsVarz{
5,047✔
1639
                        Name:          info.Cluster,
5,047✔
1640
                        Host:          c.Host,
5,047✔
1641
                        Port:          c.Port,
5,047✔
1642
                        AuthTimeout:   c.AuthTimeout,
5,047✔
1643
                        TLSTimeout:    c.TLSTimeout,
5,047✔
1644
                        TLSRequired:   clustTlsReq,
5,047✔
1645
                        TLSVerify:     clustTlsReq,
5,047✔
1646
                        PoolSize:      opts.Cluster.PoolSize,
5,047✔
1647
                        WriteDeadline: opts.Cluster.WriteDeadline,
5,047✔
1648
                        WriteTimeout:  opts.Cluster.WriteTimeout.String(),
5,047✔
1649
                },
5,047✔
1650
                Gateway: GatewayOptsVarz{
5,047✔
1651
                        Name:           gw.Name,
5,047✔
1652
                        Host:           gw.Host,
5,047✔
1653
                        Port:           gw.Port,
5,047✔
1654
                        AuthTimeout:    gw.AuthTimeout,
5,047✔
1655
                        TLSTimeout:     gw.TLSTimeout,
5,047✔
1656
                        TLSRequired:    gatewayTlsReq,
5,047✔
1657
                        TLSVerify:      gatewayTlsReq,
5,047✔
1658
                        Advertise:      gw.Advertise,
5,047✔
1659
                        ConnectRetries: gw.ConnectRetries,
5,047✔
1660
                        Gateways:       []RemoteGatewayOptsVarz{},
5,047✔
1661
                        RejectUnknown:  gw.RejectUnknown,
5,047✔
1662
                        WriteDeadline:  opts.Cluster.WriteDeadline,
5,047✔
1663
                        WriteTimeout:   opts.Cluster.WriteTimeout.String(),
5,047✔
1664
                },
5,047✔
1665
                LeafNode: LeafNodeOptsVarz{
5,047✔
1666
                        Host:              ln.Host,
5,047✔
1667
                        Port:              ln.Port,
5,047✔
1668
                        AuthTimeout:       ln.AuthTimeout,
5,047✔
1669
                        TLSTimeout:        ln.TLSTimeout,
5,047✔
1670
                        TLSRequired:       leafTlsReq,
5,047✔
1671
                        TLSVerify:         leafTlsVerify,
5,047✔
1672
                        TLSOCSPPeerVerify: leafTlsOCSPPeerVerify,
5,047✔
1673
                        Remotes:           []RemoteLeafOptsVarz{},
5,047✔
1674
                        WriteDeadline:     opts.Cluster.WriteDeadline,
5,047✔
1675
                        WriteTimeout:      opts.Cluster.WriteTimeout.String(),
5,047✔
1676
                },
5,047✔
1677
                MQTT: MQTTOptsVarz{
5,047✔
1678
                        Host:              mqtt.Host,
5,047✔
1679
                        Port:              mqtt.Port,
5,047✔
1680
                        NoAuthUser:        mqtt.NoAuthUser,
5,047✔
1681
                        AuthTimeout:       mqtt.AuthTimeout,
5,047✔
1682
                        TLSMap:            mqtt.TLSMap,
5,047✔
1683
                        TLSTimeout:        mqtt.TLSTimeout,
5,047✔
1684
                        JsDomain:          mqtt.JsDomain,
5,047✔
1685
                        AckWait:           mqtt.AckWait,
5,047✔
1686
                        MaxAckPending:     mqtt.MaxAckPending,
5,047✔
1687
                        TLSOCSPPeerVerify: mqttTlsOCSPPeerVerify,
5,047✔
1688
                },
5,047✔
1689
                Websocket: WebsocketOptsVarz{
5,047✔
1690
                        Host:              ws.Host,
5,047✔
1691
                        Port:              ws.Port,
5,047✔
1692
                        Advertise:         ws.Advertise,
5,047✔
1693
                        NoAuthUser:        ws.NoAuthUser,
5,047✔
1694
                        JWTCookie:         ws.JWTCookie,
5,047✔
1695
                        AuthTimeout:       ws.AuthTimeout,
5,047✔
1696
                        NoTLS:             ws.NoTLS,
5,047✔
1697
                        TLSMap:            ws.TLSMap,
5,047✔
1698
                        SameOrigin:        ws.SameOrigin,
5,047✔
1699
                        AllowedOrigins:    copyStrings(ws.AllowedOrigins),
5,047✔
1700
                        Compression:       ws.Compression,
5,047✔
1701
                        HandshakeTimeout:  ws.HandshakeTimeout,
5,047✔
1702
                        TLSOCSPPeerVerify: wsTlsOCSPPeerVerify,
5,047✔
1703
                },
5,047✔
1704
                Start:                 s.start.UTC(),
5,047✔
1705
                MaxSubs:               opts.MaxSubs,
5,047✔
1706
                Cores:                 runtime.NumCPU(),
5,047✔
1707
                MaxProcs:              runtime.GOMAXPROCS(0),
5,047✔
1708
                TrustedOperatorsJwt:   opts.operatorJWT,
5,047✔
1709
                TrustedOperatorsClaim: opts.TrustedOperators,
5,047✔
1710
        }
5,047✔
1711
        if mm := debug.SetMemoryLimit(-1); mm < math.MaxInt64 {
5,047✔
1712
                varz.MemLimit = mm
×
1713
        }
×
1714
        // If this is a leaf without cluster, reset the cluster name (that is otherwise
1715
        // set to the server name).
1716
        if s.leafNoCluster {
5,053✔
1717
                varz.Cluster.Name = _EMPTY_
6✔
1718
        }
6✔
1719
        if len(opts.Routes) > 0 {
5,092✔
1720
                varz.Cluster.URLs = urlsToStrings(opts.Routes)
45✔
1721
        }
45✔
1722
        if l := len(gw.Gateways); l > 0 {
5,060✔
1723
                rgwa := make([]RemoteGatewayOptsVarz, l)
13✔
1724
                for i, r := range gw.Gateways {
26✔
1725
                        rgwa[i] = RemoteGatewayOptsVarz{
13✔
1726
                                Name:       r.Name,
13✔
1727
                                TLSTimeout: r.TLSTimeout,
13✔
1728
                        }
13✔
1729
                }
13✔
1730
                varz.Gateway.Gateways = rgwa
13✔
1731
        }
1732
        if l := len(ln.Remotes); l > 0 {
5,053✔
1733
                rlna := make([]RemoteLeafOptsVarz, l)
6✔
1734
                for i, r := range ln.Remotes {
15✔
1735
                        var deny *DenyRules
9✔
1736
                        if len(r.DenyImports) > 0 || len(r.DenyExports) > 0 {
9✔
1737
                                deny = &DenyRules{
×
1738
                                        Imports: r.DenyImports,
×
1739
                                        Exports: r.DenyExports,
×
1740
                                }
×
1741
                        }
×
1742
                        remoteTlsOCSPPeerVerify := s.ocspPeerVerify && r.tlsConfigOpts != nil && r.tlsConfigOpts.OCSPPeerConfig != nil && r.tlsConfigOpts.OCSPPeerConfig.Verify
9✔
1743

9✔
1744
                        rlna[i] = RemoteLeafOptsVarz{
9✔
1745
                                LocalAccount:      r.LocalAccount,
9✔
1746
                                URLs:              urlsToStrings(r.URLs),
9✔
1747
                                TLSTimeout:        r.TLSTimeout,
9✔
1748
                                Deny:              deny,
9✔
1749
                                TLSOCSPPeerVerify: remoteTlsOCSPPeerVerify,
9✔
1750
                        }
9✔
1751
                }
1752
                varz.LeafNode.Remotes = rlna
6✔
1753
        }
1754

1755
        // Finish setting it up with fields that can be updated during
1756
        // configuration reload and runtime.
1757
        s.updateVarzConfigReloadableFields(varz)
5,047✔
1758
        s.updateVarzRuntimeFields(varz, true, pcpu, rss)
5,047✔
1759
        return varz
5,047✔
1760
}
1761

1762
// urlsToStrings returns the Host component of each URL in urls, in the
// same order. The returned slice is never nil, even when urls is empty.
func urlsToStrings(urls []*url.URL) []string {
	hosts := make([]string, 0, len(urls))
	for _, u := range urls {
		hosts = append(hosts, u.Host)
	}
	return hosts
}
1769

1770
// Invoked during configuration reload once options have possibly be changed
1771
// and config load time has been set. If s.varz has not been initialized yet
1772
// (because no pooling of /varz has been made), this function does nothing.
1773
// Server lock is held on entry.
1774
func (s *Server) updateVarzConfigReloadableFields(v *Varz) {
6,174✔
1775
        if v == nil {
7,298✔
1776
                return
1,124✔
1777
        }
1,124✔
1778
        opts := s.getOpts()
5,050✔
1779
        info := &s.info
5,050✔
1780
        v.AuthRequired = info.AuthRequired
5,050✔
1781
        v.TLSRequired = info.TLSRequired
5,050✔
1782
        v.TLSVerify = info.TLSVerify
5,050✔
1783
        v.MaxConn = opts.MaxConn
5,050✔
1784
        v.PingInterval = opts.PingInterval
5,050✔
1785
        v.MaxPingsOut = opts.MaxPingsOut
5,050✔
1786
        v.AuthTimeout = opts.AuthTimeout
5,050✔
1787
        v.MaxControlLine = opts.MaxControlLine
5,050✔
1788
        v.MaxPayload = int(opts.MaxPayload)
5,050✔
1789
        v.MaxPending = opts.MaxPending
5,050✔
1790
        v.TLSTimeout = opts.TLSTimeout
5,050✔
1791
        v.WriteDeadline = opts.WriteDeadline
5,050✔
1792
        v.WriteTimeout = opts.WriteTimeout.String()
5,050✔
1793
        v.ConfigLoadTime = s.configTime.UTC()
5,050✔
1794
        v.ConfigDigest = opts.configDigest
5,050✔
1795
        v.Tags = opts.Tags
5,050✔
1796
        v.Metadata = opts.Metadata
5,050✔
1797
        v.FeatureFlags = opts.getMergedFeatureFlags()
5,050✔
1798
        // Update route URLs if applicable
5,050✔
1799
        if s.varzUpdateRouteURLs {
5,052✔
1800
                v.Cluster.URLs = urlsToStrings(opts.Routes)
2✔
1801
                s.varzUpdateRouteURLs = false
2✔
1802
        }
2✔
1803
        if s.sys != nil && s.sys.account != nil {
5,144✔
1804
                v.SystemAccount = s.sys.account.GetName()
94✔
1805
        }
94✔
1806
        v.MQTT.TLSPinnedCerts = getPinnedCertsAsSlice(opts.MQTT.TLSPinnedCerts)
5,050✔
1807
        v.Websocket.TLSPinnedCerts = getPinnedCertsAsSlice(opts.Websocket.TLSPinnedCerts)
5,050✔
1808

5,050✔
1809
        v.TLSOCSPPeerVerify = s.ocspPeerVerify && v.TLSRequired && s.opts.tlsConfigOpts != nil && s.opts.tlsConfigOpts.OCSPPeerConfig != nil && s.opts.tlsConfigOpts.OCSPPeerConfig.Verify
5,050✔
1810

5,050✔
1811
        v.TLSCertNotAfter = tlsCertNotAfter(opts.TLSConfig)
5,050✔
1812
        v.Cluster.TLSCertNotAfter = tlsCertNotAfter(opts.Cluster.TLSConfig)
5,050✔
1813
        v.Gateway.TLSCertNotAfter = tlsCertNotAfter(opts.Gateway.TLSConfig)
5,050✔
1814
        v.LeafNode.TLSCertNotAfter = tlsCertNotAfter(opts.LeafNode.TLSConfig)
5,050✔
1815
        v.MQTT.TLSCertNotAfter = tlsCertNotAfter(opts.MQTT.TLSConfig)
5,050✔
1816
        v.Websocket.TLSCertNotAfter = tlsCertNotAfter(opts.Websocket.TLSConfig)
5,050✔
1817

5,050✔
1818
        if opts.Proxies != nil {
5,052✔
1819
                if v.Proxies == nil {
4✔
1820
                        v.Proxies = &ProxiesOptsVarz{}
2✔
1821
                }
2✔
1822
                trusted := make([]*ProxyOptsVarz, 0, len(opts.Proxies.Trusted))
2✔
1823
                for _, t := range opts.Proxies.Trusted {
6✔
1824
                        trusted = append(trusted, &ProxyOptsVarz{Key: t.Key})
4✔
1825
                }
4✔
1826
                v.Proxies.Trusted = trusted
2✔
1827
        } else {
5,048✔
1828
                v.Proxies = nil
5,048✔
1829
        }
5,048✔
1830
}
1831

1832
func getPinnedCertsAsSlice(certs PinnedCertSet) []string {
10,100✔
1833
        if len(certs) == 0 {
20,196✔
1834
                return nil
10,096✔
1835
        }
10,096✔
1836
        res := make([]string, 0, len(certs))
4✔
1837
        for cn := range certs {
8✔
1838
                res = append(res, cn)
4✔
1839
        }
4✔
1840
        return res
4✔
1841
}
1842

1843
// Updates the runtime Varz fields, that is, fields that change during
1844
// runtime and that should be updated any time Varz() or polling of /varz
1845
// is done.
1846
// Server lock is held on entry.
1847
func (s *Server) updateVarzRuntimeFields(v *Varz, forceUpdate bool, pcpu float64, rss int64) {
10,179✔
1848
        v.Now = time.Now().UTC()
10,179✔
1849
        v.Uptime = myUptime(time.Since(s.start))
10,179✔
1850
        v.Mem = rss
10,179✔
1851
        v.CPU = pcpu
10,179✔
1852
        if l := len(s.info.ClientConnectURLs); l > 0 {
10,265✔
1853
                v.ClientConnectURLs = append([]string(nil), s.info.ClientConnectURLs...)
86✔
1854
        }
86✔
1855
        if l := len(s.info.WSConnectURLs); l > 0 {
10,179✔
1856
                v.WSConnectURLs = append([]string(nil), s.info.WSConnectURLs...)
×
1857
        }
×
1858
        v.Connections = len(s.clients)
10,179✔
1859
        v.TotalConnections = s.totalClients
10,179✔
1860
        v.Routes = s.numRoutes()
10,179✔
1861
        v.Remotes = s.numRemotes()
10,179✔
1862
        v.Leafs = len(s.leafs)
10,179✔
1863
        v.InMsgs = atomic.LoadInt64(&s.inMsgs)
10,179✔
1864
        v.InBytes = atomic.LoadInt64(&s.inBytes)
10,179✔
1865
        v.OutMsgs = atomic.LoadInt64(&s.outMsgs)
10,179✔
1866
        v.OutBytes = atomic.LoadInt64(&s.outBytes)
10,179✔
1867
        v.SlowConsumers = atomic.LoadInt64(&s.slowConsumers)
10,179✔
1868
        v.StalledClients = atomic.LoadInt64(&s.stalls)
10,179✔
1869
        v.SlowConsumersStats = &SlowConsumersStats{
10,179✔
1870
                Clients:  s.NumSlowConsumersClients(),
10,179✔
1871
                Routes:   s.NumSlowConsumersRoutes(),
10,179✔
1872
                Gateways: s.NumSlowConsumersGateways(),
10,179✔
1873
                Leafs:    s.NumSlowConsumersLeafs(),
10,179✔
1874
        }
10,179✔
1875
        v.StaleConnections = atomic.LoadInt64(&s.staleConnections)
10,179✔
1876
        v.StaleConnectionStats = &StaleConnectionStats{
10,179✔
1877
                Clients:  s.NumStaleConnectionsClients(),
10,179✔
1878
                Routes:   s.NumStaleConnectionsRoutes(),
10,179✔
1879
                Gateways: s.NumStaleConnectionsGateways(),
10,179✔
1880
                Leafs:    s.NumStaleConnectionsLeafs(),
10,179✔
1881
        }
10,179✔
1882
        v.PinnedAccountFail = atomic.LoadUint64(&s.pinnedAccFail)
10,179✔
1883

10,179✔
1884
        // Make sure to reset in case we are re-using.
10,179✔
1885
        v.Subscriptions = 0
10,179✔
1886
        s.accounts.Range(func(k, val any) bool {
20,524✔
1887
                acc := val.(*Account)
10,345✔
1888
                v.Subscriptions += acc.sl.Count()
10,345✔
1889
                return true
10,345✔
1890
        })
10,345✔
1891

1892
        v.HTTPReqStats = make(map[string]uint64, len(s.httpReqStats))
10,179✔
1893
        for key, val := range s.httpReqStats {
20,732✔
1894
                v.HTTPReqStats[key] = val
10,553✔
1895
        }
10,553✔
1896

1897
        // Update Gateway remote urls if applicable
1898
        gw := s.gateway
10,179✔
1899
        gw.RLock()
10,179✔
1900
        if gw.enabled {
10,203✔
1901
                for i := 0; i < len(v.Gateway.Gateways); i++ {
40✔
1902
                        g := &v.Gateway.Gateways[i]
16✔
1903
                        rgw := gw.remotes[g.Name]
16✔
1904
                        if rgw != nil {
30✔
1905
                                rgw.RLock()
14✔
1906
                                // forceUpdate is needed if user calls Varz() programmatically,
14✔
1907
                                // since we need to create a new instance every time and the
14✔
1908
                                // gateway's varzUpdateURLs may have been set to false after
14✔
1909
                                // a web /varz inspection.
14✔
1910
                                if forceUpdate || rgw.varzUpdateURLs {
27✔
1911
                                        // Make reuse of backend array
13✔
1912
                                        g.URLs = g.URLs[:0]
13✔
1913
                                        // rgw.urls is a map[string]*url.URL where the key is
13✔
1914
                                        // already in the right format (host:port, without any
13✔
1915
                                        // user info present).
13✔
1916
                                        for u := range rgw.urls {
31✔
1917
                                                g.URLs = append(g.URLs, u)
18✔
1918
                                        }
18✔
1919
                                        rgw.varzUpdateURLs = false
13✔
1920
                                }
1921
                                rgw.RUnlock()
14✔
1922
                        } else if g.Name == gw.name && len(gw.ownCfgURLs) > 0 {
4✔
1923
                                // This is a remote that correspond to this very same server.
2✔
1924
                                // We report the URLs that were configured (if any).
2✔
1925
                                // Since we don't support changes to the gateway configuration
2✔
1926
                                // at this time, we could do this only if g.URLs has not been already
2✔
1927
                                // set, but let's do it regardless in case we add support for
2✔
1928
                                // gateway config reload.
2✔
1929
                                g.URLs = g.URLs[:0]
2✔
1930
                                g.URLs = append(g.URLs, gw.ownCfgURLs...)
2✔
1931
                        }
2✔
1932
                }
1933
        }
1934
        gw.RUnlock()
10,179✔
1935

10,179✔
1936
        if s.ocsprc != nil && s.ocsprc.Type() != "none" {
10,194✔
1937
                stats := s.ocsprc.Stats()
15✔
1938
                if stats != nil {
30✔
1939
                        v.OCSPResponseCache = &OCSPResponseCacheVarz{
15✔
1940
                                s.ocsprc.Type(),
15✔
1941
                                stats.Hits,
15✔
1942
                                stats.Misses,
15✔
1943
                                stats.Responses,
15✔
1944
                                stats.Revokes,
15✔
1945
                                stats.Goods,
15✔
1946
                                stats.Unknowns,
15✔
1947
                        }
15✔
1948
                }
15✔
1949
        }
1950
}
1951

1952
// HandleVarz will process HTTP requests for server information.
1953
func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) {
5,179✔
1954
        var rss, vss int64
5,179✔
1955
        var pcpu float64
5,179✔
1956

5,179✔
1957
        // We want to do that outside of the lock.
5,179✔
1958
        pse.ProcUsage(&pcpu, &rss, &vss)
5,179✔
1959

5,179✔
1960
        // In response to http requests, we want to minimize mem copies
5,179✔
1961
        // so we use an object stored in the server. Creating/collecting
5,179✔
1962
        // server metrics is done under server lock, but we don't want
5,179✔
1963
        // to marshal under that lock. Still, we need to prevent concurrent
5,179✔
1964
        // http requests to /varz to update s.varz while marshal is
5,179✔
1965
        // happening, so we need a new lock that serialize those http
5,179✔
1966
        // requests and include marshaling.
5,179✔
1967
        s.varzMu.Lock()
5,179✔
1968

5,179✔
1969
        // Use server lock to create/update the server's varz object.
5,179✔
1970
        s.mu.Lock()
5,179✔
1971
        var created bool
5,179✔
1972
        s.httpReqStats[VarzPath]++
5,179✔
1973
        if s.varz == nil {
5,226✔
1974
                s.varz = s.createVarz(pcpu, rss)
47✔
1975
                created = true
47✔
1976
        } else {
5,179✔
1977
                s.updateVarzRuntimeFields(s.varz, false, pcpu, rss)
5,132✔
1978
        }
5,132✔
1979
        s.mu.Unlock()
5,179✔
1980
        // Since locking is jetStream -> Server, need to update jetstream
5,179✔
1981
        // varz outside of server lock.
5,179✔
1982

5,179✔
1983
        if js := s.getJetStream(); js != nil {
5,208✔
1984
                var v JetStreamVarz
29✔
1985
                // Work on stack variable
29✔
1986
                s.updateJszVarz(js, &v, created)
29✔
1987
                // Now update server's varz
29✔
1988
                s.mu.RLock()
29✔
1989
                sv := &s.varz.JetStream
29✔
1990
                if created {
35✔
1991
                        sv.Config = v.Config
6✔
1992
                }
6✔
1993
                sv.Stats = v.Stats
29✔
1994
                sv.Meta = v.Meta
29✔
1995
                sv.Limits = v.Limits
29✔
1996
                s.mu.RUnlock()
29✔
1997
        }
1998

1999
        // Do the marshaling outside of server lock, but under varzMu lock.
2000
        b, err := json.MarshalIndent(s.varz, "", "  ")
5,179✔
2001
        s.varzMu.Unlock()
5,179✔
2002

5,179✔
2003
        if err != nil {
5,179✔
2004
                s.Errorf("Error marshaling response to /varz request: %v", err)
×
2005
        }
×
2006

2007
        // Handle response
2008
        ResponseHandler(w, r, b)
5,179✔
2009
}
2010

2011
// GatewayzOptions are the options passed to Gatewayz().
type GatewayzOptions struct {
	// Name restricts the output to remote gateways with this name.
	Name string `json:"name"`

	// Accounts indicates whether accounts and their interest should be
	// included in the results.
	Accounts bool `json:"accounts"`

	// AccountName limits the list of accounts to that account name
	// (setting it makes Accounts implicit).
	AccountName string `json:"account_name"`

	// AccountSubscriptions indicates whether subscriptions should be
	// included in the results.
	// Note: This is used only if `Accounts` or `AccountName` are specified.
	AccountSubscriptions bool `json:"subscriptions"`

	// AccountSubscriptionsDetail indicates whether subscription details
	// should be included in the results.
	// Note: This is used only if `Accounts` or `AccountName` are specified.
	AccountSubscriptionsDetail bool `json:"subscriptions_detail"`
}
2030

2031
// Gatewayz represents detailed information on Gateways
2032
type Gatewayz struct {
2033
        ID               string                       `json:"server_id"`
2034
        Now              time.Time                    `json:"now"`
2035
        Name             string                       `json:"name,omitempty"`
2036
        Host             string                       `json:"host,omitempty"`
2037
        Port             int                          `json:"port,omitempty"`
2038
        OutboundGateways map[string]*RemoteGatewayz   `json:"outbound_gateways"`
2039
        InboundGateways  map[string][]*RemoteGatewayz `json:"inbound_gateways"`
2040
}
2041

2042
// RemoteGatewayz represents information about an outbound connection to a gateway
2043
type RemoteGatewayz struct {
2044
        IsConfigured bool               `json:"configured"`
2045
        Connection   *ConnInfo          `json:"connection,omitempty"`
2046
        Accounts     []*AccountGatewayz `json:"accounts,omitempty"`
2047
}
2048

2049
// AccountGatewayz represents interest mode for this account
2050
type AccountGatewayz struct {
2051
        Name                  string      `json:"name"`
2052
        InterestMode          string      `json:"interest_mode"`
2053
        NoInterestCount       int         `json:"no_interest_count,omitempty"`
2054
        InterestOnlyThreshold int         `json:"interest_only_threshold,omitempty"`
2055
        TotalSubscriptions    int         `json:"num_subs,omitempty"`
2056
        NumQueueSubscriptions int         `json:"num_queue_subs,omitempty"`
2057
        Subs                  []string    `json:"subscriptions_list,omitempty"`
2058
        SubsDetail            []SubDetail `json:"subscriptions_list_detail,omitempty"`
2059
}
2060

2061
// Gatewayz returns a Gatewayz struct containing information about gateways.
2062
func (s *Server) Gatewayz(opts *GatewayzOptions) (*Gatewayz, error) {
45✔
2063
        srvID := s.ID()
45✔
2064
        now := time.Now().UTC()
45✔
2065
        gw := s.gateway
45✔
2066
        gw.RLock()
45✔
2067
        if !gw.enabled || gw.info == nil {
48✔
2068
                gw.RUnlock()
3✔
2069
                gwz := &Gatewayz{
3✔
2070
                        ID:               srvID,
3✔
2071
                        Now:              now,
3✔
2072
                        OutboundGateways: map[string]*RemoteGatewayz{},
3✔
2073
                        InboundGateways:  map[string][]*RemoteGatewayz{},
3✔
2074
                }
3✔
2075
                return gwz, nil
3✔
2076
        }
3✔
2077
        // Here gateways are enabled, so fill up more.
2078
        gwz := &Gatewayz{
42✔
2079
                ID:   srvID,
42✔
2080
                Now:  now,
42✔
2081
                Name: gw.name,
42✔
2082
                Host: gw.info.Host,
42✔
2083
                Port: gw.info.Port,
42✔
2084
        }
42✔
2085
        gw.RUnlock()
42✔
2086

42✔
2087
        gwz.OutboundGateways = s.createOutboundsRemoteGatewayz(opts, now)
42✔
2088
        gwz.InboundGateways = s.createInboundsRemoteGatewayz(opts, now)
42✔
2089

42✔
2090
        return gwz, nil
42✔
2091
}
2092

2093
// Based on give options struct, returns if there is a filtered
2094
// Gateway Name and if we should do report Accounts.
2095
// Note that if Accounts is false but AccountName is not empty,
2096
// then Accounts is implicitly set to true.
2097
func getMonitorGWOptions(opts *GatewayzOptions) (string, bool) {
84✔
2098
        var name string
84✔
2099
        var accs bool
84✔
2100
        if opts != nil {
156✔
2101
                if opts.Name != _EMPTY_ {
88✔
2102
                        name = opts.Name
16✔
2103
                }
16✔
2104
                accs = opts.Accounts
72✔
2105
                if !accs && opts.AccountName != _EMPTY_ {
80✔
2106
                        accs = true
8✔
2107
                }
8✔
2108
        }
2109
        return name, accs
84✔
2110
}
2111

2112
// Returns a map of gateways outbound connections.
2113
// Based on options, will include a single or all gateways,
2114
// with no/single/or all accounts interest information.
2115
func (s *Server) createOutboundsRemoteGatewayz(opts *GatewayzOptions, now time.Time) map[string]*RemoteGatewayz {
42✔
2116
        targetGWName, doAccs := getMonitorGWOptions(opts)
42✔
2117

42✔
2118
        if targetGWName != _EMPTY_ {
50✔
2119
                c := s.getOutboundGatewayConnection(targetGWName)
8✔
2120
                if c == nil {
8✔
2121
                        return nil
×
2122
                }
×
2123
                outbounds := make(map[string]*RemoteGatewayz, 1)
8✔
2124
                _, rgw := createOutboundRemoteGatewayz(c, opts, now, doAccs)
8✔
2125
                outbounds[targetGWName] = rgw
8✔
2126
                return outbounds
8✔
2127
        }
2128

2129
        var connsa [16]*client
34✔
2130
        var conns = connsa[:0]
34✔
2131

34✔
2132
        s.getOutboundGatewayConnections(&conns)
34✔
2133

34✔
2134
        outbounds := make(map[string]*RemoteGatewayz, len(conns))
34✔
2135
        for _, c := range conns {
64✔
2136
                name, rgw := createOutboundRemoteGatewayz(c, opts, now, doAccs)
30✔
2137
                if rgw != nil {
60✔
2138
                        outbounds[name] = rgw
30✔
2139
                }
30✔
2140
        }
2141
        return outbounds
34✔
2142
}
2143

2144
// Returns a RemoteGatewayz for a given outbound gw connection
2145
func createOutboundRemoteGatewayz(c *client, opts *GatewayzOptions, now time.Time, doAccs bool) (string, *RemoteGatewayz) {
38✔
2146
        var name string
38✔
2147
        var rgw *RemoteGatewayz
38✔
2148

38✔
2149
        c.mu.Lock()
38✔
2150
        if c.gw != nil {
76✔
2151
                rgw = &RemoteGatewayz{}
38✔
2152
                if doAccs {
58✔
2153
                        rgw.Accounts = createOutboundAccountsGatewayz(opts, c.gw)
20✔
2154
                }
20✔
2155
                if c.gw.cfg != nil {
76✔
2156
                        rgw.IsConfigured = !c.gw.cfg.isImplicit()
38✔
2157
                }
38✔
2158
                rgw.Connection = &ConnInfo{}
38✔
2159
                rgw.Connection.fill(c, c.nc, now, false)
38✔
2160
                name = c.gw.name
38✔
2161
        }
2162
        c.mu.Unlock()
38✔
2163

38✔
2164
        return name, rgw
38✔
2165
}
2166

2167
// Returns the list of accounts for this outbound gateway connection.
2168
// Based on the options, it will be a single or all accounts for
2169
// this outbound.
2170
func createOutboundAccountsGatewayz(opts *GatewayzOptions, gw *gateway) []*AccountGatewayz {
20✔
2171
        if gw.outsim == nil {
20✔
2172
                return nil
×
2173
        }
×
2174

2175
        var accName string
20✔
2176
        if opts != nil {
40✔
2177
                accName = opts.AccountName
20✔
2178
        }
20✔
2179
        if accName != _EMPTY_ {
28✔
2180
                ei, ok := gw.outsim.Load(accName)
8✔
2181
                if !ok {
10✔
2182
                        return nil
2✔
2183
                }
2✔
2184
                a := createAccountOutboundGatewayz(opts, accName, ei)
6✔
2185
                return []*AccountGatewayz{a}
6✔
2186
        }
2187

2188
        accs := make([]*AccountGatewayz, 0, 4)
12✔
2189
        gw.outsim.Range(func(k, v any) bool {
84✔
2190
                name := k.(string)
72✔
2191
                a := createAccountOutboundGatewayz(opts, name, v)
72✔
2192
                accs = append(accs, a)
72✔
2193
                return true
72✔
2194
        })
72✔
2195
        return accs
12✔
2196
}
2197

2198
// Returns an AccountGatewayz for this gateway outbound connection
2199
func createAccountOutboundGatewayz(opts *GatewayzOptions, name string, ei any) *AccountGatewayz {
78✔
2200
        a := &AccountGatewayz{
78✔
2201
                Name:                  name,
78✔
2202
                InterestOnlyThreshold: gatewayMaxRUnsubBeforeSwitch,
78✔
2203
        }
78✔
2204
        if ei != nil {
126✔
2205
                e := ei.(*outsie)
48✔
2206
                e.RLock()
48✔
2207
                a.InterestMode = e.mode.String()
48✔
2208
                a.NoInterestCount = len(e.ni)
48✔
2209
                a.NumQueueSubscriptions = e.qsubs
48✔
2210
                a.TotalSubscriptions = int(e.sl.Count())
48✔
2211
                if opts.AccountSubscriptions || opts.AccountSubscriptionsDetail {
64✔
2212
                        var subsa [4096]*subscription
16✔
2213
                        subs := subsa[:0]
16✔
2214
                        e.sl.All(&subs)
16✔
2215
                        if opts.AccountSubscriptions {
24✔
2216
                                a.Subs = make([]string, 0, len(subs))
8✔
2217
                        } else {
16✔
2218
                                a.SubsDetail = make([]SubDetail, 0, len(subs))
8✔
2219
                        }
8✔
2220
                        for _, sub := range subs {
36✔
2221
                                if opts.AccountSubscriptions {
30✔
2222
                                        a.Subs = append(a.Subs, string(sub.subject))
10✔
2223
                                } else {
20✔
2224
                                        a.SubsDetail = append(a.SubsDetail, newClientSubDetail(sub))
10✔
2225
                                }
10✔
2226
                        }
2227
                }
2228
                e.RUnlock()
48✔
2229
        } else {
30✔
2230
                a.InterestMode = Optimistic.String()
30✔
2231
        }
30✔
2232
        return a
78✔
2233
}
2234

2235
// Returns a map of gateways inbound connections.
2236
// Each entry is an array of RemoteGatewayz since a given server
2237
// may have more than one inbound from the same remote gateway.
2238
// Based on options, will include a single or all gateways,
2239
// with no/single/or all accounts interest information.
2240
func (s *Server) createInboundsRemoteGatewayz(opts *GatewayzOptions, now time.Time) map[string][]*RemoteGatewayz {
42✔
2241
        targetGWName, doAccs := getMonitorGWOptions(opts)
42✔
2242

42✔
2243
        var connsa [16]*client
42✔
2244
        var conns = connsa[:0]
42✔
2245
        s.getInboundGatewayConnections(&conns)
42✔
2246

42✔
2247
        m := make(map[string][]*RemoteGatewayz)
42✔
2248
        for _, c := range conns {
92✔
2249
                c.mu.Lock()
50✔
2250
                if c.gw != nil && (targetGWName == _EMPTY_ || targetGWName == c.gw.name) {
98✔
2251
                        igws := m[c.gw.name]
48✔
2252
                        if igws == nil {
86✔
2253
                                igws = make([]*RemoteGatewayz, 0, 2)
38✔
2254
                        }
38✔
2255
                        rgw := &RemoteGatewayz{}
48✔
2256
                        if doAccs {
68✔
2257
                                rgw.Accounts = createInboundAccountsGatewayz(opts, c.gw)
20✔
2258
                        }
20✔
2259
                        rgw.Connection = &ConnInfo{}
48✔
2260
                        rgw.Connection.fill(c, c.nc, now, false)
48✔
2261
                        igws = append(igws, rgw)
48✔
2262
                        m[c.gw.name] = igws
48✔
2263
                }
2264
                c.mu.Unlock()
50✔
2265
        }
2266
        return m
42✔
2267
}
2268

2269
// Returns the list of accounts for this inbound gateway connection.
2270
// Based on the options, it will be a single or all accounts for
2271
// this inbound.
2272
func createInboundAccountsGatewayz(opts *GatewayzOptions, gw *gateway) []*AccountGatewayz {
20✔
2273
        if gw.insim == nil {
20✔
2274
                return nil
×
2275
        }
×
2276

2277
        var accName string
20✔
2278
        if opts != nil {
40✔
2279
                accName = opts.AccountName
20✔
2280
        }
20✔
2281
        if accName != _EMPTY_ {
28✔
2282
                e, ok := gw.insim[accName]
8✔
2283
                if !ok {
10✔
2284
                        return nil
2✔
2285
                }
2✔
2286
                a := createInboundAccountGatewayz(accName, e)
6✔
2287
                return []*AccountGatewayz{a}
6✔
2288
        }
2289

2290
        accs := make([]*AccountGatewayz, 0, 4)
12✔
2291
        for name, e := range gw.insim {
84✔
2292
                a := createInboundAccountGatewayz(name, e)
72✔
2293
                accs = append(accs, a)
72✔
2294
        }
72✔
2295
        return accs
12✔
2296
}
2297

2298
// Returns an AccountGatewayz for this gateway inbound connection
2299
func createInboundAccountGatewayz(name string, e *insie) *AccountGatewayz {
78✔
2300
        a := &AccountGatewayz{
78✔
2301
                Name:                  name,
78✔
2302
                InterestOnlyThreshold: gatewayMaxRUnsubBeforeSwitch,
78✔
2303
        }
78✔
2304
        if e != nil {
126✔
2305
                a.InterestMode = e.mode.String()
48✔
2306
                a.NoInterestCount = len(e.ni)
48✔
2307
        } else {
78✔
2308
                a.InterestMode = Optimistic.String()
30✔
2309
        }
30✔
2310
        return a
78✔
2311
}
2312

2313
// HandleGatewayz process HTTP requests for route information.
2314
func (s *Server) HandleGatewayz(w http.ResponseWriter, r *http.Request) {
19✔
2315
        s.mu.Lock()
19✔
2316
        s.httpReqStats[GatewayzPath]++
19✔
2317
        s.mu.Unlock()
19✔
2318

19✔
2319
        subs, subsDet, err := decodeSubs(w, r)
19✔
2320
        if err != nil {
19✔
2321
                return
×
2322
        }
×
2323
        accs, err := decodeBool(w, r, "accs")
19✔
2324
        if err != nil {
19✔
2325
                return
×
2326
        }
×
2327
        gwName := r.URL.Query().Get("gw_name")
19✔
2328
        accName := r.URL.Query().Get("acc_name")
19✔
2329
        if accName != _EMPTY_ {
23✔
2330
                accs = true
4✔
2331
        }
4✔
2332

2333
        opts := &GatewayzOptions{
19✔
2334
                Name:                       gwName,
19✔
2335
                Accounts:                   accs,
19✔
2336
                AccountName:                accName,
19✔
2337
                AccountSubscriptions:       subs,
19✔
2338
                AccountSubscriptionsDetail: subsDet,
19✔
2339
        }
19✔
2340
        gw, err := s.Gatewayz(opts)
19✔
2341
        if err != nil {
19✔
2342
                w.WriteHeader(http.StatusBadRequest)
×
2343
                w.Write([]byte(err.Error()))
×
2344
                return
×
2345
        }
×
2346
        b, err := json.MarshalIndent(gw, "", "  ")
19✔
2347
        if err != nil {
19✔
2348
                s.Errorf("Error marshaling response to /gatewayz request: %v", err)
×
2349
        }
×
2350

2351
        // Handle response
2352
        ResponseHandler(w, r, b)
19✔
2353
}
2354

2355
// Leafz represents detailed information on Leafnodes.
2356
type Leafz struct {
2357
        ID       string      `json:"server_id"`
2358
        Now      time.Time   `json:"now"`
2359
        NumLeafs int         `json:"leafnodes"`
2360
        Leafs    []*LeafInfo `json:"leafs"`
2361
}
2362

2363
// LeafzOptions are the options passed to Leafz.
type LeafzOptions struct {
	// Subscriptions indicates that Leafz will return a leafnode's
	// subscriptions.
	Subscriptions bool `json:"subscriptions"`
	// Account, when not empty, restricts the result to leafnodes bound to
	// that account name.
	Account string `json:"account"`
}
2369

2370
// LeafInfo has detailed information on each remote leafnode connection.
2371
type LeafInfo struct {
2372
        ID          uint64     `json:"id"`
2373
        Name        string     `json:"name"`
2374
        IsSpoke     bool       `json:"is_spoke"`
2375
        IsIsolated  bool       `json:"is_isolated,omitempty"`
2376
        Account     string     `json:"account"`
2377
        IP          string     `json:"ip"`
2378
        Port        int        `json:"port"`
2379
        RTT         string     `json:"rtt,omitempty"`
2380
        InMsgs      int64      `json:"in_msgs"`
2381
        OutMsgs     int64      `json:"out_msgs"`
2382
        InBytes     int64      `json:"in_bytes"`
2383
        OutBytes    int64      `json:"out_bytes"`
2384
        NumSubs     uint32     `json:"subscriptions"`
2385
        Subs        []string   `json:"subscriptions_list,omitempty"`
2386
        Compression string     `json:"compression,omitempty"`
2387
        Proxy       *ProxyInfo `json:"proxy,omitempty"`
2388
}
2389

2390
// Leafz returns a Leafz structure containing information about leafnodes.
2391
func (s *Server) Leafz(opts *LeafzOptions) (*Leafz, error) {
133✔
2392
        // Grab leafnodes
133✔
2393
        var lconns []*client
133✔
2394
        s.mu.Lock()
133✔
2395
        if len(s.leafs) > 0 {
258✔
2396
                lconns = make([]*client, 0, len(s.leafs))
125✔
2397
                for _, ln := range s.leafs {
259✔
2398
                        if opts != nil && opts.Account != _EMPTY_ {
138✔
2399
                                ln.mu.Lock()
4✔
2400
                                ok := ln.acc.Name == opts.Account
4✔
2401
                                ln.mu.Unlock()
4✔
2402
                                if !ok {
7✔
2403
                                        continue
3✔
2404
                                }
2405
                        }
2406
                        lconns = append(lconns, ln)
131✔
2407
                }
2408
        }
2409
        s.mu.Unlock()
133✔
2410

133✔
2411
        leafnodes := make([]*LeafInfo, 0, len(lconns))
133✔
2412

133✔
2413
        if len(lconns) > 0 {
257✔
2414
                for _, ln := range lconns {
255✔
2415
                        ln.mu.Lock()
131✔
2416
                        lni := &LeafInfo{
131✔
2417
                                ID:          ln.cid,
131✔
2418
                                Name:        ln.leaf.remoteServer,
131✔
2419
                                IsSpoke:     ln.isSpokeLeafNode(),
131✔
2420
                                IsIsolated:  ln.leaf.isolated,
131✔
2421
                                Account:     ln.acc.Name,
131✔
2422
                                IP:          ln.host,
131✔
2423
                                Port:        int(ln.port),
131✔
2424
                                RTT:         ln.getRTT().String(),
131✔
2425
                                InMsgs:      atomic.LoadInt64(&ln.inMsgs),
131✔
2426
                                OutMsgs:     ln.outMsgs,
131✔
2427
                                InBytes:     atomic.LoadInt64(&ln.inBytes),
131✔
2428
                                OutBytes:    ln.outBytes,
131✔
2429
                                NumSubs:     uint32(len(ln.subs)),
131✔
2430
                                Compression: ln.leaf.compression,
131✔
2431
                                Proxy:       createProxyInfo(ln),
131✔
2432
                        }
131✔
2433
                        if opts != nil && opts.Subscriptions {
149✔
2434
                                lni.Subs = make([]string, 0, len(ln.subs))
18✔
2435
                                for _, sub := range ln.subs {
106✔
2436
                                        lni.Subs = append(lni.Subs, string(sub.subject))
88✔
2437
                                }
88✔
2438
                        }
2439
                        ln.mu.Unlock()
131✔
2440
                        leafnodes = append(leafnodes, lni)
131✔
2441
                }
2442
        }
2443

2444
        return &Leafz{
133✔
2445
                ID:       s.ID(),
133✔
2446
                Now:      time.Now().UTC(),
133✔
2447
                NumLeafs: len(leafnodes),
133✔
2448
                Leafs:    leafnodes,
133✔
2449
        }, nil
133✔
2450
}
2451

2452
// HandleLeafz process HTTP requests for leafnode information.
2453
func (s *Server) HandleLeafz(w http.ResponseWriter, r *http.Request) {
×
2454
        s.mu.Lock()
×
2455
        s.httpReqStats[LeafzPath]++
×
2456
        s.mu.Unlock()
×
2457

×
2458
        subs, err := decodeBool(w, r, "subs")
×
2459
        if err != nil {
×
2460
                return
×
2461
        }
×
2462
        l, err := s.Leafz(&LeafzOptions{subs, r.URL.Query().Get("acc")})
×
2463
        if err != nil {
×
2464
                w.WriteHeader(http.StatusBadRequest)
×
2465
                w.Write([]byte(err.Error()))
×
2466
                return
×
2467
        }
×
2468
        b, err := json.MarshalIndent(l, "", "  ")
×
2469
        if err != nil {
×
2470
                s.Errorf("Error marshaling response to /leafz request: %v", err)
×
2471
        }
×
2472

2473
        // Handle response
2474
        ResponseHandler(w, r, b)
×
2475
}
2476

2477
// AccountStatz represents summary statistics for accounts.
// It is the report type returned by Server.AccountStatz.
2478
type AccountStatz struct {
2479
        ID       string         `json:"server_id"` // ID of the server producing the report.
2480
        Now      time.Time      `json:"now"` // UTC time at which the report was built.
2481
        Accounts []*AccountStat `json:"account_statz"` // One entry per reported account; non-nil even when empty.
2482
}
2483

2484
// AccountStatzOptions are options passed to account stats requests.
2485
type AccountStatzOptions struct {
2486
        Accounts      []string `json:"accounts"` // Account names to report on; when empty all accounts are considered.
2487
        IncludeUnused bool     `json:"include_unused"` // Also report accounts that have no local connections.
2488
}
2489

2490
// Leafz returns a AccountStatz structure containing summary information about accounts.
2491
func (s *Server) AccountStatz(opts *AccountStatzOptions) (*AccountStatz, error) {
23✔
2492
        stz := &AccountStatz{
23✔
2493
                ID:       s.ID(),
23✔
2494
                Now:      time.Now().UTC(),
23✔
2495
                Accounts: []*AccountStat{},
23✔
2496
        }
23✔
2497
        if opts == nil || len(opts.Accounts) == 0 {
38✔
2498
                s.accounts.Range(func(key, a any) bool {
62✔
2499
                        acc := a.(*Account)
47✔
2500
                        acc.mu.RLock()
47✔
2501
                        if (opts != nil && opts.IncludeUnused) || acc.numLocalConnections() != 0 {
91✔
2502
                                stz.Accounts = append(stz.Accounts, acc.statz())
44✔
2503
                        }
44✔
2504
                        acc.mu.RUnlock()
47✔
2505
                        return true
47✔
2506
                })
2507
        } else {
8✔
2508
                for _, a := range opts.Accounts {
16✔
2509
                        if acc, ok := s.accounts.Load(a); ok {
16✔
2510
                                acc := acc.(*Account)
8✔
2511
                                acc.mu.RLock()
8✔
2512
                                if opts.IncludeUnused || acc.numLocalConnections() != 0 {
15✔
2513
                                        stz.Accounts = append(stz.Accounts, acc.statz())
7✔
2514
                                }
7✔
2515
                                acc.mu.RUnlock()
8✔
2516
                        }
2517
                }
2518
        }
2519
        return stz, nil
23✔
2520
}
2521

2522
// HandleAccountStatz process HTTP requests for statz information of all accounts.
2523
func (s *Server) HandleAccountStatz(w http.ResponseWriter, r *http.Request) {
7✔
2524
        s.mu.Lock()
7✔
2525
        s.httpReqStats[AccountStatzPath]++
7✔
2526
        s.mu.Unlock()
7✔
2527

7✔
2528
        unused, err := decodeBool(w, r, "unused")
7✔
2529
        if err != nil {
7✔
2530
                return
×
2531
        }
×
2532

2533
        l, err := s.AccountStatz(&AccountStatzOptions{IncludeUnused: unused})
7✔
2534
        if err != nil {
7✔
2535
                w.WriteHeader(http.StatusBadRequest)
×
2536
                w.Write([]byte(err.Error()))
×
2537
                return
×
2538
        }
×
2539
        b, err := json.MarshalIndent(l, "", "  ")
7✔
2540
        if err != nil {
7✔
2541
                s.Errorf("Error marshaling response to %s request: %v", AccountStatzPath, err)
×
2542
                return
×
2543
        }
×
2544

2545
        // Handle response
2546
        ResponseHandler(w, r, b)
7✔
2547
}
2548

2549
// ResponseHandler handles responses for monitoring routes.
// It sends data with HTTP status 200 (http.StatusOK) by delegating to
// handleResponse.
2550
func ResponseHandler(w http.ResponseWriter, r *http.Request, data []byte) {
7,067✔
2551
        handleResponse(http.StatusOK, w, r, data)
7,067✔
2552
}
7,067✔
2553

2554
// handleResponse handles responses for monitoring routes with a specific HTTP status code.
2555
func handleResponse(code int, w http.ResponseWriter, r *http.Request, data []byte) {
7,091✔
2556
        // Get callback from request
7,091✔
2557
        callback := r.URL.Query().Get("callback")
7,091✔
2558
        if callback != _EMPTY_ {
7,107✔
2559
                // Response for JSONP
16✔
2560
                w.Header().Set("Content-Type", "application/javascript")
16✔
2561
                w.WriteHeader(code)
16✔
2562
                fmt.Fprintf(w, "%s(%s)", callback, data)
16✔
2563
        } else {
7,091✔
2564
                // Otherwise JSON
7,075✔
2565
                w.Header().Set("Content-Type", "application/json")
7,075✔
2566
                w.Header().Set("Access-Control-Allow-Origin", "*")
7,075✔
2567
                w.WriteHeader(code)
7,075✔
2568
                w.Write(data)
7,075✔
2569
        }
7,075✔
2570
}
2571

2572
func (reason ClosedState) String() string {
40,919✔
2573
        switch reason {
40,919✔
2574
        case ClientClosed:
22,104✔
2575
                return "Client Closed"
22,104✔
2576
        case AuthenticationTimeout:
24✔
2577
                return "Authentication Timeout"
24✔
2578
        case AuthenticationViolation:
13,243✔
2579
                return "Authentication Failure"
13,243✔
2580
        case TLSHandshakeError:
503✔
2581
                return "TLS Handshake Failure"
503✔
2582
        case SlowConsumerPendingBytes:
×
2583
                return "Slow Consumer (Pending Bytes)"
×
2584
        case SlowConsumerWriteDeadline:
27✔
2585
                return "Slow Consumer (Write Deadline)"
27✔
2586
        case WriteError:
18✔
2587
                return "Write Error"
18✔
2588
        case ReadError:
230✔
2589
                return "Read Error"
230✔
2590
        case ParseError:
12✔
2591
                return "Parse Error"
12✔
2592
        case StaleConnection:
8✔
2593
                return "Stale Connection"
8✔
2594
        case ProtocolViolation:
115✔
2595
                return "Protocol Violation"
115✔
2596
        case BadClientProtocolVersion:
2✔
2597
                return "Bad Client Protocol Version"
2✔
2598
        case WrongPort:
123✔
2599
                return "Incorrect Port"
123✔
2600
        case MaxConnectionsExceeded:
3✔
2601
                return "Maximum Connections Exceeded"
3✔
2602
        case MaxAccountConnectionsExceeded:
113✔
2603
                return "Maximum Account Connections Exceeded"
113✔
2604
        case MaxPayloadExceeded:
12✔
2605
                return "Maximum Message Payload Exceeded"
12✔
2606
        case MaxControlLineExceeded:
4✔
2607
                return "Maximum Control Line Exceeded"
4✔
2608
        case MaxSubscriptionsExceeded:
3✔
2609
                return "Maximum Subscriptions Exceeded"
3✔
2610
        case DuplicateRoute:
98✔
2611
                return "Duplicate Route"
98✔
2612
        case RouteRemoved:
3✔
2613
                return "Route Removed"
3✔
2614
        case ServerShutdown:
3,984✔
2615
                return "Server Shutdown"
3,984✔
2616
        case AuthenticationExpired:
28✔
2617
                return "Authentication Expired"
28✔
2618
        case WrongGateway:
5✔
2619
                return "Wrong Gateway"
5✔
2620
        case MissingAccount:
6✔
2621
                return "Missing Account"
6✔
2622
        case Revocation:
44✔
2623
                return "Credentials Revoked"
44✔
2624
        case InternalClient:
18✔
2625
                return "Internal Client"
18✔
2626
        case MsgHeaderViolation:
×
2627
                return "Message Header Violation"
×
2628
        case NoRespondersRequiresHeaders:
2✔
2629
                return "No Responders Requires Headers"
2✔
2630
        case ClusterNameConflict:
2✔
2631
                return "Cluster Name Conflict"
2✔
2632
        case DuplicateRemoteLeafnodeConnection:
9✔
2633
                return "Duplicate Remote LeafNode Connection"
9✔
2634
        case DuplicateClientID:
8✔
2635
                return "Duplicate Client ID"
8✔
2636
        case DuplicateServerName:
4✔
2637
                return "Duplicate Server Name"
4✔
2638
        case MinimumVersionRequired:
3✔
2639
                return "Minimum Version Required"
3✔
2640
        case ClusterNamesIdentical:
6✔
2641
                return "Cluster Names Identical"
6✔
2642
        case Kicked:
4✔
2643
                return "Kicked"
4✔
2644
        case ProxyNotTrusted:
32✔
2645
                return "Proxy Not Trusted"
32✔
2646
        case ProxyRequired:
118✔
2647
                return "Proxy Required"
118✔
2648
        }
2649

2650
        return "Unknown State"
1✔
2651
}
2652

2653
// AccountzOptions are options passed to Accountz.
2654
type AccountzOptions struct {
2655
        // Account indicates that Accountz will return details for the account.
        // When it is empty Accountz returns the list of account names instead.
2656
        Account string `json:"account"`
2657
}
2658

2659
func newExtServiceLatency(l *serviceLatency) *jwt.ServiceLatency {
151✔
2660
        if l == nil {
302✔
2661
                return nil
151✔
2662
        }
151✔
2663
        return &jwt.ServiceLatency{
×
2664
                Sampling: jwt.SamplingRate(l.sampling),
×
2665
                Results:  jwt.Subject(l.subject),
×
2666
        }
×
2667
}
2668

2669
// ExtImport describes an account import for monitoring output: the embedded
// jwt.Import plus state tracked by the server for that import.
type ExtImport struct {
2670
        jwt.Import
2671
        Invalid     bool                `json:"invalid"` // True when the import is marked invalid, or when no import data was available.
2672
        Share       bool                `json:"share"`
2673
        Tracking    bool                `json:"tracking"`
2674
        TrackingHdr http.Header         `json:"tracking_header,omitempty"`
2675
        Latency     *jwt.ServiceLatency `json:"latency,omitempty"`
2676
        M1          *ServiceLatency     `json:"m1,omitempty"` // Copy of the service import's m1 value, when set.
2677
}
2678

2679
// ExtExport describes an account export for monitoring output: the embedded
// jwt.Export plus the accounts approved for it and its revoked activations.
type ExtExport struct {
2680
        jwt.Export
2681
        ApprovedAccounts []string             `json:"approved_accounts,omitempty"` // Names of the accounts approved for this export.
2682
        RevokedAct       map[string]time.Time `json:"revoked_activations,omitempty"` // Revocation times (converted from Unix seconds) keyed as in the export's revocation map.
2683
}
2684

2685
// ExtVrIssues is one issue reported when validating an account's JWT claims.
// Its fields mirror Description, Blocking and TimeCheck of the jwt
// validation issue it was built from.
type ExtVrIssues struct {
2686
        Description string `json:"description"`
2687
        Blocking    bool   `json:"blocking"`
2688
        Time        bool   `json:"time_check"`
2689
}
2690

2691
// ExtMap holds an account's subject mappings for monitoring output, keyed
// by the mapping's source and listing the destinations of that mapping.
type ExtMap map[string][]*MapDest
2692

2693
// AccountInfo is the detailed description of a single account as built by
// Server.accountInfo.
//
// NOTE: encoding/json never treats a struct value as empty, so omitempty
// has no effect on LastUpdate (time.Time); it is always emitted.
type AccountInfo struct {
2694
        AccountName string               `json:"account_name"`
2695
        LastUpdate  time.Time            `json:"update_time,omitempty"` // In UTC.
2696
        IsSystem    bool                 `json:"is_system,omitempty"` // True when the account is the server's system account.
2697
        Expired     bool                 `json:"expired"`
2698
        Complete    bool                 `json:"complete"` // Negation of the account's incomplete flag.
2699
        JetStream   bool                 `json:"jetstream_enabled"` // True when the account has JetStream state attached.
2700
        LeafCnt     int                  `json:"leafnode_connections"` // Number of local leafnode connections.
2701
        ClientCnt   int                  `json:"client_connections"` // Number of local connections.
2702
        SubCnt      uint32               `json:"subscriptions"` // Subscription count of the account's sublist.
2703
        Mappings    ExtMap               `json:"mappings,omitempty"`
2704
        Exports     []ExtExport          `json:"exports,omitempty"` // Service exports followed by stream exports.
2705
        Imports     []ExtImport          `json:"imports,omitempty"` // Stream imports followed by service imports.
2706
        Jwt         string               `json:"jwt,omitempty"` // The account's claim JWT as stored by the server.
2707
        IssuerKey   string               `json:"issuer_key,omitempty"`
2708
        NameTag     string               `json:"name_tag,omitempty"`
2709
        Tags        jwt.TagList          `json:"tags,omitempty"`
2710
        Claim       *jwt.AccountClaims   `json:"decoded_jwt,omitempty"` // Decoded form of Jwt; nil when decoding yields no claims.
2711
        Vr          []ExtVrIssues        `json:"validation_result_jwt,omitempty"` // Issues found when validating Claim.
2712
        RevokedUser map[string]time.Time `json:"revoked_user,omitempty"`
2713
        Sublist     *SublistStats        `json:"sublist_stats,omitempty"`
2714
        Responses   map[string]ExtImport `json:"responses,omitempty"`
2715
}
2716

2717
// Accountz is the report type returned by Server.Accountz. Depending on the
// request it carries either the list of account names (Accounts) or the
// details of one account (Account).
type Accountz struct {
2718
        ID            string       `json:"server_id"` // ID of the server producing the report.
2719
        Now           time.Time    `json:"now"` // UTC time at which the report was built.
2720
        SystemAccount string       `json:"system_account,omitempty"` // Name of the system account, when the server has one.
2721
        Accounts      []string     `json:"accounts,omitempty"` // Account names; filled when no specific account was requested.
2722
        Account       *AccountInfo `json:"account_detail,omitempty"` // Details of the requested account.
2723
}
2724

2725
// HandleAccountz process HTTP requests for account information.
2726
func (s *Server) HandleAccountz(w http.ResponseWriter, r *http.Request) {
6✔
2727
        s.mu.Lock()
6✔
2728
        s.httpReqStats[AccountzPath]++
6✔
2729
        s.mu.Unlock()
6✔
2730
        if l, err := s.Accountz(&AccountzOptions{r.URL.Query().Get("acc")}); err != nil {
6✔
2731
                w.WriteHeader(http.StatusBadRequest)
×
2732
                w.Write([]byte(err.Error()))
×
2733
        } else if b, err := json.MarshalIndent(l, "", "  "); err != nil {
6✔
2734
                s.Errorf("Error marshaling response to %s request: %v", AccountzPath, err)
×
2735
                w.WriteHeader(http.StatusBadRequest)
×
2736
                w.Write([]byte(err.Error()))
×
2737
        } else {
6✔
2738
                ResponseHandler(w, r, b) // Handle response
6✔
2739
        }
6✔
2740
}
2741

2742
func (s *Server) Accountz(optz *AccountzOptions) (*Accountz, error) {
30✔
2743
        a := &Accountz{
30✔
2744
                ID:  s.ID(),
30✔
2745
                Now: time.Now().UTC(),
30✔
2746
        }
30✔
2747
        if sacc := s.SystemAccount(); sacc != nil {
60✔
2748
                a.SystemAccount = sacc.GetName()
30✔
2749
        }
30✔
2750
        if optz == nil || optz.Account == _EMPTY_ {
36✔
2751
                a.Accounts = []string{}
6✔
2752
                s.accounts.Range(func(key, value any) bool {
20✔
2753
                        a.Accounts = append(a.Accounts, key.(string))
14✔
2754
                        return true
14✔
2755
                })
14✔
2756
                return a, nil
6✔
2757
        }
2758
        aInfo, err := s.accountInfo(optz.Account)
24✔
2759
        if err != nil {
24✔
2760
                return nil, err
×
2761
        }
×
2762
        a.Account = aInfo
24✔
2763
        return a, nil
24✔
2764
}
2765

2766
func newExtImport(v *serviceImport) ExtImport {
57✔
2767
        imp := ExtImport{
57✔
2768
                Invalid: true,
57✔
2769
                Import:  jwt.Import{Type: jwt.Service},
57✔
2770
        }
57✔
2771
        if v != nil {
114✔
2772
                imp.Share = v.share
57✔
2773
                imp.Tracking = v.tracking
57✔
2774
                imp.Invalid = v.invalid
57✔
2775
                imp.Import = jwt.Import{
57✔
2776
                        Subject: jwt.Subject(v.to),
57✔
2777
                        Account: v.acc.Name,
57✔
2778
                        Type:    jwt.Service,
57✔
2779
                        // Deprecated so we duplicate. Use LocalSubject.
57✔
2780
                        To:           jwt.Subject(v.from),
57✔
2781
                        LocalSubject: jwt.RenamingSubject(v.from),
57✔
2782
                }
57✔
2783
                imp.TrackingHdr = v.trackingHdr
57✔
2784
                imp.Latency = newExtServiceLatency(v.latency)
57✔
2785
                if v.m1 != nil {
57✔
2786
                        m1 := *v.m1
×
2787
                        imp.M1 = &m1
×
2788
                }
×
2789
        }
2790
        return imp
57✔
2791
}
2792

2793
func (s *Server) accountInfo(accName string) (*AccountInfo, error) {
27✔
2794
        var a *Account
27✔
2795
        if v, ok := s.accounts.Load(accName); !ok {
27✔
2796
                return nil, fmt.Errorf("Account %s does not exist", accName)
×
2797
        } else {
27✔
2798
                a = v.(*Account)
27✔
2799
        }
27✔
2800
        isSys := a == s.SystemAccount()
27✔
2801
        a.mu.RLock()
27✔
2802
        defer a.mu.RUnlock()
27✔
2803
        var vrIssues []ExtVrIssues
27✔
2804
        claim, _ := jwt.DecodeAccountClaims(a.claimJWT) // ignore error
27✔
2805
        if claim != nil {
50✔
2806
                vr := jwt.ValidationResults{}
23✔
2807
                claim.Validate(&vr)
23✔
2808
                vrIssues = make([]ExtVrIssues, len(vr.Issues))
23✔
2809
                for i, v := range vr.Issues {
23✔
2810
                        vrIssues[i] = ExtVrIssues{v.Description, v.Blocking, v.TimeCheck}
×
2811
                }
×
2812
        }
2813
        collectRevocations := func(revocations map[string]int64) map[string]time.Time {
148✔
2814
                l := len(revocations)
121✔
2815
                if l == 0 {
242✔
2816
                        return nil
121✔
2817
                }
121✔
2818
                rev := make(map[string]time.Time, l)
×
2819
                for k, v := range revocations {
×
2820
                        rev[k] = time.Unix(v, 0)
×
2821
                }
×
2822
                return rev
×
2823
        }
2824
        exports := []ExtExport{}
27✔
2825
        for k, v := range a.exports.services {
121✔
2826
                e := ExtExport{
94✔
2827
                        Export: jwt.Export{
94✔
2828
                                Subject: jwt.Subject(k),
94✔
2829
                                Type:    jwt.Service,
94✔
2830
                        },
94✔
2831
                        ApprovedAccounts: []string{},
94✔
2832
                }
94✔
2833
                if v != nil {
188✔
2834
                        e.Latency = newExtServiceLatency(v.latency)
94✔
2835
                        e.TokenReq = v.tokenReq
94✔
2836
                        e.ResponseType = jwt.ResponseType(v.respType.String())
94✔
2837
                        for name := range v.approved {
94✔
2838
                                e.ApprovedAccounts = append(e.ApprovedAccounts, name)
×
2839
                        }
×
2840
                        e.RevokedAct = collectRevocations(v.actsRevoked)
94✔
2841
                }
2842
                exports = append(exports, e)
94✔
2843
        }
2844
        for k, v := range a.exports.streams {
27✔
2845
                e := ExtExport{
×
2846
                        Export: jwt.Export{
×
2847
                                Subject: jwt.Subject(k),
×
2848
                                Type:    jwt.Stream,
×
2849
                        },
×
2850
                        ApprovedAccounts: []string{},
×
2851
                }
×
2852
                if v != nil {
×
2853
                        e.TokenReq = v.tokenReq
×
2854
                        for name := range v.approved {
×
2855
                                e.ApprovedAccounts = append(e.ApprovedAccounts, name)
×
2856
                        }
×
2857
                        e.RevokedAct = collectRevocations(v.actsRevoked)
×
2858
                }
2859
                exports = append(exports, e)
×
2860
        }
2861
        imports := []ExtImport{}
27✔
2862
        for _, v := range a.imports.streams {
27✔
2863
                imp := ExtImport{
×
2864
                        Invalid: true,
×
2865
                        Import:  jwt.Import{Type: jwt.Stream},
×
2866
                }
×
2867
                if v != nil {
×
2868
                        imp.Invalid = v.invalid
×
2869
                        imp.Import = jwt.Import{
×
2870
                                Subject:      jwt.Subject(v.from),
×
2871
                                Account:      v.acc.Name,
×
2872
                                Type:         jwt.Stream,
×
2873
                                LocalSubject: jwt.RenamingSubject(v.to),
×
2874
                        }
×
2875
                }
×
2876
                imports = append(imports, imp)
×
2877
        }
2878
        for _, sis := range a.imports.services {
82✔
2879
                for _, v := range sis {
111✔
2880
                        imports = append(imports, newExtImport(v))
56✔
2881
                }
56✔
2882
        }
2883
        responses := map[string]ExtImport{}
27✔
2884
        for k, v := range a.exports.responses {
28✔
2885
                responses[k] = newExtImport(v)
1✔
2886
        }
1✔
2887
        mappings := ExtMap{}
27✔
2888
        for _, m := range a.mappings {
30✔
2889
                var dests []*MapDest
3✔
2890
                var src string
3✔
2891
                if m == nil {
3✔
2892
                        src = "nil"
×
2893
                        if _, ok := mappings[src]; ok { // only set if not present (keep orig in case nil is used)
×
2894
                                continue
×
2895
                        }
2896
                        dests = append(dests, &MapDest{})
×
2897
                } else {
3✔
2898
                        src = m.src
3✔
2899
                        for _, d := range m.dests {
8✔
2900
                                dests = append(dests, &MapDest{d.tr.dest, d.weight, _EMPTY_})
5✔
2901
                        }
5✔
2902
                        for c, cd := range m.cdests {
3✔
2903
                                for _, d := range cd {
×
2904
                                        dests = append(dests, &MapDest{d.tr.dest, d.weight, c})
×
2905
                                }
×
2906
                        }
2907
                }
2908
                mappings[src] = dests
3✔
2909
        }
2910
        return &AccountInfo{
27✔
2911
                AccountName: accName,
27✔
2912
                LastUpdate:  a.updated.UTC(),
27✔
2913
                IsSystem:    isSys,
27✔
2914
                Expired:     a.expired.Load(),
27✔
2915
                Complete:    !a.incomplete,
27✔
2916
                JetStream:   a.js != nil,
27✔
2917
                LeafCnt:     a.numLocalLeafNodes(),
27✔
2918
                ClientCnt:   a.numLocalConnections(),
27✔
2919
                SubCnt:      a.sl.Count(),
27✔
2920
                Mappings:    mappings,
27✔
2921
                Exports:     exports,
27✔
2922
                Imports:     imports,
27✔
2923
                Jwt:         a.claimJWT,
27✔
2924
                IssuerKey:   a.Issuer,
27✔
2925
                NameTag:     a.getNameTagLocked(),
27✔
2926
                Tags:        a.tags,
27✔
2927
                Claim:       claim,
27✔
2928
                Vr:          vrIssues,
27✔
2929
                RevokedUser: collectRevocations(a.usersRevoked),
27✔
2930
                Sublist:     a.sl.Stats(),
27✔
2931
                Responses:   responses,
27✔
2932
        }, nil
27✔
2933
}
2934

2935
// JSzOptions are options passed to Jsz.
// Every field is tagged omitempty, so zero-valued options are left out of
// the JSON form.
2936
type JSzOptions struct {
2937
        Account          string `json:"account,omitempty"`
2938
        Accounts         bool   `json:"accounts,omitempty"`
2939
        Streams          bool   `json:"streams,omitempty"`
2940
        Consumer         bool   `json:"consumer,omitempty"`
2941
        DirectConsumer   bool   `json:"direct_consumer,omitempty"`
2942
        Config           bool   `json:"config,omitempty"`
2943
        LeaderOnly       bool   `json:"leader_only,omitempty"`
2944
        Offset           int    `json:"offset,omitempty"`
2945
        Limit            int    `json:"limit,omitempty"`
2946
        RaftGroups       bool   `json:"raft,omitempty"`
2947
        StreamLeaderOnly bool   `json:"stream_leader_only,omitempty"`
2948
}
2949

2950
// HealthzOptions are options passed to Healthz.
// Every field is tagged omitempty, so zero-valued options are left out of
// the JSON form. Note that the JSON keys of the JS* fields use dashes.
2951
type HealthzOptions struct {
2952
        // Deprecated: Use JSEnabledOnly instead
2953
        JSEnabled     bool   `json:"js-enabled,omitempty"`
2954
        JSEnabledOnly bool   `json:"js-enabled-only,omitempty"`
2955
        JSServerOnly  bool   `json:"js-server-only,omitempty"`
2956
        JSMetaOnly    bool   `json:"js-meta-only,omitempty"`
2957
        Account       string `json:"account,omitempty"`
2958
        Stream        string `json:"stream,omitempty"`
2959
        Consumer      string `json:"consumer,omitempty"`
2960
        Details       bool   `json:"details,omitempty"`
2961
}
2962

2963
// ProfilezOptions are options passed to Profilez.
// Name and Debug are always present in the JSON form; Duration is left out
// when zero.
2964
type ProfilezOptions struct {
2965
        Name     string        `json:"name"`
2966
        Debug    int           `json:"debug"`
2967
        Duration time.Duration `json:"duration,omitempty"`
2968
}
2969

2970
// IpqueueszOptions are options passed to Ipqueuesz.
// Both fields are always present in the JSON form.
2971
type IpqueueszOptions struct {
2972
        All    bool   `json:"all"`
2973
        Filter string `json:"filter"`
2974
}
2975

2976
// RaftzOptions are options passed to Raftz.
// Note that the JSON keys ("account", "group") are shorter than the Go
// field names.
2977
type RaftzOptions struct {
2978
        AccountFilter string `json:"account"`
2979
        GroupFilter   string `json:"group"`
2980
}
2981

2982
// StreamDetail shows information about the stream state and its consumers.
//
// NOTE(review): State is held by value; if StreamState is a struct type,
// encoding/json never considers it empty and its omitempty tag has no
// effect.
2983
type StreamDetail struct {
2984
        Name               string              `json:"name"`
2985
        Created            time.Time           `json:"created"`
2986
        Cluster            *ClusterInfo        `json:"cluster,omitempty"`
2987
        Config             *StreamConfig       `json:"config,omitempty"`
2988
        State              StreamState         `json:"state,omitempty"`
2989
        Consumer           []*ConsumerInfo     `json:"consumer_detail,omitempty"`
2990
        DirectConsumer     []*ConsumerInfo     `json:"direct_consumer_detail,omitempty"`
2991
        Mirror             *StreamSourceInfo   `json:"mirror,omitempty"`
2992
        Sources            []*StreamSourceInfo `json:"sources,omitempty"`
2993
        RaftGroup          string              `json:"stream_raft_group,omitempty"`
2994
        ConsumerRaftGroups []*RaftGroupDetail  `json:"consumer_raft_groups,omitempty"`
2995
}
2996

2997
// RaftGroupDetail shows information details about the Raft group.
// It pairs a name with the name of a Raft group; RaftGroup is left out of
// the JSON form when empty.
2998
type RaftGroupDetail struct {
2999
        Name      string `json:"name"`
3000
        RaftGroup string `json:"raft_group,omitempty"`
3001
}
3002

3003
// AccountDetail holds the JetStream details of one account: its identity,
// the embedded JetStreamStats and, optionally, its streams.
type AccountDetail struct {
3004
        Name string `json:"name"` // The account's name tag when one is set, otherwise the account name.
3005
        Id   string `json:"id"` // Always the account name.
3006
        JetStreamStats
3007
        Streams []StreamDetail `json:"stream_detail,omitempty"`
3008
}
3009

3010
// MetaSnapshotStats shows information about meta snapshots.
//
// LastTime and LastDuration keep their zero values until a positive
// snapshot time or duration has been recorded. NOTE: encoding/json never
// treats a struct value as empty, so omitempty has no effect on LastTime
// (time.Time); it is always emitted.
3011
type MetaSnapshotStats struct {
3012
        PendingEntries uint64        `json:"pending_entries"`         // PendingEntries is the count of pending entries in the meta layer
3013
        PendingSize    uint64        `json:"pending_size"`            // PendingSize is the size in bytes of pending entries in the meta layer
3014
        LastTime       time.Time     `json:"last_time,omitempty"`     // LastTime is when the last meta snapshot was taken
3015
        LastDuration   time.Duration `json:"last_duration,omitempty"` // LastDuration is how long the last meta snapshot took
3016
}
3017

3018
// metaClusterSnapshotStats returns snapshot statistics for the meta group.
3019
func (s *Server) metaClusterSnapshotStats(js *jetStream, mg RaftNode) *MetaSnapshotStats {
24,567✔
3020
        entries, bytes := mg.Size()
24,567✔
3021
        snap := &MetaSnapshotStats{
24,567✔
3022
                PendingEntries: entries,
24,567✔
3023
                PendingSize:    bytes,
24,567✔
3024
        }
24,567✔
3025

24,567✔
3026
        js.mu.RLock()
24,567✔
3027
        cluster := js.cluster
24,567✔
3028
        js.mu.RUnlock()
24,567✔
3029

24,567✔
3030
        if cluster != nil {
49,134✔
3031
                timeNanos := atomic.LoadInt64(&cluster.lastMetaSnapTime)
24,567✔
3032
                durationNanos := atomic.LoadInt64(&cluster.lastMetaSnapDuration)
24,567✔
3033
                if timeNanos > 0 {
24,875✔
3034
                        snap.LastTime = time.Unix(0, timeNanos).UTC()
308✔
3035
                }
308✔
3036
                if durationNanos > 0 {
24,875✔
3037
                        snap.LastDuration = time.Duration(durationNanos)
308✔
3038
                }
308✔
3039
        }
3040

3041
        return snap
24,567✔
3042
}
3043

3044
// MetaClusterInfo shows information about the meta group.
// Snapshot has no omitempty tag, so a nil Snapshot is emitted as JSON null.
3045
type MetaClusterInfo struct {
3046
        Name            string             `json:"name,omitempty"`     // Name is the name of the cluster
3047
        Leader          string             `json:"leader,omitempty"`   // Leader is the server name of the cluster leader
3048
        Peer            string             `json:"peer,omitempty"`     // Peer is unique ID of the leader
3049
        Replicas        []*PeerInfo        `json:"replicas,omitempty"` // Replicas is a list of known peers
3050
        Size            int                `json:"cluster_size"`       // Size is the known size of the cluster
3051
        Pending         int                `json:"pending"`            // Pending is how many RAFT messages are not yet processed
3052
        PendingRequests int                `json:"pending_requests"`   // PendingRequests is how many CRUD operations are queued for processing
3053
        PendingInfos    int                `json:"pending_infos"`      // PendingInfos is how many info operations are queued for processing
3054
        Snapshot        *MetaSnapshotStats `json:"snapshot"`           // Snapshot contains meta snapshot statistics
3055
}
3056

3057
// JSInfo has detailed information on JetStream.
3058
type JSInfo struct {
3059
        JetStreamStats
3060
        ID              string           `json:"server_id"`
3061
        Now             time.Time        `json:"now"`
3062
        Disabled        bool             `json:"disabled,omitempty"`
3063
        Config          JetStreamConfig  `json:"config,omitempty"`
3064
        Limits          *JSLimitOpts     `json:"limits,omitempty"`
3065
        Streams         int              `json:"streams"`
3066
        StreamsLeader   int              `json:"streams_leader,omitempty"`
3067
        Consumers       int              `json:"consumers"`
3068
        ConsumersLeader int              `json:"consumers_leader,omitempty"`
3069
        Messages        uint64           `json:"messages"`
3070
        Bytes           uint64           `json:"bytes"`
3071
        Meta            *MetaClusterInfo `json:"meta_cluster,omitempty"`
3072
        AccountDetails  []*AccountDetail `json:"account_details,omitempty"`
3073
        Total           int              `json:"total"`
3074
}
3075

3076
func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optDirectConsumers, optCfg, optRaft, optStreamLeader bool) *AccountDetail {
366✔
3077
        jsa.mu.RLock()
366✔
3078
        acc := jsa.account
366✔
3079
        name := acc.GetName()
366✔
3080
        id := name
366✔
3081
        if acc.nameTag != _EMPTY_ {
368✔
3082
                name = acc.nameTag
2✔
3083
        }
2✔
3084
        jsa.usageMu.RLock()
366✔
3085
        totalMem, totalStore := jsa.storageTotals()
366✔
3086
        detail := AccountDetail{
366✔
3087
                Name: name,
366✔
3088
                Id:   id,
366✔
3089
                JetStreamStats: JetStreamStats{
366✔
3090
                        Memory: totalMem,
366✔
3091
                        Store:  totalStore,
366✔
3092
                        API: JetStreamAPIStats{
366✔
3093
                                Total:  jsa.apiTotal,
366✔
3094
                                Errors: jsa.apiErrors,
366✔
3095
                        },
366✔
3096
                },
366✔
3097
                Streams: make([]StreamDetail, 0, len(jsa.streams)),
366✔
3098
        }
366✔
3099
        if reserved, ok := jsa.limits[_EMPTY_]; ok {
730✔
3100
                detail.JetStreamStats.ReservedMemory = uint64(reserved.MaxMemory)
364✔
3101
                detail.JetStreamStats.ReservedStore = uint64(reserved.MaxStore)
364✔
3102
        }
364✔
3103
        jsa.usageMu.RUnlock()
366✔
3104

366✔
3105
        var streams []*stream
366✔
3106
        if optStreams {
707✔
3107
                for _, stream := range jsa.streams {
709✔
3108
                        streams = append(streams, stream)
368✔
3109
                }
368✔
3110
        }
3111
        jsa.mu.RUnlock()
366✔
3112

366✔
3113
        if js := s.getJetStream(); js != nil && optStreams {
707✔
3114
                for _, stream := range streams {
709✔
3115
                        rgroup := stream.raftGroup()
368✔
3116
                        ci := js.clusterInfo(rgroup)
368✔
3117
                        var cfg *StreamConfig
368✔
3118
                        if optCfg {
373✔
3119
                                c := stream.config()
5✔
3120
                                cfg = &c
5✔
3121
                        }
5✔
3122
                        // Skip if we are only looking for stream leaders.
3123
                        if optStreamLeader && ci != nil && ci.Leader != s.Name() {
370✔
3124
                                continue
2✔
3125
                        }
3126
                        sdet := StreamDetail{
366✔
3127
                                Name:    stream.name(),
366✔
3128
                                Created: stream.createdTime(),
366✔
3129
                                State:   stream.state(),
366✔
3130
                                Cluster: ci,
366✔
3131
                                Config:  cfg,
366✔
3132
                                Mirror:  stream.mirrorInfo(),
366✔
3133
                                Sources: stream.sourcesInfo(),
366✔
3134
                        }
366✔
3135
                        if optRaft && rgroup != nil {
371✔
3136
                                sdet.RaftGroup = rgroup.Name
5✔
3137
                                sdet.ConsumerRaftGroups = make([]*RaftGroupDetail, 0)
5✔
3138
                        }
5✔
3139
                        if optConsumers {
719✔
3140
                                for _, consumer := range stream.getPublicConsumers() {
457✔
3141
                                        cInfo := consumer.info()
104✔
3142
                                        if cInfo == nil {
104✔
3143
                                                continue
×
3144
                                        }
3145
                                        if !optCfg {
203✔
3146
                                                cInfo.Config = nil
99✔
3147
                                        }
99✔
3148
                                        sdet.Consumer = append(sdet.Consumer, cInfo)
104✔
3149
                                        if optRaft {
109✔
3150
                                                crgroup := consumer.raftGroup()
5✔
3151
                                                if crgroup != nil {
10✔
3152
                                                        sdet.ConsumerRaftGroups = append(sdet.ConsumerRaftGroups,
5✔
3153
                                                                &RaftGroupDetail{cInfo.Name, crgroup.Name},
5✔
3154
                                                        )
5✔
3155
                                                }
5✔
3156
                                        }
3157
                                }
3158
                                if optDirectConsumers {
358✔
3159
                                        for _, consumer := range stream.getDirectConsumers() {
6✔
3160
                                                cInfo := consumer.info()
1✔
3161
                                                if cInfo == nil {
1✔
3162
                                                        continue
×
3163
                                                }
3164
                                                if !optCfg {
2✔
3165
                                                        cInfo.Config = nil
1✔
3166
                                                }
1✔
3167
                                                sdet.DirectConsumer = append(sdet.Consumer, cInfo)
1✔
3168
                                        }
3169
                                }
3170
                        }
3171
                        detail.Streams = append(detail.Streams, sdet)
366✔
3172
                }
3173
        }
3174
        return &detail
366✔
3175
}
3176

3177
func (s *Server) JszAccount(opts *JSzOptions) (*AccountDetail, error) {
1✔
3178
        js := s.getJetStream()
1✔
3179
        if js == nil {
1✔
3180
                return nil, fmt.Errorf("jetstream not enabled")
×
3181
        }
×
3182
        acc := opts.Account
1✔
3183
        account, ok := s.accounts.Load(acc)
1✔
3184
        if !ok {
1✔
3185
                return nil, fmt.Errorf("account %q not found", acc)
×
3186
        }
×
3187
        js.mu.RLock()
1✔
3188
        jsa, ok := js.accounts[account.(*Account).Name]
1✔
3189
        js.mu.RUnlock()
1✔
3190
        if !ok {
1✔
3191
                return nil, fmt.Errorf("account %q not jetstream enabled", acc)
×
3192
        }
×
3193
        return s.accountDetail(jsa, opts.Streams, opts.Consumer, opts.DirectConsumer, opts.Config, opts.RaftGroups, opts.StreamLeaderOnly), nil
1✔
3194
}
3195

3196
// helper to get cluster info from node via dummy group
3197
func (s *Server) raftNodeToClusterInfo(node RaftNode) *ClusterInfo {
3,246✔
3198
        if node == nil {
3,246✔
3199
                return nil
×
3200
        }
×
3201
        peers := node.Peers()
3,246✔
3202
        peerList := make([]string, len(peers))
3,246✔
3203
        for i, p := range peers {
15,171✔
3204
                peerList[i] = p.ID
11,925✔
3205
        }
11,925✔
3206
        group := &raftGroup{
3,246✔
3207
                Name:  _EMPTY_,
3,246✔
3208
                Peers: peerList,
3,246✔
3209
                node:  node,
3,246✔
3210
        }
3,246✔
3211
        return s.getJetStream().clusterInfo(group)
3,246✔
3212
}
3213

3214
// Jsz returns a Jsz structure containing information about JetStream.
3215
func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) {
689✔
3216
        // set option defaults
689✔
3217
        if opts == nil {
992✔
3218
                opts = &JSzOptions{}
303✔
3219
        }
303✔
3220
        if opts.Offset < 0 {
689✔
3221
                opts.Offset = 0
×
3222
        }
×
3223
        if opts.Limit == 0 {
1,350✔
3224
                opts.Limit = 1024
661✔
3225
        }
661✔
3226
        if opts.Consumer {
1,022✔
3227
                opts.Streams = true
333✔
3228
        }
333✔
3229
        if opts.Streams && opts.Account == _EMPTY_ {
1,016✔
3230
                opts.Accounts = true
327✔
3231
        }
327✔
3232

3233
        jsi := &JSInfo{
689✔
3234
                ID:  s.ID(),
689✔
3235
                Now: time.Now().UTC(),
689✔
3236
        }
689✔
3237

689✔
3238
        js := s.getJetStream()
689✔
3239
        if js == nil || !js.isEnabled() {
693✔
3240
                if opts.LeaderOnly {
5✔
3241
                        return nil, fmt.Errorf("%w: not leader", errSkipZreq)
1✔
3242
                }
1✔
3243

3244
                jsi.Disabled = true
3✔
3245
                return jsi, nil
3✔
3246
        }
3247

3248
        jsi.Limits = &s.getOpts().JetStreamLimits
685✔
3249

685✔
3250
        js.mu.RLock()
685✔
3251
        isLeader := js.cluster == nil || js.cluster.isLeader()
685✔
3252
        js.mu.RUnlock()
685✔
3253

685✔
3254
        if opts.LeaderOnly && !isLeader {
685✔
3255
                return nil, fmt.Errorf("%w: not leader", errSkipZreq)
×
3256
        }
×
3257

3258
        var accounts []*jsAccount
685✔
3259

685✔
3260
        js.mu.RLock()
685✔
3261
        jsi.Config = js.config
685✔
3262
        for _, info := range js.accounts {
1,422✔
3263
                accounts = append(accounts, info)
737✔
3264
        }
737✔
3265
        js.mu.RUnlock()
685✔
3266

685✔
3267
        jsi.Total = len(accounts)
685✔
3268

685✔
3269
        if mg := js.getMetaGroup(); mg != nil {
1,369✔
3270
                if ci := s.raftNodeToClusterInfo(mg); ci != nil {
1,368✔
3271
                        jsi.Meta = &MetaClusterInfo{Name: ci.Name, Leader: ci.Leader, Peer: getHash(ci.Leader), Size: mg.ClusterSize()}
684✔
3272
                        if isLeader {
892✔
3273
                                jsi.Meta.Replicas = ci.Replicas
208✔
3274
                        }
208✔
3275
                        if ipq := s.jsAPIRoutedReqs; ipq != nil {
1,368✔
3276
                                jsi.Meta.PendingRequests = ipq.len()
684✔
3277
                        }
684✔
3278
                        if ipq := s.jsAPIRoutedInfoReqs; ipq != nil {
1,368✔
3279
                                jsi.Meta.PendingInfos = ipq.len()
684✔
3280
                        }
684✔
3281
                        jsi.Meta.Pending = jsi.Meta.PendingRequests + jsi.Meta.PendingInfos
684✔
3282
                        jsi.Meta.Snapshot = s.metaClusterSnapshotStats(js, mg)
684✔
3283
                }
3284
        }
3285

3286
        jsi.JetStreamStats = *js.usageStats()
685✔
3287

685✔
3288
        // If a specific account is requested, track the index.
685✔
3289
        filterIdx := -1
685✔
3290

685✔
3291
        // Calculate the stats of all accounts and streams regardless of the filtering.
685✔
3292
        for i, jsa := range accounts {
1,422✔
3293
                if jsa.acc().GetName() == opts.Account {
755✔
3294
                        filterIdx = i
18✔
3295
                }
18✔
3296

3297
                jsa.mu.RLock()
737✔
3298
                streams := make([]*stream, 0, len(jsa.streams))
737✔
3299
                for _, stream := range jsa.streams {
1,510✔
3300
                        streams = append(streams, stream)
773✔
3301
                }
773✔
3302
                jsa.mu.RUnlock()
737✔
3303

737✔
3304
                jsi.Streams += len(streams)
737✔
3305
                for _, stream := range streams {
1,510✔
3306
                        streamState := stream.state()
773✔
3307
                        jsi.Messages += streamState.Msgs
773✔
3308
                        jsi.Bytes += streamState.Bytes
773✔
3309
                        jsi.Consumers += streamState.Consumers
773✔
3310
                        if opts.RaftGroups {
781✔
3311
                                if node := stream.raftNode(); node == nil || node.Leader() {
12✔
3312
                                        jsi.StreamsLeader++
4✔
3313
                                }
4✔
3314
                                for _, consumer := range stream.getPublicConsumers() {
16✔
3315
                                        if node := consumer.raftNode(); node == nil || node.Leader() {
12✔
3316
                                                jsi.ConsumersLeader++
4✔
3317
                                        }
4✔
3318
                                }
3319
                        }
3320
                }
3321
        }
3322

3323
        // Targeted account takes precedence.
3324
        if filterIdx >= 0 {
703✔
3325
                accounts = accounts[filterIdx : filterIdx+1]
18✔
3326
        } else if opts.Accounts {
1,030✔
3327

345✔
3328
                if opts.Limit > 0 {
690✔
3329
                        // Sort by name for a consistent read (barring any concurrent changes)
345✔
3330
                        slices.SortFunc(accounts, func(i, j *jsAccount) int { return cmp.Compare(i.acc().Name, j.acc().Name) })
363✔
3331

3332
                        // Offset larger than the number of accounts.
3333
                        offset := min(opts.Offset, len(accounts))
345✔
3334
                        accounts = accounts[offset:]
345✔
3335

345✔
3336
                        limit := min(opts.Limit, len(accounts))
345✔
3337
                        accounts = accounts[:limit]
345✔
3338
                }
3339
        } else {
322✔
3340
                accounts = nil
322✔
3341
        }
322✔
3342

3343
        if len(accounts) > 0 {
1,046✔
3344
                jsi.AccountDetails = make([]*AccountDetail, 0, len(accounts))
361✔
3345

361✔
3346
                for _, jsa := range accounts {
726✔
3347
                        detail := s.accountDetail(jsa, opts.Streams, opts.Consumer, opts.DirectConsumer, opts.Config, opts.RaftGroups, opts.StreamLeaderOnly)
365✔
3348
                        jsi.AccountDetails = append(jsi.AccountDetails, detail)
365✔
3349
                }
365✔
3350
        }
3351

3352
        return jsi, nil
685✔
3353
}
3354

3355
// HandleJsz process HTTP requests for jetstream information.
3356
func (s *Server) HandleJsz(w http.ResponseWriter, r *http.Request) {
53✔
3357
        s.mu.Lock()
53✔
3358
        s.httpReqStats[JszPath]++
53✔
3359
        s.mu.Unlock()
53✔
3360
        accounts, err := decodeBool(w, r, "accounts")
53✔
3361
        if err != nil {
53✔
3362
                return
×
3363
        }
×
3364
        streams, err := decodeBool(w, r, "streams")
53✔
3365
        if err != nil {
53✔
3366
                return
×
3367
        }
×
3368
        consumers, err := decodeBool(w, r, "consumers")
53✔
3369
        if err != nil {
53✔
3370
                return
×
3371
        }
×
3372
        directConsumers, err := decodeBool(w, r, "direct-consumers")
53✔
3373
        if err != nil {
53✔
3374
                return
×
3375
        }
×
3376
        config, err := decodeBool(w, r, "config")
53✔
3377
        if err != nil {
53✔
3378
                return
×
3379
        }
×
3380
        offset, err := decodeInt(w, r, "offset")
53✔
3381
        if err != nil {
53✔
3382
                return
×
3383
        }
×
3384
        limit, err := decodeInt(w, r, "limit")
53✔
3385
        if err != nil {
53✔
3386
                return
×
3387
        }
×
3388
        leader, err := decodeBool(w, r, "leader-only")
53✔
3389
        if err != nil {
53✔
3390
                return
×
3391
        }
×
3392
        rgroups, err := decodeBool(w, r, "raft")
53✔
3393
        if err != nil {
53✔
3394
                return
×
3395
        }
×
3396

3397
        sleader, err := decodeBool(w, r, "stream-leader-only")
53✔
3398
        if err != nil {
53✔
3399
                return
×
3400
        }
×
3401

3402
        l, err := s.Jsz(&JSzOptions{
53✔
3403
                Account:          r.URL.Query().Get("acc"),
53✔
3404
                Accounts:         accounts,
53✔
3405
                Streams:          streams,
53✔
3406
                Consumer:         consumers,
53✔
3407
                DirectConsumer:   directConsumers,
53✔
3408
                Config:           config,
53✔
3409
                LeaderOnly:       leader,
53✔
3410
                Offset:           offset,
53✔
3411
                Limit:            limit,
53✔
3412
                RaftGroups:       rgroups,
53✔
3413
                StreamLeaderOnly: sleader,
53✔
3414
        })
53✔
3415
        if err != nil {
53✔
3416
                w.WriteHeader(http.StatusBadRequest)
×
3417
                w.Write([]byte(err.Error()))
×
3418
                return
×
3419
        }
×
3420
        b, err := json.MarshalIndent(l, "", "  ")
53✔
3421
        if err != nil {
53✔
3422
                s.Errorf("Error marshaling response to /jsz request: %v", err)
×
3423
        }
×
3424

3425
        // Handle response
3426
        ResponseHandler(w, r, b)
53✔
3427
}
3428

3429
type HealthStatus struct {
3430
        Status     string         `json:"status"`
3431
        StatusCode int            `json:"status_code,omitempty"`
3432
        Error      string         `json:"error,omitempty"`
3433
        Errors     []HealthzError `json:"errors,omitempty"`
3434
}
3435

3436
type HealthzError struct {
3437
        Type     HealthZErrorType `json:"type"`
3438
        Account  string           `json:"account,omitempty"`
3439
        Stream   string           `json:"stream,omitempty"`
3440
        Consumer string           `json:"consumer,omitempty"`
3441
        Error    string           `json:"error,omitempty"`
3442
}
3443

3444
// HealthZErrorType classifies a health check failure. It is encoded in JSON
// as an upper-case string rather than as its numeric value.
type HealthZErrorType int

// Known health check failure categories.
const (
	HealthzErrorConn HealthZErrorType = iota
	HealthzErrorBadRequest
	HealthzErrorJetStream
	HealthzErrorAccount
	HealthzErrorStream
	HealthzErrorConsumer
)

// String returns the upper-case name of the error type, or "unknown" for a
// value that is not one of the declared constants.
func (t HealthZErrorType) String() string {
	names := [...]string{
		HealthzErrorConn:       "CONNECTION",
		HealthzErrorBadRequest: "BAD_REQUEST",
		HealthzErrorJetStream:  "JETSTREAM",
		HealthzErrorAccount:    "ACCOUNT",
		HealthzErrorStream:     "STREAM",
		HealthzErrorConsumer:   "CONSUMER",
	}
	if t < 0 || int(t) >= len(names) {
		return "unknown"
	}
	return names[t]
}

// MarshalJSON encodes the error type as the JSON string given by String.
func (t HealthZErrorType) MarshalJSON() ([]byte, error) {
	name := t.String()
	return json.Marshal(name)
}

// UnmarshalJSON decodes an error type from the exact quoted name produced by
// MarshalJSON for a declared constant. Any other input is rejected with an
// error and leaves the receiver untouched.
func (t *HealthZErrorType) UnmarshalJSON(data []byte) error {
	raw := string(data)
	for candidate := HealthzErrorConn; candidate <= HealthzErrorConsumer; candidate++ {
		if raw == `"`+candidate.String()+`"` {
			*t = candidate
			return nil
		}
	}
	return fmt.Errorf("unknown healthz error type %q", data)
}
3497

3498
// https://datatracker.ietf.org/doc/html/draft-inadarei-api-health-check
3499
func (s *Server) HandleHealthz(w http.ResponseWriter, r *http.Request) {
24✔
3500
        s.mu.Lock()
24✔
3501
        s.httpReqStats[HealthzPath]++
24✔
3502
        s.mu.Unlock()
24✔
3503

24✔
3504
        jsEnabled, err := decodeBool(w, r, "js-enabled")
24✔
3505
        if err != nil {
24✔
3506
                return
×
3507
        }
×
3508
        if jsEnabled {
24✔
3509
                s.Warnf("Healthcheck: js-enabled deprecated, use js-enabled-only instead")
×
3510
        }
×
3511
        jsEnabledOnly, err := decodeBool(w, r, "js-enabled-only")
24✔
3512
        if err != nil {
24✔
3513
                return
×
3514
        }
×
3515
        jsServerOnly, err := decodeBool(w, r, "js-server-only")
24✔
3516
        if err != nil {
24✔
3517
                return
×
3518
        }
×
3519
        jsMetaOnly, err := decodeBool(w, r, "js-meta-only")
24✔
3520
        if err != nil {
24✔
3521
                return
×
3522
        }
×
3523

3524
        includeDetails, err := decodeBool(w, r, "details")
24✔
3525
        if err != nil {
24✔
3526
                return
×
3527
        }
×
3528

3529
        hs := s.healthz(&HealthzOptions{
24✔
3530
                JSEnabled:     jsEnabled,
24✔
3531
                JSEnabledOnly: jsEnabledOnly,
24✔
3532
                JSServerOnly:  jsServerOnly,
24✔
3533
                JSMetaOnly:    jsMetaOnly,
24✔
3534
                Account:       r.URL.Query().Get("account"),
24✔
3535
                Stream:        r.URL.Query().Get("stream"),
24✔
3536
                Consumer:      r.URL.Query().Get("consumer"),
24✔
3537
                Details:       includeDetails,
24✔
3538
        })
24✔
3539

24✔
3540
        code := hs.StatusCode
24✔
3541
        if hs.Error != _EMPTY_ {
34✔
3542
                s.Warnf("Healthcheck failed: %q", hs.Error)
10✔
3543
        } else if len(hs.Errors) != 0 {
28✔
3544
                s.Warnf("Healthcheck failed: %d errors", len(hs.Errors))
4✔
3545
        }
4✔
3546
        // Remove StatusCode from JSON representation when responding via HTTP
3547
        // since this is already in the response.
3548
        hs.StatusCode = 0
24✔
3549
        b, err := json.Marshal(hs)
24✔
3550
        if err != nil {
24✔
3551
                s.Errorf("Error marshaling response to /healthz request: %v", err)
×
3552
        }
×
3553

3554
        handleResponse(code, w, r, b)
24✔
3555
}
3556

3557
// Generate health status.
3558
func (s *Server) healthz(opts *HealthzOptions) *HealthStatus {
3,908✔
3559
        var health = &HealthStatus{Status: "ok"}
3,908✔
3560

3,908✔
3561
        // set option defaults
3,908✔
3562
        if opts == nil {
7,673✔
3563
                opts = &HealthzOptions{}
3,765✔
3564
        }
3,765✔
3565
        details := opts.Details
3,908✔
3566
        defer func() {
7,816✔
3567
                // for response with details enabled, set status to either "error" or "ok"
3,908✔
3568
                if details {
3,937✔
3569
                        if len(health.Errors) != 0 {
55✔
3570
                                health.Status = "error"
26✔
3571
                        } else {
29✔
3572
                                health.Status = "ok"
3✔
3573
                        }
3✔
3574
                }
3575
                // if no specific status code was set, set it based on the presence of errors
3576
                if health.StatusCode == 0 {
7,750✔
3577
                        if health.Error != _EMPTY_ || len(health.Errors) != 0 {
7,499✔
3578
                                health.StatusCode = http.StatusServiceUnavailable
3,657✔
3579
                        } else {
3,842✔
3580
                                health.StatusCode = http.StatusOK
185✔
3581
                        }
185✔
3582
                }
3583
        }()
3584

3585
        if opts.Account == _EMPTY_ && opts.Stream != _EMPTY_ {
3,920✔
3586
                health.StatusCode = http.StatusBadRequest
12✔
3587
                if !details {
18✔
3588
                        health.Status = "error"
6✔
3589
                        health.Error = fmt.Sprintf("%q must not be empty when checking stream health", "account")
6✔
3590
                } else {
12✔
3591
                        health.Errors = append(health.Errors, HealthzError{
6✔
3592
                                Type:  HealthzErrorBadRequest,
6✔
3593
                                Error: fmt.Sprintf("%q must not be empty when checking stream health", "account"),
6✔
3594
                        })
6✔
3595
                }
6✔
3596
                return health
12✔
3597
        }
3598

3599
        if opts.Stream == _EMPTY_ && opts.Consumer != _EMPTY_ {
3,899✔
3600
                health.StatusCode = http.StatusBadRequest
3✔
3601
                if !details {
5✔
3602
                        health.Status = "error"
2✔
3603
                        health.Error = fmt.Sprintf("%q must not be empty when checking consumer health", "stream")
2✔
3604
                } else {
3✔
3605
                        health.Errors = append(health.Errors, HealthzError{
1✔
3606
                                Type:  HealthzErrorBadRequest,
1✔
3607
                                Error: fmt.Sprintf("%q must not be empty when checking consumer health", "stream"),
1✔
3608
                        })
1✔
3609
                }
1✔
3610
                return health
3✔
3611
        }
3612

3613
        if err := s.readyForConnections(time.Millisecond); err != nil {
3,900✔
3614
                health.StatusCode = http.StatusInternalServerError
7✔
3615
                health.Status = "error"
7✔
3616
                if !details {
14✔
3617
                        health.Error = err.Error()
7✔
3618
                } else {
7✔
3619
                        health.Errors = append(health.Errors, HealthzError{
×
3620
                                Type:  HealthzErrorConn,
×
3621
                                Error: err.Error(),
×
3622
                        })
×
3623
                }
×
3624
                return health
7✔
3625
        }
3626

3627
        // If JSServerOnly is true, then do not check further accounts, streams and consumers.
3628
        if opts.JSServerOnly {
3,901✔
3629
                return health
15✔
3630
        }
15✔
3631

3632
        sopts := s.getOpts()
3,871✔
3633

3,871✔
3634
        // If JS is not enabled in the config, we stop.
3,871✔
3635
        if !sopts.JetStream {
3,890✔
3636
                return health
19✔
3637
        }
19✔
3638

3639
        // Access the Jetstream state to perform additional checks.
3640
        js := s.getJetStream()
3,852✔
3641
        const na = "unavailable"
3,852✔
3642
        if !js.isEnabled() {
3,862✔
3643
                health.StatusCode = http.StatusServiceUnavailable
10✔
3644
                health.Status = na
10✔
3645
                if !details {
16✔
3646
                        health.Error = NewJSNotEnabledError().Error()
6✔
3647
                } else {
10✔
3648
                        health.Errors = append(health.Errors, HealthzError{
4✔
3649
                                Type:  HealthzErrorJetStream,
4✔
3650
                                Error: NewJSNotEnabledError().Error(),
4✔
3651
                        })
4✔
3652
                }
4✔
3653
                return health
10✔
3654
        }
3655
        // Only check if JS is enabled, skip meta and asset check.
3656
        if opts.JSEnabledOnly || opts.JSEnabled {
3,847✔
3657
                return health
5✔
3658
        }
5✔
3659

3660
        // Clustered JetStream
3661
        js.mu.RLock()
3,837✔
3662
        cc := js.cluster
3,837✔
3663
        js.mu.RUnlock()
3,837✔
3664

3,837✔
3665
        // Currently single server we make sure the streams were recovered.
3,837✔
3666
        if cc == nil {
3,854✔
3667
                sdir := js.config.StoreDir
17✔
3668
                // Whip through account folders and pull each stream name.
17✔
3669
                fis, _ := os.ReadDir(sdir)
17✔
3670
                var accFound, streamFound, consumerFound bool
17✔
3671
                for _, fi := range fis {
35✔
3672
                        if fi.Name() == snapStagingDir {
19✔
3673
                                continue
1✔
3674
                        }
3675
                        if opts.Account != _EMPTY_ {
26✔
3676
                                if fi.Name() != opts.Account {
11✔
3677
                                        continue
2✔
3678
                                }
3679
                                accFound = true
7✔
3680
                        }
3681
                        acc, err := s.LookupAccount(fi.Name())
15✔
3682
                        if err != nil {
15✔
3683
                                if !details {
×
3684
                                        health.Status = na
×
3685
                                        health.Error = fmt.Sprintf("JetStream account '%s' could not be resolved", fi.Name())
×
3686
                                        return health
×
3687
                                }
×
3688
                                health.Errors = append(health.Errors, HealthzError{
×
3689
                                        Type:    HealthzErrorAccount,
×
3690
                                        Account: fi.Name(),
×
3691
                                        Error:   fmt.Sprintf("JetStream account '%s' could not be resolved", fi.Name()),
×
3692
                                })
×
3693
                                continue
×
3694
                        }
3695
                        sfis, _ := os.ReadDir(filepath.Join(sdir, fi.Name(), "streams"))
15✔
3696
                        for _, sfi := range sfis {
32✔
3697
                                if opts.Stream != _EMPTY_ {
23✔
3698
                                        if sfi.Name() != opts.Stream {
8✔
3699
                                                continue
2✔
3700
                                        }
3701
                                        streamFound = true
4✔
3702
                                }
3703
                                stream := sfi.Name()
15✔
3704
                                s, err := acc.lookupStream(stream)
15✔
3705
                                if err != nil {
16✔
3706
                                        if !details {
2✔
3707
                                                health.Status = na
1✔
3708
                                                health.Error = fmt.Sprintf("JetStream stream '%s > %s' could not be recovered", acc, stream)
1✔
3709
                                                return health
1✔
3710
                                        }
1✔
3711
                                        health.Errors = append(health.Errors, HealthzError{
×
3712
                                                Type:    HealthzErrorStream,
×
3713
                                                Account: acc.Name,
×
3714
                                                Stream:  stream,
×
3715
                                                Error:   fmt.Sprintf("JetStream stream '%s > %s' could not be recovered", acc, stream),
×
3716
                                        })
×
3717
                                        continue
×
3718
                                }
3719
                                if streamWerr := s.getWriteErr(); streamWerr != nil {
14✔
3720
                                        if !details {
×
3721
                                                health.Status = na
×
3722
                                                health.Error = fmt.Sprintf("JetStream stream '%s > %s' write error: %v", acc, stream, streamWerr)
×
3723
                                                return health
×
3724
                                        }
×
3725
                                        health.Errors = append(health.Errors, HealthzError{
×
3726
                                                Type:    HealthzErrorStream,
×
3727
                                                Account: acc.Name,
×
3728
                                                Stream:  stream,
×
3729
                                                Error:   fmt.Sprintf("JetStream stream '%s > %s' write error: %v", acc, stream, streamWerr),
×
3730
                                        })
×
3731
                                        continue
×
3732
                                }
3733
                                if streamFound {
18✔
3734
                                        // if consumer option is passed, verify that the consumer exists on stream
4✔
3735
                                        if opts.Consumer != _EMPTY_ {
7✔
3736
                                                for _, cons := range s.consumers {
6✔
3737
                                                        if cons.name == opts.Consumer {
4✔
3738
                                                                consumerFound = true
1✔
3739
                                                                break
1✔
3740
                                                        }
3741
                                                }
3742
                                        }
3743
                                        break
4✔
3744
                                }
3745
                        }
3746
                        if accFound {
21✔
3747
                                break
7✔
3748
                        }
3749
                }
3750
                if opts.Account != _EMPTY_ && !accFound {
18✔
3751
                        health.StatusCode = http.StatusNotFound
2✔
3752
                        if !details {
3✔
3753
                                health.Status = na
1✔
3754
                                health.Error = fmt.Sprintf("JetStream account %q not found", opts.Account)
1✔
3755
                        } else {
2✔
3756
                                health.Errors = []HealthzError{
1✔
3757
                                        {
1✔
3758
                                                Type:    HealthzErrorAccount,
1✔
3759
                                                Account: opts.Account,
1✔
3760
                                                Error:   fmt.Sprintf("JetStream account %q not found", opts.Account),
1✔
3761
                                        },
1✔
3762
                                }
1✔
3763
                        }
1✔
3764
                        return health
2✔
3765
                }
3766
                if opts.Stream != _EMPTY_ && !streamFound {
16✔
3767
                        health.StatusCode = http.StatusNotFound
2✔
3768
                        if !details {
3✔
3769
                                health.Status = na
1✔
3770
                                health.Error = fmt.Sprintf("JetStream stream %q not found on account %q", opts.Stream, opts.Account)
1✔
3771
                        } else {
2✔
3772
                                health.Errors = []HealthzError{
1✔
3773
                                        {
1✔
3774
                                                Type:    HealthzErrorStream,
1✔
3775
                                                Account: opts.Account,
1✔
3776
                                                Stream:  opts.Stream,
1✔
3777
                                                Error:   fmt.Sprintf("JetStream stream %q not found on account %q", opts.Stream, opts.Account),
1✔
3778
                                        },
1✔
3779
                                }
1✔
3780
                        }
1✔
3781
                        return health
2✔
3782
                }
3783
                if opts.Consumer != _EMPTY_ && !consumerFound {
14✔
3784
                        health.StatusCode = http.StatusNotFound
2✔
3785
                        if !details {
3✔
3786
                                health.Status = na
1✔
3787
                                health.Error = fmt.Sprintf("JetStream consumer %q not found for stream %q on account %q", opts.Consumer, opts.Stream, opts.Account)
1✔
3788
                        } else {
2✔
3789
                                health.Errors = []HealthzError{
1✔
3790
                                        {
1✔
3791
                                                Type:     HealthzErrorConsumer,
1✔
3792
                                                Account:  opts.Account,
1✔
3793
                                                Stream:   opts.Stream,
1✔
3794
                                                Consumer: opts.Consumer,
1✔
3795
                                                Error:    fmt.Sprintf("JetStream consumer %q not found for stream %q on account %q", opts.Consumer, opts.Stream, opts.Account),
1✔
3796
                                        },
1✔
3797
                                }
1✔
3798
                        }
1✔
3799
                }
3800
                return health
12✔
3801
        }
3802

3803
        // If we are here we want to check for any assets assigned to us.
3804
        var meta RaftNode
3,820✔
3805
        js.mu.RLock()
3,820✔
3806
        meta = cc.meta
3,820✔
3807
        js.mu.RUnlock()
3,820✔
3808

3,820✔
3809
        // Check meta layer health.
3,820✔
3810
        var metaNoLeader, metaClosed, metaUnhealthy bool
3,820✔
3811
        var metaWerr error
3,820✔
3812
        if meta != nil {
7,640✔
3813
                metaNoLeader = meta.GroupLeader() == _EMPTY_
3,820✔
3814
                metaClosed = meta.State() == Closed
3,820✔
3815
                metaUnhealthy = !meta.Healthy()
3,820✔
3816
                metaWerr = meta.GetWriteErr()
3,820✔
3817
        }
3,820✔
3818
        metaRecovering := js.isMetaRecovering()
3,820✔
3819
        if meta == nil || metaNoLeader || metaClosed || metaUnhealthy || metaWerr != nil || metaRecovering {
7,265✔
3820
                var desc string
3,445✔
3821
                if metaWerr != nil {
3,445✔
3822
                        desc = fmt.Sprintf("JetStream meta layer write error: %v", metaWerr)
×
3823
                } else if metaClosed {
3,445✔
3824
                        desc = "JetStream meta layer is not running"
×
3825
                } else if meta != nil && metaRecovering {
3,477✔
3826
                        desc = "JetStream is still recovering meta layer"
32✔
3827
                } else if meta == nil || metaNoLeader {
6,647✔
3828
                        desc = "JetStream has not established contact with a meta leader"
3,202✔
3829
                } else {
3,413✔
3830
                        desc = "JetStream is not current with the meta leader"
211✔
3831
                }
211✔
3832
                if !details {
6,890✔
3833
                        health.Status = na
3,445✔
3834
                        health.Error = desc
3,445✔
3835
                } else {
3,445✔
3836
                        health.Errors = []HealthzError{
×
3837
                                {
×
3838
                                        Type:  HealthzErrorJetStream,
×
3839
                                        Error: desc,
×
3840
                                },
×
3841
                        }
×
3842
                }
×
3843
                return health
3,445✔
3844
        }
3845

3846
        // Skips doing full healthz and only checks the meta leader.
3847
        if opts.JSMetaOnly {
377✔
3848
                return health
2✔
3849
        }
2✔
3850

3851
        // Range across all accounts, the streams assigned to them, and the consumers.
3852
        // If they are assigned to this server check their status.
3853
        ourID := meta.ID()
373✔
3854

373✔
3855
        // Copy the meta layer so we do not need to hold the js read lock for an extended period of time.
373✔
3856
        var streams map[string]map[string]*streamAssignment
373✔
3857
        js.mu.RLock()
373✔
3858
        if opts.Account == _EMPTY_ {
704✔
3859
                // Collect all relevant streams and consumers.
331✔
3860
                streams = make(map[string]map[string]*streamAssignment, len(cc.streams))
331✔
3861
                for acc, asa := range cc.streams {
655✔
3862
                        nasa := make(map[string]*streamAssignment)
324✔
3863
                        for stream, sa := range asa {
2,989✔
3864
                                // If we are a member and we are not being restored, select for check.
2,665✔
3865
                                if sa.Group.isMember(ourID) && sa.Restore == nil {
5,301✔
3866
                                        csa := sa.copyGroup()
2,636✔
3867
                                        csa.consumers = make(map[string]*consumerAssignment)
2,636✔
3868
                                        for consumer, ca := range sa.consumers {
2,823✔
3869
                                                if ca.Group.isMember(ourID) {
367✔
3870
                                                        // Use original here. Not a copy.
180✔
3871
                                                        csa.consumers[consumer] = ca
180✔
3872
                                                }
180✔
3873
                                        }
3874
                                        nasa[stream] = csa
2,636✔
3875
                                }
3876
                        }
3877
                        streams[acc] = nasa
324✔
3878
                }
3879
        } else {
42✔
3880
                streams = make(map[string]map[string]*streamAssignment, 1)
42✔
3881
                asa, ok := cc.streams[opts.Account]
42✔
3882
                if !ok {
50✔
3883
                        health.StatusCode = http.StatusNotFound
8✔
3884
                        if !details {
12✔
3885
                                health.Status = na
4✔
3886
                                health.Error = fmt.Sprintf("JetStream account %q not found", opts.Account)
4✔
3887
                        } else {
8✔
3888
                                health.Errors = []HealthzError{
4✔
3889
                                        {
4✔
3890
                                                Type:    HealthzErrorAccount,
4✔
3891
                                                Account: opts.Account,
4✔
3892
                                                Error:   fmt.Sprintf("JetStream account %q not found", opts.Account),
4✔
3893
                                        },
4✔
3894
                                }
4✔
3895
                        }
4✔
3896
                        js.mu.RUnlock()
8✔
3897
                        return health
8✔
3898
                }
3899
                nasa := make(map[string]*streamAssignment)
34✔
3900
                if opts.Stream != _EMPTY_ {
64✔
3901
                        sa, ok := asa[opts.Stream]
30✔
3902
                        if !ok || !sa.Group.isMember(ourID) {
40✔
3903
                                health.StatusCode = http.StatusNotFound
10✔
3904
                                if !details {
16✔
3905
                                        health.Status = na
6✔
3906
                                        health.Error = fmt.Sprintf("JetStream stream %q not found on account %q", opts.Stream, opts.Account)
6✔
3907
                                } else {
10✔
3908
                                        health.Errors = []HealthzError{
4✔
3909
                                                {
4✔
3910
                                                        Type:    HealthzErrorStream,
4✔
3911
                                                        Account: opts.Account,
4✔
3912
                                                        Stream:  opts.Stream,
4✔
3913
                                                        Error:   fmt.Sprintf("JetStream stream %q not found on account %q", opts.Stream, opts.Account),
4✔
3914
                                                },
4✔
3915
                                        }
4✔
3916
                                }
4✔
3917
                                js.mu.RUnlock()
10✔
3918
                                return health
10✔
3919
                        }
3920
                        csa := sa.copyGroup()
20✔
3921
                        csa.consumers = make(map[string]*consumerAssignment)
20✔
3922
                        var consumerFound bool
20✔
3923
                        for consumer, ca := range sa.consumers {
40✔
3924
                                if opts.Consumer != _EMPTY_ {
35✔
3925
                                        if consumer != opts.Consumer || !ca.Group.isMember(ourID) {
25✔
3926
                                                continue
10✔
3927
                                        }
3928
                                        consumerFound = true
5✔
3929
                                }
3930
                                // If we are a member and we are not being restored, select for check.
3931
                                if sa.Group.isMember(ourID) && sa.Restore == nil {
20✔
3932
                                        csa.consumers[consumer] = ca
10✔
3933
                                }
10✔
3934
                                if consumerFound {
15✔
3935
                                        break
5✔
3936
                                }
3937
                        }
3938
                        if opts.Consumer != _EMPTY_ && !consumerFound {
30✔
3939
                                health.StatusCode = http.StatusNotFound
10✔
3940
                                if !details {
16✔
3941
                                        health.Status = na
6✔
3942
                                        health.Error = fmt.Sprintf("JetStream consumer %q not found for stream %q on account %q", opts.Consumer, opts.Stream, opts.Account)
6✔
3943
                                } else {
10✔
3944
                                        health.Errors = []HealthzError{
4✔
3945
                                                {
4✔
3946
                                                        Type:     HealthzErrorConsumer,
4✔
3947
                                                        Account:  opts.Account,
4✔
3948
                                                        Stream:   opts.Stream,
4✔
3949
                                                        Consumer: opts.Consumer,
4✔
3950
                                                        Error:    fmt.Sprintf("JetStream consumer %q not found for stream %q on account %q", opts.Consumer, opts.Stream, opts.Account),
4✔
3951
                                                },
4✔
3952
                                        }
4✔
3953
                                }
4✔
3954
                                js.mu.RUnlock()
10✔
3955
                                return health
10✔
3956
                        }
3957
                        nasa[opts.Stream] = csa
10✔
3958
                } else {
4✔
3959
                        for stream, sa := range asa {
8✔
3960
                                // If we are a member and we are not being restored, select for check.
4✔
3961
                                if sa.Group.isMember(ourID) && sa.Restore == nil {
8✔
3962
                                        csa := sa.copyGroup()
4✔
3963
                                        csa.consumers = make(map[string]*consumerAssignment)
4✔
3964
                                        for consumer, ca := range sa.consumers {
8✔
3965
                                                if ca.Group.isMember(ourID) {
8✔
3966
                                                        csa.consumers[consumer] = ca
4✔
3967
                                                }
4✔
3968
                                        }
3969
                                        nasa[stream] = csa
4✔
3970
                                }
3971
                        }
3972
                }
3973
                streams[opts.Account] = nasa
14✔
3974
        }
3975
        js.mu.RUnlock()
345✔
3976

345✔
3977
        // Use our copy to traverse so we do not need to hold the js lock.
345✔
3978
        for accName, asa := range streams {
683✔
3979
                acc, err := s.LookupAccount(accName)
338✔
3980
                if err != nil && len(asa) > 0 {
338✔
3981
                        if !details {
×
3982
                                health.Status = na
×
3983
                                health.Error = fmt.Sprintf("JetStream can not lookup account %q: %v", accName, err)
×
3984
                                return health
×
3985
                        }
×
3986
                        health.Errors = append(health.Errors, HealthzError{
×
3987
                                Type:    HealthzErrorAccount,
×
3988
                                Account: accName,
×
3989
                                Error:   fmt.Sprintf("JetStream can not lookup account %q: %v", accName, err),
×
3990
                        })
×
3991
                        continue
×
3992
                }
3993

3994
                for stream, sa := range asa {
1,614✔
3995
                        if sa != nil && sa.unsupported != nil {
1,288✔
3996
                                continue
12✔
3997
                        }
3998
                        // Make sure we can look up
3999
                        if err := js.isStreamHealthy(acc, sa); err != nil {
1,472✔
4000
                                if !details {
416✔
4001
                                        health.Status = na
208✔
4002
                                        health.Error = fmt.Sprintf("JetStream stream '%s > %s' is not current: %s", accName, stream, err)
208✔
4003
                                        return health
208✔
4004
                                }
208✔
4005
                                health.Errors = append(health.Errors, HealthzError{
×
4006
                                        Type:    HealthzErrorStream,
×
4007
                                        Account: accName,
×
4008
                                        Stream:  stream,
×
4009
                                        Error:   fmt.Sprintf("JetStream stream '%s > %s' is not current: %s", accName, stream, err),
×
4010
                                })
×
4011
                                continue
×
4012
                        }
4013
                        mset, _ := acc.lookupStream(stream)
1,056✔
4014
                        // Now check consumers.
1,056✔
4015
                        for consumer, ca := range sa.consumers {
1,125✔
4016
                                if err := js.isConsumerHealthy(mset, consumer, ca); err != nil {
72✔
4017
                                        if !details {
6✔
4018
                                                health.Status = na
3✔
4019
                                                health.Error = fmt.Sprintf("JetStream consumer '%s > %s > %s' is not current: %s", acc, stream, consumer, err)
3✔
4020
                                                return health
3✔
4021
                                        }
3✔
4022
                                        health.Errors = append(health.Errors, HealthzError{
×
4023
                                                Type:     HealthzErrorConsumer,
×
4024
                                                Account:  accName,
×
4025
                                                Stream:   stream,
×
4026
                                                Consumer: consumer,
×
4027
                                                Error:    fmt.Sprintf("JetStream consumer '%s > %s > %s' is not current: %s", acc, stream, consumer, err),
×
4028
                                        })
×
4029
                                }
4030
                        }
4031
                }
4032
        }
4033
        // Success.
4034
        return health
134✔
4035
}
4036

4037
// Healthz returns the health status of the server.
4038
func (s *Server) Healthz(opts *HealthzOptions) *HealthStatus {
10✔
4039
        return s.healthz(opts)
10✔
4040
}
10✔
4041

4042
// ExpvarzStatus carries the expvar entries reported by expvarz. Both fields
// are raw JSON, so the already-encoded expvar values are embedded verbatim
// when the status itself is marshaled.
type ExpvarzStatus struct {
	// Memstats holds the value published under the "memstats" expvar key.
	Memstats json.RawMessage `json:"memstats"`
	// Cmdline holds the value published under the "cmdline" expvar key.
	Cmdline json.RawMessage `json:"cmdline"`
}
4046

4047
func (s *Server) expvarz(_ *ExpvarzEventOptions) *ExpvarzStatus {
2✔
4048
        var stat ExpvarzStatus
2✔
4049

2✔
4050
        const memStatsKey = "memstats"
2✔
4051
        const cmdLineKey = "cmdline"
2✔
4052

2✔
4053
        expvar.Do(func(v expvar.KeyValue) {
6✔
4054
                switch v.Key {
4✔
4055
                case memStatsKey:
2✔
4056
                        stat.Memstats = json.RawMessage(v.Value.String())
2✔
4057

4058
                case cmdLineKey:
2✔
4059
                        stat.Cmdline = json.RawMessage(v.Value.String())
2✔
4060
                }
4061
        })
4062

4063
        return &stat
2✔
4064
}
4065

4066
// ProfilezStatus is the result of a profilez request. profilez sets exactly
// one of the two fields: Profile on success, Error on failure.
type ProfilezStatus struct {
	// Profile is the raw profile output. As a byte slice it is base64-encoded
	// by encoding/json.
	Profile []byte `json:"profile"`
	// Error describes why the profile could not be produced.
	Error string `json:"error"`
}
4070

4071
func (s *Server) profilez(opts *ProfilezOptions) *ProfilezStatus {
12✔
4072
        var buffer bytes.Buffer
12✔
4073
        switch opts.Name {
12✔
4074
        case _EMPTY_:
×
4075
                return &ProfilezStatus{
×
4076
                        Error: "Profile name not specified",
×
4077
                }
×
4078
        case "cpu":
2✔
4079
                if opts.Duration <= 0 || opts.Duration > 15*time.Second {
2✔
4080
                        return &ProfilezStatus{
×
4081
                                Error: fmt.Sprintf("Duration %s should be between 0s and 15s", opts.Duration),
×
4082
                        }
×
4083
                }
×
4084
                if err := pprof.StartCPUProfile(&buffer); err != nil {
2✔
4085
                        return &ProfilezStatus{
×
4086
                                Error: fmt.Sprintf("Failed to start CPU profile: %s", err),
×
4087
                        }
×
4088
                }
×
4089
                time.Sleep(opts.Duration)
2✔
4090
                pprof.StopCPUProfile()
2✔
4091
        default:
10✔
4092
                profile := pprof.Lookup(opts.Name)
10✔
4093
                if profile == nil {
10✔
4094
                        return &ProfilezStatus{
×
4095
                                Error: fmt.Sprintf("Profile %q not found", opts.Name),
×
4096
                        }
×
4097
                }
×
4098
                if err := profile.WriteTo(&buffer, opts.Debug); err != nil {
10✔
4099
                        return &ProfilezStatus{
×
4100
                                Error: fmt.Sprintf("Profile %q error: %s", opts.Name, err),
×
4101
                        }
×
4102
                }
×
4103
        }
4104
        return &ProfilezStatus{
12✔
4105
                Profile: buffer.Bytes(),
12✔
4106
        }
12✔
4107
}
4108

4109
type RaftzGroup struct {
4110
        ID            string                    `json:"id"`
4111
        State         string                    `json:"state"`
4112
        Size          int                       `json:"size"`
4113
        QuorumNeeded  int                       `json:"quorum_needed"`
4114
        Observer      bool                      `json:"observer,omitempty"`
4115
        Paused        bool                      `json:"paused,omitempty"`
4116
        Overrun       bool                      `json:"overrun,omitempty"`
4117
        OverrunCount  uint64                    `json:"overrun_count,omitempty"`
4118
        Committed     uint64                    `json:"committed"`
4119
        Applied       uint64                    `json:"applied"`
4120
        CatchingUp    bool                      `json:"catching_up,omitempty"`
4121
        Leader        string                    `json:"leader,omitempty"`
4122
        LeaderSince   *time.Time                `json:"leader_since,omitempty"`
4123
        EverHadLeader bool                      `json:"ever_had_leader"`
4124
        Term          uint64                    `json:"term"`
4125
        Vote          string                    `json:"voted_for,omitempty"`
4126
        PTerm         uint64                    `json:"pterm"`
4127
        PIndex        uint64                    `json:"pindex"`
4128
        SystemAcc     bool                      `json:"system_account"`
4129
        TrafficAcc    string                    `json:"traffic_account"`
4130
        IPQPropLen    int                       `json:"ipq_proposal_len"`
4131
        IPQEntryLen   int                       `json:"ipq_entry_len"`
4132
        IPQRespLen    int                       `json:"ipq_resp_len"`
4133
        IPQApplyLen   int                       `json:"ipq_apply_len"`
4134
        WAL           StreamState               `json:"wal"`
4135
        WALError      error                     `json:"wal_error,omitempty"`
4136
        Peers         map[string]RaftzGroupPeer `json:"peers"`
4137
}
4138

4139
// RaftzGroupPeer describes one remote member of a Raft group as seen by the
// node being reported on.
type RaftzGroupPeer struct {
	// Name is the server name resolved for the peer's node ID.
	Name string `json:"name"`
	// Known mirrors the peer's "known" flag on the reporting node.
	Known bool `json:"known"`
	// LastReplicatedIndex is the last index recorded for this peer.
	LastReplicatedIndex uint64 `json:"last_replicated_index,omitempty"`
	// LastSeen is the time elapsed since the peer's last timestamp, formatted
	// as a duration string. It is left empty when no timestamp is recorded.
	LastSeen string `json:"last_seen,omitempty"`
}
4145

4146
// RaftzStatus is the result type of Raftz: Raft group details keyed first by
// account name and then by Raft group name.
type RaftzStatus map[string]map[string]RaftzGroup
4147

4148
func (s *Server) HandleRaftz(w http.ResponseWriter, r *http.Request) {
×
4149
        if s.raftNodes == nil {
×
4150
                w.WriteHeader(404)
×
4151
                w.Write([]byte("No Raft nodes registered"))
×
4152
                return
×
4153
        }
×
4154

4155
        groups := s.Raftz(&RaftzOptions{
×
4156
                AccountFilter: r.URL.Query().Get("acc"),
×
4157
                GroupFilter:   r.URL.Query().Get("group"),
×
4158
        })
×
4159

×
4160
        if groups == nil {
×
4161
                w.WriteHeader(404)
×
4162
                w.Write([]byte("No Raft nodes returned, check supplied filters"))
×
4163
                return
×
4164
        }
×
4165

4166
        b, _ := json.MarshalIndent(groups, "", "   ")
×
4167
        ResponseHandler(w, r, b)
×
4168
}
4169

4170
func (s *Server) Raftz(opts *RaftzOptions) *RaftzStatus {
9✔
4171
        afilter, gfilter := opts.AccountFilter, opts.GroupFilter
9✔
4172

9✔
4173
        if afilter == _EMPTY_ {
9✔
4174
                if sys := s.SystemAccount(); sys != nil {
×
4175
                        afilter = sys.Name
×
4176
                } else {
×
4177
                        return nil
×
4178
                }
×
4179
        }
4180

4181
        groups := map[string]RaftNode{}
9✔
4182
        infos := RaftzStatus{} // account -> group ID
9✔
4183

9✔
4184
        s.rnMu.RLock()
9✔
4185
        if gfilter != _EMPTY_ {
9✔
4186
                if rg, ok := s.raftNodes[gfilter]; ok && rg != nil {
×
4187
                        if n, ok := rg.(*raft); ok {
×
4188
                                if n.accName == afilter {
×
4189
                                        groups[gfilter] = rg
×
4190
                                }
×
4191
                        }
4192
                }
4193
        } else {
9✔
4194
                for name, rg := range s.raftNodes {
27✔
4195
                        if rg == nil {
18✔
4196
                                continue
×
4197
                        }
4198
                        if n, ok := rg.(*raft); ok {
36✔
4199
                                if n.accName != afilter {
27✔
4200
                                        continue
9✔
4201
                                }
4202
                                groups[name] = rg
9✔
4203
                        }
4204
                }
4205
        }
4206
        s.rnMu.RUnlock()
9✔
4207

9✔
4208
        for name, rg := range groups {
18✔
4209
                n, ok := rg.(*raft)
9✔
4210
                if n == nil || !ok {
9✔
4211
                        continue
×
4212
                }
4213
                if _, ok := infos[n.accName]; !ok {
18✔
4214
                        infos[n.accName] = map[string]RaftzGroup{}
9✔
4215
                }
9✔
4216
                // Only take the lock once, using the public RaftNode functions would
4217
                // cause us to take and release the locks over and over again.
4218
                n.RLock()
9✔
4219
                info := RaftzGroup{
9✔
4220
                        ID:            n.id,
9✔
4221
                        State:         RaftState(n.state.Load()).String(),
9✔
4222
                        Size:          n.csz,
9✔
4223
                        QuorumNeeded:  n.qn,
9✔
4224
                        Observer:      n.observer,
9✔
4225
                        Paused:        n.paused,
9✔
4226
                        Overrun:       n.quorumPaused || n.isLeaderOverrun(),
9✔
4227
                        OverrunCount:  n.overrunCount,
9✔
4228
                        Committed:     n.commit,
9✔
4229
                        Applied:       n.applied,
9✔
4230
                        CatchingUp:    n.catchup != nil,
9✔
4231
                        Leader:        n.leader,
9✔
4232
                        LeaderSince:   n.leaderSince.Load(),
9✔
4233
                        EverHadLeader: n.pleader.Load(),
9✔
4234
                        Term:          n.term,
9✔
4235
                        Vote:          n.vote,
9✔
4236
                        PTerm:         n.pterm,
9✔
4237
                        PIndex:        n.pindex,
9✔
4238
                        SystemAcc:     n.IsSystemAccount(),
9✔
4239
                        TrafficAcc:    n.acc.GetName(),
9✔
4240
                        IPQPropLen:    n.prop.len(),
9✔
4241
                        IPQEntryLen:   n.entry.len(),
9✔
4242
                        IPQRespLen:    n.resp.len(),
9✔
4243
                        IPQApplyLen:   n.apply.len(),
9✔
4244
                        WALError:      n.werr,
9✔
4245
                        Peers:         map[string]RaftzGroupPeer{},
9✔
4246
                }
9✔
4247
                n.wal.FastState(&info.WAL)
9✔
4248
                for id, p := range n.peers {
36✔
4249
                        if id == n.id {
36✔
4250
                                continue
9✔
4251
                        }
4252
                        peer := RaftzGroupPeer{
18✔
4253
                                Name:                s.serverNameForNode(id),
18✔
4254
                                Known:               p.kp,
18✔
4255
                                LastReplicatedIndex: p.li,
18✔
4256
                        }
18✔
4257
                        if !p.ts.IsZero() {
30✔
4258
                                peer.LastSeen = time.Since(p.ts).String()
12✔
4259
                        }
12✔
4260
                        info.Peers[id] = peer
18✔
4261
                }
4262
                n.RUnlock()
9✔
4263
                infos[n.accName][name] = info
9✔
4264
        }
4265

4266
        return &infos
9✔
4267
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc