• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 15339392784

29 May 2025 08:38AM UTC coverage: 85.689% (+0.05%) from 85.641%
15339392784

push

github

web-flow
[FIXED] LeafNode: Unable to reply if request originates from super-cluster (#6931)

If a request originates from cluster A and is routed to cluster B then
to a leafnode where a service is running, sending back the reply could
fail if the leafnode has restrictive publish permissions.

Resolves #6894

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>

69965 of 81650 relevant lines covered (85.69%)

367460.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.59
/server/leafnode.go
1
// Copyright 2019-2025 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bufio"
18
        "bytes"
19
        "crypto/tls"
20
        "encoding/base64"
21
        "encoding/json"
22
        "fmt"
23
        "io"
24
        "math/rand"
25
        "net"
26
        "net/http"
27
        "net/url"
28
        "os"
29
        "path"
30
        "reflect"
31
        "regexp"
32
        "runtime"
33
        "strconv"
34
        "strings"
35
        "sync"
36
        "sync/atomic"
37
        "time"
38

39
        "github.com/klauspost/compress/s2"
40
        "github.com/nats-io/jwt/v2"
41
        "github.com/nats-io/nkeys"
42
        "github.com/nats-io/nuid"
43
)
44

45
const (
	// Warning when user configures leafnode TLS insecure.
	leafnodeTLSInsecureWarning = "TLS certificate chain and hostname of solicited leafnodes will not be verified. DO NOT USE IN PRODUCTION!"

	// When a loop is detected, delay the reconnect of solicited connection.
	leafNodeReconnectDelayAfterLoopDetected = 30 * time.Second

	// When a server receives a message causing a permission violation, the
	// connection is closed and it won't attempt to reconnect for that long.
	leafNodeReconnectAfterPermViolation = 30 * time.Second

	// When we have the same cluster name as the hub.
	leafNodeReconnectDelayAfterClusterNameSame = 30 * time.Second

	// Prefix for loop detection subject.
	leafNodeLoopDetectionSubjectPrefix = "$LDS."

	// Path added to URL to indicate to WS server that the connection is a
	// LEAF connection as opposed to a CLIENT.
	leafNodeWSPath = "/leafnode"

	// This is the time the server will wait, when receiving a CONNECT,
	// before closing the connection if the required minimum version is not met.
	leafNodeWaitBeforeClose = 5 * time.Second
)
70

71
// leaf holds the leafnode-specific state attached to a client connection
// of kind LEAF.
type leaf struct {
	// We have any auth stuff here for solicited connections.
	remote *leafNodeCfg
	// isSpoke tells us what role we are playing.
	// Used when we receive a connection but otherside tells us they are a hub.
	isSpoke bool
	// remoteCluster is when we are a hub but the spoke leafnode is part of a cluster.
	remoteCluster string
	// remoteServer holds onto the remote server's name or ID.
	remoteServer string
	// domain name of remote server
	remoteDomain string
	// account name of remote server
	remoteAccName string
	// Used to suppress sub and unsub interest. Same as routes but our audience
	// here is tied to this leaf node. This will hold all subscriptions except this
	// leaf nodes. This represents all the interest we want to send to the other side.
	smap map[string]int32
	// This map will contain all the subscriptions that have been added to the smap
	// during initLeafNodeSmapAndSendSubs. It is short lived and is there to avoid
	// race between processing of a sub where sub is added to account sublist but
	// updateSmap has not been called on that "thread", while in the LN readloop,
	// when processing CONNECT, initLeafNodeSmapAndSendSubs is invoked and adds
	// this subscription to smap. When processing of the sub then calls updateSmap,
	// we would add it a second time in the smap causing later unsub to suppress the LS-.
	tsub  map[*subscription]struct{}
	tsubt *time.Timer
	// Selected compression mode, which may be different from the server configured mode.
	compression string
	// This is for GW map replies.
	gwSub *subscription
}
103

104
// Used for remote (solicited) leafnodes.
type leafNodeCfg struct {
	sync.RWMutex
	*RemoteLeafOpts
	// Candidate URLs: starts as a copy of the configured URLs; more may be
	// added when receiving async leafnode INFO protocol messages.
	urls []*url.URL
	// URL currently selected for connecting.
	curURL *url.URL
	// TLS server name saved off from the configured URLs (see saveTLSHostname).
	tlsName string
	// User/password saved off from the configured URLs (see saveUserPassword).
	username string
	password string
	// Deny permissions built from DenyExports/DenyImports (see newLeafNodeCfg).
	perms          *Permissions
	connDelay      time.Duration // Delay before a connect, could be used while detecting loop condition, etc..
	jsMigrateTimer *time.Timer   // Timer used to delay JetStream asset migration while disconnected.
}
117

118
// Check to see if this is a solicited leafnode. We do special processing for solicited.
119
func (c *client) isSolicitedLeafNode() bool {
2,000✔
120
        return c.kind == LEAF && c.leaf.remote != nil
2,000✔
121
}
2,000✔
122

123
// Returns true if this is a solicited leafnode and is not configured to be treated as a hub or a receiving
124
// connection leafnode where the otherside has declared itself to be the hub.
125
func (c *client) isSpokeLeafNode() bool {
6,307,697✔
126
        return c.kind == LEAF && c.leaf.isSpoke
6,307,697✔
127
}
6,307,697✔
128

129
func (c *client) isHubLeafNode() bool {
17,380✔
130
        return c.kind == LEAF && !c.leaf.isSpoke
17,380✔
131
}
17,380✔
132

133
// This will spin up go routines to solicit the remote leaf node connections.
func (s *Server) solicitLeafNodeRemotes(remotes []*RemoteLeafOpts) {
	sysAccName := _EMPTY_
	sAcc := s.SystemAccount()
	if sAcc != nil {
		sysAccName = sAcc.Name
	}
	// addRemote registers the remote config with the server and, when a
	// credentials file is configured, validates it and emits notices if the
	// remote will operate with restricted permissions.
	addRemote := func(r *RemoteLeafOpts, isSysAccRemote bool) *leafNodeCfg {
		s.mu.Lock()
		remote := newLeafNodeCfg(r)
		creds := remote.Credentials
		accName := remote.LocalAccount
		s.leafRemoteCfgs = append(s.leafRemoteCfgs, remote)
		// Print notice if the system account remote is configured with
		// restricted export/import permissions.
		if isSysAccRemote {
			if len(remote.DenyExports) > 0 {
				s.Noticef("Remote for System Account uses restricted export permissions")
			}
			if len(remote.DenyImports) > 0 {
				s.Noticef("Remote for System Account uses restricted import permissions")
			}
		}
		s.mu.Unlock()
		if creds != _EMPTY_ {
			contents, err := os.ReadFile(creds)
			// Zero out the in-memory copy of the credentials when done.
			defer wipeSlice(contents)
			if err != nil {
				s.Errorf("Error reading LeafNode Remote Credentials file %q: %v", creds, err)
			} else if items := credsRe.FindAllSubmatch(contents, -1); len(items) < 2 {
				// Expect at least two matches: the user JWT and the nkey seed.
				s.Errorf("LeafNode Remote Credentials file %q malformed", creds)
			} else if _, err := nkeys.FromSeed(items[1][1]); err != nil {
				s.Errorf("LeafNode Remote Credentials file %q has malformed seed", creds)
			} else if uc, err := jwt.DecodeUserClaims(string(items[0][1])); err != nil {
				s.Errorf("LeafNode Remote Credentials file %q has malformed user jwt", creds)
			} else if isSysAccRemote {
				if !uc.Permissions.Pub.Empty() || !uc.Permissions.Sub.Empty() || uc.Permissions.Resp != nil {
					s.Noticef("LeafNode Remote for System Account uses credentials file %q with restricted permissions", creds)
				}
			} else {
				if !uc.Permissions.Pub.Empty() || !uc.Permissions.Sub.Empty() || uc.Permissions.Resp != nil {
					s.Noticef("LeafNode Remote for Account %s uses credentials file %q with restricted permissions", accName, creds)
				}
			}
		}
		return remote
	}
	// Solicit each remote in its own go routine.
	for _, r := range remotes {
		remote := addRemote(r, r.LocalAccount == sysAccName)
		s.startGoRoutine(func() { s.connectToRemoteLeafNode(remote, true) })
	}
}
184

185
func (s *Server) remoteLeafNodeStillValid(remote *leafNodeCfg) bool {
1,790✔
186
        for _, ri := range s.getOpts().LeafNode.Remotes {
3,940✔
187
                // FIXME(dlc) - What about auth changes?
2,150✔
188
                if reflect.DeepEqual(ri.URLs, remote.URLs) {
3,940✔
189
                        return true
1,790✔
190
                }
1,790✔
191
        }
192
        return false
×
193
}
194

195
// Ensure that leafnode is properly configured.
196
func validateLeafNode(o *Options) error {
8,032✔
197
        if err := validateLeafNodeAuthOptions(o); err != nil {
8,034✔
198
                return err
2✔
199
        }
2✔
200

201
        // Users can bind to any local account, if its empty we will assume the $G account.
202
        for _, r := range o.LeafNode.Remotes {
8,739✔
203
                if r.LocalAccount == _EMPTY_ {
1,142✔
204
                        r.LocalAccount = globalAccountName
433✔
205
                }
433✔
206
        }
207

208
        // In local config mode, check that leafnode configuration refers to accounts that exist.
209
        if len(o.TrustedOperators) == 0 {
15,752✔
210
                accNames := map[string]struct{}{}
7,722✔
211
                for _, a := range o.Accounts {
16,358✔
212
                        accNames[a.Name] = struct{}{}
8,636✔
213
                }
8,636✔
214
                // global account is always created
215
                accNames[DEFAULT_GLOBAL_ACCOUNT] = struct{}{}
7,722✔
216
                // in the context of leaf nodes, empty account means global account
7,722✔
217
                accNames[_EMPTY_] = struct{}{}
7,722✔
218
                // system account either exists or, if not disabled, will be created
7,722✔
219
                if o.SystemAccount == _EMPTY_ && !o.NoSystemAccount {
13,758✔
220
                        accNames[DEFAULT_SYSTEM_ACCOUNT] = struct{}{}
6,036✔
221
                }
6,036✔
222
                checkAccountExists := func(accName string, cfgType string) error {
16,159✔
223
                        if _, ok := accNames[accName]; !ok {
8,439✔
224
                                return fmt.Errorf("cannot find local account %q specified in leafnode %s", accName, cfgType)
2✔
225
                        }
2✔
226
                        return nil
8,435✔
227
                }
228
                if err := checkAccountExists(o.LeafNode.Account, "authorization"); err != nil {
7,723✔
229
                        return err
1✔
230
                }
1✔
231
                for _, lu := range o.LeafNode.Users {
7,731✔
232
                        if lu.Account == nil { // means global account
13✔
233
                                continue
3✔
234
                        }
235
                        if err := checkAccountExists(lu.Account.Name, "authorization"); err != nil {
7✔
236
                                return err
×
237
                        }
×
238
                }
239
                for _, r := range o.LeafNode.Remotes {
8,429✔
240
                        if err := checkAccountExists(r.LocalAccount, "remote"); err != nil {
709✔
241
                                return err
1✔
242
                        }
1✔
243
                }
244
        } else {
308✔
245
                if len(o.LeafNode.Users) != 0 {
309✔
246
                        return fmt.Errorf("operator mode does not allow specifying users in leafnode config")
1✔
247
                }
1✔
248
                for _, r := range o.LeafNode.Remotes {
308✔
249
                        if !nkeys.IsValidPublicAccountKey(r.LocalAccount) {
2✔
250
                                return fmt.Errorf(
1✔
251
                                        "operator mode requires account nkeys in remotes. " +
1✔
252
                                                "Please add an `account` key to each remote in your `leafnodes` section, to assign it to an account. " +
1✔
253
                                                "Each account value should be a 56 character public key, starting with the letter 'A'")
1✔
254
                        }
1✔
255
                }
256
                if o.LeafNode.Port != 0 && o.LeafNode.Account != "" && !nkeys.IsValidPublicAccountKey(o.LeafNode.Account) {
307✔
257
                        return fmt.Errorf("operator mode and non account nkeys are incompatible")
1✔
258
                }
1✔
259
        }
260

261
        // Validate compression settings
262
        if o.LeafNode.Compression.Mode != _EMPTY_ {
11,640✔
263
                if err := validateAndNormalizeCompressionOption(&o.LeafNode.Compression, CompressionS2Auto); err != nil {
3,620✔
264
                        return err
5✔
265
                }
5✔
266
        }
267

268
        // If a remote has a websocket scheme, all need to have it.
269
        for _, rcfg := range o.LeafNode.Remotes {
8,727✔
270
                if len(rcfg.URLs) >= 2 {
921✔
271
                        firstIsWS, ok := isWSURL(rcfg.URLs[0]), true
214✔
272
                        for i := 1; i < len(rcfg.URLs); i++ {
673✔
273
                                u := rcfg.URLs[i]
459✔
274
                                if isWS := isWSURL(u); isWS && !firstIsWS || !isWS && firstIsWS {
466✔
275
                                        ok = false
7✔
276
                                        break
7✔
277
                                }
278
                        }
279
                        if !ok {
221✔
280
                                return fmt.Errorf("remote leaf node configuration cannot have a mix of websocket and non-websocket urls: %q", redactURLList(rcfg.URLs))
7✔
281
                        }
7✔
282
                }
283
                // Validate compression settings
284
                if rcfg.Compression.Mode != _EMPTY_ {
1,400✔
285
                        if err := validateAndNormalizeCompressionOption(&rcfg.Compression, CompressionS2Auto); err != nil {
705✔
286
                                return err
5✔
287
                        }
5✔
288
                }
289
        }
290

291
        if o.LeafNode.Port == 0 {
12,996✔
292
                return nil
4,988✔
293
        }
4,988✔
294

295
        // If MinVersion is defined, check that it is valid.
296
        if mv := o.LeafNode.MinVersion; mv != _EMPTY_ {
3,024✔
297
                if err := checkLeafMinVersionConfig(mv); err != nil {
6✔
298
                        return err
2✔
299
                }
2✔
300
        }
301

302
        // The checks below will be done only when detecting that we are configured
303
        // with gateways. So if an option validation needs to be done regardless,
304
        // it MUST be done before this point!
305

306
        if o.Gateway.Name == _EMPTY_ && o.Gateway.Port == 0 {
5,355✔
307
                return nil
2,337✔
308
        }
2,337✔
309
        // If we are here we have both leaf nodes and gateways defined, make sure there
310
        // is a system account defined.
311
        if o.SystemAccount == _EMPTY_ {
682✔
312
                return fmt.Errorf("leaf nodes and gateways (both being defined) require a system account to also be configured")
1✔
313
        }
1✔
314
        if err := validatePinnedCerts(o.LeafNode.TLSPinnedCerts); err != nil {
680✔
315
                return fmt.Errorf("leafnode: %v", err)
×
316
        }
×
317
        return nil
680✔
318
}
319

320
func checkLeafMinVersionConfig(mv string) error {
8✔
321
        if ok, err := versionAtLeastCheckError(mv, 2, 8, 0); !ok || err != nil {
12✔
322
                if err != nil {
6✔
323
                        return fmt.Errorf("invalid leafnode's minimum version: %v", err)
2✔
324
                } else {
4✔
325
                        return fmt.Errorf("the minimum version should be at least 2.8.0")
2✔
326
                }
2✔
327
        }
328
        return nil
4✔
329
}
330

331
// Used to validate user names in LeafNode configuration.
332
// - rejects mix of single and multiple users.
333
// - rejects duplicate user names.
334
func validateLeafNodeAuthOptions(o *Options) error {
8,081✔
335
        if len(o.LeafNode.Users) == 0 {
16,142✔
336
                return nil
8,061✔
337
        }
8,061✔
338
        if o.LeafNode.Username != _EMPTY_ {
22✔
339
                return fmt.Errorf("can not have a single user/pass and a users array")
2✔
340
        }
2✔
341
        if o.LeafNode.Nkey != _EMPTY_ {
18✔
342
                return fmt.Errorf("can not have a single nkey and a users array")
×
343
        }
×
344
        users := map[string]struct{}{}
18✔
345
        for _, u := range o.LeafNode.Users {
42✔
346
                if _, exists := users[u.Username]; exists {
26✔
347
                        return fmt.Errorf("duplicate user %q detected in leafnode authorization", u.Username)
2✔
348
                }
2✔
349
                users[u.Username] = struct{}{}
22✔
350
        }
351
        return nil
16✔
352
}
353

354
// Update remote LeafNode TLS configurations after a config reload.
355
func (s *Server) updateRemoteLeafNodesTLSConfig(opts *Options) {
11✔
356
        max := len(opts.LeafNode.Remotes)
11✔
357
        if max == 0 {
11✔
358
                return
×
359
        }
×
360

361
        s.mu.RLock()
11✔
362
        defer s.mu.RUnlock()
11✔
363

11✔
364
        // Changes in the list of remote leaf nodes is not supported.
11✔
365
        // However, make sure that we don't go over the arrays.
11✔
366
        if len(s.leafRemoteCfgs) < max {
11✔
367
                max = len(s.leafRemoteCfgs)
×
368
        }
×
369
        for i := 0; i < max; i++ {
22✔
370
                ro := opts.LeafNode.Remotes[i]
11✔
371
                cfg := s.leafRemoteCfgs[i]
11✔
372
                if ro.TLSConfig != nil {
13✔
373
                        cfg.Lock()
2✔
374
                        cfg.TLSConfig = ro.TLSConfig.Clone()
2✔
375
                        cfg.TLSHandshakeFirst = ro.TLSHandshakeFirst
2✔
376
                        cfg.Unlock()
2✔
377
                }
2✔
378
        }
379
}
380

381
func (s *Server) reConnectToRemoteLeafNode(remote *leafNodeCfg) {
222✔
382
        delay := s.getOpts().LeafNode.ReconnectInterval
222✔
383
        select {
222✔
384
        case <-time.After(delay):
176✔
385
        case <-s.quitCh:
46✔
386
                s.grWG.Done()
46✔
387
                return
46✔
388
        }
389
        s.connectToRemoteLeafNode(remote, false)
176✔
390
}
391

392
// Creates a leafNodeCfg object that wraps the RemoteLeafOpts.
393
func newLeafNodeCfg(remote *RemoteLeafOpts) *leafNodeCfg {
684✔
394
        cfg := &leafNodeCfg{
684✔
395
                RemoteLeafOpts: remote,
684✔
396
                urls:           make([]*url.URL, 0, len(remote.URLs)),
684✔
397
        }
684✔
398
        if len(remote.DenyExports) > 0 || len(remote.DenyImports) > 0 {
692✔
399
                perms := &Permissions{}
8✔
400
                if len(remote.DenyExports) > 0 {
16✔
401
                        perms.Publish = &SubjectPermission{Deny: remote.DenyExports}
8✔
402
                }
8✔
403
                if len(remote.DenyImports) > 0 {
15✔
404
                        perms.Subscribe = &SubjectPermission{Deny: remote.DenyImports}
7✔
405
                }
7✔
406
                cfg.perms = perms
8✔
407
        }
408
        // Start with the one that is configured. We will add to this
409
        // array when receiving async leafnode INFOs.
410
        cfg.urls = append(cfg.urls, cfg.URLs...)
684✔
411
        // If allowed to randomize, do it on our copy of URLs
684✔
412
        if !remote.NoRandomize {
1,366✔
413
                rand.Shuffle(len(cfg.urls), func(i, j int) {
1,098✔
414
                        cfg.urls[i], cfg.urls[j] = cfg.urls[j], cfg.urls[i]
416✔
415
                })
416✔
416
        }
417
        // If we are TLS make sure we save off a proper servername if possible.
418
        // Do same for user/password since we may need them to connect to
419
        // a bare URL that we get from INFO protocol.
420
        for _, u := range cfg.urls {
1,814✔
421
                cfg.saveTLSHostname(u)
1,130✔
422
                cfg.saveUserPassword(u)
1,130✔
423
                // If the url(s) have the "wss://" scheme, and we don't have a TLS
1,130✔
424
                // config, mark that we should be using TLS anyway.
1,130✔
425
                if !cfg.TLS && isWSSURL(u) {
1,131✔
426
                        cfg.TLS = true
1✔
427
                }
1✔
428
        }
429
        return cfg
684✔
430
}
431

432
// Will pick an URL from the list of available URLs.
433
func (cfg *leafNodeCfg) pickNextURL() *url.URL {
1,031✔
434
        cfg.Lock()
1,031✔
435
        defer cfg.Unlock()
1,031✔
436
        // If the current URL is the first in the list and we have more than
1,031✔
437
        // one URL, then move that one to end of the list.
1,031✔
438
        if cfg.curURL != nil && len(cfg.urls) > 1 && urlsAreEqual(cfg.curURL, cfg.urls[0]) {
1,171✔
439
                first := cfg.urls[0]
140✔
440
                copy(cfg.urls, cfg.urls[1:])
140✔
441
                cfg.urls[len(cfg.urls)-1] = first
140✔
442
        }
140✔
443
        cfg.curURL = cfg.urls[0]
1,031✔
444
        return cfg.curURL
1,031✔
445
}
446

447
// Returns the current URL
448
func (cfg *leafNodeCfg) getCurrentURL() *url.URL {
81✔
449
        cfg.RLock()
81✔
450
        defer cfg.RUnlock()
81✔
451
        return cfg.curURL
81✔
452
}
81✔
453

454
// Returns how long the server should wait before attempting
455
// to solicit a remote leafnode connection.
456
func (cfg *leafNodeCfg) getConnectDelay() time.Duration {
860✔
457
        cfg.RLock()
860✔
458
        delay := cfg.connDelay
860✔
459
        cfg.RUnlock()
860✔
460
        return delay
860✔
461
}
860✔
462

463
// Sets the connect delay.
464
func (cfg *leafNodeCfg) setConnectDelay(delay time.Duration) {
158✔
465
        cfg.Lock()
158✔
466
        cfg.connDelay = delay
158✔
467
        cfg.Unlock()
158✔
468
}
158✔
469

470
// Ensure that non-exported options (used in tests) have
471
// been properly set.
472
func (s *Server) setLeafNodeNonExportedOptions() {
6,503✔
473
        opts := s.getOpts()
6,503✔
474
        s.leafNodeOpts.dialTimeout = opts.LeafNode.dialTimeout
6,503✔
475
        if s.leafNodeOpts.dialTimeout == 0 {
13,005✔
476
                // Use same timeouts as routes for now.
6,502✔
477
                s.leafNodeOpts.dialTimeout = DEFAULT_ROUTE_DIAL
6,502✔
478
        }
6,502✔
479
        s.leafNodeOpts.resolver = opts.LeafNode.resolver
6,503✔
480
        if s.leafNodeOpts.resolver == nil {
13,002✔
481
                s.leafNodeOpts.resolver = net.DefaultResolver
6,499✔
482
        }
6,499✔
483
}
484

485
// sharedSysAccDelay is how long a first connect to a shared system account
// remote is delayed (when clustered) to gather some info prior; see
// connectToRemoteLeafNode.
const sharedSysAccDelay = 250 * time.Millisecond
486

487
// connectToRemoteLeafNode attempts to establish a solicited leafnode
// connection for the given remote configuration, retrying with a jittered
// reconnect delay until connected, the remote is no longer configured, or
// the server shuts down. Runs in its own go routine.
func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg, firstConnect bool) {
	defer s.grWG.Done()

	if remote == nil || len(remote.URLs) == 0 {
		s.Debugf("Empty remote leafnode definition, nothing to connect")
		return
	}

	opts := s.getOpts()
	reconnectDelay := opts.LeafNode.ReconnectInterval
	s.mu.RLock()
	dialTimeout := s.leafNodeOpts.dialTimeout
	resolver := s.leafNodeOpts.resolver
	var isSysAcc bool
	if s.eventsEnabled() {
		isSysAcc = remote.LocalAccount == s.sys.account.Name
	}
	jetstreamMigrateDelay := remote.JetStreamClusterMigrateDelay
	s.mu.RUnlock()

	// If we are sharing a system account and we are not standalone delay to gather some info prior.
	if firstConnect && isSysAcc && !s.standAloneMode() {
		s.Debugf("Will delay first leafnode connect to shared system account due to clustering")
		remote.setConnectDelay(sharedSysAccDelay)
	}

	// Honor any pending connect delay (loop detection, shared sys account, etc.).
	if connDelay := remote.getConnectDelay(); connDelay > 0 {
		select {
		case <-time.After(connDelay):
		case <-s.quitCh:
			return
		}
		remote.setConnectDelay(0)
	}

	var conn net.Conn

	const connErrFmt = "Error trying to connect as leafnode to remote server %q (attempt %v): %v"

	attempts := 0

	for s.isRunning() && s.remoteLeafNodeStillValid(remote) {
		rURL := remote.pickNextURL()
		url, err := s.getRandomIP(resolver, rURL.Host, nil)
		if err == nil {
			var ipStr string
			if url != rURL.Host {
				ipStr = fmt.Sprintf(" (%s)", url)
			}
			// Some test may want to disable remotes from connecting
			if s.isLeafConnectDisabled() {
				s.Debugf("Will not attempt to connect to remote server on %q%s, leafnodes currently disabled", rURL.Host, ipStr)
				err = ErrLeafNodeDisabled
			} else {
				s.Debugf("Trying to connect as leafnode to remote server on %q%s", rURL.Host, ipStr)
				conn, err = natsDialTimeout("tcp", url, dialTimeout)
			}
		}
		if err != nil {
			// Jitter the delay so multiple servers do not retry in lockstep.
			jitter := time.Duration(rand.Int63n(int64(reconnectDelay)))
			delay := reconnectDelay + jitter
			attempts++
			if s.shouldReportConnectErr(firstConnect, attempts) {
				s.Errorf(connErrFmt, rURL.Host, attempts, err)
			} else {
				s.Debugf(connErrFmt, rURL.Host, attempts, err)
			}
			remote.Lock()
			// if we are using a delay to start migrating assets, kick off a migrate timer.
			if remote.jsMigrateTimer == nil && jetstreamMigrateDelay > 0 {
				remote.jsMigrateTimer = time.AfterFunc(jetstreamMigrateDelay, func() {
					s.checkJetStreamMigrate(remote)
				})
			}
			remote.Unlock()
			select {
			case <-s.quitCh:
				remote.cancelMigrateTimer()
				return
			case <-time.After(delay):
				// Check if we should migrate any JetStream assets immediately while this remote is down.
				// This will be used if JetStreamClusterMigrateDelay was not set
				if jetstreamMigrateDelay == 0 {
					s.checkJetStreamMigrate(remote)
				}
				continue
			}
		}
		remote.cancelMigrateTimer()
		if !s.remoteLeafNodeStillValid(remote) {
			conn.Close()
			return
		}

		// We have a connection here to a remote server.
		// Go ahead and create our leaf node and return.
		s.createLeafNode(conn, rURL, remote, nil)

		// Clear any observer states if we had them.
		s.clearObserverState(remote)

		return
	}
}
591

592
func (cfg *leafNodeCfg) cancelMigrateTimer() {
851✔
593
        cfg.Lock()
851✔
594
        stopAndClearTimer(&cfg.jsMigrateTimer)
851✔
595
        cfg.Unlock()
851✔
596
}
851✔
597

598
// This will clear any observer state such that stream or consumer assets on this server can become leaders again.
func (s *Server) clearObserverState(remote *leafNodeCfg) {
	s.mu.RLock()
	accName := remote.LocalAccount
	s.mu.RUnlock()

	acc, err := s.LookupAccount(accName)
	if err != nil {
		s.Warnf("Error looking up account [%s] checking for JetStream clear observer state on a leafnode", accName)
		return
	}

	// Serialize with checkJetStreamMigrate, which flips the same observer state.
	acc.jscmMu.Lock()
	defer acc.jscmMu.Unlock()

	// Walk all streams looking for any clustered stream, skip otherwise.
	for _, mset := range acc.streams() {
		node := mset.raftNode()
		if node == nil {
			// Not R>1
			continue
		}
		// Check consumers
		for _, o := range mset.getConsumers() {
			if n := o.raftNode(); n != nil {
				// Ensure we can become a leader again.
				n.SetObserver(false)
			}
		}
		// Ensure we can become a leader again.
		// (Original comment said "can not", contradicting SetObserver(false).)
		node.SetObserver(false)
	}
}
631

632
// Check to see if we should migrate any assets from this account.
// Called when the remote leafnode connection is lost and the remote has
// JetStreamClusterMigrate enabled: every clustered stream/consumer on this
// server steps down and is marked observer so it cannot regain leadership
// until clearObserverState runs on reconnect.
func (s *Server) checkJetStreamMigrate(remote *leafNodeCfg) {
	s.mu.RLock()
	accName, shouldMigrate := remote.LocalAccount, remote.JetStreamClusterMigrate
	s.mu.RUnlock()

	if !shouldMigrate {
		return
	}

	acc, err := s.LookupAccount(accName)
	if err != nil {
		s.Warnf("Error looking up account [%s] checking for JetStream migration on a leafnode", accName)
		return
	}

	// Serialize with clearObserverState, which flips the same observer flags.
	acc.jscmMu.Lock()
	defer acc.jscmMu.Unlock()

	// Walk all streams looking for any clustered stream, skip otherwise.
	// If we are the leader force stepdown.
	for _, mset := range acc.streams() {
		node := mset.raftNode()
		if node == nil {
			// Not R>1
			continue
		}
		// Collect any consumers
		for _, o := range mset.getConsumers() {
			if n := o.raftNode(); n != nil {
				n.StepDown()
				// Ensure we can not become a leader while in this state.
				n.SetObserver(true)
			}
		}
		// Stepdown if this stream was leader.
		node.StepDown()
		// Ensure we can not become a leader while in this state.
		node.SetObserver(true)
	}
}
673

674
// Helper for checking.
675
func (s *Server) isLeafConnectDisabled() bool {
1,024✔
676
        s.mu.RLock()
1,024✔
677
        defer s.mu.RUnlock()
1,024✔
678
        return s.leafDisableConnect
1,024✔
679
}
1,024✔
680

681
// Save off the tlsName for when we use TLS and mix hostnames and IPs. IPs usually
682
// come from the server we connect to.
683
//
684
// We used to save the name only if there was a TLSConfig or scheme equal to "tls".
685
// However, this was causing failures for users that did not set the scheme (and
686
// their remote connections did not have a tls{} block).
687
// We now save the host name regardless in case the remote returns an INFO indicating
688
// that TLS is required.
689
func (cfg *leafNodeCfg) saveTLSHostname(u *url.URL) {
1,737✔
690
        if cfg.tlsName == _EMPTY_ && net.ParseIP(u.Hostname()) == nil {
1,757✔
691
                cfg.tlsName = u.Hostname()
20✔
692
        }
20✔
693
}
694

695
// Save off the username/password for when we connect using a bare URL
696
// that we get from the INFO protocol.
697
func (cfg *leafNodeCfg) saveUserPassword(u *url.URL) {
1,130✔
698
        if cfg.username == _EMPTY_ && u.User != nil {
1,394✔
699
                cfg.username = u.User.Username()
264✔
700
                cfg.password, _ = u.User.Password()
264✔
701
        }
264✔
702
}
703

704
// This starts the leafnode accept loop in a go routine, unless it
// is detected that the server has already been shutdown.
// It binds the leafnode listener, builds the INFO template advertised to
// accepted leafnodes, and spawns the accept loop goroutine. The server lock
// is held from the listen call until the end of the function so that the
// INFO state (leafNodeInfo, leafURLsMap, leafNodeListener) is set atomically.
func (s *Server) startLeafNodeAcceptLoop() {
	// Snapshot server options.
	opts := s.getOpts()

	port := opts.LeafNode.Port
	if port == -1 {
		// -1 means "pick an ephemeral port" (used heavily by tests).
		port = 0
	}

	if s.isShuttingDown() {
		return
	}

	s.mu.Lock()
	hp := net.JoinHostPort(opts.LeafNode.Host, strconv.Itoa(port))
	l, e := natsListen("tcp", hp)
	s.leafNodeListenerErr = e
	if e != nil {
		s.mu.Unlock()
		s.Fatalf("Error listening on leafnode port: %d - %v", opts.LeafNode.Port, e)
		return
	}

	s.Noticef("Listening for leafnode connections on %s",
		net.JoinHostPort(opts.LeafNode.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))

	tlsRequired := opts.LeafNode.TLSConfig != nil
	tlsVerify := tlsRequired && opts.LeafNode.TLSConfig.ClientAuth == tls.RequireAndVerifyClientCert
	// Do not set compression in this Info object, it would possibly cause
	// issues when sending asynchronous INFO to the remote.
	info := Info{
		ID:            s.info.ID,
		Name:          s.info.Name,
		Version:       s.info.Version,
		GitCommit:     gitCommit,
		GoVersion:     runtime.Version(),
		AuthRequired:  true,
		TLSRequired:   tlsRequired,
		TLSVerify:     tlsVerify,
		MaxPayload:    s.info.MaxPayload, // TODO(dlc) - Allow override?
		Headers:       s.supportsHeaders(),
		JetStream:     opts.JetStream,
		Domain:        opts.JetStreamDomain,
		Proto:         s.getServerProto(),
		InfoOnConnect: true,
	}
	// If we have selected a random port...
	if port == 0 {
		// Write resolved port back to options.
		opts.LeafNode.Port = l.Addr().(*net.TCPAddr).Port
	}

	s.leafNodeInfo = info
	// Possibly override Host/Port and set IP based on Cluster.Advertise
	if err := s.setLeafNodeInfoHostPortAndIP(); err != nil {
		s.Fatalf("Error setting leafnode INFO with LeafNode.Advertise value of %s, err=%v", opts.LeafNode.Advertise, err)
		l.Close()
		s.mu.Unlock()
		return
	}
	// Register our own advertised URL (reference counted across routes).
	s.leafURLsMap[s.leafNodeInfo.IP]++
	s.generateLeafNodeInfoJSON()

	// Setup state that can enable shutdown
	s.leafNodeListener = l

	// As of now, a server that does not have remotes configured would
	// never solicit a connection, so we should not have to warn if
	// InsecureSkipVerify is set in main LeafNodes config (since
	// this TLS setting matters only when soliciting a connection).
	// Still, warn if insecure is set in any of LeafNode block.
	// We need to check remotes, even if tls is not required on accept.
	warn := tlsRequired && opts.LeafNode.TLSConfig.InsecureSkipVerify
	if !warn {
		for _, r := range opts.LeafNode.Remotes {
			if r.TLSConfig != nil && r.TLSConfig.InsecureSkipVerify {
				warn = true
				break
			}
		}
	}
	if warn {
		s.Warnf(leafnodeTLSInsecureWarning)
	}
	go s.acceptConnections(l, "Leafnode", func(conn net.Conn) { s.createLeafNode(conn, nil, nil, nil) }, nil)
	s.mu.Unlock()
}
793

794
// RegEx to match a creds file with user JWT and Seed.
// Each "-----...-----" armored section yields one submatch whose first
// capture group is the base64-ish payload; a valid creds file therefore
// produces at least two matches: the user JWT first, then the nkey seed.
var credsRe = regexp.MustCompile(`\s*(?:(?:[-]{3,}.*[-]{3,}\r?\n)([\w\-.=]+)(?:\r?\n[-]{3,}.*[-]{3,}(\r?\n|\z)))`)
796

797
// clusterName is provided as argument to avoid lock ordering issues with the locked client c
798
// Lock should be held entering here.
799
func (c *client) sendLeafConnect(clusterName string, headers bool) error {
638✔
800
        // We support basic user/pass and operator based user JWT with signatures.
638✔
801
        cinfo := leafConnectInfo{
638✔
802
                Version:       VERSION,
638✔
803
                ID:            c.srv.info.ID,
638✔
804
                Domain:        c.srv.info.Domain,
638✔
805
                Name:          c.srv.info.Name,
638✔
806
                Hub:           c.leaf.remote.Hub,
638✔
807
                Cluster:       clusterName,
638✔
808
                Headers:       headers,
638✔
809
                JetStream:     c.acc.jetStreamConfigured(),
638✔
810
                DenyPub:       c.leaf.remote.DenyImports,
638✔
811
                Compression:   c.leaf.compression,
638✔
812
                RemoteAccount: c.acc.GetName(),
638✔
813
                Proto:         c.srv.getServerProto(),
638✔
814
        }
638✔
815

638✔
816
        // If a signature callback is specified, this takes precedence over anything else.
638✔
817
        if cb := c.leaf.remote.SignatureCB; cb != nil {
641✔
818
                nonce := c.nonce
3✔
819
                c.mu.Unlock()
3✔
820
                jwt, sigraw, err := cb(nonce)
3✔
821
                c.mu.Lock()
3✔
822
                if err == nil && c.isClosed() {
4✔
823
                        err = ErrConnectionClosed
1✔
824
                }
1✔
825
                if err != nil {
5✔
826
                        c.Errorf("Error signing the nonce: %v", err)
2✔
827
                        return err
2✔
828
                }
2✔
829
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
1✔
830
                cinfo.JWT, cinfo.Sig = jwt, sig
1✔
831

832
        } else if creds := c.leaf.remote.Credentials; creds != _EMPTY_ {
684✔
833
                // Check for credentials first, that will take precedence..
49✔
834
                c.Debugf("Authenticating with credentials file %q", c.leaf.remote.Credentials)
49✔
835
                contents, err := os.ReadFile(creds)
49✔
836
                if err != nil {
49✔
837
                        c.Errorf("%v", err)
×
838
                        return err
×
839
                }
×
840
                defer wipeSlice(contents)
49✔
841
                items := credsRe.FindAllSubmatch(contents, -1)
49✔
842
                if len(items) < 2 {
49✔
843
                        c.Errorf("Credentials file malformed")
×
844
                        return err
×
845
                }
×
846
                // First result should be the user JWT.
847
                // We copy here so that the file containing the seed will be wiped appropriately.
848
                raw := items[0][1]
49✔
849
                tmp := make([]byte, len(raw))
49✔
850
                copy(tmp, raw)
49✔
851
                // Seed is second item.
49✔
852
                kp, err := nkeys.FromSeed(items[1][1])
49✔
853
                if err != nil {
49✔
854
                        c.Errorf("Credentials file has malformed seed")
×
855
                        return err
×
856
                }
×
857
                // Wipe our key on exit.
858
                defer kp.Wipe()
49✔
859

49✔
860
                sigraw, _ := kp.Sign(c.nonce)
49✔
861
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
49✔
862
                cinfo.JWT = bytesToString(tmp)
49✔
863
                cinfo.Sig = sig
49✔
864
        } else if nkey := c.leaf.remote.Nkey; nkey != _EMPTY_ {
588✔
865
                kp, err := nkeys.FromSeed([]byte(nkey))
2✔
866
                if err != nil {
2✔
867
                        c.Errorf("Remote nkey has malformed seed")
×
868
                        return err
×
869
                }
×
870
                // Wipe our key on exit.
871
                defer kp.Wipe()
2✔
872
                sigraw, _ := kp.Sign(c.nonce)
2✔
873
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
2✔
874
                pkey, _ := kp.PublicKey()
2✔
875
                cinfo.Nkey = pkey
2✔
876
                cinfo.Sig = sig
2✔
877
        }
878
        // In addition, and this is to allow auth callout, set user/password or
879
        // token if applicable.
880
        if userInfo := c.leaf.remote.curURL.User; userInfo != nil {
917✔
881
                // For backward compatibility, if only username is provided, set both
281✔
882
                // Token and User, not just Token.
281✔
883
                cinfo.User = userInfo.Username()
281✔
884
                var ok bool
281✔
885
                cinfo.Pass, ok = userInfo.Password()
281✔
886
                if !ok {
287✔
887
                        cinfo.Token = cinfo.User
6✔
888
                }
6✔
889
        } else if c.leaf.remote.username != _EMPTY_ {
358✔
890
                cinfo.User = c.leaf.remote.username
3✔
891
                cinfo.Pass = c.leaf.remote.password
3✔
892
        }
3✔
893
        b, err := json.Marshal(cinfo)
636✔
894
        if err != nil {
636✔
895
                c.Errorf("Error marshaling CONNECT to remote leafnode: %v\n", err)
×
896
                return err
×
897
        }
×
898
        // Although this call is made before the writeLoop is created,
899
        // we don't really need to send in place. The protocol will be
900
        // sent out by the writeLoop.
901
        c.enqueueProto([]byte(fmt.Sprintf(ConProto, b)))
636✔
902
        return nil
636✔
903
}
904

905
// Makes a deep copy of the LeafNode Info structure.
906
// The server lock is held on entry.
907
func (s *Server) copyLeafNodeInfo() *Info {
2,540✔
908
        clone := s.leafNodeInfo
2,540✔
909
        // Copy the array of urls.
2,540✔
910
        if len(s.leafNodeInfo.LeafNodeURLs) > 0 {
4,610✔
911
                clone.LeafNodeURLs = append([]string(nil), s.leafNodeInfo.LeafNodeURLs...)
2,070✔
912
        }
2,070✔
913
        return &clone
2,540✔
914
}
915

916
// Adds a LeafNode URL that we get when a route connects to the Info structure.
917
// Regenerates the JSON byte array so that it can be sent to LeafNode connections.
918
// Returns a boolean indicating if the URL was added or not.
919
// Server lock is held on entry
920
func (s *Server) addLeafNodeURL(urlStr string) bool {
5,901✔
921
        if s.leafURLsMap.addUrl(urlStr) {
11,797✔
922
                s.generateLeafNodeInfoJSON()
5,896✔
923
                return true
5,896✔
924
        }
5,896✔
925
        return false
5✔
926
}
927

928
// Removes a LeafNode URL of the route that is disconnecting from the Info structure.
929
// Regenerates the JSON byte array so that it can be sent to LeafNode connections.
930
// Returns a boolean indicating if the URL was removed or not.
931
// Server lock is held on entry.
932
func (s *Server) removeLeafNodeURL(urlStr string) bool {
5,901✔
933
        // Don't need to do this if we are removing the route connection because
5,901✔
934
        // we are shuting down...
5,901✔
935
        if s.isShuttingDown() {
9,026✔
936
                return false
3,125✔
937
        }
3,125✔
938
        if s.leafURLsMap.removeUrl(urlStr) {
5,548✔
939
                s.generateLeafNodeInfoJSON()
2,772✔
940
                return true
2,772✔
941
        }
2,772✔
942
        return false
4✔
943
}
944

945
// Server lock is held on entry
// Refreshes the cluster name, leafnode URLs and websocket connect URLs in
// the cached INFO, then regenerates the pre-marshaled JSON sent to leafnodes.
func (s *Server) generateLeafNodeInfoJSON() {
	s.leafNodeInfo.Cluster = s.cachedClusterName()
	s.leafNodeInfo.LeafNodeURLs = s.leafURLsMap.getAsStringSlice()
	s.leafNodeInfo.WSConnectURLs = s.websocket.connectURLsMap.getAsStringSlice()
	s.leafNodeInfoJSON = generateInfoJSON(&s.leafNodeInfo)
}
952

953
// Sends an async INFO protocol so that the connected servers can update
954
// their list of LeafNode urls.
955
func (s *Server) sendAsyncLeafNodeInfo() {
8,668✔
956
        for _, c := range s.leafs {
8,764✔
957
                c.mu.Lock()
96✔
958
                c.enqueueProto(s.leafNodeInfoJSON)
96✔
959
                c.mu.Unlock()
96✔
960
        }
96✔
961
}
962

963
// Called when an inbound leafnode connection is accepted or we create one for a solicited leafnode.
//
// Parameters:
//   - conn:   the established TCP (or websocket-upgraded) connection.
//   - rURL:   for solicited connections, the URL we dialed (nil when accepting).
//   - remote: the remote leafnode config when soliciting; nil for accepted
//     connections (this is also how "solicited" is determined below).
//   - ws:     non-nil when the connection was accepted on the websocket port.
//
// Returns the created client, or nil if setup failed (the connection is
// closed in that case).
func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCfg, ws *websocket) *client {
	// Snapshot server options.
	opts := s.getOpts()

	maxPay := int32(opts.MaxPayload)
	maxSubs := int32(opts.MaxSubs)
	// For system, maxSubs of 0 means unlimited, so re-adjust here.
	if maxSubs == 0 {
		maxSubs = -1
	}
	now := time.Now().UTC()

	c := &client{srv: s, nc: conn, kind: LEAF, opts: defaultOpts, mpay: maxPay, msubs: maxSubs, start: now, last: now}
	// Do not update the smap here, we need to do it in initLeafNodeSmapAndSendSubs
	c.leaf = &leaf{}

	// For accepted LN connections, ws will be != nil if it was accepted
	// through the Websocket port.
	c.ws = ws

	// For remote, check if the scheme starts with "ws", if so, we will initiate
	// a remote Leaf Node connection as a websocket connection.
	if remote != nil && rURL != nil && isWSURL(rURL) {
		remote.RLock()
		c.ws = &websocket{compress: remote.Websocket.Compression, maskwrite: !remote.Websocket.NoMasking}
		remote.RUnlock()
	}

	// Determines if we are soliciting the connection or not.
	var solicited bool
	var acc *Account
	var remoteSuffix string
	if remote != nil {
		// For now, if lookup fails, we will constantly try
		// to recreate this LN connection.
		lacc := remote.LocalAccount
		var err error
		acc, err = s.LookupAccount(lacc)
		if err != nil {
			// An account not existing is something that can happen with nats/http account resolver and the account
			// has not yet been pushed, or the request failed for other reasons.
			// remote needs to be set or retry won't happen
			c.leaf.remote = remote
			c.closeConnection(MissingAccount)
			s.Errorf("Unable to lookup account %s for solicited leafnode connection: %v", lacc, err)
			return nil
		}
		remoteSuffix = fmt.Sprintf(" for account: %s", acc.traceLabel())
	}

	c.mu.Lock()
	c.initClient()
	c.Noticef("Leafnode connection created%s %s", remoteSuffix, c.opts.Name)

	var (
		tlsFirst         bool          // perform the TLS handshake before exchanging INFO
		tlsFirstFallback time.Duration // accept side: how long to sniff for a TLS-first client
		infoTimeout      time.Duration // solicit side: how long to wait for the first INFO
	)
	if remote != nil {
		solicited = true
		remote.Lock()
		c.leaf.remote = remote
		c.setPermissions(remote.perms)
		if !c.leaf.remote.Hub {
			c.leaf.isSpoke = true
		}
		tlsFirst = remote.TLSHandshakeFirst
		infoTimeout = remote.FirstInfoTimeout
		remote.Unlock()
		c.acc = acc
	} else {
		c.flags.set(expectConnect)
		if ws != nil {
			c.Debugf("Leafnode compression=%v", c.ws.compress)
		}
		tlsFirst = opts.LeafNode.TLSHandshakeFirst
		if f := opts.LeafNode.TLSHandshakeFirstFallback; f > 0 {
			tlsFirstFallback = f
		}
	}
	c.mu.Unlock()

	var nonce [nonceLen]byte
	var info *Info

	// Grab this before the client lock below.
	if !solicited {
		// Grab server variables
		s.mu.Lock()
		info = s.copyLeafNodeInfo()
		// For tests that want to simulate old servers, do not set the compression
		// on the INFO protocol if configured with CompressionNotSupported.
		if cm := opts.LeafNode.Compression.Mode; cm != CompressionNotSupported {
			info.Compression = cm
		}
		s.generateNonce(nonce[:])
		s.mu.Unlock()
	}

	// Grab lock
	c.mu.Lock()

	var preBuf []byte
	if solicited {
		// For websocket connection, we need to send an HTTP request,
		// and get the response before starting the readLoop to get
		// the INFO, etc..
		if c.isWebsocket() {
			var err error
			var closeReason ClosedState

			preBuf, closeReason, err = c.leafNodeSolicitWSConnection(opts, rURL, remote)
			if err != nil {
				c.Errorf("Error soliciting websocket connection: %v", err)
				c.mu.Unlock()
				if closeReason != 0 {
					c.closeConnection(closeReason)
				}
				return nil
			}
		} else {
			// If configured to do TLS handshake first
			if tlsFirst {
				if _, err := c.leafClientHandshakeIfNeeded(remote, opts); err != nil {
					c.mu.Unlock()
					return nil
				}
			}
			// We need to wait for the info, but not for too long.
			c.nc.SetReadDeadline(time.Now().Add(infoTimeout))
		}

		// We will process the INFO from the readloop and finish by
		// sending the CONNECT and finish registration later.
	} else {
		// Send our info to the other side.
		// Remember the nonce we sent here for signatures, etc.
		c.nonce = make([]byte, nonceLen)
		copy(c.nonce, nonce[:])
		info.Nonce = bytesToString(c.nonce)
		info.CID = c.cid
		proto := generateInfoJSON(info)

		var pre []byte
		// We need first to check for "TLS First" fallback delay.
		if tlsFirstFallback > 0 {
			// We wait and see if we are getting any data. Since we did not send
			// the INFO protocol yet, only clients that use TLS first should be
			// sending data (the TLS handshake). We don't really check the content:
			// if it is a rogue agent and not an actual client performing the
			// TLS handshake, the error will be detected when performing the
			// handshake on our side.
			pre = make([]byte, 4)
			c.nc.SetReadDeadline(time.Now().Add(tlsFirstFallback))
			n, _ := io.ReadFull(c.nc, pre[:])
			c.nc.SetReadDeadline(time.Time{})
			// If we get any data (regardless of possible timeout), we will proceed
			// with the TLS handshake.
			if n > 0 {
				pre = pre[:n]
			} else {
				// We did not get anything so we will send the INFO protocol.
				pre = nil
				// Set the boolean to false for the rest of the function.
				tlsFirst = false
			}
		}

		if !tlsFirst {
			// We have to send from this go routine because we may
			// have to block for TLS handshake before we start our
			// writeLoop go routine. The other side needs to receive
			// this before it can initiate the TLS handshake..
			c.sendProtoNow(proto)

			// The above call could have marked the connection as closed (due to TCP error).
			if c.isClosed() {
				c.mu.Unlock()
				c.closeConnection(WriteError)
				return nil
			}
		}

		// Check to see if we need to spin up TLS.
		if !c.isWebsocket() && info.TLSRequired {
			// If we have a prebuffer create a multi-reader.
			if len(pre) > 0 {
				c.nc = &tlsMixConn{c.nc, bytes.NewBuffer(pre)}
			}
			// Perform server-side TLS handshake.
			if err := c.doTLSServerHandshake(tlsHandshakeLeaf, opts.LeafNode.TLSConfig, opts.LeafNode.TLSTimeout, opts.LeafNode.TLSPinnedCerts); err != nil {
				c.mu.Unlock()
				return nil
			}
		}

		// If the user wants the TLS handshake to occur first, now that it is
		// done, send the INFO protocol.
		if tlsFirst {
			c.flags.set(didTLSFirst)
			c.sendProtoNow(proto)
			if c.isClosed() {
				c.mu.Unlock()
				c.closeConnection(WriteError)
				return nil
			}
		}

		// Leaf nodes will always require a CONNECT to let us know
		// when we are properly bound to an account.
		//
		// If compression is configured, we can't set the authTimer here because
		// it would cause the parser to fail any incoming protocol that is not a
		// CONNECT (and we need to exchange INFO protocols for compression
		// negotiation). So instead, use the ping timer until we are done with
		// negotiation and can set the auth timer.
		timeout := secondsToDuration(opts.LeafNode.AuthTimeout)
		if needsCompression(opts.LeafNode.Compression.Mode) {
			c.ping.tmr = time.AfterFunc(timeout, func() {
				c.authTimeout()
			})
		} else {
			c.setAuthTimer(timeout)
		}
	}

	// Keep track in case server is shutdown before we can successfully register.
	if !s.addToTempClients(c.cid, c) {
		c.mu.Unlock()
		c.setNoReconnect()
		c.closeConnection(ServerShutdown)
		return nil
	}

	// Spin up the read loop.
	s.startGoRoutine(func() { c.readLoop(preBuf) })

	// We will spin the write loop for solicited connections only
	// when processing the INFO and after switching to TLS if needed.
	if !solicited {
		s.startGoRoutine(func() { c.writeLoop() })
	}

	c.mu.Unlock()

	return c
}
1212

1213
// Will perform the client-side TLS handshake if needed. Assumes that this
// is called by the solicit side (remote will be non nil). Returns `true`
// if TLS is required, `false` otherwise.
// A non-nil error is returned only when the handshake itself fails; in that
// case the remote's cached tlsName may be reset so the next attempt
// re-derives it from the URL.
// Lock held on entry.
func (c *client) leafClientHandshakeIfNeeded(remote *leafNodeCfg, opts *Options) (bool, error) {
	// Check if TLS is required and gather TLS config variables.
	tlsRequired, tlsConfig, tlsName, tlsTimeout := c.leafNodeGetTLSConfigForSolicit(remote)
	if !tlsRequired {
		return false, nil
	}

	// If TLS required, perform handshake.
	// Get the URL that was used to connect to the remote server.
	rURL := remote.getCurrentURL()

	// Perform the client-side TLS handshake.
	if resetTLSName, err := c.doTLSClientHandshake(tlsHandshakeLeaf, rURL, tlsConfig, tlsName, tlsTimeout, opts.LeafNode.TLSPinnedCerts); err != nil {
		// Check if we need to reset the remote's TLS name.
		if resetTLSName {
			remote.Lock()
			remote.tlsName = _EMPTY_
			remote.Unlock()
		}
		return false, err
	}
	return true, nil
}
1240

1241
// processLeafnodeInfo processes an INFO protocol received on a leafnode
// connection (both the initial INFO and later async updates). Depending on
// the state of the connection it may: perform the client-side TLS handshake
// (solicited side), negotiate compression, validate the first INFO, update
// the list of remote URLs, apply permission updates, and resume/finish the
// connect process. Note the careful lock/unlock ordering throughout: the
// client lock is released before any call that needs the server lock.
func (c *client) processLeafnodeInfo(info *Info) {
	c.mu.Lock()
	// Bail out if the leaf state is gone or the connection already closed.
	if c.leaf == nil || c.isClosed() {
		c.mu.Unlock()
		return
	}
	s := c.srv
	opts := s.getOpts()
	remote := c.leaf.remote
	// A non-nil remote config means we solicited this connection.
	didSolicit := remote != nil
	firstINFO := !c.flags.isSet(infoReceived)

	// In case of websocket, the TLS handshake has been already done.
	// So check only for non websocket connections and for configurations
	// where the TLS Handshake was not done first.
	if didSolicit && !c.flags.isSet(handshakeComplete) && !c.isWebsocket() && !remote.TLSHandshakeFirst {
		// If the server requires TLS, we need to set this in the remote
		// otherwise if there is no TLS configuration block for the remote,
		// the solicit side will not attempt to perform the TLS handshake.
		if firstINFO && info.TLSRequired {
			remote.TLS = true
		}
		if _, err := c.leafClientHandshakeIfNeeded(remote, opts); err != nil {
			c.mu.Unlock()
			return
		}
	}

	// Check for compression, unless already done.
	if firstINFO && !c.flags.isSet(compressionNegotiated) {
		// Prevent from getting back here.
		c.flags.set(compressionNegotiated)

		// Pick the relevant compression options: ours if we accepted the
		// connection, the remote config's if we solicited it.
		var co *CompressionOpts
		if !didSolicit {
			co = &opts.LeafNode.Compression
		} else {
			co = &remote.Compression
		}
		if needsCompression(co.Mode) {
			// Release client lock since following function will need server lock.
			c.mu.Unlock()
			compress, err := s.negotiateLeafCompression(c, didSolicit, info.Compression, co)
			if err != nil {
				c.sendErrAndErr(err.Error())
				c.closeConnection(ProtocolViolation)
				return
			}
			if compress {
				// Done for now, will get back another INFO protocol...
				return
			}
			// No compression because one side does not want/can't, so proceed.
			c.mu.Lock()
			// Check that the connection did not close if the lock was released.
			if c.isClosed() {
				c.mu.Unlock()
				return
			}
		} else {
			// Coming from an old server, the Compression field would be the empty
			// string. For servers that are configured with CompressionNotSupported,
			// this makes them behave as old servers.
			if info.Compression == _EMPTY_ || co.Mode == CompressionNotSupported {
				c.leaf.compression = CompressionNotSupported
			} else {
				c.leaf.compression = CompressionOff
			}
		}
		// Accepting side does not normally process an INFO protocol during
		// initial connection handshake. So we keep it consistent by returning
		// if we are not soliciting.
		if !didSolicit {
			// If we had created the ping timer instead of the auth timer, we will
			// clear the ping timer and set the auth timer now that the compression
			// negotiation is done.
			if info.Compression != _EMPTY_ && c.ping.tmr != nil {
				clearTimer(&c.ping.tmr)
				c.setAuthTimer(secondsToDuration(opts.LeafNode.AuthTimeout))
			}
			c.mu.Unlock()
			return
		}
		// Fall through and process the INFO protocol as usual.
	}

	// Note: For now, only the initial INFO has a nonce. We
	// will probably do auto key rotation at some point.
	if firstINFO {
		// Mark that the INFO protocol has been received.
		c.flags.set(infoReceived)
		// Prevent connecting to non leafnode port. Need to do this only for
		// the first INFO, not for async INFO updates...
		//
		// Content of INFO sent by the server when accepting a tcp connection.
		// -------------------------------------------------------------------
		// Listen Port Of | CID | ClientConnectURLs | LeafNodeURLs | Gateway |
		// -------------------------------------------------------------------
		//      CLIENT    |  X* |        X**        |              |         |
		//      ROUTE     |     |        X**        |      X***    |         |
		//     GATEWAY    |     |                   |              |    X    |
		//     LEAFNODE   |  X  |                   |       X      |         |
		// -------------------------------------------------------------------
		// *   Not on older servers.
		// **  Not if "no advertise" is enabled.
		// *** Not if leafnode's "no advertise" is enabled.
		//
		// As seen from above, a solicited LeafNode connection should receive
		// from the remote server an INFO with CID and LeafNodeURLs. Anything
		// else should be considered an attempt to connect to a wrong port.
		if didSolicit && (info.CID == 0 || info.LeafNodeURLs == nil) {
			c.mu.Unlock()
			c.Errorf(ErrConnectedToWrongPort.Error())
			c.closeConnection(WrongPort)
			return
		}
		// Reject a cluster that contains spaces.
		if info.Cluster != _EMPTY_ && strings.Contains(info.Cluster, " ") {
			c.mu.Unlock()
			c.sendErrAndErr(ErrClusterNameHasSpaces.Error())
			c.closeConnection(ProtocolViolation)
			return
		}
		// Capture a nonce here.
		c.nonce = []byte(info.Nonce)
		if info.TLSRequired && didSolicit {
			remote.TLS = true
		}
		// Headers are used only if both sides support them.
		supportsHeaders := c.srv.supportsHeaders()
		c.headers = supportsHeaders && info.Headers

		// Remember the remote server.
		// Pre 2.2.0 servers are not sending their server name.
		// In that case, use info.ID, which, for those servers, matches
		// the content of the field `Name` in the leafnode CONNECT protocol.
		if info.Name == _EMPTY_ {
			c.leaf.remoteServer = info.ID
		} else {
			c.leaf.remoteServer = info.Name
		}
		c.leaf.remoteDomain = info.Domain
		c.leaf.remoteCluster = info.Cluster
		// We send the protocol version in the INFO protocol.
		// Keep track of it, so we know if this connection supports message
		// tracing for instance.
		c.opts.Protocol = info.Proto
	}

	// For both initial INFO and async INFO protocols, possibly
	// update our list of remote leafnode URLs we can connect to.
	if didSolicit && (len(info.LeafNodeURLs) > 0 || len(info.WSConnectURLs) > 0) {
		// Consider the incoming array as the most up-to-date
		// representation of the remote cluster's list of URLs.
		c.updateLeafNodeURLs(info)
	}

	// Check to see if we have permissions updates here.
	if info.Import != nil || info.Export != nil {
		// The remote's exports become our publish permissions, its imports
		// our subscribe permissions.
		perms := &Permissions{
			Publish:   info.Export,
			Subscribe: info.Import,
		}
		// Check if we have local deny clauses that we need to merge.
		if remote := c.leaf.remote; remote != nil {
			if len(remote.DenyExports) > 0 {
				if perms.Publish == nil {
					perms.Publish = &SubjectPermission{}
				}
				perms.Publish.Deny = append(perms.Publish.Deny, remote.DenyExports...)
			}
			if len(remote.DenyImports) > 0 {
				if perms.Subscribe == nil {
					perms.Subscribe = &SubjectPermission{}
				}
				perms.Subscribe.Deny = append(perms.Subscribe.Deny, remote.DenyImports...)
			}
		}
		c.setPermissions(perms)
	}

	var resumeConnect bool

	// If this is a remote connection and this is the first INFO protocol,
	// then we need to finish the connect process by sending CONNECT, etc..
	if firstINFO && didSolicit {
		// Clear deadline that was set in createLeafNode while waiting for the INFO.
		c.nc.SetDeadline(time.Time{})
		resumeConnect = true
	} else if !firstINFO && didSolicit {
		c.leaf.remoteAccName = info.RemoteAccount
	}

	// Check if we have the remote account information and if so make sure it's stored.
	if info.RemoteAccount != _EMPTY_ {
		s.leafRemoteAccounts.Store(c.acc.Name, info.RemoteAccount)
	}
	c.mu.Unlock()

	finishConnect := info.ConnectInfo
	if resumeConnect && s != nil {
		s.leafNodeResumeConnectProcess(c)
		if !info.InfoOnConnect {
			finishConnect = true
		}
	}
	if finishConnect {
		s.leafNodeFinishConnectProcess(c)
	}
}
1450

1451
// negotiateLeafCompression selects the compression mode to use for this
// leafnode connection, based on our configured options `co` and the mode the
// peer advertised in its INFO (`infoCompression`). It returns true if a
// compression INFO was sent and the connection will switch to compressed
// reader/writer, false if no compression will be used. Must be called
// without the client lock held (it acquires both client and server locks).
func (s *Server) negotiateLeafCompression(c *client, didSolicit bool, infoCompression string, co *CompressionOpts) (bool, error) {
	// Negotiate the appropriate compression mode (or no compression)
	cm, err := selectCompressionMode(co.Mode, infoCompression)
	if err != nil {
		return false, err
	}
	c.mu.Lock()
	// For "auto" mode, set the initial compression mode based on RTT
	if cm == CompressionS2Auto {
		if c.rttStart.IsZero() {
			c.rtt = computeRTT(c.start)
		}
		cm = selectS2AutoModeBasedOnRTT(c.rtt, co.RTTThresholds)
	}
	// Keep track of the negotiated compression mode.
	c.leaf.compression = cm
	cid := c.cid
	var nonce string
	if !didSolicit {
		// Only the accepting side echoes back a nonce in its INFO.
		nonce = bytesToString(c.nonce)
	}
	c.mu.Unlock()

	// Negotiation may have resulted in no compression (e.g. "off").
	if !needsCompression(cm) {
		return false, nil
	}

	// If we end-up doing compression...

	// Generate an INFO with the chosen compression mode.
	s.mu.Lock()
	info := s.copyLeafNodeInfo()
	info.Compression, info.CID, info.Nonce = compressionModeForInfoProtocol(co, cm), cid, nonce
	infoProto := generateInfoJSON(info)
	s.mu.Unlock()

	// If we solicited, then send this INFO protocol BEFORE switching
	// to compression writer. However, if we did not, we send it after.
	c.mu.Lock()
	if didSolicit {
		c.enqueueProto(infoProto)
		// Make sure it is completely flushed (the pending bytes goes to
		// 0) before proceeding.
		for c.out.pb > 0 && !c.isClosed() {
			c.flushOutbound()
		}
	}
	// This is to notify the readLoop that it should switch to a
	// (de)compression reader.
	c.in.flags.set(switchToCompression)
	// Create the compress writer before queueing the INFO protocol for
	// a route that did not solicit. It will make sure that that proto
	// is sent with compression on.
	c.out.cw = s2.NewWriter(nil, s2WriterOptions(cm)...)
	if !didSolicit {
		c.enqueueProto(infoProto)
	}
	c.mu.Unlock()
	return true, nil
}
1511

1512
// When getting a leaf node INFO protocol, use the provided
1513
// array of urls to update the list of possible endpoints.
1514
func (c *client) updateLeafNodeURLs(info *Info) {
1,266✔
1515
        cfg := c.leaf.remote
1,266✔
1516
        cfg.Lock()
1,266✔
1517
        defer cfg.Unlock()
1,266✔
1518

1,266✔
1519
        // We have ensured that if a remote has a WS scheme, then all are.
1,266✔
1520
        // So check if first is WS, then add WS URLs, otherwise, add non WS ones.
1,266✔
1521
        if len(cfg.URLs) > 0 && isWSURL(cfg.URLs[0]) {
1,320✔
1522
                // It does not really matter if we use "ws://" or "wss://" here since
54✔
1523
                // we will have already marked that the remote should use TLS anyway.
54✔
1524
                // But use proper scheme for log statements, etc...
54✔
1525
                proto := wsSchemePrefix
54✔
1526
                if cfg.TLS {
54✔
1527
                        proto = wsSchemePrefixTLS
×
1528
                }
×
1529
                c.doUpdateLNURLs(cfg, proto, info.WSConnectURLs)
54✔
1530
                return
54✔
1531
        }
1532
        c.doUpdateLNURLs(cfg, "nats-leaf", info.LeafNodeURLs)
1,212✔
1533
}
1534

1535
func (c *client) doUpdateLNURLs(cfg *leafNodeCfg, scheme string, URLs []string) {
1,266✔
1536
        cfg.urls = make([]*url.URL, 0, 1+len(URLs))
1,266✔
1537
        // Add the ones we receive in the protocol
1,266✔
1538
        for _, surl := range URLs {
3,569✔
1539
                url, err := url.Parse(fmt.Sprintf("%s://%s", scheme, surl))
2,303✔
1540
                if err != nil {
2,303✔
1541
                        // As per below, the URLs we receive should not have contained URL info, so this should be safe to log.
×
1542
                        c.Errorf("Error parsing url %q: %v", surl, err)
×
1543
                        continue
×
1544
                }
1545
                // Do not add if it's the same as what we already have configured.
1546
                var dup bool
2,303✔
1547
                for _, u := range cfg.URLs {
5,869✔
1548
                        // URLs that we receive never have user info, but the
3,566✔
1549
                        // ones that were configured may have. Simply compare
3,566✔
1550
                        // host and port to decide if they are equal or not.
3,566✔
1551
                        if url.Host == u.Host && url.Port() == u.Port() {
5,262✔
1552
                                dup = true
1,696✔
1553
                                break
1,696✔
1554
                        }
1555
                }
1556
                if !dup {
2,910✔
1557
                        cfg.urls = append(cfg.urls, url)
607✔
1558
                        cfg.saveTLSHostname(url)
607✔
1559
                }
607✔
1560
        }
1561
        // Add the configured one
1562
        cfg.urls = append(cfg.urls, cfg.URLs...)
1,266✔
1563
}
1564

1565
// Similar to setInfoHostPortAndGenerateJSON, but for leafNodeInfo.
1566
func (s *Server) setLeafNodeInfoHostPortAndIP() error {
2,986✔
1567
        opts := s.getOpts()
2,986✔
1568
        if opts.LeafNode.Advertise != _EMPTY_ {
2,997✔
1569
                advHost, advPort, err := parseHostPort(opts.LeafNode.Advertise, opts.LeafNode.Port)
11✔
1570
                if err != nil {
11✔
1571
                        return err
×
1572
                }
×
1573
                s.leafNodeInfo.Host = advHost
11✔
1574
                s.leafNodeInfo.Port = advPort
11✔
1575
        } else {
2,975✔
1576
                s.leafNodeInfo.Host = opts.LeafNode.Host
2,975✔
1577
                s.leafNodeInfo.Port = opts.LeafNode.Port
2,975✔
1578
                // If the host is "0.0.0.0" or "::" we need to resolve to a public IP.
2,975✔
1579
                // This will return at most 1 IP.
2,975✔
1580
                hostIsIPAny, ips, err := s.getNonLocalIPsIfHostIsIPAny(s.leafNodeInfo.Host, false)
2,975✔
1581
                if err != nil {
2,975✔
1582
                        return err
×
1583
                }
×
1584
                if hostIsIPAny {
3,256✔
1585
                        if len(ips) == 0 {
281✔
1586
                                s.Errorf("Could not find any non-local IP for leafnode's listen specification %q",
×
1587
                                        s.leafNodeInfo.Host)
×
1588
                        } else {
281✔
1589
                                // Take the first from the list...
281✔
1590
                                s.leafNodeInfo.Host = ips[0]
281✔
1591
                        }
281✔
1592
                }
1593
        }
1594
        // Use just host:port for the IP
1595
        s.leafNodeInfo.IP = net.JoinHostPort(s.leafNodeInfo.Host, strconv.Itoa(s.leafNodeInfo.Port))
2,986✔
1596
        if opts.LeafNode.Advertise != _EMPTY_ {
2,997✔
1597
                s.Noticef("Advertise address for leafnode is set to %s", s.leafNodeInfo.IP)
11✔
1598
        }
11✔
1599
        return nil
2,986✔
1600
}
1601

1602
// Add the connection to the map of leaf nodes.
// If `checkForDup` is true (invoked when a leafnode is accepted), then we check
// if a connection already exists for the same server name and account.
// That can happen when the remote is attempting to reconnect while the accepting
// side did not detect the connection as broken yet.
// But it can also happen when there is a misconfiguration and the remote is
// creating two (or more) connections that bind to the same account on the accept
// side.
// When a duplicate is found, the new connection is accepted and the old is closed
// (this solves the stale connection situation). An error is returned to help the
// remote detect the misconfiguration when the duplicate is the result of that
// misconfiguration.
func (s *Server) addLeafNodeConnection(c *client, srvName, clusterName string, checkForDup bool) {
	var accName string
	c.mu.Lock()
	// Snapshot what we need under the client lock.
	cid := c.cid
	acc := c.acc
	if acc != nil {
		accName = acc.Name
	}
	myRemoteDomain := c.leaf.remoteDomain
	mySrvName := c.leaf.remoteServer
	remoteAccName := c.leaf.remoteAccName
	myClustName := c.leaf.remoteCluster
	solicited := c.leaf.remote != nil
	c.mu.Unlock()

	var old *client
	s.mu.Lock()
	// We check for empty because in some test we may send empty CONNECT{}
	if checkForDup && srvName != _EMPTY_ {
		for _, ol := range s.leafs {
			ol.mu.Lock()
			// We care here only about non solicited Leafnode. This function
			// is more about replacing stale connections than detecting loops.
			// We have code for the loop detection elsewhere, which also delays
			// attempt to reconnect.
			if !ol.isSolicitedLeafNode() && ol.leaf.remoteServer == srvName &&
				ol.leaf.remoteCluster == clusterName && ol.acc.Name == accName &&
				remoteAccName != _EMPTY_ && ol.leaf.remoteAccName == remoteAccName {
				old = ol
			}
			ol.mu.Unlock()
			if old != nil {
				break
			}
		}
	}
	// Store new connection in the map
	s.leafs[cid] = c
	s.mu.Unlock()
	s.removeFromTempClients(cid)

	// If applicable, evict the old one.
	if old != nil {
		old.sendErrAndErr(DuplicateRemoteLeafnodeConnection.String())
		old.closeConnection(DuplicateRemoteLeafnodeConnection)
		c.Warnf("Replacing connection from same server")
	}

	// Helper producing "server" or "server/cluster" for log statements.
	srvDecorated := func() string {
		if myClustName == _EMPTY_ {
			return mySrvName
		}
		return fmt.Sprintf("%s/%s", mySrvName, myClustName)
	}

	opts := s.getOpts()
	sysAcc := s.SystemAccount()
	js := s.getJetStream()
	var meta *raft
	if js != nil {
		if mg := js.getMetaGroup(); mg != nil {
			meta = mg.(*raft)
		}
	}
	blockMappingOutgoing := false
	// Deny (non domain) JetStream API traffic unless system account is shared
	// and domain names are identical and extending is not disabled

	// Check if backwards compatibility has been enabled and needs to be acted on
	forceSysAccDeny := false
	if len(opts.JsAccDefaultDomain) > 0 {
		if acc == sysAcc {
			for _, d := range opts.JsAccDefaultDomain {
				if d == _EMPTY_ {
					// Extending JetStream via leaf node is mutually exclusive with a domain mapping to the empty/default domain.
					// As soon as one mapping to "" is found, disable the ability to extend JS via a leaf node.
					c.Noticef("Not extending remote JetStream domain %q due to presence of empty default domain", myRemoteDomain)
					forceSysAccDeny = true
					break
				}
			}
		} else if domain, ok := opts.JsAccDefaultDomain[accName]; ok && domain == _EMPTY_ {
			// for backwards compatibility with old setups that do not have a domain name set
			c.Debugf("Skipping deny %q for account %q due to default domain", jsAllAPI, accName)
			return
		}
	}

	// If the server has JS disabled, it may still be part of a JetStream that could be extended.
	// This is either signaled by js being disabled and a domain set,
	// or in cases where no domain name exists, an extension hint is set.
	// However, this is only relevant in mixed setups.
	//
	// If the system account connects but default domains are present, JetStream can't be extended.
	if opts.JetStreamDomain != myRemoteDomain || (!opts.JetStream && (opts.JetStreamDomain == _EMPTY_ && opts.JetStreamExtHint != jsWillExtend)) ||
		sysAcc == nil || acc == nil || forceSysAccDeny {
		// If domain names mismatch always deny. This applies to system accounts as well as non system accounts.
		// Not having a system account, account or JetStream disabled is considered a mismatch as well.
		if acc != nil && acc == sysAcc {
			c.Noticef("System account connected from %s", srvDecorated())
			c.Noticef("JetStream not extended, domains differ")
			c.mergeDenyPermissionsLocked(both, denyAllJs)
			// When a remote with a system account is present in a server, unless otherwise disabled, the server will be
			// started in observer mode. Now that it is clear that this not used, turn the observer mode off.
			if solicited && meta != nil && meta.IsObserver() {
				meta.setObserver(false, extNotExtended)
				c.Debugf("Turning JetStream metadata controller Observer Mode off")
				// Take note that the domain was not extended to avoid this state from startup.
				writePeerState(js.config.StoreDir, meta.currentPeerState())
				// Meta controller can't be leader yet.
				// Yet it is possible that due to observer mode every server already stopped campaigning.
				// Therefore this server needs to be kicked into campaigning gear explicitly.
				meta.Campaign()
			}
		} else {
			c.Noticef("JetStream using domains: local %q, remote %q", opts.JetStreamDomain, myRemoteDomain)
			c.mergeDenyPermissionsLocked(both, denyAllClientJs)
		}
		blockMappingOutgoing = true
	} else if acc == sysAcc {
		// system account and same domain
		s.sys.client.Noticef("Extending JetStream domain %q as System Account connected from server %s",
			myRemoteDomain, srvDecorated())
		// In an extension use case, pin leadership to server remotes connect to.
		// Therefore, server with a remote that are not already in observer mode, need to be put into it.
		if solicited && meta != nil && !meta.IsObserver() {
			meta.setObserver(true, extExtended)
			c.Debugf("Turning JetStream metadata controller Observer Mode on - System Account Connected")
			// Take note that the domain was not extended to avoid this state next startup.
			writePeerState(js.config.StoreDir, meta.currentPeerState())
			// If this server is the leader already, step down so a new leader can be elected (that is not an observer)
			meta.StepDown()
		}
	} else {
		// This deny is needed in all cases (system account shared or not)
		// If the system account is shared, jsAllAPI traffic will go through the system account.
		// So in order to prevent duplicate delivery (from system and actual account) suppress it on the account.
		// If the system account is NOT shared, jsAllAPI traffic has no business
		c.Debugf("Adding deny %+v for account %q", denyAllClientJs, accName)
		c.mergeDenyPermissionsLocked(both, denyAllClientJs)
	}
	// If we have a specified JetStream domain we will want to add a mapping to
	// allow access cross domain for each non-system account.
	if opts.JetStreamDomain != _EMPTY_ && opts.JetStream && acc != nil && acc != sysAcc {
		for src, dest := range generateJSMappingTable(opts.JetStreamDomain) {
			if err := acc.AddMapping(src, dest); err != nil {
				c.Debugf("Error adding JetStream domain mapping: %s", err.Error())
			} else {
				c.Debugf("Adding JetStream Domain Mapping %q -> %s to account %q", src, dest, accName)
			}
		}
		if blockMappingOutgoing {
			src := fmt.Sprintf(jsDomainAPI, opts.JetStreamDomain)
			// make sure that messages intended for this domain, do not leave the cluster via this leaf node connection
			// This is a guard against a miss-config with two identical domain names and will only cover some forms
			// of this issue, not all of them.
			// This guards against a hub and a spoke having the same domain name.
			// But not two spokes having the same one and the request coming from the hub.
			c.mergeDenyPermissionsLocked(pub, []string{src})
			c.Debugf("Adding deny %q for outgoing messages to account %q", src, accName)
		}
	}
}
1777

1778
func (s *Server) removeLeafNodeConnection(c *client) {
1,568✔
1779
        c.mu.Lock()
1,568✔
1780
        cid := c.cid
1,568✔
1781
        if c.leaf != nil {
3,136✔
1782
                if c.leaf.tsubt != nil {
2,713✔
1783
                        c.leaf.tsubt.Stop()
1,145✔
1784
                        c.leaf.tsubt = nil
1,145✔
1785
                }
1,145✔
1786
                if c.leaf.gwSub != nil {
2,182✔
1787
                        s.gwLeafSubs.Remove(c.leaf.gwSub)
614✔
1788
                        // We need to set this to nil for GC to release the connection
614✔
1789
                        c.leaf.gwSub = nil
614✔
1790
                }
614✔
1791
        }
1792
        c.mu.Unlock()
1,568✔
1793
        s.mu.Lock()
1,568✔
1794
        delete(s.leafs, cid)
1,568✔
1795
        s.mu.Unlock()
1,568✔
1796
        s.removeFromTempClients(cid)
1,568✔
1797
}
1798

1799
// Connect information for solicited leafnodes.
// This is the payload of the CONNECT protocol sent by the soliciting side
// and unmarshal'ed in processLeafNodeConnect on the accept side.
type leafConnectInfo struct {
	Version string `json:"version,omitempty"`
	// Credentials: one of Nkey+JWT+Sig, User+Pass, or Token may be set.
	Nkey  string `json:"nkey,omitempty"`
	JWT   string `json:"jwt,omitempty"`
	Sig   string `json:"sig,omitempty"`
	User  string `json:"user,omitempty"`
	Pass  string `json:"pass,omitempty"`
	Token string `json:"auth_token,omitempty"`
	ID    string `json:"server_id,omitempty"`
	// JetStream domain of the remote, stored as leaf.remoteDomain on accept.
	Domain string `json:"domain,omitempty"`
	Name   string `json:"name,omitempty"`
	// Hub indicates the remote declares itself a hub, making this side a spoke.
	Hub bool `json:"is_hub,omitempty"`
	// Cluster is the soliciting side's cluster name (rejected if it contains spaces
	// or collides with ours).
	Cluster   string `json:"cluster,omitempty"`
	Headers   bool   `json:"headers,omitempty"`
	JetStream bool   `json:"jetstream,omitempty"`
	// DenyPub carries publish deny permissions that the accept side merges
	// into its own (see processLeafNodeConnect).
	DenyPub []string `json:"deny_pub,omitempty"`

	// There was an existing field called:
	// >> Comp bool `json:"compression,omitempty"`
	// that has never been used. With support for compression, we now need
	// a field that is a string. So we use a different json tag:
	Compression string `json:"compress_mode,omitempty"`

	// Just used to detect wrong connection attempts.
	Gateway string `json:"gateway,omitempty"`

	// Tells the accept side which account the remote is binding to.
	RemoteAccount string `json:"remote_account,omitempty"`

	// The accept side of a LEAF connection, unlike ROUTER and GATEWAY, receives
	// only the CONNECT protocol, and no INFO. So we need to send the protocol
	// version as part of the CONNECT. It will indicate if a connection supports
	// some features, such as message tracing.
	// We use `protocol` as the JSON tag, so this is automatically unmarshal'ed
	// in the low level process CONNECT.
	Proto int `json:"protocol,omitempty"`
}
1837

1838
// processLeafNodeConnect will process the inbound connect args.
// Once we are here we are bound to an account, so can send any interest that
// we would have to the other side.
// It validates the CONNECT payload (cluster name, gateway misuse, minimum
// version), captures remote metadata into c.leaf, then registers the
// connection and sends our permissions, account info and subscription
// interest back to the remote.
func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) error {
	// Way to detect clients that incorrectly connect to the route listen
	// port. Client provided "lang" in the CONNECT protocol while LEAFNODEs don't.
	if lang != _EMPTY_ {
		c.sendErrAndErr(ErrClientConnectedToLeafNodePort.Error())
		c.closeConnection(WrongPort)
		return ErrClientConnectedToLeafNodePort
	}

	// Unmarshal as a leaf node connect protocol
	proto := &leafConnectInfo{}
	if err := json.Unmarshal(arg, proto); err != nil {
		return err
	}

	// Reject a cluster that contains spaces.
	if proto.Cluster != _EMPTY_ && strings.Contains(proto.Cluster, " ") {
		c.sendErrAndErr(ErrClusterNameHasSpaces.Error())
		c.closeConnection(ProtocolViolation)
		return ErrClusterNameHasSpaces
	}

	// Check for cluster name collisions.
	if cn := s.cachedClusterName(); cn != _EMPTY_ && proto.Cluster != _EMPTY_ && proto.Cluster == cn {
		c.sendErrAndErr(ErrLeafNodeHasSameClusterName.Error())
		c.closeConnection(ClusterNamesIdentical)
		return ErrLeafNodeHasSameClusterName
	}

	// Reject if this has Gateway which means that it would be from a gateway
	// connection that incorrectly connects to the leafnode port.
	if proto.Gateway != _EMPTY_ {
		errTxt := fmt.Sprintf("Rejecting connection from gateway %q on the leafnode port", proto.Gateway)
		c.Errorf(errTxt)
		c.sendErr(errTxt)
		c.closeConnection(WrongGateway)
		return ErrWrongGateway
	}

	// Enforce the configured minimum remote server version, if any.
	if mv := s.getOpts().LeafNode.MinVersion; mv != _EMPTY_ {
		major, minor, update, _ := versionComponents(mv)
		if !versionAtLeast(proto.Version, major, minor, update) {
			// We are going to send back an INFO because otherwise recent
			// versions of the remote server would simply break the connection
			// after 2 seconds if not receiving it. Instead, we want the
			// other side to just "stall" until we finish waiting for the holding
			// period and close the connection below.
			s.sendPermsAndAccountInfo(c)
			c.sendErrAndErr(fmt.Sprintf("connection rejected since minimum version required is %q", mv))
			select {
			case <-c.srv.quitCh:
			case <-time.After(leafNodeWaitBeforeClose):
			}
			c.closeConnection(MinimumVersionRequired)
			return ErrMinimumVersionRequired
		}
	}

	// Check if this server supports headers.
	supportHeaders := c.srv.supportsHeaders()

	c.mu.Lock()
	// Leaf Nodes do not do echo or verbose or pedantic.
	c.opts.Verbose = false
	c.opts.Echo = false
	c.opts.Pedantic = false
	// This inbound connection will be marked as supporting headers if this server
	// support headers and the remote has sent in the CONNECT protocol that it does
	// support headers too.
	c.headers = supportHeaders && proto.Headers
	// If the compression level is still not set, set it based on what has been
	// given to us in the CONNECT protocol.
	if c.leaf.compression == _EMPTY_ {
		// But if proto.Compression is _EMPTY_, set it to CompressionNotSupported
		if proto.Compression == _EMPTY_ {
			c.leaf.compression = CompressionNotSupported
		} else {
			c.leaf.compression = proto.Compression
		}
	}

	// Remember the remote server.
	c.leaf.remoteServer = proto.Name
	// Remember the remote account name
	c.leaf.remoteAccName = proto.RemoteAccount

	// If the other side has declared itself a hub, so we will take on the spoke role.
	if proto.Hub {
		c.leaf.isSpoke = true
	}

	// The soliciting side is part of a cluster.
	if proto.Cluster != _EMPTY_ {
		c.leaf.remoteCluster = proto.Cluster
	}

	c.leaf.remoteDomain = proto.Domain

	// When a leaf solicits a connection to a hub, the perms that it will use on the soliciting leafnode's
	// behalf are correct for them, but inside the hub need to be reversed since data is flowing in the opposite direction.
	if !c.isSolicitedLeafNode() && c.perms != nil {
		sp, pp := c.perms.sub, c.perms.pub
		c.perms.sub, c.perms.pub = pp, sp
		if c.opts.Import != nil {
			c.darray = c.opts.Import.Deny
		} else {
			c.darray = nil
		}
	}

	// Set the Ping timer
	c.setFirstPingTimer()

	// If we received pub deny permissions from the other end, merge with existing ones.
	c.mergeDenyPermissions(pub, proto.DenyPub)

	c.mu.Unlock()

	// Register the cluster, even if empty, as long as we are acting as a hub.
	if !proto.Hub {
		c.acc.registerLeafNodeCluster(proto.Cluster)
	}

	// Add in the leafnode here since we passed through auth at this point.
	s.addLeafNodeConnection(c, proto.Name, proto.Cluster, true)

	// If we have permissions bound to this leafnode we need to send then back to the
	// origin server for local enforcement.
	s.sendPermsAndAccountInfo(c)

	// Create and initialize the smap since we know our bound account now.
	// This will send all registered subs too.
	s.initLeafNodeSmapAndSendSubs(c)

	// Announce the account connect event for a leaf node.
	// This will no-op as needed.
	s.sendLeafNodeConnect(c.acc)

	return nil
}
1981

1982
// Returns the remote cluster name. This is set only once so does not require a lock.
1983
func (c *client) remoteCluster() string {
192,487✔
1984
        if c.leaf == nil {
192,487✔
1985
                return _EMPTY_
×
1986
        }
×
1987
        return c.leaf.remoteCluster
192,487✔
1988
}
1989

1990
// Sends back an info block to the soliciting leafnode to let it know about
1991
// its permission settings for local enforcement.
1992
func (s *Server) sendPermsAndAccountInfo(c *client) {
643✔
1993
        // Copy
643✔
1994
        info := s.copyLeafNodeInfo()
643✔
1995
        c.mu.Lock()
643✔
1996
        info.CID = c.cid
643✔
1997
        info.Import = c.opts.Import
643✔
1998
        info.Export = c.opts.Export
643✔
1999
        info.RemoteAccount = c.acc.Name
643✔
2000
        info.ConnectInfo = true
643✔
2001
        c.enqueueProto(generateInfoJSON(info))
643✔
2002
        c.mu.Unlock()
643✔
2003
}
643✔
2004

2005
// Snapshot the current subscriptions from the sublist into our smap which
// we will keep updated from now on.
// Also send the registered subscriptions.
// The smap maps a subscription key (subject, optionally "subject queue")
// to an interest count; the resulting LS+ protocols are queued in one batch.
func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
	acc := c.acc
	if acc == nil {
		c.Debugf("Leafnode does not have an account bound")
		return
	}
	// Collect all account subs here.
	_subs := [1024]*subscription{}
	subs := _subs[:0]
	ims := []string{}

	// Hold the client lock otherwise there can be a race and miss some subs.
	c.mu.Lock()
	defer c.mu.Unlock()

	acc.mu.RLock()
	accName := acc.Name
	accNTag := acc.nameTag

	// To make printing look better when no friendly name present.
	if accNTag != _EMPTY_ {
		accNTag = "/" + accNTag
	}

	// If we are solicited we only send interest for local clients.
	if c.isSpokeLeafNode() {
		acc.sl.localSubs(&subs, true)
	} else {
		acc.sl.All(&subs)
	}

	// Check if we have an existing service import reply.
	siReply := copyBytes(acc.siReply)

	// Since leaf nodes only send on interest, if the bound
	// account has import services we need to send those over.
	for isubj := range acc.imports.services {
		// Spokes are subject to permissions; skip imports we may not subscribe to.
		if c.isSpokeLeafNode() && !c.canSubscribe(isubj) {
			c.Debugf("Not permitted to import service %q on behalf of %s%s", isubj, accName, accNTag)
			continue
		}
		ims = append(ims, isubj)
	}
	// Likewise for mappings.
	for _, m := range acc.mappings {
		if c.isSpokeLeafNode() && !c.canSubscribe(m.src) {
			c.Debugf("Not permitted to import mapping %q on behalf of %s%s", m.src, accName, accNTag)
			continue
		}
		ims = append(ims, m.src)
	}

	// Create a unique subject that will be used for loop detection.
	lds := acc.lds
	acc.mu.RUnlock()

	// Check if we have to create the LDS.
	if lds == _EMPTY_ {
		lds = leafNodeLoopDetectionSubjectPrefix + nuid.Next()
		acc.mu.Lock()
		acc.lds = lds
		acc.mu.Unlock()
	}

	// Now check for gateway interest. Leafnodes will put this into
	// the proper mode to propagate, but they are not held in the account.
	gwsa := [16]*client{}
	gws := gwsa[:0]
	s.getOutboundGatewayConnections(&gws)
	for _, cgw := range gws {
		cgw.mu.Lock()
		gw := cgw.gw
		cgw.mu.Unlock()
		if gw != nil {
			if ei, _ := gw.outsim.Load(accName); ei != nil {
				if e := ei.(*outsie); e != nil && e.sl != nil {
					e.sl.All(&subs)
				}
			}
		}
	}

	applyGlobalRouting := s.gateway.enabled
	if c.isSpokeLeafNode() {
		// Add a fake subscription for this solicited leafnode connection
		// so that we can send back directly for mapped GW replies.
		// We need to keep track of this subscription so it can be removed
		// when the connection is closed so that the GC can release it.
		c.leaf.gwSub = &subscription{client: c, subject: []byte(gwReplyPrefix + ">")}
		c.srv.gwLeafSubs.Insert(c.leaf.gwSub)
	}

	// Now walk the results and add them to our smap
	rc := c.leaf.remoteCluster
	c.leaf.smap = make(map[string]int32)
	for _, sub := range subs {
		// Check perms regardless of role.
		if c.perms != nil && !c.canSubscribe(string(sub.subject)) {
			c.Debugf("Not permitted to subscribe to %q on behalf of %s%s", sub.subject, accName, accNTag)
			continue
		}
		// We ignore ourselves here.
		// Also don't add the subscription if it has a origin cluster and the
		// cluster name matches the one of the client we are sending to.
		if c != sub.client && (sub.origin == nil || (bytesToString(sub.origin) != rc)) {
			count := int32(1)
			// Queue subs carry their queue weight as the interest count.
			if len(sub.queue) > 0 && sub.qw > 0 {
				count = sub.qw
			}
			c.leaf.smap[keyFromSub(sub)] += count
			// Track subs sent during init so updateSmap can dedupe
			// concurrent additions for a short window (see tsubt below).
			if c.leaf.tsub == nil {
				c.leaf.tsub = make(map[*subscription]struct{})
			}
			c.leaf.tsub[sub] = struct{}{}
		}
	}
	// FIXME(dlc) - We need to update appropriately on an account claims update.
	for _, isubj := range ims {
		c.leaf.smap[isubj]++
	}
	// If we have gateways enabled we need to make sure the other side sends us responses
	// that have been augmented from the original subscription.
	// TODO(dlc) - Should we lock this down more?
	if applyGlobalRouting {
		c.leaf.smap[oldGWReplyPrefix+"*.>"]++
		c.leaf.smap[gwReplyPrefix+">"]++
	}
	// Detect loops by subscribing to a specific subject and checking
	// if this sub is coming back to us.
	c.leaf.smap[lds]++

	// Check if we need to add an existing siReply to our map.
	// This will be a prefix so add on the wildcard.
	if siReply != nil {
		wcsub := append(siReply, '>')
		c.leaf.smap[string(wcsub)]++
	}
	// Queue all protocols. There is no max pending limit for LN connection,
	// so we don't need chunking. The writes will happen from the writeLoop.
	var b bytes.Buffer
	for key, n := range c.leaf.smap {
		c.writeLeafSub(&b, key, n)
	}
	if b.Len() > 0 {
		c.enqueueProto(b.Bytes())
	}
	if c.leaf.tsub != nil {
		// Clear the tsub map after 5 seconds.
		c.leaf.tsubt = time.AfterFunc(5*time.Second, func() {
			c.mu.Lock()
			if c.leaf != nil {
				c.leaf.tsub = nil
				c.leaf.tsubt = nil
			}
			c.mu.Unlock()
		})
	}
}
2166

2167
// updateInterestForAccountOnGateway called from gateway code when processing RS+ and RS-.
2168
func (s *Server) updateInterestForAccountOnGateway(accName string, sub *subscription, delta int32) {
198,528✔
2169
        acc, err := s.LookupAccount(accName)
198,528✔
2170
        if acc == nil || err != nil {
198,690✔
2171
                s.Debugf("No or bad account for %q, failed to update interest from gateway", accName)
162✔
2172
                return
162✔
2173
        }
162✔
2174
        acc.updateLeafNodes(sub, delta)
198,366✔
2175
}
2176

2177
// updateLeafNodes will make sure to update the account smap for the subscription.
// Will also forward to all leaf nodes as needed.
// delta is the interest change (positive for new interest, negative or zero
// for removals); it is applied per-leafnode via updateSmap.
func (acc *Account) updateLeafNodes(sub *subscription, delta int32) {
	if acc == nil || sub == nil {
		return
	}

	// We will do checks for no leafnodes and same cluster here inline and under the
	// general account read lock.
	// If we feel we need to update the leafnodes we will do that out of line to avoid
	// blocking routes or GWs.

	acc.mu.RLock()
	// First check if we even have leafnodes here.
	if acc.nleafs == 0 {
		acc.mu.RUnlock()
		return
	}

	// Is this a loop detection subject.
	isLDS := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))

	// Capture the cluster even if its empty.
	var cluster string
	if sub.origin != nil {
		cluster = bytesToString(sub.origin)
	}

	// If we have an isolated cluster we can return early, as long as it is not a loop detection subject.
	// Empty clusters will return false for the check.
	if !isLDS && acc.isLeafNodeClusterIsolated(cluster) {
		acc.mu.RUnlock()
		return
	}

	// We can release the general account lock.
	acc.mu.RUnlock()

	// We can hold the list lock here to avoid having to copy a large slice.
	acc.lmu.RLock()
	defer acc.lmu.RUnlock()

	// Do this once.
	subject := string(sub.subject)

	// Walk the connected leafnodes.
	for _, ln := range acc.lleafs {
		// Never echo interest back to the leafnode it came from.
		if ln == sub.client {
			continue
		}
		// Check to make sure this sub does not have an origin cluster that matches the leafnode.
		ln.mu.Lock()
		// If skipped, make sure that we still let go the "$LDS." subscription that allows
		// the detection of loops as long as different cluster.
		clusterDifferent := cluster != ln.remoteCluster()
		// Note: permissions (canSubscribe) are only consulted for additions
		// (delta > 0); removals are always propagated.
		if (isLDS && clusterDifferent) || ((cluster == _EMPTY_ || clusterDifferent) && (delta <= 0 || ln.canSubscribe(subject))) {
			ln.updateSmap(sub, delta, isLDS)
		}
		ln.mu.Unlock()
	}
}
2238

2239
// This will make an update to our internal smap and determine if we should send out
// an interest update to the remote side.
// delta is the interest count change; isLDS marks loop-detection subjects,
// which bypass the spoke-side origin filtering below.
// Lock should be held.
func (c *client) updateSmap(sub *subscription, delta int32, isLDS bool) {
	// smap is nil before initLeafNodeSmapAndSendSubs has run (or after teardown).
	if c.leaf.smap == nil {
		return
	}

	// If we are solicited make sure this is a local client or a non-solicited leaf node
	skind := sub.client.kind
	updateClient := skind == CLIENT || skind == SYSTEM || skind == JETSTREAM || skind == ACCOUNT
	if !isLDS && c.isSpokeLeafNode() && !(updateClient || (skind == LEAF && !sub.client.isSpokeLeafNode())) {
		return
	}

	// For additions, check if that sub has just been processed during initLeafNodeSmapAndSendSubs
	if delta > 0 && c.leaf.tsub != nil {
		if _, present := c.leaf.tsub[sub]; present {
			// Already accounted for during init; consume the marker instead
			// of double counting, and tear down tracking once empty.
			delete(c.leaf.tsub, sub)
			if len(c.leaf.tsub) == 0 {
				c.leaf.tsub = nil
				c.leaf.tsubt.Stop()
				c.leaf.tsubt = nil
			}
			return
		}
	}

	key := keyFromSub(sub)
	n, ok := c.leaf.smap[key]
	// Removal for a key we never sent: nothing to do.
	if delta < 0 && !ok {
		return
	}

	// We will update if its a queue, if count is zero (or negative), or we were 0 and are N > 0.
	update := sub.queue != nil || (n <= 0 && n+delta > 0) || (n > 0 && n+delta <= 0)
	n += delta
	if n > 0 {
		c.leaf.smap[key] = n
	} else {
		delete(c.leaf.smap, key)
	}
	if update {
		c.sendLeafNodeSubUpdate(key, n)
	}
}
2285

2286
// Used to force add subjects to the subject map.
2287
func (c *client) forceAddToSmap(subj string) {
4✔
2288
        c.mu.Lock()
4✔
2289
        defer c.mu.Unlock()
4✔
2290

4✔
2291
        if c.leaf.smap == nil {
4✔
2292
                return
×
2293
        }
×
2294
        n := c.leaf.smap[subj]
4✔
2295
        if n != 0 {
5✔
2296
                return
1✔
2297
        }
1✔
2298
        // Place into the map since it was not there.
2299
        c.leaf.smap[subj] = 1
3✔
2300
        c.sendLeafNodeSubUpdate(subj, 1)
3✔
2301
}
2302

2303
// Used to force remove a subject from the subject map.
2304
func (c *client) forceRemoveFromSmap(subj string) {
1✔
2305
        c.mu.Lock()
1✔
2306
        defer c.mu.Unlock()
1✔
2307

1✔
2308
        if c.leaf.smap == nil {
1✔
2309
                return
×
2310
        }
×
2311
        n := c.leaf.smap[subj]
1✔
2312
        if n == 0 {
1✔
2313
                return
×
2314
        }
×
2315
        n--
1✔
2316
        if n == 0 {
2✔
2317
                // Remove is now zero
1✔
2318
                delete(c.leaf.smap, subj)
1✔
2319
                c.sendLeafNodeSubUpdate(subj, 0)
1✔
2320
        } else {
1✔
2321
                c.leaf.smap[subj] = n
×
2322
        }
×
2323
}
2324

2325
// Send the subscription interest change to the other side.
2326
// Lock should be held.
2327
func (c *client) sendLeafNodeSubUpdate(key string, n int32) {
9,385✔
2328
        // If we are a spoke, we need to check if we are allowed to send this subscription over to the hub.
9,385✔
2329
        if c.isSpokeLeafNode() {
11,673✔
2330
                checkPerms := true
2,288✔
2331
                if len(key) > 0 && (key[0] == '$' || key[0] == '_') {
3,626✔
2332
                        if strings.HasPrefix(key, leafNodeLoopDetectionSubjectPrefix) ||
1,338✔
2333
                                strings.HasPrefix(key, oldGWReplyPrefix) ||
1,338✔
2334
                                strings.HasPrefix(key, gwReplyPrefix) {
1,416✔
2335
                                checkPerms = false
78✔
2336
                        }
78✔
2337
                }
2338
                if checkPerms {
4,498✔
2339
                        var subject string
2,210✔
2340
                        if sep := strings.IndexByte(key, ' '); sep != -1 {
2,702✔
2341
                                subject = key[:sep]
492✔
2342
                        } else {
2,210✔
2343
                                subject = key
1,718✔
2344
                        }
1,718✔
2345
                        if !c.canSubscribe(subject) {
2,210✔
2346
                                return
×
2347
                        }
×
2348
                }
2349
        }
2350
        // If we are here we can send over to the other side.
2351
        _b := [64]byte{}
9,385✔
2352
        b := bytes.NewBuffer(_b[:0])
9,385✔
2353
        c.writeLeafSub(b, key, n)
9,385✔
2354
        c.enqueueProto(b.Bytes())
9,385✔
2355
}
2356

2357
// Helper function to build the key.
2358
func keyFromSub(sub *subscription) string {
45,570✔
2359
        var sb strings.Builder
45,570✔
2360
        sb.Grow(len(sub.subject) + len(sub.queue) + 1)
45,570✔
2361
        sb.Write(sub.subject)
45,570✔
2362
        if sub.queue != nil {
49,356✔
2363
                // Just make the key subject spc group, e.g. 'foo bar'
3,786✔
2364
                sb.WriteByte(' ')
3,786✔
2365
                sb.Write(sub.queue)
3,786✔
2366
        }
3,786✔
2367
        return sb.String()
45,570✔
2368
}
2369

2370
const (
	// Prefixes used by keyFromSubWithOrigin to distinguish plain routed
	// subscriptions ("R ...") from routed subscriptions made on behalf
	// of a leafnode ("L ...").
	keyRoutedSub         = "R"
	keyRoutedSubByte     = 'R'
	keyRoutedLeafSub     = "L"
	keyRoutedLeafSubByte = 'L'
)
2376

2377
// Helper function to build the key that prevents collisions between normal
2378
// routed subscriptions and routed subscriptions on behalf of a leafnode.
2379
// Keys will look like this:
2380
// "R foo"          -> plain routed sub on "foo"
2381
// "R foo bar"      -> queue routed sub on "foo", queue "bar"
2382
// "L foo bar"      -> plain routed leaf sub on "foo", leaf "bar"
2383
// "L foo bar baz"  -> queue routed sub on "foo", queue "bar", leaf "baz"
2384
func keyFromSubWithOrigin(sub *subscription) string {
634,655✔
2385
        var sb strings.Builder
634,655✔
2386
        sb.Grow(2 + len(sub.origin) + 1 + len(sub.subject) + 1 + len(sub.queue))
634,655✔
2387
        leaf := len(sub.origin) > 0
634,655✔
2388
        if leaf {
651,507✔
2389
                sb.WriteByte(keyRoutedLeafSubByte)
16,852✔
2390
        } else {
634,655✔
2391
                sb.WriteByte(keyRoutedSubByte)
617,803✔
2392
        }
617,803✔
2393
        sb.WriteByte(' ')
634,655✔
2394
        sb.Write(sub.subject)
634,655✔
2395
        if sub.queue != nil {
657,885✔
2396
                sb.WriteByte(' ')
23,230✔
2397
                sb.Write(sub.queue)
23,230✔
2398
        }
23,230✔
2399
        if leaf {
651,507✔
2400
                sb.WriteByte(' ')
16,852✔
2401
                sb.Write(sub.origin)
16,852✔
2402
        }
16,852✔
2403
        return sb.String()
634,655✔
2404
}
2405

2406
// Lock should be held.
2407
func (c *client) writeLeafSub(w *bytes.Buffer, key string, n int32) {
35,462✔
2408
        if key == _EMPTY_ {
35,462✔
2409
                return
×
2410
        }
×
2411
        if n > 0 {
67,377✔
2412
                w.WriteString("LS+ " + key)
31,915✔
2413
                // Check for queue semantics, if found write n.
31,915✔
2414
                if strings.Contains(key, " ") {
34,222✔
2415
                        w.WriteString(" ")
2,307✔
2416
                        var b [12]byte
2,307✔
2417
                        var i = len(b)
2,307✔
2418
                        for l := n; l > 0; l /= 10 {
5,522✔
2419
                                i--
3,215✔
2420
                                b[i] = digits[l%10]
3,215✔
2421
                        }
3,215✔
2422
                        w.Write(b[i:])
2,307✔
2423
                        if c.trace {
2,307✔
2424
                                arg := fmt.Sprintf("%s %d", key, n)
×
2425
                                c.traceOutOp("LS+", []byte(arg))
×
2426
                        }
×
2427
                } else if c.trace {
29,804✔
2428
                        c.traceOutOp("LS+", []byte(key))
196✔
2429
                }
196✔
2430
        } else {
3,547✔
2431
                w.WriteString("LS- " + key)
3,547✔
2432
                if c.trace {
3,562✔
2433
                        c.traceOutOp("LS-", []byte(key))
15✔
2434
                }
15✔
2435
        }
2436
        w.WriteString(CR_LF)
35,462✔
2437
}
2438

2439
// processLeafSub will process an inbound sub request for the remote leaf node.
2440
func (c *client) processLeafSub(argo []byte) (err error) {
31,695✔
2441
        // Indicate activity.
31,695✔
2442
        c.in.subs++
31,695✔
2443

31,695✔
2444
        srv := c.srv
31,695✔
2445
        if srv == nil {
31,695✔
2446
                return nil
×
2447
        }
×
2448

2449
        // Copy so we do not reference a potentially large buffer
2450
        arg := make([]byte, len(argo))
31,695✔
2451
        copy(arg, argo)
31,695✔
2452

31,695✔
2453
        args := splitArg(arg)
31,695✔
2454
        sub := &subscription{client: c}
31,695✔
2455

31,695✔
2456
        delta := int32(1)
31,695✔
2457
        switch len(args) {
31,695✔
2458
        case 1:
29,471✔
2459
                sub.queue = nil
29,471✔
2460
        case 3:
2,224✔
2461
                sub.queue = args[1]
2,224✔
2462
                sub.qw = int32(parseSize(args[2]))
2,224✔
2463
                // TODO: (ik) We should have a non empty queue name and a queue
2,224✔
2464
                // weight >= 1. For 2.11, we may want to return an error if that
2,224✔
2465
                // is not the case, but for now just overwrite `delta` if queue
2,224✔
2466
                // weight is greater than 1 (it is possible after a reconnect/
2,224✔
2467
                // server restart to receive a queue weight > 1 for a new sub).
2,224✔
2468
                if sub.qw > 1 {
3,838✔
2469
                        delta = sub.qw
1,614✔
2470
                }
1,614✔
2471
        default:
×
2472
                return fmt.Errorf("processLeafSub Parse Error: '%s'", arg)
×
2473
        }
2474
        sub.subject = args[0]
31,695✔
2475

31,695✔
2476
        c.mu.Lock()
31,695✔
2477
        if c.isClosed() {
31,706✔
2478
                c.mu.Unlock()
11✔
2479
                return nil
11✔
2480
        }
11✔
2481

2482
        acc := c.acc
31,684✔
2483
        // Check if we have a loop.
31,684✔
2484
        ldsPrefix := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))
31,684✔
2485

31,684✔
2486
        if ldsPrefix && bytesToString(sub.subject) == acc.getLDSubject() {
31,690✔
2487
                c.mu.Unlock()
6✔
2488
                c.handleLeafNodeLoop(true)
6✔
2489
                return nil
6✔
2490
        }
6✔
2491

2492
        // Check permissions if applicable. (but exclude the $LDS, $GR and _GR_)
2493
        checkPerms := true
31,678✔
2494
        if sub.subject[0] == '$' || sub.subject[0] == '_' {
60,509✔
2495
                if ldsPrefix ||
28,831✔
2496
                        bytes.HasPrefix(sub.subject, []byte(oldGWReplyPrefix)) ||
28,831✔
2497
                        bytes.HasPrefix(sub.subject, []byte(gwReplyPrefix)) {
30,775✔
2498
                        checkPerms = false
1,944✔
2499
                }
1,944✔
2500
        }
2501

2502
        // If we are a hub check that we can publish to this subject.
2503
        if checkPerms {
61,412✔
2504
                subj := string(sub.subject)
29,734✔
2505
                if subjectIsLiteral(subj) && !c.pubAllowedFullCheck(subj, true, true) {
30,035✔
2506
                        c.mu.Unlock()
301✔
2507
                        c.leafSubPermViolation(sub.subject)
301✔
2508
                        c.Debugf(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject))
301✔
2509
                        return nil
301✔
2510
                }
301✔
2511
        }
2512

2513
        // Check if we have a maximum on the number of subscriptions.
2514
        if c.subsAtLimit() {
31,385✔
2515
                c.mu.Unlock()
8✔
2516
                c.maxSubsExceeded()
8✔
2517
                return nil
8✔
2518
        }
8✔
2519

2520
        // If we have an origin cluster associated mark that in the sub.
2521
        if rc := c.remoteCluster(); rc != _EMPTY_ {
59,710✔
2522
                sub.origin = []byte(rc)
28,341✔
2523
        }
28,341✔
2524

2525
        // Like Routes, we store local subs by account and subject and optionally queue name.
2526
        // If we have a queue it will have a trailing weight which we do not want.
2527
        if sub.queue != nil {
33,328✔
2528
                sub.sid = arg[:len(arg)-len(args[2])-1]
1,959✔
2529
        } else {
31,369✔
2530
                sub.sid = arg
29,410✔
2531
        }
29,410✔
2532
        key := bytesToString(sub.sid)
31,369✔
2533
        osub := c.subs[key]
31,369✔
2534
        if osub == nil {
61,235✔
2535
                c.subs[key] = sub
29,866✔
2536
                // Now place into the account sl.
29,866✔
2537
                if err := acc.sl.Insert(sub); err != nil {
29,866✔
2538
                        delete(c.subs, key)
×
2539
                        c.mu.Unlock()
×
2540
                        c.Errorf("Could not insert subscription: %v", err)
×
2541
                        c.sendErr("Invalid Subscription")
×
2542
                        return nil
×
2543
                }
×
2544
        } else if sub.queue != nil {
3,005✔
2545
                // For a queue we need to update the weight.
1,502✔
2546
                delta = sub.qw - atomic.LoadInt32(&osub.qw)
1,502✔
2547
                atomic.StoreInt32(&osub.qw, sub.qw)
1,502✔
2548
                acc.sl.UpdateRemoteQSub(osub)
1,502✔
2549
        }
1,502✔
2550
        spoke := c.isSpokeLeafNode()
31,369✔
2551
        c.mu.Unlock()
31,369✔
2552

31,369✔
2553
        // Only add in shadow subs if a new sub or qsub.
31,369✔
2554
        if osub == nil {
61,235✔
2555
                if err := c.addShadowSubscriptions(acc, sub, true); err != nil {
29,866✔
2556
                        c.Errorf(err.Error())
×
2557
                }
×
2558
        }
2559

2560
        // If we are not solicited, treat leaf node subscriptions similar to a
2561
        // client subscription, meaning we forward them to routes, gateways and
2562
        // other leaf nodes as needed.
2563
        if !spoke {
42,379✔
2564
                // If we are routing add to the route map for the associated account.
11,010✔
2565
                srv.updateRouteSubscriptionMap(acc, sub, delta)
11,010✔
2566
                if srv.gateway.enabled {
12,529✔
2567
                        srv.gatewayUpdateSubInterest(acc.Name, sub, delta)
1,519✔
2568
                }
1,519✔
2569
        }
2570
        // Now check on leafnode updates for other leaf nodes. We understand solicited
2571
        // and non-solicited state in this call so we will do the right thing.
2572
        acc.updateLeafNodes(sub, delta)
31,369✔
2573

31,369✔
2574
        return nil
31,369✔
2575
}
2576

2577
// If the leafnode is a solicited, set the connect delay based on default
2578
// or private option (for tests). Sends the error to the other side, log and
2579
// close the connection.
2580
func (c *client) handleLeafNodeLoop(sendErr bool) {
15✔
2581
        accName, delay := c.setLeafConnectDelayIfSoliciting(leafNodeReconnectDelayAfterLoopDetected)
15✔
2582
        errTxt := fmt.Sprintf("Loop detected for leafnode account=%q. Delaying attempt to reconnect for %v", accName, delay)
15✔
2583
        if sendErr {
23✔
2584
                c.sendErr(errTxt)
8✔
2585
        }
8✔
2586

2587
        c.Errorf(errTxt)
15✔
2588
        // If we are here with "sendErr" false, it means that this is the server
15✔
2589
        // that received the error. The other side will have closed the connection,
15✔
2590
        // but does not hurt to close here too.
15✔
2591
        c.closeConnection(ProtocolViolation)
15✔
2592
}
2593

2594
// processLeafUnsub will process an inbound unsub request for the remote leaf node.
//
// The subscription is looked up by sid (for LS- the protocol argument is the
// exact key used at subscribe time), removed locally, and the interest
// decrement is then propagated to routes, gateways and other leaf nodes.
func (c *client) processLeafUnsub(arg []byte) error {
	// Indicate any activity, so pub and sub or unsubs.
	c.in.subs++

	acc := c.acc
	srv := c.srv

	c.mu.Lock()
	if c.isClosed() {
		c.mu.Unlock()
		return nil
	}

	spoke := c.isSpokeLeafNode()
	// We store local subs by account and subject and optionally queue name.
	// LS- will have the arg exactly as the key.
	sub, ok := c.subs[string(arg)]
	if !ok {
		// If not found, don't try to update routes/gws/leaf nodes.
		c.mu.Unlock()
		return nil
	}
	// For a queue subscription the propagated delta is the full queue
	// weight; for a plain sub a single subscription is removed.
	delta := int32(1)
	if len(sub.queue) > 0 {
		delta = sub.qw
	}
	c.mu.Unlock()

	c.unsubscribe(acc, sub, true, true)
	if !spoke {
		// If we are routing subtract from the route map for the associated account.
		srv.updateRouteSubscriptionMap(acc, sub, -delta)
		// Gateways
		if srv.gateway.enabled {
			srv.gatewayUpdateSubInterest(acc.Name, sub, -delta)
		}
	}
	// Now check on leafnode updates for other leaf nodes.
	acc.updateLeafNodes(sub, -delta)
	return nil
}
2636

2637
// processLeafHeaderMsgArgs parses the arguments of an inbound header message
// protocol line from a remote leaf node and populates c.pa: subject, optional
// reply, optional queue names, header size and total size. The argument
// layouts handled are visible in the switch cases below: 3 args means
// subject/hdr/size, 4 means subject/reply/hdr/size, and more means a reply
// indicator ('+' with a reply subject, '|' without) followed by queue names
// and the two trailing sizes.
func (c *client) processLeafHeaderMsgArgs(arg []byte) error {
	// Unroll splitArgs to avoid runtime/heap issues
	a := [MAX_MSG_ARGS][]byte{}
	args := a[:0]
	start := -1
	for i, b := range arg {
		switch b {
		case ' ', '\t', '\r', '\n':
			if start >= 0 {
				args = append(args, arg[start:i])
				start = -1
			}
		default:
			if start < 0 {
				start = i
			}
		}
	}
	if start >= 0 {
		args = append(args, arg[start:])
	}

	c.pa.arg = arg
	switch len(args) {
	case 0, 1, 2:
		return fmt.Errorf("processLeafHeaderMsgArgs Parse Error: '%s'", args)
	case 3:
		// <subject> <hdr_len> <total_len>
		c.pa.reply = nil
		c.pa.queues = nil
		c.pa.hdb = args[1]
		c.pa.hdr = parseSize(args[1])
		c.pa.szb = args[2]
		c.pa.size = parseSize(args[2])
	case 4:
		// <subject> <reply> <hdr_len> <total_len>
		c.pa.reply = args[1]
		c.pa.queues = nil
		c.pa.hdb = args[2]
		c.pa.hdr = parseSize(args[2])
		c.pa.szb = args[3]
		c.pa.size = parseSize(args[3])
	default:
		// args[1] is our reply indicator. Should be + or | normally.
		if len(args[1]) != 1 {
			return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
		}
		switch args[1][0] {
		case '+':
			// Reply subject follows the indicator.
			c.pa.reply = args[2]
		case '|':
			// No reply subject present.
			c.pa.reply = nil
		default:
			return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
		}
		// Grab header size.
		c.pa.hdb = args[len(args)-2]
		c.pa.hdr = parseSize(c.pa.hdb)

		// Grab size.
		c.pa.szb = args[len(args)-1]
		c.pa.size = parseSize(c.pa.szb)

		// Grab queue names.
		if c.pa.reply != nil {
			c.pa.queues = args[3 : len(args)-2]
		} else {
			c.pa.queues = args[2 : len(args)-2]
		}
	}
	// Sanity checks: parseSize returns a negative value on bad input, and a
	// header cannot be larger than the whole message.
	if c.pa.hdr < 0 {
		return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Header Size: '%s'", arg)
	}
	if c.pa.size < 0 {
		return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Size: '%s'", args)
	}
	if c.pa.hdr > c.pa.size {
		return fmt.Errorf("processLeafHeaderMsgArgs Header Size larger then TotalSize: '%s'", arg)
	}

	// Common ones processed after check for arg length
	c.pa.subject = args[0]

	return nil
}
2720

2721
// processLeafMsgArgs parses the arguments of an inbound (non-header) message
// protocol line from a remote leaf node and populates c.pa: subject, optional
// reply, optional queue names and payload size. The argument layouts handled
// are visible in the switch cases below: 2 args means subject/size, 3 means
// subject/reply/size, and more means a reply indicator ('+' with a reply
// subject, '|' without) followed by queue names and the trailing size.
func (c *client) processLeafMsgArgs(arg []byte) error {
	// Unroll splitArgs to avoid runtime/heap issues
	a := [MAX_MSG_ARGS][]byte{}
	args := a[:0]
	start := -1
	for i, b := range arg {
		switch b {
		case ' ', '\t', '\r', '\n':
			if start >= 0 {
				args = append(args, arg[start:i])
				start = -1
			}
		default:
			if start < 0 {
				start = i
			}
		}
	}
	if start >= 0 {
		args = append(args, arg[start:])
	}

	c.pa.arg = arg
	switch len(args) {
	case 0, 1:
		return fmt.Errorf("processLeafMsgArgs Parse Error: '%s'", args)
	case 2:
		// <subject> <size>
		c.pa.reply = nil
		c.pa.queues = nil
		c.pa.szb = args[1]
		c.pa.size = parseSize(args[1])
	case 3:
		// <subject> <reply> <size>
		c.pa.reply = args[1]
		c.pa.queues = nil
		c.pa.szb = args[2]
		c.pa.size = parseSize(args[2])
	default:
		// args[1] is our reply indicator. Should be + or | normally.
		if len(args[1]) != 1 {
			return fmt.Errorf("processLeafMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
		}
		switch args[1][0] {
		case '+':
			// Reply subject follows the indicator.
			c.pa.reply = args[2]
		case '|':
			// No reply subject present.
			c.pa.reply = nil
		default:
			return fmt.Errorf("processLeafMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
		}
		// Grab size.
		c.pa.szb = args[len(args)-1]
		c.pa.size = parseSize(c.pa.szb)

		// Grab queue names.
		if c.pa.reply != nil {
			c.pa.queues = args[3 : len(args)-1]
		} else {
			c.pa.queues = args[2 : len(args)-1]
		}
	}
	// parseSize returns a negative value on bad input.
	if c.pa.size < 0 {
		return fmt.Errorf("processLeafMsgArgs Bad or Missing Size: '%s'", args)
	}

	// Common ones processed after check for arg length
	c.pa.subject = args[0]

	return nil
}
2790

2791
// processInboundLeafMsg is called to process an inbound msg from a leaf node.
//
// It matches the message subject against the account sublist (using a
// per-connection L1 result cache keyed on the sublist generation id),
// delivers to matching subscriptions, and forwards to gateways when enabled.
func (c *client) processInboundLeafMsg(msg []byte) {
	// Update statistics
	// The msg includes the CR_LF, so pull back out for accounting.
	c.in.msgs++
	c.in.bytes += int32(len(msg) - LEN_CR_LF)

	srv, acc, subject := c.srv, c.acc, string(c.pa.subject)

	// Mostly under testing scenarios.
	if srv == nil || acc == nil {
		return
	}

	// Match the subscriptions. We will use our own L1 map if
	// it's still valid, avoiding contention on the shared sublist.
	var r *SublistResult
	var ok bool

	// The sublist genid changes when interest changes; a stale genid means
	// our cached results can no longer be trusted.
	genid := atomic.LoadUint64(&c.acc.sl.genid)
	if genid == c.in.genid && c.in.results != nil {
		r, ok = c.in.results[subject]
	} else {
		// Reset our L1 completely.
		c.in.results = make(map[string]*SublistResult)
		c.in.genid = genid
	}

	// Go back to the sublist data structure.
	if !ok {
		r = c.acc.sl.Match(subject)
		// Prune the results cache. Keeps us from unbounded growth. Random delete.
		if len(c.in.results) >= maxResultCacheSize {
			n := 0
			for subj := range c.in.results {
				delete(c.in.results, subj)
				if n++; n > pruneSize {
					break
				}
			}
		}
		// Then add the new cache entry.
		c.in.results[subject] = r
	}

	// Collect queue names if needed.
	var qnames [][]byte

	// Check for no interest, short circuit if so.
	// This is the fanout scale.
	if len(r.psubs)+len(r.qsubs) > 0 {
		flag := pmrNoFlag
		// If we have queue subs in this cluster, then if we run in gateway
		// mode and the remote gateways have queue subs, then we need to
		// collect the queue groups this message was sent to so that we
		// exclude them when sending to gateways.
		if len(r.qsubs) > 0 && c.srv.gateway.enabled &&
			atomic.LoadInt64(&c.srv.gateway.totalQSubs) > 0 {
			flag |= pmrCollectQueueNames
		}
		// If this is a mapped subject that means the mapped interest
		// is what got us here, but this might not have a queue designation
		// If that is the case, make sure we ignore to process local queue subscribers.
		if len(c.pa.mapped) > 0 && len(c.pa.queues) == 0 {
			flag |= pmrIgnoreEmptyQueueFilter
		}
		_, qnames = c.processMsgResults(acc, r, msg, nil, c.pa.subject, c.pa.reply, flag)
	}

	// Now deal with gateways
	if c.srv.gateway.enabled {
		c.sendMsgToGateways(acc, msg, c.pa.subject, c.pa.reply, qnames, true)
	}
}
2865

2866
// Handles a subscription permission violation.
// See leafPermViolation() for details.
//
// subj is the subject of the offending subscription request.
func (c *client) leafSubPermViolation(subj []byte) {
	c.leafPermViolation(false, subj)
}
301✔
2871

2872
// Common function to process publish or subscribe leafnode permission violation.
2873
// Sends the permission violation error to the remote, logs it and closes the connection.
2874
// If this is from a server soliciting, the reconnection will be delayed.
2875
func (c *client) leafPermViolation(pub bool, subj []byte) {
301✔
2876
        if c.isSpokeLeafNode() {
602✔
2877
                // For spokes these are no-ops since the hub server told us our permissions.
301✔
2878
                // We just need to not send these over to the other side since we will get cutoff.
301✔
2879
                return
301✔
2880
        }
301✔
2881
        // FIXME(dlc) ?
2882
        c.setLeafConnectDelayIfSoliciting(leafNodeReconnectAfterPermViolation)
×
2883
        var action string
×
2884
        if pub {
×
2885
                c.sendErr(fmt.Sprintf("Permissions Violation for Publish to %q", subj))
×
2886
                action = "Publish"
×
2887
        } else {
×
2888
                c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q", subj))
×
2889
                action = "Subscription"
×
2890
        }
×
2891
        c.Errorf("%s Violation on %q - Check other side configuration", action, subj)
×
2892
        // TODO: add a new close reason that is more appropriate?
×
2893
        c.closeConnection(ProtocolViolation)
×
2894
}
2895

2896
// Invoked from generic processErr() for LEAF connections.
2897
func (c *client) leafProcessErr(errStr string) {
32✔
2898
        // Check if we got a cluster name collision.
32✔
2899
        if strings.Contains(errStr, ErrLeafNodeHasSameClusterName.Error()) {
36✔
2900
                _, delay := c.setLeafConnectDelayIfSoliciting(leafNodeReconnectDelayAfterClusterNameSame)
4✔
2901
                c.Errorf("Leafnode connection dropped with same cluster name error. Delaying attempt to reconnect for %v", delay)
4✔
2902
                return
4✔
2903
        }
4✔
2904

2905
        // We will look for Loop detected error coming from the other side.
2906
        // If we solicit, set the connect delay.
2907
        if !strings.Contains(errStr, "Loop detected") {
49✔
2908
                return
21✔
2909
        }
21✔
2910
        c.handleLeafNodeLoop(false)
7✔
2911
}
2912

2913
// If this leaf connection solicits, sets the connect delay to the given value,
2914
// or the one from the server option's LeafNode.connDelay if one is set (for tests).
2915
// Returns the connection's account name and delay.
2916
func (c *client) setLeafConnectDelayIfSoliciting(delay time.Duration) (string, time.Duration) {
19✔
2917
        c.mu.Lock()
19✔
2918
        if c.isSolicitedLeafNode() {
31✔
2919
                if s := c.srv; s != nil {
24✔
2920
                        if srvdelay := s.getOpts().LeafNode.connDelay; srvdelay != 0 {
17✔
2921
                                delay = srvdelay
5✔
2922
                        }
5✔
2923
                }
2924
                c.leaf.remote.setConnectDelay(delay)
12✔
2925
        }
2926
        accName := c.acc.Name
19✔
2927
        c.mu.Unlock()
19✔
2928
        return accName, delay
19✔
2929
}
2930

2931
// For the given remote Leafnode configuration, this function returns
2932
// if TLS is required, and if so, will return a clone of the TLS Config
2933
// (since some fields will be changed during handshake), the TLS server
2934
// name that is remembered, and the TLS timeout.
2935
func (c *client) leafNodeGetTLSConfigForSolicit(remote *leafNodeCfg) (bool, *tls.Config, string, float64) {
1,827✔
2936
        var (
1,827✔
2937
                tlsConfig  *tls.Config
1,827✔
2938
                tlsName    string
1,827✔
2939
                tlsTimeout float64
1,827✔
2940
        )
1,827✔
2941

1,827✔
2942
        remote.RLock()
1,827✔
2943
        defer remote.RUnlock()
1,827✔
2944

1,827✔
2945
        tlsRequired := remote.TLS || remote.TLSConfig != nil
1,827✔
2946
        if tlsRequired {
1,908✔
2947
                if remote.TLSConfig != nil {
133✔
2948
                        tlsConfig = remote.TLSConfig.Clone()
52✔
2949
                } else {
81✔
2950
                        tlsConfig = &tls.Config{MinVersion: tls.VersionTLS12}
29✔
2951
                }
29✔
2952
                tlsName = remote.tlsName
81✔
2953
                tlsTimeout = remote.TLSTimeout
81✔
2954
                if tlsTimeout == 0 {
127✔
2955
                        tlsTimeout = float64(TLS_TIMEOUT / time.Second)
46✔
2956
                }
46✔
2957
        }
2958

2959
        return tlsRequired, tlsConfig, tlsName, tlsTimeout
1,827✔
2960
}
2961

2962
// Initiates the LeafNode Websocket connection by:
// - doing the TLS handshake if needed
// - sending the HTTP upgrade request
// - waiting for the HTTP response and validating the websocket handshake
//
// Since some bufio reader is used to consume the HTTP response, this function
// returns the slice of buffered bytes (if any) so that the readLoop that will
// be started after that consumes those first before reading from the socket.
// The returned ClosedState indicates the failure category (0 when the
// connection was already closed or on success).
//
// Lock held on entry.
func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remote *leafNodeCfg) ([]byte, ClosedState, error) {
	remote.RLock()
	compress := remote.Websocket.Compression
	// By default the server will mask outbound frames, but it can be disabled with this option.
	noMasking := remote.Websocket.NoMasking
	infoTimeout := remote.FirstInfoTimeout
	remote.RUnlock()
	// Will do the client-side TLS handshake if needed.
	tlsRequired, err := c.leafClientHandshakeIfNeeded(remote, opts)
	if err != nil {
		// 0 will indicate that the connection was already closed
		return nil, 0, err
	}

	// For http request, we need the passed URL to contain either http or https scheme.
	scheme := "http"
	if tlsRequired {
		scheme = "https"
	}
	// We will use the `/leafnode` path to tell the accepting WS server that it should
	// create a LEAF connection, not a CLIENT.
	// In case we use the user's URL path in the future, make sure we append the user's
	// path to our `/leafnode` path.
	lpath := leafNodeWSPath
	if curPath := rURL.EscapedPath(); curPath != _EMPTY_ {
		if curPath[0] == '/' {
			curPath = curPath[1:]
		}
		lpath = path.Join(curPath, lpath)
	} else {
		lpath = lpath[1:]
	}
	ustr := fmt.Sprintf("%s://%s/%s", scheme, rURL.Host, lpath)
	u, _ := url.Parse(ustr)
	req := &http.Request{
		Method:     "GET",
		URL:        u,
		Proto:      "HTTP/1.1",
		ProtoMajor: 1,
		ProtoMinor: 1,
		Header:     make(http.Header),
		Host:       u.Host,
	}
	wsKey, err := wsMakeChallengeKey()
	if err != nil {
		return nil, WriteError, err
	}

	// Standard websocket upgrade headers, plus optional compression and
	// no-masking extensions.
	req.Header["Upgrade"] = []string{"websocket"}
	req.Header["Connection"] = []string{"Upgrade"}
	req.Header["Sec-WebSocket-Key"] = []string{wsKey}
	req.Header["Sec-WebSocket-Version"] = []string{"13"}
	if compress {
		req.Header.Add("Sec-WebSocket-Extensions", wsPMCReqHeaderValue)
	}
	if noMasking {
		req.Header.Add(wsNoMaskingHeader, wsNoMaskingValue)
	}
	c.nc.SetDeadline(time.Now().Add(infoTimeout))
	if err := req.Write(c.nc); err != nil {
		return nil, WriteError, err
	}

	var resp *http.Response

	br := bufio.NewReaderSize(c.nc, MAX_CONTROL_LINE_SIZE)
	resp, err = http.ReadResponse(br, req)
	// Validate the switching-protocols response and the challenge accept key.
	if err == nil &&
		(resp.StatusCode != 101 ||
			!strings.EqualFold(resp.Header.Get("Upgrade"), "websocket") ||
			!strings.EqualFold(resp.Header.Get("Connection"), "upgrade") ||
			resp.Header.Get("Sec-Websocket-Accept") != wsAcceptKey(wsKey)) {

		err = fmt.Errorf("invalid websocket connection")
	}
	// Check compression extension...
	if err == nil && c.ws.compress {
		// Check that not only permessage-deflate extension is present, but that
		// we also have server and client no context take over.
		srvCompress, noCtxTakeover := wsPMCExtensionSupport(resp.Header, false)

		// If server does not support compression, then simply disable it in our side.
		if !srvCompress {
			c.ws.compress = false
		} else if !noCtxTakeover {
			err = fmt.Errorf("compression negotiation error")
		}
	}
	// Same for no masking...
	if err == nil && noMasking {
		// Check if server accepts no masking
		if resp.Header.Get(wsNoMaskingHeader) != wsNoMaskingValue {
			// Nope, need to mask our writes as any client would do.
			c.ws.maskwrite = true
		}
	}
	if resp != nil {
		resp.Body.Close()
	}
	if err != nil {
		return nil, ReadError, err
	}
	c.Debugf("Leafnode compression=%v masking=%v", c.ws.compress, c.ws.maskwrite)

	var preBuf []byte
	// We have to slurp whatever is in the bufio reader and pass that to the readloop.
	if n := br.Buffered(); n != 0 {
		preBuf, _ = br.Peek(n)
	}
	return preBuf, 0, nil
}
3084

3085
// connectProcessTimeout bounds how long a solicited leafnode connection may
// take between sending the CONNECT and finishing the connect process before
// it is considered stale and closed (see leafNodeResumeConnectProcess).
const connectProcessTimeout = 2 * time.Second
3086

3087
// This is invoked for remote LEAF remote connections after processing the INFO
// protocol.
//
// It sends the CONNECT protocol, starts the write loop, and arms a timer that
// closes the connection as stale if leafNodeFinishConnectProcess does not run
// within connectProcessTimeout.
func (s *Server) leafNodeResumeConnectProcess(c *client) {
	clusterName := s.ClusterName()

	c.mu.Lock()
	if c.isClosed() {
		c.mu.Unlock()
		return
	}
	if err := c.sendLeafConnect(clusterName, c.headers); err != nil {
		c.mu.Unlock()
		c.closeConnection(WriteError)
		return
	}

	// Spin up the write loop.
	s.startGoRoutine(func() { c.writeLoop() })

	// timeout leafNodeFinishConnectProcess
	c.ping.tmr = time.AfterFunc(connectProcessTimeout, func() {
		c.mu.Lock()
		// check if leafNodeFinishConnectProcess was called and prevent later leafNodeFinishConnectProcess
		if !c.flags.setIfNotSet(connectProcessFinished) {
			c.mu.Unlock()
			return
		}
		clearTimer(&c.ping.tmr)
		closed := c.isClosed()
		c.mu.Unlock()
		if !closed {
			c.sendErrAndDebug("Stale Leaf Node Connection - Closing")
			c.closeConnection(StaleConnection)
		}
	})
	c.mu.Unlock()
	c.Debugf("Remote leafnode connect msg sent")
}
3125

3126
// This is invoked for remote LEAF connections after processing the INFO
// protocol and leafNodeResumeConnectProcess.
// This will send LS+ the CONNECT protocol and register the leaf node.
func (s *Server) leafNodeFinishConnectProcess(c *client) {
	c.mu.Lock()
	// If the connectProcessTimeout timer fired first, it already owns the
	// teardown of this connection; bail out.
	if !c.flags.setIfNotSet(connectProcessFinished) {
		c.mu.Unlock()
		return
	}
	if c.isClosed() {
		c.mu.Unlock()
		s.removeLeafNodeConnection(c)
		return
	}
	remote := c.leaf.remote
	// Check if we will need to send the system connect event.
	remote.RLock()
	sendSysConnectEvent := remote.Hub
	remote.RUnlock()

	// Capture account before releasing lock
	acc := c.acc
	// cancel connectProcessTimeout
	// (c.ping.tmr was armed in leafNodeResumeConnectProcess; the real ping
	// timer is installed below via setFirstPingTimer).
	clearTimer(&c.ping.tmr)
	c.mu.Unlock()

	// Make sure we register with the account here.
	// Done outside the client lock since it can fail for account-level
	// reasons that require their own handling.
	if err := c.registerWithAccount(acc); err != nil {
		if err == ErrTooManyAccountConnections {
			c.maxAccountConnExceeded()
			return
		} else if err == ErrLeafNodeLoop {
			c.handleLeafNodeLoop(true)
			return
		}
		c.Errorf("Registering leaf with account %s resulted in error: %v", acc.Name, err)
		c.closeConnection(ProtocolViolation)
		return
	}
	s.addLeafNodeConnection(c, _EMPTY_, _EMPTY_, false)
	s.initLeafNodeSmapAndSendSubs(c)
	if sendSysConnectEvent {
		s.sendLeafNodeConnect(acc)
	}

	// The above functions are not atomically under the client
	// lock doing those operations. It is possible - since we
	// have started the read/write loops - that the connection
	// is closed before or in between. This would leave the
	// closed LN connection possible registered with the account
	// and/or the server's leafs map. So check if connection
	// is closed, and if so, manually cleanup.
	c.mu.Lock()
	closed := c.isClosed()
	if !closed {
		c.setFirstPingTimer()
	}
	c.mu.Unlock()
	if closed {
		s.removeLeafNodeConnection(c)
		// Undo the account registration; when this was the account's last
		// client, also decrement the active-accounts counter.
		if prev := acc.removeClient(c); prev == 1 {
			s.decActiveAccounts()
		}
	}
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc