• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 12740792186

10 Jan 2025 10:21PM UTC coverage: 85.519% (-0.002%) from 85.521%
12740792186

push

github

web-flow
Test: expected a fully formed cluster error (#6364)

Cluster tests would sometimes fail with the following error message:
```
jetstream_helpers_test.go:774: Expected a fully formed cluster, only 0 of 3 peers seen
```

This was due to a logic error in the tests, where it was assumed that
once a meta leader is chosen it will remain the same. That's not always
true as the meta leader can still shift around. So, keep fetching the
latest leader and check the peers on that.

Previously it would fail as follows: a meta leader was selected, the meta
leader then changed, and the old leader (now a follower) was still used to
request the peers.
But because of the following lines in `JetStreamClusterPeers()`:
```go
	cc := js.cluster
	if !cc.isLeader() || cc.meta == nil {
		return nil
	}
```
You'd get no peers at all, leading to that error message.

Signed-off-by: Maurice van Veen <github@mauricevanveen.com>

68599 of 80215 relevant lines covered (85.52%)

915532.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.53
/server/leafnode.go
1
// Copyright 2019-2024 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bufio"
18
        "bytes"
19
        "crypto/tls"
20
        "encoding/base64"
21
        "encoding/json"
22
        "fmt"
23
        "io"
24
        "math/rand"
25
        "net"
26
        "net/http"
27
        "net/url"
28
        "os"
29
        "path"
30
        "reflect"
31
        "regexp"
32
        "runtime"
33
        "strconv"
34
        "strings"
35
        "sync"
36
        "sync/atomic"
37
        "time"
38

39
        "github.com/klauspost/compress/s2"
40
        "github.com/nats-io/jwt/v2"
41
        "github.com/nats-io/nkeys"
42
        "github.com/nats-io/nuid"
43
)
44

45
const (
	// leafNodeReconnectDelayAfterLoopDetected is how long a solicited
	// connection waits before reconnecting once a loop has been detected.
	leafNodeReconnectDelayAfterLoopDetected = 30 * time.Second

	// leafNodeReconnectAfterPermViolation is how long the server refrains
	// from reconnecting after the connection was closed because a received
	// message caused a permission violation.
	leafNodeReconnectAfterPermViolation = 30 * time.Second

	// leafNodeReconnectDelayAfterClusterNameSame is the reconnect delay used
	// when we have the same cluster name as the hub.
	leafNodeReconnectDelayAfterClusterNameSame = 30 * time.Second

	// leafNodeWaitBeforeClose is how long the server waits, after receiving a
	// CONNECT, before closing the connection when the required minimum
	// version is not met.
	leafNodeWaitBeforeClose = 5 * time.Second

	// leafnodeTLSInsecureWarning is the warning used when the user configures
	// leafnode TLS as insecure.
	leafnodeTLSInsecureWarning = "TLS certificate chain and hostname of solicited leafnodes will not be verified. DO NOT USE IN PRODUCTION!"

	// leafNodeLoopDetectionSubjectPrefix is the prefix of the loop detection subject.
	leafNodeLoopDetectionSubjectPrefix = "$LDS."

	// leafNodeWSPath is added to the URL so that a websocket server can tell
	// that the connection is a LEAF connection as opposed to a CLIENT.
	leafNodeWSPath = "/leafnode"
)
70

71
type leaf struct {
72
        // We have any auth stuff here for solicited connections.
73
        remote *leafNodeCfg
74
        // isSpoke tells us what role we are playing.
75
        // Used when we receive a connection but otherside tells us they are a hub.
76
        isSpoke bool
77
        // remoteCluster is when we are a hub but the spoke leafnode is part of a cluster.
78
        remoteCluster string
79
        // remoteServer holds onto the remove server's name or ID.
80
        remoteServer string
81
        // domain name of remote server
82
        remoteDomain string
83
        // account name of remote server
84
        remoteAccName string
85
        // Used to suppress sub and unsub interest. Same as routes but our audience
86
        // here is tied to this leaf node. This will hold all subscriptions except this
87
        // leaf nodes. This represents all the interest we want to send to the other side.
88
        smap map[string]int32
89
        // This map will contain all the subscriptions that have been added to the smap
90
        // during initLeafNodeSmapAndSendSubs. It is short lived and is there to avoid
91
        // race between processing of a sub where sub is added to account sublist but
92
        // updateSmap has not be called on that "thread", while in the LN readloop,
93
        // when processing CONNECT, initLeafNodeSmapAndSendSubs is invoked and add
94
        // this subscription to smap. When processing of the sub then calls updateSmap,
95
        // we would add it a second time in the smap causing later unsub to suppress the LS-.
96
        tsub  map[*subscription]struct{}
97
        tsubt *time.Timer
98
        // Selected compression mode, which may be different from the server configured mode.
99
        compression string
100
        // This is for GW map replies.
101
        gwSub *subscription
102
}
103

104
// Used for remote (solicited) leafnodes.
105
type leafNodeCfg struct {
106
        sync.RWMutex
107
        *RemoteLeafOpts
108
        urls           []*url.URL
109
        curURL         *url.URL
110
        tlsName        string
111
        username       string
112
        password       string
113
        perms          *Permissions
114
        connDelay      time.Duration // Delay before a connect, could be used while detecting loop condition, etc..
115
        jsMigrateTimer *time.Timer
116
}
117

118
// Check to see if this is a solicited leafnode. We do special processing for solicited.
119
func (c *client) isSolicitedLeafNode() bool {
1,903✔
120
        return c.kind == LEAF && c.leaf.remote != nil
1,903✔
121
}
1,903✔
122

123
// Returns true if this is a solicited leafnode and is not configured to be treated as a hub or a receiving
124
// connection leafnode where the otherside has declared itself to be the hub.
125
func (c *client) isSpokeLeafNode() bool {
15,799,510✔
126
        return c.kind == LEAF && c.leaf.isSpoke
15,799,510✔
127
}
15,799,510✔
128

129
func (c *client) isHubLeafNode() bool {
16,869✔
130
        return c.kind == LEAF && !c.leaf.isSpoke
16,869✔
131
}
16,869✔
132

133
// This will spin up go routines to solicit the remote leaf node connections.
134
func (s *Server) solicitLeafNodeRemotes(remotes []*RemoteLeafOpts) {
521✔
135
        sysAccName := _EMPTY_
521✔
136
        sAcc := s.SystemAccount()
521✔
137
        if sAcc != nil {
1,019✔
138
                sysAccName = sAcc.Name
498✔
139
        }
498✔
140
        addRemote := func(r *RemoteLeafOpts, isSysAccRemote bool) *leafNodeCfg {
1,173✔
141
                s.mu.Lock()
652✔
142
                remote := newLeafNodeCfg(r)
652✔
143
                creds := remote.Credentials
652✔
144
                accName := remote.LocalAccount
652✔
145
                s.leafRemoteCfgs = append(s.leafRemoteCfgs, remote)
652✔
146
                // Print notice if
652✔
147
                if isSysAccRemote {
741✔
148
                        if len(remote.DenyExports) > 0 {
90✔
149
                                s.Noticef("Remote for System Account uses restricted export permissions")
1✔
150
                        }
1✔
151
                        if len(remote.DenyImports) > 0 {
90✔
152
                                s.Noticef("Remote for System Account uses restricted import permissions")
1✔
153
                        }
1✔
154
                }
155
                s.mu.Unlock()
652✔
156
                if creds != _EMPTY_ {
695✔
157
                        contents, err := os.ReadFile(creds)
43✔
158
                        defer wipeSlice(contents)
43✔
159
                        if err != nil {
43✔
160
                                s.Errorf("Error reading LeafNode Remote Credentials file %q: %v", creds, err)
×
161
                        } else if items := credsRe.FindAllSubmatch(contents, -1); len(items) < 2 {
43✔
162
                                s.Errorf("LeafNode Remote Credentials file %q malformed", creds)
×
163
                        } else if _, err := nkeys.FromSeed(items[1][1]); err != nil {
43✔
164
                                s.Errorf("LeafNode Remote Credentials file %q has malformed seed", creds)
×
165
                        } else if uc, err := jwt.DecodeUserClaims(string(items[0][1])); err != nil {
43✔
166
                                s.Errorf("LeafNode Remote Credentials file %q has malformed user jwt", creds)
×
167
                        } else if isSysAccRemote {
47✔
168
                                if !uc.Permissions.Pub.Empty() || !uc.Permissions.Sub.Empty() || uc.Permissions.Resp != nil {
5✔
169
                                        s.Noticef("LeafNode Remote for System Account uses credentials file %q with restricted permissions", creds)
1✔
170
                                }
1✔
171
                        } else {
39✔
172
                                if !uc.Permissions.Pub.Empty() || !uc.Permissions.Sub.Empty() || uc.Permissions.Resp != nil {
45✔
173
                                        s.Noticef("LeafNode Remote for Account %s uses credentials file %q with restricted permissions", accName, creds)
6✔
174
                                }
6✔
175
                        }
176
                }
177
                return remote
652✔
178
        }
179
        for _, r := range remotes {
1,173✔
180
                remote := addRemote(r, r.LocalAccount == sysAccName)
652✔
181
                s.startGoRoutine(func() { s.connectToRemoteLeafNode(remote, true) })
1,304✔
182
        }
183
}
184

185
func (s *Server) remoteLeafNodeStillValid(remote *leafNodeCfg) bool {
1,712✔
186
        for _, ri := range s.getOpts().LeafNode.Remotes {
3,782✔
187
                // FIXME(dlc) - What about auth changes?
2,070✔
188
                if reflect.DeepEqual(ri.URLs, remote.URLs) {
3,782✔
189
                        return true
1,712✔
190
                }
1,712✔
191
        }
192
        return false
×
193
}
194

195
// Ensure that leafnode is properly configured.
196
func validateLeafNode(o *Options) error {
6,483✔
197
        if err := validateLeafNodeAuthOptions(o); err != nil {
6,485✔
198
                return err
2✔
199
        }
2✔
200

201
        // Users can bind to any local account, if its empty we will assume the $G account.
202
        for _, r := range o.LeafNode.Remotes {
7,157✔
203
                if r.LocalAccount == _EMPTY_ {
1,092✔
204
                        r.LocalAccount = globalAccountName
416✔
205
                }
416✔
206
        }
207

208
        // In local config mode, check that leafnode configuration refers to accounts that exist.
209
        if len(o.TrustedOperators) == 0 {
12,664✔
210
                accNames := map[string]struct{}{}
6,183✔
211
                for _, a := range o.Accounts {
11,296✔
212
                        accNames[a.Name] = struct{}{}
5,113✔
213
                }
5,113✔
214
                // global account is always created
215
                accNames[DEFAULT_GLOBAL_ACCOUNT] = struct{}{}
6,183✔
216
                // in the context of leaf nodes, empty account means global account
6,183✔
217
                accNames[_EMPTY_] = struct{}{}
6,183✔
218
                // system account either exists or, if not disabled, will be created
6,183✔
219
                if o.SystemAccount == _EMPTY_ && !o.NoSystemAccount {
10,766✔
220
                        accNames[DEFAULT_SYSTEM_ACCOUNT] = struct{}{}
4,583✔
221
                }
4,583✔
222
                checkAccountExists := func(accName string, cfgType string) error {
13,047✔
223
                        if _, ok := accNames[accName]; !ok {
6,866✔
224
                                return fmt.Errorf("cannot find local account %q specified in leafnode %s", accName, cfgType)
2✔
225
                        }
2✔
226
                        return nil
6,862✔
227
                }
228
                if err := checkAccountExists(o.LeafNode.Account, "authorization"); err != nil {
6,184✔
229
                        return err
1✔
230
                }
1✔
231
                for _, lu := range o.LeafNode.Users {
6,191✔
232
                        if lu.Account == nil { // means global account
12✔
233
                                continue
3✔
234
                        }
235
                        if err := checkAccountExists(lu.Account.Name, "authorization"); err != nil {
6✔
236
                                return err
×
237
                        }
×
238
                }
239
                for _, r := range o.LeafNode.Remotes {
6,857✔
240
                        if err := checkAccountExists(r.LocalAccount, "remote"); err != nil {
676✔
241
                                return err
1✔
242
                        }
1✔
243
                }
244
        } else {
298✔
245
                if len(o.LeafNode.Users) != 0 {
299✔
246
                        return fmt.Errorf("operator mode does not allow specifying users in leafnode config")
1✔
247
                }
1✔
248
                for _, r := range o.LeafNode.Remotes {
298✔
249
                        if !nkeys.IsValidPublicAccountKey(r.LocalAccount) {
2✔
250
                                return fmt.Errorf(
1✔
251
                                        "operator mode requires account nkeys in remotes. " +
1✔
252
                                                "Please add an `account` key to each remote in your `leafnodes` section, to assign it to an account. " +
1✔
253
                                                "Each account value should be a 56 character public key, starting with the letter 'A'")
1✔
254
                        }
1✔
255
                }
256
                if o.LeafNode.Port != 0 && o.LeafNode.Account != "" && !nkeys.IsValidPublicAccountKey(o.LeafNode.Account) {
297✔
257
                        return fmt.Errorf("operator mode and non account nkeys are incompatible")
1✔
258
                }
1✔
259
        }
260

261
        // Validate compression settings
262
        if o.LeafNode.Compression.Mode != _EMPTY_ {
9,775✔
263
                if err := validateAndNormalizeCompressionOption(&o.LeafNode.Compression, CompressionS2Auto); err != nil {
3,304✔
264
                        return err
5✔
265
                }
5✔
266
        }
267

268
        // If a remote has a websocket scheme, all need to have it.
269
        for _, rcfg := range o.LeafNode.Remotes {
7,145✔
270
                if len(rcfg.URLs) >= 2 {
872✔
271
                        firstIsWS, ok := isWSURL(rcfg.URLs[0]), true
198✔
272
                        for i := 1; i < len(rcfg.URLs); i++ {
641✔
273
                                u := rcfg.URLs[i]
443✔
274
                                if isWS := isWSURL(u); isWS && !firstIsWS || !isWS && firstIsWS {
450✔
275
                                        ok = false
7✔
276
                                        break
7✔
277
                                }
278
                        }
279
                        if !ok {
205✔
280
                                return fmt.Errorf("remote leaf node configuration cannot have a mix of websocket and non-websocket urls: %q", redactURLList(rcfg.URLs))
7✔
281
                        }
7✔
282
                }
283
                // Validate compression settings
284
                if rcfg.Compression.Mode != _EMPTY_ {
1,334✔
285
                        if err := validateAndNormalizeCompressionOption(&rcfg.Compression, CompressionS2Auto); err != nil {
672✔
286
                                return err
5✔
287
                        }
5✔
288
                }
289
        }
290

291
        if o.LeafNode.Port == 0 {
10,220✔
292
                return nil
3,761✔
293
        }
3,761✔
294

295
        // If MinVersion is defined, check that it is valid.
296
        if mv := o.LeafNode.MinVersion; mv != _EMPTY_ {
2,702✔
297
                if err := checkLeafMinVersionConfig(mv); err != nil {
6✔
298
                        return err
2✔
299
                }
2✔
300
        }
301

302
        // The checks below will be done only when detecting that we are configured
303
        // with gateways. So if an option validation needs to be done regardless,
304
        // it MUST be done before this point!
305

306
        if o.Gateway.Name == _EMPTY_ && o.Gateway.Port == 0 {
4,730✔
307
                return nil
2,034✔
308
        }
2,034✔
309
        // If we are here we have both leaf nodes and gateways defined, make sure there
310
        // is a system account defined.
311
        if o.SystemAccount == _EMPTY_ {
663✔
312
                return fmt.Errorf("leaf nodes and gateways (both being defined) require a system account to also be configured")
1✔
313
        }
1✔
314
        if err := validatePinnedCerts(o.LeafNode.TLSPinnedCerts); err != nil {
661✔
315
                return fmt.Errorf("leafnode: %v", err)
×
316
        }
×
317
        return nil
661✔
318
}
319

320
func checkLeafMinVersionConfig(mv string) error {
8✔
321
        if ok, err := versionAtLeastCheckError(mv, 2, 8, 0); !ok || err != nil {
12✔
322
                if err != nil {
6✔
323
                        return fmt.Errorf("invalid leafnode's minimum version: %v", err)
2✔
324
                } else {
4✔
325
                        return fmt.Errorf("the minimum version should be at least 2.8.0")
2✔
326
                }
2✔
327
        }
328
        return nil
4✔
329
}
330

331
// Used to validate user names in LeafNode configuration.
332
// - rejects mix of single and multiple users.
333
// - rejects duplicate user names.
334
func validateLeafNodeAuthOptions(o *Options) error {
6,524✔
335
        if len(o.LeafNode.Users) == 0 {
13,029✔
336
                return nil
6,505✔
337
        }
6,505✔
338
        if o.LeafNode.Username != _EMPTY_ {
21✔
339
                return fmt.Errorf("can not have a single user/pass and a users array")
2✔
340
        }
2✔
341
        if o.LeafNode.Nkey != _EMPTY_ {
17✔
342
                return fmt.Errorf("can not have a single nkey and a users array")
×
343
        }
×
344
        users := map[string]struct{}{}
17✔
345
        for _, u := range o.LeafNode.Users {
40✔
346
                if _, exists := users[u.Username]; exists {
25✔
347
                        return fmt.Errorf("duplicate user %q detected in leafnode authorization", u.Username)
2✔
348
                }
2✔
349
                users[u.Username] = struct{}{}
21✔
350
        }
351
        return nil
15✔
352
}
353

354
// Update remote LeafNode TLS configurations after a config reload.
355
func (s *Server) updateRemoteLeafNodesTLSConfig(opts *Options) {
10✔
356
        max := len(opts.LeafNode.Remotes)
10✔
357
        if max == 0 {
10✔
358
                return
×
359
        }
×
360

361
        s.mu.RLock()
10✔
362
        defer s.mu.RUnlock()
10✔
363

10✔
364
        // Changes in the list of remote leaf nodes is not supported.
10✔
365
        // However, make sure that we don't go over the arrays.
10✔
366
        if len(s.leafRemoteCfgs) < max {
10✔
367
                max = len(s.leafRemoteCfgs)
×
368
        }
×
369
        for i := 0; i < max; i++ {
20✔
370
                ro := opts.LeafNode.Remotes[i]
10✔
371
                cfg := s.leafRemoteCfgs[i]
10✔
372
                if ro.TLSConfig != nil {
12✔
373
                        cfg.Lock()
2✔
374
                        cfg.TLSConfig = ro.TLSConfig.Clone()
2✔
375
                        cfg.TLSHandshakeFirst = ro.TLSHandshakeFirst
2✔
376
                        cfg.Unlock()
2✔
377
                }
2✔
378
        }
379
}
380

381
func (s *Server) reConnectToRemoteLeafNode(remote *leafNodeCfg) {
210✔
382
        delay := s.getOpts().LeafNode.ReconnectInterval
210✔
383
        select {
210✔
384
        case <-time.After(delay):
165✔
385
        case <-s.quitCh:
45✔
386
                s.grWG.Done()
45✔
387
                return
45✔
388
        }
389
        s.connectToRemoteLeafNode(remote, false)
165✔
390
}
391

392
// Creates a leafNodeCfg object that wraps the RemoteLeafOpts.
393
func newLeafNodeCfg(remote *RemoteLeafOpts) *leafNodeCfg {
652✔
394
        cfg := &leafNodeCfg{
652✔
395
                RemoteLeafOpts: remote,
652✔
396
                urls:           make([]*url.URL, 0, len(remote.URLs)),
652✔
397
        }
652✔
398
        if len(remote.DenyExports) > 0 || len(remote.DenyImports) > 0 {
660✔
399
                perms := &Permissions{}
8✔
400
                if len(remote.DenyExports) > 0 {
16✔
401
                        perms.Publish = &SubjectPermission{Deny: remote.DenyExports}
8✔
402
                }
8✔
403
                if len(remote.DenyImports) > 0 {
15✔
404
                        perms.Subscribe = &SubjectPermission{Deny: remote.DenyImports}
7✔
405
                }
7✔
406
                cfg.perms = perms
8✔
407
        }
408
        // Start with the one that is configured. We will add to this
409
        // array when receiving async leafnode INFOs.
410
        cfg.urls = append(cfg.urls, cfg.URLs...)
652✔
411
        // If allowed to randomize, do it on our copy of URLs
652✔
412
        if !remote.NoRandomize {
1,302✔
413
                rand.Shuffle(len(cfg.urls), func(i, j int) {
1,050✔
414
                        cfg.urls[i], cfg.urls[j] = cfg.urls[j], cfg.urls[i]
400✔
415
                })
400✔
416
        }
417
        // If we are TLS make sure we save off a proper servername if possible.
418
        // Do same for user/password since we may need them to connect to
419
        // a bare URL that we get from INFO protocol.
420
        for _, u := range cfg.urls {
1,734✔
421
                cfg.saveTLSHostname(u)
1,082✔
422
                cfg.saveUserPassword(u)
1,082✔
423
                // If the url(s) have the "wss://" scheme, and we don't have a TLS
1,082✔
424
                // config, mark that we should be using TLS anyway.
1,082✔
425
                if !cfg.TLS && isWSSURL(u) {
1,083✔
426
                        cfg.TLS = true
1✔
427
                }
1✔
428
        }
429
        return cfg
652✔
430
}
431

432
// Will pick an URL from the list of available URLs.
433
func (cfg *leafNodeCfg) pickNextURL() *url.URL {
996✔
434
        cfg.Lock()
996✔
435
        defer cfg.Unlock()
996✔
436
        // If the current URL is the first in the list and we have more than
996✔
437
        // one URL, then move that one to end of the list.
996✔
438
        if cfg.curURL != nil && len(cfg.urls) > 1 && urlsAreEqual(cfg.curURL, cfg.urls[0]) {
1,143✔
439
                first := cfg.urls[0]
147✔
440
                copy(cfg.urls, cfg.urls[1:])
147✔
441
                cfg.urls[len(cfg.urls)-1] = first
147✔
442
        }
147✔
443
        cfg.curURL = cfg.urls[0]
996✔
444
        return cfg.curURL
996✔
445
}
446

447
// Returns the current URL
448
func (cfg *leafNodeCfg) getCurrentURL() *url.URL {
74✔
449
        cfg.RLock()
74✔
450
        defer cfg.RUnlock()
74✔
451
        return cfg.curURL
74✔
452
}
74✔
453

454
// Returns how long the server should wait before attempting
455
// to solicit a remote leafnode connection.
456
func (cfg *leafNodeCfg) getConnectDelay() time.Duration {
817✔
457
        cfg.RLock()
817✔
458
        delay := cfg.connDelay
817✔
459
        cfg.RUnlock()
817✔
460
        return delay
817✔
461
}
817✔
462

463
// Sets the connect delay.
464
func (cfg *leafNodeCfg) setConnectDelay(delay time.Duration) {
148✔
465
        cfg.Lock()
148✔
466
        cfg.connDelay = delay
148✔
467
        cfg.Unlock()
148✔
468
}
148✔
469

470
// Ensure that non-exported options (used in tests) have
471
// been properly set.
472
func (s *Server) setLeafNodeNonExportedOptions() {
6,092✔
473
        opts := s.getOpts()
6,092✔
474
        s.leafNodeOpts.dialTimeout = opts.LeafNode.dialTimeout
6,092✔
475
        if s.leafNodeOpts.dialTimeout == 0 {
12,183✔
476
                // Use same timeouts as routes for now.
6,091✔
477
                s.leafNodeOpts.dialTimeout = DEFAULT_ROUTE_DIAL
6,091✔
478
        }
6,091✔
479
        s.leafNodeOpts.resolver = opts.LeafNode.resolver
6,092✔
480
        if s.leafNodeOpts.resolver == nil {
12,180✔
481
                s.leafNodeOpts.resolver = net.DefaultResolver
6,088✔
482
        }
6,088✔
483
}
484

485
// sharedSysAccDelay is how long the first connect of a remote bound to the
// system account is delayed when the server is not in standalone mode.
const sharedSysAccDelay = 250 * time.Millisecond
486

487
func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg, firstConnect bool) {
817✔
488
        defer s.grWG.Done()
817✔
489

817✔
490
        if remote == nil || len(remote.URLs) == 0 {
817✔
491
                s.Debugf("Empty remote leafnode definition, nothing to connect")
×
492
                return
×
493
        }
×
494

495
        opts := s.getOpts()
817✔
496
        reconnectDelay := opts.LeafNode.ReconnectInterval
817✔
497
        s.mu.RLock()
817✔
498
        dialTimeout := s.leafNodeOpts.dialTimeout
817✔
499
        resolver := s.leafNodeOpts.resolver
817✔
500
        var isSysAcc bool
817✔
501
        if s.eventsEnabled() {
1,600✔
502
                isSysAcc = remote.LocalAccount == s.sys.account.Name
783✔
503
        }
783✔
504
        jetstreamMigrateDelay := remote.JetStreamClusterMigrateDelay
817✔
505
        s.mu.RUnlock()
817✔
506

817✔
507
        // If we are sharing a system account and we are not standalone delay to gather some info prior.
817✔
508
        if firstConnect && isSysAcc && !s.standAloneMode() {
883✔
509
                s.Debugf("Will delay first leafnode connect to shared system account due to clustering")
66✔
510
                remote.setConnectDelay(sharedSysAccDelay)
66✔
511
        }
66✔
512

513
        if connDelay := remote.getConnectDelay(); connDelay > 0 {
894✔
514
                select {
77✔
515
                case <-time.After(connDelay):
69✔
516
                case <-s.quitCh:
8✔
517
                        return
8✔
518
                }
519
                remote.setConnectDelay(0)
69✔
520
        }
521

522
        var conn net.Conn
809✔
523

809✔
524
        const connErrFmt = "Error trying to connect as leafnode to remote server %q (attempt %v): %v"
809✔
525

809✔
526
        attempts := 0
809✔
527

809✔
528
        for s.isRunning() && s.remoteLeafNodeStillValid(remote) {
1,805✔
529
                rURL := remote.pickNextURL()
996✔
530
                url, err := s.getRandomIP(resolver, rURL.Host, nil)
996✔
531
                if err == nil {
1,985✔
532
                        var ipStr string
989✔
533
                        if url != rURL.Host {
1,059✔
534
                                ipStr = fmt.Sprintf(" (%s)", url)
70✔
535
                        }
70✔
536
                        // Some test may want to disable remotes from connecting
537
                        if s.isLeafConnectDisabled() {
1,127✔
538
                                s.Debugf("Will not attempt to connect to remote server on %q%s, leafnodes currently disabled", rURL.Host, ipStr)
138✔
539
                                err = ErrLeafNodeDisabled
138✔
540
                        } else {
989✔
541
                                s.Debugf("Trying to connect as leafnode to remote server on %q%s", rURL.Host, ipStr)
851✔
542
                                conn, err = natsDialTimeout("tcp", url, dialTimeout)
851✔
543
                        }
851✔
544
                }
545
                if err != nil {
1,276✔
546
                        jitter := time.Duration(rand.Int63n(int64(reconnectDelay)))
280✔
547
                        delay := reconnectDelay + jitter
280✔
548
                        attempts++
280✔
549
                        if s.shouldReportConnectErr(firstConnect, attempts) {
543✔
550
                                s.Errorf(connErrFmt, rURL.Host, attempts, err)
263✔
551
                        } else {
280✔
552
                                s.Debugf(connErrFmt, rURL.Host, attempts, err)
17✔
553
                        }
17✔
554
                        remote.Lock()
280✔
555
                        // if we are using a delay to start migrating assets, kick off a migrate timer.
280✔
556
                        if remote.jsMigrateTimer == nil && jetstreamMigrateDelay > 0 {
288✔
557
                                remote.jsMigrateTimer = time.AfterFunc(jetstreamMigrateDelay, func() {
16✔
558
                                        s.checkJetStreamMigrate(remote)
8✔
559
                                })
8✔
560
                        }
561
                        remote.Unlock()
280✔
562
                        select {
280✔
563
                        case <-s.quitCh:
88✔
564
                                remote.cancelMigrateTimer()
88✔
565
                                return
88✔
566
                        case <-time.After(delay):
192✔
567
                                // Check if we should migrate any JetStream assets immediately while this remote is down.
192✔
568
                                // This will be used if JetStreamClusterMigrateDelay was not set
192✔
569
                                if jetstreamMigrateDelay == 0 {
306✔
570
                                        s.checkJetStreamMigrate(remote)
114✔
571
                                }
114✔
572
                                continue
192✔
573
                        }
574
                }
575
                remote.cancelMigrateTimer()
716✔
576
                if !s.remoteLeafNodeStillValid(remote) {
716✔
577
                        conn.Close()
×
578
                        return
×
579
                }
×
580

581
                // We have a connection here to a remote server.
582
                // Go ahead and create our leaf node and return.
583
                s.createLeafNode(conn, rURL, remote, nil)
716✔
584

716✔
585
                // Clear any observer states if we had them.
716✔
586
                s.clearObserverState(remote)
716✔
587

716✔
588
                return
716✔
589
        }
590
}
591

592
func (cfg *leafNodeCfg) cancelMigrateTimer() {
804✔
593
        cfg.Lock()
804✔
594
        stopAndClearTimer(&cfg.jsMigrateTimer)
804✔
595
        cfg.Unlock()
804✔
596
}
804✔
597

598
// This will clear any observer state such that stream or consumer assets on this server can become leaders again.
599
func (s *Server) clearObserverState(remote *leafNodeCfg) {
716✔
600
        s.mu.RLock()
716✔
601
        accName := remote.LocalAccount
716✔
602
        s.mu.RUnlock()
716✔
603

716✔
604
        acc, err := s.LookupAccount(accName)
716✔
605
        if err != nil {
718✔
606
                s.Warnf("Error looking up account [%s] checking for JetStream clear observer state on a leafnode", accName)
2✔
607
                return
2✔
608
        }
2✔
609

610
        acc.jscmMu.Lock()
714✔
611
        defer acc.jscmMu.Unlock()
714✔
612

714✔
613
        // Walk all streams looking for any clustered stream, skip otherwise.
714✔
614
        for _, mset := range acc.streams() {
728✔
615
                node := mset.raftNode()
14✔
616
                if node == nil {
20✔
617
                        // Not R>1
6✔
618
                        continue
6✔
619
                }
620
                // Check consumers
621
                for _, o := range mset.getConsumers() {
10✔
622
                        if n := o.raftNode(); n != nil {
4✔
623
                                // Ensure we can become a leader again.
2✔
624
                                n.SetObserver(false)
2✔
625
                        }
2✔
626
                }
627
                // Ensure we can not become a leader again.
628
                node.SetObserver(false)
8✔
629
        }
630
}
631

632
// Check to see if we should migrate any assets from this account.
633
func (s *Server) checkJetStreamMigrate(remote *leafNodeCfg) {
122✔
634
        s.mu.RLock()
122✔
635
        accName, shouldMigrate := remote.LocalAccount, remote.JetStreamClusterMigrate
122✔
636
        s.mu.RUnlock()
122✔
637

122✔
638
        if !shouldMigrate {
176✔
639
                return
54✔
640
        }
54✔
641

642
        acc, err := s.LookupAccount(accName)
68✔
643
        if err != nil {
68✔
644
                s.Warnf("Error looking up account [%s] checking for JetStream migration on a leafnode", accName)
×
645
                return
×
646
        }
×
647

648
        acc.jscmMu.Lock()
68✔
649
        defer acc.jscmMu.Unlock()
68✔
650

68✔
651
        // Walk all streams looking for any clustered stream, skip otherwise.
68✔
652
        // If we are the leader force stepdown.
68✔
653
        for _, mset := range acc.streams() {
102✔
654
                node := mset.raftNode()
34✔
655
                if node == nil {
34✔
656
                        // Not R>1
×
657
                        continue
×
658
                }
659
                // Collect any consumers
660
                for _, o := range mset.getConsumers() {
58✔
661
                        if n := o.raftNode(); n != nil {
48✔
662
                                if n.Leader() {
26✔
663
                                        n.StepDown()
2✔
664
                                }
2✔
665
                                // Ensure we can not become a leader while in this state.
666
                                n.SetObserver(true)
24✔
667
                        }
668
                }
669
                // Stepdown if this stream was leader.
670
                if node.Leader() {
38✔
671
                        node.StepDown()
4✔
672
                }
4✔
673
                // Ensure we can not become a leader while in this state.
674
                node.SetObserver(true)
34✔
675
        }
676
}
677

678
// Helper for checking.
679
func (s *Server) isLeafConnectDisabled() bool {
989✔
680
        s.mu.RLock()
989✔
681
        defer s.mu.RUnlock()
989✔
682
        return s.leafDisableConnect
989✔
683
}
989✔
684

685
// Save off the tlsName for when we use TLS and mix hostnames and IPs. IPs usually
686
// come from the server we connect to.
687
//
688
// We used to save the name only if there was a TLSConfig or scheme equal to "tls".
689
// However, this was causing failures for users that did not set the scheme (and
690
// their remote connections did not have a tls{} block).
691
// We now save the host name regardless in case the remote returns an INFO indicating
692
// that TLS is required.
693
func (cfg *leafNodeCfg) saveTLSHostname(u *url.URL) {
1,669✔
694
        if cfg.tlsName == _EMPTY_ && net.ParseIP(u.Hostname()) == nil {
1,689✔
695
                cfg.tlsName = u.Hostname()
20✔
696
        }
20✔
697
}
698

699
// Save off the username/password for when we connect using a bare URL
700
// that we get from the INFO protocol.
701
func (cfg *leafNodeCfg) saveUserPassword(u *url.URL) {
1,082✔
702
        if cfg.username == _EMPTY_ && u.User != nil {
1,322✔
703
                cfg.username = u.User.Username()
240✔
704
                cfg.password, _ = u.User.Password()
240✔
705
        }
240✔
706
}
707

708
// This starts the leafnode accept loop in a go routine, unless it
// is detected that the server has already been shutdown.
//
// It listens on opts.LeafNode.Host and opts.LeafNode.Port. A configured port
// of -1 is treated as 0 (a random port), and the resolved port is then written
// back to the options snapshot. It also builds s.leafNodeInfo and its JSON
// form, stores the listener in s.leafNodeListener and spawns
// s.acceptConnections. Each accepted connection is handed to
// createLeafNode(conn, nil, nil, nil).
//
// Listen and advertise errors are reported through s.Fatalf. The server lock
// is held from just before the listen until the accept go routine has been
// started.
func (s *Server) startLeafNodeAcceptLoop() {
	// Snapshot server options.
	opts := s.getOpts()

	port := opts.LeafNode.Port
	if port == -1 {
		port = 0
	}

	if s.isShuttingDown() {
		return
	}

	s.mu.Lock()
	hp := net.JoinHostPort(opts.LeafNode.Host, strconv.Itoa(port))
	l, e := natsListen("tcp", hp)
	// Record the listen outcome (nil on success) under the server lock.
	s.leafNodeListenerErr = e
	if e != nil {
		s.mu.Unlock()
		s.Fatalf("Error listening on leafnode port: %d - %v", opts.LeafNode.Port, e)
		return
	}

	// Log the actual bound port (relevant when a random port was requested).
	s.Noticef("Listening for leafnode connections on %s",
		net.JoinHostPort(opts.LeafNode.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))

	tlsRequired := opts.LeafNode.TLSConfig != nil
	tlsVerify := tlsRequired && opts.LeafNode.TLSConfig.ClientAuth == tls.RequireAndVerifyClientCert
	// Do not set compression in this Info object, it would possibly cause
	// issues when sending asynchronous INFO to the remote.
	info := Info{
		ID:            s.info.ID,
		Name:          s.info.Name,
		Version:       s.info.Version,
		GitCommit:     gitCommit,
		GoVersion:     runtime.Version(),
		AuthRequired:  true,
		TLSRequired:   tlsRequired,
		TLSVerify:     tlsVerify,
		MaxPayload:    s.info.MaxPayload, // TODO(dlc) - Allow override?
		Headers:       s.supportsHeaders(),
		JetStream:     opts.JetStream,
		Domain:        opts.JetStreamDomain,
		Proto:         s.getServerProto(),
		InfoOnConnect: true,
	}
	// If we have selected a random port...
	if port == 0 {
		// Write resolved port back to options.
		opts.LeafNode.Port = l.Addr().(*net.TCPAddr).Port
	}

	s.leafNodeInfo = info
	// Possibly override Host/Port and set IP based on LeafNode.Advertise.
	if err := s.setLeafNodeInfoHostPortAndIP(); err != nil {
		s.Fatalf("Error setting leafnode INFO with LeafNode.Advertise value of %s, err=%v", opts.LeafNode.Advertise, err)
		l.Close()
		s.mu.Unlock()
		return
	}
	// Account for our own leafnode URL (leafNodeInfo.IP) in the URL map.
	// The map is indexed with ++, so it presumably keeps a per-URL count.
	s.leafURLsMap[s.leafNodeInfo.IP]++
	s.generateLeafNodeInfoJSON()

	// Setup state that can enable shutdown
	s.leafNodeListener = l

	// As of now, a server that does not have remotes configured would
	// never solicit a connection, so we should not have to warn if
	// InsecureSkipVerify is set in main LeafNodes config (since
	// this TLS setting matters only when soliciting a connection).
	// Still, warn if insecure is set in any of LeafNode block.
	// We need to check remotes, even if tls is not required on accept.
	warn := tlsRequired && opts.LeafNode.TLSConfig.InsecureSkipVerify
	if !warn {
		for _, r := range opts.LeafNode.Remotes {
			if r.TLSConfig != nil && r.TLSConfig.InsecureSkipVerify {
				warn = true
				break
			}
		}
	}
	if warn {
		s.Warnf(leafnodeTLSInsecureWarning)
	}
	// Accepted connections have no remote config, URL or websocket.
	go s.acceptConnections(l, "Leafnode", func(conn net.Conn) { s.createLeafNode(conn, nil, nil, nil) }, nil)
	s.mu.Unlock()
}
797

798
// credsRe matches the sections of a creds file (user JWT and Seed). Each
// section is a line of three or more dashes (with arbitrary text between the
// dash runs), the section content, and a closing dash line. Capture group 1
// holds the content, restricted to word characters, '-', '.' and '='.
// sendLeafConnect uses the first match as the user JWT and the second as the
// nkey seed.
var credsRe = regexp.MustCompile(`\s*(?:(?:[-]{3,}.*[-]{3,}\r?\n)([\w\-.=]+)(?:\r?\n[-]{3,}.*[-]{3,}(\r?\n|\z)))`)
800

801
// clusterName is provided as argument to avoid lock ordering issues with the locked client c
802
// Lock should be held entering here.
803
func (c *client) sendLeafConnect(clusterName string, headers bool) error {
606✔
804
        // We support basic user/pass and operator based user JWT with signatures.
606✔
805
        cinfo := leafConnectInfo{
606✔
806
                Version:       VERSION,
606✔
807
                ID:            c.srv.info.ID,
606✔
808
                Domain:        c.srv.info.Domain,
606✔
809
                Name:          c.srv.info.Name,
606✔
810
                Hub:           c.leaf.remote.Hub,
606✔
811
                Cluster:       clusterName,
606✔
812
                Headers:       headers,
606✔
813
                JetStream:     c.acc.jetStreamConfigured(),
606✔
814
                DenyPub:       c.leaf.remote.DenyImports,
606✔
815
                Compression:   c.leaf.compression,
606✔
816
                RemoteAccount: c.acc.GetName(),
606✔
817
                Proto:         c.srv.getServerProto(),
606✔
818
        }
606✔
819

606✔
820
        // If a signature callback is specified, this takes precedence over anything else.
606✔
821
        if cb := c.leaf.remote.SignatureCB; cb != nil {
609✔
822
                nonce := c.nonce
3✔
823
                c.mu.Unlock()
3✔
824
                jwt, sigraw, err := cb(nonce)
3✔
825
                c.mu.Lock()
3✔
826
                if err == nil && c.isClosed() {
4✔
827
                        err = ErrConnectionClosed
1✔
828
                }
1✔
829
                if err != nil {
5✔
830
                        c.Errorf("Error signing the nonce: %v", err)
2✔
831
                        return err
2✔
832
                }
2✔
833
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
1✔
834
                cinfo.JWT, cinfo.Sig = jwt, sig
1✔
835

836
        } else if creds := c.leaf.remote.Credentials; creds != _EMPTY_ {
648✔
837
                // Check for credentials first, that will take precedence..
45✔
838
                c.Debugf("Authenticating with credentials file %q", c.leaf.remote.Credentials)
45✔
839
                contents, err := os.ReadFile(creds)
45✔
840
                if err != nil {
45✔
841
                        c.Errorf("%v", err)
×
842
                        return err
×
843
                }
×
844
                defer wipeSlice(contents)
45✔
845
                items := credsRe.FindAllSubmatch(contents, -1)
45✔
846
                if len(items) < 2 {
45✔
847
                        c.Errorf("Credentials file malformed")
×
848
                        return err
×
849
                }
×
850
                // First result should be the user JWT.
851
                // We copy here so that the file containing the seed will be wiped appropriately.
852
                raw := items[0][1]
45✔
853
                tmp := make([]byte, len(raw))
45✔
854
                copy(tmp, raw)
45✔
855
                // Seed is second item.
45✔
856
                kp, err := nkeys.FromSeed(items[1][1])
45✔
857
                if err != nil {
45✔
858
                        c.Errorf("Credentials file has malformed seed")
×
859
                        return err
×
860
                }
×
861
                // Wipe our key on exit.
862
                defer kp.Wipe()
45✔
863

45✔
864
                sigraw, _ := kp.Sign(c.nonce)
45✔
865
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
45✔
866
                cinfo.JWT = bytesToString(tmp)
45✔
867
                cinfo.Sig = sig
45✔
868
        } else if nkey := c.leaf.remote.Nkey; nkey != _EMPTY_ {
560✔
869
                kp, err := nkeys.FromSeed([]byte(nkey))
2✔
870
                if err != nil {
2✔
871
                        c.Errorf("Remote nkey has malformed seed")
×
872
                        return err
×
873
                }
×
874
                // Wipe our key on exit.
875
                defer kp.Wipe()
2✔
876
                sigraw, _ := kp.Sign(c.nonce)
2✔
877
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
2✔
878
                pkey, _ := kp.PublicKey()
2✔
879
                cinfo.Nkey = pkey
2✔
880
                cinfo.Sig = sig
2✔
881
        } else if userInfo := c.leaf.remote.curURL.User; userInfo != nil {
817✔
882
                cinfo.User = userInfo.Username()
261✔
883
                cinfo.Pass, _ = userInfo.Password()
261✔
884
        } else if c.leaf.remote.username != _EMPTY_ {
558✔
885
                cinfo.User = c.leaf.remote.username
2✔
886
                cinfo.Pass = c.leaf.remote.password
2✔
887
        }
2✔
888
        b, err := json.Marshal(cinfo)
604✔
889
        if err != nil {
604✔
890
                c.Errorf("Error marshaling CONNECT to remote leafnode: %v\n", err)
×
891
                return err
×
892
        }
×
893
        // Although this call is made before the writeLoop is created,
894
        // we don't really need to send in place. The protocol will be
895
        // sent out by the writeLoop.
896
        c.enqueueProto([]byte(fmt.Sprintf(ConProto, b)))
604✔
897
        return nil
604✔
898
}
899

900
// Makes a deep copy of the LeafNode Info structure.
901
// The server lock is held on entry.
902
func (s *Server) copyLeafNodeInfo() *Info {
2,412✔
903
        clone := s.leafNodeInfo
2,412✔
904
        // Copy the array of urls.
2,412✔
905
        if len(s.leafNodeInfo.LeafNodeURLs) > 0 {
4,385✔
906
                clone.LeafNodeURLs = append([]string(nil), s.leafNodeInfo.LeafNodeURLs...)
1,973✔
907
        }
1,973✔
908
        return &clone
2,412✔
909
}
910

911
// Adds a LeafNode URL that we get when a route connects to the Info structure.
912
// Regenerates the JSON byte array so that it can be sent to LeafNode connections.
913
// Returns a boolean indicating if the URL was added or not.
914
// Server lock is held on entry
915
func (s *Server) addLeafNodeURL(urlStr string) bool {
5,255✔
916
        if s.leafURLsMap.addUrl(urlStr) {
10,505✔
917
                s.generateLeafNodeInfoJSON()
5,250✔
918
                return true
5,250✔
919
        }
5,250✔
920
        return false
5✔
921
}
922

923
// Removes a LeafNode URL of the route that is disconnecting from the Info structure.
924
// Regenerates the JSON byte array so that it can be sent to LeafNode connections.
925
// Returns a boolean indicating if the URL was removed or not.
926
// Server lock is held on entry.
927
func (s *Server) removeLeafNodeURL(urlStr string) bool {
5,255✔
928
        // Don't need to do this if we are removing the route connection because
5,255✔
929
        // we are shuting down...
5,255✔
930
        if s.isShuttingDown() {
8,035✔
931
                return false
2,780✔
932
        }
2,780✔
933
        if s.leafURLsMap.removeUrl(urlStr) {
4,946✔
934
                s.generateLeafNodeInfoJSON()
2,471✔
935
                return true
2,471✔
936
        }
2,471✔
937
        return false
4✔
938
}
939

940
// Server lock is held on entry
941
func (s *Server) generateLeafNodeInfoJSON() {
10,387✔
942
        s.leafNodeInfo.Cluster = s.cachedClusterName()
10,387✔
943
        s.leafNodeInfo.LeafNodeURLs = s.leafURLsMap.getAsStringSlice()
10,387✔
944
        s.leafNodeInfo.WSConnectURLs = s.websocket.connectURLsMap.getAsStringSlice()
10,387✔
945
        s.leafNodeInfoJSON = generateInfoJSON(&s.leafNodeInfo)
10,387✔
946
}
10,387✔
947

948
// Sends an async INFO protocol so that the connected servers can update
949
// their list of LeafNode urls.
950
func (s *Server) sendAsyncLeafNodeInfo() {
7,721✔
951
        for _, c := range s.leafs {
7,815✔
952
                c.mu.Lock()
94✔
953
                c.enqueueProto(s.leafNodeInfoJSON)
94✔
954
                c.mu.Unlock()
94✔
955
        }
94✔
956
}
957

958
// createLeafNode is called when an inbound leafnode connection is accepted or
// when we create one for a solicited leafnode.
//
// conn is the established network connection. For solicited connections,
// remote is the remote configuration and rURL the URL used to connect; the
// accept loop passes nil for both. ws is non-nil only for connections
// accepted through the websocket port.
//
// For solicited connections, the INFO from the other side is processed later
// by the readLoop, and the writeLoop is started only after that. For accepted
// connections, our INFO is sent from here, the server-side TLS handshake is
// performed if required, an auth (or ping) timer is armed, and both the
// readLoop and writeLoop are started.
//
// Returns the new client, or nil if setup failed. Some failure paths
// (missing account, write error, server shutdown, some websocket errors) call
// closeConnection here. The TLS handshake failure paths return nil without
// doing so; presumably the handshake helpers close the connection themselves
// — confirm.
func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCfg, ws *websocket) *client {
	// Snapshot server options.
	opts := s.getOpts()

	maxPay := int32(opts.MaxPayload)
	maxSubs := int32(opts.MaxSubs)
	// For system, maxSubs of 0 means unlimited, so re-adjust here.
	if maxSubs == 0 {
		maxSubs = -1
	}
	now := time.Now().UTC()

	c := &client{srv: s, nc: conn, kind: LEAF, opts: defaultOpts, mpay: maxPay, msubs: maxSubs, start: now, last: now}
	// Do not update the smap here, we need to do it in initLeafNodeSmapAndSendSubs
	c.leaf = &leaf{}

	// For accepted LN connections, ws will be != nil if it was accepted
	// through the Websocket port.
	c.ws = ws

	// For remote, check if the scheme starts with "ws", if so, we will initiate
	// a remote Leaf Node connection as a websocket connection.
	if remote != nil && rURL != nil && isWSURL(rURL) {
		remote.RLock()
		c.ws = &websocket{compress: remote.Websocket.Compression, maskwrite: !remote.Websocket.NoMasking}
		remote.RUnlock()
	}

	// Determines if we are soliciting the connection or not.
	var solicited bool
	var acc *Account
	var remoteSuffix string
	if remote != nil {
		// For now, if lookup fails, we will constantly try
		// to recreate this LN connection.
		lacc := remote.LocalAccount
		var err error
		acc, err = s.LookupAccount(lacc)
		if err != nil {
			// An account not existing is something that can happen with nats/http account resolver and the account
			// has not yet been pushed, or the request failed for other reasons.
			// remote needs to be set or retry won't happen
			c.leaf.remote = remote
			c.closeConnection(MissingAccount)
			s.Errorf("Unable to lookup account %s for solicited leafnode connection: %v", lacc, err)
			return nil
		}
		remoteSuffix = fmt.Sprintf(" for account: %s", acc.traceLabel())
	}

	c.mu.Lock()
	c.initClient()
	c.Noticef("Leafnode connection created%s %s", remoteSuffix, c.opts.Name)

	var (
		tlsFirst         bool
		tlsFirstFallback time.Duration
		infoTimeout      time.Duration
	)
	if remote != nil {
		solicited = true
		remote.Lock()
		c.leaf.remote = remote
		c.setPermissions(remote.perms)
		if !c.leaf.remote.Hub {
			c.leaf.isSpoke = true
		}
		tlsFirst = remote.TLSHandshakeFirst
		infoTimeout = remote.FirstInfoTimeout
		remote.Unlock()
		c.acc = acc
	} else {
		// Accepted connections must start with a CONNECT from the other side.
		c.flags.set(expectConnect)
		if ws != nil {
			c.Debugf("Leafnode compression=%v", c.ws.compress)
		}
		tlsFirst = opts.LeafNode.TLSHandshakeFirst
		if f := opts.LeafNode.TLSHandshakeFirstFallback; f > 0 {
			tlsFirstFallback = f
		}
	}
	c.mu.Unlock()

	var nonce [nonceLen]byte
	var info *Info

	// Grab this before the client lock below.
	if !solicited {
		// Grab server variables
		s.mu.Lock()
		info = s.copyLeafNodeInfo()
		// For tests that want to simulate old servers, do not set the compression
		// on the INFO protocol if configured with CompressionNotSupported.
		if cm := opts.LeafNode.Compression.Mode; cm != CompressionNotSupported {
			info.Compression = cm
		}
		s.generateNonce(nonce[:])
		s.mu.Unlock()
	}

	// Grab lock
	c.mu.Lock()

	var preBuf []byte
	if solicited {
		// For websocket connection, we need to send an HTTP request,
		// and get the response before starting the readLoop to get
		// the INFO, etc..
		if c.isWebsocket() {
			var err error
			var closeReason ClosedState

			preBuf, closeReason, err = c.leafNodeSolicitWSConnection(opts, rURL, remote)
			if err != nil {
				c.Errorf("Error soliciting websocket connection: %v", err)
				c.mu.Unlock()
				// A zero closeReason means no close is requested here.
				if closeReason != 0 {
					c.closeConnection(closeReason)
				}
				return nil
			}
		} else {
			// If configured to do TLS handshake first
			if tlsFirst {
				if _, err := c.leafClientHandshakeIfNeeded(remote, opts); err != nil {
					c.mu.Unlock()
					return nil
				}
			}
			// We need to wait for the info, but not for too long.
			c.nc.SetReadDeadline(time.Now().Add(infoTimeout))
		}

		// We will process the INFO from the readloop and finish by
		// sending the CONNECT and finish registration later.
	} else {
		// Send our info to the other side.
		// Remember the nonce we sent here for signatures, etc.
		c.nonce = make([]byte, nonceLen)
		copy(c.nonce, nonce[:])
		info.Nonce = bytesToString(c.nonce)
		info.CID = c.cid
		proto := generateInfoJSON(info)

		var pre []byte
		// We need first to check for "TLS First" fallback delay.
		if tlsFirstFallback > 0 {
			// We wait and see if we are getting any data. Since we did not send
			// the INFO protocol yet, only clients that use TLS first should be
			// sending data (the TLS handshake). We don't really check the content:
			// if it is a rogue agent and not an actual client performing the
			// TLS handshake, the error will be detected when performing the
			// handshake on our side.
			pre = make([]byte, 4)
			c.nc.SetReadDeadline(time.Now().Add(tlsFirstFallback))
			n, _ := io.ReadFull(c.nc, pre[:])
			c.nc.SetReadDeadline(time.Time{})
			// If we get any data (regardless of possible timeout), we will proceed
			// with the TLS handshake.
			if n > 0 {
				pre = pre[:n]
			} else {
				// We did not get anything so we will send the INFO protocol.
				pre = nil
				// Set the boolean to false for the rest of the function.
				tlsFirst = false
			}
		}

		if !tlsFirst {
			// We have to send from this go routine because we may
			// have to block for TLS handshake before we start our
			// writeLoop go routine. The other side needs to receive
			// this before it can initiate the TLS handshake..
			c.sendProtoNow(proto)

			// The above call could have marked the connection as closed (due to TCP error).
			if c.isClosed() {
				c.mu.Unlock()
				c.closeConnection(WriteError)
				return nil
			}
		}

		// Check to see if we need to spin up TLS.
		if !c.isWebsocket() && info.TLSRequired {
			// If we have a prebuffer create a multi-reader.
			// (Bytes already consumed by the fallback probe must be replayed
			// to the TLS handshake.)
			if len(pre) > 0 {
				c.nc = &tlsMixConn{c.nc, bytes.NewBuffer(pre)}
			}
			// Perform server-side TLS handshake.
			if err := c.doTLSServerHandshake(tlsHandshakeLeaf, opts.LeafNode.TLSConfig, opts.LeafNode.TLSTimeout, opts.LeafNode.TLSPinnedCerts); err != nil {
				c.mu.Unlock()
				return nil
			}
		}

		// If the user wants the TLS handshake to occur first, now that it is
		// done, send the INFO protocol.
		if tlsFirst {
			c.flags.set(didTLSFirst)
			c.sendProtoNow(proto)
			if c.isClosed() {
				c.mu.Unlock()
				c.closeConnection(WriteError)
				return nil
			}
		}

		// Leaf nodes will always require a CONNECT to let us know
		// when we are properly bound to an account.
		//
		// If compression is configured, we can't set the authTimer here because
		// it would cause the parser to fail any incoming protocol that is not a
		// CONNECT (and we need to exchange INFO protocols for compression
		// negotiation). So instead, use the ping timer until we are done with
		// negotiation and can set the auth timer.
		timeout := secondsToDuration(opts.LeafNode.AuthTimeout)
		if needsCompression(opts.LeafNode.Compression.Mode) {
			c.ping.tmr = time.AfterFunc(timeout, func() {
				c.authTimeout()
			})
		} else {
			c.setAuthTimer(timeout)
		}
	}

	// Keep track in case server is shutdown before we can successfully register.
	if !s.addToTempClients(c.cid, c) {
		c.mu.Unlock()
		c.setNoReconnect()
		c.closeConnection(ServerShutdown)
		return nil
	}

	// Spin up the read loop.
	// preBuf is only set for solicited websocket connections.
	s.startGoRoutine(func() { c.readLoop(preBuf) })

	// We will spin the write loop for solicited connections only
	// when processing the INFO and after switching to TLS if needed.
	if !solicited {
		s.startGoRoutine(func() { c.writeLoop() })
	}

	c.mu.Unlock()

	return c
}
1207

1208
// Will perform the client-side TLS handshake if needed. Assumes that this
1209
// is called by the solicit side (remote will be non nil). Returns `true`
1210
// if TLS is required, `false` otherwise.
1211
// Lock held on entry.
1212
func (c *client) leafClientHandshakeIfNeeded(remote *leafNodeCfg, opts *Options) (bool, error) {
1,722✔
1213
        // Check if TLS is required and gather TLS config variables.
1,722✔
1214
        tlsRequired, tlsConfig, tlsName, tlsTimeout := c.leafNodeGetTLSConfigForSolicit(remote)
1,722✔
1215
        if !tlsRequired {
3,370✔
1216
                return false, nil
1,648✔
1217
        }
1,648✔
1218

1219
        // If TLS required, peform handshake.
1220
        // Get the URL that was used to connect to the remote server.
1221
        rURL := remote.getCurrentURL()
74✔
1222

74✔
1223
        // Perform the client-side TLS handshake.
74✔
1224
        if resetTLSName, err := c.doTLSClientHandshake(tlsHandshakeLeaf, rURL, tlsConfig, tlsName, tlsTimeout, opts.LeafNode.TLSPinnedCerts); err != nil {
107✔
1225
                // Check if we need to reset the remote's TLS name.
33✔
1226
                if resetTLSName {
33✔
1227
                        remote.Lock()
×
1228
                        remote.tlsName = _EMPTY_
×
1229
                        remote.Unlock()
×
1230
                }
×
1231
                return false, err
33✔
1232
        }
1233
        return true, nil
41✔
1234
}
1235

1236
// processLeafnodeInfo handles an INFO protocol received on a leafnode
// connection. It covers both the first INFO and later async INFO updates.
//
// Depending on the situation it will:
//   - perform the client-side TLS handshake (solicited, non-websocket
//     connections that did not do TLS-handshake-first);
//   - negotiate compression on the first INFO;
//   - reject connections made to a non-leafnode port or to a cluster name
//     containing spaces;
//   - record details about the remote server;
//   - refresh the list of remote leafnode URLs;
//   - apply permission updates;
//   - resume and/or finish the connect process.
//
// The client lock is acquired here and is always released before returning.
// Some paths release it temporarily: compression negotiation needs the
// server lock.
func (c *client) processLeafnodeInfo(info *Info) {
	c.mu.Lock()
	if c.leaf == nil || c.isClosed() {
		c.mu.Unlock()
		return
	}
	s := c.srv
	opts := s.getOpts()
	remote := c.leaf.remote
	didSolicit := remote != nil
	firstINFO := !c.flags.isSet(infoReceived)

	// In case of websocket, the TLS handshake has been already done.
	// So check only for non websocket connections and for configurations
	// where the TLS Handshake was not done first.
	if didSolicit && !c.flags.isSet(handshakeComplete) && !c.isWebsocket() && !remote.TLSHandshakeFirst {
		// If the server requires TLS, we need to set this in the remote.
		// Otherwise, if there is no TLS configuration block for the remote,
		// the solicit side will not attempt to perform the TLS handshake.
		if firstINFO && info.TLSRequired {
			remote.TLS = true
		}
		if _, err := c.leafClientHandshakeIfNeeded(remote, opts); err != nil {
			c.mu.Unlock()
			return
		}
	}

	// Check for compression, unless already done.
	if firstINFO && !c.flags.isSet(compressionNegotiated) {
		// Prevent from getting back here.
		c.flags.set(compressionNegotiated)

		// Accepting side uses the server's leafnode compression options,
		// soliciting side uses the ones from the remote configuration.
		var co *CompressionOpts
		if !didSolicit {
			co = &opts.LeafNode.Compression
		} else {
			co = &remote.Compression
		}
		if needsCompression(co.Mode) {
			// Release client lock since following function will need server lock.
			c.mu.Unlock()
			compress, err := s.negotiateLeafCompression(c, didSolicit, info.Compression, co)
			if err != nil {
				c.sendErrAndErr(err.Error())
				c.closeConnection(ProtocolViolation)
				return
			}
			if compress {
				// Done for now, will get back another INFO protocol...
				return
			}
			// No compression because one side does not want/can't, so proceed.
			c.mu.Lock()
			// Check that the connection did not close if the lock was released.
			if c.isClosed() {
				c.mu.Unlock()
				return
			}
		} else {
			// Coming from an old server, the Compression field would be the empty
			// string. For servers that are configured with CompressionNotSupported,
			// this makes them behave as old servers.
			if info.Compression == _EMPTY_ || co.Mode == CompressionNotSupported {
				c.leaf.compression = CompressionNotSupported
			} else {
				c.leaf.compression = CompressionOff
			}
		}
		// Accepting side does not normally process an INFO protocol during
		// initial connection handshake. So we keep it consistent by returning
		// if we are not soliciting.
		if !didSolicit {
			// If we had created the ping timer instead of the auth timer, we will
			// clear the ping timer and set the auth timer now that the compression
			// negotiation is done.
			if info.Compression != _EMPTY_ && c.ping.tmr != nil {
				clearTimer(&c.ping.tmr)
				c.setAuthTimer(secondsToDuration(opts.LeafNode.AuthTimeout))
			}
			c.mu.Unlock()
			return
		}
		// Fall through and process the INFO protocol as usual.
	}

	// Note: For now, only the initial INFO has a nonce. We
	// will probably do auto key rotation at some point.
	if firstINFO {
		// Mark that the INFO protocol has been received.
		c.flags.set(infoReceived)
		// Prevent connecting to non leafnode port. Need to do this only for
		// the first INFO, not for async INFO updates...
		//
		// Content of INFO sent by the server when accepting a tcp connection.
		// -------------------------------------------------------------------
		// Listen Port Of | CID | ClientConnectURLs | LeafNodeURLs | Gateway |
		// -------------------------------------------------------------------
		//      CLIENT    |  X* |        X**        |              |         |
		//      ROUTE     |     |        X**        |      X***    |         |
		//     GATEWAY    |     |                   |              |    X    |
		//     LEAFNODE   |  X  |                   |       X      |         |
		// -------------------------------------------------------------------
		// *   Not on older servers.
		// **  Not if "no advertise" is enabled.
		// *** Not if leafnode's "no advertise" is enabled.
		//
		// As seen from above, a solicited LeafNode connection should receive
		// from the remote server an INFO with CID and LeafNodeURLs. Anything
		// else should be considered an attempt to connect to a wrong port.
		if didSolicit && (info.CID == 0 || info.LeafNodeURLs == nil) {
			c.mu.Unlock()
			c.Errorf(ErrConnectedToWrongPort.Error())
			c.closeConnection(WrongPort)
			return
		}
		// Reject a cluster that contains spaces.
		if info.Cluster != _EMPTY_ && strings.Contains(info.Cluster, " ") {
			c.mu.Unlock()
			c.sendErrAndErr(ErrClusterNameHasSpaces.Error())
			c.closeConnection(ProtocolViolation)
			return
		}
		// Capture a nonce here.
		c.nonce = []byte(info.Nonce)
		if info.TLSRequired && didSolicit {
			remote.TLS = true
		}
		// Headers are enabled only if both this server and the remote support them.
		supportsHeaders := c.srv.supportsHeaders()
		c.headers = supportsHeaders && info.Headers

		// Remember the remote server.
		// Pre 2.2.0 servers are not sending their server name.
		// In that case, use info.ID, which, for those servers, matches
		// the content of the field `Name` in the leafnode CONNECT protocol.
		if info.Name == _EMPTY_ {
			c.leaf.remoteServer = info.ID
		} else {
			c.leaf.remoteServer = info.Name
		}
		c.leaf.remoteDomain = info.Domain
		c.leaf.remoteCluster = info.Cluster
		// We send the protocol version in the INFO protocol.
		// Keep track of it, so we know if this connection supports message
		// tracing for instance.
		c.opts.Protocol = info.Proto
	}

	// For both initial INFO and async INFO protocols, possibly
	// update our list of remote leafnode URLs we can connect to.
	if didSolicit && (len(info.LeafNodeURLs) > 0 || len(info.WSConnectURLs) > 0) {
		// Consider the incoming array as the most up-to-date
		// representation of the remote cluster's list of URLs.
		c.updateLeafNodeURLs(info)
	}

	// Check to see if we have permissions updates here.
	if info.Import != nil || info.Export != nil {
		// Note: Publish/Subscribe point at the INFO's own SubjectPermission
		// values, so the deny merges below are appended onto those objects.
		perms := &Permissions{
			Publish:   info.Export,
			Subscribe: info.Import,
		}
		// Check if we have local deny clauses that we need to merge.
		if remote := c.leaf.remote; remote != nil {
			if len(remote.DenyExports) > 0 {
				if perms.Publish == nil {
					perms.Publish = &SubjectPermission{}
				}
				perms.Publish.Deny = append(perms.Publish.Deny, remote.DenyExports...)
			}
			if len(remote.DenyImports) > 0 {
				if perms.Subscribe == nil {
					perms.Subscribe = &SubjectPermission{}
				}
				perms.Subscribe.Deny = append(perms.Subscribe.Deny, remote.DenyImports...)
			}
		}
		c.setPermissions(perms)
	}

	var resumeConnect bool

	// If this is a remote connection and this is the first INFO protocol,
	// then we need to finish the connect process by sending CONNECT, etc..
	if firstINFO && didSolicit {
		// Clear deadline that was set in createLeafNode while waiting for the INFO.
		c.nc.SetDeadline(time.Time{})
		resumeConnect = true
	} else if !firstINFO && didSolicit {
		c.leaf.remoteAccName = info.RemoteAccount
	}

	// Check if we have the remote account information and if so make sure it's stored.
	if info.RemoteAccount != _EMPTY_ {
		s.leafRemoteAccounts.Store(c.acc.Name, info.RemoteAccount)
	}
	c.mu.Unlock()

	// The resume/finish steps run without the client lock held.
	finishConnect := info.ConnectInfo
	if resumeConnect && s != nil {
		s.leafNodeResumeConnectProcess(c)
		if !info.InfoOnConnect {
			finishConnect = true
		}
	}
	if finishConnect {
		s.leafNodeFinishConnectProcess(c)
	}
}
1445

1446
// negotiateLeafCompression selects the compression mode for the leafnode
// connection c. Inputs are the local compression options co and the
// compression value the remote sent in its INFO protocol (infoCompression).
//
// Return values:
//   - (false, err) if selectCompressionMode fails;
//   - (false, nil) if the negotiated mode does not need compression;
//   - (true, nil) if compression will be used. In that case an INFO protocol
//     with the chosen mode has been queued, a compression writer has been
//     installed, and the read loop has been flagged to switch to a
//     (de)compression reader.
//
// The client lock must NOT be held on entry; it is acquired and released
// here. The server lock is also acquired, but never while the client lock
// is held.
func (s *Server) negotiateLeafCompression(c *client, didSolicit bool, infoCompression string, co *CompressionOpts) (bool, error) {
	// Negotiate the appropriate compression mode (or no compression)
	cm, err := selectCompressionMode(co.Mode, infoCompression)
	if err != nil {
		return false, err
	}
	c.mu.Lock()
	// For "auto" mode, set the initial compression mode based on RTT
	if cm == CompressionS2Auto {
		// If no RTT measurement is in progress, derive the RTT from the
		// connection start time.
		if c.rttStart.IsZero() {
			c.rtt = computeRTT(c.start)
		}
		cm = selectS2AutoModeBasedOnRTT(c.rtt, co.RTTThresholds)
	}
	// Keep track of the negotiated compression mode.
	c.leaf.compression = cm
	cid := c.cid
	// Only the accepting (non-soliciting) side puts its nonce in the INFO.
	var nonce string
	if !didSolicit {
		nonce = bytesToString(c.nonce)
	}
	c.mu.Unlock()

	if !needsCompression(cm) {
		return false, nil
	}

	// If we end-up doing compression...

	// Generate an INFO with the chosen compression mode.
	s.mu.Lock()
	info := s.copyLeafNodeInfo()
	info.Compression, info.CID, info.Nonce = compressionModeForInfoProtocol(co, cm), cid, nonce
	infoProto := generateInfoJSON(info)
	s.mu.Unlock()

	// If we solicited, then send this INFO protocol BEFORE switching
	// to compression writer. However, if we did not, we send it after.
	c.mu.Lock()
	if didSolicit {
		c.enqueueProto(infoProto)
		// Make sure it is completely flushed (the pending bytes go to
		// 0) before proceeding, so that it goes out uncompressed.
		for c.out.pb > 0 && !c.isClosed() {
			c.flushOutbound()
		}
	}
	// This is to notify the readLoop that it should switch to a
	// (de)compression reader.
	c.in.flags.set(switchToCompression)
	// Create the compress writer before queueing the INFO protocol for
	// a leafnode connection that did not solicit. It will make sure that
	// that proto is sent with compression on.
	c.out.cw = s2.NewWriter(nil, s2WriterOptions(cm)...)
	if !didSolicit {
		c.enqueueProto(infoProto)
	}
	c.mu.Unlock()
	return true, nil
}
1506

1507
// When getting a leaf node INFO protocol, use the provided
1508
// array of urls to update the list of possible endpoints.
1509
func (c *client) updateLeafNodeURLs(info *Info) {
1,202✔
1510
        cfg := c.leaf.remote
1,202✔
1511
        cfg.Lock()
1,202✔
1512
        defer cfg.Unlock()
1,202✔
1513

1,202✔
1514
        // We have ensured that if a remote has a WS scheme, then all are.
1,202✔
1515
        // So check if first is WS, then add WS URLs, otherwise, add non WS ones.
1,202✔
1516
        if len(cfg.URLs) > 0 && isWSURL(cfg.URLs[0]) {
1,256✔
1517
                // It does not really matter if we use "ws://" or "wss://" here since
54✔
1518
                // we will have already marked that the remote should use TLS anyway.
54✔
1519
                // But use proper scheme for log statements, etc...
54✔
1520
                proto := wsSchemePrefix
54✔
1521
                if cfg.TLS {
54✔
1522
                        proto = wsSchemePrefixTLS
×
1523
                }
×
1524
                c.doUpdateLNURLs(cfg, proto, info.WSConnectURLs)
54✔
1525
                return
54✔
1526
        }
1527
        c.doUpdateLNURLs(cfg, "nats-leaf", info.LeafNodeURLs)
1,148✔
1528
}
1529

1530
func (c *client) doUpdateLNURLs(cfg *leafNodeCfg, scheme string, URLs []string) {
1,202✔
1531
        cfg.urls = make([]*url.URL, 0, 1+len(URLs))
1,202✔
1532
        // Add the ones we receive in the protocol
1,202✔
1533
        for _, surl := range URLs {
3,406✔
1534
                url, err := url.Parse(fmt.Sprintf("%s://%s", scheme, surl))
2,204✔
1535
                if err != nil {
2,204✔
1536
                        // As per below, the URLs we receive should not have contained URL info, so this should be safe to log.
×
1537
                        c.Errorf("Error parsing url %q: %v", surl, err)
×
1538
                        continue
×
1539
                }
1540
                // Do not add if it's the same as what we already have configured.
1541
                var dup bool
2,204✔
1542
                for _, u := range cfg.URLs {
5,635✔
1543
                        // URLs that we receive never have user info, but the
3,431✔
1544
                        // ones that were configured may have. Simply compare
3,431✔
1545
                        // host and port to decide if they are equal or not.
3,431✔
1546
                        if url.Host == u.Host && url.Port() == u.Port() {
5,048✔
1547
                                dup = true
1,617✔
1548
                                break
1,617✔
1549
                        }
1550
                }
1551
                if !dup {
2,791✔
1552
                        cfg.urls = append(cfg.urls, url)
587✔
1553
                        cfg.saveTLSHostname(url)
587✔
1554
                }
587✔
1555
        }
1556
        // Add the configured one
1557
        cfg.urls = append(cfg.urls, cfg.URLs...)
1,202✔
1558
}
1559

1560
// Similar to setInfoHostPortAndGenerateJSON, but for leafNodeInfo.
1561
func (s *Server) setLeafNodeInfoHostPortAndIP() error {
2,666✔
1562
        opts := s.getOpts()
2,666✔
1563
        if opts.LeafNode.Advertise != _EMPTY_ {
2,677✔
1564
                advHost, advPort, err := parseHostPort(opts.LeafNode.Advertise, opts.LeafNode.Port)
11✔
1565
                if err != nil {
11✔
1566
                        return err
×
1567
                }
×
1568
                s.leafNodeInfo.Host = advHost
11✔
1569
                s.leafNodeInfo.Port = advPort
11✔
1570
        } else {
2,655✔
1571
                s.leafNodeInfo.Host = opts.LeafNode.Host
2,655✔
1572
                s.leafNodeInfo.Port = opts.LeafNode.Port
2,655✔
1573
                // If the host is "0.0.0.0" or "::" we need to resolve to a public IP.
2,655✔
1574
                // This will return at most 1 IP.
2,655✔
1575
                hostIsIPAny, ips, err := s.getNonLocalIPsIfHostIsIPAny(s.leafNodeInfo.Host, false)
2,655✔
1576
                if err != nil {
2,655✔
1577
                        return err
×
1578
                }
×
1579
                if hostIsIPAny {
2,936✔
1580
                        if len(ips) == 0 {
281✔
1581
                                s.Errorf("Could not find any non-local IP for leafnode's listen specification %q",
×
1582
                                        s.leafNodeInfo.Host)
×
1583
                        } else {
281✔
1584
                                // Take the first from the list...
281✔
1585
                                s.leafNodeInfo.Host = ips[0]
281✔
1586
                        }
281✔
1587
                }
1588
        }
1589
        // Use just host:port for the IP
1590
        s.leafNodeInfo.IP = net.JoinHostPort(s.leafNodeInfo.Host, strconv.Itoa(s.leafNodeInfo.Port))
2,666✔
1591
        if opts.LeafNode.Advertise != _EMPTY_ {
2,677✔
1592
                s.Noticef("Advertise address for leafnode is set to %s", s.leafNodeInfo.IP)
11✔
1593
        }
11✔
1594
        return nil
2,666✔
1595
}
1596

1597
// Add the connection to the map of leaf nodes.
// If `checkForDup` is true (invoked when a leafnode is accepted), then we check
// if a connection already exists for the same server name and account.
// That can happen when the remote is attempting to reconnect while the accepting
// side did not detect the connection as broken yet.
// But it can also happen when there is a misconfiguration and the remote is
// creating two (or more) connections that bind to the same account on the accept
// side.
// When a duplicate is found, the new connection is accepted and the old is closed
// (this solves the stale connection situation). An error is sent on the old
// connection before it is closed, to help the remote detect the misconfiguration
// when the duplicate is the result of that misconfiguration.
//
// srvName and clusterName identify the remote and are used only for the
// duplicate check.
//
// After registration, JetStream API traffic across this connection is denied
// or allowed depending on the local and remote JetStream domains. For solicited
// system-account connections, the meta group's observer mode may be switched.
// For non-system accounts, JetStream domain mappings may be added when a local
// domain is configured.
func (s *Server) addLeafNodeConnection(c *client, srvName, clusterName string, checkForDup bool) {
	var accName string
	// Snapshot what we need from the client under its lock.
	c.mu.Lock()
	cid := c.cid
	acc := c.acc
	if acc != nil {
		accName = acc.Name
	}
	myRemoteDomain := c.leaf.remoteDomain
	mySrvName := c.leaf.remoteServer
	remoteAccName := c.leaf.remoteAccName
	myClustName := c.leaf.remoteCluster
	solicited := c.leaf.remote != nil
	c.mu.Unlock()

	var old *client
	// Lock order here: server lock, then each existing leaf's client lock.
	s.mu.Lock()
	// We check for empty because in some test we may send empty CONNECT{}
	if checkForDup && srvName != _EMPTY_ {
		for _, ol := range s.leafs {
			ol.mu.Lock()
			// We care here only about non solicited Leafnode. This function
			// is more about replacing stale connections than detecting loops.
			// We have code for the loop detection elsewhere, which also delays
			// attempt to reconnect.
			if !ol.isSolicitedLeafNode() && ol.leaf.remoteServer == srvName &&
				ol.leaf.remoteCluster == clusterName && ol.acc.Name == accName &&
				remoteAccName != _EMPTY_ && ol.leaf.remoteAccName == remoteAccName {
				old = ol
			}
			ol.mu.Unlock()
			if old != nil {
				break
			}
		}
	}
	// Store new connection in the map
	s.leafs[cid] = c
	s.mu.Unlock()
	s.removeFromTempClients(cid)

	// If applicable, evict the old one.
	if old != nil {
		old.sendErrAndErr(DuplicateRemoteLeafnodeConnection.String())
		old.closeConnection(DuplicateRemoteLeafnodeConnection)
		c.Warnf("Replacing connection from same server")
	}

	// srvDecorated returns "server" or "server/cluster" for log statements.
	srvDecorated := func() string {
		if myClustName == _EMPTY_ {
			return mySrvName
		}
		return fmt.Sprintf("%s/%s", mySrvName, myClustName)
	}

	opts := s.getOpts()
	sysAcc := s.SystemAccount()
	js := s.getJetStream()
	var meta *raft
	if js != nil {
		if mg := js.getMetaGroup(); mg != nil {
			meta = mg.(*raft)
		}
	}
	blockMappingOutgoing := false
	// Deny (non domain) JetStream API traffic unless system account is shared
	// and domain names are identical and extending is not disabled

	// Check if backwards compatibility has been enabled and needs to be acted on
	forceSysAccDeny := false
	if len(opts.JsAccDefaultDomain) > 0 {
		if acc == sysAcc {
			for _, d := range opts.JsAccDefaultDomain {
				if d == _EMPTY_ {
					// Extending JetStream via leaf node is mutually exclusive with a domain mapping to the empty/default domain.
					// As soon as one mapping to "" is found, disable the ability to extend JS via a leaf node.
					c.Noticef("Not extending remote JetStream domain %q due to presence of empty default domain", myRemoteDomain)
					forceSysAccDeny = true
					break
				}
			}
		} else if domain, ok := opts.JsAccDefaultDomain[accName]; ok && domain == _EMPTY_ {
			// for backwards compatibility with old setups that do not have a domain name set
			// Note: this returns early, so no deny permissions and no domain
			// mappings are applied for this account.
			c.Debugf("Skipping deny %q for account %q due to default domain", jsAllAPI, accName)
			return
		}
	}

	// If the server has JS disabled, it may still be part of a JetStream that could be extended.
	// This is either signaled by js being disabled and a domain set,
	// or in cases where no domain name exists, an extension hint is set.
	// However, this is only relevant in mixed setups.
	//
	// If the system account connects but default domains are present, JetStream can't be extended.
	if opts.JetStreamDomain != myRemoteDomain || (!opts.JetStream && (opts.JetStreamDomain == _EMPTY_ && opts.JetStreamExtHint != jsWillExtend)) ||
		sysAcc == nil || acc == nil || forceSysAccDeny {
		// If domain names mismatch always deny. This applies to system accounts as well as non system accounts.
		// Not having a system account, account or JetStream disabled is considered a mismatch as well.
		if acc != nil && acc == sysAcc {
			c.Noticef("System account connected from %s", srvDecorated())
			c.Noticef("JetStream not extended, domains differ")
			c.mergeDenyPermissionsLocked(both, denyAllJs)
			// When a remote with a system account is present in a server, unless otherwise disabled, the server will be
			// started in observer mode. Now that it is clear that this not used, turn the observer mode off.
			if solicited && meta != nil && meta.IsObserver() {
				meta.setObserver(false, extNotExtended)
				c.Debugf("Turning JetStream metadata controller Observer Mode off")
				// Take note that the domain was not extended to avoid this state from startup.
				writePeerState(js.config.StoreDir, meta.currentPeerState())
				// Meta controller can't be leader yet.
				// Yet it is possible that due to observer mode every server already stopped campaigning.
				// Therefore this server needs to be kicked into campaigning gear explicitly.
				meta.Campaign()
			}
		} else {
			c.Noticef("JetStream using domains: local %q, remote %q", opts.JetStreamDomain, myRemoteDomain)
			c.mergeDenyPermissionsLocked(both, denyAllClientJs)
		}
		blockMappingOutgoing = true
	} else if acc == sysAcc {
		// system account and same domain
		s.sys.client.Noticef("Extending JetStream domain %q as System Account connected from server %s",
			myRemoteDomain, srvDecorated())
		// In an extension use case, pin leadership to server remotes connect to.
		// Therefore, server with a remote that are not already in observer mode, need to be put into it.
		if solicited && meta != nil && !meta.IsObserver() {
			meta.setObserver(true, extExtended)
			c.Debugf("Turning JetStream metadata controller Observer Mode on - System Account Connected")
			// Take note that the domain was extended (persisted with the peer
			// state), presumably so this state is restored on next startup.
			writePeerState(js.config.StoreDir, meta.currentPeerState())
			// If this server is the leader already, step down so a new leader can be elected (that is not an observer)
			meta.StepDown()
		}
	} else {
		// This deny is needed in all cases (system account shared or not)
		// If the system account is shared, jsAllAPI traffic will go through the system account.
		// So in order to prevent duplicate delivery (from system and actual account) suppress it on the account.
		// If the system account is NOT shared, jsAllAPI traffic has no business
		// crossing this leafnode connection on the account either.
		c.Debugf("Adding deny %+v for account %q", denyAllClientJs, accName)
		c.mergeDenyPermissionsLocked(both, denyAllClientJs)
	}
	// If we have a specified JetStream domain we will want to add a mapping to
	// allow access cross domain for each non-system account.
	if opts.JetStreamDomain != _EMPTY_ && opts.JetStream && acc != nil && acc != sysAcc {
		for src, dest := range generateJSMappingTable(opts.JetStreamDomain) {
			if err := acc.AddMapping(src, dest); err != nil {
				c.Debugf("Error adding JetStream domain mapping: %s", err.Error())
			} else {
				c.Debugf("Adding JetStream Domain Mapping %q -> %s to account %q", src, dest, accName)
			}
		}
		if blockMappingOutgoing {
			src := fmt.Sprintf(jsDomainAPI, opts.JetStreamDomain)
			// make sure that messages intended for this domain, do not leave the cluster via this leaf node connection
			// This is a guard against a misconfiguration with two identical domain names and will only cover some forms
			// of this issue, not all of them.
			// This guards against a hub and a spoke having the same domain name.
			// But not two spokes having the same one and the request coming from the hub.
			c.mergeDenyPermissionsLocked(pub, []string{src})
			c.Debugf("Adding deny %q for outgoing messages to account %q", src, accName)
		}
	}
}
1772

1773
func (s *Server) removeLeafNodeConnection(c *client) {
1,486✔
1774
        c.mu.Lock()
1,486✔
1775
        cid := c.cid
1,486✔
1776
        if c.leaf != nil {
2,972✔
1777
                if c.leaf.tsubt != nil {
2,546✔
1778
                        c.leaf.tsubt.Stop()
1,060✔
1779
                        c.leaf.tsubt = nil
1,060✔
1780
                }
1,060✔
1781
                if c.leaf.gwSub != nil {
2,072✔
1782
                        s.gwLeafSubs.Remove(c.leaf.gwSub)
586✔
1783
                        // We need to set this to nil for GC to release the connection
586✔
1784
                        c.leaf.gwSub = nil
586✔
1785
                }
586✔
1786
        }
1787
        c.mu.Unlock()
1,486✔
1788
        s.mu.Lock()
1,486✔
1789
        delete(s.leafs, cid)
1,486✔
1790
        s.mu.Unlock()
1,486✔
1791
        s.removeFromTempClients(cid)
1,486✔
1792
}
1793

1794
// Connect information for solicited leafnodes.
//
// leafConnectInfo is the JSON body of the CONNECT protocol sent by the
// soliciting side of a leafnode connection; the accepting side decodes it in
// processLeafNodeConnect. Every field is omitted from the JSON when empty.
type leafConnectInfo struct {
        // Remote server version; checked against LeafNode.MinVersion when that
        // option is set.
        Version   string   `json:"version,omitempty"`
        // Credentials presented by the remote. NOTE(review): presumably consumed
        // by authentication, which is not visible in this part of the file.
        Nkey      string   `json:"nkey,omitempty"`
        JWT       string   `json:"jwt,omitempty"`
        Sig       string   `json:"sig,omitempty"`
        User      string   `json:"user,omitempty"`
        Pass      string   `json:"pass,omitempty"`
        ID        string   `json:"server_id,omitempty"`
        // Domain of the remote (presumably its JetStream domain); stored in
        // c.leaf.remoteDomain.
        Domain    string   `json:"domain,omitempty"`
        // Remote server name; stored in c.leaf.remoteServer.
        Name      string   `json:"name,omitempty"`
        // Set when the remote declares itself a hub, in which case the accepting
        // side takes on the spoke role.
        Hub       bool     `json:"is_hub,omitempty"`
        // Remote cluster name. A name containing spaces, or equal to this
        // server's cluster name, causes the connection to be rejected.
        Cluster   string   `json:"cluster,omitempty"`
        // Whether the remote supports headers. Headers are enabled on the
        // connection only if this server supports them as well.
        Headers   bool     `json:"headers,omitempty"`
        JetStream bool     `json:"jetstream,omitempty"`
        // Publish deny permissions sent by the remote; merged into the
        // connection's existing publish deny permissions.
        DenyPub   []string `json:"deny_pub,omitempty"`

        // There was an existing field called:
        // >> Comp bool `json:"compression,omitempty"`
        // that has never been used. With support for compression, we now need
        // a field that is a string. So we use a different json tag:
        Compression string `json:"compress_mode,omitempty"`

        // Just used to detect wrong connection attempts: a non-empty value means
        // a gateway connected to the leafnode port and the connection is rejected.
        Gateway string `json:"gateway,omitempty"`

        // Tells the accept side which account the remote is binding to.
        RemoteAccount string `json:"remote_account,omitempty"`

        // The accept side of a LEAF connection, unlike ROUTER and GATEWAY, receives
        // only the CONNECT protocol, and no INFO. So we need to send the protocol
        // version as part of the CONNECT. It will indicate if a connection supports
        // some features, such as message tracing.
        // We use `protocol` as the JSON tag, so this is automatically unmarshal'ed
        // in the low level process CONNECT.
        Proto int `json:"protocol,omitempty"`
}
1831

1832
// processLeafNodeConnect will process the inbound connect args.
1833
// Once we are here we are bound to an account, so can send any interest that
1834
// we would have to the other side.
1835
func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) error {
621✔
1836
        // Way to detect clients that incorrectly connect to the route listen
621✔
1837
        // port. Client provided "lang" in the CONNECT protocol while LEAFNODEs don't.
621✔
1838
        if lang != _EMPTY_ {
621✔
1839
                c.sendErrAndErr(ErrClientConnectedToLeafNodePort.Error())
×
1840
                c.closeConnection(WrongPort)
×
1841
                return ErrClientConnectedToLeafNodePort
×
1842
        }
×
1843

1844
        // Unmarshal as a leaf node connect protocol
1845
        proto := &leafConnectInfo{}
621✔
1846
        if err := json.Unmarshal(arg, proto); err != nil {
621✔
1847
                return err
×
1848
        }
×
1849

1850
        // Reject a cluster that contains spaces.
1851
        if proto.Cluster != _EMPTY_ && strings.Contains(proto.Cluster, " ") {
622✔
1852
                c.sendErrAndErr(ErrClusterNameHasSpaces.Error())
1✔
1853
                c.closeConnection(ProtocolViolation)
1✔
1854
                return ErrClusterNameHasSpaces
1✔
1855
        }
1✔
1856

1857
        // Check for cluster name collisions.
1858
        if cn := s.cachedClusterName(); cn != _EMPTY_ && proto.Cluster != _EMPTY_ && proto.Cluster == cn {
624✔
1859
                c.sendErrAndErr(ErrLeafNodeHasSameClusterName.Error())
4✔
1860
                c.closeConnection(ClusterNamesIdentical)
4✔
1861
                return ErrLeafNodeHasSameClusterName
4✔
1862
        }
4✔
1863

1864
        // Reject if this has Gateway which means that it would be from a gateway
1865
        // connection that incorrectly connects to the leafnode port.
1866
        if proto.Gateway != _EMPTY_ {
616✔
1867
                errTxt := fmt.Sprintf("Rejecting connection from gateway %q on the leafnode port", proto.Gateway)
×
1868
                c.Errorf(errTxt)
×
1869
                c.sendErr(errTxt)
×
1870
                c.closeConnection(WrongGateway)
×
1871
                return ErrWrongGateway
×
1872
        }
×
1873

1874
        if mv := s.getOpts().LeafNode.MinVersion; mv != _EMPTY_ {
618✔
1875
                major, minor, update, _ := versionComponents(mv)
2✔
1876
                if !versionAtLeast(proto.Version, major, minor, update) {
3✔
1877
                        // We are going to send back an INFO because otherwise recent
1✔
1878
                        // versions of the remote server would simply break the connection
1✔
1879
                        // after 2 seconds if not receiving it. Instead, we want the
1✔
1880
                        // other side to just "stall" until we finish waiting for the holding
1✔
1881
                        // period and close the connection below.
1✔
1882
                        s.sendPermsAndAccountInfo(c)
1✔
1883
                        c.sendErrAndErr(fmt.Sprintf("connection rejected since minimum version required is %q", mv))
1✔
1884
                        select {
1✔
1885
                        case <-c.srv.quitCh:
1✔
1886
                        case <-time.After(leafNodeWaitBeforeClose):
×
1887
                        }
1888
                        c.closeConnection(MinimumVersionRequired)
1✔
1889
                        return ErrMinimumVersionRequired
1✔
1890
                }
1891
        }
1892

1893
        // Check if this server supports headers.
1894
        supportHeaders := c.srv.supportsHeaders()
615✔
1895

615✔
1896
        c.mu.Lock()
615✔
1897
        // Leaf Nodes do not do echo or verbose or pedantic.
615✔
1898
        c.opts.Verbose = false
615✔
1899
        c.opts.Echo = false
615✔
1900
        c.opts.Pedantic = false
615✔
1901
        // This inbound connection will be marked as supporting headers if this server
615✔
1902
        // support headers and the remote has sent in the CONNECT protocol that it does
615✔
1903
        // support headers too.
615✔
1904
        c.headers = supportHeaders && proto.Headers
615✔
1905
        // If the compression level is still not set, set it based on what has been
615✔
1906
        // given to us in the CONNECT protocol.
615✔
1907
        if c.leaf.compression == _EMPTY_ {
736✔
1908
                // But if proto.Compression is _EMPTY_, set it to CompressionNotSupported
121✔
1909
                if proto.Compression == _EMPTY_ {
147✔
1910
                        c.leaf.compression = CompressionNotSupported
26✔
1911
                } else {
121✔
1912
                        c.leaf.compression = proto.Compression
95✔
1913
                }
95✔
1914
        }
1915

1916
        // Remember the remote server.
1917
        c.leaf.remoteServer = proto.Name
615✔
1918
        // Remember the remote account name
615✔
1919
        c.leaf.remoteAccName = proto.RemoteAccount
615✔
1920

615✔
1921
        // If the other side has declared itself a hub, so we will take on the spoke role.
615✔
1922
        if proto.Hub {
621✔
1923
                c.leaf.isSpoke = true
6✔
1924
        }
6✔
1925

1926
        // The soliciting side is part of a cluster.
1927
        if proto.Cluster != _EMPTY_ {
1,091✔
1928
                c.leaf.remoteCluster = proto.Cluster
476✔
1929
        }
476✔
1930

1931
        c.leaf.remoteDomain = proto.Domain
615✔
1932

615✔
1933
        // When a leaf solicits a connection to a hub, the perms that it will use on the soliciting leafnode's
615✔
1934
        // behalf are correct for them, but inside the hub need to be reversed since data is flowing in the opposite direction.
615✔
1935
        if !c.isSolicitedLeafNode() && c.perms != nil {
627✔
1936
                sp, pp := c.perms.sub, c.perms.pub
12✔
1937
                c.perms.sub, c.perms.pub = pp, sp
12✔
1938
                if c.opts.Import != nil {
23✔
1939
                        c.darray = c.opts.Import.Deny
11✔
1940
                } else {
12✔
1941
                        c.darray = nil
1✔
1942
                }
1✔
1943
        }
1944

1945
        // Set the Ping timer
1946
        c.setFirstPingTimer()
615✔
1947

615✔
1948
        // If we received pub deny permissions from the other end, merge with existing ones.
615✔
1949
        c.mergeDenyPermissions(pub, proto.DenyPub)
615✔
1950

615✔
1951
        c.mu.Unlock()
615✔
1952

615✔
1953
        // Register the cluster, even if empty, as long as we are acting as a hub.
615✔
1954
        if !proto.Hub {
1,224✔
1955
                c.acc.registerLeafNodeCluster(proto.Cluster)
609✔
1956
        }
609✔
1957

1958
        // Add in the leafnode here since we passed through auth at this point.
1959
        s.addLeafNodeConnection(c, proto.Name, proto.Cluster, true)
615✔
1960

615✔
1961
        // If we have permissions bound to this leafnode we need to send then back to the
615✔
1962
        // origin server for local enforcement.
615✔
1963
        s.sendPermsAndAccountInfo(c)
615✔
1964

615✔
1965
        // Create and initialize the smap since we know our bound account now.
615✔
1966
        // This will send all registered subs too.
615✔
1967
        s.initLeafNodeSmapAndSendSubs(c)
615✔
1968

615✔
1969
        // Announce the account connect event for a leaf node.
615✔
1970
        // This will no-op as needed.
615✔
1971
        s.sendLeafNodeConnect(c.acc)
615✔
1972

615✔
1973
        return nil
615✔
1974
}
1975

1976
// Returns the remote cluster name. This is set only once so does not require a lock.
1977
func (c *client) remoteCluster() string {
128,249✔
1978
        if c.leaf == nil {
128,249✔
1979
                return _EMPTY_
×
1980
        }
×
1981
        return c.leaf.remoteCluster
128,249✔
1982
}
1983

1984
// Sends back an info block to the soliciting leafnode to let it know about
1985
// its permission settings for local enforcement.
1986
func (s *Server) sendPermsAndAccountInfo(c *client) {
616✔
1987
        // Copy
616✔
1988
        info := s.copyLeafNodeInfo()
616✔
1989
        c.mu.Lock()
616✔
1990
        info.CID = c.cid
616✔
1991
        info.Import = c.opts.Import
616✔
1992
        info.Export = c.opts.Export
616✔
1993
        info.RemoteAccount = c.acc.Name
616✔
1994
        info.ConnectInfo = true
616✔
1995
        c.enqueueProto(generateInfoJSON(info))
616✔
1996
        c.mu.Unlock()
616✔
1997
}
616✔
1998

1999
// Snapshot the current subscriptions from the sublist into our smap which
2000
// we will keep updated from now on.
2001
// Also send the registered subscriptions.
2002
func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
1,201✔
2003
        acc := c.acc
1,201✔
2004
        if acc == nil {
1,201✔
2005
                c.Debugf("Leafnode does not have an account bound")
×
2006
                return
×
2007
        }
×
2008
        // Collect all account subs here.
2009
        _subs := [1024]*subscription{}
1,201✔
2010
        subs := _subs[:0]
1,201✔
2011
        ims := []string{}
1,201✔
2012

1,201✔
2013
        // Hold the client lock otherwise there can be a race and miss some subs.
1,201✔
2014
        c.mu.Lock()
1,201✔
2015
        defer c.mu.Unlock()
1,201✔
2016

1,201✔
2017
        acc.mu.RLock()
1,201✔
2018
        accName := acc.Name
1,201✔
2019
        accNTag := acc.nameTag
1,201✔
2020

1,201✔
2021
        // To make printing look better when no friendly name present.
1,201✔
2022
        if accNTag != _EMPTY_ {
1,209✔
2023
                accNTag = "/" + accNTag
8✔
2024
        }
8✔
2025

2026
        // If we are solicited we only send interest for local clients.
2027
        if c.isSpokeLeafNode() {
1,787✔
2028
                acc.sl.localSubs(&subs, true)
586✔
2029
        } else {
1,201✔
2030
                acc.sl.All(&subs)
615✔
2031
        }
615✔
2032

2033
        // Check if we have an existing service import reply.
2034
        siReply := copyBytes(acc.siReply)
1,201✔
2035

1,201✔
2036
        // Since leaf nodes only send on interest, if the bound
1,201✔
2037
        // account has import services we need to send those over.
1,201✔
2038
        for isubj := range acc.imports.services {
5,637✔
2039
                if c.isSpokeLeafNode() && !c.canSubscribe(isubj) {
4,699✔
2040
                        c.Debugf("Not permitted to import service %q on behalf of %s%s", isubj, accName, accNTag)
263✔
2041
                        continue
263✔
2042
                }
2043
                ims = append(ims, isubj)
4,173✔
2044
        }
2045
        // Likewise for mappings.
2046
        for _, m := range acc.mappings {
3,319✔
2047
                if c.isSpokeLeafNode() && !c.canSubscribe(m.src) {
2,154✔
2048
                        c.Debugf("Not permitted to import mapping %q on behalf of %s%s", m.src, accName, accNTag)
36✔
2049
                        continue
36✔
2050
                }
2051
                ims = append(ims, m.src)
2,082✔
2052
        }
2053

2054
        // Create a unique subject that will be used for loop detection.
2055
        lds := acc.lds
1,201✔
2056
        acc.mu.RUnlock()
1,201✔
2057

1,201✔
2058
        // Check if we have to create the LDS.
1,201✔
2059
        if lds == _EMPTY_ {
2,149✔
2060
                lds = leafNodeLoopDetectionSubjectPrefix + nuid.Next()
948✔
2061
                acc.mu.Lock()
948✔
2062
                acc.lds = lds
948✔
2063
                acc.mu.Unlock()
948✔
2064
        }
948✔
2065

2066
        // Now check for gateway interest. Leafnodes will put this into
2067
        // the proper mode to propagate, but they are not held in the account.
2068
        gwsa := [16]*client{}
1,201✔
2069
        gws := gwsa[:0]
1,201✔
2070
        s.getOutboundGatewayConnections(&gws)
1,201✔
2071
        for _, cgw := range gws {
1,273✔
2072
                cgw.mu.Lock()
72✔
2073
                gw := cgw.gw
72✔
2074
                cgw.mu.Unlock()
72✔
2075
                if gw != nil {
144✔
2076
                        if ei, _ := gw.outsim.Load(accName); ei != nil {
144✔
2077
                                if e := ei.(*outsie); e != nil && e.sl != nil {
144✔
2078
                                        e.sl.All(&subs)
72✔
2079
                                }
72✔
2080
                        }
2081
                }
2082
        }
2083

2084
        applyGlobalRouting := s.gateway.enabled
1,201✔
2085
        if c.isSpokeLeafNode() {
1,787✔
2086
                // Add a fake subscription for this solicited leafnode connection
586✔
2087
                // so that we can send back directly for mapped GW replies.
586✔
2088
                // We need to keep track of this subscription so it can be removed
586✔
2089
                // when the connection is closed so that the GC can release it.
586✔
2090
                c.leaf.gwSub = &subscription{client: c, subject: []byte(gwReplyPrefix + ">")}
586✔
2091
                c.srv.gwLeafSubs.Insert(c.leaf.gwSub)
586✔
2092
        }
586✔
2093

2094
        // Now walk the results and add them to our smap
2095
        rc := c.leaf.remoteCluster
1,201✔
2096
        c.leaf.smap = make(map[string]int32)
1,201✔
2097
        for _, sub := range subs {
35,898✔
2098
                // Check perms regardless of role.
34,697✔
2099
                if c.perms != nil && !c.canSubscribe(string(sub.subject)) {
36,919✔
2100
                        c.Debugf("Not permitted to subscribe to %q on behalf of %s%s", sub.subject, accName, accNTag)
2,222✔
2101
                        continue
2,222✔
2102
                }
2103
                // We ignore ourselves here.
2104
                // Also don't add the subscription if it has a origin cluster and the
2105
                // cluster name matches the one of the client we are sending to.
2106
                if c != sub.client && (sub.origin == nil || (bytesToString(sub.origin) != rc)) {
60,119✔
2107
                        count := int32(1)
27,644✔
2108
                        if len(sub.queue) > 0 && sub.qw > 0 {
27,654✔
2109
                                count = sub.qw
10✔
2110
                        }
10✔
2111
                        c.leaf.smap[keyFromSub(sub)] += count
27,644✔
2112
                        if c.leaf.tsub == nil {
28,772✔
2113
                                c.leaf.tsub = make(map[*subscription]struct{})
1,128✔
2114
                        }
1,128✔
2115
                        c.leaf.tsub[sub] = struct{}{}
27,644✔
2116
                }
2117
        }
2118
        // FIXME(dlc) - We need to update appropriately on an account claims update.
2119
        for _, isubj := range ims {
7,456✔
2120
                c.leaf.smap[isubj]++
6,255✔
2121
        }
6,255✔
2122
        // If we have gateways enabled we need to make sure the other side sends us responses
2123
        // that have been augmented from the original subscription.
2124
        // TODO(dlc) - Should we lock this down more?
2125
        if applyGlobalRouting {
1,293✔
2126
                c.leaf.smap[oldGWReplyPrefix+"*.>"]++
92✔
2127
                c.leaf.smap[gwReplyPrefix+">"]++
92✔
2128
        }
92✔
2129
        // Detect loops by subscribing to a specific subject and checking
2130
        // if this sub is coming back to us.
2131
        c.leaf.smap[lds]++
1,201✔
2132

1,201✔
2133
        // Check if we need to add an existing siReply to our map.
1,201✔
2134
        // This will be a prefix so add on the wildcard.
1,201✔
2135
        if siReply != nil {
1,223✔
2136
                wcsub := append(siReply, '>')
22✔
2137
                c.leaf.smap[string(wcsub)]++
22✔
2138
        }
22✔
2139
        // Queue all protocols. There is no max pending limit for LN connection,
2140
        // so we don't need chunking. The writes will happen from the writeLoop.
2141
        var b bytes.Buffer
1,201✔
2142
        for key, n := range c.leaf.smap {
25,184✔
2143
                c.writeLeafSub(&b, key, n)
23,983✔
2144
        }
23,983✔
2145
        if b.Len() > 0 {
2,402✔
2146
                c.enqueueProto(b.Bytes())
1,201✔
2147
        }
1,201✔
2148
        if c.leaf.tsub != nil {
2,330✔
2149
                // Clear the tsub map after 5 seconds.
1,129✔
2150
                c.leaf.tsubt = time.AfterFunc(5*time.Second, func() {
1,198✔
2151
                        c.mu.Lock()
69✔
2152
                        if c.leaf != nil {
138✔
2153
                                c.leaf.tsub = nil
69✔
2154
                                c.leaf.tsubt = nil
69✔
2155
                        }
69✔
2156
                        c.mu.Unlock()
69✔
2157
                })
2158
        }
2159
}
2160

2161
// updateInterestForAccountOnGateway called from gateway code when processing RS+ and RS-.
2162
func (s *Server) updateInterestForAccountOnGateway(accName string, sub *subscription, delta int32) {
186,184✔
2163
        acc, err := s.LookupAccount(accName)
186,184✔
2164
        if acc == nil || err != nil {
186,284✔
2165
                s.Debugf("No or bad account for %q, failed to update interest from gateway", accName)
100✔
2166
                return
100✔
2167
        }
100✔
2168
        acc.updateLeafNodes(sub, delta)
186,084✔
2169
}
2170

2171
// updateLeafNodes will make sure to update the account smap for the subscription.
// Will also forward to all leaf nodes as needed.
//
// For every leafnode connection in acc.lleafs, other than the one that owns
// sub, this applies delta to that connection's smap through updateSmap. That
// call may in turn send an interest update to the remote.
//
// It returns early when acc or sub is nil or the account has no leafnodes. It
// also returns early when sub's origin cluster is reported isolated by
// isLeafNodeClusterIsolated, except for loop detection subjects.
func (acc *Account) updateLeafNodes(sub *subscription, delta int32) {
        if acc == nil || sub == nil {
                return
        }

        // We will do checks for no leafnodes and same cluster here inline and under the
        // general account read lock.
        // If we feel we need to update the leafnodes we will do that out of line to avoid
        // blocking routes or GWs.
        // NOTE(review): the per-leafnode updates below actually run synchronously on the
        // caller's goroutine, holding acc.lmu (read) and each leafnode's client lock.

        acc.mu.RLock()
        // First check if we even have leafnodes here.
        if acc.nleafs == 0 {
                acc.mu.RUnlock()
                return
        }

        // Is this a loop detection subject.
        isLDS := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))

        // Capture the cluster even if it's empty.
        var cluster string
        if sub.origin != nil {
                cluster = bytesToString(sub.origin)
        }

        // If we have an isolated cluster we can return early, as long as it is not a loop detection subject.
        // Empty clusters will return false for the check.
        if !isLDS && acc.isLeafNodeClusterIsolated(cluster) {
                acc.mu.RUnlock()
                return
        }

        // We can release the general account lock.
        acc.mu.RUnlock()

        // We can hold the list lock here to avoid having to copy a large slice.
        acc.lmu.RLock()
        defer acc.lmu.RUnlock()

        // Do this once.
        subject := string(sub.subject)

        // Walk the connected leafnodes.
        for _, ln := range acc.lleafs {
                // Never echo interest back to the leafnode that owns the subscription.
                if ln == sub.client {
                        continue
                }
                // Check to make sure this sub does not have an origin cluster that matches the leafnode.
                ln.mu.Lock()
                // If skipped, make sure that we still let go the "$LDS." subscription that allows
                // the detection of loops as long as different cluster.
                clusterDifferent := cluster != ln.remoteCluster()
                // Update this leafnode when either:
                //  - sub is a loop detection subject and its origin cluster differs
                //    from the leafnode's remote cluster, or
                //  - sub has no origin cluster (or a different one) and this is a
                //    removal (delta <= 0) or the leafnode may subscribe to the subject.
                if (isLDS && clusterDifferent) || ((cluster == _EMPTY_ || clusterDifferent) && (delta <= 0 || ln.canSubscribe(subject))) {
                        ln.updateSmap(sub, delta, isLDS)
                }
                ln.mu.Unlock()
        }
}
2232

2233
// This will make an update to our internal smap and determine if we should send out
2234
// an interest update to the remote side.
2235
// Lock should be held.
2236
func (c *client) updateSmap(sub *subscription, delta int32, isLDS bool) {
21,658✔
2237
        if c.leaf.smap == nil {
21,658✔
2238
                return
×
2239
        }
×
2240

2241
        // If we are solicited make sure this is a local client or a non-solicited leaf node
2242
        skind := sub.client.kind
21,658✔
2243
        updateClient := skind == CLIENT || skind == SYSTEM || skind == JETSTREAM || skind == ACCOUNT
21,658✔
2244
        if !isLDS && c.isSpokeLeafNode() && !(updateClient || (skind == LEAF && !sub.client.isSpokeLeafNode())) {
29,221✔
2245
                return
7,563✔
2246
        }
7,563✔
2247

2248
        // For additions, check if that sub has just been processed during initLeafNodeSmapAndSendSubs
2249
        if delta > 0 && c.leaf.tsub != nil {
20,780✔
2250
                if _, present := c.leaf.tsub[sub]; present {
6,685✔
2251
                        delete(c.leaf.tsub, sub)
×
2252
                        if len(c.leaf.tsub) == 0 {
×
2253
                                c.leaf.tsub = nil
×
2254
                                c.leaf.tsubt.Stop()
×
2255
                                c.leaf.tsubt = nil
×
2256
                        }
×
2257
                        return
×
2258
                }
2259
        }
2260

2261
        key := keyFromSub(sub)
14,095✔
2262
        n, ok := c.leaf.smap[key]
14,095✔
2263
        if delta < 0 && !ok {
14,963✔
2264
                return
868✔
2265
        }
868✔
2266

2267
        // We will update if its a queue, if count is zero (or negative), or we were 0 and are N > 0.
2268
        update := sub.queue != nil || (n <= 0 && n+delta > 0) || (n > 0 && n+delta <= 0)
13,227✔
2269
        n += delta
13,227✔
2270
        if n > 0 {
23,189✔
2271
                c.leaf.smap[key] = n
9,962✔
2272
        } else {
13,227✔
2273
                delete(c.leaf.smap, key)
3,265✔
2274
        }
3,265✔
2275
        if update {
21,999✔
2276
                c.sendLeafNodeSubUpdate(key, n)
8,772✔
2277
        }
8,772✔
2278
}
2279

2280
// Used to force add subjects to the subject map.
2281
func (c *client) forceAddToSmap(subj string) {
4✔
2282
        c.mu.Lock()
4✔
2283
        defer c.mu.Unlock()
4✔
2284

4✔
2285
        if c.leaf.smap == nil {
4✔
2286
                return
×
2287
        }
×
2288
        n := c.leaf.smap[subj]
4✔
2289
        if n != 0 {
5✔
2290
                return
1✔
2291
        }
1✔
2292
        // Place into the map since it was not there.
2293
        c.leaf.smap[subj] = 1
3✔
2294
        c.sendLeafNodeSubUpdate(subj, 1)
3✔
2295
}
2296

2297
// Used to force remove a subject from the subject map.
2298
func (c *client) forceRemoveFromSmap(subj string) {
1✔
2299
        c.mu.Lock()
1✔
2300
        defer c.mu.Unlock()
1✔
2301

1✔
2302
        if c.leaf.smap == nil {
1✔
2303
                return
×
2304
        }
×
2305
        n := c.leaf.smap[subj]
1✔
2306
        if n == 0 {
1✔
2307
                return
×
2308
        }
×
2309
        n--
1✔
2310
        if n == 0 {
2✔
2311
                // Remove is now zero
1✔
2312
                delete(c.leaf.smap, subj)
1✔
2313
                c.sendLeafNodeSubUpdate(subj, 0)
1✔
2314
        } else {
1✔
2315
                c.leaf.smap[subj] = n
×
2316
        }
×
2317
}
2318

2319
// Send the subscription interest change to the other side.
2320
// Lock should be held.
2321
func (c *client) sendLeafNodeSubUpdate(key string, n int32) {
8,776✔
2322
        // If we are a spoke, we need to check if we are allowed to send this subscription over to the hub.
8,776✔
2323
        if c.isSpokeLeafNode() {
10,916✔
2324
                checkPerms := true
2,140✔
2325
                if len(key) > 0 && (key[0] == '$' || key[0] == '_') {
3,453✔
2326
                        if strings.HasPrefix(key, leafNodeLoopDetectionSubjectPrefix) ||
1,313✔
2327
                                strings.HasPrefix(key, oldGWReplyPrefix) ||
1,313✔
2328
                                strings.HasPrefix(key, gwReplyPrefix) {
1,396✔
2329
                                checkPerms = false
83✔
2330
                        }
83✔
2331
                }
2332
                if checkPerms {
4,197✔
2333
                        var subject string
2,057✔
2334
                        if sep := strings.IndexByte(key, ' '); sep != -1 {
2,456✔
2335
                                subject = key[:sep]
399✔
2336
                        } else {
2,057✔
2337
                                subject = key
1,658✔
2338
                        }
1,658✔
2339
                        if !c.canSubscribe(subject) {
2,057✔
2340
                                return
×
2341
                        }
×
2342
                }
2343
        }
2344
        // If we are here we can send over to the other side.
2345
        _b := [64]byte{}
8,776✔
2346
        b := bytes.NewBuffer(_b[:0])
8,776✔
2347
        c.writeLeafSub(b, key, n)
8,776✔
2348
        c.enqueueProto(b.Bytes())
8,776✔
2349
}
2350

2351
// Helper function to build the key.
2352
func keyFromSub(sub *subscription) string {
42,680✔
2353
        var sb strings.Builder
42,680✔
2354
        sb.Grow(len(sub.subject) + len(sub.queue) + 1)
42,680✔
2355
        sb.Write(sub.subject)
42,680✔
2356
        if sub.queue != nil {
46,148✔
2357
                // Just make the key subject spc group, e.g. 'foo bar'
3,468✔
2358
                sb.WriteByte(' ')
3,468✔
2359
                sb.Write(sub.queue)
3,468✔
2360
        }
3,468✔
2361
        return sb.String()
42,680✔
2362
}
2363

2364
// Prefixes of the keys built by keyFromSubWithOrigin. "R" marks a plain routed
// subscription and "L" a routed subscription made on behalf of a leafnode, so
// the two kinds never collide. keyFromSubWithOrigin writes the byte variants
// directly into its key builder.
const (
        keyRoutedSub         = "R"
        keyRoutedSubByte     = 'R'
        keyRoutedLeafSub     = "L"
        keyRoutedLeafSubByte = 'L'
)
2370

2371
// Helper function to build the key that prevents collisions between normal
2372
// routed subscriptions and routed subscriptions on behalf of a leafnode.
2373
// Keys will look like this:
2374
// "R foo"          -> plain routed sub on "foo"
2375
// "R foo bar"      -> queue routed sub on "foo", queue "bar"
2376
// "L foo bar"      -> plain routed leaf sub on "foo", leaf "bar"
2377
// "L foo bar baz"  -> queue routed sub on "foo", queue "bar", leaf "baz"
2378
func keyFromSubWithOrigin(sub *subscription) string {
502,219✔
2379
        var sb strings.Builder
502,219✔
2380
        sb.Grow(2 + len(sub.origin) + 1 + len(sub.subject) + 1 + len(sub.queue))
502,219✔
2381
        leaf := len(sub.origin) > 0
502,219✔
2382
        if leaf {
517,572✔
2383
                sb.WriteByte(keyRoutedLeafSubByte)
15,353✔
2384
        } else {
502,219✔
2385
                sb.WriteByte(keyRoutedSubByte)
486,866✔
2386
        }
486,866✔
2387
        sb.WriteByte(' ')
502,219✔
2388
        sb.Write(sub.subject)
502,219✔
2389
        if sub.queue != nil {
528,359✔
2390
                sb.WriteByte(' ')
26,140✔
2391
                sb.Write(sub.queue)
26,140✔
2392
        }
26,140✔
2393
        if leaf {
517,572✔
2394
                sb.WriteByte(' ')
15,353✔
2395
                sb.Write(sub.origin)
15,353✔
2396
        }
15,353✔
2397
        return sb.String()
502,219✔
2398
}
2399

2400
// Lock should be held.
2401
func (c *client) writeLeafSub(w *bytes.Buffer, key string, n int32) {
32,759✔
2402
        if key == _EMPTY_ {
32,759✔
2403
                return
×
2404
        }
×
2405
        if n > 0 {
62,252✔
2406
                w.WriteString("LS+ " + key)
29,493✔
2407
                // Check for queue semantics, if found write n.
29,493✔
2408
                if strings.Contains(key, " ") {
31,648✔
2409
                        w.WriteString(" ")
2,155✔
2410
                        var b [12]byte
2,155✔
2411
                        var i = len(b)
2,155✔
2412
                        for l := n; l > 0; l /= 10 {
5,230✔
2413
                                i--
3,075✔
2414
                                b[i] = digits[l%10]
3,075✔
2415
                        }
3,075✔
2416
                        w.Write(b[i:])
2,155✔
2417
                        if c.trace {
2,155✔
2418
                                arg := fmt.Sprintf("%s %d", key, n)
×
2419
                                c.traceOutOp("LS+", []byte(arg))
×
2420
                        }
×
2421
                } else if c.trace {
27,534✔
2422
                        c.traceOutOp("LS+", []byte(key))
196✔
2423
                }
196✔
2424
        } else {
3,266✔
2425
                w.WriteString("LS- " + key)
3,266✔
2426
                if c.trace {
3,279✔
2427
                        c.traceOutOp("LS-", []byte(key))
13✔
2428
                }
13✔
2429
        }
2430
        w.WriteString(CR_LF)
32,759✔
2431
}
2432

2433
// processLeafSub will process an inbound sub request for the remote leaf node.
2434
func (c *client) processLeafSub(argo []byte) (err error) {
29,293✔
2435
        // Indicate activity.
29,293✔
2436
        c.in.subs++
29,293✔
2437

29,293✔
2438
        srv := c.srv
29,293✔
2439
        if srv == nil {
29,293✔
2440
                return nil
×
2441
        }
×
2442

2443
        // Copy so we do not reference a potentially large buffer
2444
        arg := make([]byte, len(argo))
29,293✔
2445
        copy(arg, argo)
29,293✔
2446

29,293✔
2447
        args := splitArg(arg)
29,293✔
2448
        sub := &subscription{client: c}
29,293✔
2449

29,293✔
2450
        delta := int32(1)
29,293✔
2451
        switch len(args) {
29,293✔
2452
        case 1:
27,195✔
2453
                sub.queue = nil
27,195✔
2454
        case 3:
2,098✔
2455
                sub.queue = args[1]
2,098✔
2456
                sub.qw = int32(parseSize(args[2]))
2,098✔
2457
                // TODO: (ik) We should have a non empty queue name and a queue
2,098✔
2458
                // weight >= 1. For 2.11, we may want to return an error if that
2,098✔
2459
                // is not the case, but for now just overwrite `delta` if queue
2,098✔
2460
                // weight is greater than 1 (it is possible after a reconnect/
2,098✔
2461
                // server restart to receive a queue weight > 1 for a new sub).
2,098✔
2462
                if sub.qw > 1 {
3,717✔
2463
                        delta = sub.qw
1,619✔
2464
                }
1,619✔
2465
        default:
×
2466
                return fmt.Errorf("processLeafSub Parse Error: '%s'", arg)
×
2467
        }
2468
        sub.subject = args[0]
29,293✔
2469

29,293✔
2470
        c.mu.Lock()
29,293✔
2471
        if c.isClosed() {
29,305✔
2472
                c.mu.Unlock()
12✔
2473
                return nil
12✔
2474
        }
12✔
2475

2476
        acc := c.acc
29,281✔
2477
        // Check if we have a loop.
29,281✔
2478
        ldsPrefix := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))
29,281✔
2479

29,281✔
2480
        if ldsPrefix && bytesToString(sub.subject) == acc.getLDSubject() {
29,288✔
2481
                c.mu.Unlock()
7✔
2482
                c.handleLeafNodeLoop(true)
7✔
2483
                return nil
7✔
2484
        }
7✔
2485

2486
        // Check permissions if applicable. (but exclude the $LDS, $GR and _GR_)
2487
        checkPerms := true
29,274✔
2488
        if sub.subject[0] == '$' || sub.subject[0] == '_' {
55,884✔
2489
                if ldsPrefix ||
26,610✔
2490
                        bytes.HasPrefix(sub.subject, []byte(oldGWReplyPrefix)) ||
26,610✔
2491
                        bytes.HasPrefix(sub.subject, []byte(gwReplyPrefix)) {
28,449✔
2492
                        checkPerms = false
1,839✔
2493
                }
1,839✔
2494
        }
2495

2496
        // If we are a hub check that we can publish to this subject.
2497
        if checkPerms {
56,709✔
2498
                subj := string(sub.subject)
27,435✔
2499
                if subjectIsLiteral(subj) && !c.pubAllowedFullCheck(subj, true, true) {
27,756✔
2500
                        c.mu.Unlock()
321✔
2501
                        c.leafSubPermViolation(sub.subject)
321✔
2502
                        c.Debugf(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject))
321✔
2503
                        return nil
321✔
2504
                }
321✔
2505
        }
2506

2507
        // Check if we have a maximum on the number of subscriptions.
2508
        if c.subsAtLimit() {
28,961✔
2509
                c.mu.Unlock()
8✔
2510
                c.maxSubsExceeded()
8✔
2511
                return nil
8✔
2512
        }
8✔
2513

2514
        // If we have an origin cluster associated mark that in the sub.
2515
        if rc := c.remoteCluster(); rc != _EMPTY_ {
54,980✔
2516
                sub.origin = []byte(rc)
26,035✔
2517
        }
26,035✔
2518

2519
        // Like Routes, we store local subs by account and subject and optionally queue name.
2520
        // If we have a queue it will have a trailing weight which we do not want.
2521
        if sub.queue != nil {
30,754✔
2522
                sub.sid = arg[:len(arg)-len(args[2])-1]
1,809✔
2523
        } else {
28,945✔
2524
                sub.sid = arg
27,136✔
2525
        }
27,136✔
2526
        key := bytesToString(sub.sid)
28,945✔
2527
        osub := c.subs[key]
28,945✔
2528
        updateGWs := false
28,945✔
2529
        if osub == nil {
56,426✔
2530
                c.subs[key] = sub
27,481✔
2531
                // Now place into the account sl.
27,481✔
2532
                if err := acc.sl.Insert(sub); err != nil {
27,481✔
2533
                        delete(c.subs, key)
×
2534
                        c.mu.Unlock()
×
2535
                        c.Errorf("Could not insert subscription: %v", err)
×
2536
                        c.sendErr("Invalid Subscription")
×
2537
                        return nil
×
2538
                }
×
2539
                updateGWs = srv.gateway.enabled
27,481✔
2540
        } else if sub.queue != nil {
2,927✔
2541
                // For a queue we need to update the weight.
1,463✔
2542
                delta = sub.qw - atomic.LoadInt32(&osub.qw)
1,463✔
2543
                atomic.StoreInt32(&osub.qw, sub.qw)
1,463✔
2544
                acc.sl.UpdateRemoteQSub(osub)
1,463✔
2545
        }
1,463✔
2546
        spoke := c.isSpokeLeafNode()
28,945✔
2547
        c.mu.Unlock()
28,945✔
2548

28,945✔
2549
        // Only add in shadow subs if a new sub or qsub.
28,945✔
2550
        if osub == nil {
56,426✔
2551
                if err := c.addShadowSubscriptions(acc, sub, true); err != nil {
27,481✔
2552
                        c.Errorf(err.Error())
×
2553
                }
×
2554
        }
2555

2556
        // If we are not solicited, treat leaf node subscriptions similar to a
2557
        // client subscription, meaning we forward them to routes, gateways and
2558
        // other leaf nodes as needed.
2559
        if !spoke {
39,093✔
2560
                // If we are routing add to the route map for the associated account.
10,148✔
2561
                srv.updateRouteSubscriptionMap(acc, sub, delta)
10,148✔
2562
                if updateGWs {
11,435✔
2563
                        srv.gatewayUpdateSubInterest(acc.Name, sub, delta)
1,287✔
2564
                }
1,287✔
2565
        }
2566
        // Now check on leafnode updates for other leaf nodes. We understand solicited
2567
        // and non-solicited state in this call so we will do the right thing.
2568
        acc.updateLeafNodes(sub, delta)
28,945✔
2569

28,945✔
2570
        return nil
28,945✔
2571
}
2572

2573
// If the leafnode is a solicited, set the connect delay based on default
2574
// or private option (for tests). Sends the error to the other side, log and
2575
// close the connection.
2576
func (c *client) handleLeafNodeLoop(sendErr bool) {
18✔
2577
        accName, delay := c.setLeafConnectDelayIfSoliciting(leafNodeReconnectDelayAfterLoopDetected)
18✔
2578
        errTxt := fmt.Sprintf("Loop detected for leafnode account=%q. Delaying attempt to reconnect for %v", accName, delay)
18✔
2579
        if sendErr {
27✔
2580
                c.sendErr(errTxt)
9✔
2581
        }
9✔
2582

2583
        c.Errorf(errTxt)
18✔
2584
        // If we are here with "sendErr" false, it means that this is the server
18✔
2585
        // that received the error. The other side will have closed the connection,
18✔
2586
        // but does not hurt to close here too.
18✔
2587
        c.closeConnection(ProtocolViolation)
18✔
2588
}
2589

2590
// processLeafUnsub will process an inbound unsub request for the remote leaf node.
2591
func (c *client) processLeafUnsub(arg []byte) error {
3,032✔
2592
        // Indicate any activity, so pub and sub or unsubs.
3,032✔
2593
        c.in.subs++
3,032✔
2594

3,032✔
2595
        acc := c.acc
3,032✔
2596
        srv := c.srv
3,032✔
2597

3,032✔
2598
        c.mu.Lock()
3,032✔
2599
        if c.isClosed() {
3,085✔
2600
                c.mu.Unlock()
53✔
2601
                return nil
53✔
2602
        }
53✔
2603

2604
        updateGWs := false
2,979✔
2605
        spoke := c.isSpokeLeafNode()
2,979✔
2606
        // We store local subs by account and subject and optionally queue name.
2,979✔
2607
        // LS- will have the arg exactly as the key.
2,979✔
2608
        sub, ok := c.subs[string(arg)]
2,979✔
2609
        delta := int32(1)
2,979✔
2610
        if ok && len(sub.queue) > 0 {
3,297✔
2611
                delta = sub.qw
318✔
2612
        }
318✔
2613
        c.mu.Unlock()
2,979✔
2614

2,979✔
2615
        if ok {
5,945✔
2616
                c.unsubscribe(acc, sub, true, true)
2,966✔
2617
                updateGWs = srv.gateway.enabled
2,966✔
2618
        }
2,966✔
2619

2620
        if !spoke {
3,977✔
2621
                // If we are routing subtract from the route map for the associated account.
998✔
2622
                srv.updateRouteSubscriptionMap(acc, sub, -delta)
998✔
2623
                // Gateways
998✔
2624
                if updateGWs {
1,261✔
2625
                        srv.gatewayUpdateSubInterest(acc.Name, sub, -delta)
263✔
2626
                }
263✔
2627
        }
2628
        // Now check on leafnode updates for other leaf nodes.
2629
        acc.updateLeafNodes(sub, -delta)
2,979✔
2630
        return nil
2,979✔
2631
}
2632

2633
func (c *client) processLeafHeaderMsgArgs(arg []byte) error {
481✔
2634
        // Unroll splitArgs to avoid runtime/heap issues
481✔
2635
        a := [MAX_MSG_ARGS][]byte{}
481✔
2636
        args := a[:0]
481✔
2637
        start := -1
481✔
2638
        for i, b := range arg {
31,545✔
2639
                switch b {
31,064✔
2640
                case ' ', '\t', '\r', '\n':
1,370✔
2641
                        if start >= 0 {
2,740✔
2642
                                args = append(args, arg[start:i])
1,370✔
2643
                                start = -1
1,370✔
2644
                        }
1,370✔
2645
                default:
29,694✔
2646
                        if start < 0 {
31,545✔
2647
                                start = i
1,851✔
2648
                        }
1,851✔
2649
                }
2650
        }
2651
        if start >= 0 {
962✔
2652
                args = append(args, arg[start:])
481✔
2653
        }
481✔
2654

2655
        c.pa.arg = arg
481✔
2656
        switch len(args) {
481✔
2657
        case 0, 1, 2:
×
2658
                return fmt.Errorf("processLeafHeaderMsgArgs Parse Error: '%s'", args)
×
2659
        case 3:
91✔
2660
                c.pa.reply = nil
91✔
2661
                c.pa.queues = nil
91✔
2662
                c.pa.hdb = args[1]
91✔
2663
                c.pa.hdr = parseSize(args[1])
91✔
2664
                c.pa.szb = args[2]
91✔
2665
                c.pa.size = parseSize(args[2])
91✔
2666
        case 4:
376✔
2667
                c.pa.reply = args[1]
376✔
2668
                c.pa.queues = nil
376✔
2669
                c.pa.hdb = args[2]
376✔
2670
                c.pa.hdr = parseSize(args[2])
376✔
2671
                c.pa.szb = args[3]
376✔
2672
                c.pa.size = parseSize(args[3])
376✔
2673
        default:
14✔
2674
                // args[1] is our reply indicator. Should be + or | normally.
14✔
2675
                if len(args[1]) != 1 {
14✔
2676
                        return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
2677
                }
×
2678
                switch args[1][0] {
14✔
2679
                case '+':
4✔
2680
                        c.pa.reply = args[2]
4✔
2681
                case '|':
10✔
2682
                        c.pa.reply = nil
10✔
2683
                default:
×
2684
                        return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
2685
                }
2686
                // Grab header size.
2687
                c.pa.hdb = args[len(args)-2]
14✔
2688
                c.pa.hdr = parseSize(c.pa.hdb)
14✔
2689

14✔
2690
                // Grab size.
14✔
2691
                c.pa.szb = args[len(args)-1]
14✔
2692
                c.pa.size = parseSize(c.pa.szb)
14✔
2693

14✔
2694
                // Grab queue names.
14✔
2695
                if c.pa.reply != nil {
18✔
2696
                        c.pa.queues = args[3 : len(args)-2]
4✔
2697
                } else {
14✔
2698
                        c.pa.queues = args[2 : len(args)-2]
10✔
2699
                }
10✔
2700
        }
2701
        if c.pa.hdr < 0 {
481✔
2702
                return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Header Size: '%s'", arg)
×
2703
        }
×
2704
        if c.pa.size < 0 {
481✔
2705
                return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Size: '%s'", args)
×
2706
        }
×
2707
        if c.pa.hdr > c.pa.size {
481✔
2708
                return fmt.Errorf("processLeafHeaderMsgArgs Header Size larger then TotalSize: '%s'", arg)
×
2709
        }
×
2710

2711
        // Common ones processed after check for arg length
2712
        c.pa.subject = args[0]
481✔
2713

481✔
2714
        return nil
481✔
2715
}
2716

2717
func (c *client) processLeafMsgArgs(arg []byte) error {
50,670✔
2718
        // Unroll splitArgs to avoid runtime/heap issues
50,670✔
2719
        a := [MAX_MSG_ARGS][]byte{}
50,670✔
2720
        args := a[:0]
50,670✔
2721
        start := -1
50,670✔
2722
        for i, b := range arg {
1,447,219✔
2723
                switch b {
1,396,549✔
2724
                case ' ', '\t', '\r', '\n':
98,191✔
2725
                        if start >= 0 {
196,382✔
2726
                                args = append(args, arg[start:i])
98,191✔
2727
                                start = -1
98,191✔
2728
                        }
98,191✔
2729
                default:
1,298,358✔
2730
                        if start < 0 {
1,447,219✔
2731
                                start = i
148,861✔
2732
                        }
148,861✔
2733
                }
2734
        }
2735
        if start >= 0 {
101,340✔
2736
                args = append(args, arg[start:])
50,670✔
2737
        }
50,670✔
2738

2739
        c.pa.arg = arg
50,670✔
2740
        switch len(args) {
50,670✔
2741
        case 0, 1:
×
2742
                return fmt.Errorf("processLeafMsgArgs Parse Error: '%s'", args)
×
2743
        case 2:
25,845✔
2744
                c.pa.reply = nil
25,845✔
2745
                c.pa.queues = nil
25,845✔
2746
                c.pa.szb = args[1]
25,845✔
2747
                c.pa.size = parseSize(args[1])
25,845✔
2748
        case 3:
2,290✔
2749
                c.pa.reply = args[1]
2,290✔
2750
                c.pa.queues = nil
2,290✔
2751
                c.pa.szb = args[2]
2,290✔
2752
                c.pa.size = parseSize(args[2])
2,290✔
2753
        default:
22,535✔
2754
                // args[1] is our reply indicator. Should be + or | normally.
22,535✔
2755
                if len(args[1]) != 1 {
22,535✔
2756
                        return fmt.Errorf("processLeafMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
2757
                }
×
2758
                switch args[1][0] {
22,535✔
2759
                case '+':
161✔
2760
                        c.pa.reply = args[2]
161✔
2761
                case '|':
22,374✔
2762
                        c.pa.reply = nil
22,374✔
2763
                default:
×
2764
                        return fmt.Errorf("processLeafMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
2765
                }
2766
                // Grab size.
2767
                c.pa.szb = args[len(args)-1]
22,535✔
2768
                c.pa.size = parseSize(c.pa.szb)
22,535✔
2769

22,535✔
2770
                // Grab queue names.
22,535✔
2771
                if c.pa.reply != nil {
22,696✔
2772
                        c.pa.queues = args[3 : len(args)-1]
161✔
2773
                } else {
22,535✔
2774
                        c.pa.queues = args[2 : len(args)-1]
22,374✔
2775
                }
22,374✔
2776
        }
2777
        if c.pa.size < 0 {
50,670✔
2778
                return fmt.Errorf("processLeafMsgArgs Bad or Missing Size: '%s'", args)
×
2779
        }
×
2780

2781
        // Common ones processed after check for arg length
2782
        c.pa.subject = args[0]
50,670✔
2783

50,670✔
2784
        return nil
50,670✔
2785
}
2786

2787
// processInboundLeafMsg is called to process an inbound msg from a leaf node.
2788
func (c *client) processInboundLeafMsg(msg []byte) {
49,592✔
2789
        // Update statistics
49,592✔
2790
        // The msg includes the CR_LF, so pull back out for accounting.
49,592✔
2791
        c.in.msgs++
49,592✔
2792
        c.in.bytes += int32(len(msg) - LEN_CR_LF)
49,592✔
2793

49,592✔
2794
        srv, acc, subject := c.srv, c.acc, string(c.pa.subject)
49,592✔
2795

49,592✔
2796
        // Mostly under testing scenarios.
49,592✔
2797
        if srv == nil || acc == nil {
49,594✔
2798
                return
2✔
2799
        }
2✔
2800

2801
        // Match the subscriptions. We will use our own L1 map if
2802
        // it's still valid, avoiding contention on the shared sublist.
2803
        var r *SublistResult
49,590✔
2804
        var ok bool
49,590✔
2805

49,590✔
2806
        genid := atomic.LoadUint64(&c.acc.sl.genid)
49,590✔
2807
        if genid == c.in.genid && c.in.results != nil {
96,830✔
2808
                r, ok = c.in.results[subject]
47,240✔
2809
        } else {
49,590✔
2810
                // Reset our L1 completely.
2,350✔
2811
                c.in.results = make(map[string]*SublistResult)
2,350✔
2812
                c.in.genid = genid
2,350✔
2813
        }
2,350✔
2814

2815
        // Go back to the sublist data structure.
2816
        if !ok {
72,736✔
2817
                r = c.acc.sl.Match(subject)
23,146✔
2818
                // Prune the results cache. Keeps us from unbounded growth. Random delete.
23,146✔
2819
                if len(c.in.results) >= maxResultCacheSize {
23,672✔
2820
                        n := 0
526✔
2821
                        for subj := range c.in.results {
17,884✔
2822
                                delete(c.in.results, subj)
17,358✔
2823
                                if n++; n > pruneSize {
17,884✔
2824
                                        break
526✔
2825
                                }
2826
                        }
2827
                }
2828
                // Then add the new cache entry.
2829
                c.in.results[subject] = r
23,146✔
2830
        }
2831

2832
        // Collect queue names if needed.
2833
        var qnames [][]byte
49,590✔
2834

49,590✔
2835
        // Check for no interest, short circuit if so.
49,590✔
2836
        // This is the fanout scale.
49,590✔
2837
        if len(r.psubs)+len(r.qsubs) > 0 {
98,316✔
2838
                flag := pmrNoFlag
48,726✔
2839
                // If we have queue subs in this cluster, then if we run in gateway
48,726✔
2840
                // mode and the remote gateways have queue subs, then we need to
48,726✔
2841
                // collect the queue groups this message was sent to so that we
48,726✔
2842
                // exclude them when sending to gateways.
48,726✔
2843
                if len(r.qsubs) > 0 && c.srv.gateway.enabled &&
48,726✔
2844
                        atomic.LoadInt64(&c.srv.gateway.totalQSubs) > 0 {
60,983✔
2845
                        flag |= pmrCollectQueueNames
12,257✔
2846
                }
12,257✔
2847
                // If this is a mapped subject that means the mapped interest
2848
                // is what got us here, but this might not have a queue designation
2849
                // If that is the case, make sure we ignore to process local queue subscribers.
2850
                if len(c.pa.mapped) > 0 && len(c.pa.queues) == 0 {
49,028✔
2851
                        flag |= pmrIgnoreEmptyQueueFilter
302✔
2852
                }
302✔
2853
                _, qnames = c.processMsgResults(acc, r, msg, nil, c.pa.subject, c.pa.reply, flag)
48,726✔
2854
        }
2855

2856
        // Now deal with gateways
2857
        if c.srv.gateway.enabled {
63,296✔
2858
                c.sendMsgToGateways(acc, msg, c.pa.subject, c.pa.reply, qnames)
13,706✔
2859
        }
13,706✔
2860
}
2861

2862
// Handles a subscription permission violation.
2863
// See leafPermViolation() for details.
2864
func (c *client) leafSubPermViolation(subj []byte) {
321✔
2865
        c.leafPermViolation(false, subj)
321✔
2866
}
321✔
2867

2868
// Common function to process publish or subscribe leafnode permission violation.
2869
// Sends the permission violation error to the remote, logs it and closes the connection.
2870
// If this is from a server soliciting, the reconnection will be delayed.
2871
func (c *client) leafPermViolation(pub bool, subj []byte) {
321✔
2872
        if c.isSpokeLeafNode() {
642✔
2873
                // For spokes these are no-ops since the hub server told us our permissions.
321✔
2874
                // We just need to not send these over to the other side since we will get cutoff.
321✔
2875
                return
321✔
2876
        }
321✔
2877
        // FIXME(dlc) ?
2878
        c.setLeafConnectDelayIfSoliciting(leafNodeReconnectAfterPermViolation)
×
2879
        var action string
×
2880
        if pub {
×
2881
                c.sendErr(fmt.Sprintf("Permissions Violation for Publish to %q", subj))
×
2882
                action = "Publish"
×
2883
        } else {
×
2884
                c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q", subj))
×
2885
                action = "Subscription"
×
2886
        }
×
2887
        c.Errorf("%s Violation on %q - Check other side configuration", action, subj)
×
2888
        // TODO: add a new close reason that is more appropriate?
×
2889
        c.closeConnection(ProtocolViolation)
×
2890
}
2891

2892
// Invoked from generic processErr() for LEAF connections.
2893
func (c *client) leafProcessErr(errStr string) {
30✔
2894
        // Check if we got a cluster name collision.
30✔
2895
        if strings.Contains(errStr, ErrLeafNodeHasSameClusterName.Error()) {
34✔
2896
                _, delay := c.setLeafConnectDelayIfSoliciting(leafNodeReconnectDelayAfterClusterNameSame)
4✔
2897
                c.Errorf("Leafnode connection dropped with same cluster name error. Delaying attempt to reconnect for %v", delay)
4✔
2898
                return
4✔
2899
        }
4✔
2900

2901
        // We will look for Loop detected error coming from the other side.
2902
        // If we solicit, set the connect delay.
2903
        if !strings.Contains(errStr, "Loop detected") {
43✔
2904
                return
17✔
2905
        }
17✔
2906
        c.handleLeafNodeLoop(false)
9✔
2907
}
2908

2909
// If this leaf connection solicits, sets the connect delay to the given value,
2910
// or the one from the server option's LeafNode.connDelay if one is set (for tests).
2911
// Returns the connection's account name and delay.
2912
func (c *client) setLeafConnectDelayIfSoliciting(delay time.Duration) (string, time.Duration) {
22✔
2913
        c.mu.Lock()
22✔
2914
        if c.isSolicitedLeafNode() {
35✔
2915
                if s := c.srv; s != nil {
26✔
2916
                        if srvdelay := s.getOpts().LeafNode.connDelay; srvdelay != 0 {
18✔
2917
                                delay = srvdelay
5✔
2918
                        }
5✔
2919
                }
2920
                c.leaf.remote.setConnectDelay(delay)
13✔
2921
        }
2922
        accName := c.acc.Name
22✔
2923
        c.mu.Unlock()
22✔
2924
        return accName, delay
22✔
2925
}
2926

2927
// For the given remote Leafnode configuration, this function returns
2928
// if TLS is required, and if so, will return a clone of the TLS Config
2929
// (since some fields will be changed during handshake), the TLS server
2930
// name that is remembered, and the TLS timeout.
2931
func (c *client) leafNodeGetTLSConfigForSolicit(remote *leafNodeCfg) (bool, *tls.Config, string, float64) {
1,722✔
2932
        var (
1,722✔
2933
                tlsConfig  *tls.Config
1,722✔
2934
                tlsName    string
1,722✔
2935
                tlsTimeout float64
1,722✔
2936
        )
1,722✔
2937

1,722✔
2938
        remote.RLock()
1,722✔
2939
        defer remote.RUnlock()
1,722✔
2940

1,722✔
2941
        tlsRequired := remote.TLS || remote.TLSConfig != nil
1,722✔
2942
        if tlsRequired {
1,796✔
2943
                if remote.TLSConfig != nil {
125✔
2944
                        tlsConfig = remote.TLSConfig.Clone()
51✔
2945
                } else {
74✔
2946
                        tlsConfig = &tls.Config{MinVersion: tls.VersionTLS12}
23✔
2947
                }
23✔
2948
                tlsName = remote.tlsName
74✔
2949
                tlsTimeout = remote.TLSTimeout
74✔
2950
                if tlsTimeout == 0 {
114✔
2951
                        tlsTimeout = float64(TLS_TIMEOUT / time.Second)
40✔
2952
                }
40✔
2953
        }
2954

2955
        return tlsRequired, tlsConfig, tlsName, tlsTimeout
1,722✔
2956
}
2957

2958
// Initiates the LeafNode Websocket connection by:
2959
// - doing the TLS handshake if needed
2960
// - sending the HTTP request
2961
// - waiting for the HTTP response
2962
//
2963
// Since some bufio reader is used to consume the HTTP response, this function
2964
// returns the slice of buffered bytes (if any) so that the readLoop that will
2965
// be started after that consume those first before reading from the socket.
2966
// The boolean
2967
//
2968
// Lock held on entry.
2969
func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remote *leafNodeCfg) ([]byte, ClosedState, error) {
43✔
2970
        remote.RLock()
43✔
2971
        compress := remote.Websocket.Compression
43✔
2972
        // By default the server will mask outbound frames, but it can be disabled with this option.
43✔
2973
        noMasking := remote.Websocket.NoMasking
43✔
2974
        infoTimeout := remote.FirstInfoTimeout
43✔
2975
        remote.RUnlock()
43✔
2976
        // Will do the client-side TLS handshake if needed.
43✔
2977
        tlsRequired, err := c.leafClientHandshakeIfNeeded(remote, opts)
43✔
2978
        if err != nil {
47✔
2979
                // 0 will indicate that the connection was already closed
4✔
2980
                return nil, 0, err
4✔
2981
        }
4✔
2982

2983
        // For http request, we need the passed URL to contain either http or https scheme.
2984
        scheme := "http"
39✔
2985
        if tlsRequired {
47✔
2986
                scheme = "https"
8✔
2987
        }
8✔
2988
        // We will use the `/leafnode` path to tell the accepting WS server that it should
2989
        // create a LEAF connection, not a CLIENT.
2990
        // In case we use the user's URL path in the future, make sure we append the user's
2991
        // path to our `/leafnode` path.
2992
        lpath := leafNodeWSPath
39✔
2993
        if curPath := rURL.EscapedPath(); curPath != _EMPTY_ {
60✔
2994
                if curPath[0] == '/' {
42✔
2995
                        curPath = curPath[1:]
21✔
2996
                }
21✔
2997
                lpath = path.Join(curPath, lpath)
21✔
2998
        } else {
18✔
2999
                lpath = lpath[1:]
18✔
3000
        }
18✔
3001
        ustr := fmt.Sprintf("%s://%s/%s", scheme, rURL.Host, lpath)
39✔
3002
        u, _ := url.Parse(ustr)
39✔
3003
        req := &http.Request{
39✔
3004
                Method:     "GET",
39✔
3005
                URL:        u,
39✔
3006
                Proto:      "HTTP/1.1",
39✔
3007
                ProtoMajor: 1,
39✔
3008
                ProtoMinor: 1,
39✔
3009
                Header:     make(http.Header),
39✔
3010
                Host:       u.Host,
39✔
3011
        }
39✔
3012
        wsKey, err := wsMakeChallengeKey()
39✔
3013
        if err != nil {
39✔
3014
                return nil, WriteError, err
×
3015
        }
×
3016

3017
        req.Header["Upgrade"] = []string{"websocket"}
39✔
3018
        req.Header["Connection"] = []string{"Upgrade"}
39✔
3019
        req.Header["Sec-WebSocket-Key"] = []string{wsKey}
39✔
3020
        req.Header["Sec-WebSocket-Version"] = []string{"13"}
39✔
3021
        if compress {
48✔
3022
                req.Header.Add("Sec-WebSocket-Extensions", wsPMCReqHeaderValue)
9✔
3023
        }
9✔
3024
        if noMasking {
49✔
3025
                req.Header.Add(wsNoMaskingHeader, wsNoMaskingValue)
10✔
3026
        }
10✔
3027
        c.nc.SetDeadline(time.Now().Add(infoTimeout))
39✔
3028
        if err := req.Write(c.nc); err != nil {
39✔
3029
                return nil, WriteError, err
×
3030
        }
×
3031

3032
        var resp *http.Response
39✔
3033

39✔
3034
        br := bufio.NewReaderSize(c.nc, MAX_CONTROL_LINE_SIZE)
39✔
3035
        resp, err = http.ReadResponse(br, req)
39✔
3036
        if err == nil &&
39✔
3037
                (resp.StatusCode != 101 ||
39✔
3038
                        !strings.EqualFold(resp.Header.Get("Upgrade"), "websocket") ||
39✔
3039
                        !strings.EqualFold(resp.Header.Get("Connection"), "upgrade") ||
39✔
3040
                        resp.Header.Get("Sec-Websocket-Accept") != wsAcceptKey(wsKey)) {
40✔
3041

1✔
3042
                err = fmt.Errorf("invalid websocket connection")
1✔
3043
        }
1✔
3044
        // Check compression extension...
3045
        if err == nil && c.ws.compress {
48✔
3046
                // Check that not only permessage-deflate extension is present, but that
9✔
3047
                // we also have server and client no context take over.
9✔
3048
                srvCompress, noCtxTakeover := wsPMCExtensionSupport(resp.Header, false)
9✔
3049

9✔
3050
                // If server does not support compression, then simply disable it in our side.
9✔
3051
                if !srvCompress {
13✔
3052
                        c.ws.compress = false
4✔
3053
                } else if !noCtxTakeover {
9✔
3054
                        err = fmt.Errorf("compression negotiation error")
×
3055
                }
×
3056
        }
3057
        // Same for no masking...
3058
        if err == nil && noMasking {
49✔
3059
                // Check if server accepts no masking
10✔
3060
                if resp.Header.Get(wsNoMaskingHeader) != wsNoMaskingValue {
11✔
3061
                        // Nope, need to mask our writes as any client would do.
1✔
3062
                        c.ws.maskwrite = true
1✔
3063
                }
1✔
3064
        }
3065
        if resp != nil {
67✔
3066
                resp.Body.Close()
28✔
3067
        }
28✔
3068
        if err != nil {
51✔
3069
                return nil, ReadError, err
12✔
3070
        }
12✔
3071
        c.Debugf("Leafnode compression=%v masking=%v", c.ws.compress, c.ws.maskwrite)
27✔
3072

27✔
3073
        var preBuf []byte
27✔
3074
        // We have to slurp whatever is in the bufio reader and pass that to the readloop.
27✔
3075
        if n := br.Buffered(); n != 0 {
28✔
3076
                preBuf, _ = br.Peek(n)
1✔
3077
        }
1✔
3078
        return preBuf, 0, nil
27✔
3079
}
3080

3081
// connectProcessTimeout bounds how long a remote leafnode connection may wait,
// after its CONNECT was sent by leafNodeResumeConnectProcess, for
// leafNodeFinishConnectProcess to run before it is closed as stale.
const connectProcessTimeout time.Duration = 2 * time.Second
3082

3083
// This is invoked for remote LEAF remote connections after processing the INFO
3084
// protocol.
3085
func (s *Server) leafNodeResumeConnectProcess(c *client) {
606✔
3086
        clusterName := s.ClusterName()
606✔
3087

606✔
3088
        c.mu.Lock()
606✔
3089
        if c.isClosed() {
606✔
3090
                c.mu.Unlock()
×
3091
                return
×
3092
        }
×
3093
        if err := c.sendLeafConnect(clusterName, c.headers); err != nil {
608✔
3094
                c.mu.Unlock()
2✔
3095
                c.closeConnection(WriteError)
2✔
3096
                return
2✔
3097
        }
2✔
3098

3099
        // Spin up the write loop.
3100
        s.startGoRoutine(func() { c.writeLoop() })
1,208✔
3101

3102
        // timeout leafNodeFinishConnectProcess
3103
        c.ping.tmr = time.AfterFunc(connectProcessTimeout, func() {
604✔
3104
                c.mu.Lock()
×
3105
                // check if leafNodeFinishConnectProcess was called and prevent later leafNodeFinishConnectProcess
×
3106
                if !c.flags.setIfNotSet(connectProcessFinished) {
×
3107
                        c.mu.Unlock()
×
3108
                        return
×
3109
                }
×
3110
                clearTimer(&c.ping.tmr)
×
3111
                closed := c.isClosed()
×
3112
                c.mu.Unlock()
×
3113
                if !closed {
×
3114
                        c.sendErrAndDebug("Stale Leaf Node Connection - Closing")
×
3115
                        c.closeConnection(StaleConnection)
×
3116
                }
×
3117
        })
3118
        c.mu.Unlock()
604✔
3119
        c.Debugf("Remote leafnode connect msg sent")
604✔
3120
}
3121

3122
// This is invoked for remote LEAF connections after processing the INFO
3123
// protocol and leafNodeResumeConnectProcess.
3124
// This will send LS+ the CONNECT protocol and register the leaf node.
3125
func (s *Server) leafNodeFinishConnectProcess(c *client) {
588✔
3126
        c.mu.Lock()
588✔
3127
        if !c.flags.setIfNotSet(connectProcessFinished) {
588✔
3128
                c.mu.Unlock()
×
3129
                return
×
3130
        }
×
3131
        if c.isClosed() {
588✔
3132
                c.mu.Unlock()
×
3133
                s.removeLeafNodeConnection(c)
×
3134
                return
×
3135
        }
×
3136
        remote := c.leaf.remote
588✔
3137
        // Check if we will need to send the system connect event.
588✔
3138
        remote.RLock()
588✔
3139
        sendSysConnectEvent := remote.Hub
588✔
3140
        remote.RUnlock()
588✔
3141

588✔
3142
        // Capture account before releasing lock
588✔
3143
        acc := c.acc
588✔
3144
        // cancel connectProcessTimeout
588✔
3145
        clearTimer(&c.ping.tmr)
588✔
3146
        c.mu.Unlock()
588✔
3147

588✔
3148
        // Make sure we register with the account here.
588✔
3149
        if err := c.registerWithAccount(acc); err != nil {
590✔
3150
                if err == ErrTooManyAccountConnections {
2✔
3151
                        c.maxAccountConnExceeded()
×
3152
                        return
×
3153
                } else if err == ErrLeafNodeLoop {
4✔
3154
                        c.handleLeafNodeLoop(true)
2✔
3155
                        return
2✔
3156
                }
2✔
3157
                c.Errorf("Registering leaf with account %s resulted in error: %v", acc.Name, err)
×
3158
                c.closeConnection(ProtocolViolation)
×
3159
                return
×
3160
        }
3161
        s.addLeafNodeConnection(c, _EMPTY_, _EMPTY_, false)
586✔
3162
        s.initLeafNodeSmapAndSendSubs(c)
586✔
3163
        if sendSysConnectEvent {
592✔
3164
                s.sendLeafNodeConnect(acc)
6✔
3165
        }
6✔
3166

3167
        // The above functions are not atomically under the client
3168
        // lock doing those operations. It is possible - since we
3169
        // have started the read/write loops - that the connection
3170
        // is closed before or in between. This would leave the
3171
        // closed LN connection possible registered with the account
3172
        // and/or the server's leafs map. So check if connection
3173
        // is closed, and if so, manually cleanup.
3174
        c.mu.Lock()
586✔
3175
        closed := c.isClosed()
586✔
3176
        if !closed {
1,172✔
3177
                c.setFirstPingTimer()
586✔
3178
        }
586✔
3179
        c.mu.Unlock()
586✔
3180
        if closed {
586✔
3181
                s.removeLeafNodeConnection(c)
×
3182
                if prev := acc.removeClient(c); prev == 1 {
×
3183
                        s.decActiveAccounts()
×
3184
                }
×
3185
        }
3186
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc