• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 17572013453

08 Sep 2025 04:12PM UTC coverage: 86.033% (-0.02%) from 86.048%
17572013453

push

github

neilalexander
Release v2.12.0-RC.1

Signed-off-by: Neil Twigg <neil@nats.io>

74115 of 86147 relevant lines covered (86.03%)

347566.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.11
/server/leafnode.go
1
// Copyright 2019-2025 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bufio"
18
        "bytes"
19
        "crypto/tls"
20
        "encoding/base64"
21
        "encoding/json"
22
        "fmt"
23
        "io"
24
        "math/rand"
25
        "net"
26
        "net/http"
27
        "net/url"
28
        "os"
29
        "path"
30
        "reflect"
31
        "regexp"
32
        "runtime"
33
        "strconv"
34
        "strings"
35
        "sync"
36
        "sync/atomic"
37
        "time"
38

39
        "github.com/klauspost/compress/s2"
40
        "github.com/nats-io/jwt/v2"
41
        "github.com/nats-io/nkeys"
42
        "github.com/nats-io/nuid"
43
)
44

45
// Warnings, reconnect delays and protocol constants used by leafnode connections.
const (
	// Warning text used when a user configures leafnode TLS as insecure.
	leafnodeTLSInsecureWarning = "TLS certificate chain and hostname of solicited leafnodes will not be verified. DO NOT USE IN PRODUCTION!"

	// Delay applied to the reconnect of a solicited connection after a
	// loop has been detected.
	leafNodeReconnectDelayAfterLoopDetected = 30 * time.Second

	// After a received message causes a permission violation the
	// connection is closed; no reconnect is attempted for this long.
	leafNodeReconnectAfterPermViolation = 30 * time.Second

	// Delay applied when this server has the same cluster name as the hub.
	leafNodeReconnectDelayAfterClusterNameSame = 30 * time.Second

	// Prefix of the subject used for loop detection.
	leafNodeLoopDetectionSubjectPrefix = "$LDS."

	// Path appended to the URL so that a WS server can tell the connection
	// is a LEAF connection rather than a CLIENT.
	leafNodeWSPath = "/leafnode"

	// How long the server waits, upon receiving a CONNECT, before closing
	// a connection that does not meet the required minimum version.
	leafNodeWaitBeforeClose = 5 * time.Second
)
70

71
type leaf struct {
72
        // We have any auth stuff here for solicited connections.
73
        remote *leafNodeCfg
74
        // isSpoke tells us what role we are playing.
75
        // Used when we receive a connection but otherside tells us they are a hub.
76
        isSpoke bool
77
        // remoteCluster is when we are a hub but the spoke leafnode is part of a cluster.
78
        remoteCluster string
79
        // remoteServer holds onto the remove server's name or ID.
80
        remoteServer string
81
        // domain name of remote server
82
        remoteDomain string
83
        // account name of remote server
84
        remoteAccName string
85
        // Whether or not we want to propagate east-west interest from other LNs.
86
        isolated bool
87
        // Used to suppress sub and unsub interest. Same as routes but our audience
88
        // here is tied to this leaf node. This will hold all subscriptions except this
89
        // leaf nodes. This represents all the interest we want to send to the other side.
90
        smap map[string]int32
91
        // This map will contain all the subscriptions that have been added to the smap
92
        // during initLeafNodeSmapAndSendSubs. It is short lived and is there to avoid
93
        // race between processing of a sub where sub is added to account sublist but
94
        // updateSmap has not be called on that "thread", while in the LN readloop,
95
        // when processing CONNECT, initLeafNodeSmapAndSendSubs is invoked and add
96
        // this subscription to smap. When processing of the sub then calls updateSmap,
97
        // we would add it a second time in the smap causing later unsub to suppress the LS-.
98
        tsub  map[*subscription]struct{}
99
        tsubt *time.Timer
100
        // Selected compression mode, which may be different from the server configured mode.
101
        compression string
102
        // This is for GW map replies.
103
        gwSub *subscription
104
}
105

106
// Used for remote (solicited) leafnodes.
107
type leafNodeCfg struct {
108
        sync.RWMutex
109
        *RemoteLeafOpts
110
        urls           []*url.URL
111
        curURL         *url.URL
112
        tlsName        string
113
        username       string
114
        password       string
115
        perms          *Permissions
116
        connDelay      time.Duration // Delay before a connect, could be used while detecting loop condition, etc..
117
        jsMigrateTimer *time.Timer
118
}
119

120
// Check to see if this is a solicited leafnode. We do special processing for solicited.
121
func (c *client) isSolicitedLeafNode() bool {
2,108✔
122
        return c.kind == LEAF && c.leaf.remote != nil
2,108✔
123
}
2,108✔
124

125
// Returns true if this is a solicited leafnode and is not configured to be treated as a hub or a receiving
126
// connection leafnode where the otherside has declared itself to be the hub.
127
func (c *client) isSpokeLeafNode() bool {
6,417,209✔
128
        return c.kind == LEAF && c.leaf.isSpoke
6,417,209✔
129
}
6,417,209✔
130

131
func (c *client) isHubLeafNode() bool {
18,796✔
132
        return c.kind == LEAF && !c.leaf.isSpoke
18,796✔
133
}
18,796✔
134

135
func (c *client) isIsolatedLeafNode() bool {
11,375✔
136
        return c.kind == LEAF && c.leaf.isolated
11,375✔
137
}
11,375✔
138

139
// This will spin up go routines to solicit the remote leaf node connections.
140
func (s *Server) solicitLeafNodeRemotes(remotes []*RemoteLeafOpts) {
1,167✔
141
        sysAccName := _EMPTY_
1,167✔
142
        sAcc := s.SystemAccount()
1,167✔
143
        if sAcc != nil {
2,311✔
144
                sysAccName = sAcc.Name
1,144✔
145
        }
1,144✔
146
        addRemote := func(r *RemoteLeafOpts, isSysAccRemote bool) *leafNodeCfg {
2,480✔
147
                s.mu.Lock()
1,313✔
148
                remote := newLeafNodeCfg(r)
1,313✔
149
                creds := remote.Credentials
1,313✔
150
                accName := remote.LocalAccount
1,313✔
151
                s.leafRemoteCfgs = append(s.leafRemoteCfgs, remote)
1,313✔
152
                // Print notice if
1,313✔
153
                if isSysAccRemote {
1,408✔
154
                        if len(remote.DenyExports) > 0 {
96✔
155
                                s.Noticef("Remote for System Account uses restricted export permissions")
1✔
156
                        }
1✔
157
                        if len(remote.DenyImports) > 0 {
96✔
158
                                s.Noticef("Remote for System Account uses restricted import permissions")
1✔
159
                        }
1✔
160
                }
161
                s.mu.Unlock()
1,313✔
162
                if creds != _EMPTY_ {
1,363✔
163
                        contents, err := os.ReadFile(creds)
50✔
164
                        defer wipeSlice(contents)
50✔
165
                        if err != nil {
50✔
166
                                s.Errorf("Error reading LeafNode Remote Credentials file %q: %v", creds, err)
×
167
                        } else if items := credsRe.FindAllSubmatch(contents, -1); len(items) < 2 {
50✔
168
                                s.Errorf("LeafNode Remote Credentials file %q malformed", creds)
×
169
                        } else if _, err := nkeys.FromSeed(items[1][1]); err != nil {
50✔
170
                                s.Errorf("LeafNode Remote Credentials file %q has malformed seed", creds)
×
171
                        } else if uc, err := jwt.DecodeUserClaims(string(items[0][1])); err != nil {
50✔
172
                                s.Errorf("LeafNode Remote Credentials file %q has malformed user jwt", creds)
×
173
                        } else if isSysAccRemote {
54✔
174
                                if !uc.Permissions.Pub.Empty() || !uc.Permissions.Sub.Empty() || uc.Permissions.Resp != nil {
5✔
175
                                        s.Noticef("LeafNode Remote for System Account uses credentials file %q with restricted permissions", creds)
1✔
176
                                }
1✔
177
                        } else {
46✔
178
                                if !uc.Permissions.Pub.Empty() || !uc.Permissions.Sub.Empty() || uc.Permissions.Resp != nil {
52✔
179
                                        s.Noticef("LeafNode Remote for Account %s uses credentials file %q with restricted permissions", accName, creds)
6✔
180
                                }
6✔
181
                        }
182
                }
183
                return remote
1,313✔
184
        }
185
        for _, r := range remotes {
2,480✔
186
                // We need to call this, even if the leaf is disabled. This is so that
1,313✔
187
                // the number of internal configuration matches the options' remote leaf
1,313✔
188
                // configuration required for configuration reload.
1,313✔
189
                remote := addRemote(r, r.LocalAccount == sysAccName)
1,313✔
190
                if !r.Disabled {
2,625✔
191
                        s.startGoRoutine(func() { s.connectToRemoteLeafNode(remote, true) })
2,624✔
192
                }
193
        }
194
}
195

196
func (s *Server) remoteLeafNodeStillValid(remote *leafNodeCfg) bool {
7,692✔
197
        if remote.Disabled {
7,693✔
198
                return false
1✔
199
        }
1✔
200
        for _, ri := range s.getOpts().LeafNode.Remotes {
15,761✔
201
                // FIXME(dlc) - What about auth changes?
8,070✔
202
                if reflect.DeepEqual(ri.URLs, remote.URLs) {
15,761✔
203
                        return true
7,691✔
204
                }
7,691✔
205
        }
206
        return false
×
207
}
208

209
// Ensure that leafnode is properly configured.
210
func validateLeafNode(o *Options) error {
8,705✔
211
        if err := validateLeafNodeAuthOptions(o); err != nil {
8,707✔
212
                return err
2✔
213
        }
2✔
214

215
        // Users can bind to any local account, if its empty we will assume the $G account.
216
        for _, r := range o.LeafNode.Remotes {
10,057✔
217
                if r.LocalAccount == _EMPTY_ {
1,794✔
218
                        r.LocalAccount = globalAccountName
440✔
219
                }
440✔
220
        }
221

222
        // In local config mode, check that leafnode configuration refers to accounts that exist.
223
        if len(o.TrustedOperators) == 0 {
17,085✔
224
                accNames := map[string]struct{}{}
8,382✔
225
                for _, a := range o.Accounts {
17,550✔
226
                        accNames[a.Name] = struct{}{}
9,168✔
227
                }
9,168✔
228
                // global account is always created
229
                accNames[DEFAULT_GLOBAL_ACCOUNT] = struct{}{}
8,382✔
230
                // in the context of leaf nodes, empty account means global account
8,382✔
231
                accNames[_EMPTY_] = struct{}{}
8,382✔
232
                // system account either exists or, if not disabled, will be created
8,382✔
233
                if o.SystemAccount == _EMPTY_ && !o.NoSystemAccount {
15,037✔
234
                        accNames[DEFAULT_SYSTEM_ACCOUNT] = struct{}{}
6,655✔
235
                }
6,655✔
236
                checkAccountExists := func(accName string, cfgType string) error {
18,124✔
237
                        if _, ok := accNames[accName]; !ok {
9,744✔
238
                                return fmt.Errorf("cannot find local account %q specified in leafnode %s", accName, cfgType)
2✔
239
                        }
2✔
240
                        return nil
9,740✔
241
                }
242
                if err := checkAccountExists(o.LeafNode.Account, "authorization"); err != nil {
8,383✔
243
                        return err
1✔
244
                }
1✔
245
                for _, lu := range o.LeafNode.Users {
8,398✔
246
                        if lu.Account == nil { // means global account
27✔
247
                                continue
10✔
248
                        }
249
                        if err := checkAccountExists(lu.Account.Name, "authorization"); err != nil {
7✔
250
                                return err
×
251
                        }
×
252
                }
253
                for _, r := range o.LeafNode.Remotes {
9,734✔
254
                        if err := checkAccountExists(r.LocalAccount, "remote"); err != nil {
1,354✔
255
                                return err
1✔
256
                        }
1✔
257
                }
258
        } else {
321✔
259
                if len(o.LeafNode.Users) != 0 {
322✔
260
                        return fmt.Errorf("operator mode does not allow specifying users in leafnode config")
1✔
261
                }
1✔
262
                for _, r := range o.LeafNode.Remotes {
321✔
263
                        if !nkeys.IsValidPublicAccountKey(r.LocalAccount) {
2✔
264
                                return fmt.Errorf(
1✔
265
                                        "operator mode requires account nkeys in remotes. " +
1✔
266
                                                "Please add an `account` key to each remote in your `leafnodes` section, to assign it to an account. " +
1✔
267
                                                "Each account value should be a 56 character public key, starting with the letter 'A'")
1✔
268
                        }
1✔
269
                }
270
                if o.LeafNode.Port != 0 && o.LeafNode.Account != "" && !nkeys.IsValidPublicAccountKey(o.LeafNode.Account) {
320✔
271
                        return fmt.Errorf("operator mode and non account nkeys are incompatible")
1✔
272
                }
1✔
273
        }
274

275
        // Validate compression settings
276
        if o.LeafNode.Compression.Mode != _EMPTY_ {
12,790✔
277
                if err := validateAndNormalizeCompressionOption(&o.LeafNode.Compression, CompressionS2Auto); err != nil {
4,097✔
278
                        return err
5✔
279
                }
5✔
280
        }
281

282
        // If a remote has a websocket scheme, all need to have it.
283
        for _, rcfg := range o.LeafNode.Remotes {
10,045✔
284
                if len(rcfg.URLs) >= 2 {
1,562✔
285
                        firstIsWS, ok := isWSURL(rcfg.URLs[0]), true
210✔
286
                        for i := 1; i < len(rcfg.URLs); i++ {
665✔
287
                                u := rcfg.URLs[i]
455✔
288
                                if isWS := isWSURL(u); isWS && !firstIsWS || !isWS && firstIsWS {
462✔
289
                                        ok = false
7✔
290
                                        break
7✔
291
                                }
292
                        }
293
                        if !ok {
217✔
294
                                return fmt.Errorf("remote leaf node configuration cannot have a mix of websocket and non-websocket urls: %q", redactURLList(rcfg.URLs))
7✔
295
                        }
7✔
296
                }
297
                // Validate compression settings
298
                if rcfg.Compression.Mode != _EMPTY_ {
2,690✔
299
                        if err := validateAndNormalizeCompressionOption(&rcfg.Compression, CompressionS2Auto); err != nil {
1,350✔
300
                                return err
5✔
301
                        }
5✔
302
                }
303
        }
304

305
        if o.LeafNode.Port == 0 {
13,864✔
306
                return nil
5,183✔
307
        }
5,183✔
308

309
        // If MinVersion is defined, check that it is valid.
310
        if mv := o.LeafNode.MinVersion; mv != _EMPTY_ {
3,502✔
311
                if err := checkLeafMinVersionConfig(mv); err != nil {
6✔
312
                        return err
2✔
313
                }
2✔
314
        }
315

316
        // The checks below will be done only when detecting that we are configured
317
        // with gateways. So if an option validation needs to be done regardless,
318
        // it MUST be done before this point!
319

320
        if o.Gateway.Name == _EMPTY_ && o.Gateway.Port == 0 {
6,308✔
321
                return nil
2,812✔
322
        }
2,812✔
323
        // If we are here we have both leaf nodes and gateways defined, make sure there
324
        // is a system account defined.
325
        if o.SystemAccount == _EMPTY_ {
685✔
326
                return fmt.Errorf("leaf nodes and gateways (both being defined) require a system account to also be configured")
1✔
327
        }
1✔
328
        if err := validatePinnedCerts(o.LeafNode.TLSPinnedCerts); err != nil {
683✔
329
                return fmt.Errorf("leafnode: %v", err)
×
330
        }
×
331
        return nil
683✔
332
}
333

334
func checkLeafMinVersionConfig(mv string) error {
8✔
335
        if ok, err := versionAtLeastCheckError(mv, 2, 8, 0); !ok || err != nil {
12✔
336
                if err != nil {
6✔
337
                        return fmt.Errorf("invalid leafnode's minimum version: %v", err)
2✔
338
                } else {
4✔
339
                        return fmt.Errorf("the minimum version should be at least 2.8.0")
2✔
340
                }
2✔
341
        }
342
        return nil
4✔
343
}
344

345
// Used to validate user names in LeafNode configuration.
346
// - rejects mix of single and multiple users.
347
// - rejects duplicate user names.
348
func validateLeafNodeAuthOptions(o *Options) error {
8,764✔
349
        if len(o.LeafNode.Users) == 0 {
17,501✔
350
                return nil
8,737✔
351
        }
8,737✔
352
        if o.LeafNode.Username != _EMPTY_ {
29✔
353
                return fmt.Errorf("can not have a single user/pass and a users array")
2✔
354
        }
2✔
355
        if o.LeafNode.Nkey != _EMPTY_ {
25✔
356
                return fmt.Errorf("can not have a single nkey and a users array")
×
357
        }
×
358
        users := map[string]struct{}{}
25✔
359
        for _, u := range o.LeafNode.Users {
66✔
360
                if _, exists := users[u.Username]; exists {
43✔
361
                        return fmt.Errorf("duplicate user %q detected in leafnode authorization", u.Username)
2✔
362
                }
2✔
363
                users[u.Username] = struct{}{}
39✔
364
        }
365
        return nil
23✔
366
}
367

368
// Update remote LeafNode TLS configurations after a config reload.
369
func (s *Server) updateRemoteLeafNodesTLSConfig(opts *Options) {
15✔
370
        max := len(opts.LeafNode.Remotes)
15✔
371
        if max == 0 {
15✔
372
                return
×
373
        }
×
374

375
        s.mu.RLock()
15✔
376
        defer s.mu.RUnlock()
15✔
377

15✔
378
        // Changes in the list of remote leaf nodes is not supported.
15✔
379
        // However, make sure that we don't go over the arrays.
15✔
380
        if len(s.leafRemoteCfgs) < max {
15✔
381
                max = len(s.leafRemoteCfgs)
×
382
        }
×
383
        for i := 0; i < max; i++ {
34✔
384
                ro := opts.LeafNode.Remotes[i]
19✔
385
                cfg := s.leafRemoteCfgs[i]
19✔
386
                if ro.TLSConfig != nil {
21✔
387
                        cfg.Lock()
2✔
388
                        cfg.TLSConfig = ro.TLSConfig.Clone()
2✔
389
                        cfg.TLSHandshakeFirst = ro.TLSHandshakeFirst
2✔
390
                        cfg.Unlock()
2✔
391
                }
2✔
392
        }
393
}
394

395
func (s *Server) reConnectToRemoteLeafNode(remote *leafNodeCfg) {
242✔
396
        delay := s.getOpts().LeafNode.ReconnectInterval
242✔
397
        select {
242✔
398
        case <-time.After(delay):
187✔
399
        case <-s.quitCh:
55✔
400
                s.grWG.Done()
55✔
401
                return
55✔
402
        }
403
        s.connectToRemoteLeafNode(remote, false)
187✔
404
}
405

406
// Creates a leafNodeCfg object that wraps the RemoteLeafOpts.
407
func newLeafNodeCfg(remote *RemoteLeafOpts) *leafNodeCfg {
1,313✔
408
        cfg := &leafNodeCfg{
1,313✔
409
                RemoteLeafOpts: remote,
1,313✔
410
                urls:           make([]*url.URL, 0, len(remote.URLs)),
1,313✔
411
        }
1,313✔
412
        if len(remote.DenyExports) > 0 || len(remote.DenyImports) > 0 {
1,321✔
413
                perms := &Permissions{}
8✔
414
                if len(remote.DenyExports) > 0 {
16✔
415
                        perms.Publish = &SubjectPermission{Deny: remote.DenyExports}
8✔
416
                }
8✔
417
                if len(remote.DenyImports) > 0 {
15✔
418
                        perms.Subscribe = &SubjectPermission{Deny: remote.DenyImports}
7✔
419
                }
7✔
420
                cfg.perms = perms
8✔
421
        }
422
        // Start with the one that is configured. We will add to this
423
        // array when receiving async leafnode INFOs.
424
        cfg.urls = append(cfg.urls, cfg.URLs...)
1,313✔
425
        // If allowed to randomize, do it on our copy of URLs
1,313✔
426
        if !remote.NoRandomize {
2,624✔
427
                rand.Shuffle(len(cfg.urls), func(i, j int) {
1,723✔
428
                        cfg.urls[i], cfg.urls[j] = cfg.urls[j], cfg.urls[i]
412✔
429
                })
412✔
430
        }
431
        // If we are TLS make sure we save off a proper servername if possible.
432
        // Do same for user/password since we may need them to connect to
433
        // a bare URL that we get from INFO protocol.
434
        for _, u := range cfg.urls {
3,068✔
435
                cfg.saveTLSHostname(u)
1,755✔
436
                cfg.saveUserPassword(u)
1,755✔
437
                // If the url(s) have the "wss://" scheme, and we don't have a TLS
1,755✔
438
                // config, mark that we should be using TLS anyway.
1,755✔
439
                if !cfg.TLS && isWSSURL(u) {
1,756✔
440
                        cfg.TLS = true
1✔
441
                }
1✔
442
        }
443
        return cfg
1,313✔
444
}
445

446
// Will pick an URL from the list of available URLs.
447
func (cfg *leafNodeCfg) pickNextURL() *url.URL {
6,893✔
448
        cfg.Lock()
6,893✔
449
        defer cfg.Unlock()
6,893✔
450
        // If the current URL is the first in the list and we have more than
6,893✔
451
        // one URL, then move that one to end of the list.
6,893✔
452
        if cfg.curURL != nil && len(cfg.urls) > 1 && urlsAreEqual(cfg.curURL, cfg.urls[0]) {
10,482✔
453
                first := cfg.urls[0]
3,589✔
454
                copy(cfg.urls, cfg.urls[1:])
3,589✔
455
                cfg.urls[len(cfg.urls)-1] = first
3,589✔
456
        }
3,589✔
457
        cfg.curURL = cfg.urls[0]
6,893✔
458
        return cfg.curURL
6,893✔
459
}
460

461
// Returns the current URL
462
func (cfg *leafNodeCfg) getCurrentURL() *url.URL {
81✔
463
        cfg.RLock()
81✔
464
        defer cfg.RUnlock()
81✔
465
        return cfg.curURL
81✔
466
}
81✔
467

468
// Returns how long the server should wait before attempting
469
// to solicit a remote leafnode connection.
470
func (cfg *leafNodeCfg) getConnectDelay() time.Duration {
1,501✔
471
        cfg.RLock()
1,501✔
472
        delay := cfg.connDelay
1,501✔
473
        cfg.RUnlock()
1,501✔
474
        return delay
1,501✔
475
}
1,501✔
476

477
// Sets the connect delay.
478
func (cfg *leafNodeCfg) setConnectDelay(delay time.Duration) {
153✔
479
        cfg.Lock()
153✔
480
        cfg.connDelay = delay
153✔
481
        cfg.Unlock()
153✔
482
}
153✔
483

484
// Ensure that non-exported options (used in tests) have
485
// been properly set.
486
func (s *Server) setLeafNodeNonExportedOptions() {
7,169✔
487
        opts := s.getOpts()
7,169✔
488
        s.leafNodeOpts.dialTimeout = opts.LeafNode.dialTimeout
7,169✔
489
        if s.leafNodeOpts.dialTimeout == 0 {
14,337✔
490
                // Use same timeouts as routes for now.
7,168✔
491
                s.leafNodeOpts.dialTimeout = DEFAULT_ROUTE_DIAL
7,168✔
492
        }
7,168✔
493
        s.leafNodeOpts.resolver = opts.LeafNode.resolver
7,169✔
494
        if s.leafNodeOpts.resolver == nil {
14,334✔
495
                s.leafNodeOpts.resolver = net.DefaultResolver
7,165✔
496
        }
7,165✔
497
}
498

499
// sharedSysAccDelay is the connect delay set on a remote bound to the
// system account for its first connect when the server is not standalone.
const sharedSysAccDelay = time.Second / 4
500

501
func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg, firstConnect bool) {
1,501✔
502
        defer s.grWG.Done()
1,501✔
503

1,501✔
504
        if remote == nil || len(remote.URLs) == 0 {
1,501✔
505
                s.Debugf("Empty remote leafnode definition, nothing to connect")
×
506
                return
×
507
        }
×
508

509
        opts := s.getOpts()
1,501✔
510
        reconnectDelay := opts.LeafNode.ReconnectInterval
1,501✔
511
        s.mu.RLock()
1,501✔
512
        dialTimeout := s.leafNodeOpts.dialTimeout
1,501✔
513
        resolver := s.leafNodeOpts.resolver
1,501✔
514
        var isSysAcc bool
1,501✔
515
        if s.eventsEnabled() {
2,968✔
516
                isSysAcc = remote.LocalAccount == s.sys.account.Name
1,467✔
517
        }
1,467✔
518
        jetstreamMigrateDelay := remote.JetStreamClusterMigrateDelay
1,501✔
519
        s.mu.RUnlock()
1,501✔
520

1,501✔
521
        // If we are sharing a system account and we are not standalone delay to gather some info prior.
1,501✔
522
        if firstConnect && isSysAcc && !s.standAloneMode() {
1,573✔
523
                s.Debugf("Will delay first leafnode connect to shared system account due to clustering")
72✔
524
                remote.setConnectDelay(sharedSysAccDelay)
72✔
525
        }
72✔
526

527
        if connDelay := remote.getConnectDelay(); connDelay > 0 {
1,579✔
528
                select {
78✔
529
                case <-time.After(connDelay):
70✔
530
                case <-s.quitCh:
8✔
531
                        return
8✔
532
                }
533
                remote.setConnectDelay(0)
70✔
534
        }
535

536
        var conn net.Conn
1,493✔
537

1,493✔
538
        const connErrFmt = "Error trying to connect as leafnode to remote server %q (attempt %v): %v"
1,493✔
539

1,493✔
540
        attempts := 0
1,493✔
541

1,493✔
542
        for s.isRunning() && s.remoteLeafNodeStillValid(remote) {
8,386✔
543
                rURL := remote.pickNextURL()
6,893✔
544
                url, err := s.getRandomIP(resolver, rURL.Host, nil)
6,893✔
545
                if err == nil {
13,779✔
546
                        var ipStr string
6,886✔
547
                        if url != rURL.Host {
6,962✔
548
                                ipStr = fmt.Sprintf(" (%s)", url)
76✔
549
                        }
76✔
550
                        // Some test may want to disable remotes from connecting
551
                        if s.isLeafConnectDisabled() {
7,022✔
552
                                s.Debugf("Will not attempt to connect to remote server on %q%s, leafnodes currently disabled", rURL.Host, ipStr)
136✔
553
                                err = ErrLeafNodeDisabled
136✔
554
                        } else {
6,886✔
555
                                s.Debugf("Trying to connect as leafnode to remote server on %q%s", rURL.Host, ipStr)
6,750✔
556
                                conn, err = natsDialTimeout("tcp", url, dialTimeout)
6,750✔
557
                        }
6,750✔
558
                }
559
                if err != nil {
12,988✔
560
                        jitter := time.Duration(rand.Int63n(int64(reconnectDelay)))
6,095✔
561
                        delay := reconnectDelay + jitter
6,095✔
562
                        attempts++
6,095✔
563
                        if s.shouldReportConnectErr(firstConnect, attempts) {
10,392✔
564
                                s.Errorf(connErrFmt, rURL.Host, attempts, err)
4,297✔
565
                        } else {
6,095✔
566
                                s.Debugf(connErrFmt, rURL.Host, attempts, err)
1,798✔
567
                        }
1,798✔
568
                        remote.Lock()
6,095✔
569
                        // if we are using a delay to start migrating assets, kick off a migrate timer.
6,095✔
570
                        if remote.jsMigrateTimer == nil && jetstreamMigrateDelay > 0 {
6,103✔
571
                                remote.jsMigrateTimer = time.AfterFunc(jetstreamMigrateDelay, func() {
16✔
572
                                        s.checkJetStreamMigrate(remote)
8✔
573
                                })
8✔
574
                        }
575
                        remote.Unlock()
6,095✔
576
                        select {
6,095✔
577
                        case <-s.quitCh:
684✔
578
                                remote.cancelMigrateTimer()
684✔
579
                                return
684✔
580
                        case <-time.After(delay):
5,410✔
581
                                // Check if we should migrate any JetStream assets immediately while this remote is down.
5,410✔
582
                                // This will be used if JetStreamClusterMigrateDelay was not set
5,410✔
583
                                if jetstreamMigrateDelay == 0 {
10,748✔
584
                                        s.checkJetStreamMigrate(remote)
5,338✔
585
                                }
5,338✔
586
                                continue
5,410✔
587
                        }
588
                }
589
                remote.cancelMigrateTimer()
798✔
590
                if !s.remoteLeafNodeStillValid(remote) {
798✔
591
                        conn.Close()
×
592
                        return
×
593
                }
×
594

595
                // We have a connection here to a remote server.
596
                // Go ahead and create our leaf node and return.
597
                s.createLeafNode(conn, rURL, remote, nil)
798✔
598

798✔
599
                // Clear any observer states if we had them.
798✔
600
                s.clearObserverState(remote)
798✔
601

798✔
602
                return
798✔
603
        }
604
}
605

606
func (cfg *leafNodeCfg) cancelMigrateTimer() {
1,482✔
607
        cfg.Lock()
1,482✔
608
        stopAndClearTimer(&cfg.jsMigrateTimer)
1,482✔
609
        cfg.Unlock()
1,482✔
610
}
1,482✔
611

612
// This will clear any observer state such that stream or consumer assets on this server can become leaders again.
613
func (s *Server) clearObserverState(remote *leafNodeCfg) {
798✔
614
        s.mu.RLock()
798✔
615
        accName := remote.LocalAccount
798✔
616
        s.mu.RUnlock()
798✔
617

798✔
618
        acc, err := s.LookupAccount(accName)
798✔
619
        if err != nil {
800✔
620
                s.Warnf("Error looking up account [%s] checking for JetStream clear observer state on a leafnode", accName)
2✔
621
                return
2✔
622
        }
2✔
623

624
        acc.jscmMu.Lock()
796✔
625
        defer acc.jscmMu.Unlock()
796✔
626

796✔
627
        // Walk all streams looking for any clustered stream, skip otherwise.
796✔
628
        for _, mset := range acc.streams() {
814✔
629
                node := mset.raftNode()
18✔
630
                if node == nil {
28✔
631
                        // Not R>1
10✔
632
                        continue
10✔
633
                }
634
                // Check consumers
635
                for _, o := range mset.getConsumers() {
10✔
636
                        if n := o.raftNode(); n != nil {
4✔
637
                                // Ensure we can become a leader again.
2✔
638
                                n.SetObserver(false)
2✔
639
                        }
2✔
640
                }
641
                // Ensure we can not become a leader again.
642
                node.SetObserver(false)
8✔
643
        }
644
}
645

646
// Check to see if we should migrate any assets from this account.
647
func (s *Server) checkJetStreamMigrate(remote *leafNodeCfg) {
5,346✔
648
        s.mu.RLock()
5,346✔
649
        accName, shouldMigrate := remote.LocalAccount, remote.JetStreamClusterMigrate
5,346✔
650
        s.mu.RUnlock()
5,346✔
651

5,346✔
652
        if !shouldMigrate {
10,620✔
653
                return
5,274✔
654
        }
5,274✔
655

656
        acc, err := s.LookupAccount(accName)
72✔
657
        if err != nil {
72✔
658
                s.Warnf("Error looking up account [%s] checking for JetStream migration on a leafnode", accName)
×
659
                return
×
660
        }
×
661

662
        acc.jscmMu.Lock()
72✔
663
        defer acc.jscmMu.Unlock()
72✔
664

72✔
665
        // Walk all streams looking for any clustered stream, skip otherwise.
72✔
666
        // If we are the leader force stepdown.
72✔
667
        for _, mset := range acc.streams() {
109✔
668
                node := mset.raftNode()
37✔
669
                if node == nil {
37✔
670
                        // Not R>1
×
671
                        continue
×
672
                }
673
                // Collect any consumers
674
                for _, o := range mset.getConsumers() {
62✔
675
                        if n := o.raftNode(); n != nil {
50✔
676
                                n.StepDown()
25✔
677
                                // Ensure we can not become a leader while in this state.
25✔
678
                                n.SetObserver(true)
25✔
679
                        }
25✔
680
                }
681
                // Stepdown if this stream was leader.
682
                node.StepDown()
37✔
683
                // Ensure we can not become a leader while in this state.
37✔
684
                node.SetObserver(true)
37✔
685
        }
686
}
687

688
// Helper for checking.
689
func (s *Server) isLeafConnectDisabled() bool {
6,886✔
690
        s.mu.RLock()
6,886✔
691
        defer s.mu.RUnlock()
6,886✔
692
        return s.leafDisableConnect
6,886✔
693
}
6,886✔
694

695
// Save off the tlsName for when we use TLS and mix hostnames and IPs. IPs usually
696
// come from the server we connect to.
697
//
698
// We used to save the name only if there was a TLSConfig or scheme equal to "tls".
699
// However, this was causing failures for users that did not set the scheme (and
700
// their remote connections did not have a tls{} block).
701
// We now save the host name regardless in case the remote returns an INFO indicating
702
// that TLS is required.
703
func (cfg *leafNodeCfg) saveTLSHostname(u *url.URL) {
2,414✔
704
        if cfg.tlsName == _EMPTY_ && net.ParseIP(u.Hostname()) == nil {
2,434✔
705
                cfg.tlsName = u.Hostname()
20✔
706
        }
20✔
707
}
708

709
// Save off the username/password for when we connect using a bare URL
710
// that we get from the INFO protocol.
711
func (cfg *leafNodeCfg) saveUserPassword(u *url.URL) {
1,755✔
712
        if cfg.username == _EMPTY_ && u.User != nil {
2,049✔
713
                cfg.username = u.User.Username()
294✔
714
                cfg.password, _ = u.User.Password()
294✔
715
        }
294✔
716
}
717

718
// This starts the leafnode accept loop in a go routine, unless it
719
// is detected that the server has already been shutdown.
720
func (s *Server) startLeafNodeAcceptLoop() {
3,464✔
721
        // Snapshot server options.
3,464✔
722
        opts := s.getOpts()
3,464✔
723

3,464✔
724
        port := opts.LeafNode.Port
3,464✔
725
        if port == -1 {
6,751✔
726
                port = 0
3,287✔
727
        }
3,287✔
728

729
        if s.isShuttingDown() {
3,465✔
730
                return
1✔
731
        }
1✔
732

733
        s.mu.Lock()
3,463✔
734
        hp := net.JoinHostPort(opts.LeafNode.Host, strconv.Itoa(port))
3,463✔
735
        l, e := natsListen("tcp", hp)
3,463✔
736
        s.leafNodeListenerErr = e
3,463✔
737
        if e != nil {
3,463✔
738
                s.mu.Unlock()
×
739
                s.Fatalf("Error listening on leafnode port: %d - %v", opts.LeafNode.Port, e)
×
740
                return
×
741
        }
×
742

743
        s.Noticef("Listening for leafnode connections on %s",
3,463✔
744
                net.JoinHostPort(opts.LeafNode.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
3,463✔
745

3,463✔
746
        tlsRequired := opts.LeafNode.TLSConfig != nil
3,463✔
747
        tlsVerify := tlsRequired && opts.LeafNode.TLSConfig.ClientAuth == tls.RequireAndVerifyClientCert
3,463✔
748
        // Do not set compression in this Info object, it would possibly cause
3,463✔
749
        // issues when sending asynchronous INFO to the remote.
3,463✔
750
        info := Info{
3,463✔
751
                ID:            s.info.ID,
3,463✔
752
                Name:          s.info.Name,
3,463✔
753
                Version:       s.info.Version,
3,463✔
754
                GitCommit:     gitCommit,
3,463✔
755
                GoVersion:     runtime.Version(),
3,463✔
756
                AuthRequired:  true,
3,463✔
757
                TLSRequired:   tlsRequired,
3,463✔
758
                TLSVerify:     tlsVerify,
3,463✔
759
                MaxPayload:    s.info.MaxPayload, // TODO(dlc) - Allow override?
3,463✔
760
                Headers:       s.supportsHeaders(),
3,463✔
761
                JetStream:     opts.JetStream,
3,463✔
762
                Domain:        opts.JetStreamDomain,
3,463✔
763
                Proto:         s.getServerProto(),
3,463✔
764
                InfoOnConnect: true,
3,463✔
765
                JSApiLevel:    JSApiLevel,
3,463✔
766
        }
3,463✔
767
        // If we have selected a random port...
3,463✔
768
        if port == 0 {
6,749✔
769
                // Write resolved port back to options.
3,286✔
770
                opts.LeafNode.Port = l.Addr().(*net.TCPAddr).Port
3,286✔
771
        }
3,286✔
772

773
        s.leafNodeInfo = info
3,463✔
774
        // Possibly override Host/Port and set IP based on Cluster.Advertise
3,463✔
775
        if err := s.setLeafNodeInfoHostPortAndIP(); err != nil {
3,463✔
776
                s.Fatalf("Error setting leafnode INFO with LeafNode.Advertise value of %s, err=%v", opts.LeafNode.Advertise, err)
×
777
                l.Close()
×
778
                s.mu.Unlock()
×
779
                return
×
780
        }
×
781
        s.leafURLsMap[s.leafNodeInfo.IP]++
3,463✔
782
        s.generateLeafNodeInfoJSON()
3,463✔
783

3,463✔
784
        // Setup state that can enable shutdown
3,463✔
785
        s.leafNodeListener = l
3,463✔
786

3,463✔
787
        // As of now, a server that does not have remotes configured would
3,463✔
788
        // never solicit a connection, so we should not have to warn if
3,463✔
789
        // InsecureSkipVerify is set in main LeafNodes config (since
3,463✔
790
        // this TLS setting matters only when soliciting a connection).
3,463✔
791
        // Still, warn if insecure is set in any of LeafNode block.
3,463✔
792
        // We need to check remotes, even if tls is not required on accept.
3,463✔
793
        warn := tlsRequired && opts.LeafNode.TLSConfig.InsecureSkipVerify
3,463✔
794
        if !warn {
6,922✔
795
                for _, r := range opts.LeafNode.Remotes {
3,649✔
796
                        if r.TLSConfig != nil && r.TLSConfig.InsecureSkipVerify {
191✔
797
                                warn = true
1✔
798
                                break
1✔
799
                        }
800
                }
801
        }
802
        if warn {
3,468✔
803
                s.Warnf(leafnodeTLSInsecureWarning)
5✔
804
        }
5✔
805
        go s.acceptConnections(l, "Leafnode", func(conn net.Conn) { s.createLeafNode(conn, nil, nil, nil) }, nil)
4,302✔
806
        s.mu.Unlock()
3,463✔
807
}
808

809
// credsRe matches one dash-delimited section of a creds file: a header line,
// a single line of payload (the only capture of interest, submatch 1) and a
// footer line. A creds file with a user JWT and a seed yields two matches,
// the JWT first and the seed second.
var credsRe = regexp.MustCompile(`\s*(?:(?:[-]{3,}.*[-]{3,}\r?\n)([\w\-.=]+)(?:\r?\n[-]{3,}.*[-]{3,}(\r?\n|\z)))`)
811

812
// clusterName is provided as argument to avoid lock ordering issues with the locked client c
813
// Lock should be held entering here.
814
func (c *client) sendLeafConnect(clusterName string, headers bool) error {
677✔
815
        // We support basic user/pass and operator based user JWT with signatures.
677✔
816
        cinfo := leafConnectInfo{
677✔
817
                Version:       VERSION,
677✔
818
                ID:            c.srv.info.ID,
677✔
819
                Domain:        c.srv.info.Domain,
677✔
820
                Name:          c.srv.info.Name,
677✔
821
                Hub:           c.leaf.remote.Hub,
677✔
822
                Cluster:       clusterName,
677✔
823
                Headers:       headers,
677✔
824
                JetStream:     c.acc.jetStreamConfigured(),
677✔
825
                DenyPub:       c.leaf.remote.DenyImports,
677✔
826
                Compression:   c.leaf.compression,
677✔
827
                RemoteAccount: c.acc.GetName(),
677✔
828
                Proto:         c.srv.getServerProto(),
677✔
829
        }
677✔
830

677✔
831
        // If a signature callback is specified, this takes precedence over anything else.
677✔
832
        if cb := c.leaf.remote.SignatureCB; cb != nil {
682✔
833
                nonce := c.nonce
5✔
834
                c.mu.Unlock()
5✔
835
                jwt, sigraw, err := cb(nonce)
5✔
836
                c.mu.Lock()
5✔
837
                if err == nil && c.isClosed() {
6✔
838
                        err = ErrConnectionClosed
1✔
839
                }
1✔
840
                if err != nil {
7✔
841
                        c.Errorf("Error signing the nonce: %v", err)
2✔
842
                        return err
2✔
843
                }
2✔
844
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
3✔
845
                cinfo.JWT, cinfo.Sig = jwt, sig
3✔
846

847
        } else if creds := c.leaf.remote.Credentials; creds != _EMPTY_ {
726✔
848
                // Check for credentials first, that will take precedence..
54✔
849
                c.Debugf("Authenticating with credentials file %q", c.leaf.remote.Credentials)
54✔
850
                contents, err := os.ReadFile(creds)
54✔
851
                if err != nil {
54✔
852
                        c.Errorf("%v", err)
×
853
                        return err
×
854
                }
×
855
                defer wipeSlice(contents)
54✔
856
                items := credsRe.FindAllSubmatch(contents, -1)
54✔
857
                if len(items) < 2 {
54✔
858
                        c.Errorf("Credentials file malformed")
×
859
                        return err
×
860
                }
×
861
                // First result should be the user JWT.
862
                // We copy here so that the file containing the seed will be wiped appropriately.
863
                raw := items[0][1]
54✔
864
                tmp := make([]byte, len(raw))
54✔
865
                copy(tmp, raw)
54✔
866
                // Seed is second item.
54✔
867
                kp, err := nkeys.FromSeed(items[1][1])
54✔
868
                if err != nil {
54✔
869
                        c.Errorf("Credentials file has malformed seed")
×
870
                        return err
×
871
                }
×
872
                // Wipe our key on exit.
873
                defer kp.Wipe()
54✔
874

54✔
875
                sigraw, _ := kp.Sign(c.nonce)
54✔
876
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
54✔
877
                cinfo.JWT = bytesToString(tmp)
54✔
878
                cinfo.Sig = sig
54✔
879
        } else if nkey := c.leaf.remote.Nkey; nkey != _EMPTY_ {
623✔
880
                kp, err := nkeys.FromSeed([]byte(nkey))
5✔
881
                if err != nil {
5✔
882
                        c.Errorf("Remote nkey has malformed seed")
×
883
                        return err
×
884
                }
×
885
                // Wipe our key on exit.
886
                defer kp.Wipe()
5✔
887
                sigraw, _ := kp.Sign(c.nonce)
5✔
888
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
5✔
889
                pkey, _ := kp.PublicKey()
5✔
890
                cinfo.Nkey = pkey
5✔
891
                cinfo.Sig = sig
5✔
892
        }
893
        // In addition, and this is to allow auth callout, set user/password or
894
        // token if applicable.
895
        if userInfo := c.leaf.remote.curURL.User; userInfo != nil {
990✔
896
                // For backward compatibility, if only username is provided, set both
315✔
897
                // Token and User, not just Token.
315✔
898
                cinfo.User = userInfo.Username()
315✔
899
                var ok bool
315✔
900
                cinfo.Pass, ok = userInfo.Password()
315✔
901
                if !ok {
321✔
902
                        cinfo.Token = cinfo.User
6✔
903
                }
6✔
904
        } else if c.leaf.remote.username != _EMPTY_ {
367✔
905
                cinfo.User = c.leaf.remote.username
7✔
906
                cinfo.Pass = c.leaf.remote.password
7✔
907
        }
7✔
908
        b, err := json.Marshal(cinfo)
675✔
909
        if err != nil {
675✔
910
                c.Errorf("Error marshaling CONNECT to remote leafnode: %v\n", err)
×
911
                return err
×
912
        }
×
913
        // Although this call is made before the writeLoop is created,
914
        // we don't really need to send in place. The protocol will be
915
        // sent out by the writeLoop.
916
        c.enqueueProto([]byte(fmt.Sprintf(ConProto, b)))
675✔
917
        return nil
675✔
918
}
919

920
// Makes a deep copy of the LeafNode Info structure.
921
// The server lock is held on entry.
922
func (s *Server) copyLeafNodeInfo() *Info {
2,718✔
923
        clone := s.leafNodeInfo
2,718✔
924
        // Copy the array of urls.
2,718✔
925
        if len(s.leafNodeInfo.LeafNodeURLs) > 0 {
4,927✔
926
                clone.LeafNodeURLs = append([]string(nil), s.leafNodeInfo.LeafNodeURLs...)
2,209✔
927
        }
2,209✔
928
        return &clone
2,718✔
929
}
930

931
// Adds a LeafNode URL that we get when a route connects to the Info structure.
932
// Regenerates the JSON byte array so that it can be sent to LeafNode connections.
933
// Returns a boolean indicating if the URL was added or not.
934
// Server lock is held on entry
935
func (s *Server) addLeafNodeURL(urlStr string) bool {
6,874✔
936
        if s.leafURLsMap.addUrl(urlStr) {
13,743✔
937
                s.generateLeafNodeInfoJSON()
6,869✔
938
                return true
6,869✔
939
        }
6,869✔
940
        return false
5✔
941
}
942

943
// Removes a LeafNode URL of the route that is disconnecting from the Info structure.
944
// Regenerates the JSON byte array so that it can be sent to LeafNode connections.
945
// Returns a boolean indicating if the URL was removed or not.
946
// Server lock is held on entry.
947
func (s *Server) removeLeafNodeURL(urlStr string) bool {
6,874✔
948
        // Don't need to do this if we are removing the route connection because
6,874✔
949
        // we are shuting down...
6,874✔
950
        if s.isShuttingDown() {
10,487✔
951
                return false
3,613✔
952
        }
3,613✔
953
        if s.leafURLsMap.removeUrl(urlStr) {
6,518✔
954
                s.generateLeafNodeInfoJSON()
3,257✔
955
                return true
3,257✔
956
        }
3,257✔
957
        return false
4✔
958
}
959

960
// Server lock is held on entry
961
func (s *Server) generateLeafNodeInfoJSON() {
13,589✔
962
        s.leafNodeInfo.Cluster = s.cachedClusterName()
13,589✔
963
        s.leafNodeInfo.LeafNodeURLs = s.leafURLsMap.getAsStringSlice()
13,589✔
964
        s.leafNodeInfo.WSConnectURLs = s.websocket.connectURLsMap.getAsStringSlice()
13,589✔
965
        s.leafNodeInfoJSON = generateInfoJSON(&s.leafNodeInfo)
13,589✔
966
}
13,589✔
967

968
// Sends an async INFO protocol so that the connected servers can update
969
// their list of LeafNode urls.
970
func (s *Server) sendAsyncLeafNodeInfo() {
10,126✔
971
        for _, c := range s.leafs {
10,234✔
972
                c.mu.Lock()
108✔
973
                c.enqueueProto(s.leafNodeInfoJSON)
108✔
974
                c.mu.Unlock()
108✔
975
        }
108✔
976
}
977

978
// createLeafNode creates the client for a leafnode connection and starts its
// read loop (and, for accepted connections, its write loop).
// Called when an inbound leafnode connection is accepted or we create one for a solicited leafnode.
//
// conn is the established connection. A non-nil remote means that this server
// solicited the connection, rURL being the URL that was used. ws is non-nil if
// the connection was accepted through the Websocket port.
//
// Returns the new client, or nil if the local account lookup, the websocket
// solicitation, a TLS handshake, the write of the INFO protocol or the
// registration with the temporary clients failed.
func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCfg, ws *websocket) *client {
        // Snapshot server options.
        opts := s.getOpts()

        maxPay := int32(opts.MaxPayload)
        maxSubs := int32(opts.MaxSubs)
        // For system, maxSubs of 0 means unlimited, so re-adjust here.
        if maxSubs == 0 {
                maxSubs = -1
        }
        now := time.Now().UTC()

        c := &client{srv: s, nc: conn, kind: LEAF, opts: defaultOpts, mpay: maxPay, msubs: maxSubs, start: now, last: now}
        // Do not update the smap here, we need to do it in initLeafNodeSmapAndSendSubs
        c.leaf = &leaf{}

        // If the leafnode subject interest should be isolated, flag it here.
        s.optsMu.RLock()
        c.leaf.isolated = s.opts.LeafNode.IsolateLeafnodeInterest
        s.optsMu.RUnlock()

        // For accepted LN connections, ws will be != nil if it was accepted
        // through the Websocket port.
        c.ws = ws

        // For remote, check if the scheme starts with "ws", if so, we will initiate
        // a remote Leaf Node connection as a websocket connection.
        if remote != nil && rURL != nil && isWSURL(rURL) {
                remote.RLock()
                c.ws = &websocket{compress: remote.Websocket.Compression, maskwrite: !remote.Websocket.NoMasking}
                remote.RUnlock()
        }

        // Determines if we are soliciting the connection or not.
        var solicited bool
        var acc *Account
        var remoteSuffix string
        if remote != nil {
                // For now, if lookup fails, we will constantly try
                // to recreate this LN connection.
                lacc := remote.LocalAccount
                var err error
                acc, err = s.LookupAccount(lacc)
                if err != nil {
                        // An account not existing is something that can happen with nats/http account resolver and the account
                        // has not yet been pushed, or the request failed for other reasons.
                        // remote needs to be set or retry won't happen
                        c.leaf.remote = remote
                        c.closeConnection(MissingAccount)
                        s.Errorf("Unable to lookup account %s for solicited leafnode connection: %v", lacc, err)
                        return nil
                }
                remoteSuffix = fmt.Sprintf(" for account: %s", acc.traceLabel())
        }

        c.mu.Lock()
        c.initClient()
        c.Noticef("Leafnode connection created%s %s", remoteSuffix, c.opts.Name)

        var (
                tlsFirst         bool
                tlsFirstFallback time.Duration
                infoTimeout      time.Duration
        )
        if remote != nil {
                // Solicited connection: bind the remote configuration and the
                // local account to the client.
                solicited = true
                remote.Lock()
                c.leaf.remote = remote
                c.setPermissions(remote.perms)
                if !c.leaf.remote.Hub {
                        c.leaf.isSpoke = true
                }
                tlsFirst = remote.TLSHandshakeFirst
                infoTimeout = remote.FirstInfoTimeout
                remote.Unlock()
                c.acc = acc
        } else {
                // Accepted connection: a CONNECT is expected from the other side.
                c.flags.set(expectConnect)
                if ws != nil {
                        c.Debugf("Leafnode compression=%v", c.ws.compress)
                }
                tlsFirst = opts.LeafNode.TLSHandshakeFirst
                if f := opts.LeafNode.TLSHandshakeFirstFallback; f > 0 {
                        tlsFirstFallback = f
                }
        }
        // The client lock is released here and re-acquired further down, after
        // the section that needs the server lock.
        c.mu.Unlock()

        var nonce [nonceLen]byte
        var info *Info

        // Grab this before the client lock below.
        if !solicited {
                // Grab server variables
                s.mu.Lock()
                info = s.copyLeafNodeInfo()
                // For tests that want to simulate old servers, do not set the compression
                // on the INFO protocol if configured with CompressionNotSupported.
                if cm := opts.LeafNode.Compression.Mode; cm != CompressionNotSupported {
                        info.Compression = cm
                }
                // We always send a nonce for LEAF connections. Do not change that without
                // taking into account presence of proxy trusted keys.
                s.generateNonce(nonce[:])
                s.mu.Unlock()
        }

        // Grab lock
        c.mu.Lock()

        // preBuf is only set by the websocket solicitation below and is handed
        // to the readLoop.
        var preBuf []byte
        if solicited {
                // For websocket connection, we need to send an HTTP request,
                // and get the response before starting the readLoop to get
                // the INFO, etc..
                if c.isWebsocket() {
                        var err error
                        var closeReason ClosedState

                        preBuf, closeReason, err = c.leafNodeSolicitWSConnection(opts, rURL, remote)
                        if err != nil {
                                c.Errorf("Error soliciting websocket connection: %v", err)
                                c.mu.Unlock()
                                // The connection is closed here only when a close reason was returned.
                                if closeReason != 0 {
                                        c.closeConnection(closeReason)
                                }
                                return nil
                        }
                } else {
                        // If configured to do TLS handshake first
                        if tlsFirst {
                                if _, err := c.leafClientHandshakeIfNeeded(remote, opts); err != nil {
                                        c.mu.Unlock()
                                        return nil
                                }
                        }
                        // We need to wait for the info, but not for too long.
                        c.nc.SetReadDeadline(time.Now().Add(infoTimeout))
                }

                // We will process the INFO from the readloop and finish by
                // sending the CONNECT and finish registration later.
        } else {
                // Send our info to the other side.
                // Remember the nonce we sent here for signatures, etc.
                c.nonce = make([]byte, nonceLen)
                copy(c.nonce, nonce[:])
                info.Nonce = bytesToString(c.nonce)
                info.CID = c.cid
                proto := generateInfoJSON(info)

                // pre holds the bytes read while waiting for a "TLS First" client, if any.
                var pre []byte
                // We need first to check for "TLS First" fallback delay.
                if tlsFirstFallback > 0 {
                        // We wait and see if we are getting any data. Since we did not send
                        // the INFO protocol yet, only clients that use TLS first should be
                        // sending data (the TLS handshake). We don't really check the content:
                        // if it is a rogue agent and not an actual client performing the
                        // TLS handshake, the error will be detected when performing the
                        // handshake on our side.
                        pre = make([]byte, 4)
                        c.nc.SetReadDeadline(time.Now().Add(tlsFirstFallback))
                        n, _ := io.ReadFull(c.nc, pre[:])
                        c.nc.SetReadDeadline(time.Time{})
                        // If we get any data (regardless of possible timeout), we will proceed
                        // with the TLS handshake.
                        if n > 0 {
                                pre = pre[:n]
                        } else {
                                // We did not get anything so we will send the INFO protocol.
                                pre = nil
                                // Set the boolean to false for the rest of the function.
                                tlsFirst = false
                        }
                }

                if !tlsFirst {
                        // We have to send from this go routine because we may
                        // have to block for TLS handshake before we start our
                        // writeLoop go routine. The other side needs to receive
                        // this before it can initiate the TLS handshake..
                        c.sendProtoNow(proto)

                        // The above call could have marked the connection as closed (due to TCP error).
                        if c.isClosed() {
                                c.mu.Unlock()
                                c.closeConnection(WriteError)
                                return nil
                        }
                }

                // Check to see if we need to spin up TLS.
                if !c.isWebsocket() && info.TLSRequired {
                        // If we have a prebuffer create a multi-reader.
                        if len(pre) > 0 {
                                c.nc = &tlsMixConn{c.nc, bytes.NewBuffer(pre)}
                        }
                        // Perform server-side TLS handshake.
                        if err := c.doTLSServerHandshake(tlsHandshakeLeaf, opts.LeafNode.TLSConfig, opts.LeafNode.TLSTimeout, opts.LeafNode.TLSPinnedCerts); err != nil {
                                c.mu.Unlock()
                                return nil
                        }
                }

                // If the user wants the TLS handshake to occur first, now that it is
                // done, send the INFO protocol.
                if tlsFirst {
                        c.flags.set(didTLSFirst)
                        c.sendProtoNow(proto)
                        // As above, the write could have marked the connection as closed.
                        if c.isClosed() {
                                c.mu.Unlock()
                                c.closeConnection(WriteError)
                                return nil
                        }
                }

                // Leaf nodes will always require a CONNECT to let us know
                // when we are properly bound to an account.
                //
                // If compression is configured, we can't set the authTimer here because
                // it would cause the parser to fail any incoming protocol that is not a
                // CONNECT (and we need to exchange INFO protocols for compression
                // negotiation). So instead, use the ping timer until we are done with
                // negotiation and can set the auth timer.
                timeout := secondsToDuration(opts.LeafNode.AuthTimeout)
                if needsCompression(opts.LeafNode.Compression.Mode) {
                        c.ping.tmr = time.AfterFunc(timeout, func() {
                                c.authTimeout()
                        })
                } else {
                        c.setAuthTimer(timeout)
                }
        }

        // Keep track in case server is shutdown before we can successfully register.
        if !s.addToTempClients(c.cid, c) {
                c.mu.Unlock()
                c.setNoReconnect()
                c.closeConnection(ServerShutdown)
                return nil
        }

        // Spin up the read loop.
        s.startGoRoutine(func() { c.readLoop(preBuf) })

        // We will spin the write loop for solicited connections only
        // when processing the INFO and after switching to TLS if needed.
        if !solicited {
                s.startGoRoutine(func() { c.writeLoop() })
        }

        c.mu.Unlock()

        return c
}
1234

1235
// Will perform the client-side TLS handshake if needed. Assumes that this
1236
// is called by the solicit side (remote will be non nil). Returns `true`
1237
// if TLS is required, `false` otherwise.
1238
// Lock held on entry.
1239
func (c *client) leafClientHandshakeIfNeeded(remote *leafNodeCfg, opts *Options) (bool, error) {
1,936✔
1240
        // Check if TLS is required and gather TLS config variables.
1,936✔
1241
        tlsRequired, tlsConfig, tlsName, tlsTimeout := c.leafNodeGetTLSConfigForSolicit(remote)
1,936✔
1242
        if !tlsRequired {
3,791✔
1243
                return false, nil
1,855✔
1244
        }
1,855✔
1245

1246
        // If TLS required, peform handshake.
1247
        // Get the URL that was used to connect to the remote server.
1248
        rURL := remote.getCurrentURL()
81✔
1249

81✔
1250
        // Perform the client-side TLS handshake.
81✔
1251
        if resetTLSName, err := c.doTLSClientHandshake(tlsHandshakeLeaf, rURL, tlsConfig, tlsName, tlsTimeout, opts.LeafNode.TLSPinnedCerts); err != nil {
121✔
1252
                // Check if we need to reset the remote's TLS name.
40✔
1253
                if resetTLSName {
40✔
1254
                        remote.Lock()
×
1255
                        remote.tlsName = _EMPTY_
×
1256
                        remote.Unlock()
×
1257
                }
×
1258
                return false, err
40✔
1259
        }
1260
        return true, nil
41✔
1261
}
1262

1263
func (c *client) processLeafnodeInfo(info *Info) {
2,683✔
1264
        c.mu.Lock()
2,683✔
1265
        if c.leaf == nil || c.isClosed() {
2,683✔
1266
                c.mu.Unlock()
×
1267
                return
×
1268
        }
×
1269
        s := c.srv
2,683✔
1270
        opts := s.getOpts()
2,683✔
1271
        remote := c.leaf.remote
2,683✔
1272
        didSolicit := remote != nil
2,683✔
1273
        firstINFO := !c.flags.isSet(infoReceived)
2,683✔
1274

2,683✔
1275
        // In case of websocket, the TLS handshake has been already done.
2,683✔
1276
        // So check only for non websocket connections and for configurations
2,683✔
1277
        // where the TLS Handshake was not done first.
2,683✔
1278
        if didSolicit && !c.flags.isSet(handshakeComplete) && !c.isWebsocket() && !remote.TLSHandshakeFirst {
4,572✔
1279
                // If the server requires TLS, we need to set this in the remote
1,889✔
1280
                // otherwise if there is no TLS configuration block for the remote,
1,889✔
1281
                // the solicit side will not attempt to perform the TLS handshake.
1,889✔
1282
                if firstINFO && info.TLSRequired {
1,954✔
1283
                        remote.TLS = true
65✔
1284
                }
65✔
1285
                if _, err := c.leafClientHandshakeIfNeeded(remote, opts); err != nil {
1,924✔
1286
                        c.mu.Unlock()
35✔
1287
                        return
35✔
1288
                }
35✔
1289
        }
1290

1291
        // Check for compression, unless already done.
1292
        if firstINFO && !c.flags.isSet(compressionNegotiated) {
3,956✔
1293
                // Prevent from getting back here.
1,308✔
1294
                c.flags.set(compressionNegotiated)
1,308✔
1295

1,308✔
1296
                var co *CompressionOpts
1,308✔
1297
                if !didSolicit {
1,884✔
1298
                        co = &opts.LeafNode.Compression
576✔
1299
                } else {
1,308✔
1300
                        co = &remote.Compression
732✔
1301
                }
732✔
1302
                if needsCompression(co.Mode) {
2,605✔
1303
                        // Release client lock since following function will need server lock.
1,297✔
1304
                        c.mu.Unlock()
1,297✔
1305
                        compress, err := s.negotiateLeafCompression(c, didSolicit, info.Compression, co)
1,297✔
1306
                        if err != nil {
1,297✔
1307
                                c.sendErrAndErr(err.Error())
×
1308
                                c.closeConnection(ProtocolViolation)
×
1309
                                return
×
1310
                        }
×
1311
                        if compress {
2,466✔
1312
                                // Done for now, will get back another INFO protocol...
1,169✔
1313
                                return
1,169✔
1314
                        }
1,169✔
1315
                        // No compression because one side does not want/can't, so proceed.
1316
                        c.mu.Lock()
128✔
1317
                        // Check that the connection did not close if the lock was released.
128✔
1318
                        if c.isClosed() {
128✔
1319
                                c.mu.Unlock()
×
1320
                                return
×
1321
                        }
×
1322
                } else {
11✔
1323
                        // Coming from an old server, the Compression field would be the empty
11✔
1324
                        // string. For servers that are configured with CompressionNotSupported,
11✔
1325
                        // this makes them behave as old servers.
11✔
1326
                        if info.Compression == _EMPTY_ || co.Mode == CompressionNotSupported {
14✔
1327
                                c.leaf.compression = CompressionNotSupported
3✔
1328
                        } else {
11✔
1329
                                c.leaf.compression = CompressionOff
8✔
1330
                        }
8✔
1331
                }
1332
                // Accepting side does not normally process an INFO protocol during
1333
                // initial connection handshake. So we keep it consistent by returning
1334
                // if we are not soliciting.
1335
                if !didSolicit {
140✔
1336
                        // If we had created the ping timer instead of the auth timer, we will
1✔
1337
                        // clear the ping timer and set the auth timer now that the compression
1✔
1338
                        // negotiation is done.
1✔
1339
                        if info.Compression != _EMPTY_ && c.ping.tmr != nil {
1✔
1340
                                clearTimer(&c.ping.tmr)
×
1341
                                c.setAuthTimer(secondsToDuration(opts.LeafNode.AuthTimeout))
×
1342
                        }
×
1343
                        c.mu.Unlock()
1✔
1344
                        return
1✔
1345
                }
1346
                // Fall through and process the INFO protocol as usual.
1347
        }
1348

1349
        // Note: For now, only the initial INFO has a nonce. We
1350
        // will probably do auto key rotation at some point.
1351
        if firstINFO {
2,251✔
1352
                // Mark that the INFO protocol has been received.
773✔
1353
                c.flags.set(infoReceived)
773✔
1354
                // Prevent connecting to non leafnode port. Need to do this only for
773✔
1355
                // the first INFO, not for async INFO updates...
773✔
1356
                //
773✔
1357
                // Content of INFO sent by the server when accepting a tcp connection.
773✔
1358
                // -------------------------------------------------------------------
773✔
1359
                // Listen Port Of | CID | ClientConnectURLs | LeafNodeURLs | Gateway |
773✔
1360
                // -------------------------------------------------------------------
773✔
1361
                //      CLIENT    |  X* |        X**        |              |         |
773✔
1362
                //      ROUTE     |     |        X**        |      X***    |         |
773✔
1363
                //     GATEWAY    |     |                   |              |    X    |
773✔
1364
                //     LEAFNODE   |  X  |                   |       X      |         |
773✔
1365
                // -------------------------------------------------------------------
773✔
1366
                // *   Not on older servers.
773✔
1367
                // **  Not if "no advertise" is enabled.
773✔
1368
                // *** Not if leafnode's "no advertise" is enabled.
773✔
1369
                //
773✔
1370
                // As seen from above, a solicited LeafNode connection should receive
773✔
1371
                // from the remote server an INFO with CID and LeafNodeURLs. Anything
773✔
1372
                // else should be considered an attempt to connect to a wrong port.
773✔
1373
                if didSolicit && (info.CID == 0 || info.LeafNodeURLs == nil) {
825✔
1374
                        c.mu.Unlock()
52✔
1375
                        c.Errorf(ErrConnectedToWrongPort.Error())
52✔
1376
                        c.closeConnection(WrongPort)
52✔
1377
                        return
52✔
1378
                }
52✔
1379
                // Reject a cluster that contains spaces.
1380
                if info.Cluster != _EMPTY_ && strings.Contains(info.Cluster, " ") {
722✔
1381
                        c.mu.Unlock()
1✔
1382
                        c.sendErrAndErr(ErrClusterNameHasSpaces.Error())
1✔
1383
                        c.closeConnection(ProtocolViolation)
1✔
1384
                        return
1✔
1385
                }
1✔
1386
                // Capture a nonce here.
1387
                c.nonce = []byte(info.Nonce)
720✔
1388
                if info.TLSRequired && didSolicit {
750✔
1389
                        remote.TLS = true
30✔
1390
                }
30✔
1391
                supportsHeaders := c.srv.supportsHeaders()
720✔
1392
                c.headers = supportsHeaders && info.Headers
720✔
1393

720✔
1394
                // Remember the remote server.
720✔
1395
                // Pre 2.2.0 servers are not sending their server name.
720✔
1396
                // In that case, use info.ID, which, for those servers, matches
720✔
1397
                // the content of the field `Name` in the leafnode CONNECT protocol.
720✔
1398
                if info.Name == _EMPTY_ {
720✔
1399
                        c.leaf.remoteServer = info.ID
×
1400
                } else {
720✔
1401
                        c.leaf.remoteServer = info.Name
720✔
1402
                }
720✔
1403
                c.leaf.remoteDomain = info.Domain
720✔
1404
                c.leaf.remoteCluster = info.Cluster
720✔
1405
                // We send the protocol version in the INFO protocol.
720✔
1406
                // Keep track of it, so we know if this connection supports message
720✔
1407
                // tracing for instance.
720✔
1408
                c.opts.Protocol = info.Proto
720✔
1409
        }
1410

1411
        // For both initial INFO and async INFO protocols, Possibly
1412
        // update our list of remote leafnode URLs we can connect to.
1413
        if didSolicit && (len(info.LeafNodeURLs) > 0 || len(info.WSConnectURLs) > 0) {
2,757✔
1414
                // Consider the incoming array as the most up-to-date
1,332✔
1415
                // representation of the remote cluster's list of URLs.
1,332✔
1416
                c.updateLeafNodeURLs(info)
1,332✔
1417
        }
1,332✔
1418

1419
        // Check to see if we have permissions updates here.
1420
        if info.Import != nil || info.Export != nil {
1,440✔
1421
                perms := &Permissions{
15✔
1422
                        Publish:   info.Export,
15✔
1423
                        Subscribe: info.Import,
15✔
1424
                }
15✔
1425
                // Check if we have local deny clauses that we need to merge.
15✔
1426
                if remote := c.leaf.remote; remote != nil {
30✔
1427
                        if len(remote.DenyExports) > 0 {
16✔
1428
                                if perms.Publish == nil {
1✔
1429
                                        perms.Publish = &SubjectPermission{}
×
1430
                                }
×
1431
                                perms.Publish.Deny = append(perms.Publish.Deny, remote.DenyExports...)
1✔
1432
                        }
1433
                        if len(remote.DenyImports) > 0 {
16✔
1434
                                if perms.Subscribe == nil {
1✔
1435
                                        perms.Subscribe = &SubjectPermission{}
×
1436
                                }
×
1437
                                perms.Subscribe.Deny = append(perms.Subscribe.Deny, remote.DenyImports...)
1✔
1438
                        }
1439
                }
1440
                c.setPermissions(perms)
15✔
1441
        }
1442

1443
        var resumeConnect bool
1,425✔
1444

1,425✔
1445
        // If this is a remote connection and this is the first INFO protocol,
1,425✔
1446
        // then we need to finish the connect process by sending CONNECT, etc..
1,425✔
1447
        if firstINFO && didSolicit {
2,102✔
1448
                // Clear deadline that was set in createLeafNode while waiting for the INFO.
677✔
1449
                c.nc.SetDeadline(time.Time{})
677✔
1450
                resumeConnect = true
677✔
1451
        } else if !firstINFO && didSolicit {
2,080✔
1452
                c.leaf.remoteAccName = info.RemoteAccount
655✔
1453
        }
655✔
1454

1455
        // Check if we have the remote account information and if so make sure it's stored.
1456
        if info.RemoteAccount != _EMPTY_ {
2,066✔
1457
                s.leafRemoteAccounts.Store(c.acc.Name, info.RemoteAccount)
641✔
1458
        }
641✔
1459
        c.mu.Unlock()
1,425✔
1460

1,425✔
1461
        finishConnect := info.ConnectInfo
1,425✔
1462
        if resumeConnect && s != nil {
2,102✔
1463
                s.leafNodeResumeConnectProcess(c)
677✔
1464
                if !info.InfoOnConnect {
677✔
1465
                        finishConnect = true
×
1466
                }
×
1467
        }
1468
        if finishConnect {
2,066✔
1469
                s.leafNodeFinishConnectProcess(c)
641✔
1470
        }
641✔
1471

1472
        // Check to see if we need to kick any internal source or mirror consumers.
1473
        // This will be a no-op if JetStream not enabled for this server or if the bound account
1474
        // does not have jetstream.
1475
        s.checkInternalSyncConsumers(c.acc)
1,425✔
1476
}
1477

1478
func (s *Server) negotiateLeafCompression(c *client, didSolicit bool, infoCompression string, co *CompressionOpts) (bool, error) {
1,297✔
1479
        // Negotiate the appropriate compression mode (or no compression)
1,297✔
1480
        cm, err := selectCompressionMode(co.Mode, infoCompression)
1,297✔
1481
        if err != nil {
1,297✔
1482
                return false, err
×
1483
        }
×
1484
        c.mu.Lock()
1,297✔
1485
        // For "auto" mode, set the initial compression mode based on RTT
1,297✔
1486
        if cm == CompressionS2Auto {
2,427✔
1487
                if c.rttStart.IsZero() {
2,260✔
1488
                        c.rtt = computeRTT(c.start)
1,130✔
1489
                }
1,130✔
1490
                cm = selectS2AutoModeBasedOnRTT(c.rtt, co.RTTThresholds)
1,130✔
1491
        }
1492
        // Keep track of the negotiated compression mode.
1493
        c.leaf.compression = cm
1,297✔
1494
        cid := c.cid
1,297✔
1495
        var nonce string
1,297✔
1496
        if !didSolicit {
1,872✔
1497
                nonce = bytesToString(c.nonce)
575✔
1498
        }
575✔
1499
        c.mu.Unlock()
1,297✔
1500

1,297✔
1501
        if !needsCompression(cm) {
1,425✔
1502
                return false, nil
128✔
1503
        }
128✔
1504

1505
        // If we end-up doing compression...
1506

1507
        // Generate an INFO with the chosen compression mode.
1508
        s.mu.Lock()
1,169✔
1509
        info := s.copyLeafNodeInfo()
1,169✔
1510
        info.Compression, info.CID, info.Nonce = compressionModeForInfoProtocol(co, cm), cid, nonce
1,169✔
1511
        infoProto := generateInfoJSON(info)
1,169✔
1512
        s.mu.Unlock()
1,169✔
1513

1,169✔
1514
        // If we solicited, then send this INFO protocol BEFORE switching
1,169✔
1515
        // to compression writer. However, if we did not, we send it after.
1,169✔
1516
        c.mu.Lock()
1,169✔
1517
        if didSolicit {
1,763✔
1518
                c.enqueueProto(infoProto)
594✔
1519
                // Make sure it is completely flushed (the pending bytes goes to
594✔
1520
                // 0) before proceeding.
594✔
1521
                for c.out.pb > 0 && !c.isClosed() {
1,188✔
1522
                        c.flushOutbound()
594✔
1523
                }
594✔
1524
        }
1525
        // This is to notify the readLoop that it should switch to a
1526
        // (de)compression reader.
1527
        c.in.flags.set(switchToCompression)
1,169✔
1528
        // Create the compress writer before queueing the INFO protocol for
1,169✔
1529
        // a route that did not solicit. It will make sure that that proto
1,169✔
1530
        // is sent with compression on.
1,169✔
1531
        c.out.cw = s2.NewWriter(nil, s2WriterOptions(cm)...)
1,169✔
1532
        if !didSolicit {
1,744✔
1533
                c.enqueueProto(infoProto)
575✔
1534
        }
575✔
1535
        c.mu.Unlock()
1,169✔
1536
        return true, nil
1,169✔
1537
}
1538

1539
// When getting a leaf node INFO protocol, use the provided
1540
// array of urls to update the list of possible endpoints.
1541
func (c *client) updateLeafNodeURLs(info *Info) {
1,332✔
1542
        cfg := c.leaf.remote
1,332✔
1543
        cfg.Lock()
1,332✔
1544
        defer cfg.Unlock()
1,332✔
1545

1,332✔
1546
        // We have ensured that if a remote has a WS scheme, then all are.
1,332✔
1547
        // So check if first is WS, then add WS URLs, otherwise, add non WS ones.
1,332✔
1548
        if len(cfg.URLs) > 0 && isWSURL(cfg.URLs[0]) {
1,386✔
1549
                // It does not really matter if we use "ws://" or "wss://" here since
54✔
1550
                // we will have already marked that the remote should use TLS anyway.
54✔
1551
                // But use proper scheme for log statements, etc...
54✔
1552
                proto := wsSchemePrefix
54✔
1553
                if cfg.TLS {
54✔
1554
                        proto = wsSchemePrefixTLS
×
1555
                }
×
1556
                c.doUpdateLNURLs(cfg, proto, info.WSConnectURLs)
54✔
1557
                return
54✔
1558
        }
1559
        c.doUpdateLNURLs(cfg, "nats-leaf", info.LeafNodeURLs)
1,278✔
1560
}
1561

1562
func (c *client) doUpdateLNURLs(cfg *leafNodeCfg, scheme string, URLs []string) {
1,332✔
1563
        cfg.urls = make([]*url.URL, 0, 1+len(URLs))
1,332✔
1564
        // Add the ones we receive in the protocol
1,332✔
1565
        for _, surl := range URLs {
3,701✔
1566
                url, err := url.Parse(fmt.Sprintf("%s://%s", scheme, surl))
2,369✔
1567
                if err != nil {
2,369✔
1568
                        // As per below, the URLs we receive should not have contained URL info, so this should be safe to log.
×
1569
                        c.Errorf("Error parsing url %q: %v", surl, err)
×
1570
                        continue
×
1571
                }
1572
                // Do not add if it's the same as what we already have configured.
1573
                var dup bool
2,369✔
1574
                for _, u := range cfg.URLs {
5,994✔
1575
                        // URLs that we receive never have user info, but the
3,625✔
1576
                        // ones that were configured may have. Simply compare
3,625✔
1577
                        // host and port to decide if they are equal or not.
3,625✔
1578
                        if url.Host == u.Host && url.Port() == u.Port() {
5,335✔
1579
                                dup = true
1,710✔
1580
                                break
1,710✔
1581
                        }
1582
                }
1583
                if !dup {
3,028✔
1584
                        cfg.urls = append(cfg.urls, url)
659✔
1585
                        cfg.saveTLSHostname(url)
659✔
1586
                }
659✔
1587
        }
1588
        // Add the configured one
1589
        cfg.urls = append(cfg.urls, cfg.URLs...)
1,332✔
1590
}
1591

1592
// Similar to setInfoHostPortAndGenerateJSON, but for leafNodeInfo.
1593
func (s *Server) setLeafNodeInfoHostPortAndIP() error {
3,463✔
1594
        opts := s.getOpts()
3,463✔
1595
        if opts.LeafNode.Advertise != _EMPTY_ {
3,474✔
1596
                advHost, advPort, err := parseHostPort(opts.LeafNode.Advertise, opts.LeafNode.Port)
11✔
1597
                if err != nil {
11✔
1598
                        return err
×
1599
                }
×
1600
                s.leafNodeInfo.Host = advHost
11✔
1601
                s.leafNodeInfo.Port = advPort
11✔
1602
        } else {
3,452✔
1603
                s.leafNodeInfo.Host = opts.LeafNode.Host
3,452✔
1604
                s.leafNodeInfo.Port = opts.LeafNode.Port
3,452✔
1605
                // If the host is "0.0.0.0" or "::" we need to resolve to a public IP.
3,452✔
1606
                // This will return at most 1 IP.
3,452✔
1607
                hostIsIPAny, ips, err := s.getNonLocalIPsIfHostIsIPAny(s.leafNodeInfo.Host, false)
3,452✔
1608
                if err != nil {
3,452✔
1609
                        return err
×
1610
                }
×
1611
                if hostIsIPAny {
3,750✔
1612
                        if len(ips) == 0 {
298✔
1613
                                s.Errorf("Could not find any non-local IP for leafnode's listen specification %q",
×
1614
                                        s.leafNodeInfo.Host)
×
1615
                        } else {
298✔
1616
                                // Take the first from the list...
298✔
1617
                                s.leafNodeInfo.Host = ips[0]
298✔
1618
                        }
298✔
1619
                }
1620
        }
1621
        // Use just host:port for the IP
1622
        s.leafNodeInfo.IP = net.JoinHostPort(s.leafNodeInfo.Host, strconv.Itoa(s.leafNodeInfo.Port))
3,463✔
1623
        if opts.LeafNode.Advertise != _EMPTY_ {
3,474✔
1624
                s.Noticef("Advertise address for leafnode is set to %s", s.leafNodeInfo.IP)
11✔
1625
        }
11✔
1626
        return nil
3,463✔
1627
}
1628

1629
// Add the connection to the map of leaf nodes.
1630
// If `checkForDup` is true (invoked when a leafnode is accepted), then we check
1631
// if a connection already exists for the same server name and account.
1632
// That can happen when the remote is attempting to reconnect while the accepting
1633
// side did not detect the connection as broken yet.
1634
// But it can also happen when there is a misconfiguration and the remote is
1635
// creating two (or more) connections that bind to the same account on the accept
1636
// side.
1637
// When a duplicate is found, the new connection is accepted and the old is closed
1638
// (this solves the stale connection situation). An error is returned to help the
1639
// remote detect the misconfiguration when the duplicate is the result of that
1640
// misconfiguration.
1641
func (s *Server) addLeafNodeConnection(c *client, srvName, clusterName string, checkForDup bool) {
1,321✔
1642
        var accName string
1,321✔
1643
        c.mu.Lock()
1,321✔
1644
        cid := c.cid
1,321✔
1645
        acc := c.acc
1,321✔
1646
        if acc != nil {
2,642✔
1647
                accName = acc.Name
1,321✔
1648
        }
1,321✔
1649
        myRemoteDomain := c.leaf.remoteDomain
1,321✔
1650
        mySrvName := c.leaf.remoteServer
1,321✔
1651
        remoteAccName := c.leaf.remoteAccName
1,321✔
1652
        myClustName := c.leaf.remoteCluster
1,321✔
1653
        solicited := c.leaf.remote != nil
1,321✔
1654
        c.mu.Unlock()
1,321✔
1655

1,321✔
1656
        var old *client
1,321✔
1657
        s.mu.Lock()
1,321✔
1658
        // We check for empty because in some test we may send empty CONNECT{}
1,321✔
1659
        if checkForDup && srvName != _EMPTY_ {
1,963✔
1660
                for _, ol := range s.leafs {
1,041✔
1661
                        ol.mu.Lock()
399✔
1662
                        // We care here only about non solicited Leafnode. This function
399✔
1663
                        // is more about replacing stale connections than detecting loops.
399✔
1664
                        // We have code for the loop detection elsewhere, which also delays
399✔
1665
                        // attempt to reconnect.
399✔
1666
                        if !ol.isSolicitedLeafNode() && ol.leaf.remoteServer == srvName &&
399✔
1667
                                ol.leaf.remoteCluster == clusterName && ol.acc.Name == accName &&
399✔
1668
                                remoteAccName != _EMPTY_ && ol.leaf.remoteAccName == remoteAccName {
402✔
1669
                                old = ol
3✔
1670
                        }
3✔
1671
                        ol.mu.Unlock()
399✔
1672
                        if old != nil {
402✔
1673
                                break
3✔
1674
                        }
1675
                }
1676
        }
1677
        // Store new connection in the map
1678
        s.leafs[cid] = c
1,321✔
1679
        s.mu.Unlock()
1,321✔
1680
        s.removeFromTempClients(cid)
1,321✔
1681

1,321✔
1682
        // If applicable, evict the old one.
1,321✔
1683
        if old != nil {
1,324✔
1684
                old.sendErrAndErr(DuplicateRemoteLeafnodeConnection.String())
3✔
1685
                old.closeConnection(DuplicateRemoteLeafnodeConnection)
3✔
1686
                c.Warnf("Replacing connection from same server")
3✔
1687
        }
3✔
1688

1689
        srvDecorated := func() string {
1,530✔
1690
                if myClustName == _EMPTY_ {
231✔
1691
                        return mySrvName
22✔
1692
                }
22✔
1693
                return fmt.Sprintf("%s/%s", mySrvName, myClustName)
187✔
1694
        }
1695

1696
        opts := s.getOpts()
1,321✔
1697
        sysAcc := s.SystemAccount()
1,321✔
1698
        js := s.getJetStream()
1,321✔
1699
        var meta *raft
1,321✔
1700
        if js != nil {
1,866✔
1701
                if mg := js.getMetaGroup(); mg != nil {
977✔
1702
                        meta = mg.(*raft)
432✔
1703
                }
432✔
1704
        }
1705
        blockMappingOutgoing := false
1,321✔
1706
        // Deny (non domain) JetStream API traffic unless system account is shared
1,321✔
1707
        // and domain names are identical and extending is not disabled
1,321✔
1708

1,321✔
1709
        // Check if backwards compatibility has been enabled and needs to be acted on
1,321✔
1710
        forceSysAccDeny := false
1,321✔
1711
        if len(opts.JsAccDefaultDomain) > 0 {
1,358✔
1712
                if acc == sysAcc {
48✔
1713
                        for _, d := range opts.JsAccDefaultDomain {
22✔
1714
                                if d == _EMPTY_ {
19✔
1715
                                        // Extending JetStream via leaf node is mutually exclusive with a domain mapping to the empty/default domain.
8✔
1716
                                        // As soon as one mapping to "" is found, disable the ability to extend JS via a leaf node.
8✔
1717
                                        c.Noticef("Not extending remote JetStream domain %q due to presence of empty default domain", myRemoteDomain)
8✔
1718
                                        forceSysAccDeny = true
8✔
1719
                                        break
8✔
1720
                                }
1721
                        }
1722
                } else if domain, ok := opts.JsAccDefaultDomain[accName]; ok && domain == _EMPTY_ {
41✔
1723
                        // for backwards compatibility with old setups that do not have a domain name set
15✔
1724
                        c.Debugf("Skipping deny %q for account %q due to default domain", jsAllAPI, accName)
15✔
1725
                        return
15✔
1726
                }
15✔
1727
        }
1728

1729
        // If the server has JS disabled, it may still be part of a JetStream that could be extended.
1730
        // This is either signaled by js being disabled and a domain set,
1731
        // or in cases where no domain name exists, an extension hint is set.
1732
        // However, this is only relevant in mixed setups.
1733
        //
1734
        // If the system account connects but default domains are present, JetStream can't be extended.
1735
        if opts.JetStreamDomain != myRemoteDomain || (!opts.JetStream && (opts.JetStreamDomain == _EMPTY_ && opts.JetStreamExtHint != jsWillExtend)) ||
1,306✔
1736
                sysAcc == nil || acc == nil || forceSysAccDeny {
2,454✔
1737
                // If domain names mismatch always deny. This applies to system accounts as well as non system accounts.
1,148✔
1738
                // Not having a system account, account or JetStream disabled is considered a mismatch as well.
1,148✔
1739
                if acc != nil && acc == sysAcc {
1,285✔
1740
                        c.Noticef("System account connected from %s", srvDecorated())
137✔
1741
                        c.Noticef("JetStream not extended, domains differ")
137✔
1742
                        c.mergeDenyPermissionsLocked(both, denyAllJs)
137✔
1743
                        // When a remote with a system account is present in a server, unless otherwise disabled, the server will be
137✔
1744
                        // started in observer mode. Now that it is clear that this not used, turn the observer mode off.
137✔
1745
                        if solicited && meta != nil && meta.IsObserver() {
166✔
1746
                                meta.setObserver(false, extNotExtended)
29✔
1747
                                c.Debugf("Turning JetStream metadata controller Observer Mode off")
29✔
1748
                                // Take note that the domain was not extended to avoid this state from startup.
29✔
1749
                                writePeerState(js.config.StoreDir, meta.currentPeerState())
29✔
1750
                                // Meta controller can't be leader yet.
29✔
1751
                                // Yet it is possible that due to observer mode every server already stopped campaigning.
29✔
1752
                                // Therefore this server needs to be kicked into campaigning gear explicitly.
29✔
1753
                                meta.Campaign()
29✔
1754
                        }
29✔
1755
                } else {
1,011✔
1756
                        c.Noticef("JetStream using domains: local %q, remote %q", opts.JetStreamDomain, myRemoteDomain)
1,011✔
1757
                        c.mergeDenyPermissionsLocked(both, denyAllClientJs)
1,011✔
1758
                }
1,011✔
1759
                blockMappingOutgoing = true
1,148✔
1760
        } else if acc == sysAcc {
230✔
1761
                // system account and same domain
72✔
1762
                s.sys.client.Noticef("Extending JetStream domain %q as System Account connected from server %s",
72✔
1763
                        myRemoteDomain, srvDecorated())
72✔
1764
                // In an extension use case, pin leadership to server remotes connect to.
72✔
1765
                // Therefore, server with a remote that are not already in observer mode, need to be put into it.
72✔
1766
                if solicited && meta != nil && !meta.IsObserver() {
76✔
1767
                        meta.setObserver(true, extExtended)
4✔
1768
                        c.Debugf("Turning JetStream metadata controller Observer Mode on - System Account Connected")
4✔
1769
                        // Take note that the domain was not extended to avoid this state next startup.
4✔
1770
                        writePeerState(js.config.StoreDir, meta.currentPeerState())
4✔
1771
                        // If this server is the leader already, step down so a new leader can be elected (that is not an observer)
4✔
1772
                        meta.StepDown()
4✔
1773
                }
4✔
1774
        } else {
86✔
1775
                // This deny is needed in all cases (system account shared or not)
86✔
1776
                // If the system account is shared, jsAllAPI traffic will go through the system account.
86✔
1777
                // So in order to prevent duplicate delivery (from system and actual account) suppress it on the account.
86✔
1778
                // If the system account is NOT shared, jsAllAPI traffic has no business
86✔
1779
                c.Debugf("Adding deny %+v for account %q", denyAllClientJs, accName)
86✔
1780
                c.mergeDenyPermissionsLocked(both, denyAllClientJs)
86✔
1781
        }
86✔
1782
        // If we have a specified JetStream domain we will want to add a mapping to
1783
        // allow access cross domain for each non-system account.
1784
        if opts.JetStreamDomain != _EMPTY_ && opts.JetStream && acc != nil && acc != sysAcc {
1,558✔
1785
                for src, dest := range generateJSMappingTable(opts.JetStreamDomain) {
2,520✔
1786
                        if err := acc.AddMapping(src, dest); err != nil {
2,268✔
1787
                                c.Debugf("Error adding JetStream domain mapping: %s", err.Error())
×
1788
                        } else {
2,268✔
1789
                                c.Debugf("Adding JetStream Domain Mapping %q -> %s to account %q", src, dest, accName)
2,268✔
1790
                        }
2,268✔
1791
                }
1792
                if blockMappingOutgoing {
473✔
1793
                        src := fmt.Sprintf(jsDomainAPI, opts.JetStreamDomain)
221✔
1794
                        // make sure that messages intended for this domain, do not leave the cluster via this leaf node connection
221✔
1795
                        // This is a guard against a miss-config with two identical domain names and will only cover some forms
221✔
1796
                        // of this issue, not all of them.
221✔
1797
                        // This guards against a hub and a spoke having the same domain name.
221✔
1798
                        // But not two spokes having the same one and the request coming from the hub.
221✔
1799
                        c.mergeDenyPermissionsLocked(pub, []string{src})
221✔
1800
                        c.Debugf("Adding deny %q for outgoing messages to account %q", src, accName)
221✔
1801
                }
221✔
1802
        }
1803
}
1804

1805
func (s *Server) removeLeafNodeConnection(c *client) {
1,664✔
1806
        c.mu.Lock()
1,664✔
1807
        cid := c.cid
1,664✔
1808
        if c.leaf != nil {
3,328✔
1809
                if c.leaf.tsubt != nil {
2,874✔
1810
                        c.leaf.tsubt.Stop()
1,210✔
1811
                        c.leaf.tsubt = nil
1,210✔
1812
                }
1,210✔
1813
                if c.leaf.gwSub != nil {
2,303✔
1814
                        s.gwLeafSubs.Remove(c.leaf.gwSub)
639✔
1815
                        // We need to set this to nil for GC to release the connection
639✔
1816
                        c.leaf.gwSub = nil
639✔
1817
                }
639✔
1818
        }
1819
        proxyKey := c.proxyKey
1,664✔
1820
        c.mu.Unlock()
1,664✔
1821
        s.mu.Lock()
1,664✔
1822
        delete(s.leafs, cid)
1,664✔
1823
        if proxyKey != _EMPTY_ {
1,668✔
1824
                s.removeProxiedConn(proxyKey, cid)
4✔
1825
        }
4✔
1826
        s.mu.Unlock()
1,664✔
1827
        s.removeFromTempClients(cid)
1,664✔
1828
}
1829

1830
// leafConnectInfo is the JSON payload of the CONNECT protocol sent by a
// soliciting leafnode and decoded by processLeafNodeConnect on the accept
// side. Every field is optional on the wire (omitempty).
type leafConnectInfo struct {
	// Version of the soliciting server; compared against the configured
	// LeafNode.MinVersion on the accept side.
	Version string `json:"version,omitempty"`

	// Authentication material. These fields are not interpreted by
	// processLeafNodeConnect itself.
	Nkey  string `json:"nkey,omitempty"`
	JWT   string `json:"jwt,omitempty"`
	Sig   string `json:"sig,omitempty"`
	User  string `json:"user,omitempty"`
	Pass  string `json:"pass,omitempty"`
	Token string `json:"auth_token,omitempty"`

	// Description of the soliciting server. Name, Cluster and Domain are
	// remembered on the connection; Hub set to true makes the accept side
	// take the spoke role; DenyPub is merged into the publish deny
	// permissions of the connection.
	ID        string   `json:"server_id,omitempty"`
	Domain    string   `json:"domain,omitempty"`
	Name      string   `json:"name,omitempty"`
	Hub       bool     `json:"is_hub,omitempty"`
	Cluster   string   `json:"cluster,omitempty"`
	Headers   bool     `json:"headers,omitempty"`
	JetStream bool     `json:"jetstream,omitempty"`
	DenyPub   []string `json:"deny_pub,omitempty"`

	// Compression mode. A boolean field tagged "compression" existed in the
	// past but was never used; since the mode has to be a string, a
	// different JSON tag is used to avoid clashing with it.
	Compression string `json:"compress_mode,omitempty"`

	// Gateway is only used to detect a gateway that wrongly connected to
	// the leafnode port.
	Gateway string `json:"gateway,omitempty"`

	// RemoteAccount tells the accept side which account the remote is
	// binding to.
	RemoteAccount string `json:"remote_account,omitempty"`

	// Proto is the protocol version. Unlike ROUTER and GATEWAY, the accept
	// side of a LEAF connection receives only CONNECT and no INFO, so the
	// protocol version (which indicates support for features such as
	// message tracing) travels in the CONNECT. The `protocol` JSON tag
	// makes it automatically unmarshal'ed by the low level CONNECT
	// processing.
	Proto int `json:"protocol,omitempty"`
}
1868

1869
// processLeafNodeConnect will process the inbound connect args.
1870
// Once we are here we are bound to an account, so can send any interest that
1871
// we would have to the other side.
1872
func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) error {
687✔
1873
        // Way to detect clients that incorrectly connect to the route listen
687✔
1874
        // port. Client provided "lang" in the CONNECT protocol while LEAFNODEs don't.
687✔
1875
        if lang != _EMPTY_ {
687✔
1876
                c.sendErrAndErr(ErrClientConnectedToLeafNodePort.Error())
×
1877
                c.closeConnection(WrongPort)
×
1878
                return ErrClientConnectedToLeafNodePort
×
1879
        }
×
1880

1881
        // Unmarshal as a leaf node connect protocol
1882
        proto := &leafConnectInfo{}
687✔
1883
        if err := json.Unmarshal(arg, proto); err != nil {
687✔
1884
                return err
×
1885
        }
×
1886

1887
        // Reject a cluster that contains spaces.
1888
        if proto.Cluster != _EMPTY_ && strings.Contains(proto.Cluster, " ") {
688✔
1889
                c.sendErrAndErr(ErrClusterNameHasSpaces.Error())
1✔
1890
                c.closeConnection(ProtocolViolation)
1✔
1891
                return ErrClusterNameHasSpaces
1✔
1892
        }
1✔
1893

1894
        // Check for cluster name collisions.
1895
        if cn := s.cachedClusterName(); cn != _EMPTY_ && proto.Cluster != _EMPTY_ && proto.Cluster == cn {
689✔
1896
                c.sendErrAndErr(ErrLeafNodeHasSameClusterName.Error())
3✔
1897
                c.closeConnection(ClusterNamesIdentical)
3✔
1898
                return ErrLeafNodeHasSameClusterName
3✔
1899
        }
3✔
1900

1901
        // Reject if this has Gateway which means that it would be from a gateway
1902
        // connection that incorrectly connects to the leafnode port.
1903
        if proto.Gateway != _EMPTY_ {
683✔
1904
                errTxt := fmt.Sprintf("Rejecting connection from gateway %q on the leafnode port", proto.Gateway)
×
1905
                c.Errorf(errTxt)
×
1906
                c.sendErr(errTxt)
×
1907
                c.closeConnection(WrongGateway)
×
1908
                return ErrWrongGateway
×
1909
        }
×
1910

1911
        if mv := s.getOpts().LeafNode.MinVersion; mv != _EMPTY_ {
685✔
1912
                major, minor, update, _ := versionComponents(mv)
2✔
1913
                if !versionAtLeast(proto.Version, major, minor, update) {
3✔
1914
                        // We are going to send back an INFO because otherwise recent
1✔
1915
                        // versions of the remote server would simply break the connection
1✔
1916
                        // after 2 seconds if not receiving it. Instead, we want the
1✔
1917
                        // other side to just "stall" until we finish waiting for the holding
1✔
1918
                        // period and close the connection below.
1✔
1919
                        s.sendPermsAndAccountInfo(c)
1✔
1920
                        c.sendErrAndErr(fmt.Sprintf("connection rejected since minimum version required is %q", mv))
1✔
1921
                        select {
1✔
1922
                        case <-c.srv.quitCh:
1✔
1923
                        case <-time.After(leafNodeWaitBeforeClose):
×
1924
                        }
1925
                        c.closeConnection(MinimumVersionRequired)
1✔
1926
                        return ErrMinimumVersionRequired
1✔
1927
                }
1928
        }
1929

1930
        // Check if this server supports headers.
1931
        supportHeaders := c.srv.supportsHeaders()
682✔
1932

682✔
1933
        c.mu.Lock()
682✔
1934
        // Leaf Nodes do not do echo or verbose or pedantic.
682✔
1935
        c.opts.Verbose = false
682✔
1936
        c.opts.Echo = false
682✔
1937
        c.opts.Pedantic = false
682✔
1938
        // This inbound connection will be marked as supporting headers if this server
682✔
1939
        // support headers and the remote has sent in the CONNECT protocol that it does
682✔
1940
        // support headers too.
682✔
1941
        c.headers = supportHeaders && proto.Headers
682✔
1942
        // If the compression level is still not set, set it based on what has been
682✔
1943
        // given to us in the CONNECT protocol.
682✔
1944
        if c.leaf.compression == _EMPTY_ {
817✔
1945
                // But if proto.Compression is _EMPTY_, set it to CompressionNotSupported
135✔
1946
                if proto.Compression == _EMPTY_ {
175✔
1947
                        c.leaf.compression = CompressionNotSupported
40✔
1948
                } else {
135✔
1949
                        c.leaf.compression = proto.Compression
95✔
1950
                }
95✔
1951
        }
1952

1953
        // Remember the remote server.
1954
        c.leaf.remoteServer = proto.Name
682✔
1955
        // Remember the remote account name
682✔
1956
        c.leaf.remoteAccName = proto.RemoteAccount
682✔
1957

682✔
1958
        // If the other side has declared itself a hub, so we will take on the spoke role.
682✔
1959
        if proto.Hub {
694✔
1960
                c.leaf.isSpoke = true
12✔
1961
        }
12✔
1962

1963
        // The soliciting side is part of a cluster.
1964
        if proto.Cluster != _EMPTY_ {
1,211✔
1965
                c.leaf.remoteCluster = proto.Cluster
529✔
1966
        }
529✔
1967

1968
        c.leaf.remoteDomain = proto.Domain
682✔
1969

682✔
1970
        // When a leaf solicits a connection to a hub, the perms that it will use on the soliciting leafnode's
682✔
1971
        // behalf are correct for them, but inside the hub need to be reversed since data is flowing in the opposite direction.
682✔
1972
        if !c.isSolicitedLeafNode() && c.perms != nil {
698✔
1973
                sp, pp := c.perms.sub, c.perms.pub
16✔
1974
                c.perms.sub, c.perms.pub = pp, sp
16✔
1975
                if c.opts.Import != nil {
31✔
1976
                        c.darray = c.opts.Import.Deny
15✔
1977
                } else {
16✔
1978
                        c.darray = nil
1✔
1979
                }
1✔
1980
        }
1981

1982
        // Set the Ping timer
1983
        c.setFirstPingTimer()
682✔
1984

682✔
1985
        // If we received pub deny permissions from the other end, merge with existing ones.
682✔
1986
        c.mergeDenyPermissions(pub, proto.DenyPub)
682✔
1987

682✔
1988
        acc := c.acc
682✔
1989
        c.mu.Unlock()
682✔
1990

682✔
1991
        // Register the cluster, even if empty, as long as we are acting as a hub.
682✔
1992
        if !proto.Hub {
1,352✔
1993
                acc.registerLeafNodeCluster(proto.Cluster)
670✔
1994
        }
670✔
1995

1996
        // Add in the leafnode here since we passed through auth at this point.
1997
        s.addLeafNodeConnection(c, proto.Name, proto.Cluster, true)
682✔
1998

682✔
1999
        // If we have permissions bound to this leafnode we need to send then back to the
682✔
2000
        // origin server for local enforcement.
682✔
2001
        s.sendPermsAndAccountInfo(c)
682✔
2002

682✔
2003
        // Create and initialize the smap since we know our bound account now.
682✔
2004
        // This will send all registered subs too.
682✔
2005
        s.initLeafNodeSmapAndSendSubs(c)
682✔
2006

682✔
2007
        // Announce the account connect event for a leaf node.
682✔
2008
        // This will be a no-op as needed.
682✔
2009
        s.sendLeafNodeConnect(c.acc)
682✔
2010

682✔
2011
        // Check to see if we need to kick any internal source or mirror consumers.
682✔
2012
        // This will be a no-op if JetStream not enabled for this server or if the bound account
682✔
2013
        // does not have jetstream.
682✔
2014
        s.checkInternalSyncConsumers(acc)
682✔
2015

682✔
2016
        return nil
682✔
2017
}
2018

2019
// checkInternalSyncConsumers
2020
func (s *Server) checkInternalSyncConsumers(acc *Account) {
2,107✔
2021
        // Grab our js
2,107✔
2022
        js := s.getJetStream()
2,107✔
2023

2,107✔
2024
        // Only applicable if we have JS and the leafnode has JS as well.
2,107✔
2025
        // We check for remote JS outside.
2,107✔
2026
        if !js.isEnabled() || acc == nil {
3,307✔
2027
                return
1,200✔
2028
        }
1,200✔
2029

2030
        // We will check all streams in our local account. They must be a leader and
2031
        // be sourcing or mirroring. We will check the external config on the stream itself
2032
        // if this is cross domain, or if the remote domain is empty, meaning we might be
2033
        // extedning the system across this leafnode connection and hence we would be extending
2034
        // our own domain.
2035
        jsa := js.lookupAccount(acc)
907✔
2036
        if jsa == nil {
1,261✔
2037
                return
354✔
2038
        }
354✔
2039

2040
        var streams []*stream
553✔
2041
        jsa.mu.RLock()
553✔
2042
        for _, mset := range jsa.streams {
609✔
2043
                mset.cfgMu.RLock()
56✔
2044
                // We need to have a mirror or source defined.
56✔
2045
                // We do not want to force another lock here to look for leader status,
56✔
2046
                // so collect and after we release jsa will make sure.
56✔
2047
                if mset.cfg.Mirror != nil || len(mset.cfg.Sources) > 0 {
68✔
2048
                        streams = append(streams, mset)
12✔
2049
                }
12✔
2050
                mset.cfgMu.RUnlock()
56✔
2051
        }
2052
        jsa.mu.RUnlock()
553✔
2053

553✔
2054
        // Now loop through all candidates and check if we are the leader and have NOT
553✔
2055
        // created the sync up consumer.
553✔
2056
        for _, mset := range streams {
565✔
2057
                mset.retryDisconnectedSyncConsumers()
12✔
2058
        }
12✔
2059
}
2060

2061
// Returns the remote cluster name. This is set only once so does not require a lock.
2062
func (c *client) remoteCluster() string {
194,777✔
2063
        if c.leaf == nil {
194,777✔
2064
                return _EMPTY_
×
2065
        }
×
2066
        return c.leaf.remoteCluster
194,777✔
2067
}
2068

2069
// Sends back an info block to the soliciting leafnode to let it know about
2070
// its permission settings for local enforcement.
2071
func (s *Server) sendPermsAndAccountInfo(c *client) {
683✔
2072
        // Copy
683✔
2073
        s.mu.Lock()
683✔
2074
        info := s.copyLeafNodeInfo()
683✔
2075
        s.mu.Unlock()
683✔
2076
        c.mu.Lock()
683✔
2077
        info.CID = c.cid
683✔
2078
        info.Import = c.opts.Import
683✔
2079
        info.Export = c.opts.Export
683✔
2080
        info.RemoteAccount = c.acc.Name
683✔
2081
        // s.SystemAccount() uses an atomic operation and does not get the server lock, so this is safe.
683✔
2082
        info.IsSystemAccount = c.acc == s.SystemAccount()
683✔
2083
        info.ConnectInfo = true
683✔
2084
        c.enqueueProto(generateInfoJSON(info))
683✔
2085
        c.mu.Unlock()
683✔
2086
}
683✔
2087

2088
// Snapshot the current subscriptions from the sublist into our smap which
2089
// we will keep updated from now on.
2090
// Also send the registered subscriptions.
2091
func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
1,321✔
2092
        acc := c.acc
1,321✔
2093
        if acc == nil {
1,321✔
2094
                c.Debugf("Leafnode does not have an account bound")
×
2095
                return
×
2096
        }
×
2097
        // Collect all account subs here.
2098
        _subs := [1024]*subscription{}
1,321✔
2099
        subs := _subs[:0]
1,321✔
2100
        ims := []string{}
1,321✔
2101

1,321✔
2102
        // Hold the client lock otherwise there can be a race and miss some subs.
1,321✔
2103
        c.mu.Lock()
1,321✔
2104
        defer c.mu.Unlock()
1,321✔
2105

1,321✔
2106
        acc.mu.RLock()
1,321✔
2107
        accName := acc.Name
1,321✔
2108
        accNTag := acc.nameTag
1,321✔
2109

1,321✔
2110
        // To make printing look better when no friendly name present.
1,321✔
2111
        if accNTag != _EMPTY_ {
1,332✔
2112
                accNTag = "/" + accNTag
11✔
2113
        }
11✔
2114

2115
        // If we are solicited we only send interest for local clients.
2116
        if c.isSpokeLeafNode() {
1,960✔
2117
                acc.sl.localSubs(&subs, true)
639✔
2118
        } else {
1,321✔
2119
                acc.sl.All(&subs)
682✔
2120
        }
682✔
2121

2122
        // Check if we have an existing service import reply.
2123
        siReply := copyBytes(acc.siReply)
1,321✔
2124

1,321✔
2125
        // Since leaf nodes only send on interest, if the bound
1,321✔
2126
        // account has import services we need to send those over.
1,321✔
2127
        for isubj := range acc.imports.services {
6,256✔
2128
                if c.isSpokeLeafNode() && !c.canSubscribe(isubj) {
5,223✔
2129
                        c.Debugf("Not permitted to import service %q on behalf of %s%s", isubj, accName, accNTag)
288✔
2130
                        continue
288✔
2131
                }
2132
                ims = append(ims, isubj)
4,647✔
2133
        }
2134
        // Likewise for mappings.
2135
        for _, m := range acc.mappings {
3,709✔
2136
                if c.isSpokeLeafNode() && !c.canSubscribe(m.src) {
2,424✔
2137
                        c.Debugf("Not permitted to import mapping %q on behalf of %s%s", m.src, accName, accNTag)
36✔
2138
                        continue
36✔
2139
                }
2140
                ims = append(ims, m.src)
2,352✔
2141
        }
2142

2143
        // Create a unique subject that will be used for loop detection.
2144
        lds := acc.lds
1,321✔
2145
        acc.mu.RUnlock()
1,321✔
2146

1,321✔
2147
        // Check if we have to create the LDS.
1,321✔
2148
        if lds == _EMPTY_ {
2,347✔
2149
                lds = leafNodeLoopDetectionSubjectPrefix + nuid.Next()
1,026✔
2150
                acc.mu.Lock()
1,026✔
2151
                acc.lds = lds
1,026✔
2152
                acc.mu.Unlock()
1,026✔
2153
        }
1,026✔
2154

2155
        // Now check for gateway interest. Leafnodes will put this into
2156
        // the proper mode to propagate, but they are not held in the account.
2157
        gwsa := [16]*client{}
1,321✔
2158
        gws := gwsa[:0]
1,321✔
2159
        s.getOutboundGatewayConnections(&gws)
1,321✔
2160
        for _, cgw := range gws {
1,403✔
2161
                cgw.mu.Lock()
82✔
2162
                gw := cgw.gw
82✔
2163
                cgw.mu.Unlock()
82✔
2164
                if gw != nil {
164✔
2165
                        if ei, _ := gw.outsim.Load(accName); ei != nil {
164✔
2166
                                if e := ei.(*outsie); e != nil && e.sl != nil {
164✔
2167
                                        e.sl.All(&subs)
82✔
2168
                                }
82✔
2169
                        }
2170
                }
2171
        }
2172

2173
        applyGlobalRouting := s.gateway.enabled
1,321✔
2174
        if c.isSpokeLeafNode() {
1,960✔
2175
                // Add a fake subscription for this solicited leafnode connection
639✔
2176
                // so that we can send back directly for mapped GW replies.
639✔
2177
                // We need to keep track of this subscription so it can be removed
639✔
2178
                // when the connection is closed so that the GC can release it.
639✔
2179
                c.leaf.gwSub = &subscription{client: c, subject: []byte(gwReplyPrefix + ">")}
639✔
2180
                c.srv.gwLeafSubs.Insert(c.leaf.gwSub)
639✔
2181
        }
639✔
2182

2183
        // Now walk the results and add them to our smap
2184
        rc := c.leaf.remoteCluster
1,321✔
2185
        c.leaf.smap = make(map[string]int32)
1,321✔
2186
        for _, sub := range subs {
39,393✔
2187
                // Check perms regardless of role.
38,072✔
2188
                if c.perms != nil && !c.canSubscribe(string(sub.subject)) {
40,464✔
2189
                        c.Debugf("Not permitted to subscribe to %q on behalf of %s%s", sub.subject, accName, accNTag)
2,392✔
2190
                        continue
2,392✔
2191
                }
2192
                // Don't advertise interest from leafnodes to other isolated leafnodes.
2193
                if sub.client.kind == LEAF && c.isIsolatedLeafNode() {
35,685✔
2194
                        continue
5✔
2195
                }
2196
                // We ignore ourselves here.
2197
                // Also don't add the subscription if it has a origin cluster and the
2198
                // cluster name matches the one of the client we are sending to.
2199
                if c != sub.client && (sub.origin == nil || (bytesToString(sub.origin) != rc)) {
66,029✔
2200
                        count := int32(1)
30,354✔
2201
                        if len(sub.queue) > 0 && sub.qw > 0 {
30,364✔
2202
                                count = sub.qw
10✔
2203
                        }
10✔
2204
                        c.leaf.smap[keyFromSub(sub)] += count
30,354✔
2205
                        if c.leaf.tsub == nil {
31,598✔
2206
                                c.leaf.tsub = make(map[*subscription]struct{})
1,244✔
2207
                        }
1,244✔
2208
                        c.leaf.tsub[sub] = struct{}{}
30,354✔
2209
                }
2210
        }
2211
        // FIXME(dlc) - We need to update appropriately on an account claims update.
2212
        for _, isubj := range ims {
8,320✔
2213
                c.leaf.smap[isubj]++
6,999✔
2214
        }
6,999✔
2215
        // If we have gateways enabled we need to make sure the other side sends us responses
2216
        // that have been augmented from the original subscription.
2217
        // TODO(dlc) - Should we lock this down more?
2218
        if applyGlobalRouting {
1,424✔
2219
                c.leaf.smap[oldGWReplyPrefix+"*.>"]++
103✔
2220
                c.leaf.smap[gwReplyPrefix+">"]++
103✔
2221
        }
103✔
2222
        // Detect loops by subscribing to a specific subject and checking
2223
        // if this sub is coming back to us.
2224
        c.leaf.smap[lds]++
1,321✔
2225

1,321✔
2226
        // Check if we need to add an existing siReply to our map.
1,321✔
2227
        // This will be a prefix so add on the wildcard.
1,321✔
2228
        if siReply != nil {
1,339✔
2229
                wcsub := append(siReply, '>')
18✔
2230
                c.leaf.smap[string(wcsub)]++
18✔
2231
        }
18✔
2232
        // Queue all protocols. There is no max pending limit for LN connection,
2233
        // so we don't need chunking. The writes will happen from the writeLoop.
2234
        var b bytes.Buffer
1,321✔
2235
        for key, n := range c.leaf.smap {
28,078✔
2236
                c.writeLeafSub(&b, key, n)
26,757✔
2237
        }
26,757✔
2238
        if b.Len() > 0 {
2,642✔
2239
                c.enqueueProto(b.Bytes())
1,321✔
2240
        }
1,321✔
2241
        if c.leaf.tsub != nil {
2,566✔
2242
                // Clear the tsub map after 5 seconds.
1,245✔
2243
                c.leaf.tsubt = time.AfterFunc(5*time.Second, func() {
1,280✔
2244
                        c.mu.Lock()
35✔
2245
                        if c.leaf != nil {
70✔
2246
                                c.leaf.tsub = nil
35✔
2247
                                c.leaf.tsubt = nil
35✔
2248
                        }
35✔
2249
                        c.mu.Unlock()
35✔
2250
                })
2251
        }
2252
}
2253

2254
// updateInterestForAccountOnGateway called from gateway code when processing RS+ and RS-.
2255
func (s *Server) updateInterestForAccountOnGateway(accName string, sub *subscription, delta int32) {
198,939✔
2256
        acc, err := s.LookupAccount(accName)
198,939✔
2257
        if acc == nil || err != nil {
199,063✔
2258
                s.Debugf("No or bad account for %q, failed to update interest from gateway", accName)
124✔
2259
                return
124✔
2260
        }
124✔
2261
        acc.updateLeafNodes(sub, delta)
198,815✔
2262
}
2263

2264
// updateLeafNodesEx will make sure to update the account smap for the subscription.
2265
// Will also forward to all leaf nodes as needed.
2266
// If `hubOnly` is true, then will update only leaf nodes that connect to this server
2267
// (that is, for which this server acts as a hub to them).
2268
func (acc *Account) updateLeafNodesEx(sub *subscription, delta int32, hubOnly bool) {
2,441,882✔
2269
        if acc == nil || sub == nil {
2,441,882✔
2270
                return
×
2271
        }
×
2272

2273
        // We will do checks for no leafnodes and same cluster here inline and under the
2274
        // general account read lock.
2275
        // If we feel we need to update the leafnodes we will do that out of line to avoid
2276
        // blocking routes or GWs.
2277

2278
        acc.mu.RLock()
2,441,882✔
2279
        // First check if we even have leafnodes here.
2,441,882✔
2280
        if acc.nleafs == 0 {
4,814,756✔
2281
                acc.mu.RUnlock()
2,372,874✔
2282
                return
2,372,874✔
2283
        }
2,372,874✔
2284

2285
        // Is this a loop detection subject.
2286
        isLDS := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))
69,008✔
2287

69,008✔
2288
        // Capture the cluster even if its empty.
69,008✔
2289
        var cluster string
69,008✔
2290
        if sub.origin != nil {
118,872✔
2291
                cluster = bytesToString(sub.origin)
49,864✔
2292
        }
49,864✔
2293

2294
        // If we have an isolated cluster we can return early, as long as it is not a loop detection subject.
2295
        // Empty clusters will return false for the check.
2296
        if !isLDS && acc.isLeafNodeClusterIsolated(cluster) {
89,926✔
2297
                acc.mu.RUnlock()
20,918✔
2298
                return
20,918✔
2299
        }
20,918✔
2300

2301
        // We can release the general account lock.
2302
        acc.mu.RUnlock()
48,090✔
2303

48,090✔
2304
        // We can hold the list lock here to avoid having to copy a large slice.
48,090✔
2305
        acc.lmu.RLock()
48,090✔
2306
        defer acc.lmu.RUnlock()
48,090✔
2307

48,090✔
2308
        // Do this once.
48,090✔
2309
        subject := string(sub.subject)
48,090✔
2310

48,090✔
2311
        // Walk the connected leafnodes.
48,090✔
2312
        for _, ln := range acc.lleafs {
107,497✔
2313
                if ln == sub.client {
90,199✔
2314
                        continue
30,792✔
2315
                }
2316
                ln.mu.Lock()
28,615✔
2317
                // Don't advertise interest from leafnodes to other isolated leafnodes.
28,615✔
2318
                if sub.client.kind == LEAF && ln.isIsolatedLeafNode() {
28,627✔
2319
                        ln.mu.Unlock()
12✔
2320
                        continue
12✔
2321
                }
2322
                // If `hubOnly` is true, it means that we want to update only leafnodes
2323
                // that connect to this server (so isHubLeafNode() would return `true`).
2324
                if hubOnly && !ln.isHubLeafNode() {
28,609✔
2325
                        ln.mu.Unlock()
6✔
2326
                        continue
6✔
2327
                }
2328
                // Check to make sure this sub does not have an origin cluster that matches the leafnode.
2329
                // If skipped, make sure that we still let go the "$LDS." subscription that allows
2330
                // the detection of loops as long as different cluster.
2331
                clusterDifferent := cluster != ln.remoteCluster()
28,597✔
2332
                if (isLDS && clusterDifferent) || ((cluster == _EMPTY_ || clusterDifferent) && (delta <= 0 || ln.canSubscribe(subject))) {
52,678✔
2333
                        ln.updateSmap(sub, delta, isLDS)
24,081✔
2334
                }
24,081✔
2335
                ln.mu.Unlock()
28,597✔
2336
        }
2337
}
2338

2339
// updateLeafNodes will make sure to update the account smap for the subscription.
2340
// Will also forward to all leaf nodes as needed.
2341
func (acc *Account) updateLeafNodes(sub *subscription, delta int32) {
2,441,859✔
2342
        acc.updateLeafNodesEx(sub, delta, false)
2,441,859✔
2343
}
2,441,859✔
2344

2345
// This will make an update to our internal smap and determine if we should send out
2346
// an interest update to the remote side.
2347
// Lock should be held.
2348
func (c *client) updateSmap(sub *subscription, delta int32, isLDS bool) {
24,081✔
2349
        if c.leaf.smap == nil {
24,102✔
2350
                return
21✔
2351
        }
21✔
2352

2353
        // If we are solicited make sure this is a local client or a non-solicited leaf node
2354
        skind := sub.client.kind
24,060✔
2355
        updateClient := skind == CLIENT || skind == SYSTEM || skind == JETSTREAM || skind == ACCOUNT
24,060✔
2356
        if !isLDS && c.isSpokeLeafNode() && !(updateClient || (skind == LEAF && !sub.client.isSpokeLeafNode())) {
32,420✔
2357
                return
8,360✔
2358
        }
8,360✔
2359

2360
        // For additions, check if that sub has just been processed during initLeafNodeSmapAndSendSubs
2361
        if delta > 0 && c.leaf.tsub != nil {
23,411✔
2362
                if _, present := c.leaf.tsub[sub]; present {
7,714✔
2363
                        delete(c.leaf.tsub, sub)
3✔
2364
                        if len(c.leaf.tsub) == 0 {
3✔
2365
                                c.leaf.tsub = nil
×
2366
                                c.leaf.tsubt.Stop()
×
2367
                                c.leaf.tsubt = nil
×
2368
                        }
×
2369
                        return
3✔
2370
                }
2371
        }
2372

2373
        key := keyFromSub(sub)
15,697✔
2374
        n, ok := c.leaf.smap[key]
15,697✔
2375
        if delta < 0 && !ok {
16,644✔
2376
                return
947✔
2377
        }
947✔
2378

2379
        // We will update if its a queue, if count is zero (or negative), or we were 0 and are N > 0.
2380
        update := sub.queue != nil || (n <= 0 && n+delta > 0) || (n > 0 && n+delta <= 0)
14,750✔
2381
        n += delta
14,750✔
2382
        if n > 0 {
25,810✔
2383
                c.leaf.smap[key] = n
11,060✔
2384
        } else {
14,750✔
2385
                delete(c.leaf.smap, key)
3,690✔
2386
        }
3,690✔
2387
        if update {
24,726✔
2388
                c.sendLeafNodeSubUpdate(key, n)
9,976✔
2389
        }
9,976✔
2390
}
2391

2392
// Used to force add subjects to the subject map.
2393
func (c *client) forceAddToSmap(subj string) {
4✔
2394
        c.mu.Lock()
4✔
2395
        defer c.mu.Unlock()
4✔
2396

4✔
2397
        if c.leaf.smap == nil {
4✔
2398
                return
×
2399
        }
×
2400
        n := c.leaf.smap[subj]
4✔
2401
        if n != 0 {
5✔
2402
                return
1✔
2403
        }
1✔
2404
        // Place into the map since it was not there.
2405
        c.leaf.smap[subj] = 1
3✔
2406
        c.sendLeafNodeSubUpdate(subj, 1)
3✔
2407
}
2408

2409
// Used to force remove a subject from the subject map.
2410
func (c *client) forceRemoveFromSmap(subj string) {
1✔
2411
        c.mu.Lock()
1✔
2412
        defer c.mu.Unlock()
1✔
2413

1✔
2414
        if c.leaf.smap == nil {
1✔
2415
                return
×
2416
        }
×
2417
        n := c.leaf.smap[subj]
1✔
2418
        if n == 0 {
1✔
2419
                return
×
2420
        }
×
2421
        n--
1✔
2422
        if n == 0 {
2✔
2423
                // Remove is now zero
1✔
2424
                delete(c.leaf.smap, subj)
1✔
2425
                c.sendLeafNodeSubUpdate(subj, 0)
1✔
2426
        } else {
1✔
2427
                c.leaf.smap[subj] = n
×
2428
        }
×
2429
}
2430

2431
// Send the subscription interest change to the other side.
2432
// Lock should be held.
2433
func (c *client) sendLeafNodeSubUpdate(key string, n int32) {
9,980✔
2434
        // If we are a spoke, we need to check if we are allowed to send this subscription over to the hub.
9,980✔
2435
        if c.isSpokeLeafNode() {
12,406✔
2436
                checkPerms := true
2,426✔
2437
                if len(key) > 0 && (key[0] == '$' || key[0] == '_') {
3,874✔
2438
                        if strings.HasPrefix(key, leafNodeLoopDetectionSubjectPrefix) ||
1,448✔
2439
                                strings.HasPrefix(key, oldGWReplyPrefix) ||
1,448✔
2440
                                strings.HasPrefix(key, gwReplyPrefix) {
1,540✔
2441
                                checkPerms = false
92✔
2442
                        }
92✔
2443
                }
2444
                if checkPerms {
4,760✔
2445
                        var subject string
2,334✔
2446
                        if sep := strings.IndexByte(key, ' '); sep != -1 {
2,827✔
2447
                                subject = key[:sep]
493✔
2448
                        } else {
2,334✔
2449
                                subject = key
1,841✔
2450
                        }
1,841✔
2451
                        if !c.canSubscribe(subject) {
2,334✔
2452
                                return
×
2453
                        }
×
2454
                }
2455
        }
2456
        // If we are here we can send over to the other side.
2457
        _b := [64]byte{}
9,980✔
2458
        b := bytes.NewBuffer(_b[:0])
9,980✔
2459
        c.writeLeafSub(b, key, n)
9,980✔
2460
        c.enqueueProto(b.Bytes())
9,980✔
2461
}
2462

2463
// Helper function to build the key.
2464
func keyFromSub(sub *subscription) string {
47,070✔
2465
        var sb strings.Builder
47,070✔
2466
        sb.Grow(len(sub.subject) + len(sub.queue) + 1)
47,070✔
2467
        sb.Write(sub.subject)
47,070✔
2468
        if sub.queue != nil {
50,876✔
2469
                // Just make the key subject spc group, e.g. 'foo bar'
3,806✔
2470
                sb.WriteByte(' ')
3,806✔
2471
                sb.Write(sub.queue)
3,806✔
2472
        }
3,806✔
2473
        return sb.String()
47,070✔
2474
}
2475

2476
// Leading markers for the keys produced by keyFromSubWithOrigin, each one
// declared both as a string and as a single character.
const (
	// Marker for a plain routed subscription.
	keyRoutedSub     = "R"
	keyRoutedSubByte = 'R'
	// Marker for a routed subscription made on behalf of a leafnode.
	keyRoutedLeafSub     = "L"
	keyRoutedLeafSubByte = 'L'
)
2482

2483
// Helper function to build the key that prevents collisions between normal
2484
// routed subscriptions and routed subscriptions on behalf of a leafnode.
2485
// Keys will look like this:
2486
// "R foo"          -> plain routed sub on "foo"
2487
// "R foo bar"      -> queue routed sub on "foo", queue "bar"
2488
// "L foo bar"      -> plain routed leaf sub on "foo", leaf "bar"
2489
// "L foo bar baz"  -> queue routed sub on "foo", queue "bar", leaf "baz"
2490
func keyFromSubWithOrigin(sub *subscription) string {
676,441✔
2491
        var sb strings.Builder
676,441✔
2492
        sb.Grow(2 + len(sub.origin) + 1 + len(sub.subject) + 1 + len(sub.queue))
676,441✔
2493
        leaf := len(sub.origin) > 0
676,441✔
2494
        if leaf {
693,231✔
2495
                sb.WriteByte(keyRoutedLeafSubByte)
16,790✔
2496
        } else {
676,441✔
2497
                sb.WriteByte(keyRoutedSubByte)
659,651✔
2498
        }
659,651✔
2499
        sb.WriteByte(' ')
676,441✔
2500
        sb.Write(sub.subject)
676,441✔
2501
        if sub.queue != nil {
700,058✔
2502
                sb.WriteByte(' ')
23,617✔
2503
                sb.Write(sub.queue)
23,617✔
2504
        }
23,617✔
2505
        if leaf {
693,231✔
2506
                sb.WriteByte(' ')
16,790✔
2507
                sb.Write(sub.origin)
16,790✔
2508
        }
16,790✔
2509
        return sb.String()
676,441✔
2510
}
2511

2512
// Lock should be held.
2513
func (c *client) writeLeafSub(w *bytes.Buffer, key string, n int32) {
36,737✔
2514
        if key == _EMPTY_ {
36,737✔
2515
                return
×
2516
        }
×
2517
        if n > 0 {
69,783✔
2518
                w.WriteString("LS+ " + key)
33,046✔
2519
                // Check for queue semantics, if found write n.
33,046✔
2520
                if strings.Contains(key, " ") {
35,354✔
2521
                        w.WriteString(" ")
2,308✔
2522
                        var b [12]byte
2,308✔
2523
                        var i = len(b)
2,308✔
2524
                        for l := n; l > 0; l /= 10 {
5,532✔
2525
                                i--
3,224✔
2526
                                b[i] = digits[l%10]
3,224✔
2527
                        }
3,224✔
2528
                        w.Write(b[i:])
2,308✔
2529
                        if c.trace {
2,308✔
2530
                                arg := fmt.Sprintf("%s %d", key, n)
×
2531
                                c.traceOutOp("LS+", []byte(arg))
×
2532
                        }
×
2533
                } else if c.trace {
30,934✔
2534
                        c.traceOutOp("LS+", []byte(key))
196✔
2535
                }
196✔
2536
        } else {
3,691✔
2537
                w.WriteString("LS- " + key)
3,691✔
2538
                if c.trace {
3,705✔
2539
                        c.traceOutOp("LS-", []byte(key))
14✔
2540
                }
14✔
2541
        }
2542
        w.WriteString(CR_LF)
36,737✔
2543
}
2544

2545
// processLeafSub will process an inbound sub request for the remote leaf node.
2546
func (c *client) processLeafSub(argo []byte) (err error) {
32,720✔
2547
        // Indicate activity.
32,720✔
2548
        c.in.subs++
32,720✔
2549

32,720✔
2550
        srv := c.srv
32,720✔
2551
        if srv == nil {
32,720✔
2552
                return nil
×
2553
        }
×
2554

2555
        // Copy so we do not reference a potentially large buffer
2556
        arg := make([]byte, len(argo))
32,720✔
2557
        copy(arg, argo)
32,720✔
2558

32,720✔
2559
        args := splitArg(arg)
32,720✔
2560
        sub := &subscription{client: c}
32,720✔
2561

32,720✔
2562
        delta := int32(1)
32,720✔
2563
        switch len(args) {
32,720✔
2564
        case 1:
30,477✔
2565
                sub.queue = nil
30,477✔
2566
        case 3:
2,242✔
2567
                sub.queue = args[1]
2,242✔
2568
                sub.qw = int32(parseSize(args[2]))
2,242✔
2569
                // TODO: (ik) We should have a non empty queue name and a queue
2,242✔
2570
                // weight >= 1. For 2.11, we may want to return an error if that
2,242✔
2571
                // is not the case, but for now just overwrite `delta` if queue
2,242✔
2572
                // weight is greater than 1 (it is possible after a reconnect/
2,242✔
2573
                // server restart to receive a queue weight > 1 for a new sub).
2,242✔
2574
                if sub.qw > 1 {
3,875✔
2575
                        delta = sub.qw
1,633✔
2576
                }
1,633✔
2577
        default:
1✔
2578
                return fmt.Errorf("processLeafSub Parse Error: '%s'", arg)
1✔
2579
        }
2580
        sub.subject = args[0]
32,719✔
2581

32,719✔
2582
        c.mu.Lock()
32,719✔
2583
        if c.isClosed() {
32,748✔
2584
                c.mu.Unlock()
29✔
2585
                return nil
29✔
2586
        }
29✔
2587

2588
        acc := c.acc
32,690✔
2589
        // Check if we have a loop.
32,690✔
2590
        ldsPrefix := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))
32,690✔
2591

32,690✔
2592
        if ldsPrefix && bytesToString(sub.subject) == acc.getLDSubject() {
32,696✔
2593
                c.mu.Unlock()
6✔
2594
                c.handleLeafNodeLoop(true)
6✔
2595
                return nil
6✔
2596
        }
6✔
2597

2598
        // Check permissions if applicable. (but exclude the $LDS, $GR and _GR_)
2599
        checkPerms := true
32,684✔
2600
        if sub.subject[0] == '$' || sub.subject[0] == '_' {
62,458✔
2601
                if ldsPrefix ||
29,774✔
2602
                        bytes.HasPrefix(sub.subject, []byte(oldGWReplyPrefix)) ||
29,774✔
2603
                        bytes.HasPrefix(sub.subject, []byte(gwReplyPrefix)) {
31,815✔
2604
                        checkPerms = false
2,041✔
2605
                }
2,041✔
2606
        }
2607

2608
        // If we are a hub check that we can publish to this subject.
2609
        if checkPerms {
63,327✔
2610
                subj := string(sub.subject)
30,643✔
2611
                if subjectIsLiteral(subj) && !c.pubAllowedFullCheck(subj, true, true) {
30,955✔
2612
                        c.mu.Unlock()
312✔
2613
                        c.leafSubPermViolation(sub.subject)
312✔
2614
                        c.Debugf(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject))
312✔
2615
                        return nil
312✔
2616
                }
312✔
2617
        }
2618

2619
        // Check if we have a maximum on the number of subscriptions.
2620
        if c.subsAtLimit() {
32,380✔
2621
                c.mu.Unlock()
8✔
2622
                c.maxSubsExceeded()
8✔
2623
                return nil
8✔
2624
        }
8✔
2625

2626
        // If we have an origin cluster associated mark that in the sub.
2627
        if rc := c.remoteCluster(); rc != _EMPTY_ {
61,212✔
2628
                sub.origin = []byte(rc)
28,848✔
2629
        }
28,848✔
2630

2631
        // Like Routes, we store local subs by account and subject and optionally queue name.
2632
        // If we have a queue it will have a trailing weight which we do not want.
2633
        if sub.queue != nil {
34,313✔
2634
                sub.sid = arg[:len(arg)-len(args[2])-1]
1,949✔
2635
        } else {
32,364✔
2636
                sub.sid = arg
30,415✔
2637
        }
30,415✔
2638
        key := bytesToString(sub.sid)
32,364✔
2639
        osub := c.subs[key]
32,364✔
2640
        if osub == nil {
63,238✔
2641
                c.subs[key] = sub
30,874✔
2642
                // Now place into the account sl.
30,874✔
2643
                if err := acc.sl.Insert(sub); err != nil {
30,874✔
2644
                        delete(c.subs, key)
×
2645
                        c.mu.Unlock()
×
2646
                        c.Errorf("Could not insert subscription: %v", err)
×
2647
                        c.sendErr("Invalid Subscription")
×
2648
                        return nil
×
2649
                }
×
2650
        } else if sub.queue != nil {
2,979✔
2651
                // For a queue we need to update the weight.
1,489✔
2652
                delta = sub.qw - atomic.LoadInt32(&osub.qw)
1,489✔
2653
                atomic.StoreInt32(&osub.qw, sub.qw)
1,489✔
2654
                acc.sl.UpdateRemoteQSub(osub)
1,489✔
2655
        }
1,489✔
2656
        spoke := c.isSpokeLeafNode()
32,364✔
2657
        c.mu.Unlock()
32,364✔
2658

32,364✔
2659
        // Only add in shadow subs if a new sub or qsub.
32,364✔
2660
        if osub == nil {
63,238✔
2661
                if err := c.addShadowSubscriptions(acc, sub, true); err != nil {
30,874✔
2662
                        c.Errorf(err.Error())
×
2663
                }
×
2664
        }
2665

2666
        // If we are not solicited, treat leaf node subscriptions similar to a
2667
        // client subscription, meaning we forward them to routes, gateways and
2668
        // other leaf nodes as needed.
2669
        if !spoke {
43,738✔
2670
                // If we are routing add to the route map for the associated account.
11,374✔
2671
                srv.updateRouteSubscriptionMap(acc, sub, delta)
11,374✔
2672
                if srv.gateway.enabled {
12,900✔
2673
                        srv.gatewayUpdateSubInterest(acc.Name, sub, delta)
1,526✔
2674
                }
1,526✔
2675
        }
2676
        // Now check on leafnode updates for other leaf nodes. We understand solicited
2677
        // and non-solicited state in this call so we will do the right thing.
2678
        acc.updateLeafNodes(sub, delta)
32,364✔
2679

32,364✔
2680
        return nil
32,364✔
2681
}
2682

2683
// If the leafnode is a solicited, set the connect delay based on default
2684
// or private option (for tests). Sends the error to the other side, log and
2685
// close the connection.
2686
func (c *client) handleLeafNodeLoop(sendErr bool) {
15✔
2687
        accName, delay := c.setLeafConnectDelayIfSoliciting(leafNodeReconnectDelayAfterLoopDetected)
15✔
2688
        errTxt := fmt.Sprintf("Loop detected for leafnode account=%q. Delaying attempt to reconnect for %v", accName, delay)
15✔
2689
        if sendErr {
23✔
2690
                c.sendErr(errTxt)
8✔
2691
        }
8✔
2692

2693
        c.Errorf(errTxt)
15✔
2694
        // If we are here with "sendErr" false, it means that this is the server
15✔
2695
        // that received the error. The other side will have closed the connection,
15✔
2696
        // but does not hurt to close here too.
15✔
2697
        c.closeConnection(ProtocolViolation)
15✔
2698
}
2699

2700
// processLeafUnsub will process an inbound unsub request for the remote leaf node.
2701
func (c *client) processLeafUnsub(arg []byte) error {
3,424✔
2702
        // Indicate any activity, so pub and sub or unsubs.
3,424✔
2703
        c.in.subs++
3,424✔
2704

3,424✔
2705
        acc := c.acc
3,424✔
2706
        srv := c.srv
3,424✔
2707

3,424✔
2708
        c.mu.Lock()
3,424✔
2709
        if c.isClosed() {
3,462✔
2710
                c.mu.Unlock()
38✔
2711
                return nil
38✔
2712
        }
38✔
2713

2714
        spoke := c.isSpokeLeafNode()
3,386✔
2715
        // We store local subs by account and subject and optionally queue name.
3,386✔
2716
        // LS- will have the arg exactly as the key.
3,386✔
2717
        sub, ok := c.subs[string(arg)]
3,386✔
2718
        if !ok {
3,397✔
2719
                // If not found, don't try to update routes/gws/leaf nodes.
11✔
2720
                c.mu.Unlock()
11✔
2721
                return nil
11✔
2722
        }
11✔
2723
        delta := int32(1)
3,375✔
2724
        if len(sub.queue) > 0 {
3,797✔
2725
                delta = sub.qw
422✔
2726
        }
422✔
2727
        c.mu.Unlock()
3,375✔
2728

3,375✔
2729
        c.unsubscribe(acc, sub, true, true)
3,375✔
2730
        if !spoke {
4,429✔
2731
                // If we are routing subtract from the route map for the associated account.
1,054✔
2732
                srv.updateRouteSubscriptionMap(acc, sub, -delta)
1,054✔
2733
                // Gateways
1,054✔
2734
                if srv.gateway.enabled {
1,341✔
2735
                        srv.gatewayUpdateSubInterest(acc.Name, sub, -delta)
287✔
2736
                }
287✔
2737
        }
2738
        // Now check on leafnode updates for other leaf nodes.
2739
        acc.updateLeafNodes(sub, -delta)
3,375✔
2740
        return nil
3,375✔
2741
}
2742

2743
func (c *client) processLeafHeaderMsgArgs(arg []byte) error {
480✔
2744
        // Unroll splitArgs to avoid runtime/heap issues
480✔
2745
        a := [MAX_MSG_ARGS][]byte{}
480✔
2746
        args := a[:0]
480✔
2747
        start := -1
480✔
2748
        for i, b := range arg {
31,652✔
2749
                switch b {
31,172✔
2750
                case ' ', '\t', '\r', '\n':
1,372✔
2751
                        if start >= 0 {
2,744✔
2752
                                args = append(args, arg[start:i])
1,372✔
2753
                                start = -1
1,372✔
2754
                        }
1,372✔
2755
                default:
29,800✔
2756
                        if start < 0 {
31,652✔
2757
                                start = i
1,852✔
2758
                        }
1,852✔
2759
                }
2760
        }
2761
        if start >= 0 {
960✔
2762
                args = append(args, arg[start:])
480✔
2763
        }
480✔
2764

2765
        c.pa.arg = arg
480✔
2766
        switch len(args) {
480✔
2767
        case 0, 1, 2:
×
2768
                return fmt.Errorf("processLeafHeaderMsgArgs Parse Error: '%s'", args)
×
2769
        case 3:
86✔
2770
                c.pa.reply = nil
86✔
2771
                c.pa.queues = nil
86✔
2772
                c.pa.hdb = args[1]
86✔
2773
                c.pa.hdr = parseSize(args[1])
86✔
2774
                c.pa.szb = args[2]
86✔
2775
                c.pa.size = parseSize(args[2])
86✔
2776
        case 4:
380✔
2777
                c.pa.reply = args[1]
380✔
2778
                c.pa.queues = nil
380✔
2779
                c.pa.hdb = args[2]
380✔
2780
                c.pa.hdr = parseSize(args[2])
380✔
2781
                c.pa.szb = args[3]
380✔
2782
                c.pa.size = parseSize(args[3])
380✔
2783
        default:
14✔
2784
                // args[1] is our reply indicator. Should be + or | normally.
14✔
2785
                if len(args[1]) != 1 {
14✔
2786
                        return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
2787
                }
×
2788
                switch args[1][0] {
14✔
2789
                case '+':
4✔
2790
                        c.pa.reply = args[2]
4✔
2791
                case '|':
10✔
2792
                        c.pa.reply = nil
10✔
2793
                default:
×
2794
                        return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
2795
                }
2796
                // Grab header size.
2797
                c.pa.hdb = args[len(args)-2]
14✔
2798
                c.pa.hdr = parseSize(c.pa.hdb)
14✔
2799

14✔
2800
                // Grab size.
14✔
2801
                c.pa.szb = args[len(args)-1]
14✔
2802
                c.pa.size = parseSize(c.pa.szb)
14✔
2803

14✔
2804
                // Grab queue names.
14✔
2805
                if c.pa.reply != nil {
18✔
2806
                        c.pa.queues = args[3 : len(args)-2]
4✔
2807
                } else {
14✔
2808
                        c.pa.queues = args[2 : len(args)-2]
10✔
2809
                }
10✔
2810
        }
2811
        if c.pa.hdr < 0 {
480✔
2812
                return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Header Size: '%s'", arg)
×
2813
        }
×
2814
        if c.pa.size < 0 {
480✔
2815
                return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Size: '%s'", args)
×
2816
        }
×
2817
        if c.pa.hdr > c.pa.size {
481✔
2818
                return fmt.Errorf("processLeafHeaderMsgArgs Header Size larger then TotalSize: '%s'", arg)
1✔
2819
        }
1✔
2820

2821
        // Common ones processed after check for arg length
2822
        c.pa.subject = args[0]
479✔
2823

479✔
2824
        return nil
479✔
2825
}
2826

2827
func (c *client) processLeafMsgArgs(arg []byte) error {
108,526✔
2828
        // Unroll splitArgs to avoid runtime/heap issues
108,526✔
2829
        a := [MAX_MSG_ARGS][]byte{}
108,526✔
2830
        args := a[:0]
108,526✔
2831
        start := -1
108,526✔
2832
        for i, b := range arg {
3,495,517✔
2833
                switch b {
3,386,991✔
2834
                case ' ', '\t', '\r', '\n':
160,229✔
2835
                        if start >= 0 {
320,458✔
2836
                                args = append(args, arg[start:i])
160,229✔
2837
                                start = -1
160,229✔
2838
                        }
160,229✔
2839
                default:
3,226,762✔
2840
                        if start < 0 {
3,495,517✔
2841
                                start = i
268,755✔
2842
                        }
268,755✔
2843
                }
2844
        }
2845
        if start >= 0 {
217,052✔
2846
                args = append(args, arg[start:])
108,526✔
2847
        }
108,526✔
2848

2849
        c.pa.arg = arg
108,526✔
2850
        switch len(args) {
108,526✔
2851
        case 0, 1:
×
2852
                return fmt.Errorf("processLeafMsgArgs Parse Error: '%s'", args)
×
2853
        case 2:
79,535✔
2854
                c.pa.reply = nil
79,535✔
2855
                c.pa.queues = nil
79,535✔
2856
                c.pa.szb = args[1]
79,535✔
2857
                c.pa.size = parseSize(args[1])
79,535✔
2858
        case 3:
6,441✔
2859
                c.pa.reply = args[1]
6,441✔
2860
                c.pa.queues = nil
6,441✔
2861
                c.pa.szb = args[2]
6,441✔
2862
                c.pa.size = parseSize(args[2])
6,441✔
2863
        default:
22,550✔
2864
                // args[1] is our reply indicator. Should be + or | normally.
22,550✔
2865
                if len(args[1]) != 1 {
22,551✔
2866
                        return fmt.Errorf("processLeafMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
1✔
2867
                }
1✔
2868
                switch args[1][0] {
22,549✔
2869
                case '+':
161✔
2870
                        c.pa.reply = args[2]
161✔
2871
                case '|':
22,388✔
2872
                        c.pa.reply = nil
22,388✔
2873
                default:
×
2874
                        return fmt.Errorf("processLeafMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
2875
                }
2876
                // Grab size.
2877
                c.pa.szb = args[len(args)-1]
22,549✔
2878
                c.pa.size = parseSize(c.pa.szb)
22,549✔
2879

22,549✔
2880
                // Grab queue names.
22,549✔
2881
                if c.pa.reply != nil {
22,710✔
2882
                        c.pa.queues = args[3 : len(args)-1]
161✔
2883
                } else {
22,549✔
2884
                        c.pa.queues = args[2 : len(args)-1]
22,388✔
2885
                }
22,388✔
2886
        }
2887
        if c.pa.size < 0 {
108,525✔
2888
                return fmt.Errorf("processLeafMsgArgs Bad or Missing Size: '%s'", args)
×
2889
        }
×
2890

2891
        // Common ones processed after check for arg length
2892
        c.pa.subject = args[0]
108,525✔
2893

108,525✔
2894
        return nil
108,525✔
2895
}
2896

2897
// processInboundLeafMsg is called to process an inbound msg from a leaf node.
2898
func (c *client) processInboundLeafMsg(msg []byte) {
106,934✔
2899
        // Update statistics
106,934✔
2900
        // The msg includes the CR_LF, so pull back out for accounting.
106,934✔
2901
        c.in.msgs++
106,934✔
2902
        c.in.bytes += int32(len(msg) - LEN_CR_LF)
106,934✔
2903

106,934✔
2904
        srv, acc, subject := c.srv, c.acc, string(c.pa.subject)
106,934✔
2905

106,934✔
2906
        // Mostly under testing scenarios.
106,934✔
2907
        if srv == nil || acc == nil {
106,936✔
2908
                return
2✔
2909
        }
2✔
2910

2911
        // Match the subscriptions. We will use our own L1 map if
2912
        // it's still valid, avoiding contention on the shared sublist.
2913
        var r *SublistResult
106,932✔
2914
        var ok bool
106,932✔
2915

106,932✔
2916
        genid := atomic.LoadUint64(&c.acc.sl.genid)
106,932✔
2917
        if genid == c.in.genid && c.in.results != nil {
211,476✔
2918
                r, ok = c.in.results[subject]
104,544✔
2919
        } else {
106,932✔
2920
                // Reset our L1 completely.
2,388✔
2921
                c.in.results = make(map[string]*SublistResult)
2,388✔
2922
                c.in.genid = genid
2,388✔
2923
        }
2,388✔
2924

2925
        // Go back to the sublist data structure.
2926
        if !ok {
183,481✔
2927
                r = c.acc.sl.Match(subject)
76,549✔
2928
                // Prune the results cache. Keeps us from unbounded growth. Random delete.
76,549✔
2929
                if len(c.in.results) >= maxResultCacheSize {
78,691✔
2930
                        n := 0
2,142✔
2931
                        for subj := range c.in.results {
72,828✔
2932
                                delete(c.in.results, subj)
70,686✔
2933
                                if n++; n > pruneSize {
72,828✔
2934
                                        break
2,142✔
2935
                                }
2936
                        }
2937
                }
2938
                // Then add the new cache entry.
2939
                c.in.results[subject] = r
76,549✔
2940
        }
2941

2942
        // Collect queue names if needed.
2943
        var qnames [][]byte
106,932✔
2944

106,932✔
2945
        // Check for no interest, short circuit if so.
106,932✔
2946
        // This is the fanout scale.
106,932✔
2947
        if len(r.psubs)+len(r.qsubs) > 0 {
213,549✔
2948
                flag := pmrNoFlag
106,617✔
2949
                // If we have queue subs in this cluster, then if we run in gateway
106,617✔
2950
                // mode and the remote gateways have queue subs, then we need to
106,617✔
2951
                // collect the queue groups this message was sent to so that we
106,617✔
2952
                // exclude them when sending to gateways.
106,617✔
2953
                if len(r.qsubs) > 0 && c.srv.gateway.enabled &&
106,617✔
2954
                        atomic.LoadInt64(&c.srv.gateway.totalQSubs) > 0 {
118,840✔
2955
                        flag |= pmrCollectQueueNames
12,223✔
2956
                }
12,223✔
2957
                // If this is a mapped subject that means the mapped interest
2958
                // is what got us here, but this might not have a queue designation
2959
                // If that is the case, make sure we ignore to process local queue subscribers.
2960
                if len(c.pa.mapped) > 0 && len(c.pa.queues) == 0 {
106,934✔
2961
                        flag |= pmrIgnoreEmptyQueueFilter
317✔
2962
                }
317✔
2963
                _, qnames = c.processMsgResults(acc, r, msg, nil, c.pa.subject, c.pa.reply, flag)
106,617✔
2964
        }
2965

2966
        // Now deal with gateways
2967
        if c.srv.gateway.enabled {
120,167✔
2968
                c.sendMsgToGateways(acc, msg, c.pa.subject, c.pa.reply, qnames, true)
13,235✔
2969
        }
13,235✔
2970
}
2971

2972
// Handles a subscription permission violation.
2973
// See leafPermViolation() for details.
2974
func (c *client) leafSubPermViolation(subj []byte) {
312✔
2975
        c.leafPermViolation(false, subj)
312✔
2976
}
312✔
2977

2978
// Common function to process publish or subscribe leafnode permission violation.
2979
// Sends the permission violation error to the remote, logs it and closes the connection.
2980
// If this is from a server soliciting, the reconnection will be delayed.
2981
func (c *client) leafPermViolation(pub bool, subj []byte) {
312✔
2982
        if c.isSpokeLeafNode() {
624✔
2983
                // For spokes these are no-ops since the hub server told us our permissions.
312✔
2984
                // We just need to not send these over to the other side since we will get cutoff.
312✔
2985
                return
312✔
2986
        }
312✔
2987
        // FIXME(dlc) ?
2988
        c.setLeafConnectDelayIfSoliciting(leafNodeReconnectAfterPermViolation)
×
2989
        var action string
×
2990
        if pub {
×
2991
                c.sendErr(fmt.Sprintf("Permissions Violation for Publish to %q", subj))
×
2992
                action = "Publish"
×
2993
        } else {
×
2994
                c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q", subj))
×
2995
                action = "Subscription"
×
2996
        }
×
2997
        c.Errorf("%s Violation on %q - Check other side configuration", action, subj)
×
2998
        // TODO: add a new close reason that is more appropriate?
×
2999
        c.closeConnection(ProtocolViolation)
×
3000
}
3001

3002
// Invoked from generic processErr() for LEAF connections.
3003
func (c *client) leafProcessErr(errStr string) {
46✔
3004
        // Check if we got a cluster name collision.
46✔
3005
        if strings.Contains(errStr, ErrLeafNodeHasSameClusterName.Error()) {
49✔
3006
                _, delay := c.setLeafConnectDelayIfSoliciting(leafNodeReconnectDelayAfterClusterNameSame)
3✔
3007
                c.Errorf("Leafnode connection dropped with same cluster name error. Delaying attempt to reconnect for %v", delay)
3✔
3008
                return
3✔
3009
        }
3✔
3010

3011
        // We will look for Loop detected error coming from the other side.
3012
        // If we solicit, set the connect delay.
3013
        if !strings.Contains(errStr, "Loop detected") {
79✔
3014
                return
36✔
3015
        }
36✔
3016
        c.handleLeafNodeLoop(false)
7✔
3017
}
3018

3019
// If this leaf connection solicits, sets the connect delay to the given value,
3020
// or the one from the server option's LeafNode.connDelay if one is set (for tests).
3021
// Returns the connection's account name and delay.
3022
func (c *client) setLeafConnectDelayIfSoliciting(delay time.Duration) (string, time.Duration) {
18✔
3023
        c.mu.Lock()
18✔
3024
        if c.isSolicitedLeafNode() {
29✔
3025
                if s := c.srv; s != nil {
22✔
3026
                        if srvdelay := s.getOpts().LeafNode.connDelay; srvdelay != 0 {
15✔
3027
                                delay = srvdelay
4✔
3028
                        }
4✔
3029
                }
3030
                c.leaf.remote.setConnectDelay(delay)
11✔
3031
        }
3032
        accName := c.acc.Name
18✔
3033
        c.mu.Unlock()
18✔
3034
        return accName, delay
18✔
3035
}
3036

3037
// For the given remote Leafnode configuration, this function returns
3038
// if TLS is required, and if so, will return a clone of the TLS Config
3039
// (since some fields will be changed during handshake), the TLS server
3040
// name that is remembered, and the TLS timeout.
3041
func (c *client) leafNodeGetTLSConfigForSolicit(remote *leafNodeCfg) (bool, *tls.Config, string, float64) {
1,936✔
3042
        var (
1,936✔
3043
                tlsConfig  *tls.Config
1,936✔
3044
                tlsName    string
1,936✔
3045
                tlsTimeout float64
1,936✔
3046
        )
1,936✔
3047

1,936✔
3048
        remote.RLock()
1,936✔
3049
        defer remote.RUnlock()
1,936✔
3050

1,936✔
3051
        tlsRequired := remote.TLS || remote.TLSConfig != nil
1,936✔
3052
        if tlsRequired {
2,017✔
3053
                if remote.TLSConfig != nil {
132✔
3054
                        tlsConfig = remote.TLSConfig.Clone()
51✔
3055
                } else {
81✔
3056
                        tlsConfig = &tls.Config{MinVersion: tls.VersionTLS12}
30✔
3057
                }
30✔
3058
                tlsName = remote.tlsName
81✔
3059
                tlsTimeout = remote.TLSTimeout
81✔
3060
                if tlsTimeout == 0 {
128✔
3061
                        tlsTimeout = float64(TLS_TIMEOUT / time.Second)
47✔
3062
                }
47✔
3063
        }
3064

3065
        return tlsRequired, tlsConfig, tlsName, tlsTimeout
1,936✔
3066
}
3067

3068
// Initiates the LeafNode Websocket connection by:
3069
// - doing the TLS handshake if needed
3070
// - sending the HTTP request
3071
// - waiting for the HTTP response
3072
//
3073
// Since some bufio reader is used to consume the HTTP response, this function
3074
// returns the slice of buffered bytes (if any) so that the readLoop that will
3075
// be started after that consume those first before reading from the socket.
3076
// The boolean
3077
//
3078
// Lock held on entry.
3079
func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remote *leafNodeCfg) ([]byte, ClosedState, error) {
43✔
3080
        remote.RLock()
43✔
3081
        compress := remote.Websocket.Compression
43✔
3082
        // By default the server will mask outbound frames, but it can be disabled with this option.
43✔
3083
        noMasking := remote.Websocket.NoMasking
43✔
3084
        infoTimeout := remote.FirstInfoTimeout
43✔
3085
        remote.RUnlock()
43✔
3086
        // Will do the client-side TLS handshake if needed.
43✔
3087
        tlsRequired, err := c.leafClientHandshakeIfNeeded(remote, opts)
43✔
3088
        if err != nil {
47✔
3089
                // 0 will indicate that the connection was already closed
4✔
3090
                return nil, 0, err
4✔
3091
        }
4✔
3092

3093
        // For http request, we need the passed URL to contain either http or https scheme.
3094
        scheme := "http"
39✔
3095
        if tlsRequired {
47✔
3096
                scheme = "https"
8✔
3097
        }
8✔
3098
        // We will use the `/leafnode` path to tell the accepting WS server that it should
3099
        // create a LEAF connection, not a CLIENT.
3100
        // In case we use the user's URL path in the future, make sure we append the user's
3101
        // path to our `/leafnode` path.
3102
        lpath := leafNodeWSPath
39✔
3103
        if curPath := rURL.EscapedPath(); curPath != _EMPTY_ {
60✔
3104
                if curPath[0] == '/' {
42✔
3105
                        curPath = curPath[1:]
21✔
3106
                }
21✔
3107
                lpath = path.Join(curPath, lpath)
21✔
3108
        } else {
18✔
3109
                lpath = lpath[1:]
18✔
3110
        }
18✔
3111
        ustr := fmt.Sprintf("%s://%s/%s", scheme, rURL.Host, lpath)
39✔
3112
        u, _ := url.Parse(ustr)
39✔
3113
        req := &http.Request{
39✔
3114
                Method:     "GET",
39✔
3115
                URL:        u,
39✔
3116
                Proto:      "HTTP/1.1",
39✔
3117
                ProtoMajor: 1,
39✔
3118
                ProtoMinor: 1,
39✔
3119
                Header:     make(http.Header),
39✔
3120
                Host:       u.Host,
39✔
3121
        }
39✔
3122
        wsKey, err := wsMakeChallengeKey()
39✔
3123
        if err != nil {
39✔
3124
                return nil, WriteError, err
×
3125
        }
×
3126

3127
        req.Header["Upgrade"] = []string{"websocket"}
39✔
3128
        req.Header["Connection"] = []string{"Upgrade"}
39✔
3129
        req.Header["Sec-WebSocket-Key"] = []string{wsKey}
39✔
3130
        req.Header["Sec-WebSocket-Version"] = []string{"13"}
39✔
3131
        if compress {
48✔
3132
                req.Header.Add("Sec-WebSocket-Extensions", wsPMCReqHeaderValue)
9✔
3133
        }
9✔
3134
        if noMasking {
49✔
3135
                req.Header.Add(wsNoMaskingHeader, wsNoMaskingValue)
10✔
3136
        }
10✔
3137
        c.nc.SetDeadline(time.Now().Add(infoTimeout))
39✔
3138
        if err := req.Write(c.nc); err != nil {
39✔
3139
                return nil, WriteError, err
×
3140
        }
×
3141

3142
        var resp *http.Response
39✔
3143

39✔
3144
        br := bufio.NewReaderSize(c.nc, MAX_CONTROL_LINE_SIZE)
39✔
3145
        resp, err = http.ReadResponse(br, req)
39✔
3146
        if err == nil &&
39✔
3147
                (resp.StatusCode != 101 ||
39✔
3148
                        !strings.EqualFold(resp.Header.Get("Upgrade"), "websocket") ||
39✔
3149
                        !strings.EqualFold(resp.Header.Get("Connection"), "upgrade") ||
39✔
3150
                        resp.Header.Get("Sec-Websocket-Accept") != wsAcceptKey(wsKey)) {
40✔
3151

1✔
3152
                err = fmt.Errorf("invalid websocket connection")
1✔
3153
        }
1✔
3154
        // Check compression extension...
3155
        if err == nil && c.ws.compress {
48✔
3156
                // Check that not only permessage-deflate extension is present, but that
9✔
3157
                // we also have server and client no context take over.
9✔
3158
                srvCompress, noCtxTakeover := wsPMCExtensionSupport(resp.Header, false)
9✔
3159

9✔
3160
                // If server does not support compression, then simply disable it in our side.
9✔
3161
                if !srvCompress {
13✔
3162
                        c.ws.compress = false
4✔
3163
                } else if !noCtxTakeover {
9✔
3164
                        err = fmt.Errorf("compression negotiation error")
×
3165
                }
×
3166
        }
3167
        // Same for no masking...
3168
        if err == nil && noMasking {
49✔
3169
                // Check if server accepts no masking
10✔
3170
                if resp.Header.Get(wsNoMaskingHeader) != wsNoMaskingValue {
11✔
3171
                        // Nope, need to mask our writes as any client would do.
1✔
3172
                        c.ws.maskwrite = true
1✔
3173
                }
1✔
3174
        }
3175
        if resp != nil {
67✔
3176
                resp.Body.Close()
28✔
3177
        }
28✔
3178
        if err != nil {
51✔
3179
                return nil, ReadError, err
12✔
3180
        }
12✔
3181
        c.Debugf("Leafnode compression=%v masking=%v", c.ws.compress, c.ws.maskwrite)
27✔
3182

27✔
3183
        var preBuf []byte
27✔
3184
        // We have to slurp whatever is in the bufio reader and pass that to the readloop.
27✔
3185
        if n := br.Buffered(); n != 0 {
27✔
3186
                preBuf, _ = br.Peek(n)
×
3187
        }
×
3188
        return preBuf, 0, nil
27✔
3189
}
3190

3191
// connectProcessTimeout is the delay used for the timer armed in
// leafNodeResumeConnectProcess: if the connect process has not been marked
// as finished when the timer fires, the leafnode connection is closed as stale.
const connectProcessTimeout = 2 * time.Second
3192

3193
// This is invoked for remote LEAF remote connections after processing the INFO
3194
// protocol.
3195
func (s *Server) leafNodeResumeConnectProcess(c *client) {
677✔
3196
        clusterName := s.ClusterName()
677✔
3197

677✔
3198
        c.mu.Lock()
677✔
3199
        if c.isClosed() {
677✔
3200
                c.mu.Unlock()
×
3201
                return
×
3202
        }
×
3203
        if err := c.sendLeafConnect(clusterName, c.headers); err != nil {
679✔
3204
                c.mu.Unlock()
2✔
3205
                c.closeConnection(WriteError)
2✔
3206
                return
2✔
3207
        }
2✔
3208

3209
        // Spin up the write loop.
3210
        s.startGoRoutine(func() { c.writeLoop() })
1,350✔
3211

3212
        // timeout leafNodeFinishConnectProcess
3213
        c.ping.tmr = time.AfterFunc(connectProcessTimeout, func() {
675✔
3214
                c.mu.Lock()
×
3215
                // check if leafNodeFinishConnectProcess was called and prevent later leafNodeFinishConnectProcess
×
3216
                if !c.flags.setIfNotSet(connectProcessFinished) {
×
3217
                        c.mu.Unlock()
×
3218
                        return
×
3219
                }
×
3220
                clearTimer(&c.ping.tmr)
×
3221
                closed := c.isClosed()
×
3222
                c.mu.Unlock()
×
3223
                if !closed {
×
3224
                        c.sendErrAndDebug("Stale Leaf Node Connection - Closing")
×
3225
                        c.closeConnection(StaleConnection)
×
3226
                }
×
3227
        })
3228
        c.mu.Unlock()
675✔
3229
        c.Debugf("Remote leafnode connect msg sent")
675✔
3230
}
3231

3232
// This is invoked for remote LEAF connections after processing the INFO
3233
// protocol and leafNodeResumeConnectProcess.
3234
// This will send LS+ the CONNECT protocol and register the leaf node.
3235
func (s *Server) leafNodeFinishConnectProcess(c *client) {
641✔
3236
        c.mu.Lock()
641✔
3237
        if !c.flags.setIfNotSet(connectProcessFinished) {
641✔
3238
                c.mu.Unlock()
×
3239
                return
×
3240
        }
×
3241
        if c.isClosed() {
641✔
3242
                c.mu.Unlock()
×
3243
                s.removeLeafNodeConnection(c)
×
3244
                return
×
3245
        }
×
3246
        remote := c.leaf.remote
641✔
3247
        // Check if we will need to send the system connect event.
641✔
3248
        remote.RLock()
641✔
3249
        sendSysConnectEvent := remote.Hub
641✔
3250
        remote.RUnlock()
641✔
3251

641✔
3252
        // Capture account before releasing lock
641✔
3253
        acc := c.acc
641✔
3254
        // cancel connectProcessTimeout
641✔
3255
        clearTimer(&c.ping.tmr)
641✔
3256
        c.mu.Unlock()
641✔
3257

641✔
3258
        // Make sure we register with the account here.
641✔
3259
        if err := c.registerWithAccount(acc); err != nil {
643✔
3260
                if err == ErrTooManyAccountConnections {
2✔
3261
                        c.maxAccountConnExceeded()
×
3262
                        return
×
3263
                } else if err == ErrLeafNodeLoop {
4✔
3264
                        c.handleLeafNodeLoop(true)
2✔
3265
                        return
2✔
3266
                }
2✔
3267
                c.Errorf("Registering leaf with account %s resulted in error: %v", acc.Name, err)
×
3268
                c.closeConnection(ProtocolViolation)
×
3269
                return
×
3270
        }
3271
        s.addLeafNodeConnection(c, _EMPTY_, _EMPTY_, false)
639✔
3272
        s.initLeafNodeSmapAndSendSubs(c)
639✔
3273
        if sendSysConnectEvent {
651✔
3274
                s.sendLeafNodeConnect(acc)
12✔
3275
        }
12✔
3276

3277
        // The above functions are not atomically under the client
3278
        // lock doing those operations. It is possible - since we
3279
        // have started the read/write loops - that the connection
3280
        // is closed before or in between. This would leave the
3281
        // closed LN connection possible registered with the account
3282
        // and/or the server's leafs map. So check if connection
3283
        // is closed, and if so, manually cleanup.
3284
        c.mu.Lock()
639✔
3285
        closed := c.isClosed()
639✔
3286
        if !closed {
1,278✔
3287
                c.setFirstPingTimer()
639✔
3288
        }
639✔
3289
        c.mu.Unlock()
639✔
3290
        if closed {
639✔
3291
                s.removeLeafNodeConnection(c)
×
3292
                if prev := acc.removeClient(c); prev == 1 {
×
3293
                        s.decActiveAccounts()
×
3294
                }
×
3295
        }
3296
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc