• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 20326380621

17 Dec 2025 03:32PM UTC coverage: 84.522% (-0.05%) from 84.574%
20326380621

push

github

web-flow
NRG: Fix single node election (#7642)

This commit fixes single node election: previously, a single node would
simply store its vote, and never check if it already reached a majority.
So it would never transition to the leader state.

Signed-off-by: Daniele Sciascia <daniele@nats.io>

73985 of 87533 relevant lines covered (84.52%)

339454.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.91
/server/server.go
1
// Copyright 2012-2025 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bytes"
18
        "context"
19
        "crypto/fips140"
20
        "crypto/tls"
21
        "encoding/json"
22
        "errors"
23
        "flag"
24
        "fmt"
25
        "io"
26
        "log"
27
        "math/rand"
28
        "net"
29
        "net/http"
30
        "net/url"
31
        "os"
32
        "path"
33
        "path/filepath"
34
        "reflect"
35
        "regexp"
36
        "runtime"
37
        "runtime/pprof"
38
        "strconv"
39
        "strings"
40
        "sync"
41
        "sync/atomic"
42
        "time"
43

44
        // Allow dynamic profiling.
45
        _ "net/http/pprof"
46

47
        "expvar"
48

49
        "github.com/klauspost/compress/s2"
50
        "github.com/nats-io/jwt/v2"
51
        "github.com/nats-io/nats-server/v2/logger"
52
        "github.com/nats-io/nkeys"
53
        "github.com/nats-io/nuid"
54
)
55

56
const (
	// firstPingInterval is the interval for the first PING on non-client
	// connections.
	firstPingInterval = time.Second

	// firstClientPingInterval is the interval for the first PING on client
	// connections.
	firstClientPingInterval = 2 * time.Second
)
63

64
// These are protocol versions sent between server connections: ROUTER, LEAF and
// GATEWAY. We may have protocol versions that have a meaning only for a certain
// type of connections, but we don't have to have separate enums for that.
// However, it is CRITICAL to not change the order of those constants since they
// are exchanged between servers. When adding a new protocol version, add to the
// end of the list, don't try to group them by connection types.
const (
	// RouteProtoZero is the original Route protocol from 2009.
	// http://nats.io/documentation/internals/nats-protocol/
	RouteProtoZero = iota
	// RouteProtoInfo signals a route can receive more than the original INFO block.
	// This can be used to update remote cluster permissions, etc...
	RouteProtoInfo
	// RouteProtoV2 is the new route/cluster protocol that provides account support.
	RouteProtoV2
	// MsgTraceProto indicates that this server understands distributed message tracing.
	MsgTraceProto
)
82

83
// Will return the latest server-to-server protocol versions, unless the
84
// option to override it is set.
85
func (s *Server) getServerProto() int {
10,168✔
86
        opts := s.getOpts()
10,168✔
87
        // Initialize with the latest protocol version.
10,168✔
88
        proto := MsgTraceProto
10,168✔
89
        // For tests, we want to be able to make this server behave
10,168✔
90
        // as an older server so check this option to see if we should override.
10,168✔
91
        if opts.overrideProto < 0 {
10,178✔
92
                // The option overrideProto is set to 0 by default (when creating an
10✔
93
                // Options structure). Since this is the same value than the original
10✔
94
                // proto RouteProtoZero, tests call setServerProtoForTest() with the
10✔
95
                // desired protocol level, which sets it as negative value equal to:
10✔
96
                // (wantedProto + 1) * -1. Here we compute back the real value.
10✔
97
                proto = (opts.overrideProto * -1) - 1
10✔
98
        }
10✔
99
        return proto
10,168✔
100
}
101

102
// setServerProtoForTest is used by tests. It encodes wantedProto as the
// strictly negative value (wantedProto + 1) * -1, so that protocol level 0
// can be told apart from an unset (zero) override.
func setServerProtoForTest(wantedProto int) int {
	return -(wantedProto + 1)
}
5✔
106

107
// Info is the information sent to clients, routes, gateways, and leaf nodes,
108
// to help them understand information about this server.
109
type Info struct {
110
        ID                string   `json:"server_id"`
111
        Name              string   `json:"server_name"`
112
        Version           string   `json:"version"`
113
        Proto             int      `json:"proto"`
114
        GitCommit         string   `json:"git_commit,omitempty"`
115
        GoVersion         string   `json:"go"`
116
        Host              string   `json:"host"`
117
        Port              int      `json:"port"`
118
        Headers           bool     `json:"headers"`
119
        AuthRequired      bool     `json:"auth_required,omitempty"`
120
        TLSRequired       bool     `json:"tls_required,omitempty"`
121
        TLSVerify         bool     `json:"tls_verify,omitempty"`
122
        TLSAvailable      bool     `json:"tls_available,omitempty"`
123
        MaxPayload        int32    `json:"max_payload"`
124
        JetStream         bool     `json:"jetstream,omitempty"`
125
        IP                string   `json:"ip,omitempty"`
126
        CID               uint64   `json:"client_id,omitempty"`
127
        ClientIP          string   `json:"client_ip,omitempty"`
128
        Nonce             string   `json:"nonce,omitempty"`
129
        Cluster           string   `json:"cluster,omitempty"`
130
        Dynamic           bool     `json:"cluster_dynamic,omitempty"`
131
        Domain            string   `json:"domain,omitempty"`
132
        ClientConnectURLs []string `json:"connect_urls,omitempty"`    // Contains URLs a client can connect to.
133
        WSConnectURLs     []string `json:"ws_connect_urls,omitempty"` // Contains URLs a ws client can connect to.
134
        LameDuckMode      bool     `json:"ldm,omitempty"`
135
        Compression       string   `json:"compression,omitempty"`
136
        ConnectInfo       bool     `json:"connect_info,omitempty"`   // When true this is the server INFO response to CONNECT
137
        RemoteAccount     string   `json:"remote_account,omitempty"` // Lets the client or leafnode side know the remote account that they bind to.
138
        IsSystemAccount   bool     `json:"acc_is_sys,omitempty"`     // Indicates if the account is a system account.
139
        JSApiLevel        int      `json:"api_lvl,omitempty"`
140

141
        // Route Specific
142
        Import        *SubjectPermission `json:"import,omitempty"`
143
        Export        *SubjectPermission `json:"export,omitempty"`
144
        LNOC          bool               `json:"lnoc,omitempty"`
145
        LNOCU         bool               `json:"lnocu,omitempty"`
146
        InfoOnConnect bool               `json:"info_on_connect,omitempty"` // When true the server will respond to CONNECT with an INFO
147
        RoutePoolSize int                `json:"route_pool_size,omitempty"`
148
        RoutePoolIdx  int                `json:"route_pool_idx,omitempty"`
149
        RouteAccount  string             `json:"route_account,omitempty"`
150
        RouteAccReqID string             `json:"route_acc_add_reqid,omitempty"`
151
        GossipMode    byte               `json:"gossip_mode,omitempty"`
152

153
        // Gateways Specific
154
        Gateway           string   `json:"gateway,omitempty"`             // Name of the origin Gateway (sent by gateway's INFO)
155
        GatewayURLs       []string `json:"gateway_urls,omitempty"`        // Gateway URLs in the originating cluster (sent by gateway's INFO)
156
        GatewayURL        string   `json:"gateway_url,omitempty"`         // Gateway URL on that server (sent by route's INFO)
157
        GatewayCmd        byte     `json:"gateway_cmd,omitempty"`         // Command code for the receiving server to know what to do
158
        GatewayCmdPayload []byte   `json:"gateway_cmd_payload,omitempty"` // Command payload when needed
159
        GatewayNRP        bool     `json:"gateway_nrp,omitempty"`         // Uses new $GNR. prefix for mapped replies
160
        GatewayIOM        bool     `json:"gateway_iom,omitempty"`         // Indicate that all accounts will be switched to InterestOnly mode "right away"
161

162
        // LeafNode Specific
163
        LeafNodeURLs []string `json:"leafnode_urls,omitempty"` // LeafNode URLs that the server can reconnect to.
164

165
        XKey string `json:"xkey,omitempty"` // Public server's x25519 key.
166
}
167

168
// Server is our main struct.
169
type Server struct {
170
        // Fields accessed with atomic operations need to be 64-bit aligned
171
        gcid uint64
172
        // How often user logon fails due to the issuer account not being pinned.
173
        pinnedAccFail uint64
174
        stats
175
        scStats
176
        staleStats
177
        mu                  sync.RWMutex
178
        reloadMu            sync.RWMutex // Write-locked when a config reload is taking place ONLY
179
        kp                  nkeys.KeyPair
180
        xkp                 nkeys.KeyPair
181
        xpub                string
182
        info                Info
183
        configFile          string
184
        optsMu              sync.RWMutex
185
        opts                *Options
186
        running             atomic.Bool
187
        shutdown            atomic.Bool
188
        listener            net.Listener
189
        listenerErr         error
190
        gacc                *Account
191
        sys                 *internal
192
        sysAcc              atomic.Pointer[Account]
193
        js                  atomic.Pointer[jetStream]
194
        isMetaLeader        atomic.Bool
195
        jsClustered         atomic.Bool
196
        accounts            sync.Map
197
        tmpAccounts         sync.Map // Temporarily stores accounts that are being built
198
        activeAccounts      int32
199
        accResolver         AccountResolver
200
        clients             map[uint64]*client
201
        routes              map[string][]*client
202
        remoteRoutePoolSize map[string]int                // Map for remote's configure route pool size
203
        routesPoolSize      int                           // Configured pool size
204
        routesReject        bool                          // During reload, we may want to reject adding routes until some conditions are met
205
        routesNoPool        int                           // Number of routes that don't use pooling (connecting to older server for instance)
206
        accRoutes           map[string]map[string]*client // Key is account name, value is key=remoteID/value=route connection
207
        accRouteByHash      sync.Map                      // Key is account name, value is nil or a pool index
208
        accAddedCh          chan struct{}
209
        accAddedReqID       string
210
        leafs               map[uint64]*client
211
        users               map[string]*User
212
        nkeys               map[string]*NkeyUser
213
        totalClients        uint64
214
        closed              *closedRingBuffer
215
        done                chan bool
216
        start               time.Time
217
        http                net.Listener
218
        httpHandler         http.Handler
219
        httpBasePath        string
220
        profiler            net.Listener
221
        httpReqStats        map[string]uint64
222
        routeListener       net.Listener
223
        routeListenerErr    error
224
        routeInfo           Info
225
        routeResolver       netResolver
226
        routesToSelf        map[string]struct{}
227
        routeTLSName        string
228
        leafNodeListener    net.Listener
229
        leafNodeListenerErr error
230
        leafNodeInfo        Info
231
        leafNodeInfoJSON    []byte
232
        leafURLsMap         refCountedUrlSet
233
        leafNodeOpts        struct {
234
                resolver    netResolver
235
                dialTimeout time.Duration
236
        }
237
        leafRemoteCfgs     []*leafNodeCfg
238
        leafRemoteAccounts sync.Map
239
        leafNodeEnabled    bool
240
        leafDisableConnect bool // Used in test only
241
        leafNoCluster      bool // Indicate that this server has only remotes and no cluster defined
242

243
        quitCh           chan struct{}
244
        startupComplete  chan struct{}
245
        shutdownComplete chan struct{}
246

247
        // Tracking Go routines
248
        grMu         sync.Mutex
249
        grTmpClients map[uint64]*client
250
        grRunning    bool
251
        grWG         sync.WaitGroup // to wait on various go routines
252

253
        cproto     int64     // number of clients supporting async INFO
254
        configTime time.Time // last time config was loaded
255

256
        logging struct {
257
                sync.RWMutex
258
                logger      Logger
259
                trace       int32
260
                debug       int32
261
                traceSysAcc int32
262
        }
263

264
        clientConnectURLs []string
265

266
        // Used internally for quick look-ups.
267
        clientConnectURLsMap refCountedUrlSet
268

269
        // For Gateways
270
        gatewayListener    net.Listener // Accept listener
271
        gatewayListenerErr error
272
        gateway            *srvGateway
273

274
        // Used by tests to check that http.Servers do
275
        // not set any timeout.
276
        monitoringServer *http.Server
277
        profilingServer  *http.Server
278

279
        // LameDuck mode
280
        ldm   bool
281
        ldmCh chan bool
282

283
        // Trusted public operator keys.
284
        trustedKeys []string
285
        // map of trusted keys to operator setting StrictSigningKeyUsage
286
        strictSigningKeyUsage map[string]struct{}
287

288
        // We use this to minimize mem copies for requests to monitoring
289
        // endpoint /varz (when it comes from http).
290
        varzMu sync.Mutex
291
        varz   *Varz
292
        // This is set during a config reload if we detect that we have
293
        // added/removed routes. The monitoring code then check that
294
        // to know if it should update the cluster's URLs array.
295
        varzUpdateRouteURLs bool
296

297
        // Keeps a sublist of of subscriptions attached to leafnode connections
298
        // for the $GNR.*.*.*.> subject so that a server can send back a mapped
299
        // gateway reply.
300
        gwLeafSubs *Sublist
301

302
        // Used for expiration of mapped GW replies
303
        gwrm struct {
304
                w  int32
305
                ch chan time.Duration
306
                m  sync.Map
307
        }
308

309
        // For eventIDs
310
        eventIds *nuid.NUID
311

312
        // Websocket structure
313
        websocket srvWebsocket
314

315
        // MQTT structure
316
        mqtt srvMQTT
317

318
        // OCSP monitoring
319
        ocsps []*OCSPMonitor
320

321
        // OCSP peer verification (at least one TLS block)
322
        ocspPeerVerify bool
323

324
        // OCSP response cache
325
        ocsprc OCSPResponseCache
326

327
        // exporting account name the importer experienced issues with
328
        incompleteAccExporterMap sync.Map
329

330
        // Holds cluster name under different lock for mapping
331
        cnMu sync.RWMutex
332
        cn   string
333

334
        // For registering raft nodes with the server.
335
        rnMu      sync.RWMutex
336
        raftNodes map[string]RaftNode
337

338
        // For mapping from a raft node name back to a server name and cluster. Node has to be in the same domain.
339
        nodeToInfo sync.Map
340

341
        // For out of resources to not log errors too fast.
342
        rerrMu   sync.Mutex
343
        rerrLast time.Time
344

345
        connRateCounter *rateCounter
346

347
        // If there is a system account configured, to still support the $G account,
348
        // the server will create a fake user and add it to the list of users.
349
        // Keep track of what that user name is for config reload purposes.
350
        sysAccOnlyNoAuthUser string
351

352
        // IPQueues map
353
        ipQueues sync.Map
354

355
        // To limit logging frequency
356
        rateLimitLogging   sync.Map
357
        rateLimitLoggingCh chan time.Duration
358

359
        // Total outstanding catchup bytes in flight.
360
        gcbMu     sync.RWMutex
361
        gcbOut    int64
362
        gcbOutMax int64 // Taken from JetStreamMaxCatchup or defaultMaxTotalCatchupOutBytes
363
        // A global chanel to kick out stalled catchup sequences.
364
        gcbKick chan struct{}
365

366
        // Total outbound syncRequests
367
        syncOutSem chan struct{}
368

369
        // Queue to process JS API requests that come from routes (or gateways)
370
        jsAPIRoutedReqs *ipQueue[*jsAPIRoutedReq]
371

372
        // Delayed API responses.
373
        delayedAPIResponses *ipQueue[*delayedAPIResponse]
374

375
        // Whether moving NRG traffic into accounts is permitted on this server.
376
        // Controls whether or not the account NRG capability is set in statsz.
377
        // Currently used by unit tests to simulate nodes not supporting account NRG.
378
        accountNRGAllowed atomic.Bool
379

380
        // List of proxies trusted keys in `KeyPair` form so we can do signature
381
        // verification when processing incoming proxy connections.
382
        proxiesKeyPairs []nkeys.KeyPair
383
        proxiedConns    map[string]map[uint64]*client
384
}
385

386
// For tracking JS nodes.
387
type nodeInfo struct {
388
        name            string
389
        version         string
390
        cluster         string
391
        domain          string
392
        id              string
393
        tags            jwt.TagList
394
        cfg             *JetStreamConfig
395
        stats           *JetStreamStats
396
        offline         bool
397
        js              bool
398
        binarySnapshots bool
399
        accountNRG      bool
400
}
401

402
// stats holds message, byte and connection-event counters.
// It is embedded in Server.
type stats struct {
	inMsgs           int64
	outMsgs          int64
	inBytes          int64
	outBytes         int64
	slowConsumers    int64
	staleConnections int64
	stalls           int64
}
411

412
// scStats includes the total and per connection counters of Slow Consumers.
// Each counter is an atomic.Uint64, one per connection kind.
type scStats struct {
	clients  atomic.Uint64
	routes   atomic.Uint64
	leafs    atomic.Uint64
	gateways atomic.Uint64
}
419

420
// staleStats includes the total and per connection counters of Stale Connections.
// Each counter is an atomic.Uint64, one per connection kind.
type staleStats struct {
	clients  atomic.Uint64
	routes   atomic.Uint64
	leafs    atomic.Uint64
	gateways atomic.Uint64
}
427

428
// These are used by tests so we can run all server tests with a default route
// or leafnode compression mode. For instance:
// go test -race -v ./server -cluster_compression=fast
var (
	testDefaultClusterCompression  string
	testDefaultLeafNodeCompression string
)
435

436
// Compression modes.
const (
	CompressionNotSupported   = "not supported"
	CompressionOff            = "off"
	CompressionAccept         = "accept"
	CompressionS2Auto         = "s2_auto"
	CompressionS2Uncompressed = "s2_uncompressed"
	CompressionS2Fast         = "s2_fast"
	CompressionS2Better       = "s2_better"
	CompressionS2Best         = "s2_best"
)
447

448
// defaultCompressionS2AutoRTTThresholds is the default of RTT thresholds for
// the CompressionS2Auto mode.
var defaultCompressionS2AutoRTTThresholds = []time.Duration{
	// [0..10ms] -> CompressionS2Uncompressed
	10 * time.Millisecond,
	// ]10ms..50ms] -> CompressionS2Fast
	50 * time.Millisecond,
	// ]50ms..100ms] -> CompressionS2Better
	100 * time.Millisecond,
	// ]100ms..] -> CompressionS2Best
}
459

460
// For a given user provided string, matches to one of the compression mode
461
// constant and updates the provided string to that constant. Returns an
462
// error if the provided compression mode is not known.
463
// The parameter `chosenModeForOn` indicates which compression mode to use
464
// when the user selects "on" (or enabled, true, etc..). This is because
465
// we may have different defaults depending on where the compression is used.
466
func validateAndNormalizeCompressionOption(c *CompressionOpts, chosenModeForOn string) error {
10,823✔
467
        if c == nil {
10,823✔
468
                return nil
×
469
        }
×
470
        cmtl := strings.ToLower(c.Mode)
10,823✔
471
        // First, check for the "on" case so that we set to the default compression
10,823✔
472
        // mode for that. The other switch/case will finish setup if needed (for
10,823✔
473
        // instance if the default mode is s2Auto).
10,823✔
474
        switch cmtl {
10,823✔
475
        case "on", "enabled", "true":
14✔
476
                cmtl = chosenModeForOn
14✔
477
        default:
10,809✔
478
        }
479
        // Check (again) with the proper mode.
480
        switch cmtl {
10,823✔
481
        case "not supported", "not_supported":
3✔
482
                c.Mode = CompressionNotSupported
3✔
483
        case "disabled", "off", "false":
1,762✔
484
                c.Mode = CompressionOff
1,762✔
485
        case "accept":
4,219✔
486
                c.Mode = CompressionAccept
4,219✔
487
        case "auto", "s2_auto":
4,757✔
488
                var rtts []time.Duration
4,757✔
489
                if len(c.RTTThresholds) == 0 {
8,732✔
490
                        rtts = defaultCompressionS2AutoRTTThresholds
3,975✔
491
                } else {
4,757✔
492
                        for _, n := range c.RTTThresholds {
3,138✔
493
                                // Do not error on negative, but simply set to 0
2,356✔
494
                                if n < 0 {
2,360✔
495
                                        n = 0
4✔
496
                                }
4✔
497
                                // Make sure they are properly ordered. However, it is possible
498
                                // to have a "0" anywhere in the list to indicate that this
499
                                // compression level should not be used.
500
                                if l := len(rtts); l > 0 && n != 0 {
3,900✔
501
                                        for _, v := range rtts {
3,882✔
502
                                                if n < v {
2,340✔
503
                                                        return fmt.Errorf("RTT threshold values %v should be in ascending order", c.RTTThresholds)
2✔
504
                                                }
2✔
505
                                        }
506
                                }
507
                                rtts = append(rtts, n)
2,354✔
508
                        }
509
                        if len(rtts) > 0 {
1,560✔
510
                                // Trim 0 that are at the end.
780✔
511
                                stop := -1
780✔
512
                                for i := len(rtts) - 1; i >= 0; i-- {
1,574✔
513
                                        if rtts[i] != 0 {
1,570✔
514
                                                stop = i
776✔
515
                                                break
776✔
516
                                        }
517
                                }
518
                                rtts = rtts[:stop+1]
780✔
519
                        }
520
                        if len(rtts) > 4 {
782✔
521
                                // There should be at most values for "uncompressed", "fast",
2✔
522
                                // "better" and "best" (when some 0 are present).
2✔
523
                                return fmt.Errorf("compression mode %q should have no more than 4 RTT thresholds: %v", c.Mode, c.RTTThresholds)
2✔
524
                        } else if len(rtts) == 0 {
784✔
525
                                // But there should be at least 1 if the user provided the slice.
4✔
526
                                // We would be here only if it was provided by say with values
4✔
527
                                // being a single or all zeros.
4✔
528
                                return fmt.Errorf("compression mode %q requires at least one RTT threshold", c.Mode)
4✔
529
                        }
4✔
530
                }
531
                c.Mode = CompressionS2Auto
4,749✔
532
                c.RTTThresholds = rtts
4,749✔
533
        case "fast", "s2_fast":
26✔
534
                c.Mode = CompressionS2Fast
26✔
535
        case "better", "s2_better":
27✔
536
                c.Mode = CompressionS2Better
27✔
537
        case "best", "s2_best":
27✔
538
                c.Mode = CompressionS2Best
27✔
539
        default:
2✔
540
                return fmt.Errorf("unsupported compression mode %q", c.Mode)
2✔
541
        }
542
        return nil
10,813✔
543
}
544

545
// Returns `true` if the compression mode `m` indicates that the server
546
// will negotiate compression with the remote server, `false` otherwise.
547
// Note that the provided compression mode is assumed to have been
548
// normalized and validated.
549
func needsCompression(m string) bool {
173,333✔
550
        return m != _EMPTY_ && m != CompressionOff && m != CompressionNotSupported
173,333✔
551
}
173,333✔
552

553
// Compression is asymmetric, meaning that one side can have a different
554
// compression level than the other. However, we need to check for cases
555
// when this server `scm` or the remote `rcm` do not support compression
556
// (say older server, or test to make it behave as it is not), or have
557
// the compression off.
558
// Note that `scm` is assumed to not be "off" or "not supported".
559
func selectCompressionMode(scm, rcm string) (mode string, err error) {
41,887✔
560
        if rcm == CompressionNotSupported || rcm == _EMPTY_ {
41,932✔
561
                return CompressionNotSupported, nil
45✔
562
        }
45✔
563
        switch rcm {
41,842✔
564
        case CompressionOff:
16,630✔
565
                // If the remote explicitly disables compression, then we won't
16,630✔
566
                // use compression.
16,630✔
567
                return CompressionOff, nil
16,630✔
568
        case CompressionAccept:
24,043✔
569
                // If the remote is ok with compression (but is not initiating it),
24,043✔
570
                // and if we too are in this mode, then it means no compression.
24,043✔
571
                if scm == CompressionAccept {
48,068✔
572
                        return CompressionOff, nil
24,025✔
573
                }
24,025✔
574
                // Otherwise use our compression mode.
575
                return scm, nil
18✔
576
        case CompressionS2Auto, CompressionS2Uncompressed, CompressionS2Fast, CompressionS2Better, CompressionS2Best:
1,169✔
577
                // This case is here to make sure that if we don't recognize a
1,169✔
578
                // compression setting, we error out.
1,169✔
579
                if scm == CompressionAccept {
1,190✔
580
                        // If our compression mode is "accept", then we will use the remote
21✔
581
                        // compression mode, except if it is "auto", in which case we will
21✔
582
                        // default to "fast". This is not a configuration (auto in one
21✔
583
                        // side and accept in the other) that would be recommended.
21✔
584
                        if rcm == CompressionS2Auto {
38✔
585
                                return CompressionS2Fast, nil
17✔
586
                        }
17✔
587
                        // Use their compression mode.
588
                        return rcm, nil
4✔
589
                }
590
                // Otherwise use our compression mode.
591
                return scm, nil
1,148✔
592
        default:
×
593
                return _EMPTY_, fmt.Errorf("unsupported route compression mode %q", rcm)
×
594
        }
595
}
596

597
// If the configured compression mode is "auto" then will return that,
598
// otherwise will return the given `cm` compression mode.
599
func compressionModeForInfoProtocol(co *CompressionOpts, cm string) string {
17,722✔
600
        if co.Mode == CompressionS2Auto {
18,862✔
601
                return CompressionS2Auto
1,140✔
602
        }
1,140✔
603
        return cm
16,582✔
604
}
605

606
// Given a connection RTT and a list of thresholds durations, this
607
// function will return an S2 compression level such as "uncompressed",
608
// "fast", "better" or "best". For instance, with the following slice:
609
// [5ms, 10ms, 15ms, 20ms], a RTT of up to 5ms will result
610
// in the compression level "uncompressed", ]5ms..10ms] will result in
611
// "fast" compression, etc..
612
// However, the 0 value allows for disabling of some compression levels.
613
// For instance, the following slice: [0, 0, 20, 30] means that a RTT of
614
// [0..20ms] would result in the "better" compression - effectively disabling
615
// the use of "uncompressed" and "fast", then anything above 20ms would
616
// result in the use of "best" level (the 30 in the list has no effect
617
// and the list could have been simplified to [0, 0, 20]).
618
func selectS2AutoModeBasedOnRTT(rtt time.Duration, rttThresholds []time.Duration) string {
1,483✔
619
        var idx int
1,483✔
620
        var found bool
1,483✔
621
        for i, d := range rttThresholds {
3,004✔
622
                if rtt <= d {
2,988✔
623
                        idx = i
1,467✔
624
                        found = true
1,467✔
625
                        break
1,467✔
626
                }
627
        }
628
        if !found {
1,499✔
629
                // If we did not find but we have all levels, then use "best",
16✔
630
                // otherwise use the last one in array.
16✔
631
                if l := len(rttThresholds); l >= 3 {
32✔
632
                        idx = 3
16✔
633
                } else {
16✔
634
                        idx = l - 1
×
635
                }
×
636
        }
637
        switch idx {
1,483✔
638
        case 0:
1,463✔
639
                return CompressionS2Uncompressed
1,463✔
640
        case 1:
2✔
641
                return CompressionS2Fast
2✔
642
        case 2:
2✔
643
                return CompressionS2Better
2✔
644
        }
645
        return CompressionS2Best
16✔
646
}
647

648
func compressOptsEqual(c1, c2 *CompressionOpts) bool {
234✔
649
        if c1 == c2 {
234✔
650
                return true
×
651
        }
×
652
        if (c1 == nil && c2 != nil) || (c1 != nil && c2 == nil) {
234✔
653
                return false
×
654
        }
×
655
        if c1.Mode != c2.Mode {
278✔
656
                return false
44✔
657
        }
44✔
658
        // For s2_auto, if one has an empty RTTThresholds, it is equivalent
659
        // to the defaultCompressionS2AutoRTTThresholds array, so compare with that.
660
        if c1.Mode == CompressionS2Auto {
215✔
661
                if len(c1.RTTThresholds) == 0 && !reflect.DeepEqual(c2.RTTThresholds, defaultCompressionS2AutoRTTThresholds) {
25✔
662
                        return false
×
663
                }
×
664
                if len(c2.RTTThresholds) == 0 && !reflect.DeepEqual(c1.RTTThresholds, defaultCompressionS2AutoRTTThresholds) {
25✔
665
                        return false
×
666
                }
×
667
                if !reflect.DeepEqual(c1.RTTThresholds, c2.RTTThresholds) {
50✔
668
                        return false
25✔
669
                }
25✔
670
        }
671
        return true
165✔
672
}
673

674
// Returns an array of s2 WriterOption based on the route compression mode.
675
// So far we return a single option, but this way we can call s2.NewWriter()
676
// with a nil []s2.WriterOption, but not with a nil s2.WriterOption, so
677
// this is more versatile.
678
func s2WriterOptions(cm string) []s2.WriterOption {
1,199✔
679
        _opts := [2]s2.WriterOption{}
1,199✔
680
        opts := append(
1,199✔
681
                _opts[:0],
1,199✔
682
                s2.WriterConcurrency(1), // Stop asynchronous flushing in separate goroutines
1,199✔
683
        )
1,199✔
684
        switch cm {
1,199✔
685
        case CompressionS2Uncompressed:
1,132✔
686
                return append(opts, s2.WriterUncompressed())
1,132✔
687
        case CompressionS2Best:
27✔
688
                return append(opts, s2.WriterBestCompression())
27✔
689
        case CompressionS2Better:
13✔
690
                return append(opts, s2.WriterBetterCompression())
13✔
691
        default:
27✔
692
                return nil
27✔
693
        }
694
}
695

696
// New will setup a new server struct after parsing the options.
697
// DEPRECATED: Use NewServer(opts)
698
func New(opts *Options) *Server {
270✔
699
        s, _ := NewServer(opts)
270✔
700
        return s
270✔
701
}
270✔
702

703
func NewServerFromConfig(opts *Options) (*Server, error) {
3✔
704
        if opts.ConfigFile != _EMPTY_ && opts.configDigest == "" {
6✔
705
                if err := opts.ProcessConfigFile(opts.ConfigFile); err != nil {
4✔
706
                        return nil, err
1✔
707
                }
1✔
708
        }
709
        return NewServer(opts)
2✔
710
}
711

712
// NewServer will setup a new server struct after parsing the options.
713
// Could return an error if options can not be validated.
714
// The provided Options type should not be re-used afterwards.
715
// Either use Options.Clone() to pass a copy, or make a new one.
716
func NewServer(opts *Options) (*Server, error) {
6,973✔
717
        setBaselineOptions(opts)
6,973✔
718

6,973✔
719
        // Process TLS options, including whether we require client certificates.
6,973✔
720
        tlsReq := opts.TLSConfig != nil
6,973✔
721
        verify := (tlsReq && opts.TLSConfig.ClientAuth == tls.RequireAndVerifyClientCert)
6,973✔
722

6,973✔
723
        // Create our server's nkey identity.
6,973✔
724
        kp, _ := nkeys.CreateServer()
6,973✔
725
        pub, _ := kp.PublicKey()
6,973✔
726

6,973✔
727
        // Create an xkey for encrypting messages from this server.
6,973✔
728
        var xkp nkeys.KeyPair
6,973✔
729
        var xpub string
6,973✔
730
        if !fips140.Enabled() {
13,946✔
731
                xkp, _ = nkeys.CreateCurveKeys()
6,973✔
732
                xpub, _ = xkp.PublicKey()
6,973✔
733
        }
6,973✔
734

735
        serverName := pub
6,973✔
736
        if opts.ServerName != _EMPTY_ {
11,335✔
737
                serverName = opts.ServerName
4,362✔
738
        }
4,362✔
739

740
        httpBasePath := normalizeBasePath(opts.HTTPBasePath)
6,973✔
741

6,973✔
742
        // Validate some options. This is here because we cannot assume that
6,973✔
743
        // server will always be started with configuration parsing (that could
6,973✔
744
        // report issues). Its options can be (incorrectly) set by hand when
6,973✔
745
        // server is embedded. If there is an error, return nil.
6,973✔
746
        if err := validateOptions(opts); err != nil {
7,027✔
747
                return nil, err
54✔
748
        }
54✔
749

750
        info := Info{
6,919✔
751
                ID:           pub,
6,919✔
752
                XKey:         xpub,
6,919✔
753
                Version:      VERSION,
6,919✔
754
                Proto:        PROTO,
6,919✔
755
                GitCommit:    gitCommit,
6,919✔
756
                GoVersion:    runtime.Version(),
6,919✔
757
                Name:         serverName,
6,919✔
758
                Host:         opts.Host,
6,919✔
759
                Port:         opts.Port,
6,919✔
760
                AuthRequired: false,
6,919✔
761
                TLSRequired:  tlsReq && !opts.AllowNonTLS,
6,919✔
762
                TLSVerify:    verify,
6,919✔
763
                MaxPayload:   opts.MaxPayload,
6,919✔
764
                JetStream:    opts.JetStream,
6,919✔
765
                Headers:      !opts.NoHeaderSupport,
6,919✔
766
                Cluster:      opts.Cluster.Name,
6,919✔
767
                Domain:       opts.JetStreamDomain,
6,919✔
768
                JSApiLevel:   JSApiLevel,
6,919✔
769
        }
6,919✔
770

6,919✔
771
        if tlsReq && !info.TLSRequired {
6,922✔
772
                info.TLSAvailable = true
3✔
773
        }
3✔
774

775
        now := time.Now()
6,919✔
776

6,919✔
777
        s := &Server{
6,919✔
778
                kp:                 kp,
6,919✔
779
                xkp:                xkp,
6,919✔
780
                xpub:               xpub,
6,919✔
781
                configFile:         opts.ConfigFile,
6,919✔
782
                info:               info,
6,919✔
783
                opts:               opts,
6,919✔
784
                done:               make(chan bool, 1),
6,919✔
785
                start:              now,
6,919✔
786
                configTime:         now,
6,919✔
787
                gwLeafSubs:         NewSublistWithCache(),
6,919✔
788
                httpBasePath:       httpBasePath,
6,919✔
789
                eventIds:           nuid.New(),
6,919✔
790
                routesToSelf:       make(map[string]struct{}),
6,919✔
791
                httpReqStats:       make(map[string]uint64), // Used to track HTTP requests
6,919✔
792
                rateLimitLoggingCh: make(chan time.Duration, 1),
6,919✔
793
                leafNodeEnabled:    opts.LeafNode.Port != 0 || len(opts.LeafNode.Remotes) > 0,
6,919✔
794
                syncOutSem:         make(chan struct{}, maxConcurrentSyncRequests),
6,919✔
795
        }
6,919✔
796

6,919✔
797
        // Delayed API response queue. Create regardless if JetStream is configured
6,919✔
798
        // or not (since it can be enabled/disabled with config reload, we want this
6,919✔
799
        // queue to exist at all times).
6,919✔
800
        s.delayedAPIResponses = newIPQueue[*delayedAPIResponse](s, "delayed API responses")
6,919✔
801

6,919✔
802
        // By default we'll allow account NRG.
6,919✔
803
        s.accountNRGAllowed.Store(true)
6,919✔
804

6,919✔
805
        // Fill up the maximum in flight syncRequests for this server.
6,919✔
806
        // Used in JetStream catchup semantics.
6,919✔
807
        for i := 0; i < maxConcurrentSyncRequests; i++ {
228,327✔
808
                s.syncOutSem <- struct{}{}
221,408✔
809
        }
221,408✔
810

811
        if opts.TLSRateLimit > 0 {
6,920✔
812
                s.connRateCounter = newRateCounter(opts.tlsConfigOpts.RateLimit)
1✔
813
        }
1✔
814

815
        // Trusted root operator keys.
816
        if !s.processTrustedKeys() {
6,919✔
817
                return nil, fmt.Errorf("Error processing trusted operator keys")
×
818
        }
×
819

820
        // If we have solicited leafnodes but no clustering and no clustername.
821
        // However we may need a stable clustername so use the server name.
822
        if len(opts.LeafNode.Remotes) > 0 && opts.Cluster.Port == 0 && opts.Cluster.Name == _EMPTY_ {
7,852✔
823
                s.leafNoCluster = true
933✔
824
                opts.Cluster.Name = opts.ServerName
933✔
825
        }
933✔
826

827
        if opts.Cluster.Name != _EMPTY_ {
11,822✔
828
                // Also place into mapping cn with cnMu lock.
4,903✔
829
                s.cnMu.Lock()
4,903✔
830
                s.cn = opts.Cluster.Name
4,903✔
831
                s.cnMu.Unlock()
4,903✔
832
        }
4,903✔
833

834
        s.mu.Lock()
6,919✔
835
        defer s.mu.Unlock()
6,919✔
836

6,919✔
837
        // If there are proxies trusted public keys in the configuration
6,919✔
838
        // this will fill create the corresponding list of nkeys.KeyPair
6,919✔
839
        // that we can use for signature verification.
6,919✔
840
        s.processProxiesTrustedKeys()
6,919✔
841

6,919✔
842
        // Place ourselves in the JetStream nodeInfo if needed.
6,919✔
843
        if opts.JetStream {
11,108✔
844
                ourNode := getHash(serverName)
4,189✔
845
                s.nodeToInfo.Store(ourNode, nodeInfo{
4,189✔
846
                        name:            serverName,
4,189✔
847
                        version:         VERSION,
4,189✔
848
                        cluster:         opts.Cluster.Name,
4,189✔
849
                        domain:          opts.JetStreamDomain,
4,189✔
850
                        id:              info.ID,
4,189✔
851
                        tags:            opts.Tags,
4,189✔
852
                        cfg:             &JetStreamConfig{MaxMemory: opts.JetStreamMaxMemory, MaxStore: opts.JetStreamMaxStore, CompressOK: true},
4,189✔
853
                        stats:           nil,
4,189✔
854
                        offline:         false,
4,189✔
855
                        js:              true,
4,189✔
856
                        binarySnapshots: true,
4,189✔
857
                        accountNRG:      true,
4,189✔
858
                })
4,189✔
859
        }
4,189✔
860

861
        s.routeResolver = opts.Cluster.resolver
6,919✔
862
        if s.routeResolver == nil {
13,838✔
863
                s.routeResolver = net.DefaultResolver
6,919✔
864
        }
6,919✔
865

866
        // Used internally for quick look-ups.
867
        s.clientConnectURLsMap = make(refCountedUrlSet)
6,919✔
868
        s.websocket.connectURLsMap = make(refCountedUrlSet)
6,919✔
869
        s.leafURLsMap = make(refCountedUrlSet)
6,919✔
870

6,919✔
871
        // Ensure that non-exported options (used in tests) are properly set.
6,919✔
872
        s.setLeafNodeNonExportedOptions()
6,919✔
873

6,919✔
874
        // Setup OCSP Stapling and OCSP Peer. This will abort server from starting if there
6,919✔
875
        // are no valid staples and OCSP Stapling policy is set to Always or MustStaple.
6,919✔
876
        if err := s.enableOCSP(); err != nil {
6,921✔
877
                return nil, err
2✔
878
        }
2✔
879

880
        // Call this even if there is no gateway defined. It will
881
        // initialize the structure so we don't have to check for
882
        // it to be nil or not in various places in the code.
883
        if err := s.newGateway(opts); err != nil {
6,917✔
884
                return nil, err
×
885
        }
×
886

887
        // If we have a cluster definition but do not have a cluster name, create one.
888
        if opts.Cluster.Port != 0 && opts.Cluster.Name == _EMPTY_ {
6,950✔
889
                s.info.Cluster = nuid.Next()
33✔
890
        } else if opts.Cluster.Name != _EMPTY_ {
11,820✔
891
                // Likewise here if we have a cluster name set.
4,903✔
892
                s.info.Cluster = opts.Cluster.Name
4,903✔
893
        }
4,903✔
894

895
        // This is normally done in the AcceptLoop, once the
896
        // listener has been created (possibly with random port),
897
        // but since some tests may expect the INFO to be properly
898
        // set after New(), let's do it now.
899
        s.setInfoHostPort()
6,917✔
900

6,917✔
901
        // For tracking clients
6,917✔
902
        s.clients = make(map[uint64]*client)
6,917✔
903

6,917✔
904
        // For tracking closed clients.
6,917✔
905
        s.closed = newClosedRingBuffer(opts.MaxClosedClients)
6,917✔
906

6,917✔
907
        // For tracking connections that are not yet registered
6,917✔
908
        // in s.routes, but for which readLoop has started.
6,917✔
909
        s.grTmpClients = make(map[uint64]*client)
6,917✔
910

6,917✔
911
        // For tracking routes and their remote ids
6,917✔
912
        s.initRouteStructures(opts)
6,917✔
913

6,917✔
914
        // For tracking leaf nodes.
6,917✔
915
        s.leafs = make(map[uint64]*client)
6,917✔
916

6,917✔
917
        // Used to kick out all go routines possibly waiting on server
6,917✔
918
        // to shutdown.
6,917✔
919
        s.quitCh = make(chan struct{})
6,917✔
920

6,917✔
921
        // Closed when startup is complete. ReadyForConnections() will block on
6,917✔
922
        // this before checking the presence of listening sockets.
6,917✔
923
        s.startupComplete = make(chan struct{})
6,917✔
924

6,917✔
925
        // Closed when Shutdown() is complete. Allows WaitForShutdown() to block
6,917✔
926
        // waiting for complete shutdown.
6,917✔
927
        s.shutdownComplete = make(chan struct{})
6,917✔
928

6,917✔
929
        // Check for configured account resolvers.
6,917✔
930
        if err := s.configureResolver(); err != nil {
6,917✔
931
                return nil, err
×
932
        }
×
933
        // If there is an URL account resolver, do basic test to see if anyone is home.
934
        if ar := opts.AccountResolver; ar != nil {
7,281✔
935
                if ur, ok := ar.(*URLAccResolver); ok {
404✔
936
                        if _, err := ur.Fetch(_EMPTY_); err != nil {
41✔
937
                                return nil, err
1✔
938
                        }
1✔
939
                }
940
        }
941
        // For other resolver:
942
        // In operator mode, when the account resolver depends on an external system and
943
        // the system account can't be fetched, inject a temporary one.
944
        if ar := s.accResolver; len(opts.TrustedOperators) == 1 && ar != nil &&
6,916✔
945
                opts.SystemAccount != _EMPTY_ && opts.SystemAccount != DEFAULT_SYSTEM_ACCOUNT {
7,179✔
946
                if _, ok := ar.(*MemAccResolver); !ok {
419✔
947
                        s.mu.Unlock()
156✔
948
                        var a *Account
156✔
949
                        // perform direct lookup to avoid warning trace
156✔
950
                        if _, err := fetchAccount(ar, opts.SystemAccount); err == nil {
244✔
951
                                a, _ = s.lookupAccount(opts.SystemAccount)
88✔
952
                        }
88✔
953
                        s.mu.Lock()
156✔
954
                        if a == nil {
224✔
955
                                sac := NewAccount(opts.SystemAccount)
68✔
956
                                sac.Issuer = opts.TrustedOperators[0].Issuer
68✔
957
                                sac.signingKeys = map[string]jwt.Scope{}
68✔
958
                                sac.signingKeys[opts.SystemAccount] = nil
68✔
959
                                s.registerAccountNoLock(sac)
68✔
960
                        }
68✔
961
                }
962
        }
963

964
        // For tracking accounts
965
        if _, err := s.configureAccounts(false); err != nil {
6,916✔
966
                return nil, err
×
967
        }
×
968

969
        // Used to setup Authorization.
970
        s.configureAuthorization()
6,916✔
971

6,916✔
972
        // Start signal handler
6,916✔
973
        s.handleSignals()
6,916✔
974

6,916✔
975
        return s, nil
6,916✔
976
}
977

978
// Initializes route structures based on pooling and/or per-account routes.
979
//
980
// Server lock is held on entry
981
func (s *Server) initRouteStructures(opts *Options) {
6,917✔
982
        s.routes = make(map[string][]*client)
6,917✔
983
        if ps := opts.Cluster.PoolSize; ps > 0 {
10,993✔
984
                s.routesPoolSize = ps
4,076✔
985
        } else {
6,917✔
986
                s.routesPoolSize = 1
2,841✔
987
        }
2,841✔
988
        // If we have per-account routes, we create accRoutes and initialize it
989
        // with nil values. The presence of an account as the key will allow us
990
        // to know if a given account is supposed to have dedicated routes.
991
        if l := len(opts.Cluster.PinnedAccounts); l > 0 {
10,689✔
992
                s.accRoutes = make(map[string]map[string]*client, l)
3,772✔
993
                for _, acc := range opts.Cluster.PinnedAccounts {
7,571✔
994
                        s.accRoutes[acc] = make(map[string]*client)
3,799✔
995
                }
3,799✔
996
        }
997
}
998

999
func (s *Server) logRejectedTLSConns() {
1✔
1000
        defer s.grWG.Done()
1✔
1001
        t := time.NewTicker(time.Second)
1✔
1002
        defer t.Stop()
1✔
1003
        for {
3✔
1004
                select {
2✔
1005
                case <-s.quitCh:
1✔
1006
                        return
1✔
1007
                case <-t.C:
1✔
1008
                        blocked := s.connRateCounter.countBlocked()
1✔
1009
                        if blocked > 0 {
2✔
1010
                                s.Warnf("Rejected %d connections due to TLS rate limiting", blocked)
1✔
1011
                        }
1✔
1012
                }
1013
        }
1014
}
1015

1016
// clusterName returns our cluster name which could be dynamic.
1017
func (s *Server) ClusterName() string {
114,872✔
1018
        s.mu.RLock()
114,872✔
1019
        cn := s.info.Cluster
114,872✔
1020
        s.mu.RUnlock()
114,872✔
1021
        return cn
114,872✔
1022
}
114,872✔
1023

1024
// Grabs cluster name with cluster name specific lock.
1025
func (s *Server) cachedClusterName() string {
107,918✔
1026
        s.cnMu.RLock()
107,918✔
1027
        cn := s.cn
107,918✔
1028
        s.cnMu.RUnlock()
107,918✔
1029
        return cn
107,918✔
1030
}
107,918✔
1031

1032
// setClusterName will update the cluster name for this server.
1033
func (s *Server) setClusterName(name string) {
23✔
1034
        s.mu.Lock()
23✔
1035
        var resetCh chan struct{}
23✔
1036
        if s.sys != nil && s.info.Cluster != name {
42✔
1037
                // can't hold the lock as go routine reading it may be waiting for lock as well
19✔
1038
                resetCh = s.sys.resetCh
19✔
1039
        }
19✔
1040
        s.info.Cluster = name
23✔
1041
        s.routeInfo.Cluster = name
23✔
1042

23✔
1043
        // Need to close solicited leaf nodes. The close has to be done outside of the server lock.
23✔
1044
        var leafs []*client
23✔
1045
        for _, c := range s.leafs {
24✔
1046
                c.mu.Lock()
1✔
1047
                if c.leaf != nil && c.leaf.remote != nil {
2✔
1048
                        leafs = append(leafs, c)
1✔
1049
                }
1✔
1050
                c.mu.Unlock()
1✔
1051
        }
1052
        s.mu.Unlock()
23✔
1053

23✔
1054
        // Also place into mapping cn with cnMu lock.
23✔
1055
        s.cnMu.Lock()
23✔
1056
        s.cn = name
23✔
1057
        s.cnMu.Unlock()
23✔
1058

23✔
1059
        for _, l := range leafs {
24✔
1060
                l.closeConnection(ClusterNameConflict)
1✔
1061
        }
1✔
1062
        if resetCh != nil {
42✔
1063
                resetCh <- struct{}{}
19✔
1064
        }
19✔
1065
        s.Noticef("Cluster name updated to %s", name)
23✔
1066
}
1067

1068
// Return whether the cluster name is dynamic.
1069
func (s *Server) isClusterNameDynamic() bool {
43,386✔
1070
        // We need to lock the whole "Cluster.Name" check and not use s.getOpts()
43,386✔
1071
        // because otherwise this could cause a data race with setting the name in
43,386✔
1072
        // route.go's processRouteConnect().
43,386✔
1073
        s.optsMu.RLock()
43,386✔
1074
        dynamic := s.opts.Cluster.Name == _EMPTY_
43,386✔
1075
        s.optsMu.RUnlock()
43,386✔
1076
        return dynamic
43,386✔
1077
}
43,386✔
1078

1079
// Returns our configured serverName.
1080
func (s *Server) serverName() string {
24,298✔
1081
        return s.getOpts().ServerName
24,298✔
1082
}
24,298✔
1083

1084
// ClientURL returns the URL used to connect clients.
1085
// Helpful in tests and with in-process servers using a random client port (-1).
1086
func (s *Server) ClientURL() string {
6,554✔
1087
        // FIXME(dlc) - should we add in user and pass if defined single?
6,554✔
1088
        opts := s.getOpts()
6,554✔
1089
        var u url.URL
6,554✔
1090
        u.Scheme = "nats"
6,554✔
1091
        if opts.TLSConfig != nil {
6,564✔
1092
                u.Scheme = "tls"
10✔
1093
        }
10✔
1094
        u.Host = net.JoinHostPort(opts.Host, fmt.Sprintf("%d", opts.Port))
6,554✔
1095
        return u.String()
6,554✔
1096
}
1097

1098
// WebsocketURL returns the URL used to connect websocket clients.
1099
// Helpful in tests and with in-process servers using a random websocket port (-1).
1100
func (s *Server) WebsocketURL() string {
×
1101
        opts := s.getOpts()
×
1102
        var u url.URL
×
1103
        u.Scheme = "ws"
×
1104
        if opts.Websocket.TLSConfig != nil {
×
1105
                u.Scheme = "wss"
×
1106
        }
×
1107
        u.Host = net.JoinHostPort(opts.Websocket.Host, fmt.Sprintf("%d", opts.Websocket.Port))
×
1108
        return u.String()
×
1109
}
1110

1111
func validateCluster(o *Options) error {
8,281✔
1112
        if o.Cluster.Name != _EMPTY_ && strings.Contains(o.Cluster.Name, " ") {
8,281✔
1113
                return ErrClusterNameHasSpaces
×
1114
        }
×
1115
        if o.Cluster.Compression.Mode != _EMPTY_ {
13,357✔
1116
                if err := validateAndNormalizeCompressionOption(&o.Cluster.Compression, CompressionS2Fast); err != nil {
5,076✔
1117
                        return err
×
1118
                }
×
1119
        }
1120
        if err := validatePinnedCerts(o.Cluster.TLSPinnedCerts); err != nil {
8,281✔
1121
                return fmt.Errorf("cluster: %v", err)
×
1122
        }
×
1123
        // Check that cluster name if defined matches any gateway name.
1124
        // Note that we have already verified that the gateway name does not have spaces.
1125
        if o.Gateway.Name != _EMPTY_ && o.Gateway.Name != o.Cluster.Name {
8,644✔
1126
                if o.Cluster.Name != _EMPTY_ {
364✔
1127
                        return ErrClusterNameConfigConflict
1✔
1128
                }
1✔
1129
                // Set this here so we do not consider it dynamic.
1130
                o.Cluster.Name = o.Gateway.Name
362✔
1131
        }
1132
        if l := len(o.Cluster.PinnedAccounts); l > 0 {
12,144✔
1133
                if o.Cluster.PoolSize < 0 {
3,865✔
1134
                        return fmt.Errorf("pool_size cannot be negative if pinned accounts are specified")
1✔
1135
                }
1✔
1136
                m := make(map[string]struct{}, l)
3,863✔
1137
                for _, a := range o.Cluster.PinnedAccounts {
7,791✔
1138
                        if _, exists := m[a]; exists {
3,928✔
1139
                                return fmt.Errorf("found duplicate account name %q in pinned accounts list %q", a, o.Cluster.PinnedAccounts)
×
1140
                        }
×
1141
                        m[a] = struct{}{}
3,928✔
1142
                }
1143
        }
1144
        return nil
8,279✔
1145
}
1146

1147
func validatePinnedCerts(pinned PinnedCertSet) error {
18,879✔
1148
        re := regexp.MustCompile("^[a-f0-9]{64}$")
18,879✔
1149
        for certId := range pinned {
18,885✔
1150
                entry := strings.ToLower(certId)
6✔
1151
                if !re.MatchString(entry) {
6✔
1152
                        return fmt.Errorf("error parsing 'pinned_certs' key %s does not look like lower case hex-encoded sha256 of DER encoded SubjectPublicKeyInfo", entry)
×
1153
                }
×
1154
        }
1155
        return nil
18,879✔
1156
}
1157

1158
func validateOptions(o *Options) error {
8,329✔
1159
        if o.LameDuckDuration > 0 && o.LameDuckGracePeriod >= o.LameDuckDuration {
8,329✔
1160
                return fmt.Errorf("lame duck grace period (%v) should be strictly lower than lame duck duration (%v)",
×
1161
                        o.LameDuckGracePeriod, o.LameDuckDuration)
×
1162
        }
×
1163
        if int64(o.MaxPayload) > o.MaxPending {
8,331✔
1164
                return fmt.Errorf("max_payload (%v) cannot be higher than max_pending (%v)",
2✔
1165
                        o.MaxPayload, o.MaxPending)
2✔
1166
        }
2✔
1167
        if o.ServerName != _EMPTY_ && strings.Contains(o.ServerName, " ") {
8,327✔
1168
                return errors.New("server name cannot contain spaces")
×
1169
        }
×
1170
        // Check that the trust configuration is correct.
1171
        if err := validateTrustedOperators(o); err != nil {
8,340✔
1172
                return err
13✔
1173
        }
13✔
1174
        // Check on leaf nodes which will require a system
1175
        // account when gateways are also configured.
1176
        if err := validateLeafNode(o); err != nil {
8,341✔
1177
                return err
27✔
1178
        }
27✔
1179
        // Check that authentication is properly configured.
1180
        if err := validateAuth(o); err != nil {
8,292✔
1181
                return err
5✔
1182
        }
5✔
1183
        // Check that proxies is properly configured.
1184
        if err := validateProxies(o); err != nil {
8,283✔
1185
                return err
1✔
1186
        }
1✔
1187
        // Check that gateway is properly configured. Returns no error
1188
        // if there is no gateway defined.
1189
        if err := validateGatewayOptions(o); err != nil {
8,281✔
1190
                return err
×
1191
        }
×
1192
        // Check that cluster name if defined matches any gateway name.
1193
        if err := validateCluster(o); err != nil {
8,283✔
1194
                return err
2✔
1195
        }
2✔
1196
        if err := validateMQTTOptions(o); err != nil {
8,284✔
1197
                return err
5✔
1198
        }
5✔
1199
        if err := validateJetStreamOptions(o); err != nil {
8,286✔
1200
                return err
12✔
1201
        }
12✔
1202
        // Finally check websocket options.
1203
        return validateWebsocketOptions(o)
8,262✔
1204
}
1205

1206
func (s *Server) getOpts() *Options {
3,124,550✔
1207
        s.optsMu.RLock()
3,124,550✔
1208
        opts := s.opts
3,124,550✔
1209
        s.optsMu.RUnlock()
3,124,550✔
1210
        return opts
3,124,550✔
1211
}
3,124,550✔
1212

1213
func (s *Server) setOpts(opts *Options) {
1,343✔
1214
        s.optsMu.Lock()
1,343✔
1215
        s.opts = opts
1,343✔
1216
        s.optsMu.Unlock()
1,343✔
1217
}
1,343✔
1218

1219
func (s *Server) globalAccount() *Account {
11,236✔
1220
        s.mu.RLock()
11,236✔
1221
        gacc := s.gacc
11,236✔
1222
        s.mu.RUnlock()
11,236✔
1223
        return gacc
11,236✔
1224
}
11,236✔
1225

1226
// Used to setup or update Accounts.
1227
// Returns a map that indicates which accounts have had their stream imports
1228
// changed (in case of an update in configuration reload).
1229
// Lock is held upon entry, but will be released/reacquired in this function.
1230
func (s *Server) configureAccounts(reloading bool) (map[string]struct{}, error) {
8,248✔
1231
        awcsti := make(map[string]struct{})
8,248✔
1232

8,248✔
1233
        // Create the global account.
8,248✔
1234
        if s.gacc == nil {
15,164✔
1235
                s.gacc = NewAccount(globalAccountName)
6,916✔
1236
                s.registerAccountNoLock(s.gacc)
6,916✔
1237
        }
6,916✔
1238

1239
        opts := s.getOpts()
8,248✔
1240

8,248✔
1241
        // We need to track service imports since we can not swap them out (unsub and re-sub)
8,248✔
1242
        // until the proper server struct accounts have been swapped in properly. Doing it in
8,248✔
1243
        // place could lead to data loss or server panic since account under new si has no real
8,248✔
1244
        // account and hence no sublist, so will panic on inbound message.
8,248✔
1245
        siMap := make(map[*Account][][]byte)
8,248✔
1246

8,248✔
1247
        // Check opts and walk through them. We need to copy them here
8,248✔
1248
        // so that we do not keep a real one sitting in the options.
8,248✔
1249
        for _, acc := range opts.Accounts {
17,232✔
1250
                var a *Account
8,984✔
1251
                create := true
8,984✔
1252
                // For the global account, we want to skip the reload process
8,984✔
1253
                // and fall back into the "create" case which will in that
8,984✔
1254
                // case really be just an update (shallowCopy will make sure
8,984✔
1255
                // that mappings are copied over).
8,984✔
1256
                if reloading && acc.Name != globalAccountName {
12,370✔
1257
                        if ai, ok := s.accounts.Load(acc.Name); ok {
6,772✔
1258
                                a = ai.(*Account)
3,386✔
1259
                                // Before updating the account, check if stream imports have changed.
3,386✔
1260
                                if !a.checkStreamImportsEqual(acc) {
3,393✔
1261
                                        awcsti[acc.Name] = struct{}{}
7✔
1262
                                }
7✔
1263
                                a.mu.Lock()
3,386✔
1264
                                // Collect the sids for the service imports since we are going to
3,386✔
1265
                                // replace with new ones.
3,386✔
1266
                                var sids [][]byte
3,386✔
1267
                                for _, sis := range a.imports.services {
14,698✔
1268
                                        for _, si := range sis {
22,624✔
1269
                                                if si.sid != nil {
22,624✔
1270
                                                        sids = append(sids, si.sid)
11,312✔
1271
                                                }
11,312✔
1272
                                        }
1273
                                }
1274
                                // Setup to process later if needed.
1275
                                if len(sids) > 0 || len(acc.imports.services) > 0 {
6,764✔
1276
                                        siMap[a] = sids
3,378✔
1277
                                }
3,378✔
1278

1279
                                // Now reset all export/imports fields since they are going to be
1280
                                // filled in shallowCopy()
1281
                                a.imports.streams, a.imports.services = nil, nil
3,386✔
1282
                                a.exports.streams, a.exports.services = nil, nil
3,386✔
1283
                                // We call shallowCopy from the account `acc` (the one in Options)
3,386✔
1284
                                // and pass `a` (our existing account) to get it updated.
3,386✔
1285
                                acc.shallowCopy(a)
3,386✔
1286
                                a.mu.Unlock()
3,386✔
1287
                                create = false
3,386✔
1288
                        }
1289
                }
1290
                // Track old mappings if global account.
1291
                var oldGMappings []*mapping
8,984✔
1292
                if create {
14,582✔
1293
                        if acc.Name == globalAccountName {
5,614✔
1294
                                a = s.gacc
16✔
1295
                                a.mu.Lock()
16✔
1296
                                oldGMappings = append(oldGMappings, a.mappings...)
16✔
1297
                                a.mu.Unlock()
16✔
1298
                        } else {
5,598✔
1299
                                a = NewAccount(acc.Name)
5,582✔
1300
                        }
5,582✔
1301
                        // Locking matters in the case of an update of the global account
1302
                        a.mu.Lock()
5,598✔
1303
                        acc.shallowCopy(a)
5,598✔
1304
                        a.mu.Unlock()
5,598✔
1305
                        // Will be a no-op in case of the global account since it is already registered.
5,598✔
1306
                        s.registerAccountNoLock(a)
5,598✔
1307
                }
1308

1309
                // The `acc` account is stored in options, not in the server, and these can be cleared.
1310
                acc.sl, acc.clients, acc.mappings = nil, nil, nil
8,984✔
1311

8,984✔
1312
                // Check here if we have been reloaded and we have a global account with mappings that may have changed.
8,984✔
1313
                // If we have leafnodes they need to be updated.
8,984✔
1314
                if reloading && a == s.gacc {
8,989✔
1315
                        a.mu.Lock()
5✔
1316
                        mappings := make(map[string]*mapping)
5✔
1317
                        if len(a.mappings) > 0 && a.nleafs > 0 {
7✔
1318
                                for _, em := range a.mappings {
4✔
1319
                                        mappings[em.src] = em
2✔
1320
                                }
2✔
1321
                        }
1322
                        a.mu.Unlock()
5✔
1323
                        if len(mappings) > 0 || len(oldGMappings) > 0 {
10✔
1324
                                a.lmu.RLock()
5✔
1325
                                for _, lc := range a.lleafs {
7✔
1326
                                        for _, em := range mappings {
4✔
1327
                                                lc.forceAddToSmap(em.src)
2✔
1328
                                        }
2✔
1329
                                        // Remove any old ones if needed.
1330
                                        for _, em := range oldGMappings {
4✔
1331
                                                // Only remove if not in the new ones.
2✔
1332
                                                if _, ok := mappings[em.src]; !ok {
3✔
1333
                                                        lc.forceRemoveFromSmap(em.src)
1✔
1334
                                                }
1✔
1335
                                        }
1336
                                }
1337
                                a.lmu.RUnlock()
5✔
1338
                        }
1339
                }
1340

1341
                // If we see an account defined using $SYS we will make sure that is set as system account.
1342
                if acc.Name == DEFAULT_SYSTEM_ACCOUNT && opts.SystemAccount == _EMPTY_ {
12,665✔
1343
                        opts.SystemAccount = DEFAULT_SYSTEM_ACCOUNT
3,681✔
1344
                }
3,681✔
1345
        }
1346

1347
        // Now that we have this we need to remap any referenced accounts in
1348
        // import or export maps to the new ones.
1349
        swapApproved := func(ea *exportAuth) {
10,686✔
1350
                for sub, a := range ea.approved {
2,488✔
1351
                        var acc *Account
50✔
1352
                        if v, ok := s.accounts.Load(a.Name); ok {
100✔
1353
                                acc = v.(*Account)
50✔
1354
                        }
50✔
1355
                        ea.approved[sub] = acc
50✔
1356
                }
1357
        }
1358
        var numAccounts int
8,248✔
1359
        s.accounts.Range(func(k, v any) bool {
25,852✔
1360
                numAccounts++
17,604✔
1361
                acc := v.(*Account)
17,604✔
1362
                acc.mu.Lock()
17,604✔
1363
                // Exports
17,604✔
1364
                for _, se := range acc.exports.streams {
17,752✔
1365
                        if se != nil {
161✔
1366
                                swapApproved(&se.exportAuth)
13✔
1367
                        }
13✔
1368
                }
1369
                for _, se := range acc.exports.services {
20,029✔
1370
                        if se != nil {
4,850✔
1371
                                // Swap over the bound account for service exports.
2,425✔
1372
                                if se.acc != nil {
4,850✔
1373
                                        if v, ok := s.accounts.Load(se.acc.Name); ok {
4,850✔
1374
                                                se.acc = v.(*Account)
2,425✔
1375
                                        }
2,425✔
1376
                                }
1377
                                swapApproved(&se.exportAuth)
2,425✔
1378
                        }
1379
                }
1380
                // Imports
1381
                for _, si := range acc.imports.streams {
17,815✔
1382
                        if v, ok := s.accounts.Load(si.acc.Name); ok {
422✔
1383
                                si.acc = v.(*Account)
211✔
1384
                        }
211✔
1385
                }
1386
                for _, sis := range acc.imports.services {
24,483✔
1387
                        for _, si := range sis {
13,760✔
1388
                                if v, ok := s.accounts.Load(si.acc.Name); ok {
13,762✔
1389
                                        si.acc = v.(*Account)
6,881✔
1390

6,881✔
1391
                                        // It is possible to allow for latency tracking inside your
6,881✔
1392
                                        // own account, so lock only when not the same account.
6,881✔
1393
                                        if si.acc == acc {
7,114✔
1394
                                                si.se = si.acc.getServiceExport(si.to)
233✔
1395
                                                continue
233✔
1396
                                        }
1397
                                        si.acc.mu.RLock()
6,648✔
1398
                                        si.se = si.acc.getServiceExport(si.to)
6,648✔
1399
                                        si.acc.mu.RUnlock()
6,648✔
1400
                                }
1401
                        }
1402
                }
1403
                // Make sure the subs are running, but only if not reloading.
1404
                if len(acc.imports.services) > 0 && acc.ic == nil && !reloading {
17,900✔
1405
                        acc.ic = s.createInternalAccountClient()
296✔
1406
                        acc.ic.acc = acc
296✔
1407
                        // Need to release locks to invoke this function.
296✔
1408
                        acc.mu.Unlock()
296✔
1409
                        s.mu.Unlock()
296✔
1410
                        acc.addAllServiceImportSubs()
296✔
1411
                        s.mu.Lock()
296✔
1412
                        acc.mu.Lock()
296✔
1413
                }
296✔
1414
                acc.updated = time.Now()
17,604✔
1415
                acc.mu.Unlock()
17,604✔
1416
                return true
17,604✔
1417
        })
1418

1419
        // Check if we need to process service imports pending from above.
1420
        // This processing needs to be after we swap in the real accounts above.
1421
        for acc, sids := range siMap {
11,626✔
1422
                c := acc.ic
3,378✔
1423
                for _, sid := range sids {
14,690✔
1424
                        c.processUnsub(sid)
11,312✔
1425
                }
11,312✔
1426
                acc.addAllServiceImportSubs()
3,378✔
1427
                s.mu.Unlock()
3,378✔
1428
                s.registerSystemImports(acc)
3,378✔
1429
                s.mu.Lock()
3,378✔
1430
        }
1431

1432
        // Set the system account if it was configured.
1433
        // Otherwise create a default one.
1434
        if opts.SystemAccount != _EMPTY_ {
13,136✔
1435
                // Lock may be acquired in lookupAccount, so release to call lookupAccount.
4,888✔
1436
                s.mu.Unlock()
4,888✔
1437
                acc, err := s.lookupAccount(opts.SystemAccount)
4,888✔
1438
                s.mu.Lock()
4,888✔
1439
                if err == nil && s.sys != nil && acc != s.sys.account {
4,888✔
1440
                        // sys.account.clients (including internal client)/respmap/etc... are transferred separately
×
1441
                        s.sys.account = acc
×
1442
                        s.sysAcc.Store(acc)
×
1443
                }
×
1444
                if err != nil {
4,888✔
1445
                        return awcsti, fmt.Errorf("error resolving system account: %v", err)
×
1446
                }
×
1447

1448
                // If we have defined a system account here check to see if its just us and the $G account.
1449
                // We would do this to add user/pass to the system account. If this is the case add in
1450
                // no-auth-user for $G.
1451
                // Only do this if non-operator mode and we did not have an authorization block defined.
1452
                if len(opts.TrustedOperators) == 0 && numAccounts == 2 && opts.NoAuthUser == _EMPTY_ && !opts.authBlockDefined {
7,778✔
1453
                        // If we come here from config reload, let's not recreate the fake user name otherwise
2,890✔
1454
                        // it will cause currently clients to be disconnected.
2,890✔
1455
                        uname := s.sysAccOnlyNoAuthUser
2,890✔
1456
                        if uname == _EMPTY_ {
5,773✔
1457
                                // Create a unique name so we do not collide.
2,883✔
1458
                                var b [8]byte
2,883✔
1459
                                rn := rand.Int63()
2,883✔
1460
                                for i, l := 0, rn; i < len(b); i++ {
25,947✔
1461
                                        b[i] = digits[l%base]
23,064✔
1462
                                        l /= base
23,064✔
1463
                                }
23,064✔
1464
                                uname = fmt.Sprintf("nats-%s", b[:])
2,883✔
1465
                                s.sysAccOnlyNoAuthUser = uname
2,883✔
1466
                        }
1467
                        opts.Users = append(opts.Users, &User{Username: uname, Password: uname[6:], Account: s.gacc})
2,890✔
1468
                        opts.NoAuthUser = uname
2,890✔
1469
                }
1470
        }
1471

1472
        // Add any required exports from system account.
1473
        if s.sys != nil {
9,555✔
1474
                sysAcc := s.sys.account
1,307✔
1475
                s.mu.Unlock()
1,307✔
1476
                s.addSystemAccountExports(sysAcc)
1,307✔
1477
                s.mu.Lock()
1,307✔
1478
        }
1,307✔
1479

1480
        return awcsti, nil
8,248✔
1481
}
1482

1483
// Setup the account resolver. For memory resolver, make sure the JWTs are
1484
// properly formed but do not enforce expiration etc.
1485
// Lock is held on entry, but may be released/reacquired during this call.
1486
func (s *Server) configureResolver() error {
6,929✔
1487
        opts := s.getOpts()
6,929✔
1488
        s.accResolver = opts.AccountResolver
6,929✔
1489
        if opts.AccountResolver != nil {
7,305✔
1490
                // For URL resolver, set the TLSConfig if specified.
376✔
1491
                if opts.AccountResolverTLSConfig != nil {
380✔
1492
                        if ar, ok := opts.AccountResolver.(*URLAccResolver); ok {
8✔
1493
                                if t, ok := ar.c.Transport.(*http.Transport); ok {
8✔
1494
                                        t.CloseIdleConnections()
4✔
1495
                                        t.TLSClientConfig = opts.AccountResolverTLSConfig.Clone()
4✔
1496
                                }
4✔
1497
                        }
1498
                }
1499
                if len(opts.resolverPreloads) > 0 {
547✔
1500
                        // Lock ordering is account resolver -> server, so we need to release
171✔
1501
                        // the lock and reacquire it when done with account resolver's calls.
171✔
1502
                        ar := s.accResolver
171✔
1503
                        s.mu.Unlock()
171✔
1504
                        defer s.mu.Lock()
171✔
1505
                        if ar.IsReadOnly() {
171✔
1506
                                return fmt.Errorf("resolver preloads only available for writeable resolver types MEM/DIR/CACHE_DIR")
×
1507
                        }
×
1508
                        for k, v := range opts.resolverPreloads {
1,087✔
1509
                                _, err := jwt.DecodeAccountClaims(v)
916✔
1510
                                if err != nil {
916✔
1511
                                        return fmt.Errorf("preload account error for %q: %v", k, err)
×
1512
                                }
×
1513
                                ar.Store(k, v)
916✔
1514
                        }
1515
                }
1516
        }
1517
        return nil
6,929✔
1518
}
1519

1520
// This will check preloads for validation issues.
1521
func (s *Server) checkResolvePreloads() {
171✔
1522
        opts := s.getOpts()
171✔
1523
        // We can just check the read-only opts versions here, that way we do not need
171✔
1524
        // to grab server lock or access s.accResolver.
171✔
1525
        for k, v := range opts.resolverPreloads {
1,086✔
1526
                claims, err := jwt.DecodeAccountClaims(v)
915✔
1527
                if err != nil {
915✔
1528
                        s.Errorf("Preloaded account [%s] not valid", k)
×
1529
                        continue
×
1530
                }
1531
                // Check if it is expired.
1532
                vr := jwt.CreateValidationResults()
915✔
1533
                claims.Validate(vr)
915✔
1534
                if vr.IsBlocking(true) {
916✔
1535
                        s.Warnf("Account [%s] has validation issues:", k)
1✔
1536
                        for _, v := range vr.Issues {
2✔
1537
                                s.Warnf("  - %s", v.Description)
1✔
1538
                        }
1✔
1539
                }
1540
        }
1541
}
1542

1543
// Determines if we are in pre NATS 2.0 setup with no accounts.
1544
func (s *Server) globalAccountOnly() bool {
4,224✔
1545
        var hasOthers bool
4,224✔
1546

4,224✔
1547
        if s.trustedKeys != nil {
4,366✔
1548
                return false
142✔
1549
        }
142✔
1550

1551
        s.mu.RLock()
4,082✔
1552
        s.accounts.Range(func(k, v any) bool {
12,156✔
1553
                acc := v.(*Account)
8,074✔
1554
                // Ignore global and system
8,074✔
1555
                if acc == s.gacc || (s.sys != nil && acc == s.sys.account) {
15,762✔
1556
                        return true
7,688✔
1557
                }
7,688✔
1558
                hasOthers = true
386✔
1559
                return false
386✔
1560
        })
1561
        s.mu.RUnlock()
4,082✔
1562

4,082✔
1563
        return !hasOthers
4,082✔
1564
}
1565

1566
// Determines if this server is in standalone mode, meaning no routes or gateways.
1567
func (s *Server) standAloneMode() bool {
26,469✔
1568
        opts := s.getOpts()
26,469✔
1569
        return opts.Cluster.Port == 0 && opts.Gateway.Port == 0
26,469✔
1570
}
26,469✔
1571

1572
func (s *Server) configuredRoutes() int {
3,247✔
1573
        return len(s.getOpts().Routes)
3,247✔
1574
}
3,247✔
1575

1576
// activePeers is used in bootstrapping raft groups like the JetStream meta controller.
1577
func (s *Server) ActivePeers() (peers []string) {
3,054✔
1578
        s.nodeToInfo.Range(func(k, v any) bool {
6,512✔
1579
                si := v.(nodeInfo)
3,458✔
1580
                if !si.offline {
6,916✔
1581
                        peers = append(peers, k.(string))
3,458✔
1582
                }
3,458✔
1583
                return true
3,458✔
1584
        })
1585
        return peers
3,054✔
1586
}
1587

1588
// isTrustedIssuer will check that the issuer is a trusted public key.
1589
// This is used to make sure an account was signed by a trusted operator.
1590
func (s *Server) isTrustedIssuer(issuer string) bool {
10,682✔
1591
        s.mu.RLock()
10,682✔
1592
        defer s.mu.RUnlock()
10,682✔
1593
        // If we are not running in trusted mode and there is no issuer, that is ok.
10,682✔
1594
        if s.trustedKeys == nil && issuer == _EMPTY_ {
16,382✔
1595
                return true
5,700✔
1596
        }
5,700✔
1597
        for _, tk := range s.trustedKeys {
10,174✔
1598
                if tk == issuer {
10,096✔
1599
                        return true
4,904✔
1600
                }
4,904✔
1601
        }
1602
        return false
78✔
1603
}
1604

1605
// processTrustedKeys will process binary stamped and
1606
// options-based trusted nkeys. Returns success.
1607
func (s *Server) processTrustedKeys() bool {
6,919✔
1608
        s.strictSigningKeyUsage = map[string]struct{}{}
6,919✔
1609
        opts := s.getOpts()
6,919✔
1610
        if trustedKeys != _EMPTY_ && !s.initStampedTrustedKeys() {
6,919✔
1611
                return false
×
1612
        } else if opts.TrustedKeys != nil {
7,331✔
1613
                for _, key := range opts.TrustedKeys {
1,710✔
1614
                        if !nkeys.IsValidPublicOperatorKey(key) {
1,298✔
1615
                                return false
×
1616
                        }
×
1617
                }
1618
                s.trustedKeys = append([]string(nil), opts.TrustedKeys...)
412✔
1619
                for _, claim := range opts.TrustedOperators {
718✔
1620
                        if !claim.StrictSigningKeyUsage {
610✔
1621
                                continue
304✔
1622
                        }
1623
                        for _, key := range claim.SigningKeys {
4✔
1624
                                s.strictSigningKeyUsage[key] = struct{}{}
2✔
1625
                        }
2✔
1626
                }
1627
        }
1628
        return true
6,919✔
1629
}
1630

1631
// checkTrustedKeyString will check that the string is a valid array
1632
// of public operator nkeys.
1633
func checkTrustedKeyString(keys string) []string {
×
1634
        tks := strings.Fields(keys)
×
1635
        if len(tks) == 0 {
×
1636
                return nil
×
1637
        }
×
1638
        // Walk all the keys and make sure they are valid.
1639
        for _, key := range tks {
×
1640
                if !nkeys.IsValidPublicOperatorKey(key) {
×
1641
                        return nil
×
1642
                }
×
1643
        }
1644
        return tks
×
1645
}
1646

1647
// initStampedTrustedKeys will check the stamped trusted keys
1648
// and will set the server field 'trustedKeys'. Returns whether
1649
// it succeeded or not.
1650
func (s *Server) initStampedTrustedKeys() bool {
×
1651
        // Check to see if we have an override in options, which will cause us to fail.
×
1652
        if len(s.getOpts().TrustedKeys) > 0 {
×
1653
                return false
×
1654
        }
×
1655
        tks := checkTrustedKeyString(trustedKeys)
×
1656
        if len(tks) == 0 {
×
1657
                return false
×
1658
        }
×
1659
        s.trustedKeys = tks
×
1660
        return true
×
1661
}
1662

1663
// PrintAndDie writes msg followed by a newline to standard error and exits
// the process with status 1. It is exported for access in other packages.
func PrintAndDie(msg string) {
	stderr := os.Stderr
	fmt.Fprintln(stderr, msg)
	os.Exit(1)
}
×
1668

1669
// PrintServerAndExit will print our version and exit.
1670
func PrintServerAndExit() {
×
1671
        fmt.Printf("nats-server: v%s\n", VERSION)
×
1672
        os.Exit(0)
×
1673
}
×
1674

1675
// ProcessCommandLineArgs inspects the first non-flag argument of cmd, if any,
// and treats it as a sub command (matched case-insensitively):
//   - "version" reports showVersion as true,
//   - "help" reports showHelp as true,
//   - anything else results in an "unrecognized command" error.
//
// When there is no non-flag argument, both booleans are false and err is nil.
func ProcessCommandLineArgs(cmd *flag.FlagSet) (showVersion bool, showHelp bool, err error) {
	if cmd.NArg() == 0 {
		return false, false, nil
	}

	sub := cmd.Arg(0)
	switch strings.ToLower(sub) {
	case "version":
		return true, false, nil
	case "help":
		return false, true, nil
	}
	return false, false, fmt.Errorf("unrecognized command: %q", sub)
}
1693

1694
// Public version.
1695
func (s *Server) Running() bool {
2,284✔
1696
        return s.isRunning()
2,284✔
1697
}
2,284✔
1698

1699
// Protected check on running state
1700
func (s *Server) isRunning() bool {
316,668,172✔
1701
        return s.running.Load()
316,668,172✔
1702
}
316,668,172✔
1703

1704
func (s *Server) logPid() error {
4✔
1705
        pidStr := strconv.Itoa(os.Getpid())
4✔
1706
        return os.WriteFile(s.getOpts().PidFile, []byte(pidStr), defaultFilePerms)
4✔
1707
}
4✔
1708

1709
// numReservedAccounts will return the number of reserved accounts configured in the server.
1710
// Currently this is 1, one for the global default account.
1711
func (s *Server) numReservedAccounts() int {
2✔
1712
        return 1
2✔
1713
}
2✔
1714

1715
// NumActiveAccounts reports number of active accounts on this server.
1716
func (s *Server) NumActiveAccounts() int32 {
×
1717
        return atomic.LoadInt32(&s.activeAccounts)
×
1718
}
×
1719

1720
// incActiveAccounts() just adds one under lock.
1721
func (s *Server) incActiveAccounts() {
21,842✔
1722
        atomic.AddInt32(&s.activeAccounts, 1)
21,842✔
1723
}
21,842✔
1724

1725
// decActiveAccounts() just subtracts one under lock.
1726
func (s *Server) decActiveAccounts() {
11,844✔
1727
        atomic.AddInt32(&s.activeAccounts, -1)
11,844✔
1728
}
11,844✔
1729

1730
// This should be used for testing only. Will be slow since we have to
1731
// range over all accounts in the sync.Map to count.
1732
func (s *Server) numAccounts() int {
7✔
1733
        count := 0
7✔
1734
        s.mu.RLock()
7✔
1735
        s.accounts.Range(func(k, v any) bool {
24✔
1736
                count++
17✔
1737
                return true
17✔
1738
        })
17✔
1739
        s.mu.RUnlock()
7✔
1740
        return count
7✔
1741
}
1742

1743
// NumLoadedAccounts returns the number of loaded accounts.
1744
func (s *Server) NumLoadedAccounts() int {
4✔
1745
        return s.numAccounts()
4✔
1746
}
4✔
1747

1748
// LookupOrRegisterAccount will return the given account if known or create a new entry.
1749
func (s *Server) LookupOrRegisterAccount(name string) (account *Account, isNew bool) {
2,264✔
1750
        s.mu.Lock()
2,264✔
1751
        defer s.mu.Unlock()
2,264✔
1752
        if v, ok := s.accounts.Load(name); ok {
2,264✔
1753
                return v.(*Account), false
×
1754
        }
×
1755
        acc := NewAccount(name)
2,264✔
1756
        s.registerAccountNoLock(acc)
2,264✔
1757
        return acc, true
2,264✔
1758
}
1759

1760
// RegisterAccount will register an account. The account must be new
1761
// or this call will fail.
1762
func (s *Server) RegisterAccount(name string) (*Account, error) {
849✔
1763
        s.mu.Lock()
849✔
1764
        defer s.mu.Unlock()
849✔
1765
        if _, ok := s.accounts.Load(name); ok {
978✔
1766
                return nil, ErrAccountExists
129✔
1767
        }
129✔
1768
        acc := NewAccount(name)
720✔
1769
        s.registerAccountNoLock(acc)
720✔
1770
        return acc, nil
720✔
1771
}
1772

1773
// SetSystemAccount will set the internal system account.
1774
// If root operators are present it will also check validity.
1775
func (s *Server) SetSystemAccount(accName string) error {
6,072✔
1776
        // Lookup from sync.Map first.
6,072✔
1777
        if v, ok := s.accounts.Load(accName); ok {
12,142✔
1778
                return s.setSystemAccount(v.(*Account))
6,070✔
1779
        }
6,070✔
1780

1781
        // If we are here we do not have local knowledge of this account.
1782
        // Do this one by hand to return more useful error.
1783
        ac, jwt, err := s.fetchAccountClaims(accName)
2✔
1784
        if err != nil {
3✔
1785
                return err
1✔
1786
        }
1✔
1787
        acc := s.buildInternalAccount(ac)
1✔
1788
        acc.claimJWT = jwt
1✔
1789
        // Due to race, we need to make sure that we are not
1✔
1790
        // registering twice.
1✔
1791
        if racc := s.registerAccount(acc); racc != nil {
1✔
1792
                return nil
×
1793
        }
×
1794
        return s.setSystemAccount(acc)
1✔
1795
}
1796

1797
// SystemAccount returns the system account if set.
1798
func (s *Server) SystemAccount() *Account {
447,506✔
1799
        return s.sysAcc.Load()
447,506✔
1800
}
447,506✔
1801

1802
// GlobalAccount returns the global account.
1803
// Default clients will use the global account.
1804
func (s *Server) GlobalAccount() *Account {
7,452✔
1805
        s.mu.RLock()
7,452✔
1806
        defer s.mu.RUnlock()
7,452✔
1807
        return s.gacc
7,452✔
1808
}
7,452✔
1809

1810
// SetDefaultSystemAccount will create a default system account if one is not present.
1811
func (s *Server) SetDefaultSystemAccount() error {
2,254✔
1812
        if _, isNew := s.LookupOrRegisterAccount(DEFAULT_SYSTEM_ACCOUNT); !isNew {
2,254✔
1813
                return nil
×
1814
        }
×
1815
        s.Debugf("Created system account: %q", DEFAULT_SYSTEM_ACCOUNT)
2,254✔
1816
        return s.SetSystemAccount(DEFAULT_SYSTEM_ACCOUNT)
2,254✔
1817
}
1818

1819
// Assign a system account. Should only be called once.
1820
// This sets up a server to send and receive messages from
1821
// inside the server itself.
1822
func (s *Server) setSystemAccount(acc *Account) error {
6,091✔
1823
        if acc == nil {
6,091✔
1824
                return ErrMissingAccount
×
1825
        }
×
1826
        // Don't try to fix this here.
1827
        if acc.IsExpired() {
6,091✔
1828
                return ErrAccountExpired
×
1829
        }
×
1830
        // If we are running with trusted keys for an operator
1831
        // make sure we check the account is legit.
1832
        if !s.isTrustedIssuer(acc.Issuer) {
6,162✔
1833
                return ErrAccountValidation
71✔
1834
        }
71✔
1835

1836
        s.mu.Lock()
6,020✔
1837

6,020✔
1838
        if s.sys != nil {
6,022✔
1839
                s.mu.Unlock()
2✔
1840
                return ErrAccountExists
2✔
1841
        }
2✔
1842

1843
        // This is here in an attempt to quiet the race detector and not have to place
1844
        // locks on fast path for inbound messages and checking service imports.
1845
        acc.mu.Lock()
6,018✔
1846
        if acc.imports.services == nil {
12,036✔
1847
                acc.imports.services = make(map[string][]*serviceImport)
6,018✔
1848
        }
6,018✔
1849
        acc.mu.Unlock()
6,018✔
1850

6,018✔
1851
        s.sys = &internal{
6,018✔
1852
                account: acc,
6,018✔
1853
                client:  s.createInternalSystemClient(),
6,018✔
1854
                seq:     1,
6,018✔
1855
                sid:     1,
6,018✔
1856
                servers: make(map[string]*serverUpdate),
6,018✔
1857
                replies: make(map[string]msgHandler),
6,018✔
1858
                sendq:   newIPQueue[*pubMsg](s, "System sendQ"),
6,018✔
1859
                recvq:   newIPQueue[*inSysMsg](s, "System recvQ"),
6,018✔
1860
                recvqp:  newIPQueue[*inSysMsg](s, "System recvQ Pings"),
6,018✔
1861
                resetCh: make(chan struct{}),
6,018✔
1862
                sq:      s.newSendQ(acc),
6,018✔
1863
                statsz:  statsHBInterval,
6,018✔
1864
                orphMax: 5 * eventsHBInterval,
6,018✔
1865
                chkOrph: 3 * eventsHBInterval,
6,018✔
1866
        }
6,018✔
1867
        recvq, recvqp := s.sys.recvq, s.sys.recvqp
6,018✔
1868
        s.sys.wg.Add(1)
6,018✔
1869
        s.mu.Unlock()
6,018✔
1870

6,018✔
1871
        // Store in atomic for fast lookup.
6,018✔
1872
        s.sysAcc.Store(acc)
6,018✔
1873

6,018✔
1874
        // Register with the account.
6,018✔
1875
        s.sys.client.registerWithAccount(acc)
6,018✔
1876

6,018✔
1877
        s.addSystemAccountExports(acc)
6,018✔
1878

6,018✔
1879
        // Start our internal loop to serialize outbound messages.
6,018✔
1880
        // We do our own wg here since we will stop first during shutdown.
6,018✔
1881
        go s.internalSendLoop(&s.sys.wg)
6,018✔
1882

6,018✔
1883
        // Start the internal loop for inbound messages.
6,018✔
1884
        go s.internalReceiveLoop(recvq)
6,018✔
1885
        // Start the internal loop for inbound STATSZ/Ping messages.
6,018✔
1886
        go s.internalReceiveLoop(recvqp)
6,018✔
1887

6,018✔
1888
        // Start up our general subscriptions
6,018✔
1889
        s.initEventTracking()
6,018✔
1890

6,018✔
1891
        // Track for dead remote servers.
6,018✔
1892
        s.wrapChk(s.startRemoteServerSweepTimer)()
6,018✔
1893

6,018✔
1894
        // Send out statsz updates periodically.
6,018✔
1895
        s.wrapChk(s.startStatszTimer)()
6,018✔
1896

6,018✔
1897
        // If we have existing accounts make sure we enable account tracking.
6,018✔
1898
        s.mu.Lock()
6,018✔
1899
        s.accounts.Range(func(k, v any) bool {
20,022✔
1900
                acc := v.(*Account)
14,004✔
1901
                s.enableAccountTracking(acc)
14,004✔
1902
                return true
14,004✔
1903
        })
14,004✔
1904
        s.mu.Unlock()
6,018✔
1905

6,018✔
1906
        return nil
6,018✔
1907
}
1908

1909
// Creates an internal system client.
1910
func (s *Server) createInternalSystemClient() *client {
29,734✔
1911
        return s.createInternalClient(SYSTEM)
29,734✔
1912
}
29,734✔
1913

1914
// Creates an internal jetstream client.
1915
func (s *Server) createInternalJetStreamClient() *client {
44,632✔
1916
        return s.createInternalClient(JETSTREAM)
44,632✔
1917
}
44,632✔
1918

1919
// Creates an internal client for Account.
1920
func (s *Server) createInternalAccountClient() *client {
15,292✔
1921
        return s.createInternalClient(ACCOUNT)
15,292✔
1922
}
15,292✔
1923

1924
// Internal clients. kind should be SYSTEM, JETSTREAM or ACCOUNT
1925
func (s *Server) createInternalClient(kind int) *client {
89,658✔
1926
        if !isInternalClient(kind) {
89,658✔
1927
                return nil
×
1928
        }
×
1929
        now := time.Now()
89,658✔
1930
        c := &client{srv: s, kind: kind, opts: internalOpts, msubs: -1, mpay: -1, start: now, last: now}
89,658✔
1931
        c.initClient()
89,658✔
1932
        c.echo = false
89,658✔
1933
        c.headers = true
89,658✔
1934
        c.flags.set(noReconnect)
89,658✔
1935
        return c
89,658✔
1936
}
1937

1938
// Determine if accounts should track subscriptions for
1939
// efficient propagation.
1940
// Lock should be held on entry.
1941
func (s *Server) shouldTrackSubscriptions() bool {
16,832✔
1942
        opts := s.getOpts()
16,832✔
1943
        return (opts.Cluster.Port != 0 || opts.Gateway.Port != 0)
16,832✔
1944
}
16,832✔
1945

1946
// Invokes registerAccountNoLock under the protection of the server lock.
1947
// That is, server lock is acquired/released in this function.
1948
// See registerAccountNoLock for comment on returned value.
1949
func (s *Server) registerAccount(acc *Account) *Account {
1,405✔
1950
        s.mu.Lock()
1,405✔
1951
        racc := s.registerAccountNoLock(acc)
1,405✔
1952
        s.mu.Unlock()
1,405✔
1953
        return racc
1,405✔
1954
}
1,405✔
1955

1956
// Helper to set the sublist based on preferences.
1957
func (s *Server) setAccountSublist(acc *Account) {
18,237✔
1958
        if acc != nil && acc.sl == nil {
35,192✔
1959
                opts := s.getOpts()
16,955✔
1960
                if opts != nil && opts.NoSublistCache {
16,958✔
1961
                        acc.sl = NewSublistNoCache()
3✔
1962
                } else {
16,955✔
1963
                        acc.sl = NewSublistWithCache()
16,952✔
1964
                }
16,952✔
1965
        }
1966
}
1967

1968
// Registers an account in the server.
1969
// Due to some locking considerations, we may end-up trying
1970
// to register the same account twice. This function will
1971
// then return the already registered account.
1972
// Lock should be held on entry.
1973
func (s *Server) registerAccountNoLock(acc *Account) *Account {
16,971✔
1974
        // We are under the server lock. Lookup from map, if present
16,971✔
1975
        // return existing account.
16,971✔
1976
        if a, _ := s.accounts.Load(acc.Name); a != nil {
17,110✔
1977
                s.tmpAccounts.Delete(acc.Name)
139✔
1978
                return a.(*Account)
139✔
1979
        }
139✔
1980
        // Finish account setup and store.
1981
        s.setAccountSublist(acc)
16,832✔
1982

16,832✔
1983
        acc.mu.Lock()
16,832✔
1984
        s.setRouteInfo(acc)
16,832✔
1985
        if acc.clients == nil {
33,656✔
1986
                acc.clients = make(map[*client]struct{})
16,824✔
1987
        }
16,824✔
1988

1989
        // If we are capable of routing we will track subscription
1990
        // information for efficient interest propagation.
1991
        // During config reload, it is possible that account was
1992
        // already created (global account), so use locking and
1993
        // make sure we create only if needed.
1994
        // TODO(dlc)- Double check that we need this for GWs.
1995
        if acc.rm == nil && s.opts != nil && s.shouldTrackSubscriptions() {
28,009✔
1996
                acc.rm = make(map[string]int32)
11,177✔
1997
                acc.lqws = make(map[string]int32)
11,177✔
1998
        }
11,177✔
1999
        acc.srv = s
16,832✔
2000
        acc.updated = time.Now()
16,832✔
2001
        accName := acc.Name
16,832✔
2002
        jsEnabled := len(acc.jsLimits) > 0
16,832✔
2003
        acc.mu.Unlock()
16,832✔
2004

16,832✔
2005
        if opts := s.getOpts(); opts != nil && len(opts.JsAccDefaultDomain) > 0 {
16,889✔
2006
                if defDomain, ok := opts.JsAccDefaultDomain[accName]; ok {
76✔
2007
                        if jsEnabled {
22✔
2008
                                s.Warnf("Skipping Default Domain %q, set for JetStream enabled account %q", defDomain, accName)
3✔
2009
                        } else if defDomain != _EMPTY_ {
28✔
2010
                                for src, dest := range generateJSMappingTable(defDomain) {
90✔
2011
                                        // flip src and dest around so the domain is inserted
81✔
2012
                                        s.Noticef("Adding default domain mapping %q -> %q to account %q %p", dest, src, accName, acc)
81✔
2013
                                        if err := acc.AddMapping(dest, src); err != nil {
81✔
2014
                                                s.Errorf("Error adding JetStream default domain mapping: %v", err)
×
2015
                                        }
×
2016
                                }
2017
                        }
2018
                }
2019
        }
2020

2021
        s.accounts.Store(acc.Name, acc)
16,832✔
2022
        s.tmpAccounts.Delete(acc.Name)
16,832✔
2023
        s.enableAccountTracking(acc)
16,832✔
2024

16,832✔
2025
        // Can not have server lock here.
16,832✔
2026
        s.mu.Unlock()
16,832✔
2027
        s.registerSystemImports(acc)
16,832✔
2028
        // Starting 2.9.0, we are phasing out the optimistic mode, so change
16,832✔
2029
        // the account to interest-only mode (except if instructed not to do
16,832✔
2030
        // it in some tests).
16,832✔
2031
        if s.gateway.enabled && !gwDoNotForceInterestOnlyMode {
19,861✔
2032
                s.switchAccountToInterestMode(acc.GetName())
3,029✔
2033
        }
3,029✔
2034
        s.mu.Lock()
16,832✔
2035

16,832✔
2036
        return nil
16,832✔
2037
}
2038

2039
// Sets the account's routePoolIdx depending on presence or not of
2040
// pooling or per-account routes. Also updates a map used by
2041
// gateway code to retrieve a route based on some route hash.
2042
//
2043
// Both Server and Account lock held on entry.
2044
func (s *Server) setRouteInfo(acc *Account) {
16,935✔
2045
        // If there is a dedicated route configured for this account
16,935✔
2046
        if _, ok := s.accRoutes[acc.Name]; ok {
20,753✔
2047
                // We want the account name to be in the map, but we don't
3,818✔
2048
                // need a value (we could store empty string)
3,818✔
2049
                s.accRouteByHash.Store(acc.Name, nil)
3,818✔
2050
                // Set the route pool index to -1 so that it is easy when
3,818✔
2051
                // ranging over accounts to exclude those accounts when
3,818✔
2052
                // trying to get accounts for a given pool index.
3,818✔
2053
                acc.routePoolIdx = accDedicatedRoute
3,818✔
2054
        } else {
16,935✔
2055
                // If pool size more than 1, we will compute a hash code and
13,117✔
2056
                // use modulo to assign to an index of the pool slice. For 1
13,117✔
2057
                // and below, all accounts will be bound to the single connection
13,117✔
2058
                // at index 0.
13,117✔
2059
                acc.routePoolIdx = computeRoutePoolIdx(s.routesPoolSize, acc.Name)
13,117✔
2060
                if s.routesPoolSize > 1 {
18,319✔
2061
                        s.accRouteByHash.Store(acc.Name, acc.routePoolIdx)
5,202✔
2062
                }
5,202✔
2063
        }
2064
}
2065

2066
// lookupAccount is a function to return the account structure
2067
// associated with an account name.
2068
// Lock MUST NOT be held upon entry.
2069
func (s *Server) lookupAccount(name string) (*Account, error) {
1,833,944✔
2070
        return s.lookupOrFetchAccount(name, true)
1,833,944✔
2071
}
1,833,944✔
2072

2073
// lookupOrFetchAccount is a function to return the account structure
2074
// associated with an account name.
2075
// Lock MUST NOT be held upon entry.
2076
func (s *Server) lookupOrFetchAccount(name string, fetch bool) (*Account, error) {
2,055,057✔
2077
        var acc *Account
2,055,057✔
2078
        if v, ok := s.accounts.Load(name); ok {
4,107,738✔
2079
                acc = v.(*Account)
2,052,681✔
2080
        }
2,052,681✔
2081
        if acc != nil {
4,107,738✔
2082
                // If we are expired and we have a resolver, then
2,052,681✔
2083
                // return the latest information from the resolver.
2,052,681✔
2084
                if acc.IsExpired() {
2,052,691✔
2085
                        s.Debugf("Requested account [%s] has expired", name)
10✔
2086
                        if s.AccountResolver() != nil && fetch {
20✔
2087
                                if err := s.updateAccount(acc); err != nil {
12✔
2088
                                        // This error could mask expired, so just return expired here.
2✔
2089
                                        return nil, ErrAccountExpired
2✔
2090
                                }
2✔
2091
                        } else {
×
2092
                                return nil, ErrAccountExpired
×
2093
                        }
×
2094
                }
2095
                return acc, nil
2,052,679✔
2096
        }
2097
        // If we have a resolver see if it can fetch the account.
2098
        if s.AccountResolver() == nil || !fetch {
3,397✔
2099
                return nil, ErrMissingAccount
1,021✔
2100
        }
1,021✔
2101
        return s.fetchAccount(name)
1,355✔
2102
}
2103

2104
// LookupAccount is a public function to return the account structure
2105
// associated with name.
2106
func (s *Server) LookupAccount(name string) (*Account, error) {
1,677,777✔
2107
        return s.lookupAccount(name)
1,677,777✔
2108
}
1,677,777✔
2109

2110
// This will fetch new claims and if found update the account with new claims.
2111
// Lock MUST NOT be held upon entry.
2112
func (s *Server) updateAccount(acc *Account) error {
10✔
2113
        acc.mu.RLock()
10✔
2114
        // TODO(dlc) - Make configurable
10✔
2115
        if !acc.incomplete && time.Since(acc.updated) < time.Second {
10✔
2116
                acc.mu.RUnlock()
×
2117
                s.Debugf("Requested account update for [%s] ignored, too soon", acc.Name)
×
2118
                return ErrAccountResolverUpdateTooSoon
×
2119
        }
×
2120
        acc.mu.RUnlock()
10✔
2121
        claimJWT, err := s.fetchRawAccountClaims(acc.Name)
10✔
2122
        if err != nil {
12✔
2123
                return err
2✔
2124
        }
2✔
2125
        return s.updateAccountWithClaimJWT(acc, claimJWT)
8✔
2126
}
2127

2128
// updateAccountWithClaimJWT will check and apply the claim update.
2129
// Lock MUST NOT be held upon entry.
2130
func (s *Server) updateAccountWithClaimJWT(acc *Account, claimJWT string) error {
324✔
2131
        if acc == nil {
324✔
2132
                return ErrMissingAccount
×
2133
        }
×
2134
        acc.mu.RLock()
324✔
2135
        sameClaim := acc.claimJWT != _EMPTY_ && acc.claimJWT == claimJWT && !acc.incomplete
324✔
2136
        acc.mu.RUnlock()
324✔
2137
        if sameClaim {
490✔
2138
                s.Debugf("Requested account update for [%s], same claims detected", acc.Name)
166✔
2139
                return nil
166✔
2140
        }
166✔
2141
        accClaims, _, err := s.verifyAccountClaims(claimJWT)
158✔
2142
        if err == nil && accClaims != nil {
316✔
2143
                acc.mu.Lock()
158✔
2144
                // if an account is updated with a different operator signing key, we want to
158✔
2145
                // show a consistent issuer.
158✔
2146
                acc.Issuer = accClaims.Issuer
158✔
2147
                if acc.Name != accClaims.Subject {
159✔
2148
                        acc.mu.Unlock()
1✔
2149
                        return ErrAccountValidation
1✔
2150
                }
1✔
2151
                acc.mu.Unlock()
157✔
2152
                s.UpdateAccountClaims(acc, accClaims)
157✔
2153
                acc.mu.Lock()
157✔
2154
                // needs to be set after update completed.
157✔
2155
                // This causes concurrent calls to return with sameClaim=true if the change is effective.
157✔
2156
                acc.claimJWT = claimJWT
157✔
2157
                acc.mu.Unlock()
157✔
2158
                return nil
157✔
2159
        }
2160
        return err
×
2161
}
2162

2163
// fetchRawAccountClaims will grab raw account claims iff we have a resolver.
2164
// Lock is NOT held upon entry.
2165
func (s *Server) fetchRawAccountClaims(name string) (string, error) {
1,490✔
2166
        accResolver := s.AccountResolver()
1,490✔
2167
        if accResolver == nil {
1,490✔
2168
                return _EMPTY_, ErrNoAccountResolver
×
2169
        }
×
2170
        // Need to do actual Fetch
2171
        start := time.Now()
1,490✔
2172
        claimJWT, err := fetchAccount(accResolver, name)
1,490✔
2173
        fetchTime := time.Since(start)
1,490✔
2174
        if fetchTime > time.Second {
1,494✔
2175
                s.Warnf("Account [%s] fetch took %v", name, fetchTime)
4✔
2176
        } else {
1,490✔
2177
                s.Debugf("Account [%s] fetch took %v", name, fetchTime)
1,486✔
2178
        }
1,486✔
2179
        if err != nil {
1,535✔
2180
                s.Warnf("Account fetch failed: %v", err)
45✔
2181
                return "", err
45✔
2182
        }
45✔
2183
        return claimJWT, nil
1,445✔
2184
}
2185

2186
// fetchAccountClaims will attempt to fetch new claims if a resolver is present.
2187
// Lock is NOT held upon entry.
2188
func (s *Server) fetchAccountClaims(name string) (*jwt.AccountClaims, string, error) {
1,480✔
2189
        claimJWT, err := s.fetchRawAccountClaims(name)
1,480✔
2190
        if err != nil {
1,523✔
2191
                return nil, _EMPTY_, err
43✔
2192
        }
43✔
2193
        var claim *jwt.AccountClaims
1,437✔
2194
        claim, claimJWT, err = s.verifyAccountClaims(claimJWT)
1,437✔
2195
        if claim != nil && claim.Subject != name {
1,437✔
2196
                return nil, _EMPTY_, ErrAccountValidation
×
2197
        }
×
2198
        return claim, claimJWT, err
1,437✔
2199
}
2200

2201
// verifyAccountClaims will decode and validate any account claims.
2202
func (s *Server) verifyAccountClaims(claimJWT string) (*jwt.AccountClaims, string, error) {
2,150✔
2203
        accClaims, err := jwt.DecodeAccountClaims(claimJWT)
2,150✔
2204
        if err != nil {
2,151✔
2205
                return nil, _EMPTY_, err
1✔
2206
        }
1✔
2207
        if !s.isTrustedIssuer(accClaims.Issuer) {
2,156✔
2208
                return nil, _EMPTY_, ErrAccountValidation
7✔
2209
        }
7✔
2210
        vr := jwt.CreateValidationResults()
2,142✔
2211
        accClaims.Validate(vr)
2,142✔
2212
        if vr.IsBlocking(true) {
2,148✔
2213
                return nil, _EMPTY_, ErrAccountValidation
6✔
2214
        }
6✔
2215
        return accClaims, claimJWT, nil
2,136✔
2216
}
2217

2218
// This will fetch an account from a resolver if defined.
2219
// Lock is NOT held upon entry.
2220
func (s *Server) fetchAccount(name string) (*Account, error) {
1,458✔
2221
        accClaims, claimJWT, err := s.fetchAccountClaims(name)
1,458✔
2222
        if accClaims == nil {
1,512✔
2223
                return nil, err
54✔
2224
        }
54✔
2225
        acc := s.buildInternalAccount(accClaims)
1,404✔
2226
        // Due to possible race, if registerAccount() returns a non
1,404✔
2227
        // nil account, it means the same account was already
1,404✔
2228
        // registered and we should use this one.
1,404✔
2229
        if racc := s.registerAccount(acc); racc != nil {
1,527✔
2230
                // Update with the new claims in case they are new.
123✔
2231
                if err = s.updateAccountWithClaimJWT(racc, claimJWT); err != nil {
123✔
2232
                        return nil, err
×
2233
                }
×
2234
                return racc, nil
123✔
2235
        }
2236
        // The sub imports may have been setup but will not have had their
2237
        // subscriptions properly setup. Do that here.
2238
        var needImportSubs bool
1,281✔
2239

1,281✔
2240
        acc.mu.Lock()
1,281✔
2241
        acc.claimJWT = claimJWT
1,281✔
2242
        if len(acc.imports.services) > 0 {
2,200✔
2243
                if acc.ic == nil {
1,660✔
2244
                        acc.ic = s.createInternalAccountClient()
741✔
2245
                        acc.ic.acc = acc
741✔
2246
                }
741✔
2247
                needImportSubs = true
919✔
2248
        }
2249
        acc.mu.Unlock()
1,281✔
2250

1,281✔
2251
        // Do these outside the lock.
1,281✔
2252
        if needImportSubs {
2,200✔
2253
                acc.addAllServiceImportSubs()
919✔
2254
        }
919✔
2255

2256
        return acc, nil
1,281✔
2257
}
2258

2259
// Start up the server, this will not block.
2260
//
2261
// WaitForShutdown can be used to block and wait for the server to shutdown properly if needed
2262
// after calling s.Shutdown()
2263
func (s *Server) Start() {
6,706✔
2264
        s.Noticef("Starting nats-server")
6,706✔
2265

6,706✔
2266
        gc := gitCommit
6,706✔
2267
        if gc == _EMPTY_ {
13,412✔
2268
                gc = "not set"
6,706✔
2269
        }
6,706✔
2270

2271
        // Snapshot server options.
2272
        opts := s.getOpts()
6,706✔
2273

6,706✔
2274
        // Capture if this server is a leaf that has no cluster, so we don't
6,706✔
2275
        // display the cluster name if that is the case.
6,706✔
2276
        s.mu.RLock()
6,706✔
2277
        leafNoCluster := s.leafNoCluster
6,706✔
2278
        s.mu.RUnlock()
6,706✔
2279

6,706✔
2280
        var clusterName string
6,706✔
2281
        if !leafNoCluster {
12,479✔
2282
                clusterName = s.ClusterName()
5,773✔
2283
        }
5,773✔
2284

2285
        s.Noticef("  Version:  %s", VERSION)
6,706✔
2286
        s.Noticef("  Git:      [%s]", gc)
6,706✔
2287
        s.Debugf("  Go build: %s", s.info.GoVersion)
6,706✔
2288
        if clusterName != _EMPTY_ {
11,492✔
2289
                s.Noticef("  Cluster:  %s", clusterName)
4,786✔
2290
        }
4,786✔
2291
        s.Noticef("  Name:     %s", s.info.Name)
6,706✔
2292
        if opts.JetStream {
10,895✔
2293
                s.Noticef("  Node:     %s", getHash(s.info.Name))
4,189✔
2294
        }
4,189✔
2295
        s.Noticef("  ID:       %s", s.info.ID)
6,706✔
2296

6,706✔
2297
        defer s.Noticef("Server is ready")
6,706✔
2298

6,706✔
2299
        // Check for insecure configurations.
6,706✔
2300
        s.checkAuthforWarnings()
6,706✔
2301

6,706✔
2302
        // Avoid RACE between Start() and Shutdown()
6,706✔
2303
        s.running.Store(true)
6,706✔
2304
        s.mu.Lock()
6,706✔
2305
        // Update leafNodeEnabled in case options have changed post NewServer()
6,706✔
2306
        // and before Start() (we should not be able to allow that, but server has
6,706✔
2307
        // direct reference to user-provided options - at least before a Reload() is
6,706✔
2308
        // performed.
6,706✔
2309
        s.leafNodeEnabled = opts.LeafNode.Port != 0 || len(opts.LeafNode.Remotes) > 0
6,706✔
2310
        s.mu.Unlock()
6,706✔
2311

6,706✔
2312
        s.grMu.Lock()
6,706✔
2313
        s.grRunning = true
6,706✔
2314
        s.grMu.Unlock()
6,706✔
2315

6,706✔
2316
        s.startRateLimitLogExpiration()
6,706✔
2317

6,706✔
2318
        // Pprof http endpoint for the profiler.
6,706✔
2319
        if opts.ProfPort != 0 {
6,707✔
2320
                s.StartProfiler()
1✔
2321
        } else {
6,706✔
2322
                // It's still possible to access this profile via a SYS endpoint, so set
6,705✔
2323
                // this anyway. (Otherwise StartProfiler would have called it.)
6,705✔
2324
                s.setBlockProfileRate(opts.ProfBlockRate)
6,705✔
2325
        }
6,705✔
2326

2327
        if opts.ConfigFile != _EMPTY_ {
11,596✔
2328
                var cd string
4,890✔
2329
                if opts.configDigest != "" {
9,780✔
2330
                        cd = fmt.Sprintf("(%s)", opts.configDigest)
4,890✔
2331
                }
4,890✔
2332
                s.Noticef("Using configuration file: %s %s", opts.ConfigFile, cd)
4,890✔
2333
        }
2334

2335
        hasOperators := len(opts.TrustedOperators) > 0
6,706✔
2336
        if hasOperators {
7,006✔
2337
                s.Noticef("Trusted Operators")
300✔
2338
        }
300✔
2339
        for _, opc := range opts.TrustedOperators {
7,006✔
2340
                s.Noticef("  System  : %q", opc.Audience)
300✔
2341
                s.Noticef("  Operator: %q", opc.Name)
300✔
2342
                s.Noticef("  Issued  : %v", time.Unix(opc.IssuedAt, 0))
300✔
2343
                switch opc.Expires {
300✔
2344
                case 0:
11✔
2345
                        s.Noticef("  Expires : Never")
11✔
2346
                default:
289✔
2347
                        s.Noticef("  Expires : %v", time.Unix(opc.Expires, 0))
289✔
2348
                }
2349
        }
2350
        if hasOperators && opts.SystemAccount == _EMPTY_ {
6,744✔
2351
                s.Warnf("Trusted Operators should utilize a System Account")
38✔
2352
        }
38✔
2353
        if opts.MaxPayload > MAX_PAYLOAD_MAX_SIZE {
6,708✔
2354
                s.Warnf("Maximum payloads over %v are generally discouraged and could lead to poor performance",
2✔
2355
                        friendlyBytes(int64(MAX_PAYLOAD_MAX_SIZE)))
2✔
2356
        }
2✔
2357

2358
        if len(opts.JsAccDefaultDomain) > 0 {
6,725✔
2359
                s.Warnf("The option `default_js_domain` is a temporary backwards compatibility measure and will be removed")
19✔
2360
        }
19✔
2361

2362
        // If we have a memory resolver, check the accounts here for validation exceptions.
2363
        // This allows them to be logged right away vs when they are accessed via a client.
2364
        if hasOperators && len(opts.resolverPreloads) > 0 {
6,868✔
2365
                s.checkResolvePreloads()
162✔
2366
        }
162✔
2367

2368
        // Log the pid to a file.
2369
        if opts.PidFile != _EMPTY_ {
6,708✔
2370
                if err := s.logPid(); err != nil {
2✔
2371
                        s.Fatalf("Could not write pidfile: %v", err)
×
2372
                        return
×
2373
                }
×
2374
        }
2375

2376
        // Setup system account which will start the eventing stack.
2377
        if sa := opts.SystemAccount; sa != _EMPTY_ {
10,518✔
2378
                if err := s.SetSystemAccount(sa); err != nil {
3,812✔
2379
                        s.Fatalf("Can't set system account: %v", err)
×
2380
                        return
×
2381
                }
×
2382
        } else if !opts.NoSystemAccount {
5,147✔
2383
                // We will create a default system account here.
2,253✔
2384
                s.SetDefaultSystemAccount()
2,253✔
2385
        }
2,253✔
2386

2387
        // Start monitoring before enabling other subsystems of the
2388
        // server to be able to monitor during startup.
2389
        if err := s.StartMonitoring(); err != nil {
6,707✔
2390
                s.Fatalf("Can't start monitoring: %v", err)
1✔
2391
                return
1✔
2392
        }
1✔
2393

2394
        // Start up resolver machinery.
2395
        if ar := s.AccountResolver(); ar != nil {
7,063✔
2396
                if err := ar.Start(s); err != nil {
358✔
2397
                        s.Fatalf("Could not start resolver: %v", err)
×
2398
                        return
×
2399
                }
×
2400
                // In operator mode, when the account resolver depends on an external system and
2401
                // the system account is the bootstrapping account, start fetching it.
2402
                if len(opts.TrustedOperators) == 1 && opts.SystemAccount != _EMPTY_ && opts.SystemAccount != DEFAULT_SYSTEM_ACCOUNT {
620✔
2403
                        opts := s.getOpts()
262✔
2404
                        _, isMemResolver := ar.(*MemAccResolver)
262✔
2405
                        if v, ok := s.accounts.Load(opts.SystemAccount); !isMemResolver && ok && v.(*Account).claimJWT == _EMPTY_ {
330✔
2406
                                s.Noticef("Using bootstrapping system account")
68✔
2407
                                s.startGoRoutine(func() {
136✔
2408
                                        defer s.grWG.Done()
68✔
2409
                                        t := time.NewTicker(time.Second)
68✔
2410
                                        defer t.Stop()
68✔
2411
                                        for {
174✔
2412
                                                select {
106✔
2413
                                                case <-s.quitCh:
45✔
2414
                                                        return
45✔
2415
                                                case <-t.C:
61✔
2416
                                                        sacc := s.SystemAccount()
61✔
2417
                                                        if claimJWT, err := fetchAccount(ar, opts.SystemAccount); err != nil {
99✔
2418
                                                                continue
38✔
2419
                                                        } else if err = s.updateAccountWithClaimJWT(sacc, claimJWT); err != nil {
23✔
2420
                                                                continue
×
2421
                                                        }
2422
                                                        s.Noticef("System account fetched and updated")
23✔
2423
                                                        return
23✔
2424
                                                }
2425
                                        }
2426
                                })
2427
                        }
2428
                }
2429
        }
2430

2431
        // Start expiration of mapped GW replies, regardless if
2432
        // this server is configured with gateway or not.
2433
        s.startGWReplyMapExpiration()
6,705✔
2434

6,705✔
2435
        // Check if JetStream has been enabled. This needs to be after
6,705✔
2436
        // the system account setup above. JetStream will create its
6,705✔
2437
        // own system account if one is not present.
6,705✔
2438
        if opts.JetStream {
10,894✔
2439
                // Make sure someone is not trying to enable on the system account.
4,189✔
2440
                if sa := s.SystemAccount(); sa != nil && len(sa.jsLimits) > 0 {
4,189✔
2441
                        s.Fatalf("Not allowed to enable JetStream on the system account")
×
2442
                }
×
2443
                cfg := &JetStreamConfig{
4,189✔
2444
                        StoreDir:     opts.StoreDir,
4,189✔
2445
                        SyncInterval: opts.SyncInterval,
4,189✔
2446
                        SyncAlways:   opts.SyncAlways,
4,189✔
2447
                        Strict:       !opts.NoJetStreamStrict,
4,189✔
2448
                        MaxMemory:    opts.JetStreamMaxMemory,
4,189✔
2449
                        MaxStore:     opts.JetStreamMaxStore,
4,189✔
2450
                        Domain:       opts.JetStreamDomain,
4,189✔
2451
                        CompressOK:   true,
4,189✔
2452
                        UniqueTag:    opts.JetStreamUniqueTag,
4,189✔
2453
                }
4,189✔
2454
                if err := s.EnableJetStream(cfg); err != nil {
4,191✔
2455
                        s.Fatalf("Can't start JetStream: %v", err)
2✔
2456
                        return
2✔
2457
                }
2✔
2458
        } else {
2,516✔
2459
                // Check to see if any configured accounts have JetStream enabled.
2,516✔
2460
                sa, ga := s.SystemAccount(), s.GlobalAccount()
2,516✔
2461
                var hasSys, hasGlobal bool
2,516✔
2462
                var total int
2,516✔
2463

2,516✔
2464
                s.accounts.Range(func(k, v any) bool {
8,255✔
2465
                        total++
5,739✔
2466
                        acc := v.(*Account)
5,739✔
2467
                        if acc == sa {
7,543✔
2468
                                hasSys = true
1,804✔
2469
                        } else if acc == ga {
8,255✔
2470
                                hasGlobal = true
2,516✔
2471
                        }
2,516✔
2472
                        acc.mu.RLock()
5,739✔
2473
                        hasJs := len(acc.jsLimits) > 0
5,739✔
2474
                        acc.mu.RUnlock()
5,739✔
2475
                        if hasJs {
5,764✔
2476
                                s.checkJetStreamExports()
25✔
2477
                                acc.enableAllJetStreamServiceImportsAndMappings()
25✔
2478
                        }
25✔
2479
                        return true
5,739✔
2480
                })
2481
                // If we only have the system account and the global account and we are not standalone,
2482
                // go ahead and enable JS on $G in case we are in simple mixed mode setup.
2483
                if total == 2 && hasSys && hasGlobal && !s.standAloneMode() {
3,063✔
2484
                        ga.mu.Lock()
547✔
2485
                        ga.jsLimits = map[string]JetStreamAccountLimits{
547✔
2486
                                _EMPTY_: dynamicJSAccountLimits,
547✔
2487
                        }
547✔
2488
                        ga.mu.Unlock()
547✔
2489
                        s.checkJetStreamExports()
547✔
2490
                        ga.enableAllJetStreamServiceImportsAndMappings()
547✔
2491
                }
547✔
2492
        }
2493

2494
        // Delayed API response handling. Start regardless of JetStream being
2495
        // currently configured or not (since it can be enabled/disabled with
2496
        // configuration reload).
2497
        s.startGoRoutine(s.delayedAPIResponder)
6,703✔
2498

6,703✔
2499
        // Start OCSP Stapling monitoring for TLS certificates if enabled. Hook TLS handshake for
6,703✔
2500
        // OCSP check on peers (LEAF and CLIENT kind) if enabled.
6,703✔
2501
        s.startOCSPMonitoring()
6,703✔
2502

6,703✔
2503
        // Configure OCSP Response Cache for peer OCSP checks if enabled.
6,703✔
2504
        s.initOCSPResponseCache()
6,703✔
2505

6,703✔
2506
        // Start up gateway if needed. Do this before starting the routes, because
6,703✔
2507
        // we want to resolve the gateway host:port so that this information can
6,703✔
2508
        // be sent to other routes.
6,703✔
2509
        if opts.Gateway.Port != 0 {
7,862✔
2510
                s.startGateways()
1,159✔
2511
        }
1,159✔
2512

2513
        // Start websocket server if needed. Do this before starting the routes, and
2514
        // leaf node because we want to resolve the gateway host:port so that this
2515
        // information can be sent to other routes.
2516
        if opts.Websocket.Port != 0 {
6,825✔
2517
                s.startWebsocketServer()
122✔
2518
        }
122✔
2519

2520
        // Start up listen if we want to accept leaf node connections.
2521
        if opts.LeafNode.Port != 0 {
10,472✔
2522
                // Will resolve or assign the advertise address for the leafnode listener.
3,769✔
2523
                // We need that in StartRouting().
3,769✔
2524
                s.startLeafNodeAcceptLoop()
3,769✔
2525
        }
3,769✔
2526

2527
        // Solicit remote servers for leaf node connections.
2528
        if len(opts.LeafNode.Remotes) > 0 {
7,895✔
2529
                s.solicitLeafNodeRemotes(opts.LeafNode.Remotes)
1,192✔
2530
        }
1,192✔
2531

2532
        // TODO (ik): I wanted to refactor this by starting the client
2533
        // accept loop first, that is, it would resolve listen spec
2534
        // in place, but start the accept-for-loop in a different go
2535
        // routine. This would get rid of the synchronization between
2536
        // this function and StartRouting, which I also would have wanted
2537
        // to refactor, but both AcceptLoop() and StartRouting() have
2538
        // been exported and not sure if that would break users using them.
2539
        // We could mark them as deprecated and remove in a release or two...
2540

2541
        // The Routing routine needs to wait for the client listen
2542
        // port to be opened and potential ephemeral port selected.
2543
        clientListenReady := make(chan struct{})
6,703✔
2544

6,703✔
2545
        // MQTT
6,703✔
2546
        if opts.MQTT.Port != 0 {
6,963✔
2547
                s.startMQTT()
260✔
2548
        }
260✔
2549

2550
        // Start up routing as well if needed.
2551
        if opts.Cluster.Port != 0 {
11,261✔
2552
                s.startGoRoutine(func() {
9,116✔
2553
                        s.StartRouting(clientListenReady)
4,558✔
2554
                })
4,558✔
2555
        }
2556

2557
        if opts.PortsFileDir != _EMPTY_ {
6,705✔
2558
                s.logPorts()
2✔
2559
        }
2✔
2560

2561
        if opts.TLSRateLimit > 0 {
6,704✔
2562
                s.startGoRoutine(s.logRejectedTLSConns)
1✔
2563
        }
1✔
2564

2565
        // We've finished starting up.
2566
        close(s.startupComplete)
6,703✔
2567

6,703✔
2568
        // Wait for clients.
6,703✔
2569
        if !opts.DontListen {
13,406✔
2570
                s.AcceptLoop(clientListenReady)
6,703✔
2571
        }
6,703✔
2572

2573
        // Bring OCSP Response cache online after accept loop started in anticipation of NATS-enabled cache types
2574
        s.startOCSPResponseCache()
6,703✔
2575
}
2576

2577
func (s *Server) isShuttingDown() bool {
493,195✔
2578
        return s.shutdown.Load()
493,195✔
2579
}
493,195✔
2580

2581
// Shutdown will shutdown the server instance by kicking out the AcceptLoop
2582
// and closing all associated clients.
2583
func (s *Server) Shutdown() {
7,200✔
2584
        if s == nil {
7,205✔
2585
                return
5✔
2586
        }
5✔
2587
        // This is for JetStream R1 Pull Consumers to allow signaling
2588
        // that pending pull requests are invalid.
2589
        s.signalPullConsumers()
7,195✔
2590

7,195✔
2591
        // Transfer off any raft nodes that we are a leader by stepping them down.
7,195✔
2592
        s.stepdownRaftNodes()
7,195✔
2593

7,195✔
2594
        // Shutdown the eventing system as needed.
7,195✔
2595
        // This is done first to send out any messages for
7,195✔
2596
        // account status. We will also clean up any
7,195✔
2597
        // eventing items associated with accounts.
7,195✔
2598
        s.shutdownEventing()
7,195✔
2599

7,195✔
2600
        // Prevent issues with multiple calls.
7,195✔
2601
        if s.isShuttingDown() {
7,619✔
2602
                return
424✔
2603
        }
424✔
2604

2605
        s.mu.Lock()
6,771✔
2606
        s.Noticef("Initiating Shutdown...")
6,771✔
2607

6,771✔
2608
        accRes := s.accResolver
6,771✔
2609

6,771✔
2610
        opts := s.getOpts()
6,771✔
2611

6,771✔
2612
        s.shutdown.Store(true)
6,771✔
2613
        s.running.Store(false)
6,771✔
2614
        s.grMu.Lock()
6,771✔
2615
        s.grRunning = false
6,771✔
2616
        s.grMu.Unlock()
6,771✔
2617
        s.mu.Unlock()
6,771✔
2618

6,771✔
2619
        if accRes != nil {
7,179✔
2620
                accRes.Close()
408✔
2621
        }
408✔
2622

2623
        // Now check and shutdown jetstream.
2624
        s.shutdownJetStream()
6,771✔
2625

6,771✔
2626
        // Now shutdown the nodes
6,771✔
2627
        s.shutdownRaftNodes()
6,771✔
2628

6,771✔
2629
        s.mu.Lock()
6,771✔
2630
        conns := make(map[uint64]*client)
6,771✔
2631

6,771✔
2632
        // Copy off the clients
6,771✔
2633
        for i, c := range s.clients {
8,031✔
2634
                conns[i] = c
1,260✔
2635
        }
1,260✔
2636
        // Copy off the connections that are not yet registered
2637
        // in s.routes, but for which the readLoop has started
2638
        s.grMu.Lock()
6,771✔
2639
        for i, c := range s.grTmpClients {
6,794✔
2640
                conns[i] = c
23✔
2641
        }
23✔
2642
        s.grMu.Unlock()
6,771✔
2643
        // Copy off the routes
6,771✔
2644
        s.forEachRoute(func(r *client) {
23,777✔
2645
                r.mu.Lock()
17,006✔
2646
                conns[r.cid] = r
17,006✔
2647
                r.mu.Unlock()
17,006✔
2648
        })
17,006✔
2649
        // Copy off the gateways
2650
        s.getAllGatewayConnections(conns)
6,771✔
2651

6,771✔
2652
        // Copy off the leaf nodes
6,771✔
2653
        for i, c := range s.leafs {
7,438✔
2654
                conns[i] = c
667✔
2655
        }
667✔
2656

2657
        // Number of done channel responses we expect.
2658
        doneExpected := 0
6,771✔
2659

6,771✔
2660
        // Kick client AcceptLoop()
6,771✔
2661
        if s.listener != nil {
13,462✔
2662
                doneExpected++
6,691✔
2663
                s.listener.Close()
6,691✔
2664
                s.listener = nil
6,691✔
2665
        }
6,691✔
2666

2667
        // Kick websocket server
2668
        doneExpected += s.closeWebsocketServer()
6,771✔
2669

6,771✔
2670
        // Kick MQTT accept loop
6,771✔
2671
        if s.mqtt.listener != nil {
7,029✔
2672
                doneExpected++
258✔
2673
                s.mqtt.listener.Close()
258✔
2674
                s.mqtt.listener = nil
258✔
2675
        }
258✔
2676

2677
        // Kick leafnodes AcceptLoop()
2678
        if s.leafNodeListener != nil {
10,540✔
2679
                doneExpected++
3,769✔
2680
                s.leafNodeListener.Close()
3,769✔
2681
                s.leafNodeListener = nil
3,769✔
2682
        }
3,769✔
2683

2684
        // Kick route AcceptLoop()
2685
        if s.routeListener != nil {
11,323✔
2686
                doneExpected++
4,552✔
2687
                s.routeListener.Close()
4,552✔
2688
                s.routeListener = nil
4,552✔
2689
        }
4,552✔
2690

2691
        // Kick Gateway AcceptLoop()
2692
        if s.gatewayListener != nil {
7,927✔
2693
                doneExpected++
1,156✔
2694
                s.gatewayListener.Close()
1,156✔
2695
                s.gatewayListener = nil
1,156✔
2696
        }
1,156✔
2697

2698
        // Kick HTTP monitoring if its running
2699
        if s.http != nil {
7,821✔
2700
                doneExpected++
1,050✔
2701
                s.http.Close()
1,050✔
2702
                s.http = nil
1,050✔
2703
        }
1,050✔
2704

2705
        // Kick Profiling if its running
2706
        if s.profiler != nil {
6,773✔
2707
                doneExpected++
2✔
2708
                s.profiler.Close()
2✔
2709
        }
2✔
2710

2711
        s.mu.Unlock()
6,771✔
2712

6,771✔
2713
        // Release go routines that wait on that channel
6,771✔
2714
        close(s.quitCh)
6,771✔
2715

6,771✔
2716
        // Close client and route connections
6,771✔
2717
        for _, c := range conns {
27,411✔
2718
                c.setNoReconnect()
20,640✔
2719
                c.closeConnection(ServerShutdown)
20,640✔
2720
        }
20,640✔
2721

2722
        // Block until the accept loops exit
2723
        for doneExpected > 0 {
24,371✔
2724
                <-s.done
17,600✔
2725
                doneExpected--
17,600✔
2726
        }
17,600✔
2727

2728
        // Wait for go routines to be done.
2729
        s.grWG.Wait()
6,771✔
2730

6,771✔
2731
        if opts.PortsFileDir != _EMPTY_ {
6,773✔
2732
                s.deletePortsFile(opts.PortsFileDir)
2✔
2733
        }
2✔
2734

2735
        s.Noticef("Server Exiting..")
6,771✔
2736

6,771✔
2737
        // Stop OCSP Response Cache
6,771✔
2738
        if s.ocsprc != nil {
6,821✔
2739
                s.ocsprc.Stop(s)
50✔
2740
        }
50✔
2741

2742
        // Close logger if applicable. It allows tests on Windows
2743
        // to be able to do proper cleanup (delete log file).
2744
        s.logging.RLock()
6,771✔
2745
        log := s.logging.logger
6,771✔
2746
        s.logging.RUnlock()
6,771✔
2747
        if log != nil {
7,358✔
2748
                if l, ok := log.(*logger.Logger); ok {
662✔
2749
                        l.Close()
75✔
2750
                }
75✔
2751
        }
2752
        // Notify that the shutdown is complete
2753
        close(s.shutdownComplete)
6,771✔
2754
}
2755

2756
// Close the websocket server if running. If so, returns 1, else 0.
2757
// Server lock held on entry.
2758
func (s *Server) closeWebsocketServer() int {
6,777✔
2759
        ws := &s.websocket
6,777✔
2760
        ws.mu.Lock()
6,777✔
2761
        hs := ws.server
6,777✔
2762
        if hs != nil {
6,899✔
2763
                ws.server = nil
122✔
2764
                ws.listener = nil
122✔
2765
        }
122✔
2766
        ws.mu.Unlock()
6,777✔
2767
        if hs != nil {
6,899✔
2768
                hs.Close()
122✔
2769
                return 1
122✔
2770
        }
122✔
2771
        return 0
6,655✔
2772
}
2773

2774
// WaitForShutdown will block until the server has been fully shutdown.
2775
func (s *Server) WaitForShutdown() {
3,242✔
2776
        <-s.shutdownComplete
3,242✔
2777
}
3,242✔
2778

2779
// AcceptLoop is exported for easier testing.
2780
func (s *Server) AcceptLoop(clr chan struct{}) {
6,703✔
2781
        // If we were to exit before the listener is setup properly,
6,703✔
2782
        // make sure we close the channel.
6,703✔
2783
        defer func() {
13,406✔
2784
                if clr != nil {
6,704✔
2785
                        close(clr)
1✔
2786
                }
1✔
2787
        }()
2788

2789
        if s.isShuttingDown() {
6,704✔
2790
                return
1✔
2791
        }
1✔
2792

2793
        // Snapshot server options.
2794
        opts := s.getOpts()
6,702✔
2795

6,702✔
2796
        // Setup state that can enable shutdown
6,702✔
2797
        s.mu.Lock()
6,702✔
2798
        hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
6,702✔
2799
        l, e := s.getServerListener(hp)
6,702✔
2800
        s.listenerErr = e
6,702✔
2801
        if e != nil {
6,702✔
2802
                s.mu.Unlock()
×
2803
                s.Fatalf("Error listening on port: %s, %q", hp, e)
×
2804
                return
×
2805
        }
×
2806
        s.Noticef("Listening for client connections on %s",
6,702✔
2807
                net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
6,702✔
2808

6,702✔
2809
        // Alert if PROXY protocol is enabled
6,702✔
2810
        if opts.ProxyProtocol {
6,705✔
2811
                s.Noticef("PROXY protocol enabled for client connections")
3✔
2812
        }
3✔
2813

2814
        // Alert of TLS enabled.
2815
        if opts.TLSConfig != nil {
6,863✔
2816
                s.Noticef("TLS required for client connections")
161✔
2817
                if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
167✔
2818
                        s.Warnf("Clients that are not using \"TLS Handshake First\" option will fail to connect")
6✔
2819
                }
6✔
2820
        }
2821

2822
        // If server was started with RANDOM_PORT (-1), opts.Port would be equal
2823
        // to 0 at the beginning this function. So we need to get the actual port
2824
        if opts.Port == 0 {
12,857✔
2825
                // Write resolved port back to options.
6,155✔
2826
                opts.Port = l.Addr().(*net.TCPAddr).Port
6,155✔
2827
        }
6,155✔
2828

2829
        // Now that port has been set (if it was set to RANDOM), set the
2830
        // server's info Host/Port with either values from Options or
2831
        // ClientAdvertise.
2832
        if err := s.setInfoHostPort(); err != nil {
6,702✔
2833
                s.Fatalf("Error setting server INFO with ClientAdvertise value of %s, err=%v", opts.ClientAdvertise, err)
×
2834
                l.Close()
×
2835
                s.mu.Unlock()
×
2836
                return
×
2837
        }
×
2838
        // Keep track of client connect URLs. We may need them later.
2839
        s.clientConnectURLs = s.getClientConnectURLs()
6,702✔
2840
        s.listener = l
6,702✔
2841

6,702✔
2842
        go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
16,297✔
2843
                func(_ error) bool {
6,697✔
2844
                        if s.isLameDuckMode() {
6,703✔
2845
                                // Signal that we are not accepting new clients
6✔
2846
                                s.ldmCh <- true
6✔
2847
                                // Now wait for the Shutdown...
6✔
2848
                                <-s.quitCh
6✔
2849
                                return true
6✔
2850
                        }
6✔
2851
                        return false
6,691✔
2852
                })
2853
        s.mu.Unlock()
6,702✔
2854

6,702✔
2855
        // Let the caller know that we are ready
6,702✔
2856
        close(clr)
6,702✔
2857
        clr = nil
6,702✔
2858
}
2859

2860
// getServerListener returns a network listener for the given host-port address.
2861
// If the Server already has an active listener (s.listener), it returns that listener
2862
// along with any previous error (s.listenerErr). Otherwise, it creates and returns
2863
// a new TCP listener on the specified address using natsListen.
2864
func (s *Server) getServerListener(hp string) (net.Listener, error) {
6,702✔
2865
        if s.listener != nil {
6,702✔
2866
                return s.listener, s.listenerErr
×
2867
        }
×
2868

2869
        return natsListen("tcp", hp)
6,702✔
2870
}
2871

2872
// InProcessConn returns an in-process connection to the server,
2873
// avoiding the need to use a TCP listener for local connectivity
2874
// within the same process. This can be used regardless of the
2875
// state of the DontListen option.
2876
func (s *Server) InProcessConn() (net.Conn, error) {
8✔
2877
        pl, pr := net.Pipe()
8✔
2878
        if !s.startGoRoutine(func() {
16✔
2879
                s.createClientInProcess(pl)
8✔
2880
                s.grWG.Done()
8✔
2881
        }) {
8✔
2882
                pl.Close()
×
2883
                pr.Close()
×
2884
                return nil, fmt.Errorf("failed to create connection")
×
2885
        }
×
2886
        return pr, nil
8✔
2887
}
2888

2889
func (s *Server) acceptConnections(l net.Listener, acceptName string, createFunc func(conn net.Conn), errFunc func(err error) bool) {
16,442✔
2890
        tmpDelay := ACCEPT_MIN_SLEEP
16,442✔
2891

16,442✔
2892
        for {
77,406✔
2893
                conn, err := l.Accept()
60,964✔
2894
                if err != nil {
77,396✔
2895
                        if errFunc != nil && errFunc(err) {
16,438✔
2896
                                return
6✔
2897
                        }
6✔
2898
                        if tmpDelay = s.acceptError(acceptName, err, tmpDelay); tmpDelay < 0 {
32,852✔
2899
                                break
16,426✔
2900
                        }
2901
                        continue
×
2902
                }
2903
                tmpDelay = ACCEPT_MIN_SLEEP
44,522✔
2904
                if !s.startGoRoutine(func() {
89,037✔
2905
                        s.reloadMu.RLock()
44,515✔
2906
                        createFunc(conn)
44,515✔
2907
                        s.reloadMu.RUnlock()
44,515✔
2908
                        s.grWG.Done()
44,515✔
2909
                }) {
44,522✔
2910
                        conn.Close()
7✔
2911
                }
7✔
2912
        }
2913
        s.Debugf(acceptName + " accept loop exiting..")
16,426✔
2914
        s.done <- true
16,426✔
2915
}
2916

2917
// This function sets the server's info Host/Port based on server Options.
2918
// Note that this function may be called during config reload, this is why
2919
// Host/Port may be reset to original Options if the ClientAdvertise option
2920
// is not set (since it may have previously been).
2921
func (s *Server) setInfoHostPort() error {
13,623✔
2922
        // When this function is called, opts.Port is set to the actual listen
13,623✔
2923
        // port (if option was originally set to RANDOM), even during a config
13,623✔
2924
        // reload. So use of s.opts.Port is safe.
13,623✔
2925
        opts := s.getOpts()
13,623✔
2926
        if opts.ClientAdvertise != _EMPTY_ {
13,630✔
2927
                h, p, err := parseHostPort(opts.ClientAdvertise, opts.Port)
7✔
2928
                if err != nil {
7✔
2929
                        return err
×
2930
                }
×
2931
                s.info.Host = h
7✔
2932
                s.info.Port = p
7✔
2933
        } else {
13,616✔
2934
                s.info.Host = opts.Host
13,616✔
2935
                s.info.Port = opts.Port
13,616✔
2936
        }
13,616✔
2937
        return nil
13,623✔
2938
}
2939

2940
// StartProfiler is called to enable dynamic profiling.
2941
func (s *Server) StartProfiler() {
2✔
2942
        if s.isShuttingDown() {
2✔
2943
                return
×
2944
        }
×
2945

2946
        // Snapshot server options.
2947
        opts := s.getOpts()
2✔
2948

2✔
2949
        port := opts.ProfPort
2✔
2950

2✔
2951
        // Check for Random Port
2✔
2952
        if port == -1 {
3✔
2953
                port = 0
1✔
2954
        }
1✔
2955

2956
        s.mu.Lock()
2✔
2957
        hp := net.JoinHostPort(opts.Host, strconv.Itoa(port))
2✔
2958
        l, err := net.Listen("tcp", hp)
2✔
2959

2✔
2960
        if err != nil {
2✔
2961
                s.mu.Unlock()
×
2962
                s.Fatalf("error starting profiler: %s", err)
×
2963
                return
×
2964
        }
×
2965
        s.Noticef("profiling port: %d", l.Addr().(*net.TCPAddr).Port)
2✔
2966

2✔
2967
        srv := &http.Server{
2✔
2968
                Addr:           hp,
2✔
2969
                Handler:        http.DefaultServeMux,
2✔
2970
                MaxHeaderBytes: 1 << 20,
2✔
2971
                ReadTimeout:    time.Second * 5,
2✔
2972
        }
2✔
2973
        s.profiler = l
2✔
2974
        s.profilingServer = srv
2✔
2975

2✔
2976
        s.setBlockProfileRate(opts.ProfBlockRate)
2✔
2977

2✔
2978
        go func() {
4✔
2979
                // if this errors out, it's probably because the server is being shutdown
2✔
2980
                err := srv.Serve(l)
2✔
2981
                if err != nil {
4✔
2982
                        if !s.isShuttingDown() {
2✔
2983
                                s.Fatalf("error starting profiler: %s", err)
×
2984
                        }
×
2985
                }
2986
                srv.Close()
2✔
2987
                s.done <- true
2✔
2988
        }()
2989
        s.mu.Unlock()
2✔
2990
}
2991

2992
func (s *Server) setBlockProfileRate(rate int) {
6,707✔
2993
        // Passing i ProfBlockRate <= 0 here will disable or > 0 will enable.
6,707✔
2994
        runtime.SetBlockProfileRate(rate)
6,707✔
2995

6,707✔
2996
        if rate > 0 {
6,707✔
2997
                s.Warnf("Block profiling is enabled (rate %d), this may have a performance impact", rate)
×
2998
        }
×
2999
}
3000

3001
// StartHTTPMonitoring will enable the HTTP monitoring port.
3002
// DEPRECATED: Should use StartMonitoring.
3003
func (s *Server) StartHTTPMonitoring() {
×
3004
        s.startMonitoring(false)
×
3005
}
×
3006

3007
// StartHTTPSMonitoring will enable the HTTPS monitoring port.
3008
// DEPRECATED: Should use StartMonitoring.
3009
func (s *Server) StartHTTPSMonitoring() {
×
3010
        s.startMonitoring(true)
×
3011
}
×
3012

3013
// StartMonitoring starts the HTTP or HTTPs server if needed.
3014
func (s *Server) StartMonitoring() error {
6,710✔
3015
        // Snapshot server options.
6,710✔
3016
        opts := s.getOpts()
6,710✔
3017

6,710✔
3018
        // Specifying both HTTP and HTTPS ports is a misconfiguration
6,710✔
3019
        if opts.HTTPPort != 0 && opts.HTTPSPort != 0 {
6,711✔
3020
                return fmt.Errorf("can't specify both HTTP (%v) and HTTPs (%v) ports", opts.HTTPPort, opts.HTTPSPort)
1✔
3021
        }
1✔
3022
        var err error
6,709✔
3023
        if opts.HTTPPort != 0 {
7,749✔
3024
                err = s.startMonitoring(false)
1,040✔
3025
        } else if opts.HTTPSPort != 0 {
6,724✔
3026
                if opts.TLSConfig == nil {
17✔
3027
                        return fmt.Errorf("TLS cert and key required for HTTPS")
2✔
3028
                }
2✔
3029
                err = s.startMonitoring(true)
13✔
3030
        }
3031
        return err
6,707✔
3032
}
3033

3034
// HTTP endpoints served by the monitoring server. Each path is registered
// relative to the server's HTTP base path.
const (
	// RootPath is the monitoring root page.
	RootPath = "/"
	// VarzPath is the path of the varz endpoint.
	VarzPath = "/varz"
	// ConnzPath is the path of the connz endpoint.
	ConnzPath = "/connz"
	// RoutezPath is the path of the routez endpoint.
	RoutezPath = "/routez"
	// GatewayzPath is the path of the gatewayz endpoint.
	GatewayzPath = "/gatewayz"
	// LeafzPath is the path of the leafz endpoint.
	LeafzPath = "/leafz"
	// SubszPath is the path of the subsz endpoint.
	SubszPath = "/subsz"
	// StackszPath is the path of the stacksz endpoint.
	StackszPath = "/stacksz"
	// AccountzPath is the path of the accountz endpoint.
	AccountzPath = "/accountz"
	// AccountStatzPath is the path of the accstatz endpoint.
	AccountStatzPath = "/accstatz"
	// JszPath is the path of the jsz endpoint.
	JszPath = "/jsz"
	// HealthzPath is the path of the healthz endpoint.
	HealthzPath = "/healthz"
	// IPQueuesPath is the path of the ipqueuesz endpoint.
	IPQueuesPath = "/ipqueuesz"
	// RaftzPath is the path of the raftz endpoint.
	RaftzPath = "/raftz"
	// ExpvarzPath is the path under which the expvar handler is served.
	ExpvarzPath = "/debug/vars"
)
3052

3053
func (s *Server) basePath(p string) string {
16,864✔
3054
        return path.Join(s.httpBasePath, p)
16,864✔
3055
}
16,864✔
3056

3057
type captureHTTPServerLog struct {
3058
        s      *Server
3059
        prefix string
3060
}
3061

3062
func (cl *captureHTTPServerLog) Write(p []byte) (int, error) {
3✔
3063
        var buf [128]byte
3✔
3064
        var b = buf[:0]
3✔
3065

3✔
3066
        b = append(b, []byte(cl.prefix)...)
3✔
3067
        offset := 0
3✔
3068
        if bytes.HasPrefix(p, []byte("http:")) {
6✔
3069
                offset = 6
3✔
3070
        }
3✔
3071
        b = append(b, p[offset:]...)
3✔
3072
        cl.s.Errorf(string(b))
3✔
3073
        return len(p), nil
3✔
3074
}
3075

3076
// The TLS configuration is passed to the listener when the monitoring
3077
// "server" is setup. That prevents TLS configuration updates on reload
3078
// from being used. By setting this function in tls.Config.GetConfigForClient
3079
// we instruct the TLS handshake to ask for the tls configuration to be
3080
// used for a specific client. We don't care which client, we always use
3081
// the same TLS configuration.
3082
func (s *Server) getMonitoringTLSConfig(_ *tls.ClientHelloInfo) (*tls.Config, error) {
4✔
3083
        opts := s.getOpts()
4✔
3084
        tc := opts.TLSConfig.Clone()
4✔
3085
        tc.ClientAuth = tls.NoClientCert
4✔
3086
        return tc, nil
4✔
3087
}
4✔
3088

3089
// Start the monitoring server
3090
func (s *Server) startMonitoring(secure bool) error {
1,053✔
3091
        if s.isShuttingDown() {
1,054✔
3092
                return nil
1✔
3093
        }
1✔
3094

3095
        // Snapshot server options.
3096
        opts := s.getOpts()
1,052✔
3097

1,052✔
3098
        var (
1,052✔
3099
                hp           string
1,052✔
3100
                err          error
1,052✔
3101
                httpListener net.Listener
1,052✔
3102
                port         int
1,052✔
3103
        )
1,052✔
3104

1,052✔
3105
        monitorProtocol := "http"
1,052✔
3106

1,052✔
3107
        if secure {
1,065✔
3108
                monitorProtocol += "s"
13✔
3109
                port = opts.HTTPSPort
13✔
3110
                if port == -1 {
17✔
3111
                        port = 0
4✔
3112
                }
4✔
3113
                hp = net.JoinHostPort(opts.HTTPHost, strconv.Itoa(port))
13✔
3114
                config := opts.TLSConfig.Clone()
13✔
3115
                if !s.ocspPeerVerify {
25✔
3116
                        config.GetConfigForClient = s.getMonitoringTLSConfig
12✔
3117
                        config.ClientAuth = tls.NoClientCert
12✔
3118
                }
12✔
3119
                httpListener, err = tls.Listen("tcp", hp, config)
13✔
3120

3121
        } else {
1,039✔
3122
                port = opts.HTTPPort
1,039✔
3123
                if port == -1 {
1,881✔
3124
                        port = 0
842✔
3125
                }
842✔
3126
                hp = net.JoinHostPort(opts.HTTPHost, strconv.Itoa(port))
1,039✔
3127
                httpListener, err = net.Listen("tcp", hp)
1,039✔
3128
        }
3129

3130
        if err != nil {
1,053✔
3131
                return fmt.Errorf("can't listen to the monitor port: %v", err)
1✔
3132
        }
1✔
3133

3134
        rport := httpListener.Addr().(*net.TCPAddr).Port
1,051✔
3135
        s.Noticef("Starting %s monitor on %s", monitorProtocol, net.JoinHostPort(opts.HTTPHost, strconv.Itoa(rport)))
1,051✔
3136

1,051✔
3137
        mux := http.NewServeMux()
1,051✔
3138

1,051✔
3139
        // Root
1,051✔
3140
        mux.HandleFunc(s.basePath(RootPath), s.HandleRoot)
1,051✔
3141
        // Varz
1,051✔
3142
        mux.HandleFunc(s.basePath(VarzPath), s.HandleVarz)
1,051✔
3143
        // Connz
1,051✔
3144
        mux.HandleFunc(s.basePath(ConnzPath), s.HandleConnz)
1,051✔
3145
        // Routez
1,051✔
3146
        mux.HandleFunc(s.basePath(RoutezPath), s.HandleRoutez)
1,051✔
3147
        // Gatewayz
1,051✔
3148
        mux.HandleFunc(s.basePath(GatewayzPath), s.HandleGatewayz)
1,051✔
3149
        // Leafz
1,051✔
3150
        mux.HandleFunc(s.basePath(LeafzPath), s.HandleLeafz)
1,051✔
3151
        // Subz
1,051✔
3152
        mux.HandleFunc(s.basePath(SubszPath), s.HandleSubsz)
1,051✔
3153
        // Subz alias for backwards compatibility
1,051✔
3154
        mux.HandleFunc(s.basePath("/subscriptionsz"), s.HandleSubsz)
1,051✔
3155
        // Stacksz
1,051✔
3156
        mux.HandleFunc(s.basePath(StackszPath), s.HandleStacksz)
1,051✔
3157
        // Accountz
1,051✔
3158
        mux.HandleFunc(s.basePath(AccountzPath), s.HandleAccountz)
1,051✔
3159
        // Accstatz
1,051✔
3160
        mux.HandleFunc(s.basePath(AccountStatzPath), s.HandleAccountStatz)
1,051✔
3161
        // Jsz
1,051✔
3162
        mux.HandleFunc(s.basePath(JszPath), s.HandleJsz)
1,051✔
3163
        // Healthz
1,051✔
3164
        mux.HandleFunc(s.basePath(HealthzPath), s.HandleHealthz)
1,051✔
3165
        // IPQueuesz
1,051✔
3166
        mux.HandleFunc(s.basePath(IPQueuesPath), s.HandleIPQueuesz)
1,051✔
3167
        // Raftz
1,051✔
3168
        mux.HandleFunc(s.basePath(RaftzPath), s.HandleRaftz)
1,051✔
3169
        // Expvarz
1,051✔
3170
        mux.Handle(s.basePath(ExpvarzPath), expvar.Handler())
1,051✔
3171

1,051✔
3172
        // Do not set a WriteTimeout because it could cause cURL/browser
1,051✔
3173
        // to return empty response or unable to display page if the
1,051✔
3174
        // server needs more time to build the response.
1,051✔
3175
        srv := &http.Server{
1,051✔
3176
                Addr:              hp,
1,051✔
3177
                Handler:           mux,
1,051✔
3178
                MaxHeaderBytes:    1 << 20,
1,051✔
3179
                ErrorLog:          log.New(&captureHTTPServerLog{s, "monitoring: "}, _EMPTY_, 0),
1,051✔
3180
                ReadHeaderTimeout: time.Second * 5,
1,051✔
3181
        }
1,051✔
3182
        s.mu.Lock()
1,051✔
3183
        s.http = httpListener
1,051✔
3184
        s.httpHandler = mux
1,051✔
3185
        s.monitoringServer = srv
1,051✔
3186
        s.mu.Unlock()
1,051✔
3187

1,051✔
3188
        go func() {
2,102✔
3189
                if err := srv.Serve(httpListener); err != nil {
2,101✔
3190
                        if !s.isShuttingDown() {
1,050✔
3191
                                s.Fatalf("Error starting monitor on %q: %v", hp, err)
×
3192
                        }
×
3193
                }
3194
                srv.Close()
1,050✔
3195
                s.mu.Lock()
1,050✔
3196
                s.httpHandler = nil
1,050✔
3197
                s.mu.Unlock()
1,050✔
3198
                s.done <- true
1,050✔
3199
        }()
3200

3201
        return nil
1,051✔
3202
}
3203

3204
// HTTPHandler returns the http.Handler object used to handle monitoring
3205
// endpoints. It will return nil if the server is not configured for
3206
// monitoring, or if the server has not been started yet (Server.Start()).
3207
func (s *Server) HTTPHandler() http.Handler {
2✔
3208
        s.mu.Lock()
2✔
3209
        defer s.mu.Unlock()
2✔
3210
        return s.httpHandler
2✔
3211
}
2✔
3212

3213
// Perform a conditional deep copy due to reference nature of [Client|WS]ConnectURLs.
3214
// If updates are made to Info, this function should be consulted and updated.
3215
// Assume lock is held.
3216
func (s *Server) copyInfo() Info {
18,912✔
3217
        info := s.info
18,912✔
3218
        if len(info.ClientConnectURLs) > 0 {
30,167✔
3219
                info.ClientConnectURLs = append([]string(nil), s.info.ClientConnectURLs...)
11,255✔
3220
        }
11,255✔
3221
        if len(info.WSConnectURLs) > 0 {
18,994✔
3222
                info.WSConnectURLs = append([]string(nil), s.info.WSConnectURLs...)
82✔
3223
        }
82✔
3224
        return info
18,912✔
3225
}
3226

3227
// tlsMixConn is used when we can receive both TLS and non-TLS connections on same port.
// It pairs a net.Conn with the bytes that were already read from it (pre), so
// that those bytes are handed back to the reader before anything else is read
// from the connection itself.
type tlsMixConn struct {
	net.Conn
	pre *bytes.Buffer
}

// Read for our mixed multi-reader.
//
// Bytes still held in the pre-read buffer are returned first. Once the buffer
// is drained it is dropped (pre is set to nil) and all subsequent reads go
// straight to the embedded connection.
//
// A pre-read buffer that is present but holds no data is treated as absent:
// bytes.Buffer.Read on an empty buffer returns (0, io.EOF), which would
// otherwise be reported to the caller as the end of a connection that is
// still perfectly usable.
func (c *tlsMixConn) Read(b []byte) (int, error) {
	if c.pre != nil {
		if c.pre.Len() > 0 {
			n, err := c.pre.Read(b)
			if c.pre.Len() == 0 {
				c.pre = nil
			}
			return n, err
		}
		// Nothing buffered: drop the buffer and read from the connection.
		c.pre = nil
	}
	return c.Conn.Read(b)
}
3244

3245
func (s *Server) createClient(conn net.Conn) *client {
9,814✔
3246
        return s.createClientEx(conn, false)
9,814✔
3247
}
9,814✔
3248

3249
func (s *Server) createClientInProcess(conn net.Conn) *client {
8✔
3250
        return s.createClientEx(conn, true)
8✔
3251
}
8✔
3252

3253
// createClientEx creates and initializes the client for the given connection,
// sends it the INFO protocol, registers it with the server and starts its
// read and write loops. inProcess marks an in-process connection, for which
// TLS is advertised as available rather than required and for which both TLS
// and non-TLS are always allowed.
//
// It returns nil when the connection has been rejected or closed along the
// way: max connections exceeded, PROXY protocol header error, connection
// throttling, TLS handshake failure, or connection closed while sending INFO.
// If the server is not running or is in lame duck mode, the client is
// returned without having been registered and without its loops started.
//
// The function alternates between the server lock and the client lock and
// never holds both at the same time.
func (s *Server) createClientEx(conn net.Conn, inProcess bool) *client {
	// Snapshot server options.
	opts := s.getOpts()

	maxPay := int32(opts.MaxPayload)
	maxSubs := int32(opts.MaxSubs)
	// For system, maxSubs of 0 means unlimited, so re-adjust here.
	if maxSubs == 0 {
		maxSubs = -1
	}
	now := time.Now()

	c := &client{
		srv:   s,
		nc:    conn,
		opts:  defaultOpts,
		mpay:  maxPay,
		msubs: maxSubs,
		start: now,
		last:  now,
		iproc: inProcess,
	}

	c.registerWithAccount(s.globalAccount())

	var info Info
	var authRequired bool

	s.mu.Lock()
	// Grab a private copy of the server's Info that we can alter for this client.
	info = s.copyInfo()
	if s.nonceRequired() {
		// Nonce handling
		var raw [nonceLen]byte
		nonce := raw[:]
		s.generateNonce(nonce)
		info.Nonce = string(nonce)
	}
	c.nonce = []byte(info.Nonce)
	// authRequired keeps the server's setting; info.AuthRequired may be reset
	// below, which only changes what is advertised in the INFO protocol.
	authRequired = info.AuthRequired

	// Check to see if we have auth_required set but we also have a no_auth_user.
	// If so set back to false.
	if info.AuthRequired && opts.NoAuthUser != _EMPTY_ && opts.NoAuthUser != s.sysAccOnlyNoAuthUser {
		info.AuthRequired = false
	}

	// Check to see if this is an in-process connection with tls_required.
	// If so, set as not required, but available.
	if inProcess && info.TLSRequired {
		info.TLSRequired = false
		info.TLSAvailable = true
	}

	s.totalClients++
	s.mu.Unlock()

	// Grab lock
	c.mu.Lock()
	if authRequired {
		c.flags.set(expectConnect)
	}

	// Initialize
	c.initClient()

	c.Debugf("Client connection created")

	// Save info.TLSRequired value since we may need to change it back and forth.
	orgInfoTLSReq := info.TLSRequired

	var tlsFirstFallback time.Duration
	// Check if we should do TLS first.
	tlsFirst := opts.TLSConfig != nil && opts.TLSHandshakeFirst
	if tlsFirst {
		// Make sure info.TLSRequired is set to true (it could be false
		// if AllowNonTLS is enabled).
		info.TLSRequired = true
		// Get the fallback delay value if applicable.
		if f := opts.TLSHandshakeFirstFallback; f > 0 {
			tlsFirstFallback = f
		} else if inProcess {
			// For in-process connection, we will always have a fallback
			// delay. It allows support for non-TLS, TLS and "TLS First"
			// in-process clients to successfully connect.
			tlsFirstFallback = DEFAULT_TLS_HANDSHAKE_FIRST_FALLBACK_DELAY
		}
	}

	// Decide if we are going to require TLS or not and generate INFO json.
	// If we have ProxyProtocol enabled then we won't include the client
	// IP in the initial INFO, as that would leak the proxy IP itself.
	// In that case we'll send another INFO after the client introduces itself.
	tlsRequired := info.TLSRequired
	infoBytes := c.generateClientInfoJSON(info, !opts.ProxyProtocol)

	// Send our information, except if TLS and TLSHandshakeFirst is requested.
	if !tlsFirst {
		// Need to be sent in place since writeLoop cannot be started until
		// TLS handshake is done (if applicable).
		c.sendProtoNow(infoBytes)
	}

	// Unlock to register
	c.mu.Unlock()

	// Register with the server.
	s.mu.Lock()
	// If server is not running, Shutdown() may have already gathered the
	// list of connections to close. It won't contain this one, so we need
	// to bail out now otherwise the readLoop started down there would not
	// be interrupted. Skip also if in lame duck mode.
	if !s.isRunning() || s.ldm {
		// There are some tests that create a server but don't start it,
		// and use "async" clients and perform the parsing manually. Such
		// clients would branch here (since server is not running). However,
		// when a server was really running and has been shutdown, we must
		// close this connection.
		if s.isShuttingDown() {
			conn.Close()
		}
		s.mu.Unlock()
		return c
	}

	// If there is a max connections specified, check that adding
	// this new client would not push us over the max
	if opts.MaxConn > 0 && len(s.clients) >= opts.MaxConn {
		s.mu.Unlock()
		c.maxConnExceeded()
		return nil
	}
	s.clients[c.cid] = c

	s.mu.Unlock()

	// Re-Grab lock
	c.mu.Lock()

	isClosed := c.isClosed()
	// pre holds bytes read from the connection ahead of the read loop; whatever
	// is left in it at the end of this function is handed to readLoop.
	var pre []byte
	// We need first to check for "TLS First" fallback delay.
	if !isClosed && tlsFirstFallback > 0 {
		// We wait and see if we are getting any data. Since we did not send
		// the INFO protocol yet, only clients that use TLS first should be
		// sending data (the TLS handshake). We don't really check the content:
		// if it is a rogue agent and not an actual client performing the
		// TLS handshake, the error will be detected when performing the
		// handshake on our side.
		pre = make([]byte, 4)
		c.nc.SetReadDeadline(time.Now().Add(tlsFirstFallback))
		n, _ := io.ReadFull(c.nc, pre[:])
		c.nc.SetReadDeadline(time.Time{})
		// If we get any data (regardless of possible timeout), we will proceed
		// with the TLS handshake.
		if n > 0 {
			pre = pre[:n]
		} else {
			// We did not get anything so we will send the INFO protocol.
			pre = nil

			// Restore the original info.TLSRequired value if it is
			// different than the current value and regenerate infoBytes.
			if orgInfoTLSReq != info.TLSRequired {
				info.TLSRequired = orgInfoTLSReq
				infoBytes = c.generateClientInfoJSON(info, !opts.ProxyProtocol)
			}
			c.sendProtoNow(infoBytes)
			// Set the boolean to false for the rest of the function.
			tlsFirst = false
			// Check closed status again
			isClosed = c.isClosed()
		}
	}
	// If we have both TLS and non-TLS allowed we need to see which
	// one the client wants. We'll always allow this for in-process
	// connections.
	if !isClosed && !tlsFirst && opts.TLSConfig != nil && (inProcess || opts.AllowNonTLS) {
		pre = make([]byte, 6) // Minimum 6 bytes for proxy proto in next step.
		c.nc.SetReadDeadline(time.Now().Add(secondsToDuration(opts.TLSTimeout)))
		n, _ := io.ReadFull(c.nc, pre[:])
		c.nc.SetReadDeadline(time.Time{})
		pre = pre[:n]
		// 0x16 is the TLS record content type of a handshake message, so a
		// first byte of 0x16 means the client is starting a TLS handshake.
		if n > 0 && pre[0] == 0x16 {
			tlsRequired = true
		} else {
			tlsRequired = false
		}
	}

	// Check for proxy protocol if enabled.
	if !isClosed && !tlsRequired && opts.ProxyProtocol {
		if len(pre) == 0 {
			// There has been no pre-read yet, do so so we can work out
			// if the client is trying to negotiate PROXY.
			pre = make([]byte, 6)
			c.nc.SetReadDeadline(time.Now().Add(proxyProtoReadTimeout))
			n, _ := io.ReadFull(c.nc, pre)
			c.nc.SetReadDeadline(time.Time{})
			pre = pre[:n]
		}
		// Wrap the connection so that the header parser sees the pre-read
		// bytes first. Note that pre may still be empty here if the pre-read
		// above returned no data.
		conn = &tlsMixConn{conn, bytes.NewBuffer(pre)}
		addr, err := readProxyProtoHeader(conn)
		if err != nil && err != errProxyProtoUnrecognized {
			// err != errProxyProtoUnrecognized implies that we detected a proxy
			// protocol header but we failed to parse it, so don't continue.
			c.mu.Unlock()
			s.Warnf("Error reading PROXY protocol header from %s: %v", conn.RemoteAddr(), err)
			c.closeConnection(ProtocolViolation)
			return nil
		}
		// If addr is nil, it was a LOCAL/UNKNOWN command (health check)
		// Use the connection as-is
		if addr != nil {
			c.nc = &proxyConn{
				Conn:       conn,
				remoteAddr: addr,
			}
			// These were set already by initClient, override them.
			c.host = addr.srcIP.String()
			c.port = addr.srcPort
		}
		// At this point, err is either:
		//  - nil => we parsed the proxy protocol header successfully
		//  - errProxyProtoUnrecognized => we didn't detect proxy protocol at all
		// We only clear the pre-read if we successfully read the protocol header
		// so that the next step doesn't re-read it. Otherwise we have to assume
		// that it's a non-proxied connection and we want the pre-read to remain
		// for the next step.
		if err == nil {
			pre = nil
		}
		// Because we have ProxyProtocol enabled, our earlier INFO message didn't
		// include the client_ip. If we need to send it again then we will include
		// it, but sending it here immediately can confuse clients who have just
		// PING'd.
		infoBytes = c.generateClientInfoJSON(info, true)
	}

	// Check for TLS
	if !isClosed && tlsRequired {
		if s.connRateCounter != nil && !s.connRateCounter.allow() {
			c.mu.Unlock()
			c.sendErr("Connection throttling is active. Please try again later.")
			c.closeConnection(MaxConnectionsExceeded)
			return nil
		}

		// If we have a prebuffer create a multi-reader.
		if len(pre) > 0 {
			c.nc = &tlsMixConn{c.nc, bytes.NewBuffer(pre)}
			// Clear pre so it is not parsed.
			pre = nil
		}
		// Performs server-side TLS handshake.
		if err := c.doTLSServerHandshake(_EMPTY_, opts.TLSConfig, opts.TLSTimeout, opts.TLSPinnedCerts); err != nil {
			c.mu.Unlock()
			return nil
		}
	}

	// Now, send the INFO if it was delayed
	if !isClosed && tlsFirst {
		c.flags.set(didTLSFirst)
		c.sendProtoNow(infoBytes)
		// Check closed status
		isClosed = c.isClosed()
	}

	// Connection could have been closed while sending the INFO proto.
	if isClosed {
		c.mu.Unlock()
		// We need to call closeConnection() to make sure that proper cleanup is done.
		c.closeConnection(WriteError)
		return nil
	}

	// Check for Auth. We schedule this timer after the TLS handshake to avoid
	// the race where the timer fires during the handshake and causes the
	// server to write bad data to the socket. See issue #432.
	if authRequired {
		c.setAuthTimer(secondsToDuration(opts.AuthTimeout))
	}

	// Do final client initialization

	// Set the Ping timer. Will be reset once connect was received.
	c.setPingTimer()

	// Spin up the read loop. Any remaining pre-read bytes are passed along.
	s.startGoRoutine(func() { c.readLoop(pre) })

	// Spin up the write loop.
	s.startGoRoutine(func() { c.writeLoop() })

	if tlsRequired {
		c.Debugf("TLS handshake complete")
		cs := c.nc.(*tls.Conn).ConnectionState()
		c.Debugf("TLS version %s, cipher suite %s", tlsVersion(cs.Version), tls.CipherSuiteName(cs.CipherSuite))
	}

	c.mu.Unlock()

	return c
}
3558

3559
// This will save off a closed client in a ring buffer such that
3560
// /connz can inspect. Useful for debugging, etc.
3561
func (s *Server) saveClosedClient(c *client, nc net.Conn, subs map[string]*subscription, reason ClosedState) {
12,369✔
3562
        now := time.Now()
12,369✔
3563

12,369✔
3564
        s.accountDisconnectEvent(c, now, reason.String())
12,369✔
3565

12,369✔
3566
        c.mu.Lock()
12,369✔
3567

12,369✔
3568
        cc := &closedClient{}
12,369✔
3569
        cc.fill(c, nc, now, false)
12,369✔
3570
        // Note that cc.fill is using len(c.subs), which may have been set to nil by now,
12,369✔
3571
        // so replace cc.NumSubs with len(subs).
12,369✔
3572
        cc.NumSubs = uint32(len(subs))
12,369✔
3573
        cc.Stop = &now
12,369✔
3574
        cc.Reason = reason.String()
12,369✔
3575

12,369✔
3576
        // Do subs, do not place by default in main ConnInfo
12,369✔
3577
        if len(subs) > 0 {
22,060✔
3578
                cc.subs = make([]SubDetail, 0, len(subs))
9,691✔
3579
                for _, sub := range subs {
159,706✔
3580
                        cc.subs = append(cc.subs, newSubDetail(sub))
150,015✔
3581
                }
150,015✔
3582
        }
3583
        // Hold user as well.
3584
        cc.user = c.getRawAuthUser()
12,369✔
3585
        // Hold account name if not the global account.
12,369✔
3586
        if c.acc != nil && c.acc.Name != globalAccountName {
19,034✔
3587
                cc.acc = c.acc.Name
6,665✔
3588
        }
6,665✔
3589
        cc.JWT = c.opts.JWT
12,369✔
3590
        cc.IssuerKey = issuerForClient(c)
12,369✔
3591
        cc.Tags = c.tags
12,369✔
3592
        cc.NameTag = c.nameTag
12,369✔
3593
        c.mu.Unlock()
12,369✔
3594

12,369✔
3595
        // Place in the ring buffer
12,369✔
3596
        s.mu.Lock()
12,369✔
3597
        if s.closed != nil {
24,737✔
3598
                s.closed.append(cc)
12,368✔
3599
        }
12,368✔
3600
        s.mu.Unlock()
12,369✔
3601
}
3602

3603
// Adds to the list of client and websocket clients connect URLs.
3604
// If there was a change, an INFO protocol is sent to registered clients
3605
// that support async INFO protocols.
3606
// Server lock held on entry.
3607
func (s *Server) addConnectURLsAndSendINFOToClients(curls, wsurls []string) {
9,057✔
3608
        s.updateServerINFOAndSendINFOToClients(curls, wsurls, true)
9,057✔
3609
}
9,057✔
3610

3611
// Removes from the list of client and websocket clients connect URLs.
3612
// If there was a change, an INFO protocol is sent to registered clients
3613
// that support async INFO protocols.
3614
// Server lock held on entry.
3615
func (s *Server) removeConnectURLsAndSendINFOToClients(curls, wsurls []string) {
9,047✔
3616
        s.updateServerINFOAndSendINFOToClients(curls, wsurls, false)
9,047✔
3617
}
9,047✔
3618

3619
// Updates the list of client and websocket clients connect URLs and if any change
3620
// sends an async INFO update to clients that support it.
3621
// Server lock held on entry.
3622
func (s *Server) updateServerINFOAndSendINFOToClients(curls, wsurls []string, add bool) {
18,104✔
3623
        remove := !add
18,104✔
3624
        // Will return true if we need alter the server's Info object.
18,104✔
3625
        updateMap := func(urls []string, m refCountedUrlSet) bool {
54,312✔
3626
                wasUpdated := false
36,208✔
3627
                for _, url := range urls {
54,916✔
3628
                        if add && m.addUrl(url) {
28,060✔
3629
                                wasUpdated = true
9,352✔
3630
                        } else if remove && m.removeUrl(url) {
28,052✔
3631
                                wasUpdated = true
9,344✔
3632
                        }
9,344✔
3633
                }
3634
                return wasUpdated
36,208✔
3635
        }
3636
        cliUpdated := updateMap(curls, s.clientConnectURLsMap)
18,104✔
3637
        wsUpdated := updateMap(wsurls, s.websocket.connectURLsMap)
18,104✔
3638

18,104✔
3639
        updateInfo := func(infoURLs *[]string, urls []string, m refCountedUrlSet) {
36,526✔
3640
                // Recreate the info's slice from the map
18,422✔
3641
                *infoURLs = (*infoURLs)[:0]
18,422✔
3642
                // Add this server client connect ULRs first...
18,422✔
3643
                *infoURLs = append(*infoURLs, urls...)
18,422✔
3644
                // Then the ones from the map
18,422✔
3645
                for url := range m {
42,669✔
3646
                        *infoURLs = append(*infoURLs, url)
24,247✔
3647
                }
24,247✔
3648
        }
3649
        if cliUpdated {
36,180✔
3650
                updateInfo(&s.info.ClientConnectURLs, s.clientConnectURLs, s.clientConnectURLsMap)
18,076✔
3651
        }
18,076✔
3652
        if wsUpdated {
18,450✔
3653
                updateInfo(&s.info.WSConnectURLs, s.websocket.connectURLs, s.websocket.connectURLsMap)
346✔
3654
        }
346✔
3655
        if cliUpdated || wsUpdated {
36,180✔
3656
                // Send to all registered clients that support async INFO protocols.
18,076✔
3657
                s.sendAsyncInfoToClients(cliUpdated, wsUpdated)
18,076✔
3658
        }
18,076✔
3659
}
3660

3661
// Handle closing down a connection when the handshake has timedout.
3662
func tlsTimeout(c *client, conn *tls.Conn) {
38✔
3663
        c.mu.Lock()
38✔
3664
        closed := c.isClosed()
38✔
3665
        c.mu.Unlock()
38✔
3666
        // Check if already closed
38✔
3667
        if closed {
44✔
3668
                return
6✔
3669
        }
6✔
3670
        cs := conn.ConnectionState()
32✔
3671
        if !cs.HandshakeComplete {
38✔
3672
                c.Errorf("TLS handshake timeout")
6✔
3673
                c.sendErr("Secure Connection - TLS Required")
6✔
3674
                c.closeConnection(TLSHandshakeError)
6✔
3675
        }
6✔
3676
}
3677

3678
// tlsVersion returns the human readable form ("1.0" through "1.3") of the
// given TLS protocol version number. Any other value is rendered as
// "Unknown [0x<hex>]".
func tlsVersion(ver uint16) string {
	var name string
	switch {
	case ver == tls.VersionTLS13:
		name = "1.3"
	case ver == tls.VersionTLS12:
		name = "1.2"
	case ver == tls.VersionTLS11:
		name = "1.1"
	case ver == tls.VersionTLS10:
		name = "1.0"
	default:
		return fmt.Sprintf("Unknown [0x%x]", ver)
	}
	return name
}
3692

3693
// tlsVersionFromString converts a TLS version given as "1.0", "1.1", "1.2"
// or "1.3" into the corresponding crypto/tls version number. Any other
// string yields 0 and an "unknown version" error.
func tlsVersionFromString(ver string) (uint16, error) {
	var id uint16
	switch ver {
	case "1.3":
		id = tls.VersionTLS13
	case "1.2":
		id = tls.VersionTLS12
	case "1.1":
		id = tls.VersionTLS11
	case "1.0":
		id = tls.VersionTLS10
	default:
		return 0, fmt.Errorf("unknown version: %v", ver)
	}
	return id, nil
}
3706

3707
// Remove a client or route from our internal accounting.
3708
func (s *Server) removeClient(c *client) {
146,588✔
3709
        // kind is immutable, so can check without lock
146,588✔
3710
        switch c.kind {
146,588✔
3711
        case CLIENT:
10,710✔
3712
                c.mu.Lock()
10,710✔
3713
                cid := c.cid
10,710✔
3714
                updateProtoInfoCount := false
10,710✔
3715
                if c.kind == CLIENT && c.opts.Protocol >= ClientProtoInfo {
19,586✔
3716
                        updateProtoInfoCount = true
8,876✔
3717
                }
8,876✔
3718
                proxyKey := c.proxyKey
10,710✔
3719
                c.mu.Unlock()
10,710✔
3720

10,710✔
3721
                s.mu.Lock()
10,710✔
3722
                delete(s.clients, cid)
10,710✔
3723
                if updateProtoInfoCount {
19,586✔
3724
                        s.cproto--
8,876✔
3725
                }
8,876✔
3726
                if proxyKey != _EMPTY_ {
10,714✔
3727
                        s.removeProxiedConn(proxyKey, cid)
4✔
3728
                }
4✔
3729
                s.mu.Unlock()
10,710✔
3730
        case ROUTER:
62,232✔
3731
                s.removeRoute(c)
62,232✔
3732
        case GATEWAY:
3,764✔
3733
                s.removeRemoteGatewayConnection(c)
3,764✔
3734
        case LEAF:
1,661✔
3735
                s.removeLeafNodeConnection(c)
1,661✔
3736
        }
3737
}
3738

3739
// Remove the connection with id `cid` from the map of connections
3740
// under the public key `key` of the trusted proxies.
3741
//
3742
// Server lock must be held on entry.
3743
func (s *Server) removeProxiedConn(key string, cid uint64) {
8✔
3744
        conns := s.proxiedConns[key]
8✔
3745
        delete(conns, cid)
8✔
3746
        if len(conns) == 0 {
16✔
3747
                delete(s.proxiedConns, key)
8✔
3748
        }
8✔
3749
}
3750

3751
func (s *Server) removeFromTempClients(cid uint64) {
106,263✔
3752
        s.grMu.Lock()
106,263✔
3753
        delete(s.grTmpClients, cid)
106,263✔
3754
        s.grMu.Unlock()
106,263✔
3755
}
106,263✔
3756

3757
func (s *Server) addToTempClients(cid uint64, c *client) bool {
67,360✔
3758
        added := false
67,360✔
3759
        s.grMu.Lock()
67,360✔
3760
        if s.grRunning {
134,714✔
3761
                s.grTmpClients[cid] = c
67,354✔
3762
                added = true
67,354✔
3763
        }
67,354✔
3764
        s.grMu.Unlock()
67,360✔
3765
        return added
67,360✔
3766
}
3767

3768
/////////////////////////////////////////////////////////////////
3769
// These are some helpers for accounting in functional tests.
3770
/////////////////////////////////////////////////////////////////
3771

3772
// NumRoutes will report the number of registered routes.
3773
func (s *Server) NumRoutes() int {
5,561✔
3774
        s.mu.RLock()
5,561✔
3775
        defer s.mu.RUnlock()
5,561✔
3776
        return s.numRoutes()
5,561✔
3777
}
5,561✔
3778

3779
// numRoutes will report the number of registered routes.
3780
// Server lock held on entry
3781
func (s *Server) numRoutes() int {
756,639✔
3782
        var nr int
756,639✔
3783
        s.forEachRoute(func(c *client) {
1,467,659✔
3784
                nr++
711,020✔
3785
        })
711,020✔
3786
        return nr
756,639✔
3787
}
3788

3789
// NumRemotes will report number of registered remotes.
3790
func (s *Server) NumRemotes() int {
×
3791
        s.mu.RLock()
×
3792
        defer s.mu.RUnlock()
×
3793
        return s.numRemotes()
×
3794
}
×
3795

3796
// numRemotes will report number of registered remotes.
3797
// Server lock held on entry
3798
func (s *Server) numRemotes() int {
28,588✔
3799
        return len(s.routes)
28,588✔
3800
}
28,588✔
3801

3802
// NumLeafNodes will report number of leaf node connections.
3803
func (s *Server) NumLeafNodes() int {
3,996✔
3804
        s.mu.RLock()
3,996✔
3805
        defer s.mu.RUnlock()
3,996✔
3806
        return len(s.leafs)
3,996✔
3807
}
3,996✔
3808

3809
// NumClients will report the number of registered clients.
3810
func (s *Server) NumClients() int {
68✔
3811
        s.mu.RLock()
68✔
3812
        defer s.mu.RUnlock()
68✔
3813
        return len(s.clients)
68✔
3814
}
68✔
3815

3816
// GetClient will return the client associated with cid.
3817
func (s *Server) GetClient(cid uint64) *client {
134✔
3818
        return s.getClient(cid)
134✔
3819
}
134✔
3820

3821
// getClient will return the client associated with cid.
3822
func (s *Server) getClient(cid uint64) *client {
164✔
3823
        s.mu.RLock()
164✔
3824
        defer s.mu.RUnlock()
164✔
3825
        return s.clients[cid]
164✔
3826
}
164✔
3827

3828
// GetLeafNode returns the leafnode associated with the cid.
3829
func (s *Server) GetLeafNode(cid uint64) *client {
1✔
3830
        s.mu.RLock()
1✔
3831
        defer s.mu.RUnlock()
1✔
3832
        return s.leafs[cid]
1✔
3833
}
1✔
3834

3835
// NumSubscriptions will report how many subscriptions are active.
3836
func (s *Server) NumSubscriptions() uint32 {
375✔
3837
        s.mu.RLock()
375✔
3838
        defer s.mu.RUnlock()
375✔
3839
        return s.numSubscriptions()
375✔
3840
}
375✔
3841

3842
// numSubscriptions will report how many subscriptions are active.
3843
// Lock should be held.
3844
func (s *Server) numSubscriptions() uint32 {
26,953✔
3845
        var subs int
26,953✔
3846
        s.accounts.Range(func(k, v any) bool {
88,055✔
3847
                acc := v.(*Account)
61,102✔
3848
                subs += acc.TotalSubs()
61,102✔
3849
                return true
61,102✔
3850
        })
61,102✔
3851
        return uint32(subs)
26,953✔
3852
}
3853

3854
// NumSlowConsumers will report the number of slow consumers.
3855
func (s *Server) NumSlowConsumers() int64 {
1✔
3856
        return atomic.LoadInt64(&s.slowConsumers)
1✔
3857
}
1✔
3858

3859
// NumStalledClients will report the total number of times clients have been stalled.
3860
func (s *Server) NumStalledClients() int64 {
×
3861
        return atomic.LoadInt64(&s.stalls)
×
3862
}
×
3863

3864
// NumSlowConsumersClients will report the number of slow consumers clients.
3865
func (s *Server) NumSlowConsumersClients() uint64 {
37,480✔
3866
        return s.scStats.clients.Load()
37,480✔
3867
}
37,480✔
3868

3869
// NumSlowConsumersRoutes will report the number of slow consumers routes.
3870
func (s *Server) NumSlowConsumersRoutes() uint64 {
37,480✔
3871
        return s.scStats.routes.Load()
37,480✔
3872
}
37,480✔
3873

3874
// NumSlowConsumersGateways will report the number of slow consumers leafs.
3875
func (s *Server) NumSlowConsumersGateways() uint64 {
37,482✔
3876
        return s.scStats.gateways.Load()
37,482✔
3877
}
37,482✔
3878

3879
// NumSlowConsumersLeafs will report the number of slow consumers leafs.
3880
func (s *Server) NumSlowConsumersLeafs() uint64 {
37,482✔
3881
        return s.scStats.leafs.Load()
37,482✔
3882
}
37,482✔
3883

3884
// NumStaleConnections will report the number of stale connections.
3885
func (s *Server) NumStaleConnections() int64 {
4✔
3886
        return atomic.LoadInt64(&s.staleConnections)
4✔
3887
}
4✔
3888

3889
// NumStaleConnectionsClients will report the number of stale client connections.
3890
func (s *Server) NumStaleConnectionsClients() uint64 {
37,484✔
3891
        return s.staleStats.clients.Load()
37,484✔
3892
}
37,484✔
3893

3894
// NumStaleConnectionsRoutes will report the number of stale route connections.
3895
func (s *Server) NumStaleConnectionsRoutes() uint64 {
37,484✔
3896
        return s.staleStats.routes.Load()
37,484✔
3897
}
37,484✔
3898

3899
// NumStaleConnectionsGateways will report the number of stale gateway connections.
3900
func (s *Server) NumStaleConnectionsGateways() uint64 {
37,484✔
3901
        return s.staleStats.gateways.Load()
37,484✔
3902
}
37,484✔
3903

3904
// NumStaleConnectionsLeafs will report the number of stale leaf connections.
3905
func (s *Server) NumStaleConnectionsLeafs() uint64 {
37,484✔
3906
        return s.staleStats.leafs.Load()
37,484✔
3907
}
37,484✔
3908

3909
// ConfigTime will report the last time the server configuration was loaded.
3910
func (s *Server) ConfigTime() time.Time {
10✔
3911
        s.mu.RLock()
10✔
3912
        defer s.mu.RUnlock()
10✔
3913
        return s.configTime
10✔
3914
}
10✔
3915

3916
// Addr will return the net.Addr object for the current listener.
3917
func (s *Server) Addr() net.Addr {
226✔
3918
        s.mu.RLock()
226✔
3919
        defer s.mu.RUnlock()
226✔
3920
        if s.listener == nil {
226✔
3921
                return nil
×
3922
        }
×
3923
        return s.listener.Addr()
226✔
3924
}
3925

3926
// MonitorAddr will return the net.Addr object for the monitoring listener.
3927
func (s *Server) MonitorAddr() *net.TCPAddr {
136✔
3928
        s.mu.RLock()
136✔
3929
        defer s.mu.RUnlock()
136✔
3930
        if s.http == nil {
136✔
3931
                return nil
×
3932
        }
×
3933
        return s.http.Addr().(*net.TCPAddr)
136✔
3934
}
3935

3936
// ClusterAddr returns the net.Addr object for the route listener.
3937
func (s *Server) ClusterAddr() *net.TCPAddr {
38✔
3938
        s.mu.RLock()
38✔
3939
        defer s.mu.RUnlock()
38✔
3940
        if s.routeListener == nil {
38✔
3941
                return nil
×
3942
        }
×
3943
        return s.routeListener.Addr().(*net.TCPAddr)
38✔
3944
}
3945

3946
// ProfilerAddr returns the net.Addr object for the profiler listener.
3947
func (s *Server) ProfilerAddr() *net.TCPAddr {
×
3948
        s.mu.RLock()
×
3949
        defer s.mu.RUnlock()
×
3950
        if s.profiler == nil {
×
3951
                return nil
×
3952
        }
×
3953
        return s.profiler.Addr().(*net.TCPAddr)
×
3954
}
3955

3956
func (s *Server) readyForConnections(d time.Duration) error {
10,251✔
3957
        // Snapshot server options.
10,251✔
3958
        opts := s.getOpts()
10,251✔
3959

10,251✔
3960
        type info struct {
10,251✔
3961
                ok  bool
10,251✔
3962
                err error
10,251✔
3963
        }
10,251✔
3964
        chk := make(map[string]info)
10,251✔
3965

10,251✔
3966
        end := time.Now().Add(d)
10,251✔
3967
        for time.Now().Before(end) {
49,395✔
3968
                s.mu.RLock()
39,144✔
3969
                chk["server"] = info{ok: s.listener != nil || opts.DontListen, err: s.listenerErr}
39,144✔
3970
                chk["route"] = info{ok: (opts.Cluster.Port == 0 || s.routeListener != nil), err: s.routeListenerErr}
39,144✔
3971
                chk["gateway"] = info{ok: (opts.Gateway.Name == _EMPTY_ || s.gatewayListener != nil), err: s.gatewayListenerErr}
39,144✔
3972
                chk["leafnode"] = info{ok: (opts.LeafNode.Port == 0 || s.leafNodeListener != nil), err: s.leafNodeListenerErr}
39,144✔
3973
                chk["websocket"] = info{ok: (opts.Websocket.Port == 0 || s.websocket.listener != nil), err: s.websocket.listenerErr}
39,144✔
3974
                chk["mqtt"] = info{ok: (opts.MQTT.Port == 0 || s.mqtt.listener != nil), err: s.mqtt.listenerErr}
39,144✔
3975
                s.mu.RUnlock()
39,144✔
3976

39,144✔
3977
                var numOK int
39,144✔
3978
                for _, inf := range chk {
274,008✔
3979
                        if inf.ok {
439,782✔
3980
                                numOK++
204,918✔
3981
                        }
204,918✔
3982
                }
3983
                if numOK == len(chk) {
49,388✔
3984
                        // In the case of DontListen option (no accept loop), we still want
10,244✔
3985
                        // to make sure that Start() has done all the work, so we wait on
10,244✔
3986
                        // that.
10,244✔
3987
                        if opts.DontListen {
10,244✔
3988
                                select {
×
3989
                                case <-s.startupComplete:
×
3990
                                case <-time.After(d):
×
3991
                                        return fmt.Errorf("failed to be ready for connections after %s: startup did not complete", d)
×
3992
                                }
3993
                        }
3994
                        return nil
10,244✔
3995
                }
3996
                if d > 25*time.Millisecond {
33,797✔
3997
                        time.Sleep(25 * time.Millisecond)
4,897✔
3998
                }
4,897✔
3999
        }
4000

4001
        failed := make([]string, 0, len(chk))
7✔
4002
        for name, inf := range chk {
49✔
4003
                if inf.ok && inf.err != nil {
42✔
4004
                        failed = append(failed, fmt.Sprintf("%s(ok, but %s)", name, inf.err))
×
4005
                }
×
4006
                if !inf.ok && inf.err == nil {
51✔
4007
                        failed = append(failed, name)
9✔
4008
                }
9✔
4009
                if !inf.ok && inf.err != nil {
42✔
4010
                        failed = append(failed, fmt.Sprintf("%s(%s)", name, inf.err))
×
4011
                }
×
4012
        }
4013

4014
        return fmt.Errorf(
7✔
4015
                "failed to be ready for connections after %s: %s",
7✔
4016
                d, strings.Join(failed, ", "),
7✔
4017
        )
7✔
4018
}
4019

4020
// ReadyForConnections returns `true` if the server is ready to accept clients
4021
// and, if routing is enabled, route connections. If after the duration
4022
// `dur` the server is still not ready, returns `false`.
4023
func (s *Server) ReadyForConnections(dur time.Duration) bool {
799✔
4024
        return s.readyForConnections(dur) == nil
799✔
4025
}
799✔
4026

4027
// Quick utility to function to tell if the server supports headers.
4028
func (s *Server) supportsHeaders() bool {
185,276✔
4029
        if s == nil {
185,276✔
4030
                return false
×
4031
        }
×
4032
        return !(s.getOpts().NoHeaderSupport)
185,276✔
4033
}
4034

4035
// ID returns the server's ID
4036
func (s *Server) ID() string {
54,445✔
4037
        return s.info.ID
54,445✔
4038
}
54,445✔
4039

4040
// NodeName returns the node name for this server.
4041
func (s *Server) NodeName() string {
218✔
4042
        return getHash(s.info.Name)
218✔
4043
}
218✔
4044

4045
// Name returns the server's name. This will be the same as the ID if it was not set.
4046
func (s *Server) Name() string {
215,406✔
4047
        return s.info.Name
215,406✔
4048
}
215,406✔
4049

4050
func (s *Server) String() string {
5,087✔
4051
        return s.info.Name
5,087✔
4052
}
5,087✔
4053

4054
// pprofLabels is a set of key/value pairs that setGoRoutineLabels applies
// as pprof labels.
type pprofLabels map[string]string
4055

4056
func setGoRoutineLabels(tags ...pprofLabels) {
358,186✔
4057
        var labels []string
358,186✔
4058
        for _, m := range tags {
408,183✔
4059
                for k, v := range m {
227,636✔
4060
                        labels = append(labels, k, v)
177,639✔
4061
                }
177,639✔
4062
        }
4063
        if len(labels) > 0 {
408,182✔
4064
                pprof.SetGoroutineLabels(
49,996✔
4065
                        pprof.WithLabels(context.Background(), pprof.Labels(labels...)),
49,996✔
4066
                )
49,996✔
4067
        }
49,996✔
4068
}
4069

4070
func (s *Server) startGoRoutine(f func(), tags ...pprofLabels) bool {
336,941✔
4071
        var started bool
336,941✔
4072
        s.grMu.Lock()
336,941✔
4073
        defer s.grMu.Unlock()
336,941✔
4074
        if s.grRunning {
673,752✔
4075
                s.grWG.Add(1)
336,811✔
4076
                go func() {
673,622✔
4077
                        setGoRoutineLabels(tags...)
336,811✔
4078
                        f()
336,811✔
4079
                }()
336,811✔
4080
                started = true
336,811✔
4081
        }
4082
        return started
336,941✔
4083
}
4084

4085
func (s *Server) numClosedConns() int {
102✔
4086
        s.mu.RLock()
102✔
4087
        defer s.mu.RUnlock()
102✔
4088
        return s.closed.len()
102✔
4089
}
102✔
4090

4091
func (s *Server) totalClosedConns() uint64 {
43✔
4092
        s.mu.RLock()
43✔
4093
        defer s.mu.RUnlock()
43✔
4094
        return s.closed.totalConns()
43✔
4095
}
43✔
4096

4097
func (s *Server) closedClients() []*closedClient {
8✔
4098
        s.mu.RLock()
8✔
4099
        defer s.mu.RUnlock()
8✔
4100
        return s.closed.closedClients()
8✔
4101
}
8✔
4102

4103
// getClientConnectURLs returns suitable URLs for clients to connect to the listen
4104
// port based on the server options' Host and Port. If the Host corresponds to
4105
// "any" interfaces, this call returns the list of resolved IP addresses.
4106
// If ClientAdvertise is set, returns the client advertise host and port.
4107
// The server lock is assumed held on entry.
4108
func (s *Server) getClientConnectURLs() []string {
6,702✔
4109
        // Snapshot server options.
6,702✔
4110
        opts := s.getOpts()
6,702✔
4111
        // Ignore error here since we know that if there is client advertise, the
6,702✔
4112
        // parseHostPort is correct because we did it right before calling this
6,702✔
4113
        // function in Server.New().
6,702✔
4114
        urls, _ := s.getConnectURLs(opts.ClientAdvertise, opts.Host, opts.Port)
6,702✔
4115
        return urls
6,702✔
4116
}
6,702✔
4117

4118
// Generic version that will return an array of URLs based on the given
4119
// advertise, host and port values.
4120
func (s *Server) getConnectURLs(advertise, host string, port int) ([]string, error) {
6,824✔
4121
        urls := make([]string, 0, 1)
6,824✔
4122

6,824✔
4123
        // short circuit if advertise is set
6,824✔
4124
        if advertise != "" {
6,827✔
4125
                h, p, err := parseHostPort(advertise, port)
3✔
4126
                if err != nil {
3✔
4127
                        return nil, err
×
4128
                }
×
4129
                urls = append(urls, net.JoinHostPort(h, strconv.Itoa(p)))
3✔
4130
        } else {
6,821✔
4131
                sPort := strconv.Itoa(port)
6,821✔
4132
                _, ips, err := s.getNonLocalIPsIfHostIsIPAny(host, true)
6,821✔
4133
                for _, ip := range ips {
8,165✔
4134
                        urls = append(urls, net.JoinHostPort(ip, sPort))
1,344✔
4135
                }
1,344✔
4136
                if err != nil || len(urls) == 0 {
12,970✔
4137
                        // We are here if s.opts.Host is not "0.0.0.0" nor "::", or if for some
6,149✔
4138
                        // reason we could not add any URL in the loop above.
6,149✔
4139
                        // We had a case where a Windows VM was hosed and would have err == nil
6,149✔
4140
                        // and not add any address in the array in the loop above, and we
6,149✔
4141
                        // ended-up returning 0.0.0.0, which is problematic for Windows clients.
6,149✔
4142
                        // Check for 0.0.0.0 or :: specifically, and ignore if that's the case.
6,149✔
4143
                        if host == "0.0.0.0" || host == "::" {
6,149✔
4144
                                s.Errorf("Address %q can not be resolved properly", host)
×
4145
                        } else {
6,149✔
4146
                                urls = append(urls, net.JoinHostPort(host, sPort))
6,149✔
4147
                        }
6,149✔
4148
                }
4149
        }
4150
        return urls, nil
6,824✔
4151
}
4152

4153
// Returns an array of non local IPs if the provided host is
4154
// 0.0.0.0 or ::. It returns the first resolved if `all` is
4155
// false.
4156
// The boolean indicate if the provided host was 0.0.0.0 (or ::)
4157
// so that if the returned array is empty caller can decide
4158
// what to do next.
4159
func (s *Server) getNonLocalIPsIfHostIsIPAny(host string, all bool) (bool, []string, error) {
11,713✔
4160
        ip := net.ParseIP(host)
11,713✔
4161
        // If this is not an IP, we are done
11,713✔
4162
        if ip == nil {
11,736✔
4163
                return false, nil, nil
23✔
4164
        }
23✔
4165
        // If this is not 0.0.0.0 or :: we have nothing to do.
4166
        if !ip.IsUnspecified() {
22,356✔
4167
                return false, nil, nil
10,666✔
4168
        }
10,666✔
4169
        s.Debugf("Get non local IPs for %q", host)
1,024✔
4170
        var ips []string
1,024✔
4171
        ifaces, _ := net.Interfaces()
1,024✔
4172
        for _, i := range ifaces {
4,096✔
4173
                addrs, _ := i.Addrs()
3,072✔
4174
                for _, addr := range addrs {
7,840✔
4175
                        switch v := addr.(type) {
4,768✔
4176
                        case *net.IPNet:
4,768✔
4177
                                ip = v.IP
4,768✔
4178
                        case *net.IPAddr:
×
4179
                                ip = v.IP
×
4180
                        }
4181
                        ipStr := ip.String()
4,768✔
4182
                        // Skip non global unicast addresses
4,768✔
4183
                        if !ip.IsGlobalUnicast() || ip.IsUnspecified() {
7,488✔
4184
                                ip = nil
2,720✔
4185
                                continue
2,720✔
4186
                        }
4187
                        s.Debugf("  ip=%s", ipStr)
2,048✔
4188
                        ips = append(ips, ipStr)
2,048✔
4189
                        if !all {
2,752✔
4190
                                break
704✔
4191
                        }
4192
                }
4193
        }
4194
        return true, ips, nil
1,024✔
4195
}
4196

4197
// resolveHostPorts returns the "host:port" strings for the given listener,
// whose address must be a *net.TCPAddr. If the listener's IP is specified,
// that single address is returned. If it is unspecified, one entry is
// returned for every address (*net.IPNet or *net.IPAddr) found on the
// network interfaces, all using the listener's port.
func resolveHostPorts(addr net.Listener) []string {
	tcpAddr := addr.Addr().(*net.TCPAddr)
	port := strconv.Itoa(tcpAddr.Port)
	if !tcpAddr.IP.IsUnspecified() {
		return []string{net.JoinHostPort(tcpAddr.IP.String(), port)}
	}

	hostPorts := make([]string, 0)
	// Errors are ignored: resolution is done on a best effort basis.
	ifaces, _ := net.Interfaces()
	for _, iface := range ifaces {
		ifAddrs, _ := iface.Addrs()
		for _, a := range ifAddrs {
			var ip net.IP
			switch v := a.(type) {
			case *net.IPNet:
				ip = v.IP
			case *net.IPAddr:
				ip = v.IP
			default:
				continue
			}
			hostPorts = append(hostPorts, net.JoinHostPort(ip.String(), port))
		}
	}
	return hostPorts
}
4225

4226
// format the address of a net.Listener with a protocol
4227
func formatURL(protocol string, addr net.Listener) []string {
15✔
4228
        hostports := resolveHostPorts(addr)
15✔
4229
        for i, hp := range hostports {
58✔
4230
                hostports[i] = fmt.Sprintf("%s://%s", protocol, hp)
43✔
4231
        }
43✔
4232
        return hostports
15✔
4233
}
4234

4235
// Ports describes the URLs at which the server can be contacted, grouped
// by service. Groups without any URL are omitted from the JSON encoding.
type Ports struct {
	// Nats holds the URLs of the client listener.
	Nats []string `json:"nats,omitempty"`
	// Monitoring holds the URLs of the monitoring (http/https) listener.
	Monitoring []string `json:"monitoring,omitempty"`
	// Cluster holds the URLs of the route listener.
	Cluster []string `json:"cluster,omitempty"`
	// Profile holds the URLs of the profiler listener.
	Profile []string `json:"profile,omitempty"`
	// WebSocket holds the URLs of the websocket listener.
	WebSocket []string `json:"websocket,omitempty"`
}
4243

4244
// PortsInfo attempts to resolve all the ports. If after maxWait the ports are not
4245
// resolved, it returns nil. Otherwise it returns a Ports struct
4246
// describing ports where the server can be contacted
4247
func (s *Server) PortsInfo(maxWait time.Duration) *Ports {
6✔
4248
        if s.readyForListeners(maxWait) {
12✔
4249
                opts := s.getOpts()
6✔
4250

6✔
4251
                s.mu.RLock()
6✔
4252
                tls := s.info.TLSRequired
6✔
4253
                listener := s.listener
6✔
4254
                httpListener := s.http
6✔
4255
                clusterListener := s.routeListener
6✔
4256
                profileListener := s.profiler
6✔
4257
                wsListener := s.websocket.listener
6✔
4258
                wss := s.websocket.tls
6✔
4259
                s.mu.RUnlock()
6✔
4260

6✔
4261
                ports := Ports{}
6✔
4262

6✔
4263
                if listener != nil {
12✔
4264
                        natsProto := "nats"
6✔
4265
                        if tls {
7✔
4266
                                natsProto = "tls"
1✔
4267
                        }
1✔
4268
                        ports.Nats = formatURL(natsProto, listener)
6✔
4269
                }
4270

4271
                if httpListener != nil {
8✔
4272
                        monProto := "http"
2✔
4273
                        if opts.HTTPSPort != 0 {
2✔
4274
                                monProto = "https"
×
4275
                        }
×
4276
                        ports.Monitoring = formatURL(monProto, httpListener)
2✔
4277
                }
4278

4279
                if clusterListener != nil {
8✔
4280
                        clusterProto := "nats"
2✔
4281
                        if opts.Cluster.TLSConfig != nil {
2✔
4282
                                clusterProto = "tls"
×
4283
                        }
×
4284
                        ports.Cluster = formatURL(clusterProto, clusterListener)
2✔
4285
                }
4286

4287
                if profileListener != nil {
8✔
4288
                        ports.Profile = formatURL("http", profileListener)
2✔
4289
                }
2✔
4290

4291
                if wsListener != nil {
9✔
4292
                        protocol := wsSchemePrefix
3✔
4293
                        if wss {
6✔
4294
                                protocol = wsSchemePrefixTLS
3✔
4295
                        }
3✔
4296
                        ports.WebSocket = formatURL(protocol, wsListener)
3✔
4297
                }
4298

4299
                return &ports
6✔
4300
        }
4301

4302
        return nil
×
4303
}
4304

4305
// Returns the portsFile. If a non-empty dirHint is provided, the dirHint
4306
// path is used instead of the server option value
4307
func (s *Server) portFile(dirHint string) string {
6✔
4308
        dirname := s.getOpts().PortsFileDir
6✔
4309
        if dirHint != "" {
12✔
4310
                dirname = dirHint
6✔
4311
        }
6✔
4312
        if dirname == _EMPTY_ {
6✔
4313
                return _EMPTY_
×
4314
        }
×
4315
        return filepath.Join(dirname, fmt.Sprintf("%s_%d.ports", filepath.Base(os.Args[0]), os.Getpid()))
6✔
4316
}
4317

4318
// Delete the ports file. If a non-empty dirHint is provided, the dirHint
4319
// path is used instead of the server option value
4320
func (s *Server) deletePortsFile(hintDir string) {
3✔
4321
        portsFile := s.portFile(hintDir)
3✔
4322
        if portsFile != "" {
6✔
4323
                if err := os.Remove(portsFile); err != nil {
3✔
4324
                        s.Errorf("Error cleaning up ports file %s: %v", portsFile, err)
×
4325
                }
×
4326
        }
4327
}
4328

4329
// Writes a file with a serialized Ports to the specified ports_file_dir.
4330
// The name of the file is `exename_pid.ports`, typically nats-server_pid.ports.
4331
// if ports file is not set, this function has no effect
4332
func (s *Server) logPorts() {
3✔
4333
        opts := s.getOpts()
3✔
4334
        portsFile := s.portFile(opts.PortsFileDir)
3✔
4335
        if portsFile != _EMPTY_ {
6✔
4336
                go func() {
6✔
4337
                        info := s.PortsInfo(5 * time.Second)
3✔
4338
                        if info == nil {
3✔
4339
                                s.Errorf("Unable to resolve the ports in the specified time")
×
4340
                                return
×
4341
                        }
×
4342
                        data, err := json.Marshal(info)
3✔
4343
                        if err != nil {
3✔
4344
                                s.Errorf("Error marshaling ports file: %v", err)
×
4345
                                return
×
4346
                        }
×
4347
                        if err := os.WriteFile(portsFile, data, 0666); err != nil {
3✔
4348
                                s.Errorf("Error writing ports file (%s): %v", portsFile, err)
×
4349
                                return
×
4350
                        }
×
4351

4352
                }()
4353
        }
4354
}
4355

4356
// waits until a calculated list of listeners is resolved or a timeout
4357
func (s *Server) readyForListeners(dur time.Duration) bool {
6✔
4358
        end := time.Now().Add(dur)
6✔
4359
        for time.Now().Before(end) {
12✔
4360
                s.mu.RLock()
6✔
4361
                listeners := s.serviceListeners()
6✔
4362
                s.mu.RUnlock()
6✔
4363
                if len(listeners) == 0 {
6✔
4364
                        return false
×
4365
                }
×
4366

4367
                ok := true
6✔
4368
                for _, l := range listeners {
21✔
4369
                        if l == nil {
15✔
4370
                                ok = false
×
4371
                                break
×
4372
                        }
4373
                }
4374
                if ok {
12✔
4375
                        return true
6✔
4376
                }
6✔
4377
                select {
×
4378
                case <-s.quitCh:
×
4379
                        return false
×
4380
                case <-time.After(25 * time.Millisecond):
×
4381
                        // continue - unable to select from quit - we are still running
4382
                }
4383
        }
4384
        return false
×
4385
}
4386

4387
// returns a list of listeners that are intended for the process
4388
// if the entry is nil, the interface is yet to be resolved
4389
func (s *Server) serviceListeners() []net.Listener {
6✔
4390
        listeners := make([]net.Listener, 0)
6✔
4391
        opts := s.getOpts()
6✔
4392
        listeners = append(listeners, s.listener)
6✔
4393
        if opts.Cluster.Port != 0 {
8✔
4394
                listeners = append(listeners, s.routeListener)
2✔
4395
        }
2✔
4396
        if opts.HTTPPort != 0 || opts.HTTPSPort != 0 {
8✔
4397
                listeners = append(listeners, s.http)
2✔
4398
        }
2✔
4399
        if opts.ProfPort != 0 {
8✔
4400
                listeners = append(listeners, s.profiler)
2✔
4401
        }
2✔
4402
        if opts.Websocket.Port != 0 {
9✔
4403
                listeners = append(listeners, s.websocket.listener)
3✔
4404
        }
3✔
4405
        return listeners
6✔
4406
}
4407

4408
// Returns true if in lame duck mode.
4409
func (s *Server) isLameDuckMode() bool {
21,257✔
4410
        s.mu.RLock()
21,257✔
4411
        defer s.mu.RUnlock()
21,257✔
4412
        return s.ldm
21,257✔
4413
}
21,257✔
4414

4415
// LameDuckShutdown will perform a lame duck shutdown of NATS, whereby
4416
// the client listener is closed, existing client connections are
4417
// kicked, Raft leaderships are transferred, JetStream is shutdown
4418
// and then finally shutdown the the NATS Server itself.
4419
// This function blocks and will not return until the NATS Server
4420
// has completed the entire shutdown operation.
4421
func (s *Server) LameDuckShutdown() {
1✔
4422
        s.lameDuckMode()
1✔
4423
}
1✔
4424

4425
// This function will close the client listener then close the clients
4426
// at some interval to avoid a reconnect storm.
4427
// We will also transfer any raft leaders and shutdown JetStream.
4428
func (s *Server) lameDuckMode() {
6✔
4429
        s.mu.Lock()
6✔
4430
        // Check if there is actually anything to do
6✔
4431
        if s.isShuttingDown() || s.ldm || s.listener == nil {
6✔
4432
                s.mu.Unlock()
×
4433
                return
×
4434
        }
×
4435
        s.Noticef("Entering lame duck mode, stop accepting new clients")
6✔
4436
        s.ldm = true
6✔
4437
        s.sendLDMShutdownEventLocked()
6✔
4438
        expected := 1
6✔
4439
        s.listener.Close()
6✔
4440
        s.listener = nil
6✔
4441
        expected += s.closeWebsocketServer()
6✔
4442
        s.ldmCh = make(chan bool, expected)
6✔
4443
        opts := s.getOpts()
6✔
4444
        gp := opts.LameDuckGracePeriod
6✔
4445
        // For tests, we want the grace period to be in some cases bigger
6✔
4446
        // than the ldm duration, so to by-pass the validateOptions() check,
6✔
4447
        // we use negative number and flip it here.
6✔
4448
        if gp < 0 {
11✔
4449
                gp *= -1
5✔
4450
        }
5✔
4451
        s.mu.Unlock()
6✔
4452

6✔
4453
        // If we are running any raftNodes transfer leaders.
6✔
4454
        if hadTransfers := s.transferRaftLeaders(); hadTransfers {
11✔
4455
                // They will transfer leadership quickly, but wait here for a second.
5✔
4456
                select {
5✔
4457
                case <-time.After(time.Second):
5✔
4458
                case <-s.quitCh:
×
4459
                        return
×
4460
                }
4461
        }
4462

4463
        // Now check and shutdown jetstream.
4464
        s.shutdownJetStream()
6✔
4465

6✔
4466
        // Now shutdown the nodes
6✔
4467
        s.shutdownRaftNodes()
6✔
4468

6✔
4469
        // Wait for accept loops to be done to make sure that no new
6✔
4470
        // client can connect
6✔
4471
        for i := 0; i < expected; i++ {
12✔
4472
                <-s.ldmCh
6✔
4473
        }
6✔
4474

4475
        s.mu.Lock()
6✔
4476
        // Need to recheck few things
6✔
4477
        if s.isShuttingDown() || len(s.clients) == 0 {
6✔
4478
                s.mu.Unlock()
×
4479
                // If there is no client, we need to call Shutdown() to complete
×
4480
                // the LDMode. If server has been shutdown while lock was released,
×
4481
                // calling Shutdown() should be no-op.
×
4482
                s.Shutdown()
×
4483
                return
×
4484
        }
×
4485
        dur := int64(opts.LameDuckDuration)
6✔
4486
        dur -= int64(gp)
6✔
4487
        if dur <= 0 {
11✔
4488
                dur = int64(time.Second)
5✔
4489
        }
5✔
4490
        numClients := int64(len(s.clients))
6✔
4491
        batch := 1
6✔
4492
        // Sleep interval between each client connection close.
6✔
4493
        var si int64
6✔
4494
        if numClients != 0 {
12✔
4495
                si = dur / numClients
6✔
4496
        }
6✔
4497
        if si < 1 {
6✔
4498
                // Should not happen (except in test with very small LD duration), but
×
4499
                // if there are too many clients, batch the number of close and
×
4500
                // use a tiny sleep interval that will result in yield likely.
×
4501
                si = 1
×
4502
                batch = int(numClients / dur)
×
4503
        } else if si > int64(time.Second) {
7✔
4504
                // Conversely, there is no need to sleep too long between clients
1✔
4505
                // and spread say 10 clients for the 2min duration. Sleeping no
1✔
4506
                // more than 1sec.
1✔
4507
                si = int64(time.Second)
1✔
4508
        }
1✔
4509

4510
        // Now capture all clients
4511
        clients := make([]*client, 0, len(s.clients))
6✔
4512
        for _, client := range s.clients {
22✔
4513
                clients = append(clients, client)
16✔
4514
        }
16✔
4515
        // Now that we know that no new client can be accepted,
4516
        // send INFO to routes and clients to notify this state.
4517
        s.sendLDMToRoutes()
6✔
4518
        s.sendLDMToClients()
6✔
4519
        s.mu.Unlock()
6✔
4520

6✔
4521
        t := time.NewTimer(gp)
6✔
4522
        // Delay start of closing of client connections in case
6✔
4523
        // we have several servers that we want to signal to enter LD mode
6✔
4524
        // and not have their client reconnect to each other.
6✔
4525
        select {
6✔
4526
        case <-t.C:
6✔
4527
                s.Noticef("Closing existing clients")
6✔
4528
        case <-s.quitCh:
×
4529
                t.Stop()
×
4530
                return
×
4531
        }
4532
        for i, client := range clients {
22✔
4533
                client.closeConnection(ServerShutdown)
16✔
4534
                if i == len(clients)-1 {
22✔
4535
                        break
6✔
4536
                }
4537
                if batch == 1 || i%batch == 0 {
20✔
4538
                        // We pick a random interval which will be at least si/2
10✔
4539
                        v := rand.Int63n(si)
10✔
4540
                        if v < si/2 {
18✔
4541
                                v = si / 2
8✔
4542
                        }
8✔
4543
                        t.Reset(time.Duration(v))
10✔
4544
                        // Sleep for given interval or bail out if kicked by Shutdown().
10✔
4545
                        select {
10✔
4546
                        case <-t.C:
10✔
4547
                        case <-s.quitCh:
×
4548
                                t.Stop()
×
4549
                                return
×
4550
                        }
4551
                }
4552
        }
4553
        s.Shutdown()
6✔
4554
        s.WaitForShutdown()
6✔
4555
}
4556

4557
// Send an INFO update to routes with the indication that this server is in LDM mode.
4558
// Server lock is held on entry.
4559
func (s *Server) sendLDMToRoutes() {
6✔
4560
        s.routeInfo.LameDuckMode = true
6✔
4561
        infoJSON := generateInfoJSON(&s.routeInfo)
6✔
4562
        s.forEachRemote(func(r *client) {
17✔
4563
                r.mu.Lock()
11✔
4564
                r.enqueueProto(infoJSON)
11✔
4565
                r.mu.Unlock()
11✔
4566
        })
11✔
4567
        // Clear now so that we notify only once, should we have to send other INFOs.
4568
        s.routeInfo.LameDuckMode = false
6✔
4569
}
4570

4571
// Send an INFO update to clients with the indication that this server is in
4572
// LDM mode and with only URLs of other nodes.
4573
// Server lock is held on entry.
4574
func (s *Server) sendLDMToClients() {
6✔
4575
        s.info.LameDuckMode = true
6✔
4576
        // Clear this so that if there are further updates, we don't send our URLs.
6✔
4577
        s.clientConnectURLs = s.clientConnectURLs[:0]
6✔
4578
        if s.websocket.connectURLs != nil {
6✔
4579
                s.websocket.connectURLs = s.websocket.connectURLs[:0]
×
4580
        }
×
4581
        // Reset content first.
4582
        s.info.ClientConnectURLs = s.info.ClientConnectURLs[:0]
6✔
4583
        s.info.WSConnectURLs = s.info.WSConnectURLs[:0]
6✔
4584
        // Only add the other nodes if we are allowed to.
6✔
4585
        if !s.getOpts().Cluster.NoAdvertise {
12✔
4586
                for url := range s.clientConnectURLsMap {
18✔
4587
                        s.info.ClientConnectURLs = append(s.info.ClientConnectURLs, url)
12✔
4588
                }
12✔
4589
                for url := range s.websocket.connectURLsMap {
6✔
4590
                        s.info.WSConnectURLs = append(s.info.WSConnectURLs, url)
×
4591
                }
×
4592
        }
4593
        // Send to all registered clients that support async INFO protocols.
4594
        s.sendAsyncInfoToClients(true, true)
6✔
4595
        // We now clear the info.LameDuckMode flag so that if there are
6✔
4596
        // cluster updates and we send the INFO, we don't have the boolean
6✔
4597
        // set which would cause multiple LDM notifications to clients.
6✔
4598
        s.info.LameDuckMode = false
6✔
4599
}
4600

4601
// If given error is a net.Error and is temporary, sleeps for the given
4602
// delay and double it, but cap it to ACCEPT_MAX_SLEEP. The sleep is
4603
// interrupted if the server is shutdown.
4604
// An error message is displayed depending on the type of error.
4605
// Returns the new (or unchanged) delay, or a negative value if the
4606
// server has been or is being shutdown.
4607
func (s *Server) acceptError(acceptName string, err error, tmpDelay time.Duration) time.Duration {
16,426✔
4608
        if !s.isRunning() {
32,852✔
4609
                return -1
16,426✔
4610
        }
16,426✔
4611
        //lint:ignore SA1019 We want to retry on a bunch of errors here.
4612
        if ne, ok := err.(net.Error); ok && ne.Temporary() { // nolint:staticcheck
×
4613
                s.Errorf("Temporary %s Accept Error(%v), sleeping %dms", acceptName, ne, tmpDelay/time.Millisecond)
×
4614
                select {
×
4615
                case <-time.After(tmpDelay):
×
4616
                case <-s.quitCh:
×
4617
                        return -1
×
4618
                }
4619
                tmpDelay *= 2
×
4620
                if tmpDelay > ACCEPT_MAX_SLEEP {
×
4621
                        tmpDelay = ACCEPT_MAX_SLEEP
×
4622
                }
×
4623
        } else {
×
4624
                s.Errorf("%s Accept error: %v", acceptName, err)
×
4625
        }
×
4626
        return tmpDelay
×
4627
}
4628

4629
var errNoIPAvail = errors.New("no IP available")
4630

4631
func (s *Server) getRandomIP(resolver netResolver, url string, excludedAddresses map[string]struct{}) (string, error) {
116,922✔
4632
        host, port, err := net.SplitHostPort(url)
116,922✔
4633
        if err != nil {
116,923✔
4634
                return "", err
1✔
4635
        }
1✔
4636
        // If already an IP, skip.
4637
        if net.ParseIP(host) != nil {
233,702✔
4638
                return url, nil
116,781✔
4639
        }
116,781✔
4640
        ips, err := resolver.LookupHost(context.Background(), host)
140✔
4641
        if err != nil {
147✔
4642
                return "", fmt.Errorf("lookup for host %q: %v", host, err)
7✔
4643
        }
7✔
4644
        if len(excludedAddresses) > 0 {
157✔
4645
                for i := 0; i < len(ips); i++ {
72✔
4646
                        ip := ips[i]
48✔
4647
                        addr := net.JoinHostPort(ip, port)
48✔
4648
                        if _, excluded := excludedAddresses[addr]; excluded {
54✔
4649
                                if len(ips) == 1 {
9✔
4650
                                        ips = nil
3✔
4651
                                        break
3✔
4652
                                }
4653
                                ips[i] = ips[len(ips)-1]
3✔
4654
                                ips = ips[:len(ips)-1]
3✔
4655
                                i--
3✔
4656
                        }
4657
                }
4658
                if len(ips) == 0 {
27✔
4659
                        return "", errNoIPAvail
3✔
4660
                }
3✔
4661
        }
4662
        var address string
130✔
4663
        if len(ips) == 0 {
131✔
4664
                s.Warnf("Unable to get IP for %s, will try with %s: %v", host, url, err)
1✔
4665
                address = url
1✔
4666
        } else {
130✔
4667
                var ip string
129✔
4668
                if len(ips) == 1 {
137✔
4669
                        ip = ips[0]
8✔
4670
                } else {
129✔
4671
                        ip = ips[rand.Int31n(int32(len(ips)))]
121✔
4672
                }
121✔
4673
                // add the port
4674
                address = net.JoinHostPort(ip, port)
129✔
4675
        }
4676
        return address, nil
130✔
4677
}
4678

4679
// Returns true for the first attempt and depending on the nature
4680
// of the attempt (first connect or a reconnect), when the number
4681
// of attempts is equal to the configured report attempts.
4682
func (s *Server) shouldReportConnectErr(firstConnect bool, attempts int) bool {
67,802✔
4683
        opts := s.getOpts()
67,802✔
4684
        if firstConnect {
107,907✔
4685
                if attempts == 1 || attempts%opts.ConnectErrorReports == 0 {
49,582✔
4686
                        return true
9,477✔
4687
                }
9,477✔
4688
                return false
30,628✔
4689
        }
4690
        if attempts == 1 || attempts%opts.ReconnectErrorReports == 0 {
55,394✔
4691
                return true
27,697✔
4692
        }
27,697✔
4693
        return false
×
4694
}
4695

4696
func (s *Server) updateRemoteSubscription(acc *Account, sub *subscription, delta int32) {
2,987✔
4697
        s.updateRouteSubscriptionMap(acc, sub, delta)
2,987✔
4698
        if s.gateway.enabled {
3,646✔
4699
                s.gatewayUpdateSubInterest(acc.Name, sub, delta)
659✔
4700
        }
659✔
4701

4702
        acc.updateLeafNodes(sub, delta)
2,987✔
4703
}
4704

4705
func (s *Server) startRateLimitLogExpiration() {
6,706✔
4706
        interval := time.Second
6,706✔
4707
        s.startGoRoutine(func() {
13,412✔
4708
                defer s.grWG.Done()
6,706✔
4709

6,706✔
4710
                ticker := time.NewTicker(time.Second)
6,706✔
4711
                defer ticker.Stop()
6,706✔
4712
                for {
28,272✔
4713
                        select {
21,566✔
4714
                        case <-s.quitCh:
6,699✔
4715
                                return
6,699✔
4716
                        case interval = <-s.rateLimitLoggingCh:
×
4717
                                ticker.Reset(interval)
×
4718
                        case <-ticker.C:
14,860✔
4719
                                s.rateLimitLogging.Range(func(k, v any) bool {
17,988✔
4720
                                        start := v.(time.Time)
3,128✔
4721
                                        if time.Since(start) >= interval {
4,409✔
4722
                                                s.rateLimitLogging.Delete(k)
1,281✔
4723
                                        }
1,281✔
4724
                                        return true
3,128✔
4725
                                })
4726
                        }
4727
                }
4728
        })
4729
}
4730

4731
func (s *Server) changeRateLimitLogInterval(d time.Duration) {
×
4732
        if d <= 0 {
×
4733
                return
×
4734
        }
×
4735
        select {
×
4736
        case s.rateLimitLoggingCh <- d:
×
4737
        default:
×
4738
        }
4739
}
4740

4741
// DisconnectClientByID disconnects a client by connection ID
4742
func (s *Server) DisconnectClientByID(id uint64) error {
2✔
4743
        if s == nil {
2✔
4744
                return ErrServerNotRunning
×
4745
        }
×
4746
        if client := s.getClient(id); client != nil {
3✔
4747
                client.closeConnection(Kicked)
1✔
4748
                return nil
1✔
4749
        } else if client = s.GetLeafNode(id); client != nil {
3✔
4750
                client.closeConnection(Kicked)
1✔
4751
                return nil
1✔
4752
        }
1✔
4753
        return errors.New("no such client or leafnode id")
×
4754
}
4755

4756
// LDMClientByID sends a Lame Duck Mode info message to a client by connection ID
4757
func (s *Server) LDMClientByID(id uint64) error {
1✔
4758
        if s == nil {
1✔
4759
                return ErrServerNotRunning
×
4760
        }
×
4761
        s.mu.RLock()
1✔
4762
        c := s.clients[id]
1✔
4763
        if c == nil {
1✔
4764
                s.mu.RUnlock()
×
4765
                return errors.New("no such client id")
×
4766
        }
×
4767
        info := s.copyInfo()
1✔
4768
        info.LameDuckMode = true
1✔
4769
        s.mu.RUnlock()
1✔
4770
        c.mu.Lock()
1✔
4771
        defer c.mu.Unlock()
1✔
4772
        if c.opts.Protocol >= ClientProtoInfo && c.flags.isSet(firstPongSent) {
2✔
4773
                // sendInfo takes care of checking if the connection is still
1✔
4774
                // valid or not, so don't duplicate tests here.
1✔
4775
                c.Debugf("Sending Lame Duck Mode info to client")
1✔
4776
                c.enqueueProto(c.generateClientInfoJSON(info, true))
1✔
4777
                return nil
1✔
4778
        } else {
1✔
4779
                return errors.New("client does not support Lame Duck Mode or is not ready to receive the notification")
×
4780
        }
×
4781
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc