• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 25478192467

06 May 2026 02:32PM UTC coverage: 82.15% (-0.9%) from 83.018%
25478192467

push

github

web-flow
Ensure invalid subject characters cannot be forwarded from MQTT to other connection types (#8104)

Some characters are invalid in NATS subjects but were making it in due
to missing MQTT topic validation.

76411 of 93014 relevant lines covered (82.15%)

481569.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.06
/server/server.go
1
// Copyright 2012-2025 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bytes"
18
        "context"
19
        "crypto/fips140"
20
        "crypto/tls"
21
        "encoding/json"
22
        "errors"
23
        "flag"
24
        "fmt"
25
        "io"
26
        "log"
27
        "math/rand"
28
        "net"
29
        "net/http"
30
        "net/url"
31
        "os"
32
        "path"
33
        "path/filepath"
34
        "regexp"
35
        "runtime"
36
        "runtime/pprof"
37
        "strconv"
38
        "strings"
39
        "sync"
40
        "sync/atomic"
41
        "time"
42

43
        // Allow dynamic profiling.
44
        _ "net/http/pprof"
45

46
        "expvar"
47

48
        "github.com/klauspost/compress/s2"
49
        "github.com/nats-io/jwt/v2"
50
        "github.com/nats-io/nats-server/v2/logger"
51
        "github.com/nats-io/nkeys"
52
        "github.com/nats-io/nuid"
53
)
54

55
const (
56
        // Interval for the first PING for non client connections.
57
        firstPingInterval = time.Second
58

59
        // This is for the first ping for client connections.
60
        firstClientPingInterval = 2 * time.Second
61
)
62

63
// These are protocol versions sent between server connections: ROUTER, LEAF and
64
// GATEWAY. We may have protocol versions that have a meaning only for a certain
65
// type of connections, but we don't have to have separate enums for that.
66
// However, it is CRITICAL to not change the order of those constants since they
67
// are exchanged between servers. When adding a new protocol version, add to the
68
// end of the list, don't try to group them by connection types.
69
const (
70
        // RouteProtoZero is the original Route protocol from 2009.
71
        // http://nats.io/documentation/internals/nats-protocol/
72
        RouteProtoZero = iota
73
        // RouteProtoInfo signals a route can receive more then the original INFO block.
74
        // This can be used to update remote cluster permissions, etc...
75
        RouteProtoInfo
76
        // RouteProtoV2 is the new route/cluster protocol that provides account support.
77
        RouteProtoV2
78
        // MsgTraceProto indicates that this server understands distributed message tracing.
79
        MsgTraceProto
80
)
81

82
// Will return the latest server-to-server protocol versions, unless the
83
// option to override it is set.
84
func (s *Server) getServerProto() int {
10,503✔
85
        opts := s.getOpts()
10,503✔
86
        // Initialize with the latest protocol version.
10,503✔
87
        proto := MsgTraceProto
10,503✔
88
        // For tests, we want to be able to make this server behave
10,503✔
89
        // as an older server so check this option to see if we should override.
10,503✔
90
        if opts.overrideProto < 0 {
10,512✔
91
                // The option overrideProto is set to 0 by default (when creating an
9✔
92
                // Options structure). Since this is the same value than the original
9✔
93
                // proto RouteProtoZero, tests call setServerProtoForTest() with the
9✔
94
                // desired protocol level, which sets it as negative value equal to:
9✔
95
                // (wantedProto + 1) * -1. Here we compute back the real value.
9✔
96
                proto = (opts.overrideProto * -1) - 1
9✔
97
        }
9✔
98
        return proto
10,503✔
99
}
100

101
// Used by tests.
102
func setServerProtoForTest(wantedProto int) int {
4✔
103
        return (wantedProto + 1) * -1
4✔
104
}
4✔
105

106
// Info is the information sent to clients, routes, gateways, and leaf nodes,
107
// to help them understand information about this server.
108
type Info struct {
109
        ID                string   `json:"server_id"`
110
        Name              string   `json:"server_name"`
111
        Version           string   `json:"version"`
112
        Proto             int      `json:"proto"`
113
        GitCommit         string   `json:"git_commit,omitempty"`
114
        GoVersion         string   `json:"go"`
115
        Host              string   `json:"host"`
116
        Port              int      `json:"port"`
117
        Headers           bool     `json:"headers"`
118
        AuthRequired      bool     `json:"auth_required,omitempty"`
119
        TLSRequired       bool     `json:"tls_required,omitempty"`
120
        TLSVerify         bool     `json:"tls_verify,omitempty"`
121
        TLSAvailable      bool     `json:"tls_available,omitempty"`
122
        MaxPayload        int32    `json:"max_payload"`
123
        JetStream         bool     `json:"jetstream,omitempty"`
124
        IP                string   `json:"ip,omitempty"`
125
        CID               uint64   `json:"client_id,omitempty"`
126
        ClientIP          string   `json:"client_ip,omitempty"`
127
        Nonce             string   `json:"nonce,omitempty"`
128
        Cluster           string   `json:"cluster,omitempty"`
129
        Dynamic           bool     `json:"cluster_dynamic,omitempty"`
130
        Domain            string   `json:"domain,omitempty"`
131
        ClientConnectURLs []string `json:"connect_urls,omitempty"`    // Contains URLs a client can connect to.
132
        WSConnectURLs     []string `json:"ws_connect_urls,omitempty"` // Contains URLs a ws client can connect to.
133
        LameDuckMode      bool     `json:"ldm,omitempty"`
134
        Compression       string   `json:"compression,omitempty"`
135
        ConnectInfo       bool     `json:"connect_info,omitempty"`   // When true this is the server INFO response to CONNECT
136
        RemoteAccount     string   `json:"remote_account,omitempty"` // Lets the client or leafnode side know the remote account that they bind to.
137
        IsSystemAccount   bool     `json:"acc_is_sys,omitempty"`     // Indicates if the account is a system account.
138
        JSApiLevel        int      `json:"api_lvl,omitempty"`
139

140
        // Route Specific
141
        Import        *SubjectPermission `json:"import,omitempty"`
142
        Export        *SubjectPermission `json:"export,omitempty"`
143
        LNOC          bool               `json:"lnoc,omitempty"`
144
        LNOCU         bool               `json:"lnocu,omitempty"`
145
        InfoOnConnect bool               `json:"info_on_connect,omitempty"` // When true the server will respond to CONNECT with an INFO
146
        RoutePoolSize int                `json:"route_pool_size,omitempty"`
147
        RoutePoolIdx  int                `json:"route_pool_idx,omitempty"`
148
        RouteAccount  string             `json:"route_account,omitempty"`
149
        RouteAccReqID string             `json:"route_acc_add_reqid,omitempty"`
150
        GossipMode    byte               `json:"gossip_mode,omitempty"`
151

152
        // Gateways Specific
153
        Gateway           string   `json:"gateway,omitempty"`             // Name of the origin Gateway (sent by gateway's INFO)
154
        GatewayURLs       []string `json:"gateway_urls,omitempty"`        // Gateway URLs in the originating cluster (sent by gateway's INFO)
155
        GatewayURL        string   `json:"gateway_url,omitempty"`         // Gateway URL on that server (sent by route's INFO)
156
        GatewayCmd        byte     `json:"gateway_cmd,omitempty"`         // Command code for the receiving server to know what to do
157
        GatewayCmdPayload []byte   `json:"gateway_cmd_payload,omitempty"` // Command payload when needed
158
        GatewayNRP        bool     `json:"gateway_nrp,omitempty"`         // Uses new $GNR. prefix for mapped replies
159
        GatewayIOM        bool     `json:"gateway_iom,omitempty"`         // Indicate that all accounts will be switched to InterestOnly mode "right away"
160

161
        // LeafNode Specific
162
        LeafNodeURLs []string `json:"leafnode_urls,omitempty"` // LeafNode URLs that the server can reconnect to.
163

164
        XKey string `json:"xkey,omitempty"` // Public server's x25519 key.
165
}
166

167
// Server is our main struct.
168
type Server struct {
169
        // Fields accessed with atomic operations need to be 64-bit aligned
170
        gcid uint64
171
        // How often user logon fails due to the issuer account not being pinned.
172
        pinnedAccFail uint64
173
        stats
174
        scStats
175
        staleStats
176
        mu                  sync.RWMutex
177
        reloadMu            sync.RWMutex // Write-locked when a config reload is taking place ONLY
178
        kp                  nkeys.KeyPair
179
        xkp                 nkeys.KeyPair
180
        xpub                string
181
        info                Info
182
        configFile          string
183
        optsMu              sync.RWMutex
184
        opts                *Options
185
        running             atomic.Bool
186
        shutdown            atomic.Bool
187
        listener            net.Listener
188
        listenerErr         error
189
        gacc                *Account
190
        sys                 *internal
191
        sysAcc              atomic.Pointer[Account]
192
        js                  atomic.Pointer[jetStream]
193
        isMetaLeader        atomic.Bool
194
        jsClustered         atomic.Bool
195
        accounts            sync.Map
196
        tmpAccounts         sync.Map // Temporarily stores accounts that are being built
197
        activeAccounts      int32
198
        accResolver         AccountResolver
199
        clients             map[uint64]*client
200
        routes              map[string][]*client
201
        remoteRoutePoolSize map[string]int                // Map for remote's configure route pool size
202
        routesPoolSize      int                           // Configured pool size
203
        routesReject        bool                          // During reload, we may want to reject adding routes until some conditions are met
204
        routesNoPool        int                           // Number of routes that don't use pooling (connecting to older server for instance)
205
        accRoutes           map[string]map[string]*client // Key is account name, value is key=remoteID/value=route connection
206
        accRouteByHash      sync.Map                      // Key is account name, value is nil or a pool index
207
        accAddedCh          chan struct{}
208
        accAddedReqID       string
209
        leafs               map[uint64]*client
210
        users               map[string]*User
211
        nkeys               map[string]*NkeyUser
212
        totalClients        uint64
213
        closed              *closedRingBuffer
214
        done                chan bool
215
        start               time.Time
216
        http                net.Listener
217
        httpHandler         http.Handler
218
        httpBasePath        string
219
        profiler            net.Listener
220
        httpReqStats        map[string]uint64
221
        routeListener       net.Listener
222
        routeListenerErr    error
223
        routeInfo           Info
224
        routeResolver       netResolver
225
        routesToSelf        map[string]struct{}
226
        routeTLSName        string
227
        leafNodeListener    net.Listener
228
        leafNodeListenerErr error
229
        leafNodeInfo        Info
230
        leafNodeInfoJSON    []byte
231
        leafURLsMap         refCountedUrlSet
232
        leafNodeOpts        struct {
233
                resolver    netResolver
234
                dialTimeout time.Duration
235
        }
236
        leafRemoteCfgs     map[*leafNodeCfg]struct{}
237
        rmLeafRemoteCfgs   map[string]*leafNodeCfg
238
        leafRemoteAccounts sync.Map
239
        leafNodeEnabled    bool
240
        leafDisableConnect bool // Used in test only
241
        leafNoCluster      bool // Indicate that this server has only remotes and no cluster defined
242

243
        quitCh           chan struct{}
244
        startupComplete  chan struct{}
245
        shutdownComplete chan struct{}
246

247
        // Tracking Go routines
248
        grMu         sync.Mutex
249
        grTmpClients map[uint64]*client
250
        grRunning    bool
251
        grWG         sync.WaitGroup // to wait on various go routines
252

253
        cproto     int64     // number of clients supporting async INFO
254
        configTime time.Time // last time config was loaded
255

256
        logging struct {
257
                sync.RWMutex
258
                logger      Logger
259
                trace       int32
260
                debug       int32
261
                traceSysAcc int32
262
        }
263

264
        clientConnectURLs []string
265

266
        // Used internally for quick look-ups.
267
        clientConnectURLsMap refCountedUrlSet
268

269
        // For Gateways
270
        gatewayListener    net.Listener // Accept listener
271
        gatewayListenerErr error
272
        gateway            *srvGateway
273

274
        // Used by tests to check that http.Servers do
275
        // not set any timeout.
276
        monitoringServer *http.Server
277
        profilingServer  *http.Server
278

279
        // LameDuck mode
280
        ldm   bool
281
        ldmCh chan bool
282

283
        // Trusted public operator keys.
284
        trustedKeys []string
285
        // map of trusted keys to operator setting StrictSigningKeyUsage
286
        strictSigningKeyUsage map[string]struct{}
287

288
        // We use this to minimize mem copies for requests to monitoring
289
        // endpoint /varz (when it comes from http).
290
        varzMu sync.Mutex
291
        varz   *Varz
292
        // This is set during a config reload if we detect that we have
293
        // added/removed routes. The monitoring code then check that
294
        // to know if it should update the cluster's URLs array.
295
        varzUpdateRouteURLs bool
296

297
        // Keeps a sublist of of subscriptions attached to leafnode connections
298
        // for the $GNR.*.*.*.> subject so that a server can send back a mapped
299
        // gateway reply.
300
        gwLeafSubs *Sublist
301

302
        // Used for expiration of mapped GW replies
303
        gwrm struct {
304
                w  int32
305
                ch chan time.Duration
306
                m  sync.Map
307
        }
308

309
        // For eventIDs
310
        eventIds *nuid.NUID
311

312
        // Websocket structure
313
        websocket srvWebsocket
314

315
        // MQTT structure
316
        mqtt srvMQTT
317

318
        // OCSP monitoring
319
        ocsps []*OCSPMonitor
320

321
        // OCSP peer verification (at least one TLS block)
322
        ocspPeerVerify bool
323

324
        // OCSP response cache
325
        ocsprc OCSPResponseCache
326

327
        // exporting account name the importer experienced issues with
328
        incompleteAccExporterMap sync.Map
329

330
        // Holds cluster name under different lock for mapping
331
        cnMu sync.RWMutex
332
        cn   string
333

334
        // For registering raft nodes with the server.
335
        rnMu      sync.RWMutex
336
        raftNodes map[string]RaftNode
337

338
        // For mapping from a raft node name back to a server name and cluster. Node has to be in the same domain.
339
        nodeToInfo sync.Map
340

341
        // For out of resources to not log errors too fast.
342
        rerrMu   sync.Mutex
343
        rerrLast time.Time
344

345
        connRateCounter *rateCounter
346

347
        // If there is a system account configured, to still support the $G account,
348
        // the server will create a fake user and add it to the list of users.
349
        // Keep track of what that user name is for config reload purposes.
350
        sysAccOnlyNoAuthUser string
351

352
        // IPQueues map
353
        ipQueues sync.Map
354

355
        // To limit logging frequency
356
        rateLimitLogging   sync.Map
357
        rateLimitLoggingCh chan time.Duration
358

359
        // Total outstanding catchup bytes in flight.
360
        gcbMu     sync.RWMutex
361
        gcbOut    int64
362
        gcbOutMax int64 // Taken from JetStreamMaxCatchup or defaultMaxTotalCatchupOutBytes
363
        // A global chanel to kick out stalled catchup sequences.
364
        gcbKick chan struct{}
365

366
        // Total outbound syncRequests
367
        syncOutSem chan struct{}
368

369
        // Queue to process JS API requests that come from routes (or gateways)
370
        jsAPIRoutedReqs     *ipQueue[*jsAPIRoutedReq]
371
        jsAPIRoutedInfoReqs *ipQueue[*jsAPIRoutedReq]
372

373
        // Delayed API responses.
374
        delayedAPIResponses *ipQueue[*delayedAPIResponse]
375

376
        // Whether moving NRG traffic into accounts is permitted on this server.
377
        // Controls whether or not the account NRG capability is set in statsz.
378
        // Currently used by unit tests to simulate nodes not supporting account NRG.
379
        accountNRGAllowed atomic.Bool
380

381
        // List of proxies trusted keys in `KeyPair` form so we can do signature
382
        // verification when processing incoming proxy connections.
383
        proxiesKeyPairs []nkeys.KeyPair
384
        proxiedConns    map[string]map[uint64]*client
385
}
386

387
// For tracking JS nodes.
388
type nodeInfo struct {
389
        name            string
390
        version         string
391
        cluster         string
392
        domain          string
393
        id              string
394
        tags            jwt.TagList
395
        cfg             *JetStreamConfig
396
        stats           *JetStreamStats
397
        offline         bool
398
        js              bool
399
        binarySnapshots bool
400
        accountNRG      bool
401
}
402

403
type stats struct {
404
        inMsgs           int64
405
        outMsgs          int64
406
        inBytes          int64
407
        outBytes         int64
408
        slowConsumers    int64
409
        staleConnections int64
410
        stalls           int64
411
}
412

413
// scStats includes the total and per connection counters of Slow Consumers.
414
type scStats struct {
415
        clients  atomic.Uint64
416
        routes   atomic.Uint64
417
        leafs    atomic.Uint64
418
        gateways atomic.Uint64
419
}
420

421
// staleStats includes the total and per connection counters of Stale Connections.
422
type staleStats struct {
423
        clients  atomic.Uint64
424
        routes   atomic.Uint64
425
        leafs    atomic.Uint64
426
        gateways atomic.Uint64
427
}
428

429
// This is used by tests so we can run all server tests with a default route
430
// or leafnode compression mode. For instance:
431
// go test -race -v ./server -cluster_compression=fast
432
var (
433
        testDefaultClusterCompression  string
434
        testDefaultLeafNodeCompression string
435
)
436

437
// Compression modes.
438
const (
439
        CompressionNotSupported   = "not supported"
440
        CompressionOff            = "off"
441
        CompressionAccept         = "accept"
442
        CompressionS2Auto         = "s2_auto"
443
        CompressionS2Uncompressed = "s2_uncompressed"
444
        CompressionS2Fast         = "s2_fast"
445
        CompressionS2Better       = "s2_better"
446
        CompressionS2Best         = "s2_best"
447
)
448

449
// defaultCompressionS2AutoRTTThresholds is the default of RTT thresholds for
450
// the CompressionS2Auto mode.
451
var defaultCompressionS2AutoRTTThresholds = []time.Duration{
452
        // [0..10ms] -> CompressionS2Uncompressed
453
        10 * time.Millisecond,
454
        // ]10ms..50ms] -> CompressionS2Fast
455
        50 * time.Millisecond,
456
        // ]50ms..100ms] -> CompressionS2Better
457
        100 * time.Millisecond,
458
        // ]100ms..] -> CompressionS2Best
459
}
460

461
// For a given user provided string, matches to one of the compression mode
462
// constant and updates the provided string to that constant. Returns an
463
// error if the provided compression mode is not known.
464
// The parameter `chosenModeForOn` indicates which compression mode to use
465
// when the user selects "on" (or enabled, true, etc..). This is because
466
// we may have different defaults depending on where the compression is used.
467
func validateAndNormalizeCompressionOption(c *CompressionOpts, chosenModeForOn string) error {
11,122✔
468
        if c == nil {
11,122✔
469
                return nil
×
470
        }
×
471
        cmtl := strings.ToLower(c.Mode)
11,122✔
472
        // First, check for the "on" case so that we set to the default compression
11,122✔
473
        // mode for that. The other switch/case will finish setup if needed (for
11,122✔
474
        // instance if the default mode is s2Auto).
11,122✔
475
        switch cmtl {
11,122✔
476
        case "on", "enabled", "true":
14✔
477
                cmtl = chosenModeForOn
14✔
478
        default:
11,108✔
479
        }
480
        // Check (again) with the proper mode.
481
        switch cmtl {
11,122✔
482
        case "not supported", "not_supported":
2✔
483
                c.Mode = CompressionNotSupported
2✔
484
        case "disabled", "off", "false":
1,796✔
485
                c.Mode = CompressionOff
1,796✔
486
        case "accept":
4,229✔
487
                c.Mode = CompressionAccept
4,229✔
488
        case "auto", "s2_auto":
5,014✔
489
                var rtts []time.Duration
5,014✔
490
                if len(c.RTTThresholds) == 0 {
9,174✔
491
                        rtts = defaultCompressionS2AutoRTTThresholds
4,160✔
492
                } else {
5,014✔
493
                        for _, n := range c.RTTThresholds {
3,426✔
494
                                // Do not error on negative, but simply set to 0
2,572✔
495
                                if n < 0 {
2,576✔
496
                                        n = 0
4✔
497
                                }
4✔
498
                                // Make sure they are properly ordered. However, it is possible
499
                                // to have a "0" anywhere in the list to indicate that this
500
                                // compression level should not be used.
501
                                if l := len(rtts); l > 0 && n != 0 {
4,260✔
502
                                        for _, v := range rtts {
4,242✔
503
                                                if n < v {
2,556✔
504
                                                        return fmt.Errorf("RTT threshold values %v should be in ascending order", c.RTTThresholds)
2✔
505
                                                }
2✔
506
                                        }
507
                                }
508
                                rtts = append(rtts, n)
2,570✔
509
                        }
510
                        if len(rtts) > 0 {
1,704✔
511
                                // Trim 0 that are at the end.
852✔
512
                                stop := -1
852✔
513
                                for i := len(rtts) - 1; i >= 0; i-- {
1,718✔
514
                                        if rtts[i] != 0 {
1,714✔
515
                                                stop = i
848✔
516
                                                break
848✔
517
                                        }
518
                                }
519
                                rtts = rtts[:stop+1]
852✔
520
                        }
521
                        if len(rtts) > 4 {
854✔
522
                                // There should be at most values for "uncompressed", "fast",
2✔
523
                                // "better" and "best" (when some 0 are present).
2✔
524
                                return fmt.Errorf("compression mode %q should have no more than 4 RTT thresholds: %v", c.Mode, c.RTTThresholds)
2✔
525
                        } else if len(rtts) == 0 {
856✔
526
                                // But there should be at least 1 if the user provided the slice.
4✔
527
                                // We would be here only if it was provided by say with values
4✔
528
                                // being a single or all zeros.
4✔
529
                                return fmt.Errorf("compression mode %q requires at least one RTT threshold", c.Mode)
4✔
530
                        }
4✔
531
                }
532
                c.Mode = CompressionS2Auto
5,006✔
533
                c.RTTThresholds = rtts
5,006✔
534
        case "fast", "s2_fast":
25✔
535
                c.Mode = CompressionS2Fast
25✔
536
        case "better", "s2_better":
27✔
537
                c.Mode = CompressionS2Better
27✔
538
        case "best", "s2_best":
27✔
539
                c.Mode = CompressionS2Best
27✔
540
        default:
2✔
541
                return fmt.Errorf("unsupported compression mode %q", c.Mode)
2✔
542
        }
543
        return nil
11,112✔
544
}
545

546
// Returns `true` if the compression mode `m` indicates that the server
547
// will negotiate compression with the remote server, `false` otherwise.
548
// Note that the provided compression mode is assumed to have been
549
// normalized and validated.
550
func needsCompression(m string) bool {
179,201✔
551
        return m != _EMPTY_ && m != CompressionOff && m != CompressionNotSupported
179,201✔
552
}
179,201✔
553

554
// Compression is asymmetric, meaning that one side can have a different
555
// compression level than the other. However, we need to check for cases
556
// when this server `scm` or the remote `rcm` do not support compression
557
// (say older server, or test to make it behave as it is not), or have
558
// the compression off.
559
// Note that `scm` is assumed to not be "off" or "not supported".
560
func selectCompressionMode(scm, rcm string) (mode string, err error) {
43,118✔
561
        if rcm == CompressionNotSupported || rcm == _EMPTY_ {
43,121✔
562
                return CompressionNotSupported, nil
3✔
563
        }
3✔
564
        switch rcm {
43,115✔
565
        case CompressionOff:
17,149✔
566
                // If the remote explicitly disables compression, then we won't
17,149✔
567
                // use compression.
17,149✔
568
                return CompressionOff, nil
17,149✔
569
        case CompressionAccept:
24,778✔
570
                // If the remote is ok with compression (but is not initiating it),
24,778✔
571
                // and if we too are in this mode, then it means no compression.
24,778✔
572
                if scm == CompressionAccept {
49,553✔
573
                        return CompressionOff, nil
24,775✔
574
                }
24,775✔
575
                // Otherwise use our compression mode.
576
                return scm, nil
3✔
577
        case CompressionS2Auto, CompressionS2Uncompressed, CompressionS2Fast, CompressionS2Better, CompressionS2Best:
1,188✔
578
                // This case is here to make sure that if we don't recognize a
1,188✔
579
                // compression setting, we error out.
1,188✔
580
                if scm == CompressionAccept {
1,194✔
581
                        // If our compression mode is "accept", then we will use the remote
6✔
582
                        // compression mode, except if it is "auto", in which case we will
6✔
583
                        // default to "fast". This is not a configuration (auto in one
6✔
584
                        // side and accept in the other) that would be recommended.
6✔
585
                        if rcm == CompressionS2Auto {
8✔
586
                                return CompressionS2Fast, nil
2✔
587
                        }
2✔
588
                        // Use their compression mode.
589
                        return rcm, nil
4✔
590
                }
591
                // Otherwise use our compression mode.
592
                return scm, nil
1,182✔
593
        default:
×
594
                return _EMPTY_, fmt.Errorf("unsupported route compression mode %q", rcm)
×
595
        }
596
}
597

598
// If the configured compression mode is "auto" then will return that,
599
// otherwise will return the given `cm` compression mode.
600
func compressionModeForInfoProtocol(co *CompressionOpts, cm string) string {
18,251✔
601
        if co.Mode == CompressionS2Auto {
19,410✔
602
                return CompressionS2Auto
1,159✔
603
        }
1,159✔
604
        return cm
17,092✔
605
}
606

607
// Given a connection RTT and a list of thresholds durations, this
608
// function will return an S2 compression level such as "uncompressed",
609
// "fast", "better" or "best". For instance, with the following slice:
610
// [5ms, 10ms, 15ms, 20ms], a RTT of up to 5ms will result
611
// in the compression level "uncompressed", ]5ms..10ms] will result in
612
// "fast" compression, etc..
613
// However, the 0 value allows for disabling of some compression levels.
614
// For instance, the following slice: [0, 0, 20, 30] means that a RTT of
615
// [0..20ms] would result in the "better" compression - effectively disabling
616
// the use of "uncompressed" and "fast", then anything above 20ms would
617
// result in the use of "best" level (the 30 in the list has no effect
618
// and the list could have been simplified to [0, 0, 20]).
619
func selectS2AutoModeBasedOnRTT(rtt time.Duration, rttThresholds []time.Duration) string {
1,515✔
620
        var idx int
1,515✔
621
        var found bool
1,515✔
622
        for i, d := range rttThresholds {
3,068✔
623
                if rtt <= d {
3,052✔
624
                        idx = i
1,499✔
625
                        found = true
1,499✔
626
                        break
1,499✔
627
                }
628
        }
629
        if !found {
1,531✔
630
                // If we did not find but we have all levels, then use "best",
16✔
631
                // otherwise use the last one in array.
16✔
632
                if l := len(rttThresholds); l >= 3 {
32✔
633
                        idx = 3
16✔
634
                } else {
16✔
635
                        idx = l - 1
×
636
                }
×
637
        }
638
        switch idx {
1,515✔
639
        case 0:
1,495✔
640
                return CompressionS2Uncompressed
1,495✔
641
        case 1:
2✔
642
                return CompressionS2Fast
2✔
643
        case 2:
2✔
644
                return CompressionS2Better
2✔
645
        }
646
        return CompressionS2Best
16✔
647
}
648

649
// Returns an array of s2 WriterOption based on the route compression mode.
650
// So far we return a single option, but this way we can call s2.NewWriter()
651
// with a nil []s2.WriterOption, but not with a nil s2.WriterOption, so
652
// this is more versatile.
653
func s2WriterOptions(cm string) []s2.WriterOption {
1,205✔
654
        _opts := [2]s2.WriterOption{}
1,205✔
655
        opts := append(
1,205✔
656
                _opts[:0],
1,205✔
657
                s2.WriterConcurrency(1), // Stop asynchronous flushing in separate goroutines
1,205✔
658
        )
1,205✔
659
        switch cm {
1,205✔
660
        case CompressionS2Uncompressed:
1,153✔
661
                return append(opts, s2.WriterUncompressed())
1,153✔
662
        case CompressionS2Best:
27✔
663
                return append(opts, s2.WriterBestCompression())
27✔
664
        case CompressionS2Better:
13✔
665
                return append(opts, s2.WriterBetterCompression())
13✔
666
        default:
12✔
667
                return nil
12✔
668
        }
669
}
670

671
// New will setup a new server struct after parsing the options.
672
// DEPRECATED: Use NewServer(opts)
673
func New(opts *Options) *Server {
195✔
674
        s, _ := NewServer(opts)
195✔
675
        return s
195✔
676
}
195✔
677

678
func NewServerFromConfig(opts *Options) (*Server, error) {
×
679
        if opts.ConfigFile != _EMPTY_ && opts.configDigest == "" {
×
680
                if err := opts.ProcessConfigFile(opts.ConfigFile); err != nil {
×
681
                        return nil, err
×
682
                }
×
683
        }
684
        return NewServer(opts)
×
685
}
686

687
// NewServer will setup a new server struct after parsing the options.
688
// Could return an error if options can not be validated.
689
// The provided Options type should not be re-used afterwards.
690
// Either use Options.Clone() to pass a copy, or make a new one.
691
func NewServer(opts *Options) (*Server, error) {
7,034✔
692
        setBaselineOptions(opts)
7,034✔
693

7,034✔
694
        // Process TLS options, including whether we require client certificates.
7,034✔
695
        tlsReq := opts.TLSConfig != nil
7,034✔
696
        verify := (tlsReq && opts.TLSConfig.ClientAuth == tls.RequireAndVerifyClientCert)
7,034✔
697

7,034✔
698
        // Create our server's nkey identity.
7,034✔
699
        kp, _ := nkeys.CreateServer()
7,034✔
700
        pub, _ := kp.PublicKey()
7,034✔
701

7,034✔
702
        // Create an xkey for encrypting messages from this server.
7,034✔
703
        var xkp nkeys.KeyPair
7,034✔
704
        var xpub string
7,034✔
705
        if !fips140.Enabled() {
14,068✔
706
                xkp, _ = nkeys.CreateCurveKeys()
7,034✔
707
                xpub, _ = xkp.PublicKey()
7,034✔
708
        }
7,034✔
709

710
        serverName := pub
7,034✔
711
        if opts.ServerName != _EMPTY_ {
11,585✔
712
                serverName = opts.ServerName
4,551✔
713
        }
4,551✔
714

715
        httpBasePath := normalizeBasePath(opts.HTTPBasePath)
7,034✔
716

7,034✔
717
        // Validate some options. This is here because we cannot assume that
7,034✔
718
        // server will always be started with configuration parsing (that could
7,034✔
719
        // report issues). Its options can be (incorrectly) set by hand when
7,034✔
720
        // server is embedded. If there is an error, return nil.
7,034✔
721
        if err := validateOptions(opts); err != nil {
7,084✔
722
                return nil, err
50✔
723
        }
50✔
724

725
        info := Info{
6,984✔
726
                ID:           pub,
6,984✔
727
                XKey:         xpub,
6,984✔
728
                Version:      VERSION,
6,984✔
729
                Proto:        PROTO,
6,984✔
730
                GitCommit:    gitCommit,
6,984✔
731
                GoVersion:    runtime.Version(),
6,984✔
732
                Name:         serverName,
6,984✔
733
                Host:         opts.Host,
6,984✔
734
                Port:         opts.Port,
6,984✔
735
                AuthRequired: false,
6,984✔
736
                TLSRequired:  tlsReq && !opts.AllowNonTLS,
6,984✔
737
                TLSVerify:    verify,
6,984✔
738
                MaxPayload:   opts.MaxPayload,
6,984✔
739
                JetStream:    opts.JetStream,
6,984✔
740
                Headers:      !opts.NoHeaderSupport,
6,984✔
741
                Cluster:      opts.Cluster.Name,
6,984✔
742
                Domain:       opts.JetStreamDomain,
6,984✔
743
                JSApiLevel:   JSApiLevel,
6,984✔
744
        }
6,984✔
745

6,984✔
746
        if tlsReq && !info.TLSRequired {
6,987✔
747
                info.TLSAvailable = true
3✔
748
        }
3✔
749

750
        now := time.Now()
6,984✔
751

6,984✔
752
        s := &Server{
6,984✔
753
                kp:                 kp,
6,984✔
754
                xkp:                xkp,
6,984✔
755
                xpub:               xpub,
6,984✔
756
                configFile:         opts.ConfigFile,
6,984✔
757
                info:               info,
6,984✔
758
                opts:               opts,
6,984✔
759
                done:               make(chan bool, 1),
6,984✔
760
                start:              now,
6,984✔
761
                configTime:         now,
6,984✔
762
                gwLeafSubs:         NewSublistWithCache(),
6,984✔
763
                httpBasePath:       httpBasePath,
6,984✔
764
                eventIds:           nuid.New(),
6,984✔
765
                routesToSelf:       make(map[string]struct{}),
6,984✔
766
                httpReqStats:       make(map[string]uint64), // Used to track HTTP requests
6,984✔
767
                rateLimitLoggingCh: make(chan time.Duration, 1),
6,984✔
768
                leafNodeEnabled:    opts.LeafNode.Port != 0 || len(opts.LeafNode.Remotes) > 0,
6,984✔
769
                syncOutSem:         make(chan struct{}, maxConcurrentSyncRequests),
6,984✔
770
        }
6,984✔
771

6,984✔
772
        // Delayed API response queue. Create regardless if JetStream is configured
6,984✔
773
        // or not (since it can be enabled/disabled with config reload, we want this
6,984✔
774
        // queue to exist at all times).
6,984✔
775
        s.delayedAPIResponses = newIPQueue[*delayedAPIResponse](s, "delayed API responses")
6,984✔
776

6,984✔
777
        // By default we'll allow account NRG.
6,984✔
778
        s.accountNRGAllowed.Store(true)
6,984✔
779

6,984✔
780
        // Fill up the maximum in flight syncRequests for this server.
6,984✔
781
        // Used in JetStream catchup semantics.
6,984✔
782
        for i := 0; i < maxConcurrentSyncRequests; i++ {
230,472✔
783
                s.syncOutSem <- struct{}{}
223,488✔
784
        }
223,488✔
785

786
        if opts.TLSRateLimit > 0 {
6,985✔
787
                s.connRateCounter = newRateCounter(opts.tlsConfigOpts.RateLimit)
1✔
788
        }
1✔
789

790
        // Trusted root operator keys.
791
        if !s.processTrustedKeys() {
6,984✔
792
                return nil, fmt.Errorf("Error processing trusted operator keys")
×
793
        }
×
794

795
        // If we have solicited leafnodes but no clustering and no clustername.
796
        // However we may need a stable clustername so use the server name.
797
        if len(opts.LeafNode.Remotes) > 0 && opts.Cluster.Port == 0 && opts.Cluster.Name == _EMPTY_ {
7,999✔
798
                s.leafNoCluster = true
1,015✔
799
                opts.Cluster.Name = opts.ServerName
1,015✔
800
        }
1,015✔
801

802
        if opts.Cluster.Name != _EMPTY_ {
12,031✔
803
                // Also place into mapping cn with cnMu lock.
5,047✔
804
                s.cnMu.Lock()
5,047✔
805
                s.cn = opts.Cluster.Name
5,047✔
806
                s.cnMu.Unlock()
5,047✔
807
        }
5,047✔
808

809
        s.mu.Lock()
6,984✔
810
        defer s.mu.Unlock()
6,984✔
811

6,984✔
812
        // If there are proxies trusted public keys in the configuration
6,984✔
813
        // this will fill create the corresponding list of nkeys.KeyPair
6,984✔
814
        // that we can use for signature verification.
6,984✔
815
        s.processProxiesTrustedKeys()
6,984✔
816

6,984✔
817
        // Place ourselves in the JetStream nodeInfo if needed.
6,984✔
818
        if opts.JetStream {
11,446✔
819
                ourNode := getHash(serverName)
4,462✔
820
                s.nodeToInfo.Store(ourNode, nodeInfo{
4,462✔
821
                        name:            serverName,
4,462✔
822
                        version:         VERSION,
4,462✔
823
                        cluster:         opts.Cluster.Name,
4,462✔
824
                        domain:          opts.JetStreamDomain,
4,462✔
825
                        id:              info.ID,
4,462✔
826
                        tags:            opts.Tags,
4,462✔
827
                        cfg:             &JetStreamConfig{MaxMemory: opts.JetStreamMaxMemory, MaxStore: opts.JetStreamMaxStore, CompressOK: true},
4,462✔
828
                        stats:           nil,
4,462✔
829
                        offline:         false,
4,462✔
830
                        js:              true,
4,462✔
831
                        binarySnapshots: true,
4,462✔
832
                        accountNRG:      true,
4,462✔
833
                })
4,462✔
834
        }
4,462✔
835

836
        s.routeResolver = opts.Cluster.resolver
6,984✔
837
        if s.routeResolver == nil {
13,968✔
838
                s.routeResolver = net.DefaultResolver
6,984✔
839
        }
6,984✔
840

841
        // Used internally for quick look-ups.
842
        s.clientConnectURLsMap = make(refCountedUrlSet)
6,984✔
843
        s.websocket.connectURLsMap = make(refCountedUrlSet)
6,984✔
844
        s.leafURLsMap = make(refCountedUrlSet)
6,984✔
845

6,984✔
846
        // Ensure that non-exported options (used in tests) are properly set.
6,984✔
847
        s.setLeafNodeNonExportedOptions()
6,984✔
848

6,984✔
849
        // Setup OCSP Stapling and OCSP Peer. This will abort server from starting if there
6,984✔
850
        // are no valid staples and OCSP Stapling policy is set to Always or MustStaple.
6,984✔
851
        if err := s.enableOCSP(); err != nil {
6,986✔
852
                return nil, err
2✔
853
        }
2✔
854

855
        // Call this even if there is no gateway defined. It will
856
        // initialize the structure so we don't have to check for
857
        // it to be nil or not in various places in the code.
858
        if err := s.newGateway(opts); err != nil {
6,982✔
859
                return nil, err
×
860
        }
×
861

862
        // If we have a cluster definition but do not have a cluster name, create one.
863
        if opts.Cluster.Port != 0 && opts.Cluster.Name == _EMPTY_ {
7,004✔
864
                s.info.Cluster = nuid.Next()
22✔
865
        } else if opts.Cluster.Name != _EMPTY_ {
12,029✔
866
                // Likewise here if we have a cluster name set.
5,047✔
867
                s.info.Cluster = opts.Cluster.Name
5,047✔
868
        }
5,047✔
869

870
        // This is normally done in the AcceptLoop, once the
871
        // listener has been created (possibly with random port),
872
        // but since some tests may expect the INFO to be properly
873
        // set after New(), let's do it now.
874
        s.setInfoHostPort()
6,982✔
875

6,982✔
876
        // For tracking clients
6,982✔
877
        s.clients = make(map[uint64]*client)
6,982✔
878

6,982✔
879
        // For tracking closed clients.
6,982✔
880
        s.closed = newClosedRingBuffer(opts.MaxClosedClients)
6,982✔
881

6,982✔
882
        // For tracking connections that are not yet registered
6,982✔
883
        // in s.routes, but for which readLoop has started.
6,982✔
884
        s.grTmpClients = make(map[uint64]*client)
6,982✔
885

6,982✔
886
        // For tracking routes and their remote ids
6,982✔
887
        s.initRouteStructures(opts)
6,982✔
888

6,982✔
889
        // For tracking leaf nodes.
6,982✔
890
        s.leafs = make(map[uint64]*client)
6,982✔
891

6,982✔
892
        // Used to kick out all go routines possibly waiting on server
6,982✔
893
        // to shutdown.
6,982✔
894
        s.quitCh = make(chan struct{})
6,982✔
895

6,982✔
896
        // Closed when startup is complete. ReadyForConnections() will block on
6,982✔
897
        // this before checking the presence of listening sockets.
6,982✔
898
        s.startupComplete = make(chan struct{})
6,982✔
899

6,982✔
900
        // Closed when Shutdown() is complete. Allows WaitForShutdown() to block
6,982✔
901
        // waiting for complete shutdown.
6,982✔
902
        s.shutdownComplete = make(chan struct{})
6,982✔
903

6,982✔
904
        // Check for configured account resolvers.
6,982✔
905
        if err := s.configureResolver(); err != nil {
6,982✔
906
                return nil, err
×
907
        }
×
908
        // If there is an URL account resolver, do basic test to see if anyone is home.
909
        if ar := opts.AccountResolver; ar != nil {
7,345✔
910
                if ur, ok := ar.(*URLAccResolver); ok {
402✔
911
                        if _, err := ur.Fetch(_EMPTY_); err != nil {
40✔
912
                                return nil, err
1✔
913
                        }
1✔
914
                }
915
        }
916
        // For other resolver:
917
        // In operator mode, when the account resolver depends on an external system and
918
        // the system account can't be fetched, inject a temporary one.
919
        if ar := s.accResolver; len(opts.TrustedOperators) == 1 && ar != nil &&
6,981✔
920
                opts.SystemAccount != _EMPTY_ && opts.SystemAccount != DEFAULT_SYSTEM_ACCOUNT {
7,245✔
921
                if _, ok := ar.(*MemAccResolver); !ok {
419✔
922
                        s.mu.Unlock()
155✔
923
                        var a *Account
155✔
924
                        // perform direct lookup to avoid warning trace
155✔
925
                        if _, err := fetchAccount(ar, opts.SystemAccount); err == nil {
243✔
926
                                a, _ = s.lookupAccount(opts.SystemAccount)
88✔
927
                        }
88✔
928
                        s.mu.Lock()
155✔
929
                        if a == nil {
222✔
930
                                sac := NewAccount(opts.SystemAccount)
67✔
931
                                sac.Issuer = opts.TrustedOperators[0].Issuer
67✔
932
                                sac.signingKeys = map[string]jwt.Scope{}
67✔
933
                                sac.signingKeys[opts.SystemAccount] = nil
67✔
934
                                s.registerAccountNoLock(sac)
67✔
935
                        }
67✔
936
                }
937
        }
938

939
        // For tracking accounts
940
        if _, err := s.configureAccounts(false); err != nil {
6,981✔
941
                return nil, err
×
942
        }
×
943

944
        // Used to setup Authorization.
945
        s.configureAuthorization()
6,981✔
946

6,981✔
947
        // Start signal handler
6,981✔
948
        s.handleSignals()
6,981✔
949

6,981✔
950
        return s, nil
6,981✔
951
}
952

953
// Initializes route structures based on pooling and/or per-account routes.
954
//
955
// Server lock is held on entry
956
func (s *Server) initRouteStructures(opts *Options) {
6,982✔
957
        s.routes = make(map[string][]*client)
6,982✔
958
        if ps := opts.Cluster.PoolSize; ps > 0 {
11,177✔
959
                s.routesPoolSize = ps
4,195✔
960
        } else {
6,982✔
961
                s.routesPoolSize = 1
2,787✔
962
        }
2,787✔
963
        // If we have per-account routes, we create accRoutes and initialize it
964
        // with nil values. The presence of an account as the key will allow us
965
        // to know if a given account is supposed to have dedicated routes.
966
        if l := len(opts.Cluster.PinnedAccounts); l > 0 {
10,889✔
967
                s.accRoutes = make(map[string]map[string]*client, l)
3,907✔
968
                for _, acc := range opts.Cluster.PinnedAccounts {
7,828✔
969
                        s.accRoutes[acc] = make(map[string]*client)
3,921✔
970
                }
3,921✔
971
        }
972
}
973

974
func (s *Server) logRejectedTLSConns() {
1✔
975
        defer s.grWG.Done()
1✔
976
        t := time.NewTicker(time.Second)
1✔
977
        defer t.Stop()
1✔
978
        for {
3✔
979
                select {
2✔
980
                case <-s.quitCh:
1✔
981
                        return
1✔
982
                case <-t.C:
1✔
983
                        blocked := s.connRateCounter.countBlocked()
1✔
984
                        if blocked > 0 {
2✔
985
                                s.Warnf("Rejected %d connections due to TLS rate limiting", blocked)
1✔
986
                        }
1✔
987
                }
988
        }
989
}
990

991
// clusterName returns our cluster name which could be dynamic.
992
func (s *Server) ClusterName() string {
117,494✔
993
        s.mu.RLock()
117,494✔
994
        cn := s.info.Cluster
117,494✔
995
        s.mu.RUnlock()
117,494✔
996
        return cn
117,494✔
997
}
117,494✔
998

999
// Grabs cluster name with cluster name specific lock.
1000
func (s *Server) cachedClusterName() string {
118,314✔
1001
        s.cnMu.RLock()
118,314✔
1002
        cn := s.cn
118,314✔
1003
        s.cnMu.RUnlock()
118,314✔
1004
        return cn
118,314✔
1005
}
118,314✔
1006

1007
// setClusterName will update the cluster name for this server.
1008
func (s *Server) setClusterName(name string) {
26✔
1009
        s.mu.Lock()
26✔
1010
        var resetCh chan struct{}
26✔
1011
        if s.sys != nil && s.info.Cluster != name {
51✔
1012
                // can't hold the lock as go routine reading it may be waiting for lock as well
25✔
1013
                resetCh = s.sys.resetCh
25✔
1014
        }
25✔
1015
        s.info.Cluster = name
26✔
1016
        s.routeInfo.Cluster = name
26✔
1017

26✔
1018
        // Need to close solicited leaf nodes. The close has to be done outside of the server lock.
26✔
1019
        var leafs []*client
26✔
1020
        for _, c := range s.leafs {
27✔
1021
                c.mu.Lock()
1✔
1022
                if c.leaf != nil && c.leaf.remote != nil {
2✔
1023
                        leafs = append(leafs, c)
1✔
1024
                }
1✔
1025
                c.mu.Unlock()
1✔
1026
        }
1027
        s.mu.Unlock()
26✔
1028

26✔
1029
        // Also place into mapping cn with cnMu lock.
26✔
1030
        s.cnMu.Lock()
26✔
1031
        s.cn = name
26✔
1032
        s.cnMu.Unlock()
26✔
1033

26✔
1034
        for _, l := range leafs {
27✔
1035
                l.closeConnection(ClusterNameConflict)
1✔
1036
        }
1✔
1037
        if resetCh != nil {
51✔
1038
                resetCh <- struct{}{}
25✔
1039
        }
25✔
1040
        s.Noticef("Cluster name updated to %s", name)
26✔
1041
}
1042

1043
// Return whether the cluster name is dynamic.
1044
func (s *Server) isClusterNameDynamic() bool {
44,962✔
1045
        // We need to lock the whole "Cluster.Name" check and not use s.getOpts()
44,962✔
1046
        // because otherwise this could cause a data race with setting the name in
44,962✔
1047
        // route.go's processRouteConnect().
44,962✔
1048
        s.optsMu.RLock()
44,962✔
1049
        dynamic := s.opts.Cluster.Name == _EMPTY_
44,962✔
1050
        s.optsMu.RUnlock()
44,962✔
1051
        return dynamic
44,962✔
1052
}
44,962✔
1053

1054
// Returns our configured serverName.
1055
func (s *Server) serverName() string {
24,868✔
1056
        return s.getOpts().ServerName
24,868✔
1057
}
24,868✔
1058

1059
// ClientURL returns the URL used to connect clients.
1060
// Helpful in tests and with in-process servers using a random client port (-1).
1061
func (s *Server) ClientURL() string {
6,791✔
1062
        // FIXME(dlc) - should we add in user and pass if defined single?
6,791✔
1063
        opts := s.getOpts()
6,791✔
1064
        var u url.URL
6,791✔
1065
        u.Scheme = "nats"
6,791✔
1066
        if opts.TLSConfig != nil {
6,801✔
1067
                u.Scheme = "tls"
10✔
1068
        }
10✔
1069
        u.Host = net.JoinHostPort(opts.Host, fmt.Sprintf("%d", opts.Port))
6,791✔
1070
        return u.String()
6,791✔
1071
}
1072

1073
// WebsocketURL returns the URL used to connect websocket clients.
1074
// Helpful in tests and with in-process servers using a random websocket port (-1).
1075
func (s *Server) WebsocketURL() string {
×
1076
        opts := s.getOpts()
×
1077
        var u url.URL
×
1078
        u.Scheme = "ws"
×
1079
        if opts.Websocket.TLSConfig != nil {
×
1080
                u.Scheme = "wss"
×
1081
        }
×
1082
        u.Host = net.JoinHostPort(opts.Websocket.Host, fmt.Sprintf("%d", opts.Websocket.Port))
×
1083
        return u.String()
×
1084
}
1085

1086
func validateCluster(o *Options) error {
8,148✔
1087
        if o.Cluster.Name != _EMPTY_ && strings.Contains(o.Cluster.Name, " ") {
8,148✔
1088
                return ErrClusterNameHasSpaces
×
1089
        }
×
1090
        if o.Cluster.Compression.Mode != _EMPTY_ {
13,237✔
1091
                if err := validateAndNormalizeCompressionOption(&o.Cluster.Compression, CompressionS2Fast); err != nil {
5,089✔
1092
                        return err
×
1093
                }
×
1094
        }
1095
        if err := validatePinnedCerts(o.Cluster.TLSPinnedCerts); err != nil {
8,148✔
1096
                return fmt.Errorf("cluster: %v", err)
×
1097
        }
×
1098
        // Check that cluster name if defined matches any gateway name.
1099
        // Note that we have already verified that the gateway name does not have spaces.
1100
        if o.Gateway.Name != _EMPTY_ && o.Gateway.Name != o.Cluster.Name {
8,512✔
1101
                if o.Cluster.Name != _EMPTY_ {
364✔
1102
                        return ErrClusterNameConfigConflict
×
1103
                }
×
1104
                // Set this here so we do not consider it dynamic.
1105
                o.Cluster.Name = o.Gateway.Name
364✔
1106
        }
1107
        if l := len(o.Cluster.PinnedAccounts); l > 0 {
12,071✔
1108
                if o.Cluster.PoolSize < 0 {
3,923✔
1109
                        return fmt.Errorf("pool_size cannot be negative if pinned accounts are specified")
×
1110
                }
×
1111
                m := make(map[string]struct{}, l)
3,923✔
1112
                for _, a := range o.Cluster.PinnedAccounts {
7,860✔
1113
                        if _, exists := m[a]; exists {
3,937✔
1114
                                return fmt.Errorf("found duplicate account name %q in pinned accounts list %q", a, o.Cluster.PinnedAccounts)
×
1115
                        }
×
1116
                        m[a] = struct{}{}
3,937✔
1117
                }
1118
        }
1119
        return nil
8,148✔
1120
}
1121

1122
func validatePinnedCerts(pinned PinnedCertSet) error {
18,623✔
1123
        re := regexp.MustCompile("^[a-f0-9]{64}$")
18,623✔
1124
        for certId := range pinned {
18,632✔
1125
                entry := strings.ToLower(certId)
9✔
1126
                if !re.MatchString(entry) {
9✔
1127
                        return fmt.Errorf("error parsing 'pinned_certs' key %s does not look like lower case hex-encoded sha256 of DER encoded SubjectPublicKeyInfo", entry)
×
1128
                }
×
1129
        }
1130
        return nil
18,623✔
1131
}
1132

1133
func validateOptions(o *Options) error {
8,189✔
1134
        if o.LameDuckDuration > 0 && o.LameDuckGracePeriod >= o.LameDuckDuration {
8,189✔
1135
                return fmt.Errorf("lame duck grace period (%v) should be strictly lower than lame duck duration (%v)",
×
1136
                        o.LameDuckGracePeriod, o.LameDuckDuration)
×
1137
        }
×
1138
        if int64(o.MaxPayload) > o.MaxPending {
8,189✔
1139
                return fmt.Errorf("max_payload (%v) cannot be higher than max_pending (%v)",
×
1140
                        o.MaxPayload, o.MaxPending)
×
1141
        }
×
1142
        if o.ServerName != _EMPTY_ && strings.Contains(o.ServerName, " ") {
8,189✔
1143
                return errors.New("server name cannot contain spaces")
×
1144
        }
×
1145
        // Check that the trust configuration is correct.
1146
        if err := validateTrustedOperators(o); err != nil {
8,198✔
1147
                return err
9✔
1148
        }
9✔
1149
        // Check on leaf nodes which will require a system
1150
        // account when gateways are also configured.
1151
        if err := validateLeafNode(o); err != nil {
8,210✔
1152
                return err
30✔
1153
        }
30✔
1154
        // Check that authentication is properly configured.
1155
        if err := validateAuth(o); err != nil {
8,152✔
1156
                return err
2✔
1157
        }
2✔
1158
        // Check that proxies is properly configured.
1159
        if err := validateProxies(o); err != nil {
8,148✔
1160
                return err
×
1161
        }
×
1162
        // Check that gateway is properly configured. Returns no error
1163
        // if there is no gateway defined.
1164
        if err := validateGatewayOptions(o); err != nil {
8,148✔
1165
                return err
×
1166
        }
×
1167
        // Check that cluster name if defined matches any gateway name.
1168
        if err := validateCluster(o); err != nil {
8,148✔
1169
                return err
×
1170
        }
×
1171
        if err := validateMQTTOptions(o); err != nil {
8,153✔
1172
                return err
5✔
1173
        }
5✔
1174
        if err := validateJetStreamOptions(o); err != nil {
8,155✔
1175
                return err
12✔
1176
        }
12✔
1177
        // Finally check websocket options.
1178
        return validateWebsocketOptions(o)
8,131✔
1179
}
1180

1181
func (s *Server) getOpts() *Options {
3,221,613✔
1182
        s.optsMu.RLock()
3,221,613✔
1183
        opts := s.opts
3,221,613✔
1184
        s.optsMu.RUnlock()
3,221,613✔
1185
        return opts
3,221,613✔
1186
}
3,221,613✔
1187

1188
func (s *Server) setOpts(opts *Options) {
1,159✔
1189
        s.optsMu.Lock()
1,159✔
1190
        s.opts = opts
1,159✔
1191
        s.optsMu.Unlock()
1,159✔
1192
}
1,159✔
1193

1194
func (s *Server) globalAccount() *Account {
11,734✔
1195
        s.mu.RLock()
11,734✔
1196
        gacc := s.gacc
11,734✔
1197
        s.mu.RUnlock()
11,734✔
1198
        return gacc
11,734✔
1199
}
11,734✔
1200

1201
// Used to setup or update Accounts.
1202
// Returns a map that indicates which accounts have had their stream imports
1203
// changed (in case of an update in configuration reload).
1204
// Lock is held upon entry, but will be released/reacquired in this function.
1205
func (s *Server) configureAccounts(reloading bool) (map[string]struct{}, error) {
8,117✔
1206
        awcsti := make(map[string]struct{})
8,117✔
1207

8,117✔
1208
        // Create the global account.
8,117✔
1209
        if s.gacc == nil {
15,098✔
1210
                s.gacc = NewAccount(globalAccountName)
6,981✔
1211
                s.registerAccountNoLock(s.gacc)
6,981✔
1212
        }
6,981✔
1213

1214
        opts := s.getOpts()
8,117✔
1215

8,117✔
1216
        // We need to track service imports since we can not swap them out (unsub and re-sub)
8,117✔
1217
        // until the proper server struct accounts have been swapped in properly. Doing it in
8,117✔
1218
        // place could lead to data loss or server panic since account under new si has no real
8,117✔
1219
        // account and hence no sublist, so will panic on inbound message.
8,117✔
1220
        siMap := make(map[*Account][][]byte)
8,117✔
1221

8,117✔
1222
        // Check opts and walk through them. We need to copy them here
8,117✔
1223
        // so that we do not keep a real one sitting in the options.
8,117✔
1224
        for _, acc := range opts.Accounts {
16,951✔
1225
                var a *Account
8,834✔
1226
                create := true
8,834✔
1227
                // For the global account, we want to skip the reload process
8,834✔
1228
                // and fall back into the "create" case which will in that
8,834✔
1229
                // case really be just an update (shallowCopy will make sure
8,834✔
1230
                // that mappings are copied over).
8,834✔
1231
                if reloading && acc.Name != globalAccountName {
11,995✔
1232
                        if ai, ok := s.accounts.Load(acc.Name); ok {
6,322✔
1233
                                a = ai.(*Account)
3,161✔
1234
                                // Before updating the account, check if stream imports have changed.
3,161✔
1235
                                if !a.checkStreamImportsEqual(acc) {
3,166✔
1236
                                        awcsti[acc.Name] = struct{}{}
5✔
1237
                                }
5✔
1238
                                a.mu.Lock()
3,161✔
1239
                                // Collect the sids for the service imports since we are going to
3,161✔
1240
                                // replace with new ones.
3,161✔
1241
                                var sids [][]byte
3,161✔
1242
                                for _, sis := range a.imports.services {
13,636✔
1243
                                        for _, si := range sis {
20,950✔
1244
                                                if si.sid != nil {
20,950✔
1245
                                                        sids = append(sids, si.sid)
10,475✔
1246
                                                }
10,475✔
1247
                                        }
1248
                                }
1249
                                // Setup to process later if needed.
1250
                                if len(sids) > 0 || len(acc.imports.services) > 0 {
6,322✔
1251
                                        siMap[a] = sids
3,161✔
1252
                                }
3,161✔
1253

1254
                                // Now reset all export/imports fields since they are going to be
1255
                                // filled in shallowCopy()
1256
                                a.imports.streams, a.imports.services = nil, nil
3,161✔
1257
                                a.exports.streams, a.exports.services = nil, nil
3,161✔
1258
                                // We call shallowCopy from the account `acc` (the one in Options)
3,161✔
1259
                                // and pass `a` (our existing account) to get it updated.
3,161✔
1260
                                acc.shallowCopy(a)
3,161✔
1261
                                a.mu.Unlock()
3,161✔
1262
                                create = false
3,161✔
1263
                        }
1264
                }
1265
                // Track old mappings if global account.
1266
                var oldGMappings []*mapping
8,834✔
1267
                if create {
14,507✔
1268
                        if acc.Name == globalAccountName {
5,683✔
1269
                                a = s.gacc
10✔
1270
                                a.mu.Lock()
10✔
1271
                                oldGMappings = append(oldGMappings, a.mappings...)
10✔
1272
                                a.mu.Unlock()
10✔
1273
                        } else {
5,673✔
1274
                                a = NewAccount(acc.Name)
5,663✔
1275
                        }
5,663✔
1276
                        // Locking matters in the case of an update of the global account
1277
                        a.mu.Lock()
5,673✔
1278
                        acc.shallowCopy(a)
5,673✔
1279
                        a.mu.Unlock()
5,673✔
1280
                        // Will be a no-op in case of the global account since it is already registered.
5,673✔
1281
                        s.registerAccountNoLock(a)
5,673✔
1282
                }
1283

1284
                // The `acc` account is stored in options, not in the server, and these can be cleared.
1285
                acc.sl, acc.clients, acc.mappings = nil, nil, nil
8,834✔
1286

8,834✔
1287
                // Check here if we have been reloaded and we have a global account with mappings that may have changed.
8,834✔
1288
                // If we have leafnodes they need to be updated.
8,834✔
1289
                if reloading && a == s.gacc {
8,836✔
1290
                        a.mu.Lock()
2✔
1291
                        mappings := make(map[string]*mapping)
2✔
1292
                        if len(a.mappings) > 0 && a.nleafs > 0 {
4✔
1293
                                for _, em := range a.mappings {
4✔
1294
                                        mappings[em.src] = em
2✔
1295
                                }
2✔
1296
                        }
1297
                        a.mu.Unlock()
2✔
1298
                        if len(mappings) > 0 || len(oldGMappings) > 0 {
4✔
1299
                                a.lmu.RLock()
2✔
1300
                                for _, lc := range a.lleafs {
4✔
1301
                                        for _, em := range mappings {
4✔
1302
                                                lc.forceAddToSmap(em.src)
2✔
1303
                                        }
2✔
1304
                                        // Remove any old ones if needed.
1305
                                        for _, em := range oldGMappings {
4✔
1306
                                                // Only remove if not in the new ones.
2✔
1307
                                                if _, ok := mappings[em.src]; !ok {
3✔
1308
                                                        lc.forceRemoveFromSmap(em.src)
1✔
1309
                                                }
1✔
1310
                                        }
1311
                                }
1312
                                a.lmu.RUnlock()
2✔
1313
                        }
1314
                }
1315

1316
                // If we see an account defined using $SYS we will make sure that is set as system account.
1317
                if acc.Name == DEFAULT_SYSTEM_ACCOUNT && opts.SystemAccount == _EMPTY_ {
12,678✔
1318
                        opts.SystemAccount = DEFAULT_SYSTEM_ACCOUNT
3,844✔
1319
                }
3,844✔
1320
        }
1321

1322
        // Now that we have this we need to remap any referenced accounts in
1323
        // import or export maps to the new ones.
1324
        swapApproved := func(ea *exportAuth) {
9,801✔
1325
                for sub, a := range ea.approved {
1,729✔
1326
                        var acc *Account
45✔
1327
                        if v, ok := s.accounts.Load(a.Name); ok {
90✔
1328
                                acc = v.(*Account)
45✔
1329
                        }
45✔
1330
                        ea.approved[sub] = acc
45✔
1331
                }
1332
        }
1333
        var numAccounts int
8,117✔
1334
        s.accounts.Range(func(k, v any) bool {
25,285✔
1335
                numAccounts++
17,168✔
1336
                acc := v.(*Account)
17,168✔
1337
                acc.mu.Lock()
17,168✔
1338
                // Exports
17,168✔
1339
                for _, se := range acc.exports.streams {
17,314✔
1340
                        if se != nil {
155✔
1341
                                swapApproved(&se.exportAuth)
9✔
1342
                        }
9✔
1343
                }
1344
                for _, se := range acc.exports.services {
18,843✔
1345
                        if se != nil {
3,350✔
1346
                                // Swap over the bound account for service exports.
1,675✔
1347
                                if se.acc != nil {
3,350✔
1348
                                        if v, ok := s.accounts.Load(se.acc.Name); ok {
3,350✔
1349
                                                se.acc = v.(*Account)
1,675✔
1350
                                        }
1,675✔
1351
                                }
1352
                                swapApproved(&se.exportAuth)
1,675✔
1353
                        }
1354
                }
1355
                // Imports
1356
                for _, si := range acc.imports.streams {
17,376✔
1357
                        if v, ok := s.accounts.Load(si.acc.Name); ok {
416✔
1358
                                si.acc = v.(*Account)
208✔
1359
                        }
208✔
1360
                }
1361
                for _, sis := range acc.imports.services {
23,166✔
1362
                        for _, si := range sis {
11,999✔
1363
                                if v, ok := s.accounts.Load(si.acc.Name); ok {
12,002✔
1364
                                        si.acc = v.(*Account)
6,001✔
1365

6,001✔
1366
                                        // It is possible to allow for latency tracking inside your
6,001✔
1367
                                        // own account, so lock only when not the same account.
6,001✔
1368
                                        if si.acc == acc {
6,074✔
1369
                                                si.se = si.acc.getServiceExport(si.to)
73✔
1370
                                                continue
73✔
1371
                                        }
1372
                                        si.acc.mu.RLock()
5,928✔
1373
                                        si.se = si.acc.getServiceExport(si.to)
5,928✔
1374
                                        si.acc.mu.RUnlock()
5,928✔
1375
                                }
1376
                        }
1377
                }
1378
                // Make sure the subs are running, but only if not reloading.
1379
                if len(acc.imports.services) > 0 && acc.ic == nil && !reloading {
17,448✔
1380
                        acc.ic = s.createInternalAccountClient()
280✔
1381
                        acc.ic.acc = acc
280✔
1382
                        // Need to release locks to invoke this function.
280✔
1383
                        acc.mu.Unlock()
280✔
1384
                        s.mu.Unlock()
280✔
1385
                        acc.addAllServiceImportSubs()
280✔
1386
                        s.mu.Lock()
280✔
1387
                        acc.mu.Lock()
280✔
1388
                }
280✔
1389
                acc.updated = time.Now()
17,168✔
1390
                acc.mu.Unlock()
17,168✔
1391
                return true
17,168✔
1392
        })
1393

1394
        // Check if we need to process service imports pending from above.
1395
        // This processing needs to be after we swap in the real accounts above.
1396
        for acc, sids := range siMap {
11,278✔
1397
                c := acc.ic
3,161✔
1398
                for _, sid := range sids {
13,636✔
1399
                        c.processUnsub(sid)
10,475✔
1400
                }
10,475✔
1401
                acc.addAllServiceImportSubs()
3,161✔
1402
                s.mu.Unlock()
3,161✔
1403
                s.registerSystemImports(acc)
3,161✔
1404
                s.mu.Lock()
3,161✔
1405
        }
1406

1407
        // Set the system account if it was configured.
1408
        // Otherwise create a default one.
1409
        if opts.SystemAccount != _EMPTY_ {
13,162✔
1410
                // Lock may be acquired in lookupAccount, so release to call lookupAccount.
5,045✔
1411
                s.mu.Unlock()
5,045✔
1412
                acc, err := s.lookupAccount(opts.SystemAccount)
5,045✔
1413
                s.mu.Lock()
5,045✔
1414
                if err == nil && s.sys != nil && acc != s.sys.account {
5,045✔
1415
                        // sys.account.clients (including internal client)/respmap/etc... are transferred separately
×
1416
                        s.sys.account = acc
×
1417
                        s.sysAcc.Store(acc)
×
1418
                }
×
1419
                if err != nil {
5,045✔
1420
                        return awcsti, fmt.Errorf("error resolving system account: %v", err)
×
1421
                }
×
1422

1423
                // If we have defined a system account here check to see if its just us and the $G account.
1424
                // We would do this to add user/pass to the system account. If this is the case add in
1425
                // no-auth-user for $G.
1426
                // Only do this if non-operator mode and we did not have an authorization block defined.
1427
                if len(opts.TrustedOperators) == 0 && numAccounts == 2 && opts.NoAuthUser == _EMPTY_ && !opts.authBlockDefined {
8,094✔
1428
                        // If we come here from config reload, let's not recreate the fake user name otherwise
3,049✔
1429
                        // it will cause currently clients to be disconnected.
3,049✔
1430
                        uname := s.sysAccOnlyNoAuthUser
3,049✔
1431
                        if uname == _EMPTY_ {
6,095✔
1432
                                // Create a unique name so we do not collide.
3,046✔
1433
                                var b [8]byte
3,046✔
1434
                                rn := rand.Int63()
3,046✔
1435
                                for i, l := 0, rn; i < len(b); i++ {
27,414✔
1436
                                        b[i] = digits[l%base]
24,368✔
1437
                                        l /= base
24,368✔
1438
                                }
24,368✔
1439
                                uname = fmt.Sprintf("nats-%s", b[:])
3,046✔
1440
                                s.sysAccOnlyNoAuthUser = uname
3,046✔
1441
                        }
1442
                        opts.Users = append(opts.Users, &User{Username: uname, Password: uname[6:], Account: s.gacc})
3,049✔
1443
                        opts.NoAuthUser = uname
3,049✔
1444
                }
1445
        }
1446

1447
        // Add any required exports from system account.
1448
        if s.sys != nil {
9,253✔
1449
                sysAcc := s.sys.account
1,136✔
1450
                s.mu.Unlock()
1,136✔
1451
                s.addSystemAccountExports(sysAcc)
1,136✔
1452
                s.mu.Lock()
1,136✔
1453
        }
1,136✔
1454

1455
        return awcsti, nil
8,117✔
1456
}
1457

1458
// Setup the account resolver. For memory resolver, make sure the JWTs are
1459
// properly formed but do not enforce expiration etc.
1460
// Lock is held on entry, but may be released/reacquired during this call.
1461
func (s *Server) configureResolver() error {
6,991✔
1462
        opts := s.getOpts()
6,991✔
1463
        s.accResolver = opts.AccountResolver
6,991✔
1464
        if opts.AccountResolver != nil {
7,363✔
1465
                // For URL resolver, set the TLSConfig if specified.
372✔
1466
                if opts.AccountResolverTLSConfig != nil {
374✔
1467
                        if ar, ok := opts.AccountResolver.(*URLAccResolver); ok {
4✔
1468
                                if t, ok := ar.c.Transport.(*http.Transport); ok {
4✔
1469
                                        t.CloseIdleConnections()
2✔
1470
                                        t.TLSClientConfig = opts.AccountResolverTLSConfig.Clone()
2✔
1471
                                }
2✔
1472
                        }
1473
                }
1474
                if len(opts.resolverPreloads) > 0 {
544✔
1475
                        // Lock ordering is account resolver -> server, so we need to release
172✔
1476
                        // the lock and reacquire it when done with account resolver's calls.
172✔
1477
                        ar := s.accResolver
172✔
1478
                        s.mu.Unlock()
172✔
1479
                        defer s.mu.Lock()
172✔
1480
                        if ar.IsReadOnly() {
172✔
1481
                                return fmt.Errorf("resolver preloads only available for writeable resolver types MEM/DIR/CACHE_DIR")
×
1482
                        }
×
1483
                        for k, v := range opts.resolverPreloads {
1,092✔
1484
                                _, err := jwt.DecodeAccountClaims(v)
920✔
1485
                                if err != nil {
920✔
1486
                                        return fmt.Errorf("preload account error for %q: %v", k, err)
×
1487
                                }
×
1488
                                ar.Store(k, v)
920✔
1489
                        }
1490
                }
1491
        }
1492
        return nil
6,991✔
1493
}
1494

1495
// This will check preloads for validation issues.
1496
func (s *Server) checkResolvePreloads() {
173✔
1497
        opts := s.getOpts()
173✔
1498
        // We can just check the read-only opts versions here, that way we do not need
173✔
1499
        // to grab server lock or access s.accResolver.
173✔
1500
        for k, v := range opts.resolverPreloads {
1,093✔
1501
                claims, err := jwt.DecodeAccountClaims(v)
920✔
1502
                if err != nil {
920✔
1503
                        s.Errorf("Preloaded account [%s] not valid", k)
×
1504
                        continue
×
1505
                }
1506
                // Check if it is expired.
1507
                vr := jwt.CreateValidationResults()
920✔
1508
                claims.Validate(vr)
920✔
1509
                if vr.IsBlocking(true) {
921✔
1510
                        s.Warnf("Account [%s] has validation issues:", k)
1✔
1511
                        for _, v := range vr.Issues {
2✔
1512
                                s.Warnf("  - %s", v.Description)
1✔
1513
                        }
1✔
1514
                }
1515
        }
1516
}
1517

1518
// Determines if we are in pre NATS 2.0 setup with no accounts.
1519
func (s *Server) globalAccountOnly() bool {
4,491✔
1520
        var hasOthers bool
4,491✔
1521

4,491✔
1522
        if s.trustedKeys != nil {
4,634✔
1523
                return false
143✔
1524
        }
143✔
1525

1526
        s.mu.RLock()
4,348✔
1527
        s.accounts.Range(func(k, v any) bool {
12,933✔
1528
                acc := v.(*Account)
8,585✔
1529
                // Ignore global and system
8,585✔
1530
                if acc == s.gacc || (s.sys != nil && acc == s.sys.account) {
16,775✔
1531
                        return true
8,190✔
1532
                }
8,190✔
1533
                hasOthers = true
395✔
1534
                return false
395✔
1535
        })
1536
        s.mu.RUnlock()
4,348✔
1537

4,348✔
1538
        return !hasOthers
4,348✔
1539
}
1540

1541
// Determines if this server is in standalone mode, meaning no routes or gateways.
1542
func (s *Server) standAloneMode() bool {
27,722✔
1543
        opts := s.getOpts()
27,722✔
1544
        return opts.Cluster.Port == 0 && opts.Gateway.Port == 0
27,722✔
1545
}
27,722✔
1546

1547
func (s *Server) configuredRoutes() int {
3,416✔
1548
        return len(s.getOpts().Routes)
3,416✔
1549
}
3,416✔
1550

1551
// activePeers is used in bootstrapping raft groups like the JetStream meta controller.
1552
func (s *Server) ActivePeers() (peers []string) {
3,199✔
1553
        s.nodeToInfo.Range(func(k, v any) bool {
6,802✔
1554
                si := v.(nodeInfo)
3,603✔
1555
                if !si.offline {
7,206✔
1556
                        peers = append(peers, k.(string))
3,603✔
1557
                }
3,603✔
1558
                return true
3,603✔
1559
        })
1560
        return peers
3,199✔
1561
}
1562

1563
// isTrustedIssuer will check that the issuer is a trusted public key.
1564
// This is used to make sure an account was signed by a trusted operator.
1565
func (s *Server) isTrustedIssuer(issuer string) bool {
10,518✔
1566
        s.mu.RLock()
10,518✔
1567
        defer s.mu.RUnlock()
10,518✔
1568
        // If we are not running in trusted mode and there is no issuer, that is ok.
10,518✔
1569
        if s.trustedKeys == nil && issuer == _EMPTY_ {
16,372✔
1570
                return true
5,854✔
1571
        }
5,854✔
1572
        for _, tk := range s.trustedKeys {
9,538✔
1573
                if tk == issuer {
9,459✔
1574
                        return true
4,585✔
1575
                }
4,585✔
1576
        }
1577
        return false
79✔
1578
}
1579

1580
// processTrustedKeys will process binary stamped and
1581
// options-based trusted nkeys. Returns success.
1582
func (s *Server) processTrustedKeys() bool {
6,984✔
1583
        s.strictSigningKeyUsage = map[string]struct{}{}
6,984✔
1584
        opts := s.getOpts()
6,984✔
1585
        if trustedKeys != _EMPTY_ && !s.initStampedTrustedKeys() {
6,984✔
1586
                return false
×
1587
        } else if opts.TrustedKeys != nil {
7,401✔
1588
                for _, key := range opts.TrustedKeys {
1,723✔
1589
                        if !nkeys.IsValidPublicOperatorKey(key) {
1,306✔
1590
                                return false
×
1591
                        }
×
1592
                }
1593
                s.trustedKeys = append([]string(nil), opts.TrustedKeys...)
417✔
1594
                for _, claim := range opts.TrustedOperators {
723✔
1595
                        if !claim.StrictSigningKeyUsage {
610✔
1596
                                continue
304✔
1597
                        }
1598
                        for _, key := range claim.SigningKeys {
4✔
1599
                                s.strictSigningKeyUsage[key] = struct{}{}
2✔
1600
                        }
2✔
1601
                }
1602
        }
1603
        return true
6,984✔
1604
}
1605

1606
// checkTrustedKeyString will check that the string is a valid array
1607
// of public operator nkeys.
1608
func checkTrustedKeyString(keys string) []string {
×
1609
        tks := strings.Fields(keys)
×
1610
        if len(tks) == 0 {
×
1611
                return nil
×
1612
        }
×
1613
        // Walk all the keys and make sure they are valid.
1614
        for _, key := range tks {
×
1615
                if !nkeys.IsValidPublicOperatorKey(key) {
×
1616
                        return nil
×
1617
                }
×
1618
        }
1619
        return tks
×
1620
}
1621

1622
// initStampedTrustedKeys will check the stamped trusted keys
1623
// and will set the server field 'trustedKeys'. Returns whether
1624
// it succeeded or not.
1625
func (s *Server) initStampedTrustedKeys() bool {
×
1626
        // Check to see if we have an override in options, which will cause us to fail.
×
1627
        if len(s.getOpts().TrustedKeys) > 0 {
×
1628
                return false
×
1629
        }
×
1630
        tks := checkTrustedKeyString(trustedKeys)
×
1631
        if len(tks) == 0 {
×
1632
                return false
×
1633
        }
×
1634
        s.trustedKeys = tks
×
1635
        return true
×
1636
}
1637

1638
// PrintAndDie is exported for access in other packages.
1639
func PrintAndDie(msg string) {
×
1640
        fmt.Fprintln(os.Stderr, msg)
×
1641
        os.Exit(1)
×
1642
}
×
1643

1644
// PrintServerAndExit will print our version and exit.
1645
func PrintServerAndExit() {
×
1646
        fmt.Printf("nats-server: v%s\n", VERSION)
×
1647
        os.Exit(0)
×
1648
}
×
1649

1650
// ProcessCommandLineArgs takes the command line arguments
1651
// validating and setting flags for handling in case any
1652
// sub command was present.
1653
func ProcessCommandLineArgs(cmd *flag.FlagSet) (showVersion bool, showHelp bool, err error) {
×
1654
        if len(cmd.Args()) > 0 {
×
1655
                arg := cmd.Args()[0]
×
1656
                switch strings.ToLower(arg) {
×
1657
                case "version":
×
1658
                        return true, false, nil
×
1659
                case "help":
×
1660
                        return false, true, nil
×
1661
                default:
×
1662
                        return false, false, fmt.Errorf("unrecognized command: %q", arg)
×
1663
                }
1664
        }
1665

1666
        return false, false, nil
×
1667
}
1668

1669
// Public version.
1670
func (s *Server) Running() bool {
2,440✔
1671
        return s.isRunning()
2,440✔
1672
}
2,440✔
1673

1674
// Protected check on running state
1675
func (s *Server) isRunning() bool {
326,534,349✔
1676
        return s.running.Load()
326,534,349✔
1677
}
326,534,349✔
1678

1679
func (s *Server) logPid() error {
1✔
1680
        pidStr := strconv.Itoa(os.Getpid())
1✔
1681
        return os.WriteFile(s.getOpts().PidFile, []byte(pidStr), defaultFilePerms)
1✔
1682
}
1✔
1683

1684
// numReservedAccounts will return the number of reserved accounts configured in the server.
1685
// Currently this is 1, one for the global default account.
1686
func (s *Server) numReservedAccounts() int {
1✔
1687
        return 1
1✔
1688
}
1✔
1689

1690
// NumActiveAccounts reports number of active accounts on this server.
1691
func (s *Server) NumActiveAccounts() int32 {
×
1692
        return atomic.LoadInt32(&s.activeAccounts)
×
1693
}
×
1694

1695
// incActiveAccounts() just adds one under lock.
1696
func (s *Server) incActiveAccounts() {
23,057✔
1697
        atomic.AddInt32(&s.activeAccounts, 1)
23,057✔
1698
}
23,057✔
1699

1700
// decActiveAccounts() just subtracts one under lock.
1701
func (s *Server) decActiveAccounts() {
12,361✔
1702
        atomic.AddInt32(&s.activeAccounts, -1)
12,361✔
1703
}
12,361✔
1704

1705
// This should be used for testing only. Will be slow since we have to
1706
// range over all accounts in the sync.Map to count.
1707
func (s *Server) numAccounts() int {
6✔
1708
        count := 0
6✔
1709
        s.mu.RLock()
6✔
1710
        s.accounts.Range(func(k, v any) bool {
20✔
1711
                count++
14✔
1712
                return true
14✔
1713
        })
14✔
1714
        s.mu.RUnlock()
6✔
1715
        return count
6✔
1716
}
1717

1718
// NumLoadedAccounts returns the number of loaded accounts.
1719
func (s *Server) NumLoadedAccounts() int {
4✔
1720
        return s.numAccounts()
4✔
1721
}
4✔
1722

1723
// LookupOrRegisterAccount will return the given account if known or create a new entry.
1724
func (s *Server) LookupOrRegisterAccount(name string) (account *Account, isNew bool) {
2,253✔
1725
        s.mu.Lock()
2,253✔
1726
        defer s.mu.Unlock()
2,253✔
1727
        if v, ok := s.accounts.Load(name); ok {
2,253✔
1728
                return v.(*Account), false
×
1729
        }
×
1730
        acc := NewAccount(name)
2,253✔
1731
        s.registerAccountNoLock(acc)
2,253✔
1732
        return acc, true
2,253✔
1733
}
1734

1735
// RegisterAccount will register an account. The account must be new
1736
// or this call will fail.
1737
func (s *Server) RegisterAccount(name string) (*Account, error) {
849✔
1738
        s.mu.Lock()
849✔
1739
        defer s.mu.Unlock()
849✔
1740
        if _, ok := s.accounts.Load(name); ok {
978✔
1741
                return nil, ErrAccountExists
129✔
1742
        }
129✔
1743
        acc := NewAccount(name)
720✔
1744
        s.registerAccountNoLock(acc)
720✔
1745
        return acc, nil
720✔
1746
}
1747

1748
// SetSystemAccount will set the internal system account.
1749
// If root operators are present it will also check validity.
1750
func (s *Server) SetSystemAccount(accName string) error {
6,229✔
1751
        // Lookup from sync.Map first.
6,229✔
1752
        if v, ok := s.accounts.Load(accName); ok {
12,456✔
1753
                return s.setSystemAccount(v.(*Account))
6,227✔
1754
        }
6,227✔
1755

1756
        // If we are here we do not have local knowledge of this account.
1757
        // Do this one by hand to return more useful error.
1758
        ac, jwt, err := s.fetchAccountClaims(accName)
2✔
1759
        if err != nil {
3✔
1760
                return err
1✔
1761
        }
1✔
1762
        acc := s.buildInternalAccount(ac)
1✔
1763
        acc.claimJWT = jwt
1✔
1764
        // Due to race, we need to make sure that we are not
1✔
1765
        // registering twice.
1✔
1766
        if racc := s.registerAccount(acc); racc != nil {
1✔
1767
                return nil
×
1768
        }
×
1769
        return s.setSystemAccount(acc)
1✔
1770
}
1771

1772
// SystemAccount returns the system account if set.
1773
func (s *Server) SystemAccount() *Account {
391,125✔
1774
        return s.sysAcc.Load()
391,125✔
1775
}
391,125✔
1776

1777
// GlobalAccount returns the global account.
1778
// Default clients will use the global account.
1779
func (s *Server) GlobalAccount() *Account {
7,589✔
1780
        s.mu.RLock()
7,589✔
1781
        defer s.mu.RUnlock()
7,589✔
1782
        return s.gacc
7,589✔
1783
}
7,589✔
1784

1785
// SetDefaultSystemAccount will create a default system account if one is not present.
1786
func (s *Server) SetDefaultSystemAccount() error {
2,242✔
1787
        if _, isNew := s.LookupOrRegisterAccount(DEFAULT_SYSTEM_ACCOUNT); !isNew {
2,242✔
1788
                return nil
×
1789
        }
×
1790
        s.Debugf("Created system account: %q", DEFAULT_SYSTEM_ACCOUNT)
2,242✔
1791
        return s.SetSystemAccount(DEFAULT_SYSTEM_ACCOUNT)
2,242✔
1792
}
1793

1794
// Assign a system account. Should only be called once.
1795
// This sets up a server to send and receive messages from
1796
// inside the server itself.
1797
func (s *Server) setSystemAccount(acc *Account) error {
6,248✔
1798
        if acc == nil {
6,248✔
1799
                return ErrMissingAccount
×
1800
        }
×
1801
        // Don't try to fix this here.
1802
        if acc.IsExpired() {
6,248✔
1803
                return ErrAccountExpired
×
1804
        }
×
1805
        // If we are running with trusted keys for an operator
1806
        // make sure we check the account is legit.
1807
        if !s.isTrustedIssuer(acc.Issuer) {
6,320✔
1808
                return ErrAccountValidation
72✔
1809
        }
72✔
1810

1811
        s.mu.Lock()
6,176✔
1812

6,176✔
1813
        if s.sys != nil {
6,178✔
1814
                s.mu.Unlock()
2✔
1815
                return ErrAccountExists
2✔
1816
        }
2✔
1817

1818
        // This is here in an attempt to quiet the race detector and not have to place
1819
        // locks on fast path for inbound messages and checking service imports.
1820
        acc.mu.Lock()
6,174✔
1821
        if acc.imports.services == nil {
12,348✔
1822
                acc.imports.services = make(map[string][]*serviceImport)
6,174✔
1823
        }
6,174✔
1824
        acc.mu.Unlock()
6,174✔
1825

6,174✔
1826
        s.sys = &internal{
6,174✔
1827
                account: acc,
6,174✔
1828
                client:  s.createInternalSystemClient(),
6,174✔
1829
                seq:     1,
6,174✔
1830
                sid:     1,
6,174✔
1831
                servers: make(map[string]*serverUpdate),
6,174✔
1832
                replies: make(map[string]msgHandler),
6,174✔
1833
                sendq:   newIPQueue[*pubMsg](s, "System sendQ"),
6,174✔
1834
                recvq:   newIPQueue[*inSysMsg](s, "System recvQ"),
6,174✔
1835
                recvqp:  newIPQueue[*inSysMsg](s, "System recvQ Pings"),
6,174✔
1836
                resetCh: make(chan struct{}),
6,174✔
1837
                sq:      s.newSendQ(acc),
6,174✔
1838
                statsz:  statsHBInterval,
6,174✔
1839
                orphMax: 5 * eventsHBInterval,
6,174✔
1840
                chkOrph: 3 * eventsHBInterval,
6,174✔
1841
        }
6,174✔
1842
        recvq, recvqp := s.sys.recvq, s.sys.recvqp
6,174✔
1843
        s.sys.wg.Add(1)
6,174✔
1844
        s.mu.Unlock()
6,174✔
1845

6,174✔
1846
        // Store in atomic for fast lookup.
6,174✔
1847
        s.sysAcc.Store(acc)
6,174✔
1848

6,174✔
1849
        // Register with the account.
6,174✔
1850
        s.sys.client.registerWithAccount(acc)
6,174✔
1851

6,174✔
1852
        s.addSystemAccountExports(acc)
6,174✔
1853

6,174✔
1854
        // Start our internal loop to serialize outbound messages.
6,174✔
1855
        // We do our own wg here since we will stop first during shutdown.
6,174✔
1856
        go s.internalSendLoop(&s.sys.wg)
6,174✔
1857

6,174✔
1858
        // Start the internal loop for inbound messages.
6,174✔
1859
        go s.internalReceiveLoop(recvq)
6,174✔
1860
        // Start the internal loop for inbound STATSZ/Ping messages.
6,174✔
1861
        go s.internalReceiveLoop(recvqp)
6,174✔
1862

6,174✔
1863
        // Start up our general subscriptions
6,174✔
1864
        s.initEventTracking()
6,174✔
1865

6,174✔
1866
        // Track for dead remote servers.
6,174✔
1867
        s.wrapChk(s.startRemoteServerSweepTimer)()
6,174✔
1868

6,174✔
1869
        // Send out statsz updates periodically.
6,174✔
1870
        s.wrapChk(s.startStatszTimer)()
6,174✔
1871

6,174✔
1872
        // If we have existing accounts make sure we enable account tracking.
6,174✔
1873
        s.mu.Lock()
6,174✔
1874
        s.accounts.Range(func(k, v any) bool {
20,412✔
1875
                acc := v.(*Account)
14,238✔
1876
                s.enableAccountTracking(acc)
14,238✔
1877
                return true
14,238✔
1878
        })
14,238✔
1879
        s.mu.Unlock()
6,174✔
1880

6,174✔
1881
        return nil
6,174✔
1882
}
1883

1884
// Creates an internal system client.
1885
func (s *Server) createInternalSystemClient() *client {
30,807✔
1886
        return s.createInternalClient(SYSTEM)
30,807✔
1887
}
30,807✔
1888

1889
// Creates an internal jetstream client.
1890
func (s *Server) createInternalJetStreamClient() *client {
47,632✔
1891
        return s.createInternalClient(JETSTREAM)
47,632✔
1892
}
47,632✔
1893

1894
// Creates an internal client for Account.
1895
func (s *Server) createInternalAccountClient() *client {
15,539✔
1896
        return s.createInternalClient(ACCOUNT)
15,539✔
1897
}
15,539✔
1898

1899
// Internal clients. kind should be SYSTEM, JETSTREAM or ACCOUNT
1900
func (s *Server) createInternalClient(kind int) *client {
93,978✔
1901
        if !isInternalClient(kind) {
93,978✔
1902
                return nil
×
1903
        }
×
1904
        now := time.Now()
93,978✔
1905
        c := &client{srv: s, kind: kind, opts: internalOpts, msubs: -1, mpay: -1, start: now, last: now}
93,978✔
1906
        c.initClient()
93,978✔
1907
        c.echo = false
93,978✔
1908
        c.headers = true
93,978✔
1909
        c.flags.set(noReconnect)
93,978✔
1910
        return c
93,978✔
1911
}
1912

1913
// Determine if accounts should track subscriptions for
1914
// efficient propagation.
1915
// Lock should be held on entry.
1916
func (s *Server) shouldTrackSubscriptions() bool {
16,976✔
1917
        opts := s.getOpts()
16,976✔
1918
        return (opts.Cluster.Port != 0 || opts.Gateway.Port != 0)
16,976✔
1919
}
16,976✔
1920

1921
// Invokes registerAccountNoLock under the protection of the server lock.
1922
// That is, server lock is acquired/released in this function.
1923
// See registerAccountNoLock for comment on returned value.
1924
func (s *Server) registerAccount(acc *Account) *Account {
1,358✔
1925
        s.mu.Lock()
1,358✔
1926
        racc := s.registerAccountNoLock(acc)
1,358✔
1927
        s.mu.Unlock()
1,358✔
1928
        return racc
1,358✔
1929
}
1,358✔
1930

1931
// Helper to set the sublist based on preferences.
1932
func (s *Server) setAccountSublist(acc *Account) {
18,334✔
1933
        if acc != nil && acc.sl == nil {
35,376✔
1934
                acc.sl = NewSublistForServer(s)
17,042✔
1935
        }
17,042✔
1936
}
1937

1938
// Registers an account in the server.
1939
// Due to some locking considerations, we may end-up trying
1940
// to register the same account twice. This function will
1941
// then return the already registered account.
1942
// Lock should be held on entry.
1943
func (s *Server) registerAccountNoLock(acc *Account) *Account {
17,052✔
1944
        // We are under the server lock. Lookup from map, if present
17,052✔
1945
        // return existing account.
17,052✔
1946
        if a, _ := s.accounts.Load(acc.Name); a != nil {
17,128✔
1947
                s.tmpAccounts.Delete(acc.Name)
76✔
1948
                return a.(*Account)
76✔
1949
        }
76✔
1950
        // Finish account setup and store.
1951
        s.setAccountSublist(acc)
16,976✔
1952

16,976✔
1953
        acc.mu.Lock()
16,976✔
1954
        s.setRouteInfo(acc)
16,976✔
1955
        if acc.clients == nil {
33,944✔
1956
                acc.clients = make(map[*client]struct{})
16,968✔
1957
        }
16,968✔
1958

1959
        // If we are capable of routing we will track subscription
1960
        // information for efficient interest propagation.
1961
        // During config reload, it is possible that account was
1962
        // already created (global account), so use locking and
1963
        // make sure we create only if needed.
1964
        // TODO(dlc)- Double check that we need this for GWs.
1965
        if acc.rm == nil && s.opts != nil && s.shouldTrackSubscriptions() {
28,332✔
1966
                acc.rm = make(map[string]int32)
11,356✔
1967
                acc.lqws = make(map[string]int32)
11,356✔
1968
        }
11,356✔
1969
        acc.srv = s
16,976✔
1970
        acc.updated = time.Now()
16,976✔
1971
        accName := acc.Name
16,976✔
1972
        jsEnabled := len(acc.jsLimits) > 0
16,976✔
1973
        acc.mu.Unlock()
16,976✔
1974

16,976✔
1975
        if opts := s.getOpts(); opts != nil && len(opts.JsAccDefaultDomain) > 0 {
17,036✔
1976
                if defDomain, ok := opts.JsAccDefaultDomain[accName]; ok {
80✔
1977
                        if jsEnabled {
23✔
1978
                                s.Warnf("Skipping Default Domain %q, set for JetStream enabled account %q", defDomain, accName)
3✔
1979
                        } else if defDomain != _EMPTY_ {
29✔
1980
                                for src, dest := range generateJSMappingTable(defDomain) {
90✔
1981
                                        // flip src and dest around so the domain is inserted
81✔
1982
                                        s.Noticef("Adding default domain mapping %q -> %q to account %q %p", dest, src, accName, acc)
81✔
1983
                                        if err := acc.AddMapping(dest, src); err != nil {
81✔
1984
                                                s.Errorf("Error adding JetStream default domain mapping: %v", err)
×
1985
                                        }
×
1986
                                }
1987
                        }
1988
                }
1989
        }
1990

1991
        s.accounts.Store(acc.Name, acc)
16,976✔
1992
        s.tmpAccounts.Delete(acc.Name)
16,976✔
1993
        s.enableAccountTracking(acc)
16,976✔
1994

16,976✔
1995
        // Can not have server lock here.
16,976✔
1996
        s.mu.Unlock()
16,976✔
1997
        s.registerSystemImports(acc)
16,976✔
1998
        // Starting 2.9.0, we are phasing out the optimistic mode, so change
16,976✔
1999
        // the account to interest-only mode (except if instructed not to do
16,976✔
2000
        // it in some tests).
16,976✔
2001
        if s.gateway.enabled && !gwDoNotForceInterestOnlyMode {
20,010✔
2002
                s.switchAccountToInterestMode(acc.GetName())
3,034✔
2003
        }
3,034✔
2004
        s.mu.Lock()
16,976✔
2005

16,976✔
2006
        return nil
16,976✔
2007
}
2008

2009
// Sets the account's routePoolIdx depending on presence or not of
2010
// pooling or per-account routes. Also updates a map used by
2011
// gateway code to retrieve a route based on some route hash.
2012
//
2013
// Both Server and Account lock held on entry.
2014
func (s *Server) setRouteInfo(acc *Account) {
17,012✔
2015
        // If there is a dedicated route configured for this account
17,012✔
2016
        if _, ok := s.accRoutes[acc.Name]; ok {
20,924✔
2017
                // We want the account name to be in the map, but we don't
3,912✔
2018
                // need a value (we could store empty string)
3,912✔
2019
                s.accRouteByHash.Store(acc.Name, nil)
3,912✔
2020
                // Set the route pool index to -1 so that it is easy when
3,912✔
2021
                // ranging over accounts to exclude those accounts when
3,912✔
2022
                // trying to get accounts for a given pool index.
3,912✔
2023
                acc.routePoolIdx = accDedicatedRoute
3,912✔
2024
        } else {
17,012✔
2025
                // If pool size more than 1, we will compute a hash code and
13,100✔
2026
                // use modulo to assign to an index of the pool slice. For 1
13,100✔
2027
                // and below, all accounts will be bound to the single connection
13,100✔
2028
                // at index 0.
13,100✔
2029
                acc.routePoolIdx = computeRoutePoolIdx(s.routesPoolSize, acc.Name)
13,100✔
2030
                if s.routesPoolSize > 1 {
18,344✔
2031
                        s.accRouteByHash.Store(acc.Name, acc.routePoolIdx)
5,244✔
2032
                }
5,244✔
2033
        }
2034
}
2035

2036
// lookupAccount is a function to return the account structure
2037
// associated with an account name.
2038
// Lock MUST NOT be held upon entry.
2039
func (s *Server) lookupAccount(name string) (*Account, error) {
2,349,427✔
2040
        return s.lookupOrFetchAccount(name, true)
2,349,427✔
2041
}
2,349,427✔
2042

2043
// lookupOrFetchAccount is a function to return the account structure
2044
// associated with an account name.
2045
// Lock MUST NOT be held upon entry.
2046
func (s *Server) lookupOrFetchAccount(name string, fetch bool) (*Account, error) {
2,583,116✔
2047
        var acc *Account
2,583,116✔
2048
        if v, ok := s.accounts.Load(name); ok {
5,163,818✔
2049
                acc = v.(*Account)
2,580,702✔
2050
        }
2,580,702✔
2051
        if acc != nil {
5,163,818✔
2052
                // If we are expired and we have a resolver, then
2,580,702✔
2053
                // return the latest information from the resolver.
2,580,702✔
2054
                if acc.IsExpired() {
2,580,712✔
2055
                        s.Debugf("Requested account [%s] has expired", name)
10✔
2056
                        if s.AccountResolver() != nil && fetch {
20✔
2057
                                if err := s.updateAccount(acc); err != nil {
12✔
2058
                                        // This error could mask expired, so just return expired here.
2✔
2059
                                        return nil, ErrAccountExpired
2✔
2060
                                }
2✔
2061
                        } else {
×
2062
                                return nil, ErrAccountExpired
×
2063
                        }
×
2064
                }
2065
                return acc, nil
2,580,700✔
2066
        }
2067
        // If we have a resolver see if it can fetch the account.
2068
        if s.AccountResolver() == nil || !fetch {
3,521✔
2069
                return nil, ErrMissingAccount
1,107✔
2070
        }
1,107✔
2071
        return s.fetchAccount(name)
1,307✔
2072
}
2073

2074
// LookupAccount is a public function to return the account structure
2075
// associated with name.
2076
func (s *Server) LookupAccount(name string) (*Account, error) {
2,191,157✔
2077
        return s.lookupAccount(name)
2,191,157✔
2078
}
2,191,157✔
2079

2080
// This will fetch new claims and if found update the account with new claims.
2081
// Lock MUST NOT be held upon entry.
2082
func (s *Server) updateAccount(acc *Account) error {
10✔
2083
        acc.mu.RLock()
10✔
2084
        // TODO(dlc) - Make configurable
10✔
2085
        if !acc.incomplete && time.Since(acc.updated) < time.Second {
10✔
2086
                acc.mu.RUnlock()
×
2087
                s.Debugf("Requested account update for [%s] ignored, too soon", acc.Name)
×
2088
                return ErrAccountResolverUpdateTooSoon
×
2089
        }
×
2090
        acc.mu.RUnlock()
10✔
2091
        claimJWT, err := s.fetchRawAccountClaims(acc.Name)
10✔
2092
        if err != nil {
12✔
2093
                return err
2✔
2094
        }
2✔
2095
        return s.updateAccountWithClaimJWT(acc, claimJWT)
8✔
2096
}
2097

2098
// updateAccountWithClaimJWT will check and apply the claim update.
2099
// Lock MUST NOT be held upon entry.
2100
func (s *Server) updateAccountWithClaimJWT(acc *Account, claimJWT string) error {
264✔
2101
        if acc == nil {
264✔
2102
                return ErrMissingAccount
×
2103
        }
×
2104
        acc.mu.RLock()
264✔
2105
        sameClaim := acc.claimJWT != _EMPTY_ && acc.claimJWT == claimJWT && !acc.incomplete
264✔
2106
        acc.mu.RUnlock()
264✔
2107
        if sameClaim {
377✔
2108
                s.Debugf("Requested account update for [%s], same claims detected", acc.Name)
113✔
2109
                return nil
113✔
2110
        }
113✔
2111
        accClaims, _, err := s.verifyAccountClaims(claimJWT)
151✔
2112
        if err == nil && accClaims != nil {
302✔
2113
                acc.mu.Lock()
151✔
2114
                // if an account is updated with a different operator signing key, we want to
151✔
2115
                // show a consistent issuer.
151✔
2116
                acc.Issuer = accClaims.Issuer
151✔
2117
                if acc.Name != accClaims.Subject {
152✔
2118
                        acc.mu.Unlock()
1✔
2119
                        return ErrAccountValidation
1✔
2120
                }
1✔
2121
                acc.mu.Unlock()
150✔
2122
                s.UpdateAccountClaims(acc, accClaims)
150✔
2123
                acc.mu.Lock()
150✔
2124
                // needs to be set after update completed.
150✔
2125
                // This causes concurrent calls to return with sameClaim=true if the change is effective.
150✔
2126
                acc.claimJWT = claimJWT
150✔
2127
                acc.mu.Unlock()
150✔
2128
                return nil
150✔
2129
        }
2130
        return err
×
2131
}
2132

2133
// fetchRawAccountClaims will grab raw account claims iff we have a resolver.
2134
// Lock is NOT held upon entry.
2135
func (s *Server) fetchRawAccountClaims(name string) (string, error) {
1,441✔
2136
        accResolver := s.AccountResolver()
1,441✔
2137
        if accResolver == nil {
1,441✔
2138
                return _EMPTY_, ErrNoAccountResolver
×
2139
        }
×
2140
        // Need to do actual Fetch
2141
        start := time.Now()
1,441✔
2142
        claimJWT, err := fetchAccount(accResolver, name)
1,441✔
2143
        fetchTime := time.Since(start)
1,441✔
2144
        if fetchTime > time.Second {
1,445✔
2145
                s.Warnf("Account [%s] fetch took %v", name, fetchTime)
4✔
2146
        } else {
1,441✔
2147
                s.Debugf("Account [%s] fetch took %v", name, fetchTime)
1,437✔
2148
        }
1,437✔
2149
        if err != nil {
1,485✔
2150
                s.Warnf("Account fetch failed: %v", err)
44✔
2151
                return "", err
44✔
2152
        }
44✔
2153
        return claimJWT, nil
1,397✔
2154
}
2155

2156
// fetchAccountClaims will attempt to fetch new claims if a resolver is present.
2157
// Lock is NOT held upon entry.
2158
func (s *Server) fetchAccountClaims(name string) (*jwt.AccountClaims, string, error) {
1,431✔
2159
        claimJWT, err := s.fetchRawAccountClaims(name)
1,431✔
2160
        if err != nil {
1,473✔
2161
                return nil, _EMPTY_, err
42✔
2162
        }
42✔
2163
        var claim *jwt.AccountClaims
1,389✔
2164
        claim, claimJWT, err = s.verifyAccountClaims(claimJWT)
1,389✔
2165
        if claim != nil && claim.Subject != name {
1,389✔
2166
                return nil, _EMPTY_, ErrAccountValidation
×
2167
        }
×
2168
        return claim, claimJWT, err
1,389✔
2169
}
2170

2171
// verifyAccountClaims will decode and validate any account claims.
2172
func (s *Server) verifyAccountClaims(claimJWT string) (*jwt.AccountClaims, string, error) {
1,819✔
2173
        accClaims, err := jwt.DecodeAccountClaims(claimJWT)
1,819✔
2174
        if err != nil {
1,820✔
2175
                return nil, _EMPTY_, err
1✔
2176
        }
1✔
2177
        if !s.isTrustedIssuer(accClaims.Issuer) {
1,825✔
2178
                return nil, _EMPTY_, ErrAccountValidation
7✔
2179
        }
7✔
2180
        vr := jwt.CreateValidationResults()
1,811✔
2181
        accClaims.Validate(vr)
1,811✔
2182
        if vr.IsBlocking(true) {
1,817✔
2183
                return nil, _EMPTY_, ErrAccountValidation
6✔
2184
        }
6✔
2185
        return accClaims, claimJWT, nil
1,805✔
2186
}
2187

2188
// This will fetch an account from a resolver if defined.
2189
// Lock is NOT held upon entry.
2190
func (s *Server) fetchAccount(name string) (*Account, error) {
1,410✔
2191
        accClaims, claimJWT, err := s.fetchAccountClaims(name)
1,410✔
2192
        if accClaims == nil {
1,463✔
2193
                return nil, err
53✔
2194
        }
53✔
2195
        acc := s.buildInternalAccount(accClaims)
1,357✔
2196
        // Due to possible race, if registerAccount() returns a non
1,357✔
2197
        // nil account, it means the same account was already
1,357✔
2198
        // registered and we should use this one.
1,357✔
2199
        if racc := s.registerAccount(acc); racc != nil {
1,423✔
2200
                // Update with the new claims in case they are new.
66✔
2201
                if err = s.updateAccountWithClaimJWT(racc, claimJWT); err != nil {
66✔
2202
                        return nil, err
×
2203
                }
×
2204
                return racc, nil
66✔
2205
        }
2206
        // The sub imports may have been setup but will not have had their
2207
        // subscriptions properly setup. Do that here.
2208
        var needImportSubs bool
1,291✔
2209

1,291✔
2210
        acc.mu.Lock()
1,291✔
2211
        acc.claimJWT = claimJWT
1,291✔
2212
        if len(acc.imports.services) > 0 {
2,213✔
2213
                if acc.ic == nil {
1,671✔
2214
                        acc.ic = s.createInternalAccountClient()
749✔
2215
                        acc.ic.acc = acc
749✔
2216
                }
749✔
2217
                needImportSubs = true
922✔
2218
        }
2219
        acc.mu.Unlock()
1,291✔
2220

1,291✔
2221
        // Do these outside the lock.
1,291✔
2222
        if needImportSubs {
2,213✔
2223
                acc.addAllServiceImportSubs()
922✔
2224
        }
922✔
2225

2226
        return acc, nil
1,291✔
2227
}
2228

2229
// Start up the server, this will not block.
2230
//
2231
// WaitForShutdown can be used to block and wait for the server to shutdown properly if needed
2232
// after calling s.Shutdown()
2233
func (s *Server) Start() {
6,850✔
2234
        s.Noticef("Starting nats-server")
6,850✔
2235

6,850✔
2236
        gc := gitCommit
6,850✔
2237
        if gc == _EMPTY_ {
13,700✔
2238
                gc = "not set"
6,850✔
2239
        }
6,850✔
2240

2241
        // Snapshot server options.
2242
        opts := s.getOpts()
6,850✔
2243

6,850✔
2244
        // Capture if this server is a leaf that has no cluster, so we don't
6,850✔
2245
        // display the cluster name if that is the case.
6,850✔
2246
        s.mu.RLock()
6,850✔
2247
        leafNoCluster := s.leafNoCluster
6,850✔
2248
        s.mu.RUnlock()
6,850✔
2249

6,850✔
2250
        var clusterName string
6,850✔
2251
        if !leafNoCluster {
12,685✔
2252
                clusterName = s.ClusterName()
5,835✔
2253
        }
5,835✔
2254

2255
        s.Noticef("  Version:  %s", VERSION)
6,850✔
2256
        s.Noticef("  Git:      [%s]", gc)
6,850✔
2257
        s.Debugf("  Go build: %s", s.info.GoVersion)
6,850✔
2258
        if clusterName != _EMPTY_ {
11,760✔
2259
                s.Noticef("  Cluster:  %s", clusterName)
4,910✔
2260
        }
4,910✔
2261
        s.Noticef("  Name:     %s", s.info.Name)
6,850✔
2262
        if opts.JetStream {
11,312✔
2263
                s.Noticef("  Node:     %s", getHash(s.info.Name))
4,462✔
2264
        }
4,462✔
2265
        s.Noticef("  ID:       %s", s.info.ID)
6,850✔
2266
        s.printFeatureFlags(opts)
6,850✔
2267

6,850✔
2268
        defer s.Noticef("Server is ready")
6,850✔
2269

6,850✔
2270
        // Check for insecure configurations.
6,850✔
2271
        s.checkAuthforWarnings()
6,850✔
2272

6,850✔
2273
        // Avoid RACE between Start() and Shutdown()
6,850✔
2274
        s.running.Store(true)
6,850✔
2275
        s.mu.Lock()
6,850✔
2276
        // Update leafNodeEnabled in case options have changed post NewServer()
6,850✔
2277
        // and before Start() (we should not be able to allow that, but server has
6,850✔
2278
        // direct reference to user-provided options - at least before a Reload() is
6,850✔
2279
        // performed.
6,850✔
2280
        s.leafNodeEnabled = opts.LeafNode.Port != 0 || len(opts.LeafNode.Remotes) > 0
6,850✔
2281
        s.mu.Unlock()
6,850✔
2282

6,850✔
2283
        s.grMu.Lock()
6,850✔
2284
        s.grRunning = true
6,850✔
2285
        s.grMu.Unlock()
6,850✔
2286

6,850✔
2287
        s.startRateLimitLogExpiration()
6,850✔
2288

6,850✔
2289
        // Pprof http endpoint for the profiler.
6,850✔
2290
        if opts.ProfPort != 0 {
6,851✔
2291
                s.StartProfiler()
1✔
2292
        } else {
6,850✔
2293
                // It's still possible to access this profile via a SYS endpoint, so set
6,849✔
2294
                // this anyway. (Otherwise StartProfiler would have called it.)
6,849✔
2295
                s.setBlockProfileRate(opts.ProfBlockRate)
6,849✔
2296
        }
6,849✔
2297

2298
        if opts.ConfigFile != _EMPTY_ {
11,795✔
2299
                var cd string
4,945✔
2300
                if opts.configDigest != "" {
9,890✔
2301
                        cd = fmt.Sprintf("(%s)", opts.configDigest)
4,945✔
2302
                }
4,945✔
2303
                s.Noticef("Using configuration file: %s %s", opts.ConfigFile, cd)
4,945✔
2304
        }
2305

2306
        hasOperators := len(opts.TrustedOperators) > 0
6,850✔
2307
        if hasOperators {
7,152✔
2308
                s.Noticef("Trusted Operators")
302✔
2309
        }
302✔
2310
        for _, opc := range opts.TrustedOperators {
7,152✔
2311
                s.Noticef("  System  : %q", opc.Audience)
302✔
2312
                s.Noticef("  Operator: %q", opc.Name)
302✔
2313
                s.Noticef("  Issued  : %v", time.Unix(opc.IssuedAt, 0))
302✔
2314
                switch opc.Expires {
302✔
2315
                case 0:
12✔
2316
                        s.Noticef("  Expires : Never")
12✔
2317
                default:
290✔
2318
                        s.Noticef("  Expires : %v", time.Unix(opc.Expires, 0))
290✔
2319
                }
2320
        }
2321
        if hasOperators && opts.SystemAccount == _EMPTY_ {
6,888✔
2322
                s.Warnf("Trusted Operators should utilize a System Account")
38✔
2323
        }
38✔
2324
        if opts.MaxPayload > MAX_PAYLOAD_MAX_SIZE {
6,852✔
2325
                s.Warnf("Maximum payloads over %v are generally discouraged and could lead to poor performance",
2✔
2326
                        friendlyBytes(int64(MAX_PAYLOAD_MAX_SIZE)))
2✔
2327
        }
2✔
2328

2329
        if len(opts.JsAccDefaultDomain) > 0 {
6,870✔
2330
                s.Warnf("The option `default_js_domain` is a temporary backwards compatibility measure and will be removed")
20✔
2331
        }
20✔
2332

2333
        // If we have a memory resolver, check the accounts here for validation exceptions.
2334
        // This allows them to be logged right away vs when they are accessed via a client.
2335
        if hasOperators && len(opts.resolverPreloads) > 0 {
7,015✔
2336
                s.checkResolvePreloads()
165✔
2337
        }
165✔
2338

2339
        // Log the pid to a file.
2340
        if opts.PidFile != _EMPTY_ {
6,851✔
2341
                if err := s.logPid(); err != nil {
1✔
2342
                        s.Fatalf("Could not write pidfile: %v", err)
×
2343
                        return
×
2344
                }
×
2345
        }
2346

2347
        // Setup system account which will start the eventing stack.
2348
        if sa := opts.SystemAccount; sa != _EMPTY_ {
10,831✔
2349
                if err := s.SetSystemAccount(sa); err != nil {
3,981✔
2350
                        s.Fatalf("Can't set system account: %v", err)
×
2351
                        return
×
2352
                }
×
2353
        } else if !opts.NoSystemAccount {
5,110✔
2354
                // We will create a default system account here.
2,241✔
2355
                s.SetDefaultSystemAccount()
2,241✔
2356
        }
2,241✔
2357

2358
        // Start monitoring before enabling other subsystems of the
2359
        // server to be able to monitor during startup.
2360
        if err := s.StartMonitoring(); err != nil {
6,851✔
2361
                s.Fatalf("Can't start monitoring: %v", err)
1✔
2362
                return
1✔
2363
        }
1✔
2364

2365
        // Start up resolver machinery.
2366
        if ar := s.AccountResolver(); ar != nil {
7,208✔
2367
                if err := ar.Start(s); err != nil {
359✔
2368
                        s.Fatalf("Could not start resolver: %v", err)
×
2369
                        return
×
2370
                }
×
2371
                // In operator mode, when the account resolver depends on an external system and
2372
                // the system account is the bootstrapping account, start fetching it.
2373
                if len(opts.TrustedOperators) == 1 && opts.SystemAccount != _EMPTY_ && opts.SystemAccount != DEFAULT_SYSTEM_ACCOUNT {
623✔
2374
                        opts := s.getOpts()
264✔
2375
                        _, isMemResolver := ar.(*MemAccResolver)
264✔
2376
                        if v, ok := s.accounts.Load(opts.SystemAccount); !isMemResolver && ok && v.(*Account).claimJWT == _EMPTY_ {
331✔
2377
                                s.Noticef("Using bootstrapping system account")
67✔
2378
                                s.startGoRoutine(func() {
134✔
2379
                                        defer s.grWG.Done()
67✔
2380
                                        t := time.NewTicker(time.Second)
67✔
2381
                                        defer t.Stop()
67✔
2382
                                        for {
186✔
2383
                                                select {
119✔
2384
                                                case <-s.quitCh:
44✔
2385
                                                        return
44✔
2386
                                                case <-t.C:
75✔
2387
                                                        sacc := s.SystemAccount()
75✔
2388
                                                        if claimJWT, err := fetchAccount(ar, opts.SystemAccount); err != nil {
127✔
2389
                                                                continue
52✔
2390
                                                        } else if err = s.updateAccountWithClaimJWT(sacc, claimJWT); err != nil {
23✔
2391
                                                                continue
×
2392
                                                        }
2393
                                                        s.Noticef("System account fetched and updated")
23✔
2394
                                                        return
23✔
2395
                                                }
2396
                                        }
2397
                                })
2398
                        }
2399
                }
2400
        }
2401

2402
        // Start expiration of mapped GW replies, regardless if
2403
        // this server is configured with gateway or not.
2404
        s.startGWReplyMapExpiration()
6,849✔
2405

6,849✔
2406
        // Check if JetStream has been enabled. This needs to be after
6,849✔
2407
        // the system account setup above. JetStream will create its
6,849✔
2408
        // own system account if one is not present.
6,849✔
2409
        if opts.JetStream {
11,311✔
2410
                // Make sure someone is not trying to enable on the system account.
4,462✔
2411
                if sa := s.SystemAccount(); sa != nil && len(sa.jsLimits) > 0 {
4,462✔
2412
                        s.Fatalf("Not allowed to enable JetStream on the system account")
×
2413
                }
×
2414
                cfg := &JetStreamConfig{
4,462✔
2415
                        StoreDir:     opts.StoreDir,
4,462✔
2416
                        SyncInterval: opts.SyncInterval,
4,462✔
2417
                        SyncAlways:   opts.SyncAlways,
4,462✔
2418
                        Strict:       !opts.NoJetStreamStrict,
4,462✔
2419
                        MaxMemory:    opts.JetStreamMaxMemory,
4,462✔
2420
                        MaxStore:     opts.JetStreamMaxStore,
4,462✔
2421
                        Domain:       opts.JetStreamDomain,
4,462✔
2422
                        CompressOK:   true,
4,462✔
2423
                        UniqueTag:    opts.JetStreamUniqueTag,
4,462✔
2424
                }
4,462✔
2425
                if err := s.EnableJetStream(cfg); err != nil {
4,464✔
2426
                        s.Fatalf("Can't start JetStream: %v", err)
2✔
2427
                        return
2✔
2428
                }
2✔
2429
        } else {
2,387✔
2430
                // Check to see if any configured accounts have JetStream enabled.
2,387✔
2431
                sa, ga := s.SystemAccount(), s.GlobalAccount()
2,387✔
2432
                var hasSys, hasGlobal bool
2,387✔
2433
                var total int
2,387✔
2434

2,387✔
2435
                s.accounts.Range(func(k, v any) bool {
7,777✔
2436
                        total++
5,390✔
2437
                        acc := v.(*Account)
5,390✔
2438
                        if acc == sa {
7,077✔
2439
                                hasSys = true
1,687✔
2440
                        } else if acc == ga {
7,777✔
2441
                                hasGlobal = true
2,387✔
2442
                        }
2,387✔
2443
                        acc.mu.RLock()
5,390✔
2444
                        hasJs := len(acc.jsLimits) > 0
5,390✔
2445
                        acc.mu.RUnlock()
5,390✔
2446
                        if hasJs {
5,415✔
2447
                                s.checkJetStreamExports()
25✔
2448
                                acc.enableAllJetStreamServiceImportsAndMappings()
25✔
2449
                        }
25✔
2450
                        return true
5,390✔
2451
                })
2452
                // If we only have the system account and the global account and we are not standalone,
2453
                // go ahead and enable JS on $G in case we are in simple mixed mode setup.
2454
                if total == 2 && hasSys && hasGlobal && !s.standAloneMode() {
2,928✔
2455
                        ga.mu.Lock()
541✔
2456
                        ga.jsLimits = map[string]JetStreamAccountLimits{
541✔
2457
                                _EMPTY_: dynamicJSAccountLimits,
541✔
2458
                        }
541✔
2459
                        ga.mu.Unlock()
541✔
2460
                        s.checkJetStreamExports()
541✔
2461
                        ga.enableAllJetStreamServiceImportsAndMappings()
541✔
2462
                }
541✔
2463
        }
2464

2465
        // Delayed API response handling. Start regardless of JetStream being
2466
        // currently configured or not (since it can be enabled/disabled with
2467
        // configuration reload).
2468
        s.startGoRoutine(s.delayedAPIResponder)
6,847✔
2469

6,847✔
2470
        // Start OCSP Stapling monitoring for TLS certificates if enabled. Hook TLS handshake for
6,847✔
2471
        // OCSP check on peers (LEAF and CLIENT kind) if enabled.
6,847✔
2472
        s.startOCSPMonitoring()
6,847✔
2473

6,847✔
2474
        // Configure OCSP Response Cache for peer OCSP checks if enabled.
6,847✔
2475
        s.initOCSPResponseCache()
6,847✔
2476

6,847✔
2477
        // Start up gateway if needed. Do this before starting the routes, because
6,847✔
2478
        // we want to resolve the gateway host:port so that this information can
6,847✔
2479
        // be sent to other routes.
6,847✔
2480
        if opts.Gateway.Port != 0 {
8,010✔
2481
                s.startGateways()
1,163✔
2482
        }
1,163✔
2483

2484
        // Start websocket server if needed. Do this before starting the routes, and
2485
        // leaf node because we want to resolve the gateway host:port so that this
2486
        // information can be sent to other routes.
2487
        if opts.Websocket.Port != 0 {
6,969✔
2488
                s.startWebsocketServer()
122✔
2489
        }
122✔
2490

2491
        // Start up listen if we want to accept leaf node connections.
2492
        if opts.LeafNode.Port != 0 {
10,805✔
2493
                // Will resolve or assign the advertise address for the leafnode listener.
3,958✔
2494
                // We need that in StartRouting().
3,958✔
2495
                s.startLeafNodeAcceptLoop()
3,958✔
2496
        }
3,958✔
2497

2498
        // Solicit remote servers for leaf node connections.
2499
        if len(opts.LeafNode.Remotes) > 0 {
8,124✔
2500
                s.solicitLeafNodeRemotes(opts.LeafNode.Remotes)
1,277✔
2501
        }
1,277✔
2502

2503
        // TODO (ik): I wanted to refactor this by starting the client
2504
        // accept loop first, that is, it would resolve listen spec
2505
        // in place, but start the accept-for-loop in a different go
2506
        // routine. This would get rid of the synchronization between
2507
        // this function and StartRouting, which I also would have wanted
2508
        // to refactor, but both AcceptLoop() and StartRouting() have
2509
        // been exported and not sure if that would break users using them.
2510
        // We could mark them as deprecated and remove in a release or two...
2511

2512
        // The Routing routine needs to wait for the client listen
2513
        // port to be opened and potential ephemeral port selected.
2514
        clientListenReady := make(chan struct{})
6,847✔
2515

6,847✔
2516
        // MQTT
6,847✔
2517
        if opts.MQTT.Port != 0 {
7,116✔
2518
                s.startMQTT()
269✔
2519
        }
269✔
2520

2521
        // Start up routing as well if needed.
2522
        if opts.Cluster.Port != 0 {
11,525✔
2523
                s.startGoRoutine(func() {
9,356✔
2524
                        s.StartRouting(clientListenReady)
4,678✔
2525
                })
4,678✔
2526
        }
2527

2528
        if opts.PortsFileDir != _EMPTY_ {
6,849✔
2529
                s.logPorts()
2✔
2530
        }
2✔
2531

2532
        if opts.TLSRateLimit > 0 {
6,848✔
2533
                s.startGoRoutine(s.logRejectedTLSConns)
1✔
2534
        }
1✔
2535

2536
        // We've finished starting up.
2537
        close(s.startupComplete)
6,847✔
2538

6,847✔
2539
        // Wait for clients.
6,847✔
2540
        if !opts.DontListen {
13,694✔
2541
                s.AcceptLoop(clientListenReady)
6,847✔
2542
        }
6,847✔
2543

2544
        // Bring OSCP Response cache online after accept loop started in anticipation of NATS-enabled cache types
2545
        s.startOCSPResponseCache()
6,847✔
2546
}
2547

2548
func (s *Server) isShuttingDown() bool {
534,034✔
2549
        return s.shutdown.Load()
534,034✔
2550
}
534,034✔
2551

2552
// Shutdown will shutdown the server instance by kicking out the AcceptLoop
2553
// and closing all associated clients.
2554
func (s *Server) Shutdown() {
7,353✔
2555
        if s == nil {
7,357✔
2556
                return
4✔
2557
        }
4✔
2558
        // This is for JetStream R1 Pull Consumers to allow signaling
2559
        // that pending pull requests are invalid.
2560
        s.signalPullConsumers()
7,349✔
2561

7,349✔
2562
        // Transfer off any raft nodes that we are a leader by stepping them down.
7,349✔
2563
        s.stepdownRaftNodes()
7,349✔
2564

7,349✔
2565
        // Shutdown the eventing system as needed.
7,349✔
2566
        // This is done first to send out any messages for
7,349✔
2567
        // account status. We will also clean up any
7,349✔
2568
        // eventing items associated with accounts.
7,349✔
2569
        s.shutdownEventing()
7,349✔
2570

7,349✔
2571
        // Prevent issues with multiple calls.
7,349✔
2572
        if s.isShuttingDown() {
7,787✔
2573
                return
438✔
2574
        }
438✔
2575

2576
        s.mu.Lock()
6,911✔
2577
        s.Noticef("Initiating Shutdown...")
6,911✔
2578

6,911✔
2579
        accRes := s.accResolver
6,911✔
2580

6,911✔
2581
        opts := s.getOpts()
6,911✔
2582

6,911✔
2583
        s.shutdown.Store(true)
6,911✔
2584
        s.running.Store(false)
6,911✔
2585
        s.grMu.Lock()
6,911✔
2586
        s.grRunning = false
6,911✔
2587
        s.grMu.Unlock()
6,911✔
2588
        s.mu.Unlock()
6,911✔
2589

6,911✔
2590
        if accRes != nil {
7,324✔
2591
                accRes.Close()
413✔
2592
        }
413✔
2593

2594
        // Now check and shutdown jetstream.
2595
        s.shutdownJetStream()
6,911✔
2596

6,911✔
2597
        // Now shutdown the nodes
6,911✔
2598
        s.shutdownRaftNodes()
6,911✔
2599

6,911✔
2600
        s.mu.Lock()
6,911✔
2601
        conns := make(map[uint64]*client)
6,911✔
2602

6,911✔
2603
        // Copy off the clients
6,911✔
2604
        for i, c := range s.clients {
8,263✔
2605
                conns[i] = c
1,352✔
2606
        }
1,352✔
2607
        // Copy off the connections that are not yet registered
2608
        // in s.routes, but for which the readLoop has started
2609
        s.grMu.Lock()
6,911✔
2610
        for i, c := range s.grTmpClients {
6,930✔
2611
                conns[i] = c
19✔
2612
        }
19✔
2613
        s.grMu.Unlock()
6,911✔
2614
        // Copy off the routes
6,911✔
2615
        s.forEachRoute(func(r *client) {
24,343✔
2616
                r.mu.Lock()
17,432✔
2617
                conns[r.cid] = r
17,432✔
2618
                r.mu.Unlock()
17,432✔
2619
        })
17,432✔
2620
        // Copy off the gateways
2621
        s.getAllGatewayConnections(conns)
6,911✔
2622

6,911✔
2623
        // Copy off the leaf nodes
6,911✔
2624
        for i, c := range s.leafs {
7,572✔
2625
                conns[i] = c
661✔
2626
        }
661✔
2627

2628
        // Number of done channel responses we expect.
2629
        doneExpected := 0
6,911✔
2630

6,911✔
2631
        // Kick client AcceptLoop()
6,911✔
2632
        if s.listener != nil {
13,746✔
2633
                doneExpected++
6,835✔
2634
                s.listener.Close()
6,835✔
2635
                s.listener = nil
6,835✔
2636
        }
6,835✔
2637

2638
        // Kick websocket server
2639
        doneExpected += s.closeWebsocketServer()
6,911✔
2640

6,911✔
2641
        // Kick MQTT accept loop
6,911✔
2642
        if s.mqtt.listener != nil {
7,179✔
2643
                doneExpected++
268✔
2644
                s.mqtt.listener.Close()
268✔
2645
                s.mqtt.listener = nil
268✔
2646
        }
268✔
2647

2648
        // Kick leafnodes AcceptLoop()
2649
        if s.leafNodeListener != nil {
10,869✔
2650
                doneExpected++
3,958✔
2651
                s.leafNodeListener.Close()
3,958✔
2652
                s.leafNodeListener = nil
3,958✔
2653
        }
3,958✔
2654

2655
        // Kick route AcceptLoop()
2656
        if s.routeListener != nil {
11,584✔
2657
                doneExpected++
4,673✔
2658
                s.routeListener.Close()
4,673✔
2659
                s.routeListener = nil
4,673✔
2660
        }
4,673✔
2661

2662
        // Kick Gateway AcceptLoop()
2663
        if s.gatewayListener != nil {
8,071✔
2664
                doneExpected++
1,160✔
2665
                s.gatewayListener.Close()
1,160✔
2666
                s.gatewayListener = nil
1,160✔
2667
        }
1,160✔
2668

2669
        // Kick HTTP monitoring if its running
2670
        if s.http != nil {
7,986✔
2671
                doneExpected++
1,075✔
2672
                s.http.Close()
1,075✔
2673
                s.http = nil
1,075✔
2674
        }
1,075✔
2675

2676
        // Kick Profiling if its running
2677
        if s.profiler != nil {
6,913✔
2678
                doneExpected++
2✔
2679
                s.profiler.Close()
2✔
2680
        }
2✔
2681

2682
        s.mu.Unlock()
6,911✔
2683

6,911✔
2684
        // Release go routines that wait on that channel
6,911✔
2685
        close(s.quitCh)
6,911✔
2686

6,911✔
2687
        // Close client and route connections
6,911✔
2688
        for _, c := range conns {
28,059✔
2689
                c.setNoReconnect()
21,148✔
2690
                c.closeConnection(ServerShutdown)
21,148✔
2691
        }
21,148✔
2692

2693
        // Block until the accept loops exit
2694
        for doneExpected > 0 {
25,004✔
2695
                <-s.done
18,093✔
2696
                doneExpected--
18,093✔
2697
        }
18,093✔
2698

2699
        // Wait for go routines to be done.
2700
        s.grWG.Wait()
6,911✔
2701

6,911✔
2702
        if opts.PortsFileDir != _EMPTY_ {
6,913✔
2703
                s.deletePortsFile(opts.PortsFileDir)
2✔
2704
        }
2✔
2705

2706
        s.Noticef("Server Exiting..")
6,911✔
2707

6,911✔
2708
        // Stop OCSP Response Cache
6,911✔
2709
        if s.ocsprc != nil {
6,961✔
2710
                s.ocsprc.Stop(s)
50✔
2711
        }
50✔
2712

2713
        // Close logger if applicable. It allows tests on Windows
2714
        // to be able to do proper cleanup (delete log file).
2715
        s.logging.RLock()
6,911✔
2716
        log := s.logging.logger
6,911✔
2717
        s.logging.RUnlock()
6,911✔
2718
        if log != nil {
7,443✔
2719
                if l, ok := log.(*logger.Logger); ok {
542✔
2720
                        l.Close()
10✔
2721
                }
10✔
2722
        }
2723
        // Notify that the shutdown is complete
2724
        close(s.shutdownComplete)
6,911✔
2725
}
2726

2727
// Close the websocket server if running. If so, returns 1, else 0.
2728
// Server lock held on entry.
2729
func (s *Server) closeWebsocketServer() int {
6,917✔
2730
        ws := &s.websocket
6,917✔
2731
        ws.mu.Lock()
6,917✔
2732
        hs := ws.server
6,917✔
2733
        if hs != nil {
7,039✔
2734
                ws.server = nil
122✔
2735
                ws.listener = nil
122✔
2736
        }
122✔
2737
        ws.mu.Unlock()
6,917✔
2738
        if hs != nil {
7,039✔
2739
                hs.Close()
122✔
2740
                return 1
122✔
2741
        }
122✔
2742
        return 0
6,795✔
2743
}
2744

2745
// WaitForShutdown will block until the server has been fully shutdown.
2746
func (s *Server) WaitForShutdown() {
3,408✔
2747
        <-s.shutdownComplete
3,408✔
2748
}
3,408✔
2749

2750
// AcceptLoop is exported for easier testing.
2751
func (s *Server) AcceptLoop(clr chan struct{}) {
6,847✔
2752
        // If we were to exit before the listener is setup properly,
6,847✔
2753
        // make sure we close the channel.
6,847✔
2754
        defer func() {
13,694✔
2755
                if clr != nil {
6,848✔
2756
                        close(clr)
1✔
2757
                }
1✔
2758
        }()
2759

2760
        if s.isShuttingDown() {
6,848✔
2761
                return
1✔
2762
        }
1✔
2763

2764
        // Snapshot server options.
2765
        opts := s.getOpts()
6,846✔
2766

6,846✔
2767
        // Setup state that can enable shutdown
6,846✔
2768
        s.mu.Lock()
6,846✔
2769
        hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
6,846✔
2770
        l, e := s.getServerListener(hp)
6,846✔
2771
        s.listenerErr = e
6,846✔
2772
        if e != nil {
6,846✔
2773
                s.mu.Unlock()
×
2774
                s.Fatalf("Error listening on port: %s, %q", hp, e)
×
2775
                return
×
2776
        }
×
2777
        s.Noticef("Listening for client connections on %s",
6,846✔
2778
                net.JoinHostPort(opts.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
6,846✔
2779

6,846✔
2780
        // Alert if PROXY protocol is enabled
6,846✔
2781
        if opts.ProxyProtocol {
6,849✔
2782
                s.Noticef("PROXY protocol enabled for client connections")
3✔
2783
        }
3✔
2784

2785
        // Alert of TLS enabled.
2786
        if opts.TLSConfig != nil {
7,004✔
2787
                s.Noticef("TLS required for client connections")
158✔
2788
                if opts.TLSHandshakeFirst && opts.TLSHandshakeFirstFallback == 0 {
164✔
2789
                        s.Warnf("Clients that are not using \"TLS Handshake First\" option will fail to connect")
6✔
2790
                }
6✔
2791
        }
2792

2793
        // If server was started with RANDOM_PORT (-1), opts.Port would be equal
2794
        // to 0 at the beginning this function. So we need to get the actual port
2795
        if opts.Port == 0 {
13,156✔
2796
                // Write resolved port back to options.
6,310✔
2797
                opts.Port = l.Addr().(*net.TCPAddr).Port
6,310✔
2798
        }
6,310✔
2799

2800
        // Now that port has been set (if it was set to RANDOM), set the
2801
        // server's info Host/Port with either values from Options or
2802
        // ClientAdvertise.
2803
        if err := s.setInfoHostPort(); err != nil {
6,846✔
2804
                s.Fatalf("Error setting server INFO with ClientAdvertise value of %s, err=%v", opts.ClientAdvertise, err)
×
2805
                l.Close()
×
2806
                s.mu.Unlock()
×
2807
                return
×
2808
        }
×
2809
        // Keep track of client connect URLs. We may need them later.
2810
        s.clientConnectURLs = s.getClientConnectURLs()
6,846✔
2811
        s.listener = l
6,846✔
2812

6,846✔
2813
        go s.acceptConnections(l, "Client", func(conn net.Conn) { s.createClient(conn) },
16,583✔
2814
                func(_ error) bool {
6,841✔
2815
                        if s.isLameDuckMode() {
6,847✔
2816
                                // Signal that we are not accepting new clients
6✔
2817
                                s.ldmCh <- true
6✔
2818
                                // Now wait for the Shutdown...
6✔
2819
                                <-s.quitCh
6✔
2820
                                return true
6✔
2821
                        }
6✔
2822
                        return false
6,835✔
2823
                })
2824
        s.mu.Unlock()
6,846✔
2825

6,846✔
2826
        // Let the caller know that we are ready
6,846✔
2827
        close(clr)
6,846✔
2828
        clr = nil
6,846✔
2829
}
2830

2831
// getServerListener returns a network listener for the given host-port address.
2832
// If the Server already has an active listener (s.listener), it returns that listener
2833
// along with any previous error (s.listenerErr). Otherwise, it creates and returns
2834
// a new TCP listener on the specified address using natsListen.
2835
func (s *Server) getServerListener(hp string) (net.Listener, error) {
6,846✔
2836
        if s.listener != nil {
6,846✔
2837
                return s.listener, s.listenerErr
×
2838
        }
×
2839

2840
        return natsListen("tcp", hp)
6,846✔
2841
}
2842

2843
// InProcessConn returns an in-process connection to the server,
2844
// avoiding the need to use a TCP listener for local connectivity
2845
// within the same process. This can be used regardless of the
2846
// state of the DontListen option.
2847
func (s *Server) InProcessConn() (net.Conn, error) {
8✔
2848
        pl, pr := net.Pipe()
8✔
2849
        if !s.startGoRoutine(func() {
16✔
2850
                s.createClientInProcess(pl)
8✔
2851
                s.grWG.Done()
8✔
2852
        }) {
8✔
2853
                pl.Close()
×
2854
                pr.Close()
×
2855
                return nil, fmt.Errorf("failed to create connection")
×
2856
        }
×
2857
        return pr, nil
8✔
2858
}
2859

2860
func (s *Server) acceptConnections(l net.Listener, acceptName string, createFunc func(conn net.Conn), errFunc func(err error) bool) {
16,907✔
2861
        tmpDelay := ACCEPT_MIN_SLEEP
16,907✔
2862

16,907✔
2863
        for {
79,720✔
2864
                conn, err := l.Accept()
62,813✔
2865
                if err != nil {
79,713✔
2866
                        if errFunc != nil && errFunc(err) {
16,906✔
2867
                                return
6✔
2868
                        }
6✔
2869
                        if tmpDelay = s.acceptError(acceptName, err, tmpDelay); tmpDelay < 0 {
33,788✔
2870
                                break
16,894✔
2871
                        }
2872
                        continue
×
2873
                }
2874
                tmpDelay = ACCEPT_MIN_SLEEP
45,906✔
2875
                if !s.startGoRoutine(func() {
91,797✔
2876
                        s.reloadMu.RLock()
45,891✔
2877
                        createFunc(conn)
45,891✔
2878
                        s.reloadMu.RUnlock()
45,891✔
2879
                        s.grWG.Done()
45,891✔
2880
                }) {
45,906✔
2881
                        conn.Close()
15✔
2882
                }
15✔
2883
        }
2884
        s.Debugf(acceptName + " accept loop exiting..")
16,894✔
2885
        s.done <- true
16,894✔
2886
}
2887

2888
// This function sets the server's info Host/Port based on server Options.
2889
// Note that this function may be called during config reload, this is why
2890
// Host/Port may be reset to original Options if the ClientAdvertise option
2891
// is not set (since it may have previously been).
2892
func (s *Server) setInfoHostPort() error {
13,828✔
2893
        // When this function is called, opts.Port is set to the actual listen
13,828✔
2894
        // port (if option was originally set to RANDOM), even during a config
13,828✔
2895
        // reload. So use of s.opts.Port is safe.
13,828✔
2896
        opts := s.getOpts()
13,828✔
2897
        if opts.ClientAdvertise != _EMPTY_ {
13,830✔
2898
                h, p, err := parseHostPort(opts.ClientAdvertise, opts.Port)
2✔
2899
                if err != nil {
2✔
2900
                        return err
×
2901
                }
×
2902
                s.info.Host = h
2✔
2903
                s.info.Port = p
2✔
2904
        } else {
13,826✔
2905
                s.info.Host = opts.Host
13,826✔
2906
                s.info.Port = opts.Port
13,826✔
2907
        }
13,826✔
2908
        return nil
13,828✔
2909
}
2910

2911
// StartProfiler is called to enable dynamic profiling.
2912
func (s *Server) StartProfiler() {
2✔
2913
        if s.isShuttingDown() {
2✔
2914
                return
×
2915
        }
×
2916

2917
        // Snapshot server options.
2918
        opts := s.getOpts()
2✔
2919

2✔
2920
        port := opts.ProfPort
2✔
2921

2✔
2922
        // Check for Random Port
2✔
2923
        if port == -1 {
3✔
2924
                port = 0
1✔
2925
        }
1✔
2926

2927
        s.mu.Lock()
2✔
2928
        hp := net.JoinHostPort(opts.Host, strconv.Itoa(port))
2✔
2929
        l, err := net.Listen("tcp", hp)
2✔
2930

2✔
2931
        if err != nil {
2✔
2932
                s.mu.Unlock()
×
2933
                s.Fatalf("error starting profiler: %s", err)
×
2934
                return
×
2935
        }
×
2936
        s.Noticef("profiling port: %d", l.Addr().(*net.TCPAddr).Port)
2✔
2937

2✔
2938
        srv := &http.Server{
2✔
2939
                Addr:           hp,
2✔
2940
                Handler:        http.DefaultServeMux,
2✔
2941
                MaxHeaderBytes: 1 << 20,
2✔
2942
                ReadTimeout:    time.Second * 5,
2✔
2943
        }
2✔
2944
        s.profiler = l
2✔
2945
        s.profilingServer = srv
2✔
2946

2✔
2947
        s.setBlockProfileRate(opts.ProfBlockRate)
2✔
2948

2✔
2949
        go func() {
4✔
2950
                // if this errors out, it's probably because the server is being shutdown
2✔
2951
                err := srv.Serve(l)
2✔
2952
                if err != nil {
4✔
2953
                        if !s.isShuttingDown() {
2✔
2954
                                s.Fatalf("error starting profiler: %s", err)
×
2955
                        }
×
2956
                }
2957
                srv.Close()
2✔
2958
                s.done <- true
2✔
2959
        }()
2960
        s.mu.Unlock()
2✔
2961
}
2962

2963
func (s *Server) setBlockProfileRate(rate int) {
6,851✔
2964
        // Passing i ProfBlockRate <= 0 here will disable or > 0 will enable.
6,851✔
2965
        runtime.SetBlockProfileRate(rate)
6,851✔
2966

6,851✔
2967
        if rate > 0 {
6,851✔
2968
                s.Warnf("Block profiling is enabled (rate %d), this may have a performance impact", rate)
×
2969
        }
×
2970
}
2971

2972
// StartHTTPMonitoring will enable the HTTP monitoring port.
2973
// DEPRECATED: Should use StartMonitoring.
2974
func (s *Server) StartHTTPMonitoring() {
×
2975
        s.startMonitoring(false)
×
2976
}
×
2977

2978
// StartHTTPSMonitoring will enable the HTTPS monitoring port.
2979
// DEPRECATED: Should use StartMonitoring.
2980
func (s *Server) StartHTTPSMonitoring() {
×
2981
        s.startMonitoring(true)
×
2982
}
×
2983

2984
// StartMonitoring starts the HTTP or HTTPs server if needed.
2985
func (s *Server) StartMonitoring() error {
6,854✔
2986
        // Snapshot server options.
6,854✔
2987
        opts := s.getOpts()
6,854✔
2988

6,854✔
2989
        // Specifying both HTTP and HTTPS ports is a misconfiguration
6,854✔
2990
        if opts.HTTPPort != 0 && opts.HTTPSPort != 0 {
6,855✔
2991
                return fmt.Errorf("can't specify both HTTP (%v) and HTTPs (%v) ports", opts.HTTPPort, opts.HTTPSPort)
1✔
2992
        }
1✔
2993
        var err error
6,853✔
2994
        if opts.HTTPPort != 0 {
7,917✔
2995
                err = s.startMonitoring(false)
1,064✔
2996
        } else if opts.HTTPSPort != 0 {
6,868✔
2997
                if opts.TLSConfig == nil {
17✔
2998
                        return fmt.Errorf("TLS cert and key required for HTTPS")
2✔
2999
                }
2✔
3000
                err = s.startMonitoring(true)
13✔
3001
        }
3002
        return err
6,851✔
3003
}
3004

3005
// HTTP endpoints
3006
const (
3007
        RootPath         = "/"
3008
        VarzPath         = "/varz"
3009
        ConnzPath        = "/connz"
3010
        RoutezPath       = "/routez"
3011
        GatewayzPath     = "/gatewayz"
3012
        LeafzPath        = "/leafz"
3013
        SubszPath        = "/subsz"
3014
        StackszPath      = "/stacksz"
3015
        AccountzPath     = "/accountz"
3016
        AccountStatzPath = "/accstatz"
3017
        JszPath          = "/jsz"
3018
        HealthzPath      = "/healthz"
3019
        IPQueuesPath     = "/ipqueuesz"
3020
        RaftzPath        = "/raftz"
3021
        ExpvarzPath      = "/debug/vars"
3022
)
3023

3024
func (s *Server) basePath(p string) string {
17,248✔
3025
        return path.Join(s.httpBasePath, p)
17,248✔
3026
}
17,248✔
3027

3028
type captureHTTPServerLog struct {
3029
        s      *Server
3030
        prefix string
3031
}
3032

3033
func (cl *captureHTTPServerLog) Write(p []byte) (int, error) {
3✔
3034
        var buf [128]byte
3✔
3035
        var b = buf[:0]
3✔
3036

3✔
3037
        b = append(b, []byte(cl.prefix)...)
3✔
3038
        offset := 0
3✔
3039
        if bytes.HasPrefix(p, []byte("http:")) {
6✔
3040
                offset = 6
3✔
3041
        }
3✔
3042
        b = append(b, p[offset:]...)
3✔
3043
        cl.s.Errorf(string(b))
3✔
3044
        return len(p), nil
3✔
3045
}
3046

3047
// The TLS configuration is passed to the listener when the monitoring
3048
// "server" is setup. That prevents TLS configuration updates on reload
3049
// from being used. By setting this function in tls.Config.GetConfigForClient
3050
// we instruct the TLS handshake to ask for the tls configuration to be
3051
// used for a specific client. We don't care which client, we always use
3052
// the same TLS configuration.
3053
func (s *Server) getMonitoringTLSConfig(_ *tls.ClientHelloInfo) (*tls.Config, error) {
4✔
3054
        opts := s.getOpts()
4✔
3055
        tc := opts.TLSConfig.Clone()
4✔
3056
        tc.ClientAuth = tls.NoClientCert
4✔
3057
        return tc, nil
4✔
3058
}
4✔
3059

3060
// Start the monitoring server
3061
func (s *Server) startMonitoring(secure bool) error {
1,077✔
3062
        if s.isShuttingDown() {
1,078✔
3063
                return nil
1✔
3064
        }
1✔
3065

3066
        // Snapshot server options.
3067
        opts := s.getOpts()
1,076✔
3068

1,076✔
3069
        var (
1,076✔
3070
                hp           string
1,076✔
3071
                err          error
1,076✔
3072
                httpListener net.Listener
1,076✔
3073
                port         int
1,076✔
3074
        )
1,076✔
3075

1,076✔
3076
        monitorProtocol := "http"
1,076✔
3077

1,076✔
3078
        if secure {
1,089✔
3079
                monitorProtocol += "s"
13✔
3080
                port = opts.HTTPSPort
13✔
3081
                if port == -1 {
17✔
3082
                        port = 0
4✔
3083
                }
4✔
3084
                hp = net.JoinHostPort(opts.HTTPHost, strconv.Itoa(port))
13✔
3085
                config := opts.TLSConfig.Clone()
13✔
3086
                if !s.ocspPeerVerify {
25✔
3087
                        config.GetConfigForClient = s.getMonitoringTLSConfig
12✔
3088
                        config.ClientAuth = tls.NoClientCert
12✔
3089
                }
12✔
3090
                httpListener, err = tls.Listen("tcp", hp, config)
13✔
3091

3092
        } else {
1,063✔
3093
                port = opts.HTTPPort
1,063✔
3094
                if port == -1 {
1,930✔
3095
                        port = 0
867✔
3096
                }
867✔
3097
                hp = net.JoinHostPort(opts.HTTPHost, strconv.Itoa(port))
1,063✔
3098
                httpListener, err = net.Listen("tcp", hp)
1,063✔
3099
        }
3100

3101
        if err != nil {
1,077✔
3102
                return fmt.Errorf("can't listen to the monitor port: %v", err)
1✔
3103
        }
1✔
3104

3105
        rport := httpListener.Addr().(*net.TCPAddr).Port
1,075✔
3106
        s.Noticef("Starting %s monitor on %s", monitorProtocol, net.JoinHostPort(opts.HTTPHost, strconv.Itoa(rport)))
1,075✔
3107

1,075✔
3108
        mux := http.NewServeMux()
1,075✔
3109

1,075✔
3110
        // Root
1,075✔
3111
        mux.HandleFunc(s.basePath(RootPath), s.HandleRoot)
1,075✔
3112
        // Varz
1,075✔
3113
        mux.HandleFunc(s.basePath(VarzPath), s.HandleVarz)
1,075✔
3114
        // Connz
1,075✔
3115
        mux.HandleFunc(s.basePath(ConnzPath), s.HandleConnz)
1,075✔
3116
        // Routez
1,075✔
3117
        mux.HandleFunc(s.basePath(RoutezPath), s.HandleRoutez)
1,075✔
3118
        // Gatewayz
1,075✔
3119
        mux.HandleFunc(s.basePath(GatewayzPath), s.HandleGatewayz)
1,075✔
3120
        // Leafz
1,075✔
3121
        mux.HandleFunc(s.basePath(LeafzPath), s.HandleLeafz)
1,075✔
3122
        // Subz
1,075✔
3123
        mux.HandleFunc(s.basePath(SubszPath), s.HandleSubsz)
1,075✔
3124
        // Subz alias for backwards compatibility
1,075✔
3125
        mux.HandleFunc(s.basePath("/subscriptionsz"), s.HandleSubsz)
1,075✔
3126
        // Stacksz
1,075✔
3127
        mux.HandleFunc(s.basePath(StackszPath), s.HandleStacksz)
1,075✔
3128
        // Accountz
1,075✔
3129
        mux.HandleFunc(s.basePath(AccountzPath), s.HandleAccountz)
1,075✔
3130
        // Accstatz
1,075✔
3131
        mux.HandleFunc(s.basePath(AccountStatzPath), s.HandleAccountStatz)
1,075✔
3132
        // Jsz
1,075✔
3133
        mux.HandleFunc(s.basePath(JszPath), s.HandleJsz)
1,075✔
3134
        // Healthz
1,075✔
3135
        mux.HandleFunc(s.basePath(HealthzPath), s.HandleHealthz)
1,075✔
3136
        // IPQueuesz
1,075✔
3137
        mux.HandleFunc(s.basePath(IPQueuesPath), s.HandleIPQueuesz)
1,075✔
3138
        // Raftz
1,075✔
3139
        mux.HandleFunc(s.basePath(RaftzPath), s.HandleRaftz)
1,075✔
3140
        // Expvarz
1,075✔
3141
        mux.Handle(s.basePath(ExpvarzPath), expvar.Handler())
1,075✔
3142

1,075✔
3143
        // Do not set a WriteTimeout because it could cause cURL/browser
1,075✔
3144
        // to return empty response or unable to display page if the
1,075✔
3145
        // server needs more time to build the response.
1,075✔
3146
        srv := &http.Server{
1,075✔
3147
                Addr:              hp,
1,075✔
3148
                Handler:           mux,
1,075✔
3149
                MaxHeaderBytes:    1 << 20,
1,075✔
3150
                ErrorLog:          log.New(&captureHTTPServerLog{s, "monitoring: "}, _EMPTY_, 0),
1,075✔
3151
                ReadHeaderTimeout: time.Second * 5,
1,075✔
3152
        }
1,075✔
3153
        s.mu.Lock()
1,075✔
3154
        s.http = httpListener
1,075✔
3155
        s.httpHandler = mux
1,075✔
3156
        s.monitoringServer = srv
1,075✔
3157
        s.mu.Unlock()
1,075✔
3158

1,075✔
3159
        go func() {
2,150✔
3160
                if err := srv.Serve(httpListener); err != nil {
2,150✔
3161
                        if !s.isShuttingDown() {
1,075✔
3162
                                s.Fatalf("Error starting monitor on %q: %v", hp, err)
×
3163
                        }
×
3164
                }
3165
                srv.Close()
1,075✔
3166
                s.mu.Lock()
1,075✔
3167
                s.httpHandler = nil
1,075✔
3168
                s.mu.Unlock()
1,075✔
3169
                s.done <- true
1,075✔
3170
        }()
3171

3172
        return nil
1,075✔
3173
}
3174

3175
// HTTPHandler returns the http.Handler object used to handle monitoring
3176
// endpoints. It will return nil if the server is not configured for
3177
// monitoring, or if the server has not been started yet (Server.Start()).
3178
func (s *Server) HTTPHandler() http.Handler {
2✔
3179
        s.mu.Lock()
2✔
3180
        defer s.mu.Unlock()
2✔
3181
        return s.httpHandler
2✔
3182
}
2✔
3183

3184
// Perform a conditional deep copy due to reference nature of [Client|WS]ConnectURLs.
3185
// If updates are made to Info, this function should be consulted and updated.
3186
// Assume lock is held.
3187
func (s *Server) copyInfo() Info {
19,166✔
3188
        info := s.info
19,166✔
3189
        if len(info.ClientConnectURLs) > 0 {
30,553✔
3190
                info.ClientConnectURLs = append([]string(nil), s.info.ClientConnectURLs...)
11,387✔
3191
        }
11,387✔
3192
        if len(info.WSConnectURLs) > 0 {
19,248✔
3193
                info.WSConnectURLs = append([]string(nil), s.info.WSConnectURLs...)
82✔
3194
        }
82✔
3195
        return info
19,166✔
3196
}
3197

3198
// tlsMixConn is used when we can receive both TLS and non-TLS connections on same port.
3199
type tlsMixConn struct {
3200
        net.Conn
3201
        pre *bytes.Buffer
3202
}
3203

3204
// Read for our mixed multi-reader.
3205
func (c *tlsMixConn) Read(b []byte) (int, error) {
50✔
3206
        if c.pre != nil {
59✔
3207
                n, err := c.pre.Read(b)
9✔
3208
                if c.pre.Len() == 0 {
18✔
3209
                        c.pre = nil
9✔
3210
                }
9✔
3211
                return n, err
9✔
3212
        }
3213
        return c.Conn.Read(b)
41✔
3214
}
3215

3216
func (s *Server) createClient(conn net.Conn) *client {
9,953✔
3217
        return s.createClientEx(conn, false)
9,953✔
3218
}
9,953✔
3219

3220
func (s *Server) createClientInProcess(conn net.Conn) *client {
8✔
3221
        return s.createClientEx(conn, true)
8✔
3222
}
8✔
3223

3224
func (s *Server) createClientEx(conn net.Conn, inProcess bool) *client {
9,961✔
3225
        // Snapshot server options.
9,961✔
3226
        opts := s.getOpts()
9,961✔
3227

9,961✔
3228
        maxPay := int32(opts.MaxPayload)
9,961✔
3229
        maxSubs := int32(opts.MaxSubs)
9,961✔
3230
        // For system, maxSubs of 0 means unlimited, so re-adjust here.
9,961✔
3231
        if maxSubs == 0 {
19,921✔
3232
                maxSubs = -1
9,960✔
3233
        }
9,960✔
3234
        now := time.Now()
9,961✔
3235

9,961✔
3236
        c := &client{
9,961✔
3237
                srv:   s,
9,961✔
3238
                nc:    conn,
9,961✔
3239
                opts:  defaultOpts,
9,961✔
3240
                mpay:  maxPay,
9,961✔
3241
                msubs: maxSubs,
9,961✔
3242
                start: now,
9,961✔
3243
                last:  now,
9,961✔
3244
                iproc: inProcess,
9,961✔
3245
        }
9,961✔
3246

9,961✔
3247
        c.registerWithAccount(s.globalAccount())
9,961✔
3248

9,961✔
3249
        var info Info
9,961✔
3250
        var authRequired bool
9,961✔
3251

9,961✔
3252
        s.mu.Lock()
9,961✔
3253
        // Grab JSON info string
9,961✔
3254
        info = s.copyInfo()
9,961✔
3255
        if s.nonceRequired() {
12,459✔
3256
                // Nonce handling
2,498✔
3257
                var raw [nonceLen]byte
2,498✔
3258
                nonce := raw[:]
2,498✔
3259
                s.generateNonce(nonce)
2,498✔
3260
                info.Nonce = string(nonce)
2,498✔
3261
        }
2,498✔
3262
        c.nonce = []byte(info.Nonce)
9,961✔
3263
        authRequired = info.AuthRequired
9,961✔
3264

9,961✔
3265
        // Check to see if we have auth_required set but we also have a no_auth_user.
9,961✔
3266
        // If so set back to false.
9,961✔
3267
        if info.AuthRequired && opts.NoAuthUser != _EMPTY_ && opts.NoAuthUser != s.sysAccOnlyNoAuthUser {
10,143✔
3268
                info.AuthRequired = false
182✔
3269
        }
182✔
3270

3271
        // Check to see if this is an in-process connection with tls_required.
3272
        // If so, set as not required, but available.
3273
        if inProcess && info.TLSRequired {
9,965✔
3274
                info.TLSRequired = false
4✔
3275
                info.TLSAvailable = true
4✔
3276
        }
4✔
3277

3278
        s.totalClients++
9,961✔
3279
        s.mu.Unlock()
9,961✔
3280

9,961✔
3281
        // Grab lock
9,961✔
3282
        c.mu.Lock()
9,961✔
3283
        if authRequired {
17,268✔
3284
                c.flags.set(expectConnect)
7,307✔
3285
        }
7,307✔
3286

3287
        // Initialize
3288
        c.initClient()
9,961✔
3289

9,961✔
3290
        c.Debugf("Client connection created")
9,961✔
3291

9,961✔
3292
        // Save info.TLSRequired value since we may neeed to change it back and forth.
9,961✔
3293
        orgInfoTLSReq := info.TLSRequired
9,961✔
3294

9,961✔
3295
        var tlsFirstFallback time.Duration
9,961✔
3296
        // Check if we should do TLS first.
9,961✔
3297
        tlsFirst := opts.TLSConfig != nil && opts.TLSHandshakeFirst
9,961✔
3298
        if tlsFirst {
9,975✔
3299
                // Make sure info.TLSRequired is set to true (it could be false
14✔
3300
                // if AllowNonTLS is enabled).
14✔
3301
                info.TLSRequired = true
14✔
3302
                // Get the fallback delay value if applicable.
14✔
3303
                if f := opts.TLSHandshakeFirstFallback; f > 0 {
19✔
3304
                        tlsFirstFallback = f
5✔
3305
                } else if inProcess {
17✔
3306
                        // For in-process connection, we will always have a fallback
3✔
3307
                        // delay. It allows support for non-TLS, TLS and "TLS First"
3✔
3308
                        // in-process clients to successfully connect.
3✔
3309
                        tlsFirstFallback = DEFAULT_TLS_HANDSHAKE_FIRST_FALLBACK_DELAY
3✔
3310
                }
3✔
3311
        }
3312

3313
        // Decide if we are going to require TLS or not and generate INFO json.
3314
        // If we have ProxyProtocol enabled then we won't include the client
3315
        // IP in the initial INFO, as that would leak the proxy IP itself.
3316
        // In that case we'll send another INFO after the client introduces itself.
3317
        tlsRequired := info.TLSRequired
9,961✔
3318
        infoBytes := c.generateClientInfoJSON(info, !opts.ProxyProtocol)
9,961✔
3319

9,961✔
3320
        // Send our information, except if TLS and TLSHandshakeFirst is requested.
9,961✔
3321
        if !tlsFirst {
19,908✔
3322
                // Need to be sent in place since writeLoop cannot be started until
9,947✔
3323
                // TLS handshake is done (if applicable).
9,947✔
3324
                c.sendProtoNow(infoBytes)
9,947✔
3325
        }
9,947✔
3326

3327
        // Unlock to register
3328
        c.mu.Unlock()
9,961✔
3329

9,961✔
3330
        // Register with the server.
9,961✔
3331
        s.mu.Lock()
9,961✔
3332
        // If server is not running, Shutdown() may have already gathered the
9,961✔
3333
        // list of connections to close. It won't contain this one, so we need
9,961✔
3334
        // to bail out now otherwise the readLoop started down there would not
9,961✔
3335
        // be interrupted. Skip also if in lame duck mode.
9,961✔
3336
        if !s.isRunning() || s.ldm {
10,157✔
3337
                // There are some tests that create a server but don't start it,
196✔
3338
                // and use "async" clients and perform the parsing manually. Such
196✔
3339
                // clients would branch here (since server is not running). However,
196✔
3340
                // when a server was really running and has been shutdown, we must
196✔
3341
                // close this connection.
196✔
3342
                if s.isShuttingDown() {
198✔
3343
                        conn.Close()
2✔
3344
                }
2✔
3345
                s.mu.Unlock()
196✔
3346
                return c
196✔
3347
        }
3348

3349
        // If there is a max connections specified, check that adding
3350
        // this new client would not push us over the max
3351
        if opts.MaxConn < 0 || (opts.MaxConn > 0 && len(s.clients) >= opts.MaxConn) {
9,765✔
3352
                s.mu.Unlock()
×
3353
                c.maxConnExceeded()
×
3354
                return nil
×
3355
        }
×
3356
        s.clients[c.cid] = c
9,765✔
3357

9,765✔
3358
        s.mu.Unlock()
9,765✔
3359

9,765✔
3360
        // Re-Grab lock
9,765✔
3361
        c.mu.Lock()
9,765✔
3362

9,765✔
3363
        isClosed := c.isClosed()
9,765✔
3364
        var pre []byte
9,765✔
3365
        // We need first to check for "TLS First" fallback delay.
9,765✔
3366
        if !isClosed && tlsFirstFallback > 0 {
9,773✔
3367
                // We wait and see if we are getting any data. Since we did not send
8✔
3368
                // the INFO protocol yet, only clients that use TLS first should be
8✔
3369
                // sending data (the TLS handshake). We don't really check the content:
8✔
3370
                // if it is a rogue agent and not an actual client performing the
8✔
3371
                // TLS handshake, the error will be detected when performing the
8✔
3372
                // handshake on our side.
8✔
3373
                pre = make([]byte, 4)
8✔
3374
                c.nc.SetReadDeadline(time.Now().Add(tlsFirstFallback))
8✔
3375
                n, _ := io.ReadFull(c.nc, pre[:])
8✔
3376
                c.nc.SetReadDeadline(time.Time{})
8✔
3377
                // If we get any data (regardless of possible timeout), we will proceed
8✔
3378
                // with the TLS handshake.
8✔
3379
                if n > 0 {
11✔
3380
                        pre = pre[:n]
3✔
3381
                } else {
8✔
3382
                        // We did not get anything so we will send the INFO protocol.
5✔
3383
                        pre = nil
5✔
3384

5✔
3385
                        // Restore the original info.TLSRequired value if it is
5✔
3386
                        // different that the current value and regenerate infoBytes.
5✔
3387
                        if orgInfoTLSReq != info.TLSRequired {
9✔
3388
                                info.TLSRequired = orgInfoTLSReq
4✔
3389
                                infoBytes = c.generateClientInfoJSON(info, !opts.ProxyProtocol)
4✔
3390
                        }
4✔
3391
                        c.sendProtoNow(infoBytes)
5✔
3392
                        // Set the boolean to false for the rest of the function.
5✔
3393
                        tlsFirst = false
5✔
3394
                        // Check closed status again
5✔
3395
                        isClosed = c.isClosed()
5✔
3396
                }
3397
        }
3398
        // If we have both TLS and non-TLS allowed we need to see which
3399
        // one the client wants. We'll always allow this for in-process
3400
        // connections.
3401
        if !isClosed && !tlsFirst && opts.TLSConfig != nil && (inProcess || opts.AllowNonTLS) {
9,774✔
3402
                pre = make([]byte, 6) // Minimum 6 bytes for proxy proto in next step.
9✔
3403
                c.nc.SetReadDeadline(time.Now().Add(secondsToDuration(opts.TLSTimeout)))
9✔
3404
                n, _ := io.ReadFull(c.nc, pre[:])
9✔
3405
                c.nc.SetReadDeadline(time.Time{})
9✔
3406
                pre = pre[:n]
9✔
3407
                if n > 0 && pre[0] == 0x16 {
12✔
3408
                        tlsRequired = true
3✔
3409
                } else {
9✔
3410
                        tlsRequired = false
6✔
3411
                }
6✔
3412
        }
3413

3414
        // Check for proxy protocol if enabled.
3415
        if !isClosed && !tlsRequired && opts.ProxyProtocol {
9,768✔
3416
                if len(pre) == 0 {
6✔
3417
                        // There has been no pre-read yet, do so so we can work out
3✔
3418
                        // if the client is trying to negotiate PROXY.
3✔
3419
                        pre = make([]byte, 6)
3✔
3420
                        c.nc.SetReadDeadline(time.Now().Add(proxyProtoReadTimeout))
3✔
3421
                        n, _ := io.ReadFull(c.nc, pre)
3✔
3422
                        c.nc.SetReadDeadline(time.Time{})
3✔
3423
                        pre = pre[:n]
3✔
3424
                }
3✔
3425
                conn = &tlsMixConn{conn, bytes.NewBuffer(pre)}
3✔
3426
                addr, proxyPre, err := readProxyProtoHeader(conn)
3✔
3427
                if err != nil && err != errProxyProtoUnrecognized {
3✔
3428
                        // err != errProxyProtoUnrecognized implies that we detected a proxy
×
3429
                        // protocol header but we failed to parse it, so don't continue.
×
3430
                        c.mu.Unlock()
×
3431
                        s.Warnf("Error reading PROXY protocol header from %s: %v", conn.RemoteAddr(), err)
×
3432
                        c.closeConnection(ProtocolViolation)
×
3433
                        return nil
×
3434
                }
×
3435
                // If addr is nil, it was a LOCAL/UNKNOWN command (health check)
3436
                // Use the connection as-is
3437
                if addr != nil {
5✔
3438
                        c.nc = &proxyConn{
2✔
3439
                                Conn:       conn,
2✔
3440
                                remoteAddr: addr,
2✔
3441
                        }
2✔
3442
                        // These were set already by initClient, override them.
2✔
3443
                        c.host = addr.srcIP.String()
2✔
3444
                        c.port = addr.srcPort
2✔
3445
                }
2✔
3446
                // At this point, err is either:
3447
                //  - nil => we parsed the proxy protocol header successfully
3448
                //  - errProxyProtoUnrecognized => we didn't detect proxy protocol at all
3449
                // We only clear the pre-read if we successfully read the protocol header
3450
                // so that the next step doesn't re-read it. Otherwise we have to assume
3451
                // that it's a non-proxied connection and we want the pre-read to remain
3452
                // for the next step.
3453
                if err == nil {
6✔
3454
                        pre = proxyPre
3✔
3455
                }
3✔
3456
                // Because we have ProxyProtocol enabled, our earlier INFO message didn't
3457
                // include the client_ip. If we need to send it again then we will include
3458
                // it, but sending it here immediately can confuse clients who have just
3459
                // PING'd.
3460
                infoBytes = c.generateClientInfoJSON(info, true)
3✔
3461
        }
3462

3463
        // Check for TLS
3464
        if !isClosed && tlsRequired {
10,133✔
3465
                if s.connRateCounter != nil && !s.connRateCounter.allow() {
369✔
3466
                        c.mu.Unlock()
1✔
3467
                        c.sendErr("Connection throttling is active. Please try again later.")
1✔
3468
                        c.closeConnection(MaxConnectionsExceeded)
1✔
3469
                        return nil
1✔
3470
                }
1✔
3471

3472
                // If we have a prebuffer create a multi-reader.
3473
                if len(pre) > 0 {
373✔
3474
                        c.nc = &tlsMixConn{c.nc, bytes.NewBuffer(pre)}
6✔
3475
                        // Clear pre so it is not parsed.
6✔
3476
                        pre = nil
6✔
3477
                }
6✔
3478
                // Performs server-side TLS handshake.
3479
                if err := c.doTLSServerHandshake(_EMPTY_, opts.TLSConfig, opts.TLSTimeout, opts.TLSPinnedCerts); err != nil {
498✔
3480
                        c.mu.Unlock()
131✔
3481
                        return nil
131✔
3482
                }
131✔
3483
        }
3484

3485
        // Now, send the INFO if it was delayed
3486
        if !isClosed && tlsFirst {
9,639✔
3487
                c.flags.set(didTLSFirst)
6✔
3488
                c.sendProtoNow(infoBytes)
6✔
3489
                // Check closed status
6✔
3490
                isClosed = c.isClosed()
6✔
3491
        }
6✔
3492

3493
        // Connection could have been closed while sending the INFO proto.
3494
        if isClosed {
9,636✔
3495
                c.mu.Unlock()
3✔
3496
                // We need to call closeConnection() to make sure that proper cleanup is done.
3✔
3497
                c.closeConnection(WriteError)
3✔
3498
                return nil
3✔
3499
        }
3✔
3500

3501
        // Check for Auth. We schedule this timer after the TLS handshake to avoid
3502
        // the race where the timer fires during the handshake and causes the
3503
        // server to write bad data to the socket. See issue #432.
3504
        if authRequired {
16,810✔
3505
                c.setAuthTimer(secondsToDuration(opts.AuthTimeout))
7,180✔
3506
        }
7,180✔
3507

3508
        // Do final client initialization
3509

3510
        // Set the Ping timer. Will be reset once connect was received.
3511
        c.setPingTimer()
9,630✔
3512

9,630✔
3513
        // Spin up the read loop.
9,630✔
3514
        s.startGoRoutine(func() { c.readLoop(pre) })
19,260✔
3515

3516
        // Spin up the write loop.
3517
        s.startGoRoutine(func() { c.writeLoop() })
19,260✔
3518

3519
        if tlsRequired {
9,866✔
3520
                c.Debugf("TLS handshake complete")
236✔
3521
                cs := c.nc.(*tls.Conn).ConnectionState()
236✔
3522
                c.Debugf("TLS version %s, cipher suite %s", tlsVersion(cs.Version), tls.CipherSuiteName(cs.CipherSuite))
236✔
3523
        }
236✔
3524

3525
        c.mu.Unlock()
9,630✔
3526

9,630✔
3527
        return c
9,630✔
3528
}
3529

3530
// This will save off a closed client in a ring buffer such that
3531
// /connz can inspect. Useful for debugging, etc.
3532
func (s *Server) saveClosedClient(c *client, nc net.Conn, subs map[string]*subscription, reason ClosedState) {
12,595✔
3533
        now := time.Now()
12,595✔
3534

12,595✔
3535
        s.accountDisconnectEvent(c, now, reason.String())
12,595✔
3536

12,595✔
3537
        c.mu.Lock()
12,595✔
3538

12,595✔
3539
        cc := &closedClient{}
12,595✔
3540
        cc.fill(c, nc, now, false)
12,595✔
3541
        // Note that cc.fill is using len(c.subs), which may have been set to nil by now,
12,595✔
3542
        // so replace cc.NumSubs with len(subs).
12,595✔
3543
        cc.NumSubs = uint32(len(subs))
12,595✔
3544
        cc.Stop = &now
12,595✔
3545
        cc.Reason = reason.String()
12,595✔
3546

12,595✔
3547
        // Do subs, do not place by default in main ConnInfo
12,595✔
3548
        if len(subs) > 0 {
22,553✔
3549
                cc.subs = make([]SubDetail, 0, len(subs))
9,958✔
3550
                for _, sub := range subs {
160,340✔
3551
                        cc.subs = append(cc.subs, newSubDetail(sub))
150,382✔
3552
                }
150,382✔
3553
        }
3554
        // Hold user as well.
3555
        cc.user = c.getRawAuthUser()
12,595✔
3556
        // Hold account name if not the global account.
12,595✔
3557
        if c.acc != nil && c.acc.Name != globalAccountName {
19,254✔
3558
                cc.acc = c.acc.Name
6,659✔
3559
        }
6,659✔
3560
        cc.JWT = redactBearerJWT(c.opts.JWT)
12,595✔
3561
        cc.IssuerKey = issuerForClient(c)
12,595✔
3562
        cc.Tags = c.tags
12,595✔
3563
        cc.NameTag = c.nameTag
12,595✔
3564
        c.mu.Unlock()
12,595✔
3565

12,595✔
3566
        // Place in the ring buffer
12,595✔
3567
        s.mu.Lock()
12,595✔
3568
        if s.closed != nil {
25,189✔
3569
                s.closed.append(cc)
12,594✔
3570
        }
12,594✔
3571
        s.mu.Unlock()
12,595✔
3572
}
3573

3574
// Adds to the list of client and websocket clients connect URLs.
3575
// If there was a change, an INFO protocol is sent to registered clients
3576
// that support async INFO protocols.
3577
// Server lock held on entry.
3578
func (s *Server) addConnectURLsAndSendINFOToClients(curls, wsurls []string) {
9,331✔
3579
        s.updateServerINFOAndSendINFOToClients(curls, wsurls, true)
9,331✔
3580
}
9,331✔
3581

3582
// Removes from the list of client and websocket clients connect URLs.
3583
// If there was a change, an INFO protocol is sent to registered clients
3584
// that support async INFO protocols.
3585
// Server lock held on entry.
3586
func (s *Server) removeConnectURLsAndSendINFOToClients(curls, wsurls []string) {
9,318✔
3587
        s.updateServerINFOAndSendINFOToClients(curls, wsurls, false)
9,318✔
3588
}
9,318✔
3589

3590
// Updates the list of client and websocket clients connect URLs and if any change
3591
// sends an async INFO update to clients that support it.
3592
// Server lock held on entry.
3593
func (s *Server) updateServerINFOAndSendINFOToClients(curls, wsurls []string, add bool) {
18,649✔
3594
        remove := !add
18,649✔
3595
        // Will return true if we need alter the server's Info object.
18,649✔
3596
        updateMap := func(urls []string, m refCountedUrlSet) bool {
55,947✔
3597
                wasUpdated := false
37,298✔
3598
                for _, url := range urls {
56,384✔
3599
                        if add && m.addUrl(url) {
28,623✔
3600
                                wasUpdated = true
9,537✔
3601
                        } else if remove && m.removeUrl(url) {
28,623✔
3602
                                wasUpdated = true
9,537✔
3603
                        }
9,537✔
3604
                }
3605
                return wasUpdated
37,298✔
3606
        }
3607
        cliUpdated := updateMap(curls, s.clientConnectURLsMap)
18,649✔
3608
        wsUpdated := updateMap(wsurls, s.websocket.connectURLsMap)
18,649✔
3609

18,649✔
3610
        updateInfo := func(infoURLs *[]string, urls []string, m refCountedUrlSet) {
37,611✔
3611
                // Recreate the info's slice from the map
18,962✔
3612
                *infoURLs = (*infoURLs)[:0]
18,962✔
3613
                // Add this server client connect ULRs first...
18,962✔
3614
                *infoURLs = append(*infoURLs, urls...)
18,962✔
3615
                // Then the ones from the map
18,962✔
3616
                for url := range m {
43,856✔
3617
                        *infoURLs = append(*infoURLs, url)
24,894✔
3618
                }
24,894✔
3619
        }
3620
        if cliUpdated {
37,263✔
3621
                updateInfo(&s.info.ClientConnectURLs, s.clientConnectURLs, s.clientConnectURLsMap)
18,614✔
3622
        }
18,614✔
3623
        if wsUpdated {
18,997✔
3624
                updateInfo(&s.info.WSConnectURLs, s.websocket.connectURLs, s.websocket.connectURLsMap)
348✔
3625
        }
348✔
3626
        if cliUpdated || wsUpdated {
37,263✔
3627
                // Send to all registered clients that support async INFO protocols.
18,614✔
3628
                s.sendAsyncInfoToClients(cliUpdated, wsUpdated)
18,614✔
3629
        }
18,614✔
3630
}
3631

3632
// Handle closing down a connection when the handshake has timedout.
3633
func tlsTimeout(c *client, conn *tls.Conn) {
34✔
3634
        c.mu.Lock()
34✔
3635
        closed := c.isClosed()
34✔
3636
        c.mu.Unlock()
34✔
3637
        // Check if already closed
34✔
3638
        if closed {
43✔
3639
                return
9✔
3640
        }
9✔
3641
        cs := conn.ConnectionState()
25✔
3642
        if !cs.HandshakeComplete {
28✔
3643
                c.Errorf("TLS handshake timeout")
3✔
3644
                c.sendErr("Secure Connection - TLS Required")
3✔
3645
                c.closeConnection(TLSHandshakeError)
3✔
3646
        }
3✔
3647
}
3648

3649
// Seems silly we have to write these
3650
func tlsVersion(ver uint16) string {
1,212✔
3651
        switch ver {
1,212✔
3652
        case tls.VersionTLS10:
×
3653
                return "1.0"
×
3654
        case tls.VersionTLS11:
×
3655
                return "1.1"
×
3656
        case tls.VersionTLS12:
×
3657
                return "1.2"
×
3658
        case tls.VersionTLS13:
1,212✔
3659
                return "1.3"
1,212✔
3660
        }
3661
        return fmt.Sprintf("Unknown [0x%x]", ver)
×
3662
}
3663

3664
func tlsVersionFromString(ver string) (uint16, error) {
×
3665
        switch ver {
×
3666
        case "1.0":
×
3667
                return tls.VersionTLS10, nil
×
3668
        case "1.1":
×
3669
                return tls.VersionTLS11, nil
×
3670
        case "1.2":
×
3671
                return tls.VersionTLS12, nil
×
3672
        case "1.3":
×
3673
                return tls.VersionTLS13, nil
×
3674
        }
3675
        return 0, fmt.Errorf("unknown version: %v", ver)
×
3676
}
3677

3678
// Remove a client or route from our internal accounting.
3679
func (s *Server) removeClient(c *client) {
153,139✔
3680
        // kind is immutable, so can check without lock
153,139✔
3681
        switch c.kind {
153,139✔
3682
        case CLIENT:
10,870✔
3683
                c.mu.Lock()
10,870✔
3684
                cid := c.cid
10,870✔
3685
                updateProtoInfoCount := false
10,870✔
3686
                if c.kind == CLIENT && c.opts.Protocol >= ClientProtoInfo {
19,888✔
3687
                        updateProtoInfoCount = true
9,018✔
3688
                }
9,018✔
3689
                proxyKey := c.proxyKey
10,870✔
3690
                c.mu.Unlock()
10,870✔
3691

10,870✔
3692
                s.mu.Lock()
10,870✔
3693
                delete(s.clients, cid)
10,870✔
3694
                if updateProtoInfoCount {
19,888✔
3695
                        s.cproto--
9,018✔
3696
                }
9,018✔
3697
                if proxyKey != _EMPTY_ {
10,874✔
3698
                        s.removeProxiedConn(proxyKey, cid)
4✔
3699
                }
4✔
3700
                s.mu.Unlock()
10,870✔
3701
        case ROUTER:
64,580✔
3702
                s.removeRoute(c)
64,580✔
3703
        case GATEWAY:
3,775✔
3704
                s.removeRemoteGatewayConnection(c)
3,775✔
3705
        case LEAF:
1,732✔
3706
                s.removeLeafNodeConnection(c)
1,732✔
3707
        }
3708
}
3709

3710
// Remove the connection with id `cid` from the map of connections
3711
// under the public key `key` of the trusted proxies.
3712
//
3713
// Server lock must be held on entry.
3714
func (s *Server) removeProxiedConn(key string, cid uint64) {
8✔
3715
        conns := s.proxiedConns[key]
8✔
3716
        delete(conns, cid)
8✔
3717
        if len(conns) == 0 {
16✔
3718
                delete(s.proxiedConns, key)
8✔
3719
        }
8✔
3720
}
3721

3722
func (s *Server) removeFromTempClients(cid uint64) {
109,661✔
3723
        s.grMu.Lock()
109,661✔
3724
        delete(s.grTmpClients, cid)
109,661✔
3725
        s.grMu.Unlock()
109,661✔
3726
}
109,661✔
3727

3728
func (s *Server) addToTempClients(cid uint64, c *client) bool {
69,768✔
3729
        added := false
69,768✔
3730
        s.grMu.Lock()
69,768✔
3731
        if s.grRunning {
139,534✔
3732
                s.grTmpClients[cid] = c
69,766✔
3733
                added = true
69,766✔
3734
        }
69,766✔
3735
        s.grMu.Unlock()
69,768✔
3736
        return added
69,768✔
3737
}
3738

3739
/////////////////////////////////////////////////////////////////
3740
// These are some helpers for accounting in functional tests.
3741
/////////////////////////////////////////////////////////////////
3742

3743
// NumRoutes will report the number of registered routes.
3744
func (s *Server) NumRoutes() int {
6,476✔
3745
        s.mu.RLock()
6,476✔
3746
        defer s.mu.RUnlock()
6,476✔
3747
        return s.numRoutes()
6,476✔
3748
}
6,476✔
3749

3750
// numRoutes will report the number of registered routes.
3751
// Server lock held on entry
3752
func (s *Server) numRoutes() int {
793,895✔
3753
        var nr int
793,895✔
3754
        s.forEachRoute(func(c *client) {
1,540,422✔
3755
                nr++
746,527✔
3756
        })
746,527✔
3757
        return nr
793,895✔
3758
}
3759

3760
// NumRemotes will report number of registered remotes.
3761
func (s *Server) NumRemotes() int {
×
3762
        s.mu.RLock()
×
3763
        defer s.mu.RUnlock()
×
3764
        return s.numRemotes()
×
3765
}
×
3766

3767
// numRemotes will report number of registered remotes.
3768
// Server lock held on entry
3769
func (s *Server) numRemotes() int {
28,501✔
3770
        return len(s.routes)
28,501✔
3771
}
28,501✔
3772

3773
// NumLeafNodes will report number of leaf node connections.
3774
func (s *Server) NumLeafNodes() int {
3,819✔
3775
        s.mu.RLock()
3,819✔
3776
        defer s.mu.RUnlock()
3,819✔
3777
        return len(s.leafs)
3,819✔
3778
}
3,819✔
3779

3780
// NumClients will report the number of registered clients.
3781
func (s *Server) NumClients() int {
63✔
3782
        s.mu.RLock()
63✔
3783
        defer s.mu.RUnlock()
63✔
3784
        return len(s.clients)
63✔
3785
}
63✔
3786

3787
// GetClient will return the client associated with cid.
3788
func (s *Server) GetClient(cid uint64) *client {
134✔
3789
        return s.getClient(cid)
134✔
3790
}
134✔
3791

3792
// getClient will return the client associated with cid.
3793
func (s *Server) getClient(cid uint64) *client {
151✔
3794
        s.mu.RLock()
151✔
3795
        defer s.mu.RUnlock()
151✔
3796
        return s.clients[cid]
151✔
3797
}
151✔
3798

3799
// GetLeafNode returns the leafnode associated with the cid.
3800
func (s *Server) GetLeafNode(cid uint64) *client {
1✔
3801
        s.mu.RLock()
1✔
3802
        defer s.mu.RUnlock()
1✔
3803
        return s.leafs[cid]
1✔
3804
}
1✔
3805

3806
// NumSubscriptions will report how many subscriptions are active.
3807
func (s *Server) NumSubscriptions() uint32 {
366✔
3808
        s.mu.RLock()
366✔
3809
        defer s.mu.RUnlock()
366✔
3810
        return s.numSubscriptions()
366✔
3811
}
366✔
3812

3813
// numSubscriptions will report how many subscriptions are active.
3814
// Lock should be held.
3815
func (s *Server) numSubscriptions() uint32 {
27,435✔
3816
        var subs int
27,435✔
3817
        s.accounts.Range(func(k, v any) bool {
89,711✔
3818
                acc := v.(*Account)
62,276✔
3819
                subs += acc.TotalSubs()
62,276✔
3820
                return true
62,276✔
3821
        })
62,276✔
3822
        return uint32(subs)
27,435✔
3823
}
3824

3825
// NumSlowConsumers will report the number of slow consumers.
3826
func (s *Server) NumSlowConsumers() int64 {
1✔
3827
        return atomic.LoadInt64(&s.slowConsumers)
1✔
3828
}
1✔
3829

3830
// NumStalledClients will report the total number of times clients have been stalled.
3831
func (s *Server) NumStalledClients() int64 {
×
3832
        return atomic.LoadInt64(&s.stalls)
×
3833
}
×
3834

3835
// NumSlowConsumersClients will report the number of slow consumers clients.
3836
func (s *Server) NumSlowConsumersClients() uint64 {
37,078✔
3837
        return s.scStats.clients.Load()
37,078✔
3838
}
37,078✔
3839

3840
// NumSlowConsumersRoutes will report the number of slow consumers routes.
3841
func (s *Server) NumSlowConsumersRoutes() uint64 {
37,078✔
3842
        return s.scStats.routes.Load()
37,078✔
3843
}
37,078✔
3844

3845
// NumSlowConsumersGateways will report the number of slow consumers leafs.
3846
func (s *Server) NumSlowConsumersGateways() uint64 {
37,080✔
3847
        return s.scStats.gateways.Load()
37,080✔
3848
}
37,080✔
3849

3850
// NumSlowConsumersLeafs will report the number of slow consumers leafs.
3851
func (s *Server) NumSlowConsumersLeafs() uint64 {
37,079✔
3852
        return s.scStats.leafs.Load()
37,079✔
3853
}
37,079✔
3854

3855
// NumStaleConnections will report the number of stale connections.
3856
func (s *Server) NumStaleConnections() int64 {
4✔
3857
        return atomic.LoadInt64(&s.staleConnections)
4✔
3858
}
4✔
3859

3860
// NumStaleConnectionsClients will report the number of stale client connections.
3861
func (s *Server) NumStaleConnectionsClients() uint64 {
37,082✔
3862
        return s.staleStats.clients.Load()
37,082✔
3863
}
37,082✔
3864

3865
// NumStaleConnectionsRoutes will report the number of stale route connections.
3866
func (s *Server) NumStaleConnectionsRoutes() uint64 {
37,082✔
3867
        return s.staleStats.routes.Load()
37,082✔
3868
}
37,082✔
3869

3870
// NumStaleConnectionsGateways will report the number of stale gateway connections.
3871
func (s *Server) NumStaleConnectionsGateways() uint64 {
37,082✔
3872
        return s.staleStats.gateways.Load()
37,082✔
3873
}
37,082✔
3874

3875
// NumStaleConnectionsLeafs will report the number of stale leaf connections.
3876
func (s *Server) NumStaleConnectionsLeafs() uint64 {
37,082✔
3877
        return s.staleStats.leafs.Load()
37,082✔
3878
}
37,082✔
3879

3880
// ConfigTime will report the last time the server configuration was loaded.
3881
func (s *Server) ConfigTime() time.Time {
×
3882
        s.mu.RLock()
×
3883
        defer s.mu.RUnlock()
×
3884
        return s.configTime
×
3885
}
×
3886

3887
// Addr will return the net.Addr object for the current listener.
3888
func (s *Server) Addr() net.Addr {
216✔
3889
        s.mu.RLock()
216✔
3890
        defer s.mu.RUnlock()
216✔
3891
        if s.listener == nil {
216✔
3892
                return nil
×
3893
        }
×
3894
        return s.listener.Addr()
216✔
3895
}
3896

3897
// MonitorAddr will return the net.Addr object for the monitoring listener.
3898
func (s *Server) MonitorAddr() *net.TCPAddr {
139✔
3899
        s.mu.RLock()
139✔
3900
        defer s.mu.RUnlock()
139✔
3901
        if s.http == nil {
139✔
3902
                return nil
×
3903
        }
×
3904
        return s.http.Addr().(*net.TCPAddr)
139✔
3905
}
3906

3907
// ClusterAddr returns the net.Addr object for the route listener.
3908
func (s *Server) ClusterAddr() *net.TCPAddr {
29✔
3909
        s.mu.RLock()
29✔
3910
        defer s.mu.RUnlock()
29✔
3911
        if s.routeListener == nil {
29✔
3912
                return nil
×
3913
        }
×
3914
        return s.routeListener.Addr().(*net.TCPAddr)
29✔
3915
}
3916

3917
// ProfilerAddr returns the net.Addr object for the profiler listener.
3918
func (s *Server) ProfilerAddr() *net.TCPAddr {
×
3919
        s.mu.RLock()
×
3920
        defer s.mu.RUnlock()
×
3921
        if s.profiler == nil {
×
3922
                return nil
×
3923
        }
×
3924
        return s.profiler.Addr().(*net.TCPAddr)
×
3925
}
3926

3927
func (s *Server) readyForConnections(d time.Duration) error {
10,603✔
3928
        // Snapshot server options.
10,603✔
3929
        opts := s.getOpts()
10,603✔
3930

10,603✔
3931
        type info struct {
10,603✔
3932
                ok  bool
10,603✔
3933
                err error
10,603✔
3934
        }
10,603✔
3935
        chk := make(map[string]info)
10,603✔
3936

10,603✔
3937
        end := time.Now().Add(d)
10,603✔
3938
        for time.Now().Before(end) {
53,276✔
3939
                s.mu.RLock()
42,673✔
3940
                chk["server"] = info{ok: s.listener != nil || opts.DontListen, err: s.listenerErr}
42,673✔
3941
                chk["route"] = info{ok: (opts.Cluster.Port == 0 || s.routeListener != nil), err: s.routeListenerErr}
42,673✔
3942
                chk["gateway"] = info{ok: (opts.Gateway.Name == _EMPTY_ || s.gatewayListener != nil), err: s.gatewayListenerErr}
42,673✔
3943
                chk["leafnode"] = info{ok: (opts.LeafNode.Port == 0 || s.leafNodeListener != nil), err: s.leafNodeListenerErr}
42,673✔
3944
                chk["websocket"] = info{ok: (opts.Websocket.Port == 0 || s.websocket.listener != nil), err: s.websocket.listenerErr}
42,673✔
3945
                chk["mqtt"] = info{ok: (opts.MQTT.Port == 0 || s.mqtt.listener != nil), err: s.mqtt.listenerErr}
42,673✔
3946
                s.mu.RUnlock()
42,673✔
3947

42,673✔
3948
                var numOK int
42,673✔
3949
                for _, inf := range chk {
298,711✔
3950
                        if inf.ok {
478,947✔
3951
                                numOK++
222,909✔
3952
                        }
222,909✔
3953
                }
3954
                if numOK == len(chk) {
53,268✔
3955
                        // In the case of DontListen option (no accept loop), we still want
10,595✔
3956
                        // to make sure that Start() has done all the work, so we wait on
10,595✔
3957
                        // that.
10,595✔
3958
                        if opts.DontListen {
10,595✔
3959
                                select {
×
3960
                                case <-s.startupComplete:
×
3961
                                case <-time.After(d):
×
3962
                                        return fmt.Errorf("failed to be ready for connections after %s: startup did not complete", d)
×
3963
                                }
3964
                        }
3965
                        return nil
10,595✔
3966
                }
3967
                if d > 25*time.Millisecond {
37,100✔
3968
                        time.Sleep(25 * time.Millisecond)
5,022✔
3969
                }
5,022✔
3970
        }
3971

3972
        failed := make([]string, 0, len(chk))
8✔
3973
        for name, inf := range chk {
56✔
3974
                if inf.ok && inf.err != nil {
48✔
3975
                        failed = append(failed, fmt.Sprintf("%s(ok, but %s)", name, inf.err))
×
3976
                }
×
3977
                if !inf.ok && inf.err == nil {
58✔
3978
                        failed = append(failed, name)
10✔
3979
                }
10✔
3980
                if !inf.ok && inf.err != nil {
48✔
3981
                        failed = append(failed, fmt.Sprintf("%s(%s)", name, inf.err))
×
3982
                }
×
3983
        }
3984

3985
        return fmt.Errorf(
8✔
3986
                "failed to be ready for connections after %s: %s",
8✔
3987
                d, strings.Join(failed, ", "),
8✔
3988
        )
8✔
3989
}
3990

3991
// ReadyForConnections returns `true` if the server is ready to accept clients
3992
// and, if routing is enabled, route connections. If after the duration
3993
// `dur` the server is still not ready, returns `false`.
3994
func (s *Server) ReadyForConnections(dur time.Duration) bool {
804✔
3995
        return s.readyForConnections(dur) == nil
804✔
3996
}
804✔
3997

3998
// Quick utility to function to tell if the server supports headers.
3999
func (s *Server) supportsHeaders() bool {
190,375✔
4000
        if s == nil {
190,375✔
4001
                return false
×
4002
        }
×
4003
        return !(s.getOpts().NoHeaderSupport)
190,375✔
4004
}
4005

4006
// ID returns the server's ID
4007
func (s *Server) ID() string {
56,077✔
4008
        return s.info.ID
56,077✔
4009
}
56,077✔
4010

4011
// NodeName returns the node name for this server.
4012
func (s *Server) NodeName() string {
226✔
4013
        return getHash(s.info.Name)
226✔
4014
}
226✔
4015

4016
// Name returns the server's name. This will be the same as the ID if it was not set.
4017
func (s *Server) Name() string {
234,842✔
4018
        return s.info.Name
234,842✔
4019
}
234,842✔
4020

4021
func (s *Server) String() string {
5,518✔
4022
        return s.info.Name
5,518✔
4023
}
5,518✔
4024

4025
type pprofLabels map[string]string
4026

4027
func setGoRoutineLabels(tags ...pprofLabels) {
379,993✔
4028
        var labels []string
379,993✔
4029
        for _, m := range tags {
433,738✔
4030
                for k, v := range m {
244,955✔
4031
                        labels = append(labels, k, v)
191,210✔
4032
                }
191,210✔
4033
        }
4034
        if len(labels) > 0 {
433,738✔
4035
                pprof.SetGoroutineLabels(
53,745✔
4036
                        pprof.WithLabels(context.Background(), pprof.Labels(labels...)),
53,745✔
4037
                )
53,745✔
4038
        }
53,745✔
4039
}
4040

4041
func (s *Server) startGoRoutine(f func(), tags ...pprofLabels) bool {
357,457✔
4042
        var started bool
357,457✔
4043
        s.grMu.Lock()
357,457✔
4044
        defer s.grMu.Unlock()
357,457✔
4045
        if s.grRunning {
714,783✔
4046
                s.grWG.Add(1)
357,326✔
4047
                go func() {
714,652✔
4048
                        setGoRoutineLabels(tags...)
357,326✔
4049
                        f()
357,326✔
4050
                }()
357,326✔
4051
                started = true
357,326✔
4052
        }
4053
        return started
357,457✔
4054
}
4055

4056
func (s *Server) numClosedConns() int {
102✔
4057
        s.mu.RLock()
102✔
4058
        defer s.mu.RUnlock()
102✔
4059
        return s.closed.len()
102✔
4060
}
102✔
4061

4062
func (s *Server) totalClosedConns() uint64 {
43✔
4063
        s.mu.RLock()
43✔
4064
        defer s.mu.RUnlock()
43✔
4065
        return s.closed.totalConns()
43✔
4066
}
43✔
4067

4068
func (s *Server) closedClients() []*closedClient {
8✔
4069
        s.mu.RLock()
8✔
4070
        defer s.mu.RUnlock()
8✔
4071
        return s.closed.closedClients()
8✔
4072
}
8✔
4073

4074
// getClientConnectURLs returns suitable URLs for clients to connect to the listen
4075
// port based on the server options' Host and Port. If the Host corresponds to
4076
// "any" interfaces, this call returns the list of resolved IP addresses.
4077
// If ClientAdvertise is set, returns the client advertise host and port.
4078
// The server lock is assumed held on entry.
4079
func (s *Server) getClientConnectURLs() []string {
6,846✔
4080
        // Snapshot server options.
6,846✔
4081
        opts := s.getOpts()
6,846✔
4082
        // Ignore error here since we know that if there is client advertise, the
6,846✔
4083
        // parseHostPort is correct because we did it right before calling this
6,846✔
4084
        // function in Server.New().
6,846✔
4085
        urls, _ := s.getConnectURLs(opts.ClientAdvertise, opts.Host, opts.Port)
6,846✔
4086
        return urls
6,846✔
4087
}
6,846✔
4088

4089
// Generic version that will return an array of URLs based on the given
4090
// advertise, host and port values.
4091
func (s *Server) getConnectURLs(advertise, host string, port int) ([]string, error) {
6,968✔
4092
        urls := make([]string, 0, 1)
6,968✔
4093

6,968✔
4094
        // short circuit if advertise is set
6,968✔
4095
        if advertise != "" {
6,970✔
4096
                h, p, err := parseHostPort(advertise, port)
2✔
4097
                if err != nil {
2✔
4098
                        return nil, err
×
4099
                }
×
4100
                urls = append(urls, net.JoinHostPort(h, strconv.Itoa(p)))
2✔
4101
        } else {
6,966✔
4102
                sPort := strconv.Itoa(port)
6,966✔
4103
                _, ips, err := s.getNonLocalIPsIfHostIsIPAny(host, true)
6,966✔
4104
                for _, ip := range ips {
8,184✔
4105
                        urls = append(urls, net.JoinHostPort(ip, sPort))
1,218✔
4106
                }
1,218✔
4107
                if err != nil || len(urls) == 0 {
13,323✔
4108
                        // We are here if s.opts.Host is not "0.0.0.0" nor "::", or if for some
6,357✔
4109
                        // reason we could not add any URL in the loop above.
6,357✔
4110
                        // We had a case where a Windows VM was hosed and would have err == nil
6,357✔
4111
                        // and not add any address in the array in the loop above, and we
6,357✔
4112
                        // ended-up returning 0.0.0.0, which is problematic for Windows clients.
6,357✔
4113
                        // Check for 0.0.0.0 or :: specifically, and ignore if that's the case.
6,357✔
4114
                        if host == "0.0.0.0" || host == "::" {
6,357✔
4115
                                s.Errorf("Address %q can not be resolved properly", host)
×
4116
                        } else {
6,357✔
4117
                                urls = append(urls, net.JoinHostPort(host, sPort))
6,357✔
4118
                        }
6,357✔
4119
                }
4120
        }
4121
        return urls, nil
6,968✔
4122
}
4123

4124
// Returns an array of non local IPs if the provided host is
4125
// 0.0.0.0 or ::. It returns the first resolved if `all` is
4126
// false.
4127
// The boolean indicate if the provided host was 0.0.0.0 (or ::)
4128
// so that if the returned array is empty caller can decide
4129
// what to do next.
4130
func (s *Server) getNonLocalIPsIfHostIsIPAny(host string, all bool) (bool, []string, error) {
12,051✔
4131
        ip := net.ParseIP(host)
12,051✔
4132
        // If this is not an IP, we are done
12,051✔
4133
        if ip == nil {
12,074✔
4134
                return false, nil, nil
23✔
4135
        }
23✔
4136
        // If this is not 0.0.0.0 or :: we have nothing to do.
4137
        if !ip.IsUnspecified() {
23,087✔
4138
                return false, nil, nil
11,059✔
4139
        }
11,059✔
4140
        s.Debugf("Get non local IPs for %q", host)
969✔
4141
        var ips []string
969✔
4142
        ifaces, _ := net.Interfaces()
969✔
4143
        for _, i := range ifaces {
4,845✔
4144
                addrs, _ := i.Addrs()
3,876✔
4145
                for _, addr := range addrs {
8,361✔
4146
                        switch v := addr.(type) {
4,485✔
4147
                        case *net.IPNet:
4,485✔
4148
                                ip = v.IP
4,485✔
4149
                        case *net.IPAddr:
×
4150
                                ip = v.IP
×
4151
                        }
4152
                        ipStr := ip.String()
4,485✔
4153
                        // Skip non global unicast addresses
4,485✔
4154
                        if !ip.IsGlobalUnicast() || ip.IsUnspecified() {
7,032✔
4155
                                ip = nil
2,547✔
4156
                                continue
2,547✔
4157
                        }
4158
                        s.Debugf("  ip=%s", ipStr)
1,938✔
4159
                        ips = append(ips, ipStr)
1,938✔
4160
                        if !all {
2,658✔
4161
                                break
720✔
4162
                        }
4163
                }
4164
        }
4165
        return true, ips, nil
969✔
4166
}
4167

4168
// if the ip is not specified, attempt to resolve it
4169
func resolveHostPorts(addr net.Listener) []string {
15✔
4170
        hostPorts := make([]string, 0)
15✔
4171
        hp := addr.Addr().(*net.TCPAddr)
15✔
4172
        port := strconv.Itoa(hp.Port)
15✔
4173
        if hp.IP.IsUnspecified() {
22✔
4174
                var ip net.IP
7✔
4175
                ifaces, _ := net.Interfaces()
7✔
4176
                for _, i := range ifaces {
35✔
4177
                        addrs, _ := i.Addrs()
28✔
4178
                        for _, addr := range addrs {
63✔
4179
                                switch v := addr.(type) {
35✔
4180
                                case *net.IPNet:
35✔
4181
                                        ip = v.IP
35✔
4182
                                        hostPorts = append(hostPorts, net.JoinHostPort(ip.String(), port))
35✔
4183
                                case *net.IPAddr:
×
4184
                                        ip = v.IP
×
4185
                                        hostPorts = append(hostPorts, net.JoinHostPort(ip.String(), port))
×
4186
                                default:
×
4187
                                        continue
×
4188
                                }
4189
                        }
4190
                }
4191
        } else {
8✔
4192
                hostPorts = append(hostPorts, net.JoinHostPort(hp.IP.String(), port))
8✔
4193
        }
8✔
4194
        return hostPorts
15✔
4195
}
4196

4197
// format the address of a net.Listener with a protocol
4198
func formatURL(protocol string, addr net.Listener) []string {
15✔
4199
        hostports := resolveHostPorts(addr)
15✔
4200
        for i, hp := range hostports {
58✔
4201
                hostports[i] = fmt.Sprintf("%s://%s", protocol, hp)
43✔
4202
        }
43✔
4203
        return hostports
15✔
4204
}
4205

4206
// Ports describes URLs that the server can be contacted in
4207
type Ports struct {
4208
        Nats       []string `json:"nats,omitempty"`
4209
        Monitoring []string `json:"monitoring,omitempty"`
4210
        Cluster    []string `json:"cluster,omitempty"`
4211
        Profile    []string `json:"profile,omitempty"`
4212
        WebSocket  []string `json:"websocket,omitempty"`
4213
}
4214

4215
// PortsInfo attempts to resolve all the ports. If after maxWait the ports are not
4216
// resolved, it returns nil. Otherwise it returns a Ports struct
4217
// describing ports where the server can be contacted
4218
func (s *Server) PortsInfo(maxWait time.Duration) *Ports {
6✔
4219
        if s.readyForListeners(maxWait) {
12✔
4220
                opts := s.getOpts()
6✔
4221

6✔
4222
                s.mu.RLock()
6✔
4223
                tls := s.info.TLSRequired
6✔
4224
                listener := s.listener
6✔
4225
                httpListener := s.http
6✔
4226
                clusterListener := s.routeListener
6✔
4227
                profileListener := s.profiler
6✔
4228
                wsListener := s.websocket.listener
6✔
4229
                wss := s.websocket.tls
6✔
4230
                s.mu.RUnlock()
6✔
4231

6✔
4232
                ports := Ports{}
6✔
4233

6✔
4234
                if listener != nil {
12✔
4235
                        natsProto := "nats"
6✔
4236
                        if tls {
7✔
4237
                                natsProto = "tls"
1✔
4238
                        }
1✔
4239
                        ports.Nats = formatURL(natsProto, listener)
6✔
4240
                }
4241

4242
                if httpListener != nil {
8✔
4243
                        monProto := "http"
2✔
4244
                        if opts.HTTPSPort != 0 {
2✔
4245
                                monProto = "https"
×
4246
                        }
×
4247
                        ports.Monitoring = formatURL(monProto, httpListener)
2✔
4248
                }
4249

4250
                if clusterListener != nil {
8✔
4251
                        clusterProto := "nats"
2✔
4252
                        if opts.Cluster.TLSConfig != nil {
2✔
4253
                                clusterProto = "tls"
×
4254
                        }
×
4255
                        ports.Cluster = formatURL(clusterProto, clusterListener)
2✔
4256
                }
4257

4258
                if profileListener != nil {
8✔
4259
                        ports.Profile = formatURL("http", profileListener)
2✔
4260
                }
2✔
4261

4262
                if wsListener != nil {
9✔
4263
                        protocol := wsSchemePrefix
3✔
4264
                        if wss {
6✔
4265
                                protocol = wsSchemePrefixTLS
3✔
4266
                        }
3✔
4267
                        ports.WebSocket = formatURL(protocol, wsListener)
3✔
4268
                }
4269

4270
                return &ports
6✔
4271
        }
4272

4273
        return nil
×
4274
}
4275

4276
// Returns the portsFile. If a non-empty dirHint is provided, the dirHint
4277
// path is used instead of the server option value
4278
func (s *Server) portFile(dirHint string) string {
6✔
4279
        dirname := s.getOpts().PortsFileDir
6✔
4280
        if dirHint != "" {
12✔
4281
                dirname = dirHint
6✔
4282
        }
6✔
4283
        if dirname == _EMPTY_ {
6✔
4284
                return _EMPTY_
×
4285
        }
×
4286
        return filepath.Join(dirname, fmt.Sprintf("%s_%d.ports", filepath.Base(os.Args[0]), os.Getpid()))
6✔
4287
}
4288

4289
// Delete the ports file. If a non-empty dirHint is provided, the dirHint
4290
// path is used instead of the server option value
4291
func (s *Server) deletePortsFile(hintDir string) {
3✔
4292
        portsFile := s.portFile(hintDir)
3✔
4293
        if portsFile != "" {
6✔
4294
                if err := os.Remove(portsFile); err != nil {
3✔
4295
                        s.Errorf("Error cleaning up ports file %s: %v", portsFile, err)
×
4296
                }
×
4297
        }
4298
}
4299

4300
// Writes a file with a serialized Ports to the specified ports_file_dir.
4301
// The name of the file is `exename_pid.ports`, typically nats-server_pid.ports.
4302
// if ports file is not set, this function has no effect
4303
func (s *Server) logPorts() {
3✔
4304
        opts := s.getOpts()
3✔
4305
        portsFile := s.portFile(opts.PortsFileDir)
3✔
4306
        if portsFile != _EMPTY_ {
6✔
4307
                go func() {
6✔
4308
                        info := s.PortsInfo(5 * time.Second)
3✔
4309
                        if info == nil {
3✔
4310
                                s.Errorf("Unable to resolve the ports in the specified time")
×
4311
                                return
×
4312
                        }
×
4313
                        data, err := json.Marshal(info)
3✔
4314
                        if err != nil {
3✔
4315
                                s.Errorf("Error marshaling ports file: %v", err)
×
4316
                                return
×
4317
                        }
×
4318
                        if err := os.WriteFile(portsFile, data, 0666); err != nil {
3✔
4319
                                s.Errorf("Error writing ports file (%s): %v", portsFile, err)
×
4320
                                return
×
4321
                        }
×
4322

4323
                }()
4324
        }
4325
}
4326

4327
// waits until a calculated list of listeners is resolved or a timeout
4328
func (s *Server) readyForListeners(dur time.Duration) bool {
6✔
4329
        end := time.Now().Add(dur)
6✔
4330
        for time.Now().Before(end) {
13✔
4331
                s.mu.RLock()
7✔
4332
                listeners := s.serviceListeners()
7✔
4333
                s.mu.RUnlock()
7✔
4334
                if len(listeners) == 0 {
7✔
4335
                        return false
×
4336
                }
×
4337

4338
                ok := true
7✔
4339
                for _, l := range listeners {
24✔
4340
                        if l == nil {
18✔
4341
                                ok = false
1✔
4342
                                break
1✔
4343
                        }
4344
                }
4345
                if ok {
13✔
4346
                        return true
6✔
4347
                }
6✔
4348
                select {
1✔
4349
                case <-s.quitCh:
×
4350
                        return false
×
4351
                case <-time.After(25 * time.Millisecond):
1✔
4352
                        // continue - unable to select from quit - we are still running
4353
                }
4354
        }
4355
        return false
×
4356
}
4357

4358
// returns a list of listeners that are intended for the process
4359
// if the entry is nil, the interface is yet to be resolved
4360
func (s *Server) serviceListeners() []net.Listener {
7✔
4361
        listeners := make([]net.Listener, 0)
7✔
4362
        opts := s.getOpts()
7✔
4363
        listeners = append(listeners, s.listener)
7✔
4364
        if opts.Cluster.Port != 0 {
10✔
4365
                listeners = append(listeners, s.routeListener)
3✔
4366
        }
3✔
4367
        if opts.HTTPPort != 0 || opts.HTTPSPort != 0 {
10✔
4368
                listeners = append(listeners, s.http)
3✔
4369
        }
3✔
4370
        if opts.ProfPort != 0 {
10✔
4371
                listeners = append(listeners, s.profiler)
3✔
4372
        }
3✔
4373
        if opts.Websocket.Port != 0 {
11✔
4374
                listeners = append(listeners, s.websocket.listener)
4✔
4375
        }
4✔
4376
        return listeners
7✔
4377
}
4378

4379
// Returns true if in lame duck mode.
4380
func (s *Server) isLameDuckMode() bool {
21,993✔
4381
        s.mu.RLock()
21,993✔
4382
        defer s.mu.RUnlock()
21,993✔
4383
        return s.ldm
21,993✔
4384
}
21,993✔
4385

4386
// LameDuckShutdown will perform a lame duck shutdown of NATS, whereby
4387
// the client listener is closed, existing client connections are
4388
// kicked, Raft leaderships are transferred, JetStream is shutdown
4389
// and then finally shutdown the the NATS Server itself.
4390
// This function blocks and will not return until the NATS Server
4391
// has completed the entire shutdown operation.
4392
func (s *Server) LameDuckShutdown() {
1✔
4393
        s.lameDuckMode()
1✔
4394
}
1✔
4395

4396
// This function will close the client listener then close the clients
4397
// at some interval to avoid a reconnect storm.
4398
// We will also transfer any raft leaders and shutdown JetStream.
4399
func (s *Server) lameDuckMode() {
6✔
4400
        s.mu.Lock()
6✔
4401
        // Check if there is actually anything to do
6✔
4402
        if s.isShuttingDown() || s.ldm || s.listener == nil {
6✔
4403
                s.mu.Unlock()
×
4404
                return
×
4405
        }
×
4406
        s.Noticef("Entering lame duck mode, stop accepting new clients")
6✔
4407
        s.ldm = true
6✔
4408
        s.sendLDMShutdownEventLocked()
6✔
4409
        expected := 1
6✔
4410
        s.listener.Close()
6✔
4411
        s.listener = nil
6✔
4412
        expected += s.closeWebsocketServer()
6✔
4413
        s.ldmCh = make(chan bool, expected)
6✔
4414
        opts := s.getOpts()
6✔
4415
        gp := opts.LameDuckGracePeriod
6✔
4416
        // For tests, we want the grace period to be in some cases bigger
6✔
4417
        // than the ldm duration, so to by-pass the validateOptions() check,
6✔
4418
        // we use negative number and flip it here.
6✔
4419
        if gp < 0 {
11✔
4420
                gp *= -1
5✔
4421
        }
5✔
4422
        s.mu.Unlock()
6✔
4423

6✔
4424
        // If we are running any raftNodes transfer leaders.
6✔
4425
        if hadTransfers := s.transferRaftLeaders(); hadTransfers {
11✔
4426
                // They will transfer leadership quickly, but wait here for a second.
5✔
4427
                select {
5✔
4428
                case <-time.After(time.Second):
5✔
4429
                case <-s.quitCh:
×
4430
                        return
×
4431
                }
4432
        }
4433

4434
        // Now check and shutdown jetstream.
4435
        s.shutdownJetStream()
6✔
4436

6✔
4437
        // Now shutdown the nodes
6✔
4438
        s.shutdownRaftNodes()
6✔
4439

6✔
4440
        // Wait for accept loops to be done to make sure that no new
6✔
4441
        // client can connect
6✔
4442
        for i := 0; i < expected; i++ {
12✔
4443
                <-s.ldmCh
6✔
4444
        }
6✔
4445

4446
        s.mu.Lock()
6✔
4447
        // Need to recheck few things
6✔
4448
        if s.isShuttingDown() || len(s.clients) == 0 {
6✔
4449
                s.mu.Unlock()
×
4450
                // If there is no client, we need to call Shutdown() to complete
×
4451
                // the LDMode. If server has been shutdown while lock was released,
×
4452
                // calling Shutdown() should be no-op.
×
4453
                s.Shutdown()
×
4454
                return
×
4455
        }
×
4456
        dur := int64(opts.LameDuckDuration)
6✔
4457
        dur -= int64(gp)
6✔
4458
        if dur <= 0 {
11✔
4459
                dur = int64(time.Second)
5✔
4460
        }
5✔
4461
        numClients := int64(len(s.clients))
6✔
4462
        batch := 1
6✔
4463
        // Sleep interval between each client connection close.
6✔
4464
        var si int64
6✔
4465
        if numClients != 0 {
12✔
4466
                si = dur / numClients
6✔
4467
        }
6✔
4468
        if si < 1 {
6✔
4469
                // Should not happen (except in test with very small LD duration), but
×
4470
                // if there are too many clients, batch the number of close and
×
4471
                // use a tiny sleep interval that will result in yield likely.
×
4472
                si = 1
×
4473
                batch = int(numClients / dur)
×
4474
        } else if si > int64(time.Second) {
7✔
4475
                // Conversely, there is no need to sleep too long between clients
1✔
4476
                // and spread say 10 clients for the 2min duration. Sleeping no
1✔
4477
                // more than 1sec.
1✔
4478
                si = int64(time.Second)
1✔
4479
        }
1✔
4480

4481
        // Now capture all clients
4482
        clients := make([]*client, 0, len(s.clients))
6✔
4483
        for _, client := range s.clients {
22✔
4484
                clients = append(clients, client)
16✔
4485
        }
16✔
4486
        // Now that we know that no new client can be accepted,
4487
        // send INFO to routes and clients to notify this state.
4488
        s.sendLDMToRoutes()
6✔
4489
        s.sendLDMToClients()
6✔
4490
        s.mu.Unlock()
6✔
4491

6✔
4492
        t := time.NewTimer(gp)
6✔
4493
        // Delay start of closing of client connections in case
6✔
4494
        // we have several servers that we want to signal to enter LD mode
6✔
4495
        // and not have their client reconnect to each other.
6✔
4496
        select {
6✔
4497
        case <-t.C:
6✔
4498
                s.Noticef("Closing existing clients")
6✔
4499
        case <-s.quitCh:
×
4500
                t.Stop()
×
4501
                return
×
4502
        }
4503
        for i, client := range clients {
22✔
4504
                client.closeConnection(ServerShutdown)
16✔
4505
                if i == len(clients)-1 {
22✔
4506
                        break
6✔
4507
                }
4508
                if batch == 1 || i%batch == 0 {
20✔
4509
                        // We pick a random interval which will be at least si/2
10✔
4510
                        v := rand.Int63n(si)
10✔
4511
                        if v < si/2 {
16✔
4512
                                v = si / 2
6✔
4513
                        }
6✔
4514
                        t.Reset(time.Duration(v))
10✔
4515
                        // Sleep for given interval or bail out if kicked by Shutdown().
10✔
4516
                        select {
10✔
4517
                        case <-t.C:
10✔
4518
                        case <-s.quitCh:
×
4519
                                t.Stop()
×
4520
                                return
×
4521
                        }
4522
                }
4523
        }
4524
        s.Shutdown()
6✔
4525
        s.WaitForShutdown()
6✔
4526
}
4527

4528
// Send an INFO update to routes with the indication that this server is in LDM mode.
4529
// Server lock is held on entry.
4530
func (s *Server) sendLDMToRoutes() {
6✔
4531
        s.routeInfo.LameDuckMode = true
6✔
4532
        infoJSON := generateInfoJSON(&s.routeInfo)
6✔
4533
        s.forEachRemote(func(r *client) {
17✔
4534
                r.mu.Lock()
11✔
4535
                r.enqueueProto(infoJSON)
11✔
4536
                r.mu.Unlock()
11✔
4537
        })
11✔
4538
        // Clear now so that we notify only once, should we have to send other INFOs.
4539
        s.routeInfo.LameDuckMode = false
6✔
4540
}
4541

4542
// Send an INFO update to clients with the indication that this server is in
4543
// LDM mode and with only URLs of other nodes.
4544
// Server lock is held on entry.
4545
func (s *Server) sendLDMToClients() {
6✔
4546
        s.info.LameDuckMode = true
6✔
4547
        // Clear this so that if there are further updates, we don't send our URLs.
6✔
4548
        s.clientConnectURLs = s.clientConnectURLs[:0]
6✔
4549
        if s.websocket.connectURLs != nil {
6✔
4550
                s.websocket.connectURLs = s.websocket.connectURLs[:0]
×
4551
        }
×
4552
        // Reset content first.
4553
        s.info.ClientConnectURLs = s.info.ClientConnectURLs[:0]
6✔
4554
        s.info.WSConnectURLs = s.info.WSConnectURLs[:0]
6✔
4555
        // Only add the other nodes if we are allowed to.
6✔
4556
        if !s.getOpts().Cluster.NoAdvertise {
12✔
4557
                for url := range s.clientConnectURLsMap {
18✔
4558
                        s.info.ClientConnectURLs = append(s.info.ClientConnectURLs, url)
12✔
4559
                }
12✔
4560
                for url := range s.websocket.connectURLsMap {
6✔
4561
                        s.info.WSConnectURLs = append(s.info.WSConnectURLs, url)
×
4562
                }
×
4563
        }
4564
        // Send to all registered clients that support async INFO protocols.
4565
        s.sendAsyncInfoToClients(true, true)
6✔
4566
        // We now clear the info.LameDuckMode flag so that if there are
6✔
4567
        // cluster updates and we send the INFO, we don't have the boolean
6✔
4568
        // set which would cause multiple LDM notifications to clients.
6✔
4569
        s.info.LameDuckMode = false
6✔
4570
}
4571

4572
// If given error is a net.Error and is temporary, sleeps for the given
4573
// delay and double it, but cap it to ACCEPT_MAX_SLEEP. The sleep is
4574
// interrupted if the server is shutdown.
4575
// An error message is displayed depending on the type of error.
4576
// Returns the new (or unchanged) delay, or a negative value if the
4577
// server has been or is being shutdown.
4578
func (s *Server) acceptError(acceptName string, err error, tmpDelay time.Duration) time.Duration {
16,894✔
4579
        if !s.isRunning() {
33,788✔
4580
                return -1
16,894✔
4581
        }
16,894✔
4582
        //lint:ignore SA1019 We want to retry on a bunch of errors here.
4583
        if ne, ok := err.(net.Error); ok && ne.Temporary() { // nolint:staticcheck
×
4584
                s.Errorf("Temporary %s Accept Error(%v), sleeping %dms", acceptName, ne, tmpDelay/time.Millisecond)
×
4585
                select {
×
4586
                case <-time.After(tmpDelay):
×
4587
                case <-s.quitCh:
×
4588
                        return -1
×
4589
                }
4590
                tmpDelay *= 2
×
4591
                if tmpDelay > ACCEPT_MAX_SLEEP {
×
4592
                        tmpDelay = ACCEPT_MAX_SLEEP
×
4593
                }
×
4594
        } else {
×
4595
                s.Errorf("%s Accept error: %v", acceptName, err)
×
4596
        }
×
4597
        return tmpDelay
×
4598
}
4599

4600
var errNoIPAvail = errors.New("no IP available")
4601

4602
func (s *Server) getRandomIP(resolver netResolver, url string, excludedAddresses map[string]struct{}) (string, error) {
134,119✔
4603
        host, port, err := net.SplitHostPort(url)
134,119✔
4604
        if err != nil {
134,120✔
4605
                return "", err
1✔
4606
        }
1✔
4607
        // If already an IP, skip.
4608
        if net.ParseIP(host) != nil {
268,096✔
4609
                return url, nil
133,978✔
4610
        }
133,978✔
4611
        ips, err := resolver.LookupHost(context.Background(), host)
140✔
4612
        if err != nil {
145✔
4613
                return "", fmt.Errorf("lookup for host %q: %v", host, err)
5✔
4614
        }
5✔
4615
        if len(excludedAddresses) > 0 {
164✔
4616
                for i := 0; i < len(ips); i++ {
87✔
4617
                        ip := ips[i]
58✔
4618
                        addr := net.JoinHostPort(ip, port)
58✔
4619
                        if _, excluded := excludedAddresses[addr]; excluded {
64✔
4620
                                if len(ips) == 1 {
9✔
4621
                                        ips = nil
3✔
4622
                                        break
3✔
4623
                                }
4624
                                ips[i] = ips[len(ips)-1]
3✔
4625
                                ips = ips[:len(ips)-1]
3✔
4626
                                i--
3✔
4627
                        }
4628
                }
4629
                if len(ips) == 0 {
32✔
4630
                        return "", errNoIPAvail
3✔
4631
                }
3✔
4632
        }
4633
        var address string
132✔
4634
        if len(ips) == 0 {
133✔
4635
                s.Warnf("Unable to get IP for %s, will try with %s: %v", host, url, err)
1✔
4636
                address = url
1✔
4637
        } else {
132✔
4638
                var ip string
131✔
4639
                if len(ips) == 1 {
137✔
4640
                        ip = ips[0]
6✔
4641
                } else {
131✔
4642
                        ip = ips[rand.Int31n(int32(len(ips)))]
125✔
4643
                }
125✔
4644
                // add the port
4645
                address = net.JoinHostPort(ip, port)
131✔
4646
        }
4647
        return address, nil
132✔
4648
}
4649

4650
// Returns true for the first attempt and depending on the nature
4651
// of the attempt (first connect or a reconnect), when the number
4652
// of attempts is equal to the configured report attempts.
4653
func (s *Server) shouldReportConnectErr(firstConnect bool, attempts int) bool {
73,913✔
4654
        opts := s.getOpts()
73,913✔
4655
        if firstConnect {
117,426✔
4656
                if attempts == 1 || attempts%opts.ConnectErrorReports == 0 {
53,344✔
4657
                        return true
9,831✔
4658
                }
9,831✔
4659
                return false
33,682✔
4660
        }
4661
        if attempts == 1 || attempts%opts.ReconnectErrorReports == 0 {
60,800✔
4662
                return true
30,400✔
4663
        }
30,400✔
4664
        return false
×
4665
}
4666

4667
func (s *Server) updateRemoteSubscription(acc *Account, sub *subscription, delta int32) {
3,025✔
4668
        s.updateRouteSubscriptionMap(acc, sub, delta)
3,025✔
4669
        if s.gateway.enabled {
3,684✔
4670
                s.gatewayUpdateSubInterest(acc.Name, sub, delta)
659✔
4671
        }
659✔
4672

4673
        acc.updateLeafNodes(sub, delta)
3,025✔
4674
}
4675

4676
func (s *Server) startRateLimitLogExpiration() {
6,850✔
4677
        interval := time.Second
6,850✔
4678
        s.startGoRoutine(func() {
13,700✔
4679
                defer s.grWG.Done()
6,850✔
4680

6,850✔
4681
                ticker := time.NewTicker(time.Second)
6,850✔
4682
                defer ticker.Stop()
6,850✔
4683
                for {
31,150✔
4684
                        select {
24,300✔
4685
                        case <-s.quitCh:
6,843✔
4686
                                return
6,843✔
4687
                        case interval = <-s.rateLimitLoggingCh:
×
4688
                                ticker.Reset(interval)
×
4689
                        case <-ticker.C:
17,450✔
4690
                                s.rateLimitLogging.Range(func(k, v any) bool {
20,719✔
4691
                                        start := v.(time.Time)
3,269✔
4692
                                        if time.Since(start) >= interval {
4,546✔
4693
                                                s.rateLimitLogging.Delete(k)
1,277✔
4694
                                        }
1,277✔
4695
                                        return true
3,269✔
4696
                                })
4697
                        }
4698
                }
4699
        })
4700
}
4701

4702
func (s *Server) changeRateLimitLogInterval(d time.Duration) {
×
4703
        if d <= 0 {
×
4704
                return
×
4705
        }
×
4706
        select {
×
4707
        case s.rateLimitLoggingCh <- d:
×
4708
        default:
×
4709
        }
4710
}
4711

4712
// DisconnectClientByID disconnects a client by connection ID
4713
func (s *Server) DisconnectClientByID(id uint64) error {
2✔
4714
        if s == nil {
2✔
4715
                return ErrServerNotRunning
×
4716
        }
×
4717
        if client := s.getClient(id); client != nil {
3✔
4718
                client.closeConnection(Kicked)
1✔
4719
                return nil
1✔
4720
        } else if client = s.GetLeafNode(id); client != nil {
3✔
4721
                client.closeConnection(Kicked)
1✔
4722
                return nil
1✔
4723
        }
1✔
4724
        return errors.New("no such client or leafnode id")
×
4725
}
4726

4727
// LDMClientByID sends a Lame Duck Mode info message to a client by connection ID
4728
func (s *Server) LDMClientByID(id uint64) error {
1✔
4729
        if s == nil {
1✔
4730
                return ErrServerNotRunning
×
4731
        }
×
4732
        s.mu.RLock()
1✔
4733
        c := s.clients[id]
1✔
4734
        if c == nil {
1✔
4735
                s.mu.RUnlock()
×
4736
                return errors.New("no such client id")
×
4737
        }
×
4738
        info := s.copyInfo()
1✔
4739
        info.LameDuckMode = true
1✔
4740
        s.mu.RUnlock()
1✔
4741
        c.mu.Lock()
1✔
4742
        defer c.mu.Unlock()
1✔
4743
        if c.opts.Protocol >= ClientProtoInfo && c.flags.isSet(firstPongSent) {
2✔
4744
                // sendInfo takes care of checking if the connection is still
1✔
4745
                // valid or not, so don't duplicate tests here.
1✔
4746
                c.Debugf("Sending Lame Duck Mode info to client")
1✔
4747
                c.enqueueProto(c.generateClientInfoJSON(info, true))
1✔
4748
                return nil
1✔
4749
        } else {
1✔
4750
                return errors.New("client does not support Lame Duck Mode or is not ready to receive the notification")
×
4751
        }
×
4752
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc