• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 25621091956

08 May 2026 01:49PM UTC coverage: 81.505% (-1.7%) from 83.18%
25621091956

push

github

web-flow
MQTT: Return `errMQTTUnsupportedCharacters` for control characters on both pub and sub (#8112)

Replaces #8104.

75814 of 93018 relevant lines covered (81.5%)

630142.14 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.56
/server/server.go
1
// Copyright 2012-2025 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bytes"
18
        "context"
19
        "crypto/fips140"
20
        "crypto/tls"
21
        "encoding/json"
22
        "errors"
23
        "flag"
24
        "fmt"
25
        "io"
26
        "log"
27
        "math/rand"
28
        "net"
29
        "net/http"
30
        "net/url"
31
        "os"
32
        "path"
33
        "path/filepath"
34
        "regexp"
35
        "runtime"
36
        "runtime/pprof"
37
        "strconv"
38
        "strings"
39
        "sync"
40
        "sync/atomic"
41
        "time"
42

43
        // Allow dynamic profiling.
44
        _ "net/http/pprof"
45

46
        "expvar"
47

48
        "github.com/klauspost/compress/s2"
49
        "github.com/nats-io/jwt/v2"
50
        "github.com/nats-io/nats-server/v2/logger"
51
        "github.com/nats-io/nkeys"
52
        "github.com/nats-io/nuid"
53
)
54

55
const (
56
        // Interval for the first PING for non client connections.
57
        firstPingInterval = time.Second
58

59
        // This is for the first ping for client connections.
60
        firstClientPingInterval = 2 * time.Second
61
)
62

63
// These are protocol versions sent between server connections: ROUTER, LEAF and
64
// GATEWAY. We may have protocol versions that have a meaning only for a certain
65
// type of connections, but we don't have to have separate enums for that.
66
// However, it is CRITICAL to not change the order of those constants since they
67
// are exchanged between servers. When adding a new protocol version, add to the
68
// end of the list, don't try to group them by connection types.
69
const (
70
        // RouteProtoZero is the original Route protocol from 2009.
71
        // http://nats.io/documentation/internals/nats-protocol/
72
        RouteProtoZero = iota
73
        // RouteProtoInfo signals a route can receive more then the original INFO block.
74
        // This can be used to update remote cluster permissions, etc...
75
        RouteProtoInfo
76
        // RouteProtoV2 is the new route/cluster protocol that provides account support.
77
        RouteProtoV2
78
        // MsgTraceProto indicates that this server understands distributed message tracing.
79
        MsgTraceProto
80
)
81

82
// Will return the latest server-to-server protocol versions, unless the
83
// option to override it is set.
84
func (s *Server) getServerProto() int {
10,365✔
85
        opts := s.getOpts()
10,365✔
86
        // Initialize with the latest protocol version.
10,365✔
87
        proto := MsgTraceProto
10,365✔
88
        // For tests, we want to be able to make this server behave
10,365✔
89
        // as an older server so check this option to see if we should override.
10,365✔
90
        if opts.overrideProto < 0 {
10,365✔
91
                // The option overrideProto is set to 0 by default (when creating an
×
92
                // Options structure). Since this is the same value than the original
×
93
                // proto RouteProtoZero, tests call setServerProtoForTest() with the
×
94
                // desired protocol level, which sets it as negative value equal to:
×
95
                // (wantedProto + 1) * -1. Here we compute back the real value.
×
96
                proto = (opts.overrideProto * -1) - 1
×
97
        }
×
98
        return proto
10,365✔
99
}
100

101
// Used by tests.
102
func setServerProtoForTest(wantedProto int) int {
×
103
        return (wantedProto + 1) * -1
×
104
}
×
105

106
// Info is the information sent to clients, routes, gateways, and leaf nodes,
107
// to help them understand information about this server.
108
type Info struct {
109
        ID                string   `json:"server_id"`
110
        Name              string   `json:"server_name"`
111
        Version           string   `json:"version"`
112
        Proto             int      `json:"proto"`
113
        GitCommit         string   `json:"git_commit,omitempty"`
114
        GoVersion         string   `json:"go"`
115
        Host              string   `json:"host"`
116
        Port              int      `json:"port"`
117
        Headers           bool     `json:"headers"`
118
        AuthRequired      bool     `json:"auth_required,omitempty"`
119
        TLSRequired       bool     `json:"tls_required,omitempty"`
120
        TLSVerify         bool     `json:"tls_verify,omitempty"`
121
        TLSAvailable      bool     `json:"tls_available,omitempty"`
122
        MaxPayload        int32    `json:"max_payload"`
123
        JetStream         bool     `json:"jetstream,omitempty"`
124
        IP                string   `json:"ip,omitempty"`
125
        CID               uint64   `json:"client_id,omitempty"`
126
        ClientIP          string   `json:"client_ip,omitempty"`
127
        Nonce             string   `json:"nonce,omitempty"`
128
        Cluster           string   `json:"cluster,omitempty"`
129
        Dynamic           bool     `json:"cluster_dynamic,omitempty"`
130
        Domain            string   `json:"domain,omitempty"`
131
        ClientConnectURLs []string `json:"connect_urls,omitempty"`    // Contains URLs a client can connect to.
132
        WSConnectURLs     []string `json:"ws_connect_urls,omitempty"` // Contains URLs a ws client can connect to.
133
        LameDuckMode      bool     `json:"ldm,omitempty"`
134
        Compression       string   `json:"compression,omitempty"`
135
        ConnectInfo       bool     `json:"connect_info,omitempty"`   // When true this is the server INFO response to CONNECT
136
        RemoteAccount     string   `json:"remote_account,omitempty"` // Lets the client or leafnode side know the remote account that they bind to.
137
        IsSystemAccount   bool     `json:"acc_is_sys,omitempty"`     // Indicates if the account is a system account.
138
        JSApiLevel        int      `json:"api_lvl,omitempty"`
139

140
        // Route Specific
141
        Import        *SubjectPermission `json:"import,omitempty"`
142
        Export        *SubjectPermission `json:"export,omitempty"`
143
        LNOC          bool               `json:"lnoc,omitempty"`
144
        LNOCU         bool               `json:"lnocu,omitempty"`
145
        InfoOnConnect bool               `json:"info_on_connect,omitempty"` // When true the server will respond to CONNECT with an INFO
146
        RoutePoolSize int                `json:"route_pool_size,omitempty"`
147
        RoutePoolIdx  int                `json:"route_pool_idx,omitempty"`
148
        RouteAccount  string             `json:"route_account,omitempty"`
149
        RouteAccReqID string             `json:"route_acc_add_reqid,omitempty"`
150
        GossipMode    byte               `json:"gossip_mode,omitempty"`
151

152
        // Gateways Specific
153
        Gateway           string   `json:"gateway,omitempty"`             // Name of the origin Gateway (sent by gateway's INFO)
154
        GatewayURLs       []string `json:"gateway_urls,omitempty"`        // Gateway URLs in the originating cluster (sent by gateway's INFO)
155
        GatewayURL        string   `json:"gateway_url,omitempty"`         // Gateway URL on that server (sent by route's INFO)
156
        GatewayCmd        byte     `json:"gateway_cmd,omitempty"`         // Command code for the receiving server to know what to do
157
        GatewayCmdPayload []byte   `json:"gateway_cmd_payload,omitempty"` // Command payload when needed
158
        GatewayNRP        bool     `json:"gateway_nrp,omitempty"`         // Uses new $GNR. prefix for mapped replies
159
        GatewayIOM        bool     `json:"gateway_iom,omitempty"`         // Indicate that all accounts will be switched to InterestOnly mode "right away"
160

161
        // LeafNode Specific
162
        LeafNodeURLs []string `json:"leafnode_urls,omitempty"` // LeafNode URLs that the server can reconnect to.
163

164
        XKey string `json:"xkey,omitempty"` // Public server's x25519 key.
165
}
166

167
// Server is our main struct.
168
type Server struct {
169
        // Fields accessed with atomic operations need to be 64-bit aligned
170
        gcid uint64
171
        // How often user logon fails due to the issuer account not being pinned.
172
        pinnedAccFail uint64
173
        stats
174
        scStats
175
        staleStats
176
        mu                  sync.RWMutex
177
        reloadMu            sync.RWMutex // Write-locked when a config reload is taking place ONLY
178
        kp                  nkeys.KeyPair
179
        xkp                 nkeys.KeyPair
180
        xpub                string
181
        info                Info
182
        configFile          string
183
        optsMu              sync.RWMutex
184
        opts                *Options
185
        running             atomic.Bool
186
        shutdown            atomic.Bool
187
        listener            net.Listener
188
        listenerErr         error
189
        gacc                *Account
190
        sys                 *internal
191
        sysAcc              atomic.Pointer[Account]
192
        js                  atomic.Pointer[jetStream]
193
        isMetaLeader        atomic.Bool
194
        jsClustered         atomic.Bool
195
        accounts            sync.Map
196
        tmpAccounts         sync.Map // Temporarily stores accounts that are being built
197
        activeAccounts      int32
198
        accResolver         AccountResolver
199
        clients             map[uint64]*client
200
        routes              map[string][]*client
201
        remoteRoutePoolSize map[string]int                // Map for remote's configure route pool size
202
        routesPoolSize      int                           // Configured pool size
203
        routesReject        bool                          // During reload, we may want to reject adding routes until some conditions are met
204
        routesNoPool        int                           // Number of routes that don't use pooling (connecting to older server for instance)
205
        accRoutes           map[string]map[string]*client // Key is account name, value is key=remoteID/value=route connection
206
        accRouteByHash      sync.Map                      // Key is account name, value is nil or a pool index
207
        accAddedCh          chan struct{}
208
        accAddedReqID       string
209
        leafs               map[uint64]*client
210
        users               map[string]*User
211
        nkeys               map[string]*NkeyUser
212
        totalClients        uint64
213
        closed              *closedRingBuffer
214
        done                chan bool
215
        start               time.Time
216
        http                net.Listener
217
        httpHandler         http.Handler
218
        httpBasePath        string
219
        profiler            net.Listener
220
        httpReqStats        map[string]uint64
221
        routeListener       net.Listener
222
        routeListenerErr    error
223
        routeInfo           Info
224
        routeResolver       netResolver
225
        routesToSelf        map[string]struct{}
226
        routeTLSName        string
227
        leafNodeListener    net.Listener
228
        leafNodeListenerErr error
229
        leafNodeInfo        Info
230
        leafNodeInfoJSON    []byte
231
        leafURLsMap         refCountedUrlSet
232
        leafNodeOpts        struct {
233
                resolver    netResolver
234
                dialTimeout time.Duration
235
        }
236
        leafRemoteCfgs     map[*leafNodeCfg]struct{}
237
        rmLeafRemoteCfgs   map[string]*leafNodeCfg
238
        leafRemoteAccounts sync.Map
239
        leafNodeEnabled    bool
240
        leafDisableConnect bool // Used in test only
241
        leafNoCluster      bool // Indicate that this server has only remotes and no cluster defined
242

243
        quitCh           chan struct{}
244
        startupComplete  chan struct{}
245
        shutdownComplete chan struct{}
246

247
        // Tracking Go routines
248
        grMu         sync.Mutex
249
        grTmpClients map[uint64]*client
250
        grRunning    bool
251
        grWG         sync.WaitGroup // to wait on various go routines
252

253
        cproto     int64     // number of clients supporting async INFO
254
        configTime time.Time // last time config was loaded
255

256
        logging struct {
257
                sync.RWMutex
258
                logger      Logger
259
                trace       int32
260
                debug       int32
261
                traceSysAcc int32
262
        }
263

264
        clientConnectURLs []string
265

266
        // Used internally for quick look-ups.
267
        clientConnectURLsMap refCountedUrlSet
268

269
        // For Gateways
270
        gatewayListener    net.Listener // Accept listener
271
        gatewayListenerErr error
272
        gateway            *srvGateway
273

274
        // Used by tests to check that http.Servers do
275
        // not set any timeout.
276
        monitoringServer *http.Server
277
        profilingServer  *http.Server
278

279
        // LameDuck mode
280
        ldm   bool
281
        ldmCh chan bool
282

283
        // Trusted public operator keys.
284
        trustedKeys []string
285
        // map of trusted keys to operator setting StrictSigningKeyUsage
286
        strictSigningKeyUsage map[string]struct{}
287

288
        // We use this to minimize mem copies for requests to monitoring
289
        // endpoint /varz (when it comes from http).
290
        varzMu sync.Mutex
291
        varz   *Varz
292
        // This is set during a config reload if we detect that we have
293
        // added/removed routes. The monitoring code then check that
294
        // to know if it should update the cluster's URLs array.
295
        varzUpdateRouteURLs bool
296

297
        // Keeps a sublist of of subscriptions attached to leafnode connections
298
        // for the $GNR.*.*.*.> subject so that a server can send back a mapped
299
        // gateway reply.
300
        gwLeafSubs *Sublist
301

302
        // Used for expiration of mapped GW replies
303
        gwrm struct {
304
                w  int32
305
                ch chan time.Duration
306
                m  sync.Map
307
        }
308

309
        // For eventIDs
310
        eventIds *nuid.NUID
311

312
        // Websocket structure
313
        websocket srvWebsocket
314

315
        // MQTT structure
316
        mqtt srvMQTT
317

318
        // OCSP monitoring
319
        ocsps []*OCSPMonitor
320

321
        // OCSP peer verification (at least one TLS block)
322
        ocspPeerVerify bool
323

324
        // OCSP response cache
325
        ocsprc OCSPResponseCache
326

327
        // exporting account name the importer experienced issues with
328
        incompleteAccExporterMap sync.Map
329

330
        // Holds cluster name under different lock for mapping
331
        cnMu sync.RWMutex
332
        cn   string
333

334
        // For registering raft nodes with the server.
335
        rnMu      sync.RWMutex
336
        raftNodes map[string]RaftNode
337

338
        // For mapping from a raft node name back to a server name and cluster. Node has to be in the same domain.
339
        nodeToInfo sync.Map
340

341
        // For out of resources to not log errors too fast.
342
        rerrMu   sync.Mutex
343
        rerrLast time.Time
344

345
        connRateCounter *rateCounter
346

347
        // If there is a system account configured, to still support the $G account,
348
        // the server will create a fake user and add it to the list of users.
349
        // Keep track of what that user name is for config reload purposes.
350
        sysAccOnlyNoAuthUser string
351

352
        // IPQueues map
353
        ipQueues sync.Map
354

355
        // To limit logging frequency
356
        rateLimitLogging   sync.Map
357
        rateLimitLoggingCh chan time.Duration
358

359
        // Total outstanding catchup bytes in flight.
360
        gcbMu     sync.RWMutex
361
        gcbOut    int64
362
        gcbOutMax int64 // Taken from JetStreamMaxCatchup or defaultMaxTotalCatchupOutBytes
363
        // A global chanel to kick out stalled catchup sequences.
364
        gcbKick chan struct{}
365

366
        // Total outbound syncRequests
367
        syncOutSem chan struct{}
368

369
        // Queue to process JS API requests that come from routes (or gateways)
370
        jsAPIRoutedReqs     *ipQueue[*jsAPIRoutedReq]
371
        jsAPIRoutedInfoReqs *ipQueue[*jsAPIRoutedReq]
372

373
        // Delayed API responses.
374
        delayedAPIResponses *ipQueue[*delayedAPIResponse]
375

376
        // Whether moving NRG traffic into accounts is permitted on this server.
377
        // Controls whether or not the account NRG capability is set in statsz.
378
        // Currently used by unit tests to simulate nodes not supporting account NRG.
379
        accountNRGAllowed atomic.Bool
380

381
        // List of proxies trusted keys in `KeyPair` form so we can do signature
382
        // verification when processing incoming proxy connections.
383
        proxiesKeyPairs []nkeys.KeyPair
384
        proxiedConns    map[string]map[uint64]*client
385
}
386

387
// For tracking JS nodes.
388
type nodeInfo struct {
389
        name            string
390
        version         string
391
        cluster         string
392
        domain          string
393
        id              string
394
        tags            jwt.TagList
395
        cfg             *JetStreamConfig
396
        stats           *JetStreamStats
397
        offline         bool
398
        js              bool
399
        binarySnapshots bool
400
        accountNRG      bool
401
}
402

403
type stats struct {
404
        inMsgs           int64
405
        outMsgs          int64
406
        inBytes          int64
407
        outBytes         int64
408
        slowConsumers    int64
409
        staleConnections int64
410
        stalls           int64
411
}
412

413
// scStats includes the total and per connection counters of Slow Consumers.
414
type scStats struct {
415
        clients  atomic.Uint64
416
        routes   atomic.Uint64
417
        leafs    atomic.Uint64
418
        gateways atomic.Uint64
419
}
420

421
// staleStats includes the total and per connection counters of Stale Connections.
422
type staleStats struct {
423
        clients  atomic.Uint64
424
        routes   atomic.Uint64
425
        leafs    atomic.Uint64
426
        gateways atomic.Uint64
427
}
428

429
// This is used by tests so we can run all server tests with a default route
430
// or leafnode compression mode. For instance:
431
// go test -race -v ./server -cluster_compression=fast
432
var (
433
        testDefaultClusterCompression  string
434
        testDefaultLeafNodeCompression string
435
)
436

437
// Compression modes.
438
const (
439
        CompressionNotSupported   = "not supported"
440
        CompressionOff            = "off"
441
        CompressionAccept         = "accept"
442
        CompressionS2Auto         = "s2_auto"
443
        CompressionS2Uncompressed = "s2_uncompressed"
444
        CompressionS2Fast         = "s2_fast"
445
        CompressionS2Better       = "s2_better"
446
        CompressionS2Best         = "s2_best"
447
)
448

449
// defaultCompressionS2AutoRTTThresholds is the default of RTT thresholds for
450
// the CompressionS2Auto mode.
451
var defaultCompressionS2AutoRTTThresholds = []time.Duration{
452
        // [0..10ms] -> CompressionS2Uncompressed
453
        10 * time.Millisecond,
454
        // ]10ms..50ms] -> CompressionS2Fast
455
        50 * time.Millisecond,
456
        // ]50ms..100ms] -> CompressionS2Better
457
        100 * time.Millisecond,
458
        // ]100ms..] -> CompressionS2Best
459
}
460

461
// For a given user provided string, matches to one of the compression mode
462
// constant and updates the provided string to that constant. Returns an
463
// error if the provided compression mode is not known.
464
// The parameter `chosenModeForOn` indicates which compression mode to use
465
// when the user selects "on" (or enabled, true, etc..). This is because
466
// we may have different defaults depending on where the compression is used.
467
func validateAndNormalizeCompressionOption(c *CompressionOpts, chosenModeForOn string) error {
11,029✔
468
        if c == nil {
11,029✔
469
                return nil
×
470
        }
×
471
        cmtl := strings.ToLower(c.Mode)
11,029✔
472
        // First, check for the "on" case so that we set to the default compression
11,029✔
473
        // mode for that. The other switch/case will finish setup if needed (for
11,029✔
474
        // instance if the default mode is s2Auto).
11,029✔
475
        switch cmtl {
11,029✔
476
        case "on", "enabled", "true":
14✔
477
                cmtl = chosenModeForOn
14✔
478
        default:
11,015✔
479
        }
480
        // Check (again) with the proper mode.
481
        switch cmtl {
11,029✔
482
        case "not supported", "not_supported":
2✔
483
                c.Mode = CompressionNotSupported
2✔
484
        case "disabled", "off", "false":
1,800✔
485
                c.Mode = CompressionOff
1,800✔
486
        case "accept":
4,174✔
487
                c.Mode = CompressionAccept
4,174✔
488
        case "auto", "s2_auto":
4,972✔
489
                var rtts []time.Duration
4,972✔
490
                if len(c.RTTThresholds) == 0 {
9,090✔
491
                        rtts = defaultCompressionS2AutoRTTThresholds
4,118✔
492
                } else {
4,972✔
493
                        for _, n := range c.RTTThresholds {
3,426✔
494
                                // Do not error on negative, but simply set to 0
2,572✔
495
                                if n < 0 {
2,576✔
496
                                        n = 0
4✔
497
                                }
4✔
498
                                // Make sure they are properly ordered. However, it is possible
499
                                // to have a "0" anywhere in the list to indicate that this
500
                                // compression level should not be used.
501
                                if l := len(rtts); l > 0 && n != 0 {
4,260✔
502
                                        for _, v := range rtts {
4,242✔
503
                                                if n < v {
2,556✔
504
                                                        return fmt.Errorf("RTT threshold values %v should be in ascending order", c.RTTThresholds)
2✔
505
                                                }
2✔
506
                                        }
507
                                }
508
                                rtts = append(rtts, n)
2,570✔
509
                        }
510
                        if len(rtts) > 0 {
1,704✔
511
                                // Trim 0 that are at the end.
852✔
512
                                stop := -1
852✔
513
                                for i := len(rtts) - 1; i >= 0; i-- {
1,718✔
514
                                        if rtts[i] != 0 {
1,714✔
515
                                                stop = i
848✔
516
                                                break
848✔
517
                                        }
518
                                }
519
                                rtts = rtts[:stop+1]
852✔
520
                        }
521
                        if len(rtts) > 4 {
854✔
522
                                // There should be at most values for "uncompressed", "fast",
2✔
523
                                // "better" and "best" (when some 0 are present).
2✔
524
                                return fmt.Errorf("compression mode %q should have no more than 4 RTT thresholds: %v", c.Mode, c.RTTThresholds)
2✔
525
                        } else if len(rtts) == 0 {
856✔
526
                                // But there should be at least 1 if the user provided the slice.
4✔
527
                                // We would be here only if it was provided by say with values
4✔
528
                                // being a single or all zeros.
4✔
529
                                return fmt.Errorf("compression mode %q requires at least one RTT threshold", c.Mode)
4✔
530
                        }
4✔
531
                }
532
                c.Mode = CompressionS2Auto
4,964✔
533
                c.RTTThresholds = rtts
4,964✔
534
        case "fast", "s2_fast":
25✔
535
                c.Mode = CompressionS2Fast
25✔
536
        case "better", "s2_better":
27✔
537
                c.Mode = CompressionS2Better
27✔
538
        case "best", "s2_best":
27✔
539
                c.Mode = CompressionS2Best
27✔
540
        default:
2✔
541
                return fmt.Errorf("unsupported compression mode %q", c.Mode)
2✔
542
        }
543
        return nil
11,019✔
544
}
545

546
// Returns `true` if the compression mode `m` indicates that the server
547
// will negotiate compression with the remote server, `false` otherwise.
548
// Note that the provided compression mode is assumed to have been
549
// normalized and validated.
550
func needsCompression(m string) bool {
175,813✔
551
        return m != _EMPTY_ && m != CompressionOff && m != CompressionNotSupported
175,813✔
552
}
175,813✔
553

554
// Compression is asymmetric, meaning that one side can have a different
555
// compression level than the other. However, we need to check for cases
556
// when this server `scm` or the remote `rcm` do not support compression
557
// (say older server, or test to make it behave as it is not), or have
558
// the compression off.
559
// Note that `scm` is assumed to not be "off" or "not supported".
560
func selectCompressionMode(scm, rcm string) (mode string, err error) {
42,410✔
561
        if rcm == CompressionNotSupported || rcm == _EMPTY_ {
42,413✔
562
                return CompressionNotSupported, nil
3✔
563
        }
3✔
564
        switch rcm {
42,407✔
565
        case CompressionOff:
16,835✔
566
                // If the remote explicitly disables compression, then we won't
16,835✔
567
                // use compression.
16,835✔
568
                return CompressionOff, nil
16,835✔
569
        case CompressionAccept:
24,423✔
570
                // If the remote is ok with compression (but is not initiating it),
24,423✔
571
                // and if we too are in this mode, then it means no compression.
24,423✔
572
                if scm == CompressionAccept {
48,843✔
573
                        return CompressionOff, nil
24,420✔
574
                }
24,420✔
575
                // Otherwise use our compression mode.
576
                return scm, nil
3✔
577
        case CompressionS2Auto, CompressionS2Uncompressed, CompressionS2Fast, CompressionS2Better, CompressionS2Best:
1,149✔
578
                // This case is here to make sure that if we don't recognize a
1,149✔
579
                // compression setting, we error out.
1,149✔
580
                if scm == CompressionAccept {
1,155✔
581
                        // If our compression mode is "accept", then we will use the remote
6✔
582
                        // compression mode, except if it is "auto", in which case we will
6✔
583
                        // default to "fast". This is not a configuration (auto in one
6✔
584
                        // side and accept in the other) that would be recommended.
6✔
585
                        if rcm == CompressionS2Auto {
8✔
586
                                return CompressionS2Fast, nil
2✔
587
                        }
2✔
588
                        // Use their compression mode.
589
                        return rcm, nil
4✔
590
                }
591
                // Otherwise use our compression mode.
592
                return scm, nil
1,143✔
593
        default:
×
594
                return _EMPTY_, fmt.Errorf("unsupported route compression mode %q", rcm)
×
595
        }
596
}
597

598
// If the configured compression mode is "auto" then will return that,
599
// otherwise will return the given `cm` compression mode.
600
func compressionModeForInfoProtocol(co *CompressionOpts, cm string) string {
17,899✔
601
        if co.Mode == CompressionS2Auto {
19,019✔
602
                return CompressionS2Auto
1,120✔
603
        }
1,120✔
604
        return cm
16,779✔
605
}
606

607
// Given a connection RTT and a list of thresholds durations, this
608
// function will return an S2 compression level such as "uncompressed",
609
// "fast", "better" or "best". For instance, with the following slice:
610
// [5ms, 10ms, 15ms, 20ms], a RTT of up to 5ms will result
611
// in the compression level "uncompressed", ]5ms..10ms] will result in
612
// "fast" compression, etc..
613
// However, the 0 value allows for disabling of some compression levels.
614
// For instance, the following slice: [0, 0, 20, 30] means that a RTT of
615
// [0..20ms] would result in the "better" compression - effectively disabling
616
// the use of "uncompressed" and "fast", then anything above 20ms would
617
// result in the use of "best" level (the 30 in the list has no effect
618
// and the list could have been simplified to [0, 0, 20]).
619
func selectS2AutoModeBasedOnRTT(rtt time.Duration, rttThresholds []time.Duration) string {
1,429✔
620
        var idx int
1,429✔
621
        var found bool
1,429✔
622
        for i, d := range rttThresholds {
2,896✔
623
                if rtt <= d {
2,880✔
624
                        idx = i
1,413✔
625
                        found = true
1,413✔
626
                        break
1,413✔
627
                }
628
        }
629
        if !found {
1,445✔
630
                // If we did not find but we have all levels, then use "best",
16✔
631
                // otherwise use the last one in array.
16✔
632
                if l := len(rttThresholds); l >= 3 {
32✔
633
                        idx = 3
16✔
634
                } else {
16✔
635
                        idx = l - 1
×
636
                }
×
637
        }
638
        switch idx {
1,429✔
639
        case 0:
1,409✔
640
                return CompressionS2Uncompressed
1,409✔
641
        case 1:
2✔
642
                return CompressionS2Fast
2✔
643
        case 2:
2✔
644
                return CompressionS2Better
2✔
645
        }
646
        return CompressionS2Best
16✔
647
}
648

649
// Returns an array of s2 WriterOption based on the route compression mode.
650
// So far we return a single option, but this way we can call s2.NewWriter()
651
// with a nil []s2.WriterOption, but not with a nil s2.WriterOption, so
652
// this is more versatile.
653
func s2WriterOptions(cm string) []s2.WriterOption {
1,162✔
654
        _opts := [2]s2.WriterOption{}
1,162✔
655
        opts := append(
1,162✔
656
                _opts[:0],
1,162✔
657
                s2.WriterConcurrency(1), // Stop asynchronous flushing in separate goroutines
1,162✔
658
        )
1,162✔
659
        switch cm {
1,162✔
660
        case CompressionS2Uncompressed:
1,110✔
661
                return append(opts, s2.WriterUncompressed())
1,110✔
662
        case CompressionS2Best:
27✔
663
                return append(opts, s2.WriterBestCompression())
27✔
664
        case CompressionS2Better:
13✔
665
                return append(opts, s2.WriterBetterCompression())
13✔
666
        default:
12✔
667
                return nil
12✔
668
        }
669
}
670

671
// New will setup a new server struct after parsing the options.
672
// DEPRECATED: Use NewServer(opts)
673
func New(opts *Options) *Server {
191✔
674
        s, _ := NewServer(opts)
191✔
675
        return s
191✔
676
}
191✔
677

678
func NewServerFromConfig(opts *Options) (*Server, error) {
×
679
        if opts.ConfigFile != _EMPTY_ && opts.configDigest == "" {
×
680
                if err := opts.ProcessConfigFile(opts.ConfigFile); err != nil {
×
681
                        return nil, err
×
682
                }
×
683
        }
684
        return NewServer(opts)
×
685
}
686

687
// NewServer will setup a new server struct after parsing the options.
688
// Could return an error if options can not be validated.
689
// The provided Options type should not be re-used afterwards.
690
// Either use Options.Clone() to pass a copy, or make a new one.
691
func NewServer(opts *Options) (*Server, error) {
6,951✔
692
        setBaselineOptions(opts)
6,951✔
693

6,951✔
694
        // Process TLS options, including whether we require client certificates.
6,951✔
695
        tlsReq := opts.TLSConfig != nil
6,951✔
696
        verify := (tlsReq && opts.TLSConfig.ClientAuth == tls.RequireAndVerifyClientCert)
6,951✔
697

6,951✔
698
        // Create our server's nkey identity.
6,951✔
699
        kp, _ := nkeys.CreateServer()
6,951✔
700
        pub, _ := kp.PublicKey()
6,951✔
701

6,951✔
702
        // Create an xkey for encrypting messages from this server.
6,951✔
703
        var xkp nkeys.KeyPair
6,951✔
704
        var xpub string
6,951✔
705
        if !fips140.Enabled() {
13,902✔
706
                xkp, _ = nkeys.CreateCurveKeys()
6,951✔
707
                xpub, _ = xkp.PublicKey()
6,951✔
708
        }
6,951✔
709

710
        serverName := pub
6,951✔
711
        if opts.ServerName != _EMPTY_ {
11,445✔
712
                serverName = opts.ServerName
4,494✔
713
        }
4,494✔
714

715
        httpBasePath := normalizeBasePath(opts.HTTPBasePath)
6,951✔
716

6,951✔
717
        // Validate some options. This is here because we cannot assume that
6,951✔
718
        // server will always be started with configuration parsing (that could
6,951✔
719
        // report issues). Its options can be (incorrectly) set by hand when
6,951✔
720
        // server is embedded. If there is an error, return nil.
6,951✔
721
        if err := validateOptions(opts); err != nil {
7,001✔
722
                return nil, err
50✔
723
        }
50✔
724

725
        info := Info{
6,901✔
726
                ID:           pub,
6,901✔
727
                XKey:         xpub,
6,901✔
728
                Version:      VERSION,
6,901✔
729
                Proto:        PROTO,
6,901✔
730
                GitCommit:    gitCommit,
6,901✔
731
                GoVersion:    runtime.Version(),
6,901✔
732
                Name:         serverName,
6,901✔
733
                Host:         opts.Host,
6,901✔
734
                Port:         opts.Port,
6,901✔
735
                AuthRequired: false,
6,901✔
736
                TLSRequired:  tlsReq && !opts.AllowNonTLS,
6,901✔
737
                TLSVerify:    verify,
6,901✔
738
                MaxPayload:   opts.MaxPayload,
6,901✔
739
                JetStream:    opts.JetStream,
6,901✔
740
                Headers:      !opts.NoHeaderSupport,
6,901✔
741
                Cluster:      opts.Cluster.Name,
6,901✔
742
                Domain:       opts.JetStreamDomain,
6,901✔
743
                JSApiLevel:   JSApiLevel,
6,901✔
744
        }
6,901✔
745

6,901✔
746
        if tlsReq && !info.TLSRequired {
6,904✔
747
                info.TLSAvailable = true
3✔
748
        }
3✔
749

750
        now := time.Now()
6,901✔
751

6,901✔
752
        s := &Server{
6,901✔
753
                kp:                 kp,
6,901✔
754
                xkp:                xkp,
6,901✔
755
                xpub:               xpub,
6,901✔
756
                configFile:         opts.ConfigFile,
6,901✔
757
                info:               info,
6,901✔
758
                opts:               opts,
6,901✔
759
                done:               make(chan bool, 1),
6,901✔
760
                start:              now,
6,901✔
761
                configTime:         now,
6,901✔
762
                gwLeafSubs:         NewSublistWithCache(),
6,901✔
763
                httpBasePath:       httpBasePath,
6,901✔
764
                eventIds:           nuid.New(),
6,901✔
765
                routesToSelf:       make(map[string]struct{}),
6,901✔
766
                httpReqStats:       make(map[string]uint64), // Used to track HTTP requests
6,901✔
767
                rateLimitLoggingCh: make(chan time.Duration, 1),
6,901✔
768
                leafNodeEnabled:    opts.LeafNode.Port != 0 || len(opts.LeafNode.Remotes) > 0,
6,901✔
769
                syncOutSem:         make(chan struct{}, maxConcurrentSyncRequests),
6,901✔
770
        }
6,901✔
771

6,901✔
772
        // Delayed API response queue. Create regardless if JetStream is configured
6,901✔
773
        // or not (since it can be enabled/disabled with config reload, we want this
6,901✔
774
        // queue to exist at all times).
6,901✔
775
        s.delayedAPIResponses = newIPQueue[*delayedAPIResponse](s, "delayed API responses")
6,901✔
776

6,901✔
777
        // By default we'll allow account NRG.
6,901✔
778
        s.accountNRGAllowed.Store(true)
6,901✔
779

6,901✔
780
        // Fill up the maximum in flight syncRequests for this server.
6,901✔
781
        // Used in JetStream catchup semantics.
6,901✔
782
        for i := 0; i < maxConcurrentSyncRequests; i++ {
227,733✔
783
                s.syncOutSem <- struct{}{}
220,832✔
784
        }
220,832✔
785

786
        if opts.TLSRateLimit > 0 {
6,902✔
787
                s.connRateCounter = newRateCounter(opts.tlsConfigOpts.RateLimit)
1✔
788
        }
1✔
789

790
        // Trusted root operator keys.
791
        if !s.processTrustedKeys() {
6,901✔
792
                return nil, fmt.Errorf("Error processing trusted operator keys")
×
793
        }
×
794

795
        // If we have solicited leafnodes but no clustering and no clustername.
796
        // However we may need a stable clustername so use the server name.
797
        if len(opts.LeafNode.Remotes) > 0 && opts.Cluster.Port == 0 && opts.Cluster.Name == _EMPTY_ {
7,908✔
798
                s.leafNoCluster = true
1,007✔
799
                opts.Cluster.Name = opts.ServerName
1,007✔
800
        }
1,007✔
801

802
        if opts.Cluster.Name != _EMPTY_ {
11,890✔
803
                // Also place into mapping cn with cnMu lock.
4,989✔
804
                s.cnMu.Lock()
4,989✔
805
                s.cn = opts.Cluster.Name
4,989✔
806
                s.cnMu.Unlock()
4,989✔
807
        }
4,989✔
808

809
        s.mu.Lock()
6,901✔
810
        defer s.mu.Unlock()
6,901✔
811

6,901✔
812
        // If there are proxies trusted public keys in the configuration
6,901✔
813
        // this will fill create the corresponding list of nkeys.KeyPair
6,901✔
814
        // that we can use for signature verification.
6,901✔
815
        s.processProxiesTrustedKeys()
6,901✔
816

6,901✔
817
        // Place ourselves in the JetStream nodeInfo if needed.
6,901✔
818
        if opts.JetStream {
11,341✔
819
                ourNode := getHash(serverName)
4,440✔
820
                s.nodeToInfo.Store(ourNode, nodeInfo{
4,440✔
821
                        name:            serverName,
4,440✔
822
                        version:         VERSION,
4,440✔
823
                        cluster:         opts.Cluster.Name,
4,440✔
824
                        domain:          opts.JetStreamDomain,
4,440✔
825
                        id:              info.ID,
4,440✔
826
                        tags:            opts.Tags,
4,440✔
827
                        cfg:             &JetStreamConfig{MaxMemory: opts.JetStreamMaxMemory, MaxStore: opts.JetStreamMaxStore, CompressOK: true},
4,440✔
828
                        stats:           nil,
4,440✔
829
                        offline:         false,
4,440✔
830
                        js:              true,
4,440✔
831
                        binarySnapshots: true,
4,440✔
832
                        accountNRG:      true,
4,440✔
833
                })
4,440✔
834
        }
4,440✔
835

836
        s.routeResolver = opts.Cluster.resolver
6,901✔
837
        if s.routeResolver == nil {
13,802✔
838
                s.routeResolver = net.DefaultResolver
6,901✔
839
        }
6,901✔
840

841
        // Used internally for quick look-ups.
842
        s.clientConnectURLsMap = make(refCountedUrlSet)
6,901✔
843
        s.websocket.connectURLsMap = make(refCountedUrlSet)
6,901✔
844
        s.leafURLsMap = make(refCountedUrlSet)
6,901✔
845

6,901✔
846
        // Ensure that non-exported options (used in tests) are properly set.
6,901✔
847
        s.setLeafNodeNonExportedOptions()
6,901✔
848

6,901✔
849
        // Setup OCSP Stapling and OCSP Peer. This will abort server from starting if there
6,901✔
850
        // are no valid staples and OCSP Stapling policy is set to Always or MustStaple.
6,901✔
851
        if err := s.enableOCSP(); err != nil {
6,903✔
852
                return nil, err
2✔
853
        }
2✔
854

855
        // Call this even if there is no gateway defined. It will
856
        // initialize the structure so we don't have to check for
857
        // it to be nil or not in various places in the code.
858
        if err := s.newGateway(opts); err != nil {
6,899✔
859
                return nil, err
×
860
        }
×
861

862
        // If we have a cluster definition but do not have a cluster name, create one.
863
        if opts.Cluster.Port != 0 && opts.Cluster.Name == _EMPTY_ {
6,920✔
864
                s.info.Cluster = nuid.Next()
21✔
865
        } else if opts.Cluster.Name != _EMPTY_ {
11,888✔
866
                // Likewise here if we have a cluster name set.
4,989✔
867
                s.info.Cluster = opts.Cluster.Name
4,989✔
868
        }
4,989✔
869

870
        // This is normally done in the AcceptLoop, once the
871
        // listener has been created (possibly with random port),
872
        // but since some tests may expect the INFO to be properly
873
        // set after New(), let's do it now.
874
        s.setInfoHostPort()
6,899✔
875

6,899✔
876
        // For tracking clients
6,899✔
877
        s.clients = make(map[uint64]*client)
6,899✔
878

6,899✔
879
        // For tracking closed clients.
6,899✔
880
        s.closed = newClosedRingBuffer(opts.MaxClosedClients)
6,899✔
881

6,899✔
882
        // For tracking connections that are not yet registered
6,899✔
883
        // in s.routes, but for which readLoop has started.
6,899✔
884
        s.grTmpClients = make(map[uint64]*client)
6,899✔
885

6,899✔
886
        // For tracking routes and their remote ids
6,899✔
887
        s.initRouteStructures(opts)
6,899✔
888

6,899✔
889
        // For tracking leaf nodes.
6,899✔
890
        s.leafs = make(map[uint64]*client)
6,899✔
891

6,899✔
892
        // Used to kick out all go routines possibly waiting on server
6,899✔
893
        // to shutdown.
6,899✔
894
        s.quitCh = make(chan struct{})
6,899✔
895

6,899✔
896
        // Closed when startup is complete. ReadyForConnections() will block on
6,899✔
897
        // this before checking the presence of listening sockets.
6,899✔
898
        s.startupComplete = make(chan struct{})
6,899✔
899

6,899✔
900
        // Closed when Shutdown() is complete. Allows WaitForShutdown() to block
6,899✔
901
        // waiting for complete shutdown.
6,899✔
902
        s.shutdownComplete = make(chan struct{})
6,899✔
903

6,899✔
904
        // Check for configured account resolvers.
6,899✔
905
        if err := s.configureResolver(); err != nil {
6,899✔
906
                return nil, err
×
907
        }
×
908
        // If there is an URL account resolver, do basic test to see if anyone is home.
909
        if ar := opts.AccountResolver; ar != nil {
7,259✔
910
                if ur, ok := ar.(*URLAccResolver); ok {
399✔
911
                        if _, err := ur.Fetch(_EMPTY_); err != nil {
40✔
912
                                return nil, err
1✔
913
                        }
1✔
914
                }
915
        }
916
        // For other resolver:
917
        // In operator mode, when the account resolver depends on an external system and
918
        // the system account can't be fetched, inject a temporary one.
919
        if ar := s.accResolver; len(opts.TrustedOperators) == 1 && ar != nil &&
6,898✔
920
                opts.SystemAccount != _EMPTY_ && opts.SystemAccount != DEFAULT_SYSTEM_ACCOUNT {
7,159✔
921
                if _, ok := ar.(*MemAccResolver); !ok {
413✔
922
                        s.mu.Unlock()
152✔
923
                        var a *Account
152✔
924
                        // perform direct lookup to avoid warning trace
152✔
925
                        if _, err := fetchAccount(ar, opts.SystemAccount); err == nil {
240✔
926
                                a, _ = s.lookupAccount(opts.SystemAccount)
88✔
927
                        }
88✔
928
                        s.mu.Lock()
152✔
929
                        if a == nil {
216✔
930
                                sac := NewAccount(opts.SystemAccount)
64✔
931
                                sac.Issuer = opts.TrustedOperators[0].Issuer
64✔
932
                                sac.signingKeys = map[string]jwt.Scope{}
64✔
933
                                sac.signingKeys[opts.SystemAccount] = nil
64✔
934
                                s.registerAccountNoLock(sac)
64✔
935
                        }
64✔
936
                }
937
        }
938

939
        // For tracking accounts
940
        if _, err := s.configureAccounts(false); err != nil {
6,898✔
941
                return nil, err
×
942
        }
×
943

944
        // Used to setup Authorization.
945
        s.configureAuthorization()
6,898✔
946

6,898✔
947
        // Start signal handler
6,898✔
948
        s.handleSignals()
6,898✔
949

6,898✔
950
        return s, nil
6,898✔
951
}
952

953
// Initializes route structures based on pooling and/or per-account routes.
954
//
955
// Server lock is held on entry
956
func (s *Server) initRouteStructures(opts *Options) {
6,899✔
957
        s.routes = make(map[string][]*client)
6,899✔
958
        if ps := opts.Cluster.PoolSize; ps > 0 {
11,043✔
959
                s.routesPoolSize = ps
4,144✔
960
        } else {
6,899✔
961
                s.routesPoolSize = 1
2,755✔
962
        }
2,755✔
963
        // If we have per-account routes, we create accRoutes and initialize it
964
        // with nil values. The presence of an account as the key will allow us
965
        // to know if a given account is supposed to have dedicated routes.
966
        if l := len(opts.Cluster.PinnedAccounts); l > 0 {
10,755✔
967
                s.accRoutes = make(map[string]map[string]*client, l)
3,856✔
968
                for _, acc := range opts.Cluster.PinnedAccounts {
7,724✔
969
                        s.accRoutes[acc] = make(map[string]*client)
3,868✔
970
                }
3,868✔
971
        }
972
}
973

974
func (s *Server) logRejectedTLSConns() {
1✔
975
        defer s.grWG.Done()
1✔
976
        t := time.NewTicker(time.Second)
1✔
977
        defer t.Stop()
1✔
978
        for {
3✔
979
                select {
2✔
980
                case <-s.quitCh:
1✔
981
                        return
1✔
982
                case <-t.C:
1✔
983
                        blocked := s.connRateCounter.countBlocked()
1✔
984
                        if blocked > 0 {
2✔
985
                                s.Warnf("Rejected %d connections due to TLS rate limiting", blocked)
1✔
986
                        }
1✔
987
                }
988
        }
989
}
990

991
// clusterName returns our cluster name which could be dynamic.
992
func (s *Server) ClusterName() string {
116,064✔
993
        s.mu.RLock()
116,064✔
994
        cn := s.info.Cluster
116,064✔
995
        s.mu.RUnlock()
116,064✔
996
        return cn
116,064✔
997
}
116,064✔
998

999
// Grabs cluster name with cluster name specific lock.
1000
func (s *Server) cachedClusterName() string {
101,546✔
1001
        s.cnMu.RLock()
101,546✔
1002
        cn := s.cn
101,546✔
1003
        s.cnMu.RUnlock()
101,546✔
1004
        return cn
101,546✔
1005
}
101,546✔
1006

1007
// setClusterName will update the cluster name for this server.
1008
func (s *Server) setClusterName(name string) {
27✔
1009
        s.mu.Lock()
27✔
1010
        var resetCh chan struct{}
27✔
1011
        if s.sys != nil && s.info.Cluster != name {
53✔
1012
                // can't hold the lock as go routine reading it may be waiting for lock as well
26✔
1013
                resetCh = s.sys.resetCh
26✔
1014
        }
26✔
1015
        s.info.Cluster = name
27✔
1016
        s.routeInfo.Cluster = name
27✔
1017

27✔
1018
        // Need to close solicited leaf nodes. The close has to be done outside of the server lock.
27✔
1019
        var leafs []*client
27✔
1020
        for _, c := range s.leafs {
28✔
1021
                c.mu.Lock()
1✔
1022
                if c.leaf != nil && c.leaf.remote != nil {
2✔
1023
                        leafs = append(leafs, c)
1✔
1024
                }
1✔
1025
                c.mu.Unlock()
1✔
1026
        }
1027
        s.mu.Unlock()
27✔
1028

27✔
1029
        // Also place into mapping cn with cnMu lock.
27✔
1030
        s.cnMu.Lock()
27✔
1031
        s.cn = name
27✔
1032
        s.cnMu.Unlock()
27✔
1033

27✔
1034
        for _, l := range leafs {
28✔
1035
                l.closeConnection(ClusterNameConflict)
1✔
1036
        }
1✔
1037
        if resetCh != nil {
53✔
1038
                resetCh <- struct{}{}
26✔
1039
        }
26✔
1040
        s.Noticef("Cluster name updated to %s", name)
27✔
1041
}
1042

1043
// Return whether the cluster name is dynamic.
1044
func (s *Server) isClusterNameDynamic() bool {
44,416✔
1045
        // We need to lock the whole "Cluster.Name" check and not use s.getOpts()
44,416✔
1046
        // because otherwise this could cause a data race with setting the name in
44,416✔
1047
        // route.go's processRouteConnect().
44,416✔
1048
        s.optsMu.RLock()
44,416✔
1049
        dynamic := s.opts.Cluster.Name == _EMPTY_
44,416✔
1050
        s.optsMu.RUnlock()
44,416✔
1051
        return dynamic
44,416✔
1052
}
44,416✔
1053

1054
// Returns our configured serverName.
1055
func (s *Server) serverName() string {
24,712✔
1056
        return s.getOpts().ServerName
24,712✔
1057
}
24,712✔
1058

1059
// ClientURL returns the URL used to connect clients.
1060
// Helpful in tests and with in-process servers using a random client port (-1).
1061
func (s *Server) ClientURL() string {
6,670✔
1062
        // FIXME(dlc) - should we add in user and pass if defined single?
6,670✔
1063
        opts := s.getOpts()
6,670✔
1064
        var u url.URL
6,670✔
1065
        u.Scheme = "nats"
6,670✔
1066
        if opts.TLSConfig != nil {
6,680✔
1067
                u.Scheme = "tls"
10✔
1068
        }
10✔
1069
        u.Host = net.JoinHostPort(opts.Host, fmt.Sprintf("%d", opts.Port))
6,670✔
1070
        return u.String()
6,670✔
1071
}
1072

1073
// WebsocketURL returns the URL used to connect websocket clients.
1074
// Helpful in tests and with in-process servers using a random websocket port (-1).
1075
func (s *Server) WebsocketURL() string {
×
1076
        opts := s.getOpts()
×
1077
        var u url.URL
×
1078
        u.Scheme = "ws"
×
1079
        if opts.Websocket.TLSConfig != nil {
×
1080
                u.Scheme = "wss"
×
1081
        }
×
1082
        u.Host = net.JoinHostPort(opts.Websocket.Host, fmt.Sprintf("%d", opts.Websocket.Port))
×
1083
        return u.String()
×
1084
}
1085

1086
func validateCluster(o *Options) error {
8,059✔
1087
        if o.Cluster.Name != _EMPTY_ && strings.Contains(o.Cluster.Name, " ") {
8,059✔
1088
                return ErrClusterNameHasSpaces
×
1089
        }
×
1090
        if o.Cluster.Compression.Mode != _EMPTY_ {
13,095✔
1091
                if err := validateAndNormalizeCompressionOption(&o.Cluster.Compression, CompressionS2Fast); err != nil {
5,036✔
1092
                        return err
×
1093
                }
×
1094
        }
1095
        if err := validatePinnedCerts(o.Cluster.TLSPinnedCerts); err != nil {
8,059✔
1096
                return fmt.Errorf("cluster: %v", err)
×
1097
        }
×
1098
        // Check that cluster name if defined matches any gateway name.
1099
        // Note that we have already verified that the gateway name does not have spaces.
1100
        if o.Gateway.Name != _EMPTY_ && o.Gateway.Name != o.Cluster.Name {
8,423✔
1101
                if o.Cluster.Name != _EMPTY_ {
364✔
1102
                        return ErrClusterNameConfigConflict
×
1103
                }
×
1104
                // Set this here so we do not consider it dynamic.
1105
                o.Cluster.Name = o.Gateway.Name
364✔
1106
        }
1107
        if l := len(o.Cluster.PinnedAccounts); l > 0 {
11,929✔
1108
                if o.Cluster.PoolSize < 0 {
3,870✔
1109
                        return fmt.Errorf("pool_size cannot be negative if pinned accounts are specified")
×
1110
                }
×
1111
                m := make(map[string]struct{}, l)
3,870✔
1112
                for _, a := range o.Cluster.PinnedAccounts {
7,752✔
1113
                        if _, exists := m[a]; exists {
3,882✔
1114
                                return fmt.Errorf("found duplicate account name %q in pinned accounts list %q", a, o.Cluster.PinnedAccounts)
×
1115
                        }
×
1116
                        m[a] = struct{}{}
3,882✔
1117
                }
1118
        }
1119
        return nil
8,059✔
1120
}
1121

1122
func validatePinnedCerts(pinned PinnedCertSet) error {
18,402✔
1123
        re := regexp.MustCompile("^[a-f0-9]{64}$")
18,402✔
1124
        for certId := range pinned {
18,411✔
1125
                entry := strings.ToLower(certId)
9✔
1126
                if !re.MatchString(entry) {
9✔
1127
                        return fmt.Errorf("error parsing 'pinned_certs' key %s does not look like lower case hex-encoded sha256 of DER encoded SubjectPublicKeyInfo", entry)
×
1128
                }
×
1129
        }
1130
        return nil
18,402✔
1131
}
1132

1133
func validateOptions(o *Options) error {
8,100✔
1134
        if o.LameDuckDuration > 0 && o.LameDuckGracePeriod >= o.LameDuckDuration {
8,100✔
1135
                return fmt.Errorf("lame duck grace period (%v) should be strictly lower than lame duck duration (%v)",
×
1136
                        o.LameDuckGracePeriod, o.LameDuckDuration)
×
1137
        }
×
1138
        if int64(o.MaxPayload) > o.MaxPending {
8,100✔
1139
                return fmt.Errorf("max_payload (%v) cannot be higher than max_pending (%v)",
×
1140
                        o.MaxPayload, o.MaxPending)
×
1141
        }
×
1142
        if o.ServerName != _EMPTY_ && strings.Contains(o.ServerName, " ") {
8,100✔
1143
                return errors.New("server name cannot contain spaces")
×
1144
        }
×
1145
        // Check that the trust configuration is correct.
1146
        if err := validateTrustedOperators(o); err != nil {
8,109✔
1147
                return err
9✔
1148
        }
9✔
1149
        // Check on leaf nodes which will require a system
1150
        // account when gateways are also configured.
1151
        if err := validateLeafNode(o); err != nil {
8,121✔
1152
                return err
30✔
1153
        }
30✔
1154
        // Check that authentication is properly configured.
1155
        if err := validateAuth(o); err != nil {
8,063✔
1156
                return err
2✔
1157
        }
2✔
1158
        // Check that proxies is properly configured.
1159
        if err := validateProxies(o); err != nil {
8,059✔
1160
                return err
×
1161
        }
×
1162
        // Check that gateway is properly configured. Returns no error
1163
        // if there is no gateway defined.
1164
        if err := validateGatewayOptions(o); err != nil {
8,059✔
1165
                return err
×
1166
        }
×
1167
        // Check that cluster name if defined matches any gateway name.
1168
        if err := validateCluster(o); err != nil {
8,059✔
1169
                return err
×
1170
        }
×
1171
        if err := validateMQTTOptions(o); err != nil {
8,064✔
1172
                return err
5✔
1173
        }
5✔
1174
        if err := validateJetStreamOptions(o); err != nil {
8,066✔
1175
                return err
12✔
1176
        }
12✔
1177
        // Finally check websocket options.
1178
        return validateWebsocketOptions(o)
8,042✔
1179
}
1180

1181
func (s *Server) getOpts() *Options {
3,177,876✔
1182
        s.optsMu.RLock()
3,177,876✔
1183
        opts := s.opts
3,177,876✔
1184
        s.optsMu.RUnlock()
3,177,876✔
1185
        return opts
3,177,876✔
1186
}
3,177,876✔
1187

1188
func (s *Server) setOpts(opts *Options) {
1,153✔
1189
        s.optsMu.Lock()
1,153✔
1190
        s.opts = opts
1,153✔
1191
        s.optsMu.Unlock()
1,153✔
1192
}
1,153✔
1193

1194
func (s *Server) globalAccount() *Account {
11,606✔
1195
        s.mu.RLock()
11,606✔
1196
        gacc := s.gacc
11,606✔
1197
        s.mu.RUnlock()
11,606✔
1198
        return gacc
11,606✔
1199
}
11,606✔
1200

1201
// Used to setup or update Accounts.
1202
// Returns a map that indicates which accounts have had their stream imports
1203
// changed (in case of an update in configuration reload).
1204
// Lock is held upon entry, but will be released/reacquired in this function.
1205
func (s *Server) configureAccounts(reloading bool) (map[string]struct{}, error) {
8,028✔
1206
        awcsti := make(map[string]struct{})
8,028✔
1207

8,028✔
1208
        // Create the global account.
8,028✔
1209
        if s.gacc == nil {
14,926✔
1210
                s.gacc = NewAccount(globalAccountName)
6,898✔
1211
                s.registerAccountNoLock(s.gacc)
6,898✔
1212
        }
6,898✔
1213

1214
        opts := s.getOpts()
8,028✔
1215

8,028✔
1216
        // We need to track service imports since we can not swap them out (unsub and re-sub)
8,028✔
1217
        // until the proper server struct accounts have been swapped in properly. Doing it in
8,028✔
1218
        // place could lead to data loss or server panic since account under new si has no real
8,028✔
1219
        // account and hence no sublist, so will panic on inbound message.
8,028✔
1220
        siMap := make(map[*Account][][]byte)
8,028✔
1221

8,028✔
1222
        // Check opts and walk through them. We need to copy them here
8,028✔
1223
        // so that we do not keep a real one sitting in the options.
8,028✔
1224
        for _, acc := range opts.Accounts {
16,670✔
1225
                var a *Account
8,642✔
1226
                create := true
8,642✔
1227
                // For the global account, we want to skip the reload process
8,642✔
1228
                // and fall back into the "create" case which will in that
8,642✔
1229
                // case really be just an update (shallowCopy will make sure
8,642✔
1230
                // that mappings are copied over).
8,642✔
1231
                if reloading && acc.Name != globalAccountName {
11,785✔
1232
                        if ai, ok := s.accounts.Load(acc.Name); ok {
6,286✔
1233
                                a = ai.(*Account)
3,143✔
1234
                                // Before updating the account, check if stream imports have changed.
3,143✔
1235
                                if !a.checkStreamImportsEqual(acc) {
3,144✔
1236
                                        awcsti[acc.Name] = struct{}{}
1✔
1237
                                }
1✔
1238
                                a.mu.Lock()
3,143✔
1239
                                // Collect the sids for the service imports since we are going to
3,143✔
1240
                                // replace with new ones.
3,143✔
1241
                                var sids [][]byte
3,143✔
1242
                                for _, sis := range a.imports.services {
13,540✔
1243
                                        for _, si := range sis {
20,794✔
1244
                                                if si.sid != nil {
20,794✔
1245
                                                        sids = append(sids, si.sid)
10,397✔
1246
                                                }
10,397✔
1247
                                        }
1248
                                }
1249
                                // Setup to process later if needed.
1250
                                if len(sids) > 0 || len(acc.imports.services) > 0 {
6,286✔
1251
                                        siMap[a] = sids
3,143✔
1252
                                }
3,143✔
1253

1254
                                // Now reset all export/imports fields since they are going to be
1255
                                // filled in shallowCopy()
1256
                                a.imports.streams, a.imports.services = nil, nil
3,143✔
1257
                                a.exports.streams, a.exports.services = nil, nil
3,143✔
1258
                                // We call shallowCopy from the account `acc` (the one in Options)
3,143✔
1259
                                // and pass `a` (our existing account) to get it updated.
3,143✔
1260
                                acc.shallowCopy(a)
3,143✔
1261
                                a.mu.Unlock()
3,143✔
1262
                                create = false
3,143✔
1263
                        }
1264
                }
1265
                // Track old mappings if global account.
1266
                var oldGMappings []*mapping
8,642✔
1267
                if create {
14,141✔
1268
                        if acc.Name == globalAccountName {
5,508✔
1269
                                a = s.gacc
9✔
1270
                                a.mu.Lock()
9✔
1271
                                oldGMappings = append(oldGMappings, a.mappings...)
9✔
1272
                                a.mu.Unlock()
9✔
1273
                        } else {
5,499✔
1274
                                a = NewAccount(acc.Name)
5,490✔
1275
                        }
5,490✔
1276
                        // Locking matters in the case of an update of the global account
1277
                        a.mu.Lock()
5,499✔
1278
                        acc.shallowCopy(a)
5,499✔
1279
                        a.mu.Unlock()
5,499✔
1280
                        // Will be a no-op in case of the global account since it is already registered.
5,499✔
1281
                        s.registerAccountNoLock(a)
5,499✔
1282
                }
1283

1284
                // The `acc` account is stored in options, not in the server, and these can be cleared.
1285
                acc.sl, acc.clients, acc.mappings = nil, nil, nil
8,642✔
1286

8,642✔
1287
                // Check here if we have been reloaded and we have a global account with mappings that may have changed.
8,642✔
1288
                // If we have leafnodes they need to be updated.
8,642✔
1289
                if reloading && a == s.gacc {
8,644✔
1290
                        a.mu.Lock()
2✔
1291
                        mappings := make(map[string]*mapping)
2✔
1292
                        if len(a.mappings) > 0 && a.nleafs > 0 {
4✔
1293
                                for _, em := range a.mappings {
4✔
1294
                                        mappings[em.src] = em
2✔
1295
                                }
2✔
1296
                        }
1297
                        a.mu.Unlock()
2✔
1298
                        if len(mappings) > 0 || len(oldGMappings) > 0 {
4✔
1299
                                a.lmu.RLock()
2✔
1300
                                for _, lc := range a.lleafs {
4✔
1301
                                        for _, em := range mappings {
4✔
1302
                                                lc.forceAddToSmap(em.src)
2✔
1303
                                        }
2✔
1304
                                        // Remove any old ones if needed.
1305
                                        for _, em := range oldGMappings {
4✔
1306
                                                // Only remove if not in the new ones.
2✔
1307
                                                if _, ok := mappings[em.src]; !ok {
3✔
1308
                                                        lc.forceRemoveFromSmap(em.src)
1✔
1309
                                                }
1✔
1310
                                        }
1311
                                }
1312
                                a.lmu.RUnlock()
2✔
1313
                        }
1314
                }
1315

1316
                // If we see an account defined using $SYS we will make sure that is set as system account.
1317
                if acc.Name == DEFAULT_SYSTEM_ACCOUNT && opts.SystemAccount == _EMPTY_ {
12,481✔
1318
                        opts.SystemAccount = DEFAULT_SYSTEM_ACCOUNT
3,839✔
1319
                }
3,839✔
1320
        }
1321

1322
        // Now that we have this we need to remap any referenced accounts in
1323
        // import or export maps to the new ones.
1324
        swapApproved := func(ea *exportAuth) {
9,636✔
1325
                for sub, a := range ea.approved {
1,653✔
1326
                        var acc *Account
45✔
1327
                        if v, ok := s.accounts.Load(a.Name); ok {
90✔
1328
                                acc = v.(*Account)
45✔
1329
                        }
45✔
1330
                        ea.approved[sub] = acc
45✔
1331
                }
1332
        }
1333
        var numAccounts int
8,028✔
1334
        s.accounts.Range(func(k, v any) bool {
24,907✔
1335
                numAccounts++
16,879✔
1336
                acc := v.(*Account)
16,879✔
1337
                acc.mu.Lock()
16,879✔
1338
                // Exports
16,879✔
1339
                for _, se := range acc.exports.streams {
17,012✔
1340
                        if se != nil {
142✔
1341
                                swapApproved(&se.exportAuth)
9✔
1342
                        }
9✔
1343
                }
1344
                for _, se := range acc.exports.services {
18,478✔
1345
                        if se != nil {
3,198✔
1346
                                // Swap over the bound account for service exports.
1,599✔
1347
                                if se.acc != nil {
3,198✔
1348
                                        if v, ok := s.accounts.Load(se.acc.Name); ok {
3,198✔
1349
                                                se.acc = v.(*Account)
1,599✔
1350
                                        }
1,599✔
1351
                                }
1352
                                swapApproved(&se.exportAuth)
1,599✔
1353
                        }
1354
                }
1355
                // Imports
1356
                for _, si := range acc.imports.streams {
17,061✔
1357
                        if v, ok := s.accounts.Load(si.acc.Name); ok {
364✔
1358
                                si.acc = v.(*Account)
182✔
1359
                        }
182✔
1360
                }
1361
                for _, sis := range acc.imports.services {
22,798✔
1362
                        for _, si := range sis {
11,841✔
1363
                                if v, ok := s.accounts.Load(si.acc.Name); ok {
11,844✔
1364
                                        si.acc = v.(*Account)
5,922✔
1365

5,922✔
1366
                                        // It is possible to allow for latency tracking inside your
5,922✔
1367
                                        // own account, so lock only when not the same account.
5,922✔
1368
                                        if si.acc == acc {
5,989✔
1369
                                                si.se = si.acc.getServiceExport(si.to)
67✔
1370
                                                continue
67✔
1371
                                        }
1372
                                        si.acc.mu.RLock()
5,855✔
1373
                                        si.se = si.acc.getServiceExport(si.to)
5,855✔
1374
                                        si.acc.mu.RUnlock()
5,855✔
1375
                                }
1376
                        }
1377
                }
1378
                // Make sure the subs are running, but only if not reloading.
1379
                if len(acc.imports.services) > 0 && acc.ic == nil && !reloading {
17,130✔
1380
                        acc.ic = s.createInternalAccountClient()
251✔
1381
                        acc.ic.acc = acc
251✔
1382
                        // Need to release locks to invoke this function.
251✔
1383
                        acc.mu.Unlock()
251✔
1384
                        s.mu.Unlock()
251✔
1385
                        acc.addAllServiceImportSubs()
251✔
1386
                        s.mu.Lock()
251✔
1387
                        acc.mu.Lock()
251✔
1388
                }
251✔
1389
                acc.updated = time.Now()
16,879✔
1390
                acc.mu.Unlock()
16,879✔
1391
                return true
16,879✔
1392
        })
1393

1394
        // Check if we need to process service imports pending from above.
1395
        // This processing needs to be after we swap in the real accounts above.
1396
        for acc, sids := range siMap {
11,171✔
1397
                c := acc.ic
3,143✔
1398
                for _, sid := range sids {
13,540✔
1399
                        c.processUnsub(sid)
10,397✔
1400
                }
10,397✔
1401
                acc.addAllServiceImportSubs()
3,143✔
1402
                s.mu.Unlock()
3,143✔
1403
                s.registerSystemImports(acc)
3,143✔
1404
                s.mu.Lock()
3,143✔
1405
        }
1406

1407
        // Set the system account if it was configured.
1408
        // Otherwise create a default one.
1409
        if opts.SystemAccount != _EMPTY_ {
13,031✔
1410
                // Lock may be acquired in lookupAccount, so release to call lookupAccount.
5,003✔
1411
                s.mu.Unlock()
5,003✔
1412
                acc, err := s.lookupAccount(opts.SystemAccount)
5,003✔
1413
                s.mu.Lock()
5,003✔
1414
                if err == nil && s.sys != nil && acc != s.sys.account {
5,003✔
1415
                        // sys.account.clients (including internal client)/respmap/etc... are transferred separately
×
1416
                        s.sys.account = acc
×
1417
                        s.sysAcc.Store(acc)
×
1418
                }
×
1419
                if err != nil {
5,003✔
1420
                        return awcsti, fmt.Errorf("error resolving system account: %v", err)
×
1421
                }
×
1422

1423
                // If we have defined a system account here check to see if its just us and the $G account.
1424
                // We would do this to add user/pass to the system account. If this is the case add in
1425
                // no-auth-user for $G.
1426
                // Only do this if non-operator mode and we did not have an authorization block defined.
1427
                if len(opts.TrustedOperators) == 0 && numAccounts == 2 && opts.NoAuthUser == _EMPTY_ && !opts.authBlockDefined {
8,045✔
1428
                        // If we come here from config reload, let's not recreate the fake user name otherwise
3,042✔
1429
                        // it will cause currently clients to be disconnected.
3,042✔
1430
                        uname := s.sysAccOnlyNoAuthUser
3,042✔
1431
                        if uname == _EMPTY_ {
6,081✔
1432
                                // Create a unique name so we do not collide.
3,039✔
1433
                                var b [8]byte
3,039✔
1434
                                rn := rand.Int63()
3,039✔
1435
                                for i, l := 0, rn; i < len(b); i++ {
27,351✔
1436
                                        b[i] = digits[l%base]
24,312✔
1437
                                        l /= base
24,312✔
1438
                                }
24,312✔
1439
                                uname = fmt.Sprintf("nats-%s", b[:])
3,039✔
1440
                                s.sysAccOnlyNoAuthUser = uname
3,039✔
1441
                        }
1442
                        opts.Users = append(opts.Users, &User{Username: uname, Password: uname[6:], Account: s.gacc})
3,042✔
1443
                        opts.NoAuthUser = uname
3,042✔
1444
                }
1445
        }
1446

1447
        // Add any required exports from system account.
1448
        if s.sys != nil {
9,158✔
1449
                sysAcc := s.sys.account
1,130✔
1450
                s.mu.Unlock()
1,130✔
1451
                s.addSystemAccountExports(sysAcc)
1,130✔
1452
                s.mu.Lock()
1,130✔
1453
        }
1,130✔
1454

1455
        return awcsti, nil
8,028✔
1456
}
1457

1458
// Setup the account resolver. For memory resolver, make sure the JWTs are
1459
// properly formed but do not enforce expiration etc.
1460
// Lock is held on entry, but may be released/reacquired during this call.
1461
func (s *Server) configureResolver() error {
6,908✔
1462
        opts := s.getOpts()
6,908✔
1463
        s.accResolver = opts.AccountResolver
6,908✔
1464
        if opts.AccountResolver != nil {
7,277✔
1465
                // For URL resolver, set the TLSConfig if specified.
369✔
1466
                if opts.AccountResolverTLSConfig != nil {
371✔
1467
                        if ar, ok := opts.AccountResolver.(*URLAccResolver); ok {
4✔
1468
                                if t, ok := ar.c.Transport.(*http.Transport); ok {
4✔
1469
                                        t.CloseIdleConnections()
2✔
1470
                                        t.TLSClientConfig = opts.AccountResolverTLSConfig.Clone()
2✔
1471
                                }
2✔
1472
                        }
1473
                }
1474
                if len(opts.resolverPreloads) > 0 {
541✔
1475
                        // Lock ordering is account resolver -> server, so we need to release
172✔
1476
                        // the lock and reacquire it when done with account resolver's calls.
172✔
1477
                        ar := s.accResolver
172✔
1478
                        s.mu.Unlock()
172✔
1479
                        defer s.mu.Lock()
172✔
1480
                        if ar.IsReadOnly() {
172✔
1481
                                return fmt.Errorf("resolver preloads only available for writeable resolver types MEM/DIR/CACHE_DIR")
×
1482
                        }
×
1483
                        for k, v := range opts.resolverPreloads {
1,092✔
1484
                                _, err := jwt.DecodeAccountClaims(v)
920✔
1485
                                if err != nil {
920✔
1486
                                        return fmt.Errorf("preload account error for %q: %v", k, err)
×
1487
                                }
×
1488
                                ar.Store(k, v)
920✔
1489
                        }
1490
                }
1491
        }
1492
        return nil
6,908✔
1493
}
1494

1495
// This will check preloads for validation issues.
1496
func (s *Server) checkResolvePreloads() {
173✔
1497
        opts := s.getOpts()
173✔
1498
        // We can just check the read-only opts versions here, that way we do not need
173✔
1499
        // to grab server lock or access s.accResolver.
173✔
1500
        for k, v := range opts.resolverPreloads {
1,093✔
1501
                claims, err := jwt.DecodeAccountClaims(v)
920✔
1502
                if err != nil {
920✔
1503
                        s.Errorf("Preloaded account [%s] not valid", k)
×
1504
                        continue
×
1505
                }
1506
                // Check if it is expired.
1507
                vr := jwt.CreateValidationResults()
920✔
1508
                claims.Validate(vr)
920✔
1509
                if vr.IsBlocking(true) {
921✔
1510
                        s.Warnf("Account [%s] has validation issues:", k)
1✔
1511
                        for _, v := range vr.Issues {
2✔
1512
                                s.Warnf("  - %s", v.Description)
1✔
1513
                        }
1✔
1514
                }
1515
        }
1516
}
1517

1518
// Determines if we are in pre NATS 2.0 setup with no accounts.
1519
func (s *Server) globalAccountOnly() bool {
4,469✔
1520
        var hasOthers bool
4,469✔
1521

4,469✔
1522
        if s.trustedKeys != nil {
4,612✔
1523
                return false
143✔
1524
        }
143✔
1525

1526
        s.mu.RLock()
4,326✔
1527
        s.accounts.Range(func(k, v any) bool {
12,887✔
1528
                acc := v.(*Account)
8,561✔
1529
                // Ignore global and system
8,561✔
1530
                if acc == s.gacc || (s.sys != nil && acc == s.sys.account) {
16,748✔
1531
                        return true
8,187✔
1532
                }
8,187✔
1533
                hasOthers = true
374✔
1534
                return false
374✔
1535
        })
1536
        s.mu.RUnlock()
4,326✔
1537

4,326✔
1538
        return !hasOthers
4,326✔
1539
}
1540

1541
// Determines if this server is in standalone mode, meaning no routes or gateways.
1542
func (s *Server) standAloneMode() bool {
27,634✔
1543
        opts := s.getOpts()
27,634✔
1544
        return opts.Cluster.Port == 0 && opts.Gateway.Port == 0
27,634✔
1545
}
27,634✔
1546

1547
func (s *Server) configuredRoutes() int {
3,388✔
1548
        return len(s.getOpts().Routes)
3,388✔
1549
}
3,388✔
1550

1551
// activePeers is used in bootstrapping raft groups like the JetStream meta controller.
1552
func (s *Server) ActivePeers() (peers []string) {
3,171✔
1553
        s.nodeToInfo.Range(func(k, v any) bool {
6,725✔
1554
                si := v.(nodeInfo)
3,554✔
1555
                if !si.offline {
7,108✔
1556
                        peers = append(peers, k.(string))
3,554✔
1557
                }
3,554✔
1558
                return true
3,554✔
1559
        })
1560
        return peers
3,171✔
1561
}
1562

1563
// isTrustedIssuer will check that the issuer is a trusted public key.
1564
// This is used to make sure an account was signed by a trusted operator.
1565
func (s *Server) isTrustedIssuer(issuer string) bool {
10,706✔
1566
        s.mu.RLock()
10,706✔
1567
        defer s.mu.RUnlock()
10,706✔
1568
        // If we are not running in trusted mode and there is no issuer, that is ok.
10,706✔
1569
        if s.trustedKeys == nil && issuer == _EMPTY_ {
16,484✔
1570
                return true
5,778✔
1571
        }
5,778✔
1572
        for _, tk := range s.trustedKeys {
10,066✔
1573
                if tk == issuer {
9,987✔
1574
                        return true
4,849✔
1575
                }
4,849✔
1576
        }
1577
        return false
79✔
1578
}
1579

1580
// processTrustedKeys will process binary stamped and
1581
// options-based trusted nkeys. Returns success.
1582
func (s *Server) processTrustedKeys() bool {
6,901✔
1583
        s.strictSigningKeyUsage = map[string]struct{}{}
6,901✔
1584
        opts := s.getOpts()
6,901✔
1585
        if trustedKeys != _EMPTY_ && !s.initStampedTrustedKeys() {
6,901✔
1586
                return false
×
1587
        } else if opts.TrustedKeys != nil {
7,315✔
1588
                for _, key := range opts.TrustedKeys {
1,708✔
1589
                        if !nkeys.IsValidPublicOperatorKey(key) {
1,294✔
1590
                                return false
×
1591
                        }
×
1592
                }
1593
                s.trustedKeys = append([]string(nil), opts.TrustedKeys...)
414✔
1594
                for _, claim := range opts.TrustedOperators {
717✔
1595
                        if !claim.StrictSigningKeyUsage {
604✔
1596
                                continue
301✔
1597
                        }
1598
                        for _, key := range claim.SigningKeys {
4✔
1599
                                s.strictSigningKeyUsage[key] = struct{}{}
2✔
1600
                        }
2✔
1601
                }
1602
        }
1603
        return true
6,901✔
1604
}
1605

1606
// checkTrustedKeyString will check that the string is a valid array
1607
// of public operator nkeys.
1608
func checkTrustedKeyString(keys string) []string {
×
1609
        tks := strings.Fields(keys)
×
1610
        if len(tks) == 0 {
×
1611
                return nil
×
1612
        }
×
1613
        // Walk all the keys and make sure they are valid.
1614
        for _, key := range tks {
×
1615
                if !nkeys.IsValidPublicOperatorKey(key) {
×
1616
                        return nil
×
1617
                }
×
1618
        }
1619
        return tks
×
1620
}
1621

1622
// initStampedTrustedKeys will check the stamped trusted keys
1623
// and will set the server field 'trustedKeys'. Returns whether
1624
// it succeeded or not.
1625
func (s *Server) initStampedTrustedKeys() bool {
×
1626
        // Check to see if we have an override in options, which will cause us to fail.
×
1627
        if len(s.getOpts().TrustedKeys) > 0 {
×
1628
                return false
×
1629
        }
×
1630
        tks := checkTrustedKeyString(trustedKeys)
×
1631
        if len(tks) == 0 {
×
1632
                return false
×
1633
        }
×
1634
        s.trustedKeys = tks
×
1635
        return true
×
1636
}
1637

1638
// PrintAndDie is exported for access in other packages.
1639
func PrintAndDie(msg string) {
×
1640
        fmt.Fprintln(os.Stderr, msg)
×
1641
        os.Exit(1)
×
1642
}
×
1643

1644
// PrintServerAndExit will print our version and exit.
1645
func PrintServerAndExit() {
×
1646
        fmt.Printf("nats-server: v%s\n", VERSION)
×
1647
        os.Exit(0)
×
1648
}
×
1649

1650
// ProcessCommandLineArgs takes the command line arguments
1651
// validating and setting flags for handling in case any
1652
// sub command was present.
1653
func ProcessCommandLineArgs(cmd *flag.FlagSet) (showVersion bool, showHelp bool, err error) {
×
1654
        if len(cmd.Args()) > 0 {
×
1655
                arg := cmd.Args()[0]
×
1656
                switch strings.ToLower(arg) {
×
1657
                case "version":
×
1658
                        return true, false, nil
×
1659
                case "help":
×
1660
                        return false, true, nil
×
1661
                default:
×
1662
                        return false, false, fmt.Errorf("unrecognized command: %q", arg)
×
1663
                }
1664
        }
1665

1666
        return false, false, nil
×
1667
}
1668

1669
// Public version.
1670
func (s *Server) Running() bool {
2,430✔
1671
        return s.isRunning()
2,430✔
1672
}
2,430✔
1673

1674
// Protected check on running state
1675
func (s *Server) isRunning() bool {
326,583,346✔
1676
        return s.running.Load()
326,583,346✔
1677
}
326,583,346✔
1678

1679
func (s *Server) logPid() error {
1✔
1680
        pidStr := strconv.Itoa(os.Getpid())
1✔
1681
        return os.WriteFile(s.getOpts().PidFile, []byte(pidStr), defaultFilePerms)
1✔
1682
}
1✔
1683

1684
// numReservedAccounts will return the number of reserved accounts configured in the server.
1685
// Currently this is 1, one for the global default account.
1686
func (s *Server) numReservedAccounts() int {
1✔
1687
        return 1
1✔
1688
}
1✔
1689

1690
// NumActiveAccounts reports number of active accounts on this server.
1691
func (s *Server) NumActiveAccounts() int32 {
×
1692
        return atomic.LoadInt32(&s.activeAccounts)
×
1693
}
×
1694

1695
// incActiveAccounts() just adds one under lock.
1696
func (s *Server) incActiveAccounts() {
22,779✔
1697
        atomic.AddInt32(&s.activeAccounts, 1)
22,779✔
1698
}
22,779✔
1699

1700
// decActiveAccounts() just subtracts one under lock.
1701
func (s *Server) decActiveAccounts() {
12,162✔
1702
        atomic.AddInt32(&s.activeAccounts, -1)
12,162✔
1703
}
12,162✔
1704

1705
// This should be used for testing only. Will be slow since we have to
1706
// range over all accounts in the sync.Map to count.
1707
func (s *Server) numAccounts() int {
6✔
1708
        count := 0
6✔
1709
        s.mu.RLock()
6✔
1710
        s.accounts.Range(func(k, v any) bool {
20✔
1711
                count++
14✔
1712
                return true
14✔
1713
        })
14✔
1714
        s.mu.RUnlock()
6✔
1715
        return count
6✔
1716
}
1717

1718
// NumLoadedAccounts returns the number of loaded accounts.
1719
func (s *Server) NumLoadedAccounts() int {
4✔
1720
        return s.numAccounts()
4✔
1721
}
4✔
1722

1723
// LookupOrRegisterAccount will return the given account if known or create a new entry.
1724
func (s *Server) LookupOrRegisterAccount(name string) (account *Account, isNew bool) {
2,216✔
1725
        s.mu.Lock()
2,216✔
1726
        defer s.mu.Unlock()
2,216✔
1727
        if v, ok := s.accounts.Load(name); ok {
2,216✔
1728
                return v.(*Account), false
×
1729
        }
×
1730
        acc := NewAccount(name)
2,216✔
1731
        s.registerAccountNoLock(acc)
2,216✔
1732
        return acc, true
2,216✔
1733
}
1734

1735
// RegisterAccount will register an account. The account must be new
1736
// or this call will fail.
1737
func (s *Server) RegisterAccount(name string) (*Account, error) {
849✔
1738
        s.mu.Lock()
849✔
1739
        defer s.mu.Unlock()
849✔
1740
        if _, ok := s.accounts.Load(name); ok {
978✔
1741
                return nil, ErrAccountExists
129✔
1742
        }
129✔
1743
        acc := NewAccount(name)
720✔
1744
        s.registerAccountNoLock(acc)
720✔
1745
        return acc, nil
720✔
1746
}
1747

1748
// SetSystemAccount will set the internal system account.
1749
// If root operators are present it will also check validity.
1750
func (s *Server) SetSystemAccount(accName string) error {
6,150✔
1751
        // Lookup from sync.Map first.
6,150✔
1752
        if v, ok := s.accounts.Load(accName); ok {
12,298✔
1753
                return s.setSystemAccount(v.(*Account))
6,148✔
1754
        }
6,148✔
1755

1756
        // If we are here we do not have local knowledge of this account.
1757
        // Do this one by hand to return more useful error.
1758
        ac, jwt, err := s.fetchAccountClaims(accName)
2✔
1759
        if err != nil {
3✔
1760
                return err
1✔
1761
        }
1✔
1762
        acc := s.buildInternalAccount(ac)
1✔
1763
        acc.claimJWT = jwt
1✔
1764
        // Due to race, we need to make sure that we are not
1✔
1765
        // registering twice.
1✔
1766
        if racc := s.registerAccount(acc); racc != nil {
1✔
1767
                return nil
×
1768
        }
×
1769
        return s.setSystemAccount(acc)
1✔
1770
}
1771

1772
// SystemAccount returns the system account if set.
1773
func (s *Server) SystemAccount() *Account {
397,788✔
1774
        return s.sysAcc.Load()
397,788✔
1775
}
397,788✔
1776

1777
// GlobalAccount returns the global account.
1778
// Default clients will use the global account.
1779
func (s *Server) GlobalAccount() *Account {
7,539✔
1780
        s.mu.RLock()
7,539✔
1781
        defer s.mu.RUnlock()
7,539✔
1782
        return s.gacc
7,539✔
1783
}
7,539✔
1784

1785
// SetDefaultSystemAccount will create a default system account if one is not present.
1786
func (s *Server) SetDefaultSystemAccount() error {
2,205✔
1787
        if _, isNew := s.LookupOrRegisterAccount(DEFAULT_SYSTEM_ACCOUNT); !isNew {
2,205✔
1788
                return nil
×
1789
        }
×
1790
        s.Debugf("Created system account: %q", DEFAULT_SYSTEM_ACCOUNT)
2,205✔
1791
        return s.SetSystemAccount(DEFAULT_SYSTEM_ACCOUNT)
2,205✔
1792
}
1793

1794
// Assign a system account. Should only be called once.
1795
// This sets up a server to send and receive messages from
1796
// inside the server itself.
1797
func (s *Server) setSystemAccount(acc *Account) error {
6,169✔
1798
        if acc == nil {
6,169✔
1799
                return ErrMissingAccount
×
1800
        }
×
1801
        // Don't try to fix this here.
1802
        if acc.IsExpired() {
6,169✔
1803
                return ErrAccountExpired
×
1804
        }
×
1805
        // If we are running with trusted keys for an operator
1806
        // make sure we check the account is legit.
1807
        if !s.isTrustedIssuer(acc.Issuer) {
6,241✔
1808
                return ErrAccountValidation
72✔
1809
        }
72✔
1810

1811
        s.mu.Lock()
6,097✔
1812

6,097✔
1813
        if s.sys != nil {
6,099✔
1814
                s.mu.Unlock()
2✔
1815
                return ErrAccountExists
2✔
1816
        }
2✔
1817

1818
        // This is here in an attempt to quiet the race detector and not have to place
1819
        // locks on fast path for inbound messages and checking service imports.
1820
        acc.mu.Lock()
6,095✔
1821
        if acc.imports.services == nil {
12,190✔
1822
                acc.imports.services = make(map[string][]*serviceImport)
6,095✔
1823
        }
6,095✔
1824
        acc.mu.Unlock()
6,095✔
1825

6,095✔
1826
        s.sys = &internal{
6,095✔
1827
                account: acc,
6,095✔
1828
                client:  s.createInternalSystemClient(),
6,095✔
1829
                seq:     1,
6,095✔
1830
                sid:     1,
6,095✔
1831
                servers: make(map[string]*serverUpdate),
6,095✔
1832
                replies: make(map[string]msgHandler),
6,095✔
1833
                sendq:   newIPQueue[*pubMsg](s, "System sendQ"),
6,095✔
1834
                recvq:   newIPQueue[*inSysMsg](s, "System recvQ"),
6,095✔
1835
                recvqp:  newIPQueue[*inSysMsg](s, "System recvQ Pings"),
6,095✔
1836
                resetCh: make(chan struct{}),
6,095✔
1837
                sq:      s.newSendQ(acc),
6,095✔
1838
                statsz:  statsHBInterval,
6,095✔
1839
                orphMax: 5 * eventsHBInterval,
6,095✔
1840
                chkOrph: 3 * eventsHBInterval,
6,095✔
1841
        }
6,095✔
1842
        recvq, recvqp := s.sys.recvq, s.sys.recvqp
6,095✔
1843
        s.sys.wg.Add(1)
6,095✔
1844
        s.mu.Unlock()
6,095✔
1845

6,095✔
1846
        // Store in atomic for fast lookup.
6,095✔
1847
        s.sysAcc.Store(acc)
6,095✔
1848

6,095✔
1849
        // Register with the account.
6,095✔
1850
        s.sys.client.registerWithAccount(acc)
6,095✔
1851

6,095✔
1852
        s.addSystemAccountExports(acc)
6,095✔
1853

6,095✔
1854
        // Start our internal loop to serialize outbound messages.
6,095✔
1855
        // We do our own wg here since we will stop first during shutdown.
6,095✔
1856
        go s.internalSendLoop(&s.sys.wg)
6,095✔
1857

6,095✔
1858
        // Start the internal loop for inbound messages.
6,095✔
1859
        go s.internalReceiveLoop(recvq)
6,095✔
1860
        // Start the internal loop for inbound STATSZ/Ping messages.
6,095✔
1861
        go s.internalReceiveLoop(recvqp)
6,095✔
1862

6,095✔
1863
        // Start up our general subscriptions
6,095✔
1864
        s.initEventTracking()
6,095✔
1865

6,095✔
1866
        // Track for dead remote servers.
6,095✔
1867
        s.wrapChk(s.startRemoteServerSweepTimer)()
6,095✔
1868

6,095✔
1869
        // Send out statsz updates periodically.
6,095✔
1870
        s.wrapChk(s.startStatszTimer)()
6,095✔
1871

6,095✔
1872
        // If we have existing accounts make sure we enable account tracking.
6,095✔
1873
        s.mu.Lock()
6,095✔
1874
        s.accounts.Range(func(k, v any) bool {
20,041✔
1875
                acc := v.(*Account)
13,946✔
1876
                s.enableAccountTracking(acc)
13,946✔
1877
                return true
13,946✔
1878
        })
13,946✔
1879
        s.mu.Unlock()
6,095✔
1880

6,095✔
1881
        return nil
6,095✔
1882
}
1883

1884
// Creates an internal system client.
1885
func (s *Server) createInternalSystemClient() *client {
30,583✔
1886
        return s.createInternalClient(SYSTEM)
30,583✔
1887
}
30,583✔
1888

1889
// Creates an internal jetstream client.
1890
func (s *Server) createInternalJetStreamClient() *client {
47,598✔
1891
        return s.createInternalClient(JETSTREAM)
47,598✔
1892
}
47,598✔
1893

1894
// Creates an internal client for Account.
1895
func (s *Server) createInternalAccountClient() *client {
15,244✔
1896
        return s.createInternalClient(ACCOUNT)
15,244✔
1897
}
15,244✔
1898

1899
// Internal clients. kind should be SYSTEM, JETSTREAM or ACCOUNT
1900
func (s *Server) createInternalClient(kind int) *client {
93,425✔
1901
        if !isInternalClient(kind) {
93,425✔
1902
                return nil
×
1903
        }
×
1904
        now := time.Now()
93,425✔
1905
        c := &client{srv: s, kind: kind, opts: internalOpts, msubs: -1, mpay: -1, start: now, last: now}
93,425✔
1906
        c.initClient()
93,425✔
1907
        c.echo = false
93,425✔
1908
        c.headers = true
93,425✔
1909
        c.flags.set(noReconnect)
93,425✔
1910
        return c
93,425✔
1911
}
1912

1913
// Determine if accounts should track subscriptions for
1914
// efficient propagation.
1915
// Lock should be held on entry.
1916
func (s *Server) shouldTrackSubscriptions() bool {
16,674✔
1917
        opts := s.getOpts()
16,674✔
1918
        return (opts.Cluster.Port != 0 || opts.Gateway.Port != 0)
16,674✔
1919
}
16,674✔
1920

1921
// Invokes registerAccountNoLock under the protection of the server lock.
1922
// That is, server lock is acquired/released in this function.
1923
// See registerAccountNoLock for comment on returned value.
1924
func (s *Server) registerAccount(acc *Account) *Account {
1,424✔
1925
        s.mu.Lock()
1,424✔
1926
        racc := s.registerAccountNoLock(acc)
1,424✔
1927
        s.mu.Unlock()
1,424✔
1928
        return racc
1,424✔
1929
}
1,424✔
1930

1931
// Helper to set the sublist based on preferences.
1932
func (s *Server) setAccountSublist(acc *Account) {
18,098✔
1933
        if acc != nil && acc.sl == nil {
34,910✔
1934
                acc.sl = NewSublistForServer(s)
16,812✔
1935
        }
16,812✔
1936
}
1937

1938
// Registers an account in the server.
1939
// Due to some locking considerations, we may end-up trying
1940
// to register the same account twice. This function will
1941
// then return the already registered account.
1942
// Lock should be held on entry.
1943
func (s *Server) registerAccountNoLock(acc *Account) *Account {
16,821✔
1944
        // We are under the server lock. Lookup from map, if present
16,821✔
1945
        // return existing account.
16,821✔
1946
        if a, _ := s.accounts.Load(acc.Name); a != nil {
16,968✔
1947
                s.tmpAccounts.Delete(acc.Name)
147✔
1948
                return a.(*Account)
147✔
1949
        }
147✔
1950
        // Finish account setup and store.
1951
        s.setAccountSublist(acc)
16,674✔
1952

16,674✔
1953
        acc.mu.Lock()
16,674✔
1954
        s.setRouteInfo(acc)
16,674✔
1955
        if acc.clients == nil {
33,340✔
1956
                acc.clients = make(map[*client]struct{})
16,666✔
1957
        }
16,666✔
1958

1959
        // If we are capable of routing we will track subscription
1960
        // information for efficient interest propagation.
1961
        // During config reload, it is possible that account was
1962
        // already created (global account), so use locking and
1963
        // make sure we create only if needed.
1964
        // TODO(dlc)- Double check that we need this for GWs.
1965
        if acc.rm == nil && s.opts != nil && s.shouldTrackSubscriptions() {
27,832✔
1966
                acc.rm = make(map[string]int32)
11,158✔
1967
                acc.lqws = make(map[string]int32)
11,158✔
1968
        }
11,158✔
1969
        acc.srv = s
16,674✔
1970
        acc.updated = time.Now()
16,674✔
1971
        accName := acc.Name
16,674✔
1972
        jsEnabled := len(acc.jsLimits) > 0
16,674✔
1973
        acc.mu.Unlock()
16,674✔
1974

16,674✔
1975
        if opts := s.getOpts(); opts != nil && len(opts.JsAccDefaultDomain) > 0 {
16,734✔
1976
                if defDomain, ok := opts.JsAccDefaultDomain[accName]; ok {
80✔
1977
                        if jsEnabled {
23✔
1978
                                s.Warnf("Skipping Default Domain %q, set for JetStream enabled account %q", defDomain, accName)
3✔
1979
                        } else if defDomain != _EMPTY_ {
29✔
1980
                                for src, dest := range generateJSMappingTable(defDomain) {
90✔
1981
                                        // flip src and dest around so the domain is inserted
81✔
1982
                                        s.Noticef("Adding default domain mapping %q -> %q to account %q %p", dest, src, accName, acc)
81✔
1983
                                        if err := acc.AddMapping(dest, src); err != nil {
81✔
1984
                                                s.Errorf("Error adding JetStream default domain mapping: %v", err)
×
1985
                                        }
×
1986
                                }
1987
                        }
1988
                }
1989
        }
1990

1991
        s.accounts.Store(acc.Name, acc)
16,674✔
1992
        s.tmpAccounts.Delete(acc.Name)
16,674✔
1993
        s.enableAccountTracking(acc)
16,674✔
1994

16,674✔
1995
        // Can not have server lock here.
16,674✔
1996
        s.mu.Unlock()
16,674✔
1997
        s.registerSystemImports(acc)
16,674✔
1998
        // Starting 2.9.0, we are phasing out the optimistic mode, so change
16,674✔
1999
        // the account to interest-only mode (except if instructed not to do
16,674✔
2000
        // it in some tests).
16,674✔
2001
        if s.gateway.enabled && !gwDoNotForceInterestOnlyMode {
19,564✔
2002
                s.switchAccountToInterestMode(acc.GetName())
2,890✔
2003
        }
2,890✔
2004
        s.mu.Lock()
16,674✔
2005

16,674✔
2006
        return nil
16,674✔
2007
}
2008

2009
// Sets the account's routePoolIdx depending on presence or not of
2010
// pooling or per-account routes. Also updates a map used by
2011
// gateway code to retrieve a route based on some route hash.
2012
//
2013
// Both Server and Account lock held on entry.
2014
func (s *Server) setRouteInfo(acc *Account) {
16,710✔
2015
        // If there is a dedicated route configured for this account
16,710✔
2016
        if _, ok := s.accRoutes[acc.Name]; ok {
20,569✔
2017
                // We want the account name to be in the map, but we don't
3,859✔
2018
                // need a value (we could store empty string)
3,859✔
2019
                s.accRouteByHash.Store(acc.Name, nil)
3,859✔
2020
                // Set the route pool index to -1 so that it is easy when
3,859✔
2021
                // ranging over accounts to exclude those accounts when
3,859✔
2022
                // trying to get accounts for a given pool index.
3,859✔
2023
                acc.routePoolIdx = accDedicatedRoute
3,859✔
2024
        } else {
16,710✔
2025
                // If pool size more than 1, we will compute a hash code and
12,851✔
2026
                // use modulo to assign to an index of the pool slice. For 1
12,851✔
2027
                // and below, all accounts will be bound to the single connection
12,851✔
2028
                // at index 0.
12,851✔
2029
                acc.routePoolIdx = computeRoutePoolIdx(s.routesPoolSize, acc.Name)
12,851✔
2030
                if s.routesPoolSize > 1 {
17,952✔
2031
                        s.accRouteByHash.Store(acc.Name, acc.routePoolIdx)
5,101✔
2032
                }
5,101✔
2033
        }
2034
}
2035

2036
// lookupAccount is a function to return the account structure
2037
// associated with an account name.
2038
// Lock MUST NOT be held upon entry.
2039
func (s *Server) lookupAccount(name string) (*Account, error) {
2,984,479✔
2040
        return s.lookupOrFetchAccount(name, true)
2,984,479✔
2041
}
2,984,479✔
2042

2043
// lookupOrFetchAccount is a function to return the account structure
2044
// associated with an account name.
2045
// Lock MUST NOT be held upon entry.
2046
func (s *Server) lookupOrFetchAccount(name string, fetch bool) (*Account, error) {
3,215,282✔
2047
        var acc *Account
3,215,282✔
2048
        if v, ok := s.accounts.Load(name); ok {
6,428,067✔
2049
                acc = v.(*Account)
3,212,785✔
2050
        }
3,212,785✔
2051
        if acc != nil {
6,428,067✔
2052
                // If we are expired and we have a resolver, then
3,212,785✔
2053
                // return the latest information from the resolver.
3,212,785✔
2054
                if acc.IsExpired() {
3,212,795✔
2055
                        s.Debugf("Requested account [%s] has expired", name)
10✔
2056
                        if s.AccountResolver() != nil && fetch {
20✔
2057
                                if err := s.updateAccount(acc); err != nil {
12✔
2058
                                        // This error could mask expired, so just return expired here.
2✔
2059
                                        return nil, ErrAccountExpired
2✔
2060
                                }
2✔
2061
                        } else {
×
2062
                                return nil, ErrAccountExpired
×
2063
                        }
×
2064
                }
2065
                return acc, nil
3,212,783✔
2066
        }
2067
        // If we have a resolver see if it can fetch the account.
2068
        if s.AccountResolver() == nil || !fetch {
3,621✔
2069
                return nil, ErrMissingAccount
1,124✔
2070
        }
1,124✔
2071
        return s.fetchAccount(name)
1,373✔
2072
}
2073

2074
// LookupAccount is a public function to return the account structure
2075
// associated with name.
2076
func (s *Server) LookupAccount(name string) (*Account, error) {
2,826,965✔
2077
        return s.lookupAccount(name)
2,826,965✔
2078
}
2,826,965✔
2079

2080
// This will fetch new claims and if found update the account with new claims.
2081
// Lock MUST NOT be held upon entry.
2082
func (s *Server) updateAccount(acc *Account) error {
10✔
2083
        acc.mu.RLock()
10✔
2084
        // TODO(dlc) - Make configurable
10✔
2085
        if !acc.incomplete && time.Since(acc.updated) < time.Second {
10✔
2086
                acc.mu.RUnlock()
×
2087
                s.Debugf("Requested account update for [%s] ignored, too soon", acc.Name)
×
2088
                return ErrAccountResolverUpdateTooSoon
×
2089
        }
×
2090
        acc.mu.RUnlock()
10✔
2091
        claimJWT, err := s.fetchRawAccountClaims(acc.Name)
10✔
2092
        if err != nil {
12✔
2093
                return err
2✔
2094
        }
2✔
2095
        return s.updateAccountWithClaimJWT(acc, claimJWT)
8✔
2096
}
2097

2098
// updateAccountWithClaimJWT will check and apply the claim update.
2099
// Lock MUST NOT be held upon entry.
2100
func (s *Server) updateAccountWithClaimJWT(acc *Account, claimJWT string) error {
330✔
2101
        if acc == nil {
330✔
2102
                return ErrMissingAccount
×
2103
        }
×
2104
        acc.mu.RLock()
330✔
2105
        sameClaim := acc.claimJWT != _EMPTY_ && acc.claimJWT == claimJWT && !acc.incomplete
330✔
2106
        acc.mu.RUnlock()
330✔
2107
        if sameClaim {
509✔
2108
                s.Debugf("Requested account update for [%s], same claims detected", acc.Name)
179✔
2109
                return nil
179✔
2110
        }
179✔
2111
        accClaims, _, err := s.verifyAccountClaims(claimJWT)
151✔
2112
        if err == nil && accClaims != nil {
302✔
2113
                acc.mu.Lock()
151✔
2114
                // if an account is updated with a different operator signing key, we want to
151✔
2115
                // show a consistent issuer.
151✔
2116
                acc.Issuer = accClaims.Issuer
151✔
2117
                if acc.Name != accClaims.Subject {
152✔
2118
                        acc.mu.Unlock()
1✔
2119
                        return ErrAccountValidation
1✔
2120
                }
1✔
2121
                acc.mu.Unlock()
150✔
2122
                s.UpdateAccountClaims(acc, accClaims)
150✔
2123
                acc.mu.Lock()
150✔
2124
                // needs to be set after update completed.
150✔
2125
                // This causes concurrent calls to return with sameClaim=true if the change is effective.
150✔
2126
                acc.claimJWT = claimJWT
150✔
2127
                acc.mu.Unlock()
150✔
2128
                return nil
150✔
2129
        }
2130
        return err
×
2131
}
2132

2133
// fetchRawAccountClaims will grab raw account claims iff we have a resolver.
2134
// Lock is NOT held upon entry.
2135
func (s *Server) fetchRawAccountClaims(name string) (string, error) {
1,507✔
2136
        accResolver := s.AccountResolver()
1,507✔
2137
        if accResolver == nil {
1,507✔
2138
                return _EMPTY_, ErrNoAccountResolver
×
2139
        }
×
2140
        // Need to do actual Fetch
2141
        start := time.Now()
1,507✔
2142
        claimJWT, err := fetchAccount(accResolver, name)
1,507✔
2143
        fetchTime := time.Since(start)
1,507✔
2144
        if fetchTime > time.Second {
1,511✔
2145
                s.Warnf("Account [%s] fetch took %v", name, fetchTime)
4✔
2146
        } else {
1,507✔
2147
                s.Debugf("Account [%s] fetch took %v", name, fetchTime)
1,503✔
2148
        }
1,503✔
2149
        if err != nil {
1,551✔
2150
                s.Warnf("Account fetch failed: %v", err)
44✔
2151
                return "", err
44✔
2152
        }
44✔
2153
        return claimJWT, nil
1,463✔
2154
}
2155

2156
// fetchAccountClaims will attempt to fetch new claims if a resolver is present.
2157
// Lock is NOT held upon entry.
2158
func (s *Server) fetchAccountClaims(name string) (*jwt.AccountClaims, string, error) {
1,497✔
2159
        claimJWT, err := s.fetchRawAccountClaims(name)
1,497✔
2160
        if err != nil {
1,539✔
2161
                return nil, _EMPTY_, err
42✔
2162
        }
42✔
2163
        var claim *jwt.AccountClaims
1,455✔
2164
        claim, claimJWT, err = s.verifyAccountClaims(claimJWT)
1,455✔
2165
        if claim != nil && claim.Subject != name {
1,455✔
2166
                return nil, _EMPTY_, ErrAccountValidation
×
2167
        }
×
2168
        return claim, claimJWT, err
1,455✔
2169
}
2170

2171
// verifyAccountClaims will decode and validate any account claims.
2172
func (s *Server) verifyAccountClaims(claimJWT string) (*jwt.AccountClaims, string, error) {
2,104✔
2173
        accClaims, err := jwt.DecodeAccountClaims(claimJWT)
2,104✔
2174
        if err != nil {
2,105✔
2175
                return nil, _EMPTY_, err
1✔
2176
        }
1✔
2177
        if !s.isTrustedIssuer(accClaims.Issuer) {
2,110✔
2178
                return nil, _EMPTY_, ErrAccountValidation
7✔
2179
        }
7✔
2180
        vr := jwt.CreateValidationResults()
2,096✔
2181
        accClaims.Validate(vr)
2,096✔
2182
        if vr.IsBlocking(true) {
2,102✔
2183
                return nil, _EMPTY_, ErrAccountValidation
6✔
2184
        }
6✔
2185
        return accClaims, claimJWT, nil
2,090✔
2186
}
2187

2188
// This will fetch an account from a resolver if defined.
2189
// Lock is NOT held upon entry.
2190
func (s *Server) fetchAccount(name string) (*Account, error) {
1,476✔
2191
        accClaims, claimJWT, err := s.fetchAccountClaims(name)
1,476✔
2192
        if accClaims == nil {
1,529✔
2193
                return nil, err
53✔
2194
        }
53✔
2195
        acc := s.buildInternalAccount(accClaims)
1,423✔
2196
        // Due to possible race, if registerAccount() returns a non
1,423✔
2197
        // nil account, it means the same account was already
1,423✔
2198
        // registered and we should use this one.
1,423✔
2199
        if racc := s.registerAccount(acc); racc != nil {
1,561✔
2200
                // Update with the new claims in case they are new.
138✔
2201
                if err = s.updateAccountWithClaimJWT(racc, claimJWT); err != nil {
138✔
2202
                        return nil, err
×
2203
                }
×
2204
                return racc, nil
138✔
2205
        }
2206
        // The sub imports may have been setup but will not have had their
2207
        // subscriptions properly setup. Do that here.
2208
        var needImportSubs bool
1,285✔
2209

1,285✔
2210
        acc.mu.Lock()
1,285✔
2211
        acc.claimJWT = claimJWT
1,285✔
2212
        if len(acc.imports.services) > 0 {
2,200✔
2213
                if acc.ic == nil {
1,652✔
2214
                        acc.ic = s.createInternalAccountClient()
737✔
2215
                        acc.ic.acc = acc
737✔
2216
                }
737✔
2217
                needImportSubs = true
915✔
2218
        }
2219
        acc.mu.Unlock()
1,285✔
2220

1,285✔
2221
        // Do these outside the lock.
1,285✔
2222
        if needImportSubs {
2,200✔
2223
                acc.addAllServiceImportSubs()
915✔
2224
        }
915✔
2225

2226
        return acc, nil
1,285✔
2227
}
2228

2229
// Start up the server, this will not block.
2230
//
2231
// WaitForShutdown can be used to block and wait for the server to shutdown properly if needed
2232
// after calling s.Shutdown()
2233
func (s *Server) Start() {
6,771✔
2234
        s.Noticef("Starting nats-server")
6,771✔
2235

6,771✔
2236
        gc := gitCommit
6,771✔
2237
        if gc == _EMPTY_ {
13,542✔
2238
                gc = "not set"
6,771✔
2239
        }
6,771✔
2240

2241
        // Snapshot server options.
2242
        opts := s.getOpts()
6,771✔
2243

6,771✔
2244
        // Capture if this server is a leaf that has no cluster, so we don't
6,771✔
2245
        // display the cluster name if that is the case.
6,771✔
2246
        s.mu.RLock()
6,771✔
2247
        leafNoCluster := s.leafNoCluster
6,771✔
2248
        s.mu.RUnlock()
6,771✔
2249

6,771✔
2250
        var clusterName string
6,771✔
2251
        if !leafNoCluster {
12,535✔
2252
                clusterName = s.ClusterName()
5,764✔
2253
        }
5,764✔
2254

2255
        s.Noticef("  Version:  %s", VERSION)
6,771✔
2256
        s.Noticef("  Git:      [%s]", gc)
6,771✔
2257
        s.Debugf("  Go build: %s", s.info.GoVersion)
6,771✔
2258
        if clusterName != _EMPTY_ {
11,630✔
2259
                s.Noticef("  Cluster:  %s", clusterName)
4,859✔
2260
        }
4,859✔
2261
        s.Noticef("  Name:     %s", s.info.Name)
6,771✔
2262
        if opts.JetStream {
11,211✔
2263
                s.Noticef("  Node:     %s", getHash(s.info.Name))
4,440✔
2264
        }
4,440✔
2265
        s.Noticef("  ID:       %s", s.info.ID)
6,771✔
2266
        s.printFeatureFlags(opts)
6,771✔
2267

6,771✔
2268
        defer s.Noticef("Server is ready")
6,771✔
2269

6,771✔
2270
        // Check for insecure configurations.
6,771✔
2271
        s.checkAuthforWarnings()
6,771✔
2272

6,771✔
2273
        // Avoid RACE between Start() and Shutdown()
6,771✔
2274
        s.running.Store(true)
6,771✔
2275
        s.mu.Lock()
6,771✔
2276
        // Update leafNodeEnabled in case options have changed post NewServer()
6,771✔
2277
        // and before Start() (we should not be able to allow that, but server has
6,771✔
2278
        // direct reference to user-provided options - at least before a Reload() is
6,771✔
2279
        // performed.
6,771✔
2280
        s.leafNodeEnabled = opts.LeafNode.Port != 0 || len(opts.LeafNode.Remotes) > 0
6,771✔
2281
        s.mu.Unlock()
6,771✔
2282

6,771✔
2283
        s.grMu.Lock()
6,771✔
2284
        s.grRunning = true
6,771✔
2285
        s.grMu.Unlock()
6,771✔
2286

6,771✔
2287
        s.startRateLimitLogExpiration()
6,771✔
2288

6,771✔
2289
        // Pprof http endpoint for the profiler.
6,771✔
2290
        if opts.ProfPort != 0 {
6,772✔
2291
                s.StartProfiler()
1✔
2292
        } else {
6,771✔
2293
                // It's still possible to access this profile via a SYS endpoint, so set
6,770✔
2294
                // this anyway. (Otherwise StartProfiler would have called it.)
6,770✔
2295
                s.setBlockProfileRate(opts.ProfBlockRate)
6,770✔
2296
        }
6,770✔
2297

2298
        if opts.ConfigFile != _EMPTY_ {
11,645✔
2299
                var cd string
4,874✔
2300
                if opts.configDigest != "" {
9,748✔
2301
                        cd = fmt.Sprintf("(%s)", opts.configDigest)
4,874✔
2302
                }
4,874✔
2303
                s.Noticef("Using configuration file: %s %s", opts.ConfigFile, cd)
4,874✔
2304
        }
2305

2306
        hasOperators := len(opts.TrustedOperators) > 0
6,771✔
2307
        if hasOperators {
7,070✔
2308
                s.Noticef("Trusted Operators")
299✔
2309
        }
299✔
2310
        for _, opc := range opts.TrustedOperators {
7,070✔
2311
                s.Noticef("  System  : %q", opc.Audience)
299✔
2312
                s.Noticef("  Operator: %q", opc.Name)
299✔
2313
                s.Noticef("  Issued  : %v", time.Unix(opc.IssuedAt, 0))
299✔
2314
                switch opc.Expires {
299✔
2315
                case 0:
12✔
2316
                        s.Noticef("  Expires : Never")
12✔
2317
                default:
287✔
2318
                        s.Noticef("  Expires : %v", time.Unix(opc.Expires, 0))
287✔
2319
                }
2320
        }
2321
        if hasOperators && opts.SystemAccount == _EMPTY_ {
6,809✔
2322
                s.Warnf("Trusted Operators should utilize a System Account")
38✔
2323
        }
38✔
2324
        if opts.MaxPayload > MAX_PAYLOAD_MAX_SIZE {
6,773✔
2325
                s.Warnf("Maximum payloads over %v are generally discouraged and could lead to poor performance",
2✔
2326
                        friendlyBytes(int64(MAX_PAYLOAD_MAX_SIZE)))
2✔
2327
        }
2✔
2328

2329
        if len(opts.JsAccDefaultDomain) > 0 {
6,791✔
2330
                s.Warnf("The option `default_js_domain` is a temporary backwards compatibility measure and will be removed")
20✔
2331
        }
20✔
2332

2333
        // If we have a memory resolver, check the accounts here for validation exceptions.
2334
        // This allows them to be logged right away vs when they are accessed via a client.
2335
        if hasOperators && len(opts.resolverPreloads) > 0 {
6,936✔
2336
                s.checkResolvePreloads()
165✔
2337
        }
165✔
2338

2339
        // Log the pid to a file.
2340
        if opts.PidFile != _EMPTY_ {
6,772✔
2341
                if err := s.logPid(); err != nil {
1✔
2342
                        s.Fatalf("Could not write pidfile: %v", err)
×
2343
                        return
×
2344
                }
×
2345
        }
2346

2347
        // Setup system account which will start the eventing stack.
2348
        if sa := opts.SystemAccount; sa != _EMPTY_ {
10,710✔
2349
                if err := s.SetSystemAccount(sa); err != nil {
3,939✔
2350
                        s.Fatalf("Can't set system account: %v", err)
×
2351
                        return
×
2352
                }
×
2353
        } else if !opts.NoSystemAccount {
5,036✔
2354
                // We will create a default system account here.
2,204✔
2355
                s.SetDefaultSystemAccount()
2,204✔
2356
        }
2,204✔
2357

2358
        // Start monitoring before enabling other subsystems of the
2359
        // server to be able to monitor during startup.
2360
        if err := s.StartMonitoring(); err != nil {
6,772✔
2361
                s.Fatalf("Can't start monitoring: %v", err)
1✔
2362
                return
1✔
2363
        }
1✔
2364

2365
        // Start up resolver machinery.
2366
        if ar := s.AccountResolver(); ar != nil {
7,126✔
2367
                if err := ar.Start(s); err != nil {
356✔
2368
                        s.Fatalf("Could not start resolver: %v", err)
×
2369
                        return
×
2370
                }
×
2371
                // In operator mode, when the account resolver depends on an external system and
2372
                // the system account is the bootstrapping account, start fetching it.
2373
                if len(opts.TrustedOperators) == 1 && opts.SystemAccount != _EMPTY_ && opts.SystemAccount != DEFAULT_SYSTEM_ACCOUNT {
617✔
2374
                        opts := s.getOpts()
261✔
2375
                        _, isMemResolver := ar.(*MemAccResolver)
261✔
2376
                        if v, ok := s.accounts.Load(opts.SystemAccount); !isMemResolver && ok && v.(*Account).claimJWT == _EMPTY_ {
325✔
2377
                                s.Noticef("Using bootstrapping system account")
64✔
2378
                                s.startGoRoutine(func() {
128✔
2379
                                        defer s.grWG.Done()
64✔
2380
                                        t := time.NewTicker(time.Second)
64✔
2381
                                        defer t.Stop()
64✔
2382
                                        for {
170✔
2383
                                                select {
106✔
2384
                                                case <-s.quitCh:
41✔
2385
                                                        return
41✔
2386
                                                case <-t.C:
65✔
2387
                                                        sacc := s.SystemAccount()
65✔
2388
                                                        if claimJWT, err := fetchAccount(ar, opts.SystemAccount); err != nil {
107✔
2389
                                                                continue
42✔
2390
                                                        } else if err = s.updateAccountWithClaimJWT(sacc, claimJWT); err != nil {
23✔
2391
                                                                continue
×
2392
                                                        }
2393
                                                        s.Noticef("System account fetched and updated")
23✔
2394
                                                        return
23✔
2395
                                                }
2396
                                        }
2397
                                })
2398
                        }
2399
                }
2400
        }
2401

2402
        // Start expiration of mapped GW replies, regardless if
2403
        // this server is configured with gateway or not.
2404
        s.startGWReplyMapExpiration()
6,770✔
2405

6,770✔
2406
        // Check if JetStream has been enabled. This needs to be after
6,770✔
2407
        // the system account setup above. JetStream will create its
6,770✔
2408
        // own system account if one is not present.
6,770✔
2409
        if opts.JetStream {
11,210✔
2410
                // Make sure someone is not trying to enable on the system account.
4,440✔
2411
                if sa := s.SystemAccount(); sa != nil && len(sa.jsLimits) > 0 {
4,440✔
2412
                        s.Fatalf("Not allowed to enable JetStream on the system account")
×
2413
                }
×
2414
                cfg := &JetStreamConfig{
4,440✔
2415
                        StoreDir:     opts.StoreDir,
4,440✔
2416
                        SyncInterval: opts.SyncInterval,
4,440✔
2417
                        SyncAlways:   opts.SyncAlways,
4,440✔
2418
                        Strict:       !opts.NoJetStreamStrict,
4,440✔
2419
                        MaxMemory:    opts.JetStreamMaxMemory,
4,440✔
2420
                        MaxStore:     opts.JetStreamMaxStore,
4,440✔
2421
                        Domain:       opts.JetStreamDomain,
4,440✔
2422
                        CompressOK:   true,
4,440✔
2423
                        UniqueTag:    opts.JetStreamUniqueTag,
4,440✔
2424
                }
4,440✔
2425
                if err := s.EnableJetStream(cfg); err != nil {
4,442✔
2426
                        s.Fatalf("Can't start JetStream: %v", err)
2✔
2427
                        return
2✔
2428
                }
2✔
2429
        } else {
2,330✔
2430
                // Check to see if any configured accounts have JetStream enabled.
2,330✔
2431
                sa, ga := s.SystemAccount(), s.GlobalAccount()
2,330✔
2432
                var hasSys, hasGlobal bool
2,330✔
2433
                var total int
2,330✔
2434

2,330✔
2435
                s.accounts.Range(func(k, v any) bool {
7,547✔
2436
                        total++
5,217✔
2437
                        acc := v.(*Account)
5,217✔
2438
                        if acc == sa {
6,847✔
2439
                                hasSys = true
1,630✔
2440
                        } else if acc == ga {
7,547✔
2441
                                hasGlobal = true
2,330✔
2442
                        }
2,330✔
2443
                        acc.mu.RLock()
5,217✔
2444
                        hasJs := len(acc.jsLimits) > 0
5,217✔
2445
                        acc.mu.RUnlock()
5,217✔
2446
                        if hasJs {
5,242✔
2447
                                s.checkJetStreamExports()
25✔
2448
                                acc.enableAllJetStreamServiceImportsAndMappings()
25✔
2449
                        }
25✔
2450
                        return true
5,217✔
2451
                })
2452
                // If we only have the system account and the global account and we are not standalone,
2453
                // go ahead and enable JS on $G in case we are in simple mixed mode setup.
2454
                if total == 2 && hasSys && hasGlobal && !s.standAloneMode() {
2,861✔
2455
                        ga.mu.Lock()
531✔
2456
                        ga.jsLimits = map[string]JetStreamAccountLimits{
531✔
2457
                                _EMPTY_: dynamicJSAccountLimits,
531✔
2458
                        }
531✔
2459
                        ga.mu.Unlock()
531✔
2460
                        s.checkJetStreamExports()
531✔
2461
                        ga.enableAllJetStreamServiceImportsAndMappings()
531✔
2462
                }
531✔
2463
        }
2464

2465
        // Delayed API response handling. Start regardless of JetStream being
2466
        // currently configured or not (since it can be enabled/disabled with
2467
        // configuration reload).
2468
        s.startGoRoutine(s.delayedAPIResponder)
6,768✔
2469

6,768✔
2470
        // Start OCSP Stapling monitoring for TLS certificates if enabled. Hook TLS handshake for
6,768✔
2471
        // OCSP check on peers (LEAF and CLIENT kind) if enabled.
6,768✔
2472
        s.startOCSPMonitoring()
6,768✔
2473

6,768✔
2474
        // Configure OCSP Response Cache for peer OCSP checks if enabled.
6,768✔
2475
        s.initOCSPResponseCache()
6,768✔
2476

6,768✔
2477
        // Start up gateway if needed. Do this before starting the routes, because
6,768✔
2478
        // we want to resolve the gateway host:port so that this information can
6,768✔
2479
        // be sent to other routes.
6,768✔
2480
        if opts.Gateway.Port != 0 {
7,897✔
2481
                s.startGateways()
1,129✔
2482
        }
1,129✔
2483

2484
        // Start websocket server if needed. Do this before starting the routes, and
2485
        // leaf node because we want to resolve the gateway host:port so that this
2486
        // information can be sent to other routes.
2487
        if opts.Websocket.Port != 0 {
6,889✔
2488
                s.startWebsocketServer()
121✔
2489
        }
121✔
2490

2491
        // Start up listen if we want to accept leaf node connections.
2492
        if opts.LeafNode.Port != 0 {
10,706✔
2493
                // Will resolve or assign the advertise address for the leafnode listener.
3,938✔
2494
                // We need that in StartRouting().
3,938✔
2495
                s.startLeafNodeAcceptLoop()
3,938✔
2496
        }
3,938✔
2497

2498
        // Solicit remote servers for leaf node connections.
2499
        if len(opts.LeafNode.Remotes) > 0 {
8,028✔
2500
                s.solicitLeafNodeRemotes(opts.LeafNode.Remotes)
1,260✔
2501
        }
1,260✔
2502

2503
        // TODO (ik): I wanted to refactor this by starting the client
2504
        // accept loop first, that is, it would resolve listen spec
2505
        // in place, but start the accept-for-loop in a different go
2506
        // routine. This would get rid of the synchronization between
2507
        // this function and StartRouting, which I also would have wanted
2508
        // to refactor, but both AcceptLoop() and StartRouting() have
2509
        // been exported and not sure if that would break users using them.
2510
        // We could mark them as deprecated and remove in a release or two...
2511

2512
        // The Routing routine needs to wait for the client listen
2513
        // port to be opened and potential ephemeral port selected.
2514
        clientListenReady := make(chan struct{})
6,768✔
2515

6,768✔
2516
        // MQTT
6,768✔
2517
        if opts.MQTT.Port != 0 {
7,038✔
2518
                s.startMQTT()
270✔
2519
        }
270✔
2520

2521
        // Start up routing as well if needed.
2522
        if opts.Cluster.Port != 0 {
11,394✔
2523
                s.startGoRoutine(func() {
9,252✔
2524
                        s.StartRouting(clientListenReady)
4,626✔
2525
                })
4,626✔
2526
        }
2527

2528
        if opts.PortsFileDir != _EMPTY_ {
6,770✔
2529
                s.logPorts()
2✔
2530
        }
2✔
2531

2532
        if opts.TLSRateLimit > 0 {
6,769✔
2533
                s.startGoRoutine(s.logRejectedTLSConns)
1✔
2534
        }
1✔
2535

2536
        // We've finished starting up.
2537
        close(s.startupComplete)
6,768✔
2538

6,768✔
2539
        // Wait for clients.
6,768✔
2540
        if !opts.DontListen {
13,536✔
2541
                s.AcceptLoop(clientListenReady)
6,768✔
2542
        }
6,768✔
2543

2544
        // Bring OSCP Response cache online after accept loop started in anticipation of NATS-enabled cache types
2545
        s.startOCSPResponseCache()
6,768✔
2546
}
2547

2548
func (s *Server) isShuttingDown() bool {
524,365✔
2549
        return s.shutdown.Load()
524,365✔
2550
}
524,365✔
2551

2552
// Shutdown will shutdown the server instance by kicking out the AcceptLoop
2553
// and closing all associated clients.
2554
func (s *Server) Shutdown() {
7,277✔
2555
        if s == nil {
7,281✔
2556
                return
4✔
2557
        }
4✔
2558
        // This is for JetStream R1 Pull Consumers to allow signaling
2559
        // that pending pull requests are invalid.
2560
        s.signalPullConsumers()
7,273✔
2561

7,273✔
2562
        // Transfer off any raft nodes that we are a leader by stepping them down.
7,273✔
2563
        s.stepdownRaftNodes()
7,273✔
2564

7,273✔
2565
        // Shutdown the eventing system as needed.
7,273✔
2566
        // This is done first to send out any messages for
7,273✔
2567
        // account status. We will also clean up any
7,273✔
2568
        // eventing items associated with accounts.
7,273✔
2569
        s.shutdownEventing()
7,273✔
2570

7,273✔
2571
        // Prevent issues with multiple calls.
7,273✔
2572
        if s.isShuttingDown() {
7,712✔
2573
                return
439✔
2574
        }
439✔
2575

2576
        s.mu.Lock()
6,834✔
2577
        s.Noticef("Initiating Shutdown...")
6,834✔
2578

6,834✔
2579
        accRes := s.accResolver
6,834✔
2580

6,834✔
2581
        opts := s.getOpts()
6,834✔
2582

6,834✔
2583
        s.shutdown.Store(true)
6,834✔
2584
        s.running.Store(false)
6,834✔
2585
        s.grMu.Lock()
6,834✔
2586
        s.grRunning = false
6,834✔
2587
        s.grMu.Unlock()
6,834✔
2588
        s.mu.Unlock()
6,834✔
2589

6,834✔
2590
        if accRes != nil {
7,244✔
2591
                accRes.Close()
410✔
2592
        }
410✔
2593

2594
        // Now check and shutdown jetstream.
2595
        s.shutdownJetStream()
6,834✔
2596

6,834✔
2597
        // Now shutdown the nodes
6,834✔
2598
        s.shutdownRaftNodes()
6,834✔
2599

6,834✔
2600
        s.mu.Lock()
6,834✔
2601
        conns := make(map[uint64]*client)
6,834✔
2602

6,834✔
2603
        // Copy off the clients
6,834✔
2604
        for i, c := range s.clients {
8,119✔
2605
                conns[i] = c
1,285✔
2606
        }
1,285✔
2607
        // Copy off the connections that are not yet registered
2608
        // in s.routes, but for which the readLoop has started
2609
        s.grMu.Lock()
6,834✔
2610
        for i, c := range s.grTmpClients {
6,855✔
2611
                conns[i] = c
21✔
2612
        }
21✔
2613
        s.grMu.Unlock()
6,834✔
2614
        // Copy off the routes
6,834✔
2615
        s.forEachRoute(func(r *client) {
24,132✔
2616
                r.mu.Lock()
17,298✔
2617
                conns[r.cid] = r
17,298✔
2618
                r.mu.Unlock()
17,298✔
2619
        })
17,298✔
2620
        // Copy off the gateways
2621
        s.getAllGatewayConnections(conns)
6,834✔
2622

6,834✔
2623
        // Copy off the leaf nodes
6,834✔
2624
        for i, c := range s.leafs {
7,479✔
2625
                conns[i] = c
645✔
2626
        }
645✔
2627

2628
        // Number of done channel responses we expect.
2629
        doneExpected := 0
6,834✔
2630

6,834✔
2631
        // Kick client AcceptLoop()
6,834✔
2632
        if s.listener != nil {
13,592✔
2633
                doneExpected++
6,758✔
2634
                s.listener.Close()
6,758✔
2635
                s.listener = nil
6,758✔
2636
        }
6,758✔
2637

2638
        // Kick websocket server
2639
        doneExpected += s.closeWebsocketServer()
6,834✔
2640

6,834✔
2641
        // Kick MQTT accept loop
6,834✔
2642
        if s.mqtt.listener != nil {
7,103✔
2643
                doneExpected++
269✔
2644
                s.mqtt.listener.Close()
269✔
2645
                s.mqtt.listener = nil
269✔
2646
        }
269✔
2647

2648
        // Kick leafnodes AcceptLoop()
2649
        if s.leafNodeListener != nil {
10,772✔
2650
                doneExpected++
3,938✔
2651
                s.leafNodeListener.Close()
3,938✔
2652
                s.leafNodeListener = nil
3,938✔
2653
        }
3,938✔
2654

2655
        // Kick route AcceptLoop()
2656
        if s.routeListener != nil {
11,457✔
2657
                doneExpected++
4,623✔
2658
                s.routeListener.Close()
4,623✔
2659
                s.routeListener = nil
4,623✔
2660
        }
4,623✔
2661

2662
        // Kick Gateway AcceptLoop()
2663
        if s.gatewayListener != nil {
7,960✔
2664
                doneExpected++
1,126✔
2665
                s.gatewayListener.Close()
1,126✔
2666
                s.gatewayListener = nil
1,126✔
2667
        }
1,126✔
2668

2669
        // Kick HTTP monitoring if its running
2670
        if s.http != nil {
7,901✔
2671
                doneExpected++
1,067✔
2672
                s.http.Close()
1,067✔
2673
                s.http = nil
1,067✔
2674
        }
1,067✔
2675

2676
        // Kick Profiling if its running
2677
        if s.profiler != nil {
6,836✔
2678
                doneExpected++
2✔
2679
                s.profiler.Close()
2✔
2680
        }
2✔
2681

2682
        s.mu.Unlock()
6,834✔
2683

6,834✔
2684
        // Release go routines that wait on that channel
6,834✔
2685
        close(s.quitCh)
6,834✔
2686

6,834✔
2687
        // Close client and route connections
6,834✔
2688
        for _, c := range conns {
27,729✔
2689
                c.setNoReconnect()
20,895✔
2690
                c.closeConnection(ServerShutdown)
20,895✔
2691
        }
20,895✔
2692

2693
        // Block until the accept loops exit
2694
        for doneExpected > 0 {
24,738✔
2695
                <-s.done
17,904✔
2696
                doneExpected--
17,904✔
2697
        }
17,904✔
2698

2699
        // Wait for go routines to be done.
2700
        s.grWG.Wait()
6,834✔
2701

6,834✔
2702
        if opts.PortsFileDir != _EMPTY_ {
6,836✔
2703
                s.deletePortsFile(opts.PortsFileDir)
2✔
2704
        }
2✔
2705

2706
        s.Noticef("Server Exiting..")
6,834✔
2707

6,834✔
2708
        // Stop OCSP Response Cache
6,834✔
2709
        if s.ocsprc != nil {
6,884✔
2710
                s.ocsprc.Stop(s)
50✔
2711
        }
50✔
2712

2713
        // Close logger if applicable. It allows tests on Windows
2714
        // to be able to do proper cleanup (delete log file).
2715
        s.logging.RLock()
6,834✔
2716
        log := s.logging.logger
6,834✔
2717
        s.logging.RUnlock()
6,834✔
2718
        if log != nil {
7,362✔
2719
                if l, ok := log.(*logger.Logger); ok {
538✔
2720
                        l.Close()
10✔
2721
                }
10✔
2722
        }
2723
        // Notify that the shutdown is complete
2724
        close(s.shutdownComplete)
6,834✔
2725
}
2726

2727
// Close the websocket server if running. If so, returns 1, else 0.
2728
// Server lock held on entry.
2729
func (s *Server) closeWebsocketServer() int {
6,840✔
2730
        ws := &s.websocket
6,840✔
2731
        ws.mu.Lock()
6,840✔
2732
        hs := ws.server
6,840✔
2733
        if hs != nil {
6,961✔
2734
                ws.server = nil
121✔
2735
                ws.listener = nil
121✔
2736
        }
121✔
2737
        ws.mu.Unlock()
6,840✔
2738
        if hs != nil {
6,961✔
2739
                hs.Close()
121✔
2740
                return 1
121✔
2741
        }
121✔
2742
        return 0
6,719✔
2743
}
2744

2745
// WaitForShutdown will block until the server has been fully shutdown.
2746
func (s *Server) WaitForShutdown() {
3,385✔
2747
        <-s.shutdownComplete
3,385✔
2748
}
3,385✔
2749

2750
// AcceptLoop is exported for easier testing.
2751
func (s *Server) AcceptLoop(clr chan struct{}) {
6,768✔
2752
        // If we were to exit before the listener is setup properly,
6,768✔
2753
        // make sure we close the channel.
6,768✔
2754
        defer func() {
13,536✔
2755
                if clr != nil {
6,769✔
2756
                        close(clr)
1✔
2757
                }
1✔
2758
        }()
2759

2760
        if s.isShuttingDown() {
6,769✔
2761
                return
1✔
2762
        }
1✔
2763

2764
        // Snapshot server options.
2765
        opts := s.getOpts()
6,767✔
2766

6,767✔
2767
        // Setup state that can enable shutdown
6,767✔
2768
        s.mu.Lock()
6,767✔
2769
        hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
6,767✔
2770
        l, e := s.getServerListener(hp)
6,767✔
2771
        s.listenerErr = e
6,767✔
2772
        if e != nil {
6,767✔
2773
                s.mu.Unlock()
×
2774
                s.Fatalf("Error listening on port: %s, %q", hp, e)
×
2775
                return
×
2776
        }
×
2777
        s.Noticef("Listening for client connections on %s",
6,767✔
2778
                net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
6,767✔
2779

6,767✔
2780
        // Alert if PROXY protocol is enabled
6,767✔
2781
        if opts.ProxyProtocol {
6,770✔
2782
                s.Noticef("PROXY protocol enabled for client connections")
3✔
2783
        }
3✔
2784

2785
        // Alert of TLS enabled.
2786
        if opts.TLSConfig != nil {
6,926✔
2787
                s.Noticef("TLS required for client connections")
159✔
2788
                if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
165✔
2789
                        s.Warnf("Clients that are not using \"TLS Handshake First\" option will fail to connect")
6✔
2790
                }
6✔
2791
        }
2792

2793
        // If server was started with RANDOM_PORT (-1), opts.Port would be equal
2794
        // to 0 at the beginning this function. So we need to get the actual port
2795
        if opts.Port == 0 {
12,998✔
2796
                // Write resolved port back to options.
6,231✔
2797
                opts.Port = l.Addr().(*net.TCPAddr).Port
6,231✔
2798
        }
6,231✔
2799

2800
        // Now that port has been set (if it was set to RANDOM), set the
2801
        // server's info Host/Port with either values from Options or
2802
        // ClientAdvertise.
2803
        if err := s.setInfoHostPort(); err != nil {
6,767✔
2804
                s.Fatalf("Error setting server INFO with ClientAdvertise value of %s, err=%v", opts.ClientAdvertise, err)
×
2805
                l.Close()
×
2806
                s.mu.Unlock()
×
2807
                return
×
2808
        }
×
2809
        // Keep track of client connect URLs. We may need them later.
2810
        s.clientConnectURLs = s.getClientConnectURLs()
6,767✔
2811
        s.listener = l
6,767✔
2812

6,767✔
2813
        go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
16,380✔
2814
                func(_ error) bool {
6,764✔
2815
                        if s.isLameDuckMode() {
6,770✔
2816
                                // Signal that we are not accepting new clients
6✔
2817
                                s.ldmCh <- true
6✔
2818
                                // Now wait for the Shutdown...
6✔
2819
                                <-s.quitCh
6✔
2820
                                return true
6✔
2821
                        }
6✔
2822
                        return false
6,758✔
2823
                })
2824
        s.mu.Unlock()
6,767✔
2825

6,767✔
2826
        // Let the caller know that we are ready
6,767✔
2827
        close(clr)
6,767✔
2828
        clr = nil
6,767✔
2829
}
2830

2831
// getServerListener returns a network listener for the given host-port address.
2832
// If the Server already has an active listener (s.listener), it returns that listener
2833
// along with any previous error (s.listenerErr). Otherwise, it creates and returns
2834
// a new TCP listener on the specified address using natsListen.
2835
func (s *Server) getServerListener(hp string) (net.Listener, error) {
6,767✔
2836
        if s.listener != nil {
6,767✔
2837
                return s.listener, s.listenerErr
×
2838
        }
×
2839

2840
        return natsListen("tcp", hp)
6,767✔
2841
}
2842

2843
// InProcessConn returns an in-process connection to the server,
2844
// avoiding the need to use a TCP listener for local connectivity
2845
// within the same process. This can be used regardless of the
2846
// state of the DontListen option.
2847
func (s *Server) InProcessConn() (net.Conn, error) {
8✔
2848
        pl, pr := net.Pipe()
8✔
2849
        if !s.startGoRoutine(func() {
16✔
2850
                s.createClientInProcess(pl)
8✔
2851
                s.grWG.Done()
8✔
2852
        }) {
8✔
2853
                pl.Close()
×
2854
                pr.Close()
×
2855
                return nil, fmt.Errorf("failed to create connection")
×
2856
        }
×
2857
        return pr, nil
8✔
2858
}
2859

2860
func (s *Server) acceptConnections(l net.Listener, acceptName string, createFunc func(conn net.Conn), errFunc func(err error) bool) {
16,723✔
2861
        tmpDelay := ACCEPT_MIN_SLEEP
16,723✔
2862

16,723✔
2863
        for {
78,755✔
2864
                conn, err := l.Accept()
62,032✔
2865
                if err != nil {
78,752✔
2866
                        if errFunc != nil && errFunc(err) {
16,726✔
2867
                                return
6✔
2868
                        }
6✔
2869
                        if tmpDelay = s.acceptError(acceptName, err, tmpDelay); tmpDelay < 0 {
33,428✔
2870
                                break
16,714✔
2871
                        }
2872
                        continue
×
2873
                }
2874
                tmpDelay = ACCEPT_MIN_SLEEP
45,309✔
2875
                if !s.startGoRoutine(func() {
90,614✔
2876
                        s.reloadMu.RLock()
45,305✔
2877
                        createFunc(conn)
45,305✔
2878
                        s.reloadMu.RUnlock()
45,305✔
2879
                        s.grWG.Done()
45,305✔
2880
                }) {
45,309✔
2881
                        conn.Close()
4✔
2882
                }
4✔
2883
        }
2884
        s.Debugf(acceptName + " accept loop exiting..")
16,714✔
2885
        s.done <- true
16,714✔
2886
}
2887

2888
// This function sets the server's info Host/Port based on server Options.
2889
// Note that this function may be called during config reload, this is why
2890
// Host/Port may be reset to original Options if the ClientAdvertise option
2891
// is not set (since it may have previously been).
2892
func (s *Server) setInfoHostPort() error {
13,666✔
2893
        // When this function is called, opts.Port is set to the actual listen
13,666✔
2894
        // port (if option was originally set to RANDOM), even during a config
13,666✔
2895
        // reload. So use of s.opts.Port is safe.
13,666✔
2896
        opts := s.getOpts()
13,666✔
2897
        if opts.ClientAdvertise != _EMPTY_ {
13,668✔
2898
                h, p, err := parseHostPort(opts.ClientAdvertise, opts.Port)
2✔
2899
                if err != nil {
2✔
2900
                        return err
×
2901
                }
×
2902
                s.info.Host = h
2✔
2903
                s.info.Port = p
2✔
2904
        } else {
13,664✔
2905
                s.info.Host = opts.Host
13,664✔
2906
                s.info.Port = opts.Port
13,664✔
2907
        }
13,664✔
2908
        return nil
13,666✔
2909
}
2910

2911
// StartProfiler is called to enable dynamic profiling.
2912
func (s *Server) StartProfiler() {
2✔
2913
        if s.isShuttingDown() {
2✔
2914
                return
×
2915
        }
×
2916

2917
        // Snapshot server options.
2918
        opts := s.getOpts()
2✔
2919

2✔
2920
        port := opts.ProfPort
2✔
2921

2✔
2922
        // Check for Random Port
2✔
2923
        if port == -1 {
3✔
2924
                port = 0
1✔
2925
        }
1✔
2926

2927
        s.mu.Lock()
2✔
2928
        hp := net.JoinHostPort(opts.Host, strconv.Itoa(port))
2✔
2929
        l, err := net.Listen("tcp", hp)
2✔
2930

2✔
2931
        if err != nil {
2✔
2932
                s.mu.Unlock()
×
2933
                s.Fatalf("error starting profiler: %s", err)
×
2934
                return
×
2935
        }
×
2936
        s.Noticef("profiling port: %d", l.Addr().(*net.TCPAddr).Port)
2✔
2937

2✔
2938
        srv := &http.Server{
2✔
2939
                Addr:           hp,
2✔
2940
                Handler:        http.DefaultServeMux,
2✔
2941
                MaxHeaderBytes: 1 << 20,
2✔
2942
                ReadTimeout:    time.Second * 5,
2✔
2943
        }
2✔
2944
        s.profiler = l
2✔
2945
        s.profilingServer = srv
2✔
2946

2✔
2947
        s.setBlockProfileRate(opts.ProfBlockRate)
2✔
2948

2✔
2949
        go func() {
4✔
2950
                // if this errors out, it's probably because the server is being shutdown
2✔
2951
                err := srv.Serve(l)
2✔
2952
                if err != nil {
4✔
2953
                        if !s.isShuttingDown() {
2✔
2954
                                s.Fatalf("error starting profiler: %s", err)
×
2955
                        }
×
2956
                }
2957
                srv.Close()
2✔
2958
                s.done <- true
2✔
2959
        }()
2960
        s.mu.Unlock()
2✔
2961
}
2962

2963
func (s *Server) setBlockProfileRate(rate int) {
6,772✔
2964
        // Passing i ProfBlockRate <= 0 here will disable or > 0 will enable.
6,772✔
2965
        runtime.SetBlockProfileRate(rate)
6,772✔
2966

6,772✔
2967
        if rate > 0 {
6,772✔
2968
                s.Warnf("Block profiling is enabled (rate %d), this may have a performance impact", rate)
×
2969
        }
×
2970
}
2971

2972
// StartHTTPMonitoring will enable the HTTP monitoring port.
2973
// DEPRECATED: Should use StartMonitoring.
2974
func (s *Server) StartHTTPMonitoring() {
×
2975
        s.startMonitoring(false)
×
2976
}
×
2977

2978
// StartHTTPSMonitoring will enable the HTTPS monitoring port.
2979
// DEPRECATED: Should use StartMonitoring.
2980
func (s *Server) StartHTTPSMonitoring() {
×
2981
        s.startMonitoring(true)
×
2982
}
×
2983

2984
// StartMonitoring starts the HTTP or HTTPs server if needed.
2985
func (s *Server) StartMonitoring() error {
6,775✔
2986
        // Snapshot server options.
6,775✔
2987
        opts := s.getOpts()
6,775✔
2988

6,775✔
2989
        // Specifying both HTTP and HTTPS ports is a misconfiguration
6,775✔
2990
        if opts.HTTPPort != 0 && opts.HTTPSPort != 0 {
6,776✔
2991
                return fmt.Errorf("can't specify both HTTP (%v) and HTTPs (%v) ports", opts.HTTPPort, opts.HTTPSPort)
1✔
2992
        }
1✔
2993
        var err error
6,774✔
2994
        if opts.HTTPPort != 0 {
7,830✔
2995
                err = s.startMonitoring(false)
1,056✔
2996
        } else if opts.HTTPSPort != 0 {
6,789✔
2997
                if opts.TLSConfig == nil {
17✔
2998
                        return fmt.Errorf("TLS cert and key required for HTTPS")
2✔
2999
                }
2✔
3000
                err = s.startMonitoring(true)
13✔
3001
        }
3002
        return err
6,772✔
3003
}
3004

3005
// HTTP endpoints
3006
const (
3007
        RootPath         = "/"
3008
        VarzPath         = "/varz"
3009
        ConnzPath        = "/connz"
3010
        RoutezPath       = "/routez"
3011
        GatewayzPath     = "/gatewayz"
3012
        LeafzPath        = "/leafz"
3013
        SubszPath        = "/subsz"
3014
        StackszPath      = "/stacksz"
3015
        AccountzPath     = "/accountz"
3016
        AccountStatzPath = "/accstatz"
3017
        JszPath          = "/jsz"
3018
        HealthzPath      = "/healthz"
3019
        IPQueuesPath     = "/ipqueuesz"
3020
        RaftzPath        = "/raftz"
3021
        ExpvarzPath      = "/debug/vars"
3022
)
3023

3024
func (s *Server) basePath(p string) string {
17,120✔
3025
        return path.Join(s.httpBasePath, p)
17,120✔
3026
}
17,120✔
3027

3028
type captureHTTPServerLog struct {
3029
        s      *Server
3030
        prefix string
3031
}
3032

3033
func (cl *captureHTTPServerLog) Write(p []byte) (int, error) {
3✔
3034
        var buf [128]byte
3✔
3035
        var b = buf[:0]
3✔
3036

3✔
3037
        b = append(b, []byte(cl.prefix)...)
3✔
3038
        offset := 0
3✔
3039
        if bytes.HasPrefix(p, []byte("http:")) {
6✔
3040
                offset = 6
3✔
3041
        }
3✔
3042
        b = append(b, p[offset:]...)
3✔
3043
        cl.s.Errorf(string(b))
3✔
3044
        return len(p), nil
3✔
3045
}
3046

3047
// The TLS configuration is passed to the listener when the monitoring
3048
// "server" is setup. That prevents TLS configuration updates on reload
3049
// from being used. By setting this function in tls.Config.GetConfigForClient
3050
// we instruct the TLS handshake to ask for the tls configuration to be
3051
// used for a specific client. We don't care which client, we always use
3052
// the same TLS configuration.
3053
func (s *Server) getMonitoringTLSConfig(_ *tls.ClientHelloInfo) (*tls.Config, error) {
4✔
3054
        opts := s.getOpts()
4✔
3055
        tc := opts.TLSConfig.Clone()
4✔
3056
        tc.ClientAuth = tls.NoClientCert
4✔
3057
        return tc, nil
4✔
3058
}
4✔
3059

3060
// Start the monitoring server
3061
func (s *Server) startMonitoring(secure bool) error {
1,069✔
3062
        if s.isShuttingDown() {
1,070✔
3063
                return nil
1✔
3064
        }
1✔
3065

3066
        // Snapshot server options.
3067
        opts := s.getOpts()
1,068✔
3068

1,068✔
3069
        var (
1,068✔
3070
                hp           string
1,068✔
3071
                err          error
1,068✔
3072
                httpListener net.Listener
1,068✔
3073
                port         int
1,068✔
3074
        )
1,068✔
3075

1,068✔
3076
        monitorProtocol := "http"
1,068✔
3077

1,068✔
3078
        if secure {
1,081✔
3079
                monitorProtocol += "s"
13✔
3080
                port = opts.HTTPSPort
13✔
3081
                if port == -1 {
17✔
3082
                        port = 0
4✔
3083
                }
4✔
3084
                hp = net.JoinHostPort(opts.HTTPHost, strconv.Itoa(port))
13✔
3085
                config := opts.TLSConfig.Clone()
13✔
3086
                if !s.ocspPeerVerify {
25✔
3087
                        config.GetConfigForClient = s.getMonitoringTLSConfig
12✔
3088
                        config.ClientAuth = tls.NoClientCert
12✔
3089
                }
12✔
3090
                httpListener, err = tls.Listen("tcp", hp, config)
13✔
3091

3092
        } else {
1,055✔
3093
                port = opts.HTTPPort
1,055✔
3094
                if port == -1 {
1,914✔
3095
                        port = 0
859✔
3096
                }
859✔
3097
                hp = net.JoinHostPort(opts.HTTPHost, strconv.Itoa(port))
1,055✔
3098
                httpListener, err = net.Listen("tcp", hp)
1,055✔
3099
        }
3100

3101
        if err != nil {
1,069✔
3102
                return fmt.Errorf("can't listen to the monitor port: %v", err)
1✔
3103
        }
1✔
3104

3105
        rport := httpListener.Addr().(*net.TCPAddr).Port
1,067✔
3106
        s.Noticef("Starting %s monitor on %s", monitorProtocol, net.JoinHostPort(opts.HTTPHost, strconv.Itoa(rport)))
1,067✔
3107

1,067✔
3108
        mux := http.NewServeMux()
1,067✔
3109

1,067✔
3110
        // Root
1,067✔
3111
        mux.HandleFunc(s.basePath(RootPath), s.HandleRoot)
1,067✔
3112
        // Varz
1,067✔
3113
        mux.HandleFunc(s.basePath(VarzPath), s.HandleVarz)
1,067✔
3114
        // Connz
1,067✔
3115
        mux.HandleFunc(s.basePath(ConnzPath), s.HandleConnz)
1,067✔
3116
        // Routez
1,067✔
3117
        mux.HandleFunc(s.basePath(RoutezPath), s.HandleRoutez)
1,067✔
3118
        // Gatewayz
1,067✔
3119
        mux.HandleFunc(s.basePath(GatewayzPath), s.HandleGatewayz)
1,067✔
3120
        // Leafz
1,067✔
3121
        mux.HandleFunc(s.basePath(LeafzPath), s.HandleLeafz)
1,067✔
3122
        // Subz
1,067✔
3123
        mux.HandleFunc(s.basePath(SubszPath), s.HandleSubsz)
1,067✔
3124
        // Subz alias for backwards compatibility
1,067✔
3125
        mux.HandleFunc(s.basePath("/subscriptionsz"), s.HandleSubsz)
1,067✔
3126
        // Stacksz
1,067✔
3127
        mux.HandleFunc(s.basePath(StackszPath), s.HandleStacksz)
1,067✔
3128
        // Accountz
1,067✔
3129
        mux.HandleFunc(s.basePath(AccountzPath), s.HandleAccountz)
1,067✔
3130
        // Accstatz
1,067✔
3131
        mux.HandleFunc(s.basePath(AccountStatzPath), s.HandleAccountStatz)
1,067✔
3132
        // Jsz
1,067✔
3133
        mux.HandleFunc(s.basePath(JszPath), s.HandleJsz)
1,067✔
3134
        // Healthz
1,067✔
3135
        mux.HandleFunc(s.basePath(HealthzPath), s.HandleHealthz)
1,067✔
3136
        // IPQueuesz
1,067✔
3137
        mux.HandleFunc(s.basePath(IPQueuesPath), s.HandleIPQueuesz)
1,067✔
3138
        // Raftz
1,067✔
3139
        mux.HandleFunc(s.basePath(RaftzPath), s.HandleRaftz)
1,067✔
3140
        // Expvarz
1,067✔
3141
        mux.Handle(s.basePath(ExpvarzPath), expvar.Handler())
1,067✔
3142

1,067✔
3143
        // Do not set a WriteTimeout because it could cause cURL/browser
1,067✔
3144
        // to return empty response or unable to display page if the
1,067✔
3145
        // server needs more time to build the response.
1,067✔
3146
        srv := &http.Server{
1,067✔
3147
                Addr:              hp,
1,067✔
3148
                Handler:           mux,
1,067✔
3149
                MaxHeaderBytes:    1 << 20,
1,067✔
3150
                ErrorLog:          log.New(&captureHTTPServerLog{s, "monitoring: "}, _EMPTY_, 0),
1,067✔
3151
                ReadHeaderTimeout: time.Second * 5,
1,067✔
3152
        }
1,067✔
3153
        s.mu.Lock()
1,067✔
3154
        s.http = httpListener
1,067✔
3155
        s.httpHandler = mux
1,067✔
3156
        s.monitoringServer = srv
1,067✔
3157
        s.mu.Unlock()
1,067✔
3158

1,067✔
3159
        go func() {
2,134✔
3160
                if err := srv.Serve(httpListener); err != nil {
2,134✔
3161
                        if !s.isShuttingDown() {
1,067✔
3162
                                s.Fatalf("Error starting monitor on %q: %v", hp, err)
×
3163
                        }
×
3164
                }
3165
                srv.Close()
1,067✔
3166
                s.mu.Lock()
1,067✔
3167
                s.httpHandler = nil
1,067✔
3168
                s.mu.Unlock()
1,067✔
3169
                s.done <- true
1,067✔
3170
        }()
3171

3172
        return nil
1,067✔
3173
}
3174

3175
// HTTPHandler returns the http.Handler object used to handle monitoring
3176
// endpoints. It will return nil if the server is not configured for
3177
// monitoring, or if the server has not been started yet (Server.Start()).
3178
func (s *Server) HTTPHandler() http.Handler {
2✔
3179
        s.mu.Lock()
2✔
3180
        defer s.mu.Unlock()
2✔
3181
        return s.httpHandler
2✔
3182
}
2✔
3183

3184
// Perform a conditional deep copy due to reference nature of [Client|WS]ConnectURLs.
3185
// If updates are made to Info, this function should be consulted and updated.
3186
// Assume lock is held.
3187
func (s *Server) copyInfo() Info {
18,924✔
3188
        info := s.info
18,924✔
3189
        if len(info.ClientConnectURLs) > 0 {
30,246✔
3190
                info.ClientConnectURLs = append([]string(nil), s.info.ClientConnectURLs...)
11,322✔
3191
        }
11,322✔
3192
        if len(info.WSConnectURLs) > 0 {
19,006✔
3193
                info.WSConnectURLs = append([]string(nil), s.info.WSConnectURLs...)
82✔
3194
        }
82✔
3195
        return info
18,924✔
3196
}
3197

3198
// tlsMixConn is used when we can receive both TLS and non-TLS connections on same port.
3199
type tlsMixConn struct {
3200
        net.Conn
3201
        pre *bytes.Buffer
3202
}
3203

3204
// Read for our mixed multi-reader.
3205
func (c *tlsMixConn) Read(b []byte) (int, error) {
51✔
3206
        if c.pre != nil {
60✔
3207
                n, err := c.pre.Read(b)
9✔
3208
                if c.pre.Len() == 0 {
18✔
3209
                        c.pre = nil
9✔
3210
                }
9✔
3211
                return n, err
9✔
3212
        }
3213
        return c.Conn.Read(b)
42✔
3214
}
3215

3216
func (s *Server) createClient(conn net.Conn) *client {
9,829✔
3217
        return s.createClientEx(conn, false)
9,829✔
3218
}
9,829✔
3219

3220
func (s *Server) createClientInProcess(conn net.Conn) *client {
8✔
3221
        return s.createClientEx(conn, true)
8✔
3222
}
8✔
3223

3224
func (s *Server) createClientEx(conn net.Conn, inProcess bool) *client {
9,837✔
3225
        // Snapshot server options.
9,837✔
3226
        opts := s.getOpts()
9,837✔
3227

9,837✔
3228
        maxPay := int32(opts.MaxPayload)
9,837✔
3229
        maxSubs := int32(opts.MaxSubs)
9,837✔
3230
        // For system, maxSubs of 0 means unlimited, so re-adjust here.
9,837✔
3231
        if maxSubs == 0 {
19,673✔
3232
                maxSubs = -1
9,836✔
3233
        }
9,836✔
3234
        now := time.Now()
9,837✔
3235

9,837✔
3236
        c := &client{
9,837✔
3237
                srv:   s,
9,837✔
3238
                nc:    conn,
9,837✔
3239
                opts:  defaultOpts,
9,837✔
3240
                mpay:  maxPay,
9,837✔
3241
                msubs: maxSubs,
9,837✔
3242
                start: now,
9,837✔
3243
                last:  now,
9,837✔
3244
                iproc: inProcess,
9,837✔
3245
        }
9,837✔
3246

9,837✔
3247
        c.registerWithAccount(s.globalAccount())
9,837✔
3248

9,837✔
3249
        var info Info
9,837✔
3250
        var authRequired bool
9,837✔
3251

9,837✔
3252
        s.mu.Lock()
9,837✔
3253
        // Grab JSON info string
9,837✔
3254
        info = s.copyInfo()
9,837✔
3255
        if s.nonceRequired() {
12,317✔
3256
                // Nonce handling
2,480✔
3257
                var raw [nonceLen]byte
2,480✔
3258
                nonce := raw[:]
2,480✔
3259
                s.generateNonce(nonce)
2,480✔
3260
                info.Nonce = string(nonce)
2,480✔
3261
        }
2,480✔
3262
        c.nonce = []byte(info.Nonce)
9,837✔
3263
        authRequired = info.AuthRequired
9,837✔
3264

9,837✔
3265
        // Check to see if we have auth_required set but we also have a no_auth_user.
9,837✔
3266
        // If so set back to false.
9,837✔
3267
        if info.AuthRequired && opts.NoAuthUser != _EMPTY_ && opts.NoAuthUser != s.sysAccOnlyNoAuthUser {
10,015✔
3268
                info.AuthRequired = false
178✔
3269
        }
178✔
3270

3271
        // Check to see if this is an in-process connection with tls_required.
3272
        // If so, set as not required, but available.
3273
        if inProcess && info.TLSRequired {
9,841✔
3274
                info.TLSRequired = false
4✔
3275
                info.TLSAvailable = true
4✔
3276
        }
4✔
3277

3278
        s.totalClients++
9,837✔
3279
        s.mu.Unlock()
9,837✔
3280

9,837✔
3281
        // Grab lock
9,837✔
3282
        c.mu.Lock()
9,837✔
3283
        if authRequired {
17,050✔
3284
                c.flags.set(expectConnect)
7,213✔
3285
        }
7,213✔
3286

3287
        // Initialize
3288
        c.initClient()
9,837✔
3289

9,837✔
3290
        c.Debugf("Client connection created")
9,837✔
3291

9,837✔
3292
        // Save info.TLSRequired value since we may neeed to change it back and forth.
9,837✔
3293
        orgInfoTLSReq := info.TLSRequired
9,837✔
3294

9,837✔
3295
        var tlsFirstFallback time.Duration
9,837✔
3296
        // Check if we should do TLS first.
9,837✔
3297
        tlsFirst := opts.TLSConfig != nil && opts.TLSHandshakeFirst
9,837✔
3298
        if tlsFirst {
9,851✔
3299
                // Make sure info.TLSRequired is set to true (it could be false
14✔
3300
                // if AllowNonTLS is enabled).
14✔
3301
                info.TLSRequired = true
14✔
3302
                // Get the fallback delay value if applicable.
14✔
3303
                if f := opts.TLSHandshakeFirstFallback; f > 0 {
19✔
3304
                        tlsFirstFallback = f
5✔
3305
                } else if inProcess {
17✔
3306
                        // For in-process connection, we will always have a fallback
3✔
3307
                        // delay. It allows support for non-TLS, TLS and "TLS First"
3✔
3308
                        // in-process clients to successfully connect.
3✔
3309
                        tlsFirstFallback = DEFAULT_TLS_HANDSHAKE_FIRST_FALLBACK_DELAY
3✔
3310
                }
3✔
3311
        }
3312

3313
        // Decide if we are going to require TLS or not and generate INFO json.
3314
        // If we have ProxyProtocol enabled then we won't include the client
3315
        // IP in the initial INFO, as that would leak the proxy IP itself.
3316
        // In that case we'll send another INFO after the client introduces itself.
3317
        tlsRequired := info.TLSRequired
9,837✔
3318
        infoBytes := c.generateClientInfoJSON(info, !opts.ProxyProtocol)
9,837✔
3319

9,837✔
3320
        // Send our information, except if TLS and TLSHandshakeFirst is requested.
9,837✔
3321
        if !tlsFirst {
19,660✔
3322
                // Need to be sent in place since writeLoop cannot be started until
9,823✔
3323
                // TLS handshake is done (if applicable).
9,823✔
3324
                c.sendProtoNow(infoBytes)
9,823✔
3325
        }
9,823✔
3326

3327
        // Unlock to register
3328
        c.mu.Unlock()
9,837✔
3329

9,837✔
3330
        // Register with the server.
9,837✔
3331
        s.mu.Lock()
9,837✔
3332
        // If server is not running, Shutdown() may have already gathered the
9,837✔
3333
        // list of connections to close. It won't contain this one, so we need
9,837✔
3334
        // to bail out now otherwise the readLoop started down there would not
9,837✔
3335
        // be interrupted. Skip also if in lame duck mode.
9,837✔
3336
        if !s.isRunning() || s.ldm {
10,034✔
3337
                // There are some tests that create a server but don't start it,
197✔
3338
                // and use "async" clients and perform the parsing manually. Such
197✔
3339
                // clients would branch here (since server is not running). However,
197✔
3340
                // when a server was really running and has been shutdown, we must
197✔
3341
                // close this connection.
197✔
3342
                if s.isShuttingDown() {
200✔
3343
                        conn.Close()
3✔
3344
                }
3✔
3345
                s.mu.Unlock()
197✔
3346
                return c
197✔
3347
        }
3348

3349
        // If there is a max connections specified, check that adding
3350
        // this new client would not push us over the max
3351
        if opts.MaxConn < 0 || (opts.MaxConn > 0 && len(s.clients) >= opts.MaxConn) {
9,640✔
3352
                s.mu.Unlock()
×
3353
                c.maxConnExceeded()
×
3354
                return nil
×
3355
        }
×
3356
        s.clients[c.cid] = c
9,640✔
3357

9,640✔
3358
        s.mu.Unlock()
9,640✔
3359

9,640✔
3360
        // Re-Grab lock
9,640✔
3361
        c.mu.Lock()
9,640✔
3362

9,640✔
3363
        isClosed := c.isClosed()
9,640✔
3364
        var pre []byte
9,640✔
3365
        // We need first to check for "TLS First" fallback delay.
9,640✔
3366
        if !isClosed && tlsFirstFallback > 0 {
9,648✔
3367
                // We wait and see if we are getting any data. Since we did not send
8✔
3368
                // the INFO protocol yet, only clients that use TLS first should be
8✔
3369
                // sending data (the TLS handshake). We don't really check the content:
8✔
3370
                // if it is a rogue agent and not an actual client performing the
8✔
3371
                // TLS handshake, the error will be detected when performing the
8✔
3372
                // handshake on our side.
8✔
3373
                pre = make([]byte, 4)
8✔
3374
                c.nc.SetReadDeadline(time.Now().Add(tlsFirstFallback))
8✔
3375
                n, _ := io.ReadFull(c.nc, pre[:])
8✔
3376
                c.nc.SetReadDeadline(time.Time{})
8✔
3377
                // If we get any data (regardless of possible timeout), we will proceed
8✔
3378
                // with the TLS handshake.
8✔
3379
                if n > 0 {
11✔
3380
                        pre = pre[:n]
3✔
3381
                } else {
8✔
3382
                        // We did not get anything so we will send the INFO protocol.
5✔
3383
                        pre = nil
5✔
3384

5✔
3385
                        // Restore the original info.TLSRequired value if it is
5✔
3386
                        // different that the current value and regenerate infoBytes.
5✔
3387
                        if orgInfoTLSReq != info.TLSRequired {
9✔
3388
                                info.TLSRequired = orgInfoTLSReq
4✔
3389
                                infoBytes = c.generateClientInfoJSON(info, !opts.ProxyProtocol)
4✔
3390
                        }
4✔
3391
                        c.sendProtoNow(infoBytes)
5✔
3392
                        // Set the boolean to false for the rest of the function.
5✔
3393
                        tlsFirst = false
5✔
3394
                        // Check closed status again
5✔
3395
                        isClosed = c.isClosed()
5✔
3396
                }
3397
        }
3398
        // If we have both TLS and non-TLS allowed we need to see which
3399
        // one the client wants. We'll always allow this for in-process
3400
        // connections.
3401
        if !isClosed && !tlsFirst && opts.TLSConfig != nil && (inProcess || opts.AllowNonTLS) {
9,649✔
3402
                pre = make([]byte, 6) // Minimum 6 bytes for proxy proto in next step.
9✔
3403
                c.nc.SetReadDeadline(time.Now().Add(secondsToDuration(opts.TLSTimeout)))
9✔
3404
                n, _ := io.ReadFull(c.nc, pre[:])
9✔
3405
                c.nc.SetReadDeadline(time.Time{})
9✔
3406
                pre = pre[:n]
9✔
3407
                if n > 0 && pre[0] == 0x16 {
12✔
3408
                        tlsRequired = true
3✔
3409
                } else {
9✔
3410
                        tlsRequired = false
6✔
3411
                }
6✔
3412
        }
3413

3414
        // Check for proxy protocol if enabled.
3415
        if !isClosed && !tlsRequired && opts.ProxyProtocol {
9,643✔
3416
                if len(pre) == 0 {
6✔
3417
                        // There has been no pre-read yet, do so so we can work out
3✔
3418
                        // if the client is trying to negotiate PROXY.
3✔
3419
                        pre = make([]byte, 6)
3✔
3420
                        c.nc.SetReadDeadline(time.Now().Add(proxyProtoReadTimeout))
3✔
3421
                        n, _ := io.ReadFull(c.nc, pre)
3✔
3422
                        c.nc.SetReadDeadline(time.Time{})
3✔
3423
                        pre = pre[:n]
3✔
3424
                }
3✔
3425
                conn = &tlsMixConn{conn, bytes.NewBuffer(pre)}
3✔
3426
                addr, proxyPre, err := readProxyProtoHeader(conn)
3✔
3427
                if err != nil && err != errProxyProtoUnrecognized {
3✔
3428
                        // err != errProxyProtoUnrecognized implies that we detected a proxy
×
3429
                        // protocol header but we failed to parse it, so don't continue.
×
3430
                        c.mu.Unlock()
×
3431
                        s.Warnf("Error reading PROXY protocol header from %s: %v", conn.RemoteAddr(), err)
×
3432
                        c.closeConnection(ProtocolViolation)
×
3433
                        return nil
×
3434
                }
×
3435
                // If addr is nil, it was a LOCAL/UNKNOWN command (health check)
3436
                // Use the connection as-is
3437
                if addr != nil {
5✔
3438
                        c.nc = &proxyConn{
2✔
3439
                                Conn:       conn,
2✔
3440
                                remoteAddr: addr,
2✔
3441
                        }
2✔
3442
                        // These were set already by initClient, override them.
2✔
3443
                        c.host = addr.srcIP.String()
2✔
3444
                        c.port = addr.srcPort
2✔
3445
                }
2✔
3446
                // At this point, err is either:
3447
                //  - nil => we parsed the proxy protocol header successfully
3448
                //  - errProxyProtoUnrecognized => we didn't detect proxy protocol at all
3449
                // We only clear the pre-read if we successfully read the protocol header
3450
                // so that the next step doesn't re-read it. Otherwise we have to assume
3451
                // that it's a non-proxied connection and we want the pre-read to remain
3452
                // for the next step.
3453
                if err == nil {
6✔
3454
                        pre = proxyPre
3✔
3455
                }
3✔
3456
                // Because we have ProxyProtocol enabled, our earlier INFO message didn't
3457
                // include the client_ip. If we need to send it again then we will include
3458
                // it, but sending it here immediately can confuse clients who have just
3459
                // PING'd.
3460
                infoBytes = c.generateClientInfoJSON(info, true)
3✔
3461
        }
3462

3463
        // Check for TLS
3464
        if !isClosed && tlsRequired {
10,010✔
3465
                if s.connRateCounter != nil && !s.connRateCounter.allow() {
371✔
3466
                        c.mu.Unlock()
1✔
3467
                        c.sendErr("Connection throttling is active. Please try again later.")
1✔
3468
                        c.closeConnection(MaxConnectionsExceeded)
1✔
3469
                        return nil
1✔
3470
                }
1✔
3471

3472
                // If we have a prebuffer create a multi-reader.
3473
                if len(pre) > 0 {
375✔
3474
                        c.nc = &tlsMixConn{c.nc, bytes.NewBuffer(pre)}
6✔
3475
                        // Clear pre so it is not parsed.
6✔
3476
                        pre = nil
6✔
3477
                }
6✔
3478
                // Performs server-side TLS handshake.
3479
                if err := c.doTLSServerHandshake(_EMPTY_, opts.TLSConfig, opts.TLSTimeout, opts.TLSPinnedCerts); err != nil {
501✔
3480
                        c.mu.Unlock()
132✔
3481
                        return nil
132✔
3482
                }
132✔
3483
        }
3484

3485
        // Now, send the INFO if it was delayed
3486
        if !isClosed && tlsFirst {
9,513✔
3487
                c.flags.set(didTLSFirst)
6✔
3488
                c.sendProtoNow(infoBytes)
6✔
3489
                // Check closed status
6✔
3490
                isClosed = c.isClosed()
6✔
3491
        }
6✔
3492

3493
        // Connection could have been closed while sending the INFO proto.
3494
        if isClosed {
9,510✔
3495
                c.mu.Unlock()
3✔
3496
                // We need to call closeConnection() to make sure that proper cleanup is done.
3✔
3497
                c.closeConnection(WriteError)
3✔
3498
                return nil
3✔
3499
        }
3✔
3500

3501
        // Check for Auth. We schedule this timer after the TLS handshake to avoid
3502
        // the race where the timer fires during the handshake and causes the
3503
        // server to write bad data to the socket. See issue #432.
3504
        if authRequired {
16,588✔
3505
                c.setAuthTimer(secondsToDuration(opts.AuthTimeout))
7,084✔
3506
        }
7,084✔
3507

3508
        // Do final client initialization
3509

3510
        // Set the Ping timer. Will be reset once connect was received.
3511
        c.setPingTimer()
9,504✔
3512

9,504✔
3513
        // Spin up the read loop.
9,504✔
3514
        s.startGoRoutine(func() { c.readLoop(pre) })
19,008✔
3515

3516
        // Spin up the write loop.
3517
        s.startGoRoutine(func() { c.writeLoop() })
19,008✔
3518

3519
        if tlsRequired {
9,741✔
3520
                c.Debugf("TLS handshake complete")
237✔
3521
                cs := c.nc.(*tls.Conn).ConnectionState()
237✔
3522
                c.Debugf("TLS version %s, cipher suite %s", tlsVersion(cs.Version), tls.CipherSuiteName(cs.CipherSuite))
237✔
3523
        }
237✔
3524

3525
        c.mu.Unlock()
9,504✔
3526

9,504✔
3527
        return c
9,504✔
3528
}
3529

3530
// This will save off a closed client in a ring buffer such that
3531
// /connz can inspect. Useful for debugging, etc.
3532
func (s *Server) saveClosedClient(c *client, nc net.Conn, subs map[string]*subscription, reason ClosedState) {
12,433✔
3533
        now := time.Now()
12,433✔
3534

12,433✔
3535
        s.accountDisconnectEvent(c, now, reason.String())
12,433✔
3536

12,433✔
3537
        c.mu.Lock()
12,433✔
3538

12,433✔
3539
        cc := &closedClient{}
12,433✔
3540
        cc.fill(c, nc, now, false)
12,433✔
3541
        // Note that cc.fill is using len(c.subs), which may have been set to nil by now,
12,433✔
3542
        // so replace cc.NumSubs with len(subs).
12,433✔
3543
        cc.NumSubs = uint32(len(subs))
12,433✔
3544
        cc.Stop = &now
12,433✔
3545
        cc.Reason = reason.String()
12,433✔
3546

12,433✔
3547
        // Do subs, do not place by default in main ConnInfo
12,433✔
3548
        if len(subs) > 0 {
22,252✔
3549
                cc.subs = make([]SubDetail, 0, len(subs))
9,819✔
3550
                for _, sub := range subs {
159,669✔
3551
                        cc.subs = append(cc.subs, newSubDetail(sub))
149,850✔
3552
                }
149,850✔
3553
        }
3554
        // Hold user as well.
3555
        cc.user = c.getRawAuthUser()
12,433✔
3556
        // Hold account name if not the global account.
12,433✔
3557
        if c.acc != nil && c.acc.Name != globalAccountName {
18,953✔
3558
                cc.acc = c.acc.Name
6,520✔
3559
        }
6,520✔
3560
        cc.JWT = redactBearerJWT(c.opts.JWT)
12,433✔
3561
        cc.IssuerKey = issuerForClient(c)
12,433✔
3562
        cc.Tags = c.tags
12,433✔
3563
        cc.NameTag = c.nameTag
12,433✔
3564
        c.mu.Unlock()
12,433✔
3565

12,433✔
3566
        // Place in the ring buffer
12,433✔
3567
        s.mu.Lock()
12,433✔
3568
        if s.closed != nil {
24,865✔
3569
                s.closed.append(cc)
12,432✔
3570
        }
12,432✔
3571
        s.mu.Unlock()
12,433✔
3572
}
3573

3574
// Adds to the list of client and websocket clients connect URLs.
3575
// If there was a change, an INFO protocol is sent to registered clients
3576
// that support async INFO protocols.
3577
// Server lock held on entry.
3578
func (s *Server) addConnectURLsAndSendINFOToClients(curls, wsurls []string) {
9,242✔
3579
        s.updateServerINFOAndSendINFOToClients(curls, wsurls, true)
9,242✔
3580
}
9,242✔
3581

3582
// Removes from the list of client and websocket clients connect URLs.
3583
// If there was a change, an INFO protocol is sent to registered clients
3584
// that support async INFO protocols.
3585
// Server lock held on entry.
3586
func (s *Server) removeConnectURLsAndSendINFOToClients(curls, wsurls []string) {
9,237✔
3587
        s.updateServerINFOAndSendINFOToClients(curls, wsurls, false)
9,237✔
3588
}
9,237✔
3589

3590
// Updates the list of client and websocket clients connect URLs and if any change
3591
// sends an async INFO update to clients that support it.
3592
// Server lock held on entry.
3593
func (s *Server) updateServerINFOAndSendINFOToClients(curls, wsurls []string, add bool) {
18,479✔
3594
        remove := !add
18,479✔
3595
        // Will return true if we need alter the server's Info object.
18,479✔
3596
        updateMap := func(urls []string, m refCountedUrlSet) bool {
55,437✔
3597
                wasUpdated := false
36,958✔
3598
                for _, url := range urls {
55,846✔
3599
                        if add && m.addUrl(url) {
28,326✔
3600
                                wasUpdated = true
9,438✔
3601
                        } else if remove && m.removeUrl(url) {
28,326✔
3602
                                wasUpdated = true
9,438✔
3603
                        }
9,438✔
3604
                }
3605
                return wasUpdated
36,958✔
3606
        }
3607
        cliUpdated := updateMap(curls, s.clientConnectURLsMap)
18,479✔
3608
        wsUpdated := updateMap(wsurls, s.websocket.connectURLsMap)
18,479✔
3609

18,479✔
3610
        updateInfo := func(infoURLs *[]string, urls []string, m refCountedUrlSet) {
37,267✔
3611
                // Recreate the info's slice from the map
18,788✔
3612
                *infoURLs = (*infoURLs)[:0]
18,788✔
3613
                // Add this server client connect ULRs first...
18,788✔
3614
                *infoURLs = append(*infoURLs, urls...)
18,788✔
3615
                // Then the ones from the map
18,788✔
3616
                for url := range m {
43,847✔
3617
                        *infoURLs = append(*infoURLs, url)
25,059✔
3618
                }
25,059✔
3619
        }
3620
        if cliUpdated {
36,931✔
3621
                updateInfo(&s.info.ClientConnectURLs, s.clientConnectURLs, s.clientConnectURLsMap)
18,452✔
3622
        }
18,452✔
3623
        if wsUpdated {
18,815✔
3624
                updateInfo(&s.info.WSConnectURLs, s.websocket.connectURLs, s.websocket.connectURLsMap)
336✔
3625
        }
336✔
3626
        if cliUpdated || wsUpdated {
36,931✔
3627
                // Send to all registered clients that support async INFO protocols.
18,452✔
3628
                s.sendAsyncInfoToClients(cliUpdated, wsUpdated)
18,452✔
3629
        }
18,452✔
3630
}
3631

3632
// Handle closing down a connection when the handshake has timedout.
3633
func tlsTimeout(c *client, conn *tls.Conn) {
38✔
3634
        c.mu.Lock()
38✔
3635
        closed := c.isClosed()
38✔
3636
        c.mu.Unlock()
38✔
3637
        // Check if already closed
38✔
3638
        if closed {
47✔
3639
                return
9✔
3640
        }
9✔
3641
        cs := conn.ConnectionState()
29✔
3642
        if !cs.HandshakeComplete {
32✔
3643
                c.Errorf("TLS handshake timeout")
3✔
3644
                c.sendErr("Secure Connection - TLS Required")
3✔
3645
                c.closeConnection(TLSHandshakeError)
3✔
3646
        }
3✔
3647
}
3648

3649
// Seems silly we have to write these
3650
func tlsVersion(ver uint16) string {
1,202✔
3651
        switch ver {
1,202✔
3652
        case tls.VersionTLS10:
×
3653
                return "1.0"
×
3654
        case tls.VersionTLS11:
×
3655
                return "1.1"
×
3656
        case tls.VersionTLS12:
×
3657
                return "1.2"
×
3658
        case tls.VersionTLS13:
1,202✔
3659
                return "1.3"
1,202✔
3660
        }
3661
        return fmt.Sprintf("Unknown [0x%x]", ver)
×
3662
}
3663

3664
func tlsVersionFromString(ver string) (uint16, error) {
×
3665
        switch ver {
×
3666
        case "1.0":
×
3667
                return tls.VersionTLS10, nil
×
3668
        case "1.1":
×
3669
                return tls.VersionTLS11, nil
×
3670
        case "1.2":
×
3671
                return tls.VersionTLS12, nil
×
3672
        case "1.3":
×
3673
                return tls.VersionTLS13, nil
×
3674
        }
3675
        return 0, fmt.Errorf("unknown version: %v", ver)
×
3676
}
3677

3678
// Remove a client or route from our internal accounting.
3679
func (s *Server) removeClient(c *client) {
151,903✔
3680
        // kind is immutable, so can check without lock
151,903✔
3681
        switch c.kind {
151,903✔
3682
        case CLIENT:
10,753✔
3683
                c.mu.Lock()
10,753✔
3684
                cid := c.cid
10,753✔
3685
                updateProtoInfoCount := false
10,753✔
3686
                if c.kind == CLIENT && c.opts.Protocol >= ClientProtoInfo {
19,645✔
3687
                        updateProtoInfoCount = true
8,892✔
3688
                }
8,892✔
3689
                proxyKey := c.proxyKey
10,753✔
3690
                c.mu.Unlock()
10,753✔
3691

10,753✔
3692
                s.mu.Lock()
10,753✔
3693
                delete(s.clients, cid)
10,753✔
3694
                if updateProtoInfoCount {
19,645✔
3695
                        s.cproto--
8,892✔
3696
                }
8,892✔
3697
                if proxyKey != _EMPTY_ {
10,757✔
3698
                        s.removeProxiedConn(proxyKey, cid)
4✔
3699
                }
4✔
3700
                s.mu.Unlock()
10,753✔
3701
        case ROUTER:
63,747✔
3702
                s.removeRoute(c)
63,747✔
3703
        case GATEWAY:
3,709✔
3704
                s.removeRemoteGatewayConnection(c)
3,709✔
3705
        case LEAF:
1,684✔
3706
                s.removeLeafNodeConnection(c)
1,684✔
3707
        }
3708
}
3709

3710
// Remove the connection with id `cid` from the map of connections
3711
// under the public key `key` of the trusted proxies.
3712
//
3713
// Server lock must be held on entry.
3714
func (s *Server) removeProxiedConn(key string, cid uint64) {
8✔
3715
        conns := s.proxiedConns[key]
8✔
3716
        delete(conns, cid)
8✔
3717
        if len(conns) == 0 {
16✔
3718
                delete(s.proxiedConns, key)
8✔
3719
        }
8✔
3720
}
3721

3722
func (s *Server) removeFromTempClients(cid uint64) {
108,194✔
3723
        s.grMu.Lock()
108,194✔
3724
        delete(s.grTmpClients, cid)
108,194✔
3725
        s.grMu.Unlock()
108,194✔
3726
}
108,194✔
3727

3728
func (s *Server) addToTempClients(cid uint64, c *client) bool {
68,826✔
3729
        added := false
68,826✔
3730
        s.grMu.Lock()
68,826✔
3731
        if s.grRunning {
137,649✔
3732
                s.grTmpClients[cid] = c
68,823✔
3733
                added = true
68,823✔
3734
        }
68,823✔
3735
        s.grMu.Unlock()
68,826✔
3736
        return added
68,826✔
3737
}
3738

3739
/////////////////////////////////////////////////////////////////
3740
// These are some helpers for accounting in functional tests.
3741
/////////////////////////////////////////////////////////////////
3742

3743
// NumRoutes will report the number of registered routes.
3744
func (s *Server) NumRoutes() int {
5,776✔
3745
        s.mu.RLock()
5,776✔
3746
        defer s.mu.RUnlock()
5,776✔
3747
        return s.numRoutes()
5,776✔
3748
}
5,776✔
3749

3750
// numRoutes will report the number of registered routes.
3751
// Server lock held on entry
3752
func (s *Server) numRoutes() int {
774,705✔
3753
        var nr int
774,705✔
3754
        s.forEachRoute(func(c *client) {
1,488,537✔
3755
                nr++
713,832✔
3756
        })
713,832✔
3757
        return nr
774,705✔
3758
}
3759

3760
// NumRemotes will report number of registered remotes.
3761
func (s *Server) NumRemotes() int {
×
3762
        s.mu.RLock()
×
3763
        defer s.mu.RUnlock()
×
3764
        return s.numRemotes()
×
3765
}
×
3766

3767
// numRemotes will report number of registered remotes.
3768
// Server lock held on entry
3769
func (s *Server) numRemotes() int {
28,391✔
3770
        return len(s.routes)
28,391✔
3771
}
28,391✔
3772

3773
// NumLeafNodes will report number of leaf node connections.
3774
func (s *Server) NumLeafNodes() int {
3,767✔
3775
        s.mu.RLock()
3,767✔
3776
        defer s.mu.RUnlock()
3,767✔
3777
        return len(s.leafs)
3,767✔
3778
}
3,767✔
3779

3780
// NumClients will report the number of registered clients.
3781
func (s *Server) NumClients() int {
63✔
3782
        s.mu.RLock()
63✔
3783
        defer s.mu.RUnlock()
63✔
3784
        return len(s.clients)
63✔
3785
}
63✔
3786

3787
// GetClient will return the client associated with cid.
3788
func (s *Server) GetClient(cid uint64) *client {
130✔
3789
        return s.getClient(cid)
130✔
3790
}
130✔
3791

3792
// getClient will return the client associated with cid.
3793
func (s *Server) getClient(cid uint64) *client {
147✔
3794
        s.mu.RLock()
147✔
3795
        defer s.mu.RUnlock()
147✔
3796
        return s.clients[cid]
147✔
3797
}
147✔
3798

3799
// GetLeafNode returns the leafnode associated with the cid.
3800
func (s *Server) GetLeafNode(cid uint64) *client {
1✔
3801
        s.mu.RLock()
1✔
3802
        defer s.mu.RUnlock()
1✔
3803
        return s.leafs[cid]
1✔
3804
}
1✔
3805

3806
// NumSubscriptions will report how many subscriptions are active.
3807
func (s *Server) NumSubscriptions() uint32 {
364✔
3808
        s.mu.RLock()
364✔
3809
        defer s.mu.RUnlock()
364✔
3810
        return s.numSubscriptions()
364✔
3811
}
364✔
3812

3813
// numSubscriptions will report how many subscriptions are active.
3814
// Lock should be held.
3815
func (s *Server) numSubscriptions() uint32 {
27,158✔
3816
        var subs int
27,158✔
3817
        s.accounts.Range(func(k, v any) bool {
88,143✔
3818
                acc := v.(*Account)
60,985✔
3819
                subs += acc.TotalSubs()
60,985✔
3820
                return true
60,985✔
3821
        })
60,985✔
3822
        return uint32(subs)
27,158✔
3823
}
3824

3825
// NumSlowConsumers will report the number of slow consumers.
3826
func (s *Server) NumSlowConsumers() int64 {
1✔
3827
        return atomic.LoadInt64(&s.slowConsumers)
1✔
3828
}
1✔
3829

3830
// NumStalledClients will report the total number of times clients have been stalled.
3831
func (s *Server) NumStalledClients() int64 {
×
3832
        return atomic.LoadInt64(&s.stalls)
×
3833
}
×
3834

3835
// NumSlowConsumersClients will report the number of slow consumers clients.
3836
func (s *Server) NumSlowConsumersClients() uint64 {
36,767✔
3837
        return s.scStats.clients.Load()
36,767✔
3838
}
36,767✔
3839

3840
// NumSlowConsumersRoutes will report the number of slow consumers routes.
3841
func (s *Server) NumSlowConsumersRoutes() uint64 {
36,767✔
3842
        return s.scStats.routes.Load()
36,767✔
3843
}
36,767✔
3844

3845
// NumSlowConsumersGateways will report the number of slow consumers leafs.
3846
func (s *Server) NumSlowConsumersGateways() uint64 {
36,769✔
3847
        return s.scStats.gateways.Load()
36,769✔
3848
}
36,769✔
3849

3850
// NumSlowConsumersLeafs will report the number of slow consumers leafs.
3851
func (s *Server) NumSlowConsumersLeafs() uint64 {
36,768✔
3852
        return s.scStats.leafs.Load()
36,768✔
3853
}
36,768✔
3854

3855
// NumStaleConnections will report the number of stale connections.
3856
func (s *Server) NumStaleConnections() int64 {
4✔
3857
        return atomic.LoadInt64(&s.staleConnections)
4✔
3858
}
4✔
3859

3860
// NumStaleConnectionsClients will report the number of stale client connections.
3861
func (s *Server) NumStaleConnectionsClients() uint64 {
36,771✔
3862
        return s.staleStats.clients.Load()
36,771✔
3863
}
36,771✔
3864

3865
// NumStaleConnectionsRoutes will report the number of stale route connections.
3866
func (s *Server) NumStaleConnectionsRoutes() uint64 {
36,771✔
3867
        return s.staleStats.routes.Load()
36,771✔
3868
}
36,771✔
3869

3870
// NumStaleConnectionsGateways will report the number of stale gateway connections.
3871
func (s *Server) NumStaleConnectionsGateways() uint64 {
36,771✔
3872
        return s.staleStats.gateways.Load()
36,771✔
3873
}
36,771✔
3874

3875
// NumStaleConnectionsLeafs will report the number of stale leaf connections.
3876
func (s *Server) NumStaleConnectionsLeafs() uint64 {
36,771✔
3877
        return s.staleStats.leafs.Load()
36,771✔
3878
}
36,771✔
3879

3880
// ConfigTime will report the last time the server configuration was loaded.
3881
func (s *Server) ConfigTime() time.Time {
×
3882
        s.mu.RLock()
×
3883
        defer s.mu.RUnlock()
×
3884
        return s.configTime
×
3885
}
×
3886

3887
// Addr will return the net.Addr object for the current listener.
3888
func (s *Server) Addr() net.Addr {
216✔
3889
        s.mu.RLock()
216✔
3890
        defer s.mu.RUnlock()
216✔
3891
        if s.listener == nil {
216✔
3892
                return nil
×
3893
        }
×
3894
        return s.listener.Addr()
216✔
3895
}
3896

3897
// MonitorAddr will return the net.Addr object for the monitoring listener.
3898
func (s *Server) MonitorAddr() *net.TCPAddr {
139✔
3899
        s.mu.RLock()
139✔
3900
        defer s.mu.RUnlock()
139✔
3901
        if s.http == nil {
139✔
3902
                return nil
×
3903
        }
×
3904
        return s.http.Addr().(*net.TCPAddr)
139✔
3905
}
3906

3907
// ClusterAddr returns the net.Addr object for the route listener.
3908
func (s *Server) ClusterAddr() *net.TCPAddr {
29✔
3909
        s.mu.RLock()
29✔
3910
        defer s.mu.RUnlock()
29✔
3911
        if s.routeListener == nil {
29✔
3912
                return nil
×
3913
        }
×
3914
        return s.routeListener.Addr().(*net.TCPAddr)
29✔
3915
}
3916

3917
// ProfilerAddr returns the net.Addr object for the profiler listener.
3918
func (s *Server) ProfilerAddr() *net.TCPAddr {
×
3919
        s.mu.RLock()
×
3920
        defer s.mu.RUnlock()
×
3921
        if s.profiler == nil {
×
3922
                return nil
×
3923
        }
×
3924
        return s.profiler.Addr().(*net.TCPAddr)
×
3925
}
3926

3927
func (s *Server) readyForConnections(d time.Duration) error {
10,540✔
3928
        // Snapshot server options.
10,540✔
3929
        opts := s.getOpts()
10,540✔
3930

10,540✔
3931
        type info struct {
10,540✔
3932
                ok  bool
10,540✔
3933
                err error
10,540✔
3934
        }
10,540✔
3935
        chk := make(map[string]info)
10,540✔
3936

10,540✔
3937
        end := time.Now().Add(d)
10,540✔
3938
        for time.Now().Before(end) {
53,390✔
3939
                s.mu.RLock()
42,850✔
3940
                chk["server"] = info{ok: s.listener != nil || opts.DontListen, err: s.listenerErr}
42,850✔
3941
                chk["route"] = info{ok: (opts.Cluster.Port == 0 || s.routeListener != nil), err: s.routeListenerErr}
42,850✔
3942
                chk["gateway"] = info{ok: (opts.Gateway.Name == _EMPTY_ || s.gatewayListener != nil), err: s.gatewayListenerErr}
42,850✔
3943
                chk["leafnode"] = info{ok: (opts.LeafNode.Port == 0 || s.leafNodeListener != nil), err: s.leafNodeListenerErr}
42,850✔
3944
                chk["websocket"] = info{ok: (opts.Websocket.Port == 0 || s.websocket.listener != nil), err: s.websocket.listenerErr}
42,850✔
3945
                chk["mqtt"] = info{ok: (opts.MQTT.Port == 0 || s.mqtt.listener != nil), err: s.mqtt.listenerErr}
42,850✔
3946
                s.mu.RUnlock()
42,850✔
3947

42,850✔
3948
                var numOK int
42,850✔
3949
                for _, inf := range chk {
299,950✔
3950
                        if inf.ok {
480,830✔
3951
                                numOK++
223,730✔
3952
                        }
223,730✔
3953
                }
3954
                if numOK == len(chk) {
53,382✔
3955
                        // In the case of DontListen option (no accept loop), we still want
10,532✔
3956
                        // to make sure that Start() has done all the work, so we wait on
10,532✔
3957
                        // that.
10,532✔
3958
                        if opts.DontListen {
10,532✔
3959
                                select {
×
3960
                                case <-s.startupComplete:
×
3961
                                case <-time.After(d):
×
3962
                                        return fmt.Errorf("failed to be ready for connections after %s: startup did not complete", d)
×
3963
                                }
3964
                        }
3965
                        return nil
10,532✔
3966
                }
3967
                if d > 25*time.Millisecond {
37,289✔
3968
                        time.Sleep(25 * time.Millisecond)
4,971✔
3969
                }
4,971✔
3970
        }
3971

3972
        failed := make([]string, 0, len(chk))
8✔
3973
        for name, inf := range chk {
56✔
3974
                if inf.ok && inf.err != nil {
48✔
3975
                        failed = append(failed, fmt.Sprintf("%s(ok, but %s)", name, inf.err))
×
3976
                }
×
3977
                if !inf.ok && inf.err == nil {
58✔
3978
                        failed = append(failed, name)
10✔
3979
                }
10✔
3980
                if !inf.ok && inf.err != nil {
48✔
3981
                        failed = append(failed, fmt.Sprintf("%s(%s)", name, inf.err))
×
3982
                }
×
3983
        }
3984

3985
        return fmt.Errorf(
8✔
3986
                "failed to be ready for connections after %s: %s",
8✔
3987
                d, strings.Join(failed, ", "),
8✔
3988
        )
8✔
3989
}
3990

3991
// ReadyForConnections returns `true` if the server is ready to accept clients
3992
// and, if routing is enabled, route connections. If after the duration
3993
// `dur` the server is still not ready, returns `false`.
3994
func (s *Server) ReadyForConnections(dur time.Duration) bool {
806✔
3995
        return s.readyForConnections(dur) == nil
806✔
3996
}
806✔
3997

3998
// Quick utility to function to tell if the server supports headers.
3999
func (s *Server) supportsHeaders() bool {
187,926✔
4000
        if s == nil {
187,926✔
4001
                return false
×
4002
        }
×
4003
        return !(s.getOpts().NoHeaderSupport)
187,926✔
4004
}
4005

4006
// ID returns the server's ID
4007
func (s *Server) ID() string {
55,513✔
4008
        return s.info.ID
55,513✔
4009
}
55,513✔
4010

4011
// NodeName returns the node name for this server.
4012
func (s *Server) NodeName() string {
226✔
4013
        return getHash(s.info.Name)
226✔
4014
}
226✔
4015

4016
// Name returns the server's name. This will be the same as the ID if it was not set.
4017
func (s *Server) Name() string {
212,982✔
4018
        return s.info.Name
212,982✔
4019
}
212,982✔
4020

4021
func (s *Server) String() string {
4,837✔
4022
        return s.info.Name
4,837✔
4023
}
4,837✔
4024

4025
type pprofLabels map[string]string
4026

4027
func setGoRoutineLabels(tags ...pprofLabels) {
376,285✔
4028
        var labels []string
376,285✔
4029
        for _, m := range tags {
430,062✔
4030
                for k, v := range m {
245,225✔
4031
                        labels = append(labels, k, v)
191,448✔
4032
                }
191,448✔
4033
        }
4034
        if len(labels) > 0 {
430,062✔
4035
                pprof.SetGoroutineLabels(
53,777✔
4036
                        pprof.WithLabels(context.Background(), pprof.Labels(labels...)),
53,777✔
4037
                )
53,777✔
4038
        }
53,777✔
4039
}
4040

4041
func (s *Server) startGoRoutine(f func(), tags ...pprofLabels) bool {
353,627✔
4042
        var started bool
353,627✔
4043
        s.grMu.Lock()
353,627✔
4044
        defer s.grMu.Unlock()
353,627✔
4045
        if s.grRunning {
707,142✔
4046
                s.grWG.Add(1)
353,515✔
4047
                go func() {
707,030✔
4048
                        setGoRoutineLabels(tags...)
353,515✔
4049
                        f()
353,515✔
4050
                }()
353,515✔
4051
                started = true
353,515✔
4052
        }
4053
        return started
353,627✔
4054
}
4055

4056
func (s *Server) numClosedConns() int {
102✔
4057
        s.mu.RLock()
102✔
4058
        defer s.mu.RUnlock()
102✔
4059
        return s.closed.len()
102✔
4060
}
102✔
4061

4062
func (s *Server) totalClosedConns() uint64 {
43✔
4063
        s.mu.RLock()
43✔
4064
        defer s.mu.RUnlock()
43✔
4065
        return s.closed.totalConns()
43✔
4066
}
43✔
4067

4068
func (s *Server) closedClients() []*closedClient {
8✔
4069
        s.mu.RLock()
8✔
4070
        defer s.mu.RUnlock()
8✔
4071
        return s.closed.closedClients()
8✔
4072
}
8✔
4073

4074
// getClientConnectURLs returns suitable URLs for clients to connect to the listen
4075
// port based on the server options' Host and Port. If the Host corresponds to
4076
// "any" interfaces, this call returns the list of resolved IP addresses.
4077
// If ClientAdvertise is set, returns the client advertise host and port.
4078
// The server lock is assumed held on entry.
4079
func (s *Server) getClientConnectURLs() []string {
6,767✔
4080
        // Snapshot server options.
6,767✔
4081
        opts := s.getOpts()
6,767✔
4082
        // Ignore error here since we know that if there is client advertise, the
6,767✔
4083
        // parseHostPort is correct because we did it right before calling this
6,767✔
4084
        // function in Server.New().
6,767✔
4085
        urls, _ := s.getConnectURLs(opts.ClientAdvertise, opts.Host, opts.Port)
6,767✔
4086
        return urls
6,767✔
4087
}
6,767✔
4088

4089
// Generic version that will return an array of URLs based on the given
4090
// advertise, host and port values.
4091
func (s *Server) getConnectURLs(advertise, host string, port int) ([]string, error) {
6,888✔
4092
        urls := make([]string, 0, 1)
6,888✔
4093

6,888✔
4094
        // short circuit if advertise is set
6,888✔
4095
        if advertise != "" {
6,890✔
4096
                h, p, err := parseHostPort(advertise, port)
2✔
4097
                if err != nil {
2✔
4098
                        return nil, err
×
4099
                }
×
4100
                urls = append(urls, net.JoinHostPort(h, strconv.Itoa(p)))
2✔
4101
        } else {
6,886✔
4102
                sPort := strconv.Itoa(port)
6,886✔
4103
                _, ips, err := s.getNonLocalIPsIfHostIsIPAny(host, true)
6,886✔
4104
                for _, ip := range ips {
8,038✔
4105
                        urls = append(urls, net.JoinHostPort(ip, sPort))
1,152✔
4106
                }
1,152✔
4107
                if err != nil || len(urls) == 0 {
13,196✔
4108
                        // We are here if s.opts.Host is not "0.0.0.0" nor "::", or if for some
6,310✔
4109
                        // reason we could not add any URL in the loop above.
6,310✔
4110
                        // We had a case where a Windows VM was hosed and would have err == nil
6,310✔
4111
                        // and not add any address in the array in the loop above, and we
6,310✔
4112
                        // ended-up returning 0.0.0.0, which is problematic for Windows clients.
6,310✔
4113
                        // Check for 0.0.0.0 or :: specifically, and ignore if that's the case.
6,310✔
4114
                        if host == "0.0.0.0" || host == "::" {
6,310✔
4115
                                s.Errorf("Address %q can not be resolved properly", host)
×
4116
                        } else {
6,310✔
4117
                                urls = append(urls, net.JoinHostPort(host, sPort))
6,310✔
4118
                        }
6,310✔
4119
                }
4120
        }
4121
        return urls, nil
6,888✔
4122
}
4123

4124
// Returns an array of non local IPs if the provided host is
4125
// 0.0.0.0 or ::. It returns the first resolved if `all` is
4126
// false.
4127
// The boolean indicate if the provided host was 0.0.0.0 (or ::)
4128
// so that if the returned array is empty caller can decide
4129
// what to do next.
4130
func (s *Server) getNonLocalIPsIfHostIsIPAny(host string, all bool) (bool, []string, error) {
11,917✔
4131
        ip := net.ParseIP(host)
11,917✔
4132
        // If this is not an IP, we are done
11,917✔
4133
        if ip == nil {
11,940✔
4134
                return false, nil, nil
23✔
4135
        }
23✔
4136
        // If this is not 0.0.0.0 or :: we have nothing to do.
4137
        if !ip.IsUnspecified() {
22,871✔
4138
                return false, nil, nil
10,977✔
4139
        }
10,977✔
4140
        s.Debugf("Get non local IPs for %q", host)
917✔
4141
        var ips []string
917✔
4142
        ifaces, _ := net.Interfaces()
917✔
4143
        for _, i := range ifaces {
4,585✔
4144
                addrs, _ := i.Addrs()
3,668✔
4145
                for _, addr := range addrs {
7,912✔
4146
                        switch v := addr.(type) {
4,244✔
4147
                        case *net.IPNet:
4,244✔
4148
                                ip = v.IP
4,244✔
4149
                        case *net.IPAddr:
×
4150
                                ip = v.IP
×
4151
                        }
4152
                        ipStr := ip.String()
4,244✔
4153
                        // Skip non global unicast addresses
4,244✔
4154
                        if !ip.IsGlobalUnicast() || ip.IsUnspecified() {
6,654✔
4155
                                ip = nil
2,410✔
4156
                                continue
2,410✔
4157
                        }
4158
                        s.Debugf("  ip=%s", ipStr)
1,834✔
4159
                        ips = append(ips, ipStr)
1,834✔
4160
                        if !all {
2,516✔
4161
                                break
682✔
4162
                        }
4163
                }
4164
        }
4165
        return true, ips, nil
917✔
4166
}
4167

4168
// if the ip is not specified, attempt to resolve it
4169
func resolveHostPorts(addr net.Listener) []string {
15✔
4170
        hostPorts := make([]string, 0)
15✔
4171
        hp := addr.Addr().(*net.TCPAddr)
15✔
4172
        port := strconv.Itoa(hp.Port)
15✔
4173
        if hp.IP.IsUnspecified() {
22✔
4174
                var ip net.IP
7✔
4175
                ifaces, _ := net.Interfaces()
7✔
4176
                for _, i := range ifaces {
35✔
4177
                        addrs, _ := i.Addrs()
28✔
4178
                        for _, addr := range addrs {
63✔
4179
                                switch v := addr.(type) {
35✔
4180
                                case *net.IPNet:
35✔
4181
                                        ip = v.IP
35✔
4182
                                        hostPorts = append(hostPorts, net.JoinHostPort(ip.String(), port))
35✔
4183
                                case *net.IPAddr:
×
4184
                                        ip = v.IP
×
4185
                                        hostPorts = append(hostPorts, net.JoinHostPort(ip.String(), port))
×
4186
                                default:
×
4187
                                        continue
×
4188
                                }
4189
                        }
4190
                }
4191
        } else {
8✔
4192
                hostPorts = append(hostPorts, net.JoinHostPort(hp.IP.String(), port))
8✔
4193
        }
8✔
4194
        return hostPorts
15✔
4195
}
4196

4197
// format the address of a net.Listener with a protocol
4198
func formatURL(protocol string, addr net.Listener) []string {
15✔
4199
        hostports := resolveHostPorts(addr)
15✔
4200
        for i, hp := range hostports {
58✔
4201
                hostports[i] = fmt.Sprintf("%s://%s", protocol, hp)
43✔
4202
        }
43✔
4203
        return hostports
15✔
4204
}
4205

4206
// Ports describes URLs that the server can be contacted in
4207
type Ports struct {
4208
        Nats       []string `json:"nats,omitempty"`
4209
        Monitoring []string `json:"monitoring,omitempty"`
4210
        Cluster    []string `json:"cluster,omitempty"`
4211
        Profile    []string `json:"profile,omitempty"`
4212
        WebSocket  []string `json:"websocket,omitempty"`
4213
}
4214

4215
// PortsInfo attempts to resolve all the ports. If after maxWait the ports are not
4216
// resolved, it returns nil. Otherwise it returns a Ports struct
4217
// describing ports where the server can be contacted
4218
func (s *Server) PortsInfo(maxWait time.Duration) *Ports {
6✔
4219
        if s.readyForListeners(maxWait) {
12✔
4220
                opts := s.getOpts()
6✔
4221

6✔
4222
                s.mu.RLock()
6✔
4223
                tls := s.info.TLSRequired
6✔
4224
                listener := s.listener
6✔
4225
                httpListener := s.http
6✔
4226
                clusterListener := s.routeListener
6✔
4227
                profileListener := s.profiler
6✔
4228
                wsListener := s.websocket.listener
6✔
4229
                wss := s.websocket.tls
6✔
4230
                s.mu.RUnlock()
6✔
4231

6✔
4232
                ports := Ports{}
6✔
4233

6✔
4234
                if listener != nil {
12✔
4235
                        natsProto := "nats"
6✔
4236
                        if tls {
7✔
4237
                                natsProto = "tls"
1✔
4238
                        }
1✔
4239
                        ports.Nats = formatURL(natsProto, listener)
6✔
4240
                }
4241

4242
                if httpListener != nil {
8✔
4243
                        monProto := "http"
2✔
4244
                        if opts.HTTPSPort != 0 {
2✔
4245
                                monProto = "https"
×
4246
                        }
×
4247
                        ports.Monitoring = formatURL(monProto, httpListener)
2✔
4248
                }
4249

4250
                if clusterListener != nil {
8✔
4251
                        clusterProto := "nats"
2✔
4252
                        if opts.Cluster.TLSConfig != nil {
2✔
4253
                                clusterProto = "tls"
×
4254
                        }
×
4255
                        ports.Cluster = formatURL(clusterProto, clusterListener)
2✔
4256
                }
4257

4258
                if profileListener != nil {
8✔
4259
                        ports.Profile = formatURL("http", profileListener)
2✔
4260
                }
2✔
4261

4262
                if wsListener != nil {
9✔
4263
                        protocol := wsSchemePrefix
3✔
4264
                        if wss {
6✔
4265
                                protocol = wsSchemePrefixTLS
3✔
4266
                        }
3✔
4267
                        ports.WebSocket = formatURL(protocol, wsListener)
3✔
4268
                }
4269

4270
                return &ports
6✔
4271
        }
4272

4273
        return nil
×
4274
}
4275

4276
// Returns the portsFile. If a non-empty dirHint is provided, the dirHint
4277
// path is used instead of the server option value
4278
func (s *Server) portFile(dirHint string) string {
6✔
4279
        dirname := s.getOpts().PortsFileDir
6✔
4280
        if dirHint != "" {
12✔
4281
                dirname = dirHint
6✔
4282
        }
6✔
4283
        if dirname == _EMPTY_ {
6✔
4284
                return _EMPTY_
×
4285
        }
×
4286
        return filepath.Join(dirname, fmt.Sprintf("%s_%d.ports", filepath.Base(os.Args[0]), os.Getpid()))
6✔
4287
}
4288

4289
// Delete the ports file. If a non-empty dirHint is provided, the dirHint
4290
// path is used instead of the server option value
4291
func (s *Server) deletePortsFile(hintDir string) {
3✔
4292
        portsFile := s.portFile(hintDir)
3✔
4293
        if portsFile != "" {
6✔
4294
                if err := os.Remove(portsFile); err != nil {
3✔
4295
                        s.Errorf("Error cleaning up ports file %s: %v", portsFile, err)
×
4296
                }
×
4297
        }
4298
}
4299

4300
// Writes a file with a serialized Ports to the specified ports_file_dir.
4301
// The name of the file is `exename_pid.ports`, typically nats-server_pid.ports.
4302
// if ports file is not set, this function has no effect
4303
func (s *Server) logPorts() {
3✔
4304
        opts := s.getOpts()
3✔
4305
        portsFile := s.portFile(opts.PortsFileDir)
3✔
4306
        if portsFile != _EMPTY_ {
6✔
4307
                go func() {
6✔
4308
                        info := s.PortsInfo(5 * time.Second)
3✔
4309
                        if info == nil {
3✔
4310
                                s.Errorf("Unable to resolve the ports in the specified time")
×
4311
                                return
×
4312
                        }
×
4313
                        data, err := json.Marshal(info)
3✔
4314
                        if err != nil {
3✔
4315
                                s.Errorf("Error marshaling ports file: %v", err)
×
4316
                                return
×
4317
                        }
×
4318
                        if err := os.WriteFile(portsFile, data, 0666); err != nil {
3✔
4319
                                s.Errorf("Error writing ports file (%s): %v", portsFile, err)
×
4320
                                return
×
4321
                        }
×
4322

4323
                }()
4324
        }
4325
}
4326

4327
// waits until a calculated list of listeners is resolved or a timeout
4328
func (s *Server) readyForListeners(dur time.Duration) bool {
6✔
4329
        end := time.Now().Add(dur)
6✔
4330
        for time.Now().Before(end) {
12✔
4331
                s.mu.RLock()
6✔
4332
                listeners := s.serviceListeners()
6✔
4333
                s.mu.RUnlock()
6✔
4334
                if len(listeners) == 0 {
6✔
4335
                        return false
×
4336
                }
×
4337

4338
                ok := true
6✔
4339
                for _, l := range listeners {
21✔
4340
                        if l == nil {
15✔
4341
                                ok = false
×
4342
                                break
×
4343
                        }
4344
                }
4345
                if ok {
12✔
4346
                        return true
6✔
4347
                }
6✔
4348
                select {
×
4349
                case <-s.quitCh:
×
4350
                        return false
×
4351
                case <-time.After(25 * time.Millisecond):
×
4352
                        // continue - unable to select from quit - we are still running
4353
                }
4354
        }
4355
        return false
×
4356
}
4357

4358
// returns a list of listeners that are intended for the process
4359
// if the entry is nil, the interface is yet to be resolved
4360
func (s *Server) serviceListeners() []net.Listener {
6✔
4361
        listeners := make([]net.Listener, 0)
6✔
4362
        opts := s.getOpts()
6✔
4363
        listeners = append(listeners, s.listener)
6✔
4364
        if opts.Cluster.Port != 0 {
8✔
4365
                listeners = append(listeners, s.routeListener)
2✔
4366
        }
2✔
4367
        if opts.HTTPPort != 0 || opts.HTTPSPort != 0 {
8✔
4368
                listeners = append(listeners, s.http)
2✔
4369
        }
2✔
4370
        if opts.ProfPort != 0 {
8✔
4371
                listeners = append(listeners, s.profiler)
2✔
4372
        }
2✔
4373
        if opts.Websocket.Port != 0 {
9✔
4374
                listeners = append(listeners, s.websocket.listener)
3✔
4375
        }
3✔
4376
        return listeners
6✔
4377
}
4378

4379
// Returns true if in lame duck mode.
4380
func (s *Server) isLameDuckMode() bool {
21,877✔
4381
        s.mu.RLock()
21,877✔
4382
        defer s.mu.RUnlock()
21,877✔
4383
        return s.ldm
21,877✔
4384
}
21,877✔
4385

4386
// LameDuckShutdown will perform a lame duck shutdown of NATS, whereby
4387
// the client listener is closed, existing client connections are
4388
// kicked, Raft leaderships are transferred, JetStream is shutdown
4389
// and then finally shutdown the the NATS Server itself.
4390
// This function blocks and will not return until the NATS Server
4391
// has completed the entire shutdown operation.
4392
func (s *Server) LameDuckShutdown() {
1✔
4393
        s.lameDuckMode()
1✔
4394
}
1✔
4395

4396
// This function will close the client listener then close the clients
4397
// at some interval to avoid a reconnect storm.
4398
// We will also transfer any raft leaders and shutdown JetStream.
4399
func (s *Server) lameDuckMode() {
6✔
4400
        s.mu.Lock()
6✔
4401
        // Check if there is actually anything to do
6✔
4402
        if s.isShuttingDown() || s.ldm || s.listener == nil {
6✔
4403
                s.mu.Unlock()
×
4404
                return
×
4405
        }
×
4406
        s.Noticef("Entering lame duck mode, stop accepting new clients")
6✔
4407
        s.ldm = true
6✔
4408
        s.sendLDMShutdownEventLocked()
6✔
4409
        expected := 1
6✔
4410
        s.listener.Close()
6✔
4411
        s.listener = nil
6✔
4412
        expected += s.closeWebsocketServer()
6✔
4413
        s.ldmCh = make(chan bool, expected)
6✔
4414
        opts := s.getOpts()
6✔
4415
        gp := opts.LameDuckGracePeriod
6✔
4416
        // For tests, we want the grace period to be in some cases bigger
6✔
4417
        // than the ldm duration, so to by-pass the validateOptions() check,
6✔
4418
        // we use negative number and flip it here.
6✔
4419
        if gp < 0 {
11✔
4420
                gp *= -1
5✔
4421
        }
5✔
4422
        s.mu.Unlock()
6✔
4423

6✔
4424
        // If we are running any raftNodes transfer leaders.
6✔
4425
        if hadTransfers := s.transferRaftLeaders(); hadTransfers {
10✔
4426
                // They will transfer leadership quickly, but wait here for a second.
4✔
4427
                select {
4✔
4428
                case <-time.After(time.Second):
4✔
4429
                case <-s.quitCh:
×
4430
                        return
×
4431
                }
4432
        }
4433

4434
        // Now check and shutdown jetstream.
4435
        s.shutdownJetStream()
6✔
4436

6✔
4437
        // Now shutdown the nodes
6✔
4438
        s.shutdownRaftNodes()
6✔
4439

6✔
4440
        // Wait for accept loops to be done to make sure that no new
6✔
4441
        // client can connect
6✔
4442
        for i := 0; i < expected; i++ {
12✔
4443
                <-s.ldmCh
6✔
4444
        }
6✔
4445

4446
        s.mu.Lock()
6✔
4447
        // Need to recheck few things
6✔
4448
        if s.isShuttingDown() || len(s.clients) == 0 {
6✔
4449
                s.mu.Unlock()
×
4450
                // If there is no client, we need to call Shutdown() to complete
×
4451
                // the LDMode. If server has been shutdown while lock was released,
×
4452
                // calling Shutdown() should be no-op.
×
4453
                s.Shutdown()
×
4454
                return
×
4455
        }
×
4456
        dur := int64(opts.LameDuckDuration)
6✔
4457
        dur -= int64(gp)
6✔
4458
        if dur <= 0 {
11✔
4459
                dur = int64(time.Second)
5✔
4460
        }
5✔
4461
        numClients := int64(len(s.clients))
6✔
4462
        batch := 1
6✔
4463
        // Sleep interval between each client connection close.
6✔
4464
        var si int64
6✔
4465
        if numClients != 0 {
12✔
4466
                si = dur / numClients
6✔
4467
        }
6✔
4468
        if si < 1 {
6✔
4469
                // Should not happen (except in test with very small LD duration), but
×
4470
                // if there are too many clients, batch the number of close and
×
4471
                // use a tiny sleep interval that will result in yield likely.
×
4472
                si = 1
×
4473
                batch = int(numClients / dur)
×
4474
        } else if si > int64(time.Second) {
7✔
4475
                // Conversely, there is no need to sleep too long between clients
1✔
4476
                // and spread say 10 clients for the 2min duration. Sleeping no
1✔
4477
                // more than 1sec.
1✔
4478
                si = int64(time.Second)
1✔
4479
        }
1✔
4480

4481
        // Now capture all clients
4482
        clients := make([]*client, 0, len(s.clients))
6✔
4483
        for _, client := range s.clients {
21✔
4484
                clients = append(clients, client)
15✔
4485
        }
15✔
4486
        // Now that we know that no new client can be accepted,
4487
        // send INFO to routes and clients to notify this state.
4488
        s.sendLDMToRoutes()
6✔
4489
        s.sendLDMToClients()
6✔
4490
        s.mu.Unlock()
6✔
4491

6✔
4492
        t := time.NewTimer(gp)
6✔
4493
        // Delay start of closing of client connections in case
6✔
4494
        // we have several servers that we want to signal to enter LD mode
6✔
4495
        // and not have their client reconnect to each other.
6✔
4496
        select {
6✔
4497
        case <-t.C:
6✔
4498
                s.Noticef("Closing existing clients")
6✔
4499
        case <-s.quitCh:
×
4500
                t.Stop()
×
4501
                return
×
4502
        }
4503
        for i, client := range clients {
21✔
4504
                client.closeConnection(ServerShutdown)
15✔
4505
                if i == len(clients)-1 {
21✔
4506
                        break
6✔
4507
                }
4508
                if batch == 1 || i%batch == 0 {
18✔
4509
                        // We pick a random interval which will be at least si/2
9✔
4510
                        v := rand.Int63n(si)
9✔
4511
                        if v < si/2 {
13✔
4512
                                v = si / 2
4✔
4513
                        }
4✔
4514
                        t.Reset(time.Duration(v))
9✔
4515
                        // Sleep for given interval or bail out if kicked by Shutdown().
9✔
4516
                        select {
9✔
4517
                        case <-t.C:
9✔
4518
                        case <-s.quitCh:
×
4519
                                t.Stop()
×
4520
                                return
×
4521
                        }
4522
                }
4523
        }
4524
        s.Shutdown()
6✔
4525
        s.WaitForShutdown()
6✔
4526
}
4527

4528
// Send an INFO update to routes with the indication that this server is in LDM mode.
4529
// Server lock is held on entry.
4530
func (s *Server) sendLDMToRoutes() {
6✔
4531
        s.routeInfo.LameDuckMode = true
6✔
4532
        infoJSON := generateInfoJSON(&s.routeInfo)
6✔
4533
        s.forEachRemote(func(r *client) {
17✔
4534
                r.mu.Lock()
11✔
4535
                r.enqueueProto(infoJSON)
11✔
4536
                r.mu.Unlock()
11✔
4537
        })
11✔
4538
        // Clear now so that we notify only once, should we have to send other INFOs.
4539
        s.routeInfo.LameDuckMode = false
6✔
4540
}
4541

4542
// Send an INFO update to clients with the indication that this server is in
4543
// LDM mode and with only URLs of other nodes.
4544
// Server lock is held on entry.
4545
func (s *Server) sendLDMToClients() {
6✔
4546
        s.info.LameDuckMode = true
6✔
4547
        // Clear this so that if there are further updates, we don't send our URLs.
6✔
4548
        s.clientConnectURLs = s.clientConnectURLs[:0]
6✔
4549
        if s.websocket.connectURLs != nil {
6✔
4550
                s.websocket.connectURLs = s.websocket.connectURLs[:0]
×
4551
        }
×
4552
        // Reset content first.
4553
        s.info.ClientConnectURLs = s.info.ClientConnectURLs[:0]
6✔
4554
        s.info.WSConnectURLs = s.info.WSConnectURLs[:0]
6✔
4555
        // Only add the other nodes if we are allowed to.
6✔
4556
        if !s.getOpts().Cluster.NoAdvertise {
12✔
4557
                for url := range s.clientConnectURLsMap {
18✔
4558
                        s.info.ClientConnectURLs = append(s.info.ClientConnectURLs, url)
12✔
4559
                }
12✔
4560
                for url := range s.websocket.connectURLsMap {
6✔
4561
                        s.info.WSConnectURLs = append(s.info.WSConnectURLs, url)
×
4562
                }
×
4563
        }
4564
        // Send to all registered clients that support async INFO protocols.
4565
        s.sendAsyncInfoToClients(true, true)
6✔
4566
        // We now clear the info.LameDuckMode flag so that if there are
6✔
4567
        // cluster updates and we send the INFO, we don't have the boolean
6✔
4568
        // set which would cause multiple LDM notifications to clients.
6✔
4569
        s.info.LameDuckMode = false
6✔
4570
}
4571

4572
// If given error is a net.Error and is temporary, sleeps for the given
4573
// delay and double it, but cap it to ACCEPT_MAX_SLEEP. The sleep is
4574
// interrupted if the server is shutdown.
4575
// An error message is displayed depending on the type of error.
4576
// Returns the new (or unchanged) delay, or a negative value if the
4577
// server has been or is being shutdown.
4578
func (s *Server) acceptError(acceptName string, err error, tmpDelay time.Duration) time.Duration {
16,714✔
4579
        if !s.isRunning() {
33,428✔
4580
                return -1
16,714✔
4581
        }
16,714✔
4582
        //lint:ignore SA1019 We want to retry on a bunch of errors here.
4583
        if ne, ok := err.(net.Error); ok && ne.Temporary() { // nolint:staticcheck
×
4584
                s.Errorf("Temporary %s Accept Error(%v), sleeping %dms", acceptName, ne, tmpDelay/time.Millisecond)
×
4585
                select {
×
4586
                case <-time.After(tmpDelay):
×
4587
                case <-s.quitCh:
×
4588
                        return -1
×
4589
                }
4590
                tmpDelay *= 2
×
4591
                if tmpDelay > ACCEPT_MAX_SLEEP {
×
4592
                        tmpDelay = ACCEPT_MAX_SLEEP
×
4593
                }
×
4594
        } else {
×
4595
                s.Errorf("%s Accept error: %v", acceptName, err)
×
4596
        }
×
4597
        return tmpDelay
×
4598
}
4599

4600
var errNoIPAvail = errors.New("no IP available")
4601

4602
func (s *Server) getRandomIP(resolver netResolver, url string, excludedAddresses map[string]struct{}) (string, error) {
125,804✔
4603
        host, port, err := net.SplitHostPort(url)
125,804✔
4604
        if err != nil {
125,804✔
4605
                return "", err
×
4606
        }
×
4607
        // If already an IP, skip.
4608
        if net.ParseIP(host) != nil {
251,467✔
4609
                return url, nil
125,663✔
4610
        }
125,663✔
4611
        ips, err := resolver.LookupHost(context.Background(), host)
141✔
4612
        if err != nil {
146✔
4613
                return "", fmt.Errorf("lookup for host %q: %v", host, err)
5✔
4614
        }
5✔
4615
        if len(excludedAddresses) > 0 {
161✔
4616
                for i := 0; i < len(ips); i++ {
75✔
4617
                        ip := ips[i]
50✔
4618
                        addr := net.JoinHostPort(ip, port)
50✔
4619
                        if _, excluded := excludedAddresses[addr]; excluded {
56✔
4620
                                if len(ips) == 1 {
9✔
4621
                                        ips = nil
3✔
4622
                                        break
3✔
4623
                                }
4624
                                ips[i] = ips[len(ips)-1]
3✔
4625
                                ips = ips[:len(ips)-1]
3✔
4626
                                i--
3✔
4627
                        }
4628
                }
4629
                if len(ips) == 0 {
28✔
4630
                        return "", errNoIPAvail
3✔
4631
                }
3✔
4632
        }
4633
        var address string
133✔
4634
        if len(ips) == 0 {
134✔
4635
                s.Warnf("Unable to get IP for %s, will try with %s: %v", host, url, err)
1✔
4636
                address = url
1✔
4637
        } else {
133✔
4638
                var ip string
132✔
4639
                if len(ips) == 1 {
139✔
4640
                        ip = ips[0]
7✔
4641
                } else {
132✔
4642
                        ip = ips[rand.Int31n(int32(len(ips)))]
125✔
4643
                }
125✔
4644
                // add the port
4645
                address = net.JoinHostPort(ip, port)
132✔
4646
        }
4647
        return address, nil
133✔
4648
}
4649

4650
// Returns true for the first attempt and depending on the nature
4651
// of the attempt (first connect or a reconnect), when the number
4652
// of attempts is equal to the configured report attempts.
4653
func (s *Server) shouldReportConnectErr(firstConnect bool, attempts int) bool {
75,549✔
4654
        opts := s.getOpts()
75,549✔
4655
        if firstConnect {
115,271✔
4656
                if attempts == 1 || attempts%opts.ConnectErrorReports == 0 {
49,473✔
4657
                        return true
9,751✔
4658
                }
9,751✔
4659
                return false
29,971✔
4660
        }
4661
        if attempts == 1 || attempts%opts.ReconnectErrorReports == 0 {
71,654✔
4662
                return true
35,827✔
4663
        }
35,827✔
4664
        return false
×
4665
}
4666

4667
func (s *Server) updateRemoteSubscription(acc *Account, sub *subscription, delta int32) {
3,003✔
4668
        s.updateRouteSubscriptionMap(acc, sub, delta)
3,003✔
4669
        if s.gateway.enabled {
3,653✔
4670
                s.gatewayUpdateSubInterest(acc.Name, sub, delta)
650✔
4671
        }
650✔
4672

4673
        acc.updateLeafNodes(sub, delta)
3,003✔
4674
}
4675

4676
func (s *Server) startRateLimitLogExpiration() {
6,771✔
4677
        interval := time.Second
6,771✔
4678
        s.startGoRoutine(func() {
13,542✔
4679
                defer s.grWG.Done()
6,771✔
4680

6,771✔
4681
                ticker := time.NewTicker(time.Second)
6,771✔
4682
                defer ticker.Stop()
6,771✔
4683
                for {
30,989✔
4684
                        select {
24,218✔
4685
                        case <-s.quitCh:
6,766✔
4686
                                return
6,766✔
4687
                        case interval = <-s.rateLimitLoggingCh:
×
4688
                                ticker.Reset(interval)
×
4689
                        case <-ticker.C:
17,447✔
4690
                                s.rateLimitLogging.Range(func(k, v any) bool {
20,517✔
4691
                                        start := v.(time.Time)
3,070✔
4692
                                        if time.Since(start) >= interval {
4,263✔
4693
                                                s.rateLimitLogging.Delete(k)
1,193✔
4694
                                        }
1,193✔
4695
                                        return true
3,070✔
4696
                                })
4697
                        }
4698
                }
4699
        })
4700
}
4701

4702
func (s *Server) changeRateLimitLogInterval(d time.Duration) {
×
4703
        if d <= 0 {
×
4704
                return
×
4705
        }
×
4706
        select {
×
4707
        case s.rateLimitLoggingCh <- d:
×
4708
        default:
×
4709
        }
4710
}
4711

4712
// DisconnectClientByID disconnects a client by connection ID
4713
func (s *Server) DisconnectClientByID(id uint64) error {
2✔
4714
        if s == nil {
2✔
4715
                return ErrServerNotRunning
×
4716
        }
×
4717
        if client := s.getClient(id); client != nil {
3✔
4718
                client.closeConnection(Kicked)
1✔
4719
                return nil
1✔
4720
        } else if client = s.GetLeafNode(id); client != nil {
3✔
4721
                client.closeConnection(Kicked)
1✔
4722
                return nil
1✔
4723
        }
1✔
4724
        return errors.New("no such client or leafnode id")
×
4725
}
4726

4727
// LDMClientByID sends a Lame Duck Mode info message to a client by connection ID
4728
func (s *Server) LDMClientByID(id uint64) error {
1✔
4729
        if s == nil {
1✔
4730
                return ErrServerNotRunning
×
4731
        }
×
4732
        s.mu.RLock()
1✔
4733
        c := s.clients[id]
1✔
4734
        if c == nil {
1✔
4735
                s.mu.RUnlock()
×
4736
                return errors.New("no such client id")
×
4737
        }
×
4738
        info := s.copyInfo()
1✔
4739
        info.LameDuckMode = true
1✔
4740
        s.mu.RUnlock()
1✔
4741
        c.mu.Lock()
1✔
4742
        defer c.mu.Unlock()
1✔
4743
        if c.opts.Protocol >= ClientProtoInfo && c.flags.isSet(firstPongSent) {
2✔
4744
                // sendInfo takes care of checking if the connection is still
1✔
4745
                // valid or not, so don't duplicate tests here.
1✔
4746
                c.Debugf("Sending Lame Duck Mode info to client")
1✔
4747
                c.enqueueProto(c.generateClientInfoJSON(info, true))
1✔
4748
                return nil
1✔
4749
        } else {
1✔
4750
                return errors.New("client does not support Lame Duck Mode or is not ready to receive the notification")
×
4751
        }
×
4752
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc