• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 13363030201

15 Feb 2025 01:54AM UTC coverage: 84.426% (-1.1%) from 85.509%
13363030201

push

github

web-flow
NRG: Invalidate pending append entries cache (#6513)

The `n.pae` is an in-memory cache of pending but not yet applied
entries. When applying commits we can pull from this cache so we don't
need to pull them from disk for example. However, the cache has a
bounded size. So if the cache would be fully filled and we'd store a
different entry at an index that was cached, we'd apply the wrong
(cached) entry.

If we get an entry that we can't cache because it's full, we can simply
drop the entry from the cache if it exists. If an entry at this index
doesn't exist it's a noop, but if it did exist then it clears up room in
the cache for the next entries to be stored.

Signed-off-by: Maurice van Veen <github@mauricevanveen.com>

68197 of 80777 relevant lines covered (84.43%)

863883.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.47
/server/leafnode.go
1
// Copyright 2019-2025 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bufio"
18
        "bytes"
19
        "crypto/tls"
20
        "encoding/base64"
21
        "encoding/json"
22
        "fmt"
23
        "io"
24
        "math/rand"
25
        "net"
26
        "net/http"
27
        "net/url"
28
        "os"
29
        "path"
30
        "reflect"
31
        "regexp"
32
        "runtime"
33
        "strconv"
34
        "strings"
35
        "sync"
36
        "sync/atomic"
37
        "time"
38

39
        "github.com/klauspost/compress/s2"
40
        "github.com/nats-io/jwt/v2"
41
        "github.com/nats-io/nkeys"
42
        "github.com/nats-io/nuid"
43
)
44

45
const (
	// Warning when user configures leafnode TLS insecure.
	leafnodeTLSInsecureWarning = "TLS certificate chain and hostname of solicited leafnodes will not be verified. DO NOT USE IN PRODUCTION!"

	// When a loop is detected, delay the reconnect of solicited connection.
	leafNodeReconnectDelayAfterLoopDetected = 30 * time.Second

	// When a server receives a message causing a permission violation, the
	// connection is closed and it won't attempt to reconnect for that long.
	leafNodeReconnectAfterPermViolation = 30 * time.Second

	// When we have the same cluster name as the hub.
	leafNodeReconnectDelayAfterClusterNameSame = 30 * time.Second

	// Prefix for loop detection subject.
	leafNodeLoopDetectionSubjectPrefix = "$LDS."

	// Path added to URL to indicate to WS server that the connection is a
	// LEAF connection as opposed to a CLIENT.
	leafNodeWSPath = "/leafnode"

	// This is the time the server will wait, when receiving a CONNECT,
	// before closing the connection if the required minimum version is not met.
	leafNodeWaitBeforeClose = 5 * time.Second
)
70

71
// leaf holds the leafnode-specific state hanging off of a client connection.
type leaf struct {
	// We have any auth stuff here for solicited connections.
	remote *leafNodeCfg
	// isSpoke tells us what role we are playing.
	// Used when we receive a connection but otherside tells us they are a hub.
	isSpoke bool
	// remoteCluster is when we are a hub but the spoke leafnode is part of a cluster.
	remoteCluster string
	// remoteServer holds onto the remote server's name or ID.
	remoteServer string
	// domain name of remote server
	remoteDomain string
	// account name of remote server
	remoteAccName string
	// Used to suppress sub and unsub interest. Same as routes but our audience
	// here is tied to this leaf node. This will hold all subscriptions except this
	// leaf nodes. This represents all the interest we want to send to the other side.
	smap map[string]int32
	// This map will contain all the subscriptions that have been added to the smap
	// during initLeafNodeSmapAndSendSubs. It is short lived and is there to avoid
	// race between processing of a sub where sub is added to account sublist but
	// updateSmap has not been called on that "thread", while in the LN readloop,
	// when processing CONNECT, initLeafNodeSmapAndSendSubs is invoked and adds
	// this subscription to smap. When processing of the sub then calls updateSmap,
	// we would add it a second time in the smap causing later unsub to suppress the LS-.
	tsub map[*subscription]struct{}
	// tsubt is the timer used to clear tsub once the transient window has passed.
	tsubt *time.Timer
	// Selected compression mode, which may be different from the server configured mode.
	compression string
	// This is for GW map replies.
	gwSub *subscription
}
103

104
// Used for remote (solicited) leafnodes.
105
// Used for remote (solicited) leafnodes. Wraps the configured RemoteLeafOpts
// with runtime state that requires its own locking.
type leafNodeCfg struct {
	sync.RWMutex
	*RemoteLeafOpts
	urls           []*url.URL    // Candidate URLs: configured ones plus any discovered via INFO.
	curURL         *url.URL      // URL used for the current/most recent connect attempt.
	tlsName        string        // Hostname saved off for TLS verification (see saveTLSHostname).
	username       string        // User extracted from a configured URL (see saveUserPassword).
	password       string        // Password extracted from a configured URL.
	perms          *Permissions  // Deny permissions built from DenyExports/DenyImports.
	connDelay      time.Duration // Delay before a connect, could be used while detecting loop condition, etc..
	jsMigrateTimer *time.Timer   // Timer used to delay JetStream asset migration while disconnected.
}
117

118
// Check to see if this is a solicited leafnode. We do special processing for solicited.
119
func (c *client) isSolicitedLeafNode() bool {
1,956✔
120
        return c.kind == LEAF && c.leaf.remote != nil
1,956✔
121
}
1,956✔
122

123
// Returns true if this is a solicited leafnode and is not configured to be treated as a hub or a receiving
124
// connection leafnode where the otherside has declared itself to be the hub.
125
func (c *client) isSpokeLeafNode() bool {
15,657,200✔
126
        return c.kind == LEAF && c.leaf.isSpoke
15,657,200✔
127
}
15,657,200✔
128

129
func (c *client) isHubLeafNode() bool {
18,202✔
130
        return c.kind == LEAF && !c.leaf.isSpoke
18,202✔
131
}
18,202✔
132

133
// This will spin up go routines to solicit the remote leaf node connections.
// Each remote is registered with the server, its credentials file (if any) is
// sanity-checked, and a goroutine is started to attempt the connection.
func (s *Server) solicitLeafNodeRemotes(remotes []*RemoteLeafOpts) {
	sysAccName := _EMPTY_
	sAcc := s.SystemAccount()
	if sAcc != nil {
		sysAccName = sAcc.Name
	}
	// addRemote registers the remote with the server under s.mu, then (outside
	// the lock) validates the credentials file and emits notices when the
	// remote or its creds carry restricted permissions.
	addRemote := func(r *RemoteLeafOpts, isSysAccRemote bool) *leafNodeCfg {
		s.mu.Lock()
		remote := newLeafNodeCfg(r)
		creds := remote.Credentials
		accName := remote.LocalAccount
		s.leafRemoteCfgs = append(s.leafRemoteCfgs, remote)
		// Print notice if the system account remote restricts exports/imports.
		if isSysAccRemote {
			if len(remote.DenyExports) > 0 {
				s.Noticef("Remote for System Account uses restricted export permissions")
			}
			if len(remote.DenyImports) > 0 {
				s.Noticef("Remote for System Account uses restricted import permissions")
			}
		}
		s.mu.Unlock()
		if creds != _EMPTY_ {
			contents, err := os.ReadFile(creds)
			// Wipe the in-memory copy of the creds once the closure returns.
			defer wipeSlice(contents)
			if err != nil {
				s.Errorf("Error reading LeafNode Remote Credentials file %q: %v", creds, err)
			} else if items := credsRe.FindAllSubmatch(contents, -1); len(items) < 2 {
				s.Errorf("LeafNode Remote Credentials file %q malformed", creds)
			} else if _, err := nkeys.FromSeed(items[1][1]); err != nil {
				s.Errorf("LeafNode Remote Credentials file %q has malformed seed", creds)
			} else if uc, err := jwt.DecodeUserClaims(string(items[0][1])); err != nil {
				s.Errorf("LeafNode Remote Credentials file %q has malformed user jwt", creds)
			} else if isSysAccRemote {
				// Creds parsed fine; notice when the user JWT restricts perms.
				if !uc.Permissions.Pub.Empty() || !uc.Permissions.Sub.Empty() || uc.Permissions.Resp != nil {
					s.Noticef("LeafNode Remote for System Account uses credentials file %q with restricted permissions", creds)
				}
			} else {
				if !uc.Permissions.Pub.Empty() || !uc.Permissions.Sub.Empty() || uc.Permissions.Resp != nil {
					s.Noticef("LeafNode Remote for Account %s uses credentials file %q with restricted permissions", accName, creds)
				}
			}
		}
		return remote
	}
	for _, r := range remotes {
		remote := addRemote(r, r.LocalAccount == sysAccName)
		s.startGoRoutine(func() { s.connectToRemoteLeafNode(remote, true) })
	}
}
184

185
func (s *Server) remoteLeafNodeStillValid(remote *leafNodeCfg) bool {
1,783✔
186
        for _, ri := range s.getOpts().LeafNode.Remotes {
3,935✔
187
                // FIXME(dlc) - What about auth changes?
2,152✔
188
                if reflect.DeepEqual(ri.URLs, remote.URLs) {
3,935✔
189
                        return true
1,783✔
190
                }
1,783✔
191
        }
192
        return false
×
193
}
194

195
// Ensure that leafnode is properly configured.
// Validation covers: auth options, local-account existence (config mode) or
// account nkeys (operator mode), compression settings, websocket URL
// consistency per remote, the min_version setting, and — when gateways are
// also configured — the presence of a system account and pinned certs.
func validateLeafNode(o *Options) error {
	if err := validateLeafNodeAuthOptions(o); err != nil {
		return err
	}

	// Users can bind to any local account, if its empty we will assume the $G account.
	for _, r := range o.LeafNode.Remotes {
		if r.LocalAccount == _EMPTY_ {
			r.LocalAccount = globalAccountName
		}
	}

	// In local config mode, check that leafnode configuration refers to accounts that exist.
	if len(o.TrustedOperators) == 0 {
		accNames := map[string]struct{}{}
		for _, a := range o.Accounts {
			accNames[a.Name] = struct{}{}
		}
		// global account is always created
		accNames[DEFAULT_GLOBAL_ACCOUNT] = struct{}{}
		// in the context of leaf nodes, empty account means global account
		accNames[_EMPTY_] = struct{}{}
		// system account either exists or, if not disabled, will be created
		if o.SystemAccount == _EMPTY_ && !o.NoSystemAccount {
			accNames[DEFAULT_SYSTEM_ACCOUNT] = struct{}{}
		}
		checkAccountExists := func(accName string, cfgType string) error {
			if _, ok := accNames[accName]; !ok {
				return fmt.Errorf("cannot find local account %q specified in leafnode %s", accName, cfgType)
			}
			return nil
		}
		if err := checkAccountExists(o.LeafNode.Account, "authorization"); err != nil {
			return err
		}
		for _, lu := range o.LeafNode.Users {
			if lu.Account == nil { // means global account
				continue
			}
			if err := checkAccountExists(lu.Account.Name, "authorization"); err != nil {
				return err
			}
		}
		for _, r := range o.LeafNode.Remotes {
			if err := checkAccountExists(r.LocalAccount, "remote"); err != nil {
				return err
			}
		}
	} else {
		// Operator mode: accounts are referenced by nkeys, not names.
		if len(o.LeafNode.Users) != 0 {
			return fmt.Errorf("operator mode does not allow specifying users in leafnode config")
		}
		for _, r := range o.LeafNode.Remotes {
			if !nkeys.IsValidPublicAccountKey(r.LocalAccount) {
				return fmt.Errorf(
					"operator mode requires account nkeys in remotes. " +
						"Please add an `account` key to each remote in your `leafnodes` section, to assign it to an account. " +
						"Each account value should be a 56 character public key, starting with the letter 'A'")
			}
		}
		if o.LeafNode.Port != 0 && o.LeafNode.Account != "" && !nkeys.IsValidPublicAccountKey(o.LeafNode.Account) {
			return fmt.Errorf("operator mode and non account nkeys are incompatible")
		}
	}

	// Validate compression settings
	if o.LeafNode.Compression.Mode != _EMPTY_ {
		if err := validateAndNormalizeCompressionOption(&o.LeafNode.Compression, CompressionS2Auto); err != nil {
			return err
		}
	}

	// If a remote has a websocket scheme, all need to have it.
	for _, rcfg := range o.LeafNode.Remotes {
		if len(rcfg.URLs) >= 2 {
			firstIsWS, ok := isWSURL(rcfg.URLs[0]), true
			for i := 1; i < len(rcfg.URLs); i++ {
				u := rcfg.URLs[i]
				// Mixing websocket and non-websocket URLs is rejected.
				if isWS := isWSURL(u); isWS && !firstIsWS || !isWS && firstIsWS {
					ok = false
					break
				}
			}
			if !ok {
				return fmt.Errorf("remote leaf node configuration cannot have a mix of websocket and non-websocket urls: %q", redactURLList(rcfg.URLs))
			}
		}
		// Validate compression settings
		if rcfg.Compression.Mode != _EMPTY_ {
			if err := validateAndNormalizeCompressionOption(&rcfg.Compression, CompressionS2Auto); err != nil {
				return err
			}
		}
	}

	// The remaining checks only apply when accepting leafnode connections.
	if o.LeafNode.Port == 0 {
		return nil
	}

	// If MinVersion is defined, check that it is valid.
	if mv := o.LeafNode.MinVersion; mv != _EMPTY_ {
		if err := checkLeafMinVersionConfig(mv); err != nil {
			return err
		}
	}

	// The checks below will be done only when detecting that we are configured
	// with gateways. So if an option validation needs to be done regardless,
	// it MUST be done before this point!

	if o.Gateway.Name == _EMPTY_ && o.Gateway.Port == 0 {
		return nil
	}
	// If we are here we have both leaf nodes and gateways defined, make sure there
	// is a system account defined.
	if o.SystemAccount == _EMPTY_ {
		return fmt.Errorf("leaf nodes and gateways (both being defined) require a system account to also be configured")
	}
	if err := validatePinnedCerts(o.LeafNode.TLSPinnedCerts); err != nil {
		return fmt.Errorf("leafnode: %v", err)
	}
	return nil
}
319

320
func checkLeafMinVersionConfig(mv string) error {
8✔
321
        if ok, err := versionAtLeastCheckError(mv, 2, 8, 0); !ok || err != nil {
12✔
322
                if err != nil {
6✔
323
                        return fmt.Errorf("invalid leafnode's minimum version: %v", err)
2✔
324
                } else {
4✔
325
                        return fmt.Errorf("the minimum version should be at least 2.8.0")
2✔
326
                }
2✔
327
        }
328
        return nil
4✔
329
}
330

331
// Used to validate user names in LeafNode configuration.
332
// - rejects mix of single and multiple users.
333
// - rejects duplicate user names.
334
func validateLeafNodeAuthOptions(o *Options) error {
6,547✔
335
        if len(o.LeafNode.Users) == 0 {
13,075✔
336
                return nil
6,528✔
337
        }
6,528✔
338
        if o.LeafNode.Username != _EMPTY_ {
21✔
339
                return fmt.Errorf("can not have a single user/pass and a users array")
2✔
340
        }
2✔
341
        if o.LeafNode.Nkey != _EMPTY_ {
17✔
342
                return fmt.Errorf("can not have a single nkey and a users array")
×
343
        }
×
344
        users := map[string]struct{}{}
17✔
345
        for _, u := range o.LeafNode.Users {
40✔
346
                if _, exists := users[u.Username]; exists {
25✔
347
                        return fmt.Errorf("duplicate user %q detected in leafnode authorization", u.Username)
2✔
348
                }
2✔
349
                users[u.Username] = struct{}{}
21✔
350
        }
351
        return nil
15✔
352
}
353

354
// Update remote LeafNode TLS configurations after a config reload.
355
func (s *Server) updateRemoteLeafNodesTLSConfig(opts *Options) {
10✔
356
        max := len(opts.LeafNode.Remotes)
10✔
357
        if max == 0 {
10✔
358
                return
×
359
        }
×
360

361
        s.mu.RLock()
10✔
362
        defer s.mu.RUnlock()
10✔
363

10✔
364
        // Changes in the list of remote leaf nodes is not supported.
10✔
365
        // However, make sure that we don't go over the arrays.
10✔
366
        if len(s.leafRemoteCfgs) < max {
10✔
367
                max = len(s.leafRemoteCfgs)
×
368
        }
×
369
        for i := 0; i < max; i++ {
20✔
370
                ro := opts.LeafNode.Remotes[i]
10✔
371
                cfg := s.leafRemoteCfgs[i]
10✔
372
                if ro.TLSConfig != nil {
12✔
373
                        cfg.Lock()
2✔
374
                        cfg.TLSConfig = ro.TLSConfig.Clone()
2✔
375
                        cfg.TLSHandshakeFirst = ro.TLSHandshakeFirst
2✔
376
                        cfg.Unlock()
2✔
377
                }
2✔
378
        }
379
}
380

381
func (s *Server) reConnectToRemoteLeafNode(remote *leafNodeCfg) {
226✔
382
        delay := s.getOpts().LeafNode.ReconnectInterval
226✔
383
        select {
226✔
384
        case <-time.After(delay):
172✔
385
        case <-s.quitCh:
54✔
386
                s.grWG.Done()
54✔
387
                return
54✔
388
        }
389
        s.connectToRemoteLeafNode(remote, false)
172✔
390
}
391

392
// Creates a leafNodeCfg object that wraps the RemoteLeafOpts.
// Builds the deny permissions from DenyExports/DenyImports, copies and
// (unless NoRandomize) shuffles the configured URLs, and extracts TLS
// hostname and user/password from each URL for later reconnects.
func newLeafNodeCfg(remote *RemoteLeafOpts) *leafNodeCfg {
	cfg := &leafNodeCfg{
		RemoteLeafOpts: remote,
		urls:           make([]*url.URL, 0, len(remote.URLs)),
	}
	if len(remote.DenyExports) > 0 || len(remote.DenyImports) > 0 {
		perms := &Permissions{}
		if len(remote.DenyExports) > 0 {
			perms.Publish = &SubjectPermission{Deny: remote.DenyExports}
		}
		if len(remote.DenyImports) > 0 {
			perms.Subscribe = &SubjectPermission{Deny: remote.DenyImports}
		}
		cfg.perms = perms
	}
	// Start with the one that is configured. We will add to this
	// array when receiving async leafnode INFOs.
	cfg.urls = append(cfg.urls, cfg.URLs...)
	// If allowed to randomize, do it on our copy of URLs
	if !remote.NoRandomize {
		rand.Shuffle(len(cfg.urls), func(i, j int) {
			cfg.urls[i], cfg.urls[j] = cfg.urls[j], cfg.urls[i]
		})
	}
	// If we are TLS make sure we save off a proper servername if possible.
	// Do same for user/password since we may need them to connect to
	// a bare URL that we get from INFO protocol.
	for _, u := range cfg.urls {
		cfg.saveTLSHostname(u)
		cfg.saveUserPassword(u)
		// If the url(s) have the "wss://" scheme, and we don't have a TLS
		// config, mark that we should be using TLS anyway.
		if !cfg.TLS && isWSSURL(u) {
			cfg.TLS = true
		}
	}
	return cfg
}
431

432
// Will pick an URL from the list of available URLs.
433
func (cfg *leafNodeCfg) pickNextURL() *url.URL {
1,029✔
434
        cfg.Lock()
1,029✔
435
        defer cfg.Unlock()
1,029✔
436
        // If the current URL is the first in the list and we have more than
1,029✔
437
        // one URL, then move that one to end of the list.
1,029✔
438
        if cfg.curURL != nil && len(cfg.urls) > 1 && urlsAreEqual(cfg.curURL, cfg.urls[0]) {
1,179✔
439
                first := cfg.urls[0]
150✔
440
                copy(cfg.urls, cfg.urls[1:])
150✔
441
                cfg.urls[len(cfg.urls)-1] = first
150✔
442
        }
150✔
443
        cfg.curURL = cfg.urls[0]
1,029✔
444
        return cfg.curURL
1,029✔
445
}
446

447
// Returns the current URL
448
func (cfg *leafNodeCfg) getCurrentURL() *url.URL {
83✔
449
        cfg.RLock()
83✔
450
        defer cfg.RUnlock()
83✔
451
        return cfg.curURL
83✔
452
}
83✔
453

454
// Returns how long the server should wait before attempting
455
// to solicit a remote leafnode connection.
456
func (cfg *leafNodeCfg) getConnectDelay() time.Duration {
850✔
457
        cfg.RLock()
850✔
458
        delay := cfg.connDelay
850✔
459
        cfg.RUnlock()
850✔
460
        return delay
850✔
461
}
850✔
462

463
// Sets the connect delay.
464
func (cfg *leafNodeCfg) setConnectDelay(delay time.Duration) {
157✔
465
        cfg.Lock()
157✔
466
        cfg.connDelay = delay
157✔
467
        cfg.Unlock()
157✔
468
}
157✔
469

470
// Ensure that non-exported options (used in tests) have
471
// been properly set.
472
func (s *Server) setLeafNodeNonExportedOptions() {
6,127✔
473
        opts := s.getOpts()
6,127✔
474
        s.leafNodeOpts.dialTimeout = opts.LeafNode.dialTimeout
6,127✔
475
        if s.leafNodeOpts.dialTimeout == 0 {
12,253✔
476
                // Use same timeouts as routes for now.
6,126✔
477
                s.leafNodeOpts.dialTimeout = DEFAULT_ROUTE_DIAL
6,126✔
478
        }
6,126✔
479
        s.leafNodeOpts.resolver = opts.LeafNode.resolver
6,127✔
480
        if s.leafNodeOpts.resolver == nil {
12,250✔
481
                s.leafNodeOpts.resolver = net.DefaultResolver
6,123✔
482
        }
6,123✔
483
}
484

485
// Delay applied before the first connect of a remote sharing the system
// account when the server is clustered, to allow gathering some info first.
const sharedSysAccDelay = 250 * time.Millisecond
486

487
// connectToRemoteLeafNode runs the solicit loop for one remote leafnode.
// It retries (with jitter) while the server is running and the remote is
// still configured, kicks off JetStream migration timers while the remote
// is down, and on success hands the connection to createLeafNode.
// firstConnect is true for the initial solicit (affects error reporting and
// the shared-system-account delay).
func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg, firstConnect bool) {
	defer s.grWG.Done()

	if remote == nil || len(remote.URLs) == 0 {
		s.Debugf("Empty remote leafnode definition, nothing to connect")
		return
	}

	opts := s.getOpts()
	reconnectDelay := opts.LeafNode.ReconnectInterval
	s.mu.RLock()
	dialTimeout := s.leafNodeOpts.dialTimeout
	resolver := s.leafNodeOpts.resolver
	var isSysAcc bool
	if s.eventsEnabled() {
		isSysAcc = remote.LocalAccount == s.sys.account.Name
	}
	jetstreamMigrateDelay := remote.JetStreamClusterMigrateDelay
	s.mu.RUnlock()

	// If we are sharing a system account and we are not standalone delay to gather some info prior.
	if firstConnect && isSysAcc && !s.standAloneMode() {
		s.Debugf("Will delay first leafnode connect to shared system account due to clustering")
		remote.setConnectDelay(sharedSysAccDelay)
	}

	if connDelay := remote.getConnectDelay(); connDelay > 0 {
		select {
		case <-time.After(connDelay):
		case <-s.quitCh:
			return
		}
		remote.setConnectDelay(0)
	}

	var conn net.Conn

	const connErrFmt = "Error trying to connect as leafnode to remote server %q (attempt %v): %v"

	attempts := 0

	for s.isRunning() && s.remoteLeafNodeStillValid(remote) {
		rURL := remote.pickNextURL()
		url, err := s.getRandomIP(resolver, rURL.Host, nil)
		if err == nil {
			var ipStr string
			if url != rURL.Host {
				ipStr = fmt.Sprintf(" (%s)", url)
			}
			// Some test may want to disable remotes from connecting
			if s.isLeafConnectDisabled() {
				s.Debugf("Will not attempt to connect to remote server on %q%s, leafnodes currently disabled", rURL.Host, ipStr)
				err = ErrLeafNodeDisabled
			} else {
				s.Debugf("Trying to connect as leafnode to remote server on %q%s", rURL.Host, ipStr)
				conn, err = natsDialTimeout("tcp", url, dialTimeout)
			}
		}
		if err != nil {
			// Connect failed: wait reconnectDelay plus random jitter, then retry.
			jitter := time.Duration(rand.Int63n(int64(reconnectDelay)))
			delay := reconnectDelay + jitter
			attempts++
			if s.shouldReportConnectErr(firstConnect, attempts) {
				s.Errorf(connErrFmt, rURL.Host, attempts, err)
			} else {
				s.Debugf(connErrFmt, rURL.Host, attempts, err)
			}
			remote.Lock()
			// if we are using a delay to start migrating assets, kick off a migrate timer.
			if remote.jsMigrateTimer == nil && jetstreamMigrateDelay > 0 {
				remote.jsMigrateTimer = time.AfterFunc(jetstreamMigrateDelay, func() {
					s.checkJetStreamMigrate(remote)
				})
			}
			remote.Unlock()
			select {
			case <-s.quitCh:
				remote.cancelMigrateTimer()
				return
			case <-time.After(delay):
				// Check if we should migrate any JetStream assets immediately while this remote is down.
				// This will be used if JetStreamClusterMigrateDelay was not set
				if jetstreamMigrateDelay == 0 {
					s.checkJetStreamMigrate(remote)
				}
				continue
			}
		}
		// Connected: any pending migrate timer is no longer needed.
		remote.cancelMigrateTimer()
		if !s.remoteLeafNodeStillValid(remote) {
			conn.Close()
			return
		}

		// We have a connection here to a remote server.
		// Go ahead and create our leaf node and return.
		s.createLeafNode(conn, rURL, remote, nil)

		// Clear any observer states if we had them.
		s.clearObserverState(remote)

		return
	}
}
591

592
func (cfg *leafNodeCfg) cancelMigrateTimer() {
842✔
593
        cfg.Lock()
842✔
594
        stopAndClearTimer(&cfg.jsMigrateTimer)
842✔
595
        cfg.Unlock()
842✔
596
}
842✔
597

598
// This will clear any observer state such that stream or consumer assets on this server can become leaders again.
// Called once a remote leafnode connection has been (re)established.
func (s *Server) clearObserverState(remote *leafNodeCfg) {
	s.mu.RLock()
	accName := remote.LocalAccount
	s.mu.RUnlock()

	acc, err := s.LookupAccount(accName)
	if err != nil {
		s.Warnf("Error looking up account [%s] checking for JetStream clear observer state on a leafnode", accName)
		return
	}

	acc.jscmMu.Lock()
	defer acc.jscmMu.Unlock()

	// Walk all streams looking for any clustered stream, skip otherwise.
	for _, mset := range acc.streams() {
		node := mset.raftNode()
		if node == nil {
			// Not R>1
			continue
		}
		// Check consumers
		for _, o := range mset.getConsumers() {
			if n := o.raftNode(); n != nil {
				// Ensure we can become a leader again.
				n.SetObserver(false)
			}
		}
		// Ensure we can become a leader again.
		// (SetObserver(false) re-enables leadership — the previous comment
		// saying "can not" was a copy/paste error from checkJetStreamMigrate.)
		node.SetObserver(false)
	}
}
631

632
// Check to see if we should migrate any assets from this account.
//
// Invoked when the remote leafnode connection is lost and the remote has
// JetStreamClusterMigrate configured: any clustered stream/consumer on this
// server steps down from leadership and becomes an observer so replicas on
// the other side of the (now broken) link can take over.
func (s *Server) checkJetStreamMigrate(remote *leafNodeCfg) {
	s.mu.RLock()
	accName, shouldMigrate := remote.LocalAccount, remote.JetStreamClusterMigrate
	s.mu.RUnlock()

	if !shouldMigrate {
		return
	}

	acc, err := s.LookupAccount(accName)
	if err != nil {
		s.Warnf("Error looking up account [%s] checking for JetStream migration on a leafnode", accName)
		return
	}

	// Serialize with clearObserverState, which flips the same observer flags.
	acc.jscmMu.Lock()
	defer acc.jscmMu.Unlock()

	// Walk all streams looking for any clustered stream, skip otherwise.
	// If we are the leader force stepdown.
	for _, mset := range acc.streams() {
		node := mset.raftNode()
		if node == nil {
			// Not R>1
			continue
		}
		// Collect any consumers
		for _, o := range mset.getConsumers() {
			if n := o.raftNode(); n != nil {
				if n.Leader() {
					n.StepDown()
				}
				// Ensure we can not become a leader while in this state.
				n.SetObserver(true)
			}
		}
		// Stepdown if this stream was leader.
		if node.Leader() {
			node.StepDown()
		}
		// Ensure we can not become a leader while in this state.
		node.SetObserver(true)
	}
}
677

678
// Helper for checking.
679
func (s *Server) isLeafConnectDisabled() bool {
1,022✔
680
        s.mu.RLock()
1,022✔
681
        defer s.mu.RUnlock()
1,022✔
682
        return s.leafDisableConnect
1,022✔
683
}
1,022✔
684

685
// Save off the tlsName for when we use TLS and mix hostnames and IPs. IPs usually
686
// come from the server we connect to.
687
//
688
// We used to save the name only if there was a TLSConfig or scheme equal to "tls".
689
// However, this was causing failures for users that did not set the scheme (and
690
// their remote connections did not have a tls{} block).
691
// We now save the host name regardless in case the remote returns an INFO indicating
692
// that TLS is required.
693
func (cfg *leafNodeCfg) saveTLSHostname(u *url.URL) {
1,714✔
694
        if cfg.tlsName == _EMPTY_ && net.ParseIP(u.Hostname()) == nil {
1,734✔
695
                cfg.tlsName = u.Hostname()
20✔
696
        }
20✔
697
}
698

699
// Save off the username/password for when we connect using a bare URL
700
// that we get from the INFO protocol.
701
func (cfg *leafNodeCfg) saveUserPassword(u *url.URL) {
1,122✔
702
        if cfg.username == _EMPTY_ && u.User != nil {
1,382✔
703
                cfg.username = u.User.Username()
260✔
704
                cfg.password, _ = u.User.Password()
260✔
705
        }
260✔
706
}
707

708
// This starts the leafnode accept loop in a go routine, unless it
// is detected that the server has already been shutdown.
//
// It binds the leafnode listener, builds the template INFO protocol sent to
// accepted leafnodes, records the listener on the server for shutdown, and
// finally spawns the accept loop goroutine. The server lock is held for the
// whole setup so INFO/URL state is published atomically.
func (s *Server) startLeafNodeAcceptLoop() {
	// Snapshot server options.
	opts := s.getOpts()

	port := opts.LeafNode.Port
	// Port -1 means "pick a random port" (normalized to 0 for net.Listen).
	if port == -1 {
		port = 0
	}

	if s.isShuttingDown() {
		return
	}

	s.mu.Lock()
	hp := net.JoinHostPort(opts.LeafNode.Host, strconv.Itoa(port))
	l, e := natsListen("tcp", hp)
	// Store the listen error (nil on success) so readiness checks can see it.
	s.leafNodeListenerErr = e
	if e != nil {
		s.mu.Unlock()
		s.Fatalf("Error listening on leafnode port: %d - %v", opts.LeafNode.Port, e)
		return
	}

	s.Noticef("Listening for leafnode connections on %s",
		net.JoinHostPort(opts.LeafNode.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))

	tlsRequired := opts.LeafNode.TLSConfig != nil
	tlsVerify := tlsRequired && opts.LeafNode.TLSConfig.ClientAuth == tls.RequireAndVerifyClientCert
	// Do not set compression in this Info object, it would possibly cause
	// issues when sending asynchronous INFO to the remote.
	info := Info{
		ID:            s.info.ID,
		Name:          s.info.Name,
		Version:       s.info.Version,
		GitCommit:     gitCommit,
		GoVersion:     runtime.Version(),
		AuthRequired:  true,
		TLSRequired:   tlsRequired,
		TLSVerify:     tlsVerify,
		MaxPayload:    s.info.MaxPayload, // TODO(dlc) - Allow override?
		Headers:       s.supportsHeaders(),
		JetStream:     opts.JetStream,
		Domain:        opts.JetStreamDomain,
		Proto:         s.getServerProto(),
		InfoOnConnect: true,
	}
	// If we have selected a random port...
	if port == 0 {
		// Write resolved port back to options.
		opts.LeafNode.Port = l.Addr().(*net.TCPAddr).Port
	}

	s.leafNodeInfo = info
	// Possibly override Host/Port and set IP based on Cluster.Advertise
	if err := s.setLeafNodeInfoHostPortAndIP(); err != nil {
		s.Fatalf("Error setting leafnode INFO with LeafNode.Advertise value of %s, err=%v", opts.LeafNode.Advertise, err)
		l.Close()
		s.mu.Unlock()
		return
	}
	// Register our own advertised URL and regenerate the cached INFO JSON.
	s.leafURLsMap[s.leafNodeInfo.IP]++
	s.generateLeafNodeInfoJSON()

	// Setup state that can enable shutdown
	s.leafNodeListener = l

	// As of now, a server that does not have remotes configured would
	// never solicit a connection, so we should not have to warn if
	// InsecureSkipVerify is set in main LeafNodes config (since
	// this TLS setting matters only when soliciting a connection).
	// Still, warn if insecure is set in any of LeafNode block.
	// We need to check remotes, even if tls is not required on accept.
	warn := tlsRequired && opts.LeafNode.TLSConfig.InsecureSkipVerify
	if !warn {
		for _, r := range opts.LeafNode.Remotes {
			if r.TLSConfig != nil && r.TLSConfig.InsecureSkipVerify {
				warn = true
				break
			}
		}
	}
	if warn {
		s.Warnf(leafnodeTLSInsecureWarning)
	}
	go s.acceptConnections(l, "Leafnode", func(conn net.Conn) { s.createLeafNode(conn, nil, nil, nil) }, nil)
	s.mu.Unlock()
}
797

798
// RegEx to match a creds file with user JWT and Seed.
// Captures the base64-ish payload between "-----BEGIN ...-----" / "-----END ...-----"
// style armor lines; the first match is the user JWT, the second the nkey seed.
var credsRe = regexp.MustCompile(`\s*(?:(?:[-]{3,}.*[-]{3,}\r?\n)([\w\-.=]+)(?:\r?\n[-]{3,}.*[-]{3,}(\r?\n|\z)))`)
800

801
// clusterName is provided as argument to avoid lock ordering issues with the locked client c
802
// Lock should be held entering here.
803
func (c *client) sendLeafConnect(clusterName string, headers bool) error {
631✔
804
        // We support basic user/pass and operator based user JWT with signatures.
631✔
805
        cinfo := leafConnectInfo{
631✔
806
                Version:       VERSION,
631✔
807
                ID:            c.srv.info.ID,
631✔
808
                Domain:        c.srv.info.Domain,
631✔
809
                Name:          c.srv.info.Name,
631✔
810
                Hub:           c.leaf.remote.Hub,
631✔
811
                Cluster:       clusterName,
631✔
812
                Headers:       headers,
631✔
813
                JetStream:     c.acc.jetStreamConfigured(),
631✔
814
                DenyPub:       c.leaf.remote.DenyImports,
631✔
815
                Compression:   c.leaf.compression,
631✔
816
                RemoteAccount: c.acc.GetName(),
631✔
817
                Proto:         c.srv.getServerProto(),
631✔
818
        }
631✔
819

631✔
820
        // If a signature callback is specified, this takes precedence over anything else.
631✔
821
        if cb := c.leaf.remote.SignatureCB; cb != nil {
634✔
822
                nonce := c.nonce
3✔
823
                c.mu.Unlock()
3✔
824
                jwt, sigraw, err := cb(nonce)
3✔
825
                c.mu.Lock()
3✔
826
                if err == nil && c.isClosed() {
4✔
827
                        err = ErrConnectionClosed
1✔
828
                }
1✔
829
                if err != nil {
5✔
830
                        c.Errorf("Error signing the nonce: %v", err)
2✔
831
                        return err
2✔
832
                }
2✔
833
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
1✔
834
                cinfo.JWT, cinfo.Sig = jwt, sig
1✔
835

836
        } else if creds := c.leaf.remote.Credentials; creds != _EMPTY_ {
677✔
837
                // Check for credentials first, that will take precedence..
49✔
838
                c.Debugf("Authenticating with credentials file %q", c.leaf.remote.Credentials)
49✔
839
                contents, err := os.ReadFile(creds)
49✔
840
                if err != nil {
49✔
841
                        c.Errorf("%v", err)
×
842
                        return err
×
843
                }
×
844
                defer wipeSlice(contents)
49✔
845
                items := credsRe.FindAllSubmatch(contents, -1)
49✔
846
                if len(items) < 2 {
49✔
847
                        c.Errorf("Credentials file malformed")
×
848
                        return err
×
849
                }
×
850
                // First result should be the user JWT.
851
                // We copy here so that the file containing the seed will be wiped appropriately.
852
                raw := items[0][1]
49✔
853
                tmp := make([]byte, len(raw))
49✔
854
                copy(tmp, raw)
49✔
855
                // Seed is second item.
49✔
856
                kp, err := nkeys.FromSeed(items[1][1])
49✔
857
                if err != nil {
49✔
858
                        c.Errorf("Credentials file has malformed seed")
×
859
                        return err
×
860
                }
×
861
                // Wipe our key on exit.
862
                defer kp.Wipe()
49✔
863

49✔
864
                sigraw, _ := kp.Sign(c.nonce)
49✔
865
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
49✔
866
                cinfo.JWT = bytesToString(tmp)
49✔
867
                cinfo.Sig = sig
49✔
868
        } else if nkey := c.leaf.remote.Nkey; nkey != _EMPTY_ {
581✔
869
                kp, err := nkeys.FromSeed([]byte(nkey))
2✔
870
                if err != nil {
2✔
871
                        c.Errorf("Remote nkey has malformed seed")
×
872
                        return err
×
873
                }
×
874
                // Wipe our key on exit.
875
                defer kp.Wipe()
2✔
876
                sigraw, _ := kp.Sign(c.nonce)
2✔
877
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
2✔
878
                pkey, _ := kp.PublicKey()
2✔
879
                cinfo.Nkey = pkey
2✔
880
                cinfo.Sig = sig
2✔
881
        }
882
        // In addition, and this is to allow auth callout, set user/password or
883
        // token if applicable.
884
        if userInfo := c.leaf.remote.curURL.User; userInfo != nil {
908✔
885
                // For backward compatibility, if only username is provided, set both
279✔
886
                // Token and User, not just Token.
279✔
887
                cinfo.User = userInfo.Username()
279✔
888
                var ok bool
279✔
889
                cinfo.Pass, ok = userInfo.Password()
279✔
890
                if !ok {
285✔
891
                        cinfo.Token = cinfo.User
6✔
892
                }
6✔
893
        } else if c.leaf.remote.username != _EMPTY_ {
352✔
894
                cinfo.User = c.leaf.remote.username
2✔
895
                cinfo.Pass = c.leaf.remote.password
2✔
896
        }
2✔
897
        b, err := json.Marshal(cinfo)
629✔
898
        if err != nil {
629✔
899
                c.Errorf("Error marshaling CONNECT to remote leafnode: %v\n", err)
×
900
                return err
×
901
        }
×
902
        // Although this call is made before the writeLoop is created,
903
        // we don't really need to send in place. The protocol will be
904
        // sent out by the writeLoop.
905
        c.enqueueProto([]byte(fmt.Sprintf(ConProto, b)))
629✔
906
        return nil
629✔
907
}
908

909
// Makes a deep copy of the LeafNode Info structure.
910
// The server lock is held on entry.
911
func (s *Server) copyLeafNodeInfo() *Info {
2,515✔
912
        clone := s.leafNodeInfo
2,515✔
913
        // Copy the array of urls.
2,515✔
914
        if len(s.leafNodeInfo.LeafNodeURLs) > 0 {
4,566✔
915
                clone.LeafNodeURLs = append([]string(nil), s.leafNodeInfo.LeafNodeURLs...)
2,051✔
916
        }
2,051✔
917
        return &clone
2,515✔
918
}
919

920
// Adds a LeafNode URL that we get when a route connects to the Info structure.
921
// Regenerates the JSON byte array so that it can be sent to LeafNode connections.
922
// Returns a boolean indicating if the URL was added or not.
923
// Server lock is held on entry
924
func (s *Server) addLeafNodeURL(urlStr string) bool {
5,426✔
925
        if s.leafURLsMap.addUrl(urlStr) {
10,847✔
926
                s.generateLeafNodeInfoJSON()
5,421✔
927
                return true
5,421✔
928
        }
5,421✔
929
        return false
5✔
930
}
931

932
// Removes a LeafNode URL of the route that is disconnecting from the Info structure.
933
// Regenerates the JSON byte array so that it can be sent to LeafNode connections.
934
// Returns a boolean indicating if the URL was removed or not.
935
// Server lock is held on entry.
936
func (s *Server) removeLeafNodeURL(urlStr string) bool {
5,426✔
937
        // Don't need to do this if we are removing the route connection because
5,426✔
938
        // we are shuting down...
5,426✔
939
        if s.isShuttingDown() {
8,310✔
940
                return false
2,884✔
941
        }
2,884✔
942
        if s.leafURLsMap.removeUrl(urlStr) {
5,080✔
943
                s.generateLeafNodeInfoJSON()
2,538✔
944
                return true
2,538✔
945
        }
2,538✔
946
        return false
4✔
947
}
948

949
// Server lock is held on entry
//
// Refreshes the cluster name, leafnode URLs and websocket connect URLs in the
// cached leafnode INFO, then regenerates the serialized INFO protocol bytes
// that are sent to leafnode connections.
func (s *Server) generateLeafNodeInfoJSON() {
	s.leafNodeInfo.Cluster = s.cachedClusterName()
	s.leafNodeInfo.LeafNodeURLs = s.leafURLsMap.getAsStringSlice()
	s.leafNodeInfo.WSConnectURLs = s.websocket.connectURLsMap.getAsStringSlice()
	s.leafNodeInfoJSON = generateInfoJSON(&s.leafNodeInfo)
}
956

957
// Sends an async INFO protocol so that the connected servers can update
// their list of LeafNode urls.
//
// NOTE(review): s.leafs and s.leafNodeInfoJSON are read without locking here,
// so the caller presumably holds the server lock — confirm at call sites.
func (s *Server) sendAsyncLeafNodeInfo() {
	for _, c := range s.leafs {
		// Lock each leaf client while enqueuing on its outbound queue.
		c.mu.Lock()
		c.enqueueProto(s.leafNodeInfoJSON)
		c.mu.Unlock()
	}
}
966

967
// Called when an inbound leafnode connection is accepted or we create one for a solicited leafnode.
//
// conn is the established TCP (or websocket) connection. For solicited
// connections, rURL is the URL we dialed and remote the matching remote
// config (both nil for accepted connections). ws is non-nil only for
// connections accepted on the websocket port.
//
// For accepted connections this sends our INFO (handling the optional
// "TLS handshake first" mode and its fallback delay), performs the
// server-side TLS handshake if required, and arms the auth timer. For
// solicited connections it performs the client-side handshake/websocket
// upgrade and defers CONNECT/registration to the readLoop's INFO handling.
// Returns the created client, or nil if setup failed and the connection
// was closed.
func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCfg, ws *websocket) *client {
	// Snapshot server options.
	opts := s.getOpts()

	maxPay := int32(opts.MaxPayload)
	maxSubs := int32(opts.MaxSubs)
	// For system, maxSubs of 0 means unlimited, so re-adjust here.
	if maxSubs == 0 {
		maxSubs = -1
	}
	now := time.Now().UTC()

	c := &client{srv: s, nc: conn, kind: LEAF, opts: defaultOpts, mpay: maxPay, msubs: maxSubs, start: now, last: now}
	// Do not update the smap here, we need to do it in initLeafNodeSmapAndSendSubs
	c.leaf = &leaf{}

	// For accepted LN connections, ws will be != nil if it was accepted
	// through the Websocket port.
	c.ws = ws

	// For remote, check if the scheme starts with "ws", if so, we will initiate
	// a remote Leaf Node connection as a websocket connection.
	if remote != nil && rURL != nil && isWSURL(rURL) {
		remote.RLock()
		c.ws = &websocket{compress: remote.Websocket.Compression, maskwrite: !remote.Websocket.NoMasking}
		remote.RUnlock()
	}

	// Determines if we are soliciting the connection or not.
	var solicited bool
	var acc *Account
	var remoteSuffix string
	if remote != nil {
		// For now, if lookup fails, we will constantly try
		// to recreate this LN connection.
		lacc := remote.LocalAccount
		var err error
		acc, err = s.LookupAccount(lacc)
		if err != nil {
			// An account not existing is something that can happen with nats/http account resolver and the account
			// has not yet been pushed, or the request failed for other reasons.
			// remote needs to be set or retry won't happen
			c.leaf.remote = remote
			c.closeConnection(MissingAccount)
			s.Errorf("Unable to lookup account %s for solicited leafnode connection: %v", lacc, err)
			return nil
		}
		remoteSuffix = fmt.Sprintf(" for account: %s", acc.traceLabel())
	}

	c.mu.Lock()
	c.initClient()
	c.Noticef("Leafnode connection created%s %s", remoteSuffix, c.opts.Name)

	var (
		tlsFirst         bool          // perform TLS handshake before exchanging INFO
		tlsFirstFallback time.Duration // accepted side: grace period to sniff for a TLS-first client
		infoTimeout      time.Duration // solicited side: max wait for the remote's INFO
	)
	if remote != nil {
		solicited = true
		remote.Lock()
		c.leaf.remote = remote
		c.setPermissions(remote.perms)
		if !c.leaf.remote.Hub {
			c.leaf.isSpoke = true
		}
		tlsFirst = remote.TLSHandshakeFirst
		infoTimeout = remote.FirstInfoTimeout
		remote.Unlock()
		c.acc = acc
	} else {
		c.flags.set(expectConnect)
		if ws != nil {
			c.Debugf("Leafnode compression=%v", c.ws.compress)
		}
		tlsFirst = opts.LeafNode.TLSHandshakeFirst
		if f := opts.LeafNode.TLSHandshakeFirstFallback; f > 0 {
			tlsFirstFallback = f
		}
	}
	c.mu.Unlock()

	var nonce [nonceLen]byte
	var info *Info

	// Grab this before the client lock below.
	if !solicited {
		// Grab server variables
		s.mu.Lock()
		info = s.copyLeafNodeInfo()
		// For tests that want to simulate old servers, do not set the compression
		// on the INFO protocol if configured with CompressionNotSupported.
		if cm := opts.LeafNode.Compression.Mode; cm != CompressionNotSupported {
			info.Compression = cm
		}
		s.generateNonce(nonce[:])
		s.mu.Unlock()
	}

	// Grab lock
	c.mu.Lock()

	var preBuf []byte
	if solicited {
		// For websocket connection, we need to send an HTTP request,
		// and get the response before starting the readLoop to get
		// the INFO, etc..
		if c.isWebsocket() {
			var err error
			var closeReason ClosedState

			preBuf, closeReason, err = c.leafNodeSolicitWSConnection(opts, rURL, remote)
			if err != nil {
				c.Errorf("Error soliciting websocket connection: %v", err)
				c.mu.Unlock()
				if closeReason != 0 {
					c.closeConnection(closeReason)
				}
				return nil
			}
		} else {
			// If configured to do TLS handshake first
			if tlsFirst {
				if _, err := c.leafClientHandshakeIfNeeded(remote, opts); err != nil {
					c.mu.Unlock()
					return nil
				}
			}
			// We need to wait for the info, but not for too long.
			c.nc.SetReadDeadline(time.Now().Add(infoTimeout))
		}

		// We will process the INFO from the readloop and finish by
		// sending the CONNECT and finish registration later.
	} else {
		// Send our info to the other side.
		// Remember the nonce we sent here for signatures, etc.
		c.nonce = make([]byte, nonceLen)
		copy(c.nonce, nonce[:])
		info.Nonce = bytesToString(c.nonce)
		info.CID = c.cid
		proto := generateInfoJSON(info)

		var pre []byte
		// We need first to check for "TLS First" fallback delay.
		if tlsFirstFallback > 0 {
			// We wait and see if we are getting any data. Since we did not send
			// the INFO protocol yet, only clients that use TLS first should be
			// sending data (the TLS handshake). We don't really check the content:
			// if it is a rogue agent and not an actual client performing the
			// TLS handshake, the error will be detected when performing the
			// handshake on our side.
			pre = make([]byte, 4)
			c.nc.SetReadDeadline(time.Now().Add(tlsFirstFallback))
			n, _ := io.ReadFull(c.nc, pre[:])
			c.nc.SetReadDeadline(time.Time{})
			// If we get any data (regardless of possible timeout), we will proceed
			// with the TLS handshake.
			if n > 0 {
				pre = pre[:n]
			} else {
				// We did not get anything so we will send the INFO protocol.
				pre = nil
				// Set the boolean to false for the rest of the function.
				tlsFirst = false
			}
		}

		if !tlsFirst {
			// We have to send from this go routine because we may
			// have to block for TLS handshake before we start our
			// writeLoop go routine. The other side needs to receive
			// this before it can initiate the TLS handshake..
			c.sendProtoNow(proto)

			// The above call could have marked the connection as closed (due to TCP error).
			if c.isClosed() {
				c.mu.Unlock()
				c.closeConnection(WriteError)
				return nil
			}
		}

		// Check to see if we need to spin up TLS.
		if !c.isWebsocket() && info.TLSRequired {
			// If we have a prebuffer create a multi-reader.
			if len(pre) > 0 {
				c.nc = &tlsMixConn{c.nc, bytes.NewBuffer(pre)}
			}
			// Perform server-side TLS handshake.
			if err := c.doTLSServerHandshake(tlsHandshakeLeaf, opts.LeafNode.TLSConfig, opts.LeafNode.TLSTimeout, opts.LeafNode.TLSPinnedCerts); err != nil {
				c.mu.Unlock()
				return nil
			}
		}

		// If the user wants the TLS handshake to occur first, now that it is
		// done, send the INFO protocol.
		if tlsFirst {
			c.flags.set(didTLSFirst)
			c.sendProtoNow(proto)
			if c.isClosed() {
				c.mu.Unlock()
				c.closeConnection(WriteError)
				return nil
			}
		}

		// Leaf nodes will always require a CONNECT to let us know
		// when we are properly bound to an account.
		//
		// If compression is configured, we can't set the authTimer here because
		// it would cause the parser to fail any incoming protocol that is not a
		// CONNECT (and we need to exchange INFO protocols for compression
		// negotiation). So instead, use the ping timer until we are done with
		// negotiation and can set the auth timer.
		timeout := secondsToDuration(opts.LeafNode.AuthTimeout)
		if needsCompression(opts.LeafNode.Compression.Mode) {
			c.ping.tmr = time.AfterFunc(timeout, func() {
				c.authTimeout()
			})
		} else {
			c.setAuthTimer(timeout)
		}
	}

	// Keep track in case server is shutdown before we can successfully register.
	if !s.addToTempClients(c.cid, c) {
		c.mu.Unlock()
		c.setNoReconnect()
		c.closeConnection(ServerShutdown)
		return nil
	}

	// Spin up the read loop.
	s.startGoRoutine(func() { c.readLoop(preBuf) })

	// We will spin the write loop for solicited connections only
	// when processing the INFO and after switching to TLS if needed.
	if !solicited {
		s.startGoRoutine(func() { c.writeLoop() })
	}

	c.mu.Unlock()

	return c
}
1216

1217
// Will perform the client-side TLS handshake if needed. Assumes that this
// is called by the solicit side (remote will be non nil). Returns `true`
// if TLS is required, `false` otherwise.
// Lock held on entry.
func (c *client) leafClientHandshakeIfNeeded(remote *leafNodeCfg, opts *Options) (bool, error) {
	// Check if TLS is required and gather TLS config variables.
	tlsRequired, tlsConfig, tlsName, tlsTimeout := c.leafNodeGetTLSConfigForSolicit(remote)
	if !tlsRequired {
		return false, nil
	}

	// If TLS required, perform handshake.
	// Get the URL that was used to connect to the remote server.
	rURL := remote.getCurrentURL()

	// Perform the client-side TLS handshake.
	if resetTLSName, err := c.doTLSClientHandshake(tlsHandshakeLeaf, rURL, tlsConfig, tlsName, tlsTimeout, opts.LeafNode.TLSPinnedCerts); err != nil {
		// Check if we need to reset the remote's TLS name.
		// (The handshake signals this when the saved name should not be reused.)
		if resetTLSName {
			remote.Lock()
			remote.tlsName = _EMPTY_
			remote.Unlock()
		}
		return false, err
	}
	return true, nil
}
1244

1245
// processLeafnodeInfo handles an INFO protocol received on a leafnode
// connection: the initial INFO (TLS handshake on the solicit side,
// compression negotiation, wrong-port detection, capture of the remote
// server identity) as well as subsequent async INFO updates (URL list,
// permission updates, remote account name). It acquires and releases
// c.mu internally and may close the connection on protocol violations.
func (c *client) processLeafnodeInfo(info *Info) {
	c.mu.Lock()
	if c.leaf == nil || c.isClosed() {
		c.mu.Unlock()
		return
	}
	s := c.srv
	opts := s.getOpts()
	remote := c.leaf.remote
	// A non-nil remote means we solicited (dialed out) this connection.
	didSolicit := remote != nil
	firstINFO := !c.flags.isSet(infoReceived)

	// In case of websocket, the TLS handshake has been already done.
	// So check only for non websocket connections and for configurations
	// where the TLS Handshake was not done first.
	if didSolicit && !c.flags.isSet(handshakeComplete) && !c.isWebsocket() && !remote.TLSHandshakeFirst {
		// If the server requires TLS, we need to set this in the remote
		// otherwise if there is no TLS configuration block for the remote,
		// the solicit side will not attempt to perform the TLS handshake.
		if firstINFO && info.TLSRequired {
			remote.TLS = true
		}
		if _, err := c.leafClientHandshakeIfNeeded(remote, opts); err != nil {
			c.mu.Unlock()
			return
		}
	}

	// Check for compression, unless already done.
	if firstINFO && !c.flags.isSet(compressionNegotiated) {
		// Prevent from getting back here.
		c.flags.set(compressionNegotiated)

		var co *CompressionOpts
		if !didSolicit {
			co = &opts.LeafNode.Compression
		} else {
			co = &remote.Compression
		}
		if needsCompression(co.Mode) {
			// Release client lock since following function will need server lock.
			c.mu.Unlock()
			compress, err := s.negotiateLeafCompression(c, didSolicit, info.Compression, co)
			if err != nil {
				c.sendErrAndErr(err.Error())
				c.closeConnection(ProtocolViolation)
				return
			}
			if compress {
				// Done for now, will get back another INFO protocol...
				return
			}
			// No compression because one side does not want/can't, so proceed.
			c.mu.Lock()
			// Check that the connection did not close if the lock was released.
			if c.isClosed() {
				c.mu.Unlock()
				return
			}
		} else {
			// Coming from an old server, the Compression field would be the empty
			// string. For servers that are configured with CompressionNotSupported,
			// this makes them behave as old servers.
			if info.Compression == _EMPTY_ || co.Mode == CompressionNotSupported {
				c.leaf.compression = CompressionNotSupported
			} else {
				c.leaf.compression = CompressionOff
			}
		}
		// Accepting side does not normally process an INFO protocol during
		// initial connection handshake. So we keep it consistent by returning
		// if we are not soliciting.
		if !didSolicit {
			// If we had created the ping timer instead of the auth timer, we will
			// clear the ping timer and set the auth timer now that the compression
			// negotiation is done.
			if info.Compression != _EMPTY_ && c.ping.tmr != nil {
				clearTimer(&c.ping.tmr)
				c.setAuthTimer(secondsToDuration(opts.LeafNode.AuthTimeout))
			}
			c.mu.Unlock()
			return
		}
		// Fall through and process the INFO protocol as usual.
	}

	// Note: For now, only the initial INFO has a nonce. We
	// will probably do auto key rotation at some point.
	if firstINFO {
		// Mark that the INFO protocol has been received.
		c.flags.set(infoReceived)
		// Prevent connecting to non leafnode port. Need to do this only for
		// the first INFO, not for async INFO updates...
		//
		// Content of INFO sent by the server when accepting a tcp connection.
		// -------------------------------------------------------------------
		// Listen Port Of | CID | ClientConnectURLs | LeafNodeURLs | Gateway |
		// -------------------------------------------------------------------
		//      CLIENT    |  X* |        X**        |              |         |
		//      ROUTE     |     |        X**        |      X***    |         |
		//     GATEWAY    |     |                   |              |    X    |
		//     LEAFNODE   |  X  |                   |       X      |         |
		// -------------------------------------------------------------------
		// *   Not on older servers.
		// **  Not if "no advertise" is enabled.
		// *** Not if leafnode's "no advertise" is enabled.
		//
		// As seen from above, a solicited LeafNode connection should receive
		// from the remote server an INFO with CID and LeafNodeURLs. Anything
		// else should be considered an attempt to connect to a wrong port.
		if didSolicit && (info.CID == 0 || info.LeafNodeURLs == nil) {
			c.mu.Unlock()
			c.Errorf(ErrConnectedToWrongPort.Error())
			c.closeConnection(WrongPort)
			return
		}
		// Reject a cluster that contains spaces.
		if info.Cluster != _EMPTY_ && strings.Contains(info.Cluster, " ") {
			c.mu.Unlock()
			c.sendErrAndErr(ErrClusterNameHasSpaces.Error())
			c.closeConnection(ProtocolViolation)
			return
		}
		// Capture a nonce here.
		c.nonce = []byte(info.Nonce)
		if info.TLSRequired && didSolicit {
			remote.TLS = true
		}
		supportsHeaders := c.srv.supportsHeaders()
		c.headers = supportsHeaders && info.Headers

		// Remember the remote server.
		// Pre 2.2.0 servers are not sending their server name.
		// In that case, use info.ID, which, for those servers, matches
		// the content of the field `Name` in the leafnode CONNECT protocol.
		if info.Name == _EMPTY_ {
			c.leaf.remoteServer = info.ID
		} else {
			c.leaf.remoteServer = info.Name
		}
		c.leaf.remoteDomain = info.Domain
		c.leaf.remoteCluster = info.Cluster
		// We send the protocol version in the INFO protocol.
		// Keep track of it, so we know if this connection supports message
		// tracing for instance.
		c.opts.Protocol = info.Proto
	}

	// For both initial INFO and async INFO protocols, Possibly
	// update our list of remote leafnode URLs we can connect to.
	if didSolicit && (len(info.LeafNodeURLs) > 0 || len(info.WSConnectURLs) > 0) {
		// Consider the incoming array as the most up-to-date
		// representation of the remote cluster's list of URLs.
		c.updateLeafNodeURLs(info)
	}

	// Check to see if we have permissions updates here.
	if info.Import != nil || info.Export != nil {
		perms := &Permissions{
			Publish:   info.Export,
			Subscribe: info.Import,
		}
		// Check if we have local deny clauses that we need to merge.
		if remote := c.leaf.remote; remote != nil {
			if len(remote.DenyExports) > 0 {
				if perms.Publish == nil {
					perms.Publish = &SubjectPermission{}
				}
				perms.Publish.Deny = append(perms.Publish.Deny, remote.DenyExports...)
			}
			if len(remote.DenyImports) > 0 {
				if perms.Subscribe == nil {
					perms.Subscribe = &SubjectPermission{}
				}
				perms.Subscribe.Deny = append(perms.Subscribe.Deny, remote.DenyImports...)
			}
		}
		c.setPermissions(perms)
	}

	var resumeConnect bool

	// If this is a remote connection and this is the first INFO protocol,
	// then we need to finish the connect process by sending CONNECT, etc..
	if firstINFO && didSolicit {
		// Clear deadline that was set in createLeafNode while waiting for the INFO.
		c.nc.SetDeadline(time.Time{})
		resumeConnect = true
	} else if !firstINFO && didSolicit {
		c.leaf.remoteAccName = info.RemoteAccount
	}

	// Check if we have the remote account information and if so make sure it's stored.
	if info.RemoteAccount != _EMPTY_ {
		s.leafRemoteAccounts.Store(c.acc.Name, info.RemoteAccount)
	}
	c.mu.Unlock()

	// The remaining steps must run without holding c.mu since they may
	// take the server lock and send protocols.
	finishConnect := info.ConnectInfo
	if resumeConnect && s != nil {
		s.leafNodeResumeConnectProcess(c)
		if !info.InfoOnConnect {
			finishConnect = true
		}
	}
	if finishConnect {
		s.leafNodeFinishConnectProcess(c)
	}
}
1454

1455
// negotiateLeafCompression selects the compression mode to use with the
// peer based on our configured mode and the one advertised in the peer's
// INFO. It returns true when compression was enabled (and the connection
// switched to compressed reader/writer), false when no compression will
// be used, or an error when the modes are incompatible.
// Must be called without c.mu held; it takes c.mu and s.mu internally.
func (s *Server) negotiateLeafCompression(c *client, didSolicit bool, infoCompression string, co *CompressionOpts) (bool, error) {
	// Negotiate the appropriate compression mode (or no compression)
	cm, err := selectCompressionMode(co.Mode, infoCompression)
	if err != nil {
		return false, err
	}
	c.mu.Lock()
	// For "auto" mode, set the initial compression mode based on RTT
	if cm == CompressionS2Auto {
		if c.rttStart.IsZero() {
			c.rtt = computeRTT(c.start)
		}
		cm = selectS2AutoModeBasedOnRTT(c.rtt, co.RTTThresholds)
	}
	// Keep track of the negotiated compression mode.
	c.leaf.compression = cm
	cid := c.cid
	var nonce string
	if !didSolicit {
		nonce = bytesToString(c.nonce)
	}
	c.mu.Unlock()

	if !needsCompression(cm) {
		return false, nil
	}

	// If we end-up doing compression...

	// Generate an INFO with the chosen compression mode.
	s.mu.Lock()
	info := s.copyLeafNodeInfo()
	info.Compression, info.CID, info.Nonce = compressionModeForInfoProtocol(co, cm), cid, nonce
	infoProto := generateInfoJSON(info)
	s.mu.Unlock()

	// If we solicited, then send this INFO protocol BEFORE switching
	// to compression writer. However, if we did not, we send it after.
	c.mu.Lock()
	if didSolicit {
		c.enqueueProto(infoProto)
		// Make sure it is completely flushed (the pending bytes goes to
		// 0) before proceeding.
		for c.out.pb > 0 && !c.isClosed() {
			c.flushOutbound()
		}
	}
	// This is to notify the readLoop that it should switch to a
	// (de)compression reader.
	c.in.flags.set(switchToCompression)
	// Create the compress writer before queueing the INFO protocol for
	// a route that did not solicit. It will make sure that that proto
	// is sent with compression on.
	c.out.cw = s2.NewWriter(nil, s2WriterOptions(cm)...)
	if !didSolicit {
		c.enqueueProto(infoProto)
	}
	c.mu.Unlock()
	return true, nil
}
1515

1516
// When getting a leaf node INFO protocol, use the provided
1517
// array of urls to update the list of possible endpoints.
1518
func (c *client) updateLeafNodeURLs(info *Info) {
1,247✔
1519
        cfg := c.leaf.remote
1,247✔
1520
        cfg.Lock()
1,247✔
1521
        defer cfg.Unlock()
1,247✔
1522

1,247✔
1523
        // We have ensured that if a remote has a WS scheme, then all are.
1,247✔
1524
        // So check if first is WS, then add WS URLs, otherwise, add non WS ones.
1,247✔
1525
        if len(cfg.URLs) > 0 && isWSURL(cfg.URLs[0]) {
1,301✔
1526
                // It does not really matter if we use "ws://" or "wss://" here since
54✔
1527
                // we will have already marked that the remote should use TLS anyway.
54✔
1528
                // But use proper scheme for log statements, etc...
54✔
1529
                proto := wsSchemePrefix
54✔
1530
                if cfg.TLS {
54✔
1531
                        proto = wsSchemePrefixTLS
×
1532
                }
×
1533
                c.doUpdateLNURLs(cfg, proto, info.WSConnectURLs)
54✔
1534
                return
54✔
1535
        }
1536
        c.doUpdateLNURLs(cfg, "nats-leaf", info.LeafNodeURLs)
1,193✔
1537
}
1538

1539
func (c *client) doUpdateLNURLs(cfg *leafNodeCfg, scheme string, URLs []string) {
1,247✔
1540
        cfg.urls = make([]*url.URL, 0, 1+len(URLs))
1,247✔
1541
        // Add the ones we receive in the protocol
1,247✔
1542
        for _, surl := range URLs {
3,528✔
1543
                url, err := url.Parse(fmt.Sprintf("%s://%s", scheme, surl))
2,281✔
1544
                if err != nil {
2,281✔
1545
                        // As per below, the URLs we receive should not have contained URL info, so this should be safe to log.
×
1546
                        c.Errorf("Error parsing url %q: %v", surl, err)
×
1547
                        continue
×
1548
                }
1549
                // Do not add if it's the same as what we already have configured.
1550
                var dup bool
2,281✔
1551
                for _, u := range cfg.URLs {
5,814✔
1552
                        // URLs that we receive never have user info, but the
3,533✔
1553
                        // ones that were configured may have. Simply compare
3,533✔
1554
                        // host and port to decide if they are equal or not.
3,533✔
1555
                        if url.Host == u.Host && url.Port() == u.Port() {
5,222✔
1556
                                dup = true
1,689✔
1557
                                break
1,689✔
1558
                        }
1559
                }
1560
                if !dup {
2,873✔
1561
                        cfg.urls = append(cfg.urls, url)
592✔
1562
                        cfg.saveTLSHostname(url)
592✔
1563
                }
592✔
1564
        }
1565
        // Add the configured one
1566
        cfg.urls = append(cfg.urls, cfg.URLs...)
1,247✔
1567
}
1568

1569
// Similar to setInfoHostPortAndGenerateJSON, but for leafNodeInfo.
// Resolves the advertised host/port for the leafnode listener (using the
// configured Advertise address when set, otherwise the listen address,
// resolving "0.0.0.0"/"::" to a public IP) and stores the resulting
// host:port in s.leafNodeInfo.IP.
func (s *Server) setLeafNodeInfoHostPortAndIP() error {
	opts := s.getOpts()
	if opts.LeafNode.Advertise != _EMPTY_ {
		// Explicit advertise address takes precedence over listen address.
		advHost, advPort, err := parseHostPort(opts.LeafNode.Advertise, opts.LeafNode.Port)
		if err != nil {
			return err
		}
		s.leafNodeInfo.Host = advHost
		s.leafNodeInfo.Port = advPort
	} else {
		s.leafNodeInfo.Host = opts.LeafNode.Host
		s.leafNodeInfo.Port = opts.LeafNode.Port
		// If the host is "0.0.0.0" or "::" we need to resolve to a public IP.
		// This will return at most 1 IP.
		hostIsIPAny, ips, err := s.getNonLocalIPsIfHostIsIPAny(s.leafNodeInfo.Host, false)
		if err != nil {
			return err
		}
		if hostIsIPAny {
			if len(ips) == 0 {
				// Keep the wildcard host and log the problem rather than failing.
				s.Errorf("Could not find any non-local IP for leafnode's listen specification %q",
					s.leafNodeInfo.Host)
			} else {
				// Take the first from the list...
				s.leafNodeInfo.Host = ips[0]
			}
		}
	}
	// Use just host:port for the IP
	s.leafNodeInfo.IP = net.JoinHostPort(s.leafNodeInfo.Host, strconv.Itoa(s.leafNodeInfo.Port))
	if opts.LeafNode.Advertise != _EMPTY_ {
		s.Noticef("Advertise address for leafnode is set to %s", s.leafNodeInfo.IP)
	}
	return nil
}
1605

1606
// Add the connection to the map of leaf nodes.
// If `checkForDup` is true (invoked when a leafnode is accepted), then we check
// if a connection already exists for the same server name and account.
// That can happen when the remote is attempting to reconnect while the accepting
// side did not detect the connection as broken yet.
// But it can also happen when there is a misconfiguration and the remote is
// creating two (or more) connections that bind to the same account on the accept
// side.
// When a duplicate is found, the new connection is accepted and the old is closed
// (this solves the stale connection situation). An error is returned to help the
// remote detect the misconfiguration when the duplicate is the result of that
// misconfiguration.
// It also decides, based on JetStream domain comparison and system-account
// sharing, whether this connection extends the JetStream domain or whether
// JS API traffic must be denied and domain mappings installed.
func (s *Server) addLeafNodeConnection(c *client, srvName, clusterName string, checkForDup bool) {
	var accName string
	c.mu.Lock()
	cid := c.cid
	acc := c.acc
	if acc != nil {
		accName = acc.Name
	}
	myRemoteDomain := c.leaf.remoteDomain
	mySrvName := c.leaf.remoteServer
	remoteAccName := c.leaf.remoteAccName
	myClustName := c.leaf.remoteCluster
	solicited := c.leaf.remote != nil
	c.mu.Unlock()

	var old *client
	s.mu.Lock()
	// We check for empty because in some test we may send empty CONNECT{}
	if checkForDup && srvName != _EMPTY_ {
		for _, ol := range s.leafs {
			ol.mu.Lock()
			// We care here only about non solicited Leafnode. This function
			// is more about replacing stale connections than detecting loops.
			// We have code for the loop detection elsewhere, which also delays
			// attempt to reconnect.
			if !ol.isSolicitedLeafNode() && ol.leaf.remoteServer == srvName &&
				ol.leaf.remoteCluster == clusterName && ol.acc.Name == accName &&
				remoteAccName != _EMPTY_ && ol.leaf.remoteAccName == remoteAccName {
				old = ol
			}
			ol.mu.Unlock()
			if old != nil {
				break
			}
		}
	}
	// Store new connection in the map
	s.leafs[cid] = c
	s.mu.Unlock()
	s.removeFromTempClients(cid)

	// If applicable, evict the old one.
	if old != nil {
		old.sendErrAndErr(DuplicateRemoteLeafnodeConnection.String())
		old.closeConnection(DuplicateRemoteLeafnodeConnection)
		c.Warnf("Replacing connection from same server")
	}

	// Helper for log statements: "server" or "server/cluster".
	srvDecorated := func() string {
		if myClustName == _EMPTY_ {
			return mySrvName
		}
		return fmt.Sprintf("%s/%s", mySrvName, myClustName)
	}

	opts := s.getOpts()
	sysAcc := s.SystemAccount()
	js := s.getJetStream()
	var meta *raft
	if js != nil {
		if mg := js.getMetaGroup(); mg != nil {
			meta = mg.(*raft)
		}
	}
	blockMappingOutgoing := false
	// Deny (non domain) JetStream API traffic unless system account is shared
	// and domain names are identical and extending is not disabled

	// Check if backwards compatibility has been enabled and needs to be acted on
	forceSysAccDeny := false
	if len(opts.JsAccDefaultDomain) > 0 {
		if acc == sysAcc {
			for _, d := range opts.JsAccDefaultDomain {
				if d == _EMPTY_ {
					// Extending JetStream via leaf node is mutually exclusive with a domain mapping to the empty/default domain.
					// As soon as one mapping to "" is found, disable the ability to extend JS via a leaf node.
					c.Noticef("Not extending remote JetStream domain %q due to presence of empty default domain", myRemoteDomain)
					forceSysAccDeny = true
					break
				}
			}
		} else if domain, ok := opts.JsAccDefaultDomain[accName]; ok && domain == _EMPTY_ {
			// for backwards compatibility with old setups that do not have a domain name set
			c.Debugf("Skipping deny %q for account %q due to default domain", jsAllAPI, accName)
			return
		}
	}

	// If the server has JS disabled, it may still be part of a JetStream that could be extended.
	// This is either signaled by js being disabled and a domain set,
	// or in cases where no domain name exists, an extension hint is set.
	// However, this is only relevant in mixed setups.
	//
	// If the system account connects but default domains are present, JetStream can't be extended.
	if opts.JetStreamDomain != myRemoteDomain || (!opts.JetStream && (opts.JetStreamDomain == _EMPTY_ && opts.JetStreamExtHint != jsWillExtend)) ||
		sysAcc == nil || acc == nil || forceSysAccDeny {
		// If domain names mismatch always deny. This applies to system accounts as well as non system accounts.
		// Not having a system account, account or JetStream disabled is considered a mismatch as well.
		if acc != nil && acc == sysAcc {
			c.Noticef("System account connected from %s", srvDecorated())
			c.Noticef("JetStream not extended, domains differ")
			c.mergeDenyPermissionsLocked(both, denyAllJs)
			// When a remote with a system account is present in a server, unless otherwise disabled, the server will be
			// started in observer mode. Now that it is clear that this is not used, turn the observer mode off.
			if solicited && meta != nil && meta.IsObserver() {
				meta.setObserver(false, extNotExtended)
				c.Debugf("Turning JetStream metadata controller Observer Mode off")
				// Take note that the domain was not extended to avoid this state from startup.
				writePeerState(js.config.StoreDir, meta.currentPeerState())
				// Meta controller can't be leader yet.
				// Yet it is possible that due to observer mode every server already stopped campaigning.
				// Therefore this server needs to be kicked into campaigning gear explicitly.
				meta.Campaign()
			}
		} else {
			c.Noticef("JetStream using domains: local %q, remote %q", opts.JetStreamDomain, myRemoteDomain)
			c.mergeDenyPermissionsLocked(both, denyAllClientJs)
		}
		blockMappingOutgoing = true
	} else if acc == sysAcc {
		// system account and same domain
		s.sys.client.Noticef("Extending JetStream domain %q as System Account connected from server %s",
			myRemoteDomain, srvDecorated())
		// In an extension use case, pin leadership to server remotes connect to.
		// Therefore, server with a remote that are not already in observer mode, need to be put into it.
		if solicited && meta != nil && !meta.IsObserver() {
			meta.setObserver(true, extExtended)
			c.Debugf("Turning JetStream metadata controller Observer Mode on - System Account Connected")
			// Take note that the domain was not extended to avoid this state next startup.
			writePeerState(js.config.StoreDir, meta.currentPeerState())
			// If this server is the leader already, step down so a new leader can be elected (that is not an observer)
			meta.StepDown()
		}
	} else {
		// This deny is needed in all cases (system account shared or not)
		// If the system account is shared, jsAllAPI traffic will go through the system account.
		// So in order to prevent duplicate delivery (from system and actual account) suppress it on the account.
		// If the system account is NOT shared, jsAllAPI traffic has no business here.
		c.Debugf("Adding deny %+v for account %q", denyAllClientJs, accName)
		c.mergeDenyPermissionsLocked(both, denyAllClientJs)
	}
	// If we have a specified JetStream domain we will want to add a mapping to
	// allow access cross domain for each non-system account.
	if opts.JetStreamDomain != _EMPTY_ && opts.JetStream && acc != nil && acc != sysAcc {
		for src, dest := range generateJSMappingTable(opts.JetStreamDomain) {
			if err := acc.AddMapping(src, dest); err != nil {
				c.Debugf("Error adding JetStream domain mapping: %s", err.Error())
			} else {
				c.Debugf("Adding JetStream Domain Mapping %q -> %s to account %q", src, dest, accName)
			}
		}
		if blockMappingOutgoing {
			src := fmt.Sprintf(jsDomainAPI, opts.JetStreamDomain)
			// make sure that messages intended for this domain, do not leave the cluster via this leaf node connection
			// This is a guard against a miss-config with two identical domain names and will only cover some forms
			// of this issue, not all of them.
			// This guards against a hub and a spoke having the same domain name.
			// But not two spokes having the same one and the request coming from the hub.
			c.mergeDenyPermissionsLocked(pub, []string{src})
			c.Debugf("Adding deny %q for outgoing messages to account %q", src, accName)
		}
	}
}
1781

1782
// removeLeafNodeConnection unregisters a leafnode connection from the server.
// It stops the temporary-sub cleanup timer, removes the fake gateway-reply
// subscription (nil'ing it so the GC can release the connection), then
// deletes the connection from the server's leafnode map.
func (s *Server) removeLeafNodeConnection(c *client) {
	c.mu.Lock()
	cid := c.cid
	if c.leaf != nil {
		if c.leaf.tsubt != nil {
			c.leaf.tsubt.Stop()
			c.leaf.tsubt = nil
		}
		if c.leaf.gwSub != nil {
			s.gwLeafSubs.Remove(c.leaf.gwSub)
			// We need to set this to nil for GC to release the connection
			c.leaf.gwSub = nil
		}
	}
	c.mu.Unlock()
	// Note: the client lock is released before the server lock is taken,
	// so the two are never held at the same time.
	s.mu.Lock()
	delete(s.leafs, cid)
	s.mu.Unlock()
	s.removeFromTempClients(cid)
}
1802

1803
// Connect information for solicited leafnodes.
// This is the payload of the CONNECT protocol sent by the remote side of a
// leafnode connection; the accept side unmarshals it in processLeafNodeConnect.
type leafConnectInfo struct {
	Version   string   `json:"version,omitempty"`
	Nkey      string   `json:"nkey,omitempty"`
	JWT       string   `json:"jwt,omitempty"`
	Sig       string   `json:"sig,omitempty"`
	User      string   `json:"user,omitempty"`
	Pass      string   `json:"pass,omitempty"`
	Token     string   `json:"auth_token,omitempty"`
	ID        string   `json:"server_id,omitempty"`
	Domain    string   `json:"domain,omitempty"`
	Name      string   `json:"name,omitempty"`
	Hub       bool     `json:"is_hub,omitempty"`
	Cluster   string   `json:"cluster,omitempty"`
	Headers   bool     `json:"headers,omitempty"`
	JetStream bool     `json:"jetstream,omitempty"`
	DenyPub   []string `json:"deny_pub,omitempty"`

	// There was an existing field called:
	// >> Comp bool `json:"compression,omitempty"`
	// that has never been used. With support for compression, we now need
	// a field that is a string. So we use a different json tag:
	Compression string `json:"compress_mode,omitempty"`

	// Just used to detect wrong connection attempts.
	Gateway string `json:"gateway,omitempty"`

	// Tells the accept side which account the remote is binding to.
	RemoteAccount string `json:"remote_account,omitempty"`

	// The accept side of a LEAF connection, unlike ROUTER and GATEWAY, receives
	// only the CONNECT protocol, and no INFO. So we need to send the protocol
	// version as part of the CONNECT. It will indicate if a connection supports
	// some features, such as message tracing.
	// We use `protocol` as the JSON tag, so this is automatically unmarshal'ed
	// in the low level process CONNECT.
	Proto int `json:"protocol,omitempty"`
}
1841

1842
// processLeafNodeConnect will process the inbound connect args.
1843
// Once we are here we are bound to an account, so can send any interest that
1844
// we would have to the other side.
1845
func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) error {
641✔
1846
        // Way to detect clients that incorrectly connect to the route listen
641✔
1847
        // port. Client provided "lang" in the CONNECT protocol while LEAFNODEs don't.
641✔
1848
        if lang != _EMPTY_ {
641✔
1849
                c.sendErrAndErr(ErrClientConnectedToLeafNodePort.Error())
×
1850
                c.closeConnection(WrongPort)
×
1851
                return ErrClientConnectedToLeafNodePort
×
1852
        }
×
1853

1854
        // Unmarshal as a leaf node connect protocol
1855
        proto := &leafConnectInfo{}
641✔
1856
        if err := json.Unmarshal(arg, proto); err != nil {
641✔
1857
                return err
×
1858
        }
×
1859

1860
        // Reject a cluster that contains spaces.
1861
        if proto.Cluster != _EMPTY_ && strings.Contains(proto.Cluster, " ") {
642✔
1862
                c.sendErrAndErr(ErrClusterNameHasSpaces.Error())
1✔
1863
                c.closeConnection(ProtocolViolation)
1✔
1864
                return ErrClusterNameHasSpaces
1✔
1865
        }
1✔
1866

1867
        // Check for cluster name collisions.
1868
        if cn := s.cachedClusterName(); cn != _EMPTY_ && proto.Cluster != _EMPTY_ && proto.Cluster == cn {
644✔
1869
                c.sendErrAndErr(ErrLeafNodeHasSameClusterName.Error())
4✔
1870
                c.closeConnection(ClusterNamesIdentical)
4✔
1871
                return ErrLeafNodeHasSameClusterName
4✔
1872
        }
4✔
1873

1874
        // Reject if this has Gateway which means that it would be from a gateway
1875
        // connection that incorrectly connects to the leafnode port.
1876
        if proto.Gateway != _EMPTY_ {
636✔
1877
                errTxt := fmt.Sprintf("Rejecting connection from gateway %q on the leafnode port", proto.Gateway)
×
1878
                c.Errorf(errTxt)
×
1879
                c.sendErr(errTxt)
×
1880
                c.closeConnection(WrongGateway)
×
1881
                return ErrWrongGateway
×
1882
        }
×
1883

1884
        if mv := s.getOpts().LeafNode.MinVersion; mv != _EMPTY_ {
638✔
1885
                major, minor, update, _ := versionComponents(mv)
2✔
1886
                if !versionAtLeast(proto.Version, major, minor, update) {
3✔
1887
                        // We are going to send back an INFO because otherwise recent
1✔
1888
                        // versions of the remote server would simply break the connection
1✔
1889
                        // after 2 seconds if not receiving it. Instead, we want the
1✔
1890
                        // other side to just "stall" until we finish waiting for the holding
1✔
1891
                        // period and close the connection below.
1✔
1892
                        s.sendPermsAndAccountInfo(c)
1✔
1893
                        c.sendErrAndErr(fmt.Sprintf("connection rejected since minimum version required is %q", mv))
1✔
1894
                        select {
1✔
1895
                        case <-c.srv.quitCh:
1✔
1896
                        case <-time.After(leafNodeWaitBeforeClose):
×
1897
                        }
1898
                        c.closeConnection(MinimumVersionRequired)
1✔
1899
                        return ErrMinimumVersionRequired
1✔
1900
                }
1901
        }
1902

1903
        // Check if this server supports headers.
1904
        supportHeaders := c.srv.supportsHeaders()
635✔
1905

635✔
1906
        c.mu.Lock()
635✔
1907
        // Leaf Nodes do not do echo or verbose or pedantic.
635✔
1908
        c.opts.Verbose = false
635✔
1909
        c.opts.Echo = false
635✔
1910
        c.opts.Pedantic = false
635✔
1911
        // This inbound connection will be marked as supporting headers if this server
635✔
1912
        // support headers and the remote has sent in the CONNECT protocol that it does
635✔
1913
        // support headers too.
635✔
1914
        c.headers = supportHeaders && proto.Headers
635✔
1915
        // If the compression level is still not set, set it based on what has been
635✔
1916
        // given to us in the CONNECT protocol.
635✔
1917
        if c.leaf.compression == _EMPTY_ {
756✔
1918
                // But if proto.Compression is _EMPTY_, set it to CompressionNotSupported
121✔
1919
                if proto.Compression == _EMPTY_ {
147✔
1920
                        c.leaf.compression = CompressionNotSupported
26✔
1921
                } else {
121✔
1922
                        c.leaf.compression = proto.Compression
95✔
1923
                }
95✔
1924
        }
1925

1926
        // Remember the remote server.
1927
        c.leaf.remoteServer = proto.Name
635✔
1928
        // Remember the remote account name
635✔
1929
        c.leaf.remoteAccName = proto.RemoteAccount
635✔
1930

635✔
1931
        // If the other side has declared itself a hub, so we will take on the spoke role.
635✔
1932
        if proto.Hub {
641✔
1933
                c.leaf.isSpoke = true
6✔
1934
        }
6✔
1935

1936
        // The soliciting side is part of a cluster.
1937
        if proto.Cluster != _EMPTY_ {
1,131✔
1938
                c.leaf.remoteCluster = proto.Cluster
496✔
1939
        }
496✔
1940

1941
        c.leaf.remoteDomain = proto.Domain
635✔
1942

635✔
1943
        // When a leaf solicits a connection to a hub, the perms that it will use on the soliciting leafnode's
635✔
1944
        // behalf are correct for them, but inside the hub need to be reversed since data is flowing in the opposite direction.
635✔
1945
        if !c.isSolicitedLeafNode() && c.perms != nil {
647✔
1946
                sp, pp := c.perms.sub, c.perms.pub
12✔
1947
                c.perms.sub, c.perms.pub = pp, sp
12✔
1948
                if c.opts.Import != nil {
23✔
1949
                        c.darray = c.opts.Import.Deny
11✔
1950
                } else {
12✔
1951
                        c.darray = nil
1✔
1952
                }
1✔
1953
        }
1954

1955
        // Set the Ping timer
1956
        c.setFirstPingTimer()
635✔
1957

635✔
1958
        // If we received pub deny permissions from the other end, merge with existing ones.
635✔
1959
        c.mergeDenyPermissions(pub, proto.DenyPub)
635✔
1960

635✔
1961
        c.mu.Unlock()
635✔
1962

635✔
1963
        // Register the cluster, even if empty, as long as we are acting as a hub.
635✔
1964
        if !proto.Hub {
1,264✔
1965
                c.acc.registerLeafNodeCluster(proto.Cluster)
629✔
1966
        }
629✔
1967

1968
        // Add in the leafnode here since we passed through auth at this point.
1969
        s.addLeafNodeConnection(c, proto.Name, proto.Cluster, true)
635✔
1970

635✔
1971
        // If we have permissions bound to this leafnode we need to send then back to the
635✔
1972
        // origin server for local enforcement.
635✔
1973
        s.sendPermsAndAccountInfo(c)
635✔
1974

635✔
1975
        // Create and initialize the smap since we know our bound account now.
635✔
1976
        // This will send all registered subs too.
635✔
1977
        s.initLeafNodeSmapAndSendSubs(c)
635✔
1978

635✔
1979
        // Announce the account connect event for a leaf node.
635✔
1980
        // This will no-op as needed.
635✔
1981
        s.sendLeafNodeConnect(c.acc)
635✔
1982

635✔
1983
        return nil
635✔
1984
}
1985

1986
// Returns the remote cluster name. This is set only once so does not require a lock.
1987
func (c *client) remoteCluster() string {
183,451✔
1988
        if c.leaf == nil {
183,451✔
1989
                return _EMPTY_
×
1990
        }
×
1991
        return c.leaf.remoteCluster
183,451✔
1992
}
1993

1994
// Sends back an info block to the soliciting leafnode to let it know about
// its permission settings for local enforcement.
// The INFO carries the connection's import/export permissions, the bound
// account name, and is flagged with ConnectInfo=true.
func (s *Server) sendPermsAndAccountInfo(c *client) {
	// Copy so we can mutate the INFO for this specific connection.
	info := s.copyLeafNodeInfo()
	c.mu.Lock()
	info.CID = c.cid
	info.Import = c.opts.Import
	info.Export = c.opts.Export
	info.RemoteAccount = c.acc.Name
	info.ConnectInfo = true
	c.enqueueProto(generateInfoJSON(info))
	c.mu.Unlock()
}
636✔
2008

2009
// Snapshot the current subscriptions from the sublist into our smap which
// we will keep updated from now on.
// Also send the registered subscriptions.
// The smap maps a subscription key (see keyFromSub) to an interest count;
// entries added here are queued as LS+ protocols for the remote side.
func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
	acc := c.acc
	if acc == nil {
		c.Debugf("Leafnode does not have an account bound")
		return
	}
	// Collect all account subs here.
	// Stack-allocated array to avoid heap allocation in the common case.
	_subs := [1024]*subscription{}
	subs := _subs[:0]
	ims := []string{}

	// Hold the client lock otherwise there can be a race and miss some subs.
	c.mu.Lock()
	defer c.mu.Unlock()

	acc.mu.RLock()
	accName := acc.Name
	accNTag := acc.nameTag

	// To make printing look better when no friendly name present.
	if accNTag != _EMPTY_ {
		accNTag = "/" + accNTag
	}

	// If we are solicited we only send interest for local clients.
	if c.isSpokeLeafNode() {
		acc.sl.localSubs(&subs, true)
	} else {
		acc.sl.All(&subs)
	}

	// Check if we have an existing service import reply.
	siReply := copyBytes(acc.siReply)

	// Since leaf nodes only send on interest, if the bound
	// account has import services we need to send those over.
	for isubj := range acc.imports.services {
		if c.isSpokeLeafNode() && !c.canSubscribe(isubj) {
			c.Debugf("Not permitted to import service %q on behalf of %s%s", isubj, accName, accNTag)
			continue
		}
		ims = append(ims, isubj)
	}
	// Likewise for mappings.
	for _, m := range acc.mappings {
		if c.isSpokeLeafNode() && !c.canSubscribe(m.src) {
			c.Debugf("Not permitted to import mapping %q on behalf of %s%s", m.src, accName, accNTag)
			continue
		}
		ims = append(ims, m.src)
	}

	// Create a unique subject that will be used for loop detection.
	lds := acc.lds
	acc.mu.RUnlock()

	// Check if we have to create the LDS.
	if lds == _EMPTY_ {
		lds = leafNodeLoopDetectionSubjectPrefix + nuid.Next()
		acc.mu.Lock()
		acc.lds = lds
		acc.mu.Unlock()
	}

	// Now check for gateway interest. Leafnodes will put this into
	// the proper mode to propagate, but they are not held in the account.
	gwsa := [16]*client{}
	gws := gwsa[:0]
	s.getOutboundGatewayConnections(&gws)
	for _, cgw := range gws {
		cgw.mu.Lock()
		gw := cgw.gw
		cgw.mu.Unlock()
		if gw != nil {
			if ei, _ := gw.outsim.Load(accName); ei != nil {
				if e := ei.(*outsie); e != nil && e.sl != nil {
					e.sl.All(&subs)
				}
			}
		}
	}

	applyGlobalRouting := s.gateway.enabled
	if c.isSpokeLeafNode() {
		// Add a fake subscription for this solicited leafnode connection
		// so that we can send back directly for mapped GW replies.
		// We need to keep track of this subscription so it can be removed
		// when the connection is closed so that the GC can release it.
		c.leaf.gwSub = &subscription{client: c, subject: []byte(gwReplyPrefix + ">")}
		c.srv.gwLeafSubs.Insert(c.leaf.gwSub)
	}

	// Now walk the results and add them to our smap
	rc := c.leaf.remoteCluster
	c.leaf.smap = make(map[string]int32)
	for _, sub := range subs {
		// Check perms regardless of role.
		if c.perms != nil && !c.canSubscribe(string(sub.subject)) {
			c.Debugf("Not permitted to subscribe to %q on behalf of %s%s", sub.subject, accName, accNTag)
			continue
		}
		// We ignore ourselves here.
		// Also don't add the subscription if it has a origin cluster and the
		// cluster name matches the one of the client we are sending to.
		if c != sub.client && (sub.origin == nil || (bytesToString(sub.origin) != rc)) {
			count := int32(1)
			// Queue subs contribute their full queue weight.
			if len(sub.queue) > 0 && sub.qw > 0 {
				count = sub.qw
			}
			c.leaf.smap[keyFromSub(sub)] += count
			// Track subs processed here so updateSmap can skip duplicates
			// arriving shortly after (tsub is cleared by a timer below).
			if c.leaf.tsub == nil {
				c.leaf.tsub = make(map[*subscription]struct{})
			}
			c.leaf.tsub[sub] = struct{}{}
		}
	}
	// FIXME(dlc) - We need to update appropriately on an account claims update.
	for _, isubj := range ims {
		c.leaf.smap[isubj]++
	}
	// If we have gateways enabled we need to make sure the other side sends us responses
	// that have been augmented from the original subscription.
	// TODO(dlc) - Should we lock this down more?
	if applyGlobalRouting {
		c.leaf.smap[oldGWReplyPrefix+"*.>"]++
		c.leaf.smap[gwReplyPrefix+">"]++
	}
	// Detect loops by subscribing to a specific subject and checking
	// if this sub is coming back to us.
	c.leaf.smap[lds]++

	// Check if we need to add an existing siReply to our map.
	// This will be a prefix so add on the wildcard.
	if siReply != nil {
		wcsub := append(siReply, '>')
		c.leaf.smap[string(wcsub)]++
	}
	// Queue all protocols. There is no max pending limit for LN connection,
	// so we don't need chunking. The writes will happen from the writeLoop.
	var b bytes.Buffer
	for key, n := range c.leaf.smap {
		c.writeLeafSub(&b, key, n)
	}
	if b.Len() > 0 {
		c.enqueueProto(b.Bytes())
	}
	if c.leaf.tsub != nil {
		// Clear the tsub map after 5 seconds.
		c.leaf.tsubt = time.AfterFunc(5*time.Second, func() {
			c.mu.Lock()
			if c.leaf != nil {
				c.leaf.tsub = nil
				c.leaf.tsubt = nil
			}
			c.mu.Unlock()
		})
	}
}
2170

2171
// updateInterestForAccountOnGateway called from gateway code when processing RS+ and RS-.
2172
func (s *Server) updateInterestForAccountOnGateway(accName string, sub *subscription, delta int32) {
195,078✔
2173
        acc, err := s.LookupAccount(accName)
195,078✔
2174
        if acc == nil || err != nil {
195,186✔
2175
                s.Debugf("No or bad account for %q, failed to update interest from gateway", accName)
108✔
2176
                return
108✔
2177
        }
108✔
2178
        acc.updateLeafNodes(sub, delta)
194,970✔
2179
}
2180

2181
// updateLeafNodes will make sure to update the account smap for the subscription.
// Will also forward to all leaf nodes as needed.
// delta is the interest change (positive for new interest, negative or zero
// for removals); it is passed through to each leafnode's updateSmap.
func (acc *Account) updateLeafNodes(sub *subscription, delta int32) {
	if acc == nil || sub == nil {
		return
	}

	// We will do checks for no leafnodes and same cluster here inline and under the
	// general account read lock.
	// If we feel we need to update the leafnodes we will do that out of line to avoid
	// blocking routes or GWs.

	acc.mu.RLock()
	// First check if we even have leafnodes here.
	if acc.nleafs == 0 {
		acc.mu.RUnlock()
		return
	}

	// Is this a loop detection subject.
	isLDS := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))

	// Capture the cluster even if its empty.
	var cluster string
	if sub.origin != nil {
		cluster = bytesToString(sub.origin)
	}

	// If we have an isolated cluster we can return early, as long as it is not a loop detection subject.
	// Empty clusters will return false for the check.
	if !isLDS && acc.isLeafNodeClusterIsolated(cluster) {
		acc.mu.RUnlock()
		return
	}

	// We can release the general account lock.
	acc.mu.RUnlock()

	// We can hold the list lock here to avoid having to copy a large slice.
	acc.lmu.RLock()
	defer acc.lmu.RUnlock()

	// Do this once.
	subject := string(sub.subject)

	// Walk the connected leafnodes.
	for _, ln := range acc.lleafs {
		if ln == sub.client {
			continue
		}
		// Check to make sure this sub does not have an origin cluster that matches the leafnode.
		ln.mu.Lock()
		// If skipped, make sure that we still let go the "$LDS." subscription that allows
		// the detection of loops as long as different cluster.
		clusterDifferent := cluster != ln.remoteCluster()
		// Permissions (canSubscribe) are only checked on additions; removals
		// (delta <= 0) are always propagated.
		if (isLDS && clusterDifferent) || ((cluster == _EMPTY_ || clusterDifferent) && (delta <= 0 || ln.canSubscribe(subject))) {
			ln.updateSmap(sub, delta, isLDS)
		}
		ln.mu.Unlock()
	}
}
2242

2243
// This will make an update to our internal smap and determine if we should send out
// an interest update to the remote side.
// Lock should be held.
// isLDS marks the loop-detection subject, which bypasses the spoke filtering.
func (c *client) updateSmap(sub *subscription, delta int32, isLDS bool) {
	if c.leaf.smap == nil {
		return
	}

	// If we are solicited make sure this is a local client or a non-solicited leaf node
	skind := sub.client.kind
	updateClient := skind == CLIENT || skind == SYSTEM || skind == JETSTREAM || skind == ACCOUNT
	if !isLDS && c.isSpokeLeafNode() && !(updateClient || (skind == LEAF && !sub.client.isSpokeLeafNode())) {
		return
	}

	// For additions, check if that sub has just been processed during initLeafNodeSmapAndSendSubs
	if delta > 0 && c.leaf.tsub != nil {
		if _, present := c.leaf.tsub[sub]; present {
			// Already counted during init; consume the marker and, once the
			// tracking map is empty, stop its cleanup timer.
			delete(c.leaf.tsub, sub)
			if len(c.leaf.tsub) == 0 {
				c.leaf.tsub = nil
				c.leaf.tsubt.Stop()
				c.leaf.tsubt = nil
			}
			return
		}
	}

	key := keyFromSub(sub)
	n, ok := c.leaf.smap[key]
	// Removal for a key we never tracked: nothing to do.
	if delta < 0 && !ok {
		return
	}

	// We will update if its a queue, if count is zero (or negative), or we were 0 and are N > 0.
	update := sub.queue != nil || (n <= 0 && n+delta > 0) || (n > 0 && n+delta <= 0)
	n += delta
	if n > 0 {
		c.leaf.smap[key] = n
	} else {
		delete(c.leaf.smap, key)
	}
	if update {
		c.sendLeafNodeSubUpdate(key, n)
	}
}
2289

2290
// Used to force add subjects to the subject map.
2291
func (c *client) forceAddToSmap(subj string) {
4✔
2292
        c.mu.Lock()
4✔
2293
        defer c.mu.Unlock()
4✔
2294

4✔
2295
        if c.leaf.smap == nil {
4✔
2296
                return
×
2297
        }
×
2298
        n := c.leaf.smap[subj]
4✔
2299
        if n != 0 {
5✔
2300
                return
1✔
2301
        }
1✔
2302
        // Place into the map since it was not there.
2303
        c.leaf.smap[subj] = 1
3✔
2304
        c.sendLeafNodeSubUpdate(subj, 1)
3✔
2305
}
2306

2307
// Used to force remove a subject from the subject map.
2308
func (c *client) forceRemoveFromSmap(subj string) {
1✔
2309
        c.mu.Lock()
1✔
2310
        defer c.mu.Unlock()
1✔
2311

1✔
2312
        if c.leaf.smap == nil {
1✔
2313
                return
×
2314
        }
×
2315
        n := c.leaf.smap[subj]
1✔
2316
        if n == 0 {
1✔
2317
                return
×
2318
        }
×
2319
        n--
1✔
2320
        if n == 0 {
2✔
2321
                // Remove is now zero
1✔
2322
                delete(c.leaf.smap, subj)
1✔
2323
                c.sendLeafNodeSubUpdate(subj, 0)
1✔
2324
        } else {
1✔
2325
                c.leaf.smap[subj] = n
×
2326
        }
×
2327
}
2328

2329
// Send the subscription interest change to the other side.
2330
// Lock should be held.
2331
func (c *client) sendLeafNodeSubUpdate(key string, n int32) {
9,012✔
2332
        // If we are a spoke, we need to check if we are allowed to send this subscription over to the hub.
9,012✔
2333
        if c.isSpokeLeafNode() {
11,188✔
2334
                checkPerms := true
2,176✔
2335
                if len(key) > 0 && (key[0] == '$' || key[0] == '_') {
3,516✔
2336
                        if strings.HasPrefix(key, leafNodeLoopDetectionSubjectPrefix) ||
1,340✔
2337
                                strings.HasPrefix(key, oldGWReplyPrefix) ||
1,340✔
2338
                                strings.HasPrefix(key, gwReplyPrefix) {
1,421✔
2339
                                checkPerms = false
81✔
2340
                        }
81✔
2341
                }
2342
                if checkPerms {
4,271✔
2343
                        var subject string
2,095✔
2344
                        if sep := strings.IndexByte(key, ' '); sep != -1 {
2,506✔
2345
                                subject = key[:sep]
411✔
2346
                        } else {
2,095✔
2347
                                subject = key
1,684✔
2348
                        }
1,684✔
2349
                        if !c.canSubscribe(subject) {
2,095✔
2350
                                return
×
2351
                        }
×
2352
                }
2353
        }
2354
        // If we are here we can send over to the other side.
2355
        _b := [64]byte{}
9,012✔
2356
        b := bytes.NewBuffer(_b[:0])
9,012✔
2357
        c.writeLeafSub(b, key, n)
9,012✔
2358
        c.enqueueProto(b.Bytes())
9,012✔
2359
}
2360

2361
// Helper function to build the key.
2362
func keyFromSub(sub *subscription) string {
45,499✔
2363
        var sb strings.Builder
45,499✔
2364
        sb.Grow(len(sub.subject) + len(sub.queue) + 1)
45,499✔
2365
        sb.Write(sub.subject)
45,499✔
2366
        if sub.queue != nil {
48,997✔
2367
                // Just make the key subject spc group, e.g. 'foo bar'
3,498✔
2368
                sb.WriteByte(' ')
3,498✔
2369
                sb.Write(sub.queue)
3,498✔
2370
        }
3,498✔
2371
        return sb.String()
45,499✔
2372
}
2373

2374
const (
	// Key prefixes used by keyFromSubWithOrigin to distinguish plain routed
	// subscriptions ("R") from routed subscriptions on behalf of a leafnode ("L").
	keyRoutedSub         = "R"
	keyRoutedSubByte     = 'R'
	keyRoutedLeafSub     = "L"
	keyRoutedLeafSubByte = 'L'
)
2380

2381
// Helper function to build the key that prevents collisions between normal
2382
// routed subscriptions and routed subscriptions on behalf of a leafnode.
2383
// Keys will look like this:
2384
// "R foo"          -> plain routed sub on "foo"
2385
// "R foo bar"      -> queue routed sub on "foo", queue "bar"
2386
// "L foo bar"      -> plain routed leaf sub on "foo", leaf "bar"
2387
// "L foo bar baz"  -> queue routed sub on "foo", queue "bar", leaf "baz"
2388
func keyFromSubWithOrigin(sub *subscription) string {
622,824✔
2389
        var sb strings.Builder
622,824✔
2390
        sb.Grow(2 + len(sub.origin) + 1 + len(sub.subject) + 1 + len(sub.queue))
622,824✔
2391
        leaf := len(sub.origin) > 0
622,824✔
2392
        if leaf {
639,718✔
2393
                sb.WriteByte(keyRoutedLeafSubByte)
16,894✔
2394
        } else {
622,824✔
2395
                sb.WriteByte(keyRoutedSubByte)
605,930✔
2396
        }
605,930✔
2397
        sb.WriteByte(' ')
622,824✔
2398
        sb.Write(sub.subject)
622,824✔
2399
        if sub.queue != nil {
646,884✔
2400
                sb.WriteByte(' ')
24,060✔
2401
                sb.Write(sub.queue)
24,060✔
2402
        }
24,060✔
2403
        if leaf {
639,718✔
2404
                sb.WriteByte(' ')
16,894✔
2405
                sb.Write(sub.origin)
16,894✔
2406
        }
16,894✔
2407
        return sb.String()
622,824✔
2408
}
2409

2410
// Lock should be held.
2411
func (c *client) writeLeafSub(w *bytes.Buffer, key string, n int32) {
35,220✔
2412
        if key == _EMPTY_ {
35,220✔
2413
                return
×
2414
        }
×
2415
        if n > 0 {
67,082✔
2416
                w.WriteString("LS+ " + key)
31,862✔
2417
                // Check for queue semantics, if found write n.
31,862✔
2418
                if strings.Contains(key, " ") {
34,034✔
2419
                        w.WriteString(" ")
2,172✔
2420
                        var b [12]byte
2,172✔
2421
                        var i = len(b)
2,172✔
2422
                        for l := n; l > 0; l /= 10 {
5,240✔
2423
                                i--
3,068✔
2424
                                b[i] = digits[l%10]
3,068✔
2425
                        }
3,068✔
2426
                        w.Write(b[i:])
2,172✔
2427
                        if c.trace {
2,172✔
2428
                                arg := fmt.Sprintf("%s %d", key, n)
×
2429
                                c.traceOutOp("LS+", []byte(arg))
×
2430
                        }
×
2431
                } else if c.trace {
29,881✔
2432
                        c.traceOutOp("LS+", []byte(key))
191✔
2433
                }
191✔
2434
        } else {
3,358✔
2435
                w.WriteString("LS- " + key)
3,358✔
2436
                if c.trace {
3,367✔
2437
                        c.traceOutOp("LS-", []byte(key))
9✔
2438
                }
9✔
2439
        }
2440
        w.WriteString(CR_LF)
35,220✔
2441
}
2442

2443
// processLeafSub will process an inbound sub request for the remote leaf node.
//
// It parses the LS+ argument — either "<subject>" or
// "<subject> <queue> <weight>" — then, under the client lock: runs loop
// detection, permission and max-subscription checks, and inserts the sub into
// the account sublist (or, for an existing queue sub, updates its weight).
// Finally it propagates the interest delta to routes, gateways and other leaf
// nodes. Most rejections return nil after closing/erroring the connection;
// only a malformed argument returns a non-nil error.
func (c *client) processLeafSub(argo []byte) (err error) {
	// Indicate activity.
	c.in.subs++

	srv := c.srv
	if srv == nil {
		return nil
	}

	// Copy so we do not reference a potentially large buffer
	arg := make([]byte, len(argo))
	copy(arg, argo)

	args := splitArg(arg)
	sub := &subscription{client: c}

	// delta is the interest change propagated to routes/gateways/leafnodes.
	delta := int32(1)
	switch len(args) {
	case 1:
		sub.queue = nil
	case 3:
		sub.queue = args[1]
		sub.qw = int32(parseSize(args[2]))
		// TODO: (ik) We should have a non empty queue name and a queue
		// weight >= 1. For 2.11, we may want to return an error if that
		// is not the case, but for now just overwrite `delta` if queue
		// weight is greater than 1 (it is possible after a reconnect/
		// server restart to receive a queue weight > 1 for a new sub).
		if sub.qw > 1 {
			delta = sub.qw
		}
	default:
		return fmt.Errorf("processLeafSub Parse Error: '%s'", arg)
	}
	sub.subject = args[0]

	c.mu.Lock()
	if c.isClosed() {
		c.mu.Unlock()
		return nil
	}

	acc := c.acc
	// Check if we have a loop.
	ldsPrefix := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))

	if ldsPrefix && bytesToString(sub.subject) == acc.getLDSubject() {
		// Seeing our own loop-detection subject back means a cycle exists.
		c.mu.Unlock()
		c.handleLeafNodeLoop(true)
		return nil
	}

	// Check permissions if applicable. (but exclude the $LDS, $GR and _GR_)
	checkPerms := true
	if sub.subject[0] == '$' || sub.subject[0] == '_' {
		if ldsPrefix ||
			bytes.HasPrefix(sub.subject, []byte(oldGWReplyPrefix)) ||
			bytes.HasPrefix(sub.subject, []byte(gwReplyPrefix)) {
			checkPerms = false
		}
	}

	// If we are a hub check that we can publish to this subject.
	if checkPerms {
		subj := string(sub.subject)
		if subjectIsLiteral(subj) && !c.pubAllowedFullCheck(subj, true, true) {
			c.mu.Unlock()
			c.leafSubPermViolation(sub.subject)
			c.Debugf(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject))
			return nil
		}
	}

	// Check if we have a maximum on the number of subscriptions.
	if c.subsAtLimit() {
		c.mu.Unlock()
		c.maxSubsExceeded()
		return nil
	}

	// If we have an origin cluster associated mark that in the sub.
	if rc := c.remoteCluster(); rc != _EMPTY_ {
		sub.origin = []byte(rc)
	}

	// Like Routes, we store local subs by account and subject and optionally queue name.
	// If we have a queue it will have a trailing weight which we do not want.
	if sub.queue != nil {
		sub.sid = arg[:len(arg)-len(args[2])-1]
	} else {
		sub.sid = arg
	}
	key := bytesToString(sub.sid)
	osub := c.subs[key]
	if osub == nil {
		c.subs[key] = sub
		// Now place into the account sl.
		if err := acc.sl.Insert(sub); err != nil {
			delete(c.subs, key)
			c.mu.Unlock()
			c.Errorf("Could not insert subscription: %v", err)
			c.sendErr("Invalid Subscription")
			return nil
		}
	} else if sub.queue != nil {
		// For a queue we need to update the weight.
		delta = sub.qw - atomic.LoadInt32(&osub.qw)
		atomic.StoreInt32(&osub.qw, sub.qw)
		acc.sl.UpdateRemoteQSub(osub)
	}
	spoke := c.isSpokeLeafNode()
	c.mu.Unlock()

	// Only add in shadow subs if a new sub or qsub.
	if osub == nil {
		if err := c.addShadowSubscriptions(acc, sub, true); err != nil {
			c.Errorf(err.Error())
		}
	}

	// If we are not solicited, treat leaf node subscriptions similar to a
	// client subscription, meaning we forward them to routes, gateways and
	// other leaf nodes as needed.
	if !spoke {
		// If we are routing add to the route map for the associated account.
		srv.updateRouteSubscriptionMap(acc, sub, delta)
		if srv.gateway.enabled {
			srv.gatewayUpdateSubInterest(acc.Name, sub, delta)
		}
	}
	// Now check on leafnode updates for other leaf nodes. We understand solicited
	// and non-solicited state in this call so we will do the right thing.
	acc.updateLeafNodes(sub, delta)

	return nil
}
2580

2581
// If the leafnode is a solicited, set the connect delay based on default
2582
// or private option (for tests). Sends the error to the other side, log and
2583
// close the connection.
2584
func (c *client) handleLeafNodeLoop(sendErr bool) {
15✔
2585
        accName, delay := c.setLeafConnectDelayIfSoliciting(leafNodeReconnectDelayAfterLoopDetected)
15✔
2586
        errTxt := fmt.Sprintf("Loop detected for leafnode account=%q. Delaying attempt to reconnect for %v", accName, delay)
15✔
2587
        if sendErr {
23✔
2588
                c.sendErr(errTxt)
8✔
2589
        }
8✔
2590

2591
        c.Errorf(errTxt)
15✔
2592
        // If we are here with "sendErr" false, it means that this is the server
15✔
2593
        // that received the error. The other side will have closed the connection,
15✔
2594
        // but does not hurt to close here too.
15✔
2595
        c.closeConnection(ProtocolViolation)
15✔
2596
}
2597

2598
// processLeafUnsub will process an inbound unsub request for the remote leaf node.
//
// The LS- argument is the exact subscription key (subject, optionally followed
// by the queue name). If found, the subscription is removed and the negative
// interest delta (queue weight for queue subs, 1 otherwise) is propagated to
// routes, gateways and other leaf nodes. Unknown keys are silently ignored.
func (c *client) processLeafUnsub(arg []byte) error {
	// Indicate any activity, so pub and sub or unsubs.
	c.in.subs++

	acc := c.acc
	srv := c.srv

	c.mu.Lock()
	if c.isClosed() {
		c.mu.Unlock()
		return nil
	}

	spoke := c.isSpokeLeafNode()
	// We store local subs by account and subject and optionally queue name.
	// LS- will have the arg exactly as the key.
	sub, ok := c.subs[string(arg)]
	if !ok {
		// If not found, don't try to update routes/gws/leaf nodes.
		c.mu.Unlock()
		return nil
	}
	delta := int32(1)
	if len(sub.queue) > 0 {
		delta = sub.qw
	}
	c.mu.Unlock()

	c.unsubscribe(acc, sub, true, true)
	if !spoke {
		// If we are routing subtract from the route map for the associated account.
		srv.updateRouteSubscriptionMap(acc, sub, -delta)
		// Gateways
		if srv.gateway.enabled {
			srv.gatewayUpdateSubInterest(acc.Name, sub, -delta)
		}
	}
	// Now check on leafnode updates for other leaf nodes.
	acc.updateLeafNodes(sub, -delta)
	return nil
}
2640

2641
func (c *client) processLeafHeaderMsgArgs(arg []byte) error {
486✔
2642
        // Unroll splitArgs to avoid runtime/heap issues
486✔
2643
        a := [MAX_MSG_ARGS][]byte{}
486✔
2644
        args := a[:0]
486✔
2645
        start := -1
486✔
2646
        for i, b := range arg {
32,018✔
2647
                switch b {
31,532✔
2648
                case ' ', '\t', '\r', '\n':
1,390✔
2649
                        if start >= 0 {
2,780✔
2650
                                args = append(args, arg[start:i])
1,390✔
2651
                                start = -1
1,390✔
2652
                        }
1,390✔
2653
                default:
30,142✔
2654
                        if start < 0 {
32,018✔
2655
                                start = i
1,876✔
2656
                        }
1,876✔
2657
                }
2658
        }
2659
        if start >= 0 {
972✔
2660
                args = append(args, arg[start:])
486✔
2661
        }
486✔
2662

2663
        c.pa.arg = arg
486✔
2664
        switch len(args) {
486✔
2665
        case 0, 1, 2:
×
2666
                return fmt.Errorf("processLeafHeaderMsgArgs Parse Error: '%s'", args)
×
2667
        case 3:
86✔
2668
                c.pa.reply = nil
86✔
2669
                c.pa.queues = nil
86✔
2670
                c.pa.hdb = args[1]
86✔
2671
                c.pa.hdr = parseSize(args[1])
86✔
2672
                c.pa.szb = args[2]
86✔
2673
                c.pa.size = parseSize(args[2])
86✔
2674
        case 4:
386✔
2675
                c.pa.reply = args[1]
386✔
2676
                c.pa.queues = nil
386✔
2677
                c.pa.hdb = args[2]
386✔
2678
                c.pa.hdr = parseSize(args[2])
386✔
2679
                c.pa.szb = args[3]
386✔
2680
                c.pa.size = parseSize(args[3])
386✔
2681
        default:
14✔
2682
                // args[1] is our reply indicator. Should be + or | normally.
14✔
2683
                if len(args[1]) != 1 {
14✔
2684
                        return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
2685
                }
×
2686
                switch args[1][0] {
14✔
2687
                case '+':
4✔
2688
                        c.pa.reply = args[2]
4✔
2689
                case '|':
10✔
2690
                        c.pa.reply = nil
10✔
2691
                default:
×
2692
                        return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
2693
                }
2694
                // Grab header size.
2695
                c.pa.hdb = args[len(args)-2]
14✔
2696
                c.pa.hdr = parseSize(c.pa.hdb)
14✔
2697

14✔
2698
                // Grab size.
14✔
2699
                c.pa.szb = args[len(args)-1]
14✔
2700
                c.pa.size = parseSize(c.pa.szb)
14✔
2701

14✔
2702
                // Grab queue names.
14✔
2703
                if c.pa.reply != nil {
18✔
2704
                        c.pa.queues = args[3 : len(args)-2]
4✔
2705
                } else {
14✔
2706
                        c.pa.queues = args[2 : len(args)-2]
10✔
2707
                }
10✔
2708
        }
2709
        if c.pa.hdr < 0 {
486✔
2710
                return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Header Size: '%s'", arg)
×
2711
        }
×
2712
        if c.pa.size < 0 {
486✔
2713
                return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Size: '%s'", args)
×
2714
        }
×
2715
        if c.pa.hdr > c.pa.size {
486✔
2716
                return fmt.Errorf("processLeafHeaderMsgArgs Header Size larger then TotalSize: '%s'", arg)
×
2717
        }
×
2718

2719
        // Common ones processed after check for arg length
2720
        c.pa.subject = args[0]
486✔
2721

486✔
2722
        return nil
486✔
2723
}
2724

2725
// processLeafMsgArgs parses the argument section of an inbound leafnode
// message protocol line without headers, populating c.pa (subject, reply,
// queues, total size).
//
// Accepted forms (fields are space separated):
//   - 2 args: subject totalSize
//   - 3 args: subject reply totalSize
//   - 4+ args: subject indicator [reply] queue... totalSize, where the
//     indicator is '+' (reply follows) or '|' (no reply, queues follow).
func (c *client) processLeafMsgArgs(arg []byte) error {
	// Unroll splitArgs to avoid runtime/heap issues
	a := [MAX_MSG_ARGS][]byte{}
	args := a[:0]
	start := -1
	for i, b := range arg {
		switch b {
		case ' ', '\t', '\r', '\n':
			if start >= 0 {
				args = append(args, arg[start:i])
				start = -1
			}
		default:
			if start < 0 {
				start = i
			}
		}
	}
	if start >= 0 {
		args = append(args, arg[start:])
	}

	c.pa.arg = arg
	switch len(args) {
	case 0, 1:
		return fmt.Errorf("processLeafMsgArgs Parse Error: '%s'", args)
	case 2:
		c.pa.reply = nil
		c.pa.queues = nil
		c.pa.szb = args[1]
		c.pa.size = parseSize(args[1])
	case 3:
		c.pa.reply = args[1]
		c.pa.queues = nil
		c.pa.szb = args[2]
		c.pa.size = parseSize(args[2])
	default:
		// args[1] is our reply indicator. Should be + or | normally.
		if len(args[1]) != 1 {
			return fmt.Errorf("processLeafMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
		}
		switch args[1][0] {
		case '+':
			c.pa.reply = args[2]
		case '|':
			c.pa.reply = nil
		default:
			return fmt.Errorf("processLeafMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
		}
		// Grab size.
		c.pa.szb = args[len(args)-1]
		c.pa.size = parseSize(c.pa.szb)

		// Grab queue names.
		if c.pa.reply != nil {
			c.pa.queues = args[3 : len(args)-1]
		} else {
			c.pa.queues = args[2 : len(args)-1]
		}
	}
	if c.pa.size < 0 {
		return fmt.Errorf("processLeafMsgArgs Bad or Missing Size: '%s'", args)
	}

	// Common ones processed after check for arg length
	c.pa.subject = args[0]

	return nil
}
2794

2795
// processInboundLeafMsg is called to process an inbound msg from a leaf node.
//
// It updates in/msg accounting, matches the subject against the account
// sublist (using the per-client L1 result cache when its generation id is
// still current), delivers to matching subscribers, and forwards to gateways
// when enabled. The L1 cache is pruned with random deletes once it reaches
// maxResultCacheSize.
func (c *client) processInboundLeafMsg(msg []byte) {
	// Update statistics
	// The msg includes the CR_LF, so pull back out for accounting.
	c.in.msgs++
	c.in.bytes += int32(len(msg) - LEN_CR_LF)

	srv, acc, subject := c.srv, c.acc, string(c.pa.subject)

	// Mostly under testing scenarios.
	if srv == nil || acc == nil {
		return
	}

	// Match the subscriptions. We will use our own L1 map if
	// it's still valid, avoiding contention on the shared sublist.
	var r *SublistResult
	var ok bool

	genid := atomic.LoadUint64(&c.acc.sl.genid)
	if genid == c.in.genid && c.in.results != nil {
		r, ok = c.in.results[subject]
	} else {
		// Reset our L1 completely.
		c.in.results = make(map[string]*SublistResult)
		c.in.genid = genid
	}

	// Go back to the sublist data structure.
	if !ok {
		r = c.acc.sl.Match(subject)
		// Prune the results cache. Keeps us from unbounded growth. Random delete.
		if len(c.in.results) >= maxResultCacheSize {
			n := 0
			for subj := range c.in.results {
				delete(c.in.results, subj)
				if n++; n > pruneSize {
					break
				}
			}
		}
		// Then add the new cache entry.
		c.in.results[subject] = r
	}

	// Collect queue names if needed.
	var qnames [][]byte

	// Check for no interest, short circuit if so.
	// This is the fanout scale.
	if len(r.psubs)+len(r.qsubs) > 0 {
		flag := pmrNoFlag
		// If we have queue subs in this cluster, then if we run in gateway
		// mode and the remote gateways have queue subs, then we need to
		// collect the queue groups this message was sent to so that we
		// exclude them when sending to gateways.
		if len(r.qsubs) > 0 && c.srv.gateway.enabled &&
			atomic.LoadInt64(&c.srv.gateway.totalQSubs) > 0 {
			flag |= pmrCollectQueueNames
		}
		// If this is a mapped subject that means the mapped interest
		// is what got us here, but this might not have a queue designation
		// If that is the case, make sure we ignore to process local queue subscribers.
		if len(c.pa.mapped) > 0 && len(c.pa.queues) == 0 {
			flag |= pmrIgnoreEmptyQueueFilter
		}
		_, qnames = c.processMsgResults(acc, r, msg, nil, c.pa.subject, c.pa.reply, flag)
	}

	// Now deal with gateways
	if c.srv.gateway.enabled {
		c.sendMsgToGateways(acc, msg, c.pa.subject, c.pa.reply, qnames)
	}
}
2869

2870
// Handles a subscription permission violation.
// Thin wrapper that forwards to leafPermViolation with pub=false.
// See leafPermViolation() for details.
func (c *client) leafSubPermViolation(subj []byte) {
	c.leafPermViolation(false, subj)
}
303✔
2875

2876
// Common function to process publish or subscribe leafnode permission violation.
2877
// Sends the permission violation error to the remote, logs it and closes the connection.
2878
// If this is from a server soliciting, the reconnection will be delayed.
2879
func (c *client) leafPermViolation(pub bool, subj []byte) {
303✔
2880
        if c.isSpokeLeafNode() {
606✔
2881
                // For spokes these are no-ops since the hub server told us our permissions.
303✔
2882
                // We just need to not send these over to the other side since we will get cutoff.
303✔
2883
                return
303✔
2884
        }
303✔
2885
        // FIXME(dlc) ?
2886
        c.setLeafConnectDelayIfSoliciting(leafNodeReconnectAfterPermViolation)
×
2887
        var action string
×
2888
        if pub {
×
2889
                c.sendErr(fmt.Sprintf("Permissions Violation for Publish to %q", subj))
×
2890
                action = "Publish"
×
2891
        } else {
×
2892
                c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q", subj))
×
2893
                action = "Subscription"
×
2894
        }
×
2895
        c.Errorf("%s Violation on %q - Check other side configuration", action, subj)
×
2896
        // TODO: add a new close reason that is more appropriate?
×
2897
        c.closeConnection(ProtocolViolation)
×
2898
}
2899

2900
// Invoked from generic processErr() for LEAF connections.
2901
func (c *client) leafProcessErr(errStr string) {
32✔
2902
        // Check if we got a cluster name collision.
32✔
2903
        if strings.Contains(errStr, ErrLeafNodeHasSameClusterName.Error()) {
36✔
2904
                _, delay := c.setLeafConnectDelayIfSoliciting(leafNodeReconnectDelayAfterClusterNameSame)
4✔
2905
                c.Errorf("Leafnode connection dropped with same cluster name error. Delaying attempt to reconnect for %v", delay)
4✔
2906
                return
4✔
2907
        }
4✔
2908

2909
        // We will look for Loop detected error coming from the other side.
2910
        // If we solicit, set the connect delay.
2911
        if !strings.Contains(errStr, "Loop detected") {
49✔
2912
                return
21✔
2913
        }
21✔
2914
        c.handleLeafNodeLoop(false)
7✔
2915
}
2916

2917
// If this leaf connection solicits, sets the connect delay to the given value,
2918
// or the one from the server option's LeafNode.connDelay if one is set (for tests).
2919
// Returns the connection's account name and delay.
2920
func (c *client) setLeafConnectDelayIfSoliciting(delay time.Duration) (string, time.Duration) {
19✔
2921
        c.mu.Lock()
19✔
2922
        if c.isSolicitedLeafNode() {
30✔
2923
                if s := c.srv; s != nil {
22✔
2924
                        if srvdelay := s.getOpts().LeafNode.connDelay; srvdelay != 0 {
15✔
2925
                                delay = srvdelay
4✔
2926
                        }
4✔
2927
                }
2928
                c.leaf.remote.setConnectDelay(delay)
11✔
2929
        }
2930
        accName := c.acc.Name
19✔
2931
        c.mu.Unlock()
19✔
2932
        return accName, delay
19✔
2933
}
2934

2935
// For the given remote Leafnode configuration, this function returns
2936
// if TLS is required, and if so, will return a clone of the TLS Config
2937
// (since some fields will be changed during handshake), the TLS server
2938
// name that is remembered, and the TLS timeout.
2939
func (c *client) leafNodeGetTLSConfigForSolicit(remote *leafNodeCfg) (bool, *tls.Config, string, float64) {
1,803✔
2940
        var (
1,803✔
2941
                tlsConfig  *tls.Config
1,803✔
2942
                tlsName    string
1,803✔
2943
                tlsTimeout float64
1,803✔
2944
        )
1,803✔
2945

1,803✔
2946
        remote.RLock()
1,803✔
2947
        defer remote.RUnlock()
1,803✔
2948

1,803✔
2949
        tlsRequired := remote.TLS || remote.TLSConfig != nil
1,803✔
2950
        if tlsRequired {
1,886✔
2951
                if remote.TLSConfig != nil {
135✔
2952
                        tlsConfig = remote.TLSConfig.Clone()
52✔
2953
                } else {
83✔
2954
                        tlsConfig = &tls.Config{MinVersion: tls.VersionTLS12}
31✔
2955
                }
31✔
2956
                tlsName = remote.tlsName
83✔
2957
                tlsTimeout = remote.TLSTimeout
83✔
2958
                if tlsTimeout == 0 {
131✔
2959
                        tlsTimeout = float64(TLS_TIMEOUT / time.Second)
48✔
2960
                }
48✔
2961
        }
2962

2963
        return tlsRequired, tlsConfig, tlsName, tlsTimeout
1,803✔
2964
}
2965

2966
// Initiates the LeafNode Websocket connection by:
// - doing the TLS handshake if needed
// - sending the HTTP request
// - waiting for the HTTP response
//
// Since some bufio reader is used to consume the HTTP response, this function
// returns the slice of buffered bytes (if any) so that the readLoop that will
// be started after that consume those first before reading from the socket.
// On failure, the ClosedState result indicates which phase failed (0 when the
// connection was already closed, WriteError, or ReadError).
//
// Lock held on entry.
func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remote *leafNodeCfg) ([]byte, ClosedState, error) {
	remote.RLock()
	compress := remote.Websocket.Compression
	// By default the server will mask outbound frames, but it can be disabled with this option.
	noMasking := remote.Websocket.NoMasking
	infoTimeout := remote.FirstInfoTimeout
	remote.RUnlock()
	// Will do the client-side TLS handshake if needed.
	tlsRequired, err := c.leafClientHandshakeIfNeeded(remote, opts)
	if err != nil {
		// 0 will indicate that the connection was already closed
		return nil, 0, err
	}

	// For http request, we need the passed URL to contain either http or https scheme.
	scheme := "http"
	if tlsRequired {
		scheme = "https"
	}
	// We will use the `/leafnode` path to tell the accepting WS server that it should
	// create a LEAF connection, not a CLIENT.
	// In case we use the user's URL path in the future, make sure we append the user's
	// path to our `/leafnode` path.
	lpath := leafNodeWSPath
	if curPath := rURL.EscapedPath(); curPath != _EMPTY_ {
		if curPath[0] == '/' {
			curPath = curPath[1:]
		}
		lpath = path.Join(curPath, lpath)
	} else {
		lpath = lpath[1:]
	}
	ustr := fmt.Sprintf("%s://%s/%s", scheme, rURL.Host, lpath)
	u, _ := url.Parse(ustr)
	req := &http.Request{
		Method:     "GET",
		URL:        u,
		Proto:      "HTTP/1.1",
		ProtoMajor: 1,
		ProtoMinor: 1,
		Header:     make(http.Header),
		Host:       u.Host,
	}
	wsKey, err := wsMakeChallengeKey()
	if err != nil {
		return nil, WriteError, err
	}

	// Standard WebSocket opening-handshake headers (RFC 6455).
	req.Header["Upgrade"] = []string{"websocket"}
	req.Header["Connection"] = []string{"Upgrade"}
	req.Header["Sec-WebSocket-Key"] = []string{wsKey}
	req.Header["Sec-WebSocket-Version"] = []string{"13"}
	if compress {
		req.Header.Add("Sec-WebSocket-Extensions", wsPMCReqHeaderValue)
	}
	if noMasking {
		req.Header.Add(wsNoMaskingHeader, wsNoMaskingValue)
	}
	c.nc.SetDeadline(time.Now().Add(infoTimeout))
	if err := req.Write(c.nc); err != nil {
		return nil, WriteError, err
	}

	var resp *http.Response

	br := bufio.NewReaderSize(c.nc, MAX_CONTROL_LINE_SIZE)
	resp, err = http.ReadResponse(br, req)
	// Validate the 101 Switching Protocols response and the accept key echo.
	if err == nil &&
		(resp.StatusCode != 101 ||
			!strings.EqualFold(resp.Header.Get("Upgrade"), "websocket") ||
			!strings.EqualFold(resp.Header.Get("Connection"), "upgrade") ||
			resp.Header.Get("Sec-Websocket-Accept") != wsAcceptKey(wsKey)) {

		err = fmt.Errorf("invalid websocket connection")
	}
	// Check compression extension...
	if err == nil && c.ws.compress {
		// Check that not only permessage-deflate extension is present, but that
		// we also have server and client no context take over.
		srvCompress, noCtxTakeover := wsPMCExtensionSupport(resp.Header, false)

		// If server does not support compression, then simply disable it in our side.
		if !srvCompress {
			c.ws.compress = false
		} else if !noCtxTakeover {
			err = fmt.Errorf("compression negotiation error")
		}
	}
	// Same for no masking...
	if err == nil && noMasking {
		// Check if server accepts no masking
		if resp.Header.Get(wsNoMaskingHeader) != wsNoMaskingValue {
			// Nope, need to mask our writes as any client would do.
			c.ws.maskwrite = true
		}
	}
	if resp != nil {
		resp.Body.Close()
	}
	if err != nil {
		return nil, ReadError, err
	}
	c.Debugf("Leafnode compression=%v masking=%v", c.ws.compress, c.ws.maskwrite)

	var preBuf []byte
	// We have to slurp whatever is in the bufio reader and pass that to the readloop.
	if n := br.Buffered(); n != 0 {
		preBuf, _ = br.Peek(n)
	}
	return preBuf, 0, nil
}
3088

3089
// connectProcessTimeout bounds how long a solicited leafnode connection may
// take between sending CONNECT (leafNodeResumeConnectProcess) and completing
// the connect process (leafNodeFinishConnectProcess) before it is closed as
// a stale connection.
const connectProcessTimeout = 2 * time.Second
3090

3091
// This is invoked for remote LEAF remote connections after processing the INFO
// protocol.
//
// It sends the CONNECT protocol, spins up the write loop, and arms a timer
// (connectProcessTimeout) that closes the connection as stale if
// leafNodeFinishConnectProcess does not complete in time. The timer callback
// and leafNodeFinishConnectProcess race on the connectProcessFinished flag;
// setIfNotSet guarantees exactly one of them proceeds.
func (s *Server) leafNodeResumeConnectProcess(c *client) {
	clusterName := s.ClusterName()

	c.mu.Lock()
	if c.isClosed() {
		c.mu.Unlock()
		return
	}
	if err := c.sendLeafConnect(clusterName, c.headers); err != nil {
		c.mu.Unlock()
		c.closeConnection(WriteError)
		return
	}

	// Spin up the write loop.
	s.startGoRoutine(func() { c.writeLoop() })

	// timeout leafNodeFinishConnectProcess
	c.ping.tmr = time.AfterFunc(connectProcessTimeout, func() {
		c.mu.Lock()
		// check if leafNodeFinishConnectProcess was called and prevent later leafNodeFinishConnectProcess
		if !c.flags.setIfNotSet(connectProcessFinished) {
			c.mu.Unlock()
			return
		}
		clearTimer(&c.ping.tmr)
		closed := c.isClosed()
		c.mu.Unlock()
		if !closed {
			c.sendErrAndDebug("Stale Leaf Node Connection - Closing")
			c.closeConnection(StaleConnection)
		}
	})
	c.mu.Unlock()
	c.Debugf("Remote leafnode connect msg sent")
}
3129

3130
// This is invoked for remote LEAF connections after processing the INFO
// protocol and leafNodeResumeConnectProcess.
// This will send LS+ the CONNECT protocol and register the leaf node.
// It races with the timeout armed in leafNodeResumeConnectProcess via the
// connectProcessFinished flag: whichever sets the flag first proceeds, the
// other becomes a no-op.
func (s *Server) leafNodeFinishConnectProcess(c *client) {
	c.mu.Lock()
	// If the stale-connection timeout already fired, it set this flag and has
	// taken over teardown; nothing to do here.
	if !c.flags.setIfNotSet(connectProcessFinished) {
		c.mu.Unlock()
		return
	}
	if c.isClosed() {
		c.mu.Unlock()
		// Connection was closed after being added; remove it from the
		// server's leafs map.
		s.removeLeafNodeConnection(c)
		return
	}
	remote := c.leaf.remote
	// Check if we will need to send the system connect event.
	remote.RLock()
	sendSysConnectEvent := remote.Hub
	remote.RUnlock()

	// Capture account before releasing lock
	acc := c.acc
	// cancel connectProcessTimeout
	clearTimer(&c.ping.tmr)
	c.mu.Unlock()

	// Make sure we register with the account here.
	// registerWithAccount is called without the client lock held.
	if err := c.registerWithAccount(acc); err != nil {
		if err == ErrTooManyAccountConnections {
			c.maxAccountConnExceeded()
			return
		} else if err == ErrLeafNodeLoop {
			c.handleLeafNodeLoop(true)
			return
		}
		c.Errorf("Registering leaf with account %s resulted in error: %v", acc.Name, err)
		c.closeConnection(ProtocolViolation)
		return
	}
	s.addLeafNodeConnection(c, _EMPTY_, _EMPTY_, false)
	// Build the subject map and send our subscriptions (LS+) to the remote.
	s.initLeafNodeSmapAndSendSubs(c)
	if sendSysConnectEvent {
		s.sendLeafNodeConnect(acc)
	}

	// The above functions are not atomically under the client
	// lock doing those operations. It is possible - since we
	// have started the read/write loops - that the connection
	// is closed before or in between. This would leave the
	// closed LN connection possible registered with the account
	// and/or the server's leafs map. So check if connection
	// is closed, and if so, manually cleanup.
	c.mu.Lock()
	closed := c.isClosed()
	if !closed {
		c.setFirstPingTimer()
	}
	c.mu.Unlock()
	if closed {
		s.removeLeafNodeConnection(c)
		// removeClient returns the remaining ref count; when we were the
		// last client on the account, decrement the active-accounts gauge.
		if prev := acc.removeClient(c); prev == 1 {
			s.decActiveAccounts()
		}
	}
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc