• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 19188141845

07 Nov 2025 01:16PM UTC coverage: 84.586% (-1.4%) from 86.033%
19188141845

push

github

web-flow
Add meta snapshot metrics to jsz monitoring (#7524)

Exposes snapshot-related metrics of the JetStream meta cluster under `/jsz`:

```js
"meta_cluster": {
    "pending": 0,
    "snapshot": {
        "pending_entries": 1,
        "pending_size": 1314,
        "last_time": "2025-11-06T18:14:40.659678019Z", // UTC
        "last_duration": 161096363
    }
}
```

Signed-off-by: Waldemar Quevedo <wally@nats.io>

73649 of 87070 relevant lines covered (84.59%)

340562.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.3
/server/server.go
1
// Copyright 2012-2025 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bytes"
18
        "context"
19
        "crypto/fips140"
20
        "crypto/tls"
21
        "encoding/json"
22
        "errors"
23
        "flag"
24
        "fmt"
25
        "io"
26
        "log"
27
        "math/rand"
28
        "net"
29
        "net/http"
30
        "net/url"
31
        "os"
32
        "path"
33
        "path/filepath"
34
        "reflect"
35
        "regexp"
36
        "runtime"
37
        "runtime/pprof"
38
        "strconv"
39
        "strings"
40
        "sync"
41
        "sync/atomic"
42
        "time"
43

44
        // Allow dynamic profiling.
45
        _ "net/http/pprof"
46

47
        "expvar"
48

49
        "github.com/klauspost/compress/s2"
50
        "github.com/nats-io/jwt/v2"
51
        "github.com/nats-io/nats-server/v2/logger"
52
        "github.com/nats-io/nkeys"
53
        "github.com/nats-io/nuid"
54
)
55

56
// Delays before the first PING is sent on a newly accepted connection.
// Client connections wait twice as long as non-client connections.
const (
	firstPingInterval       = 1 * time.Second // non-client connections
	firstClientPingInterval = 2 * time.Second // client connections
)
63

64
// These are protocol versions sent between server connections: ROUTER, LEAF and
65
// GATEWAY. We may have protocol versions that have a meaning only for a certain
66
// type of connection, but we don't need separate enums for that.
67
// However, it is CRITICAL to not change the order of those constants since they
68
// are exchanged between servers. When adding a new protocol version, add to the
69
// end of the list, don't try to group them by connection types.
// The values are assigned with iota starting at 0, so each constant equals its
// position in this list and reordering the list changes the values.
70
const (
71
        // RouteProtoZero is the original Route protocol from 2009.
72
        // http://nats.io/documentation/internals/nats-protocol/
73
        RouteProtoZero = iota
74
        // RouteProtoInfo signals a route can receive more than the original INFO block.
75
        // This can be used to update remote cluster permissions, etc...
76
        RouteProtoInfo
77
        // RouteProtoV2 is the new route/cluster protocol that provides account support.
78
        RouteProtoV2
79
        // MsgTraceProto indicates that this server understands distributed message tracing.
        // It is the latest version: getServerProto returns it unless a test overrides it.
80
        MsgTraceProto
81
)
82

83
// Will return the latest server-to-server protocol versions, unless the
84
// option to override it is set.
85
func (s *Server) getServerProto() int {
10,030✔
86
        opts := s.getOpts()
10,030✔
87
        // Initialize with the latest protocol version.
10,030✔
88
        proto := MsgTraceProto
10,030✔
89
        // For tests, we want to be able to make this server behave
10,030✔
90
        // as an older server so check this option to see if we should override.
10,030✔
91
        if opts.overrideProto < 0 {
10,040✔
92
                // The option overrideProto is set to 0 by default (when creating an
10✔
93
                // Options structure). Since this is the same value than the original
10✔
94
                // proto RouteProtoZero, tests call setServerProtoForTest() with the
10✔
95
                // desired protocol level, which sets it as negative value equal to:
10✔
96
                // (wantedProto + 1) * -1. Here we compute back the real value.
10✔
97
                proto = (opts.overrideProto * -1) - 1
10✔
98
        }
10✔
99
        return proto
10,030✔
100
}
101

102
// setServerProtoForTest returns the value to store in Options.overrideProto
// so that getServerProto reports wantedProto. The value is encoded as
// -(wantedProto + 1) so that RouteProtoZero (0) can be told apart from the
// default "no override" value of 0. Used by tests only.
func setServerProtoForTest(wantedProto int) int {
	return -(wantedProto + 1)
}
5✔
106

107
// Info is the INFO protocol payload that this server sends to clients,
// routes, gateways and leaf nodes to describe itself. It is serialized as
// JSON: the `json` tags are the wire field names, and fields tagged
// `omitempty` are left out when they hold their zero value.
108
// Some fields only matter for a given connection type; see the route,
// gateway and leafnode specific sections below.
109
type Info struct {
110
        ID                string   `json:"server_id"`
111
        Name              string   `json:"server_name"`
112
        Version           string   `json:"version"`
113
        Proto             int      `json:"proto"`
114
        GitCommit         string   `json:"git_commit,omitempty"`
115
        GoVersion         string   `json:"go"`
116
        Host              string   `json:"host"`
117
        Port              int      `json:"port"`
118
        Headers           bool     `json:"headers"`
119
        AuthRequired      bool     `json:"auth_required,omitempty"`
120
        TLSRequired       bool     `json:"tls_required,omitempty"`
121
        TLSVerify         bool     `json:"tls_verify,omitempty"`
122
        TLSAvailable      bool     `json:"tls_available,omitempty"`
123
        MaxPayload        int32    `json:"max_payload"`
124
        JetStream         bool     `json:"jetstream,omitempty"`
125
        IP                string   `json:"ip,omitempty"`
126
        CID               uint64   `json:"client_id,omitempty"`
127
        ClientIP          string   `json:"client_ip,omitempty"`
128
        Nonce             string   `json:"nonce,omitempty"`
129
        Cluster           string   `json:"cluster,omitempty"`
130
        Dynamic           bool     `json:"cluster_dynamic,omitempty"`
131
        Domain            string   `json:"domain,omitempty"`
132
        ClientConnectURLs []string `json:"connect_urls,omitempty"`    // Contains URLs a client can connect to.
133
        WSConnectURLs     []string `json:"ws_connect_urls,omitempty"` // Contains URLs a ws client can connect to.
134
        LameDuckMode      bool     `json:"ldm,omitempty"`
135
        Compression       string   `json:"compression,omitempty"`
136
        ConnectInfo       bool     `json:"connect_info,omitempty"`   // When true this is the server INFO response to CONNECT
137
        RemoteAccount     string   `json:"remote_account,omitempty"` // Lets the client or leafnode side know the remote account that they bind to.
138
        IsSystemAccount   bool     `json:"acc_is_sys,omitempty"`     // Indicates if the account is a system account.
139
        JSApiLevel        int      `json:"api_lvl,omitempty"`
140

141
        // Route Specific
142
        Import        *SubjectPermission `json:"import,omitempty"`
143
        Export        *SubjectPermission `json:"export,omitempty"`
144
        LNOC          bool               `json:"lnoc,omitempty"`
145
        LNOCU         bool               `json:"lnocu,omitempty"`
146
        InfoOnConnect bool               `json:"info_on_connect,omitempty"` // When true the server will respond to CONNECT with an INFO
147
        RoutePoolSize int                `json:"route_pool_size,omitempty"`
148
        RoutePoolIdx  int                `json:"route_pool_idx,omitempty"`
149
        RouteAccount  string             `json:"route_account,omitempty"`
150
        RouteAccReqID string             `json:"route_acc_add_reqid,omitempty"`
151
        GossipMode    byte               `json:"gossip_mode,omitempty"`
152

153
        // Gateways Specific
154
        Gateway           string   `json:"gateway,omitempty"`             // Name of the origin Gateway (sent by gateway's INFO)
155
        GatewayURLs       []string `json:"gateway_urls,omitempty"`        // Gateway URLs in the originating cluster (sent by gateway's INFO)
156
        GatewayURL        string   `json:"gateway_url,omitempty"`         // Gateway URL on that server (sent by route's INFO)
157
        GatewayCmd        byte     `json:"gateway_cmd,omitempty"`         // Command code for the receiving server to know what to do
158
        GatewayCmdPayload []byte   `json:"gateway_cmd_payload,omitempty"` // Command payload when needed
159
        GatewayNRP        bool     `json:"gateway_nrp,omitempty"`         // Uses new $GNR. prefix for mapped replies
160
        GatewayIOM        bool     `json:"gateway_iom,omitempty"`         // Indicates that all accounts will be switched to InterestOnly mode "right away"
161

162
        // LeafNode Specific
163
        LeafNodeURLs []string `json:"leafnode_urls,omitempty"` // LeafNode URLs that the server can reconnect to.
164

165
        XKey string `json:"xkey,omitempty"` // Server's public x25519 key.
166
}
167

168
// Server is the main struct of the NATS server. It holds the options, the
// listeners, the connections (clients, routes, leafnodes, gateways) and the
// state of the various subsystems (JetStream, websocket, MQTT, OCSP, ...).
169
type Server struct {
170
        // Fields accessed with atomic operations need to be 64-bit aligned.
        // Keep them first: per the sync/atomic documentation, the first word
        // of an allocated struct can be relied upon to be 64-bit aligned.
171
        gcid uint64
172
        // How often user logon fails due to the issuer account not being pinned.
173
        pinnedAccFail uint64
174
        stats
175
        scStats
176
        staleStats
177
        mu                  sync.RWMutex
178
        reloadMu            sync.RWMutex // Write-locked when a config reload is taking place ONLY
179
        kp                  nkeys.KeyPair
180
        xkp                 nkeys.KeyPair
181
        xpub                string
182
        info                Info
183
        configFile          string
184
        optsMu              sync.RWMutex
185
        opts                *Options
186
        running             atomic.Bool
187
        shutdown            atomic.Bool
188
        listener            net.Listener
189
        listenerErr         error
190
        gacc                *Account
191
        sys                 *internal
192
        sysAcc              atomic.Pointer[Account]
193
        js                  atomic.Pointer[jetStream]
194
        isMetaLeader        atomic.Bool
195
        jsClustered         atomic.Bool
196
        accounts            sync.Map
197
        tmpAccounts         sync.Map // Temporarily stores accounts that are being built
198
        activeAccounts      int32
199
        accResolver         AccountResolver
200
        clients             map[uint64]*client
201
        routes              map[string][]*client
202
        remoteRoutePoolSize map[string]int                // Map for remote's configured route pool size
203
        routesPoolSize      int                           // Configured pool size
204
        routesReject        bool                          // During reload, we may want to reject adding routes until some conditions are met
205
        routesNoPool        int                           // Number of routes that don't use pooling (connecting to older server for instance)
206
        accRoutes           map[string]map[string]*client // Key is account name, value is key=remoteID/value=route connection
207
        accRouteByHash      sync.Map                      // Key is account name, value is nil or a pool index
208
        accAddedCh          chan struct{}
209
        accAddedReqID       string
210
        leafs               map[uint64]*client
211
        users               map[string]*User
212
        nkeys               map[string]*NkeyUser
213
        totalClients        uint64
214
        closed              *closedRingBuffer
215
        done                chan bool
216
        start               time.Time
217
        http                net.Listener
218
        httpHandler         http.Handler
219
        httpBasePath        string
220
        profiler            net.Listener
221
        httpReqStats        map[string]uint64
222
        routeListener       net.Listener
223
        routeListenerErr    error
224
        routeInfo           Info
225
        routeResolver       netResolver
226
        routesToSelf        map[string]struct{}
227
        routeTLSName        string
228
        leafNodeListener    net.Listener
229
        leafNodeListenerErr error
230
        leafNodeInfo        Info
231
        leafNodeInfoJSON    []byte
232
        leafURLsMap         refCountedUrlSet
233
        leafNodeOpts        struct {
234
                resolver    netResolver
235
                dialTimeout time.Duration
236
        }
237
        leafRemoteCfgs     []*leafNodeCfg
238
        leafRemoteAccounts sync.Map
239
        leafNodeEnabled    bool
240
        leafDisableConnect bool // Used in test only
241
        leafNoCluster      bool // Indicates that this server has only remotes and no cluster defined
242

243
        quitCh           chan struct{}
244
        startupComplete  chan struct{}
245
        shutdownComplete chan struct{}
246

247
        // Tracking Go routines
248
        grMu         sync.Mutex
249
        grTmpClients map[uint64]*client
250
        grRunning    bool
251
        grWG         sync.WaitGroup // to wait on various go routines
252

253
        cproto     int64     // number of clients supporting async INFO
254
        configTime time.Time // last time config was loaded
255

256
        logging struct {
257
                sync.RWMutex
258
                logger      Logger
259
                trace       int32
260
                debug       int32
261
                traceSysAcc int32
262
        }
263

264
        clientConnectURLs []string
265

266
        // Used internally for quick look-ups.
267
        clientConnectURLsMap refCountedUrlSet
268

269
        // For Gateways
270
        gatewayListener    net.Listener // Accept listener
271
        gatewayListenerErr error
272
        gateway            *srvGateway
273

274
        // Used by tests to check that http.Servers do
275
        // not set any timeout.
276
        monitoringServer *http.Server
277
        profilingServer  *http.Server
278

279
        // LameDuck mode
280
        ldm   bool
281
        ldmCh chan bool
282

283
        // Trusted public operator keys.
284
        trustedKeys []string
285
        // map of trusted keys to operator setting StrictSigningKeyUsage
286
        strictSigningKeyUsage map[string]struct{}
287

288
        // We use this to minimize mem copies for requests to monitoring
289
        // endpoint /varz (when it comes from http).
290
        varzMu sync.Mutex
291
        varz   *Varz
292
        // This is set during a config reload if we detect that we have
293
        // added/removed routes. The monitoring code then checks that
294
        // to know if it should update the cluster's URLs array.
295
        varzUpdateRouteURLs bool
296

297
        // Keeps a sublist of subscriptions attached to leafnode connections
298
        // for the $GNR.*.*.*.> subject so that a server can send back a mapped
299
        // gateway reply.
300
        gwLeafSubs *Sublist
301

302
        // Used for expiration of mapped GW replies
303
        gwrm struct {
304
                w  int32
305
                ch chan time.Duration
306
                m  sync.Map
307
        }
308

309
        // For eventIDs
310
        eventIds *nuid.NUID
311

312
        // Websocket structure
313
        websocket srvWebsocket
314

315
        // MQTT structure
316
        mqtt srvMQTT
317

318
        // OCSP monitoring
319
        ocsps []*OCSPMonitor
320

321
        // OCSP peer verification (at least one TLS block)
322
        ocspPeerVerify bool
323

324
        // OCSP response cache
325
        ocsprc OCSPResponseCache
326

327
        // Names of exporting accounts the importer experienced issues with.
328
        incompleteAccExporterMap sync.Map
329

330
        // Holds cluster name under different lock for mapping
331
        cnMu sync.RWMutex
332
        cn   string
333

334
        // For registering raft nodes with the server.
335
        rnMu      sync.RWMutex
336
        raftNodes map[string]RaftNode
337

338
        // For mapping from a raft node name back to a server name and cluster. Node has to be in the same domain.
339
        nodeToInfo sync.Map
340

341
        // Used so that out-of-resources errors are not logged too fast.
342
        rerrMu   sync.Mutex
343
        rerrLast time.Time
344

345
        connRateCounter *rateCounter
346

347
        // If there is a system account configured, to still support the $G account,
348
        // the server will create a fake user and add it to the list of users.
349
        // Keep track of what that user name is for config reload purposes.
350
        sysAccOnlyNoAuthUser string
351

352
        // IPQueues map
353
        ipQueues sync.Map
354

355
        // To limit logging frequency
356
        rateLimitLogging   sync.Map
357
        rateLimitLoggingCh chan time.Duration
358

359
        // Total outstanding catchup bytes in flight.
360
        gcbMu     sync.RWMutex
361
        gcbOut    int64
362
        gcbOutMax int64 // Taken from JetStreamMaxCatchup or defaultMaxTotalCatchupOutBytes
363
        // A global channel to kick out stalled catchup sequences.
364
        gcbKick chan struct{}
365

366
        // Total outbound syncRequests
367
        syncOutSem chan struct{}
368

369
        // Queue to process JS API requests that come from routes (or gateways)
370
        jsAPIRoutedReqs *ipQueue[*jsAPIRoutedReq]
371

372
        // Delayed API responses.
373
        delayedAPIResponses *ipQueue[*delayedAPIResponse]
374

375
        // Whether moving NRG traffic into accounts is permitted on this server.
376
        // Controls whether or not the account NRG capability is set in statsz.
377
        // Currently used by unit tests to simulate nodes not supporting account NRG.
378
        accountNRGAllowed atomic.Bool
379

380
        // List of proxies trusted keys in `KeyPair` form so we can do signature
381
        // verification when processing incoming proxy connections.
382
        proxiesKeyPairs []nkeys.KeyPair
383
        proxiedConns    map[string]map[uint64]*client
384
}
385

386
// For tracking JS nodes.
387
type nodeInfo struct {
388
        name            string
389
        version         string
390
        cluster         string
391
        domain          string
392
        id              string
393
        tags            jwt.TagList
394
        cfg             *JetStreamConfig
395
        stats           *JetStreamStats
396
        offline         bool
397
        js              bool
398
        binarySnapshots bool
399
        accountNRG      bool
400
}
401

402
// stats groups message/byte traffic counters with connection-health
// counters (slow consumers, stale connections, stalls). It is embedded in
// Server.
type stats struct {
	inMsgs, outMsgs, inBytes, outBytes      int64
	slowConsumers, staleConnections, stalls int64
}
411

412
// scStats counts Slow Consumers per connection kind: clients, routes,
// leafnodes and gateways. Every counter is an atomic.Uint64, so it can be
// updated and read without holding a lock.
type scStats struct {
	clients, routes, leafs, gateways atomic.Uint64
}
419

420
// staleStats counts Stale Connections per connection kind: clients, routes,
// leafnodes and gateways. Every counter is an atomic.Uint64, so it can be
// updated and read without holding a lock.
type staleStats struct {
	clients, routes, leafs, gateways atomic.Uint64
}
427

428
// Default route and leafnode compression modes for tests, so that all server
// tests can be run with a given compression mode, for instance:
//
//	go test -race -v ./server -cluster_compression=fast
//
// Both are empty strings unless set.
var testDefaultClusterCompression, testDefaultLeafNodeCompression string
435

436
// Compression modes that do not name an S2 compression level: compression
// is either unavailable, turned off, or only accepted when the remote asks.
const (
	CompressionNotSupported = "not supported"
	CompressionOff          = "off"
	CompressionAccept       = "accept"
)

// S2 compression modes. CompressionS2Auto selects one of the other levels
// based on the connection RTT (see selectS2AutoModeBasedOnRTT).
const (
	CompressionS2Auto         = "s2_auto"
	CompressionS2Uncompressed = "s2_uncompressed"
	CompressionS2Fast         = "s2_fast"
	CompressionS2Better       = "s2_better"
	CompressionS2Best         = "s2_best"
)
447

448
// defaultCompressionS2AutoRTTThresholds lists the RTT upper bounds used by
// the CompressionS2Auto mode when no thresholds are configured:
//
//	[0..10ms]     -> CompressionS2Uncompressed
//	]10ms..50ms]  -> CompressionS2Fast
//	]50ms..100ms] -> CompressionS2Better
//	]100ms..]     -> CompressionS2Best
var defaultCompressionS2AutoRTTThresholds = []time.Duration{
	10 * time.Millisecond, 50 * time.Millisecond, 100 * time.Millisecond,
}
459

460
// For a given user provided string, matches to one of the compression mode
461
// constant and updates the provided string to that constant. Returns an
462
// error if the provided compression mode is not known.
463
// The parameter `chosenModeForOn` indicates which compression mode to use
464
// when the user selects "on" (or enabled, true, etc..). This is because
465
// we may have different defaults depending on where the compression is used.
466
func validateAndNormalizeCompressionOption(c *CompressionOpts, chosenModeForOn string) error {
10,713✔
467
        if c == nil {
10,713✔
468
                return nil
×
469
        }
×
470
        cmtl := strings.ToLower(c.Mode)
10,713✔
471
        // First, check for the "on" case so that we set to the default compression
10,713✔
472
        // mode for that. The other switch/case will finish setup if needed (for
10,713✔
473
        // instance if the default mode is s2Auto).
10,713✔
474
        switch cmtl {
10,713✔
475
        case "on", "enabled", "true":
14✔
476
                cmtl = chosenModeForOn
14✔
477
        default:
10,699✔
478
        }
479
        // Check (again) with the proper mode.
480
        switch cmtl {
10,713✔
481
        case "not supported", "not_supported":
4✔
482
                c.Mode = CompressionNotSupported
4✔
483
        case "disabled", "off", "false":
1,780✔
484
                c.Mode = CompressionOff
1,780✔
485
        case "accept":
4,187✔
486
                c.Mode = CompressionAccept
4,187✔
487
        case "auto", "s2_auto":
4,643✔
488
                var rtts []time.Duration
4,643✔
489
                if len(c.RTTThresholds) == 0 {
8,503✔
490
                        rtts = defaultCompressionS2AutoRTTThresholds
3,860✔
491
                } else {
4,643✔
492
                        for _, n := range c.RTTThresholds {
3,142✔
493
                                // Do not error on negative, but simply set to 0
2,359✔
494
                                if n < 0 {
2,363✔
495
                                        n = 0
4✔
496
                                }
4✔
497
                                // Make sure they are properly ordered. However, it is possible
498
                                // to have a "0" anywhere in the list to indicate that this
499
                                // compression level should not be used.
500
                                if l := len(rtts); l > 0 && n != 0 {
3,905✔
501
                                        for _, v := range rtts {
3,887✔
502
                                                if n < v {
2,343✔
503
                                                        return fmt.Errorf("RTT threshold values %v should be in ascending order", c.RTTThresholds)
2✔
504
                                                }
2✔
505
                                        }
506
                                }
507
                                rtts = append(rtts, n)
2,357✔
508
                        }
509
                        if len(rtts) > 0 {
1,562✔
510
                                // Trim 0 that are at the end.
781✔
511
                                stop := -1
781✔
512
                                for i := len(rtts) - 1; i >= 0; i-- {
1,576✔
513
                                        if rtts[i] != 0 {
1,572✔
514
                                                stop = i
777✔
515
                                                break
777✔
516
                                        }
517
                                }
518
                                rtts = rtts[:stop+1]
781✔
519
                        }
520
                        if len(rtts) > 4 {
783✔
521
                                // There should be at most values for "uncompressed", "fast",
2✔
522
                                // "better" and "best" (when some 0 are present).
2✔
523
                                return fmt.Errorf("compression mode %q should have no more than 4 RTT thresholds: %v", c.Mode, c.RTTThresholds)
2✔
524
                        } else if len(rtts) == 0 {
785✔
525
                                // But there should be at least 1 if the user provided the slice.
4✔
526
                                // We would be here only if it was provided by say with values
4✔
527
                                // being a single or all zeros.
4✔
528
                                return fmt.Errorf("compression mode %q requires at least one RTT threshold", c.Mode)
4✔
529
                        }
4✔
530
                }
531
                c.Mode = CompressionS2Auto
4,635✔
532
                c.RTTThresholds = rtts
4,635✔
533
        case "fast", "s2_fast":
29✔
534
                c.Mode = CompressionS2Fast
29✔
535
        case "better", "s2_better":
35✔
536
                c.Mode = CompressionS2Better
35✔
537
        case "best", "s2_best":
33✔
538
                c.Mode = CompressionS2Best
33✔
539
        default:
2✔
540
                return fmt.Errorf("unsupported compression mode %q", c.Mode)
2✔
541
        }
542
        return nil
10,703✔
543
}
544

545
// Returns `true` if the compression mode `m` indicates that the server
546
// will negotiate compression with the remote server, `false` otherwise.
547
// Note that the provided compression mode is assumed to have been
548
// normalized and validated.
549
func needsCompression(m string) bool {
172,382✔
550
        return m != _EMPTY_ && m != CompressionOff && m != CompressionNotSupported
172,382✔
551
}
172,382✔
552

553
// Compression is asymmetric, meaning that one side can have a different
554
// compression level than the other. However, we need to check for cases
555
// when this server `scm` or the remote `rcm` do not support compression
556
// (say older server, or test to make it behave as it is not), or have
557
// the compression off.
558
// Note that `scm` is assumed to not be "off" or "not supported".
559
func selectCompressionMode(scm, rcm string) (mode string, err error) {
41,580✔
560
        if rcm == CompressionNotSupported || rcm == _EMPTY_ {
41,646✔
561
                return CompressionNotSupported, nil
66✔
562
        }
66✔
563
        switch rcm {
41,514✔
564
        case CompressionOff:
16,549✔
565
                // If the remote explicitly disables compression, then we won't
16,549✔
566
                // use compression.
16,549✔
567
                return CompressionOff, nil
16,549✔
568
        case CompressionAccept:
23,765✔
569
                // If the remote is ok with compression (but is not initiating it),
23,765✔
570
                // and if we too are in this mode, then it means no compression.
23,765✔
571
                if scm == CompressionAccept {
47,505✔
572
                        return CompressionOff, nil
23,740✔
573
                }
23,740✔
574
                // Otherwise use our compression mode.
575
                return scm, nil
25✔
576
        case CompressionS2Auto, CompressionS2Uncompressed, CompressionS2Fast, CompressionS2Better, CompressionS2Best:
1,200✔
577
                // This case is here to make sure that if we don't recognize a
1,200✔
578
                // compression setting, we error out.
1,200✔
579
                if scm == CompressionAccept {
1,236✔
580
                        // If our compression mode is "accept", then we will use the remote
36✔
581
                        // compression mode, except if it is "auto", in which case we will
36✔
582
                        // default to "fast". This is not a configuration (auto in one
36✔
583
                        // side and accept in the other) that would be recommended.
36✔
584
                        if rcm == CompressionS2Auto {
55✔
585
                                return CompressionS2Fast, nil
19✔
586
                        }
19✔
587
                        // Use their compression mode.
588
                        return rcm, nil
17✔
589
                }
590
                // Otherwise use our compression mode.
591
                return scm, nil
1,164✔
592
        default:
×
593
                return _EMPTY_, fmt.Errorf("unsupported route compression mode %q", rcm)
×
594
        }
595
}
596

597
// If the configured compression mode is "auto" then will return that,
598
// otherwise will return the given `cm` compression mode.
599
func compressionModeForInfoProtocol(co *CompressionOpts, cm string) string {
17,656✔
600
        if co.Mode == CompressionS2Auto {
18,789✔
601
                return CompressionS2Auto
1,133✔
602
        }
1,133✔
603
        return cm
16,523✔
604
}
605

606
// Given a connection RTT and a list of thresholds durations, this
607
// function will return an S2 compression level such as "uncompressed",
608
// "fast", "better" or "best". For instance, with the following slice:
609
// [5ms, 10ms, 15ms, 20ms], a RTT of up to 5ms will result
610
// in the compression level "uncompressed", ]5ms..10ms] will result in
611
// "fast" compression, etc..
612
// However, the 0 value allows for disabling of some compression levels.
613
// For instance, the following slice: [0, 0, 20, 30] means that a RTT of
614
// [0..20ms] would result in the "better" compression - effectively disabling
615
// the use of "uncompressed" and "fast", then anything above 20ms would
616
// result in the use of "best" level (the 30 in the list has no effect
617
// and the list could have been simplified to [0, 0, 20]).
618
func selectS2AutoModeBasedOnRTT(rtt time.Duration, rttThresholds []time.Duration) string {
1,499✔
619
        var idx int
1,499✔
620
        var found bool
1,499✔
621
        for i, d := range rttThresholds {
3,036✔
622
                if rtt <= d {
3,020✔
623
                        idx = i
1,483✔
624
                        found = true
1,483✔
625
                        break
1,483✔
626
                }
627
        }
628
        if !found {
1,515✔
629
                // If we did not find but we have all levels, then use "best",
16✔
630
                // otherwise use the last one in array.
16✔
631
                if l := len(rttThresholds); l >= 3 {
32✔
632
                        idx = 3
16✔
633
                } else {
16✔
634
                        idx = l - 1
×
635
                }
×
636
        }
637
        switch idx {
1,499✔
638
        case 0:
1,479✔
639
                return CompressionS2Uncompressed
1,479✔
640
        case 1:
2✔
641
                return CompressionS2Fast
2✔
642
        case 2:
2✔
643
                return CompressionS2Better
2✔
644
        }
645
        return CompressionS2Best
16✔
646
}
647

648
func compressOptsEqual(c1, c2 *CompressionOpts) bool {
268✔
649
        if c1 == c2 {
268✔
650
                return true
×
651
        }
×
652
        if (c1 == nil && c2 != nil) || (c1 != nil && c2 == nil) {
268✔
653
                return false
×
654
        }
×
655
        if c1.Mode != c2.Mode {
339✔
656
                return false
71✔
657
        }
71✔
658
        // For s2_auto, if one has an empty RTTThresholds, it is equivalent
659
        // to the defaultCompressionS2AutoRTTThresholds array, so compare with that.
660
        if c1.Mode == CompressionS2Auto {
223✔
661
                if len(c1.RTTThresholds) == 0 && !reflect.DeepEqual(c2.RTTThresholds, defaultCompressionS2AutoRTTThresholds) {
26✔
662
                        return false
×
663
                }
×
664
                if len(c2.RTTThresholds) == 0 && !reflect.DeepEqual(c1.RTTThresholds, defaultCompressionS2AutoRTTThresholds) {
26✔
665
                        return false
×
666
                }
×
667
                if !reflect.DeepEqual(c1.RTTThresholds, c2.RTTThresholds) {
52✔
668
                        return false
26✔
669
                }
26✔
670
        }
671
        return true
171✔
672
}
673

674
// Returns an array of s2 WriterOption based on the route compression mode.
675
// So far we return a single option, but this way we can call s2.NewWriter()
676
// with a nil []s2.WriterOption, but not with a nil s2.WriterOption, so
677
// this is more versatile.
678
func s2WriterOptions(cm string) []s2.WriterOption {
1,266✔
679
        _opts := [2]s2.WriterOption{}
1,266✔
680
        opts := append(
1,266✔
681
                _opts[:0],
1,266✔
682
                s2.WriterConcurrency(1), // Stop asynchronous flushing in separate goroutines
1,266✔
683
        )
1,266✔
684
        switch cm {
1,266✔
685
        case CompressionS2Uncompressed:
1,130✔
686
                return append(opts, s2.WriterUncompressed())
1,130✔
687
        case CompressionS2Best:
42✔
688
                return append(opts, s2.WriterBestCompression())
42✔
689
        case CompressionS2Better:
51✔
690
                return append(opts, s2.WriterBetterCompression())
51✔
691
        default:
43✔
692
                return nil
43✔
693
        }
694
}
695

696
// New will setup a new server struct after parsing the options.
697
// DEPRECATED: Use NewServer(opts)
698
func New(opts *Options) *Server {
280✔
699
        s, _ := NewServer(opts)
280✔
700
        return s
280✔
701
}
280✔
702

703
func NewServerFromConfig(opts *Options) (*Server, error) {
3✔
704
        if opts.ConfigFile != _EMPTY_ && opts.configDigest == "" {
6✔
705
                if err := opts.ProcessConfigFile(opts.ConfigFile); err != nil {
4✔
706
                        return nil, err
1✔
707
                }
1✔
708
        }
709
        return NewServer(opts)
2✔
710
}
711

712
// NewServer will setup a new server struct after parsing the options.
713
// Could return an error if options can not be validated.
714
// The provided Options type should not be re-used afterwards.
715
// Either use Options.Clone() to pass a copy, or make a new one.
716
func NewServer(opts *Options) (*Server, error) {
6,982✔
717
        setBaselineOptions(opts)
6,982✔
718

6,982✔
719
        // Process TLS options, including whether we require client certificates.
6,982✔
720
        tlsReq := opts.TLSConfig != nil
6,982✔
721
        verify := (tlsReq && opts.TLSConfig.ClientAuth == tls.RequireAndVerifyClientCert)
6,982✔
722

6,982✔
723
        // Create our server's nkey identity.
6,982✔
724
        kp, _ := nkeys.CreateServer()
6,982✔
725
        pub, _ := kp.PublicKey()
6,982✔
726

6,982✔
727
        // Create an xkey for encrypting messages from this server.
6,982✔
728
        var xkp nkeys.KeyPair
6,982✔
729
        var xpub string
6,982✔
730
        if !fips140.Enabled() {
13,964✔
731
                xkp, _ = nkeys.CreateCurveKeys()
6,982✔
732
                xpub, _ = xkp.PublicKey()
6,982✔
733
        }
6,982✔
734

735
        serverName := pub
6,982✔
736
        if opts.ServerName != _EMPTY_ {
11,237✔
737
                serverName = opts.ServerName
4,255✔
738
        }
4,255✔
739

740
        httpBasePath := normalizeBasePath(opts.HTTPBasePath)
6,982✔
741

6,982✔
742
        // Validate some options. This is here because we cannot assume that
6,982✔
743
        // server will always be started with configuration parsing (that could
6,982✔
744
        // report issues). Its options can be (incorrectly) set by hand when
6,982✔
745
        // server is embedded. If there is an error, return nil.
6,982✔
746
        if err := validateOptions(opts); err != nil {
7,036✔
747
                return nil, err
54✔
748
        }
54✔
749

750
        info := Info{
6,928✔
751
                ID:           pub,
6,928✔
752
                XKey:         xpub,
6,928✔
753
                Version:      VERSION,
6,928✔
754
                Proto:        PROTO,
6,928✔
755
                GitCommit:    gitCommit,
6,928✔
756
                GoVersion:    runtime.Version(),
6,928✔
757
                Name:         serverName,
6,928✔
758
                Host:         opts.Host,
6,928✔
759
                Port:         opts.Port,
6,928✔
760
                AuthRequired: false,
6,928✔
761
                TLSRequired:  tlsReq && !opts.AllowNonTLS,
6,928✔
762
                TLSVerify:    verify,
6,928✔
763
                MaxPayload:   opts.MaxPayload,
6,928✔
764
                JetStream:    opts.JetStream,
6,928✔
765
                Headers:      !opts.NoHeaderSupport,
6,928✔
766
                Cluster:      opts.Cluster.Name,
6,928✔
767
                Domain:       opts.JetStreamDomain,
6,928✔
768
                JSApiLevel:   JSApiLevel,
6,928✔
769
        }
6,928✔
770

6,928✔
771
        if tlsReq && !info.TLSRequired {
6,931✔
772
                info.TLSAvailable = true
3✔
773
        }
3✔
774

775
        now := time.Now()
6,928✔
776

6,928✔
777
        s := &Server{
6,928✔
778
                kp:                 kp,
6,928✔
779
                xkp:                xkp,
6,928✔
780
                xpub:               xpub,
6,928✔
781
                configFile:         opts.ConfigFile,
6,928✔
782
                info:               info,
6,928✔
783
                opts:               opts,
6,928✔
784
                done:               make(chan bool, 1),
6,928✔
785
                start:              now,
6,928✔
786
                configTime:         now,
6,928✔
787
                gwLeafSubs:         NewSublistWithCache(),
6,928✔
788
                httpBasePath:       httpBasePath,
6,928✔
789
                eventIds:           nuid.New(),
6,928✔
790
                routesToSelf:       make(map[string]struct{}),
6,928✔
791
                httpReqStats:       make(map[string]uint64), // Used to track HTTP requests
6,928✔
792
                rateLimitLoggingCh: make(chan time.Duration, 1),
6,928✔
793
                leafNodeEnabled:    opts.LeafNode.Port != 0 || len(opts.LeafNode.Remotes) > 0,
6,928✔
794
                syncOutSem:         make(chan struct{}, maxConcurrentSyncRequests),
6,928✔
795
        }
6,928✔
796

6,928✔
797
        // Delayed API response queue. Create regardless if JetStream is configured
6,928✔
798
        // or not (since it can be enabled/disabled with config reload, we want this
6,928✔
799
        // queue to exist at all times).
6,928✔
800
        s.delayedAPIResponses = newIPQueue[*delayedAPIResponse](s, "delayed API responses")
6,928✔
801

6,928✔
802
        // By default we'll allow account NRG.
6,928✔
803
        s.accountNRGAllowed.Store(true)
6,928✔
804

6,928✔
805
        // Fill up the maximum in flight syncRequests for this server.
6,928✔
806
        // Used in JetStream catchup semantics.
6,928✔
807
        for i := 0; i < maxConcurrentSyncRequests; i++ {
228,624✔
808
                s.syncOutSem <- struct{}{}
221,696✔
809
        }
221,696✔
810

811
        if opts.TLSRateLimit > 0 {
6,929✔
812
                s.connRateCounter = newRateCounter(opts.tlsConfigOpts.RateLimit)
1✔
813
        }
1✔
814

815
        // Trusted root operator keys.
816
        if !s.processTrustedKeys() {
6,928✔
817
                return nil, fmt.Errorf("Error processing trusted operator keys")
×
818
        }
×
819

820
        // If we have solicited leafnodes but no clustering and no clustername.
821
        // However we may need a stable clustername so use the server name.
822
        if len(opts.LeafNode.Remotes) > 0 && opts.Cluster.Port == 0 && opts.Cluster.Name == _EMPTY_ {
7,859✔
823
                s.leafNoCluster = true
931✔
824
                opts.Cluster.Name = opts.ServerName
931✔
825
        }
931✔
826

827
        if opts.Cluster.Name != _EMPTY_ {
11,789✔
828
                // Also place into mapping cn with cnMu lock.
4,861✔
829
                s.cnMu.Lock()
4,861✔
830
                s.cn = opts.Cluster.Name
4,861✔
831
                s.cnMu.Unlock()
4,861✔
832
        }
4,861✔
833

834
        s.mu.Lock()
6,928✔
835
        defer s.mu.Unlock()
6,928✔
836

6,928✔
837
        // If there are proxies trusted public keys in the configuration
6,928✔
838
        // this will fill create the corresponding list of nkeys.KeyPair
6,928✔
839
        // that we can use for signature verification.
6,928✔
840
        s.processProxiesTrustedKeys()
6,928✔
841

6,928✔
842
        // Place ourselves in the JetStream nodeInfo if needed.
6,928✔
843
        if opts.JetStream {
11,037✔
844
                ourNode := getHash(serverName)
4,109✔
845
                s.nodeToInfo.Store(ourNode, nodeInfo{
4,109✔
846
                        name:            serverName,
4,109✔
847
                        version:         VERSION,
4,109✔
848
                        cluster:         opts.Cluster.Name,
4,109✔
849
                        domain:          opts.JetStreamDomain,
4,109✔
850
                        id:              info.ID,
4,109✔
851
                        tags:            opts.Tags,
4,109✔
852
                        cfg:             &JetStreamConfig{MaxMemory: opts.JetStreamMaxMemory, MaxStore: opts.JetStreamMaxStore, CompressOK: true},
4,109✔
853
                        stats:           nil,
4,109✔
854
                        offline:         false,
4,109✔
855
                        js:              true,
4,109✔
856
                        binarySnapshots: true,
4,109✔
857
                        accountNRG:      true,
4,109✔
858
                })
4,109✔
859
        }
4,109✔
860

861
        s.routeResolver = opts.Cluster.resolver
6,928✔
862
        if s.routeResolver == nil {
13,851✔
863
                s.routeResolver = net.DefaultResolver
6,923✔
864
        }
6,923✔
865

866
        // Used internally for quick look-ups.
867
        s.clientConnectURLsMap = make(refCountedUrlSet)
6,928✔
868
        s.websocket.connectURLsMap = make(refCountedUrlSet)
6,928✔
869
        s.leafURLsMap = make(refCountedUrlSet)
6,928✔
870

6,928✔
871
        // Ensure that non-exported options (used in tests) are properly set.
6,928✔
872
        s.setLeafNodeNonExportedOptions()
6,928✔
873

6,928✔
874
        // Setup OCSP Stapling and OCSP Peer. This will abort server from starting if there
6,928✔
875
        // are no valid staples and OCSP Stapling policy is set to Always or MustStaple.
6,928✔
876
        if err := s.enableOCSP(); err != nil {
6,930✔
877
                return nil, err
2✔
878
        }
2✔
879

880
        // Call this even if there is no gateway defined. It will
881
        // initialize the structure so we don't have to check for
882
        // it to be nil or not in various places in the code.
883
        if err := s.newGateway(opts); err != nil {
6,926✔
884
                return nil, err
×
885
        }
×
886

887
        // If we have a cluster definition but do not have a cluster name, create one.
888
        if opts.Cluster.Port != 0 && opts.Cluster.Name == _EMPTY_ {
6,967✔
889
                s.info.Cluster = nuid.Next()
41✔
890
        } else if opts.Cluster.Name != _EMPTY_ {
11,787✔
891
                // Likewise here if we have a cluster name set.
4,861✔
892
                s.info.Cluster = opts.Cluster.Name
4,861✔
893
        }
4,861✔
894

895
        // This is normally done in the AcceptLoop, once the
896
        // listener has been created (possibly with random port),
897
        // but since some tests may expect the INFO to be properly
898
        // set after New(), let's do it now.
899
        s.setInfoHostPort()
6,926✔
900

6,926✔
901
        // For tracking clients
6,926✔
902
        s.clients = make(map[uint64]*client)
6,926✔
903

6,926✔
904
        // For tracking closed clients.
6,926✔
905
        s.closed = newClosedRingBuffer(opts.MaxClosedClients)
6,926✔
906

6,926✔
907
        // For tracking connections that are not yet registered
6,926✔
908
        // in s.routes, but for which readLoop has started.
6,926✔
909
        s.grTmpClients = make(map[uint64]*client)
6,926✔
910

6,926✔
911
        // For tracking routes and their remote ids
6,926✔
912
        s.initRouteStructures(opts)
6,926✔
913

6,926✔
914
        // For tracking leaf nodes.
6,926✔
915
        s.leafs = make(map[uint64]*client)
6,926✔
916

6,926✔
917
        // Used to kick out all go routines possibly waiting on server
6,926✔
918
        // to shutdown.
6,926✔
919
        s.quitCh = make(chan struct{})
6,926✔
920

6,926✔
921
        // Closed when startup is complete. ReadyForConnections() will block on
6,926✔
922
        // this before checking the presence of listening sockets.
6,926✔
923
        s.startupComplete = make(chan struct{})
6,926✔
924

6,926✔
925
        // Closed when Shutdown() is complete. Allows WaitForShutdown() to block
6,926✔
926
        // waiting for complete shutdown.
6,926✔
927
        s.shutdownComplete = make(chan struct{})
6,926✔
928

6,926✔
929
        // Check for configured account resolvers.
6,926✔
930
        if err := s.configureResolver(); err != nil {
6,926✔
931
                return nil, err
×
932
        }
×
933
        // If there is an URL account resolver, do basic test to see if anyone is home.
934
        if ar := opts.AccountResolver; ar != nil {
7,287✔
935
                if ur, ok := ar.(*URLAccResolver); ok {
398✔
936
                        if _, err := ur.Fetch(_EMPTY_); err != nil {
38✔
937
                                return nil, err
1✔
938
                        }
1✔
939
                }
940
        }
941
        // For other resolver:
942
        // In operator mode, when the account resolver depends on an external system and
943
        // the system account can't be fetched, inject a temporary one.
944
        if ar := s.accResolver; len(opts.TrustedOperators) == 1 && ar != nil &&
6,925✔
945
                opts.SystemAccount != _EMPTY_ && opts.SystemAccount != DEFAULT_SYSTEM_ACCOUNT {
7,185✔
946
                if _, ok := ar.(*MemAccResolver); !ok {
413✔
947
                        s.mu.Unlock()
153✔
948
                        var a *Account
153✔
949
                        // perform direct lookup to avoid warning trace
153✔
950
                        if _, err := fetchAccount(ar, opts.SystemAccount); err == nil {
238✔
951
                                a, _ = s.lookupAccount(opts.SystemAccount)
85✔
952
                        }
85✔
953
                        s.mu.Lock()
153✔
954
                        if a == nil {
221✔
955
                                sac := NewAccount(opts.SystemAccount)
68✔
956
                                sac.Issuer = opts.TrustedOperators[0].Issuer
68✔
957
                                sac.signingKeys = map[string]jwt.Scope{}
68✔
958
                                sac.signingKeys[opts.SystemAccount] = nil
68✔
959
                                s.registerAccountNoLock(sac)
68✔
960
                        }
68✔
961
                }
962
        }
963

964
        // For tracking accounts
965
        if _, err := s.configureAccounts(false); err != nil {
6,925✔
966
                return nil, err
×
967
        }
×
968

969
        // Used to setup Authorization.
970
        s.configureAuthorization()
6,925✔
971

6,925✔
972
        // Start signal handler
6,925✔
973
        s.handleSignals()
6,925✔
974

6,925✔
975
        return s, nil
6,925✔
976
}
977

978
// Initializes route structures based on pooling and/or per-account routes.
979
//
980
// Server lock is held on entry
981
func (s *Server) initRouteStructures(opts *Options) {
6,926✔
982
        s.routes = make(map[string][]*client)
6,926✔
983
        if ps := opts.Cluster.PoolSize; ps > 0 {
10,962✔
984
                s.routesPoolSize = ps
4,036✔
985
        } else {
6,926✔
986
                s.routesPoolSize = 1
2,890✔
987
        }
2,890✔
988
        // If we have per-account routes, we create accRoutes and initialize it
989
        // with nil values. The presence of an account as the key will allow us
990
        // to know if a given account is supposed to have dedicated routes.
991
        if l := len(opts.Cluster.PinnedAccounts); l > 0 {
10,641✔
992
                s.accRoutes = make(map[string]map[string]*client, l)
3,715✔
993
                for _, acc := range opts.Cluster.PinnedAccounts {
7,457✔
994
                        s.accRoutes[acc] = make(map[string]*client)
3,742✔
995
                }
3,742✔
996
        }
997
}
998

999
func (s *Server) logRejectedTLSConns() {
1✔
1000
        defer s.grWG.Done()
1✔
1001
        t := time.NewTicker(time.Second)
1✔
1002
        defer t.Stop()
1✔
1003
        for {
3✔
1004
                select {
2✔
1005
                case <-s.quitCh:
1✔
1006
                        return
1✔
1007
                case <-t.C:
1✔
1008
                        blocked := s.connRateCounter.countBlocked()
1✔
1009
                        if blocked > 0 {
2✔
1010
                                s.Warnf("Rejected %d connections due to TLS rate limiting", blocked)
1✔
1011
                        }
1✔
1012
                }
1013
        }
1014
}
1015

1016
// clusterName returns our cluster name which could be dynamic.
1017
func (s *Server) ClusterName() string {
113,637✔
1018
        s.mu.RLock()
113,637✔
1019
        cn := s.info.Cluster
113,637✔
1020
        s.mu.RUnlock()
113,637✔
1021
        return cn
113,637✔
1022
}
113,637✔
1023

1024
// Grabs cluster name with cluster name specific lock.
1025
func (s *Server) cachedClusterName() string {
111,408✔
1026
        s.cnMu.RLock()
111,408✔
1027
        cn := s.cn
111,408✔
1028
        s.cnMu.RUnlock()
111,408✔
1029
        return cn
111,408✔
1030
}
111,408✔
1031

1032
// setClusterName will update the cluster name for this server.
//
// The name is stored in the server INFO and the route INFO (under s.mu), and
// in the cnMu-protected copy s.cn. Then, after s.mu has been released:
//   - solicited leaf node connections (those with a remote) are closed with
//     ClusterNameConflict;
//   - if s.sys is set and the name actually changed, a value is sent on
//     s.sys.resetCh.
//
// NOTE(review): the send on resetCh blocks until a receiver takes it unless
// the channel is buffered - confirm its capacity.
func (s *Server) setClusterName(name string) {
	s.mu.Lock()
	var resetCh chan struct{}
	if s.sys != nil && s.info.Cluster != name {
		// Only capture the channel here; the send happens after the lock is
		// released. We can't hold the lock while sending, as the go routine
		// reading it may be waiting for the lock as well.
		resetCh = s.sys.resetCh
	}
	s.info.Cluster = name
	s.routeInfo.Cluster = name

	// Need to close solicited leaf nodes. The close has to be done outside of the server lock,
	// so only collect them here (each client is inspected under its own lock).
	var leafs []*client
	for _, c := range s.leafs {
		c.mu.Lock()
		if c.leaf != nil && c.leaf.remote != nil {
			leafs = append(leafs, c)
		}
		c.mu.Unlock()
	}
	s.mu.Unlock()

	// Also place into mapping cn with cnMu lock.
	s.cnMu.Lock()
	s.cn = name
	s.cnMu.Unlock()

	// Server lock is no longer held: safe to close connections and signal.
	for _, l := range leafs {
		l.closeConnection(ClusterNameConflict)
	}
	if resetCh != nil {
		resetCh <- struct{}{}
	}
	s.Noticef("Cluster name updated to %s", name)
}
1067

1068
// Return whether the cluster name is dynamic.
1069
func (s *Server) isClusterNameDynamic() bool {
42,870✔
1070
        // We need to lock the whole "Cluster.Name" check and not use s.getOpts()
42,870✔
1071
        // because otherwise this could cause a data race with setting the name in
42,870✔
1072
        // route.go's processRouteConnect().
42,870✔
1073
        s.optsMu.RLock()
42,870✔
1074
        dynamic := s.opts.Cluster.Name == _EMPTY_
42,870✔
1075
        s.optsMu.RUnlock()
42,870✔
1076
        return dynamic
42,870✔
1077
}
42,870✔
1078

1079
// Returns our configured serverName.
1080
func (s *Server) serverName() string {
23,692✔
1081
        return s.getOpts().ServerName
23,692✔
1082
}
23,692✔
1083

1084
// ClientURL returns the URL used to connect clients.
1085
// Helpful in tests and with in-process servers using a random client port (-1).
1086
func (s *Server) ClientURL() string {
6,513✔
1087
        // FIXME(dlc) - should we add in user and pass if defined single?
6,513✔
1088
        opts := s.getOpts()
6,513✔
1089
        var u url.URL
6,513✔
1090
        u.Scheme = "nats"
6,513✔
1091
        if opts.TLSConfig != nil {
6,523✔
1092
                u.Scheme = "tls"
10✔
1093
        }
10✔
1094
        u.Host = net.JoinHostPort(opts.Host, fmt.Sprintf("%d", opts.Port))
6,513✔
1095
        return u.String()
6,513✔
1096
}
1097

1098
// WebsocketURL returns the URL used to connect websocket clients.
1099
// Helpful in tests and with in-process servers using a random websocket port (-1).
1100
func (s *Server) WebsocketURL() string {
×
1101
        opts := s.getOpts()
×
1102
        var u url.URL
×
1103
        u.Scheme = "ws"
×
1104
        if opts.Websocket.TLSConfig != nil {
×
1105
                u.Scheme = "wss"
×
1106
        }
×
1107
        u.Host = net.JoinHostPort(opts.Websocket.Host, fmt.Sprintf("%d", opts.Websocket.Port))
×
1108
        return u.String()
×
1109
}
1110

1111
func validateCluster(o *Options) error {
8,367✔
1112
        if o.Cluster.Name != _EMPTY_ && strings.Contains(o.Cluster.Name, " ") {
8,367✔
1113
                return ErrClusterNameHasSpaces
×
1114
        }
×
1115
        if o.Cluster.Compression.Mode != _EMPTY_ {
13,427✔
1116
                if err := validateAndNormalizeCompressionOption(&o.Cluster.Compression, CompressionS2Fast); err != nil {
5,060✔
1117
                        return err
×
1118
                }
×
1119
        }
1120
        if err := validatePinnedCerts(o.Cluster.TLSPinnedCerts); err != nil {
8,367✔
1121
                return fmt.Errorf("cluster: %v", err)
×
1122
        }
×
1123
        // Check that cluster name if defined matches any gateway name.
1124
        // Note that we have already verified that the gateway name does not have spaces.
1125
        if o.Gateway.Name != _EMPTY_ && o.Gateway.Name != o.Cluster.Name {
8,730✔
1126
                if o.Cluster.Name != _EMPTY_ {
364✔
1127
                        return ErrClusterNameConfigConflict
1✔
1128
                }
1✔
1129
                // Set this here so we do not consider it dynamic.
1130
                o.Cluster.Name = o.Gateway.Name
362✔
1131
        }
1132
        if l := len(o.Cluster.PinnedAccounts); l > 0 {
12,185✔
1133
                if o.Cluster.PoolSize < 0 {
3,820✔
1134
                        return fmt.Errorf("pool_size cannot be negative if pinned accounts are specified")
1✔
1135
                }
1✔
1136
                m := make(map[string]struct{}, l)
3,818✔
1137
                for _, a := range o.Cluster.PinnedAccounts {
7,701✔
1138
                        if _, exists := m[a]; exists {
3,883✔
1139
                                return fmt.Errorf("found duplicate account name %q in pinned accounts list %q", a, o.Cluster.PinnedAccounts)
×
1140
                        }
×
1141
                        m[a] = struct{}{}
3,883✔
1142
                }
1143
        }
1144
        return nil
8,365✔
1145
}
1146

1147
func validatePinnedCerts(pinned PinnedCertSet) error {
19,031✔
1148
        re := regexp.MustCompile("^[a-f0-9]{64}$")
19,031✔
1149
        for certId := range pinned {
19,037✔
1150
                entry := strings.ToLower(certId)
6✔
1151
                if !re.MatchString(entry) {
6✔
1152
                        return fmt.Errorf("error parsing 'pinned_certs' key %s does not look like lower case hex-encoded sha256 of DER encoded SubjectPublicKeyInfo", entry)
×
1153
                }
×
1154
        }
1155
        return nil
19,031✔
1156
}
1157

1158
func validateOptions(o *Options) error {
8,415✔
1159
        if o.LameDuckDuration > 0 && o.LameDuckGracePeriod >= o.LameDuckDuration {
8,415✔
1160
                return fmt.Errorf("lame duck grace period (%v) should be strictly lower than lame duck duration (%v)",
×
1161
                        o.LameDuckGracePeriod, o.LameDuckDuration)
×
1162
        }
×
1163
        if int64(o.MaxPayload) > o.MaxPending {
8,417✔
1164
                return fmt.Errorf("max_payload (%v) cannot be higher than max_pending (%v)",
2✔
1165
                        o.MaxPayload, o.MaxPending)
2✔
1166
        }
2✔
1167
        if o.ServerName != _EMPTY_ && strings.Contains(o.ServerName, " ") {
8,413✔
1168
                return errors.New("server name cannot contain spaces")
×
1169
        }
×
1170
        // Check that the trust configuration is correct.
1171
        if err := validateTrustedOperators(o); err != nil {
8,426✔
1172
                return err
13✔
1173
        }
13✔
1174
        // Check on leaf nodes which will require a system
1175
        // account when gateways are also configured.
1176
        if err := validateLeafNode(o); err != nil {
8,427✔
1177
                return err
27✔
1178
        }
27✔
1179
        // Check that authentication is properly configured.
1180
        if err := validateAuth(o); err != nil {
8,378✔
1181
                return err
5✔
1182
        }
5✔
1183
        // Check that proxies is properly configured.
1184
        if err := validateProxies(o); err != nil {
8,369✔
1185
                return err
1✔
1186
        }
1✔
1187
        // Check that gateway is properly configured. Returns no error
1188
        // if there is no gateway defined.
1189
        if err := validateGatewayOptions(o); err != nil {
8,367✔
1190
                return err
×
1191
        }
×
1192
        // Check that cluster name if defined matches any gateway name.
1193
        if err := validateCluster(o); err != nil {
8,369✔
1194
                return err
2✔
1195
        }
2✔
1196
        if err := validateMQTTOptions(o); err != nil {
8,370✔
1197
                return err
5✔
1198
        }
5✔
1199
        if err := validateJetStreamOptions(o); err != nil {
8,372✔
1200
                return err
12✔
1201
        }
12✔
1202
        // Finally check websocket options.
1203
        return validateWebsocketOptions(o)
8,348✔
1204
}
1205

1206
func (s *Server) getOpts() *Options {
3,078,140✔
1207
        s.optsMu.RLock()
3,078,140✔
1208
        opts := s.opts
3,078,140✔
1209
        s.optsMu.RUnlock()
3,078,140✔
1210
        return opts
3,078,140✔
1211
}
3,078,140✔
1212

1213
func (s *Server) setOpts(opts *Options) {
1,420✔
1214
        s.optsMu.Lock()
1,420✔
1215
        s.opts = opts
1,420✔
1216
        s.optsMu.Unlock()
1,420✔
1217
}
1,420✔
1218

1219
func (s *Server) globalAccount() *Account {
10,990✔
1220
        s.mu.RLock()
10,990✔
1221
        gacc := s.gacc
10,990✔
1222
        s.mu.RUnlock()
10,990✔
1223
        return gacc
10,990✔
1224
}
10,990✔
1225

1226
// Used to setup or update Accounts.
1227
// Returns a map that indicates which accounts have had their stream imports
1228
// changed (in case of an update in configuration reload).
1229
// Lock is held upon entry, but will be released/reacquired in this function.
1230
func (s *Server) configureAccounts(reloading bool) (map[string]struct{}, error) {
8,334✔
1231
        awcsti := make(map[string]struct{})
8,334✔
1232

8,334✔
1233
        // Create the global account.
8,334✔
1234
        if s.gacc == nil {
15,259✔
1235
                s.gacc = NewAccount(globalAccountName)
6,925✔
1236
                s.registerAccountNoLock(s.gacc)
6,925✔
1237
        }
6,925✔
1238

1239
        opts := s.getOpts()
8,334✔
1240

8,334✔
1241
        // We need to track service imports since we can not swap them out (unsub and re-sub)
8,334✔
1242
        // until the proper server struct accounts have been swapped in properly. Doing it in
8,334✔
1243
        // place could lead to data loss or server panic since account under new si has no real
8,334✔
1244
        // account and hence no sublist, so will panic on inbound message.
8,334✔
1245
        siMap := make(map[*Account][][]byte)
8,334✔
1246

8,334✔
1247
        // Check opts and walk through them. We need to copy them here
8,334✔
1248
        // so that we do not keep a real one sitting in the options.
8,334✔
1249
        for _, acc := range opts.Accounts {
17,388✔
1250
                var a *Account
9,054✔
1251
                create := true
9,054✔
1252
                // For the global account, we want to skip the reload process
9,054✔
1253
                // and fall back into the "create" case which will in that
9,054✔
1254
                // case really be just an update (shallowCopy will make sure
9,054✔
1255
                // that mappings are copied over).
9,054✔
1256
                if reloading && acc.Name != globalAccountName {
12,537✔
1257
                        if ai, ok := s.accounts.Load(acc.Name); ok {
6,966✔
1258
                                a = ai.(*Account)
3,483✔
1259
                                // Before updating the account, check if stream imports have changed.
3,483✔
1260
                                if !a.checkStreamImportsEqual(acc) {
3,490✔
1261
                                        awcsti[acc.Name] = struct{}{}
7✔
1262
                                }
7✔
1263
                                a.mu.Lock()
3,483✔
1264
                                // Collect the sids for the service imports since we are going to
3,483✔
1265
                                // replace with new ones.
3,483✔
1266
                                var sids [][]byte
3,483✔
1267
                                for _, sis := range a.imports.services {
15,284✔
1268
                                        for _, si := range sis {
23,602✔
1269
                                                if si.sid != nil {
23,602✔
1270
                                                        sids = append(sids, si.sid)
11,801✔
1271
                                                }
11,801✔
1272
                                        }
1273
                                }
1274
                                // Setup to process later if needed.
1275
                                if len(sids) > 0 || len(acc.imports.services) > 0 {
6,958✔
1276
                                        siMap[a] = sids
3,475✔
1277
                                }
3,475✔
1278

1279
                                // Now reset all export/imports fields since they are going to be
1280
                                // filled in shallowCopy()
1281
                                a.imports.streams, a.imports.services = nil, nil
3,483✔
1282
                                a.exports.streams, a.exports.services = nil, nil
3,483✔
1283
                                // We call shallowCopy from the account `acc` (the one in Options)
3,483✔
1284
                                // and pass `a` (our existing account) to get it updated.
3,483✔
1285
                                acc.shallowCopy(a)
3,483✔
1286
                                a.mu.Unlock()
3,483✔
1287
                                create = false
3,483✔
1288
                        }
1289
                }
1290
                // Track old mappings if global account.
1291
                var oldGMappings []*mapping
9,054✔
1292
                if create {
14,625✔
1293
                        if acc.Name == globalAccountName {
5,587✔
1294
                                a = s.gacc
16✔
1295
                                a.mu.Lock()
16✔
1296
                                oldGMappings = append(oldGMappings, a.mappings...)
16✔
1297
                                a.mu.Unlock()
16✔
1298
                        } else {
5,571✔
1299
                                a = NewAccount(acc.Name)
5,555✔
1300
                        }
5,555✔
1301
                        // Locking matters in the case of an update of the global account
1302
                        a.mu.Lock()
5,571✔
1303
                        acc.shallowCopy(a)
5,571✔
1304
                        a.mu.Unlock()
5,571✔
1305
                        // Will be a no-op in case of the global account since it is already registered.
5,571✔
1306
                        s.registerAccountNoLock(a)
5,571✔
1307
                }
1308

1309
                // The `acc` account is stored in options, not in the server, and these can be cleared.
1310
                acc.sl, acc.clients, acc.mappings = nil, nil, nil
9,054✔
1311

9,054✔
1312
                // Check here if we have been reloaded and we have a global account with mappings that may have changed.
9,054✔
1313
                // If we have leafnodes they need to be updated.
9,054✔
1314
                if reloading && a == s.gacc {
9,059✔
1315
                        a.mu.Lock()
5✔
1316
                        mappings := make(map[string]*mapping)
5✔
1317
                        if len(a.mappings) > 0 && a.nleafs > 0 {
7✔
1318
                                for _, em := range a.mappings {
4✔
1319
                                        mappings[em.src] = em
2✔
1320
                                }
2✔
1321
                        }
1322
                        a.mu.Unlock()
5✔
1323
                        if len(mappings) > 0 || len(oldGMappings) > 0 {
10✔
1324
                                a.lmu.RLock()
5✔
1325
                                for _, lc := range a.lleafs {
7✔
1326
                                        for _, em := range mappings {
4✔
1327
                                                lc.forceAddToSmap(em.src)
2✔
1328
                                        }
2✔
1329
                                        // Remove any old ones if needed.
1330
                                        for _, em := range oldGMappings {
4✔
1331
                                                // Only remove if not in the new ones.
2✔
1332
                                                if _, ok := mappings[em.src]; !ok {
3✔
1333
                                                        lc.forceRemoveFromSmap(em.src)
1✔
1334
                                                }
1✔
1335
                                        }
1336
                                }
1337
                                a.lmu.RUnlock()
5✔
1338
                        }
1339
                }
1340

1341
                // If we see an account defined using $SYS we will make sure that is set as system account.
1342
                if acc.Name == DEFAULT_SYSTEM_ACCOUNT && opts.SystemAccount == _EMPTY_ {
12,627✔
1343
                        opts.SystemAccount = DEFAULT_SYSTEM_ACCOUNT
3,573✔
1344
                }
3,573✔
1345
        }
1346

1347
        // Now that we have this we need to remap any referenced accounts in
1348
        // import or export maps to the new ones.
1349
        swapApproved := func(ea *exportAuth) {
11,144✔
1350
                for sub, a := range ea.approved {
2,860✔
1351
                        var acc *Account
50✔
1352
                        if v, ok := s.accounts.Load(a.Name); ok {
100✔
1353
                                acc = v.(*Account)
50✔
1354
                        }
50✔
1355
                        ea.approved[sub] = acc
50✔
1356
                }
1357
        }
1358
        var numAccounts int
8,334✔
1359
        s.accounts.Range(func(k, v any) bool {
26,169✔
1360
                numAccounts++
17,835✔
1361
                acc := v.(*Account)
17,835✔
1362
                acc.mu.Lock()
17,835✔
1363
                // Exports
17,835✔
1364
                for _, se := range acc.exports.streams {
17,976✔
1365
                        if se != nil {
154✔
1366
                                swapApproved(&se.exportAuth)
13✔
1367
                        }
13✔
1368
                }
1369
                for _, se := range acc.exports.services {
20,632✔
1370
                        if se != nil {
5,594✔
1371
                                // Swap over the bound account for service exports.
2,797✔
1372
                                if se.acc != nil {
5,594✔
1373
                                        if v, ok := s.accounts.Load(se.acc.Name); ok {
5,594✔
1374
                                                se.acc = v.(*Account)
2,797✔
1375
                                        }
2,797✔
1376
                                }
1377
                                swapApproved(&se.exportAuth)
2,797✔
1378
                        }
1379
                }
1380
                // Imports
1381
                for _, si := range acc.imports.streams {
18,037✔
1382
                        if v, ok := s.accounts.Load(si.acc.Name); ok {
404✔
1383
                                si.acc = v.(*Account)
202✔
1384
                        }
202✔
1385
                }
1386
                for _, sis := range acc.imports.services {
25,159✔
1387
                        for _, si := range sis {
14,650✔
1388
                                if v, ok := s.accounts.Load(si.acc.Name); ok {
14,652✔
1389
                                        si.acc = v.(*Account)
7,326✔
1390

7,326✔
1391
                                        // It is possible to allow for latency tracking inside your
7,326✔
1392
                                        // own account, so lock only when not the same account.
7,326✔
1393
                                        if si.acc == acc {
7,637✔
1394
                                                si.se = si.acc.getServiceExport(si.to)
311✔
1395
                                                continue
311✔
1396
                                        }
1397
                                        si.acc.mu.RLock()
7,015✔
1398
                                        si.se = si.acc.getServiceExport(si.to)
7,015✔
1399
                                        si.acc.mu.RUnlock()
7,015✔
1400
                                }
1401
                        }
1402
                }
1403
                // Make sure the subs are running, but only if not reloading.
1404
                if len(acc.imports.services) > 0 && acc.ic == nil && !reloading {
18,129✔
1405
                        acc.ic = s.createInternalAccountClient()
294✔
1406
                        acc.ic.acc = acc
294✔
1407
                        // Need to release locks to invoke this function.
294✔
1408
                        acc.mu.Unlock()
294✔
1409
                        s.mu.Unlock()
294✔
1410
                        acc.addAllServiceImportSubs()
294✔
1411
                        s.mu.Lock()
294✔
1412
                        acc.mu.Lock()
294✔
1413
                }
294✔
1414
                acc.updated = time.Now()
17,835✔
1415
                acc.mu.Unlock()
17,835✔
1416
                return true
17,835✔
1417
        })
1418

1419
        // Check if we need to process service imports pending from above.
1420
        // This processing needs to be after we swap in the real accounts above.
1421
        for acc, sids := range siMap {
11,809✔
1422
                c := acc.ic
3,475✔
1423
                for _, sid := range sids {
15,276✔
1424
                        c.processUnsub(sid)
11,801✔
1425
                }
11,801✔
1426
                acc.addAllServiceImportSubs()
3,475✔
1427
                s.mu.Unlock()
3,475✔
1428
                s.registerSystemImports(acc)
3,475✔
1429
                s.mu.Lock()
3,475✔
1430
        }
1431

1432
        // Set the system account if it was configured.
1433
        // Otherwise create a default one.
1434
        if opts.SystemAccount != _EMPTY_ {
13,106✔
1435
                // Lock may be acquired in lookupAccount, so release to call lookupAccount.
4,772✔
1436
                s.mu.Unlock()
4,772✔
1437
                acc, err := s.lookupAccount(opts.SystemAccount)
4,772✔
1438
                s.mu.Lock()
4,772✔
1439
                if err == nil && s.sys != nil && acc != s.sys.account {
4,772✔
1440
                        // sys.account.clients (including internal client)/respmap/etc... are transferred separately
×
1441
                        s.sys.account = acc
×
1442
                        s.sysAcc.Store(acc)
×
1443
                }
×
1444
                if err != nil {
4,772✔
1445
                        return awcsti, fmt.Errorf("error resolving system account: %v", err)
×
1446
                }
×
1447

1448
                // If we have defined a system account here check to see if its just us and the $G account.
1449
                // We would do this to add user/pass to the system account. If this is the case add in
1450
                // no-auth-user for $G.
1451
                // Only do this if non-operator mode and we did not have an authorization block defined.
1452
                if len(opts.TrustedOperators) == 0 && numAccounts == 2 && opts.NoAuthUser == _EMPTY_ && !opts.authBlockDefined {
7,554✔
1453
                        // If we come here from config reload, let's not recreate the fake user name otherwise
2,782✔
1454
                        // it will cause currently clients to be disconnected.
2,782✔
1455
                        uname := s.sysAccOnlyNoAuthUser
2,782✔
1456
                        if uname == _EMPTY_ {
5,557✔
1457
                                // Create a unique name so we do not collide.
2,775✔
1458
                                var b [8]byte
2,775✔
1459
                                rn := rand.Int63()
2,775✔
1460
                                for i, l := 0, rn; i < len(b); i++ {
24,975✔
1461
                                        b[i] = digits[l%base]
22,200✔
1462
                                        l /= base
22,200✔
1463
                                }
22,200✔
1464
                                uname = fmt.Sprintf("nats-%s", b[:])
2,775✔
1465
                                s.sysAccOnlyNoAuthUser = uname
2,775✔
1466
                        }
1467
                        opts.Users = append(opts.Users, &User{Username: uname, Password: uname[6:], Account: s.gacc})
2,782✔
1468
                        opts.NoAuthUser = uname
2,782✔
1469
                }
1470
        }
1471

1472
        // Add any required exports from system account.
1473
        if s.sys != nil {
9,718✔
1474
                sysAcc := s.sys.account
1,384✔
1475
                s.mu.Unlock()
1,384✔
1476
                s.addSystemAccountExports(sysAcc)
1,384✔
1477
                s.mu.Lock()
1,384✔
1478
        }
1,384✔
1479

1480
        return awcsti, nil
8,334✔
1481
}
1482

1483
// Setup the account resolver. For memory resolver, make sure the JWTs are
1484
// properly formed but do not enforce expiration etc.
1485
// Lock is held on entry, but may be released/reacquired during this call.
1486
func (s *Server) configureResolver() error {
6,938✔
1487
        opts := s.getOpts()
6,938✔
1488
        s.accResolver = opts.AccountResolver
6,938✔
1489
        if opts.AccountResolver != nil {
7,311✔
1490
                // For URL resolver, set the TLSConfig if specified.
373✔
1491
                if opts.AccountResolverTLSConfig != nil {
377✔
1492
                        if ar, ok := opts.AccountResolver.(*URLAccResolver); ok {
8✔
1493
                                if t, ok := ar.c.Transport.(*http.Transport); ok {
8✔
1494
                                        t.CloseIdleConnections()
4✔
1495
                                        t.TLSClientConfig = opts.AccountResolverTLSConfig.Clone()
4✔
1496
                                }
4✔
1497
                        }
1498
                }
1499
                if len(opts.resolverPreloads) > 0 {
544✔
1500
                        // Lock ordering is account resolver -> server, so we need to release
171✔
1501
                        // the lock and reacquire it when done with account resolver's calls.
171✔
1502
                        ar := s.accResolver
171✔
1503
                        s.mu.Unlock()
171✔
1504
                        defer s.mu.Lock()
171✔
1505
                        if ar.IsReadOnly() {
171✔
1506
                                return fmt.Errorf("resolver preloads only available for writeable resolver types MEM/DIR/CACHE_DIR")
×
1507
                        }
×
1508
                        for k, v := range opts.resolverPreloads {
1,087✔
1509
                                _, err := jwt.DecodeAccountClaims(v)
916✔
1510
                                if err != nil {
916✔
1511
                                        return fmt.Errorf("preload account error for %q: %v", k, err)
×
1512
                                }
×
1513
                                ar.Store(k, v)
916✔
1514
                        }
1515
                }
1516
        }
1517
        return nil
6,938✔
1518
}
1519

1520
// This will check preloads for validation issues.
1521
func (s *Server) checkResolvePreloads() {
171✔
1522
        opts := s.getOpts()
171✔
1523
        // We can just check the read-only opts versions here, that way we do not need
171✔
1524
        // to grab server lock or access s.accResolver.
171✔
1525
        for k, v := range opts.resolverPreloads {
1,086✔
1526
                claims, err := jwt.DecodeAccountClaims(v)
915✔
1527
                if err != nil {
915✔
1528
                        s.Errorf("Preloaded account [%s] not valid", k)
×
1529
                        continue
×
1530
                }
1531
                // Check if it is expired.
1532
                vr := jwt.CreateValidationResults()
915✔
1533
                claims.Validate(vr)
915✔
1534
                if vr.IsBlocking(true) {
916✔
1535
                        s.Warnf("Account [%s] has validation issues:", k)
1✔
1536
                        for _, v := range vr.Issues {
2✔
1537
                                s.Warnf("  - %s", v.Description)
1✔
1538
                        }
1✔
1539
                }
1540
        }
1541
}
1542

1543
// Determines if we are in pre NATS 2.0 setup with no accounts.
1544
func (s *Server) globalAccountOnly() bool {
4,192✔
1545
        var hasOthers bool
4,192✔
1546

4,192✔
1547
        if s.trustedKeys != nil {
4,331✔
1548
                return false
139✔
1549
        }
139✔
1550

1551
        s.mu.RLock()
4,053✔
1552
        s.accounts.Range(func(k, v any) bool {
12,028✔
1553
                acc := v.(*Account)
7,975✔
1554
                // Ignore global and system
7,975✔
1555
                if acc == s.gacc || (s.sys != nil && acc == s.sys.account) {
15,472✔
1556
                        return true
7,497✔
1557
                }
7,497✔
1558
                hasOthers = true
478✔
1559
                return false
478✔
1560
        })
1561
        s.mu.RUnlock()
4,053✔
1562

4,053✔
1563
        return !hasOthers
4,053✔
1564
}
1565

1566
// Determines if this server is in standalone mode, meaning no routes or gateways.
1567
func (s *Server) standAloneMode() bool {
26,562✔
1568
        opts := s.getOpts()
26,562✔
1569
        return opts.Cluster.Port == 0 && opts.Gateway.Port == 0
26,562✔
1570
}
26,562✔
1571

1572
func (s *Server) configuredRoutes() int {
3,134✔
1573
        return len(s.getOpts().Routes)
3,134✔
1574
}
3,134✔
1575

1576
// activePeers is used in bootstrapping raft groups like the JetStream meta controller.
1577
func (s *Server) ActivePeers() (peers []string) {
3,773✔
1578
        s.nodeToInfo.Range(func(k, v any) bool {
9,898✔
1579
                si := v.(nodeInfo)
6,125✔
1580
                if !si.offline {
12,250✔
1581
                        peers = append(peers, k.(string))
6,125✔
1582
                }
6,125✔
1583
                return true
6,125✔
1584
        })
1585
        return peers
3,773✔
1586
}
1587

1588
// isTrustedIssuer will check that the issuer is a trusted public key.
1589
// This is used to make sure an account was signed by a trusted operator.
1590
func (s *Server) isTrustedIssuer(issuer string) bool {
10,540✔
1591
        s.mu.RLock()
10,540✔
1592
        defer s.mu.RUnlock()
10,540✔
1593
        // If we are not running in trusted mode and there is no issuer, that is ok.
10,540✔
1594
        if s.trustedKeys == nil && issuer == _EMPTY_ {
16,235✔
1595
                return true
5,695✔
1596
        }
5,695✔
1597
        for _, tk := range s.trustedKeys {
9,900✔
1598
                if tk == issuer {
9,822✔
1599
                        return true
4,767✔
1600
                }
4,767✔
1601
        }
1602
        return false
78✔
1603
}
1604

1605
// processTrustedKeys will process binary stamped and
1606
// options-based trusted nkeys. Returns success.
1607
func (s *Server) processTrustedKeys() bool {
6,928✔
1608
        s.strictSigningKeyUsage = map[string]struct{}{}
6,928✔
1609
        opts := s.getOpts()
6,928✔
1610
        if trustedKeys != _EMPTY_ && !s.initStampedTrustedKeys() {
6,928✔
1611
                return false
×
1612
        } else if opts.TrustedKeys != nil {
7,337✔
1613
                for _, key := range opts.TrustedKeys {
1,695✔
1614
                        if !nkeys.IsValidPublicOperatorKey(key) {
1,286✔
1615
                                return false
×
1616
                        }
×
1617
                }
1618
                s.trustedKeys = append([]string(nil), opts.TrustedKeys...)
409✔
1619
                for _, claim := range opts.TrustedOperators {
712✔
1620
                        if !claim.StrictSigningKeyUsage {
604✔
1621
                                continue
301✔
1622
                        }
1623
                        for _, key := range claim.SigningKeys {
4✔
1624
                                s.strictSigningKeyUsage[key] = struct{}{}
2✔
1625
                        }
2✔
1626
                }
1627
        }
1628
        return true
6,928✔
1629
}
1630

1631
// checkTrustedKeyString will check that the string is a valid array
1632
// of public operator nkeys.
1633
func checkTrustedKeyString(keys string) []string {
×
1634
        tks := strings.Fields(keys)
×
1635
        if len(tks) == 0 {
×
1636
                return nil
×
1637
        }
×
1638
        // Walk all the keys and make sure they are valid.
1639
        for _, key := range tks {
×
1640
                if !nkeys.IsValidPublicOperatorKey(key) {
×
1641
                        return nil
×
1642
                }
×
1643
        }
1644
        return tks
×
1645
}
1646

1647
// initStampedTrustedKeys will check the stamped trusted keys
1648
// and will set the server field 'trustedKeys'. Returns whether
1649
// it succeeded or not.
1650
func (s *Server) initStampedTrustedKeys() bool {
×
1651
        // Check to see if we have an override in options, which will cause us to fail.
×
1652
        if len(s.getOpts().TrustedKeys) > 0 {
×
1653
                return false
×
1654
        }
×
1655
        tks := checkTrustedKeyString(trustedKeys)
×
1656
        if len(tks) == 0 {
×
1657
                return false
×
1658
        }
×
1659
        s.trustedKeys = tks
×
1660
        return true
×
1661
}
1662

1663
// PrintAndDie writes msg followed by a newline to standard error and exits the
// process with status 1. It is exported for use by other packages.
func PrintAndDie(msg string) {
	const failureStatus = 1
	fmt.Fprintln(os.Stderr, msg)
	os.Exit(failureStatus)
}
1668

1669
// PrintServerAndExit will print our version and exit.
1670
func PrintServerAndExit() {
×
1671
        fmt.Printf("nats-server: v%s\n", VERSION)
×
1672
        os.Exit(0)
×
1673
}
×
1674

1675
// ProcessCommandLineArgs inspects the positional arguments remaining in cmd
// after flag parsing. The first one, compared case-insensitively, may be the
// sub-command "version" (showVersion) or "help" (showHelp). Any other first
// positional argument yields an "unrecognized command" error. With no
// positional arguments, all results are zero values.
func ProcessCommandLineArgs(cmd *flag.FlagSet) (showVersion bool, showHelp bool, err error) {
	if cmd.NArg() == 0 {
		return false, false, nil
	}
	sub := cmd.Arg(0)
	switch strings.ToLower(sub) {
	case "version":
		showVersion = true
	case "help":
		showHelp = true
	default:
		err = fmt.Errorf("unrecognized command: %q", sub)
	}
	return showVersion, showHelp, err
}
1693

1694
// Public version.
1695
func (s *Server) Running() bool {
2,275✔
1696
        return s.isRunning()
2,275✔
1697
}
2,275✔
1698

1699
// Protected check on running state
1700
func (s *Server) isRunning() bool {
322,575,078✔
1701
        return s.running.Load()
322,575,078✔
1702
}
322,575,078✔
1703

1704
func (s *Server) logPid() error {
4✔
1705
        pidStr := strconv.Itoa(os.Getpid())
4✔
1706
        return os.WriteFile(s.getOpts().PidFile, []byte(pidStr), defaultFilePerms)
4✔
1707
}
4✔
1708

1709
// numReservedAccounts will return the number of reserved accounts configured in the server.
1710
// Currently this is 1, one for the global default account.
1711
func (s *Server) numReservedAccounts() int {
2✔
1712
        return 1
2✔
1713
}
2✔
1714

1715
// NumActiveAccounts reports number of active accounts on this server.
1716
func (s *Server) NumActiveAccounts() int32 {
×
1717
        return atomic.LoadInt32(&s.activeAccounts)
×
1718
}
×
1719

1720
// incActiveAccounts() just adds one under lock.
1721
func (s *Server) incActiveAccounts() {
21,669✔
1722
        atomic.AddInt32(&s.activeAccounts, 1)
21,669✔
1723
}
21,669✔
1724

1725
// decActiveAccounts() just subtracts one under lock.
1726
func (s *Server) decActiveAccounts() {
11,717✔
1727
        atomic.AddInt32(&s.activeAccounts, -1)
11,717✔
1728
}
11,717✔
1729

1730
// This should be used for testing only. Will be slow since we have to
1731
// range over all accounts in the sync.Map to count.
1732
func (s *Server) numAccounts() int {
7✔
1733
        count := 0
7✔
1734
        s.mu.RLock()
7✔
1735
        s.accounts.Range(func(k, v any) bool {
24✔
1736
                count++
17✔
1737
                return true
17✔
1738
        })
17✔
1739
        s.mu.RUnlock()
7✔
1740
        return count
7✔
1741
}
1742

1743
// NumLoadedAccounts returns the number of loaded accounts.
1744
func (s *Server) NumLoadedAccounts() int {
4✔
1745
        return s.numAccounts()
4✔
1746
}
4✔
1747

1748
// LookupOrRegisterAccount will return the given account if known or create a new entry.
1749
func (s *Server) LookupOrRegisterAccount(name string) (account *Account, isNew bool) {
2,371✔
1750
        s.mu.Lock()
2,371✔
1751
        defer s.mu.Unlock()
2,371✔
1752
        if v, ok := s.accounts.Load(name); ok {
2,371✔
1753
                return v.(*Account), false
×
1754
        }
×
1755
        acc := NewAccount(name)
2,371✔
1756
        s.registerAccountNoLock(acc)
2,371✔
1757
        return acc, true
2,371✔
1758
}
1759

1760
// RegisterAccount will register an account. The account must be new
1761
// or this call will fail.
1762
func (s *Server) RegisterAccount(name string) (*Account, error) {
849✔
1763
        s.mu.Lock()
849✔
1764
        defer s.mu.Unlock()
849✔
1765
        if _, ok := s.accounts.Load(name); ok {
978✔
1766
                return nil, ErrAccountExists
129✔
1767
        }
129✔
1768
        acc := NewAccount(name)
720✔
1769
        s.registerAccountNoLock(acc)
720✔
1770
        return acc, nil
720✔
1771
}
1772

1773
// SetSystemAccount will set the internal system account.
1774
// If root operators are present it will also check validity.
1775
func (s *Server) SetSystemAccount(accName string) error {
6,064✔
1776
        // Lookup from sync.Map first.
6,064✔
1777
        if v, ok := s.accounts.Load(accName); ok {
12,126✔
1778
                return s.setSystemAccount(v.(*Account))
6,062✔
1779
        }
6,062✔
1780

1781
        // If we are here we do not have local knowledge of this account.
1782
        // Do this one by hand to return more useful error.
1783
        ac, jwt, err := s.fetchAccountClaims(accName)
2✔
1784
        if err != nil {
3✔
1785
                return err
1✔
1786
        }
1✔
1787
        acc := s.buildInternalAccount(ac)
1✔
1788
        acc.claimJWT = jwt
1✔
1789
        // Due to race, we need to make sure that we are not
1✔
1790
        // registering twice.
1✔
1791
        if racc := s.registerAccount(acc); racc != nil {
1✔
1792
                return nil
×
1793
        }
×
1794
        return s.setSystemAccount(acc)
1✔
1795
}
1796

1797
// SystemAccount returns the system account if set.
1798
func (s *Server) SystemAccount() *Account {
458,188✔
1799
        return s.sysAcc.Load()
458,188✔
1800
}
458,188✔
1801

1802
// GlobalAccount returns the global account.
1803
// Default clients will use the global account.
1804
func (s *Server) GlobalAccount() *Account {
7,422✔
1805
        s.mu.RLock()
7,422✔
1806
        defer s.mu.RUnlock()
7,422✔
1807
        return s.gacc
7,422✔
1808
}
7,422✔
1809

1810
// SetDefaultSystemAccount will create a default system account if one is not present.
1811
func (s *Server) SetDefaultSystemAccount() error {
2,361✔
1812
        if _, isNew := s.LookupOrRegisterAccount(DEFAULT_SYSTEM_ACCOUNT); !isNew {
2,361✔
1813
                return nil
×
1814
        }
×
1815
        s.Debugf("Created system account: %q", DEFAULT_SYSTEM_ACCOUNT)
2,361✔
1816
        return s.SetSystemAccount(DEFAULT_SYSTEM_ACCOUNT)
2,361✔
1817
}
1818

1819
// Assign a system account. Should only be called once.
1820
// This sets up a server to send and receive messages from
1821
// inside the server itself.
1822
func (s *Server) setSystemAccount(acc *Account) error {
6,083✔
1823
        if acc == nil {
6,083✔
1824
                return ErrMissingAccount
×
1825
        }
×
1826
        // Don't try to fix this here.
1827
        if acc.IsExpired() {
6,083✔
1828
                return ErrAccountExpired
×
1829
        }
×
1830
        // If we are running with trusted keys for an operator
1831
        // make sure we check the account is legit.
1832
        if !s.isTrustedIssuer(acc.Issuer) {
6,154✔
1833
                return ErrAccountValidation
71✔
1834
        }
71✔
1835

1836
        s.mu.Lock()
6,012✔
1837

6,012✔
1838
        if s.sys != nil {
6,014✔
1839
                s.mu.Unlock()
2✔
1840
                return ErrAccountExists
2✔
1841
        }
2✔
1842

1843
        // This is here in an attempt to quiet the race detector and not have to place
1844
        // locks on fast path for inbound messages and checking service imports.
1845
        acc.mu.Lock()
6,010✔
1846
        if acc.imports.services == nil {
12,020✔
1847
                acc.imports.services = make(map[string][]*serviceImport)
6,010✔
1848
        }
6,010✔
1849
        acc.mu.Unlock()
6,010✔
1850

6,010✔
1851
        s.sys = &internal{
6,010✔
1852
                account: acc,
6,010✔
1853
                client:  s.createInternalSystemClient(),
6,010✔
1854
                seq:     1,
6,010✔
1855
                sid:     1,
6,010✔
1856
                servers: make(map[string]*serverUpdate),
6,010✔
1857
                replies: make(map[string]msgHandler),
6,010✔
1858
                sendq:   newIPQueue[*pubMsg](s, "System sendQ"),
6,010✔
1859
                recvq:   newIPQueue[*inSysMsg](s, "System recvQ"),
6,010✔
1860
                recvqp:  newIPQueue[*inSysMsg](s, "System recvQ Pings"),
6,010✔
1861
                resetCh: make(chan struct{}),
6,010✔
1862
                sq:      s.newSendQ(acc),
6,010✔
1863
                statsz:  statsHBInterval,
6,010✔
1864
                orphMax: 5 * eventsHBInterval,
6,010✔
1865
                chkOrph: 3 * eventsHBInterval,
6,010✔
1866
        }
6,010✔
1867
        recvq, recvqp := s.sys.recvq, s.sys.recvqp
6,010✔
1868
        s.sys.wg.Add(1)
6,010✔
1869
        s.mu.Unlock()
6,010✔
1870

6,010✔
1871
        // Store in atomic for fast lookup.
6,010✔
1872
        s.sysAcc.Store(acc)
6,010✔
1873

6,010✔
1874
        // Register with the account.
6,010✔
1875
        s.sys.client.registerWithAccount(acc)
6,010✔
1876

6,010✔
1877
        s.addSystemAccountExports(acc)
6,010✔
1878

6,010✔
1879
        // Start our internal loop to serialize outbound messages.
6,010✔
1880
        // We do our own wg here since we will stop first during shutdown.
6,010✔
1881
        go s.internalSendLoop(&s.sys.wg)
6,010✔
1882

6,010✔
1883
        // Start the internal loop for inbound messages.
6,010✔
1884
        go s.internalReceiveLoop(recvq)
6,010✔
1885
        // Start the internal loop for inbound STATSZ/Ping messages.
6,010✔
1886
        go s.internalReceiveLoop(recvqp)
6,010✔
1887

6,010✔
1888
        // Start up our general subscriptions
6,010✔
1889
        s.initEventTracking()
6,010✔
1890

6,010✔
1891
        // Track for dead remote servers.
6,010✔
1892
        s.wrapChk(s.startRemoteServerSweepTimer)()
6,010✔
1893

6,010✔
1894
        // Send out statsz updates periodically.
6,010✔
1895
        s.wrapChk(s.startStatszTimer)()
6,010✔
1896

6,010✔
1897
        // If we have existing accounts make sure we enable account tracking.
6,010✔
1898
        s.mu.Lock()
6,010✔
1899
        s.accounts.Range(func(k, v any) bool {
20,083✔
1900
                acc := v.(*Account)
14,073✔
1901
                s.enableAccountTracking(acc)
14,073✔
1902
                return true
14,073✔
1903
        })
14,073✔
1904
        s.mu.Unlock()
6,010✔
1905

6,010✔
1906
        return nil
6,010✔
1907
}
1908

1909
// Creates an internal system client.
1910
func (s *Server) createInternalSystemClient() *client {
29,339✔
1911
        return s.createInternalClient(SYSTEM)
29,339✔
1912
}
29,339✔
1913

1914
// Creates an internal jetstream client.
1915
func (s *Server) createInternalJetStreamClient() *client {
44,482✔
1916
        return s.createInternalClient(JETSTREAM)
44,482✔
1917
}
44,482✔
1918

1919
// Creates an internal client for Account.
1920
func (s *Server) createInternalAccountClient() *client {
15,340✔
1921
        return s.createInternalClient(ACCOUNT)
15,340✔
1922
}
15,340✔
1923

1924
// Internal clients. kind should be SYSTEM, JETSTREAM or ACCOUNT
1925
func (s *Server) createInternalClient(kind int) *client {
89,161✔
1926
        if !isInternalClient(kind) {
89,161✔
1927
                return nil
×
1928
        }
×
1929
        now := time.Now()
89,161✔
1930
        c := &client{srv: s, kind: kind, opts: internalOpts, msubs: -1, mpay: -1, start: now, last: now}
89,161✔
1931
        c.initClient()
89,161✔
1932
        c.echo = false
89,161✔
1933
        c.headers = true
89,161✔
1934
        c.flags.set(noReconnect)
89,161✔
1935
        return c
89,161✔
1936
}
1937

1938
// Determine if accounts should track subscriptions for
1939
// efficient propagation.
1940
// Lock should be held on entry.
1941
func (s *Server) shouldTrackSubscriptions() bool {
16,918✔
1942
        opts := s.getOpts()
16,918✔
1943
        return (opts.Cluster.Port != 0 || opts.Gateway.Port != 0)
16,918✔
1944
}
16,918✔
1945

1946
// Invokes registerAccountNoLock under the protection of the server lock.
1947
// That is, server lock is acquired/released in this function.
1948
// See registerAccountNoLock for comment on returned value.
1949
func (s *Server) registerAccount(acc *Account) *Account {
1,544✔
1950
        s.mu.Lock()
1,544✔
1951
        racc := s.registerAccountNoLock(acc)
1,544✔
1952
        s.mu.Unlock()
1,544✔
1953
        return racc
1,544✔
1954
}
1,544✔
1955

1956
// Helper to set the sublist based on preferences.
1957
func (s *Server) setAccountSublist(acc *Account) {
18,462✔
1958
        if acc != nil && acc.sl == nil {
35,645✔
1959
                opts := s.getOpts()
17,183✔
1960
                if opts != nil && opts.NoSublistCache {
17,186✔
1961
                        acc.sl = NewSublistNoCache()
3✔
1962
                } else {
17,183✔
1963
                        acc.sl = NewSublistWithCache()
17,180✔
1964
                }
17,180✔
1965
        }
1966
}
1967

1968
// Registers an account in the server.
1969
// Due to some locking considerations, we may end-up trying
1970
// to register the same account twice. This function will
1971
// then return the already registered account.
1972
// Lock should be held on entry.
1973
func (s *Server) registerAccountNoLock(acc *Account) *Account {
17,199✔
1974
        // We are under the server lock. Lookup from map, if present
17,199✔
1975
        // return existing account.
17,199✔
1976
        if a, _ := s.accounts.Load(acc.Name); a != nil {
17,480✔
1977
                s.tmpAccounts.Delete(acc.Name)
281✔
1978
                return a.(*Account)
281✔
1979
        }
281✔
1980
        // Finish account setup and store.
1981
        s.setAccountSublist(acc)
16,918✔
1982

16,918✔
1983
        acc.mu.Lock()
16,918✔
1984
        s.setRouteInfo(acc)
16,918✔
1985
        if acc.clients == nil {
33,828✔
1986
                acc.clients = make(map[*client]struct{})
16,910✔
1987
        }
16,910✔
1988

1989
        // If we are capable of routing we will track subscription
1990
        // information for efficient interest propagation.
1991
        // During config reload, it is possible that account was
1992
        // already created (global account), so use locking and
1993
        // make sure we create only if needed.
1994
        // TODO(dlc)- Double check that we need this for GWs.
1995
        if acc.rm == nil && s.opts != nil && s.shouldTrackSubscriptions() {
28,016✔
1996
                acc.rm = make(map[string]int32)
11,098✔
1997
                acc.lqws = make(map[string]int32)
11,098✔
1998
        }
11,098✔
1999
        acc.srv = s
16,918✔
2000
        acc.updated = time.Now()
16,918✔
2001
        accName := acc.Name
16,918✔
2002
        jsEnabled := len(acc.jsLimits) > 0
16,918✔
2003
        acc.mu.Unlock()
16,918✔
2004

16,918✔
2005
        if opts := s.getOpts(); opts != nil && len(opts.JsAccDefaultDomain) > 0 {
16,975✔
2006
                if defDomain, ok := opts.JsAccDefaultDomain[accName]; ok {
76✔
2007
                        if jsEnabled {
22✔
2008
                                s.Warnf("Skipping Default Domain %q, set for JetStream enabled account %q", defDomain, accName)
3✔
2009
                        } else if defDomain != _EMPTY_ {
28✔
2010
                                for src, dest := range generateJSMappingTable(defDomain) {
90✔
2011
                                        // flip src and dest around so the domain is inserted
81✔
2012
                                        s.Noticef("Adding default domain mapping %q -> %q to account %q %p", dest, src, accName, acc)
81✔
2013
                                        if err := acc.AddMapping(dest, src); err != nil {
81✔
2014
                                                s.Errorf("Error adding JetStream default domain mapping: %v", err)
×
2015
                                        }
×
2016
                                }
2017
                        }
2018
                }
2019
        }
2020

2021
        s.accounts.Store(acc.Name, acc)
16,918✔
2022
        s.tmpAccounts.Delete(acc.Name)
16,918✔
2023
        s.enableAccountTracking(acc)
16,918✔
2024

16,918✔
2025
        // Can not have server lock here.
16,918✔
2026
        s.mu.Unlock()
16,918✔
2027
        s.registerSystemImports(acc)
16,918✔
2028
        // Starting 2.9.0, we are phasing out the optimistic mode, so change
16,918✔
2029
        // the account to interest-only mode (except if instructed not to do
16,918✔
2030
        // it in some tests).
16,918✔
2031
        if s.gateway.enabled && !gwDoNotForceInterestOnlyMode {
19,938✔
2032
                s.switchAccountToInterestMode(acc.GetName())
3,020✔
2033
        }
3,020✔
2034
        s.mu.Lock()
16,918✔
2035

16,918✔
2036
        return nil
16,918✔
2037
}
2038

2039
// Sets the account's routePoolIdx depending on presence or not of
2040
// pooling or per-account routes. Also updates a map used by
2041
// gateway code to retrieve a route based on some route hash.
2042
//
2043
// Both Server and Account lock held on entry.
2044
func (s *Server) setRouteInfo(acc *Account) {
17,021✔
2045
        // If there is a dedicated route configured for this account
17,021✔
2046
        if _, ok := s.accRoutes[acc.Name]; ok {
20,782✔
2047
                // We want the account name to be in the map, but we don't
3,761✔
2048
                // need a value (we could store empty string)
3,761✔
2049
                s.accRouteByHash.Store(acc.Name, nil)
3,761✔
2050
                // Set the route pool index to -1 so that it is easy when
3,761✔
2051
                // ranging over accounts to exclude those accounts when
3,761✔
2052
                // trying to get accounts for a given pool index.
3,761✔
2053
                acc.routePoolIdx = accDedicatedRoute
3,761✔
2054
        } else {
17,021✔
2055
                // If pool size more than 1, we will compute a hash code and
13,260✔
2056
                // use modulo to assign to an index of the pool slice. For 1
13,260✔
2057
                // and below, all accounts will be bound to the single connection
13,260✔
2058
                // at index 0.
13,260✔
2059
                acc.routePoolIdx = computeRoutePoolIdx(s.routesPoolSize, acc.Name)
13,260✔
2060
                if s.routesPoolSize > 1 {
18,416✔
2061
                        s.accRouteByHash.Store(acc.Name, acc.routePoolIdx)
5,156✔
2062
                }
5,156✔
2063
        }
2064
}
2065

2066
// lookupAccount is a function to return the account structure
2067
// associated with an account name.
2068
// Lock MUST NOT be held upon entry.
2069
func (s *Server) lookupAccount(name string) (*Account, error) {
1,781,448✔
2070
        return s.lookupOrFetchAccount(name, true)
1,781,448✔
2071
}
1,781,448✔
2072

2073
// lookupOrFetchAccount is a function to return the account structure
2074
// associated with an account name.
2075
// Lock MUST NOT be held upon entry.
2076
func (s *Server) lookupOrFetchAccount(name string, fetch bool) (*Account, error) {
1,979,268✔
2077
        var acc *Account
1,979,268✔
2078
        if v, ok := s.accounts.Load(name); ok {
3,956,024✔
2079
                acc = v.(*Account)
1,976,756✔
2080
        }
1,976,756✔
2081
        if acc != nil {
3,956,024✔
2082
                // If we are expired and we have a resolver, then
1,976,756✔
2083
                // return the latest information from the resolver.
1,976,756✔
2084
                if acc.IsExpired() {
1,976,766✔
2085
                        s.Debugf("Requested account [%s] has expired", name)
10✔
2086
                        if s.AccountResolver() != nil && fetch {
20✔
2087
                                if err := s.updateAccount(acc); err != nil {
12✔
2088
                                        // This error could mask expired, so just return expired here.
2✔
2089
                                        return nil, ErrAccountExpired
2✔
2090
                                }
2✔
2091
                        } else {
×
2092
                                return nil, ErrAccountExpired
×
2093
                        }
×
2094
                }
2095
                return acc, nil
1,976,754✔
2096
        }
2097
        // If we have a resolver see if it can fetch the account.
2098
        if s.AccountResolver() == nil || !fetch {
3,521✔
2099
                return nil, ErrMissingAccount
1,009✔
2100
        }
1,009✔
2101
        return s.fetchAccount(name)
1,503✔
2102
}
2103

2104
// LookupAccount is a public function to return the account structure
2105
// associated with name.
2106
func (s *Server) LookupAccount(name string) (*Account, error) {
1,625,850✔
2107
        return s.lookupAccount(name)
1,625,850✔
2108
}
1,625,850✔
2109

2110
// This will fetch new claims and if found update the account with new claims.
2111
// Lock MUST NOT be held upon entry.
2112
func (s *Server) updateAccount(acc *Account) error {
10✔
2113
        acc.mu.RLock()
10✔
2114
        // TODO(dlc) - Make configurable
10✔
2115
        if !acc.incomplete && time.Since(acc.updated) < time.Second {
10✔
2116
                acc.mu.RUnlock()
×
2117
                s.Debugf("Requested account update for [%s] ignored, too soon", acc.Name)
×
2118
                return ErrAccountResolverUpdateTooSoon
×
2119
        }
×
2120
        acc.mu.RUnlock()
10✔
2121
        claimJWT, err := s.fetchRawAccountClaims(acc.Name)
10✔
2122
        if err != nil {
12✔
2123
                return err
2✔
2124
        }
2✔
2125
        return s.updateAccountWithClaimJWT(acc, claimJWT)
8✔
2126
}
2127

2128
// updateAccountWithClaimJWT will check and apply the claim update.
2129
// Lock MUST NOT be held upon entry.
2130
func (s *Server) updateAccountWithClaimJWT(acc *Account, claimJWT string) error {
466✔
2131
        if acc == nil {
466✔
2132
                return ErrMissingAccount
×
2133
        }
×
2134
        acc.mu.RLock()
466✔
2135
        sameClaim := acc.claimJWT != _EMPTY_ && acc.claimJWT == claimJWT && !acc.incomplete
466✔
2136
        acc.mu.RUnlock()
466✔
2137
        if sameClaim {
764✔
2138
                s.Debugf("Requested account update for [%s], same claims detected", acc.Name)
298✔
2139
                return nil
298✔
2140
        }
298✔
2141
        accClaims, _, err := s.verifyAccountClaims(claimJWT)
168✔
2142
        if err == nil && accClaims != nil {
336✔
2143
                acc.mu.Lock()
168✔
2144
                // if an account is updated with a different operator signing key, we want to
168✔
2145
                // show a consistent issuer.
168✔
2146
                acc.Issuer = accClaims.Issuer
168✔
2147
                if acc.Name != accClaims.Subject {
169✔
2148
                        acc.mu.Unlock()
1✔
2149
                        return ErrAccountValidation
1✔
2150
                }
1✔
2151
                acc.mu.Unlock()
167✔
2152
                s.UpdateAccountClaims(acc, accClaims)
167✔
2153
                acc.mu.Lock()
167✔
2154
                // needs to be set after update completed.
167✔
2155
                // This causes concurrent calls to return with sameClaim=true if the change is effective.
167✔
2156
                acc.claimJWT = claimJWT
167✔
2157
                acc.mu.Unlock()
167✔
2158
                return nil
167✔
2159
        }
2160
        return err
×
2161
}
2162

2163
// fetchRawAccountClaims will grab raw account claims iff we have a resolver.
2164
// Lock is NOT held upon entry.
2165
func (s *Server) fetchRawAccountClaims(name string) (string, error) {
1,635✔
2166
        accResolver := s.AccountResolver()
1,635✔
2167
        if accResolver == nil {
1,635✔
2168
                return _EMPTY_, ErrNoAccountResolver
×
2169
        }
×
2170
        // Need to do actual Fetch
2171
        start := time.Now()
1,635✔
2172
        claimJWT, err := fetchAccount(accResolver, name)
1,635✔
2173
        fetchTime := time.Since(start)
1,635✔
2174
        if fetchTime > time.Second {
1,639✔
2175
                s.Warnf("Account [%s] fetch took %v", name, fetchTime)
4✔
2176
        } else {
1,635✔
2177
                s.Debugf("Account [%s] fetch took %v", name, fetchTime)
1,631✔
2178
        }
1,631✔
2179
        if err != nil {
1,686✔
2180
                s.Warnf("Account fetch failed: %v", err)
51✔
2181
                return "", err
51✔
2182
        }
51✔
2183
        return claimJWT, nil
1,584✔
2184
}
2185

2186
// fetchAccountClaims will attempt to fetch new claims if a resolver is present.
2187
// Lock is NOT held upon entry.
2188
func (s *Server) fetchAccountClaims(name string) (*jwt.AccountClaims, string, error) {
1,625✔
2189
        claimJWT, err := s.fetchRawAccountClaims(name)
1,625✔
2190
        if err != nil {
1,674✔
2191
                return nil, _EMPTY_, err
49✔
2192
        }
49✔
2193
        var claim *jwt.AccountClaims
1,576✔
2194
        claim, claimJWT, err = s.verifyAccountClaims(claimJWT)
1,576✔
2195
        if claim != nil && claim.Subject != name {
1,576✔
2196
                return nil, _EMPTY_, ErrAccountValidation
×
2197
        }
×
2198
        return claim, claimJWT, err
1,576✔
2199
}
2200

2201
// verifyAccountClaims will decode and validate any account claims.
2202
func (s *Server) verifyAccountClaims(claimJWT string) (*jwt.AccountClaims, string, error) {
2,016✔
2203
        accClaims, err := jwt.DecodeAccountClaims(claimJWT)
2,016✔
2204
        if err != nil {
2,017✔
2205
                return nil, _EMPTY_, err
1✔
2206
        }
1✔
2207
        if !s.isTrustedIssuer(accClaims.Issuer) {
2,022✔
2208
                return nil, _EMPTY_, ErrAccountValidation
7✔
2209
        }
7✔
2210
        vr := jwt.CreateValidationResults()
2,008✔
2211
        accClaims.Validate(vr)
2,008✔
2212
        if vr.IsBlocking(true) {
2,014✔
2213
                return nil, _EMPTY_, ErrAccountValidation
6✔
2214
        }
6✔
2215
        return accClaims, claimJWT, nil
2,002✔
2216
}
2217

2218
// This will fetch an account from a resolver if defined.
2219
// Lock is NOT held upon entry.
2220
func (s *Server) fetchAccount(name string) (*Account, error) {
1,603✔
2221
        accClaims, claimJWT, err := s.fetchAccountClaims(name)
1,603✔
2222
        if accClaims == nil {
1,663✔
2223
                return nil, err
60✔
2224
        }
60✔
2225
        acc := s.buildInternalAccount(accClaims)
1,543✔
2226
        // Due to possible race, if registerAccount() returns a non
1,543✔
2227
        // nil account, it means the same account was already
1,543✔
2228
        // registered and we should use this one.
1,543✔
2229
        if racc := s.registerAccount(acc); racc != nil {
1,808✔
2230
                // Update with the new claims in case they are new.
265✔
2231
                if err = s.updateAccountWithClaimJWT(racc, claimJWT); err != nil {
265✔
2232
                        return nil, err
×
2233
                }
×
2234
                return racc, nil
265✔
2235
        }
2236
        // The sub imports may have been setup but will not have had their
2237
        // subscriptions properly setup. Do that here.
2238
        var needImportSubs bool
1,278✔
2239

1,278✔
2240
        acc.mu.Lock()
1,278✔
2241
        acc.claimJWT = claimJWT
1,278✔
2242
        if len(acc.imports.services) > 0 {
2,197✔
2243
                if acc.ic == nil {
1,648✔
2244
                        acc.ic = s.createInternalAccountClient()
729✔
2245
                        acc.ic.acc = acc
729✔
2246
                }
729✔
2247
                needImportSubs = true
919✔
2248
        }
2249
        acc.mu.Unlock()
1,278✔
2250

1,278✔
2251
        // Do these outside the lock.
1,278✔
2252
        if needImportSubs {
2,197✔
2253
                acc.addAllServiceImportSubs()
919✔
2254
        }
919✔
2255

2256
        return acc, nil
1,278✔
2257
}
2258

2259
// Start up the server, this will not block.
2260
//
2261
// WaitForShutdown can be used to block and wait for the server to shutdown properly if needed
2262
// after calling s.Shutdown()
2263
func (s *Server) Start() {
6,715✔
2264
        s.Noticef("Starting nats-server")
6,715✔
2265

6,715✔
2266
        gc := gitCommit
6,715✔
2267
        if gc == _EMPTY_ {
13,430✔
2268
                gc = "not set"
6,715✔
2269
        }
6,715✔
2270

2271
        // Snapshot server options.
2272
        opts := s.getOpts()
6,715✔
2273

6,715✔
2274
        // Capture if this server is a leaf that has no cluster, so we don't
6,715✔
2275
        // display the cluster name if that is the case.
6,715✔
2276
        s.mu.RLock()
6,715✔
2277
        leafNoCluster := s.leafNoCluster
6,715✔
2278
        s.mu.RUnlock()
6,715✔
2279

6,715✔
2280
        var clusterName string
6,715✔
2281
        if !leafNoCluster {
12,499✔
2282
                clusterName = s.ClusterName()
5,784✔
2283
        }
5,784✔
2284

2285
        s.Noticef("  Version:  %s", VERSION)
6,715✔
2286
        s.Noticef("  Git:      [%s]", gc)
6,715✔
2287
        s.Debugf("  Go build: %s", s.info.GoVersion)
6,715✔
2288
        if clusterName != _EMPTY_ {
11,465✔
2289
                s.Noticef("  Cluster:  %s", clusterName)
4,750✔
2290
        }
4,750✔
2291
        s.Noticef("  Name:     %s", s.info.Name)
6,715✔
2292
        if opts.JetStream {
10,824✔
2293
                s.Noticef("  Node:     %s", getHash(s.info.Name))
4,109✔
2294
        }
4,109✔
2295
        s.Noticef("  ID:       %s", s.info.ID)
6,715✔
2296

6,715✔
2297
        defer s.Noticef("Server is ready")
6,715✔
2298

6,715✔
2299
        // Check for insecure configurations.
6,715✔
2300
        s.checkAuthforWarnings()
6,715✔
2301

6,715✔
2302
        // Avoid RACE between Start() and Shutdown()
6,715✔
2303
        s.running.Store(true)
6,715✔
2304
        s.mu.Lock()
6,715✔
2305
        // Update leafNodeEnabled in case options have changed post NewServer()
6,715✔
2306
        // and before Start() (we should not be able to allow that, but server has
6,715✔
2307
        // direct reference to user-provided options - at least before a Reload() is
6,715✔
2308
        // performed.
6,715✔
2309
        s.leafNodeEnabled = opts.LeafNode.Port != 0 || len(opts.LeafNode.Remotes) > 0
6,715✔
2310
        s.mu.Unlock()
6,715✔
2311

6,715✔
2312
        s.grMu.Lock()
6,715✔
2313
        s.grRunning = true
6,715✔
2314
        s.grMu.Unlock()
6,715✔
2315

6,715✔
2316
        s.startRateLimitLogExpiration()
6,715✔
2317

6,715✔
2318
        // Pprof http endpoint for the profiler.
6,715✔
2319
        if opts.ProfPort != 0 {
6,716✔
2320
                s.StartProfiler()
1✔
2321
        } else {
6,715✔
2322
                // It's still possible to access this profile via a SYS endpoint, so set
6,714✔
2323
                // this anyway. (Otherwise StartProfiler would have called it.)
6,714✔
2324
                s.setBlockProfileRate(opts.ProfBlockRate)
6,714✔
2325
        }
6,714✔
2326

2327
        if opts.ConfigFile != _EMPTY_ {
11,583✔
2328
                var cd string
4,868✔
2329
                if opts.configDigest != "" {
9,736✔
2330
                        cd = fmt.Sprintf("(%s)", opts.configDigest)
4,868✔
2331
                }
4,868✔
2332
                s.Noticef("Using configuration file: %s %s", opts.ConfigFile, cd)
4,868✔
2333
        }
2334

2335
        hasOperators := len(opts.TrustedOperators) > 0
6,715✔
2336
        if hasOperators {
7,012✔
2337
                s.Noticef("Trusted Operators")
297✔
2338
        }
297✔
2339
        for _, opc := range opts.TrustedOperators {
7,012✔
2340
                s.Noticef("  System  : %q", opc.Audience)
297✔
2341
                s.Noticef("  Operator: %q", opc.Name)
297✔
2342
                s.Noticef("  Issued  : %v", time.Unix(opc.IssuedAt, 0))
297✔
2343
                switch opc.Expires {
297✔
2344
                case 0:
11✔
2345
                        s.Noticef("  Expires : Never")
11✔
2346
                default:
286✔
2347
                        s.Noticef("  Expires : %v", time.Unix(opc.Expires, 0))
286✔
2348
                }
2349
        }
2350
        if hasOperators && opts.SystemAccount == _EMPTY_ {
6,753✔
2351
                s.Warnf("Trusted Operators should utilize a System Account")
38✔
2352
        }
38✔
2353
        if opts.MaxPayload > MAX_PAYLOAD_MAX_SIZE {
6,717✔
2354
                s.Warnf("Maximum payloads over %v are generally discouraged and could lead to poor performance",
2✔
2355
                        friendlyBytes(int64(MAX_PAYLOAD_MAX_SIZE)))
2✔
2356
        }
2✔
2357

2358
        if len(opts.JsAccDefaultDomain) > 0 {
6,734✔
2359
                s.Warnf("The option `default_js_domain` is a temporary backwards compatibility measure and will be removed")
19✔
2360
        }
19✔
2361

2362
        // If we have a memory resolver, check the accounts here for validation exceptions.
2363
        // This allows them to be logged right away vs when they are accessed via a client.
2364
        if hasOperators && len(opts.resolverPreloads) > 0 {
6,877✔
2365
                s.checkResolvePreloads()
162✔
2366
        }
162✔
2367

2368
        // Log the pid to a file.
2369
        if opts.PidFile != _EMPTY_ {
6,717✔
2370
                if err := s.logPid(); err != nil {
2✔
2371
                        s.Fatalf("Could not write pidfile: %v", err)
×
2372
                        return
×
2373
                }
×
2374
        }
2375

2376
        // Setup system account which will start the eventing stack.
2377
        if sa := opts.SystemAccount; sa != _EMPTY_ {
10,412✔
2378
                if err := s.SetSystemAccount(sa); err != nil {
3,697✔
2379
                        s.Fatalf("Can't set system account: %v", err)
×
2380
                        return
×
2381
                }
×
2382
        } else if !opts.NoSystemAccount {
5,378✔
2383
                // We will create a default system account here.
2,360✔
2384
                s.SetDefaultSystemAccount()
2,360✔
2385
        }
2,360✔
2386

2387
        // Start monitoring before enabling other subsystems of the
2388
        // server to be able to monitor during startup.
2389
        if err := s.StartMonitoring(); err != nil {
6,716✔
2390
                s.Fatalf("Can't start monitoring: %v", err)
1✔
2391
                return
1✔
2392
        }
1✔
2393

2394
        // Start up resolver machinery.
2395
        if ar := s.AccountResolver(); ar != nil {
7,069✔
2396
                if err := ar.Start(s); err != nil {
355✔
2397
                        s.Fatalf("Could not start resolver: %v", err)
×
2398
                        return
×
2399
                }
×
2400
                // In operator mode, when the account resolver depends on an external system and
2401
                // the system account is the bootstrapping account, start fetching it.
2402
                if len(opts.TrustedOperators) == 1 && opts.SystemAccount != _EMPTY_ && opts.SystemAccount != DEFAULT_SYSTEM_ACCOUNT {
614✔
2403
                        opts := s.getOpts()
259✔
2404
                        _, isMemResolver := ar.(*MemAccResolver)
259✔
2405
                        if v, ok := s.accounts.Load(opts.SystemAccount); !isMemResolver && ok && v.(*Account).claimJWT == _EMPTY_ {
327✔
2406
                                s.Noticef("Using bootstrapping system account")
68✔
2407
                                s.startGoRoutine(func() {
136✔
2408
                                        defer s.grWG.Done()
68✔
2409
                                        t := time.NewTicker(time.Second)
68✔
2410
                                        defer t.Stop()
68✔
2411
                                        for {
173✔
2412
                                                select {
105✔
2413
                                                case <-s.quitCh:
45✔
2414
                                                        return
45✔
2415
                                                case <-t.C:
60✔
2416
                                                        sacc := s.SystemAccount()
60✔
2417
                                                        if claimJWT, err := fetchAccount(ar, opts.SystemAccount); err != nil {
97✔
2418
                                                                continue
37✔
2419
                                                        } else if err = s.updateAccountWithClaimJWT(sacc, claimJWT); err != nil {
23✔
2420
                                                                continue
×
2421
                                                        }
2422
                                                        s.Noticef("System account fetched and updated")
23✔
2423
                                                        return
23✔
2424
                                                }
2425
                                        }
2426
                                })
2427
                        }
2428
                }
2429
        }
2430

2431
        // Start expiration of mapped GW replies, regardless if
2432
        // this server is configured with gateway or not.
2433
        s.startGWReplyMapExpiration()
6,714✔
2434

6,714✔
2435
        // Check if JetStream has been enabled. This needs to be after
6,714✔
2436
        // the system account setup above. JetStream will create its
6,714✔
2437
        // own system account if one is not present.
6,714✔
2438
        if opts.JetStream {
10,823✔
2439
                // Make sure someone is not trying to enable on the system account.
4,109✔
2440
                if sa := s.SystemAccount(); sa != nil && len(sa.jsLimits) > 0 {
4,109✔
2441
                        s.Fatalf("Not allowed to enable JetStream on the system account")
×
2442
                }
×
2443
                cfg := &JetStreamConfig{
4,109✔
2444
                        StoreDir:     opts.StoreDir,
4,109✔
2445
                        SyncInterval: opts.SyncInterval,
4,109✔
2446
                        SyncAlways:   opts.SyncAlways,
4,109✔
2447
                        Strict:       !opts.NoJetStreamStrict,
4,109✔
2448
                        MaxMemory:    opts.JetStreamMaxMemory,
4,109✔
2449
                        MaxStore:     opts.JetStreamMaxStore,
4,109✔
2450
                        Domain:       opts.JetStreamDomain,
4,109✔
2451
                        CompressOK:   true,
4,109✔
2452
                        UniqueTag:    opts.JetStreamUniqueTag,
4,109✔
2453
                }
4,109✔
2454
                if err := s.EnableJetStream(cfg); err != nil {
4,110✔
2455
                        s.Fatalf("Can't start JetStream: %v", err)
1✔
2456
                        return
1✔
2457
                }
1✔
2458
        } else {
2,605✔
2459
                // Check to see if any configured accounts have JetStream enabled.
2,605✔
2460
                sa, ga := s.SystemAccount(), s.GlobalAccount()
2,605✔
2461
                var hasSys, hasGlobal bool
2,605✔
2462
                var total int
2,605✔
2463

2,605✔
2464
                s.accounts.Range(func(k, v any) bool {
8,505✔
2465
                        total++
5,900✔
2466
                        acc := v.(*Account)
5,900✔
2467
                        if acc == sa {
7,776✔
2468
                                hasSys = true
1,876✔
2469
                        } else if acc == ga {
8,505✔
2470
                                hasGlobal = true
2,605✔
2471
                        }
2,605✔
2472
                        acc.mu.RLock()
5,900✔
2473
                        hasJs := len(acc.jsLimits) > 0
5,900✔
2474
                        acc.mu.RUnlock()
5,900✔
2475
                        if hasJs {
5,925✔
2476
                                s.checkJetStreamExports()
25✔
2477
                                acc.enableAllJetStreamServiceImportsAndMappings()
25✔
2478
                        }
25✔
2479
                        return true
5,900✔
2480
                })
2481
                // If we only have the system account and the global account and we are not standalone,
2482
                // go ahead and enable JS on $G in case we are in simple mixed mode setup.
2483
                if total == 2 && hasSys && hasGlobal && !s.standAloneMode() {
3,219✔
2484
                        ga.mu.Lock()
614✔
2485
                        ga.jsLimits = map[string]JetStreamAccountLimits{
614✔
2486
                                _EMPTY_: dynamicJSAccountLimits,
614✔
2487
                        }
614✔
2488
                        ga.mu.Unlock()
614✔
2489
                        s.checkJetStreamExports()
614✔
2490
                        ga.enableAllJetStreamServiceImportsAndMappings()
614✔
2491
                }
614✔
2492
        }
2493

2494
        // Delayed API response handling. Start regardless of JetStream being
2495
        // currently configured or not (since it can be enabled/disabled with
2496
        // configuration reload).
2497
        s.startGoRoutine(s.delayedAPIResponder)
6,713✔
2498

6,713✔
2499
        // Start OCSP Stapling monitoring for TLS certificates if enabled. Hook TLS handshake for
6,713✔
2500
        // OCSP check on peers (LEAF and CLIENT kind) if enabled.
6,713✔
2501
        s.startOCSPMonitoring()
6,713✔
2502

6,713✔
2503
        // Configure OCSP Response Cache for peer OCSP checks if enabled.
6,713✔
2504
        s.initOCSPResponseCache()
6,713✔
2505

6,713✔
2506
        // Start up gateway if needed. Do this before starting the routes, because
6,713✔
2507
        // we want to resolve the gateway host:port so that this information can
6,713✔
2508
        // be sent to other routes.
6,713✔
2509
        if opts.Gateway.Port != 0 {
7,869✔
2510
                s.startGateways()
1,156✔
2511
        }
1,156✔
2512

2513
        // Start websocket server if needed. Do this before starting the routes, and
2514
        // leaf node because we want to resolve the gateway host:port so that this
2515
        // information can be sent to other routes.
2516
        if opts.Websocket.Port != 0 {
6,835✔
2517
                s.startWebsocketServer()
122✔
2518
        }
122✔
2519

2520
        // Start up listen if we want to accept leaf node connections.
2521
        if opts.LeafNode.Port != 0 {
10,371✔
2522
                // Will resolve or assign the advertise address for the leafnode listener.
3,658✔
2523
                // We need that in StartRouting().
3,658✔
2524
                s.startLeafNodeAcceptLoop()
3,658✔
2525
        }
3,658✔
2526

2527
        // Solicit remote servers for leaf node connections.
2528
        if len(opts.LeafNode.Remotes) > 0 {
7,903✔
2529
                s.solicitLeafNodeRemotes(opts.LeafNode.Remotes)
1,190✔
2530
        }
1,190✔
2531

2532
        // TODO (ik): I wanted to refactor this by starting the client
2533
        // accept loop first, that is, it would resolve listen spec
2534
        // in place, but start the accept-for-loop in a different go
2535
        // routine. This would get rid of the synchronization between
2536
        // this function and StartRouting, which I also would have wanted
2537
        // to refactor, but both AcceptLoop() and StartRouting() have
2538
        // been exported and not sure if that would break users using them.
2539
        // We could mark them as deprecated and remove in a release or two...
2540

2541
        // The Routing routine needs to wait for the client listen
2542
        // port to be opened and potential ephemeral port selected.
2543
        clientListenReady := make(chan struct{})
6,713✔
2544

6,713✔
2545
        // MQTT
6,713✔
2546
        if opts.MQTT.Port != 0 {
6,960✔
2547
                s.startMQTT()
247✔
2548
        }
247✔
2549

2550
        // Start up routing as well if needed.
2551
        if opts.Cluster.Port != 0 {
11,243✔
2552
                s.startGoRoutine(func() {
9,060✔
2553
                        s.StartRouting(clientListenReady)
4,530✔
2554
                })
4,530✔
2555
        }
2556

2557
        if opts.PortsFileDir != _EMPTY_ {
6,715✔
2558
                s.logPorts()
2✔
2559
        }
2✔
2560

2561
        if opts.TLSRateLimit > 0 {
6,714✔
2562
                s.startGoRoutine(s.logRejectedTLSConns)
1✔
2563
        }
1✔
2564

2565
        // We've finished starting up.
2566
        close(s.startupComplete)
6,713✔
2567

6,713✔
2568
        // Wait for clients.
6,713✔
2569
        if !opts.DontListen {
13,426✔
2570
                s.AcceptLoop(clientListenReady)
6,713✔
2571
        }
6,713✔
2572

2573
        // Bring OSCP Response cache online after accept loop started in anticipation of NATS-enabled cache types
2574
        s.startOCSPResponseCache()
6,713✔
2575
}
2576

2577
func (s *Server) isShuttingDown() bool {
397,940✔
2578
        return s.shutdown.Load()
397,940✔
2579
}
397,940✔
2580

2581
// Shutdown will shutdown the server instance by kicking out the AcceptLoop
// and closing all associated clients.
//
// It is safe to call on a nil *Server (no-op). Note that the JetStream pull
// consumer signaling, raft stepdown and eventing shutdown below run BEFORE the
// "already shutting down" guard, so a repeated call still executes them before
// returning early.
//
// The call blocks until every listener/accept loop counted in doneExpected has
// sent on s.done and all goroutines tracked by s.grWG have finished. It then
// closes s.shutdownComplete, which releases WaitForShutdown.
func (s *Server) Shutdown() {
	if s == nil {
		return
	}
	// This is for JetStream R1 Pull Consumers to allow signaling
	// that pending pull requests are invalid.
	s.signalPullConsumers()

	// Transfer off any raft nodes that we are a leader by stepping them down.
	s.stepdownRaftNodes()

	// Shutdown the eventing system as needed.
	// This is done first to send out any messages for
	// account status. We will also clean up any
	// eventing items associated with accounts.
	s.shutdownEventing()

	// Prevent issues with multiple calls.
	if s.isShuttingDown() {
		return
	}

	s.mu.Lock()
	s.Noticef("Initiating Shutdown...")

	accRes := s.accResolver

	opts := s.getOpts()

	// The shutdown flag is flipped while holding s.mu, so code that checks
	// isShuttingDown() under s.mu observes a consistent state.
	s.shutdown.Store(true)
	s.running.Store(false)
	// Stop accepting new tracked goroutines.
	s.grMu.Lock()
	s.grRunning = false
	s.grMu.Unlock()
	s.mu.Unlock()

	if accRes != nil {
		accRes.Close()
	}

	// Now check and shutdown jetstream.
	s.shutdownJetStream()

	// Now shutdown the nodes
	s.shutdownRaftNodes()

	s.mu.Lock()
	// Collect every connection (keyed by connection id) so they can be closed
	// after the server lock is released.
	conns := make(map[uint64]*client)

	// Copy off the clients
	for i, c := range s.clients {
		conns[i] = c
	}
	// Copy off the connections that are not yet registered
	// in s.routes, but for which the readLoop has started
	s.grMu.Lock()
	for i, c := range s.grTmpClients {
		conns[i] = c
	}
	s.grMu.Unlock()
	// Copy off the routes
	s.forEachRoute(func(r *client) {
		r.mu.Lock()
		conns[r.cid] = r
		r.mu.Unlock()
	})
	// Copy off the gateways
	s.getAllGatewayConnections(conns)

	// Copy off the leaf nodes
	for i, c := range s.leafs {
		conns[i] = c
	}

	// Number of done channel responses we expect.
	// Each listener closed below has a goroutine that sends on s.done once it exits.
	doneExpected := 0

	// Kick client AcceptLoop()
	if s.listener != nil {
		doneExpected++
		s.listener.Close()
		s.listener = nil
	}

	// Kick websocket server (returns 1 if it was running, 0 otherwise).
	doneExpected += s.closeWebsocketServer()

	// Kick MQTT accept loop
	if s.mqtt.listener != nil {
		doneExpected++
		s.mqtt.listener.Close()
		s.mqtt.listener = nil
	}

	// Kick leafnodes AcceptLoop()
	if s.leafNodeListener != nil {
		doneExpected++
		s.leafNodeListener.Close()
		s.leafNodeListener = nil
	}

	// Kick route AcceptLoop()
	if s.routeListener != nil {
		doneExpected++
		s.routeListener.Close()
		s.routeListener = nil
	}

	// Kick Gateway AcceptLoop()
	if s.gatewayListener != nil {
		doneExpected++
		s.gatewayListener.Close()
		s.gatewayListener = nil
	}

	// Kick HTTP monitoring if it's running
	if s.http != nil {
		doneExpected++
		s.http.Close()
		s.http = nil
	}

	// Kick Profiling if it's running.
	// NOTE(review): unlike the other listeners, s.profiler is not reset to nil here.
	if s.profiler != nil {
		doneExpected++
		s.profiler.Close()
	}

	s.mu.Unlock()

	// Release go routines that wait on that channel
	close(s.quitCh)

	// Close client and route connections
	for _, c := range conns {
		c.setNoReconnect()
		c.closeConnection(ServerShutdown)
	}

	// Block until the accept loops exit
	for doneExpected > 0 {
		<-s.done
		doneExpected--
	}

	// Wait for go routines to be done.
	s.grWG.Wait()

	if opts.PortsFileDir != _EMPTY_ {
		s.deletePortsFile(opts.PortsFileDir)
	}

	s.Noticef("Server Exiting..")

	// Stop OCSP Response Cache
	if s.ocsprc != nil {
		s.ocsprc.Stop(s)
	}

	// Close logger if applicable. It allows tests on Windows
	// to be able to do proper cleanup (delete log file).
	s.logging.RLock()
	log := s.logging.logger
	s.logging.RUnlock()
	if log != nil {
		if l, ok := log.(*logger.Logger); ok {
			l.Close()
		}
	}
	// Notify that the shutdown is complete
	close(s.shutdownComplete)
}
2755

2756
// Close the websocket server if running. If so, returns 1, else 0.
2757
// Server lock held on entry.
2758
func (s *Server) closeWebsocketServer() int {
6,789✔
2759
        ws := &s.websocket
6,789✔
2760
        ws.mu.Lock()
6,789✔
2761
        hs := ws.server
6,789✔
2762
        if hs != nil {
6,911✔
2763
                ws.server = nil
122✔
2764
                ws.listener = nil
122✔
2765
        }
122✔
2766
        ws.mu.Unlock()
6,789✔
2767
        if hs != nil {
6,911✔
2768
                hs.Close()
122✔
2769
                return 1
122✔
2770
        }
122✔
2771
        return 0
6,667✔
2772
}
2773

2774
// WaitForShutdown will block until the server has been fully shutdown.
2775
func (s *Server) WaitForShutdown() {
3,136✔
2776
        <-s.shutdownComplete
3,136✔
2777
}
3,136✔
2778

2779
// AcceptLoop is exported for easier testing.
//
// It opens (or reuses, via getServerListener) the client listener on
// opts.Host:opts.Port, records it in s.listener, and starts the client accept
// loop in a new goroutine. The clr channel is closed exactly once: either
// explicitly after the listener is installed, or by the deferred func on any
// early return, so a caller waiting on clr is always released.
func (s *Server) AcceptLoop(clr chan struct{}) {
	// If we were to exit before the listener is setup properly,
	// make sure we close the channel.
	defer func() {
		if clr != nil {
			close(clr)
		}
	}()

	if s.isShuttingDown() {
		return
	}

	// Snapshot server options.
	opts := s.getOpts()

	// Setup state that can enable shutdown
	s.mu.Lock()
	hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
	l, e := s.getServerListener(hp)
	s.listenerErr = e
	if e != nil {
		s.mu.Unlock()
		s.Fatalf("Error listening on port: %s, %q", hp, e)
		return
	}
	s.Noticef("Listening for client connections on %s",
		net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))

	// Alert of TLS enabled.
	if opts.TLSConfig != nil {
		s.Noticef("TLS required for client connections")
		if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
			s.Warnf("Clients that are not using \"TLS Handshake First\" option will fail to connect")
		}
	}

	// If server was started with RANDOM_PORT (-1), opts.Port would be equal
	// to 0 at the beginning of this function. So we need to get the actual port
	if opts.Port == 0 {
		// Write resolved port back to options.
		opts.Port = l.Addr().(*net.TCPAddr).Port
	}

	// Now that port has been set (if it was set to RANDOM), set the
	// server's info Host/Port with either values from Options or
	// ClientAdvertise.
	if err := s.setInfoHostPort(); err != nil {
		s.Fatalf("Error setting server INFO with ClientAdvertise value of %s, err=%v", opts.ClientAdvertise, err)
		l.Close()
		s.mu.Unlock()
		return
	}
	// Keep track of client connect URLs. We may need them later.
	s.clientConnectURLs = s.getClientConnectURLs()
	s.listener = l

	// The error callback handles lame duck mode: it signals s.ldmCh, then waits
	// for s.quitCh (closed by Shutdown) and tells the accept loop to stop.
	go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
		func(_ error) bool {
			if s.isLameDuckMode() {
				// Signal that we are not accepting new clients
				s.ldmCh <- true
				// Now wait for the Shutdown...
				<-s.quitCh
				return true
			}
			return false
		})
	s.mu.Unlock()

	// Let the caller know that we are ready
	close(clr)
	// Prevent the deferred func from closing clr a second time.
	clr = nil
}
2854

2855
// getServerListener returns a network listener for the given host-port address.
2856
// If the Server already has an active listener (s.listener), it returns that listener
2857
// along with any previous error (s.listenerErr). Otherwise, it creates and returns
2858
// a new TCP listener on the specified address using natsListen.
2859
func (s *Server) getServerListener(hp string) (net.Listener, error) {
6,712✔
2860
        if s.listener != nil {
6,712✔
2861
                return s.listener, s.listenerErr
×
2862
        }
×
2863

2864
        return natsListen("tcp", hp)
6,712✔
2865
}
2866

2867
// InProcessConn returns an in-process connection to the server,
2868
// avoiding the need to use a TCP listener for local connectivity
2869
// within the same process. This can be used regardless of the
2870
// state of the DontListen option.
2871
func (s *Server) InProcessConn() (net.Conn, error) {
8✔
2872
        pl, pr := net.Pipe()
8✔
2873
        if !s.startGoRoutine(func() {
16✔
2874
                s.createClientInProcess(pl)
8✔
2875
                s.grWG.Done()
8✔
2876
        }) {
8✔
2877
                pl.Close()
×
2878
                pr.Close()
×
2879
                return nil, fmt.Errorf("failed to create connection")
×
2880
        }
×
2881
        return pr, nil
8✔
2882
}
2883

2884
func (s *Server) acceptConnections(l net.Listener, acceptName string, createFunc func(conn net.Conn), errFunc func(err error) bool) {
16,294✔
2885
        tmpDelay := ACCEPT_MIN_SLEEP
16,294✔
2886

16,294✔
2887
        for {
76,447✔
2888
                conn, err := l.Accept()
60,153✔
2889
                if err != nil {
76,442✔
2890
                        if errFunc != nil && errFunc(err) {
16,295✔
2891
                                return
6✔
2892
                        }
6✔
2893
                        if tmpDelay = s.acceptError(acceptName, err, tmpDelay); tmpDelay < 0 {
32,566✔
2894
                                break
16,283✔
2895
                        }
2896
                        continue
×
2897
                }
2898
                tmpDelay = ACCEPT_MIN_SLEEP
43,859✔
2899
                if !s.startGoRoutine(func() {
87,713✔
2900
                        s.reloadMu.RLock()
43,854✔
2901
                        createFunc(conn)
43,854✔
2902
                        s.reloadMu.RUnlock()
43,854✔
2903
                        s.grWG.Done()
43,854✔
2904
                }) {
43,859✔
2905
                        conn.Close()
5✔
2906
                }
5✔
2907
        }
2908
        s.Debugf(acceptName + " accept loop exiting..")
16,283✔
2909
        s.done <- true
16,283✔
2910
}
2911

2912
// This function sets the server's info Host/Port based on server Options.
2913
// Note that this function may be called during config reload, this is why
2914
// Host/Port may be reset to original Options if the ClientAdvertise option
2915
// is not set (since it may have previously been).
2916
func (s *Server) setInfoHostPort() error {
13,642✔
2917
        // When this function is called, opts.Port is set to the actual listen
13,642✔
2918
        // port (if option was originally set to RANDOM), even during a config
13,642✔
2919
        // reload. So use of s.opts.Port is safe.
13,642✔
2920
        opts := s.getOpts()
13,642✔
2921
        if opts.ClientAdvertise != _EMPTY_ {
13,653✔
2922
                h, p, err := parseHostPort(opts.ClientAdvertise, opts.Port)
11✔
2923
                if err != nil {
11✔
2924
                        return err
×
2925
                }
×
2926
                s.info.Host = h
11✔
2927
                s.info.Port = p
11✔
2928
        } else {
13,631✔
2929
                s.info.Host = opts.Host
13,631✔
2930
                s.info.Port = opts.Port
13,631✔
2931
        }
13,631✔
2932
        return nil
13,642✔
2933
}
2934

2935
// StartProfiler is called to enable dynamic profiling.
2936
func (s *Server) StartProfiler() {
2✔
2937
        if s.isShuttingDown() {
2✔
2938
                return
×
2939
        }
×
2940

2941
        // Snapshot server options.
2942
        opts := s.getOpts()
2✔
2943

2✔
2944
        port := opts.ProfPort
2✔
2945

2✔
2946
        // Check for Random Port
2✔
2947
        if port == -1 {
3✔
2948
                port = 0
1✔
2949
        }
1✔
2950

2951
        s.mu.Lock()
2✔
2952
        hp := net.JoinHostPort(opts.Host, strconv.Itoa(port))
2✔
2953
        l, err := net.Listen("tcp", hp)
2✔
2954

2✔
2955
        if err != nil {
2✔
2956
                s.mu.Unlock()
×
2957
                s.Fatalf("error starting profiler: %s", err)
×
2958
                return
×
2959
        }
×
2960
        s.Noticef("profiling port: %d", l.Addr().(*net.TCPAddr).Port)
2✔
2961

2✔
2962
        srv := &http.Server{
2✔
2963
                Addr:           hp,
2✔
2964
                Handler:        http.DefaultServeMux,
2✔
2965
                MaxHeaderBytes: 1 << 20,
2✔
2966
                ReadTimeout:    time.Second * 5,
2✔
2967
        }
2✔
2968
        s.profiler = l
2✔
2969
        s.profilingServer = srv
2✔
2970

2✔
2971
        s.setBlockProfileRate(opts.ProfBlockRate)
2✔
2972

2✔
2973
        go func() {
4✔
2974
                // if this errors out, it's probably because the server is being shutdown
2✔
2975
                err := srv.Serve(l)
2✔
2976
                if err != nil {
4✔
2977
                        if !s.isShuttingDown() {
2✔
2978
                                s.Fatalf("error starting profiler: %s", err)
×
2979
                        }
×
2980
                }
2981
                srv.Close()
2✔
2982
                s.done <- true
2✔
2983
        }()
2984
        s.mu.Unlock()
2✔
2985
}
2986

2987
func (s *Server) setBlockProfileRate(rate int) {
6,716✔
2988
        // Passing i ProfBlockRate <= 0 here will disable or > 0 will enable.
6,716✔
2989
        runtime.SetBlockProfileRate(rate)
6,716✔
2990

6,716✔
2991
        if rate > 0 {
6,716✔
2992
                s.Warnf("Block profiling is enabled (rate %d), this may have a performance impact", rate)
×
2993
        }
×
2994
}
2995

2996
// StartHTTPMonitoring will enable the HTTP monitoring port.
2997
// DEPRECATED: Should use StartMonitoring.
2998
func (s *Server) StartHTTPMonitoring() {
×
2999
        s.startMonitoring(false)
×
3000
}
×
3001

3002
// StartHTTPSMonitoring will enable the HTTPS monitoring port.
3003
// DEPRECATED: Should use StartMonitoring.
3004
func (s *Server) StartHTTPSMonitoring() {
×
3005
        s.startMonitoring(true)
×
3006
}
×
3007

3008
// StartMonitoring starts the HTTP or HTTPs server if needed.
3009
func (s *Server) StartMonitoring() error {
6,719✔
3010
        // Snapshot server options.
6,719✔
3011
        opts := s.getOpts()
6,719✔
3012

6,719✔
3013
        // Specifying both HTTP and HTTPS ports is a misconfiguration
6,719✔
3014
        if opts.HTTPPort != 0 && opts.HTTPSPort != 0 {
6,720✔
3015
                return fmt.Errorf("can't specify both HTTP (%v) and HTTPs (%v) ports", opts.HTTPPort, opts.HTTPSPort)
1✔
3016
        }
1✔
3017
        var err error
6,718✔
3018
        if opts.HTTPPort != 0 {
7,812✔
3019
                err = s.startMonitoring(false)
1,094✔
3020
        } else if opts.HTTPSPort != 0 {
6,733✔
3021
                if opts.TLSConfig == nil {
17✔
3022
                        return fmt.Errorf("TLS cert and key required for HTTPS")
2✔
3023
                }
2✔
3024
                err = s.startMonitoring(true)
13✔
3025
        }
3026
        return err
6,716✔
3027
}
3028

3029
// HTTP endpoints served by the monitoring server. Each path is joined with
// the configured HTTP base path before it is registered.
const (
	// RootPath is the monitoring index, handled by HandleRoot.
	RootPath         = "/"
	// VarzPath is handled by HandleVarz.
	VarzPath         = "/varz"
	// ConnzPath is handled by HandleConnz.
	ConnzPath        = "/connz"
	// RoutezPath is handled by HandleRoutez.
	RoutezPath       = "/routez"
	// GatewayzPath is handled by HandleGatewayz.
	GatewayzPath     = "/gatewayz"
	// LeafzPath is handled by HandleLeafz.
	LeafzPath        = "/leafz"
	// SubszPath is handled by HandleSubsz ("/subscriptionsz" is also accepted as an alias).
	SubszPath        = "/subsz"
	// StackszPath is handled by HandleStacksz.
	StackszPath      = "/stacksz"
	// AccountzPath is handled by HandleAccountz.
	AccountzPath     = "/accountz"
	// AccountStatzPath is handled by HandleAccountStatz.
	AccountStatzPath = "/accstatz"
	// JszPath is handled by HandleJsz.
	JszPath          = "/jsz"
	// HealthzPath is handled by HandleHealthz.
	HealthzPath      = "/healthz"
	// IPQueuesPath is handled by HandleIPQueuesz.
	IPQueuesPath     = "/ipqueuesz"
	// RaftzPath is handled by HandleRaftz.
	RaftzPath        = "/raftz"
	// ExpvarzPath serves the expvar handler.
	ExpvarzPath      = "/debug/vars"
)
3047

3048
func (s *Server) basePath(p string) string {
17,728✔
3049
        return path.Join(s.httpBasePath, p)
17,728✔
3050
}
17,728✔
3051

3052
// captureHTTPServerLog is an io.Writer used as the target of an http.Server
// ErrorLog; its Write method forwards each message to the server's Errorf.
type captureHTTPServerLog struct {
	// s is the server whose logger receives the messages.
	s      *Server
	// prefix is prepended to every forwarded message (e.g. "monitoring: ").
	prefix string
}
3056

3057
func (cl *captureHTTPServerLog) Write(p []byte) (int, error) {
3✔
3058
        var buf [128]byte
3✔
3059
        var b = buf[:0]
3✔
3060

3✔
3061
        b = append(b, []byte(cl.prefix)...)
3✔
3062
        offset := 0
3✔
3063
        if bytes.HasPrefix(p, []byte("http:")) {
6✔
3064
                offset = 6
3✔
3065
        }
3✔
3066
        b = append(b, p[offset:]...)
3✔
3067
        cl.s.Errorf(string(b))
3✔
3068
        return len(p), nil
3✔
3069
}
3070

3071
// The TLS configuration is passed to the listener when the monitoring
3072
// "server" is setup. That prevents TLS configuration updates on reload
3073
// from being used. By setting this function in tls.Config.GetConfigForClient
3074
// we instruct the TLS handshake to ask for the tls configuration to be
3075
// used for a specific client. We don't care which client, we always use
3076
// the same TLS configuration.
3077
func (s *Server) getMonitoringTLSConfig(_ *tls.ClientHelloInfo) (*tls.Config, error) {
4✔
3078
        opts := s.getOpts()
4✔
3079
        tc := opts.TLSConfig.Clone()
4✔
3080
        tc.ClientAuth = tls.NoClientCert
4✔
3081
        return tc, nil
4✔
3082
}
4✔
3083

3084
// Start the monitoring server
3085
func (s *Server) startMonitoring(secure bool) error {
1,107✔
3086
        if s.isShuttingDown() {
1,108✔
3087
                return nil
1✔
3088
        }
1✔
3089

3090
        // Snapshot server options.
3091
        opts := s.getOpts()
1,106✔
3092

1,106✔
3093
        var (
1,106✔
3094
                hp           string
1,106✔
3095
                err          error
1,106✔
3096
                httpListener net.Listener
1,106✔
3097
                port         int
1,106✔
3098
        )
1,106✔
3099

1,106✔
3100
        monitorProtocol := "http"
1,106✔
3101

1,106✔
3102
        if secure {
1,119✔
3103
                monitorProtocol += "s"
13✔
3104
                port = opts.HTTPSPort
13✔
3105
                if port == -1 {
17✔
3106
                        port = 0
4✔
3107
                }
4✔
3108
                hp = net.JoinHostPort(opts.HTTPHost, strconv.Itoa(port))
13✔
3109
                config := opts.TLSConfig.Clone()
13✔
3110
                if !s.ocspPeerVerify {
25✔
3111
                        config.GetConfigForClient = s.getMonitoringTLSConfig
12✔
3112
                        config.ClientAuth = tls.NoClientCert
12✔
3113
                }
12✔
3114
                httpListener, err = tls.Listen("tcp", hp, config)
13✔
3115

3116
        } else {
1,093✔
3117
                port = opts.HTTPPort
1,093✔
3118
                if port == -1 {
1,980✔
3119
                        port = 0
887✔
3120
                }
887✔
3121
                hp = net.JoinHostPort(opts.HTTPHost, strconv.Itoa(port))
1,093✔
3122
                httpListener, err = net.Listen("tcp", hp)
1,093✔
3123
        }
3124

3125
        if err != nil {
1,107✔
3126
                return fmt.Errorf("can't listen to the monitor port: %v", err)
1✔
3127
        }
1✔
3128

3129
        rport := httpListener.Addr().(*net.TCPAddr).Port
1,105✔
3130
        s.Noticef("Starting %s monitor on %s", monitorProtocol, net.JoinHostPort(opts.HTTPHost, strconv.Itoa(rport)))
1,105✔
3131

1,105✔
3132
        mux := http.NewServeMux()
1,105✔
3133

1,105✔
3134
        // Root
1,105✔
3135
        mux.HandleFunc(s.basePath(RootPath), s.HandleRoot)
1,105✔
3136
        // Varz
1,105✔
3137
        mux.HandleFunc(s.basePath(VarzPath), s.HandleVarz)
1,105✔
3138
        // Connz
1,105✔
3139
        mux.HandleFunc(s.basePath(ConnzPath), s.HandleConnz)
1,105✔
3140
        // Routez
1,105✔
3141
        mux.HandleFunc(s.basePath(RoutezPath), s.HandleRoutez)
1,105✔
3142
        // Gatewayz
1,105✔
3143
        mux.HandleFunc(s.basePath(GatewayzPath), s.HandleGatewayz)
1,105✔
3144
        // Leafz
1,105✔
3145
        mux.HandleFunc(s.basePath(LeafzPath), s.HandleLeafz)
1,105✔
3146
        // Subz
1,105✔
3147
        mux.HandleFunc(s.basePath(SubszPath), s.HandleSubsz)
1,105✔
3148
        // Subz alias for backwards compatibility
1,105✔
3149
        mux.HandleFunc(s.basePath("/subscriptionsz"), s.HandleSubsz)
1,105✔
3150
        // Stacksz
1,105✔
3151
        mux.HandleFunc(s.basePath(StackszPath), s.HandleStacksz)
1,105✔
3152
        // Accountz
1,105✔
3153
        mux.HandleFunc(s.basePath(AccountzPath), s.HandleAccountz)
1,105✔
3154
        // Accstatz
1,105✔
3155
        mux.HandleFunc(s.basePath(AccountStatzPath), s.HandleAccountStatz)
1,105✔
3156
        // Jsz
1,105✔
3157
        mux.HandleFunc(s.basePath(JszPath), s.HandleJsz)
1,105✔
3158
        // Healthz
1,105✔
3159
        mux.HandleFunc(s.basePath(HealthzPath), s.HandleHealthz)
1,105✔
3160
        // IPQueuesz
1,105✔
3161
        mux.HandleFunc(s.basePath(IPQueuesPath), s.HandleIPQueuesz)
1,105✔
3162
        // Raftz
1,105✔
3163
        mux.HandleFunc(s.basePath(RaftzPath), s.HandleRaftz)
1,105✔
3164
        // Expvarz
1,105✔
3165
        mux.Handle(s.basePath(ExpvarzPath), expvar.Handler())
1,105✔
3166

1,105✔
3167
        // Do not set a WriteTimeout because it could cause cURL/browser
1,105✔
3168
        // to return empty response or unable to display page if the
1,105✔
3169
        // server needs more time to build the response.
1,105✔
3170
        srv := &http.Server{
1,105✔
3171
                Addr:              hp,
1,105✔
3172
                Handler:           mux,
1,105✔
3173
                MaxHeaderBytes:    1 << 20,
1,105✔
3174
                ErrorLog:          log.New(&captureHTTPServerLog{s, "monitoring: "}, _EMPTY_, 0),
1,105✔
3175
                ReadHeaderTimeout: time.Second * 5,
1,105✔
3176
        }
1,105✔
3177
        s.mu.Lock()
1,105✔
3178
        s.http = httpListener
1,105✔
3179
        s.httpHandler = mux
1,105✔
3180
        s.monitoringServer = srv
1,105✔
3181
        s.mu.Unlock()
1,105✔
3182

1,105✔
3183
        go func() {
2,210✔
3184
                if err := srv.Serve(httpListener); err != nil {
2,208✔
3185
                        if !s.isShuttingDown() {
1,103✔
3186
                                s.Fatalf("Error starting monitor on %q: %v", hp, err)
×
3187
                        }
×
3188
                }
3189
                srv.Close()
1,103✔
3190
                s.mu.Lock()
1,103✔
3191
                s.httpHandler = nil
1,103✔
3192
                s.mu.Unlock()
1,103✔
3193
                s.done <- true
1,103✔
3194
        }()
3195

3196
        return nil
1,105✔
3197
}
3198

3199
// HTTPHandler returns the http.Handler object used to handle monitoring
3200
// endpoints. It will return nil if the server is not configured for
3201
// monitoring, or if the server has not been started yet (Server.Start()).
3202
func (s *Server) HTTPHandler() http.Handler {
2✔
3203
        s.mu.Lock()
2✔
3204
        defer s.mu.Unlock()
2✔
3205
        return s.httpHandler
2✔
3206
}
2✔
3207

3208
// Perform a conditional deep copy due to reference nature of [Client|WS]ConnectURLs.
3209
// If updates are made to Info, this function should be consulted and updated.
3210
// Assume lock is held.
3211
func (s *Server) copyInfo() Info {
18,979✔
3212
        info := s.info
18,979✔
3213
        if len(info.ClientConnectURLs) > 0 {
30,292✔
3214
                info.ClientConnectURLs = append([]string(nil), s.info.ClientConnectURLs...)
11,313✔
3215
        }
11,313✔
3216
        if len(info.WSConnectURLs) > 0 {
19,061✔
3217
                info.WSConnectURLs = append([]string(nil), s.info.WSConnectURLs...)
82✔
3218
        }
82✔
3219
        return info
18,979✔
3220
}
3221

3222
// tlsMixConn is used when we can receive both TLS and non-TLS connections on
// the same port. Bytes already consumed from the connection (while sniffing
// which protocol the peer speaks) are kept in pre and replayed before any
// further data is read from the underlying net.Conn.
type tlsMixConn struct {
	net.Conn
	pre *bytes.Buffer
}

// Read drains the replay buffer first and then falls through to the wrapped
// connection. Once the buffer is empty it is released so later reads go
// straight to the connection.
func (c *tlsMixConn) Read(b []byte) (int, error) {
	if c.pre == nil {
		return c.Conn.Read(b)
	}
	n, err := c.pre.Read(b)
	if c.pre.Len() == 0 {
		c.pre = nil
	}
	return n, err
}
3239

3240
func (s *Server) createClient(conn net.Conn) *client {
9,821✔
3241
        return s.createClientEx(conn, false)
9,821✔
3242
}
9,821✔
3243

3244
func (s *Server) createClientInProcess(conn net.Conn) *client {
8✔
3245
        return s.createClientEx(conn, true)
8✔
3246
}
8✔
3247

3248
// createClientEx creates a client for the connection conn and, when the
// server is running, registers it and starts its read and write loops.
// inProcess marks connections created through createClientInProcess. For
// those, tls_required is downgraded to "TLS available". When "TLS handshake
// first" is enabled without an explicit fallback delay, a default fallback
// delay is still applied.
//
// The overall flow is:
//   - build the client and a per-connection copy of the server INFO, with
//     nonce, auth and TLS fields adjusted for this connection;
//   - send INFO right away unless "TLS handshake first" is configured;
//   - register the client with the server, respecting MaxConn;
//   - optionally sniff the first bytes to choose between TLS-first, a plain
//     INFO fallback, or mixed TLS/non-TLS mode;
//   - perform the TLS handshake if required, then arm the auth and ping
//     timers and start the read/write loops.
//
// It returns nil when the client could not be set up: max connections
// exceeded, connection throttling active, TLS handshake failure, or the
// connection was closed while sending INFO. If the server is not running or
// is in lame duck mode, the unregistered client is returned and no loops are
// started. The connection is closed only if the server is shutting down.
func (s *Server) createClientEx(conn net.Conn, inProcess bool) *client {
	// Snapshot server options.
	opts := s.getOpts()

	maxPay := int32(opts.MaxPayload)
	maxSubs := int32(opts.MaxSubs)
	// For system, maxSubs of 0 means unlimited, so re-adjust here.
	if maxSubs == 0 {
		maxSubs = -1
	}
	now := time.Now()

	c := &client{
		srv:   s,
		nc:    conn,
		opts:  defaultOpts,
		mpay:  maxPay,
		msubs: maxSubs,
		start: now,
		last:  now,
		iproc: inProcess,
	}

	c.registerWithAccount(s.globalAccount())

	var info Info
	var authRequired bool

	s.mu.Lock()
	// Grab a private copy of the server INFO; it is customized below for
	// this connection only.
	info = s.copyInfo()
	if s.nonceRequired() {
		// Nonce handling: generate a per-connection nonce and advertise it.
		var raw [nonceLen]byte
		nonce := raw[:]
		s.generateNonce(nonce)
		info.Nonce = string(nonce)
	}
	c.nonce = []byte(info.Nonce)
	// Captured before the no_auth_user adjustment below, so the client still
	// expects CONNECT and gets an auth timer even when INFO advertises
	// auth_required=false.
	authRequired = info.AuthRequired

	// Check to see if we have auth_required set but we also have a no_auth_user.
	// If so set back to false.
	if info.AuthRequired && opts.NoAuthUser != _EMPTY_ && opts.NoAuthUser != s.sysAccOnlyNoAuthUser {
		info.AuthRequired = false
	}

	// Check to see if this is an in-process connection with tls_required.
	// If so, set as not required, but available.
	if inProcess && info.TLSRequired {
		info.TLSRequired = false
		info.TLSAvailable = true
	}

	s.totalClients++
	s.mu.Unlock()

	// Grab lock
	c.mu.Lock()
	if authRequired {
		c.flags.set(expectConnect)
	}

	// Initialize
	c.initClient()

	c.Debugf("Client connection created")

	// Save info.TLSRequired value since we may need to change it back and forth.
	orgInfoTLSReq := info.TLSRequired

	var tlsFirstFallback time.Duration
	// Check if we should do TLS first.
	tlsFirst := opts.TLSConfig != nil && opts.TLSHandshakeFirst
	if tlsFirst {
		// Make sure info.TLSRequired is set to true (it could be false
		// if AllowNonTLS is enabled).
		info.TLSRequired = true
		// Get the fallback delay value if applicable.
		if f := opts.TLSHandshakeFirstFallback; f > 0 {
			tlsFirstFallback = f
		} else if inProcess {
			// For in-process connection, we will always have a fallback
			// delay. It allows support for non-TLS, TLS and "TLS First"
			// in-process clients to successfully connect.
			tlsFirstFallback = DEFAULT_TLS_HANDSHAKE_FIRST_FALLBACK_DELAY
		}
	}

	// Decide if we are going to require TLS or not and generate INFO json.
	tlsRequired := info.TLSRequired
	infoBytes := c.generateClientInfoJSON(info)

	// Send our information, except if TLS and TLSHandshakeFirst is requested.
	if !tlsFirst {
		// Need to be sent in place since writeLoop cannot be started until
		// TLS handshake is done (if applicable).
		c.sendProtoNow(infoBytes)
	}

	// Unlock to register
	c.mu.Unlock()

	// Register with the server.
	s.mu.Lock()
	// If server is not running, Shutdown() may have already gathered the
	// list of connections to close. It won't contain this one, so we need
	// to bail out now otherwise the readLoop started down there would not
	// be interrupted. Skip also if in lame duck mode.
	if !s.isRunning() || s.ldm {
		// There are some tests that create a server but don't start it,
		// and use "async" clients and perform the parsing manually. Such
		// clients would branch here (since server is not running). However,
		// when a server was really running and has been shutdown, we must
		// close this connection.
		if s.isShuttingDown() {
			conn.Close()
		}
		s.mu.Unlock()
		return c
	}

	// If there is a max connections specified, check that adding
	// this new client would not push us over the max
	if opts.MaxConn > 0 && len(s.clients) >= opts.MaxConn {
		s.mu.Unlock()
		c.maxConnExceeded()
		return nil
	}
	s.clients[c.cid] = c

	s.mu.Unlock()

	// Re-Grab lock
	c.mu.Lock()

	isClosed := c.isClosed()
	// pre holds any bytes consumed while sniffing the protocol; they are
	// either replayed into the TLS handshake or handed to the read loop.
	var pre []byte
	// We need first to check for "TLS First" fallback delay.
	if !isClosed && tlsFirstFallback > 0 {
		// We wait and see if we are getting any data. Since we did not send
		// the INFO protocol yet, only clients that use TLS first should be
		// sending data (the TLS handshake). We don't really check the content:
		// if it is a rogue agent and not an actual client performing the
		// TLS handshake, the error will be detected when performing the
		// handshake on our side.
		pre = make([]byte, 4)
		c.nc.SetReadDeadline(time.Now().Add(tlsFirstFallback))
		// The read error (e.g. deadline exceeded) is deliberately ignored:
		// only the number of bytes received matters here.
		n, _ := io.ReadFull(c.nc, pre[:])
		c.nc.SetReadDeadline(time.Time{})
		// If we get any data (regardless of possible timeout), we will proceed
		// with the TLS handshake.
		if n > 0 {
			pre = pre[:n]
		} else {
			// We did not get anything so we will send the INFO protocol.
			pre = nil

			// Restore the original info.TLSRequired value if it is
			// different than the current value and regenerate infoBytes.
			if orgInfoTLSReq != info.TLSRequired {
				info.TLSRequired = orgInfoTLSReq
				infoBytes = c.generateClientInfoJSON(info)
			}
			c.sendProtoNow(infoBytes)
			// Set the boolean to false for the rest of the function.
			tlsFirst = false
			// Check closed status again
			isClosed = c.isClosed()
		}
	}
	// If we have both TLS and non-TLS allowed we need to see which
	// one the client wants. We'll always allow this for in-process
	// connections.
	if !isClosed && !tlsFirst && opts.TLSConfig != nil && (inProcess || opts.AllowNonTLS) {
		pre = make([]byte, 4)
		c.nc.SetReadDeadline(time.Now().Add(secondsToDuration(opts.TLSTimeout)))
		n, _ := io.ReadFull(c.nc, pre[:])
		c.nc.SetReadDeadline(time.Time{})
		pre = pre[:n]
		// A first byte of 0x16 is treated as the start of a TLS handshake;
		// anything else (or no data at all) as plaintext.
		if n > 0 && pre[0] == 0x16 {
			tlsRequired = true
		} else {
			tlsRequired = false
		}
	}

	// Check for TLS
	if !isClosed && tlsRequired {
		if s.connRateCounter != nil && !s.connRateCounter.allow() {
			c.mu.Unlock()
			c.sendErr("Connection throttling is active. Please try again later.")
			c.closeConnection(MaxConnectionsExceeded)
			return nil
		}

		// If we have a prebuffer create a multi-reader.
		if len(pre) > 0 {
			c.nc = &tlsMixConn{c.nc, bytes.NewBuffer(pre)}
			// Clear pre so it is not parsed.
			pre = nil
		}
		// Performs server-side TLS handshake.
		if err := c.doTLSServerHandshake(_EMPTY_, opts.TLSConfig, opts.TLSTimeout, opts.TLSPinnedCerts); err != nil {
			c.mu.Unlock()
			return nil
		}
	}

	// Now, send the INFO if it was delayed
	if !isClosed && tlsFirst {
		c.flags.set(didTLSFirst)
		c.sendProtoNow(infoBytes)
		// Check closed status
		isClosed = c.isClosed()
	}

	// Connection could have been closed while sending the INFO proto.
	if isClosed {
		c.mu.Unlock()
		// We need to call closeConnection() to make sure that proper cleanup is done.
		c.closeConnection(WriteError)
		return nil
	}

	// Check for Auth. We schedule this timer after the TLS handshake to avoid
	// the race where the timer fires during the handshake and causes the
	// server to write bad data to the socket. See issue #432.
	if authRequired {
		c.setAuthTimer(secondsToDuration(opts.AuthTimeout))
	}

	// Do final client initialization

	// Set the Ping timer. Will be reset once connect was received.
	c.setPingTimer()

	// Spin up the read loop.
	s.startGoRoutine(func() { c.readLoop(pre) })

	// Spin up the write loop.
	s.startGoRoutine(func() { c.writeLoop() })

	if tlsRequired {
		c.Debugf("TLS handshake complete")
		// After a successful handshake c.nc is expected to be a *tls.Conn.
		cs := c.nc.(*tls.Conn).ConnectionState()
		c.Debugf("TLS version %s, cipher suite %s", tlsVersion(cs.Version), tls.CipherSuiteName(cs.CipherSuite))
	}

	c.mu.Unlock()

	return c
}
3501

3502
// This will save off a closed client in a ring buffer such that
3503
// /connz can inspect. Useful for debugging, etc.
3504
func (s *Server) saveClosedClient(c *client, nc net.Conn, subs map[string]*subscription, reason ClosedState) {
12,082✔
3505
        now := time.Now()
12,082✔
3506

12,082✔
3507
        s.accountDisconnectEvent(c, now, reason.String())
12,082✔
3508

12,082✔
3509
        c.mu.Lock()
12,082✔
3510

12,082✔
3511
        cc := &closedClient{}
12,082✔
3512
        cc.fill(c, nc, now, false)
12,082✔
3513
        // Note that cc.fill is using len(c.subs), which may have been set to nil by now,
12,082✔
3514
        // so replace cc.NumSubs with len(subs).
12,082✔
3515
        cc.NumSubs = uint32(len(subs))
12,082✔
3516
        cc.Stop = &now
12,082✔
3517
        cc.Reason = reason.String()
12,082✔
3518

12,082✔
3519
        // Do subs, do not place by default in main ConnInfo
12,082✔
3520
        if len(subs) > 0 {
21,455✔
3521
                cc.subs = make([]SubDetail, 0, len(subs))
9,373✔
3522
                for _, sub := range subs {
159,411✔
3523
                        cc.subs = append(cc.subs, newSubDetail(sub))
150,038✔
3524
                }
150,038✔
3525
        }
3526
        // Hold user as well.
3527
        cc.user = c.getRawAuthUser()
12,082✔
3528
        // Hold account name if not the global account.
12,082✔
3529
        if c.acc != nil && c.acc.Name != globalAccountName {
18,697✔
3530
                cc.acc = c.acc.Name
6,615✔
3531
        }
6,615✔
3532
        cc.JWT = c.opts.JWT
12,082✔
3533
        cc.IssuerKey = issuerForClient(c)
12,082✔
3534
        cc.Tags = c.tags
12,082✔
3535
        cc.NameTag = c.nameTag
12,082✔
3536
        c.mu.Unlock()
12,082✔
3537

12,082✔
3538
        // Place in the ring buffer
12,082✔
3539
        s.mu.Lock()
12,082✔
3540
        if s.closed != nil {
24,163✔
3541
                s.closed.append(cc)
12,081✔
3542
        }
12,081✔
3543
        s.mu.Unlock()
12,082✔
3544
}
3545

3546
// Adds to the list of client and websocket clients connect URLs.
3547
// If there was a change, an INFO protocol is sent to registered clients
3548
// that support async INFO protocols.
3549
// Server lock held on entry.
3550
func (s *Server) addConnectURLsAndSendINFOToClients(curls, wsurls []string) {
9,104✔
3551
        s.updateServerINFOAndSendINFOToClients(curls, wsurls, true)
9,104✔
3552
}
9,104✔
3553

3554
// Removes from the list of client and websocket clients connect URLs.
3555
// If there was a change, an INFO protocol is sent to registered clients
3556
// that support async INFO protocols.
3557
// Server lock held on entry.
3558
func (s *Server) removeConnectURLsAndSendINFOToClients(curls, wsurls []string) {
9,097✔
3559
        s.updateServerINFOAndSendINFOToClients(curls, wsurls, false)
9,097✔
3560
}
9,097✔
3561

3562
// Updates the list of client and websocket clients connect URLs and if any change
3563
// sends an async INFO update to clients that support it.
3564
// Server lock held on entry.
3565
func (s *Server) updateServerINFOAndSendINFOToClients(curls, wsurls []string, add bool) {
18,201✔
3566
        remove := !add
18,201✔
3567
        // Will return true if we need alter the server's Info object.
18,201✔
3568
        updateMap := func(urls []string, m refCountedUrlSet) bool {
54,603✔
3569
                wasUpdated := false
36,402✔
3570
                for _, url := range urls {
55,242✔
3571
                        if add && m.addUrl(url) {
28,256✔
3572
                                wasUpdated = true
9,416✔
3573
                        } else if remove && m.removeUrl(url) {
28,254✔
3574
                                wasUpdated = true
9,414✔
3575
                        }
9,414✔
3576
                }
3577
                return wasUpdated
36,402✔
3578
        }
3579
        cliUpdated := updateMap(curls, s.clientConnectURLsMap)
18,201✔
3580
        wsUpdated := updateMap(wsurls, s.websocket.connectURLsMap)
18,201✔
3581

18,201✔
3582
        updateInfo := func(infoURLs *[]string, urls []string, m refCountedUrlSet) {
36,725✔
3583
                // Recreate the info's slice from the map
18,524✔
3584
                *infoURLs = (*infoURLs)[:0]
18,524✔
3585
                // Add this server client connect ULRs first...
18,524✔
3586
                *infoURLs = append(*infoURLs, urls...)
18,524✔
3587
                // Then the ones from the map
18,524✔
3588
                for url := range m {
45,022✔
3589
                        *infoURLs = append(*infoURLs, url)
26,498✔
3590
                }
26,498✔
3591
        }
3592
        if cliUpdated {
36,379✔
3593
                updateInfo(&s.info.ClientConnectURLs, s.clientConnectURLs, s.clientConnectURLsMap)
18,178✔
3594
        }
18,178✔
3595
        if wsUpdated {
18,547✔
3596
                updateInfo(&s.info.WSConnectURLs, s.websocket.connectURLs, s.websocket.connectURLsMap)
346✔
3597
        }
346✔
3598
        if cliUpdated || wsUpdated {
36,379✔
3599
                // Send to all registered clients that support async INFO protocols.
18,178✔
3600
                s.sendAsyncInfoToClients(cliUpdated, wsUpdated)
18,178✔
3601
        }
18,178✔
3602
}
3603

3604
// Handle closing down a connection when the handshake has timedout.
3605
func tlsTimeout(c *client, conn *tls.Conn) {
34✔
3606
        c.mu.Lock()
34✔
3607
        closed := c.isClosed()
34✔
3608
        c.mu.Unlock()
34✔
3609
        // Check if already closed
34✔
3610
        if closed {
42✔
3611
                return
8✔
3612
        }
8✔
3613
        cs := conn.ConnectionState()
26✔
3614
        if !cs.HandshakeComplete {
30✔
3615
                c.Errorf("TLS handshake timeout")
4✔
3616
                c.sendErr("Secure Connection - TLS Required")
4✔
3617
                c.closeConnection(TLSHandshakeError)
4✔
3618
        }
4✔
3619
}
3620

3621
// tlsVersion returns a short human-readable name ("1.0" .. "1.3") for a TLS
// protocol version. Any other value is rendered as "Unknown [0x..]".
func tlsVersion(ver uint16) string {
	// The crypto/tls constants for TLS 1.0 through 1.3 are consecutive
	// (0x0301 .. 0x0304), so they index directly into this table.
	names := [...]string{"1.0", "1.1", "1.2", "1.3"}
	if ver >= tls.VersionTLS10 && ver <= tls.VersionTLS13 {
		return names[ver-tls.VersionTLS10]
	}
	return fmt.Sprintf("Unknown [0x%x]", ver)
}
3635

3636
// tlsVersionFromString is the inverse of tlsVersion: it maps "1.0" .. "1.3"
// to the corresponding crypto/tls version constant and returns an error for
// any other input.
func tlsVersionFromString(ver string) (uint16, error) {
	known := [...]struct {
		name string
		id   uint16
	}{
		{"1.0", tls.VersionTLS10},
		{"1.1", tls.VersionTLS11},
		{"1.2", tls.VersionTLS12},
		{"1.3", tls.VersionTLS13},
	}
	for _, k := range known {
		if k.name == ver {
			return k.id, nil
		}
	}
	return 0, fmt.Errorf("unknown version: %v", ver)
}
3649

3650
// Remove a client or route from our internal accounting.
3651
func (s *Server) removeClient(c *client) {
145,078✔
3652
        // kind is immutable, so can check without lock
145,078✔
3653
        switch c.kind {
145,078✔
3654
        case CLIENT:
10,387✔
3655
                c.mu.Lock()
10,387✔
3656
                cid := c.cid
10,387✔
3657
                updateProtoInfoCount := false
10,387✔
3658
                if c.kind == CLIENT && c.opts.Protocol >= ClientProtoInfo {
19,251✔
3659
                        updateProtoInfoCount = true
8,864✔
3660
                }
8,864✔
3661
                proxyKey := c.proxyKey
10,387✔
3662
                c.mu.Unlock()
10,387✔
3663

10,387✔
3664
                s.mu.Lock()
10,387✔
3665
                delete(s.clients, cid)
10,387✔
3666
                if updateProtoInfoCount {
19,251✔
3667
                        s.cproto--
8,864✔
3668
                }
8,864✔
3669
                if proxyKey != _EMPTY_ {
10,391✔
3670
                        s.removeProxiedConn(proxyKey, cid)
4✔
3671
                }
4✔
3672
                s.mu.Unlock()
10,387✔
3673
        case ROUTER:
61,514✔
3674
                s.removeRoute(c)
61,514✔
3675
        case GATEWAY:
3,771✔
3676
                s.removeRemoteGatewayConnection(c)
3,771✔
3677
        case LEAF:
1,697✔
3678
                s.removeLeafNodeConnection(c)
1,697✔
3679
        }
3680
}
3681

3682
// Remove the connection with id `cid` from the map of connections
3683
// under the public key `key` of the trusted proxies.
3684
//
3685
// Server lock must be held on entry.
3686
func (s *Server) removeProxiedConn(key string, cid uint64) {
8✔
3687
        conns := s.proxiedConns[key]
8✔
3688
        delete(conns, cid)
8✔
3689
        if len(conns) == 0 {
16✔
3690
                delete(s.proxiedConns, key)
8✔
3691
        }
8✔
3692
}
3693

3694
func (s *Server) removeFromTempClients(cid uint64) {
105,427✔
3695
        s.grMu.Lock()
105,427✔
3696
        delete(s.grTmpClients, cid)
105,427✔
3697
        s.grMu.Unlock()
105,427✔
3698
}
105,427✔
3699

3700
func (s *Server) addToTempClients(cid uint64, c *client) bool {
66,676✔
3701
        added := false
66,676✔
3702
        s.grMu.Lock()
66,676✔
3703
        if s.grRunning {
133,349✔
3704
                s.grTmpClients[cid] = c
66,673✔
3705
                added = true
66,673✔
3706
        }
66,673✔
3707
        s.grMu.Unlock()
66,676✔
3708
        return added
66,676✔
3709
}
3710

3711
/////////////////////////////////////////////////////////////////
3712
// These are some helpers for accounting in functional tests.
3713
/////////////////////////////////////////////////////////////////
3714

3715
// NumRoutes will report the number of registered routes.
3716
func (s *Server) NumRoutes() int {
6,468✔
3717
        s.mu.RLock()
6,468✔
3718
        defer s.mu.RUnlock()
6,468✔
3719
        return s.numRoutes()
6,468✔
3720
}
6,468✔
3721

3722
// numRoutes will report the number of registered routes.
3723
// Server lock held on entry
3724
func (s *Server) numRoutes() int {
766,740✔
3725
        var nr int
766,740✔
3726
        s.forEachRoute(func(c *client) {
1,510,736✔
3727
                nr++
743,996✔
3728
        })
743,996✔
3729
        return nr
766,740✔
3730
}
3731

3732
// NumRemotes will report number of registered remotes.
3733
func (s *Server) NumRemotes() int {
×
3734
        s.mu.RLock()
×
3735
        defer s.mu.RUnlock()
×
3736
        return s.numRemotes()
×
3737
}
×
3738

3739
// numRemotes will report number of registered remotes.
3740
// Server lock held on entry
3741
func (s *Server) numRemotes() int {
28,570✔
3742
        return len(s.routes)
28,570✔
3743
}
28,570✔
3744

3745
// NumLeafNodes will report number of leaf node connections.
3746
func (s *Server) NumLeafNodes() int {
4,386✔
3747
        s.mu.RLock()
4,386✔
3748
        defer s.mu.RUnlock()
4,386✔
3749
        return len(s.leafs)
4,386✔
3750
}
4,386✔
3751

3752
// NumClients will report the number of registered clients.
3753
func (s *Server) NumClients() int {
69✔
3754
        s.mu.RLock()
69✔
3755
        defer s.mu.RUnlock()
69✔
3756
        return len(s.clients)
69✔
3757
}
69✔
3758

3759
// GetClient will return the client associated with cid.
3760
func (s *Server) GetClient(cid uint64) *client {
134✔
3761
        return s.getClient(cid)
134✔
3762
}
134✔
3763

3764
// getClient will return the client associated with cid.
3765
func (s *Server) getClient(cid uint64) *client {
164✔
3766
        s.mu.RLock()
164✔
3767
        defer s.mu.RUnlock()
164✔
3768
        return s.clients[cid]
164✔
3769
}
164✔
3770

3771
// GetLeafNode returns the leafnode associated with the cid.
3772
func (s *Server) GetLeafNode(cid uint64) *client {
1✔
3773
        s.mu.RLock()
1✔
3774
        defer s.mu.RUnlock()
1✔
3775
        return s.leafs[cid]
1✔
3776
}
1✔
3777

3778
// NumSubscriptions will report how many subscriptions are active.
3779
func (s *Server) NumSubscriptions() uint32 {
384✔
3780
        s.mu.RLock()
384✔
3781
        defer s.mu.RUnlock()
384✔
3782
        return s.numSubscriptions()
384✔
3783
}
384✔
3784

3785
// numSubscriptions will report how many subscriptions are active.
3786
// Lock should be held.
3787
func (s *Server) numSubscriptions() uint32 {
26,547✔
3788
        var subs int
26,547✔
3789
        s.accounts.Range(func(k, v any) bool {
86,898✔
3790
                acc := v.(*Account)
60,351✔
3791
                subs += acc.TotalSubs()
60,351✔
3792
                return true
60,351✔
3793
        })
60,351✔
3794
        return uint32(subs)
26,547✔
3795
}
3796

3797
// NumSlowConsumers will report the number of slow consumers.
3798
func (s *Server) NumSlowConsumers() int64 {
1✔
3799
        return atomic.LoadInt64(&s.slowConsumers)
1✔
3800
}
1✔
3801

3802
// NumStalledClients will report the total number of times clients have been stalled.
3803
func (s *Server) NumStalledClients() int64 {
×
3804
        return atomic.LoadInt64(&s.stalls)
×
3805
}
×
3806

3807
// NumSlowConsumersClients will report the number of slow consumers clients.
3808
func (s *Server) NumSlowConsumersClients() uint64 {
37,458✔
3809
        return s.scStats.clients.Load()
37,458✔
3810
}
37,458✔
3811

3812
// NumSlowConsumersRoutes will report the number of slow consumers routes.
3813
func (s *Server) NumSlowConsumersRoutes() uint64 {
37,458✔
3814
        return s.scStats.routes.Load()
37,458✔
3815
}
37,458✔
3816

3817
// NumSlowConsumersGateways will report the number of slow consumers leafs.
3818
func (s *Server) NumSlowConsumersGateways() uint64 {
37,460✔
3819
        return s.scStats.gateways.Load()
37,460✔
3820
}
37,460✔
3821

3822
// NumSlowConsumersLeafs will report the number of slow consumers leafs.
3823
func (s *Server) NumSlowConsumersLeafs() uint64 {
37,459✔
3824
        return s.scStats.leafs.Load()
37,459✔
3825
}
37,459✔
3826

3827
// NumStaleConnections will report the number of stale connections.
3828
func (s *Server) NumStaleConnections() int64 {
4✔
3829
        return atomic.LoadInt64(&s.staleConnections)
4✔
3830
}
4✔
3831

3832
// NumStaleConnectionsClients will report the number of stale client connections.
3833
func (s *Server) NumStaleConnectionsClients() uint64 {
37,462✔
3834
        return s.staleStats.clients.Load()
37,462✔
3835
}
37,462✔
3836

3837
// NumStaleConnectionsRoutes will report the number of stale route connections.
3838
func (s *Server) NumStaleConnectionsRoutes() uint64 {
37,462✔
3839
        return s.staleStats.routes.Load()
37,462✔
3840
}
37,462✔
3841

3842
// NumStaleConnectionsGateways will report the number of stale gateway connections.
3843
func (s *Server) NumStaleConnectionsGateways() uint64 {
37,462✔
3844
        return s.staleStats.gateways.Load()
37,462✔
3845
}
37,462✔
3846

3847
// NumStaleConnectionsLeafs will report the number of stale leaf connections.
3848
func (s *Server) NumStaleConnectionsLeafs() uint64 {
37,462✔
3849
        return s.staleStats.leafs.Load()
37,462✔
3850
}
37,462✔
3851

3852
// ConfigTime will report the last time the server configuration was loaded.
3853
func (s *Server) ConfigTime() time.Time {
10✔
3854
        s.mu.RLock()
10✔
3855
        defer s.mu.RUnlock()
10✔
3856
        return s.configTime
10✔
3857
}
10✔
3858

3859
// Addr will return the net.Addr object for the current listener.
3860
func (s *Server) Addr() net.Addr {
229✔
3861
        s.mu.RLock()
229✔
3862
        defer s.mu.RUnlock()
229✔
3863
        if s.listener == nil {
229✔
3864
                return nil
×
3865
        }
×
3866
        return s.listener.Addr()
229✔
3867
}
3868

3869
// MonitorAddr will return the net.Addr object for the monitoring listener.
3870
func (s *Server) MonitorAddr() *net.TCPAddr {
136✔
3871
        s.mu.RLock()
136✔
3872
        defer s.mu.RUnlock()
136✔
3873
        if s.http == nil {
136✔
3874
                return nil
×
3875
        }
×
3876
        return s.http.Addr().(*net.TCPAddr)
136✔
3877
}
3878

3879
// ClusterAddr returns the net.Addr object for the route listener.
3880
func (s *Server) ClusterAddr() *net.TCPAddr {
49✔
3881
        s.mu.RLock()
49✔
3882
        defer s.mu.RUnlock()
49✔
3883
        if s.routeListener == nil {
49✔
3884
                return nil
×
3885
        }
×
3886
        return s.routeListener.Addr().(*net.TCPAddr)
49✔
3887
}
3888

3889
// ProfilerAddr returns the net.Addr object for the profiler listener.
3890
func (s *Server) ProfilerAddr() *net.TCPAddr {
×
3891
        s.mu.RLock()
×
3892
        defer s.mu.RUnlock()
×
3893
        if s.profiler == nil {
×
3894
                return nil
×
3895
        }
×
3896
        return s.profiler.Addr().(*net.TCPAddr)
×
3897
}
3898

3899
// readyForConnections waits up to d for every configured listener to be up.
// That covers the client listener (unless DontListen is set) and, when they
// are enabled in the options, the route, gateway, leafnode, websocket and
// MQTT listeners. Listener state is polled every 25ms when d is longer than
// that. With DontListen set, it additionally waits up to another d for
// startupComplete.
//
// It returns nil once everything is ready. Otherwise it returns an error
// naming the listeners that are not ready, and it reports any recorded
// listener error. The listeners are listed in a fixed order, so the message
// is stable from run to run.
func (s *Server) readyForConnections(d time.Duration) error {
	// Snapshot server options.
	opts := s.getOpts()

	type check struct {
		name string
		ok   bool
		err  error
	}
	// An ordered slice (instead of a map) keeps the failure report below
	// deterministic; map iteration order would shuffle it on every call.
	var chk []check

	end := time.Now().Add(d)
	for time.Now().Before(end) {
		s.mu.RLock()
		chk = append(chk[:0],
			check{name: "server", ok: s.listener != nil || opts.DontListen, err: s.listenerErr},
			check{name: "route", ok: opts.Cluster.Port == 0 || s.routeListener != nil, err: s.routeListenerErr},
			check{name: "gateway", ok: opts.Gateway.Name == _EMPTY_ || s.gatewayListener != nil, err: s.gatewayListenerErr},
			check{name: "leafnode", ok: opts.LeafNode.Port == 0 || s.leafNodeListener != nil, err: s.leafNodeListenerErr},
			check{name: "websocket", ok: opts.Websocket.Port == 0 || s.websocket.listener != nil, err: s.websocket.listenerErr},
			check{name: "mqtt", ok: opts.MQTT.Port == 0 || s.mqtt.listener != nil, err: s.mqtt.listenerErr},
		)
		s.mu.RUnlock()

		allOK := true
		for _, c := range chk {
			if !c.ok {
				allOK = false
				break
			}
		}
		if allOK {
			// In the case of DontListen option (no accept loop), we still want
			// to make sure that Start() has done all the work, so we wait on
			// that.
			if opts.DontListen {
				select {
				case <-s.startupComplete:
				case <-time.After(d):
					return fmt.Errorf("failed to be ready for connections after %s: startup did not complete", d)
				}
			}
			return nil
		}
		if d > 25*time.Millisecond {
			time.Sleep(25 * time.Millisecond)
		}
	}

	// If the loop never ran (d <= 0), chk is empty and so is the list.
	failed := make([]string, 0, len(chk))
	for _, c := range chk {
		switch {
		case c.ok && c.err != nil:
			failed = append(failed, fmt.Sprintf("%s(ok, but %s)", c.name, c.err))
		case !c.ok && c.err == nil:
			failed = append(failed, c.name)
		case !c.ok && c.err != nil:
			failed = append(failed, fmt.Sprintf("%s(%s)", c.name, c.err))
		}
	}

	return fmt.Errorf(
		"failed to be ready for connections after %s: %s",
		d, strings.Join(failed, ", "),
	)
}
3962

3963
// ReadyForConnections returns `true` if the server is ready to accept clients
3964
// and, if routing is enabled, route connections. If after the duration
3965
// `dur` the server is still not ready, returns `false`.
3966
func (s *Server) ReadyForConnections(dur time.Duration) bool {
806✔
3967
        return s.readyForConnections(dur) == nil
806✔
3968
}
806✔
3969

3970
// Quick utility to function to tell if the server supports headers.
3971
func (s *Server) supportsHeaders() bool {
183,293✔
3972
        if s == nil {
183,293✔
3973
                return false
×
3974
        }
×
3975
        return !(s.getOpts().NoHeaderSupport)
183,293✔
3976
}
3977

3978
// ID returns the server's ID
3979
func (s *Server) ID() string {
53,645✔
3980
        return s.info.ID
53,645✔
3981
}
53,645✔
3982

3983
// NodeName returns the node name for this server.
3984
func (s *Server) NodeName() string {
197✔
3985
        return getHash(s.info.Name)
197✔
3986
}
197✔
3987

3988
// Name returns the server's name. This will be the same as the ID if it was not set.
3989
func (s *Server) Name() string {
222,129✔
3990
        return s.info.Name
222,129✔
3991
}
222,129✔
3992

3993
func (s *Server) String() string {
5,977✔
3994
        return s.info.Name
5,977✔
3995
}
5,977✔
3996

3997
// pprofLabels is a set of pprof label key/value pairs. setGoRoutineLabels
// applies one or more of these to the calling goroutine.
type pprofLabels map[string]string
3998

3999
// setGoRoutineLabels merges every key/value pair from tags and installs them
// as pprof labels on the calling goroutine. Nothing is changed when tags
// contain no pairs at all.
func setGoRoutineLabels(tags ...pprofLabels) {
	var kv []string
	for _, tag := range tags {
		for key, val := range tag {
			kv = append(kv, key, val)
		}
	}
	if len(kv) == 0 {
		return
	}
	ctx := pprof.WithLabels(context.Background(), pprof.Labels(kv...))
	pprof.SetGoroutineLabels(ctx)
}
4012

4013
func (s *Server) startGoRoutine(f func(), tags ...pprofLabels) bool {
332,704✔
4014
        var started bool
332,704✔
4015
        s.grMu.Lock()
332,704✔
4016
        defer s.grMu.Unlock()
332,704✔
4017
        if s.grRunning {
665,273✔
4018
                s.grWG.Add(1)
332,569✔
4019
                go func() {
665,138✔
4020
                        setGoRoutineLabels(tags...)
332,569✔
4021
                        f()
332,569✔
4022
                }()
332,569✔
4023
                started = true
332,569✔
4024
        }
4025
        return started
332,704✔
4026
}
4027

4028
func (s *Server) numClosedConns() int {
102✔
4029
        s.mu.RLock()
102✔
4030
        defer s.mu.RUnlock()
102✔
4031
        return s.closed.len()
102✔
4032
}
102✔
4033

4034
func (s *Server) totalClosedConns() uint64 {
43✔
4035
        s.mu.RLock()
43✔
4036
        defer s.mu.RUnlock()
43✔
4037
        return s.closed.totalConns()
43✔
4038
}
43✔
4039

4040
func (s *Server) closedClients() []*closedClient {
8✔
4041
        s.mu.RLock()
8✔
4042
        defer s.mu.RUnlock()
8✔
4043
        return s.closed.closedClients()
8✔
4044
}
8✔
4045

4046
// getClientConnectURLs returns suitable URLs for clients to connect to the listen
4047
// port based on the server options' Host and Port. If the Host corresponds to
4048
// "any" interfaces, this call returns the list of resolved IP addresses.
4049
// If ClientAdvertise is set, returns the client advertise host and port.
4050
// The server lock is assumed held on entry.
4051
func (s *Server) getClientConnectURLs() []string {
6,712✔
4052
        // Snapshot server options.
6,712✔
4053
        opts := s.getOpts()
6,712✔
4054
        // Ignore error here since we know that if there is client advertise, the
6,712✔
4055
        // parseHostPort is correct because we did it right before calling this
6,712✔
4056
        // function in Server.New().
6,712✔
4057
        urls, _ := s.getConnectURLs(opts.ClientAdvertise, opts.Host, opts.Port)
6,712✔
4058
        return urls
6,712✔
4059
}
6,712✔
4060

4061
// Generic version that will return an array of URLs based on the given
4062
// advertise, host and port values.
4063
func (s *Server) getConnectURLs(advertise, host string, port int) ([]string, error) {
6,834✔
4064
        urls := make([]string, 0, 1)
6,834✔
4065

6,834✔
4066
        // short circuit if advertise is set
6,834✔
4067
        if advertise != "" {
6,839✔
4068
                h, p, err := parseHostPort(advertise, port)
5✔
4069
                if err != nil {
5✔
4070
                        return nil, err
×
4071
                }
×
4072
                urls = append(urls, net.JoinHostPort(h, strconv.Itoa(p)))
5✔
4073
        } else {
6,829✔
4074
                sPort := strconv.Itoa(port)
6,829✔
4075
                _, ips, err := s.getNonLocalIPsIfHostIsIPAny(host, true)
6,829✔
4076
                for _, ip := range ips {
8,281✔
4077
                        urls = append(urls, net.JoinHostPort(ip, sPort))
1,452✔
4078
                }
1,452✔
4079
                if err != nil || len(urls) == 0 {
12,932✔
4080
                        // We are here if s.opts.Host is not "0.0.0.0" nor "::", or if for some
6,103✔
4081
                        // reason we could not add any URL in the loop above.
6,103✔
4082
                        // We had a case where a Windows VM was hosed and would have err == nil
6,103✔
4083
                        // and not add any address in the array in the loop above, and we
6,103✔
4084
                        // ended-up returning 0.0.0.0, which is problematic for Windows clients.
6,103✔
4085
                        // Check for 0.0.0.0 or :: specifically, and ignore if that's the case.
6,103✔
4086
                        if host == "0.0.0.0" || host == "::" {
6,103✔
4087
                                s.Errorf("Address %q can not be resolved properly", host)
×
4088
                        } else {
6,103✔
4089
                                urls = append(urls, net.JoinHostPort(host, sPort))
6,103✔
4090
                        }
6,103✔
4091
                }
4092
        }
4093
        return urls, nil
6,834✔
4094
}
4095

4096
// getNonLocalIPsIfHostIsIPAny reports whether host is an unspecified IP
// address (0.0.0.0 or ::) and, if so, returns the global unicast addresses
// found on the local network interfaces. When all is false, at most one
// address (the first one found) is returned. The returned error is currently
// always nil.
//
// The boolean lets callers tell "host is not an any address" apart from
// "host is an any address but no usable IP was found" when the returned slice
// is empty.
func (s *Server) getNonLocalIPsIfHostIsIPAny(host string, all bool) (bool, []string, error) {
	hostIP := net.ParseIP(host)
	// If this is not an IP, or not 0.0.0.0 / ::, we have nothing to do.
	if hostIP == nil || !hostIP.IsUnspecified() {
		return false, nil, nil
	}
	s.Debugf("Get non local IPs for %q", host)
	var ips []string
	// Enumeration errors are ignored: this is a best-effort discovery and an
	// empty result is already handled by the callers via the boolean.
	ifaces, _ := net.Interfaces()
	for _, iface := range ifaces {
		addrs, _ := iface.Addrs()
		for _, addr := range addrs {
			// Fresh per address so an unrecognized address type can never
			// re-use the IP from a previous iteration.
			var ip net.IP
			switch v := addr.(type) {
			case *net.IPNet:
				ip = v.IP
			case *net.IPAddr:
				ip = v.IP
			}
			// Skip non global unicast addresses (a nil ip is skipped too).
			if !ip.IsGlobalUnicast() || ip.IsUnspecified() {
				continue
			}
			ipStr := ip.String()
			s.Debugf("  ip=%s", ipStr)
			ips = append(ips, ipStr)
			if !all {
				// A plain break would only leave the per-interface loop and
				// keep collecting one IP per interface; stop at the first.
				return true, ips, nil
			}
		}
	}
	return true, ips, nil
}
4139

4140
// resolveHostPorts returns "host:port" strings for the given listener. If the
// listener is bound to an unspecified address (0.0.0.0 or ::), one entry is
// produced for every IP address found on the local network interfaces;
// otherwise the listener's own IP is used. The listener address is asserted
// to be a *net.TCPAddr.
func resolveHostPorts(addr net.Listener) []string {
	tcpAddr := addr.Addr().(*net.TCPAddr)
	port := strconv.Itoa(tcpAddr.Port)
	if !tcpAddr.IP.IsUnspecified() {
		return []string{net.JoinHostPort(tcpAddr.IP.String(), port)}
	}

	hostPorts := make([]string, 0)
	ifaces, _ := net.Interfaces()
	for _, iface := range ifaces {
		ifAddrs, _ := iface.Addrs()
		for _, a := range ifAddrs {
			var ip net.IP
			switch v := a.(type) {
			case *net.IPNet:
				ip = v.IP
			case *net.IPAddr:
				ip = v.IP
			default:
				continue
			}
			hostPorts = append(hostPorts, net.JoinHostPort(ip.String(), port))
		}
	}
	return hostPorts
}
4168

4169
// format the address of a net.Listener with a protocol
4170
func formatURL(protocol string, addr net.Listener) []string {
15✔
4171
        hostports := resolveHostPorts(addr)
15✔
4172
        for i, hp := range hostports {
58✔
4173
                hostports[i] = fmt.Sprintf("%s://%s", protocol, hp)
43✔
4174
        }
43✔
4175
        return hostports
15✔
4176
}
4177

4178
// Ports lists, per listener type, the URLs on which the server can be
// contacted. PortsInfo fills it in and logPorts serializes it to JSON.
type Ports struct {
	// Nats holds client URLs (nats:// or tls://).
	Nats []string `json:"nats,omitempty"`
	// Monitoring holds monitoring URLs (http:// or https://).
	Monitoring []string `json:"monitoring,omitempty"`
	// Cluster holds route URLs (nats:// or tls://).
	Cluster []string `json:"cluster,omitempty"`
	// Profile holds profiler URLs (http://).
	Profile []string `json:"profile,omitempty"`
	// WebSocket holds websocket URLs.
	WebSocket []string `json:"websocket,omitempty"`
}
4186

4187
// PortsInfo waits up to maxWait for the service listeners to be resolved and
// then returns a Ports value describing the URLs on which the server can be
// contacted. It returns nil if the listeners were not resolved in time.
func (s *Server) PortsInfo(maxWait time.Duration) *Ports {
	if !s.readyForListeners(maxWait) {
		return nil
	}
	opts := s.getOpts()

	// Capture listeners and TLS flags under the server lock; the URL
	// formatting below runs after the lock has been released.
	s.mu.RLock()
	clientTLS := s.info.TLSRequired
	clientLn := s.listener
	httpLn := s.http
	routeLn := s.routeListener
	profLn := s.profiler
	wsLn := s.websocket.listener
	wsTLS := s.websocket.tls
	s.mu.RUnlock()

	var ports Ports

	if clientLn != nil {
		scheme := "nats"
		if clientTLS {
			scheme = "tls"
		}
		ports.Nats = formatURL(scheme, clientLn)
	}

	if httpLn != nil {
		scheme := "http"
		if opts.HTTPSPort != 0 {
			scheme = "https"
		}
		ports.Monitoring = formatURL(scheme, httpLn)
	}

	if routeLn != nil {
		scheme := "nats"
		if opts.Cluster.TLSConfig != nil {
			scheme = "tls"
		}
		ports.Cluster = formatURL(scheme, routeLn)
	}

	if profLn != nil {
		ports.Profile = formatURL("http", profLn)
	}

	if wsLn != nil {
		scheme := wsSchemePrefix
		if wsTLS {
			scheme = wsSchemePrefixTLS
		}
		ports.WebSocket = formatURL(scheme, wsLn)
	}

	return &ports
}
4247

4248
// Returns the portsFile. If a non-empty dirHint is provided, the dirHint
4249
// path is used instead of the server option value
4250
func (s *Server) portFile(dirHint string) string {
6✔
4251
        dirname := s.getOpts().PortsFileDir
6✔
4252
        if dirHint != "" {
12✔
4253
                dirname = dirHint
6✔
4254
        }
6✔
4255
        if dirname == _EMPTY_ {
6✔
4256
                return _EMPTY_
×
4257
        }
×
4258
        return filepath.Join(dirname, fmt.Sprintf("%s_%d.ports", filepath.Base(os.Args[0]), os.Getpid()))
6✔
4259
}
4260

4261
// Delete the ports file. If a non-empty dirHint is provided, the dirHint
4262
// path is used instead of the server option value
4263
func (s *Server) deletePortsFile(hintDir string) {
3✔
4264
        portsFile := s.portFile(hintDir)
3✔
4265
        if portsFile != "" {
6✔
4266
                if err := os.Remove(portsFile); err != nil {
3✔
4267
                        s.Errorf("Error cleaning up ports file %s: %v", portsFile, err)
×
4268
                }
×
4269
        }
4270
}
4271

4272
// Writes a file with a serialized Ports to the specified ports_file_dir.
4273
// The name of the file is `exename_pid.ports`, typically nats-server_pid.ports.
4274
// if ports file is not set, this function has no effect
4275
func (s *Server) logPorts() {
3✔
4276
        opts := s.getOpts()
3✔
4277
        portsFile := s.portFile(opts.PortsFileDir)
3✔
4278
        if portsFile != _EMPTY_ {
6✔
4279
                go func() {
6✔
4280
                        info := s.PortsInfo(5 * time.Second)
3✔
4281
                        if info == nil {
3✔
4282
                                s.Errorf("Unable to resolve the ports in the specified time")
×
4283
                                return
×
4284
                        }
×
4285
                        data, err := json.Marshal(info)
3✔
4286
                        if err != nil {
3✔
4287
                                s.Errorf("Error marshaling ports file: %v", err)
×
4288
                                return
×
4289
                        }
×
4290
                        if err := os.WriteFile(portsFile, data, 0666); err != nil {
3✔
4291
                                s.Errorf("Error writing ports file (%s): %v", portsFile, err)
×
4292
                                return
×
4293
                        }
×
4294

4295
                }()
4296
        }
4297
}
4298

4299
// waits until a calculated list of listeners is resolved or a timeout
4300
func (s *Server) readyForListeners(dur time.Duration) bool {
6✔
4301
        end := time.Now().Add(dur)
6✔
4302
        for time.Now().Before(end) {
13✔
4303
                s.mu.RLock()
7✔
4304
                listeners := s.serviceListeners()
7✔
4305
                s.mu.RUnlock()
7✔
4306
                if len(listeners) == 0 {
7✔
4307
                        return false
×
4308
                }
×
4309

4310
                ok := true
7✔
4311
                for _, l := range listeners {
24✔
4312
                        if l == nil {
18✔
4313
                                ok = false
1✔
4314
                                break
1✔
4315
                        }
4316
                }
4317
                if ok {
13✔
4318
                        return true
6✔
4319
                }
6✔
4320
                select {
1✔
4321
                case <-s.quitCh:
×
4322
                        return false
×
4323
                case <-time.After(25 * time.Millisecond):
1✔
4324
                        // continue - unable to select from quit - we are still running
4325
                }
4326
        }
4327
        return false
×
4328
}
4329

4330
// returns a list of listeners that are intended for the process
4331
// if the entry is nil, the interface is yet to be resolved
4332
func (s *Server) serviceListeners() []net.Listener {
7✔
4333
        listeners := make([]net.Listener, 0)
7✔
4334
        opts := s.getOpts()
7✔
4335
        listeners = append(listeners, s.listener)
7✔
4336
        if opts.Cluster.Port != 0 {
10✔
4337
                listeners = append(listeners, s.routeListener)
3✔
4338
        }
3✔
4339
        if opts.HTTPPort != 0 || opts.HTTPSPort != 0 {
10✔
4340
                listeners = append(listeners, s.http)
3✔
4341
        }
3✔
4342
        if opts.ProfPort != 0 {
10✔
4343
                listeners = append(listeners, s.profiler)
3✔
4344
        }
3✔
4345
        if opts.Websocket.Port != 0 {
11✔
4346
                listeners = append(listeners, s.websocket.listener)
4✔
4347
        }
4✔
4348
        return listeners
7✔
4349
}
4350

4351
// Returns true if in lame duck mode.
4352
func (s *Server) isLameDuckMode() bool {
21,004✔
4353
        s.mu.RLock()
21,004✔
4354
        defer s.mu.RUnlock()
21,004✔
4355
        return s.ldm
21,004✔
4356
}
21,004✔
4357

4358
// LameDuckShutdown will perform a lame duck shutdown of NATS, whereby
4359
// the client listener is closed, existing client connections are
4360
// kicked, Raft leaderships are transferred, JetStream is shutdown
4361
// and then finally shutdown the the NATS Server itself.
4362
// This function blocks and will not return until the NATS Server
4363
// has completed the entire shutdown operation.
4364
func (s *Server) LameDuckShutdown() {
1✔
4365
        s.lameDuckMode()
1✔
4366
}
1✔
4367

4368
// lameDuckMode runs the lame duck sequence. It is a no-op if the server is
// already shutting down, already in lame duck mode, or has no client listener.
//
// The sequence is:
//  1. Close the client listener (and the websocket server) so no new clients
//     are accepted.
//  2. Transfer Raft leaders, then shut down JetStream and the Raft nodes.
//  3. Notify routes and clients via INFO.
//  4. After the grace period, close the existing clients one by one, spread
//     over LameDuckDuration minus the grace period, to avoid a reconnect storm.
//  5. Shut the server down and wait for the shutdown to complete.
func (s *Server) lameDuckMode() {
	s.mu.Lock()
	// Nothing to do if we are shutting down, already in LDM, or have no
	// client listener to close.
	if s.isShuttingDown() || s.ldm || s.listener == nil {
		s.mu.Unlock()
		return
	}
	s.Noticef("Entering lame duck mode, stop accepting new clients")
	s.ldm = true
	s.sendLDMShutdownEventLocked()
	// Number of signals to wait for on ldmCh: one for the client listener
	// closed here, plus whatever closeWebsocketServer reports.
	pendingAcceptLoops := 1
	s.listener.Close()
	s.listener = nil
	pendingAcceptLoops += s.closeWebsocketServer()
	s.ldmCh = make(chan bool, pendingAcceptLoops)
	opts := s.getOpts()
	grace := opts.LameDuckGracePeriod
	// For tests, we want the grace period to be in some cases bigger than the
	// ldm duration, so to by-pass the validateOptions() check, a negative
	// number is used and flipped here.
	if grace < 0 {
		grace = -grace
	}
	s.mu.Unlock()

	// If we are running any raft nodes, transfer leaders. They transfer
	// quickly, but give them up to a second.
	if s.transferRaftLeaders() {
		select {
		case <-time.After(time.Second):
		case <-s.quitCh:
			return
		}
	}

	// Now check and shutdown jetstream, then the raft nodes.
	s.shutdownJetStream()
	s.shutdownRaftNodes()

	// Wait for accept loops to be done to make sure that no new client can
	// connect.
	for n := pendingAcceptLoops; n > 0; n-- {
		<-s.ldmCh
	}

	s.mu.Lock()
	// Re-check now that the lock was released for a while. With no clients
	// left, Shutdown() completes the lame duck mode; if the server was shut
	// down in the meantime, calling Shutdown() again should be a no-op.
	if s.isShuttingDown() || len(s.clients) == 0 {
		s.mu.Unlock()
		s.Shutdown()
		return
	}
	// Time (in ns) over which client closes are spread.
	spread := int64(opts.LameDuckDuration) - int64(grace)
	if spread <= 0 {
		spread = int64(time.Second)
	}
	numClients := int64(len(s.clients))
	batch := 1
	// Sleep interval (in ns) between each client connection close.
	var interval int64
	if numClients != 0 {
		interval = spread / numClients
	}
	switch {
	case interval < 1:
		// Should not happen (except in tests with a very small LD duration),
		// but with too many clients, close them in batches and use a tiny
		// sleep interval that will likely result in a yield.
		interval = 1
		batch = int(numClients / spread)
	case interval > int64(time.Second):
		// No need to sleep long between a handful of clients; cap at 1s.
		interval = int64(time.Second)
	}

	// Snapshot all clients.
	clients := make([]*client, 0, len(s.clients))
	for _, c := range s.clients {
		clients = append(clients, c)
	}
	// No new client can be accepted anymore, so send INFO to routes and
	// clients to notify them of this state.
	s.sendLDMToRoutes()
	s.sendLDMToClients()
	s.mu.Unlock()

	// Delay closing client connections by the grace period, in case several
	// servers are being put into LD mode and we don't want their clients to
	// reconnect to each other.
	timer := time.NewTimer(grace)
	select {
	case <-timer.C:
		s.Noticef("Closing existing clients")
	case <-s.quitCh:
		timer.Stop()
		return
	}
	last := len(clients) - 1
	for i, c := range clients {
		c.closeConnection(ServerShutdown)
		if i == last {
			break
		}
		if batch != 1 && i%batch != 0 {
			continue
		}
		// Random pause of at least interval/2.
		pause := rand.Int63n(interval)
		if pause < interval/2 {
			pause = interval / 2
		}
		timer.Reset(time.Duration(pause))
		// Sleep for the pause or bail out if kicked by Shutdown().
		select {
		case <-timer.C:
		case <-s.quitCh:
			timer.Stop()
			return
		}
	}
	s.Shutdown()
	s.WaitForShutdown()
}
4499

4500
// Send an INFO update to routes with the indication that this server is in LDM mode.
4501
// Server lock is held on entry.
4502
func (s *Server) sendLDMToRoutes() {
5✔
4503
        s.routeInfo.LameDuckMode = true
5✔
4504
        infoJSON := generateInfoJSON(&s.routeInfo)
5✔
4505
        s.forEachRemote(func(r *client) {
14✔
4506
                r.mu.Lock()
9✔
4507
                r.enqueueProto(infoJSON)
9✔
4508
                r.mu.Unlock()
9✔
4509
        })
9✔
4510
        // Clear now so that we notify only once, should we have to send other INFOs.
4511
        s.routeInfo.LameDuckMode = false
5✔
4512
}
4513

4514
// Send an INFO update to clients with the indication that this server is in
4515
// LDM mode and with only URLs of other nodes.
4516
// Server lock is held on entry.
4517
func (s *Server) sendLDMToClients() {
5✔
4518
        s.info.LameDuckMode = true
5✔
4519
        // Clear this so that if there are further updates, we don't send our URLs.
5✔
4520
        s.clientConnectURLs = s.clientConnectURLs[:0]
5✔
4521
        if s.websocket.connectURLs != nil {
5✔
4522
                s.websocket.connectURLs = s.websocket.connectURLs[:0]
×
4523
        }
×
4524
        // Reset content first.
4525
        s.info.ClientConnectURLs = s.info.ClientConnectURLs[:0]
5✔
4526
        s.info.WSConnectURLs = s.info.WSConnectURLs[:0]
5✔
4527
        // Only add the other nodes if we are allowed to.
5✔
4528
        if !s.getOpts().Cluster.NoAdvertise {
10✔
4529
                for url := range s.clientConnectURLsMap {
15✔
4530
                        s.info.ClientConnectURLs = append(s.info.ClientConnectURLs, url)
10✔
4531
                }
10✔
4532
                for url := range s.websocket.connectURLsMap {
5✔
4533
                        s.info.WSConnectURLs = append(s.info.WSConnectURLs, url)
×
4534
                }
×
4535
        }
4536
        // Send to all registered clients that support async INFO protocols.
4537
        s.sendAsyncInfoToClients(true, true)
5✔
4538
        // We now clear the info.LameDuckMode flag so that if there are
5✔
4539
        // cluster updates and we send the INFO, we don't have the boolean
5✔
4540
        // set which would cause multiple LDM notifications to clients.
5✔
4541
        s.info.LameDuckMode = false
5✔
4542
}
4543

4544
// If given error is a net.Error and is temporary, sleeps for the given
4545
// delay and double it, but cap it to ACCEPT_MAX_SLEEP. The sleep is
4546
// interrupted if the server is shutdown.
4547
// An error message is displayed depending on the type of error.
4548
// Returns the new (or unchanged) delay, or a negative value if the
4549
// server has been or is being shutdown.
4550
func (s *Server) acceptError(acceptName string, err error, tmpDelay time.Duration) time.Duration {
16,283✔
4551
        if !s.isRunning() {
32,566✔
4552
                return -1
16,283✔
4553
        }
16,283✔
4554
        //lint:ignore SA1019 We want to retry on a bunch of errors here.
4555
        if ne, ok := err.(net.Error); ok && ne.Temporary() { // nolint:staticcheck
×
4556
                s.Errorf("Temporary %s Accept Error(%v), sleeping %dms", acceptName, ne, tmpDelay/time.Millisecond)
×
4557
                select {
×
4558
                case <-time.After(tmpDelay):
×
4559
                case <-s.quitCh:
×
4560
                        return -1
×
4561
                }
4562
                tmpDelay *= 2
×
4563
                if tmpDelay > ACCEPT_MAX_SLEEP {
×
4564
                        tmpDelay = ACCEPT_MAX_SLEEP
×
4565
                }
×
4566
        } else {
×
4567
                s.Errorf("%s Accept error: %v", acceptName, err)
×
4568
        }
×
4569
        return tmpDelay
×
4570
}
4571

4572
// errNoIPAvail is returned by getRandomIP when every address the host
// resolves to is present in the excluded-addresses set.
var errNoIPAvail = errors.New("no IP available")
4573

4574
func (s *Server) getRandomIP(resolver netResolver, url string, excludedAddresses map[string]struct{}) (string, error) {
119,775✔
4575
        host, port, err := net.SplitHostPort(url)
119,775✔
4576
        if err != nil {
119,775✔
4577
                return "", err
×
4578
        }
×
4579
        // If already an IP, skip.
4580
        if net.ParseIP(host) != nil {
239,393✔
4581
                return url, nil
119,618✔
4582
        }
119,618✔
4583
        ips, err := resolver.LookupHost(context.Background(), host)
157✔
4584
        if err != nil {
166✔
4585
                return "", fmt.Errorf("lookup for host %q: %v", host, err)
9✔
4586
        }
9✔
4587
        if len(excludedAddresses) > 0 {
186✔
4588
                for i := 0; i < len(ips); i++ {
106✔
4589
                        ip := ips[i]
68✔
4590
                        addr := net.JoinHostPort(ip, port)
68✔
4591
                        if _, excluded := excludedAddresses[addr]; excluded {
82✔
4592
                                if len(ips) == 1 {
19✔
4593
                                        ips = nil
5✔
4594
                                        break
5✔
4595
                                }
4596
                                ips[i] = ips[len(ips)-1]
9✔
4597
                                ips = ips[:len(ips)-1]
9✔
4598
                                i--
9✔
4599
                        }
4600
                }
4601
                if len(ips) == 0 {
43✔
4602
                        return "", errNoIPAvail
5✔
4603
                }
5✔
4604
        }
4605
        var address string
143✔
4606
        if len(ips) == 0 {
144✔
4607
                s.Warnf("Unable to get IP for %s, will try with %s: %v", host, url, err)
1✔
4608
                address = url
1✔
4609
        } else {
143✔
4610
                var ip string
142✔
4611
                if len(ips) == 1 {
162✔
4612
                        ip = ips[0]
20✔
4613
                } else {
142✔
4614
                        ip = ips[rand.Int31n(int32(len(ips)))]
122✔
4615
                }
122✔
4616
                // add the port
4617
                address = net.JoinHostPort(ip, port)
142✔
4618
        }
4619
        return address, nil
143✔
4620
}
4621

4622
// Returns true for the first attempt and depending on the nature
4623
// of the attempt (first connect or a reconnect), when the number
4624
// of attempts is equal to the configured report attempts.
4625
func (s *Server) shouldReportConnectErr(firstConnect bool, attempts int) bool {
69,896✔
4626
        opts := s.getOpts()
69,896✔
4627
        if firstConnect {
110,038✔
4628
                if attempts == 1 || attempts%opts.ConnectErrorReports == 0 {
49,419✔
4629
                        return true
9,277✔
4630
                }
9,277✔
4631
                return false
30,865✔
4632
        }
4633
        if attempts == 1 || attempts%opts.ReconnectErrorReports == 0 {
59,508✔
4634
                return true
29,754✔
4635
        }
29,754✔
4636
        return false
×
4637
}
4638

4639
func (s *Server) updateRemoteSubscription(acc *Account, sub *subscription, delta int32) {
3,032✔
4640
        s.updateRouteSubscriptionMap(acc, sub, delta)
3,032✔
4641
        if s.gateway.enabled {
3,689✔
4642
                s.gatewayUpdateSubInterest(acc.Name, sub, delta)
657✔
4643
        }
657✔
4644

4645
        acc.updateLeafNodes(sub, delta)
3,032✔
4646
}
4647

4648
func (s *Server) startRateLimitLogExpiration() {
6,715✔
4649
        interval := time.Second
6,715✔
4650
        s.startGoRoutine(func() {
13,430✔
4651
                defer s.grWG.Done()
6,715✔
4652

6,715✔
4653
                ticker := time.NewTicker(time.Second)
6,715✔
4654
                defer ticker.Stop()
6,715✔
4655
                for {
25,891✔
4656
                        select {
19,176✔
4657
                        case <-s.quitCh:
6,711✔
4658
                                return
6,711✔
4659
                        case interval = <-s.rateLimitLoggingCh:
×
4660
                                ticker.Reset(interval)
×
4661
                        case <-ticker.C:
12,461✔
4662
                                s.rateLimitLogging.Range(func(k, v any) bool {
15,735✔
4663
                                        start := v.(time.Time)
3,274✔
4664
                                        if time.Since(start) >= interval {
4,629✔
4665
                                                s.rateLimitLogging.Delete(k)
1,355✔
4666
                                        }
1,355✔
4667
                                        return true
3,274✔
4668
                                })
4669
                        }
4670
                }
4671
        })
4672
}
4673

4674
func (s *Server) changeRateLimitLogInterval(d time.Duration) {
×
4675
        if d <= 0 {
×
4676
                return
×
4677
        }
×
4678
        select {
×
4679
        case s.rateLimitLoggingCh <- d:
×
4680
        default:
×
4681
        }
4682
}
4683

4684
// DisconnectClientByID disconnects a client by connection ID
4685
func (s *Server) DisconnectClientByID(id uint64) error {
2✔
4686
        if s == nil {
2✔
4687
                return ErrServerNotRunning
×
4688
        }
×
4689
        if client := s.getClient(id); client != nil {
3✔
4690
                client.closeConnection(Kicked)
1✔
4691
                return nil
1✔
4692
        } else if client = s.GetLeafNode(id); client != nil {
3✔
4693
                client.closeConnection(Kicked)
1✔
4694
                return nil
1✔
4695
        }
1✔
4696
        return errors.New("no such client or leafnode id")
×
4697
}
4698

4699
// LDMClientByID sends a Lame Duck Mode info message to a client by connection ID
4700
func (s *Server) LDMClientByID(id uint64) error {
1✔
4701
        if s == nil {
1✔
4702
                return ErrServerNotRunning
×
4703
        }
×
4704
        s.mu.RLock()
1✔
4705
        c := s.clients[id]
1✔
4706
        if c == nil {
1✔
4707
                s.mu.RUnlock()
×
4708
                return errors.New("no such client id")
×
4709
        }
×
4710
        info := s.copyInfo()
1✔
4711
        info.LameDuckMode = true
1✔
4712
        s.mu.RUnlock()
1✔
4713
        c.mu.Lock()
1✔
4714
        defer c.mu.Unlock()
1✔
4715
        if c.opts.Protocol >= ClientProtoInfo && c.flags.isSet(firstPongSent) {
2✔
4716
                // sendInfo takes care of checking if the connection is still
1✔
4717
                // valid or not, so don't duplicate tests here.
1✔
4718
                c.Debugf("Sending Lame Duck Mode info to client")
1✔
4719
                c.enqueueProto(c.generateClientInfoJSON(info))
1✔
4720
                return nil
1✔
4721
        } else {
1✔
4722
                return errors.New("client does not support Lame Duck Mode or is not ready to receive the notification")
×
4723
        }
×
4724
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc