• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 16822237274

07 Aug 2025 11:29AM UTC coverage: 85.93% (+0.04%) from 85.894%
16822237274

push

github

web-flow
Update to Go 1.24.6/1.23.12, nats.go v1.44.0 (#7156)

Signed-off-by: Neil Twigg <neil@nats.io>

71724 of 83468 relevant lines covered (85.93%)

335469.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.92
/server/leafnode.go
1
// Copyright 2019-2025 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bufio"
18
        "bytes"
19
        "crypto/tls"
20
        "encoding/base64"
21
        "encoding/json"
22
        "fmt"
23
        "io"
24
        "math/rand"
25
        "net"
26
        "net/http"
27
        "net/url"
28
        "os"
29
        "path"
30
        "reflect"
31
        "regexp"
32
        "runtime"
33
        "strconv"
34
        "strings"
35
        "sync"
36
        "sync/atomic"
37
        "time"
38

39
        "github.com/klauspost/compress/s2"
40
        "github.com/nats-io/jwt/v2"
41
        "github.com/nats-io/nkeys"
42
        "github.com/nats-io/nuid"
43
)
44

45
// Fixed strings and timing values used by the leafnode implementation.
const (
	// Logged as a warning when the user configures leafnode TLS as insecure.
	leafnodeTLSInsecureWarning = "TLS certificate chain and hostname of solicited leafnodes will not be verified. DO NOT USE IN PRODUCTION!"

	// Reconnect delay applied to a solicited connection once a loop
	// has been detected.
	leafNodeReconnectDelayAfterLoopDetected = time.Second * 30

	// After a message causes a permission violation the connection is
	// closed, and no reconnect is attempted for this long.
	leafNodeReconnectAfterPermViolation = time.Second * 30

	// Reconnect delay applied when our cluster name is the same as the hub's.
	leafNodeReconnectDelayAfterClusterNameSame = time.Second * 30

	// Subject prefix used for loop detection.
	leafNodeLoopDetectionSubjectPrefix = "$LDS."

	// URL path that tells a websocket server the connection is a LEAF
	// connection rather than a CLIENT one.
	leafNodeWSPath = "/leafnode"

	// When a received CONNECT does not meet the required minimum version,
	// the server waits this long before closing the connection.
	leafNodeWaitBeforeClose = time.Second * 5
)
70

71
type leaf struct {
72
        // We have any auth stuff here for solicited connections.
73
        remote *leafNodeCfg
74
        // isSpoke tells us what role we are playing.
75
        // Used when we receive a connection but otherside tells us they are a hub.
76
        isSpoke bool
77
        // remoteCluster is when we are a hub but the spoke leafnode is part of a cluster.
78
        remoteCluster string
79
        // remoteServer holds onto the remove server's name or ID.
80
        remoteServer string
81
        // domain name of remote server
82
        remoteDomain string
83
        // account name of remote server
84
        remoteAccName string
85
        // Used to suppress sub and unsub interest. Same as routes but our audience
86
        // here is tied to this leaf node. This will hold all subscriptions except this
87
        // leaf nodes. This represents all the interest we want to send to the other side.
88
        smap map[string]int32
89
        // This map will contain all the subscriptions that have been added to the smap
90
        // during initLeafNodeSmapAndSendSubs. It is short lived and is there to avoid
91
        // race between processing of a sub where sub is added to account sublist but
92
        // updateSmap has not be called on that "thread", while in the LN readloop,
93
        // when processing CONNECT, initLeafNodeSmapAndSendSubs is invoked and add
94
        // this subscription to smap. When processing of the sub then calls updateSmap,
95
        // we would add it a second time in the smap causing later unsub to suppress the LS-.
96
        tsub  map[*subscription]struct{}
97
        tsubt *time.Timer
98
        // Selected compression mode, which may be different from the server configured mode.
99
        compression string
100
        // This is for GW map replies.
101
        gwSub *subscription
102
}
103

104
// Used for remote (solicited) leafnodes.
105
type leafNodeCfg struct {
106
        sync.RWMutex
107
        *RemoteLeafOpts
108
        urls           []*url.URL
109
        curURL         *url.URL
110
        tlsName        string
111
        username       string
112
        password       string
113
        perms          *Permissions
114
        connDelay      time.Duration // Delay before a connect, could be used while detecting loop condition, etc..
115
        jsMigrateTimer *time.Timer
116
}
117

118
// Check to see if this is a solicited leafnode. We do special processing for solicited.
119
func (c *client) isSolicitedLeafNode() bool {
2,009✔
120
        return c.kind == LEAF && c.leaf.remote != nil
2,009✔
121
}
2,009✔
122

123
// Returns true if this is a solicited leafnode and is not configured to be treated as a hub or a receiving
124
// connection leafnode where the otherside has declared itself to be the hub.
125
func (c *client) isSpokeLeafNode() bool {
5,710,743✔
126
        return c.kind == LEAF && c.leaf.isSpoke
5,710,743✔
127
}
5,710,743✔
128

129
func (c *client) isHubLeafNode() bool {
18,652✔
130
        return c.kind == LEAF && !c.leaf.isSpoke
18,652✔
131
}
18,652✔
132

133
// This will spin up go routines to solicit the remote leaf node connections.
134
func (s *Server) solicitLeafNodeRemotes(remotes []*RemoteLeafOpts) {
549✔
135
        sysAccName := _EMPTY_
549✔
136
        sAcc := s.SystemAccount()
549✔
137
        if sAcc != nil {
1,075✔
138
                sysAccName = sAcc.Name
526✔
139
        }
526✔
140
        addRemote := func(r *RemoteLeafOpts, isSysAccRemote bool) *leafNodeCfg {
1,238✔
141
                s.mu.Lock()
689✔
142
                remote := newLeafNodeCfg(r)
689✔
143
                creds := remote.Credentials
689✔
144
                accName := remote.LocalAccount
689✔
145
                s.leafRemoteCfgs = append(s.leafRemoteCfgs, remote)
689✔
146
                // Print notice if
689✔
147
                if isSysAccRemote {
786✔
148
                        if len(remote.DenyExports) > 0 {
98✔
149
                                s.Noticef("Remote for System Account uses restricted export permissions")
1✔
150
                        }
1✔
151
                        if len(remote.DenyImports) > 0 {
98✔
152
                                s.Noticef("Remote for System Account uses restricted import permissions")
1✔
153
                        }
1✔
154
                }
155
                s.mu.Unlock()
689✔
156
                if creds != _EMPTY_ {
738✔
157
                        contents, err := os.ReadFile(creds)
49✔
158
                        defer wipeSlice(contents)
49✔
159
                        if err != nil {
49✔
160
                                s.Errorf("Error reading LeafNode Remote Credentials file %q: %v", creds, err)
×
161
                        } else if items := credsRe.FindAllSubmatch(contents, -1); len(items) < 2 {
49✔
162
                                s.Errorf("LeafNode Remote Credentials file %q malformed", creds)
×
163
                        } else if _, err := nkeys.FromSeed(items[1][1]); err != nil {
49✔
164
                                s.Errorf("LeafNode Remote Credentials file %q has malformed seed", creds)
×
165
                        } else if uc, err := jwt.DecodeUserClaims(string(items[0][1])); err != nil {
49✔
166
                                s.Errorf("LeafNode Remote Credentials file %q has malformed user jwt", creds)
×
167
                        } else if isSysAccRemote {
53✔
168
                                if !uc.Permissions.Pub.Empty() || !uc.Permissions.Sub.Empty() || uc.Permissions.Resp != nil {
5✔
169
                                        s.Noticef("LeafNode Remote for System Account uses credentials file %q with restricted permissions", creds)
1✔
170
                                }
1✔
171
                        } else {
45✔
172
                                if !uc.Permissions.Pub.Empty() || !uc.Permissions.Sub.Empty() || uc.Permissions.Resp != nil {
51✔
173
                                        s.Noticef("LeafNode Remote for Account %s uses credentials file %q with restricted permissions", accName, creds)
6✔
174
                                }
6✔
175
                        }
176
                }
177
                return remote
689✔
178
        }
179
        for _, r := range remotes {
1,238✔
180
                // We need to call this, even if the leaf is disabled. This is so that
689✔
181
                // the number of internal configuration matches the options' remote leaf
689✔
182
                // configuration required for configuration reload.
689✔
183
                remote := addRemote(r, r.LocalAccount == sysAccName)
689✔
184
                if !r.Disabled {
1,377✔
185
                        s.startGoRoutine(func() { s.connectToRemoteLeafNode(remote, true) })
1,376✔
186
                }
187
        }
188
}
189

190
func (s *Server) remoteLeafNodeStillValid(remote *leafNodeCfg) bool {
5,248✔
191
        if remote.Disabled {
5,249✔
192
                return false
1✔
193
        }
1✔
194
        for _, ri := range s.getOpts().LeafNode.Remotes {
10,856✔
195
                // FIXME(dlc) - What about auth changes?
5,609✔
196
                if reflect.DeepEqual(ri.URLs, remote.URLs) {
10,856✔
197
                        return true
5,247✔
198
                }
5,247✔
199
        }
200
        return false
×
201
}
202

203
// Ensure that leafnode is properly configured.
204
func validateLeafNode(o *Options) error {
8,472✔
205
        if err := validateLeafNodeAuthOptions(o); err != nil {
8,474✔
206
                return err
2✔
207
        }
2✔
208

209
        // Users can bind to any local account, if its empty we will assume the $G account.
210
        for _, r := range o.LeafNode.Remotes {
9,192✔
211
                if r.LocalAccount == _EMPTY_ {
1,159✔
212
                        r.LocalAccount = globalAccountName
437✔
213
                }
437✔
214
        }
215

216
        // In local config mode, check that leafnode configuration refers to accounts that exist.
217
        if len(o.TrustedOperators) == 0 {
16,624✔
218
                accNames := map[string]struct{}{}
8,154✔
219
                for _, a := range o.Accounts {
17,117✔
220
                        accNames[a.Name] = struct{}{}
8,963✔
221
                }
8,963✔
222
                // global account is always created
223
                accNames[DEFAULT_GLOBAL_ACCOUNT] = struct{}{}
8,154✔
224
                // in the context of leaf nodes, empty account means global account
8,154✔
225
                accNames[_EMPTY_] = struct{}{}
8,154✔
226
                // system account either exists or, if not disabled, will be created
8,154✔
227
                if o.SystemAccount == _EMPTY_ && !o.NoSystemAccount {
14,614✔
228
                        accNames[DEFAULT_SYSTEM_ACCOUNT] = struct{}{}
6,460✔
229
                }
6,460✔
230
                checkAccountExists := func(accName string, cfgType string) error {
17,036✔
231
                        if _, ok := accNames[accName]; !ok {
8,884✔
232
                                return fmt.Errorf("cannot find local account %q specified in leafnode %s", accName, cfgType)
2✔
233
                        }
2✔
234
                        return nil
8,880✔
235
                }
236
                if err := checkAccountExists(o.LeafNode.Account, "authorization"); err != nil {
8,155✔
237
                        return err
1✔
238
                }
1✔
239
                for _, lu := range o.LeafNode.Users {
8,163✔
240
                        if lu.Account == nil { // means global account
13✔
241
                                continue
3✔
242
                        }
243
                        if err := checkAccountExists(lu.Account.Name, "authorization"); err != nil {
7✔
244
                                return err
×
245
                        }
×
246
                }
247
                for _, r := range o.LeafNode.Remotes {
8,874✔
248
                        if err := checkAccountExists(r.LocalAccount, "remote"); err != nil {
722✔
249
                                return err
1✔
250
                        }
1✔
251
                }
252
        } else {
316✔
253
                if len(o.LeafNode.Users) != 0 {
317✔
254
                        return fmt.Errorf("operator mode does not allow specifying users in leafnode config")
1✔
255
                }
1✔
256
                for _, r := range o.LeafNode.Remotes {
316✔
257
                        if !nkeys.IsValidPublicAccountKey(r.LocalAccount) {
2✔
258
                                return fmt.Errorf(
1✔
259
                                        "operator mode requires account nkeys in remotes. " +
1✔
260
                                                "Please add an `account` key to each remote in your `leafnodes` section, to assign it to an account. " +
1✔
261
                                                "Each account value should be a 56 character public key, starting with the letter 'A'")
1✔
262
                        }
1✔
263
                }
264
                if o.LeafNode.Port != 0 && o.LeafNode.Account != "" && !nkeys.IsValidPublicAccountKey(o.LeafNode.Account) {
315✔
265
                        return fmt.Errorf("operator mode and non account nkeys are incompatible")
1✔
266
                }
1✔
267
        }
268

269
        // Validate compression settings
270
        if o.LeafNode.Compression.Mode != _EMPTY_ {
12,391✔
271
                if err := validateAndNormalizeCompressionOption(&o.LeafNode.Compression, CompressionS2Auto); err != nil {
3,931✔
272
                        return err
5✔
273
                }
5✔
274
        }
275

276
        // If a remote has a websocket scheme, all need to have it.
277
        for _, rcfg := range o.LeafNode.Remotes {
9,180✔
278
                if len(rcfg.URLs) >= 2 {
934✔
279
                        firstIsWS, ok := isWSURL(rcfg.URLs[0]), true
214✔
280
                        for i := 1; i < len(rcfg.URLs); i++ {
673✔
281
                                u := rcfg.URLs[i]
459✔
282
                                if isWS := isWSURL(u); isWS && !firstIsWS || !isWS && firstIsWS {
466✔
283
                                        ok = false
7✔
284
                                        break
7✔
285
                                }
286
                        }
287
                        if !ok {
221✔
288
                                return fmt.Errorf("remote leaf node configuration cannot have a mix of websocket and non-websocket urls: %q", redactURLList(rcfg.URLs))
7✔
289
                        }
7✔
290
                }
291
                // Validate compression settings
292
                if rcfg.Compression.Mode != _EMPTY_ {
1,426✔
293
                        if err := validateAndNormalizeCompressionOption(&rcfg.Compression, CompressionS2Auto); err != nil {
718✔
294
                                return err
5✔
295
                        }
5✔
296
                }
297
        }
298

299
        if o.LeafNode.Port == 0 {
13,573✔
300
                return nil
5,125✔
301
        }
5,125✔
302

303
        // If MinVersion is defined, check that it is valid.
304
        if mv := o.LeafNode.MinVersion; mv != _EMPTY_ {
3,327✔
305
                if err := checkLeafMinVersionConfig(mv); err != nil {
6✔
306
                        return err
2✔
307
                }
2✔
308
        }
309

310
        // The checks below will be done only when detecting that we are configured
311
        // with gateways. So if an option validation needs to be done regardless,
312
        // it MUST be done before this point!
313

314
        if o.Gateway.Name == _EMPTY_ && o.Gateway.Port == 0 {
5,958✔
315
                return nil
2,637✔
316
        }
2,637✔
317
        // If we are here we have both leaf nodes and gateways defined, make sure there
318
        // is a system account defined.
319
        if o.SystemAccount == _EMPTY_ {
685✔
320
                return fmt.Errorf("leaf nodes and gateways (both being defined) require a system account to also be configured")
1✔
321
        }
1✔
322
        if err := validatePinnedCerts(o.LeafNode.TLSPinnedCerts); err != nil {
683✔
323
                return fmt.Errorf("leafnode: %v", err)
×
324
        }
×
325
        return nil
683✔
326
}
327

328
func checkLeafMinVersionConfig(mv string) error {
8✔
329
        if ok, err := versionAtLeastCheckError(mv, 2, 8, 0); !ok || err != nil {
12✔
330
                if err != nil {
6✔
331
                        return fmt.Errorf("invalid leafnode's minimum version: %v", err)
2✔
332
                } else {
4✔
333
                        return fmt.Errorf("the minimum version should be at least 2.8.0")
2✔
334
                }
2✔
335
        }
336
        return nil
4✔
337
}
338

339
// Used to validate user names in LeafNode configuration.
340
// - rejects mix of single and multiple users.
341
// - rejects duplicate user names.
342
func validateLeafNodeAuthOptions(o *Options) error {
8,521✔
343
        if len(o.LeafNode.Users) == 0 {
17,022✔
344
                return nil
8,501✔
345
        }
8,501✔
346
        if o.LeafNode.Username != _EMPTY_ {
22✔
347
                return fmt.Errorf("can not have a single user/pass and a users array")
2✔
348
        }
2✔
349
        if o.LeafNode.Nkey != _EMPTY_ {
18✔
350
                return fmt.Errorf("can not have a single nkey and a users array")
×
351
        }
×
352
        users := map[string]struct{}{}
18✔
353
        for _, u := range o.LeafNode.Users {
42✔
354
                if _, exists := users[u.Username]; exists {
26✔
355
                        return fmt.Errorf("duplicate user %q detected in leafnode authorization", u.Username)
2✔
356
                }
2✔
357
                users[u.Username] = struct{}{}
22✔
358
        }
359
        return nil
16✔
360
}
361

362
// Update remote LeafNode TLS configurations after a config reload.
363
func (s *Server) updateRemoteLeafNodesTLSConfig(opts *Options) {
15✔
364
        max := len(opts.LeafNode.Remotes)
15✔
365
        if max == 0 {
15✔
366
                return
×
367
        }
×
368

369
        s.mu.RLock()
15✔
370
        defer s.mu.RUnlock()
15✔
371

15✔
372
        // Changes in the list of remote leaf nodes is not supported.
15✔
373
        // However, make sure that we don't go over the arrays.
15✔
374
        if len(s.leafRemoteCfgs) < max {
15✔
375
                max = len(s.leafRemoteCfgs)
×
376
        }
×
377
        for i := 0; i < max; i++ {
34✔
378
                ro := opts.LeafNode.Remotes[i]
19✔
379
                cfg := s.leafRemoteCfgs[i]
19✔
380
                if ro.TLSConfig != nil {
21✔
381
                        cfg.Lock()
2✔
382
                        cfg.TLSConfig = ro.TLSConfig.Clone()
2✔
383
                        cfg.TLSHandshakeFirst = ro.TLSHandshakeFirst
2✔
384
                        cfg.Unlock()
2✔
385
                }
2✔
386
        }
387
}
388

389
func (s *Server) reConnectToRemoteLeafNode(remote *leafNodeCfg) {
231✔
390
        delay := s.getOpts().LeafNode.ReconnectInterval
231✔
391
        select {
231✔
392
        case <-time.After(delay):
178✔
393
        case <-s.quitCh:
53✔
394
                s.grWG.Done()
53✔
395
                return
53✔
396
        }
397
        s.connectToRemoteLeafNode(remote, false)
178✔
398
}
399

400
// Creates a leafNodeCfg object that wraps the RemoteLeafOpts.
401
func newLeafNodeCfg(remote *RemoteLeafOpts) *leafNodeCfg {
689✔
402
        cfg := &leafNodeCfg{
689✔
403
                RemoteLeafOpts: remote,
689✔
404
                urls:           make([]*url.URL, 0, len(remote.URLs)),
689✔
405
        }
689✔
406
        if len(remote.DenyExports) > 0 || len(remote.DenyImports) > 0 {
697✔
407
                perms := &Permissions{}
8✔
408
                if len(remote.DenyExports) > 0 {
16✔
409
                        perms.Publish = &SubjectPermission{Deny: remote.DenyExports}
8✔
410
                }
8✔
411
                if len(remote.DenyImports) > 0 {
15✔
412
                        perms.Subscribe = &SubjectPermission{Deny: remote.DenyImports}
7✔
413
                }
7✔
414
                cfg.perms = perms
8✔
415
        }
416
        // Start with the one that is configured. We will add to this
417
        // array when receiving async leafnode INFOs.
418
        cfg.urls = append(cfg.urls, cfg.URLs...)
689✔
419
        // If allowed to randomize, do it on our copy of URLs
689✔
420
        if !remote.NoRandomize {
1,376✔
421
                rand.Shuffle(len(cfg.urls), func(i, j int) {
1,103✔
422
                        cfg.urls[i], cfg.urls[j] = cfg.urls[j], cfg.urls[i]
416✔
423
                })
416✔
424
        }
425
        // If we are TLS make sure we save off a proper servername if possible.
426
        // Do same for user/password since we may need them to connect to
427
        // a bare URL that we get from INFO protocol.
428
        for _, u := range cfg.urls {
1,824✔
429
                cfg.saveTLSHostname(u)
1,135✔
430
                cfg.saveUserPassword(u)
1,135✔
431
                // If the url(s) have the "wss://" scheme, and we don't have a TLS
1,135✔
432
                // config, mark that we should be using TLS anyway.
1,135✔
433
                if !cfg.TLS && isWSSURL(u) {
1,136✔
434
                        cfg.TLS = true
1✔
435
                }
1✔
436
        }
437
        return cfg
689✔
438
}
439

440
// Will pick an URL from the list of available URLs.
441
func (cfg *leafNodeCfg) pickNextURL() *url.URL {
4,485✔
442
        cfg.Lock()
4,485✔
443
        defer cfg.Unlock()
4,485✔
444
        // If the current URL is the first in the list and we have more than
4,485✔
445
        // one URL, then move that one to end of the list.
4,485✔
446
        if cfg.curURL != nil && len(cfg.urls) > 1 && urlsAreEqual(cfg.curURL, cfg.urls[0]) {
8,075✔
447
                first := cfg.urls[0]
3,590✔
448
                copy(cfg.urls, cfg.urls[1:])
3,590✔
449
                cfg.urls[len(cfg.urls)-1] = first
3,590✔
450
        }
3,590✔
451
        cfg.curURL = cfg.urls[0]
4,485✔
452
        return cfg.curURL
4,485✔
453
}
454

455
// Returns the current URL
456
func (cfg *leafNodeCfg) getCurrentURL() *url.URL {
78✔
457
        cfg.RLock()
78✔
458
        defer cfg.RUnlock()
78✔
459
        return cfg.curURL
78✔
460
}
78✔
461

462
// Returns how long the server should wait before attempting
463
// to solicit a remote leafnode connection.
464
func (cfg *leafNodeCfg) getConnectDelay() time.Duration {
868✔
465
        cfg.RLock()
868✔
466
        delay := cfg.connDelay
868✔
467
        cfg.RUnlock()
868✔
468
        return delay
868✔
469
}
868✔
470

471
// Sets the connect delay.
472
func (cfg *leafNodeCfg) setConnectDelay(delay time.Duration) {
155✔
473
        cfg.Lock()
155✔
474
        cfg.connDelay = delay
155✔
475
        cfg.Unlock()
155✔
476
}
155✔
477

478
// Ensure that non-exported options (used in tests) have
479
// been properly set.
480
func (s *Server) setLeafNodeNonExportedOptions() {
6,939✔
481
        opts := s.getOpts()
6,939✔
482
        s.leafNodeOpts.dialTimeout = opts.LeafNode.dialTimeout
6,939✔
483
        if s.leafNodeOpts.dialTimeout == 0 {
13,877✔
484
                // Use same timeouts as routes for now.
6,938✔
485
                s.leafNodeOpts.dialTimeout = DEFAULT_ROUTE_DIAL
6,938✔
486
        }
6,938✔
487
        s.leafNodeOpts.resolver = opts.LeafNode.resolver
6,939✔
488
        if s.leafNodeOpts.resolver == nil {
13,874✔
489
                s.leafNodeOpts.resolver = net.DefaultResolver
6,935✔
490
        }
6,935✔
491
}
492

493
// sharedSysAccDelay is the connect delay applied to the first connect of a
// remote bound to the system account when the server is not standalone.
const sharedSysAccDelay = time.Millisecond * 250
494

495
func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg, firstConnect bool) {
868✔
496
        defer s.grWG.Done()
868✔
497

868✔
498
        if remote == nil || len(remote.URLs) == 0 {
868✔
499
                s.Debugf("Empty remote leafnode definition, nothing to connect")
×
500
                return
×
501
        }
×
502

503
        opts := s.getOpts()
868✔
504
        reconnectDelay := opts.LeafNode.ReconnectInterval
868✔
505
        s.mu.RLock()
868✔
506
        dialTimeout := s.leafNodeOpts.dialTimeout
868✔
507
        resolver := s.leafNodeOpts.resolver
868✔
508
        var isSysAcc bool
868✔
509
        if s.eventsEnabled() {
1,704✔
510
                isSysAcc = remote.LocalAccount == s.sys.account.Name
836✔
511
        }
836✔
512
        jetstreamMigrateDelay := remote.JetStreamClusterMigrateDelay
868✔
513
        s.mu.RUnlock()
868✔
514

868✔
515
        // If we are sharing a system account and we are not standalone delay to gather some info prior.
868✔
516
        if firstConnect && isSysAcc && !s.standAloneMode() {
942✔
517
                s.Debugf("Will delay first leafnode connect to shared system account due to clustering")
74✔
518
                remote.setConnectDelay(sharedSysAccDelay)
74✔
519
        }
74✔
520

521
        if connDelay := remote.getConnectDelay(); connDelay > 0 {
949✔
522
                select {
81✔
523
                case <-time.After(connDelay):
70✔
524
                case <-s.quitCh:
11✔
525
                        return
11✔
526
                }
527
                remote.setConnectDelay(0)
70✔
528
        }
529

530
        var conn net.Conn
857✔
531

857✔
532
        const connErrFmt = "Error trying to connect as leafnode to remote server %q (attempt %v): %v"
857✔
533

857✔
534
        attempts := 0
857✔
535

857✔
536
        for s.isRunning() && s.remoteLeafNodeStillValid(remote) {
5,342✔
537
                rURL := remote.pickNextURL()
4,485✔
538
                url, err := s.getRandomIP(resolver, rURL.Host, nil)
4,485✔
539
                if err == nil {
8,963✔
540
                        var ipStr string
4,478✔
541
                        if url != rURL.Host {
4,552✔
542
                                ipStr = fmt.Sprintf(" (%s)", url)
74✔
543
                        }
74✔
544
                        // Some test may want to disable remotes from connecting
545
                        if s.isLeafConnectDisabled() {
4,611✔
546
                                s.Debugf("Will not attempt to connect to remote server on %q%s, leafnodes currently disabled", rURL.Host, ipStr)
133✔
547
                                err = ErrLeafNodeDisabled
133✔
548
                        } else {
4,478✔
549
                                s.Debugf("Trying to connect as leafnode to remote server on %q%s", rURL.Host, ipStr)
4,345✔
550
                                conn, err = natsDialTimeout("tcp", url, dialTimeout)
4,345✔
551
                        }
4,345✔
552
                }
553
                if err != nil {
8,208✔
554
                        jitter := time.Duration(rand.Int63n(int64(reconnectDelay)))
3,723✔
555
                        delay := reconnectDelay + jitter
3,723✔
556
                        attempts++
3,723✔
557
                        if s.shouldReportConnectErr(firstConnect, attempts) {
7,428✔
558
                                s.Errorf(connErrFmt, rURL.Host, attempts, err)
3,705✔
559
                        } else {
3,723✔
560
                                s.Debugf(connErrFmt, rURL.Host, attempts, err)
18✔
561
                        }
18✔
562
                        remote.Lock()
3,723✔
563
                        // if we are using a delay to start migrating assets, kick off a migrate timer.
3,723✔
564
                        if remote.jsMigrateTimer == nil && jetstreamMigrateDelay > 0 {
3,731✔
565
                                remote.jsMigrateTimer = time.AfterFunc(jetstreamMigrateDelay, func() {
16✔
566
                                        s.checkJetStreamMigrate(remote)
8✔
567
                                })
8✔
568
                        }
569
                        remote.Unlock()
3,723✔
570
                        select {
3,723✔
571
                        case <-s.quitCh:
90✔
572
                                remote.cancelMigrateTimer()
90✔
573
                                return
90✔
574
                        case <-time.After(delay):
3,632✔
575
                                // Check if we should migrate any JetStream assets immediately while this remote is down.
3,632✔
576
                                // This will be used if JetStreamClusterMigrateDelay was not set
3,632✔
577
                                if jetstreamMigrateDelay == 0 {
7,189✔
578
                                        s.checkJetStreamMigrate(remote)
3,557✔
579
                                }
3,557✔
580
                                continue
3,632✔
581
                        }
582
                }
583
                remote.cancelMigrateTimer()
762✔
584
                if !s.remoteLeafNodeStillValid(remote) {
762✔
585
                        conn.Close()
×
586
                        return
×
587
                }
×
588

589
                // We have a connection here to a remote server.
590
                // Go ahead and create our leaf node and return.
591
                s.createLeafNode(conn, rURL, remote, nil)
762✔
592

762✔
593
                // Clear any observer states if we had them.
762✔
594
                s.clearObserverState(remote)
762✔
595

762✔
596
                return
762✔
597
        }
598
}
599

600
func (cfg *leafNodeCfg) cancelMigrateTimer() {
852✔
601
        cfg.Lock()
852✔
602
        stopAndClearTimer(&cfg.jsMigrateTimer)
852✔
603
        cfg.Unlock()
852✔
604
}
852✔
605

606
// This will clear any observer state such that stream or consumer assets on this server can become leaders again.
607
func (s *Server) clearObserverState(remote *leafNodeCfg) {
762✔
608
        s.mu.RLock()
762✔
609
        accName := remote.LocalAccount
762✔
610
        s.mu.RUnlock()
762✔
611

762✔
612
        acc, err := s.LookupAccount(accName)
762✔
613
        if err != nil {
764✔
614
                s.Warnf("Error looking up account [%s] checking for JetStream clear observer state on a leafnode", accName)
2✔
615
                return
2✔
616
        }
2✔
617

618
        acc.jscmMu.Lock()
760✔
619
        defer acc.jscmMu.Unlock()
760✔
620

760✔
621
        // Walk all streams looking for any clustered stream, skip otherwise.
760✔
622
        for _, mset := range acc.streams() {
780✔
623
                node := mset.raftNode()
20✔
624
                if node == nil {
32✔
625
                        // Not R>1
12✔
626
                        continue
12✔
627
                }
628
                // Check consumers
629
                for _, o := range mset.getConsumers() {
10✔
630
                        if n := o.raftNode(); n != nil {
4✔
631
                                // Ensure we can become a leader again.
2✔
632
                                n.SetObserver(false)
2✔
633
                        }
2✔
634
                }
635
                // Ensure we can not become a leader again.
636
                node.SetObserver(false)
8✔
637
        }
638
}
639

640
// Check to see if we should migrate any assets from this account.
641
func (s *Server) checkJetStreamMigrate(remote *leafNodeCfg) {
3,565✔
642
        s.mu.RLock()
3,565✔
643
        accName, shouldMigrate := remote.LocalAccount, remote.JetStreamClusterMigrate
3,565✔
644
        s.mu.RUnlock()
3,565✔
645

3,565✔
646
        if !shouldMigrate {
7,064✔
647
                return
3,499✔
648
        }
3,499✔
649

650
        acc, err := s.LookupAccount(accName)
66✔
651
        if err != nil {
66✔
652
                s.Warnf("Error looking up account [%s] checking for JetStream migration on a leafnode", accName)
×
653
                return
×
654
        }
×
655

656
        acc.jscmMu.Lock()
66✔
657
        defer acc.jscmMu.Unlock()
66✔
658

66✔
659
        // Walk all streams looking for any clustered stream, skip otherwise.
66✔
660
        // If we are the leader force stepdown.
66✔
661
        for _, mset := range acc.streams() {
99✔
662
                node := mset.raftNode()
33✔
663
                if node == nil {
33✔
664
                        // Not R>1
×
665
                        continue
×
666
                }
667
                // Collect any consumers
668
                for _, o := range mset.getConsumers() {
55✔
669
                        if n := o.raftNode(); n != nil {
44✔
670
                                n.StepDown()
22✔
671
                                // Ensure we can not become a leader while in this state.
22✔
672
                                n.SetObserver(true)
22✔
673
                        }
22✔
674
                }
675
                // Stepdown if this stream was leader.
676
                node.StepDown()
33✔
677
                // Ensure we can not become a leader while in this state.
33✔
678
                node.SetObserver(true)
33✔
679
        }
680
}
681

682
// Helper for checking.
683
func (s *Server) isLeafConnectDisabled() bool {
4,478✔
684
        s.mu.RLock()
4,478✔
685
        defer s.mu.RUnlock()
4,478✔
686
        return s.leafDisableConnect
4,478✔
687
}
4,478✔
688

689
// Save off the tlsName for when we use TLS and mix hostnames and IPs. IPs usually
690
// come from the server we connect to.
691
//
692
// We used to save the name only if there was a TLSConfig or scheme equal to "tls".
693
// However, this was causing failures for users that did not set the scheme (and
694
// their remote connections did not have a tls{} block).
695
// We now save the host name regardless in case the remote returns an INFO indicating
696
// that TLS is required.
697
func (cfg *leafNodeCfg) saveTLSHostname(u *url.URL) {
1,751✔
698
        if cfg.tlsName == _EMPTY_ && net.ParseIP(u.Hostname()) == nil {
1,771✔
699
                cfg.tlsName = u.Hostname()
20✔
700
        }
20✔
701
}
702

703
// Save off the username/password for when we connect using a bare URL
704
// that we get from the INFO protocol.
705
func (cfg *leafNodeCfg) saveUserPassword(u *url.URL) {
1,135✔
706
        if cfg.username == _EMPTY_ && u.User != nil {
1,409✔
707
                cfg.username = u.User.Username()
274✔
708
                cfg.password, _ = u.User.Password()
274✔
709
        }
274✔
710
}
711

712
// This starts the leafnode accept loop in a go routine, unless it
// is detected that the server has already been shutdown.
//
// The server lock is acquired before creating the listener and released only
// after the accept go routine has been started (or on an error path). On a
// listen error, or if the leafnode INFO host/port/IP cannot be set, the error
// is reported through s.Fatalf and the accept loop is not started.
func (s *Server) startLeafNodeAcceptLoop() {
	// Snapshot server options.
	opts := s.getOpts()

	// A configured port of -1 is turned into 0 for the listen call; the
	// resolved port is written back to the options further down.
	port := opts.LeafNode.Port
	if port == -1 {
		port = 0
	}

	if s.isShuttingDown() {
		return
	}

	s.mu.Lock()
	hp := net.JoinHostPort(opts.LeafNode.Host, strconv.Itoa(port))
	l, e := natsListen("tcp", hp)
	// The listen result is recorded in the server even on success (nil).
	s.leafNodeListenerErr = e
	if e != nil {
		s.mu.Unlock()
		s.Fatalf("Error listening on leafnode port: %d - %v", opts.LeafNode.Port, e)
		return
	}

	s.Noticef("Listening for leafnode connections on %s",
		net.JoinHostPort(opts.LeafNode.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))

	tlsRequired := opts.LeafNode.TLSConfig != nil
	tlsVerify := tlsRequired && opts.LeafNode.TLSConfig.ClientAuth == tls.RequireAndVerifyClientCert
	// Do not set compression in this Info object, it would possibly cause
	// issues when sending asynchronous INFO to the remote.
	info := Info{
		ID:            s.info.ID,
		Name:          s.info.Name,
		Version:       s.info.Version,
		GitCommit:     gitCommit,
		GoVersion:     runtime.Version(),
		AuthRequired:  true,
		TLSRequired:   tlsRequired,
		TLSVerify:     tlsVerify,
		MaxPayload:    s.info.MaxPayload, // TODO(dlc) - Allow override?
		Headers:       s.supportsHeaders(),
		JetStream:     opts.JetStream,
		Domain:        opts.JetStreamDomain,
		Proto:         s.getServerProto(),
		InfoOnConnect: true,
	}
	// If we have selected a random port...
	if port == 0 {
		// Write resolved port back to options.
		opts.LeafNode.Port = l.Addr().(*net.TCPAddr).Port
	}

	s.leafNodeInfo = info
	// Possibly override Host/Port and set IP based on the advertise setting
	// (the error below reports the LeafNode.Advertise value).
	if err := s.setLeafNodeInfoHostPortAndIP(); err != nil {
		s.Fatalf("Error setting leafnode INFO with LeafNode.Advertise value of %s, err=%v", opts.LeafNode.Advertise, err)
		l.Close()
		s.mu.Unlock()
		return
	}
	// Account for our own URL and regenerate the cached INFO JSON.
	s.leafURLsMap[s.leafNodeInfo.IP]++
	s.generateLeafNodeInfoJSON()

	// Setup state that can enable shutdown
	s.leafNodeListener = l

	// As of now, a server that does not have remotes configured would
	// never solicit a connection, so we should not have to warn if
	// InsecureSkipVerify is set in main LeafNodes config (since
	// this TLS setting matters only when soliciting a connection).
	// Still, warn if insecure is set in any of LeafNode block.
	// We need to check remotes, even if tls is not required on accept.
	warn := tlsRequired && opts.LeafNode.TLSConfig.InsecureSkipVerify
	if !warn {
		for _, r := range opts.LeafNode.Remotes {
			if r.TLSConfig != nil && r.TLSConfig.InsecureSkipVerify {
				warn = true
				break
			}
		}
	}
	if warn {
		s.Warnf(leafnodeTLSInsecureWarning)
	}
	// Accepted connections are created with nil rURL/remote/ws, that is, as
	// non-solicited leafnode connections.
	go s.acceptConnections(l, "Leafnode", func(conn net.Conn) { s.createLeafNode(conn, nil, nil, nil) }, nil)
	s.mu.Unlock()
}
801

802
// RegEx to match a creds file with user JWT and Seed.
// Each match is one section made of a header line and a footer line, both
// delimited by runs of at least three dashes, around a single payload line.
// The payload (word characters, '-', '.' and '=') is the first capture group.
var credsRe = regexp.MustCompile(`\s*(?:(?:[-]{3,}.*[-]{3,}\r?\n)([\w\-.=]+)(?:\r?\n[-]{3,}.*[-]{3,}(\r?\n|\z)))`)
804

805
// clusterName is provided as argument to avoid lock ordering issues with the locked client c
806
// Lock should be held entering here.
807
func (c *client) sendLeafConnect(clusterName string, headers bool) error {
643✔
808
        // We support basic user/pass and operator based user JWT with signatures.
643✔
809
        cinfo := leafConnectInfo{
643✔
810
                Version:       VERSION,
643✔
811
                ID:            c.srv.info.ID,
643✔
812
                Domain:        c.srv.info.Domain,
643✔
813
                Name:          c.srv.info.Name,
643✔
814
                Hub:           c.leaf.remote.Hub,
643✔
815
                Cluster:       clusterName,
643✔
816
                Headers:       headers,
643✔
817
                JetStream:     c.acc.jetStreamConfigured(),
643✔
818
                DenyPub:       c.leaf.remote.DenyImports,
643✔
819
                Compression:   c.leaf.compression,
643✔
820
                RemoteAccount: c.acc.GetName(),
643✔
821
                Proto:         c.srv.getServerProto(),
643✔
822
        }
643✔
823

643✔
824
        // If a signature callback is specified, this takes precedence over anything else.
643✔
825
        if cb := c.leaf.remote.SignatureCB; cb != nil {
646✔
826
                nonce := c.nonce
3✔
827
                c.mu.Unlock()
3✔
828
                jwt, sigraw, err := cb(nonce)
3✔
829
                c.mu.Lock()
3✔
830
                if err == nil && c.isClosed() {
4✔
831
                        err = ErrConnectionClosed
1✔
832
                }
1✔
833
                if err != nil {
5✔
834
                        c.Errorf("Error signing the nonce: %v", err)
2✔
835
                        return err
2✔
836
                }
2✔
837
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
1✔
838
                cinfo.JWT, cinfo.Sig = jwt, sig
1✔
839

840
        } else if creds := c.leaf.remote.Credentials; creds != _EMPTY_ {
691✔
841
                // Check for credentials first, that will take precedence..
51✔
842
                c.Debugf("Authenticating with credentials file %q", c.leaf.remote.Credentials)
51✔
843
                contents, err := os.ReadFile(creds)
51✔
844
                if err != nil {
51✔
845
                        c.Errorf("%v", err)
×
846
                        return err
×
847
                }
×
848
                defer wipeSlice(contents)
51✔
849
                items := credsRe.FindAllSubmatch(contents, -1)
51✔
850
                if len(items) < 2 {
51✔
851
                        c.Errorf("Credentials file malformed")
×
852
                        return err
×
853
                }
×
854
                // First result should be the user JWT.
855
                // We copy here so that the file containing the seed will be wiped appropriately.
856
                raw := items[0][1]
51✔
857
                tmp := make([]byte, len(raw))
51✔
858
                copy(tmp, raw)
51✔
859
                // Seed is second item.
51✔
860
                kp, err := nkeys.FromSeed(items[1][1])
51✔
861
                if err != nil {
51✔
862
                        c.Errorf("Credentials file has malformed seed")
×
863
                        return err
×
864
                }
×
865
                // Wipe our key on exit.
866
                defer kp.Wipe()
51✔
867

51✔
868
                sigraw, _ := kp.Sign(c.nonce)
51✔
869
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
51✔
870
                cinfo.JWT = bytesToString(tmp)
51✔
871
                cinfo.Sig = sig
51✔
872
        } else if nkey := c.leaf.remote.Nkey; nkey != _EMPTY_ {
591✔
873
                kp, err := nkeys.FromSeed([]byte(nkey))
2✔
874
                if err != nil {
2✔
875
                        c.Errorf("Remote nkey has malformed seed")
×
876
                        return err
×
877
                }
×
878
                // Wipe our key on exit.
879
                defer kp.Wipe()
2✔
880
                sigraw, _ := kp.Sign(c.nonce)
2✔
881
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
2✔
882
                pkey, _ := kp.PublicKey()
2✔
883
                cinfo.Nkey = pkey
2✔
884
                cinfo.Sig = sig
2✔
885
        }
886
        // In addition, and this is to allow auth callout, set user/password or
887
        // token if applicable.
888
        if userInfo := c.leaf.remote.curURL.User; userInfo != nil {
932✔
889
                // For backward compatibility, if only username is provided, set both
291✔
890
                // Token and User, not just Token.
291✔
891
                cinfo.User = userInfo.Username()
291✔
892
                var ok bool
291✔
893
                cinfo.Pass, ok = userInfo.Password()
291✔
894
                if !ok {
297✔
895
                        cinfo.Token = cinfo.User
6✔
896
                }
6✔
897
        } else if c.leaf.remote.username != _EMPTY_ {
354✔
898
                cinfo.User = c.leaf.remote.username
4✔
899
                cinfo.Pass = c.leaf.remote.password
4✔
900
        }
4✔
901
        b, err := json.Marshal(cinfo)
641✔
902
        if err != nil {
641✔
903
                c.Errorf("Error marshaling CONNECT to remote leafnode: %v\n", err)
×
904
                return err
×
905
        }
×
906
        // Although this call is made before the writeLoop is created,
907
        // we don't really need to send in place. The protocol will be
908
        // sent out by the writeLoop.
909
        c.enqueueProto([]byte(fmt.Sprintf(ConProto, b)))
641✔
910
        return nil
641✔
911
}
912

913
// Makes a deep copy of the LeafNode Info structure.
914
// The server lock is held on entry.
915
func (s *Server) copyLeafNodeInfo() *Info {
2,576✔
916
        clone := s.leafNodeInfo
2,576✔
917
        // Copy the array of urls.
2,576✔
918
        if len(s.leafNodeInfo.LeafNodeURLs) > 0 {
4,675✔
919
                clone.LeafNodeURLs = append([]string(nil), s.leafNodeInfo.LeafNodeURLs...)
2,099✔
920
        }
2,099✔
921
        return &clone
2,576✔
922
}
923

924
// Adds a LeafNode URL that we get when a route connects to the Info structure.
925
// Regenerates the JSON byte array so that it can be sent to LeafNode connections.
926
// Returns a boolean indicating if the URL was added or not.
927
// Server lock is held on entry
928
func (s *Server) addLeafNodeURL(urlStr string) bool {
6,553✔
929
        if s.leafURLsMap.addUrl(urlStr) {
13,101✔
930
                s.generateLeafNodeInfoJSON()
6,548✔
931
                return true
6,548✔
932
        }
6,548✔
933
        return false
5✔
934
}
935

936
// Removes a LeafNode URL of the route that is disconnecting from the Info structure.
937
// Regenerates the JSON byte array so that it can be sent to LeafNode connections.
938
// Returns a boolean indicating if the URL was removed or not.
939
// Server lock is held on entry.
940
func (s *Server) removeLeafNodeURL(urlStr string) bool {
6,553✔
941
        // Don't need to do this if we are removing the route connection because
6,553✔
942
        // we are shuting down...
6,553✔
943
        if s.isShuttingDown() {
9,983✔
944
                return false
3,430✔
945
        }
3,430✔
946
        if s.leafURLsMap.removeUrl(urlStr) {
6,242✔
947
                s.generateLeafNodeInfoJSON()
3,119✔
948
                return true
3,119✔
949
        }
3,119✔
950
        return false
4✔
951
}
952

953
// Server lock is held on entry
954
func (s *Server) generateLeafNodeInfoJSON() {
12,956✔
955
        s.leafNodeInfo.Cluster = s.cachedClusterName()
12,956✔
956
        s.leafNodeInfo.LeafNodeURLs = s.leafURLsMap.getAsStringSlice()
12,956✔
957
        s.leafNodeInfo.WSConnectURLs = s.websocket.connectURLsMap.getAsStringSlice()
12,956✔
958
        s.leafNodeInfoJSON = generateInfoJSON(&s.leafNodeInfo)
12,956✔
959
}
12,956✔
960

961
// Sends an async INFO protocol so that the connected servers can update
962
// their list of LeafNode urls.
963
func (s *Server) sendAsyncLeafNodeInfo() {
9,667✔
964
        for _, c := range s.leafs {
9,768✔
965
                c.mu.Lock()
101✔
966
                c.enqueueProto(s.leafNodeInfoJSON)
101✔
967
                c.mu.Unlock()
101✔
968
        }
101✔
969
}
970

971
// Called when an inbound leafnode connection is accepted or we create one for a solicited leafnode.
//
// conn is the underlying network connection. remote is non-nil for solicited
// connections, in which case rURL is the URL that was used to connect; for
// accepted connections both are nil. ws is non-nil when the connection was
// accepted through the Websocket port.
//
// Returns the new client, or nil if the connection could not be set up:
// local account lookup failure, websocket solicit error, TLS handshake error,
// failure writing the INFO protocol, or the server refusing to track the
// connection in its temporary clients.
func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCfg, ws *websocket) *client {
	// Snapshot server options.
	opts := s.getOpts()

	maxPay := int32(opts.MaxPayload)
	maxSubs := int32(opts.MaxSubs)
	// For system, maxSubs of 0 means unlimited, so re-adjust here.
	if maxSubs == 0 {
		maxSubs = -1
	}
	now := time.Now().UTC()

	c := &client{srv: s, nc: conn, kind: LEAF, opts: defaultOpts, mpay: maxPay, msubs: maxSubs, start: now, last: now}
	// Do not update the smap here, we need to do it in initLeafNodeSmapAndSendSubs
	c.leaf = &leaf{}

	// For accepted LN connections, ws will be != nil if it was accepted
	// through the Websocket port.
	c.ws = ws

	// For remote, check if the scheme starts with "ws", if so, we will initiate
	// a remote Leaf Node connection as a websocket connection.
	if remote != nil && rURL != nil && isWSURL(rURL) {
		remote.RLock()
		c.ws = &websocket{compress: remote.Websocket.Compression, maskwrite: !remote.Websocket.NoMasking}
		remote.RUnlock()
	}

	// Determines if we are soliciting the connection or not.
	var solicited bool
	var acc *Account
	var remoteSuffix string
	if remote != nil {
		// For now, if lookup fails, we will constantly try
		// to recreate this LN connection.
		lacc := remote.LocalAccount
		var err error
		acc, err = s.LookupAccount(lacc)
		if err != nil {
			// An account not existing is something that can happen with nats/http account resolver and the account
			// has not yet been pushed, or the request failed for other reasons.
			// remote needs to be set or retry won't happen
			c.leaf.remote = remote
			c.closeConnection(MissingAccount)
			s.Errorf("Unable to lookup account %s for solicited leafnode connection: %v", lacc, err)
			return nil
		}
		remoteSuffix = fmt.Sprintf(" for account: %s", acc.traceLabel())
	}

	c.mu.Lock()
	c.initClient()
	c.Noticef("Leafnode connection created%s %s", remoteSuffix, c.opts.Name)

	var (
		tlsFirst         bool
		tlsFirstFallback time.Duration
		infoTimeout      time.Duration
	)
	if remote != nil {
		// Solicited side: settings come from the remote configuration.
		solicited = true
		remote.Lock()
		c.leaf.remote = remote
		c.setPermissions(remote.perms)
		if !c.leaf.remote.Hub {
			c.leaf.isSpoke = true
		}
		tlsFirst = remote.TLSHandshakeFirst
		infoTimeout = remote.FirstInfoTimeout
		remote.Unlock()
		c.acc = acc
	} else {
		// Accepting side: we expect a CONNECT, settings come from the
		// server's LeafNode options.
		c.flags.set(expectConnect)
		if ws != nil {
			c.Debugf("Leafnode compression=%v", c.ws.compress)
		}
		tlsFirst = opts.LeafNode.TLSHandshakeFirst
		if f := opts.LeafNode.TLSHandshakeFirstFallback; f > 0 {
			tlsFirstFallback = f
		}
	}
	c.mu.Unlock()

	var nonce [nonceLen]byte
	var info *Info

	// Grab this before the client lock below.
	if !solicited {
		// Grab server variables
		s.mu.Lock()
		info = s.copyLeafNodeInfo()
		// For tests that want to simulate old servers, do not set the compression
		// on the INFO protocol if configured with CompressionNotSupported.
		if cm := opts.LeafNode.Compression.Mode; cm != CompressionNotSupported {
			info.Compression = cm
		}
		s.generateNonce(nonce[:])
		s.mu.Unlock()
	}

	// Grab lock
	c.mu.Lock()

	// For solicited websocket connections, preBuf receives the first value
	// returned by leafNodeSolicitWSConnection and is handed to the readLoop.
	var preBuf []byte
	if solicited {
		// For websocket connection, we need to send an HTTP request,
		// and get the response before starting the readLoop to get
		// the INFO, etc..
		if c.isWebsocket() {
			var err error
			var closeReason ClosedState

			preBuf, closeReason, err = c.leafNodeSolicitWSConnection(opts, rURL, remote)
			if err != nil {
				c.Errorf("Error soliciting websocket connection: %v", err)
				c.mu.Unlock()
				// Only close here when a close reason was reported.
				if closeReason != 0 {
					c.closeConnection(closeReason)
				}
				return nil
			}
		} else {
			// If configured to do TLS handshake first
			if tlsFirst {
				if _, err := c.leafClientHandshakeIfNeeded(remote, opts); err != nil {
					c.mu.Unlock()
					return nil
				}
			}
			// We need to wait for the info, but not for too long.
			c.nc.SetReadDeadline(time.Now().Add(infoTimeout))
		}

		// We will process the INFO from the readloop and finish by
		// sending the CONNECT and finish registration later.
	} else {
		// Send our info to the other side.
		// Remember the nonce we sent here for signatures, etc.
		c.nonce = make([]byte, nonceLen)
		copy(c.nonce, nonce[:])
		info.Nonce = bytesToString(c.nonce)
		info.CID = c.cid
		proto := generateInfoJSON(info)

		var pre []byte
		// We need first to check for "TLS First" fallback delay.
		if tlsFirstFallback > 0 {
			// We wait and see if we are getting any data. Since we did not send
			// the INFO protocol yet, only clients that use TLS first should be
			// sending data (the TLS handshake). We don't really check the content:
			// if it is a rogue agent and not an actual client performing the
			// TLS handshake, the error will be detected when performing the
			// handshake on our side.
			pre = make([]byte, 4)
			c.nc.SetReadDeadline(time.Now().Add(tlsFirstFallback))
			// The read error is deliberately ignored: only the number of
			// bytes received matters here.
			n, _ := io.ReadFull(c.nc, pre[:])
			c.nc.SetReadDeadline(time.Time{})
			// If we get any data (regardless of possible timeout), we will proceed
			// with the TLS handshake.
			if n > 0 {
				pre = pre[:n]
			} else {
				// We did not get anything so we will send the INFO protocol.
				pre = nil
				// Set the boolean to false for the rest of the function.
				tlsFirst = false
			}
		}

		if !tlsFirst {
			// We have to send from this go routine because we may
			// have to block for TLS handshake before we start our
			// writeLoop go routine. The other side needs to receive
			// this before it can initiate the TLS handshake..
			c.sendProtoNow(proto)

			// The above call could have marked the connection as closed (due to TCP error).
			if c.isClosed() {
				c.mu.Unlock()
				c.closeConnection(WriteError)
				return nil
			}
		}

		// Check to see if we need to spin up TLS.
		if !c.isWebsocket() && info.TLSRequired {
			// If we have a prebuffer create a multi-reader.
			if len(pre) > 0 {
				c.nc = &tlsMixConn{c.nc, bytes.NewBuffer(pre)}
			}
			// Perform server-side TLS handshake.
			if err := c.doTLSServerHandshake(tlsHandshakeLeaf, opts.LeafNode.TLSConfig, opts.LeafNode.TLSTimeout, opts.LeafNode.TLSPinnedCerts); err != nil {
				c.mu.Unlock()
				return nil
			}
		}

		// If the user wants the TLS handshake to occur first, now that it is
		// done, send the INFO protocol.
		if tlsFirst {
			c.flags.set(didTLSFirst)
			c.sendProtoNow(proto)
			if c.isClosed() {
				c.mu.Unlock()
				c.closeConnection(WriteError)
				return nil
			}
		}

		// Leaf nodes will always require a CONNECT to let us know
		// when we are properly bound to an account.
		//
		// If compression is configured, we can't set the authTimer here because
		// it would cause the parser to fail any incoming protocol that is not a
		// CONNECT (and we need to exchange INFO protocols for compression
		// negotiation). So instead, use the ping timer until we are done with
		// negotiation and can set the auth timer.
		timeout := secondsToDuration(opts.LeafNode.AuthTimeout)
		if needsCompression(opts.LeafNode.Compression.Mode) {
			c.ping.tmr = time.AfterFunc(timeout, func() {
				c.authTimeout()
			})
		} else {
			c.setAuthTimer(timeout)
		}
	}

	// Keep track in case server is shutdown before we can successfully register.
	if !s.addToTempClients(c.cid, c) {
		c.mu.Unlock()
		c.setNoReconnect()
		c.closeConnection(ServerShutdown)
		return nil
	}

	// Spin up the read loop.
	s.startGoRoutine(func() { c.readLoop(preBuf) })

	// We will spin the write loop for solicited connections only
	// when processing the INFO and after switching to TLS if needed.
	if !solicited {
		s.startGoRoutine(func() { c.writeLoop() })
	}

	c.mu.Unlock()

	return c
}
1220

1221
// Will perform the client-side TLS handshake if needed. Assumes that this
1222
// is called by the solicit side (remote will be non nil). Returns `true`
1223
// if TLS is required, `false` otherwise.
1224
// Lock held on entry.
1225
func (c *client) leafClientHandshakeIfNeeded(remote *leafNodeCfg, opts *Options) (bool, error) {
1,840✔
1226
        // Check if TLS is required and gather TLS config variables.
1,840✔
1227
        tlsRequired, tlsConfig, tlsName, tlsTimeout := c.leafNodeGetTLSConfigForSolicit(remote)
1,840✔
1228
        if !tlsRequired {
3,602✔
1229
                return false, nil
1,762✔
1230
        }
1,762✔
1231

1232
        // If TLS required, peform handshake.
1233
        // Get the URL that was used to connect to the remote server.
1234
        rURL := remote.getCurrentURL()
78✔
1235

78✔
1236
        // Perform the client-side TLS handshake.
78✔
1237
        if resetTLSName, err := c.doTLSClientHandshake(tlsHandshakeLeaf, rURL, tlsConfig, tlsName, tlsTimeout, opts.LeafNode.TLSPinnedCerts); err != nil {
115✔
1238
                // Check if we need to reset the remote's TLS name.
37✔
1239
                if resetTLSName {
37✔
1240
                        remote.Lock()
×
1241
                        remote.tlsName = _EMPTY_
×
1242
                        remote.Unlock()
×
1243
                }
×
1244
                return false, err
37✔
1245
        }
1246
        return true, nil
41✔
1247
}
1248

1249
func (c *client) processLeafnodeInfo(info *Info) {
2,546✔
1250
        c.mu.Lock()
2,546✔
1251
        if c.leaf == nil || c.isClosed() {
2,546✔
1252
                c.mu.Unlock()
×
1253
                return
×
1254
        }
×
1255
        s := c.srv
2,546✔
1256
        opts := s.getOpts()
2,546✔
1257
        remote := c.leaf.remote
2,546✔
1258
        didSolicit := remote != nil
2,546✔
1259
        firstINFO := !c.flags.isSet(infoReceived)
2,546✔
1260

2,546✔
1261
        // In case of websocket, the TLS handshake has been already done.
2,546✔
1262
        // So check only for non websocket connections and for configurations
2,546✔
1263
        // where the TLS Handshake was not done first.
2,546✔
1264
        if didSolicit && !c.flags.isSet(handshakeComplete) && !c.isWebsocket() && !remote.TLSHandshakeFirst {
4,339✔
1265
                // If the server requires TLS, we need to set this in the remote
1,793✔
1266
                // otherwise if there is no TLS configuration block for the remote,
1,793✔
1267
                // the solicit side will not attempt to perform the TLS handshake.
1,793✔
1268
                if firstINFO && info.TLSRequired {
1,855✔
1269
                        remote.TLS = true
62✔
1270
                }
62✔
1271
                if _, err := c.leafClientHandshakeIfNeeded(remote, opts); err != nil {
1,825✔
1272
                        c.mu.Unlock()
32✔
1273
                        return
32✔
1274
                }
32✔
1275
        }
1276

1277
        // Check for compression, unless already done.
1278
        if firstINFO && !c.flags.isSet(compressionNegotiated) {
3,753✔
1279
                // Prevent from getting back here.
1,239✔
1280
                c.flags.set(compressionNegotiated)
1,239✔
1281

1,239✔
1282
                var co *CompressionOpts
1,239✔
1283
                if !didSolicit {
1,781✔
1284
                        co = &opts.LeafNode.Compression
542✔
1285
                } else {
1,239✔
1286
                        co = &remote.Compression
697✔
1287
                }
697✔
1288
                if needsCompression(co.Mode) {
2,467✔
1289
                        // Release client lock since following function will need server lock.
1,228✔
1290
                        c.mu.Unlock()
1,228✔
1291
                        compress, err := s.negotiateLeafCompression(c, didSolicit, info.Compression, co)
1,228✔
1292
                        if err != nil {
1,228✔
1293
                                c.sendErrAndErr(err.Error())
×
1294
                                c.closeConnection(ProtocolViolation)
×
1295
                                return
×
1296
                        }
×
1297
                        if compress {
2,328✔
1298
                                // Done for now, will get back another INFO protocol...
1,100✔
1299
                                return
1,100✔
1300
                        }
1,100✔
1301
                        // No compression because one side does not want/can't, so proceed.
1302
                        c.mu.Lock()
128✔
1303
                        // Check that the connection did not close if the lock was released.
128✔
1304
                        if c.isClosed() {
128✔
1305
                                c.mu.Unlock()
×
1306
                                return
×
1307
                        }
×
1308
                } else {
11✔
1309
                        // Coming from an old server, the Compression field would be the empty
11✔
1310
                        // string. For servers that are configured with CompressionNotSupported,
11✔
1311
                        // this makes them behave as old servers.
11✔
1312
                        if info.Compression == _EMPTY_ || co.Mode == CompressionNotSupported {
14✔
1313
                                c.leaf.compression = CompressionNotSupported
3✔
1314
                        } else {
11✔
1315
                                c.leaf.compression = CompressionOff
8✔
1316
                        }
8✔
1317
                }
1318
                // Accepting side does not normally process an INFO protocol during
1319
                // initial connection handshake. So we keep it consistent by returning
1320
                // if we are not soliciting.
1321
                if !didSolicit {
140✔
1322
                        // If we had created the ping timer instead of the auth timer, we will
1✔
1323
                        // clear the ping timer and set the auth timer now that the compression
1✔
1324
                        // negotiation is done.
1✔
1325
                        if info.Compression != _EMPTY_ && c.ping.tmr != nil {
1✔
1326
                                clearTimer(&c.ping.tmr)
×
1327
                                c.setAuthTimer(secondsToDuration(opts.LeafNode.AuthTimeout))
×
1328
                        }
×
1329
                        c.mu.Unlock()
1✔
1330
                        return
1✔
1331
                }
1332
                // Fall through and process the INFO protocol as usual.
1333
        }
1334

1335
        // Note: For now, only the initial INFO has a nonce. We
1336
        // will probably do auto key rotation at some point.
1337
        if firstINFO {
2,150✔
1338
                // Mark that the INFO protocol has been received.
737✔
1339
                c.flags.set(infoReceived)
737✔
1340
                // Prevent connecting to non leafnode port. Need to do this only for
737✔
1341
                // the first INFO, not for async INFO updates...
737✔
1342
                //
737✔
1343
                // Content of INFO sent by the server when accepting a tcp connection.
737✔
1344
                // -------------------------------------------------------------------
737✔
1345
                // Listen Port Of | CID | ClientConnectURLs | LeafNodeURLs | Gateway |
737✔
1346
                // -------------------------------------------------------------------
737✔
1347
                //      CLIENT    |  X* |        X**        |              |         |
737✔
1348
                //      ROUTE     |     |        X**        |      X***    |         |
737✔
1349
                //     GATEWAY    |     |                   |              |    X    |
737✔
1350
                //     LEAFNODE   |  X  |                   |       X      |         |
737✔
1351
                // -------------------------------------------------------------------
737✔
1352
                // *   Not on older servers.
737✔
1353
                // **  Not if "no advertise" is enabled.
737✔
1354
                // *** Not if leafnode's "no advertise" is enabled.
737✔
1355
                //
737✔
1356
                // As seen from above, a solicited LeafNode connection should receive
737✔
1357
                // from the remote server an INFO with CID and LeafNodeURLs. Anything
737✔
1358
                // else should be considered an attempt to connect to a wrong port.
737✔
1359
                if didSolicit && (info.CID == 0 || info.LeafNodeURLs == nil) {
788✔
1360
                        c.mu.Unlock()
51✔
1361
                        c.Errorf(ErrConnectedToWrongPort.Error())
51✔
1362
                        c.closeConnection(WrongPort)
51✔
1363
                        return
51✔
1364
                }
51✔
1365
                // Reject a cluster that contains spaces.
1366
                if info.Cluster != _EMPTY_ && strings.Contains(info.Cluster, " ") {
687✔
1367
                        c.mu.Unlock()
1✔
1368
                        c.sendErrAndErr(ErrClusterNameHasSpaces.Error())
1✔
1369
                        c.closeConnection(ProtocolViolation)
1✔
1370
                        return
1✔
1371
                }
1✔
1372
                // Capture a nonce here.
1373
                c.nonce = []byte(info.Nonce)
685✔
1374
                if info.TLSRequired && didSolicit {
715✔
1375
                        remote.TLS = true
30✔
1376
                }
30✔
1377
                supportsHeaders := c.srv.supportsHeaders()
685✔
1378
                c.headers = supportsHeaders && info.Headers
685✔
1379

685✔
1380
                // Remember the remote server.
685✔
1381
                // Pre 2.2.0 servers are not sending their server name.
685✔
1382
                // In that case, use info.ID, which, for those servers, matches
685✔
1383
                // the content of the field `Name` in the leafnode CONNECT protocol.
685✔
1384
                if info.Name == _EMPTY_ {
685✔
1385
                        c.leaf.remoteServer = info.ID
×
1386
                } else {
685✔
1387
                        c.leaf.remoteServer = info.Name
685✔
1388
                }
685✔
1389
                c.leaf.remoteDomain = info.Domain
685✔
1390
                c.leaf.remoteCluster = info.Cluster
685✔
1391
                // We send the protocol version in the INFO protocol.
685✔
1392
                // Keep track of it, so we know if this connection supports message
685✔
1393
                // tracing for instance.
685✔
1394
                c.opts.Protocol = info.Proto
685✔
1395
        }
1396

1397
        // For both initial INFO and async INFO protocols, Possibly
1398
        // update our list of remote leafnode URLs we can connect to.
1399
        if didSolicit && (len(info.LeafNodeURLs) > 0 || len(info.WSConnectURLs) > 0) {
2,636✔
1400
                // Consider the incoming array as the most up-to-date
1,275✔
1401
                // representation of the remote cluster's list of URLs.
1,275✔
1402
                c.updateLeafNodeURLs(info)
1,275✔
1403
        }
1,275✔
1404

1405
        // Check to see if we have permissions updates here.
1406
        if info.Import != nil || info.Export != nil {
1,373✔
1407
                perms := &Permissions{
12✔
1408
                        Publish:   info.Export,
12✔
1409
                        Subscribe: info.Import,
12✔
1410
                }
12✔
1411
                // Check if we have local deny clauses that we need to merge.
12✔
1412
                if remote := c.leaf.remote; remote != nil {
24✔
1413
                        if len(remote.DenyExports) > 0 {
13✔
1414
                                if perms.Publish == nil {
1✔
1415
                                        perms.Publish = &SubjectPermission{}
×
1416
                                }
×
1417
                                perms.Publish.Deny = append(perms.Publish.Deny, remote.DenyExports...)
1✔
1418
                        }
1419
                        if len(remote.DenyImports) > 0 {
13✔
1420
                                if perms.Subscribe == nil {
1✔
1421
                                        perms.Subscribe = &SubjectPermission{}
×
1422
                                }
×
1423
                                perms.Subscribe.Deny = append(perms.Subscribe.Deny, remote.DenyImports...)
1✔
1424
                        }
1425
                }
1426
                c.setPermissions(perms)
12✔
1427
        }
1428

1429
        var resumeConnect, checkSyncConsumers bool
1,361✔
1430

1,361✔
1431
        // If this is a remote connection and this is the first INFO protocol,
1,361✔
1432
        // then we need to finish the connect process by sending CONNECT, etc..
1,361✔
1433
        if firstINFO && didSolicit {
2,004✔
1434
                // Clear deadline that was set in createLeafNode while waiting for the INFO.
643✔
1435
                c.nc.SetDeadline(time.Time{})
643✔
1436
                resumeConnect = true
643✔
1437
        } else if !firstINFO && didSolicit {
1,993✔
1438
                c.leaf.remoteAccName = info.RemoteAccount
632✔
1439
                checkSyncConsumers = info.JetStream
632✔
1440
        }
632✔
1441

1442
        // Check if we have the remote account information and if so make sure it's stored.
1443
        if info.RemoteAccount != _EMPTY_ {
1,981✔
1444
                s.leafRemoteAccounts.Store(c.acc.Name, info.RemoteAccount)
620✔
1445
        }
620✔
1446
        c.mu.Unlock()
1,361✔
1447

1,361✔
1448
        finishConnect := info.ConnectInfo
1,361✔
1449
        if resumeConnect && s != nil {
2,004✔
1450
                s.leafNodeResumeConnectProcess(c)
643✔
1451
                if !info.InfoOnConnect {
643✔
1452
                        finishConnect = true
×
1453
                }
×
1454
        }
1455
        if finishConnect {
1,981✔
1456
                s.leafNodeFinishConnectProcess(c)
620✔
1457
        }
620✔
1458

1459
        // If we have JS enabled and so does the other side, we will
1460
        // check to see if we need to kick any internal source or mirror consumers.
1461
        if checkSyncConsumers {
1,624✔
1462
                s.checkInternalSyncConsumers(c.acc, info.Domain)
263✔
1463
        }
263✔
1464
}
1465

1466
func (s *Server) negotiateLeafCompression(c *client, didSolicit bool, infoCompression string, co *CompressionOpts) (bool, error) {
1,228✔
1467
        // Negotiate the appropriate compression mode (or no compression)
1,228✔
1468
        cm, err := selectCompressionMode(co.Mode, infoCompression)
1,228✔
1469
        if err != nil {
1,228✔
1470
                return false, err
×
1471
        }
×
1472
        c.mu.Lock()
1,228✔
1473
        // For "auto" mode, set the initial compression mode based on RTT
1,228✔
1474
        if cm == CompressionS2Auto {
2,289✔
1475
                if c.rttStart.IsZero() {
2,122✔
1476
                        c.rtt = computeRTT(c.start)
1,061✔
1477
                }
1,061✔
1478
                cm = selectS2AutoModeBasedOnRTT(c.rtt, co.RTTThresholds)
1,061✔
1479
        }
1480
        // Keep track of the negotiated compression mode.
1481
        c.leaf.compression = cm
1,228✔
1482
        cid := c.cid
1,228✔
1483
        var nonce string
1,228✔
1484
        if !didSolicit {
1,769✔
1485
                nonce = bytesToString(c.nonce)
541✔
1486
        }
541✔
1487
        c.mu.Unlock()
1,228✔
1488

1,228✔
1489
        if !needsCompression(cm) {
1,356✔
1490
                return false, nil
128✔
1491
        }
128✔
1492

1493
        // If we end-up doing compression...
1494

1495
        // Generate an INFO with the chosen compression mode.
1496
        s.mu.Lock()
1,100✔
1497
        info := s.copyLeafNodeInfo()
1,100✔
1498
        info.Compression, info.CID, info.Nonce = compressionModeForInfoProtocol(co, cm), cid, nonce
1,100✔
1499
        infoProto := generateInfoJSON(info)
1,100✔
1500
        s.mu.Unlock()
1,100✔
1501

1,100✔
1502
        // If we solicited, then send this INFO protocol BEFORE switching
1,100✔
1503
        // to compression writer. However, if we did not, we send it after.
1,100✔
1504
        c.mu.Lock()
1,100✔
1505
        if didSolicit {
1,659✔
1506
                c.enqueueProto(infoProto)
559✔
1507
                // Make sure it is completely flushed (the pending bytes goes to
559✔
1508
                // 0) before proceeding.
559✔
1509
                for c.out.pb > 0 && !c.isClosed() {
1,118✔
1510
                        c.flushOutbound()
559✔
1511
                }
559✔
1512
        }
1513
        // This is to notify the readLoop that it should switch to a
1514
        // (de)compression reader.
1515
        c.in.flags.set(switchToCompression)
1,100✔
1516
        // Create the compress writer before queueing the INFO protocol for
1,100✔
1517
        // a route that did not solicit. It will make sure that that proto
1,100✔
1518
        // is sent with compression on.
1,100✔
1519
        c.out.cw = s2.NewWriter(nil, s2WriterOptions(cm)...)
1,100✔
1520
        if !didSolicit {
1,641✔
1521
                c.enqueueProto(infoProto)
541✔
1522
        }
541✔
1523
        c.mu.Unlock()
1,100✔
1524
        return true, nil
1,100✔
1525
}
1526

1527
// When getting a leaf node INFO protocol, use the provided
1528
// array of urls to update the list of possible endpoints.
1529
func (c *client) updateLeafNodeURLs(info *Info) {
1,275✔
1530
        cfg := c.leaf.remote
1,275✔
1531
        cfg.Lock()
1,275✔
1532
        defer cfg.Unlock()
1,275✔
1533

1,275✔
1534
        // We have ensured that if a remote has a WS scheme, then all are.
1,275✔
1535
        // So check if first is WS, then add WS URLs, otherwise, add non WS ones.
1,275✔
1536
        if len(cfg.URLs) > 0 && isWSURL(cfg.URLs[0]) {
1,329✔
1537
                // It does not really matter if we use "ws://" or "wss://" here since
54✔
1538
                // we will have already marked that the remote should use TLS anyway.
54✔
1539
                // But use proper scheme for log statements, etc...
54✔
1540
                proto := wsSchemePrefix
54✔
1541
                if cfg.TLS {
54✔
1542
                        proto = wsSchemePrefixTLS
×
1543
                }
×
1544
                c.doUpdateLNURLs(cfg, proto, info.WSConnectURLs)
54✔
1545
                return
54✔
1546
        }
1547
        c.doUpdateLNURLs(cfg, "nats-leaf", info.LeafNodeURLs)
1,221✔
1548
}
1549

1550
func (c *client) doUpdateLNURLs(cfg *leafNodeCfg, scheme string, URLs []string) {
1,275✔
1551
        cfg.urls = make([]*url.URL, 0, 1+len(URLs))
1,275✔
1552
        // Add the ones we receive in the protocol
1,275✔
1553
        for _, surl := range URLs {
3,587✔
1554
                url, err := url.Parse(fmt.Sprintf("%s://%s", scheme, surl))
2,312✔
1555
                if err != nil {
2,312✔
1556
                        // As per below, the URLs we receive should not have contained URL info, so this should be safe to log.
×
1557
                        c.Errorf("Error parsing url %q: %v", surl, err)
×
1558
                        continue
×
1559
                }
1560
                // Do not add if it's the same as what we already have configured.
1561
                var dup bool
2,312✔
1562
                for _, u := range cfg.URLs {
5,876✔
1563
                        // URLs that we receive never have user info, but the
3,564✔
1564
                        // ones that were configured may have. Simply compare
3,564✔
1565
                        // host and port to decide if they are equal or not.
3,564✔
1566
                        if url.Host == u.Host && url.Port() == u.Port() {
5,260✔
1567
                                dup = true
1,696✔
1568
                                break
1,696✔
1569
                        }
1570
                }
1571
                if !dup {
2,928✔
1572
                        cfg.urls = append(cfg.urls, url)
616✔
1573
                        cfg.saveTLSHostname(url)
616✔
1574
                }
616✔
1575
        }
1576
        // Add the configured one
1577
        cfg.urls = append(cfg.urls, cfg.URLs...)
1,275✔
1578
}
1579

1580
// Similar to setInfoHostPortAndGenerateJSON, but for leafNodeInfo.
1581
func (s *Server) setLeafNodeInfoHostPortAndIP() error {
3,289✔
1582
        opts := s.getOpts()
3,289✔
1583
        if opts.LeafNode.Advertise != _EMPTY_ {
3,300✔
1584
                advHost, advPort, err := parseHostPort(opts.LeafNode.Advertise, opts.LeafNode.Port)
11✔
1585
                if err != nil {
11✔
1586
                        return err
×
1587
                }
×
1588
                s.leafNodeInfo.Host = advHost
11✔
1589
                s.leafNodeInfo.Port = advPort
11✔
1590
        } else {
3,278✔
1591
                s.leafNodeInfo.Host = opts.LeafNode.Host
3,278✔
1592
                s.leafNodeInfo.Port = opts.LeafNode.Port
3,278✔
1593
                // If the host is "0.0.0.0" or "::" we need to resolve to a public IP.
3,278✔
1594
                // This will return at most 1 IP.
3,278✔
1595
                hostIsIPAny, ips, err := s.getNonLocalIPsIfHostIsIPAny(s.leafNodeInfo.Host, false)
3,278✔
1596
                if err != nil {
3,278✔
1597
                        return err
×
1598
                }
×
1599
                if hostIsIPAny {
3,565✔
1600
                        if len(ips) == 0 {
287✔
1601
                                s.Errorf("Could not find any non-local IP for leafnode's listen specification %q",
×
1602
                                        s.leafNodeInfo.Host)
×
1603
                        } else {
287✔
1604
                                // Take the first from the list...
287✔
1605
                                s.leafNodeInfo.Host = ips[0]
287✔
1606
                        }
287✔
1607
                }
1608
        }
1609
        // Use just host:port for the IP
1610
        s.leafNodeInfo.IP = net.JoinHostPort(s.leafNodeInfo.Host, strconv.Itoa(s.leafNodeInfo.Port))
3,289✔
1611
        if opts.LeafNode.Advertise != _EMPTY_ {
3,300✔
1612
                s.Noticef("Advertise address for leafnode is set to %s", s.leafNodeInfo.IP)
11✔
1613
        }
11✔
1614
        return nil
3,289✔
1615
}
1616

1617
// Add the connection to the map of leaf nodes.
1618
// If `checkForDup` is true (invoked when a leafnode is accepted), then we check
1619
// if a connection already exists for the same server name and account.
1620
// That can happen when the remote is attempting to reconnect while the accepting
1621
// side did not detect the connection as broken yet.
1622
// But it can also happen when there is a misconfiguration and the remote is
1623
// creating two (or more) connections that bind to the same account on the accept
1624
// side.
1625
// When a duplicate is found, the new connection is accepted and the old is closed
1626
// (this solves the stale connection situation). An error is returned to help the
1627
// remote detect the misconfiguration when the duplicate is the result of that
1628
// misconfiguration.
1629
func (s *Server) addLeafNodeConnection(c *client, srvName, clusterName string, checkForDup bool) {
1,273✔
1630
        var accName string
1,273✔
1631
        c.mu.Lock()
1,273✔
1632
        cid := c.cid
1,273✔
1633
        acc := c.acc
1,273✔
1634
        if acc != nil {
2,546✔
1635
                accName = acc.Name
1,273✔
1636
        }
1,273✔
1637
        myRemoteDomain := c.leaf.remoteDomain
1,273✔
1638
        mySrvName := c.leaf.remoteServer
1,273✔
1639
        remoteAccName := c.leaf.remoteAccName
1,273✔
1640
        myClustName := c.leaf.remoteCluster
1,273✔
1641
        solicited := c.leaf.remote != nil
1,273✔
1642
        c.mu.Unlock()
1,273✔
1643

1,273✔
1644
        var old *client
1,273✔
1645
        s.mu.Lock()
1,273✔
1646
        // We check for empty because in some test we may send empty CONNECT{}
1,273✔
1647
        if checkForDup && srvName != _EMPTY_ {
1,895✔
1648
                for _, ol := range s.leafs {
1,014✔
1649
                        ol.mu.Lock()
392✔
1650
                        // We care here only about non solicited Leafnode. This function
392✔
1651
                        // is more about replacing stale connections than detecting loops.
392✔
1652
                        // We have code for the loop detection elsewhere, which also delays
392✔
1653
                        // attempt to reconnect.
392✔
1654
                        if !ol.isSolicitedLeafNode() && ol.leaf.remoteServer == srvName &&
392✔
1655
                                ol.leaf.remoteCluster == clusterName && ol.acc.Name == accName &&
392✔
1656
                                remoteAccName != _EMPTY_ && ol.leaf.remoteAccName == remoteAccName {
395✔
1657
                                old = ol
3✔
1658
                        }
3✔
1659
                        ol.mu.Unlock()
392✔
1660
                        if old != nil {
395✔
1661
                                break
3✔
1662
                        }
1663
                }
1664
        }
1665
        // Store new connection in the map
1666
        s.leafs[cid] = c
1,273✔
1667
        s.mu.Unlock()
1,273✔
1668
        s.removeFromTempClients(cid)
1,273✔
1669

1,273✔
1670
        // If applicable, evict the old one.
1,273✔
1671
        if old != nil {
1,276✔
1672
                old.sendErrAndErr(DuplicateRemoteLeafnodeConnection.String())
3✔
1673
                old.closeConnection(DuplicateRemoteLeafnodeConnection)
3✔
1674
                c.Warnf("Replacing connection from same server")
3✔
1675
        }
3✔
1676

1677
        srvDecorated := func() string {
1,480✔
1678
                if myClustName == _EMPTY_ {
229✔
1679
                        return mySrvName
22✔
1680
                }
22✔
1681
                return fmt.Sprintf("%s/%s", mySrvName, myClustName)
185✔
1682
        }
1683

1684
        opts := s.getOpts()
1,273✔
1685
        sysAcc := s.SystemAccount()
1,273✔
1686
        js := s.getJetStream()
1,273✔
1687
        var meta *raft
1,273✔
1688
        if js != nil {
1,800✔
1689
                if mg := js.getMetaGroup(); mg != nil {
961✔
1690
                        meta = mg.(*raft)
434✔
1691
                }
434✔
1692
        }
1693
        blockMappingOutgoing := false
1,273✔
1694
        // Deny (non domain) JetStream API traffic unless system account is shared
1,273✔
1695
        // and domain names are identical and extending is not disabled
1,273✔
1696

1,273✔
1697
        // Check if backwards compatibility has been enabled and needs to be acted on
1,273✔
1698
        forceSysAccDeny := false
1,273✔
1699
        if len(opts.JsAccDefaultDomain) > 0 {
1,310✔
1700
                if acc == sysAcc {
48✔
1701
                        for _, d := range opts.JsAccDefaultDomain {
22✔
1702
                                if d == _EMPTY_ {
19✔
1703
                                        // Extending JetStream via leaf node is mutually exclusive with a domain mapping to the empty/default domain.
8✔
1704
                                        // As soon as one mapping to "" is found, disable the ability to extend JS via a leaf node.
8✔
1705
                                        c.Noticef("Not extending remote JetStream domain %q due to presence of empty default domain", myRemoteDomain)
8✔
1706
                                        forceSysAccDeny = true
8✔
1707
                                        break
8✔
1708
                                }
1709
                        }
1710
                } else if domain, ok := opts.JsAccDefaultDomain[accName]; ok && domain == _EMPTY_ {
41✔
1711
                        // for backwards compatibility with old setups that do not have a domain name set
15✔
1712
                        c.Debugf("Skipping deny %q for account %q due to default domain", jsAllAPI, accName)
15✔
1713
                        return
15✔
1714
                }
15✔
1715
        }
1716

1717
        // If the server has JS disabled, it may still be part of a JetStream that could be extended.
1718
        // This is either signaled by js being disabled and a domain set,
1719
        // or in cases where no domain name exists, an extension hint is set.
1720
        // However, this is only relevant in mixed setups.
1721
        //
1722
        // If the system account connects but default domains are present, JetStream can't be extended.
1723
        if opts.JetStreamDomain != myRemoteDomain || (!opts.JetStream && (opts.JetStreamDomain == _EMPTY_ && opts.JetStreamExtHint != jsWillExtend)) ||
1,258✔
1724
                sysAcc == nil || acc == nil || forceSysAccDeny {
2,358✔
1725
                // If domain names mismatch always deny. This applies to system accounts as well as non system accounts.
1,100✔
1726
                // Not having a system account, account or JetStream disabled is considered a mismatch as well.
1,100✔
1727
                if acc != nil && acc == sysAcc {
1,235✔
1728
                        c.Noticef("System account connected from %s", srvDecorated())
135✔
1729
                        c.Noticef("JetStream not extended, domains differ")
135✔
1730
                        c.mergeDenyPermissionsLocked(both, denyAllJs)
135✔
1731
                        // When a remote with a system account is present in a server, unless otherwise disabled, the server will be
135✔
1732
                        // started in observer mode. Now that it is clear that this not used, turn the observer mode off.
135✔
1733
                        if solicited && meta != nil && meta.IsObserver() {
164✔
1734
                                meta.setObserver(false, extNotExtended)
29✔
1735
                                c.Debugf("Turning JetStream metadata controller Observer Mode off")
29✔
1736
                                // Take note that the domain was not extended to avoid this state from startup.
29✔
1737
                                writePeerState(js.config.StoreDir, meta.currentPeerState())
29✔
1738
                                // Meta controller can't be leader yet.
29✔
1739
                                // Yet it is possible that due to observer mode every server already stopped campaigning.
29✔
1740
                                // Therefore this server needs to be kicked into campaigning gear explicitly.
29✔
1741
                                meta.Campaign()
29✔
1742
                        }
29✔
1743
                } else {
965✔
1744
                        c.Noticef("JetStream using domains: local %q, remote %q", opts.JetStreamDomain, myRemoteDomain)
965✔
1745
                        c.mergeDenyPermissionsLocked(both, denyAllClientJs)
965✔
1746
                }
965✔
1747
                blockMappingOutgoing = true
1,100✔
1748
        } else if acc == sysAcc {
230✔
1749
                // system account and same domain
72✔
1750
                s.sys.client.Noticef("Extending JetStream domain %q as System Account connected from server %s",
72✔
1751
                        myRemoteDomain, srvDecorated())
72✔
1752
                // In an extension use case, pin leadership to server remotes connect to.
72✔
1753
                // Therefore, server with a remote that are not already in observer mode, need to be put into it.
72✔
1754
                if solicited && meta != nil && !meta.IsObserver() {
76✔
1755
                        meta.setObserver(true, extExtended)
4✔
1756
                        c.Debugf("Turning JetStream metadata controller Observer Mode on - System Account Connected")
4✔
1757
                        // Take note that the domain was not extended to avoid this state next startup.
4✔
1758
                        writePeerState(js.config.StoreDir, meta.currentPeerState())
4✔
1759
                        // If this server is the leader already, step down so a new leader can be elected (that is not an observer)
4✔
1760
                        meta.StepDown()
4✔
1761
                }
4✔
1762
        } else {
86✔
1763
                // This deny is needed in all cases (system account shared or not)
86✔
1764
                // If the system account is shared, jsAllAPI traffic will go through the system account.
86✔
1765
                // So in order to prevent duplicate delivery (from system and actual account) suppress it on the account.
86✔
1766
                // If the system account is NOT shared, jsAllAPI traffic has no business
86✔
1767
                c.Debugf("Adding deny %+v for account %q", denyAllClientJs, accName)
86✔
1768
                c.mergeDenyPermissionsLocked(both, denyAllClientJs)
86✔
1769
        }
86✔
1770
        // If we have a specified JetStream domain we will want to add a mapping to
1771
        // allow access cross domain for each non-system account.
1772
        if opts.JetStreamDomain != _EMPTY_ && opts.JetStream && acc != nil && acc != sysAcc {
1,492✔
1773
                for src, dest := range generateJSMappingTable(opts.JetStreamDomain) {
2,340✔
1774
                        if err := acc.AddMapping(src, dest); err != nil {
2,106✔
1775
                                c.Debugf("Error adding JetStream domain mapping: %s", err.Error())
×
1776
                        } else {
2,106✔
1777
                                c.Debugf("Adding JetStream Domain Mapping %q -> %s to account %q", src, dest, accName)
2,106✔
1778
                        }
2,106✔
1779
                }
1780
                if blockMappingOutgoing {
437✔
1781
                        src := fmt.Sprintf(jsDomainAPI, opts.JetStreamDomain)
203✔
1782
                        // make sure that messages intended for this domain, do not leave the cluster via this leaf node connection
203✔
1783
                        // This is a guard against a miss-config with two identical domain names and will only cover some forms
203✔
1784
                        // of this issue, not all of them.
203✔
1785
                        // This guards against a hub and a spoke having the same domain name.
203✔
1786
                        // But not two spokes having the same one and the request coming from the hub.
203✔
1787
                        c.mergeDenyPermissionsLocked(pub, []string{src})
203✔
1788
                        c.Debugf("Adding deny %q for outgoing messages to account %q", src, accName)
203✔
1789
                }
203✔
1790
        }
1791
}
1792

1793
func (s *Server) removeLeafNodeConnection(c *client) {
1,582✔
1794
        c.mu.Lock()
1,582✔
1795
        cid := c.cid
1,582✔
1796
        if c.leaf != nil {
3,164✔
1797
                if c.leaf.tsubt != nil {
2,743✔
1798
                        c.leaf.tsubt.Stop()
1,161✔
1799
                        c.leaf.tsubt = nil
1,161✔
1800
                }
1,161✔
1801
                if c.leaf.gwSub != nil {
2,200✔
1802
                        s.gwLeafSubs.Remove(c.leaf.gwSub)
618✔
1803
                        // We need to set this to nil for GC to release the connection
618✔
1804
                        c.leaf.gwSub = nil
618✔
1805
                }
618✔
1806
        }
1807
        c.mu.Unlock()
1,582✔
1808
        s.mu.Lock()
1,582✔
1809
        delete(s.leafs, cid)
1,582✔
1810
        s.mu.Unlock()
1,582✔
1811
        s.removeFromTempClients(cid)
1,582✔
1812
}
1813

1814
// leafConnectInfo is the JSON payload of the CONNECT protocol sent by a
// soliciting leafnode and decoded by processLeafNodeConnect on the accepting
// side. The field order is part of the marshaled output and must not change.
type leafConnectInfo struct {
	// Version of the soliciting server. It is compared against the
	// configured leafnode minimum version, when one is set.
	Version string `json:"version,omitempty"`

	// Authentication fields. They are not read by processLeafNodeConnect,
	// which runs once the connection has passed through auth.
	Nkey  string `json:"nkey,omitempty"`
	JWT   string `json:"jwt,omitempty"`
	Sig   string `json:"sig,omitempty"`
	User  string `json:"user,omitempty"`
	Pass  string `json:"pass,omitempty"`
	Token string `json:"auth_token,omitempty"`

	// Identity of the soliciting server. Name, Domain and Cluster are
	// remembered on the connection; Hub is true when the remote declares
	// itself the hub, in which case the accepting side acts as the spoke.
	ID      string `json:"server_id,omitempty"`
	Domain  string `json:"domain,omitempty"`
	Name    string `json:"name,omitempty"`
	Hub     bool   `json:"is_hub,omitempty"`
	Cluster string `json:"cluster,omitempty"`

	// Features advertised by the soliciting server.
	Headers   bool `json:"headers,omitempty"`
	JetStream bool `json:"jetstream,omitempty"`

	// Publish deny permissions sent by the remote; they are merged with the
	// ones already present on the connection.
	DenyPub []string `json:"deny_pub,omitempty"`

	// There was an existing field called:
	// >> Comp bool `json:"compression,omitempty"`
	// that has never been used. With support for compression, we now need
	// a field that is a string. So we use a different json tag:
	Compression string `json:"compress_mode,omitempty"`

	// Just used to detect wrong connection attempts: a non-empty value means
	// a gateway connected to the leafnode port.
	Gateway string `json:"gateway,omitempty"`

	// Tells the accept side which account the remote is binding to.
	RemoteAccount string `json:"remote_account,omitempty"`

	// The accept side of a LEAF connection, unlike ROUTER and GATEWAY, receives
	// only the CONNECT protocol, and no INFO. So we need to send the protocol
	// version as part of the CONNECT. It will indicate if a connection supports
	// some features, such as message tracing.
	// We use `protocol` as the JSON tag, so this is automatically unmarshal'ed
	// in the low level process CONNECT.
	Proto int `json:"protocol,omitempty"`
}
1852

1853
// processLeafNodeConnect will process the inbound connect args.
1854
// Once we are here we are bound to an account, so can send any interest that
1855
// we would have to the other side.
1856
func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) error {
660✔
1857
        // Way to detect clients that incorrectly connect to the route listen
660✔
1858
        // port. Client provided "lang" in the CONNECT protocol while LEAFNODEs don't.
660✔
1859
        if lang != _EMPTY_ {
660✔
1860
                c.sendErrAndErr(ErrClientConnectedToLeafNodePort.Error())
×
1861
                c.closeConnection(WrongPort)
×
1862
                return ErrClientConnectedToLeafNodePort
×
1863
        }
×
1864

1865
        // Unmarshal as a leaf node connect protocol
1866
        proto := &leafConnectInfo{}
660✔
1867
        if err := json.Unmarshal(arg, proto); err != nil {
660✔
1868
                return err
×
1869
        }
×
1870

1871
        // Reject a cluster that contains spaces.
1872
        if proto.Cluster != _EMPTY_ && strings.Contains(proto.Cluster, " ") {
661✔
1873
                c.sendErrAndErr(ErrClusterNameHasSpaces.Error())
1✔
1874
                c.closeConnection(ProtocolViolation)
1✔
1875
                return ErrClusterNameHasSpaces
1✔
1876
        }
1✔
1877

1878
        // Check for cluster name collisions.
1879
        if cn := s.cachedClusterName(); cn != _EMPTY_ && proto.Cluster != _EMPTY_ && proto.Cluster == cn {
662✔
1880
                c.sendErrAndErr(ErrLeafNodeHasSameClusterName.Error())
3✔
1881
                c.closeConnection(ClusterNamesIdentical)
3✔
1882
                return ErrLeafNodeHasSameClusterName
3✔
1883
        }
3✔
1884

1885
        // Reject if this has Gateway which means that it would be from a gateway
1886
        // connection that incorrectly connects to the leafnode port.
1887
        if proto.Gateway != _EMPTY_ {
656✔
1888
                errTxt := fmt.Sprintf("Rejecting connection from gateway %q on the leafnode port", proto.Gateway)
×
1889
                c.Errorf(errTxt)
×
1890
                c.sendErr(errTxt)
×
1891
                c.closeConnection(WrongGateway)
×
1892
                return ErrWrongGateway
×
1893
        }
×
1894

1895
        if mv := s.getOpts().LeafNode.MinVersion; mv != _EMPTY_ {
658✔
1896
                major, minor, update, _ := versionComponents(mv)
2✔
1897
                if !versionAtLeast(proto.Version, major, minor, update) {
3✔
1898
                        // We are going to send back an INFO because otherwise recent
1✔
1899
                        // versions of the remote server would simply break the connection
1✔
1900
                        // after 2 seconds if not receiving it. Instead, we want the
1✔
1901
                        // other side to just "stall" until we finish waiting for the holding
1✔
1902
                        // period and close the connection below.
1✔
1903
                        s.sendPermsAndAccountInfo(c)
1✔
1904
                        c.sendErrAndErr(fmt.Sprintf("connection rejected since minimum version required is %q", mv))
1✔
1905
                        select {
1✔
1906
                        case <-c.srv.quitCh:
1✔
1907
                        case <-time.After(leafNodeWaitBeforeClose):
×
1908
                        }
1909
                        c.closeConnection(MinimumVersionRequired)
1✔
1910
                        return ErrMinimumVersionRequired
1✔
1911
                }
1912
        }
1913

1914
        // Check if this server supports headers.
1915
        supportHeaders := c.srv.supportsHeaders()
655✔
1916

655✔
1917
        c.mu.Lock()
655✔
1918
        // Leaf Nodes do not do echo or verbose or pedantic.
655✔
1919
        c.opts.Verbose = false
655✔
1920
        c.opts.Echo = false
655✔
1921
        c.opts.Pedantic = false
655✔
1922
        // This inbound connection will be marked as supporting headers if this server
655✔
1923
        // support headers and the remote has sent in the CONNECT protocol that it does
655✔
1924
        // support headers too.
655✔
1925
        c.headers = supportHeaders && proto.Headers
655✔
1926
        // If the compression level is still not set, set it based on what has been
655✔
1927
        // given to us in the CONNECT protocol.
655✔
1928
        if c.leaf.compression == _EMPTY_ {
783✔
1929
                // But if proto.Compression is _EMPTY_, set it to CompressionNotSupported
128✔
1930
                if proto.Compression == _EMPTY_ {
161✔
1931
                        c.leaf.compression = CompressionNotSupported
33✔
1932
                } else {
128✔
1933
                        c.leaf.compression = proto.Compression
95✔
1934
                }
95✔
1935
        }
1936

1937
        // Remember the remote server.
1938
        c.leaf.remoteServer = proto.Name
655✔
1939
        // Remember the remote account name
655✔
1940
        c.leaf.remoteAccName = proto.RemoteAccount
655✔
1941

655✔
1942
        // If the other side has declared itself a hub, so we will take on the spoke role.
655✔
1943
        if proto.Hub {
661✔
1944
                c.leaf.isSpoke = true
6✔
1945
        }
6✔
1946

1947
        // The soliciting side is part of a cluster.
1948
        if proto.Cluster != _EMPTY_ {
1,163✔
1949
                c.leaf.remoteCluster = proto.Cluster
508✔
1950
        }
508✔
1951

1952
        c.leaf.remoteDomain = proto.Domain
655✔
1953

655✔
1954
        // When a leaf solicits a connection to a hub, the perms that it will use on the soliciting leafnode's
655✔
1955
        // behalf are correct for them, but inside the hub need to be reversed since data is flowing in the opposite direction.
655✔
1956
        if !c.isSolicitedLeafNode() && c.perms != nil {
668✔
1957
                sp, pp := c.perms.sub, c.perms.pub
13✔
1958
                c.perms.sub, c.perms.pub = pp, sp
13✔
1959
                if c.opts.Import != nil {
25✔
1960
                        c.darray = c.opts.Import.Deny
12✔
1961
                } else {
13✔
1962
                        c.darray = nil
1✔
1963
                }
1✔
1964
        }
1965

1966
        // Set the Ping timer
1967
        c.setFirstPingTimer()
655✔
1968

655✔
1969
        // If we received pub deny permissions from the other end, merge with existing ones.
655✔
1970
        c.mergeDenyPermissions(pub, proto.DenyPub)
655✔
1971

655✔
1972
        acc := c.acc
655✔
1973
        c.mu.Unlock()
655✔
1974

655✔
1975
        // Register the cluster, even if empty, as long as we are acting as a hub.
655✔
1976
        if !proto.Hub {
1,304✔
1977
                acc.registerLeafNodeCluster(proto.Cluster)
649✔
1978
        }
649✔
1979

1980
        // Add in the leafnode here since we passed through auth at this point.
1981
        s.addLeafNodeConnection(c, proto.Name, proto.Cluster, true)
655✔
1982

655✔
1983
        // If we have permissions bound to this leafnode we need to send then back to the
655✔
1984
        // origin server for local enforcement.
655✔
1985
        s.sendPermsAndAccountInfo(c)
655✔
1986

655✔
1987
        // Create and initialize the smap since we know our bound account now.
655✔
1988
        // This will send all registered subs too.
655✔
1989
        s.initLeafNodeSmapAndSendSubs(c)
655✔
1990

655✔
1991
        // Announce the account connect event for a leaf node.
655✔
1992
        // This will be a no-op as needed.
655✔
1993
        s.sendLeafNodeConnect(c.acc)
655✔
1994

655✔
1995
        // If we have JS enabled and so does the other side, we will
655✔
1996
        // check to see if we need to kick any internal source or mirror consumers.
655✔
1997
        if proto.JetStream {
898✔
1998
                s.checkInternalSyncConsumers(acc, proto.Domain)
243✔
1999
        }
243✔
2000
        return nil
655✔
2001
}
2002

2003
// checkInternalSyncConsumers
2004
func (s *Server) checkInternalSyncConsumers(acc *Account, remoteDomain string) {
506✔
2005
        // Grab our js
506✔
2006
        js := s.getJetStream()
506✔
2007

506✔
2008
        // Only applicable if we have JS and the leafnode has JS as well.
506✔
2009
        // We check for remote JS outside.
506✔
2010
        if !js.isEnabled() || acc == nil {
630✔
2011
                return
124✔
2012
        }
124✔
2013

2014
        // We will check all streams in our local account. They must be a leader and
2015
        // be sourcing or mirroring. We will check the external config on the stream itself
2016
        // if this is cross domain, or if the remote domain is empty, meaning we might be
2017
        // extedning the system across this leafnode connection and hence we would be extending
2018
        // our own domain.
2019
        jsa := js.lookupAccount(acc)
382✔
2020
        if jsa == nil {
491✔
2021
                return
109✔
2022
        }
109✔
2023
        var streams []*stream
273✔
2024
        jsa.mu.RLock()
273✔
2025
        for _, mset := range jsa.streams {
294✔
2026
                mset.cfgMu.RLock()
21✔
2027
                // We need to have a mirror or source defined.
21✔
2028
                // We do not want to force another lock here to look for leader status,
21✔
2029
                // so collect and after we release jsa will make sure.
21✔
2030
                if mset.cfg.Mirror != nil || len(mset.cfg.Sources) > 0 {
25✔
2031
                        streams = append(streams, mset)
4✔
2032
                }
4✔
2033
                mset.cfgMu.RUnlock()
21✔
2034
        }
2035
        jsa.mu.RUnlock()
273✔
2036

273✔
2037
        // Now loop through all candidates and check if we are the leader and have NOT
273✔
2038
        // created the sync up consumer.
273✔
2039
        for _, mset := range streams {
277✔
2040
                mset.retryDisconnectedSyncConsumers(remoteDomain)
4✔
2041
        }
4✔
2042
}
2043

2044
// Returns the remote cluster name. This is set only once so does not require a lock.
2045
func (c *client) remoteCluster() string {
192,838✔
2046
        if c.leaf == nil {
192,838✔
2047
                return _EMPTY_
×
2048
        }
×
2049
        return c.leaf.remoteCluster
192,838✔
2050
}
2051

2052
// Sends back an info block to the soliciting leafnode to let it know about
2053
// its permission settings for local enforcement.
2054
func (s *Server) sendPermsAndAccountInfo(c *client) {
656✔
2055
        // Copy
656✔
2056
        s.mu.Lock()
656✔
2057
        info := s.copyLeafNodeInfo()
656✔
2058
        s.mu.Unlock()
656✔
2059
        c.mu.Lock()
656✔
2060
        info.CID = c.cid
656✔
2061
        info.Import = c.opts.Import
656✔
2062
        info.Export = c.opts.Export
656✔
2063
        info.RemoteAccount = c.acc.Name
656✔
2064
        // s.SystemAccount() uses an atomic operation and does not get the server lock, so this is safe.
656✔
2065
        info.IsSystemAccount = c.acc == s.SystemAccount()
656✔
2066
        info.ConnectInfo = true
656✔
2067
        c.enqueueProto(generateInfoJSON(info))
656✔
2068
        c.mu.Unlock()
656✔
2069
}
656✔
2070

2071
// Snapshot the current subscriptions from the sublist into our smap which
2072
// we will keep updated from now on.
2073
// Also send the registered subscriptions.
2074
func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
1,273✔
2075
        acc := c.acc
1,273✔
2076
        if acc == nil {
1,273✔
2077
                c.Debugf("Leafnode does not have an account bound")
×
2078
                return
×
2079
        }
×
2080
        // Collect all account subs here.
2081
        _subs := [1024]*subscription{}
1,273✔
2082
        subs := _subs[:0]
1,273✔
2083
        ims := []string{}
1,273✔
2084

1,273✔
2085
        // Hold the client lock otherwise there can be a race and miss some subs.
1,273✔
2086
        c.mu.Lock()
1,273✔
2087
        defer c.mu.Unlock()
1,273✔
2088

1,273✔
2089
        acc.mu.RLock()
1,273✔
2090
        accName := acc.Name
1,273✔
2091
        accNTag := acc.nameTag
1,273✔
2092

1,273✔
2093
        // To make printing look better when no friendly name present.
1,273✔
2094
        if accNTag != _EMPTY_ {
1,284✔
2095
                accNTag = "/" + accNTag
11✔
2096
        }
11✔
2097

2098
        // If we are solicited we only send interest for local clients.
2099
        if c.isSpokeLeafNode() {
1,891✔
2100
                acc.sl.localSubs(&subs, true)
618✔
2101
        } else {
1,273✔
2102
                acc.sl.All(&subs)
655✔
2103
        }
655✔
2104

2105
        // Check if we have an existing service import reply.
2106
        siReply := copyBytes(acc.siReply)
1,273✔
2107

1,273✔
2108
        // Since leaf nodes only send on interest, if the bound
1,273✔
2109
        // account has import services we need to send those over.
1,273✔
2110
        for isubj := range acc.imports.services {
5,998✔
2111
                if c.isSpokeLeafNode() && !c.canSubscribe(isubj) {
5,003✔
2112
                        c.Debugf("Not permitted to import service %q on behalf of %s%s", isubj, accName, accNTag)
278✔
2113
                        continue
278✔
2114
                }
2115
                ims = append(ims, isubj)
4,447✔
2116
        }
2117
        // Likewise for mappings.
2118
        for _, m := range acc.mappings {
3,499✔
2119
                if c.isSpokeLeafNode() && !c.canSubscribe(m.src) {
2,262✔
2120
                        c.Debugf("Not permitted to import mapping %q on behalf of %s%s", m.src, accName, accNTag)
36✔
2121
                        continue
36✔
2122
                }
2123
                ims = append(ims, m.src)
2,190✔
2124
        }
2125

2126
        // Create a unique subject that will be used for loop detection.
2127
        lds := acc.lds
1,273✔
2128
        acc.mu.RUnlock()
1,273✔
2129

1,273✔
2130
        // Check if we have to create the LDS.
1,273✔
2131
        if lds == _EMPTY_ {
2,272✔
2132
                lds = leafNodeLoopDetectionSubjectPrefix + nuid.Next()
999✔
2133
                acc.mu.Lock()
999✔
2134
                acc.lds = lds
999✔
2135
                acc.mu.Unlock()
999✔
2136
        }
999✔
2137

2138
        // Now check for gateway interest. Leafnodes will put this into
2139
        // the proper mode to propagate, but they are not held in the account.
2140
        gwsa := [16]*client{}
1,273✔
2141
        gws := gwsa[:0]
1,273✔
2142
        s.getOutboundGatewayConnections(&gws)
1,273✔
2143
        for _, cgw := range gws {
1,355✔
2144
                cgw.mu.Lock()
82✔
2145
                gw := cgw.gw
82✔
2146
                cgw.mu.Unlock()
82✔
2147
                if gw != nil {
164✔
2148
                        if ei, _ := gw.outsim.Load(accName); ei != nil {
164✔
2149
                                if e := ei.(*outsie); e != nil && e.sl != nil {
164✔
2150
                                        e.sl.All(&subs)
82✔
2151
                                }
82✔
2152
                        }
2153
                }
2154
        }
2155

2156
        applyGlobalRouting := s.gateway.enabled
1,273✔
2157
        if c.isSpokeLeafNode() {
1,891✔
2158
                // Add a fake subscription for this solicited leafnode connection
618✔
2159
                // so that we can send back directly for mapped GW replies.
618✔
2160
                // We need to keep track of this subscription so it can be removed
618✔
2161
                // when the connection is closed so that the GC can release it.
618✔
2162
                c.leaf.gwSub = &subscription{client: c, subject: []byte(gwReplyPrefix + ">")}
618✔
2163
                c.srv.gwLeafSubs.Insert(c.leaf.gwSub)
618✔
2164
        }
618✔
2165

2166
        // Now walk the results and add them to our smap
2167
        rc := c.leaf.remoteCluster
1,273✔
2168
        c.leaf.smap = make(map[string]int32)
1,273✔
2169
        for _, sub := range subs {
38,882✔
2170
                // Check perms regardless of role.
37,609✔
2171
                if c.perms != nil && !c.canSubscribe(string(sub.subject)) {
39,931✔
2172
                        c.Debugf("Not permitted to subscribe to %q on behalf of %s%s", sub.subject, accName, accNTag)
2,322✔
2173
                        continue
2,322✔
2174
                }
2175
                // We ignore ourselves here.
2176
                // Also don't add the subscription if it has a origin cluster and the
2177
                // cluster name matches the one of the client we are sending to.
2178
                if c != sub.client && (sub.origin == nil || (bytesToString(sub.origin) != rc)) {
64,998✔
2179
                        count := int32(1)
29,711✔
2180
                        if len(sub.queue) > 0 && sub.qw > 0 {
29,721✔
2181
                                count = sub.qw
10✔
2182
                        }
10✔
2183
                        c.leaf.smap[keyFromSub(sub)] += count
29,711✔
2184
                        if c.leaf.tsub == nil {
30,907✔
2185
                                c.leaf.tsub = make(map[*subscription]struct{})
1,196✔
2186
                        }
1,196✔
2187
                        c.leaf.tsub[sub] = struct{}{}
29,711✔
2188
                }
2189
        }
2190
        // FIXME(dlc) - We need to update appropriately on an account claims update.
2191
        for _, isubj := range ims {
7,910✔
2192
                c.leaf.smap[isubj]++
6,637✔
2193
        }
6,637✔
2194
        // If we have gateways enabled we need to make sure the other side sends us responses
2195
        // that have been augmented from the original subscription.
2196
        // TODO(dlc) - Should we lock this down more?
2197
        if applyGlobalRouting {
1,376✔
2198
                c.leaf.smap[oldGWReplyPrefix+"*.>"]++
103✔
2199
                c.leaf.smap[gwReplyPrefix+">"]++
103✔
2200
        }
103✔
2201
        // Detect loops by subscribing to a specific subject and checking
2202
        // if this sub is coming back to us.
2203
        c.leaf.smap[lds]++
1,273✔
2204

1,273✔
2205
        // Check if we need to add an existing siReply to our map.
1,273✔
2206
        // This will be a prefix so add on the wildcard.
1,273✔
2207
        if siReply != nil {
1,290✔
2208
                wcsub := append(siReply, '>')
17✔
2209
                c.leaf.smap[string(wcsub)]++
17✔
2210
        }
17✔
2211
        // Queue all protocols. There is no max pending limit for LN connection,
2212
        // so we don't need chunking. The writes will happen from the writeLoop.
2213
        var b bytes.Buffer
1,273✔
2214
        for key, n := range c.leaf.smap {
27,415✔
2215
                c.writeLeafSub(&b, key, n)
26,142✔
2216
        }
26,142✔
2217
        if b.Len() > 0 {
2,546✔
2218
                c.enqueueProto(b.Bytes())
1,273✔
2219
        }
1,273✔
2220
        if c.leaf.tsub != nil {
2,470✔
2221
                // Clear the tsub map after 5 seconds.
1,197✔
2222
                c.leaf.tsubt = time.AfterFunc(5*time.Second, func() {
1,233✔
2223
                        c.mu.Lock()
36✔
2224
                        if c.leaf != nil {
72✔
2225
                                c.leaf.tsub = nil
36✔
2226
                                c.leaf.tsubt = nil
36✔
2227
                        }
36✔
2228
                        c.mu.Unlock()
36✔
2229
                })
2230
        }
2231
}
2232

2233
// updateInterestForAccountOnGateway called from gateway code when processing RS+ and RS-.
2234
func (s *Server) updateInterestForAccountOnGateway(accName string, sub *subscription, delta int32) {
197,966✔
2235
        acc, err := s.LookupAccount(accName)
197,966✔
2236
        if acc == nil || err != nil {
198,125✔
2237
                s.Debugf("No or bad account for %q, failed to update interest from gateway", accName)
159✔
2238
                return
159✔
2239
        }
159✔
2240
        acc.updateLeafNodes(sub, delta)
197,807✔
2241
}
2242

2243
// updateLeafNodes will make sure to update the account smap for the subscription.
2244
// Will also forward to all leaf nodes as needed.
2245
func (acc *Account) updateLeafNodes(sub *subscription, delta int32) {
2,405,441✔
2246
        if acc == nil || sub == nil {
2,405,441✔
2247
                return
×
2248
        }
×
2249

2250
        // We will do checks for no leafnodes and same cluster here inline and under the
2251
        // general account read lock.
2252
        // If we feel we need to update the leafnodes we will do that out of line to avoid
2253
        // blocking routes or GWs.
2254

2255
        acc.mu.RLock()
2,405,441✔
2256
        // First check if we even have leafnodes here.
2,405,441✔
2257
        if acc.nleafs == 0 {
4,743,088✔
2258
                acc.mu.RUnlock()
2,337,647✔
2259
                return
2,337,647✔
2260
        }
2,337,647✔
2261

2262
        // Is this a loop detection subject.
2263
        isLDS := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))
67,794✔
2264

67,794✔
2265
        // Capture the cluster even if its empty.
67,794✔
2266
        var cluster string
67,794✔
2267
        if sub.origin != nil {
116,390✔
2268
                cluster = bytesToString(sub.origin)
48,596✔
2269
        }
48,596✔
2270

2271
        // If we have an isolated cluster we can return early, as long as it is not a loop detection subject.
2272
        // Empty clusters will return false for the check.
2273
        if !isLDS && acc.isLeafNodeClusterIsolated(cluster) {
88,759✔
2274
                acc.mu.RUnlock()
20,965✔
2275
                return
20,965✔
2276
        }
20,965✔
2277

2278
        // We can release the general account lock.
2279
        acc.mu.RUnlock()
46,829✔
2280

46,829✔
2281
        // We can hold the list lock here to avoid having to copy a large slice.
46,829✔
2282
        acc.lmu.RLock()
46,829✔
2283
        defer acc.lmu.RUnlock()
46,829✔
2284

46,829✔
2285
        // Do this once.
46,829✔
2286
        subject := string(sub.subject)
46,829✔
2287

46,829✔
2288
        // Walk the connected leafnodes.
46,829✔
2289
        for _, ln := range acc.lleafs {
103,224✔
2290
                if ln == sub.client {
85,445✔
2291
                        continue
29,050✔
2292
                }
2293
                // Check to make sure this sub does not have an origin cluster that matches the leafnode.
2294
                ln.mu.Lock()
27,345✔
2295
                // If skipped, make sure that we still let go the "$LDS." subscription that allows
27,345✔
2296
                // the detection of loops as long as different cluster.
27,345✔
2297
                clusterDifferent := cluster != ln.remoteCluster()
27,345✔
2298
                if (isLDS && clusterDifferent) || ((cluster == _EMPTY_ || clusterDifferent) && (delta <= 0 || ln.canSubscribe(subject))) {
50,477✔
2299
                        ln.updateSmap(sub, delta, isLDS)
23,132✔
2300
                }
23,132✔
2301
                ln.mu.Unlock()
27,345✔
2302
        }
2303
}
2304

2305
// This will make an update to our internal smap and determine if we should send out
2306
// an interest update to the remote side.
2307
// Lock should be held.
2308
func (c *client) updateSmap(sub *subscription, delta int32, isLDS bool) {
23,132✔
2309
        if c.leaf.smap == nil {
23,143✔
2310
                return
11✔
2311
        }
11✔
2312

2313
        // If we are solicited make sure this is a local client or a non-solicited leaf node
2314
        skind := sub.client.kind
23,121✔
2315
        updateClient := skind == CLIENT || skind == SYSTEM || skind == JETSTREAM || skind == ACCOUNT
23,121✔
2316
        if !isLDS && c.isSpokeLeafNode() && !(updateClient || (skind == LEAF && !sub.client.isSpokeLeafNode())) {
31,808✔
2317
                return
8,687✔
2318
        }
8,687✔
2319

2320
        // For additions, check if that sub has just been processed during initLeafNodeSmapAndSendSubs
2321
        if delta > 0 && c.leaf.tsub != nil {
21,348✔
2322
                if _, present := c.leaf.tsub[sub]; present {
6,914✔
2323
                        delete(c.leaf.tsub, sub)
×
2324
                        if len(c.leaf.tsub) == 0 {
×
2325
                                c.leaf.tsub = nil
×
2326
                                c.leaf.tsubt.Stop()
×
2327
                                c.leaf.tsubt = nil
×
2328
                        }
×
2329
                        return
×
2330
                }
2331
        }
2332

2333
        key := keyFromSub(sub)
14,434✔
2334
        n, ok := c.leaf.smap[key]
14,434✔
2335
        if delta < 0 && !ok {
15,358✔
2336
                return
924✔
2337
        }
924✔
2338

2339
        // We will update if its a queue, if count is zero (or negative), or we were 0 and are N > 0.
2340
        update := sub.queue != nil || (n <= 0 && n+delta > 0) || (n > 0 && n+delta <= 0)
13,510✔
2341
        n += delta
13,510✔
2342
        if n > 0 {
23,472✔
2343
                c.leaf.smap[key] = n
9,962✔
2344
        } else {
13,510✔
2345
                delete(c.leaf.smap, key)
3,548✔
2346
        }
3,548✔
2347
        if update {
22,930✔
2348
                c.sendLeafNodeSubUpdate(key, n)
9,420✔
2349
        }
9,420✔
2350
}
2351

2352
// Used to force add subjects to the subject map.
2353
func (c *client) forceAddToSmap(subj string) {
4✔
2354
        c.mu.Lock()
4✔
2355
        defer c.mu.Unlock()
4✔
2356

4✔
2357
        if c.leaf.smap == nil {
4✔
2358
                return
×
2359
        }
×
2360
        n := c.leaf.smap[subj]
4✔
2361
        if n != 0 {
5✔
2362
                return
1✔
2363
        }
1✔
2364
        // Place into the map since it was not there.
2365
        c.leaf.smap[subj] = 1
3✔
2366
        c.sendLeafNodeSubUpdate(subj, 1)
3✔
2367
}
2368

2369
// Used to force remove a subject from the subject map.
2370
func (c *client) forceRemoveFromSmap(subj string) {
1✔
2371
        c.mu.Lock()
1✔
2372
        defer c.mu.Unlock()
1✔
2373

1✔
2374
        if c.leaf.smap == nil {
1✔
2375
                return
×
2376
        }
×
2377
        n := c.leaf.smap[subj]
1✔
2378
        if n == 0 {
1✔
2379
                return
×
2380
        }
×
2381
        n--
1✔
2382
        if n == 0 {
2✔
2383
                // Remove is now zero
1✔
2384
                delete(c.leaf.smap, subj)
1✔
2385
                c.sendLeafNodeSubUpdate(subj, 0)
1✔
2386
        } else {
1✔
2387
                c.leaf.smap[subj] = n
×
2388
        }
×
2389
}
2390

2391
// Send the subscription interest change to the other side.
2392
// Lock should be held.
2393
func (c *client) sendLeafNodeSubUpdate(key string, n int32) {
9,424✔
2394
        // If we are a spoke, we need to check if we are allowed to send this subscription over to the hub.
9,424✔
2395
        if c.isSpokeLeafNode() {
11,740✔
2396
                checkPerms := true
2,316✔
2397
                if len(key) > 0 && (key[0] == '$' || key[0] == '_') {
3,680✔
2398
                        if strings.HasPrefix(key, leafNodeLoopDetectionSubjectPrefix) ||
1,364✔
2399
                                strings.HasPrefix(key, oldGWReplyPrefix) ||
1,364✔
2400
                                strings.HasPrefix(key, gwReplyPrefix) {
1,446✔
2401
                                checkPerms = false
82✔
2402
                        }
82✔
2403
                }
2404
                if checkPerms {
4,550✔
2405
                        var subject string
2,234✔
2406
                        if sep := strings.IndexByte(key, ' '); sep != -1 {
2,726✔
2407
                                subject = key[:sep]
492✔
2408
                        } else {
2,234✔
2409
                                subject = key
1,742✔
2410
                        }
1,742✔
2411
                        if !c.canSubscribe(subject) {
2,234✔
2412
                                return
×
2413
                        }
×
2414
                }
2415
        }
2416
        // If we are here we can send over to the other side.
2417
        _b := [64]byte{}
9,424✔
2418
        b := bytes.NewBuffer(_b[:0])
9,424✔
2419
        c.writeLeafSub(b, key, n)
9,424✔
2420
        c.enqueueProto(b.Bytes())
9,424✔
2421
}
2422

2423
// Helper function to build the key.
2424
func keyFromSub(sub *subscription) string {
45,198✔
2425
        var sb strings.Builder
45,198✔
2426
        sb.Grow(len(sub.subject) + len(sub.queue) + 1)
45,198✔
2427
        sb.Write(sub.subject)
45,198✔
2428
        if sub.queue != nil {
49,059✔
2429
                // Just make the key subject spc group, e.g. 'foo bar'
3,861✔
2430
                sb.WriteByte(' ')
3,861✔
2431
                sb.Write(sub.queue)
3,861✔
2432
        }
3,861✔
2433
        return sb.String()
45,198✔
2434
}
2435

2436
// Tags placed at the start of the keys built by keyFromSubWithOrigin. Each
// tag is available both as a string and as a single character.
const (
	// Plain routed subscription.
	keyRoutedSub     = "R"
	keyRoutedSubByte = 'R'

	// Routed subscription made on behalf of a leafnode.
	keyRoutedLeafSub     = "L"
	keyRoutedLeafSubByte = 'L'
)
2442

2443
// Helper function to build the key that prevents collisions between normal
2444
// routed subscriptions and routed subscriptions on behalf of a leafnode.
2445
// Keys will look like this:
2446
// "R foo"          -> plain routed sub on "foo"
2447
// "R foo bar"      -> queue routed sub on "foo", queue "bar"
2448
// "L foo bar"      -> plain routed leaf sub on "foo", leaf "bar"
2449
// "L foo bar baz"  -> queue routed sub on "foo", queue "bar", leaf "baz"
2450
func keyFromSubWithOrigin(sub *subscription) string {
663,303✔
2451
        var sb strings.Builder
663,303✔
2452
        sb.Grow(2 + len(sub.origin) + 1 + len(sub.subject) + 1 + len(sub.queue))
663,303✔
2453
        leaf := len(sub.origin) > 0
663,303✔
2454
        if leaf {
680,028✔
2455
                sb.WriteByte(keyRoutedLeafSubByte)
16,725✔
2456
        } else {
663,303✔
2457
                sb.WriteByte(keyRoutedSubByte)
646,578✔
2458
        }
646,578✔
2459
        sb.WriteByte(' ')
663,303✔
2460
        sb.Write(sub.subject)
663,303✔
2461
        if sub.queue != nil {
687,072✔
2462
                sb.WriteByte(' ')
23,769✔
2463
                sb.Write(sub.queue)
23,769✔
2464
        }
23,769✔
2465
        if leaf {
680,028✔
2466
                sb.WriteByte(' ')
16,725✔
2467
                sb.Write(sub.origin)
16,725✔
2468
        }
16,725✔
2469
        return sb.String()
663,303✔
2470
}
2471

2472
// Lock should be held.
2473
func (c *client) writeLeafSub(w *bytes.Buffer, key string, n int32) {
35,566✔
2474
        if key == _EMPTY_ {
35,566✔
2475
                return
×
2476
        }
×
2477
        if n > 0 {
67,583✔
2478
                w.WriteString("LS+ " + key)
32,017✔
2479
                // Check for queue semantics, if found write n.
32,017✔
2480
                if strings.Contains(key, " ") {
34,349✔
2481
                        w.WriteString(" ")
2,332✔
2482
                        var b [12]byte
2,332✔
2483
                        var i = len(b)
2,332✔
2484
                        for l := n; l > 0; l /= 10 {
5,570✔
2485
                                i--
3,238✔
2486
                                b[i] = digits[l%10]
3,238✔
2487
                        }
3,238✔
2488
                        w.Write(b[i:])
2,332✔
2489
                        if c.trace {
2,332✔
2490
                                arg := fmt.Sprintf("%s %d", key, n)
×
2491
                                c.traceOutOp("LS+", []byte(arg))
×
2492
                        }
×
2493
                } else if c.trace {
29,881✔
2494
                        c.traceOutOp("LS+", []byte(key))
196✔
2495
                }
196✔
2496
        } else {
3,549✔
2497
                w.WriteString("LS- " + key)
3,549✔
2498
                if c.trace {
3,560✔
2499
                        c.traceOutOp("LS-", []byte(key))
11✔
2500
                }
11✔
2501
        }
2502
        w.WriteString(CR_LF)
35,566✔
2503
}
2504

2505
// processLeafSub will process an inbound sub request for the remote leaf node.
2506
func (c *client) processLeafSub(argo []byte) (err error) {
31,756✔
2507
        // Indicate activity.
31,756✔
2508
        c.in.subs++
31,756✔
2509

31,756✔
2510
        srv := c.srv
31,756✔
2511
        if srv == nil {
31,756✔
2512
                return nil
×
2513
        }
×
2514

2515
        // Copy so we do not reference a potentially large buffer
2516
        arg := make([]byte, len(argo))
31,756✔
2517
        copy(arg, argo)
31,756✔
2518

31,756✔
2519
        args := splitArg(arg)
31,756✔
2520
        sub := &subscription{client: c}
31,756✔
2521

31,756✔
2522
        delta := int32(1)
31,756✔
2523
        switch len(args) {
31,756✔
2524
        case 1:
29,457✔
2525
                sub.queue = nil
29,457✔
2526
        case 3:
2,298✔
2527
                sub.queue = args[1]
2,298✔
2528
                sub.qw = int32(parseSize(args[2]))
2,298✔
2529
                // TODO: (ik) We should have a non empty queue name and a queue
2,298✔
2530
                // weight >= 1. For 2.11, we may want to return an error if that
2,298✔
2531
                // is not the case, but for now just overwrite `delta` if queue
2,298✔
2532
                // weight is greater than 1 (it is possible after a reconnect/
2,298✔
2533
                // server restart to receive a queue weight > 1 for a new sub).
2,298✔
2534
                if sub.qw > 1 {
3,988✔
2535
                        delta = sub.qw
1,690✔
2536
                }
1,690✔
2537
        default:
1✔
2538
                return fmt.Errorf("processLeafSub Parse Error: '%s'", arg)
1✔
2539
        }
2540
        sub.subject = args[0]
31,755✔
2541

31,755✔
2542
        c.mu.Lock()
31,755✔
2543
        if c.isClosed() {
31,766✔
2544
                c.mu.Unlock()
11✔
2545
                return nil
11✔
2546
        }
11✔
2547

2548
        acc := c.acc
31,744✔
2549
        // Check if we have a loop.
31,744✔
2550
        ldsPrefix := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))
31,744✔
2551

31,744✔
2552
        if ldsPrefix && bytesToString(sub.subject) == acc.getLDSubject() {
31,750✔
2553
                c.mu.Unlock()
6✔
2554
                c.handleLeafNodeLoop(true)
6✔
2555
                return nil
6✔
2556
        }
6✔
2557

2558
        // Check permissions if applicable. (but exclude the $LDS, $GR and _GR_)
2559
        checkPerms := true
31,738✔
2560
        if sub.subject[0] == '$' || sub.subject[0] == '_' {
60,545✔
2561
                if ldsPrefix ||
28,807✔
2562
                        bytes.HasPrefix(sub.subject, []byte(oldGWReplyPrefix)) ||
28,807✔
2563
                        bytes.HasPrefix(sub.subject, []byte(gwReplyPrefix)) {
30,764✔
2564
                        checkPerms = false
1,957✔
2565
                }
1,957✔
2566
        }
2567

2568
        // If we are a hub check that we can publish to this subject.
2569
        if checkPerms {
61,519✔
2570
                subj := string(sub.subject)
29,781✔
2571
                if subjectIsLiteral(subj) && !c.pubAllowedFullCheck(subj, true, true) {
30,119✔
2572
                        c.mu.Unlock()
338✔
2573
                        c.leafSubPermViolation(sub.subject)
338✔
2574
                        c.Debugf(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject))
338✔
2575
                        return nil
338✔
2576
                }
338✔
2577
        }
2578

2579
        // Check if we have a maximum on the number of subscriptions.
2580
        if c.subsAtLimit() {
31,408✔
2581
                c.mu.Unlock()
8✔
2582
                c.maxSubsExceeded()
8✔
2583
                return nil
8✔
2584
        }
8✔
2585

2586
        // If we have an origin cluster associated mark that in the sub.
2587
        if rc := c.remoteCluster(); rc != _EMPTY_ {
59,684✔
2588
                sub.origin = []byte(rc)
28,292✔
2589
        }
28,292✔
2590

2591
        // Like Routes, we store local subs by account and subject and optionally queue name.
2592
        // If we have a queue it will have a trailing weight which we do not want.
2593
        if sub.queue != nil {
33,388✔
2594
                sub.sid = arg[:len(arg)-len(args[2])-1]
1,996✔
2595
        } else {
31,392✔
2596
                sub.sid = arg
29,396✔
2597
        }
29,396✔
2598
        key := bytesToString(sub.sid)
31,392✔
2599
        osub := c.subs[key]
31,392✔
2600
        if osub == nil {
61,245✔
2601
                c.subs[key] = sub
29,853✔
2602
                // Now place into the account sl.
29,853✔
2603
                if err := acc.sl.Insert(sub); err != nil {
29,853✔
2604
                        delete(c.subs, key)
×
2605
                        c.mu.Unlock()
×
2606
                        c.Errorf("Could not insert subscription: %v", err)
×
2607
                        c.sendErr("Invalid Subscription")
×
2608
                        return nil
×
2609
                }
×
2610
        } else if sub.queue != nil {
3,077✔
2611
                // For a queue we need to update the weight.
1,538✔
2612
                delta = sub.qw - atomic.LoadInt32(&osub.qw)
1,538✔
2613
                atomic.StoreInt32(&osub.qw, sub.qw)
1,538✔
2614
                acc.sl.UpdateRemoteQSub(osub)
1,538✔
2615
        }
1,538✔
2616
        spoke := c.isSpokeLeafNode()
31,392✔
2617
        c.mu.Unlock()
31,392✔
2618

31,392✔
2619
        // Only add in shadow subs if a new sub or qsub.
31,392✔
2620
        if osub == nil {
61,245✔
2621
                if err := c.addShadowSubscriptions(acc, sub, true); err != nil {
29,853✔
2622
                        c.Errorf(err.Error())
×
2623
                }
×
2624
        }
2625

2626
        // If we are not solicited, treat leaf node subscriptions similar to a
2627
        // client subscription, meaning we forward them to routes, gateways and
2628
        // other leaf nodes as needed.
2629
        if !spoke {
42,415✔
2630
                // If we are routing add to the route map for the associated account.
11,023✔
2631
                srv.updateRouteSubscriptionMap(acc, sub, delta)
11,023✔
2632
                if srv.gateway.enabled {
12,549✔
2633
                        srv.gatewayUpdateSubInterest(acc.Name, sub, delta)
1,526✔
2634
                }
1,526✔
2635
        }
2636
        // Now check on leafnode updates for other leaf nodes. We understand solicited
2637
        // and non-solicited state in this call so we will do the right thing.
2638
        acc.updateLeafNodes(sub, delta)
31,392✔
2639

31,392✔
2640
        return nil
31,392✔
2641
}
2642

2643
// If the leafnode is a solicited, set the connect delay based on default
2644
// or private option (for tests). Sends the error to the other side, log and
2645
// close the connection.
2646
func (c *client) handleLeafNodeLoop(sendErr bool) {
16✔
2647
        accName, delay := c.setLeafConnectDelayIfSoliciting(leafNodeReconnectDelayAfterLoopDetected)
16✔
2648
        errTxt := fmt.Sprintf("Loop detected for leafnode account=%q. Delaying attempt to reconnect for %v", accName, delay)
16✔
2649
        if sendErr {
24✔
2650
                c.sendErr(errTxt)
8✔
2651
        }
8✔
2652

2653
        c.Errorf(errTxt)
16✔
2654
        // If we are here with "sendErr" false, it means that this is the server
16✔
2655
        // that received the error. The other side will have closed the connection,
16✔
2656
        // but does not hurt to close here too.
16✔
2657
        c.closeConnection(ProtocolViolation)
16✔
2658
}
2659

2660
// processLeafUnsub will process an inbound unsub request for the remote leaf node.
2661
func (c *client) processLeafUnsub(arg []byte) error {
3,301✔
2662
        // Indicate any activity, so pub and sub or unsubs.
3,301✔
2663
        c.in.subs++
3,301✔
2664

3,301✔
2665
        acc := c.acc
3,301✔
2666
        srv := c.srv
3,301✔
2667

3,301✔
2668
        c.mu.Lock()
3,301✔
2669
        if c.isClosed() {
3,344✔
2670
                c.mu.Unlock()
43✔
2671
                return nil
43✔
2672
        }
43✔
2673

2674
        spoke := c.isSpokeLeafNode()
3,258✔
2675
        // We store local subs by account and subject and optionally queue name.
3,258✔
2676
        // LS- will have the arg exactly as the key.
3,258✔
2677
        sub, ok := c.subs[string(arg)]
3,258✔
2678
        if !ok {
3,273✔
2679
                // If not found, don't try to update routes/gws/leaf nodes.
15✔
2680
                c.mu.Unlock()
15✔
2681
                return nil
15✔
2682
        }
15✔
2683
        delta := int32(1)
3,243✔
2684
        if len(sub.queue) > 0 {
3,663✔
2685
                delta = sub.qw
420✔
2686
        }
420✔
2687
        c.mu.Unlock()
3,243✔
2688

3,243✔
2689
        c.unsubscribe(acc, sub, true, true)
3,243✔
2690
        if !spoke {
4,272✔
2691
                // If we are routing subtract from the route map for the associated account.
1,029✔
2692
                srv.updateRouteSubscriptionMap(acc, sub, -delta)
1,029✔
2693
                // Gateways
1,029✔
2694
                if srv.gateway.enabled {
1,308✔
2695
                        srv.gatewayUpdateSubInterest(acc.Name, sub, -delta)
279✔
2696
                }
279✔
2697
        }
2698
        // Now check on leafnode updates for other leaf nodes.
2699
        acc.updateLeafNodes(sub, -delta)
3,243✔
2700
        return nil
3,243✔
2701
}
2702

2703
func (c *client) processLeafHeaderMsgArgs(arg []byte) error {
475✔
2704
        // Unroll splitArgs to avoid runtime/heap issues
475✔
2705
        a := [MAX_MSG_ARGS][]byte{}
475✔
2706
        args := a[:0]
475✔
2707
        start := -1
475✔
2708
        for i, b := range arg {
31,277✔
2709
                switch b {
30,802✔
2710
                case ' ', '\t', '\r', '\n':
1,357✔
2711
                        if start >= 0 {
2,714✔
2712
                                args = append(args, arg[start:i])
1,357✔
2713
                                start = -1
1,357✔
2714
                        }
1,357✔
2715
                default:
29,445✔
2716
                        if start < 0 {
31,277✔
2717
                                start = i
1,832✔
2718
                        }
1,832✔
2719
                }
2720
        }
2721
        if start >= 0 {
950✔
2722
                args = append(args, arg[start:])
475✔
2723
        }
475✔
2724

2725
        c.pa.arg = arg
475✔
2726
        switch len(args) {
475✔
2727
        case 0, 1, 2:
×
2728
                return fmt.Errorf("processLeafHeaderMsgArgs Parse Error: '%s'", args)
×
2729
        case 3:
86✔
2730
                c.pa.reply = nil
86✔
2731
                c.pa.queues = nil
86✔
2732
                c.pa.hdb = args[1]
86✔
2733
                c.pa.hdr = parseSize(args[1])
86✔
2734
                c.pa.szb = args[2]
86✔
2735
                c.pa.size = parseSize(args[2])
86✔
2736
        case 4:
375✔
2737
                c.pa.reply = args[1]
375✔
2738
                c.pa.queues = nil
375✔
2739
                c.pa.hdb = args[2]
375✔
2740
                c.pa.hdr = parseSize(args[2])
375✔
2741
                c.pa.szb = args[3]
375✔
2742
                c.pa.size = parseSize(args[3])
375✔
2743
        default:
14✔
2744
                // args[1] is our reply indicator. Should be + or | normally.
14✔
2745
                if len(args[1]) != 1 {
14✔
2746
                        return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
2747
                }
×
2748
                switch args[1][0] {
14✔
2749
                case '+':
4✔
2750
                        c.pa.reply = args[2]
4✔
2751
                case '|':
10✔
2752
                        c.pa.reply = nil
10✔
2753
                default:
×
2754
                        return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
2755
                }
2756
                // Grab header size.
2757
                c.pa.hdb = args[len(args)-2]
14✔
2758
                c.pa.hdr = parseSize(c.pa.hdb)
14✔
2759

14✔
2760
                // Grab size.
14✔
2761
                c.pa.szb = args[len(args)-1]
14✔
2762
                c.pa.size = parseSize(c.pa.szb)
14✔
2763

14✔
2764
                // Grab queue names.
14✔
2765
                if c.pa.reply != nil {
18✔
2766
                        c.pa.queues = args[3 : len(args)-2]
4✔
2767
                } else {
14✔
2768
                        c.pa.queues = args[2 : len(args)-2]
10✔
2769
                }
10✔
2770
        }
2771
        if c.pa.hdr < 0 {
475✔
2772
                return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Header Size: '%s'", arg)
×
2773
        }
×
2774
        if c.pa.size < 0 {
475✔
2775
                return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Size: '%s'", args)
×
2776
        }
×
2777
        if c.pa.hdr > c.pa.size {
476✔
2778
                return fmt.Errorf("processLeafHeaderMsgArgs Header Size larger then TotalSize: '%s'", arg)
1✔
2779
        }
1✔
2780

2781
        // Common ones processed after check for arg length
2782
        c.pa.subject = args[0]
474✔
2783

474✔
2784
        return nil
474✔
2785
}
2786

2787
func (c *client) processLeafMsgArgs(arg []byte) error {
109,648✔
2788
        // Unroll splitArgs to avoid runtime/heap issues
109,648✔
2789
        a := [MAX_MSG_ARGS][]byte{}
109,648✔
2790
        args := a[:0]
109,648✔
2791
        start := -1
109,648✔
2792
        for i, b := range arg {
3,523,922✔
2793
                switch b {
3,414,274✔
2794
                case ' ', '\t', '\r', '\n':
161,035✔
2795
                        if start >= 0 {
322,070✔
2796
                                args = append(args, arg[start:i])
161,035✔
2797
                                start = -1
161,035✔
2798
                        }
161,035✔
2799
                default:
3,253,239✔
2800
                        if start < 0 {
3,523,922✔
2801
                                start = i
270,683✔
2802
                        }
270,683✔
2803
                }
2804
        }
2805
        if start >= 0 {
219,296✔
2806
                args = append(args, arg[start:])
109,648✔
2807
        }
109,648✔
2808

2809
        c.pa.arg = arg
109,648✔
2810
        switch len(args) {
109,648✔
2811
        case 0, 1:
×
2812
                return fmt.Errorf("processLeafMsgArgs Parse Error: '%s'", args)
×
2813
        case 2:
80,976✔
2814
                c.pa.reply = nil
80,976✔
2815
                c.pa.queues = nil
80,976✔
2816
                c.pa.szb = args[1]
80,976✔
2817
                c.pa.size = parseSize(args[1])
80,976✔
2818
        case 3:
6,120✔
2819
                c.pa.reply = args[1]
6,120✔
2820
                c.pa.queues = nil
6,120✔
2821
                c.pa.szb = args[2]
6,120✔
2822
                c.pa.size = parseSize(args[2])
6,120✔
2823
        default:
22,552✔
2824
                // args[1] is our reply indicator. Should be + or | normally.
22,552✔
2825
                if len(args[1]) != 1 {
22,553✔
2826
                        return fmt.Errorf("processLeafMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
1✔
2827
                }
1✔
2828
                switch args[1][0] {
22,551✔
2829
                case '+':
162✔
2830
                        c.pa.reply = args[2]
162✔
2831
                case '|':
22,389✔
2832
                        c.pa.reply = nil
22,389✔
2833
                default:
×
2834
                        return fmt.Errorf("processLeafMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
2835
                }
2836
                // Grab size.
2837
                c.pa.szb = args[len(args)-1]
22,551✔
2838
                c.pa.size = parseSize(c.pa.szb)
22,551✔
2839

22,551✔
2840
                // Grab queue names.
22,551✔
2841
                if c.pa.reply != nil {
22,713✔
2842
                        c.pa.queues = args[3 : len(args)-1]
162✔
2843
                } else {
22,551✔
2844
                        c.pa.queues = args[2 : len(args)-1]
22,389✔
2845
                }
22,389✔
2846
        }
2847
        if c.pa.size < 0 {
109,647✔
2848
                return fmt.Errorf("processLeafMsgArgs Bad or Missing Size: '%s'", args)
×
2849
        }
×
2850

2851
        // Common ones processed after check for arg length
2852
        c.pa.subject = args[0]
109,647✔
2853

109,647✔
2854
        return nil
109,647✔
2855
}
2856

2857
// processInboundLeafMsg is called to process an inbound msg from a leaf node.
2858
func (c *client) processInboundLeafMsg(msg []byte) {
108,114✔
2859
        // Update statistics
108,114✔
2860
        // The msg includes the CR_LF, so pull back out for accounting.
108,114✔
2861
        c.in.msgs++
108,114✔
2862
        c.in.bytes += int32(len(msg) - LEN_CR_LF)
108,114✔
2863

108,114✔
2864
        srv, acc, subject := c.srv, c.acc, string(c.pa.subject)
108,114✔
2865

108,114✔
2866
        // Mostly under testing scenarios.
108,114✔
2867
        if srv == nil || acc == nil {
108,116✔
2868
                return
2✔
2869
        }
2✔
2870

2871
        // Match the subscriptions. We will use our own L1 map if
2872
        // it's still valid, avoiding contention on the shared sublist.
2873
        var r *SublistResult
108,112✔
2874
        var ok bool
108,112✔
2875

108,112✔
2876
        genid := atomic.LoadUint64(&c.acc.sl.genid)
108,112✔
2877
        if genid == c.in.genid && c.in.results != nil {
213,850✔
2878
                r, ok = c.in.results[subject]
105,738✔
2879
        } else {
108,112✔
2880
                // Reset our L1 completely.
2,374✔
2881
                c.in.results = make(map[string]*SublistResult)
2,374✔
2882
                c.in.genid = genid
2,374✔
2883
        }
2,374✔
2884

2885
        // Go back to the sublist data structure.
2886
        if !ok {
186,250✔
2887
                r = c.acc.sl.Match(subject)
78,138✔
2888
                // Prune the results cache. Keeps us from unbounded growth. Random delete.
78,138✔
2889
                if len(c.in.results) >= maxResultCacheSize {
80,334✔
2890
                        n := 0
2,196✔
2891
                        for subj := range c.in.results {
74,664✔
2892
                                delete(c.in.results, subj)
72,468✔
2893
                                if n++; n > pruneSize {
74,664✔
2894
                                        break
2,196✔
2895
                                }
2896
                        }
2897
                }
2898
                // Then add the new cache entry.
2899
                c.in.results[subject] = r
78,138✔
2900
        }
2901

2902
        // Collect queue names if needed.
2903
        var qnames [][]byte
108,112✔
2904

108,112✔
2905
        // Check for no interest, short circuit if so.
108,112✔
2906
        // This is the fanout scale.
108,112✔
2907
        if len(r.psubs)+len(r.qsubs) > 0 {
215,734✔
2908
                flag := pmrNoFlag
107,622✔
2909
                // If we have queue subs in this cluster, then if we run in gateway
107,622✔
2910
                // mode and the remote gateways have queue subs, then we need to
107,622✔
2911
                // collect the queue groups this message was sent to so that we
107,622✔
2912
                // exclude them when sending to gateways.
107,622✔
2913
                if len(r.qsubs) > 0 && c.srv.gateway.enabled &&
107,622✔
2914
                        atomic.LoadInt64(&c.srv.gateway.totalQSubs) > 0 {
119,943✔
2915
                        flag |= pmrCollectQueueNames
12,321✔
2916
                }
12,321✔
2917
                // If this is a mapped subject that means the mapped interest
2918
                // is what got us here, but this might not have a queue designation
2919
                // If that is the case, make sure we ignore to process local queue subscribers.
2920
                if len(c.pa.mapped) > 0 && len(c.pa.queues) == 0 {
107,932✔
2921
                        flag |= pmrIgnoreEmptyQueueFilter
310✔
2922
                }
310✔
2923
                _, qnames = c.processMsgResults(acc, r, msg, nil, c.pa.subject, c.pa.reply, flag)
107,622✔
2924
        }
2925

2926
        // Now deal with gateways
2927
        if c.srv.gateway.enabled {
121,502✔
2928
                c.sendMsgToGateways(acc, msg, c.pa.subject, c.pa.reply, qnames, true)
13,390✔
2929
        }
13,390✔
2930
}
2931

2932
// Handles a subscription permission violation.
2933
// See leafPermViolation() for details.
2934
func (c *client) leafSubPermViolation(subj []byte) {
338✔
2935
        c.leafPermViolation(false, subj)
338✔
2936
}
338✔
2937

2938
// Common function to process publish or subscribe leafnode permission violation.
2939
// Sends the permission violation error to the remote, logs it and closes the connection.
2940
// If this is from a server soliciting, the reconnection will be delayed.
2941
func (c *client) leafPermViolation(pub bool, subj []byte) {
338✔
2942
        if c.isSpokeLeafNode() {
676✔
2943
                // For spokes these are no-ops since the hub server told us our permissions.
338✔
2944
                // We just need to not send these over to the other side since we will get cutoff.
338✔
2945
                return
338✔
2946
        }
338✔
2947
        // FIXME(dlc) ?
2948
        c.setLeafConnectDelayIfSoliciting(leafNodeReconnectAfterPermViolation)
×
2949
        var action string
×
2950
        if pub {
×
2951
                c.sendErr(fmt.Sprintf("Permissions Violation for Publish to %q", subj))
×
2952
                action = "Publish"
×
2953
        } else {
×
2954
                c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q", subj))
×
2955
                action = "Subscription"
×
2956
        }
×
2957
        c.Errorf("%s Violation on %q - Check other side configuration", action, subj)
×
2958
        // TODO: add a new close reason that is more appropriate?
×
2959
        c.closeConnection(ProtocolViolation)
×
2960
}
2961

2962
// Invoked from generic processErr() for LEAF connections.
2963
func (c *client) leafProcessErr(errStr string) {
33✔
2964
        // Check if we got a cluster name collision.
33✔
2965
        if strings.Contains(errStr, ErrLeafNodeHasSameClusterName.Error()) {
36✔
2966
                _, delay := c.setLeafConnectDelayIfSoliciting(leafNodeReconnectDelayAfterClusterNameSame)
3✔
2967
                c.Errorf("Leafnode connection dropped with same cluster name error. Delaying attempt to reconnect for %v", delay)
3✔
2968
                return
3✔
2969
        }
3✔
2970

2971
        // We will look for Loop detected error coming from the other side.
2972
        // If we solicit, set the connect delay.
2973
        if !strings.Contains(errStr, "Loop detected") {
52✔
2974
                return
22✔
2975
        }
22✔
2976
        c.handleLeafNodeLoop(false)
8✔
2977
}
2978

2979
// If this leaf connection solicits, sets the connect delay to the given value,
2980
// or the one from the server option's LeafNode.connDelay if one is set (for tests).
2981
// Returns the connection's account name and delay.
2982
func (c *client) setLeafConnectDelayIfSoliciting(delay time.Duration) (string, time.Duration) {
19✔
2983
        c.mu.Lock()
19✔
2984
        if c.isSolicitedLeafNode() {
30✔
2985
                if s := c.srv; s != nil {
22✔
2986
                        if srvdelay := s.getOpts().LeafNode.connDelay; srvdelay != 0 {
16✔
2987
                                delay = srvdelay
5✔
2988
                        }
5✔
2989
                }
2990
                c.leaf.remote.setConnectDelay(delay)
11✔
2991
        }
2992
        accName := c.acc.Name
19✔
2993
        c.mu.Unlock()
19✔
2994
        return accName, delay
19✔
2995
}
2996

2997
// For the given remote Leafnode configuration, this function returns
2998
// if TLS is required, and if so, will return a clone of the TLS Config
2999
// (since some fields will be changed during handshake), the TLS server
3000
// name that is remembered, and the TLS timeout.
3001
func (c *client) leafNodeGetTLSConfigForSolicit(remote *leafNodeCfg) (bool, *tls.Config, string, float64) {
1,840✔
3002
        var (
1,840✔
3003
                tlsConfig  *tls.Config
1,840✔
3004
                tlsName    string
1,840✔
3005
                tlsTimeout float64
1,840✔
3006
        )
1,840✔
3007

1,840✔
3008
        remote.RLock()
1,840✔
3009
        defer remote.RUnlock()
1,840✔
3010

1,840✔
3011
        tlsRequired := remote.TLS || remote.TLSConfig != nil
1,840✔
3012
        if tlsRequired {
1,918✔
3013
                if remote.TLSConfig != nil {
129✔
3014
                        tlsConfig = remote.TLSConfig.Clone()
51✔
3015
                } else {
78✔
3016
                        tlsConfig = &tls.Config{MinVersion: tls.VersionTLS12}
27✔
3017
                }
27✔
3018
                tlsName = remote.tlsName
78✔
3019
                tlsTimeout = remote.TLSTimeout
78✔
3020
                if tlsTimeout == 0 {
122✔
3021
                        tlsTimeout = float64(TLS_TIMEOUT / time.Second)
44✔
3022
                }
44✔
3023
        }
3024

3025
        return tlsRequired, tlsConfig, tlsName, tlsTimeout
1,840✔
3026
}
3027

3028
// Initiates the LeafNode Websocket connection by:
3029
// - doing the TLS handshake if needed
3030
// - sending the HTTP request
3031
// - waiting for the HTTP response
3032
//
3033
// Since some bufio reader is used to consume the HTTP response, this function
3034
// returns the slice of buffered bytes (if any) so that the readLoop that will
3035
// be started after that consume those first before reading from the socket.
3036
// The boolean
3037
//
3038
// Lock held on entry.
3039
func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remote *leafNodeCfg) ([]byte, ClosedState, error) {
43✔
3040
        remote.RLock()
43✔
3041
        compress := remote.Websocket.Compression
43✔
3042
        // By default the server will mask outbound frames, but it can be disabled with this option.
43✔
3043
        noMasking := remote.Websocket.NoMasking
43✔
3044
        infoTimeout := remote.FirstInfoTimeout
43✔
3045
        remote.RUnlock()
43✔
3046
        // Will do the client-side TLS handshake if needed.
43✔
3047
        tlsRequired, err := c.leafClientHandshakeIfNeeded(remote, opts)
43✔
3048
        if err != nil {
47✔
3049
                // 0 will indicate that the connection was already closed
4✔
3050
                return nil, 0, err
4✔
3051
        }
4✔
3052

3053
        // For http request, we need the passed URL to contain either http or https scheme.
3054
        scheme := "http"
39✔
3055
        if tlsRequired {
47✔
3056
                scheme = "https"
8✔
3057
        }
8✔
3058
        // We will use the `/leafnode` path to tell the accepting WS server that it should
3059
        // create a LEAF connection, not a CLIENT.
3060
        // In case we use the user's URL path in the future, make sure we append the user's
3061
        // path to our `/leafnode` path.
3062
        lpath := leafNodeWSPath
39✔
3063
        if curPath := rURL.EscapedPath(); curPath != _EMPTY_ {
60✔
3064
                if curPath[0] == '/' {
42✔
3065
                        curPath = curPath[1:]
21✔
3066
                }
21✔
3067
                lpath = path.Join(curPath, lpath)
21✔
3068
        } else {
18✔
3069
                lpath = lpath[1:]
18✔
3070
        }
18✔
3071
        ustr := fmt.Sprintf("%s://%s/%s", scheme, rURL.Host, lpath)
39✔
3072
        u, _ := url.Parse(ustr)
39✔
3073
        req := &http.Request{
39✔
3074
                Method:     "GET",
39✔
3075
                URL:        u,
39✔
3076
                Proto:      "HTTP/1.1",
39✔
3077
                ProtoMajor: 1,
39✔
3078
                ProtoMinor: 1,
39✔
3079
                Header:     make(http.Header),
39✔
3080
                Host:       u.Host,
39✔
3081
        }
39✔
3082
        wsKey, err := wsMakeChallengeKey()
39✔
3083
        if err != nil {
39✔
3084
                return nil, WriteError, err
×
3085
        }
×
3086

3087
        req.Header["Upgrade"] = []string{"websocket"}
39✔
3088
        req.Header["Connection"] = []string{"Upgrade"}
39✔
3089
        req.Header["Sec-WebSocket-Key"] = []string{wsKey}
39✔
3090
        req.Header["Sec-WebSocket-Version"] = []string{"13"}
39✔
3091
        if compress {
48✔
3092
                req.Header.Add("Sec-WebSocket-Extensions", wsPMCReqHeaderValue)
9✔
3093
        }
9✔
3094
        if noMasking {
49✔
3095
                req.Header.Add(wsNoMaskingHeader, wsNoMaskingValue)
10✔
3096
        }
10✔
3097
        c.nc.SetDeadline(time.Now().Add(infoTimeout))
39✔
3098
        if err := req.Write(c.nc); err != nil {
39✔
3099
                return nil, WriteError, err
×
3100
        }
×
3101

3102
        var resp *http.Response
39✔
3103

39✔
3104
        br := bufio.NewReaderSize(c.nc, MAX_CONTROL_LINE_SIZE)
39✔
3105
        resp, err = http.ReadResponse(br, req)
39✔
3106
        if err == nil &&
39✔
3107
                (resp.StatusCode != 101 ||
39✔
3108
                        !strings.EqualFold(resp.Header.Get("Upgrade"), "websocket") ||
39✔
3109
                        !strings.EqualFold(resp.Header.Get("Connection"), "upgrade") ||
39✔
3110
                        resp.Header.Get("Sec-Websocket-Accept") != wsAcceptKey(wsKey)) {
40✔
3111

1✔
3112
                err = fmt.Errorf("invalid websocket connection")
1✔
3113
        }
1✔
3114
        // Check compression extension...
3115
        if err == nil && c.ws.compress {
48✔
3116
                // Check that not only permessage-deflate extension is present, but that
9✔
3117
                // we also have server and client no context take over.
9✔
3118
                srvCompress, noCtxTakeover := wsPMCExtensionSupport(resp.Header, false)
9✔
3119

9✔
3120
                // If server does not support compression, then simply disable it in our side.
9✔
3121
                if !srvCompress {
13✔
3122
                        c.ws.compress = false
4✔
3123
                } else if !noCtxTakeover {
9✔
3124
                        err = fmt.Errorf("compression negotiation error")
×
3125
                }
×
3126
        }
3127
        // Same for no masking...
3128
        if err == nil && noMasking {
49✔
3129
                // Check if server accepts no masking
10✔
3130
                if resp.Header.Get(wsNoMaskingHeader) != wsNoMaskingValue {
11✔
3131
                        // Nope, need to mask our writes as any client would do.
1✔
3132
                        c.ws.maskwrite = true
1✔
3133
                }
1✔
3134
        }
3135
        if resp != nil {
67✔
3136
                resp.Body.Close()
28✔
3137
        }
28✔
3138
        if err != nil {
51✔
3139
                return nil, ReadError, err
12✔
3140
        }
12✔
3141
        c.Debugf("Leafnode compression=%v masking=%v", c.ws.compress, c.ws.maskwrite)
27✔
3142

27✔
3143
        var preBuf []byte
27✔
3144
        // We have to slurp whatever is in the bufio reader and pass that to the readloop.
27✔
3145
        if n := br.Buffered(); n != 0 {
27✔
3146
                preBuf, _ = br.Peek(n)
×
3147
        }
×
3148
        return preBuf, 0, nil
27✔
3149
}
3150

3151
// connectProcessTimeout is the delay of the timer armed by
// leafNodeResumeConnectProcess. If that timer fires before
// leafNodeFinishConnectProcess has set the connectProcessFinished flag,
// and the connection is not already closed, the connection is closed
// as stale.
const connectProcessTimeout = 2 * time.Second
3152

3153
// This is invoked for remote LEAF remote connections after processing the INFO
3154
// protocol.
3155
func (s *Server) leafNodeResumeConnectProcess(c *client) {
643✔
3156
        clusterName := s.ClusterName()
643✔
3157

643✔
3158
        c.mu.Lock()
643✔
3159
        if c.isClosed() {
643✔
3160
                c.mu.Unlock()
×
3161
                return
×
3162
        }
×
3163
        if err := c.sendLeafConnect(clusterName, c.headers); err != nil {
645✔
3164
                c.mu.Unlock()
2✔
3165
                c.closeConnection(WriteError)
2✔
3166
                return
2✔
3167
        }
2✔
3168

3169
        // Spin up the write loop.
3170
        s.startGoRoutine(func() { c.writeLoop() })
1,282✔
3171

3172
        // timeout leafNodeFinishConnectProcess
3173
        c.ping.tmr = time.AfterFunc(connectProcessTimeout, func() {
641✔
3174
                c.mu.Lock()
×
3175
                // check if leafNodeFinishConnectProcess was called and prevent later leafNodeFinishConnectProcess
×
3176
                if !c.flags.setIfNotSet(connectProcessFinished) {
×
3177
                        c.mu.Unlock()
×
3178
                        return
×
3179
                }
×
3180
                clearTimer(&c.ping.tmr)
×
3181
                closed := c.isClosed()
×
3182
                c.mu.Unlock()
×
3183
                if !closed {
×
3184
                        c.sendErrAndDebug("Stale Leaf Node Connection - Closing")
×
3185
                        c.closeConnection(StaleConnection)
×
3186
                }
×
3187
        })
3188
        c.mu.Unlock()
641✔
3189
        c.Debugf("Remote leafnode connect msg sent")
641✔
3190
}
3191

3192
// This is invoked for remote LEAF connections after processing the INFO
3193
// protocol and leafNodeResumeConnectProcess.
3194
// This will send LS+ the CONNECT protocol and register the leaf node.
3195
func (s *Server) leafNodeFinishConnectProcess(c *client) {
620✔
3196
        c.mu.Lock()
620✔
3197
        if !c.flags.setIfNotSet(connectProcessFinished) {
620✔
3198
                c.mu.Unlock()
×
3199
                return
×
3200
        }
×
3201
        if c.isClosed() {
620✔
3202
                c.mu.Unlock()
×
3203
                s.removeLeafNodeConnection(c)
×
3204
                return
×
3205
        }
×
3206
        remote := c.leaf.remote
620✔
3207
        // Check if we will need to send the system connect event.
620✔
3208
        remote.RLock()
620✔
3209
        sendSysConnectEvent := remote.Hub
620✔
3210
        remote.RUnlock()
620✔
3211

620✔
3212
        // Capture account before releasing lock
620✔
3213
        acc := c.acc
620✔
3214
        // cancel connectProcessTimeout
620✔
3215
        clearTimer(&c.ping.tmr)
620✔
3216
        c.mu.Unlock()
620✔
3217

620✔
3218
        // Make sure we register with the account here.
620✔
3219
        if err := c.registerWithAccount(acc); err != nil {
622✔
3220
                if err == ErrTooManyAccountConnections {
2✔
3221
                        c.maxAccountConnExceeded()
×
3222
                        return
×
3223
                } else if err == ErrLeafNodeLoop {
4✔
3224
                        c.handleLeafNodeLoop(true)
2✔
3225
                        return
2✔
3226
                }
2✔
3227
                c.Errorf("Registering leaf with account %s resulted in error: %v", acc.Name, err)
×
3228
                c.closeConnection(ProtocolViolation)
×
3229
                return
×
3230
        }
3231
        s.addLeafNodeConnection(c, _EMPTY_, _EMPTY_, false)
618✔
3232
        s.initLeafNodeSmapAndSendSubs(c)
618✔
3233
        if sendSysConnectEvent {
624✔
3234
                s.sendLeafNodeConnect(acc)
6✔
3235
        }
6✔
3236

3237
        // The above functions are not atomically under the client
3238
        // lock doing those operations. It is possible - since we
3239
        // have started the read/write loops - that the connection
3240
        // is closed before or in between. This would leave the
3241
        // closed LN connection possible registered with the account
3242
        // and/or the server's leafs map. So check if connection
3243
        // is closed, and if so, manually cleanup.
3244
        c.mu.Lock()
618✔
3245
        closed := c.isClosed()
618✔
3246
        if !closed {
1,236✔
3247
                c.setFirstPingTimer()
618✔
3248
        }
618✔
3249
        c.mu.Unlock()
618✔
3250
        if closed {
618✔
3251
                s.removeLeafNodeConnection(c)
×
3252
                if prev := acc.removeClient(c); prev == 1 {
×
3253
                        s.decActiveAccounts()
×
3254
                }
×
3255
        }
3256
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc