• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 22212128311

19 Feb 2026 03:38PM UTC coverage: 83.794% (-0.2%) from 84.003%
22212128311

push

github

web-flow
NRG: Fix cluster size drop to 1 on replaying EntryAddPeer after restart (#7850)

On restart, replaying EntryAddPeer could incorrectly leave a raft node
at cluster size 1 instead of restoring the expected size and quorum from
persisted state.
This bug could lead to the following scenario: a node in a 3-node
cluster could restart and reset its cluster size to 1. If the node did not
receive any message from the other nodes, it could campaign to become
leader. Since it considered itself a single-node cluster, it would win the
election, resulting in the original cluster splitting into two clusters (or
two leaders at the same time).
Specifically, if an EntryAddPeer was replayed from the log, it would
overwrite the cluster size and quorum with 1. The peer set is now restored
before the log is replayed, and it is taken from the snapshot (if no
snapshot is present, we fall back to peer.idx).
If a log entry that changes membership is replayed, it will now update
the cluster and quorum size correctly.

Signed-off-by: Daniele Sciascia <daniele@nats.io>

74376 of 88761 relevant lines covered (83.79%)

353483.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.2
/server/leafnode.go
1
// Copyright 2019-2025 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bufio"
18
        "bytes"
19
        "crypto/tls"
20
        "encoding/base64"
21
        "encoding/json"
22
        "fmt"
23
        "io"
24
        "math/rand"
25
        "net"
26
        "net/http"
27
        "net/url"
28
        "os"
29
        "path"
30
        "reflect"
31
        "regexp"
32
        "runtime"
33
        "strconv"
34
        "strings"
35
        "sync"
36
        "sync/atomic"
37
        "time"
38

39
        "github.com/klauspost/compress/s2"
40
        "github.com/nats-io/jwt/v2"
41
        "github.com/nats-io/nkeys"
42
        "github.com/nats-io/nuid"
43
)
44

45
const (
	// Logged when the user configures solicited leafnode TLS as insecure.
	leafnodeTLSInsecureWarning = "TLS certificate chain and hostname of solicited leafnodes will not be verified. DO NOT USE IN PRODUCTION!"

	// Delay applied before reconnecting a solicited connection after a loop was detected.
	leafNodeReconnectDelayAfterLoopDetected = 30 * time.Second
	// After a permission violation closes the connection, no reconnect is attempted for this long.
	leafNodeReconnectAfterPermViolation = 30 * time.Second
	// Delay applied when we have the same cluster name as the hub.
	leafNodeReconnectDelayAfterClusterNameSame = 30 * time.Second
	// How long to wait, after receiving a CONNECT that does not meet the
	// required minimum version, before closing the connection.
	leafNodeWaitBeforeClose = 5 * time.Second

	// Subject prefix used for loop detection.
	leafNodeLoopDetectionSubjectPrefix = "$LDS."

	// Path added to the URL so that a websocket server knows the connection
	// is a LEAF connection rather than a CLIENT one.
	leafNodeWSPath = "/leafnode"
)
70

71
// leaf holds the leafnode-specific state of a LEAF client connection
// (accessed as c.leaf).
type leaf struct {
	// remote is the configuration, including any auth settings, used for a
	// solicited connection. It is nil for accepted connections, which is what
	// isSolicitedLeafNode checks.
	remote *leafNodeCfg
	// isSpoke tells us what role we are playing. It is set for solicited
	// connections not treated as a hub, and is also used when we receive a
	// connection but the other side tells us they are a hub.
	isSpoke bool
	// remoteCluster is set when we are a hub but the spoke leafnode is part of a cluster.
	remoteCluster string
	// remoteServer holds onto the remote server's name or ID.
	remoteServer string
	// remoteDomain is the domain name of the remote server.
	remoteDomain string
	// remoteAccName is the account name of the remote server.
	remoteAccName string
	// isolated is set when we do not want to propagate east-west interest
	// from other leafnodes (see isIsolatedLeafNode).
	isolated bool
	// smap is used to suppress sub and unsub interest. Same as routes, but our
	// audience here is tied to this leaf node. It holds all subscriptions
	// except this leaf node's own, i.e. all the interest we want to send to
	// the other side.
	// NOTE(review): the int32 values look like per-subject reference counts — confirm.
	smap map[string]int32
	// tsub contains all the subscriptions that have been added to smap during
	// initLeafNodeSmapAndSendSubs. It is short lived and avoids a race:
	//   - a sub may be added to the account sublist before updateSmap has been
	//     called for it on that "thread";
	//   - meanwhile the leafnode readloop, processing CONNECT, invokes
	//     initLeafNodeSmapAndSendSubs, which adds this subscription to smap;
	//   - when processing of the sub then calls updateSmap, it would be added a
	//     second time to smap, causing a later unsub to suppress the LS-.
	// NOTE(review): tsubt is presumably the timer that discards tsub once it
	// is no longer needed — confirm.
	tsub  map[*subscription]struct{}
	tsubt *time.Timer
	// Selected compression mode, which may be different from the server configured mode.
	compression string
	// This is for GW map replies.
	gwSub *subscription
}
105

106
// leafNodeCfg is the runtime configuration of a remote (solicited) leafnode:
// the user-supplied RemoteLeafOpts plus state derived from it or learned while
// connecting. In this file the embedded RWMutex is taken around accesses to
// urls, curURL, connDelay and the TLS settings updated on config reload.
//
// Fields:
//   - urls: candidate URLs. They start as a copy of the configured ones
//     (shuffled unless NoRandomize is set), and more are added when async
//     leafnode INFOs are received.
//   - curURL: the URL last returned by pickNextURL.
//   - tlsName, username, password: saved from the configured URLs so they can
//     be reused to connect to bare URLs received via the INFO protocol.
//   - perms: deny permissions built from DenyExports/DenyImports, if any.
//   - connDelay: delay applied before the next connect attempt.
//   - jsMigrateTimer: NOTE(review) presumably related to
//     JetStreamClusterMigrateDelay — confirm.
type leafNodeCfg struct {
	sync.RWMutex
	*RemoteLeafOpts
	urls           []*url.URL
	curURL         *url.URL
	tlsName        string
	username       string
	password       string
	perms          *Permissions
	connDelay      time.Duration // Delay before a connect, could be used while detecting loop condition, etc..
	jsMigrateTimer *time.Timer
}
119

120
// Check to see if this is a solicited leafnode. We do special processing for solicited.
121
func (c *client) isSolicitedLeafNode() bool {
2,103✔
122
        return c.kind == LEAF && c.leaf.remote != nil
2,103✔
123
}
2,103✔
124

125
// Returns true if this is a solicited leafnode and is not configured to be treated as a hub or a receiving
126
// connection leafnode where the otherside has declared itself to be the hub.
127
func (c *client) isSpokeLeafNode() bool {
6,661,133✔
128
        return c.kind == LEAF && c.leaf.isSpoke
6,661,133✔
129
}
6,661,133✔
130

131
func (c *client) isHubLeafNode() bool {
17,886✔
132
        return c.kind == LEAF && !c.leaf.isSpoke
17,886✔
133
}
17,886✔
134

135
func (c *client) isIsolatedLeafNode() bool {
10,652✔
136
        // TODO(nat): In future we may want to pass in and consider an isolation
10,652✔
137
        // group name here, which the hub and/or leaf could provide, so that we
10,652✔
138
        // can isolate away certain LNs but not others on an opt-in basis. For
10,652✔
139
        // now we will just isolate all LN interest until then.
10,652✔
140
        return c.kind == LEAF && c.leaf.isolated
10,652✔
141
}
10,652✔
142

143
// This will spin up go routines to solicit the remote leaf node connections.
144
func (s *Server) solicitLeafNodeRemotes(remotes []*RemoteLeafOpts) {
1,208✔
145
        sysAccName := _EMPTY_
1,208✔
146
        sAcc := s.SystemAccount()
1,208✔
147
        if sAcc != nil {
2,393✔
148
                sysAccName = sAcc.Name
1,185✔
149
        }
1,185✔
150
        addRemote := func(r *RemoteLeafOpts, isSysAccRemote bool) *leafNodeCfg {
2,562✔
151
                s.mu.Lock()
1,354✔
152
                remote := newLeafNodeCfg(r)
1,354✔
153
                creds := remote.Credentials
1,354✔
154
                accName := remote.LocalAccount
1,354✔
155
                s.leafRemoteCfgs = append(s.leafRemoteCfgs, remote)
1,354✔
156
                // Print notice if
1,354✔
157
                if isSysAccRemote {
1,447✔
158
                        if len(remote.DenyExports) > 0 {
94✔
159
                                s.Noticef("Remote for System Account uses restricted export permissions")
1✔
160
                        }
1✔
161
                        if len(remote.DenyImports) > 0 {
94✔
162
                                s.Noticef("Remote for System Account uses restricted import permissions")
1✔
163
                        }
1✔
164
                }
165
                s.mu.Unlock()
1,354✔
166
                if creds != _EMPTY_ {
1,406✔
167
                        contents, err := os.ReadFile(creds)
52✔
168
                        defer wipeSlice(contents)
52✔
169
                        if err != nil {
52✔
170
                                s.Errorf("Error reading LeafNode Remote Credentials file %q: %v", creds, err)
×
171
                        } else if items := credsRe.FindAllSubmatch(contents, -1); len(items) < 2 {
52✔
172
                                s.Errorf("LeafNode Remote Credentials file %q malformed", creds)
×
173
                        } else if _, err := nkeys.FromSeed(items[1][1]); err != nil {
52✔
174
                                s.Errorf("LeafNode Remote Credentials file %q has malformed seed", creds)
×
175
                        } else if uc, err := jwt.DecodeUserClaims(string(items[0][1])); err != nil {
52✔
176
                                s.Errorf("LeafNode Remote Credentials file %q has malformed user jwt", creds)
×
177
                        } else if isSysAccRemote {
56✔
178
                                if !uc.Permissions.Pub.Empty() || !uc.Permissions.Sub.Empty() || uc.Permissions.Resp != nil {
5✔
179
                                        s.Noticef("LeafNode Remote for System Account uses credentials file %q with restricted permissions", creds)
1✔
180
                                }
1✔
181
                        } else {
48✔
182
                                if !uc.Permissions.Pub.Empty() || !uc.Permissions.Sub.Empty() || uc.Permissions.Resp != nil {
54✔
183
                                        s.Noticef("LeafNode Remote for Account %s uses credentials file %q with restricted permissions", accName, creds)
6✔
184
                                }
6✔
185
                        }
186
                }
187
                return remote
1,354✔
188
        }
189
        for _, r := range remotes {
2,562✔
190
                // We need to call this, even if the leaf is disabled. This is so that
1,354✔
191
                // the number of internal configuration matches the options' remote leaf
1,354✔
192
                // configuration required for configuration reload.
1,354✔
193
                remote := addRemote(r, r.LocalAccount == sysAccName)
1,354✔
194
                if !r.Disabled {
2,707✔
195
                        s.startGoRoutine(func() { s.connectToRemoteLeafNode(remote, true) })
2,706✔
196
                }
197
        }
198
}
199

200
func (s *Server) remoteLeafNodeStillValid(remote *leafNodeCfg) bool {
7,516✔
201
        if remote.Disabled {
7,517✔
202
                return false
1✔
203
        }
1✔
204
        for _, ri := range s.getOpts().LeafNode.Remotes {
15,405✔
205
                // FIXME(dlc) - What about auth changes?
7,890✔
206
                if reflect.DeepEqual(ri.URLs, remote.URLs) {
15,405✔
207
                        return true
7,515✔
208
                }
7,515✔
209
        }
210
        return false
×
211
}
212

213
// Ensure that leafnode is properly configured.
214
func validateLeafNode(o *Options) error {
8,060✔
215
        if err := validateLeafNodeAuthOptions(o); err != nil {
8,062✔
216
                return err
2✔
217
        }
2✔
218

219
        // Users can bind to any local account, if its empty we will assume the $G account.
220
        for _, r := range o.LeafNode.Remotes {
9,457✔
221
                if r.LocalAccount == _EMPTY_ {
1,838✔
222
                        r.LocalAccount = globalAccountName
439✔
223
                }
439✔
224
        }
225

226
        // In local config mode, check that leafnode configuration refers to accounts that exist.
227
        if len(o.TrustedOperators) == 0 {
15,797✔
228
                accNames := map[string]struct{}{}
7,739✔
229
                for _, a := range o.Accounts {
16,453✔
230
                        accNames[a.Name] = struct{}{}
8,714✔
231
                }
8,714✔
232
                // global account is always created
233
                accNames[DEFAULT_GLOBAL_ACCOUNT] = struct{}{}
7,739✔
234
                // in the context of leaf nodes, empty account means global account
7,739✔
235
                accNames[_EMPTY_] = struct{}{}
7,739✔
236
                // system account either exists or, if not disabled, will be created
7,739✔
237
                if o.SystemAccount == _EMPTY_ && !o.NoSystemAccount {
13,919✔
238
                        accNames[DEFAULT_SYSTEM_ACCOUNT] = struct{}{}
6,180✔
239
                }
6,180✔
240
                checkAccountExists := func(accName string, cfgType string) error {
16,883✔
241
                        if _, ok := accNames[accName]; !ok {
9,146✔
242
                                return fmt.Errorf("cannot find local account %q specified in leafnode %s", accName, cfgType)
2✔
243
                        }
2✔
244
                        return nil
9,142✔
245
                }
246
                if err := checkAccountExists(o.LeafNode.Account, "authorization"); err != nil {
7,740✔
247
                        return err
1✔
248
                }
1✔
249
                for _, lu := range o.LeafNode.Users {
7,755✔
250
                        if lu.Account == nil { // means global account
27✔
251
                                continue
10✔
252
                        }
253
                        if err := checkAccountExists(lu.Account.Name, "authorization"); err != nil {
7✔
254
                                return err
×
255
                        }
×
256
                }
257
                for _, r := range o.LeafNode.Remotes {
9,136✔
258
                        if err := checkAccountExists(r.LocalAccount, "remote"); err != nil {
1,399✔
259
                                return err
1✔
260
                        }
1✔
261
                }
262
        } else {
319✔
263
                if len(o.LeafNode.Users) != 0 {
320✔
264
                        return fmt.Errorf("operator mode does not allow specifying users in leafnode config")
1✔
265
                }
1✔
266
                for _, r := range o.LeafNode.Remotes {
319✔
267
                        if !nkeys.IsValidPublicAccountKey(r.LocalAccount) {
2✔
268
                                return fmt.Errorf(
1✔
269
                                        "operator mode requires account nkeys in remotes. " +
1✔
270
                                                "Please add an `account` key to each remote in your `leafnodes` section, to assign it to an account. " +
1✔
271
                                                "Each account value should be a 56 character public key, starting with the letter 'A'")
1✔
272
                        }
1✔
273
                }
274
                if o.LeafNode.Port != 0 && o.LeafNode.Account != "" && !nkeys.IsValidPublicAccountKey(o.LeafNode.Account) {
318✔
275
                        return fmt.Errorf("operator mode and non account nkeys are incompatible")
1✔
276
                }
1✔
277
        }
278

279
        // Validate compression settings
280
        if o.LeafNode.Compression.Mode != _EMPTY_ {
12,483✔
281
                if err := validateAndNormalizeCompressionOption(&o.LeafNode.Compression, CompressionS2Auto); err != nil {
4,435✔
282
                        return err
5✔
283
                }
5✔
284
        }
285

286
        // If a remote has a websocket scheme, all need to have it.
287
        for _, rcfg := range o.LeafNode.Remotes {
9,445✔
288
                // Validate proxy configuration
1,397✔
289
                if _, err := validateLeafNodeProxyOptions(rcfg); err != nil {
1,403✔
290
                        return err
6✔
291
                }
6✔
292

293
                if len(rcfg.URLs) >= 2 {
1,593✔
294
                        firstIsWS, ok := isWSURL(rcfg.URLs[0]), true
202✔
295
                        for i := 1; i < len(rcfg.URLs); i++ {
641✔
296
                                u := rcfg.URLs[i]
439✔
297
                                if isWS := isWSURL(u); isWS && !firstIsWS || !isWS && firstIsWS {
446✔
298
                                        ok = false
7✔
299
                                        break
7✔
300
                                }
301
                        }
302
                        if !ok {
209✔
303
                                return fmt.Errorf("remote leaf node configuration cannot have a mix of websocket and non-websocket urls: %q", redactURLList(rcfg.URLs))
7✔
304
                        }
7✔
305
                }
306
                // Validate compression settings
307
                if rcfg.Compression.Mode != _EMPTY_ {
2,764✔
308
                        if err := validateAndNormalizeCompressionOption(&rcfg.Compression, CompressionS2Auto); err != nil {
1,385✔
309
                                return err
5✔
310
                        }
5✔
311
                }
312
        }
313

314
        if o.LeafNode.Port == 0 {
12,216✔
315
                return nil
4,186✔
316
        }
4,186✔
317

318
        // If MinVersion is defined, check that it is valid.
319
        if mv := o.LeafNode.MinVersion; mv != _EMPTY_ {
3,848✔
320
                if err := checkLeafMinVersionConfig(mv); err != nil {
6✔
321
                        return err
2✔
322
                }
2✔
323
        }
324

325
        // The checks below will be done only when detecting that we are configured
326
        // with gateways. So if an option validation needs to be done regardless,
327
        // it MUST be done before this point!
328

329
        if o.Gateway.Name == _EMPTY_ && o.Gateway.Port == 0 {
6,999✔
330
                return nil
3,157✔
331
        }
3,157✔
332
        // If we are here we have both leaf nodes and gateways defined, make sure there
333
        // is a system account defined.
334
        if o.SystemAccount == _EMPTY_ {
686✔
335
                return fmt.Errorf("leaf nodes and gateways (both being defined) require a system account to also be configured")
1✔
336
        }
1✔
337
        if err := validatePinnedCerts(o.LeafNode.TLSPinnedCerts); err != nil {
684✔
338
                return fmt.Errorf("leafnode: %v", err)
×
339
        }
×
340
        return nil
684✔
341
}
342

343
func checkLeafMinVersionConfig(mv string) error {
8✔
344
        if ok, err := versionAtLeastCheckError(mv, 2, 8, 0); !ok || err != nil {
12✔
345
                if err != nil {
6✔
346
                        return fmt.Errorf("invalid leafnode's minimum version: %v", err)
2✔
347
                } else {
4✔
348
                        return fmt.Errorf("the minimum version should be at least 2.8.0")
2✔
349
                }
2✔
350
        }
351
        return nil
4✔
352
}
353

354
// Used to validate user names in LeafNode configuration.
355
// - rejects mix of single and multiple users.
356
// - rejects duplicate user names.
357
func validateLeafNodeAuthOptions(o *Options) error {
8,119✔
358
        if len(o.LeafNode.Users) == 0 {
16,211✔
359
                return nil
8,092✔
360
        }
8,092✔
361
        if o.LeafNode.Username != _EMPTY_ {
29✔
362
                return fmt.Errorf("can not have a single user/pass and a users array")
2✔
363
        }
2✔
364
        if o.LeafNode.Nkey != _EMPTY_ {
25✔
365
                return fmt.Errorf("can not have a single nkey and a users array")
×
366
        }
×
367
        users := map[string]struct{}{}
25✔
368
        for _, u := range o.LeafNode.Users {
66✔
369
                if _, exists := users[u.Username]; exists {
43✔
370
                        return fmt.Errorf("duplicate user %q detected in leafnode authorization", u.Username)
2✔
371
                }
2✔
372
                users[u.Username] = struct{}{}
39✔
373
        }
374
        return nil
23✔
375
}
376

377
func validateLeafNodeProxyOptions(remote *RemoteLeafOpts) ([]string, error) {
1,988✔
378
        var warnings []string
1,988✔
379

1,988✔
380
        if remote.Proxy.URL == _EMPTY_ {
3,952✔
381
                return warnings, nil
1,964✔
382
        }
1,964✔
383

384
        proxyURL, err := url.Parse(remote.Proxy.URL)
24✔
385
        if err != nil {
25✔
386
                return warnings, fmt.Errorf("invalid proxy URL: %v", err)
1✔
387
        }
1✔
388

389
        if proxyURL.Scheme != "http" && proxyURL.Scheme != "https" {
25✔
390
                return warnings, fmt.Errorf("proxy URL scheme must be http or https, got: %s", proxyURL.Scheme)
2✔
391
        }
2✔
392

393
        if proxyURL.Host == _EMPTY_ {
23✔
394
                return warnings, fmt.Errorf("proxy URL must specify a host")
2✔
395
        }
2✔
396

397
        if remote.Proxy.Timeout < 0 {
20✔
398
                return warnings, fmt.Errorf("proxy timeout must be >= 0")
1✔
399
        }
1✔
400

401
        if (remote.Proxy.Username == _EMPTY_) != (remote.Proxy.Password == _EMPTY_) {
22✔
402
                return warnings, fmt.Errorf("proxy username and password must both be specified or both be empty")
4✔
403
        }
4✔
404

405
        if len(remote.URLs) > 0 {
28✔
406
                hasWebSocketURL := false
14✔
407
                hasNonWebSocketURL := false
14✔
408

14✔
409
                for _, remoteURL := range remote.URLs {
29✔
410
                        if remoteURL.Scheme == wsSchemePrefix || remoteURL.Scheme == wsSchemePrefixTLS {
28✔
411
                                hasWebSocketURL = true
13✔
412
                                if (remoteURL.Scheme == wsSchemePrefixTLS) &&
13✔
413
                                        remote.TLSConfig == nil && !remote.TLS {
14✔
414
                                        return warnings, fmt.Errorf("proxy is configured but remote URL %s requires TLS and no TLS configuration is provided. When using proxy with TLS endpoints, ensure TLS is properly configured for the leafnode remote", remoteURL.String())
1✔
415
                                }
1✔
416
                        } else {
2✔
417
                                hasNonWebSocketURL = true
2✔
418
                        }
2✔
419
                }
420

421
                if !hasWebSocketURL {
14✔
422
                        warnings = append(warnings, "proxy configuration will be ignored: proxy settings only apply to WebSocket connections (ws:// or wss://), but all configured URLs use TCP connections (nats://)")
1✔
423
                } else if hasNonWebSocketURL {
14✔
424
                        warnings = append(warnings, "proxy configuration will only be used for WebSocket URLs: proxy settings do not apply to TCP connections (nats://)")
1✔
425
                }
1✔
426
        }
427

428
        return warnings, nil
13✔
429
}
430

431
// Update remote LeafNode TLS configurations after a config reload.
432
func (s *Server) updateRemoteLeafNodesTLSConfig(opts *Options) {
8✔
433
        max := len(opts.LeafNode.Remotes)
8✔
434
        if max == 0 {
8✔
435
                return
×
436
        }
×
437

438
        s.mu.RLock()
8✔
439
        defer s.mu.RUnlock()
8✔
440

8✔
441
        // Changes in the list of remote leaf nodes is not supported.
8✔
442
        // However, make sure that we don't go over the arrays.
8✔
443
        if len(s.leafRemoteCfgs) < max {
8✔
444
                max = len(s.leafRemoteCfgs)
×
445
        }
×
446
        for i := 0; i < max; i++ {
21✔
447
                ro := opts.LeafNode.Remotes[i]
13✔
448
                cfg := s.leafRemoteCfgs[i]
13✔
449
                if ro.TLSConfig != nil {
15✔
450
                        cfg.Lock()
2✔
451
                        cfg.TLSConfig = ro.TLSConfig.Clone()
2✔
452
                        cfg.TLSHandshakeFirst = ro.TLSHandshakeFirst
2✔
453
                        cfg.Unlock()
2✔
454
                }
2✔
455
        }
456
}
457

458
func (s *Server) reConnectToRemoteLeafNode(remote *leafNodeCfg) {
244✔
459
        delay := s.getOpts().LeafNode.ReconnectInterval
244✔
460
        select {
244✔
461
        case <-time.After(delay):
187✔
462
        case <-s.quitCh:
57✔
463
                s.grWG.Done()
57✔
464
                return
57✔
465
        }
466
        s.connectToRemoteLeafNode(remote, false)
187✔
467
}
468

469
// Creates a leafNodeCfg object that wraps the RemoteLeafOpts.
470
func newLeafNodeCfg(remote *RemoteLeafOpts) *leafNodeCfg {
1,354✔
471
        cfg := &leafNodeCfg{
1,354✔
472
                RemoteLeafOpts: remote,
1,354✔
473
                urls:           make([]*url.URL, 0, len(remote.URLs)),
1,354✔
474
        }
1,354✔
475
        if len(remote.DenyExports) > 0 || len(remote.DenyImports) > 0 {
1,362✔
476
                perms := &Permissions{}
8✔
477
                if len(remote.DenyExports) > 0 {
16✔
478
                        perms.Publish = &SubjectPermission{Deny: remote.DenyExports}
8✔
479
                }
8✔
480
                if len(remote.DenyImports) > 0 {
15✔
481
                        perms.Subscribe = &SubjectPermission{Deny: remote.DenyImports}
7✔
482
                }
7✔
483
                cfg.perms = perms
8✔
484
        }
485
        // Start with the one that is configured. We will add to this
486
        // array when receiving async leafnode INFOs.
487
        cfg.urls = append(cfg.urls, cfg.URLs...)
1,354✔
488
        // If allowed to randomize, do it on our copy of URLs
1,354✔
489
        if !remote.NoRandomize {
2,706✔
490
                rand.Shuffle(len(cfg.urls), func(i, j int) {
1,753✔
491
                        cfg.urls[i], cfg.urls[j] = cfg.urls[j], cfg.urls[i]
401✔
492
                })
401✔
493
        }
494
        // If we are TLS make sure we save off a proper servername if possible.
495
        // Do same for user/password since we may need them to connect to
496
        // a bare URL that we get from INFO protocol.
497
        for _, u := range cfg.urls {
3,139✔
498
                cfg.saveTLSHostname(u)
1,785✔
499
                cfg.saveUserPassword(u)
1,785✔
500
                // If the url(s) have the "wss://" scheme, and we don't have a TLS
1,785✔
501
                // config, mark that we should be using TLS anyway.
1,785✔
502
                if !cfg.TLS && isWSSURL(u) {
1,786✔
503
                        cfg.TLS = true
1✔
504
                }
1✔
505
        }
506
        return cfg
1,354✔
507
}
508

509
// Will pick an URL from the list of available URLs.
510
func (cfg *leafNodeCfg) pickNextURL() *url.URL {
6,706✔
511
        cfg.Lock()
6,706✔
512
        defer cfg.Unlock()
6,706✔
513
        // If the current URL is the first in the list and we have more than
6,706✔
514
        // one URL, then move that one to end of the list.
6,706✔
515
        if cfg.curURL != nil && len(cfg.urls) > 1 && urlsAreEqual(cfg.curURL, cfg.urls[0]) {
9,998✔
516
                first := cfg.urls[0]
3,292✔
517
                copy(cfg.urls, cfg.urls[1:])
3,292✔
518
                cfg.urls[len(cfg.urls)-1] = first
3,292✔
519
        }
3,292✔
520
        cfg.curURL = cfg.urls[0]
6,706✔
521
        return cfg.curURL
6,706✔
522
}
523

524
// Returns the current URL
525
func (cfg *leafNodeCfg) getCurrentURL() *url.URL {
83✔
526
        cfg.RLock()
83✔
527
        defer cfg.RUnlock()
83✔
528
        return cfg.curURL
83✔
529
}
83✔
530

531
// Returns how long the server should wait before attempting
532
// to solicit a remote leafnode connection.
533
func (cfg *leafNodeCfg) getConnectDelay() time.Duration {
1,542✔
534
        cfg.RLock()
1,542✔
535
        delay := cfg.connDelay
1,542✔
536
        cfg.RUnlock()
1,542✔
537
        return delay
1,542✔
538
}
1,542✔
539

540
// Sets the connect delay.
541
func (cfg *leafNodeCfg) setConnectDelay(delay time.Duration) {
148✔
542
        cfg.Lock()
148✔
543
        cfg.connDelay = delay
148✔
544
        cfg.Unlock()
148✔
545
}
148✔
546

547
// Ensure that non-exported options (used in tests) have
548
// been properly set.
549
func (s *Server) setLeafNodeNonExportedOptions() {
6,854✔
550
        opts := s.getOpts()
6,854✔
551
        s.leafNodeOpts.dialTimeout = opts.LeafNode.dialTimeout
6,854✔
552
        if s.leafNodeOpts.dialTimeout == 0 {
13,707✔
553
                // Use same timeouts as routes for now.
6,853✔
554
                s.leafNodeOpts.dialTimeout = DEFAULT_ROUTE_DIAL
6,853✔
555
        }
6,853✔
556
        s.leafNodeOpts.resolver = opts.LeafNode.resolver
6,854✔
557
        if s.leafNodeOpts.resolver == nil {
13,705✔
558
                s.leafNodeOpts.resolver = net.DefaultResolver
6,851✔
559
        }
6,851✔
560
}
561

562
// sharedSysAccDelay delays the first connect of a remote bound to the shared
// system account when the server is not in standalone mode, so it can gather
// some info first.
const sharedSysAccDelay time.Duration = 250 * time.Millisecond
563

564
// establishHTTPProxyTunnel establishes an HTTP CONNECT tunnel through a proxy server
565
func establishHTTPProxyTunnel(proxyURL, targetHost string, timeout time.Duration, username, password string) (net.Conn, error) {
10✔
566
        proxyAddr, err := url.Parse(proxyURL)
10✔
567
        if err != nil {
10✔
568
                // This should not happen since proxy URL is validated during configuration parsing
×
569
                return nil, fmt.Errorf("unexpected proxy URL parse error (URL was pre-validated): %v", err)
×
570
        }
×
571

572
        // Connect to the proxy server
573
        conn, err := natsDialTimeout("tcp", proxyAddr.Host, timeout)
10✔
574
        if err != nil {
10✔
575
                return nil, fmt.Errorf("failed to connect to proxy: %v", err)
×
576
        }
×
577

578
        // Set deadline for the entire proxy handshake
579
        if err := conn.SetDeadline(time.Now().Add(timeout)); err != nil {
10✔
580
                conn.Close()
×
581
                return nil, fmt.Errorf("failed to set deadline: %v", err)
×
582
        }
×
583

584
        req := &http.Request{
10✔
585
                Method: http.MethodConnect,
10✔
586
                URL:    &url.URL{Opaque: targetHost}, // Opaque is required for CONNECT
10✔
587
                Host:   targetHost,
10✔
588
                Header: make(http.Header),
10✔
589
        }
10✔
590

10✔
591
        // Add proxy authentication if provided
10✔
592
        if username != "" && password != "" {
12✔
593
                req.Header.Set("Proxy-Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(username+":"+password)))
2✔
594
        }
2✔
595

596
        if err := req.Write(conn); err != nil {
10✔
597
                conn.Close()
×
598
                return nil, fmt.Errorf("failed to write CONNECT request: %v", err)
×
599
        }
×
600

601
        resp, err := http.ReadResponse(bufio.NewReader(conn), req)
10✔
602
        if err != nil {
10✔
603
                conn.Close()
×
604
                return nil, fmt.Errorf("failed to read proxy response: %v", err)
×
605
        }
×
606

607
        if resp.StatusCode != http.StatusOK {
11✔
608
                resp.Body.Close()
1✔
609
                conn.Close()
1✔
610
                return nil, fmt.Errorf("proxy CONNECT failed: %s", resp.Status)
1✔
611
        }
1✔
612

613
        // Close the response body
614
        resp.Body.Close()
9✔
615

9✔
616
        // Clear the deadline now that we've finished the proxy handshake
9✔
617
        if err := conn.SetDeadline(time.Time{}); err != nil {
9✔
618
                conn.Close()
×
619
                return nil, fmt.Errorf("failed to clear deadline: %v", err)
×
620
        }
×
621

622
        return conn, nil
9✔
623
}
624

625
func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg, firstConnect bool) {
1,542✔
626
        defer s.grWG.Done()
1,542✔
627

1,542✔
628
        if remote == nil || len(remote.URLs) == 0 {
1,542✔
629
                s.Debugf("Empty remote leafnode definition, nothing to connect")
×
630
                return
×
631
        }
×
632

633
        opts := s.getOpts()
1,542✔
634
        reconnectDelay := opts.LeafNode.ReconnectInterval
1,542✔
635
        s.mu.RLock()
1,542✔
636
        dialTimeout := s.leafNodeOpts.dialTimeout
1,542✔
637
        resolver := s.leafNodeOpts.resolver
1,542✔
638
        var isSysAcc bool
1,542✔
639
        if s.eventsEnabled() {
3,054✔
640
                isSysAcc = remote.LocalAccount == s.sys.account.Name
1,512✔
641
        }
1,512✔
642
        jetstreamMigrateDelay := remote.JetStreamClusterMigrateDelay
1,542✔
643
        s.mu.RUnlock()
1,542✔
644

1,542✔
645
        // If we are sharing a system account and we are not standalone delay to gather some info prior.
1,542✔
646
        if firstConnect && isSysAcc && !s.standAloneMode() {
1,611✔
647
                s.Debugf("Will delay first leafnode connect to shared system account due to clustering")
69✔
648
                remote.setConnectDelay(sharedSysAccDelay)
69✔
649
        }
69✔
650

651
        if connDelay := remote.getConnectDelay(); connDelay > 0 {
1,616✔
652
                select {
74✔
653
                case <-time.After(connDelay):
69✔
654
                case <-s.quitCh:
5✔
655
                        return
5✔
656
                }
657
                remote.setConnectDelay(0)
69✔
658
        }
659

660
        var conn net.Conn
1,537✔
661

1,537✔
662
        const connErrFmt = "Error trying to connect as leafnode to remote server %q (attempt %v): %v"
1,537✔
663

1,537✔
664
        // Capture proxy configuration once before the loop with proper locking
1,537✔
665
        remote.RLock()
1,537✔
666
        proxyURL := remote.Proxy.URL
1,537✔
667
        proxyUsername := remote.Proxy.Username
1,537✔
668
        proxyPassword := remote.Proxy.Password
1,537✔
669
        proxyTimeout := remote.Proxy.Timeout
1,537✔
670
        remote.RUnlock()
1,537✔
671

1,537✔
672
        // Set default proxy timeout if not specified
1,537✔
673
        if proxyTimeout == 0 {
3,067✔
674
                proxyTimeout = dialTimeout
1,530✔
675
        }
1,530✔
676

677
        attempts := 0
1,537✔
678

1,537✔
679
        for s.isRunning() && s.remoteLeafNodeStillValid(remote) {
8,243✔
680
                rURL := remote.pickNextURL()
6,706✔
681
                url, err := s.getRandomIP(resolver, rURL.Host, nil)
6,706✔
682
                if err == nil {
13,405✔
683
                        var ipStr string
6,699✔
684
                        if url != rURL.Host {
6,775✔
685
                                ipStr = fmt.Sprintf(" (%s)", url)
76✔
686
                        }
76✔
687
                        // Some test may want to disable remotes from connecting
688
                        if s.isLeafConnectDisabled() {
6,819✔
689
                                s.Debugf("Will not attempt to connect to remote server on %q%s, leafnodes currently disabled", rURL.Host, ipStr)
120✔
690
                                err = ErrLeafNodeDisabled
120✔
691
                        } else {
6,699✔
692
                                s.Debugf("Trying to connect as leafnode to remote server on %q%s", rURL.Host, ipStr)
6,579✔
693

6,579✔
694
                                // Check if proxy is configured first, then check if URL supports it
6,579✔
695
                                if proxyURL != _EMPTY_ && isWSURL(rURL) {
6,586✔
696
                                        // Use proxy for WebSocket connections - use original hostname, resolved IP for connection
7✔
697
                                        targetHost := rURL.Host
7✔
698
                                        // If URL doesn't include port, add the default port for the scheme
7✔
699
                                        if rURL.Port() == _EMPTY_ {
7✔
700
                                                defaultPort := "80"
×
701
                                                if rURL.Scheme == wsSchemePrefixTLS {
×
702
                                                        defaultPort = "443"
×
703
                                                }
×
704
                                                targetHost = net.JoinHostPort(rURL.Hostname(), defaultPort)
×
705
                                        }
706

707
                                        conn, err = establishHTTPProxyTunnel(proxyURL, targetHost, proxyTimeout, proxyUsername, proxyPassword)
7✔
708
                                } else {
6,572✔
709
                                        // Direct connection
6,572✔
710
                                        conn, err = natsDialTimeout("tcp", url, dialTimeout)
6,572✔
711
                                }
6,572✔
712
                        }
713
                }
714
                if err != nil {
12,603✔
715
                        jitter := time.Duration(rand.Int63n(int64(reconnectDelay)))
5,897✔
716
                        delay := reconnectDelay + jitter
5,897✔
717
                        attempts++
5,897✔
718
                        if s.shouldReportConnectErr(firstConnect, attempts) {
9,933✔
719
                                s.Errorf(connErrFmt, rURL.Host, attempts, err)
4,036✔
720
                        } else {
5,897✔
721
                                s.Debugf(connErrFmt, rURL.Host, attempts, err)
1,861✔
722
                        }
1,861✔
723
                        remote.Lock()
5,897✔
724
                        // if we are using a delay to start migrating assets, kick off a migrate timer.
5,897✔
725
                        if remote.jsMigrateTimer == nil && jetstreamMigrateDelay > 0 {
5,905✔
726
                                remote.jsMigrateTimer = time.AfterFunc(jetstreamMigrateDelay, func() {
16✔
727
                                        s.checkJetStreamMigrate(remote)
8✔
728
                                })
8✔
729
                        }
730
                        remote.Unlock()
5,897✔
731
                        select {
5,897✔
732
                        case <-s.quitCh:
722✔
733
                                remote.cancelMigrateTimer()
722✔
734
                                return
722✔
735
                        case <-time.After(delay):
5,174✔
736
                                // Check if we should migrate any JetStream assets immediately while this remote is down.
5,174✔
737
                                // This will be used if JetStreamClusterMigrateDelay was not set
5,174✔
738
                                if jetstreamMigrateDelay == 0 {
10,281✔
739
                                        s.checkJetStreamMigrate(remote)
5,107✔
740
                                }
5,107✔
741
                                continue
5,174✔
742
                        }
743
                }
744
                remote.cancelMigrateTimer()
809✔
745
                if !s.remoteLeafNodeStillValid(remote) {
809✔
746
                        conn.Close()
×
747
                        return
×
748
                }
×
749

750
                // We have a connection here to a remote server.
751
                // Go ahead and create our leaf node and return.
752
                s.createLeafNode(conn, rURL, remote, nil)
809✔
753

809✔
754
                // Clear any observer states if we had them.
809✔
755
                s.clearObserverState(remote)
809✔
756

809✔
757
                return
809✔
758
        }
759
}
760

761
func (cfg *leafNodeCfg) cancelMigrateTimer() {
1,531✔
762
        cfg.Lock()
1,531✔
763
        stopAndClearTimer(&cfg.jsMigrateTimer)
1,531✔
764
        cfg.Unlock()
1,531✔
765
}
1,531✔
766

767
// This will clear any observer state such that stream or consumer assets on this server can become leaders again.
768
func (s *Server) clearObserverState(remote *leafNodeCfg) {
809✔
769
        s.mu.RLock()
809✔
770
        accName := remote.LocalAccount
809✔
771
        s.mu.RUnlock()
809✔
772

809✔
773
        acc, err := s.LookupAccount(accName)
809✔
774
        if err != nil {
811✔
775
                s.Warnf("Error looking up account [%s] checking for JetStream clear observer state on a leafnode", accName)
2✔
776
                return
2✔
777
        }
2✔
778

779
        acc.jscmMu.Lock()
807✔
780
        defer acc.jscmMu.Unlock()
807✔
781

807✔
782
        // Walk all streams looking for any clustered stream, skip otherwise.
807✔
783
        for _, mset := range acc.streams() {
828✔
784
                node := mset.raftNode()
21✔
785
                if node == nil {
34✔
786
                        // Not R>1
13✔
787
                        continue
13✔
788
                }
789
                // Check consumers
790
                for _, o := range mset.getConsumers() {
10✔
791
                        if n := o.raftNode(); n != nil {
4✔
792
                                // Ensure we can become a leader again.
2✔
793
                                n.SetObserver(false)
2✔
794
                        }
2✔
795
                }
796
                // Ensure we can not become a leader again.
797
                node.SetObserver(false)
8✔
798
        }
799
}
800

801
// Check to see if we should migrate any assets from this account.
802
func (s *Server) checkJetStreamMigrate(remote *leafNodeCfg) {
5,115✔
803
        s.mu.RLock()
5,115✔
804
        accName, shouldMigrate := remote.LocalAccount, remote.JetStreamClusterMigrate
5,115✔
805
        s.mu.RUnlock()
5,115✔
806

5,115✔
807
        if !shouldMigrate {
10,169✔
808
                return
5,054✔
809
        }
5,054✔
810

811
        acc, err := s.LookupAccount(accName)
61✔
812
        if err != nil {
61✔
813
                s.Warnf("Error looking up account [%s] checking for JetStream migration on a leafnode", accName)
×
814
                return
×
815
        }
×
816

817
        acc.jscmMu.Lock()
61✔
818
        defer acc.jscmMu.Unlock()
61✔
819

61✔
820
        // Walk all streams looking for any clustered stream, skip otherwise.
61✔
821
        // If we are the leader force stepdown.
61✔
822
        for _, mset := range acc.streams() {
91✔
823
                node := mset.raftNode()
30✔
824
                if node == nil {
30✔
825
                        // Not R>1
×
826
                        continue
×
827
                }
828
                // Collect any consumers
829
                for _, o := range mset.getConsumers() {
49✔
830
                        if n := o.raftNode(); n != nil {
38✔
831
                                n.StepDown()
19✔
832
                                // Ensure we can not become a leader while in this state.
19✔
833
                                n.SetObserver(true)
19✔
834
                        }
19✔
835
                }
836
                // Stepdown if this stream was leader.
837
                node.StepDown()
30✔
838
                // Ensure we can not become a leader while in this state.
30✔
839
                node.SetObserver(true)
30✔
840
        }
841
}
842

843
// Helper for checking.
844
func (s *Server) isLeafConnectDisabled() bool {
6,699✔
845
        s.mu.RLock()
6,699✔
846
        defer s.mu.RUnlock()
6,699✔
847
        return s.leafDisableConnect
6,699✔
848
}
6,699✔
849

850
// Save off the tlsName for when we use TLS and mix hostnames and IPs. IPs usually
851
// come from the server we connect to.
852
//
853
// We used to save the name only if there was a TLSConfig or scheme equal to "tls".
854
// However, this was causing failures for users that did not set the scheme (and
855
// their remote connections did not have a tls{} block).
856
// We now save the host name regardless in case the remote returns an INFO indicating
857
// that TLS is required.
858
func (cfg *leafNodeCfg) saveTLSHostname(u *url.URL) {
2,432✔
859
        if cfg.tlsName == _EMPTY_ && net.ParseIP(u.Hostname()) == nil {
2,452✔
860
                cfg.tlsName = u.Hostname()
20✔
861
        }
20✔
862
}
863

864
// Save off the username/password for when we connect using a bare URL
865
// that we get from the INFO protocol.
866
func (cfg *leafNodeCfg) saveUserPassword(u *url.URL) {
1,785✔
867
        if cfg.username == _EMPTY_ && u.User != nil {
2,090✔
868
                cfg.username = u.User.Username()
305✔
869
                cfg.password, _ = u.User.Password()
305✔
870
        }
305✔
871
}
872

873
// This starts the leafnode accept loop in a go routine, unless it
874
// is detected that the server has already been shutdown.
875
func (s *Server) startLeafNodeAcceptLoop() {
3,823✔
876
        // Snapshot server options.
3,823✔
877
        opts := s.getOpts()
3,823✔
878

3,823✔
879
        port := opts.LeafNode.Port
3,823✔
880
        if port == -1 {
7,471✔
881
                port = 0
3,648✔
882
        }
3,648✔
883

884
        if s.isShuttingDown() {
3,823✔
885
                return
×
886
        }
×
887

888
        s.mu.Lock()
3,823✔
889
        hp := net.JoinHostPort(opts.LeafNode.Host, strconv.Itoa(port))
3,823✔
890
        l, e := natsListen("tcp", hp)
3,823✔
891
        s.leafNodeListenerErr = e
3,823✔
892
        if e != nil {
3,823✔
893
                s.mu.Unlock()
×
894
                s.Fatalf("Error listening on leafnode port: %d - %v", opts.LeafNode.Port, e)
×
895
                return
×
896
        }
×
897

898
        s.Noticef("Listening for leafnode connections on %s",
3,823✔
899
                net.JoinHostPort(opts.LeafNode.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
3,823✔
900

3,823✔
901
        tlsRequired := opts.LeafNode.TLSConfig != nil
3,823✔
902
        tlsVerify := tlsRequired && opts.LeafNode.TLSConfig.ClientAuth == tls.RequireAndVerifyClientCert
3,823✔
903
        // Do not set compression in this Info object, it would possibly cause
3,823✔
904
        // issues when sending asynchronous INFO to the remote.
3,823✔
905
        info := Info{
3,823✔
906
                ID:            s.info.ID,
3,823✔
907
                Name:          s.info.Name,
3,823✔
908
                Version:       s.info.Version,
3,823✔
909
                GitCommit:     gitCommit,
3,823✔
910
                GoVersion:     runtime.Version(),
3,823✔
911
                AuthRequired:  true,
3,823✔
912
                TLSRequired:   tlsRequired,
3,823✔
913
                TLSVerify:     tlsVerify,
3,823✔
914
                MaxPayload:    s.info.MaxPayload, // TODO(dlc) - Allow override?
3,823✔
915
                Headers:       s.supportsHeaders(),
3,823✔
916
                JetStream:     opts.JetStream,
3,823✔
917
                Domain:        opts.JetStreamDomain,
3,823✔
918
                Proto:         s.getServerProto(),
3,823✔
919
                InfoOnConnect: true,
3,823✔
920
                JSApiLevel:    JSApiLevel,
3,823✔
921
        }
3,823✔
922
        // If we have selected a random port...
3,823✔
923
        if port == 0 {
7,471✔
924
                // Write resolved port back to options.
3,648✔
925
                opts.LeafNode.Port = l.Addr().(*net.TCPAddr).Port
3,648✔
926
        }
3,648✔
927

928
        s.leafNodeInfo = info
3,823✔
929
        // Possibly override Host/Port and set IP based on Cluster.Advertise
3,823✔
930
        if err := s.setLeafNodeInfoHostPortAndIP(); err != nil {
3,823✔
931
                s.Fatalf("Error setting leafnode INFO with LeafNode.Advertise value of %s, err=%v", opts.LeafNode.Advertise, err)
×
932
                l.Close()
×
933
                s.mu.Unlock()
×
934
                return
×
935
        }
×
936
        s.leafURLsMap[s.leafNodeInfo.IP]++
3,823✔
937
        s.generateLeafNodeInfoJSON()
3,823✔
938

3,823✔
939
        // Setup state that can enable shutdown
3,823✔
940
        s.leafNodeListener = l
3,823✔
941

3,823✔
942
        // As of now, a server that does not have remotes configured would
3,823✔
943
        // never solicit a connection, so we should not have to warn if
3,823✔
944
        // InsecureSkipVerify is set in main LeafNodes config (since
3,823✔
945
        // this TLS setting matters only when soliciting a connection).
3,823✔
946
        // Still, warn if insecure is set in any of LeafNode block.
3,823✔
947
        // We need to check remotes, even if tls is not required on accept.
3,823✔
948
        warn := tlsRequired && opts.LeafNode.TLSConfig.InsecureSkipVerify
3,823✔
949
        if !warn {
7,644✔
950
                for _, r := range opts.LeafNode.Remotes {
4,014✔
951
                        if r.TLSConfig != nil && r.TLSConfig.InsecureSkipVerify {
193✔
952
                                warn = true
×
953
                                break
×
954
                        }
955
                }
956
        }
957
        if warn {
3,825✔
958
                s.Warnf(leafnodeTLSInsecureWarning)
2✔
959
        }
2✔
960
        go s.acceptConnections(l, "Leafnode", func(conn net.Conn) { s.createLeafNode(conn, nil, nil, nil) }, nil)
4,667✔
961
        s.mu.Unlock()
3,823✔
962
}
963

964
// credsRe matches each dash-delimited section of a creds file: a
// "---...---" header line, a single body line made of word characters,
// '-', '.' or '=', and a "---...---" trailer line. For every match, submatch
// 1 is the section body. Callers expect the first match to be the user JWT
// and the second the nkey seed.
var (
	credsRe = regexp.MustCompile(`\s*(?:(?:[-]{3,}.*[-]{3,}\r?\n)([\w\-.=]+)(?:\r?\n[-]{3,}.*[-]{3,}(\r?\n|\z)))`)
)
966

967
// clusterName is provided as argument to avoid lock ordering issues with the locked client c
968
// Lock should be held entering here.
969
func (c *client) sendLeafConnect(clusterName string, headers bool) error {
678✔
970
        // We support basic user/pass and operator based user JWT with signatures.
678✔
971
        cinfo := leafConnectInfo{
678✔
972
                Version:       VERSION,
678✔
973
                ID:            c.srv.info.ID,
678✔
974
                Domain:        c.srv.info.Domain,
678✔
975
                Name:          c.srv.info.Name,
678✔
976
                Hub:           c.leaf.remote.Hub,
678✔
977
                Cluster:       clusterName,
678✔
978
                Headers:       headers,
678✔
979
                JetStream:     c.acc.jetStreamConfigured(),
678✔
980
                DenyPub:       c.leaf.remote.DenyImports,
678✔
981
                Compression:   c.leaf.compression,
678✔
982
                RemoteAccount: c.acc.GetName(),
678✔
983
                Proto:         c.srv.getServerProto(),
678✔
984
                Isolate:       c.leaf.remote.RequestIsolation,
678✔
985
        }
678✔
986

678✔
987
        // If a signature callback is specified, this takes precedence over anything else.
678✔
988
        if cb := c.leaf.remote.SignatureCB; cb != nil {
683✔
989
                nonce := c.nonce
5✔
990
                c.mu.Unlock()
5✔
991
                jwt, sigraw, err := cb(nonce)
5✔
992
                c.mu.Lock()
5✔
993
                if err == nil && c.isClosed() {
6✔
994
                        err = ErrConnectionClosed
1✔
995
                }
1✔
996
                if err != nil {
7✔
997
                        c.Errorf("Error signing the nonce: %v", err)
2✔
998
                        return err
2✔
999
                }
2✔
1000
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
3✔
1001
                cinfo.JWT, cinfo.Sig = jwt, sig
3✔
1002

1003
        } else if creds := c.leaf.remote.Credentials; creds != _EMPTY_ {
729✔
1004
                // Check for credentials first, that will take precedence..
56✔
1005
                c.Debugf("Authenticating with credentials file %q", c.leaf.remote.Credentials)
56✔
1006
                contents, err := os.ReadFile(creds)
56✔
1007
                if err != nil {
56✔
1008
                        c.Errorf("%v", err)
×
1009
                        return err
×
1010
                }
×
1011
                defer wipeSlice(contents)
56✔
1012
                items := credsRe.FindAllSubmatch(contents, -1)
56✔
1013
                if len(items) < 2 {
56✔
1014
                        c.Errorf("Credentials file malformed")
×
1015
                        return err
×
1016
                }
×
1017
                // First result should be the user JWT.
1018
                // We copy here so that the file containing the seed will be wiped appropriately.
1019
                raw := items[0][1]
56✔
1020
                tmp := make([]byte, len(raw))
56✔
1021
                copy(tmp, raw)
56✔
1022
                // Seed is second item.
56✔
1023
                kp, err := nkeys.FromSeed(items[1][1])
56✔
1024
                if err != nil {
56✔
1025
                        c.Errorf("Credentials file has malformed seed")
×
1026
                        return err
×
1027
                }
×
1028
                // Wipe our key on exit.
1029
                defer kp.Wipe()
56✔
1030

56✔
1031
                sigraw, _ := kp.Sign(c.nonce)
56✔
1032
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
56✔
1033
                cinfo.JWT = bytesToString(tmp)
56✔
1034
                cinfo.Sig = sig
56✔
1035
        } else if nkey := c.leaf.remote.Nkey; nkey != _EMPTY_ {
622✔
1036
                kp, err := nkeys.FromSeed([]byte(nkey))
5✔
1037
                if err != nil {
5✔
1038
                        c.Errorf("Remote nkey has malformed seed")
×
1039
                        return err
×
1040
                }
×
1041
                // Wipe our key on exit.
1042
                defer kp.Wipe()
5✔
1043
                sigraw, _ := kp.Sign(c.nonce)
5✔
1044
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
5✔
1045
                pkey, _ := kp.PublicKey()
5✔
1046
                cinfo.Nkey = pkey
5✔
1047
                cinfo.Sig = sig
5✔
1048
        }
1049
        // In addition, and this is to allow auth callout, set user/password or
1050
        // token if applicable.
1051
        if userInfo := c.leaf.remote.curURL.User; userInfo != nil {
1,005✔
1052
                cinfo.User = userInfo.Username()
329✔
1053
                var ok bool
329✔
1054
                cinfo.Pass, ok = userInfo.Password()
329✔
1055
                // For backward compatibility, if only username is provided, set both
329✔
1056
                // Token and User, not just Token.
329✔
1057
                if !ok {
338✔
1058
                        cinfo.Token = cinfo.User
9✔
1059
                }
9✔
1060
        } else if c.leaf.remote.username != _EMPTY_ {
353✔
1061
                cinfo.User = c.leaf.remote.username
6✔
1062
                cinfo.Pass = c.leaf.remote.password
6✔
1063
                // For backward compatibility, if only username is provided, set both
6✔
1064
                // Token and User, not just Token.
6✔
1065
                if cinfo.Pass == _EMPTY_ {
6✔
1066
                        cinfo.Token = cinfo.User
×
1067
                }
×
1068
        }
1069
        b, err := json.Marshal(cinfo)
676✔
1070
        if err != nil {
676✔
1071
                c.Errorf("Error marshaling CONNECT to remote leafnode: %v\n", err)
×
1072
                return err
×
1073
        }
×
1074
        // Although this call is made before the writeLoop is created,
1075
        // we don't really need to send in place. The protocol will be
1076
        // sent out by the writeLoop.
1077
        c.enqueueProto([]byte(fmt.Sprintf(ConProto, b)))
676✔
1078
        return nil
676✔
1079
}
1080

1081
// Makes a deep copy of the LeafNode Info structure.
1082
// The server lock is held on entry.
1083
func (s *Server) copyLeafNodeInfo() *Info {
2,737✔
1084
        clone := s.leafNodeInfo
2,737✔
1085
        // Copy the array of urls.
2,737✔
1086
        if len(s.leafNodeInfo.LeafNodeURLs) > 0 {
4,963✔
1087
                clone.LeafNodeURLs = append([]string(nil), s.leafNodeInfo.LeafNodeURLs...)
2,226✔
1088
        }
2,226✔
1089
        return &clone
2,737✔
1090
}
1091

1092
// Adds a LeafNode URL that we get when a route connects to the Info structure.
1093
// Regenerates the JSON byte array so that it can be sent to LeafNode connections.
1094
// Returns a boolean indicating if the URL was added or not.
1095
// Server lock is held on entry
1096
func (s *Server) addLeafNodeURL(urlStr string) bool {
7,654✔
1097
        if s.leafURLsMap.addUrl(urlStr) {
15,303✔
1098
                s.generateLeafNodeInfoJSON()
7,649✔
1099
                return true
7,649✔
1100
        }
7,649✔
1101
        return false
5✔
1102
}
1103

1104
// Removes a LeafNode URL of the route that is disconnecting from the Info structure.
1105
// Regenerates the JSON byte array so that it can be sent to LeafNode connections.
1106
// Returns a boolean indicating if the URL was removed or not.
1107
// Server lock is held on entry.
1108
func (s *Server) removeLeafNodeURL(urlStr string) bool {
7,648✔
1109
        // Don't need to do this if we are removing the route connection because
7,648✔
1110
        // we are shuting down...
7,648✔
1111
        if s.isShuttingDown() {
11,631✔
1112
                return false
3,983✔
1113
        }
3,983✔
1114
        if s.leafURLsMap.removeUrl(urlStr) {
7,326✔
1115
                s.generateLeafNodeInfoJSON()
3,661✔
1116
                return true
3,661✔
1117
        }
3,661✔
1118
        return false
4✔
1119
}
1120

1121
// Server lock is held on entry
1122
func (s *Server) generateLeafNodeInfoJSON() {
15,133✔
1123
        s.leafNodeInfo.Cluster = s.cachedClusterName()
15,133✔
1124
        s.leafNodeInfo.LeafNodeURLs = s.leafURLsMap.getAsStringSlice()
15,133✔
1125
        s.leafNodeInfo.WSConnectURLs = s.websocket.connectURLsMap.getAsStringSlice()
15,133✔
1126
        s.leafNodeInfoJSON = generateInfoJSON(&s.leafNodeInfo)
15,133✔
1127
}
15,133✔
1128

1129
// Sends an async INFO protocol so that the connected servers can update
1130
// their list of LeafNode urls.
1131
func (s *Server) sendAsyncLeafNodeInfo() {
11,310✔
1132
        for _, c := range s.leafs {
11,408✔
1133
                c.mu.Lock()
98✔
1134
                c.enqueueProto(s.leafNodeInfoJSON)
98✔
1135
                c.mu.Unlock()
98✔
1136
        }
98✔
1137
}
1138

1139
// Called when an inbound leafnode connection is accepted or we create one for a solicited leafnode.
//
// createLeafNode builds a LEAF client for conn and starts its I/O loops.
//   - Accepted connections (remote == nil): ws is non-nil if the connection
//     came through the websocket port. Our INFO (with a fresh nonce) is sent
//     from this goroutine, optionally after a server-side TLS handshake
//     ("TLS first" and its fallback are handled here). Then an auth timer is
//     armed, or a ping timer when compression will be negotiated, while
//     waiting for CONNECT.
//   - Solicited connections (remote != nil, rURL is the dialed URL): the
//     websocket upgrade or the "TLS first" client handshake is done here. A
//     read deadline is then set while waiting for the remote INFO, which is
//     processed from the readLoop.
//
// Returns nil if setup failed: account lookup failure, websocket or TLS
// handshake error, write error while sending INFO, or the server shutting
// down before registration. Otherwise returns the client with its readLoop
// started; the writeLoop is started here only for accepted connections.
func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCfg, ws *websocket) *client {
	// Snapshot server options.
	opts := s.getOpts()

	maxPay := int32(opts.MaxPayload)
	maxSubs := int32(opts.MaxSubs)
	// For system, maxSubs of 0 means unlimited, so re-adjust here.
	if maxSubs == 0 {
		maxSubs = -1
	}
	now := time.Now().UTC()

	c := &client{srv: s, nc: conn, kind: LEAF, opts: defaultOpts, mpay: maxPay, msubs: maxSubs, start: now, last: now}
	// Do not update the smap here, we need to do it in initLeafNodeSmapAndSendSubs
	c.leaf = &leaf{}

	// If the leafnode subject interest should be isolated, flag it here.
	// The server-wide setting is checked first; if it is not set, a solicited
	// remote may still request isolation via LocalIsolation.
	s.optsMu.RLock()
	if c.leaf.isolated = s.opts.LeafNode.IsolateLeafnodeInterest; !c.leaf.isolated && remote != nil {
		c.leaf.isolated = remote.LocalIsolation
	}
	s.optsMu.RUnlock()

	// For accepted LN connections, ws will be != nil if it was accepted
	// through the Websocket port.
	c.ws = ws

	// For remote, check if the scheme starts with "ws", if so, we will initiate
	// a remote Leaf Node connection as a websocket connection.
	if remote != nil && rURL != nil && isWSURL(rURL) {
		remote.RLock()
		c.ws = &websocket{compress: remote.Websocket.Compression, maskwrite: !remote.Websocket.NoMasking}
		remote.RUnlock()
	}

	// Determines if we are soliciting the connection or not.
	var solicited bool
	var acc *Account
	var remoteSuffix string
	if remote != nil {
		// For now, if lookup fails, we will constantly try
		// to recreate this LN connection.
		lacc := remote.LocalAccount
		var err error
		acc, err = s.LookupAccount(lacc)
		if err != nil {
			// An account not existing is something that can happen with nats/http account resolver and the account
			// has not yet been pushed, or the request failed for other reasons.
			// remote needs to be set or retry won't happen
			c.leaf.remote = remote
			c.closeConnection(MissingAccount)
			s.Errorf("Unable to lookup account %s for solicited leafnode connection: %v", lacc, err)
			return nil
		}
		remoteSuffix = fmt.Sprintf(" for account: %s", acc.traceLabel())
	}

	c.mu.Lock()
	c.initClient()
	c.Noticef("Leafnode connection created%s %s", remoteSuffix, c.opts.Name)

	var (
		tlsFirst         bool
		tlsFirstFallback time.Duration
		infoTimeout      time.Duration
	)
	if remote != nil {
		solicited = true
		remote.Lock()
		c.leaf.remote = remote
		c.setPermissions(remote.perms)
		// A remote that is not marked as Hub makes this side a spoke.
		if !c.leaf.remote.Hub {
			c.leaf.isSpoke = true
		}
		tlsFirst = remote.TLSHandshakeFirst
		infoTimeout = remote.FirstInfoTimeout
		remote.Unlock()
		c.acc = acc
	} else {
		// Accepted connections must send a CONNECT before anything else.
		c.flags.set(expectConnect)
		if ws != nil {
			c.Debugf("Leafnode compression=%v", c.ws.compress)
		}
		tlsFirst = opts.LeafNode.TLSHandshakeFirst
		if f := opts.LeafNode.TLSHandshakeFirstFallback; f > 0 {
			tlsFirstFallback = f
		}
	}
	c.mu.Unlock()

	var nonce [nonceLen]byte
	var info *Info

	// Grab this before the client lock below.
	if !solicited {
		// Grab server variables
		s.mu.Lock()
		info = s.copyLeafNodeInfo()
		// For tests that want to simulate old servers, do not set the compression
		// on the INFO protocol if configured with CompressionNotSupported.
		if cm := opts.LeafNode.Compression.Mode; cm != CompressionNotSupported {
			info.Compression = cm
		}
		// We always send a nonce for LEAF connections. Do not change that without
		// taking into account presence of proxy trusted keys.
		s.generateNonce(nonce[:])
		s.mu.Unlock()
	}

	// Grab lock
	c.mu.Lock()

	// preBuf is handed to the readLoop below. It is only filled by the
	// solicited websocket path; presumably it holds bytes already read past
	// the HTTP upgrade response — confirm in leafNodeSolicitWSConnection.
	var preBuf []byte
	if solicited {
		// For websocket connection, we need to send an HTTP request,
		// and get the response before starting the readLoop to get
		// the INFO, etc..
		if c.isWebsocket() {
			var err error
			var closeReason ClosedState

			preBuf, closeReason, err = c.leafNodeSolicitWSConnection(opts, rURL, remote)
			if err != nil {
				c.Errorf("Error soliciting websocket connection: %v", err)
				c.mu.Unlock()
				// A zero closeReason means the connection is not closed here.
				if closeReason != 0 {
					c.closeConnection(closeReason)
				}
				return nil
			}
		} else {
			// If configured to do TLS handshake first
			if tlsFirst {
				if _, err := c.leafClientHandshakeIfNeeded(remote, opts); err != nil {
					c.mu.Unlock()
					return nil
				}
			}
			// We need to wait for the info, but not for too long.
			// The deadline is cleared by processLeafnodeInfo once the first INFO arrives.
			// NOTE(review): a zero FirstInfoTimeout would make this deadline
			// already expired; presumably it is defaulted during option
			// validation — confirm.
			c.nc.SetReadDeadline(time.Now().Add(infoTimeout))
		}

		// We will process the INFO from the readloop and finish by
		// sending the CONNECT and finish registration later.
	} else {
		// Send our info to the other side.
		// Remember the nonce we sent here for signatures, etc.
		c.nonce = make([]byte, nonceLen)
		copy(c.nonce, nonce[:])
		info.Nonce = bytesToString(c.nonce)
		info.CID = c.cid
		proto := generateInfoJSON(info)

		var pre []byte
		// We need first to check for "TLS First" fallback delay.
		if tlsFirstFallback > 0 {
			// We wait and see if we are getting any data. Since we did not send
			// the INFO protocol yet, only clients that use TLS first should be
			// sending data (the TLS handshake). We don't really check the content:
			// if it is a rogue agent and not an actual client performing the
			// TLS handshake, the error will be detected when performing the
			// handshake on our side.
			// Note that this read blocks (up to tlsFirstFallback) while c.mu is held.
			pre = make([]byte, 4)
			c.nc.SetReadDeadline(time.Now().Add(tlsFirstFallback))
			n, _ := io.ReadFull(c.nc, pre[:])
			c.nc.SetReadDeadline(time.Time{})
			// If we get any data (regardless of possible timeout), we will proceed
			// with the TLS handshake.
			if n > 0 {
				pre = pre[:n]
			} else {
				// We did not get anything so we will send the INFO protocol.
				pre = nil
				// Set the boolean to false for the rest of the function.
				tlsFirst = false
			}
		}

		if !tlsFirst {
			// We have to send from this go routine because we may
			// have to block for TLS handshake before we start our
			// writeLoop go routine. The other side needs to receive
			// this before it can initiate the TLS handshake..
			c.sendProtoNow(proto)

			// The above call could have marked the connection as closed (due to TCP error).
			if c.isClosed() {
				c.mu.Unlock()
				c.closeConnection(WriteError)
				return nil
			}
		}

		// Check to see if we need to spin up TLS.
		// NOTE(review): if tlsFirst is set but info.TLSRequired is false, no
		// handshake happens and any bytes peeked into pre are not replayed;
		// presumably option validation rules out that combination — confirm.
		if !c.isWebsocket() && info.TLSRequired {
			// If we have a prebuffer create a multi-reader.
			// The tlsMixConn wrapper is built from the bytes peeked above,
			// presumably so the TLS handshake reads them first.
			if len(pre) > 0 {
				c.nc = &tlsMixConn{c.nc, bytes.NewBuffer(pre)}
			}
			// Perform server-side TLS handshake.
			if err := c.doTLSServerHandshake(tlsHandshakeLeaf, opts.LeafNode.TLSConfig, opts.LeafNode.TLSTimeout, opts.LeafNode.TLSPinnedCerts); err != nil {
				c.mu.Unlock()
				return nil
			}
		}

		// If the user wants the TLS handshake to occur first, now that it is
		// done, send the INFO protocol.
		if tlsFirst {
			c.flags.set(didTLSFirst)
			c.sendProtoNow(proto)
			if c.isClosed() {
				c.mu.Unlock()
				c.closeConnection(WriteError)
				return nil
			}
		}

		// Leaf nodes will always require a CONNECT to let us know
		// when we are properly bound to an account.
		//
		// If compression is configured, we can't set the authTimer here because
		// it would cause the parser to fail any incoming protocol that is not a
		// CONNECT (and we need to exchange INFO protocols for compression
		// negotiation). So instead, use the ping timer until we are done with
		// negotiation and can set the auth timer.
		timeout := secondsToDuration(opts.LeafNode.AuthTimeout)
		if needsCompression(opts.LeafNode.Compression.Mode) {
			c.ping.tmr = time.AfterFunc(timeout, func() {
				c.authTimeout()
			})
		} else {
			c.setAuthTimer(timeout)
		}
	}

	// Keep track in case server is shutdown before we can successfully register.
	if !s.addToTempClients(c.cid, c) {
		c.mu.Unlock()
		c.setNoReconnect()
		c.closeConnection(ServerShutdown)
		return nil
	}

	// Spin up the read loop.
	s.startGoRoutine(func() { c.readLoop(preBuf) })

	// We will spin the write loop for solicited connections only
	// when processing the INFO and after switching to TLS if needed.
	if !solicited {
		s.startGoRoutine(func() { c.writeLoop() })
	}

	c.mu.Unlock()

	return c
}
1397

1398
// Will perform the client-side TLS handshake if needed. Assumes that this
1399
// is called by the solicit side (remote will be non nil). Returns `true`
1400
// if TLS is required, `false` otherwise.
1401
// Lock held on entry.
1402
func (c *client) leafClientHandshakeIfNeeded(remote *leafNodeCfg, opts *Options) (bool, error) {
1,948✔
1403
        // Check if TLS is required and gather TLS config variables.
1,948✔
1404
        tlsRequired, tlsConfig, tlsName, tlsTimeout := c.leafNodeGetTLSConfigForSolicit(remote)
1,948✔
1405
        if !tlsRequired {
3,813✔
1406
                return false, nil
1,865✔
1407
        }
1,865✔
1408

1409
        // If TLS required, peform handshake.
1410
        // Get the URL that was used to connect to the remote server.
1411
        rURL := remote.getCurrentURL()
83✔
1412

83✔
1413
        // Perform the client-side TLS handshake.
83✔
1414
        if resetTLSName, err := c.doTLSClientHandshake(tlsHandshakeLeaf, rURL, tlsConfig, tlsName, tlsTimeout, opts.LeafNode.TLSPinnedCerts); err != nil {
126✔
1415
                // Check if we need to reset the remote's TLS name.
43✔
1416
                if resetTLSName {
43✔
1417
                        remote.Lock()
×
1418
                        remote.tlsName = _EMPTY_
×
1419
                        remote.Unlock()
×
1420
                }
×
1421
                return false, err
43✔
1422
        }
1423
        return true, nil
40✔
1424
}
1425

1426
func (c *client) processLeafnodeInfo(info *Info) {
2,691✔
1427
        c.mu.Lock()
2,691✔
1428
        if c.leaf == nil || c.isClosed() {
2,691✔
1429
                c.mu.Unlock()
×
1430
                return
×
1431
        }
×
1432
        s := c.srv
2,691✔
1433
        opts := s.getOpts()
2,691✔
1434
        remote := c.leaf.remote
2,691✔
1435
        didSolicit := remote != nil
2,691✔
1436
        firstINFO := !c.flags.isSet(infoReceived)
2,691✔
1437

2,691✔
1438
        // In case of websocket, the TLS handshake has been already done.
2,691✔
1439
        // So check only for non websocket connections and for configurations
2,691✔
1440
        // where the TLS Handshake was not done first.
2,691✔
1441
        if didSolicit && !c.flags.isSet(handshakeComplete) && !c.isWebsocket() && !remote.TLSHandshakeFirst {
4,585✔
1442
                // If the server requires TLS, we need to set this in the remote
1,894✔
1443
                // otherwise if there is no TLS configuration block for the remote,
1,894✔
1444
                // the solicit side will not attempt to perform the TLS handshake.
1,894✔
1445
                if firstINFO && info.TLSRequired {
1,961✔
1446
                        // Check for TLS/proxy configuration mismatch
67✔
1447
                        if remote.Proxy.URL != _EMPTY_ && !remote.TLS && remote.TLSConfig == nil {
67✔
1448
                                c.mu.Unlock()
×
1449
                                c.Errorf("TLS configuration mismatch: Hub requires TLS but leafnode remote is not configured for TLS. When using a proxy, ensure TLS leafnode configuration matches the Hub requirements.")
×
1450
                                c.closeConnection(TLSHandshakeError)
×
1451
                                return
×
1452
                        }
×
1453
                        remote.TLS = true
67✔
1454
                }
1455
                if _, err := c.leafClientHandshakeIfNeeded(remote, opts); err != nil {
1,932✔
1456
                        c.mu.Unlock()
38✔
1457
                        return
38✔
1458
                }
38✔
1459
        }
1460

1461
        // Check for compression, unless already done.
1462
        if firstINFO && !c.flags.isSet(compressionNegotiated) {
3,970✔
1463
                // Prevent from getting back here.
1,317✔
1464
                c.flags.set(compressionNegotiated)
1,317✔
1465

1,317✔
1466
                var co *CompressionOpts
1,317✔
1467
                if !didSolicit {
1,899✔
1468
                        co = &opts.LeafNode.Compression
582✔
1469
                } else {
1,317✔
1470
                        co = &remote.Compression
735✔
1471
                }
735✔
1472
                if needsCompression(co.Mode) {
2,625✔
1473
                        // Release client lock since following function will need server lock.
1,308✔
1474
                        c.mu.Unlock()
1,308✔
1475
                        compress, err := s.negotiateLeafCompression(c, didSolicit, info.Compression, co)
1,308✔
1476
                        if err != nil {
1,308✔
1477
                                c.sendErrAndErr(err.Error())
×
1478
                                c.closeConnection(ProtocolViolation)
×
1479
                                return
×
1480
                        }
×
1481
                        if compress {
2,490✔
1482
                                // Done for now, will get back another INFO protocol...
1,182✔
1483
                                return
1,182✔
1484
                        }
1,182✔
1485
                        // No compression because one side does not want/can't, so proceed.
1486
                        c.mu.Lock()
126✔
1487
                        // Check that the connection did not close if the lock was released.
126✔
1488
                        if c.isClosed() {
126✔
1489
                                c.mu.Unlock()
×
1490
                                return
×
1491
                        }
×
1492
                } else {
9✔
1493
                        // Coming from an old server, the Compression field would be the empty
9✔
1494
                        // string. For servers that are configured with CompressionNotSupported,
9✔
1495
                        // this makes them behave as old servers.
9✔
1496
                        if info.Compression == _EMPTY_ || co.Mode == CompressionNotSupported {
11✔
1497
                                c.leaf.compression = CompressionNotSupported
2✔
1498
                        } else {
9✔
1499
                                c.leaf.compression = CompressionOff
7✔
1500
                        }
7✔
1501
                }
1502
                // Accepting side does not normally process an INFO protocol during
1503
                // initial connection handshake. So we keep it consistent by returning
1504
                // if we are not soliciting.
1505
                if !didSolicit {
136✔
1506
                        // If we had created the ping timer instead of the auth timer, we will
1✔
1507
                        // clear the ping timer and set the auth timer now that the compression
1✔
1508
                        // negotiation is done.
1✔
1509
                        if info.Compression != _EMPTY_ && c.ping.tmr != nil {
1✔
1510
                                clearTimer(&c.ping.tmr)
×
1511
                                c.setAuthTimer(secondsToDuration(opts.LeafNode.AuthTimeout))
×
1512
                        }
×
1513
                        c.mu.Unlock()
1✔
1514
                        return
1✔
1515
                }
1516
                // Fall through and process the INFO protocol as usual.
1517
        }
1518

1519
        // Note: For now, only the initial INFO has a nonce. We
1520
        // will probably do auto key rotation at some point.
1521
        if firstINFO {
2,246✔
1522
                // Mark that the INFO protocol has been received.
776✔
1523
                c.flags.set(infoReceived)
776✔
1524
                // Prevent connecting to non leafnode port. Need to do this only for
776✔
1525
                // the first INFO, not for async INFO updates...
776✔
1526
                //
776✔
1527
                // Content of INFO sent by the server when accepting a tcp connection.
776✔
1528
                // -------------------------------------------------------------------
776✔
1529
                // Listen Port Of | CID | ClientConnectURLs | LeafNodeURLs | Gateway |
776✔
1530
                // -------------------------------------------------------------------
776✔
1531
                //      CLIENT    |  X* |        X**        |              |         |
776✔
1532
                //      ROUTE     |     |        X**        |      X***    |         |
776✔
1533
                //     GATEWAY    |     |                   |              |    X    |
776✔
1534
                //     LEAFNODE   |  X  |                   |       X      |         |
776✔
1535
                // -------------------------------------------------------------------
776✔
1536
                // *   Not on older servers.
776✔
1537
                // **  Not if "no advertise" is enabled.
776✔
1538
                // *** Not if leafnode's "no advertise" is enabled.
776✔
1539
                //
776✔
1540
                // As seen from above, a solicited LeafNode connection should receive
776✔
1541
                // from the remote server an INFO with CID and LeafNodeURLs. Anything
776✔
1542
                // else should be considered an attempt to connect to a wrong port.
776✔
1543
                if didSolicit && (info.CID == 0 || info.LeafNodeURLs == nil) {
830✔
1544
                        c.mu.Unlock()
54✔
1545
                        c.Errorf(ErrConnectedToWrongPort.Error())
54✔
1546
                        c.closeConnection(WrongPort)
54✔
1547
                        return
54✔
1548
                }
54✔
1549
                // Reject a cluster that contains spaces.
1550
                if info.Cluster != _EMPTY_ && strings.Contains(info.Cluster, " ") {
723✔
1551
                        c.mu.Unlock()
1✔
1552
                        c.sendErrAndErr(ErrClusterNameHasSpaces.Error())
1✔
1553
                        c.closeConnection(ProtocolViolation)
1✔
1554
                        return
1✔
1555
                }
1✔
1556
                // Capture a nonce here.
1557
                c.nonce = []byte(info.Nonce)
721✔
1558
                if info.TLSRequired && didSolicit {
750✔
1559
                        remote.TLS = true
29✔
1560
                }
29✔
1561
                supportsHeaders := c.srv.supportsHeaders()
721✔
1562
                c.headers = supportsHeaders && info.Headers
721✔
1563

721✔
1564
                // Remember the remote server.
721✔
1565
                // Pre 2.2.0 servers are not sending their server name.
721✔
1566
                // In that case, use info.ID, which, for those servers, matches
721✔
1567
                // the content of the field `Name` in the leafnode CONNECT protocol.
721✔
1568
                if info.Name == _EMPTY_ {
721✔
1569
                        c.leaf.remoteServer = info.ID
×
1570
                } else {
721✔
1571
                        c.leaf.remoteServer = info.Name
721✔
1572
                }
721✔
1573
                c.leaf.remoteDomain = info.Domain
721✔
1574
                c.leaf.remoteCluster = info.Cluster
721✔
1575
                // We send the protocol version in the INFO protocol.
721✔
1576
                // Keep track of it, so we know if this connection supports message
721✔
1577
                // tracing for instance.
721✔
1578
                c.opts.Protocol = info.Proto
721✔
1579
        }
1580

1581
        // For both initial INFO and async INFO protocols, Possibly
1582
        // update our list of remote leafnode URLs we can connect to.
1583
        if didSolicit && (len(info.LeafNodeURLs) > 0 || len(info.WSConnectURLs) > 0) {
2,744✔
1584
                // Consider the incoming array as the most up-to-date
1,329✔
1585
                // representation of the remote cluster's list of URLs.
1,329✔
1586
                c.updateLeafNodeURLs(info)
1,329✔
1587
        }
1,329✔
1588

1589
        // Check to see if we have permissions updates here.
1590
        if info.Import != nil || info.Export != nil {
1,431✔
1591
                perms := &Permissions{
16✔
1592
                        Publish:   info.Export,
16✔
1593
                        Subscribe: info.Import,
16✔
1594
                }
16✔
1595
                // Check if we have local deny clauses that we need to merge.
16✔
1596
                if remote := c.leaf.remote; remote != nil {
32✔
1597
                        if len(remote.DenyExports) > 0 {
17✔
1598
                                if perms.Publish == nil {
1✔
1599
                                        perms.Publish = &SubjectPermission{}
×
1600
                                }
×
1601
                                perms.Publish.Deny = append(perms.Publish.Deny, remote.DenyExports...)
1✔
1602
                        }
1603
                        if len(remote.DenyImports) > 0 {
17✔
1604
                                if perms.Subscribe == nil {
1✔
1605
                                        perms.Subscribe = &SubjectPermission{}
×
1606
                                }
×
1607
                                perms.Subscribe.Deny = append(perms.Subscribe.Deny, remote.DenyImports...)
1✔
1608
                        }
1609
                }
1610
                c.setPermissions(perms)
16✔
1611
        }
1612

1613
        var resumeConnect bool
1,415✔
1614

1,415✔
1615
        // If this is a remote connection and this is the first INFO protocol,
1,415✔
1616
        // then we need to finish the connect process by sending CONNECT, etc..
1,415✔
1617
        if firstINFO && didSolicit {
2,093✔
1618
                // Clear deadline that was set in createLeafNode while waiting for the INFO.
678✔
1619
                c.nc.SetDeadline(time.Time{})
678✔
1620
                resumeConnect = true
678✔
1621
        } else if !firstINFO && didSolicit {
2,066✔
1622
                c.leaf.remoteAccName = info.RemoteAccount
651✔
1623
        }
651✔
1624

1625
        // Check if we have the remote account information and if so make sure it's stored.
1626
        if info.RemoteAccount != _EMPTY_ {
2,057✔
1627
                s.leafRemoteAccounts.Store(c.acc.Name, info.RemoteAccount)
642✔
1628
        }
642✔
1629
        c.mu.Unlock()
1,415✔
1630

1,415✔
1631
        finishConnect := info.ConnectInfo
1,415✔
1632
        if resumeConnect && s != nil {
2,093✔
1633
                s.leafNodeResumeConnectProcess(c)
678✔
1634
                if !info.InfoOnConnect {
678✔
1635
                        finishConnect = true
×
1636
                }
×
1637
        }
1638
        if finishConnect {
2,057✔
1639
                s.leafNodeFinishConnectProcess(c)
642✔
1640
        }
642✔
1641

1642
        // Check to see if we need to kick any internal source or mirror consumers.
1643
        // This will be a no-op if JetStream not enabled for this server or if the bound account
1644
        // does not have jetstream.
1645
        s.checkInternalSyncConsumers(c.acc)
1,415✔
1646
}
1647

1648
// negotiateLeafCompression selects the compression mode for leafnode
// connection c. It reconciles our configured options co with the mode the
// peer advertised in its INFO (infoCompression). The result is recorded in
// c.leaf.compression. For CompressionS2Auto, the actual level is picked from
// the connection RTT.
//
// Return values:
//   - (false, err) if selectCompressionMode fails.
//   - (false, nil) if the negotiated mode does not need compression.
//   - (true, nil) after an INFO carrying the chosen mode has been queued and
//     the connection has been switched to compressed I/O.
//
// The INFO carries our CID and, on the accept side, the nonce sent earlier.
// The solicit side queues it and flushes it fully before installing the
// compression writer, so it is sent uncompressed. The accept side queues it
// after the writer is installed, so it is sent compressed.
//
// The client lock must NOT be held on entry: c.mu is acquired and released
// here, and the server lock is taken while building the INFO.
func (s *Server) negotiateLeafCompression(c *client, didSolicit bool, infoCompression string, co *CompressionOpts) (bool, error) {
	// Negotiate the appropriate compression mode (or no compression)
	cm, err := selectCompressionMode(co.Mode, infoCompression)
	if err != nil {
		return false, err
	}
	c.mu.Lock()
	// For "auto" mode, set the initial compression mode based on RTT
	if cm == CompressionS2Auto {
		// No RTT measurement in progress; presumably approximate it from the
		// connection start time — confirm computeRTT semantics.
		if c.rttStart.IsZero() {
			c.rtt = computeRTT(c.start)
		}
		cm = selectS2AutoModeBasedOnRTT(c.rtt, co.RTTThresholds)
	}
	// Keep track of the negotiated compression mode.
	c.leaf.compression = cm
	cid := c.cid
	var nonce string
	if !didSolicit {
		nonce = bytesToString(c.nonce)
	}
	c.mu.Unlock()

	if !needsCompression(cm) {
		return false, nil
	}

	// If we end-up doing compression...

	// Generate an INFO with the chosen compression mode.
	s.mu.Lock()
	info := s.copyLeafNodeInfo()
	info.Compression, info.CID, info.Nonce = compressionModeForInfoProtocol(co, cm), cid, nonce
	infoProto := generateInfoJSON(info)
	s.mu.Unlock()

	// If we solicited, then send this INFO protocol BEFORE switching
	// to compression writer. However, if we did not, we send it after.
	c.mu.Lock()
	if didSolicit {
		c.enqueueProto(infoProto)
		// Make sure it is completely flushed (the pending bytes goes to
		// 0) before proceeding.
		for c.out.pb > 0 && !c.isClosed() {
			c.flushOutbound()
		}
	}
	// This is to notify the readLoop that it should switch to a
	// (de)compression reader.
	c.in.flags.set(switchToCompression)
	// Create the compress writer before queueing the INFO protocol for
	// a leafnode connection that did not solicit. It will make sure that
	// that proto is sent with compression on.
	c.out.cw = s2.NewWriter(nil, s2WriterOptions(cm)...)
	if !didSolicit {
		c.enqueueProto(infoProto)
	}
	c.mu.Unlock()
	return true, nil
}
1708

1709
// When getting a leaf node INFO protocol, use the provided
1710
// array of urls to update the list of possible endpoints.
1711
func (c *client) updateLeafNodeURLs(info *Info) {
1,329✔
1712
        cfg := c.leaf.remote
1,329✔
1713
        cfg.Lock()
1,329✔
1714
        defer cfg.Unlock()
1,329✔
1715

1,329✔
1716
        // We have ensured that if a remote has a WS scheme, then all are.
1,329✔
1717
        // So check if first is WS, then add WS URLs, otherwise, add non WS ones.
1,329✔
1718
        if len(cfg.URLs) > 0 && isWSURL(cfg.URLs[0]) {
1,387✔
1719
                // It does not really matter if we use "ws://" or "wss://" here since
58✔
1720
                // we will have already marked that the remote should use TLS anyway.
58✔
1721
                // But use proper scheme for log statements, etc...
58✔
1722
                proto := wsSchemePrefix
58✔
1723
                if cfg.TLS {
58✔
1724
                        proto = wsSchemePrefixTLS
×
1725
                }
×
1726
                c.doUpdateLNURLs(cfg, proto, info.WSConnectURLs)
58✔
1727
                return
58✔
1728
        }
1729
        c.doUpdateLNURLs(cfg, "nats-leaf", info.LeafNodeURLs)
1,271✔
1730
}
1731

1732
func (c *client) doUpdateLNURLs(cfg *leafNodeCfg, scheme string, URLs []string) {
1,329✔
1733
        cfg.urls = make([]*url.URL, 0, 1+len(URLs))
1,329✔
1734
        // Add the ones we receive in the protocol
1,329✔
1735
        for _, surl := range URLs {
3,680✔
1736
                url, err := url.Parse(fmt.Sprintf("%s://%s", scheme, surl))
2,351✔
1737
                if err != nil {
2,351✔
1738
                        // As per below, the URLs we receive should not have contained URL info, so this should be safe to log.
×
1739
                        c.Errorf("Error parsing url %q: %v", surl, err)
×
1740
                        continue
×
1741
                }
1742
                // Do not add if it's the same as what we already have configured.
1743
                var dup bool
2,351✔
1744
                for _, u := range cfg.URLs {
5,934✔
1745
                        // URLs that we receive never have user info, but the
3,583✔
1746
                        // ones that were configured may have. Simply compare
3,583✔
1747
                        // host and port to decide if they are equal or not.
3,583✔
1748
                        if url.Host == u.Host && url.Port() == u.Port() {
5,287✔
1749
                                dup = true
1,704✔
1750
                                break
1,704✔
1751
                        }
1752
                }
1753
                if !dup {
2,998✔
1754
                        cfg.urls = append(cfg.urls, url)
647✔
1755
                        cfg.saveTLSHostname(url)
647✔
1756
                }
647✔
1757
        }
1758
        // Add the configured one
1759
        cfg.urls = append(cfg.urls, cfg.URLs...)
1,329✔
1760
}
1761

1762
// Similar to setInfoHostPortAndGenerateJSON, but for leafNodeInfo.
1763
func (s *Server) setLeafNodeInfoHostPortAndIP() error {
3,823✔
1764
        opts := s.getOpts()
3,823✔
1765
        if opts.LeafNode.Advertise != _EMPTY_ {
3,834✔
1766
                advHost, advPort, err := parseHostPort(opts.LeafNode.Advertise, opts.LeafNode.Port)
11✔
1767
                if err != nil {
11✔
1768
                        return err
×
1769
                }
×
1770
                s.leafNodeInfo.Host = advHost
11✔
1771
                s.leafNodeInfo.Port = advPort
11✔
1772
        } else {
3,812✔
1773
                s.leafNodeInfo.Host = opts.LeafNode.Host
3,812✔
1774
                s.leafNodeInfo.Port = opts.LeafNode.Port
3,812✔
1775
                // If the host is "0.0.0.0" or "::" we need to resolve to a public IP.
3,812✔
1776
                // This will return at most 1 IP.
3,812✔
1777
                hostIsIPAny, ips, err := s.getNonLocalIPsIfHostIsIPAny(s.leafNodeInfo.Host, false)
3,812✔
1778
                if err != nil {
3,812✔
1779
                        return err
×
1780
                }
×
1781
                if hostIsIPAny {
4,112✔
1782
                        if len(ips) == 0 {
300✔
1783
                                s.Errorf("Could not find any non-local IP for leafnode's listen specification %q",
×
1784
                                        s.leafNodeInfo.Host)
×
1785
                        } else {
300✔
1786
                                // Take the first from the list...
300✔
1787
                                s.leafNodeInfo.Host = ips[0]
300✔
1788
                        }
300✔
1789
                }
1790
        }
1791
        // Use just host:port for the IP
1792
        s.leafNodeInfo.IP = net.JoinHostPort(s.leafNodeInfo.Host, strconv.Itoa(s.leafNodeInfo.Port))
3,823✔
1793
        if opts.LeafNode.Advertise != _EMPTY_ {
3,834✔
1794
                s.Noticef("Advertise address for leafnode is set to %s", s.leafNodeInfo.IP)
11✔
1795
        }
11✔
1796
        return nil
3,823✔
1797
}
1798

1799
// Add the connection to the map of leaf nodes.
1800
// If `checkForDup` is true (invoked when a leafnode is accepted), then we check
1801
// if a connection already exists for the same server name and account.
1802
// That can happen when the remote is attempting to reconnect while the accepting
1803
// side did not detect the connection as broken yet.
1804
// But it can also happen when there is a misconfiguration and the remote is
1805
// creating two (or more) connections that bind to the same account on the accept
1806
// side.
1807
// When a duplicate is found, the new connection is accepted and the old is closed
1808
// (this solves the stale connection situation). An error is returned to help the
1809
// remote detect the misconfiguration when the duplicate is the result of that
1810
// misconfiguration.
1811
func (s *Server) addLeafNodeConnection(c *client, srvName, clusterName string, checkForDup bool) {
1,321✔
1812
        var accName string
1,321✔
1813
        c.mu.Lock()
1,321✔
1814
        cid := c.cid
1,321✔
1815
        acc := c.acc
1,321✔
1816
        if acc != nil {
2,642✔
1817
                accName = acc.Name
1,321✔
1818
        }
1,321✔
1819
        myRemoteDomain := c.leaf.remoteDomain
1,321✔
1820
        mySrvName := c.leaf.remoteServer
1,321✔
1821
        remoteAccName := c.leaf.remoteAccName
1,321✔
1822
        myClustName := c.leaf.remoteCluster
1,321✔
1823
        solicited := c.leaf.remote != nil
1,321✔
1824
        c.mu.Unlock()
1,321✔
1825

1,321✔
1826
        var old *client
1,321✔
1827
        s.mu.Lock()
1,321✔
1828
        // We check for empty because in some test we may send empty CONNECT{}
1,321✔
1829
        if checkForDup && srvName != _EMPTY_ {
1,963✔
1830
                for _, ol := range s.leafs {
1,029✔
1831
                        ol.mu.Lock()
387✔
1832
                        // We care here only about non solicited Leafnode. This function
387✔
1833
                        // is more about replacing stale connections than detecting loops.
387✔
1834
                        // We have code for the loop detection elsewhere, which also delays
387✔
1835
                        // attempt to reconnect.
387✔
1836
                        if !ol.isSolicitedLeafNode() && ol.leaf.remoteServer == srvName &&
387✔
1837
                                ol.leaf.remoteCluster == clusterName && ol.acc.Name == accName &&
387✔
1838
                                remoteAccName != _EMPTY_ && ol.leaf.remoteAccName == remoteAccName {
389✔
1839
                                old = ol
2✔
1840
                        }
2✔
1841
                        ol.mu.Unlock()
387✔
1842
                        if old != nil {
389✔
1843
                                break
2✔
1844
                        }
1845
                }
1846
        }
1847
        // Store new connection in the map
1848
        s.leafs[cid] = c
1,321✔
1849
        s.mu.Unlock()
1,321✔
1850
        s.removeFromTempClients(cid)
1,321✔
1851

1,321✔
1852
        // If applicable, evict the old one.
1,321✔
1853
        if old != nil {
1,323✔
1854
                old.sendErrAndErr(DuplicateRemoteLeafnodeConnection.String())
2✔
1855
                old.closeConnection(DuplicateRemoteLeafnodeConnection)
2✔
1856
                c.Warnf("Replacing connection from same server")
2✔
1857
        }
2✔
1858

1859
        srvDecorated := func() string {
1,530✔
1860
                if myClustName == _EMPTY_ {
232✔
1861
                        return mySrvName
23✔
1862
                }
23✔
1863
                return fmt.Sprintf("%s/%s", mySrvName, myClustName)
186✔
1864
        }
1865

1866
        opts := s.getOpts()
1,321✔
1867
        sysAcc := s.SystemAccount()
1,321✔
1868
        js := s.getJetStream()
1,321✔
1869
        var meta *raft
1,321✔
1870
        if js != nil {
1,863✔
1871
                if mg := js.getMetaGroup(); mg != nil {
967✔
1872
                        meta = mg.(*raft)
425✔
1873
                }
425✔
1874
        }
1875
        blockMappingOutgoing := false
1,321✔
1876
        // Deny (non domain) JetStream API traffic unless system account is shared
1,321✔
1877
        // and domain names are identical and extending is not disabled
1,321✔
1878

1,321✔
1879
        // Check if backwards compatibility has been enabled and needs to be acted on
1,321✔
1880
        forceSysAccDeny := false
1,321✔
1881
        if len(opts.JsAccDefaultDomain) > 0 {
1,358✔
1882
                if acc == sysAcc {
48✔
1883
                        for _, d := range opts.JsAccDefaultDomain {
22✔
1884
                                if d == _EMPTY_ {
19✔
1885
                                        // Extending JetStream via leaf node is mutually exclusive with a domain mapping to the empty/default domain.
8✔
1886
                                        // As soon as one mapping to "" is found, disable the ability to extend JS via a leaf node.
8✔
1887
                                        c.Noticef("Not extending remote JetStream domain %q due to presence of empty default domain", myRemoteDomain)
8✔
1888
                                        forceSysAccDeny = true
8✔
1889
                                        break
8✔
1890
                                }
1891
                        }
1892
                } else if domain, ok := opts.JsAccDefaultDomain[accName]; ok && domain == _EMPTY_ {
41✔
1893
                        // for backwards compatibility with old setups that do not have a domain name set
15✔
1894
                        c.Debugf("Skipping deny %q for account %q due to default domain", jsAllAPI, accName)
15✔
1895
                        return
15✔
1896
                }
15✔
1897
        }
1898

1899
        // If the server has JS disabled, it may still be part of a JetStream that could be extended.
1900
        // This is either signaled by js being disabled and a domain set,
1901
        // or in cases where no domain name exists, an extension hint is set.
1902
        // However, this is only relevant in mixed setups.
1903
        //
1904
        // If the system account connects but default domains are present, JetStream can't be extended.
1905
        if opts.JetStreamDomain != myRemoteDomain || (!opts.JetStream && (opts.JetStreamDomain == _EMPTY_ && opts.JetStreamExtHint != jsWillExtend)) ||
1,306✔
1906
                sysAcc == nil || acc == nil || forceSysAccDeny {
2,454✔
1907
                // If domain names mismatch always deny. This applies to system accounts as well as non system accounts.
1,148✔
1908
                // Not having a system account, account or JetStream disabled is considered a mismatch as well.
1,148✔
1909
                if acc != nil && acc == sysAcc {
1,285✔
1910
                        c.Noticef("System account connected from %s", srvDecorated())
137✔
1911
                        c.Noticef("JetStream not extended, domains differ")
137✔
1912
                        c.mergeDenyPermissionsLocked(both, denyAllJs)
137✔
1913
                        // When a remote with a system account is present in a server, unless otherwise disabled, the server will be
137✔
1914
                        // started in observer mode. Now that it is clear that this not used, turn the observer mode off.
137✔
1915
                        if solicited && meta != nil && meta.IsObserver() {
166✔
1916
                                meta.setObserver(false, extNotExtended)
29✔
1917
                                c.Debugf("Turning JetStream metadata controller Observer Mode off")
29✔
1918
                                // Take note that the domain was not extended to avoid this state from startup.
29✔
1919
                                writePeerState(js.config.StoreDir, meta.currentPeerState())
29✔
1920
                                // Meta controller can't be leader yet.
29✔
1921
                                // Yet it is possible that due to observer mode every server already stopped campaigning.
29✔
1922
                                // Therefore this server needs to be kicked into campaigning gear explicitly.
29✔
1923
                                meta.Campaign()
29✔
1924
                        }
29✔
1925
                } else {
1,011✔
1926
                        c.Noticef("JetStream using domains: local %q, remote %q", opts.JetStreamDomain, myRemoteDomain)
1,011✔
1927
                        c.mergeDenyPermissionsLocked(both, denyAllClientJs)
1,011✔
1928
                }
1,011✔
1929
                blockMappingOutgoing = true
1,148✔
1930
        } else if acc == sysAcc {
230✔
1931
                // system account and same domain
72✔
1932
                s.sys.client.Noticef("Extending JetStream domain %q as System Account connected from server %s",
72✔
1933
                        myRemoteDomain, srvDecorated())
72✔
1934
                // In an extension use case, pin leadership to server remotes connect to.
72✔
1935
                // Therefore, server with a remote that are not already in observer mode, need to be put into it.
72✔
1936
                if solicited && meta != nil && !meta.IsObserver() {
76✔
1937
                        meta.setObserver(true, extExtended)
4✔
1938
                        c.Debugf("Turning JetStream metadata controller Observer Mode on - System Account Connected")
4✔
1939
                        // Take note that the domain was not extended to avoid this state next startup.
4✔
1940
                        writePeerState(js.config.StoreDir, meta.currentPeerState())
4✔
1941
                        // If this server is the leader already, step down so a new leader can be elected (that is not an observer)
4✔
1942
                        meta.StepDown()
4✔
1943
                }
4✔
1944
        } else {
86✔
1945
                // This deny is needed in all cases (system account shared or not)
86✔
1946
                // If the system account is shared, jsAllAPI traffic will go through the system account.
86✔
1947
                // So in order to prevent duplicate delivery (from system and actual account) suppress it on the account.
86✔
1948
                // If the system account is NOT shared, jsAllAPI traffic has no business
86✔
1949
                c.Debugf("Adding deny %+v for account %q", denyAllClientJs, accName)
86✔
1950
                c.mergeDenyPermissionsLocked(both, denyAllClientJs)
86✔
1951
        }
86✔
1952
        // If we have a specified JetStream domain we will want to add a mapping to
1953
        // allow access cross domain for each non-system account.
1954
        if opts.JetStreamDomain != _EMPTY_ && opts.JetStream && acc != nil && acc != sysAcc {
1,558✔
1955
                for src, dest := range generateJSMappingTable(opts.JetStreamDomain) {
2,520✔
1956
                        if err := acc.AddMapping(src, dest); err != nil {
2,268✔
1957
                                c.Debugf("Error adding JetStream domain mapping: %s", err.Error())
×
1958
                        } else {
2,268✔
1959
                                c.Debugf("Adding JetStream Domain Mapping %q -> %s to account %q", src, dest, accName)
2,268✔
1960
                        }
2,268✔
1961
                }
1962
                if blockMappingOutgoing {
473✔
1963
                        src := fmt.Sprintf(jsDomainAPI, opts.JetStreamDomain)
221✔
1964
                        // make sure that messages intended for this domain, do not leave the cluster via this leaf node connection
221✔
1965
                        // This is a guard against a miss-config with two identical domain names and will only cover some forms
221✔
1966
                        // of this issue, not all of them.
221✔
1967
                        // This guards against a hub and a spoke having the same domain name.
221✔
1968
                        // But not two spokes having the same one and the request coming from the hub.
221✔
1969
                        c.mergeDenyPermissionsLocked(pub, []string{src})
221✔
1970
                        c.Debugf("Adding deny %q for outgoing messages to account %q", src, accName)
221✔
1971
                }
221✔
1972
        }
1973
}
1974

1975
func (s *Server) removeLeafNodeConnection(c *client) {
1,683✔
1976
        c.mu.Lock()
1,683✔
1977
        cid := c.cid
1,683✔
1978
        if c.leaf != nil {
3,366✔
1979
                if c.leaf.tsubt != nil {
2,891✔
1980
                        c.leaf.tsubt.Stop()
1,208✔
1981
                        c.leaf.tsubt = nil
1,208✔
1982
                }
1,208✔
1983
                if c.leaf.gwSub != nil {
2,323✔
1984
                        s.gwLeafSubs.Remove(c.leaf.gwSub)
640✔
1985
                        // We need to set this to nil for GC to release the connection
640✔
1986
                        c.leaf.gwSub = nil
640✔
1987
                }
640✔
1988
        }
1989
        proxyKey := c.proxyKey
1,683✔
1990
        c.mu.Unlock()
1,683✔
1991
        s.mu.Lock()
1,683✔
1992
        delete(s.leafs, cid)
1,683✔
1993
        if proxyKey != _EMPTY_ {
1,687✔
1994
                s.removeProxiedConn(proxyKey, cid)
4✔
1995
        }
4✔
1996
        s.mu.Unlock()
1,683✔
1997
        s.removeFromTempClients(cid)
1,683✔
1998
}
1999

2000
// leafConnectInfo is the CONNECT protocol payload sent by a soliciting
// leafnode to the accepting side. The JSON tags define the wire format.
type leafConnectInfo struct {
	// Version of the soliciting server, checked against LeafNode.MinVersion.
	Version string `json:"version,omitempty"`

	// Authentication material. Not read by processLeafNodeConnect; presumably
	// consumed by the auth layer — confirm.
	Nkey  string `json:"nkey,omitempty"`
	JWT   string `json:"jwt,omitempty"`
	Sig   string `json:"sig,omitempty"`
	User  string `json:"user,omitempty"`
	Pass  string `json:"pass,omitempty"`
	Token string `json:"auth_token,omitempty"`

	// Identity of the soliciting server.
	ID     string `json:"server_id,omitempty"`
	Domain string `json:"domain,omitempty"`
	Name   string `json:"name,omitempty"`

	// When set, the soliciting side acts as a hub and the accepting side
	// takes the spoke role.
	Hub bool `json:"is_hub,omitempty"`

	// Cluster name of the soliciting side (may be empty).
	Cluster string `json:"cluster,omitempty"`

	// Whether the soliciting side supports headers.
	Headers bool `json:"headers,omitempty"`

	// JetStream flag of the soliciting side (not read in processLeafNodeConnect).
	JetStream bool `json:"jetstream,omitempty"`

	// Publish deny permissions to merge into the connection's permissions.
	DenyPub []string `json:"deny_pub,omitempty"`

	// Set when the soliciting side requests isolation.
	Isolate bool `json:"isolate,omitempty"`

	// Compression mode. An older, never-used field was declared as
	// `Comp bool json:"compression,omitempty"`. Compression support needs a
	// string, so a different JSON tag is used here.
	Compression string `json:"compress_mode,omitempty"`

	// Only used to detect gateways wrongly connecting to the leafnode port.
	Gateway string `json:"gateway,omitempty"`

	// Account the remote is binding to on the accept side.
	RemoteAccount string `json:"remote_account,omitempty"`

	// Protocol version. Unlike ROUTER and GATEWAY, the accept side of a LEAF
	// connection receives only the CONNECT protocol and no INFO, so the
	// protocol version travels in the CONNECT. It indicates whether the
	// connection supports some features, such as message tracing. The
	// `protocol` JSON tag means the low-level CONNECT processing unmarshals it
	// automatically.
	Proto int `json:"protocol,omitempty"`
}
2039

2040
// processLeafNodeConnect will process the inbound connect args.
2041
// Once we are here we are bound to an account, so can send any interest that
2042
// we would have to the other side.
2043
func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) error {
686✔
2044
        // Way to detect clients that incorrectly connect to the route listen
686✔
2045
        // port. Client provided "lang" in the CONNECT protocol while LEAFNODEs don't.
686✔
2046
        if lang != _EMPTY_ {
686✔
2047
                c.sendErrAndErr(ErrClientConnectedToLeafNodePort.Error())
×
2048
                c.closeConnection(WrongPort)
×
2049
                return ErrClientConnectedToLeafNodePort
×
2050
        }
×
2051

2052
        // Unmarshal as a leaf node connect protocol
2053
        proto := &leafConnectInfo{}
686✔
2054
        if err := json.Unmarshal(arg, proto); err != nil {
686✔
2055
                return err
×
2056
        }
×
2057

2058
        // Reject a cluster that contains spaces.
2059
        if proto.Cluster != _EMPTY_ && strings.Contains(proto.Cluster, " ") {
687✔
2060
                c.sendErrAndErr(ErrClusterNameHasSpaces.Error())
1✔
2061
                c.closeConnection(ProtocolViolation)
1✔
2062
                return ErrClusterNameHasSpaces
1✔
2063
        }
1✔
2064

2065
        // Check for cluster name collisions.
2066
        if cn := s.cachedClusterName(); cn != _EMPTY_ && proto.Cluster != _EMPTY_ && proto.Cluster == cn {
688✔
2067
                c.sendErrAndErr(ErrLeafNodeHasSameClusterName.Error())
3✔
2068
                c.closeConnection(ClusterNamesIdentical)
3✔
2069
                return ErrLeafNodeHasSameClusterName
3✔
2070
        }
3✔
2071

2072
        // Reject if this has Gateway which means that it would be from a gateway
2073
        // connection that incorrectly connects to the leafnode port.
2074
        if proto.Gateway != _EMPTY_ {
682✔
2075
                errTxt := fmt.Sprintf("Rejecting connection from gateway %q on the leafnode port", proto.Gateway)
×
2076
                c.Errorf(errTxt)
×
2077
                c.sendErr(errTxt)
×
2078
                c.closeConnection(WrongGateway)
×
2079
                return ErrWrongGateway
×
2080
        }
×
2081

2082
        if mv := s.getOpts().LeafNode.MinVersion; mv != _EMPTY_ {
684✔
2083
                major, minor, update, _ := versionComponents(mv)
2✔
2084
                if !versionAtLeast(proto.Version, major, minor, update) {
3✔
2085
                        // We are going to send back an INFO because otherwise recent
1✔
2086
                        // versions of the remote server would simply break the connection
1✔
2087
                        // after 2 seconds if not receiving it. Instead, we want the
1✔
2088
                        // other side to just "stall" until we finish waiting for the holding
1✔
2089
                        // period and close the connection below.
1✔
2090
                        s.sendPermsAndAccountInfo(c)
1✔
2091
                        c.sendErrAndErr(fmt.Sprintf("connection rejected since minimum version required is %q", mv))
1✔
2092
                        select {
1✔
2093
                        case <-c.srv.quitCh:
1✔
2094
                        case <-time.After(leafNodeWaitBeforeClose):
×
2095
                        }
2096
                        c.closeConnection(MinimumVersionRequired)
1✔
2097
                        return ErrMinimumVersionRequired
1✔
2098
                }
2099
        }
2100

2101
        // Check if this server supports headers.
2102
        supportHeaders := c.srv.supportsHeaders()
681✔
2103

681✔
2104
        c.mu.Lock()
681✔
2105
        // Leaf Nodes do not do echo or verbose or pedantic.
681✔
2106
        c.opts.Verbose = false
681✔
2107
        c.opts.Echo = false
681✔
2108
        c.opts.Pedantic = false
681✔
2109
        // This inbound connection will be marked as supporting headers if this server
681✔
2110
        // support headers and the remote has sent in the CONNECT protocol that it does
681✔
2111
        // support headers too.
681✔
2112
        c.headers = supportHeaders && proto.Headers
681✔
2113
        // If the compression level is still not set, set it based on what has been
681✔
2114
        // given to us in the CONNECT protocol.
681✔
2115
        if c.leaf.compression == _EMPTY_ {
810✔
2116
                // But if proto.Compression is _EMPTY_, set it to CompressionNotSupported
129✔
2117
                if proto.Compression == _EMPTY_ {
168✔
2118
                        c.leaf.compression = CompressionNotSupported
39✔
2119
                } else {
129✔
2120
                        c.leaf.compression = proto.Compression
90✔
2121
                }
90✔
2122
        }
2123

2124
        // Remember the remote server.
2125
        c.leaf.remoteServer = proto.Name
681✔
2126
        // Remember the remote account name
681✔
2127
        c.leaf.remoteAccName = proto.RemoteAccount
681✔
2128
        // Remember if the leafnode requested isolation.
681✔
2129
        c.leaf.isolated = c.leaf.isolated || proto.Isolate
681✔
2130

681✔
2131
        // If the other side has declared itself a hub, so we will take on the spoke role.
681✔
2132
        if proto.Hub {
697✔
2133
                c.leaf.isSpoke = true
16✔
2134
        }
16✔
2135

2136
        // The soliciting side is part of a cluster.
2137
        if proto.Cluster != _EMPTY_ {
1,209✔
2138
                c.leaf.remoteCluster = proto.Cluster
528✔
2139
        }
528✔
2140

2141
        c.leaf.remoteDomain = proto.Domain
681✔
2142

681✔
2143
        // When a leaf solicits a connection to a hub, the perms that it will use on the soliciting leafnode's
681✔
2144
        // behalf are correct for them, but inside the hub need to be reversed since data is flowing in the opposite direction.
681✔
2145
        if !c.isSolicitedLeafNode() && c.perms != nil {
698✔
2146
                sp, pp := c.perms.sub, c.perms.pub
17✔
2147
                c.perms.sub, c.perms.pub = pp, sp
17✔
2148
                if c.opts.Import != nil {
33✔
2149
                        c.darray = c.opts.Import.Deny
16✔
2150
                } else {
17✔
2151
                        c.darray = nil
1✔
2152
                }
1✔
2153
        }
2154

2155
        // Set the Ping timer
2156
        c.setFirstPingTimer()
681✔
2157

681✔
2158
        // If we received pub deny permissions from the other end, merge with existing ones.
681✔
2159
        c.mergeDenyPermissions(pub, proto.DenyPub)
681✔
2160

681✔
2161
        acc := c.acc
681✔
2162
        c.mu.Unlock()
681✔
2163

681✔
2164
        // Register the cluster, even if empty, as long as we are acting as a hub.
681✔
2165
        if !proto.Hub {
1,346✔
2166
                acc.registerLeafNodeCluster(proto.Cluster)
665✔
2167
        }
665✔
2168

2169
        // Add in the leafnode here since we passed through auth at this point.
2170
        s.addLeafNodeConnection(c, proto.Name, proto.Cluster, true)
681✔
2171

681✔
2172
        // If we have permissions bound to this leafnode we need to send then back to the
681✔
2173
        // origin server for local enforcement.
681✔
2174
        s.sendPermsAndAccountInfo(c)
681✔
2175

681✔
2176
        // Create and initialize the smap since we know our bound account now.
681✔
2177
        // This will send all registered subs too.
681✔
2178
        s.initLeafNodeSmapAndSendSubs(c)
681✔
2179

681✔
2180
        // Announce the account connect event for a leaf node.
681✔
2181
        // This will be a no-op as needed.
681✔
2182
        s.sendLeafNodeConnect(c.acc)
681✔
2183

681✔
2184
        // Check to see if we need to kick any internal source or mirror consumers.
681✔
2185
        // This will be a no-op if JetStream not enabled for this server or if the bound account
681✔
2186
        // does not have jetstream.
681✔
2187
        s.checkInternalSyncConsumers(acc)
681✔
2188

681✔
2189
        return nil
681✔
2190
}
2191

2192
// checkInternalSyncConsumers
2193
func (s *Server) checkInternalSyncConsumers(acc *Account) {
2,096✔
2194
        // Grab our js
2,096✔
2195
        js := s.getJetStream()
2,096✔
2196

2,096✔
2197
        // Only applicable if we have JS and the leafnode has JS as well.
2,096✔
2198
        // We check for remote JS outside.
2,096✔
2199
        if !js.isEnabled() || acc == nil {
3,297✔
2200
                return
1,201✔
2201
        }
1,201✔
2202

2203
        // We will check all streams in our local account. They must be a leader and
2204
        // be sourcing or mirroring. We will check the external config on the stream itself
2205
        // if this is cross domain, or if the remote domain is empty, meaning we might be
2206
        // extending the system across this leafnode connection and hence we would be extending
2207
        // our own domain.
2208
        jsa := js.lookupAccount(acc)
895✔
2209
        if jsa == nil {
1,246✔
2210
                return
351✔
2211
        }
351✔
2212

2213
        var streams []*stream
544✔
2214
        jsa.mu.RLock()
544✔
2215
        for _, mset := range jsa.streams {
599✔
2216
                mset.cfgMu.RLock()
55✔
2217
                // We need to have a mirror or source defined.
55✔
2218
                // We do not want to force another lock here to look for leader status,
55✔
2219
                // so collect and after we release jsa will make sure.
55✔
2220
                if mset.cfg.Mirror != nil || len(mset.cfg.Sources) > 0 {
67✔
2221
                        streams = append(streams, mset)
12✔
2222
                }
12✔
2223
                mset.cfgMu.RUnlock()
55✔
2224
        }
2225
        jsa.mu.RUnlock()
544✔
2226

544✔
2227
        // Now loop through all candidates and check if we are the leader and have NOT
544✔
2228
        // created the sync up consumer.
544✔
2229
        for _, mset := range streams {
556✔
2230
                mset.retryDisconnectedSyncConsumers()
12✔
2231
        }
12✔
2232
}
2233

2234
// Returns the remote cluster name. This is set only once so does not require a lock.
2235
func (c *client) remoteCluster() string {
182,338✔
2236
        if c.leaf == nil {
182,338✔
2237
                return _EMPTY_
×
2238
        }
×
2239
        return c.leaf.remoteCluster
182,338✔
2240
}
2241

2242
// Sends back an info block to the soliciting leafnode to let it know about
2243
// its permission settings for local enforcement.
2244
func (s *Server) sendPermsAndAccountInfo(c *client) {
682✔
2245
        // Copy
682✔
2246
        s.mu.Lock()
682✔
2247
        info := s.copyLeafNodeInfo()
682✔
2248
        s.mu.Unlock()
682✔
2249
        c.mu.Lock()
682✔
2250
        info.CID = c.cid
682✔
2251
        info.Import = c.opts.Import
682✔
2252
        info.Export = c.opts.Export
682✔
2253
        info.RemoteAccount = c.acc.Name
682✔
2254
        // s.SystemAccount() uses an atomic operation and does not get the server lock, so this is safe.
682✔
2255
        info.IsSystemAccount = c.acc == s.SystemAccount()
682✔
2256
        info.ConnectInfo = true
682✔
2257
        c.enqueueProto(generateInfoJSON(info))
682✔
2258
        c.mu.Unlock()
682✔
2259
}
682✔
2260

2261
// Snapshot the current subscriptions from the sublist into our smap which
2262
// we will keep updated from now on.
2263
// Also send the registered subscriptions.
2264
func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
1,321✔
2265
        acc := c.acc
1,321✔
2266
        if acc == nil {
1,321✔
2267
                c.Debugf("Leafnode does not have an account bound")
×
2268
                return
×
2269
        }
×
2270
        // Collect all account subs here.
2271
        _subs := [1024]*subscription{}
1,321✔
2272
        subs := _subs[:0]
1,321✔
2273
        ims := []string{}
1,321✔
2274

1,321✔
2275
        // Hold the client lock otherwise there can be a race and miss some subs.
1,321✔
2276
        c.mu.Lock()
1,321✔
2277
        defer c.mu.Unlock()
1,321✔
2278

1,321✔
2279
        acc.mu.RLock()
1,321✔
2280
        accName := acc.Name
1,321✔
2281
        accNTag := acc.nameTag
1,321✔
2282

1,321✔
2283
        // To make printing look better when no friendly name present.
1,321✔
2284
        if accNTag != _EMPTY_ {
1,333✔
2285
                accNTag = "/" + accNTag
12✔
2286
        }
12✔
2287

2288
        // If we are solicited we only send interest for local clients.
2289
        if c.isSpokeLeafNode() {
1,961✔
2290
                acc.sl.localSubs(&subs, true)
640✔
2291
        } else {
1,321✔
2292
                acc.sl.All(&subs)
681✔
2293
        }
681✔
2294

2295
        // Check if we have an existing service import reply.
2296
        siReply := copyBytes(acc.siReply)
1,321✔
2297

1,321✔
2298
        // Since leaf nodes only send on interest, if the bound
1,321✔
2299
        // account has import services we need to send those over.
1,321✔
2300
        for isubj := range acc.imports.services {
6,259✔
2301
                if c.isSpokeLeafNode() && !c.canSubscribe(isubj) {
5,229✔
2302
                        c.Debugf("Not permitted to import service %q on behalf of %s%s", isubj, accName, accNTag)
291✔
2303
                        continue
291✔
2304
                }
2305
                ims = append(ims, isubj)
4,647✔
2306
        }
2307
        // Likewise for mappings.
2308
        for _, m := range acc.mappings {
3,710✔
2309
                if c.isSpokeLeafNode() && !c.canSubscribe(m.src) {
2,435✔
2310
                        c.Debugf("Not permitted to import mapping %q on behalf of %s%s", m.src, accName, accNTag)
46✔
2311
                        continue
46✔
2312
                }
2313
                ims = append(ims, m.src)
2,343✔
2314
        }
2315

2316
        // Create a unique subject that will be used for loop detection.
2317
        lds := acc.lds
1,321✔
2318
        acc.mu.RUnlock()
1,321✔
2319

1,321✔
2320
        // Check if we have to create the LDS.
1,321✔
2321
        if lds == _EMPTY_ {
2,359✔
2322
                lds = leafNodeLoopDetectionSubjectPrefix + nuid.Next()
1,038✔
2323
                acc.mu.Lock()
1,038✔
2324
                acc.lds = lds
1,038✔
2325
                acc.mu.Unlock()
1,038✔
2326
        }
1,038✔
2327

2328
        // Now check for gateway interest. Leafnodes will put this into
2329
        // the proper mode to propagate, but they are not held in the account.
2330
        gwsa := [16]*client{}
1,321✔
2331
        gws := gwsa[:0]
1,321✔
2332
        s.getOutboundGatewayConnections(&gws)
1,321✔
2333
        for _, cgw := range gws {
1,404✔
2334
                cgw.mu.Lock()
83✔
2335
                gw := cgw.gw
83✔
2336
                cgw.mu.Unlock()
83✔
2337
                if gw != nil {
166✔
2338
                        if ei, _ := gw.outsim.Load(accName); ei != nil {
166✔
2339
                                if e := ei.(*outsie); e != nil && e.sl != nil {
166✔
2340
                                        e.sl.All(&subs)
83✔
2341
                                }
83✔
2342
                        }
2343
                }
2344
        }
2345

2346
        applyGlobalRouting := s.gateway.enabled
1,321✔
2347
        if c.isSpokeLeafNode() {
1,961✔
2348
                // Add a fake subscription for this solicited leafnode connection
640✔
2349
                // so that we can send back directly for mapped GW replies.
640✔
2350
                // We need to keep track of this subscription so it can be removed
640✔
2351
                // when the connection is closed so that the GC can release it.
640✔
2352
                c.leaf.gwSub = &subscription{client: c, subject: []byte(gwReplyPrefix + ">")}
640✔
2353
                c.srv.gwLeafSubs.Insert(c.leaf.gwSub)
640✔
2354
        }
640✔
2355

2356
        // Now walk the results and add them to our smap
2357
        rc := c.leaf.remoteCluster
1,321✔
2358
        c.leaf.smap = make(map[string]int32)
1,321✔
2359
        for _, sub := range subs {
39,139✔
2360
                // Check perms regardless of role.
37,818✔
2361
                if c.perms != nil && !c.canSubscribe(string(sub.subject)) {
40,196✔
2362
                        c.Debugf("Not permitted to subscribe to %q on behalf of %s%s", sub.subject, accName, accNTag)
2,378✔
2363
                        continue
2,378✔
2364
                }
2365
                // Don't advertise interest from leafnodes to other isolated leafnodes.
2366
                if sub.client.kind == LEAF && c.isIsolatedLeafNode() {
35,455✔
2367
                        continue
15✔
2368
                }
2369
                // We ignore ourselves here.
2370
                // Also don't add the subscription if it has a origin cluster and the
2371
                // cluster name matches the one of the client we are sending to.
2372
                if c != sub.client && (sub.origin == nil || (bytesToString(sub.origin) != rc)) {
65,510✔
2373
                        count := int32(1)
30,085✔
2374
                        if len(sub.queue) > 0 && sub.qw > 0 {
30,095✔
2375
                                count = sub.qw
10✔
2376
                        }
10✔
2377
                        c.leaf.smap[keyFromSub(sub)] += count
30,085✔
2378
                        if c.leaf.tsub == nil {
31,329✔
2379
                                c.leaf.tsub = make(map[*subscription]struct{})
1,244✔
2380
                        }
1,244✔
2381
                        c.leaf.tsub[sub] = struct{}{}
30,085✔
2382
                }
2383
        }
2384
        // FIXME(dlc) - We need to update appropriately on an account claims update.
2385
        for _, isubj := range ims {
8,311✔
2386
                c.leaf.smap[isubj]++
6,990✔
2387
        }
6,990✔
2388
        // If we have gateways enabled we need to make sure the other side sends us responses
2389
        // that have been augmented from the original subscription.
2390
        // TODO(dlc) - Should we lock this down more?
2391
        if applyGlobalRouting {
1,425✔
2392
                c.leaf.smap[oldGWReplyPrefix+"*.>"]++
104✔
2393
                c.leaf.smap[gwReplyPrefix+">"]++
104✔
2394
        }
104✔
2395
        // Detect loops by subscribing to a specific subject and checking
2396
        // if this sub is coming back to us.
2397
        c.leaf.smap[lds]++
1,321✔
2398

1,321✔
2399
        // Check if we need to add an existing siReply to our map.
1,321✔
2400
        // This will be a prefix so add on the wildcard.
1,321✔
2401
        if siReply != nil {
1,336✔
2402
                wcsub := append(siReply, '>')
15✔
2403
                c.leaf.smap[string(wcsub)]++
15✔
2404
        }
15✔
2405
        // Queue all protocols. There is no max pending limit for LN connection,
2406
        // so we don't need chunking. The writes will happen from the writeLoop.
2407
        var b bytes.Buffer
1,321✔
2408
        for key, n := range c.leaf.smap {
28,134✔
2409
                c.writeLeafSub(&b, key, n)
26,813✔
2410
        }
26,813✔
2411
        if b.Len() > 0 {
2,642✔
2412
                c.enqueueProto(b.Bytes())
1,321✔
2413
        }
1,321✔
2414
        if c.leaf.tsub != nil {
2,566✔
2415
                // Clear the tsub map after 5 seconds.
1,245✔
2416
                c.leaf.tsubt = time.AfterFunc(5*time.Second, func() {
1,282✔
2417
                        c.mu.Lock()
37✔
2418
                        if c.leaf != nil {
74✔
2419
                                c.leaf.tsub = nil
37✔
2420
                                c.leaf.tsubt = nil
37✔
2421
                        }
37✔
2422
                        c.mu.Unlock()
37✔
2423
                })
2424
        }
2425
}
2426

2427
// updateInterestForAccountOnGateway called from gateway code when processing RS+ and RS-.
2428
func (s *Server) updateInterestForAccountOnGateway(accName string, sub *subscription, delta int32) {
201,691✔
2429
        // Since we're in the gateway's readLoop, and we would otherwise block, don't allow fetching.
201,691✔
2430
        acc, err := s.lookupOrFetchAccount(accName, false)
201,691✔
2431
        if acc == nil || err != nil {
201,932✔
2432
                s.Debugf("No or bad account for %q, failed to update interest from gateway", accName)
241✔
2433
                return
241✔
2434
        }
241✔
2435
        acc.updateLeafNodes(sub, delta)
201,450✔
2436
}
2437

2438
// updateLeafNodesEx will make sure to update the account smap for the subscription.
2439
// Will also forward to all leaf nodes as needed.
2440
// If `hubOnly` is true, then will update only leaf nodes that connect to this server
2441
// (that is, for which this server acts as a hub to them).
2442
func (acc *Account) updateLeafNodesEx(sub *subscription, delta int32, hubOnly bool) {
2,474,570✔
2443
        if acc == nil || sub == nil {
2,474,570✔
2444
                return
×
2445
        }
×
2446

2447
        // We will do checks for no leafnodes and same cluster here inline and under the
2448
        // general account read lock.
2449
        // If we feel we need to update the leafnodes we will do that out of line to avoid
2450
        // blocking routes or GWs.
2451

2452
        acc.mu.RLock()
2,474,570✔
2453
        // First check if we even have leafnodes here.
2,474,570✔
2454
        if acc.nleafs == 0 {
4,879,598✔
2455
                acc.mu.RUnlock()
2,405,028✔
2456
                return
2,405,028✔
2457
        }
2,405,028✔
2458

2459
        // Is this a loop detection subject.
2460
        isLDS := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))
69,542✔
2461

69,542✔
2462
        // Capture the cluster even if its empty.
69,542✔
2463
        var cluster string
69,542✔
2464
        if sub.origin != nil {
119,626✔
2465
                cluster = bytesToString(sub.origin)
50,084✔
2466
        }
50,084✔
2467

2468
        // If we have an isolated cluster we can return early, as long as it is not a loop detection subject.
2469
        // Empty clusters will return false for the check.
2470
        if !isLDS && acc.isLeafNodeClusterIsolated(cluster) {
91,862✔
2471
                acc.mu.RUnlock()
22,320✔
2472
                return
22,320✔
2473
        }
22,320✔
2474

2475
        // We can release the general account lock.
2476
        acc.mu.RUnlock()
47,222✔
2477

47,222✔
2478
        // We can hold the list lock here to avoid having to copy a large slice.
47,222✔
2479
        acc.lmu.RLock()
47,222✔
2480
        defer acc.lmu.RUnlock()
47,222✔
2481

47,222✔
2482
        // Do this once.
47,222✔
2483
        subject := string(sub.subject)
47,222✔
2484

47,222✔
2485
        // Walk the connected leafnodes.
47,222✔
2486
        for _, ln := range acc.lleafs {
105,954✔
2487
                if ln == sub.client {
89,180✔
2488
                        continue
30,448✔
2489
                }
2490
                ln.mu.Lock()
28,284✔
2491
                // Don't advertise interest from leafnodes to other isolated leafnodes.
28,284✔
2492
                if sub.client.kind == LEAF && ln.isIsolatedLeafNode() {
28,316✔
2493
                        ln.mu.Unlock()
32✔
2494
                        continue
32✔
2495
                }
2496
                // If `hubOnly` is true, it means that we want to update only leafnodes
2497
                // that connect to this server (so isHubLeafNode() would return `true`).
2498
                if hubOnly && !ln.isHubLeafNode() {
28,258✔
2499
                        ln.mu.Unlock()
6✔
2500
                        continue
6✔
2501
                }
2502
                // Check to make sure this sub does not have an origin cluster that matches the leafnode.
2503
                // If skipped, make sure that we still let go the "$LDS." subscription that allows
2504
                // the detection of loops as long as different cluster.
2505
                clusterDifferent := cluster != ln.remoteCluster()
28,246✔
2506
                if (isLDS && clusterDifferent) || ((cluster == _EMPTY_ || clusterDifferent) && (delta <= 0 || ln.canSubscribe(subject))) {
52,264✔
2507
                        ln.updateSmap(sub, delta, isLDS)
24,018✔
2508
                }
24,018✔
2509
                ln.mu.Unlock()
28,246✔
2510
        }
2511
}
2512

2513
// updateLeafNodes will make sure to update the account smap for the subscription.
2514
// Will also forward to all leaf nodes as needed.
2515
func (acc *Account) updateLeafNodes(sub *subscription, delta int32) {
2,474,547✔
2516
        acc.updateLeafNodesEx(sub, delta, false)
2,474,547✔
2517
}
2,474,547✔
2518

2519
// This will make an update to our internal smap and determine if we should send out
2520
// an interest update to the remote side.
2521
// Lock should be held.
2522
func (c *client) updateSmap(sub *subscription, delta int32, isLDS bool) {
24,018✔
2523
        if c.leaf.smap == nil {
24,042✔
2524
                return
24✔
2525
        }
24✔
2526

2527
        // If we are solicited make sure this is a local client or a non-solicited leaf node
2528
        skind := sub.client.kind
23,994✔
2529
        updateClient := skind == CLIENT || skind == SYSTEM || skind == JETSTREAM || skind == ACCOUNT
23,994✔
2530
        if !isLDS && c.isSpokeLeafNode() && !(updateClient || (skind == LEAF && !sub.client.isSpokeLeafNode())) {
32,346✔
2531
                return
8,352✔
2532
        }
8,352✔
2533

2534
        // For additions, check if that sub has just been processed during initLeafNodeSmapAndSendSubs
2535
        if delta > 0 && c.leaf.tsub != nil {
23,223✔
2536
                if _, present := c.leaf.tsub[sub]; present {
7,581✔
2537
                        delete(c.leaf.tsub, sub)
×
2538
                        if len(c.leaf.tsub) == 0 {
×
2539
                                c.leaf.tsub = nil
×
2540
                                c.leaf.tsubt.Stop()
×
2541
                                c.leaf.tsubt = nil
×
2542
                        }
×
2543
                        return
×
2544
                }
2545
        }
2546

2547
        key := keyFromSub(sub)
15,642✔
2548
        n, ok := c.leaf.smap[key]
15,642✔
2549
        if delta < 0 && !ok {
16,687✔
2550
                return
1,045✔
2551
        }
1,045✔
2552

2553
        // We will update if its a queue, if count is zero (or negative), or we were 0 and are N > 0.
2554
        update := sub.queue != nil || (n <= 0 && n+delta > 0) || (n > 0 && n+delta <= 0)
14,597✔
2555
        n += delta
14,597✔
2556
        if n > 0 {
25,439✔
2557
                c.leaf.smap[key] = n
10,842✔
2558
        } else {
14,597✔
2559
                delete(c.leaf.smap, key)
3,755✔
2560
        }
3,755✔
2561
        if update {
24,559✔
2562
                c.sendLeafNodeSubUpdate(key, n)
9,962✔
2563
        }
9,962✔
2564
}
2565

2566
// Used to force add subjects to the subject map.
2567
func (c *client) forceAddToSmap(subj string) {
13✔
2568
        c.mu.Lock()
13✔
2569
        defer c.mu.Unlock()
13✔
2570

13✔
2571
        if c.leaf.smap == nil {
13✔
2572
                return
×
2573
        }
×
2574
        n := c.leaf.smap[subj]
13✔
2575
        if n != 0 {
14✔
2576
                return
1✔
2577
        }
1✔
2578
        // Place into the map since it was not there.
2579
        c.leaf.smap[subj] = 1
12✔
2580
        c.sendLeafNodeSubUpdate(subj, 1)
12✔
2581
}
2582

2583
// Used to force remove a subject from the subject map.
2584
func (c *client) forceRemoveFromSmap(subj string) {
1✔
2585
        c.mu.Lock()
1✔
2586
        defer c.mu.Unlock()
1✔
2587

1✔
2588
        if c.leaf.smap == nil {
1✔
2589
                return
×
2590
        }
×
2591
        n := c.leaf.smap[subj]
1✔
2592
        if n == 0 {
1✔
2593
                return
×
2594
        }
×
2595
        n--
1✔
2596
        if n == 0 {
2✔
2597
                // Remove is now zero
1✔
2598
                delete(c.leaf.smap, subj)
1✔
2599
                c.sendLeafNodeSubUpdate(subj, 0)
1✔
2600
        } else {
1✔
2601
                c.leaf.smap[subj] = n
×
2602
        }
×
2603
}
2604

2605
// Send the subscription interest change to the other side.
2606
// Lock should be held.
2607
func (c *client) sendLeafNodeSubUpdate(key string, n int32) {
9,975✔
2608
        // If we are a spoke, we need to check if we are allowed to send this subscription over to the hub.
9,975✔
2609
        if c.isSpokeLeafNode() {
12,428✔
2610
                checkPerms := true
2,453✔
2611
                if len(key) > 0 && (key[0] == '$' || key[0] == '_') {
3,919✔
2612
                        if strings.HasPrefix(key, leafNodeLoopDetectionSubjectPrefix) ||
1,466✔
2613
                                strings.HasPrefix(key, oldGWReplyPrefix) ||
1,466✔
2614
                                strings.HasPrefix(key, gwReplyPrefix) {
1,558✔
2615
                                checkPerms = false
92✔
2616
                        }
92✔
2617
                }
2618
                if checkPerms {
4,814✔
2619
                        var subject string
2,361✔
2620
                        if sep := strings.IndexByte(key, ' '); sep != -1 {
2,853✔
2621
                                subject = key[:sep]
492✔
2622
                        } else {
2,361✔
2623
                                subject = key
1,869✔
2624
                        }
1,869✔
2625
                        if !c.canSubscribe(subject) {
2,370✔
2626
                                return
9✔
2627
                        }
9✔
2628
                }
2629
        }
2630
        // If we are here we can send over to the other side.
2631
        _b := [64]byte{}
9,966✔
2632
        b := bytes.NewBuffer(_b[:0])
9,966✔
2633
        c.writeLeafSub(b, key, n)
9,966✔
2634
        c.enqueueProto(b.Bytes())
9,966✔
2635
}
2636

2637
// Helper function to build the key.
2638
func keyFromSub(sub *subscription) string {
46,757✔
2639
        var sb strings.Builder
46,757✔
2640
        sb.Grow(len(sub.subject) + len(sub.queue) + 1)
46,757✔
2641
        sb.Write(sub.subject)
46,757✔
2642
        if sub.queue != nil {
50,555✔
2643
                // Just make the key subject spc group, e.g. 'foo bar'
3,798✔
2644
                sb.WriteByte(' ')
3,798✔
2645
                sb.Write(sub.queue)
3,798✔
2646
        }
3,798✔
2647
        return sb.String()
46,757✔
2648
}
2649

2650
// Prefixes that keyFromSubWithOrigin uses to tell plain routed subscriptions
// apart from routed subscriptions made on behalf of a leafnode.
const (
	// Plain routed subscription.
	keyRoutedSub     = "R"
	keyRoutedSubByte = 'R'

	// Routed subscription on behalf of a leafnode.
	keyRoutedLeafSub     = "L"
	keyRoutedLeafSubByte = 'L'
)
2656

2657
// Helper function to build the key that prevents collisions between normal
2658
// routed subscriptions and routed subscriptions on behalf of a leafnode.
2659
// Keys will look like this:
2660
// "R foo"          -> plain routed sub on "foo"
2661
// "R foo bar"      -> queue routed sub on "foo", queue "bar"
2662
// "L foo bar"      -> plain routed leaf sub on "foo", leaf "bar"
2663
// "L foo bar baz"  -> queue routed sub on "foo", queue "bar", leaf "baz"
2664
func keyFromSubWithOrigin(sub *subscription) string {
694,640✔
2665
        var sb strings.Builder
694,640✔
2666
        sb.Grow(2 + len(sub.origin) + 1 + len(sub.subject) + 1 + len(sub.queue))
694,640✔
2667
        leaf := len(sub.origin) > 0
694,640✔
2668
        if leaf {
711,253✔
2669
                sb.WriteByte(keyRoutedLeafSubByte)
16,613✔
2670
        } else {
694,640✔
2671
                sb.WriteByte(keyRoutedSubByte)
678,027✔
2672
        }
678,027✔
2673
        sb.WriteByte(' ')
694,640✔
2674
        sb.Write(sub.subject)
694,640✔
2675
        if sub.queue != nil {
720,568✔
2676
                sb.WriteByte(' ')
25,928✔
2677
                sb.Write(sub.queue)
25,928✔
2678
        }
25,928✔
2679
        if leaf {
711,253✔
2680
                sb.WriteByte(' ')
16,613✔
2681
                sb.Write(sub.origin)
16,613✔
2682
        }
16,613✔
2683
        return sb.String()
694,640✔
2684
}
2685

2686
// Lock should be held.
2687
func (c *client) writeLeafSub(w *bytes.Buffer, key string, n int32) {
36,779✔
2688
        if key == _EMPTY_ {
36,779✔
2689
                return
×
2690
        }
×
2691
        if n > 0 {
69,802✔
2692
                w.WriteString("LS+ " + key)
33,023✔
2693
                // Check for queue semantics, if found write n.
33,023✔
2694
                if strings.Contains(key, " ") {
35,318✔
2695
                        w.WriteString(" ")
2,295✔
2696
                        var b [12]byte
2,295✔
2697
                        var i = len(b)
2,295✔
2698
                        for l := n; l > 0; l /= 10 {
5,484✔
2699
                                i--
3,189✔
2700
                                b[i] = digits[l%10]
3,189✔
2701
                        }
3,189✔
2702
                        w.Write(b[i:])
2,295✔
2703
                        if c.trace {
2,295✔
2704
                                arg := fmt.Sprintf("%s %d", key, n)
×
2705
                                c.traceOutOp("LS+", []byte(arg))
×
2706
                        }
×
2707
                } else if c.trace {
30,919✔
2708
                        c.traceOutOp("LS+", []byte(key))
191✔
2709
                }
191✔
2710
        } else {
3,756✔
2711
                w.WriteString("LS- " + key)
3,756✔
2712
                if c.trace {
3,771✔
2713
                        c.traceOutOp("LS-", []byte(key))
15✔
2714
                }
15✔
2715
        }
2716
        w.WriteString(CR_LF)
36,779✔
2717
}
2718

2719
// processLeafSub will process an inbound sub request for the remote leaf node.
2720
func (c *client) processLeafSub(argo []byte) (err error) {
32,714✔
2721
        // Indicate activity.
32,714✔
2722
        c.in.subs++
32,714✔
2723

32,714✔
2724
        srv := c.srv
32,714✔
2725
        if srv == nil {
32,714✔
2726
                return nil
×
2727
        }
×
2728

2729
        // Copy so we do not reference a potentially large buffer
2730
        arg := make([]byte, len(argo))
32,714✔
2731
        copy(arg, argo)
32,714✔
2732

32,714✔
2733
        args := splitArg(arg)
32,714✔
2734
        sub := &subscription{client: c}
32,714✔
2735

32,714✔
2736
        delta := int32(1)
32,714✔
2737
        switch len(args) {
32,714✔
2738
        case 1:
30,470✔
2739
                sub.queue = nil
30,470✔
2740
        case 3:
2,244✔
2741
                sub.queue = args[1]
2,244✔
2742
                sub.qw = int32(parseSize(args[2]))
2,244✔
2743
                // TODO: (ik) We should have a non empty queue name and a queue
2,244✔
2744
                // weight >= 1. For 2.11, we may want to return an error if that
2,244✔
2745
                // is not the case, but for now just overwrite `delta` if queue
2,244✔
2746
                // weight is greater than 1 (it is possible after a reconnect/
2,244✔
2747
                // server restart to receive a queue weight > 1 for a new sub).
2,244✔
2748
                if sub.qw > 1 {
3,877✔
2749
                        delta = sub.qw
1,633✔
2750
                }
1,633✔
2751
        default:
×
2752
                return fmt.Errorf("processLeafSub Parse Error: '%s'", arg)
×
2753
        }
2754
        sub.subject = args[0]
32,714✔
2755

32,714✔
2756
        c.mu.Lock()
32,714✔
2757
        if c.isClosed() {
32,739✔
2758
                c.mu.Unlock()
25✔
2759
                return nil
25✔
2760
        }
25✔
2761

2762
        acc := c.acc
32,689✔
2763
        // Check if we have a loop.
32,689✔
2764
        ldsPrefix := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))
32,689✔
2765

32,689✔
2766
        if ldsPrefix && bytesToString(sub.subject) == acc.getLDSubject() {
32,694✔
2767
                c.mu.Unlock()
5✔
2768
                c.handleLeafNodeLoop(true)
5✔
2769
                return nil
5✔
2770
        }
5✔
2771

2772
        // Check permissions if applicable. (but exclude the $LDS, $GR and _GR_)
2773
        checkPerms := true
32,684✔
2774
        if sub.subject[0] == '$' || sub.subject[0] == '_' {
62,429✔
2775
                if ldsPrefix ||
29,745✔
2776
                        bytes.HasPrefix(sub.subject, []byte(oldGWReplyPrefix)) ||
29,745✔
2777
                        bytes.HasPrefix(sub.subject, []byte(gwReplyPrefix)) {
31,777✔
2778
                        checkPerms = false
2,032✔
2779
                }
2,032✔
2780
        }
2781

2782
        // If we are a hub check that we can publish to this subject.
2783
        if checkPerms {
63,336✔
2784
                subj := string(sub.subject)
30,652✔
2785
                if subjectIsLiteral(subj) && !c.pubAllowedFullCheck(subj, true, true) {
30,966✔
2786
                        c.mu.Unlock()
314✔
2787
                        c.leafSubPermViolation(sub.subject)
314✔
2788
                        c.Debugf(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject))
314✔
2789
                        return nil
314✔
2790
                }
314✔
2791
        }
2792

2793
        // Check if we have a maximum on the number of subscriptions.
2794
        if c.subsAtLimit() {
32,378✔
2795
                c.mu.Unlock()
8✔
2796
                c.maxSubsExceeded()
8✔
2797
                return nil
8✔
2798
        }
8✔
2799

2800
        // If we have an origin cluster associated mark that in the sub.
2801
        if rc := c.remoteCluster(); rc != _EMPTY_ {
61,125✔
2802
                sub.origin = []byte(rc)
28,763✔
2803
        }
28,763✔
2804

2805
        // Like Routes, we store local subs by account and subject and optionally queue name.
2806
        // If we have a queue it will have a trailing weight which we do not want.
2807
        if sub.queue != nil {
34,321✔
2808
                sub.sid = arg[:len(arg)-len(args[2])-1]
1,959✔
2809
        } else {
32,362✔
2810
                sub.sid = arg
30,403✔
2811
        }
30,403✔
2812
        key := bytesToString(sub.sid)
32,362✔
2813
        osub := c.subs[key]
32,362✔
2814
        if osub == nil {
63,222✔
2815
                c.subs[key] = sub
30,860✔
2816
                // Now place into the account sl.
30,860✔
2817
                if err := acc.sl.Insert(sub); err != nil {
30,860✔
2818
                        delete(c.subs, key)
×
2819
                        c.mu.Unlock()
×
2820
                        c.Errorf("Could not insert subscription: %v", err)
×
2821
                        c.sendErr("Invalid Subscription")
×
2822
                        return nil
×
2823
                }
×
2824
        } else if sub.queue != nil {
3,003✔
2825
                // For a queue we need to update the weight.
1,501✔
2826
                delta = sub.qw - atomic.LoadInt32(&osub.qw)
1,501✔
2827
                atomic.StoreInt32(&osub.qw, sub.qw)
1,501✔
2828
                acc.sl.UpdateRemoteQSub(osub)
1,501✔
2829
        }
1,501✔
2830
        spoke := c.isSpokeLeafNode()
32,362✔
2831
        c.mu.Unlock()
32,362✔
2832

32,362✔
2833
        // Only add in shadow subs if a new sub or qsub.
32,362✔
2834
        if osub == nil {
63,222✔
2835
                if err := c.addShadowSubscriptions(acc, sub); err != nil {
30,860✔
2836
                        c.Errorf(err.Error())
×
2837
                }
×
2838
        }
2839

2840
        // If we are not solicited, treat leaf node subscriptions similar to a
2841
        // client subscription, meaning we forward them to routes, gateways and
2842
        // other leaf nodes as needed.
2843
        if !spoke {
43,722✔
2844
                // If we are routing add to the route map for the associated account.
11,360✔
2845
                srv.updateRouteSubscriptionMap(acc, sub, delta)
11,360✔
2846
                if srv.gateway.enabled {
12,892✔
2847
                        srv.gatewayUpdateSubInterest(acc.Name, sub, delta)
1,532✔
2848
                }
1,532✔
2849
        }
2850
        // Now check on leafnode updates for other leaf nodes. We understand solicited
2851
        // and non-solicited state in this call so we will do the right thing.
2852
        acc.updateLeafNodes(sub, delta)
32,362✔
2853

32,362✔
2854
        return nil
32,362✔
2855
}
2856

2857
// If the leafnode is a solicited, set the connect delay based on default
2858
// or private option (for tests). Sends the error to the other side, log and
2859
// close the connection.
2860
func (c *client) handleLeafNodeLoop(sendErr bool) {
14✔
2861
        accName, delay := c.setLeafConnectDelayIfSoliciting(leafNodeReconnectDelayAfterLoopDetected)
14✔
2862
        errTxt := fmt.Sprintf("Loop detected for leafnode account=%q. Delaying attempt to reconnect for %v", accName, delay)
14✔
2863
        if sendErr {
21✔
2864
                c.sendErr(errTxt)
7✔
2865
        }
7✔
2866

2867
        c.Errorf(errTxt)
14✔
2868
        // If we are here with "sendErr" false, it means that this is the server
14✔
2869
        // that received the error. The other side will have closed the connection,
14✔
2870
        // but does not hurt to close here too.
14✔
2871
        c.closeConnection(ProtocolViolation)
14✔
2872
}
2873

2874
// processLeafUnsub will process an inbound unsub request for the remote leaf node.
2875
func (c *client) processLeafUnsub(arg []byte) error {
3,474✔
2876
        // Indicate any activity, so pub and sub or unsubs.
3,474✔
2877
        c.in.subs++
3,474✔
2878

3,474✔
2879
        acc := c.acc
3,474✔
2880
        srv := c.srv
3,474✔
2881

3,474✔
2882
        c.mu.Lock()
3,474✔
2883
        if c.isClosed() {
3,507✔
2884
                c.mu.Unlock()
33✔
2885
                return nil
33✔
2886
        }
33✔
2887

2888
        spoke := c.isSpokeLeafNode()
3,441✔
2889
        // We store local subs by account and subject and optionally queue name.
3,441✔
2890
        // LS- will have the arg exactly as the key.
3,441✔
2891
        sub, ok := c.subs[string(arg)]
3,441✔
2892
        if !ok {
3,451✔
2893
                // If not found, don't try to update routes/gws/leaf nodes.
10✔
2894
                c.mu.Unlock()
10✔
2895
                return nil
10✔
2896
        }
10✔
2897
        delta := int32(1)
3,431✔
2898
        if len(sub.queue) > 0 {
3,849✔
2899
                delta = sub.qw
418✔
2900
        }
418✔
2901
        c.mu.Unlock()
3,431✔
2902

3,431✔
2903
        c.unsubscribe(acc, sub, true, true)
3,431✔
2904
        if !spoke {
4,465✔
2905
                // If we are routing subtract from the route map for the associated account.
1,034✔
2906
                srv.updateRouteSubscriptionMap(acc, sub, -delta)
1,034✔
2907
                // Gateways
1,034✔
2908
                if srv.gateway.enabled {
1,338✔
2909
                        srv.gatewayUpdateSubInterest(acc.Name, sub, -delta)
304✔
2910
                }
304✔
2911
        }
2912
        // Now check on leafnode updates for other leaf nodes.
2913
        acc.updateLeafNodes(sub, -delta)
3,431✔
2914
        return nil
3,431✔
2915
}
2916

2917
func (c *client) processLeafHeaderMsgArgs(arg []byte) error {
505✔
2918
        // Unroll splitArgs to avoid runtime/heap issues
505✔
2919
        a := [MAX_MSG_ARGS][]byte{}
505✔
2920
        args := a[:0]
505✔
2921
        start := -1
505✔
2922
        for i, b := range arg {
33,672✔
2923
                switch b {
33,167✔
2924
                case ' ', '\t', '\r', '\n':
1,442✔
2925
                        if start >= 0 {
2,884✔
2926
                                args = append(args, arg[start:i])
1,442✔
2927
                                start = -1
1,442✔
2928
                        }
1,442✔
2929
                default:
31,725✔
2930
                        if start < 0 {
33,672✔
2931
                                start = i
1,947✔
2932
                        }
1,947✔
2933
                }
2934
        }
2935
        if start >= 0 {
1,010✔
2936
                args = append(args, arg[start:])
505✔
2937
        }
505✔
2938

2939
        c.pa.arg = arg
505✔
2940
        switch len(args) {
505✔
2941
        case 0, 1, 2:
×
2942
                return fmt.Errorf("processLeafHeaderMsgArgs Parse Error: '%s'", args)
×
2943
        case 3:
91✔
2944
                c.pa.reply = nil
91✔
2945
                c.pa.queues = nil
91✔
2946
                c.pa.hdb = args[1]
91✔
2947
                c.pa.hdr = parseSize(args[1])
91✔
2948
                c.pa.szb = args[2]
91✔
2949
                c.pa.size = parseSize(args[2])
91✔
2950
        case 4:
400✔
2951
                c.pa.reply = args[1]
400✔
2952
                c.pa.queues = nil
400✔
2953
                c.pa.hdb = args[2]
400✔
2954
                c.pa.hdr = parseSize(args[2])
400✔
2955
                c.pa.szb = args[3]
400✔
2956
                c.pa.size = parseSize(args[3])
400✔
2957
        default:
14✔
2958
                // args[1] is our reply indicator. Should be + or | normally.
14✔
2959
                if len(args[1]) != 1 {
14✔
2960
                        return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
2961
                }
×
2962
                switch args[1][0] {
14✔
2963
                case '+':
4✔
2964
                        c.pa.reply = args[2]
4✔
2965
                case '|':
10✔
2966
                        c.pa.reply = nil
10✔
2967
                default:
×
2968
                        return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
2969
                }
2970
                // Grab header size.
2971
                c.pa.hdb = args[len(args)-2]
14✔
2972
                c.pa.hdr = parseSize(c.pa.hdb)
14✔
2973

14✔
2974
                // Grab size.
14✔
2975
                c.pa.szb = args[len(args)-1]
14✔
2976
                c.pa.size = parseSize(c.pa.szb)
14✔
2977

14✔
2978
                // Grab queue names.
14✔
2979
                if c.pa.reply != nil {
18✔
2980
                        c.pa.queues = args[3 : len(args)-2]
4✔
2981
                } else {
14✔
2982
                        c.pa.queues = args[2 : len(args)-2]
10✔
2983
                }
10✔
2984
        }
2985
        if c.pa.hdr < 0 {
505✔
2986
                return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Header Size: '%s'", arg)
×
2987
        }
×
2988
        if c.pa.size < 0 {
505✔
2989
                return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Size: '%s'", args)
×
2990
        }
×
2991
        if c.pa.hdr > c.pa.size {
505✔
2992
                return fmt.Errorf("processLeafHeaderMsgArgs Header Size larger then TotalSize: '%s'", arg)
×
2993
        }
×
2994

2995
        // Common ones processed after check for arg length
2996
        c.pa.subject = args[0]
505✔
2997

505✔
2998
        return nil
505✔
2999
}
3000

3001
func (c *client) processLeafMsgArgs(arg []byte) error {
98,309✔
3002
        // Unroll splitArgs to avoid runtime/heap issues
98,309✔
3003
        a := [MAX_MSG_ARGS][]byte{}
98,309✔
3004
        args := a[:0]
98,309✔
3005
        start := -1
98,309✔
3006
        for i, b := range arg {
3,170,909✔
3007
                switch b {
3,072,600✔
3008
                case ' ', '\t', '\r', '\n':
150,080✔
3009
                        if start >= 0 {
300,160✔
3010
                                args = append(args, arg[start:i])
150,080✔
3011
                                start = -1
150,080✔
3012
                        }
150,080✔
3013
                default:
2,922,520✔
3014
                        if start < 0 {
3,170,909✔
3015
                                start = i
248,389✔
3016
                        }
248,389✔
3017
                }
3018
        }
3019
        if start >= 0 {
196,618✔
3020
                args = append(args, arg[start:])
98,309✔
3021
        }
98,309✔
3022

3023
        c.pa.arg = arg
98,309✔
3024
        switch len(args) {
98,309✔
3025
        case 0, 1:
×
3026
                return fmt.Errorf("processLeafMsgArgs Parse Error: '%s'", args)
×
3027
        case 2:
69,249✔
3028
                c.pa.reply = nil
69,249✔
3029
                c.pa.queues = nil
69,249✔
3030
                c.pa.szb = args[1]
69,249✔
3031
                c.pa.size = parseSize(args[1])
69,249✔
3032
        case 3:
6,509✔
3033
                c.pa.reply = args[1]
6,509✔
3034
                c.pa.queues = nil
6,509✔
3035
                c.pa.szb = args[2]
6,509✔
3036
                c.pa.size = parseSize(args[2])
6,509✔
3037
        default:
22,551✔
3038
                // args[1] is our reply indicator. Should be + or | normally.
22,551✔
3039
                if len(args[1]) != 1 {
22,551✔
3040
                        return fmt.Errorf("processLeafMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
3041
                }
×
3042
                switch args[1][0] {
22,551✔
3043
                case '+':
160✔
3044
                        c.pa.reply = args[2]
160✔
3045
                case '|':
22,391✔
3046
                        c.pa.reply = nil
22,391✔
3047
                default:
×
3048
                        return fmt.Errorf("processLeafMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
3049
                }
3050
                // Grab size.
3051
                c.pa.szb = args[len(args)-1]
22,551✔
3052
                c.pa.size = parseSize(c.pa.szb)
22,551✔
3053

22,551✔
3054
                // Grab queue names.
22,551✔
3055
                if c.pa.reply != nil {
22,711✔
3056
                        c.pa.queues = args[3 : len(args)-1]
160✔
3057
                } else {
22,551✔
3058
                        c.pa.queues = args[2 : len(args)-1]
22,391✔
3059
                }
22,391✔
3060
        }
3061
        if c.pa.size < 0 {
98,309✔
3062
                return fmt.Errorf("processLeafMsgArgs Bad or Missing Size: '%s'", args)
×
3063
        }
×
3064

3065
        // Common ones processed after check for arg length
3066
        c.pa.subject = args[0]
98,309✔
3067

98,309✔
3068
        return nil
98,309✔
3069
}
3070

3071
// processInboundLeafMsg is called to process an inbound msg from a leaf node.
3072
func (c *client) processInboundLeafMsg(msg []byte) {
96,881✔
3073
        // Update statistics
96,881✔
3074
        // The msg includes the CR_LF, so pull back out for accounting.
96,881✔
3075
        c.in.msgs++
96,881✔
3076
        c.in.bytes += int32(len(msg) - LEN_CR_LF)
96,881✔
3077

96,881✔
3078
        srv, acc, subject := c.srv, c.acc, string(c.pa.subject)
96,881✔
3079

96,881✔
3080
        // Mostly under testing scenarios.
96,881✔
3081
        if srv == nil || acc == nil {
96,882✔
3082
                return
1✔
3083
        }
1✔
3084

3085
        // Match the subscriptions. We will use our own L1 map if
3086
        // it's still valid, avoiding contention on the shared sublist.
3087
        var r *SublistResult
96,880✔
3088
        var ok bool
96,880✔
3089

96,880✔
3090
        genid := atomic.LoadUint64(&c.acc.sl.genid)
96,880✔
3091
        if genid == c.in.genid && c.in.results != nil {
191,338✔
3092
                r, ok = c.in.results[subject]
94,458✔
3093
        } else {
96,880✔
3094
                // Reset our L1 completely.
2,422✔
3095
                c.in.results = make(map[string]*SublistResult)
2,422✔
3096
                c.in.genid = genid
2,422✔
3097
        }
2,422✔
3098

3099
        // Go back to the sublist data structure.
3100
        if !ok {
163,256✔
3101
                r = c.acc.sl.Match(subject)
66,376✔
3102
                // Prune the results cache. Keeps us from unbounded growth. Random delete.
66,376✔
3103
                if len(c.in.results) >= maxResultCacheSize {
68,207✔
3104
                        n := 0
1,831✔
3105
                        for subj := range c.in.results {
62,254✔
3106
                                delete(c.in.results, subj)
60,423✔
3107
                                if n++; n > pruneSize {
62,254✔
3108
                                        break
1,831✔
3109
                                }
3110
                        }
3111
                }
3112
                // Then add the new cache entry.
3113
                c.in.results[subject] = r
66,376✔
3114
        }
3115

3116
        // Collect queue names if needed.
3117
        var qnames [][]byte
96,880✔
3118

96,880✔
3119
        // Check for no interest, short circuit if so.
96,880✔
3120
        // This is the fanout scale.
96,880✔
3121
        if len(r.psubs)+len(r.qsubs) > 0 {
193,476✔
3122
                flag := pmrNoFlag
96,596✔
3123
                // If we have queue subs in this cluster, then if we run in gateway
96,596✔
3124
                // mode and the remote gateways have queue subs, then we need to
96,596✔
3125
                // collect the queue groups this message was sent to so that we
96,596✔
3126
                // exclude them when sending to gateways.
96,596✔
3127
                if len(r.qsubs) > 0 && c.srv.gateway.enabled &&
96,596✔
3128
                        atomic.LoadInt64(&c.srv.gateway.totalQSubs) > 0 {
108,849✔
3129
                        flag |= pmrCollectQueueNames
12,253✔
3130
                }
12,253✔
3131
                // If this is a mapped subject that means the mapped interest
3132
                // is what got us here, but this might not have a queue designation
3133
                // If that is the case, make sure we ignore to process local queue subscribers.
3134
                if len(c.pa.mapped) > 0 && len(c.pa.queues) == 0 {
96,934✔
3135
                        flag |= pmrIgnoreEmptyQueueFilter
338✔
3136
                }
338✔
3137
                _, qnames = c.processMsgResults(acc, r, msg, nil, c.pa.subject, c.pa.reply, flag)
96,596✔
3138
        }
3139

3140
        // Now deal with gateways
3141
        if c.srv.gateway.enabled {
110,132✔
3142
                c.sendMsgToGateways(acc, msg, c.pa.subject, c.pa.reply, qnames, true)
13,252✔
3143
        }
13,252✔
3144
}
3145

3146
// Handles a subscription permission violation.
3147
// See leafPermViolation() for details.
3148
func (c *client) leafSubPermViolation(subj []byte) {
314✔
3149
        c.leafPermViolation(false, subj)
314✔
3150
}
314✔
3151

3152
// Common function to process publish or subscribe leafnode permission violation.
3153
// Sends the permission violation error to the remote, logs it and closes the connection.
3154
// If this is from a server soliciting, the reconnection will be delayed.
3155
func (c *client) leafPermViolation(pub bool, subj []byte) {
314✔
3156
        if c.isSpokeLeafNode() {
628✔
3157
                // For spokes these are no-ops since the hub server told us our permissions.
314✔
3158
                // We just need to not send these over to the other side since we will get cutoff.
314✔
3159
                return
314✔
3160
        }
314✔
3161
        // FIXME(dlc) ?
3162
        c.setLeafConnectDelayIfSoliciting(leafNodeReconnectAfterPermViolation)
×
3163
        var action string
×
3164
        if pub {
×
3165
                c.sendErr(fmt.Sprintf("Permissions Violation for Publish to %q", subj))
×
3166
                action = "Publish"
×
3167
        } else {
×
3168
                c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q", subj))
×
3169
                action = "Subscription"
×
3170
        }
×
3171
        c.Errorf("%s Violation on %q - Check other side configuration", action, subj)
×
3172
        // TODO: add a new close reason that is more appropriate?
×
3173
        c.closeConnection(ProtocolViolation)
×
3174
}
3175

3176
// Invoked from generic processErr() for LEAF connections.
3177
func (c *client) leafProcessErr(errStr string) {
46✔
3178
        // Check if we got a cluster name collision.
46✔
3179
        if strings.Contains(errStr, ErrLeafNodeHasSameClusterName.Error()) {
49✔
3180
                _, delay := c.setLeafConnectDelayIfSoliciting(leafNodeReconnectDelayAfterClusterNameSame)
3✔
3181
                c.Errorf("Leafnode connection dropped with same cluster name error. Delaying attempt to reconnect for %v", delay)
3✔
3182
                return
3✔
3183
        }
3✔
3184

3185
        // We will look for Loop detected error coming from the other side.
3186
        // If we solicit, set the connect delay.
3187
        if !strings.Contains(errStr, "Loop detected") {
79✔
3188
                return
36✔
3189
        }
36✔
3190
        c.handleLeafNodeLoop(false)
7✔
3191
}
3192

3193
// If this leaf connection solicits, sets the connect delay to the given value,
3194
// or the one from the server option's LeafNode.connDelay if one is set (for tests).
3195
// Returns the connection's account name and delay.
3196
func (c *client) setLeafConnectDelayIfSoliciting(delay time.Duration) (string, time.Duration) {
17✔
3197
        c.mu.Lock()
17✔
3198
        if c.isSolicitedLeafNode() {
27✔
3199
                if s := c.srv; s != nil {
20✔
3200
                        if srvdelay := s.getOpts().LeafNode.connDelay; srvdelay != 0 {
14✔
3201
                                delay = srvdelay
4✔
3202
                        }
4✔
3203
                }
3204
                c.leaf.remote.setConnectDelay(delay)
10✔
3205
        }
3206
        accName := c.acc.Name
17✔
3207
        c.mu.Unlock()
17✔
3208
        return accName, delay
17✔
3209
}
3210

3211
// For the given remote Leafnode configuration, this function returns
3212
// if TLS is required, and if so, will return a clone of the TLS Config
3213
// (since some fields will be changed during handshake), the TLS server
3214
// name that is remembered, and the TLS timeout.
3215
func (c *client) leafNodeGetTLSConfigForSolicit(remote *leafNodeCfg) (bool, *tls.Config, string, float64) {
1,948✔
3216
        var (
1,948✔
3217
                tlsConfig  *tls.Config
1,948✔
3218
                tlsName    string
1,948✔
3219
                tlsTimeout float64
1,948✔
3220
        )
1,948✔
3221

1,948✔
3222
        remote.RLock()
1,948✔
3223
        defer remote.RUnlock()
1,948✔
3224

1,948✔
3225
        tlsRequired := remote.TLS || remote.TLSConfig != nil
1,948✔
3226
        if tlsRequired {
2,031✔
3227
                if remote.TLSConfig != nil {
133✔
3228
                        tlsConfig = remote.TLSConfig.Clone()
50✔
3229
                } else {
83✔
3230
                        tlsConfig = &tls.Config{MinVersion: tls.VersionTLS12}
33✔
3231
                }
33✔
3232
                tlsName = remote.tlsName
83✔
3233
                tlsTimeout = remote.TLSTimeout
83✔
3234
                if tlsTimeout == 0 {
133✔
3235
                        tlsTimeout = float64(TLS_TIMEOUT / time.Second)
50✔
3236
                }
50✔
3237
        }
3238

3239
        return tlsRequired, tlsConfig, tlsName, tlsTimeout
1,948✔
3240
}
3241

3242
// Initiates the LeafNode Websocket connection by:
3243
// - doing the TLS handshake if needed
3244
// - sending the HTTP request
3245
// - waiting for the HTTP response
3246
//
3247
// Since some bufio reader is used to consume the HTTP response, this function
3248
// returns the slice of buffered bytes (if any) so that the readLoop that will
3249
// be started after that consume those first before reading from the socket.
3250
// The boolean
3251
//
3252
// Lock held on entry.
3253
func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remote *leafNodeCfg) ([]byte, ClosedState, error) {
50✔
3254
        remote.RLock()
50✔
3255
        compress := remote.Websocket.Compression
50✔
3256
        // By default the server will mask outbound frames, but it can be disabled with this option.
50✔
3257
        noMasking := remote.Websocket.NoMasking
50✔
3258
        infoTimeout := remote.FirstInfoTimeout
50✔
3259
        remote.RUnlock()
50✔
3260
        // Will do the client-side TLS handshake if needed.
50✔
3261
        tlsRequired, err := c.leafClientHandshakeIfNeeded(remote, opts)
50✔
3262
        if err != nil {
54✔
3263
                // 0 will indicate that the connection was already closed
4✔
3264
                return nil, 0, err
4✔
3265
        }
4✔
3266

3267
        // For http request, we need the passed URL to contain either http or https scheme.
3268
        scheme := "http"
46✔
3269
        if tlsRequired {
54✔
3270
                scheme = "https"
8✔
3271
        }
8✔
3272
        // We will use the `/leafnode` path to tell the accepting WS server that it should
3273
        // create a LEAF connection, not a CLIENT.
3274
        // In case we use the user's URL path in the future, make sure we append the user's
3275
        // path to our `/leafnode` path.
3276
        lpath := leafNodeWSPath
46✔
3277
        if curPath := rURL.EscapedPath(); curPath != _EMPTY_ {
67✔
3278
                if curPath[0] == '/' {
42✔
3279
                        curPath = curPath[1:]
21✔
3280
                }
21✔
3281
                lpath = path.Join(curPath, lpath)
21✔
3282
        } else {
25✔
3283
                lpath = lpath[1:]
25✔
3284
        }
25✔
3285
        ustr := fmt.Sprintf("%s://%s/%s", scheme, rURL.Host, lpath)
46✔
3286
        u, _ := url.Parse(ustr)
46✔
3287
        req := &http.Request{
46✔
3288
                Method:     "GET",
46✔
3289
                URL:        u,
46✔
3290
                Proto:      "HTTP/1.1",
46✔
3291
                ProtoMajor: 1,
46✔
3292
                ProtoMinor: 1,
46✔
3293
                Header:     make(http.Header),
46✔
3294
                Host:       u.Host,
46✔
3295
        }
46✔
3296
        wsKey, err := wsMakeChallengeKey()
46✔
3297
        if err != nil {
46✔
3298
                return nil, WriteError, err
×
3299
        }
×
3300

3301
        req.Header["Upgrade"] = []string{"websocket"}
46✔
3302
        req.Header["Connection"] = []string{"Upgrade"}
46✔
3303
        req.Header["Sec-WebSocket-Key"] = []string{wsKey}
46✔
3304
        req.Header["Sec-WebSocket-Version"] = []string{"13"}
46✔
3305
        if compress {
55✔
3306
                req.Header.Add("Sec-WebSocket-Extensions", wsPMCReqHeaderValue)
9✔
3307
        }
9✔
3308
        if noMasking {
56✔
3309
                req.Header.Add(wsNoMaskingHeader, wsNoMaskingValue)
10✔
3310
        }
10✔
3311
        c.nc.SetDeadline(time.Now().Add(infoTimeout))
46✔
3312
        if err := req.Write(c.nc); err != nil {
46✔
3313
                return nil, WriteError, err
×
3314
        }
×
3315

3316
        var resp *http.Response
46✔
3317

46✔
3318
        br := bufio.NewReaderSize(c.nc, MAX_CONTROL_LINE_SIZE)
46✔
3319
        resp, err = http.ReadResponse(br, req)
46✔
3320
        if err == nil &&
46✔
3321
                (resp.StatusCode != 101 ||
46✔
3322
                        !strings.EqualFold(resp.Header.Get("Upgrade"), "websocket") ||
46✔
3323
                        !strings.EqualFold(resp.Header.Get("Connection"), "upgrade") ||
46✔
3324
                        resp.Header.Get("Sec-Websocket-Accept") != wsAcceptKey(wsKey)) {
47✔
3325

1✔
3326
                err = fmt.Errorf("invalid websocket connection")
1✔
3327
        }
1✔
3328
        // Check compression extension...
3329
        if err == nil && c.ws.compress {
55✔
3330
                // Check that not only permessage-deflate extension is present, but that
9✔
3331
                // we also have server and client no context take over.
9✔
3332
                srvCompress, noCtxTakeover := wsPMCExtensionSupport(resp.Header, false)
9✔
3333

9✔
3334
                // If server does not support compression, then simply disable it in our side.
9✔
3335
                if !srvCompress {
13✔
3336
                        c.ws.compress = false
4✔
3337
                } else if !noCtxTakeover {
9✔
3338
                        err = fmt.Errorf("compression negotiation error")
×
3339
                }
×
3340
        }
3341
        // Same for no masking...
3342
        if err == nil && noMasking {
56✔
3343
                // Check if server accepts no masking
10✔
3344
                if resp.Header.Get(wsNoMaskingHeader) != wsNoMaskingValue {
11✔
3345
                        // Nope, need to mask our writes as any client would do.
1✔
3346
                        c.ws.maskwrite = true
1✔
3347
                }
1✔
3348
        }
3349
        if resp != nil {
76✔
3350
                resp.Body.Close()
30✔
3351
        }
30✔
3352
        if err != nil {
63✔
3353
                return nil, ReadError, err
17✔
3354
        }
17✔
3355
        c.Debugf("Leafnode compression=%v masking=%v", c.ws.compress, c.ws.maskwrite)
29✔
3356

29✔
3357
        var preBuf []byte
29✔
3358
        // We have to slurp whatever is in the bufio reader and pass that to the readloop.
29✔
3359
        if n := br.Buffered(); n != 0 {
29✔
3360
                preBuf, _ = br.Peek(n)
×
3361
        }
×
3362
        return preBuf, 0, nil
29✔
3363
}
3364

3365
// connectProcessTimeout is how long leafNodeResumeConnectProcess waits, after
// sending CONNECT, for leafNodeFinishConnectProcess to run. If it has not run
// by then, the connection is closed as stale.
const connectProcessTimeout time.Duration = 2 * time.Second
3366

3367
// This is invoked for remote LEAF remote connections after processing the INFO
3368
// protocol.
3369
func (s *Server) leafNodeResumeConnectProcess(c *client) {
678✔
3370
        clusterName := s.ClusterName()
678✔
3371

678✔
3372
        c.mu.Lock()
678✔
3373
        if c.isClosed() {
678✔
3374
                c.mu.Unlock()
×
3375
                return
×
3376
        }
×
3377
        if err := c.sendLeafConnect(clusterName, c.headers); err != nil {
680✔
3378
                c.mu.Unlock()
2✔
3379
                c.closeConnection(WriteError)
2✔
3380
                return
2✔
3381
        }
2✔
3382

3383
        // Spin up the write loop.
3384
        s.startGoRoutine(func() { c.writeLoop() })
1,352✔
3385

3386
        // timeout leafNodeFinishConnectProcess
3387
        c.ping.tmr = time.AfterFunc(connectProcessTimeout, func() {
676✔
3388
                c.mu.Lock()
×
3389
                // check if leafNodeFinishConnectProcess was called and prevent later leafNodeFinishConnectProcess
×
3390
                if !c.flags.setIfNotSet(connectProcessFinished) {
×
3391
                        c.mu.Unlock()
×
3392
                        return
×
3393
                }
×
3394
                clearTimer(&c.ping.tmr)
×
3395
                closed := c.isClosed()
×
3396
                c.mu.Unlock()
×
3397
                if !closed {
×
3398
                        c.sendErrAndDebug("Stale Leaf Node Connection - Closing")
×
3399
                        c.closeConnection(StaleConnection)
×
3400
                }
×
3401
        })
3402
        c.mu.Unlock()
676✔
3403
        c.Debugf("Remote leafnode connect msg sent")
676✔
3404
}
3405

3406
// This is invoked for remote LEAF connections after processing the INFO
3407
// protocol and leafNodeResumeConnectProcess.
3408
// This will send LS+ the CONNECT protocol and register the leaf node.
3409
func (s *Server) leafNodeFinishConnectProcess(c *client) {
642✔
3410
        c.mu.Lock()
642✔
3411
        if !c.flags.setIfNotSet(connectProcessFinished) {
642✔
3412
                c.mu.Unlock()
×
3413
                return
×
3414
        }
×
3415
        if c.isClosed() {
642✔
3416
                c.mu.Unlock()
×
3417
                s.removeLeafNodeConnection(c)
×
3418
                return
×
3419
        }
×
3420
        remote := c.leaf.remote
642✔
3421
        // Check if we will need to send the system connect event.
642✔
3422
        remote.RLock()
642✔
3423
        sendSysConnectEvent := remote.Hub
642✔
3424
        remote.RUnlock()
642✔
3425

642✔
3426
        // Capture account before releasing lock
642✔
3427
        acc := c.acc
642✔
3428
        // cancel connectProcessTimeout
642✔
3429
        clearTimer(&c.ping.tmr)
642✔
3430
        c.mu.Unlock()
642✔
3431

642✔
3432
        // Make sure we register with the account here.
642✔
3433
        if err := c.registerWithAccount(acc); err != nil {
644✔
3434
                if err == ErrTooManyAccountConnections {
2✔
3435
                        c.maxAccountConnExceeded()
×
3436
                        return
×
3437
                } else if err == ErrLeafNodeLoop {
4✔
3438
                        c.handleLeafNodeLoop(true)
2✔
3439
                        return
2✔
3440
                }
2✔
3441
                c.Errorf("Registering leaf with account %s resulted in error: %v", acc.Name, err)
×
3442
                c.closeConnection(ProtocolViolation)
×
3443
                return
×
3444
        }
3445
        s.addLeafNodeConnection(c, _EMPTY_, _EMPTY_, false)
640✔
3446
        s.initLeafNodeSmapAndSendSubs(c)
640✔
3447
        if sendSysConnectEvent {
656✔
3448
                s.sendLeafNodeConnect(acc)
16✔
3449
        }
16✔
3450

3451
        // The above functions are not atomically under the client
3452
        // lock doing those operations. It is possible - since we
3453
        // have started the read/write loops - that the connection
3454
        // is closed before or in between. This would leave the
3455
        // closed LN connection possible registered with the account
3456
        // and/or the server's leafs map. So check if connection
3457
        // is closed, and if so, manually cleanup.
3458
        c.mu.Lock()
640✔
3459
        closed := c.isClosed()
640✔
3460
        if !closed {
1,279✔
3461
                c.setFirstPingTimer()
639✔
3462
        }
639✔
3463
        c.mu.Unlock()
640✔
3464
        if closed {
641✔
3465
                s.removeLeafNodeConnection(c)
1✔
3466
                if prev := acc.removeClient(c); prev == 1 {
2✔
3467
                        s.decActiveAccounts()
1✔
3468
                }
1✔
3469
        }
3470
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc