• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 23329921902

19 Mar 2026 03:23PM UTC coverage: 83.107% (-0.08%) from 83.187%
23329921902

push

github

web-flow
[FIXED] Incomplete route pool on premature pong (#7971)

If the first PONG was received before `c.route.startNewRoute` was
initialized it would result in the route pool not being fully populated.
Instead of relying on the very first pong, we can just rely on the first
pong after initializing the field.

Signed-off-by: Maurice van Veen <github@mauricevanveen.com>

75408 of 90736 relevant lines covered (83.11%)

342585.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.24
/server/leafnode.go
1
// Copyright 2019-2025 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bufio"
18
        "bytes"
19
        "crypto/tls"
20
        "encoding/base64"
21
        "encoding/json"
22
        "fmt"
23
        "io"
24
        "math/rand"
25
        "net"
26
        "net/http"
27
        "net/url"
28
        "os"
29
        "path"
30
        "reflect"
31
        "regexp"
32
        "runtime"
33
        "strconv"
34
        "strings"
35
        "sync"
36
        "sync/atomic"
37
        "time"
38

39
        "github.com/klauspost/compress/s2"
40
        "github.com/nats-io/jwt/v2"
41
        "github.com/nats-io/nkeys"
42
        "github.com/nats-io/nuid"
43
)
44

45
const (
	// Warning when user configures leafnode TLS insecure
	leafnodeTLSInsecureWarning = "TLS certificate chain and hostname of solicited leafnodes will not be verified. DO NOT USE IN PRODUCTION!"

	// When a loop is detected, delay the reconnect of solicited connection.
	leafNodeReconnectDelayAfterLoopDetected = 30 * time.Second

	// When a server receives a message causing a permission violation, the
	// connection is closed and it won't attempt to reconnect for that long.
	leafNodeReconnectAfterPermViolation = 30 * time.Second

	// When we have the same cluster name as the hub.
	leafNodeReconnectDelayAfterClusterNameSame = 30 * time.Second

	// Prefix for loop detection subject
	leafNodeLoopDetectionSubjectPrefix = "$LDS."

	// Path added to URL to indicate to WS server that the connection is a
	// LEAF connection as opposed to a CLIENT.
	leafNodeWSPath = "/leafnode"

	// When a soliciting leafnode is rejected because it does not meet the
	// configured minimum version, delay the next reconnect attempt by this long.
	leafNodeMinVersionReconnectDelay = 5 * time.Second
)
70

71
// leaf holds the per-connection leafnode state attached to a client.
type leaf struct {
	// We have any auth stuff here for solicited connections.
	remote *leafNodeCfg
	// isSpoke tells us what role we are playing.
	// Used when we receive a connection but otherside tells us they are a hub.
	isSpoke bool
	// remoteCluster is when we are a hub but the spoke leafnode is part of a cluster.
	remoteCluster string
	// remoteServer holds onto the remote server's name or ID.
	remoteServer string
	// domain name of remote server
	remoteDomain string
	// account name of remote server
	remoteAccName string
	// Whether or not we want to propagate east-west interest from other LNs.
	isolated bool
	// Used to suppress sub and unsub interest. Same as routes but our audience
	// here is tied to this leaf node. This will hold all subscriptions except this
	// leaf nodes. This represents all the interest we want to send to the other side.
	smap map[string]int32
	// This map will contain all the subscriptions that have been added to the smap
	// during initLeafNodeSmapAndSendSubs. It is short lived and is there to avoid
	// race between processing of a sub where sub is added to account sublist but
	// updateSmap has not be called on that "thread", while in the LN readloop,
	// when processing CONNECT, initLeafNodeSmapAndSendSubs is invoked and add
	// this subscription to smap. When processing of the sub then calls updateSmap,
	// we would add it a second time in the smap causing later unsub to suppress the LS-.
	tsub map[*subscription]struct{}
	// NOTE(review): presumably the timer that clears the short-lived tsub map
	// above — confirm against initLeafNodeSmapAndSendSubs.
	tsubt *time.Timer
	// Selected compression mode, which may be different from the server configured mode.
	compression string
	// This is for GW map replies.
	gwSub *subscription
}
105

106
// Used for remote (solicited) leafnodes.
type leafNodeCfg struct {
	sync.RWMutex
	*RemoteLeafOpts
	// Configured URLs plus any additional ones learned from async INFO protocols.
	urls []*url.URL
	// URL currently (or last) used to connect; see pickNextURL.
	curURL *url.URL
	// Hostname saved off a configured URL for TLS server-name verification.
	tlsName string
	// User/password saved off a configured URL, so we can connect to bare
	// URLs received via the INFO protocol.
	username string
	password string
	// Deny publish/subscribe permissions built from DenyExports/DenyImports.
	perms     *Permissions
	connDelay time.Duration // Delay before a connect, could be used while detecting loop condition, etc..
	// NOTE(review): JetStream cluster-migration timer — used together with
	// JetStreamClusterMigrateDelay; confirm lifecycle with its callers.
	jsMigrateTimer *time.Timer
}
119

120
// Check to see if this is a solicited leafnode. We do special processing for solicited.
121
func (c *client) isSolicitedLeafNode() bool {
2,148✔
122
        return c.kind == LEAF && c.leaf.remote != nil
2,148✔
123
}
2,148✔
124

125
// Returns true if this is a solicited leafnode and is not configured to be treated as a hub or a receiving
126
// connection leafnode where the otherside has declared itself to be the hub.
127
func (c *client) isSpokeLeafNode() bool {
5,785,814✔
128
        return c.kind == LEAF && c.leaf.isSpoke
5,785,814✔
129
}
5,785,814✔
130

131
func (c *client) isHubLeafNode() bool {
18,036✔
132
        return c.kind == LEAF && !c.leaf.isSpoke
18,036✔
133
}
18,036✔
134

135
func (c *client) isIsolatedLeafNode() bool {
11,607✔
136
        // TODO(nat): In future we may want to pass in and consider an isolation
11,607✔
137
        // group name here, which the hub and/or leaf could provide, so that we
11,607✔
138
        // can isolate away certain LNs but not others on an opt-in basis. For
11,607✔
139
        // now we will just isolate all LN interest until then.
11,607✔
140
        return c.kind == LEAF && c.leaf.isolated
11,607✔
141
}
11,607✔
142

143
// This will spin up go routines to solicit the remote leaf node connections.
//
// Every configured remote is registered (even disabled ones, so that the
// internal config list stays aligned with the options for config reload);
// only enabled remotes get a connect go routine.
func (s *Server) solicitLeafNodeRemotes(remotes []*RemoteLeafOpts) {
	sysAccName := _EMPTY_
	sAcc := s.SystemAccount()
	if sAcc != nil {
		sysAccName = sAcc.Name
	}
	// addRemote registers the remote config under the server lock, then (outside
	// the lock) sanity-checks any credentials file and emits notices about
	// restricted permissions.
	addRemote := func(r *RemoteLeafOpts, isSysAccRemote bool) *leafNodeCfg {
		s.mu.Lock()
		remote := newLeafNodeCfg(r)
		creds := remote.Credentials
		accName := remote.LocalAccount
		s.leafRemoteCfgs = append(s.leafRemoteCfgs, remote)
		// Print notice if the system-account remote restricts import/export permissions.
		if isSysAccRemote {
			if len(remote.DenyExports) > 0 {
				s.Noticef("Remote for System Account uses restricted export permissions")
			}
			if len(remote.DenyImports) > 0 {
				s.Noticef("Remote for System Account uses restricted import permissions")
			}
		}
		s.mu.Unlock()
		if creds != _EMPTY_ {
			contents, err := os.ReadFile(creds)
			// Wipe the credential bytes from memory when this closure returns.
			defer wipeSlice(contents)
			if err != nil {
				s.Errorf("Error reading LeafNode Remote Credentials file %q: %v", creds, err)
			} else if items := credsRe.FindAllSubmatch(contents, -1); len(items) < 2 {
				s.Errorf("LeafNode Remote Credentials file %q malformed", creds)
			} else if _, err := nkeys.FromSeed(items[1][1]); err != nil {
				s.Errorf("LeafNode Remote Credentials file %q has malformed seed", creds)
			} else if uc, err := jwt.DecodeUserClaims(string(items[0][1])); err != nil {
				s.Errorf("LeafNode Remote Credentials file %q has malformed user jwt", creds)
			} else if isSysAccRemote {
				if !uc.Permissions.Pub.Empty() || !uc.Permissions.Sub.Empty() || uc.Permissions.Resp != nil {
					s.Noticef("LeafNode Remote for System Account uses credentials file %q with restricted permissions", creds)
				}
			} else {
				if !uc.Permissions.Pub.Empty() || !uc.Permissions.Sub.Empty() || uc.Permissions.Resp != nil {
					s.Noticef("LeafNode Remote for Account %s uses credentials file %q with restricted permissions", accName, creds)
				}
			}
		}
		return remote
	}
	for _, r := range remotes {
		// We need to call this, even if the leaf is disabled. This is so that
		// the number of internal configuration matches the options' remote leaf
		// configuration required for configuration reload.
		remote := addRemote(r, r.LocalAccount == sysAccName)
		if !r.Disabled {
			s.startGoRoutine(func() { s.connectToRemoteLeafNode(remote, true) })
		}
	}
}
199

200
// remoteLeafNodeStillValid reports whether the given remote leafnode config is
// not disabled and still present (matched by URL list equality) in the current
// server options, e.g. after a configuration reload.
func (s *Server) remoteLeafNodeStillValid(remote *leafNodeCfg) bool {
	if remote.Disabled {
		return false
	}
	for _, ri := range s.getOpts().LeafNode.Remotes {
		// FIXME(dlc) - What about auth changes?
		if reflect.DeepEqual(ri.URLs, remote.URLs) {
			return true
		}
	}
	return false
}
212

213
// Ensure that leafnode is properly configured.
//
// Validations performed, in order: auth options, local-account defaults and
// existence (config mode) or nkey form (operator mode), compression settings,
// per-remote proxy/URL-scheme/compression checks, then — only when a leafnode
// port is configured — minimum version, and finally the gateway/system-account
// interaction checks.
func validateLeafNode(o *Options) error {
	if err := validateLeafNodeAuthOptions(o); err != nil {
		return err
	}

	// Users can bind to any local account, if its empty we will assume the $G account.
	for _, r := range o.LeafNode.Remotes {
		if r.LocalAccount == _EMPTY_ {
			r.LocalAccount = globalAccountName
		}
	}

	// In local config mode, check that leafnode configuration refers to accounts that exist.
	if len(o.TrustedOperators) == 0 {
		accNames := map[string]struct{}{}
		for _, a := range o.Accounts {
			accNames[a.Name] = struct{}{}
		}
		// global account is always created
		accNames[DEFAULT_GLOBAL_ACCOUNT] = struct{}{}
		// in the context of leaf nodes, empty account means global account
		accNames[_EMPTY_] = struct{}{}
		// system account either exists or, if not disabled, will be created
		if o.SystemAccount == _EMPTY_ && !o.NoSystemAccount {
			accNames[DEFAULT_SYSTEM_ACCOUNT] = struct{}{}
		}
		checkAccountExists := func(accName string, cfgType string) error {
			if _, ok := accNames[accName]; !ok {
				return fmt.Errorf("cannot find local account %q specified in leafnode %s", accName, cfgType)
			}
			return nil
		}
		if err := checkAccountExists(o.LeafNode.Account, "authorization"); err != nil {
			return err
		}
		for _, lu := range o.LeafNode.Users {
			if lu.Account == nil { // means global account
				continue
			}
			if err := checkAccountExists(lu.Account.Name, "authorization"); err != nil {
				return err
			}
		}
		for _, r := range o.LeafNode.Remotes {
			if err := checkAccountExists(r.LocalAccount, "remote"); err != nil {
				return err
			}
		}
	} else {
		// Operator mode: users are not allowed and accounts must be nkeys.
		if len(o.LeafNode.Users) != 0 {
			return fmt.Errorf("operator mode does not allow specifying users in leafnode config")
		}
		for _, r := range o.LeafNode.Remotes {
			if !nkeys.IsValidPublicAccountKey(r.LocalAccount) {
				return fmt.Errorf(
					"operator mode requires account nkeys in remotes. " +
						"Please add an `account` key to each remote in your `leafnodes` section, to assign it to an account. " +
						"Each account value should be a 56 character public key, starting with the letter 'A'")
			}
		}
		if o.LeafNode.Port != 0 && o.LeafNode.Account != "" && !nkeys.IsValidPublicAccountKey(o.LeafNode.Account) {
			return fmt.Errorf("operator mode and non account nkeys are incompatible")
		}
	}

	// Validate compression settings
	if o.LeafNode.Compression.Mode != _EMPTY_ {
		if err := validateAndNormalizeCompressionOption(&o.LeafNode.Compression, CompressionS2Auto); err != nil {
			return err
		}
	}

	// If a remote has a websocket scheme, all need to have it.
	for _, rcfg := range o.LeafNode.Remotes {
		// Validate proxy configuration
		if _, err := validateLeafNodeProxyOptions(rcfg); err != nil {
			return err
		}

		if len(rcfg.URLs) >= 2 {
			// All URLs must agree with the first one on being websocket or not.
			firstIsWS, ok := isWSURL(rcfg.URLs[0]), true
			for i := 1; i < len(rcfg.URLs); i++ {
				u := rcfg.URLs[i]
				if isWS := isWSURL(u); isWS && !firstIsWS || !isWS && firstIsWS {
					ok = false
					break
				}
			}
			if !ok {
				return fmt.Errorf("remote leaf node configuration cannot have a mix of websocket and non-websocket urls: %q", redactURLList(rcfg.URLs))
			}
		}
		// Validate compression settings
		if rcfg.Compression.Mode != _EMPTY_ {
			if err := validateAndNormalizeCompressionOption(&rcfg.Compression, CompressionS2Auto); err != nil {
				return err
			}
		}
	}

	if o.LeafNode.Port == 0 {
		return nil
	}

	// If MinVersion is defined, check that it is valid.
	if mv := o.LeafNode.MinVersion; mv != _EMPTY_ {
		if err := checkLeafMinVersionConfig(mv); err != nil {
			return err
		}
	}

	// The checks below will be done only when detecting that we are configured
	// with gateways. So if an option validation needs to be done regardless,
	// it MUST be done before this point!

	if o.Gateway.Name == _EMPTY_ && o.Gateway.Port == 0 {
		return nil
	}
	// If we are here we have both leaf nodes and gateways defined, make sure there
	// is a system account defined.
	if o.SystemAccount == _EMPTY_ {
		return fmt.Errorf("leaf nodes and gateways (both being defined) require a system account to also be configured")
	}
	if err := validatePinnedCerts(o.LeafNode.TLSPinnedCerts); err != nil {
		return fmt.Errorf("leafnode: %v", err)
	}
	return nil
}
342

343
func checkLeafMinVersionConfig(mv string) error {
8✔
344
        if ok, err := versionAtLeastCheckError(mv, 2, 8, 0); !ok || err != nil {
12✔
345
                if err != nil {
6✔
346
                        return fmt.Errorf("invalid leafnode's minimum version: %v", err)
2✔
347
                } else {
4✔
348
                        return fmt.Errorf("the minimum version should be at least 2.8.0")
2✔
349
                }
2✔
350
        }
351
        return nil
4✔
352
}
353

354
// Used to validate user names in LeafNode configuration.
355
// - rejects mix of single and multiple users.
356
// - rejects duplicate user names.
357
func validateLeafNodeAuthOptions(o *Options) error {
8,266✔
358
        if len(o.LeafNode.Users) == 0 {
16,505✔
359
                return nil
8,239✔
360
        }
8,239✔
361
        if o.LeafNode.Username != _EMPTY_ {
29✔
362
                return fmt.Errorf("can not have a single user/pass and a users array")
2✔
363
        }
2✔
364
        if o.LeafNode.Nkey != _EMPTY_ {
25✔
365
                return fmt.Errorf("can not have a single nkey and a users array")
×
366
        }
×
367
        users := map[string]struct{}{}
25✔
368
        for _, u := range o.LeafNode.Users {
66✔
369
                if _, exists := users[u.Username]; exists {
43✔
370
                        return fmt.Errorf("duplicate user %q detected in leafnode authorization", u.Username)
2✔
371
                }
2✔
372
                users[u.Username] = struct{}{}
39✔
373
        }
374
        return nil
23✔
375
}
376

377
// validateLeafNodeProxyOptions validates the optional HTTP proxy settings of a
// remote leafnode configuration. It returns a slice of non-fatal warnings
// (for example when the proxy would be ignored because no websocket URL is
// configured) and an error when a setting is invalid.
func validateLeafNodeProxyOptions(remote *RemoteLeafOpts) ([]string, error) {
	var warnings []string

	// No proxy configured, nothing to validate.
	if remote.Proxy.URL == _EMPTY_ {
		return warnings, nil
	}

	proxyURL, err := url.Parse(remote.Proxy.URL)
	if err != nil {
		return warnings, fmt.Errorf("invalid proxy URL: %v", err)
	}

	if proxyURL.Scheme != "http" && proxyURL.Scheme != "https" {
		return warnings, fmt.Errorf("proxy URL scheme must be http or https, got: %s", proxyURL.Scheme)
	}

	if proxyURL.Host == _EMPTY_ {
		return warnings, fmt.Errorf("proxy URL must specify a host")
	}

	if remote.Proxy.Timeout < 0 {
		return warnings, fmt.Errorf("proxy timeout must be >= 0")
	}

	// Proxy credentials are all-or-nothing.
	if (remote.Proxy.Username == _EMPTY_) != (remote.Proxy.Password == _EMPTY_) {
		return warnings, fmt.Errorf("proxy username and password must both be specified or both be empty")
	}

	if len(remote.URLs) > 0 {
		hasWebSocketURL := false
		hasNonWebSocketURL := false

		for _, remoteURL := range remote.URLs {
			if remoteURL.Scheme == wsSchemePrefix || remoteURL.Scheme == wsSchemePrefixTLS {
				hasWebSocketURL = true
				// A wss:// URL through a proxy requires some TLS configuration
				// on the remote.
				if (remoteURL.Scheme == wsSchemePrefixTLS) &&
					remote.TLSConfig == nil && !remote.TLS {
					return warnings, fmt.Errorf("proxy is configured but remote URL %s requires TLS and no TLS configuration is provided. When using proxy with TLS endpoints, ensure TLS is properly configured for the leafnode remote", remoteURL.String())
				}
			} else {
				hasNonWebSocketURL = true
			}
		}

		// Proxying only applies to websocket URLs; warn when it will be
		// partially or entirely ignored.
		if !hasWebSocketURL {
			warnings = append(warnings, "proxy configuration will be ignored: proxy settings only apply to WebSocket connections (ws:// or wss://), but all configured URLs use TCP connections (nats://)")
		} else if hasNonWebSocketURL {
			warnings = append(warnings, "proxy configuration will only be used for WebSocket URLs: proxy settings do not apply to TCP connections (nats://)")
		}
	}

	return warnings, nil
}
430

431
// Update remote LeafNode TLS configurations after a config reload.
432
func (s *Server) updateRemoteLeafNodesTLSConfig(opts *Options) {
8✔
433
        max := len(opts.LeafNode.Remotes)
8✔
434
        if max == 0 {
8✔
435
                return
×
436
        }
×
437

438
        s.mu.RLock()
8✔
439
        defer s.mu.RUnlock()
8✔
440

8✔
441
        // Changes in the list of remote leaf nodes is not supported.
8✔
442
        // However, make sure that we don't go over the arrays.
8✔
443
        if len(s.leafRemoteCfgs) < max {
8✔
444
                max = len(s.leafRemoteCfgs)
×
445
        }
×
446
        for i := 0; i < max; i++ {
21✔
447
                ro := opts.LeafNode.Remotes[i]
13✔
448
                cfg := s.leafRemoteCfgs[i]
13✔
449
                if ro.TLSConfig != nil {
15✔
450
                        cfg.Lock()
2✔
451
                        cfg.TLSConfig = ro.TLSConfig.Clone()
2✔
452
                        cfg.TLSHandshakeFirst = ro.TLSHandshakeFirst
2✔
453
                        cfg.Unlock()
2✔
454
                }
2✔
455
        }
456
}
457

458
// reConnectToRemoteLeafNode waits for the configured reconnect interval before
// soliciting the remote leafnode again, aborting early on server shutdown.
// Runs as a server go routine: on the shutdown path it releases the go routine
// WaitGroup itself; on the normal path connectToRemoteLeafNode does so.
func (s *Server) reConnectToRemoteLeafNode(remote *leafNodeCfg) {
	delay := s.getOpts().LeafNode.ReconnectInterval
	select {
	case <-time.After(delay):
	case <-s.quitCh:
		s.grWG.Done()
		return
	}
	s.connectToRemoteLeafNode(remote, false)
}
468

469
// Creates a leafNodeCfg object that wraps the RemoteLeafOpts.
//
// Builds deny permissions from DenyExports/DenyImports, copies (and optionally
// shuffles) the configured URLs, and saves TLS hostname plus user/password off
// each URL for later reconnects to bare URLs learned via INFO.
func newLeafNodeCfg(remote *RemoteLeafOpts) *leafNodeCfg {
	cfg := &leafNodeCfg{
		RemoteLeafOpts: remote,
		urls:           make([]*url.URL, 0, len(remote.URLs)),
	}
	if len(remote.DenyExports) > 0 || len(remote.DenyImports) > 0 {
		perms := &Permissions{}
		if len(remote.DenyExports) > 0 {
			perms.Publish = &SubjectPermission{Deny: remote.DenyExports}
		}
		if len(remote.DenyImports) > 0 {
			perms.Subscribe = &SubjectPermission{Deny: remote.DenyImports}
		}
		cfg.perms = perms
	}
	// Start with the one that is configured. We will add to this
	// array when receiving async leafnode INFOs.
	cfg.urls = append(cfg.urls, cfg.URLs...)
	// If allowed to randomize, do it on our copy of URLs
	if !remote.NoRandomize {
		rand.Shuffle(len(cfg.urls), func(i, j int) {
			cfg.urls[i], cfg.urls[j] = cfg.urls[j], cfg.urls[i]
		})
	}
	// If we are TLS make sure we save off a proper servername if possible.
	// Do same for user/password since we may need them to connect to
	// a bare URL that we get from INFO protocol.
	for _, u := range cfg.urls {
		cfg.saveTLSHostname(u)
		cfg.saveUserPassword(u)
		// If the url(s) have the "wss://" scheme, and we don't have a TLS
		// config, mark that we should be using TLS anyway.
		if !cfg.TLS && isWSSURL(u) {
			cfg.TLS = true
		}
	}
	return cfg
}
508

509
// Will pick an URL from the list of available URLs.
510
func (cfg *leafNodeCfg) pickNextURL() *url.URL {
6,583✔
511
        cfg.Lock()
6,583✔
512
        defer cfg.Unlock()
6,583✔
513
        // If the current URL is the first in the list and we have more than
6,583✔
514
        // one URL, then move that one to end of the list.
6,583✔
515
        if cfg.curURL != nil && len(cfg.urls) > 1 && urlsAreEqual(cfg.curURL, cfg.urls[0]) {
9,755✔
516
                first := cfg.urls[0]
3,172✔
517
                copy(cfg.urls, cfg.urls[1:])
3,172✔
518
                cfg.urls[len(cfg.urls)-1] = first
3,172✔
519
        }
3,172✔
520
        cfg.curURL = cfg.urls[0]
6,583✔
521
        return cfg.curURL
6,583✔
522
}
523

524
// Returns the current URL
525
func (cfg *leafNodeCfg) getCurrentURL() *url.URL {
75✔
526
        cfg.RLock()
75✔
527
        defer cfg.RUnlock()
75✔
528
        return cfg.curURL
75✔
529
}
75✔
530

531
// Returns how long the server should wait before attempting
532
// to solicit a remote leafnode connection.
533
func (cfg *leafNodeCfg) getConnectDelay() time.Duration {
1,553✔
534
        cfg.RLock()
1,553✔
535
        delay := cfg.connDelay
1,553✔
536
        cfg.RUnlock()
1,553✔
537
        return delay
1,553✔
538
}
1,553✔
539

540
// Sets the connect delay.
541
func (cfg *leafNodeCfg) setConnectDelay(delay time.Duration) {
154✔
542
        cfg.Lock()
154✔
543
        cfg.connDelay = delay
154✔
544
        cfg.Unlock()
154✔
545
}
154✔
546

547
// Ensure that non-exported options (used in tests) have
548
// been properly set.
549
func (s *Server) setLeafNodeNonExportedOptions() {
7,001✔
550
        opts := s.getOpts()
7,001✔
551
        s.leafNodeOpts.dialTimeout = opts.LeafNode.dialTimeout
7,001✔
552
        if s.leafNodeOpts.dialTimeout == 0 {
14,001✔
553
                // Use same timeouts as routes for now.
7,000✔
554
                s.leafNodeOpts.dialTimeout = DEFAULT_ROUTE_DIAL
7,000✔
555
        }
7,000✔
556
        s.leafNodeOpts.resolver = opts.LeafNode.resolver
7,001✔
557
        if s.leafNodeOpts.resolver == nil {
13,999✔
558
                s.leafNodeOpts.resolver = net.DefaultResolver
6,998✔
559
        }
6,998✔
560
}
561

562
// Delay applied before the first connect of a solicited leafnode that shares
// the system account while clustered, to gather some info prior to connecting.
const sharedSysAccDelay = 250 * time.Millisecond
563

564
// establishHTTPProxyTunnel establishes an HTTP CONNECT tunnel through a proxy server
565
func establishHTTPProxyTunnel(proxyURL, targetHost string, timeout time.Duration, username, password string) (net.Conn, error) {
11✔
566
        proxyAddr, err := url.Parse(proxyURL)
11✔
567
        if err != nil {
11✔
568
                // This should not happen since proxy URL is validated during configuration parsing
×
569
                return nil, fmt.Errorf("unexpected proxy URL parse error (URL was pre-validated): %v", err)
×
570
        }
×
571

572
        // Connect to the proxy server
573
        conn, err := natsDialTimeout("tcp", proxyAddr.Host, timeout)
11✔
574
        if err != nil {
11✔
575
                return nil, fmt.Errorf("failed to connect to proxy: %v", err)
×
576
        }
×
577

578
        // Set deadline for the entire proxy handshake
579
        if err := conn.SetDeadline(time.Now().Add(timeout)); err != nil {
11✔
580
                conn.Close()
×
581
                return nil, fmt.Errorf("failed to set deadline: %v", err)
×
582
        }
×
583

584
        req := &http.Request{
11✔
585
                Method: http.MethodConnect,
11✔
586
                URL:    &url.URL{Opaque: targetHost}, // Opaque is required for CONNECT
11✔
587
                Host:   targetHost,
11✔
588
                Header: make(http.Header),
11✔
589
        }
11✔
590

11✔
591
        // Add proxy authentication if provided
11✔
592
        if username != "" && password != "" {
13✔
593
                req.Header.Set("Proxy-Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(username+":"+password)))
2✔
594
        }
2✔
595

596
        if err := req.Write(conn); err != nil {
11✔
597
                conn.Close()
×
598
                return nil, fmt.Errorf("failed to write CONNECT request: %v", err)
×
599
        }
×
600

601
        resp, err := http.ReadResponse(bufio.NewReader(conn), req)
11✔
602
        if err != nil {
11✔
603
                conn.Close()
×
604
                return nil, fmt.Errorf("failed to read proxy response: %v", err)
×
605
        }
×
606

607
        if resp.StatusCode != http.StatusOK {
12✔
608
                resp.Body.Close()
1✔
609
                conn.Close()
1✔
610
                return nil, fmt.Errorf("proxy CONNECT failed: %s", resp.Status)
1✔
611
        }
1✔
612

613
        // Close the response body
614
        resp.Body.Close()
10✔
615

10✔
616
        // Clear the deadline now that we've finished the proxy handshake
10✔
617
        if err := conn.SetDeadline(time.Time{}); err != nil {
10✔
618
                conn.Close()
×
619
                return nil, fmt.Errorf("failed to clear deadline: %v", err)
×
620
        }
×
621

622
        return conn, nil
10✔
623
}
624

625
// connectToRemoteLeafNode solicits a leafnode connection to one of the URLs
// of the given remote configuration, retrying (with jitter) until it succeeds,
// the server shuts down, or the remote config is no longer valid. Runs as a
// goroutine registered with s.grWG; firstConnect indicates this is the initial
// attempt for this remote (affects delays and error reporting verbosity).
func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg, firstConnect bool) {
	defer s.grWG.Done()

	if remote == nil || len(remote.URLs) == 0 {
		s.Debugf("Empty remote leafnode definition, nothing to connect")
		return
	}

	opts := s.getOpts()
	reconnectDelay := opts.LeafNode.ReconnectInterval
	s.mu.RLock()
	dialTimeout := s.leafNodeOpts.dialTimeout
	resolver := s.leafNodeOpts.resolver
	var isSysAcc bool
	if s.eventsEnabled() {
		isSysAcc = remote.LocalAccount == s.sys.account.Name
	}
	jetstreamMigrateDelay := remote.JetStreamClusterMigrateDelay
	s.mu.RUnlock()

	// If we are sharing a system account and we are not standalone delay to gather some info prior.
	if firstConnect && isSysAcc && !s.standAloneMode() {
		s.Debugf("Will delay first leafnode connect to shared system account due to clustering")
		remote.setConnectDelay(sharedSysAccDelay)
	}

	// Honor any configured (or just-set) connect delay, but remain responsive
	// to server shutdown while waiting.
	if connDelay := remote.getConnectDelay(); connDelay > 0 {
		select {
		case <-time.After(connDelay):
		case <-s.quitCh:
			return
		}
		remote.setConnectDelay(0)
	}

	var conn net.Conn

	const connErrFmt = "Error trying to connect as leafnode to remote server %q (attempt %v): %v"

	// Capture proxy configuration once before the loop with proper locking
	remote.RLock()
	proxyURL := remote.Proxy.URL
	proxyUsername := remote.Proxy.Username
	proxyPassword := remote.Proxy.Password
	proxyTimeout := remote.Proxy.Timeout
	remote.RUnlock()

	// Set default proxy timeout if not specified
	if proxyTimeout == 0 {
		proxyTimeout = dialTimeout
	}

	attempts := 0

	// Retry loop: pick the next URL, resolve, dial (directly or through the
	// HTTP CONNECT proxy), and on failure sleep with jitter before retrying.
	for s.isRunning() && s.remoteLeafNodeStillValid(remote) {
		rURL := remote.pickNextURL()
		url, err := s.getRandomIP(resolver, rURL.Host, nil)
		if err == nil {
			var ipStr string
			if url != rURL.Host {
				ipStr = fmt.Sprintf(" (%s)", url)
			}
			// Some test may want to disable remotes from connecting
			if s.isLeafConnectDisabled() {
				s.Debugf("Will not attempt to connect to remote server on %q%s, leafnodes currently disabled", rURL.Host, ipStr)
				err = ErrLeafNodeDisabled
			} else {
				s.Debugf("Trying to connect as leafnode to remote server on %q%s", rURL.Host, ipStr)

				// Check if proxy is configured
				if proxyURL != _EMPTY_ {
					targetHost := rURL.Host
					// If URL doesn't include port, add the default port for the scheme
					if rURL.Port() == _EMPTY_ {
						defaultPort := "80"
						if rURL.Scheme == wsSchemePrefixTLS {
							defaultPort = "443"
						}
						targetHost = net.JoinHostPort(rURL.Hostname(), defaultPort)
					}

					conn, err = establishHTTPProxyTunnel(proxyURL, targetHost, proxyTimeout, proxyUsername, proxyPassword)
				} else {
					// Direct connection
					conn, err = natsDialTimeout("tcp", url, dialTimeout)
				}
			}
		}
		if err != nil {
			// Add jitter to the reconnect delay to avoid thundering herds.
			jitter := time.Duration(rand.Int63n(int64(reconnectDelay)))
			delay := reconnectDelay + jitter
			attempts++
			if s.shouldReportConnectErr(firstConnect, attempts) {
				s.Errorf(connErrFmt, rURL.Host, attempts, err)
			} else {
				s.Debugf(connErrFmt, rURL.Host, attempts, err)
			}
			remote.Lock()
			// if we are using a delay to start migrating assets, kick off a migrate timer.
			if remote.jsMigrateTimer == nil && jetstreamMigrateDelay > 0 {
				remote.jsMigrateTimer = time.AfterFunc(jetstreamMigrateDelay, func() {
					s.checkJetStreamMigrate(remote)
				})
			}
			remote.Unlock()
			select {
			case <-s.quitCh:
				remote.cancelMigrateTimer()
				return
			case <-time.After(delay):
				// Check if we should migrate any JetStream assets immediately while this remote is down.
				// This will be used if JetStreamClusterMigrateDelay was not set
				if jetstreamMigrateDelay == 0 {
					s.checkJetStreamMigrate(remote)
				}
				continue
			}
		}
		remote.cancelMigrateTimer()
		// Re-check validity after a possibly long dial; close the connection
		// if the remote was removed in the meantime.
		if !s.remoteLeafNodeStillValid(remote) {
			conn.Close()
			return
		}

		// We have a connection here to a remote server.
		// Go ahead and create our leaf node and return.
		s.createLeafNode(conn, rURL, remote, nil)

		// Clear any observer states if we had them.
		s.clearObserverState(remote)

		return
	}
}
759

760
func (cfg *leafNodeCfg) cancelMigrateTimer() {
1,534✔
761
        cfg.Lock()
1,534✔
762
        stopAndClearTimer(&cfg.jsMigrateTimer)
1,534✔
763
        cfg.Unlock()
1,534✔
764
}
1,534✔
765

766
// This will clear any observer state such that stream or consumer assets on this server can become leaders again.
// Called after a remote leafnode connection has been (re)established.
func (s *Server) clearObserverState(remote *leafNodeCfg) {
	s.mu.RLock()
	accName := remote.LocalAccount
	s.mu.RUnlock()

	acc, err := s.LookupAccount(accName)
	if err != nil {
		s.Warnf("Error looking up account [%s] checking for JetStream clear observer state on a leafnode", accName)
		return
	}

	// Serialize with checkJetStreamMigrate, which sets observer state.
	acc.jscmMu.Lock()
	defer acc.jscmMu.Unlock()

	// Walk all streams looking for any clustered stream, skip otherwise.
	for _, mset := range acc.streams() {
		node := mset.raftNode()
		if node == nil {
			// Not R>1
			continue
		}
		// Check consumers
		for _, o := range mset.getConsumers() {
			if n := o.raftNode(); n != nil {
				// Ensure we can become a leader again.
				n.SetObserver(false)
			}
		}
		// Ensure we can become a leader again.
		node.SetObserver(false)
	}
}
799

800
// Check to see if we should migrate any assets from this account.
801
func (s *Server) checkJetStreamMigrate(remote *leafNodeCfg) {
4,980✔
802
        s.mu.RLock()
4,980✔
803
        accName, shouldMigrate := remote.LocalAccount, remote.JetStreamClusterMigrate
4,980✔
804
        s.mu.RUnlock()
4,980✔
805

4,980✔
806
        if !shouldMigrate {
9,900✔
807
                return
4,920✔
808
        }
4,920✔
809

810
        acc, err := s.LookupAccount(accName)
60✔
811
        if err != nil {
60✔
812
                s.Warnf("Error looking up account [%s] checking for JetStream migration on a leafnode", accName)
×
813
                return
×
814
        }
×
815

816
        acc.jscmMu.Lock()
60✔
817
        defer acc.jscmMu.Unlock()
60✔
818

60✔
819
        // Walk all streams looking for any clustered stream, skip otherwise.
60✔
820
        // If we are the leader force stepdown.
60✔
821
        for _, mset := range acc.streams() {
90✔
822
                node := mset.raftNode()
30✔
823
                if node == nil {
30✔
824
                        // Not R>1
×
825
                        continue
×
826
                }
827
                // Collect any consumers
828
                for _, o := range mset.getConsumers() {
49✔
829
                        if n := o.raftNode(); n != nil {
38✔
830
                                n.StepDown()
19✔
831
                                // Ensure we can not become a leader while in this state.
19✔
832
                                n.SetObserver(true)
19✔
833
                        }
19✔
834
                }
835
                // Stepdown if this stream was leader.
836
                node.StepDown()
30✔
837
                // Ensure we can not become a leader while in this state.
30✔
838
                node.SetObserver(true)
30✔
839
        }
840
}
841

842
// Helper for checking.
843
func (s *Server) isLeafConnectDisabled() bool {
6,576✔
844
        s.mu.RLock()
6,576✔
845
        defer s.mu.RUnlock()
6,576✔
846
        return s.leafDisableConnect
6,576✔
847
}
6,576✔
848

849
// Save off the tlsName for when we use TLS and mix hostnames and IPs. IPs usually
850
// come from the server we connect to.
851
//
852
// We used to save the name only if there was a TLSConfig or scheme equal to "tls".
853
// However, this was causing failures for users that did not set the scheme (and
854
// their remote connections did not have a tls{} block).
855
// We now save the host name regardless in case the remote returns an INFO indicating
856
// that TLS is required.
857
func (cfg *leafNodeCfg) saveTLSHostname(u *url.URL) {
2,459✔
858
        if cfg.tlsName == _EMPTY_ && net.ParseIP(u.Hostname()) == nil {
2,479✔
859
                cfg.tlsName = u.Hostname()
20✔
860
        }
20✔
861
}
862

863
// Save off the username/password for when we connect using a bare URL
864
// that we get from the INFO protocol.
865
func (cfg *leafNodeCfg) saveUserPassword(u *url.URL) {
1,800✔
866
        if cfg.username == _EMPTY_ && u.User != nil {
2,106✔
867
                cfg.username = u.User.Username()
306✔
868
                cfg.password, _ = u.User.Password()
306✔
869
        }
306✔
870
}
871

872
// This starts the leafnode accept loop in a go routine, unless it
// is detected that the server has already been shutdown.
// Binds the leafnode listener, builds the INFO advertised to accepted
// leafnodes, and spawns the accept goroutine; s.mu is held for the whole
// setup so shutdown cannot race with listener/INFO initialization.
func (s *Server) startLeafNodeAcceptLoop() {
	// Snapshot server options.
	opts := s.getOpts()

	port := opts.LeafNode.Port
	// -1 means "pick a random port" for the listener.
	if port == -1 {
		port = 0
	}

	if s.isShuttingDown() {
		return
	}

	s.mu.Lock()
	hp := net.JoinHostPort(opts.LeafNode.Host, strconv.Itoa(port))
	l, e := natsListen("tcp", hp)
	s.leafNodeListenerErr = e
	if e != nil {
		s.mu.Unlock()
		s.Fatalf("Error listening on leafnode port: %d - %v", opts.LeafNode.Port, e)
		return
	}

	s.Noticef("Listening for leafnode connections on %s",
		net.JoinHostPort(opts.LeafNode.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))

	tlsRequired := opts.LeafNode.TLSConfig != nil
	tlsVerify := tlsRequired && opts.LeafNode.TLSConfig.ClientAuth == tls.RequireAndVerifyClientCert
	// Do not set compression in this Info object, it would possibly cause
	// issues when sending asynchronous INFO to the remote.
	info := Info{
		ID:            s.info.ID,
		Name:          s.info.Name,
		Version:       s.info.Version,
		GitCommit:     gitCommit,
		GoVersion:     runtime.Version(),
		AuthRequired:  true,
		TLSRequired:   tlsRequired,
		TLSVerify:     tlsVerify,
		MaxPayload:    s.info.MaxPayload, // TODO(dlc) - Allow override?
		Headers:       s.supportsHeaders(),
		JetStream:     opts.JetStream,
		Domain:        opts.JetStreamDomain,
		Proto:         s.getServerProto(),
		InfoOnConnect: true,
		JSApiLevel:    JSApiLevel,
	}
	// If we have selected a random port...
	if port == 0 {
		// Write resolved port back to options.
		opts.LeafNode.Port = l.Addr().(*net.TCPAddr).Port
	}

	s.leafNodeInfo = info
	// Possibly override Host/Port and set IP based on Cluster.Advertise
	if err := s.setLeafNodeInfoHostPortAndIP(); err != nil {
		s.Fatalf("Error setting leafnode INFO with LeafNode.Advertise value of %s, err=%v", opts.LeafNode.Advertise, err)
		l.Close()
		s.mu.Unlock()
		return
	}
	s.leafURLsMap[s.leafNodeInfo.IP]++
	s.generateLeafNodeInfoJSON()

	// Setup state that can enable shutdown
	s.leafNodeListener = l

	// As of now, a server that does not have remotes configured would
	// never solicit a connection, so we should not have to warn if
	// InsecureSkipVerify is set in main LeafNodes config (since
	// this TLS setting matters only when soliciting a connection).
	// Still, warn if insecure is set in any of LeafNode block.
	// We need to check remotes, even if tls is not required on accept.
	warn := tlsRequired && opts.LeafNode.TLSConfig.InsecureSkipVerify
	if !warn {
		for _, r := range opts.LeafNode.Remotes {
			if r.TLSConfig != nil && r.TLSConfig.InsecureSkipVerify {
				warn = true
				break
			}
		}
	}
	if warn {
		s.Warnf(leafnodeTLSInsecureWarning)
	}
	go s.acceptConnections(l, "Leafnode", func(conn net.Conn) { s.createLeafNode(conn, nil, nil, nil) }, nil)
	s.mu.Unlock()
}
962

963
// RegEx to match a creds file with user JWT and Seed.
// Each delimited ("---...---") section yields one submatch: the first match
// is expected to be the user JWT, the second the nkey seed.
var credsRe = regexp.MustCompile(`\s*(?:(?:[-]{3,}.*[-]{3,}\r?\n)([\w\-.=]+)(?:\r?\n[-]{3,}.*[-]{3,}(\r?\n|\z)))`)
965

966
// clusterName is provided as argument to avoid lock ordering issues with the locked client c
967
// Lock should be held entering here.
968
func (c *client) sendLeafConnect(clusterName string, headers bool) error {
687✔
969
        // We support basic user/pass and operator based user JWT with signatures.
687✔
970
        cinfo := leafConnectInfo{
687✔
971
                Version:       VERSION,
687✔
972
                ID:            c.srv.info.ID,
687✔
973
                Domain:        c.srv.info.Domain,
687✔
974
                Name:          c.srv.info.Name,
687✔
975
                Hub:           c.leaf.remote.Hub,
687✔
976
                Cluster:       clusterName,
687✔
977
                Headers:       headers,
687✔
978
                JetStream:     c.acc.jetStreamConfigured(),
687✔
979
                DenyPub:       c.leaf.remote.DenyImports,
687✔
980
                Compression:   c.leaf.compression,
687✔
981
                RemoteAccount: c.acc.GetName(),
687✔
982
                Proto:         c.srv.getServerProto(),
687✔
983
                Isolate:       c.leaf.remote.RequestIsolation,
687✔
984
        }
687✔
985

687✔
986
        // If a signature callback is specified, this takes precedence over anything else.
687✔
987
        if cb := c.leaf.remote.SignatureCB; cb != nil {
695✔
988
                nonce := c.nonce
8✔
989
                c.mu.Unlock()
8✔
990
                jwt, sigraw, err := cb(nonce)
8✔
991
                c.mu.Lock()
8✔
992
                if err == nil && c.isClosed() {
9✔
993
                        err = ErrConnectionClosed
1✔
994
                }
1✔
995
                if err != nil {
10✔
996
                        c.Errorf("Error signing the nonce: %v", err)
2✔
997
                        return err
2✔
998
                }
2✔
999
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
6✔
1000
                cinfo.JWT, cinfo.Sig = jwt, sig
6✔
1001

1002
        } else if creds := c.leaf.remote.Credentials; creds != _EMPTY_ {
735✔
1003
                // Check for credentials first, that will take precedence..
56✔
1004
                c.Debugf("Authenticating with credentials file %q", c.leaf.remote.Credentials)
56✔
1005
                contents, err := os.ReadFile(creds)
56✔
1006
                if err != nil {
56✔
1007
                        c.Errorf("%v", err)
×
1008
                        return err
×
1009
                }
×
1010
                defer wipeSlice(contents)
56✔
1011
                items := credsRe.FindAllSubmatch(contents, -1)
56✔
1012
                if len(items) < 2 {
56✔
1013
                        c.Errorf("Credentials file malformed")
×
1014
                        return err
×
1015
                }
×
1016
                // First result should be the user JWT.
1017
                // We copy here so that the file containing the seed will be wiped appropriately.
1018
                raw := items[0][1]
56✔
1019
                tmp := make([]byte, len(raw))
56✔
1020
                copy(tmp, raw)
56✔
1021
                // Seed is second item.
56✔
1022
                kp, err := nkeys.FromSeed(items[1][1])
56✔
1023
                if err != nil {
56✔
1024
                        c.Errorf("Credentials file has malformed seed")
×
1025
                        return err
×
1026
                }
×
1027
                // Wipe our key on exit.
1028
                defer kp.Wipe()
56✔
1029

56✔
1030
                sigraw, _ := kp.Sign(c.nonce)
56✔
1031
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
56✔
1032
                cinfo.JWT = bytesToString(tmp)
56✔
1033
                cinfo.Sig = sig
56✔
1034
        } else if nkey := c.leaf.remote.Nkey; nkey != _EMPTY_ {
628✔
1035
                kp, err := nkeys.FromSeed([]byte(nkey))
5✔
1036
                if err != nil {
5✔
1037
                        c.Errorf("Remote nkey has malformed seed")
×
1038
                        return err
×
1039
                }
×
1040
                // Wipe our key on exit.
1041
                defer kp.Wipe()
5✔
1042
                sigraw, _ := kp.Sign(c.nonce)
5✔
1043
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
5✔
1044
                pkey, _ := kp.PublicKey()
5✔
1045
                cinfo.Nkey = pkey
5✔
1046
                cinfo.Sig = sig
5✔
1047
        }
1048
        // In addition, and this is to allow auth callout, set user/password or
1049
        // token if applicable.
1050
        if userInfo := c.leaf.remote.curURL.User; userInfo != nil {
1,014✔
1051
                cinfo.User = userInfo.Username()
329✔
1052
                var ok bool
329✔
1053
                cinfo.Pass, ok = userInfo.Password()
329✔
1054
                // For backward compatibility, if only username is provided, set both
329✔
1055
                // Token and User, not just Token.
329✔
1056
                if !ok {
338✔
1057
                        cinfo.Token = cinfo.User
9✔
1058
                }
9✔
1059
        } else if c.leaf.remote.username != _EMPTY_ {
363✔
1060
                cinfo.User = c.leaf.remote.username
7✔
1061
                cinfo.Pass = c.leaf.remote.password
7✔
1062
                // For backward compatibility, if only username is provided, set both
7✔
1063
                // Token and User, not just Token.
7✔
1064
                if cinfo.Pass == _EMPTY_ {
7✔
1065
                        cinfo.Token = cinfo.User
×
1066
                }
×
1067
        }
1068
        b, err := json.Marshal(cinfo)
685✔
1069
        if err != nil {
685✔
1070
                c.Errorf("Error marshaling CONNECT to remote leafnode: %v\n", err)
×
1071
                return err
×
1072
        }
×
1073
        // Although this call is made before the writeLoop is created,
1074
        // we don't really need to send in place. The protocol will be
1075
        // sent out by the writeLoop.
1076
        c.enqueueProto([]byte(fmt.Sprintf(ConProto, b)))
685✔
1077
        return nil
685✔
1078
}
1079

1080
// Makes a deep copy of the LeafNode Info structure.
1081
// The server lock is held on entry.
1082
func (s *Server) copyLeafNodeInfo() *Info {
2,764✔
1083
        clone := s.leafNodeInfo
2,764✔
1084
        // Copy the array of urls.
2,764✔
1085
        if len(s.leafNodeInfo.LeafNodeURLs) > 0 {
5,012✔
1086
                clone.LeafNodeURLs = append([]string(nil), s.leafNodeInfo.LeafNodeURLs...)
2,248✔
1087
        }
2,248✔
1088
        return &clone
2,764✔
1089
}
1090

1091
// Adds a LeafNode URL that we get when a route connects to the Info structure.
1092
// Regenerates the JSON byte array so that it can be sent to LeafNode connections.
1093
// Returns a boolean indicating if the URL was added or not.
1094
// Server lock is held on entry
1095
func (s *Server) addLeafNodeURL(urlStr string) bool {
7,896✔
1096
        if s.leafURLsMap.addUrl(urlStr) {
15,787✔
1097
                s.generateLeafNodeInfoJSON()
7,891✔
1098
                return true
7,891✔
1099
        }
7,891✔
1100
        return false
5✔
1101
}
1102

1103
// Removes a LeafNode URL of the route that is disconnecting from the Info structure.
1104
// Regenerates the JSON byte array so that it can be sent to LeafNode connections.
1105
// Returns a boolean indicating if the URL was removed or not.
1106
// Server lock is held on entry.
1107
func (s *Server) removeLeafNodeURL(urlStr string) bool {
7,896✔
1108
        // Don't need to do this if we are removing the route connection because
7,896✔
1109
        // we are shuting down...
7,896✔
1110
        if s.isShuttingDown() {
11,997✔
1111
                return false
4,101✔
1112
        }
4,101✔
1113
        if s.leafURLsMap.removeUrl(urlStr) {
7,586✔
1114
                s.generateLeafNodeInfoJSON()
3,791✔
1115
                return true
3,791✔
1116
        }
3,791✔
1117
        return false
4✔
1118
}
1119

1120
// Server lock is held on entry
// Refreshes the cached leafnode INFO (cluster name and connect URLs) and
// re-serializes it into s.leafNodeInfoJSON for sending to connections.
func (s *Server) generateLeafNodeInfoJSON() {
	s.leafNodeInfo.Cluster = s.cachedClusterName()
	s.leafNodeInfo.LeafNodeURLs = s.leafURLsMap.getAsStringSlice()
	s.leafNodeInfo.WSConnectURLs = s.websocket.connectURLsMap.getAsStringSlice()
	s.leafNodeInfoJSON = generateInfoJSON(&s.leafNodeInfo)
}
1127

1128
// Sends an async INFO protocol so that the connected servers can update
1129
// their list of LeafNode urls.
1130
func (s *Server) sendAsyncLeafNodeInfo() {
11,682✔
1131
        for _, c := range s.leafs {
11,781✔
1132
                c.mu.Lock()
99✔
1133
                c.enqueueProto(s.leafNodeInfoJSON)
99✔
1134
                c.mu.Unlock()
99✔
1135
        }
99✔
1136
}
1137

1138
// Called when an inbound leafnode connection is accepted or we create one for a solicited leafnode.
// For an accepted (non-solicited) connection it sends the initial INFO protocol (possibly after
// a server-side TLS handshake); for a solicited connection it prepares the connection (websocket
// upgrade or client-side TLS when "handshake first" is configured) and leaves the CONNECT to be
// sent when the remote's INFO is processed. Returns the created client, or nil on any failure.
func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCfg, ws *websocket) *client {
	// Snapshot server options.
	opts := s.getOpts()

	maxPay := int32(opts.MaxPayload)
	maxSubs := int32(opts.MaxSubs)
	// For system, maxSubs of 0 means unlimited, so re-adjust here.
	if maxSubs == 0 {
		maxSubs = -1
	}
	now := time.Now().UTC()

	c := &client{srv: s, nc: conn, kind: LEAF, opts: defaultOpts, mpay: maxPay, msubs: maxSubs, start: now, last: now}
	// Do not update the smap here, we need to do it in initLeafNodeSmapAndSendSubs
	c.leaf = &leaf{}

	// If the leafnode subject interest should be isolated, flag it here.
	// Server-level isolation takes precedence; otherwise fall back to the
	// remote's LocalIsolation setting for solicited connections.
	s.optsMu.RLock()
	if c.leaf.isolated = s.opts.LeafNode.IsolateLeafnodeInterest; !c.leaf.isolated && remote != nil {
		c.leaf.isolated = remote.LocalIsolation
	}
	s.optsMu.RUnlock()

	// For accepted LN connections, ws will be != nil if it was accepted
	// through the Websocket port.
	c.ws = ws

	// For remote, check if the scheme starts with "ws", if so, we will initiate
	// a remote Leaf Node connection as a websocket connection.
	if remote != nil && rURL != nil && isWSURL(rURL) {
		remote.RLock()
		c.ws = &websocket{compress: remote.Websocket.Compression, maskwrite: !remote.Websocket.NoMasking}
		remote.RUnlock()
	}

	// Determines if we are soliciting the connection or not.
	var solicited bool
	var acc *Account
	var remoteSuffix string
	if remote != nil {
		// For now, if lookup fails, we will constantly try
		// to recreate this LN connection.
		lacc := remote.LocalAccount
		var err error
		acc, err = s.LookupAccount(lacc)
		if err != nil {
			// An account not existing is something that can happen with nats/http account resolver and the account
			// has not yet been pushed, or the request failed for other reasons.
			// remote needs to be set or retry won't happen
			c.leaf.remote = remote
			c.closeConnection(MissingAccount)
			s.Errorf("Unable to lookup account %s for solicited leafnode connection: %v", lacc, err)
			return nil
		}
		remoteSuffix = fmt.Sprintf(" for account: %s", acc.traceLabel())
	}

	c.mu.Lock()
	c.initClient()
	c.Noticef("Leafnode connection created%s %s", remoteSuffix, c.opts.Name)

	var (
		tlsFirst         bool
		tlsFirstFallback time.Duration
		infoTimeout      time.Duration
	)
	if remote != nil {
		solicited = true
		remote.Lock()
		c.leaf.remote = remote
		c.setPermissions(remote.perms)
		// A remote that is not a Hub makes this connection a "spoke".
		if !c.leaf.remote.Hub {
			c.leaf.isSpoke = true
		}
		tlsFirst = remote.TLSHandshakeFirst
		infoTimeout = remote.FirstInfoTimeout
		remote.Unlock()
		c.acc = acc
	} else {
		c.flags.set(expectConnect)
		if ws != nil {
			c.Debugf("Leafnode compression=%v", c.ws.compress)
		}
		tlsFirst = opts.LeafNode.TLSHandshakeFirst
		if f := opts.LeafNode.TLSHandshakeFirstFallback; f > 0 {
			tlsFirstFallback = f
		}
	}
	c.mu.Unlock()

	var nonce [nonceLen]byte
	var info *Info

	// Grab this before the client lock below.
	if !solicited {
		// Grab server variables
		s.mu.Lock()
		info = s.copyLeafNodeInfo()
		// For tests that want to simulate old servers, do not set the compression
		// on the INFO protocol if configured with CompressionNotSupported.
		if cm := opts.LeafNode.Compression.Mode; cm != CompressionNotSupported {
			info.Compression = cm
		}
		// We always send a nonce for LEAF connections. Do not change that without
		// taking into account presence of proxy trusted keys.
		s.generateNonce(nonce[:])
		s.mu.Unlock()
	}

	// Grab lock
	c.mu.Lock()

	var preBuf []byte
	if solicited {
		// For websocket connection, we need to send an HTTP request,
		// and get the response before starting the readLoop to get
		// the INFO, etc..
		if c.isWebsocket() {
			var err error
			var closeReason ClosedState

			preBuf, closeReason, err = c.leafNodeSolicitWSConnection(opts, rURL, remote)
			if err != nil {
				c.Errorf("Error soliciting websocket connection: %v", err)
				c.mu.Unlock()
				if closeReason != 0 {
					c.closeConnection(closeReason)
				}
				return nil
			}
		} else {
			// If configured to do TLS handshake first
			if tlsFirst {
				if _, err := c.leafClientHandshakeIfNeeded(remote, opts); err != nil {
					c.mu.Unlock()
					return nil
				}
			}
			// We need to wait for the info, but not for too long.
			c.nc.SetReadDeadline(time.Now().Add(infoTimeout))
		}

		// We will process the INFO from the readloop and finish by
		// sending the CONNECT and finish registration later.
	} else {
		// Send our info to the other side.
		// Remember the nonce we sent here for signatures, etc.
		c.nonce = make([]byte, nonceLen)
		copy(c.nonce, nonce[:])
		info.Nonce = bytesToString(c.nonce)
		info.CID = c.cid
		proto := generateInfoJSON(info)

		var pre []byte
		// We need first to check for "TLS First" fallback delay.
		if tlsFirstFallback > 0 {
			// We wait and see if we are getting any data. Since we did not send
			// the INFO protocol yet, only clients that use TLS first should be
			// sending data (the TLS handshake). We don't really check the content:
			// if it is a rogue agent and not an actual client performing the
			// TLS handshake, the error will be detected when performing the
			// handshake on our side.
			pre = make([]byte, 4)
			c.nc.SetReadDeadline(time.Now().Add(tlsFirstFallback))
			n, _ := io.ReadFull(c.nc, pre[:])
			c.nc.SetReadDeadline(time.Time{})
			// If we get any data (regardless of possible timeout), we will proceed
			// with the TLS handshake.
			if n > 0 {
				pre = pre[:n]
			} else {
				// We did not get anything so we will send the INFO protocol.
				pre = nil
				// Set the boolean to false for the rest of the function.
				tlsFirst = false
			}
		}

		if !tlsFirst {
			// We have to send from this go routine because we may
			// have to block for TLS handshake before we start our
			// writeLoop go routine. The other side needs to receive
			// this before it can initiate the TLS handshake..
			c.sendProtoNow(proto)

			// The above call could have marked the connection as closed (due to TCP error).
			if c.isClosed() {
				c.mu.Unlock()
				c.closeConnection(WriteError)
				return nil
			}
		}

		// Check to see if we need to spin up TLS.
		if !c.isWebsocket() && info.TLSRequired {
			// If we have a prebuffer create a multi-reader.
			if len(pre) > 0 {
				c.nc = &tlsMixConn{c.nc, bytes.NewBuffer(pre)}
			}
			// Perform server-side TLS handshake.
			if err := c.doTLSServerHandshake(tlsHandshakeLeaf, opts.LeafNode.TLSConfig, opts.LeafNode.TLSTimeout, opts.LeafNode.TLSPinnedCerts); err != nil {
				c.mu.Unlock()
				return nil
			}
		}

		// If the user wants the TLS handshake to occur first, now that it is
		// done, send the INFO protocol.
		if tlsFirst {
			c.flags.set(didTLSFirst)
			c.sendProtoNow(proto)
			if c.isClosed() {
				c.mu.Unlock()
				c.closeConnection(WriteError)
				return nil
			}
		}

		// Leaf nodes will always require a CONNECT to let us know
		// when we are properly bound to an account.
		//
		// If compression is configured, we can't set the authTimer here because
		// it would cause the parser to fail any incoming protocol that is not a
		// CONNECT (and we need to exchange INFO protocols for compression
		// negotiation). So instead, use the ping timer until we are done with
		// negotiation and can set the auth timer.
		timeout := secondsToDuration(opts.LeafNode.AuthTimeout)
		if needsCompression(opts.LeafNode.Compression.Mode) {
			c.ping.tmr = time.AfterFunc(timeout, func() {
				c.authTimeout()
			})
		} else {
			c.setAuthTimer(timeout)
		}
	}

	// Keep track in case server is shutdown before we can successfully register.
	if !s.addToTempClients(c.cid, c) {
		c.mu.Unlock()
		c.setNoReconnect()
		c.closeConnection(ServerShutdown)
		return nil
	}

	// Spin up the read loop.
	s.startGoRoutine(func() { c.readLoop(preBuf) })

	// We will spin the write loop for solicited connections only
	// when processing the INFO and after switching to TLS if needed.
	if !solicited {
		s.startGoRoutine(func() { c.writeLoop() })
	}

	c.mu.Unlock()

	return c
}
1396

1397
// Will perform the client-side TLS handshake if needed. Assumes that this
// is called by the solicit side (remote will be non nil). Returns `true`
// if TLS is required, `false` otherwise.
// Lock held on entry.
func (c *client) leafClientHandshakeIfNeeded(remote *leafNodeCfg, opts *Options) (bool, error) {
	// Check if TLS is required and gather TLS config variables.
	tlsRequired, tlsConfig, tlsName, tlsTimeout := c.leafNodeGetTLSConfigForSolicit(remote)
	if !tlsRequired {
		return false, nil
	}

	// If TLS required, perform handshake.
	// Get the URL that was used to connect to the remote server.
	rURL := remote.getCurrentURL()

	// Perform the client-side TLS handshake.
	if resetTLSName, err := c.doTLSClientHandshake(tlsHandshakeLeaf, rURL, tlsConfig, tlsName, tlsTimeout, opts.LeafNode.TLSPinnedCerts); err != nil {
		// Check if we need to reset the remote's TLS name, e.g. when the
		// handshake indicates the cached name is no longer valid.
		if resetTLSName {
			remote.Lock()
			remote.tlsName = _EMPTY_
			remote.Unlock()
		}
		return false, err
	}
	return true, nil
}
1424

1425
// processLeafnodeInfo handles an INFO protocol received on a leafnode
// connection, both the initial INFO during connection setup and async INFO
// updates. For the first INFO on a solicited connection it performs the
// client-side TLS handshake (unless done first or over websocket), drives
// compression negotiation, validates we connected to an actual leafnode port,
// then resumes/finishes the connect process. For any INFO it may update the
// remote URL list and permissions.
func (c *client) processLeafnodeInfo(info *Info) {
	c.mu.Lock()
	if c.leaf == nil || c.isClosed() {
		c.mu.Unlock()
		return
	}
	s := c.srv
	opts := s.getOpts()
	remote := c.leaf.remote
	didSolicit := remote != nil
	firstINFO := !c.flags.isSet(infoReceived)

	// In case of websocket, the TLS handshake has been already done.
	// So check only for non websocket connections and for configurations
	// where the TLS Handshake was not done first.
	if didSolicit && !c.flags.isSet(handshakeComplete) && !c.isWebsocket() && !remote.TLSHandshakeFirst {
		// If the server requires TLS, we need to set this in the remote
		// otherwise if there is no TLS configuration block for the remote,
		// the solicit side will not attempt to perform the TLS handshake.
		if firstINFO && info.TLSRequired {
			// Check for TLS/proxy configuration mismatch
			if remote.Proxy.URL != _EMPTY_ && !remote.TLS && remote.TLSConfig == nil {
				c.mu.Unlock()
				c.Errorf("TLS configuration mismatch: Hub requires TLS but leafnode remote is not configured for TLS. When using a proxy, ensure TLS leafnode configuration matches the Hub requirements.")
				c.closeConnection(TLSHandshakeError)
				return
			}
			remote.TLS = true
		}
		if _, err := c.leafClientHandshakeIfNeeded(remote, opts); err != nil {
			c.mu.Unlock()
			return
		}
	}

	// Check for compression, unless already done.
	if firstINFO && !c.flags.isSet(compressionNegotiated) {
		// Prevent from getting back here.
		c.flags.set(compressionNegotiated)

		var co *CompressionOpts
		if !didSolicit {
			co = &opts.LeafNode.Compression
		} else {
			co = &remote.Compression
		}
		if needsCompression(co.Mode) {
			// Release client lock since following function will need server lock.
			c.mu.Unlock()
			compress, err := s.negotiateLeafCompression(c, didSolicit, info.Compression, co)
			if err != nil {
				c.sendErrAndErr(err.Error())
				c.closeConnection(ProtocolViolation)
				return
			}
			if compress {
				// Done for now, will get back another INFO protocol...
				return
			}
			// No compression because one side does not want/can't, so proceed.
			c.mu.Lock()
			// Check that the connection did not close if the lock was released.
			if c.isClosed() {
				c.mu.Unlock()
				return
			}
		} else {
			// Coming from an old server, the Compression field would be the empty
			// string. For servers that are configured with CompressionNotSupported,
			// this makes them behave as old servers.
			if info.Compression == _EMPTY_ || co.Mode == CompressionNotSupported {
				c.leaf.compression = CompressionNotSupported
			} else {
				c.leaf.compression = CompressionOff
			}
		}
		// Accepting side does not normally process an INFO protocol during
		// initial connection handshake. So we keep it consistent by returning
		// if we are not soliciting.
		if !didSolicit {
			// If we had created the ping timer instead of the auth timer, we will
			// clear the ping timer and set the auth timer now that the compression
			// negotiation is done.
			if info.Compression != _EMPTY_ && c.ping.tmr != nil {
				clearTimer(&c.ping.tmr)
				c.setAuthTimer(secondsToDuration(opts.LeafNode.AuthTimeout))
			}
			c.mu.Unlock()
			return
		}
		// Fall through and process the INFO protocol as usual.
	}

	// Note: For now, only the initial INFO has a nonce. We
	// will probably do auto key rotation at some point.
	if firstINFO {
		// Mark that the INFO protocol has been received.
		c.flags.set(infoReceived)
		// Prevent connecting to non leafnode port. Need to do this only for
		// the first INFO, not for async INFO updates...
		//
		// Content of INFO sent by the server when accepting a tcp connection.
		// -------------------------------------------------------------------
		// Listen Port Of | CID | ClientConnectURLs | LeafNodeURLs | Gateway |
		// -------------------------------------------------------------------
		//      CLIENT    |  X* |        X**        |              |         |
		//      ROUTE     |     |        X**        |      X***    |         |
		//     GATEWAY    |     |                   |              |    X    |
		//     LEAFNODE   |  X  |                   |       X      |         |
		// -------------------------------------------------------------------
		// *   Not on older servers.
		// **  Not if "no advertise" is enabled.
		// *** Not if leafnode's "no advertise" is enabled.
		//
		// As seen from above, a solicited LeafNode connection should receive
		// from the remote server an INFO with CID and LeafNodeURLs. Anything
		// else should be considered an attempt to connect to a wrong port.
		if didSolicit && (info.CID == 0 || info.LeafNodeURLs == nil) {
			c.mu.Unlock()
			c.Errorf(ErrConnectedToWrongPort.Error())
			c.closeConnection(WrongPort)
			return
		}
		// Reject a cluster that contains spaces.
		if info.Cluster != _EMPTY_ && strings.Contains(info.Cluster, " ") {
			c.mu.Unlock()
			c.sendErrAndErr(ErrClusterNameHasSpaces.Error())
			c.closeConnection(ProtocolViolation)
			return
		}
		// Capture a nonce here.
		c.nonce = []byte(info.Nonce)
		if info.TLSRequired && didSolicit {
			remote.TLS = true
		}
		supportsHeaders := c.srv.supportsHeaders()
		c.headers = supportsHeaders && info.Headers

		// Remember the remote server.
		// Pre 2.2.0 servers are not sending their server name.
		// In that case, use info.ID, which, for those servers, matches
		// the content of the field `Name` in the leafnode CONNECT protocol.
		if info.Name == _EMPTY_ {
			c.leaf.remoteServer = info.ID
		} else {
			c.leaf.remoteServer = info.Name
		}
		c.leaf.remoteDomain = info.Domain
		c.leaf.remoteCluster = info.Cluster
		// We send the protocol version in the INFO protocol.
		// Keep track of it, so we know if this connection supports message
		// tracing for instance.
		c.opts.Protocol = info.Proto
	}

	// For both initial INFO and async INFO protocols, possibly
	// update our list of remote leafnode URLs we can connect to.
	if didSolicit && (len(info.LeafNodeURLs) > 0 || len(info.WSConnectURLs) > 0) {
		// Consider the incoming array as the most up-to-date
		// representation of the remote cluster's list of URLs.
		c.updateLeafNodeURLs(info)
	}

	// Check to see if we have permissions updates here.
	if info.Import != nil || info.Export != nil {
		perms := &Permissions{
			Publish:   info.Export,
			Subscribe: info.Import,
		}
		// Check if we have local deny clauses that we need to merge.
		if remote := c.leaf.remote; remote != nil {
			if len(remote.DenyExports) > 0 {
				if perms.Publish == nil {
					perms.Publish = &SubjectPermission{}
				}
				perms.Publish.Deny = append(perms.Publish.Deny, remote.DenyExports...)
			}
			if len(remote.DenyImports) > 0 {
				if perms.Subscribe == nil {
					perms.Subscribe = &SubjectPermission{}
				}
				perms.Subscribe.Deny = append(perms.Subscribe.Deny, remote.DenyImports...)
			}
		}
		c.setPermissions(perms)
	}

	var resumeConnect bool

	// If this is a remote connection and this is the first INFO protocol,
	// then we need to finish the connect process by sending CONNECT, etc..
	if firstINFO && didSolicit {
		// Clear deadline that was set in createLeafNode while waiting for the INFO.
		c.nc.SetDeadline(time.Time{})
		resumeConnect = true
	} else if !firstINFO && didSolicit {
		c.leaf.remoteAccName = info.RemoteAccount
	}

	// Check if we have the remote account information and if so make sure it's stored.
	if info.RemoteAccount != _EMPTY_ {
		s.leafRemoteAccounts.Store(c.acc.Name, info.RemoteAccount)
	}
	c.mu.Unlock()

	finishConnect := info.ConnectInfo
	if resumeConnect && s != nil {
		s.leafNodeResumeConnectProcess(c)
		if !info.InfoOnConnect {
			finishConnect = true
		}
	}
	if finishConnect {
		s.leafNodeFinishConnectProcess(c)
	}

	// Check to see if we need to kick any internal source or mirror consumers.
	// This will be a no-op if JetStream not enabled for this server or if the bound account
	// does not have jetstream.
	s.checkInternalSyncConsumers(c.acc)
}
1646

1647
// negotiateLeafCompression selects the compression mode for a leafnode
// connection based on our configured mode and the mode advertised in the
// remote's INFO. When compression will be used, it sends our INFO with the
// chosen mode and switches the connection's reader/writer to (de)compression.
// Returns true if compression was enabled (the caller should then wait for
// another INFO), false if the connection proceeds uncompressed.
func (s *Server) negotiateLeafCompression(c *client, didSolicit bool, infoCompression string, co *CompressionOpts) (bool, error) {
	// Negotiate the appropriate compression mode (or no compression)
	cm, err := selectCompressionMode(co.Mode, infoCompression)
	if err != nil {
		return false, err
	}
	c.mu.Lock()
	// For "auto" mode, set the initial compression mode based on RTT
	if cm == CompressionS2Auto {
		if c.rttStart.IsZero() {
			c.rtt = computeRTT(c.start)
		}
		cm = selectS2AutoModeBasedOnRTT(c.rtt, co.RTTThresholds)
	}
	// Keep track of the negotiated compression mode.
	c.leaf.compression = cm
	cid := c.cid
	var nonce string
	if !didSolicit {
		nonce = bytesToString(c.nonce)
	}
	c.mu.Unlock()

	if !needsCompression(cm) {
		return false, nil
	}

	// If we end-up doing compression...

	// Generate an INFO with the chosen compression mode.
	s.mu.Lock()
	info := s.copyLeafNodeInfo()
	info.Compression, info.CID, info.Nonce = compressionModeForInfoProtocol(co, cm), cid, nonce
	infoProto := generateInfoJSON(info)
	s.mu.Unlock()

	// If we solicited, then send this INFO protocol BEFORE switching
	// to compression writer. However, if we did not, we send it after.
	c.mu.Lock()
	if didSolicit {
		c.enqueueProto(infoProto)
		// Make sure it is completely flushed (the pending bytes goes to
		// 0) before proceeding.
		for c.out.pb > 0 && !c.isClosed() {
			c.flushOutbound()
		}
	}
	// This is to notify the readLoop that it should switch to a
	// (de)compression reader.
	c.in.flags.set(switchToCompression)
	// Create the compress writer before queueing the INFO protocol for
	// a route that did not solicit. It will make sure that that proto
	// is sent with compression on.
	c.out.cw = s2.NewWriter(nil, s2WriterOptions(cm)...)
	if !didSolicit {
		c.enqueueProto(infoProto)
	}
	c.mu.Unlock()
	return true, nil
}
1707

1708
// When getting a leaf node INFO protocol, use the provided
1709
// array of urls to update the list of possible endpoints.
1710
func (c *client) updateLeafNodeURLs(info *Info) {
1,346✔
1711
        cfg := c.leaf.remote
1,346✔
1712
        cfg.Lock()
1,346✔
1713
        defer cfg.Unlock()
1,346✔
1714

1,346✔
1715
        // We have ensured that if a remote has a WS scheme, then all are.
1,346✔
1716
        // So check if first is WS, then add WS URLs, otherwise, add non WS ones.
1,346✔
1717
        if len(cfg.URLs) > 0 && isWSURL(cfg.URLs[0]) {
1,404✔
1718
                // It does not really matter if we use "ws://" or "wss://" here since
58✔
1719
                // we will have already marked that the remote should use TLS anyway.
58✔
1720
                // But use proper scheme for log statements, etc...
58✔
1721
                proto := wsSchemePrefix
58✔
1722
                if cfg.TLS {
58✔
1723
                        proto = wsSchemePrefixTLS
×
1724
                }
×
1725
                c.doUpdateLNURLs(cfg, proto, info.WSConnectURLs)
58✔
1726
                return
58✔
1727
        }
1728
        c.doUpdateLNURLs(cfg, "nats-leaf", info.LeafNodeURLs)
1,288✔
1729
}
1730

1731
func (c *client) doUpdateLNURLs(cfg *leafNodeCfg, scheme string, URLs []string) {
1,346✔
1732
        cfg.urls = make([]*url.URL, 0, 1+len(URLs))
1,346✔
1733
        // Add the ones we receive in the protocol
1,346✔
1734
        for _, surl := range URLs {
3,724✔
1735
                url, err := url.Parse(fmt.Sprintf("%s://%s", scheme, surl))
2,378✔
1736
                if err != nil {
2,378✔
1737
                        // As per below, the URLs we receive should not have contained URL info, so this should be safe to log.
×
1738
                        c.Errorf("Error parsing url %q: %v", surl, err)
×
1739
                        continue
×
1740
                }
1741
                // Do not add if it's the same as what we already have configured.
1742
                var dup bool
2,378✔
1743
                for _, u := range cfg.URLs {
6,013✔
1744
                        // URLs that we receive never have user info, but the
3,635✔
1745
                        // ones that were configured may have. Simply compare
3,635✔
1746
                        // host and port to decide if they are equal or not.
3,635✔
1747
                        if url.Host == u.Host && url.Port() == u.Port() {
5,354✔
1748
                                dup = true
1,719✔
1749
                                break
1,719✔
1750
                        }
1751
                }
1752
                if !dup {
3,037✔
1753
                        cfg.urls = append(cfg.urls, url)
659✔
1754
                        cfg.saveTLSHostname(url)
659✔
1755
                }
659✔
1756
        }
1757
        // Add the configured one
1758
        cfg.urls = append(cfg.urls, cfg.URLs...)
1,346✔
1759
}
1760

1761
// Similar to setInfoHostPortAndGenerateJSON, but for leafNodeInfo.
1762
func (s *Server) setLeafNodeInfoHostPortAndIP() error {
3,944✔
1763
        opts := s.getOpts()
3,944✔
1764
        if opts.LeafNode.Advertise != _EMPTY_ {
3,955✔
1765
                advHost, advPort, err := parseHostPort(opts.LeafNode.Advertise, opts.LeafNode.Port)
11✔
1766
                if err != nil {
11✔
1767
                        return err
×
1768
                }
×
1769
                s.leafNodeInfo.Host = advHost
11✔
1770
                s.leafNodeInfo.Port = advPort
11✔
1771
        } else {
3,933✔
1772
                s.leafNodeInfo.Host = opts.LeafNode.Host
3,933✔
1773
                s.leafNodeInfo.Port = opts.LeafNode.Port
3,933✔
1774
                // If the host is "0.0.0.0" or "::" we need to resolve to a public IP.
3,933✔
1775
                // This will return at most 1 IP.
3,933✔
1776
                hostIsIPAny, ips, err := s.getNonLocalIPsIfHostIsIPAny(s.leafNodeInfo.Host, false)
3,933✔
1777
                if err != nil {
3,933✔
1778
                        return err
×
1779
                }
×
1780
                if hostIsIPAny {
4,237✔
1781
                        if len(ips) == 0 {
304✔
1782
                                s.Errorf("Could not find any non-local IP for leafnode's listen specification %q",
×
1783
                                        s.leafNodeInfo.Host)
×
1784
                        } else {
304✔
1785
                                // Take the first from the list...
304✔
1786
                                s.leafNodeInfo.Host = ips[0]
304✔
1787
                        }
304✔
1788
                }
1789
        }
1790
        // Use just host:port for the IP
1791
        s.leafNodeInfo.IP = net.JoinHostPort(s.leafNodeInfo.Host, strconv.Itoa(s.leafNodeInfo.Port))
3,944✔
1792
        if opts.LeafNode.Advertise != _EMPTY_ {
3,955✔
1793
                s.Noticef("Advertise address for leafnode is set to %s", s.leafNodeInfo.IP)
11✔
1794
        }
11✔
1795
        return nil
3,944✔
1796
}
1797

1798
// Add the connection to the map of leaf nodes.
// If `checkForDup` is true (invoked when a leafnode is accepted), then we check
// if a connection already exists for the same server name and account.
// That can happen when the remote is attempting to reconnect while the accepting
// side did not detect the connection as broken yet.
// But it can also happen when there is a misconfiguration and the remote is
// creating two (or more) connections that bind to the same account on the accept
// side.
// When a duplicate is found, the new connection is accepted and the old is closed
// (this solves the stale connection situation). An error is returned to help the
// remote detect the misconfiguration when the duplicate is the result of that
// misconfiguration.
//
// Lock ordering note: the client lock, the server lock and each existing
// leaf's lock are taken and released one at a time, never nested, which is
// why the leaf state is snapshotted into locals up front.
func (s *Server) addLeafNodeConnection(c *client, srvName, clusterName string, checkForDup bool) {
	var accName string
	c.mu.Lock()
	cid := c.cid
	acc := c.acc
	if acc != nil {
		accName = acc.Name
	}
	myRemoteDomain := c.leaf.remoteDomain
	mySrvName := c.leaf.remoteServer
	remoteAccName := c.leaf.remoteAccName
	myClustName := c.leaf.remoteCluster
	solicited := c.leaf.remote != nil
	c.mu.Unlock()

	var old *client
	s.mu.Lock()
	// We check for empty because in some test we may send empty CONNECT{}
	if checkForDup && srvName != _EMPTY_ {
		for _, ol := range s.leafs {
			ol.mu.Lock()
			// We care here only about non solicited Leafnode. This function
			// is more about replacing stale connections than detecting loops.
			// We have code for the loop detection elsewhere, which also delays
			// attempt to reconnect.
			if !ol.isSolicitedLeafNode() && ol.leaf.remoteServer == srvName &&
				ol.leaf.remoteCluster == clusterName && ol.acc.Name == accName &&
				remoteAccName != _EMPTY_ && ol.leaf.remoteAccName == remoteAccName {
				old = ol
			}
			ol.mu.Unlock()
			if old != nil {
				break
			}
		}
	}
	// Store new connection in the map
	s.leafs[cid] = c
	s.mu.Unlock()
	s.removeFromTempClients(cid)

	// If applicable, evict the old one.
	if old != nil {
		old.sendErrAndErr(DuplicateRemoteLeafnodeConnection.String())
		old.closeConnection(DuplicateRemoteLeafnodeConnection)
		c.Warnf("Replacing connection from same server")
	}

	// Helper producing "server" or "server/cluster" for log statements.
	srvDecorated := func() string {
		if myClustName == _EMPTY_ {
			return mySrvName
		}
		return fmt.Sprintf("%s/%s", mySrvName, myClustName)
	}

	opts := s.getOpts()
	sysAcc := s.SystemAccount()
	js := s.getJetStream()
	var meta *raft
	if js != nil {
		if mg := js.getMetaGroup(); mg != nil {
			meta = mg.(*raft)
		}
	}
	blockMappingOutgoing := false
	// Deny (non domain) JetStream API traffic unless system account is shared
	// and domain names are identical and extending is not disabled

	// Check if backwards compatibility has been enabled and needs to be acted on
	forceSysAccDeny := false
	if len(opts.JsAccDefaultDomain) > 0 {
		if acc == sysAcc {
			for _, d := range opts.JsAccDefaultDomain {
				if d == _EMPTY_ {
					// Extending JetStream via leaf node is mutually exclusive with a domain mapping to the empty/default domain.
					// As soon as one mapping to "" is found, disable the ability to extend JS via a leaf node.
					c.Noticef("Not extending remote JetStream domain %q due to presence of empty default domain", myRemoteDomain)
					forceSysAccDeny = true
					break
				}
			}
		} else if domain, ok := opts.JsAccDefaultDomain[accName]; ok && domain == _EMPTY_ {
			// for backwards compatibility with old setups that do not have a domain name set
			c.Debugf("Skipping deny %q for account %q due to default domain", jsAllAPI, accName)
			// NOTE: this early return also skips the deny-permission and
			// domain-mapping logic below for this connection.
			return
		}
	}

	// If the server has JS disabled, it may still be part of a JetStream that could be extended.
	// This is either signaled by js being disabled and a domain set,
	// or in cases where no domain name exists, an extension hint is set.
	// However, this is only relevant in mixed setups.
	//
	// If the system account connects but default domains are present, JetStream can't be extended.
	if opts.JetStreamDomain != myRemoteDomain || (!opts.JetStream && (opts.JetStreamDomain == _EMPTY_ && opts.JetStreamExtHint != jsWillExtend)) ||
		sysAcc == nil || acc == nil || forceSysAccDeny {
		// If domain names mismatch always deny. This applies to system accounts as well as non system accounts.
		// Not having a system account, account or JetStream disabled is considered a mismatch as well.
		if acc != nil && acc == sysAcc {
			c.Noticef("System account connected from %s", srvDecorated())
			c.Noticef("JetStream not extended, domains differ")
			c.mergeDenyPermissionsLocked(both, denyAllJs)
			// When a remote with a system account is present in a server, unless otherwise disabled, the server will be
			// started in observer mode. Now that it is clear that this not used, turn the observer mode off.
			if solicited && meta != nil && meta.IsObserver() {
				meta.setObserver(false, extNotExtended)
				c.Debugf("Turning JetStream metadata controller Observer Mode off")
				// Take note that the domain was not extended to avoid this state from startup.
				writePeerState(js.config.StoreDir, meta.currentPeerState())
				// Meta controller can't be leader yet.
				// Yet it is possible that due to observer mode every server already stopped campaigning.
				// Therefore this server needs to be kicked into campaigning gear explicitly.
				meta.Campaign()
			}
		} else {
			c.Noticef("JetStream using domains: local %q, remote %q", opts.JetStreamDomain, myRemoteDomain)
			c.mergeDenyPermissionsLocked(both, denyAllClientJs)
		}
		blockMappingOutgoing = true
	} else if acc == sysAcc {
		// system account and same domain
		s.sys.client.Noticef("Extending JetStream domain %q as System Account connected from server %s",
			myRemoteDomain, srvDecorated())
		// In an extension use case, pin leadership to server remotes connect to.
		// Therefore, server with a remote that are not already in observer mode, need to be put into it.
		if solicited && meta != nil && !meta.IsObserver() {
			meta.setObserver(true, extExtended)
			c.Debugf("Turning JetStream metadata controller Observer Mode on - System Account Connected")
			// Take note that the domain was not extended to avoid this state next startup.
			writePeerState(js.config.StoreDir, meta.currentPeerState())
			// If this server is the leader already, step down so a new leader can be elected (that is not an observer)
			meta.StepDown()
		}
	} else {
		// This deny is needed in all cases (system account shared or not)
		// If the system account is shared, jsAllAPI traffic will go through the system account.
		// So in order to prevent duplicate delivery (from system and actual account) suppress it on the account.
		// If the system account is NOT shared, jsAllAPI traffic has no business
		c.Debugf("Adding deny %+v for account %q", denyAllClientJs, accName)
		c.mergeDenyPermissionsLocked(both, denyAllClientJs)
	}
	// If we have a specified JetStream domain we will want to add a mapping to
	// allow access cross domain for each non-system account.
	if opts.JetStreamDomain != _EMPTY_ && opts.JetStream && acc != nil && acc != sysAcc {
		for src, dest := range generateJSMappingTable(opts.JetStreamDomain) {
			if err := acc.AddMapping(src, dest); err != nil {
				c.Debugf("Error adding JetStream domain mapping: %s", err.Error())
			} else {
				c.Debugf("Adding JetStream Domain Mapping %q -> %s to account %q", src, dest, accName)
			}
		}
		if blockMappingOutgoing {
			src := fmt.Sprintf(jsDomainAPI, opts.JetStreamDomain)
			// make sure that messages intended for this domain, do not leave the cluster via this leaf node connection
			// This is a guard against a miss-config with two identical domain names and will only cover some forms
			// of this issue, not all of them.
			// This guards against a hub and a spoke having the same domain name.
			// But not two spokes having the same one and the request coming from the hub.
			c.mergeDenyPermissionsLocked(pub, []string{src})
			c.Debugf("Adding deny %q for outgoing messages to account %q", src, accName)
		}
	}
}
1973

1974
func (s *Server) removeLeafNodeConnection(c *client) {
1,691✔
1975
        c.mu.Lock()
1,691✔
1976
        cid := c.cid
1,691✔
1977
        if c.leaf != nil {
3,382✔
1978
                if c.leaf.tsubt != nil {
2,911✔
1979
                        c.leaf.tsubt.Stop()
1,220✔
1980
                        c.leaf.tsubt = nil
1,220✔
1981
                }
1,220✔
1982
                if c.leaf.gwSub != nil {
2,336✔
1983
                        s.gwLeafSubs.Remove(c.leaf.gwSub)
645✔
1984
                        // We need to set this to nil for GC to release the connection
645✔
1985
                        c.leaf.gwSub = nil
645✔
1986
                }
645✔
1987
        }
1988
        proxyKey := c.proxyKey
1,691✔
1989
        c.mu.Unlock()
1,691✔
1990
        s.mu.Lock()
1,691✔
1991
        delete(s.leafs, cid)
1,691✔
1992
        if proxyKey != _EMPTY_ {
1,695✔
1993
                s.removeProxiedConn(proxyKey, cid)
4✔
1994
        }
4✔
1995
        s.mu.Unlock()
1,691✔
1996
        s.removeFromTempClients(cid)
1,691✔
1997
}
1998

1999
// Connect information for solicited leafnodes.
// This is the JSON payload of the CONNECT protocol sent by the soliciting
// side; the accept side unmarshals it in processLeafNodeConnect, which
// copies Name/Domain/Cluster/RemoteAccount/Isolate into the leaf state,
// honors Hub/Headers/Compression/DenyPub, and validates Version/Gateway.
type leafConnectInfo struct {
	Version   string   `json:"version,omitempty"`
	Nkey      string   `json:"nkey,omitempty"`
	JWT       string   `json:"jwt,omitempty"`
	Sig       string   `json:"sig,omitempty"`
	User      string   `json:"user,omitempty"`
	Pass      string   `json:"pass,omitempty"`
	Token     string   `json:"auth_token,omitempty"`
	ID        string   `json:"server_id,omitempty"`
	Domain    string   `json:"domain,omitempty"`
	Name      string   `json:"name,omitempty"`
	Hub       bool     `json:"is_hub,omitempty"`
	Cluster   string   `json:"cluster,omitempty"`
	Headers   bool     `json:"headers,omitempty"`
	JetStream bool     `json:"jetstream,omitempty"`
	DenyPub   []string `json:"deny_pub,omitempty"`
	Isolate   bool     `json:"isolate,omitempty"`

	// There was an existing field called:
	// >> Comp bool `json:"compression,omitempty"`
	// that has never been used. With support for compression, we now need
	// a field that is a string. So we use a different json tag:
	Compression string `json:"compress_mode,omitempty"`

	// Just used to detect wrong connection attempts.
	Gateway string `json:"gateway,omitempty"`

	// Tells the accept side which account the remote is binding to.
	RemoteAccount string `json:"remote_account,omitempty"`

	// The accept side of a LEAF connection, unlike ROUTER and GATEWAY, receives
	// only the CONNECT protocol, and no INFO. So we need to send the protocol
	// version as part of the CONNECT. It will indicate if a connection supports
	// some features, such as message tracing.
	// We use `protocol` as the JSON tag, so this is automatically unmarshal'ed
	// in the low level process CONNECT.
	Proto int `json:"protocol,omitempty"`
}
2038

2039
// processLeafNodeConnect will process the inbound connect args.
// Once we are here we are bound to an account, so can send any interest that
// we would have to the other side.
//
// The connection is validated (port misuse, cluster name, gateway misuse,
// minimum version) and closed on any failure; on success the leaf state is
// populated under the client lock, and then the connection is registered
// with the server and subscriptions/permissions are sent to the remote.
// Returns a non-nil error when the connection was rejected.
func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) error {
	// Way to detect clients that incorrectly connect to the route listen
	// port. Client provided "lang" in the CONNECT protocol while LEAFNODEs don't.
	if lang != _EMPTY_ {
		c.sendErrAndErr(ErrClientConnectedToLeafNodePort.Error())
		c.closeConnection(WrongPort)
		return ErrClientConnectedToLeafNodePort
	}

	// Unmarshal as a leaf node connect protocol
	proto := &leafConnectInfo{}
	if err := json.Unmarshal(arg, proto); err != nil {
		return err
	}

	// Reject a cluster that contains spaces.
	if proto.Cluster != _EMPTY_ && strings.Contains(proto.Cluster, " ") {
		c.sendErrAndErr(ErrClusterNameHasSpaces.Error())
		c.closeConnection(ProtocolViolation)
		return ErrClusterNameHasSpaces
	}

	// Check for cluster name collisions.
	if cn := s.cachedClusterName(); cn != _EMPTY_ && proto.Cluster != _EMPTY_ && proto.Cluster == cn {
		c.sendErrAndErr(ErrLeafNodeHasSameClusterName.Error())
		c.closeConnection(ClusterNamesIdentical)
		return ErrLeafNodeHasSameClusterName
	}

	// Reject if this has Gateway which means that it would be from a gateway
	// connection that incorrectly connects to the leafnode port.
	if proto.Gateway != _EMPTY_ {
		errTxt := fmt.Sprintf("Rejecting connection from gateway %q on the leafnode port", proto.Gateway)
		c.Errorf(errTxt)
		c.sendErr(errTxt)
		c.closeConnection(WrongGateway)
		return ErrWrongGateway
	}

	// Enforce the configured minimum remote server version, if any.
	if mv := s.getOpts().LeafNode.MinVersion; mv != _EMPTY_ {
		major, minor, update, _ := versionComponents(mv)
		if !versionAtLeast(proto.Version, major, minor, update) {
			// Send back an INFO so recent remote servers process the rejection
			// cleanly, then close immediately. The soliciting side applies the
			// reconnect delay when it processes the error.
			s.sendPermsAndAccountInfo(c)
			c.sendErrAndErr(fmt.Sprintf("%s %q", ErrLeafNodeMinVersionRejected, mv))
			c.closeConnection(MinimumVersionRequired)
			return ErrMinimumVersionRequired
		}
	}

	// Check if this server supports headers.
	supportHeaders := c.srv.supportsHeaders()

	c.mu.Lock()
	// Leaf Nodes do not do echo or verbose or pedantic.
	c.opts.Verbose = false
	c.opts.Echo = false
	c.opts.Pedantic = false
	// This inbound connection will be marked as supporting headers if this server
	// support headers and the remote has sent in the CONNECT protocol that it does
	// support headers too.
	c.headers = supportHeaders && proto.Headers
	// If the compression level is still not set, set it based on what has been
	// given to us in the CONNECT protocol.
	if c.leaf.compression == _EMPTY_ {
		// But if proto.Compression is _EMPTY_, set it to CompressionNotSupported
		if proto.Compression == _EMPTY_ {
			c.leaf.compression = CompressionNotSupported
		} else {
			c.leaf.compression = proto.Compression
		}
	}

	// Remember the remote server.
	c.leaf.remoteServer = proto.Name
	// Remember the remote account name
	c.leaf.remoteAccName = proto.RemoteAccount
	// Remember if the leafnode requested isolation.
	c.leaf.isolated = c.leaf.isolated || proto.Isolate

	// If the other side has declared itself a hub, so we will take on the spoke role.
	if proto.Hub {
		c.leaf.isSpoke = true
	}

	// The soliciting side is part of a cluster.
	if proto.Cluster != _EMPTY_ {
		c.leaf.remoteCluster = proto.Cluster
	}

	c.leaf.remoteDomain = proto.Domain

	// When a leaf solicits a connection to a hub, the perms that it will use on the soliciting leafnode's
	// behalf are correct for them, but inside the hub need to be reversed since data is flowing in the opposite direction.
	if !c.isSolicitedLeafNode() && c.perms != nil {
		sp, pp := c.perms.sub, c.perms.pub
		c.perms.sub, c.perms.pub = pp, sp
		if c.opts.Import != nil {
			c.darray = c.opts.Import.Deny
		} else {
			c.darray = nil
		}
	}

	// Set the Ping timer
	c.setFirstPingTimer()

	// If we received pub deny permissions from the other end, merge with existing ones.
	c.mergeDenyPermissions(pub, proto.DenyPub)

	acc := c.acc
	c.mu.Unlock()

	// Register the cluster, even if empty, as long as we are acting as a hub.
	if !proto.Hub {
		acc.registerLeafNodeCluster(proto.Cluster)
	}

	// Add in the leafnode here since we passed through auth at this point.
	s.addLeafNodeConnection(c, proto.Name, proto.Cluster, true)

	// If we have permissions bound to this leafnode we need to send then back to the
	// origin server for local enforcement.
	s.sendPermsAndAccountInfo(c)

	// Create and initialize the smap since we know our bound account now.
	// This will send all registered subs too.
	s.initLeafNodeSmapAndSendSubs(c)

	// Announce the account connect event for a leaf node.
	// This will be a no-op as needed.
	s.sendLeafNodeConnect(c.acc)

	// Check to see if we need to kick any internal source or mirror consumers.
	// This will be a no-op if JetStream not enabled for this server or if the bound account
	// does not have jetstream.
	s.checkInternalSyncConsumers(acc)

	return nil
}
2184

2185
// checkInternalSyncConsumers
2186
func (s *Server) checkInternalSyncConsumers(acc *Account) {
2,116✔
2187
        // Grab our js
2,116✔
2188
        js := s.getJetStream()
2,116✔
2189

2,116✔
2190
        // Only applicable if we have JS and the leafnode has JS as well.
2,116✔
2191
        // We check for remote JS outside.
2,116✔
2192
        if !js.isEnabled() || acc == nil {
3,332✔
2193
                return
1,216✔
2194
        }
1,216✔
2195

2196
        // We will check all streams in our local account. They must be a leader and
2197
        // be sourcing or mirroring. We will check the external config on the stream itself
2198
        // if this is cross domain, or if the remote domain is empty, meaning we might be
2199
        // extending the system across this leafnode connection and hence we would be extending
2200
        // our own domain.
2201
        jsa := js.lookupAccount(acc)
900✔
2202
        if jsa == nil {
1,253✔
2203
                return
353✔
2204
        }
353✔
2205

2206
        var streams []*stream
547✔
2207
        jsa.mu.RLock()
547✔
2208
        for _, mset := range jsa.streams {
602✔
2209
                mset.cfgMu.RLock()
55✔
2210
                // We need to have a mirror or source defined.
55✔
2211
                // We do not want to force another lock here to look for leader status,
55✔
2212
                // so collect and after we release jsa will make sure.
55✔
2213
                if mset.cfg.Mirror != nil || len(mset.cfg.Sources) > 0 {
67✔
2214
                        streams = append(streams, mset)
12✔
2215
                }
12✔
2216
                mset.cfgMu.RUnlock()
55✔
2217
        }
2218
        jsa.mu.RUnlock()
547✔
2219

547✔
2220
        // Now loop through all candidates and check if we are the leader and have NOT
547✔
2221
        // created the sync up consumer.
547✔
2222
        for _, mset := range streams {
559✔
2223
                mset.retryDisconnectedSyncConsumers()
12✔
2224
        }
12✔
2225
}
2226

2227
// Returns the remote cluster name. This is set only once so does not require a lock.
2228
func (c *client) remoteCluster() string {
130,129✔
2229
        if c.leaf == nil {
130,129✔
2230
                return _EMPTY_
×
2231
        }
×
2232
        return c.leaf.remoteCluster
130,129✔
2233
}
2234

2235
// Sends back an info block to the soliciting leafnode to let it know about
2236
// its permission settings for local enforcement.
2237
func (s *Server) sendPermsAndAccountInfo(c *client) {
687✔
2238
        // Copy
687✔
2239
        s.mu.Lock()
687✔
2240
        info := s.copyLeafNodeInfo()
687✔
2241
        s.mu.Unlock()
687✔
2242
        c.mu.Lock()
687✔
2243
        info.CID = c.cid
687✔
2244
        info.Import = c.opts.Import
687✔
2245
        info.Export = c.opts.Export
687✔
2246
        info.RemoteAccount = c.acc.Name
687✔
2247
        // s.SystemAccount() uses an atomic operation and does not get the server lock, so this is safe.
687✔
2248
        info.IsSystemAccount = c.acc == s.SystemAccount()
687✔
2249
        info.ConnectInfo = true
687✔
2250
        c.enqueueProto(generateInfoJSON(info))
687✔
2251
        c.mu.Unlock()
687✔
2252
}
687✔
2253

2254
// Snapshot the current subscriptions from the sublist into our smap which
// we will keep updated from now on.
// Also send the registered subscriptions.
//
// The smap maps subscription keys (subject, optionally "subject queue") to
// interest counts; it is what drives the LS+/LS- protocol to the remote side.
func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
	acc := c.acc
	if acc == nil {
		c.Debugf("Leafnode does not have an account bound")
		return
	}
	// Collect all account subs here. Stack array avoids a heap allocation
	// for the common case of fewer than 1024 subs.
	_subs := [1024]*subscription{}
	subs := _subs[:0]
	ims := []string{}

	// Hold the client lock otherwise there can be a race and miss some subs.
	c.mu.Lock()
	defer c.mu.Unlock()

	acc.mu.RLock()
	accName := acc.Name
	accNTag := acc.nameTag

	// To make printing look better when no friendly name present.
	if accNTag != _EMPTY_ {
		accNTag = "/" + accNTag
	}

	// If we are solicited we only send interest for local clients.
	if c.isSpokeLeafNode() {
		acc.sl.localSubs(&subs, true)
	} else {
		acc.sl.All(&subs)
	}

	// Check if we have an existing service import reply.
	siReply := copyBytes(acc.siReply)

	// Since leaf nodes only send on interest, if the bound
	// account has import services we need to send those over.
	for isubj := range acc.imports.services {
		if c.isSpokeLeafNode() && !c.canSubscribe(isubj) {
			c.Debugf("Not permitted to import service %q on behalf of %s%s", isubj, accName, accNTag)
			continue
		}
		ims = append(ims, isubj)
	}
	// Likewise for mappings.
	for _, m := range acc.mappings {
		if c.isSpokeLeafNode() && !c.canSubscribe(m.src) {
			c.Debugf("Not permitted to import mapping %q on behalf of %s%s", m.src, accName, accNTag)
			continue
		}
		ims = append(ims, m.src)
	}

	// Create a unique subject that will be used for loop detection.
	lds := acc.lds
	acc.mu.RUnlock()

	// Check if we have to create the LDS. Done outside the read lock since
	// it needs the write lock on the account.
	if lds == _EMPTY_ {
		lds = leafNodeLoopDetectionSubjectPrefix + nuid.Next()
		acc.mu.Lock()
		acc.lds = lds
		acc.mu.Unlock()
	}

	// Now check for gateway interest. Leafnodes will put this into
	// the proper mode to propagate, but they are not held in the account.
	gwsa := [16]*client{}
	gws := gwsa[:0]
	s.getOutboundGatewayConnections(&gws)
	for _, cgw := range gws {
		cgw.mu.Lock()
		gw := cgw.gw
		cgw.mu.Unlock()
		if gw != nil {
			if ei, _ := gw.outsim.Load(accName); ei != nil {
				if e := ei.(*outsie); e != nil && e.sl != nil {
					e.sl.All(&subs)
				}
			}
		}
	}

	applyGlobalRouting := s.gateway.enabled
	if c.isSpokeLeafNode() {
		// Add a fake subscription for this solicited leafnode connection
		// so that we can send back directly for mapped GW replies.
		// We need to keep track of this subscription so it can be removed
		// when the connection is closed so that the GC can release it.
		c.leaf.gwSub = &subscription{client: c, subject: []byte(gwReplyPrefix + ">")}
		c.srv.gwLeafSubs.Insert(c.leaf.gwSub)
	}

	// Now walk the results and add them to our smap
	rc := c.leaf.remoteCluster
	c.leaf.smap = make(map[string]int32)
	for _, sub := range subs {
		// Check perms regardless of role.
		if c.perms != nil && !c.canSubscribe(string(sub.subject)) {
			c.Debugf("Not permitted to subscribe to %q on behalf of %s%s", sub.subject, accName, accNTag)
			continue
		}
		// Don't advertise interest from leafnodes to other isolated leafnodes.
		if sub.client.kind == LEAF && c.isIsolatedLeafNode() {
			continue
		}
		// We ignore ourselves here.
		// Also don't add the subscription if it has a origin cluster and the
		// cluster name matches the one of the client we are sending to.
		if c != sub.client && (sub.origin == nil || (bytesToString(sub.origin) != rc)) {
			// Queue subs contribute their weight, plain subs count as 1.
			count := int32(1)
			if len(sub.queue) > 0 && sub.qw > 0 {
				count = sub.qw
			}
			c.leaf.smap[keyFromSub(sub)] += count
			// Track subs processed here so updateSmap can avoid double
			// counting additions that race with this initialization.
			if c.leaf.tsub == nil {
				c.leaf.tsub = make(map[*subscription]struct{})
			}
			c.leaf.tsub[sub] = struct{}{}
		}
	}
	// FIXME(dlc) - We need to update appropriately on an account claims update.
	for _, isubj := range ims {
		c.leaf.smap[isubj]++
	}
	// If we have gateways enabled we need to make sure the other side sends us responses
	// that have been augmented from the original subscription.
	// TODO(dlc) - Should we lock this down more?
	if applyGlobalRouting {
		c.leaf.smap[oldGWReplyPrefix+"*.>"]++
		c.leaf.smap[gwReplyPrefix+">"]++
	}
	// Detect loops by subscribing to a specific subject and checking
	// if this sub is coming back to us.
	c.leaf.smap[lds]++

	// Check if we need to add an existing siReply to our map.
	// This will be a prefix so add on the wildcard.
	if siReply != nil {
		wcsub := append(siReply, '>')
		c.leaf.smap[string(wcsub)]++
	}
	// Queue all protocols. There is no max pending limit for LN connection,
	// so we don't need chunking. The writes will happen from the writeLoop.
	var b bytes.Buffer
	for key, n := range c.leaf.smap {
		c.writeLeafSub(&b, key, n)
	}
	if b.Len() > 0 {
		c.enqueueProto(b.Bytes())
	}
	if c.leaf.tsub != nil {
		// Clear the tsub map after 5 seconds.
		c.leaf.tsubt = time.AfterFunc(5*time.Second, func() {
			c.mu.Lock()
			if c.leaf != nil {
				c.leaf.tsub = nil
				c.leaf.tsubt = nil
			}
			c.mu.Unlock()
		})
	}
}
2419

2420
// updateInterestForAccountOnGateway called from gateway code when processing RS+ and RS-.
2421
func (s *Server) updateInterestForAccountOnGateway(accName string, sub *subscription, delta int32) {
204,907✔
2422
        // Since we're in the gateway's readLoop, and we would otherwise block, don't allow fetching.
204,907✔
2423
        acc, err := s.lookupOrFetchAccount(accName, false)
204,907✔
2424
        if acc == nil || err != nil {
205,160✔
2425
                s.Debugf("No or bad account for %q, failed to update interest from gateway", accName)
253✔
2426
                return
253✔
2427
        }
253✔
2428
        acc.updateLeafNodes(sub, delta)
204,654✔
2429
}
2430

2431
// updateLeafNodesEx will make sure to update the account smap for the subscription.
// Will also forward to all leaf nodes as needed.
// If `hubOnly` is true, then will update only leaf nodes that connect to this server
// (that is, for which this server acts as a hub to them).
//
// Lock ordering here: acc.mu (read) is taken and released before acc.lmu
// (read), and each leafnode client lock is taken inside the lmu section.
func (acc *Account) updateLeafNodesEx(sub *subscription, delta int32, hubOnly bool) {
	if acc == nil || sub == nil {
		return
	}

	// We will do checks for no leafnodes and same cluster here inline and under the
	// general account read lock.
	// If we feel we need to update the leafnodes we will do that out of line to avoid
	// blocking routes or GWs.

	acc.mu.RLock()
	// First check if we even have leafnodes here.
	if acc.nleafs == 0 {
		acc.mu.RUnlock()
		return
	}

	// Is this a loop detection subject.
	isLDS := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))

	// Capture the cluster even if its empty.
	var cluster string
	if sub.origin != nil {
		cluster = bytesToString(sub.origin)
	}

	// If we have an isolated cluster we can return early, as long as it is not a loop detection subject.
	// Empty clusters will return false for the check.
	if !isLDS && acc.isLeafNodeClusterIsolated(cluster) {
		acc.mu.RUnlock()
		return
	}

	// We can release the general account lock.
	acc.mu.RUnlock()

	// We can hold the list lock here to avoid having to copy a large slice.
	acc.lmu.RLock()
	defer acc.lmu.RUnlock()

	// Do this once.
	subject := string(sub.subject)

	// Walk the connected leafnodes.
	for _, ln := range acc.lleafs {
		// Never reflect the subscription back to its own connection.
		if ln == sub.client {
			continue
		}
		ln.mu.Lock()
		// Don't advertise interest from leafnodes to other isolated leafnodes.
		if sub.client.kind == LEAF && ln.isIsolatedLeafNode() {
			ln.mu.Unlock()
			continue
		}
		// If `hubOnly` is true, it means that we want to update only leafnodes
		// that connect to this server (so isHubLeafNode() would return `true`).
		if hubOnly && !ln.isHubLeafNode() {
			ln.mu.Unlock()
			continue
		}
		// Check to make sure this sub does not have an origin cluster that matches the leafnode.
		// If skipped, make sure that we still let go the "$LDS." subscription that allows
		// the detection of loops as long as different cluster.
		// Note: permissions are only checked for additions (delta > 0);
		// removals always go through so interest can be withdrawn.
		clusterDifferent := cluster != ln.remoteCluster()
		if (isLDS && clusterDifferent) || ((cluster == _EMPTY_ || clusterDifferent) && (delta <= 0 || ln.canSubscribe(subject))) {
			ln.updateSmap(sub, delta, isLDS)
		}
		ln.mu.Unlock()
	}
}
2505

2506
// updateLeafNodes will make sure to update the account smap for the subscription.
// Will also forward to all leaf nodes as needed.
func (acc *Account) updateLeafNodes(sub *subscription, delta int32) {
	// Delegate with hubOnly=false so all connected leafnodes are considered.
	acc.updateLeafNodesEx(sub, delta, false)
}
2,491,540✔
2511

2512
// This will make an update to our internal smap and determine if we should send out
// an interest update to the remote side.
// Lock should be held.
func (c *client) updateSmap(sub *subscription, delta int32, isLDS bool) {
	// smap is nil until initLeafNodeSmapAndSendSubs has run (or after teardown).
	if c.leaf.smap == nil {
		return
	}

	// If we are solicited make sure this is a local client or a non-solicited leaf node
	skind := sub.client.kind
	updateClient := skind == CLIENT || skind == SYSTEM || skind == JETSTREAM || skind == ACCOUNT
	if !isLDS && c.isSpokeLeafNode() && !(updateClient || (skind == LEAF && !sub.client.isSpokeLeafNode())) {
		return
	}

	// For additions, check if that sub has just been processed during initLeafNodeSmapAndSendSubs
	// and if so skip it here to avoid double counting; drop the tracking
	// entry (and its expiration timer once empty).
	if delta > 0 && c.leaf.tsub != nil {
		if _, present := c.leaf.tsub[sub]; present {
			delete(c.leaf.tsub, sub)
			if len(c.leaf.tsub) == 0 {
				c.leaf.tsub = nil
				c.leaf.tsubt.Stop()
				c.leaf.tsubt = nil
			}
			return
		}
	}

	key := keyFromSub(sub)
	n, ok := c.leaf.smap[key]
	// A removal for a key we never advertised is a no-op.
	if delta < 0 && !ok {
		return
	}

	// We will update if its a queue, if count is zero (or negative), or we were 0 and are N > 0.
	update := sub.queue != nil || (n <= 0 && n+delta > 0) || (n > 0 && n+delta <= 0)
	n += delta
	if n > 0 {
		c.leaf.smap[key] = n
	} else {
		// No remaining interest, drop the key entirely.
		delete(c.leaf.smap, key)
	}
	if update {
		c.sendLeafNodeSubUpdate(key, n)
	}
}
2558

2559
// Used to force add subjects to the subject map.
2560
func (c *client) forceAddToSmap(subj string) {
13✔
2561
        c.mu.Lock()
13✔
2562
        defer c.mu.Unlock()
13✔
2563

13✔
2564
        if c.leaf.smap == nil {
13✔
2565
                return
×
2566
        }
×
2567
        n := c.leaf.smap[subj]
13✔
2568
        if n != 0 {
14✔
2569
                return
1✔
2570
        }
1✔
2571
        // Place into the map since it was not there.
2572
        c.leaf.smap[subj] = 1
12✔
2573
        c.sendLeafNodeSubUpdate(subj, 1)
12✔
2574
}
2575

2576
// Used to force remove a subject from the subject map.
2577
func (c *client) forceRemoveFromSmap(subj string) {
1✔
2578
        c.mu.Lock()
1✔
2579
        defer c.mu.Unlock()
1✔
2580

1✔
2581
        if c.leaf.smap == nil {
1✔
2582
                return
×
2583
        }
×
2584
        n := c.leaf.smap[subj]
1✔
2585
        if n == 0 {
1✔
2586
                return
×
2587
        }
×
2588
        n--
1✔
2589
        if n == 0 {
2✔
2590
                // Remove is now zero
1✔
2591
                delete(c.leaf.smap, subj)
1✔
2592
                c.sendLeafNodeSubUpdate(subj, 0)
1✔
2593
        } else {
1✔
2594
                c.leaf.smap[subj] = n
×
2595
        }
×
2596
}
2597

2598
// Send the subscription interest change to the other side.
2599
// Lock should be held.
2600
func (c *client) sendLeafNodeSubUpdate(key string, n int32) {
9,996✔
2601
        // If we are a spoke, we need to check if we are allowed to send this subscription over to the hub.
9,996✔
2602
        if c.isSpokeLeafNode() {
12,442✔
2603
                checkPerms := true
2,446✔
2604
                if len(key) > 0 && (key[0] == '$' || key[0] == '_') {
3,902✔
2605
                        if strings.HasPrefix(key, leafNodeLoopDetectionSubjectPrefix) ||
1,456✔
2606
                                strings.HasPrefix(key, oldGWReplyPrefix) ||
1,456✔
2607
                                strings.HasPrefix(key, gwReplyPrefix) {
1,549✔
2608
                                checkPerms = false
93✔
2609
                        }
93✔
2610
                }
2611
                if checkPerms {
4,799✔
2612
                        var subject string
2,353✔
2613
                        if sep := strings.IndexByte(key, ' '); sep != -1 {
2,846✔
2614
                                subject = key[:sep]
493✔
2615
                        } else {
2,353✔
2616
                                subject = key
1,860✔
2617
                        }
1,860✔
2618
                        if !c.canSubscribe(subject) {
2,362✔
2619
                                return
9✔
2620
                        }
9✔
2621
                }
2622
        }
2623
        // If we are here we can send over to the other side.
2624
        _b := [64]byte{}
9,987✔
2625
        b := bytes.NewBuffer(_b[:0])
9,987✔
2626
        c.writeLeafSub(b, key, n)
9,987✔
2627
        c.enqueueProto(b.Bytes())
9,987✔
2628
}
2629

2630
// Helper function to build the key.
2631
func keyFromSub(sub *subscription) string {
46,871✔
2632
        var sb strings.Builder
46,871✔
2633
        sb.Grow(len(sub.subject) + len(sub.queue) + 1)
46,871✔
2634
        sb.Write(sub.subject)
46,871✔
2635
        if sub.queue != nil {
50,673✔
2636
                // Just make the key subject spc group, e.g. 'foo bar'
3,802✔
2637
                sb.WriteByte(' ')
3,802✔
2638
                sb.Write(sub.queue)
3,802✔
2639
        }
3,802✔
2640
        return sb.String()
46,871✔
2641
}
2642

2643
// Prefixes used by keyFromSubWithOrigin to distinguish plain routed subs ("R")
// from routed subs made on behalf of a leafnode ("L").
const (
	keyRoutedSub         = "R"
	keyRoutedSubByte     = 'R'
	keyRoutedLeafSub     = "L"
	keyRoutedLeafSubByte = 'L'
)
2649

2650
// Helper function to build the key that prevents collisions between normal
2651
// routed subscriptions and routed subscriptions on behalf of a leafnode.
2652
// Keys will look like this:
2653
// "R foo"          -> plain routed sub on "foo"
2654
// "R foo bar"      -> queue routed sub on "foo", queue "bar"
2655
// "L foo bar"      -> plain routed leaf sub on "foo", leaf "bar"
2656
// "L foo bar baz"  -> queue routed sub on "foo", queue "bar", leaf "baz"
2657
func keyFromSubWithOrigin(sub *subscription) string {
706,839✔
2658
        var sb strings.Builder
706,839✔
2659
        sb.Grow(2 + len(sub.origin) + 1 + len(sub.subject) + 1 + len(sub.queue))
706,839✔
2660
        leaf := len(sub.origin) > 0
706,839✔
2661
        if leaf {
723,613✔
2662
                sb.WriteByte(keyRoutedLeafSubByte)
16,774✔
2663
        } else {
706,839✔
2664
                sb.WriteByte(keyRoutedSubByte)
690,065✔
2665
        }
690,065✔
2666
        sb.WriteByte(' ')
706,839✔
2667
        sb.Write(sub.subject)
706,839✔
2668
        if sub.queue != nil {
735,179✔
2669
                sb.WriteByte(' ')
28,340✔
2670
                sb.Write(sub.queue)
28,340✔
2671
        }
28,340✔
2672
        if leaf {
723,613✔
2673
                sb.WriteByte(' ')
16,774✔
2674
                sb.Write(sub.origin)
16,774✔
2675
        }
16,774✔
2676
        return sb.String()
706,839✔
2677
}
2678

2679
// Lock should be held.
2680
func (c *client) writeLeafSub(w *bytes.Buffer, key string, n int32) {
36,945✔
2681
        if key == _EMPTY_ {
36,945✔
2682
                return
×
2683
        }
×
2684
        if n > 0 {
70,154✔
2685
                w.WriteString("LS+ " + key)
33,209✔
2686
                // Check for queue semantics, if found write n.
33,209✔
2687
                if strings.Contains(key, " ") {
35,508✔
2688
                        w.WriteString(" ")
2,299✔
2689
                        var b [12]byte
2,299✔
2690
                        var i = len(b)
2,299✔
2691
                        for l := n; l > 0; l /= 10 {
5,491✔
2692
                                i--
3,192✔
2693
                                b[i] = digits[l%10]
3,192✔
2694
                        }
3,192✔
2695
                        w.Write(b[i:])
2,299✔
2696
                        if c.trace {
2,299✔
2697
                                arg := fmt.Sprintf("%s %d", key, n)
×
2698
                                c.traceOutOp("LS+", []byte(arg))
×
2699
                        }
×
2700
                } else if c.trace {
31,101✔
2701
                        c.traceOutOp("LS+", []byte(key))
191✔
2702
                }
191✔
2703
        } else {
3,736✔
2704
                w.WriteString("LS- " + key)
3,736✔
2705
                if c.trace {
3,751✔
2706
                        c.traceOutOp("LS-", []byte(key))
15✔
2707
                }
15✔
2708
        }
2709
        w.WriteString(CR_LF)
36,945✔
2710
}
2711

2712
// processLeafSub will process an inbound sub request for the remote leaf node.
//
// The protocol arg is either "subject" or "subject queue weight". On success
// the sub is stored in c.subs and the account sublist, and the interest change
// is propagated to routes, gateways and other leafnodes as appropriate.
func (c *client) processLeafSub(argo []byte) (err error) {
	// Indicate activity.
	c.in.subs++

	srv := c.srv
	if srv == nil {
		return nil
	}

	// Copy so we do not reference a potentially large buffer
	arg := make([]byte, len(argo))
	copy(arg, argo)

	args := splitArg(arg)
	sub := &subscription{client: c}

	delta := int32(1)
	switch len(args) {
	case 1:
		// Plain sub: just the subject.
		sub.queue = nil
	case 3:
		// Queue sub: subject, queue group and weight.
		sub.queue = args[1]
		sub.qw = int32(parseSize(args[2]))
		// TODO: (ik) We should have a non empty queue name and a queue
		// weight >= 1. For 2.11, we may want to return an error if that
		// is not the case, but for now just overwrite `delta` if queue
		// weight is greater than 1 (it is possible after a reconnect/
		// server restart to receive a queue weight > 1 for a new sub).
		if sub.qw > 1 {
			delta = sub.qw
		}
	default:
		return fmt.Errorf("processLeafSub Parse Error: '%s'", arg)
	}
	sub.subject = args[0]

	c.mu.Lock()
	if c.isClosed() {
		c.mu.Unlock()
		return nil
	}

	acc := c.acc
	// Guard against LS+ arriving before CONNECT has been processed, which
	// can happen when compression is enabled.
	if acc == nil {
		c.mu.Unlock()
		c.sendErr("Authorization Violation")
		c.closeConnection(ProtocolViolation)
		return nil
	}
	// Check if we have a loop.
	ldsPrefix := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))

	// Seeing our own loop-detection subject come back means a routing loop.
	if ldsPrefix && bytesToString(sub.subject) == acc.getLDSubject() {
		c.mu.Unlock()
		c.handleLeafNodeLoop(true)
		return nil
	}

	// Check permissions if applicable. (but exclude the $LDS, $GR and _GR_)
	checkPerms := true
	if sub.subject[0] == '$' || sub.subject[0] == '_' {
		if ldsPrefix ||
			bytes.HasPrefix(sub.subject, []byte(oldGWReplyPrefix)) ||
			bytes.HasPrefix(sub.subject, []byte(gwReplyPrefix)) {
			checkPerms = false
		}
	}

	// If we are a hub check that we can publish to this subject.
	if checkPerms {
		subj := string(sub.subject)
		if subjectIsLiteral(subj) && !c.pubAllowedFullCheck(subj, true, true) {
			c.mu.Unlock()
			c.leafSubPermViolation(sub.subject)
			c.Debugf(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject))
			return nil
		}
	}

	// Check if we have a maximum on the number of subscriptions.
	if c.subsAtLimit() {
		c.mu.Unlock()
		c.maxSubsExceeded()
		return nil
	}

	// If we have an origin cluster associated mark that in the sub.
	if rc := c.remoteCluster(); rc != _EMPTY_ {
		sub.origin = []byte(rc)
	}

	// Like Routes, we store local subs by account and subject and optionally queue name.
	// If we have a queue it will have a trailing weight which we do not want.
	if sub.queue != nil {
		sub.sid = arg[:len(arg)-len(args[2])-1]
	} else {
		sub.sid = arg
	}
	key := bytesToString(sub.sid)
	osub := c.subs[key]
	if osub == nil {
		c.subs[key] = sub
		// Now place into the account sl.
		if err := acc.sl.Insert(sub); err != nil {
			// Roll back the c.subs entry on sublist insert failure.
			delete(c.subs, key)
			c.mu.Unlock()
			c.Errorf("Could not insert subscription: %v", err)
			c.sendErr("Invalid Subscription")
			return nil
		}
	} else if sub.queue != nil {
		// For a queue we need to update the weight.
		// delta becomes the difference against the previous weight so the
		// propagation below adjusts interest correctly.
		delta = sub.qw - atomic.LoadInt32(&osub.qw)
		atomic.StoreInt32(&osub.qw, sub.qw)
		acc.sl.UpdateRemoteQSub(osub)
	}
	spoke := c.isSpokeLeafNode()
	c.mu.Unlock()

	// Only add in shadow subs if a new sub or qsub.
	if osub == nil {
		if err := c.addShadowSubscriptions(acc, sub); err != nil {
			c.Errorf(err.Error())
		}
	}

	// If we are not solicited, treat leaf node subscriptions similar to a
	// client subscription, meaning we forward them to routes, gateways and
	// other leaf nodes as needed.
	if !spoke {
		// If we are routing add to the route map for the associated account.
		srv.updateRouteSubscriptionMap(acc, sub, delta)
		if srv.gateway.enabled {
			srv.gatewayUpdateSubInterest(acc.Name, sub, delta)
		}
	}
	// Now check on leafnode updates for other leaf nodes. We understand solicited
	// and non-solicited state in this call so we will do the right thing.
	acc.updateLeafNodes(sub, delta)

	return nil
}
2857

2858
// If the leafnode is a solicited, set the connect delay based on default
2859
// or private option (for tests). Sends the error to the other side, log and
2860
// close the connection.
2861
func (c *client) handleLeafNodeLoop(sendErr bool) {
16✔
2862
        accName, delay := c.setLeafConnectDelayIfSoliciting(leafNodeReconnectDelayAfterLoopDetected)
16✔
2863
        errTxt := fmt.Sprintf("Loop detected for leafnode account=%q. Delaying attempt to reconnect for %v", accName, delay)
16✔
2864
        if sendErr {
25✔
2865
                c.sendErr(errTxt)
9✔
2866
        }
9✔
2867

2868
        c.Errorf(errTxt)
16✔
2869
        // If we are here with "sendErr" false, it means that this is the server
16✔
2870
        // that received the error. The other side will have closed the connection,
16✔
2871
        // but does not hurt to close here too.
16✔
2872
        c.closeConnection(ProtocolViolation)
16✔
2873
}
2874

2875
// processLeafUnsub will process an inbound unsub request for the remote leaf node.
//
// The LS- arg is exactly the key under which the sub was stored (subject, or
// "subject queue"). Removes the sub locally and withdraws the interest from
// routes, gateways and other leafnodes as needed.
func (c *client) processLeafUnsub(arg []byte) error {
	// Indicate any activity, so pub and sub or unsubs.
	c.in.subs++

	srv := c.srv

	c.mu.Lock()
	if c.isClosed() {
		c.mu.Unlock()
		return nil
	}

	acc := c.acc
	// Guard against LS- arriving before CONNECT has been processed.
	if acc == nil {
		c.mu.Unlock()
		c.sendErr("Authorization Violation")
		c.closeConnection(ProtocolViolation)
		return nil
	}

	spoke := c.isSpokeLeafNode()
	// We store local subs by account and subject and optionally queue name.
	// LS- will have the arg exactly as the key.
	sub, ok := c.subs[string(arg)]
	if !ok {
		// If not found, don't try to update routes/gws/leaf nodes.
		c.mu.Unlock()
		return nil
	}
	// Queue subs withdraw their full weight; plain subs count as 1.
	delta := int32(1)
	if len(sub.queue) > 0 {
		delta = sub.qw
	}
	c.mu.Unlock()

	c.unsubscribe(acc, sub, true, true)
	if !spoke {
		// If we are routing subtract from the route map for the associated account.
		srv.updateRouteSubscriptionMap(acc, sub, -delta)
		// Gateways
		if srv.gateway.enabled {
			srv.gatewayUpdateSubInterest(acc.Name, sub, -delta)
		}
	}
	// Now check on leafnode updates for other leaf nodes.
	acc.updateLeafNodes(sub, -delta)
	return nil
}
2925

2926
func (c *client) processLeafHeaderMsgArgs(arg []byte) error {
498✔
2927
        // Unroll splitArgs to avoid runtime/heap issues
498✔
2928
        args := c.argsa[:0]
498✔
2929
        start := -1
498✔
2930
        for i, b := range arg {
33,119✔
2931
                switch b {
32,621✔
2932
                case ' ', '\t', '\r', '\n':
1,424✔
2933
                        if start >= 0 {
2,848✔
2934
                                args = append(args, arg[start:i])
1,424✔
2935
                                start = -1
1,424✔
2936
                        }
1,424✔
2937
                default:
31,197✔
2938
                        if start < 0 {
33,119✔
2939
                                start = i
1,922✔
2940
                        }
1,922✔
2941
                }
2942
        }
2943
        if start >= 0 {
996✔
2944
                args = append(args, arg[start:])
498✔
2945
        }
498✔
2946

2947
        c.pa.arg = arg
498✔
2948
        switch len(args) {
498✔
2949
        case 0, 1, 2:
×
2950
                return fmt.Errorf("processLeafHeaderMsgArgs Parse Error: '%s'", args)
×
2951
        case 3:
88✔
2952
                c.pa.reply = nil
88✔
2953
                c.pa.queues = nil
88✔
2954
                c.pa.hdb = args[1]
88✔
2955
                c.pa.hdr = parseSize(args[1])
88✔
2956
                c.pa.szb = args[2]
88✔
2957
                c.pa.size = parseSize(args[2])
88✔
2958
        case 4:
396✔
2959
                c.pa.reply = args[1]
396✔
2960
                c.pa.queues = nil
396✔
2961
                c.pa.hdb = args[2]
396✔
2962
                c.pa.hdr = parseSize(args[2])
396✔
2963
                c.pa.szb = args[3]
396✔
2964
                c.pa.size = parseSize(args[3])
396✔
2965
        default:
14✔
2966
                // args[1] is our reply indicator. Should be + or | normally.
14✔
2967
                if len(args[1]) != 1 {
14✔
2968
                        return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
2969
                }
×
2970
                switch args[1][0] {
14✔
2971
                case '+':
4✔
2972
                        c.pa.reply = args[2]
4✔
2973
                case '|':
10✔
2974
                        c.pa.reply = nil
10✔
2975
                default:
×
2976
                        return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
2977
                }
2978
                // Grab header size.
2979
                c.pa.hdb = args[len(args)-2]
14✔
2980
                c.pa.hdr = parseSize(c.pa.hdb)
14✔
2981

14✔
2982
                // Grab size.
14✔
2983
                c.pa.szb = args[len(args)-1]
14✔
2984
                c.pa.size = parseSize(c.pa.szb)
14✔
2985

14✔
2986
                // Grab queue names.
14✔
2987
                if c.pa.reply != nil {
18✔
2988
                        c.pa.queues = args[3 : len(args)-2]
4✔
2989
                } else {
14✔
2990
                        c.pa.queues = args[2 : len(args)-2]
10✔
2991
                }
10✔
2992
        }
2993
        if c.pa.hdr < 0 {
498✔
2994
                return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Header Size: '%s'", arg)
×
2995
        }
×
2996
        if c.pa.size < 0 {
498✔
2997
                return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Size: '%s'", args)
×
2998
        }
×
2999
        if c.pa.hdr > c.pa.size {
498✔
3000
                return fmt.Errorf("processLeafHeaderMsgArgs Header Size larger then TotalSize: '%s'", arg)
×
3001
        }
×
3002

3003
        // Common ones processed after check for arg length
3004
        c.pa.subject = args[0]
498✔
3005

498✔
3006
        return nil
498✔
3007
}
3008

3009
// processLeafMsgArgs parses the argument section of an inbound leafnode
// message protocol line (no header variant), populating c.pa with the
// subject, optional reply, optional queue names and payload size.
// Layouts handled (by arg count):
//   2: subject totalSize
//   3: subject reply totalSize
//  4+: subject +|'|' [reply] queue... totalSize
// Returns an error on any malformed line; the raw arg slice is kept in c.pa.arg.
func (c *client) processLeafMsgArgs(arg []byte) error {
	// Unroll splitArgs to avoid runtime/heap issues
	args := c.argsa[:0]
	start := -1
	for i, b := range arg {
		switch b {
		case ' ', '\t', '\r', '\n':
			if start >= 0 {
				args = append(args, arg[start:i])
				start = -1
			}
		default:
			if start < 0 {
				start = i
			}
		}
	}
	// Flush a trailing token that runs to the end of the buffer.
	if start >= 0 {
		args = append(args, arg[start:])
	}

	c.pa.arg = arg
	switch len(args) {
	case 0, 1:
		return fmt.Errorf("processLeafMsgArgs Parse Error: '%s'", args)
	case 2:
		// subject, total size (no reply, no queues).
		c.pa.reply = nil
		c.pa.queues = nil
		c.pa.szb = args[1]
		c.pa.size = parseSize(args[1])
	case 3:
		// subject, reply, total size.
		c.pa.reply = args[1]
		c.pa.queues = nil
		c.pa.szb = args[2]
		c.pa.size = parseSize(args[2])
	default:
		// args[1] is our reply indicator. Should be + or | normally.
		if len(args[1]) != 1 {
			return fmt.Errorf("processLeafMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
		}
		switch args[1][0] {
		case '+':
			// '+' means a reply subject follows in args[2].
			c.pa.reply = args[2]
		case '|':
			// '|' means no reply, queues start right after the indicator.
			c.pa.reply = nil
		default:
			return fmt.Errorf("processLeafMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
		}
		// Grab size.
		c.pa.szb = args[len(args)-1]
		c.pa.size = parseSize(c.pa.szb)

		// Grab queue names.
		if c.pa.reply != nil {
			c.pa.queues = args[3 : len(args)-1]
		} else {
			c.pa.queues = args[2 : len(args)-1]
		}
	}
	// parseSize returns a negative value on malformed input.
	if c.pa.size < 0 {
		return fmt.Errorf("processLeafMsgArgs Bad or Missing Size: '%s'", args)
	}

	// Common ones processed after check for arg length
	c.pa.subject = args[0]

	return nil
}
3077

3078
// processInboundLeafMsg is called to process an inbound msg from a leaf node.
// It matches the subject against the account sublist (through a per-client L1
// result cache), delivers to local/routed subscribers via processMsgResults,
// and finally forwards to gateways when enabled.
func (c *client) processInboundLeafMsg(msg []byte) {
	// Update statistics
	// The msg includes the CR_LF, so pull back out for accounting.
	c.in.msgs++
	c.in.bytes += int32(len(msg) - LEN_CR_LF)

	srv, acc, subject := c.srv, c.acc, string(c.pa.subject)

	// Mostly under testing scenarios.
	if srv == nil || acc == nil {
		return
	}

	// Match the subscriptions. We will use our own L1 map if
	// it's still valid, avoiding contention on the shared sublist.
	var r *SublistResult
	var ok bool

	// genid changes whenever the account sublist is modified; a mismatch
	// means our cached results may be stale.
	genid := atomic.LoadUint64(&c.acc.sl.genid)
	if genid == c.in.genid && c.in.results != nil {
		r, ok = c.in.results[subject]
	} else {
		// Reset our L1 completely.
		c.in.results = make(map[string]*SublistResult)
		c.in.genid = genid
	}

	// Go back to the sublist data structure.
	if !ok {
		r = c.acc.sl.Match(subject)
		// Prune the results cache. Keeps us from unbounded growth. Random delete.
		if len(c.in.results) >= maxResultCacheSize {
			n := 0
			for subj := range c.in.results {
				delete(c.in.results, subj)
				if n++; n > pruneSize {
					break
				}
			}
		}
		// Then add the new cache entry.
		c.in.results[subject] = r
	}

	// Collect queue names if needed.
	var qnames [][]byte

	// Check for no interest, short circuit if so.
	// This is the fanout scale.
	if len(r.psubs)+len(r.qsubs) > 0 {
		flag := pmrNoFlag
		// If we have queue subs in this cluster, then if we run in gateway
		// mode and the remote gateways have queue subs, then we need to
		// collect the queue groups this message was sent to so that we
		// exclude them when sending to gateways.
		if len(r.qsubs) > 0 && c.srv.gateway.enabled &&
			atomic.LoadInt64(&c.srv.gateway.totalQSubs) > 0 {
			flag |= pmrCollectQueueNames
		}
		// If this is a mapped subject that means the mapped interest
		// is what got us here, but this might not have a queue designation
		// If that is the case, make sure we ignore to process local queue subscribers.
		if len(c.pa.mapped) > 0 && len(c.pa.queues) == 0 {
			flag |= pmrIgnoreEmptyQueueFilter
		}
		_, qnames = c.processMsgResults(acc, r, msg, nil, c.pa.subject, c.pa.reply, flag)
	}

	// Now deal with gateways
	if c.srv.gateway.enabled {
		c.sendMsgToGateways(acc, msg, c.pa.subject, c.pa.reply, qnames, true)
	}
}
3152

3153
// Handles a subscription permission violation.
// Thin wrapper over leafPermViolation with pub=false.
// See leafPermViolation() for details.
func (c *client) leafSubPermViolation(subj []byte) {
	c.leafPermViolation(false, subj)
}
325✔
3158

3159
// Common function to process publish or subscribe leafnode permission violation.
3160
// Sends the permission violation error to the remote, logs it and closes the connection.
3161
// If this is from a server soliciting, the reconnection will be delayed.
3162
func (c *client) leafPermViolation(pub bool, subj []byte) {
325✔
3163
        if c.isSpokeLeafNode() {
650✔
3164
                // For spokes these are no-ops since the hub server told us our permissions.
325✔
3165
                // We just need to not send these over to the other side since we will get cutoff.
325✔
3166
                return
325✔
3167
        }
325✔
3168
        // FIXME(dlc) ?
3169
        c.setLeafConnectDelayIfSoliciting(leafNodeReconnectAfterPermViolation)
×
3170
        var action string
×
3171
        if pub {
×
3172
                c.sendErr(fmt.Sprintf("Permissions Violation for Publish to %q", subj))
×
3173
                action = "Publish"
×
3174
        } else {
×
3175
                c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q", subj))
×
3176
                action = "Subscription"
×
3177
        }
×
3178
        c.Errorf("%s Violation on %q - Check other side configuration", action, subj)
×
3179
        // TODO: add a new close reason that is more appropriate?
×
3180
        c.closeConnection(ProtocolViolation)
×
3181
}
3182

3183
// Invoked from generic processErr() for LEAF connections.
3184
func (c *client) leafProcessErr(errStr string) {
51✔
3185
        // Check if we got a cluster name collision.
51✔
3186
        if strings.Contains(errStr, ErrLeafNodeHasSameClusterName.Error()) {
54✔
3187
                _, delay := c.setLeafConnectDelayIfSoliciting(leafNodeReconnectDelayAfterClusterNameSame)
3✔
3188
                c.Errorf("Leafnode connection dropped with same cluster name error. Delaying attempt to reconnect for %v", delay)
3✔
3189
                return
3✔
3190
        }
3✔
3191
        if strings.Contains(errStr, ErrLeafNodeMinVersionRejected.Error()) {
49✔
3192
                _, delay := c.setLeafConnectDelayIfSoliciting(leafNodeMinVersionReconnectDelay)
1✔
3193
                c.Errorf("Leafnode connection dropped due to minimum version requirement. Delaying attempt to reconnect for %v", delay)
1✔
3194
                return
1✔
3195
        }
1✔
3196

3197
        // We will look for Loop detected error coming from the other side.
3198
        // If we solicit, set the connect delay.
3199
        if !strings.Contains(errStr, "Loop detected") {
87✔
3200
                return
40✔
3201
        }
40✔
3202
        c.handleLeafNodeLoop(false)
7✔
3203
}
3204

3205
// If this leaf connection solicits, sets the connect delay to the given value,
3206
// or the one from the server option's LeafNode.connDelay if one is set (for tests).
3207
// Returns the connection's account name and delay.
3208
func (c *client) setLeafConnectDelayIfSoliciting(delay time.Duration) (string, time.Duration) {
20✔
3209
        c.mu.Lock()
20✔
3210
        if c.isSolicitedLeafNode() {
32✔
3211
                if s := c.srv; s != nil {
24✔
3212
                        if srvdelay := s.getOpts().LeafNode.connDelay; srvdelay != 0 {
17✔
3213
                                delay = srvdelay
5✔
3214
                        }
5✔
3215
                }
3216
                c.leaf.remote.setConnectDelay(delay)
12✔
3217
        }
3218
        accName := c.acc.Name
20✔
3219
        c.mu.Unlock()
20✔
3220
        return accName, delay
20✔
3221
}
3222

3223
// For the given remote Leafnode configuration, this function returns
3224
// if TLS is required, and if so, will return a clone of the TLS Config
3225
// (since some fields will be changed during handshake), the TLS server
3226
// name that is remembered, and the TLS timeout.
3227
func (c *client) leafNodeGetTLSConfigForSolicit(remote *leafNodeCfg) (bool, *tls.Config, string, float64) {
1,968✔
3228
        var (
1,968✔
3229
                tlsConfig  *tls.Config
1,968✔
3230
                tlsName    string
1,968✔
3231
                tlsTimeout float64
1,968✔
3232
        )
1,968✔
3233

1,968✔
3234
        remote.RLock()
1,968✔
3235
        defer remote.RUnlock()
1,968✔
3236

1,968✔
3237
        tlsRequired := remote.TLS || remote.TLSConfig != nil
1,968✔
3238
        if tlsRequired {
2,043✔
3239
                if remote.TLSConfig != nil {
126✔
3240
                        tlsConfig = remote.TLSConfig.Clone()
51✔
3241
                } else {
75✔
3242
                        tlsConfig = &tls.Config{MinVersion: tls.VersionTLS12}
24✔
3243
                }
24✔
3244
                tlsName = remote.tlsName
75✔
3245
                tlsTimeout = remote.TLSTimeout
75✔
3246
                if tlsTimeout == 0 {
116✔
3247
                        tlsTimeout = float64(TLS_TIMEOUT / time.Second)
41✔
3248
                }
41✔
3249
        }
3250

3251
        return tlsRequired, tlsConfig, tlsName, tlsTimeout
1,968✔
3252
}
3253

3254
// Initiates the LeafNode Websocket connection by:
// - doing the TLS handshake if needed
// - sending the HTTP request
// - waiting for the HTTP response
//
// Since some bufio reader is used to consume the HTTP response, this function
// returns the slice of buffered bytes (if any) so that the readLoop that will
// be started after that consume those first before reading from the socket.
// The ClosedState return value indicates the failure category (0 when the
// connection was already closed or on success).
//
// Lock held on entry.
func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remote *leafNodeCfg) ([]byte, ClosedState, error) {
	remote.RLock()
	compress := remote.Websocket.Compression
	// By default the server will mask outbound frames, but it can be disabled with this option.
	noMasking := remote.Websocket.NoMasking
	infoTimeout := remote.FirstInfoTimeout
	remote.RUnlock()
	// Will do the client-side TLS handshake if needed.
	tlsRequired, err := c.leafClientHandshakeIfNeeded(remote, opts)
	if err != nil {
		// 0 will indicate that the connection was already closed
		return nil, 0, err
	}

	// For http request, we need the passed URL to contain either http or https scheme.
	scheme := "http"
	if tlsRequired {
		scheme = "https"
	}
	// We will use the `/leafnode` path to tell the accepting WS server that it should
	// create a LEAF connection, not a CLIENT.
	// In case we use the user's URL path in the future, make sure we append the user's
	// path to our `/leafnode` path.
	lpath := leafNodeWSPath
	if curPath := rURL.EscapedPath(); curPath != _EMPTY_ {
		if curPath[0] == '/' {
			curPath = curPath[1:]
		}
		lpath = path.Join(curPath, lpath)
	} else {
		// No user path: drop the leading '/' since ustr below adds one.
		lpath = lpath[1:]
	}
	ustr := fmt.Sprintf("%s://%s/%s", scheme, rURL.Host, lpath)
	u, _ := url.Parse(ustr)
	req := &http.Request{
		Method:     "GET",
		URL:        u,
		Proto:      "HTTP/1.1",
		ProtoMajor: 1,
		ProtoMinor: 1,
		Header:     make(http.Header),
		Host:       u.Host,
	}
	wsKey, err := wsMakeChallengeKey()
	if err != nil {
		return nil, WriteError, err
	}

	// Standard websocket upgrade headers plus our optional extensions.
	req.Header["Upgrade"] = []string{"websocket"}
	req.Header["Connection"] = []string{"Upgrade"}
	req.Header["Sec-WebSocket-Key"] = []string{wsKey}
	req.Header["Sec-WebSocket-Version"] = []string{"13"}
	if compress {
		req.Header.Add("Sec-WebSocket-Extensions", wsPMCReqHeaderValue)
	}
	if noMasking {
		req.Header.Add(wsNoMaskingHeader, wsNoMaskingValue)
	}
	c.nc.SetDeadline(time.Now().Add(infoTimeout))
	if err := req.Write(c.nc); err != nil {
		return nil, WriteError, err
	}

	var resp *http.Response

	br := bufio.NewReaderSize(c.nc, MAX_CONTROL_LINE_SIZE)
	resp, err = http.ReadResponse(br, req)
	// Validate the upgrade response: 101 status, matching headers and
	// the expected Sec-Websocket-Accept derived from our challenge key.
	if err == nil &&
		(resp.StatusCode != 101 ||
			!strings.EqualFold(resp.Header.Get("Upgrade"), "websocket") ||
			!strings.EqualFold(resp.Header.Get("Connection"), "upgrade") ||
			resp.Header.Get("Sec-Websocket-Accept") != wsAcceptKey(wsKey)) {

		err = fmt.Errorf("invalid websocket connection")
	}
	// Check compression extension...
	if err == nil && c.ws.compress {
		// Check that not only permessage-deflate extension is present, but that
		// we also have server and client no context take over.
		srvCompress, noCtxTakeover := wsPMCExtensionSupport(resp.Header, false)

		// If server does not support compression, then simply disable it in our side.
		if !srvCompress {
			c.ws.compress = false
		} else if !noCtxTakeover {
			err = fmt.Errorf("compression negotiation error")
		}
	}
	// Same for no masking...
	if err == nil && noMasking {
		// Check if server accepts no masking
		if resp.Header.Get(wsNoMaskingHeader) != wsNoMaskingValue {
			// Nope, need to mask our writes as any client would do.
			c.ws.maskwrite = true
		}
	}
	if resp != nil {
		resp.Body.Close()
	}
	if err != nil {
		return nil, ReadError, err
	}
	c.Debugf("Leafnode compression=%v masking=%v", c.ws.compress, c.ws.maskwrite)

	var preBuf []byte
	// We have to slurp whatever is in the bufio reader and pass that to the readloop.
	if n := br.Buffered(); n != 0 {
		preBuf, _ = br.Peek(n)
	}
	return preBuf, 0, nil
}
3376

3377
// connectProcessTimeout bounds the time between sending the leafnode CONNECT
// (leafNodeResumeConnectProcess) and completing the connect process
// (leafNodeFinishConnectProcess) before the connection is deemed stale.
const connectProcessTimeout = 2 * time.Second
3378

3379
// This is invoked for remote LEAF connections after processing the INFO
// protocol. It sends the CONNECT, starts the write loop and arms a timer
// that closes the connection if leafNodeFinishConnectProcess does not run
// within connectProcessTimeout.
func (s *Server) leafNodeResumeConnectProcess(c *client) {
	clusterName := s.ClusterName()

	c.mu.Lock()
	if c.isClosed() {
		c.mu.Unlock()
		return
	}
	if err := c.sendLeafConnect(clusterName, c.headers); err != nil {
		c.mu.Unlock()
		c.closeConnection(WriteError)
		return
	}

	// Spin up the write loop.
	s.startGoRoutine(func() { c.writeLoop() })

	// timeout leafNodeFinishConnectProcess
	c.ping.tmr = time.AfterFunc(connectProcessTimeout, func() {
		c.mu.Lock()
		// check if leafNodeFinishConnectProcess was called and prevent later leafNodeFinishConnectProcess
		if !c.flags.setIfNotSet(connectProcessFinished) {
			c.mu.Unlock()
			return
		}
		clearTimer(&c.ping.tmr)
		closed := c.isClosed()
		c.mu.Unlock()
		// Close outside the lock; closeConnection takes it itself elsewhere
		// in this file, so avoid holding c.mu here.
		if !closed {
			c.sendErrAndDebug("Stale Leaf Node Connection - Closing")
			c.closeConnection(StaleConnection)
		}
	})
	c.mu.Unlock()
	c.Debugf("Remote leafnode connect msg sent")
}
3417

3418
// This is invoked for remote LEAF connections after processing the INFO
// protocol and leafNodeResumeConnectProcess.
// This will send LS+ the CONNECT protocol and register the leaf node.
func (s *Server) leafNodeFinishConnectProcess(c *client) {
	c.mu.Lock()
	// The connectProcessFinished flag races with the stale-connection timer
	// set in leafNodeResumeConnectProcess; whoever sets it first proceeds.
	if !c.flags.setIfNotSet(connectProcessFinished) {
		c.mu.Unlock()
		return
	}
	if c.isClosed() {
		c.mu.Unlock()
		s.removeLeafNodeConnection(c)
		return
	}
	remote := c.leaf.remote
	// Check if we will need to send the system connect event.
	remote.RLock()
	sendSysConnectEvent := remote.Hub
	remote.RUnlock()

	// Capture account before releasing lock
	acc := c.acc
	// cancel connectProcessTimeout
	clearTimer(&c.ping.tmr)
	c.mu.Unlock()

	// Make sure we register with the account here.
	if err := c.registerWithAccount(acc); err != nil {
		if err == ErrTooManyAccountConnections {
			c.maxAccountConnExceeded()
			return
		} else if err == ErrLeafNodeLoop {
			c.handleLeafNodeLoop(true)
			return
		}
		c.Errorf("Registering leaf with account %s resulted in error: %v", acc.Name, err)
		c.closeConnection(ProtocolViolation)
		return
	}
	s.addLeafNodeConnection(c, _EMPTY_, _EMPTY_, false)
	s.initLeafNodeSmapAndSendSubs(c)
	if sendSysConnectEvent {
		s.sendLeafNodeConnect(acc)
	}

	// The above functions are not atomically under the client
	// lock doing those operations. It is possible - since we
	// have started the read/write loops - that the connection
	// is closed before or in between. This would leave the
	// closed LN connection possible registered with the account
	// and/or the server's leafs map. So check if connection
	// is closed, and if so, manually cleanup.
	c.mu.Lock()
	closed := c.isClosed()
	if !closed {
		c.setFirstPingTimer()
	}
	c.mu.Unlock()
	if closed {
		s.removeLeafNodeConnection(c)
		if prev := acc.removeClient(c); prev == 1 {
			s.decActiveAccounts()
		}
	}
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc