• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 16435002168

21 Jul 2025 04:05PM UTC coverage: 85.791% (+0.06%) from 85.736%
16435002168

push

github

web-flow
(2.12) Stream leader should propose consumer remaps once assignment processed (#7071)

Right now, in a stream scale up/scale down scenario where there are
consumers that also need to be scaled, the metaleader makes a proposal
to update the stream and then immediately makes proposals to update the
consumers. However, this does not account for the fact that the stream
proposal _could_ fail to be applied, which is where we have sometimes
seen peer set drifts.

This moves this logic to take place on the stream leader once the stream
update proposal is applied instead.

This will also be necessary in a world where the consumer assignments
are managed by the stream leader, as the metaleader will not know
anything about the stream NRG.

Signed-off-by: Neil Twigg <neil@nats.io>

71148 of 82932 relevant lines covered (85.79%)

344990.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.92
/server/leafnode.go
1
// Copyright 2019-2025 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bufio"
18
        "bytes"
19
        "crypto/tls"
20
        "encoding/base64"
21
        "encoding/json"
22
        "fmt"
23
        "io"
24
        "math/rand"
25
        "net"
26
        "net/http"
27
        "net/url"
28
        "os"
29
        "path"
30
        "reflect"
31
        "regexp"
32
        "runtime"
33
        "strconv"
34
        "strings"
35
        "sync"
36
        "sync/atomic"
37
        "time"
38

39
        "github.com/klauspost/compress/s2"
40
        "github.com/nats-io/jwt/v2"
41
        "github.com/nats-io/nkeys"
42
        "github.com/nats-io/nuid"
43
)
44

45
// Leafnode-related constants: log messages, reconnect delays, and protocol
// details shared across this file.
const (
	// Warning when user configures leafnode TLS insecure
	leafnodeTLSInsecureWarning = "TLS certificate chain and hostname of solicited leafnodes will not be verified. DO NOT USE IN PRODUCTION!"

	// When a loop is detected, delay the reconnect of solicited connection.
	leafNodeReconnectDelayAfterLoopDetected = 30 * time.Second

	// When a server receives a message causing a permission violation, the
	// connection is closed and it won't attempt to reconnect for that long.
	leafNodeReconnectAfterPermViolation = 30 * time.Second

	// When we have the same cluster name as the hub.
	leafNodeReconnectDelayAfterClusterNameSame = 30 * time.Second

	// Prefix for loop detection subject
	leafNodeLoopDetectionSubjectPrefix = "$LDS."

	// Path added to URL to indicate to WS server that the connection is a
	// LEAF connection as opposed to a CLIENT.
	leafNodeWSPath = "/leafnode"

	// This is the time the server will wait, when receiving a CONNECT,
	// before closing the connection if the required minimum version is not met.
	leafNodeWaitBeforeClose = 5 * time.Second
)
70

71
// leaf holds the leafnode-specific state attached to a client connection.
type leaf struct {
	// We have any auth stuff here for solicited connections.
	remote *leafNodeCfg
	// isSpoke tells us what role we are playing.
	// Used when we receive a connection but the other side tells us they are a hub.
	isSpoke bool
	// remoteCluster is when we are a hub but the spoke leafnode is part of a cluster.
	remoteCluster string
	// remoteServer holds onto the remote server's name or ID.
	remoteServer string
	// domain name of remote server
	remoteDomain string
	// account name of remote server
	remoteAccName string
	// Used to suppress sub and unsub interest. Same as routes but our audience
	// here is tied to this leaf node. This will hold all subscriptions except this
	// leaf node's. This represents all the interest we want to send to the other side.
	smap map[string]int32
	// This map will contain all the subscriptions that have been added to the smap
	// during initLeafNodeSmapAndSendSubs. It is short lived and is there to avoid
	// a race between processing of a sub where sub is added to the account sublist but
	// updateSmap has not been called on that "thread", while in the LN readloop,
	// when processing CONNECT, initLeafNodeSmapAndSendSubs is invoked and adds
	// this subscription to smap. When processing of the sub then calls updateSmap,
	// we would add it a second time in the smap, causing a later unsub to suppress the LS-.
	tsub  map[*subscription]struct{}
	// tsubt: timer presumably used to expire the short-lived tsub map —
	// TODO(review): confirm against initLeafNodeSmapAndSendSubs (not in view).
	tsubt *time.Timer
	// Selected compression mode, which may be different from the server configured mode.
	compression string
	// This is for GW map replies.
	gwSub *subscription
}
103

104
// Used for remote (solicited) leafnodes. Wraps the configured RemoteLeafOpts
// with runtime state that is guarded by the embedded RWMutex.
type leafNodeCfg struct {
	sync.RWMutex
	*RemoteLeafOpts
	// urls is the working list of candidate URLs: starts as a copy of the
	// configured URLs (see newLeafNodeCfg) and may grow from async INFOs.
	urls []*url.URL
	// curURL is the URL currently being used/attempted (see pickNextURL).
	curURL *url.URL
	// tlsName is saved off the configured URLs via saveTLSHostname for TLS use.
	tlsName string
	// username/password saved via saveUserPassword so we can connect to a
	// bare URL received from the INFO protocol.
	username string
	password string
	// perms holds deny permissions built from DenyExports/DenyImports.
	perms     *Permissions
	connDelay time.Duration // Delay before a connect, could be used while detecting loop condition, etc..
	// jsMigrateTimer fires checkJetStreamMigrate after the configured
	// JetStreamClusterMigrateDelay while the remote is down.
	jsMigrateTimer *time.Timer
}

118
// Check to see if this is a solicited leafnode. We do special processing for solicited.
119
func (c *client) isSolicitedLeafNode() bool {
1,985✔
120
        return c.kind == LEAF && c.leaf.remote != nil
1,985✔
121
}
1,985✔
122

123
// Returns true if this is a solicited leafnode and is not configured to be treated as a hub or a receiving
124
// connection leafnode where the otherside has declared itself to be the hub.
125
func (c *client) isSpokeLeafNode() bool {
5,804,255✔
126
        return c.kind == LEAF && c.leaf.isSpoke
5,804,255✔
127
}
5,804,255✔
128

129
func (c *client) isHubLeafNode() bool {
17,395✔
130
        return c.kind == LEAF && !c.leaf.isSpoke
17,395✔
131
}
17,395✔
132

133
// This will spin up go routines to solicit the remote leaf node connections.
// All remotes (including disabled ones) are registered in s.leafRemoteCfgs so
// that the internal configuration count matches the options' remotes, which
// configuration reload depends on; only enabled remotes get a connect goroutine.
func (s *Server) solicitLeafNodeRemotes(remotes []*RemoteLeafOpts) {
	sysAccName := _EMPTY_
	sAcc := s.SystemAccount()
	if sAcc != nil {
		sysAccName = sAcc.Name
	}
	// addRemote registers the remote under the server lock, then (outside the
	// lock) sanity-checks any credentials file, logging issues without failing.
	addRemote := func(r *RemoteLeafOpts, isSysAccRemote bool) *leafNodeCfg {
		s.mu.Lock()
		remote := newLeafNodeCfg(r)
		creds := remote.Credentials
		accName := remote.LocalAccount
		s.leafRemoteCfgs = append(s.leafRemoteCfgs, remote)
		// Print a notice if the system account remote restricts permissions.
		if isSysAccRemote {
			if len(remote.DenyExports) > 0 {
				s.Noticef("Remote for System Account uses restricted export permissions")
			}
			if len(remote.DenyImports) > 0 {
				s.Noticef("Remote for System Account uses restricted import permissions")
			}
		}
		s.mu.Unlock()
		if creds != _EMPTY_ {
			contents, err := os.ReadFile(creds)
			// Wipe the credentials from memory once validation is done.
			defer wipeSlice(contents)
			if err != nil {
				s.Errorf("Error reading LeafNode Remote Credentials file %q: %v", creds, err)
			} else if items := credsRe.FindAllSubmatch(contents, -1); len(items) < 2 {
				// Expect at least two matches: the user JWT and the nkey seed.
				s.Errorf("LeafNode Remote Credentials file %q malformed", creds)
			} else if _, err := nkeys.FromSeed(items[1][1]); err != nil {
				s.Errorf("LeafNode Remote Credentials file %q has malformed seed", creds)
			} else if uc, err := jwt.DecodeUserClaims(string(items[0][1])); err != nil {
				s.Errorf("LeafNode Remote Credentials file %q has malformed user jwt", creds)
			} else if isSysAccRemote {
				if !uc.Permissions.Pub.Empty() || !uc.Permissions.Sub.Empty() || uc.Permissions.Resp != nil {
					s.Noticef("LeafNode Remote for System Account uses credentials file %q with restricted permissions", creds)
				}
			} else {
				if !uc.Permissions.Pub.Empty() || !uc.Permissions.Sub.Empty() || uc.Permissions.Resp != nil {
					s.Noticef("LeafNode Remote for Account %s uses credentials file %q with restricted permissions", accName, creds)
				}
			}
		}
		return remote
	}
	for _, r := range remotes {
		// We need to call this, even if the leaf is disabled. This is so that
		// the number of internal configuration matches the options' remote leaf
		// configuration required for configuration reload.
		remote := addRemote(r, r.LocalAccount == sysAccName)
		if !r.Disabled {
			s.startGoRoutine(func() { s.connectToRemoteLeafNode(remote, true) })
		}
	}
}

190
func (s *Server) remoteLeafNodeStillValid(remote *leafNodeCfg) bool {
5,277✔
191
        if remote.Disabled {
5,278✔
192
                return false
1✔
193
        }
1✔
194
        for _, ri := range s.getOpts().LeafNode.Remotes {
10,917✔
195
                // FIXME(dlc) - What about auth changes?
5,641✔
196
                if reflect.DeepEqual(ri.URLs, remote.URLs) {
10,917✔
197
                        return true
5,276✔
198
                }
5,276✔
199
        }
200
        return false
×
201
}
202

203
// Ensure that leafnode is properly configured.
204
func validateLeafNode(o *Options) error {
8,318✔
205
        if err := validateLeafNodeAuthOptions(o); err != nil {
8,320✔
206
                return err
2✔
207
        }
2✔
208

209
        // Users can bind to any local account, if its empty we will assume the $G account.
210
        for _, r := range o.LeafNode.Remotes {
9,032✔
211
                if r.LocalAccount == _EMPTY_ {
1,150✔
212
                        r.LocalAccount = globalAccountName
434✔
213
                }
434✔
214
        }
215

216
        // In local config mode, check that leafnode configuration refers to accounts that exist.
217
        if len(o.TrustedOperators) == 0 {
16,319✔
218
                accNames := map[string]struct{}{}
8,003✔
219
                for _, a := range o.Accounts {
16,870✔
220
                        accNames[a.Name] = struct{}{}
8,867✔
221
                }
8,867✔
222
                // global account is always created
223
                accNames[DEFAULT_GLOBAL_ACCOUNT] = struct{}{}
8,003✔
224
                // in the context of leaf nodes, empty account means global account
8,003✔
225
                accNames[_EMPTY_] = struct{}{}
8,003✔
226
                // system account either exists or, if not disabled, will be created
8,003✔
227
                if o.SystemAccount == _EMPTY_ && !o.NoSystemAccount {
14,314✔
228
                        accNames[DEFAULT_SYSTEM_ACCOUNT] = struct{}{}
6,311✔
229
                }
6,311✔
230
                checkAccountExists := func(accName string, cfgType string) error {
16,728✔
231
                        if _, ok := accNames[accName]; !ok {
8,727✔
232
                                return fmt.Errorf("cannot find local account %q specified in leafnode %s", accName, cfgType)
2✔
233
                        }
2✔
234
                        return nil
8,723✔
235
                }
236
                if err := checkAccountExists(o.LeafNode.Account, "authorization"); err != nil {
8,004✔
237
                        return err
1✔
238
                }
1✔
239
                for _, lu := range o.LeafNode.Users {
8,012✔
240
                        if lu.Account == nil { // means global account
13✔
241
                                continue
3✔
242
                        }
243
                        if err := checkAccountExists(lu.Account.Name, "authorization"); err != nil {
7✔
244
                                return err
×
245
                        }
×
246
                }
247
                for _, r := range o.LeafNode.Remotes {
8,717✔
248
                        if err := checkAccountExists(r.LocalAccount, "remote"); err != nil {
716✔
249
                                return err
1✔
250
                        }
1✔
251
                }
252
        } else {
313✔
253
                if len(o.LeafNode.Users) != 0 {
314✔
254
                        return fmt.Errorf("operator mode does not allow specifying users in leafnode config")
1✔
255
                }
1✔
256
                for _, r := range o.LeafNode.Remotes {
313✔
257
                        if !nkeys.IsValidPublicAccountKey(r.LocalAccount) {
2✔
258
                                return fmt.Errorf(
1✔
259
                                        "operator mode requires account nkeys in remotes. " +
1✔
260
                                                "Please add an `account` key to each remote in your `leafnodes` section, to assign it to an account. " +
1✔
261
                                                "Each account value should be a 56 character public key, starting with the letter 'A'")
1✔
262
                        }
1✔
263
                }
264
                if o.LeafNode.Port != 0 && o.LeafNode.Account != "" && !nkeys.IsValidPublicAccountKey(o.LeafNode.Account) {
312✔
265
                        return fmt.Errorf("operator mode and non account nkeys are incompatible")
1✔
266
                }
1✔
267
        }
268

269
        // Validate compression settings
270
        if o.LeafNode.Compression.Mode != _EMPTY_ {
12,134✔
271
                if err := validateAndNormalizeCompressionOption(&o.LeafNode.Compression, CompressionS2Auto); err != nil {
3,828✔
272
                        return err
5✔
273
                }
5✔
274
        }
275

276
        // If a remote has a websocket scheme, all need to have it.
277
        for _, rcfg := range o.LeafNode.Remotes {
9,020✔
278
                if len(rcfg.URLs) >= 2 {
922✔
279
                        firstIsWS, ok := isWSURL(rcfg.URLs[0]), true
208✔
280
                        for i := 1; i < len(rcfg.URLs); i++ {
661✔
281
                                u := rcfg.URLs[i]
453✔
282
                                if isWS := isWSURL(u); isWS && !firstIsWS || !isWS && firstIsWS {
460✔
283
                                        ok = false
7✔
284
                                        break
7✔
285
                                }
286
                        }
287
                        if !ok {
215✔
288
                                return fmt.Errorf("remote leaf node configuration cannot have a mix of websocket and non-websocket urls: %q", redactURLList(rcfg.URLs))
7✔
289
                        }
7✔
290
                }
291
                // Validate compression settings
292
                if rcfg.Compression.Mode != _EMPTY_ {
1,414✔
293
                        if err := validateAndNormalizeCompressionOption(&rcfg.Compression, CompressionS2Auto); err != nil {
712✔
294
                                return err
5✔
295
                        }
5✔
296
                }
297
        }
298

299
        if o.LeafNode.Port == 0 {
13,355✔
300
                return nil
5,061✔
301
        }
5,061✔
302

303
        // If MinVersion is defined, check that it is valid.
304
        if mv := o.LeafNode.MinVersion; mv != _EMPTY_ {
3,237✔
305
                if err := checkLeafMinVersionConfig(mv); err != nil {
6✔
306
                        return err
2✔
307
                }
2✔
308
        }
309

310
        // The checks below will be done only when detecting that we are configured
311
        // with gateways. So if an option validation needs to be done regardless,
312
        // it MUST be done before this point!
313

314
        if o.Gateway.Name == _EMPTY_ && o.Gateway.Port == 0 {
5,778✔
315
                return nil
2,547✔
316
        }
2,547✔
317
        // If we are here we have both leaf nodes and gateways defined, make sure there
318
        // is a system account defined.
319
        if o.SystemAccount == _EMPTY_ {
685✔
320
                return fmt.Errorf("leaf nodes and gateways (both being defined) require a system account to also be configured")
1✔
321
        }
1✔
322
        if err := validatePinnedCerts(o.LeafNode.TLSPinnedCerts); err != nil {
683✔
323
                return fmt.Errorf("leafnode: %v", err)
×
324
        }
×
325
        return nil
683✔
326
}
327

328
func checkLeafMinVersionConfig(mv string) error {
8✔
329
        if ok, err := versionAtLeastCheckError(mv, 2, 8, 0); !ok || err != nil {
12✔
330
                if err != nil {
6✔
331
                        return fmt.Errorf("invalid leafnode's minimum version: %v", err)
2✔
332
                } else {
4✔
333
                        return fmt.Errorf("the minimum version should be at least 2.8.0")
2✔
334
                }
2✔
335
        }
336
        return nil
4✔
337
}
338

339
// Used to validate user names in LeafNode configuration.
340
// - rejects mix of single and multiple users.
341
// - rejects duplicate user names.
342
func validateLeafNodeAuthOptions(o *Options) error {
8,367✔
343
        if len(o.LeafNode.Users) == 0 {
16,714✔
344
                return nil
8,347✔
345
        }
8,347✔
346
        if o.LeafNode.Username != _EMPTY_ {
22✔
347
                return fmt.Errorf("can not have a single user/pass and a users array")
2✔
348
        }
2✔
349
        if o.LeafNode.Nkey != _EMPTY_ {
18✔
350
                return fmt.Errorf("can not have a single nkey and a users array")
×
351
        }
×
352
        users := map[string]struct{}{}
18✔
353
        for _, u := range o.LeafNode.Users {
42✔
354
                if _, exists := users[u.Username]; exists {
26✔
355
                        return fmt.Errorf("duplicate user %q detected in leafnode authorization", u.Username)
2✔
356
                }
2✔
357
                users[u.Username] = struct{}{}
22✔
358
        }
359
        return nil
16✔
360
}
361

362
// Update remote LeafNode TLS configurations after a config reload.
363
func (s *Server) updateRemoteLeafNodesTLSConfig(opts *Options) {
15✔
364
        max := len(opts.LeafNode.Remotes)
15✔
365
        if max == 0 {
15✔
366
                return
×
367
        }
×
368

369
        s.mu.RLock()
15✔
370
        defer s.mu.RUnlock()
15✔
371

15✔
372
        // Changes in the list of remote leaf nodes is not supported.
15✔
373
        // However, make sure that we don't go over the arrays.
15✔
374
        if len(s.leafRemoteCfgs) < max {
15✔
375
                max = len(s.leafRemoteCfgs)
×
376
        }
×
377
        for i := 0; i < max; i++ {
34✔
378
                ro := opts.LeafNode.Remotes[i]
19✔
379
                cfg := s.leafRemoteCfgs[i]
19✔
380
                if ro.TLSConfig != nil {
21✔
381
                        cfg.Lock()
2✔
382
                        cfg.TLSConfig = ro.TLSConfig.Clone()
2✔
383
                        cfg.TLSHandshakeFirst = ro.TLSHandshakeFirst
2✔
384
                        cfg.Unlock()
2✔
385
                }
2✔
386
        }
387
}
388

389
func (s *Server) reConnectToRemoteLeafNode(remote *leafNodeCfg) {
227✔
390
        delay := s.getOpts().LeafNode.ReconnectInterval
227✔
391
        select {
227✔
392
        case <-time.After(delay):
177✔
393
        case <-s.quitCh:
50✔
394
                s.grWG.Done()
50✔
395
                return
50✔
396
        }
397
        s.connectToRemoteLeafNode(remote, false)
177✔
398
}
399

400
// Creates a leafNodeCfg object that wraps the RemoteLeafOpts.
401
func newLeafNodeCfg(remote *RemoteLeafOpts) *leafNodeCfg {
683✔
402
        cfg := &leafNodeCfg{
683✔
403
                RemoteLeafOpts: remote,
683✔
404
                urls:           make([]*url.URL, 0, len(remote.URLs)),
683✔
405
        }
683✔
406
        if len(remote.DenyExports) > 0 || len(remote.DenyImports) > 0 {
691✔
407
                perms := &Permissions{}
8✔
408
                if len(remote.DenyExports) > 0 {
16✔
409
                        perms.Publish = &SubjectPermission{Deny: remote.DenyExports}
8✔
410
                }
8✔
411
                if len(remote.DenyImports) > 0 {
15✔
412
                        perms.Subscribe = &SubjectPermission{Deny: remote.DenyImports}
7✔
413
                }
7✔
414
                cfg.perms = perms
8✔
415
        }
416
        // Start with the one that is configured. We will add to this
417
        // array when receiving async leafnode INFOs.
418
        cfg.urls = append(cfg.urls, cfg.URLs...)
683✔
419
        // If allowed to randomize, do it on our copy of URLs
683✔
420
        if !remote.NoRandomize {
1,364✔
421
                rand.Shuffle(len(cfg.urls), func(i, j int) {
1,091✔
422
                        cfg.urls[i], cfg.urls[j] = cfg.urls[j], cfg.urls[i]
410✔
423
                })
410✔
424
        }
425
        // If we are TLS make sure we save off a proper servername if possible.
426
        // Do same for user/password since we may need them to connect to
427
        // a bare URL that we get from INFO protocol.
428
        for _, u := range cfg.urls {
1,806✔
429
                cfg.saveTLSHostname(u)
1,123✔
430
                cfg.saveUserPassword(u)
1,123✔
431
                // If the url(s) have the "wss://" scheme, and we don't have a TLS
1,123✔
432
                // config, mark that we should be using TLS anyway.
1,123✔
433
                if !cfg.TLS && isWSSURL(u) {
1,124✔
434
                        cfg.TLS = true
1✔
435
                }
1✔
436
        }
437
        return cfg
683✔
438
}
439

440
// Will pick an URL from the list of available URLs, rotating the list so
// that successive calls cycle through all known URLs round-robin.
func (cfg *leafNodeCfg) pickNextURL() *url.URL {
	cfg.Lock()
	defer cfg.Unlock()
	// If the current URL is the first in the list and we have more than
	// one URL, then move that one to end of the list.
	if cfg.curURL != nil && len(cfg.urls) > 1 && urlsAreEqual(cfg.curURL, cfg.urls[0]) {
		// Rotate in place: shift every entry left by one and put the
		// previously-current URL at the back.
		first := cfg.urls[0]
		copy(cfg.urls, cfg.urls[1:])
		cfg.urls[len(cfg.urls)-1] = first
	}
	cfg.curURL = cfg.urls[0]
	return cfg.curURL
}
454

455
// Returns the current URL
456
func (cfg *leafNodeCfg) getCurrentURL() *url.URL {
81✔
457
        cfg.RLock()
81✔
458
        defer cfg.RUnlock()
81✔
459
        return cfg.curURL
81✔
460
}
81✔
461

462
// Returns how long the server should wait before attempting
463
// to solicit a remote leafnode connection.
464
func (cfg *leafNodeCfg) getConnectDelay() time.Duration {
861✔
465
        cfg.RLock()
861✔
466
        delay := cfg.connDelay
861✔
467
        cfg.RUnlock()
861✔
468
        return delay
861✔
469
}
861✔
470

471
// Sets the connect delay.
472
func (cfg *leafNodeCfg) setConnectDelay(delay time.Duration) {
152✔
473
        cfg.Lock()
152✔
474
        cfg.connDelay = delay
152✔
475
        cfg.Unlock()
152✔
476
}
152✔
477

478
// Ensure that non-exported options (used in tests) have
479
// been properly set.
480
func (s *Server) setLeafNodeNonExportedOptions() {
6,785✔
481
        opts := s.getOpts()
6,785✔
482
        s.leafNodeOpts.dialTimeout = opts.LeafNode.dialTimeout
6,785✔
483
        if s.leafNodeOpts.dialTimeout == 0 {
13,569✔
484
                // Use same timeouts as routes for now.
6,784✔
485
                s.leafNodeOpts.dialTimeout = DEFAULT_ROUTE_DIAL
6,784✔
486
        }
6,784✔
487
        s.leafNodeOpts.resolver = opts.LeafNode.resolver
6,785✔
488
        if s.leafNodeOpts.resolver == nil {
13,566✔
489
                s.leafNodeOpts.resolver = net.DefaultResolver
6,781✔
490
        }
6,781✔
491
}
492

493
// Delay applied before the first connect of a remote bound to a shared
// system account when not standalone (see connectToRemoteLeafNode), to let
// the cluster gather some info first.
const sharedSysAccDelay = 250 * time.Millisecond
494

495
// connectToRemoteLeafNode attempts (and keeps retrying) to establish a
// solicited leafnode connection for the given remote. firstConnect is true
// for the initial attempt, affecting error-report verbosity and an optional
// startup delay for shared-system-account remotes. The loop exits when a
// connection is handed to createLeafNode, on server shutdown, or when the
// remote configuration is no longer valid.
func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg, firstConnect bool) {
	defer s.grWG.Done()

	if remote == nil || len(remote.URLs) == 0 {
		s.Debugf("Empty remote leafnode definition, nothing to connect")
		return
	}

	opts := s.getOpts()
	reconnectDelay := opts.LeafNode.ReconnectInterval
	s.mu.RLock()
	dialTimeout := s.leafNodeOpts.dialTimeout
	resolver := s.leafNodeOpts.resolver
	var isSysAcc bool
	if s.eventsEnabled() {
		isSysAcc = remote.LocalAccount == s.sys.account.Name
	}
	jetstreamMigrateDelay := remote.JetStreamClusterMigrateDelay
	s.mu.RUnlock()

	// If we are sharing a system account and we are not standalone delay to gather some info prior.
	if firstConnect && isSysAcc && !s.standAloneMode() {
		s.Debugf("Will delay first leafnode connect to shared system account due to clustering")
		remote.setConnectDelay(sharedSysAccDelay)
	}

	// Honor any pending connect delay, but abort immediately on shutdown.
	if connDelay := remote.getConnectDelay(); connDelay > 0 {
		select {
		case <-time.After(connDelay):
		case <-s.quitCh:
			return
		}
		remote.setConnectDelay(0)
	}

	var conn net.Conn

	const connErrFmt = "Error trying to connect as leafnode to remote server %q (attempt %v): %v"

	attempts := 0

	for s.isRunning() && s.remoteLeafNodeStillValid(remote) {
		rURL := remote.pickNextURL()
		// NOTE: this local `url` shadows the net/url package in this scope.
		url, err := s.getRandomIP(resolver, rURL.Host, nil)
		if err == nil {
			var ipStr string
			if url != rURL.Host {
				ipStr = fmt.Sprintf(" (%s)", url)
			}
			// Some test may want to disable remotes from connecting
			if s.isLeafConnectDisabled() {
				s.Debugf("Will not attempt to connect to remote server on %q%s, leafnodes currently disabled", rURL.Host, ipStr)
				err = ErrLeafNodeDisabled
			} else {
				s.Debugf("Trying to connect as leafnode to remote server on %q%s", rURL.Host, ipStr)
				conn, err = natsDialTimeout("tcp", url, dialTimeout)
			}
		}
		if err != nil {
			// Apply jitter so clustered servers do not retry in lock-step.
			jitter := time.Duration(rand.Int63n(int64(reconnectDelay)))
			delay := reconnectDelay + jitter
			attempts++
			if s.shouldReportConnectErr(firstConnect, attempts) {
				s.Errorf(connErrFmt, rURL.Host, attempts, err)
			} else {
				s.Debugf(connErrFmt, rURL.Host, attempts, err)
			}
			remote.Lock()
			// if we are using a delay to start migrating assets, kick off a migrate timer.
			if remote.jsMigrateTimer == nil && jetstreamMigrateDelay > 0 {
				remote.jsMigrateTimer = time.AfterFunc(jetstreamMigrateDelay, func() {
					s.checkJetStreamMigrate(remote)
				})
			}
			remote.Unlock()
			select {
			case <-s.quitCh:
				remote.cancelMigrateTimer()
				return
			case <-time.After(delay):
				// Check if we should migrate any JetStream assets immediately while this remote is down.
				// This will be used if JetStreamClusterMigrateDelay was not set.
				if jetstreamMigrateDelay == 0 {
					s.checkJetStreamMigrate(remote)
				}
				continue
			}
		}
		// Connected: any pending migrate timer is no longer needed.
		remote.cancelMigrateTimer()
		if !s.remoteLeafNodeStillValid(remote) {
			conn.Close()
			return
		}

		// We have a connection here to a remote server.
		// Go ahead and create our leaf node and return.
		s.createLeafNode(conn, rURL, remote, nil)

		// Clear any observer states if we had them.
		s.clearObserverState(remote)

		return
	}
}
599

600
func (cfg *leafNodeCfg) cancelMigrateTimer() {
850✔
601
        cfg.Lock()
850✔
602
        stopAndClearTimer(&cfg.jsMigrateTimer)
850✔
603
        cfg.Unlock()
850✔
604
}
850✔
605

606
// This will clear any observer state such that stream or consumer assets on this server can become leaders again.
607
func (s *Server) clearObserverState(remote *leafNodeCfg) {
760✔
608
        s.mu.RLock()
760✔
609
        accName := remote.LocalAccount
760✔
610
        s.mu.RUnlock()
760✔
611

760✔
612
        acc, err := s.LookupAccount(accName)
760✔
613
        if err != nil {
762✔
614
                s.Warnf("Error looking up account [%s] checking for JetStream clear observer state on a leafnode", accName)
2✔
615
                return
2✔
616
        }
2✔
617

618
        acc.jscmMu.Lock()
758✔
619
        defer acc.jscmMu.Unlock()
758✔
620

758✔
621
        // Walk all streams looking for any clustered stream, skip otherwise.
758✔
622
        for _, mset := range acc.streams() {
776✔
623
                node := mset.raftNode()
18✔
624
                if node == nil {
27✔
625
                        // Not R>1
9✔
626
                        continue
9✔
627
                }
628
                // Check consumers
629
                for _, o := range mset.getConsumers() {
12✔
630
                        if n := o.raftNode(); n != nil {
5✔
631
                                // Ensure we can become a leader again.
2✔
632
                                n.SetObserver(false)
2✔
633
                        }
2✔
634
                }
635
                // Ensure we can not become a leader again.
636
                node.SetObserver(false)
9✔
637
        }
638
}
639

640
// Check to see if we should migrate any assets from this account.
// When the remote is configured with JetStreamClusterMigrate, this forces any
// clustered stream/consumer raft nodes on this server to step down and marks
// them as observers so they cannot regain leadership until the leafnode
// connection is restored (see clearObserverState).
func (s *Server) checkJetStreamMigrate(remote *leafNodeCfg) {
	s.mu.RLock()
	accName, shouldMigrate := remote.LocalAccount, remote.JetStreamClusterMigrate
	s.mu.RUnlock()

	if !shouldMigrate {
		return
	}

	acc, err := s.LookupAccount(accName)
	if err != nil {
		s.Warnf("Error looking up account [%s] checking for JetStream migration on a leafnode", accName)
		return
	}

	// Serialize with clearObserverState, which toggles the same observer state.
	acc.jscmMu.Lock()
	defer acc.jscmMu.Unlock()

	// Walk all streams looking for any clustered stream, skip otherwise.
	// If we are the leader force stepdown.
	for _, mset := range acc.streams() {
		node := mset.raftNode()
		if node == nil {
			// Not R>1
			continue
		}
		// Collect any consumers
		for _, o := range mset.getConsumers() {
			if n := o.raftNode(); n != nil {
				n.StepDown()
				// Ensure we can not become a leader while in this state.
				n.SetObserver(true)
			}
		}
		// Stepdown if this stream was leader.
		node.StepDown()
		// Ensure we can not become a leader while in this state.
		node.SetObserver(true)
	}
}

682
// Helper for checking.
683
func (s *Server) isLeafConnectDisabled() bool {
4,509✔
684
        s.mu.RLock()
4,509✔
685
        defer s.mu.RUnlock()
4,509✔
686
        return s.leafDisableConnect
4,509✔
687
}
4,509✔
688

689
// Save off the tlsName for when we use TLS and mix hostnames and IPs. IPs usually
690
// come from the server we connect to.
691
//
692
// We used to save the name only if there was a TLSConfig or scheme equal to "tls".
693
// However, this was causing failures for users that did not set the scheme (and
694
// their remote connections did not have a tls{} block).
695
// We now save the host name regardless in case the remote returns an INFO indicating
696
// that TLS is required.
697
func (cfg *leafNodeCfg) saveTLSHostname(u *url.URL) {
1,737✔
698
        if cfg.tlsName == _EMPTY_ && net.ParseIP(u.Hostname()) == nil {
1,757✔
699
                cfg.tlsName = u.Hostname()
20✔
700
        }
20✔
701
}
702

703
// Save off the username/password for when we connect using a bare URL
704
// that we get from the INFO protocol.
705
func (cfg *leafNodeCfg) saveUserPassword(u *url.URL) {
1,123✔
706
        if cfg.username == _EMPTY_ && u.User != nil {
1,387✔
707
                cfg.username = u.User.Username()
264✔
708
                cfg.password, _ = u.User.Password()
264✔
709
        }
264✔
710
}
711

712
// This starts the leafnode accept loop in a go routine, unless it
// is detected that the server has already been shutdown.
// It binds the leafnode listener, builds the INFO protocol template sent to
// accepted leafnodes, and warns if InsecureSkipVerify is enabled anywhere
// in the leafnode configuration.
func (s *Server) startLeafNodeAcceptLoop() {
	// Snapshot server options.
	opts := s.getOpts()

	// Port of -1 means a random (ephemeral) port.
	port := opts.LeafNode.Port
	if port == -1 {
		port = 0
	}

	if s.isShuttingDown() {
		return
	}

	s.mu.Lock()
	hp := net.JoinHostPort(opts.LeafNode.Host, strconv.Itoa(port))
	l, e := natsListen("tcp", hp)
	// Record the listen error (nil on success) so it can be reported elsewhere.
	s.leafNodeListenerErr = e
	if e != nil {
		s.mu.Unlock()
		s.Fatalf("Error listening on leafnode port: %d - %v", opts.LeafNode.Port, e)
		return
	}

	s.Noticef("Listening for leafnode connections on %s",
		net.JoinHostPort(opts.LeafNode.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))

	tlsRequired := opts.LeafNode.TLSConfig != nil
	tlsVerify := tlsRequired && opts.LeafNode.TLSConfig.ClientAuth == tls.RequireAndVerifyClientCert
	// Do not set compression in this Info object, it would possibly cause
	// issues when sending asynchronous INFO to the remote.
	info := Info{
		ID:            s.info.ID,
		Name:          s.info.Name,
		Version:       s.info.Version,
		GitCommit:     gitCommit,
		GoVersion:     runtime.Version(),
		AuthRequired:  true,
		TLSRequired:   tlsRequired,
		TLSVerify:     tlsVerify,
		MaxPayload:    s.info.MaxPayload, // TODO(dlc) - Allow override?
		Headers:       s.supportsHeaders(),
		JetStream:     opts.JetStream,
		Domain:        opts.JetStreamDomain,
		Proto:         s.getServerProto(),
		InfoOnConnect: true,
	}
	// If we have selected a random port...
	if port == 0 {
		// Write resolved port back to options.
		opts.LeafNode.Port = l.Addr().(*net.TCPAddr).Port
	}

	s.leafNodeInfo = info
	// Possibly override Host/Port and set IP based on Cluster.Advertise
	if err := s.setLeafNodeInfoHostPortAndIP(); err != nil {
		s.Fatalf("Error setting leafnode INFO with LeafNode.Advertise value of %s, err=%v", opts.LeafNode.Advertise, err)
		l.Close()
		s.mu.Unlock()
		return
	}
	// Track our own URL and regenerate the cached INFO JSON.
	s.leafURLsMap[s.leafNodeInfo.IP]++
	s.generateLeafNodeInfoJSON()

	// Setup state that can enable shutdown
	s.leafNodeListener = l

	// As of now, a server that does not have remotes configured would
	// never solicit a connection, so we should not have to warn if
	// InsecureSkipVerify is set in main LeafNodes config (since
	// this TLS setting matters only when soliciting a connection).
	// Still, warn if insecure is set in any of LeafNode block.
	// We need to check remotes, even if tls is not required on accept.
	warn := tlsRequired && opts.LeafNode.TLSConfig.InsecureSkipVerify
	if !warn {
		for _, r := range opts.LeafNode.Remotes {
			if r.TLSConfig != nil && r.TLSConfig.InsecureSkipVerify {
				warn = true
				break
			}
		}
	}
	if warn {
		s.Warnf(leafnodeTLSInsecureWarning)
	}
	// Accept loop runs in its own goroutine; each accepted connection is
	// handed to createLeafNode with no remote (inbound side).
	go s.acceptConnections(l, "Leafnode", func(conn net.Conn) { s.createLeafNode(conn, nil, nil, nil) }, nil)
	s.mu.Unlock()
}
801

802
// RegEx to match a creds file with user JWT and Seed.
// Each match corresponds to one armored ("-----...-----") section; capture
// group 1 is the base64-ish payload between the armor lines. A valid creds
// file yields at least two matches: the user JWT first, then the nkey seed.
var credsRe = regexp.MustCompile(`\s*(?:(?:[-]{3,}.*[-]{3,}\r?\n)([\w\-.=]+)(?:\r?\n[-]{3,}.*[-]{3,}(\r?\n|\z)))`)
804

805
// clusterName is provided as argument to avoid lock ordering issues with the locked client c
806
// Lock should be held entering here.
807
func (c *client) sendLeafConnect(clusterName string, headers bool) error {
640✔
808
        // We support basic user/pass and operator based user JWT with signatures.
640✔
809
        cinfo := leafConnectInfo{
640✔
810
                Version:       VERSION,
640✔
811
                ID:            c.srv.info.ID,
640✔
812
                Domain:        c.srv.info.Domain,
640✔
813
                Name:          c.srv.info.Name,
640✔
814
                Hub:           c.leaf.remote.Hub,
640✔
815
                Cluster:       clusterName,
640✔
816
                Headers:       headers,
640✔
817
                JetStream:     c.acc.jetStreamConfigured(),
640✔
818
                DenyPub:       c.leaf.remote.DenyImports,
640✔
819
                Compression:   c.leaf.compression,
640✔
820
                RemoteAccount: c.acc.GetName(),
640✔
821
                Proto:         c.srv.getServerProto(),
640✔
822
        }
640✔
823

640✔
824
        // If a signature callback is specified, this takes precedence over anything else.
640✔
825
        if cb := c.leaf.remote.SignatureCB; cb != nil {
643✔
826
                nonce := c.nonce
3✔
827
                c.mu.Unlock()
3✔
828
                jwt, sigraw, err := cb(nonce)
3✔
829
                c.mu.Lock()
3✔
830
                if err == nil && c.isClosed() {
4✔
831
                        err = ErrConnectionClosed
1✔
832
                }
1✔
833
                if err != nil {
5✔
834
                        c.Errorf("Error signing the nonce: %v", err)
2✔
835
                        return err
2✔
836
                }
2✔
837
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
1✔
838
                cinfo.JWT, cinfo.Sig = jwt, sig
1✔
839

840
        } else if creds := c.leaf.remote.Credentials; creds != _EMPTY_ {
688✔
841
                // Check for credentials first, that will take precedence..
51✔
842
                c.Debugf("Authenticating with credentials file %q", c.leaf.remote.Credentials)
51✔
843
                contents, err := os.ReadFile(creds)
51✔
844
                if err != nil {
51✔
845
                        c.Errorf("%v", err)
×
846
                        return err
×
847
                }
×
848
                defer wipeSlice(contents)
51✔
849
                items := credsRe.FindAllSubmatch(contents, -1)
51✔
850
                if len(items) < 2 {
51✔
851
                        c.Errorf("Credentials file malformed")
×
852
                        return err
×
853
                }
×
854
                // First result should be the user JWT.
855
                // We copy here so that the file containing the seed will be wiped appropriately.
856
                raw := items[0][1]
51✔
857
                tmp := make([]byte, len(raw))
51✔
858
                copy(tmp, raw)
51✔
859
                // Seed is second item.
51✔
860
                kp, err := nkeys.FromSeed(items[1][1])
51✔
861
                if err != nil {
51✔
862
                        c.Errorf("Credentials file has malformed seed")
×
863
                        return err
×
864
                }
×
865
                // Wipe our key on exit.
866
                defer kp.Wipe()
51✔
867

51✔
868
                sigraw, _ := kp.Sign(c.nonce)
51✔
869
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
51✔
870
                cinfo.JWT = bytesToString(tmp)
51✔
871
                cinfo.Sig = sig
51✔
872
        } else if nkey := c.leaf.remote.Nkey; nkey != _EMPTY_ {
588✔
873
                kp, err := nkeys.FromSeed([]byte(nkey))
2✔
874
                if err != nil {
2✔
875
                        c.Errorf("Remote nkey has malformed seed")
×
876
                        return err
×
877
                }
×
878
                // Wipe our key on exit.
879
                defer kp.Wipe()
2✔
880
                sigraw, _ := kp.Sign(c.nonce)
2✔
881
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
2✔
882
                pkey, _ := kp.PublicKey()
2✔
883
                cinfo.Nkey = pkey
2✔
884
                cinfo.Sig = sig
2✔
885
        }
886
        // In addition, and this is to allow auth callout, set user/password or
887
        // token if applicable.
888
        if userInfo := c.leaf.remote.curURL.User; userInfo != nil {
922✔
889
                // For backward compatibility, if only username is provided, set both
284✔
890
                // Token and User, not just Token.
284✔
891
                cinfo.User = userInfo.Username()
284✔
892
                var ok bool
284✔
893
                cinfo.Pass, ok = userInfo.Password()
284✔
894
                if !ok {
290✔
895
                        cinfo.Token = cinfo.User
6✔
896
                }
6✔
897
        } else if c.leaf.remote.username != _EMPTY_ {
359✔
898
                cinfo.User = c.leaf.remote.username
5✔
899
                cinfo.Pass = c.leaf.remote.password
5✔
900
        }
5✔
901
        b, err := json.Marshal(cinfo)
638✔
902
        if err != nil {
638✔
903
                c.Errorf("Error marshaling CONNECT to remote leafnode: %v\n", err)
×
904
                return err
×
905
        }
×
906
        // Although this call is made before the writeLoop is created,
907
        // we don't really need to send in place. The protocol will be
908
        // sent out by the writeLoop.
909
        c.enqueueProto([]byte(fmt.Sprintf(ConProto, b)))
638✔
910
        return nil
638✔
911
}
912

913
// Makes a deep copy of the LeafNode Info structure.
914
// The server lock is held on entry.
915
func (s *Server) copyLeafNodeInfo() *Info {
2,559✔
916
        clone := s.leafNodeInfo
2,559✔
917
        // Copy the array of urls.
2,559✔
918
        if len(s.leafNodeInfo.LeafNodeURLs) > 0 {
4,644✔
919
                clone.LeafNodeURLs = append([]string(nil), s.leafNodeInfo.LeafNodeURLs...)
2,085✔
920
        }
2,085✔
921
        return &clone
2,559✔
922
}
923

924
// Adds a LeafNode URL that we get when a route connects to the Info structure.
925
// Regenerates the JSON byte array so that it can be sent to LeafNode connections.
926
// Returns a boolean indicating if the URL was added or not.
927
// Server lock is held on entry
928
func (s *Server) addLeafNodeURL(urlStr string) bool {
6,373✔
929
        if s.leafURLsMap.addUrl(urlStr) {
12,741✔
930
                s.generateLeafNodeInfoJSON()
6,368✔
931
                return true
6,368✔
932
        }
6,368✔
933
        return false
5✔
934
}
935

936
// Removes a LeafNode URL of the route that is disconnecting from the Info structure.
937
// Regenerates the JSON byte array so that it can be sent to LeafNode connections.
938
// Returns a boolean indicating if the URL was removed or not.
939
// Server lock is held on entry.
940
func (s *Server) removeLeafNodeURL(urlStr string) bool {
6,373✔
941
        // Don't need to do this if we are removing the route connection because
6,373✔
942
        // we are shuting down...
6,373✔
943
        if s.isShuttingDown() {
9,728✔
944
                return false
3,355✔
945
        }
3,355✔
946
        if s.leafURLsMap.removeUrl(urlStr) {
6,032✔
947
                s.generateLeafNodeInfoJSON()
3,014✔
948
                return true
3,014✔
949
        }
3,014✔
950
        return false
4✔
951
}
952

953
// Server lock is held on entry.
// Refreshes the dynamic fields of the leafnode INFO (cluster name, leafnode
// and websocket connect URLs) and regenerates the cached JSON protocol bytes.
// The JSON must be generated last so it reflects all field updates above it.
func (s *Server) generateLeafNodeInfoJSON() {
	s.leafNodeInfo.Cluster = s.cachedClusterName()
	s.leafNodeInfo.LeafNodeURLs = s.leafURLsMap.getAsStringSlice()
	s.leafNodeInfo.WSConnectURLs = s.websocket.connectURLsMap.getAsStringSlice()
	s.leafNodeInfoJSON = generateInfoJSON(&s.leafNodeInfo)
}
960

961
// Sends an async INFO protocol so that the connected servers can update
962
// their list of LeafNode urls.
963
func (s *Server) sendAsyncLeafNodeInfo() {
9,382✔
964
        for _, c := range s.leafs {
9,480✔
965
                c.mu.Lock()
98✔
966
                c.enqueueProto(s.leafNodeInfoJSON)
98✔
967
                c.mu.Unlock()
98✔
968
        }
98✔
969
}
970

971
// Called when an inbound leafnode connection is accepted or we create one for a solicited leafnode.
// For accepted connections, rURL and remote are nil (ws is non-nil if accepted over the
// websocket port); for solicited connections, remote holds the configuration and rURL the
// URL being dialed. Returns the created client, or nil if the connection could not be
// established or registered.
func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCfg, ws *websocket) *client {
	// Snapshot server options.
	opts := s.getOpts()

	maxPay := int32(opts.MaxPayload)
	maxSubs := int32(opts.MaxSubs)
	// For system, maxSubs of 0 means unlimited, so re-adjust here.
	if maxSubs == 0 {
		maxSubs = -1
	}
	now := time.Now().UTC()

	c := &client{srv: s, nc: conn, kind: LEAF, opts: defaultOpts, mpay: maxPay, msubs: maxSubs, start: now, last: now}
	// Do not update the smap here, we need to do it in initLeafNodeSmapAndSendSubs
	c.leaf = &leaf{}

	// For accepted LN connections, ws will be != nil if it was accepted
	// through the Websocket port.
	c.ws = ws

	// For remote, check if the scheme starts with "ws", if so, we will initiate
	// a remote Leaf Node connection as a websocket connection.
	if remote != nil && rURL != nil && isWSURL(rURL) {
		remote.RLock()
		c.ws = &websocket{compress: remote.Websocket.Compression, maskwrite: !remote.Websocket.NoMasking}
		remote.RUnlock()
	}

	// Determines if we are soliciting the connection or not.
	var solicited bool
	var acc *Account
	var remoteSuffix string
	if remote != nil {
		// For now, if lookup fails, we will constantly try
		// to recreate this LN connection.
		lacc := remote.LocalAccount
		var err error
		acc, err = s.LookupAccount(lacc)
		if err != nil {
			// An account not existing is something that can happen with nats/http account resolver and the account
			// has not yet been pushed, or the request failed for other reasons.
			// remote needs to be set or retry won't happen
			c.leaf.remote = remote
			c.closeConnection(MissingAccount)
			s.Errorf("Unable to lookup account %s for solicited leafnode connection: %v", lacc, err)
			return nil
		}
		remoteSuffix = fmt.Sprintf(" for account: %s", acc.traceLabel())
	}

	c.mu.Lock()
	c.initClient()
	c.Noticef("Leafnode connection created%s %s", remoteSuffix, c.opts.Name)

	var (
		tlsFirst         bool
		tlsFirstFallback time.Duration
		infoTimeout      time.Duration
	)
	if remote != nil {
		// Solicited side: copy the relevant settings out of the remote config
		// under its lock.
		solicited = true
		remote.Lock()
		c.leaf.remote = remote
		c.setPermissions(remote.perms)
		if !c.leaf.remote.Hub {
			c.leaf.isSpoke = true
		}
		tlsFirst = remote.TLSHandshakeFirst
		infoTimeout = remote.FirstInfoTimeout
		remote.Unlock()
		c.acc = acc
	} else {
		// Accepted side: we expect a CONNECT from the remote.
		c.flags.set(expectConnect)
		if ws != nil {
			c.Debugf("Leafnode compression=%v", c.ws.compress)
		}
		tlsFirst = opts.LeafNode.TLSHandshakeFirst
		if f := opts.LeafNode.TLSHandshakeFirstFallback; f > 0 {
			tlsFirstFallback = f
		}
	}
	c.mu.Unlock()

	var nonce [nonceLen]byte
	var info *Info

	// Grab this before the client lock below.
	if !solicited {
		// Grab server variables
		s.mu.Lock()
		info = s.copyLeafNodeInfo()
		// For tests that want to simulate old servers, do not set the compression
		// on the INFO protocol if configured with CompressionNotSupported.
		if cm := opts.LeafNode.Compression.Mode; cm != CompressionNotSupported {
			info.Compression = cm
		}
		s.generateNonce(nonce[:])
		s.mu.Unlock()
	}

	// Grab lock
	c.mu.Lock()

	var preBuf []byte
	if solicited {
		// For websocket connection, we need to send an HTTP request,
		// and get the response before starting the readLoop to get
		// the INFO, etc..
		if c.isWebsocket() {
			var err error
			var closeReason ClosedState

			preBuf, closeReason, err = c.leafNodeSolicitWSConnection(opts, rURL, remote)
			if err != nil {
				c.Errorf("Error soliciting websocket connection: %v", err)
				c.mu.Unlock()
				if closeReason != 0 {
					c.closeConnection(closeReason)
				}
				return nil
			}
		} else {
			// If configured to do TLS handshake first
			if tlsFirst {
				if _, err := c.leafClientHandshakeIfNeeded(remote, opts); err != nil {
					c.mu.Unlock()
					return nil
				}
			}
			// We need to wait for the info, but not for too long.
			c.nc.SetReadDeadline(time.Now().Add(infoTimeout))
		}

		// We will process the INFO from the readloop and finish by
		// sending the CONNECT and finish registration later.
	} else {
		// Send our info to the other side.
		// Remember the nonce we sent here for signatures, etc.
		c.nonce = make([]byte, nonceLen)
		copy(c.nonce, nonce[:])
		info.Nonce = bytesToString(c.nonce)
		info.CID = c.cid
		proto := generateInfoJSON(info)

		var pre []byte
		// We need first to check for "TLS First" fallback delay.
		if tlsFirstFallback > 0 {
			// We wait and see if we are getting any data. Since we did not send
			// the INFO protocol yet, only clients that use TLS first should be
			// sending data (the TLS handshake). We don't really check the content:
			// if it is a rogue agent and not an actual client performing the
			// TLS handshake, the error will be detected when performing the
			// handshake on our side.
			pre = make([]byte, 4)
			c.nc.SetReadDeadline(time.Now().Add(tlsFirstFallback))
			n, _ := io.ReadFull(c.nc, pre[:])
			c.nc.SetReadDeadline(time.Time{})
			// If we get any data (regardless of possible timeout), we will proceed
			// with the TLS handshake.
			if n > 0 {
				pre = pre[:n]
			} else {
				// We did not get anything so we will send the INFO protocol.
				pre = nil
				// Set the boolean to false for the rest of the function.
				tlsFirst = false
			}
		}

		if !tlsFirst {
			// We have to send from this go routine because we may
			// have to block for TLS handshake before we start our
			// writeLoop go routine. The other side needs to receive
			// this before it can initiate the TLS handshake..
			c.sendProtoNow(proto)

			// The above call could have marked the connection as closed (due to TCP error).
			if c.isClosed() {
				c.mu.Unlock()
				c.closeConnection(WriteError)
				return nil
			}
		}

		// Check to see if we need to spin up TLS.
		if !c.isWebsocket() && info.TLSRequired {
			// If we have a prebuffer create a multi-reader.
			if len(pre) > 0 {
				c.nc = &tlsMixConn{c.nc, bytes.NewBuffer(pre)}
			}
			// Perform server-side TLS handshake.
			if err := c.doTLSServerHandshake(tlsHandshakeLeaf, opts.LeafNode.TLSConfig, opts.LeafNode.TLSTimeout, opts.LeafNode.TLSPinnedCerts); err != nil {
				c.mu.Unlock()
				return nil
			}
		}

		// If the user wants the TLS handshake to occur first, now that it is
		// done, send the INFO protocol.
		if tlsFirst {
			c.flags.set(didTLSFirst)
			c.sendProtoNow(proto)
			if c.isClosed() {
				c.mu.Unlock()
				c.closeConnection(WriteError)
				return nil
			}
		}

		// Leaf nodes will always require a CONNECT to let us know
		// when we are properly bound to an account.
		//
		// If compression is configured, we can't set the authTimer here because
		// it would cause the parser to fail any incoming protocol that is not a
		// CONNECT (and we need to exchange INFO protocols for compression
		// negotiation). So instead, use the ping timer until we are done with
		// negotiation and can set the auth timer.
		timeout := secondsToDuration(opts.LeafNode.AuthTimeout)
		if needsCompression(opts.LeafNode.Compression.Mode) {
			c.ping.tmr = time.AfterFunc(timeout, func() {
				c.authTimeout()
			})
		} else {
			c.setAuthTimer(timeout)
		}
	}

	// Keep track in case server is shutdown before we can successfully register.
	if !s.addToTempClients(c.cid, c) {
		c.mu.Unlock()
		c.setNoReconnect()
		c.closeConnection(ServerShutdown)
		return nil
	}

	// Spin up the read loop.
	s.startGoRoutine(func() { c.readLoop(preBuf) })

	// We will spin the write loop for solicited connections only
	// when processing the INFO and after switching to TLS if needed.
	if !solicited {
		s.startGoRoutine(func() { c.writeLoop() })
	}

	c.mu.Unlock()

	return c
}
1220

1221
// Will perform the client-side TLS handshake if needed. Assumes that this
// is called by the solicit side (remote will be non nil). Returns `true`
// if TLS is required, `false` otherwise.
// Lock held on entry.
func (c *client) leafClientHandshakeIfNeeded(remote *leafNodeCfg, opts *Options) (bool, error) {
	// Check if TLS is required and gather TLS config variables.
	tlsRequired, tlsConfig, tlsName, tlsTimeout := c.leafNodeGetTLSConfigForSolicit(remote)
	if !tlsRequired {
		return false, nil
	}

	// If TLS required, perform handshake.
	// Get the URL that was used to connect to the remote server.
	rURL := remote.getCurrentURL()

	// Perform the client-side TLS handshake.
	if resetTLSName, err := c.doTLSClientHandshake(tlsHandshakeLeaf, rURL, tlsConfig, tlsName, tlsTimeout, opts.LeafNode.TLSPinnedCerts); err != nil {
		// Check if we need to reset the remote's TLS name.
		// (doTLSClientHandshake signals this via resetTLSName so the next
		// connect attempt does not reuse a stale saved host name.)
		if resetTLSName {
			remote.Lock()
			remote.tlsName = _EMPTY_
			remote.Unlock()
		}
		return false, err
	}
	return true, nil
}
1248

1249
// processLeafnodeInfo processes an INFO protocol received on a leafnode
// connection. For the first INFO on a solicited connection this may drive
// the client-side TLS handshake, compression negotiation and the resumption
// of the connect sequence; subsequent (async) INFOs are used to update the
// remote URL list, permissions and remote account information.
func (c *client) processLeafnodeInfo(info *Info) {
	c.mu.Lock()
	// Bail out if the leaf state is gone or the connection already closed.
	if c.leaf == nil || c.isClosed() {
		c.mu.Unlock()
		return
	}
	// Snapshot fields under the client lock for use below.
	s := c.srv
	opts := s.getOpts()
	remote := c.leaf.remote
	didSolicit := remote != nil
	firstINFO := !c.flags.isSet(infoReceived)

	// In case of websocket, the TLS handshake has been already done.
	// So check only for non websocket connections and for configurations
	// where the TLS Handshake was not done first.
	if didSolicit && !c.flags.isSet(handshakeComplete) && !c.isWebsocket() && !remote.TLSHandshakeFirst {
		// If the server requires TLS, we need to set this in the remote
		// otherwise if there is no TLS configuration block for the remote,
		// the solicit side will not attempt to perform the TLS handshake.
		if firstINFO && info.TLSRequired {
			remote.TLS = true
		}
		if _, err := c.leafClientHandshakeIfNeeded(remote, opts); err != nil {
			// Handshake failed; nothing more to do with this INFO.
			c.mu.Unlock()
			return
		}
	}

	// Check for compression, unless already done.
	if firstINFO && !c.flags.isSet(compressionNegotiated) {
		// Prevent from getting back here.
		c.flags.set(compressionNegotiated)

		// The applicable compression options come from our own config on the
		// accepting side, or from the remote config on the soliciting side.
		var co *CompressionOpts
		if !didSolicit {
			co = &opts.LeafNode.Compression
		} else {
			co = &remote.Compression
		}
		if needsCompression(co.Mode) {
			// Release client lock since following function will need server lock.
			c.mu.Unlock()
			compress, err := s.negotiateLeafCompression(c, didSolicit, info.Compression, co)
			if err != nil {
				c.sendErrAndErr(err.Error())
				c.closeConnection(ProtocolViolation)
				return
			}
			if compress {
				// Done for now, will get back another INFO protocol...
				return
			}
			// No compression because one side does not want/can't, so proceed.
			c.mu.Lock()
			// Check that the connection did not close if the lock was released.
			if c.isClosed() {
				c.mu.Unlock()
				return
			}
		} else {
			// Coming from an old server, the Compression field would be the empty
			// string. For servers that are configured with CompressionNotSupported,
			// this makes them behave as old servers.
			if info.Compression == _EMPTY_ || co.Mode == CompressionNotSupported {
				c.leaf.compression = CompressionNotSupported
			} else {
				c.leaf.compression = CompressionOff
			}
		}
		// Accepting side does not normally process an INFO protocol during
		// initial connection handshake. So we keep it consistent by returning
		// if we are not soliciting.
		if !didSolicit {
			// If we had created the ping timer instead of the auth timer, we will
			// clear the ping timer and set the auth timer now that the compression
			// negotiation is done.
			if info.Compression != _EMPTY_ && c.ping.tmr != nil {
				clearTimer(&c.ping.tmr)
				c.setAuthTimer(secondsToDuration(opts.LeafNode.AuthTimeout))
			}
			c.mu.Unlock()
			return
		}
		// Fall through and process the INFO protocol as usual.
	}

	// Note: For now, only the initial INFO has a nonce. We
	// will probably do auto key rotation at some point.
	if firstINFO {
		// Mark that the INFO protocol has been received.
		c.flags.set(infoReceived)
		// Prevent connecting to non leafnode port. Need to do this only for
		// the first INFO, not for async INFO updates...
		//
		// Content of INFO sent by the server when accepting a tcp connection.
		// -------------------------------------------------------------------
		// Listen Port Of | CID | ClientConnectURLs | LeafNodeURLs | Gateway |
		// -------------------------------------------------------------------
		//      CLIENT    |  X* |        X**        |              |         |
		//      ROUTE     |     |        X**        |      X***    |         |
		//     GATEWAY    |     |                   |              |    X    |
		//     LEAFNODE   |  X  |                   |       X      |         |
		// -------------------------------------------------------------------
		// *   Not on older servers.
		// **  Not if "no advertise" is enabled.
		// *** Not if leafnode's "no advertise" is enabled.
		//
		// As seen from above, a solicited LeafNode connection should receive
		// from the remote server an INFO with CID and LeafNodeURLs. Anything
		// else should be considered an attempt to connect to a wrong port.
		if didSolicit && (info.CID == 0 || info.LeafNodeURLs == nil) {
			c.mu.Unlock()
			c.Errorf(ErrConnectedToWrongPort.Error())
			c.closeConnection(WrongPort)
			return
		}
		// Reject a cluster that contains spaces.
		if info.Cluster != _EMPTY_ && strings.Contains(info.Cluster, " ") {
			c.mu.Unlock()
			c.sendErrAndErr(ErrClusterNameHasSpaces.Error())
			c.closeConnection(ProtocolViolation)
			return
		}
		// Capture a nonce here.
		c.nonce = []byte(info.Nonce)
		if info.TLSRequired && didSolicit {
			remote.TLS = true
		}
		// Headers are only enabled when both sides support them.
		supportsHeaders := c.srv.supportsHeaders()
		c.headers = supportsHeaders && info.Headers

		// Remember the remote server.
		// Pre 2.2.0 servers are not sending their server name.
		// In that case, use info.ID, which, for those servers, matches
		// the content of the field `Name` in the leafnode CONNECT protocol.
		if info.Name == _EMPTY_ {
			c.leaf.remoteServer = info.ID
		} else {
			c.leaf.remoteServer = info.Name
		}
		c.leaf.remoteDomain = info.Domain
		c.leaf.remoteCluster = info.Cluster
		// We send the protocol version in the INFO protocol.
		// Keep track of it, so we know if this connection supports message
		// tracing for instance.
		c.opts.Protocol = info.Proto
	}

	// For both initial INFO and async INFO protocols, Possibly
	// update our list of remote leafnode URLs we can connect to.
	if didSolicit && (len(info.LeafNodeURLs) > 0 || len(info.WSConnectURLs) > 0) {
		// Consider the incoming array as the most up-to-date
		// representation of the remote cluster's list of URLs.
		c.updateLeafNodeURLs(info)
	}

	// Check to see if we have permissions updates here.
	if info.Import != nil || info.Export != nil {
		perms := &Permissions{
			Publish:   info.Export,
			Subscribe: info.Import,
		}
		// Check if we have local deny clauses that we need to merge.
		if remote := c.leaf.remote; remote != nil {
			if len(remote.DenyExports) > 0 {
				if perms.Publish == nil {
					perms.Publish = &SubjectPermission{}
				}
				perms.Publish.Deny = append(perms.Publish.Deny, remote.DenyExports...)
			}
			if len(remote.DenyImports) > 0 {
				if perms.Subscribe == nil {
					perms.Subscribe = &SubjectPermission{}
				}
				perms.Subscribe.Deny = append(perms.Subscribe.Deny, remote.DenyImports...)
			}
		}
		c.setPermissions(perms)
	}

	var resumeConnect, checkSyncConsumers bool

	// If this is a remote connection and this is the first INFO protocol,
	// then we need to finish the connect process by sending CONNECT, etc..
	if firstINFO && didSolicit {
		// Clear deadline that was set in createLeafNode while waiting for the INFO.
		c.nc.SetDeadline(time.Time{})
		resumeConnect = true
	} else if !firstINFO && didSolicit {
		c.leaf.remoteAccName = info.RemoteAccount
		checkSyncConsumers = info.JetStream
	}

	// Check if we have the remote account information and if so make sure it's stored.
	if info.RemoteAccount != _EMPTY_ {
		s.leafRemoteAccounts.Store(c.acc.Name, info.RemoteAccount)
	}
	c.mu.Unlock()

	// The remainder runs without the client lock held.
	finishConnect := info.ConnectInfo
	if resumeConnect && s != nil {
		s.leafNodeResumeConnectProcess(c)
		if !info.InfoOnConnect {
			finishConnect = true
		}
	}
	if finishConnect {
		s.leafNodeFinishConnectProcess(c)
	}

	// If we have JS enabled and so does the other side, we will
	// check to see if we need to kick any internal source or mirror consumers.
	if checkSyncConsumers {
		s.checkInternalSyncConsumers(c.acc, info.Domain)
	}
}
1465

1466
// negotiateLeafCompression selects the compression mode to use between this
// server's configured options and the mode received in the INFO protocol.
// If compression ends up being used, an INFO with the chosen mode is sent to
// the remote and the connection is switched to compressed reader/writer, in
// which case `true` is returned. Assumes the client lock is NOT held on entry.
func (s *Server) negotiateLeafCompression(c *client, didSolicit bool, infoCompression string, co *CompressionOpts) (bool, error) {
	// Negotiate the appropriate compression mode (or no compression)
	cm, err := selectCompressionMode(co.Mode, infoCompression)
	if err != nil {
		return false, err
	}
	c.mu.Lock()
	// For "auto" mode, set the initial compression mode based on RTT
	if cm == CompressionS2Auto {
		if c.rttStart.IsZero() {
			c.rtt = computeRTT(c.start)
		}
		cm = selectS2AutoModeBasedOnRTT(c.rtt, co.RTTThresholds)
	}
	// Keep track of the negotiated compression mode.
	c.leaf.compression = cm
	// Capture values needed for the INFO while holding the lock. The nonce
	// is only included when we are the accepting side.
	cid := c.cid
	var nonce string
	if !didSolicit {
		nonce = bytesToString(c.nonce)
	}
	c.mu.Unlock()

	// Negotiation resulted in no compression: nothing more to do.
	if !needsCompression(cm) {
		return false, nil
	}

	// If we end-up doing compression...

	// Generate an INFO with the chosen compression mode.
	s.mu.Lock()
	info := s.copyLeafNodeInfo()
	info.Compression, info.CID, info.Nonce = compressionModeForInfoProtocol(co, cm), cid, nonce
	infoProto := generateInfoJSON(info)
	s.mu.Unlock()

	// If we solicited, then send this INFO protocol BEFORE switching
	// to compression writer. However, if we did not, we send it after.
	c.mu.Lock()
	if didSolicit {
		c.enqueueProto(infoProto)
		// Make sure it is completely flushed (the pending bytes goes to
		// 0) before proceeding.
		for c.out.pb > 0 && !c.isClosed() {
			c.flushOutbound()
		}
	}
	// This is to notify the readLoop that it should switch to a
	// (de)compression reader.
	c.in.flags.set(switchToCompression)
	// Create the compress writer before queueing the INFO protocol for
	// a route that did not solicit. It will make sure that that proto
	// is sent with compression on.
	c.out.cw = s2.NewWriter(nil, s2WriterOptions(cm)...)
	if !didSolicit {
		c.enqueueProto(infoProto)
	}
	c.mu.Unlock()
	return true, nil
}
1526

1527
// When getting a leaf node INFO protocol, use the provided
1528
// array of urls to update the list of possible endpoints.
1529
func (c *client) updateLeafNodeURLs(info *Info) {
1,269✔
1530
        cfg := c.leaf.remote
1,269✔
1531
        cfg.Lock()
1,269✔
1532
        defer cfg.Unlock()
1,269✔
1533

1,269✔
1534
        // We have ensured that if a remote has a WS scheme, then all are.
1,269✔
1535
        // So check if first is WS, then add WS URLs, otherwise, add non WS ones.
1,269✔
1536
        if len(cfg.URLs) > 0 && isWSURL(cfg.URLs[0]) {
1,323✔
1537
                // It does not really matter if we use "ws://" or "wss://" here since
54✔
1538
                // we will have already marked that the remote should use TLS anyway.
54✔
1539
                // But use proper scheme for log statements, etc...
54✔
1540
                proto := wsSchemePrefix
54✔
1541
                if cfg.TLS {
54✔
1542
                        proto = wsSchemePrefixTLS
×
1543
                }
×
1544
                c.doUpdateLNURLs(cfg, proto, info.WSConnectURLs)
54✔
1545
                return
54✔
1546
        }
1547
        c.doUpdateLNURLs(cfg, "nats-leaf", info.LeafNodeURLs)
1,215✔
1548
}
1549

1550
func (c *client) doUpdateLNURLs(cfg *leafNodeCfg, scheme string, URLs []string) {
1,269✔
1551
        cfg.urls = make([]*url.URL, 0, 1+len(URLs))
1,269✔
1552
        // Add the ones we receive in the protocol
1,269✔
1553
        for _, surl := range URLs {
3,571✔
1554
                url, err := url.Parse(fmt.Sprintf("%s://%s", scheme, surl))
2,302✔
1555
                if err != nil {
2,302✔
1556
                        // As per below, the URLs we receive should not have contained URL info, so this should be safe to log.
×
1557
                        c.Errorf("Error parsing url %q: %v", surl, err)
×
1558
                        continue
×
1559
                }
1560
                // Do not add if it's the same as what we already have configured.
1561
                var dup bool
2,302✔
1562
                for _, u := range cfg.URLs {
5,862✔
1563
                        // URLs that we receive never have user info, but the
3,560✔
1564
                        // ones that were configured may have. Simply compare
3,560✔
1565
                        // host and port to decide if they are equal or not.
3,560✔
1566
                        if url.Host == u.Host && url.Port() == u.Port() {
5,248✔
1567
                                dup = true
1,688✔
1568
                                break
1,688✔
1569
                        }
1570
                }
1571
                if !dup {
2,916✔
1572
                        cfg.urls = append(cfg.urls, url)
614✔
1573
                        cfg.saveTLSHostname(url)
614✔
1574
                }
614✔
1575
        }
1576
        // Add the configured one
1577
        cfg.urls = append(cfg.urls, cfg.URLs...)
1,269✔
1578
}
1579

1580
// Similar to setInfoHostPortAndGenerateJSON, but for leafNodeInfo.
1581
func (s *Server) setLeafNodeInfoHostPortAndIP() error {
3,199✔
1582
        opts := s.getOpts()
3,199✔
1583
        if opts.LeafNode.Advertise != _EMPTY_ {
3,210✔
1584
                advHost, advPort, err := parseHostPort(opts.LeafNode.Advertise, opts.LeafNode.Port)
11✔
1585
                if err != nil {
11✔
1586
                        return err
×
1587
                }
×
1588
                s.leafNodeInfo.Host = advHost
11✔
1589
                s.leafNodeInfo.Port = advPort
11✔
1590
        } else {
3,188✔
1591
                s.leafNodeInfo.Host = opts.LeafNode.Host
3,188✔
1592
                s.leafNodeInfo.Port = opts.LeafNode.Port
3,188✔
1593
                // If the host is "0.0.0.0" or "::" we need to resolve to a public IP.
3,188✔
1594
                // This will return at most 1 IP.
3,188✔
1595
                hostIsIPAny, ips, err := s.getNonLocalIPsIfHostIsIPAny(s.leafNodeInfo.Host, false)
3,188✔
1596
                if err != nil {
3,188✔
1597
                        return err
×
1598
                }
×
1599
                if hostIsIPAny {
3,475✔
1600
                        if len(ips) == 0 {
287✔
1601
                                s.Errorf("Could not find any non-local IP for leafnode's listen specification %q",
×
1602
                                        s.leafNodeInfo.Host)
×
1603
                        } else {
287✔
1604
                                // Take the first from the list...
287✔
1605
                                s.leafNodeInfo.Host = ips[0]
287✔
1606
                        }
287✔
1607
                }
1608
        }
1609
        // Use just host:port for the IP
1610
        s.leafNodeInfo.IP = net.JoinHostPort(s.leafNodeInfo.Host, strconv.Itoa(s.leafNodeInfo.Port))
3,199✔
1611
        if opts.LeafNode.Advertise != _EMPTY_ {
3,210✔
1612
                s.Noticef("Advertise address for leafnode is set to %s", s.leafNodeInfo.IP)
11✔
1613
        }
11✔
1614
        return nil
3,199✔
1615
}
1616

1617
// Add the connection to the map of leaf nodes.
1618
// If `checkForDup` is true (invoked when a leafnode is accepted), then we check
1619
// if a connection already exists for the same server name and account.
1620
// That can happen when the remote is attempting to reconnect while the accepting
1621
// side did not detect the connection as broken yet.
1622
// But it can also happen when there is a misconfiguration and the remote is
1623
// creating two (or more) connections that bind to the same account on the accept
1624
// side.
1625
// When a duplicate is found, the new connection is accepted and the old is closed
1626
// (this solves the stale connection situation). An error is returned to help the
1627
// remote detect the misconfiguration when the duplicate is the result of that
1628
// misconfiguration.
1629
func (s *Server) addLeafNodeConnection(c *client, srvName, clusterName string, checkForDup bool) {
1,266✔
1630
        var accName string
1,266✔
1631
        c.mu.Lock()
1,266✔
1632
        cid := c.cid
1,266✔
1633
        acc := c.acc
1,266✔
1634
        if acc != nil {
2,532✔
1635
                accName = acc.Name
1,266✔
1636
        }
1,266✔
1637
        myRemoteDomain := c.leaf.remoteDomain
1,266✔
1638
        mySrvName := c.leaf.remoteServer
1,266✔
1639
        remoteAccName := c.leaf.remoteAccName
1,266✔
1640
        myClustName := c.leaf.remoteCluster
1,266✔
1641
        solicited := c.leaf.remote != nil
1,266✔
1642
        c.mu.Unlock()
1,266✔
1643

1,266✔
1644
        var old *client
1,266✔
1645
        s.mu.Lock()
1,266✔
1646
        // We check for empty because in some test we may send empty CONNECT{}
1,266✔
1647
        if checkForDup && srvName != _EMPTY_ {
1,884✔
1648
                for _, ol := range s.leafs {
998✔
1649
                        ol.mu.Lock()
380✔
1650
                        // We care here only about non solicited Leafnode. This function
380✔
1651
                        // is more about replacing stale connections than detecting loops.
380✔
1652
                        // We have code for the loop detection elsewhere, which also delays
380✔
1653
                        // attempt to reconnect.
380✔
1654
                        if !ol.isSolicitedLeafNode() && ol.leaf.remoteServer == srvName &&
380✔
1655
                                ol.leaf.remoteCluster == clusterName && ol.acc.Name == accName &&
380✔
1656
                                remoteAccName != _EMPTY_ && ol.leaf.remoteAccName == remoteAccName {
382✔
1657
                                old = ol
2✔
1658
                        }
2✔
1659
                        ol.mu.Unlock()
380✔
1660
                        if old != nil {
382✔
1661
                                break
2✔
1662
                        }
1663
                }
1664
        }
1665
        // Store new connection in the map
1666
        s.leafs[cid] = c
1,266✔
1667
        s.mu.Unlock()
1,266✔
1668
        s.removeFromTempClients(cid)
1,266✔
1669

1,266✔
1670
        // If applicable, evict the old one.
1,266✔
1671
        if old != nil {
1,268✔
1672
                old.sendErrAndErr(DuplicateRemoteLeafnodeConnection.String())
2✔
1673
                old.closeConnection(DuplicateRemoteLeafnodeConnection)
2✔
1674
                c.Warnf("Replacing connection from same server")
2✔
1675
        }
2✔
1676

1677
        srvDecorated := func() string {
1,473✔
1678
                if myClustName == _EMPTY_ {
229✔
1679
                        return mySrvName
22✔
1680
                }
22✔
1681
                return fmt.Sprintf("%s/%s", mySrvName, myClustName)
185✔
1682
        }
1683

1684
        opts := s.getOpts()
1,266✔
1685
        sysAcc := s.SystemAccount()
1,266✔
1686
        js := s.getJetStream()
1,266✔
1687
        var meta *raft
1,266✔
1688
        if js != nil {
1,787✔
1689
                if mg := js.getMetaGroup(); mg != nil {
949✔
1690
                        meta = mg.(*raft)
428✔
1691
                }
428✔
1692
        }
1693
        blockMappingOutgoing := false
1,266✔
1694
        // Deny (non domain) JetStream API traffic unless system account is shared
1,266✔
1695
        // and domain names are identical and extending is not disabled
1,266✔
1696

1,266✔
1697
        // Check if backwards compatibility has been enabled and needs to be acted on
1,266✔
1698
        forceSysAccDeny := false
1,266✔
1699
        if len(opts.JsAccDefaultDomain) > 0 {
1,303✔
1700
                if acc == sysAcc {
48✔
1701
                        for _, d := range opts.JsAccDefaultDomain {
22✔
1702
                                if d == _EMPTY_ {
19✔
1703
                                        // Extending JetStream via leaf node is mutually exclusive with a domain mapping to the empty/default domain.
8✔
1704
                                        // As soon as one mapping to "" is found, disable the ability to extend JS via a leaf node.
8✔
1705
                                        c.Noticef("Not extending remote JetStream domain %q due to presence of empty default domain", myRemoteDomain)
8✔
1706
                                        forceSysAccDeny = true
8✔
1707
                                        break
8✔
1708
                                }
1709
                        }
1710
                } else if domain, ok := opts.JsAccDefaultDomain[accName]; ok && domain == _EMPTY_ {
41✔
1711
                        // for backwards compatibility with old setups that do not have a domain name set
15✔
1712
                        c.Debugf("Skipping deny %q for account %q due to default domain", jsAllAPI, accName)
15✔
1713
                        return
15✔
1714
                }
15✔
1715
        }
1716

1717
        // If the server has JS disabled, it may still be part of a JetStream that could be extended.
1718
        // This is either signaled by js being disabled and a domain set,
1719
        // or in cases where no domain name exists, an extension hint is set.
1720
        // However, this is only relevant in mixed setups.
1721
        //
1722
        // If the system account connects but default domains are present, JetStream can't be extended.
1723
        if opts.JetStreamDomain != myRemoteDomain || (!opts.JetStream && (opts.JetStreamDomain == _EMPTY_ && opts.JetStreamExtHint != jsWillExtend)) ||
1,251✔
1724
                sysAcc == nil || acc == nil || forceSysAccDeny {
2,344✔
1725
                // If domain names mismatch always deny. This applies to system accounts as well as non system accounts.
1,093✔
1726
                // Not having a system account, account or JetStream disabled is considered a mismatch as well.
1,093✔
1727
                if acc != nil && acc == sysAcc {
1,228✔
1728
                        c.Noticef("System account connected from %s", srvDecorated())
135✔
1729
                        c.Noticef("JetStream not extended, domains differ")
135✔
1730
                        c.mergeDenyPermissionsLocked(both, denyAllJs)
135✔
1731
                        // When a remote with a system account is present in a server, unless otherwise disabled, the server will be
135✔
1732
                        // started in observer mode. Now that it is clear that this not used, turn the observer mode off.
135✔
1733
                        if solicited && meta != nil && meta.IsObserver() {
164✔
1734
                                meta.setObserver(false, extNotExtended)
29✔
1735
                                c.Debugf("Turning JetStream metadata controller Observer Mode off")
29✔
1736
                                // Take note that the domain was not extended to avoid this state from startup.
29✔
1737
                                writePeerState(js.config.StoreDir, meta.currentPeerState())
29✔
1738
                                // Meta controller can't be leader yet.
29✔
1739
                                // Yet it is possible that due to observer mode every server already stopped campaigning.
29✔
1740
                                // Therefore this server needs to be kicked into campaigning gear explicitly.
29✔
1741
                                meta.Campaign()
29✔
1742
                        }
29✔
1743
                } else {
958✔
1744
                        c.Noticef("JetStream using domains: local %q, remote %q", opts.JetStreamDomain, myRemoteDomain)
958✔
1745
                        c.mergeDenyPermissionsLocked(both, denyAllClientJs)
958✔
1746
                }
958✔
1747
                blockMappingOutgoing = true
1,093✔
1748
        } else if acc == sysAcc {
230✔
1749
                // system account and same domain
72✔
1750
                s.sys.client.Noticef("Extending JetStream domain %q as System Account connected from server %s",
72✔
1751
                        myRemoteDomain, srvDecorated())
72✔
1752
                // In an extension use case, pin leadership to server remotes connect to.
72✔
1753
                // Therefore, server with a remote that are not already in observer mode, need to be put into it.
72✔
1754
                if solicited && meta != nil && !meta.IsObserver() {
76✔
1755
                        meta.setObserver(true, extExtended)
4✔
1756
                        c.Debugf("Turning JetStream metadata controller Observer Mode on - System Account Connected")
4✔
1757
                        // Take note that the domain was not extended to avoid this state next startup.
4✔
1758
                        writePeerState(js.config.StoreDir, meta.currentPeerState())
4✔
1759
                        // If this server is the leader already, step down so a new leader can be elected (that is not an observer)
4✔
1760
                        meta.StepDown()
4✔
1761
                }
4✔
1762
        } else {
86✔
1763
                // This deny is needed in all cases (system account shared or not)
86✔
1764
                // If the system account is shared, jsAllAPI traffic will go through the system account.
86✔
1765
                // So in order to prevent duplicate delivery (from system and actual account) suppress it on the account.
86✔
1766
                // If the system account is NOT shared, jsAllAPI traffic has no business
86✔
1767
                c.Debugf("Adding deny %+v for account %q", denyAllClientJs, accName)
86✔
1768
                c.mergeDenyPermissionsLocked(both, denyAllClientJs)
86✔
1769
        }
86✔
1770
        // If we have a specified JetStream domain we will want to add a mapping to
1771
        // allow access cross domain for each non-system account.
1772
        if opts.JetStreamDomain != _EMPTY_ && opts.JetStream && acc != nil && acc != sysAcc {
1,482✔
1773
                for src, dest := range generateJSMappingTable(opts.JetStreamDomain) {
2,310✔
1774
                        if err := acc.AddMapping(src, dest); err != nil {
2,079✔
1775
                                c.Debugf("Error adding JetStream domain mapping: %s", err.Error())
×
1776
                        } else {
2,079✔
1777
                                c.Debugf("Adding JetStream Domain Mapping %q -> %s to account %q", src, dest, accName)
2,079✔
1778
                        }
2,079✔
1779
                }
1780
                if blockMappingOutgoing {
431✔
1781
                        src := fmt.Sprintf(jsDomainAPI, opts.JetStreamDomain)
200✔
1782
                        // make sure that messages intended for this domain, do not leave the cluster via this leaf node connection
200✔
1783
                        // This is a guard against a miss-config with two identical domain names and will only cover some forms
200✔
1784
                        // of this issue, not all of them.
200✔
1785
                        // This guards against a hub and a spoke having the same domain name.
200✔
1786
                        // But not two spokes having the same one and the request coming from the hub.
200✔
1787
                        c.mergeDenyPermissionsLocked(pub, []string{src})
200✔
1788
                        c.Debugf("Adding deny %q for outgoing messages to account %q", src, accName)
200✔
1789
                }
200✔
1790
        }
1791
}
1792

1793
// removeLeafNodeConnection tears down server-side state for a leafnode client:
// it stops the temporary-sub expiration timer, removes the gateway reply
// subscription, and deletes the connection from the server's leaf map.
// Note the lock ordering: the client lock is taken and released before the
// server lock is acquired.
func (s *Server) removeLeafNodeConnection(c *client) {
	c.mu.Lock()
	cid := c.cid
	if c.leaf != nil {
		if c.leaf.tsubt != nil {
			c.leaf.tsubt.Stop()
			c.leaf.tsubt = nil
		}
		if c.leaf.gwSub != nil {
			s.gwLeafSubs.Remove(c.leaf.gwSub)
			// We need to set this to nil for GC to release the connection
			c.leaf.gwSub = nil
		}
	}
	c.mu.Unlock()
	s.mu.Lock()
	delete(s.leafs, cid)
	s.mu.Unlock()
	s.removeFromTempClients(cid)
}
1813

1814
// Connect information for solicited leafnodes.
// This is the JSON payload of the CONNECT protocol sent by the soliciting
// (remote) side; the accept side receives only this CONNECT and no INFO.
type leafConnectInfo struct {
	Version   string   `json:"version,omitempty"`
	Nkey      string   `json:"nkey,omitempty"`
	JWT       string   `json:"jwt,omitempty"`
	Sig       string   `json:"sig,omitempty"`
	User      string   `json:"user,omitempty"`
	Pass      string   `json:"pass,omitempty"`
	Token     string   `json:"auth_token,omitempty"`
	ID        string   `json:"server_id,omitempty"`
	Domain    string   `json:"domain,omitempty"`
	Name      string   `json:"name,omitempty"`
	Hub       bool     `json:"is_hub,omitempty"`
	Cluster   string   `json:"cluster,omitempty"`
	Headers   bool     `json:"headers,omitempty"`
	JetStream bool     `json:"jetstream,omitempty"`
	DenyPub   []string `json:"deny_pub,omitempty"`

	// There was an existing field called:
	// >> Comp bool `json:"compression,omitempty"`
	// that has never been used. With support for compression, we now need
	// a field that is a string. So we use a different json tag:
	Compression string `json:"compress_mode,omitempty"`

	// Just used to detect wrong connection attempts.
	Gateway string `json:"gateway,omitempty"`

	// Tells the accept side which account the remote is binding to.
	RemoteAccount string `json:"remote_account,omitempty"`

	// The accept side of a LEAF connection, unlike ROUTER and GATEWAY, receives
	// only the CONNECT protocol, and no INFO. So we need to send the protocol
	// version as part of the CONNECT. It will indicate if a connection supports
	// some features, such as message tracing.
	// We use `protocol` as the JSON tag, so this is automatically unmarshal'ed
	// in the low level process CONNECT.
	Proto int `json:"protocol,omitempty"`
}
1852

1853
// processLeafNodeConnect will process the inbound connect args.
// Once we are here we are bound to an account, so can send any interest that
// we would have to the other side.
// It validates the CONNECT payload (port misuse, cluster name rules, gateway
// misconnect, minimum version), captures remote metadata under the client
// lock, then registers the connection and sends perms/subs to the remote.
func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) error {
	// Way to detect clients that incorrectly connect to the route listen
	// port. Client provided "lang" in the CONNECT protocol while LEAFNODEs don't.
	if lang != _EMPTY_ {
		c.sendErrAndErr(ErrClientConnectedToLeafNodePort.Error())
		c.closeConnection(WrongPort)
		return ErrClientConnectedToLeafNodePort
	}

	// Unmarshal as a leaf node connect protocol
	proto := &leafConnectInfo{}
	if err := json.Unmarshal(arg, proto); err != nil {
		return err
	}

	// Reject a cluster that contains spaces.
	if proto.Cluster != _EMPTY_ && strings.Contains(proto.Cluster, " ") {
		c.sendErrAndErr(ErrClusterNameHasSpaces.Error())
		c.closeConnection(ProtocolViolation)
		return ErrClusterNameHasSpaces
	}

	// Check for cluster name collisions.
	if cn := s.cachedClusterName(); cn != _EMPTY_ && proto.Cluster != _EMPTY_ && proto.Cluster == cn {
		c.sendErrAndErr(ErrLeafNodeHasSameClusterName.Error())
		c.closeConnection(ClusterNamesIdentical)
		return ErrLeafNodeHasSameClusterName
	}

	// Reject if this has Gateway which means that it would be from a gateway
	// connection that incorrectly connects to the leafnode port.
	if proto.Gateway != _EMPTY_ {
		errTxt := fmt.Sprintf("Rejecting connection from gateway %q on the leafnode port", proto.Gateway)
		c.Errorf(errTxt)
		c.sendErr(errTxt)
		c.closeConnection(WrongGateway)
		return ErrWrongGateway
	}

	// Enforce the configured minimum remote server version, if any.
	if mv := s.getOpts().LeafNode.MinVersion; mv != _EMPTY_ {
		major, minor, update, _ := versionComponents(mv)
		if !versionAtLeast(proto.Version, major, minor, update) {
			// We are going to send back an INFO because otherwise recent
			// versions of the remote server would simply break the connection
			// after 2 seconds if not receiving it. Instead, we want the
			// other side to just "stall" until we finish waiting for the holding
			// period and close the connection below.
			s.sendPermsAndAccountInfo(c)
			c.sendErrAndErr(fmt.Sprintf("connection rejected since minimum version required is %q", mv))
			select {
			case <-c.srv.quitCh:
			case <-time.After(leafNodeWaitBeforeClose):
			}
			c.closeConnection(MinimumVersionRequired)
			return ErrMinimumVersionRequired
		}
	}

	// Check if this server supports headers.
	supportHeaders := c.srv.supportsHeaders()

	c.mu.Lock()
	// Leaf Nodes do not do echo or verbose or pedantic.
	c.opts.Verbose = false
	c.opts.Echo = false
	c.opts.Pedantic = false
	// This inbound connection will be marked as supporting headers if this server
	// support headers and the remote has sent in the CONNECT protocol that it does
	// support headers too.
	c.headers = supportHeaders && proto.Headers
	// If the compression level is still not set, set it based on what has been
	// given to us in the CONNECT protocol.
	if c.leaf.compression == _EMPTY_ {
		// But if proto.Compression is _EMPTY_, set it to CompressionNotSupported
		if proto.Compression == _EMPTY_ {
			c.leaf.compression = CompressionNotSupported
		} else {
			c.leaf.compression = proto.Compression
		}
	}

	// Remember the remote server.
	c.leaf.remoteServer = proto.Name
	// Remember the remote account name
	c.leaf.remoteAccName = proto.RemoteAccount

	// If the other side has declared itself a hub, we will take on the spoke role.
	if proto.Hub {
		c.leaf.isSpoke = true
	}

	// The soliciting side is part of a cluster.
	if proto.Cluster != _EMPTY_ {
		c.leaf.remoteCluster = proto.Cluster
	}

	c.leaf.remoteDomain = proto.Domain

	// When a leaf solicits a connection to a hub, the perms that it will use on the soliciting leafnode's
	// behalf are correct for them, but inside the hub need to be reversed since data is flowing in the opposite direction.
	if !c.isSolicitedLeafNode() && c.perms != nil {
		sp, pp := c.perms.sub, c.perms.pub
		c.perms.sub, c.perms.pub = pp, sp
		if c.opts.Import != nil {
			c.darray = c.opts.Import.Deny
		} else {
			c.darray = nil
		}
	}

	// Set the Ping timer
	c.setFirstPingTimer()

	// If we received pub deny permissions from the other end, merge with existing ones.
	c.mergeDenyPermissions(pub, proto.DenyPub)

	acc := c.acc
	c.mu.Unlock()

	// Register the cluster, even if empty, as long as we are acting as a hub.
	if !proto.Hub {
		acc.registerLeafNodeCluster(proto.Cluster)
	}

	// Add in the leafnode here since we passed through auth at this point.
	s.addLeafNodeConnection(c, proto.Name, proto.Cluster, true)

	// If we have permissions bound to this leafnode we need to send them back to the
	// origin server for local enforcement.
	s.sendPermsAndAccountInfo(c)

	// Create and initialize the smap since we know our bound account now.
	// This will send all registered subs too.
	s.initLeafNodeSmapAndSendSubs(c)

	// Announce the account connect event for a leaf node.
	// This will be a no-op as needed.
	s.sendLeafNodeConnect(c.acc)

	// If we have JS enabled and so does the other side, we will
	// check to see if we need to kick any internal source or mirror consumers.
	if proto.JetStream {
		s.checkInternalSyncConsumers(acc, proto.Domain)
	}
	return nil
}
2002

2003
// checkInternalSyncConsumers looks for local streams of the given account that
// are sourcing or mirroring and asks them to retry any internal sync consumers
// that may have been disconnected, now that a JS-enabled leafnode with the
// given remote domain has connected.
func (s *Server) checkInternalSyncConsumers(acc *Account, remoteDomain string) {
	// Grab our js
	js := s.getJetStream()

	// Only applicable if we have JS and the leafnode has JS as well.
	// We check for remote JS outside.
	if !js.isEnabled() || acc == nil {
		return
	}

	// We will check all streams in our local account. They must be a leader and
	// be sourcing or mirroring. We will check the external config on the stream itself
	// if this is cross domain, or if the remote domain is empty, meaning we might be
	// extending the system across this leafnode connection and hence we would be extending
	// our own domain.
	jsa := js.lookupAccount(acc)
	if jsa == nil {
		return
	}
	var streams []*stream
	jsa.mu.RLock()
	for _, mset := range jsa.streams {
		mset.cfgMu.RLock()
		// We need to have a mirror or source defined.
		// We do not want to force another lock here to look for leader status,
		// so collect and after we release jsa will make sure.
		if mset.cfg.Mirror != nil || len(mset.cfg.Sources) > 0 {
			streams = append(streams, mset)
		}
		mset.cfgMu.RUnlock()
	}
	jsa.mu.RUnlock()

	// Now loop through all candidates and check if we are the leader and have NOT
	// created the sync up consumer.
	for _, mset := range streams {
		mset.retryDisconnectedSyncConsumers(remoteDomain)
	}
}
2043

2044
// Returns the remote cluster name. This is set only once so does not require a lock.
2045
func (c *client) remoteCluster() string {
188,945✔
2046
        if c.leaf == nil {
188,945✔
2047
                return _EMPTY_
×
2048
        }
×
2049
        return c.leaf.remoteCluster
188,945✔
2050
}
2051

2052
// Sends back an info block to the soliciting leafnode to let it know about
// its permission settings for local enforcement.
// The INFO carries the client's CID, its import/export permissions, the bound
// account name, and whether that account is the system account.
func (s *Server) sendPermsAndAccountInfo(c *client) {
	// Copy
	s.mu.Lock()
	info := s.copyLeafNodeInfo()
	s.mu.Unlock()
	c.mu.Lock()
	info.CID = c.cid
	info.Import = c.opts.Import
	info.Export = c.opts.Export
	info.RemoteAccount = c.acc.Name
	// s.SystemAccount() uses an atomic operation and does not get the server lock, so this is safe.
	info.IsSystemAccount = c.acc == s.SystemAccount()
	info.ConnectInfo = true
	c.enqueueProto(generateInfoJSON(info))
	c.mu.Unlock()
}
652✔
2070

2071
// Snapshot the current subscriptions from the sublist into our smap which
// we will keep updated from now on.
// Also send the registered subscriptions.
// The smap maps subscription keys (subject, or "subject queue") to interest
// counts; all LS+ protocols are batched into one buffer and enqueued at once.
func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
	acc := c.acc
	if acc == nil {
		c.Debugf("Leafnode does not have an account bound")
		return
	}
	// Collect all account subs here.
	_subs := [1024]*subscription{}
	subs := _subs[:0]
	ims := []string{}

	// Hold the client lock otherwise there can be a race and miss some subs.
	c.mu.Lock()
	defer c.mu.Unlock()

	acc.mu.RLock()
	accName := acc.Name
	accNTag := acc.nameTag

	// To make printing look better when no friendly name present.
	if accNTag != _EMPTY_ {
		accNTag = "/" + accNTag
	}

	// If we are solicited we only send interest for local clients.
	if c.isSpokeLeafNode() {
		acc.sl.localSubs(&subs, true)
	} else {
		acc.sl.All(&subs)
	}

	// Check if we have an existing service import reply.
	siReply := copyBytes(acc.siReply)

	// Since leaf nodes only send on interest, if the bound
	// account has import services we need to send those over.
	for isubj := range acc.imports.services {
		if c.isSpokeLeafNode() && !c.canSubscribe(isubj) {
			c.Debugf("Not permitted to import service %q on behalf of %s%s", isubj, accName, accNTag)
			continue
		}
		ims = append(ims, isubj)
	}
	// Likewise for mappings.
	for _, m := range acc.mappings {
		if c.isSpokeLeafNode() && !c.canSubscribe(m.src) {
			c.Debugf("Not permitted to import mapping %q on behalf of %s%s", m.src, accName, accNTag)
			continue
		}
		ims = append(ims, m.src)
	}

	// Create a unique subject that will be used for loop detection.
	lds := acc.lds
	acc.mu.RUnlock()

	// Check if we have to create the LDS.
	if lds == _EMPTY_ {
		lds = leafNodeLoopDetectionSubjectPrefix + nuid.Next()
		acc.mu.Lock()
		acc.lds = lds
		acc.mu.Unlock()
	}

	// Now check for gateway interest. Leafnodes will put this into
	// the proper mode to propagate, but they are not held in the account.
	gwsa := [16]*client{}
	gws := gwsa[:0]
	s.getOutboundGatewayConnections(&gws)
	for _, cgw := range gws {
		cgw.mu.Lock()
		gw := cgw.gw
		cgw.mu.Unlock()
		if gw != nil {
			if ei, _ := gw.outsim.Load(accName); ei != nil {
				if e := ei.(*outsie); e != nil && e.sl != nil {
					e.sl.All(&subs)
				}
			}
		}
	}

	applyGlobalRouting := s.gateway.enabled
	if c.isSpokeLeafNode() {
		// Add a fake subscription for this solicited leafnode connection
		// so that we can send back directly for mapped GW replies.
		// We need to keep track of this subscription so it can be removed
		// when the connection is closed so that the GC can release it.
		c.leaf.gwSub = &subscription{client: c, subject: []byte(gwReplyPrefix + ">")}
		c.srv.gwLeafSubs.Insert(c.leaf.gwSub)
	}

	// Now walk the results and add them to our smap
	rc := c.leaf.remoteCluster
	c.leaf.smap = make(map[string]int32)
	for _, sub := range subs {
		// Check perms regardless of role.
		if c.perms != nil && !c.canSubscribe(string(sub.subject)) {
			c.Debugf("Not permitted to subscribe to %q on behalf of %s%s", sub.subject, accName, accNTag)
			continue
		}
		// We ignore ourselves here.
		// Also don't add the subscription if it has a origin cluster and the
		// cluster name matches the one of the client we are sending to.
		if c != sub.client && (sub.origin == nil || (bytesToString(sub.origin) != rc)) {
			count := int32(1)
			if len(sub.queue) > 0 && sub.qw > 0 {
				count = sub.qw
			}
			c.leaf.smap[keyFromSub(sub)] += count
			// Track subs sent during this snapshot so updateSmap can avoid
			// double-counting additions that race with this initialization.
			if c.leaf.tsub == nil {
				c.leaf.tsub = make(map[*subscription]struct{})
			}
			c.leaf.tsub[sub] = struct{}{}
		}
	}
	// FIXME(dlc) - We need to update appropriately on an account claims update.
	for _, isubj := range ims {
		c.leaf.smap[isubj]++
	}
	// If we have gateways enabled we need to make sure the other side sends us responses
	// that have been augmented from the original subscription.
	// TODO(dlc) - Should we lock this down more?
	if applyGlobalRouting {
		c.leaf.smap[oldGWReplyPrefix+"*.>"]++
		c.leaf.smap[gwReplyPrefix+">"]++
	}
	// Detect loops by subscribing to a specific subject and checking
	// if this sub is coming back to us.
	c.leaf.smap[lds]++

	// Check if we need to add an existing siReply to our map.
	// This will be a prefix so add on the wildcard.
	if siReply != nil {
		wcsub := append(siReply, '>')
		c.leaf.smap[string(wcsub)]++
	}
	// Queue all protocols. There is no max pending limit for LN connection,
	// so we don't need chunking. The writes will happen from the writeLoop.
	var b bytes.Buffer
	for key, n := range c.leaf.smap {
		c.writeLeafSub(&b, key, n)
	}
	if b.Len() > 0 {
		c.enqueueProto(b.Bytes())
	}
	if c.leaf.tsub != nil {
		// Clear the tsub map after 5 seconds.
		c.leaf.tsubt = time.AfterFunc(5*time.Second, func() {
			c.mu.Lock()
			if c.leaf != nil {
				c.leaf.tsub = nil
				c.leaf.tsubt = nil
			}
			c.mu.Unlock()
		})
	}
}
2232

2233
// updateInterestForAccountOnGateway called from gateway code when processing RS+ and RS-.
2234
func (s *Server) updateInterestForAccountOnGateway(accName string, sub *subscription, delta int32) {
201,287✔
2235
        acc, err := s.LookupAccount(accName)
201,287✔
2236
        if acc == nil || err != nil {
201,449✔
2237
                s.Debugf("No or bad account for %q, failed to update interest from gateway", accName)
162✔
2238
                return
162✔
2239
        }
162✔
2240
        acc.updateLeafNodes(sub, delta)
201,125✔
2241
}
2242

2243
// updateLeafNodes will make sure to update the account smap for the subscription.
// Will also forward to all leaf nodes as needed.
// delta is the interest change (positive for new interest, negative/zero for
// removal); sub.origin, when set, is the cluster the subscription came from.
func (acc *Account) updateLeafNodes(sub *subscription, delta int32) {
	if acc == nil || sub == nil {
		return
	}

	// We will do checks for no leafnodes and same cluster here inline and under the
	// general account read lock.
	// If we feel we need to update the leafnodes we will do that out of line to avoid
	// blocking routes or GWs.

	acc.mu.RLock()
	// First check if we even have leafnodes here.
	if acc.nleafs == 0 {
		acc.mu.RUnlock()
		return
	}

	// Is this a loop detection subject.
	isLDS := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))

	// Capture the cluster even if its empty.
	var cluster string
	if sub.origin != nil {
		cluster = bytesToString(sub.origin)
	}

	// If we have an isolated cluster we can return early, as long as it is not a loop detection subject.
	// Empty clusters will return false for the check.
	if !isLDS && acc.isLeafNodeClusterIsolated(cluster) {
		acc.mu.RUnlock()
		return
	}

	// We can release the general account lock.
	acc.mu.RUnlock()

	// We can hold the list lock here to avoid having to copy a large slice.
	acc.lmu.RLock()
	defer acc.lmu.RUnlock()

	// Do this once.
	subject := string(sub.subject)

	// Walk the connected leafnodes.
	for _, ln := range acc.lleafs {
		if ln == sub.client {
			continue
		}
		// Check to make sure this sub does not have an origin cluster that matches the leafnode.
		ln.mu.Lock()
		// If skipped, make sure that we still let go the "$LDS." subscription that allows
		// the detection of loops as long as different cluster.
		clusterDifferent := cluster != ln.remoteCluster()
		if (isLDS && clusterDifferent) || ((cluster == _EMPTY_ || clusterDifferent) && (delta <= 0 || ln.canSubscribe(subject))) {
			ln.updateSmap(sub, delta, isLDS)
		}
		ln.mu.Unlock()
	}
}
2304

2305
// This will make an update to our internal smap and determine if we should send out
// an interest update to the remote side.
// Lock should be held.
// isLDS indicates the subject is the loop-detection subject, which bypasses
// the spoke-side filtering below.
func (c *client) updateSmap(sub *subscription, delta int32, isLDS bool) {
	if c.leaf.smap == nil {
		return
	}

	// If we are solicited make sure this is a local client or a non-solicited leaf node
	skind := sub.client.kind
	updateClient := skind == CLIENT || skind == SYSTEM || skind == JETSTREAM || skind == ACCOUNT
	if !isLDS && c.isSpokeLeafNode() && !(updateClient || (skind == LEAF && !sub.client.isSpokeLeafNode())) {
		return
	}

	// For additions, check if that sub has just been processed during initLeafNodeSmapAndSendSubs
	// (it would already be counted in the smap snapshot, so don't count it twice).
	if delta > 0 && c.leaf.tsub != nil {
		if _, present := c.leaf.tsub[sub]; present {
			delete(c.leaf.tsub, sub)
			if len(c.leaf.tsub) == 0 {
				c.leaf.tsub = nil
				c.leaf.tsubt.Stop()
				c.leaf.tsubt = nil
			}
			return
		}
	}

	key := keyFromSub(sub)
	n, ok := c.leaf.smap[key]
	if delta < 0 && !ok {
		return
	}

	// We will update if its a queue, if count is zero (or negative), or we were 0 and are N > 0.
	update := sub.queue != nil || (n <= 0 && n+delta > 0) || (n > 0 && n+delta <= 0)
	n += delta
	if n > 0 {
		c.leaf.smap[key] = n
	} else {
		delete(c.leaf.smap, key)
	}
	if update {
		c.sendLeafNodeSubUpdate(key, n)
	}
}
2351

2352
// Used to force add subjects to the subject map.
2353
func (c *client) forceAddToSmap(subj string) {
4✔
2354
        c.mu.Lock()
4✔
2355
        defer c.mu.Unlock()
4✔
2356

4✔
2357
        if c.leaf.smap == nil {
4✔
2358
                return
×
2359
        }
×
2360
        n := c.leaf.smap[subj]
4✔
2361
        if n != 0 {
5✔
2362
                return
1✔
2363
        }
1✔
2364
        // Place into the map since it was not there.
2365
        c.leaf.smap[subj] = 1
3✔
2366
        c.sendLeafNodeSubUpdate(subj, 1)
3✔
2367
}
2368

2369
// Used to force remove a subject from the subject map.
2370
func (c *client) forceRemoveFromSmap(subj string) {
1✔
2371
        c.mu.Lock()
1✔
2372
        defer c.mu.Unlock()
1✔
2373

1✔
2374
        if c.leaf.smap == nil {
1✔
2375
                return
×
2376
        }
×
2377
        n := c.leaf.smap[subj]
1✔
2378
        if n == 0 {
1✔
2379
                return
×
2380
        }
×
2381
        n--
1✔
2382
        if n == 0 {
2✔
2383
                // Remove is now zero
1✔
2384
                delete(c.leaf.smap, subj)
1✔
2385
                c.sendLeafNodeSubUpdate(subj, 0)
1✔
2386
        } else {
1✔
2387
                c.leaf.smap[subj] = n
×
2388
        }
×
2389
}
2390

2391
// Send the subscription interest change to the other side.
// Lock should be held.
// On a spoke, the subject part of the key is checked against our subscribe
// permissions first, except for internal LDS/GW-reply subjects which always
// pass through.
func (c *client) sendLeafNodeSubUpdate(key string, n int32) {
	// If we are a spoke, we need to check if we are allowed to send this subscription over to the hub.
	if c.isSpokeLeafNode() {
		checkPerms := true
		// Fast pre-filter: internal subjects start with '$' or '_'.
		if len(key) > 0 && (key[0] == '$' || key[0] == '_') {
			if strings.HasPrefix(key, leafNodeLoopDetectionSubjectPrefix) ||
				strings.HasPrefix(key, oldGWReplyPrefix) ||
				strings.HasPrefix(key, gwReplyPrefix) {
				checkPerms = false
			}
		}
		if checkPerms {
			// The key may be "subject queue"; only the subject is permission-checked.
			var subject string
			if sep := strings.IndexByte(key, ' '); sep != -1 {
				subject = key[:sep]
			} else {
				subject = key
			}
			if !c.canSubscribe(subject) {
				return
			}
		}
	}
	// If we are here we can send over to the other side.
	_b := [64]byte{}
	b := bytes.NewBuffer(_b[:0])
	c.writeLeafSub(b, key, n)
	c.enqueueProto(b.Bytes())
}
2422

2423
// Helper function to build the key.
2424
func keyFromSub(sub *subscription) string {
46,274✔
2425
        var sb strings.Builder
46,274✔
2426
        sb.Grow(len(sub.subject) + len(sub.queue) + 1)
46,274✔
2427
        sb.Write(sub.subject)
46,274✔
2428
        if sub.queue != nil {
50,046✔
2429
                // Just make the key subject spc group, e.g. 'foo bar'
3,772✔
2430
                sb.WriteByte(' ')
3,772✔
2431
                sb.Write(sub.queue)
3,772✔
2432
        }
3,772✔
2433
        return sb.String()
46,274✔
2434
}
2435

2436
// Key prefixes used to distinguish plain routed subscriptions from routed
// subscriptions made on behalf of a leafnode (see keyFromSubWithOrigin).
const (
	keyRoutedSub         = "R"
	keyRoutedSubByte     = 'R'
	keyRoutedLeafSub     = "L"
	keyRoutedLeafSubByte = 'L'
)
2442

2443
// Helper function to build the key that prevents collisions between normal
2444
// routed subscriptions and routed subscriptions on behalf of a leafnode.
2445
// Keys will look like this:
2446
// "R foo"          -> plain routed sub on "foo"
2447
// "R foo bar"      -> queue routed sub on "foo", queue "bar"
2448
// "L foo bar"      -> plain routed leaf sub on "foo", leaf "bar"
2449
// "L foo bar baz"  -> queue routed sub on "foo", queue "bar", leaf "baz"
2450
func keyFromSubWithOrigin(sub *subscription) string {
658,754✔
2451
        var sb strings.Builder
658,754✔
2452
        sb.Grow(2 + len(sub.origin) + 1 + len(sub.subject) + 1 + len(sub.queue))
658,754✔
2453
        leaf := len(sub.origin) > 0
658,754✔
2454
        if leaf {
675,375✔
2455
                sb.WriteByte(keyRoutedLeafSubByte)
16,621✔
2456
        } else {
658,754✔
2457
                sb.WriteByte(keyRoutedSubByte)
642,133✔
2458
        }
642,133✔
2459
        sb.WriteByte(' ')
658,754✔
2460
        sb.Write(sub.subject)
658,754✔
2461
        if sub.queue != nil {
684,551✔
2462
                sb.WriteByte(' ')
25,797✔
2463
                sb.Write(sub.queue)
25,797✔
2464
        }
25,797✔
2465
        if leaf {
675,375✔
2466
                sb.WriteByte(' ')
16,621✔
2467
                sb.Write(sub.origin)
16,621✔
2468
        }
16,621✔
2469
        return sb.String()
658,754✔
2470
}
2471

2472
// Lock should be held.
2473
func (c *client) writeLeafSub(w *bytes.Buffer, key string, n int32) {
35,457✔
2474
        if key == _EMPTY_ {
35,457✔
2475
                return
×
2476
        }
×
2477
        if n > 0 {
67,367✔
2478
                w.WriteString("LS+ " + key)
31,910✔
2479
                // Check for queue semantics, if found write n.
31,910✔
2480
                if strings.Contains(key, " ") {
34,219✔
2481
                        w.WriteString(" ")
2,309✔
2482
                        var b [12]byte
2,309✔
2483
                        var i = len(b)
2,309✔
2484
                        for l := n; l > 0; l /= 10 {
5,516✔
2485
                                i--
3,207✔
2486
                                b[i] = digits[l%10]
3,207✔
2487
                        }
3,207✔
2488
                        w.Write(b[i:])
2,309✔
2489
                        if c.trace {
2,309✔
2490
                                arg := fmt.Sprintf("%s %d", key, n)
×
2491
                                c.traceOutOp("LS+", []byte(arg))
×
2492
                        }
×
2493
                } else if c.trace {
29,797✔
2494
                        c.traceOutOp("LS+", []byte(key))
196✔
2495
                }
196✔
2496
        } else {
3,547✔
2497
                w.WriteString("LS- " + key)
3,547✔
2498
                if c.trace {
3,557✔
2499
                        c.traceOutOp("LS-", []byte(key))
10✔
2500
                }
10✔
2501
        }
2502
        w.WriteString(CR_LF)
35,457✔
2503
}
2504

2505
// processLeafSub will process an inbound sub request for the remote leaf node.
2506
func (c *client) processLeafSub(argo []byte) (err error) {
31,641✔
2507
        // Indicate activity.
31,641✔
2508
        c.in.subs++
31,641✔
2509

31,641✔
2510
        srv := c.srv
31,641✔
2511
        if srv == nil {
31,641✔
2512
                return nil
×
2513
        }
×
2514

2515
        // Copy so we do not reference a potentially large buffer
2516
        arg := make([]byte, len(argo))
31,641✔
2517
        copy(arg, argo)
31,641✔
2518

31,641✔
2519
        args := splitArg(arg)
31,641✔
2520
        sub := &subscription{client: c}
31,641✔
2521

31,641✔
2522
        delta := int32(1)
31,641✔
2523
        switch len(args) {
31,641✔
2524
        case 1:
29,380✔
2525
                sub.queue = nil
29,380✔
2526
        case 3:
2,260✔
2527
                sub.queue = args[1]
2,260✔
2528
                sub.qw = int32(parseSize(args[2]))
2,260✔
2529
                // TODO: (ik) We should have a non empty queue name and a queue
2,260✔
2530
                // weight >= 1. For 2.11, we may want to return an error if that
2,260✔
2531
                // is not the case, but for now just overwrite `delta` if queue
2,260✔
2532
                // weight is greater than 1 (it is possible after a reconnect/
2,260✔
2533
                // server restart to receive a queue weight > 1 for a new sub).
2,260✔
2534
                if sub.qw > 1 {
3,906✔
2535
                        delta = sub.qw
1,646✔
2536
                }
1,646✔
2537
        default:
1✔
2538
                return fmt.Errorf("processLeafSub Parse Error: '%s'", arg)
1✔
2539
        }
2540
        sub.subject = args[0]
31,640✔
2541

31,640✔
2542
        c.mu.Lock()
31,640✔
2543
        if c.isClosed() {
31,654✔
2544
                c.mu.Unlock()
14✔
2545
                return nil
14✔
2546
        }
14✔
2547

2548
        acc := c.acc
31,626✔
2549
        // Check if we have a loop.
31,626✔
2550
        ldsPrefix := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))
31,626✔
2551

31,626✔
2552
        if ldsPrefix && bytesToString(sub.subject) == acc.getLDSubject() {
31,631✔
2553
                c.mu.Unlock()
5✔
2554
                c.handleLeafNodeLoop(true)
5✔
2555
                return nil
5✔
2556
        }
5✔
2557

2558
        // Check permissions if applicable. (but exclude the $LDS, $GR and _GR_)
2559
        checkPerms := true
31,621✔
2560
        if sub.subject[0] == '$' || sub.subject[0] == '_' {
60,347✔
2561
                if ldsPrefix ||
28,726✔
2562
                        bytes.HasPrefix(sub.subject, []byte(oldGWReplyPrefix)) ||
28,726✔
2563
                        bytes.HasPrefix(sub.subject, []byte(gwReplyPrefix)) {
30,673✔
2564
                        checkPerms = false
1,947✔
2565
                }
1,947✔
2566
        }
2567

2568
        // If we are a hub check that we can publish to this subject.
2569
        if checkPerms {
61,295✔
2570
                subj := string(sub.subject)
29,674✔
2571
                if subjectIsLiteral(subj) && !c.pubAllowedFullCheck(subj, true, true) {
29,993✔
2572
                        c.mu.Unlock()
319✔
2573
                        c.leafSubPermViolation(sub.subject)
319✔
2574
                        c.Debugf(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject))
319✔
2575
                        return nil
319✔
2576
                }
319✔
2577
        }
2578

2579
        // Check if we have a maximum on the number of subscriptions.
2580
        if c.subsAtLimit() {
31,310✔
2581
                c.mu.Unlock()
8✔
2582
                c.maxSubsExceeded()
8✔
2583
                return nil
8✔
2584
        }
8✔
2585

2586
        // If we have an origin cluster associated mark that in the sub.
2587
        if rc := c.remoteCluster(); rc != _EMPTY_ {
59,488✔
2588
                sub.origin = []byte(rc)
28,194✔
2589
        }
28,194✔
2590

2591
        // Like Routes, we store local subs by account and subject and optionally queue name.
2592
        // If we have a queue it will have a trailing weight which we do not want.
2593
        if sub.queue != nil {
33,269✔
2594
                sub.sid = arg[:len(arg)-len(args[2])-1]
1,975✔
2595
        } else {
31,294✔
2596
                sub.sid = arg
29,319✔
2597
        }
29,319✔
2598
        key := bytesToString(sub.sid)
31,294✔
2599
        osub := c.subs[key]
31,294✔
2600
        if osub == nil {
61,076✔
2601
                c.subs[key] = sub
29,782✔
2602
                // Now place into the account sl.
29,782✔
2603
                if err := acc.sl.Insert(sub); err != nil {
29,782✔
2604
                        delete(c.subs, key)
×
2605
                        c.mu.Unlock()
×
2606
                        c.Errorf("Could not insert subscription: %v", err)
×
2607
                        c.sendErr("Invalid Subscription")
×
2608
                        return nil
×
2609
                }
×
2610
        } else if sub.queue != nil {
3,023✔
2611
                // For a queue we need to update the weight.
1,511✔
2612
                delta = sub.qw - atomic.LoadInt32(&osub.qw)
1,511✔
2613
                atomic.StoreInt32(&osub.qw, sub.qw)
1,511✔
2614
                acc.sl.UpdateRemoteQSub(osub)
1,511✔
2615
        }
1,511✔
2616
        spoke := c.isSpokeLeafNode()
31,294✔
2617
        c.mu.Unlock()
31,294✔
2618

31,294✔
2619
        // Only add in shadow subs if a new sub or qsub.
31,294✔
2620
        if osub == nil {
61,076✔
2621
                if err := c.addShadowSubscriptions(acc, sub, true); err != nil {
29,782✔
2622
                        c.Errorf(err.Error())
×
2623
                }
×
2624
        }
2625

2626
        // If we are not solicited, treat leaf node subscriptions similar to a
2627
        // client subscription, meaning we forward them to routes, gateways and
2628
        // other leaf nodes as needed.
2629
        if !spoke {
42,258✔
2630
                // If we are routing add to the route map for the associated account.
10,964✔
2631
                srv.updateRouteSubscriptionMap(acc, sub, delta)
10,964✔
2632
                if srv.gateway.enabled {
12,491✔
2633
                        srv.gatewayUpdateSubInterest(acc.Name, sub, delta)
1,527✔
2634
                }
1,527✔
2635
        }
2636
        // Now check on leafnode updates for other leaf nodes. We understand solicited
2637
        // and non-solicited state in this call so we will do the right thing.
2638
        acc.updateLeafNodes(sub, delta)
31,294✔
2639

31,294✔
2640
        return nil
31,294✔
2641
}
2642

2643
// If the leafnode is a solicited, set the connect delay based on default
2644
// or private option (for tests). Sends the error to the other side, log and
2645
// close the connection.
2646
func (c *client) handleLeafNodeLoop(sendErr bool) {
14✔
2647
        accName, delay := c.setLeafConnectDelayIfSoliciting(leafNodeReconnectDelayAfterLoopDetected)
14✔
2648
        errTxt := fmt.Sprintf("Loop detected for leafnode account=%q. Delaying attempt to reconnect for %v", accName, delay)
14✔
2649
        if sendErr {
21✔
2650
                c.sendErr(errTxt)
7✔
2651
        }
7✔
2652

2653
        c.Errorf(errTxt)
14✔
2654
        // If we are here with "sendErr" false, it means that this is the server
14✔
2655
        // that received the error. The other side will have closed the connection,
14✔
2656
        // but does not hurt to close here too.
14✔
2657
        c.closeConnection(ProtocolViolation)
14✔
2658
}
2659

2660
// processLeafUnsub will process an inbound unsub request for the remote leaf node.
2661
func (c *client) processLeafUnsub(arg []byte) error {
3,327✔
2662
        // Indicate any activity, so pub and sub or unsubs.
3,327✔
2663
        c.in.subs++
3,327✔
2664

3,327✔
2665
        acc := c.acc
3,327✔
2666
        srv := c.srv
3,327✔
2667

3,327✔
2668
        c.mu.Lock()
3,327✔
2669
        if c.isClosed() {
3,369✔
2670
                c.mu.Unlock()
42✔
2671
                return nil
42✔
2672
        }
42✔
2673

2674
        spoke := c.isSpokeLeafNode()
3,285✔
2675
        // We store local subs by account and subject and optionally queue name.
3,285✔
2676
        // LS- will have the arg exactly as the key.
3,285✔
2677
        sub, ok := c.subs[string(arg)]
3,285✔
2678
        if !ok {
3,299✔
2679
                // If not found, don't try to update routes/gws/leaf nodes.
14✔
2680
                c.mu.Unlock()
14✔
2681
                return nil
14✔
2682
        }
14✔
2683
        delta := int32(1)
3,271✔
2684
        if len(sub.queue) > 0 {
3,699✔
2685
                delta = sub.qw
428✔
2686
        }
428✔
2687
        c.mu.Unlock()
3,271✔
2688

3,271✔
2689
        c.unsubscribe(acc, sub, true, true)
3,271✔
2690
        if !spoke {
4,303✔
2691
                // If we are routing subtract from the route map for the associated account.
1,032✔
2692
                srv.updateRouteSubscriptionMap(acc, sub, -delta)
1,032✔
2693
                // Gateways
1,032✔
2694
                if srv.gateway.enabled {
1,314✔
2695
                        srv.gatewayUpdateSubInterest(acc.Name, sub, -delta)
282✔
2696
                }
282✔
2697
        }
2698
        // Now check on leafnode updates for other leaf nodes.
2699
        acc.updateLeafNodes(sub, -delta)
3,271✔
2700
        return nil
3,271✔
2701
}
2702

2703
func (c *client) processLeafHeaderMsgArgs(arg []byte) error {
474✔
2704
        // Unroll splitArgs to avoid runtime/heap issues
474✔
2705
        a := [MAX_MSG_ARGS][]byte{}
474✔
2706
        args := a[:0]
474✔
2707
        start := -1
474✔
2708
        for i, b := range arg {
31,431✔
2709
                switch b {
30,957✔
2710
                case ' ', '\t', '\r', '\n':
1,361✔
2711
                        if start >= 0 {
2,722✔
2712
                                args = append(args, arg[start:i])
1,361✔
2713
                                start = -1
1,361✔
2714
                        }
1,361✔
2715
                default:
29,596✔
2716
                        if start < 0 {
31,431✔
2717
                                start = i
1,835✔
2718
                        }
1,835✔
2719
                }
2720
        }
2721
        if start >= 0 {
948✔
2722
                args = append(args, arg[start:])
474✔
2723
        }
474✔
2724

2725
        c.pa.arg = arg
474✔
2726
        switch len(args) {
474✔
2727
        case 0, 1, 2:
×
2728
                return fmt.Errorf("processLeafHeaderMsgArgs Parse Error: '%s'", args)
×
2729
        case 3:
79✔
2730
                c.pa.reply = nil
79✔
2731
                c.pa.queues = nil
79✔
2732
                c.pa.hdb = args[1]
79✔
2733
                c.pa.hdr = parseSize(args[1])
79✔
2734
                c.pa.szb = args[2]
79✔
2735
                c.pa.size = parseSize(args[2])
79✔
2736
        case 4:
381✔
2737
                c.pa.reply = args[1]
381✔
2738
                c.pa.queues = nil
381✔
2739
                c.pa.hdb = args[2]
381✔
2740
                c.pa.hdr = parseSize(args[2])
381✔
2741
                c.pa.szb = args[3]
381✔
2742
                c.pa.size = parseSize(args[3])
381✔
2743
        default:
14✔
2744
                // args[1] is our reply indicator. Should be + or | normally.
14✔
2745
                if len(args[1]) != 1 {
14✔
2746
                        return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
2747
                }
×
2748
                switch args[1][0] {
14✔
2749
                case '+':
4✔
2750
                        c.pa.reply = args[2]
4✔
2751
                case '|':
10✔
2752
                        c.pa.reply = nil
10✔
2753
                default:
×
2754
                        return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
2755
                }
2756
                // Grab header size.
2757
                c.pa.hdb = args[len(args)-2]
14✔
2758
                c.pa.hdr = parseSize(c.pa.hdb)
14✔
2759

14✔
2760
                // Grab size.
14✔
2761
                c.pa.szb = args[len(args)-1]
14✔
2762
                c.pa.size = parseSize(c.pa.szb)
14✔
2763

14✔
2764
                // Grab queue names.
14✔
2765
                if c.pa.reply != nil {
18✔
2766
                        c.pa.queues = args[3 : len(args)-2]
4✔
2767
                } else {
14✔
2768
                        c.pa.queues = args[2 : len(args)-2]
10✔
2769
                }
10✔
2770
        }
2771
        if c.pa.hdr < 0 {
474✔
2772
                return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Header Size: '%s'", arg)
×
2773
        }
×
2774
        if c.pa.size < 0 {
474✔
2775
                return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Size: '%s'", args)
×
2776
        }
×
2777
        if c.pa.hdr > c.pa.size {
475✔
2778
                return fmt.Errorf("processLeafHeaderMsgArgs Header Size larger then TotalSize: '%s'", arg)
1✔
2779
        }
1✔
2780

2781
        // Common ones processed after check for arg length
2782
        c.pa.subject = args[0]
473✔
2783

473✔
2784
        return nil
473✔
2785
}
2786

2787
func (c *client) processLeafMsgArgs(arg []byte) error {
105,499✔
2788
        // Unroll splitArgs to avoid runtime/heap issues
105,499✔
2789
        a := [MAX_MSG_ARGS][]byte{}
105,499✔
2790
        args := a[:0]
105,499✔
2791
        start := -1
105,499✔
2792
        for i, b := range arg {
3,395,108✔
2793
                switch b {
3,289,609✔
2794
                case ' ', '\t', '\r', '\n':
156,754✔
2795
                        if start >= 0 {
313,508✔
2796
                                args = append(args, arg[start:i])
156,754✔
2797
                                start = -1
156,754✔
2798
                        }
156,754✔
2799
                default:
3,132,855✔
2800
                        if start < 0 {
3,395,108✔
2801
                                start = i
262,253✔
2802
                        }
262,253✔
2803
                }
2804
        }
2805
        if start >= 0 {
210,998✔
2806
                args = append(args, arg[start:])
105,499✔
2807
        }
105,499✔
2808

2809
        c.pa.arg = arg
105,499✔
2810
        switch len(args) {
105,499✔
2811
        case 0, 1:
×
2812
                return fmt.Errorf("processLeafMsgArgs Parse Error: '%s'", args)
×
2813
        case 2:
76,949✔
2814
                c.pa.reply = nil
76,949✔
2815
                c.pa.queues = nil
76,949✔
2816
                c.pa.szb = args[1]
76,949✔
2817
                c.pa.size = parseSize(args[1])
76,949✔
2818
        case 3:
6,007✔
2819
                c.pa.reply = args[1]
6,007✔
2820
                c.pa.queues = nil
6,007✔
2821
                c.pa.szb = args[2]
6,007✔
2822
                c.pa.size = parseSize(args[2])
6,007✔
2823
        default:
22,543✔
2824
                // args[1] is our reply indicator. Should be + or | normally.
22,543✔
2825
                if len(args[1]) != 1 {
22,544✔
2826
                        return fmt.Errorf("processLeafMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
1✔
2827
                }
1✔
2828
                switch args[1][0] {
22,542✔
2829
                case '+':
161✔
2830
                        c.pa.reply = args[2]
161✔
2831
                case '|':
22,381✔
2832
                        c.pa.reply = nil
22,381✔
2833
                default:
×
2834
                        return fmt.Errorf("processLeafMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
2835
                }
2836
                // Grab size.
2837
                c.pa.szb = args[len(args)-1]
22,542✔
2838
                c.pa.size = parseSize(c.pa.szb)
22,542✔
2839

22,542✔
2840
                // Grab queue names.
22,542✔
2841
                if c.pa.reply != nil {
22,703✔
2842
                        c.pa.queues = args[3 : len(args)-1]
161✔
2843
                } else {
22,542✔
2844
                        c.pa.queues = args[2 : len(args)-1]
22,381✔
2845
                }
22,381✔
2846
        }
2847
        if c.pa.size < 0 {
105,498✔
2848
                return fmt.Errorf("processLeafMsgArgs Bad or Missing Size: '%s'", args)
×
2849
        }
×
2850

2851
        // Common ones processed after check for arg length
2852
        c.pa.subject = args[0]
105,498✔
2853

105,498✔
2854
        return nil
105,498✔
2855
}
2856

2857
// processInboundLeafMsg is called to process an inbound msg from a leaf node.
2858
func (c *client) processInboundLeafMsg(msg []byte) {
104,011✔
2859
        // Update statistics
104,011✔
2860
        // The msg includes the CR_LF, so pull back out for accounting.
104,011✔
2861
        c.in.msgs++
104,011✔
2862
        c.in.bytes += int32(len(msg) - LEN_CR_LF)
104,011✔
2863

104,011✔
2864
        srv, acc, subject := c.srv, c.acc, string(c.pa.subject)
104,011✔
2865

104,011✔
2866
        // Mostly under testing scenarios.
104,011✔
2867
        if srv == nil || acc == nil {
104,013✔
2868
                return
2✔
2869
        }
2✔
2870

2871
        // Match the subscriptions. We will use our own L1 map if
2872
        // it's still valid, avoiding contention on the shared sublist.
2873
        var r *SublistResult
104,009✔
2874
        var ok bool
104,009✔
2875

104,009✔
2876
        genid := atomic.LoadUint64(&c.acc.sl.genid)
104,009✔
2877
        if genid == c.in.genid && c.in.results != nil {
205,672✔
2878
                r, ok = c.in.results[subject]
101,663✔
2879
        } else {
104,009✔
2880
                // Reset our L1 completely.
2,346✔
2881
                c.in.results = make(map[string]*SublistResult)
2,346✔
2882
                c.in.genid = genid
2,346✔
2883
        }
2,346✔
2884

2885
        // Go back to the sublist data structure.
2886
        if !ok {
178,183✔
2887
                r = c.acc.sl.Match(subject)
74,174✔
2888
                // Prune the results cache. Keeps us from unbounded growth. Random delete.
74,174✔
2889
                if len(c.in.results) >= maxResultCacheSize {
76,249✔
2890
                        n := 0
2,075✔
2891
                        for subj := range c.in.results {
70,550✔
2892
                                delete(c.in.results, subj)
68,475✔
2893
                                if n++; n > pruneSize {
70,550✔
2894
                                        break
2,075✔
2895
                                }
2896
                        }
2897
                }
2898
                // Then add the new cache entry.
2899
                c.in.results[subject] = r
74,174✔
2900
        }
2901

2902
        // Collect queue names if needed.
2903
        var qnames [][]byte
104,009✔
2904

104,009✔
2905
        // Check for no interest, short circuit if so.
104,009✔
2906
        // This is the fanout scale.
104,009✔
2907
        if len(r.psubs)+len(r.qsubs) > 0 {
207,517✔
2908
                flag := pmrNoFlag
103,508✔
2909
                // If we have queue subs in this cluster, then if we run in gateway
103,508✔
2910
                // mode and the remote gateways have queue subs, then we need to
103,508✔
2911
                // collect the queue groups this message was sent to so that we
103,508✔
2912
                // exclude them when sending to gateways.
103,508✔
2913
                if len(r.qsubs) > 0 && c.srv.gateway.enabled &&
103,508✔
2914
                        atomic.LoadInt64(&c.srv.gateway.totalQSubs) > 0 {
115,758✔
2915
                        flag |= pmrCollectQueueNames
12,250✔
2916
                }
12,250✔
2917
                // If this is a mapped subject that means the mapped interest
2918
                // is what got us here, but this might not have a queue designation
2919
                // If that is the case, make sure we ignore to process local queue subscribers.
2920
                if len(c.pa.mapped) > 0 && len(c.pa.queues) == 0 {
103,806✔
2921
                        flag |= pmrIgnoreEmptyQueueFilter
298✔
2922
                }
298✔
2923
                _, qnames = c.processMsgResults(acc, r, msg, nil, c.pa.subject, c.pa.reply, flag)
103,508✔
2924
        }
2925

2926
        // Now deal with gateways
2927
        if c.srv.gateway.enabled {
117,351✔
2928
                c.sendMsgToGateways(acc, msg, c.pa.subject, c.pa.reply, qnames, true)
13,342✔
2929
        }
13,342✔
2930
}
2931

2932
// Handles a subscription permission violation.
2933
// See leafPermViolation() for details.
2934
func (c *client) leafSubPermViolation(subj []byte) {
319✔
2935
        c.leafPermViolation(false, subj)
319✔
2936
}
319✔
2937

2938
// Common function to process publish or subscribe leafnode permission violation.
2939
// Sends the permission violation error to the remote, logs it and closes the connection.
2940
// If this is from a server soliciting, the reconnection will be delayed.
2941
func (c *client) leafPermViolation(pub bool, subj []byte) {
319✔
2942
        if c.isSpokeLeafNode() {
638✔
2943
                // For spokes these are no-ops since the hub server told us our permissions.
319✔
2944
                // We just need to not send these over to the other side since we will get cutoff.
319✔
2945
                return
319✔
2946
        }
319✔
2947
        // FIXME(dlc) ?
2948
        c.setLeafConnectDelayIfSoliciting(leafNodeReconnectAfterPermViolation)
×
2949
        var action string
×
2950
        if pub {
×
2951
                c.sendErr(fmt.Sprintf("Permissions Violation for Publish to %q", subj))
×
2952
                action = "Publish"
×
2953
        } else {
×
2954
                c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q", subj))
×
2955
                action = "Subscription"
×
2956
        }
×
2957
        c.Errorf("%s Violation on %q - Check other side configuration", action, subj)
×
2958
        // TODO: add a new close reason that is more appropriate?
×
2959
        c.closeConnection(ProtocolViolation)
×
2960
}
2961

2962
// Invoked from generic processErr() for LEAF connections.
2963
func (c *client) leafProcessErr(errStr string) {
32✔
2964
        // Check if we got a cluster name collision.
32✔
2965
        if strings.Contains(errStr, ErrLeafNodeHasSameClusterName.Error()) {
36✔
2966
                _, delay := c.setLeafConnectDelayIfSoliciting(leafNodeReconnectDelayAfterClusterNameSame)
4✔
2967
                c.Errorf("Leafnode connection dropped with same cluster name error. Delaying attempt to reconnect for %v", delay)
4✔
2968
                return
4✔
2969
        }
4✔
2970

2971
        // We will look for Loop detected error coming from the other side.
2972
        // If we solicit, set the connect delay.
2973
        if !strings.Contains(errStr, "Loop detected") {
49✔
2974
                return
21✔
2975
        }
21✔
2976
        c.handleLeafNodeLoop(false)
7✔
2977
}
2978

2979
// If this leaf connection solicits, sets the connect delay to the given value,
2980
// or the one from the server option's LeafNode.connDelay if one is set (for tests).
2981
// Returns the connection's account name and delay.
2982
func (c *client) setLeafConnectDelayIfSoliciting(delay time.Duration) (string, time.Duration) {
18✔
2983
        c.mu.Lock()
18✔
2984
        if c.isSolicitedLeafNode() {
29✔
2985
                if s := c.srv; s != nil {
22✔
2986
                        if srvdelay := s.getOpts().LeafNode.connDelay; srvdelay != 0 {
15✔
2987
                                delay = srvdelay
4✔
2988
                        }
4✔
2989
                }
2990
                c.leaf.remote.setConnectDelay(delay)
11✔
2991
        }
2992
        accName := c.acc.Name
18✔
2993
        c.mu.Unlock()
18✔
2994
        return accName, delay
18✔
2995
}
2996

2997
// For the given remote Leafnode configuration, this function returns
2998
// if TLS is required, and if so, will return a clone of the TLS Config
2999
// (since some fields will be changed during handshake), the TLS server
3000
// name that is remembered, and the TLS timeout.
3001
func (c *client) leafNodeGetTLSConfigForSolicit(remote *leafNodeCfg) (bool, *tls.Config, string, float64) {
1,832✔
3002
        var (
1,832✔
3003
                tlsConfig  *tls.Config
1,832✔
3004
                tlsName    string
1,832✔
3005
                tlsTimeout float64
1,832✔
3006
        )
1,832✔
3007

1,832✔
3008
        remote.RLock()
1,832✔
3009
        defer remote.RUnlock()
1,832✔
3010

1,832✔
3011
        tlsRequired := remote.TLS || remote.TLSConfig != nil
1,832✔
3012
        if tlsRequired {
1,913✔
3013
                if remote.TLSConfig != nil {
133✔
3014
                        tlsConfig = remote.TLSConfig.Clone()
52✔
3015
                } else {
81✔
3016
                        tlsConfig = &tls.Config{MinVersion: tls.VersionTLS12}
29✔
3017
                }
29✔
3018
                tlsName = remote.tlsName
81✔
3019
                tlsTimeout = remote.TLSTimeout
81✔
3020
                if tlsTimeout == 0 {
127✔
3021
                        tlsTimeout = float64(TLS_TIMEOUT / time.Second)
46✔
3022
                }
46✔
3023
        }
3024

3025
        return tlsRequired, tlsConfig, tlsName, tlsTimeout
1,832✔
3026
}
3027

3028
// Initiates the LeafNode Websocket connection by:
3029
// - doing the TLS handshake if needed
3030
// - sending the HTTP request
3031
// - waiting for the HTTP response
3032
//
3033
// Since some bufio reader is used to consume the HTTP response, this function
3034
// returns the slice of buffered bytes (if any) so that the readLoop that will
3035
// be started after that consume those first before reading from the socket.
3036
// The boolean
3037
//
3038
// Lock held on entry.
3039
func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remote *leafNodeCfg) ([]byte, ClosedState, error) {
43✔
3040
        remote.RLock()
43✔
3041
        compress := remote.Websocket.Compression
43✔
3042
        // By default the server will mask outbound frames, but it can be disabled with this option.
43✔
3043
        noMasking := remote.Websocket.NoMasking
43✔
3044
        infoTimeout := remote.FirstInfoTimeout
43✔
3045
        remote.RUnlock()
43✔
3046
        // Will do the client-side TLS handshake if needed.
43✔
3047
        tlsRequired, err := c.leafClientHandshakeIfNeeded(remote, opts)
43✔
3048
        if err != nil {
47✔
3049
                // 0 will indicate that the connection was already closed
4✔
3050
                return nil, 0, err
4✔
3051
        }
4✔
3052

3053
        // For http request, we need the passed URL to contain either http or https scheme.
3054
        scheme := "http"
39✔
3055
        if tlsRequired {
47✔
3056
                scheme = "https"
8✔
3057
        }
8✔
3058
        // We will use the `/leafnode` path to tell the accepting WS server that it should
3059
        // create a LEAF connection, not a CLIENT.
3060
        // In case we use the user's URL path in the future, make sure we append the user's
3061
        // path to our `/leafnode` path.
3062
        lpath := leafNodeWSPath
39✔
3063
        if curPath := rURL.EscapedPath(); curPath != _EMPTY_ {
60✔
3064
                if curPath[0] == '/' {
42✔
3065
                        curPath = curPath[1:]
21✔
3066
                }
21✔
3067
                lpath = path.Join(curPath, lpath)
21✔
3068
        } else {
18✔
3069
                lpath = lpath[1:]
18✔
3070
        }
18✔
3071
        ustr := fmt.Sprintf("%s://%s/%s", scheme, rURL.Host, lpath)
39✔
3072
        u, _ := url.Parse(ustr)
39✔
3073
        req := &http.Request{
39✔
3074
                Method:     "GET",
39✔
3075
                URL:        u,
39✔
3076
                Proto:      "HTTP/1.1",
39✔
3077
                ProtoMajor: 1,
39✔
3078
                ProtoMinor: 1,
39✔
3079
                Header:     make(http.Header),
39✔
3080
                Host:       u.Host,
39✔
3081
        }
39✔
3082
        wsKey, err := wsMakeChallengeKey()
39✔
3083
        if err != nil {
39✔
3084
                return nil, WriteError, err
×
3085
        }
×
3086

3087
        req.Header["Upgrade"] = []string{"websocket"}
39✔
3088
        req.Header["Connection"] = []string{"Upgrade"}
39✔
3089
        req.Header["Sec-WebSocket-Key"] = []string{wsKey}
39✔
3090
        req.Header["Sec-WebSocket-Version"] = []string{"13"}
39✔
3091
        if compress {
48✔
3092
                req.Header.Add("Sec-WebSocket-Extensions", wsPMCReqHeaderValue)
9✔
3093
        }
9✔
3094
        if noMasking {
49✔
3095
                req.Header.Add(wsNoMaskingHeader, wsNoMaskingValue)
10✔
3096
        }
10✔
3097
        c.nc.SetDeadline(time.Now().Add(infoTimeout))
39✔
3098
        if err := req.Write(c.nc); err != nil {
39✔
3099
                return nil, WriteError, err
×
3100
        }
×
3101

3102
        var resp *http.Response
39✔
3103

39✔
3104
        br := bufio.NewReaderSize(c.nc, MAX_CONTROL_LINE_SIZE)
39✔
3105
        resp, err = http.ReadResponse(br, req)
39✔
3106
        if err == nil &&
39✔
3107
                (resp.StatusCode != 101 ||
39✔
3108
                        !strings.EqualFold(resp.Header.Get("Upgrade"), "websocket") ||
39✔
3109
                        !strings.EqualFold(resp.Header.Get("Connection"), "upgrade") ||
39✔
3110
                        resp.Header.Get("Sec-Websocket-Accept") != wsAcceptKey(wsKey)) {
40✔
3111

1✔
3112
                err = fmt.Errorf("invalid websocket connection")
1✔
3113
        }
1✔
3114
        // Check compression extension...
3115
        if err == nil && c.ws.compress {
48✔
3116
                // Check that not only permessage-deflate extension is present, but that
9✔
3117
                // we also have server and client no context take over.
9✔
3118
                srvCompress, noCtxTakeover := wsPMCExtensionSupport(resp.Header, false)
9✔
3119

9✔
3120
                // If server does not support compression, then simply disable it in our side.
9✔
3121
                if !srvCompress {
13✔
3122
                        c.ws.compress = false
4✔
3123
                } else if !noCtxTakeover {
9✔
3124
                        err = fmt.Errorf("compression negotiation error")
×
3125
                }
×
3126
        }
3127
        // Same for no masking...
3128
        if err == nil && noMasking {
49✔
3129
                // Check if server accepts no masking
10✔
3130
                if resp.Header.Get(wsNoMaskingHeader) != wsNoMaskingValue {
11✔
3131
                        // Nope, need to mask our writes as any client would do.
1✔
3132
                        c.ws.maskwrite = true
1✔
3133
                }
1✔
3134
        }
3135
        if resp != nil {
67✔
3136
                resp.Body.Close()
28✔
3137
        }
28✔
3138
        if err != nil {
51✔
3139
                return nil, ReadError, err
12✔
3140
        }
12✔
3141
        c.Debugf("Leafnode compression=%v masking=%v", c.ws.compress, c.ws.maskwrite)
27✔
3142

27✔
3143
        var preBuf []byte
27✔
3144
        // We have to slurp whatever is in the bufio reader and pass that to the readloop.
27✔
3145
        if n := br.Buffered(); n != 0 {
27✔
3146
                preBuf, _ = br.Peek(n)
×
3147
        }
×
3148
        return preBuf, 0, nil
27✔
3149
}
3150

3151
const connectProcessTimeout = 2 * time.Second
3152

3153
// This is invoked for remote LEAF remote connections after processing the INFO
3154
// protocol.
3155
func (s *Server) leafNodeResumeConnectProcess(c *client) {
640✔
3156
        clusterName := s.ClusterName()
640✔
3157

640✔
3158
        c.mu.Lock()
640✔
3159
        if c.isClosed() {
640✔
3160
                c.mu.Unlock()
×
3161
                return
×
3162
        }
×
3163
        if err := c.sendLeafConnect(clusterName, c.headers); err != nil {
642✔
3164
                c.mu.Unlock()
2✔
3165
                c.closeConnection(WriteError)
2✔
3166
                return
2✔
3167
        }
2✔
3168

3169
        // Spin up the write loop.
3170
        s.startGoRoutine(func() { c.writeLoop() })
1,276✔
3171

3172
        // timeout leafNodeFinishConnectProcess
3173
        c.ping.tmr = time.AfterFunc(connectProcessTimeout, func() {
638✔
3174
                c.mu.Lock()
×
3175
                // check if leafNodeFinishConnectProcess was called and prevent later leafNodeFinishConnectProcess
×
3176
                if !c.flags.setIfNotSet(connectProcessFinished) {
×
3177
                        c.mu.Unlock()
×
3178
                        return
×
3179
                }
×
3180
                clearTimer(&c.ping.tmr)
×
3181
                closed := c.isClosed()
×
3182
                c.mu.Unlock()
×
3183
                if !closed {
×
3184
                        c.sendErrAndDebug("Stale Leaf Node Connection - Closing")
×
3185
                        c.closeConnection(StaleConnection)
×
3186
                }
×
3187
        })
3188
        c.mu.Unlock()
638✔
3189
        c.Debugf("Remote leafnode connect msg sent")
638✔
3190
}
3191

3192
// This is invoked for remote LEAF connections after processing the INFO
3193
// protocol and leafNodeResumeConnectProcess.
3194
// This will send LS+ the CONNECT protocol and register the leaf node.
3195
func (s *Server) leafNodeFinishConnectProcess(c *client) {
617✔
3196
        c.mu.Lock()
617✔
3197
        if !c.flags.setIfNotSet(connectProcessFinished) {
617✔
3198
                c.mu.Unlock()
×
3199
                return
×
3200
        }
×
3201
        if c.isClosed() {
617✔
3202
                c.mu.Unlock()
×
3203
                s.removeLeafNodeConnection(c)
×
3204
                return
×
3205
        }
×
3206
        remote := c.leaf.remote
617✔
3207
        // Check if we will need to send the system connect event.
617✔
3208
        remote.RLock()
617✔
3209
        sendSysConnectEvent := remote.Hub
617✔
3210
        remote.RUnlock()
617✔
3211

617✔
3212
        // Capture account before releasing lock
617✔
3213
        acc := c.acc
617✔
3214
        // cancel connectProcessTimeout
617✔
3215
        clearTimer(&c.ping.tmr)
617✔
3216
        c.mu.Unlock()
617✔
3217

617✔
3218
        // Make sure we register with the account here.
617✔
3219
        if err := c.registerWithAccount(acc); err != nil {
619✔
3220
                if err == ErrTooManyAccountConnections {
2✔
3221
                        c.maxAccountConnExceeded()
×
3222
                        return
×
3223
                } else if err == ErrLeafNodeLoop {
4✔
3224
                        c.handleLeafNodeLoop(true)
2✔
3225
                        return
2✔
3226
                }
2✔
3227
                c.Errorf("Registering leaf with account %s resulted in error: %v", acc.Name, err)
×
3228
                c.closeConnection(ProtocolViolation)
×
3229
                return
×
3230
        }
3231
        s.addLeafNodeConnection(c, _EMPTY_, _EMPTY_, false)
615✔
3232
        s.initLeafNodeSmapAndSendSubs(c)
615✔
3233
        if sendSysConnectEvent {
621✔
3234
                s.sendLeafNodeConnect(acc)
6✔
3235
        }
6✔
3236

3237
        // The above functions are not atomically under the client
3238
        // lock doing those operations. It is possible - since we
3239
        // have started the read/write loops - that the connection
3240
        // is closed before or in between. This would leave the
3241
        // closed LN connection possible registered with the account
3242
        // and/or the server's leafs map. So check if connection
3243
        // is closed, and if so, manually cleanup.
3244
        c.mu.Lock()
615✔
3245
        closed := c.isClosed()
615✔
3246
        if !closed {
1,230✔
3247
                c.setFirstPingTimer()
615✔
3248
        }
615✔
3249
        c.mu.Unlock()
615✔
3250
        if closed {
615✔
3251
                s.removeLeafNodeConnection(c)
×
3252
                if prev := acc.removeClient(c); prev == 1 {
×
3253
                        s.decActiveAccounts()
×
3254
                }
×
3255
        }
3256
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc