• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 27256811330

09 Jun 2026 03:22PM UTC coverage: 76.412% (-4.5%) from 80.917%
27256811330

push

github

web-flow
De-flake TestJetStreamClusterProposeFailureDoesNotDriftClseq (#8292)

The test forced proposal failures by temporarily swapping the Raft node
state to Follower. That made Propose return errNotLeader, but it also
let the running Raft loop observe follower state and drain or stall real
proposals, causing a follow-up publish to time out:

```
=== NAME  TestJetStreamClusterProposeFailureDoesNotDriftClseq
    jetstream_cluster_3_test.go:10695: require no error, but got: nats: timeout
--- FAIL: TestJetStreamClusterProposeFailureDoesNotDriftClseq (10.46s)
```

71603 of 93706 relevant lines covered (76.41%)

459367.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.65
/server/leafnode.go
1
// Copyright 2019-2026 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bufio"
18
        "bytes"
19
        "crypto/tls"
20
        "encoding/base64"
21
        "encoding/json"
22
        "fmt"
23
        "io"
24
        "math/rand"
25
        "net"
26
        "net/http"
27
        "net/url"
28
        "os"
29
        "path"
30
        "regexp"
31
        "runtime"
32
        "strconv"
33
        "strings"
34
        "sync"
35
        "sync/atomic"
36
        "time"
37

38
        "github.com/klauspost/compress/s2"
39
        "github.com/nats-io/jwt/v2"
40
        "github.com/nats-io/nkeys"
41
        "github.com/nats-io/nuid"
42
)
43

44
const (
	// leafnodeTLSInsecureWarning is the warning issued when a user configures
	// leafnode TLS as insecure.
	leafnodeTLSInsecureWarning = "TLS certificate chain and hostname of solicited leafnodes will not be verified. DO NOT USE IN PRODUCTION!"

	// leafNodeReconnectDelayAfterLoopDetected delays the reconnect of a
	// solicited connection once a loop has been detected.
	leafNodeReconnectDelayAfterLoopDetected = 30 * time.Second

	// leafNodeReconnectAfterPermViolation is how long no reconnect is attempted
	// after the connection was closed because a received message caused a
	// permission violation.
	leafNodeReconnectAfterPermViolation = 30 * time.Second

	// leafNodeReconnectDelayAfterClusterNameSame applies when we have the same
	// cluster name as the hub.
	leafNodeReconnectDelayAfterClusterNameSame = 30 * time.Second

	// leafNodeLoopDetectionSubjectPrefix prefixes the loop detection subject.
	leafNodeLoopDetectionSubjectPrefix = "$LDS."

	// leafNodeWSPath is the path added to the URL to tell the WS server that
	// the connection is a LEAF connection rather than a CLIENT one.
	leafNodeWSPath = "/leafnode"

	// leafNodeMinVersionReconnectDelay delays the next reconnect attempt of a
	// soliciting leafnode that was rejected for not meeting the configured
	// minimum version.
	leafNodeMinVersionReconnectDelay = 5 * time.Second
)
69

70
type leaf struct {
71
        // We have any auth stuff here for solicited connections.
72
        remote *leafNodeCfg
73
        // isSpoke tells us what role we are playing.
74
        // Used when we receive a connection but otherside tells us they are a hub.
75
        isSpoke bool
76
        // remoteCluster is when we are a hub but the spoke leafnode is part of a cluster.
77
        remoteCluster string
78
        // remoteServer holds onto the remote server's name or ID.
79
        remoteServer string
80
        // domain name of remote server
81
        remoteDomain string
82
        // account name of remote server
83
        remoteAccName string
84
        // Whether or not we want to propagate east-west interest from other LNs.
85
        isolated bool
86
        // Used to suppress sub and unsub interest. Same as routes but our audience
87
        // here is tied to this leaf node. This will hold all subscriptions except this
88
        // leaf nodes. This represents all the interest we want to send to the other side.
89
        smap map[string]int32
90
        // This map will contain all the subscriptions that have been added to the smap
91
        // during initLeafNodeSmapAndSendSubs. It is short lived and is there to avoid
92
        // race between processing of a sub where sub is added to account sublist but
93
        // updateSmap has not be called on that "thread", while in the LN readloop,
94
        // when processing CONNECT, initLeafNodeSmapAndSendSubs is invoked and add
95
        // this subscription to smap. When processing of the sub then calls updateSmap,
96
        // we would add it a second time in the smap causing later unsub to suppress the LS-.
97
        tsub  map[*subscription]struct{}
98
        tsubt *time.Timer
99
        // Selected compression mode, which may be different from the server configured mode.
100
        compression string
101
        // This is for GW map replies.
102
        gwSub *subscription
103
}
104

105
// Used for remote (solicited) leafnodes.
106
type leafNodeCfg struct {
107
        sync.RWMutex
108
        *RemoteLeafOpts
109
        urls           []*url.URL
110
        curURL         *url.URL
111
        tlsName        string
112
        username       string
113
        password       string
114
        perms          *Permissions
115
        connDelay      time.Duration // Delay before a connect, could be used while detecting loop condition, etc..
116
        jsMigrateTimer *time.Timer
117
        quitCh         chan struct{}
118
        removed        bool
119
        connInProgress bool
120
}
121

122
// Check to see if this is a solicited leafnode. We do special processing for solicited.
123
func (c *client) isSolicitedLeafNode() bool {
1,228✔
124
        return c.kind == LEAF && c.leaf != nil && c.leaf.remote != nil
1,228✔
125
}
1,228✔
126

127
// Returns true if this is a solicited leafnode and is not configured to be treated as a hub or a receiving
128
// connection leafnode where the otherside has declared itself to be the hub.
129
func (c *client) isSpokeLeafNode() bool {
11,218,229✔
130
        return c.kind == LEAF && c.leaf != nil && c.leaf.isSpoke
11,218,229✔
131
}
11,218,229✔
132

133
func (c *client) isHubLeafNode() bool {
13,595✔
134
        return c.kind == LEAF && c.leaf != nil && !c.leaf.isSpoke
13,595✔
135
}
13,595✔
136

137
func (c *client) isIsolatedLeafNode() bool {
7,962✔
138
        // TODO(nat): In future we may want to pass in and consider an isolation
7,962✔
139
        // group name here, which the hub and/or leaf could provide, so that we
7,962✔
140
        // can isolate away certain LNs but not others on an opt-in basis. For
7,962✔
141
        // now we will just isolate all LN interest until then.
7,962✔
142
        return c.kind == LEAF && c.leaf != nil && c.leaf.isolated
7,962✔
143
}
7,962✔
144

145
// This will spin up go routines to solicit the remote leaf node connections.
146
func (s *Server) solicitLeafNodeRemotes(remotes []*RemoteLeafOpts) {
222✔
147
        sysAccName := _EMPTY_
222✔
148
        sAcc := s.SystemAccount()
222✔
149
        if sAcc != nil {
422✔
150
                sysAccName = sAcc.Name
200✔
151
        }
200✔
152
        addRemote := func(r *RemoteLeafOpts, isSysAccRemote bool) *leafNodeCfg {
546✔
153
                s.mu.Lock()
324✔
154
                remote := newLeafNodeCfg(r)
324✔
155
                creds := remote.Credentials
324✔
156
                accName := remote.LocalAccount
324✔
157
                if s.leafRemoteCfgs == nil {
546✔
158
                        s.leafRemoteCfgs = make(map[*leafNodeCfg]struct{})
222✔
159
                }
222✔
160
                s.leafRemoteCfgs[remote] = struct{}{}
324✔
161
                // Print notice if
324✔
162
                if isSysAccRemote {
406✔
163
                        if len(remote.DenyExports) > 0 {
83✔
164
                                s.Noticef("Remote for System Account uses restricted export permissions")
1✔
165
                        }
1✔
166
                        if len(remote.DenyImports) > 0 {
83✔
167
                                s.Noticef("Remote for System Account uses restricted import permissions")
1✔
168
                        }
1✔
169
                }
170
                s.mu.Unlock()
324✔
171
                if creds != _EMPTY_ {
364✔
172
                        contents, err := os.ReadFile(creds)
40✔
173
                        defer wipeSlice(contents)
40✔
174
                        if err != nil {
40✔
175
                                s.Errorf("Error reading LeafNode Remote Credentials file %q: %v", creds, err)
×
176
                        } else if items := credsRe.FindAllSubmatch(contents, -1); len(items) < 2 {
40✔
177
                                s.Errorf("LeafNode Remote Credentials file %q malformed", creds)
×
178
                        } else if _, err := nkeys.FromSeed(items[1][1]); err != nil {
40✔
179
                                s.Errorf("LeafNode Remote Credentials file %q has malformed seed", creds)
×
180
                        } else if uc, err := jwt.DecodeUserClaims(string(items[0][1])); err != nil {
40✔
181
                                s.Errorf("LeafNode Remote Credentials file %q has malformed user jwt", creds)
×
182
                        } else if isSysAccRemote {
44✔
183
                                if !uc.Permissions.Pub.Empty() || !uc.Permissions.Sub.Empty() || uc.Permissions.Resp != nil {
5✔
184
                                        s.Noticef("LeafNode Remote for System Account uses credentials file %q with restricted permissions", creds)
1✔
185
                                }
1✔
186
                        } else {
36✔
187
                                if !uc.Permissions.Pub.Empty() || !uc.Permissions.Sub.Empty() || uc.Permissions.Resp != nil {
40✔
188
                                        s.Noticef("LeafNode Remote for Account %s uses credentials file %q with restricted permissions", accName, creds)
4✔
189
                                }
4✔
190
                        }
191
                }
192
                return remote
324✔
193
        }
194
        for _, r := range remotes {
546✔
195
                // We need to call this, even if the leaf is disabled. This is so that
324✔
196
                // the number of internal configuration matches the options' remote leaf
324✔
197
                // configuration required for configuration reload.
324✔
198
                remote := addRemote(r, r.LocalAccount == sysAccName)
324✔
199
                if !r.Disabled {
648✔
200
                        s.connectToRemoteLeafNodeAsynchronously(remote, true)
324✔
201
                }
324✔
202
        }
203
}
204

205
// Ensure that leafnode is properly configured.
206
func validateLeafNode(o *Options) error {
7,050✔
207
        if err := validateLeafNodeAuthOptions(o); err != nil {
7,050✔
208
                return err
×
209
        }
×
210

211
        if len(o.LeafNode.Remotes) > 0 {
7,272✔
212
                names := make(map[string]struct{})
222✔
213
                // Check for duplicate remotes, also, users can bind to any local account,
222✔
214
                // if its empty we will assume the $G account.
222✔
215
                for _, r := range o.LeafNode.Remotes {
546✔
216
                        if r.LocalAccount == _EMPTY_ {
496✔
217
                                r.LocalAccount = globalAccountName
172✔
218
                        }
172✔
219
                        rn := r.name()
324✔
220
                        if _, dup := names[rn]; dup {
324✔
221
                                return fmt.Errorf("duplicate remote %s", r.safeName())
×
222
                        }
×
223
                        names[rn] = struct{}{}
324✔
224
                }
225
        }
226

227
        // In local config mode, check that leafnode configuration refers to accounts that exist.
228
        if len(o.TrustedOperators) == 0 {
13,835✔
229
                accNames := map[string]struct{}{}
6,785✔
230
                for _, a := range o.Accounts {
15,012✔
231
                        accNames[a.Name] = struct{}{}
8,227✔
232
                }
8,227✔
233
                // global account is always created
234
                accNames[DEFAULT_GLOBAL_ACCOUNT] = struct{}{}
6,785✔
235
                // in the context of leaf nodes, empty account means global account
6,785✔
236
                accNames[_EMPTY_] = struct{}{}
6,785✔
237
                // system account either exists or, if not disabled, will be created
6,785✔
238
                if o.SystemAccount == _EMPTY_ && !o.NoSystemAccount {
12,221✔
239
                        accNames[DEFAULT_SYSTEM_ACCOUNT] = struct{}{}
5,436✔
240
                }
5,436✔
241
                checkAccountExists := func(accName string, cfgType string) error {
13,895✔
242
                        if _, ok := accNames[accName]; !ok {
7,110✔
243
                                return fmt.Errorf("cannot find local account %q specified in leafnode %s", accName, cfgType)
×
244
                        }
×
245
                        return nil
7,110✔
246
                }
247
                if err := checkAccountExists(o.LeafNode.Account, "authorization"); err != nil {
6,785✔
248
                        return err
×
249
                }
×
250
                for _, lu := range o.LeafNode.Users {
6,793✔
251
                        if lu.Account == nil { // means global account
15✔
252
                                continue
7✔
253
                        }
254
                        if err := checkAccountExists(lu.Account.Name, "authorization"); err != nil {
1✔
255
                                return err
×
256
                        }
×
257
                }
258
                for _, r := range o.LeafNode.Remotes {
7,109✔
259
                        if err := checkAccountExists(r.LocalAccount, "remote"); err != nil {
324✔
260
                                return err
×
261
                        }
×
262
                }
263
        } else {
265✔
264
                if len(o.LeafNode.Users) != 0 {
265✔
265
                        return fmt.Errorf("operator mode does not allow specifying users in leafnode config")
×
266
                }
×
267
                for _, r := range o.LeafNode.Remotes {
265✔
268
                        if !nkeys.IsValidPublicAccountKey(r.LocalAccount) {
×
269
                                return fmt.Errorf(
×
270
                                        "operator mode requires account nkeys in remotes. " +
×
271
                                                "Please add an `account` key to each remote in your `leafnodes` section, to assign it to an account. " +
×
272
                                                "Each account value should be a 56 character public key, starting with the letter 'A'")
×
273
                        }
×
274
                }
275
                if o.LeafNode.Port != 0 && o.LeafNode.Account != "" && !nkeys.IsValidPublicAccountKey(o.LeafNode.Account) {
265✔
276
                        return fmt.Errorf("operator mode and non account nkeys are incompatible")
×
277
                }
×
278
        }
279

280
        // Validate compression settings
281
        if o.LeafNode.Compression.Mode != _EMPTY_ {
11,183✔
282
                if err := validateAndNormalizeCompressionOption(&o.LeafNode.Compression, CompressionS2Auto); err != nil {
4,133✔
283
                        return err
×
284
                }
×
285
        }
286

287
        // If a remote has a websocket scheme, all need to have it.
288
        for _, rcfg := range o.LeafNode.Remotes {
7,374✔
289
                // Validate proxy configuration
324✔
290
                if _, err := validateLeafNodeProxyOptions(rcfg); err != nil {
324✔
291
                        return err
×
292
                }
×
293

294
                if len(rcfg.URLs) >= 2 {
467✔
295
                        firstIsWS, ok := isWSURL(rcfg.URLs[0]), true
143✔
296
                        for i := 1; i < len(rcfg.URLs); i++ {
403✔
297
                                u := rcfg.URLs[i]
260✔
298
                                if isWS := isWSURL(u); isWS && !firstIsWS || !isWS && firstIsWS {
260✔
299
                                        ok = false
×
300
                                        break
×
301
                                }
302
                        }
303
                        if !ok {
143✔
304
                                return fmt.Errorf("remote leaf node configuration cannot have a mix of websocket and non-websocket urls: %q", redactURLList(rcfg.URLs))
×
305
                        }
×
306
                }
307
                if !wsAllowedFIPS() {
324✔
308
                        for _, u := range rcfg.URLs {
×
309
                                if isWSURL(u) {
×
310
                                        return fmt.Errorf("remote leaf node URL %q cannot be used in FIPS-140 mode when built with this Go version, use Go 1.26 or later", redactURLString(u.String()))
×
311
                                }
×
312
                        }
313
                }
314
                // Validate compression settings
315
                if rcfg.Compression.Mode != _EMPTY_ {
648✔
316
                        if err := validateAndNormalizeCompressionOption(&rcfg.Compression, CompressionS2Auto); err != nil {
324✔
317
                                return err
×
318
                        }
×
319
                }
320
        }
321

322
        if o.LeafNode.Port == 0 {
10,553✔
323
                return nil
3,503✔
324
        }
3,503✔
325

326
        // If MinVersion is defined, check that it is valid.
327
        if mv := o.LeafNode.MinVersion; mv != _EMPTY_ {
3,547✔
328
                if err := checkLeafMinVersionConfig(mv); err != nil {
×
329
                        return err
×
330
                }
×
331
        }
332

333
        // The checks below will be done only when detecting that we are configured
334
        // with gateways. So if an option validation needs to be done regardless,
335
        // it MUST be done before this point!
336

337
        if o.Gateway.Name == _EMPTY_ && o.Gateway.Port == 0 {
6,478✔
338
                return nil
2,931✔
339
        }
2,931✔
340
        // If we are here we have both leaf nodes and gateways defined, make sure there
341
        // is a system account defined.
342
        if o.SystemAccount == _EMPTY_ {
617✔
343
                return fmt.Errorf("leaf nodes and gateways (both being defined) require a system account to also be configured")
1✔
344
        }
1✔
345
        if err := validatePinnedCerts(o.LeafNode.TLSPinnedCerts); err != nil {
615✔
346
                return fmt.Errorf("leafnode: %v", err)
×
347
        }
×
348
        return nil
615✔
349
}
350

351
func checkLeafMinVersionConfig(mv string) error {
2✔
352
        if ok, err := versionAtLeastCheckError(mv, 2, 8, 0); !ok || err != nil {
4✔
353
                if err != nil {
3✔
354
                        return fmt.Errorf("invalid leafnode's minimum version: %v", err)
1✔
355
                } else {
2✔
356
                        return fmt.Errorf("the minimum version should be at least 2.8.0")
1✔
357
                }
1✔
358
        }
359
        return nil
×
360
}
361

362
// Used to validate user names in LeafNode configuration.
363
// - rejects mix of single and multiple users.
364
// - rejects duplicate user names.
365
func validateLeafNodeAuthOptions(o *Options) error {
7,078✔
366
        if len(o.LeafNode.Users) == 0 {
14,146✔
367
                return nil
7,068✔
368
        }
7,068✔
369
        if o.LeafNode.Username != _EMPTY_ {
11✔
370
                return fmt.Errorf("can not have a single user/pass and a users array")
1✔
371
        }
1✔
372
        if o.LeafNode.Nkey != _EMPTY_ {
9✔
373
                return fmt.Errorf("can not have a single nkey and a users array")
×
374
        }
×
375
        users := map[string]struct{}{}
9✔
376
        for _, u := range o.LeafNode.Users {
27✔
377
                if _, exists := users[u.Username]; exists {
19✔
378
                        return fmt.Errorf("duplicate user %q detected in leafnode authorization", u.Username)
1✔
379
                }
1✔
380
                users[u.Username] = struct{}{}
17✔
381
        }
382
        return nil
8✔
383
}
384

385
func validateLeafNodeProxyOptions(remote *RemoteLeafOpts) ([]string, error) {
628✔
386
        var warnings []string
628✔
387

628✔
388
        if remote.Proxy.URL == _EMPTY_ {
1,251✔
389
                return warnings, nil
623✔
390
        }
623✔
391

392
        proxyURL, err := url.Parse(remote.Proxy.URL)
5✔
393
        if err != nil {
5✔
394
                return warnings, fmt.Errorf("invalid proxy URL: %v", err)
×
395
        }
×
396

397
        if proxyURL.Scheme != "http" && proxyURL.Scheme != "https" {
6✔
398
                return warnings, fmt.Errorf("proxy URL scheme must be http or https, got: %s", proxyURL.Scheme)
1✔
399
        }
1✔
400

401
        if proxyURL.Host == _EMPTY_ {
5✔
402
                return warnings, fmt.Errorf("proxy URL must specify a host")
1✔
403
        }
1✔
404

405
        if remote.Proxy.Timeout < 0 {
3✔
406
                return warnings, fmt.Errorf("proxy timeout must be >= 0")
×
407
        }
×
408

409
        if (remote.Proxy.Username == _EMPTY_) != (remote.Proxy.Password == _EMPTY_) {
5✔
410
                return warnings, fmt.Errorf("proxy username and password must both be specified or both be empty")
2✔
411
        }
2✔
412

413
        if len(remote.URLs) > 0 {
2✔
414
                hasWebSocketURL := false
1✔
415
                hasNonWebSocketURL := false
1✔
416

1✔
417
                for _, remoteURL := range remote.URLs {
2✔
418
                        if remoteURL.Scheme == wsSchemePrefix || remoteURL.Scheme == wsSchemePrefixTLS {
2✔
419
                                hasWebSocketURL = true
1✔
420
                                if (remoteURL.Scheme == wsSchemePrefixTLS) &&
1✔
421
                                        remote.TLSConfig == nil && !remote.TLS {
2✔
422
                                        return warnings, fmt.Errorf("proxy is configured but remote URL %s requires TLS and no TLS configuration is provided. When using proxy with TLS endpoints, ensure TLS is properly configured for the leafnode remote", remoteURL.String())
1✔
423
                                }
1✔
424
                        } else {
×
425
                                hasNonWebSocketURL = true
×
426
                        }
×
427
                }
428

429
                if !hasWebSocketURL {
×
430
                        warnings = append(warnings, "proxy configuration will be ignored: proxy settings only apply to WebSocket connections (ws:// or wss://), but all configured URLs use TCP connections (nats://)")
×
431
                } else if hasNonWebSocketURL {
×
432
                        warnings = append(warnings, "proxy configuration will only be used for WebSocket URLs: proxy settings do not apply to TCP connections (nats://)")
×
433
                }
×
434
        }
435

436
        return warnings, nil
×
437
}
438

439
// Wait for the configured reconnect interval before attempting to connect
440
// again to the remote leafnode.
441
func (s *Server) reConnectToRemoteLeafNode(remote *leafNodeCfg) {
99✔
442
        clearInProgress := true
99✔
443
        defer func() {
197✔
444
                s.grWG.Done()
98✔
445
                if clearInProgress {
126✔
446
                        remote.setConnectInProgress(false)
28✔
447
                }
28✔
448
        }()
449
        delay := s.getOpts().LeafNode.ReconnectInterval
99✔
450
        select {
99✔
451
        case <-time.After(delay):
72✔
452
        case <-remote.quitCh:
×
453
                return
×
454
        case <-s.quitCh:
27✔
455
                return
27✔
456
        }
457
        clearInProgress = !connectToRemoteLeafNode(s, remote, false)
72✔
458
}
459

460
// Creates a leafNodeCfg object that wraps the RemoteLeafOpts.
461
func newLeafNodeCfg(remote *RemoteLeafOpts) *leafNodeCfg {
324✔
462
        cfg := &leafNodeCfg{
324✔
463
                RemoteLeafOpts: remote,
324✔
464
                urls:           make([]*url.URL, 0, len(remote.URLs)),
324✔
465
                quitCh:         make(chan struct{}, 1),
324✔
466
        }
324✔
467
        if len(remote.DenyExports) > 0 || len(remote.DenyImports) > 0 {
326✔
468
                perms := &Permissions{}
2✔
469
                if len(remote.DenyExports) > 0 {
4✔
470
                        perms.Publish = &SubjectPermission{Deny: remote.DenyExports}
2✔
471
                }
2✔
472
                if len(remote.DenyImports) > 0 {
4✔
473
                        perms.Subscribe = &SubjectPermission{Deny: remote.DenyImports}
2✔
474
                }
2✔
475
                cfg.perms = perms
2✔
476
        }
477
        // Start with the one that is configured. We will add to this
478
        // array when receiving async leafnode INFOs.
479
        cfg.urls = append(cfg.urls, cfg.URLs...)
324✔
480
        // If allowed to randomize, do it on our copy of URLs
324✔
481
        if !remote.NoRandomize {
648✔
482
                rand.Shuffle(len(cfg.urls), func(i, j int) {
584✔
483
                        cfg.urls[i], cfg.urls[j] = cfg.urls[j], cfg.urls[i]
260✔
484
                })
260✔
485
        }
486
        // If we are TLS make sure we save off a proper servername if possible.
487
        // Do same for user/password since we may need them to connect to
488
        // a bare URL that we get from INFO protocol.
489
        for _, u := range cfg.urls {
908✔
490
                cfg.saveTLSHostname(u)
584✔
491
                cfg.saveUserPassword(u)
584✔
492
                // If the url(s) have the "wss://" scheme, and we don't have a TLS
584✔
493
                // config, mark that we should be using TLS anyway.
584✔
494
                if !cfg.TLS && isWSSURL(u) {
584✔
495
                        cfg.TLS = true
×
496
                }
×
497
        }
498
        return cfg
324✔
499
}
500

501
// Notifies the quit channel without blocking.
502
// No lock is needed to invoke this function.
503
func (cfg *leafNodeCfg) notifyQuitChannel() {
×
504
        select {
×
505
        case cfg.quitCh <- struct{}{}:
×
506
        default:
×
507
        }
508
}
509

510
// Sets the connect-in-progress status for this remote leaf configuration.
511
func (cfg *leafNodeCfg) setConnectInProgress(inProgress bool) {
1,083✔
512
        cfg.Lock()
1,083✔
513
        defer cfg.Unlock()
1,083✔
514
        // In both cases we want to drain the "quit" channel.
1,083✔
515
        select {
1,083✔
516
        case <-cfg.quitCh:
×
517
        default:
1,083✔
518
        }
519
        cfg.connInProgress = inProgress
1,083✔
520
}
521

522
// Returns `true` if this remote is in the middle of a connect, `false` otherwise.
523
func (cfg *leafNodeCfg) isConnectInProgress() bool {
×
524
        cfg.RLock()
×
525
        defer cfg.RUnlock()
×
526
        return cfg.connInProgress
×
527
}
×
528

529
// Mark that this remote is being removed from the configuration.
530
func (cfg *leafNodeCfg) markAsRemoved() {
×
531
        cfg.Lock()
×
532
        defer cfg.Unlock()
×
533
        // This function should be invoked only once, but protect.
×
534
        if cfg.removed {
×
535
                return
×
536
        }
×
537
        cfg.removed = true
×
538
        cfg.notifyQuitChannel()
×
539
}
540

541
// Returns false if it has been disabled or removed.
542
func (cfg *leafNodeCfg) stillValid() bool {
2,897✔
543
        cfg.RLock()
2,897✔
544
        defer cfg.RUnlock()
2,897✔
545
        return !cfg.Disabled && !cfg.removed
2,897✔
546
}
2,897✔
547

548
// Will pick an URL from the list of available URLs.
549
func (cfg *leafNodeCfg) pickNextURL() *url.URL {
2,174✔
550
        cfg.Lock()
2,174✔
551
        defer cfg.Unlock()
2,174✔
552
        // If the current URL is the first in the list and we have more than
2,174✔
553
        // one URL, then move that one to end of the list.
2,174✔
554
        if cfg.curURL != nil && len(cfg.urls) > 1 && urlsAreEqual(cfg.curURL, cfg.urls[0]) {
3,968✔
555
                first := cfg.urls[0]
1,794✔
556
                copy(cfg.urls, cfg.urls[1:])
1,794✔
557
                cfg.urls[len(cfg.urls)-1] = first
1,794✔
558
        }
1,794✔
559
        cfg.curURL = cfg.urls[0]
2,174✔
560
        return cfg.curURL
2,174✔
561
}
562

563
// Returns the current URL
564
func (cfg *leafNodeCfg) getCurrentURL() *url.URL {
24✔
565
        cfg.RLock()
24✔
566
        defer cfg.RUnlock()
24✔
567
        return cfg.curURL
24✔
568
}
24✔
569

570
// Returns how long the server should wait before attempting
571
// to solicit a remote leafnode connection.
572
func (cfg *leafNodeCfg) getConnectDelay() time.Duration {
396✔
573
        cfg.RLock()
396✔
574
        delay := cfg.connDelay
396✔
575
        cfg.RUnlock()
396✔
576
        return delay
396✔
577
}
396✔
578

579
// Sets the connect delay.
580
func (cfg *leafNodeCfg) setConnectDelay(delay time.Duration) {
120✔
581
        cfg.Lock()
120✔
582
        cfg.connDelay = delay
120✔
583
        cfg.Unlock()
120✔
584
}
120✔
585

586
// Ensure that non-exported options (used in tests) have
587
// been properly set.
588
func (s *Server) setLeafNodeNonExportedOptions() {
5,922✔
589
        opts := s.getOpts()
5,922✔
590
        s.leafNodeOpts.dialTimeout = opts.LeafNode.dialTimeout
5,922✔
591
        if s.leafNodeOpts.dialTimeout == 0 {
11,844✔
592
                // Use same timeouts as routes for now.
5,922✔
593
                s.leafNodeOpts.dialTimeout = DEFAULT_ROUTE_DIAL
5,922✔
594
        }
5,922✔
595
        s.leafNodeOpts.resolver = opts.LeafNode.resolver
5,922✔
596
        if s.leafNodeOpts.resolver == nil {
11,844✔
597
                s.leafNodeOpts.resolver = net.DefaultResolver
5,922✔
598
        }
5,922✔
599
}
600

601
// sharedSysAccDelay is a fixed delay of 250 milliseconds.
// NOTE(review): its use site is outside this view; the name suggests it applies
// when a system account is shared across a leafnode connection - confirm.
const sharedSysAccDelay = 250 * time.Millisecond
602

603
// establishHTTPProxyTunnel establishes an HTTP CONNECT tunnel through a proxy server
604
func establishHTTPProxyTunnel(proxyURL, targetHost string, timeout time.Duration, username, password string) (net.Conn, error) {
×
605
        proxyAddr, err := url.Parse(proxyURL)
×
606
        if err != nil {
×
607
                // This should not happen since proxy URL is validated during configuration parsing
×
608
                return nil, fmt.Errorf("unexpected proxy URL parse error (URL was pre-validated): %v", err)
×
609
        }
×
610

611
        // Connect to the proxy server
612
        conn, err := natsDialTimeout("tcp", proxyAddr.Host, timeout)
×
613
        if err != nil {
×
614
                return nil, fmt.Errorf("failed to connect to proxy: %v", err)
×
615
        }
×
616

617
        // Set deadline for the entire proxy handshake
618
        if err := conn.SetDeadline(time.Now().Add(timeout)); err != nil {
×
619
                conn.Close()
×
620
                return nil, fmt.Errorf("failed to set deadline: %v", err)
×
621
        }
×
622

623
        req := &http.Request{
×
624
                Method: http.MethodConnect,
×
625
                URL:    &url.URL{Opaque: targetHost}, // Opaque is required for CONNECT
×
626
                Host:   targetHost,
×
627
                Header: make(http.Header),
×
628
        }
×
629

×
630
        // Add proxy authentication if provided
×
631
        if username != "" && password != "" {
×
632
                req.Header.Set("Proxy-Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(username+":"+password)))
×
633
        }
×
634

635
        if err := req.Write(conn); err != nil {
×
636
                conn.Close()
×
637
                return nil, fmt.Errorf("failed to write CONNECT request: %v", err)
×
638
        }
×
639

640
        resp, err := http.ReadResponse(bufio.NewReader(conn), req)
×
641
        if err != nil {
×
642
                conn.Close()
×
643
                return nil, fmt.Errorf("failed to read proxy response: %v", err)
×
644
        }
×
645

646
        if resp.StatusCode != http.StatusOK {
×
647
                resp.Body.Close()
×
648
                conn.Close()
×
649
                return nil, fmt.Errorf("proxy CONNECT failed: %s", resp.Status)
×
650
        }
×
651

652
        // Close the response body
653
        resp.Body.Close()
×
654

×
655
        // Clear the deadline now that we've finished the proxy handshake
×
656
        if err := conn.SetDeadline(time.Time{}); err != nil {
×
657
                conn.Close()
×
658
                return nil, fmt.Errorf("failed to clear deadline: %v", err)
×
659
        }
×
660

661
        return conn, nil
×
662
}
663

664
// Connect to a remote leaf node asynchronously (that is, this function will do
665
// the connect in a go routine).
666
func (s *Server) connectToRemoteLeafNodeAsynchronously(remote *leafNodeCfg, firstConnect bool) {
324✔
667
        remote.setConnectInProgress(true)
324✔
668
        s.startGoRoutine(func() {
648✔
669
                defer s.grWG.Done()
324✔
670
                if !connectToRemoteLeafNode(s, remote, firstConnect) {
332✔
671
                        remote.setConnectInProgress(false)
8✔
672
                }
8✔
673
        })
674
}
675

676
// Connect to a remote leaf node. Should only be invoked from
677
// `s.connectToRemoteLeafNodeAsynchronously()` or `s.reConnectToRemoteLeafNode()`.
678
// Returns `true` if this function invoked `s.createLeafNode()`, false otherwise.
679
func connectToRemoteLeafNode(s *Server, remote *leafNodeCfg, firstConnect bool) bool {
396✔
680

396✔
681
        if remote == nil || len(remote.URLs) == 0 {
396✔
682
                s.Debugf("Empty remote leafnode definition, nothing to connect")
×
683
                return false
×
684
        }
×
685

686
        opts := s.getOpts()
396✔
687
        reconnectDelay := opts.LeafNode.ReconnectInterval
396✔
688
        s.mu.RLock()
396✔
689
        dialTimeout := s.leafNodeOpts.dialTimeout
396✔
690
        resolver := s.leafNodeOpts.resolver
396✔
691
        var isSysAcc bool
396✔
692
        if s.eventsEnabled() {
764✔
693
                isSysAcc = remote.LocalAccount == s.sys.account.Name
368✔
694
        }
368✔
695
        jetstreamMigrateDelay := remote.JetStreamClusterMigrateDelay
396✔
696
        s.mu.RUnlock()
396✔
697

396✔
698
        // If we are sharing a system account and we are not standalone delay to gather some info prior.
396✔
699
        if firstConnect && isSysAcc && !s.standAloneMode() {
456✔
700
                s.Debugf("Will delay first leafnode connect to shared system account due to clustering")
60✔
701
                remote.setConnectDelay(sharedSysAccDelay)
60✔
702
        }
60✔
703

704
        if connDelay := remote.getConnectDelay(); connDelay > 0 {
456✔
705
                select {
60✔
706
                case <-time.After(connDelay):
57✔
707
                case <-remote.quitCh:
×
708
                        return false
×
709
                case <-s.quitCh:
3✔
710
                        return false
3✔
711
                }
712
                remote.setConnectDelay(0)
57✔
713
        }
714

715
        var conn net.Conn
393✔
716

393✔
717
        const connErrFmt = "Error trying to connect as leafnode to remote server %q (attempt %v): %v"
393✔
718

393✔
719
        // Capture proxy configuration once before the loop with proper locking
393✔
720
        remote.RLock()
393✔
721
        proxyURL := remote.Proxy.URL
393✔
722
        proxyUsername := remote.Proxy.Username
393✔
723
        proxyPassword := remote.Proxy.Password
393✔
724
        proxyTimeout := remote.Proxy.Timeout
393✔
725
        remote.RUnlock()
393✔
726

393✔
727
        // Set default proxy timeout if not specified
393✔
728
        if proxyTimeout == 0 {
786✔
729
                proxyTimeout = dialTimeout
393✔
730
        }
393✔
731

732
        attempts := 0
393✔
733

393✔
734
        // In case the migrate timer was created but not canceled, do it when
393✔
735
        // this function exits. Note that the timer would not be created if
393✔
736
        // `jetstreamMigrateDelay == 0`.
393✔
737
        if jetstreamMigrateDelay > 0 {
401✔
738
                defer remote.cancelMigrateTimer()
8✔
739
        }
8✔
740

741
        reconnectTimer := time.NewTimer(reconnectDelay)
393✔
742
        reconnectTimer.Stop()
393✔
743
        defer stopAndClearTimer(&reconnectTimer)
393✔
744

393✔
745
        for s.isRunning() && remote.stillValid() {
2,567✔
746
                rURL := remote.pickNextURL()
2,174✔
747
                url, err := s.getRandomIP(resolver, rURL.Host, nil)
2,174✔
748
                if err == nil {
4,345✔
749
                        var ipStr string
2,171✔
750
                        if url != rURL.Host {
2,173✔
751
                                ipStr = fmt.Sprintf(" (%s)", url)
2✔
752
                        }
2✔
753
                        // Some test may want to disable remotes from connecting
754
                        if s.isLeafConnectDisabled() {
2,299✔
755
                                s.Debugf("Will not attempt to connect to remote server on %q%s, leafnodes currently disabled", rURL.Host, ipStr)
128✔
756
                                err = ErrLeafNodeDisabled
128✔
757
                        } else {
2,171✔
758
                                s.Debugf("Trying to connect as leafnode to remote server on %q%s", rURL.Host, ipStr)
2,043✔
759

2,043✔
760
                                // Check if proxy is configured
2,043✔
761
                                if proxyURL != _EMPTY_ {
2,043✔
762
                                        targetHost := rURL.Host
×
763
                                        // If URL doesn't include port, add the default port for the scheme
×
764
                                        if rURL.Port() == _EMPTY_ {
×
765
                                                defaultPort := "80"
×
766
                                                if rURL.Scheme == wsSchemePrefixTLS {
×
767
                                                        defaultPort = "443"
×
768
                                                }
×
769
                                                targetHost = net.JoinHostPort(rURL.Hostname(), defaultPort)
×
770
                                        }
771

772
                                        conn, err = establishHTTPProxyTunnel(proxyURL, targetHost, proxyTimeout, proxyUsername, proxyPassword)
×
773
                                } else {
2,043✔
774
                                        // Direct connection
2,043✔
775
                                        conn, err = natsDialTimeout("tcp", url, dialTimeout)
2,043✔
776
                                }
2,043✔
777
                        }
778
                }
779
                if err != nil {
3,962✔
780
                        jitter := time.Duration(rand.Int63n(int64(reconnectDelay)))
1,788✔
781
                        delay := reconnectDelay + jitter
1,788✔
782
                        attempts++
1,788✔
783
                        if s.shouldReportConnectErr(firstConnect, attempts) {
3,576✔
784
                                s.Errorf(connErrFmt, rURL.Host, attempts, err)
1,788✔
785
                        } else {
1,788✔
786
                                s.Debugf(connErrFmt, rURL.Host, attempts, err)
×
787
                        }
×
788
                        remote.Lock()
1,788✔
789
                        // if we are using a delay to start migrating assets, kick off a migrate timer.
1,788✔
790
                        if remote.jsMigrateTimer == nil && jetstreamMigrateDelay > 0 {
1,796✔
791
                                remote.jsMigrateTimer = time.AfterFunc(jetstreamMigrateDelay, func() {
16✔
792
                                        s.checkJetStreamMigrate(remote)
8✔
793
                                })
8✔
794
                        }
795
                        remote.Unlock()
1,788✔
796
                        reconnectTimer.Reset(delay)
1,788✔
797
                        select {
1,788✔
798
                        case <-s.quitCh:
6✔
799
                                return false
6✔
800
                        case <-remote.quitCh:
×
801
                                return false
×
802
                        case <-reconnectTimer.C:
1,781✔
803
                                // Check if we should migrate any JetStream assets immediately while this remote is down.
1,781✔
804
                                // This will be used if JetStreamClusterMigrateDelay was not set
1,781✔
805
                                if jetstreamMigrateDelay == 0 {
3,489✔
806
                                        s.checkJetStreamMigrate(remote)
1,708✔
807
                                }
1,708✔
808
                                continue
1,781✔
809
                        }
810
                }
811
                remote.cancelMigrateTimer()
386✔
812
                // We can check here, but really we will have to check again when the server
386✔
813
                // is about to add to the `s.leafs` map later in the process.
386✔
814
                if !remote.stillValid() {
386✔
815
                        conn.Close()
×
816
                        return false
×
817
                }
×
818

819
                // We have a connection here to a remote server.
820
                // Go ahead and create our leaf node and return.
821
                s.createLeafNode(conn, rURL, remote, nil)
386✔
822

386✔
823
                // Clear any observer states if we had them.
386✔
824
                s.clearObserverState(remote)
386✔
825

386✔
826
                return true
386✔
827
        }
828

829
        return false
×
830
}
831

832
func (cfg *leafNodeCfg) cancelMigrateTimer() {
394✔
833
        cfg.Lock()
394✔
834
        stopAndClearTimer(&cfg.jsMigrateTimer)
394✔
835
        cfg.Unlock()
394✔
836
}
394✔
837

838
// This will clear any observer state such that stream or consumer assets on this server can become leaders again.
839
func (s *Server) clearObserverState(remote *leafNodeCfg) {
386✔
840
        s.mu.RLock()
386✔
841
        accName := remote.LocalAccount
386✔
842
        s.mu.RUnlock()
386✔
843

386✔
844
        acc, err := s.LookupAccount(accName)
386✔
845
        if err != nil {
386✔
846
                s.Warnf("Error looking up account [%s] checking for JetStream clear observer state on a leafnode", accName)
×
847
                return
×
848
        }
×
849

850
        acc.jscmMu.Lock()
386✔
851
        defer acc.jscmMu.Unlock()
386✔
852

386✔
853
        // Walk all streams looking for any clustered stream, skip otherwise.
386✔
854
        for _, mset := range acc.streams() {
406✔
855
                node := mset.raftNode()
20✔
856
                if node == nil {
32✔
857
                        // Not R>1
12✔
858
                        continue
12✔
859
                }
860
                // Check consumers
861
                for _, o := range mset.getConsumers() {
10✔
862
                        if n := o.raftNode(); n != nil {
4✔
863
                                // Ensure we can become a leader again.
2✔
864
                                n.SetObserver(false)
2✔
865
                        }
2✔
866
                }
867
                // Ensure we can not become a leader again.
868
                node.SetObserver(false)
8✔
869
        }
870
}
871

872
// Check to see if we should migrate any assets from this account.
873
func (s *Server) checkJetStreamMigrate(remote *leafNodeCfg) {
1,716✔
874
        s.mu.RLock()
1,716✔
875
        accName, shouldMigrate := remote.LocalAccount, remote.JetStreamClusterMigrate
1,716✔
876
        s.mu.RUnlock()
1,716✔
877

1,716✔
878
        if !shouldMigrate {
3,369✔
879
                return
1,653✔
880
        }
1,653✔
881

882
        acc, err := s.LookupAccount(accName)
63✔
883
        if err != nil {
63✔
884
                s.Warnf("Error looking up account [%s] checking for JetStream migration on a leafnode", accName)
×
885
                return
×
886
        }
×
887

888
        acc.jscmMu.Lock()
63✔
889
        defer acc.jscmMu.Unlock()
63✔
890

63✔
891
        // Walk all streams looking for any clustered stream, skip otherwise.
63✔
892
        // If we are the leader force stepdown.
63✔
893
        for _, mset := range acc.streams() {
94✔
894
                node := mset.raftNode()
31✔
895
                if node == nil {
31✔
896
                        // Not R>1
×
897
                        continue
×
898
                }
899
                // Collect any consumers
900
                for _, o := range mset.getConsumers() {
51✔
901
                        if n := o.raftNode(); n != nil {
40✔
902
                                n.StepDown()
20✔
903
                                // Ensure we can not become a leader while in this state.
20✔
904
                                n.SetObserver(true)
20✔
905
                        }
20✔
906
                }
907
                // Stepdown if this stream was leader.
908
                node.StepDown()
31✔
909
                // Ensure we can not become a leader while in this state.
31✔
910
                node.SetObserver(true)
31✔
911
        }
912
}
913

914
// Helper for checking.
915
func (s *Server) isLeafConnectDisabled() bool {
2,171✔
916
        s.mu.RLock()
2,171✔
917
        defer s.mu.RUnlock()
2,171✔
918
        return s.leafDisableConnect
2,171✔
919
}
2,171✔
920

921
// Save off the tlsName for when we use TLS and mix hostnames and IPs. IPs usually
922
// come from the server we connect to.
923
//
924
// We used to save the name only if there was a TLSConfig or scheme equal to "tls".
925
// However, this was causing failures for users that did not set the scheme (and
926
// their remote connections did not have a tls{} block).
927
// We now save the host name regardless in case the remote returns an INFO indicating
928
// that TLS is required.
929
//
930
// Lock held on entry.
931
func (cfg *leafNodeCfg) saveTLSHostname(u *url.URL) {
827✔
932
        if cfg.tlsName == _EMPTY_ && net.ParseIP(u.Hostname()) == nil {
832✔
933
                cfg.tlsName = u.Hostname()
5✔
934
        }
5✔
935
}
936

937
// Save off the username/password for when we connect using a bare URL
938
// that we get from the INFO protocol.
939
//
940
// Lock held on entry.
941
func (cfg *leafNodeCfg) saveUserPassword(u *url.URL) {
584✔
942
        if cfg.username == _EMPTY_ && u.User != nil {
759✔
943
                cfg.username = u.User.Username()
175✔
944
                cfg.password, _ = u.User.Password()
175✔
945
        }
175✔
946
}
947

948
// This starts the leafnode accept loop in a go routine, unless it
949
// is detected that the server has already been shutdown.
950
func (s *Server) startLeafNodeAcceptLoop() {
3,536✔
951
        // Snapshot server options.
3,536✔
952
        opts := s.getOpts()
3,536✔
953

3,536✔
954
        port := opts.LeafNode.Port
3,536✔
955
        if port == -1 {
7,012✔
956
                port = 0
3,476✔
957
        }
3,476✔
958

959
        if s.isShuttingDown() {
3,536✔
960
                return
×
961
        }
×
962

963
        s.mu.Lock()
3,536✔
964
        hp := net.JoinHostPort(opts.LeafNode.Host, strconv.Itoa(port))
3,536✔
965
        l, e := natsListen("tcp", hp)
3,536✔
966
        s.leafNodeListenerErr = e
3,536✔
967
        if e != nil {
3,536✔
968
                s.mu.Unlock()
×
969
                s.Fatalf("Error listening on leafnode port: %d - %v", opts.LeafNode.Port, e)
×
970
                return
×
971
        }
×
972

973
        s.Noticef("Listening for leafnode connections on %s",
3,536✔
974
                net.JoinHostPort(opts.LeafNode.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
3,536✔
975

3,536✔
976
        tlsRequired := opts.LeafNode.TLSConfig != nil
3,536✔
977
        tlsVerify := tlsRequired && opts.LeafNode.TLSConfig.ClientAuth == tls.RequireAndVerifyClientCert
3,536✔
978
        // Do not set compression in this Info object, it would possibly cause
3,536✔
979
        // issues when sending asynchronous INFO to the remote.
3,536✔
980
        info := Info{
3,536✔
981
                ID:            s.info.ID,
3,536✔
982
                Name:          s.info.Name,
3,536✔
983
                Version:       s.info.Version,
3,536✔
984
                GitCommit:     gitCommit,
3,536✔
985
                GoVersion:     runtime.Version(),
3,536✔
986
                AuthRequired:  true,
3,536✔
987
                TLSRequired:   tlsRequired,
3,536✔
988
                TLSVerify:     tlsVerify,
3,536✔
989
                MaxPayload:    s.info.MaxPayload, // TODO(dlc) - Allow override?
3,536✔
990
                Headers:       s.supportsHeaders(),
3,536✔
991
                JetStream:     opts.JetStream,
3,536✔
992
                Domain:        opts.JetStreamDomain,
3,536✔
993
                Proto:         s.getServerProto(),
3,536✔
994
                InfoOnConnect: true,
3,536✔
995
                JSApiLevel:    JSApiLevel,
3,536✔
996
        }
3,536✔
997
        // If we have selected a random port...
3,536✔
998
        if port == 0 {
7,012✔
999
                // Write resolved port back to options.
3,476✔
1000
                opts.LeafNode.Port = l.Addr().(*net.TCPAddr).Port
3,476✔
1001
        }
3,476✔
1002

1003
        s.leafNodeInfo = info
3,536✔
1004
        // Possibly override Host/Port and set IP based on Cluster.Advertise
3,536✔
1005
        if err := s.setLeafNodeInfoHostPortAndIP(); err != nil {
3,536✔
1006
                s.Fatalf("Error setting leafnode INFO with LeafNode.Advertise value of %s, err=%v", opts.LeafNode.Advertise, err)
×
1007
                l.Close()
×
1008
                s.mu.Unlock()
×
1009
                return
×
1010
        }
×
1011
        s.leafURLsMap[s.leafNodeInfo.IP]++
3,536✔
1012
        s.generateLeafNodeInfoJSON()
3,536✔
1013

3,536✔
1014
        // Setup state that can enable shutdown
3,536✔
1015
        s.leafNodeListener = l
3,536✔
1016

3,536✔
1017
        // As of now, a server that does not have remotes configured would
3,536✔
1018
        // never solicit a connection, so we should not have to warn if
3,536✔
1019
        // InsecureSkipVerify is set in main LeafNodes config (since
3,536✔
1020
        // this TLS setting matters only when soliciting a connection).
3,536✔
1021
        // Still, warn if insecure is set in any of LeafNode block.
3,536✔
1022
        // We need to check remotes, even if tls is not required on accept.
3,536✔
1023
        warn := tlsRequired && opts.LeafNode.TLSConfig.InsecureSkipVerify
3,536✔
1024
        if !warn {
7,072✔
1025
                for _, r := range opts.LeafNode.Remotes {
3,562✔
1026
                        if r.TLSConfig != nil && r.TLSConfig.InsecureSkipVerify {
26✔
1027
                                warn = true
×
1028
                                break
×
1029
                        }
1030
                }
1031
        }
1032
        if warn {
3,536✔
1033
                s.Warnf(leafnodeTLSInsecureWarning)
×
1034
        }
×
1035
        go s.acceptConnections(l, "Leafnode", func(conn net.Conn) { s.createLeafNode(conn, nil, nil, nil) }, nil)
4,053✔
1036
        s.mu.Unlock()
3,536✔
1037
}
1038

1039
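// credsRe matches each dash-delimited section of a creds file and captures
// its payload line. The first match is expected to be the user JWT and the
// second one the seed.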
1040
var credsRe = regexp.MustCompile(`\s*(?:(?:[-]{3,}.*[-]{3,}\r?\n)([\w\-.=]+)(?:\r?\n[-]{3,}.*[-]{3,}(\r?\n|\z)))`)
1041

1042
// clusterName is provided as argument to avoid lock ordering issues with the locked client c
1043
// Lock should be held entering here.
1044
func (c *client) sendLeafConnect(clusterName string, headers bool) error {
367✔
1045
        // We support basic user/pass and operator based user JWT with signatures.
367✔
1046
        cinfo := leafConnectInfo{
367✔
1047
                Version:       VERSION,
367✔
1048
                ID:            c.srv.info.ID,
367✔
1049
                Domain:        c.srv.info.Domain,
367✔
1050
                Name:          c.srv.info.Name,
367✔
1051
                Hub:           c.leaf.remote.Hub,
367✔
1052
                Cluster:       clusterName,
367✔
1053
                Headers:       headers,
367✔
1054
                JetStream:     c.acc.jetStreamConfigured(),
367✔
1055
                DenyPub:       c.leaf.remote.DenyImports,
367✔
1056
                Compression:   c.leaf.compression,
367✔
1057
                RemoteAccount: c.acc.GetName(),
367✔
1058
                Proto:         c.srv.getServerProto(),
367✔
1059
                Isolate:       c.leaf.remote.RequestIsolation,
367✔
1060
        }
367✔
1061

367✔
1062
        // If a signature callback is specified, this takes precedence over anything else.
367✔
1063
        if cb := c.leaf.remote.SignatureCB; cb != nil {
369✔
1064
                nonce := c.nonce
2✔
1065
                c.mu.Unlock()
2✔
1066
                jwt, sigraw, err := cb(nonce)
2✔
1067
                c.mu.Lock()
2✔
1068
                if err == nil && c.isClosed() {
2✔
1069
                        err = ErrConnectionClosed
×
1070
                }
×
1071
                if err != nil {
2✔
1072
                        c.Errorf("Error signing the nonce: %v", err)
×
1073
                        return err
×
1074
                }
×
1075
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
2✔
1076
                cinfo.JWT, cinfo.Sig = jwt, sig
2✔
1077

1078
        } else if creds := c.leaf.remote.Credentials; creds != _EMPTY_ {
409✔
1079
                // Check for credentials first, that will take precedence..
44✔
1080
                c.Debugf("Authenticating with credentials file %q", c.leaf.remote.Credentials)
44✔
1081
                contents, err := os.ReadFile(creds)
44✔
1082
                if err != nil {
44✔
1083
                        c.Errorf("%v", err)
×
1084
                        return err
×
1085
                }
×
1086
                defer wipeSlice(contents)
44✔
1087
                items := credsRe.FindAllSubmatch(contents, -1)
44✔
1088
                if len(items) < 2 {
44✔
1089
                        c.Errorf("Credentials file malformed")
×
1090
                        return err
×
1091
                }
×
1092
                // First result should be the user JWT.
1093
                // We copy here so that the file containing the seed will be wiped appropriately.
1094
                raw := items[0][1]
44✔
1095
                tmp := make([]byte, len(raw))
44✔
1096
                copy(tmp, raw)
44✔
1097
                // Seed is second item.
44✔
1098
                kp, err := nkeys.FromSeed(items[1][1])
44✔
1099
                if err != nil {
44✔
1100
                        c.Errorf("Credentials file has malformed seed")
×
1101
                        return err
×
1102
                }
×
1103
                // Wipe our key on exit.
1104
                defer kp.Wipe()
44✔
1105

44✔
1106
                sigraw, _ := kp.Sign(c.nonce)
44✔
1107
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
44✔
1108
                cinfo.JWT = bytesToString(tmp)
44✔
1109
                cinfo.Sig = sig
44✔
1110
        } else if nkey := c.leaf.remote.Nkey; nkey != _EMPTY_ {
324✔
1111
                kp, err := nkeys.FromSeed([]byte(nkey))
3✔
1112
                if err != nil {
3✔
1113
                        c.Errorf("Remote nkey has malformed seed")
×
1114
                        return err
×
1115
                }
×
1116
                // Wipe our key on exit.
1117
                defer kp.Wipe()
3✔
1118
                sigraw, _ := kp.Sign(c.nonce)
3✔
1119
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
3✔
1120
                pkey, _ := kp.PublicKey()
3✔
1121
                cinfo.Nkey = pkey
3✔
1122
                cinfo.Sig = sig
3✔
1123
        }
1124
        // In addition, and this is to allow auth callout, set user/password or
1125
        // token if applicable.
1126
        if userInfo := c.leaf.remote.curURL.User; userInfo != nil {
560✔
1127
                cinfo.User = userInfo.Username()
193✔
1128
                var ok bool
193✔
1129
                cinfo.Pass, ok = userInfo.Password()
193✔
1130
                // For backward compatibility, if only username is provided, set both
193✔
1131
                // Token and User, not just Token.
193✔
1132
                if !ok {
199✔
1133
                        cinfo.Token = cinfo.User
6✔
1134
                }
6✔
1135
        } else if c.leaf.remote.username != _EMPTY_ {
177✔
1136
                cinfo.User = c.leaf.remote.username
3✔
1137
                cinfo.Pass = c.leaf.remote.password
3✔
1138
                // For backward compatibility, if only username is provided, set both
3✔
1139
                // Token and User, not just Token.
3✔
1140
                if cinfo.Pass == _EMPTY_ {
3✔
1141
                        cinfo.Token = cinfo.User
×
1142
                }
×
1143
        }
1144
        b, err := json.Marshal(cinfo)
367✔
1145
        if err != nil {
367✔
1146
                c.Errorf("Error marshaling CONNECT to remote leafnode: %v\n", err)
×
1147
                return err
×
1148
        }
×
1149
        // Although this call is made before the writeLoop is created,
1150
        // we don't really need to send in place. The protocol will be
1151
        // sent out by the writeLoop.
1152
        c.enqueueProto([]byte(fmt.Sprintf(ConProto, b)))
367✔
1153
        return nil
367✔
1154
}
1155

1156
// Makes a deep copy of the LeafNode Info structure.
1157
// The server lock is held on entry.
1158
func (s *Server) copyLeafNodeInfo() *Info {
1,457✔
1159
        clone := s.leafNodeInfo
1,457✔
1160
        // Copy the array of urls.
1,457✔
1161
        if len(s.leafNodeInfo.LeafNodeURLs) > 0 {
2,649✔
1162
                clone.LeafNodeURLs = append([]string(nil), s.leafNodeInfo.LeafNodeURLs...)
1,192✔
1163
        }
1,192✔
1164
        return &clone
1,457✔
1165
}
1166

1167
// Adds a LeafNode URL that we get when a route connects to the Info structure.
1168
// Regenerates the JSON byte array so that it can be sent to LeafNode connections.
1169
// Returns a boolean indicating if the URL was added or not.
1170
// Server lock is held on entry
1171
func (s *Server) addLeafNodeURL(urlStr string) bool {
8,019✔
1172
        if s.leafURLsMap.addUrl(urlStr) {
16,033✔
1173
                s.generateLeafNodeInfoJSON()
8,014✔
1174
                return true
8,014✔
1175
        }
8,014✔
1176
        return false
5✔
1177
}
1178

1179
// Removes a LeafNode URL of the route that is disconnecting from the Info structure.
1180
// Regenerates the JSON byte array so that it can be sent to LeafNode connections.
1181
// Returns a boolean indicating if the URL was removed or not.
1182
// Server lock is held on entry.
1183
func (s *Server) removeLeafNodeURL(urlStr string) bool {
8,019✔
1184
        // Don't need to do this if we are removing the route connection because
8,019✔
1185
        // we are shuting down...
8,019✔
1186
        if s.isShuttingDown() {
12,242✔
1187
                return false
4,223✔
1188
        }
4,223✔
1189
        if s.leafURLsMap.removeUrl(urlStr) {
7,588✔
1190
                s.generateLeafNodeInfoJSON()
3,792✔
1191
                return true
3,792✔
1192
        }
3,792✔
1193
        return false
4✔
1194
}
1195

1196
// Server lock is held on entry
1197
func (s *Server) generateLeafNodeInfoJSON() {
15,342✔
1198
        s.leafNodeInfo.Cluster = s.cachedClusterName()
15,342✔
1199
        s.leafNodeInfo.LeafNodeURLs = s.leafURLsMap.getAsStringSlice()
15,342✔
1200
        s.leafNodeInfo.WSConnectURLs = s.websocket.connectURLsMap.getAsStringSlice()
15,342✔
1201
        s.leafNodeInfoJSON = generateInfoJSON(&s.leafNodeInfo)
15,342✔
1202
}
15,342✔
1203

1204
// Sends an async INFO protocol so that the connected servers can update
1205
// their list of LeafNode urls.
1206
func (s *Server) sendAsyncLeafNodeInfo() {
11,806✔
1207
        for _, c := range s.leafs {
11,849✔
1208
                c.mu.Lock()
43✔
1209
                c.enqueueProto(s.leafNodeInfoJSON)
43✔
1210
                c.mu.Unlock()
43✔
1211
        }
43✔
1212
}
1213

1214
// Called when an inbound leafnode connection is accepted or we create one for a solicited leafnode.
1215
func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCfg, ws *websocket) *client {
903✔
1216
        // Snapshot server options.
903✔
1217
        opts := s.getOpts()
903✔
1218

903✔
1219
        maxPay := int32(opts.MaxPayload)
903✔
1220
        maxSubs := int32(opts.MaxSubs)
903✔
1221
        // For system, maxSubs of 0 means unlimited, so re-adjust here.
903✔
1222
        if maxSubs == 0 {
1,805✔
1223
                maxSubs = -1
902✔
1224
        }
902✔
1225
        now := time.Now().UTC()
903✔
1226

903✔
1227
        c := &client{srv: s, nc: conn, kind: LEAF, opts: defaultOpts, mpay: maxPay, msubs: maxSubs, start: now, last: now}
903✔
1228
        // Do not update the smap here, we need to do it in initLeafNodeSmapAndSendSubs
903✔
1229
        c.leaf = &leaf{}
903✔
1230

903✔
1231
        // If the leafnode subject interest should be isolated, flag it here.
903✔
1232
        s.optsMu.RLock()
903✔
1233
        if c.leaf.isolated = s.opts.LeafNode.IsolateLeafnodeInterest; !c.leaf.isolated && remote != nil {
1,289✔
1234
                c.leaf.isolated = remote.LocalIsolation
386✔
1235
        }
386✔
1236
        s.optsMu.RUnlock()
903✔
1237

903✔
1238
        // For accepted LN connections, ws will be != nil if it was accepted
903✔
1239
        // through the Websocket port.
903✔
1240
        c.ws = ws
903✔
1241

903✔
1242
        // For remote, check if the scheme starts with "ws", if so, we will initiate
903✔
1243
        // a remote Leaf Node connection as a websocket connection.
903✔
1244
        if remote != nil && rURL != nil && isWSURL(rURL) {
903✔
1245
                remote.RLock()
×
1246
                c.ws = &websocket{compress: remote.Websocket.Compression, maskwrite: !remote.Websocket.NoMasking}
×
1247
                remote.RUnlock()
×
1248
        }
×
1249

1250
        // Determines if we are soliciting the connection or not.
1251
        var solicited bool
903✔
1252
        var acc *Account
903✔
1253
        var remoteSuffix string
903✔
1254
        if remote != nil {
1,289✔
1255
                // For now, if lookup fails, we will constantly try
386✔
1256
                // to recreate this LN connection.
386✔
1257
                lacc := remote.LocalAccount
386✔
1258
                var err error
386✔
1259
                acc, err = s.LookupAccount(lacc)
386✔
1260
                if err != nil {
386✔
1261
                        // An account not existing is something that can happen with nats/http account resolver and the account
×
1262
                        // has not yet been pushed, or the request failed for other reasons.
×
1263
                        // remote needs to be set or retry won't happen
×
1264
                        c.leaf.remote = remote
×
1265
                        c.closeConnection(MissingAccount)
×
1266
                        s.Errorf("Unable to lookup account %s for solicited leafnode connection: %v", lacc, err)
×
1267
                        return nil
×
1268
                }
×
1269
                remoteSuffix = fmt.Sprintf(" for account: %s", acc.traceLabel())
386✔
1270
        }
1271

1272
        c.mu.Lock()
903✔
1273
        c.initClient()
903✔
1274
        c.Noticef("Leafnode connection created%s %s", remoteSuffix, c.opts.Name)
903✔
1275

903✔
1276
        var (
903✔
1277
                tlsFirst         bool
903✔
1278
                tlsFirstFallback time.Duration
903✔
1279
                infoTimeout      time.Duration
903✔
1280
        )
903✔
1281
        if remote != nil {
1,289✔
1282
                solicited = true
386✔
1283
                remote.Lock()
386✔
1284
                c.leaf.remote = remote
386✔
1285
                c.setPermissions(remote.perms)
386✔
1286
                if !c.leaf.remote.Hub {
766✔
1287
                        c.leaf.isSpoke = true
380✔
1288
                }
380✔
1289
                tlsFirst = remote.TLSHandshakeFirst
386✔
1290
                infoTimeout = remote.FirstInfoTimeout
386✔
1291
                remote.Unlock()
386✔
1292
                c.acc = acc
386✔
1293
        } else {
517✔
1294
                c.flags.set(expectConnect)
517✔
1295
                if ws != nil {
517✔
1296
                        c.Debugf("Leafnode compression=%v", c.ws.compress)
×
1297
                }
×
1298
                tlsFirst = opts.LeafNode.TLSHandshakeFirst
517✔
1299
                if f := opts.LeafNode.TLSHandshakeFirstFallback; f > 0 {
517✔
1300
                        tlsFirstFallback = f
×
1301
                }
×
1302
        }
1303
        c.mu.Unlock()
903✔
1304

903✔
1305
        var nonce [nonceLen]byte
903✔
1306
        var info *Info
903✔
1307

903✔
1308
        // Grab this before the client lock below.
903✔
1309
        if !solicited {
1,420✔
1310
                // Grab server variables
517✔
1311
                s.mu.Lock()
517✔
1312
                info = s.copyLeafNodeInfo()
517✔
1313
                // For tests that want to simulate old servers, do not set the compression
517✔
1314
                // on the INFO protocol if configured with CompressionNotSupported.
517✔
1315
                // Also suppress it if WebSocket compression is already in use, otherwise
517✔
1316
                // an old soliciting peer would honor the advertised mode, switch to S2,
517✔
1317
                // and then wait forever for a compressed INFO response from us.
517✔
1318
                if cm := opts.LeafNode.Compression.Mode; cm != CompressionNotSupported && (ws == nil || !ws.compress) {
1,034✔
1319
                        info.Compression = cm
517✔
1320
                }
517✔
1321
                // We always send a nonce for LEAF connections. Do not change that without
1322
                // taking into account presence of proxy trusted keys.
1323
                s.generateNonce(nonce[:])
517✔
1324
                s.mu.Unlock()
517✔
1325
        }
1326

1327
        // Grab lock
1328
        c.mu.Lock()
903✔
1329

903✔
1330
        var preBuf []byte
903✔
1331
        if solicited {
1,289✔
1332
                // For websocket connection, we need to send an HTTP request,
386✔
1333
                // and get the response before starting the readLoop to get
386✔
1334
                // the INFO, etc..
386✔
1335
                if c.isWebsocket() {
386✔
1336
                        var err error
×
1337
                        var closeReason ClosedState
×
1338

×
1339
                        preBuf, closeReason, err = c.leafNodeSolicitWSConnection(opts, rURL, remote)
×
1340
                        if err != nil {
×
1341
                                c.Errorf("Error soliciting websocket connection: %v", err)
×
1342
                                c.mu.Unlock()
×
1343
                                if closeReason != 0 {
×
1344
                                        c.closeConnection(closeReason)
×
1345
                                }
×
1346
                                return nil
×
1347
                        }
1348
                } else {
386✔
1349
                        // If configured to do TLS handshake first
386✔
1350
                        if tlsFirst {
386✔
1351
                                if _, err := c.leafClientHandshakeIfNeeded(remote, opts); err != nil {
×
1352
                                        c.mu.Unlock()
×
1353
                                        return nil
×
1354
                                }
×
1355
                        }
1356
                        // We need to wait for the info, but not for too long.
1357
                        c.nc.SetReadDeadline(time.Now().Add(infoTimeout))
386✔
1358
                }
1359

1360
                // We will process the INFO from the readloop and finish by
1361
                // sending the CONNECT and finish registration later.
1362
        } else {
517✔
1363
                // Send our info to the other side.
517✔
1364
                // Remember the nonce we sent here for signatures, etc.
517✔
1365
                c.nonce = make([]byte, nonceLen)
517✔
1366
                copy(c.nonce, nonce[:])
517✔
1367
                info.Nonce = bytesToString(c.nonce)
517✔
1368
                info.CID = c.cid
517✔
1369
                proto := generateInfoJSON(info)
517✔
1370

517✔
1371
                var pre []byte
517✔
1372
                // We need first to check for "TLS First" fallback delay.
517✔
1373
                if tlsFirstFallback > 0 {
517✔
1374
                        // We wait and see if we are getting any data. Since we did not send
×
1375
                        // the INFO protocol yet, only clients that use TLS first should be
×
1376
                        // sending data (the TLS handshake). We don't really check the content:
×
1377
                        // if it is a rogue agent and not an actual client performing the
×
1378
                        // TLS handshake, the error will be detected when performing the
×
1379
                        // handshake on our side.
×
1380
                        pre = make([]byte, 4)
×
1381
                        c.nc.SetReadDeadline(time.Now().Add(tlsFirstFallback))
×
1382
                        n, _ := io.ReadFull(c.nc, pre[:])
×
1383
                        c.nc.SetReadDeadline(time.Time{})
×
1384
                        // If we get any data (regardless of possible timeout), we will proceed
×
1385
                        // with the TLS handshake.
×
1386
                        if n > 0 {
×
1387
                                pre = pre[:n]
×
1388
                        } else {
×
1389
                                // We did not get anything so we will send the INFO protocol.
×
1390
                                pre = nil
×
1391
                                // Set the boolean to false for the rest of the function.
×
1392
                                tlsFirst = false
×
1393
                        }
×
1394
                }
1395

1396
                if !tlsFirst {
1,034✔
1397
                        // We have to send from this go routine because we may
517✔
1398
                        // have to block for TLS handshake before we start our
517✔
1399
                        // writeLoop go routine. The other side needs to receive
517✔
1400
                        // this before it can initiate the TLS handshake..
517✔
1401
                        c.sendProtoNow(proto)
517✔
1402

517✔
1403
                        // The above call could have marked the connection as closed (due to TCP error).
517✔
1404
                        if c.isClosed() {
517✔
1405
                                c.mu.Unlock()
×
1406
                                c.closeConnection(WriteError)
×
1407
                                return nil
×
1408
                        }
×
1409
                }
1410

1411
                // Check to see if we need to spin up TLS.
1412
                if !c.isWebsocket() && info.TLSRequired {
543✔
1413
                        // If we have a prebuffer create a multi-reader.
26✔
1414
                        if len(pre) > 0 {
26✔
1415
                                c.nc = &tlsMixConn{c.nc, bytes.NewBuffer(pre)}
×
1416
                        }
×
1417
                        // Perform server-side TLS handshake.
1418
                        if err := c.doTLSServerHandshake(tlsHandshakeLeaf, opts.LeafNode.TLSConfig, opts.LeafNode.TLSTimeout, opts.LeafNode.TLSPinnedCerts); err != nil {
38✔
1419
                                c.mu.Unlock()
12✔
1420
                                return nil
12✔
1421
                        }
12✔
1422
                }
1423

1424
                // If the user wants the TLS handshake to occur first, now that it is
1425
                // done, send the INFO protocol.
1426
                if tlsFirst {
505✔
1427
                        c.flags.set(didTLSFirst)
×
1428
                        c.sendProtoNow(proto)
×
1429
                        if c.isClosed() {
×
1430
                                c.mu.Unlock()
×
1431
                                c.closeConnection(WriteError)
×
1432
                                return nil
×
1433
                        }
×
1434
                }
1435

1436
                // Leaf nodes will always require a CONNECT to let us know
1437
                // when we are properly bound to an account.
1438
                //
1439
                // If compression is configured, we can't set the authTimer here because
1440
                // it would cause the parser to fail any incoming protocol that is not a
1441
                // CONNECT (and we need to exchange INFO protocols for compression
1442
                // negotiation). So instead, use the ping timer until we are done with
1443
                // negotiation and can set the auth timer.
1444
                timeout := secondsToDuration(opts.LeafNode.AuthTimeout)
505✔
1445
                if needsCompression(opts.LeafNode.Compression.Mode) {
802✔
1446
                        c.ping.tmr = time.AfterFunc(timeout, func() {
309✔
1447
                                c.authTimeout()
12✔
1448
                        })
12✔
1449
                } else {
208✔
1450
                        c.setAuthTimer(timeout)
208✔
1451
                }
208✔
1452
        }
1453

1454
        // Keep track in case server is shutdown before we can successfully register.
1455
        if !s.addToTempClients(c.cid, c) {
891✔
1456
                c.mu.Unlock()
×
1457
                c.setNoReconnect()
×
1458
                c.closeConnection(ServerShutdown)
×
1459
                return nil
×
1460
        }
×
1461

1462
        // Spin up the read loop.
1463
        s.startGoRoutine(func() { c.readLoop(preBuf) })
1,782✔
1464

1465
        // We will spin the write loop for solicited connections only
1466
        // when processing the INFO and after switching to TLS if needed.
1467
        if !solicited {
1,396✔
1468
                s.startGoRoutine(func() { c.writeLoop() })
1,010✔
1469
        }
1470

1471
        c.mu.Unlock()
891✔
1472

891✔
1473
        return c
891✔
1474
}
1475

1476
// Will perform the client-side TLS handshake if needed. Assumes that this
1477
// is called by the solicit side (remote will be non nil). Returns `true`
1478
// if TLS is required, `false` otherwise.
1479
// Lock held on entry.
1480
func (c *client) leafClientHandshakeIfNeeded(remote *leafNodeCfg, opts *Options) (bool, error) {
980✔
1481
        // Check if TLS is required and gather TLS config variables.
980✔
1482
        tlsRequired, tlsConfig, tlsName, tlsTimeout := c.leafNodeGetTLSConfigForSolicit(remote)
980✔
1483
        if !tlsRequired {
1,936✔
1484
                return false, nil
956✔
1485
        }
956✔
1486

1487
        // If TLS required, peform handshake.
1488
        // Get the URL that was used to connect to the remote server.
1489
        rURL := remote.getCurrentURL()
24✔
1490

24✔
1491
        // Perform the client-side TLS handshake.
24✔
1492
        if resetTLSName, err := c.doTLSClientHandshake(tlsHandshakeLeaf, rURL, tlsConfig, tlsName, tlsTimeout, opts.LeafNode.TLSPinnedCerts); err != nil {
30✔
1493
                // Check if we need to reset the remote's TLS name.
6✔
1494
                if resetTLSName {
6✔
1495
                        remote.Lock()
×
1496
                        remote.tlsName = _EMPTY_
×
1497
                        remote.Unlock()
×
1498
                }
×
1499
                return false, err
6✔
1500
        }
1501
        return true, nil
18✔
1502
}
1503

1504
func (c *client) processLeafnodeInfo(info *Info) {
1,319✔
1505
        c.mu.Lock()
1,319✔
1506
        if c.leaf == nil || c.isClosed() {
1,319✔
1507
                c.mu.Unlock()
×
1508
                return
×
1509
        }
×
1510
        s := c.srv
1,319✔
1511
        opts := s.getOpts()
1,319✔
1512
        remote := c.leaf.remote
1,319✔
1513
        didSolicit := remote != nil
1,319✔
1514
        firstINFO := !c.flags.isSet(infoReceived)
1,319✔
1515

1,319✔
1516
        // In case of websocket, the TLS handshake has been already done.
1,319✔
1517
        // So check only for non websocket connections and for configurations
1,319✔
1518
        // where the TLS Handshake was not done first.
1,319✔
1519
        if didSolicit && !c.flags.isSet(handshakeComplete) && !c.isWebsocket() && !remote.TLSHandshakeFirst {
2,299✔
1520
                // If the server requires TLS, we need to set this in the remote
980✔
1521
                // otherwise if there is no TLS configuration block for the remote,
980✔
1522
                // the solicit side will not attempt to perform the TLS handshake.
980✔
1523
                if firstINFO && info.TLSRequired {
1,004✔
1524
                        // Check for TLS/proxy configuration mismatch
24✔
1525
                        if remote.Proxy.URL != _EMPTY_ && !remote.TLS && remote.TLSConfig == nil {
24✔
1526
                                c.mu.Unlock()
×
1527
                                c.Errorf("TLS configuration mismatch: Hub requires TLS but leafnode remote is not configured for TLS. When using a proxy, ensure TLS leafnode configuration matches the Hub requirements.")
×
1528
                                c.closeConnection(TLSHandshakeError)
×
1529
                                return
×
1530
                        }
×
1531
                        remote.TLS = true
24✔
1532
                }
1533
                if _, err := c.leafClientHandshakeIfNeeded(remote, opts); err != nil {
986✔
1534
                        c.mu.Unlock()
6✔
1535
                        return
6✔
1536
                }
6✔
1537
        }
1538

1539
        // Check for compression, unless already done.
1540
        if firstINFO && !c.flags.isSet(compressionNegotiated) {
1,964✔
1541
                // A solicited leafnode connection must first receive a leafnode INFO.
651✔
1542
                // Classify wrong-port connections before any leaf-specific negotiation.
651✔
1543
                if didSolicit && (info.CID == 0 || info.LeafNodeURLs == nil) {
651✔
1544
                        c.mu.Unlock()
×
1545
                        c.Errorf(ErrConnectedToWrongPort.Error())
×
1546
                        c.closeConnection(WrongPort)
×
1547
                        return
×
1548
                }
×
1549

1550
                // Prevent from getting back here.
1551
                c.flags.set(compressionNegotiated)
651✔
1552

651✔
1553
                var co *CompressionOpts
651✔
1554
                if !didSolicit {
935✔
1555
                        co = &opts.LeafNode.Compression
284✔
1556
                } else {
651✔
1557
                        co = &remote.Compression
367✔
1558
                }
367✔
1559
                if needsCompression(co.Mode) {
1,300✔
1560
                        // Release client lock since following function will need server lock.
649✔
1561
                        c.mu.Unlock()
649✔
1562
                        compress, err := s.negotiateLeafCompression(c, didSolicit, info.Compression, co)
649✔
1563
                        if err != nil {
649✔
1564
                                c.sendErrAndErr(err.Error())
×
1565
                                c.closeConnection(ProtocolViolation)
×
1566
                                return
×
1567
                        }
×
1568
                        if compress {
1,215✔
1569
                                // Done for now, will get back another INFO protocol...
566✔
1570
                                return
566✔
1571
                        }
566✔
1572
                        // No compression because one side does not want/can't, so proceed.
1573
                        c.mu.Lock()
83✔
1574
                        // Check that the connection did not close if the lock was released.
83✔
1575
                        if c.isClosed() {
83✔
1576
                                c.mu.Unlock()
×
1577
                                return
×
1578
                        }
×
1579
                } else {
2✔
1580
                        // Coming from an old server, the Compression field would be the empty
2✔
1581
                        // string. For servers that are configured with CompressionNotSupported,
2✔
1582
                        // this makes them behave as old servers.
2✔
1583
                        if info.Compression == _EMPTY_ || co.Mode == CompressionNotSupported {
3✔
1584
                                c.leaf.compression = CompressionNotSupported
1✔
1585
                        } else {
2✔
1586
                                c.leaf.compression = CompressionOff
1✔
1587
                        }
1✔
1588
                }
1589
                // Accepting side does not normally process an INFO protocol during
1590
                // initial connection handshake. So we keep it consistent by returning
1591
                // if we are not soliciting.
1592
                if !didSolicit {
86✔
1593
                        // If we had created the ping timer instead of the auth timer, we will
1✔
1594
                        // clear the ping timer and set the auth timer now that the compression
1✔
1595
                        // negotiation is done.
1✔
1596
                        if info.Compression != _EMPTY_ && c.ping.tmr != nil {
1✔
1597
                                clearTimer(&c.ping.tmr)
×
1598
                                c.setAuthTimer(secondsToDuration(opts.LeafNode.AuthTimeout))
×
1599
                        }
×
1600
                        c.mu.Unlock()
1✔
1601
                        return
1✔
1602
                }
1603
                // Fall through and process the INFO protocol as usual.
1604
        }
1605

1606
        // Note: For now, only the initial INFO has a nonce. We
1607
        // will probably do auto key rotation at some point.
1608
        if firstINFO {
1,132✔
1609
                // Mark that the INFO protocol has been received.
386✔
1610
                c.flags.set(infoReceived)
386✔
1611
                // Prevent connecting to non leafnode port. Need to do this only for
386✔
1612
                // the first INFO, not for async INFO updates...
386✔
1613
                //
386✔
1614
                // Content of INFO sent by the server when accepting a tcp connection.
386✔
1615
                // -------------------------------------------------------------------
386✔
1616
                // Listen Port Of | CID | ClientConnectURLs | LeafNodeURLs | Gateway |
386✔
1617
                // -------------------------------------------------------------------
386✔
1618
                //      CLIENT    |  X* |        X**        |              |         |
386✔
1619
                //      ROUTE     |     |        X**        |      X***    |         |
386✔
1620
                //     GATEWAY    |     |                   |              |    X    |
386✔
1621
                //     LEAFNODE   |  X  |                   |       X      |         |
386✔
1622
                // -------------------------------------------------------------------
386✔
1623
                // *   Not on older servers.
386✔
1624
                // **  Not if "no advertise" is enabled.
386✔
1625
                // *** Not if leafnode's "no advertise" is enabled.
386✔
1626
                //
386✔
1627
                // Reject a cluster that contains spaces.
386✔
1628
                if info.Cluster != _EMPTY_ && strings.Contains(info.Cluster, " ") {
387✔
1629
                        c.mu.Unlock()
1✔
1630
                        c.sendErrAndErr(ErrClusterNameHasSpaces.Error())
1✔
1631
                        c.closeConnection(ProtocolViolation)
1✔
1632
                        return
1✔
1633
                }
1✔
1634
                // For solicited outbound leaf connections, capture the remote's nonce.
1635
                // For inbound leaf connections, keep using the server-issued nonce that
1636
                // was sent in our initial INFO and must be signed in CONNECT.
1637
                if didSolicit {
752✔
1638
                        c.nonce = []byte(info.Nonce)
367✔
1639
                }
367✔
1640
                if info.TLSRequired && didSolicit {
403✔
1641
                        remote.TLS = true
18✔
1642
                }
18✔
1643
                supportsHeaders := c.srv.supportsHeaders()
385✔
1644
                c.headers = supportsHeaders && info.Headers
385✔
1645

385✔
1646
                // Remember the remote server.
385✔
1647
                // Pre 2.2.0 servers are not sending their server name.
385✔
1648
                // In that case, use info.ID, which, for those servers, matches
385✔
1649
                // the content of the field `Name` in the leafnode CONNECT protocol.
385✔
1650
                if info.Name == _EMPTY_ {
385✔
1651
                        c.leaf.remoteServer = info.ID
×
1652
                } else {
385✔
1653
                        c.leaf.remoteServer = info.Name
385✔
1654
                }
385✔
1655
                c.leaf.remoteDomain = info.Domain
385✔
1656
                c.leaf.remoteCluster = info.Cluster
385✔
1657
                // We send the protocol version in the INFO protocol.
385✔
1658
                // Keep track of it, so we know if this connection supports message
385✔
1659
                // tracing for instance.
385✔
1660
                c.opts.Protocol = info.Proto
385✔
1661
        }
1662

1663
        // For both initial INFO and async INFO protocols, Possibly
1664
        // update our list of remote leafnode URLs we can connect to,
1665
        // unless we are instructed not to.
1666
        if didSolicit && !remote.IgnoreDiscoveredServers &&
745✔
1667
                (len(info.LeafNodeURLs) > 0 || len(info.WSConnectURLs) > 0) {
1,451✔
1668
                // Consider the incoming array as the most up-to-date
706✔
1669
                // representation of the remote cluster's list of URLs.
706✔
1670
                c.updateLeafNodeURLs(info)
706✔
1671
        }
706✔
1672

1673
        // Only solicited leafnode connections trust permission updates from INFO.
1674
        if didSolicit && (info.Import != nil || info.Export != nil) {
748✔
1675
                perms := &Permissions{
3✔
1676
                        Publish:   info.Export,
3✔
1677
                        Subscribe: info.Import,
3✔
1678
                }
3✔
1679
                // Check if we have local deny clauses that we need to merge.
3✔
1680
                if remote := c.leaf.remote; remote != nil {
6✔
1681
                        if len(remote.DenyExports) > 0 {
4✔
1682
                                if perms.Publish == nil {
1✔
1683
                                        perms.Publish = &SubjectPermission{}
×
1684
                                }
×
1685
                                perms.Publish.Deny = append(perms.Publish.Deny, remote.DenyExports...)
1✔
1686
                        }
1687
                        if len(remote.DenyImports) > 0 {
4✔
1688
                                if perms.Subscribe == nil {
1✔
1689
                                        perms.Subscribe = &SubjectPermission{}
×
1690
                                }
×
1691
                                perms.Subscribe.Deny = append(perms.Subscribe.Deny, remote.DenyImports...)
1✔
1692
                        }
1693
                }
1694
                c.setPermissions(perms)
3✔
1695
        }
1696

1697
        var resumeConnect bool
745✔
1698

745✔
1699
        // If this is a remote connection and this is the first INFO protocol,
745✔
1700
        // then we need to finish the connect process by sending CONNECT, etc..
745✔
1701
        if firstINFO && didSolicit {
1,112✔
1702
                // Clear deadline that was set in createLeafNode while waiting for the INFO.
367✔
1703
                c.nc.SetDeadline(time.Time{})
367✔
1704
                resumeConnect = true
367✔
1705
        } else if !firstINFO && didSolicit {
1,084✔
1706
                c.leaf.remoteAccName = info.RemoteAccount
339✔
1707
        }
339✔
1708

1709
        // Check if we have the remote account information and if so make sure it's stored.
1710
        if info.RemoteAccount != _EMPTY_ {
1,082✔
1711
                if c.acc == nil {
337✔
1712
                        c.mu.Unlock()
×
1713
                        c.sendErr("Authorization Violation")
×
1714
                        c.closeConnection(ProtocolViolation)
×
1715
                        return
×
1716
                }
×
1717
                s.leafRemoteAccounts.Store(c.acc.Name, info.RemoteAccount)
337✔
1718
        }
1719
        c.mu.Unlock()
745✔
1720

745✔
1721
        finishConnect := info.ConnectInfo
745✔
1722
        if resumeConnect && s != nil {
1,112✔
1723
                s.leafNodeResumeConnectProcess(c)
367✔
1724
                if !info.InfoOnConnect {
367✔
1725
                        finishConnect = true
×
1726
                }
×
1727
        }
1728
        if finishConnect {
1,082✔
1729
                s.leafNodeFinishConnectProcess(c)
337✔
1730
        }
337✔
1731

1732
        // Check to see if we need to kick any internal source or mirror consumers.
1733
        // This will be a no-op if JetStream not enabled for this server or if the bound account
1734
        // does not have jetstream.
1735
        s.checkInternalSyncConsumers(c.acc)
745✔
1736
}
1737

1738
func (s *Server) negotiateLeafCompression(c *client, didSolicit bool, infoCompression string, co *CompressionOpts) (bool, error) {
649✔
1739
        // If WebSocket compression is already negotiated on this connection then
649✔
1740
        // we shouldn't layer S2 compression on top of it.
649✔
1741
        c.mu.Lock()
649✔
1742
        if c.ws != nil && c.ws.compress {
649✔
1743
                c.leaf.compression = CompressionOff
×
1744
                c.mu.Unlock()
×
1745
                return false, nil
×
1746
        }
×
1747
        c.mu.Unlock()
649✔
1748
        // Negotiate the appropriate compression mode (or no compression)
649✔
1749
        cm, err := selectCompressionMode(co.Mode, infoCompression)
649✔
1750
        if err != nil {
649✔
1751
                return false, err
×
1752
        }
×
1753
        c.mu.Lock()
649✔
1754
        // For "auto" mode, set the initial compression mode based on RTT
649✔
1755
        if cm == CompressionS2Auto {
1,215✔
1756
                if c.rttStart.IsZero() {
1,132✔
1757
                        c.rtt = computeRTT(c.start)
566✔
1758
                }
566✔
1759
                cm = selectS2AutoModeBasedOnRTT(c.rtt, co.RTTThresholds)
566✔
1760
        }
1761
        // Keep track of the negotiated compression mode.
1762
        c.leaf.compression = cm
649✔
1763
        cid := c.cid
649✔
1764
        var nonce string
649✔
1765
        if !didSolicit {
932✔
1766
                nonce = bytesToString(c.nonce)
283✔
1767
        }
283✔
1768
        c.mu.Unlock()
649✔
1769

649✔
1770
        if !needsCompression(cm) {
732✔
1771
                return false, nil
83✔
1772
        }
83✔
1773

1774
        // If we end-up doing compression...
1775

1776
        // Generate an INFO with the chosen compression mode.
1777
        s.mu.Lock()
566✔
1778
        info := s.copyLeafNodeInfo()
566✔
1779
        info.Compression, info.CID, info.Nonce = compressionModeForInfoProtocol(co, cm), cid, nonce
566✔
1780
        infoProto := generateInfoJSON(info)
566✔
1781
        s.mu.Unlock()
566✔
1782

566✔
1783
        // If we solicited, then send this INFO protocol BEFORE switching
566✔
1784
        // to compression writer. However, if we did not, we send it after.
566✔
1785
        c.mu.Lock()
566✔
1786
        if didSolicit {
849✔
1787
                c.enqueueProto(infoProto)
283✔
1788
                // Make sure it is completely flushed (the pending bytes goes to
283✔
1789
                // 0) before proceeding.
283✔
1790
                for c.out.pb > 0 && !c.isClosed() {
566✔
1791
                        c.flushOutbound()
283✔
1792
                }
283✔
1793
        }
1794
        // This is to notify the readLoop that it should switch to a
1795
        // (de)compression reader.
1796
        c.in.flags.set(switchToCompression)
566✔
1797
        // Create the compress writer before queueing the INFO protocol for
566✔
1798
        // a route that did not solicit. It will make sure that that proto
566✔
1799
        // is sent with compression on.
566✔
1800
        c.out.cw = s2.NewWriter(nil, s2WriterOptions(cm)...)
566✔
1801
        if !didSolicit {
849✔
1802
                c.enqueueProto(infoProto)
283✔
1803
        }
283✔
1804
        c.mu.Unlock()
566✔
1805
        return true, nil
566✔
1806
}
1807

1808
// When getting a leaf node INFO protocol, use the provided
1809
// array of urls to update the list of possible endpoints.
1810
func (c *client) updateLeafNodeURLs(info *Info) {
706✔
1811
        cfg := c.leaf.remote
706✔
1812
        cfg.Lock()
706✔
1813
        defer cfg.Unlock()
706✔
1814

706✔
1815
        // We have ensured that if a remote has a WS scheme, then all are.
706✔
1816
        // So check if first is WS, then add WS URLs, otherwise, add non WS ones.
706✔
1817
        if len(cfg.URLs) > 0 && isWSURL(cfg.URLs[0]) {
706✔
1818
                // It does not really matter if we use "ws://" or "wss://" here since
×
1819
                // we will have already marked that the remote should use TLS anyway.
×
1820
                // But use proper scheme for log statements, etc...
×
1821
                proto := wsSchemePrefix
×
1822
                if cfg.TLS {
×
1823
                        proto = wsSchemePrefixTLS
×
1824
                }
×
1825
                c.doUpdateLNURLs(cfg, proto, info.WSConnectURLs)
×
1826
                return
×
1827
        }
1828
        c.doUpdateLNURLs(cfg, "nats-leaf", info.LeafNodeURLs)
706✔
1829
}
1830

1831
func (c *client) doUpdateLNURLs(cfg *leafNodeCfg, scheme string, URLs []string) {
706✔
1832
        cfg.urls = make([]*url.URL, 0, 1+len(URLs))
706✔
1833
        // Add the ones we receive in the protocol
706✔
1834
        for _, surl := range URLs {
2,137✔
1835
                url, err := url.Parse(fmt.Sprintf("%s://%s", scheme, surl))
1,431✔
1836
                if err != nil {
1,431✔
1837
                        // As per below, the URLs we receive should not have contained URL info, so this should be safe to log.
×
1838
                        c.Errorf("Error parsing url %q: %v", surl, err)
×
1839
                        continue
×
1840
                }
1841
                // Do not add if it's the same as what we already have configured.
1842
                var dup bool
1,431✔
1843
                for _, u := range cfg.URLs {
3,689✔
1844
                        // URLs that we receive never have user info, but the
2,258✔
1845
                        // ones that were configured may have. Simply compare
2,258✔
1846
                        // host and port to decide if they are equal or not.
2,258✔
1847
                        if url.Host == u.Host && url.Port() == u.Port() {
3,446✔
1848
                                dup = true
1,188✔
1849
                                break
1,188✔
1850
                        }
1851
                }
1852
                if !dup {
1,674✔
1853
                        cfg.urls = append(cfg.urls, url)
243✔
1854
                        cfg.saveTLSHostname(url)
243✔
1855
                }
243✔
1856
        }
1857
        // Add the configured one
1858
        cfg.urls = append(cfg.urls, cfg.URLs...)
706✔
1859
}
1860

1861
// Similar to setInfoHostPortAndGenerateJSON, but for leafNodeInfo.
1862
func (s *Server) setLeafNodeInfoHostPortAndIP() error {
3,536✔
1863
        opts := s.getOpts()
3,536✔
1864
        if opts.LeafNode.Advertise != _EMPTY_ {
3,547✔
1865
                advHost, advPort, err := parseHostPort(opts.LeafNode.Advertise, opts.LeafNode.Port)
11✔
1866
                if err != nil {
11✔
1867
                        return err
×
1868
                }
×
1869
                s.leafNodeInfo.Host = advHost
11✔
1870
                s.leafNodeInfo.Port = advPort
11✔
1871
        } else {
3,525✔
1872
                s.leafNodeInfo.Host = opts.LeafNode.Host
3,525✔
1873
                s.leafNodeInfo.Port = opts.LeafNode.Port
3,525✔
1874
                // If the host is "0.0.0.0" or "::" we need to resolve to a public IP.
3,525✔
1875
                // This will return at most 1 IP.
3,525✔
1876
                hostIsIPAny, ips, err := s.getNonLocalIPsIfHostIsIPAny(s.leafNodeInfo.Host, false)
3,525✔
1877
                if err != nil {
3,525✔
1878
                        return err
×
1879
                }
×
1880
                if hostIsIPAny {
3,544✔
1881
                        if len(ips) == 0 {
19✔
1882
                                s.Errorf("Could not find any non-local IP for leafnode's listen specification %q",
×
1883
                                        s.leafNodeInfo.Host)
×
1884
                        } else {
19✔
1885
                                // Take the first from the list...
19✔
1886
                                s.leafNodeInfo.Host = ips[0]
19✔
1887
                        }
19✔
1888
                }
1889
        }
1890
        // Use just host:port for the IP
1891
        s.leafNodeInfo.IP = net.JoinHostPort(s.leafNodeInfo.Host, strconv.Itoa(s.leafNodeInfo.Port))
3,536✔
1892
        if opts.LeafNode.Advertise != _EMPTY_ {
3,547✔
1893
                s.Noticef("Advertise address for leafnode is set to %s", s.leafNodeInfo.IP)
11✔
1894
        }
11✔
1895
        return nil
3,536✔
1896
}
1897

1898
// Add the connection to the map of leaf nodes.
1899
// If `checkForDup` is true (invoked when a leafnode is accepted), then we check
1900
// if a connection already exists for the same server name and account.
1901
// That can happen when the remote is attempting to reconnect while the accepting
1902
// side did not detect the connection as broken yet.
1903
// But it can also happen when there is a misconfiguration and the remote is
1904
// creating two (or more) connections that bind to the same account on the accept
1905
// side.
1906
// When a duplicate is found, the new connection is accepted and the old is closed
1907
// (this solves the stale connection situation). An error is returned to help the
1908
// remote detect the misconfiguration when the duplicate is the result of that
1909
// misconfiguration.
1910
func (s *Server) addLeafNodeConnection(c *client, srvName, clusterName string, checkForDup bool) bool {
711✔
1911
        var accName string
711✔
1912
        c.mu.Lock()
711✔
1913
        cid := c.cid
711✔
1914
        acc := c.acc
711✔
1915
        if acc != nil {
1,422✔
1916
                accName = acc.Name
711✔
1917
        }
711✔
1918
        myRemoteDomain := c.leaf.remoteDomain
711✔
1919
        mySrvName := c.leaf.remoteServer
711✔
1920
        remoteAccName := c.leaf.remoteAccName
711✔
1921
        myClustName := c.leaf.remoteCluster
711✔
1922
        remote := c.leaf.remote
711✔
1923
        solicited := remote != nil
711✔
1924
        c.mu.Unlock()
711✔
1925

711✔
1926
        var old *client
711✔
1927
        s.mu.Lock()
711✔
1928
        // We check for empty because in some test we may send empty CONNECT{}
711✔
1929
        if checkForDup && srvName != _EMPTY_ {
1,048✔
1930
                for _, ol := range s.leafs {
623✔
1931
                        ol.mu.Lock()
286✔
1932
                        // We care here only about non solicited Leafnode. This function
286✔
1933
                        // is more about replacing stale connections than detecting loops.
286✔
1934
                        // We have code for the loop detection elsewhere, which also delays
286✔
1935
                        // attempt to reconnect.
286✔
1936
                        if !ol.isSolicitedLeafNode() && ol.leaf.remoteServer == srvName &&
286✔
1937
                                ol.leaf.remoteCluster == clusterName && ol.acc.Name == accName &&
286✔
1938
                                remoteAccName != _EMPTY_ && ol.leaf.remoteAccName == remoteAccName {
286✔
1939
                                old = ol
×
1940
                        }
×
1941
                        ol.mu.Unlock()
286✔
1942
                        if old != nil {
286✔
1943
                                break
×
1944
                        }
1945
                }
1946
        }
1947
        // Now that we are under the server lock and before adding it to the map,
1948
        // for a solicited leaf, we need to make sure that it has not been removed
1949
        // from the config or disabled.
1950
        if solicited {
1,048✔
1951
                // If no longer valid, do not add to the server map. The connection
337✔
1952
                // should have been marked so that it can't reconnect. When the caller
337✔
1953
                // calls closeConnection(), cleanup (including clearing the connect-
337✔
1954
                // in-progress flag) will occur at the appropriate time.
337✔
1955
                if !remote.stillValid() {
337✔
1956
                        // Prevent reconnect in case it was not yet done.
×
1957
                        c.setNoReconnect()
×
1958
                        s.mu.Unlock()
×
1959
                        s.removeFromTempClients(cid)
×
1960
                        return false
×
1961
                }
×
1962
                remote.setConnectInProgress(false)
337✔
1963
        }
1964
        // Store new connection in the map
1965
        s.leafs[cid] = c
711✔
1966
        s.mu.Unlock()
711✔
1967
        s.removeFromTempClients(cid)
711✔
1968

711✔
1969
        // If applicable, evict the old one.
711✔
1970
        if old != nil {
711✔
1971
                old.sendErrAndErr(DuplicateRemoteLeafnodeConnection.String())
×
1972
                old.closeConnection(DuplicateRemoteLeafnodeConnection)
×
1973
                c.Warnf("Replacing connection from same server")
×
1974
        }
×
1975

1976
        srvDecorated := func() string {
896✔
1977
                if myClustName == _EMPTY_ {
205✔
1978
                        return mySrvName
20✔
1979
                }
20✔
1980
                return fmt.Sprintf("%s/%s", mySrvName, myClustName)
165✔
1981
        }
1982

1983
        opts := s.getOpts()
711✔
1984
        sysAcc := s.SystemAccount()
711✔
1985
        js := s.getJetStream()
711✔
1986
        var meta *raft
711✔
1987
        if js != nil {
1,156✔
1988
                if mg := js.getMetaGroup(); mg != nil {
778✔
1989
                        meta = mg.(*raft)
333✔
1990
                }
333✔
1991
        }
1992
        blockMappingOutgoing := false
711✔
1993
        // Deny (non domain) JetStream API traffic unless system account is shared
711✔
1994
        // and domain names are identical and extending is not disabled
711✔
1995

711✔
1996
        // Check if backwards compatibility has been enabled and needs to be acted on
711✔
1997
        forceSysAccDeny := false
711✔
1998
        if len(opts.JsAccDefaultDomain) > 0 {
744✔
1999
                if acc == sysAcc {
44✔
2000
                        for _, d := range opts.JsAccDefaultDomain {
22✔
2001
                                if d == _EMPTY_ {
19✔
2002
                                        // Extending JetStream via leaf node is mutually exclusive with a domain mapping to the empty/default domain.
8✔
2003
                                        // As soon as one mapping to "" is found, disable the ability to extend JS via a leaf node.
8✔
2004
                                        c.Noticef("Not extending remote JetStream domain %q due to presence of empty default domain", myRemoteDomain)
8✔
2005
                                        forceSysAccDeny = true
8✔
2006
                                        break
8✔
2007
                                }
2008
                        }
2009
                } else if domain, ok := opts.JsAccDefaultDomain[accName]; ok && domain == _EMPTY_ {
35✔
2010
                        // for backwards compatibility with old setups that do not have a domain name set
13✔
2011
                        c.Debugf("Skipping deny %q for account %q due to default domain", jsAllAPI, accName)
13✔
2012
                        return true
13✔
2013
                }
13✔
2014
        }
2015

2016
        // If the server has JS disabled, it may still be part of a JetStream that could be extended.
2017
        // This is either signaled by js being disabled and a domain set,
2018
        // or in cases where no domain name exists, an extension hint is set.
2019
        // However, this is only relevant in mixed setups.
2020
        //
2021
        // If the system account connects but default domains are present, JetStream can't be extended.
2022
        if opts.JetStreamDomain != myRemoteDomain || (!opts.JetStream && (opts.JetStreamDomain == _EMPTY_ && opts.JetStreamExtHint != jsWillExtend)) ||
698✔
2023
                sysAcc == nil || acc == nil || forceSysAccDeny {
1,261✔
2024
                // If domain names mismatch always deny. This applies to system accounts as well as non system accounts.
563✔
2025
                // Not having a system account, account or JetStream disabled is considered a mismatch as well.
563✔
2026
                if acc != nil && acc == sysAcc {
682✔
2027
                        c.Noticef("System account connected from %s", srvDecorated())
119✔
2028
                        c.Noticef("JetStream not extended, domains differ")
119✔
2029
                        c.mergeDenyPermissionsLocked(both, denyAllJs)
119✔
2030
                        // When a remote with a system account is present in a server, unless otherwise disabled, the server will be
119✔
2031
                        // started in observer mode. Now that it is clear that this not used, turn the observer mode off.
119✔
2032
                        if solicited && meta != nil && meta.IsObserver() {
142✔
2033
                                meta.setObserver(false, extNotExtended)
23✔
2034
                                c.Debugf("Turning JetStream metadata controller Observer Mode off")
23✔
2035
                                // Take note that the domain was not extended to avoid this state from startup.
23✔
2036
                                writePeerState(js.config.StoreDir, meta.currentPeerState())
23✔
2037
                                // Meta controller can't be leader yet.
23✔
2038
                                // Yet it is possible that due to observer mode every server already stopped campaigning.
23✔
2039
                                // Therefore this server needs to be kicked into campaigning gear explicitly.
23✔
2040
                                meta.Campaign()
23✔
2041
                        }
23✔
2042
                } else {
444✔
2043
                        c.Noticef("JetStream using domains: local %q, remote %q", opts.JetStreamDomain, myRemoteDomain)
444✔
2044
                        c.mergeDenyPermissionsLocked(both, denyAllClientJs)
444✔
2045
                }
444✔
2046
                blockMappingOutgoing = true
563✔
2047
        } else if acc == sysAcc {
201✔
2048
                // system account and same domain
66✔
2049
                s.sys.client.Noticef("Extending JetStream domain %q as System Account connected from server %s",
66✔
2050
                        myRemoteDomain, srvDecorated())
66✔
2051
                // In an extension use case, pin leadership to server remotes connect to.
66✔
2052
                // Therefore, server with a remote that are not already in observer mode, need to be put into it.
66✔
2053
                if solicited && meta != nil && !meta.IsObserver() {
70✔
2054
                        c.Debugf("Turning JetStream metadata controller Observer Mode on - System Account Connected")
4✔
2055
                        // Discard any local metagroup state accumulated before the SYS-account
4✔
2056
                        // leaf came up (e.g. the wrong-hint case where this server bootstrapped
4✔
2057
                        // its own metagroup). The parent's view is now authoritative; without
4✔
2058
                        // this reset the two raft logs stay forked because the standalone log's
4✔
2059
                        // commit prefix short-circuits the follower's AE handling.
4✔
2060
                        meta.setObserver(true, extExtended)
4✔
2061
                        meta.Reset()
4✔
2062
                }
4✔
2063
        } else {
69✔
2064
                // This deny is needed in all cases (system account shared or not)
69✔
2065
                // If the system account is shared, jsAllAPI traffic will go through the system account.
69✔
2066
                // So in order to prevent duplicate delivery (from system and actual account) suppress it on the account.
69✔
2067
                // If the system account is NOT shared, jsAllAPI traffic has no business
69✔
2068
                c.Debugf("Adding deny %+v for account %q", denyAllClientJs, accName)
69✔
2069
                c.mergeDenyPermissionsLocked(both, denyAllClientJs)
69✔
2070
        }
69✔
2071
        // If we have a specified JetStream domain we will want to add a mapping to
2072
        // allow access cross domain for each non-system account.
2073
        if opts.JetStreamDomain != _EMPTY_ && opts.JetStream && acc != nil && acc != sysAcc {
918✔
2074
                for src, dest := range generateJSMappingTable(opts.JetStreamDomain) {
2,200✔
2075
                        if err := acc.AddMapping(src, dest); err != nil {
1,980✔
2076
                                c.Debugf("Error adding JetStream domain mapping: %s", err.Error())
×
2077
                        } else {
1,980✔
2078
                                c.Debugf("Adding JetStream Domain Mapping %q -> %s to account %q", src, dest, accName)
1,980✔
2079
                        }
1,980✔
2080
                }
2081
                if blockMappingOutgoing {
409✔
2082
                        src := fmt.Sprintf(jsDomainAPI, opts.JetStreamDomain)
189✔
2083
                        // make sure that messages intended for this domain, do not leave the cluster via this leaf node connection
189✔
2084
                        // This is a guard against a miss-config with two identical domain names and will only cover some forms
189✔
2085
                        // of this issue, not all of them.
189✔
2086
                        // This guards against a hub and a spoke having the same domain name.
189✔
2087
                        // But not two spokes having the same one and the request coming from the hub.
189✔
2088
                        c.mergeDenyPermissionsLocked(pub, []string{src})
189✔
2089
                        c.Debugf("Adding deny %q for outgoing messages to account %q", src, accName)
189✔
2090
                }
189✔
2091
        }
2092
        return true
698✔
2093
}
2094

2095
func (s *Server) removeLeafNodeConnection(c *client) {
904✔
2096
        s.mu.Lock()
904✔
2097
        c.mu.Lock()
904✔
2098
        cid := c.cid
904✔
2099
        if c.leaf != nil {
1,807✔
2100
                if c.leaf.tsubt != nil {
1,515✔
2101
                        c.leaf.tsubt.Stop()
612✔
2102
                        c.leaf.tsubt = nil
612✔
2103
                }
612✔
2104
                if c.leaf.gwSub != nil {
1,240✔
2105
                        s.gwLeafSubs.Remove(c.leaf.gwSub)
337✔
2106
                        // We need to set this to nil for GC to release the connection
337✔
2107
                        c.leaf.gwSub = nil
337✔
2108
                }
337✔
2109
                if remote := c.leaf.remote; remote != nil {
1,289✔
2110
                        // If "noReconnect" is true, then we won't attempt to reconnect, so
386✔
2111
                        // we will clear the "connect-in-progress" flag. However, if we can
386✔
2112
                        // reconnect, then we should set "connect-in-progress" to true while
386✔
2113
                        // we are under the server/client lock. The go routine that performs
386✔
2114
                        // the reconnect will be started later and there would be a gap with
386✔
2115
                        // the wrong flag value otherwise.
386✔
2116
                        remote.setConnectInProgress(!c.flags.isSet(noReconnect))
386✔
2117
                }
386✔
2118
        }
2119
        proxyKey := c.proxyKey
904✔
2120
        c.mu.Unlock()
904✔
2121
        delete(s.leafs, cid)
904✔
2122
        if proxyKey != _EMPTY_ {
908✔
2123
                s.removeProxiedConn(proxyKey, cid)
4✔
2124
        }
4✔
2125
        s.mu.Unlock()
904✔
2126
        s.removeFromTempClients(cid)
904✔
2127
}
2128

2129
// leafConnectInfo is the content of the CONNECT protocol sent by a soliciting
// leafnode and decoded by the accept side in processLeafNodeConnect.
type leafConnectInfo struct {
	// Version of the soliciting server. It is checked against the accept
	// side's LeafNode.MinVersion option when that option is set.
	Version string `json:"version,omitempty"`

	// Authentication related fields. They are not consumed by
	// processLeafNodeConnect (presumably handled by the auth code path).
	Nkey  string `json:"nkey,omitempty"`
	JWT   string `json:"jwt,omitempty"`
	Sig   string `json:"sig,omitempty"`
	User  string `json:"user,omitempty"`
	Pass  string `json:"pass,omitempty"`
	Token string `json:"auth_token,omitempty"`

	// Description of the soliciting server: its ID, JetStream domain, name,
	// whether it declares itself a hub, and its cluster name (if any).
	ID      string `json:"server_id,omitempty"`
	Domain  string `json:"domain,omitempty"`
	Name    string `json:"name,omitempty"`
	Hub     bool   `json:"is_hub,omitempty"`
	Cluster string `json:"cluster,omitempty"`

	// Capabilities and requests from the soliciting side: header support,
	// JetStream, publish deny permissions to merge on the accept side, and
	// whether isolation is requested.
	Headers   bool     `json:"headers,omitempty"`
	JetStream bool     `json:"jetstream,omitempty"`
	DenyPub   []string `json:"deny_pub,omitempty"`
	Isolate   bool     `json:"isolate,omitempty"`

	// Compression mode. A field
	// >> Comp bool `json:"compression,omitempty"`
	// used to exist but was never used. Now that compression is supported the
	// value has to be a string, hence the different JSON tag.
	Compression string `json:"compress_mode,omitempty"`

	// Only there to detect a gateway connecting to the wrong port.
	Gateway string `json:"gateway,omitempty"`

	// Lets the accept side know which account the remote is binding to.
	RemoteAccount string `json:"remote_account,omitempty"`

	// Protocol version. Unlike ROUTER and GATEWAY, the accept side of a LEAF
	// connection gets only the CONNECT protocol and no INFO, so the version
	// must travel in the CONNECT. It indicates whether the connection
	// supports some features, such as message tracing. With `protocol` as
	// the JSON tag this is automatically unmarshal'ed by the low level
	// CONNECT processing.
	Proto int `json:"protocol,omitempty"`
}
2168

2169
// processLeafNodeConnect will process the inbound connect args.
2170
// Once we are here we are bound to an account, so can send any interest that
2171
// we would have to the other side.
2172
func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) error {
378✔
2173
        // Way to detect clients that incorrectly connect to the route listen
378✔
2174
        // port. Client provided "lang" in the CONNECT protocol while LEAFNODEs don't.
378✔
2175
        if lang != _EMPTY_ {
378✔
2176
                c.sendErrAndErr(ErrClientConnectedToLeafNodePort.Error())
×
2177
                c.closeConnection(WrongPort)
×
2178
                return ErrClientConnectedToLeafNodePort
×
2179
        }
×
2180

2181
        // Unmarshal as a leaf node connect protocol
2182
        proto := &leafConnectInfo{}
378✔
2183
        if err := json.Unmarshal(arg, proto); err != nil {
378✔
2184
                return err
×
2185
        }
×
2186

2187
        // Reject a cluster that contains spaces.
2188
        if proto.Cluster != _EMPTY_ && strings.Contains(proto.Cluster, " ") {
379✔
2189
                c.sendErrAndErr(ErrClusterNameHasSpaces.Error())
1✔
2190
                c.closeConnection(ProtocolViolation)
1✔
2191
                return ErrClusterNameHasSpaces
1✔
2192
        }
1✔
2193

2194
        // Check for cluster name collisions.
2195
        if cn := s.cachedClusterName(); cn != _EMPTY_ && proto.Cluster != _EMPTY_ && proto.Cluster == cn {
380✔
2196
                c.sendErrAndErr(ErrLeafNodeHasSameClusterName.Error())
3✔
2197
                c.closeConnection(ClusterNamesIdentical)
3✔
2198
                return ErrLeafNodeHasSameClusterName
3✔
2199
        }
3✔
2200

2201
        // Reject if this has Gateway which means that it would be from a gateway
2202
        // connection that incorrectly connects to the leafnode port.
2203
        if proto.Gateway != _EMPTY_ {
374✔
2204
                errTxt := fmt.Sprintf("Rejecting connection from gateway %q on the leafnode port", proto.Gateway)
×
2205
                c.Errorf(errTxt)
×
2206
                c.sendErr(errTxt)
×
2207
                c.closeConnection(WrongGateway)
×
2208
                return ErrWrongGateway
×
2209
        }
×
2210

2211
        if mv := s.getOpts().LeafNode.MinVersion; mv != _EMPTY_ {
374✔
2212
                major, minor, update, _ := versionComponents(mv)
×
2213
                if !versionAtLeast(proto.Version, major, minor, update) {
×
2214
                        // Send back an INFO so recent remote servers process the rejection
×
2215
                        // cleanly, then close immediately. The soliciting side applies the
×
2216
                        // reconnect delay when it processes the error.
×
2217
                        s.sendPermsAndAccountInfo(c)
×
2218
                        c.sendErrAndErr(fmt.Sprintf("%s %q", ErrLeafNodeMinVersionRejected, mv))
×
2219
                        c.closeConnection(MinimumVersionRequired)
×
2220
                        return ErrMinimumVersionRequired
×
2221
                }
×
2222
        }
2223

2224
        // Check if this server supports headers.
2225
        supportHeaders := c.srv.supportsHeaders()
374✔
2226

374✔
2227
        c.mu.Lock()
374✔
2228
        // Leaf Nodes do not do echo or verbose or pedantic.
374✔
2229
        c.opts.Verbose = false
374✔
2230
        c.opts.Echo = false
374✔
2231
        c.opts.Pedantic = false
374✔
2232
        // This inbound connection will be marked as supporting headers if this server
374✔
2233
        // support headers and the remote has sent in the CONNECT protocol that it does
374✔
2234
        // support headers too.
374✔
2235
        c.headers = supportHeaders && proto.Headers
374✔
2236
        // If the compression level is still not set, set it based on what has been
374✔
2237
        // given to us in the CONNECT protocol.
374✔
2238
        if c.leaf.compression == _EMPTY_ {
486✔
2239
                // But if proto.Compression is _EMPTY_, set it to CompressionNotSupported
112✔
2240
                if proto.Compression == _EMPTY_ {
148✔
2241
                        c.leaf.compression = CompressionNotSupported
36✔
2242
                } else {
112✔
2243
                        c.leaf.compression = proto.Compression
76✔
2244
                }
76✔
2245
        }
2246

2247
        // Remember the remote server.
2248
        c.leaf.remoteServer = proto.Name
374✔
2249
        // Remember the remote account name
374✔
2250
        c.leaf.remoteAccName = proto.RemoteAccount
374✔
2251
        // Remember if the leafnode requested isolation.
374✔
2252
        c.leaf.isolated = c.leaf.isolated || proto.Isolate
374✔
2253

374✔
2254
        // If the other side has declared itself a hub, so we will take on the spoke role.
374✔
2255
        if proto.Hub {
380✔
2256
                c.leaf.isSpoke = true
6✔
2257
        }
6✔
2258

2259
        // The soliciting side is part of a cluster.
2260
        if proto.Cluster != _EMPTY_ {
641✔
2261
                c.leaf.remoteCluster = proto.Cluster
267✔
2262
        }
267✔
2263

2264
        c.leaf.remoteDomain = proto.Domain
374✔
2265

374✔
2266
        // When a leaf solicits a connection to a hub, the perms that it will use on the soliciting leafnode's
374✔
2267
        // behalf are correct for them, but inside the hub need to be reversed since data is flowing in the opposite direction.
374✔
2268
        if !c.isSolicitedLeafNode() && c.perms != nil {
378✔
2269
                sp, pp := c.perms.sub, c.perms.pub
4✔
2270
                c.perms.sub, c.perms.pub = pp, sp
4✔
2271
                if c.opts.Import != nil {
7✔
2272
                        c.darray = c.opts.Import.Deny
3✔
2273
                } else {
4✔
2274
                        c.darray = nil
1✔
2275
                }
1✔
2276
        }
2277

2278
        // Set the Ping timer
2279
        c.setFirstPingTimer()
374✔
2280

374✔
2281
        // If we received pub deny permissions from the other end, merge with existing ones.
374✔
2282
        c.mergeDenyPermissions(pub, proto.DenyPub)
374✔
2283

374✔
2284
        acc := c.acc
374✔
2285
        c.mu.Unlock()
374✔
2286

374✔
2287
        // If the account is not set (e.g. connection was closed due to auth
374✔
2288
        // timeout while still being processed), bail out to avoid a panic.
374✔
2289
        if acc == nil {
374✔
2290
                c.closeConnection(MissingAccount)
×
2291
                return ErrMissingAccount
×
2292
        }
×
2293

2294
        // Register the cluster, even if empty, as long as we are acting as a hub.
2295
        if !proto.Hub {
742✔
2296
                acc.registerLeafNodeCluster(proto.Cluster)
368✔
2297
        }
368✔
2298

2299
        // Add in the leafnode here since we passed through auth at this point.
2300
        s.addLeafNodeConnection(c, proto.Name, proto.Cluster, true)
374✔
2301

374✔
2302
        // If we have permissions bound to this leafnode we need to send then back to the
374✔
2303
        // origin server for local enforcement.
374✔
2304
        s.sendPermsAndAccountInfo(c)
374✔
2305

374✔
2306
        // Create and initialize the smap since we know our bound account now.
374✔
2307
        // This will send all registered subs too.
374✔
2308
        s.initLeafNodeSmapAndSendSubs(c)
374✔
2309

374✔
2310
        // Announce the account connect event for a leaf node.
374✔
2311
        // This will be a no-op as needed.
374✔
2312
        s.sendLeafNodeConnect(c.acc)
374✔
2313

374✔
2314
        // Check to see if we need to kick any internal source or mirror consumers.
374✔
2315
        // This will be a no-op if JetStream not enabled for this server or if the bound account
374✔
2316
        // does not have jetstream.
374✔
2317
        s.checkInternalSyncConsumers(acc)
374✔
2318

374✔
2319
        return nil
374✔
2320
}
2321

2322
// checkInternalSyncConsumers
2323
func (s *Server) checkInternalSyncConsumers(acc *Account) {
1,119✔
2324
        // Grab our js
1,119✔
2325
        js := s.getJetStream()
1,119✔
2326

1,119✔
2327
        // Only applicable if we have JS and the leafnode has JS as well.
1,119✔
2328
        // We check for remote JS outside.
1,119✔
2329
        if !js.isEnabled() || acc == nil {
1,524✔
2330
                return
405✔
2331
        }
405✔
2332

2333
        // We will check all streams in our local account. They must be a leader and
2334
        // be sourcing or mirroring. We will check the external config on the stream itself
2335
        // if this is cross domain, or if the remote domain is empty, meaning we might be
2336
        // extending the system across this leafnode connection and hence we would be extending
2337
        // our own domain.
2338
        jsa := js.lookupAccount(acc)
714✔
2339
        if jsa == nil {
982✔
2340
                return
268✔
2341
        }
268✔
2342

2343
        var streams []*stream
446✔
2344
        jsa.mu.RLock()
446✔
2345
        for _, mset := range jsa.streams {
506✔
2346
                mset.cfgMu.RLock()
60✔
2347
                // We need to have a mirror or source defined.
60✔
2348
                // We do not want to force another lock here to look for leader status,
60✔
2349
                // so collect and after we release jsa will make sure.
60✔
2350
                if mset.cfg.Mirror != nil || len(mset.cfg.Sources) > 0 {
73✔
2351
                        streams = append(streams, mset)
13✔
2352
                }
13✔
2353
                mset.cfgMu.RUnlock()
60✔
2354
        }
2355
        jsa.mu.RUnlock()
446✔
2356

446✔
2357
        // Now loop through all candidates and check if we are the leader and have NOT
446✔
2358
        // created the sync up consumer.
446✔
2359
        for _, mset := range streams {
459✔
2360
                mset.retryDisconnectedSyncConsumers()
13✔
2361
        }
13✔
2362
}
2363

2364
// Returns the remote cluster name. This is set only once so does not require a lock.
2365
func (c *client) remoteCluster() string {
60,325✔
2366
        if c.leaf == nil {
60,325✔
2367
                return _EMPTY_
×
2368
        }
×
2369
        return c.leaf.remoteCluster
60,325✔
2370
}
2371

2372
// Sends back an info block to the soliciting leafnode to let it know about
2373
// its permission settings for local enforcement.
2374
func (s *Server) sendPermsAndAccountInfo(c *client) {
374✔
2375
        // Copy
374✔
2376
        s.mu.Lock()
374✔
2377
        info := s.copyLeafNodeInfo()
374✔
2378
        s.mu.Unlock()
374✔
2379
        c.mu.Lock()
374✔
2380
        info.CID = c.cid
374✔
2381
        info.Import = c.opts.Import
374✔
2382
        info.Export = c.opts.Export
374✔
2383
        info.RemoteAccount = c.acc.Name
374✔
2384
        // s.SystemAccount() uses an atomic operation and does not get the server lock, so this is safe.
374✔
2385
        info.IsSystemAccount = c.acc == s.SystemAccount()
374✔
2386
        info.ConnectInfo = true
374✔
2387
        c.enqueueProto(generateInfoJSON(info))
374✔
2388
        c.mu.Unlock()
374✔
2389
}
374✔
2390

2391
// Snapshot the current subscriptions from the sublist into our smap which
2392
// we will keep updated from now on.
2393
// Also send the registered subscriptions.
2394
func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
711✔
2395
        acc := c.acc
711✔
2396
        if acc == nil {
711✔
2397
                c.Debugf("Leafnode does not have an account bound")
×
2398
                return
×
2399
        }
×
2400
        // Collect all account subs here.
2401
        _subs := [1024]*subscription{}
711✔
2402
        subs := _subs[:0]
711✔
2403
        ims := []string{}
711✔
2404

711✔
2405
        // Hold the client lock otherwise there can be a race and miss some subs.
711✔
2406
        c.mu.Lock()
711✔
2407
        defer c.mu.Unlock()
711✔
2408

711✔
2409
        acc.mu.RLock()
711✔
2410
        accName := acc.Name
711✔
2411
        accNTag := acc.nameTag
711✔
2412

711✔
2413
        // To make printing look better when no friendly name present.
711✔
2414
        if accNTag != _EMPTY_ {
714✔
2415
                accNTag = "/" + accNTag
3✔
2416
        }
3✔
2417

2418
        // If we are solicited we only send interest for local clients.
2419
        if c.isSpokeLeafNode() {
1,048✔
2420
                acc.sl.localSubs(&subs, true)
337✔
2421
        } else {
711✔
2422
                acc.sl.All(&subs)
374✔
2423
        }
374✔
2424

2425
        // Check if we have an existing service import reply.
2426
        siReply := copyBytes(acc.siReply)
711✔
2427

711✔
2428
        // Since leaf nodes only send on interest, if the bound
711✔
2429
        // account has import services we need to send those over.
711✔
2430
        for isubj := range acc.imports.services {
3,065✔
2431
                if c.isSpokeLeafNode() && !c.canSubscribe(isubj) {
2,514✔
2432
                        c.Debugf("Not permitted to import service %q on behalf of %s%s", isubj, accName, accNTag)
160✔
2433
                        continue
160✔
2434
                }
2435
                ims = append(ims, isubj)
2,194✔
2436
        }
2437
        // Likewise for mappings.
2438
        for _, m := range acc.mappings {
2,781✔
2439
                if c.isSpokeLeafNode() && !c.canSubscribe(m.src) {
2,088✔
2440
                        c.Debugf("Not permitted to import mapping %q on behalf of %s%s", m.src, accName, accNTag)
18✔
2441
                        continue
18✔
2442
                }
2443
                ims = append(ims, m.src)
2,052✔
2444
        }
2445

2446
        // Create a unique subject that will be used for loop detection.
2447
        lds := acc.lds
711✔
2448
        acc.mu.RUnlock()
711✔
2449

711✔
2450
        // Check if we have to create the LDS.
711✔
2451
        if lds == _EMPTY_ {
1,245✔
2452
                lds = leafNodeLoopDetectionSubjectPrefix + nuid.Next()
534✔
2453
                acc.mu.Lock()
534✔
2454
                acc.lds = lds
534✔
2455
                acc.mu.Unlock()
534✔
2456
        }
534✔
2457

2458
        // Now check for gateway interest. Leafnodes will put this into
2459
        // the proper mode to propagate, but they are not held in the account.
2460
        gwsa := [16]*client{}
711✔
2461
        gws := gwsa[:0]
711✔
2462
        s.getOutboundGatewayConnections(&gws)
711✔
2463
        for _, cgw := range gws {
758✔
2464
                cgw.mu.Lock()
47✔
2465
                gw := cgw.gw
47✔
2466
                cgw.mu.Unlock()
47✔
2467
                if gw != nil {
94✔
2468
                        if ei, _ := gw.outsim.Load(accName); ei != nil {
94✔
2469
                                if e := ei.(*outsie); e != nil && e.sl != nil {
94✔
2470
                                        e.sl.All(&subs)
47✔
2471
                                }
47✔
2472
                        }
2473
                }
2474
        }
2475

2476
        applyGlobalRouting := s.gateway.enabled
711✔
2477
        if c.isSpokeLeafNode() {
1,048✔
2478
                // Add a fake subscription for this solicited leafnode connection
337✔
2479
                // so that we can send back directly for mapped GW replies.
337✔
2480
                // We need to keep track of this subscription so it can be removed
337✔
2481
                // when the connection is closed so that the GC can release it.
337✔
2482
                c.leaf.gwSub = &subscription{client: c, subject: []byte(gwReplyPrefix + ">")}
337✔
2483
                c.srv.gwLeafSubs.Insert(c.leaf.gwSub)
337✔
2484
        }
337✔
2485

2486
        // Now walk the results and add them to our smap
2487
        rc := c.leaf.remoteCluster
711✔
2488
        c.leaf.smap = make(map[string]int32)
711✔
2489
        for _, sub := range subs {
32,049✔
2490
                // Check perms regardless of role.
31,338✔
2491
                if c.perms != nil && !c.canSubscribe(string(sub.subject)) {
33,194✔
2492
                        c.Debugf("Not permitted to subscribe to %q on behalf of %s%s", sub.subject, accName, accNTag)
1,856✔
2493
                        continue
1,856✔
2494
                }
2495
                // Don't advertise interest from leafnodes to other isolated leafnodes.
2496
                if sub.client.kind == LEAF && c.isIsolatedLeafNode() {
29,482✔
2497
                        continue
×
2498
                }
2499
                // We ignore ourselves here.
2500
                // Also don't add the subscription if it has a origin cluster and the
2501
                // cluster name matches the one of the client we are sending to.
2502
                if c != sub.client && (sub.origin == nil || (bytesToString(sub.origin) != rc)) {
54,556✔
2503
                        count := int32(1)
25,074✔
2504
                        if len(sub.queue) > 0 && sub.qw > 0 {
25,079✔
2505
                                count = sub.qw
5✔
2506
                        }
5✔
2507
                        c.leaf.smap[keyFromSub(sub)] += count
25,074✔
2508
                        if c.leaf.tsub == nil {
25,726✔
2509
                                c.leaf.tsub = make(map[*subscription]struct{})
652✔
2510
                        }
652✔
2511
                        c.leaf.tsub[sub] = struct{}{}
25,074✔
2512
                }
2513
        }
2514
        // FIXME(dlc) - We need to update appropriately on an account claims update.
2515
        for _, isubj := range ims {
4,957✔
2516
                c.leaf.smap[isubj]++
4,246✔
2517
        }
4,246✔
2518
        // If we have gateways enabled we need to make sure the other side sends us responses
2519
        // that have been augmented from the original subscription.
2520
        // TODO(dlc) - Should we lock this down more?
2521
        if applyGlobalRouting {
759✔
2522
                c.leaf.smap[oldGWReplyPrefix+"*.>"]++
48✔
2523
                c.leaf.smap[gwReplyPrefix+">"]++
48✔
2524
        }
48✔
2525
        // Detect loops by subscribing to a specific subject and checking
2526
        // if this sub is coming back to us.
2527
        c.leaf.smap[lds]++
711✔
2528

711✔
2529
        // Check if we need to add an existing siReply to our map.
711✔
2530
        // This will be a prefix so add on the wildcard.
711✔
2531
        if siReply != nil {
729✔
2532
                wcsub := append(siReply, '>')
18✔
2533
                c.leaf.smap[string(wcsub)]++
18✔
2534
        }
18✔
2535
        // Queue all protocols. There is no max pending limit for LN connection,
2536
        // so we don't need chunking. The writes will happen from the writeLoop.
2537
        var b bytes.Buffer
711✔
2538
        for key, n := range c.leaf.smap {
21,871✔
2539
                c.writeLeafSub(&b, key, n)
21,160✔
2540
        }
21,160✔
2541
        if b.Len() > 0 {
1,422✔
2542
                c.enqueueProto(b.Bytes())
711✔
2543
        }
711✔
2544
        if c.leaf.tsub != nil {
1,364✔
2545
                // Clear the tsub map after 5 seconds.
653✔
2546
                c.leaf.tsubt = time.AfterFunc(5*time.Second, func() {
694✔
2547
                        c.mu.Lock()
41✔
2548
                        if c.leaf != nil {
82✔
2549
                                c.leaf.tsub = nil
41✔
2550
                                c.leaf.tsubt = nil
41✔
2551
                        }
41✔
2552
                        c.mu.Unlock()
41✔
2553
                })
2554
        }
2555
}
2556

2557
// updateInterestForAccountOnGateway called from gateway code when processing RS+ and RS-.
2558
func (s *Server) updateInterestForAccountOnGateway(accName string, sub *subscription, delta int32) {
189,865✔
2559
        // Since we're in the gateway's readLoop, and we would otherwise block, don't allow fetching.
189,865✔
2560
        acc, err := s.lookupOrFetchAccount(accName, false)
189,865✔
2561
        if acc == nil || err != nil {
190,193✔
2562
                s.Debugf("No or bad account for %q, failed to update interest from gateway", accName)
328✔
2563
                return
328✔
2564
        }
328✔
2565
        acc.updateLeafNodes(sub, delta)
189,537✔
2566
}
2567

2568
// updateLeafNodesEx will make sure to update the account smap for the subscription.
2569
// Will also forward to all leaf nodes as needed.
2570
// If `hubOnly` is true, then will update only leaf nodes that connect to this server
2571
// (that is, for which this server acts as a hub to them).
2572
func (acc *Account) updateLeafNodesEx(sub *subscription, delta int32, hubOnly bool) {
2,336,593✔
2573
        if acc == nil || sub == nil {
2,336,593✔
2574
                return
×
2575
        }
×
2576

2577
        // We will do checks for no leafnodes and same cluster here inline and under the
2578
        // general account read lock.
2579
        // If we feel we need to update the leafnodes we will do that out of line to avoid
2580
        // blocking routes or GWs.
2581

2582
        acc.mu.RLock()
2,336,593✔
2583
        // First check if we even have leafnodes here.
2,336,593✔
2584
        if acc.nleafs == 0 {
4,623,626✔
2585
                acc.mu.RUnlock()
2,287,033✔
2586
                return
2,287,033✔
2587
        }
2,287,033✔
2588

2589
        // Is this a loop detection subject.
2590
        isLDS := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))
49,560✔
2591

49,560✔
2592
        // Capture the cluster even if its empty.
49,560✔
2593
        var cluster string
49,560✔
2594
        if sub.origin != nil {
87,031✔
2595
                cluster = bytesToString(sub.origin)
37,471✔
2596
        }
37,471✔
2597

2598
        // If we have an isolated cluster we can return early, as long as it is not a loop detection subject.
2599
        // Empty clusters will return false for the check.
2600
        if !isLDS && acc.isLeafNodeClusterIsolated(cluster) {
65,070✔
2601
                acc.mu.RUnlock()
15,510✔
2602
                return
15,510✔
2603
        }
15,510✔
2604

2605
        // We can release the general account lock.
2606
        acc.mu.RUnlock()
34,050✔
2607

34,050✔
2608
        // We can hold the list lock here to avoid having to copy a large slice.
34,050✔
2609
        acc.lmu.RLock()
34,050✔
2610
        defer acc.lmu.RUnlock()
34,050✔
2611

34,050✔
2612
        // Do this once.
34,050✔
2613
        subject := string(sub.subject)
34,050✔
2614

34,050✔
2615
        // Walk the connected leafnodes from a random starting point to avoid
34,050✔
2616
        // concurrent callers all contending over leafs in the same order.
34,050✔
2617
        nleafs := len(acc.lleafs)
34,050✔
2618
        start := 0
34,050✔
2619
        if nleafs > 1 {
38,531✔
2620
                start = rand.Intn(nleafs)
4,481✔
2621
        }
4,481✔
2622
        for i := 0; i < nleafs; i++ {
75,221✔
2623
                ln := acc.lleafs[(start+i)%nleafs]
41,171✔
2624
                if ln == sub.client {
64,482✔
2625
                        continue
23,311✔
2626
                }
2627
                ln.mu.RLock()
17,860✔
2628
                // Don't advertise interest from leafnodes to other isolated leafnodes.
17,860✔
2629
                if sub.client.kind == LEAF && ln.isIsolatedLeafNode() {
17,860✔
2630
                        ln.mu.RUnlock()
×
2631
                        continue
×
2632
                }
2633
                // If `hubOnly` is true, it means that we want to update only leafnodes
2634
                // that connect to this server (so isHubLeafNode() would return `true`).
2635
                if hubOnly && !ln.isHubLeafNode() {
17,866✔
2636
                        ln.mu.RUnlock()
6✔
2637
                        continue
6✔
2638
                }
2639
                // Check to make sure this sub does not have an origin cluster that matches the leafnode.
2640
                // If skipped, make sure that we still let go the "$LDS." subscription that allows
2641
                // the detection of loops as long as different cluster.
2642
                clusterDifferent := cluster != ln.remoteCluster()
17,854✔
2643
                update := (isLDS && clusterDifferent) ||
17,854✔
2644
                        ((cluster == _EMPTY_ || clusterDifferent) && (delta <= 0 || ln.canSubscribeInternal(subject)))
17,854✔
2645
                ln.mu.RUnlock()
17,854✔
2646
                if update {
33,019✔
2647
                        ln.mu.Lock()
15,165✔
2648
                        // The leaf role, isolation mode, and remote cluster are stable
15,165✔
2649
                        // for the connection. Recheck canSubscribe here since permissions
15,165✔
2650
                        // can change, and to initializes mperms for wildcard subscriptions
15,165✔
2651
                        // that collide with deny rules.
15,165✔
2652
                        if isLDS || delta <= 0 || ln.canSubscribe(subject) {
30,330✔
2653
                                ln.updateSmap(sub, delta, isLDS)
15,165✔
2654
                        }
15,165✔
2655
                        ln.mu.Unlock()
15,165✔
2656
                }
2657
        }
2658
}
2659

2660
// updateLeafNodes will make sure to update the account smap for the subscription.
2661
// Will also forward to all leaf nodes as needed.
2662
func (acc *Account) updateLeafNodes(sub *subscription, delta int32) {
2,336,586✔
2663
        acc.updateLeafNodesEx(sub, delta, false)
2,336,586✔
2664
}
2,336,586✔
2665

2666
// This will make an update to our internal smap and determine if we should send out
2667
// an interest update to the remote side.
2668
// Lock should be held.
2669
func (c *client) updateSmap(sub *subscription, delta int32, isLDS bool) {
15,165✔
2670
        if c.leaf.smap == nil {
15,183✔
2671
                return
18✔
2672
        }
18✔
2673

2674
        // If we are solicited make sure this is a local client or a non-solicited leaf node
2675
        skind := sub.client.kind
15,147✔
2676
        updateClient := skind == CLIENT || skind == SYSTEM || skind == JETSTREAM || skind == ACCOUNT
15,147✔
2677
        if !isLDS && c.isSpokeLeafNode() && !(updateClient || (skind == LEAF && !sub.client.isSpokeLeafNode())) {
19,872✔
2678
                return
4,725✔
2679
        }
4,725✔
2680

2681
        // For additions, check if that sub has just been processed during initLeafNodeSmapAndSendSubs
2682
        if delta > 0 && c.leaf.tsub != nil {
15,375✔
2683
                if _, present := c.leaf.tsub[sub]; present {
4,955✔
2684
                        delete(c.leaf.tsub, sub)
2✔
2685
                        if len(c.leaf.tsub) == 0 {
2✔
2686
                                c.leaf.tsub = nil
×
2687
                                c.leaf.tsubt.Stop()
×
2688
                                c.leaf.tsubt = nil
×
2689
                        }
×
2690
                        return
2✔
2691
                }
2692
        }
2693

2694
        key := keyFromSub(sub)
10,420✔
2695
        n, ok := c.leaf.smap[key]
10,420✔
2696
        if delta < 0 && !ok {
11,122✔
2697
                return
702✔
2698
        }
702✔
2699

2700
        // We will update if its a queue, if count is zero (or negative), or we were 0 and are N > 0.
2701
        update := sub.queue != nil || (n <= 0 && n+delta > 0) || (n > 0 && n+delta <= 0)
9,718✔
2702
        n += delta
9,718✔
2703
        if n > 0 {
16,820✔
2704
                c.leaf.smap[key] = n
7,102✔
2705
        } else {
9,718✔
2706
                delete(c.leaf.smap, key)
2,616✔
2707
        }
2,616✔
2708
        if update {
15,422✔
2709
                c.sendLeafNodeSubUpdate(key, n)
5,704✔
2710
        }
5,704✔
2711
}
2712

2713
// Used to force add subjects to the subject map.
2714
func (c *client) forceAddToSmap(subj string) {
2✔
2715
        c.mu.Lock()
2✔
2716
        defer c.mu.Unlock()
2✔
2717

2✔
2718
        if c.leaf.smap == nil {
2✔
2719
                return
×
2720
        }
×
2721
        n := c.leaf.smap[subj]
2✔
2722
        if n != 0 {
2✔
2723
                return
×
2724
        }
×
2725
        // Place into the map since it was not there.
2726
        c.leaf.smap[subj] = 1
2✔
2727
        c.sendLeafNodeSubUpdate(subj, 1)
2✔
2728
}
2729

2730
// Used to force remove a subject from the subject map.
2731
func (c *client) forceRemoveFromSmap(subj string) {
×
2732
        c.mu.Lock()
×
2733
        defer c.mu.Unlock()
×
2734

×
2735
        if c.leaf.smap == nil {
×
2736
                return
×
2737
        }
×
2738
        n := c.leaf.smap[subj]
×
2739
        if n == 0 {
×
2740
                return
×
2741
        }
×
2742
        n--
×
2743
        if n == 0 {
×
2744
                // Remove is now zero
×
2745
                delete(c.leaf.smap, subj)
×
2746
                c.sendLeafNodeSubUpdate(subj, 0)
×
2747
        } else {
×
2748
                c.leaf.smap[subj] = n
×
2749
        }
×
2750
}
2751

2752
// Send the subscription interest change to the other side.
2753
// Lock should be held.
2754
func (c *client) sendLeafNodeSubUpdate(key string, n int32) {
5,706✔
2755
        // If we are a spoke, we need to check if we are allowed to send this subscription over to the hub.
5,706✔
2756
        if c.isSpokeLeafNode() {
6,861✔
2757
                checkPerms := true
1,155✔
2758
                if len(key) > 0 && (key[0] == '$' || key[0] == '_') {
2,076✔
2759
                        if strings.HasPrefix(key, leafNodeLoopDetectionSubjectPrefix) ||
921✔
2760
                                strings.HasPrefix(key, oldGWReplyPrefix) ||
921✔
2761
                                strings.HasPrefix(key, gwReplyPrefix) {
961✔
2762
                                checkPerms = false
40✔
2763
                        }
40✔
2764
                }
2765
                if checkPerms {
2,270✔
2766
                        var subject string
1,115✔
2767
                        if sep := strings.IndexByte(key, ' '); sep != -1 {
1,131✔
2768
                                subject = key[:sep]
16✔
2769
                        } else {
1,115✔
2770
                                subject = key
1,099✔
2771
                        }
1,099✔
2772
                        if !c.canSubscribe(subject) {
1,115✔
2773
                                return
×
2774
                        }
×
2775
                }
2776
        }
2777
        // If we are here we can send over to the other side.
2778
        _b := [64]byte{}
5,706✔
2779
        b := bytes.NewBuffer(_b[:0])
5,706✔
2780
        c.writeLeafSub(b, key, n)
5,706✔
2781
        c.enqueueProto(b.Bytes())
5,706✔
2782
}
2783

2784
// Helper function to build the key.
2785
func keyFromSub(sub *subscription) string {
36,359✔
2786
        var sb strings.Builder
36,359✔
2787
        sb.Grow(len(sub.subject) + len(sub.queue) + 1)
36,359✔
2788
        sb.Write(sub.subject)
36,359✔
2789
        if sub.queue != nil {
37,284✔
2790
                // Just make the key subject spc group, e.g. 'foo bar'
925✔
2791
                sb.WriteByte(' ')
925✔
2792
                sb.Write(sub.queue)
925✔
2793
        }
925✔
2794
        return sb.String()
36,359✔
2795
}
2796

2797
// Leading markers of the keys built by keyFromSubWithOrigin: 'R' is written
// for a subscription without an origin and 'L' for one that carries an origin
// (a routed subscription on behalf of a leafnode). Each marker is declared
// both as a one-character string and as a byte-sized character constant.
// NOTE(review): the string forms are not referenced in this part of the file;
// presumably they are used where keys are parsed — confirm against callers.
const (
        keyRoutedSub         = "R"
        keyRoutedSubByte     = 'R'
        keyRoutedLeafSub     = "L"
        keyRoutedLeafSubByte = 'L'
)
2803

2804
// Helper function to build the key that prevents collisions between normal
2805
// routed subscriptions and routed subscriptions on behalf of a leafnode.
2806
// Keys will look like this:
2807
// "R foo"          -> plain routed sub on "foo"
2808
// "R foo bar"      -> queue routed sub on "foo", queue "bar"
2809
// "L foo bar"      -> plain routed leaf sub on "foo", leaf "bar"
2810
// "L foo bar baz"  -> queue routed sub on "foo", queue "bar", leaf "baz"
2811
func keyFromSubWithOrigin(sub *subscription) string {
691,517✔
2812
        var sb strings.Builder
691,517✔
2813
        sb.Grow(2 + len(sub.origin) + 1 + len(sub.subject) + 1 + len(sub.queue))
691,517✔
2814
        leaf := len(sub.origin) > 0
691,517✔
2815
        if leaf {
703,882✔
2816
                sb.WriteByte(keyRoutedLeafSubByte)
12,365✔
2817
        } else {
691,517✔
2818
                sb.WriteByte(keyRoutedSubByte)
679,152✔
2819
        }
679,152✔
2820
        sb.WriteByte(' ')
691,517✔
2821
        sb.Write(sub.subject)
691,517✔
2822
        if sub.queue != nil {
716,304✔
2823
                sb.WriteByte(' ')
24,787✔
2824
                sb.Write(sub.queue)
24,787✔
2825
        }
24,787✔
2826
        if leaf {
703,882✔
2827
                sb.WriteByte(' ')
12,365✔
2828
                sb.Write(sub.origin)
12,365✔
2829
        }
12,365✔
2830
        return sb.String()
691,517✔
2831
}
2832

2833
// Lock should be held.
2834
func (c *client) writeLeafSub(w *bytes.Buffer, key string, n int32) {
26,866✔
2835
        if key == _EMPTY_ {
26,866✔
2836
                return
×
2837
        }
×
2838
        if n > 0 {
51,116✔
2839
                w.WriteString("LS+ " + key)
24,250✔
2840
                // Check for queue semantics, if found write n.
24,250✔
2841
                if strings.Contains(key, " ") {
24,292✔
2842
                        w.WriteString(" ")
42✔
2843
                        var b [12]byte
42✔
2844
                        var i = len(b)
42✔
2845
                        for l := n; l > 0; l /= 10 {
84✔
2846
                                i--
42✔
2847
                                b[i] = digits[l%10]
42✔
2848
                        }
42✔
2849
                        w.Write(b[i:])
42✔
2850
                        if c.trace {
42✔
2851
                                arg := fmt.Sprintf("%s %d", key, n)
×
2852
                                c.traceOutOp("LS+", []byte(arg))
×
2853
                        }
×
2854
                } else if c.trace {
24,208✔
2855
                        c.traceOutOp("LS+", []byte(key))
×
2856
                }
×
2857
        } else {
2,616✔
2858
                w.WriteString("LS- " + key)
2,616✔
2859
                if c.trace {
2,616✔
2860
                        c.traceOutOp("LS-", []byte(key))
×
2861
                }
×
2862
        }
2863
        w.WriteString(CR_LF)
26,866✔
2864
}
2865

2866
// processLeafSub will process an inbound sub request for the remote leaf node.
2867
func (c *client) processLeafSub(argo []byte) (err error) {
24,024✔
2868
        // Indicate activity.
24,024✔
2869
        c.in.subs++
24,024✔
2870

24,024✔
2871
        srv := c.srv
24,024✔
2872
        if srv == nil {
24,024✔
2873
                return nil
×
2874
        }
×
2875

2876
        // Copy so we do not reference a potentially large buffer
2877
        arg := make([]byte, len(argo))
24,024✔
2878
        copy(arg, argo)
24,024✔
2879

24,024✔
2880
        args := splitArg(arg)
24,024✔
2881
        sub := &subscription{client: c}
24,024✔
2882

24,024✔
2883
        delta := int32(1)
24,024✔
2884
        switch len(args) {
24,024✔
2885
        case 1:
23,992✔
2886
                sub.queue = nil
23,992✔
2887
        case 3:
32✔
2888
                sub.queue = args[1]
32✔
2889
                sub.qw = int32(parseSize(args[2]))
32✔
2890
                // TODO: (ik) We should have a non empty queue name and a queue
32✔
2891
                // weight >= 1. For 2.11, we may want to return an error if that
32✔
2892
                // is not the case, but for now just overwrite `delta` if queue
32✔
2893
                // weight is greater than 1 (it is possible after a reconnect/
32✔
2894
                // server restart to receive a queue weight > 1 for a new sub).
32✔
2895
                if sub.qw > 1 {
52✔
2896
                        delta = sub.qw
20✔
2897
                }
20✔
2898
        default:
×
2899
                return fmt.Errorf("processLeafSub Parse Error: '%s'", arg)
×
2900
        }
2901
        sub.subject = args[0]
24,024✔
2902

24,024✔
2903
        c.mu.Lock()
24,024✔
2904
        if c.isClosed() {
24,025✔
2905
                c.mu.Unlock()
1✔
2906
                return nil
1✔
2907
        }
1✔
2908

2909
        acc := c.acc
24,023✔
2910
        // Guard against LS+ arriving before CONNECT has been processed, which
24,023✔
2911
        // can happen when compression is enabled.
24,023✔
2912
        if acc == nil {
24,023✔
2913
                c.mu.Unlock()
×
2914
                c.sendErr("Authorization Violation")
×
2915
                c.closeConnection(ProtocolViolation)
×
2916
                return nil
×
2917
        }
×
2918
        // Check if we have a loop.
2919
        ldsPrefix := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))
24,023✔
2920

24,023✔
2921
        if ldsPrefix && bytesToString(sub.subject) == acc.getLDSubject() {
24,023✔
2922
                c.mu.Unlock()
×
2923
                c.handleLeafNodeLoop(true)
×
2924
                return nil
×
2925
        }
×
2926

2927
        // Check permissions if applicable. (but exclude the $LDS, $GR and _GR_)
2928
        checkPerms := true
24,023✔
2929
        if sub.subject[0] == '$' || sub.subject[0] == '_' {
47,668✔
2930
                if ldsPrefix ||
23,645✔
2931
                        bytes.HasPrefix(sub.subject, []byte(oldGWReplyPrefix)) ||
23,645✔
2932
                        bytes.HasPrefix(sub.subject, []byte(gwReplyPrefix)) {
24,849✔
2933
                        checkPerms = false
1,204✔
2934
                }
1,204✔
2935
        }
2936

2937
        // If we are a hub check that we can publish to this subject.
2938
        if checkPerms {
46,842✔
2939
                subj := string(sub.subject)
22,819✔
2940
                if subjectIsLiteral(subj) && !c.pubAllowedFullCheck(subj, true, true) {
22,824✔
2941
                        c.mu.Unlock()
5✔
2942
                        c.leafSubPermViolation(sub.subject)
5✔
2943
                        c.Debugf(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject))
5✔
2944
                        return nil
5✔
2945
                }
5✔
2946
        }
2947

2948
        // Check if we have a maximum on the number of subscriptions.
2949
        if c.subsAtLimit() {
24,026✔
2950
                c.mu.Unlock()
8✔
2951
                c.maxSubsExceeded()
8✔
2952
                return nil
8✔
2953
        }
8✔
2954

2955
        // If we have an origin cluster associated mark that in the sub.
2956
        if rc := c.remoteCluster(); rc != _EMPTY_ {
45,338✔
2957
                sub.origin = []byte(rc)
21,328✔
2958
        }
21,328✔
2959

2960
        // Like Routes, we store local subs by account and subject and optionally queue name.
2961
        // If we have a queue it will have a trailing weight which we do not want.
2962
        if sub.queue != nil {
24,041✔
2963
                sub.sid = arg[:len(arg)-len(args[2])-1]
31✔
2964
        } else {
24,010✔
2965
                sub.sid = arg
23,979✔
2966
        }
23,979✔
2967
        key := bytesToString(sub.sid)
24,010✔
2968
        osub := c.subs[key]
24,010✔
2969
        if osub == nil {
48,006✔
2970
                c.subs[key] = sub
23,996✔
2971
                // Now place into the account sl.
23,996✔
2972
                if err := acc.sl.Insert(sub); err != nil {
23,996✔
2973
                        delete(c.subs, key)
×
2974
                        c.mu.Unlock()
×
2975
                        c.Errorf("Could not insert subscription: %v", err)
×
2976
                        c.sendErr("Invalid Subscription")
×
2977
                        return nil
×
2978
                }
×
2979
        } else if sub.queue != nil {
27✔
2980
                // For a queue we need to update the weight.
13✔
2981
                delta = sub.qw - atomic.LoadInt32(&osub.qw)
13✔
2982
                atomic.StoreInt32(&osub.qw, sub.qw)
13✔
2983
                acc.sl.UpdateRemoteQSub(osub)
13✔
2984
        }
13✔
2985
        spoke := c.isSpokeLeafNode()
24,010✔
2986
        c.mu.Unlock()
24,010✔
2987

24,010✔
2988
        // Only add in shadow subs if a new sub or qsub.
24,010✔
2989
        if osub == nil {
48,006✔
2990
                if err := c.addShadowSubscriptions(acc, sub); err != nil {
23,996✔
2991
                        c.Errorf(err.Error())
×
2992
                }
×
2993
        }
2994

2995
        // If we are not solicited, treat leaf node subscriptions similar to a
2996
        // client subscription, meaning we forward them to routes, gateways and
2997
        // other leaf nodes as needed.
2998
        if !spoke {
32,293✔
2999
                // If we are routing add to the route map for the associated account.
8,283✔
3000
                srv.updateRouteSubscriptionMap(acc, sub, delta)
8,283✔
3001
                if srv.gateway.enabled {
8,869✔
3002
                        srv.gatewayUpdateSubInterest(acc.Name, sub, delta)
586✔
3003
                }
586✔
3004
        }
3005
        // Now check on leafnode updates for other leaf nodes. We understand solicited
3006
        // and non-solicited state in this call so we will do the right thing.
3007
        acc.updateLeafNodes(sub, delta)
24,010✔
3008

24,010✔
3009
        return nil
24,010✔
3010
}
3011

3012
// If the leafnode is a solicited, set the connect delay based on default
3013
// or private option (for tests). Sends the error to the other side, log and
3014
// close the connection.
3015
func (c *client) handleLeafNodeLoop(sendErr bool) {
×
3016
        accName, delay := c.setLeafConnectDelayIfSoliciting(leafNodeReconnectDelayAfterLoopDetected)
×
3017
        errTxt := fmt.Sprintf("Loop detected for leafnode account=%q. Delaying attempt to reconnect for %v", accName, delay)
×
3018
        if sendErr {
×
3019
                c.sendErr(errTxt)
×
3020
        }
×
3021

3022
        c.Errorf(errTxt)
×
3023
        // If we are here with "sendErr" false, it means that this is the server
×
3024
        // that received the error. The other side will have closed the connection,
×
3025
        // but does not hurt to close here too.
×
3026
        c.closeConnection(ProtocolViolation)
×
3027
}
3028

3029
// processLeafUnsub will process an inbound unsub request for the remote leaf node.
3030
func (c *client) processLeafUnsub(arg []byte) error {
2,293✔
3031
        // Indicate any activity, so pub and sub or unsubs.
2,293✔
3032
        c.in.subs++
2,293✔
3033

2,293✔
3034
        srv := c.srv
2,293✔
3035

2,293✔
3036
        c.mu.Lock()
2,293✔
3037
        if c.isClosed() {
2,345✔
3038
                c.mu.Unlock()
52✔
3039
                return nil
52✔
3040
        }
52✔
3041

3042
        acc := c.acc
2,241✔
3043
        // Guard against LS- arriving before CONNECT has been processed.
2,241✔
3044
        if acc == nil {
2,241✔
3045
                c.mu.Unlock()
×
3046
                c.sendErr("Authorization Violation")
×
3047
                c.closeConnection(ProtocolViolation)
×
3048
                return nil
×
3049
        }
×
3050

3051
        spoke := c.isSpokeLeafNode()
2,241✔
3052
        // We store local subs by account and subject and optionally queue name.
2,241✔
3053
        // LS- will have the arg exactly as the key.
2,241✔
3054
        sub, ok := c.subs[string(arg)]
2,241✔
3055
        if !ok {
2,241✔
3056
                // If not found, don't try to update routes/gws/leaf nodes.
×
3057
                c.mu.Unlock()
×
3058
                return nil
×
3059
        }
×
3060
        delta := int32(1)
2,241✔
3061
        if len(sub.queue) > 0 {
2,247✔
3062
                delta = sub.qw
6✔
3063
        }
6✔
3064
        c.mu.Unlock()
2,241✔
3065

2,241✔
3066
        c.unsubscribe(acc, sub, true, true)
2,241✔
3067
        if !spoke {
2,755✔
3068
                // If we are routing subtract from the route map for the associated account.
514✔
3069
                srv.updateRouteSubscriptionMap(acc, sub, -delta)
514✔
3070
                // Gateways
514✔
3071
                if srv.gateway.enabled {
567✔
3072
                        srv.gatewayUpdateSubInterest(acc.Name, sub, -delta)
53✔
3073
                }
53✔
3074
        }
3075
        // Now check on leafnode updates for other leaf nodes.
3076
        acc.updateLeafNodes(sub, -delta)
2,241✔
3077
        return nil
2,241✔
3078
}
3079

3080
func (c *client) processLeafHeaderMsgArgs(arg []byte) error {
217✔
3081
        // Unroll splitArgs to avoid runtime/heap issues
217✔
3082
        args := c.argsa[:0]
217✔
3083
        start := -1
217✔
3084
        for i, b := range arg {
12,065✔
3085
                switch b {
11,848✔
3086
                case ' ', '\t', '\r', '\n':
641✔
3087
                        if start >= 0 {
1,282✔
3088
                                args = append(args, arg[start:i])
641✔
3089
                                start = -1
641✔
3090
                        }
641✔
3091
                default:
11,207✔
3092
                        if start < 0 {
12,065✔
3093
                                start = i
858✔
3094
                        }
858✔
3095
                }
3096
        }
3097
        if start >= 0 {
434✔
3098
                args = append(args, arg[start:])
217✔
3099
        }
217✔
3100

3101
        c.pa.arg = arg
217✔
3102
        switch len(args) {
217✔
3103
        case 0, 1, 2:
×
3104
                return fmt.Errorf("processLeafHeaderMsgArgs Parse Error: '%s'", args)
×
3105
        case 3:
15✔
3106
                c.pa.reply = nil
15✔
3107
                c.pa.queues = nil
15✔
3108
                c.pa.hdb = args[1]
15✔
3109
                c.pa.hdr = parseSize(args[1])
15✔
3110
                c.pa.szb = args[2]
15✔
3111
                c.pa.size = parseSize(args[2])
15✔
3112
        case 4:
199✔
3113
                c.pa.reply = args[1]
199✔
3114
                c.pa.queues = nil
199✔
3115
                c.pa.hdb = args[2]
199✔
3116
                c.pa.hdr = parseSize(args[2])
199✔
3117
                c.pa.szb = args[3]
199✔
3118
                c.pa.size = parseSize(args[3])
199✔
3119
        default:
3✔
3120
                // args[1] is our reply indicator. Should be + or | normally.
3✔
3121
                if len(args[1]) != 1 {
3✔
3122
                        return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
3123
                }
×
3124
                switch args[1][0] {
3✔
3125
                case '+':
2✔
3126
                        c.pa.reply = args[2]
2✔
3127
                case '|':
1✔
3128
                        c.pa.reply = nil
1✔
3129
                default:
×
3130
                        return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
3131
                }
3132
                // Grab header size.
3133
                c.pa.hdb = args[len(args)-2]
3✔
3134
                c.pa.hdr = parseSize(c.pa.hdb)
3✔
3135

3✔
3136
                // Grab size.
3✔
3137
                c.pa.szb = args[len(args)-1]
3✔
3138
                c.pa.size = parseSize(c.pa.szb)
3✔
3139

3✔
3140
                // Grab queue names.
3✔
3141
                if c.pa.reply != nil {
5✔
3142
                        c.pa.queues = args[3 : len(args)-2]
2✔
3143
                } else {
3✔
3144
                        c.pa.queues = args[2 : len(args)-2]
1✔
3145
                }
1✔
3146
        }
3147
        if c.pa.hdr < 0 {
217✔
3148
                return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Header Size: '%s'", arg)
×
3149
        }
×
3150
        if c.pa.size < 0 {
217✔
3151
                return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Size: '%s'", args)
×
3152
        }
×
3153
        if c.pa.hdr > c.pa.size {
217✔
3154
                return fmt.Errorf("processLeafHeaderMsgArgs Header Size larger then TotalSize: '%s'", arg)
×
3155
        }
×
3156
        maxPayload := atomic.LoadInt32(&c.mpay)
217✔
3157
        if maxPayload != jwt.NoLimit && int64(c.pa.size) > int64(maxPayload) {
217✔
3158
                c.maxPayloadViolation(c.pa.size, maxPayload)
×
3159
                return ErrMaxPayload
×
3160
        }
×
3161

3162
        // Common ones processed after check for arg length
3163
        c.pa.subject = args[0]
217✔
3164

217✔
3165
        return nil
217✔
3166
}
3167

3168
func (c *client) processLeafMsgArgs(arg []byte) error {
12,005✔
3169
        // Unroll splitArgs to avoid runtime/heap issues
12,005✔
3170
        args := c.argsa[:0]
12,005✔
3171
        start := -1
12,005✔
3172
        for i, b := range arg {
815,917✔
3173
                switch b {
803,912✔
3174
                case ' ', '\t', '\r', '\n':
18,476✔
3175
                        if start >= 0 {
36,952✔
3176
                                args = append(args, arg[start:i])
18,476✔
3177
                                start = -1
18,476✔
3178
                        }
18,476✔
3179
                default:
785,436✔
3180
                        if start < 0 {
815,917✔
3181
                                start = i
30,481✔
3182
                        }
30,481✔
3183
                }
3184
        }
3185
        if start >= 0 {
24,010✔
3186
                args = append(args, arg[start:])
12,005✔
3187
        }
12,005✔
3188

3189
        c.pa.arg = arg
12,005✔
3190
        switch len(args) {
12,005✔
3191
        case 0, 1:
×
3192
                return fmt.Errorf("processLeafMsgArgs Parse Error: '%s'", args)
×
3193
        case 2:
5,551✔
3194
                c.pa.reply = nil
5,551✔
3195
                c.pa.queues = nil
5,551✔
3196
                c.pa.szb = args[1]
5,551✔
3197
                c.pa.size = parseSize(args[1])
5,551✔
3198
        case 3:
6,445✔
3199
                c.pa.reply = args[1]
6,445✔
3200
                c.pa.queues = nil
6,445✔
3201
                c.pa.szb = args[2]
6,445✔
3202
                c.pa.size = parseSize(args[2])
6,445✔
3203
        default:
9✔
3204
                // args[1] is our reply indicator. Should be + or | normally.
9✔
3205
                if len(args[1]) != 1 {
9✔
3206
                        return fmt.Errorf("processLeafMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
3207
                }
×
3208
                switch args[1][0] {
9✔
3209
                case '+':
8✔
3210
                        c.pa.reply = args[2]
8✔
3211
                case '|':
1✔
3212
                        c.pa.reply = nil
1✔
3213
                default:
×
3214
                        return fmt.Errorf("processLeafMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
3215
                }
3216
                // Grab size.
3217
                c.pa.szb = args[len(args)-1]
9✔
3218
                c.pa.size = parseSize(c.pa.szb)
9✔
3219

9✔
3220
                // Grab queue names.
9✔
3221
                if c.pa.reply != nil {
17✔
3222
                        c.pa.queues = args[3 : len(args)-1]
8✔
3223
                } else {
9✔
3224
                        c.pa.queues = args[2 : len(args)-1]
1✔
3225
                }
1✔
3226
        }
3227
        if c.pa.size < 0 {
12,005✔
3228
                return fmt.Errorf("processLeafMsgArgs Bad or Missing Size: '%s'", args)
×
3229
        }
×
3230
        maxPayload := atomic.LoadInt32(&c.mpay)
12,005✔
3231
        if maxPayload != jwt.NoLimit && int64(c.pa.size) > int64(maxPayload) {
12,005✔
3232
                c.maxPayloadViolation(c.pa.size, maxPayload)
×
3233
                return ErrMaxPayload
×
3234
        }
×
3235

3236
        // Common ones processed after check for arg length
3237
        c.pa.subject = args[0]
12,005✔
3238

12,005✔
3239
        return nil
12,005✔
3240
}
3241

3242
// processInboundLeafMsg is called to process an inbound msg from a leaf node.
3243
func (c *client) processInboundLeafMsg(msg []byte) {
11,223✔
3244
        // Update statistics
11,223✔
3245
        // The msg includes the CR_LF, so pull back out for accounting.
11,223✔
3246
        c.in.msgs++
11,223✔
3247
        c.in.bytes += int32(len(msg) - LEN_CR_LF)
11,223✔
3248

11,223✔
3249
        srv, acc, subject := c.srv, c.acc, string(c.pa.subject)
11,223✔
3250

11,223✔
3251
        // Mostly under testing scenarios.
11,223✔
3252
        if srv == nil || acc == nil {
11,223✔
3253
                return
×
3254
        }
×
3255

3256
        // Check that leaf messages respect the subject permissions.
3257
        if c.perms != nil && !c.leafMsgAllowed() {
11,223✔
3258
                c.leafPubPermViolation(c.pa.subject)
×
3259
                return
×
3260
        }
×
3261

3262
        // Match the subscriptions. We will use our own L1 map if
3263
        // it's still valid, avoiding contention on the shared sublist.
3264
        var r *SublistResult
11,223✔
3265
        var ok bool
11,223✔
3266

11,223✔
3267
        genid := atomic.LoadUint64(&c.acc.sl.genid)
11,223✔
3268
        if genid == c.in.genid && c.in.results != nil {
20,891✔
3269
                r, ok = c.in.results[subject]
9,668✔
3270
        } else {
11,223✔
3271
                // Reset our L1 completely.
1,555✔
3272
                c.in.results = make(map[string]*SublistResult)
1,555✔
3273
                c.in.genid = genid
1,555✔
3274
        }
1,555✔
3275

3276
        // Go back to the sublist data structure.
3277
        if !ok {
14,886✔
3278
                r = c.acc.sl.Match(subject)
3,663✔
3279
                // Prune the results cache. Keeps us from unbounded growth. Random delete.
3,663✔
3280
                if len(c.in.results) >= maxResultCacheSize {
3,663✔
3281
                        n := 0
×
3282
                        for subj := range c.in.results {
×
3283
                                delete(c.in.results, subj)
×
3284
                                if n++; n > pruneSize {
×
3285
                                        break
×
3286
                                }
3287
                        }
3288
                }
3289
                // Then add the new cache entry.
3290
                c.in.results[subject] = r
3,663✔
3291
        }
3292

3293
        // Collect queue names if needed.
3294
        var qnames [][]byte
11,223✔
3295

11,223✔
3296
        // Check for no interest, short circuit if so.
11,223✔
3297
        // This is the fanout scale.
11,223✔
3298
        if len(r.psubs)+len(r.qsubs) > 0 {
22,180✔
3299
                flag := pmrNoFlag
10,957✔
3300
                // If we have queue subs in this cluster, then if we run in gateway
10,957✔
3301
                // mode and the remote gateways have queue subs, then we need to
10,957✔
3302
                // collect the queue groups this message was sent to so that we
10,957✔
3303
                // exclude them when sending to gateways.
10,957✔
3304
                if len(r.qsubs) > 0 && c.srv.gateway.enabled &&
10,957✔
3305
                        atomic.LoadInt64(&c.srv.gateway.totalQSubs) > 0 {
10,959✔
3306
                        flag |= pmrCollectQueueNames
2✔
3307
                }
2✔
3308
                // If this is a mapped subject that means the mapped interest
3309
                // is what got us here, but this might not have a queue designation
3310
                // If that is the case, make sure we ignore to process local queue subscribers.
3311
                if len(c.pa.mapped) > 0 && len(c.pa.queues) == 0 {
11,189✔
3312
                        flag |= pmrIgnoreEmptyQueueFilter
232✔
3313
                }
232✔
3314
                _, qnames = c.processMsgResults(acc, r, msg, nil, c.pa.subject, c.pa.reply, flag)
10,957✔
3315
        }
3316

3317
        // Now deal with gateways
3318
        if c.srv.gateway.enabled {
12,036✔
3319
                c.sendMsgToGateways(acc, msg, c.pa.subject, c.pa.reply, qnames, true)
813✔
3320
        }
813✔
3321
}
3322

3323
// Checks whether the inbound leaf message is allowed by the
3324
// connection's permissions. On the hub side this enforces what
3325
// the remote leaf may publish. On the spoke side this enforces
3326
// import restrictions such as deny_imports.
3327
func (c *client) leafMsgAllowed() bool {
7,198✔
3328
        wireSubject := c.pa.subject
7,198✔
3329
        if len(c.pa.mapped) > 0 {
7,430✔
3330
                // Mappings rewrite c.pa.subject to the internal
232✔
3331
                // destination. For leaf ACLs, need to check
232✔
3332
                // the original wire subject from the remote side.
232✔
3333
                wireSubject = c.pa.mapped
232✔
3334
        }
232✔
3335
        // Strip any gateway routing prefix for the permission check.
3336
        subjectToCheck, isGW := getGWRoutedSubjectOrSelf(wireSubject)
7,198✔
3337

7,198✔
3338
        // Service-import replies (_R_), JS ack subjects ($JS.ACK.)
7,198✔
3339
        // are internal routing subjects forwarded via LS+ without
7,198✔
3340
        // permission checks.
7,198✔
3341
        if isServiceReply(subjectToCheck) || isJSAckSubject(subjectToCheck) {
7,228✔
3342
                return true
30✔
3343
        }
30✔
3344

3345
        c.mu.RLock()
7,168✔
3346
        if c.isSpokeLeafNode() {
11,101✔
3347
                // Gateway routed replies are forwarded without
3,933✔
3348
                // permission checks.
3,933✔
3349
                if isGW || c.leafReceiveAllowed(subjectToCheck) {
7,866✔
3350
                        c.mu.RUnlock()
3,933✔
3351
                        return true
3,933✔
3352
                }
3,933✔
3353
        } else if c.leafSendAllowed(subjectToCheck) {
6,470✔
3354
                c.mu.RUnlock()
3,235✔
3355
                return true
3,235✔
3356
        }
3,235✔
3357

3358
        // If allow_responses is not configured, or there is no tracked reply for
3359
        // this subject, the answer is "denied" and we can return it while still
3360
        // holding only the read lock.
3361
        replySubject := bytesToString(wireSubject)
×
3362
        if c.perms == nil || c.perms.resp == nil || c.replies[replySubject] == nil {
×
3363
                c.mu.RUnlock()
×
3364
                return false
×
3365
        }
×
3366
        c.mu.RUnlock()
×
3367

×
3368
        // Check tracked reply permissions (allow_responses).
×
3369
        // Use the pre-strip subject since deliverMsg tracks
×
3370
        // replies under the original form, which includes
×
3371
        // the GW routing prefix for routed requests.
×
3372
        c.mu.Lock()
×
3373
        defer c.mu.Unlock()
×
3374
        return c.responseAllowed(replySubject)
×
3375
}
3376

3377
// Returns true if the leaf side ACLs allow importing this subject,
3378
// based on the permissions received over INFO and any local deny_imports.
3379
// At least a read lock must be held.
3380
func (c *client) leafReceiveAllowed(subject []byte) bool {
3,933✔
3381
        return c.canSubscribeInternal(bytesToString(subject))
3,933✔
3382
}
3,933✔
3383

3384
// Returns true if the hub side ACLs allow the remote leaf to send
3385
// this subject.
3386
// At least a read lock must be held.
3387
func (c *client) leafSendAllowed(bsubject []byte) bool {
3,235✔
3388
        // Use the original export ACL captured for this accepted leaf.
3,235✔
3389
        // The live perms also contain additional JetStream denies used by
3,235✔
3390
        // the normal forwarding path, and applying them here would reject
3,235✔
3391
        // legitimate inbound JS API requests.
3,235✔
3392
        subject := bytesToString(bsubject)
3,235✔
3393
        perms := c.opts.Export
3,235✔
3394
        if perms == nil || (perms.Allow == nil && perms.Deny == nil) {
6,467✔
3395
                return true
3,232✔
3396
        }
3,232✔
3397

3398
        allowed := true
3✔
3399
        if perms.Allow != nil && !strings.HasPrefix(subject, mqttPrefix) {
5✔
3400
                allowed = false
2✔
3401
                for _, allowSubj := range perms.Allow {
5✔
3402
                        if matchLiteral(subject, allowSubj) {
5✔
3403
                                allowed = true
2✔
3404
                                break
2✔
3405
                        }
3406
                }
3407
        }
3408

3409
        if allowed && len(perms.Deny) > 0 {
4✔
3410
                for _, denySubj := range perms.Deny {
2✔
3411
                        if matchLiteral(subject, denySubj) {
1✔
3412
                                allowed = false
×
3413
                                break
×
3414
                        }
3415
                }
3416
        }
3417
        return allowed
3✔
3418
}
3419

3420
// Handles a subscription permission violation.
3421
// See leafPermViolation() for details.
3422
func (c *client) leafSubPermViolation(subj []byte) {
5✔
3423
        c.leafPermViolation(false, subj)
5✔
3424
}
5✔
3425

3426
// Handles a publish permission violation.
3427
// See leafPermViolation() for details.
3428
func (c *client) leafPubPermViolation(subj []byte) {
×
3429
        c.leafPermViolation(true, subj)
×
3430
}
×
3431

3432
// Common function to process publish or subscribe leafnode permission violation.
3433
// Sends the permission violation error to the remote, logs it and closes the connection.
3434
// If this is from a server soliciting, the reconnection will be delayed.
3435
func (c *client) leafPermViolation(pub bool, subj []byte) {
5✔
3436
        if c.isSpokeLeafNode() {
10✔
3437
                // For spokes these are no-ops since the hub server told us our permissions.
5✔
3438
                // We just need to not send these over to the other side since we will get cutoff.
5✔
3439
                return
5✔
3440
        }
5✔
3441
        // FIXME(dlc) ?
3442
        c.setLeafConnectDelayIfSoliciting(leafNodeReconnectAfterPermViolation)
×
3443
        var action string
×
3444
        if pub {
×
3445
                c.sendErr(fmt.Sprintf("Permissions Violation for Publish to %q", subj))
×
3446
                action = "Publish"
×
3447
        } else {
×
3448
                c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q", subj))
×
3449
                action = "Subscription"
×
3450
        }
×
3451
        c.Errorf("%s Violation on %q - Check other side configuration", action, subj)
×
3452
        // TODO: add a new close reason that is more appropriate?
×
3453
        c.closeConnection(ProtocolViolation)
×
3454
}
3455

3456
// Invoked from generic processErr() for LEAF connections.
3457
func (c *client) leafProcessErr(errStr string) {
34✔
3458
        // Check if we got a cluster name collision.
34✔
3459
        if strings.Contains(errStr, ErrLeafNodeHasSameClusterName.Error()) {
37✔
3460
                _, delay := c.setLeafConnectDelayIfSoliciting(leafNodeReconnectDelayAfterClusterNameSame)
3✔
3461
                c.Errorf("Leafnode connection dropped with same cluster name error. Delaying attempt to reconnect for %v", delay)
3✔
3462
                return
3✔
3463
        }
3✔
3464
        if strings.Contains(errStr, ErrLeafNodeMinVersionRejected.Error()) {
31✔
3465
                _, delay := c.setLeafConnectDelayIfSoliciting(leafNodeMinVersionReconnectDelay)
×
3466
                c.Errorf("Leafnode connection dropped due to minimum version requirement. Delaying attempt to reconnect for %v", delay)
×
3467
                return
×
3468
        }
×
3469

3470
        // We will look for Loop detected error coming from the other side.
3471
        // If we solicit, set the connect delay.
3472
        if !strings.Contains(errStr, "Loop detected") {
62✔
3473
                return
31✔
3474
        }
31✔
3475
        c.handleLeafNodeLoop(false)
×
3476
}
3477

3478
// If this leaf connection solicits, sets the connect delay to the given value,
3479
// or the one from the server option's LeafNode.connDelay if one is set (for tests).
3480
// Returns the connection's account name and delay.
3481
func (c *client) setLeafConnectDelayIfSoliciting(delay time.Duration) (string, time.Duration) {
3✔
3482
        c.mu.Lock()
3✔
3483
        if c.isSolicitedLeafNode() {
6✔
3484
                if s := c.srv; s != nil {
6✔
3485
                        if srvdelay := s.getOpts().LeafNode.connDelay; srvdelay != 0 {
3✔
3486
                                delay = srvdelay
×
3487
                        }
×
3488
                }
3489
                c.leaf.remote.setConnectDelay(delay)
3✔
3490
        }
3491
        var accName string
3✔
3492
        if c.acc != nil {
6✔
3493
                accName = c.acc.Name
3✔
3494
        }
3✔
3495
        c.mu.Unlock()
3✔
3496
        return accName, delay
3✔
3497
}
3498

3499
// For the given remote Leafnode configuration, this function returns
3500
// if TLS is required, and if so, will return a clone of the TLS Config
3501
// (since some fields will be changed during handshake), the TLS server
3502
// name that is remembered, and the TLS timeout.
3503
func (c *client) leafNodeGetTLSConfigForSolicit(remote *leafNodeCfg) (bool, *tls.Config, string, float64) {
980✔
3504
        var (
980✔
3505
                tlsConfig  *tls.Config
980✔
3506
                tlsName    string
980✔
3507
                tlsTimeout float64
980✔
3508
        )
980✔
3509

980✔
3510
        remote.RLock()
980✔
3511
        defer remote.RUnlock()
980✔
3512

980✔
3513
        tlsRequired := remote.TLS || remote.TLSConfig != nil
980✔
3514
        if tlsRequired {
1,004✔
3515
                if remote.TLSConfig != nil {
48✔
3516
                        tlsConfig = remote.TLSConfig.Clone()
24✔
3517
                } else {
24✔
3518
                        tlsConfig = &tls.Config{MinVersion: tls.VersionTLS12}
×
3519
                }
×
3520
                tlsName = remote.tlsName
24✔
3521
                tlsTimeout = remote.TLSTimeout
24✔
3522
                if tlsTimeout == 0 {
27✔
3523
                        tlsTimeout = float64(TLS_TIMEOUT / time.Second)
3✔
3524
                }
3✔
3525
        }
3526

3527
        return tlsRequired, tlsConfig, tlsName, tlsTimeout
980✔
3528
}
3529

3530
// Initiates the LeafNode Websocket connection by:
3531
// - doing the TLS handshake if needed
3532
// - sending the HTTP request
3533
// - waiting for the HTTP response
3534
//
3535
// Since some bufio reader is used to consume the HTTP response, this function
3536
// returns the slice of buffered bytes (if any) so that the readLoop that will
3537
// be started after that consume those first before reading from the socket.
3538
// The boolean
3539
//
3540
// Lock held on entry.
3541
func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remote *leafNodeCfg) ([]byte, ClosedState, error) {
×
3542
        remote.RLock()
×
3543
        compress := remote.Websocket.Compression
×
3544
        // By default the server will mask outbound frames, but it can be disabled with this option.
×
3545
        noMasking := remote.Websocket.NoMasking
×
3546
        infoTimeout := remote.FirstInfoTimeout
×
3547
        remote.RUnlock()
×
3548
        // Will do the client-side TLS handshake if needed.
×
3549
        tlsRequired, err := c.leafClientHandshakeIfNeeded(remote, opts)
×
3550
        if err != nil {
×
3551
                // 0 will indicate that the connection was already closed
×
3552
                return nil, 0, err
×
3553
        }
×
3554

3555
        // For http request, we need the passed URL to contain either http or https scheme.
3556
        scheme := "http"
×
3557
        if tlsRequired {
×
3558
                scheme = "https"
×
3559
        }
×
3560
        // We will use the `/leafnode` path to tell the accepting WS server that it should
3561
        // create a LEAF connection, not a CLIENT.
3562
        // In case we use the user's URL path in the future, make sure we append the user's
3563
        // path to our `/leafnode` path.
3564
        lpath := leafNodeWSPath
×
3565
        if curPath := rURL.EscapedPath(); curPath != _EMPTY_ {
×
3566
                if curPath[0] == '/' {
×
3567
                        curPath = curPath[1:]
×
3568
                }
×
3569
                lpath = path.Join(curPath, lpath)
×
3570
        } else {
×
3571
                lpath = lpath[1:]
×
3572
        }
×
3573
        ustr := fmt.Sprintf("%s://%s/%s", scheme, rURL.Host, lpath)
×
3574
        u, _ := url.Parse(ustr)
×
3575
        req := &http.Request{
×
3576
                Method:     "GET",
×
3577
                URL:        u,
×
3578
                Proto:      "HTTP/1.1",
×
3579
                ProtoMajor: 1,
×
3580
                ProtoMinor: 1,
×
3581
                Header:     make(http.Header),
×
3582
                Host:       u.Host,
×
3583
        }
×
3584
        wsKey, err := wsMakeChallengeKey()
×
3585
        if err != nil {
×
3586
                return nil, WriteError, err
×
3587
        }
×
3588

3589
        req.Header["Upgrade"] = []string{"websocket"}
×
3590
        req.Header["Connection"] = []string{"Upgrade"}
×
3591
        req.Header["Sec-WebSocket-Key"] = []string{wsKey}
×
3592
        req.Header["Sec-WebSocket-Version"] = []string{"13"}
×
3593
        if compress {
×
3594
                req.Header.Add("Sec-WebSocket-Extensions", wsPMCReqHeaderValue)
×
3595
        }
×
3596
        if noMasking {
×
3597
                req.Header.Add(wsNoMaskingHeader, wsNoMaskingValue)
×
3598
        }
×
3599
        c.nc.SetDeadline(time.Now().Add(infoTimeout))
×
3600
        if err := req.Write(c.nc); err != nil {
×
3601
                return nil, WriteError, err
×
3602
        }
×
3603

3604
        var resp *http.Response
×
3605

×
3606
        br := bufio.NewReaderSize(c.nc, MAX_CONTROL_LINE_SIZE)
×
3607
        resp, err = http.ReadResponse(br, req)
×
3608
        if err == nil &&
×
3609
                (resp.StatusCode != 101 ||
×
3610
                        !strings.EqualFold(resp.Header.Get("Upgrade"), "websocket") ||
×
3611
                        !strings.EqualFold(resp.Header.Get("Connection"), "upgrade") ||
×
3612
                        resp.Header.Get("Sec-Websocket-Accept") != wsAcceptKey(wsKey)) {
×
3613

×
3614
                err = fmt.Errorf("invalid websocket connection")
×
3615
        }
×
3616
        // Check compression extension...
3617
        if err == nil && c.ws.compress {
×
3618
                // Check that not only permessage-deflate extension is present, but that
×
3619
                // we also have server and client no context take over.
×
3620
                srvCompress, noCtxTakeover := wsPMCExtensionSupport(resp.Header, false)
×
3621

×
3622
                // If server does not support compression, then simply disable it in our side.
×
3623
                if !srvCompress {
×
3624
                        c.ws.compress = false
×
3625
                } else if !noCtxTakeover {
×
3626
                        err = fmt.Errorf("compression negotiation error")
×
3627
                }
×
3628
        }
3629
        // Same for no masking...
3630
        if err == nil && noMasking {
×
3631
                // Check if server accepts no masking
×
3632
                if resp.Header.Get(wsNoMaskingHeader) != wsNoMaskingValue {
×
3633
                        // Nope, need to mask our writes as any client would do.
×
3634
                        c.ws.maskwrite = true
×
3635
                }
×
3636
        }
3637
        if resp != nil {
×
3638
                resp.Body.Close()
×
3639
        }
×
3640
        if err != nil {
×
3641
                return nil, ReadError, err
×
3642
        }
×
3643
        c.Debugf("Leafnode compression=%v masking=%v", c.ws.compress, c.ws.maskwrite)
×
3644

×
3645
        var preBuf []byte
×
3646
        // We have to slurp whatever is in the bufio reader and pass that to the readloop.
×
3647
        if n := br.Buffered(); n != 0 {
×
3648
                preBuf, _ = br.Peek(n)
×
3649
        }
×
3650
        return preBuf, 0, nil
×
3651
}
3652

3653
// connectProcessTimeout is the delay of the timer armed in
// leafNodeResumeConnectProcess: if the connectProcessFinished flag has not
// been set by the time it fires, the connection is closed as stale.
const connectProcessTimeout = 2 * time.Second
3654

3655
// This is invoked for remote LEAF remote connections after processing the INFO
3656
// protocol.
3657
func (s *Server) leafNodeResumeConnectProcess(c *client) {
367✔
3658
        clusterName := s.ClusterName()
367✔
3659

367✔
3660
        c.mu.Lock()
367✔
3661
        if c.isClosed() {
367✔
3662
                c.mu.Unlock()
×
3663
                return
×
3664
        }
×
3665
        if err := c.sendLeafConnect(clusterName, c.headers); err != nil {
367✔
3666
                c.mu.Unlock()
×
3667
                c.closeConnection(WriteError)
×
3668
                return
×
3669
        }
×
3670

3671
        // Spin up the write loop.
3672
        s.startGoRoutine(func() { c.writeLoop() })
734✔
3673

3674
        // timeout leafNodeFinishConnectProcess
3675
        c.ping.tmr = time.AfterFunc(connectProcessTimeout, func() {
367✔
3676
                c.mu.Lock()
×
3677
                // check if leafNodeFinishConnectProcess was called and prevent later leafNodeFinishConnectProcess
×
3678
                if !c.flags.setIfNotSet(connectProcessFinished) {
×
3679
                        c.mu.Unlock()
×
3680
                        return
×
3681
                }
×
3682
                clearTimer(&c.ping.tmr)
×
3683
                closed := c.isClosed()
×
3684
                c.mu.Unlock()
×
3685
                if !closed {
×
3686
                        c.sendErrAndDebug("Stale Leaf Node Connection - Closing")
×
3687
                        c.closeConnection(StaleConnection)
×
3688
                }
×
3689
        })
3690
        c.mu.Unlock()
367✔
3691
        c.Debugf("Remote leafnode connect msg sent")
367✔
3692
}
3693

3694
// leafNodeFinishConnectProcess is invoked for remote LEAF connections after
// processing the INFO protocol and after leafNodeResumeConnectProcess.
3695
// It claims the connectProcessFinished flag, cancels the connect timeout
// timer, registers the client with its account and with the server's leaf
3696
// node connections, calls initLeafNodeSmapAndSendSubs, and emits the connect
// events. If the connection is found closed afterwards, the registrations are
// undone.
3697
func (s *Server) leafNodeFinishConnectProcess(c *client) {
337✔
3698
        c.mu.Lock()
337✔
3699
        // Claim the flag. If it is already set (for instance by the
        // connectProcessTimeout timer callback), there is nothing left to do.
        if !c.flags.setIfNotSet(connectProcessFinished) {
337✔
3700
                c.mu.Unlock()
×
3701
                return
×
3702
        }
×
3703
        // Connection already closed: just make sure it is not left in the
        // server's leaf node connections.
        if c.isClosed() {
337✔
3704
                c.mu.Unlock()
×
3705
                s.removeLeafNodeConnection(c)
×
3706
                return
×
3707
        }
×
3708
        remote := c.leaf.remote
337✔
3709
        // Without a remote configuration or an account we cannot proceed; the
        // client is sent "Authorization Violation" and closed with
        // ProtocolViolation.
        if remote == nil || c.acc == nil {
337✔
3710
                c.mu.Unlock()
×
3711
                c.sendErr("Authorization Violation")
×
3712
                c.closeConnection(ProtocolViolation)
×
3713
                return
×
3714
        }
×
3715
        // Check if we will need to send the system connect event.
3716
        remote.RLock()
337✔
3717
        sendSysConnectEvent := remote.Hub
337✔
3718
        remote.RUnlock()
337✔
3719

337✔
3720
        // Capture account before releasing lock
337✔
3721
        acc := c.acc
337✔
3722
        // cancel connectProcessTimeout
337✔
3723
        clearTimer(&c.ping.tmr)
337✔
3724
        c.mu.Unlock()
337✔
3725

337✔
3726
        // Make sure we register with the account here.
337✔
3727
        if err := c.registerWithAccount(acc); err != nil {
337✔
3728
                // Two errors have dedicated handlers; anything else is logged
                // and the connection is closed with ProtocolViolation.
                if err == ErrTooManyAccountConnections {
×
3729
                        c.maxAccountConnExceeded()
×
3730
                        return
×
3731
                } else if err == ErrLeafNodeLoop {
×
3732
                        c.handleLeafNodeLoop(true)
×
3733
                        return
×
3734
                }
×
3735
                c.Errorf("Registering leaf with account %s resulted in error: %v", acc.Name, err)
×
3736
                c.closeConnection(ProtocolViolation)
×
3737
                return
×
3738
        }
3739
        if !s.addLeafNodeConnection(c, _EMPTY_, _EMPTY_, false) {
337✔
3740
                // Was not added, could be because the remote configuration has been removed.
×
3741
                c.closeConnection(ClientClosed)
×
3742
                return
×
3743
        }
×
3744
        s.initLeafNodeSmapAndSendSubs(c)
337✔
3745
        // The system connect event is only sent when the remote has Hub set.
        if sendSysConnectEvent {
343✔
3746
                s.sendLeafNodeConnect(acc)
6✔
3747
        }
6✔
3748
        s.accountConnectEvent(c)
337✔
3749

337✔
3750
        // The above functions are not running under the client lock, so it is
337✔
3751
        // possible that between the time we have started the read/write loops
337✔
3752
        // and now, that the connection was closed. This would leave the closed
337✔
3753
        // LN connection possibly registered with the account and/or the server's
337✔
3754
        // leafs map. So check if connection is closed, and if so, manually cleanup.
337✔
3755
        c.mu.Lock()
337✔
3756
        closed := c.isClosed()
337✔
3757
        // The first ping timer is only set on a connection that is still open.
        if !closed {
674✔
3758
                c.setFirstPingTimer()
337✔
3759
        }
337✔
3760
        c.mu.Unlock()
337✔
3761
        if closed {
337✔
3762
                s.removeLeafNodeConnection(c)
×
3763
                // removeClient returning 1 triggers decActiveAccounts.
                // NOTE(review): presumably 1 means this was the account's last
                // client; confirm against removeClient.
                if prev := acc.removeClient(c); prev == 1 {
×
3764
                        s.decActiveAccounts()
×
3765
                }
×
3766
        }
3767
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc