• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 23474196077

23 Mar 2026 10:09AM UTC coverage: 81.839% (-1.3%) from 83.107%
23474196077

push

github

web-flow
Update dependencies (#7979)

Signed-off-by: Neil Twigg <neil@nats.io>

74257 of 90736 relevant lines covered (81.84%)

377851.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.17
/server/leafnode.go
1
// Copyright 2019-2025 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bufio"
18
        "bytes"
19
        "crypto/tls"
20
        "encoding/base64"
21
        "encoding/json"
22
        "fmt"
23
        "io"
24
        "math/rand"
25
        "net"
26
        "net/http"
27
        "net/url"
28
        "os"
29
        "path"
30
        "reflect"
31
        "regexp"
32
        "runtime"
33
        "strconv"
34
        "strings"
35
        "sync"
36
        "sync/atomic"
37
        "time"
38

39
        "github.com/klauspost/compress/s2"
40
        "github.com/nats-io/jwt/v2"
41
        "github.com/nats-io/nkeys"
42
        "github.com/nats-io/nuid"
43
)
44

45
const (
	// Warning when user configures leafnode TLS insecure.
	leafnodeTLSInsecureWarning = "TLS certificate chain and hostname of solicited leafnodes will not be verified. DO NOT USE IN PRODUCTION!"

	// When a loop is detected, delay the reconnect of solicited connection.
	leafNodeReconnectDelayAfterLoopDetected = 30 * time.Second

	// When a server receives a message causing a permission violation, the
	// connection is closed and it won't attempt to reconnect for that long.
	leafNodeReconnectAfterPermViolation = 30 * time.Second

	// When we have the same cluster name as the hub.
	leafNodeReconnectDelayAfterClusterNameSame = 30 * time.Second

	// Prefix for loop detection subject.
	leafNodeLoopDetectionSubjectPrefix = "$LDS."

	// Path added to URL to indicate to WS server that the connection is a
	// LEAF connection as opposed to a CLIENT.
	leafNodeWSPath = "/leafnode"

	// When a soliciting leafnode is rejected because it does not meet the
	// configured minimum version, delay the next reconnect attempt by this long.
	leafNodeMinVersionReconnectDelay = 5 * time.Second
)
70

71
// leaf holds the leafnode-specific state attached to a client connection.
type leaf struct {
	// We have any auth stuff here for solicited connections.
	remote *leafNodeCfg
	// isSpoke tells us what role we are playing.
	// Used when we receive a connection but otherside tells us they are a hub.
	isSpoke bool
	// remoteCluster is when we are a hub but the spoke leafnode is part of a cluster.
	remoteCluster string
	// remoteServer holds onto the remote server's name or ID.
	remoteServer string
	// domain name of remote server
	remoteDomain string
	// account name of remote server
	remoteAccName string
	// Whether or not we want to propagate east-west interest from other LNs.
	isolated bool
	// Used to suppress sub and unsub interest. Same as routes but our audience
	// here is tied to this leaf node. This will hold all subscriptions except this
	// leaf nodes. This represents all the interest we want to send to the other side.
	smap map[string]int32
	// This map will contain all the subscriptions that have been added to the smap
	// during initLeafNodeSmapAndSendSubs. It is short lived and is there to avoid
	// a race between processing of a sub where sub is added to account sublist but
	// updateSmap has not been called on that "thread", while in the LN readloop,
	// when processing CONNECT, initLeafNodeSmapAndSendSubs is invoked and adds
	// this subscription to smap. When processing of the sub then calls updateSmap,
	// we would add it a second time in the smap causing later unsub to suppress the LS-.
	tsub map[*subscription]struct{}
	// tsubt is the timer tied to the short-lived tsub map above.
	tsubt *time.Timer
	// Selected compression mode, which may be different from the server configured mode.
	compression string
	// This is for GW map replies.
	gwSub *subscription
}
105

106
// Used for remote (solicited) leafnodes.
// Wraps the configured RemoteLeafOpts with runtime connection state; the
// embedded RWMutex guards the mutable fields below.
type leafNodeCfg struct {
	sync.RWMutex
	*RemoteLeafOpts
	// Working copy of the configured URLs (possibly shuffled), later extended
	// with URLs received via async leafnode INFO protocols.
	urls []*url.URL
	// URL currently/last picked for connecting.
	curURL *url.URL
	// Server name saved off from a TLS URL (see saveTLSHostname usage in newLeafNodeCfg).
	tlsName string
	// User/password saved from a configured URL so they can be used with a
	// bare URL received via the INFO protocol.
	username string
	password string
	// Deny permissions derived from DenyExports/DenyImports.
	perms          *Permissions
	connDelay      time.Duration // Delay before a connect, could be used while detecting loop condition, etc..
	jsMigrateTimer *time.Timer
}
119

120
// Check to see if this is a solicited leafnode. We do special processing for solicited.
121
func (c *client) isSolicitedLeafNode() bool {
2,042✔
122
        return c.kind == LEAF && c.leaf.remote != nil
2,042✔
123
}
2,042✔
124

125
// Returns true if this is a solicited leafnode and is not configured to be treated as a hub or a receiving
126
// connection leafnode where the otherside has declared itself to be the hub.
127
func (c *client) isSpokeLeafNode() bool {
6,613,512✔
128
        return c.kind == LEAF && c.leaf.isSpoke
6,613,512✔
129
}
6,613,512✔
130

131
func (c *client) isHubLeafNode() bool {
17,777✔
132
        return c.kind == LEAF && !c.leaf.isSpoke
17,777✔
133
}
17,777✔
134

135
// isIsolatedLeafNode reports whether this is a leafnode connection marked as
// isolated (see leaf.isolated), i.e. one whose east-west interest from other
// leafnodes should not be propagated.
func (c *client) isIsolatedLeafNode() bool {
	// TODO(nat): In future we may want to pass in and consider an isolation
	// group name here, which the hub and/or leaf could provide, so that we
	// can isolate away certain LNs but not others on an opt-in basis. For
	// now we will just isolate all LN interest until then.
	return c.kind == LEAF && c.leaf.isolated
}
9,658✔
142

143
// This will spin up go routines to solicit the remote leaf node connections.
// Every remote is registered in s.leafRemoteCfgs (even disabled ones, so that
// config reload indices line up), and a connect goroutine is started for each
// enabled remote. Credentials files are sanity-checked up front with notices
// for restricted permissions; errors are logged but do not prevent solicitation.
func (s *Server) solicitLeafNodeRemotes(remotes []*RemoteLeafOpts) {
	sysAccName := _EMPTY_
	sAcc := s.SystemAccount()
	if sAcc != nil {
		sysAccName = sAcc.Name
	}
	// addRemote registers the remote config under the server lock and performs
	// the (lock-free) credentials file checks. Returns the wrapped config.
	addRemote := func(r *RemoteLeafOpts, isSysAccRemote bool) *leafNodeCfg {
		s.mu.Lock()
		remote := newLeafNodeCfg(r)
		creds := remote.Credentials
		accName := remote.LocalAccount
		s.leafRemoteCfgs = append(s.leafRemoteCfgs, remote)
		// Print a notice if a system account remote restricts import/export permissions.
		if isSysAccRemote {
			if len(remote.DenyExports) > 0 {
				s.Noticef("Remote for System Account uses restricted export permissions")
			}
			if len(remote.DenyImports) > 0 {
				s.Noticef("Remote for System Account uses restricted import permissions")
			}
		}
		s.mu.Unlock()
		if creds != _EMPTY_ {
			contents, err := os.ReadFile(creds)
			// Zero out the creds contents once done with them.
			defer wipeSlice(contents)
			if err != nil {
				s.Errorf("Error reading LeafNode Remote Credentials file %q: %v", creds, err)
			} else if items := credsRe.FindAllSubmatch(contents, -1); len(items) < 2 {
				// Expect at least two matches: the user JWT and the nkey seed.
				s.Errorf("LeafNode Remote Credentials file %q malformed", creds)
			} else if _, err := nkeys.FromSeed(items[1][1]); err != nil {
				s.Errorf("LeafNode Remote Credentials file %q has malformed seed", creds)
			} else if uc, err := jwt.DecodeUserClaims(string(items[0][1])); err != nil {
				s.Errorf("LeafNode Remote Credentials file %q has malformed user jwt", creds)
			} else if isSysAccRemote {
				// Surface restricted permissions embedded in the credentials.
				if !uc.Permissions.Pub.Empty() || !uc.Permissions.Sub.Empty() || uc.Permissions.Resp != nil {
					s.Noticef("LeafNode Remote for System Account uses credentials file %q with restricted permissions", creds)
				}
			} else {
				if !uc.Permissions.Pub.Empty() || !uc.Permissions.Sub.Empty() || uc.Permissions.Resp != nil {
					s.Noticef("LeafNode Remote for Account %s uses credentials file %q with restricted permissions", accName, creds)
				}
			}
		}
		return remote
	}
	for _, r := range remotes {
		// We need to call this, even if the leaf is disabled. This is so that
		// the number of internal configuration matches the options' remote leaf
		// configuration required for configuration reload.
		remote := addRemote(r, r.LocalAccount == sysAccName)
		if !r.Disabled {
			s.startGoRoutine(func() { s.connectToRemoteLeafNode(remote, true) })
		}
	}
}
199

200
func (s *Server) remoteLeafNodeStillValid(remote *leafNodeCfg) bool {
6,839✔
201
        if remote.Disabled {
6,840✔
202
                return false
1✔
203
        }
1✔
204
        for _, ri := range s.getOpts().LeafNode.Remotes {
14,050✔
205
                // FIXME(dlc) - What about auth changes?
7,212✔
206
                if reflect.DeepEqual(ri.URLs, remote.URLs) {
14,050✔
207
                        return true
6,838✔
208
                }
6,838✔
209
        }
210
        return false
×
211
}
212

213
// Ensure that leafnode is properly configured.
// Validation order matters: anything that must run regardless of gateway
// configuration has to happen before the gateway checks at the bottom.
func validateLeafNode(o *Options) error {
	if err := validateLeafNodeAuthOptions(o); err != nil {
		return err
	}

	// Users can bind to any local account, if its empty we will assume the $G account.
	for _, r := range o.LeafNode.Remotes {
		if r.LocalAccount == _EMPTY_ {
			r.LocalAccount = globalAccountName
		}
	}

	// In local config mode, check that leafnode configuration refers to accounts that exist.
	if len(o.TrustedOperators) == 0 {
		// Build the set of known account names from the configured accounts.
		accNames := map[string]struct{}{}
		for _, a := range o.Accounts {
			accNames[a.Name] = struct{}{}
		}
		// global account is always created
		accNames[DEFAULT_GLOBAL_ACCOUNT] = struct{}{}
		// in the context of leaf nodes, empty account means global account
		accNames[_EMPTY_] = struct{}{}
		// system account either exists or, if not disabled, will be created
		if o.SystemAccount == _EMPTY_ && !o.NoSystemAccount {
			accNames[DEFAULT_SYSTEM_ACCOUNT] = struct{}{}
		}
		// checkAccountExists reports an error naming the config section
		// (cfgType) when accName is not a known local account.
		checkAccountExists := func(accName string, cfgType string) error {
			if _, ok := accNames[accName]; !ok {
				return fmt.Errorf("cannot find local account %q specified in leafnode %s", accName, cfgType)
			}
			return nil
		}
		if err := checkAccountExists(o.LeafNode.Account, "authorization"); err != nil {
			return err
		}
		for _, lu := range o.LeafNode.Users {
			if lu.Account == nil { // means global account
				continue
			}
			if err := checkAccountExists(lu.Account.Name, "authorization"); err != nil {
				return err
			}
		}
		for _, r := range o.LeafNode.Remotes {
			if err := checkAccountExists(r.LocalAccount, "remote"); err != nil {
				return err
			}
		}
	} else {
		// Operator mode: accounts are referenced by public nkeys, not names.
		if len(o.LeafNode.Users) != 0 {
			return fmt.Errorf("operator mode does not allow specifying users in leafnode config")
		}
		for _, r := range o.LeafNode.Remotes {
			if !nkeys.IsValidPublicAccountKey(r.LocalAccount) {
				return fmt.Errorf(
					"operator mode requires account nkeys in remotes. " +
						"Please add an `account` key to each remote in your `leafnodes` section, to assign it to an account. " +
						"Each account value should be a 56 character public key, starting with the letter 'A'")
			}
		}
		if o.LeafNode.Port != 0 && o.LeafNode.Account != "" && !nkeys.IsValidPublicAccountKey(o.LeafNode.Account) {
			return fmt.Errorf("operator mode and non account nkeys are incompatible")
		}
	}

	// Validate compression settings
	if o.LeafNode.Compression.Mode != _EMPTY_ {
		if err := validateAndNormalizeCompressionOption(&o.LeafNode.Compression, CompressionS2Auto); err != nil {
			return err
		}
	}

	// If a remote has a websocket scheme, all need to have it.
	for _, rcfg := range o.LeafNode.Remotes {
		// Validate proxy configuration
		if _, err := validateLeafNodeProxyOptions(rcfg); err != nil {
			return err
		}

		if len(rcfg.URLs) >= 2 {
			// All URLs must agree with the first one on being websocket or not.
			firstIsWS, ok := isWSURL(rcfg.URLs[0]), true
			for i := 1; i < len(rcfg.URLs); i++ {
				u := rcfg.URLs[i]
				if isWS := isWSURL(u); isWS && !firstIsWS || !isWS && firstIsWS {
					ok = false
					break
				}
			}
			if !ok {
				return fmt.Errorf("remote leaf node configuration cannot have a mix of websocket and non-websocket urls: %q", redactURLList(rcfg.URLs))
			}
		}
		// Validate compression settings
		if rcfg.Compression.Mode != _EMPTY_ {
			if err := validateAndNormalizeCompressionOption(&rcfg.Compression, CompressionS2Auto); err != nil {
				return err
			}
		}
	}

	// Remaining checks only apply when the leafnode accept port is configured.
	if o.LeafNode.Port == 0 {
		return nil
	}

	// If MinVersion is defined, check that it is valid.
	if mv := o.LeafNode.MinVersion; mv != _EMPTY_ {
		if err := checkLeafMinVersionConfig(mv); err != nil {
			return err
		}
	}

	// The checks below will be done only when detecting that we are configured
	// with gateways. So if an option validation needs to be done regardless,
	// it MUST be done before this point!

	if o.Gateway.Name == _EMPTY_ && o.Gateway.Port == 0 {
		return nil
	}
	// If we are here we have both leaf nodes and gateways defined, make sure there
	// is a system account defined.
	if o.SystemAccount == _EMPTY_ {
		return fmt.Errorf("leaf nodes and gateways (both being defined) require a system account to also be configured")
	}
	if err := validatePinnedCerts(o.LeafNode.TLSPinnedCerts); err != nil {
		return fmt.Errorf("leafnode: %v", err)
	}
	return nil
}
342

343
func checkLeafMinVersionConfig(mv string) error {
8✔
344
        if ok, err := versionAtLeastCheckError(mv, 2, 8, 0); !ok || err != nil {
12✔
345
                if err != nil {
6✔
346
                        return fmt.Errorf("invalid leafnode's minimum version: %v", err)
2✔
347
                } else {
4✔
348
                        return fmt.Errorf("the minimum version should be at least 2.8.0")
2✔
349
                }
2✔
350
        }
351
        return nil
4✔
352
}
353

354
// Used to validate user names in LeafNode configuration.
355
// - rejects mix of single and multiple users.
356
// - rejects duplicate user names.
357
func validateLeafNodeAuthOptions(o *Options) error {
7,819✔
358
        if len(o.LeafNode.Users) == 0 {
15,612✔
359
                return nil
7,793✔
360
        }
7,793✔
361
        if o.LeafNode.Username != _EMPTY_ {
28✔
362
                return fmt.Errorf("can not have a single user/pass and a users array")
2✔
363
        }
2✔
364
        if o.LeafNode.Nkey != _EMPTY_ {
24✔
365
                return fmt.Errorf("can not have a single nkey and a users array")
×
366
        }
×
367
        users := map[string]struct{}{}
24✔
368
        for _, u := range o.LeafNode.Users {
62✔
369
                if _, exists := users[u.Username]; exists {
40✔
370
                        return fmt.Errorf("duplicate user %q detected in leafnode authorization", u.Username)
2✔
371
                }
2✔
372
                users[u.Username] = struct{}{}
36✔
373
        }
374
        return nil
22✔
375
}
376

377
// validateLeafNodeProxyOptions validates the HTTP proxy settings of a remote
// leafnode configuration. It returns non-fatal warnings (e.g. proxy settings
// that will be partially or fully ignored) along with any hard error.
// A remote with no proxy URL configured is trivially valid.
func validateLeafNodeProxyOptions(remote *RemoteLeafOpts) ([]string, error) {
	var warnings []string

	if remote.Proxy.URL == _EMPTY_ {
		return warnings, nil
	}

	proxyURL, err := url.Parse(remote.Proxy.URL)
	if err != nil {
		return warnings, fmt.Errorf("invalid proxy URL: %v", err)
	}

	// Only HTTP(S) CONNECT proxies are supported.
	if proxyURL.Scheme != "http" && proxyURL.Scheme != "https" {
		return warnings, fmt.Errorf("proxy URL scheme must be http or https, got: %s", proxyURL.Scheme)
	}

	if proxyURL.Host == _EMPTY_ {
		return warnings, fmt.Errorf("proxy URL must specify a host")
	}

	if remote.Proxy.Timeout < 0 {
		return warnings, fmt.Errorf("proxy timeout must be >= 0")
	}

	// Credentials must be all-or-nothing.
	if (remote.Proxy.Username == _EMPTY_) != (remote.Proxy.Password == _EMPTY_) {
		return warnings, fmt.Errorf("proxy username and password must both be specified or both be empty")
	}

	if len(remote.URLs) > 0 {
		hasWebSocketURL := false
		hasNonWebSocketURL := false

		for _, remoteURL := range remote.URLs {
			if remoteURL.Scheme == wsSchemePrefix || remoteURL.Scheme == wsSchemePrefixTLS {
				hasWebSocketURL = true
				// A wss:// URL needs some TLS configuration on the remote.
				if (remoteURL.Scheme == wsSchemePrefixTLS) &&
					remote.TLSConfig == nil && !remote.TLS {
					return warnings, fmt.Errorf("proxy is configured but remote URL %s requires TLS and no TLS configuration is provided. When using proxy with TLS endpoints, ensure TLS is properly configured for the leafnode remote", remoteURL.String())
				}
			} else {
				hasNonWebSocketURL = true
			}
		}

		// Warn when the proxy will be ignored for some or all of the URLs.
		if !hasWebSocketURL {
			warnings = append(warnings, "proxy configuration will be ignored: proxy settings only apply to WebSocket connections (ws:// or wss://), but all configured URLs use TCP connections (nats://)")
		} else if hasNonWebSocketURL {
			warnings = append(warnings, "proxy configuration will only be used for WebSocket URLs: proxy settings do not apply to TCP connections (nats://)")
		}
	}

	return warnings, nil
}
430

431
// Update remote LeafNode TLS configurations after a config reload.
432
func (s *Server) updateRemoteLeafNodesTLSConfig(opts *Options) {
8✔
433
        max := len(opts.LeafNode.Remotes)
8✔
434
        if max == 0 {
8✔
435
                return
×
436
        }
×
437

438
        s.mu.RLock()
8✔
439
        defer s.mu.RUnlock()
8✔
440

8✔
441
        // Changes in the list of remote leaf nodes is not supported.
8✔
442
        // However, make sure that we don't go over the arrays.
8✔
443
        if len(s.leafRemoteCfgs) < max {
8✔
444
                max = len(s.leafRemoteCfgs)
×
445
        }
×
446
        for i := 0; i < max; i++ {
21✔
447
                ro := opts.LeafNode.Remotes[i]
13✔
448
                cfg := s.leafRemoteCfgs[i]
13✔
449
                if ro.TLSConfig != nil {
15✔
450
                        cfg.Lock()
2✔
451
                        cfg.TLSConfig = ro.TLSConfig.Clone()
2✔
452
                        cfg.TLSHandshakeFirst = ro.TLSHandshakeFirst
2✔
453
                        cfg.Unlock()
2✔
454
                }
2✔
455
        }
456
}
457

458
func (s *Server) reConnectToRemoteLeafNode(remote *leafNodeCfg) {
235✔
459
        delay := s.getOpts().LeafNode.ReconnectInterval
235✔
460
        select {
235✔
461
        case <-time.After(delay):
179✔
462
        case <-s.quitCh:
56✔
463
                s.grWG.Done()
56✔
464
                return
56✔
465
        }
466
        s.connectToRemoteLeafNode(remote, false)
179✔
467
}
468

469
// Creates a leafNodeCfg object that wraps the RemoteLeafOpts.
470
func newLeafNodeCfg(remote *RemoteLeafOpts) *leafNodeCfg {
1,348✔
471
        cfg := &leafNodeCfg{
1,348✔
472
                RemoteLeafOpts: remote,
1,348✔
473
                urls:           make([]*url.URL, 0, len(remote.URLs)),
1,348✔
474
        }
1,348✔
475
        if len(remote.DenyExports) > 0 || len(remote.DenyImports) > 0 {
1,356✔
476
                perms := &Permissions{}
8✔
477
                if len(remote.DenyExports) > 0 {
16✔
478
                        perms.Publish = &SubjectPermission{Deny: remote.DenyExports}
8✔
479
                }
8✔
480
                if len(remote.DenyImports) > 0 {
15✔
481
                        perms.Subscribe = &SubjectPermission{Deny: remote.DenyImports}
7✔
482
                }
7✔
483
                cfg.perms = perms
8✔
484
        }
485
        // Start with the one that is configured. We will add to this
486
        // array when receiving async leafnode INFOs.
487
        cfg.urls = append(cfg.urls, cfg.URLs...)
1,348✔
488
        // If allowed to randomize, do it on our copy of URLs
1,348✔
489
        if !remote.NoRandomize {
2,695✔
490
                rand.Shuffle(len(cfg.urls), func(i, j int) {
1,729✔
491
                        cfg.urls[i], cfg.urls[j] = cfg.urls[j], cfg.urls[i]
382✔
492
                })
382✔
493
        }
494
        // If we are TLS make sure we save off a proper servername if possible.
495
        // Do same for user/password since we may need them to connect to
496
        // a bare URL that we get from INFO protocol.
497
        for _, u := range cfg.urls {
3,093✔
498
                cfg.saveTLSHostname(u)
1,745✔
499
                cfg.saveUserPassword(u)
1,745✔
500
                // If the url(s) have the "wss://" scheme, and we don't have a TLS
1,745✔
501
                // config, mark that we should be using TLS anyway.
1,745✔
502
                if !cfg.TLS && isWSSURL(u) {
1,746✔
503
                        cfg.TLS = true
1✔
504
                }
1✔
505
        }
506
        return cfg
1,348✔
507
}
508

509
// Will pick an URL from the list of available URLs.
510
func (cfg *leafNodeCfg) pickNextURL() *url.URL {
6,051✔
511
        cfg.Lock()
6,051✔
512
        defer cfg.Unlock()
6,051✔
513
        // If the current URL is the first in the list and we have more than
6,051✔
514
        // one URL, then move that one to end of the list.
6,051✔
515
        if cfg.curURL != nil && len(cfg.urls) > 1 && urlsAreEqual(cfg.curURL, cfg.urls[0]) {
8,650✔
516
                first := cfg.urls[0]
2,599✔
517
                copy(cfg.urls, cfg.urls[1:])
2,599✔
518
                cfg.urls[len(cfg.urls)-1] = first
2,599✔
519
        }
2,599✔
520
        cfg.curURL = cfg.urls[0]
6,051✔
521
        return cfg.curURL
6,051✔
522
}
523

524
// Returns the current URL
525
func (cfg *leafNodeCfg) getCurrentURL() *url.URL {
77✔
526
        cfg.RLock()
77✔
527
        defer cfg.RUnlock()
77✔
528
        return cfg.curURL
77✔
529
}
77✔
530

531
// Returns how long the server should wait before attempting
532
// to solicit a remote leafnode connection.
533
func (cfg *leafNodeCfg) getConnectDelay() time.Duration {
1,528✔
534
        cfg.RLock()
1,528✔
535
        delay := cfg.connDelay
1,528✔
536
        cfg.RUnlock()
1,528✔
537
        return delay
1,528✔
538
}
1,528✔
539

540
// Sets the connect delay.
541
func (cfg *leafNodeCfg) setConnectDelay(delay time.Duration) {
147✔
542
        cfg.Lock()
147✔
543
        cfg.connDelay = delay
147✔
544
        cfg.Unlock()
147✔
545
}
147✔
546

547
// Ensure that non-exported options (used in tests) have
548
// been properly set.
549
func (s *Server) setLeafNodeNonExportedOptions() {
6,578✔
550
        opts := s.getOpts()
6,578✔
551
        s.leafNodeOpts.dialTimeout = opts.LeafNode.dialTimeout
6,578✔
552
        if s.leafNodeOpts.dialTimeout == 0 {
13,155✔
553
                // Use same timeouts as routes for now.
6,577✔
554
                s.leafNodeOpts.dialTimeout = DEFAULT_ROUTE_DIAL
6,577✔
555
        }
6,577✔
556
        s.leafNodeOpts.resolver = opts.LeafNode.resolver
6,578✔
557
        if s.leafNodeOpts.resolver == nil {
13,153✔
558
                s.leafNodeOpts.resolver = net.DefaultResolver
6,575✔
559
        }
6,575✔
560
}
561

562
// sharedSysAccDelay is how long a clustered server delays its first leafnode
// connect when the remote shares the system account, to allow some cluster
// info to be gathered first (see connectToRemoteLeafNode).
const sharedSysAccDelay = 250 * time.Millisecond
563

564
// establishHTTPProxyTunnel establishes an HTTP CONNECT tunnel through a proxy server
565
func establishHTTPProxyTunnel(proxyURL, targetHost string, timeout time.Duration, username, password string) (net.Conn, error) {
11✔
566
        proxyAddr, err := url.Parse(proxyURL)
11✔
567
        if err != nil {
11✔
568
                // This should not happen since proxy URL is validated during configuration parsing
×
569
                return nil, fmt.Errorf("unexpected proxy URL parse error (URL was pre-validated): %v", err)
×
570
        }
×
571

572
        // Connect to the proxy server
573
        conn, err := natsDialTimeout("tcp", proxyAddr.Host, timeout)
11✔
574
        if err != nil {
11✔
575
                return nil, fmt.Errorf("failed to connect to proxy: %v", err)
×
576
        }
×
577

578
        // Set deadline for the entire proxy handshake
579
        if err := conn.SetDeadline(time.Now().Add(timeout)); err != nil {
11✔
580
                conn.Close()
×
581
                return nil, fmt.Errorf("failed to set deadline: %v", err)
×
582
        }
×
583

584
        req := &http.Request{
11✔
585
                Method: http.MethodConnect,
11✔
586
                URL:    &url.URL{Opaque: targetHost}, // Opaque is required for CONNECT
11✔
587
                Host:   targetHost,
11✔
588
                Header: make(http.Header),
11✔
589
        }
11✔
590

11✔
591
        // Add proxy authentication if provided
11✔
592
        if username != "" && password != "" {
13✔
593
                req.Header.Set("Proxy-Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(username+":"+password)))
2✔
594
        }
2✔
595

596
        if err := req.Write(conn); err != nil {
11✔
597
                conn.Close()
×
598
                return nil, fmt.Errorf("failed to write CONNECT request: %v", err)
×
599
        }
×
600

601
        resp, err := http.ReadResponse(bufio.NewReader(conn), req)
11✔
602
        if err != nil {
11✔
603
                conn.Close()
×
604
                return nil, fmt.Errorf("failed to read proxy response: %v", err)
×
605
        }
×
606

607
        if resp.StatusCode != http.StatusOK {
12✔
608
                resp.Body.Close()
1✔
609
                conn.Close()
1✔
610
                return nil, fmt.Errorf("proxy CONNECT failed: %s", resp.Status)
1✔
611
        }
1✔
612

613
        // Close the response body
614
        resp.Body.Close()
10✔
615

10✔
616
        // Clear the deadline now that we've finished the proxy handshake
10✔
617
        if err := conn.SetDeadline(time.Time{}); err != nil {
10✔
618
                conn.Close()
×
619
                return nil, fmt.Errorf("failed to clear deadline: %v", err)
×
620
        }
×
621

622
        return conn, nil
10✔
623
}
624

625
// connectToRemoteLeafNode runs the solicit/reconnect loop for a single
// configured remote leafnode. It keeps retrying (with jitter) until the
// connection is established, the server shuts down, or the remote config
// is no longer valid. Runs in its own goroutine tracked by s.grWG.
func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg, firstConnect bool) {
	defer s.grWG.Done()

	if remote == nil || len(remote.URLs) == 0 {
		s.Debugf("Empty remote leafnode definition, nothing to connect")
		return
	}

	opts := s.getOpts()
	reconnectDelay := opts.LeafNode.ReconnectInterval
	s.mu.RLock()
	dialTimeout := s.leafNodeOpts.dialTimeout
	resolver := s.leafNodeOpts.resolver
	var isSysAcc bool
	if s.eventsEnabled() {
		isSysAcc = remote.LocalAccount == s.sys.account.Name
	}
	jetstreamMigrateDelay := remote.JetStreamClusterMigrateDelay
	s.mu.RUnlock()

	// If we are sharing a system account and we are not standalone delay to gather some info prior.
	if firstConnect && isSysAcc && !s.standAloneMode() {
		s.Debugf("Will delay first leafnode connect to shared system account due to clustering")
		remote.setConnectDelay(sharedSysAccDelay)
	}

	// Honor any configured/set connect delay, but remain responsive to
	// server shutdown while waiting.
	if connDelay := remote.getConnectDelay(); connDelay > 0 {
		select {
		case <-time.After(connDelay):
		case <-s.quitCh:
			return
		}
		remote.setConnectDelay(0)
	}

	var conn net.Conn

	const connErrFmt = "Error trying to connect as leafnode to remote server %q (attempt %v): %v"

	// Capture proxy configuration once before the loop with proper locking
	remote.RLock()
	proxyURL := remote.Proxy.URL
	proxyUsername := remote.Proxy.Username
	proxyPassword := remote.Proxy.Password
	proxyTimeout := remote.Proxy.Timeout
	remote.RUnlock()

	// Set default proxy timeout if not specified
	if proxyTimeout == 0 {
		proxyTimeout = dialTimeout
	}

	attempts := 0

	for s.isRunning() && s.remoteLeafNodeStillValid(remote) {
		// Rotate through the configured/discovered URLs for this remote.
		rURL := remote.pickNextURL()
		url, err := s.getRandomIP(resolver, rURL.Host, nil)
		if err == nil {
			var ipStr string
			if url != rURL.Host {
				ipStr = fmt.Sprintf(" (%s)", url)
			}
			// Some test may want to disable remotes from connecting
			if s.isLeafConnectDisabled() {
				s.Debugf("Will not attempt to connect to remote server on %q%s, leafnodes currently disabled", rURL.Host, ipStr)
				err = ErrLeafNodeDisabled
			} else {
				s.Debugf("Trying to connect as leafnode to remote server on %q%s", rURL.Host, ipStr)

				// Check if proxy is configured
				if proxyURL != _EMPTY_ {
					targetHost := rURL.Host
					// If URL doesn't include port, add the default port for the scheme
					if rURL.Port() == _EMPTY_ {
						defaultPort := "80"
						if rURL.Scheme == wsSchemePrefixTLS {
							defaultPort = "443"
						}
						targetHost = net.JoinHostPort(rURL.Hostname(), defaultPort)
					}

					conn, err = establishHTTPProxyTunnel(proxyURL, targetHost, proxyTimeout, proxyUsername, proxyPassword)
				} else {
					// Direct connection
					conn, err = natsDialTimeout("tcp", url, dialTimeout)
				}
			}
		}
		if err != nil {
			// Add jitter so a fleet of leafnodes does not retry in lockstep.
			jitter := time.Duration(rand.Int63n(int64(reconnectDelay)))
			delay := reconnectDelay + jitter
			attempts++
			if s.shouldReportConnectErr(firstConnect, attempts) {
				s.Errorf(connErrFmt, rURL.Host, attempts, err)
			} else {
				s.Debugf(connErrFmt, rURL.Host, attempts, err)
			}
			remote.Lock()
			// if we are using a delay to start migrating assets, kick off a migrate timer.
			if remote.jsMigrateTimer == nil && jetstreamMigrateDelay > 0 {
				remote.jsMigrateTimer = time.AfterFunc(jetstreamMigrateDelay, func() {
					s.checkJetStreamMigrate(remote)
				})
			}
			remote.Unlock()
			select {
			case <-s.quitCh:
				remote.cancelMigrateTimer()
				return
			case <-time.After(delay):
				// Check if we should migrate any JetStream assets immediately while this remote is down.
				// This will be used if JetStreamClusterMigrateDelay was not set
				if jetstreamMigrateDelay == 0 {
					s.checkJetStreamMigrate(remote)
				}
				continue
			}
		}
		// Connected: a pending migrate timer is no longer needed.
		remote.cancelMigrateTimer()
		if !s.remoteLeafNodeStillValid(remote) {
			conn.Close()
			return
		}

		// We have a connection here to a remote server.
		// Go ahead and create our leaf node and return.
		s.createLeafNode(conn, rURL, remote, nil)

		// Clear any observer states if we had them.
		s.clearObserverState(remote)

		return
	}
}
759

760
func (cfg *leafNodeCfg) cancelMigrateTimer() {
1,515✔
761
        cfg.Lock()
1,515✔
762
        stopAndClearTimer(&cfg.jsMigrateTimer)
1,515✔
763
        cfg.Unlock()
1,515✔
764
}
1,515✔
765

766
// This will clear any observer state such that stream or consumer assets on this server can become leaders again.
func (s *Server) clearObserverState(remote *leafNodeCfg) {
	s.mu.RLock()
	accName := remote.LocalAccount
	s.mu.RUnlock()

	acc, err := s.LookupAccount(accName)
	if err != nil {
		s.Warnf("Error looking up account [%s] checking for JetStream clear observer state on a leafnode", accName)
		return
	}

	// Serialize with checkJetStreamMigrate, which flips observer state the other way.
	acc.jscmMu.Lock()
	defer acc.jscmMu.Unlock()

	// Walk all streams looking for any clustered stream, skip otherwise.
	for _, mset := range acc.streams() {
		node := mset.raftNode()
		if node == nil {
			// Not R>1
			continue
		}
		// Check consumers
		for _, o := range mset.getConsumers() {
			if n := o.raftNode(); n != nil {
				// Ensure we can become a leader again.
				n.SetObserver(false)
			}
		}
		// Ensure we can become a leader again.
		// NOTE(review): original comment said "can not", but SetObserver(false)
		// re-enables leadership, matching the consumer branch above.
		node.SetObserver(false)
	}
}
799

800
// Check to see if we should migrate any assets from this account.
// Called while the remote leafnode connection is down; if the remote has
// JetStreamClusterMigrate enabled, force any clustered stream/consumer
// raft nodes in the mapped local account to step down and become
// observers so leadership moves elsewhere. Reversed by clearObserverState
// once the remote reconnects.
func (s *Server) checkJetStreamMigrate(remote *leafNodeCfg) {
	s.mu.RLock()
	accName, shouldMigrate := remote.LocalAccount, remote.JetStreamClusterMigrate
	s.mu.RUnlock()

	if !shouldMigrate {
		return
	}

	acc, err := s.LookupAccount(accName)
	if err != nil {
		s.Warnf("Error looking up account [%s] checking for JetStream migration on a leafnode", accName)
		return
	}

	// Serialize with clearObserverState, which undoes this on reconnect.
	acc.jscmMu.Lock()
	defer acc.jscmMu.Unlock()

	// Walk all streams looking for any clustered stream, skip otherwise.
	// If we are the leader force stepdown.
	for _, mset := range acc.streams() {
		node := mset.raftNode()
		if node == nil {
			// Not R>1
			continue
		}
		// Collect any consumers
		for _, o := range mset.getConsumers() {
			if n := o.raftNode(); n != nil {
				n.StepDown()
				// Ensure we can not become a leader while in this state.
				n.SetObserver(true)
			}
		}
		// Stepdown if this stream was leader.
		node.StepDown()
		// Ensure we can not become a leader while in this state.
		node.SetObserver(true)
	}
}
841

842
// Helper for checking.
843
func (s *Server) isLeafConnectDisabled() bool {
6,046✔
844
        s.mu.RLock()
6,046✔
845
        defer s.mu.RUnlock()
6,046✔
846
        return s.leafDisableConnect
6,046✔
847
}
6,046✔
848

849
// Save off the tlsName for when we use TLS and mix hostnames and IPs. IPs usually
850
// come from the server we connect to.
851
//
852
// We used to save the name only if there was a TLSConfig or scheme equal to "tls".
853
// However, this was causing failures for users that did not set the scheme (and
854
// their remote connections did not have a tls{} block).
855
// We now save the host name regardless in case the remote returns an INFO indicating
856
// that TLS is required.
857
func (cfg *leafNodeCfg) saveTLSHostname(u *url.URL) {
2,369✔
858
        if cfg.tlsName == _EMPTY_ && net.ParseIP(u.Hostname()) == nil {
2,387✔
859
                cfg.tlsName = u.Hostname()
18✔
860
        }
18✔
861
}
862

863
// Save off the username/password for when we connect using a bare URL
864
// that we get from the INFO protocol.
865
func (cfg *leafNodeCfg) saveUserPassword(u *url.URL) {
1,745✔
866
        if cfg.username == _EMPTY_ && u.User != nil {
2,038✔
867
                cfg.username = u.User.Username()
293✔
868
                cfg.password, _ = u.User.Password()
293✔
869
        }
293✔
870
}
871

872
// This starts the leafnode accept loop in a go routine, unless it
// is detected that the server has already been shutdown.
// It binds the leafnode listener, builds the INFO advertised to incoming
// leafnodes, and spawns the accept goroutine. Server lock is taken for the
// duration of the setup.
func (s *Server) startLeafNodeAcceptLoop() {
	// Snapshot server options.
	opts := s.getOpts()

	// Port of -1 means pick a random port (0 to the listener).
	port := opts.LeafNode.Port
	if port == -1 {
		port = 0
	}

	if s.isShuttingDown() {
		return
	}

	s.mu.Lock()
	hp := net.JoinHostPort(opts.LeafNode.Host, strconv.Itoa(port))
	l, e := natsListen("tcp", hp)
	s.leafNodeListenerErr = e
	if e != nil {
		s.mu.Unlock()
		s.Fatalf("Error listening on leafnode port: %d - %v", opts.LeafNode.Port, e)
		return
	}

	s.Noticef("Listening for leafnode connections on %s",
		net.JoinHostPort(opts.LeafNode.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))

	tlsRequired := opts.LeafNode.TLSConfig != nil
	tlsVerify := tlsRequired && opts.LeafNode.TLSConfig.ClientAuth == tls.RequireAndVerifyClientCert
	// Do not set compression in this Info object, it would possibly cause
	// issues when sending asynchronous INFO to the remote.
	info := Info{
		ID:            s.info.ID,
		Name:          s.info.Name,
		Version:       s.info.Version,
		GitCommit:     gitCommit,
		GoVersion:     runtime.Version(),
		AuthRequired:  true,
		TLSRequired:   tlsRequired,
		TLSVerify:     tlsVerify,
		MaxPayload:    s.info.MaxPayload, // TODO(dlc) - Allow override?
		Headers:       s.supportsHeaders(),
		JetStream:     opts.JetStream,
		Domain:        opts.JetStreamDomain,
		Proto:         s.getServerProto(),
		InfoOnConnect: true,
		JSApiLevel:    JSApiLevel,
	}
	// If we have selected a random port...
	if port == 0 {
		// Write resolved port back to options.
		opts.LeafNode.Port = l.Addr().(*net.TCPAddr).Port
	}

	s.leafNodeInfo = info
	// Possibly override Host/Port and set IP based on Cluster.Advertise
	if err := s.setLeafNodeInfoHostPortAndIP(); err != nil {
		s.Fatalf("Error setting leafnode INFO with LeafNode.Advertise value of %s, err=%v", opts.LeafNode.Advertise, err)
		l.Close()
		s.mu.Unlock()
		return
	}
	s.leafURLsMap[s.leafNodeInfo.IP]++
	s.generateLeafNodeInfoJSON()

	// Setup state that can enable shutdown
	s.leafNodeListener = l

	// As of now, a server that does not have remotes configured would
	// never solicit a connection, so we should not have to warn if
	// InsecureSkipVerify is set in main LeafNodes config (since
	// this TLS setting matters only when soliciting a connection).
	// Still, warn if insecure is set in any of LeafNode block.
	// We need to check remotes, even if tls is not required on accept.
	warn := tlsRequired && opts.LeafNode.TLSConfig.InsecureSkipVerify
	if !warn {
		for _, r := range opts.LeafNode.Remotes {
			if r.TLSConfig != nil && r.TLSConfig.InsecureSkipVerify {
				warn = true
				break
			}
		}
	}
	if warn {
		s.Warnf(leafnodeTLSInsecureWarning)
	}
	go s.acceptConnections(l, "Leafnode", func(conn net.Conn) { s.createLeafNode(conn, nil, nil, nil) }, nil)
	s.mu.Unlock()
}
962

963
// RegEx to match a creds file with user JWT and Seed.
// Each decorated block is a `---...---` header line, a base64url-ish
// payload (the capture group: JWT or NKey seed), and a `---...---`
// footer; FindAllSubmatch yields one match per block.
var credsRe = regexp.MustCompile(`\s*(?:(?:[-]{3,}.*[-]{3,}\r?\n)([\w\-.=]+)(?:\r?\n[-]{3,}.*[-]{3,}(\r?\n|\z)))`)
965

966
// clusterName is provided as argument to avoid lock ordering issues with the locked client c
967
// Lock should be held entering here.
968
func (c *client) sendLeafConnect(clusterName string, headers bool) error {
664✔
969
        // We support basic user/pass and operator based user JWT with signatures.
664✔
970
        cinfo := leafConnectInfo{
664✔
971
                Version:       VERSION,
664✔
972
                ID:            c.srv.info.ID,
664✔
973
                Domain:        c.srv.info.Domain,
664✔
974
                Name:          c.srv.info.Name,
664✔
975
                Hub:           c.leaf.remote.Hub,
664✔
976
                Cluster:       clusterName,
664✔
977
                Headers:       headers,
664✔
978
                JetStream:     c.acc.jetStreamConfigured(),
664✔
979
                DenyPub:       c.leaf.remote.DenyImports,
664✔
980
                Compression:   c.leaf.compression,
664✔
981
                RemoteAccount: c.acc.GetName(),
664✔
982
                Proto:         c.srv.getServerProto(),
664✔
983
                Isolate:       c.leaf.remote.RequestIsolation,
664✔
984
        }
664✔
985

664✔
986
        // If a signature callback is specified, this takes precedence over anything else.
664✔
987
        if cb := c.leaf.remote.SignatureCB; cb != nil {
669✔
988
                nonce := c.nonce
5✔
989
                c.mu.Unlock()
5✔
990
                jwt, sigraw, err := cb(nonce)
5✔
991
                c.mu.Lock()
5✔
992
                if err == nil && c.isClosed() {
6✔
993
                        err = ErrConnectionClosed
1✔
994
                }
1✔
995
                if err != nil {
7✔
996
                        c.Errorf("Error signing the nonce: %v", err)
2✔
997
                        return err
2✔
998
                }
2✔
999
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
3✔
1000
                cinfo.JWT, cinfo.Sig = jwt, sig
3✔
1001

1002
        } else if creds := c.leaf.remote.Credentials; creds != _EMPTY_ {
715✔
1003
                // Check for credentials first, that will take precedence..
56✔
1004
                c.Debugf("Authenticating with credentials file %q", c.leaf.remote.Credentials)
56✔
1005
                contents, err := os.ReadFile(creds)
56✔
1006
                if err != nil {
56✔
1007
                        c.Errorf("%v", err)
×
1008
                        return err
×
1009
                }
×
1010
                defer wipeSlice(contents)
56✔
1011
                items := credsRe.FindAllSubmatch(contents, -1)
56✔
1012
                if len(items) < 2 {
56✔
1013
                        c.Errorf("Credentials file malformed")
×
1014
                        return err
×
1015
                }
×
1016
                // First result should be the user JWT.
1017
                // We copy here so that the file containing the seed will be wiped appropriately.
1018
                raw := items[0][1]
56✔
1019
                tmp := make([]byte, len(raw))
56✔
1020
                copy(tmp, raw)
56✔
1021
                // Seed is second item.
56✔
1022
                kp, err := nkeys.FromSeed(items[1][1])
56✔
1023
                if err != nil {
56✔
1024
                        c.Errorf("Credentials file has malformed seed")
×
1025
                        return err
×
1026
                }
×
1027
                // Wipe our key on exit.
1028
                defer kp.Wipe()
56✔
1029

56✔
1030
                sigraw, _ := kp.Sign(c.nonce)
56✔
1031
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
56✔
1032
                cinfo.JWT = bytesToString(tmp)
56✔
1033
                cinfo.Sig = sig
56✔
1034
        } else if nkey := c.leaf.remote.Nkey; nkey != _EMPTY_ {
608✔
1035
                kp, err := nkeys.FromSeed([]byte(nkey))
5✔
1036
                if err != nil {
5✔
1037
                        c.Errorf("Remote nkey has malformed seed")
×
1038
                        return err
×
1039
                }
×
1040
                // Wipe our key on exit.
1041
                defer kp.Wipe()
5✔
1042
                sigraw, _ := kp.Sign(c.nonce)
5✔
1043
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
5✔
1044
                pkey, _ := kp.PublicKey()
5✔
1045
                cinfo.Nkey = pkey
5✔
1046
                cinfo.Sig = sig
5✔
1047
        }
1048
        // In addition, and this is to allow auth callout, set user/password or
1049
        // token if applicable.
1050
        if userInfo := c.leaf.remote.curURL.User; userInfo != nil {
979✔
1051
                cinfo.User = userInfo.Username()
317✔
1052
                var ok bool
317✔
1053
                cinfo.Pass, ok = userInfo.Password()
317✔
1054
                // For backward compatibility, if only username is provided, set both
317✔
1055
                // Token and User, not just Token.
317✔
1056
                if !ok {
326✔
1057
                        cinfo.Token = cinfo.User
9✔
1058
                }
9✔
1059
        } else if c.leaf.remote.username != _EMPTY_ {
352✔
1060
                cinfo.User = c.leaf.remote.username
7✔
1061
                cinfo.Pass = c.leaf.remote.password
7✔
1062
                // For backward compatibility, if only username is provided, set both
7✔
1063
                // Token and User, not just Token.
7✔
1064
                if cinfo.Pass == _EMPTY_ {
7✔
1065
                        cinfo.Token = cinfo.User
×
1066
                }
×
1067
        }
1068
        b, err := json.Marshal(cinfo)
662✔
1069
        if err != nil {
662✔
1070
                c.Errorf("Error marshaling CONNECT to remote leafnode: %v\n", err)
×
1071
                return err
×
1072
        }
×
1073
        // Although this call is made before the writeLoop is created,
1074
        // we don't really need to send in place. The protocol will be
1075
        // sent out by the writeLoop.
1076
        c.enqueueProto([]byte(fmt.Sprintf(ConProto, b)))
662✔
1077
        return nil
662✔
1078
}
1079

1080
// Makes a deep copy of the LeafNode Info structure.
1081
// The server lock is held on entry.
1082
func (s *Server) copyLeafNodeInfo() *Info {
2,681✔
1083
        clone := s.leafNodeInfo
2,681✔
1084
        // Copy the array of urls.
2,681✔
1085
        if len(s.leafNodeInfo.LeafNodeURLs) > 0 {
4,864✔
1086
                clone.LeafNodeURLs = append([]string(nil), s.leafNodeInfo.LeafNodeURLs...)
2,183✔
1087
        }
2,183✔
1088
        return &clone
2,681✔
1089
}
1090

1091
// Adds a LeafNode URL that we get when a route connects to the Info structure.
1092
// Regenerates the JSON byte array so that it can be sent to LeafNode connections.
1093
// Returns a boolean indicating if the URL was added or not.
1094
// Server lock is held on entry
1095
func (s *Server) addLeafNodeURL(urlStr string) bool {
7,444✔
1096
        if s.leafURLsMap.addUrl(urlStr) {
14,883✔
1097
                s.generateLeafNodeInfoJSON()
7,439✔
1098
                return true
7,439✔
1099
        }
7,439✔
1100
        return false
5✔
1101
}
1102

1103
// Removes a LeafNode URL of the route that is disconnecting from the Info structure.
1104
// Regenerates the JSON byte array so that it can be sent to LeafNode connections.
1105
// Returns a boolean indicating if the URL was removed or not.
1106
// Server lock is held on entry.
1107
func (s *Server) removeLeafNodeURL(urlStr string) bool {
7,444✔
1108
        // Don't need to do this if we are removing the route connection because
7,444✔
1109
        // we are shuting down...
7,444✔
1110
        if s.isShuttingDown() {
11,262✔
1111
                return false
3,818✔
1112
        }
3,818✔
1113
        if s.leafURLsMap.removeUrl(urlStr) {
7,248✔
1114
                s.generateLeafNodeInfoJSON()
3,622✔
1115
                return true
3,622✔
1116
        }
3,622✔
1117
        return false
4✔
1118
}
1119

1120
// Server lock is held on entry
1121
func (s *Server) generateLeafNodeInfoJSON() {
14,740✔
1122
        s.leafNodeInfo.Cluster = s.cachedClusterName()
14,740✔
1123
        s.leafNodeInfo.LeafNodeURLs = s.leafURLsMap.getAsStringSlice()
14,740✔
1124
        s.leafNodeInfo.WSConnectURLs = s.websocket.connectURLsMap.getAsStringSlice()
14,740✔
1125
        s.leafNodeInfoJSON = generateInfoJSON(&s.leafNodeInfo)
14,740✔
1126
}
14,740✔
1127

1128
// Sends an async INFO protocol so that the connected servers can update
1129
// their list of LeafNode urls.
1130
func (s *Server) sendAsyncLeafNodeInfo() {
11,061✔
1131
        for _, c := range s.leafs {
11,160✔
1132
                c.mu.Lock()
99✔
1133
                c.enqueueProto(s.leafNodeInfoJSON)
99✔
1134
                c.mu.Unlock()
99✔
1135
        }
99✔
1136
}
1137

1138
// Called when an inbound leafnode connection is accepted or we create one for a solicited leafnode.
// Returns the created client, or nil if the connection could not be completed
// (account lookup failure, websocket/TLS handshake error, write error, or
// server shutdown). For solicited connections (remote != nil), registration
// is finished later when the INFO protocol is processed from the read loop.
func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCfg, ws *websocket) *client {
	// Snapshot server options.
	opts := s.getOpts()

	maxPay := int32(opts.MaxPayload)
	maxSubs := int32(opts.MaxSubs)
	// For system, maxSubs of 0 means unlimited, so re-adjust here.
	if maxSubs == 0 {
		maxSubs = -1
	}
	now := time.Now().UTC()

	c := &client{srv: s, nc: conn, kind: LEAF, opts: defaultOpts, mpay: maxPay, msubs: maxSubs, start: now, last: now}
	// Do not update the smap here, we need to do it in initLeafNodeSmapAndSendSubs
	c.leaf = &leaf{}

	// If the leafnode subject interest should be isolated, flag it here.
	// Server-level isolation wins; otherwise fall back to the remote's setting.
	s.optsMu.RLock()
	if c.leaf.isolated = s.opts.LeafNode.IsolateLeafnodeInterest; !c.leaf.isolated && remote != nil {
		c.leaf.isolated = remote.LocalIsolation
	}
	s.optsMu.RUnlock()

	// For accepted LN connections, ws will be != nil if it was accepted
	// through the Websocket port.
	c.ws = ws

	// For remote, check if the scheme starts with "ws", if so, we will initiate
	// a remote Leaf Node connection as a websocket connection.
	if remote != nil && rURL != nil && isWSURL(rURL) {
		remote.RLock()
		c.ws = &websocket{compress: remote.Websocket.Compression, maskwrite: !remote.Websocket.NoMasking}
		remote.RUnlock()
	}

	// Determines if we are soliciting the connection or not.
	var solicited bool
	var acc *Account
	var remoteSuffix string
	if remote != nil {
		// For now, if lookup fails, we will constantly try
		// to recreate this LN connection.
		lacc := remote.LocalAccount
		var err error
		acc, err = s.LookupAccount(lacc)
		if err != nil {
			// An account not existing is something that can happen with nats/http account resolver and the account
			// has not yet been pushed, or the request failed for other reasons.
			// remote needs to be set or retry won't happen
			c.leaf.remote = remote
			c.closeConnection(MissingAccount)
			s.Errorf("Unable to lookup account %s for solicited leafnode connection: %v", lacc, err)
			return nil
		}
		remoteSuffix = fmt.Sprintf(" for account: %s", acc.traceLabel())
	}

	c.mu.Lock()
	c.initClient()
	c.Noticef("Leafnode connection created%s %s", remoteSuffix, c.opts.Name)

	var (
		tlsFirst         bool
		tlsFirstFallback time.Duration
		infoTimeout      time.Duration
	)
	if remote != nil {
		solicited = true
		remote.Lock()
		c.leaf.remote = remote
		c.setPermissions(remote.perms)
		if !c.leaf.remote.Hub {
			c.leaf.isSpoke = true
		}
		tlsFirst = remote.TLSHandshakeFirst
		infoTimeout = remote.FirstInfoTimeout
		remote.Unlock()
		c.acc = acc
	} else {
		c.flags.set(expectConnect)
		if ws != nil {
			c.Debugf("Leafnode compression=%v", c.ws.compress)
		}
		tlsFirst = opts.LeafNode.TLSHandshakeFirst
		if f := opts.LeafNode.TLSHandshakeFirstFallback; f > 0 {
			tlsFirstFallback = f
		}
	}
	c.mu.Unlock()

	var nonce [nonceLen]byte
	var info *Info

	// Grab this before the client lock below.
	if !solicited {
		// Grab server variables
		s.mu.Lock()
		info = s.copyLeafNodeInfo()
		// For tests that want to simulate old servers, do not set the compression
		// on the INFO protocol if configured with CompressionNotSupported.
		if cm := opts.LeafNode.Compression.Mode; cm != CompressionNotSupported {
			info.Compression = cm
		}
		// We always send a nonce for LEAF connections. Do not change that without
		// taking into account presence of proxy trusted keys.
		s.generateNonce(nonce[:])
		s.mu.Unlock()
	}

	// Grab lock
	c.mu.Lock()

	var preBuf []byte
	if solicited {
		// For websocket connection, we need to send an HTTP request,
		// and get the response before starting the readLoop to get
		// the INFO, etc..
		if c.isWebsocket() {
			var err error
			var closeReason ClosedState

			preBuf, closeReason, err = c.leafNodeSolicitWSConnection(opts, rURL, remote)
			if err != nil {
				c.Errorf("Error soliciting websocket connection: %v", err)
				c.mu.Unlock()
				// A closeReason of 0 means the connection was already closed
				// by the helper; do not close twice.
				if closeReason != 0 {
					c.closeConnection(closeReason)
				}
				return nil
			}
		} else {
			// If configured to do TLS handshake first
			if tlsFirst {
				if _, err := c.leafClientHandshakeIfNeeded(remote, opts); err != nil {
					c.mu.Unlock()
					return nil
				}
			}
			// We need to wait for the info, but not for too long.
			c.nc.SetReadDeadline(time.Now().Add(infoTimeout))
		}

		// We will process the INFO from the readloop and finish by
		// sending the CONNECT and finish registration later.
	} else {
		// Send our info to the other side.
		// Remember the nonce we sent here for signatures, etc.
		c.nonce = make([]byte, nonceLen)
		copy(c.nonce, nonce[:])
		info.Nonce = bytesToString(c.nonce)
		info.CID = c.cid
		proto := generateInfoJSON(info)

		var pre []byte
		// We need first to check for "TLS First" fallback delay.
		if tlsFirstFallback > 0 {
			// We wait and see if we are getting any data. Since we did not send
			// the INFO protocol yet, only clients that use TLS first should be
			// sending data (the TLS handshake). We don't really check the content:
			// if it is a rogue agent and not an actual client performing the
			// TLS handshake, the error will be detected when performing the
			// handshake on our side.
			pre = make([]byte, 4)
			c.nc.SetReadDeadline(time.Now().Add(tlsFirstFallback))
			n, _ := io.ReadFull(c.nc, pre[:])
			c.nc.SetReadDeadline(time.Time{})
			// If we get any data (regardless of possible timeout), we will proceed
			// with the TLS handshake.
			if n > 0 {
				pre = pre[:n]
			} else {
				// We did not get anything so we will send the INFO protocol.
				pre = nil
				// Set the boolean to false for the rest of the function.
				tlsFirst = false
			}
		}

		if !tlsFirst {
			// We have to send from this go routine because we may
			// have to block for TLS handshake before we start our
			// writeLoop go routine. The other side needs to receive
			// this before it can initiate the TLS handshake..
			c.sendProtoNow(proto)

			// The above call could have marked the connection as closed (due to TCP error).
			if c.isClosed() {
				c.mu.Unlock()
				c.closeConnection(WriteError)
				return nil
			}
		}

		// Check to see if we need to spin up TLS.
		if !c.isWebsocket() && info.TLSRequired {
			// If we have a prebuffer create a multi-reader.
			if len(pre) > 0 {
				c.nc = &tlsMixConn{c.nc, bytes.NewBuffer(pre)}
			}
			// Perform server-side TLS handshake.
			if err := c.doTLSServerHandshake(tlsHandshakeLeaf, opts.LeafNode.TLSConfig, opts.LeafNode.TLSTimeout, opts.LeafNode.TLSPinnedCerts); err != nil {
				c.mu.Unlock()
				return nil
			}
		}

		// If the user wants the TLS handshake to occur first, now that it is
		// done, send the INFO protocol.
		if tlsFirst {
			c.flags.set(didTLSFirst)
			c.sendProtoNow(proto)
			if c.isClosed() {
				c.mu.Unlock()
				c.closeConnection(WriteError)
				return nil
			}
		}

		// Leaf nodes will always require a CONNECT to let us know
		// when we are properly bound to an account.
		//
		// If compression is configured, we can't set the authTimer here because
		// it would cause the parser to fail any incoming protocol that is not a
		// CONNECT (and we need to exchange INFO protocols for compression
		// negotiation). So instead, use the ping timer until we are done with
		// negotiation and can set the auth timer.
		timeout := secondsToDuration(opts.LeafNode.AuthTimeout)
		if needsCompression(opts.LeafNode.Compression.Mode) {
			c.ping.tmr = time.AfterFunc(timeout, func() {
				c.authTimeout()
			})
		} else {
			c.setAuthTimer(timeout)
		}
	}

	// Keep track in case server is shutdown before we can successfully register.
	if !s.addToTempClients(c.cid, c) {
		c.mu.Unlock()
		c.setNoReconnect()
		c.closeConnection(ServerShutdown)
		return nil
	}

	// Spin up the read loop.
	s.startGoRoutine(func() { c.readLoop(preBuf) })

	// We will spin the write loop for solicited connections only
	// when processing the INFO and after switching to TLS if needed.
	if !solicited {
		s.startGoRoutine(func() { c.writeLoop() })
	}

	c.mu.Unlock()

	return c
}
1396

1397
// Will perform the client-side TLS handshake if needed. Assumes that this
// is called by the solicit side (remote will be non nil). Returns `true`
// if TLS is required, `false` otherwise.
// Lock held on entry.
func (c *client) leafClientHandshakeIfNeeded(remote *leafNodeCfg, opts *Options) (bool, error) {
	// Check if TLS is required and gather TLS config variables.
	tlsRequired, tlsConfig, tlsName, tlsTimeout := c.leafNodeGetTLSConfigForSolicit(remote)
	if !tlsRequired {
		return false, nil
	}

	// If TLS required, perform handshake.
	// Get the URL that was used to connect to the remote server.
	rURL := remote.getCurrentURL()

	// Perform the client-side TLS handshake.
	if resetTLSName, err := c.doTLSClientHandshake(tlsHandshakeLeaf, rURL, tlsConfig, tlsName, tlsTimeout, opts.LeafNode.TLSPinnedCerts); err != nil {
		// Check if we need to reset the remote's TLS name: the handshake may
		// indicate that a previously captured TLS server name should be
		// discarded so the next connect attempt does not reuse a stale one.
		if resetTLSName {
			remote.Lock()
			remote.tlsName = _EMPTY_
			remote.Unlock()
		}
		return false, err
	}
	return true, nil
}
1424

1425
// processLeafnodeInfo handles an INFO protocol received on a leafnode
// connection, both the initial INFO during connection setup and async
// INFO updates (URL lists, permissions, remote account name). For a
// solicited connection it may also drive the client-side TLS handshake,
// compression negotiation, and resume/finish the connect process.
func (c *client) processLeafnodeInfo(info *Info) {
	c.mu.Lock()
	if c.leaf == nil || c.isClosed() {
		c.mu.Unlock()
		return
	}
	s := c.srv
	opts := s.getOpts()
	remote := c.leaf.remote
	didSolicit := remote != nil
	firstINFO := !c.flags.isSet(infoReceived)

	// In case of websocket, the TLS handshake has been already done.
	// So check only for non websocket connections and for configurations
	// where the TLS Handshake was not done first.
	if didSolicit && !c.flags.isSet(handshakeComplete) && !c.isWebsocket() && !remote.TLSHandshakeFirst {
		// If the server requires TLS, we need to set this in the remote
		// otherwise if there is no TLS configuration block for the remote,
		// the solicit side will not attempt to perform the TLS handshake.
		if firstINFO && info.TLSRequired {
			// Check for TLS/proxy configuration mismatch
			if remote.Proxy.URL != _EMPTY_ && !remote.TLS && remote.TLSConfig == nil {
				c.mu.Unlock()
				c.Errorf("TLS configuration mismatch: Hub requires TLS but leafnode remote is not configured for TLS. When using a proxy, ensure TLS leafnode configuration matches the Hub requirements.")
				c.closeConnection(TLSHandshakeError)
				return
			}
			remote.TLS = true
		}
		if _, err := c.leafClientHandshakeIfNeeded(remote, opts); err != nil {
			c.mu.Unlock()
			return
		}
	}

	// Check for compression, unless already done.
	if firstINFO && !c.flags.isSet(compressionNegotiated) {
		// Prevent from getting back here.
		c.flags.set(compressionNegotiated)

		var co *CompressionOpts
		if !didSolicit {
			co = &opts.LeafNode.Compression
		} else {
			co = &remote.Compression
		}
		if needsCompression(co.Mode) {
			// Release client lock since following function will need server lock.
			c.mu.Unlock()
			compress, err := s.negotiateLeafCompression(c, didSolicit, info.Compression, co)
			if err != nil {
				c.sendErrAndErr(err.Error())
				c.closeConnection(ProtocolViolation)
				return
			}
			if compress {
				// Done for now, will get back another INFO protocol...
				return
			}
			// No compression because one side does not want/can't, so proceed.
			c.mu.Lock()
			// Check that the connection did not close if the lock was released.
			if c.isClosed() {
				c.mu.Unlock()
				return
			}
		} else {
			// Coming from an old server, the Compression field would be the empty
			// string. For servers that are configured with CompressionNotSupported,
			// this makes them behave as old servers.
			if info.Compression == _EMPTY_ || co.Mode == CompressionNotSupported {
				c.leaf.compression = CompressionNotSupported
			} else {
				c.leaf.compression = CompressionOff
			}
		}
		// Accepting side does not normally process an INFO protocol during
		// initial connection handshake. So we keep it consistent by returning
		// if we are not soliciting.
		if !didSolicit {
			// If we had created the ping timer instead of the auth timer, we will
			// clear the ping timer and set the auth timer now that the compression
			// negotiation is done.
			if info.Compression != _EMPTY_ && c.ping.tmr != nil {
				clearTimer(&c.ping.tmr)
				c.setAuthTimer(secondsToDuration(opts.LeafNode.AuthTimeout))
			}
			c.mu.Unlock()
			return
		}
		// Fall through and process the INFO protocol as usual.
	}

	// Note: For now, only the initial INFO has a nonce. We
	// will probably do auto key rotation at some point.
	if firstINFO {
		// Mark that the INFO protocol has been received.
		c.flags.set(infoReceived)
		// Prevent connecting to non leafnode port. Need to do this only for
		// the first INFO, not for async INFO updates...
		//
		// Content of INFO sent by the server when accepting a tcp connection.
		// -------------------------------------------------------------------
		// Listen Port Of | CID | ClientConnectURLs | LeafNodeURLs | Gateway |
		// -------------------------------------------------------------------
		//      CLIENT    |  X* |        X**        |              |         |
		//      ROUTE     |     |        X**        |      X***    |         |
		//     GATEWAY    |     |                   |              |    X    |
		//     LEAFNODE   |  X  |                   |       X      |         |
		// -------------------------------------------------------------------
		// *   Not on older servers.
		// **  Not if "no advertise" is enabled.
		// *** Not if leafnode's "no advertise" is enabled.
		//
		// As seen from above, a solicited LeafNode connection should receive
		// from the remote server an INFO with CID and LeafNodeURLs. Anything
		// else should be considered an attempt to connect to a wrong port.
		if didSolicit && (info.CID == 0 || info.LeafNodeURLs == nil) {
			c.mu.Unlock()
			c.Errorf(ErrConnectedToWrongPort.Error())
			c.closeConnection(WrongPort)
			return
		}
		// Reject a cluster that contains spaces.
		if info.Cluster != _EMPTY_ && strings.Contains(info.Cluster, " ") {
			c.mu.Unlock()
			c.sendErrAndErr(ErrClusterNameHasSpaces.Error())
			c.closeConnection(ProtocolViolation)
			return
		}
		// Capture a nonce here.
		c.nonce = []byte(info.Nonce)
		if info.TLSRequired && didSolicit {
			remote.TLS = true
		}
		supportsHeaders := c.srv.supportsHeaders()
		c.headers = supportsHeaders && info.Headers

		// Remember the remote server.
		// Pre 2.2.0 servers are not sending their server name.
		// In that case, use info.ID, which, for those servers, matches
		// the content of the field `Name` in the leafnode CONNECT protocol.
		if info.Name == _EMPTY_ {
			c.leaf.remoteServer = info.ID
		} else {
			c.leaf.remoteServer = info.Name
		}
		c.leaf.remoteDomain = info.Domain
		c.leaf.remoteCluster = info.Cluster
		// We send the protocol version in the INFO protocol.
		// Keep track of it, so we know if this connection supports message
		// tracing for instance.
		c.opts.Protocol = info.Proto
	}

	// For both initial INFO and async INFO protocols, Possibly
	// update our list of remote leafnode URLs we can connect to.
	if didSolicit && (len(info.LeafNodeURLs) > 0 || len(info.WSConnectURLs) > 0) {
		// Consider the incoming array as the most up-to-date
		// representation of the remote cluster's list of URLs.
		c.updateLeafNodeURLs(info)
	}

	// Check to see if we have permissions updates here.
	if info.Import != nil || info.Export != nil {
		perms := &Permissions{
			Publish:   info.Export,
			Subscribe: info.Import,
		}
		// Check if we have local deny clauses that we need to merge.
		if remote := c.leaf.remote; remote != nil {
			if len(remote.DenyExports) > 0 {
				if perms.Publish == nil {
					perms.Publish = &SubjectPermission{}
				}
				perms.Publish.Deny = append(perms.Publish.Deny, remote.DenyExports...)
			}
			if len(remote.DenyImports) > 0 {
				if perms.Subscribe == nil {
					perms.Subscribe = &SubjectPermission{}
				}
				perms.Subscribe.Deny = append(perms.Subscribe.Deny, remote.DenyImports...)
			}
		}
		c.setPermissions(perms)
	}

	var resumeConnect bool

	// If this is a remote connection and this is the first INFO protocol,
	// then we need to finish the connect process by sending CONNECT, etc..
	if firstINFO && didSolicit {
		// Clear deadline that was set in createLeafNode while waiting for the INFO.
		c.nc.SetDeadline(time.Time{})
		resumeConnect = true
	} else if !firstINFO && didSolicit {
		c.leaf.remoteAccName = info.RemoteAccount
	}

	// Check if we have the remote account information and if so make sure it's stored.
	if info.RemoteAccount != _EMPTY_ {
		s.leafRemoteAccounts.Store(c.acc.Name, info.RemoteAccount)
	}
	c.mu.Unlock()

	finishConnect := info.ConnectInfo
	if resumeConnect && s != nil {
		s.leafNodeResumeConnectProcess(c)
		if !info.InfoOnConnect {
			finishConnect = true
		}
	}
	if finishConnect {
		s.leafNodeFinishConnectProcess(c)
	}

	// Check to see if we need to kick any internal source or mirror consumers.
	// This will be a no-op if JetStream not enabled for this server or if the bound account
	// does not have jetstream.
	s.checkInternalSyncConsumers(c.acc)
}
1646

1647
// negotiateLeafCompression selects the compression mode for a leafnode
// connection based on our configured mode and the mode advertised in the
// peer's INFO. Returns true if compression was enabled (in which case an
// INFO with the chosen mode has been sent and the reader/writer are
// switched to (de)compression), false if the connection proceeds
// uncompressed. Called without the client or server lock held.
func (s *Server) negotiateLeafCompression(c *client, didSolicit bool, infoCompression string, co *CompressionOpts) (bool, error) {
	// Negotiate the appropriate compression mode (or no compression)
	cm, err := selectCompressionMode(co.Mode, infoCompression)
	if err != nil {
		return false, err
	}
	c.mu.Lock()
	// For "auto" mode, set the initial compression mode based on RTT
	if cm == CompressionS2Auto {
		if c.rttStart.IsZero() {
			c.rtt = computeRTT(c.start)
		}
		cm = selectS2AutoModeBasedOnRTT(c.rtt, co.RTTThresholds)
	}
	// Keep track of the negotiated compression mode.
	c.leaf.compression = cm
	cid := c.cid
	var nonce string
	if !didSolicit {
		nonce = bytesToString(c.nonce)
	}
	c.mu.Unlock()

	if !needsCompression(cm) {
		return false, nil
	}

	// If we end-up doing compression...

	// Generate an INFO with the chosen compression mode.
	s.mu.Lock()
	info := s.copyLeafNodeInfo()
	info.Compression, info.CID, info.Nonce = compressionModeForInfoProtocol(co, cm), cid, nonce
	infoProto := generateInfoJSON(info)
	s.mu.Unlock()

	// If we solicited, then send this INFO protocol BEFORE switching
	// to compression writer. However, if we did not, we send it after.
	c.mu.Lock()
	if didSolicit {
		c.enqueueProto(infoProto)
		// Make sure it is completely flushed (the pending bytes goes to
		// 0) before proceeding.
		for c.out.pb > 0 && !c.isClosed() {
			c.flushOutbound()
		}
	}
	// This is to notify the readLoop that it should switch to a
	// (de)compression reader.
	c.in.flags.set(switchToCompression)
	// Create the compress writer before queueing the INFO protocol for
	// a route that did not solicit. It will make sure that that proto
	// is sent with compression on.
	c.out.cw = s2.NewWriter(nil, s2WriterOptions(cm)...)
	if !didSolicit {
		c.enqueueProto(infoProto)
	}
	c.mu.Unlock()
	return true, nil
}
1707

1708
// When getting a leaf node INFO protocol, use the provided
1709
// array of urls to update the list of possible endpoints.
1710
func (c *client) updateLeafNodeURLs(info *Info) {
1,304✔
1711
        cfg := c.leaf.remote
1,304✔
1712
        cfg.Lock()
1,304✔
1713
        defer cfg.Unlock()
1,304✔
1714

1,304✔
1715
        // We have ensured that if a remote has a WS scheme, then all are.
1,304✔
1716
        // So check if first is WS, then add WS URLs, otherwise, add non WS ones.
1,304✔
1717
        if len(cfg.URLs) > 0 && isWSURL(cfg.URLs[0]) {
1,362✔
1718
                // It does not really matter if we use "ws://" or "wss://" here since
58✔
1719
                // we will have already marked that the remote should use TLS anyway.
58✔
1720
                // But use proper scheme for log statements, etc...
58✔
1721
                proto := wsSchemePrefix
58✔
1722
                if cfg.TLS {
58✔
1723
                        proto = wsSchemePrefixTLS
×
1724
                }
×
1725
                c.doUpdateLNURLs(cfg, proto, info.WSConnectURLs)
58✔
1726
                return
58✔
1727
        }
1728
        c.doUpdateLNURLs(cfg, "nats-leaf", info.LeafNodeURLs)
1,246✔
1729
}
1730

1731
func (c *client) doUpdateLNURLs(cfg *leafNodeCfg, scheme string, URLs []string) {
1,304✔
1732
        cfg.urls = make([]*url.URL, 0, 1+len(URLs))
1,304✔
1733
        // Add the ones we receive in the protocol
1,304✔
1734
        for _, surl := range URLs {
3,619✔
1735
                url, err := url.Parse(fmt.Sprintf("%s://%s", scheme, surl))
2,315✔
1736
                if err != nil {
2,315✔
1737
                        // As per below, the URLs we receive should not have contained URL info, so this should be safe to log.
×
1738
                        c.Errorf("Error parsing url %q: %v", surl, err)
×
1739
                        continue
×
1740
                }
1741
                // Do not add if it's the same as what we already have configured.
1742
                var dup bool
2,315✔
1743
                for _, u := range cfg.URLs {
5,874✔
1744
                        // URLs that we receive never have user info, but the
3,559✔
1745
                        // ones that were configured may have. Simply compare
3,559✔
1746
                        // host and port to decide if they are equal or not.
3,559✔
1747
                        if url.Host == u.Host && url.Port() == u.Port() {
5,250✔
1748
                                dup = true
1,691✔
1749
                                break
1,691✔
1750
                        }
1751
                }
1752
                if !dup {
2,939✔
1753
                        cfg.urls = append(cfg.urls, url)
624✔
1754
                        cfg.saveTLSHostname(url)
624✔
1755
                }
624✔
1756
        }
1757
        // Add the configured one
1758
        cfg.urls = append(cfg.urls, cfg.URLs...)
1,304✔
1759
}
1760

1761
// Similar to setInfoHostPortAndGenerateJSON, but for leafNodeInfo.
1762
func (s *Server) setLeafNodeInfoHostPortAndIP() error {
3,679✔
1763
        opts := s.getOpts()
3,679✔
1764
        if opts.LeafNode.Advertise != _EMPTY_ {
3,690✔
1765
                advHost, advPort, err := parseHostPort(opts.LeafNode.Advertise, opts.LeafNode.Port)
11✔
1766
                if err != nil {
11✔
1767
                        return err
×
1768
                }
×
1769
                s.leafNodeInfo.Host = advHost
11✔
1770
                s.leafNodeInfo.Port = advPort
11✔
1771
        } else {
3,668✔
1772
                s.leafNodeInfo.Host = opts.LeafNode.Host
3,668✔
1773
                s.leafNodeInfo.Port = opts.LeafNode.Port
3,668✔
1774
                // If the host is "0.0.0.0" or "::" we need to resolve to a public IP.
3,668✔
1775
                // This will return at most 1 IP.
3,668✔
1776
                hostIsIPAny, ips, err := s.getNonLocalIPsIfHostIsIPAny(s.leafNodeInfo.Host, false)
3,668✔
1777
                if err != nil {
3,668✔
1778
                        return err
×
1779
                }
×
1780
                if hostIsIPAny {
3,963✔
1781
                        if len(ips) == 0 {
295✔
1782
                                s.Errorf("Could not find any non-local IP for leafnode's listen specification %q",
×
1783
                                        s.leafNodeInfo.Host)
×
1784
                        } else {
295✔
1785
                                // Take the first from the list...
295✔
1786
                                s.leafNodeInfo.Host = ips[0]
295✔
1787
                        }
295✔
1788
                }
1789
        }
1790
        // Use just host:port for the IP
1791
        s.leafNodeInfo.IP = net.JoinHostPort(s.leafNodeInfo.Host, strconv.Itoa(s.leafNodeInfo.Port))
3,679✔
1792
        if opts.LeafNode.Advertise != _EMPTY_ {
3,690✔
1793
                s.Noticef("Advertise address for leafnode is set to %s", s.leafNodeInfo.IP)
11✔
1794
        }
11✔
1795
        return nil
3,679✔
1796
}
1797

1798
// Add the connection to the map of leaf nodes.
// If `checkForDup` is true (invoked when a leafnode is accepted), then we check
// if a connection already exists for the same server name and account.
// That can happen when the remote is attempting to reconnect while the accepting
// side did not detect the connection as broken yet.
// But it can also happen when there is a misconfiguration and the remote is
// creating two (or more) connections that bind to the same account on the accept
// side.
// When a duplicate is found, the new connection is accepted and the old is closed
// (this solves the stale connection situation). An error is returned to help the
// remote detect the misconfiguration when the duplicate is the result of that
// misconfiguration.
//
// This function also decides whether the JetStream domain is extended across
// this connection or whether JS API traffic must be denied, and installs the
// cross-domain subject mappings when applicable.
func (s *Server) addLeafNodeConnection(c *client, srvName, clusterName string, checkForDup bool) {
	var accName string
	// Snapshot, under the client lock, everything needed for the duplicate
	// check and the JetStream extension logic below.
	c.mu.Lock()
	cid := c.cid
	acc := c.acc
	if acc != nil {
		accName = acc.Name
	}
	myRemoteDomain := c.leaf.remoteDomain
	mySrvName := c.leaf.remoteServer
	remoteAccName := c.leaf.remoteAccName
	myClustName := c.leaf.remoteCluster
	solicited := c.leaf.remote != nil
	c.mu.Unlock()

	var old *client
	s.mu.Lock()
	// We check for empty because in some test we may send empty CONNECT{}
	if checkForDup && srvName != _EMPTY_ {
		for _, ol := range s.leafs {
			ol.mu.Lock()
			// We care here only about non solicited Leafnode. This function
			// is more about replacing stale connections than detecting loops.
			// We have code for the loop detection elsewhere, which also delays
			// attempt to reconnect.
			if !ol.isSolicitedLeafNode() && ol.leaf.remoteServer == srvName &&
				ol.leaf.remoteCluster == clusterName && ol.acc.Name == accName &&
				remoteAccName != _EMPTY_ && ol.leaf.remoteAccName == remoteAccName {
				old = ol
			}
			// Release ol.mu before closing the duplicate below.
			ol.mu.Unlock()
			if old != nil {
				break
			}
		}
	}
	// Store new connection in the map
	s.leafs[cid] = c
	s.mu.Unlock()
	s.removeFromTempClients(cid)

	// If applicable, evict the old one.
	if old != nil {
		old.sendErrAndErr(DuplicateRemoteLeafnodeConnection.String())
		old.closeConnection(DuplicateRemoteLeafnodeConnection)
		c.Warnf("Replacing connection from same server")
	}

	// Renders the remote server name, with its cluster when known, for logs.
	srvDecorated := func() string {
		if myClustName == _EMPTY_ {
			return mySrvName
		}
		return fmt.Sprintf("%s/%s", mySrvName, myClustName)
	}

	opts := s.getOpts()
	sysAcc := s.SystemAccount()
	js := s.getJetStream()
	// Grab the JetStream meta group (if any) so observer mode can be toggled.
	var meta *raft
	if js != nil {
		if mg := js.getMetaGroup(); mg != nil {
			meta = mg.(*raft)
		}
	}
	blockMappingOutgoing := false
	// Deny (non domain) JetStream API traffic unless system account is shared
	// and domain names are identical and extending is not disabled

	// Check if backwards compatibility has been enabled and needs to be acted on
	forceSysAccDeny := false
	if len(opts.JsAccDefaultDomain) > 0 {
		if acc == sysAcc {
			for _, d := range opts.JsAccDefaultDomain {
				if d == _EMPTY_ {
					// Extending JetStream via leaf node is mutually exclusive with a domain mapping to the empty/default domain.
					// As soon as one mapping to "" is found, disable the ability to extend JS via a leaf node.
					c.Noticef("Not extending remote JetStream domain %q due to presence of empty default domain", myRemoteDomain)
					forceSysAccDeny = true
					break
				}
			}
		} else if domain, ok := opts.JsAccDefaultDomain[accName]; ok && domain == _EMPTY_ {
			// for backwards compatibility with old setups that do not have a domain name set
			c.Debugf("Skipping deny %q for account %q due to default domain", jsAllAPI, accName)
			// Intentionally skips the deny/mapping setup below.
			return
		}
	}

	// If the server has JS disabled, it may still be part of a JetStream that could be extended.
	// This is either signaled by js being disabled and a domain set,
	// or in cases where no domain name exists, an extension hint is set.
	// However, this is only relevant in mixed setups.
	//
	// If the system account connects but default domains are present, JetStream can't be extended.
	if opts.JetStreamDomain != myRemoteDomain || (!opts.JetStream && (opts.JetStreamDomain == _EMPTY_ && opts.JetStreamExtHint != jsWillExtend)) ||
		sysAcc == nil || acc == nil || forceSysAccDeny {
		// If domain names mismatch always deny. This applies to system accounts as well as non system accounts.
		// Not having a system account, account or JetStream disabled is considered a mismatch as well.
		if acc != nil && acc == sysAcc {
			c.Noticef("System account connected from %s", srvDecorated())
			c.Noticef("JetStream not extended, domains differ")
			c.mergeDenyPermissionsLocked(both, denyAllJs)
			// When a remote with a system account is present in a server, unless otherwise disabled, the server will be
			// started in observer mode. Now that it is clear that this is not used, turn the observer mode off.
			if solicited && meta != nil && meta.IsObserver() {
				meta.setObserver(false, extNotExtended)
				c.Debugf("Turning JetStream metadata controller Observer Mode off")
				// Take note that the domain was not extended to avoid this state from startup.
				writePeerState(js.config.StoreDir, meta.currentPeerState())
				// Meta controller can't be leader yet.
				// Yet it is possible that due to observer mode every server already stopped campaigning.
				// Therefore this server needs to be kicked into campaigning gear explicitly.
				meta.Campaign()
			}
		} else {
			c.Noticef("JetStream using domains: local %q, remote %q", opts.JetStreamDomain, myRemoteDomain)
			c.mergeDenyPermissionsLocked(both, denyAllClientJs)
		}
		blockMappingOutgoing = true
	} else if acc == sysAcc {
		// system account and same domain
		s.sys.client.Noticef("Extending JetStream domain %q as System Account connected from server %s",
			myRemoteDomain, srvDecorated())
		// In an extension use case, pin leadership to server remotes connect to.
		// Therefore, server with a remote that are not already in observer mode, need to be put into it.
		if solicited && meta != nil && !meta.IsObserver() {
			meta.setObserver(true, extExtended)
			c.Debugf("Turning JetStream metadata controller Observer Mode on - System Account Connected")
			// Take note that the domain was not extended to avoid this state next startup.
			writePeerState(js.config.StoreDir, meta.currentPeerState())
			// If this server is the leader already, step down so a new leader can be elected (that is not an observer)
			meta.StepDown()
		}
	} else {
		// This deny is needed in all cases (system account shared or not)
		// If the system account is shared, jsAllAPI traffic will go through the system account.
		// So in order to prevent duplicate delivery (from system and actual account) suppress it on the account.
		// If the system account is NOT shared, jsAllAPI traffic has no business
		c.Debugf("Adding deny %+v for account %q", denyAllClientJs, accName)
		c.mergeDenyPermissionsLocked(both, denyAllClientJs)
	}
	// If we have a specified JetStream domain we will want to add a mapping to
	// allow access cross domain for each non-system account.
	if opts.JetStreamDomain != _EMPTY_ && opts.JetStream && acc != nil && acc != sysAcc {
		for src, dest := range generateJSMappingTable(opts.JetStreamDomain) {
			if err := acc.AddMapping(src, dest); err != nil {
				c.Debugf("Error adding JetStream domain mapping: %s", err.Error())
			} else {
				c.Debugf("Adding JetStream Domain Mapping %q -> %s to account %q", src, dest, accName)
			}
		}
		if blockMappingOutgoing {
			src := fmt.Sprintf(jsDomainAPI, opts.JetStreamDomain)
			// make sure that messages intended for this domain, do not leave the cluster via this leaf node connection
			// This is a guard against a misconfig with two identical domain names and will only cover some forms
			// of this issue, not all of them.
			// This guards against a hub and a spoke having the same domain name.
			// But not two spokes having the same one and the request coming from the hub.
			c.mergeDenyPermissionsLocked(pub, []string{src})
			c.Debugf("Adding deny %q for outgoing messages to account %q", src, accName)
		}
	}
}
1973

1974
func (s *Server) removeLeafNodeConnection(c *client) {
1,647✔
1975
        c.mu.Lock()
1,647✔
1976
        cid := c.cid
1,647✔
1977
        if c.leaf != nil {
3,294✔
1978
                if c.leaf.tsubt != nil {
2,807✔
1979
                        c.leaf.tsubt.Stop()
1,160✔
1980
                        c.leaf.tsubt = nil
1,160✔
1981
                }
1,160✔
1982
                if c.leaf.gwSub != nil {
2,272✔
1983
                        s.gwLeafSubs.Remove(c.leaf.gwSub)
625✔
1984
                        // We need to set this to nil for GC to release the connection
625✔
1985
                        c.leaf.gwSub = nil
625✔
1986
                }
625✔
1987
        }
1988
        proxyKey := c.proxyKey
1,647✔
1989
        c.mu.Unlock()
1,647✔
1990
        s.mu.Lock()
1,647✔
1991
        delete(s.leafs, cid)
1,647✔
1992
        if proxyKey != _EMPTY_ {
1,651✔
1993
                s.removeProxiedConn(proxyKey, cid)
4✔
1994
        }
4✔
1995
        s.mu.Unlock()
1,647✔
1996
        s.removeFromTempClients(cid)
1,647✔
1997
}
1998

1999
// Connect information for solicited leafnodes.
//
// This is the payload the soliciting side sends in its CONNECT protocol and
// the accepting side unmarshals in processLeafNodeConnect. Notable fields as
// used by the accept side: Name becomes the remote server name, Cluster the
// remote cluster, Domain the remote JetStream domain, Hub makes the accept
// side take the spoke role, Headers negotiates header support, DenyPub is
// merged into the accept side's pub deny permissions, and Isolate requests
// isolation for the connection. Nkey/JWT/Sig/User/Pass/Token carry the
// credentials (presumably validated during authentication — not shown here).
type leafConnectInfo struct {
	Version   string   `json:"version,omitempty"`
	Nkey      string   `json:"nkey,omitempty"`
	JWT       string   `json:"jwt,omitempty"`
	Sig       string   `json:"sig,omitempty"`
	User      string   `json:"user,omitempty"`
	Pass      string   `json:"pass,omitempty"`
	Token     string   `json:"auth_token,omitempty"`
	ID        string   `json:"server_id,omitempty"`
	Domain    string   `json:"domain,omitempty"`
	Name      string   `json:"name,omitempty"`
	Hub       bool     `json:"is_hub,omitempty"`
	Cluster   string   `json:"cluster,omitempty"`
	Headers   bool     `json:"headers,omitempty"`
	JetStream bool     `json:"jetstream,omitempty"`
	DenyPub   []string `json:"deny_pub,omitempty"`
	Isolate   bool     `json:"isolate,omitempty"`

	// There was an existing field called:
	// >> Comp bool `json:"compression,omitempty"`
	// that has never been used. With support for compression, we now need
	// a field that is a string. So we use a different json tag:
	Compression string `json:"compress_mode,omitempty"`

	// Just used to detect wrong connection attempts.
	Gateway string `json:"gateway,omitempty"`

	// Tells the accept side which account the remote is binding to.
	RemoteAccount string `json:"remote_account,omitempty"`

	// The accept side of a LEAF connection, unlike ROUTER and GATEWAY, receives
	// only the CONNECT protocol, and no INFO. So we need to send the protocol
	// version as part of the CONNECT. It will indicate if a connection supports
	// some features, such as message tracing.
	// We use `protocol` as the JSON tag, so this is automatically unmarshal'ed
	// in the low level process CONNECT.
	Proto int `json:"protocol,omitempty"`
}
2038

2039
// processLeafNodeConnect will process the inbound connect args.
// Once we are here we are bound to an account, so can send any interest that
// we would have to the other side.
//
// arg is the raw JSON of the CONNECT protocol (unmarshal'ed into
// leafConnectInfo); lang being non-empty indicates a regular client that
// connected to the leafnode port by mistake. On any protocol violation the
// connection is closed and a non-nil error returned.
func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) error {
	// Way to detect clients that incorrectly connect to the route listen
	// port. Client provided "lang" in the CONNECT protocol while LEAFNODEs don't.
	if lang != _EMPTY_ {
		c.sendErrAndErr(ErrClientConnectedToLeafNodePort.Error())
		c.closeConnection(WrongPort)
		return ErrClientConnectedToLeafNodePort
	}

	// Unmarshal as a leaf node connect protocol
	proto := &leafConnectInfo{}
	if err := json.Unmarshal(arg, proto); err != nil {
		return err
	}

	// Reject a cluster that contains spaces.
	if proto.Cluster != _EMPTY_ && strings.Contains(proto.Cluster, " ") {
		c.sendErrAndErr(ErrClusterNameHasSpaces.Error())
		c.closeConnection(ProtocolViolation)
		return ErrClusterNameHasSpaces
	}

	// Check for cluster name collisions.
	if cn := s.cachedClusterName(); cn != _EMPTY_ && proto.Cluster != _EMPTY_ && proto.Cluster == cn {
		c.sendErrAndErr(ErrLeafNodeHasSameClusterName.Error())
		c.closeConnection(ClusterNamesIdentical)
		return ErrLeafNodeHasSameClusterName
	}

	// Reject if this has Gateway which means that it would be from a gateway
	// connection that incorrectly connects to the leafnode port.
	if proto.Gateway != _EMPTY_ {
		errTxt := fmt.Sprintf("Rejecting connection from gateway %q on the leafnode port", proto.Gateway)
		c.Errorf(errTxt)
		c.sendErr(errTxt)
		c.closeConnection(WrongGateway)
		return ErrWrongGateway
	}

	// Enforce the optional minimum remote server version.
	if mv := s.getOpts().LeafNode.MinVersion; mv != _EMPTY_ {
		major, minor, update, _ := versionComponents(mv)
		if !versionAtLeast(proto.Version, major, minor, update) {
			// Send back an INFO so recent remote servers process the rejection
			// cleanly, then close immediately. The soliciting side applies the
			// reconnect delay when it processes the error.
			s.sendPermsAndAccountInfo(c)
			c.sendErrAndErr(fmt.Sprintf("%s %q", ErrLeafNodeMinVersionRejected, mv))
			c.closeConnection(MinimumVersionRequired)
			return ErrMinimumVersionRequired
		}
	}

	// Check if this server supports headers.
	supportHeaders := c.srv.supportsHeaders()

	c.mu.Lock()
	// Leaf Nodes do not do echo or verbose or pedantic.
	c.opts.Verbose = false
	c.opts.Echo = false
	c.opts.Pedantic = false
	// This inbound connection will be marked as supporting headers if this server
	// support headers and the remote has sent in the CONNECT protocol that it does
	// support headers too.
	c.headers = supportHeaders && proto.Headers
	// If the compression level is still not set, set it based on what has been
	// given to us in the CONNECT protocol.
	if c.leaf.compression == _EMPTY_ {
		// But if proto.Compression is _EMPTY_, set it to CompressionNotSupported
		if proto.Compression == _EMPTY_ {
			c.leaf.compression = CompressionNotSupported
		} else {
			c.leaf.compression = proto.Compression
		}
	}

	// Remember the remote server.
	c.leaf.remoteServer = proto.Name
	// Remember the remote account name
	c.leaf.remoteAccName = proto.RemoteAccount
	// Remember if the leafnode requested isolation.
	c.leaf.isolated = c.leaf.isolated || proto.Isolate

	// If the other side has declared itself a hub, so we will take on the spoke role.
	if proto.Hub {
		c.leaf.isSpoke = true
	}

	// The soliciting side is part of a cluster.
	if proto.Cluster != _EMPTY_ {
		c.leaf.remoteCluster = proto.Cluster
	}

	c.leaf.remoteDomain = proto.Domain

	// When a leaf solicits a connection to a hub, the perms that it will use on the soliciting leafnode's
	// behalf are correct for them, but inside the hub need to be reversed since data is flowing in the opposite direction.
	if !c.isSolicitedLeafNode() && c.perms != nil {
		sp, pp := c.perms.sub, c.perms.pub
		c.perms.sub, c.perms.pub = pp, sp
		if c.opts.Import != nil {
			c.darray = c.opts.Import.Deny
		} else {
			c.darray = nil
		}
	}

	// Set the Ping timer
	c.setFirstPingTimer()

	// If we received pub deny permissions from the other end, merge with existing ones.
	c.mergeDenyPermissions(pub, proto.DenyPub)

	acc := c.acc
	c.mu.Unlock()

	// Register the cluster, even if empty, as long as we are acting as a hub.
	if !proto.Hub {
		acc.registerLeafNodeCluster(proto.Cluster)
	}

	// Add in the leafnode here since we passed through auth at this point.
	s.addLeafNodeConnection(c, proto.Name, proto.Cluster, true)

	// If we have permissions bound to this leafnode we need to send them back to the
	// origin server for local enforcement.
	s.sendPermsAndAccountInfo(c)

	// Create and initialize the smap since we know our bound account now.
	// This will send all registered subs too.
	s.initLeafNodeSmapAndSendSubs(c)

	// Announce the account connect event for a leaf node.
	// This will be a no-op as needed.
	s.sendLeafNodeConnect(c.acc)

	// Check to see if we need to kick any internal source or mirror consumers.
	// This will be a no-op if JetStream not enabled for this server or if the bound account
	// does not have jetstream.
	s.checkInternalSyncConsumers(acc)

	return nil
}
2184

2185
// checkInternalSyncConsumers
2186
func (s *Server) checkInternalSyncConsumers(acc *Account) {
2,053✔
2187
        // Grab our js
2,053✔
2188
        js := s.getJetStream()
2,053✔
2189

2,053✔
2190
        // Only applicable if we have JS and the leafnode has JS as well.
2,053✔
2191
        // We check for remote JS outside.
2,053✔
2192
        if !js.isEnabled() || acc == nil {
3,220✔
2193
                return
1,167✔
2194
        }
1,167✔
2195

2196
        // We will check all streams in our local account. They must be a leader and
2197
        // be sourcing or mirroring. We will check the external config on the stream itself
2198
        // if this is cross domain, or if the remote domain is empty, meaning we might be
2199
        // extending the system across this leafnode connection and hence we would be extending
2200
        // our own domain.
2201
        jsa := js.lookupAccount(acc)
886✔
2202
        if jsa == nil {
1,233✔
2203
                return
347✔
2204
        }
347✔
2205

2206
        var streams []*stream
539✔
2207
        jsa.mu.RLock()
539✔
2208
        for _, mset := range jsa.streams {
594✔
2209
                mset.cfgMu.RLock()
55✔
2210
                // We need to have a mirror or source defined.
55✔
2211
                // We do not want to force another lock here to look for leader status,
55✔
2212
                // so collect and after we release jsa will make sure.
55✔
2213
                if mset.cfg.Mirror != nil || len(mset.cfg.Sources) > 0 {
67✔
2214
                        streams = append(streams, mset)
12✔
2215
                }
12✔
2216
                mset.cfgMu.RUnlock()
55✔
2217
        }
2218
        jsa.mu.RUnlock()
539✔
2219

539✔
2220
        // Now loop through all candidates and check if we are the leader and have NOT
539✔
2221
        // created the sync up consumer.
539✔
2222
        for _, mset := range streams {
551✔
2223
                mset.retryDisconnectedSyncConsumers()
12✔
2224
        }
12✔
2225
}
2226

2227
// Returns the remote cluster name. This is set only once so does not require a lock.
2228
func (c *client) remoteCluster() string {
164,458✔
2229
        if c.leaf == nil {
164,458✔
2230
                return _EMPTY_
×
2231
        }
×
2232
        return c.leaf.remoteCluster
164,458✔
2233
}
2234

2235
// Sends back an info block to the soliciting leafnode to let it know about
2236
// its permission settings for local enforcement.
2237
func (s *Server) sendPermsAndAccountInfo(c *client) {
668✔
2238
        // Copy
668✔
2239
        s.mu.Lock()
668✔
2240
        info := s.copyLeafNodeInfo()
668✔
2241
        s.mu.Unlock()
668✔
2242
        c.mu.Lock()
668✔
2243
        info.CID = c.cid
668✔
2244
        info.Import = c.opts.Import
668✔
2245
        info.Export = c.opts.Export
668✔
2246
        info.RemoteAccount = c.acc.Name
668✔
2247
        // s.SystemAccount() uses an atomic operation and does not get the server lock, so this is safe.
668✔
2248
        info.IsSystemAccount = c.acc == s.SystemAccount()
668✔
2249
        info.ConnectInfo = true
668✔
2250
        c.enqueueProto(generateInfoJSON(info))
668✔
2251
        c.mu.Unlock()
668✔
2252
}
668✔
2253

2254
// Snapshot the current subscriptions from the sublist into our smap which
// we will keep updated from now on.
// Also send the registered subscriptions.
func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
	acc := c.acc
	if acc == nil {
		c.Debugf("Leafnode does not have an account bound")
		return
	}
	// Collect all account subs here.
	// Stack-allocated backing array to avoid heap allocation in the common case.
	_subs := [1024]*subscription{}
	subs := _subs[:0]
	ims := []string{}

	// Hold the client lock otherwise there can be a race and miss some subs.
	c.mu.Lock()
	defer c.mu.Unlock()

	acc.mu.RLock()
	accName := acc.Name
	accNTag := acc.nameTag

	// To make printing look better when no friendly name present.
	if accNTag != _EMPTY_ {
		accNTag = "/" + accNTag
	}

	// If we are solicited we only send interest for local clients.
	if c.isSpokeLeafNode() {
		acc.sl.localSubs(&subs, true)
	} else {
		acc.sl.All(&subs)
	}

	// Check if we have an existing service import reply.
	siReply := copyBytes(acc.siReply)

	// Since leaf nodes only send on interest, if the bound
	// account has import services we need to send those over.
	for isubj := range acc.imports.services {
		if c.isSpokeLeafNode() && !c.canSubscribe(isubj) {
			c.Debugf("Not permitted to import service %q on behalf of %s%s", isubj, accName, accNTag)
			continue
		}
		ims = append(ims, isubj)
	}
	// Likewise for mappings.
	for _, m := range acc.mappings {
		if c.isSpokeLeafNode() && !c.canSubscribe(m.src) {
			c.Debugf("Not permitted to import mapping %q on behalf of %s%s", m.src, accName, accNTag)
			continue
		}
		ims = append(ims, m.src)
	}

	// Create a unique subject that will be used for loop detection.
	lds := acc.lds
	acc.mu.RUnlock()

	// Check if we have to create the LDS.
	if lds == _EMPTY_ {
		lds = leafNodeLoopDetectionSubjectPrefix + nuid.Next()
		acc.mu.Lock()
		acc.lds = lds
		acc.mu.Unlock()
	}

	// Now check for gateway interest. Leafnodes will put this into
	// the proper mode to propagate, but they are not held in the account.
	gwsa := [16]*client{}
	gws := gwsa[:0]
	s.getOutboundGatewayConnections(&gws)
	for _, cgw := range gws {
		cgw.mu.Lock()
		gw := cgw.gw
		cgw.mu.Unlock()
		if gw != nil {
			if ei, _ := gw.outsim.Load(accName); ei != nil {
				if e := ei.(*outsie); e != nil && e.sl != nil {
					e.sl.All(&subs)
				}
			}
		}
	}

	applyGlobalRouting := s.gateway.enabled
	if c.isSpokeLeafNode() {
		// Add a fake subscription for this solicited leafnode connection
		// so that we can send back directly for mapped GW replies.
		// We need to keep track of this subscription so it can be removed
		// when the connection is closed so that the GC can release it.
		c.leaf.gwSub = &subscription{client: c, subject: []byte(gwReplyPrefix + ">")}
		c.srv.gwLeafSubs.Insert(c.leaf.gwSub)
	}

	// Now walk the results and add them to our smap
	rc := c.leaf.remoteCluster
	c.leaf.smap = make(map[string]int32)
	for _, sub := range subs {
		// Check perms regardless of role.
		if c.perms != nil && !c.canSubscribe(string(sub.subject)) {
			c.Debugf("Not permitted to subscribe to %q on behalf of %s%s", sub.subject, accName, accNTag)
			continue
		}
		// Don't advertise interest from leafnodes to other isolated leafnodes.
		if sub.client.kind == LEAF && c.isIsolatedLeafNode() {
			continue
		}
		// We ignore ourselves here.
		// Also don't add the subscription if it has a origin cluster and the
		// cluster name matches the one of the client we are sending to.
		if c != sub.client && (sub.origin == nil || (bytesToString(sub.origin) != rc)) {
			count := int32(1)
			if len(sub.queue) > 0 && sub.qw > 0 {
				count = sub.qw
			}
			c.leaf.smap[keyFromSub(sub)] += count
			// Track subs processed here so updateSmap can avoid double-counting
			// concurrent additions (see the tsub check in updateSmap).
			if c.leaf.tsub == nil {
				c.leaf.tsub = make(map[*subscription]struct{})
			}
			c.leaf.tsub[sub] = struct{}{}
		}
	}
	// FIXME(dlc) - We need to update appropriately on an account claims update.
	for _, isubj := range ims {
		c.leaf.smap[isubj]++
	}
	// If we have gateways enabled we need to make sure the other side sends us responses
	// that have been augmented from the original subscription.
	// TODO(dlc) - Should we lock this down more?
	if applyGlobalRouting {
		c.leaf.smap[oldGWReplyPrefix+"*.>"]++
		c.leaf.smap[gwReplyPrefix+">"]++
	}
	// Detect loops by subscribing to a specific subject and checking
	// if this sub is coming back to us.
	c.leaf.smap[lds]++

	// Check if we need to add an existing siReply to our map.
	// This will be a prefix so add on the wildcard.
	if siReply != nil {
		wcsub := append(siReply, '>')
		c.leaf.smap[string(wcsub)]++
	}
	// Queue all protocols. There is no max pending limit for LN connection,
	// so we don't need chunking. The writes will happen from the writeLoop.
	var b bytes.Buffer
	for key, n := range c.leaf.smap {
		c.writeLeafSub(&b, key, n)
	}
	if b.Len() > 0 {
		c.enqueueProto(b.Bytes())
	}
	if c.leaf.tsub != nil {
		// Clear the tsub map after 5 seconds.
		c.leaf.tsubt = time.AfterFunc(5*time.Second, func() {
			c.mu.Lock()
			if c.leaf != nil {
				c.leaf.tsub = nil
				c.leaf.tsubt = nil
			}
			c.mu.Unlock()
		})
	}
}
// updateInterestForAccountOnGateway called from gateway code when processing RS+ and RS-.
2421
func (s *Server) updateInterestForAccountOnGateway(accName string, sub *subscription, delta int32) {
199,926✔
2422
        // Since we're in the gateway's readLoop, and we would otherwise block, don't allow fetching.
199,926✔
2423
        acc, err := s.lookupOrFetchAccount(accName, false)
199,926✔
2424
        if acc == nil || err != nil {
200,178✔
2425
                s.Debugf("No or bad account for %q, failed to update interest from gateway", accName)
252✔
2426
                return
252✔
2427
        }
252✔
2428
        acc.updateLeafNodes(sub, delta)
199,674✔
2429
}
2430

2431
// updateLeafNodesEx will make sure to update the account smap for the subscription.
// Will also forward to all leaf nodes as needed.
// If `hubOnly` is true, then will update only leaf nodes that connect to this server
// (that is, for which this server acts as a hub to them).
func (acc *Account) updateLeafNodesEx(sub *subscription, delta int32, hubOnly bool) {
	if acc == nil || sub == nil {
		return
	}

	// We will do checks for no leafnodes and same cluster here inline and under the
	// general account read lock.
	// If we feel we need to update the leafnodes we will do that out of line to avoid
	// blocking routes or GWs.

	acc.mu.RLock()
	// First check if we even have leafnodes here.
	if acc.nleafs == 0 {
		acc.mu.RUnlock()
		return
	}

	// Is this a loop detection subject.
	isLDS := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))

	// Capture the cluster even if its empty.
	var cluster string
	if sub.origin != nil {
		cluster = bytesToString(sub.origin)
	}

	// If we have an isolated cluster we can return early, as long as it is not a loop detection subject.
	// Empty clusters will return false for the check.
	if !isLDS && acc.isLeafNodeClusterIsolated(cluster) {
		acc.mu.RUnlock()
		return
	}

	// We can release the general account lock.
	acc.mu.RUnlock()

	// We can hold the list lock here to avoid having to copy a large slice.
	acc.lmu.RLock()
	defer acc.lmu.RUnlock()

	// Do this once.
	subject := string(sub.subject)

	// Walk the connected leafnodes.
	for _, ln := range acc.lleafs {
		// Never reflect a subscription back to the connection it came from.
		if ln == sub.client {
			continue
		}
		ln.mu.Lock()
		// Don't advertise interest from leafnodes to other isolated leafnodes.
		if sub.client.kind == LEAF && ln.isIsolatedLeafNode() {
			ln.mu.Unlock()
			continue
		}
		// If `hubOnly` is true, it means that we want to update only leafnodes
		// that connect to this server (so isHubLeafNode() would return `true`).
		if hubOnly && !ln.isHubLeafNode() {
			ln.mu.Unlock()
			continue
		}
		// Check to make sure this sub does not have an origin cluster that matches the leafnode.
		// If skipped, make sure that we still let go the "$LDS." subscription that allows
		// the detection of loops as long as different cluster.
		// Note: permissions are only checked for additions (delta > 0); removals
		// always propagate so interest is cleaned up.
		clusterDifferent := cluster != ln.remoteCluster()
		if (isLDS && clusterDifferent) || ((cluster == _EMPTY_ || clusterDifferent) && (delta <= 0 || ln.canSubscribe(subject))) {
			ln.updateSmap(sub, delta, isLDS)
		}
		ln.mu.Unlock()
	}
}
// updateLeafNodes will make sure to update the account smap for the subscription.
// Will also forward to all leaf nodes as needed.
// Convenience wrapper around updateLeafNodesEx with hubOnly=false, meaning
// all connected leafnodes (hub and spoke sides) are considered.
func (acc *Account) updateLeafNodes(sub *subscription, delta int32) {
	acc.updateLeafNodesEx(sub, delta, false)
}
// This will make an update to our internal smap and determine if we should send out
// an interest update to the remote side.
// The smap maps subscription keys (see keyFromSub) to interest counts; we only
// send LS+/LS- when interest transitions between zero and non-zero (or for queues).
// Lock should be held.
func (c *client) updateSmap(sub *subscription, delta int32, isLDS bool) {
	if c.leaf.smap == nil {
		return
	}

	// If we are solicited make sure this is a local client or a non-solicited leaf node
	skind := sub.client.kind
	updateClient := skind == CLIENT || skind == SYSTEM || skind == JETSTREAM || skind == ACCOUNT
	if !isLDS && c.isSpokeLeafNode() && !(updateClient || (skind == LEAF && !sub.client.isSpokeLeafNode())) {
		return
	}

	// For additions, check if that sub has just been processed during initLeafNodeSmapAndSendSubs
	if delta > 0 && c.leaf.tsub != nil {
		if _, present := c.leaf.tsub[sub]; present {
			// Already counted during the initial snapshot; consume the marker
			// instead of incrementing again.
			delete(c.leaf.tsub, sub)
			if len(c.leaf.tsub) == 0 {
				// Last marker consumed, stop the cleanup timer early.
				c.leaf.tsub = nil
				c.leaf.tsubt.Stop()
				c.leaf.tsubt = nil
			}
			return
		}
	}

	key := keyFromSub(sub)
	n, ok := c.leaf.smap[key]
	if delta < 0 && !ok {
		// Removal for interest we never tracked; nothing to do.
		return
	}

	// We will update if its a queue, if count is zero (or negative), or we were 0 and are N > 0.
	update := sub.queue != nil || (n <= 0 && n+delta > 0) || (n > 0 && n+delta <= 0)
	n += delta
	if n > 0 {
		c.leaf.smap[key] = n
	} else {
		// Zero (or negative) interest: drop the entry rather than storing 0.
		delete(c.leaf.smap, key)
	}
	if update {
		c.sendLeafNodeSubUpdate(key, n)
	}
}
// Used to force add subjects to the subject map.
2560
func (c *client) forceAddToSmap(subj string) {
13✔
2561
        c.mu.Lock()
13✔
2562
        defer c.mu.Unlock()
13✔
2563

13✔
2564
        if c.leaf.smap == nil {
13✔
2565
                return
×
2566
        }
×
2567
        n := c.leaf.smap[subj]
13✔
2568
        if n != 0 {
14✔
2569
                return
1✔
2570
        }
1✔
2571
        // Place into the map since it was not there.
2572
        c.leaf.smap[subj] = 1
12✔
2573
        c.sendLeafNodeSubUpdate(subj, 1)
12✔
2574
}
2575

2576
// Used to force remove a subject from the subject map.
2577
func (c *client) forceRemoveFromSmap(subj string) {
1✔
2578
        c.mu.Lock()
1✔
2579
        defer c.mu.Unlock()
1✔
2580

1✔
2581
        if c.leaf.smap == nil {
1✔
2582
                return
×
2583
        }
×
2584
        n := c.leaf.smap[subj]
1✔
2585
        if n == 0 {
1✔
2586
                return
×
2587
        }
×
2588
        n--
1✔
2589
        if n == 0 {
2✔
2590
                // Remove is now zero
1✔
2591
                delete(c.leaf.smap, subj)
1✔
2592
                c.sendLeafNodeSubUpdate(subj, 0)
1✔
2593
        } else {
1✔
2594
                c.leaf.smap[subj] = n
×
2595
        }
×
2596
}
2597

2598
// Send the subscription interest change to the other side.
2599
// Lock should be held.
2600
func (c *client) sendLeafNodeSubUpdate(key string, n int32) {
10,028✔
2601
        // If we are a spoke, we need to check if we are allowed to send this subscription over to the hub.
10,028✔
2602
        if c.isSpokeLeafNode() {
12,464✔
2603
                checkPerms := true
2,436✔
2604
                if len(key) > 0 && (key[0] == '$' || key[0] == '_') {
3,903✔
2605
                        if strings.HasPrefix(key, leafNodeLoopDetectionSubjectPrefix) ||
1,467✔
2606
                                strings.HasPrefix(key, oldGWReplyPrefix) ||
1,467✔
2607
                                strings.HasPrefix(key, gwReplyPrefix) {
1,550✔
2608
                                checkPerms = false
83✔
2609
                        }
83✔
2610
                }
2611
                if checkPerms {
4,789✔
2612
                        var subject string
2,353✔
2613
                        if sep := strings.IndexByte(key, ' '); sep != -1 {
2,842✔
2614
                                subject = key[:sep]
489✔
2615
                        } else {
2,353✔
2616
                                subject = key
1,864✔
2617
                        }
1,864✔
2618
                        if !c.canSubscribe(subject) {
2,362✔
2619
                                return
9✔
2620
                        }
9✔
2621
                }
2622
        }
2623
        // If we are here we can send over to the other side.
2624
        _b := [64]byte{}
10,019✔
2625
        b := bytes.NewBuffer(_b[:0])
10,019✔
2626
        c.writeLeafSub(b, key, n)
10,019✔
2627
        c.enqueueProto(b.Bytes())
10,019✔
2628
}
2629

2630
// Helper function to build the key.
2631
func keyFromSub(sub *subscription) string {
45,889✔
2632
        var sb strings.Builder
45,889✔
2633
        sb.Grow(len(sub.subject) + len(sub.queue) + 1)
45,889✔
2634
        sb.Write(sub.subject)
45,889✔
2635
        if sub.queue != nil {
49,700✔
2636
                // Just make the key subject spc group, e.g. 'foo bar'
3,811✔
2637
                sb.WriteByte(' ')
3,811✔
2638
                sb.Write(sub.queue)
3,811✔
2639
        }
3,811✔
2640
        return sb.String()
45,889✔
2641
}
2642

2643
// Single-character prefixes used to build routed-subscription keys (see
// keyFromSubWithOrigin): "R" marks a plain routed sub, "L" a routed sub
// made on behalf of a leafnode. Both string and byte forms are provided.
const (
	keyRoutedSub         = "R"
	keyRoutedSubByte     = 'R'
	keyRoutedLeafSub     = "L"
	keyRoutedLeafSubByte = 'L'
)
// Helper function to build the key that prevents collisions between normal
2651
// routed subscriptions and routed subscriptions on behalf of a leafnode.
2652
// Keys will look like this:
2653
// "R foo"          -> plain routed sub on "foo"
2654
// "R foo bar"      -> queue routed sub on "foo", queue "bar"
2655
// "L foo bar"      -> plain routed leaf sub on "foo", leaf "bar"
2656
// "L foo bar baz"  -> queue routed sub on "foo", queue "bar", leaf "baz"
2657
func keyFromSubWithOrigin(sub *subscription) string {
682,461✔
2658
        var sb strings.Builder
682,461✔
2659
        sb.Grow(2 + len(sub.origin) + 1 + len(sub.subject) + 1 + len(sub.queue))
682,461✔
2660
        leaf := len(sub.origin) > 0
682,461✔
2661
        if leaf {
698,839✔
2662
                sb.WriteByte(keyRoutedLeafSubByte)
16,378✔
2663
        } else {
682,461✔
2664
                sb.WriteByte(keyRoutedSubByte)
666,083✔
2665
        }
666,083✔
2666
        sb.WriteByte(' ')
682,461✔
2667
        sb.Write(sub.subject)
682,461✔
2668
        if sub.queue != nil {
708,728✔
2669
                sb.WriteByte(' ')
26,267✔
2670
                sb.Write(sub.queue)
26,267✔
2671
        }
26,267✔
2672
        if leaf {
698,839✔
2673
                sb.WriteByte(' ')
16,378✔
2674
                sb.Write(sub.origin)
16,378✔
2675
        }
16,378✔
2676
        return sb.String()
682,461✔
2677
}
2678

2679
// Lock should be held.
2680
func (c *client) writeLeafSub(w *bytes.Buffer, key string, n int32) {
36,411✔
2681
        if key == _EMPTY_ {
36,411✔
2682
                return
×
2683
        }
×
2684
        if n > 0 {
69,100✔
2685
                w.WriteString("LS+ " + key)
32,689✔
2686
                // Check for queue semantics, if found write n.
32,689✔
2687
                if strings.Contains(key, " ") {
34,997✔
2688
                        w.WriteString(" ")
2,308✔
2689
                        var b [12]byte
2,308✔
2690
                        var i = len(b)
2,308✔
2691
                        for l := n; l > 0; l /= 10 {
5,522✔
2692
                                i--
3,214✔
2693
                                b[i] = digits[l%10]
3,214✔
2694
                        }
3,214✔
2695
                        w.Write(b[i:])
2,308✔
2696
                        if c.trace {
2,308✔
2697
                                arg := fmt.Sprintf("%s %d", key, n)
×
2698
                                c.traceOutOp("LS+", []byte(arg))
×
2699
                        }
×
2700
                } else if c.trace {
30,572✔
2701
                        c.traceOutOp("LS+", []byte(key))
191✔
2702
                }
191✔
2703
        } else {
3,722✔
2704
                w.WriteString("LS- " + key)
3,722✔
2705
                if c.trace {
3,732✔
2706
                        c.traceOutOp("LS-", []byte(key))
10✔
2707
                }
10✔
2708
        }
2709
        w.WriteString(CR_LF)
36,411✔
2710
}
2711

2712
// processLeafSub will process an inbound sub request for the remote leaf node.
// The arg is the LS+ payload: "subject" for plain subs or
// "subject queue weight" for queue subs.
func (c *client) processLeafSub(argo []byte) (err error) {
	// Indicate activity.
	c.in.subs++

	srv := c.srv
	if srv == nil {
		return nil
	}

	// Copy so we do not reference a potentially large buffer
	arg := make([]byte, len(argo))
	copy(arg, argo)

	args := splitArg(arg)
	sub := &subscription{client: c}

	delta := int32(1)
	switch len(args) {
	case 1:
		// Plain subscription: just the subject.
		sub.queue = nil
	case 3:
		// Queue subscription: subject, queue group, weight.
		sub.queue = args[1]
		sub.qw = int32(parseSize(args[2]))
		// TODO: (ik) We should have a non empty queue name and a queue
		// weight >= 1. For 2.11, we may want to return an error if that
		// is not the case, but for now just overwrite `delta` if queue
		// weight is greater than 1 (it is possible after a reconnect/
		// server restart to receive a queue weight > 1 for a new sub).
		if sub.qw > 1 {
			delta = sub.qw
		}
	default:
		return fmt.Errorf("processLeafSub Parse Error: '%s'", arg)
	}
	sub.subject = args[0]

	c.mu.Lock()
	if c.isClosed() {
		c.mu.Unlock()
		return nil
	}

	acc := c.acc
	// Guard against LS+ arriving before CONNECT has been processed, which
	// can happen when compression is enabled.
	if acc == nil {
		c.mu.Unlock()
		c.sendErr("Authorization Violation")
		c.closeConnection(ProtocolViolation)
		return nil
	}
	// Check if we have a loop.
	ldsPrefix := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))

	// Seeing our own loop-detection subject back means a routing loop.
	if ldsPrefix && bytesToString(sub.subject) == acc.getLDSubject() {
		c.mu.Unlock()
		c.handleLeafNodeLoop(true)
		return nil
	}

	// Check permissions if applicable. (but exclude the $LDS, $GR and _GR_)
	checkPerms := true
	if sub.subject[0] == '$' || sub.subject[0] == '_' {
		if ldsPrefix ||
			bytes.HasPrefix(sub.subject, []byte(oldGWReplyPrefix)) ||
			bytes.HasPrefix(sub.subject, []byte(gwReplyPrefix)) {
			checkPerms = false
		}
	}

	// If we are a hub check that we can publish to this subject.
	if checkPerms {
		subj := string(sub.subject)
		if subjectIsLiteral(subj) && !c.pubAllowedFullCheck(subj, true, true) {
			c.mu.Unlock()
			c.leafSubPermViolation(sub.subject)
			c.Debugf(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject))
			return nil
		}
	}

	// Check if we have a maximum on the number of subscriptions.
	if c.subsAtLimit() {
		c.mu.Unlock()
		c.maxSubsExceeded()
		return nil
	}

	// If we have an origin cluster associated mark that in the sub.
	if rc := c.remoteCluster(); rc != _EMPTY_ {
		sub.origin = []byte(rc)
	}

	// Like Routes, we store local subs by account and subject and optionally queue name.
	// If we have a queue it will have a trailing weight which we do not want.
	if sub.queue != nil {
		sub.sid = arg[:len(arg)-len(args[2])-1]
	} else {
		sub.sid = arg
	}
	key := bytesToString(sub.sid)
	osub := c.subs[key]
	if osub == nil {
		// New subscription: record it and insert into the account sublist.
		c.subs[key] = sub
		// Now place into the account sl.
		if err := acc.sl.Insert(sub); err != nil {
			delete(c.subs, key)
			c.mu.Unlock()
			c.Errorf("Could not insert subscription: %v", err)
			c.sendErr("Invalid Subscription")
			return nil
		}
	} else if sub.queue != nil {
		// For a queue we need to update the weight.
		delta = sub.qw - atomic.LoadInt32(&osub.qw)
		atomic.StoreInt32(&osub.qw, sub.qw)
		acc.sl.UpdateRemoteQSub(osub)
	}
	spoke := c.isSpokeLeafNode()
	c.mu.Unlock()

	// Only add in shadow subs if a new sub or qsub.
	if osub == nil {
		if err := c.addShadowSubscriptions(acc, sub); err != nil {
			c.Errorf(err.Error())
		}
	}

	// If we are not solicited, treat leaf node subscriptions similar to a
	// client subscription, meaning we forward them to routes, gateways and
	// other leaf nodes as needed.
	if !spoke {
		// If we are routing add to the route map for the associated account.
		srv.updateRouteSubscriptionMap(acc, sub, delta)
		if srv.gateway.enabled {
			srv.gatewayUpdateSubInterest(acc.Name, sub, delta)
		}
	}
	// Now check on leafnode updates for other leaf nodes. We understand solicited
	// and non-solicited state in this call so we will do the right thing.
	acc.updateLeafNodes(sub, delta)

	return nil
}
// If the leafnode is a solicited, set the connect delay based on default
2859
// or private option (for tests). Sends the error to the other side, log and
2860
// close the connection.
2861
func (c *client) handleLeafNodeLoop(sendErr bool) {
14✔
2862
        accName, delay := c.setLeafConnectDelayIfSoliciting(leafNodeReconnectDelayAfterLoopDetected)
14✔
2863
        errTxt := fmt.Sprintf("Loop detected for leafnode account=%q. Delaying attempt to reconnect for %v", accName, delay)
14✔
2864
        if sendErr {
22✔
2865
                c.sendErr(errTxt)
8✔
2866
        }
8✔
2867

2868
        c.Errorf(errTxt)
14✔
2869
        // If we are here with "sendErr" false, it means that this is the server
14✔
2870
        // that received the error. The other side will have closed the connection,
14✔
2871
        // but does not hurt to close here too.
14✔
2872
        c.closeConnection(ProtocolViolation)
14✔
2873
}
2874

2875
// processLeafUnsub will process an inbound unsub request for the remote leaf node.
// The arg is the LS- payload, which matches the sub's sid/key exactly.
func (c *client) processLeafUnsub(arg []byte) error {
	// Indicate any activity, so pub and sub or unsubs.
	c.in.subs++

	srv := c.srv

	c.mu.Lock()
	if c.isClosed() {
		c.mu.Unlock()
		return nil
	}

	acc := c.acc
	// Guard against LS- arriving before CONNECT has been processed.
	if acc == nil {
		c.mu.Unlock()
		c.sendErr("Authorization Violation")
		c.closeConnection(ProtocolViolation)
		return nil
	}

	spoke := c.isSpokeLeafNode()
	// We store local subs by account and subject and optionally queue name.
	// LS- will have the arg exactly as the key.
	sub, ok := c.subs[string(arg)]
	if !ok {
		// If not found, don't try to update routes/gws/leaf nodes.
		c.mu.Unlock()
		return nil
	}
	// For queue subs, remove the full queue weight, not just 1.
	delta := int32(1)
	if len(sub.queue) > 0 {
		delta = sub.qw
	}
	c.mu.Unlock()

	c.unsubscribe(acc, sub, true, true)
	if !spoke {
		// If we are routing subtract from the route map for the associated account.
		srv.updateRouteSubscriptionMap(acc, sub, -delta)
		// Gateways
		if srv.gateway.enabled {
			srv.gatewayUpdateSubInterest(acc.Name, sub, -delta)
		}
	}
	// Now check on leafnode updates for other leaf nodes.
	acc.updateLeafNodes(sub, -delta)
	return nil
}
func (c *client) processLeafHeaderMsgArgs(arg []byte) error {
479✔
2927
        // Unroll splitArgs to avoid runtime/heap issues
479✔
2928
        args := c.argsa[:0]
479✔
2929
        start := -1
479✔
2930
        for i, b := range arg {
32,445✔
2931
                switch b {
31,966✔
2932
                case ' ', '\t', '\r', '\n':
1,360✔
2933
                        if start >= 0 {
2,720✔
2934
                                args = append(args, arg[start:i])
1,360✔
2935
                                start = -1
1,360✔
2936
                        }
1,360✔
2937
                default:
30,606✔
2938
                        if start < 0 {
32,445✔
2939
                                start = i
1,839✔
2940
                        }
1,839✔
2941
                }
2942
        }
2943
        if start >= 0 {
958✔
2944
                args = append(args, arg[start:])
479✔
2945
        }
479✔
2946

2947
        c.pa.arg = arg
479✔
2948
        switch len(args) {
479✔
2949
        case 0, 1, 2:
×
2950
                return fmt.Errorf("processLeafHeaderMsgArgs Parse Error: '%s'", args)
×
2951
        case 3:
86✔
2952
                c.pa.reply = nil
86✔
2953
                c.pa.queues = nil
86✔
2954
                c.pa.hdb = args[1]
86✔
2955
                c.pa.hdr = parseSize(args[1])
86✔
2956
                c.pa.szb = args[2]
86✔
2957
                c.pa.size = parseSize(args[2])
86✔
2958
        case 4:
386✔
2959
                c.pa.reply = args[1]
386✔
2960
                c.pa.queues = nil
386✔
2961
                c.pa.hdb = args[2]
386✔
2962
                c.pa.hdr = parseSize(args[2])
386✔
2963
                c.pa.szb = args[3]
386✔
2964
                c.pa.size = parseSize(args[3])
386✔
2965
        default:
7✔
2966
                // args[1] is our reply indicator. Should be + or | normally.
7✔
2967
                if len(args[1]) != 1 {
7✔
2968
                        return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
2969
                }
×
2970
                switch args[1][0] {
7✔
2971
                case '+':
2✔
2972
                        c.pa.reply = args[2]
2✔
2973
                case '|':
5✔
2974
                        c.pa.reply = nil
5✔
2975
                default:
×
2976
                        return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
2977
                }
2978
                // Grab header size.
2979
                c.pa.hdb = args[len(args)-2]
7✔
2980
                c.pa.hdr = parseSize(c.pa.hdb)
7✔
2981

7✔
2982
                // Grab size.
7✔
2983
                c.pa.szb = args[len(args)-1]
7✔
2984
                c.pa.size = parseSize(c.pa.szb)
7✔
2985

7✔
2986
                // Grab queue names.
7✔
2987
                if c.pa.reply != nil {
9✔
2988
                        c.pa.queues = args[3 : len(args)-2]
2✔
2989
                } else {
7✔
2990
                        c.pa.queues = args[2 : len(args)-2]
5✔
2991
                }
5✔
2992
        }
2993
        if c.pa.hdr < 0 {
479✔
2994
                return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Header Size: '%s'", arg)
×
2995
        }
×
2996
        if c.pa.size < 0 {
479✔
2997
                return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Size: '%s'", args)
×
2998
        }
×
2999
        if c.pa.hdr > c.pa.size {
479✔
3000
                return fmt.Errorf("processLeafHeaderMsgArgs Header Size larger then TotalSize: '%s'", arg)
×
3001
        }
×
3002

3003
        // Common ones processed after check for arg length
3004
        c.pa.subject = args[0]
479✔
3005

479✔
3006
        return nil
479✔
3007
}
3008

3009
// processLeafMsgArgs parses the argument section of an inbound LMSG protocol
// line into c.pa. Supported layouts (whitespace separated):
//   2 args: <subject> <size>
//   3 args: <subject> <reply> <size>
//   4+ args: <subject> <+|-indicator> [reply] <queues...> <size>
// Returns an error on a malformed argument list or unparsable size.
// Note: the parsed fields alias into arg; no copies are made.
func (c *client) processLeafMsgArgs(arg []byte) error {
	// Unroll splitArgs to avoid runtime/heap issues
	args := c.argsa[:0]
	start := -1
	for i, b := range arg {
		switch b {
		case ' ', '\t', '\r', '\n':
			if start >= 0 {
				args = append(args, arg[start:i])
				start = -1
			}
		default:
			if start < 0 {
				start = i
			}
		}
	}
	if start >= 0 {
		args = append(args, arg[start:])
	}

	c.pa.arg = arg
	switch len(args) {
	case 0, 1:
		return fmt.Errorf("processLeafMsgArgs Parse Error: '%s'", args)
	case 2:
		// <subject> <size>
		c.pa.reply = nil
		c.pa.queues = nil
		c.pa.szb = args[1]
		c.pa.size = parseSize(args[1])
	case 3:
		// <subject> <reply> <size>
		c.pa.reply = args[1]
		c.pa.queues = nil
		c.pa.szb = args[2]
		c.pa.size = parseSize(args[2])
	default:
		// args[1] is our reply indicator. Should be + or | normally.
		if len(args[1]) != 1 {
			return fmt.Errorf("processLeafMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
		}
		switch args[1][0] {
		case '+':
			// '+' means a reply subject follows in args[2].
			c.pa.reply = args[2]
		case '|':
			// '|' means no reply subject; queue names start at args[2].
			c.pa.reply = nil
		default:
			return fmt.Errorf("processLeafMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
		}
		// Grab size.
		c.pa.szb = args[len(args)-1]
		c.pa.size = parseSize(c.pa.szb)

		// Grab queue names. They sit between the (optional) reply and the
		// trailing size field.
		if c.pa.reply != nil {
			c.pa.queues = args[3 : len(args)-1]
		} else {
			c.pa.queues = args[2 : len(args)-1]
		}
	}
	if c.pa.size < 0 {
		return fmt.Errorf("processLeafMsgArgs Bad or Missing Size: '%s'", args)
	}

	// Common ones processed after check for arg length
	c.pa.subject = args[0]

	return nil
}
3077

3078
// processInboundLeafMsg is called to process an inbound msg from a leaf node.
// It matches the message subject against the account sublist (using the
// connection's per-client L1 result cache when valid), delivers to matched
// subscriptions, and forwards to gateways when enabled.
func (c *client) processInboundLeafMsg(msg []byte) {
	// Update statistics
	// The msg includes the CR_LF, so pull back out for accounting.
	c.in.msgs++
	c.in.bytes += int32(len(msg) - LEN_CR_LF)

	srv, acc, subject := c.srv, c.acc, string(c.pa.subject)

	// Mostly under testing scenarios.
	if srv == nil || acc == nil {
		return
	}

	// Match the subscriptions. We will use our own L1 map if
	// it's still valid, avoiding contention on the shared sublist.
	var r *SublistResult
	var ok bool

	// genid changes whenever the account sublist is modified; a mismatch
	// invalidates our cached results.
	genid := atomic.LoadUint64(&c.acc.sl.genid)
	if genid == c.in.genid && c.in.results != nil {
		r, ok = c.in.results[subject]
	} else {
		// Reset our L1 completely.
		c.in.results = make(map[string]*SublistResult)
		c.in.genid = genid
	}

	// Go back to the sublist data structure.
	if !ok {
		r = c.acc.sl.Match(subject)
		// Prune the results cache. Keeps us from unbounded growth. Random delete.
		if len(c.in.results) >= maxResultCacheSize {
			n := 0
			for subj := range c.in.results {
				delete(c.in.results, subj)
				if n++; n > pruneSize {
					break
				}
			}
		}
		// Then add the new cache entry.
		c.in.results[subject] = r
	}

	// Collect queue names if needed.
	var qnames [][]byte

	// Check for no interest, short circuit if so.
	// This is the fanout scale.
	if len(r.psubs)+len(r.qsubs) > 0 {
		flag := pmrNoFlag
		// If we have queue subs in this cluster, then if we run in gateway
		// mode and the remote gateways have queue subs, then we need to
		// collect the queue groups this message was sent to so that we
		// exclude them when sending to gateways.
		if len(r.qsubs) > 0 && c.srv.gateway.enabled &&
			atomic.LoadInt64(&c.srv.gateway.totalQSubs) > 0 {
			flag |= pmrCollectQueueNames
		}
		// If this is a mapped subject that means the mapped interest
		// is what got us here, but this might not have a queue designation
		// If that is the case, make sure we ignore to process local queue subscribers.
		if len(c.pa.mapped) > 0 && len(c.pa.queues) == 0 {
			flag |= pmrIgnoreEmptyQueueFilter
		}
		_, qnames = c.processMsgResults(acc, r, msg, nil, c.pa.subject, c.pa.reply, flag)
	}

	// Now deal with gateways
	if c.srv.gateway.enabled {
		c.sendMsgToGateways(acc, msg, c.pa.subject, c.pa.reply, qnames, true)
	}
}
3152

3153
// Handles a subscription permission violation.
// See leafPermViolation() for details.
func (c *client) leafSubPermViolation(subj []byte) {
	// false == this is a subscribe violation, not a publish one.
	c.leafPermViolation(false, subj)
}
324✔
3158

3159
// Common function to process publish or subscribe leafnode permission violation.
3160
// Sends the permission violation error to the remote, logs it and closes the connection.
3161
// If this is from a server soliciting, the reconnection will be delayed.
3162
func (c *client) leafPermViolation(pub bool, subj []byte) {
324✔
3163
        if c.isSpokeLeafNode() {
648✔
3164
                // For spokes these are no-ops since the hub server told us our permissions.
324✔
3165
                // We just need to not send these over to the other side since we will get cutoff.
324✔
3166
                return
324✔
3167
        }
324✔
3168
        // FIXME(dlc) ?
3169
        c.setLeafConnectDelayIfSoliciting(leafNodeReconnectAfterPermViolation)
×
3170
        var action string
×
3171
        if pub {
×
3172
                c.sendErr(fmt.Sprintf("Permissions Violation for Publish to %q", subj))
×
3173
                action = "Publish"
×
3174
        } else {
×
3175
                c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q", subj))
×
3176
                action = "Subscription"
×
3177
        }
×
3178
        c.Errorf("%s Violation on %q - Check other side configuration", action, subj)
×
3179
        // TODO: add a new close reason that is more appropriate?
×
3180
        c.closeConnection(ProtocolViolation)
×
3181
}
3182

3183
// Invoked from generic processErr() for LEAF connections.
3184
func (c *client) leafProcessErr(errStr string) {
45✔
3185
        // Check if we got a cluster name collision.
45✔
3186
        if strings.Contains(errStr, ErrLeafNodeHasSameClusterName.Error()) {
48✔
3187
                _, delay := c.setLeafConnectDelayIfSoliciting(leafNodeReconnectDelayAfterClusterNameSame)
3✔
3188
                c.Errorf("Leafnode connection dropped with same cluster name error. Delaying attempt to reconnect for %v", delay)
3✔
3189
                return
3✔
3190
        }
3✔
3191
        if strings.Contains(errStr, ErrLeafNodeMinVersionRejected.Error()) {
43✔
3192
                _, delay := c.setLeafConnectDelayIfSoliciting(leafNodeMinVersionReconnectDelay)
1✔
3193
                c.Errorf("Leafnode connection dropped due to minimum version requirement. Delaying attempt to reconnect for %v", delay)
1✔
3194
                return
1✔
3195
        }
1✔
3196

3197
        // We will look for Loop detected error coming from the other side.
3198
        // If we solicit, set the connect delay.
3199
        if !strings.Contains(errStr, "Loop detected") {
76✔
3200
                return
35✔
3201
        }
35✔
3202
        c.handleLeafNodeLoop(false)
6✔
3203
}
3204

3205
// If this leaf connection solicits, sets the connect delay to the given value,
3206
// or the one from the server option's LeafNode.connDelay if one is set (for tests).
3207
// Returns the connection's account name and delay.
3208
func (c *client) setLeafConnectDelayIfSoliciting(delay time.Duration) (string, time.Duration) {
18✔
3209
        c.mu.Lock()
18✔
3210
        if c.isSolicitedLeafNode() {
29✔
3211
                if s := c.srv; s != nil {
22✔
3212
                        if srvdelay := s.getOpts().LeafNode.connDelay; srvdelay != 0 {
15✔
3213
                                delay = srvdelay
4✔
3214
                        }
4✔
3215
                }
3216
                c.leaf.remote.setConnectDelay(delay)
11✔
3217
        }
3218
        accName := c.acc.Name
18✔
3219
        c.mu.Unlock()
18✔
3220
        return accName, delay
18✔
3221
}
3222

3223
// For the given remote Leafnode configuration, this function returns
3224
// if TLS is required, and if so, will return a clone of the TLS Config
3225
// (since some fields will be changed during handshake), the TLS server
3226
// name that is remembered, and the TLS timeout.
3227
func (c *client) leafNodeGetTLSConfigForSolicit(remote *leafNodeCfg) (bool, *tls.Config, string, float64) {
1,899✔
3228
        var (
1,899✔
3229
                tlsConfig  *tls.Config
1,899✔
3230
                tlsName    string
1,899✔
3231
                tlsTimeout float64
1,899✔
3232
        )
1,899✔
3233

1,899✔
3234
        remote.RLock()
1,899✔
3235
        defer remote.RUnlock()
1,899✔
3236

1,899✔
3237
        tlsRequired := remote.TLS || remote.TLSConfig != nil
1,899✔
3238
        if tlsRequired {
1,976✔
3239
                if remote.TLSConfig != nil {
127✔
3240
                        tlsConfig = remote.TLSConfig.Clone()
50✔
3241
                } else {
77✔
3242
                        tlsConfig = &tls.Config{MinVersion: tls.VersionTLS12}
27✔
3243
                }
27✔
3244
                tlsName = remote.tlsName
77✔
3245
                tlsTimeout = remote.TLSTimeout
77✔
3246
                if tlsTimeout == 0 {
121✔
3247
                        tlsTimeout = float64(TLS_TIMEOUT / time.Second)
44✔
3248
                }
44✔
3249
        }
3250

3251
        return tlsRequired, tlsConfig, tlsName, tlsTimeout
1,899✔
3252
}
3253

3254
// Initiates the LeafNode Websocket connection by:
// - doing the TLS handshake if needed
// - sending the HTTP upgrade request
// - waiting for the HTTP response and validating the websocket upgrade
//
// Since some bufio reader is used to consume the HTTP response, this function
// returns the slice of buffered bytes (if any) so that the readLoop that will
// be started after that consumes those first before reading from the socket.
// On failure, the returned ClosedState indicates the error category
// (WriteError/ReadError), with 0 meaning the connection was already closed.
//
// Lock held on entry.
func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remote *leafNodeCfg) ([]byte, ClosedState, error) {
	remote.RLock()
	compress := remote.Websocket.Compression
	// By default the server will mask outbound frames, but it can be disabled with this option.
	noMasking := remote.Websocket.NoMasking
	infoTimeout := remote.FirstInfoTimeout
	remote.RUnlock()
	// Will do the client-side TLS handshake if needed.
	tlsRequired, err := c.leafClientHandshakeIfNeeded(remote, opts)
	if err != nil {
		// 0 will indicate that the connection was already closed
		return nil, 0, err
	}

	// For http request, we need the passed URL to contain either http or https scheme.
	scheme := "http"
	if tlsRequired {
		scheme = "https"
	}
	// We will use the `/leafnode` path to tell the accepting WS server that it should
	// create a LEAF connection, not a CLIENT.
	// In case we use the user's URL path in the future, make sure we append the user's
	// path to our `/leafnode` path.
	lpath := leafNodeWSPath
	if curPath := rURL.EscapedPath(); curPath != _EMPTY_ {
		if curPath[0] == '/' {
			curPath = curPath[1:]
		}
		lpath = path.Join(curPath, lpath)
	} else {
		// No user path: drop the leading '/' since it is added back below.
		lpath = lpath[1:]
	}
	ustr := fmt.Sprintf("%s://%s/%s", scheme, rURL.Host, lpath)
	u, _ := url.Parse(ustr)
	req := &http.Request{
		Method:     "GET",
		URL:        u,
		Proto:      "HTTP/1.1",
		ProtoMajor: 1,
		ProtoMinor: 1,
		Header:     make(http.Header),
		Host:       u.Host,
	}
	wsKey, err := wsMakeChallengeKey()
	if err != nil {
		return nil, WriteError, err
	}

	// Standard websocket upgrade headers (RFC 6455).
	req.Header["Upgrade"] = []string{"websocket"}
	req.Header["Connection"] = []string{"Upgrade"}
	req.Header["Sec-WebSocket-Key"] = []string{wsKey}
	req.Header["Sec-WebSocket-Version"] = []string{"13"}
	if compress {
		req.Header.Add("Sec-WebSocket-Extensions", wsPMCReqHeaderValue)
	}
	if noMasking {
		req.Header.Add(wsNoMaskingHeader, wsNoMaskingValue)
	}
	c.nc.SetDeadline(time.Now().Add(infoTimeout))
	if err := req.Write(c.nc); err != nil {
		return nil, WriteError, err
	}

	var resp *http.Response

	br := bufio.NewReaderSize(c.nc, MAX_CONTROL_LINE_SIZE)
	resp, err = http.ReadResponse(br, req)
	// A valid upgrade requires status 101, matching Upgrade/Connection
	// headers, and the expected Sec-Websocket-Accept challenge response.
	if err == nil &&
		(resp.StatusCode != 101 ||
			!strings.EqualFold(resp.Header.Get("Upgrade"), "websocket") ||
			!strings.EqualFold(resp.Header.Get("Connection"), "upgrade") ||
			resp.Header.Get("Sec-Websocket-Accept") != wsAcceptKey(wsKey)) {

		err = fmt.Errorf("invalid websocket connection")
	}
	// Check compression extension...
	if err == nil && c.ws.compress {
		// Check that not only permessage-deflate extension is present, but that
		// we also have server and client no context take over.
		srvCompress, noCtxTakeover := wsPMCExtensionSupport(resp.Header, false)

		// If server does not support compression, then simply disable it in our side.
		if !srvCompress {
			c.ws.compress = false
		} else if !noCtxTakeover {
			err = fmt.Errorf("compression negotiation error")
		}
	}
	// Same for no masking...
	if err == nil && noMasking {
		// Check if server accepts no masking
		if resp.Header.Get(wsNoMaskingHeader) != wsNoMaskingValue {
			// Nope, need to mask our writes as any client would do.
			c.ws.maskwrite = true
		}
	}
	if resp != nil {
		resp.Body.Close()
	}
	if err != nil {
		return nil, ReadError, err
	}
	c.Debugf("Leafnode compression=%v masking=%v", c.ws.compress, c.ws.maskwrite)

	var preBuf []byte
	// We have to slurp whatever is in the bufio reader and pass that to the readloop.
	if n := br.Buffered(); n != 0 {
		preBuf, _ = br.Peek(n)
	}
	return preBuf, 0, nil
}
3376

3377
// connectProcessTimeout is how long leafNodeResumeConnectProcess will wait
// for leafNodeFinishConnectProcess before closing the connection as stale.
const connectProcessTimeout = 2 * time.Second
3378

3379
// This is invoked for remote LEAF remote connections after processing the INFO
// protocol. It sends the CONNECT protocol, starts the write loop, and arms a
// timer that closes the connection if leafNodeFinishConnectProcess does not
// complete within connectProcessTimeout.
func (s *Server) leafNodeResumeConnectProcess(c *client) {
	clusterName := s.ClusterName()

	c.mu.Lock()
	if c.isClosed() {
		c.mu.Unlock()
		return
	}
	if err := c.sendLeafConnect(clusterName, c.headers); err != nil {
		c.mu.Unlock()
		c.closeConnection(WriteError)
		return
	}

	// Spin up the write loop.
	s.startGoRoutine(func() { c.writeLoop() })

	// timeout leafNodeFinishConnectProcess
	c.ping.tmr = time.AfterFunc(connectProcessTimeout, func() {
		c.mu.Lock()
		// check if leafNodeFinishConnectProcess was called and prevent later leafNodeFinishConnectProcess
		if !c.flags.setIfNotSet(connectProcessFinished) {
			c.mu.Unlock()
			return
		}
		clearTimer(&c.ping.tmr)
		closed := c.isClosed()
		c.mu.Unlock()
		if !closed {
			c.sendErrAndDebug("Stale Leaf Node Connection - Closing")
			c.closeConnection(StaleConnection)
		}
	})
	c.mu.Unlock()
	c.Debugf("Remote leafnode connect msg sent")
}
3417

3418
// This is invoked for remote LEAF connections after processing the INFO
// protocol and leafNodeResumeConnectProcess.
// This will send LS+ the CONNECT protocol and register the leaf node.
// The connectProcessFinished flag guarantees this runs at most once and
// races safely with the stale-connection timer armed in
// leafNodeResumeConnectProcess.
func (s *Server) leafNodeFinishConnectProcess(c *client) {
	c.mu.Lock()
	if !c.flags.setIfNotSet(connectProcessFinished) {
		// The timeout callback already fired; nothing more to do.
		c.mu.Unlock()
		return
	}
	if c.isClosed() {
		c.mu.Unlock()
		s.removeLeafNodeConnection(c)
		return
	}
	remote := c.leaf.remote
	// Check if we will need to send the system connect event.
	remote.RLock()
	sendSysConnectEvent := remote.Hub
	remote.RUnlock()

	// Capture account before releasing lock
	acc := c.acc
	// cancel connectProcessTimeout
	clearTimer(&c.ping.tmr)
	c.mu.Unlock()

	// Make sure we register with the account here.
	if err := c.registerWithAccount(acc); err != nil {
		if err == ErrTooManyAccountConnections {
			c.maxAccountConnExceeded()
			return
		} else if err == ErrLeafNodeLoop {
			c.handleLeafNodeLoop(true)
			return
		}
		c.Errorf("Registering leaf with account %s resulted in error: %v", acc.Name, err)
		c.closeConnection(ProtocolViolation)
		return
	}
	s.addLeafNodeConnection(c, _EMPTY_, _EMPTY_, false)
	s.initLeafNodeSmapAndSendSubs(c)
	if sendSysConnectEvent {
		s.sendLeafNodeConnect(acc)
	}

	// The above functions are not atomically under the client
	// lock doing those operations. It is possible - since we
	// have started the read/write loops - that the connection
	// is closed before or in between. This would leave the
	// closed LN connection possible registered with the account
	// and/or the server's leafs map. So check if connection
	// is closed, and if so, manually cleanup.
	c.mu.Lock()
	closed := c.isClosed()
	if !closed {
		c.setFirstPingTimer()
	}
	c.mu.Unlock()
	if closed {
		s.removeLeafNodeConnection(c)
		if prev := acc.removeClient(c); prev == 1 {
			s.decActiveAccounts()
		}
	}
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc