• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 24949216239

24 Apr 2026 08:34AM UTC coverage: 80.645% (-2.4%) from 83.05%
24949216239

push

github

web-flow
(2.14) [ADDED] `RemoteLeafOpts.IgnoreDiscoveredServers` option (#8067)

For a given leafnode remote, if this is set to true, this remote will
ignore any server leafnode URLs returned by the hub, allowing the user
to fully manage the servers this remote can connect to.

Resolves #8002

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>

74685 of 92610 relevant lines covered (80.64%)

632737.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.73
/server/server.go
1
// Copyright 2012-2025 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bytes"
18
        "context"
19
        "crypto/fips140"
20
        "crypto/tls"
21
        "encoding/json"
22
        "errors"
23
        "flag"
24
        "fmt"
25
        "io"
26
        "log"
27
        "math/rand"
28
        "net"
29
        "net/http"
30
        "net/url"
31
        "os"
32
        "path"
33
        "path/filepath"
34
        "regexp"
35
        "runtime"
36
        "runtime/pprof"
37
        "strconv"
38
        "strings"
39
        "sync"
40
        "sync/atomic"
41
        "time"
42

43
        // Allow dynamic profiling.
44
        _ "net/http/pprof"
45

46
        "expvar"
47

48
        "github.com/klauspost/compress/s2"
49
        "github.com/nats-io/jwt/v2"
50
        "github.com/nats-io/nats-server/v2/logger"
51
        "github.com/nats-io/nkeys"
52
        "github.com/nats-io/nuid"
53
)
54

55
const (
56
        // Interval for the first PING for non client connections.
57
        firstPingInterval = time.Second
58

59
        // This is for the first ping for client connections.
60
        firstClientPingInterval = 2 * time.Second
61
)
62

63
// These are protocol versions sent between server connections: ROUTER, LEAF and
64
// GATEWAY. We may have protocol versions that have a meaning only for a certain
65
// type of connections, but we don't have to have separate enums for that.
66
// However, it is CRITICAL to not change the order of those constants since they
67
// are exchanged between servers. When adding a new protocol version, add to the
68
// end of the list, don't try to group them by connection types.
69
const (
70
        // RouteProtoZero is the original Route protocol from 2009.
71
        // http://nats.io/documentation/internals/nats-protocol/
72
        RouteProtoZero = iota
73
        // RouteProtoInfo signals a route can receive more then the original INFO block.
74
        // This can be used to update remote cluster permissions, etc...
75
        RouteProtoInfo
76
        // RouteProtoV2 is the new route/cluster protocol that provides account support.
77
        RouteProtoV2
78
        // MsgTraceProto indicates that this server understands distributed message tracing.
79
        MsgTraceProto
80
)
81

82
// Will return the latest server-to-server protocol versions, unless the
83
// option to override it is set.
84
func (s *Server) getServerProto() int {
10,244✔
85
        opts := s.getOpts()
10,244✔
86
        // Initialize with the latest protocol version.
10,244✔
87
        proto := MsgTraceProto
10,244✔
88
        // For tests, we want to be able to make this server behave
10,244✔
89
        // as an older server so check this option to see if we should override.
10,244✔
90
        if opts.overrideProto < 0 {
10,244✔
91
                // The option overrideProto is set to 0 by default (when creating an
×
92
                // Options structure). Since this is the same value than the original
×
93
                // proto RouteProtoZero, tests call setServerProtoForTest() with the
×
94
                // desired protocol level, which sets it as negative value equal to:
×
95
                // (wantedProto + 1) * -1. Here we compute back the real value.
×
96
                proto = (opts.overrideProto * -1) - 1
×
97
        }
×
98
        return proto
10,244✔
99
}
100

101
// Used by tests.
102
func setServerProtoForTest(wantedProto int) int {
×
103
        return (wantedProto + 1) * -1
×
104
}
×
105

106
// Info is the information sent to clients, routes, gateways, and leaf nodes,
107
// to help them understand information about this server.
108
type Info struct {
109
        ID                string   `json:"server_id"`
110
        Name              string   `json:"server_name"`
111
        Version           string   `json:"version"`
112
        Proto             int      `json:"proto"`
113
        GitCommit         string   `json:"git_commit,omitempty"`
114
        GoVersion         string   `json:"go"`
115
        Host              string   `json:"host"`
116
        Port              int      `json:"port"`
117
        Headers           bool     `json:"headers"`
118
        AuthRequired      bool     `json:"auth_required,omitempty"`
119
        TLSRequired       bool     `json:"tls_required,omitempty"`
120
        TLSVerify         bool     `json:"tls_verify,omitempty"`
121
        TLSAvailable      bool     `json:"tls_available,omitempty"`
122
        MaxPayload        int32    `json:"max_payload"`
123
        JetStream         bool     `json:"jetstream,omitempty"`
124
        IP                string   `json:"ip,omitempty"`
125
        CID               uint64   `json:"client_id,omitempty"`
126
        ClientIP          string   `json:"client_ip,omitempty"`
127
        Nonce             string   `json:"nonce,omitempty"`
128
        Cluster           string   `json:"cluster,omitempty"`
129
        Dynamic           bool     `json:"cluster_dynamic,omitempty"`
130
        Domain            string   `json:"domain,omitempty"`
131
        ClientConnectURLs []string `json:"connect_urls,omitempty"`    // Contains URLs a client can connect to.
132
        WSConnectURLs     []string `json:"ws_connect_urls,omitempty"` // Contains URLs a ws client can connect to.
133
        LameDuckMode      bool     `json:"ldm,omitempty"`
134
        Compression       string   `json:"compression,omitempty"`
135
        ConnectInfo       bool     `json:"connect_info,omitempty"`   // When true this is the server INFO response to CONNECT
136
        RemoteAccount     string   `json:"remote_account,omitempty"` // Lets the client or leafnode side know the remote account that they bind to.
137
        IsSystemAccount   bool     `json:"acc_is_sys,omitempty"`     // Indicates if the account is a system account.
138
        JSApiLevel        int      `json:"api_lvl,omitempty"`
139

140
        // Route Specific
141
        Import        *SubjectPermission `json:"import,omitempty"`
142
        Export        *SubjectPermission `json:"export,omitempty"`
143
        LNOC          bool               `json:"lnoc,omitempty"`
144
        LNOCU         bool               `json:"lnocu,omitempty"`
145
        InfoOnConnect bool               `json:"info_on_connect,omitempty"` // When true the server will respond to CONNECT with an INFO
146
        RoutePoolSize int                `json:"route_pool_size,omitempty"`
147
        RoutePoolIdx  int                `json:"route_pool_idx,omitempty"`
148
        RouteAccount  string             `json:"route_account,omitempty"`
149
        RouteAccReqID string             `json:"route_acc_add_reqid,omitempty"`
150
        GossipMode    byte               `json:"gossip_mode,omitempty"`
151

152
        // Gateways Specific
153
        Gateway           string   `json:"gateway,omitempty"`             // Name of the origin Gateway (sent by gateway's INFO)
154
        GatewayURLs       []string `json:"gateway_urls,omitempty"`        // Gateway URLs in the originating cluster (sent by gateway's INFO)
155
        GatewayURL        string   `json:"gateway_url,omitempty"`         // Gateway URL on that server (sent by route's INFO)
156
        GatewayCmd        byte     `json:"gateway_cmd,omitempty"`         // Command code for the receiving server to know what to do
157
        GatewayCmdPayload []byte   `json:"gateway_cmd_payload,omitempty"` // Command payload when needed
158
        GatewayNRP        bool     `json:"gateway_nrp,omitempty"`         // Uses new $GNR. prefix for mapped replies
159
        GatewayIOM        bool     `json:"gateway_iom,omitempty"`         // Indicate that all accounts will be switched to InterestOnly mode "right away"
160

161
        // LeafNode Specific
162
        LeafNodeURLs []string `json:"leafnode_urls,omitempty"` // LeafNode URLs that the server can reconnect to.
163

164
        XKey string `json:"xkey,omitempty"` // Public server's x25519 key.
165
}
166

167
// Server is our main struct.
168
type Server struct {
169
        // Fields accessed with atomic operations need to be 64-bit aligned
170
        gcid uint64
171
        // How often user logon fails due to the issuer account not being pinned.
172
        pinnedAccFail uint64
173
        stats
174
        scStats
175
        staleStats
176
        mu                  sync.RWMutex
177
        reloadMu            sync.RWMutex // Write-locked when a config reload is taking place ONLY
178
        kp                  nkeys.KeyPair
179
        xkp                 nkeys.KeyPair
180
        xpub                string
181
        info                Info
182
        configFile          string
183
        optsMu              sync.RWMutex
184
        opts                *Options
185
        running             atomic.Bool
186
        shutdown            atomic.Bool
187
        listener            net.Listener
188
        listenerErr         error
189
        gacc                *Account
190
        sys                 *internal
191
        sysAcc              atomic.Pointer[Account]
192
        js                  atomic.Pointer[jetStream]
193
        isMetaLeader        atomic.Bool
194
        jsClustered         atomic.Bool
195
        accounts            sync.Map
196
        tmpAccounts         sync.Map // Temporarily stores accounts that are being built
197
        activeAccounts      int32
198
        accResolver         AccountResolver
199
        clients             map[uint64]*client
200
        routes              map[string][]*client
201
        remoteRoutePoolSize map[string]int                // Map for remote's configure route pool size
202
        routesPoolSize      int                           // Configured pool size
203
        routesReject        bool                          // During reload, we may want to reject adding routes until some conditions are met
204
        routesNoPool        int                           // Number of routes that don't use pooling (connecting to older server for instance)
205
        accRoutes           map[string]map[string]*client // Key is account name, value is key=remoteID/value=route connection
206
        accRouteByHash      sync.Map                      // Key is account name, value is nil or a pool index
207
        accAddedCh          chan struct{}
208
        accAddedReqID       string
209
        leafs               map[uint64]*client
210
        users               map[string]*User
211
        nkeys               map[string]*NkeyUser
212
        totalClients        uint64
213
        closed              *closedRingBuffer
214
        done                chan bool
215
        start               time.Time
216
        http                net.Listener
217
        httpHandler         http.Handler
218
        httpBasePath        string
219
        profiler            net.Listener
220
        httpReqStats        map[string]uint64
221
        routeListener       net.Listener
222
        routeListenerErr    error
223
        routeInfo           Info
224
        routeResolver       netResolver
225
        routesToSelf        map[string]struct{}
226
        routeTLSName        string
227
        leafNodeListener    net.Listener
228
        leafNodeListenerErr error
229
        leafNodeInfo        Info
230
        leafNodeInfoJSON    []byte
231
        leafURLsMap         refCountedUrlSet
232
        leafNodeOpts        struct {
233
                resolver    netResolver
234
                dialTimeout time.Duration
235
        }
236
        leafRemoteCfgs     map[*leafNodeCfg]struct{}
237
        rmLeafRemoteCfgs   map[string]*leafNodeCfg
238
        leafRemoteAccounts sync.Map
239
        leafNodeEnabled    bool
240
        leafDisableConnect bool // Used in test only
241
        leafNoCluster      bool // Indicate that this server has only remotes and no cluster defined
242

243
        quitCh           chan struct{}
244
        startupComplete  chan struct{}
245
        shutdownComplete chan struct{}
246

247
        // Tracking Go routines
248
        grMu         sync.Mutex
249
        grTmpClients map[uint64]*client
250
        grRunning    bool
251
        grWG         sync.WaitGroup // to wait on various go routines
252

253
        cproto     int64     // number of clients supporting async INFO
254
        configTime time.Time // last time config was loaded
255

256
        logging struct {
257
                sync.RWMutex
258
                logger      Logger
259
                trace       int32
260
                debug       int32
261
                traceSysAcc int32
262
        }
263

264
        clientConnectURLs []string
265

266
        // Used internally for quick look-ups.
267
        clientConnectURLsMap refCountedUrlSet
268

269
        // For Gateways
270
        gatewayListener    net.Listener // Accept listener
271
        gatewayListenerErr error
272
        gateway            *srvGateway
273

274
        // Used by tests to check that http.Servers do
275
        // not set any timeout.
276
        monitoringServer *http.Server
277
        profilingServer  *http.Server
278

279
        // LameDuck mode
280
        ldm   bool
281
        ldmCh chan bool
282

283
        // Trusted public operator keys.
284
        trustedKeys []string
285
        // map of trusted keys to operator setting StrictSigningKeyUsage
286
        strictSigningKeyUsage map[string]struct{}
287

288
        // We use this to minimize mem copies for requests to monitoring
289
        // endpoint /varz (when it comes from http).
290
        varzMu sync.Mutex
291
        varz   *Varz
292
        // This is set during a config reload if we detect that we have
293
        // added/removed routes. The monitoring code then check that
294
        // to know if it should update the cluster's URLs array.
295
        varzUpdateRouteURLs bool
296

297
        // Keeps a sublist of of subscriptions attached to leafnode connections
298
        // for the $GNR.*.*.*.> subject so that a server can send back a mapped
299
        // gateway reply.
300
        gwLeafSubs *Sublist
301

302
        // Used for expiration of mapped GW replies
303
        gwrm struct {
304
                w  int32
305
                ch chan time.Duration
306
                m  sync.Map
307
        }
308

309
        // For eventIDs
310
        eventIds *nuid.NUID
311

312
        // Websocket structure
313
        websocket srvWebsocket
314

315
        // MQTT structure
316
        mqtt srvMQTT
317

318
        // OCSP monitoring
319
        ocsps []*OCSPMonitor
320

321
        // OCSP peer verification (at least one TLS block)
322
        ocspPeerVerify bool
323

324
        // OCSP response cache
325
        ocsprc OCSPResponseCache
326

327
        // exporting account name the importer experienced issues with
328
        incompleteAccExporterMap sync.Map
329

330
        // Holds cluster name under different lock for mapping
331
        cnMu sync.RWMutex
332
        cn   string
333

334
        // For registering raft nodes with the server.
335
        rnMu      sync.RWMutex
336
        raftNodes map[string]RaftNode
337

338
        // For mapping from a raft node name back to a server name and cluster. Node has to be in the same domain.
339
        nodeToInfo sync.Map
340

341
        // For out of resources to not log errors too fast.
342
        rerrMu   sync.Mutex
343
        rerrLast time.Time
344

345
        connRateCounter *rateCounter
346

347
        // If there is a system account configured, to still support the $G account,
348
        // the server will create a fake user and add it to the list of users.
349
        // Keep track of what that user name is for config reload purposes.
350
        sysAccOnlyNoAuthUser string
351

352
        // IPQueues map
353
        ipQueues sync.Map
354

355
        // To limit logging frequency
356
        rateLimitLogging   sync.Map
357
        rateLimitLoggingCh chan time.Duration
358

359
        // Total outstanding catchup bytes in flight.
360
        gcbMu     sync.RWMutex
361
        gcbOut    int64
362
        gcbOutMax int64 // Taken from JetStreamMaxCatchup or defaultMaxTotalCatchupOutBytes
363
        // A global chanel to kick out stalled catchup sequences.
364
        gcbKick chan struct{}
365

366
        // Total outbound syncRequests
367
        syncOutSem chan struct{}
368

369
        // Queue to process JS API requests that come from routes (or gateways)
370
        jsAPIRoutedReqs     *ipQueue[*jsAPIRoutedReq]
371
        jsAPIRoutedInfoReqs *ipQueue[*jsAPIRoutedReq]
372

373
        // Delayed API responses.
374
        delayedAPIResponses *ipQueue[*delayedAPIResponse]
375

376
        // Whether moving NRG traffic into accounts is permitted on this server.
377
        // Controls whether or not the account NRG capability is set in statsz.
378
        // Currently used by unit tests to simulate nodes not supporting account NRG.
379
        accountNRGAllowed atomic.Bool
380

381
        // List of proxies trusted keys in `KeyPair` form so we can do signature
382
        // verification when processing incoming proxy connections.
383
        proxiesKeyPairs []nkeys.KeyPair
384
        proxiedConns    map[string]map[uint64]*client
385
}
386

387
// For tracking JS nodes.
388
type nodeInfo struct {
389
        name            string
390
        version         string
391
        cluster         string
392
        domain          string
393
        id              string
394
        tags            jwt.TagList
395
        cfg             *JetStreamConfig
396
        stats           *JetStreamStats
397
        offline         bool
398
        js              bool
399
        binarySnapshots bool
400
        accountNRG      bool
401
}
402

403
type stats struct {
404
        inMsgs           int64
405
        outMsgs          int64
406
        inBytes          int64
407
        outBytes         int64
408
        slowConsumers    int64
409
        staleConnections int64
410
        stalls           int64
411
}
412

413
// scStats includes the total and per connection counters of Slow Consumers.
414
type scStats struct {
415
        clients  atomic.Uint64
416
        routes   atomic.Uint64
417
        leafs    atomic.Uint64
418
        gateways atomic.Uint64
419
}
420

421
// staleStats includes the total and per connection counters of Stale Connections.
422
type staleStats struct {
423
        clients  atomic.Uint64
424
        routes   atomic.Uint64
425
        leafs    atomic.Uint64
426
        gateways atomic.Uint64
427
}
428

429
// This is used by tests so we can run all server tests with a default route
430
// or leafnode compression mode. For instance:
431
// go test -race -v ./server -cluster_compression=fast
432
var (
433
        testDefaultClusterCompression  string
434
        testDefaultLeafNodeCompression string
435
)
436

437
// Compression modes.
438
const (
439
        CompressionNotSupported   = "not supported"
440
        CompressionOff            = "off"
441
        CompressionAccept         = "accept"
442
        CompressionS2Auto         = "s2_auto"
443
        CompressionS2Uncompressed = "s2_uncompressed"
444
        CompressionS2Fast         = "s2_fast"
445
        CompressionS2Better       = "s2_better"
446
        CompressionS2Best         = "s2_best"
447
)
448

449
// defaultCompressionS2AutoRTTThresholds is the default of RTT thresholds for
450
// the CompressionS2Auto mode.
451
var defaultCompressionS2AutoRTTThresholds = []time.Duration{
452
        // [0..10ms] -> CompressionS2Uncompressed
453
        10 * time.Millisecond,
454
        // ]10ms..50ms] -> CompressionS2Fast
455
        50 * time.Millisecond,
456
        // ]50ms..100ms] -> CompressionS2Better
457
        100 * time.Millisecond,
458
        // ]100ms..] -> CompressionS2Best
459
}
460

461
// For a given user provided string, matches to one of the compression mode
462
// constant and updates the provided string to that constant. Returns an
463
// error if the provided compression mode is not known.
464
// The parameter `chosenModeForOn` indicates which compression mode to use
465
// when the user selects "on" (or enabled, true, etc..). This is because
466
// we may have different defaults depending on where the compression is used.
467
func validateAndNormalizeCompressionOption(c *CompressionOpts, chosenModeForOn string) error {
10,891✔
468
        if c == nil {
10,891✔
469
                return nil
×
470
        }
×
471
        cmtl := strings.ToLower(c.Mode)
10,891✔
472
        // First, check for the "on" case so that we set to the default compression
10,891✔
473
        // mode for that. The other switch/case will finish setup if needed (for
10,891✔
474
        // instance if the default mode is s2Auto).
10,891✔
475
        switch cmtl {
10,891✔
476
        case "on", "enabled", "true":
14✔
477
                cmtl = chosenModeForOn
14✔
478
        default:
10,877✔
479
        }
480
        // Check (again) with the proper mode.
481
        switch cmtl {
10,891✔
482
        case "not supported", "not_supported":
2✔
483
                c.Mode = CompressionNotSupported
2✔
484
        case "disabled", "off", "false":
1,776✔
485
                c.Mode = CompressionOff
1,776✔
486
        case "accept":
4,111✔
487
                c.Mode = CompressionAccept
4,111✔
488
        case "auto", "s2_auto":
4,921✔
489
                var rtts []time.Duration
4,921✔
490
                if len(c.RTTThresholds) == 0 {
8,996✔
491
                        rtts = defaultCompressionS2AutoRTTThresholds
4,075✔
492
                } else {
4,921✔
493
                        for _, n := range c.RTTThresholds {
3,394✔
494
                                // Do not error on negative, but simply set to 0
2,548✔
495
                                if n < 0 {
2,552✔
496
                                        n = 0
4✔
497
                                }
4✔
498
                                // Make sure they are properly ordered. However, it is possible
499
                                // to have a "0" anywhere in the list to indicate that this
500
                                // compression level should not be used.
501
                                if l := len(rtts); l > 0 && n != 0 {
4,220✔
502
                                        for _, v := range rtts {
4,202✔
503
                                                if n < v {
2,532✔
504
                                                        return fmt.Errorf("RTT threshold values %v should be in ascending order", c.RTTThresholds)
2✔
505
                                                }
2✔
506
                                        }
507
                                }
508
                                rtts = append(rtts, n)
2,546✔
509
                        }
510
                        if len(rtts) > 0 {
1,688✔
511
                                // Trim 0 that are at the end.
844✔
512
                                stop := -1
844✔
513
                                for i := len(rtts) - 1; i >= 0; i-- {
1,702✔
514
                                        if rtts[i] != 0 {
1,698✔
515
                                                stop = i
840✔
516
                                                break
840✔
517
                                        }
518
                                }
519
                                rtts = rtts[:stop+1]
844✔
520
                        }
521
                        if len(rtts) > 4 {
846✔
522
                                // There should be at most values for "uncompressed", "fast",
2✔
523
                                // "better" and "best" (when some 0 are present).
2✔
524
                                return fmt.Errorf("compression mode %q should have no more than 4 RTT thresholds: %v", c.Mode, c.RTTThresholds)
2✔
525
                        } else if len(rtts) == 0 {
848✔
526
                                // But there should be at least 1 if the user provided the slice.
4✔
527
                                // We would be here only if it was provided by say with values
4✔
528
                                // being a single or all zeros.
4✔
529
                                return fmt.Errorf("compression mode %q requires at least one RTT threshold", c.Mode)
4✔
530
                        }
4✔
531
                }
532
                c.Mode = CompressionS2Auto
4,913✔
533
                c.RTTThresholds = rtts
4,913✔
534
        case "fast", "s2_fast":
25✔
535
                c.Mode = CompressionS2Fast
25✔
536
        case "better", "s2_better":
27✔
537
                c.Mode = CompressionS2Better
27✔
538
        case "best", "s2_best":
27✔
539
                c.Mode = CompressionS2Best
27✔
540
        default:
2✔
541
                return fmt.Errorf("unsupported compression mode %q", c.Mode)
2✔
542
        }
543
        return nil
10,881✔
544
}
545

546
// Returns `true` if the compression mode `m` indicates that the server
547
// will negotiate compression with the remote server, `false` otherwise.
548
// Note that the provided compression mode is assumed to have been
549
// normalized and validated.
550
func needsCompression(m string) bool {
174,270✔
551
        return m != _EMPTY_ && m != CompressionOff && m != CompressionNotSupported
174,270✔
552
}
174,270✔
553

554
// Compression is asymmetric, meaning that one side can have a different
555
// compression level than the other. However, we need to check for cases
556
// when this server `scm` or the remote `rcm` do not support compression
557
// (say older server, or test to make it behave as it is not), or have
558
// the compression off.
559
// Note that `scm` is assumed to not be "off" or "not supported".
560
func selectCompressionMode(scm, rcm string) (mode string, err error) {
42,036✔
561
        if rcm == CompressionNotSupported || rcm == _EMPTY_ {
42,037✔
562
                return CompressionNotSupported, nil
1✔
563
        }
1✔
564
        switch rcm {
42,035✔
565
        case CompressionOff:
16,713✔
566
                // If the remote explicitly disables compression, then we won't
16,713✔
567
                // use compression.
16,713✔
568
                return CompressionOff, nil
16,713✔
569
        case CompressionAccept:
24,178✔
570
                // If the remote is ok with compression (but is not initiating it),
24,178✔
571
                // and if we too are in this mode, then it means no compression.
24,178✔
572
                if scm == CompressionAccept {
48,353✔
573
                        return CompressionOff, nil
24,175✔
574
                }
24,175✔
575
                // Otherwise use our compression mode.
576
                return scm, nil
3✔
577
        case CompressionS2Auto, CompressionS2Uncompressed, CompressionS2Fast, CompressionS2Better, CompressionS2Best:
1,144✔
578
                // This case is here to make sure that if we don't recognize a
1,144✔
579
                // compression setting, we error out.
1,144✔
580
                if scm == CompressionAccept {
1,150✔
581
                        // If our compression mode is "accept", then we will use the remote
6✔
582
                        // compression mode, except if it is "auto", in which case we will
6✔
583
                        // default to "fast". This is not a configuration (auto in one
6✔
584
                        // side and accept in the other) that would be recommended.
6✔
585
                        if rcm == CompressionS2Auto {
8✔
586
                                return CompressionS2Fast, nil
2✔
587
                        }
2✔
588
                        // Use their compression mode.
589
                        return rcm, nil
4✔
590
                }
591
                // Otherwise use our compression mode.
592
                return scm, nil
1,138✔
593
        default:
×
594
                return _EMPTY_, fmt.Errorf("unsupported route compression mode %q", rcm)
×
595
        }
596
}
597

598
// If the configured compression mode is "auto" then will return that,
599
// otherwise will return the given `cm` compression mode.
600
func compressionModeForInfoProtocol(co *CompressionOpts, cm string) string {
17,771✔
601
        if co.Mode == CompressionS2Auto {
18,886✔
602
                return CompressionS2Auto
1,115✔
603
        }
1,115✔
604
        return cm
16,656✔
605
}
606

607
// Given a connection RTT and a list of thresholds durations, this
608
// function will return an S2 compression level such as "uncompressed",
609
// "fast", "better" or "best". For instance, with the following slice:
610
// [5ms, 10ms, 15ms, 20ms], a RTT of up to 5ms will result
611
// in the compression level "uncompressed", ]5ms..10ms] will result in
612
// "fast" compression, etc..
613
// However, the 0 value allows for disabling of some compression levels.
614
// For instance, the following slice: [0, 0, 20, 30] means that a RTT of
615
// [0..20ms] would result in the "better" compression - effectively disabling
616
// the use of "uncompressed" and "fast", then anything above 20ms would
617
// result in the use of "best" level (the 30 in the list has no effect
618
// and the list could have been simplified to [0, 0, 20]).
619
func selectS2AutoModeBasedOnRTT(rtt time.Duration, rttThresholds []time.Duration) string {
1,412✔
620
        var idx int
1,412✔
621
        var found bool
1,412✔
622
        for i, d := range rttThresholds {
2,862✔
623
                if rtt <= d {
2,846✔
624
                        idx = i
1,396✔
625
                        found = true
1,396✔
626
                        break
1,396✔
627
                }
628
        }
629
        if !found {
1,428✔
630
                // If we did not find but we have all levels, then use "best",
16✔
631
                // otherwise use the last one in array.
16✔
632
                if l := len(rttThresholds); l >= 3 {
32✔
633
                        idx = 3
16✔
634
                } else {
16✔
635
                        idx = l - 1
×
636
                }
×
637
        }
638
        switch idx {
1,412✔
639
        case 0:
1,392✔
640
                return CompressionS2Uncompressed
1,392✔
641
        case 1:
2✔
642
                return CompressionS2Fast
2✔
643
        case 2:
2✔
644
                return CompressionS2Better
2✔
645
        }
646
        return CompressionS2Best
16✔
647
}
648

649
// Returns an array of s2 WriterOption based on the route compression mode.
650
// So far we return a single option, but this way we can call s2.NewWriter()
651
// with a nil []s2.WriterOption, but not with a nil s2.WriterOption, so
652
// this is more versatile.
653
func s2WriterOptions(cm string) []s2.WriterOption {
1,159✔
654
        _opts := [2]s2.WriterOption{}
1,159✔
655
        opts := append(
1,159✔
656
                _opts[:0],
1,159✔
657
                s2.WriterConcurrency(1), // Stop asynchronous flushing in separate goroutines
1,159✔
658
        )
1,159✔
659
        switch cm {
1,159✔
660
        case CompressionS2Uncompressed:
1,107✔
661
                return append(opts, s2.WriterUncompressed())
1,107✔
662
        case CompressionS2Best:
27✔
663
                return append(opts, s2.WriterBestCompression())
27✔
664
        case CompressionS2Better:
13✔
665
                return append(opts, s2.WriterBetterCompression())
13✔
666
        default:
12✔
667
                return nil
12✔
668
        }
669
}
670

671
// New will setup a new server struct after parsing the options.
672
// DEPRECATED: Use NewServer(opts)
673
func New(opts *Options) *Server {
191✔
674
        s, _ := NewServer(opts)
191✔
675
        return s
191✔
676
}
191✔
677

678
func NewServerFromConfig(opts *Options) (*Server, error) {
×
679
        if opts.ConfigFile != _EMPTY_ && opts.configDigest == "" {
×
680
                if err := opts.ProcessConfigFile(opts.ConfigFile); err != nil {
×
681
                        return nil, err
×
682
                }
×
683
        }
684
        return NewServer(opts)
×
685
}
686

687
// NewServer will setup a new server struct after parsing the options.
688
// Could return an error if options can not be validated.
689
// The provided Options type should not be re-used afterwards.
690
// Either use Options.Clone() to pass a copy, or make a new one.
691
func NewServer(opts *Options) (*Server, error) {
6,755✔
692
        setBaselineOptions(opts)
6,755✔
693

6,755✔
694
        // Process TLS options, including whether we require client certificates.
6,755✔
695
        tlsReq := opts.TLSConfig != nil
6,755✔
696
        verify := (tlsReq && opts.TLSConfig.ClientAuth == tls.RequireAndVerifyClientCert)
6,755✔
697

6,755✔
698
        // Create our server's nkey identity.
6,755✔
699
        kp, _ := nkeys.CreateServer()
6,755✔
700
        pub, _ := kp.PublicKey()
6,755✔
701

6,755✔
702
        // Create an xkey for encrypting messages from this server.
6,755✔
703
        var xkp nkeys.KeyPair
6,755✔
704
        var xpub string
6,755✔
705
        if !fips140.Enabled() {
13,510✔
706
                xkp, _ = nkeys.CreateCurveKeys()
6,755✔
707
                xpub, _ = xkp.PublicKey()
6,755✔
708
        }
6,755✔
709

710
        serverName := pub
6,755✔
711
        if opts.ServerName != _EMPTY_ {
11,084✔
712
                serverName = opts.ServerName
4,329✔
713
        }
4,329✔
714

715
        httpBasePath := normalizeBasePath(opts.HTTPBasePath)
6,755✔
716

6,755✔
717
        // Validate some options. This is here because we cannot assume that
6,755✔
718
        // server will always be started with configuration parsing (that could
6,755✔
719
        // report issues). Its options can be (incorrectly) set by hand when
6,755✔
720
        // server is embedded. If there is an error, return nil.
6,755✔
721
        if err := validateOptions(opts); err != nil {
6,805✔
722
                return nil, err
50✔
723
        }
50✔
724

725
        info := Info{
6,705✔
726
                ID:           pub,
6,705✔
727
                XKey:         xpub,
6,705✔
728
                Version:      VERSION,
6,705✔
729
                Proto:        PROTO,
6,705✔
730
                GitCommit:    gitCommit,
6,705✔
731
                GoVersion:    runtime.Version(),
6,705✔
732
                Name:         serverName,
6,705✔
733
                Host:         opts.Host,
6,705✔
734
                Port:         opts.Port,
6,705✔
735
                AuthRequired: false,
6,705✔
736
                TLSRequired:  tlsReq && !opts.AllowNonTLS,
6,705✔
737
                TLSVerify:    verify,
6,705✔
738
                MaxPayload:   opts.MaxPayload,
6,705✔
739
                JetStream:    opts.JetStream,
6,705✔
740
                Headers:      !opts.NoHeaderSupport,
6,705✔
741
                Cluster:      opts.Cluster.Name,
6,705✔
742
                Domain:       opts.JetStreamDomain,
6,705✔
743
                JSApiLevel:   JSApiLevel,
6,705✔
744
        }
6,705✔
745

6,705✔
746
        if tlsReq && !info.TLSRequired {
6,708✔
747
                info.TLSAvailable = true
3✔
748
        }
3✔
749

750
        now := time.Now()
6,705✔
751

6,705✔
752
        s := &Server{
6,705✔
753
                kp:                 kp,
6,705✔
754
                xkp:                xkp,
6,705✔
755
                xpub:               xpub,
6,705✔
756
                configFile:         opts.ConfigFile,
6,705✔
757
                info:               info,
6,705✔
758
                opts:               opts,
6,705✔
759
                done:               make(chan bool, 1),
6,705✔
760
                start:              now,
6,705✔
761
                configTime:         now,
6,705✔
762
                gwLeafSubs:         NewSublistWithCache(),
6,705✔
763
                httpBasePath:       httpBasePath,
6,705✔
764
                eventIds:           nuid.New(),
6,705✔
765
                routesToSelf:       make(map[string]struct{}),
6,705✔
766
                httpReqStats:       make(map[string]uint64), // Used to track HTTP requests
6,705✔
767
                rateLimitLoggingCh: make(chan time.Duration, 1),
6,705✔
768
                leafNodeEnabled:    opts.LeafNode.Port != 0 || len(opts.LeafNode.Remotes) > 0,
6,705✔
769
                syncOutSem:         make(chan struct{}, maxConcurrentSyncRequests),
6,705✔
770
        }
6,705✔
771

6,705✔
772
        // Delayed API response queue. Create regardless if JetStream is configured
6,705✔
773
        // or not (since it can be enabled/disabled with config reload, we want this
6,705✔
774
        // queue to exist at all times).
6,705✔
775
        s.delayedAPIResponses = newIPQueue[*delayedAPIResponse](s, "delayed API responses")
6,705✔
776

6,705✔
777
        // By default we'll allow account NRG.
6,705✔
778
        s.accountNRGAllowed.Store(true)
6,705✔
779

6,705✔
780
        // Fill up the maximum in flight syncRequests for this server.
6,705✔
781
        // Used in JetStream catchup semantics.
6,705✔
782
        for i := 0; i < maxConcurrentSyncRequests; i++ {
221,265✔
783
                s.syncOutSem <- struct{}{}
214,560✔
784
        }
214,560✔
785

786
        if opts.TLSRateLimit > 0 {
6,706✔
787
                s.connRateCounter = newRateCounter(opts.tlsConfigOpts.RateLimit)
1✔
788
        }
1✔
789

790
        // Trusted root operator keys.
791
        if !s.processTrustedKeys() {
6,705✔
792
                return nil, fmt.Errorf("Error processing trusted operator keys")
×
793
        }
×
794

795
        // If we have solicited leafnodes but no clustering and no clustername.
796
        // However we may need a stable clustername so use the server name.
797
        if len(opts.LeafNode.Remotes) > 0 && opts.Cluster.Port == 0 && opts.Cluster.Name == _EMPTY_ {
7,699✔
798
                s.leafNoCluster = true
994✔
799
                opts.Cluster.Name = opts.ServerName
994✔
800
        }
994✔
801

802
        if opts.Cluster.Name != _EMPTY_ {
11,547✔
803
                // Also place into mapping cn with cnMu lock.
4,842✔
804
                s.cnMu.Lock()
4,842✔
805
                s.cn = opts.Cluster.Name
4,842✔
806
                s.cnMu.Unlock()
4,842✔
807
        }
4,842✔
808

809
        s.mu.Lock()
6,705✔
810
        defer s.mu.Unlock()
6,705✔
811

6,705✔
812
        // If there are proxies trusted public keys in the configuration
6,705✔
813
        // this will fill create the corresponding list of nkeys.KeyPair
6,705✔
814
        // that we can use for signature verification.
6,705✔
815
        s.processProxiesTrustedKeys()
6,705✔
816

6,705✔
817
        // Place ourselves in the JetStream nodeInfo if needed.
6,705✔
818
        if opts.JetStream {
10,974✔
819
                ourNode := getHash(serverName)
4,269✔
820
                s.nodeToInfo.Store(ourNode, nodeInfo{
4,269✔
821
                        name:            serverName,
4,269✔
822
                        version:         VERSION,
4,269✔
823
                        cluster:         opts.Cluster.Name,
4,269✔
824
                        domain:          opts.JetStreamDomain,
4,269✔
825
                        id:              info.ID,
4,269✔
826
                        tags:            opts.Tags,
4,269✔
827
                        cfg:             &JetStreamConfig{MaxMemory: opts.JetStreamMaxMemory, MaxStore: opts.JetStreamMaxStore, CompressOK: true},
4,269✔
828
                        stats:           nil,
4,269✔
829
                        offline:         false,
4,269✔
830
                        js:              true,
4,269✔
831
                        binarySnapshots: true,
4,269✔
832
                        accountNRG:      true,
4,269✔
833
                })
4,269✔
834
        }
4,269✔
835

836
        s.routeResolver = opts.Cluster.resolver
6,705✔
837
        if s.routeResolver == nil {
13,410✔
838
                s.routeResolver = net.DefaultResolver
6,705✔
839
        }
6,705✔
840

841
        // Used internally for quick look-ups.
842
        s.clientConnectURLsMap = make(refCountedUrlSet)
6,705✔
843
        s.websocket.connectURLsMap = make(refCountedUrlSet)
6,705✔
844
        s.leafURLsMap = make(refCountedUrlSet)
6,705✔
845

6,705✔
846
        // Ensure that non-exported options (used in tests) are properly set.
6,705✔
847
        s.setLeafNodeNonExportedOptions()
6,705✔
848

6,705✔
849
        // Setup OCSP Stapling and OCSP Peer. This will abort server from starting if there
6,705✔
850
        // are no valid staples and OCSP Stapling policy is set to Always or MustStaple.
6,705✔
851
        if err := s.enableOCSP(); err != nil {
6,707✔
852
                return nil, err
2✔
853
        }
2✔
854

855
        // Call this even if there is no gateway defined. It will
856
        // initialize the structure so we don't have to check for
857
        // it to be nil or not in various places in the code.
858
        if err := s.newGateway(opts); err != nil {
6,703✔
859
                return nil, err
×
860
        }
×
861

862
        // If we have a cluster definition but do not have a cluster name, create one.
863
        if opts.Cluster.Port != 0 && opts.Cluster.Name == _EMPTY_ {
6,713✔
864
                s.info.Cluster = nuid.Next()
10✔
865
        } else if opts.Cluster.Name != _EMPTY_ {
11,545✔
866
                // Likewise here if we have a cluster name set.
4,842✔
867
                s.info.Cluster = opts.Cluster.Name
4,842✔
868
        }
4,842✔
869

870
        // This is normally done in the AcceptLoop, once the
871
        // listener has been created (possibly with random port),
872
        // but since some tests may expect the INFO to be properly
873
        // set after New(), let's do it now.
874
        s.setInfoHostPort()
6,703✔
875

6,703✔
876
        // For tracking clients
6,703✔
877
        s.clients = make(map[uint64]*client)
6,703✔
878

6,703✔
879
        // For tracking closed clients.
6,703✔
880
        s.closed = newClosedRingBuffer(opts.MaxClosedClients)
6,703✔
881

6,703✔
882
        // For tracking connections that are not yet registered
6,703✔
883
        // in s.routes, but for which readLoop has started.
6,703✔
884
        s.grTmpClients = make(map[uint64]*client)
6,703✔
885

6,703✔
886
        // For tracking routes and their remote ids
6,703✔
887
        s.initRouteStructures(opts)
6,703✔
888

6,703✔
889
        // For tracking leaf nodes.
6,703✔
890
        s.leafs = make(map[uint64]*client)
6,703✔
891

6,703✔
892
        // Used to kick out all go routines possibly waiting on server
6,703✔
893
        // to shutdown.
6,703✔
894
        s.quitCh = make(chan struct{})
6,703✔
895

6,703✔
896
        // Closed when startup is complete. ReadyForConnections() will block on
6,703✔
897
        // this before checking the presence of listening sockets.
6,703✔
898
        s.startupComplete = make(chan struct{})
6,703✔
899

6,703✔
900
        // Closed when Shutdown() is complete. Allows WaitForShutdown() to block
6,703✔
901
        // waiting for complete shutdown.
6,703✔
902
        s.shutdownComplete = make(chan struct{})
6,703✔
903

6,703✔
904
        // Check for configured account resolvers.
6,703✔
905
        if err := s.configureResolver(); err != nil {
6,703✔
906
                return nil, err
×
907
        }
×
908
        // If there is an URL account resolver, do basic test to see if anyone is home.
909
        if ar := opts.AccountResolver; ar != nil {
7,063✔
910
                if ur, ok := ar.(*URLAccResolver); ok {
399✔
911
                        if _, err := ur.Fetch(_EMPTY_); err != nil {
40✔
912
                                return nil, err
1✔
913
                        }
1✔
914
                }
915
        }
916
        // For other resolver:
917
        // In operator mode, when the account resolver depends on an external system and
918
        // the system account can't be fetched, inject a temporary one.
919
        if ar := s.accResolver; len(opts.TrustedOperators) == 1 && ar != nil &&
6,702✔
920
                opts.SystemAccount != _EMPTY_ && opts.SystemAccount != DEFAULT_SYSTEM_ACCOUNT {
6,963✔
921
                if _, ok := ar.(*MemAccResolver); !ok {
413✔
922
                        s.mu.Unlock()
152✔
923
                        var a *Account
152✔
924
                        // perform direct lookup to avoid warning trace
152✔
925
                        if _, err := fetchAccount(ar, opts.SystemAccount); err == nil {
240✔
926
                                a, _ = s.lookupAccount(opts.SystemAccount)
88✔
927
                        }
88✔
928
                        s.mu.Lock()
152✔
929
                        if a == nil {
216✔
930
                                sac := NewAccount(opts.SystemAccount)
64✔
931
                                sac.Issuer = opts.TrustedOperators[0].Issuer
64✔
932
                                sac.signingKeys = map[string]jwt.Scope{}
64✔
933
                                sac.signingKeys[opts.SystemAccount] = nil
64✔
934
                                s.registerAccountNoLock(sac)
64✔
935
                        }
64✔
936
                }
937
        }
938

939
        // For tracking accounts
940
        if _, err := s.configureAccounts(false); err != nil {
6,702✔
941
                return nil, err
×
942
        }
×
943

944
        // Used to setup Authorization.
945
        s.configureAuthorization()
6,702✔
946

6,702✔
947
        // Start signal handler
6,702✔
948
        s.handleSignals()
6,702✔
949

6,702✔
950
        return s, nil
6,702✔
951
}
952

953
// Initializes route structures based on pooling and/or per-account routes.
954
//
955
// Server lock is held on entry
956
func (s *Server) initRouteStructures(opts *Options) {
6,703✔
957
        s.routes = make(map[string][]*client)
6,703✔
958
        if ps := opts.Cluster.PoolSize; ps > 0 {
10,784✔
959
                s.routesPoolSize = ps
4,081✔
960
        } else {
6,703✔
961
                s.routesPoolSize = 1
2,622✔
962
        }
2,622✔
963
        // If we have per-account routes, we create accRoutes and initialize it
964
        // with nil values. The presence of an account as the key will allow us
965
        // to know if a given account is supposed to have dedicated routes.
966
        if l := len(opts.Cluster.PinnedAccounts); l > 0 {
10,501✔
967
                s.accRoutes = make(map[string]map[string]*client, l)
3,798✔
968
                for _, acc := range opts.Cluster.PinnedAccounts {
7,608✔
969
                        s.accRoutes[acc] = make(map[string]*client)
3,810✔
970
                }
3,810✔
971
        }
972
}
973

974
func (s *Server) logRejectedTLSConns() {
1✔
975
        defer s.grWG.Done()
1✔
976
        t := time.NewTicker(time.Second)
1✔
977
        defer t.Stop()
1✔
978
        for {
3✔
979
                select {
2✔
980
                case <-s.quitCh:
1✔
981
                        return
1✔
982
                case <-t.C:
1✔
983
                        blocked := s.connRateCounter.countBlocked()
1✔
984
                        if blocked > 0 {
2✔
985
                                s.Warnf("Rejected %d connections due to TLS rate limiting", blocked)
1✔
986
                        }
1✔
987
                }
988
        }
989
}
990

991
// clusterName returns our cluster name which could be dynamic.
992
func (s *Server) ClusterName() string {
114,448✔
993
        s.mu.RLock()
114,448✔
994
        cn := s.info.Cluster
114,448✔
995
        s.mu.RUnlock()
114,448✔
996
        return cn
114,448✔
997
}
114,448✔
998

999
// Grabs cluster name with cluster name specific lock.
1000
func (s *Server) cachedClusterName() string {
110,037✔
1001
        s.cnMu.RLock()
110,037✔
1002
        cn := s.cn
110,037✔
1003
        s.cnMu.RUnlock()
110,037✔
1004
        return cn
110,037✔
1005
}
110,037✔
1006

1007
// setClusterName will update the cluster name for this server.
1008
func (s *Server) setClusterName(name string) {
5✔
1009
        s.mu.Lock()
5✔
1010
        var resetCh chan struct{}
5✔
1011
        if s.sys != nil && s.info.Cluster != name {
9✔
1012
                // can't hold the lock as go routine reading it may be waiting for lock as well
4✔
1013
                resetCh = s.sys.resetCh
4✔
1014
        }
4✔
1015
        s.info.Cluster = name
5✔
1016
        s.routeInfo.Cluster = name
5✔
1017

5✔
1018
        // Need to close solicited leaf nodes. The close has to be done outside of the server lock.
5✔
1019
        var leafs []*client
5✔
1020
        for _, c := range s.leafs {
6✔
1021
                c.mu.Lock()
1✔
1022
                if c.leaf != nil && c.leaf.remote != nil {
2✔
1023
                        leafs = append(leafs, c)
1✔
1024
                }
1✔
1025
                c.mu.Unlock()
1✔
1026
        }
1027
        s.mu.Unlock()
5✔
1028

5✔
1029
        // Also place into mapping cn with cnMu lock.
5✔
1030
        s.cnMu.Lock()
5✔
1031
        s.cn = name
5✔
1032
        s.cnMu.Unlock()
5✔
1033

5✔
1034
        for _, l := range leafs {
6✔
1035
                l.closeConnection(ClusterNameConflict)
1✔
1036
        }
1✔
1037
        if resetCh != nil {
9✔
1038
                resetCh <- struct{}{}
4✔
1039
        }
4✔
1040
        s.Noticef("Cluster name updated to %s", name)
5✔
1041
}
1042

1043
// Return whether the cluster name is dynamic.
1044
func (s *Server) isClusterNameDynamic() bool {
43,728✔
1045
        // We need to lock the whole "Cluster.Name" check and not use s.getOpts()
43,728✔
1046
        // because otherwise this could cause a data race with setting the name in
43,728✔
1047
        // route.go's processRouteConnect().
43,728✔
1048
        s.optsMu.RLock()
43,728✔
1049
        dynamic := s.opts.Cluster.Name == _EMPTY_
43,728✔
1050
        s.optsMu.RUnlock()
43,728✔
1051
        return dynamic
43,728✔
1052
}
43,728✔
1053

1054
// Returns our configured serverName.
1055
func (s *Server) serverName() string {
24,383✔
1056
        return s.getOpts().ServerName
24,383✔
1057
}
24,383✔
1058

1059
// ClientURL returns the URL used to connect clients.
1060
// Helpful in tests and with in-process servers using a random client port (-1).
1061
func (s *Server) ClientURL() string {
6,608✔
1062
        // FIXME(dlc) - should we add in user and pass if defined single?
6,608✔
1063
        opts := s.getOpts()
6,608✔
1064
        var u url.URL
6,608✔
1065
        u.Scheme = "nats"
6,608✔
1066
        if opts.TLSConfig != nil {
6,618✔
1067
                u.Scheme = "tls"
10✔
1068
        }
10✔
1069
        u.Host = net.JoinHostPort(opts.Host, fmt.Sprintf("%d", opts.Port))
6,608✔
1070
        return u.String()
6,608✔
1071
}
1072

1073
// WebsocketURL returns the URL used to connect websocket clients.
1074
// Helpful in tests and with in-process servers using a random websocket port (-1).
1075
func (s *Server) WebsocketURL() string {
×
1076
        opts := s.getOpts()
×
1077
        var u url.URL
×
1078
        u.Scheme = "ws"
×
1079
        if opts.Websocket.TLSConfig != nil {
×
1080
                u.Scheme = "wss"
×
1081
        }
×
1082
        u.Host = net.JoinHostPort(opts.Websocket.Host, fmt.Sprintf("%d", opts.Websocket.Port))
×
1083
        return u.String()
×
1084
}
1085

1086
func validateCluster(o *Options) error {
7,851✔
1087
        if o.Cluster.Name != _EMPTY_ && strings.Contains(o.Cluster.Name, " ") {
7,851✔
1088
                return ErrClusterNameHasSpaces
×
1089
        }
×
1090
        if o.Cluster.Compression.Mode != _EMPTY_ {
12,812✔
1091
                if err := validateAndNormalizeCompressionOption(&o.Cluster.Compression, CompressionS2Fast); err != nil {
4,961✔
1092
                        return err
×
1093
                }
×
1094
        }
1095
        if err := validatePinnedCerts(o.Cluster.TLSPinnedCerts); err != nil {
7,851✔
1096
                return fmt.Errorf("cluster: %v", err)
×
1097
        }
×
1098
        // Check that cluster name if defined matches any gateway name.
1099
        // Note that we have already verified that the gateway name does not have spaces.
1100
        if o.Gateway.Name != _EMPTY_ && o.Gateway.Name != o.Cluster.Name {
8,211✔
1101
                if o.Cluster.Name != _EMPTY_ {
360✔
1102
                        return ErrClusterNameConfigConflict
×
1103
                }
×
1104
                // Set this here so we do not consider it dynamic.
1105
                o.Cluster.Name = o.Gateway.Name
360✔
1106
        }
1107
        if l := len(o.Cluster.PinnedAccounts); l > 0 {
11,663✔
1108
                if o.Cluster.PoolSize < 0 {
3,812✔
1109
                        return fmt.Errorf("pool_size cannot be negative if pinned accounts are specified")
×
1110
                }
×
1111
                m := make(map[string]struct{}, l)
3,812✔
1112
                for _, a := range o.Cluster.PinnedAccounts {
7,636✔
1113
                        if _, exists := m[a]; exists {
3,824✔
1114
                                return fmt.Errorf("found duplicate account name %q in pinned accounts list %q", a, o.Cluster.PinnedAccounts)
×
1115
                        }
×
1116
                        m[a] = struct{}{}
3,824✔
1117
                }
1118
        }
1119
        return nil
7,851✔
1120
}
1121

1122
func validatePinnedCerts(pinned PinnedCertSet) error {
17,833✔
1123
        re := regexp.MustCompile("^[a-f0-9]{64}$")
17,833✔
1124
        for certId := range pinned {
17,839✔
1125
                entry := strings.ToLower(certId)
6✔
1126
                if !re.MatchString(entry) {
6✔
1127
                        return fmt.Errorf("error parsing 'pinned_certs' key %s does not look like lower case hex-encoded sha256 of DER encoded SubjectPublicKeyInfo", entry)
×
1128
                }
×
1129
        }
1130
        return nil
17,833✔
1131
}
1132

1133
func validateOptions(o *Options) error {
7,892✔
1134
        if o.LameDuckDuration > 0 && o.LameDuckGracePeriod >= o.LameDuckDuration {
7,892✔
1135
                return fmt.Errorf("lame duck grace period (%v) should be strictly lower than lame duck duration (%v)",
×
1136
                        o.LameDuckGracePeriod, o.LameDuckDuration)
×
1137
        }
×
1138
        if int64(o.MaxPayload) > o.MaxPending {
7,892✔
1139
                return fmt.Errorf("max_payload (%v) cannot be higher than max_pending (%v)",
×
1140
                        o.MaxPayload, o.MaxPending)
×
1141
        }
×
1142
        if o.ServerName != _EMPTY_ && strings.Contains(o.ServerName, " ") {
7,892✔
1143
                return errors.New("server name cannot contain spaces")
×
1144
        }
×
1145
        // Check that the trust configuration is correct.
1146
        if err := validateTrustedOperators(o); err != nil {
7,901✔
1147
                return err
9✔
1148
        }
9✔
1149
        // Check on leaf nodes which will require a system
1150
        // account when gateways are also configured.
1151
        if err := validateLeafNode(o); err != nil {
7,913✔
1152
                return err
30✔
1153
        }
30✔
1154
        // Check that authentication is properly configured.
1155
        if err := validateAuth(o); err != nil {
7,855✔
1156
                return err
2✔
1157
        }
2✔
1158
        // Check that proxies is properly configured.
1159
        if err := validateProxies(o); err != nil {
7,851✔
1160
                return err
×
1161
        }
×
1162
        // Check that gateway is properly configured. Returns no error
1163
        // if there is no gateway defined.
1164
        if err := validateGatewayOptions(o); err != nil {
7,851✔
1165
                return err
×
1166
        }
×
1167
        // Check that cluster name if defined matches any gateway name.
1168
        if err := validateCluster(o); err != nil {
7,851✔
1169
                return err
×
1170
        }
×
1171
        if err := validateMQTTOptions(o); err != nil {
7,856✔
1172
                return err
5✔
1173
        }
5✔
1174
        if err := validateJetStreamOptions(o); err != nil {
7,858✔
1175
                return err
12✔
1176
        }
12✔
1177
        // Finally check websocket options.
1178
        return validateWebsocketOptions(o)
7,834✔
1179
}
1180

1181
func (s *Server) getOpts() *Options {
3,159,122✔
1182
        s.optsMu.RLock()
3,159,122✔
1183
        opts := s.opts
3,159,122✔
1184
        s.optsMu.RUnlock()
3,159,122✔
1185
        return opts
3,159,122✔
1186
}
3,159,122✔
1187

1188
func (s *Server) setOpts(opts *Options) {
1,141✔
1189
        s.optsMu.Lock()
1,141✔
1190
        s.opts = opts
1,141✔
1191
        s.optsMu.Unlock()
1,141✔
1192
}
1,141✔
1193

1194
func (s *Server) globalAccount() *Account {
11,081✔
1195
        s.mu.RLock()
11,081✔
1196
        gacc := s.gacc
11,081✔
1197
        s.mu.RUnlock()
11,081✔
1198
        return gacc
11,081✔
1199
}
11,081✔
1200

1201
// Used to setup or update Accounts.
1202
// Returns a map that indicates which accounts have had their stream imports
1203
// changed (in case of an update in configuration reload).
1204
// Lock is held upon entry, but will be released/reacquired in this function.
1205
func (s *Server) configureAccounts(reloading bool) (map[string]struct{}, error) {
7,820✔
1206
        awcsti := make(map[string]struct{})
7,820✔
1207

7,820✔
1208
        // Create the global account.
7,820✔
1209
        if s.gacc == nil {
14,522✔
1210
                s.gacc = NewAccount(globalAccountName)
6,702✔
1211
                s.registerAccountNoLock(s.gacc)
6,702✔
1212
        }
6,702✔
1213

1214
        opts := s.getOpts()
7,820✔
1215

7,820✔
1216
        // We need to track service imports since we can not swap them out (unsub and re-sub)
7,820✔
1217
        // until the proper server struct accounts have been swapped in properly. Doing it in
7,820✔
1218
        // place could lead to data loss or server panic since account under new si has no real
7,820✔
1219
        // account and hence no sublist, so will panic on inbound message.
7,820✔
1220
        siMap := make(map[*Account][][]byte)
7,820✔
1221

7,820✔
1222
        // Check opts and walk through them. We need to copy them here
7,820✔
1223
        // so that we do not keep a real one sitting in the options.
7,820✔
1224
        for _, acc := range opts.Accounts {
16,373✔
1225
                var a *Account
8,553✔
1226
                create := true
8,553✔
1227
                // For the global account, we want to skip the reload process
8,553✔
1228
                // and fall back into the "create" case which will in that
8,553✔
1229
                // case really be just an update (shallowCopy will make sure
8,553✔
1230
                // that mappings are copied over).
8,553✔
1231
                if reloading && acc.Name != globalAccountName {
11,691✔
1232
                        if ai, ok := s.accounts.Load(acc.Name); ok {
6,276✔
1233
                                a = ai.(*Account)
3,138✔
1234
                                // Before updating the account, check if stream imports have changed.
3,138✔
1235
                                if !a.checkStreamImportsEqual(acc) {
3,139✔
1236
                                        awcsti[acc.Name] = struct{}{}
1✔
1237
                                }
1✔
1238
                                a.mu.Lock()
3,138✔
1239
                                // Collect the sids for the service imports since we are going to
3,138✔
1240
                                // replace with new ones.
3,138✔
1241
                                var sids [][]byte
3,138✔
1242
                                for _, sis := range a.imports.services {
13,518✔
1243
                                        for _, si := range sis {
20,760✔
1244
                                                if si.sid != nil {
20,760✔
1245
                                                        sids = append(sids, si.sid)
10,380✔
1246
                                                }
10,380✔
1247
                                        }
1248
                                }
1249
                                // Setup to process later if needed.
1250
                                if len(sids) > 0 || len(acc.imports.services) > 0 {
6,276✔
1251
                                        siMap[a] = sids
3,138✔
1252
                                }
3,138✔
1253

1254
                                // Now reset all export/imports fields since they are going to be
1255
                                // filled in shallowCopy()
1256
                                a.imports.streams, a.imports.services = nil, nil
3,138✔
1257
                                a.exports.streams, a.exports.services = nil, nil
3,138✔
1258
                                // We call shallowCopy from the account `acc` (the one in Options)
3,138✔
1259
                                // and pass `a` (our existing account) to get it updated.
3,138✔
1260
                                acc.shallowCopy(a)
3,138✔
1261
                                a.mu.Unlock()
3,138✔
1262
                                create = false
3,138✔
1263
                        }
1264
                }
1265
                // Track old mappings if global account.
1266
                var oldGMappings []*mapping
8,553✔
1267
                if create {
13,968✔
1268
                        if acc.Name == globalAccountName {
5,423✔
1269
                                a = s.gacc
8✔
1270
                                a.mu.Lock()
8✔
1271
                                oldGMappings = append(oldGMappings, a.mappings...)
8✔
1272
                                a.mu.Unlock()
8✔
1273
                        } else {
5,415✔
1274
                                a = NewAccount(acc.Name)
5,407✔
1275
                        }
5,407✔
1276
                        // Locking matters in the case of an update of the global account
1277
                        a.mu.Lock()
5,415✔
1278
                        acc.shallowCopy(a)
5,415✔
1279
                        a.mu.Unlock()
5,415✔
1280
                        // Will be a no-op in case of the global account since it is already registered.
5,415✔
1281
                        s.registerAccountNoLock(a)
5,415✔
1282
                }
1283

1284
                // The `acc` account is stored in options, not in the server, and these can be cleared.
1285
                acc.sl, acc.clients, acc.mappings = nil, nil, nil
8,553✔
1286

8,553✔
1287
                // Check here if we have been reloaded and we have a global account with mappings that may have changed.
8,553✔
1288
                // If we have leafnodes they need to be updated.
8,553✔
1289
                if reloading && a == s.gacc {
8,555✔
1290
                        a.mu.Lock()
2✔
1291
                        mappings := make(map[string]*mapping)
2✔
1292
                        if len(a.mappings) > 0 && a.nleafs > 0 {
4✔
1293
                                for _, em := range a.mappings {
4✔
1294
                                        mappings[em.src] = em
2✔
1295
                                }
2✔
1296
                        }
1297
                        a.mu.Unlock()
2✔
1298
                        if len(mappings) > 0 || len(oldGMappings) > 0 {
4✔
1299
                                a.lmu.RLock()
2✔
1300
                                for _, lc := range a.lleafs {
4✔
1301
                                        for _, em := range mappings {
4✔
1302
                                                lc.forceAddToSmap(em.src)
2✔
1303
                                        }
2✔
1304
                                        // Remove any old ones if needed.
1305
                                        for _, em := range oldGMappings {
4✔
1306
                                                // Only remove if not in the new ones.
2✔
1307
                                                if _, ok := mappings[em.src]; !ok {
3✔
1308
                                                        lc.forceRemoveFromSmap(em.src)
1✔
1309
                                                }
1✔
1310
                                        }
1311
                                }
1312
                                a.lmu.RUnlock()
2✔
1313
                        }
1314
                }
1315

1316
                // If we see an account defined using $SYS we will make sure that is set as system account.
1317
                if acc.Name == DEFAULT_SYSTEM_ACCOUNT && opts.SystemAccount == _EMPTY_ {
12,340✔
1318
                        opts.SystemAccount = DEFAULT_SYSTEM_ACCOUNT
3,787✔
1319
                }
3,787✔
1320
        }
1321

1322
        // Now that we have this we need to remap any referenced accounts in
1323
        // import or export maps to the new ones.
1324
        swapApproved := func(ea *exportAuth) {
9,375✔
1325
                for sub, a := range ea.approved {
1,600✔
1326
                        var acc *Account
45✔
1327
                        if v, ok := s.accounts.Load(a.Name); ok {
90✔
1328
                                acc = v.(*Account)
45✔
1329
                        }
45✔
1330
                        ea.approved[sub] = acc
45✔
1331
                }
1332
        }
1333
        var numAccounts int
7,820✔
1334
        s.accounts.Range(func(k, v any) bool {
24,393✔
1335
                numAccounts++
16,573✔
1336
                acc := v.(*Account)
16,573✔
1337
                acc.mu.Lock()
16,573✔
1338
                // Exports
16,573✔
1339
                for _, se := range acc.exports.streams {
16,701✔
1340
                        if se != nil {
137✔
1341
                                swapApproved(&se.exportAuth)
9✔
1342
                        }
9✔
1343
                }
1344
                for _, se := range acc.exports.services {
18,119✔
1345
                        if se != nil {
3,092✔
1346
                                // Swap over the bound account for service exports.
1,546✔
1347
                                if se.acc != nil {
3,092✔
1348
                                        if v, ok := s.accounts.Load(se.acc.Name); ok {
3,092✔
1349
                                                se.acc = v.(*Account)
1,546✔
1350
                                        }
1,546✔
1351
                                }
1352
                                swapApproved(&se.exportAuth)
1,546✔
1353
                        }
1354
                }
1355
                // Imports
1356
                for _, si := range acc.imports.streams {
16,748✔
1357
                        if v, ok := s.accounts.Load(si.acc.Name); ok {
350✔
1358
                                si.acc = v.(*Account)
175✔
1359
                        }
175✔
1360
                }
1361
                for _, sis := range acc.imports.services {
22,419✔
1362
                        for _, si := range sis {
11,695✔
1363
                                if v, ok := s.accounts.Load(si.acc.Name); ok {
11,698✔
1364
                                        si.acc = v.(*Account)
5,849✔
1365

5,849✔
1366
                                        // It is possible to allow for latency tracking inside your
5,849✔
1367
                                        // own account, so lock only when not the same account.
5,849✔
1368
                                        if si.acc == acc {
5,906✔
1369
                                                si.se = si.acc.getServiceExport(si.to)
57✔
1370
                                                continue
57✔
1371
                                        }
1372
                                        si.acc.mu.RLock()
5,792✔
1373
                                        si.se = si.acc.getServiceExport(si.to)
5,792✔
1374
                                        si.acc.mu.RUnlock()
5,792✔
1375
                                }
1376
                        }
1377
                }
1378
                // Make sure the subs are running, but only if not reloading.
1379
                if len(acc.imports.services) > 0 && acc.ic == nil && !reloading {
16,821✔
1380
                        acc.ic = s.createInternalAccountClient()
248✔
1381
                        acc.ic.acc = acc
248✔
1382
                        // Need to release locks to invoke this function.
248✔
1383
                        acc.mu.Unlock()
248✔
1384
                        s.mu.Unlock()
248✔
1385
                        acc.addAllServiceImportSubs()
248✔
1386
                        s.mu.Lock()
248✔
1387
                        acc.mu.Lock()
248✔
1388
                }
248✔
1389
                acc.updated = time.Now()
16,573✔
1390
                acc.mu.Unlock()
16,573✔
1391
                return true
16,573✔
1392
        })
1393

1394
        // Check if we need to process service imports pending from above.
1395
        // This processing needs to be after we swap in the real accounts above.
1396
        for acc, sids := range siMap {
10,958✔
1397
                c := acc.ic
3,138✔
1398
                for _, sid := range sids {
13,518✔
1399
                        c.processUnsub(sid)
10,380✔
1400
                }
10,380✔
1401
                acc.addAllServiceImportSubs()
3,138✔
1402
                s.mu.Unlock()
3,138✔
1403
                s.registerSystemImports(acc)
3,138✔
1404
                s.mu.Lock()
3,138✔
1405
        }
1406

1407
        // Set the system account if it was configured.
1408
        // Otherwise create a default one.
1409
        if opts.SystemAccount != _EMPTY_ {
12,757✔
1410
                // Lock may be acquired in lookupAccount, so release to call lookupAccount.
4,937✔
1411
                s.mu.Unlock()
4,937✔
1412
                acc, err := s.lookupAccount(opts.SystemAccount)
4,937✔
1413
                s.mu.Lock()
4,937✔
1414
                if err == nil && s.sys != nil && acc != s.sys.account {
4,937✔
1415
                        // sys.account.clients (including internal client)/respmap/etc... are transferred separately
×
1416
                        s.sys.account = acc
×
1417
                        s.sysAcc.Store(acc)
×
1418
                }
×
1419
                if err != nil {
4,937✔
1420
                        return awcsti, fmt.Errorf("error resolving system account: %v", err)
×
1421
                }
×
1422

1423
                // If we have defined a system account here check to see if its just us and the $G account.
1424
                // We would do this to add user/pass to the system account. If this is the case add in
1425
                // no-auth-user for $G.
1426
                // Only do this if non-operator mode and we did not have an authorization block defined.
1427
                if len(opts.TrustedOperators) == 0 && numAccounts == 2 && opts.NoAuthUser == _EMPTY_ && !opts.authBlockDefined {
7,928✔
1428
                        // If we come here from config reload, let's not recreate the fake user name otherwise
2,991✔
1429
                        // it will cause currently clients to be disconnected.
2,991✔
1430
                        uname := s.sysAccOnlyNoAuthUser
2,991✔
1431
                        if uname == _EMPTY_ {
5,980✔
1432
                                // Create a unique name so we do not collide.
2,989✔
1433
                                var b [8]byte
2,989✔
1434
                                rn := rand.Int63()
2,989✔
1435
                                for i, l := 0, rn; i < len(b); i++ {
26,901✔
1436
                                        b[i] = digits[l%base]
23,912✔
1437
                                        l /= base
23,912✔
1438
                                }
23,912✔
1439
                                uname = fmt.Sprintf("nats-%s", b[:])
2,989✔
1440
                                s.sysAccOnlyNoAuthUser = uname
2,989✔
1441
                        }
1442
                        opts.Users = append(opts.Users, &User{Username: uname, Password: uname[6:], Account: s.gacc})
2,991✔
1443
                        opts.NoAuthUser = uname
2,991✔
1444
                }
1445
        }
1446

1447
        // Add any required exports from system account.
1448
        if s.sys != nil {
8,938✔
1449
                sysAcc := s.sys.account
1,118✔
1450
                s.mu.Unlock()
1,118✔
1451
                s.addSystemAccountExports(sysAcc)
1,118✔
1452
                s.mu.Lock()
1,118✔
1453
        }
1,118✔
1454

1455
        return awcsti, nil
7,820✔
1456
}
1457

1458
// Setup the account resolver. For memory resolver, make sure the JWTs are
1459
// properly formed but do not enforce expiration etc.
1460
// Lock is held on entry, but may be released/reacquired during this call.
1461
func (s *Server) configureResolver() error {
6,712✔
1462
        opts := s.getOpts()
6,712✔
1463
        s.accResolver = opts.AccountResolver
6,712✔
1464
        if opts.AccountResolver != nil {
7,081✔
1465
                // For URL resolver, set the TLSConfig if specified.
369✔
1466
                if opts.AccountResolverTLSConfig != nil {
371✔
1467
                        if ar, ok := opts.AccountResolver.(*URLAccResolver); ok {
4✔
1468
                                if t, ok := ar.c.Transport.(*http.Transport); ok {
4✔
1469
                                        t.CloseIdleConnections()
2✔
1470
                                        t.TLSClientConfig = opts.AccountResolverTLSConfig.Clone()
2✔
1471
                                }
2✔
1472
                        }
1473
                }
1474
                if len(opts.resolverPreloads) > 0 {
541✔
1475
                        // Lock ordering is account resolver -> server, so we need to release
172✔
1476
                        // the lock and reacquire it when done with account resolver's calls.
172✔
1477
                        ar := s.accResolver
172✔
1478
                        s.mu.Unlock()
172✔
1479
                        defer s.mu.Lock()
172✔
1480
                        if ar.IsReadOnly() {
172✔
1481
                                return fmt.Errorf("resolver preloads only available for writeable resolver types MEM/DIR/CACHE_DIR")
×
1482
                        }
×
1483
                        for k, v := range opts.resolverPreloads {
1,092✔
1484
                                _, err := jwt.DecodeAccountClaims(v)
920✔
1485
                                if err != nil {
920✔
1486
                                        return fmt.Errorf("preload account error for %q: %v", k, err)
×
1487
                                }
×
1488
                                ar.Store(k, v)
920✔
1489
                        }
1490
                }
1491
        }
1492
        return nil
6,712✔
1493
}
1494

1495
// This will check preloads for validation issues.
1496
func (s *Server) checkResolvePreloads() {
173✔
1497
        opts := s.getOpts()
173✔
1498
        // We can just check the read-only opts versions here, that way we do not need
173✔
1499
        // to grab server lock or access s.accResolver.
173✔
1500
        for k, v := range opts.resolverPreloads {
1,093✔
1501
                claims, err := jwt.DecodeAccountClaims(v)
920✔
1502
                if err != nil {
920✔
1503
                        s.Errorf("Preloaded account [%s] not valid", k)
×
1504
                        continue
×
1505
                }
1506
                // Check if it is expired.
1507
                vr := jwt.CreateValidationResults()
920✔
1508
                claims.Validate(vr)
920✔
1509
                if vr.IsBlocking(true) {
921✔
1510
                        s.Warnf("Account [%s] has validation issues:", k)
1✔
1511
                        for _, v := range vr.Issues {
2✔
1512
                                s.Warnf("  - %s", v.Description)
1✔
1513
                        }
1✔
1514
                }
1515
        }
1516
}
1517

1518
// Determines if we are in pre NATS 2.0 setup with no accounts.
1519
func (s *Server) globalAccountOnly() bool {
4,289✔
1520
        var hasOthers bool
4,289✔
1521

4,289✔
1522
        if s.trustedKeys != nil {
4,432✔
1523
                return false
143✔
1524
        }
143✔
1525

1526
        s.mu.RLock()
4,146✔
1527
        s.accounts.Range(func(k, v any) bool {
12,375✔
1528
                acc := v.(*Account)
8,229✔
1529
                // Ignore global and system
8,229✔
1530
                if acc == s.gacc || (s.sys != nil && acc == s.sys.account) {
16,094✔
1531
                        return true
7,865✔
1532
                }
7,865✔
1533
                hasOthers = true
364✔
1534
                return false
364✔
1535
        })
1536
        s.mu.RUnlock()
4,146✔
1537

4,146✔
1538
        return !hasOthers
4,146✔
1539
}
1540

1541
// Determines if this server is in standalone mode, meaning no routes or gateways.
1542
func (s *Server) standAloneMode() bool {
25,356✔
1543
        opts := s.getOpts()
25,356✔
1544
        return opts.Cluster.Port == 0 && opts.Gateway.Port == 0
25,356✔
1545
}
25,356✔
1546

1547
func (s *Server) configuredRoutes() int {
3,333✔
1548
        return len(s.getOpts().Routes)
3,333✔
1549
}
3,333✔
1550

1551
// activePeers is used in bootstrapping raft groups like the JetStream meta controller.
1552
func (s *Server) ActivePeers() (peers []string) {
3,122✔
1553
        s.nodeToInfo.Range(func(k, v any) bool {
6,627✔
1554
                si := v.(nodeInfo)
3,505✔
1555
                if !si.offline {
7,010✔
1556
                        peers = append(peers, k.(string))
3,505✔
1557
                }
3,505✔
1558
                return true
3,505✔
1559
        })
1560
        return peers
3,122✔
1561
}
1562

1563
// isTrustedIssuer will check that the issuer is a trusted public key.
1564
// This is used to make sure an account was signed by a trusted operator.
1565
func (s *Server) isTrustedIssuer(issuer string) bool {
10,714✔
1566
        s.mu.RLock()
10,714✔
1567
        defer s.mu.RUnlock()
10,714✔
1568
        // If we are not running in trusted mode and there is no issuer, that is ok.
10,714✔
1569
        if s.trustedKeys == nil && issuer == _EMPTY_ {
16,303✔
1570
                return true
5,589✔
1571
        }
5,589✔
1572
        for _, tk := range s.trustedKeys {
10,460✔
1573
                if tk == issuer {
10,383✔
1574
                        return true
5,048✔
1575
                }
5,048✔
1576
        }
1577
        return false
77✔
1578
}
1579

1580
// processTrustedKeys will process binary stamped and
1581
// options-based trusted nkeys. Returns success.
1582
func (s *Server) processTrustedKeys() bool {
6,705✔
1583
        s.strictSigningKeyUsage = map[string]struct{}{}
6,705✔
1584
        opts := s.getOpts()
6,705✔
1585
        if trustedKeys != _EMPTY_ && !s.initStampedTrustedKeys() {
6,705✔
1586
                return false
×
1587
        } else if opts.TrustedKeys != nil {
7,117✔
1588
                for _, key := range opts.TrustedKeys {
1,704✔
1589
                        if !nkeys.IsValidPublicOperatorKey(key) {
1,292✔
1590
                                return false
×
1591
                        }
×
1592
                }
1593
                s.trustedKeys = append([]string(nil), opts.TrustedKeys...)
412✔
1594
                for _, claim := range opts.TrustedOperators {
715✔
1595
                        if !claim.StrictSigningKeyUsage {
604✔
1596
                                continue
301✔
1597
                        }
1598
                        for _, key := range claim.SigningKeys {
4✔
1599
                                s.strictSigningKeyUsage[key] = struct{}{}
2✔
1600
                        }
2✔
1601
                }
1602
        }
1603
        return true
6,705✔
1604
}
1605

1606
// checkTrustedKeyString will check that the string is a valid array
1607
// of public operator nkeys.
1608
func checkTrustedKeyString(keys string) []string {
×
1609
        tks := strings.Fields(keys)
×
1610
        if len(tks) == 0 {
×
1611
                return nil
×
1612
        }
×
1613
        // Walk all the keys and make sure they are valid.
1614
        for _, key := range tks {
×
1615
                if !nkeys.IsValidPublicOperatorKey(key) {
×
1616
                        return nil
×
1617
                }
×
1618
        }
1619
        return tks
×
1620
}
1621

1622
// initStampedTrustedKeys will check the stamped trusted keys
1623
// and will set the server field 'trustedKeys'. Returns whether
1624
// it succeeded or not.
1625
func (s *Server) initStampedTrustedKeys() bool {
×
1626
        // Check to see if we have an override in options, which will cause us to fail.
×
1627
        if len(s.getOpts().TrustedKeys) > 0 {
×
1628
                return false
×
1629
        }
×
1630
        tks := checkTrustedKeyString(trustedKeys)
×
1631
        if len(tks) == 0 {
×
1632
                return false
×
1633
        }
×
1634
        s.trustedKeys = tks
×
1635
        return true
×
1636
}
1637

1638
// PrintAndDie is exported for access in other packages.
1639
func PrintAndDie(msg string) {
×
1640
        fmt.Fprintln(os.Stderr, msg)
×
1641
        os.Exit(1)
×
1642
}
×
1643

1644
// PrintServerAndExit will print our version and exit.
1645
func PrintServerAndExit() {
×
1646
        fmt.Printf("nats-server: v%s\n", VERSION)
×
1647
        os.Exit(0)
×
1648
}
×
1649

1650
// ProcessCommandLineArgs takes the command line arguments
1651
// validating and setting flags for handling in case any
1652
// sub command was present.
1653
func ProcessCommandLineArgs(cmd *flag.FlagSet) (showVersion bool, showHelp bool, err error) {
×
1654
        if len(cmd.Args()) > 0 {
×
1655
                arg := cmd.Args()[0]
×
1656
                switch strings.ToLower(arg) {
×
1657
                case "version":
×
1658
                        return true, false, nil
×
1659
                case "help":
×
1660
                        return false, true, nil
×
1661
                default:
×
1662
                        return false, false, fmt.Errorf("unrecognized command: %q", arg)
×
1663
                }
1664
        }
1665

1666
        return false, false, nil
×
1667
}
1668

1669
// Public version.
1670
func (s *Server) Running() bool {
2,427✔
1671
        return s.isRunning()
2,427✔
1672
}
2,427✔
1673

1674
// Protected check on running state
1675
func (s *Server) isRunning() bool {
329,765,675✔
1676
        return s.running.Load()
329,765,675✔
1677
}
329,765,675✔
1678

1679
func (s *Server) logPid() error {
1✔
1680
        pidStr := strconv.Itoa(os.Getpid())
1✔
1681
        return os.WriteFile(s.getOpts().PidFile, []byte(pidStr), defaultFilePerms)
1✔
1682
}
1✔
1683

1684
// numReservedAccounts will return the number of reserved accounts configured in the server.
1685
// Currently this is 1, one for the global default account.
1686
func (s *Server) numReservedAccounts() int {
1✔
1687
        return 1
1✔
1688
}
1✔
1689

1690
// NumActiveAccounts reports number of active accounts on this server.
1691
func (s *Server) NumActiveAccounts() int32 {
×
1692
        return atomic.LoadInt32(&s.activeAccounts)
×
1693
}
×
1694

1695
// incActiveAccounts() just adds one under lock.
1696
func (s *Server) incActiveAccounts() {
21,655✔
1697
        atomic.AddInt32(&s.activeAccounts, 1)
21,655✔
1698
}
21,655✔
1699

1700
// decActiveAccounts() just subtracts one under lock.
1701
func (s *Server) decActiveAccounts() {
11,412✔
1702
        atomic.AddInt32(&s.activeAccounts, -1)
11,412✔
1703
}
11,412✔
1704

1705
// This should be used for testing only. Will be slow since we have to
1706
// range over all accounts in the sync.Map to count.
1707
func (s *Server) numAccounts() int {
6✔
1708
        count := 0
6✔
1709
        s.mu.RLock()
6✔
1710
        s.accounts.Range(func(k, v any) bool {
20✔
1711
                count++
14✔
1712
                return true
14✔
1713
        })
14✔
1714
        s.mu.RUnlock()
6✔
1715
        return count
6✔
1716
}
1717

1718
// NumLoadedAccounts returns the number of loaded accounts.
1719
func (s *Server) NumLoadedAccounts() int {
4✔
1720
        return s.numAccounts()
4✔
1721
}
4✔
1722

1723
// LookupOrRegisterAccount will return the given account if known or create a new entry.
1724
func (s *Server) LookupOrRegisterAccount(name string) (account *Account, isNew bool) {
2,088✔
1725
        s.mu.Lock()
2,088✔
1726
        defer s.mu.Unlock()
2,088✔
1727
        if v, ok := s.accounts.Load(name); ok {
2,088✔
1728
                return v.(*Account), false
×
1729
        }
×
1730
        acc := NewAccount(name)
2,088✔
1731
        s.registerAccountNoLock(acc)
2,088✔
1732
        return acc, true
2,088✔
1733
}
1734

1735
// RegisterAccount will register an account. The account must be new
1736
// or this call will fail.
1737
func (s *Server) RegisterAccount(name string) (*Account, error) {
849✔
1738
        s.mu.Lock()
849✔
1739
        defer s.mu.Unlock()
849✔
1740
        if _, ok := s.accounts.Load(name); ok {
978✔
1741
                return nil, ErrAccountExists
129✔
1742
        }
129✔
1743
        acc := NewAccount(name)
720✔
1744
        s.registerAccountNoLock(acc)
720✔
1745
        return acc, nil
720✔
1746
}
1747

1748
// SetSystemAccount will set the internal system account.
1749
// If root operators are present it will also check validity.
1750
func (s *Server) SetSystemAccount(accName string) error {
5,959✔
1751
        // Lookup from sync.Map first.
5,959✔
1752
        if v, ok := s.accounts.Load(accName); ok {
11,916✔
1753
                return s.setSystemAccount(v.(*Account))
5,957✔
1754
        }
5,957✔
1755

1756
        // If we are here we do not have local knowledge of this account.
1757
        // Do this one by hand to return more useful error.
1758
        ac, jwt, err := s.fetchAccountClaims(accName)
2✔
1759
        if err != nil {
3✔
1760
                return err
1✔
1761
        }
1✔
1762
        acc := s.buildInternalAccount(ac)
1✔
1763
        acc.claimJWT = jwt
1✔
1764
        // Due to race, we need to make sure that we are not
1✔
1765
        // registering twice.
1✔
1766
        if racc := s.registerAccount(acc); racc != nil {
1✔
1767
                return nil
×
1768
        }
×
1769
        return s.setSystemAccount(acc)
1✔
1770
}
1771

1772
// SystemAccount returns the system account if set.
1773
func (s *Server) SystemAccount() *Account {
401,947✔
1774
        return s.sysAcc.Load()
401,947✔
1775
}
401,947✔
1776

1777
// GlobalAccount returns the global account.
1778
// Default clients will use the global account.
1779
func (s *Server) GlobalAccount() *Account {
7,335✔
1780
        s.mu.RLock()
7,335✔
1781
        defer s.mu.RUnlock()
7,335✔
1782
        return s.gacc
7,335✔
1783
}
7,335✔
1784

1785
// SetDefaultSystemAccount will create a default system account if one is not present.
1786
func (s *Server) SetDefaultSystemAccount() error {
2,078✔
1787
        if _, isNew := s.LookupOrRegisterAccount(DEFAULT_SYSTEM_ACCOUNT); !isNew {
2,078✔
1788
                return nil
×
1789
        }
×
1790
        s.Debugf("Created system account: %q", DEFAULT_SYSTEM_ACCOUNT)
2,078✔
1791
        return s.SetSystemAccount(DEFAULT_SYSTEM_ACCOUNT)
2,078✔
1792
}
1793

1794
// Assign a system account. Should only be called once.
1795
// This sets up a server to send and receive messages from
1796
// inside the server itself.
1797
func (s *Server) setSystemAccount(acc *Account) error {
5,978✔
1798
        if acc == nil {
5,978✔
1799
                return ErrMissingAccount
×
1800
        }
×
1801
        // Don't try to fix this here.
1802
        if acc.IsExpired() {
5,978✔
1803
                return ErrAccountExpired
×
1804
        }
×
1805
        // If we are running with trusted keys for an operator
1806
        // make sure we check the account is legit.
1807
        if !s.isTrustedIssuer(acc.Issuer) {
6,048✔
1808
                return ErrAccountValidation
70✔
1809
        }
70✔
1810

1811
        s.mu.Lock()
5,908✔
1812

5,908✔
1813
        if s.sys != nil {
5,910✔
1814
                s.mu.Unlock()
2✔
1815
                return ErrAccountExists
2✔
1816
        }
2✔
1817

1818
        // This is here in an attempt to quiet the race detector and not have to place
1819
        // locks on fast path for inbound messages and checking service imports.
1820
        acc.mu.Lock()
5,906✔
1821
        if acc.imports.services == nil {
11,812✔
1822
                acc.imports.services = make(map[string][]*serviceImport)
5,906✔
1823
        }
5,906✔
1824
        acc.mu.Unlock()
5,906✔
1825

5,906✔
1826
        s.sys = &internal{
5,906✔
1827
                account: acc,
5,906✔
1828
                client:  s.createInternalSystemClient(),
5,906✔
1829
                seq:     1,
5,906✔
1830
                sid:     1,
5,906✔
1831
                servers: make(map[string]*serverUpdate),
5,906✔
1832
                replies: make(map[string]msgHandler),
5,906✔
1833
                sendq:   newIPQueue[*pubMsg](s, "System sendQ"),
5,906✔
1834
                recvq:   newIPQueue[*inSysMsg](s, "System recvQ"),
5,906✔
1835
                recvqp:  newIPQueue[*inSysMsg](s, "System recvQ Pings"),
5,906✔
1836
                resetCh: make(chan struct{}),
5,906✔
1837
                sq:      s.newSendQ(acc),
5,906✔
1838
                statsz:  statsHBInterval,
5,906✔
1839
                orphMax: 5 * eventsHBInterval,
5,906✔
1840
                chkOrph: 3 * eventsHBInterval,
5,906✔
1841
        }
5,906✔
1842
        recvq, recvqp := s.sys.recvq, s.sys.recvqp
5,906✔
1843
        s.sys.wg.Add(1)
5,906✔
1844
        s.mu.Unlock()
5,906✔
1845

5,906✔
1846
        // Store in atomic for fast lookup.
5,906✔
1847
        s.sysAcc.Store(acc)
5,906✔
1848

5,906✔
1849
        // Register with the account.
5,906✔
1850
        s.sys.client.registerWithAccount(acc)
5,906✔
1851

5,906✔
1852
        s.addSystemAccountExports(acc)
5,906✔
1853

5,906✔
1854
        // Start our internal loop to serialize outbound messages.
5,906✔
1855
        // We do our own wg here since we will stop first during shutdown.
5,906✔
1856
        go s.internalSendLoop(&s.sys.wg)
5,906✔
1857

5,906✔
1858
        // Start the internal loop for inbound messages.
5,906✔
1859
        go s.internalReceiveLoop(recvq)
5,906✔
1860
        // Start the internal loop for inbound STATSZ/Ping messages.
5,906✔
1861
        go s.internalReceiveLoop(recvqp)
5,906✔
1862

5,906✔
1863
        // Start up our general subscriptions
5,906✔
1864
        s.initEventTracking()
5,906✔
1865

5,906✔
1866
        // Track for dead remote servers.
5,906✔
1867
        s.wrapChk(s.startRemoteServerSweepTimer)()
5,906✔
1868

5,906✔
1869
        // Send out statsz updates periodically.
5,906✔
1870
        s.wrapChk(s.startStatszTimer)()
5,906✔
1871

5,906✔
1872
        // If we have existing accounts make sure we enable account tracking.
5,906✔
1873
        s.mu.Lock()
5,906✔
1874
        s.accounts.Range(func(k, v any) bool {
19,455✔
1875
                acc := v.(*Account)
13,549✔
1876
                s.enableAccountTracking(acc)
13,549✔
1877
                return true
13,549✔
1878
        })
13,549✔
1879
        s.mu.Unlock()
5,906✔
1880

5,906✔
1881
        return nil
5,906✔
1882
}
1883

1884
// Creates an internal system client.
1885
func (s *Server) createInternalSystemClient() *client {
29,925✔
1886
        return s.createInternalClient(SYSTEM)
29,925✔
1887
}
29,925✔
1888

1889
// Creates an internal jetstream client.
1890
func (s *Server) createInternalJetStreamClient() *client {
44,817✔
1891
        return s.createInternalClient(JETSTREAM)
44,817✔
1892
}
44,817✔
1893

1894
// Creates an internal client for Account.
1895
func (s *Server) createInternalAccountClient() *client {
14,728✔
1896
        return s.createInternalClient(ACCOUNT)
14,728✔
1897
}
14,728✔
1898

1899
// Internal clients. kind should be SYSTEM, JETSTREAM or ACCOUNT
1900
func (s *Server) createInternalClient(kind int) *client {
89,470✔
1901
        if !isInternalClient(kind) {
89,470✔
1902
                return nil
×
1903
        }
×
1904
        now := time.Now()
89,470✔
1905
        c := &client{srv: s, kind: kind, opts: internalOpts, msubs: -1, mpay: -1, start: now, last: now}
89,470✔
1906
        c.initClient()
89,470✔
1907
        c.echo = false
89,470✔
1908
        c.headers = true
89,470✔
1909
        c.flags.set(noReconnect)
89,470✔
1910
        return c
89,470✔
1911
}
1912

1913
// Determine if accounts should track subscriptions for
1914
// efficient propagation.
1915
// Lock should be held on entry.
1916
func (s *Server) shouldTrackSubscriptions() bool {
16,269✔
1917
        opts := s.getOpts()
16,269✔
1918
        return (opts.Cluster.Port != 0 || opts.Gateway.Port != 0)
16,269✔
1919
}
16,269✔
1920

1921
// Invokes registerAccountNoLock under the protection of the server lock.
1922
// That is, server lock is acquired/released in this function.
1923
// See registerAccountNoLock for comment on returned value.
1924
func (s *Server) registerAccount(acc *Account) *Account {
1,662✔
1925
        s.mu.Lock()
1,662✔
1926
        racc := s.registerAccountNoLock(acc)
1,662✔
1927
        s.mu.Unlock()
1,662✔
1928
        return racc
1,662✔
1929
}
1,662✔
1930

1931
// Helper to set the sublist based on preferences.
1932
func (s *Server) setAccountSublist(acc *Account) {
17,931✔
1933
        if acc != nil && acc.sl == nil {
34,574✔
1934
                opts := s.getOpts()
16,643✔
1935
                if opts != nil && opts.NoSublistCache {
16,643✔
1936
                        acc.sl = NewSublistNoCache()
×
1937
                } else {
16,643✔
1938
                        acc.sl = NewSublistWithCache()
16,643✔
1939
                }
16,643✔
1940
        }
1941
}
1942

1943
// Registers an account in the server.
1944
// Due to some locking considerations, we may end-up trying
1945
// to register the same account twice. This function will
1946
// then return the already registered account.
1947
// Lock should be held on entry.
1948
func (s *Server) registerAccountNoLock(acc *Account) *Account {
16,651✔
1949
        // We are under the server lock. Lookup from map, if present
16,651✔
1950
        // return existing account.
16,651✔
1951
        if a, _ := s.accounts.Load(acc.Name); a != nil {
17,033✔
1952
                s.tmpAccounts.Delete(acc.Name)
382✔
1953
                return a.(*Account)
382✔
1954
        }
382✔
1955
        // Finish account setup and store.
1956
        s.setAccountSublist(acc)
16,269✔
1957

16,269✔
1958
        acc.mu.Lock()
16,269✔
1959
        s.setRouteInfo(acc)
16,269✔
1960
        if acc.clients == nil {
32,530✔
1961
                acc.clients = make(map[*client]struct{})
16,261✔
1962
        }
16,261✔
1963

1964
        // If we are capable of routing we will track subscription
1965
        // information for efficient interest propagation.
1966
        // During config reload, it is possible that account was
1967
        // already created (global account), so use locking and
1968
        // make sure we create only if needed.
1969
        // TODO(dlc)- Double check that we need this for GWs.
1970
        if acc.rm == nil && s.opts != nil && s.shouldTrackSubscriptions() {
27,285✔
1971
                acc.rm = make(map[string]int32)
11,016✔
1972
                acc.lqws = make(map[string]int32)
11,016✔
1973
        }
11,016✔
1974
        acc.srv = s
16,269✔
1975
        acc.updated = time.Now()
16,269✔
1976
        accName := acc.Name
16,269✔
1977
        jsEnabled := len(acc.jsLimits) > 0
16,269✔
1978
        acc.mu.Unlock()
16,269✔
1979

16,269✔
1980
        if opts := s.getOpts(); opts != nil && len(opts.JsAccDefaultDomain) > 0 {
16,329✔
1981
                if defDomain, ok := opts.JsAccDefaultDomain[accName]; ok {
80✔
1982
                        if jsEnabled {
23✔
1983
                                s.Warnf("Skipping Default Domain %q, set for JetStream enabled account %q", defDomain, accName)
3✔
1984
                        } else if defDomain != _EMPTY_ {
29✔
1985
                                for src, dest := range generateJSMappingTable(defDomain) {
90✔
1986
                                        // flip src and dest around so the domain is inserted
81✔
1987
                                        s.Noticef("Adding default domain mapping %q -> %q to account %q %p", dest, src, accName, acc)
81✔
1988
                                        if err := acc.AddMapping(dest, src); err != nil {
81✔
1989
                                                s.Errorf("Error adding JetStream default domain mapping: %v", err)
×
1990
                                        }
×
1991
                                }
1992
                        }
1993
                }
1994
        }
1995

1996
        s.accounts.Store(acc.Name, acc)
16,269✔
1997
        s.tmpAccounts.Delete(acc.Name)
16,269✔
1998
        s.enableAccountTracking(acc)
16,269✔
1999

16,269✔
2000
        // Can not have server lock here.
16,269✔
2001
        s.mu.Unlock()
16,269✔
2002
        s.registerSystemImports(acc)
16,269✔
2003
        // Starting 2.9.0, we are phasing out the optimistic mode, so change
16,269✔
2004
        // the account to interest-only mode (except if instructed not to do
16,269✔
2005
        // it in some tests).
16,269✔
2006
        if s.gateway.enabled && !gwDoNotForceInterestOnlyMode {
19,155✔
2007
                s.switchAccountToInterestMode(acc.GetName())
2,886✔
2008
        }
2,886✔
2009
        s.mu.Lock()
16,269✔
2010

16,269✔
2011
        return nil
16,269✔
2012
}
2013

2014
// Sets the account's routePoolIdx depending on presence or not of
2015
// pooling or per-account routes. Also updates a map used by
2016
// gateway code to retrieve a route based on some route hash.
2017
//
2018
// Both Server and Account lock held on entry.
2019
func (s *Server) setRouteInfo(acc *Account) {
16,305✔
2020
        // If there is a dedicated route configured for this account
16,305✔
2021
        if _, ok := s.accRoutes[acc.Name]; ok {
20,106✔
2022
                // We want the account name to be in the map, but we don't
3,801✔
2023
                // need a value (we could store empty string)
3,801✔
2024
                s.accRouteByHash.Store(acc.Name, nil)
3,801✔
2025
                // Set the route pool index to -1 so that it is easy when
3,801✔
2026
                // ranging over accounts to exclude those accounts when
3,801✔
2027
                // trying to get accounts for a given pool index.
3,801✔
2028
                acc.routePoolIdx = accDedicatedRoute
3,801✔
2029
        } else {
16,305✔
2030
                // If pool size more than 1, we will compute a hash code and
12,504✔
2031
                // use modulo to assign to an index of the pool slice. For 1
12,504✔
2032
                // and below, all accounts will be bound to the single connection
12,504✔
2033
                // at index 0.
12,504✔
2034
                acc.routePoolIdx = computeRoutePoolIdx(s.routesPoolSize, acc.Name)
12,504✔
2035
                if s.routesPoolSize > 1 {
17,545✔
2036
                        s.accRouteByHash.Store(acc.Name, acc.routePoolIdx)
5,041✔
2037
                }
5,041✔
2038
        }
2039
}
2040

2041
// lookupAccount is a function to return the account structure
2042
// associated with an account name.
2043
// Lock MUST NOT be held upon entry.
2044
func (s *Server) lookupAccount(name string) (*Account, error) {
2,793,650✔
2045
        return s.lookupOrFetchAccount(name, true)
2,793,650✔
2046
}
2,793,650✔
2047

2048
// lookupOrFetchAccount is a function to return the account structure
2049
// associated with an account name.
2050
// Lock MUST NOT be held upon entry.
2051
func (s *Server) lookupOrFetchAccount(name string, fetch bool) (*Account, error) {
3,024,375✔
2052
        var acc *Account
3,024,375✔
2053
        if v, ok := s.accounts.Load(name); ok {
6,046,142✔
2054
                acc = v.(*Account)
3,021,767✔
2055
        }
3,021,767✔
2056
        if acc != nil {
6,046,142✔
2057
                // If we are expired and we have a resolver, then
3,021,767✔
2058
                // return the latest information from the resolver.
3,021,767✔
2059
                if acc.IsExpired() {
3,021,779✔
2060
                        s.Debugf("Requested account [%s] has expired", name)
12✔
2061
                        if s.AccountResolver() != nil && fetch {
24✔
2062
                                if err := s.updateAccount(acc); err != nil {
14✔
2063
                                        // This error could mask expired, so just return expired here.
2✔
2064
                                        return nil, ErrAccountExpired
2✔
2065
                                }
2✔
2066
                        } else {
×
2067
                                return nil, ErrAccountExpired
×
2068
                        }
×
2069
                }
2070
                return acc, nil
3,021,765✔
2071
        }
2072
        // If we have a resolver see if it can fetch the account.
2073
        if s.AccountResolver() == nil || !fetch {
3,605✔
2074
                return nil, ErrMissingAccount
997✔
2075
        }
997✔
2076
        return s.fetchAccount(name)
1,611✔
2077
}
2078

2079
// LookupAccount is a public function to return the account structure
2080
// associated with name.
2081
func (s *Server) LookupAccount(name string) (*Account, error) {
2,636,583✔
2082
        return s.lookupAccount(name)
2,636,583✔
2083
}
2,636,583✔
2084

2085
// This will fetch new claims and if found update the account with new claims.
2086
// Lock MUST NOT be held upon entry.
2087
func (s *Server) updateAccount(acc *Account) error {
12✔
2088
        acc.mu.RLock()
12✔
2089
        // TODO(dlc) - Make configurable
12✔
2090
        if !acc.incomplete && time.Since(acc.updated) < time.Second {
12✔
2091
                acc.mu.RUnlock()
×
2092
                s.Debugf("Requested account update for [%s] ignored, too soon", acc.Name)
×
2093
                return ErrAccountResolverUpdateTooSoon
×
2094
        }
×
2095
        acc.mu.RUnlock()
12✔
2096
        claimJWT, err := s.fetchRawAccountClaims(acc.Name)
12✔
2097
        if err != nil {
14✔
2098
                return err
2✔
2099
        }
2✔
2100
        return s.updateAccountWithClaimJWT(acc, claimJWT)
10✔
2101
}
2102

2103
// updateAccountWithClaimJWT will check and apply the claim update.
2104
// Lock MUST NOT be held upon entry.
2105
func (s *Server) updateAccountWithClaimJWT(acc *Account, claimJWT string) error {
568✔
2106
        if acc == nil {
568✔
2107
                return ErrMissingAccount
×
2108
        }
×
2109
        acc.mu.RLock()
568✔
2110
        sameClaim := acc.claimJWT != _EMPTY_ && acc.claimJWT == claimJWT && !acc.incomplete
568✔
2111
        acc.mu.RUnlock()
568✔
2112
        if sameClaim {
959✔
2113
                s.Debugf("Requested account update for [%s], same claims detected", acc.Name)
391✔
2114
                return nil
391✔
2115
        }
391✔
2116
        accClaims, _, err := s.verifyAccountClaims(claimJWT)
177✔
2117
        if err == nil && accClaims != nil {
354✔
2118
                acc.mu.Lock()
177✔
2119
                // if an account is updated with a different operator signing key, we want to
177✔
2120
                // show a consistent issuer.
177✔
2121
                acc.Issuer = accClaims.Issuer
177✔
2122
                if acc.Name != accClaims.Subject {
178✔
2123
                        acc.mu.Unlock()
1✔
2124
                        return ErrAccountValidation
1✔
2125
                }
1✔
2126
                acc.mu.Unlock()
176✔
2127
                s.UpdateAccountClaims(acc, accClaims)
176✔
2128
                acc.mu.Lock()
176✔
2129
                // needs to be set after update completed.
176✔
2130
                // This causes concurrent calls to return with sameClaim=true if the change is effective.
176✔
2131
                acc.claimJWT = claimJWT
176✔
2132
                acc.mu.Unlock()
176✔
2133
                return nil
176✔
2134
        }
2135
        return err
×
2136
}
2137

2138
// fetchRawAccountClaims will grab raw account claims iff we have a resolver.
2139
// Lock is NOT held upon entry.
2140
func (s *Server) fetchRawAccountClaims(name string) (string, error) {
1,747✔
2141
        accResolver := s.AccountResolver()
1,747✔
2142
        if accResolver == nil {
1,747✔
2143
                return _EMPTY_, ErrNoAccountResolver
×
2144
        }
×
2145
        // Need to do actual Fetch
2146
        start := time.Now()
1,747✔
2147
        claimJWT, err := fetchAccount(accResolver, name)
1,747✔
2148
        fetchTime := time.Since(start)
1,747✔
2149
        if fetchTime > time.Second {
1,751✔
2150
                s.Warnf("Account [%s] fetch took %v", name, fetchTime)
4✔
2151
        } else {
1,747✔
2152
                s.Debugf("Account [%s] fetch took %v", name, fetchTime)
1,743✔
2153
        }
1,743✔
2154
        if err != nil {
1,791✔
2155
                s.Warnf("Account fetch failed: %v", err)
44✔
2156
                return "", err
44✔
2157
        }
44✔
2158
        return claimJWT, nil
1,703✔
2159
}
2160

2161
// fetchAccountClaims will attempt to fetch new claims if a resolver is present.
2162
// Lock is NOT held upon entry.
2163
func (s *Server) fetchAccountClaims(name string) (*jwt.AccountClaims, string, error) {
1,735✔
2164
        claimJWT, err := s.fetchRawAccountClaims(name)
1,735✔
2165
        if err != nil {
1,777✔
2166
                return nil, _EMPTY_, err
42✔
2167
        }
42✔
2168
        var claim *jwt.AccountClaims
1,693✔
2169
        claim, claimJWT, err = s.verifyAccountClaims(claimJWT)
1,693✔
2170
        if claim != nil && claim.Subject != name {
1,693✔
2171
                return nil, _EMPTY_, ErrAccountValidation
×
2172
        }
×
2173
        return claim, claimJWT, err
1,693✔
2174
}
2175

2176
// verifyAccountClaims will decode and validate any account claims.
2177
func (s *Server) verifyAccountClaims(claimJWT string) (*jwt.AccountClaims, string, error) {
2,302✔
2178
        accClaims, err := jwt.DecodeAccountClaims(claimJWT)
2,302✔
2179
        if err != nil {
2,303✔
2180
                return nil, _EMPTY_, err
1✔
2181
        }
1✔
2182
        if !s.isTrustedIssuer(accClaims.Issuer) {
2,308✔
2183
                return nil, _EMPTY_, ErrAccountValidation
7✔
2184
        }
7✔
2185
        vr := jwt.CreateValidationResults()
2,294✔
2186
        accClaims.Validate(vr)
2,294✔
2187
        if vr.IsBlocking(true) {
2,300✔
2188
                return nil, _EMPTY_, ErrAccountValidation
6✔
2189
        }
6✔
2190
        return accClaims, claimJWT, nil
2,288✔
2191
}
2192

2193
// This will fetch an account from a resolver if defined.
2194
// Lock is NOT held upon entry.
2195
func (s *Server) fetchAccount(name string) (*Account, error) {
1,714✔
2196
        accClaims, claimJWT, err := s.fetchAccountClaims(name)
1,714✔
2197
        if accClaims == nil {
1,767✔
2198
                return nil, err
53✔
2199
        }
53✔
2200
        acc := s.buildInternalAccount(accClaims)
1,661✔
2201
        // Due to possible race, if registerAccount() returns a non
1,661✔
2202
        // nil account, it means the same account was already
1,661✔
2203
        // registered and we should use this one.
1,661✔
2204
        if racc := s.registerAccount(acc); racc != nil {
2,035✔
2205
                // Update with the new claims in case they are new.
374✔
2206
                if err = s.updateAccountWithClaimJWT(racc, claimJWT); err != nil {
374✔
2207
                        return nil, err
×
2208
                }
×
2209
                return racc, nil
374✔
2210
        }
2211
        // The sub imports may have been setup but will not have had their
2212
        // subscriptions properly setup. Do that here.
2213
        var needImportSubs bool
1,287✔
2214

1,287✔
2215
        acc.mu.Lock()
1,287✔
2216
        acc.claimJWT = claimJWT
1,287✔
2217
        if len(acc.imports.services) > 0 {
2,206✔
2218
                if acc.ic == nil {
1,635✔
2219
                        acc.ic = s.createInternalAccountClient()
716✔
2220
                        acc.ic.acc = acc
716✔
2221
                }
716✔
2222
                needImportSubs = true
919✔
2223
        }
2224
        acc.mu.Unlock()
1,287✔
2225

1,287✔
2226
        // Do these outside the lock.
1,287✔
2227
        if needImportSubs {
2,206✔
2228
                acc.addAllServiceImportSubs()
919✔
2229
        }
919✔
2230

2231
        return acc, nil
1,287✔
2232
}
2233

2234
// Start up the server, this will not block.
2235
//
2236
// WaitForShutdown can be used to block and wait for the server to shutdown properly if needed
2237
// after calling s.Shutdown()
2238
func (s *Server) Start() {
6,575✔
2239
        s.Noticef("Starting nats-server")
6,575✔
2240

6,575✔
2241
        gc := gitCommit
6,575✔
2242
        if gc == _EMPTY_ {
13,150✔
2243
                gc = "not set"
6,575✔
2244
        }
6,575✔
2245

2246
        // Snapshot server options.
2247
        opts := s.getOpts()
6,575✔
2248

6,575✔
2249
        // Capture if this server is a leaf that has no cluster, so we don't
6,575✔
2250
        // display the cluster name if that is the case.
6,575✔
2251
        s.mu.RLock()
6,575✔
2252
        leafNoCluster := s.leafNoCluster
6,575✔
2253
        s.mu.RUnlock()
6,575✔
2254

6,575✔
2255
        var clusterName string
6,575✔
2256
        if !leafNoCluster {
12,156✔
2257
                clusterName = s.ClusterName()
5,581✔
2258
        }
5,581✔
2259

2260
        s.Noticef("  Version:  %s", VERSION)
6,575✔
2261
        s.Noticef("  Git:      [%s]", gc)
6,575✔
2262
        s.Debugf("  Go build: %s", s.info.GoVersion)
6,575✔
2263
        if clusterName != _EMPTY_ {
11,281✔
2264
                s.Noticef("  Cluster:  %s", clusterName)
4,706✔
2265
        }
4,706✔
2266
        s.Noticef("  Name:     %s", s.info.Name)
6,575✔
2267
        if opts.JetStream {
10,844✔
2268
                s.Noticef("  Node:     %s", getHash(s.info.Name))
4,269✔
2269
        }
4,269✔
2270
        s.Noticef("  ID:       %s", s.info.ID)
6,575✔
2271
        s.printFeatureFlags(opts)
6,575✔
2272

6,575✔
2273
        defer s.Noticef("Server is ready")
6,575✔
2274

6,575✔
2275
        // Check for insecure configurations.
6,575✔
2276
        s.checkAuthforWarnings()
6,575✔
2277

6,575✔
2278
        // Avoid RACE between Start() and Shutdown()
6,575✔
2279
        s.running.Store(true)
6,575✔
2280
        s.mu.Lock()
6,575✔
2281
        // Update leafNodeEnabled in case options have changed post NewServer()
6,575✔
2282
        // and before Start() (we should not be able to allow that, but server has
6,575✔
2283
        // direct reference to user-provided options - at least before a Reload() is
6,575✔
2284
        // performed.
6,575✔
2285
        s.leafNodeEnabled = opts.LeafNode.Port != 0 || len(opts.LeafNode.Remotes) > 0
6,575✔
2286
        s.mu.Unlock()
6,575✔
2287

6,575✔
2288
        s.grMu.Lock()
6,575✔
2289
        s.grRunning = true
6,575✔
2290
        s.grMu.Unlock()
6,575✔
2291

6,575✔
2292
        s.startRateLimitLogExpiration()
6,575✔
2293

6,575✔
2294
        // Pprof http endpoint for the profiler.
6,575✔
2295
        if opts.ProfPort != 0 {
6,576✔
2296
                s.StartProfiler()
1✔
2297
        } else {
6,575✔
2298
                // It's still possible to access this profile via a SYS endpoint, so set
6,574✔
2299
                // this anyway. (Otherwise StartProfiler would have called it.)
6,574✔
2300
                s.setBlockProfileRate(opts.ProfBlockRate)
6,574✔
2301
        }
6,574✔
2302

2303
        if opts.ConfigFile != _EMPTY_ {
11,349✔
2304
                var cd string
4,774✔
2305
                if opts.configDigest != "" {
9,548✔
2306
                        cd = fmt.Sprintf("(%s)", opts.configDigest)
4,774✔
2307
                }
4,774✔
2308
                s.Noticef("Using configuration file: %s %s", opts.ConfigFile, cd)
4,774✔
2309
        }
2310

2311
        hasOperators := len(opts.TrustedOperators) > 0
6,575✔
2312
        if hasOperators {
6,874✔
2313
                s.Noticef("Trusted Operators")
299✔
2314
        }
299✔
2315
        for _, opc := range opts.TrustedOperators {
6,874✔
2316
                s.Noticef("  System  : %q", opc.Audience)
299✔
2317
                s.Noticef("  Operator: %q", opc.Name)
299✔
2318
                s.Noticef("  Issued  : %v", time.Unix(opc.IssuedAt, 0))
299✔
2319
                switch opc.Expires {
299✔
2320
                case 0:
12✔
2321
                        s.Noticef("  Expires : Never")
12✔
2322
                default:
287✔
2323
                        s.Noticef("  Expires : %v", time.Unix(opc.Expires, 0))
287✔
2324
                }
2325
        }
2326
        if hasOperators && opts.SystemAccount == _EMPTY_ {
6,613✔
2327
                s.Warnf("Trusted Operators should utilize a System Account")
38✔
2328
        }
38✔
2329
        if opts.MaxPayload > MAX_PAYLOAD_MAX_SIZE {
6,577✔
2330
                s.Warnf("Maximum payloads over %v are generally discouraged and could lead to poor performance",
2✔
2331
                        friendlyBytes(int64(MAX_PAYLOAD_MAX_SIZE)))
2✔
2332
        }
2✔
2333

2334
        if len(opts.JsAccDefaultDomain) > 0 {
6,595✔
2335
                s.Warnf("The option `default_js_domain` is a temporary backwards compatibility measure and will be removed")
20✔
2336
        }
20✔
2337

2338
        // If we have a memory resolver, check the accounts here for validation exceptions.
2339
        // This allows them to be logged right away vs when they are accessed via a client.
2340
        if hasOperators && len(opts.resolverPreloads) > 0 {
6,740✔
2341
                s.checkResolvePreloads()
165✔
2342
        }
165✔
2343

2344
        // Log the pid to a file.
2345
        if opts.PidFile != _EMPTY_ {
6,576✔
2346
                if err := s.logPid(); err != nil {
1✔
2347
                        s.Fatalf("Could not write pidfile: %v", err)
×
2348
                        return
×
2349
                }
×
2350
        }
2351

2352
        // Setup system account which will start the eventing stack.
2353
        if sa := opts.SystemAccount; sa != _EMPTY_ {
10,450✔
2354
                if err := s.SetSystemAccount(sa); err != nil {
3,875✔
2355
                        s.Fatalf("Can't set system account: %v", err)
×
2356
                        return
×
2357
                }
×
2358
        } else if !opts.NoSystemAccount {
4,777✔
2359
                // We will create a default system account here.
2,077✔
2360
                s.SetDefaultSystemAccount()
2,077✔
2361
        }
2,077✔
2362

2363
        // Start monitoring before enabling other subsystems of the
2364
        // server to be able to monitor during startup.
2365
        if err := s.StartMonitoring(); err != nil {
6,576✔
2366
                s.Fatalf("Can't start monitoring: %v", err)
1✔
2367
                return
1✔
2368
        }
1✔
2369

2370
        // Start up resolver machinery.
2371
        if ar := s.AccountResolver(); ar != nil {
6,930✔
2372
                if err := ar.Start(s); err != nil {
356✔
2373
                        s.Fatalf("Could not start resolver: %v", err)
×
2374
                        return
×
2375
                }
×
2376
                // In operator mode, when the account resolver depends on an external system and
2377
                // the system account is the bootstrapping account, start fetching it.
2378
                if len(opts.TrustedOperators) == 1 && opts.SystemAccount != _EMPTY_ && opts.SystemAccount != DEFAULT_SYSTEM_ACCOUNT {
617✔
2379
                        opts := s.getOpts()
261✔
2380
                        _, isMemResolver := ar.(*MemAccResolver)
261✔
2381
                        if v, ok := s.accounts.Load(opts.SystemAccount); !isMemResolver && ok && v.(*Account).claimJWT == _EMPTY_ {
325✔
2382
                                s.Noticef("Using bootstrapping system account")
64✔
2383
                                s.startGoRoutine(func() {
128✔
2384
                                        defer s.grWG.Done()
64✔
2385
                                        t := time.NewTicker(time.Second)
64✔
2386
                                        defer t.Stop()
64✔
2387
                                        for {
177✔
2388
                                                select {
113✔
2389
                                                case <-s.quitCh:
41✔
2390
                                                        return
41✔
2391
                                                case <-t.C:
72✔
2392
                                                        sacc := s.SystemAccount()
72✔
2393
                                                        if claimJWT, err := fetchAccount(ar, opts.SystemAccount); err != nil {
121✔
2394
                                                                continue
49✔
2395
                                                        } else if err = s.updateAccountWithClaimJWT(sacc, claimJWT); err != nil {
23✔
2396
                                                                continue
×
2397
                                                        }
2398
                                                        s.Noticef("System account fetched and updated")
23✔
2399
                                                        return
23✔
2400
                                                }
2401
                                        }
2402
                                })
2403
                        }
2404
                }
2405
        }
2406

2407
        // Start expiration of mapped GW replies, regardless if
2408
        // this server is configured with gateway or not.
2409
        s.startGWReplyMapExpiration()
6,574✔
2410

6,574✔
2411
        // Check if JetStream has been enabled. This needs to be after
6,574✔
2412
        // the system account setup above. JetStream will create its
6,574✔
2413
        // own system account if one is not present.
6,574✔
2414
        if opts.JetStream {
10,843✔
2415
                // Make sure someone is not trying to enable on the system account.
4,269✔
2416
                if sa := s.SystemAccount(); sa != nil && len(sa.jsLimits) > 0 {
4,269✔
2417
                        s.Fatalf("Not allowed to enable JetStream on the system account")
×
2418
                }
×
2419
                cfg := &JetStreamConfig{
4,269✔
2420
                        StoreDir:     opts.StoreDir,
4,269✔
2421
                        SyncInterval: opts.SyncInterval,
4,269✔
2422
                        SyncAlways:   opts.SyncAlways,
4,269✔
2423
                        Strict:       !opts.NoJetStreamStrict,
4,269✔
2424
                        MaxMemory:    opts.JetStreamMaxMemory,
4,269✔
2425
                        MaxStore:     opts.JetStreamMaxStore,
4,269✔
2426
                        Domain:       opts.JetStreamDomain,
4,269✔
2427
                        CompressOK:   true,
4,269✔
2428
                        UniqueTag:    opts.JetStreamUniqueTag,
4,269✔
2429
                }
4,269✔
2430
                if err := s.EnableJetStream(cfg); err != nil {
4,271✔
2431
                        s.Fatalf("Can't start JetStream: %v", err)
2✔
2432
                        return
2✔
2433
                }
2✔
2434
        } else {
2,305✔
2435
                // Check to see if any configured accounts have JetStream enabled.
2,305✔
2436
                sa, ga := s.SystemAccount(), s.GlobalAccount()
2,305✔
2437
                var hasSys, hasGlobal bool
2,305✔
2438
                var total int
2,305✔
2439

2,305✔
2440
                s.accounts.Range(func(k, v any) bool {
7,473✔
2441
                        total++
5,168✔
2442
                        acc := v.(*Account)
5,168✔
2443
                        if acc == sa {
6,780✔
2444
                                hasSys = true
1,612✔
2445
                        } else if acc == ga {
7,473✔
2446
                                hasGlobal = true
2,305✔
2447
                        }
2,305✔
2448
                        acc.mu.RLock()
5,168✔
2449
                        hasJs := len(acc.jsLimits) > 0
5,168✔
2450
                        acc.mu.RUnlock()
5,168✔
2451
                        if hasJs {
5,193✔
2452
                                s.checkJetStreamExports()
25✔
2453
                                acc.enableAllJetStreamServiceImportsAndMappings()
25✔
2454
                        }
25✔
2455
                        return true
5,168✔
2456
                })
2457
                // If we only have the system account and the global account and we are not standalone,
2458
                // go ahead and enable JS on $G in case we are in simple mixed mode setup.
2459
                if total == 2 && hasSys && hasGlobal && !s.standAloneMode() {
2,821✔
2460
                        ga.mu.Lock()
516✔
2461
                        ga.jsLimits = map[string]JetStreamAccountLimits{
516✔
2462
                                _EMPTY_: dynamicJSAccountLimits,
516✔
2463
                        }
516✔
2464
                        ga.mu.Unlock()
516✔
2465
                        s.checkJetStreamExports()
516✔
2466
                        ga.enableAllJetStreamServiceImportsAndMappings()
516✔
2467
                }
516✔
2468
        }
2469

2470
        // Delayed API response handling. Start regardless of JetStream being
2471
        // currently configured or not (since it can be enabled/disabled with
2472
        // configuration reload).
2473
        s.startGoRoutine(s.delayedAPIResponder)
6,572✔
2474

6,572✔
2475
        // Start OCSP Stapling monitoring for TLS certificates if enabled. Hook TLS handshake for
6,572✔
2476
        // OCSP check on peers (LEAF and CLIENT kind) if enabled.
6,572✔
2477
        s.startOCSPMonitoring()
6,572✔
2478

6,572✔
2479
        // Configure OCSP Response Cache for peer OCSP checks if enabled.
6,572✔
2480
        s.initOCSPResponseCache()
6,572✔
2481

6,572✔
2482
        // Start up gateway if needed. Do this before starting the routes, because
6,572✔
2483
        // we want to resolve the gateway host:port so that this information can
6,572✔
2484
        // be sent to other routes.
6,572✔
2485
        if opts.Gateway.Port != 0 {
7,695✔
2486
                s.startGateways()
1,123✔
2487
        }
1,123✔
2488

2489
        // Start websocket server if needed. Do this before starting the routes, and
2490
        // leaf node because we want to resolve the gateway host:port so that this
2491
        // information can be sent to other routes.
2492
        if opts.Websocket.Port != 0 {
6,689✔
2493
                s.startWebsocketServer()
117✔
2494
        }
117✔
2495

2496
        // Start up listen if we want to accept leaf node connections.
2497
        if opts.LeafNode.Port != 0 {
10,471✔
2498
                // Will resolve or assign the advertise address for the leafnode listener.
3,899✔
2499
                // We need that in StartRouting().
3,899✔
2500
                s.startLeafNodeAcceptLoop()
3,899✔
2501
        }
3,899✔
2502

2503
        // Solicit remote servers for leaf node connections.
2504
        if len(opts.LeafNode.Remotes) > 0 {
7,821✔
2505
                s.solicitLeafNodeRemotes(opts.LeafNode.Remotes)
1,249✔
2506
        }
1,249✔
2507

2508
        // TODO (ik): I wanted to refactor this by starting the client
2509
        // accept loop first, that is, it would resolve listen spec
2510
        // in place, but start the accept-for-loop in a different go
2511
        // routine. This would get rid of the synchronization between
2512
        // this function and StartRouting, which I also would have wanted
2513
        // to refactor, but both AcceptLoop() and StartRouting() have
2514
        // been exported and not sure if that would break users using them.
2515
        // We could mark them as deprecated and remove in a release or two...
2516

2517
        // The Routing routine needs to wait for the client listen
2518
        // port to be opened and potential ephemeral port selected.
2519
        clientListenReady := make(chan struct{})
6,572✔
2520

6,572✔
2521
        // MQTT
6,572✔
2522
        if opts.MQTT.Port != 0 {
6,717✔
2523
                s.startMQTT()
145✔
2524
        }
145✔
2525

2526
        // Start up routing as well if needed.
2527
        if opts.Cluster.Port != 0 {
11,124✔
2528
                s.startGoRoutine(func() {
9,104✔
2529
                        s.StartRouting(clientListenReady)
4,552✔
2530
                })
4,552✔
2531
        }
2532

2533
        if opts.PortsFileDir != _EMPTY_ {
6,574✔
2534
                s.logPorts()
2✔
2535
        }
2✔
2536

2537
        if opts.TLSRateLimit > 0 {
6,573✔
2538
                s.startGoRoutine(s.logRejectedTLSConns)
1✔
2539
        }
1✔
2540

2541
        // We've finished starting up.
2542
        close(s.startupComplete)
6,572✔
2543

6,572✔
2544
        // Wait for clients.
6,572✔
2545
        if !opts.DontListen {
13,144✔
2546
                s.AcceptLoop(clientListenReady)
6,572✔
2547
        }
6,572✔
2548

2549
        // Bring OSCP Response cache online after accept loop started in anticipation of NATS-enabled cache types
2550
        s.startOCSPResponseCache()
6,572✔
2551
}
2552

2553
func (s *Server) isShuttingDown() bool {
517,734✔
2554
        return s.shutdown.Load()
517,734✔
2555
}
517,734✔
2556

2557
// Shutdown will shutdown the server instance by kicking out the AcceptLoop
2558
// and closing all associated clients.
2559
func (s *Server) Shutdown() {
7,069✔
2560
        if s == nil {
7,073✔
2561
                return
4✔
2562
        }
4✔
2563
        // This is for JetStream R1 Pull Consumers to allow signaling
2564
        // that pending pull requests are invalid.
2565
        s.signalPullConsumers()
7,065✔
2566

7,065✔
2567
        // Transfer off any raft nodes that we are a leader by stepping them down.
7,065✔
2568
        s.stepdownRaftNodes()
7,065✔
2569

7,065✔
2570
        // Shutdown the eventing system as needed.
7,065✔
2571
        // This is done first to send out any messages for
7,065✔
2572
        // account status. We will also clean up any
7,065✔
2573
        // eventing items associated with accounts.
7,065✔
2574
        s.shutdownEventing()
7,065✔
2575

7,065✔
2576
        // Prevent issues with multiple calls.
7,065✔
2577
        if s.isShuttingDown() {
7,499✔
2578
                return
434✔
2579
        }
434✔
2580

2581
        s.mu.Lock()
6,631✔
2582
        s.Noticef("Initiating Shutdown...")
6,631✔
2583

6,631✔
2584
        accRes := s.accResolver
6,631✔
2585

6,631✔
2586
        opts := s.getOpts()
6,631✔
2587

6,631✔
2588
        s.shutdown.Store(true)
6,631✔
2589
        s.running.Store(false)
6,631✔
2590
        s.grMu.Lock()
6,631✔
2591
        s.grRunning = false
6,631✔
2592
        s.grMu.Unlock()
6,631✔
2593
        s.mu.Unlock()
6,631✔
2594

6,631✔
2595
        if accRes != nil {
7,039✔
2596
                accRes.Close()
408✔
2597
        }
408✔
2598

2599
        // Now check and shutdown jetstream.
2600
        s.shutdownJetStream()
6,631✔
2601

6,631✔
2602
        // Now shutdown the nodes
6,631✔
2603
        s.shutdownRaftNodes()
6,631✔
2604

6,631✔
2605
        s.mu.Lock()
6,631✔
2606
        conns := make(map[uint64]*client)
6,631✔
2607

6,631✔
2608
        // Copy off the clients
6,631✔
2609
        for i, c := range s.clients {
7,936✔
2610
                conns[i] = c
1,305✔
2611
        }
1,305✔
2612
        // Copy off the connections that are not yet registered
2613
        // in s.routes, but for which the readLoop has started
2614
        s.grMu.Lock()
6,631✔
2615
        for i, c := range s.grTmpClients {
6,653✔
2616
                conns[i] = c
22✔
2617
        }
22✔
2618
        s.grMu.Unlock()
6,631✔
2619
        // Copy off the routes
6,631✔
2620
        s.forEachRoute(func(r *client) {
23,576✔
2621
                r.mu.Lock()
16,945✔
2622
                conns[r.cid] = r
16,945✔
2623
                r.mu.Unlock()
16,945✔
2624
        })
16,945✔
2625
        // Copy off the gateways
2626
        s.getAllGatewayConnections(conns)
6,631✔
2627

6,631✔
2628
        // Copy off the leaf nodes
6,631✔
2629
        for i, c := range s.leafs {
7,276✔
2630
                conns[i] = c
645✔
2631
        }
645✔
2632

2633
        // Number of done channel responses we expect.
2634
        doneExpected := 0
6,631✔
2635

6,631✔
2636
        // Kick client AcceptLoop()
6,631✔
2637
        if s.listener != nil {
13,186✔
2638
                doneExpected++
6,555✔
2639
                s.listener.Close()
6,555✔
2640
                s.listener = nil
6,555✔
2641
        }
6,555✔
2642

2643
        // Kick websocket server
2644
        doneExpected += s.closeWebsocketServer()
6,631✔
2645

6,631✔
2646
        // Kick MQTT accept loop
6,631✔
2647
        if s.mqtt.listener != nil {
6,768✔
2648
                doneExpected++
137✔
2649
                s.mqtt.listener.Close()
137✔
2650
                s.mqtt.listener = nil
137✔
2651
        }
137✔
2652

2653
        // Kick leafnodes AcceptLoop()
2654
        if s.leafNodeListener != nil {
10,527✔
2655
                doneExpected++
3,896✔
2656
                s.leafNodeListener.Close()
3,896✔
2657
                s.leafNodeListener = nil
3,896✔
2658
        }
3,896✔
2659

2660
        // Kick route AcceptLoop()
2661
        if s.routeListener != nil {
11,173✔
2662
                doneExpected++
4,542✔
2663
                s.routeListener.Close()
4,542✔
2664
                s.routeListener = nil
4,542✔
2665
        }
4,542✔
2666

2667
        // Kick Gateway AcceptLoop()
2668
        if s.gatewayListener != nil {
7,751✔
2669
                doneExpected++
1,120✔
2670
                s.gatewayListener.Close()
1,120✔
2671
                s.gatewayListener = nil
1,120✔
2672
        }
1,120✔
2673

2674
        // Kick HTTP monitoring if its running
2675
        if s.http != nil {
7,605✔
2676
                doneExpected++
974✔
2677
                s.http.Close()
974✔
2678
                s.http = nil
974✔
2679
        }
974✔
2680

2681
        // Kick Profiling if its running
2682
        if s.profiler != nil {
6,633✔
2683
                doneExpected++
2✔
2684
                s.profiler.Close()
2✔
2685
        }
2✔
2686

2687
        s.mu.Unlock()
6,631✔
2688

6,631✔
2689
        // Release go routines that wait on that channel
6,631✔
2690
        close(s.quitCh)
6,631✔
2691

6,631✔
2692
        // Close client and route connections
6,631✔
2693
        for _, c := range conns {
27,185✔
2694
                c.setNoReconnect()
20,554✔
2695
                c.closeConnection(ServerShutdown)
20,554✔
2696
        }
20,554✔
2697

2698
        // Block until the accept loops exit
2699
        for doneExpected > 0 {
23,974✔
2700
                <-s.done
17,343✔
2701
                doneExpected--
17,343✔
2702
        }
17,343✔
2703

2704
        // Wait for go routines to be done.
2705
        s.grWG.Wait()
6,631✔
2706

6,631✔
2707
        if opts.PortsFileDir != _EMPTY_ {
6,633✔
2708
                s.deletePortsFile(opts.PortsFileDir)
2✔
2709
        }
2✔
2710

2711
        s.Noticef("Server Exiting..")
6,631✔
2712

6,631✔
2713
        // Stop OCSP Response Cache
6,631✔
2714
        if s.ocsprc != nil {
6,681✔
2715
                s.ocsprc.Stop(s)
50✔
2716
        }
50✔
2717

2718
        // Close logger if applicable. It allows tests on Windows
2719
        // to be able to do proper cleanup (delete log file).
2720
        s.logging.RLock()
6,631✔
2721
        log := s.logging.logger
6,631✔
2722
        s.logging.RUnlock()
6,631✔
2723
        if log != nil {
7,069✔
2724
                if l, ok := log.(*logger.Logger); ok {
448✔
2725
                        l.Close()
10✔
2726
                }
10✔
2727
        }
2728
        // Notify that the shutdown is complete
2729
        close(s.shutdownComplete)
6,631✔
2730
}
2731

2732
// Close the websocket server if running. If so, returns 1, else 0.
2733
// Server lock held on entry.
2734
func (s *Server) closeWebsocketServer() int {
6,637✔
2735
        ws := &s.websocket
6,637✔
2736
        ws.mu.Lock()
6,637✔
2737
        hs := ws.server
6,637✔
2738
        if hs != nil {
6,754✔
2739
                ws.server = nil
117✔
2740
                ws.listener = nil
117✔
2741
        }
117✔
2742
        ws.mu.Unlock()
6,637✔
2743
        if hs != nil {
6,754✔
2744
                hs.Close()
117✔
2745
                return 1
117✔
2746
        }
117✔
2747
        return 0
6,520✔
2748
}
2749

2750
// WaitForShutdown will block until the server has been fully shutdown.
2751
func (s *Server) WaitForShutdown() {
3,321✔
2752
        <-s.shutdownComplete
3,321✔
2753
}
3,321✔
2754

2755
// AcceptLoop is exported for easier testing.
2756
func (s *Server) AcceptLoop(clr chan struct{}) {
6,572✔
2757
        // If we were to exit before the listener is setup properly,
6,572✔
2758
        // make sure we close the channel.
6,572✔
2759
        defer func() {
13,144✔
2760
                if clr != nil {
6,573✔
2761
                        close(clr)
1✔
2762
                }
1✔
2763
        }()
2764

2765
        if s.isShuttingDown() {
6,573✔
2766
                return
1✔
2767
        }
1✔
2768

2769
        // Snapshot server options.
2770
        opts := s.getOpts()
6,571✔
2771

6,571✔
2772
        // Setup state that can enable shutdown
6,571✔
2773
        s.mu.Lock()
6,571✔
2774
        hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
6,571✔
2775
        l, e := s.getServerListener(hp)
6,571✔
2776
        s.listenerErr = e
6,571✔
2777
        if e != nil {
6,571✔
2778
                s.mu.Unlock()
×
2779
                s.Fatalf("Error listening on port: %s, %q", hp, e)
×
2780
                return
×
2781
        }
×
2782
        s.Noticef("Listening for client connections on %s",
6,571✔
2783
                net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
6,571✔
2784

6,571✔
2785
        // Alert if PROXY protocol is enabled
6,571✔
2786
        if opts.ProxyProtocol {
6,574✔
2787
                s.Noticef("PROXY protocol enabled for client connections")
3✔
2788
        }
3✔
2789

2790
        // Alert of TLS enabled.
2791
        if opts.TLSConfig != nil {
6,729✔
2792
                s.Noticef("TLS required for client connections")
158✔
2793
                if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
164✔
2794
                        s.Warnf("Clients that are not using \"TLS Handshake First\" option will fail to connect")
6✔
2795
                }
6✔
2796
        }
2797

2798
        // If server was started with RANDOM_PORT (-1), opts.Port would be equal
2799
        // to 0 at the beginning this function. So we need to get the actual port
2800
        if opts.Port == 0 {
12,647✔
2801
                // Write resolved port back to options.
6,076✔
2802
                opts.Port = l.Addr().(*net.TCPAddr).Port
6,076✔
2803
        }
6,076✔
2804

2805
        // Now that port has been set (if it was set to RANDOM), set the
2806
        // server's info Host/Port with either values from Options or
2807
        // ClientAdvertise.
2808
        if err := s.setInfoHostPort(); err != nil {
6,571✔
2809
                s.Fatalf("Error setting server INFO with ClientAdvertise value of %s, err=%v", opts.ClientAdvertise, err)
×
2810
                l.Close()
×
2811
                s.mu.Unlock()
×
2812
                return
×
2813
        }
×
2814
        // Keep track of client connect URLs. We may need them later.
2815
        s.clientConnectURLs = s.getClientConnectURLs()
6,571✔
2816
        s.listener = l
6,571✔
2817

6,571✔
2818
        go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
16,128✔
2819
                func(_ error) bool {
6,561✔
2820
                        if s.isLameDuckMode() {
6,567✔
2821
                                // Signal that we are not accepting new clients
6✔
2822
                                s.ldmCh <- true
6✔
2823
                                // Now wait for the Shutdown...
6✔
2824
                                <-s.quitCh
6✔
2825
                                return true
6✔
2826
                        }
6✔
2827
                        return false
6,555✔
2828
                })
2829
        s.mu.Unlock()
6,571✔
2830

6,571✔
2831
        // Let the caller know that we are ready
6,571✔
2832
        close(clr)
6,571✔
2833
        clr = nil
6,571✔
2834
}
2835

2836
// getServerListener returns a network listener for the given host-port address.
2837
// If the Server already has an active listener (s.listener), it returns that listener
2838
// along with any previous error (s.listenerErr). Otherwise, it creates and returns
2839
// a new TCP listener on the specified address using natsListen.
2840
func (s *Server) getServerListener(hp string) (net.Listener, error) {
6,571✔
2841
        if s.listener != nil {
6,571✔
2842
                return s.listener, s.listenerErr
×
2843
        }
×
2844

2845
        return natsListen("tcp", hp)
6,571✔
2846
}
2847

2848
// InProcessConn returns an in-process connection to the server,
2849
// avoiding the need to use a TCP listener for local connectivity
2850
// within the same process. This can be used regardless of the
2851
// state of the DontListen option.
2852
func (s *Server) InProcessConn() (net.Conn, error) {
8✔
2853
        pl, pr := net.Pipe()
8✔
2854
        if !s.startGoRoutine(func() {
16✔
2855
                s.createClientInProcess(pl)
8✔
2856
                s.grWG.Done()
8✔
2857
        }) {
8✔
2858
                pl.Close()
×
2859
                pr.Close()
×
2860
                return nil, fmt.Errorf("failed to create connection")
×
2861
        }
×
2862
        return pr, nil
8✔
2863
}
2864

2865
func (s *Server) acceptConnections(l net.Listener, acceptName string, createFunc func(conn net.Conn), errFunc func(err error) bool) {
16,283✔
2866
        tmpDelay := ACCEPT_MIN_SLEEP
16,283✔
2867

16,283✔
2868
        for {
76,972✔
2869
                conn, err := l.Accept()
60,689✔
2870
                if err != nil {
76,945✔
2871
                        if errFunc != nil && errFunc(err) {
16,262✔
2872
                                return
6✔
2873
                        }
6✔
2874
                        if tmpDelay = s.acceptError(acceptName, err, tmpDelay); tmpDelay < 0 {
32,500✔
2875
                                break
16,250✔
2876
                        }
2877
                        continue
×
2878
                }
2879
                tmpDelay = ACCEPT_MIN_SLEEP
44,406✔
2880
                if !s.startGoRoutine(func() {
88,804✔
2881
                        s.reloadMu.RLock()
44,398✔
2882
                        createFunc(conn)
44,398✔
2883
                        s.reloadMu.RUnlock()
44,398✔
2884
                        s.grWG.Done()
44,398✔
2885
                }) {
44,406✔
2886
                        conn.Close()
8✔
2887
                }
8✔
2888
        }
2889
        s.Debugf(acceptName + " accept loop exiting..")
16,250✔
2890
        s.done <- true
16,250✔
2891
}
2892

2893
// This function sets the server's info Host/Port based on server Options.
2894
// Note that this function may be called during config reload, this is why
2895
// Host/Port may be reset to original Options if the ClientAdvertise option
2896
// is not set (since it may have previously been).
2897
func (s *Server) setInfoHostPort() error {
13,274✔
2898
        // When this function is called, opts.Port is set to the actual listen
13,274✔
2899
        // port (if option was originally set to RANDOM), even during a config
13,274✔
2900
        // reload. So use of s.opts.Port is safe.
13,274✔
2901
        opts := s.getOpts()
13,274✔
2902
        if opts.ClientAdvertise != _EMPTY_ {
13,276✔
2903
                h, p, err := parseHostPort(opts.ClientAdvertise, opts.Port)
2✔
2904
                if err != nil {
2✔
2905
                        return err
×
2906
                }
×
2907
                s.info.Host = h
2✔
2908
                s.info.Port = p
2✔
2909
        } else {
13,272✔
2910
                s.info.Host = opts.Host
13,272✔
2911
                s.info.Port = opts.Port
13,272✔
2912
        }
13,272✔
2913
        return nil
13,274✔
2914
}
2915

2916
// StartProfiler is called to enable dynamic profiling.
2917
func (s *Server) StartProfiler() {
2✔
2918
        if s.isShuttingDown() {
2✔
2919
                return
×
2920
        }
×
2921

2922
        // Snapshot server options.
2923
        opts := s.getOpts()
2✔
2924

2✔
2925
        port := opts.ProfPort
2✔
2926

2✔
2927
        // Check for Random Port
2✔
2928
        if port == -1 {
3✔
2929
                port = 0
1✔
2930
        }
1✔
2931

2932
        s.mu.Lock()
2✔
2933
        hp := net.JoinHostPort(opts.Host, strconv.Itoa(port))
2✔
2934
        l, err := net.Listen("tcp", hp)
2✔
2935

2✔
2936
        if err != nil {
2✔
2937
                s.mu.Unlock()
×
2938
                s.Fatalf("error starting profiler: %s", err)
×
2939
                return
×
2940
        }
×
2941
        s.Noticef("profiling port: %d", l.Addr().(*net.TCPAddr).Port)
2✔
2942

2✔
2943
        srv := &http.Server{
2✔
2944
                Addr:           hp,
2✔
2945
                Handler:        http.DefaultServeMux,
2✔
2946
                MaxHeaderBytes: 1 << 20,
2✔
2947
                ReadTimeout:    time.Second * 5,
2✔
2948
        }
2✔
2949
        s.profiler = l
2✔
2950
        s.profilingServer = srv
2✔
2951

2✔
2952
        s.setBlockProfileRate(opts.ProfBlockRate)
2✔
2953

2✔
2954
        go func() {
4✔
2955
                // if this errors out, it's probably because the server is being shutdown
2✔
2956
                err := srv.Serve(l)
2✔
2957
                if err != nil {
4✔
2958
                        if !s.isShuttingDown() {
2✔
2959
                                s.Fatalf("error starting profiler: %s", err)
×
2960
                        }
×
2961
                }
2962
                srv.Close()
2✔
2963
                s.done <- true
2✔
2964
        }()
2965
        s.mu.Unlock()
2✔
2966
}
2967

2968
func (s *Server) setBlockProfileRate(rate int) {
6,576✔
2969
        // Passing i ProfBlockRate <= 0 here will disable or > 0 will enable.
6,576✔
2970
        runtime.SetBlockProfileRate(rate)
6,576✔
2971

6,576✔
2972
        if rate > 0 {
6,576✔
2973
                s.Warnf("Block profiling is enabled (rate %d), this may have a performance impact", rate)
×
2974
        }
×
2975
}
2976

2977
// StartHTTPMonitoring will enable the HTTP monitoring port.
2978
// DEPRECATED: Should use StartMonitoring.
2979
func (s *Server) StartHTTPMonitoring() {
×
2980
        s.startMonitoring(false)
×
2981
}
×
2982

2983
// StartHTTPSMonitoring will enable the HTTPS monitoring port.
2984
// DEPRECATED: Should use StartMonitoring.
2985
func (s *Server) StartHTTPSMonitoring() {
×
2986
        s.startMonitoring(true)
×
2987
}
×
2988

2989
// StartMonitoring starts the HTTP or HTTPs server if needed.
2990
func (s *Server) StartMonitoring() error {
6,579✔
2991
        // Snapshot server options.
6,579✔
2992
        opts := s.getOpts()
6,579✔
2993

6,579✔
2994
        // Specifying both HTTP and HTTPS ports is a misconfiguration
6,579✔
2995
        if opts.HTTPPort != 0 && opts.HTTPSPort != 0 {
6,580✔
2996
                return fmt.Errorf("can't specify both HTTP (%v) and HTTPs (%v) ports", opts.HTTPPort, opts.HTTPSPort)
1✔
2997
        }
1✔
2998
        var err error
6,578✔
2999
        if opts.HTTPPort != 0 {
7,541✔
3000
                err = s.startMonitoring(false)
963✔
3001
        } else if opts.HTTPSPort != 0 {
6,593✔
3002
                if opts.TLSConfig == nil {
17✔
3003
                        return fmt.Errorf("TLS cert and key required for HTTPS")
2✔
3004
                }
2✔
3005
                err = s.startMonitoring(true)
13✔
3006
        }
3007
        return err
6,576✔
3008
}
3009

3010
// HTTP endpoints
3011
const (
3012
        RootPath         = "/"
3013
        VarzPath         = "/varz"
3014
        ConnzPath        = "/connz"
3015
        RoutezPath       = "/routez"
3016
        GatewayzPath     = "/gatewayz"
3017
        LeafzPath        = "/leafz"
3018
        SubszPath        = "/subsz"
3019
        StackszPath      = "/stacksz"
3020
        AccountzPath     = "/accountz"
3021
        AccountStatzPath = "/accstatz"
3022
        JszPath          = "/jsz"
3023
        HealthzPath      = "/healthz"
3024
        IPQueuesPath     = "/ipqueuesz"
3025
        RaftzPath        = "/raftz"
3026
        ExpvarzPath      = "/debug/vars"
3027
)
3028

3029
func (s *Server) basePath(p string) string {
15,632✔
3030
        return path.Join(s.httpBasePath, p)
15,632✔
3031
}
15,632✔
3032

3033
type captureHTTPServerLog struct {
3034
        s      *Server
3035
        prefix string
3036
}
3037

3038
func (cl *captureHTTPServerLog) Write(p []byte) (int, error) {
3✔
3039
        var buf [128]byte
3✔
3040
        var b = buf[:0]
3✔
3041

3✔
3042
        b = append(b, []byte(cl.prefix)...)
3✔
3043
        offset := 0
3✔
3044
        if bytes.HasPrefix(p, []byte("http:")) {
6✔
3045
                offset = 6
3✔
3046
        }
3✔
3047
        b = append(b, p[offset:]...)
3✔
3048
        cl.s.Errorf(string(b))
3✔
3049
        return len(p), nil
3✔
3050
}
3051

3052
// The TLS configuration is passed to the listener when the monitoring
3053
// "server" is setup. That prevents TLS configuration updates on reload
3054
// from being used. By setting this function in tls.Config.GetConfigForClient
3055
// we instruct the TLS handshake to ask for the tls configuration to be
3056
// used for a specific client. We don't care which client, we always use
3057
// the same TLS configuration.
3058
func (s *Server) getMonitoringTLSConfig(_ *tls.ClientHelloInfo) (*tls.Config, error) {
4✔
3059
        opts := s.getOpts()
4✔
3060
        tc := opts.TLSConfig.Clone()
4✔
3061
        tc.ClientAuth = tls.NoClientCert
4✔
3062
        return tc, nil
4✔
3063
}
4✔
3064

3065
// Start the monitoring server
3066
func (s *Server) startMonitoring(secure bool) error {
976✔
3067
        if s.isShuttingDown() {
977✔
3068
                return nil
1✔
3069
        }
1✔
3070

3071
        // Snapshot server options.
3072
        opts := s.getOpts()
975✔
3073

975✔
3074
        var (
975✔
3075
                hp           string
975✔
3076
                err          error
975✔
3077
                httpListener net.Listener
975✔
3078
                port         int
975✔
3079
        )
975✔
3080

975✔
3081
        monitorProtocol := "http"
975✔
3082

975✔
3083
        if secure {
988✔
3084
                monitorProtocol += "s"
13✔
3085
                port = opts.HTTPSPort
13✔
3086
                if port == -1 {
17✔
3087
                        port = 0
4✔
3088
                }
4✔
3089
                hp = net.JoinHostPort(opts.HTTPHost, strconv.Itoa(port))
13✔
3090
                config := opts.TLSConfig.Clone()
13✔
3091
                if !s.ocspPeerVerify {
25✔
3092
                        config.GetConfigForClient = s.getMonitoringTLSConfig
12✔
3093
                        config.ClientAuth = tls.NoClientCert
12✔
3094
                }
12✔
3095
                httpListener, err = tls.Listen("tcp", hp, config)
13✔
3096

3097
        } else {
962✔
3098
                port = opts.HTTPPort
962✔
3099
                if port == -1 {
1,728✔
3100
                        port = 0
766✔
3101
                }
766✔
3102
                hp = net.JoinHostPort(opts.HTTPHost, strconv.Itoa(port))
962✔
3103
                httpListener, err = net.Listen("tcp", hp)
962✔
3104
        }
3105

3106
        if err != nil {
976✔
3107
                return fmt.Errorf("can't listen to the monitor port: %v", err)
1✔
3108
        }
1✔
3109

3110
        rport := httpListener.Addr().(*net.TCPAddr).Port
974✔
3111
        s.Noticef("Starting %s monitor on %s", monitorProtocol, net.JoinHostPort(opts.HTTPHost, strconv.Itoa(rport)))
974✔
3112

974✔
3113
        mux := http.NewServeMux()
974✔
3114

974✔
3115
        // Root
974✔
3116
        mux.HandleFunc(s.basePath(RootPath), s.HandleRoot)
974✔
3117
        // Varz
974✔
3118
        mux.HandleFunc(s.basePath(VarzPath), s.HandleVarz)
974✔
3119
        // Connz
974✔
3120
        mux.HandleFunc(s.basePath(ConnzPath), s.HandleConnz)
974✔
3121
        // Routez
974✔
3122
        mux.HandleFunc(s.basePath(RoutezPath), s.HandleRoutez)
974✔
3123
        // Gatewayz
974✔
3124
        mux.HandleFunc(s.basePath(GatewayzPath), s.HandleGatewayz)
974✔
3125
        // Leafz
974✔
3126
        mux.HandleFunc(s.basePath(LeafzPath), s.HandleLeafz)
974✔
3127
        // Subz
974✔
3128
        mux.HandleFunc(s.basePath(SubszPath), s.HandleSubsz)
974✔
3129
        // Subz alias for backwards compatibility
974✔
3130
        mux.HandleFunc(s.basePath("/subscriptionsz"), s.HandleSubsz)
974✔
3131
        // Stacksz
974✔
3132
        mux.HandleFunc(s.basePath(StackszPath), s.HandleStacksz)
974✔
3133
        // Accountz
974✔
3134
        mux.HandleFunc(s.basePath(AccountzPath), s.HandleAccountz)
974✔
3135
        // Accstatz
974✔
3136
        mux.HandleFunc(s.basePath(AccountStatzPath), s.HandleAccountStatz)
974✔
3137
        // Jsz
974✔
3138
        mux.HandleFunc(s.basePath(JszPath), s.HandleJsz)
974✔
3139
        // Healthz
974✔
3140
        mux.HandleFunc(s.basePath(HealthzPath), s.HandleHealthz)
974✔
3141
        // IPQueuesz
974✔
3142
        mux.HandleFunc(s.basePath(IPQueuesPath), s.HandleIPQueuesz)
974✔
3143
        // Raftz
974✔
3144
        mux.HandleFunc(s.basePath(RaftzPath), s.HandleRaftz)
974✔
3145
        // Expvarz
974✔
3146
        mux.Handle(s.basePath(ExpvarzPath), expvar.Handler())
974✔
3147

974✔
3148
        // Do not set a WriteTimeout because it could cause cURL/browser
974✔
3149
        // to return empty response or unable to display page if the
974✔
3150
        // server needs more time to build the response.
974✔
3151
        srv := &http.Server{
974✔
3152
                Addr:              hp,
974✔
3153
                Handler:           mux,
974✔
3154
                MaxHeaderBytes:    1 << 20,
974✔
3155
                ErrorLog:          log.New(&captureHTTPServerLog{s, "monitoring: "}, _EMPTY_, 0),
974✔
3156
                ReadHeaderTimeout: time.Second * 5,
974✔
3157
        }
974✔
3158
        s.mu.Lock()
974✔
3159
        s.http = httpListener
974✔
3160
        s.httpHandler = mux
974✔
3161
        s.monitoringServer = srv
974✔
3162
        s.mu.Unlock()
974✔
3163

974✔
3164
        go func() {
1,948✔
3165
                if err := srv.Serve(httpListener); err != nil {
1,948✔
3166
                        if !s.isShuttingDown() {
974✔
3167
                                s.Fatalf("Error starting monitor on %q: %v", hp, err)
×
3168
                        }
×
3169
                }
3170
                srv.Close()
974✔
3171
                s.mu.Lock()
974✔
3172
                s.httpHandler = nil
974✔
3173
                s.mu.Unlock()
974✔
3174
                s.done <- true
974✔
3175
        }()
3176

3177
        return nil
974✔
3178
}
3179

3180
// HTTPHandler returns the http.Handler object used to handle monitoring
3181
// endpoints. It will return nil if the server is not configured for
3182
// monitoring, or if the server has not been started yet (Server.Start()).
3183
func (s *Server) HTTPHandler() http.Handler {
2✔
3184
        s.mu.Lock()
2✔
3185
        defer s.mu.Unlock()
2✔
3186
        return s.httpHandler
2✔
3187
}
2✔
3188

3189
// Perform a conditional deep copy due to reference nature of [Client|WS]ConnectURLs.
3190
// If updates are made to Info, this function should be consulted and updated.
3191
// Assume lock is held.
3192
func (s *Server) copyInfo() Info {
18,799✔
3193
        info := s.info
18,799✔
3194
        if len(info.ClientConnectURLs) > 0 {
30,082✔
3195
                info.ClientConnectURLs = append([]string(nil), s.info.ClientConnectURLs...)
11,283✔
3196
        }
11,283✔
3197
        if len(info.WSConnectURLs) > 0 {
18,881✔
3198
                info.WSConnectURLs = append([]string(nil), s.info.WSConnectURLs...)
82✔
3199
        }
82✔
3200
        return info
18,799✔
3201
}
3202

3203
// tlsMixConn is used when we can receive both TLS and non-TLS connections on same port.
3204
type tlsMixConn struct {
3205
        net.Conn
3206
        pre *bytes.Buffer
3207
}
3208

3209
// Read for our mixed multi-reader.
3210
func (c *tlsMixConn) Read(b []byte) (int, error) {
49✔
3211
        if c.pre != nil {
58✔
3212
                n, err := c.pre.Read(b)
9✔
3213
                if c.pre.Len() == 0 {
18✔
3214
                        c.pre = nil
9✔
3215
                }
9✔
3216
                return n, err
9✔
3217
        }
3218
        return c.Conn.Read(b)
40✔
3219
}
3220

3221
func (s *Server) createClient(conn net.Conn) *client {
9,773✔
3222
        return s.createClientEx(conn, false)
9,773✔
3223
}
9,773✔
3224

3225
func (s *Server) createClientInProcess(conn net.Conn) *client {
8✔
3226
        return s.createClientEx(conn, true)
8✔
3227
}
8✔
3228

3229
func (s *Server) createClientEx(conn net.Conn, inProcess bool) *client {
9,781✔
3230
        // Snapshot server options.
9,781✔
3231
        opts := s.getOpts()
9,781✔
3232

9,781✔
3233
        maxPay := int32(opts.MaxPayload)
9,781✔
3234
        maxSubs := int32(opts.MaxSubs)
9,781✔
3235
        // For system, maxSubs of 0 means unlimited, so re-adjust here.
9,781✔
3236
        if maxSubs == 0 {
19,561✔
3237
                maxSubs = -1
9,780✔
3238
        }
9,780✔
3239
        now := time.Now()
9,781✔
3240

9,781✔
3241
        c := &client{
9,781✔
3242
                srv:   s,
9,781✔
3243
                nc:    conn,
9,781✔
3244
                opts:  defaultOpts,
9,781✔
3245
                mpay:  maxPay,
9,781✔
3246
                msubs: maxSubs,
9,781✔
3247
                start: now,
9,781✔
3248
                last:  now,
9,781✔
3249
                iproc: inProcess,
9,781✔
3250
        }
9,781✔
3251

9,781✔
3252
        c.registerWithAccount(s.globalAccount())
9,781✔
3253

9,781✔
3254
        var info Info
9,781✔
3255
        var authRequired bool
9,781✔
3256

9,781✔
3257
        s.mu.Lock()
9,781✔
3258
        // Grab JSON info string
9,781✔
3259
        info = s.copyInfo()
9,781✔
3260
        if s.nonceRequired() {
12,262✔
3261
                // Nonce handling
2,481✔
3262
                var raw [nonceLen]byte
2,481✔
3263
                nonce := raw[:]
2,481✔
3264
                s.generateNonce(nonce)
2,481✔
3265
                info.Nonce = string(nonce)
2,481✔
3266
        }
2,481✔
3267
        c.nonce = []byte(info.Nonce)
9,781✔
3268
        authRequired = info.AuthRequired
9,781✔
3269

9,781✔
3270
        // Check to see if we have auth_required set but we also have a no_auth_user.
9,781✔
3271
        // If so set back to false.
9,781✔
3272
        if info.AuthRequired && opts.NoAuthUser != _EMPTY_ && opts.NoAuthUser != s.sysAccOnlyNoAuthUser {
9,959✔
3273
                info.AuthRequired = false
178✔
3274
        }
178✔
3275

3276
        // Check to see if this is an in-process connection with tls_required.
3277
        // If so, set as not required, but available.
3278
        if inProcess && info.TLSRequired {
9,785✔
3279
                info.TLSRequired = false
4✔
3280
                info.TLSAvailable = true
4✔
3281
        }
4✔
3282

3283
        s.totalClients++
9,781✔
3284
        s.mu.Unlock()
9,781✔
3285

9,781✔
3286
        // Grab lock
9,781✔
3287
        c.mu.Lock()
9,781✔
3288
        if authRequired {
16,965✔
3289
                c.flags.set(expectConnect)
7,184✔
3290
        }
7,184✔
3291

3292
        // Initialize
3293
        c.initClient()
9,781✔
3294

9,781✔
3295
        c.Debugf("Client connection created")
9,781✔
3296

9,781✔
3297
        // Save info.TLSRequired value since we may neeed to change it back and forth.
9,781✔
3298
        orgInfoTLSReq := info.TLSRequired
9,781✔
3299

9,781✔
3300
        var tlsFirstFallback time.Duration
9,781✔
3301
        // Check if we should do TLS first.
9,781✔
3302
        tlsFirst := opts.TLSConfig != nil && opts.TLSHandshakeFirst
9,781✔
3303
        if tlsFirst {
9,795✔
3304
                // Make sure info.TLSRequired is set to true (it could be false
14✔
3305
                // if AllowNonTLS is enabled).
14✔
3306
                info.TLSRequired = true
14✔
3307
                // Get the fallback delay value if applicable.
14✔
3308
                if f := opts.TLSHandshakeFirstFallback; f > 0 {
19✔
3309
                        tlsFirstFallback = f
5✔
3310
                } else if inProcess {
17✔
3311
                        // For in-process connection, we will always have a fallback
3✔
3312
                        // delay. It allows support for non-TLS, TLS and "TLS First"
3✔
3313
                        // in-process clients to successfully connect.
3✔
3314
                        tlsFirstFallback = DEFAULT_TLS_HANDSHAKE_FIRST_FALLBACK_DELAY
3✔
3315
                }
3✔
3316
        }
3317

3318
        // Decide if we are going to require TLS or not and generate INFO json.
3319
        // If we have ProxyProtocol enabled then we won't include the client
3320
        // IP in the initial INFO, as that would leak the proxy IP itself.
3321
        // In that case we'll send another INFO after the client introduces itself.
3322
        tlsRequired := info.TLSRequired
9,781✔
3323
        infoBytes := c.generateClientInfoJSON(info, !opts.ProxyProtocol)
9,781✔
3324

9,781✔
3325
        // Send our information, except if TLS and TLSHandshakeFirst is requested.
9,781✔
3326
        if !tlsFirst {
19,548✔
3327
                // Need to be sent in place since writeLoop cannot be started until
9,767✔
3328
                // TLS handshake is done (if applicable).
9,767✔
3329
                c.sendProtoNow(infoBytes)
9,767✔
3330
        }
9,767✔
3331

3332
        // Unlock to register
3333
        c.mu.Unlock()
9,781✔
3334

9,781✔
3335
        // Register with the server.
9,781✔
3336
        s.mu.Lock()
9,781✔
3337
        // If server is not running, Shutdown() may have already gathered the
9,781✔
3338
        // list of connections to close. It won't contain this one, so we need
9,781✔
3339
        // to bail out now otherwise the readLoop started down there would not
9,781✔
3340
        // be interrupted. Skip also if in lame duck mode.
9,781✔
3341
        if !s.isRunning() || s.ldm {
9,979✔
3342
                // There are some tests that create a server but don't start it,
198✔
3343
                // and use "async" clients and perform the parsing manually. Such
198✔
3344
                // clients would branch here (since server is not running). However,
198✔
3345
                // when a server was really running and has been shutdown, we must
198✔
3346
                // close this connection.
198✔
3347
                if s.isShuttingDown() {
202✔
3348
                        conn.Close()
4✔
3349
                }
4✔
3350
                s.mu.Unlock()
198✔
3351
                return c
198✔
3352
        }
3353

3354
        // If there is a max connections specified, check that adding
3355
        // this new client would not push us over the max
3356
        if opts.MaxConn < 0 || (opts.MaxConn > 0 && len(s.clients) >= opts.MaxConn) {
9,583✔
3357
                s.mu.Unlock()
×
3358
                c.maxConnExceeded()
×
3359
                return nil
×
3360
        }
×
3361
        s.clients[c.cid] = c
9,583✔
3362

9,583✔
3363
        s.mu.Unlock()
9,583✔
3364

9,583✔
3365
        // Re-Grab lock
9,583✔
3366
        c.mu.Lock()
9,583✔
3367

9,583✔
3368
        isClosed := c.isClosed()
9,583✔
3369
        var pre []byte
9,583✔
3370
        // We need first to check for "TLS First" fallback delay.
9,583✔
3371
        if !isClosed && tlsFirstFallback > 0 {
9,591✔
3372
                // We wait and see if we are getting any data. Since we did not send
8✔
3373
                // the INFO protocol yet, only clients that use TLS first should be
8✔
3374
                // sending data (the TLS handshake). We don't really check the content:
8✔
3375
                // if it is a rogue agent and not an actual client performing the
8✔
3376
                // TLS handshake, the error will be detected when performing the
8✔
3377
                // handshake on our side.
8✔
3378
                pre = make([]byte, 4)
8✔
3379
                c.nc.SetReadDeadline(time.Now().Add(tlsFirstFallback))
8✔
3380
                n, _ := io.ReadFull(c.nc, pre[:])
8✔
3381
                c.nc.SetReadDeadline(time.Time{})
8✔
3382
                // If we get any data (regardless of possible timeout), we will proceed
8✔
3383
                // with the TLS handshake.
8✔
3384
                if n > 0 {
11✔
3385
                        pre = pre[:n]
3✔
3386
                } else {
8✔
3387
                        // We did not get anything so we will send the INFO protocol.
5✔
3388
                        pre = nil
5✔
3389

5✔
3390
                        // Restore the original info.TLSRequired value if it is
5✔
3391
                        // different that the current value and regenerate infoBytes.
5✔
3392
                        if orgInfoTLSReq != info.TLSRequired {
9✔
3393
                                info.TLSRequired = orgInfoTLSReq
4✔
3394
                                infoBytes = c.generateClientInfoJSON(info, !opts.ProxyProtocol)
4✔
3395
                        }
4✔
3396
                        c.sendProtoNow(infoBytes)
5✔
3397
                        // Set the boolean to false for the rest of the function.
5✔
3398
                        tlsFirst = false
5✔
3399
                        // Check closed status again
5✔
3400
                        isClosed = c.isClosed()
5✔
3401
                }
3402
        }
3403
        // If we have both TLS and non-TLS allowed we need to see which
3404
        // one the client wants. We'll always allow this for in-process
3405
        // connections.
3406
        if !isClosed && !tlsFirst && opts.TLSConfig != nil && (inProcess || opts.AllowNonTLS) {
9,592✔
3407
                pre = make([]byte, 6) // Minimum 6 bytes for proxy proto in next step.
9✔
3408
                c.nc.SetReadDeadline(time.Now().Add(secondsToDuration(opts.TLSTimeout)))
9✔
3409
                n, _ := io.ReadFull(c.nc, pre[:])
9✔
3410
                c.nc.SetReadDeadline(time.Time{})
9✔
3411
                pre = pre[:n]
9✔
3412
                if n > 0 && pre[0] == 0x16 {
12✔
3413
                        tlsRequired = true
3✔
3414
                } else {
9✔
3415
                        tlsRequired = false
6✔
3416
                }
6✔
3417
        }
3418

3419
        // Check for proxy protocol if enabled.
3420
        if !isClosed && !tlsRequired && opts.ProxyProtocol {
9,586✔
3421
                if len(pre) == 0 {
6✔
3422
                        // There has been no pre-read yet, do so so we can work out
3✔
3423
                        // if the client is trying to negotiate PROXY.
3✔
3424
                        pre = make([]byte, 6)
3✔
3425
                        c.nc.SetReadDeadline(time.Now().Add(proxyProtoReadTimeout))
3✔
3426
                        n, _ := io.ReadFull(c.nc, pre)
3✔
3427
                        c.nc.SetReadDeadline(time.Time{})
3✔
3428
                        pre = pre[:n]
3✔
3429
                }
3✔
3430
                conn = &tlsMixConn{conn, bytes.NewBuffer(pre)}
3✔
3431
                addr, proxyPre, err := readProxyProtoHeader(conn)
3✔
3432
                if err != nil && err != errProxyProtoUnrecognized {
3✔
3433
                        // err != errProxyProtoUnrecognized implies that we detected a proxy
×
3434
                        // protocol header but we failed to parse it, so don't continue.
×
3435
                        c.mu.Unlock()
×
3436
                        s.Warnf("Error reading PROXY protocol header from %s: %v", conn.RemoteAddr(), err)
×
3437
                        c.closeConnection(ProtocolViolation)
×
3438
                        return nil
×
3439
                }
×
3440
                // If addr is nil, it was a LOCAL/UNKNOWN command (health check)
3441
                // Use the connection as-is
3442
                if addr != nil {
5✔
3443
                        c.nc = &proxyConn{
2✔
3444
                                Conn:       conn,
2✔
3445
                                remoteAddr: addr,
2✔
3446
                        }
2✔
3447
                        // These were set already by initClient, override them.
2✔
3448
                        c.host = addr.srcIP.String()
2✔
3449
                        c.port = addr.srcPort
2✔
3450
                }
2✔
3451
                // At this point, err is either:
3452
                //  - nil => we parsed the proxy protocol header successfully
3453
                //  - errProxyProtoUnrecognized => we didn't detect proxy protocol at all
3454
                // We only clear the pre-read if we successfully read the protocol header
3455
                // so that the next step doesn't re-read it. Otherwise we have to assume
3456
                // that it's a non-proxied connection and we want the pre-read to remain
3457
                // for the next step.
3458
                if err == nil {
6✔
3459
                        pre = proxyPre
3✔
3460
                }
3✔
3461
                // Because we have ProxyProtocol enabled, our earlier INFO message didn't
3462
                // include the client_ip. If we need to send it again then we will include
3463
                // it, but sending it here immediately can confuse clients who have just
3464
                // PING'd.
3465
                infoBytes = c.generateClientInfoJSON(info, true)
3✔
3466
        }
3467

3468
        // Check for TLS
3469
        if !isClosed && tlsRequired {
9,951✔
3470
                if s.connRateCounter != nil && !s.connRateCounter.allow() {
369✔
3471
                        c.mu.Unlock()
1✔
3472
                        c.sendErr("Connection throttling is active. Please try again later.")
1✔
3473
                        c.closeConnection(MaxConnectionsExceeded)
1✔
3474
                        return nil
1✔
3475
                }
1✔
3476

3477
                // If we have a prebuffer create a multi-reader.
3478
                if len(pre) > 0 {
373✔
3479
                        c.nc = &tlsMixConn{c.nc, bytes.NewBuffer(pre)}
6✔
3480
                        // Clear pre so it is not parsed.
6✔
3481
                        pre = nil
6✔
3482
                }
6✔
3483
                // Performs server-side TLS handshake.
3484
                if err := c.doTLSServerHandshake(_EMPTY_, opts.TLSConfig, opts.TLSTimeout, opts.TLSPinnedCerts); err != nil {
498✔
3485
                        c.mu.Unlock()
131✔
3486
                        return nil
131✔
3487
                }
131✔
3488
        }
3489

3490
        // Now, send the INFO if it was delayed
3491
        if !isClosed && tlsFirst {
9,457✔
3492
                c.flags.set(didTLSFirst)
6✔
3493
                c.sendProtoNow(infoBytes)
6✔
3494
                // Check closed status
6✔
3495
                isClosed = c.isClosed()
6✔
3496
        }
6✔
3497

3498
        // Connection could have been closed while sending the INFO proto.
3499
        if isClosed {
9,454✔
3500
                c.mu.Unlock()
3✔
3501
                // We need to call closeConnection() to make sure that proper cleanup is done.
3✔
3502
                c.closeConnection(WriteError)
3✔
3503
                return nil
3✔
3504
        }
3✔
3505

3506
        // Check for Auth. We schedule this timer after the TLS handshake to avoid
3507
        // the race where the timer fires during the handshake and causes the
3508
        // server to write bad data to the socket. See issue #432.
3509
        if authRequired {
16,503✔
3510
                c.setAuthTimer(secondsToDuration(opts.AuthTimeout))
7,055✔
3511
        }
7,055✔
3512

3513
        // Do final client initialization
3514

3515
        // Set the Ping timer. Will be reset once connect was received.
3516
        c.setPingTimer()
9,448✔
3517

9,448✔
3518
        // Spin up the read loop.
9,448✔
3519
        s.startGoRoutine(func() { c.readLoop(pre) })
18,896✔
3520

3521
        // Spin up the write loop.
3522
        s.startGoRoutine(func() { c.writeLoop() })
18,896✔
3523

3524
        if tlsRequired {
9,684✔
3525
                c.Debugf("TLS handshake complete")
236✔
3526
                cs := c.nc.(*tls.Conn).ConnectionState()
236✔
3527
                c.Debugf("TLS version %s, cipher suite %s", tlsVersion(cs.Version), tls.CipherSuiteName(cs.CipherSuite))
236✔
3528
        }
236✔
3529

3530
        c.mu.Unlock()
9,448✔
3531

9,448✔
3532
        return c
9,448✔
3533
}
3534

3535
// This will save off a closed client in a ring buffer such that
3536
// /connz can inspect. Useful for debugging, etc.
3537
func (s *Server) saveClosedClient(c *client, nc net.Conn, subs map[string]*subscription, reason ClosedState) {
11,960✔
3538
        now := time.Now()
11,960✔
3539

11,960✔
3540
        s.accountDisconnectEvent(c, now, reason.String())
11,960✔
3541

11,960✔
3542
        c.mu.Lock()
11,960✔
3543

11,960✔
3544
        cc := &closedClient{}
11,960✔
3545
        cc.fill(c, nc, now, false)
11,960✔
3546
        // Note that cc.fill is using len(c.subs), which may have been set to nil by now,
11,960✔
3547
        // so replace cc.NumSubs with len(subs).
11,960✔
3548
        cc.NumSubs = uint32(len(subs))
11,960✔
3549
        cc.Stop = &now
11,960✔
3550
        cc.Reason = reason.String()
11,960✔
3551

11,960✔
3552
        // Do subs, do not place by default in main ConnInfo
11,960✔
3553
        if len(subs) > 0 {
21,447✔
3554
                cc.subs = make([]SubDetail, 0, len(subs))
9,487✔
3555
                for _, sub := range subs {
157,934✔
3556
                        cc.subs = append(cc.subs, newSubDetail(sub))
148,447✔
3557
                }
148,447✔
3558
        }
3559
        // Hold user as well.
3560
        cc.user = c.getRawAuthUser()
11,960✔
3561
        // Hold account name if not the global account.
11,960✔
3562
        if c.acc != nil && c.acc.Name != globalAccountName {
18,409✔
3563
                cc.acc = c.acc.Name
6,449✔
3564
        }
6,449✔
3565
        cc.JWT = c.opts.JWT
11,960✔
3566
        cc.IssuerKey = issuerForClient(c)
11,960✔
3567
        cc.Tags = c.tags
11,960✔
3568
        cc.NameTag = c.nameTag
11,960✔
3569
        c.mu.Unlock()
11,960✔
3570

11,960✔
3571
        // Place in the ring buffer
11,960✔
3572
        s.mu.Lock()
11,960✔
3573
        if s.closed != nil {
23,919✔
3574
                s.closed.append(cc)
11,959✔
3575
        }
11,959✔
3576
        s.mu.Unlock()
11,960✔
3577
}
3578

3579
// Adds to the list of client and websocket clients connect URLs.
3580
// If there was a change, an INFO protocol is sent to registered clients
3581
// that support async INFO protocols.
3582
// Server lock held on entry.
3583
func (s *Server) addConnectURLsAndSendINFOToClients(curls, wsurls []string) {
8,970✔
3584
        s.updateServerINFOAndSendINFOToClients(curls, wsurls, true)
8,970✔
3585
}
8,970✔
3586

3587
// Removes from the list of client and websocket clients connect URLs.
3588
// If there was a change, an INFO protocol is sent to registered clients
3589
// that support async INFO protocols.
3590
// Server lock held on entry.
3591
func (s *Server) removeConnectURLsAndSendINFOToClients(curls, wsurls []string) {
8,953✔
3592
        s.updateServerINFOAndSendINFOToClients(curls, wsurls, false)
8,953✔
3593
}
8,953✔
3594

3595
// Updates the list of client and websocket clients connect URLs and if any change
3596
// sends an async INFO update to clients that support it.
3597
// Server lock held on entry.
3598
func (s *Server) updateServerINFOAndSendINFOToClients(curls, wsurls []string, add bool) {
17,923✔
3599
        remove := !add
17,923✔
3600
        // Will return true if we need alter the server's Info object.
17,923✔
3601
        updateMap := func(urls []string, m refCountedUrlSet) bool {
53,769✔
3602
                wasUpdated := false
35,846✔
3603
                for _, url := range urls {
54,178✔
3604
                        if add && m.addUrl(url) {
27,497✔
3605
                                wasUpdated = true
9,165✔
3606
                        } else if remove && m.removeUrl(url) {
27,487✔
3607
                                wasUpdated = true
9,155✔
3608
                        }
9,155✔
3609
                }
3610
                return wasUpdated
35,846✔
3611
        }
3612
        cliUpdated := updateMap(curls, s.clientConnectURLsMap)
17,923✔
3613
        wsUpdated := updateMap(wsurls, s.websocket.connectURLsMap)
17,923✔
3614

17,923✔
3615
        updateInfo := func(infoURLs *[]string, urls []string, m refCountedUrlSet) {
36,155✔
3616
                // Recreate the info's slice from the map
18,232✔
3617
                *infoURLs = (*infoURLs)[:0]
18,232✔
3618
                // Add this server client connect ULRs first...
18,232✔
3619
                *infoURLs = append(*infoURLs, urls...)
18,232✔
3620
                // Then the ones from the map
18,232✔
3621
                for url := range m {
41,123✔
3622
                        *infoURLs = append(*infoURLs, url)
22,891✔
3623
                }
22,891✔
3624
        }
3625
        if cliUpdated {
35,817✔
3626
                updateInfo(&s.info.ClientConnectURLs, s.clientConnectURLs, s.clientConnectURLsMap)
17,894✔
3627
        }
17,894✔
3628
        if wsUpdated {
18,261✔
3629
                updateInfo(&s.info.WSConnectURLs, s.websocket.connectURLs, s.websocket.connectURLsMap)
338✔
3630
        }
338✔
3631
        if cliUpdated || wsUpdated {
35,817✔
3632
                // Send to all registered clients that support async INFO protocols.
17,894✔
3633
                s.sendAsyncInfoToClients(cliUpdated, wsUpdated)
17,894✔
3634
        }
17,894✔
3635
}
3636

3637
// Handle closing down a connection when the handshake has timedout.
3638
func tlsTimeout(c *client, conn *tls.Conn) {
38✔
3639
        c.mu.Lock()
38✔
3640
        closed := c.isClosed()
38✔
3641
        c.mu.Unlock()
38✔
3642
        // Check if already closed
38✔
3643
        if closed {
47✔
3644
                return
9✔
3645
        }
9✔
3646
        cs := conn.ConnectionState()
29✔
3647
        if !cs.HandshakeComplete {
32✔
3648
                c.Errorf("TLS handshake timeout")
3✔
3649
                c.sendErr("Secure Connection - TLS Required")
3✔
3650
                c.closeConnection(TLSHandshakeError)
3✔
3651
        }
3✔
3652
}
3653

3654
// Seems silly we have to write these
3655
func tlsVersion(ver uint16) string {
1,187✔
3656
        switch ver {
1,187✔
3657
        case tls.VersionTLS10:
×
3658
                return "1.0"
×
3659
        case tls.VersionTLS11:
×
3660
                return "1.1"
×
3661
        case tls.VersionTLS12:
×
3662
                return "1.2"
×
3663
        case tls.VersionTLS13:
1,187✔
3664
                return "1.3"
1,187✔
3665
        }
3666
        return fmt.Sprintf("Unknown [0x%x]", ver)
×
3667
}
3668

3669
func tlsVersionFromString(ver string) (uint16, error) {
×
3670
        switch ver {
×
3671
        case "1.0":
×
3672
                return tls.VersionTLS10, nil
×
3673
        case "1.1":
×
3674
                return tls.VersionTLS11, nil
×
3675
        case "1.2":
×
3676
                return tls.VersionTLS12, nil
×
3677
        case "1.3":
×
3678
                return tls.VersionTLS13, nil
×
3679
        }
3680
        return 0, fmt.Errorf("unknown version: %v", ver)
×
3681
}
3682

3683
// Remove a client or route from our internal accounting.
3684
func (s *Server) removeClient(c *client) {
147,181✔
3685
        // kind is immutable, so can check without lock
147,181✔
3686
        switch c.kind {
147,181✔
3687
        case CLIENT:
10,296✔
3688
                c.mu.Lock()
10,296✔
3689
                cid := c.cid
10,296✔
3690
                updateProtoInfoCount := false
10,296✔
3691
                if c.kind == CLIENT && c.opts.Protocol >= ClientProtoInfo {
19,134✔
3692
                        updateProtoInfoCount = true
8,838✔
3693
                }
8,838✔
3694
                proxyKey := c.proxyKey
10,296✔
3695
                c.mu.Unlock()
10,296✔
3696

10,296✔
3697
                s.mu.Lock()
10,296✔
3698
                delete(s.clients, cid)
10,296✔
3699
                if updateProtoInfoCount {
19,134✔
3700
                        s.cproto--
8,838✔
3701
                }
8,838✔
3702
                if proxyKey != _EMPTY_ {
10,300✔
3703
                        s.removeProxiedConn(proxyKey, cid)
4✔
3704
                }
4✔
3705
                s.mu.Unlock()
10,296✔
3706
        case ROUTER:
62,825✔
3707
                s.removeRoute(c)
62,825✔
3708
        case GATEWAY:
3,678✔
3709
                s.removeRemoteGatewayConnection(c)
3,678✔
3710
        case LEAF:
1,668✔
3711
                s.removeLeafNodeConnection(c)
1,668✔
3712
        }
3713
}
3714

3715
// Remove the connection with id `cid` from the map of connections
3716
// under the public key `key` of the trusted proxies.
3717
//
3718
// Server lock must be held on entry.
3719
func (s *Server) removeProxiedConn(key string, cid uint64) {
8✔
3720
        conns := s.proxiedConns[key]
8✔
3721
        delete(conns, cid)
8✔
3722
        if len(conns) == 0 {
16✔
3723
                delete(s.proxiedConns, key)
8✔
3724
        }
8✔
3725
}
3726

3727
func (s *Server) removeFromTempClients(cid uint64) {
106,641✔
3728
        s.grMu.Lock()
106,641✔
3729
        delete(s.grTmpClients, cid)
106,641✔
3730
        s.grMu.Unlock()
106,641✔
3731
}
106,641✔
3732

3733
func (s *Server) addToTempClients(cid uint64, c *client) bool {
67,910✔
3734
        added := false
67,910✔
3735
        s.grMu.Lock()
67,910✔
3736
        if s.grRunning {
135,815✔
3737
                s.grTmpClients[cid] = c
67,905✔
3738
                added = true
67,905✔
3739
        }
67,905✔
3740
        s.grMu.Unlock()
67,910✔
3741
        return added
67,910✔
3742
}
3743

3744
/////////////////////////////////////////////////////////////////
3745
// These are some helpers for accounting in functional tests.
3746
/////////////////////////////////////////////////////////////////
3747

3748
// NumRoutes will report the number of registered routes.
3749
func (s *Server) NumRoutes() int {
5,987✔
3750
        s.mu.RLock()
5,987✔
3751
        defer s.mu.RUnlock()
5,987✔
3752
        return s.numRoutes()
5,987✔
3753
}
5,987✔
3754

3755
// numRoutes will report the number of registered routes.
3756
// Server lock held on entry
3757
func (s *Server) numRoutes() int {
774,344✔
3758
        var nr int
774,344✔
3759
        s.forEachRoute(func(c *client) {
1,513,721✔
3760
                nr++
739,377✔
3761
        })
739,377✔
3762
        return nr
774,344✔
3763
}
3764

3765
// NumRemotes will report number of registered remotes.
3766
func (s *Server) NumRemotes() int {
×
3767
        s.mu.RLock()
×
3768
        defer s.mu.RUnlock()
×
3769
        return s.numRemotes()
×
3770
}
×
3771

3772
// numRemotes will report number of registered remotes.
3773
// Server lock held on entry
3774
func (s *Server) numRemotes() int {
28,326✔
3775
        return len(s.routes)
28,326✔
3776
}
28,326✔
3777

3778
// NumLeafNodes will report number of leaf node connections.
3779
func (s *Server) NumLeafNodes() int {
3,739✔
3780
        s.mu.RLock()
3,739✔
3781
        defer s.mu.RUnlock()
3,739✔
3782
        return len(s.leafs)
3,739✔
3783
}
3,739✔
3784

3785
// NumClients will report the number of registered clients.
3786
func (s *Server) NumClients() int {
55✔
3787
        s.mu.RLock()
55✔
3788
        defer s.mu.RUnlock()
55✔
3789
        return len(s.clients)
55✔
3790
}
55✔
3791

3792
// GetClient will return the client associated with cid.
3793
func (s *Server) GetClient(cid uint64) *client {
130✔
3794
        return s.getClient(cid)
130✔
3795
}
130✔
3796

3797
// getClient will return the client associated with cid.
3798
func (s *Server) getClient(cid uint64) *client {
147✔
3799
        s.mu.RLock()
147✔
3800
        defer s.mu.RUnlock()
147✔
3801
        return s.clients[cid]
147✔
3802
}
147✔
3803

3804
// GetLeafNode returns the leafnode associated with the cid.
3805
func (s *Server) GetLeafNode(cid uint64) *client {
1✔
3806
        s.mu.RLock()
1✔
3807
        defer s.mu.RUnlock()
1✔
3808
        return s.leafs[cid]
1✔
3809
}
1✔
3810

3811
// NumSubscriptions will report how many subscriptions are active.
3812
func (s *Server) NumSubscriptions() uint32 {
363✔
3813
        s.mu.RLock()
363✔
3814
        defer s.mu.RUnlock()
363✔
3815
        return s.numSubscriptions()
363✔
3816
}
363✔
3817

3818
// numSubscriptions will report how many subscriptions are active.
3819
// Lock should be held.
3820
func (s *Server) numSubscriptions() uint32 {
26,819✔
3821
        var subs int
26,819✔
3822
        s.accounts.Range(func(k, v any) bool {
87,126✔
3823
                acc := v.(*Account)
60,307✔
3824
                subs += acc.TotalSubs()
60,307✔
3825
                return true
60,307✔
3826
        })
60,307✔
3827
        return uint32(subs)
26,819✔
3828
}
3829

3830
// NumSlowConsumers will report the number of slow consumers.
3831
func (s *Server) NumSlowConsumers() int64 {
1✔
3832
        return atomic.LoadInt64(&s.slowConsumers)
1✔
3833
}
1✔
3834

3835
// NumStalledClients will report the total number of times clients have been stalled.
3836
func (s *Server) NumStalledClients() int64 {
×
3837
        return atomic.LoadInt64(&s.stalls)
×
3838
}
×
3839

3840
// NumSlowConsumersClients will report the number of slow consumers clients.
3841
func (s *Server) NumSlowConsumersClients() uint64 {
36,635✔
3842
        return s.scStats.clients.Load()
36,635✔
3843
}
36,635✔
3844

3845
// NumSlowConsumersRoutes will report the number of slow consumers routes.
3846
func (s *Server) NumSlowConsumersRoutes() uint64 {
36,635✔
3847
        return s.scStats.routes.Load()
36,635✔
3848
}
36,635✔
3849

3850
// NumSlowConsumersGateways will report the number of slow consumers leafs.
3851
func (s *Server) NumSlowConsumersGateways() uint64 {
36,637✔
3852
        return s.scStats.gateways.Load()
36,637✔
3853
}
36,637✔
3854

3855
// NumSlowConsumersLeafs will report the number of slow consumers leafs.
3856
func (s *Server) NumSlowConsumersLeafs() uint64 {
36,637✔
3857
        return s.scStats.leafs.Load()
36,637✔
3858
}
36,637✔
3859

3860
// NumStaleConnections will report the number of stale connections.
3861
func (s *Server) NumStaleConnections() int64 {
4✔
3862
        return atomic.LoadInt64(&s.staleConnections)
4✔
3863
}
4✔
3864

3865
// NumStaleConnectionsClients will report the number of stale client connections.
3866
func (s *Server) NumStaleConnectionsClients() uint64 {
36,639✔
3867
        return s.staleStats.clients.Load()
36,639✔
3868
}
36,639✔
3869

3870
// NumStaleConnectionsRoutes will report the number of stale route connections.
3871
func (s *Server) NumStaleConnectionsRoutes() uint64 {
36,639✔
3872
        return s.staleStats.routes.Load()
36,639✔
3873
}
36,639✔
3874

3875
// NumStaleConnectionsGateways will report the number of stale gateway connections.
3876
func (s *Server) NumStaleConnectionsGateways() uint64 {
36,639✔
3877
        return s.staleStats.gateways.Load()
36,639✔
3878
}
36,639✔
3879

3880
// NumStaleConnectionsLeafs will report the number of stale leaf connections.
3881
func (s *Server) NumStaleConnectionsLeafs() uint64 {
36,639✔
3882
        return s.staleStats.leafs.Load()
36,639✔
3883
}
36,639✔
3884

3885
// ConfigTime will report the last time the server configuration was loaded.
3886
func (s *Server) ConfigTime() time.Time {
×
3887
        s.mu.RLock()
×
3888
        defer s.mu.RUnlock()
×
3889
        return s.configTime
×
3890
}
×
3891

3892
// Addr will return the net.Addr object for the current listener.
3893
func (s *Server) Addr() net.Addr {
214✔
3894
        s.mu.RLock()
214✔
3895
        defer s.mu.RUnlock()
214✔
3896
        if s.listener == nil {
214✔
3897
                return nil
×
3898
        }
×
3899
        return s.listener.Addr()
214✔
3900
}
3901

3902
// MonitorAddr will return the net.Addr object for the monitoring listener.
3903
func (s *Server) MonitorAddr() *net.TCPAddr {
136✔
3904
        s.mu.RLock()
136✔
3905
        defer s.mu.RUnlock()
136✔
3906
        if s.http == nil {
136✔
3907
                return nil
×
3908
        }
×
3909
        return s.http.Addr().(*net.TCPAddr)
136✔
3910
}
3911

3912
// ClusterAddr returns the net.Addr object for the route listener.
3913
func (s *Server) ClusterAddr() *net.TCPAddr {
29✔
3914
        s.mu.RLock()
29✔
3915
        defer s.mu.RUnlock()
29✔
3916
        if s.routeListener == nil {
29✔
3917
                return nil
×
3918
        }
×
3919
        return s.routeListener.Addr().(*net.TCPAddr)
29✔
3920
}
3921

3922
// ProfilerAddr returns the net.Addr object for the profiler listener.
3923
func (s *Server) ProfilerAddr() *net.TCPAddr {
×
3924
        s.mu.RLock()
×
3925
        defer s.mu.RUnlock()
×
3926
        if s.profiler == nil {
×
3927
                return nil
×
3928
        }
×
3929
        return s.profiler.Addr().(*net.TCPAddr)
×
3930
}
3931

3932
func (s *Server) readyForConnections(d time.Duration) error {
10,393✔
3933
        // Snapshot server options.
10,393✔
3934
        opts := s.getOpts()
10,393✔
3935

10,393✔
3936
        type info struct {
10,393✔
3937
                ok  bool
10,393✔
3938
                err error
10,393✔
3939
        }
10,393✔
3940
        chk := make(map[string]info)
10,393✔
3941

10,393✔
3942
        end := time.Now().Add(d)
10,393✔
3943
        for time.Now().Before(end) {
53,672✔
3944
                s.mu.RLock()
43,279✔
3945
                chk["server"] = info{ok: s.listener != nil || opts.DontListen, err: s.listenerErr}
43,279✔
3946
                chk["route"] = info{ok: (opts.Cluster.Port == 0 || s.routeListener != nil), err: s.routeListenerErr}
43,279✔
3947
                chk["gateway"] = info{ok: (opts.Gateway.Name == _EMPTY_ || s.gatewayListener != nil), err: s.gatewayListenerErr}
43,279✔
3948
                chk["leafnode"] = info{ok: (opts.LeafNode.Port == 0 || s.leafNodeListener != nil), err: s.leafNodeListenerErr}
43,279✔
3949
                chk["websocket"] = info{ok: (opts.Websocket.Port == 0 || s.websocket.listener != nil), err: s.websocket.listenerErr}
43,279✔
3950
                chk["mqtt"] = info{ok: (opts.MQTT.Port == 0 || s.mqtt.listener != nil), err: s.mqtt.listenerErr}
43,279✔
3951
                s.mu.RUnlock()
43,279✔
3952

43,279✔
3953
                var numOK int
43,279✔
3954
                for _, inf := range chk {
302,953✔
3955
                        if inf.ok {
485,413✔
3956
                                numOK++
225,739✔
3957
                        }
225,739✔
3958
                }
3959
                if numOK == len(chk) {
53,664✔
3960
                        // In the case of DontListen option (no accept loop), we still want
10,385✔
3961
                        // to make sure that Start() has done all the work, so we wait on
10,385✔
3962
                        // that.
10,385✔
3963
                        if opts.DontListen {
10,385✔
3964
                                select {
×
3965
                                case <-s.startupComplete:
×
3966
                                case <-time.After(d):
×
3967
                                        return fmt.Errorf("failed to be ready for connections after %s: startup did not complete", d)
×
3968
                                }
3969
                        }
3970
                        return nil
10,385✔
3971
                }
3972
                if d > 25*time.Millisecond {
37,790✔
3973
                        time.Sleep(25 * time.Millisecond)
4,896✔
3974
                }
4,896✔
3975
        }
3976

3977
        failed := make([]string, 0, len(chk))
8✔
3978
        for name, inf := range chk {
56✔
3979
                if inf.ok && inf.err != nil {
48✔
3980
                        failed = append(failed, fmt.Sprintf("%s(ok, but %s)", name, inf.err))
×
3981
                }
×
3982
                if !inf.ok && inf.err == nil {
58✔
3983
                        failed = append(failed, name)
10✔
3984
                }
10✔
3985
                if !inf.ok && inf.err != nil {
48✔
3986
                        failed = append(failed, fmt.Sprintf("%s(%s)", name, inf.err))
×
3987
                }
×
3988
        }
3989

3990
        return fmt.Errorf(
8✔
3991
                "failed to be ready for connections after %s: %s",
8✔
3992
                d, strings.Join(failed, ", "),
8✔
3993
        )
8✔
3994
}
3995

3996
// ReadyForConnections returns `true` if the server is ready to accept clients
3997
// and, if routing is enabled, route connections. If after the duration
3998
// `dur` the server is still not ready, returns `false`.
3999
func (s *Server) ReadyForConnections(dur time.Duration) bool {
794✔
4000
        return s.readyForConnections(dur) == nil
794✔
4001
}
794✔
4002

4003
// Quick utility to function to tell if the server supports headers.
4004
func (s *Server) supportsHeaders() bool {
185,551✔
4005
        if s == nil {
185,551✔
4006
                return false
×
4007
        }
×
4008
        return !(s.getOpts().NoHeaderSupport)
185,551✔
4009
}
4010

4011
// ID returns the server's ID
4012
func (s *Server) ID() string {
54,609✔
4013
        return s.info.ID
54,609✔
4014
}
54,609✔
4015

4016
// NodeName returns the node name for this server.
4017
func (s *Server) NodeName() string {
110✔
4018
        return getHash(s.info.Name)
110✔
4019
}
110✔
4020

4021
// Name returns the server's name. This will be the same as the ID if it was not set.
4022
func (s *Server) Name() string {
220,877✔
4023
        return s.info.Name
220,877✔
4024
}
220,877✔
4025

4026
func (s *Server) String() string {
5,286✔
4027
        return s.info.Name
5,286✔
4028
}
5,286✔
4029

4030
type pprofLabels map[string]string
4031

4032
func setGoRoutineLabels(tags ...pprofLabels) {
368,050✔
4033
        var labels []string
368,050✔
4034
        for _, m := range tags {
420,028✔
4035
                for k, v := range m {
237,954✔
4036
                        labels = append(labels, k, v)
185,976✔
4037
                }
185,976✔
4038
        }
4039
        if len(labels) > 0 {
420,028✔
4040
                pprof.SetGoroutineLabels(
51,978✔
4041
                        pprof.WithLabels(context.Background(), pprof.Labels(labels...)),
51,978✔
4042
                )
51,978✔
4043
        }
51,978✔
4044
}
4045

4046
func (s *Server) startGoRoutine(f func(), tags ...pprofLabels) bool {
346,734✔
4047
        var started bool
346,734✔
4048
        s.grMu.Lock()
346,734✔
4049
        defer s.grMu.Unlock()
346,734✔
4050
        if s.grRunning {
693,350✔
4051
                s.grWG.Add(1)
346,616✔
4052
                go func() {
693,232✔
4053
                        setGoRoutineLabels(tags...)
346,616✔
4054
                        f()
346,616✔
4055
                }()
346,616✔
4056
                started = true
346,616✔
4057
        }
4058
        return started
346,734✔
4059
}
4060

4061
func (s *Server) numClosedConns() int {
102✔
4062
        s.mu.RLock()
102✔
4063
        defer s.mu.RUnlock()
102✔
4064
        return s.closed.len()
102✔
4065
}
102✔
4066

4067
func (s *Server) totalClosedConns() uint64 {
43✔
4068
        s.mu.RLock()
43✔
4069
        defer s.mu.RUnlock()
43✔
4070
        return s.closed.totalConns()
43✔
4071
}
43✔
4072

4073
func (s *Server) closedClients() []*closedClient {
8✔
4074
        s.mu.RLock()
8✔
4075
        defer s.mu.RUnlock()
8✔
4076
        return s.closed.closedClients()
8✔
4077
}
8✔
4078

4079
// getClientConnectURLs returns suitable URLs for clients to connect to the listen
4080
// port based on the server options' Host and Port. If the Host corresponds to
4081
// "any" interfaces, this call returns the list of resolved IP addresses.
4082
// If ClientAdvertise is set, returns the client advertise host and port.
4083
// The server lock is assumed held on entry.
4084
func (s *Server) getClientConnectURLs() []string {
6,571✔
4085
        // Snapshot server options.
6,571✔
4086
        opts := s.getOpts()
6,571✔
4087
        // Ignore error here since we know that if there is client advertise, the
6,571✔
4088
        // parseHostPort is correct because we did it right before calling this
6,571✔
4089
        // function in Server.New().
6,571✔
4090
        urls, _ := s.getConnectURLs(opts.ClientAdvertise, opts.Host, opts.Port)
6,571✔
4091
        return urls
6,571✔
4092
}
6,571✔
4093

4094
// Generic version that will return an array of URLs based on the given
4095
// advertise, host and port values.
4096
func (s *Server) getConnectURLs(advertise, host string, port int) ([]string, error) {
6,688✔
4097
        urls := make([]string, 0, 1)
6,688✔
4098

6,688✔
4099
        // short circuit if advertise is set
6,688✔
4100
        if advertise != "" {
6,690✔
4101
                h, p, err := parseHostPort(advertise, port)
2✔
4102
                if err != nil {
2✔
4103
                        return nil, err
×
4104
                }
×
4105
                urls = append(urls, net.JoinHostPort(h, strconv.Itoa(p)))
2✔
4106
        } else {
6,686✔
4107
                sPort := strconv.Itoa(port)
6,686✔
4108
                _, ips, err := s.getNonLocalIPsIfHostIsIPAny(host, true)
6,686✔
4109
                for _, ip := range ips {
7,814✔
4110
                        urls = append(urls, net.JoinHostPort(ip, sPort))
1,128✔
4111
                }
1,128✔
4112
                if err != nil || len(urls) == 0 {
12,808✔
4113
                        // We are here if s.opts.Host is not "0.0.0.0" nor "::", or if for some
6,122✔
4114
                        // reason we could not add any URL in the loop above.
6,122✔
4115
                        // We had a case where a Windows VM was hosed and would have err == nil
6,122✔
4116
                        // and not add any address in the array in the loop above, and we
6,122✔
4117
                        // ended-up returning 0.0.0.0, which is problematic for Windows clients.
6,122✔
4118
                        // Check for 0.0.0.0 or :: specifically, and ignore if that's the case.
6,122✔
4119
                        if host == "0.0.0.0" || host == "::" {
6,122✔
4120
                                s.Errorf("Address %q can not be resolved properly", host)
×
4121
                        } else {
6,122✔
4122
                                urls = append(urls, net.JoinHostPort(host, sPort))
6,122✔
4123
                        }
6,122✔
4124
                }
4125
        }
4126
        return urls, nil
6,688✔
4127
}
4128

4129
// Returns an array of non local IPs if the provided host is
4130
// 0.0.0.0 or ::. It returns the first resolved if `all` is
4131
// false.
4132
// The boolean indicate if the provided host was 0.0.0.0 (or ::)
4133
// so that if the returned array is empty caller can decide
4134
// what to do next.
4135
func (s *Server) getNonLocalIPsIfHostIsIPAny(host string, all bool) (bool, []string, error) {
11,672✔
4136
        ip := net.ParseIP(host)
11,672✔
4137
        // If this is not an IP, we are done
11,672✔
4138
        if ip == nil {
11,695✔
4139
                return false, nil, nil
23✔
4140
        }
23✔
4141
        // If this is not 0.0.0.0 or :: we have nothing to do.
4142
        if !ip.IsUnspecified() {
22,396✔
4143
                return false, nil, nil
10,747✔
4144
        }
10,747✔
4145
        s.Debugf("Get non local IPs for %q", host)
902✔
4146
        var ips []string
902✔
4147
        ifaces, _ := net.Interfaces()
902✔
4148
        for _, i := range ifaces {
4,510✔
4149
                addrs, _ := i.Addrs()
3,608✔
4150
                for _, addr := range addrs {
7,780✔
4151
                        switch v := addr.(type) {
4,172✔
4152
                        case *net.IPNet:
4,172✔
4153
                                ip = v.IP
4,172✔
4154
                        case *net.IPAddr:
×
4155
                                ip = v.IP
×
4156
                        }
4157
                        ipStr := ip.String()
4,172✔
4158
                        // Skip non global unicast addresses
4,172✔
4159
                        if !ip.IsGlobalUnicast() || ip.IsUnspecified() {
6,540✔
4160
                                ip = nil
2,368✔
4161
                                continue
2,368✔
4162
                        }
4163
                        s.Debugf("  ip=%s", ipStr)
1,804✔
4164
                        ips = append(ips, ipStr)
1,804✔
4165
                        if !all {
2,480✔
4166
                                break
676✔
4167
                        }
4168
                }
4169
        }
4170
        return true, ips, nil
902✔
4171
}
4172

4173
// if the ip is not specified, attempt to resolve it
4174
func resolveHostPorts(addr net.Listener) []string {
15✔
4175
        hostPorts := make([]string, 0)
15✔
4176
        hp := addr.Addr().(*net.TCPAddr)
15✔
4177
        port := strconv.Itoa(hp.Port)
15✔
4178
        if hp.IP.IsUnspecified() {
22✔
4179
                var ip net.IP
7✔
4180
                ifaces, _ := net.Interfaces()
7✔
4181
                for _, i := range ifaces {
35✔
4182
                        addrs, _ := i.Addrs()
28✔
4183
                        for _, addr := range addrs {
63✔
4184
                                switch v := addr.(type) {
35✔
4185
                                case *net.IPNet:
35✔
4186
                                        ip = v.IP
35✔
4187
                                        hostPorts = append(hostPorts, net.JoinHostPort(ip.String(), port))
35✔
4188
                                case *net.IPAddr:
×
4189
                                        ip = v.IP
×
4190
                                        hostPorts = append(hostPorts, net.JoinHostPort(ip.String(), port))
×
4191
                                default:
×
4192
                                        continue
×
4193
                                }
4194
                        }
4195
                }
4196
        } else {
8✔
4197
                hostPorts = append(hostPorts, net.JoinHostPort(hp.IP.String(), port))
8✔
4198
        }
8✔
4199
        return hostPorts
15✔
4200
}
4201

4202
// format the address of a net.Listener with a protocol
4203
func formatURL(protocol string, addr net.Listener) []string {
15✔
4204
        hostports := resolveHostPorts(addr)
15✔
4205
        for i, hp := range hostports {
58✔
4206
                hostports[i] = fmt.Sprintf("%s://%s", protocol, hp)
43✔
4207
        }
43✔
4208
        return hostports
15✔
4209
}
4210

4211
// Ports describes URLs that the server can be contacted in
4212
type Ports struct {
4213
        Nats       []string `json:"nats,omitempty"`
4214
        Monitoring []string `json:"monitoring,omitempty"`
4215
        Cluster    []string `json:"cluster,omitempty"`
4216
        Profile    []string `json:"profile,omitempty"`
4217
        WebSocket  []string `json:"websocket,omitempty"`
4218
}
4219

4220
// PortsInfo attempts to resolve all the ports. If after maxWait the ports are not
4221
// resolved, it returns nil. Otherwise it returns a Ports struct
4222
// describing ports where the server can be contacted
4223
func (s *Server) PortsInfo(maxWait time.Duration) *Ports {
6✔
4224
        if s.readyForListeners(maxWait) {
12✔
4225
                opts := s.getOpts()
6✔
4226

6✔
4227
                s.mu.RLock()
6✔
4228
                tls := s.info.TLSRequired
6✔
4229
                listener := s.listener
6✔
4230
                httpListener := s.http
6✔
4231
                clusterListener := s.routeListener
6✔
4232
                profileListener := s.profiler
6✔
4233
                wsListener := s.websocket.listener
6✔
4234
                wss := s.websocket.tls
6✔
4235
                s.mu.RUnlock()
6✔
4236

6✔
4237
                ports := Ports{}
6✔
4238

6✔
4239
                if listener != nil {
12✔
4240
                        natsProto := "nats"
6✔
4241
                        if tls {
7✔
4242
                                natsProto = "tls"
1✔
4243
                        }
1✔
4244
                        ports.Nats = formatURL(natsProto, listener)
6✔
4245
                }
4246

4247
                if httpListener != nil {
8✔
4248
                        monProto := "http"
2✔
4249
                        if opts.HTTPSPort != 0 {
2✔
4250
                                monProto = "https"
×
4251
                        }
×
4252
                        ports.Monitoring = formatURL(monProto, httpListener)
2✔
4253
                }
4254

4255
                if clusterListener != nil {
8✔
4256
                        clusterProto := "nats"
2✔
4257
                        if opts.Cluster.TLSConfig != nil {
2✔
4258
                                clusterProto = "tls"
×
4259
                        }
×
4260
                        ports.Cluster = formatURL(clusterProto, clusterListener)
2✔
4261
                }
4262

4263
                if profileListener != nil {
8✔
4264
                        ports.Profile = formatURL("http", profileListener)
2✔
4265
                }
2✔
4266

4267
                if wsListener != nil {
9✔
4268
                        protocol := wsSchemePrefix
3✔
4269
                        if wss {
6✔
4270
                                protocol = wsSchemePrefixTLS
3✔
4271
                        }
3✔
4272
                        ports.WebSocket = formatURL(protocol, wsListener)
3✔
4273
                }
4274

4275
                return &ports
6✔
4276
        }
4277

4278
        return nil
×
4279
}
4280

4281
// Returns the portsFile. If a non-empty dirHint is provided, the dirHint
4282
// path is used instead of the server option value
4283
func (s *Server) portFile(dirHint string) string {
6✔
4284
        dirname := s.getOpts().PortsFileDir
6✔
4285
        if dirHint != "" {
12✔
4286
                dirname = dirHint
6✔
4287
        }
6✔
4288
        if dirname == _EMPTY_ {
6✔
4289
                return _EMPTY_
×
4290
        }
×
4291
        return filepath.Join(dirname, fmt.Sprintf("%s_%d.ports", filepath.Base(os.Args[0]), os.Getpid()))
6✔
4292
}
4293

4294
// Delete the ports file. If a non-empty dirHint is provided, the dirHint
4295
// path is used instead of the server option value
4296
func (s *Server) deletePortsFile(hintDir string) {
3✔
4297
        portsFile := s.portFile(hintDir)
3✔
4298
        if portsFile != "" {
6✔
4299
                if err := os.Remove(portsFile); err != nil {
3✔
4300
                        s.Errorf("Error cleaning up ports file %s: %v", portsFile, err)
×
4301
                }
×
4302
        }
4303
}
4304

4305
// Writes a file with a serialized Ports to the specified ports_file_dir.
4306
// The name of the file is `exename_pid.ports`, typically nats-server_pid.ports.
4307
// if ports file is not set, this function has no effect
4308
func (s *Server) logPorts() {
3✔
4309
        opts := s.getOpts()
3✔
4310
        portsFile := s.portFile(opts.PortsFileDir)
3✔
4311
        if portsFile != _EMPTY_ {
6✔
4312
                go func() {
6✔
4313
                        info := s.PortsInfo(5 * time.Second)
3✔
4314
                        if info == nil {
3✔
4315
                                s.Errorf("Unable to resolve the ports in the specified time")
×
4316
                                return
×
4317
                        }
×
4318
                        data, err := json.Marshal(info)
3✔
4319
                        if err != nil {
3✔
4320
                                s.Errorf("Error marshaling ports file: %v", err)
×
4321
                                return
×
4322
                        }
×
4323
                        if err := os.WriteFile(portsFile, data, 0666); err != nil {
3✔
4324
                                s.Errorf("Error writing ports file (%s): %v", portsFile, err)
×
4325
                                return
×
4326
                        }
×
4327

4328
                }()
4329
        }
4330
}
4331

4332
// waits until a calculated list of listeners is resolved or a timeout
4333
func (s *Server) readyForListeners(dur time.Duration) bool {
6✔
4334
        end := time.Now().Add(dur)
6✔
4335
        for time.Now().Before(end) {
13✔
4336
                s.mu.RLock()
7✔
4337
                listeners := s.serviceListeners()
7✔
4338
                s.mu.RUnlock()
7✔
4339
                if len(listeners) == 0 {
7✔
4340
                        return false
×
4341
                }
×
4342

4343
                ok := true
7✔
4344
                for _, l := range listeners {
24✔
4345
                        if l == nil {
18✔
4346
                                ok = false
1✔
4347
                                break
1✔
4348
                        }
4349
                }
4350
                if ok {
13✔
4351
                        return true
6✔
4352
                }
6✔
4353
                select {
1✔
4354
                case <-s.quitCh:
×
4355
                        return false
×
4356
                case <-time.After(25 * time.Millisecond):
1✔
4357
                        // continue - unable to select from quit - we are still running
4358
                }
4359
        }
4360
        return false
×
4361
}
4362

4363
// returns a list of listeners that are intended for the process
4364
// if the entry is nil, the interface is yet to be resolved
4365
func (s *Server) serviceListeners() []net.Listener {
7✔
4366
        listeners := make([]net.Listener, 0)
7✔
4367
        opts := s.getOpts()
7✔
4368
        listeners = append(listeners, s.listener)
7✔
4369
        if opts.Cluster.Port != 0 {
10✔
4370
                listeners = append(listeners, s.routeListener)
3✔
4371
        }
3✔
4372
        if opts.HTTPPort != 0 || opts.HTTPSPort != 0 {
10✔
4373
                listeners = append(listeners, s.http)
3✔
4374
        }
3✔
4375
        if opts.ProfPort != 0 {
10✔
4376
                listeners = append(listeners, s.profiler)
3✔
4377
        }
3✔
4378
        if opts.Websocket.Port != 0 {
11✔
4379
                listeners = append(listeners, s.websocket.listener)
4✔
4380
        }
4✔
4381
        return listeners
7✔
4382
}
4383

4384
// Returns true if in lame duck mode.
4385
func (s *Server) isLameDuckMode() bool {
21,445✔
4386
        s.mu.RLock()
21,445✔
4387
        defer s.mu.RUnlock()
21,445✔
4388
        return s.ldm
21,445✔
4389
}
21,445✔
4390

4391
// LameDuckShutdown will perform a lame duck shutdown of NATS, whereby
4392
// the client listener is closed, existing client connections are
4393
// kicked, Raft leaderships are transferred, JetStream is shutdown
4394
// and then finally shutdown the the NATS Server itself.
4395
// This function blocks and will not return until the NATS Server
4396
// has completed the entire shutdown operation.
4397
func (s *Server) LameDuckShutdown() {
1✔
4398
        s.lameDuckMode()
1✔
4399
}
1✔
4400

4401
// This function will close the client listener then close the clients
4402
// at some interval to avoid a reconnect storm.
4403
// We will also transfer any raft leaders and shutdown JetStream.
4404
func (s *Server) lameDuckMode() {
6✔
4405
        s.mu.Lock()
6✔
4406
        // Check if there is actually anything to do
6✔
4407
        if s.isShuttingDown() || s.ldm || s.listener == nil {
6✔
4408
                s.mu.Unlock()
×
4409
                return
×
4410
        }
×
4411
        s.Noticef("Entering lame duck mode, stop accepting new clients")
6✔
4412
        s.ldm = true
6✔
4413
        s.sendLDMShutdownEventLocked()
6✔
4414
        expected := 1
6✔
4415
        s.listener.Close()
6✔
4416
        s.listener = nil
6✔
4417
        expected += s.closeWebsocketServer()
6✔
4418
        s.ldmCh = make(chan bool, expected)
6✔
4419
        opts := s.getOpts()
6✔
4420
        gp := opts.LameDuckGracePeriod
6✔
4421
        // For tests, we want the grace period to be in some cases bigger
6✔
4422
        // than the ldm duration, so to by-pass the validateOptions() check,
6✔
4423
        // we use negative number and flip it here.
6✔
4424
        if gp < 0 {
11✔
4425
                gp *= -1
5✔
4426
        }
5✔
4427
        s.mu.Unlock()
6✔
4428

6✔
4429
        // If we are running any raftNodes transfer leaders.
6✔
4430
        if hadTransfers := s.transferRaftLeaders(); hadTransfers {
10✔
4431
                // They will transfer leadership quickly, but wait here for a second.
4✔
4432
                select {
4✔
4433
                case <-time.After(time.Second):
4✔
4434
                case <-s.quitCh:
×
4435
                        return
×
4436
                }
4437
        }
4438

4439
        // Now check and shutdown jetstream.
4440
        s.shutdownJetStream()
6✔
4441

6✔
4442
        // Now shutdown the nodes
6✔
4443
        s.shutdownRaftNodes()
6✔
4444

6✔
4445
        // Wait for accept loops to be done to make sure that no new
6✔
4446
        // client can connect
6✔
4447
        for i := 0; i < expected; i++ {
12✔
4448
                <-s.ldmCh
6✔
4449
        }
6✔
4450

4451
        s.mu.Lock()
6✔
4452
        // Need to recheck few things
6✔
4453
        if s.isShuttingDown() || len(s.clients) == 0 {
6✔
4454
                s.mu.Unlock()
×
4455
                // If there is no client, we need to call Shutdown() to complete
×
4456
                // the LDMode. If server has been shutdown while lock was released,
×
4457
                // calling Shutdown() should be no-op.
×
4458
                s.Shutdown()
×
4459
                return
×
4460
        }
×
4461
        dur := int64(opts.LameDuckDuration)
6✔
4462
        dur -= int64(gp)
6✔
4463
        if dur <= 0 {
11✔
4464
                dur = int64(time.Second)
5✔
4465
        }
5✔
4466
        numClients := int64(len(s.clients))
6✔
4467
        batch := 1
6✔
4468
        // Sleep interval between each client connection close.
6✔
4469
        var si int64
6✔
4470
        if numClients != 0 {
12✔
4471
                si = dur / numClients
6✔
4472
        }
6✔
4473
        if si < 1 {
6✔
4474
                // Should not happen (except in test with very small LD duration), but
×
4475
                // if there are too many clients, batch the number of close and
×
4476
                // use a tiny sleep interval that will result in yield likely.
×
4477
                si = 1
×
4478
                batch = int(numClients / dur)
×
4479
        } else if si > int64(time.Second) {
7✔
4480
                // Conversely, there is no need to sleep too long between clients
1✔
4481
                // and spread say 10 clients for the 2min duration. Sleeping no
1✔
4482
                // more than 1sec.
1✔
4483
                si = int64(time.Second)
1✔
4484
        }
1✔
4485

4486
        // Now capture all clients
4487
        clients := make([]*client, 0, len(s.clients))
6✔
4488
        for _, client := range s.clients {
22✔
4489
                clients = append(clients, client)
16✔
4490
        }
16✔
4491
        // Now that we know that no new client can be accepted,
4492
        // send INFO to routes and clients to notify this state.
4493
        s.sendLDMToRoutes()
6✔
4494
        s.sendLDMToClients()
6✔
4495
        s.mu.Unlock()
6✔
4496

6✔
4497
        t := time.NewTimer(gp)
6✔
4498
        // Delay start of closing of client connections in case
6✔
4499
        // we have several servers that we want to signal to enter LD mode
6✔
4500
        // and not have their client reconnect to each other.
6✔
4501
        select {
6✔
4502
        case <-t.C:
6✔
4503
                s.Noticef("Closing existing clients")
6✔
4504
        case <-s.quitCh:
×
4505
                t.Stop()
×
4506
                return
×
4507
        }
4508
        for i, client := range clients {
22✔
4509
                client.closeConnection(ServerShutdown)
16✔
4510
                if i == len(clients)-1 {
22✔
4511
                        break
6✔
4512
                }
4513
                if batch == 1 || i%batch == 0 {
20✔
4514
                        // We pick a random interval which will be at least si/2
10✔
4515
                        v := rand.Int63n(si)
10✔
4516
                        if v < si/2 {
12✔
4517
                                v = si / 2
2✔
4518
                        }
2✔
4519
                        t.Reset(time.Duration(v))
10✔
4520
                        // Sleep for given interval or bail out if kicked by Shutdown().
10✔
4521
                        select {
10✔
4522
                        case <-t.C:
10✔
4523
                        case <-s.quitCh:
×
4524
                                t.Stop()
×
4525
                                return
×
4526
                        }
4527
                }
4528
        }
4529
        s.Shutdown()
6✔
4530
        s.WaitForShutdown()
6✔
4531
}
4532

4533
// Send an INFO update to routes with the indication that this server is in LDM mode.
4534
// Server lock is held on entry.
4535
func (s *Server) sendLDMToRoutes() {
6✔
4536
        s.routeInfo.LameDuckMode = true
6✔
4537
        infoJSON := generateInfoJSON(&s.routeInfo)
6✔
4538
        s.forEachRemote(func(r *client) {
17✔
4539
                r.mu.Lock()
11✔
4540
                r.enqueueProto(infoJSON)
11✔
4541
                r.mu.Unlock()
11✔
4542
        })
11✔
4543
        // Clear now so that we notify only once, should we have to send other INFOs.
4544
        s.routeInfo.LameDuckMode = false
6✔
4545
}
4546

4547
// Send an INFO update to clients with the indication that this server is in
4548
// LDM mode and with only URLs of other nodes.
4549
// Server lock is held on entry.
4550
func (s *Server) sendLDMToClients() {
6✔
4551
        s.info.LameDuckMode = true
6✔
4552
        // Clear this so that if there are further updates, we don't send our URLs.
6✔
4553
        s.clientConnectURLs = s.clientConnectURLs[:0]
6✔
4554
        if s.websocket.connectURLs != nil {
6✔
4555
                s.websocket.connectURLs = s.websocket.connectURLs[:0]
×
4556
        }
×
4557
        // Reset content first.
4558
        s.info.ClientConnectURLs = s.info.ClientConnectURLs[:0]
6✔
4559
        s.info.WSConnectURLs = s.info.WSConnectURLs[:0]
6✔
4560
        // Only add the other nodes if we are allowed to.
6✔
4561
        if !s.getOpts().Cluster.NoAdvertise {
12✔
4562
                for url := range s.clientConnectURLsMap {
18✔
4563
                        s.info.ClientConnectURLs = append(s.info.ClientConnectURLs, url)
12✔
4564
                }
12✔
4565
                for url := range s.websocket.connectURLsMap {
6✔
4566
                        s.info.WSConnectURLs = append(s.info.WSConnectURLs, url)
×
4567
                }
×
4568
        }
4569
        // Send to all registered clients that support async INFO protocols.
4570
        s.sendAsyncInfoToClients(true, true)
6✔
4571
        // We now clear the info.LameDuckMode flag so that if there are
6✔
4572
        // cluster updates and we send the INFO, we don't have the boolean
6✔
4573
        // set which would cause multiple LDM notifications to clients.
6✔
4574
        s.info.LameDuckMode = false
6✔
4575
}
4576

4577
// If given error is a net.Error and is temporary, sleeps for the given
4578
// delay and double it, but cap it to ACCEPT_MAX_SLEEP. The sleep is
4579
// interrupted if the server is shutdown.
4580
// An error message is displayed depending on the type of error.
4581
// Returns the new (or unchanged) delay, or a negative value if the
4582
// server has been or is being shutdown.
4583
func (s *Server) acceptError(acceptName string, err error, tmpDelay time.Duration) time.Duration {
16,250✔
4584
        if !s.isRunning() {
32,500✔
4585
                return -1
16,250✔
4586
        }
16,250✔
4587
        //lint:ignore SA1019 We want to retry on a bunch of errors here.
4588
        if ne, ok := err.(net.Error); ok && ne.Temporary() { // nolint:staticcheck
×
4589
                s.Errorf("Temporary %s Accept Error(%v), sleeping %dms", acceptName, ne, tmpDelay/time.Millisecond)
×
4590
                select {
×
4591
                case <-time.After(tmpDelay):
×
4592
                case <-s.quitCh:
×
4593
                        return -1
×
4594
                }
4595
                tmpDelay *= 2
×
4596
                if tmpDelay > ACCEPT_MAX_SLEEP {
×
4597
                        tmpDelay = ACCEPT_MAX_SLEEP
×
4598
                }
×
4599
        } else {
×
4600
                s.Errorf("%s Accept error: %v", acceptName, err)
×
4601
        }
×
4602
        return tmpDelay
×
4603
}
4604

4605
var errNoIPAvail = errors.New("no IP available")
4606

4607
func (s *Server) getRandomIP(resolver netResolver, url string, excludedAddresses map[string]struct{}) (string, error) {
133,962✔
4608
        host, port, err := net.SplitHostPort(url)
133,962✔
4609
        if err != nil {
133,963✔
4610
                return "", err
1✔
4611
        }
1✔
4612
        // If already an IP, skip.
4613
        if net.ParseIP(host) != nil {
267,790✔
4614
                return url, nil
133,829✔
4615
        }
133,829✔
4616
        ips, err := resolver.LookupHost(context.Background(), host)
132✔
4617
        if err != nil {
137✔
4618
                return "", fmt.Errorf("lookup for host %q: %v", host, err)
5✔
4619
        }
5✔
4620
        if len(excludedAddresses) > 0 {
144✔
4621
                for i := 0; i < len(ips); i++ {
51✔
4622
                        ip := ips[i]
34✔
4623
                        addr := net.JoinHostPort(ip, port)
34✔
4624
                        if _, excluded := excludedAddresses[addr]; excluded {
40✔
4625
                                if len(ips) == 1 {
9✔
4626
                                        ips = nil
3✔
4627
                                        break
3✔
4628
                                }
4629
                                ips[i] = ips[len(ips)-1]
3✔
4630
                                ips = ips[:len(ips)-1]
3✔
4631
                                i--
3✔
4632
                        }
4633
                }
4634
                if len(ips) == 0 {
20✔
4635
                        return "", errNoIPAvail
3✔
4636
                }
3✔
4637
        }
4638
        var address string
124✔
4639
        if len(ips) == 0 {
125✔
4640
                s.Warnf("Unable to get IP for %s, will try with %s: %v", host, url, err)
1✔
4641
                address = url
1✔
4642
        } else {
124✔
4643
                var ip string
123✔
4644
                if len(ips) == 1 {
129✔
4645
                        ip = ips[0]
6✔
4646
                } else {
123✔
4647
                        ip = ips[rand.Int31n(int32(len(ips)))]
117✔
4648
                }
117✔
4649
                // add the port
4650
                address = net.JoinHostPort(ip, port)
123✔
4651
        }
4652
        return address, nil
124✔
4653
}
4654

4655
// Returns true for the first attempt and depending on the nature
4656
// of the attempt (first connect or a reconnect), when the number
4657
// of attempts is equal to the configured report attempts.
4658
func (s *Server) shouldReportConnectErr(firstConnect bool, attempts int) bool {
76,761✔
4659
        opts := s.getOpts()
76,761✔
4660
        if firstConnect {
118,434✔
4661
                if attempts == 1 || attempts%opts.ConnectErrorReports == 0 {
51,276✔
4662
                        return true
9,603✔
4663
                }
9,603✔
4664
                return false
32,070✔
4665
        }
4666
        if attempts == 1 || attempts%opts.ReconnectErrorReports == 0 {
70,176✔
4667
                return true
35,088✔
4668
        }
35,088✔
4669
        return false
×
4670
}
4671

4672
func (s *Server) updateRemoteSubscription(acc *Account, sub *subscription, delta int32) {
3,194✔
4673
        s.updateRouteSubscriptionMap(acc, sub, delta)
3,194✔
4674
        if s.gateway.enabled {
3,845✔
4675
                s.gatewayUpdateSubInterest(acc.Name, sub, delta)
651✔
4676
        }
651✔
4677

4678
        acc.updateLeafNodes(sub, delta)
3,194✔
4679
}
4680

4681
func (s *Server) startRateLimitLogExpiration() {
6,575✔
4682
        interval := time.Second
6,575✔
4683
        s.startGoRoutine(func() {
13,150✔
4684
                defer s.grWG.Done()
6,575✔
4685

6,575✔
4686
                ticker := time.NewTicker(time.Second)
6,575✔
4687
                defer ticker.Stop()
6,575✔
4688
                for {
30,416✔
4689
                        select {
23,841✔
4690
                        case <-s.quitCh:
6,563✔
4691
                                return
6,563✔
4692
                        case interval = <-s.rateLimitLoggingCh:
×
4693
                                ticker.Reset(interval)
×
4694
                        case <-ticker.C:
17,266✔
4695
                                s.rateLimitLogging.Range(func(k, v any) bool {
20,470✔
4696
                                        start := v.(time.Time)
3,204✔
4697
                                        if time.Since(start) >= interval {
4,480✔
4698
                                                s.rateLimitLogging.Delete(k)
1,276✔
4699
                                        }
1,276✔
4700
                                        return true
3,204✔
4701
                                })
4702
                        }
4703
                }
4704
        })
4705
}
4706

4707
func (s *Server) changeRateLimitLogInterval(d time.Duration) {
×
4708
        if d <= 0 {
×
4709
                return
×
4710
        }
×
4711
        select {
×
4712
        case s.rateLimitLoggingCh <- d:
×
4713
        default:
×
4714
        }
4715
}
4716

4717
// DisconnectClientByID disconnects a client by connection ID
4718
func (s *Server) DisconnectClientByID(id uint64) error {
2✔
4719
        if s == nil {
2✔
4720
                return ErrServerNotRunning
×
4721
        }
×
4722
        if client := s.getClient(id); client != nil {
3✔
4723
                client.closeConnection(Kicked)
1✔
4724
                return nil
1✔
4725
        } else if client = s.GetLeafNode(id); client != nil {
3✔
4726
                client.closeConnection(Kicked)
1✔
4727
                return nil
1✔
4728
        }
1✔
4729
        return errors.New("no such client or leafnode id")
×
4730
}
4731

4732
// LDMClientByID sends a Lame Duck Mode info message to a client by connection ID
4733
func (s *Server) LDMClientByID(id uint64) error {
1✔
4734
        if s == nil {
1✔
4735
                return ErrServerNotRunning
×
4736
        }
×
4737
        s.mu.RLock()
1✔
4738
        c := s.clients[id]
1✔
4739
        if c == nil {
1✔
4740
                s.mu.RUnlock()
×
4741
                return errors.New("no such client id")
×
4742
        }
×
4743
        info := s.copyInfo()
1✔
4744
        info.LameDuckMode = true
1✔
4745
        s.mu.RUnlock()
1✔
4746
        c.mu.Lock()
1✔
4747
        defer c.mu.Unlock()
1✔
4748
        if c.opts.Protocol >= ClientProtoInfo && c.flags.isSet(firstPongSent) {
2✔
4749
                // sendInfo takes care of checking if the connection is still
1✔
4750
                // valid or not, so don't duplicate tests here.
1✔
4751
                c.Debugf("Sending Lame Duck Mode info to client")
1✔
4752
                c.enqueueProto(c.generateClientInfoJSON(info, true))
1✔
4753
                return nil
1✔
4754
        } else {
1✔
4755
                return errors.New("client does not support Lame Duck Mode or is not ready to receive the notification")
×
4756
        }
×
4757
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc