• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 13150392206

04 Feb 2025 08:00PM UTC coverage: 85.523% (-0.01%) from 85.537%
13150392206

push

github

web-flow
Add `sliceHeader` for zero-copy parsing of message headers, use for client info (#6453)

In the various places that we are handling client info from headers,
like when queuing and pulling from the JS API queue or due to the
service export, we could safely refer to the underlying header slice,
instead of making a copy as `getHeader()` does today.

This PR adds a new `sliceHeader()` that merely slices the input header
instead, reducing copies considerably in any system where there is heavy
usage of JS API requests.

Signed-off-by: Neil Twigg <neil@nats.io>

69041 of 80728 relevant lines covered (85.52%)

787175.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.36
/server/leafnode.go
1
// Copyright 2019-2024 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bufio"
18
        "bytes"
19
        "crypto/tls"
20
        "encoding/base64"
21
        "encoding/json"
22
        "fmt"
23
        "io"
24
        "math/rand"
25
        "net"
26
        "net/http"
27
        "net/url"
28
        "os"
29
        "path"
30
        "reflect"
31
        "regexp"
32
        "runtime"
33
        "strconv"
34
        "strings"
35
        "sync"
36
        "sync/atomic"
37
        "time"
38

39
        "github.com/klauspost/compress/s2"
40
        "github.com/nats-io/jwt/v2"
41
        "github.com/nats-io/nkeys"
42
        "github.com/nats-io/nuid"
43
)
44

45
const (
	// Warning emitted when a user configures insecure TLS for leafnodes.
	leafnodeTLSInsecureWarning = "TLS certificate chain and hostname of solicited leafnodes will not be verified. DO NOT USE IN PRODUCTION!"

	// When a loop is detected, delay the reconnect of the solicited connection.
	leafNodeReconnectDelayAfterLoopDetected = 30 * time.Second

	// When a server receives a message causing a permission violation, the
	// connection is closed and it won't attempt to reconnect for that long.
	leafNodeReconnectAfterPermViolation = 30 * time.Second

	// Reconnect delay used when we have the same cluster name as the hub.
	leafNodeReconnectDelayAfterClusterNameSame = 30 * time.Second

	// Prefix for the loop detection subject.
	leafNodeLoopDetectionSubjectPrefix = "$LDS."

	// Path added to URL to indicate to WS server that the connection is a
	// LEAF connection as opposed to a CLIENT.
	leafNodeWSPath = "/leafnode"

	// This is the time the server will wait, when receiving a CONNECT,
	// before closing the connection if the required minimum version is not met.
	leafNodeWaitBeforeClose = 5 * time.Second
)
70

71
// leaf holds the leafnode-specific state attached to a client connection.
type leaf struct {
	// We have any auth stuff here for solicited connections.
	remote *leafNodeCfg
	// isSpoke tells us what role we are playing.
	// Used when we receive a connection but the other side tells us they are a hub.
	isSpoke bool
	// remoteCluster is when we are a hub but the spoke leafnode is part of a cluster.
	remoteCluster string
	// remoteServer holds onto the remote server's name or ID.
	remoteServer string
	// domain name of remote server
	remoteDomain string
	// account name of remote server
	remoteAccName string
	// Used to suppress sub and unsub interest. Same as routes but our audience
	// here is tied to this leaf node. This will hold all subscriptions except this
	// leaf nodes. This represents all the interest we want to send to the other side.
	smap map[string]int32
	// This map will contain all the subscriptions that have been added to the smap
	// during initLeafNodeSmapAndSendSubs. It is short lived and is there to avoid
	// a race between processing of a sub where sub is added to account sublist but
	// updateSmap has not been called on that "thread", while in the LN readloop,
	// when processing CONNECT, initLeafNodeSmapAndSendSubs is invoked and adds
	// this subscription to smap. When processing of the sub then calls updateSmap,
	// we would add it a second time in the smap causing later unsub to suppress the LS-.
	tsub  map[*subscription]struct{}
	tsubt *time.Timer
	// Selected compression mode, which may be different from the server configured mode.
	compression string
	// This is for GW map replies.
	gwSub *subscription
}
103

104
// Used for remote (solicited) leafnodes.
// Wraps the configured RemoteLeafOpts with runtime connection state.
type leafNodeCfg struct {
	sync.RWMutex
	*RemoteLeafOpts
	// Candidate URLs: the configured ones plus any received asynchronously.
	urls []*url.URL
	// URL selected by the most recent pickNextURL call.
	curURL *url.URL
	// Hostname saved from a configured URL for TLS verification (see saveTLSHostname).
	tlsName string
	// User/password saved from a configured URL (see saveUserPassword).
	username string
	password string
	// Deny permissions built from DenyExports/DenyImports in newLeafNodeCfg.
	perms     *Permissions
	connDelay time.Duration // Delay before a connect, could be used while detecting loop condition, etc..
	// Timer that triggers JetStream asset migration after a disconnect delay.
	jsMigrateTimer *time.Timer
}
117

118
// Check to see if this is a solicited leafnode. We do special processing for solicited.
// Note: the kind check must come first since c.leaf is only set for LEAF connections.
func (c *client) isSolicitedLeafNode() bool {
	return c.kind == LEAF && c.leaf.remote != nil
}
122

123
// Returns true if this is a solicited leafnode and is not configured to be treated as a hub or a receiving
// connection leafnode where the other side has declared itself to be the hub.
// Note: the kind check must come first since c.leaf is only set for LEAF connections.
func (c *client) isSpokeLeafNode() bool {
	return c.kind == LEAF && c.leaf.isSpoke
}
128

129
// isHubLeafNode reports whether this leafnode connection plays the hub role
// (i.e. it is a LEAF connection that is not a spoke).
func (c *client) isHubLeafNode() bool {
	return c.kind == LEAF && !c.leaf.isSpoke
}
132

133
// This will spin up go routines to solicit the remote leaf node connections.
// For each remote, the configuration is registered under the server lock and,
// if a credentials file is configured, it is read and sanity-checked so that
// malformed creds or restricted permissions are reported at startup.
func (s *Server) solicitLeafNodeRemotes(remotes []*RemoteLeafOpts) {
	sysAccName := _EMPTY_
	sAcc := s.SystemAccount()
	if sAcc != nil {
		sysAccName = sAcc.Name
	}
	addRemote := func(r *RemoteLeafOpts, isSysAccRemote bool) *leafNodeCfg {
		s.mu.Lock()
		remote := newLeafNodeCfg(r)
		creds := remote.Credentials
		accName := remote.LocalAccount
		s.leafRemoteCfgs = append(s.leafRemoteCfgs, remote)
		// Print a notice if a system account remote restricts import/export permissions.
		if isSysAccRemote {
			if len(remote.DenyExports) > 0 {
				s.Noticef("Remote for System Account uses restricted export permissions")
			}
			if len(remote.DenyImports) > 0 {
				s.Noticef("Remote for System Account uses restricted import permissions")
			}
		}
		s.mu.Unlock()
		if creds != _EMPTY_ {
			contents, err := os.ReadFile(creds)
			// Scrub the credentials from memory once this helper returns.
			defer wipeSlice(contents)
			if err != nil {
				s.Errorf("Error reading LeafNode Remote Credentials file %q: %v", creds, err)
			} else if items := credsRe.FindAllSubmatch(contents, -1); len(items) < 2 {
				s.Errorf("LeafNode Remote Credentials file %q malformed", creds)
			} else if _, err := nkeys.FromSeed(items[1][1]); err != nil {
				s.Errorf("LeafNode Remote Credentials file %q has malformed seed", creds)
			} else if uc, err := jwt.DecodeUserClaims(string(items[0][1])); err != nil {
				s.Errorf("LeafNode Remote Credentials file %q has malformed user jwt", creds)
			} else if isSysAccRemote {
				// Warn when the JWT carries pub/sub/resp restrictions for the system account.
				if !uc.Permissions.Pub.Empty() || !uc.Permissions.Sub.Empty() || uc.Permissions.Resp != nil {
					s.Noticef("LeafNode Remote for System Account uses credentials file %q with restricted permissions", creds)
				}
			} else {
				if !uc.Permissions.Pub.Empty() || !uc.Permissions.Sub.Empty() || uc.Permissions.Resp != nil {
					s.Noticef("LeafNode Remote for Account %s uses credentials file %q with restricted permissions", accName, creds)
				}
			}
		}
		return remote
	}
	for _, r := range remotes {
		remote := addRemote(r, r.LocalAccount == sysAccName)
		s.startGoRoutine(func() { s.connectToRemoteLeafNode(remote, true) })
	}
}
184

185
func (s *Server) remoteLeafNodeStillValid(remote *leafNodeCfg) bool {
1,773✔
186
        for _, ri := range s.getOpts().LeafNode.Remotes {
3,926✔
187
                // FIXME(dlc) - What about auth changes?
2,153✔
188
                if reflect.DeepEqual(ri.URLs, remote.URLs) {
3,926✔
189
                        return true
1,773✔
190
                }
1,773✔
191
        }
192
        return false
×
193
}
194

195
// Ensure that leafnode is properly configured.
196
func validateLeafNode(o *Options) error {
6,523✔
197
        if err := validateLeafNodeAuthOptions(o); err != nil {
6,525✔
198
                return err
2✔
199
        }
2✔
200

201
        // Users can bind to any local account, if its empty we will assume the $G account.
202
        for _, r := range o.LeafNode.Remotes {
7,212✔
203
                if r.LocalAccount == _EMPTY_ {
1,112✔
204
                        r.LocalAccount = globalAccountName
421✔
205
                }
421✔
206
        }
207

208
        // In local config mode, check that leafnode configuration refers to accounts that exist.
209
        if len(o.TrustedOperators) == 0 {
12,744✔
210
                accNames := map[string]struct{}{}
6,223✔
211
                for _, a := range o.Accounts {
11,379✔
212
                        accNames[a.Name] = struct{}{}
5,156✔
213
                }
5,156✔
214
                // global account is always created
215
                accNames[DEFAULT_GLOBAL_ACCOUNT] = struct{}{}
6,223✔
216
                // in the context of leaf nodes, empty account means global account
6,223✔
217
                accNames[_EMPTY_] = struct{}{}
6,223✔
218
                // system account either exists or, if not disabled, will be created
6,223✔
219
                if o.SystemAccount == _EMPTY_ && !o.NoSystemAccount {
10,839✔
220
                        accNames[DEFAULT_SYSTEM_ACCOUNT] = struct{}{}
4,616✔
221
                }
4,616✔
222
                checkAccountExists := func(accName string, cfgType string) error {
13,142✔
223
                        if _, ok := accNames[accName]; !ok {
6,921✔
224
                                return fmt.Errorf("cannot find local account %q specified in leafnode %s", accName, cfgType)
2✔
225
                        }
2✔
226
                        return nil
6,917✔
227
                }
228
                if err := checkAccountExists(o.LeafNode.Account, "authorization"); err != nil {
6,224✔
229
                        return err
1✔
230
                }
1✔
231
                for _, lu := range o.LeafNode.Users {
6,231✔
232
                        if lu.Account == nil { // means global account
12✔
233
                                continue
3✔
234
                        }
235
                        if err := checkAccountExists(lu.Account.Name, "authorization"); err != nil {
6✔
236
                                return err
×
237
                        }
×
238
                }
239
                for _, r := range o.LeafNode.Remotes {
6,912✔
240
                        if err := checkAccountExists(r.LocalAccount, "remote"); err != nil {
691✔
241
                                return err
1✔
242
                        }
1✔
243
                }
244
        } else {
298✔
245
                if len(o.LeafNode.Users) != 0 {
299✔
246
                        return fmt.Errorf("operator mode does not allow specifying users in leafnode config")
1✔
247
                }
1✔
248
                for _, r := range o.LeafNode.Remotes {
298✔
249
                        if !nkeys.IsValidPublicAccountKey(r.LocalAccount) {
2✔
250
                                return fmt.Errorf(
1✔
251
                                        "operator mode requires account nkeys in remotes. " +
1✔
252
                                                "Please add an `account` key to each remote in your `leafnodes` section, to assign it to an account. " +
1✔
253
                                                "Each account value should be a 56 character public key, starting with the letter 'A'")
1✔
254
                        }
1✔
255
                }
256
                if o.LeafNode.Port != 0 && o.LeafNode.Account != "" && !nkeys.IsValidPublicAccountKey(o.LeafNode.Account) {
297✔
257
                        return fmt.Errorf("operator mode and non account nkeys are incompatible")
1✔
258
                }
1✔
259
        }
260

261
        // Validate compression settings
262
        if o.LeafNode.Compression.Mode != _EMPTY_ {
9,830✔
263
                if err := validateAndNormalizeCompressionOption(&o.LeafNode.Compression, CompressionS2Auto); err != nil {
3,319✔
264
                        return err
5✔
265
                }
5✔
266
        }
267

268
        // If a remote has a websocket scheme, all need to have it.
269
        for _, rcfg := range o.LeafNode.Remotes {
7,200✔
270
                if len(rcfg.URLs) >= 2 {
897✔
271
                        firstIsWS, ok := isWSURL(rcfg.URLs[0]), true
208✔
272
                        for i := 1; i < len(rcfg.URLs); i++ {
661✔
273
                                u := rcfg.URLs[i]
453✔
274
                                if isWS := isWSURL(u); isWS && !firstIsWS || !isWS && firstIsWS {
460✔
275
                                        ok = false
7✔
276
                                        break
7✔
277
                                }
278
                        }
279
                        if !ok {
215✔
280
                                return fmt.Errorf("remote leaf node configuration cannot have a mix of websocket and non-websocket urls: %q", redactURLList(rcfg.URLs))
7✔
281
                        }
7✔
282
                }
283
                // Validate compression settings
284
                if rcfg.Compression.Mode != _EMPTY_ {
1,364✔
285
                        if err := validateAndNormalizeCompressionOption(&rcfg.Compression, CompressionS2Auto); err != nil {
687✔
286
                                return err
5✔
287
                        }
5✔
288
                }
289
        }
290

291
        if o.LeafNode.Port == 0 {
10,275✔
292
                return nil
3,776✔
293
        }
3,776✔
294

295
        // If MinVersion is defined, check that it is valid.
296
        if mv := o.LeafNode.MinVersion; mv != _EMPTY_ {
2,727✔
297
                if err := checkLeafMinVersionConfig(mv); err != nil {
6✔
298
                        return err
2✔
299
                }
2✔
300
        }
301

302
        // The checks below will be done only when detecting that we are configured
303
        // with gateways. So if an option validation needs to be done regardless,
304
        // it MUST be done before this point!
305

306
        if o.Gateway.Name == _EMPTY_ && o.Gateway.Port == 0 {
4,777✔
307
                return nil
2,056✔
308
        }
2,056✔
309
        // If we are here we have both leaf nodes and gateways defined, make sure there
310
        // is a system account defined.
311
        if o.SystemAccount == _EMPTY_ {
666✔
312
                return fmt.Errorf("leaf nodes and gateways (both being defined) require a system account to also be configured")
1✔
313
        }
1✔
314
        if err := validatePinnedCerts(o.LeafNode.TLSPinnedCerts); err != nil {
664✔
315
                return fmt.Errorf("leafnode: %v", err)
×
316
        }
×
317
        return nil
664✔
318
}
319

320
func checkLeafMinVersionConfig(mv string) error {
8✔
321
        if ok, err := versionAtLeastCheckError(mv, 2, 8, 0); !ok || err != nil {
12✔
322
                if err != nil {
6✔
323
                        return fmt.Errorf("invalid leafnode's minimum version: %v", err)
2✔
324
                } else {
4✔
325
                        return fmt.Errorf("the minimum version should be at least 2.8.0")
2✔
326
                }
2✔
327
        }
328
        return nil
4✔
329
}
330

331
// Used to validate user names in LeafNode configuration.
332
// - rejects mix of single and multiple users.
333
// - rejects duplicate user names.
334
func validateLeafNodeAuthOptions(o *Options) error {
6,564✔
335
        if len(o.LeafNode.Users) == 0 {
13,109✔
336
                return nil
6,545✔
337
        }
6,545✔
338
        if o.LeafNode.Username != _EMPTY_ {
21✔
339
                return fmt.Errorf("can not have a single user/pass and a users array")
2✔
340
        }
2✔
341
        if o.LeafNode.Nkey != _EMPTY_ {
17✔
342
                return fmt.Errorf("can not have a single nkey and a users array")
×
343
        }
×
344
        users := map[string]struct{}{}
17✔
345
        for _, u := range o.LeafNode.Users {
40✔
346
                if _, exists := users[u.Username]; exists {
25✔
347
                        return fmt.Errorf("duplicate user %q detected in leafnode authorization", u.Username)
2✔
348
                }
2✔
349
                users[u.Username] = struct{}{}
21✔
350
        }
351
        return nil
15✔
352
}
353

354
// Update remote LeafNode TLS configurations after a config reload.
355
func (s *Server) updateRemoteLeafNodesTLSConfig(opts *Options) {
10✔
356
        max := len(opts.LeafNode.Remotes)
10✔
357
        if max == 0 {
10✔
358
                return
×
359
        }
×
360

361
        s.mu.RLock()
10✔
362
        defer s.mu.RUnlock()
10✔
363

10✔
364
        // Changes in the list of remote leaf nodes is not supported.
10✔
365
        // However, make sure that we don't go over the arrays.
10✔
366
        if len(s.leafRemoteCfgs) < max {
10✔
367
                max = len(s.leafRemoteCfgs)
×
368
        }
×
369
        for i := 0; i < max; i++ {
20✔
370
                ro := opts.LeafNode.Remotes[i]
10✔
371
                cfg := s.leafRemoteCfgs[i]
10✔
372
                if ro.TLSConfig != nil {
12✔
373
                        cfg.Lock()
2✔
374
                        cfg.TLSConfig = ro.TLSConfig.Clone()
2✔
375
                        cfg.TLSHandshakeFirst = ro.TLSHandshakeFirst
2✔
376
                        cfg.Unlock()
2✔
377
                }
2✔
378
        }
379
}
380

381
func (s *Server) reConnectToRemoteLeafNode(remote *leafNodeCfg) {
227✔
382
        delay := s.getOpts().LeafNode.ReconnectInterval
227✔
383
        select {
227✔
384
        case <-time.After(delay):
178✔
385
        case <-s.quitCh:
49✔
386
                s.grWG.Done()
49✔
387
                return
49✔
388
        }
389
        s.connectToRemoteLeafNode(remote, false)
178✔
390
}
391

392
// Creates a leafNodeCfg object that wraps the RemoteLeafOpts.
393
func newLeafNodeCfg(remote *RemoteLeafOpts) *leafNodeCfg {
667✔
394
        cfg := &leafNodeCfg{
667✔
395
                RemoteLeafOpts: remote,
667✔
396
                urls:           make([]*url.URL, 0, len(remote.URLs)),
667✔
397
        }
667✔
398
        if len(remote.DenyExports) > 0 || len(remote.DenyImports) > 0 {
675✔
399
                perms := &Permissions{}
8✔
400
                if len(remote.DenyExports) > 0 {
16✔
401
                        perms.Publish = &SubjectPermission{Deny: remote.DenyExports}
8✔
402
                }
8✔
403
                if len(remote.DenyImports) > 0 {
15✔
404
                        perms.Subscribe = &SubjectPermission{Deny: remote.DenyImports}
7✔
405
                }
7✔
406
                cfg.perms = perms
8✔
407
        }
408
        // Start with the one that is configured. We will add to this
409
        // array when receiving async leafnode INFOs.
410
        cfg.urls = append(cfg.urls, cfg.URLs...)
667✔
411
        // If allowed to randomize, do it on our copy of URLs
667✔
412
        if !remote.NoRandomize {
1,332✔
413
                rand.Shuffle(len(cfg.urls), func(i, j int) {
1,075✔
414
                        cfg.urls[i], cfg.urls[j] = cfg.urls[j], cfg.urls[i]
410✔
415
                })
410✔
416
        }
417
        // If we are TLS make sure we save off a proper servername if possible.
418
        // Do same for user/password since we may need them to connect to
419
        // a bare URL that we get from INFO protocol.
420
        for _, u := range cfg.urls {
1,774✔
421
                cfg.saveTLSHostname(u)
1,107✔
422
                cfg.saveUserPassword(u)
1,107✔
423
                // If the url(s) have the "wss://" scheme, and we don't have a TLS
1,107✔
424
                // config, mark that we should be using TLS anyway.
1,107✔
425
                if !cfg.TLS && isWSSURL(u) {
1,108✔
426
                        cfg.TLS = true
1✔
427
                }
1✔
428
        }
429
        return cfg
667✔
430
}
431

432
// Will pick an URL from the list of available URLs.
433
func (cfg *leafNodeCfg) pickNextURL() *url.URL {
1,024✔
434
        cfg.Lock()
1,024✔
435
        defer cfg.Unlock()
1,024✔
436
        // If the current URL is the first in the list and we have more than
1,024✔
437
        // one URL, then move that one to end of the list.
1,024✔
438
        if cfg.curURL != nil && len(cfg.urls) > 1 && urlsAreEqual(cfg.curURL, cfg.urls[0]) {
1,180✔
439
                first := cfg.urls[0]
156✔
440
                copy(cfg.urls, cfg.urls[1:])
156✔
441
                cfg.urls[len(cfg.urls)-1] = first
156✔
442
        }
156✔
443
        cfg.curURL = cfg.urls[0]
1,024✔
444
        return cfg.curURL
1,024✔
445
}
446

447
// Returns the current URL
448
func (cfg *leafNodeCfg) getCurrentURL() *url.URL {
86✔
449
        cfg.RLock()
86✔
450
        defer cfg.RUnlock()
86✔
451
        return cfg.curURL
86✔
452
}
86✔
453

454
// Returns how long the server should wait before attempting
455
// to solicit a remote leafnode connection.
456
func (cfg *leafNodeCfg) getConnectDelay() time.Duration {
845✔
457
        cfg.RLock()
845✔
458
        delay := cfg.connDelay
845✔
459
        cfg.RUnlock()
845✔
460
        return delay
845✔
461
}
845✔
462

463
// Sets the connect delay.
464
func (cfg *leafNodeCfg) setConnectDelay(delay time.Duration) {
155✔
465
        cfg.Lock()
155✔
466
        cfg.connDelay = delay
155✔
467
        cfg.Unlock()
155✔
468
}
155✔
469

470
// Ensure that non-exported options (used in tests) have
471
// been properly set.
472
func (s *Server) setLeafNodeNonExportedOptions() {
6,132✔
473
        opts := s.getOpts()
6,132✔
474
        s.leafNodeOpts.dialTimeout = opts.LeafNode.dialTimeout
6,132✔
475
        if s.leafNodeOpts.dialTimeout == 0 {
12,263✔
476
                // Use same timeouts as routes for now.
6,131✔
477
                s.leafNodeOpts.dialTimeout = DEFAULT_ROUTE_DIAL
6,131✔
478
        }
6,131✔
479
        s.leafNodeOpts.resolver = opts.LeafNode.resolver
6,132✔
480
        if s.leafNodeOpts.resolver == nil {
12,260✔
481
                s.leafNodeOpts.resolver = net.DefaultResolver
6,128✔
482
        }
6,128✔
483
}
484

485
// Delay applied before the first connect of a remote sharing the system
// account, to let clustering information settle first.
const sharedSysAccDelay = 250 * time.Millisecond

// connectToRemoteLeafNode solicits a connection to the given remote leafnode,
// retrying with jittered delays until it succeeds, the remote is removed from
// the configuration, or the server shuts down. On a successful dial it creates
// the leafnode and clears any JetStream observer state. While disconnected it
// may trigger JetStream asset migration (immediately, or after the configured
// JetStreamClusterMigrateDelay via a timer).
func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg, firstConnect bool) {
	defer s.grWG.Done()

	if remote == nil || len(remote.URLs) == 0 {
		s.Debugf("Empty remote leafnode definition, nothing to connect")
		return
	}

	opts := s.getOpts()
	reconnectDelay := opts.LeafNode.ReconnectInterval
	s.mu.RLock()
	dialTimeout := s.leafNodeOpts.dialTimeout
	resolver := s.leafNodeOpts.resolver
	var isSysAcc bool
	if s.eventsEnabled() {
		isSysAcc = remote.LocalAccount == s.sys.account.Name
	}
	jetstreamMigrateDelay := remote.JetStreamClusterMigrateDelay
	s.mu.RUnlock()

	// If we are sharing a system account and we are not standalone delay to gather some info prior.
	if firstConnect && isSysAcc && !s.standAloneMode() {
		s.Debugf("Will delay first leafnode connect to shared system account due to clustering")
		remote.setConnectDelay(sharedSysAccDelay)
	}

	// Honor any pending connect delay, but abort immediately on shutdown.
	if connDelay := remote.getConnectDelay(); connDelay > 0 {
		select {
		case <-time.After(connDelay):
		case <-s.quitCh:
			return
		}
		remote.setConnectDelay(0)
	}

	var conn net.Conn

	const connErrFmt = "Error trying to connect as leafnode to remote server %q (attempt %v): %v"

	attempts := 0

	for s.isRunning() && s.remoteLeafNodeStillValid(remote) {
		rURL := remote.pickNextURL()
		url, err := s.getRandomIP(resolver, rURL.Host, nil)
		if err == nil {
			var ipStr string
			if url != rURL.Host {
				ipStr = fmt.Sprintf(" (%s)", url)
			}
			// Some test may want to disable remotes from connecting
			if s.isLeafConnectDisabled() {
				s.Debugf("Will not attempt to connect to remote server on %q%s, leafnodes currently disabled", rURL.Host, ipStr)
				err = ErrLeafNodeDisabled
			} else {
				s.Debugf("Trying to connect as leafnode to remote server on %q%s", rURL.Host, ipStr)
				conn, err = natsDialTimeout("tcp", url, dialTimeout)
			}
		}
		if err != nil {
			// Back off with jitter before the next attempt.
			jitter := time.Duration(rand.Int63n(int64(reconnectDelay)))
			delay := reconnectDelay + jitter
			attempts++
			if s.shouldReportConnectErr(firstConnect, attempts) {
				s.Errorf(connErrFmt, rURL.Host, attempts, err)
			} else {
				s.Debugf(connErrFmt, rURL.Host, attempts, err)
			}
			remote.Lock()
			// if we are using a delay to start migrating assets, kick off a migrate timer.
			if remote.jsMigrateTimer == nil && jetstreamMigrateDelay > 0 {
				remote.jsMigrateTimer = time.AfterFunc(jetstreamMigrateDelay, func() {
					s.checkJetStreamMigrate(remote)
				})
			}
			remote.Unlock()
			select {
			case <-s.quitCh:
				remote.cancelMigrateTimer()
				return
			case <-time.After(delay):
				// Check if we should migrate any JetStream assets immediately while this remote is down.
				// This will be used if JetStreamClusterMigrateDelay was not set.
				if jetstreamMigrateDelay == 0 {
					s.checkJetStreamMigrate(remote)
				}
				continue
			}
		}
		remote.cancelMigrateTimer()
		// Re-check validity: the remote may have been removed by a config
		// reload while we were dialing.
		if !s.remoteLeafNodeStillValid(remote) {
			conn.Close()
			return
		}

		// We have a connection here to a remote server.
		// Go ahead and create our leaf node and return.
		s.createLeafNode(conn, rURL, remote, nil)

		// Clear any observer states if we had them.
		s.clearObserverState(remote)

		return
	}
}
591

592
func (cfg *leafNodeCfg) cancelMigrateTimer() {
835✔
593
        cfg.Lock()
835✔
594
        stopAndClearTimer(&cfg.jsMigrateTimer)
835✔
595
        cfg.Unlock()
835✔
596
}
835✔
597

598
// This will clear any observer state such that stream or consumer assets on this server can become leaders again.
// Called after a remote leafnode connection has been (re)established.
func (s *Server) clearObserverState(remote *leafNodeCfg) {
	s.mu.RLock()
	accName := remote.LocalAccount
	s.mu.RUnlock()

	acc, err := s.LookupAccount(accName)
	if err != nil {
		s.Warnf("Error looking up account [%s] checking for JetStream clear observer state on a leafnode", accName)
		return
	}

	// Serialize with checkJetStreamMigrate, which toggles the same observer flags.
	acc.jscmMu.Lock()
	defer acc.jscmMu.Unlock()

	// Walk all streams looking for any clustered stream, skip otherwise.
	for _, mset := range acc.streams() {
		node := mset.raftNode()
		if node == nil {
			// Not R>1
			continue
		}
		// Check consumers
		for _, o := range mset.getConsumers() {
			if n := o.raftNode(); n != nil {
				// Ensure we can become a leader again.
				n.SetObserver(false)
			}
		}
		// Ensure we can become a leader again.
		node.SetObserver(false)
	}
}
631

632
// Check to see if we should migrate any assets from this account.
// Invoked while the remote leafnode connection is down (either immediately or
// after JetStreamClusterMigrateDelay). For every clustered stream/consumer in
// the remote's local account, steps down leadership and marks the raft node as
// an observer so this server cannot be elected while disconnected.
func (s *Server) checkJetStreamMigrate(remote *leafNodeCfg) {
	s.mu.RLock()
	accName, shouldMigrate := remote.LocalAccount, remote.JetStreamClusterMigrate
	s.mu.RUnlock()

	if !shouldMigrate {
		return
	}

	acc, err := s.LookupAccount(accName)
	if err != nil {
		s.Warnf("Error looking up account [%s] checking for JetStream migration on a leafnode", accName)
		return
	}

	// Serialize with clearObserverState, which toggles the same observer flags.
	acc.jscmMu.Lock()
	defer acc.jscmMu.Unlock()

	// Walk all streams looking for any clustered stream, skip otherwise.
	// If we are the leader force stepdown.
	for _, mset := range acc.streams() {
		node := mset.raftNode()
		if node == nil {
			// Not R>1
			continue
		}
		// Collect any consumers
		for _, o := range mset.getConsumers() {
			if n := o.raftNode(); n != nil {
				if n.Leader() {
					n.StepDown()
				}
				// Ensure we can not become a leader while in this state.
				n.SetObserver(true)
			}
		}
		// Stepdown if this stream was leader.
		if node.Leader() {
			node.StepDown()
		}
		// Ensure we can not become a leader while in this state.
		node.SetObserver(true)
	}
}
677

678
// Helper for checking.
679
func (s *Server) isLeafConnectDisabled() bool {
1,017✔
680
        s.mu.RLock()
1,017✔
681
        defer s.mu.RUnlock()
1,017✔
682
        return s.leafDisableConnect
1,017✔
683
}
1,017✔
684

685
// Save off the tlsName for when we use TLS and mix hostnames and IPs. IPs usually
686
// come from the server we connect to.
687
//
688
// We used to save the name only if there was a TLSConfig or scheme equal to "tls".
689
// However, this was causing failures for users that did not set the scheme (and
690
// their remote connections did not have a tls{} block).
691
// We now save the host name regardless in case the remote returns an INFO indicating
692
// that TLS is required.
693
func (cfg *leafNodeCfg) saveTLSHostname(u *url.URL) {
1,701✔
694
        if cfg.tlsName == _EMPTY_ && net.ParseIP(u.Hostname()) == nil {
1,721✔
695
                cfg.tlsName = u.Hostname()
20✔
696
        }
20✔
697
}
698

699
// Save off the username/password for when we connect using a bare URL
700
// that we get from the INFO protocol.
701
func (cfg *leafNodeCfg) saveUserPassword(u *url.URL) {
1,107✔
702
        if cfg.username == _EMPTY_ && u.User != nil {
1,357✔
703
                cfg.username = u.User.Username()
250✔
704
                cfg.password, _ = u.User.Password()
250✔
705
        }
250✔
706
}
707

708
// This starts the leafnode accept loop in a go routine, unless it
// is detected that the server has already been shutdown.
// It binds the leafnode listener, builds the base INFO sent to accepted
// leafnodes, records the resolved port back into the options, and spawns
// the accept goroutine. All server state mutation happens under s.mu.
func (s *Server) startLeafNodeAcceptLoop() {
	// Snapshot server options.
	opts := s.getOpts()

	port := opts.LeafNode.Port
	// -1 means "pick a random port" (used heavily by tests): ask the
	// OS for an ephemeral port by listening on 0.
	if port == -1 {
		port = 0
	}

	if s.isShuttingDown() {
		return
	}

	s.mu.Lock()
	hp := net.JoinHostPort(opts.LeafNode.Host, strconv.Itoa(port))
	l, e := natsListen("tcp", hp)
	// Record the listen error (nil on success) so health/monitoring can see it.
	s.leafNodeListenerErr = e
	if e != nil {
		s.mu.Unlock()
		s.Fatalf("Error listening on leafnode port: %d - %v", opts.LeafNode.Port, e)
		return
	}

	s.Noticef("Listening for leafnode connections on %s",
		net.JoinHostPort(opts.LeafNode.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))

	tlsRequired := opts.LeafNode.TLSConfig != nil
	tlsVerify := tlsRequired && opts.LeafNode.TLSConfig.ClientAuth == tls.RequireAndVerifyClientCert
	// Do not set compression in this Info object, it would possibly cause
	// issues when sending asynchronous INFO to the remote.
	info := Info{
		ID:            s.info.ID,
		Name:          s.info.Name,
		Version:       s.info.Version,
		GitCommit:     gitCommit,
		GoVersion:     runtime.Version(),
		AuthRequired:  true,
		TLSRequired:   tlsRequired,
		TLSVerify:     tlsVerify,
		MaxPayload:    s.info.MaxPayload, // TODO(dlc) - Allow override?
		Headers:       s.supportsHeaders(),
		JetStream:     opts.JetStream,
		Domain:        opts.JetStreamDomain,
		Proto:         s.getServerProto(),
		InfoOnConnect: true,
	}
	// If we have selected a random port...
	if port == 0 {
		// Write resolved port back to options.
		opts.LeafNode.Port = l.Addr().(*net.TCPAddr).Port
	}

	s.leafNodeInfo = info
	// Possibly override Host/Port and set IP based on Cluster.Advertise
	if err := s.setLeafNodeInfoHostPortAndIP(); err != nil {
		s.Fatalf("Error setting leafnode INFO with LeafNode.Advertise value of %s, err=%v", opts.LeafNode.Advertise, err)
		l.Close()
		s.mu.Unlock()
		return
	}
	s.leafURLsMap[s.leafNodeInfo.IP]++
	s.generateLeafNodeInfoJSON()

	// Setup state that can enable shutdown
	s.leafNodeListener = l

	// As of now, a server that does not have remotes configured would
	// never solicit a connection, so we should not have to warn if
	// InsecureSkipVerify is set in main LeafNodes config (since
	// this TLS setting matters only when soliciting a connection).
	// Still, warn if insecure is set in any of LeafNode block.
	// We need to check remotes, even if tls is not required on accept.
	warn := tlsRequired && opts.LeafNode.TLSConfig.InsecureSkipVerify
	if !warn {
		for _, r := range opts.LeafNode.Remotes {
			if r.TLSConfig != nil && r.TLSConfig.InsecureSkipVerify {
				warn = true
				break
			}
		}
	}
	if warn {
		s.Warnf(leafnodeTLSInsecureWarning)
	}
	go s.acceptConnections(l, "Leafnode", func(conn net.Conn) { s.createLeafNode(conn, nil, nil, nil) }, nil)
	s.mu.Unlock()
}
797

798
// RegEx to match a creds file with user JWT and Seed.
// Each armored section ("-----BEGIN ...-----" / "-----END ...-----") yields one
// match whose capture group 1 is the payload between the armor lines; a valid
// creds file therefore produces at least two matches: the user JWT first, then
// the nkey seed.
var credsRe = regexp.MustCompile(`\s*(?:(?:[-]{3,}.*[-]{3,}\r?\n)([\w\-.=]+)(?:\r?\n[-]{3,}.*[-]{3,}(\r?\n|\z)))`)
800

801
// clusterName is provided as argument to avoid lock ordering issues with the locked client c
802
// Lock should be held entering here.
803
func (c *client) sendLeafConnect(clusterName string, headers bool) error {
622✔
804
        // We support basic user/pass and operator based user JWT with signatures.
622✔
805
        cinfo := leafConnectInfo{
622✔
806
                Version:       VERSION,
622✔
807
                ID:            c.srv.info.ID,
622✔
808
                Domain:        c.srv.info.Domain,
622✔
809
                Name:          c.srv.info.Name,
622✔
810
                Hub:           c.leaf.remote.Hub,
622✔
811
                Cluster:       clusterName,
622✔
812
                Headers:       headers,
622✔
813
                JetStream:     c.acc.jetStreamConfigured(),
622✔
814
                DenyPub:       c.leaf.remote.DenyImports,
622✔
815
                Compression:   c.leaf.compression,
622✔
816
                RemoteAccount: c.acc.GetName(),
622✔
817
                Proto:         c.srv.getServerProto(),
622✔
818
        }
622✔
819

622✔
820
        // If a signature callback is specified, this takes precedence over anything else.
622✔
821
        if cb := c.leaf.remote.SignatureCB; cb != nil {
625✔
822
                nonce := c.nonce
3✔
823
                c.mu.Unlock()
3✔
824
                jwt, sigraw, err := cb(nonce)
3✔
825
                c.mu.Lock()
3✔
826
                if err == nil && c.isClosed() {
4✔
827
                        err = ErrConnectionClosed
1✔
828
                }
1✔
829
                if err != nil {
5✔
830
                        c.Errorf("Error signing the nonce: %v", err)
2✔
831
                        return err
2✔
832
                }
2✔
833
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
1✔
834
                cinfo.JWT, cinfo.Sig = jwt, sig
1✔
835

836
        } else if creds := c.leaf.remote.Credentials; creds != _EMPTY_ {
664✔
837
                // Check for credentials first, that will take precedence..
45✔
838
                c.Debugf("Authenticating with credentials file %q", c.leaf.remote.Credentials)
45✔
839
                contents, err := os.ReadFile(creds)
45✔
840
                if err != nil {
45✔
841
                        c.Errorf("%v", err)
×
842
                        return err
×
843
                }
×
844
                defer wipeSlice(contents)
45✔
845
                items := credsRe.FindAllSubmatch(contents, -1)
45✔
846
                if len(items) < 2 {
45✔
847
                        c.Errorf("Credentials file malformed")
×
848
                        return err
×
849
                }
×
850
                // First result should be the user JWT.
851
                // We copy here so that the file containing the seed will be wiped appropriately.
852
                raw := items[0][1]
45✔
853
                tmp := make([]byte, len(raw))
45✔
854
                copy(tmp, raw)
45✔
855
                // Seed is second item.
45✔
856
                kp, err := nkeys.FromSeed(items[1][1])
45✔
857
                if err != nil {
45✔
858
                        c.Errorf("Credentials file has malformed seed")
×
859
                        return err
×
860
                }
×
861
                // Wipe our key on exit.
862
                defer kp.Wipe()
45✔
863

45✔
864
                sigraw, _ := kp.Sign(c.nonce)
45✔
865
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
45✔
866
                cinfo.JWT = bytesToString(tmp)
45✔
867
                cinfo.Sig = sig
45✔
868
        } else if nkey := c.leaf.remote.Nkey; nkey != _EMPTY_ {
576✔
869
                kp, err := nkeys.FromSeed([]byte(nkey))
2✔
870
                if err != nil {
2✔
871
                        c.Errorf("Remote nkey has malformed seed")
×
872
                        return err
×
873
                }
×
874
                // Wipe our key on exit.
875
                defer kp.Wipe()
2✔
876
                sigraw, _ := kp.Sign(c.nonce)
2✔
877
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
2✔
878
                pkey, _ := kp.PublicKey()
2✔
879
                cinfo.Nkey = pkey
2✔
880
                cinfo.Sig = sig
2✔
881
        } else if userInfo := c.leaf.remote.curURL.User; userInfo != nil {
841✔
882
                cinfo.User = userInfo.Username()
269✔
883
                cinfo.Pass, _ = userInfo.Password()
269✔
884
        } else if c.leaf.remote.username != _EMPTY_ {
574✔
885
                cinfo.User = c.leaf.remote.username
2✔
886
                cinfo.Pass = c.leaf.remote.password
2✔
887
        }
2✔
888
        b, err := json.Marshal(cinfo)
620✔
889
        if err != nil {
620✔
890
                c.Errorf("Error marshaling CONNECT to remote leafnode: %v\n", err)
×
891
                return err
×
892
        }
×
893
        // Although this call is made before the writeLoop is created,
894
        // we don't really need to send in place. The protocol will be
895
        // sent out by the writeLoop.
896
        c.enqueueProto([]byte(fmt.Sprintf(ConProto, b)))
620✔
897
        return nil
620✔
898
}
899

900
// Makes a deep copy of the LeafNode Info structure.
901
// The server lock is held on entry.
902
func (s *Server) copyLeafNodeInfo() *Info {
2,490✔
903
        clone := s.leafNodeInfo
2,490✔
904
        // Copy the array of urls.
2,490✔
905
        if len(s.leafNodeInfo.LeafNodeURLs) > 0 {
4,526✔
906
                clone.LeafNodeURLs = append([]string(nil), s.leafNodeInfo.LeafNodeURLs...)
2,036✔
907
        }
2,036✔
908
        return &clone
2,490✔
909
}
910

911
// Adds a LeafNode URL that we get when a route connects to the Info structure.
912
// Regenerates the JSON byte array so that it can be sent to LeafNode connections.
913
// Returns a boolean indicating if the URL was added or not.
914
// Server lock is held on entry
915
func (s *Server) addLeafNodeURL(urlStr string) bool {
5,299✔
916
        if s.leafURLsMap.addUrl(urlStr) {
10,593✔
917
                s.generateLeafNodeInfoJSON()
5,294✔
918
                return true
5,294✔
919
        }
5,294✔
920
        return false
5✔
921
}
922

923
// Removes a LeafNode URL of the route that is disconnecting from the Info structure.
924
// Regenerates the JSON byte array so that it can be sent to LeafNode connections.
925
// Returns a boolean indicating if the URL was removed or not.
926
// Server lock is held on entry.
927
func (s *Server) removeLeafNodeURL(urlStr string) bool {
5,299✔
928
        // Don't need to do this if we are removing the route connection because
5,299✔
929
        // we are shuting down...
5,299✔
930
        if s.isShuttingDown() {
8,113✔
931
                return false
2,814✔
932
        }
2,814✔
933
        if s.leafURLsMap.removeUrl(urlStr) {
4,966✔
934
                s.generateLeafNodeInfoJSON()
2,481✔
935
                return true
2,481✔
936
        }
2,481✔
937
        return false
4✔
938
}
939

940
// Server lock is held on entry
941
func (s *Server) generateLeafNodeInfoJSON() {
10,466✔
942
        s.leafNodeInfo.Cluster = s.cachedClusterName()
10,466✔
943
        s.leafNodeInfo.LeafNodeURLs = s.leafURLsMap.getAsStringSlice()
10,466✔
944
        s.leafNodeInfo.WSConnectURLs = s.websocket.connectURLsMap.getAsStringSlice()
10,466✔
945
        s.leafNodeInfoJSON = generateInfoJSON(&s.leafNodeInfo)
10,466✔
946
}
10,466✔
947

948
// Sends an async INFO protocol so that the connected servers can update
949
// their list of LeafNode urls.
950
func (s *Server) sendAsyncLeafNodeInfo() {
7,775✔
951
        for _, c := range s.leafs {
7,865✔
952
                c.mu.Lock()
90✔
953
                c.enqueueProto(s.leafNodeInfoJSON)
90✔
954
                c.mu.Unlock()
90✔
955
        }
90✔
956
}
957

958
// Called when an inbound leafnode connection is accepted or we create one for a solicited leafnode.
// For accepted (non-solicited) connections, rURL/remote are nil and ws is non-nil
// when the connection arrived over the Websocket port. For solicited connections,
// remote is the configured remote and rURL the URL being dialed.
// Returns the created client, or nil if setup failed and the connection was closed.
func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCfg, ws *websocket) *client {
	// Snapshot server options.
	opts := s.getOpts()

	maxPay := int32(opts.MaxPayload)
	maxSubs := int32(opts.MaxSubs)
	// For system, maxSubs of 0 means unlimited, so re-adjust here.
	if maxSubs == 0 {
		maxSubs = -1
	}
	now := time.Now().UTC()

	c := &client{srv: s, nc: conn, kind: LEAF, opts: defaultOpts, mpay: maxPay, msubs: maxSubs, start: now, last: now}
	// Do not update the smap here, we need to do it in initLeafNodeSmapAndSendSubs
	c.leaf = &leaf{}

	// For accepted LN connections, ws will be != nil if it was accepted
	// through the Websocket port.
	c.ws = ws

	// For remote, check if the scheme starts with "ws", if so, we will initiate
	// a remote Leaf Node connection as a websocket connection.
	if remote != nil && rURL != nil && isWSURL(rURL) {
		remote.RLock()
		c.ws = &websocket{compress: remote.Websocket.Compression, maskwrite: !remote.Websocket.NoMasking}
		remote.RUnlock()
	}

	// Determines if we are soliciting the connection or not.
	var solicited bool
	var acc *Account
	var remoteSuffix string
	if remote != nil {
		// For now, if lookup fails, we will constantly try
		// to recreate this LN connection.
		lacc := remote.LocalAccount
		var err error
		acc, err = s.LookupAccount(lacc)
		if err != nil {
			// An account not existing is something that can happen with nats/http account resolver and the account
			// has not yet been pushed, or the request failed for other reasons.
			// remote needs to be set or retry won't happen
			c.leaf.remote = remote
			c.closeConnection(MissingAccount)
			s.Errorf("Unable to lookup account %s for solicited leafnode connection: %v", lacc, err)
			return nil
		}
		remoteSuffix = fmt.Sprintf(" for account: %s", acc.traceLabel())
	}

	c.mu.Lock()
	c.initClient()
	c.Noticef("Leafnode connection created%s %s", remoteSuffix, c.opts.Name)

	var (
		tlsFirst         bool
		tlsFirstFallback time.Duration
		infoTimeout      time.Duration
	)
	if remote != nil {
		// Solicited side: copy the remote's TLS/permissions settings
		// into the client under both locks.
		solicited = true
		remote.Lock()
		c.leaf.remote = remote
		c.setPermissions(remote.perms)
		if !c.leaf.remote.Hub {
			c.leaf.isSpoke = true
		}
		tlsFirst = remote.TLSHandshakeFirst
		infoTimeout = remote.FirstInfoTimeout
		remote.Unlock()
		c.acc = acc
	} else {
		// Accept side: we expect the remote to send CONNECT.
		c.flags.set(expectConnect)
		if ws != nil {
			c.Debugf("Leafnode compression=%v", c.ws.compress)
		}
		tlsFirst = opts.LeafNode.TLSHandshakeFirst
		if f := opts.LeafNode.TLSHandshakeFirstFallback; f > 0 {
			tlsFirstFallback = f
		}
	}
	c.mu.Unlock()

	var nonce [nonceLen]byte
	var info *Info

	// Grab this before the client lock below.
	if !solicited {
		// Grab server variables
		s.mu.Lock()
		info = s.copyLeafNodeInfo()
		// For tests that want to simulate old servers, do not set the compression
		// on the INFO protocol if configured with CompressionNotSupported.
		if cm := opts.LeafNode.Compression.Mode; cm != CompressionNotSupported {
			info.Compression = cm
		}
		s.generateNonce(nonce[:])
		s.mu.Unlock()
	}

	// Grab lock
	c.mu.Lock()

	var preBuf []byte
	if solicited {
		// For websocket connection, we need to send an HTTP request,
		// and get the response before starting the readLoop to get
		// the INFO, etc..
		if c.isWebsocket() {
			var err error
			var closeReason ClosedState

			preBuf, closeReason, err = c.leafNodeSolicitWSConnection(opts, rURL, remote)
			if err != nil {
				c.Errorf("Error soliciting websocket connection: %v", err)
				c.mu.Unlock()
				// closeReason == 0 means the helper already closed the connection.
				if closeReason != 0 {
					c.closeConnection(closeReason)
				}
				return nil
			}
		} else {
			// If configured to do TLS handshake first
			if tlsFirst {
				if _, err := c.leafClientHandshakeIfNeeded(remote, opts); err != nil {
					c.mu.Unlock()
					return nil
				}
			}
			// We need to wait for the info, but not for too long.
			c.nc.SetReadDeadline(time.Now().Add(infoTimeout))
		}

		// We will process the INFO from the readloop and finish by
		// sending the CONNECT and finish registration later.
	} else {
		// Send our info to the other side.
		// Remember the nonce we sent here for signatures, etc.
		c.nonce = make([]byte, nonceLen)
		copy(c.nonce, nonce[:])
		info.Nonce = bytesToString(c.nonce)
		info.CID = c.cid
		proto := generateInfoJSON(info)

		var pre []byte
		// We need first to check for "TLS First" fallback delay.
		if tlsFirstFallback > 0 {
			// We wait and see if we are getting any data. Since we did not send
			// the INFO protocol yet, only clients that use TLS first should be
			// sending data (the TLS handshake). We don't really check the content:
			// if it is a rogue agent and not an actual client performing the
			// TLS handshake, the error will be detected when performing the
			// handshake on our side.
			pre = make([]byte, 4)
			c.nc.SetReadDeadline(time.Now().Add(tlsFirstFallback))
			n, _ := io.ReadFull(c.nc, pre[:])
			c.nc.SetReadDeadline(time.Time{})
			// If we get any data (regardless of possible timeout), we will proceed
			// with the TLS handshake.
			if n > 0 {
				pre = pre[:n]
			} else {
				// We did not get anything so we will send the INFO protocol.
				pre = nil
				// Set the boolean to false for the rest of the function.
				tlsFirst = false
			}
		}

		if !tlsFirst {
			// We have to send from this go routine because we may
			// have to block for TLS handshake before we start our
			// writeLoop go routine. The other side needs to receive
			// this before it can initiate the TLS handshake..
			c.sendProtoNow(proto)

			// The above call could have marked the connection as closed (due to TCP error).
			if c.isClosed() {
				c.mu.Unlock()
				c.closeConnection(WriteError)
				return nil
			}
		}

		// Check to see if we need to spin up TLS.
		if !c.isWebsocket() && info.TLSRequired {
			// If we have a prebuffer create a multi-reader.
			if len(pre) > 0 {
				c.nc = &tlsMixConn{c.nc, bytes.NewBuffer(pre)}
			}
			// Perform server-side TLS handshake.
			if err := c.doTLSServerHandshake(tlsHandshakeLeaf, opts.LeafNode.TLSConfig, opts.LeafNode.TLSTimeout, opts.LeafNode.TLSPinnedCerts); err != nil {
				c.mu.Unlock()
				return nil
			}
		}

		// If the user wants the TLS handshake to occur first, now that it is
		// done, send the INFO protocol.
		if tlsFirst {
			c.flags.set(didTLSFirst)
			c.sendProtoNow(proto)
			if c.isClosed() {
				c.mu.Unlock()
				c.closeConnection(WriteError)
				return nil
			}
		}

		// Leaf nodes will always require a CONNECT to let us know
		// when we are properly bound to an account.
		//
		// If compression is configured, we can't set the authTimer here because
		// it would cause the parser to fail any incoming protocol that is not a
		// CONNECT (and we need to exchange INFO protocols for compression
		// negotiation). So instead, use the ping timer until we are done with
		// negotiation and can set the auth timer.
		timeout := secondsToDuration(opts.LeafNode.AuthTimeout)
		if needsCompression(opts.LeafNode.Compression.Mode) {
			c.ping.tmr = time.AfterFunc(timeout, func() {
				c.authTimeout()
			})
		} else {
			c.setAuthTimer(timeout)
		}
	}

	// Keep track in case server is shutdown before we can successfully register.
	if !s.addToTempClients(c.cid, c) {
		c.mu.Unlock()
		c.setNoReconnect()
		c.closeConnection(ServerShutdown)
		return nil
	}

	// Spin up the read loop.
	s.startGoRoutine(func() { c.readLoop(preBuf) })

	// We will spin the write loop for solicited connections only
	// when processing the INFO and after switching to TLS if needed.
	if !solicited {
		s.startGoRoutine(func() { c.writeLoop() })
	}

	c.mu.Unlock()

	return c
}
1207

1208
// Will perform the client-side TLS handshake if needed. Assumes that this
// is called by the solicit side (remote will be non nil). Returns `true`
// if TLS is required, `false` otherwise.
// Lock held on entry.
func (c *client) leafClientHandshakeIfNeeded(remote *leafNodeCfg, opts *Options) (bool, error) {
	// Check if TLS is required and gather TLS config variables.
	tlsRequired, tlsConfig, tlsName, tlsTimeout := c.leafNodeGetTLSConfigForSolicit(remote)
	if !tlsRequired {
		return false, nil
	}

	// If TLS required, perform handshake.
	// Get the URL that was used to connect to the remote server.
	rURL := remote.getCurrentURL()

	// Perform the client-side TLS handshake.
	if resetTLSName, err := c.doTLSClientHandshake(tlsHandshakeLeaf, rURL, tlsConfig, tlsName, tlsTimeout, opts.LeafNode.TLSPinnedCerts); err != nil {
		// Check if we need to reset the remote's TLS name.
		// (The handshake layer signals that the saved tlsName should be
		// discarded so the next attempt re-derives it from the URL.)
		if resetTLSName {
			remote.Lock()
			remote.tlsName = _EMPTY_
			remote.Unlock()
		}
		return false, err
	}
	return true, nil
}
1235

1236
// processLeafnodeInfo handles an INFO protocol received over a leafnode
// connection. This covers both the initial INFO (which, for solicited
// connections, may trigger the client-side TLS handshake, compression
// negotiation and the resumption of the CONNECT sequence) and later
// asynchronous INFO updates (remote URL lists, permissions, etc.).
// Note the careful lock handling: c.mu is released before calls that need
// the server lock and re-acquired afterwards, with early returns on close.
func (c *client) processLeafnodeInfo(info *Info) {
	c.mu.Lock()
	if c.leaf == nil || c.isClosed() {
		c.mu.Unlock()
		return
	}
	s := c.srv
	opts := s.getOpts()
	remote := c.leaf.remote
	didSolicit := remote != nil
	firstINFO := !c.flags.isSet(infoReceived)

	// In case of websocket, the TLS handshake has been already done.
	// So check only for non websocket connections and for configurations
	// where the TLS Handshake was not done first.
	if didSolicit && !c.flags.isSet(handshakeComplete) && !c.isWebsocket() && !remote.TLSHandshakeFirst {
		// If the server requires TLS, we need to set this in the remote
		// otherwise if there is no TLS configuration block for the remote,
		// the solicit side will not attempt to perform the TLS handshake.
		if firstINFO && info.TLSRequired {
			remote.TLS = true
		}
		if _, err := c.leafClientHandshakeIfNeeded(remote, opts); err != nil {
			c.mu.Unlock()
			return
		}
	}

	// Check for compression, unless already done.
	if firstINFO && !c.flags.isSet(compressionNegotiated) {
		// Prevent from getting back here.
		c.flags.set(compressionNegotiated)

		var co *CompressionOpts
		if !didSolicit {
			co = &opts.LeafNode.Compression
		} else {
			co = &remote.Compression
		}
		if needsCompression(co.Mode) {
			// Release client lock since following function will need server lock.
			c.mu.Unlock()
			compress, err := s.negotiateLeafCompression(c, didSolicit, info.Compression, co)
			if err != nil {
				c.sendErrAndErr(err.Error())
				c.closeConnection(ProtocolViolation)
				return
			}
			if compress {
				// Done for now, will get back another INFO protocol...
				return
			}
			// No compression because one side does not want/can't, so proceed.
			c.mu.Lock()
			// Check that the connection did not close if the lock was released.
			if c.isClosed() {
				c.mu.Unlock()
				return
			}
		} else {
			// Coming from an old server, the Compression field would be the empty
			// string. For servers that are configured with CompressionNotSupported,
			// this makes them behave as old servers.
			if info.Compression == _EMPTY_ || co.Mode == CompressionNotSupported {
				c.leaf.compression = CompressionNotSupported
			} else {
				c.leaf.compression = CompressionOff
			}
		}
		// Accepting side does not normally process an INFO protocol during
		// initial connection handshake. So we keep it consistent by returning
		// if we are not soliciting.
		if !didSolicit {
			// If we had created the ping timer instead of the auth timer, we will
			// clear the ping timer and set the auth timer now that the compression
			// negotiation is done.
			if info.Compression != _EMPTY_ && c.ping.tmr != nil {
				clearTimer(&c.ping.tmr)
				c.setAuthTimer(secondsToDuration(opts.LeafNode.AuthTimeout))
			}
			c.mu.Unlock()
			return
		}
		// Fall through and process the INFO protocol as usual.
	}

	// Note: For now, only the initial INFO has a nonce. We
	// will probably do auto key rotation at some point.
	if firstINFO {
		// Mark that the INFO protocol has been received.
		c.flags.set(infoReceived)
		// Prevent connecting to non leafnode port. Need to do this only for
		// the first INFO, not for async INFO updates...
		//
		// Content of INFO sent by the server when accepting a tcp connection.
		// -------------------------------------------------------------------
		// Listen Port Of | CID | ClientConnectURLs | LeafNodeURLs | Gateway |
		// -------------------------------------------------------------------
		//      CLIENT    |  X* |        X**        |              |         |
		//      ROUTE     |     |        X**        |      X***    |         |
		//     GATEWAY    |     |                   |              |    X    |
		//     LEAFNODE   |  X  |                   |       X      |         |
		// -------------------------------------------------------------------
		// *   Not on older servers.
		// **  Not if "no advertise" is enabled.
		// *** Not if leafnode's "no advertise" is enabled.
		//
		// As seen from above, a solicited LeafNode connection should receive
		// from the remote server an INFO with CID and LeafNodeURLs. Anything
		// else should be considered an attempt to connect to a wrong port.
		if didSolicit && (info.CID == 0 || info.LeafNodeURLs == nil) {
			c.mu.Unlock()
			c.Errorf(ErrConnectedToWrongPort.Error())
			c.closeConnection(WrongPort)
			return
		}
		// Reject a cluster that contains spaces.
		if info.Cluster != _EMPTY_ && strings.Contains(info.Cluster, " ") {
			c.mu.Unlock()
			c.sendErrAndErr(ErrClusterNameHasSpaces.Error())
			c.closeConnection(ProtocolViolation)
			return
		}
		// Capture a nonce here.
		c.nonce = []byte(info.Nonce)
		if info.TLSRequired && didSolicit {
			remote.TLS = true
		}
		supportsHeaders := c.srv.supportsHeaders()
		c.headers = supportsHeaders && info.Headers

		// Remember the remote server.
		// Pre 2.2.0 servers are not sending their server name.
		// In that case, use info.ID, which, for those servers, matches
		// the content of the field `Name` in the leafnode CONNECT protocol.
		if info.Name == _EMPTY_ {
			c.leaf.remoteServer = info.ID
		} else {
			c.leaf.remoteServer = info.Name
		}
		c.leaf.remoteDomain = info.Domain
		c.leaf.remoteCluster = info.Cluster
		// We send the protocol version in the INFO protocol.
		// Keep track of it, so we know if this connection supports message
		// tracing for instance.
		c.opts.Protocol = info.Proto
	}

	// For both initial INFO and async INFO protocols, Possibly
	// update our list of remote leafnode URLs we can connect to.
	if didSolicit && (len(info.LeafNodeURLs) > 0 || len(info.WSConnectURLs) > 0) {
		// Consider the incoming array as the most up-to-date
		// representation of the remote cluster's list of URLs.
		c.updateLeafNodeURLs(info)
	}

	// Check to see if we have permissions updates here.
	if info.Import != nil || info.Export != nil {
		perms := &Permissions{
			Publish:   info.Export,
			Subscribe: info.Import,
		}
		// Check if we have local deny clauses that we need to merge.
		if remote := c.leaf.remote; remote != nil {
			if len(remote.DenyExports) > 0 {
				if perms.Publish == nil {
					perms.Publish = &SubjectPermission{}
				}
				perms.Publish.Deny = append(perms.Publish.Deny, remote.DenyExports...)
			}
			if len(remote.DenyImports) > 0 {
				if perms.Subscribe == nil {
					perms.Subscribe = &SubjectPermission{}
				}
				perms.Subscribe.Deny = append(perms.Subscribe.Deny, remote.DenyImports...)
			}
		}
		c.setPermissions(perms)
	}

	var resumeConnect bool

	// If this is a remote connection and this is the first INFO protocol,
	// then we need to finish the connect process by sending CONNECT, etc..
	if firstINFO && didSolicit {
		// Clear deadline that was set in createLeafNode while waiting for the INFO.
		c.nc.SetDeadline(time.Time{})
		resumeConnect = true
	} else if !firstINFO && didSolicit {
		c.leaf.remoteAccName = info.RemoteAccount
	}

	// Check if we have the remote account information and if so make sure it's stored.
	if info.RemoteAccount != _EMPTY_ {
		s.leafRemoteAccounts.Store(c.acc.Name, info.RemoteAccount)
	}
	c.mu.Unlock()

	finishConnect := info.ConnectInfo
	if resumeConnect && s != nil {
		s.leafNodeResumeConnectProcess(c)
		if !info.InfoOnConnect {
			finishConnect = true
		}
	}
	if finishConnect {
		s.leafNodeFinishConnectProcess(c)
	}
}
1445

1446
// negotiateLeafCompression selects the compression mode for this leafnode
// connection from our configured mode (co) and the mode advertised by the
// peer in its INFO (infoCompression). It returns true when compression will
// be used, in which case an INFO carrying the chosen mode has been sent and
// the connection is switched to (de)compression reader/writer. Called with
// no client lock held; acquires c.mu and s.mu internally (never both such
// that ordering is violated).
func (s *Server) negotiateLeafCompression(c *client, didSolicit bool, infoCompression string, co *CompressionOpts) (bool, error) {
	// Negotiate the appropriate compression mode (or no compression)
	cm, err := selectCompressionMode(co.Mode, infoCompression)
	if err != nil {
		return false, err
	}
	c.mu.Lock()
	// For "auto" mode, set the initial compression mode based on RTT
	if cm == CompressionS2Auto {
		if c.rttStart.IsZero() {
			c.rtt = computeRTT(c.start)
		}
		cm = selectS2AutoModeBasedOnRTT(c.rtt, co.RTTThresholds)
	}
	// Keep track of the negotiated compression mode.
	c.leaf.compression = cm
	cid := c.cid
	var nonce string
	if !didSolicit {
		nonce = bytesToString(c.nonce)
	}
	c.mu.Unlock()

	if !needsCompression(cm) {
		return false, nil
	}

	// If we end-up doing compression...

	// Generate an INFO with the chosen compression mode.
	s.mu.Lock()
	info := s.copyLeafNodeInfo()
	info.Compression, info.CID, info.Nonce = compressionModeForInfoProtocol(co, cm), cid, nonce
	infoProto := generateInfoJSON(info)
	s.mu.Unlock()

	// If we solicited, then send this INFO protocol BEFORE switching
	// to compression writer. However, if we did not, we send it after.
	c.mu.Lock()
	if didSolicit {
		c.enqueueProto(infoProto)
		// Make sure it is completely flushed (the pending bytes goes to
		// 0) before proceeding.
		for c.out.pb > 0 && !c.isClosed() {
			c.flushOutbound()
		}
	}
	// This is to notify the readLoop that it should switch to a
	// (de)compression reader.
	c.in.flags.set(switchToCompression)
	// Create the compress writer before queueing the INFO protocol for
	// a route that did not solicit. It will make sure that that proto
	// is sent with compression on.
	c.out.cw = s2.NewWriter(nil, s2WriterOptions(cm)...)
	if !didSolicit {
		c.enqueueProto(infoProto)
	}
	c.mu.Unlock()
	return true, nil
}
1506

1507
// When getting a leaf node INFO protocol, use the provided
1508
// array of urls to update the list of possible endpoints.
1509
func (c *client) updateLeafNodeURLs(info *Info) {
1,232✔
1510
        cfg := c.leaf.remote
1,232✔
1511
        cfg.Lock()
1,232✔
1512
        defer cfg.Unlock()
1,232✔
1513

1,232✔
1514
        // We have ensured that if a remote has a WS scheme, then all are.
1,232✔
1515
        // So check if first is WS, then add WS URLs, otherwise, add non WS ones.
1,232✔
1516
        if len(cfg.URLs) > 0 && isWSURL(cfg.URLs[0]) {
1,286✔
1517
                // It does not really matter if we use "ws://" or "wss://" here since
54✔
1518
                // we will have already marked that the remote should use TLS anyway.
54✔
1519
                // But use proper scheme for log statements, etc...
54✔
1520
                proto := wsSchemePrefix
54✔
1521
                if cfg.TLS {
54✔
1522
                        proto = wsSchemePrefixTLS
×
1523
                }
×
1524
                c.doUpdateLNURLs(cfg, proto, info.WSConnectURLs)
54✔
1525
                return
54✔
1526
        }
1527
        c.doUpdateLNURLs(cfg, "nats-leaf", info.LeafNodeURLs)
1,178✔
1528
}
1529

1530
func (c *client) doUpdateLNURLs(cfg *leafNodeCfg, scheme string, URLs []string) {
1,232✔
1531
        cfg.urls = make([]*url.URL, 0, 1+len(URLs))
1,232✔
1532
        // Add the ones we receive in the protocol
1,232✔
1533
        for _, surl := range URLs {
3,490✔
1534
                url, err := url.Parse(fmt.Sprintf("%s://%s", scheme, surl))
2,258✔
1535
                if err != nil {
2,258✔
1536
                        // As per below, the URLs we receive should not have contained URL info, so this should be safe to log.
×
1537
                        c.Errorf("Error parsing url %q: %v", surl, err)
×
1538
                        continue
×
1539
                }
1540
                // Do not add if it's the same as what we already have configured.
1541
                var dup bool
2,258✔
1542
                for _, u := range cfg.URLs {
5,765✔
1543
                        // URLs that we receive never have user info, but the
3,507✔
1544
                        // ones that were configured may have. Simply compare
3,507✔
1545
                        // host and port to decide if they are equal or not.
3,507✔
1546
                        if url.Host == u.Host && url.Port() == u.Port() {
5,171✔
1547
                                dup = true
1,664✔
1548
                                break
1,664✔
1549
                        }
1550
                }
1551
                if !dup {
2,852✔
1552
                        cfg.urls = append(cfg.urls, url)
594✔
1553
                        cfg.saveTLSHostname(url)
594✔
1554
                }
594✔
1555
        }
1556
        // Add the configured one
1557
        cfg.urls = append(cfg.urls, cfg.URLs...)
1,232✔
1558
}
1559

1560
// Similar to setInfoHostPortAndGenerateJSON, but for leafNodeInfo.
1561
func (s *Server) setLeafNodeInfoHostPortAndIP() error {
2,691✔
1562
        opts := s.getOpts()
2,691✔
1563
        if opts.LeafNode.Advertise != _EMPTY_ {
2,702✔
1564
                advHost, advPort, err := parseHostPort(opts.LeafNode.Advertise, opts.LeafNode.Port)
11✔
1565
                if err != nil {
11✔
1566
                        return err
×
1567
                }
×
1568
                s.leafNodeInfo.Host = advHost
11✔
1569
                s.leafNodeInfo.Port = advPort
11✔
1570
        } else {
2,680✔
1571
                s.leafNodeInfo.Host = opts.LeafNode.Host
2,680✔
1572
                s.leafNodeInfo.Port = opts.LeafNode.Port
2,680✔
1573
                // If the host is "0.0.0.0" or "::" we need to resolve to a public IP.
2,680✔
1574
                // This will return at most 1 IP.
2,680✔
1575
                hostIsIPAny, ips, err := s.getNonLocalIPsIfHostIsIPAny(s.leafNodeInfo.Host, false)
2,680✔
1576
                if err != nil {
2,680✔
1577
                        return err
×
1578
                }
×
1579
                if hostIsIPAny {
2,961✔
1580
                        if len(ips) == 0 {
281✔
1581
                                s.Errorf("Could not find any non-local IP for leafnode's listen specification %q",
×
1582
                                        s.leafNodeInfo.Host)
×
1583
                        } else {
281✔
1584
                                // Take the first from the list...
281✔
1585
                                s.leafNodeInfo.Host = ips[0]
281✔
1586
                        }
281✔
1587
                }
1588
        }
1589
        // Use just host:port for the IP
1590
        s.leafNodeInfo.IP = net.JoinHostPort(s.leafNodeInfo.Host, strconv.Itoa(s.leafNodeInfo.Port))
2,691✔
1591
        if opts.LeafNode.Advertise != _EMPTY_ {
2,702✔
1592
                s.Noticef("Advertise address for leafnode is set to %s", s.leafNodeInfo.IP)
11✔
1593
        }
11✔
1594
        return nil
2,691✔
1595
}
1596

1597
// Add the connection to the map of leaf nodes.
// If `checkForDup` is true (invoked when a leafnode is accepted), then we check
// if a connection already exists for the same server name and account.
// That can happen when the remote is attempting to reconnect while the accepting
// side did not detect the connection as broken yet.
// But it can also happen when there is a misconfiguration and the remote is
// creating two (or more) connections that bind to the same account on the accept
// side.
// When a duplicate is found, the new connection is accepted and the old is closed
// (this solves the stale connection situation). An error is returned to help the
// remote detect the misconfiguration when the duplicate is the result of that
// misconfiguration.
// This function also decides whether this connection extends the local
// JetStream domain or must be fenced off with deny permissions / mappings.
func (s *Server) addLeafNodeConnection(c *client, srvName, clusterName string, checkForDup bool) {
	var accName string
	c.mu.Lock()
	cid := c.cid
	acc := c.acc
	if acc != nil {
		accName = acc.Name
	}
	myRemoteDomain := c.leaf.remoteDomain
	mySrvName := c.leaf.remoteServer
	remoteAccName := c.leaf.remoteAccName
	myClustName := c.leaf.remoteCluster
	solicited := c.leaf.remote != nil
	c.mu.Unlock()

	var old *client
	s.mu.Lock()
	// We check for empty because in some test we may send empty CONNECT{}
	if checkForDup && srvName != _EMPTY_ {
		for _, ol := range s.leafs {
			ol.mu.Lock()
			// We care here only about non solicited Leafnode. This function
			// is more about replacing stale connections than detecting loops.
			// We have code for the loop detection elsewhere, which also delays
			// attempt to reconnect.
			if !ol.isSolicitedLeafNode() && ol.leaf.remoteServer == srvName &&
				ol.leaf.remoteCluster == clusterName && ol.acc.Name == accName &&
				remoteAccName != _EMPTY_ && ol.leaf.remoteAccName == remoteAccName {
				old = ol
			}
			ol.mu.Unlock()
			if old != nil {
				break
			}
		}
	}
	// Store new connection in the map
	s.leafs[cid] = c
	s.mu.Unlock()
	s.removeFromTempClients(cid)

	// If applicable, evict the old one.
	if old != nil {
		old.sendErrAndErr(DuplicateRemoteLeafnodeConnection.String())
		old.closeConnection(DuplicateRemoteLeafnodeConnection)
		c.Warnf("Replacing connection from same server")
	}

	// Helper producing "server" or "server/cluster" for log statements.
	srvDecorated := func() string {
		if myClustName == _EMPTY_ {
			return mySrvName
		}
		return fmt.Sprintf("%s/%s", mySrvName, myClustName)
	}

	opts := s.getOpts()
	sysAcc := s.SystemAccount()
	js := s.getJetStream()
	var meta *raft
	if js != nil {
		if mg := js.getMetaGroup(); mg != nil {
			meta = mg.(*raft)
		}
	}
	blockMappingOutgoing := false
	// Deny (non domain) JetStream API traffic unless system account is shared
	// and domain names are identical and extending is not disabled

	// Check if backwards compatibility has been enabled and needs to be acted on
	forceSysAccDeny := false
	if len(opts.JsAccDefaultDomain) > 0 {
		if acc == sysAcc {
			for _, d := range opts.JsAccDefaultDomain {
				if d == _EMPTY_ {
					// Extending JetStream via leaf node is mutually exclusive with a domain mapping to the empty/default domain.
					// As soon as one mapping to "" is found, disable the ability to extend JS via a leaf node.
					c.Noticef("Not extending remote JetStream domain %q due to presence of empty default domain", myRemoteDomain)
					forceSysAccDeny = true
					break
				}
			}
		} else if domain, ok := opts.JsAccDefaultDomain[accName]; ok && domain == _EMPTY_ {
			// for backwards compatibility with old setups that do not have a domain name set
			c.Debugf("Skipping deny %q for account %q due to default domain", jsAllAPI, accName)
			return
		}
	}

	// If the server has JS disabled, it may still be part of a JetStream that could be extended.
	// This is either signaled by js being disabled and a domain set,
	// or in cases where no domain name exists, an extension hint is set.
	// However, this is only relevant in mixed setups.
	//
	// If the system account connects but default domains are present, JetStream can't be extended.
	if opts.JetStreamDomain != myRemoteDomain || (!opts.JetStream && (opts.JetStreamDomain == _EMPTY_ && opts.JetStreamExtHint != jsWillExtend)) ||
		sysAcc == nil || acc == nil || forceSysAccDeny {
		// If domain names mismatch always deny. This applies to system accounts as well as non system accounts.
		// Not having a system account, account or JetStream disabled is considered a mismatch as well.
		if acc != nil && acc == sysAcc {
			c.Noticef("System account connected from %s", srvDecorated())
			c.Noticef("JetStream not extended, domains differ")
			c.mergeDenyPermissionsLocked(both, denyAllJs)
			// When a remote with a system account is present in a server, unless otherwise disabled, the server will be
			// started in observer mode. Now that it is clear that this not used, turn the observer mode off.
			if solicited && meta != nil && meta.IsObserver() {
				meta.setObserver(false, extNotExtended)
				c.Debugf("Turning JetStream metadata controller Observer Mode off")
				// Take note that the domain was not extended to avoid this state from startup.
				writePeerState(js.config.StoreDir, meta.currentPeerState())
				// Meta controller can't be leader yet.
				// Yet it is possible that due to observer mode every server already stopped campaigning.
				// Therefore this server needs to be kicked into campaigning gear explicitly.
				meta.Campaign()
			}
		} else {
			c.Noticef("JetStream using domains: local %q, remote %q", opts.JetStreamDomain, myRemoteDomain)
			c.mergeDenyPermissionsLocked(both, denyAllClientJs)
		}
		blockMappingOutgoing = true
	} else if acc == sysAcc {
		// system account and same domain
		s.sys.client.Noticef("Extending JetStream domain %q as System Account connected from server %s",
			myRemoteDomain, srvDecorated())
		// In an extension use case, pin leadership to server remotes connect to.
		// Therefore, server with a remote that are not already in observer mode, need to be put into it.
		if solicited && meta != nil && !meta.IsObserver() {
			meta.setObserver(true, extExtended)
			c.Debugf("Turning JetStream metadata controller Observer Mode on - System Account Connected")
			// Take note that the domain was not extended to avoid this state next startup.
			writePeerState(js.config.StoreDir, meta.currentPeerState())
			// If this server is the leader already, step down so a new leader can be elected (that is not an observer)
			meta.StepDown()
		}
	} else {
		// This deny is needed in all cases (system account shared or not)
		// If the system account is shared, jsAllAPI traffic will go through the system account.
		// So in order to prevent duplicate delivery (from system and actual account) suppress it on the account.
		// If the system account is NOT shared, jsAllAPI traffic has no business
		c.Debugf("Adding deny %+v for account %q", denyAllClientJs, accName)
		c.mergeDenyPermissionsLocked(both, denyAllClientJs)
	}
	// If we have a specified JetStream domain we will want to add a mapping to
	// allow access cross domain for each non-system account.
	if opts.JetStreamDomain != _EMPTY_ && opts.JetStream && acc != nil && acc != sysAcc {
		for src, dest := range generateJSMappingTable(opts.JetStreamDomain) {
			if err := acc.AddMapping(src, dest); err != nil {
				c.Debugf("Error adding JetStream domain mapping: %s", err.Error())
			} else {
				c.Debugf("Adding JetStream Domain Mapping %q -> %s to account %q", src, dest, accName)
			}
		}
		if blockMappingOutgoing {
			src := fmt.Sprintf(jsDomainAPI, opts.JetStreamDomain)
			// make sure that messages intended for this domain, do not leave the cluster via this leaf node connection
			// This is a guard against a miss-config with two identical domain names and will only cover some forms
			// of this issue, not all of them.
			// This guards against a hub and a spoke having the same domain name.
			// But not two spokes having the same one and the request coming from the hub.
			c.mergeDenyPermissionsLocked(pub, []string{src})
			c.Debugf("Adding deny %q for outgoing messages to account %q", src, accName)
		}
	}
}
1772

1773
func (s *Server) removeLeafNodeConnection(c *client) {
1,552✔
1774
        c.mu.Lock()
1,552✔
1775
        cid := c.cid
1,552✔
1776
        if c.leaf != nil {
3,104✔
1777
                if c.leaf.tsubt != nil {
2,631✔
1778
                        c.leaf.tsubt.Stop()
1,079✔
1779
                        c.leaf.tsubt = nil
1,079✔
1780
                }
1,079✔
1781
                if c.leaf.gwSub != nil {
2,154✔
1782
                        s.gwLeafSubs.Remove(c.leaf.gwSub)
602✔
1783
                        // We need to set this to nil for GC to release the connection
602✔
1784
                        c.leaf.gwSub = nil
602✔
1785
                }
602✔
1786
        }
1787
        c.mu.Unlock()
1,552✔
1788
        s.mu.Lock()
1,552✔
1789
        delete(s.leafs, cid)
1,552✔
1790
        s.mu.Unlock()
1,552✔
1791
        s.removeFromTempClients(cid)
1,552✔
1792
}
1793

1794
// Connect information for solicited leafnodes.
type leafConnectInfo struct {
	Version string `json:"version,omitempty"`
	// Authentication credentials for the remote side.
	Nkey string `json:"nkey,omitempty"`
	JWT  string `json:"jwt,omitempty"`
	Sig  string `json:"sig,omitempty"`
	User string `json:"user,omitempty"`
	Pass string `json:"pass,omitempty"`
	ID   string `json:"server_id,omitempty"`
	// JetStream domain of the soliciting server, if any.
	Domain string `json:"domain,omitempty"`
	Name   string `json:"name,omitempty"`
	// When true, the soliciting side declares itself a hub; the accept
	// side then takes on the spoke role (see processLeafNodeConnect).
	Hub     bool   `json:"is_hub,omitempty"`
	Cluster string `json:"cluster,omitempty"`
	Headers bool   `json:"headers,omitempty"`
	// JetStream enabled on the remote.
	JetStream bool `json:"jetstream,omitempty"`
	// Pub deny permissions the accept side merges into its own set
	// (see processLeafNodeConnect).
	DenyPub []string `json:"deny_pub,omitempty"`

	// There was an existing field called:
	// >> Comp bool `json:"compression,omitempty"`
	// that has never been used. With support for compression, we now need
	// a field that is a string. So we use a different json tag:
	Compression string `json:"compress_mode,omitempty"`

	// Just used to detect wrong connection attempts.
	Gateway string `json:"gateway,omitempty"`

	// Tells the accept side which account the remote is binding to.
	RemoteAccount string `json:"remote_account,omitempty"`

	// The accept side of a LEAF connection, unlike ROUTER and GATEWAY, receives
	// only the CONNECT protocol, and no INFO. So we need to send the protocol
	// version as part of the CONNECT. It will indicate if a connection supports
	// some features, such as message tracing.
	// We use `protocol` as the JSON tag, so this is automatically unmarshal'ed
	// in the low level process CONNECT.
	Proto int `json:"protocol,omitempty"`
}
1831

1832
// processLeafNodeConnect will process the inbound connect args.
// Once we are here we are bound to an account, so can send any interest that
// we would have to the other side.
// `arg` is the raw JSON CONNECT payload, unmarshaled into leafConnectInfo.
// A non-nil `lang` indicates a regular client mistakenly hitting the
// leafnode port, which is rejected. On any validation failure the
// connection is closed and the corresponding sentinel error returned.
func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) error {
	// Way to detect clients that incorrectly connect to the route listen
	// port. Client provided "lang" in the CONNECT protocol while LEAFNODEs don't.
	if lang != _EMPTY_ {
		c.sendErrAndErr(ErrClientConnectedToLeafNodePort.Error())
		c.closeConnection(WrongPort)
		return ErrClientConnectedToLeafNodePort
	}

	// Unmarshal as a leaf node connect protocol
	proto := &leafConnectInfo{}
	if err := json.Unmarshal(arg, proto); err != nil {
		return err
	}

	// Reject a cluster that contains spaces.
	if proto.Cluster != _EMPTY_ && strings.Contains(proto.Cluster, " ") {
		c.sendErrAndErr(ErrClusterNameHasSpaces.Error())
		c.closeConnection(ProtocolViolation)
		return ErrClusterNameHasSpaces
	}

	// Check for cluster name collisions.
	if cn := s.cachedClusterName(); cn != _EMPTY_ && proto.Cluster != _EMPTY_ && proto.Cluster == cn {
		c.sendErrAndErr(ErrLeafNodeHasSameClusterName.Error())
		c.closeConnection(ClusterNamesIdentical)
		return ErrLeafNodeHasSameClusterName
	}

	// Reject if this has Gateway which means that it would be from a gateway
	// connection that incorrectly connects to the leafnode port.
	if proto.Gateway != _EMPTY_ {
		errTxt := fmt.Sprintf("Rejecting connection from gateway %q on the leafnode port", proto.Gateway)
		c.Errorf(errTxt)
		c.sendErr(errTxt)
		c.closeConnection(WrongGateway)
		return ErrWrongGateway
	}

	// Enforce the configured minimum remote server version, if any.
	if mv := s.getOpts().LeafNode.MinVersion; mv != _EMPTY_ {
		major, minor, update, _ := versionComponents(mv)
		if !versionAtLeast(proto.Version, major, minor, update) {
			// We are going to send back an INFO because otherwise recent
			// versions of the remote server would simply break the connection
			// after 2 seconds if not receiving it. Instead, we want the
			// other side to just "stall" until we finish waiting for the holding
			// period and close the connection below.
			s.sendPermsAndAccountInfo(c)
			c.sendErrAndErr(fmt.Sprintf("connection rejected since minimum version required is %q", mv))
			select {
			case <-c.srv.quitCh:
			case <-time.After(leafNodeWaitBeforeClose):
			}
			c.closeConnection(MinimumVersionRequired)
			return ErrMinimumVersionRequired
		}
	}

	// Check if this server supports headers.
	supportHeaders := c.srv.supportsHeaders()

	c.mu.Lock()
	// Leaf Nodes do not do echo or verbose or pedantic.
	c.opts.Verbose = false
	c.opts.Echo = false
	c.opts.Pedantic = false
	// This inbound connection will be marked as supporting headers if this server
	// support headers and the remote has sent in the CONNECT protocol that it does
	// support headers too.
	c.headers = supportHeaders && proto.Headers
	// If the compression level is still not set, set it based on what has been
	// given to us in the CONNECT protocol.
	if c.leaf.compression == _EMPTY_ {
		// But if proto.Compression is _EMPTY_, set it to CompressionNotSupported
		if proto.Compression == _EMPTY_ {
			c.leaf.compression = CompressionNotSupported
		} else {
			c.leaf.compression = proto.Compression
		}
	}

	// Remember the remote server.
	c.leaf.remoteServer = proto.Name
	// Remember the remote account name
	c.leaf.remoteAccName = proto.RemoteAccount

	// If the other side has declared itself a hub, so we will take on the spoke role.
	if proto.Hub {
		c.leaf.isSpoke = true
	}

	// The soliciting side is part of a cluster.
	if proto.Cluster != _EMPTY_ {
		c.leaf.remoteCluster = proto.Cluster
	}

	c.leaf.remoteDomain = proto.Domain

	// When a leaf solicits a connection to a hub, the perms that it will use on the soliciting leafnode's
	// behalf are correct for them, but inside the hub need to be reversed since data is flowing in the opposite direction.
	if !c.isSolicitedLeafNode() && c.perms != nil {
		sp, pp := c.perms.sub, c.perms.pub
		c.perms.sub, c.perms.pub = pp, sp
		if c.opts.Import != nil {
			c.darray = c.opts.Import.Deny
		} else {
			c.darray = nil
		}
	}

	// Set the Ping timer
	c.setFirstPingTimer()

	// If we received pub deny permissions from the other end, merge with existing ones.
	c.mergeDenyPermissions(pub, proto.DenyPub)

	c.mu.Unlock()

	// Register the cluster, even if empty, as long as we are acting as a hub.
	if !proto.Hub {
		c.acc.registerLeafNodeCluster(proto.Cluster)
	}

	// Add in the leafnode here since we passed through auth at this point.
	s.addLeafNodeConnection(c, proto.Name, proto.Cluster, true)

	// If we have permissions bound to this leafnode we need to send then back to the
	// origin server for local enforcement.
	s.sendPermsAndAccountInfo(c)

	// Create and initialize the smap since we know our bound account now.
	// This will send all registered subs too.
	s.initLeafNodeSmapAndSendSubs(c)

	// Announce the account connect event for a leaf node.
	// This will no-op as needed.
	s.sendLeafNodeConnect(c.acc)

	return nil
}
1975

1976
// Returns the remote cluster name. This is set only once so does not require a lock.
1977
func (c *client) remoteCluster() string {
142,897✔
1978
        if c.leaf == nil {
142,897✔
1979
                return _EMPTY_
×
1980
        }
×
1981
        return c.leaf.remoteCluster
142,897✔
1982
}
1983

1984
// Sends back an info block to the soliciting leafnode to let it know about
1985
// its permission settings for local enforcement.
1986
func (s *Server) sendPermsAndAccountInfo(c *client) {
631✔
1987
        // Copy
631✔
1988
        info := s.copyLeafNodeInfo()
631✔
1989
        c.mu.Lock()
631✔
1990
        info.CID = c.cid
631✔
1991
        info.Import = c.opts.Import
631✔
1992
        info.Export = c.opts.Export
631✔
1993
        info.RemoteAccount = c.acc.Name
631✔
1994
        info.ConnectInfo = true
631✔
1995
        c.enqueueProto(generateInfoJSON(info))
631✔
1996
        c.mu.Unlock()
631✔
1997
}
631✔
1998

1999
// Snapshot the current subscriptions from the sublist into our smap which
// we will keep updated from now on.
// Also send the registered subscriptions.
// NOTE(review): the client lock is held for the whole routine while the
// account lock is taken/released inside — this ordering appears intentional
// (see the race comment below); preserve it when modifying.
func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
	acc := c.acc
	if acc == nil {
		c.Debugf("Leafnode does not have an account bound")
		return
	}
	// Collect all account subs here. Pre-sized scratch space for the common case.
	_subs := [1024]*subscription{}
	subs := _subs[:0]
	ims := []string{}

	// Hold the client lock otherwise there can be a race and miss some subs.
	c.mu.Lock()
	defer c.mu.Unlock()

	acc.mu.RLock()
	accName := acc.Name
	accNTag := acc.nameTag

	// To make printing look better when no friendly name present.
	if accNTag != _EMPTY_ {
		accNTag = "/" + accNTag
	}

	// If we are solicited we only send interest for local clients.
	if c.isSpokeLeafNode() {
		acc.sl.localSubs(&subs, true)
	} else {
		acc.sl.All(&subs)
	}

	// Check if we have an existing service import reply.
	siReply := copyBytes(acc.siReply)

	// Since leaf nodes only send on interest, if the bound
	// account has import services we need to send those over.
	for isubj := range acc.imports.services {
		if c.isSpokeLeafNode() && !c.canSubscribe(isubj) {
			c.Debugf("Not permitted to import service %q on behalf of %s%s", isubj, accName, accNTag)
			continue
		}
		ims = append(ims, isubj)
	}
	// Likewise for mappings.
	for _, m := range acc.mappings {
		if c.isSpokeLeafNode() && !c.canSubscribe(m.src) {
			c.Debugf("Not permitted to import mapping %q on behalf of %s%s", m.src, accName, accNTag)
			continue
		}
		ims = append(ims, m.src)
	}

	// Create a unique subject that will be used for loop detection.
	lds := acc.lds
	acc.mu.RUnlock()

	// Check if we have to create the LDS.
	if lds == _EMPTY_ {
		lds = leafNodeLoopDetectionSubjectPrefix + nuid.Next()
		acc.mu.Lock()
		acc.lds = lds
		acc.mu.Unlock()
	}

	// Now check for gateway interest. Leafnodes will put this into
	// the proper mode to propagate, but they are not held in the account.
	gwsa := [16]*client{}
	gws := gwsa[:0]
	s.getOutboundGatewayConnections(&gws)
	for _, cgw := range gws {
		cgw.mu.Lock()
		gw := cgw.gw
		cgw.mu.Unlock()
		if gw != nil {
			if ei, _ := gw.outsim.Load(accName); ei != nil {
				if e := ei.(*outsie); e != nil && e.sl != nil {
					e.sl.All(&subs)
				}
			}
		}
	}

	applyGlobalRouting := s.gateway.enabled
	if c.isSpokeLeafNode() {
		// Add a fake subscription for this solicited leafnode connection
		// so that we can send back directly for mapped GW replies.
		// We need to keep track of this subscription so it can be removed
		// when the connection is closed so that the GC can release it.
		c.leaf.gwSub = &subscription{client: c, subject: []byte(gwReplyPrefix + ">")}
		c.srv.gwLeafSubs.Insert(c.leaf.gwSub)
	}

	// Now walk the results and add them to our smap
	rc := c.leaf.remoteCluster
	c.leaf.smap = make(map[string]int32)
	for _, sub := range subs {
		// Check perms regardless of role.
		if c.perms != nil && !c.canSubscribe(string(sub.subject)) {
			c.Debugf("Not permitted to subscribe to %q on behalf of %s%s", sub.subject, accName, accNTag)
			continue
		}
		// We ignore ourselves here.
		// Also don't add the subscription if it has a origin cluster and the
		// cluster name matches the one of the client we are sending to.
		if c != sub.client && (sub.origin == nil || (bytesToString(sub.origin) != rc)) {
			// Queue subs carry their queue weight, plain subs count as 1.
			count := int32(1)
			if len(sub.queue) > 0 && sub.qw > 0 {
				count = sub.qw
			}
			c.leaf.smap[keyFromSub(sub)] += count
			if c.leaf.tsub == nil {
				c.leaf.tsub = make(map[*subscription]struct{})
			}
			c.leaf.tsub[sub] = struct{}{}
		}
	}
	// FIXME(dlc) - We need to update appropriately on an account claims update.
	for _, isubj := range ims {
		c.leaf.smap[isubj]++
	}
	// If we have gateways enabled we need to make sure the other side sends us responses
	// that have been augmented from the original subscription.
	// TODO(dlc) - Should we lock this down more?
	if applyGlobalRouting {
		c.leaf.smap[oldGWReplyPrefix+"*.>"]++
		c.leaf.smap[gwReplyPrefix+">"]++
	}
	// Detect loops by subscribing to a specific subject and checking
	// if this sub is coming back to us.
	c.leaf.smap[lds]++

	// Check if we need to add an existing siReply to our map.
	// This will be a prefix so add on the wildcard.
	if siReply != nil {
		wcsub := append(siReply, '>')
		c.leaf.smap[string(wcsub)]++
	}
	// Queue all protocols. There is no max pending limit for LN connection,
	// so we don't need chunking. The writes will happen from the writeLoop.
	var b bytes.Buffer
	for key, n := range c.leaf.smap {
		c.writeLeafSub(&b, key, n)
	}
	if b.Len() > 0 {
		c.enqueueProto(b.Bytes())
	}
	if c.leaf.tsub != nil {
		// Clear the tsub map after 5 seconds.
		c.leaf.tsubt = time.AfterFunc(5*time.Second, func() {
			c.mu.Lock()
			if c.leaf != nil {
				c.leaf.tsub = nil
				c.leaf.tsubt = nil
			}
			c.mu.Unlock()
		})
	}
}
2160

2161
// updateInterestForAccountOnGateway called from gateway code when processing RS+ and RS-.
2162
func (s *Server) updateInterestForAccountOnGateway(accName string, sub *subscription, delta int32) {
196,343✔
2163
        acc, err := s.LookupAccount(accName)
196,343✔
2164
        if acc == nil || err != nil {
196,451✔
2165
                s.Debugf("No or bad account for %q, failed to update interest from gateway", accName)
108✔
2166
                return
108✔
2167
        }
108✔
2168
        acc.updateLeafNodes(sub, delta)
196,235✔
2169
}
2170

2171
// updateLeafNodes will make sure to update the account smap for the subscription.
// Will also forward to all leaf nodes as needed.
// `delta` is the interest change (positive for new interest, negative or
// zero for removal); it is propagated to each eligible leafnode's smap.
func (acc *Account) updateLeafNodes(sub *subscription, delta int32) {
	if acc == nil || sub == nil {
		return
	}

	// We will do checks for no leafnodes and same cluster here inline and under the
	// general account read lock.
	// If we feel we need to update the leafnodes we will do that out of line to avoid
	// blocking routes or GWs.

	acc.mu.RLock()
	// First check if we even have leafnodes here.
	if acc.nleafs == 0 {
		acc.mu.RUnlock()
		return
	}

	// Is this a loop detection subject.
	isLDS := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))

	// Capture the cluster even if its empty.
	var cluster string
	if sub.origin != nil {
		cluster = bytesToString(sub.origin)
	}

	// If we have an isolated cluster we can return early, as long as it is not a loop detection subject.
	// Empty clusters will return false for the check.
	if !isLDS && acc.isLeafNodeClusterIsolated(cluster) {
		acc.mu.RUnlock()
		return
	}

	// We can release the general account lock.
	acc.mu.RUnlock()

	// We can hold the list lock here to avoid having to copy a large slice.
	acc.lmu.RLock()
	defer acc.lmu.RUnlock()

	// Do this once.
	subject := string(sub.subject)

	// Walk the connected leafnodes.
	for _, ln := range acc.lleafs {
		// Never echo interest back to its originating connection.
		if ln == sub.client {
			continue
		}
		// Check to make sure this sub does not have an origin cluster that matches the leafnode.
		ln.mu.Lock()
		// If skipped, make sure that we still let go the "$LDS." subscription that allows
		// the detection of loops as long as different cluster.
		clusterDifferent := cluster != ln.remoteCluster()
		if (isLDS && clusterDifferent) || ((cluster == _EMPTY_ || clusterDifferent) && (delta <= 0 || ln.canSubscribe(subject))) {
			ln.updateSmap(sub, delta, isLDS)
		}
		ln.mu.Unlock()
	}
}
2232

2233
// This will make an update to our internal smap and determine if we should send out
// an interest update to the remote side.
// `delta` is the interest change; `isLDS` marks loop-detection subjects
// which bypass the spoke-role filtering below.
// Lock should be held.
func (c *client) updateSmap(sub *subscription, delta int32, isLDS bool) {
	if c.leaf.smap == nil {
		return
	}

	// If we are solicited make sure this is a local client or a non-solicited leaf node
	skind := sub.client.kind
	updateClient := skind == CLIENT || skind == SYSTEM || skind == JETSTREAM || skind == ACCOUNT
	if !isLDS && c.isSpokeLeafNode() && !(updateClient || (skind == LEAF && !sub.client.isSpokeLeafNode())) {
		return
	}

	// For additions, check if that sub has just been processed during initLeafNodeSmapAndSendSubs
	if delta > 0 && c.leaf.tsub != nil {
		if _, present := c.leaf.tsub[sub]; present {
			// Already accounted for by the snapshot; consume the marker and
			// tear down the tracking map/timer once it is empty.
			delete(c.leaf.tsub, sub)
			if len(c.leaf.tsub) == 0 {
				c.leaf.tsub = nil
				c.leaf.tsubt.Stop()
				c.leaf.tsubt = nil
			}
			return
		}
	}

	key := keyFromSub(sub)
	n, ok := c.leaf.smap[key]
	// Removal for a key we never tracked: nothing to do.
	if delta < 0 && !ok {
		return
	}

	// We will update if its a queue, if count is zero (or negative), or we were 0 and are N > 0.
	update := sub.queue != nil || (n <= 0 && n+delta > 0) || (n > 0 && n+delta <= 0)
	n += delta
	if n > 0 {
		c.leaf.smap[key] = n
	} else {
		delete(c.leaf.smap, key)
	}
	if update {
		c.sendLeafNodeSubUpdate(key, n)
	}
}
2279

2280
// Used to force add subjects to the subject map.
2281
func (c *client) forceAddToSmap(subj string) {
4✔
2282
        c.mu.Lock()
4✔
2283
        defer c.mu.Unlock()
4✔
2284

4✔
2285
        if c.leaf.smap == nil {
4✔
2286
                return
×
2287
        }
×
2288
        n := c.leaf.smap[subj]
4✔
2289
        if n != 0 {
5✔
2290
                return
1✔
2291
        }
1✔
2292
        // Place into the map since it was not there.
2293
        c.leaf.smap[subj] = 1
3✔
2294
        c.sendLeafNodeSubUpdate(subj, 1)
3✔
2295
}
2296

2297
// Used to force remove a subject from the subject map.
2298
func (c *client) forceRemoveFromSmap(subj string) {
1✔
2299
        c.mu.Lock()
1✔
2300
        defer c.mu.Unlock()
1✔
2301

1✔
2302
        if c.leaf.smap == nil {
1✔
2303
                return
×
2304
        }
×
2305
        n := c.leaf.smap[subj]
1✔
2306
        if n == 0 {
1✔
2307
                return
×
2308
        }
×
2309
        n--
1✔
2310
        if n == 0 {
2✔
2311
                // Remove is now zero
1✔
2312
                delete(c.leaf.smap, subj)
1✔
2313
                c.sendLeafNodeSubUpdate(subj, 0)
1✔
2314
        } else {
1✔
2315
                c.leaf.smap[subj] = n
×
2316
        }
×
2317
}
2318

2319
// Send the subscription interest change to the other side.
2320
// Lock should be held.
2321
func (c *client) sendLeafNodeSubUpdate(key string, n int32) {
9,077✔
2322
        // If we are a spoke, we need to check if we are allowed to send this subscription over to the hub.
9,077✔
2323
        if c.isSpokeLeafNode() {
11,236✔
2324
                checkPerms := true
2,159✔
2325
                if len(key) > 0 && (key[0] == '$' || key[0] == '_') {
3,489✔
2326
                        if strings.HasPrefix(key, leafNodeLoopDetectionSubjectPrefix) ||
1,330✔
2327
                                strings.HasPrefix(key, oldGWReplyPrefix) ||
1,330✔
2328
                                strings.HasPrefix(key, gwReplyPrefix) {
1,411✔
2329
                                checkPerms = false
81✔
2330
                        }
81✔
2331
                }
2332
                if checkPerms {
4,237✔
2333
                        var subject string
2,078✔
2334
                        if sep := strings.IndexByte(key, ' '); sep != -1 {
2,489✔
2335
                                subject = key[:sep]
411✔
2336
                        } else {
2,078✔
2337
                                subject = key
1,667✔
2338
                        }
1,667✔
2339
                        if !c.canSubscribe(subject) {
2,078✔
2340
                                return
×
2341
                        }
×
2342
                }
2343
        }
2344
        // If we are here we can send over to the other side.
2345
        _b := [64]byte{}
9,077✔
2346
        b := bytes.NewBuffer(_b[:0])
9,077✔
2347
        c.writeLeafSub(b, key, n)
9,077✔
2348
        c.enqueueProto(b.Bytes())
9,077✔
2349
}
2350

2351
// Helper function to build the key.
2352
func keyFromSub(sub *subscription) string {
45,547✔
2353
        var sb strings.Builder
45,547✔
2354
        sb.Grow(len(sub.subject) + len(sub.queue) + 1)
45,547✔
2355
        sb.Write(sub.subject)
45,547✔
2356
        if sub.queue != nil {
49,005✔
2357
                // Just make the key subject spc group, e.g. 'foo bar'
3,458✔
2358
                sb.WriteByte(' ')
3,458✔
2359
                sb.Write(sub.queue)
3,458✔
2360
        }
3,458✔
2361
        return sb.String()
45,547✔
2362
}
2363

2364
const (
	// Prefixes used by keyFromSubWithOrigin to distinguish plain routed
	// subscriptions ("R") from routed subscriptions made on behalf of a
	// leafnode ("L"). Byte variants are for single-byte writes.
	keyRoutedSub         = "R"
	keyRoutedSubByte     = 'R'
	keyRoutedLeafSub     = "L"
	keyRoutedLeafSubByte = 'L'
)
2370

2371
// Helper function to build the key that prevents collisions between normal
2372
// routed subscriptions and routed subscriptions on behalf of a leafnode.
2373
// Keys will look like this:
2374
// "R foo"          -> plain routed sub on "foo"
2375
// "R foo bar"      -> queue routed sub on "foo", queue "bar"
2376
// "L foo bar"      -> plain routed leaf sub on "foo", leaf "bar"
2377
// "L foo bar baz"  -> queue routed sub on "foo", queue "bar", leaf "baz"
2378
func keyFromSubWithOrigin(sub *subscription) string {
619,233✔
2379
        var sb strings.Builder
619,233✔
2380
        sb.Grow(2 + len(sub.origin) + 1 + len(sub.subject) + 1 + len(sub.queue))
619,233✔
2381
        leaf := len(sub.origin) > 0
619,233✔
2382
        if leaf {
635,860✔
2383
                sb.WriteByte(keyRoutedLeafSubByte)
16,627✔
2384
        } else {
619,233✔
2385
                sb.WriteByte(keyRoutedSubByte)
602,606✔
2386
        }
602,606✔
2387
        sb.WriteByte(' ')
619,233✔
2388
        sb.Write(sub.subject)
619,233✔
2389
        if sub.queue != nil {
644,570✔
2390
                sb.WriteByte(' ')
25,337✔
2391
                sb.Write(sub.queue)
25,337✔
2392
        }
25,337✔
2393
        if leaf {
635,860✔
2394
                sb.WriteByte(' ')
16,627✔
2395
                sb.Write(sub.origin)
16,627✔
2396
        }
16,627✔
2397
        return sb.String()
619,233✔
2398
}
2399

2400
// Lock should be held.
2401
func (c *client) writeLeafSub(w *bytes.Buffer, key string, n int32) {
34,957✔
2402
        if key == _EMPTY_ {
34,957✔
2403
                return
×
2404
        }
×
2405
        if n > 0 {
66,490✔
2406
                w.WriteString("LS+ " + key)
31,533✔
2407
                // Check for queue semantics, if found write n.
31,533✔
2408
                if strings.Contains(key, " ") {
33,675✔
2409
                        w.WriteString(" ")
2,142✔
2410
                        var b [12]byte
2,142✔
2411
                        var i = len(b)
2,142✔
2412
                        for l := n; l > 0; l /= 10 {
5,180✔
2413
                                i--
3,038✔
2414
                                b[i] = digits[l%10]
3,038✔
2415
                        }
3,038✔
2416
                        w.Write(b[i:])
2,142✔
2417
                        if c.trace {
2,142✔
2418
                                arg := fmt.Sprintf("%s %d", key, n)
×
2419
                                c.traceOutOp("LS+", []byte(arg))
×
2420
                        }
×
2421
                } else if c.trace {
29,587✔
2422
                        c.traceOutOp("LS+", []byte(key))
196✔
2423
                }
196✔
2424
        } else {
3,424✔
2425
                w.WriteString("LS- " + key)
3,424✔
2426
                if c.trace {
3,438✔
2427
                        c.traceOutOp("LS-", []byte(key))
14✔
2428
                }
14✔
2429
        }
2430
        w.WriteString(CR_LF)
34,957✔
2431
}
2432

2433
// processLeafSub will process an inbound sub request for the remote leaf node.
2434
func (c *client) processLeafSub(argo []byte) (err error) {
31,354✔
2435
        // Indicate activity.
31,354✔
2436
        c.in.subs++
31,354✔
2437

31,354✔
2438
        srv := c.srv
31,354✔
2439
        if srv == nil {
31,354✔
2440
                return nil
×
2441
        }
×
2442

2443
        // Copy so we do not reference a potentially large buffer
2444
        arg := make([]byte, len(argo))
31,354✔
2445
        copy(arg, argo)
31,354✔
2446

31,354✔
2447
        args := splitArg(arg)
31,354✔
2448
        sub := &subscription{client: c}
31,354✔
2449

31,354✔
2450
        delta := int32(1)
31,354✔
2451
        switch len(args) {
31,354✔
2452
        case 1:
29,260✔
2453
                sub.queue = nil
29,260✔
2454
        case 3:
2,094✔
2455
                sub.queue = args[1]
2,094✔
2456
                sub.qw = int32(parseSize(args[2]))
2,094✔
2457
                // TODO: (ik) We should have a non empty queue name and a queue
2,094✔
2458
                // weight >= 1. For 2.11, we may want to return an error if that
2,094✔
2459
                // is not the case, but for now just overwrite `delta` if queue
2,094✔
2460
                // weight is greater than 1 (it is possible after a reconnect/
2,094✔
2461
                // server restart to receive a queue weight > 1 for a new sub).
2,094✔
2462
                if sub.qw > 1 {
3,692✔
2463
                        delta = sub.qw
1,598✔
2464
                }
1,598✔
2465
        default:
×
2466
                return fmt.Errorf("processLeafSub Parse Error: '%s'", arg)
×
2467
        }
2468
        sub.subject = args[0]
31,354✔
2469

31,354✔
2470
        c.mu.Lock()
31,354✔
2471
        if c.isClosed() {
31,385✔
2472
                c.mu.Unlock()
31✔
2473
                return nil
31✔
2474
        }
31✔
2475

2476
        acc := c.acc
31,323✔
2477
        // Check if we have a loop.
31,323✔
2478
        ldsPrefix := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))
31,323✔
2479

31,323✔
2480
        if ldsPrefix && bytesToString(sub.subject) == acc.getLDSubject() {
31,330✔
2481
                c.mu.Unlock()
7✔
2482
                c.handleLeafNodeLoop(true)
7✔
2483
                return nil
7✔
2484
        }
7✔
2485

2486
        // Check permissions if applicable. (but exclude the $LDS, $GR and _GR_)
2487
        checkPerms := true
31,316✔
2488
        if sub.subject[0] == '$' || sub.subject[0] == '_' {
59,987✔
2489
                if ldsPrefix ||
28,671✔
2490
                        bytes.HasPrefix(sub.subject, []byte(oldGWReplyPrefix)) ||
28,671✔
2491
                        bytes.HasPrefix(sub.subject, []byte(gwReplyPrefix)) {
30,573✔
2492
                        checkPerms = false
1,902✔
2493
                }
1,902✔
2494
        }
2495

2496
        // If we are a hub check that we can publish to this subject.
2497
        if checkPerms {
60,730✔
2498
                subj := string(sub.subject)
29,414✔
2499
                if subjectIsLiteral(subj) && !c.pubAllowedFullCheck(subj, true, true) {
29,705✔
2500
                        c.mu.Unlock()
291✔
2501
                        c.leafSubPermViolation(sub.subject)
291✔
2502
                        c.Debugf(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject))
291✔
2503
                        return nil
291✔
2504
                }
291✔
2505
        }
2506

2507
        // Check if we have a maximum on the number of subscriptions.
2508
        if c.subsAtLimit() {
31,033✔
2509
                c.mu.Unlock()
8✔
2510
                c.maxSubsExceeded()
8✔
2511
                return nil
8✔
2512
        }
8✔
2513

2514
        // If we have an origin cluster associated mark that in the sub.
2515
        if rc := c.remoteCluster(); rc != _EMPTY_ {
59,030✔
2516
                sub.origin = []byte(rc)
28,013✔
2517
        }
28,013✔
2518

2519
        // Like Routes, we store local subs by account and subject and optionally queue name.
2520
        // If we have a queue it will have a trailing weight which we do not want.
2521
        if sub.queue != nil {
32,833✔
2522
                sub.sid = arg[:len(arg)-len(args[2])-1]
1,816✔
2523
        } else {
31,017✔
2524
                sub.sid = arg
29,201✔
2525
        }
29,201✔
2526
        key := bytesToString(sub.sid)
31,017✔
2527
        osub := c.subs[key]
31,017✔
2528
        if osub == nil {
60,580✔
2529
                c.subs[key] = sub
29,563✔
2530
                // Now place into the account sl.
29,563✔
2531
                if err := acc.sl.Insert(sub); err != nil {
29,563✔
2532
                        delete(c.subs, key)
×
2533
                        c.mu.Unlock()
×
2534
                        c.Errorf("Could not insert subscription: %v", err)
×
2535
                        c.sendErr("Invalid Subscription")
×
2536
                        return nil
×
2537
                }
×
2538
        } else if sub.queue != nil {
2,907✔
2539
                // For a queue we need to update the weight.
1,453✔
2540
                delta = sub.qw - atomic.LoadInt32(&osub.qw)
1,453✔
2541
                atomic.StoreInt32(&osub.qw, sub.qw)
1,453✔
2542
                acc.sl.UpdateRemoteQSub(osub)
1,453✔
2543
        }
1,453✔
2544
        spoke := c.isSpokeLeafNode()
31,017✔
2545
        c.mu.Unlock()
31,017✔
2546

31,017✔
2547
        // Only add in shadow subs if a new sub or qsub.
31,017✔
2548
        if osub == nil {
60,580✔
2549
                if err := c.addShadowSubscriptions(acc, sub, true); err != nil {
29,563✔
2550
                        c.Errorf(err.Error())
×
2551
                }
×
2552
        }
2553

2554
        // If we are not solicited, treat leaf node subscriptions similar to a
2555
        // client subscription, meaning we forward them to routes, gateways and
2556
        // other leaf nodes as needed.
2557
        if !spoke {
41,886✔
2558
                // If we are routing add to the route map for the associated account.
10,869✔
2559
                srv.updateRouteSubscriptionMap(acc, sub, delta)
10,869✔
2560
                if srv.gateway.enabled {
12,305✔
2561
                        srv.gatewayUpdateSubInterest(acc.Name, sub, delta)
1,436✔
2562
                }
1,436✔
2563
        }
2564
        // Now check on leafnode updates for other leaf nodes. We understand solicited
2565
        // and non-solicited state in this call so we will do the right thing.
2566
        acc.updateLeafNodes(sub, delta)
31,017✔
2567

31,017✔
2568
        return nil
31,017✔
2569
}
2570

2571
// If the leafnode is a solicited, set the connect delay based on default
2572
// or private option (for tests). Sends the error to the other side, log and
2573
// close the connection.
2574
func (c *client) handleLeafNodeLoop(sendErr bool) {
17✔
2575
        accName, delay := c.setLeafConnectDelayIfSoliciting(leafNodeReconnectDelayAfterLoopDetected)
17✔
2576
        errTxt := fmt.Sprintf("Loop detected for leafnode account=%q. Delaying attempt to reconnect for %v", accName, delay)
17✔
2577
        if sendErr {
26✔
2578
                c.sendErr(errTxt)
9✔
2579
        }
9✔
2580

2581
        c.Errorf(errTxt)
17✔
2582
        // If we are here with "sendErr" false, it means that this is the server
17✔
2583
        // that received the error. The other side will have closed the connection,
17✔
2584
        // but does not hurt to close here too.
17✔
2585
        c.closeConnection(ProtocolViolation)
17✔
2586
}
2587

2588
// processLeafUnsub will process an inbound unsub request for the remote leaf node.
//
// The LS- argument is exactly the key under which the subscription was stored
// (subject, optionally followed by queue name). After removing the local
// subscription, the interest decrement is propagated to routes, gateways and
// other leaf nodes.
func (c *client) processLeafUnsub(arg []byte) error {
	// Indicate any activity, so pub and sub or unsubs.
	c.in.subs++

	acc := c.acc
	srv := c.srv

	c.mu.Lock()
	if c.isClosed() {
		c.mu.Unlock()
		return nil
	}

	spoke := c.isSpokeLeafNode()
	// We store local subs by account and subject and optionally queue name.
	// LS- will have the arg exactly as the key.
	sub, ok := c.subs[string(arg)]
	if !ok {
		// If not found, don't try to update routes/gws/leaf nodes.
		c.mu.Unlock()
		return nil
	}
	// For queue subscriptions, the propagated decrement is the full queue
	// weight, not just 1.
	delta := int32(1)
	if len(sub.queue) > 0 {
		delta = sub.qw
	}
	c.mu.Unlock()

	c.unsubscribe(acc, sub, true, true)
	if !spoke {
		// If we are routing subtract from the route map for the associated account.
		srv.updateRouteSubscriptionMap(acc, sub, -delta)
		// Gateways
		if srv.gateway.enabled {
			srv.gatewayUpdateSubInterest(acc.Name, sub, -delta)
		}
	}
	// Now check on leafnode updates for other leaf nodes.
	acc.updateLeafNodes(sub, -delta)
	return nil
}
2630

2631
func (c *client) processLeafHeaderMsgArgs(arg []byte) error {
488✔
2632
        // Unroll splitArgs to avoid runtime/heap issues
488✔
2633
        a := [MAX_MSG_ARGS][]byte{}
488✔
2634
        args := a[:0]
488✔
2635
        start := -1
488✔
2636
        for i, b := range arg {
32,071✔
2637
                switch b {
31,583✔
2638
                case ' ', '\t', '\r', '\n':
1,394✔
2639
                        if start >= 0 {
2,788✔
2640
                                args = append(args, arg[start:i])
1,394✔
2641
                                start = -1
1,394✔
2642
                        }
1,394✔
2643
                default:
30,189✔
2644
                        if start < 0 {
32,071✔
2645
                                start = i
1,882✔
2646
                        }
1,882✔
2647
                }
2648
        }
2649
        if start >= 0 {
976✔
2650
                args = append(args, arg[start:])
488✔
2651
        }
488✔
2652

2653
        c.pa.arg = arg
488✔
2654
        switch len(args) {
488✔
2655
        case 0, 1, 2:
×
2656
                return fmt.Errorf("processLeafHeaderMsgArgs Parse Error: '%s'", args)
×
2657
        case 3:
88✔
2658
                c.pa.reply = nil
88✔
2659
                c.pa.queues = nil
88✔
2660
                c.pa.hdb = args[1]
88✔
2661
                c.pa.hdr = parseSize(args[1])
88✔
2662
                c.pa.szb = args[2]
88✔
2663
                c.pa.size = parseSize(args[2])
88✔
2664
        case 4:
386✔
2665
                c.pa.reply = args[1]
386✔
2666
                c.pa.queues = nil
386✔
2667
                c.pa.hdb = args[2]
386✔
2668
                c.pa.hdr = parseSize(args[2])
386✔
2669
                c.pa.szb = args[3]
386✔
2670
                c.pa.size = parseSize(args[3])
386✔
2671
        default:
14✔
2672
                // args[1] is our reply indicator. Should be + or | normally.
14✔
2673
                if len(args[1]) != 1 {
14✔
2674
                        return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
2675
                }
×
2676
                switch args[1][0] {
14✔
2677
                case '+':
4✔
2678
                        c.pa.reply = args[2]
4✔
2679
                case '|':
10✔
2680
                        c.pa.reply = nil
10✔
2681
                default:
×
2682
                        return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
2683
                }
2684
                // Grab header size.
2685
                c.pa.hdb = args[len(args)-2]
14✔
2686
                c.pa.hdr = parseSize(c.pa.hdb)
14✔
2687

14✔
2688
                // Grab size.
14✔
2689
                c.pa.szb = args[len(args)-1]
14✔
2690
                c.pa.size = parseSize(c.pa.szb)
14✔
2691

14✔
2692
                // Grab queue names.
14✔
2693
                if c.pa.reply != nil {
18✔
2694
                        c.pa.queues = args[3 : len(args)-2]
4✔
2695
                } else {
14✔
2696
                        c.pa.queues = args[2 : len(args)-2]
10✔
2697
                }
10✔
2698
        }
2699
        if c.pa.hdr < 0 {
488✔
2700
                return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Header Size: '%s'", arg)
×
2701
        }
×
2702
        if c.pa.size < 0 {
488✔
2703
                return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Size: '%s'", args)
×
2704
        }
×
2705
        if c.pa.hdr > c.pa.size {
488✔
2706
                return fmt.Errorf("processLeafHeaderMsgArgs Header Size larger then TotalSize: '%s'", arg)
×
2707
        }
×
2708

2709
        // Common ones processed after check for arg length
2710
        c.pa.subject = args[0]
488✔
2711

488✔
2712
        return nil
488✔
2713
}
2714

2715
// processLeafMsgArgs parses the arguments of an inbound leafnode message
// protocol line without headers (LMSG-style):
//
//	subject [reply-or-indicator] [queues...] total-size
//
// Results are stored in c.pa (subject, reply, queues, szb/size). An error is
// returned for malformed args or a negative size.
func (c *client) processLeafMsgArgs(arg []byte) error {
	// Unroll splitArgs to avoid runtime/heap issues
	a := [MAX_MSG_ARGS][]byte{}
	args := a[:0]
	start := -1
	for i, b := range arg {
		switch b {
		case ' ', '\t', '\r', '\n':
			if start >= 0 {
				args = append(args, arg[start:i])
				start = -1
			}
		default:
			if start < 0 {
				start = i
			}
		}
	}
	if start >= 0 {
		args = append(args, arg[start:])
	}

	c.pa.arg = arg
	switch len(args) {
	case 0, 1:
		return fmt.Errorf("processLeafMsgArgs Parse Error: '%s'", args)
	case 2:
		// subject size
		c.pa.reply = nil
		c.pa.queues = nil
		c.pa.szb = args[1]
		c.pa.size = parseSize(args[1])
	case 3:
		// subject reply size
		c.pa.reply = args[1]
		c.pa.queues = nil
		c.pa.szb = args[2]
		c.pa.size = parseSize(args[2])
	default:
		// args[1] is our reply indicator. Should be + or | normally.
		if len(args[1]) != 1 {
			return fmt.Errorf("processLeafMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
		}
		switch args[1][0] {
		case '+':
			c.pa.reply = args[2]
		case '|':
			c.pa.reply = nil
		default:
			return fmt.Errorf("processLeafMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
		}
		// Grab size.
		c.pa.szb = args[len(args)-1]
		c.pa.size = parseSize(c.pa.szb)

		// Grab queue names.
		if c.pa.reply != nil {
			c.pa.queues = args[3 : len(args)-1]
		} else {
			c.pa.queues = args[2 : len(args)-1]
		}
	}
	if c.pa.size < 0 {
		return fmt.Errorf("processLeafMsgArgs Bad or Missing Size: '%s'", args)
	}

	// Common ones processed after check for arg length
	c.pa.subject = args[0]

	return nil
}
2784

2785
// processInboundLeafMsg is called to process an inbound msg from a leaf node.
//
// It consults (and maintains) the client's per-connection L1 cache of sublist
// match results keyed by subject, delivers the message to matching local
// subscribers via processMsgResults, and then forwards to gateways if enabled.
func (c *client) processInboundLeafMsg(msg []byte) {
	// Update statistics
	// The msg includes the CR_LF, so pull back out for accounting.
	c.in.msgs++
	c.in.bytes += int32(len(msg) - LEN_CR_LF)

	srv, acc, subject := c.srv, c.acc, string(c.pa.subject)

	// Mostly under testing scenarios.
	if srv == nil || acc == nil {
		return
	}

	// Match the subscriptions. We will use our own L1 map if
	// it's still valid, avoiding contention on the shared sublist.
	var r *SublistResult
	var ok bool

	// genid changes whenever the account sublist changes; a mismatch
	// invalidates our entire L1 cache.
	genid := atomic.LoadUint64(&c.acc.sl.genid)
	if genid == c.in.genid && c.in.results != nil {
		r, ok = c.in.results[subject]
	} else {
		// Reset our L1 completely.
		c.in.results = make(map[string]*SublistResult)
		c.in.genid = genid
	}

	// Go back to the sublist data structure.
	if !ok {
		r = c.acc.sl.Match(subject)
		// Prune the results cache. Keeps us from unbounded growth. Random delete.
		if len(c.in.results) >= maxResultCacheSize {
			n := 0
			for subj := range c.in.results {
				delete(c.in.results, subj)
				if n++; n > pruneSize {
					break
				}
			}
		}
		// Then add the new cache entry.
		c.in.results[subject] = r
	}

	// Collect queue names if needed.
	var qnames [][]byte

	// Check for no interest, short circuit if so.
	// This is the fanout scale.
	if len(r.psubs)+len(r.qsubs) > 0 {
		flag := pmrNoFlag
		// If we have queue subs in this cluster, then if we run in gateway
		// mode and the remote gateways have queue subs, then we need to
		// collect the queue groups this message was sent to so that we
		// exclude them when sending to gateways.
		if len(r.qsubs) > 0 && c.srv.gateway.enabled &&
			atomic.LoadInt64(&c.srv.gateway.totalQSubs) > 0 {
			flag |= pmrCollectQueueNames
		}
		// If this is a mapped subject that means the mapped interest
		// is what got us here, but this might not have a queue designation
		// If that is the case, make sure we ignore to process local queue subscribers.
		if len(c.pa.mapped) > 0 && len(c.pa.queues) == 0 {
			flag |= pmrIgnoreEmptyQueueFilter
		}
		_, qnames = c.processMsgResults(acc, r, msg, nil, c.pa.subject, c.pa.reply, flag)
	}

	// Now deal with gateways
	if c.srv.gateway.enabled {
		c.sendMsgToGateways(acc, msg, c.pa.subject, c.pa.reply, qnames)
	}
}
2859

2860
// Handles a subscription permission violation.
2861
// See leafPermViolation() for details.
2862
func (c *client) leafSubPermViolation(subj []byte) {
291✔
2863
        c.leafPermViolation(false, subj)
291✔
2864
}
291✔
2865

2866
// Common function to process publish or subscribe leafnode permission violation.
2867
// Sends the permission violation error to the remote, logs it and closes the connection.
2868
// If this is from a server soliciting, the reconnection will be delayed.
2869
func (c *client) leafPermViolation(pub bool, subj []byte) {
291✔
2870
        if c.isSpokeLeafNode() {
582✔
2871
                // For spokes these are no-ops since the hub server told us our permissions.
291✔
2872
                // We just need to not send these over to the other side since we will get cutoff.
291✔
2873
                return
291✔
2874
        }
291✔
2875
        // FIXME(dlc) ?
2876
        c.setLeafConnectDelayIfSoliciting(leafNodeReconnectAfterPermViolation)
×
2877
        var action string
×
2878
        if pub {
×
2879
                c.sendErr(fmt.Sprintf("Permissions Violation for Publish to %q", subj))
×
2880
                action = "Publish"
×
2881
        } else {
×
2882
                c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q", subj))
×
2883
                action = "Subscription"
×
2884
        }
×
2885
        c.Errorf("%s Violation on %q - Check other side configuration", action, subj)
×
2886
        // TODO: add a new close reason that is more appropriate?
×
2887
        c.closeConnection(ProtocolViolation)
×
2888
}
2889

2890
// Invoked from generic processErr() for LEAF connections.
2891
func (c *client) leafProcessErr(errStr string) {
29✔
2892
        // Check if we got a cluster name collision.
29✔
2893
        if strings.Contains(errStr, ErrLeafNodeHasSameClusterName.Error()) {
33✔
2894
                _, delay := c.setLeafConnectDelayIfSoliciting(leafNodeReconnectDelayAfterClusterNameSame)
4✔
2895
                c.Errorf("Leafnode connection dropped with same cluster name error. Delaying attempt to reconnect for %v", delay)
4✔
2896
                return
4✔
2897
        }
4✔
2898

2899
        // We will look for Loop detected error coming from the other side.
2900
        // If we solicit, set the connect delay.
2901
        if !strings.Contains(errStr, "Loop detected") {
42✔
2902
                return
17✔
2903
        }
17✔
2904
        c.handleLeafNodeLoop(false)
8✔
2905
}
2906

2907
// If this leaf connection solicits, sets the connect delay to the given value,
2908
// or the one from the server option's LeafNode.connDelay if one is set (for tests).
2909
// Returns the connection's account name and delay.
2910
func (c *client) setLeafConnectDelayIfSoliciting(delay time.Duration) (string, time.Duration) {
21✔
2911
        c.mu.Lock()
21✔
2912
        if c.isSolicitedLeafNode() {
33✔
2913
                if s := c.srv; s != nil {
24✔
2914
                        if srvdelay := s.getOpts().LeafNode.connDelay; srvdelay != 0 {
17✔
2915
                                delay = srvdelay
5✔
2916
                        }
5✔
2917
                }
2918
                c.leaf.remote.setConnectDelay(delay)
12✔
2919
        }
2920
        accName := c.acc.Name
21✔
2921
        c.mu.Unlock()
21✔
2922
        return accName, delay
21✔
2923
}
2924

2925
// For the given remote Leafnode configuration, this function returns
2926
// if TLS is required, and if so, will return a clone of the TLS Config
2927
// (since some fields will be changed during handshake), the TLS server
2928
// name that is remembered, and the TLS timeout.
2929
func (c *client) leafNodeGetTLSConfigForSolicit(remote *leafNodeCfg) (bool, *tls.Config, string, float64) {
1,776✔
2930
        var (
1,776✔
2931
                tlsConfig  *tls.Config
1,776✔
2932
                tlsName    string
1,776✔
2933
                tlsTimeout float64
1,776✔
2934
        )
1,776✔
2935

1,776✔
2936
        remote.RLock()
1,776✔
2937
        defer remote.RUnlock()
1,776✔
2938

1,776✔
2939
        tlsRequired := remote.TLS || remote.TLSConfig != nil
1,776✔
2940
        if tlsRequired {
1,862✔
2941
                if remote.TLSConfig != nil {
138✔
2942
                        tlsConfig = remote.TLSConfig.Clone()
52✔
2943
                } else {
86✔
2944
                        tlsConfig = &tls.Config{MinVersion: tls.VersionTLS12}
34✔
2945
                }
34✔
2946
                tlsName = remote.tlsName
86✔
2947
                tlsTimeout = remote.TLSTimeout
86✔
2948
                if tlsTimeout == 0 {
137✔
2949
                        tlsTimeout = float64(TLS_TIMEOUT / time.Second)
51✔
2950
                }
51✔
2951
        }
2952

2953
        return tlsRequired, tlsConfig, tlsName, tlsTimeout
1,776✔
2954
}
2955

2956
// Initiates the LeafNode Websocket connection by:
// - doing the TLS handshake if needed
// - sending the HTTP request
// - waiting for the HTTP response
//
// Since some bufio reader is used to consume the HTTP response, this function
// returns the slice of buffered bytes (if any) so that the readLoop that will
// be started after that consume those first before reading from the socket.
// On failure, the returned ClosedState indicates the failure phase
// (WriteError/ReadError, or 0 when the connection was already closed).
//
// Lock held on entry.
func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remote *leafNodeCfg) ([]byte, ClosedState, error) {
	remote.RLock()
	compress := remote.Websocket.Compression
	// By default the server will mask outbound frames, but it can be disabled with this option.
	noMasking := remote.Websocket.NoMasking
	infoTimeout := remote.FirstInfoTimeout
	remote.RUnlock()
	// Will do the client-side TLS handshake if needed.
	tlsRequired, err := c.leafClientHandshakeIfNeeded(remote, opts)
	if err != nil {
		// 0 will indicate that the connection was already closed
		return nil, 0, err
	}

	// For http request, we need the passed URL to contain either http or https scheme.
	scheme := "http"
	if tlsRequired {
		scheme = "https"
	}
	// We will use the `/leafnode` path to tell the accepting WS server that it should
	// create a LEAF connection, not a CLIENT.
	// In case we use the user's URL path in the future, make sure we append the user's
	// path to our `/leafnode` path.
	lpath := leafNodeWSPath
	if curPath := rURL.EscapedPath(); curPath != _EMPTY_ {
		if curPath[0] == '/' {
			curPath = curPath[1:]
		}
		lpath = path.Join(curPath, lpath)
	} else {
		lpath = lpath[1:]
	}
	ustr := fmt.Sprintf("%s://%s/%s", scheme, rURL.Host, lpath)
	u, _ := url.Parse(ustr)
	// Hand-build the upgrade request so we control the exact headers sent.
	req := &http.Request{
		Method:     "GET",
		URL:        u,
		Proto:      "HTTP/1.1",
		ProtoMajor: 1,
		ProtoMinor: 1,
		Header:     make(http.Header),
		Host:       u.Host,
	}
	wsKey, err := wsMakeChallengeKey()
	if err != nil {
		return nil, WriteError, err
	}

	req.Header["Upgrade"] = []string{"websocket"}
	req.Header["Connection"] = []string{"Upgrade"}
	req.Header["Sec-WebSocket-Key"] = []string{wsKey}
	req.Header["Sec-WebSocket-Version"] = []string{"13"}
	if compress {
		req.Header.Add("Sec-WebSocket-Extensions", wsPMCReqHeaderValue)
	}
	if noMasking {
		req.Header.Add(wsNoMaskingHeader, wsNoMaskingValue)
	}
	// Bound the whole request/response exchange by the first-INFO timeout.
	c.nc.SetDeadline(time.Now().Add(infoTimeout))
	if err := req.Write(c.nc); err != nil {
		return nil, WriteError, err
	}

	var resp *http.Response

	br := bufio.NewReaderSize(c.nc, MAX_CONTROL_LINE_SIZE)
	resp, err = http.ReadResponse(br, req)
	// Validate the 101 Switching Protocols response and the accept key.
	if err == nil &&
		(resp.StatusCode != 101 ||
			!strings.EqualFold(resp.Header.Get("Upgrade"), "websocket") ||
			!strings.EqualFold(resp.Header.Get("Connection"), "upgrade") ||
			resp.Header.Get("Sec-Websocket-Accept") != wsAcceptKey(wsKey)) {

		err = fmt.Errorf("invalid websocket connection")
	}
	// Check compression extension...
	if err == nil && c.ws.compress {
		// Check that not only permessage-deflate extension is present, but that
		// we also have server and client no context take over.
		srvCompress, noCtxTakeover := wsPMCExtensionSupport(resp.Header, false)

		// If server does not support compression, then simply disable it in our side.
		if !srvCompress {
			c.ws.compress = false
		} else if !noCtxTakeover {
			err = fmt.Errorf("compression negotiation error")
		}
	}
	// Same for no masking...
	if err == nil && noMasking {
		// Check if server accepts no masking
		if resp.Header.Get(wsNoMaskingHeader) != wsNoMaskingValue {
			// Nope, need to mask our writes as any client would do.
			c.ws.maskwrite = true
		}
	}
	if resp != nil {
		resp.Body.Close()
	}
	if err != nil {
		return nil, ReadError, err
	}
	c.Debugf("Leafnode compression=%v masking=%v", c.ws.compress, c.ws.maskwrite)

	var preBuf []byte
	// We have to slurp whatever is in the bufio reader and pass that to the readloop.
	if n := br.Buffered(); n != 0 {
		preBuf, _ = br.Peek(n)
	}
	return preBuf, 0, nil
}
3078

3079
// connectProcessTimeout bounds the time between leafNodeResumeConnectProcess
// and leafNodeFinishConnectProcess; see the timer set in the former.
const connectProcessTimeout = 2 * time.Second
3080

3081
// This is invoked for remote LEAF remote connections after processing the INFO
// protocol. It sends the CONNECT protocol, starts the write loop, and arms a
// timer that closes the connection as stale if leafNodeFinishConnectProcess
// does not run within connectProcessTimeout.
func (s *Server) leafNodeResumeConnectProcess(c *client) {
	clusterName := s.ClusterName()

	c.mu.Lock()
	if c.isClosed() {
		c.mu.Unlock()
		return
	}
	if err := c.sendLeafConnect(clusterName, c.headers); err != nil {
		c.mu.Unlock()
		c.closeConnection(WriteError)
		return
	}

	// Spin up the write loop.
	s.startGoRoutine(func() { c.writeLoop() })

	// timeout leafNodeFinishConnectProcess
	c.ping.tmr = time.AfterFunc(connectProcessTimeout, func() {
		c.mu.Lock()
		// check if leafNodeFinishConnectProcess was called and prevent later leafNodeFinishConnectProcess
		// (the connectProcessFinished flag is claimed by whichever side runs first).
		if !c.flags.setIfNotSet(connectProcessFinished) {
			c.mu.Unlock()
			return
		}
		clearTimer(&c.ping.tmr)
		closed := c.isClosed()
		c.mu.Unlock()
		if !closed {
			c.sendErrAndDebug("Stale Leaf Node Connection - Closing")
			c.closeConnection(StaleConnection)
		}
	})
	c.mu.Unlock()
	c.Debugf("Remote leafnode connect msg sent")
}
3119

3120
// This is invoked for remote LEAF connections after processing the INFO
// protocol and leafNodeResumeConnectProcess.
// This will send LS+ the CONNECT protocol and register the leaf node.
// It races with the stale-connection timer armed in
// leafNodeResumeConnectProcess; the connectProcessFinished flag ensures only
// one of the two proceeds.
func (s *Server) leafNodeFinishConnectProcess(c *client) {
	c.mu.Lock()
	// If the timeout callback already claimed the flag, it is closing the
	// connection; nothing more to do here.
	if !c.flags.setIfNotSet(connectProcessFinished) {
		c.mu.Unlock()
		return
	}
	if c.isClosed() {
		c.mu.Unlock()
		s.removeLeafNodeConnection(c)
		return
	}
	remote := c.leaf.remote
	// Check if we will need to send the system connect event.
	remote.RLock()
	sendSysConnectEvent := remote.Hub
	remote.RUnlock()

	// Capture account before releasing lock
	acc := c.acc
	// cancel connectProcessTimeout
	clearTimer(&c.ping.tmr)
	c.mu.Unlock()

	// Make sure we register with the account here.
	if err := c.registerWithAccount(acc); err != nil {
		if err == ErrTooManyAccountConnections {
			c.maxAccountConnExceeded()
			return
		} else if err == ErrLeafNodeLoop {
			c.handleLeafNodeLoop(true)
			return
		}
		c.Errorf("Registering leaf with account %s resulted in error: %v", acc.Name, err)
		c.closeConnection(ProtocolViolation)
		return
	}
	s.addLeafNodeConnection(c, _EMPTY_, _EMPTY_, false)
	s.initLeafNodeSmapAndSendSubs(c)
	if sendSysConnectEvent {
		s.sendLeafNodeConnect(acc)
	}

	// The above functions are not atomically under the client
	// lock doing those operations. It is possible - since we
	// have started the read/write loops - that the connection
	// is closed before or in between. This would leave the
	// closed LN connection possible registered with the account
	// and/or the server's leafs map. So check if connection
	// is closed, and if so, manually cleanup.
	c.mu.Lock()
	closed := c.isClosed()
	if !closed {
		c.setFirstPingTimer()
	}
	c.mu.Unlock()
	if closed {
		s.removeLeafNodeConnection(c)
		// If this was the account's last client, decrement the active
		// accounts count as well.
		if prev := acc.removeClient(c); prev == 1 {
			s.decActiveAccounts()
		}
	}
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc