• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 14099044173

26 Mar 2025 09:39PM UTC coverage: 85.54% (-0.05%) from 85.585%
14099044173

push

github

web-flow
[FIXED] JetStream clustering: observer incorrectly elected leader (#6730)

In some leafnodes setups with shared system account, the leafnode is
supposed to set itself as observer and step down so that only servers in
the hub cluster can be elected leaders. A test was flapping but it turns
out that there was a race condition that could cause a server that has
chosen to be in observer mode and step down to still end-up being
elected.

Added a dedicated test that was able to reproduce the issue more easily
than the leafnode test.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>

69358 of 81083 relevant lines covered (85.54%)

500082.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.55
/server/leafnode.go
1
// Copyright 2019-2025 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bufio"
18
        "bytes"
19
        "crypto/tls"
20
        "encoding/base64"
21
        "encoding/json"
22
        "fmt"
23
        "io"
24
        "math/rand"
25
        "net"
26
        "net/http"
27
        "net/url"
28
        "os"
29
        "path"
30
        "reflect"
31
        "regexp"
32
        "runtime"
33
        "strconv"
34
        "strings"
35
        "sync"
36
        "sync/atomic"
37
        "time"
38

39
        "github.com/klauspost/compress/s2"
40
        "github.com/nats-io/jwt/v2"
41
        "github.com/nats-io/nkeys"
42
        "github.com/nats-io/nuid"
43
)
44

45
const (
	// Warning logged when a user configures a solicited leafnode with TLS
	// certificate verification disabled (insecure).
	leafnodeTLSInsecureWarning = "TLS certificate chain and hostname of solicited leafnodes will not be verified. DO NOT USE IN PRODUCTION!"

	// When a loop is detected, delay the reconnect of solicited connection.
	leafNodeReconnectDelayAfterLoopDetected = 30 * time.Second

	// When a server receives a message causing a permission violation, the
	// connection is closed and it won't attempt to reconnect for that long.
	leafNodeReconnectAfterPermViolation = 30 * time.Second

	// Reconnect delay used when we detect that we have the same cluster
	// name as the hub, which would otherwise cause interest loops.
	leafNodeReconnectDelayAfterClusterNameSame = 30 * time.Second

	// Prefix for loop detection subject
	leafNodeLoopDetectionSubjectPrefix = "$LDS."

	// Path added to URL to indicate to WS server that the connection is a
	// LEAF connection as opposed to a CLIENT.
	leafNodeWSPath = "/leafnode"

	// This is the time the server will wait, when receiving a CONNECT,
	// before closing the connection if the required minimum version is not met.
	leafNodeWaitBeforeClose = 5 * time.Second
)
70

71
// leaf holds the leafnode-specific state attached to a client of kind LEAF.
type leaf struct {
	// We have any auth stuff here for solicited connections.
	remote *leafNodeCfg
	// isSpoke tells us what role we are playing.
	// Used when we receive a connection but the other side tells us they are a hub.
	isSpoke bool
	// remoteCluster is when we are a hub but the spoke leafnode is part of a cluster.
	remoteCluster string
	// remoteServer holds onto the remote server's name or ID.
	remoteServer string
	// domain name of remote server
	remoteDomain string
	// account name of remote server
	remoteAccName string
	// Used to suppress sub and unsub interest. Same as routes but our audience
	// here is tied to this leaf node. This will hold all subscriptions except this
	// leaf nodes. This represents all the interest we want to send to the other side.
	smap map[string]int32
	// This map will contain all the subscriptions that have been added to the smap
	// during initLeafNodeSmapAndSendSubs. It is short lived and is there to avoid
	// a race between processing of a sub where sub is added to account sublist but
	// updateSmap has not been called on that "thread", while in the LN readloop,
	// when processing CONNECT, initLeafNodeSmapAndSendSubs is invoked and adds
	// this subscription to smap. When processing of the sub then calls updateSmap,
	// we would add it a second time in the smap causing later unsub to suppress the LS-.
	tsub  map[*subscription]struct{}
	tsubt *time.Timer
	// Selected compression mode, which may be different from the server configured mode.
	compression string
	// This is for GW map replies.
	gwSub *subscription
}
103

104
// Used for remote (solicited) leafnodes.
105
// Used for remote (solicited) leafnodes.
// Wraps the configured RemoteLeafOpts with runtime state; guarded by the
// embedded RWMutex.
type leafNodeCfg struct {
	sync.RWMutex
	*RemoteLeafOpts
	urls           []*url.URL    // Configured URLs plus any received via async INFO.
	curURL         *url.URL      // URL currently/last used for the connection attempt.
	tlsName        string        // Hostname saved off for TLS verification when dialing bare IPs.
	username       string        // Username captured from a configured URL's userinfo.
	password       string        // Password captured from a configured URL's userinfo.
	perms          *Permissions  // Deny import/export permissions built from config.
	connDelay      time.Duration // Delay before a connect, could be used while detecting loop condition, etc..
	jsMigrateTimer *time.Timer   // Timer that triggers JetStream asset migration while the remote is down.
}
117

118
// Check to see if this is a solicited leafnode. We do special processing for solicited.
119
func (c *client) isSolicitedLeafNode() bool {
1,973✔
120
        return c.kind == LEAF && c.leaf.remote != nil
1,973✔
121
}
1,973✔
122

123
// Returns true if this is a solicited leafnode and is not configured to be treated as a hub or a receiving
124
// connection leafnode where the otherside has declared itself to be the hub.
125
func (c *client) isSpokeLeafNode() bool {
8,559,058✔
126
        return c.kind == LEAF && c.leaf.isSpoke
8,559,058✔
127
}
8,559,058✔
128

129
func (c *client) isHubLeafNode() bool {
17,451✔
130
        return c.kind == LEAF && !c.leaf.isSpoke
17,451✔
131
}
17,451✔
132

133
// This will spin up go routines to solicit the remote leaf node connections.
134
// This will spin up go routines to solicit the remote leaf node connections.
// For each remote it registers a leafNodeCfg on the server, emits notices when
// a remote bound to the system account (or its credentials) carries restricted
// permissions, then starts a go routine that dials the remote.
func (s *Server) solicitLeafNodeRemotes(remotes []*RemoteLeafOpts) {
	sysAccName := _EMPTY_
	sAcc := s.SystemAccount()
	if sAcc != nil {
		sysAccName = sAcc.Name
	}
	addRemote := func(r *RemoteLeafOpts, isSysAccRemote bool) *leafNodeCfg {
		s.mu.Lock()
		remote := newLeafNodeCfg(r)
		creds := remote.Credentials
		accName := remote.LocalAccount
		s.leafRemoteCfgs = append(s.leafRemoteCfgs, remote)
		// Print notice if the remote is for the system account and uses
		// restricted import/export permissions.
		if isSysAccRemote {
			if len(remote.DenyExports) > 0 {
				s.Noticef("Remote for System Account uses restricted export permissions")
			}
			if len(remote.DenyImports) > 0 {
				s.Noticef("Remote for System Account uses restricted import permissions")
			}
		}
		s.mu.Unlock()
		// If a credentials file is configured, sanity-check its contents
		// (seed and user JWT) and notice when the JWT carries restricted
		// pub/sub/resp permissions. Errors are logged, not fatal.
		if creds != _EMPTY_ {
			contents, err := os.ReadFile(creds)
			// Zero out the credentials from memory once done with them.
			defer wipeSlice(contents)
			if err != nil {
				s.Errorf("Error reading LeafNode Remote Credentials file %q: %v", creds, err)
			} else if items := credsRe.FindAllSubmatch(contents, -1); len(items) < 2 {
				s.Errorf("LeafNode Remote Credentials file %q malformed", creds)
			} else if _, err := nkeys.FromSeed(items[1][1]); err != nil {
				s.Errorf("LeafNode Remote Credentials file %q has malformed seed", creds)
			} else if uc, err := jwt.DecodeUserClaims(string(items[0][1])); err != nil {
				s.Errorf("LeafNode Remote Credentials file %q has malformed user jwt", creds)
			} else if isSysAccRemote {
				if !uc.Permissions.Pub.Empty() || !uc.Permissions.Sub.Empty() || uc.Permissions.Resp != nil {
					s.Noticef("LeafNode Remote for System Account uses credentials file %q with restricted permissions", creds)
				}
			} else {
				if !uc.Permissions.Pub.Empty() || !uc.Permissions.Sub.Empty() || uc.Permissions.Resp != nil {
					s.Noticef("LeafNode Remote for Account %s uses credentials file %q with restricted permissions", accName, creds)
				}
			}
		}
		return remote
	}
	for _, r := range remotes {
		remote := addRemote(r, r.LocalAccount == sysAccName)
		s.startGoRoutine(func() { s.connectToRemoteLeafNode(remote, true) })
	}
}
184

185
func (s *Server) remoteLeafNodeStillValid(remote *leafNodeCfg) bool {
1,786✔
186
        for _, ri := range s.getOpts().LeafNode.Remotes {
3,932✔
187
                // FIXME(dlc) - What about auth changes?
2,146✔
188
                if reflect.DeepEqual(ri.URLs, remote.URLs) {
3,932✔
189
                        return true
1,786✔
190
                }
1,786✔
191
        }
192
        return false
×
193
}
194

195
// Ensure that leafnode is properly configured.
196
// Ensure that leafnode is properly configured.
// Validation covers: auth options, local account existence (config mode),
// nkey requirements (operator mode), compression settings, websocket URL
// consistency in remotes, minimum version, and the system-account
// requirement when both leafnodes and gateways are enabled.
func validateLeafNode(o *Options) error {
	if err := validateLeafNodeAuthOptions(o); err != nil {
		return err
	}

	// Users can bind to any local account, if its empty we will assume the $G account.
	for _, r := range o.LeafNode.Remotes {
		if r.LocalAccount == _EMPTY_ {
			r.LocalAccount = globalAccountName
		}
	}

	// In local config mode, check that leafnode configuration refers to accounts that exist.
	if len(o.TrustedOperators) == 0 {
		accNames := map[string]struct{}{}
		for _, a := range o.Accounts {
			accNames[a.Name] = struct{}{}
		}
		// global account is always created
		accNames[DEFAULT_GLOBAL_ACCOUNT] = struct{}{}
		// in the context of leaf nodes, empty account means global account
		accNames[_EMPTY_] = struct{}{}
		// system account either exists or, if not disabled, will be created
		if o.SystemAccount == _EMPTY_ && !o.NoSystemAccount {
			accNames[DEFAULT_SYSTEM_ACCOUNT] = struct{}{}
		}
		checkAccountExists := func(accName string, cfgType string) error {
			if _, ok := accNames[accName]; !ok {
				return fmt.Errorf("cannot find local account %q specified in leafnode %s", accName, cfgType)
			}
			return nil
		}
		if err := checkAccountExists(o.LeafNode.Account, "authorization"); err != nil {
			return err
		}
		for _, lu := range o.LeafNode.Users {
			if lu.Account == nil { // means global account
				continue
			}
			if err := checkAccountExists(lu.Account.Name, "authorization"); err != nil {
				return err
			}
		}
		for _, r := range o.LeafNode.Remotes {
			if err := checkAccountExists(r.LocalAccount, "remote"); err != nil {
				return err
			}
		}
	} else {
		// Operator mode: accounts are identified by public nkeys, not names.
		if len(o.LeafNode.Users) != 0 {
			return fmt.Errorf("operator mode does not allow specifying users in leafnode config")
		}
		for _, r := range o.LeafNode.Remotes {
			if !nkeys.IsValidPublicAccountKey(r.LocalAccount) {
				return fmt.Errorf(
					"operator mode requires account nkeys in remotes. " +
						"Please add an `account` key to each remote in your `leafnodes` section, to assign it to an account. " +
						"Each account value should be a 56 character public key, starting with the letter 'A'")
			}
		}
		if o.LeafNode.Port != 0 && o.LeafNode.Account != "" && !nkeys.IsValidPublicAccountKey(o.LeafNode.Account) {
			return fmt.Errorf("operator mode and non account nkeys are incompatible")
		}
	}

	// Validate compression settings
	if o.LeafNode.Compression.Mode != _EMPTY_ {
		if err := validateAndNormalizeCompressionOption(&o.LeafNode.Compression, CompressionS2Auto); err != nil {
			return err
		}
	}

	// If a remote has a websocket scheme, all need to have it.
	for _, rcfg := range o.LeafNode.Remotes {
		if len(rcfg.URLs) >= 2 {
			firstIsWS, ok := isWSURL(rcfg.URLs[0]), true
			for i := 1; i < len(rcfg.URLs); i++ {
				u := rcfg.URLs[i]
				// Fail if this URL's websocket-ness differs from the first one's.
				if isWS := isWSURL(u); isWS && !firstIsWS || !isWS && firstIsWS {
					ok = false
					break
				}
			}
			if !ok {
				return fmt.Errorf("remote leaf node configuration cannot have a mix of websocket and non-websocket urls: %q", redactURLList(rcfg.URLs))
			}
		}
		// Validate compression settings
		if rcfg.Compression.Mode != _EMPTY_ {
			if err := validateAndNormalizeCompressionOption(&rcfg.Compression, CompressionS2Auto); err != nil {
				return err
			}
		}
	}

	// Remaining checks apply only when the leafnode accept port is enabled.
	if o.LeafNode.Port == 0 {
		return nil
	}

	// If MinVersion is defined, check that it is valid.
	if mv := o.LeafNode.MinVersion; mv != _EMPTY_ {
		if err := checkLeafMinVersionConfig(mv); err != nil {
			return err
		}
	}

	// The checks below will be done only when detecting that we are configured
	// with gateways. So if an option validation needs to be done regardless,
	// it MUST be done before this point!

	if o.Gateway.Name == _EMPTY_ && o.Gateway.Port == 0 {
		return nil
	}
	// If we are here we have both leaf nodes and gateways defined, make sure there
	// is a system account defined.
	if o.SystemAccount == _EMPTY_ {
		return fmt.Errorf("leaf nodes and gateways (both being defined) require a system account to also be configured")
	}
	if err := validatePinnedCerts(o.LeafNode.TLSPinnedCerts); err != nil {
		return fmt.Errorf("leafnode: %v", err)
	}
	return nil
}
319

320
func checkLeafMinVersionConfig(mv string) error {
8✔
321
        if ok, err := versionAtLeastCheckError(mv, 2, 8, 0); !ok || err != nil {
12✔
322
                if err != nil {
6✔
323
                        return fmt.Errorf("invalid leafnode's minimum version: %v", err)
2✔
324
                } else {
4✔
325
                        return fmt.Errorf("the minimum version should be at least 2.8.0")
2✔
326
                }
2✔
327
        }
328
        return nil
4✔
329
}
330

331
// Used to validate user names in LeafNode configuration.
332
// - rejects mix of single and multiple users.
333
// - rejects duplicate user names.
334
func validateLeafNodeAuthOptions(o *Options) error {
7,872✔
335
        if len(o.LeafNode.Users) == 0 {
15,724✔
336
                return nil
7,852✔
337
        }
7,852✔
338
        if o.LeafNode.Username != _EMPTY_ {
22✔
339
                return fmt.Errorf("can not have a single user/pass and a users array")
2✔
340
        }
2✔
341
        if o.LeafNode.Nkey != _EMPTY_ {
18✔
342
                return fmt.Errorf("can not have a single nkey and a users array")
×
343
        }
×
344
        users := map[string]struct{}{}
18✔
345
        for _, u := range o.LeafNode.Users {
42✔
346
                if _, exists := users[u.Username]; exists {
26✔
347
                        return fmt.Errorf("duplicate user %q detected in leafnode authorization", u.Username)
2✔
348
                }
2✔
349
                users[u.Username] = struct{}{}
22✔
350
        }
351
        return nil
16✔
352
}
353

354
// Update remote LeafNode TLS configurations after a config reload.
355
func (s *Server) updateRemoteLeafNodesTLSConfig(opts *Options) {
11✔
356
        max := len(opts.LeafNode.Remotes)
11✔
357
        if max == 0 {
11✔
358
                return
×
359
        }
×
360

361
        s.mu.RLock()
11✔
362
        defer s.mu.RUnlock()
11✔
363

11✔
364
        // Changes in the list of remote leaf nodes is not supported.
11✔
365
        // However, make sure that we don't go over the arrays.
11✔
366
        if len(s.leafRemoteCfgs) < max {
11✔
367
                max = len(s.leafRemoteCfgs)
×
368
        }
×
369
        for i := 0; i < max; i++ {
22✔
370
                ro := opts.LeafNode.Remotes[i]
11✔
371
                cfg := s.leafRemoteCfgs[i]
11✔
372
                if ro.TLSConfig != nil {
13✔
373
                        cfg.Lock()
2✔
374
                        cfg.TLSConfig = ro.TLSConfig.Clone()
2✔
375
                        cfg.TLSHandshakeFirst = ro.TLSHandshakeFirst
2✔
376
                        cfg.Unlock()
2✔
377
                }
2✔
378
        }
379
}
380

381
// Waits for the configured reconnect interval, then solicits the remote
// leafnode connection again. Runs as a server go routine.
// Note on grWG accounting: connectToRemoteLeafNode calls s.grWG.Done()
// via its own defer, so Done() is only called here on the early-exit
// (server shutdown) path.
func (s *Server) reConnectToRemoteLeafNode(remote *leafNodeCfg) {
	delay := s.getOpts().LeafNode.ReconnectInterval
	select {
	case <-time.After(delay):
	case <-s.quitCh:
		s.grWG.Done()
		return
	}
	s.connectToRemoteLeafNode(remote, false)
}
391

392
// Creates a leafNodeCfg object that wraps the RemoteLeafOpts.
393
func newLeafNodeCfg(remote *RemoteLeafOpts) *leafNodeCfg {
675✔
394
        cfg := &leafNodeCfg{
675✔
395
                RemoteLeafOpts: remote,
675✔
396
                urls:           make([]*url.URL, 0, len(remote.URLs)),
675✔
397
        }
675✔
398
        if len(remote.DenyExports) > 0 || len(remote.DenyImports) > 0 {
683✔
399
                perms := &Permissions{}
8✔
400
                if len(remote.DenyExports) > 0 {
16✔
401
                        perms.Publish = &SubjectPermission{Deny: remote.DenyExports}
8✔
402
                }
8✔
403
                if len(remote.DenyImports) > 0 {
15✔
404
                        perms.Subscribe = &SubjectPermission{Deny: remote.DenyImports}
7✔
405
                }
7✔
406
                cfg.perms = perms
8✔
407
        }
408
        // Start with the one that is configured. We will add to this
409
        // array when receiving async leafnode INFOs.
410
        cfg.urls = append(cfg.urls, cfg.URLs...)
675✔
411
        // If allowed to randomize, do it on our copy of URLs
675✔
412
        if !remote.NoRandomize {
1,348✔
413
                rand.Shuffle(len(cfg.urls), func(i, j int) {
1,081✔
414
                        cfg.urls[i], cfg.urls[j] = cfg.urls[j], cfg.urls[i]
408✔
415
                })
408✔
416
        }
417
        // If we are TLS make sure we save off a proper servername if possible.
418
        // Do same for user/password since we may need them to connect to
419
        // a bare URL that we get from INFO protocol.
420
        for _, u := range cfg.urls {
1,788✔
421
                cfg.saveTLSHostname(u)
1,113✔
422
                cfg.saveUserPassword(u)
1,113✔
423
                // If the url(s) have the "wss://" scheme, and we don't have a TLS
1,113✔
424
                // config, mark that we should be using TLS anyway.
1,113✔
425
                if !cfg.TLS && isWSSURL(u) {
1,114✔
426
                        cfg.TLS = true
1✔
427
                }
1✔
428
        }
429
        return cfg
675✔
430
}
431

432
// Will pick an URL from the list of available URLs.
433
func (cfg *leafNodeCfg) pickNextURL() *url.URL {
1,027✔
434
        cfg.Lock()
1,027✔
435
        defer cfg.Unlock()
1,027✔
436
        // If the current URL is the first in the list and we have more than
1,027✔
437
        // one URL, then move that one to end of the list.
1,027✔
438
        if cfg.curURL != nil && len(cfg.urls) > 1 && urlsAreEqual(cfg.curURL, cfg.urls[0]) {
1,173✔
439
                first := cfg.urls[0]
146✔
440
                copy(cfg.urls, cfg.urls[1:])
146✔
441
                cfg.urls[len(cfg.urls)-1] = first
146✔
442
        }
146✔
443
        cfg.curURL = cfg.urls[0]
1,027✔
444
        return cfg.curURL
1,027✔
445
}
446

447
// Returns the current URL
448
func (cfg *leafNodeCfg) getCurrentURL() *url.URL {
88✔
449
        cfg.RLock()
88✔
450
        defer cfg.RUnlock()
88✔
451
        return cfg.curURL
88✔
452
}
88✔
453

454
// Returns how long the server should wait before attempting
455
// to solicit a remote leafnode connection.
456
func (cfg *leafNodeCfg) getConnectDelay() time.Duration {
859✔
457
        cfg.RLock()
859✔
458
        delay := cfg.connDelay
859✔
459
        cfg.RUnlock()
859✔
460
        return delay
859✔
461
}
859✔
462

463
// Sets the connect delay.
464
func (cfg *leafNodeCfg) setConnectDelay(delay time.Duration) {
152✔
465
        cfg.Lock()
152✔
466
        cfg.connDelay = delay
152✔
467
        cfg.Unlock()
152✔
468
}
152✔
469

470
// Ensure that non-exported options (used in tests) have
471
// been properly set.
472
func (s *Server) setLeafNodeNonExportedOptions() {
6,304✔
473
        opts := s.getOpts()
6,304✔
474
        s.leafNodeOpts.dialTimeout = opts.LeafNode.dialTimeout
6,304✔
475
        if s.leafNodeOpts.dialTimeout == 0 {
12,607✔
476
                // Use same timeouts as routes for now.
6,303✔
477
                s.leafNodeOpts.dialTimeout = DEFAULT_ROUTE_DIAL
6,303✔
478
        }
6,303✔
479
        s.leafNodeOpts.resolver = opts.LeafNode.resolver
6,304✔
480
        if s.leafNodeOpts.resolver == nil {
12,604✔
481
                s.leafNodeOpts.resolver = net.DefaultResolver
6,300✔
482
        }
6,300✔
483
}
484

485
// Delay applied to the first connect of a remote bound to a shared system
// account when the server is not standalone, to allow gathering some
// cluster info first (see connectToRemoteLeafNode).
const sharedSysAccDelay = 250 * time.Millisecond
486

487
// connectToRemoteLeafNode solicits the connection to the given remote,
// retrying (with jitter) until it succeeds, the server shuts down, or the
// remote is removed by a config reload. On connection failures it may arm
// a timer (or act immediately when no delay is configured) to migrate
// JetStream assets away from this server while the remote is down; once a
// connection is established it cancels that timer and clears any observer
// state so assets can become leaders again.
// Runs as a server go routine (note the grWG.Done in the defer).
func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg, firstConnect bool) {
	defer s.grWG.Done()

	if remote == nil || len(remote.URLs) == 0 {
		s.Debugf("Empty remote leafnode definition, nothing to connect")
		return
	}

	opts := s.getOpts()
	reconnectDelay := opts.LeafNode.ReconnectInterval
	s.mu.RLock()
	dialTimeout := s.leafNodeOpts.dialTimeout
	resolver := s.leafNodeOpts.resolver
	var isSysAcc bool
	if s.eventsEnabled() {
		isSysAcc = remote.LocalAccount == s.sys.account.Name
	}
	jetstreamMigrateDelay := remote.JetStreamClusterMigrateDelay
	s.mu.RUnlock()

	// If we are sharing a system account and we are not standalone delay to gather some info prior.
	if firstConnect && isSysAcc && !s.standAloneMode() {
		s.Debugf("Will delay first leafnode connect to shared system account due to clustering")
		remote.setConnectDelay(sharedSysAccDelay)
	}

	// Honor any pending connect delay, but abort promptly on server shutdown.
	if connDelay := remote.getConnectDelay(); connDelay > 0 {
		select {
		case <-time.After(connDelay):
		case <-s.quitCh:
			return
		}
		remote.setConnectDelay(0)
	}

	var conn net.Conn

	const connErrFmt = "Error trying to connect as leafnode to remote server %q (attempt %v): %v"

	attempts := 0

	for s.isRunning() && s.remoteLeafNodeStillValid(remote) {
		rURL := remote.pickNextURL()
		url, err := s.getRandomIP(resolver, rURL.Host, nil)
		if err == nil {
			var ipStr string
			if url != rURL.Host {
				ipStr = fmt.Sprintf(" (%s)", url)
			}
			// Some test may want to disable remotes from connecting
			if s.isLeafConnectDisabled() {
				s.Debugf("Will not attempt to connect to remote server on %q%s, leafnodes currently disabled", rURL.Host, ipStr)
				err = ErrLeafNodeDisabled
			} else {
				s.Debugf("Trying to connect as leafnode to remote server on %q%s", rURL.Host, ipStr)
				conn, err = natsDialTimeout("tcp", url, dialTimeout)
			}
		}
		if err != nil {
			// Back off with jitter before the next attempt.
			jitter := time.Duration(rand.Int63n(int64(reconnectDelay)))
			delay := reconnectDelay + jitter
			attempts++
			if s.shouldReportConnectErr(firstConnect, attempts) {
				s.Errorf(connErrFmt, rURL.Host, attempts, err)
			} else {
				s.Debugf(connErrFmt, rURL.Host, attempts, err)
			}
			remote.Lock()
			// if we are using a delay to start migrating assets, kick off a migrate timer.
			if remote.jsMigrateTimer == nil && jetstreamMigrateDelay > 0 {
				remote.jsMigrateTimer = time.AfterFunc(jetstreamMigrateDelay, func() {
					s.checkJetStreamMigrate(remote)
				})
			}
			remote.Unlock()
			select {
			case <-s.quitCh:
				remote.cancelMigrateTimer()
				return
			case <-time.After(delay):
				// Check if we should migrate any JetStream assets immediately while this remote is down.
				// This will be used if JetStreamClusterMigrateDelay was not set
				if jetstreamMigrateDelay == 0 {
					s.checkJetStreamMigrate(remote)
				}
				continue
			}
		}
		// Connected: no migration needed for this remote anymore.
		remote.cancelMigrateTimer()
		if !s.remoteLeafNodeStillValid(remote) {
			conn.Close()
			return
		}

		// We have a connection here to a remote server.
		// Go ahead and create our leaf node and return.
		s.createLeafNode(conn, rURL, remote, nil)

		// Clear any observer states if we had them.
		s.clearObserverState(remote)

		return
	}
}
591

592
func (cfg *leafNodeCfg) cancelMigrateTimer() {
848✔
593
        cfg.Lock()
848✔
594
        stopAndClearTimer(&cfg.jsMigrateTimer)
848✔
595
        cfg.Unlock()
848✔
596
}
848✔
597

598
// This will clear any observer state such that stream or consumer assets on this server can become leaders again.
599
// This will clear any observer state such that stream or consumer assets on this server can become leaders again.
func (s *Server) clearObserverState(remote *leafNodeCfg) {
	s.mu.RLock()
	accName := remote.LocalAccount
	s.mu.RUnlock()

	acc, err := s.LookupAccount(accName)
	if err != nil {
		s.Warnf("Error looking up account [%s] checking for JetStream clear observer state on a leafnode", accName)
		return
	}

	// Serialize with checkJetStreamMigrate for this account.
	acc.jscmMu.Lock()
	defer acc.jscmMu.Unlock()

	// Walk all streams looking for any clustered stream, skip otherwise.
	for _, mset := range acc.streams() {
		node := mset.raftNode()
		if node == nil {
			// Not R>1
			continue
		}
		// Check consumers
		for _, o := range mset.getConsumers() {
			if n := o.raftNode(); n != nil {
				// Ensure we can become a leader again.
				n.SetObserver(false)
			}
		}
		// Ensure we can become a leader again.
		// (Original comment said "can not", which contradicted SetObserver(false).)
		node.SetObserver(false)
	}
}
631

632
// Check to see if we should migrate any assets from this account.
633
func (s *Server) checkJetStreamMigrate(remote *leafNodeCfg) {
	s.mu.RLock()
	accName, shouldMigrate := remote.LocalAccount, remote.JetStreamClusterMigrate
	s.mu.RUnlock()

	// Nothing to do unless this remote opted into cluster migration.
	if !shouldMigrate {
		return
	}

	acc, err := s.LookupAccount(accName)
	if err != nil {
		s.Warnf("Error looking up account [%s] checking for JetStream migration on a leafnode", accName)
		return
	}

	// Serialize with clearObserverState, which undoes the observer mode
	// that is set below.
	acc.jscmMu.Lock()
	defer acc.jscmMu.Unlock()

	// Walk all streams looking for any clustered stream, skip otherwise.
	// If we are the leader force stepdown.
	for _, mset := range acc.streams() {
		node := mset.raftNode()
		if node == nil {
			// Not R>1
			continue
		}
		// Collect any consumers
		for _, o := range mset.getConsumers() {
			if n := o.raftNode(); n != nil {
				n.StepDown()
				// Ensure we can not become a leader while in this state.
				n.SetObserver(true)
			}
		}
		// Stepdown if this stream was leader.
		node.StepDown()
		// Ensure we can not become a leader while in this state.
		node.SetObserver(true)
	}
}
673

674
// Helper for checking.
675
func (s *Server) isLeafConnectDisabled() bool {
1,020✔
676
        s.mu.RLock()
1,020✔
677
        defer s.mu.RUnlock()
1,020✔
678
        return s.leafDisableConnect
1,020✔
679
}
1,020✔
680

681
// Save off the tlsName for when we use TLS and mix hostnames and IPs. IPs usually
682
// come from the server we connect to.
683
//
684
// We used to save the name only if there was a TLSConfig or scheme equal to "tls".
685
// However, this was causing failures for users that did not set the scheme (and
686
// their remote connections did not have a tls{} block).
687
// We now save the host name regardless in case the remote returns an INFO indicating
688
// that TLS is required.
689
func (cfg *leafNodeCfg) saveTLSHostname(u *url.URL) {
1,712✔
690
        if cfg.tlsName == _EMPTY_ && net.ParseIP(u.Hostname()) == nil {
1,732✔
691
                cfg.tlsName = u.Hostname()
20✔
692
        }
20✔
693
}
694

695
// Save off the username/password for when we connect using a bare URL
696
// that we get from the INFO protocol.
697
func (cfg *leafNodeCfg) saveUserPassword(u *url.URL) {
1,113✔
698
        if cfg.username == _EMPTY_ && u.User != nil {
1,372✔
699
                cfg.username = u.User.Username()
259✔
700
                cfg.password, _ = u.User.Password()
259✔
701
        }
259✔
702
}
703

704
// This starts the leafnode accept loop in a go routine, unless it
705
// is detected that the server has already been shutdown.
706
func (s *Server) startLeafNodeAcceptLoop() {
	// Snapshot server options.
	opts := s.getOpts()

	port := opts.LeafNode.Port
	// Port -1 means pick an ephemeral port: listen on 0 and write the
	// resolved port back into opts further down.
	if port == -1 {
		port = 0
	}

	if s.isShuttingDown() {
		return
	}

	s.mu.Lock()
	hp := net.JoinHostPort(opts.LeafNode.Host, strconv.Itoa(port))
	l, e := natsListen("tcp", hp)
	// Save the listen error (nil on success).
	s.leafNodeListenerErr = e
	if e != nil {
		s.mu.Unlock()
		s.Fatalf("Error listening on leafnode port: %d - %v", opts.LeafNode.Port, e)
		return
	}

	s.Noticef("Listening for leafnode connections on %s",
		net.JoinHostPort(opts.LeafNode.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))

	tlsRequired := opts.LeafNode.TLSConfig != nil
	tlsVerify := tlsRequired && opts.LeafNode.TLSConfig.ClientAuth == tls.RequireAndVerifyClientCert
	// Do not set compression in this Info object, it would possibly cause
	// issues when sending asynchronous INFO to the remote.
	info := Info{
		ID:            s.info.ID,
		Name:          s.info.Name,
		Version:       s.info.Version,
		GitCommit:     gitCommit,
		GoVersion:     runtime.Version(),
		AuthRequired:  true,
		TLSRequired:   tlsRequired,
		TLSVerify:     tlsVerify,
		MaxPayload:    s.info.MaxPayload, // TODO(dlc) - Allow override?
		Headers:       s.supportsHeaders(),
		JetStream:     opts.JetStream,
		Domain:        opts.JetStreamDomain,
		Proto:         s.getServerProto(),
		InfoOnConnect: true,
	}
	// If we have selected a random port...
	if port == 0 {
		// Write resolved port back to options.
		opts.LeafNode.Port = l.Addr().(*net.TCPAddr).Port
	}

	s.leafNodeInfo = info
	// Possibly override Host/Port and set IP based on Cluster.Advertise
	if err := s.setLeafNodeInfoHostPortAndIP(); err != nil {
		s.Fatalf("Error setting leafnode INFO with LeafNode.Advertise value of %s, err=%v", opts.LeafNode.Advertise, err)
		l.Close()
		s.mu.Unlock()
		return
	}
	s.leafURLsMap[s.leafNodeInfo.IP]++
	// Refresh the cached INFO JSON now that the URL map changed.
	s.generateLeafNodeInfoJSON()

	// Setup state that can enable shutdown
	s.leafNodeListener = l

	// As of now, a server that does not have remotes configured would
	// never solicit a connection, so we should not have to warn if
	// InsecureSkipVerify is set in main LeafNodes config (since
	// this TLS setting matters only when soliciting a connection).
	// Still, warn if insecure is set in any of LeafNode block.
	// We need to check remotes, even if tls is not required on accept.
	warn := tlsRequired && opts.LeafNode.TLSConfig.InsecureSkipVerify
	if !warn {
		for _, r := range opts.LeafNode.Remotes {
			if r.TLSConfig != nil && r.TLSConfig.InsecureSkipVerify {
				warn = true
				break
			}
		}
	}
	if warn {
		s.Warnf(leafnodeTLSInsecureWarning)
	}
	// Accepted connections are handed to createLeafNode with no remote
	// config (nil remote => inbound/accepted side).
	go s.acceptConnections(l, "Leafnode", func(conn net.Conn) { s.createLeafNode(conn, nil, nil, nil) }, nil)
	s.mu.Unlock()
}
793

794
// RegEx to match a creds file with user JWT and Seed.
// Capture group 1 is the token enclosed between a pair of "---" armor lines;
// callers expect the first match to be the user JWT and the second the seed.
var credsRe = regexp.MustCompile(`\s*(?:(?:[-]{3,}.*[-]{3,}\r?\n)([\w\-.=]+)(?:\r?\n[-]{3,}.*[-]{3,}(\r?\n|\z)))`)
796

797
// clusterName is provided as argument to avoid lock ordering issues with the locked client c
798
// Lock should be held entering here.
799
func (c *client) sendLeafConnect(clusterName string, headers bool) error {
632✔
800
        // We support basic user/pass and operator based user JWT with signatures.
632✔
801
        cinfo := leafConnectInfo{
632✔
802
                Version:       VERSION,
632✔
803
                ID:            c.srv.info.ID,
632✔
804
                Domain:        c.srv.info.Domain,
632✔
805
                Name:          c.srv.info.Name,
632✔
806
                Hub:           c.leaf.remote.Hub,
632✔
807
                Cluster:       clusterName,
632✔
808
                Headers:       headers,
632✔
809
                JetStream:     c.acc.jetStreamConfigured(),
632✔
810
                DenyPub:       c.leaf.remote.DenyImports,
632✔
811
                Compression:   c.leaf.compression,
632✔
812
                RemoteAccount: c.acc.GetName(),
632✔
813
                Proto:         c.srv.getServerProto(),
632✔
814
        }
632✔
815

632✔
816
        // If a signature callback is specified, this takes precedence over anything else.
632✔
817
        if cb := c.leaf.remote.SignatureCB; cb != nil {
635✔
818
                nonce := c.nonce
3✔
819
                c.mu.Unlock()
3✔
820
                jwt, sigraw, err := cb(nonce)
3✔
821
                c.mu.Lock()
3✔
822
                if err == nil && c.isClosed() {
4✔
823
                        err = ErrConnectionClosed
1✔
824
                }
1✔
825
                if err != nil {
5✔
826
                        c.Errorf("Error signing the nonce: %v", err)
2✔
827
                        return err
2✔
828
                }
2✔
829
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
1✔
830
                cinfo.JWT, cinfo.Sig = jwt, sig
1✔
831

832
        } else if creds := c.leaf.remote.Credentials; creds != _EMPTY_ {
678✔
833
                // Check for credentials first, that will take precedence..
49✔
834
                c.Debugf("Authenticating with credentials file %q", c.leaf.remote.Credentials)
49✔
835
                contents, err := os.ReadFile(creds)
49✔
836
                if err != nil {
49✔
837
                        c.Errorf("%v", err)
×
838
                        return err
×
839
                }
×
840
                defer wipeSlice(contents)
49✔
841
                items := credsRe.FindAllSubmatch(contents, -1)
49✔
842
                if len(items) < 2 {
49✔
843
                        c.Errorf("Credentials file malformed")
×
844
                        return err
×
845
                }
×
846
                // First result should be the user JWT.
847
                // We copy here so that the file containing the seed will be wiped appropriately.
848
                raw := items[0][1]
49✔
849
                tmp := make([]byte, len(raw))
49✔
850
                copy(tmp, raw)
49✔
851
                // Seed is second item.
49✔
852
                kp, err := nkeys.FromSeed(items[1][1])
49✔
853
                if err != nil {
49✔
854
                        c.Errorf("Credentials file has malformed seed")
×
855
                        return err
×
856
                }
×
857
                // Wipe our key on exit.
858
                defer kp.Wipe()
49✔
859

49✔
860
                sigraw, _ := kp.Sign(c.nonce)
49✔
861
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
49✔
862
                cinfo.JWT = bytesToString(tmp)
49✔
863
                cinfo.Sig = sig
49✔
864
        } else if nkey := c.leaf.remote.Nkey; nkey != _EMPTY_ {
582✔
865
                kp, err := nkeys.FromSeed([]byte(nkey))
2✔
866
                if err != nil {
2✔
867
                        c.Errorf("Remote nkey has malformed seed")
×
868
                        return err
×
869
                }
×
870
                // Wipe our key on exit.
871
                defer kp.Wipe()
2✔
872
                sigraw, _ := kp.Sign(c.nonce)
2✔
873
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
2✔
874
                pkey, _ := kp.PublicKey()
2✔
875
                cinfo.Nkey = pkey
2✔
876
                cinfo.Sig = sig
2✔
877
        }
878
        // In addition, and this is to allow auth callout, set user/password or
879
        // token if applicable.
880
        if userInfo := c.leaf.remote.curURL.User; userInfo != nil {
910✔
881
                // For backward compatibility, if only username is provided, set both
280✔
882
                // Token and User, not just Token.
280✔
883
                cinfo.User = userInfo.Username()
280✔
884
                var ok bool
280✔
885
                cinfo.Pass, ok = userInfo.Password()
280✔
886
                if !ok {
286✔
887
                        cinfo.Token = cinfo.User
6✔
888
                }
6✔
889
        } else if c.leaf.remote.username != _EMPTY_ {
353✔
890
                cinfo.User = c.leaf.remote.username
3✔
891
                cinfo.Pass = c.leaf.remote.password
3✔
892
        }
3✔
893
        b, err := json.Marshal(cinfo)
630✔
894
        if err != nil {
630✔
895
                c.Errorf("Error marshaling CONNECT to remote leafnode: %v\n", err)
×
896
                return err
×
897
        }
×
898
        // Although this call is made before the writeLoop is created,
899
        // we don't really need to send in place. The protocol will be
900
        // sent out by the writeLoop.
901
        c.enqueueProto([]byte(fmt.Sprintf(ConProto, b)))
630✔
902
        return nil
630✔
903
}
904

905
// Makes a deep copy of the LeafNode Info structure.
906
// The server lock is held on entry.
907
func (s *Server) copyLeafNodeInfo() *Info {
2,522✔
908
        clone := s.leafNodeInfo
2,522✔
909
        // Copy the array of urls.
2,522✔
910
        if len(s.leafNodeInfo.LeafNodeURLs) > 0 {
4,579✔
911
                clone.LeafNodeURLs = append([]string(nil), s.leafNodeInfo.LeafNodeURLs...)
2,057✔
912
        }
2,057✔
913
        return &clone
2,522✔
914
}
915

916
// Adds a LeafNode URL that we get when a route connects to the Info structure.
917
// Regenerates the JSON byte array so that it can be sent to LeafNode connections.
918
// Returns a boolean indicating if the URL was added or not.
919
// Server lock is held on entry
920
func (s *Server) addLeafNodeURL(urlStr string) bool {
5,596✔
921
        if s.leafURLsMap.addUrl(urlStr) {
11,187✔
922
                s.generateLeafNodeInfoJSON()
5,591✔
923
                return true
5,591✔
924
        }
5,591✔
925
        return false
5✔
926
}
927

928
// Removes a LeafNode URL of the route that is disconnecting from the Info structure.
929
// Regenerates the JSON byte array so that it can be sent to LeafNode connections.
930
// Returns a boolean indicating if the URL was removed or not.
931
// Server lock is held on entry.
932
func (s *Server) removeLeafNodeURL(urlStr string) bool {
5,596✔
933
        // Don't need to do this if we are removing the route connection because
5,596✔
934
        // we are shuting down...
5,596✔
935
        if s.isShuttingDown() {
8,552✔
936
                return false
2,956✔
937
        }
2,956✔
938
        if s.leafURLsMap.removeUrl(urlStr) {
5,276✔
939
                s.generateLeafNodeInfoJSON()
2,636✔
940
                return true
2,636✔
941
        }
2,636✔
942
        return false
4✔
943
}
944

945
// Server lock is held on entry
946
func (s *Server) generateLeafNodeInfoJSON() {
11,064✔
947
        s.leafNodeInfo.Cluster = s.cachedClusterName()
11,064✔
948
        s.leafNodeInfo.LeafNodeURLs = s.leafURLsMap.getAsStringSlice()
11,064✔
949
        s.leafNodeInfo.WSConnectURLs = s.websocket.connectURLsMap.getAsStringSlice()
11,064✔
950
        s.leafNodeInfoJSON = generateInfoJSON(&s.leafNodeInfo)
11,064✔
951
}
11,064✔
952

953
// Sends an async INFO protocol so that the connected servers can update
954
// their list of LeafNode urls.
955
func (s *Server) sendAsyncLeafNodeInfo() {
8,227✔
956
        for _, c := range s.leafs {
8,321✔
957
                c.mu.Lock()
94✔
958
                c.enqueueProto(s.leafNodeInfoJSON)
94✔
959
                c.mu.Unlock()
94✔
960
        }
94✔
961
}
962

963
// Called when an inbound leafnode connection is accepted or we create one for a solicited leafnode.
964
func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCfg, ws *websocket) *client {
	// A non-nil remote means we are soliciting (outbound); a nil remote
	// means this is an accepted (inbound) leafnode connection.
	// Snapshot server options.
	opts := s.getOpts()

	maxPay := int32(opts.MaxPayload)
	maxSubs := int32(opts.MaxSubs)
	// For system, maxSubs of 0 means unlimited, so re-adjust here.
	if maxSubs == 0 {
		maxSubs = -1
	}
	now := time.Now().UTC()

	c := &client{srv: s, nc: conn, kind: LEAF, opts: defaultOpts, mpay: maxPay, msubs: maxSubs, start: now, last: now}
	// Do not update the smap here, we need to do it in initLeafNodeSmapAndSendSubs
	c.leaf = &leaf{}

	// For accepted LN connections, ws will be != nil if it was accepted
	// through the Websocket port.
	c.ws = ws

	// For remote, check if the scheme starts with "ws", if so, we will initiate
	// a remote Leaf Node connection as a websocket connection.
	if remote != nil && rURL != nil && isWSURL(rURL) {
		remote.RLock()
		c.ws = &websocket{compress: remote.Websocket.Compression, maskwrite: !remote.Websocket.NoMasking}
		remote.RUnlock()
	}

	// Determines if we are soliciting the connection or not.
	var solicited bool
	var acc *Account
	var remoteSuffix string
	if remote != nil {
		// For now, if lookup fails, we will constantly try
		// to recreate this LN connection.
		lacc := remote.LocalAccount
		var err error
		acc, err = s.LookupAccount(lacc)
		if err != nil {
			// An account not existing is something that can happen with nats/http account resolver and the account
			// has not yet been pushed, or the request failed for other reasons.
			// remote needs to be set or retry won't happen
			c.leaf.remote = remote
			c.closeConnection(MissingAccount)
			s.Errorf("Unable to lookup account %s for solicited leafnode connection: %v", lacc, err)
			return nil
		}
		remoteSuffix = fmt.Sprintf(" for account: %s", acc.traceLabel())
	}

	c.mu.Lock()
	c.initClient()
	c.Noticef("Leafnode connection created%s %s", remoteSuffix, c.opts.Name)

	var (
		tlsFirst         bool
		tlsFirstFallback time.Duration
		infoTimeout      time.Duration
	)
	if remote != nil {
		solicited = true
		remote.Lock()
		c.leaf.remote = remote
		c.setPermissions(remote.perms)
		if !c.leaf.remote.Hub {
			c.leaf.isSpoke = true
		}
		tlsFirst = remote.TLSHandshakeFirst
		infoTimeout = remote.FirstInfoTimeout
		remote.Unlock()
		c.acc = acc
	} else {
		c.flags.set(expectConnect)
		if ws != nil {
			c.Debugf("Leafnode compression=%v", c.ws.compress)
		}
		tlsFirst = opts.LeafNode.TLSHandshakeFirst
		if f := opts.LeafNode.TLSHandshakeFirstFallback; f > 0 {
			tlsFirstFallback = f
		}
	}
	c.mu.Unlock()

	var nonce [nonceLen]byte
	var info *Info

	// Grab this before the client lock below.
	if !solicited {
		// Grab server variables
		s.mu.Lock()
		info = s.copyLeafNodeInfo()
		// For tests that want to simulate old servers, do not set the compression
		// on the INFO protocol if configured with CompressionNotSupported.
		if cm := opts.LeafNode.Compression.Mode; cm != CompressionNotSupported {
			info.Compression = cm
		}
		s.generateNonce(nonce[:])
		s.mu.Unlock()
	}

	// Grab lock
	c.mu.Lock()

	var preBuf []byte
	if solicited {
		// For websocket connection, we need to send an HTTP request,
		// and get the response before starting the readLoop to get
		// the INFO, etc..
		if c.isWebsocket() {
			var err error
			var closeReason ClosedState

			preBuf, closeReason, err = c.leafNodeSolicitWSConnection(opts, rURL, remote)
			if err != nil {
				c.Errorf("Error soliciting websocket connection: %v", err)
				c.mu.Unlock()
				// Only close if a specific reason was provided.
				if closeReason != 0 {
					c.closeConnection(closeReason)
				}
				return nil
			}
		} else {
			// If configured to do TLS handshake first
			if tlsFirst {
				if _, err := c.leafClientHandshakeIfNeeded(remote, opts); err != nil {
					c.mu.Unlock()
					return nil
				}
			}
			// We need to wait for the info, but not for too long.
			c.nc.SetReadDeadline(time.Now().Add(infoTimeout))
		}

		// We will process the INFO from the readloop and finish by
		// sending the CONNECT and finish registration later.
	} else {
		// Send our info to the other side.
		// Remember the nonce we sent here for signatures, etc.
		c.nonce = make([]byte, nonceLen)
		copy(c.nonce, nonce[:])
		info.Nonce = bytesToString(c.nonce)
		info.CID = c.cid
		proto := generateInfoJSON(info)

		var pre []byte
		// We need first to check for "TLS First" fallback delay.
		if tlsFirstFallback > 0 {
			// We wait and see if we are getting any data. Since we did not send
			// the INFO protocol yet, only clients that use TLS first should be
			// sending data (the TLS handshake). We don't really check the content:
			// if it is a rogue agent and not an actual client performing the
			// TLS handshake, the error will be detected when performing the
			// handshake on our side.
			pre = make([]byte, 4)
			c.nc.SetReadDeadline(time.Now().Add(tlsFirstFallback))
			n, _ := io.ReadFull(c.nc, pre[:])
			c.nc.SetReadDeadline(time.Time{})
			// If we get any data (regardless of possible timeout), we will proceed
			// with the TLS handshake.
			if n > 0 {
				pre = pre[:n]
			} else {
				// We did not get anything so we will send the INFO protocol.
				pre = nil
				// Set the boolean to false for the rest of the function.
				tlsFirst = false
			}
		}

		if !tlsFirst {
			// We have to send from this go routine because we may
			// have to block for TLS handshake before we start our
			// writeLoop go routine. The other side needs to receive
			// this before it can initiate the TLS handshake..
			c.sendProtoNow(proto)

			// The above call could have marked the connection as closed (due to TCP error).
			if c.isClosed() {
				c.mu.Unlock()
				c.closeConnection(WriteError)
				return nil
			}
		}

		// Check to see if we need to spin up TLS.
		if !c.isWebsocket() && info.TLSRequired {
			// If we have a prebuffer create a multi-reader.
			if len(pre) > 0 {
				c.nc = &tlsMixConn{c.nc, bytes.NewBuffer(pre)}
			}
			// Perform server-side TLS handshake.
			if err := c.doTLSServerHandshake(tlsHandshakeLeaf, opts.LeafNode.TLSConfig, opts.LeafNode.TLSTimeout, opts.LeafNode.TLSPinnedCerts); err != nil {
				c.mu.Unlock()
				return nil
			}
		}

		// If the user wants the TLS handshake to occur first, now that it is
		// done, send the INFO protocol.
		if tlsFirst {
			c.flags.set(didTLSFirst)
			c.sendProtoNow(proto)
			if c.isClosed() {
				c.mu.Unlock()
				c.closeConnection(WriteError)
				return nil
			}
		}

		// Leaf nodes will always require a CONNECT to let us know
		// when we are properly bound to an account.
		//
		// If compression is configured, we can't set the authTimer here because
		// it would cause the parser to fail any incoming protocol that is not a
		// CONNECT (and we need to exchange INFO protocols for compression
		// negotiation). So instead, use the ping timer until we are done with
		// negotiation and can set the auth timer.
		timeout := secondsToDuration(opts.LeafNode.AuthTimeout)
		if needsCompression(opts.LeafNode.Compression.Mode) {
			c.ping.tmr = time.AfterFunc(timeout, func() {
				c.authTimeout()
			})
		} else {
			c.setAuthTimer(timeout)
		}
	}

	// Keep track in case server is shutdown before we can successfully register.
	if !s.addToTempClients(c.cid, c) {
		c.mu.Unlock()
		c.setNoReconnect()
		c.closeConnection(ServerShutdown)
		return nil
	}

	// Spin up the read loop.
	s.startGoRoutine(func() { c.readLoop(preBuf) })

	// We will spin the write loop for solicited connections only
	// when processing the INFO and after switching to TLS if needed.
	if !solicited {
		s.startGoRoutine(func() { c.writeLoop() })
	}

	c.mu.Unlock()

	return c
}
1212

1213
// Will perform the client-side TLS handshake if needed. Assumes that this
1214
// is called by the solicit side (remote will be non nil). Returns `true`
1215
// if TLS is required, `false` otherwise.
1216
// Lock held on entry.
1217
func (c *client) leafClientHandshakeIfNeeded(remote *leafNodeCfg, opts *Options) (bool, error) {
	// Check if TLS is required and gather TLS config variables.
	tlsRequired, tlsConfig, tlsName, tlsTimeout := c.leafNodeGetTLSConfigForSolicit(remote)
	if !tlsRequired {
		return false, nil
	}

	// If TLS required, perform handshake.
	// Get the URL that was used to connect to the remote server.
	rURL := remote.getCurrentURL()

	// Perform the client-side TLS handshake.
	if resetTLSName, err := c.doTLSClientHandshake(tlsHandshakeLeaf, rURL, tlsConfig, tlsName, tlsTimeout, opts.LeafNode.TLSPinnedCerts); err != nil {
		// Check if we need to reset the remote's TLS name.
		if resetTLSName {
			remote.Lock()
			remote.tlsName = _EMPTY_
			remote.Unlock()
		}
		return false, err
	}
	return true, nil
}
1240

1241
// processLeafnodeInfo handles an INFO protocol received on a leafnode
// connection, both the initial INFO of the connect handshake and later
// async INFO updates. Depending on state it may: perform the client-side
// TLS handshake (solicited side), negotiate compression, validate the
// first INFO (right port, cluster name), record remote server metadata,
// update the candidate URL list, apply permission updates, and resume or
// finish the connect process.
// NOTE(review): the client lock is released/re-acquired around compression
// negotiation; all early returns must leave the lock released.
func (c *client) processLeafnodeInfo(info *Info) {
	c.mu.Lock()
	if c.leaf == nil || c.isClosed() {
		c.mu.Unlock()
		return
	}
	s := c.srv
	opts := s.getOpts()
	remote := c.leaf.remote
	didSolicit := remote != nil
	firstINFO := !c.flags.isSet(infoReceived)

	// In case of websocket, the TLS handshake has been already done.
	// So check only for non websocket connections and for configurations
	// where the TLS Handshake was not done first.
	if didSolicit && !c.flags.isSet(handshakeComplete) && !c.isWebsocket() && !remote.TLSHandshakeFirst {
		// If the server requires TLS, we need to set this in the remote
		// otherwise if there is no TLS configuration block for the remote,
		// the solicit side will not attempt to perform the TLS handshake.
		if firstINFO && info.TLSRequired {
			remote.TLS = true
		}
		if _, err := c.leafClientHandshakeIfNeeded(remote, opts); err != nil {
			c.mu.Unlock()
			return
		}
	}

	// Check for compression, unless already done.
	if firstINFO && !c.flags.isSet(compressionNegotiated) {
		// Prevent from getting back here.
		c.flags.set(compressionNegotiated)

		var co *CompressionOpts
		if !didSolicit {
			co = &opts.LeafNode.Compression
		} else {
			co = &remote.Compression
		}
		if needsCompression(co.Mode) {
			// Release client lock since following function will need server lock.
			c.mu.Unlock()
			compress, err := s.negotiateLeafCompression(c, didSolicit, info.Compression, co)
			if err != nil {
				c.sendErrAndErr(err.Error())
				c.closeConnection(ProtocolViolation)
				return
			}
			if compress {
				// Done for now, will get back another INFO protocol...
				return
			}
			// No compression because one side does not want/can't, so proceed.
			c.mu.Lock()
			// Check that the connection did not close if the lock was released.
			if c.isClosed() {
				c.mu.Unlock()
				return
			}
		} else {
			// Coming from an old server, the Compression field would be the empty
			// string. For servers that are configured with CompressionNotSupported,
			// this makes them behave as old servers.
			if info.Compression == _EMPTY_ || co.Mode == CompressionNotSupported {
				c.leaf.compression = CompressionNotSupported
			} else {
				c.leaf.compression = CompressionOff
			}
		}
		// Accepting side does not normally process an INFO protocol during
		// initial connection handshake. So we keep it consistent by returning
		// if we are not soliciting.
		if !didSolicit {
			// If we had created the ping timer instead of the auth timer, we will
			// clear the ping timer and set the auth timer now that the compression
			// negotiation is done.
			if info.Compression != _EMPTY_ && c.ping.tmr != nil {
				clearTimer(&c.ping.tmr)
				c.setAuthTimer(secondsToDuration(opts.LeafNode.AuthTimeout))
			}
			c.mu.Unlock()
			return
		}
		// Fall through and process the INFO protocol as usual.
	}

	// Note: For now, only the initial INFO has a nonce. We
	// will probably do auto key rotation at some point.
	if firstINFO {
		// Mark that the INFO protocol has been received.
		c.flags.set(infoReceived)
		// Prevent connecting to non leafnode port. Need to do this only for
		// the first INFO, not for async INFO updates...
		//
		// Content of INFO sent by the server when accepting a tcp connection.
		// -------------------------------------------------------------------
		// Listen Port Of | CID | ClientConnectURLs | LeafNodeURLs | Gateway |
		// -------------------------------------------------------------------
		//      CLIENT    |  X* |        X**        |              |         |
		//      ROUTE     |     |        X**        |      X***    |         |
		//     GATEWAY    |     |                   |              |    X    |
		//     LEAFNODE   |  X  |                   |       X      |         |
		// -------------------------------------------------------------------
		// *   Not on older servers.
		// **  Not if "no advertise" is enabled.
		// *** Not if leafnode's "no advertise" is enabled.
		//
		// As seen from above, a solicited LeafNode connection should receive
		// from the remote server an INFO with CID and LeafNodeURLs. Anything
		// else should be considered an attempt to connect to a wrong port.
		if didSolicit && (info.CID == 0 || info.LeafNodeURLs == nil) {
			c.mu.Unlock()
			c.Errorf(ErrConnectedToWrongPort.Error())
			c.closeConnection(WrongPort)
			return
		}
		// Reject a cluster that contains spaces.
		if info.Cluster != _EMPTY_ && strings.Contains(info.Cluster, " ") {
			c.mu.Unlock()
			c.sendErrAndErr(ErrClusterNameHasSpaces.Error())
			c.closeConnection(ProtocolViolation)
			return
		}
		// Capture a nonce here.
		c.nonce = []byte(info.Nonce)
		if info.TLSRequired && didSolicit {
			remote.TLS = true
		}
		supportsHeaders := c.srv.supportsHeaders()
		c.headers = supportsHeaders && info.Headers

		// Remember the remote server.
		// Pre 2.2.0 servers are not sending their server name.
		// In that case, use info.ID, which, for those servers, matches
		// the content of the field `Name` in the leafnode CONNECT protocol.
		if info.Name == _EMPTY_ {
			c.leaf.remoteServer = info.ID
		} else {
			c.leaf.remoteServer = info.Name
		}
		c.leaf.remoteDomain = info.Domain
		c.leaf.remoteCluster = info.Cluster
		// We send the protocol version in the INFO protocol.
		// Keep track of it, so we know if this connection supports message
		// tracing for instance.
		c.opts.Protocol = info.Proto
	}

	// For both initial INFO and async INFO protocols, possibly
	// update our list of remote leafnode URLs we can connect to.
	if didSolicit && (len(info.LeafNodeURLs) > 0 || len(info.WSConnectURLs) > 0) {
		// Consider the incoming array as the most up-to-date
		// representation of the remote cluster's list of URLs.
		c.updateLeafNodeURLs(info)
	}

	// Check to see if we have permissions updates here.
	if info.Import != nil || info.Export != nil {
		perms := &Permissions{
			Publish:   info.Export,
			Subscribe: info.Import,
		}
		// Check if we have local deny clauses that we need to merge.
		if remote := c.leaf.remote; remote != nil {
			if len(remote.DenyExports) > 0 {
				if perms.Publish == nil {
					perms.Publish = &SubjectPermission{}
				}
				perms.Publish.Deny = append(perms.Publish.Deny, remote.DenyExports...)
			}
			if len(remote.DenyImports) > 0 {
				if perms.Subscribe == nil {
					perms.Subscribe = &SubjectPermission{}
				}
				perms.Subscribe.Deny = append(perms.Subscribe.Deny, remote.DenyImports...)
			}
		}
		c.setPermissions(perms)
	}

	var resumeConnect bool

	// If this is a remote connection and this is the first INFO protocol,
	// then we need to finish the connect process by sending CONNECT, etc..
	if firstINFO && didSolicit {
		// Clear deadline that was set in createLeafNode while waiting for the INFO.
		c.nc.SetDeadline(time.Time{})
		resumeConnect = true
	} else if !firstINFO && didSolicit {
		c.leaf.remoteAccName = info.RemoteAccount
	}

	// Check if we have the remote account information and if so make sure it's stored.
	if info.RemoteAccount != _EMPTY_ {
		s.leafRemoteAccounts.Store(c.acc.Name, info.RemoteAccount)
	}
	c.mu.Unlock()

	finishConnect := info.ConnectInfo
	if resumeConnect && s != nil {
		s.leafNodeResumeConnectProcess(c)
		if !info.InfoOnConnect {
			finishConnect = true
		}
	}
	if finishConnect {
		s.leafNodeFinishConnectProcess(c)
	}
}
1450

1451
// negotiateLeafCompression selects the compression mode for this leafnode
// connection based on our configured mode (co) and the mode advertised by the
// peer (infoCompression). Returns (true, nil) if compression was enabled (an
// INFO with the chosen mode is sent and the connection is switched to
// compressed reader/writer), (false, nil) if no compression will be used, or
// an error if the modes cannot be reconciled.
// Called without the client lock held; acquires both client and server locks
// internally (in that order, never nested).
func (s *Server) negotiateLeafCompression(c *client, didSolicit bool, infoCompression string, co *CompressionOpts) (bool, error) {
	// Negotiate the appropriate compression mode (or no compression)
	cm, err := selectCompressionMode(co.Mode, infoCompression)
	if err != nil {
		return false, err
	}
	c.mu.Lock()
	// For "auto" mode, set the initial compression mode based on RTT
	if cm == CompressionS2Auto {
		if c.rttStart.IsZero() {
			c.rtt = computeRTT(c.start)
		}
		cm = selectS2AutoModeBasedOnRTT(c.rtt, co.RTTThresholds)
	}
	// Keep track of the negotiated compression mode.
	c.leaf.compression = cm
	cid := c.cid
	var nonce string
	if !didSolicit {
		nonce = bytesToString(c.nonce)
	}
	c.mu.Unlock()

	if !needsCompression(cm) {
		return false, nil
	}

	// If we end-up doing compression...

	// Generate an INFO with the chosen compression mode.
	s.mu.Lock()
	info := s.copyLeafNodeInfo()
	info.Compression, info.CID, info.Nonce = compressionModeForInfoProtocol(co, cm), cid, nonce
	infoProto := generateInfoJSON(info)
	s.mu.Unlock()

	// If we solicited, then send this INFO protocol BEFORE switching
	// to compression writer. However, if we did not, we send it after.
	c.mu.Lock()
	if didSolicit {
		c.enqueueProto(infoProto)
		// Make sure it is completely flushed (the pending bytes goes to
		// 0) before proceeding.
		for c.out.pb > 0 && !c.isClosed() {
			c.flushOutbound()
		}
	}
	// This is to notify the readLoop that it should switch to a
	// (de)compression reader.
	c.in.flags.set(switchToCompression)
	// Create the compress writer before queueing the INFO protocol for
	// a route that did not solicit. It will make sure that that proto
	// is sent with compression on.
	c.out.cw = s2.NewWriter(nil, s2WriterOptions(cm)...)
	if !didSolicit {
		c.enqueueProto(infoProto)
	}
	c.mu.Unlock()
	return true, nil
}
1511

1512
// When getting a leaf node INFO protocol, use the provided
1513
// array of urls to update the list of possible endpoints.
1514
func (c *client) updateLeafNodeURLs(info *Info) {
1,251✔
1515
        cfg := c.leaf.remote
1,251✔
1516
        cfg.Lock()
1,251✔
1517
        defer cfg.Unlock()
1,251✔
1518

1,251✔
1519
        // We have ensured that if a remote has a WS scheme, then all are.
1,251✔
1520
        // So check if first is WS, then add WS URLs, otherwise, add non WS ones.
1,251✔
1521
        if len(cfg.URLs) > 0 && isWSURL(cfg.URLs[0]) {
1,305✔
1522
                // It does not really matter if we use "ws://" or "wss://" here since
54✔
1523
                // we will have already marked that the remote should use TLS anyway.
54✔
1524
                // But use proper scheme for log statements, etc...
54✔
1525
                proto := wsSchemePrefix
54✔
1526
                if cfg.TLS {
54✔
1527
                        proto = wsSchemePrefixTLS
×
1528
                }
×
1529
                c.doUpdateLNURLs(cfg, proto, info.WSConnectURLs)
54✔
1530
                return
54✔
1531
        }
1532
        c.doUpdateLNURLs(cfg, "nats-leaf", info.LeafNodeURLs)
1,197✔
1533
}
1534

1535
func (c *client) doUpdateLNURLs(cfg *leafNodeCfg, scheme string, URLs []string) {
1,251✔
1536
        cfg.urls = make([]*url.URL, 0, 1+len(URLs))
1,251✔
1537
        // Add the ones we receive in the protocol
1,251✔
1538
        for _, surl := range URLs {
3,528✔
1539
                url, err := url.Parse(fmt.Sprintf("%s://%s", scheme, surl))
2,277✔
1540
                if err != nil {
2,277✔
1541
                        // As per below, the URLs we receive should not have contained URL info, so this should be safe to log.
×
1542
                        c.Errorf("Error parsing url %q: %v", surl, err)
×
1543
                        continue
×
1544
                }
1545
                // Do not add if it's the same as what we already have configured.
1546
                var dup bool
2,277✔
1547
                for _, u := range cfg.URLs {
5,793✔
1548
                        // URLs that we receive never have user info, but the
3,516✔
1549
                        // ones that were configured may have. Simply compare
3,516✔
1550
                        // host and port to decide if they are equal or not.
3,516✔
1551
                        if url.Host == u.Host && url.Port() == u.Port() {
5,194✔
1552
                                dup = true
1,678✔
1553
                                break
1,678✔
1554
                        }
1555
                }
1556
                if !dup {
2,876✔
1557
                        cfg.urls = append(cfg.urls, url)
599✔
1558
                        cfg.saveTLSHostname(url)
599✔
1559
                }
599✔
1560
        }
1561
        // Add the configured one
1562
        cfg.urls = append(cfg.urls, cfg.URLs...)
1,251✔
1563
}
1564

1565
// Similar to setInfoHostPortAndGenerateJSON, but for leafNodeInfo.
1566
func (s *Server) setLeafNodeInfoHostPortAndIP() error {
2,837✔
1567
        opts := s.getOpts()
2,837✔
1568
        if opts.LeafNode.Advertise != _EMPTY_ {
2,848✔
1569
                advHost, advPort, err := parseHostPort(opts.LeafNode.Advertise, opts.LeafNode.Port)
11✔
1570
                if err != nil {
11✔
1571
                        return err
×
1572
                }
×
1573
                s.leafNodeInfo.Host = advHost
11✔
1574
                s.leafNodeInfo.Port = advPort
11✔
1575
        } else {
2,826✔
1576
                s.leafNodeInfo.Host = opts.LeafNode.Host
2,826✔
1577
                s.leafNodeInfo.Port = opts.LeafNode.Port
2,826✔
1578
                // If the host is "0.0.0.0" or "::" we need to resolve to a public IP.
2,826✔
1579
                // This will return at most 1 IP.
2,826✔
1580
                hostIsIPAny, ips, err := s.getNonLocalIPsIfHostIsIPAny(s.leafNodeInfo.Host, false)
2,826✔
1581
                if err != nil {
2,826✔
1582
                        return err
×
1583
                }
×
1584
                if hostIsIPAny {
3,107✔
1585
                        if len(ips) == 0 {
281✔
1586
                                s.Errorf("Could not find any non-local IP for leafnode's listen specification %q",
×
1587
                                        s.leafNodeInfo.Host)
×
1588
                        } else {
281✔
1589
                                // Take the first from the list...
281✔
1590
                                s.leafNodeInfo.Host = ips[0]
281✔
1591
                        }
281✔
1592
                }
1593
        }
1594
        // Use just host:port for the IP
1595
        s.leafNodeInfo.IP = net.JoinHostPort(s.leafNodeInfo.Host, strconv.Itoa(s.leafNodeInfo.Port))
2,837✔
1596
        if opts.LeafNode.Advertise != _EMPTY_ {
2,848✔
1597
                s.Noticef("Advertise address for leafnode is set to %s", s.leafNodeInfo.IP)
11✔
1598
        }
11✔
1599
        return nil
2,837✔
1600
}
1601

1602
// Add the connection to the map of leaf nodes.
// If `checkForDup` is true (invoked when a leafnode is accepted), then we check
// if a connection already exists for the same server name and account.
// That can happen when the remote is attempting to reconnect while the accepting
// side did not detect the connection as broken yet.
// But it can also happen when there is a misconfiguration and the remote is
// creating two (or more) connections that bind to the same account on the accept
// side.
// When a duplicate is found, the new connection is accepted and the old is closed
// (this solves the stale connection situation). An error is returned to help the
// remote detect the misconfiguration when the duplicate is the result of that
// misconfiguration.
func (s *Server) addLeafNodeConnection(c *client, srvName, clusterName string, checkForDup bool) {
	var accName string
	c.mu.Lock()
	cid := c.cid
	acc := c.acc
	if acc != nil {
		accName = acc.Name
	}
	myRemoteDomain := c.leaf.remoteDomain
	mySrvName := c.leaf.remoteServer
	remoteAccName := c.leaf.remoteAccName
	myClustName := c.leaf.remoteCluster
	solicited := c.leaf.remote != nil
	c.mu.Unlock()

	var old *client
	s.mu.Lock()
	// We check for empty because in some test we may send empty CONNECT{}
	if checkForDup && srvName != _EMPTY_ {
		for _, ol := range s.leafs {
			ol.mu.Lock()
			// We care here only about non solicited Leafnode. This function
			// is more about replacing stale connections than detecting loops.
			// We have code for the loop detection elsewhere, which also delays
			// attempt to reconnect.
			if !ol.isSolicitedLeafNode() && ol.leaf.remoteServer == srvName &&
				ol.leaf.remoteCluster == clusterName && ol.acc.Name == accName &&
				remoteAccName != _EMPTY_ && ol.leaf.remoteAccName == remoteAccName {
				old = ol
			}
			ol.mu.Unlock()
			if old != nil {
				break
			}
		}
	}
	// Store new connection in the map
	s.leafs[cid] = c
	s.mu.Unlock()
	s.removeFromTempClients(cid)

	// If applicable, evict the old one.
	if old != nil {
		old.sendErrAndErr(DuplicateRemoteLeafnodeConnection.String())
		old.closeConnection(DuplicateRemoteLeafnodeConnection)
		c.Warnf("Replacing connection from same server")
	}

	// Helper used in log statements: "server" or "server/cluster".
	srvDecorated := func() string {
		if myClustName == _EMPTY_ {
			return mySrvName
		}
		return fmt.Sprintf("%s/%s", mySrvName, myClustName)
	}

	opts := s.getOpts()
	sysAcc := s.SystemAccount()
	js := s.getJetStream()
	var meta *raft
	if js != nil {
		if mg := js.getMetaGroup(); mg != nil {
			meta = mg.(*raft)
		}
	}
	blockMappingOutgoing := false
	// Deny (non domain) JetStream API traffic unless system account is shared
	// and domain names are identical and extending is not disabled

	// Check if backwards compatibility has been enabled and needs to be acted on
	forceSysAccDeny := false
	if len(opts.JsAccDefaultDomain) > 0 {
		if acc == sysAcc {
			for _, d := range opts.JsAccDefaultDomain {
				if d == _EMPTY_ {
					// Extending JetStream via leaf node is mutually exclusive with a domain mapping to the empty/default domain.
					// As soon as one mapping to "" is found, disable the ability to extend JS via a leaf node.
					c.Noticef("Not extending remote JetStream domain %q due to presence of empty default domain", myRemoteDomain)
					forceSysAccDeny = true
					break
				}
			}
		} else if domain, ok := opts.JsAccDefaultDomain[accName]; ok && domain == _EMPTY_ {
			// for backwards compatibility with old setups that do not have a domain name set
			c.Debugf("Skipping deny %q for account %q due to default domain", jsAllAPI, accName)
			return
		}
	}

	// If the server has JS disabled, it may still be part of a JetStream that could be extended.
	// This is either signaled by js being disabled and a domain set,
	// or in cases where no domain name exists, an extension hint is set.
	// However, this is only relevant in mixed setups.
	//
	// If the system account connects but default domains are present, JetStream can't be extended.
	if opts.JetStreamDomain != myRemoteDomain || (!opts.JetStream && (opts.JetStreamDomain == _EMPTY_ && opts.JetStreamExtHint != jsWillExtend)) ||
		sysAcc == nil || acc == nil || forceSysAccDeny {
		// If domain names mismatch always deny. This applies to system accounts as well as non system accounts.
		// Not having a system account, account or JetStream disabled is considered a mismatch as well.
		if acc != nil && acc == sysAcc {
			c.Noticef("System account connected from %s", srvDecorated())
			c.Noticef("JetStream not extended, domains differ")
			c.mergeDenyPermissionsLocked(both, denyAllJs)
			// When a remote with a system account is present in a server, unless otherwise disabled, the server will be
			// started in observer mode. Now that it is clear that this not used, turn the observer mode off.
			if solicited && meta != nil && meta.IsObserver() {
				meta.setObserver(false, extNotExtended)
				c.Debugf("Turning JetStream metadata controller Observer Mode off")
				// Take note that the domain was not extended to avoid this state from startup.
				writePeerState(js.config.StoreDir, meta.currentPeerState())
				// Meta controller can't be leader yet.
				// Yet it is possible that due to observer mode every server already stopped campaigning.
				// Therefore this server needs to be kicked into campaigning gear explicitly.
				meta.Campaign()
			}
		} else {
			c.Noticef("JetStream using domains: local %q, remote %q", opts.JetStreamDomain, myRemoteDomain)
			c.mergeDenyPermissionsLocked(both, denyAllClientJs)
		}
		blockMappingOutgoing = true
	} else if acc == sysAcc {
		// system account and same domain
		s.sys.client.Noticef("Extending JetStream domain %q as System Account connected from server %s",
			myRemoteDomain, srvDecorated())
		// In an extension use case, pin leadership to server remotes connect to.
		// Therefore, server with a remote that are not already in observer mode, need to be put into it.
		if solicited && meta != nil && !meta.IsObserver() {
			meta.setObserver(true, extExtended)
			c.Debugf("Turning JetStream metadata controller Observer Mode on - System Account Connected")
			// Take note that the domain was not extended to avoid this state next startup.
			writePeerState(js.config.StoreDir, meta.currentPeerState())
			// If this server is the leader already, step down so a new leader can be elected (that is not an observer)
			meta.StepDown()
		}
	} else {
		// This deny is needed in all cases (system account shared or not)
		// If the system account is shared, jsAllAPI traffic will go through the system account.
		// So in order to prevent duplicate delivery (from system and actual account) suppress it on the account.
		// If the system account is NOT shared, jsAllAPI traffic has no business
		// going over this leafnode connection at all.
		c.Debugf("Adding deny %+v for account %q", denyAllClientJs, accName)
		c.mergeDenyPermissionsLocked(both, denyAllClientJs)
	}
	// If we have a specified JetStream domain we will want to add a mapping to
	// allow access cross domain for each non-system account.
	if opts.JetStreamDomain != _EMPTY_ && opts.JetStream && acc != nil && acc != sysAcc {
		for src, dest := range generateJSMappingTable(opts.JetStreamDomain) {
			if err := acc.AddMapping(src, dest); err != nil {
				c.Debugf("Error adding JetStream domain mapping: %s", err.Error())
			} else {
				c.Debugf("Adding JetStream Domain Mapping %q -> %s to account %q", src, dest, accName)
			}
		}
		if blockMappingOutgoing {
			src := fmt.Sprintf(jsDomainAPI, opts.JetStreamDomain)
			// make sure that messages intended for this domain, do not leave the cluster via this leaf node connection
			// This is a guard against a miss-config with two identical domain names and will only cover some forms
			// of this issue, not all of them.
			// This guards against a hub and a spoke having the same domain name.
			// But not two spokes having the same one and the request coming from the hub.
			c.mergeDenyPermissionsLocked(pub, []string{src})
			c.Debugf("Adding deny %q for outgoing messages to account %q", src, accName)
		}
	}
}
1777

1778
func (s *Server) removeLeafNodeConnection(c *client) {
1,568✔
1779
        c.mu.Lock()
1,568✔
1780
        cid := c.cid
1,568✔
1781
        if c.leaf != nil {
3,136✔
1782
                if c.leaf.tsubt != nil {
2,699✔
1783
                        c.leaf.tsubt.Stop()
1,131✔
1784
                        c.leaf.tsubt = nil
1,131✔
1785
                }
1,131✔
1786
                if c.leaf.gwSub != nil {
2,176✔
1787
                        s.gwLeafSubs.Remove(c.leaf.gwSub)
608✔
1788
                        // We need to set this to nil for GC to release the connection
608✔
1789
                        c.leaf.gwSub = nil
608✔
1790
                }
608✔
1791
        }
1792
        c.mu.Unlock()
1,568✔
1793
        s.mu.Lock()
1,568✔
1794
        delete(s.leafs, cid)
1,568✔
1795
        s.mu.Unlock()
1,568✔
1796
        s.removeFromTempClients(cid)
1,568✔
1797
}
1798

1799
// Connect information for solicited leafnodes.
// This is the payload of the CONNECT protocol sent by the soliciting side;
// field names must stay in sync with the json tags since they form the wire
// protocol between servers.
type leafConnectInfo struct {
	// Server version of the remote (soliciting) side.
	Version string `json:"version,omitempty"`
	// Credentials: either nkey/JWT/signature, user/password, or a token.
	Nkey  string `json:"nkey,omitempty"`
	JWT   string `json:"jwt,omitempty"`
	Sig   string `json:"sig,omitempty"`
	User  string `json:"user,omitempty"`
	Pass  string `json:"pass,omitempty"`
	Token string `json:"auth_token,omitempty"`
	// Identity of the remote server and its JetStream domain/cluster.
	ID      string `json:"server_id,omitempty"`
	Domain  string `json:"domain,omitempty"`
	Name    string `json:"name,omitempty"`
	// Hub indicates the remote declares itself a hub, making this side a spoke.
	Hub     bool   `json:"is_hub,omitempty"`
	Cluster string `json:"cluster,omitempty"`
	// Headers indicates the remote supports message headers.
	Headers bool `json:"headers,omitempty"`
	// JetStream indicates the remote has JetStream enabled.
	JetStream bool     `json:"jetstream,omitempty"`
	// DenyPub lists subjects the remote wants us to deny publishing on.
	DenyPub   []string `json:"deny_pub,omitempty"`

	// There was an existing field called:
	// >> Comp bool `json:"compression,omitempty"`
	// that has never been used. With support for compression, we now need
	// a field that is a string. So we use a different json tag:
	Compression string `json:"compress_mode,omitempty"`

	// Just used to detect wrong connection attempts.
	Gateway string `json:"gateway,omitempty"`

	// Tells the accept side which account the remote is binding to.
	RemoteAccount string `json:"remote_account,omitempty"`

	// The accept side of a LEAF connection, unlike ROUTER and GATEWAY, receives
	// only the CONNECT protocol, and no INFO. So we need to send the protocol
	// version as part of the CONNECT. It will indicate if a connection supports
	// some features, such as message tracing.
	// We use `protocol` as the JSON tag, so this is automatically unmarshal'ed
	// in the low level process CONNECT.
	Proto int `json:"protocol,omitempty"`
}
1837

1838
// processLeafNodeConnect will process the inbound connect args.
// Once we are here we are bound to an account, so can send any interest that
// we would have to the other side.
// A non-nil error return means the connection was rejected and closed.
func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) error {
	// Way to detect clients that incorrectly connect to the route listen
	// port. Client provided "lang" in the CONNECT protocol while LEAFNODEs don't.
	if lang != _EMPTY_ {
		c.sendErrAndErr(ErrClientConnectedToLeafNodePort.Error())
		c.closeConnection(WrongPort)
		return ErrClientConnectedToLeafNodePort
	}

	// Unmarshal as a leaf node connect protocol
	proto := &leafConnectInfo{}
	if err := json.Unmarshal(arg, proto); err != nil {
		return err
	}

	// Reject a cluster that contains spaces.
	if proto.Cluster != _EMPTY_ && strings.Contains(proto.Cluster, " ") {
		c.sendErrAndErr(ErrClusterNameHasSpaces.Error())
		c.closeConnection(ProtocolViolation)
		return ErrClusterNameHasSpaces
	}

	// Check for cluster name collisions.
	if cn := s.cachedClusterName(); cn != _EMPTY_ && proto.Cluster != _EMPTY_ && proto.Cluster == cn {
		c.sendErrAndErr(ErrLeafNodeHasSameClusterName.Error())
		c.closeConnection(ClusterNamesIdentical)
		return ErrLeafNodeHasSameClusterName
	}

	// Reject if this has Gateway which means that it would be from a gateway
	// connection that incorrectly connects to the leafnode port.
	if proto.Gateway != _EMPTY_ {
		errTxt := fmt.Sprintf("Rejecting connection from gateway %q on the leafnode port", proto.Gateway)
		c.Errorf(errTxt)
		c.sendErr(errTxt)
		c.closeConnection(WrongGateway)
		return ErrWrongGateway
	}

	// Enforce the configured minimum remote server version, if any.
	if mv := s.getOpts().LeafNode.MinVersion; mv != _EMPTY_ {
		major, minor, update, _ := versionComponents(mv)
		if !versionAtLeast(proto.Version, major, minor, update) {
			// We are going to send back an INFO because otherwise recent
			// versions of the remote server would simply break the connection
			// after 2 seconds if not receiving it. Instead, we want the
			// other side to just "stall" until we finish waiting for the holding
			// period and close the connection below.
			s.sendPermsAndAccountInfo(c)
			c.sendErrAndErr(fmt.Sprintf("connection rejected since minimum version required is %q", mv))
			select {
			case <-c.srv.quitCh:
			case <-time.After(leafNodeWaitBeforeClose):
			}
			c.closeConnection(MinimumVersionRequired)
			return ErrMinimumVersionRequired
		}
	}

	// Check if this server supports headers.
	supportHeaders := c.srv.supportsHeaders()

	c.mu.Lock()
	// Leaf Nodes do not do echo or verbose or pedantic.
	c.opts.Verbose = false
	c.opts.Echo = false
	c.opts.Pedantic = false
	// This inbound connection will be marked as supporting headers if this server
	// support headers and the remote has sent in the CONNECT protocol that it does
	// support headers too.
	c.headers = supportHeaders && proto.Headers
	// If the compression level is still not set, set it based on what has been
	// given to us in the CONNECT protocol.
	if c.leaf.compression == _EMPTY_ {
		// But if proto.Compression is _EMPTY_, set it to CompressionNotSupported
		if proto.Compression == _EMPTY_ {
			c.leaf.compression = CompressionNotSupported
		} else {
			c.leaf.compression = proto.Compression
		}
	}

	// Remember the remote server.
	c.leaf.remoteServer = proto.Name
	// Remember the remote account name
	c.leaf.remoteAccName = proto.RemoteAccount

	// If the other side has declared itself a hub, so we will take on the spoke role.
	if proto.Hub {
		c.leaf.isSpoke = true
	}

	// The soliciting side is part of a cluster.
	if proto.Cluster != _EMPTY_ {
		c.leaf.remoteCluster = proto.Cluster
	}

	c.leaf.remoteDomain = proto.Domain

	// When a leaf solicits a connection to a hub, the perms that it will use on the soliciting leafnode's
	// behalf are correct for them, but inside the hub need to be reversed since data is flowing in the opposite direction.
	if !c.isSolicitedLeafNode() && c.perms != nil {
		sp, pp := c.perms.sub, c.perms.pub
		c.perms.sub, c.perms.pub = pp, sp
		if c.opts.Import != nil {
			c.darray = c.opts.Import.Deny
		} else {
			c.darray = nil
		}
	}

	// Set the Ping timer
	c.setFirstPingTimer()

	// If we received pub deny permissions from the other end, merge with existing ones.
	c.mergeDenyPermissions(pub, proto.DenyPub)

	c.mu.Unlock()

	// Register the cluster, even if empty, as long as we are acting as a hub.
	if !proto.Hub {
		c.acc.registerLeafNodeCluster(proto.Cluster)
	}

	// Add in the leafnode here since we passed through auth at this point.
	s.addLeafNodeConnection(c, proto.Name, proto.Cluster, true)

	// If we have permissions bound to this leafnode we need to send then back to the
	// origin server for local enforcement.
	s.sendPermsAndAccountInfo(c)

	// Create and initialize the smap since we know our bound account now.
	// This will send all registered subs too.
	s.initLeafNodeSmapAndSendSubs(c)

	// Announce the account connect event for a leaf node.
	// This will no-op as needed.
	s.sendLeafNodeConnect(c.acc)

	return nil
}
1981

1982
// Returns the remote cluster name. This is set only once so does not require a lock.
1983
func (c *client) remoteCluster() string {
157,095✔
1984
        if c.leaf == nil {
157,095✔
1985
                return _EMPTY_
×
1986
        }
×
1987
        return c.leaf.remoteCluster
157,095✔
1988
}
1989

1990
// Sends back an info block to the soliciting leafnode to let it know about
1991
// its permission settings for local enforcement.
1992
func (s *Server) sendPermsAndAccountInfo(c *client) {
637✔
1993
        // Copy
637✔
1994
        info := s.copyLeafNodeInfo()
637✔
1995
        c.mu.Lock()
637✔
1996
        info.CID = c.cid
637✔
1997
        info.Import = c.opts.Import
637✔
1998
        info.Export = c.opts.Export
637✔
1999
        info.RemoteAccount = c.acc.Name
637✔
2000
        info.ConnectInfo = true
637✔
2001
        c.enqueueProto(generateInfoJSON(info))
637✔
2002
        c.mu.Unlock()
637✔
2003
}
637✔
2004

2005
// Snapshot the current subscriptions from the sublist into our smap which
// we will keep updated from now on.
// Also send the registered subscriptions (as LS+ protocols) to the remote side.
func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
	acc := c.acc
	if acc == nil {
		c.Debugf("Leafnode does not have an account bound")
		return
	}
	// Collect all account subs here.
	_subs := [1024]*subscription{}
	subs := _subs[:0]
	ims := []string{}

	// Hold the client lock otherwise there can be a race and miss some subs.
	c.mu.Lock()
	defer c.mu.Unlock()

	acc.mu.RLock()
	accName := acc.Name
	accNTag := acc.nameTag

	// To make printing look better when no friendly name present.
	if accNTag != _EMPTY_ {
		accNTag = "/" + accNTag
	}

	// If we are solicited we only send interest for local clients.
	if c.isSpokeLeafNode() {
		acc.sl.localSubs(&subs, true)
	} else {
		acc.sl.All(&subs)
	}

	// Check if we have an existing service import reply.
	siReply := copyBytes(acc.siReply)

	// Since leaf nodes only send on interest, if the bound
	// account has import services we need to send those over.
	for isubj := range acc.imports.services {
		// Spokes only forward imports they are permitted to subscribe to.
		if c.isSpokeLeafNode() && !c.canSubscribe(isubj) {
			c.Debugf("Not permitted to import service %q on behalf of %s%s", isubj, accName, accNTag)
			continue
		}
		ims = append(ims, isubj)
	}
	// Likewise for mappings.
	for _, m := range acc.mappings {
		if c.isSpokeLeafNode() && !c.canSubscribe(m.src) {
			c.Debugf("Not permitted to import mapping %q on behalf of %s%s", m.src, accName, accNTag)
			continue
		}
		ims = append(ims, m.src)
	}

	// Create a unique subject that will be used for loop detection.
	lds := acc.lds
	acc.mu.RUnlock()

	// Check if we have to create the LDS.
	if lds == _EMPTY_ {
		lds = leafNodeLoopDetectionSubjectPrefix + nuid.Next()
		acc.mu.Lock()
		acc.lds = lds
		acc.mu.Unlock()
	}

	// Now check for gateway interest. Leafnodes will put this into
	// the proper mode to propagate, but they are not held in the account.
	gwsa := [16]*client{}
	gws := gwsa[:0]
	s.getOutboundGatewayConnections(&gws)
	for _, cgw := range gws {
		cgw.mu.Lock()
		gw := cgw.gw
		cgw.mu.Unlock()
		if gw != nil {
			if ei, _ := gw.outsim.Load(accName); ei != nil {
				if e := ei.(*outsie); e != nil && e.sl != nil {
					e.sl.All(&subs)
				}
			}
		}
	}

	applyGlobalRouting := s.gateway.enabled
	if c.isSpokeLeafNode() {
		// Add a fake subscription for this solicited leafnode connection
		// so that we can send back directly for mapped GW replies.
		// We need to keep track of this subscription so it can be removed
		// when the connection is closed so that the GC can release it.
		c.leaf.gwSub = &subscription{client: c, subject: []byte(gwReplyPrefix + ">")}
		c.srv.gwLeafSubs.Insert(c.leaf.gwSub)
	}

	// Now walk the results and add them to our smap
	rc := c.leaf.remoteCluster
	c.leaf.smap = make(map[string]int32)
	for _, sub := range subs {
		// Check perms regardless of role.
		if c.perms != nil && !c.canSubscribe(string(sub.subject)) {
			c.Debugf("Not permitted to subscribe to %q on behalf of %s%s", sub.subject, accName, accNTag)
			continue
		}
		// We ignore ourselves here.
		// Also don't add the subscription if it has a origin cluster and the
		// cluster name matches the one of the client we are sending to.
		if c != sub.client && (sub.origin == nil || (bytesToString(sub.origin) != rc)) {
			// Queue subs contribute their queue weight, plain subs count as 1.
			count := int32(1)
			if len(sub.queue) > 0 && sub.qw > 0 {
				count = sub.qw
			}
			c.leaf.smap[keyFromSub(sub)] += count
			// Track subs processed here so updateSmap can skip re-adding them
			// during the initial window (see the tsubt timer below).
			if c.leaf.tsub == nil {
				c.leaf.tsub = make(map[*subscription]struct{})
			}
			c.leaf.tsub[sub] = struct{}{}
		}
	}
	// FIXME(dlc) - We need to update appropriately on an account claims update.
	for _, isubj := range ims {
		c.leaf.smap[isubj]++
	}
	// If we have gateways enabled we need to make sure the other side sends us responses
	// that have been augmented from the original subscription.
	// TODO(dlc) - Should we lock this down more?
	if applyGlobalRouting {
		c.leaf.smap[oldGWReplyPrefix+"*.>"]++
		c.leaf.smap[gwReplyPrefix+">"]++
	}
	// Detect loops by subscribing to a specific subject and checking
	// if this sub is coming back to us.
	c.leaf.smap[lds]++

	// Check if we need to add an existing siReply to our map.
	// This will be a prefix so add on the wildcard.
	if siReply != nil {
		wcsub := append(siReply, '>')
		c.leaf.smap[string(wcsub)]++
	}
	// Queue all protocols. There is no max pending limit for LN connection,
	// so we don't need chunking. The writes will happen from the writeLoop.
	var b bytes.Buffer
	for key, n := range c.leaf.smap {
		c.writeLeafSub(&b, key, n)
	}
	if b.Len() > 0 {
		c.enqueueProto(b.Bytes())
	}
	if c.leaf.tsub != nil {
		// Clear the tsub map after 5 seconds.
		c.leaf.tsubt = time.AfterFunc(5*time.Second, func() {
			c.mu.Lock()
			if c.leaf != nil {
				c.leaf.tsub = nil
				c.leaf.tsubt = nil
			}
			c.mu.Unlock()
		})
	}
}
2166

2167
// updateInterestForAccountOnGateway called from gateway code when processing RS+ and RS-.
2168
func (s *Server) updateInterestForAccountOnGateway(accName string, sub *subscription, delta int32) {
197,229✔
2169
        acc, err := s.LookupAccount(accName)
197,229✔
2170
        if acc == nil || err != nil {
197,337✔
2171
                s.Debugf("No or bad account for %q, failed to update interest from gateway", accName)
108✔
2172
                return
108✔
2173
        }
108✔
2174
        acc.updateLeafNodes(sub, delta)
197,121✔
2175
}
2176

2177
// updateLeafNodes will make sure to update the account smap for the subscription.
// Will also forward to all leaf nodes as needed.
// delta is the change in interest (positive for new subs/weight, negative for unsubs).
func (acc *Account) updateLeafNodes(sub *subscription, delta int32) {
	if acc == nil || sub == nil {
		return
	}

	// We will do checks for no leafnodes and same cluster here inline and under the
	// general account read lock.
	// If we feel we need to update the leafnodes we will do that out of line to avoid
	// blocking routes or GWs.

	acc.mu.RLock()
	// First check if we even have leafnodes here.
	if acc.nleafs == 0 {
		acc.mu.RUnlock()
		return
	}

	// Is this a loop detection subject.
	isLDS := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))

	// Capture the cluster even if its empty.
	var cluster string
	if sub.origin != nil {
		cluster = bytesToString(sub.origin)
	}

	// If we have an isolated cluster we can return early, as long as it is not a loop detection subject.
	// Empty clusters will return false for the check.
	if !isLDS && acc.isLeafNodeClusterIsolated(cluster) {
		acc.mu.RUnlock()
		return
	}

	// We can release the general account lock.
	acc.mu.RUnlock()

	// We can hold the list lock here to avoid having to copy a large slice.
	acc.lmu.RLock()
	defer acc.lmu.RUnlock()

	// Do this once.
	subject := string(sub.subject)

	// Walk the connected leafnodes.
	for _, ln := range acc.lleafs {
		// Never reflect the interest back to its originating connection.
		if ln == sub.client {
			continue
		}
		// Check to make sure this sub does not have an origin cluster that matches the leafnode.
		ln.mu.Lock()
		// If skipped, make sure that we still let go the "$LDS." subscription that allows
		// the detection of loops as long as different cluster.
		clusterDifferent := cluster != ln.remoteCluster()
		if (isLDS && clusterDifferent) || ((cluster == _EMPTY_ || clusterDifferent) && (delta <= 0 || ln.canSubscribe(subject))) {
			ln.updateSmap(sub, delta, isLDS)
		}
		ln.mu.Unlock()
	}
}
2238

2239
// This will make an update to our internal smap and determine if we should send out
// an interest update to the remote side.
// Lock should be held.
func (c *client) updateSmap(sub *subscription, delta int32, isLDS bool) {
	// No smap means this connection has not finished initialization (or was torn down).
	if c.leaf.smap == nil {
		return
	}

	// If we are solicited make sure this is a local client or a non-solicited leaf node
	skind := sub.client.kind
	updateClient := skind == CLIENT || skind == SYSTEM || skind == JETSTREAM || skind == ACCOUNT
	if !isLDS && c.isSpokeLeafNode() && !(updateClient || (skind == LEAF && !sub.client.isSpokeLeafNode())) {
		return
	}

	// For additions, check if that sub has just been processed during initLeafNodeSmapAndSendSubs
	if delta > 0 && c.leaf.tsub != nil {
		if _, present := c.leaf.tsub[sub]; present {
			delete(c.leaf.tsub, sub)
			// Once the map drains, stop the cleanup timer early.
			if len(c.leaf.tsub) == 0 {
				c.leaf.tsub = nil
				c.leaf.tsubt.Stop()
				c.leaf.tsubt = nil
			}
			return
		}
	}

	key := keyFromSub(sub)
	n, ok := c.leaf.smap[key]
	// Nothing to remove if we never tracked this key.
	if delta < 0 && !ok {
		return
	}

	// We will update if its a queue, if count is zero (or negative), or we were 0 and are N > 0.
	// Queue subs always notify so the remote sees weight changes.
	update := sub.queue != nil || (n <= 0 && n+delta > 0) || (n > 0 && n+delta <= 0)
	n += delta
	if n > 0 {
		c.leaf.smap[key] = n
	} else {
		delete(c.leaf.smap, key)
	}
	if update {
		c.sendLeafNodeSubUpdate(key, n)
	}
}
2285

2286
// Used to force add subjects to the subject map.
2287
func (c *client) forceAddToSmap(subj string) {
4✔
2288
        c.mu.Lock()
4✔
2289
        defer c.mu.Unlock()
4✔
2290

4✔
2291
        if c.leaf.smap == nil {
4✔
2292
                return
×
2293
        }
×
2294
        n := c.leaf.smap[subj]
4✔
2295
        if n != 0 {
5✔
2296
                return
1✔
2297
        }
1✔
2298
        // Place into the map since it was not there.
2299
        c.leaf.smap[subj] = 1
3✔
2300
        c.sendLeafNodeSubUpdate(subj, 1)
3✔
2301
}
2302

2303
// Used to force remove a subject from the subject map.
2304
func (c *client) forceRemoveFromSmap(subj string) {
1✔
2305
        c.mu.Lock()
1✔
2306
        defer c.mu.Unlock()
1✔
2307

1✔
2308
        if c.leaf.smap == nil {
1✔
2309
                return
×
2310
        }
×
2311
        n := c.leaf.smap[subj]
1✔
2312
        if n == 0 {
1✔
2313
                return
×
2314
        }
×
2315
        n--
1✔
2316
        if n == 0 {
2✔
2317
                // Remove is now zero
1✔
2318
                delete(c.leaf.smap, subj)
1✔
2319
                c.sendLeafNodeSubUpdate(subj, 0)
1✔
2320
        } else {
1✔
2321
                c.leaf.smap[subj] = n
×
2322
        }
×
2323
}
2324

2325
// Send the subscription interest change to the other side.
2326
// Lock should be held.
2327
func (c *client) sendLeafNodeSubUpdate(key string, n int32) {
9,302✔
2328
        // If we are a spoke, we need to check if we are allowed to send this subscription over to the hub.
9,302✔
2329
        if c.isSpokeLeafNode() {
11,535✔
2330
                checkPerms := true
2,233✔
2331
                if len(key) > 0 && (key[0] == '$' || key[0] == '_') {
3,517✔
2332
                        if strings.HasPrefix(key, leafNodeLoopDetectionSubjectPrefix) ||
1,284✔
2333
                                strings.HasPrefix(key, oldGWReplyPrefix) ||
1,284✔
2334
                                strings.HasPrefix(key, gwReplyPrefix) {
1,363✔
2335
                                checkPerms = false
79✔
2336
                        }
79✔
2337
                }
2338
                if checkPerms {
4,387✔
2339
                        var subject string
2,154✔
2340
                        if sep := strings.IndexByte(key, ' '); sep != -1 {
2,645✔
2341
                                subject = key[:sep]
491✔
2342
                        } else {
2,154✔
2343
                                subject = key
1,663✔
2344
                        }
1,663✔
2345
                        if !c.canSubscribe(subject) {
2,154✔
2346
                                return
×
2347
                        }
×
2348
                }
2349
        }
2350
        // If we are here we can send over to the other side.
2351
        _b := [64]byte{}
9,302✔
2352
        b := bytes.NewBuffer(_b[:0])
9,302✔
2353
        c.writeLeafSub(b, key, n)
9,302✔
2354
        c.enqueueProto(b.Bytes())
9,302✔
2355
}
2356

2357
// Helper function to build the key.
2358
func keyFromSub(sub *subscription) string {
45,548✔
2359
        var sb strings.Builder
45,548✔
2360
        sb.Grow(len(sub.subject) + len(sub.queue) + 1)
45,548✔
2361
        sb.Write(sub.subject)
45,548✔
2362
        if sub.queue != nil {
49,310✔
2363
                // Just make the key subject spc group, e.g. 'foo bar'
3,762✔
2364
                sb.WriteByte(' ')
3,762✔
2365
                sb.Write(sub.queue)
3,762✔
2366
        }
3,762✔
2367
        return sb.String()
45,548✔
2368
}
2369

2370
// Key prefixes used to distinguish plain routed subscriptions ("R") from
// routed subscriptions made on behalf of a leafnode ("L"); see
// keyFromSubWithOrigin for the full key format.
const (
	keyRoutedSub         = "R"
	keyRoutedSubByte     = 'R'
	keyRoutedLeafSub     = "L"
	keyRoutedLeafSubByte = 'L'
)
2376

2377
// Helper function to build the key that prevents collisions between normal
2378
// routed subscriptions and routed subscriptions on behalf of a leafnode.
2379
// Keys will look like this:
2380
// "R foo"          -> plain routed sub on "foo"
2381
// "R foo bar"      -> queue routed sub on "foo", queue "bar"
2382
// "L foo bar"      -> plain routed leaf sub on "foo", leaf "bar"
2383
// "L foo bar baz"  -> queue routed sub on "foo", queue "bar", leaf "baz"
2384
func keyFromSubWithOrigin(sub *subscription) string {
521,550✔
2385
        var sb strings.Builder
521,550✔
2386
        sb.Grow(2 + len(sub.origin) + 1 + len(sub.subject) + 1 + len(sub.queue))
521,550✔
2387
        leaf := len(sub.origin) > 0
521,550✔
2388
        if leaf {
538,243✔
2389
                sb.WriteByte(keyRoutedLeafSubByte)
16,693✔
2390
        } else {
521,550✔
2391
                sb.WriteByte(keyRoutedSubByte)
504,857✔
2392
        }
504,857✔
2393
        sb.WriteByte(' ')
521,550✔
2394
        sb.Write(sub.subject)
521,550✔
2395
        if sub.queue != nil {
545,119✔
2396
                sb.WriteByte(' ')
23,569✔
2397
                sb.Write(sub.queue)
23,569✔
2398
        }
23,569✔
2399
        if leaf {
538,243✔
2400
                sb.WriteByte(' ')
16,693✔
2401
                sb.Write(sub.origin)
16,693✔
2402
        }
16,693✔
2403
        return sb.String()
521,550✔
2404
}
2405

2406
// Lock should be held.
2407
func (c *client) writeLeafSub(w *bytes.Buffer, key string, n int32) {
35,226✔
2408
        if key == _EMPTY_ {
35,226✔
2409
                return
×
2410
        }
×
2411
        if n > 0 {
66,985✔
2412
                w.WriteString("LS+ " + key)
31,759✔
2413
                // Check for queue semantics, if found write n.
31,759✔
2414
                if strings.Contains(key, " ") {
34,064✔
2415
                        w.WriteString(" ")
2,305✔
2416
                        var b [12]byte
2,305✔
2417
                        var i = len(b)
2,305✔
2418
                        for l := n; l > 0; l /= 10 {
5,509✔
2419
                                i--
3,204✔
2420
                                b[i] = digits[l%10]
3,204✔
2421
                        }
3,204✔
2422
                        w.Write(b[i:])
2,305✔
2423
                        if c.trace {
2,305✔
2424
                                arg := fmt.Sprintf("%s %d", key, n)
×
2425
                                c.traceOutOp("LS+", []byte(arg))
×
2426
                        }
×
2427
                } else if c.trace {
29,650✔
2428
                        c.traceOutOp("LS+", []byte(key))
196✔
2429
                }
196✔
2430
        } else {
3,467✔
2431
                w.WriteString("LS- " + key)
3,467✔
2432
                if c.trace {
3,482✔
2433
                        c.traceOutOp("LS-", []byte(key))
15✔
2434
                }
15✔
2435
        }
2436
        w.WriteString(CR_LF)
35,226✔
2437
}
2438

2439
// processLeafSub will process an inbound sub request for the remote leaf node.
// The wire format is "<subject>" or "<subject> <queue> <weight>"; any other
// arg count is a protocol error. Most failure modes (closed connection, loop
// detection, permission violation, subs limit) intentionally return nil after
// handling the condition locally, since the protocol error has already been
// dealt with.
func (c *client) processLeafSub(argo []byte) (err error) {
	// Indicate activity.
	c.in.subs++

	srv := c.srv
	if srv == nil {
		return nil
	}

	// Copy so we do not reference a potentially large buffer
	arg := make([]byte, len(argo))
	copy(arg, argo)

	args := splitArg(arg)
	sub := &subscription{client: c}

	// delta is the interest change propagated to routes/gateways/leafs.
	delta := int32(1)
	switch len(args) {
	case 1:
		sub.queue = nil
	case 3:
		sub.queue = args[1]
		sub.qw = int32(parseSize(args[2]))
		// TODO: (ik) We should have a non empty queue name and a queue
		// weight >= 1. For 2.11, we may want to return an error if that
		// is not the case, but for now just overwrite `delta` if queue
		// weight is greater than 1 (it is possible after a reconnect/
		// server restart to receive a queue weight > 1 for a new sub).
		if sub.qw > 1 {
			delta = sub.qw
		}
	default:
		return fmt.Errorf("processLeafSub Parse Error: '%s'", arg)
	}
	sub.subject = args[0]

	c.mu.Lock()
	if c.isClosed() {
		c.mu.Unlock()
		return nil
	}

	acc := c.acc
	// Check if we have a loop: a subscription on our own loop-detection
	// subject means our own traffic came back to us.
	ldsPrefix := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))

	if ldsPrefix && bytesToString(sub.subject) == acc.getLDSubject() {
		c.mu.Unlock()
		c.handleLeafNodeLoop(true)
		return nil
	}

	// Check permissions if applicable. (but exclude the $LDS, $GR and _GR_)
	checkPerms := true
	if sub.subject[0] == '$' || sub.subject[0] == '_' {
		if ldsPrefix ||
			bytes.HasPrefix(sub.subject, []byte(oldGWReplyPrefix)) ||
			bytes.HasPrefix(sub.subject, []byte(gwReplyPrefix)) {
			checkPerms = false
		}
	}

	// If we are a hub check that we can publish to this subject.
	if checkPerms {
		subj := string(sub.subject)
		if subjectIsLiteral(subj) && !c.pubAllowedFullCheck(subj, true, true) {
			c.mu.Unlock()
			c.leafSubPermViolation(sub.subject)
			c.Debugf(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject))
			return nil
		}
	}

	// Check if we have a maximum on the number of subscriptions.
	if c.subsAtLimit() {
		c.mu.Unlock()
		c.maxSubsExceeded()
		return nil
	}

	// If we have an origin cluster associated mark that in the sub.
	if rc := c.remoteCluster(); rc != _EMPTY_ {
		sub.origin = []byte(rc)
	}

	// Like Routes, we store local subs by account and subject and optionally queue name.
	// If we have a queue it will have a trailing weight which we do not want.
	if sub.queue != nil {
		sub.sid = arg[:len(arg)-len(args[2])-1]
	} else {
		sub.sid = arg
	}
	key := bytesToString(sub.sid)
	osub := c.subs[key]
	if osub == nil {
		c.subs[key] = sub
		// Now place into the account sl.
		if err := acc.sl.Insert(sub); err != nil {
			delete(c.subs, key)
			c.mu.Unlock()
			c.Errorf("Could not insert subscription: %v", err)
			c.sendErr("Invalid Subscription")
			return nil
		}
	} else if sub.queue != nil {
		// For a queue we need to update the weight.
		delta = sub.qw - atomic.LoadInt32(&osub.qw)
		atomic.StoreInt32(&osub.qw, sub.qw)
		acc.sl.UpdateRemoteQSub(osub)
	}
	spoke := c.isSpokeLeafNode()
	c.mu.Unlock()

	// Only add in shadow subs if a new sub or qsub.
	if osub == nil {
		if err := c.addShadowSubscriptions(acc, sub, true); err != nil {
			c.Errorf(err.Error())
		}
	}

	// If we are not solicited, treat leaf node subscriptions similar to a
	// client subscription, meaning we forward them to routes, gateways and
	// other leaf nodes as needed.
	if !spoke {
		// If we are routing add to the route map for the associated account.
		srv.updateRouteSubscriptionMap(acc, sub, delta)
		if srv.gateway.enabled {
			srv.gatewayUpdateSubInterest(acc.Name, sub, delta)
		}
	}
	// Now check on leafnode updates for other leaf nodes. We understand solicited
	// and non-solicited state in this call so we will do the right thing.
	acc.updateLeafNodes(sub, delta)

	return nil
}
2576

2577
// If the leafnode is a solicited, set the connect delay based on default
2578
// or private option (for tests). Sends the error to the other side, log and
2579
// close the connection.
2580
func (c *client) handleLeafNodeLoop(sendErr bool) {
14✔
2581
        accName, delay := c.setLeafConnectDelayIfSoliciting(leafNodeReconnectDelayAfterLoopDetected)
14✔
2582
        errTxt := fmt.Sprintf("Loop detected for leafnode account=%q. Delaying attempt to reconnect for %v", accName, delay)
14✔
2583
        if sendErr {
22✔
2584
                c.sendErr(errTxt)
8✔
2585
        }
8✔
2586

2587
        c.Errorf(errTxt)
14✔
2588
        // If we are here with "sendErr" false, it means that this is the server
14✔
2589
        // that received the error. The other side will have closed the connection,
14✔
2590
        // but does not hurt to close here too.
14✔
2591
        c.closeConnection(ProtocolViolation)
14✔
2592
}
2593

2594
// processLeafUnsub will process an inbound unsub request for the remote leaf node.
2595
func (c *client) processLeafUnsub(arg []byte) error {
3,282✔
2596
        // Indicate any activity, so pub and sub or unsubs.
3,282✔
2597
        c.in.subs++
3,282✔
2598

3,282✔
2599
        acc := c.acc
3,282✔
2600
        srv := c.srv
3,282✔
2601

3,282✔
2602
        c.mu.Lock()
3,282✔
2603
        if c.isClosed() {
3,340✔
2604
                c.mu.Unlock()
58✔
2605
                return nil
58✔
2606
        }
58✔
2607

2608
        spoke := c.isSpokeLeafNode()
3,224✔
2609
        // We store local subs by account and subject and optionally queue name.
3,224✔
2610
        // LS- will have the arg exactly as the key.
3,224✔
2611
        sub, ok := c.subs[string(arg)]
3,224✔
2612
        if !ok {
3,236✔
2613
                // If not found, don't try to update routes/gws/leaf nodes.
12✔
2614
                c.mu.Unlock()
12✔
2615
                return nil
12✔
2616
        }
12✔
2617
        delta := int32(1)
3,212✔
2618
        if len(sub.queue) > 0 {
3,624✔
2619
                delta = sub.qw
412✔
2620
        }
412✔
2621
        c.mu.Unlock()
3,212✔
2622

3,212✔
2623
        c.unsubscribe(acc, sub, true, true)
3,212✔
2624
        if !spoke {
4,243✔
2625
                // If we are routing subtract from the route map for the associated account.
1,031✔
2626
                srv.updateRouteSubscriptionMap(acc, sub, -delta)
1,031✔
2627
                // Gateways
1,031✔
2628
                if srv.gateway.enabled {
1,350✔
2629
                        srv.gatewayUpdateSubInterest(acc.Name, sub, -delta)
319✔
2630
                }
319✔
2631
        }
2632
        // Now check on leafnode updates for other leaf nodes.
2633
        acc.updateLeafNodes(sub, -delta)
3,212✔
2634
        return nil
3,212✔
2635
}
2636

2637
// processLeafHeaderMsgArgs parses the arguments of an inbound leafnode
// message that carries headers (HMSG-style) and populates c.pa (subject,
// reply, queues, header size and total size). Layouts by arg count:
//
//	3: <subject> <hdr-size> <total-size>
//	4: <subject> <reply> <hdr-size> <total-size>
//	5+: <subject> <+|y indicator> [reply] <queues...> <hdr-size> <total-size>
//
// where in the 5+ form args[1] is '+' (reply present in args[2]) or '|'
// (no reply). Returns an error on any malformed protocol line.
func (c *client) processLeafHeaderMsgArgs(arg []byte) error {
	// Unroll splitArgs to avoid runtime/heap issues
	a := [MAX_MSG_ARGS][]byte{}
	args := a[:0]
	start := -1
	for i, b := range arg {
		switch b {
		case ' ', '\t', '\r', '\n':
			if start >= 0 {
				args = append(args, arg[start:i])
				start = -1
			}
		default:
			if start < 0 {
				start = i
			}
		}
	}
	// Flush a trailing token that runs to the end of the buffer.
	if start >= 0 {
		args = append(args, arg[start:])
	}

	c.pa.arg = arg
	switch len(args) {
	case 0, 1, 2:
		return fmt.Errorf("processLeafHeaderMsgArgs Parse Error: '%s'", args)
	case 3:
		c.pa.reply = nil
		c.pa.queues = nil
		c.pa.hdb = args[1]
		c.pa.hdr = parseSize(args[1])
		c.pa.szb = args[2]
		c.pa.size = parseSize(args[2])
	case 4:
		c.pa.reply = args[1]
		c.pa.queues = nil
		c.pa.hdb = args[2]
		c.pa.hdr = parseSize(args[2])
		c.pa.szb = args[3]
		c.pa.size = parseSize(args[3])
	default:
		// args[1] is our reply indicator. Should be + or | normally.
		if len(args[1]) != 1 {
			return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
		}
		switch args[1][0] {
		case '+':
			c.pa.reply = args[2]
		case '|':
			c.pa.reply = nil
		default:
			return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
		}
		// Grab header size (second to last arg).
		c.pa.hdb = args[len(args)-2]
		c.pa.hdr = parseSize(c.pa.hdb)

		// Grab size (last arg).
		c.pa.szb = args[len(args)-1]
		c.pa.size = parseSize(c.pa.szb)

		// Grab queue names; their start index depends on whether a reply
		// was present.
		if c.pa.reply != nil {
			c.pa.queues = args[3 : len(args)-2]
		} else {
			c.pa.queues = args[2 : len(args)-2]
		}
	}
	// parseSize returns a negative value on malformed input.
	if c.pa.hdr < 0 {
		return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Header Size: '%s'", arg)
	}
	if c.pa.size < 0 {
		return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Size: '%s'", args)
	}
	// Headers are a prefix of the payload, so hdr can never exceed size.
	if c.pa.hdr > c.pa.size {
		return fmt.Errorf("processLeafHeaderMsgArgs Header Size larger then TotalSize: '%s'", arg)
	}

	// Common ones processed after check for arg length
	c.pa.subject = args[0]

	return nil
}
2720

2721
// processLeafMsgArgs parses the arguments of an inbound leafnode message
// (LMSG-style, no headers) and populates c.pa. Layouts by arg count:
//
//	2: <subject> <size>
//	3: <subject> <reply> <size>
//	4+: <subject> <+|y indicator> [reply] <queues...> <size>
//
// where in the 4+ form args[1] is '+' (reply present in args[2]) or '|'
// (no reply). Returns an error on any malformed protocol line.
func (c *client) processLeafMsgArgs(arg []byte) error {
	// Unroll splitArgs to avoid runtime/heap issues
	a := [MAX_MSG_ARGS][]byte{}
	args := a[:0]
	start := -1
	for i, b := range arg {
		switch b {
		case ' ', '\t', '\r', '\n':
			if start >= 0 {
				args = append(args, arg[start:i])
				start = -1
			}
		default:
			if start < 0 {
				start = i
			}
		}
	}
	// Flush a trailing token that runs to the end of the buffer.
	if start >= 0 {
		args = append(args, arg[start:])
	}

	c.pa.arg = arg
	switch len(args) {
	case 0, 1:
		return fmt.Errorf("processLeafMsgArgs Parse Error: '%s'", args)
	case 2:
		c.pa.reply = nil
		c.pa.queues = nil
		c.pa.szb = args[1]
		c.pa.size = parseSize(args[1])
	case 3:
		c.pa.reply = args[1]
		c.pa.queues = nil
		c.pa.szb = args[2]
		c.pa.size = parseSize(args[2])
	default:
		// args[1] is our reply indicator. Should be + or | normally.
		if len(args[1]) != 1 {
			return fmt.Errorf("processLeafMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
		}
		switch args[1][0] {
		case '+':
			c.pa.reply = args[2]
		case '|':
			c.pa.reply = nil
		default:
			return fmt.Errorf("processLeafMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
		}
		// Grab size (last arg).
		c.pa.szb = args[len(args)-1]
		c.pa.size = parseSize(c.pa.szb)

		// Grab queue names; their start index depends on whether a reply
		// was present.
		if c.pa.reply != nil {
			c.pa.queues = args[3 : len(args)-1]
		} else {
			c.pa.queues = args[2 : len(args)-1]
		}
	}
	// parseSize returns a negative value on malformed input.
	if c.pa.size < 0 {
		return fmt.Errorf("processLeafMsgArgs Bad or Missing Size: '%s'", args)
	}

	// Common ones processed after check for arg length
	c.pa.subject = args[0]

	return nil
}
2790

2791
// processInboundLeafMsg is called to process an inbound msg from a leaf node.
// It matches the subject against the account sublist (through a per-client
// L1 result cache), delivers to matching subscriptions, and then forwards
// to gateways when enabled.
func (c *client) processInboundLeafMsg(msg []byte) {
	// Update statistics
	// The msg includes the CR_LF, so pull back out for accounting.
	c.in.msgs++
	c.in.bytes += int32(len(msg) - LEN_CR_LF)

	srv, acc, subject := c.srv, c.acc, string(c.pa.subject)

	// Mostly under testing scenarios.
	if srv == nil || acc == nil {
		return
	}

	// Match the subscriptions. We will use our own L1 map if
	// it's still valid, avoiding contention on the shared sublist.
	var r *SublistResult
	var ok bool

	// genid changes whenever the account sublist is modified; a mismatch
	// invalidates our entire L1 cache.
	genid := atomic.LoadUint64(&c.acc.sl.genid)
	if genid == c.in.genid && c.in.results != nil {
		r, ok = c.in.results[subject]
	} else {
		// Reset our L1 completely.
		c.in.results = make(map[string]*SublistResult)
		c.in.genid = genid
	}

	// Go back to the sublist data structure.
	if !ok {
		r = c.acc.sl.Match(subject)
		// Prune the results cache. Keeps us from unbounded growth. Random delete.
		if len(c.in.results) >= maxResultCacheSize {
			n := 0
			for subj := range c.in.results {
				delete(c.in.results, subj)
				if n++; n > pruneSize {
					break
				}
			}
		}
		// Then add the new cache entry.
		c.in.results[subject] = r
	}

	// Collect queue names if needed.
	var qnames [][]byte

	// Check for no interest, short circuit if so.
	// This is the fanout scale.
	if len(r.psubs)+len(r.qsubs) > 0 {
		flag := pmrNoFlag
		// If we have queue subs in this cluster, then if we run in gateway
		// mode and the remote gateways have queue subs, then we need to
		// collect the queue groups this message was sent to so that we
		// exclude them when sending to gateways.
		if len(r.qsubs) > 0 && c.srv.gateway.enabled &&
			atomic.LoadInt64(&c.srv.gateway.totalQSubs) > 0 {
			flag |= pmrCollectQueueNames
		}
		// If this is a mapped subject that means the mapped interest
		// is what got us here, but this might not have a queue designation
		// If that is the case, make sure we ignore to process local queue subscribers.
		if len(c.pa.mapped) > 0 && len(c.pa.queues) == 0 {
			flag |= pmrIgnoreEmptyQueueFilter
		}
		_, qnames = c.processMsgResults(acc, r, msg, nil, c.pa.subject, c.pa.reply, flag)
	}

	// Now deal with gateways
	if c.srv.gateway.enabled {
		c.sendMsgToGateways(acc, msg, c.pa.subject, c.pa.reply, qnames, true)
	}
}
2865

2866
// Handles a subscription permission violation.
// Thin wrapper over leafPermViolation() with pub=false.
// See leafPermViolation() for details.
func (c *client) leafSubPermViolation(subj []byte) {
	c.leafPermViolation(false, subj)
}
2871

2872
// Common function to process publish or subscribe leafnode permission violation.
2873
// Sends the permission violation error to the remote, logs it and closes the connection.
2874
// If this is from a server soliciting, the reconnection will be delayed.
2875
func (c *client) leafPermViolation(pub bool, subj []byte) {
320✔
2876
        if c.isSpokeLeafNode() {
640✔
2877
                // For spokes these are no-ops since the hub server told us our permissions.
320✔
2878
                // We just need to not send these over to the other side since we will get cutoff.
320✔
2879
                return
320✔
2880
        }
320✔
2881
        // FIXME(dlc) ?
2882
        c.setLeafConnectDelayIfSoliciting(leafNodeReconnectAfterPermViolation)
×
2883
        var action string
×
2884
        if pub {
×
2885
                c.sendErr(fmt.Sprintf("Permissions Violation for Publish to %q", subj))
×
2886
                action = "Publish"
×
2887
        } else {
×
2888
                c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q", subj))
×
2889
                action = "Subscription"
×
2890
        }
×
2891
        c.Errorf("%s Violation on %q - Check other side configuration", action, subj)
×
2892
        // TODO: add a new close reason that is more appropriate?
×
2893
        c.closeConnection(ProtocolViolation)
×
2894
}
2895

2896
// Invoked from generic processErr() for LEAF connections.
2897
func (c *client) leafProcessErr(errStr string) {
31✔
2898
        // Check if we got a cluster name collision.
31✔
2899
        if strings.Contains(errStr, ErrLeafNodeHasSameClusterName.Error()) {
35✔
2900
                _, delay := c.setLeafConnectDelayIfSoliciting(leafNodeReconnectDelayAfterClusterNameSame)
4✔
2901
                c.Errorf("Leafnode connection dropped with same cluster name error. Delaying attempt to reconnect for %v", delay)
4✔
2902
                return
4✔
2903
        }
4✔
2904

2905
        // We will look for Loop detected error coming from the other side.
2906
        // If we solicit, set the connect delay.
2907
        if !strings.Contains(errStr, "Loop detected") {
48✔
2908
                return
21✔
2909
        }
21✔
2910
        c.handleLeafNodeLoop(false)
6✔
2911
}
2912

2913
// If this leaf connection solicits, sets the connect delay to the given value,
// or the one from the server option's LeafNode.connDelay if one is set (for tests).
// Returns the connection's account name and the effective delay.
func (c *client) setLeafConnectDelayIfSoliciting(delay time.Duration) (string, time.Duration) {
	c.mu.Lock()
	if c.isSolicitedLeafNode() {
		if s := c.srv; s != nil {
			// Test-only override from server options takes precedence.
			if srvdelay := s.getOpts().LeafNode.connDelay; srvdelay != 0 {
				delay = srvdelay
			}
		}
		c.leaf.remote.setConnectDelay(delay)
	}
	accName := c.acc.Name
	c.mu.Unlock()
	return accName, delay
}
2930

2931
// For the given remote Leafnode configuration, this function returns
2932
// if TLS is required, and if so, will return a clone of the TLS Config
2933
// (since some fields will be changed during handshake), the TLS server
2934
// name that is remembered, and the TLS timeout.
2935
func (c *client) leafNodeGetTLSConfigForSolicit(remote *leafNodeCfg) (bool, *tls.Config, string, float64) {
1,813✔
2936
        var (
1,813✔
2937
                tlsConfig  *tls.Config
1,813✔
2938
                tlsName    string
1,813✔
2939
                tlsTimeout float64
1,813✔
2940
        )
1,813✔
2941

1,813✔
2942
        remote.RLock()
1,813✔
2943
        defer remote.RUnlock()
1,813✔
2944

1,813✔
2945
        tlsRequired := remote.TLS || remote.TLSConfig != nil
1,813✔
2946
        if tlsRequired {
1,901✔
2947
                if remote.TLSConfig != nil {
140✔
2948
                        tlsConfig = remote.TLSConfig.Clone()
52✔
2949
                } else {
88✔
2950
                        tlsConfig = &tls.Config{MinVersion: tls.VersionTLS12}
36✔
2951
                }
36✔
2952
                tlsName = remote.tlsName
88✔
2953
                tlsTimeout = remote.TLSTimeout
88✔
2954
                if tlsTimeout == 0 {
141✔
2955
                        tlsTimeout = float64(TLS_TIMEOUT / time.Second)
53✔
2956
                }
53✔
2957
        }
2958

2959
        return tlsRequired, tlsConfig, tlsName, tlsTimeout
1,813✔
2960
}
2961

2962
// Initiates the LeafNode Websocket connection by:
// - doing the TLS handshake if needed
// - sending the HTTP upgrade request
// - waiting for and validating the HTTP response
//
// Since some bufio reader is used to consume the HTTP response, this function
// returns the slice of buffered bytes (if any) so that the readLoop that will
// be started after that consumes those first before reading from the socket.
// The returned ClosedState indicates which phase failed (0 on success, or
// when the connection was already closed by the TLS handshake).
//
// Lock held on entry.
func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remote *leafNodeCfg) ([]byte, ClosedState, error) {
	remote.RLock()
	compress := remote.Websocket.Compression
	// By default the server will mask outbound frames, but it can be disabled with this option.
	noMasking := remote.Websocket.NoMasking
	infoTimeout := remote.FirstInfoTimeout
	remote.RUnlock()
	// Will do the client-side TLS handshake if needed.
	tlsRequired, err := c.leafClientHandshakeIfNeeded(remote, opts)
	if err != nil {
		// 0 will indicate that the connection was already closed
		return nil, 0, err
	}

	// For http request, we need the passed URL to contain either http or https scheme.
	scheme := "http"
	if tlsRequired {
		scheme = "https"
	}
	// We will use the `/leafnode` path to tell the accepting WS server that it should
	// create a LEAF connection, not a CLIENT.
	// In case we use the user's URL path in the future, make sure we append the user's
	// path to our `/leafnode` path.
	lpath := leafNodeWSPath
	if curPath := rURL.EscapedPath(); curPath != _EMPTY_ {
		if curPath[0] == '/' {
			curPath = curPath[1:]
		}
		lpath = path.Join(curPath, lpath)
	} else {
		// Strip the leading '/' since the format string below adds one.
		lpath = lpath[1:]
	}
	ustr := fmt.Sprintf("%s://%s/%s", scheme, rURL.Host, lpath)
	u, _ := url.Parse(ustr)
	req := &http.Request{
		Method:     "GET",
		URL:        u,
		Proto:      "HTTP/1.1",
		ProtoMajor: 1,
		ProtoMinor: 1,
		Header:     make(http.Header),
		Host:       u.Host,
	}
	wsKey, err := wsMakeChallengeKey()
	if err != nil {
		return nil, WriteError, err
	}

	// Standard websocket upgrade headers (RFC 6455).
	req.Header["Upgrade"] = []string{"websocket"}
	req.Header["Connection"] = []string{"Upgrade"}
	req.Header["Sec-WebSocket-Key"] = []string{wsKey}
	req.Header["Sec-WebSocket-Version"] = []string{"13"}
	if compress {
		req.Header.Add("Sec-WebSocket-Extensions", wsPMCReqHeaderValue)
	}
	if noMasking {
		req.Header.Add(wsNoMaskingHeader, wsNoMaskingValue)
	}
	// Bound both the request write and the response read.
	c.nc.SetDeadline(time.Now().Add(infoTimeout))
	if err := req.Write(c.nc); err != nil {
		return nil, WriteError, err
	}

	var resp *http.Response

	br := bufio.NewReaderSize(c.nc, MAX_CONTROL_LINE_SIZE)
	resp, err = http.ReadResponse(br, req)
	// Validate the 101 Switching Protocols response and the accept key
	// echoed back by the server.
	if err == nil &&
		(resp.StatusCode != 101 ||
			!strings.EqualFold(resp.Header.Get("Upgrade"), "websocket") ||
			!strings.EqualFold(resp.Header.Get("Connection"), "upgrade") ||
			resp.Header.Get("Sec-Websocket-Accept") != wsAcceptKey(wsKey)) {

		err = fmt.Errorf("invalid websocket connection")
	}
	// Check compression extension...
	if err == nil && c.ws.compress {
		// Check that not only permessage-deflate extension is present, but that
		// we also have server and client no context take over.
		srvCompress, noCtxTakeover := wsPMCExtensionSupport(resp.Header, false)

		// If server does not support compression, then simply disable it in our side.
		if !srvCompress {
			c.ws.compress = false
		} else if !noCtxTakeover {
			err = fmt.Errorf("compression negotiation error")
		}
	}
	// Same for no masking...
	if err == nil && noMasking {
		// Check if server accepts no masking
		if resp.Header.Get(wsNoMaskingHeader) != wsNoMaskingValue {
			// Nope, need to mask our writes as any client would do.
			c.ws.maskwrite = true
		}
	}
	if resp != nil {
		resp.Body.Close()
	}
	if err != nil {
		return nil, ReadError, err
	}
	c.Debugf("Leafnode compression=%v masking=%v", c.ws.compress, c.ws.maskwrite)

	var preBuf []byte
	// We have to slurp whatever is in the bufio reader and pass that to the readloop.
	if n := br.Buffered(); n != 0 {
		preBuf, _ = br.Peek(n)
	}
	return preBuf, 0, nil
}
3084

3085
// connectProcessTimeout is the maximum time allowed for the leafnode
// connect process to finish before the connection is deemed stale and closed.
const connectProcessTimeout = 2 * time.Second
3086

3087
// This is invoked for remote LEAF remote connections after processing the INFO
// protocol. It sends the CONNECT protocol, starts the write loop, and arms a
// timer that closes the connection if leafNodeFinishConnectProcess is not
// reached within connectProcessTimeout.
func (s *Server) leafNodeResumeConnectProcess(c *client) {
	clusterName := s.ClusterName()

	c.mu.Lock()
	if c.isClosed() {
		c.mu.Unlock()
		return
	}
	if err := c.sendLeafConnect(clusterName, c.headers); err != nil {
		c.mu.Unlock()
		c.closeConnection(WriteError)
		return
	}

	// Spin up the write loop.
	s.startGoRoutine(func() { c.writeLoop() })

	// timeout leafNodeFinishConnectProcess
	c.ping.tmr = time.AfterFunc(connectProcessTimeout, func() {
		c.mu.Lock()
		// check if leafNodeFinishConnectProcess was called and prevent later leafNodeFinishConnectProcess
		// (the flag acts as a mutual-exclusion token between the timer and
		// the finish path: whoever sets it first wins).
		if !c.flags.setIfNotSet(connectProcessFinished) {
			c.mu.Unlock()
			return
		}
		clearTimer(&c.ping.tmr)
		closed := c.isClosed()
		c.mu.Unlock()
		if !closed {
			c.sendErrAndDebug("Stale Leaf Node Connection - Closing")
			c.closeConnection(StaleConnection)
		}
	})
	c.mu.Unlock()
	c.Debugf("Remote leafnode connect msg sent")
}
3125

3126
// This is invoked for remote LEAF connections after processing the INFO
3127
// protocol and leafNodeResumeConnectProcess.
3128
// This will send LS+ the CONNECT protocol and register the leaf node.
3129
func (s *Server) leafNodeFinishConnectProcess(c *client) {
610✔
3130
        c.mu.Lock()
610✔
3131
        if !c.flags.setIfNotSet(connectProcessFinished) {
610✔
3132
                c.mu.Unlock()
×
3133
                return
×
3134
        }
×
3135
        if c.isClosed() {
610✔
3136
                c.mu.Unlock()
×
3137
                s.removeLeafNodeConnection(c)
×
3138
                return
×
3139
        }
×
3140
        remote := c.leaf.remote
610✔
3141
        // Check if we will need to send the system connect event.
610✔
3142
        remote.RLock()
610✔
3143
        sendSysConnectEvent := remote.Hub
610✔
3144
        remote.RUnlock()
610✔
3145

610✔
3146
        // Capture account before releasing lock
610✔
3147
        acc := c.acc
610✔
3148
        // cancel connectProcessTimeout
610✔
3149
        clearTimer(&c.ping.tmr)
610✔
3150
        c.mu.Unlock()
610✔
3151

610✔
3152
        // Make sure we register with the account here.
610✔
3153
        if err := c.registerWithAccount(acc); err != nil {
612✔
3154
                if err == ErrTooManyAccountConnections {
2✔
3155
                        c.maxAccountConnExceeded()
×
3156
                        return
×
3157
                } else if err == ErrLeafNodeLoop {
4✔
3158
                        c.handleLeafNodeLoop(true)
2✔
3159
                        return
2✔
3160
                }
2✔
3161
                c.Errorf("Registering leaf with account %s resulted in error: %v", acc.Name, err)
×
3162
                c.closeConnection(ProtocolViolation)
×
3163
                return
×
3164
        }
3165
        s.addLeafNodeConnection(c, _EMPTY_, _EMPTY_, false)
608✔
3166
        s.initLeafNodeSmapAndSendSubs(c)
608✔
3167
        if sendSysConnectEvent {
614✔
3168
                s.sendLeafNodeConnect(acc)
6✔
3169
        }
6✔
3170

3171
        // The above functions are not atomically under the client
3172
        // lock doing those operations. It is possible - since we
3173
        // have started the read/write loops - that the connection
3174
        // is closed before or in between. This would leave the
3175
        // closed LN connection possible registered with the account
3176
        // and/or the server's leafs map. So check if connection
3177
        // is closed, and if so, manually cleanup.
3178
        c.mu.Lock()
608✔
3179
        closed := c.isClosed()
608✔
3180
        if !closed {
1,216✔
3181
                c.setFirstPingTimer()
608✔
3182
        }
608✔
3183
        c.mu.Unlock()
608✔
3184
        if closed {
608✔
3185
                s.removeLeafNodeConnection(c)
×
3186
                if prev := acc.removeClient(c); prev == 1 {
×
3187
                        s.decActiveAccounts()
×
3188
                }
×
3189
        }
3190
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc