• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 20326380621

17 Dec 2025 03:32PM UTC coverage: 84.522% (-0.05%) from 84.574%
20326380621

push

github

web-flow
NRG: Fix single node election (#7642)

This commit fixes single node election: previously, a single node would
simply store its vote, and never check if it already reached a majority.
So it would never transition to the leader state.

Signed-off-by: Daniele Sciascia <daniele@nats.io>

73985 of 87533 relevant lines covered (84.52%)

339454.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.13
/server/monitor.go
1
// Copyright 2013-2025 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bytes"
18
        "cmp"
19
        "crypto/sha256"
20
        "crypto/tls"
21
        "crypto/x509"
22
        "encoding/hex"
23
        "encoding/json"
24
        "expvar"
25
        "fmt"
26
        "maps"
27
        "math"
28
        "net"
29
        "net/http"
30
        "net/url"
31
        "os"
32
        "path/filepath"
33
        "runtime"
34
        "runtime/debug"
35
        "runtime/pprof"
36
        "slices"
37
        "sort"
38
        "strconv"
39
        "strings"
40
        "sync/atomic"
41
        "time"
42

43
        "github.com/nats-io/jwt/v2"
44
        "github.com/nats-io/nats-server/v2/server/pse"
45
)
46

47
// Connz represents detailed information on current client connections.
48
type Connz struct {
49
        ID       string      `json:"server_id"`
50
        Now      time.Time   `json:"now"`
51
        NumConns int         `json:"num_connections"`
52
        Total    int         `json:"total"`
53
        Offset   int         `json:"offset"`
54
        Limit    int         `json:"limit"`
55
        Conns    []*ConnInfo `json:"connections"`
56
}
57

58
// ConnzOptions are the options passed to Connz()
59
type ConnzOptions struct {
60
        // Sort indicates how the results will be sorted. Check SortOpt for possible values.
61
        // Only the sort by connection ID (ByCid) is ascending, all others are descending.
62
        Sort SortOpt `json:"sort"`
63

64
        // Username indicates if user names should be included in the results.
65
        Username bool `json:"auth"`
66

67
        // Subscriptions indicates if subscriptions should be included in the results.
68
        Subscriptions bool `json:"subscriptions"`
69

70
        // SubscriptionsDetail indicates if subscription details should be included in the results
71
        SubscriptionsDetail bool `json:"subscriptions_detail"`
72

73
        // Offset is used for pagination. Connz() only returns connections starting at this
74
        // offset from the global results.
75
        Offset int `json:"offset"`
76

77
        // Limit is the maximum number of connections that should be returned by Connz().
78
        Limit int `json:"limit"`
79

80
        // Filter for this explicit client connection.
81
        CID uint64 `json:"cid"`
82

83
        // Filter for this explicit client connection based on the MQTT client ID
84
        MQTTClient string `json:"mqtt_client"`
85

86
        // Filter by connection state.
87
        State ConnState `json:"state"`
88

89
        // The below options only apply if auth is true.
90

91
        // Filter by username.
92
        User string `json:"user"`
93

94
        // Filter by account.
95
        Account string `json:"acc"`
96

97
        // Filter by subject interest
98
        FilterSubject string `json:"filter_subject"`
99
}
100

101
// ConnState selects which connections a Connz request looks at, based on
// whether they are still open or already closed.
type ConnState int

const (
	// ConnOpen selects open connections only. It is the zero value.
	ConnOpen ConnState = iota
	// ConnClosed selects closed connections only.
	ConnClosed
	// ConnAll selects both open and closed connections.
	ConnAll
)
112

113
// ConnInfo has detailed information on a per connection basis.
114
type ConnInfo struct {
115
        Cid            uint64         `json:"cid"`
116
        Kind           string         `json:"kind,omitempty"`
117
        Type           string         `json:"type,omitempty"`
118
        IP             string         `json:"ip"`
119
        Port           int            `json:"port"`
120
        Start          time.Time      `json:"start"`
121
        LastActivity   time.Time      `json:"last_activity"`
122
        Stop           *time.Time     `json:"stop,omitempty"`
123
        Reason         string         `json:"reason,omitempty"`
124
        RTT            string         `json:"rtt,omitempty"`
125
        Uptime         string         `json:"uptime"`
126
        Idle           string         `json:"idle"`
127
        Pending        int            `json:"pending_bytes"`
128
        InMsgs         int64          `json:"in_msgs"`
129
        OutMsgs        int64          `json:"out_msgs"`
130
        InBytes        int64          `json:"in_bytes"`
131
        OutBytes       int64          `json:"out_bytes"`
132
        Stalls         int64          `json:"stalls,omitempty"`
133
        NumSubs        uint32         `json:"subscriptions"`
134
        Name           string         `json:"name,omitempty"`
135
        Lang           string         `json:"lang,omitempty"`
136
        Version        string         `json:"version,omitempty"`
137
        TLSVersion     string         `json:"tls_version,omitempty"`
138
        TLSCipher      string         `json:"tls_cipher_suite,omitempty"`
139
        TLSPeerCerts   []*TLSPeerCert `json:"tls_peer_certs,omitempty"`
140
        TLSFirst       bool           `json:"tls_first,omitempty"`
141
        AuthorizedUser string         `json:"authorized_user,omitempty"`
142
        Account        string         `json:"account,omitempty"`
143
        Subs           []string       `json:"subscriptions_list,omitempty"`
144
        SubsDetail     []SubDetail    `json:"subscriptions_list_detail,omitempty"`
145
        JWT            string         `json:"jwt,omitempty"`
146
        IssuerKey      string         `json:"issuer_key,omitempty"`
147
        NameTag        string         `json:"name_tag,omitempty"`
148
        Tags           jwt.TagList    `json:"tags,omitempty"`
149
        MQTTClient     string         `json:"mqtt_client,omitempty"` // This is the MQTT client id
150
        Proxy          *ProxyInfo     `json:"proxy,omitempty"`
151

152
        // Internal
153
        rtt int64 // For fast sorting
154
}
155

156
// ProxyInfo describes the proxy a connection came through. It is attached to
// a connection's monitoring info only when the connection was proxied.
type ProxyInfo struct {
	// Key is the proxy key recorded on the client connection.
	Key string `json:"key"`
}
160

161
// TLSPeerCert contains basic information about a TLS peer certificate: its
// subject plus hex-encoded SHA-256 digests of the subject public key info and
// of the whole raw certificate.
type TLSPeerCert struct {
	Subject          string `json:"subject,omitempty"`
	SubjectPKISha256 string `json:"spki_sha256,omitempty"`
	CertSha256       string `json:"cert_sha256,omitempty"`
}
167

168
const (
	// DefaultConnListSize is the default size of the connection list. Connz
	// uses it as the page limit when none (or a non-positive one) is given.
	DefaultConnListSize = 1024

	// DefaultSubListSize is the default size of the subscriptions list.
	DefaultSubListSize = 1024

	// defaultStackBufSize is a default buffer size.
	// NOTE(review): presumably used when capturing stack dumps; its use is
	// outside this section - confirm.
	defaultStackBufSize = 10000
)
175

176
func newSubsDetailList(client *client) []SubDetail {
6✔
177
        subsDetail := make([]SubDetail, 0, len(client.subs))
6✔
178
        for _, sub := range client.subs {
131✔
179
                subsDetail = append(subsDetail, newClientSubDetail(sub))
125✔
180
        }
125✔
181
        return subsDetail
6✔
182
}
183

184
func newSubsList(client *client) []string {
15✔
185
        subs := make([]string, 0, len(client.subs))
15✔
186
        for _, sub := range client.subs {
162✔
187
                subs = append(subs, string(sub.subject))
147✔
188
        }
147✔
189
        return subs
15✔
190
}
191

192
// Connz returns a Connz struct containing information about connections.
193
func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {
1,447✔
194
        var (
1,447✔
195
                sortOpt = ByCid
1,447✔
196
                auth    bool
1,447✔
197
                subs    bool
1,447✔
198
                subsDet bool
1,447✔
199
                offset  int
1,447✔
200
                limit   = DefaultConnListSize
1,447✔
201
                cid     = uint64(0)
1,447✔
202
                state   = ConnOpen
1,447✔
203
                user    string
1,447✔
204
                acc     string
1,447✔
205
                a       *Account
1,447✔
206
                filter  string
1,447✔
207
                mqttCID string
1,447✔
208
        )
1,447✔
209

1,447✔
210
        if opts != nil {
2,828✔
211
                // If no sort option given or sort is by uptime, then sort by cid
1,381✔
212
                if opts.Sort != _EMPTY_ {
1,415✔
213
                        sortOpt = opts.Sort
34✔
214
                        if !sortOpt.IsValid() {
36✔
215
                                return nil, fmt.Errorf("invalid sorting option: %s", sortOpt)
2✔
216
                        }
2✔
217
                }
218

219
                // Auth specifics.
220
                auth = opts.Username
1,379✔
221
                user = opts.User
1,379✔
222
                acc = opts.Account
1,379✔
223
                mqttCID = opts.MQTTClient
1,379✔
224

1,379✔
225
                subs = opts.Subscriptions
1,379✔
226
                subsDet = opts.SubscriptionsDetail
1,379✔
227
                offset = opts.Offset
1,379✔
228
                if offset < 0 {
1,381✔
229
                        offset = 0
2✔
230
                }
2✔
231
                limit = opts.Limit
1,379✔
232
                if limit <= 0 {
2,749✔
233
                        limit = DefaultConnListSize
1,370✔
234
                }
1,370✔
235
                // state
236
                state = opts.State
1,379✔
237

1,379✔
238
                // ByStop only makes sense on closed connections
1,379✔
239
                if sortOpt == ByStop && state != ConnClosed {
1,380✔
240
                        return nil, fmt.Errorf("sort by stop only valid on closed connections")
1✔
241
                }
1✔
242
                // ByReason is the same.
243
                if sortOpt == ByReason && state != ConnClosed {
1,379✔
244
                        return nil, fmt.Errorf("sort by reason only valid on closed connections")
1✔
245
                }
1✔
246
                // If searching by CID
247
                if opts.CID > 0 {
1,407✔
248
                        cid = opts.CID
30✔
249
                        limit = 1
30✔
250
                }
30✔
251
                // If filtering by subject.
252
                if opts.FilterSubject != _EMPTY_ && opts.FilterSubject != fwcs {
1,377✔
253
                        if acc == _EMPTY_ {
×
254
                                return nil, fmt.Errorf("filter by subject only valid with account filtering")
×
255
                        }
×
256
                        filter = opts.FilterSubject
×
257
                }
258
        }
259

260
        c := &Connz{
1,443✔
261
                Offset: offset,
1,443✔
262
                Limit:  limit,
1,443✔
263
                Now:    time.Now().UTC(),
1,443✔
264
        }
1,443✔
265

1,443✔
266
        // Open clients
1,443✔
267
        var openClients []*client
1,443✔
268
        // Hold for closed clients if requested.
1,443✔
269
        var closedClients []*closedClient
1,443✔
270

1,443✔
271
        var clist map[uint64]*client
1,443✔
272

1,443✔
273
        if acc != _EMPTY_ {
1,470✔
274
                var err error
27✔
275
                a, err = s.lookupAccount(acc)
27✔
276
                if err != nil {
28✔
277
                        return c, nil
1✔
278
                }
1✔
279
                a.mu.RLock()
26✔
280
                clist = make(map[uint64]*client, a.numLocalConnections())
26✔
281
                for c := range a.clients {
54✔
282
                        if c.kind == CLIENT || c.kind == LEAF {
52✔
283
                                clist[c.cid] = c
24✔
284
                        }
24✔
285
                }
286
                a.mu.RUnlock()
26✔
287
        }
288

289
        // Walk the open client list with server lock held.
290
        s.mu.RLock()
1,442✔
291
        // Default to all client unless filled in above.
1,442✔
292
        if clist == nil {
2,858✔
293
                clist = make(map[uint64]*client, len(s.clients)+len(s.leafs))
1,416✔
294
                maps.Copy(clist, s.clients)
1,416✔
295
                maps.Copy(clist, s.leafs)
1,416✔
296
        }
1,416✔
297

298
        // copy the server id for monitoring
299
        c.ID = s.info.ID
1,442✔
300

1,442✔
301
        // Number of total clients. The resulting ConnInfo array
1,442✔
302
        // may be smaller if pagination is used.
1,442✔
303
        switch state {
1,442✔
304
        case ConnOpen:
353✔
305
                c.Total = len(clist)
353✔
306
        case ConnClosed:
1,083✔
307
                closedClients = s.closed.closedClients()
1,083✔
308
                c.Total = len(closedClients)
1,083✔
309
        case ConnAll:
6✔
310
                c.Total = len(clist)
6✔
311
                closedClients = s.closed.closedClients()
6✔
312
                c.Total += len(closedClients)
6✔
313
        }
314

315
        // We may need to filter these connections.
316
        if acc != _EMPTY_ && len(closedClients) > 0 {
1,445✔
317
                var ccc []*closedClient
3✔
318
                for _, cc := range closedClients {
21✔
319
                        if cc.acc != acc {
30✔
320
                                continue
12✔
321
                        }
322
                        ccc = append(ccc, cc)
6✔
323
                }
324
                c.Total -= (len(closedClients) - len(ccc))
3✔
325
                closedClients = ccc
3✔
326
        }
327

328
        totalClients := c.Total
1,442✔
329
        if cid > 0 { // Meaning we only want 1.
1,472✔
330
                totalClients = 1
30✔
331
        }
30✔
332
        if state == ConnOpen || state == ConnAll {
1,801✔
333
                openClients = make([]*client, 0, totalClients)
359✔
334
        }
359✔
335

336
        // Data structures for results.
337
        var conns []ConnInfo // Limits allocs for actual ConnInfos.
1,442✔
338
        var pconns ConnInfos
1,442✔
339

1,442✔
340
        switch state {
1,442✔
341
        case ConnOpen:
353✔
342
                conns = make([]ConnInfo, totalClients)
353✔
343
                pconns = make(ConnInfos, totalClients)
353✔
344
        case ConnClosed:
1,083✔
345
                pconns = make(ConnInfos, totalClients)
1,083✔
346
        case ConnAll:
6✔
347
                conns = make([]ConnInfo, cap(openClients))
6✔
348
                pconns = make(ConnInfos, totalClients)
6✔
349
        }
350

351
        // Search by individual CID.
352
        if cid > 0 {
1,472✔
353
                // Let's first check if user also selects on ConnOpen or ConnAll
30✔
354
                // and look for opened connections.
30✔
355
                if state == ConnOpen || state == ConnAll {
42✔
356
                        if client := s.clients[cid]; client != nil {
18✔
357
                                openClients = append(openClients, client)
6✔
358
                                closedClients = nil
6✔
359
                        }
6✔
360
                }
361
                // If we did not find, and the user selected for ConnClosed or ConnAll,
362
                // look for closed connections.
363
                if len(openClients) == 0 && (state == ConnClosed || state == ConnAll) {
50✔
364
                        copyClosed := closedClients
20✔
365
                        closedClients = nil
20✔
366
                        for _, cc := range copyClosed {
105✔
367
                                if cc.Cid == cid {
102✔
368
                                        closedClients = []*closedClient{cc}
17✔
369
                                        break
17✔
370
                                }
371
                        }
372
                }
373
        } else {
1,412✔
374
                // Gather all open clients.
1,412✔
375
                if state == ConnOpen || state == ConnAll {
1,759✔
376
                        for _, client := range clist {
915✔
377
                                // If we have an account specified we need to filter.
568✔
378
                                if acc != _EMPTY_ && (client.acc == nil || client.acc.Name != acc) {
568✔
379
                                        continue
×
380
                                }
381
                                // Do user filtering second
382
                                if user != _EMPTY_ && client.getRawAuthUserLock() != user {
613✔
383
                                        continue
45✔
384
                                }
385
                                // Do mqtt client ID filtering next
386
                                if mqttCID != _EMPTY_ && client.getMQTTClientID() != mqttCID {
537✔
387
                                        continue
14✔
388
                                }
389
                                openClients = append(openClients, client)
509✔
390
                        }
391
                }
392
        }
393
        s.mu.RUnlock()
1,442✔
394

1,442✔
395
        // Filter by subject now if needed. We do this outside of server lock.
1,442✔
396
        if filter != _EMPTY_ {
1,442✔
397
                var oc []*client
×
398
                for _, c := range openClients {
×
399
                        c.mu.Lock()
×
400
                        for _, sub := range c.subs {
×
401
                                if SubjectsCollide(filter, string(sub.subject)) {
×
402
                                        oc = append(oc, c)
×
403
                                        break
×
404
                                }
405
                        }
406
                        c.mu.Unlock()
×
407
                        openClients = oc
×
408
                }
409
        }
410

411
        // Just return with empty array if nothing here.
412
        if len(openClients) == 0 && len(closedClients) == 0 {
1,596✔
413
                c.Conns = ConnInfos{}
154✔
414
                return c, nil
154✔
415
        }
154✔
416

417
        // Now whip through and generate ConnInfo entries
418
        // Open Clients
419
        i := 0
1,288✔
420
        for _, client := range openClients {
1,803✔
421
                client.mu.Lock()
515✔
422
                ci := &conns[i]
515✔
423
                ci.fill(client, client.nc, c.Now, auth)
515✔
424
                // Fill in subscription data if requested.
515✔
425
                if len(client.subs) > 0 {
790✔
426
                        if subsDet {
277✔
427
                                ci.SubsDetail = newSubsDetailList(client)
2✔
428
                        } else if subs {
282✔
429
                                ci.Subs = newSubsList(client)
7✔
430
                        }
7✔
431
                }
432
                // Fill in user if auth requested.
433
                if auth {
559✔
434
                        ci.AuthorizedUser = client.getRawAuthUser()
44✔
435
                        if name := client.acc.GetName(); name != globalAccountName {
79✔
436
                                ci.Account = name
35✔
437
                        }
35✔
438
                        ci.JWT = client.opts.JWT
44✔
439
                        ci.IssuerKey = issuerForClient(client)
44✔
440
                        ci.Tags = client.tags
44✔
441
                        ci.NameTag = client.acc.getNameTag()
44✔
442
                }
443
                client.mu.Unlock()
515✔
444
                pconns[i] = ci
515✔
445
                i++
515✔
446
        }
447
        // Closed Clients
448
        var needCopy bool
1,288✔
449
        if subs || auth {
1,854✔
450
                needCopy = true
566✔
451
        }
566✔
452
        for _, cc := range closedClients {
104,774✔
453
                // If we have an account specified we need to filter.
103,486✔
454
                if acc != _EMPTY_ && cc.acc != acc {
103,486✔
455
                        continue
×
456
                }
457
                // Do user filtering second
458
                if user != _EMPTY_ && cc.user != user {
103,521✔
459
                        continue
35✔
460
                }
461
                // Do mqtt client ID filtering next
462
                if mqttCID != _EMPTY_ && cc.MQTTClient != mqttCID {
103,461✔
463
                        continue
10✔
464
                }
465
                // Copy if needed for any changes to the ConnInfo
466
                if needCopy {
154,957✔
467
                        cx := *cc
51,516✔
468
                        cc = &cx
51,516✔
469
                }
51,516✔
470
                // Fill in subscription data if requested.
471
                if len(cc.subs) > 0 {
103,491✔
472
                        if subsDet {
52✔
473
                                cc.SubsDetail = cc.subs
2✔
474
                        } else if subs {
52✔
475
                                cc.Subs = make([]string, 0, len(cc.subs))
2✔
476
                                for _, sub := range cc.subs {
4✔
477
                                        cc.Subs = append(cc.Subs, sub.Subject)
2✔
478
                                }
2✔
479
                        }
480
                }
481
                // Fill in user if auth requested.
482
                if auth {
103,455✔
483
                        cc.AuthorizedUser = cc.user
14✔
484
                        if cc.acc != _EMPTY_ && (cc.acc != globalAccountName) {
28✔
485
                                cc.Account = cc.acc
14✔
486
                                if acc, err := s.LookupAccount(cc.acc); err == nil {
28✔
487
                                        cc.NameTag = acc.getNameTag()
14✔
488
                                }
14✔
489
                        }
490
                }
491
                pconns[i] = &cc.ConnInfo
103,441✔
492
                i++
103,441✔
493
        }
494

495
        // This will trip if we have filtered out client connections.
496
        if len(pconns) != i {
1,316✔
497
                pconns = pconns[:i]
28✔
498
                totalClients = i
28✔
499
        }
28✔
500

501
        switch sortOpt {
1,288✔
502
        case ByCid, ByStart:
1,262✔
503
                sort.Sort(SortByCid{pconns})
1,262✔
504
        case BySubs:
2✔
505
                sort.Sort(sort.Reverse(SortBySubs{pconns}))
2✔
506
        case ByPending:
2✔
507
                sort.Sort(sort.Reverse(SortByPending{pconns}))
2✔
508
        case ByOutMsgs:
2✔
509
                sort.Sort(sort.Reverse(SortByOutMsgs{pconns}))
2✔
510
        case ByInMsgs:
2✔
511
                sort.Sort(sort.Reverse(SortByInMsgs{pconns}))
2✔
512
        case ByOutBytes:
2✔
513
                sort.Sort(sort.Reverse(SortByOutBytes{pconns}))
2✔
514
        case ByInBytes:
2✔
515
                sort.Sort(sort.Reverse(SortByInBytes{pconns}))
2✔
516
        case ByLast:
2✔
517
                sort.Sort(sort.Reverse(SortByLast{pconns}))
2✔
518
        case ByIdle:
2✔
519
                sort.Sort(sort.Reverse(SortByIdle{pconns, c.Now}))
2✔
520
        case ByUptime:
4✔
521
                sort.Sort(SortByUptime{pconns, time.Now()})
4✔
522
        case ByStop:
2✔
523
                sort.Sort(sort.Reverse(SortByStop{pconns}))
2✔
524
        case ByReason:
2✔
525
                sort.Sort(SortByReason{pconns})
2✔
526
        case ByRTT:
2✔
527
                sort.Sort(sort.Reverse(SortByRTT{pconns}))
2✔
528
        }
529

530
        minoff := c.Offset
1,288✔
531
        maxoff := c.Offset + c.Limit
1,288✔
532

1,288✔
533
        maxIndex := totalClients
1,288✔
534

1,288✔
535
        // Make sure these are sane.
1,288✔
536
        if minoff > maxIndex {
1,288✔
537
                minoff = maxIndex
×
538
        }
×
539
        if maxoff > maxIndex {
2,550✔
540
                maxoff = maxIndex
1,262✔
541
        }
1,262✔
542

543
        // Now pare down to the requested size.
544
        // TODO(dlc) - for very large number of connections we
545
        // could save the whole list in a hash, send hash on first
546
        // request and allow users to use has for subsequent pages.
547
        // Low TTL, say < 1sec.
548
        c.Conns = pconns[minoff:maxoff]
1,288✔
549
        c.NumConns = len(c.Conns)
1,288✔
550

1,288✔
551
        return c, nil
1,288✔
552
}
553

554
// Fills in the ConnInfo from the client.
555
// client should be locked.
556
func (ci *ConnInfo) fill(client *client, nc net.Conn, now time.Time, auth bool) {
12,970✔
557
        // For fast sort if required.
12,970✔
558
        rtt := client.getRTT()
12,970✔
559
        ci.rtt = int64(rtt)
12,970✔
560

12,970✔
561
        ci.Cid = client.cid
12,970✔
562
        ci.MQTTClient = client.getMQTTClientID()
12,970✔
563
        ci.Kind = client.kindString()
12,970✔
564
        ci.Type = client.clientTypeString()
12,970✔
565
        ci.Start = client.start
12,970✔
566
        ci.LastActivity = client.last
12,970✔
567
        ci.Uptime = myUptime(now.Sub(client.start))
12,970✔
568
        ci.Idle = myUptime(now.Sub(client.last))
12,970✔
569
        ci.RTT = rtt.String()
12,970✔
570
        ci.OutMsgs = client.outMsgs
12,970✔
571
        ci.OutBytes = client.outBytes
12,970✔
572
        ci.NumSubs = uint32(len(client.subs))
12,970✔
573
        ci.Pending = int(client.out.pb)
12,970✔
574
        ci.Name = client.opts.Name
12,970✔
575
        ci.Lang = client.opts.Lang
12,970✔
576
        ci.Version = client.opts.Version
12,970✔
577
        // inMsgs and inBytes are updated outside of the client's lock, so
12,970✔
578
        // we need to use atomic here.
12,970✔
579
        ci.InMsgs = atomic.LoadInt64(&client.inMsgs)
12,970✔
580
        ci.InBytes = atomic.LoadInt64(&client.inBytes)
12,970✔
581
        ci.Stalls = atomic.LoadInt64(&client.stalls)
12,970✔
582
        ci.Proxy = createProxyInfo(client)
12,970✔
583

12,970✔
584
        // If the connection is gone, too bad, we won't set TLSVersion and TLSCipher.
12,970✔
585
        // Exclude clients that are still doing handshake so we don't block in
12,970✔
586
        // ConnectionState().
12,970✔
587
        if client.flags.isSet(handshakeComplete) && nc != nil {
13,298✔
588
                if conn, ok := nc.(*tls.Conn); ok {
656✔
589
                        cs := conn.ConnectionState()
328✔
590
                        ci.TLSVersion = tlsVersion(cs.Version)
328✔
591
                        ci.TLSCipher = tls.CipherSuiteName(cs.CipherSuite)
328✔
592
                        if auth && len(cs.PeerCertificates) > 0 {
330✔
593
                                ci.TLSPeerCerts = makePeerCerts(cs.PeerCertificates)
2✔
594
                        }
2✔
595
                        ci.TLSFirst = client.flags.isSet(didTLSFirst)
328✔
596
                }
597
        }
598

599
        if client.port != 0 {
25,713✔
600
                ci.Port = int(client.port)
12,743✔
601
                ci.IP = client.host
12,743✔
602
        }
12,743✔
603
}
604

605
// If this client came from a trusted proxy, this will return a ProxyInfo
606
// to be used in ConnInfo or LeafInfo.
607
//
608
// Client lock must be held on entry.
609
func createProxyInfo(c *client) *ProxyInfo {
13,101✔
610
        if c.proxyKey == _EMPTY_ {
26,190✔
611
                return nil
13,089✔
612
        }
13,089✔
613
        return &ProxyInfo{Key: c.proxyKey}
12✔
614
}
615

616
func makePeerCerts(pc []*x509.Certificate) []*TLSPeerCert {
2✔
617
        res := make([]*TLSPeerCert, len(pc))
2✔
618
        for i, c := range pc {
4✔
619
                tmp := sha256.Sum256(c.RawSubjectPublicKeyInfo)
2✔
620
                ssha := hex.EncodeToString(tmp[:])
2✔
621
                tmp = sha256.Sum256(c.Raw)
2✔
622
                csha := hex.EncodeToString(tmp[:])
2✔
623
                res[i] = &TLSPeerCert{Subject: c.Subject.String(), SubjectPKISha256: ssha, CertSha256: csha}
2✔
624
        }
2✔
625
        return res
2✔
626
}
627

628
// Assume lock is held
629
func (c *client) getRTT() time.Duration {
682,373✔
630
        if c.rtt == 0 {
1,345,781✔
631
                // If a real client, go ahead and send ping now to get a value
663,408✔
632
                // for RTT. For tests and telnet, or if client is closing, etc skip.
663,408✔
633
                if c.opts.Lang != _EMPTY_ {
663,408✔
634
                        c.sendRTTPingLocked()
×
635
                }
×
636
                return 0
663,408✔
637
        }
638
        var rtt time.Duration
18,965✔
639
        if c.rtt > time.Microsecond && c.rtt < time.Millisecond {
35,495✔
640
                rtt = c.rtt.Truncate(time.Microsecond)
16,530✔
641
        } else {
18,965✔
642
                rtt = c.rtt.Truncate(time.Nanosecond)
2,435✔
643
        }
2,435✔
644
        return rtt
18,965✔
645
}
646

647
func decodeBool(w http.ResponseWriter, r *http.Request, param string) (bool, error) {
3,602✔
648
        str := r.URL.Query().Get(param)
3,602✔
649
        if str == _EMPTY_ {
6,557✔
650
                return false, nil
2,955✔
651
        }
2,955✔
652
        val, err := strconv.ParseBool(str)
647✔
653
        if err != nil {
650✔
654
                w.WriteHeader(http.StatusBadRequest)
3✔
655
                w.Write([]byte(fmt.Sprintf("Error decoding boolean for '%s': %v", param, err)))
3✔
656
                return false, err
3✔
657
        }
3✔
658
        return val, nil
644✔
659
}
660

661
func decodeUint64(w http.ResponseWriter, r *http.Request, param string) (uint64, error) {
1,278✔
662
        str := r.URL.Query().Get(param)
1,278✔
663
        if str == _EMPTY_ {
2,546✔
664
                return 0, nil
1,268✔
665
        }
1,268✔
666
        val, err := strconv.ParseUint(str, 10, 64)
10✔
667
        if err != nil {
10✔
668
                w.WriteHeader(http.StatusBadRequest)
×
669
                w.Write([]byte(fmt.Sprintf("Error decoding uint64 for '%s': %v", param, err)))
×
670
                return 0, err
×
671
        }
×
672
        return val, nil
10✔
673
}
674

675
func decodeInt(w http.ResponseWriter, r *http.Request, param string) (int, error) {
3,015✔
676
        str := r.URL.Query().Get(param)
3,015✔
677
        if str == _EMPTY_ {
5,990✔
678
                return 0, nil
2,975✔
679
        }
2,975✔
680
        val, err := strconv.Atoi(str)
40✔
681
        if err != nil {
42✔
682
                w.WriteHeader(http.StatusBadRequest)
2✔
683
                w.Write([]byte(fmt.Sprintf("Error decoding int for '%s': %v", param, err)))
2✔
684
                return 0, err
2✔
685
        }
2✔
686
        return val, nil
38✔
687
}
688

689
func decodeState(w http.ResponseWriter, r *http.Request) (ConnState, error) {
1,278✔
690
        str := r.URL.Query().Get("state")
1,278✔
691
        if str == _EMPTY_ {
1,486✔
692
                return ConnOpen, nil
208✔
693
        }
208✔
694
        switch strings.ToLower(str) {
1,070✔
695
        case "open":
12✔
696
                return ConnOpen, nil
12✔
697
        case "closed":
1,054✔
698
                return ConnClosed, nil
1,054✔
699
        case "any", "all":
3✔
700
                return ConnAll, nil
3✔
701
        }
702
        // We do not understand intended state here.
703
        w.WriteHeader(http.StatusBadRequest)
1✔
704
        err := fmt.Errorf("Error decoding state for %s", str)
1✔
705
        w.Write([]byte(err.Error()))
1✔
706
        return 0, err
1✔
707
}
708

709
func decodeSubs(w http.ResponseWriter, r *http.Request) (subs bool, subsDet bool, err error) {
1,586✔
710
        subsDet = strings.ToLower(r.URL.Query().Get("subs")) == "detail"
1,586✔
711
        if !subsDet {
3,167✔
712
                subs, err = decodeBool(w, r, "subs")
1,581✔
713
        }
1,581✔
714
        return
1,586✔
715
}
716

717
// HandleConnz process HTTP requests for connection information.
718
func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) {
1,282✔
719
        sortOpt := SortOpt(r.URL.Query().Get("sort"))
1,282✔
720
        auth, err := decodeBool(w, r, "auth")
1,282✔
721
        if err != nil {
1,283✔
722
                return
1✔
723
        }
1✔
724
        subs, subsDet, err := decodeSubs(w, r)
1,281✔
725
        if err != nil {
1,282✔
726
                return
1✔
727
        }
1✔
728
        offset, err := decodeInt(w, r, "offset")
1,280✔
729
        if err != nil {
1,281✔
730
                return
1✔
731
        }
1✔
732
        limit, err := decodeInt(w, r, "limit")
1,279✔
733
        if err != nil {
1,280✔
734
                return
1✔
735
        }
1✔
736
        cid, err := decodeUint64(w, r, "cid")
1,278✔
737
        if err != nil {
1,278✔
738
                return
×
739
        }
×
740
        state, err := decodeState(w, r)
1,278✔
741
        if err != nil {
1,279✔
742
                return
1✔
743
        }
1✔
744

745
        user := r.URL.Query().Get("user")
1,277✔
746
        acc := r.URL.Query().Get("acc")
1,277✔
747
        mqttCID := r.URL.Query().Get("mqtt_client")
1,277✔
748

1,277✔
749
        connzOpts := &ConnzOptions{
1,277✔
750
                Sort:                sortOpt,
1,277✔
751
                Username:            auth,
1,277✔
752
                Subscriptions:       subs,
1,277✔
753
                SubscriptionsDetail: subsDet,
1,277✔
754
                Offset:              offset,
1,277✔
755
                Limit:               limit,
1,277✔
756
                CID:                 cid,
1,277✔
757
                MQTTClient:          mqttCID,
1,277✔
758
                State:               state,
1,277✔
759
                User:                user,
1,277✔
760
                Account:             acc,
1,277✔
761
        }
1,277✔
762

1,277✔
763
        s.mu.Lock()
1,277✔
764
        s.httpReqStats[ConnzPath]++
1,277✔
765
        s.mu.Unlock()
1,277✔
766

1,277✔
767
        c, err := s.Connz(connzOpts)
1,277✔
768
        if err != nil {
1,278✔
769
                w.WriteHeader(http.StatusBadRequest)
1✔
770
                w.Write([]byte(err.Error()))
1✔
771
                return
1✔
772
        }
1✔
773
        b, err := json.MarshalIndent(c, "", "  ")
1,276✔
774
        if err != nil {
1,276✔
775
                s.Errorf("Error marshaling response to /connz request: %v", err)
×
776
        }
×
777

778
        // Handle response
779
        ResponseHandler(w, r, b)
1,276✔
780
}
781

782
// Routez represents detailed information on current route connections.
// It is the value produced by Server.Routez.
783
type Routez struct {
784
        ID        string             `json:"server_id"` // ID is copied from the server's info ID
785
        Name      string             `json:"server_name"` // Name is copied from the server's info name
786
        Now       time.Time          `json:"now"` // Now is the UTC time at which the report was created
787
        Import    *SubjectPermission `json:"import,omitempty"` // Import is taken from the configured cluster permissions, if any
788
        Export    *SubjectPermission `json:"export,omitempty"` // Export is taken from the configured cluster permissions, if any
789
        NumRoutes int                `json:"num_routes"` // NumRoutes is the number of routes reported by the server
790
        Routes    []*RouteInfo       `json:"routes"` // Routes holds one entry per route visited while building the report
791
}
792

793
// RoutezOptions are options passed to Routez.
// When both fields are set, SubscriptionsDetail takes precedence.
794
type RoutezOptions struct {
795
        // Subscriptions indicates that Routez will return a route's subscriptions.
796
        Subscriptions bool `json:"subscriptions"`
797
        // SubscriptionsDetail indicates if subscription details should be included in the results.
798
        SubscriptionsDetail bool `json:"subscriptions_detail"`
799
}
800

801
// RouteInfo has detailed information on a per route connection basis.
// One RouteInfo is created by Server.Routez for every route it walks.
802
type RouteInfo struct {
803
        Rid          uint64             `json:"rid"` // Rid is the connection ID of the route
804
        RemoteID     string             `json:"remote_id"`
805
        RemoteName   string             `json:"remote_name"`
806
        DidSolicit   bool               `json:"did_solicit"`
807
        IsConfigured bool               `json:"is_configured"` // IsConfigured is true when the route type is Explicit
808
        IP           string             `json:"ip"` // IP is only filled in for TCP and TLS connections
809
        Port         int                `json:"port"` // Port is only filled in for TCP and TLS connections
810
        Start        time.Time          `json:"start"`
811
        LastActivity time.Time          `json:"last_activity"`
812
        RTT          string             `json:"rtt,omitempty"`
813
        Uptime       string             `json:"uptime"` // Uptime is the formatted duration between Start and the report time
814
        Idle         string             `json:"idle"` // Idle is the formatted duration between LastActivity and the report time
815
        Import       *SubjectPermission `json:"import,omitempty"`
816
        Export       *SubjectPermission `json:"export,omitempty"`
817
        Pending      int                `json:"pending_size"`
818
        InMsgs       int64              `json:"in_msgs"`
819
        OutMsgs      int64              `json:"out_msgs"`
820
        InBytes      int64              `json:"in_bytes"`
821
        OutBytes     int64              `json:"out_bytes"`
822
        NumSubs      uint32             `json:"subscriptions"` // NumSubs is the number of subscriptions held by the route
823
        Subs         []string           `json:"subscriptions_list,omitempty"` // Subs is only set when RoutezOptions.Subscriptions is requested
824
        SubsDetail   []SubDetail        `json:"subscriptions_list_detail,omitempty"` // SubsDetail is only set when RoutezOptions.SubscriptionsDetail is requested
825
        Account      string             `json:"account,omitempty"`
826
        Compression  string             `json:"compression,omitempty"`
827
}
828

829
// Routez returns a Routez struct containing information about routes.
830
func (s *Server) Routez(routezOpts *RoutezOptions) (*Routez, error) {
740,100✔
831
        rs := &Routez{
740,100✔
832
                Now:    time.Now().UTC(),
740,100✔
833
                Routes: []*RouteInfo{},
740,100✔
834
        }
740,100✔
835

740,100✔
836
        if routezOpts == nil {
1,479,892✔
837
                routezOpts = &RoutezOptions{}
739,792✔
838
        }
739,792✔
839

840
        s.mu.Lock()
740,100✔
841
        rs.NumRoutes = s.numRoutes()
740,100✔
842

740,100✔
843
        // copy the server id for monitoring
740,100✔
844
        rs.ID = s.info.ID
740,100✔
845

740,100✔
846
        // Check for defined permissions for all connected routes.
740,100✔
847
        if perms := s.getOpts().Cluster.Permissions; perms != nil {
740,102✔
848
                rs.Import = perms.Import
2✔
849
                rs.Export = perms.Export
2✔
850
        }
2✔
851
        rs.Name = s.info.Name
740,100✔
852

740,100✔
853
        addRoute := func(r *client) {
1,400,478✔
854
                r.mu.Lock()
660,378✔
855
                ri := &RouteInfo{
660,378✔
856
                        Rid:          r.cid,
660,378✔
857
                        RemoteID:     r.route.remoteID,
660,378✔
858
                        RemoteName:   r.route.remoteName,
660,378✔
859
                        DidSolicit:   r.route.didSolicit,
660,378✔
860
                        IsConfigured: r.route.routeType == Explicit,
660,378✔
861
                        InMsgs:       atomic.LoadInt64(&r.inMsgs),
660,378✔
862
                        OutMsgs:      r.outMsgs,
660,378✔
863
                        InBytes:      atomic.LoadInt64(&r.inBytes),
660,378✔
864
                        OutBytes:     r.outBytes,
660,378✔
865
                        NumSubs:      uint32(len(r.subs)),
660,378✔
866
                        Import:       r.opts.Import,
660,378✔
867
                        Pending:      int(r.out.pb),
660,378✔
868
                        Export:       r.opts.Export,
660,378✔
869
                        RTT:          r.getRTT().String(),
660,378✔
870
                        Start:        r.start,
660,378✔
871
                        LastActivity: r.last,
660,378✔
872
                        Uptime:       myUptime(rs.Now.Sub(r.start)),
660,378✔
873
                        Idle:         myUptime(rs.Now.Sub(r.last)),
660,378✔
874
                        Account:      string(r.route.accName),
660,378✔
875
                        Compression:  r.route.compression,
660,378✔
876
                }
660,378✔
877

660,378✔
878
                if len(r.subs) > 0 {
660,421✔
879
                        if routezOpts.SubscriptionsDetail {
47✔
880
                                ri.SubsDetail = newSubsDetailList(r)
4✔
881
                        } else if routezOpts.Subscriptions {
51✔
882
                                ri.Subs = newSubsList(r)
8✔
883
                        }
8✔
884
                }
885

886
                switch conn := r.nc.(type) {
660,378✔
887
                case *net.TCPConn, *tls.Conn:
653,793✔
888
                        addr := conn.RemoteAddr().(*net.TCPAddr)
653,793✔
889
                        ri.Port = addr.Port
653,793✔
890
                        ri.IP = addr.IP.String()
653,793✔
891
                }
892
                r.mu.Unlock()
660,378✔
893
                rs.Routes = append(rs.Routes, ri)
660,378✔
894
        }
895

896
        // Walk the list
897
        s.forEachRoute(func(r *client) {
1,400,478✔
898
                addRoute(r)
660,378✔
899
        })
660,378✔
900
        s.mu.Unlock()
740,100✔
901
        return rs, nil
740,100✔
902
}
903

904
// HandleRoutez process HTTP requests for route information.
905
func (s *Server) HandleRoutez(w http.ResponseWriter, r *http.Request) {
286✔
906
        subs, subsDetail, err := decodeSubs(w, r)
286✔
907
        if err != nil {
287✔
908
                return
1✔
909
        }
1✔
910

911
        opts := RoutezOptions{Subscriptions: subs, SubscriptionsDetail: subsDetail}
285✔
912

285✔
913
        s.mu.Lock()
285✔
914
        s.httpReqStats[RoutezPath]++
285✔
915
        s.mu.Unlock()
285✔
916

285✔
917
        // As of now, no error is ever returned.
285✔
918
        rs, _ := s.Routez(&opts)
285✔
919
        b, err := json.MarshalIndent(rs, "", "  ")
285✔
920
        if err != nil {
285✔
921
                s.Errorf("Error marshaling response to /routez request: %v", err)
×
922
        }
×
923

924
        // Handle response
925
        ResponseHandler(w, r, b)
285✔
926
}
927

928
// Subsz represents detailed information on current subscriptions.
// It is the value produced by Server.Subsz.
929
type Subsz struct {
930
        ID  string    `json:"server_id"` // ID is copied from the server's info ID
931
        Now time.Time `json:"now"` // Now is the UTC time at which the report was created
932
        *SublistStats
933
        Total  int         `json:"total"` // Total is the number of matching subscriptions before pagination; it stays 0 when details are not requested
934
        Offset int         `json:"offset"` // Offset is the pagination offset that was applied
935
        Limit  int         `json:"limit"` // Limit is the pagination limit that was applied
936
        Subs   []SubDetail `json:"subscriptions_list,omitempty"` // Subs is the requested page of subscription details
937
}
938

939
// SubszOptions are the options passed to Subsz.
940
// A nil options pointer makes Subsz use its defaults.
941
type SubszOptions struct {
942
        // Offset is used for pagination. Subsz() only returns subscriptions starting at this
943
        // offset from the global results. Negative values are treated as 0.
944
        Offset int `json:"offset"`
945

946
        // Limit is the maximum number of subscriptions that should be returned by Subsz().
        // Values less than or equal to 0 select DefaultSubListSize.
947
        Limit int `json:"limit"`
948

949
        // Subscriptions indicates if subscription details should be included in the results.
950
        Subscriptions bool `json:"subscriptions"`
951

952
        // Filter based on this account name.
953
        Account string `json:"account,omitempty"`
954

955
        // Test the list against this subject. Needs to be literal since it signifies a publish subject.
956
        // We will only return subscriptions that would match if a message was sent to this subject.
957
        Test string `json:"test,omitempty"`
958
}
959

960
// SubDetail is for verbose information for subscriptions.
961
type SubDetail struct {
962
        Account    string `json:"account,omitempty"` // Account is the name of the account owning the subscribing client
963
        AccountTag string `json:"account_tag,omitempty"` // AccountTag is the name tag of that account
964
        Subject    string `json:"subject"` // Subject is the subscription subject
965
        Queue      string `json:"qgroup,omitempty"` // Queue is the queue group, if any
966
        Sid        string `json:"sid"` // Sid is the subscription ID
967
        Msgs       int64  `json:"msgs"`
968
        Max        int64  `json:"max,omitempty"`
969
        Cid        uint64 `json:"cid"` // Cid is the connection ID of the subscribing client
970
}
971

972
// Subscription client should be locked and guaranteed to be present.
973
func newSubDetail(sub *subscription) SubDetail {
151,368✔
974
        sd := newClientSubDetail(sub)
151,368✔
975
        sd.Account = sub.client.acc.GetName()
151,368✔
976
        sd.AccountTag = sub.client.acc.getNameTag()
151,368✔
977
        return sd
151,368✔
978
}
151,368✔
979

980
// For subs details under clients.
981
func newClientSubDetail(sub *subscription) SubDetail {
151,503✔
982
        return SubDetail{
151,503✔
983
                Subject: string(sub.subject),
151,503✔
984
                Queue:   string(sub.queue),
151,503✔
985
                Sid:     string(sub.sid),
151,503✔
986
                Msgs:    sub.nm,
151,503✔
987
                Max:     sub.max,
151,503✔
988
                Cid:     sub.client.cid,
151,503✔
989
        }
151,503✔
990
}
151,503✔
991

992
// Subsz returns a Subsz struct containing subjects statistics
993
func (s *Server) Subsz(opts *SubszOptions) (*Subsz, error) {
207✔
994
        var (
207✔
995
                subdetail bool
207✔
996
                test      bool
207✔
997
                offset    int
207✔
998
                testSub   string
207✔
999
                filterAcc string
207✔
1000
                limit     = DefaultSubListSize
207✔
1001
        )
207✔
1002

207✔
1003
        if opts != nil {
400✔
1004
                subdetail = opts.Subscriptions
193✔
1005
                offset = opts.Offset
193✔
1006
                if offset < 0 {
193✔
1007
                        offset = 0
×
1008
                }
×
1009
                limit = opts.Limit
193✔
1010
                if limit <= 0 {
378✔
1011
                        limit = DefaultSubListSize
185✔
1012
                }
185✔
1013
                if opts.Test != _EMPTY_ {
202✔
1014
                        testSub = opts.Test
9✔
1015
                        test = true
9✔
1016
                        if !IsValidLiteralSubject(testSub) {
13✔
1017
                                return nil, fmt.Errorf("invalid test subject, must be valid publish subject: %s", testSub)
4✔
1018
                        }
4✔
1019
                }
1020
                if opts.Account != _EMPTY_ {
195✔
1021
                        filterAcc = opts.Account
6✔
1022
                }
6✔
1023
        }
1024

1025
        slStats := &SublistStats{}
203✔
1026

203✔
1027
        // FIXME(dlc) - Make account aware.
203✔
1028
        sz := &Subsz{
203✔
1029
                ID:           s.info.ID,
203✔
1030
                Now:          time.Now().UTC(),
203✔
1031
                SublistStats: slStats,
203✔
1032
                Total:        0,
203✔
1033
                Offset:       offset,
203✔
1034
                Limit:        limit,
203✔
1035
                Subs:         nil,
203✔
1036
        }
203✔
1037

203✔
1038
        if subdetail {
221✔
1039
                var raw [4096]*subscription
18✔
1040
                subs := raw[:0]
18✔
1041
                s.accounts.Range(func(k, v any) bool {
58✔
1042
                        acc := v.(*Account)
40✔
1043
                        if filterAcc != _EMPTY_ && acc.GetName() != filterAcc {
58✔
1044
                                return true
18✔
1045
                        }
18✔
1046
                        slStats.add(acc.sl.Stats())
22✔
1047
                        acc.sl.localSubs(&subs, false)
22✔
1048
                        return true
22✔
1049
                })
1050

1051
                details := make([]SubDetail, 0, len(subs))
18✔
1052
                i := 0
18✔
1053
                // TODO(dlc) - may be inefficient and could just do normal match when total subs is large and filtering.
18✔
1054
                for _, sub := range subs {
1,483✔
1055
                        // Check for filter
1,465✔
1056
                        if test && !matchLiteral(testSub, string(sub.subject)) {
1,577✔
1057
                                continue
112✔
1058
                        }
1059
                        if sub.client == nil {
1,353✔
1060
                                continue
×
1061
                        }
1062
                        sub.client.mu.Lock()
1,353✔
1063
                        details = append(details, newSubDetail(sub))
1,353✔
1064
                        sub.client.mu.Unlock()
1,353✔
1065
                        i++
1,353✔
1066
                }
1067
                minoff := sz.Offset
18✔
1068
                maxoff := sz.Offset + sz.Limit
18✔
1069

18✔
1070
                maxIndex := i
18✔
1071

18✔
1072
                // Make sure these are sane.
18✔
1073
                if minoff > maxIndex {
18✔
1074
                        minoff = maxIndex
×
1075
                }
×
1076
                if maxoff > maxIndex {
30✔
1077
                        maxoff = maxIndex
12✔
1078
                }
12✔
1079
                sz.Subs = details[minoff:maxoff]
18✔
1080
                sz.Total = len(details)
18✔
1081
        } else {
185✔
1082
                s.accounts.Range(func(k, v any) bool {
392✔
1083
                        acc := v.(*Account)
207✔
1084
                        if filterAcc != _EMPTY_ && acc.GetName() != filterAcc {
207✔
1085
                                return true
×
1086
                        }
×
1087
                        slStats.add(acc.sl.Stats())
207✔
1088
                        return true
207✔
1089
                })
1090
        }
1091

1092
        return sz, nil
203✔
1093
}
1094

1095
// HandleSubsz processes HTTP requests for subjects stats.
1096
func (s *Server) HandleSubsz(w http.ResponseWriter, r *http.Request) {
176✔
1097
        s.mu.Lock()
176✔
1098
        s.httpReqStats[SubszPath]++
176✔
1099
        s.mu.Unlock()
176✔
1100

176✔
1101
        subs, err := decodeBool(w, r, "subs")
176✔
1102
        if err != nil {
176✔
1103
                return
×
1104
        }
×
1105
        offset, err := decodeInt(w, r, "offset")
176✔
1106
        if err != nil {
176✔
1107
                return
×
1108
        }
×
1109
        limit, err := decodeInt(w, r, "limit")
176✔
1110
        if err != nil {
176✔
1111
                return
×
1112
        }
×
1113
        testSub := r.URL.Query().Get("test")
176✔
1114
        // Filtered account.
176✔
1115
        filterAcc := r.URL.Query().Get("acc")
176✔
1116

176✔
1117
        subszOpts := &SubszOptions{
176✔
1118
                Subscriptions: subs,
176✔
1119
                Offset:        offset,
176✔
1120
                Limit:         limit,
176✔
1121
                Account:       filterAcc,
176✔
1122
                Test:          testSub,
176✔
1123
        }
176✔
1124

176✔
1125
        st, err := s.Subsz(subszOpts)
176✔
1126
        if err != nil {
180✔
1127
                w.WriteHeader(http.StatusBadRequest)
4✔
1128
                w.Write([]byte(err.Error()))
4✔
1129
                return
4✔
1130
        }
4✔
1131

1132
        var b []byte
172✔
1133
        b, err = json.MarshalIndent(st, "", "  ")
172✔
1134
        if err != nil {
172✔
1135
                s.Errorf("Error marshaling response to /subscriptionsz request: %v", err)
×
1136
        }
×
1137

1138
        // Handle response
1139
        ResponseHandler(w, r, b)
172✔
1140
}
1141

1142
// HandleStacksz processes HTTP requests for getting stacks
1143
func (s *Server) HandleStacksz(w http.ResponseWriter, r *http.Request) {
65✔
1144
        // Do not get any lock here that would prevent getting the stacks
65✔
1145
        // if we were to have a deadlock somewhere.
65✔
1146
        var defaultBuf [defaultStackBufSize]byte
65✔
1147
        size := defaultStackBufSize
65✔
1148
        buf := defaultBuf[:size]
65✔
1149
        n := 0
65✔
1150
        for {
271✔
1151
                n = runtime.Stack(buf, true)
206✔
1152
                if n < size {
271✔
1153
                        break
65✔
1154
                }
1155
                size *= 2
141✔
1156
                buf = make([]byte, size)
141✔
1157
        }
1158
        // Handle response
1159
        ResponseHandler(w, r, buf[:n])
65✔
1160
}
1161

1162
// IpqueueszStatusIPQ is the status reported by Ipqueuesz for a single
// internal queue.
type IpqueueszStatusIPQ struct {
1163
        Pending    int `json:"pending"` // Pending is the length reported by the queue
1164
        InProgress int `json:"in_progress,omitempty"` // InProgress is the in-progress count reported by the queue
1165
}
1166

1167
// IpqueueszStatus maps the name of an internal queue to its status.
type IpqueueszStatus map[string]IpqueueszStatusIPQ
1168

1169
func (s *Server) Ipqueuesz(opts *IpqueueszOptions) *IpqueueszStatus {
1✔
1170
        all, qfilter := opts.All, opts.Filter
1✔
1171
        queues := IpqueueszStatus{}
1✔
1172
        s.ipQueues.Range(func(k, v any) bool {
7✔
1173
                var pending, inProgress int
6✔
1174
                name := k.(string)
6✔
1175
                queue, ok := v.(interface {
6✔
1176
                        len() int
6✔
1177
                        inProgress() int64
6✔
1178
                })
6✔
1179
                if ok {
12✔
1180
                        pending = queue.len()
6✔
1181
                        inProgress = int(queue.inProgress())
6✔
1182
                }
6✔
1183
                if !all && (pending == 0 && inProgress == 0) {
6✔
1184
                        return true
×
1185
                } else if qfilter != _EMPTY_ && !strings.Contains(name, qfilter) {
6✔
1186
                        return true
×
1187
                }
×
1188
                queues[name] = IpqueueszStatusIPQ{Pending: pending, InProgress: inProgress}
6✔
1189
                return true
6✔
1190
        })
1191
        return &queues
1✔
1192
}
1193

1194
func (s *Server) HandleIPQueuesz(w http.ResponseWriter, r *http.Request) {
1✔
1195
        all, err := decodeBool(w, r, "all")
1✔
1196
        if err != nil {
1✔
1197
                return
×
1198
        }
×
1199
        qfilter := r.URL.Query().Get("queues")
1✔
1200

1✔
1201
        queues := s.Ipqueuesz(&IpqueueszOptions{
1✔
1202
                All:    all,
1✔
1203
                Filter: qfilter,
1✔
1204
        })
1✔
1205

1✔
1206
        b, _ := json.MarshalIndent(queues, "", "   ")
1✔
1207
        ResponseHandler(w, r, b)
1✔
1208
}
1209

1210
// Varz will output server information on the monitoring port at /varz.
1211
type Varz struct {
1212
        ID                    string                 `json:"server_id"`                         // ID is the unique server ID generated at start
1213
        Name                  string                 `json:"server_name"`                       // Name is the configured server name, equals ID when not set
1214
        Version               string                 `json:"version"`                           // Version is the version of the running server
1215
        Proto                 int                    `json:"proto"`                             // Proto is the protocol version this server supports
1216
        GitCommit             string                 `json:"git_commit,omitempty"`              // GitCommit is the git repository commit hash that the build corresponds with
1217
        GoVersion             string                 `json:"go"`                                // GoVersion is the version of Go used to build this binary
1218
        Host                  string                 `json:"host"`                              // Host is the hostname the server runs on
1219
        Port                  int                    `json:"port"`                              // Port is the port the server listens on for client connections
1220
        AuthRequired          bool                   `json:"auth_required,omitempty"`           // AuthRequired indicates if users are required to authenticate to join the server
1221
        TLSRequired           bool                   `json:"tls_required,omitempty"`            // TLSRequired indicates if connections must use TLS when connecting to this server
1222
        TLSVerify             bool                   `json:"tls_verify,omitempty"`              // TLSVerify indicates if full TLS verification will be performed
1223
        TLSOCSPPeerVerify     bool                   `json:"tls_ocsp_peer_verify,omitempty"`    // TLSOCSPPeerVerify indicates if the OCSP protocol will be used to verify peers
1224
        IP                    string                 `json:"ip,omitempty"`                      // IP is the IP address the server listens on if set
1225
        ClientConnectURLs     []string               `json:"connect_urls,omitempty"`            // ClientConnectURLs is the list of URLs NATS clients can use to connect to this server
1226
        WSConnectURLs         []string               `json:"ws_connect_urls,omitempty"`         // WSConnectURLs is the list of URLs websocket clients can use to connect to this server
1227
        MaxConn               int                    `json:"max_connections"`                   // MaxConn is the maximum amount of connections the server can accept
1228
        MaxSubs               int                    `json:"max_subscriptions,omitempty"`       // MaxSubs is the maximum amount of subscriptions the server can manage
1229
        PingInterval          time.Duration          `json:"ping_interval"`                     // PingInterval is the interval the server will send PING messages during periods of inactivity on a connection
1230
        MaxPingsOut           int                    `json:"ping_max"`                          // MaxPingsOut is the number of unanswered PINGs after which the connection will be considered stale
1231
        HTTPHost              string                 `json:"http_host"`                         // HTTPHost is the HTTP host monitoring connections are accepted on
1232
        HTTPPort              int                    `json:"http_port"`                         // HTTPPort is the port monitoring connections are accepted on
1233
        HTTPBasePath          string                 `json:"http_base_path"`                    // HTTPBasePath is the path prefix for access to monitor endpoints
1234
        HTTPSPort             int                    `json:"https_port"`                        // HTTPSPort is the HTTPS port monitoring connections are accepted on
1235
        AuthTimeout           float64                `json:"auth_timeout"`                      // AuthTimeout is the amount of seconds connections have to complete authentication
1236
        MaxControlLine        int32                  `json:"max_control_line"`                  // MaxControlLine is the amount of bytes a signal control message may be
1237
        MaxPayload            int                    `json:"max_payload"`                       // MaxPayload is the maximum amount of bytes a message may have as payload
1238
        MaxPending            int64                  `json:"max_pending"`                       // MaxPending is the maximum amount of unprocessed bytes a connection may have
1239
        Cluster               ClusterOptsVarz        `json:"cluster,omitempty"`                 // Cluster is the Cluster state
1240
        Gateway               GatewayOptsVarz        `json:"gateway,omitempty"`                 // Gateway is the Super Cluster state
1241
        LeafNode              LeafNodeOptsVarz       `json:"leaf,omitempty"`                    // LeafNode is the Leafnode state
1242
        MQTT                  MQTTOptsVarz           `json:"mqtt,omitempty"`                    // MQTT is the MQTT state
1243
        Websocket             WebsocketOptsVarz      `json:"websocket,omitempty"`               // Websocket is the Websocket client state
1244
        JetStream             JetStreamVarz          `json:"jetstream,omitempty"`               // JetStream is the JetStream state
1245
        TLSTimeout            float64                `json:"tls_timeout"`                       // TLSTimeout is how long TLS operations have to complete
1246
        WriteDeadline         time.Duration          `json:"write_deadline"`                    // WriteDeadline is the maximum time writes to sockets have to complete
1247
        WriteTimeout          string                 `json:"write_timeout,omitempty"`           // WriteTimeout is the closure policy for write deadline errors
1248
        Start                 time.Time              `json:"start"`                             // Start is time when the server was started
1249
        Now                   time.Time              `json:"now"`                               // Now is the current time of the server
1250
        Uptime                string                 `json:"uptime"`                            // Uptime is how long the server has been running
1251
        Mem                   int64                  `json:"mem"`                               // Mem is the resident memory allocation
1252
        Cores                 int                    `json:"cores"`                             // Cores is the number of cores the process has access to
1253
        MaxProcs              int                    `json:"gomaxprocs"`                        // MaxProcs is the configured GOMAXPROCS value
1254
        MemLimit              int64                  `json:"gomemlimit,omitempty"`              // MemLimit is the configured GOMEMLIMIT value
1255
        CPU                   float64                `json:"cpu"`                               // CPU is the current total CPU usage
1256
        Connections           int                    `json:"connections"`                       // Connections is the current connected connections
1257
        TotalConnections      uint64                 `json:"total_connections"`                 // TotalConnections is the total connections the server has ever handled
1258
        Routes                int                    `json:"routes"`                            // Routes is the number of connected route servers
1259
        Remotes               int                    `json:"remotes"`                           // Remotes is the configured route remote endpoints
1260
        Leafs                 int                    `json:"leafnodes"`                         // Leafs is the number connected leafnode clients
1261
        InMsgs                int64                  `json:"in_msgs"`                           // InMsgs is the number of messages this server received
1262
        OutMsgs               int64                  `json:"out_msgs"`                          // OutMsgs is the number of messages this server sent
1263
        InBytes               int64                  `json:"in_bytes"`                          // InBytes is the number of bytes this server received
1264
        OutBytes              int64                  `json:"out_bytes"`                         // OutBytes is the number of bytes this server sent
1265
        SlowConsumers         int64                  `json:"slow_consumers"`                    // SlowConsumers is the total count of clients that were disconnected since start due to being slow consumers
1266
        StaleConnections      int64                  `json:"stale_connections"`                 // StaleConnections is the total count of stale connections that were detected
1267
        StalledClients        int64                  `json:"stalled_clients"`                   // StalledClients is the total number of times that clients have been stalled.
1268
        Subscriptions         uint32                 `json:"subscriptions"`                     // Subscriptions is the count of active subscriptions
1269
        HTTPReqStats          map[string]uint64      `json:"http_req_stats"`                    // HTTPReqStats is the number of requests each HTTP endpoint received
1270
        ConfigLoadTime        time.Time              `json:"config_load_time"`                  // ConfigLoadTime is the time the configuration was loaded or reloaded
1271
        ConfigDigest          string                 `json:"config_digest"`                     // ConfigDigest is a calculated hash of the current configuration
1272
        Tags                  jwt.TagList            `json:"tags,omitempty"`                    // Tags are the tags assigned to the server in configuration
1273
        Metadata              map[string]string      `json:"metadata,omitempty"`                // Metadata is the metadata assigned to the server in configuration
1274
        TrustedOperatorsJwt   []string               `json:"trusted_operators_jwt,omitempty"`   // TrustedOperatorsJwt is the JWTs for all trusted operators
1275
        TrustedOperatorsClaim []*jwt.OperatorClaims  `json:"trusted_operators_claim,omitempty"` // TrustedOperatorsClaim is the decoded claims for each trusted operator
1276
        SystemAccount         string                 `json:"system_account,omitempty"`          // SystemAccount is the name of the System account
1277
        PinnedAccountFail     uint64                 `json:"pinned_account_fails,omitempty"`    // PinnedAccountFail is how often user logon fails due to the issuer account not being pinned.
1278
        OCSPResponseCache     *OCSPResponseCacheVarz `json:"ocsp_peer_cache,omitempty"`         // OCSPResponseCache is the state of the OCSP cache
1279
        SlowConsumersStats    *SlowConsumersStats    `json:"slow_consumer_stats"`               // SlowConsumersStats are statistics about all detected Slow Consumers
1280
        StaleConnectionStats  *StaleConnectionStats  `json:"stale_connection_stats,omitempty"`  // StaleConnectionStats are statistics about all detected Stale Connections
1281
        Proxies               *ProxiesOptsVarz       `json:"proxies,omitempty"`                 // Proxies hold information about network proxy devices
1282
}
1283

1284
// JetStreamVarz contains basic runtime information about jetstream
1285
type JetStreamVarz struct {
1286
        Config *JetStreamConfig `json:"config,omitempty"` // Config is the active JetStream configuration
1287
        Stats  *JetStreamStats  `json:"stats,omitempty"`  // Stats is the statistics for the JetStream server
1288
        Meta   *MetaClusterInfo `json:"meta,omitempty"`   // Meta is information about the JetStream metalayer
1289
        Limits *JSLimitOpts     `json:"limits,omitempty"` // Limits are the configured JetStream limits
1290
}
1291

1292
// ClusterOptsVarz contains monitoring cluster information.
//
// Note that the JSON keys for Host and Port are "addr" and "cluster_port",
// not "host" and "port" as in the other *OptsVarz types.
type ClusterOptsVarz struct {
	Name          string        `json:"name,omitempty"`           // Name is the configured cluster name
	Host          string        `json:"addr,omitempty"`           // Host is the host the cluster listens on for connections
	Port          int           `json:"cluster_port,omitempty"`   // Port is the port the cluster listens on for connections
	AuthTimeout   float64       `json:"auth_timeout,omitempty"`   // AuthTimeout is the time cluster connections have to complete authentication
	URLs          []string      `json:"urls,omitempty"`           // URLs is the list of cluster URLs
	TLSTimeout    float64       `json:"tls_timeout,omitempty"`    // TLSTimeout is how long TLS operations have to complete
	TLSRequired   bool          `json:"tls_required,omitempty"`   // TLSRequired indicates if TLS is required for connections
	TLSVerify     bool          `json:"tls_verify,omitempty"`     // TLSVerify indicates if full verification of TLS connections is performed
	PoolSize      int           `json:"pool_size,omitempty"`      // PoolSize is the configured route connection pool size
	WriteDeadline time.Duration `json:"write_deadline,omitempty"` // WriteDeadline is the maximum time writes to sockets have to complete
	WriteTimeout  string        `json:"write_timeout,omitempty"`  // WriteTimeout is the closure policy for write deadline errors
}
1306

1307
// GatewayOptsVarz contains monitoring gateway information
1308
type GatewayOptsVarz struct {
1309
        Name           string                  `json:"name,omitempty"`            // Name is the configured cluster name
1310
        Host           string                  `json:"host,omitempty"`            // Host is the host the gateway listens on for connections
1311
        Port           int                     `json:"port,omitempty"`            // Port is the post gateway connections listens on
1312
        AuthTimeout    float64                 `json:"auth_timeout,omitempty"`    // AuthTimeout is the time cluster connections have to complete authentication
1313
        TLSTimeout     float64                 `json:"tls_timeout,omitempty"`     // TLSTimeout is how long TLS operations have to complete
1314
        TLSRequired    bool                    `json:"tls_required,omitempty"`    // TLSRequired indicates if TLS is required for connections
1315
        TLSVerify      bool                    `json:"tls_verify,omitempty"`      // TLSVerify indicates if full verification of TLS connections is performed
1316
        Advertise      string                  `json:"advertise,omitempty"`       // Advertise is the URL advertised to remote gateway clients
1317
        ConnectRetries int                     `json:"connect_retries,omitempty"` // ConnectRetries is how many connection attempts the route will make
1318
        Gateways       []RemoteGatewayOptsVarz `json:"gateways,omitempty"`        // Gateways is state of configured gateway remotes
1319
        RejectUnknown  bool                    `json:"reject_unknown,omitempty"`  // RejectUnknown indicates if unknown cluster connections will be rejected
1320
        WriteDeadline  time.Duration           `json:"write_deadline,omitempty"`  // WriteDeadline is the maximum time writes to sockets have to complete
1321
        WriteTimeout   string                  `json:"write_timeout,omitempty"`   // WriteTimeout is the closure policy for write deadline errors
1322
}
1323

1324
// RemoteGatewayOptsVarz contains monitoring remote gateway information.
// Name has no omitempty tag and is therefore always present in the JSON.
type RemoteGatewayOptsVarz struct {
	Name       string   `json:"name"`                  // Name is the name of the remote gateway
	TLSTimeout float64  `json:"tls_timeout,omitempty"` // TLSTimeout is how long TLS operations have to complete
	URLs       []string `json:"urls,omitempty"`        // URLs is the list of Gateway URLs
}
1330

1331
// LeafNodeOptsVarz contains monitoring leaf node information
1332
type LeafNodeOptsVarz struct {
1333
        Host              string               `json:"host,omitempty"`                 // Host is the host the server listens on
1334
        Port              int                  `json:"port,omitempty"`                 // Port is the port the server listens on
1335
        AuthTimeout       float64              `json:"auth_timeout,omitempty"`         // AuthTimeout is the time Leafnode connections have to complete authentication
1336
        TLSTimeout        float64              `json:"tls_timeout,omitempty"`          // TLSTimeout is how long TLS operations have to complete
1337
        TLSRequired       bool                 `json:"tls_required,omitempty"`         // TLSRequired indicates if TLS is required for connections
1338
        TLSVerify         bool                 `json:"tls_verify,omitempty"`           // TLSVerify indicates if full verification of TLS connections is performed
1339
        Remotes           []RemoteLeafOptsVarz `json:"remotes,omitempty"`              // Remotes is state of configured Leafnode remotes
1340
        TLSOCSPPeerVerify bool                 `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if OCSP verification will be performed
1341
        WriteDeadline     time.Duration        `json:"write_deadline,omitempty"`       // WriteDeadline is the maximum time writes to sockets have to complete
1342
        WriteTimeout      string               `json:"write_timeout,omitempty"`        // WriteTimeout is the closure policy for write deadline errors
1343
}
1344

1345
// DenyRules contains lists of subjects not allowed to be imported/exported.
type DenyRules struct {
	Exports []string `json:"exports,omitempty"` // Exports are denied exports
	Imports []string `json:"imports,omitempty"` // Imports are denied imports
}
1350

1351
// RemoteLeafOptsVarz contains monitoring remote leaf node information
1352
type RemoteLeafOptsVarz struct {
1353
        LocalAccount      string     `json:"local_account,omitempty"`        // LocalAccount is the local account this leaf is logged into
1354
        TLSTimeout        float64    `json:"tls_timeout,omitempty"`          // TLSTimeout is how long TLS operations have to complete
1355
        URLs              []string   `json:"urls,omitempty"`                 // URLs is the list of URLs for the remote Leafnode connection
1356
        Deny              *DenyRules `json:"deny,omitempty"`                 // Deny is the configured import and exports that the Leafnode may not access
1357
        TLSOCSPPeerVerify bool       `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if OCSP verification will be done
1358
}
1359

1360
// MQTTOptsVarz contains monitoring MQTT information.
type MQTTOptsVarz struct {
	Host              string        `json:"host,omitempty"`                 // Host is the host the server listens on
	Port              int           `json:"port,omitempty"`                 // Port is the port the server listens on
	NoAuthUser        string        `json:"no_auth_user,omitempty"`         // NoAuthUser is the user that will be used for unauthenticated connections
	AuthTimeout       float64       `json:"auth_timeout,omitempty"`         // AuthTimeout is how long authentication has to complete
	TLSMap            bool          `json:"tls_map,omitempty"`              // TLSMap indicates if TLS Mapping is enabled
	TLSTimeout        float64       `json:"tls_timeout,omitempty"`          // TLSTimeout is how long TLS operations have to complete
	TLSPinnedCerts    []string      `json:"tls_pinned_certs,omitempty"`     // TLSPinnedCerts is the list of certificates pinned to this connection
	JsDomain          string        `json:"js_domain,omitempty"`            // JsDomain is the JetStream domain used for MQTT state
	AckWait           time.Duration `json:"ack_wait,omitempty"`             // AckWait is how long the internal JetStream state store will allow acks to complete
	MaxAckPending     uint16        `json:"max_ack_pending,omitempty"`      // MaxAckPending is how many outstanding acks the internal JetStream state store will allow
	TLSOCSPPeerVerify bool          `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if OCSP verification will be done
}
1374

1375
// WebsocketOptsVarz contains monitoring websocket information.
type WebsocketOptsVarz struct {
	Host              string        `json:"host,omitempty"`                 // Host is the host the server listens on
	Port              int           `json:"port,omitempty"`                 // Port is the port the server listens on
	Advertise         string        `json:"advertise,omitempty"`            // Advertise is the connection URL the server advertises
	NoAuthUser        string        `json:"no_auth_user,omitempty"`         // NoAuthUser is the user that will be used for unauthenticated connections
	JWTCookie         string        `json:"jwt_cookie,omitempty"`           // JWTCookie is the name of a cookie the server will read for the connection JWT
	HandshakeTimeout  time.Duration `json:"handshake_timeout,omitempty"`    // HandshakeTimeout is how long the connection has to complete the websocket setup
	AuthTimeout       float64       `json:"auth_timeout,omitempty"`         // AuthTimeout is how long authentication has to complete
	NoTLS             bool          `json:"no_tls,omitempty"`               // NoTLS indicates if TLS is disabled
	TLSMap            bool          `json:"tls_map,omitempty"`              // TLSMap indicates if TLS Mapping is enabled
	TLSPinnedCerts    []string      `json:"tls_pinned_certs,omitempty"`     // TLSPinnedCerts is the list of certificates pinned to this connection
	SameOrigin        bool          `json:"same_origin,omitempty"`          // SameOrigin indicates if same origin connections are allowed
	AllowedOrigins    []string      `json:"allowed_origins,omitempty"`      // AllowedOrigins is the list of configured trusted origins
	Compression       bool          `json:"compression,omitempty"`          // Compression indicates if compression is supported
	TLSOCSPPeerVerify bool          `json:"tls_ocsp_peer_verify,omitempty"` // TLSOCSPPeerVerify indicates if OCSP verification will be done
}
1392

1393
// OCSPResponseCacheVarz contains OCSP response cache information.
// All fields are tagged omitempty, so zero counters are omitted from the JSON.
type OCSPResponseCacheVarz struct {
	Type      string `json:"cache_type,omitempty"`               // Type is the kind of cache being used
	Hits      int64  `json:"cache_hits,omitempty"`               // Hits is how many times the cache was able to answer a request
	Misses    int64  `json:"cache_misses,omitempty"`             // Misses is how many times the cache failed to answer a request
	Responses int64  `json:"cached_responses,omitempty"`         // Responses is how many responses are currently stored in the cache
	Revokes   int64  `json:"cached_revoked_responses,omitempty"` // Revokes is how many of the stored cache entries are revokes
	Goods     int64  `json:"cached_good_responses,omitempty"`    // Goods is how many of the stored cache entries are good responses
	Unknowns  int64  `json:"cached_unknown_responses,omitempty"` // Unknowns is how many of the stored cache entries are unknown responses
}
1403

1404
// ProxiesOptsVarz contains proxies information
1405
type ProxiesOptsVarz struct {
1406
        Trusted []*ProxyOptsVarz `json:"trusted,omitempty"` // Trusted holds a list of trusted proxies
1407
}
1408

1409
// ProxyOptsVarz contains proxy information.
type ProxyOptsVarz struct {
	Key string `json:"key"` // Key is the public key of the proxy
}
1413

1414
// VarzOptions are the options passed to Varz().
// Currently, there are no options defined.
type VarzOptions struct{}
1417

1418
// SlowConsumersStats contains information about the slow consumers from different types of connections.
// No field is tagged omitempty, so all four counters are always emitted.
type SlowConsumersStats struct {
	Clients  uint64 `json:"clients"`  // Clients is how many Clients were slow consumers
	Routes   uint64 `json:"routes"`   // Routes is how many Routes were slow consumers
	Gateways uint64 `json:"gateways"` // Gateways is how many Gateways were slow consumers
	Leafs    uint64 `json:"leafs"`    // Leafs is how many Leafnodes were slow consumers
}
1425

1426
// StaleConnectionStats contains information about the stale connections from different types of connections.
// No field is tagged omitempty, so all four counters are always emitted.
type StaleConnectionStats struct {
	Clients  uint64 `json:"clients"`  // Clients is how many Client connections became stale connections
	Routes   uint64 `json:"routes"`   // Routes is how many Route connections became stale connections
	Gateways uint64 `json:"gateways"` // Gateways is how many Gateway connections became stale connections
	Leafs    uint64 `json:"leafs"`    // Leafs is how many Leafnode connections became stale connections
}
1433

1434
// myUptime formats d as a compact uptime string made of years, days, hours,
// minutes and seconds, e.g. "22y32d4h4m22s". Leading units that are zero are
// dropped, so the shortest form is just seconds ("22s"). Sub-second precision
// is truncated and a year is counted as 365 days.
func myUptime(d time.Duration) string {
	// Just use total seconds for uptime, and display days / years.
	// Each total is a truncating integer division of the previous one.
	tsecs := d / time.Second
	tmins := tsecs / 60
	thrs := tmins / 60
	tdays := thrs / 24
	tyrs := tdays / 365

	// Pick the format from the largest non-zero unit; smaller units are
	// reduced modulo their parent so they show only the remainder.
	switch {
	case tyrs > 0:
		return fmt.Sprintf("%dy%dd%dh%dm%ds", tyrs, tdays%365, thrs%24, tmins%60, tsecs%60)
	case tdays > 0:
		return fmt.Sprintf("%dd%dh%dm%ds", tdays, thrs%24, tmins%60, tsecs%60)
	case thrs > 0:
		return fmt.Sprintf("%dh%dm%ds", thrs, tmins%60, tsecs%60)
	case tmins > 0:
		return fmt.Sprintf("%dm%ds", tmins, tsecs%60)
	}
	return fmt.Sprintf("%ds", tsecs)
}
1456

1457
// HandleRoot will show basic info and links to others handlers.
1458
func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) {
4✔
1459
        // This feels dumb to me, but is required: https://code.google.com/p/go/issues/detail?id=4799
4✔
1460
        if r.URL.Path != s.httpBasePath {
4✔
1461
                http.NotFound(w, r)
×
1462
                return
×
1463
        }
×
1464
        s.mu.Lock()
4✔
1465
        s.httpReqStats[RootPath]++
4✔
1466
        s.mu.Unlock()
4✔
1467

4✔
1468
        // Calculate source url. If git set go directly to that tag, otherwise just main.
4✔
1469
        var srcUrl string
4✔
1470
        if gitCommit == _EMPTY_ {
8✔
1471
                srcUrl = "https://github.com/nats-io/nats-server"
4✔
1472
        } else if serverVersion != _EMPTY_ {
4✔
1473
                srcUrl = fmt.Sprintf("https://github.com/nats-io/nats-server/tree/%s", serverVersion)
×
1474
        } else {
×
1475
                srcUrl = fmt.Sprintf("https://github.com/nats-io/nats-server/tree/%s", gitCommit)
×
1476
        }
×
1477

1478
        fmt.Fprintf(w, `<html lang="en">
4✔
1479
        <head>
4✔
1480
        <link rel="shortcut icon" href="https://nats.io/favicon.ico">
4✔
1481
        <style type="text/css">
4✔
1482
                body { font-family: ui-sans-serif,system-ui,-apple-system,BlinkMacSystemFont,"Segoe UI",Roboto,"Helvetica Neue",Arial,"Noto Sans",sans-serif; font-size: 18; font-weight: light-bold; margin-left: 32px }
4✔
1483
                a { display:block; margin-left: 7px; padding-bottom: 6px; color: rgb(72 72 92); text-decoration: none }
4✔
1484
                a:hover { font-weight: 600; color: rgb(59 50 202) }
4✔
1485
                a.help { display:inline; font-weight: 600; color: rgb(59 50 202); font-size: 20}
4✔
1486
                a.last { padding-bottom: 16px }
4✔
1487
                a.version { font-size: 14; font-weight: 400; width: 312px; text-align: right; margin-top: -2rem }
4✔
1488
                a.version:hover { color: rgb(22 22 32) }
4✔
1489
                .endpoint { font-size: 12px; color: #999; font-family: monospace; display: none }
4✔
1490
                a:hover .endpoint { display: inline }
4✔
1491

4✔
1492
        </style>
4✔
1493
        </head>
4✔
1494
        <body>
4✔
1495
   <svg xmlns="http://www.w3.org/2000/svg" role="img" width="325" height="110" viewBox="-4.14 -3.89 436.28 119.03"><style>.st1{fill:#fff}.st2{fill:#34a574}</style><path fill="#27aae1" d="M4.3 84.6h42.2L70.7 107V84.6H103v-80H4.3v80zm15.9-61.3h18.5l35.6 33.2V23.3h11.8v42.9H68.2L32 32.4v33.8H20.2V23.3z"/><path d="M32 32.4l36.2 33.8h17.9V23.3H74.3v33.2L38.7 23.3H20.2v42.9H32z" class="st1"/><path d="M159.8 30.7L147 49h25.6z" class="st2"/><path d="M111.3 84.6H210v-80h-98.7v80zm41-61.5H168l30.8 43.2h-14.1l-5.8-8.3h-38.1l-5.8 8.3h-13.5l30.8-43.2z" class="st2"/><path d="M140.8 57.9h38.1l5.8 8.3h14.1L168 23.1h-15.7l-30.8 43.2H135l5.8-8.4zm19-27.2L172.6 49H147l12.8-18.3z" class="st1"/><path fill="#375c93" d="M218.3 84.6H317v-80h-98.7v80zm15.5-61.3h66.7V33h-27.2v33.2h-12.2V33h-27.3v-9.7z"/><path d="M261.1 66.2h12.2V33h27.2v-9.7h-66.7V33h27.3z" class="st1"/><path fill="#8dc63f" d="M325.3 4.6v80H424v-80h-98.7zm76.5 56.7c-3.2 3.2-10.2 5.7-26.8 5.7-12.3 0-24.1-1.9-30.7-4.7v-10c6.3 2.8 20.1 5.5 30.7 5.5 9.3 0 15.8-.3 17.5-2.1.6-.6.7-1.3.7-2 0-.8-.2-1.3-.7-1.8-1-1-2.6-1.7-17.4-2.1-15.7-.4-23.4-2-27-5.6-1.7-1.7-2.6-4.4-2.6-7.5 0-3.3.6-6.2 3.3-8.9 3.6-3.6 10.7-5.3 25.1-5.3 10.8 0 21.6 1.7 27.3 4v10.1c-6.5-2.8-17.8-4.8-27.2-4.8-10.4 0-14.8.6-16.2 2-.5.5-.8 1.1-.8 1.9 0 .9.2 1.5.7 2 1.3 1.3 6.1 1.7 17.3 1.9 16.4.4 23.5 1.8 27 5.2 1.8 1.8 2.8 4.7 2.8 7.7.1 3.2-.6 6.4-3 8.8z"/><path d="M375.2 39.5c-11.2-.2-16-.6-17.3-1.9-.5-.5-.7-1.1-.7-2 0-.8.3-1.4.8-1.9 1.3-1.3 5.8-2 16.2-2 9.4 0 20.7 2 27.2 4.8v-10c-5.7-2.3-16.6-4-27.3-4-14.5 0-21.6 1.8-25.1 5.3-2.7 2.7-3.3 5.6-3.3 8.9 0 3.1 1 5.8 2.6 7.5 3.6 3.6 11.3 5.2 27 5.6 14.8.4 16.4 1.1 17.4 2.1.5.5.7 1 .7 1.8 0 .7-.1 1.3-.7 2-1.8 1.8-8.3 2.1-17.5 2.1-10.6 0-24.3-2.6-30.7-5.5v10.1c6.6 2.8 18.4 4.7 30.7 4.7 16.6 0 23.6-2.5 26.8-5.7 2.4-2.4 3.1-5.6 3.1-8.9 0-3.1-1-5.9-2.8-7.7-3.6-3.5-10.7-4.9-27.1-5.3z" class="st1"/></svg>
4✔
1496

4✔
1497
        <a href=%s class='version'>v%s</a>
4✔
1498

4✔
1499
        </div>
4✔
1500
        <br/>
4✔
1501
        <a href=.%s>General<span class="endpoint"> %s</span></a>
4✔
1502
        <a href=.%s>JetStream<span class="endpoint"> %s</span></a>
4✔
1503
        <a href=.%s>Connections<span class="endpoint"> %s</span></a>
4✔
1504
        <a href=.%s>Accounts<span class="endpoint"> %s</span></a>
4✔
1505
        <a href=.%s>Account Stats<span class="endpoint"> %s</span></a>
4✔
1506
        <a href=.%s>Subscriptions<span class="endpoint"> %s</span></a>
4✔
1507
        <a href=.%s>Routes<span class="endpoint"> %s</span></a>
4✔
1508
        <a href=.%s>LeafNodes<span class="endpoint"> %s</span></a>
4✔
1509
        <a href=.%s>Gateways<span class="endpoint"> %s</span></a>
4✔
1510
        <a href=.%s>Raft Groups<span class="endpoint"> %s</span></a>
4✔
1511
        <a href=.%s>Health Probe<span class="endpoint"> %s</span></a>
4✔
1512
        <a href=.%s class=last>Expvar<span class="endpoint"> %s</span></a>
4✔
1513
    <a href=https://docs.nats.io/running-a-nats-service/nats_admin/monitoring class="help">Help</a>
4✔
1514
  </body>
4✔
1515
</html>`,
4✔
1516
                srcUrl,
4✔
1517
                VERSION,
4✔
1518
                s.basePath(VarzPath), VarzPath,
4✔
1519
                s.basePath(JszPath), JszPath,
4✔
1520
                s.basePath(ConnzPath), ConnzPath,
4✔
1521
                s.basePath(AccountzPath), AccountzPath,
4✔
1522
                s.basePath(AccountStatzPath), AccountStatzPath,
4✔
1523
                s.basePath(SubszPath), SubszPath,
4✔
1524
                s.basePath(RoutezPath), RoutezPath,
4✔
1525
                s.basePath(LeafzPath), LeafzPath,
4✔
1526
                s.basePath(GatewayzPath), GatewayzPath,
4✔
1527
                s.basePath(RaftzPath), RaftzPath,
4✔
1528
                s.basePath(HealthzPath), HealthzPath,
4✔
1529
                s.basePath(ExpvarzPath), ExpvarzPath,
4✔
1530
        )
4✔
1531
}
1532

1533
func (s *Server) updateJszVarz(js *jetStream, v *JetStreamVarz, doConfig bool) {
45✔
1534
        if doConfig {
67✔
1535
                js.mu.RLock()
22✔
1536
                // We want to snapshot the config since it will then be available outside
22✔
1537
                // of the js lock. So make a copy first, then point to this copy.
22✔
1538
                cfg := js.config
22✔
1539
                v.Config = &cfg
22✔
1540
                js.mu.RUnlock()
22✔
1541
        }
22✔
1542
        v.Stats = js.usageStats()
45✔
1543
        v.Limits = &s.getOpts().JetStreamLimits
45✔
1544
        if mg := js.getMetaGroup(); mg != nil {
58✔
1545
                if ci := s.raftNodeToClusterInfo(mg); ci != nil {
26✔
1546
                        v.Meta = &MetaClusterInfo{Name: ci.Name, Leader: ci.Leader, Peer: getHash(ci.Leader), Size: mg.ClusterSize()}
13✔
1547
                        if ci.Leader == s.info.Name {
25✔
1548
                                v.Meta.Replicas = ci.Replicas
12✔
1549
                        }
12✔
1550
                        if ipq := s.jsAPIRoutedReqs; ipq != nil {
26✔
1551
                                v.Meta.Pending = ipq.len()
13✔
1552
                        }
13✔
1553
                }
1554
        }
1555
}
1556

1557
// Varz returns a Varz struct containing the server information.
1558
func (s *Server) Varz(varzOpts *VarzOptions) (*Varz, error) {
5,362✔
1559
        var rss, vss int64
5,362✔
1560
        var pcpu float64
5,362✔
1561

5,362✔
1562
        // We want to do that outside of the lock.
5,362✔
1563
        pse.ProcUsage(&pcpu, &rss, &vss)
5,362✔
1564

5,362✔
1565
        s.mu.RLock()
5,362✔
1566
        // We need to create a new instance of Varz (with no reference
5,362✔
1567
        // whatsoever to anything stored in the server) since the user
5,362✔
1568
        // has access to the returned value.
5,362✔
1569
        v := s.createVarz(pcpu, rss)
5,362✔
1570
        s.mu.RUnlock()
5,362✔
1571

5,362✔
1572
        if js := s.getJetStream(); js != nil {
5,378✔
1573
                s.updateJszVarz(js, &v.JetStream, true)
16✔
1574
        }
16✔
1575

1576
        return v, nil
5,362✔
1577
}
1578

1579
// Returns a Varz instance.
1580
// Server lock is held on entry.
1581
func (s *Server) createVarz(pcpu float64, rss int64) *Varz {
5,408✔
1582
        info := s.info
5,408✔
1583
        opts := s.getOpts()
5,408✔
1584
        c := &opts.Cluster
5,408✔
1585
        gw := &opts.Gateway
5,408✔
1586
        ln := &opts.LeafNode
5,408✔
1587
        mqtt := &opts.MQTT
5,408✔
1588
        ws := &opts.Websocket
5,408✔
1589
        clustTlsReq := c.TLSConfig != nil
5,408✔
1590
        gatewayTlsReq := gw.TLSConfig != nil
5,408✔
1591
        leafTlsReq := ln.TLSConfig != nil
5,408✔
1592
        leafTlsVerify := leafTlsReq && ln.TLSConfig.ClientAuth == tls.RequireAndVerifyClientCert
5,408✔
1593
        leafTlsOCSPPeerVerify := s.ocspPeerVerify && leafTlsReq && ln.tlsConfigOpts.OCSPPeerConfig != nil && ln.tlsConfigOpts.OCSPPeerConfig.Verify
5,408✔
1594
        mqttTlsOCSPPeerVerify := s.ocspPeerVerify && mqtt.TLSConfig != nil && mqtt.tlsConfigOpts.OCSPPeerConfig != nil && mqtt.tlsConfigOpts.OCSPPeerConfig.Verify
5,408✔
1595
        wsTlsOCSPPeerVerify := s.ocspPeerVerify && ws.TLSConfig != nil && ws.tlsConfigOpts.OCSPPeerConfig != nil && ws.tlsConfigOpts.OCSPPeerConfig.Verify
5,408✔
1596
        varz := &Varz{
5,408✔
1597
                ID:           info.ID,
5,408✔
1598
                Version:      info.Version,
5,408✔
1599
                Proto:        info.Proto,
5,408✔
1600
                GitCommit:    info.GitCommit,
5,408✔
1601
                GoVersion:    info.GoVersion,
5,408✔
1602
                Name:         info.Name,
5,408✔
1603
                Host:         info.Host,
5,408✔
1604
                Port:         info.Port,
5,408✔
1605
                IP:           info.IP,
5,408✔
1606
                HTTPHost:     opts.HTTPHost,
5,408✔
1607
                HTTPPort:     opts.HTTPPort,
5,408✔
1608
                HTTPBasePath: opts.HTTPBasePath,
5,408✔
1609
                HTTPSPort:    opts.HTTPSPort,
5,408✔
1610
                Cluster: ClusterOptsVarz{
5,408✔
1611
                        Name:          info.Cluster,
5,408✔
1612
                        Host:          c.Host,
5,408✔
1613
                        Port:          c.Port,
5,408✔
1614
                        AuthTimeout:   c.AuthTimeout,
5,408✔
1615
                        TLSTimeout:    c.TLSTimeout,
5,408✔
1616
                        TLSRequired:   clustTlsReq,
5,408✔
1617
                        TLSVerify:     clustTlsReq,
5,408✔
1618
                        PoolSize:      opts.Cluster.PoolSize,
5,408✔
1619
                        WriteDeadline: opts.Cluster.WriteDeadline,
5,408✔
1620
                        WriteTimeout:  opts.Cluster.WriteTimeout.String(),
5,408✔
1621
                },
5,408✔
1622
                Gateway: GatewayOptsVarz{
5,408✔
1623
                        Name:           gw.Name,
5,408✔
1624
                        Host:           gw.Host,
5,408✔
1625
                        Port:           gw.Port,
5,408✔
1626
                        AuthTimeout:    gw.AuthTimeout,
5,408✔
1627
                        TLSTimeout:     gw.TLSTimeout,
5,408✔
1628
                        TLSRequired:    gatewayTlsReq,
5,408✔
1629
                        TLSVerify:      gatewayTlsReq,
5,408✔
1630
                        Advertise:      gw.Advertise,
5,408✔
1631
                        ConnectRetries: gw.ConnectRetries,
5,408✔
1632
                        Gateways:       []RemoteGatewayOptsVarz{},
5,408✔
1633
                        RejectUnknown:  gw.RejectUnknown,
5,408✔
1634
                        WriteDeadline:  opts.Cluster.WriteDeadline,
5,408✔
1635
                        WriteTimeout:   opts.Cluster.WriteTimeout.String(),
5,408✔
1636
                },
5,408✔
1637
                LeafNode: LeafNodeOptsVarz{
5,408✔
1638
                        Host:              ln.Host,
5,408✔
1639
                        Port:              ln.Port,
5,408✔
1640
                        AuthTimeout:       ln.AuthTimeout,
5,408✔
1641
                        TLSTimeout:        ln.TLSTimeout,
5,408✔
1642
                        TLSRequired:       leafTlsReq,
5,408✔
1643
                        TLSVerify:         leafTlsVerify,
5,408✔
1644
                        TLSOCSPPeerVerify: leafTlsOCSPPeerVerify,
5,408✔
1645
                        Remotes:           []RemoteLeafOptsVarz{},
5,408✔
1646
                        WriteDeadline:     opts.Cluster.WriteDeadline,
5,408✔
1647
                        WriteTimeout:      opts.Cluster.WriteTimeout.String(),
5,408✔
1648
                },
5,408✔
1649
                MQTT: MQTTOptsVarz{
5,408✔
1650
                        Host:              mqtt.Host,
5,408✔
1651
                        Port:              mqtt.Port,
5,408✔
1652
                        NoAuthUser:        mqtt.NoAuthUser,
5,408✔
1653
                        AuthTimeout:       mqtt.AuthTimeout,
5,408✔
1654
                        TLSMap:            mqtt.TLSMap,
5,408✔
1655
                        TLSTimeout:        mqtt.TLSTimeout,
5,408✔
1656
                        JsDomain:          mqtt.JsDomain,
5,408✔
1657
                        AckWait:           mqtt.AckWait,
5,408✔
1658
                        MaxAckPending:     mqtt.MaxAckPending,
5,408✔
1659
                        TLSOCSPPeerVerify: mqttTlsOCSPPeerVerify,
5,408✔
1660
                },
5,408✔
1661
                Websocket: WebsocketOptsVarz{
5,408✔
1662
                        Host:              ws.Host,
5,408✔
1663
                        Port:              ws.Port,
5,408✔
1664
                        Advertise:         ws.Advertise,
5,408✔
1665
                        NoAuthUser:        ws.NoAuthUser,
5,408✔
1666
                        JWTCookie:         ws.JWTCookie,
5,408✔
1667
                        AuthTimeout:       ws.AuthTimeout,
5,408✔
1668
                        NoTLS:             ws.NoTLS,
5,408✔
1669
                        TLSMap:            ws.TLSMap,
5,408✔
1670
                        SameOrigin:        ws.SameOrigin,
5,408✔
1671
                        AllowedOrigins:    copyStrings(ws.AllowedOrigins),
5,408✔
1672
                        Compression:       ws.Compression,
5,408✔
1673
                        HandshakeTimeout:  ws.HandshakeTimeout,
5,408✔
1674
                        TLSOCSPPeerVerify: wsTlsOCSPPeerVerify,
5,408✔
1675
                },
5,408✔
1676
                Start:                 s.start.UTC(),
5,408✔
1677
                MaxSubs:               opts.MaxSubs,
5,408✔
1678
                Cores:                 runtime.NumCPU(),
5,408✔
1679
                MaxProcs:              runtime.GOMAXPROCS(0),
5,408✔
1680
                TrustedOperatorsJwt:   opts.operatorJWT,
5,408✔
1681
                TrustedOperatorsClaim: opts.TrustedOperators,
5,408✔
1682
        }
5,408✔
1683
        if mm := debug.SetMemoryLimit(-1); mm < math.MaxInt64 {
5,408✔
1684
                varz.MemLimit = mm
×
1685
        }
×
1686
        // If this is a leaf without cluster, reset the cluster name (that is otherwise
1687
        // set to the server name).
1688
        if s.leafNoCluster {
5,414✔
1689
                varz.Cluster.Name = _EMPTY_
6✔
1690
        }
6✔
1691
        if len(opts.Routes) > 0 {
5,452✔
1692
                varz.Cluster.URLs = urlsToStrings(opts.Routes)
44✔
1693
        }
44✔
1694
        if l := len(gw.Gateways); l > 0 {
5,421✔
1695
                rgwa := make([]RemoteGatewayOptsVarz, l)
13✔
1696
                for i, r := range gw.Gateways {
26✔
1697
                        rgwa[i] = RemoteGatewayOptsVarz{
13✔
1698
                                Name:       r.Name,
13✔
1699
                                TLSTimeout: r.TLSTimeout,
13✔
1700
                        }
13✔
1701
                }
13✔
1702
                varz.Gateway.Gateways = rgwa
13✔
1703
        }
1704
        if l := len(ln.Remotes); l > 0 {
5,414✔
1705
                rlna := make([]RemoteLeafOptsVarz, l)
6✔
1706
                for i, r := range ln.Remotes {
15✔
1707
                        var deny *DenyRules
9✔
1708
                        if len(r.DenyImports) > 0 || len(r.DenyExports) > 0 {
9✔
1709
                                deny = &DenyRules{
×
1710
                                        Imports: r.DenyImports,
×
1711
                                        Exports: r.DenyExports,
×
1712
                                }
×
1713
                        }
×
1714
                        remoteTlsOCSPPeerVerify := s.ocspPeerVerify && r.tlsConfigOpts != nil && r.tlsConfigOpts.OCSPPeerConfig != nil && r.tlsConfigOpts.OCSPPeerConfig.Verify
9✔
1715

9✔
1716
                        rlna[i] = RemoteLeafOptsVarz{
9✔
1717
                                LocalAccount:      r.LocalAccount,
9✔
1718
                                URLs:              urlsToStrings(r.URLs),
9✔
1719
                                TLSTimeout:        r.TLSTimeout,
9✔
1720
                                Deny:              deny,
9✔
1721
                                TLSOCSPPeerVerify: remoteTlsOCSPPeerVerify,
9✔
1722
                        }
9✔
1723
                }
1724
                varz.LeafNode.Remotes = rlna
6✔
1725
        }
1726

1727
        // Finish setting it up with fields that can be updated during
1728
        // configuration reload and runtime.
1729
        s.updateVarzConfigReloadableFields(varz)
5,408✔
1730
        s.updateVarzRuntimeFields(varz, true, pcpu, rss)
5,408✔
1731
        return varz
5,408✔
1732
}
1733

1734
// urlsToStrings returns the Host component of every URL in urls, in the
// same order. The returned slice is never nil and always has len(urls)
// entries.
func urlsToStrings(urls []*url.URL) []string {
	hosts := make([]string, 0, len(urls))
	for _, u := range urls {
		hosts = append(hosts, u.Host)
	}
	return hosts
}
1741

1742
// Invoked during configuration reload once options have possibly be changed
1743
// and config load time has been set. If s.varz has not been initialized yet
1744
// (because no pooling of /varz has been made), this function does nothing.
1745
// Server lock is held on entry.
1746
func (s *Server) updateVarzConfigReloadableFields(v *Varz) {
6,751✔
1747
        if v == nil {
8,091✔
1748
                return
1,340✔
1749
        }
1,340✔
1750
        opts := s.getOpts()
5,411✔
1751
        info := &s.info
5,411✔
1752
        v.AuthRequired = info.AuthRequired
5,411✔
1753
        v.TLSRequired = info.TLSRequired
5,411✔
1754
        v.TLSVerify = info.TLSVerify
5,411✔
1755
        v.MaxConn = opts.MaxConn
5,411✔
1756
        v.PingInterval = opts.PingInterval
5,411✔
1757
        v.MaxPingsOut = opts.MaxPingsOut
5,411✔
1758
        v.AuthTimeout = opts.AuthTimeout
5,411✔
1759
        v.MaxControlLine = opts.MaxControlLine
5,411✔
1760
        v.MaxPayload = int(opts.MaxPayload)
5,411✔
1761
        v.MaxPending = opts.MaxPending
5,411✔
1762
        v.TLSTimeout = opts.TLSTimeout
5,411✔
1763
        v.WriteDeadline = opts.WriteDeadline
5,411✔
1764
        v.WriteTimeout = opts.WriteTimeout.String()
5,411✔
1765
        v.ConfigLoadTime = s.configTime.UTC()
5,411✔
1766
        v.ConfigDigest = opts.configDigest
5,411✔
1767
        v.Tags = opts.Tags
5,411✔
1768
        v.Metadata = opts.Metadata
5,411✔
1769
        // Update route URLs if applicable
5,411✔
1770
        if s.varzUpdateRouteURLs {
5,413✔
1771
                v.Cluster.URLs = urlsToStrings(opts.Routes)
2✔
1772
                s.varzUpdateRouteURLs = false
2✔
1773
        }
2✔
1774
        if s.sys != nil && s.sys.account != nil {
5,505✔
1775
                v.SystemAccount = s.sys.account.GetName()
94✔
1776
        }
94✔
1777
        v.MQTT.TLSPinnedCerts = getPinnedCertsAsSlice(opts.MQTT.TLSPinnedCerts)
5,411✔
1778
        v.Websocket.TLSPinnedCerts = getPinnedCertsAsSlice(opts.Websocket.TLSPinnedCerts)
5,411✔
1779

5,411✔
1780
        v.TLSOCSPPeerVerify = s.ocspPeerVerify && v.TLSRequired && s.opts.tlsConfigOpts != nil && s.opts.tlsConfigOpts.OCSPPeerConfig != nil && s.opts.tlsConfigOpts.OCSPPeerConfig.Verify
5,411✔
1781

5,411✔
1782
        if opts.Proxies != nil {
5,413✔
1783
                if v.Proxies == nil {
4✔
1784
                        v.Proxies = &ProxiesOptsVarz{}
2✔
1785
                }
2✔
1786
                trusted := make([]*ProxyOptsVarz, 0, len(opts.Proxies.Trusted))
2✔
1787
                for _, t := range opts.Proxies.Trusted {
6✔
1788
                        trusted = append(trusted, &ProxyOptsVarz{Key: t.Key})
4✔
1789
                }
4✔
1790
                v.Proxies.Trusted = trusted
2✔
1791
        } else {
5,409✔
1792
                v.Proxies = nil
5,409✔
1793
        }
5,409✔
1794
}
1795

1796
func getPinnedCertsAsSlice(certs PinnedCertSet) []string {
10,822✔
1797
        if len(certs) == 0 {
21,640✔
1798
                return nil
10,818✔
1799
        }
10,818✔
1800
        res := make([]string, 0, len(certs))
4✔
1801
        for cn := range certs {
8✔
1802
                res = append(res, cn)
4✔
1803
        }
4✔
1804
        return res
4✔
1805
}
1806

1807
// Updates the runtime Varz fields, that is, fields that change during
1808
// runtime and that should be updated any time Varz() or polling of /varz
1809
// is done.
1810
// Server lock is held on entry.
1811
func (s *Server) updateVarzRuntimeFields(v *Varz, forceUpdate bool, pcpu float64, rss int64) {
10,902✔
1812
        v.Now = time.Now().UTC()
10,902✔
1813
        v.Uptime = myUptime(time.Since(s.start))
10,902✔
1814
        v.Mem = rss
10,902✔
1815
        v.CPU = pcpu
10,902✔
1816
        if l := len(s.info.ClientConnectURLs); l > 0 {
10,987✔
1817
                v.ClientConnectURLs = append([]string(nil), s.info.ClientConnectURLs...)
85✔
1818
        }
85✔
1819
        if l := len(s.info.WSConnectURLs); l > 0 {
10,902✔
1820
                v.WSConnectURLs = append([]string(nil), s.info.WSConnectURLs...)
×
1821
        }
×
1822
        v.Connections = len(s.clients)
10,902✔
1823
        v.TotalConnections = s.totalClients
10,902✔
1824
        v.Routes = s.numRoutes()
10,902✔
1825
        v.Remotes = s.numRemotes()
10,902✔
1826
        v.Leafs = len(s.leafs)
10,902✔
1827
        v.InMsgs = atomic.LoadInt64(&s.inMsgs)
10,902✔
1828
        v.InBytes = atomic.LoadInt64(&s.inBytes)
10,902✔
1829
        v.OutMsgs = atomic.LoadInt64(&s.outMsgs)
10,902✔
1830
        v.OutBytes = atomic.LoadInt64(&s.outBytes)
10,902✔
1831
        v.SlowConsumers = atomic.LoadInt64(&s.slowConsumers)
10,902✔
1832
        v.StalledClients = atomic.LoadInt64(&s.stalls)
10,902✔
1833
        v.SlowConsumersStats = &SlowConsumersStats{
10,902✔
1834
                Clients:  s.NumSlowConsumersClients(),
10,902✔
1835
                Routes:   s.NumSlowConsumersRoutes(),
10,902✔
1836
                Gateways: s.NumSlowConsumersGateways(),
10,902✔
1837
                Leafs:    s.NumSlowConsumersLeafs(),
10,902✔
1838
        }
10,902✔
1839
        v.StaleConnections = atomic.LoadInt64(&s.staleConnections)
10,902✔
1840
        v.StaleConnectionStats = &StaleConnectionStats{
10,902✔
1841
                Clients:  s.NumStaleConnectionsClients(),
10,902✔
1842
                Routes:   s.NumStaleConnectionsRoutes(),
10,902✔
1843
                Gateways: s.NumStaleConnectionsGateways(),
10,902✔
1844
                Leafs:    s.NumStaleConnectionsLeafs(),
10,902✔
1845
        }
10,902✔
1846
        v.PinnedAccountFail = atomic.LoadUint64(&s.pinnedAccFail)
10,902✔
1847

10,902✔
1848
        // Make sure to reset in case we are re-using.
10,902✔
1849
        v.Subscriptions = 0
10,902✔
1850
        s.accounts.Range(func(k, val any) bool {
21,970✔
1851
                acc := val.(*Account)
11,068✔
1852
                v.Subscriptions += acc.sl.Count()
11,068✔
1853
                return true
11,068✔
1854
        })
11,068✔
1855

1856
        v.HTTPReqStats = make(map[string]uint64, len(s.httpReqStats))
10,902✔
1857
        for key, val := range s.httpReqStats {
22,176✔
1858
                v.HTTPReqStats[key] = val
11,274✔
1859
        }
11,274✔
1860

1861
        // Update Gateway remote urls if applicable
1862
        gw := s.gateway
10,902✔
1863
        gw.RLock()
10,902✔
1864
        if gw.enabled {
10,926✔
1865
                for i := 0; i < len(v.Gateway.Gateways); i++ {
40✔
1866
                        g := &v.Gateway.Gateways[i]
16✔
1867
                        rgw := gw.remotes[g.Name]
16✔
1868
                        if rgw != nil {
30✔
1869
                                rgw.RLock()
14✔
1870
                                // forceUpdate is needed if user calls Varz() programmatically,
14✔
1871
                                // since we need to create a new instance every time and the
14✔
1872
                                // gateway's varzUpdateURLs may have been set to false after
14✔
1873
                                // a web /varz inspection.
14✔
1874
                                if forceUpdate || rgw.varzUpdateURLs {
27✔
1875
                                        // Make reuse of backend array
13✔
1876
                                        g.URLs = g.URLs[:0]
13✔
1877
                                        // rgw.urls is a map[string]*url.URL where the key is
13✔
1878
                                        // already in the right format (host:port, without any
13✔
1879
                                        // user info present).
13✔
1880
                                        for u := range rgw.urls {
31✔
1881
                                                g.URLs = append(g.URLs, u)
18✔
1882
                                        }
18✔
1883
                                        rgw.varzUpdateURLs = false
13✔
1884
                                }
1885
                                rgw.RUnlock()
14✔
1886
                        } else if g.Name == gw.name && len(gw.ownCfgURLs) > 0 {
4✔
1887
                                // This is a remote that correspond to this very same server.
2✔
1888
                                // We report the URLs that were configured (if any).
2✔
1889
                                // Since we don't support changes to the gateway configuration
2✔
1890
                                // at this time, we could do this only if g.URLs has not been already
2✔
1891
                                // set, but let's do it regardless in case we add support for
2✔
1892
                                // gateway config reload.
2✔
1893
                                g.URLs = g.URLs[:0]
2✔
1894
                                g.URLs = append(g.URLs, gw.ownCfgURLs...)
2✔
1895
                        }
2✔
1896
                }
1897
        }
1898
        gw.RUnlock()
10,902✔
1899

10,902✔
1900
        if s.ocsprc != nil && s.ocsprc.Type() != "none" {
10,917✔
1901
                stats := s.ocsprc.Stats()
15✔
1902
                if stats != nil {
30✔
1903
                        v.OCSPResponseCache = &OCSPResponseCacheVarz{
15✔
1904
                                s.ocsprc.Type(),
15✔
1905
                                stats.Hits,
15✔
1906
                                stats.Misses,
15✔
1907
                                stats.Responses,
15✔
1908
                                stats.Revokes,
15✔
1909
                                stats.Goods,
15✔
1910
                                stats.Unknowns,
15✔
1911
                        }
15✔
1912
                }
15✔
1913
        }
1914
}
1915

1916
// HandleVarz will process HTTP requests for server information.
1917
func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) {
5,540✔
1918
        var rss, vss int64
5,540✔
1919
        var pcpu float64
5,540✔
1920

5,540✔
1921
        // We want to do that outside of the lock.
5,540✔
1922
        pse.ProcUsage(&pcpu, &rss, &vss)
5,540✔
1923

5,540✔
1924
        // In response to http requests, we want to minimize mem copies
5,540✔
1925
        // so we use an object stored in the server. Creating/collecting
5,540✔
1926
        // server metrics is done under server lock, but we don't want
5,540✔
1927
        // to marshal under that lock. Still, we need to prevent concurrent
5,540✔
1928
        // http requests to /varz to update s.varz while marshal is
5,540✔
1929
        // happening, so we need a new lock that serialize those http
5,540✔
1930
        // requests and include marshaling.
5,540✔
1931
        s.varzMu.Lock()
5,540✔
1932

5,540✔
1933
        // Use server lock to create/update the server's varz object.
5,540✔
1934
        s.mu.Lock()
5,540✔
1935
        var created bool
5,540✔
1936
        s.httpReqStats[VarzPath]++
5,540✔
1937
        if s.varz == nil {
5,586✔
1938
                s.varz = s.createVarz(pcpu, rss)
46✔
1939
                created = true
46✔
1940
        } else {
5,540✔
1941
                s.updateVarzRuntimeFields(s.varz, false, pcpu, rss)
5,494✔
1942
        }
5,494✔
1943
        s.mu.Unlock()
5,540✔
1944
        // Since locking is jetStream -> Server, need to update jetstream
5,540✔
1945
        // varz outside of server lock.
5,540✔
1946

5,540✔
1947
        if js := s.getJetStream(); js != nil {
5,569✔
1948
                var v JetStreamVarz
29✔
1949
                // Work on stack variable
29✔
1950
                s.updateJszVarz(js, &v, created)
29✔
1951
                // Now update server's varz
29✔
1952
                s.mu.RLock()
29✔
1953
                sv := &s.varz.JetStream
29✔
1954
                if created {
35✔
1955
                        sv.Config = v.Config
6✔
1956
                }
6✔
1957
                sv.Stats = v.Stats
29✔
1958
                sv.Meta = v.Meta
29✔
1959
                sv.Limits = v.Limits
29✔
1960
                s.mu.RUnlock()
29✔
1961
        }
1962

1963
        // Do the marshaling outside of server lock, but under varzMu lock.
1964
        b, err := json.MarshalIndent(s.varz, "", "  ")
5,540✔
1965
        s.varzMu.Unlock()
5,540✔
1966

5,540✔
1967
        if err != nil {
5,540✔
1968
                s.Errorf("Error marshaling response to /varz request: %v", err)
×
1969
        }
×
1970

1971
        // Handle response
1972
        ResponseHandler(w, r, b)
5,540✔
1973
}
1974

1975
// GatewayzOptions are the options passed to Gatewayz().
type GatewayzOptions struct {
	// Name restricts the output to the remote gateways with this name.
	Name string `json:"name"`

	// Accounts requests that accounts, with their interest, be included in
	// the results.
	Accounts bool `json:"accounts"`

	// AccountName limits the list of accounts to that single account name.
	// Setting it makes Accounts implicit.
	AccountName string `json:"account_name"`

	// AccountSubscriptions requests that subscriptions be included in the
	// results.
	// Note: This is used only if `Accounts` or `AccountName` are specified.
	AccountSubscriptions bool `json:"subscriptions"`

	// AccountSubscriptionsDetail requests that subscription details be
	// included in the results.
	// Note: This is used only if `Accounts` or `AccountName` are specified.
	AccountSubscriptionsDetail bool `json:"subscriptions_detail"`
}
1994

1995
// Gatewayz represents detailed information on Gateways
1996
type Gatewayz struct {
1997
        ID               string                       `json:"server_id"`
1998
        Now              time.Time                    `json:"now"`
1999
        Name             string                       `json:"name,omitempty"`
2000
        Host             string                       `json:"host,omitempty"`
2001
        Port             int                          `json:"port,omitempty"`
2002
        OutboundGateways map[string]*RemoteGatewayz   `json:"outbound_gateways"`
2003
        InboundGateways  map[string][]*RemoteGatewayz `json:"inbound_gateways"`
2004
}
2005

2006
// RemoteGatewayz represents information about an outbound connection to a gateway
2007
type RemoteGatewayz struct {
2008
        IsConfigured bool               `json:"configured"`
2009
        Connection   *ConnInfo          `json:"connection,omitempty"`
2010
        Accounts     []*AccountGatewayz `json:"accounts,omitempty"`
2011
}
2012

2013
// AccountGatewayz represents interest mode for this account
2014
type AccountGatewayz struct {
2015
        Name                  string      `json:"name"`
2016
        InterestMode          string      `json:"interest_mode"`
2017
        NoInterestCount       int         `json:"no_interest_count,omitempty"`
2018
        InterestOnlyThreshold int         `json:"interest_only_threshold,omitempty"`
2019
        TotalSubscriptions    int         `json:"num_subs,omitempty"`
2020
        NumQueueSubscriptions int         `json:"num_queue_subs,omitempty"`
2021
        Subs                  []string    `json:"subscriptions_list,omitempty"`
2022
        SubsDetail            []SubDetail `json:"subscriptions_list_detail,omitempty"`
2023
}
2024

2025
// Gatewayz returns a Gatewayz struct containing information about gateways.
2026
func (s *Server) Gatewayz(opts *GatewayzOptions) (*Gatewayz, error) {
45✔
2027
        srvID := s.ID()
45✔
2028
        now := time.Now().UTC()
45✔
2029
        gw := s.gateway
45✔
2030
        gw.RLock()
45✔
2031
        if !gw.enabled || gw.info == nil {
48✔
2032
                gw.RUnlock()
3✔
2033
                gwz := &Gatewayz{
3✔
2034
                        ID:               srvID,
3✔
2035
                        Now:              now,
3✔
2036
                        OutboundGateways: map[string]*RemoteGatewayz{},
3✔
2037
                        InboundGateways:  map[string][]*RemoteGatewayz{},
3✔
2038
                }
3✔
2039
                return gwz, nil
3✔
2040
        }
3✔
2041
        // Here gateways are enabled, so fill up more.
2042
        gwz := &Gatewayz{
42✔
2043
                ID:   srvID,
42✔
2044
                Now:  now,
42✔
2045
                Name: gw.name,
42✔
2046
                Host: gw.info.Host,
42✔
2047
                Port: gw.info.Port,
42✔
2048
        }
42✔
2049
        gw.RUnlock()
42✔
2050

42✔
2051
        gwz.OutboundGateways = s.createOutboundsRemoteGatewayz(opts, now)
42✔
2052
        gwz.InboundGateways = s.createInboundsRemoteGatewayz(opts, now)
42✔
2053

42✔
2054
        return gwz, nil
42✔
2055
}
2056

2057
// Based on give options struct, returns if there is a filtered
2058
// Gateway Name and if we should do report Accounts.
2059
// Note that if Accounts is false but AccountName is not empty,
2060
// then Accounts is implicitly set to true.
2061
func getMonitorGWOptions(opts *GatewayzOptions) (string, bool) {
84✔
2062
        var name string
84✔
2063
        var accs bool
84✔
2064
        if opts != nil {
156✔
2065
                if opts.Name != _EMPTY_ {
88✔
2066
                        name = opts.Name
16✔
2067
                }
16✔
2068
                accs = opts.Accounts
72✔
2069
                if !accs && opts.AccountName != _EMPTY_ {
80✔
2070
                        accs = true
8✔
2071
                }
8✔
2072
        }
2073
        return name, accs
84✔
2074
}
2075

2076
// Returns a map of gateways outbound connections.
2077
// Based on options, will include a single or all gateways,
2078
// with no/single/or all accounts interest information.
2079
func (s *Server) createOutboundsRemoteGatewayz(opts *GatewayzOptions, now time.Time) map[string]*RemoteGatewayz {
42✔
2080
        targetGWName, doAccs := getMonitorGWOptions(opts)
42✔
2081

42✔
2082
        if targetGWName != _EMPTY_ {
50✔
2083
                c := s.getOutboundGatewayConnection(targetGWName)
8✔
2084
                if c == nil {
8✔
2085
                        return nil
×
2086
                }
×
2087
                outbounds := make(map[string]*RemoteGatewayz, 1)
8✔
2088
                _, rgw := createOutboundRemoteGatewayz(c, opts, now, doAccs)
8✔
2089
                outbounds[targetGWName] = rgw
8✔
2090
                return outbounds
8✔
2091
        }
2092

2093
        var connsa [16]*client
34✔
2094
        var conns = connsa[:0]
34✔
2095

34✔
2096
        s.getOutboundGatewayConnections(&conns)
34✔
2097

34✔
2098
        outbounds := make(map[string]*RemoteGatewayz, len(conns))
34✔
2099
        for _, c := range conns {
64✔
2100
                name, rgw := createOutboundRemoteGatewayz(c, opts, now, doAccs)
30✔
2101
                if rgw != nil {
60✔
2102
                        outbounds[name] = rgw
30✔
2103
                }
30✔
2104
        }
2105
        return outbounds
34✔
2106
}
2107

2108
// Returns a RemoteGatewayz for a given outbound gw connection
2109
func createOutboundRemoteGatewayz(c *client, opts *GatewayzOptions, now time.Time, doAccs bool) (string, *RemoteGatewayz) {
38✔
2110
        var name string
38✔
2111
        var rgw *RemoteGatewayz
38✔
2112

38✔
2113
        c.mu.Lock()
38✔
2114
        if c.gw != nil {
76✔
2115
                rgw = &RemoteGatewayz{}
38✔
2116
                if doAccs {
58✔
2117
                        rgw.Accounts = createOutboundAccountsGatewayz(opts, c.gw)
20✔
2118
                }
20✔
2119
                if c.gw.cfg != nil {
76✔
2120
                        rgw.IsConfigured = !c.gw.cfg.isImplicit()
38✔
2121
                }
38✔
2122
                rgw.Connection = &ConnInfo{}
38✔
2123
                rgw.Connection.fill(c, c.nc, now, false)
38✔
2124
                name = c.gw.name
38✔
2125
        }
2126
        c.mu.Unlock()
38✔
2127

38✔
2128
        return name, rgw
38✔
2129
}
2130

2131
// Returns the list of accounts for this outbound gateway connection.
2132
// Based on the options, it will be a single or all accounts for
2133
// this outbound.
2134
func createOutboundAccountsGatewayz(opts *GatewayzOptions, gw *gateway) []*AccountGatewayz {
20✔
2135
        if gw.outsim == nil {
20✔
2136
                return nil
×
2137
        }
×
2138

2139
        var accName string
20✔
2140
        if opts != nil {
40✔
2141
                accName = opts.AccountName
20✔
2142
        }
20✔
2143
        if accName != _EMPTY_ {
28✔
2144
                ei, ok := gw.outsim.Load(accName)
8✔
2145
                if !ok {
10✔
2146
                        return nil
2✔
2147
                }
2✔
2148
                a := createAccountOutboundGatewayz(opts, accName, ei)
6✔
2149
                return []*AccountGatewayz{a}
6✔
2150
        }
2151

2152
        accs := make([]*AccountGatewayz, 0, 4)
12✔
2153
        gw.outsim.Range(func(k, v any) bool {
84✔
2154
                name := k.(string)
72✔
2155
                a := createAccountOutboundGatewayz(opts, name, v)
72✔
2156
                accs = append(accs, a)
72✔
2157
                return true
72✔
2158
        })
72✔
2159
        return accs
12✔
2160
}
2161

2162
// Returns an AccountGatewayz for this gateway outbound connection
2163
func createAccountOutboundGatewayz(opts *GatewayzOptions, name string, ei any) *AccountGatewayz {
78✔
2164
        a := &AccountGatewayz{
78✔
2165
                Name:                  name,
78✔
2166
                InterestOnlyThreshold: gatewayMaxRUnsubBeforeSwitch,
78✔
2167
        }
78✔
2168
        if ei != nil {
126✔
2169
                e := ei.(*outsie)
48✔
2170
                e.RLock()
48✔
2171
                a.InterestMode = e.mode.String()
48✔
2172
                a.NoInterestCount = len(e.ni)
48✔
2173
                a.NumQueueSubscriptions = e.qsubs
48✔
2174
                a.TotalSubscriptions = int(e.sl.Count())
48✔
2175
                if opts.AccountSubscriptions || opts.AccountSubscriptionsDetail {
64✔
2176
                        var subsa [4096]*subscription
16✔
2177
                        subs := subsa[:0]
16✔
2178
                        e.sl.All(&subs)
16✔
2179
                        if opts.AccountSubscriptions {
24✔
2180
                                a.Subs = make([]string, 0, len(subs))
8✔
2181
                        } else {
16✔
2182
                                a.SubsDetail = make([]SubDetail, 0, len(subs))
8✔
2183
                        }
8✔
2184
                        for _, sub := range subs {
36✔
2185
                                if opts.AccountSubscriptions {
30✔
2186
                                        a.Subs = append(a.Subs, string(sub.subject))
10✔
2187
                                } else {
20✔
2188
                                        a.SubsDetail = append(a.SubsDetail, newClientSubDetail(sub))
10✔
2189
                                }
10✔
2190
                        }
2191
                }
2192
                e.RUnlock()
48✔
2193
        } else {
30✔
2194
                a.InterestMode = Optimistic.String()
30✔
2195
        }
30✔
2196
        return a
78✔
2197
}
2198

2199
// Returns a map of gateways inbound connections.
2200
// Each entry is an array of RemoteGatewayz since a given server
2201
// may have more than one inbound from the same remote gateway.
2202
// Based on options, will include a single or all gateways,
2203
// with no/single/or all accounts interest information.
2204
func (s *Server) createInboundsRemoteGatewayz(opts *GatewayzOptions, now time.Time) map[string][]*RemoteGatewayz {
42✔
2205
        targetGWName, doAccs := getMonitorGWOptions(opts)
42✔
2206

42✔
2207
        var connsa [16]*client
42✔
2208
        var conns = connsa[:0]
42✔
2209
        s.getInboundGatewayConnections(&conns)
42✔
2210

42✔
2211
        m := make(map[string][]*RemoteGatewayz)
42✔
2212
        for _, c := range conns {
92✔
2213
                c.mu.Lock()
50✔
2214
                if c.gw != nil && (targetGWName == _EMPTY_ || targetGWName == c.gw.name) {
98✔
2215
                        igws := m[c.gw.name]
48✔
2216
                        if igws == nil {
86✔
2217
                                igws = make([]*RemoteGatewayz, 0, 2)
38✔
2218
                        }
38✔
2219
                        rgw := &RemoteGatewayz{}
48✔
2220
                        if doAccs {
68✔
2221
                                rgw.Accounts = createInboundAccountsGatewayz(opts, c.gw)
20✔
2222
                        }
20✔
2223
                        rgw.Connection = &ConnInfo{}
48✔
2224
                        rgw.Connection.fill(c, c.nc, now, false)
48✔
2225
                        igws = append(igws, rgw)
48✔
2226
                        m[c.gw.name] = igws
48✔
2227
                }
2228
                c.mu.Unlock()
50✔
2229
        }
2230
        return m
42✔
2231
}
2232

2233
// Returns the list of accounts for this inbound gateway connection.
2234
// Based on the options, it will be a single or all accounts for
2235
// this inbound.
2236
func createInboundAccountsGatewayz(opts *GatewayzOptions, gw *gateway) []*AccountGatewayz {
20✔
2237
        if gw.insim == nil {
20✔
2238
                return nil
×
2239
        }
×
2240

2241
        var accName string
20✔
2242
        if opts != nil {
40✔
2243
                accName = opts.AccountName
20✔
2244
        }
20✔
2245
        if accName != _EMPTY_ {
28✔
2246
                e, ok := gw.insim[accName]
8✔
2247
                if !ok {
10✔
2248
                        return nil
2✔
2249
                }
2✔
2250
                a := createInboundAccountGatewayz(accName, e)
6✔
2251
                return []*AccountGatewayz{a}
6✔
2252
        }
2253

2254
        accs := make([]*AccountGatewayz, 0, 4)
12✔
2255
        for name, e := range gw.insim {
84✔
2256
                a := createInboundAccountGatewayz(name, e)
72✔
2257
                accs = append(accs, a)
72✔
2258
        }
72✔
2259
        return accs
12✔
2260
}
2261

2262
// Returns an AccountGatewayz for this gateway inbound connection
2263
func createInboundAccountGatewayz(name string, e *insie) *AccountGatewayz {
78✔
2264
        a := &AccountGatewayz{
78✔
2265
                Name:                  name,
78✔
2266
                InterestOnlyThreshold: gatewayMaxRUnsubBeforeSwitch,
78✔
2267
        }
78✔
2268
        if e != nil {
126✔
2269
                a.InterestMode = e.mode.String()
48✔
2270
                a.NoInterestCount = len(e.ni)
48✔
2271
        } else {
78✔
2272
                a.InterestMode = Optimistic.String()
30✔
2273
        }
30✔
2274
        return a
78✔
2275
}
2276

2277
// HandleGatewayz process HTTP requests for route information.
2278
func (s *Server) HandleGatewayz(w http.ResponseWriter, r *http.Request) {
19✔
2279
        s.mu.Lock()
19✔
2280
        s.httpReqStats[GatewayzPath]++
19✔
2281
        s.mu.Unlock()
19✔
2282

19✔
2283
        subs, subsDet, err := decodeSubs(w, r)
19✔
2284
        if err != nil {
19✔
2285
                return
×
2286
        }
×
2287
        accs, err := decodeBool(w, r, "accs")
19✔
2288
        if err != nil {
19✔
2289
                return
×
2290
        }
×
2291
        gwName := r.URL.Query().Get("gw_name")
19✔
2292
        accName := r.URL.Query().Get("acc_name")
19✔
2293
        if accName != _EMPTY_ {
23✔
2294
                accs = true
4✔
2295
        }
4✔
2296

2297
        opts := &GatewayzOptions{
19✔
2298
                Name:                       gwName,
19✔
2299
                Accounts:                   accs,
19✔
2300
                AccountName:                accName,
19✔
2301
                AccountSubscriptions:       subs,
19✔
2302
                AccountSubscriptionsDetail: subsDet,
19✔
2303
        }
19✔
2304
        gw, err := s.Gatewayz(opts)
19✔
2305
        if err != nil {
19✔
2306
                w.WriteHeader(http.StatusBadRequest)
×
2307
                w.Write([]byte(err.Error()))
×
2308
                return
×
2309
        }
×
2310
        b, err := json.MarshalIndent(gw, "", "  ")
19✔
2311
        if err != nil {
19✔
2312
                s.Errorf("Error marshaling response to /gatewayz request: %v", err)
×
2313
        }
×
2314

2315
        // Handle response
2316
        ResponseHandler(w, r, b)
19✔
2317
}
2318

2319
// Leafz represents detailed information on Leafnodes.
2320
type Leafz struct {
2321
        ID       string      `json:"server_id"`
2322
        Now      time.Time   `json:"now"`
2323
        NumLeafs int         `json:"leafnodes"`
2324
        Leafs    []*LeafInfo `json:"leafs"`
2325
}
2326

2327
// LeafzOptions are options passed to Leafz.
type LeafzOptions struct {
	// Subscriptions indicates that Leafz will return a leafnode's
	// subscriptions.
	Subscriptions bool `json:"subscriptions"`
	// Account, when not empty, limits the results to the leafnode
	// connections bound to the account with that name.
	Account string `json:"account"`
}
2333

2334
// LeafInfo has detailed information on each remote leafnode connection.
2335
type LeafInfo struct {
2336
        ID          uint64     `json:"id"`
2337
        Name        string     `json:"name"`
2338
        IsSpoke     bool       `json:"is_spoke"`
2339
        IsIsolated  bool       `json:"is_isolated,omitempty"`
2340
        Account     string     `json:"account"`
2341
        IP          string     `json:"ip"`
2342
        Port        int        `json:"port"`
2343
        RTT         string     `json:"rtt,omitempty"`
2344
        InMsgs      int64      `json:"in_msgs"`
2345
        OutMsgs     int64      `json:"out_msgs"`
2346
        InBytes     int64      `json:"in_bytes"`
2347
        OutBytes    int64      `json:"out_bytes"`
2348
        NumSubs     uint32     `json:"subscriptions"`
2349
        Subs        []string   `json:"subscriptions_list,omitempty"`
2350
        Compression string     `json:"compression,omitempty"`
2351
        Proxy       *ProxyInfo `json:"proxy,omitempty"`
2352
}
2353

2354
// Leafz returns a Leafz structure containing information about leafnodes.
2355
func (s *Server) Leafz(opts *LeafzOptions) (*Leafz, error) {
133✔
2356
        // Grab leafnodes
133✔
2357
        var lconns []*client
133✔
2358
        s.mu.Lock()
133✔
2359
        if len(s.leafs) > 0 {
258✔
2360
                lconns = make([]*client, 0, len(s.leafs))
125✔
2361
                for _, ln := range s.leafs {
259✔
2362
                        if opts != nil && opts.Account != _EMPTY_ {
138✔
2363
                                ln.mu.Lock()
4✔
2364
                                ok := ln.acc.Name == opts.Account
4✔
2365
                                ln.mu.Unlock()
4✔
2366
                                if !ok {
7✔
2367
                                        continue
3✔
2368
                                }
2369
                        }
2370
                        lconns = append(lconns, ln)
131✔
2371
                }
2372
        }
2373
        s.mu.Unlock()
133✔
2374

133✔
2375
        leafnodes := make([]*LeafInfo, 0, len(lconns))
133✔
2376

133✔
2377
        if len(lconns) > 0 {
257✔
2378
                for _, ln := range lconns {
255✔
2379
                        ln.mu.Lock()
131✔
2380
                        lni := &LeafInfo{
131✔
2381
                                ID:          ln.cid,
131✔
2382
                                Name:        ln.leaf.remoteServer,
131✔
2383
                                IsSpoke:     ln.isSpokeLeafNode(),
131✔
2384
                                IsIsolated:  ln.leaf.isolated,
131✔
2385
                                Account:     ln.acc.Name,
131✔
2386
                                IP:          ln.host,
131✔
2387
                                Port:        int(ln.port),
131✔
2388
                                RTT:         ln.getRTT().String(),
131✔
2389
                                InMsgs:      atomic.LoadInt64(&ln.inMsgs),
131✔
2390
                                OutMsgs:     ln.outMsgs,
131✔
2391
                                InBytes:     atomic.LoadInt64(&ln.inBytes),
131✔
2392
                                OutBytes:    ln.outBytes,
131✔
2393
                                NumSubs:     uint32(len(ln.subs)),
131✔
2394
                                Compression: ln.leaf.compression,
131✔
2395
                                Proxy:       createProxyInfo(ln),
131✔
2396
                        }
131✔
2397
                        if opts != nil && opts.Subscriptions {
149✔
2398
                                lni.Subs = make([]string, 0, len(ln.subs))
18✔
2399
                                for _, sub := range ln.subs {
106✔
2400
                                        lni.Subs = append(lni.Subs, string(sub.subject))
88✔
2401
                                }
88✔
2402
                        }
2403
                        ln.mu.Unlock()
131✔
2404
                        leafnodes = append(leafnodes, lni)
131✔
2405
                }
2406
        }
2407

2408
        return &Leafz{
133✔
2409
                ID:       s.ID(),
133✔
2410
                Now:      time.Now().UTC(),
133✔
2411
                NumLeafs: len(leafnodes),
133✔
2412
                Leafs:    leafnodes,
133✔
2413
        }, nil
133✔
2414
}
2415

2416
// HandleLeafz process HTTP requests for leafnode information.
2417
func (s *Server) HandleLeafz(w http.ResponseWriter, r *http.Request) {
×
2418
        s.mu.Lock()
×
2419
        s.httpReqStats[LeafzPath]++
×
2420
        s.mu.Unlock()
×
2421

×
2422
        subs, err := decodeBool(w, r, "subs")
×
2423
        if err != nil {
×
2424
                return
×
2425
        }
×
2426
        l, err := s.Leafz(&LeafzOptions{subs, r.URL.Query().Get("acc")})
×
2427
        if err != nil {
×
2428
                w.WriteHeader(http.StatusBadRequest)
×
2429
                w.Write([]byte(err.Error()))
×
2430
                return
×
2431
        }
×
2432
        b, err := json.MarshalIndent(l, "", "  ")
×
2433
        if err != nil {
×
2434
                s.Errorf("Error marshaling response to /leafz request: %v", err)
×
2435
        }
×
2436

2437
        // Handle response
2438
        ResponseHandler(w, r, b)
×
2439
}
2440

2441
// Leafz represents detailed information on Leafnodes.
2442
type AccountStatz struct {
2443
        ID       string         `json:"server_id"`
2444
        Now      time.Time      `json:"now"`
2445
        Accounts []*AccountStat `json:"account_statz"`
2446
}
2447

2448
// AccountStatzOptions are the options accepted by Server.AccountStatz.
type AccountStatzOptions struct {
	Accounts      []string `json:"accounts"`       // names of the accounts to report on; empty means every account
	IncludeUnused bool     `json:"include_unused"` // also report accounts whose local connection count is zero
}
2453

2454
// Leafz returns a AccountStatz structure containing summary information about accounts.
2455
func (s *Server) AccountStatz(opts *AccountStatzOptions) (*AccountStatz, error) {
26✔
2456
        stz := &AccountStatz{
26✔
2457
                ID:       s.ID(),
26✔
2458
                Now:      time.Now().UTC(),
26✔
2459
                Accounts: []*AccountStat{},
26✔
2460
        }
26✔
2461
        if opts == nil || len(opts.Accounts) == 0 {
41✔
2462
                s.accounts.Range(func(key, a any) bool {
62✔
2463
                        acc := a.(*Account)
47✔
2464
                        acc.mu.RLock()
47✔
2465
                        if (opts != nil && opts.IncludeUnused) || acc.numLocalConnections() != 0 {
91✔
2466
                                stz.Accounts = append(stz.Accounts, acc.statz())
44✔
2467
                        }
44✔
2468
                        acc.mu.RUnlock()
47✔
2469
                        return true
47✔
2470
                })
2471
        } else {
11✔
2472
                for _, a := range opts.Accounts {
22✔
2473
                        if acc, ok := s.accounts.Load(a); ok {
22✔
2474
                                acc := acc.(*Account)
11✔
2475
                                acc.mu.RLock()
11✔
2476
                                if opts.IncludeUnused || acc.numLocalConnections() != 0 {
21✔
2477
                                        stz.Accounts = append(stz.Accounts, acc.statz())
10✔
2478
                                }
10✔
2479
                                acc.mu.RUnlock()
11✔
2480
                        }
2481
                }
2482
        }
2483
        return stz, nil
26✔
2484
}
2485

2486
// HandleAccountStatz process HTTP requests for statz information of all accounts.
2487
func (s *Server) HandleAccountStatz(w http.ResponseWriter, r *http.Request) {
7✔
2488
        s.mu.Lock()
7✔
2489
        s.httpReqStats[AccountStatzPath]++
7✔
2490
        s.mu.Unlock()
7✔
2491

7✔
2492
        unused, err := decodeBool(w, r, "unused")
7✔
2493
        if err != nil {
7✔
2494
                return
×
2495
        }
×
2496

2497
        l, err := s.AccountStatz(&AccountStatzOptions{IncludeUnused: unused})
7✔
2498
        if err != nil {
7✔
2499
                w.WriteHeader(http.StatusBadRequest)
×
2500
                w.Write([]byte(err.Error()))
×
2501
                return
×
2502
        }
×
2503
        b, err := json.MarshalIndent(l, "", "  ")
7✔
2504
        if err != nil {
7✔
2505
                s.Errorf("Error marshaling response to %s request: %v", AccountStatzPath, err)
×
2506
                return
×
2507
        }
×
2508

2509
        // Handle response
2510
        ResponseHandler(w, r, b)
7✔
2511
}
2512

2513
// ResponseHandler handles responses for monitoring routes.
2514
func ResponseHandler(w http.ResponseWriter, r *http.Request, data []byte) {
7,423✔
2515
        handleResponse(http.StatusOK, w, r, data)
7,423✔
2516
}
7,423✔
2517

2518
// handleResponse handles responses for monitoring routes with a specific HTTP status code.
2519
func handleResponse(code int, w http.ResponseWriter, r *http.Request, data []byte) {
7,447✔
2520
        // Get callback from request
7,447✔
2521
        callback := r.URL.Query().Get("callback")
7,447✔
2522
        if callback != _EMPTY_ {
7,463✔
2523
                // Response for JSONP
16✔
2524
                w.Header().Set("Content-Type", "application/javascript")
16✔
2525
                w.WriteHeader(code)
16✔
2526
                fmt.Fprintf(w, "%s(%s)", callback, data)
16✔
2527
        } else {
7,447✔
2528
                // Otherwise JSON
7,431✔
2529
                w.Header().Set("Content-Type", "application/json")
7,431✔
2530
                w.Header().Set("Access-Control-Allow-Origin", "*")
7,431✔
2531
                w.WriteHeader(code)
7,431✔
2532
                w.Write(data)
7,431✔
2533
        }
7,431✔
2534
}
2535

2536
func (reason ClosedState) String() string {
44,083✔
2537
        switch reason {
44,083✔
2538
        case ClientClosed:
24,720✔
2539
                return "Client Closed"
24,720✔
2540
        case AuthenticationTimeout:
31✔
2541
                return "Authentication Timeout"
31✔
2542
        case AuthenticationViolation:
13,792✔
2543
                return "Authentication Failure"
13,792✔
2544
        case TLSHandshakeError:
496✔
2545
                return "TLS Handshake Failure"
496✔
2546
        case SlowConsumerPendingBytes:
1✔
2547
                return "Slow Consumer (Pending Bytes)"
1✔
2548
        case SlowConsumerWriteDeadline:
28✔
2549
                return "Slow Consumer (Write Deadline)"
28✔
2550
        case WriteError:
26✔
2551
                return "Write Error"
26✔
2552
        case ReadError:
252✔
2553
                return "Read Error"
252✔
2554
        case ParseError:
15✔
2555
                return "Parse Error"
15✔
2556
        case StaleConnection:
7✔
2557
                return "Stale Connection"
7✔
2558
        case ProtocolViolation:
108✔
2559
                return "Protocol Violation"
108✔
2560
        case BadClientProtocolVersion:
4✔
2561
                return "Bad Client Protocol Version"
4✔
2562
        case WrongPort:
116✔
2563
                return "Incorrect Port"
116✔
2564
        case MaxConnectionsExceeded:
7✔
2565
                return "Maximum Connections Exceeded"
7✔
2566
        case MaxAccountConnectionsExceeded:
114✔
2567
                return "Maximum Account Connections Exceeded"
114✔
2568
        case MaxPayloadExceeded:
19✔
2569
                return "Maximum Message Payload Exceeded"
19✔
2570
        case MaxControlLineExceeded:
3✔
2571
                return "Maximum Control Line Exceeded"
3✔
2572
        case MaxSubscriptionsExceeded:
×
2573
                return "Maximum Subscriptions Exceeded"
×
2574
        case DuplicateRoute:
98✔
2575
                return "Duplicate Route"
98✔
2576
        case RouteRemoved:
×
2577
                return "Route Removed"
×
2578
        case ServerShutdown:
3,948✔
2579
                return "Server Shutdown"
3,948✔
2580
        case AuthenticationExpired:
28✔
2581
                return "Authentication Expired"
28✔
2582
        case WrongGateway:
5✔
2583
                return "Wrong Gateway"
5✔
2584
        case MissingAccount:
6✔
2585
                return "Missing Account"
6✔
2586
        case Revocation:
44✔
2587
                return "Credentials Revoked"
44✔
2588
        case InternalClient:
18✔
2589
                return "Internal Client"
18✔
2590
        case MsgHeaderViolation:
×
2591
                return "Message Header Violation"
×
2592
        case NoRespondersRequiresHeaders:
2✔
2593
                return "No Responders Requires Headers"
2✔
2594
        case ClusterNameConflict:
2✔
2595
                return "Cluster Name Conflict"
2✔
2596
        case DuplicateRemoteLeafnodeConnection:
8✔
2597
                return "Duplicate Remote LeafNode Connection"
8✔
2598
        case DuplicateClientID:
20✔
2599
                return "Duplicate Client ID"
20✔
2600
        case DuplicateServerName:
5✔
2601
                return "Duplicate Server Name"
5✔
2602
        case MinimumVersionRequired:
×
2603
                return "Minimum Version Required"
×
2604
        case ClusterNamesIdentical:
6✔
2605
                return "Cluster Names Identical"
6✔
2606
        case Kicked:
4✔
2607
                return "Kicked"
4✔
2608
        case ProxyNotTrusted:
32✔
2609
                return "Proxy Not Trusted"
32✔
2610
        case ProxyRequired:
118✔
2611
                return "Proxy Required"
118✔
2612
        }
2613

2614
        return "Unknown State"
×
2615
}
2616

2617
// AccountzOptions are the options passed to Server.Accountz.
type AccountzOptions struct {
	// Account, when non-empty, makes Accountz return the details of that
	// account instead of the list of account names.
	Account string `json:"account"`
}
2622

2623
func newExtServiceLatency(l *serviceLatency) *jwt.ServiceLatency {
151✔
2624
        if l == nil {
302✔
2625
                return nil
151✔
2626
        }
151✔
2627
        return &jwt.ServiceLatency{
×
2628
                Sampling: jwt.SamplingRate(l.sampling),
×
2629
                Results:  jwt.Subject(l.subject),
×
2630
        }
×
2631
}
2632

2633
type ExtImport struct {
2634
        jwt.Import
2635
        Invalid     bool                `json:"invalid"`
2636
        Share       bool                `json:"share"`
2637
        Tracking    bool                `json:"tracking"`
2638
        TrackingHdr http.Header         `json:"tracking_header,omitempty"`
2639
        Latency     *jwt.ServiceLatency `json:"latency,omitempty"`
2640
        M1          *ServiceLatency     `json:"m1,omitempty"`
2641
}
2642

2643
type ExtExport struct {
2644
        jwt.Export
2645
        ApprovedAccounts []string             `json:"approved_accounts,omitempty"`
2646
        RevokedAct       map[string]time.Time `json:"revoked_activations,omitempty"`
2647
}
2648

2649
// ExtVrIssues describes a single issue reported when validating an account's
// decoded JWT claims.
type ExtVrIssues struct {
	Description string `json:"description"`
	Blocking    bool   `json:"blocking"`
	Time        bool   `json:"time_check"`
}
2654

2655
// ExtMap holds an account's subject mappings for monitoring output: each key is
// a mapping source and each value the list of its destinations.
type ExtMap map[string][]*MapDest
2656

2657
type AccountInfo struct {
2658
        AccountName string               `json:"account_name"`
2659
        LastUpdate  time.Time            `json:"update_time,omitempty"`
2660
        IsSystem    bool                 `json:"is_system,omitempty"`
2661
        Expired     bool                 `json:"expired"`
2662
        Complete    bool                 `json:"complete"`
2663
        JetStream   bool                 `json:"jetstream_enabled"`
2664
        LeafCnt     int                  `json:"leafnode_connections"`
2665
        ClientCnt   int                  `json:"client_connections"`
2666
        SubCnt      uint32               `json:"subscriptions"`
2667
        Mappings    ExtMap               `json:"mappings,omitempty"`
2668
        Exports     []ExtExport          `json:"exports,omitempty"`
2669
        Imports     []ExtImport          `json:"imports,omitempty"`
2670
        Jwt         string               `json:"jwt,omitempty"`
2671
        IssuerKey   string               `json:"issuer_key,omitempty"`
2672
        NameTag     string               `json:"name_tag,omitempty"`
2673
        Tags        jwt.TagList          `json:"tags,omitempty"`
2674
        Claim       *jwt.AccountClaims   `json:"decoded_jwt,omitempty"`
2675
        Vr          []ExtVrIssues        `json:"validation_result_jwt,omitempty"`
2676
        RevokedUser map[string]time.Time `json:"revoked_user,omitempty"`
2677
        Sublist     *SublistStats        `json:"sublist_stats,omitempty"`
2678
        Responses   map[string]ExtImport `json:"responses,omitempty"`
2679
}
2680

2681
type Accountz struct {
2682
        ID            string       `json:"server_id"`
2683
        Now           time.Time    `json:"now"`
2684
        SystemAccount string       `json:"system_account,omitempty"`
2685
        Accounts      []string     `json:"accounts,omitempty"`
2686
        Account       *AccountInfo `json:"account_detail,omitempty"`
2687
}
2688

2689
// HandleAccountz process HTTP requests for account information.
2690
func (s *Server) HandleAccountz(w http.ResponseWriter, r *http.Request) {
6✔
2691
        s.mu.Lock()
6✔
2692
        s.httpReqStats[AccountzPath]++
6✔
2693
        s.mu.Unlock()
6✔
2694
        if l, err := s.Accountz(&AccountzOptions{r.URL.Query().Get("acc")}); err != nil {
6✔
2695
                w.WriteHeader(http.StatusBadRequest)
×
2696
                w.Write([]byte(err.Error()))
×
2697
        } else if b, err := json.MarshalIndent(l, "", "  "); err != nil {
6✔
2698
                s.Errorf("Error marshaling response to %s request: %v", AccountzPath, err)
×
2699
                w.WriteHeader(http.StatusBadRequest)
×
2700
                w.Write([]byte(err.Error()))
×
2701
        } else {
6✔
2702
                ResponseHandler(w, r, b) // Handle response
6✔
2703
        }
6✔
2704
}
2705

2706
func (s *Server) Accountz(optz *AccountzOptions) (*Accountz, error) {
30✔
2707
        a := &Accountz{
30✔
2708
                ID:  s.ID(),
30✔
2709
                Now: time.Now().UTC(),
30✔
2710
        }
30✔
2711
        if sacc := s.SystemAccount(); sacc != nil {
60✔
2712
                a.SystemAccount = sacc.GetName()
30✔
2713
        }
30✔
2714
        if optz == nil || optz.Account == _EMPTY_ {
36✔
2715
                a.Accounts = []string{}
6✔
2716
                s.accounts.Range(func(key, value any) bool {
20✔
2717
                        a.Accounts = append(a.Accounts, key.(string))
14✔
2718
                        return true
14✔
2719
                })
14✔
2720
                return a, nil
6✔
2721
        }
2722
        aInfo, err := s.accountInfo(optz.Account)
24✔
2723
        if err != nil {
24✔
2724
                return nil, err
×
2725
        }
×
2726
        a.Account = aInfo
24✔
2727
        return a, nil
24✔
2728
}
2729

2730
func newExtImport(v *serviceImport) ExtImport {
57✔
2731
        imp := ExtImport{
57✔
2732
                Invalid: true,
57✔
2733
                Import:  jwt.Import{Type: jwt.Service},
57✔
2734
        }
57✔
2735
        if v != nil {
114✔
2736
                imp.Share = v.share
57✔
2737
                imp.Tracking = v.tracking
57✔
2738
                imp.Invalid = v.invalid
57✔
2739
                imp.Import = jwt.Import{
57✔
2740
                        Subject: jwt.Subject(v.to),
57✔
2741
                        Account: v.acc.Name,
57✔
2742
                        Type:    jwt.Service,
57✔
2743
                        // Deprecated so we duplicate. Use LocalSubject.
57✔
2744
                        To:           jwt.Subject(v.from),
57✔
2745
                        LocalSubject: jwt.RenamingSubject(v.from),
57✔
2746
                }
57✔
2747
                imp.TrackingHdr = v.trackingHdr
57✔
2748
                imp.Latency = newExtServiceLatency(v.latency)
57✔
2749
                if v.m1 != nil {
57✔
2750
                        m1 := *v.m1
×
2751
                        imp.M1 = &m1
×
2752
                }
×
2753
        }
2754
        return imp
57✔
2755
}
2756

2757
func (s *Server) accountInfo(accName string) (*AccountInfo, error) {
27✔
2758
        var a *Account
27✔
2759
        if v, ok := s.accounts.Load(accName); !ok {
27✔
2760
                return nil, fmt.Errorf("Account %s does not exist", accName)
×
2761
        } else {
27✔
2762
                a = v.(*Account)
27✔
2763
        }
27✔
2764
        isSys := a == s.SystemAccount()
27✔
2765
        a.mu.RLock()
27✔
2766
        defer a.mu.RUnlock()
27✔
2767
        var vrIssues []ExtVrIssues
27✔
2768
        claim, _ := jwt.DecodeAccountClaims(a.claimJWT) // ignore error
27✔
2769
        if claim != nil {
50✔
2770
                vr := jwt.ValidationResults{}
23✔
2771
                claim.Validate(&vr)
23✔
2772
                vrIssues = make([]ExtVrIssues, len(vr.Issues))
23✔
2773
                for i, v := range vr.Issues {
23✔
2774
                        vrIssues[i] = ExtVrIssues{v.Description, v.Blocking, v.TimeCheck}
×
2775
                }
×
2776
        }
2777
        collectRevocations := func(revocations map[string]int64) map[string]time.Time {
148✔
2778
                l := len(revocations)
121✔
2779
                if l == 0 {
242✔
2780
                        return nil
121✔
2781
                }
121✔
2782
                rev := make(map[string]time.Time, l)
×
2783
                for k, v := range revocations {
×
2784
                        rev[k] = time.Unix(v, 0)
×
2785
                }
×
2786
                return rev
×
2787
        }
2788
        exports := []ExtExport{}
27✔
2789
        for k, v := range a.exports.services {
121✔
2790
                e := ExtExport{
94✔
2791
                        Export: jwt.Export{
94✔
2792
                                Subject: jwt.Subject(k),
94✔
2793
                                Type:    jwt.Service,
94✔
2794
                        },
94✔
2795
                        ApprovedAccounts: []string{},
94✔
2796
                }
94✔
2797
                if v != nil {
188✔
2798
                        e.Latency = newExtServiceLatency(v.latency)
94✔
2799
                        e.TokenReq = v.tokenReq
94✔
2800
                        e.ResponseType = jwt.ResponseType(v.respType.String())
94✔
2801
                        for name := range v.approved {
94✔
2802
                                e.ApprovedAccounts = append(e.ApprovedAccounts, name)
×
2803
                        }
×
2804
                        e.RevokedAct = collectRevocations(v.actsRevoked)
94✔
2805
                }
2806
                exports = append(exports, e)
94✔
2807
        }
2808
        for k, v := range a.exports.streams {
27✔
2809
                e := ExtExport{
×
2810
                        Export: jwt.Export{
×
2811
                                Subject: jwt.Subject(k),
×
2812
                                Type:    jwt.Stream,
×
2813
                        },
×
2814
                        ApprovedAccounts: []string{},
×
2815
                }
×
2816
                if v != nil {
×
2817
                        e.TokenReq = v.tokenReq
×
2818
                        for name := range v.approved {
×
2819
                                e.ApprovedAccounts = append(e.ApprovedAccounts, name)
×
2820
                        }
×
2821
                        e.RevokedAct = collectRevocations(v.actsRevoked)
×
2822
                }
2823
                exports = append(exports, e)
×
2824
        }
2825
        imports := []ExtImport{}
27✔
2826
        for _, v := range a.imports.streams {
27✔
2827
                imp := ExtImport{
×
2828
                        Invalid: true,
×
2829
                        Import:  jwt.Import{Type: jwt.Stream},
×
2830
                }
×
2831
                if v != nil {
×
2832
                        imp.Invalid = v.invalid
×
2833
                        imp.Import = jwt.Import{
×
2834
                                Subject:      jwt.Subject(v.from),
×
2835
                                Account:      v.acc.Name,
×
2836
                                Type:         jwt.Stream,
×
2837
                                LocalSubject: jwt.RenamingSubject(v.to),
×
2838
                        }
×
2839
                }
×
2840
                imports = append(imports, imp)
×
2841
        }
2842
        for _, sis := range a.imports.services {
82✔
2843
                for _, v := range sis {
111✔
2844
                        imports = append(imports, newExtImport(v))
56✔
2845
                }
56✔
2846
        }
2847
        responses := map[string]ExtImport{}
27✔
2848
        for k, v := range a.exports.responses {
28✔
2849
                responses[k] = newExtImport(v)
1✔
2850
        }
1✔
2851
        mappings := ExtMap{}
27✔
2852
        for _, m := range a.mappings {
30✔
2853
                var dests []*MapDest
3✔
2854
                var src string
3✔
2855
                if m == nil {
3✔
2856
                        src = "nil"
×
2857
                        if _, ok := mappings[src]; ok { // only set if not present (keep orig in case nil is used)
×
2858
                                continue
×
2859
                        }
2860
                        dests = append(dests, &MapDest{})
×
2861
                } else {
3✔
2862
                        src = m.src
3✔
2863
                        for _, d := range m.dests {
8✔
2864
                                dests = append(dests, &MapDest{d.tr.dest, d.weight, _EMPTY_})
5✔
2865
                        }
5✔
2866
                        for c, cd := range m.cdests {
3✔
2867
                                for _, d := range cd {
×
2868
                                        dests = append(dests, &MapDest{d.tr.dest, d.weight, c})
×
2869
                                }
×
2870
                        }
2871
                }
2872
                mappings[src] = dests
3✔
2873
        }
2874
        return &AccountInfo{
27✔
2875
                AccountName: accName,
27✔
2876
                LastUpdate:  a.updated.UTC(),
27✔
2877
                IsSystem:    isSys,
27✔
2878
                Expired:     a.expired.Load(),
27✔
2879
                Complete:    !a.incomplete,
27✔
2880
                JetStream:   a.js != nil,
27✔
2881
                LeafCnt:     a.numLocalLeafNodes(),
27✔
2882
                ClientCnt:   a.numLocalConnections(),
27✔
2883
                SubCnt:      a.sl.Count(),
27✔
2884
                Mappings:    mappings,
27✔
2885
                Exports:     exports,
27✔
2886
                Imports:     imports,
27✔
2887
                Jwt:         a.claimJWT,
27✔
2888
                IssuerKey:   a.Issuer,
27✔
2889
                NameTag:     a.getNameTagLocked(),
27✔
2890
                Tags:        a.tags,
27✔
2891
                Claim:       claim,
27✔
2892
                Vr:          vrIssues,
27✔
2893
                RevokedUser: collectRevocations(a.usersRevoked),
27✔
2894
                Sublist:     a.sl.Stats(),
27✔
2895
                Responses:   responses,
27✔
2896
        }, nil
27✔
2897
}
2898

2899
// JSzOptions are the options passed to Jsz and JszAccount.
//
// JszAccount looks up the account named by Account and uses Streams, Consumer,
// DirectConsumer, Config, RaftGroups and StreamLeaderOnly to select how much
// per-stream detail is reported.
type JSzOptions struct {
	Account          string `json:"account,omitempty"`
	Accounts         bool   `json:"accounts,omitempty"`
	Streams          bool   `json:"streams,omitempty"`
	Consumer         bool   `json:"consumer,omitempty"`
	DirectConsumer   bool   `json:"direct_consumer,omitempty"`
	Config           bool   `json:"config,omitempty"`
	LeaderOnly       bool   `json:"leader_only,omitempty"`
	Offset           int    `json:"offset,omitempty"`
	Limit            int    `json:"limit,omitempty"`
	RaftGroups       bool   `json:"raft,omitempty"`
	StreamLeaderOnly bool   `json:"stream_leader_only,omitempty"`
}
2913

2914
// HealthzOptions are the options passed to Healthz.
type HealthzOptions struct {
	// Deprecated: Use JSEnabledOnly instead.
	JSEnabled     bool   `json:"js-enabled,omitempty"`
	JSEnabledOnly bool   `json:"js-enabled-only,omitempty"`
	JSServerOnly  bool   `json:"js-server-only,omitempty"`
	JSMetaOnly    bool   `json:"js-meta-only,omitempty"`
	Account       string `json:"account,omitempty"`
	Stream        string `json:"stream,omitempty"`
	Consumer      string `json:"consumer,omitempty"`
	Details       bool   `json:"details,omitempty"`
}
2926

2927
// ProfilezOptions are the options passed to Profilez.
type ProfilezOptions struct {
	Name     string        `json:"name"`
	Debug    int           `json:"debug"`
	Duration time.Duration `json:"duration,omitempty"`
}
2933

2934
// IpqueueszOptions are the options passed to Ipqueuesz.
type IpqueueszOptions struct {
	All    bool   `json:"all"`
	Filter string `json:"filter"`
}
2939

2940
// RaftzOptions are the options passed to Raftz.
type RaftzOptions struct {
	AccountFilter string `json:"account"`
	GroupFilter   string `json:"group"`
}
2945

2946
// StreamDetail shows information about the stream state and its consumers.
2947
type StreamDetail struct {
2948
        Name               string              `json:"name"`
2949
        Created            time.Time           `json:"created"`
2950
        Cluster            *ClusterInfo        `json:"cluster,omitempty"`
2951
        Config             *StreamConfig       `json:"config,omitempty"`
2952
        State              StreamState         `json:"state,omitempty"`
2953
        Consumer           []*ConsumerInfo     `json:"consumer_detail,omitempty"`
2954
        DirectConsumer     []*ConsumerInfo     `json:"direct_consumer_detail,omitempty"`
2955
        Mirror             *StreamSourceInfo   `json:"mirror,omitempty"`
2956
        Sources            []*StreamSourceInfo `json:"sources,omitempty"`
2957
        RaftGroup          string              `json:"stream_raft_group,omitempty"`
2958
        ConsumerRaftGroups []*RaftGroupDetail  `json:"consumer_raft_groups,omitempty"`
2959
}
2960

2961
// RaftGroupDetail pairs a name with the name of its Raft group.
type RaftGroupDetail struct {
	Name      string `json:"name"`
	RaftGroup string `json:"raft_group,omitempty"`
}
2966

2967
type AccountDetail struct {
2968
        Name string `json:"name"`
2969
        Id   string `json:"id"`
2970
        JetStreamStats
2971
        Streams []StreamDetail `json:"stream_detail,omitempty"`
2972
}
2973

2974
// MetaSnapshotStats reports statistics about meta layer snapshots.
type MetaSnapshotStats struct {
	PendingEntries uint64        `json:"pending_entries"`         // number of entries pending in the meta layer
	PendingSize    uint64        `json:"pending_size"`            // size in bytes of the entries pending in the meta layer
	LastTime       time.Time     `json:"last_time,omitempty"`     // when the most recent meta snapshot was taken
	LastDuration   time.Duration `json:"last_duration,omitempty"` // how long the most recent meta snapshot took
}
2981

2982
// MetaClusterInfo shows information about the meta group.
2983
type MetaClusterInfo struct {
2984
        Name     string             `json:"name,omitempty"`     // Name is the name of the cluster
2985
        Leader   string             `json:"leader,omitempty"`   // Leader is the server name of the cluster leader
2986
        Peer     string             `json:"peer,omitempty"`     // Peer is unique ID of the leader
2987
        Replicas []*PeerInfo        `json:"replicas,omitempty"` // Replicas is a list of known peers
2988
        Size     int                `json:"cluster_size"`       // Size is the known size of the cluster
2989
        Pending  int                `json:"pending"`            // Pending is how many RAFT messages are not yet processed
2990
        Snapshot *MetaSnapshotStats `json:"snapshot"`           // Snapshot contains meta snapshot statistics
2991
}
2992

2993
// JSInfo has detailed information on JetStream.
2994
type JSInfo struct {
2995
        JetStreamStats
2996
        ID              string           `json:"server_id"`
2997
        Now             time.Time        `json:"now"`
2998
        Disabled        bool             `json:"disabled,omitempty"`
2999
        Config          JetStreamConfig  `json:"config,omitempty"`
3000
        Limits          *JSLimitOpts     `json:"limits,omitempty"`
3001
        Streams         int              `json:"streams"`
3002
        StreamsLeader   int              `json:"streams_leader,omitempty"`
3003
        Consumers       int              `json:"consumers"`
3004
        ConsumersLeader int              `json:"consumers_leader,omitempty"`
3005
        Messages        uint64           `json:"messages"`
3006
        Bytes           uint64           `json:"bytes"`
3007
        Meta            *MetaClusterInfo `json:"meta_cluster,omitempty"`
3008
        AccountDetails  []*AccountDetail `json:"account_details,omitempty"`
3009
        Total           int              `json:"total"`
3010
}
3011

3012
func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optDirectConsumers, optCfg, optRaft, optStreamLeader bool) *AccountDetail {
277✔
3013
        jsa.mu.RLock()
277✔
3014
        acc := jsa.account
277✔
3015
        name := acc.GetName()
277✔
3016
        id := name
277✔
3017
        if acc.nameTag != _EMPTY_ {
279✔
3018
                name = acc.nameTag
2✔
3019
        }
2✔
3020
        jsa.usageMu.RLock()
277✔
3021
        totalMem, totalStore := jsa.storageTotals()
277✔
3022
        detail := AccountDetail{
277✔
3023
                Name: name,
277✔
3024
                Id:   id,
277✔
3025
                JetStreamStats: JetStreamStats{
277✔
3026
                        Memory: totalMem,
277✔
3027
                        Store:  totalStore,
277✔
3028
                        API: JetStreamAPIStats{
277✔
3029
                                Total:  jsa.apiTotal,
277✔
3030
                                Errors: jsa.apiErrors,
277✔
3031
                        },
277✔
3032
                },
277✔
3033
                Streams: make([]StreamDetail, 0, len(jsa.streams)),
277✔
3034
        }
277✔
3035
        if reserved, ok := jsa.limits[_EMPTY_]; ok {
554✔
3036
                detail.JetStreamStats.ReservedMemory = uint64(reserved.MaxMemory)
277✔
3037
                detail.JetStreamStats.ReservedStore = uint64(reserved.MaxStore)
277✔
3038
        }
277✔
3039
        jsa.usageMu.RUnlock()
277✔
3040

277✔
3041
        var streams []*stream
277✔
3042
        if optStreams {
529✔
3043
                for _, stream := range jsa.streams {
527✔
3044
                        streams = append(streams, stream)
275✔
3045
                }
275✔
3046
        }
3047
        jsa.mu.RUnlock()
277✔
3048

277✔
3049
        if js := s.getJetStream(); js != nil && optStreams {
529✔
3050
                for _, stream := range streams {
527✔
3051
                        rgroup := stream.raftGroup()
275✔
3052
                        ci := js.clusterInfo(rgroup)
275✔
3053
                        var cfg *StreamConfig
275✔
3054
                        if optCfg {
280✔
3055
                                c := stream.config()
5✔
3056
                                cfg = &c
5✔
3057
                        }
5✔
3058
                        // Skip if we are only looking for stream leaders.
3059
                        if optStreamLeader && ci != nil && ci.Leader != s.Name() {
277✔
3060
                                continue
2✔
3061
                        }
3062
                        sdet := StreamDetail{
273✔
3063
                                Name:    stream.name(),
273✔
3064
                                Created: stream.createdTime(),
273✔
3065
                                State:   stream.state(),
273✔
3066
                                Cluster: ci,
273✔
3067
                                Config:  cfg,
273✔
3068
                                Mirror:  stream.mirrorInfo(),
273✔
3069
                                Sources: stream.sourcesInfo(),
273✔
3070
                        }
273✔
3071
                        if optRaft && rgroup != nil {
278✔
3072
                                sdet.RaftGroup = rgroup.Name
5✔
3073
                                sdet.ConsumerRaftGroups = make([]*RaftGroupDetail, 0)
5✔
3074
                        }
5✔
3075
                        if optConsumers {
533✔
3076
                                for _, consumer := range stream.getPublicConsumers() {
344✔
3077
                                        cInfo := consumer.info()
84✔
3078
                                        if cInfo == nil {
84✔
3079
                                                continue
×
3080
                                        }
3081
                                        if !optCfg {
163✔
3082
                                                cInfo.Config = nil
79✔
3083
                                        }
79✔
3084
                                        sdet.Consumer = append(sdet.Consumer, cInfo)
84✔
3085
                                        if optRaft {
89✔
3086
                                                crgroup := consumer.raftGroup()
5✔
3087
                                                if crgroup != nil {
10✔
3088
                                                        sdet.ConsumerRaftGroups = append(sdet.ConsumerRaftGroups,
5✔
3089
                                                                &RaftGroupDetail{cInfo.Name, crgroup.Name},
5✔
3090
                                                        )
5✔
3091
                                                }
5✔
3092
                                        }
3093
                                }
3094
                                if optDirectConsumers {
262✔
3095
                                        for _, consumer := range stream.getDirectConsumers() {
3✔
3096
                                                cInfo := consumer.info()
1✔
3097
                                                if cInfo == nil {
1✔
3098
                                                        continue
×
3099
                                                }
3100
                                                if !optCfg {
2✔
3101
                                                        cInfo.Config = nil
1✔
3102
                                                }
1✔
3103
                                                sdet.DirectConsumer = append(sdet.Consumer, cInfo)
1✔
3104
                                        }
3105
                                }
3106
                        }
3107
                        detail.Streams = append(detail.Streams, sdet)
273✔
3108
                }
3109
        }
3110
        return &detail
277✔
3111
}
3112

3113
func (s *Server) JszAccount(opts *JSzOptions) (*AccountDetail, error) {
1✔
3114
        js := s.getJetStream()
1✔
3115
        if js == nil {
1✔
3116
                return nil, fmt.Errorf("jetstream not enabled")
×
3117
        }
×
3118
        acc := opts.Account
1✔
3119
        account, ok := s.accounts.Load(acc)
1✔
3120
        if !ok {
1✔
3121
                return nil, fmt.Errorf("account %q not found", acc)
×
3122
        }
×
3123
        js.mu.RLock()
1✔
3124
        jsa, ok := js.accounts[account.(*Account).Name]
1✔
3125
        js.mu.RUnlock()
1✔
3126
        if !ok {
1✔
3127
                return nil, fmt.Errorf("account %q not jetstream enabled", acc)
×
3128
        }
×
3129
        return s.accountDetail(jsa, opts.Streams, opts.Consumer, opts.DirectConsumer, opts.Config, opts.RaftGroups, opts.StreamLeaderOnly), nil
1✔
3130
}
3131

3132
// helper to get cluster info from node via dummy group
3133
func (s *Server) raftNodeToClusterInfo(node RaftNode) *ClusterInfo {
3,025✔
3134
        if node == nil {
3,025✔
3135
                return nil
×
3136
        }
×
3137
        peers := node.Peers()
3,025✔
3138
        peerList := make([]string, len(peers))
3,025✔
3139
        for i, p := range peers {
14,446✔
3140
                peerList[i] = p.ID
11,421✔
3141
        }
11,421✔
3142
        group := &raftGroup{
3,025✔
3143
                Name:  _EMPTY_,
3,025✔
3144
                Peers: peerList,
3,025✔
3145
                node:  node,
3,025✔
3146
        }
3,025✔
3147
        return s.getJetStream().clusterInfo(group)
3,025✔
3148
}
3149

3150
// Jsz returns a Jsz structure containing information about JetStream.
3151
func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) {
608✔
3152
        // set option defaults
608✔
3153
        if opts == nil {
919✔
3154
                opts = &JSzOptions{}
311✔
3155
        }
311✔
3156
        if opts.Offset < 0 {
608✔
3157
                opts.Offset = 0
×
3158
        }
×
3159
        if opts.Limit == 0 {
1,189✔
3160
                opts.Limit = 1024
581✔
3161
        }
581✔
3162
        if opts.Consumer {
852✔
3163
                opts.Streams = true
244✔
3164
        }
244✔
3165
        if opts.Streams && opts.Account == _EMPTY_ {
847✔
3166
                opts.Accounts = true
239✔
3167
        }
239✔
3168

3169
        jsi := &JSInfo{
608✔
3170
                ID:  s.ID(),
608✔
3171
                Now: time.Now().UTC(),
608✔
3172
        }
608✔
3173

608✔
3174
        js := s.getJetStream()
608✔
3175
        if js == nil || !js.isEnabled() {
612✔
3176
                if opts.LeaderOnly {
5✔
3177
                        return nil, fmt.Errorf("%w: not leader", errSkipZreq)
1✔
3178
                }
1✔
3179

3180
                jsi.Disabled = true
3✔
3181
                return jsi, nil
3✔
3182
        }
3183

3184
        jsi.Limits = &s.getOpts().JetStreamLimits
604✔
3185

604✔
3186
        js.mu.RLock()
604✔
3187
        isLeader := js.cluster == nil || js.cluster.isLeader()
604✔
3188
        js.mu.RUnlock()
604✔
3189

604✔
3190
        if opts.LeaderOnly && !isLeader {
604✔
3191
                return nil, fmt.Errorf("%w: not leader", errSkipZreq)
×
3192
        }
×
3193

3194
        var accounts []*jsAccount
604✔
3195

604✔
3196
        js.mu.RLock()
604✔
3197
        jsi.Config = js.config
604✔
3198
        for _, info := range js.accounts {
1,259✔
3199
                accounts = append(accounts, info)
655✔
3200
        }
655✔
3201
        js.mu.RUnlock()
604✔
3202

604✔
3203
        jsi.Total = len(accounts)
604✔
3204

604✔
3205
        if mg := js.getMetaGroup(); mg != nil {
1,207✔
3206
                if ci := s.raftNodeToClusterInfo(mg); ci != nil {
1,206✔
3207
                        entries, bytes := mg.Size()
603✔
3208
                        jsi.Meta = &MetaClusterInfo{Name: ci.Name, Leader: ci.Leader, Peer: getHash(ci.Leader), Size: mg.ClusterSize()}
603✔
3209
                        if isLeader {
785✔
3210
                                jsi.Meta.Replicas = ci.Replicas
182✔
3211
                        }
182✔
3212
                        if ipq := s.jsAPIRoutedReqs; ipq != nil {
1,206✔
3213
                                jsi.Meta.Pending = ipq.len()
603✔
3214
                        }
603✔
3215
                        // Add meta snapshot stats
3216
                        jsi.Meta.Snapshot = &MetaSnapshotStats{
603✔
3217
                                PendingEntries: entries,
603✔
3218
                                PendingSize:    bytes,
603✔
3219
                        }
603✔
3220
                        js.mu.RLock()
603✔
3221
                        cluster := js.cluster
603✔
3222
                        js.mu.RUnlock()
603✔
3223
                        if cluster != nil {
1,206✔
3224
                                timeNanos := atomic.LoadInt64(&cluster.lastMetaSnapTime)
603✔
3225
                                durationNanos := atomic.LoadInt64(&cluster.lastMetaSnapDuration)
603✔
3226
                                if timeNanos > 0 {
683✔
3227
                                        jsi.Meta.Snapshot.LastTime = time.Unix(0, timeNanos).UTC()
80✔
3228
                                }
80✔
3229
                                if durationNanos > 0 {
683✔
3230
                                        jsi.Meta.Snapshot.LastDuration = time.Duration(durationNanos)
80✔
3231
                                }
80✔
3232
                        }
3233
                }
3234
        }
3235

3236
        jsi.JetStreamStats = *js.usageStats()
604✔
3237

604✔
3238
        // If a specific account is requested, track the index.
604✔
3239
        filterIdx := -1
604✔
3240

604✔
3241
        // Calculate the stats of all accounts and streams regardless of the filtering.
604✔
3242
        for i, jsa := range accounts {
1,259✔
3243
                if jsa.acc().GetName() == opts.Account {
672✔
3244
                        filterIdx = i
17✔
3245
                }
17✔
3246

3247
                jsa.mu.RLock()
655✔
3248
                streams := make([]*stream, 0, len(jsa.streams))
655✔
3249
                for _, stream := range jsa.streams {
1,333✔
3250
                        streams = append(streams, stream)
678✔
3251
                }
678✔
3252
                jsa.mu.RUnlock()
655✔
3253

655✔
3254
                jsi.Streams += len(streams)
655✔
3255
                for _, stream := range streams {
1,333✔
3256
                        streamState := stream.state()
678✔
3257
                        jsi.Messages += streamState.Msgs
678✔
3258
                        jsi.Bytes += streamState.Bytes
678✔
3259
                        jsi.Consumers += streamState.Consumers
678✔
3260
                        if opts.RaftGroups {
686✔
3261
                                if node := stream.raftNode(); node == nil || node.Leader() {
12✔
3262
                                        jsi.StreamsLeader++
4✔
3263
                                }
4✔
3264
                                for _, consumer := range stream.getPublicConsumers() {
16✔
3265
                                        if node := consumer.raftNode(); node == nil || node.Leader() {
12✔
3266
                                                jsi.ConsumersLeader++
4✔
3267
                                        }
4✔
3268
                                }
3269
                        }
3270
                }
3271
        }
3272

3273
        // Targeted account takes precedence.
3274
        if filterIdx >= 0 {
621✔
3275
                accounts = accounts[filterIdx : filterIdx+1]
17✔
3276
        } else if opts.Accounts {
861✔
3277

257✔
3278
                if opts.Limit > 0 {
514✔
3279
                        // Sort by name for a consistent read (barring any concurrent changes)
257✔
3280
                        slices.SortFunc(accounts, func(i, j *jsAccount) int { return cmp.Compare(i.acc().Name, j.acc().Name) })
275✔
3281

3282
                        // Offset larger than the number of accounts.
3283
                        offset := min(opts.Offset, len(accounts))
257✔
3284
                        accounts = accounts[offset:]
257✔
3285

257✔
3286
                        limit := min(opts.Limit, len(accounts))
257✔
3287
                        accounts = accounts[:limit]
257✔
3288
                }
3289
        } else {
330✔
3290
                accounts = nil
330✔
3291
        }
330✔
3292

3293
        if len(accounts) > 0 {
876✔
3294
                jsi.AccountDetails = make([]*AccountDetail, 0, len(accounts))
272✔
3295

272✔
3296
                for _, jsa := range accounts {
548✔
3297
                        detail := s.accountDetail(jsa, opts.Streams, opts.Consumer, opts.DirectConsumer, opts.Config, opts.RaftGroups, opts.StreamLeaderOnly)
276✔
3298
                        jsi.AccountDetails = append(jsi.AccountDetails, detail)
276✔
3299
                }
276✔
3300
        }
3301

3302
        return jsi, nil
604✔
3303
}
3304

3305
// HandleJsz process HTTP requests for jetstream information.
3306
func (s *Server) HandleJsz(w http.ResponseWriter, r *http.Request) {
52✔
3307
        s.mu.Lock()
52✔
3308
        s.httpReqStats[JszPath]++
52✔
3309
        s.mu.Unlock()
52✔
3310
        accounts, err := decodeBool(w, r, "accounts")
52✔
3311
        if err != nil {
52✔
3312
                return
×
3313
        }
×
3314
        streams, err := decodeBool(w, r, "streams")
52✔
3315
        if err != nil {
52✔
3316
                return
×
3317
        }
×
3318
        consumers, err := decodeBool(w, r, "consumers")
52✔
3319
        if err != nil {
52✔
3320
                return
×
3321
        }
×
3322
        directConsumers, err := decodeBool(w, r, "direct-consumers")
52✔
3323
        if err != nil {
52✔
3324
                return
×
3325
        }
×
3326
        config, err := decodeBool(w, r, "config")
52✔
3327
        if err != nil {
52✔
3328
                return
×
3329
        }
×
3330
        offset, err := decodeInt(w, r, "offset")
52✔
3331
        if err != nil {
52✔
3332
                return
×
3333
        }
×
3334
        limit, err := decodeInt(w, r, "limit")
52✔
3335
        if err != nil {
52✔
3336
                return
×
3337
        }
×
3338
        leader, err := decodeBool(w, r, "leader-only")
52✔
3339
        if err != nil {
52✔
3340
                return
×
3341
        }
×
3342
        rgroups, err := decodeBool(w, r, "raft")
52✔
3343
        if err != nil {
52✔
3344
                return
×
3345
        }
×
3346

3347
        sleader, err := decodeBool(w, r, "stream-leader-only")
52✔
3348
        if err != nil {
52✔
3349
                return
×
3350
        }
×
3351

3352
        l, err := s.Jsz(&JSzOptions{
52✔
3353
                Account:          r.URL.Query().Get("acc"),
52✔
3354
                Accounts:         accounts,
52✔
3355
                Streams:          streams,
52✔
3356
                Consumer:         consumers,
52✔
3357
                DirectConsumer:   directConsumers,
52✔
3358
                Config:           config,
52✔
3359
                LeaderOnly:       leader,
52✔
3360
                Offset:           offset,
52✔
3361
                Limit:            limit,
52✔
3362
                RaftGroups:       rgroups,
52✔
3363
                StreamLeaderOnly: sleader,
52✔
3364
        })
52✔
3365
        if err != nil {
52✔
3366
                w.WriteHeader(http.StatusBadRequest)
×
3367
                w.Write([]byte(err.Error()))
×
3368
                return
×
3369
        }
×
3370
        b, err := json.MarshalIndent(l, "", "  ")
52✔
3371
        if err != nil {
52✔
3372
                s.Errorf("Error marshaling response to /jsz request: %v", err)
×
3373
        }
×
3374

3375
        // Handle response
3376
        ResponseHandler(w, r, b)
52✔
3377
}
3378

3379
// HealthStatus is the result of a health check as produced by healthz and
// served by HandleHealthz.
//
// Status is a short summary; healthz sets it to values such as "ok", "error"
// or "unavailable". StatusCode is the HTTP status associated with the result;
// HandleHealthz resets it to 0 before marshaling so it is omitted from the
// HTTP body. Error carries a single message when details were not requested,
// while Errors carries the individual failures when details were requested.
type HealthStatus struct {
	Status     string         `json:"status"`
	StatusCode int            `json:"status_code,omitempty"`
	Error      string         `json:"error,omitempty"`
	Errors     []HealthzError `json:"errors,omitempty"`
}
3385

3386
// HealthzError describes one failure found by a detailed health check.
//
// Type classifies the failure. Account, Stream and Consumer name the asset the
// failure relates to and are omitted from JSON when empty. Error is the
// human-readable message.
type HealthzError struct {
	Type     HealthZErrorType `json:"type"`
	Account  string           `json:"account,omitempty"`
	Stream   string           `json:"stream,omitempty"`
	Consumer string           `json:"consumer,omitempty"`
	Error    string           `json:"error,omitempty"`
}
3393

3394
// HealthZErrorType classifies a HealthzError. It is encoded in JSON as an
// upper-case string name rather than as a number.
type HealthZErrorType int

const (
	HealthzErrorConn HealthZErrorType = iota
	HealthzErrorBadRequest
	HealthzErrorJetStream
	HealthzErrorAccount
	HealthzErrorStream
	HealthzErrorConsumer
)

// healthzErrorTypeNames maps each known HealthZErrorType, by value, to the
// name used by String and by the JSON encoding.
var healthzErrorTypeNames = [...]string{
	HealthzErrorConn:       "CONNECTION",
	HealthzErrorBadRequest: "BAD_REQUEST",
	HealthzErrorJetStream:  "JETSTREAM",
	HealthzErrorAccount:    "ACCOUNT",
	HealthzErrorStream:     "STREAM",
	HealthzErrorConsumer:   "CONSUMER",
}

// String returns the name of the error type, or "unknown" for a value that is
// not one of the declared constants.
func (t HealthZErrorType) String() string {
	if t < 0 || int(t) >= len(healthzErrorTypeNames) {
		return "unknown"
	}
	return healthzErrorTypeNames[t]
}

// MarshalJSON encodes the error type as the JSON string form of String.
func (t HealthZErrorType) MarshalJSON() ([]byte, error) {
	return json.Marshal(t.String())
}

// UnmarshalJSON decodes an error type from its quoted name. The raw input must
// match one of the known names exactly, quotes included; anything else is
// rejected with an error and leaves the receiver untouched.
func (t *HealthZErrorType) UnmarshalJSON(data []byte) error {
	for i, name := range healthzErrorTypeNames {
		if string(data) == `"`+name+`"` {
			*t = HealthZErrorType(i)
			return nil
		}
	}
	return fmt.Errorf("unknown healthz error type %q", data)
}
3447

3448
// https://datatracker.ietf.org/doc/html/draft-inadarei-api-health-check
3449
func (s *Server) HandleHealthz(w http.ResponseWriter, r *http.Request) {
24✔
3450
        s.mu.Lock()
24✔
3451
        s.httpReqStats[HealthzPath]++
24✔
3452
        s.mu.Unlock()
24✔
3453

24✔
3454
        jsEnabled, err := decodeBool(w, r, "js-enabled")
24✔
3455
        if err != nil {
24✔
3456
                return
×
3457
        }
×
3458
        if jsEnabled {
24✔
3459
                s.Warnf("Healthcheck: js-enabled deprecated, use js-enabled-only instead")
×
3460
        }
×
3461
        jsEnabledOnly, err := decodeBool(w, r, "js-enabled-only")
24✔
3462
        if err != nil {
24✔
3463
                return
×
3464
        }
×
3465
        jsServerOnly, err := decodeBool(w, r, "js-server-only")
24✔
3466
        if err != nil {
24✔
3467
                return
×
3468
        }
×
3469
        jsMetaOnly, err := decodeBool(w, r, "js-meta-only")
24✔
3470
        if err != nil {
24✔
3471
                return
×
3472
        }
×
3473

3474
        includeDetails, err := decodeBool(w, r, "details")
24✔
3475
        if err != nil {
24✔
3476
                return
×
3477
        }
×
3478

3479
        hs := s.healthz(&HealthzOptions{
24✔
3480
                JSEnabled:     jsEnabled,
24✔
3481
                JSEnabledOnly: jsEnabledOnly,
24✔
3482
                JSServerOnly:  jsServerOnly,
24✔
3483
                JSMetaOnly:    jsMetaOnly,
24✔
3484
                Account:       r.URL.Query().Get("account"),
24✔
3485
                Stream:        r.URL.Query().Get("stream"),
24✔
3486
                Consumer:      r.URL.Query().Get("consumer"),
24✔
3487
                Details:       includeDetails,
24✔
3488
        })
24✔
3489

24✔
3490
        code := hs.StatusCode
24✔
3491
        if hs.Error != _EMPTY_ {
34✔
3492
                s.Warnf("Healthcheck failed: %q", hs.Error)
10✔
3493
        } else if len(hs.Errors) != 0 {
28✔
3494
                s.Warnf("Healthcheck failed: %d errors", len(hs.Errors))
4✔
3495
        }
4✔
3496
        // Remove StatusCode from JSON representation when responding via HTTP
3497
        // since this is already in the response.
3498
        hs.StatusCode = 0
24✔
3499
        b, err := json.Marshal(hs)
24✔
3500
        if err != nil {
24✔
3501
                s.Errorf("Error marshaling response to /healthz request: %v", err)
×
3502
        }
×
3503

3504
        handleResponse(code, w, r, b)
24✔
3505
}
3506

3507
// Generate health status.
3508
func (s *Server) healthz(opts *HealthzOptions) *HealthStatus {
3,635✔
3509
        var health = &HealthStatus{Status: "ok"}
3,635✔
3510

3,635✔
3511
        // set option defaults
3,635✔
3512
        if opts == nil {
7,134✔
3513
                opts = &HealthzOptions{}
3,499✔
3514
        }
3,499✔
3515
        details := opts.Details
3,635✔
3516
        defer func() {
7,270✔
3517
                // for response with details enabled, set status to either "error" or "ok"
3,635✔
3518
                if details {
3,662✔
3519
                        if len(health.Errors) != 0 {
52✔
3520
                                health.Status = "error"
25✔
3521
                        } else {
27✔
3522
                                health.Status = "ok"
2✔
3523
                        }
2✔
3524
                }
3525
                // if no specific status code was set, set it based on the presence of errors
3526
                if health.StatusCode == 0 {
7,209✔
3527
                        if health.Error != _EMPTY_ || len(health.Errors) != 0 {
7,001✔
3528
                                health.StatusCode = http.StatusServiceUnavailable
3,427✔
3529
                        } else {
3,574✔
3530
                                health.StatusCode = http.StatusOK
147✔
3531
                        }
147✔
3532
                }
3533
        }()
3534

3535
        if opts.Account == _EMPTY_ && opts.Stream != _EMPTY_ {
3,645✔
3536
                health.StatusCode = http.StatusBadRequest
10✔
3537
                if !details {
15✔
3538
                        health.Status = "error"
5✔
3539
                        health.Error = fmt.Sprintf("%q must not be empty when checking stream health", "account")
5✔
3540
                } else {
10✔
3541
                        health.Errors = append(health.Errors, HealthzError{
5✔
3542
                                Type:  HealthzErrorBadRequest,
5✔
3543
                                Error: fmt.Sprintf("%q must not be empty when checking stream health", "account"),
5✔
3544
                        })
5✔
3545
                }
5✔
3546
                return health
10✔
3547
        }
3548

3549
        if opts.Stream == _EMPTY_ && opts.Consumer != _EMPTY_ {
3,627✔
3550
                health.StatusCode = http.StatusBadRequest
2✔
3551
                if !details {
3✔
3552
                        health.Status = "error"
1✔
3553
                        health.Error = fmt.Sprintf("%q must not be empty when checking consumer health", "stream")
1✔
3554
                } else {
2✔
3555
                        health.Errors = append(health.Errors, HealthzError{
1✔
3556
                                Type:  HealthzErrorBadRequest,
1✔
3557
                                Error: fmt.Sprintf("%q must not be empty when checking consumer health", "stream"),
1✔
3558
                        })
1✔
3559
                }
1✔
3560
                return health
2✔
3561
        }
3562

3563
        if err := s.readyForConnections(time.Millisecond); err != nil {
3,629✔
3564
                health.StatusCode = http.StatusInternalServerError
6✔
3565
                health.Status = "error"
6✔
3566
                if !details {
12✔
3567
                        health.Error = err.Error()
6✔
3568
                } else {
6✔
3569
                        health.Errors = append(health.Errors, HealthzError{
×
3570
                                Type:  HealthzErrorConn,
×
3571
                                Error: err.Error(),
×
3572
                        })
×
3573
                }
×
3574
                return health
6✔
3575
        }
3576

3577
        // If JSServerOnly is true, then do not check further accounts, streams and consumers.
3578
        if opts.JSServerOnly {
3,630✔
3579
                return health
13✔
3580
        }
13✔
3581

3582
        sopts := s.getOpts()
3,604✔
3583

3,604✔
3584
        // If JS is not enabled in the config, we stop.
3,604✔
3585
        if !sopts.JetStream {
3,620✔
3586
                return health
16✔
3587
        }
16✔
3588

3589
        // Access the Jetstream state to perform additional checks.
3590
        js := s.getJetStream()
3,588✔
3591
        const na = "unavailable"
3,588✔
3592
        if !js.isEnabled() {
3,597✔
3593
                health.StatusCode = http.StatusServiceUnavailable
9✔
3594
                health.Status = na
9✔
3595
                if !details {
14✔
3596
                        health.Error = NewJSNotEnabledError().Error()
5✔
3597
                } else {
9✔
3598
                        health.Errors = append(health.Errors, HealthzError{
4✔
3599
                                Type:  HealthzErrorJetStream,
4✔
3600
                                Error: NewJSNotEnabledError().Error(),
4✔
3601
                        })
4✔
3602
                }
4✔
3603
                return health
9✔
3604
        }
3605
        // Only check if JS is enabled, skip meta and asset check.
3606
        if opts.JSEnabledOnly || opts.JSEnabled {
3,584✔
3607
                return health
5✔
3608
        }
5✔
3609

3610
        // Clustered JetStream
3611
        js.mu.RLock()
3,574✔
3612
        cc := js.cluster
3,574✔
3613
        js.mu.RUnlock()
3,574✔
3614

3,574✔
3615
        // Currently single server we make sure the streams were recovered.
3,574✔
3616
        if cc == nil {
3,591✔
3617
                sdir := js.config.StoreDir
17✔
3618
                // Whip through account folders and pull each stream name.
17✔
3619
                fis, _ := os.ReadDir(sdir)
17✔
3620
                var accFound, streamFound, consumerFound bool
17✔
3621
                for _, fi := range fis {
36✔
3622
                        if fi.Name() == snapStagingDir {
21✔
3623
                                continue
2✔
3624
                        }
3625
                        if opts.Account != _EMPTY_ {
26✔
3626
                                if fi.Name() != opts.Account {
11✔
3627
                                        continue
2✔
3628
                                }
3629
                                accFound = true
7✔
3630
                        }
3631
                        acc, err := s.LookupAccount(fi.Name())
15✔
3632
                        if err != nil {
15✔
3633
                                if !details {
×
3634
                                        health.Status = na
×
3635
                                        health.Error = fmt.Sprintf("JetStream account '%s' could not be resolved", fi.Name())
×
3636
                                        return health
×
3637
                                }
×
3638
                                health.Errors = append(health.Errors, HealthzError{
×
3639
                                        Type:    HealthzErrorAccount,
×
3640
                                        Account: fi.Name(),
×
3641
                                        Error:   fmt.Sprintf("JetStream account '%s' could not be resolved", fi.Name()),
×
3642
                                })
×
3643
                                continue
×
3644
                        }
3645
                        sfis, _ := os.ReadDir(filepath.Join(sdir, fi.Name(), "streams"))
15✔
3646
                        for _, sfi := range sfis {
32✔
3647
                                if opts.Stream != _EMPTY_ {
23✔
3648
                                        if sfi.Name() != opts.Stream {
8✔
3649
                                                continue
2✔
3650
                                        }
3651
                                        streamFound = true
4✔
3652
                                }
3653
                                stream := sfi.Name()
15✔
3654
                                s, err := acc.lookupStream(stream)
15✔
3655
                                if err != nil {
16✔
3656
                                        if !details {
2✔
3657
                                                health.Status = na
1✔
3658
                                                health.Error = fmt.Sprintf("JetStream stream '%s > %s' could not be recovered", acc, stream)
1✔
3659
                                                return health
1✔
3660
                                        }
1✔
3661
                                        health.Errors = append(health.Errors, HealthzError{
×
3662
                                                Type:    HealthzErrorStream,
×
3663
                                                Account: acc.Name,
×
3664
                                                Stream:  stream,
×
3665
                                                Error:   fmt.Sprintf("JetStream stream '%s > %s' could not be recovered", acc, stream),
×
3666
                                        })
×
3667
                                        continue
×
3668
                                }
3669
                                if streamFound {
18✔
3670
                                        // if consumer option is passed, verify that the consumer exists on stream
4✔
3671
                                        if opts.Consumer != _EMPTY_ {
7✔
3672
                                                for _, cons := range s.consumers {
6✔
3673
                                                        if cons.name == opts.Consumer {
4✔
3674
                                                                consumerFound = true
1✔
3675
                                                                break
1✔
3676
                                                        }
3677
                                                }
3678
                                        }
3679
                                        break
4✔
3680
                                }
3681
                        }
3682
                        if accFound {
21✔
3683
                                break
7✔
3684
                        }
3685
                }
3686
                if opts.Account != _EMPTY_ && !accFound {
18✔
3687
                        health.StatusCode = http.StatusNotFound
2✔
3688
                        if !details {
3✔
3689
                                health.Status = na
1✔
3690
                                health.Error = fmt.Sprintf("JetStream account %q not found", opts.Account)
1✔
3691
                        } else {
2✔
3692
                                health.Errors = []HealthzError{
1✔
3693
                                        {
1✔
3694
                                                Type:    HealthzErrorAccount,
1✔
3695
                                                Account: opts.Account,
1✔
3696
                                                Error:   fmt.Sprintf("JetStream account %q not found", opts.Account),
1✔
3697
                                        },
1✔
3698
                                }
1✔
3699
                        }
1✔
3700
                        return health
2✔
3701
                }
3702
                if opts.Stream != _EMPTY_ && !streamFound {
16✔
3703
                        health.StatusCode = http.StatusNotFound
2✔
3704
                        if !details {
3✔
3705
                                health.Status = na
1✔
3706
                                health.Error = fmt.Sprintf("JetStream stream %q not found on account %q", opts.Stream, opts.Account)
1✔
3707
                        } else {
2✔
3708
                                health.Errors = []HealthzError{
1✔
3709
                                        {
1✔
3710
                                                Type:    HealthzErrorStream,
1✔
3711
                                                Account: opts.Account,
1✔
3712
                                                Stream:  opts.Stream,
1✔
3713
                                                Error:   fmt.Sprintf("JetStream stream %q not found on account %q", opts.Stream, opts.Account),
1✔
3714
                                        },
1✔
3715
                                }
1✔
3716
                        }
1✔
3717
                        return health
2✔
3718
                }
3719
                if opts.Consumer != _EMPTY_ && !consumerFound {
14✔
3720
                        health.StatusCode = http.StatusNotFound
2✔
3721
                        if !details {
3✔
3722
                                health.Status = na
1✔
3723
                                health.Error = fmt.Sprintf("JetStream consumer %q not found for stream %q on account %q", opts.Consumer, opts.Stream, opts.Account)
1✔
3724
                        } else {
2✔
3725
                                health.Errors = []HealthzError{
1✔
3726
                                        {
1✔
3727
                                                Type:     HealthzErrorConsumer,
1✔
3728
                                                Account:  opts.Account,
1✔
3729
                                                Stream:   opts.Stream,
1✔
3730
                                                Consumer: opts.Consumer,
1✔
3731
                                                Error:    fmt.Sprintf("JetStream consumer %q not found for stream %q on account %q", opts.Consumer, opts.Stream, opts.Account),
1✔
3732
                                        },
1✔
3733
                                }
1✔
3734
                        }
1✔
3735
                }
3736
                return health
12✔
3737
        }
3738

3739
        // If we are here we want to check for any assets assigned to us.
3740
        var meta RaftNode
3,557✔
3741
        js.mu.RLock()
3,557✔
3742
        meta = cc.meta
3,557✔
3743
        js.mu.RUnlock()
3,557✔
3744

3,557✔
3745
        // If no meta leader.
3,557✔
3746
        if meta == nil || meta.GroupLeader() == _EMPTY_ {
6,705✔
3747
                if !details {
6,296✔
3748
                        health.Status = na
3,148✔
3749
                        health.Error = "JetStream has not established contact with a meta leader"
3,148✔
3750
                } else {
3,148✔
3751
                        health.Errors = []HealthzError{
×
3752
                                {
×
3753
                                        Type:  HealthzErrorJetStream,
×
3754
                                        Error: "JetStream has not established contact with a meta leader",
×
3755
                                },
×
3756
                        }
×
3757
                }
×
3758
                return health
3,148✔
3759
        }
3760

3761
        // If we are not current with the meta leader.
3762
        if !meta.Healthy() {
597✔
3763
                if !details {
376✔
3764
                        health.Status = na
188✔
3765
                        health.Error = "JetStream is not current with the meta leader"
188✔
3766
                } else {
188✔
3767
                        health.Errors = []HealthzError{
×
3768
                                {
×
3769
                                        Type:  HealthzErrorJetStream,
×
3770
                                        Error: "JetStream is not current with the meta leader",
×
3771
                                },
×
3772
                        }
×
3773
                }
×
3774
                return health
188✔
3775
        }
3776

3777
        // Are we still recovering meta layer?
3778
        if js.isMetaRecovering() {
221✔
3779
                if !details {
×
3780
                        health.Status = na
×
3781
                        health.Error = "JetStream is still recovering meta layer"
×
3782

×
3783
                } else {
×
3784
                        health.Errors = []HealthzError{
×
3785
                                {
×
3786
                                        Type:  HealthzErrorJetStream,
×
3787
                                        Error: "JetStream is still recovering meta layer",
×
3788
                                },
×
3789
                        }
×
3790
                }
×
3791
                return health
×
3792
        }
3793

3794
        // Skips doing full healthz and only checks the meta leader.
3795
        if opts.JSMetaOnly {
223✔
3796
                return health
2✔
3797
        }
2✔
3798

3799
        // Range across all accounts, the streams assigned to them, and the consumers.
3800
        // If they are assigned to this server check their status.
3801
        ourID := meta.ID()
219✔
3802

219✔
3803
        // Copy the meta layer so we do not need to hold the js read lock for an extended period of time.
219✔
3804
        var streams map[string]map[string]*streamAssignment
219✔
3805
        js.mu.RLock()
219✔
3806
        if opts.Account == _EMPTY_ {
396✔
3807
                // Collect all relevant streams and consumers.
177✔
3808
                streams = make(map[string]map[string]*streamAssignment, len(cc.streams))
177✔
3809
                for acc, asa := range cc.streams {
348✔
3810
                        nasa := make(map[string]*streamAssignment)
171✔
3811
                        for stream, sa := range asa {
1,542✔
3812
                                // If we are a member and we are not being restored, select for check.
1,371✔
3813
                                if sa.Group.isMember(ourID) && sa.Restore == nil {
2,735✔
3814
                                        csa := sa.copyGroup()
1,364✔
3815
                                        csa.consumers = make(map[string]*consumerAssignment)
1,364✔
3816
                                        for consumer, ca := range sa.consumers {
1,479✔
3817
                                                if ca.Group.isMember(ourID) {
226✔
3818
                                                        // Use original here. Not a copy.
111✔
3819
                                                        csa.consumers[consumer] = ca
111✔
3820
                                                }
111✔
3821
                                        }
3822
                                        nasa[stream] = csa
1,364✔
3823
                                }
3824
                        }
3825
                        streams[acc] = nasa
171✔
3826
                }
3827
        } else {
42✔
3828
                streams = make(map[string]map[string]*streamAssignment, 1)
42✔
3829
                asa, ok := cc.streams[opts.Account]
42✔
3830
                if !ok {
50✔
3831
                        health.StatusCode = http.StatusNotFound
8✔
3832
                        if !details {
12✔
3833
                                health.Status = na
4✔
3834
                                health.Error = fmt.Sprintf("JetStream account %q not found", opts.Account)
4✔
3835
                        } else {
8✔
3836
                                health.Errors = []HealthzError{
4✔
3837
                                        {
4✔
3838
                                                Type:    HealthzErrorAccount,
4✔
3839
                                                Account: opts.Account,
4✔
3840
                                                Error:   fmt.Sprintf("JetStream account %q not found", opts.Account),
4✔
3841
                                        },
4✔
3842
                                }
4✔
3843
                        }
4✔
3844
                        js.mu.RUnlock()
8✔
3845
                        return health
8✔
3846
                }
3847
                nasa := make(map[string]*streamAssignment)
34✔
3848
                if opts.Stream != _EMPTY_ {
64✔
3849
                        sa, ok := asa[opts.Stream]
30✔
3850
                        if !ok || !sa.Group.isMember(ourID) {
40✔
3851
                                health.StatusCode = http.StatusNotFound
10✔
3852
                                if !details {
16✔
3853
                                        health.Status = na
6✔
3854
                                        health.Error = fmt.Sprintf("JetStream stream %q not found on account %q", opts.Stream, opts.Account)
6✔
3855
                                } else {
10✔
3856
                                        health.Errors = []HealthzError{
4✔
3857
                                                {
4✔
3858
                                                        Type:    HealthzErrorStream,
4✔
3859
                                                        Account: opts.Account,
4✔
3860
                                                        Stream:  opts.Stream,
4✔
3861
                                                        Error:   fmt.Sprintf("JetStream stream %q not found on account %q", opts.Stream, opts.Account),
4✔
3862
                                                },
4✔
3863
                                        }
4✔
3864
                                }
4✔
3865
                                js.mu.RUnlock()
10✔
3866
                                return health
10✔
3867
                        }
3868
                        csa := sa.copyGroup()
20✔
3869
                        csa.consumers = make(map[string]*consumerAssignment)
20✔
3870
                        var consumerFound bool
20✔
3871
                        for consumer, ca := range sa.consumers {
40✔
3872
                                if opts.Consumer != _EMPTY_ {
35✔
3873
                                        if consumer != opts.Consumer || !ca.Group.isMember(ourID) {
25✔
3874
                                                continue
10✔
3875
                                        }
3876
                                        consumerFound = true
5✔
3877
                                }
3878
                                // If we are a member and we are not being restored, select for check.
3879
                                if sa.Group.isMember(ourID) && sa.Restore == nil {
20✔
3880
                                        csa.consumers[consumer] = ca
10✔
3881
                                }
10✔
3882
                                if consumerFound {
15✔
3883
                                        break
5✔
3884
                                }
3885
                        }
3886
                        if opts.Consumer != _EMPTY_ && !consumerFound {
30✔
3887
                                health.StatusCode = http.StatusNotFound
10✔
3888
                                if !details {
16✔
3889
                                        health.Status = na
6✔
3890
                                        health.Error = fmt.Sprintf("JetStream consumer %q not found for stream %q on account %q", opts.Consumer, opts.Stream, opts.Account)
6✔
3891
                                } else {
10✔
3892
                                        health.Errors = []HealthzError{
4✔
3893
                                                {
4✔
3894
                                                        Type:     HealthzErrorConsumer,
4✔
3895
                                                        Account:  opts.Account,
4✔
3896
                                                        Stream:   opts.Stream,
4✔
3897
                                                        Consumer: opts.Consumer,
4✔
3898
                                                        Error:    fmt.Sprintf("JetStream consumer %q not found for stream %q on account %q", opts.Consumer, opts.Stream, opts.Account),
4✔
3899
                                                },
4✔
3900
                                        }
4✔
3901
                                }
4✔
3902
                                js.mu.RUnlock()
10✔
3903
                                return health
10✔
3904
                        }
3905
                        nasa[opts.Stream] = csa
10✔
3906
                } else {
4✔
3907
                        for stream, sa := range asa {
8✔
3908
                                // If we are a member and we are not being restored, select for check.
4✔
3909
                                if sa.Group.isMember(ourID) && sa.Restore == nil {
8✔
3910
                                        csa := sa.copyGroup()
4✔
3911
                                        csa.consumers = make(map[string]*consumerAssignment)
4✔
3912
                                        for consumer, ca := range sa.consumers {
8✔
3913
                                                if ca.Group.isMember(ourID) {
8✔
3914
                                                        csa.consumers[consumer] = ca
4✔
3915
                                                }
4✔
3916
                                        }
3917
                                        nasa[stream] = csa
4✔
3918
                                }
3919
                        }
3920
                }
3921
                streams[opts.Account] = nasa
14✔
3922
        }
3923
        js.mu.RUnlock()
191✔
3924

191✔
3925
        // Use our copy to traverse so we do not need to hold the js lock.
191✔
3926
        for accName, asa := range streams {
376✔
3927
                acc, err := s.LookupAccount(accName)
185✔
3928
                if err != nil && len(asa) > 0 {
185✔
3929
                        if !details {
×
3930
                                health.Status = na
×
3931
                                health.Error = fmt.Sprintf("JetStream can not lookup account %q: %v", accName, err)
×
3932
                                return health
×
3933
                        }
×
3934
                        health.Errors = append(health.Errors, HealthzError{
×
3935
                                Type:    HealthzErrorAccount,
×
3936
                                Account: accName,
×
3937
                                Error:   fmt.Sprintf("JetStream can not lookup account %q: %v", accName, err),
×
3938
                        })
×
3939
                        continue
×
3940
                }
3941

3942
                for stream, sa := range asa {
1,145✔
3943
                        if sa != nil && sa.unsupported != nil {
972✔
3944
                                continue
12✔
3945
                        }
3946
                        // Make sure we can look up
3947
                        if err := js.isStreamHealthy(acc, sa); err != nil {
1,031✔
3948
                                if !details {
166✔
3949
                                        health.Status = na
83✔
3950
                                        health.Error = fmt.Sprintf("JetStream stream '%s > %s' is not current: %s", accName, stream, err)
83✔
3951
                                        return health
83✔
3952
                                }
83✔
3953
                                health.Errors = append(health.Errors, HealthzError{
×
3954
                                        Type:    HealthzErrorStream,
×
3955
                                        Account: accName,
×
3956
                                        Stream:  stream,
×
3957
                                        Error:   fmt.Sprintf("JetStream stream '%s > %s' is not current: %s", accName, stream, err),
×
3958
                                })
×
3959
                                continue
×
3960
                        }
3961
                        mset, _ := acc.lookupStream(stream)
865✔
3962
                        // Now check consumers.
865✔
3963
                        for consumer, ca := range sa.consumers {
947✔
3964
                                if err := js.isConsumerHealthy(mset, consumer, ca); err != nil {
89✔
3965
                                        if !details {
14✔
3966
                                                health.Status = na
7✔
3967
                                                health.Error = fmt.Sprintf("JetStream consumer '%s > %s > %s' is not current: %s", acc, stream, consumer, err)
7✔
3968
                                                return health
7✔
3969
                                        }
7✔
3970
                                        health.Errors = append(health.Errors, HealthzError{
×
3971
                                                Type:     HealthzErrorConsumer,
×
3972
                                                Account:  accName,
×
3973
                                                Stream:   stream,
×
3974
                                                Consumer: consumer,
×
3975
                                                Error:    fmt.Sprintf("JetStream consumer '%s > %s > %s' is not current: %s", acc, stream, consumer, err),
×
3976
                                        })
×
3977
                                }
3978
                        }
3979
                }
3980
        }
3981
        // Success.
3982
        return health
101✔
3983
}
3984

3985
// ExpvarzStatus carries the raw JSON text of the "memstats" and "cmdline"
// variables published through the expvar package.
type ExpvarzStatus struct {
	// Memstats holds the JSON text of the expvar "memstats" variable.
	Memstats json.RawMessage `json:"memstats"`

	// Cmdline holds the JSON text of the expvar "cmdline" variable.
	Cmdline json.RawMessage `json:"cmdline"`
}
3989

3990
func (s *Server) expvarz(_ *ExpvarzEventOptions) *ExpvarzStatus {
2✔
3991
        var stat ExpvarzStatus
2✔
3992

2✔
3993
        const memStatsKey = "memstats"
2✔
3994
        const cmdLineKey = "cmdline"
2✔
3995

2✔
3996
        expvar.Do(func(v expvar.KeyValue) {
6✔
3997
                switch v.Key {
4✔
3998
                case memStatsKey:
2✔
3999
                        stat.Memstats = json.RawMessage(v.Value.String())
2✔
4000

4001
                case cmdLineKey:
2✔
4002
                        stat.Cmdline = json.RawMessage(v.Value.String())
2✔
4003
                }
4004
        })
4005

4006
        return &stat
2✔
4007
}
4008

4009
// ProfilezStatus is the result of a profile request: either the captured
// profile bytes or a description of why the profile could not be produced.
type ProfilezStatus struct {
	// Profile holds the captured profile output.
	Profile []byte `json:"profile"`

	// Error is set instead of Profile when the request fails.
	Error string `json:"error"`
}
4013

4014
func (s *Server) profilez(opts *ProfilezOptions) *ProfilezStatus {
12✔
4015
        var buffer bytes.Buffer
12✔
4016
        switch opts.Name {
12✔
4017
        case _EMPTY_:
×
4018
                return &ProfilezStatus{
×
4019
                        Error: "Profile name not specified",
×
4020
                }
×
4021
        case "cpu":
2✔
4022
                if opts.Duration <= 0 || opts.Duration > 15*time.Second {
2✔
4023
                        return &ProfilezStatus{
×
4024
                                Error: fmt.Sprintf("Duration %s should be between 0s and 15s", opts.Duration),
×
4025
                        }
×
4026
                }
×
4027
                if err := pprof.StartCPUProfile(&buffer); err != nil {
2✔
4028
                        return &ProfilezStatus{
×
4029
                                Error: fmt.Sprintf("Failed to start CPU profile: %s", err),
×
4030
                        }
×
4031
                }
×
4032
                time.Sleep(opts.Duration)
2✔
4033
                pprof.StopCPUProfile()
2✔
4034
        default:
10✔
4035
                profile := pprof.Lookup(opts.Name)
10✔
4036
                if profile == nil {
10✔
4037
                        return &ProfilezStatus{
×
4038
                                Error: fmt.Sprintf("Profile %q not found", opts.Name),
×
4039
                        }
×
4040
                }
×
4041
                if err := profile.WriteTo(&buffer, opts.Debug); err != nil {
10✔
4042
                        return &ProfilezStatus{
×
4043
                                Error: fmt.Sprintf("Profile %q error: %s", opts.Name, err),
×
4044
                        }
×
4045
                }
×
4046
        }
4047
        return &ProfilezStatus{
12✔
4048
                Profile: buffer.Bytes(),
12✔
4049
        }
12✔
4050
}
4051

4052
type RaftzGroup struct {
4053
        ID            string                    `json:"id"`
4054
        State         string                    `json:"state"`
4055
        Size          int                       `json:"size"`
4056
        QuorumNeeded  int                       `json:"quorum_needed"`
4057
        Observer      bool                      `json:"observer,omitempty"`
4058
        Paused        bool                      `json:"paused,omitempty"`
4059
        Committed     uint64                    `json:"committed"`
4060
        Applied       uint64                    `json:"applied"`
4061
        CatchingUp    bool                      `json:"catching_up,omitempty"`
4062
        Leader        string                    `json:"leader,omitempty"`
4063
        LeaderSince   *time.Time                `json:"leader_since,omitempty"`
4064
        EverHadLeader bool                      `json:"ever_had_leader"`
4065
        Term          uint64                    `json:"term"`
4066
        Vote          string                    `json:"voted_for,omitempty"`
4067
        PTerm         uint64                    `json:"pterm"`
4068
        PIndex        uint64                    `json:"pindex"`
4069
        SystemAcc     bool                      `json:"system_account"`
4070
        TrafficAcc    string                    `json:"traffic_account"`
4071
        IPQPropLen    int                       `json:"ipq_proposal_len"`
4072
        IPQEntryLen   int                       `json:"ipq_entry_len"`
4073
        IPQRespLen    int                       `json:"ipq_resp_len"`
4074
        IPQApplyLen   int                       `json:"ipq_apply_len"`
4075
        WAL           StreamState               `json:"wal"`
4076
        WALError      error                     `json:"wal_error,omitempty"`
4077
        Peers         map[string]RaftzGroupPeer `json:"peers"`
4078
}
4079

4080
// RaftzGroupPeer describes one remote peer of a Raft group as tracked by the
// local node.
type RaftzGroupPeer struct {
	// Name is the server name resolved for the peer's node ID.
	Name string `json:"name"`

	// Known mirrors the peer's kp flag on the local node.
	Known bool `json:"known"`

	// LastReplicatedIndex mirrors the peer's li value; omitted when zero.
	LastReplicatedIndex uint64 `json:"last_replicated_index,omitempty"`

	// LastSeen is the elapsed time since the peer's last recorded timestamp,
	// formatted as a duration string. It is omitted when no timestamp has
	// been recorded.
	LastSeen string `json:"last_seen,omitempty"`
}
4086

4087
// RaftzStatus is the result of a Raftz query: account name -> Raft group
// name -> snapshot of that group.
type RaftzStatus map[string]map[string]RaftzGroup
4088

4089
func (s *Server) HandleRaftz(w http.ResponseWriter, r *http.Request) {
×
4090
        if s.raftNodes == nil {
×
4091
                w.WriteHeader(404)
×
4092
                w.Write([]byte("No Raft nodes registered"))
×
4093
                return
×
4094
        }
×
4095

4096
        groups := s.Raftz(&RaftzOptions{
×
4097
                AccountFilter: r.URL.Query().Get("acc"),
×
4098
                GroupFilter:   r.URL.Query().Get("group"),
×
4099
        })
×
4100

×
4101
        if groups == nil {
×
4102
                w.WriteHeader(404)
×
4103
                w.Write([]byte("No Raft nodes returned, check supplied filters"))
×
4104
                return
×
4105
        }
×
4106

4107
        b, _ := json.MarshalIndent(groups, "", "   ")
×
4108
        ResponseHandler(w, r, b)
×
4109
}
4110

4111
func (s *Server) Raftz(opts *RaftzOptions) *RaftzStatus {
11✔
4112
        afilter, gfilter := opts.AccountFilter, opts.GroupFilter
11✔
4113

11✔
4114
        if afilter == _EMPTY_ {
13✔
4115
                if sys := s.SystemAccount(); sys != nil {
4✔
4116
                        afilter = sys.Name
2✔
4117
                } else {
2✔
4118
                        return nil
×
4119
                }
×
4120
        }
4121

4122
        groups := map[string]RaftNode{}
11✔
4123
        infos := RaftzStatus{} // account -> group ID
11✔
4124

11✔
4125
        s.rnMu.RLock()
11✔
4126
        if gfilter != _EMPTY_ {
11✔
4127
                if rg, ok := s.raftNodes[gfilter]; ok && rg != nil {
×
4128
                        if n, ok := rg.(*raft); ok {
×
4129
                                if n.accName == afilter {
×
4130
                                        groups[gfilter] = rg
×
4131
                                }
×
4132
                        }
4133
                }
4134
        } else {
11✔
4135
                for name, rg := range s.raftNodes {
31✔
4136
                        if rg == nil {
20✔
4137
                                continue
×
4138
                        }
4139
                        if n, ok := rg.(*raft); ok {
40✔
4140
                                if n.accName != afilter {
29✔
4141
                                        continue
9✔
4142
                                }
4143
                                groups[name] = rg
11✔
4144
                        }
4145
                }
4146
        }
4147
        s.rnMu.RUnlock()
11✔
4148

11✔
4149
        for name, rg := range groups {
22✔
4150
                n, ok := rg.(*raft)
11✔
4151
                if n == nil || !ok {
11✔
4152
                        continue
×
4153
                }
4154
                if _, ok := infos[n.accName]; !ok {
22✔
4155
                        infos[n.accName] = map[string]RaftzGroup{}
11✔
4156
                }
11✔
4157
                // Only take the lock once, using the public RaftNode functions would
4158
                // cause us to take and release the locks over and over again.
4159
                n.RLock()
11✔
4160
                info := RaftzGroup{
11✔
4161
                        ID:            n.id,
11✔
4162
                        State:         RaftState(n.state.Load()).String(),
11✔
4163
                        Size:          n.csz,
11✔
4164
                        QuorumNeeded:  n.qn,
11✔
4165
                        Observer:      n.observer,
11✔
4166
                        Paused:        n.paused,
11✔
4167
                        Committed:     n.commit,
11✔
4168
                        Applied:       n.applied,
11✔
4169
                        CatchingUp:    n.catchup != nil,
11✔
4170
                        Leader:        n.leader,
11✔
4171
                        LeaderSince:   n.leaderSince.Load(),
11✔
4172
                        EverHadLeader: n.pleader.Load(),
11✔
4173
                        Term:          n.term,
11✔
4174
                        Vote:          n.vote,
11✔
4175
                        PTerm:         n.pterm,
11✔
4176
                        PIndex:        n.pindex,
11✔
4177
                        SystemAcc:     n.IsSystemAccount(),
11✔
4178
                        TrafficAcc:    n.acc.GetName(),
11✔
4179
                        IPQPropLen:    n.prop.len(),
11✔
4180
                        IPQEntryLen:   n.entry.len(),
11✔
4181
                        IPQRespLen:    n.resp.len(),
11✔
4182
                        IPQApplyLen:   n.apply.len(),
11✔
4183
                        WALError:      n.werr,
11✔
4184
                        Peers:         map[string]RaftzGroupPeer{},
11✔
4185
                }
11✔
4186
                n.wal.FastState(&info.WAL)
11✔
4187
                for id, p := range n.peers {
44✔
4188
                        if id == n.id {
44✔
4189
                                continue
11✔
4190
                        }
4191
                        peer := RaftzGroupPeer{
22✔
4192
                                Name:                s.serverNameForNode(id),
22✔
4193
                                Known:               p.kp,
22✔
4194
                                LastReplicatedIndex: p.li,
22✔
4195
                        }
22✔
4196
                        if !p.ts.IsZero() {
36✔
4197
                                peer.LastSeen = time.Since(p.ts).String()
14✔
4198
                        }
14✔
4199
                        info.Peers[id] = peer
22✔
4200
                }
4201
                n.RUnlock()
11✔
4202
                infos[n.accName][name] = info
11✔
4203
        }
4204

4205
        return &infos
11✔
4206
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc