• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 24923490937

24 Apr 2026 08:34AM UTC coverage: 83.05% (+0.1%) from 82.911%
24923490937

push

github

web-flow
(2.14) [ADDED] `RemoteLeafOpts.IgnoreDiscoveredServers` option (#8067)

For a given leafnode remote, if this is set to true, this remote will
ignore any server leafnode URLs returned by the hub, allowing the user
to fully manage the servers this remote can connect to.

Resolves #8002

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>

76913 of 92610 relevant lines covered (83.05%)

481935.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.07
/server/leafnode.go
1
// Copyright 2019-2026 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bufio"
18
        "bytes"
19
        "crypto/tls"
20
        "encoding/base64"
21
        "encoding/json"
22
        "fmt"
23
        "io"
24
        "math/rand"
25
        "net"
26
        "net/http"
27
        "net/url"
28
        "os"
29
        "path"
30
        "regexp"
31
        "runtime"
32
        "strconv"
33
        "strings"
34
        "sync"
35
        "sync/atomic"
36
        "time"
37

38
        "github.com/klauspost/compress/s2"
39
        "github.com/nats-io/jwt/v2"
40
        "github.com/nats-io/nkeys"
41
        "github.com/nats-io/nuid"
42
)
43

44
// Leafnode related constants: warning text, reconnect delays applied after
// specific failures, and well-known subject/path values.
const (
45
        // Warning text used when the user configures leafnode TLS as insecure.
46
        leafnodeTLSInsecureWarning = "TLS certificate chain and hostname of solicited leafnodes will not be verified. DO NOT USE IN PRODUCTION!"
47

48
        // When a loop is detected, delay the reconnect of the solicited connection.
49
        leafNodeReconnectDelayAfterLoopDetected = 30 * time.Second
50

51
        // When a server receives a message causing a permission violation, the
52
        // connection is closed and it won't attempt to reconnect for that long.
53
        leafNodeReconnectAfterPermViolation = 30 * time.Second
54

55
        // Reconnect delay used when we have the same cluster name as the hub.
56
        leafNodeReconnectDelayAfterClusterNameSame = 30 * time.Second
57

58
        // Prefix for the loop detection subject.
59
        leafNodeLoopDetectionSubjectPrefix = "$LDS."
60

61
        // Path added to URL to indicate to WS server that the connection is a
62
        // LEAF connection as opposed to a CLIENT.
63
        leafNodeWSPath = "/leafnode"
64

65
        // When a soliciting leafnode is rejected because it does not meet the
66
        // configured minimum version, delay the next reconnect attempt by this long.
67
        leafNodeMinVersionReconnectDelay = 5 * time.Second
68
)
69

70
// leaf holds the leafnode specific state of a client connection (accessed
// as c.leaf when the client kind is LEAF).
type leaf struct {
71
        // We have any auth stuff here for solicited connections.
        // A non-nil value is what marks the connection as solicited
        // (see isSolicitedLeafNode).
72
        remote *leafNodeCfg
73
        // isSpoke tells us what role we are playing.
74
        // Used when we receive a connection but the other side tells us they are a hub.
75
        isSpoke bool
76
        // remoteCluster is when we are a hub but the spoke leafnode is part of a cluster.
77
        remoteCluster string
78
        // remoteServer holds onto the remote server's name or ID.
79
        remoteServer string
80
        // Domain name of the remote server.
81
        remoteDomain string
82
        // Account name of the remote server.
83
        remoteAccName string
84
        // Whether or not we want to propagate east-west interest from other LNs.
85
        isolated bool
86
        // Used to suppress sub and unsub interest. Same as routes but our audience
87
        // here is tied to this leaf node. This will hold all subscriptions except this
88
        // leaf nodes. This represents all the interest we want to send to the other side.
89
        smap map[string]int32
90
        // This map will contain all the subscriptions that have been added to the smap
91
        // during initLeafNodeSmapAndSendSubs. It is short lived and is there to avoid
92
        // race between processing of a sub where sub is added to account sublist but
93
        // updateSmap has not been called on that "thread", while in the LN readloop,
94
        // when processing CONNECT, initLeafNodeSmapAndSendSubs is invoked and adds
95
        // this subscription to smap. When processing of the sub then calls updateSmap,
96
        // we would add it a second time in the smap causing later unsub to suppress the LS-.
97
        tsub  map[*subscription]struct{}
98
        // NOTE(review): presumably the timer that bounds the lifetime of tsub
        // above; its use is not visible in this part of the file - confirm.
        tsubt *time.Timer
99
        // Selected compression mode, which may be different from the server configured mode.
100
        compression string
101
        // This is for GW map replies.
102
        gwSub *subscription
103
}
104

105
// Used for remote (solicited) leafnodes. Wraps the user provided
// RemoteLeafOpts with the runtime state needed to (re)connect to the remote.
// The embedded RWMutex is taken by the accessor methods of this type.
106
type leafNodeCfg struct {
107
        sync.RWMutex
108
        *RemoteLeafOpts
109
        // Working copy of the configured URLs, shuffled unless NoRandomize is
        // set. pickNextURL returns the head of this list, rotating it first
        // when the head was the previous pick.
        urls           []*url.URL
110
        // URL selected by the last call to pickNextURL.
        curURL         *url.URL
111
        // NOTE(review): tlsName, username and password are presumably filled in
        // by saveTLSHostname / saveUserPassword (not shown here) - confirm.
        tlsName        string
112
        username       string
113
        password       string
114
        // Deny permissions built from DenyExports/DenyImports by newLeafNodeCfg,
        // nil when neither list is set.
        perms          *Permissions
115
        connDelay      time.Duration // Delay before a connect, could be used while detecting loop condition, etc..
116
        jsMigrateTimer *time.Timer
117
        // Channel of capacity 1 signaled by notifyQuitChannel and drained by
        // setConnectInProgress. It interrupts the wait in reConnectToRemoteLeafNode.
        quitCh         chan struct{}
118
        // Set (once) by markAsRemoved.
        removed        bool
119
        // Maintained by setConnectInProgress.
        connInProgress bool
120
}
121

122
// Check to see if this is a solicited leafnode. We do special processing for solicited.
123
func (c *client) isSolicitedLeafNode() bool {
2,196✔
124
        return c.kind == LEAF && c.leaf.remote != nil
2,196✔
125
}
2,196✔
126

127
// Returns true if this is a solicited leafnode and is not configured to be treated as a hub or a receiving
128
// connection leafnode where the otherside has declared itself to be the hub.
129
func (c *client) isSpokeLeafNode() bool {
10,662,065✔
130
        return c.kind == LEAF && c.leaf.isSpoke
10,662,065✔
131
}
10,662,065✔
132

133
func (c *client) isHubLeafNode() bool {
18,129✔
134
        return c.kind == LEAF && !c.leaf.isSpoke
18,129✔
135
}
18,129✔
136

137
func (c *client) isIsolatedLeafNode() bool {
11,506✔
138
        // TODO(nat): In future we may want to pass in and consider an isolation
11,506✔
139
        // group name here, which the hub and/or leaf could provide, so that we
11,506✔
140
        // can isolate away certain LNs but not others on an opt-in basis. For
11,506✔
141
        // now we will just isolate all LN interest until then.
11,506✔
142
        return c.kind == LEAF && c.leaf.isolated
11,506✔
143
}
11,506✔
144

145
// This will spin up go routines to solicit the remote leaf node connections.
146
func (s *Server) solicitLeafNodeRemotes(remotes []*RemoteLeafOpts) {
1,270✔
147
        sysAccName := _EMPTY_
1,270✔
148
        sAcc := s.SystemAccount()
1,270✔
149
        if sAcc != nil {
2,517✔
150
                sysAccName = sAcc.Name
1,247✔
151
        }
1,247✔
152
        addRemote := func(r *RemoteLeafOpts, isSysAccRemote bool) *leafNodeCfg {
2,688✔
153
                s.mu.Lock()
1,418✔
154
                remote := newLeafNodeCfg(r)
1,418✔
155
                creds := remote.Credentials
1,418✔
156
                accName := remote.LocalAccount
1,418✔
157
                if s.leafRemoteCfgs == nil {
2,687✔
158
                        s.leafRemoteCfgs = make(map[*leafNodeCfg]struct{})
1,269✔
159
                }
1,269✔
160
                s.leafRemoteCfgs[remote] = struct{}{}
1,418✔
161
                // Print notice if
1,418✔
162
                if isSysAccRemote {
1,514✔
163
                        if len(remote.DenyExports) > 0 {
97✔
164
                                s.Noticef("Remote for System Account uses restricted export permissions")
1✔
165
                        }
1✔
166
                        if len(remote.DenyImports) > 0 {
97✔
167
                                s.Noticef("Remote for System Account uses restricted import permissions")
1✔
168
                        }
1✔
169
                }
170
                s.mu.Unlock()
1,418✔
171
                if creds != _EMPTY_ {
1,470✔
172
                        contents, err := os.ReadFile(creds)
52✔
173
                        defer wipeSlice(contents)
52✔
174
                        if err != nil {
52✔
175
                                s.Errorf("Error reading LeafNode Remote Credentials file %q: %v", creds, err)
×
176
                        } else if items := credsRe.FindAllSubmatch(contents, -1); len(items) < 2 {
52✔
177
                                s.Errorf("LeafNode Remote Credentials file %q malformed", creds)
×
178
                        } else if _, err := nkeys.FromSeed(items[1][1]); err != nil {
52✔
179
                                s.Errorf("LeafNode Remote Credentials file %q has malformed seed", creds)
×
180
                        } else if uc, err := jwt.DecodeUserClaims(string(items[0][1])); err != nil {
52✔
181
                                s.Errorf("LeafNode Remote Credentials file %q has malformed user jwt", creds)
×
182
                        } else if isSysAccRemote {
56✔
183
                                if !uc.Permissions.Pub.Empty() || !uc.Permissions.Sub.Empty() || uc.Permissions.Resp != nil {
5✔
184
                                        s.Noticef("LeafNode Remote for System Account uses credentials file %q with restricted permissions", creds)
1✔
185
                                }
1✔
186
                        } else {
48✔
187
                                if !uc.Permissions.Pub.Empty() || !uc.Permissions.Sub.Empty() || uc.Permissions.Resp != nil {
54✔
188
                                        s.Noticef("LeafNode Remote for Account %s uses credentials file %q with restricted permissions", accName, creds)
6✔
189
                                }
6✔
190
                        }
191
                }
192
                return remote
1,418✔
193
        }
194
        for _, r := range remotes {
2,688✔
195
                // We need to call this, even if the leaf is disabled. This is so that
1,418✔
196
                // the number of internal configuration matches the options' remote leaf
1,418✔
197
                // configuration required for configuration reload.
1,418✔
198
                remote := addRemote(r, r.LocalAccount == sysAccName)
1,418✔
199
                if !r.Disabled {
2,835✔
200
                        s.connectToRemoteLeafNodeAsynchronously(remote, true)
1,417✔
201
                }
1,417✔
202
        }
203
}
204

205
// Ensure that leafnode is properly configured.
206
func validateLeafNode(o *Options) error {
8,335✔
207
        if err := validateLeafNodeAuthOptions(o); err != nil {
8,337✔
208
                return err
2✔
209
        }
2✔
210

211
        if len(o.LeafNode.Remotes) > 0 {
9,645✔
212
                names := make(map[string]struct{})
1,312✔
213
                // Check for duplicate remotes, also, users can bind to any local account,
1,312✔
214
                // if its empty we will assume the $G account.
1,312✔
215
                for _, r := range o.LeafNode.Remotes {
2,783✔
216
                        if r.LocalAccount == _EMPTY_ {
1,916✔
217
                                r.LocalAccount = globalAccountName
445✔
218
                        }
445✔
219
                        rn := r.name()
1,471✔
220
                        if _, dup := names[rn]; dup {
1,474✔
221
                                return fmt.Errorf("duplicate remote %s", r.safeName())
3✔
222
                        }
3✔
223
                        names[rn] = struct{}{}
1,468✔
224
                }
225
        }
226

227
        // In local config mode, check that leafnode configuration refers to accounts that exist.
228
        if len(o.TrustedOperators) == 0 {
16,339✔
229
                accNames := map[string]struct{}{}
8,009✔
230
                for _, a := range o.Accounts {
16,903✔
231
                        accNames[a.Name] = struct{}{}
8,894✔
232
                }
8,894✔
233
                // global account is always created
234
                accNames[DEFAULT_GLOBAL_ACCOUNT] = struct{}{}
8,009✔
235
                // in the context of leaf nodes, empty account means global account
8,009✔
236
                accNames[_EMPTY_] = struct{}{}
8,009✔
237
                // system account either exists or, if not disabled, will be created
8,009✔
238
                if o.SystemAccount == _EMPTY_ && !o.NoSystemAccount {
14,451✔
239
                        accNames[DEFAULT_SYSTEM_ACCOUNT] = struct{}{}
6,442✔
240
                }
6,442✔
241
                checkAccountExists := func(accName string, cfgType string) error {
17,486✔
242
                        if _, ok := accNames[accName]; !ok {
9,479✔
243
                                return fmt.Errorf("cannot find local account %q specified in leafnode %s", accName, cfgType)
2✔
244
                        }
2✔
245
                        return nil
9,475✔
246
                }
247
                if err := checkAccountExists(o.LeafNode.Account, "authorization"); err != nil {
8,010✔
248
                        return err
1✔
249
                }
1✔
250
                for _, lu := range o.LeafNode.Users {
8,025✔
251
                        if lu.Account == nil { // means global account
27✔
252
                                continue
10✔
253
                        }
254
                        if err := checkAccountExists(lu.Account.Name, "authorization"); err != nil {
7✔
255
                                return err
×
256
                        }
×
257
                }
258
                for _, r := range o.LeafNode.Remotes {
9,469✔
259
                        if err := checkAccountExists(r.LocalAccount, "remote"); err != nil {
1,462✔
260
                                return err
1✔
261
                        }
1✔
262
                }
263
        } else {
321✔
264
                if len(o.LeafNode.Users) != 0 {
322✔
265
                        return fmt.Errorf("operator mode does not allow specifying users in leafnode config")
1✔
266
                }
1✔
267
                for _, r := range o.LeafNode.Remotes {
321✔
268
                        if !nkeys.IsValidPublicAccountKey(r.LocalAccount) {
2✔
269
                                return fmt.Errorf(
1✔
270
                                        "operator mode requires account nkeys in remotes. " +
1✔
271
                                                "Please add an `account` key to each remote in your `leafnodes` section, to assign it to an account. " +
1✔
272
                                                "Each account value should be a 56 character public key, starting with the letter 'A'")
1✔
273
                        }
1✔
274
                }
275
                if o.LeafNode.Port != 0 && o.LeafNode.Account != "" && !nkeys.IsValidPublicAccountKey(o.LeafNode.Account) {
320✔
276
                        return fmt.Errorf("operator mode and non account nkeys are incompatible")
1✔
277
                }
1✔
278
        }
279

280
        // Validate compression settings
281
        if o.LeafNode.Compression.Mode != _EMPTY_ {
12,932✔
282
                if err := validateAndNormalizeCompressionOption(&o.LeafNode.Compression, CompressionS2Auto); err != nil {
4,612✔
283
                        return err
5✔
284
                }
5✔
285
        }
286

287
        // If a remote has a websocket scheme, all need to have it.
288
        for _, rcfg := range o.LeafNode.Remotes {
9,780✔
289
                // Validate proxy configuration
1,460✔
290
                if _, err := validateLeafNodeProxyOptions(rcfg); err != nil {
1,466✔
291
                        return err
6✔
292
                }
6✔
293

294
                if len(rcfg.URLs) >= 2 {
1,660✔
295
                        firstIsWS, ok := isWSURL(rcfg.URLs[0]), true
206✔
296
                        for i := 1; i < len(rcfg.URLs); i++ {
649✔
297
                                u := rcfg.URLs[i]
443✔
298
                                if isWS := isWSURL(u); isWS && !firstIsWS || !isWS && firstIsWS {
450✔
299
                                        ok = false
7✔
300
                                        break
7✔
301
                                }
302
                        }
303
                        if !ok {
213✔
304
                                return fmt.Errorf("remote leaf node configuration cannot have a mix of websocket and non-websocket urls: %q", redactURLList(rcfg.URLs))
7✔
305
                        }
7✔
306
                }
307
                // Validate compression settings
308
                if rcfg.Compression.Mode != _EMPTY_ {
2,890✔
309
                        if err := validateAndNormalizeCompressionOption(&rcfg.Compression, CompressionS2Auto); err != nil {
1,448✔
310
                                return err
5✔
311
                        }
5✔
312
                }
313
        }
314

315
        if o.LeafNode.Port == 0 {
12,604✔
316
                return nil
4,302✔
317
        }
4,302✔
318

319
        // If MinVersion is defined, check that it is valid.
320
        if mv := o.LeafNode.MinVersion; mv != _EMPTY_ {
4,004✔
321
                if err := checkLeafMinVersionConfig(mv); err != nil {
6✔
322
                        return err
2✔
323
                }
2✔
324
        }
325

326
        // The checks below will be done only when detecting that we are configured
327
        // with gateways. So if an option validation needs to be done regardless,
328
        // it MUST be done before this point!
329

330
        if o.Gateway.Name == _EMPTY_ && o.Gateway.Port == 0 {
7,311✔
331
                return nil
3,313✔
332
        }
3,313✔
333
        // If we are here we have both leaf nodes and gateways defined, make sure there
334
        // is a system account defined.
335
        if o.SystemAccount == _EMPTY_ {
686✔
336
                return fmt.Errorf("leaf nodes and gateways (both being defined) require a system account to also be configured")
1✔
337
        }
1✔
338
        if err := validatePinnedCerts(o.LeafNode.TLSPinnedCerts); err != nil {
684✔
339
                return fmt.Errorf("leafnode: %v", err)
×
340
        }
×
341
        return nil
684✔
342
}
343

344
func checkLeafMinVersionConfig(mv string) error {
8✔
345
        if ok, err := versionAtLeastCheckError(mv, 2, 8, 0); !ok || err != nil {
12✔
346
                if err != nil {
6✔
347
                        return fmt.Errorf("invalid leafnode's minimum version: %v", err)
2✔
348
                } else {
4✔
349
                        return fmt.Errorf("the minimum version should be at least 2.8.0")
2✔
350
                }
2✔
351
        }
352
        return nil
4✔
353
}
354

355
// Used to validate user names in LeafNode configuration.
356
// - rejects mix of single and multiple users.
357
// - rejects duplicate user names.
358
func validateLeafNodeAuthOptions(o *Options) error {
8,397✔
359
        if len(o.LeafNode.Users) == 0 {
16,767✔
360
                return nil
8,370✔
361
        }
8,370✔
362
        if o.LeafNode.Username != _EMPTY_ {
29✔
363
                return fmt.Errorf("can not have a single user/pass and a users array")
2✔
364
        }
2✔
365
        if o.LeafNode.Nkey != _EMPTY_ {
25✔
366
                return fmt.Errorf("can not have a single nkey and a users array")
×
367
        }
×
368
        users := map[string]struct{}{}
25✔
369
        for _, u := range o.LeafNode.Users {
66✔
370
                if _, exists := users[u.Username]; exists {
43✔
371
                        return fmt.Errorf("duplicate user %q detected in leafnode authorization", u.Username)
2✔
372
                }
2✔
373
                users[u.Username] = struct{}{}
39✔
374
        }
375
        return nil
23✔
376
}
377

378
func validateLeafNodeProxyOptions(remote *RemoteLeafOpts) ([]string, error) {
2,067✔
379
        var warnings []string
2,067✔
380

2,067✔
381
        if remote.Proxy.URL == _EMPTY_ {
4,108✔
382
                return warnings, nil
2,041✔
383
        }
2,041✔
384

385
        proxyURL, err := url.Parse(remote.Proxy.URL)
26✔
386
        if err != nil {
27✔
387
                return warnings, fmt.Errorf("invalid proxy URL: %v", err)
1✔
388
        }
1✔
389

390
        if proxyURL.Scheme != "http" && proxyURL.Scheme != "https" {
27✔
391
                return warnings, fmt.Errorf("proxy URL scheme must be http or https, got: %s", proxyURL.Scheme)
2✔
392
        }
2✔
393

394
        if proxyURL.Host == _EMPTY_ {
25✔
395
                return warnings, fmt.Errorf("proxy URL must specify a host")
2✔
396
        }
2✔
397

398
        if remote.Proxy.Timeout < 0 {
22✔
399
                return warnings, fmt.Errorf("proxy timeout must be >= 0")
1✔
400
        }
1✔
401

402
        if (remote.Proxy.Username == _EMPTY_) != (remote.Proxy.Password == _EMPTY_) {
24✔
403
                return warnings, fmt.Errorf("proxy username and password must both be specified or both be empty")
4✔
404
        }
4✔
405

406
        if len(remote.URLs) > 0 {
32✔
407
                hasWebSocketURL := false
16✔
408
                hasNonWebSocketURL := false
16✔
409

16✔
410
                for _, remoteURL := range remote.URLs {
33✔
411
                        if remoteURL.Scheme == wsSchemePrefix || remoteURL.Scheme == wsSchemePrefixTLS {
30✔
412
                                hasWebSocketURL = true
13✔
413
                                if (remoteURL.Scheme == wsSchemePrefixTLS) &&
13✔
414
                                        remote.TLSConfig == nil && !remote.TLS {
14✔
415
                                        return warnings, fmt.Errorf("proxy is configured but remote URL %s requires TLS and no TLS configuration is provided. When using proxy with TLS endpoints, ensure TLS is properly configured for the leafnode remote", remoteURL.String())
1✔
416
                                }
1✔
417
                        } else {
4✔
418
                                hasNonWebSocketURL = true
4✔
419
                        }
4✔
420
                }
421

422
                if !hasWebSocketURL {
18✔
423
                        warnings = append(warnings, "proxy configuration will be ignored: proxy settings only apply to WebSocket connections (ws:// or wss://), but all configured URLs use TCP connections (nats://)")
3✔
424
                } else if hasNonWebSocketURL {
16✔
425
                        warnings = append(warnings, "proxy configuration will only be used for WebSocket URLs: proxy settings do not apply to TCP connections (nats://)")
1✔
426
                }
1✔
427
        }
428

429
        return warnings, nil
15✔
430
}
431

432
// Wait for the configured reconnect interval before attempting to connect
433
// again to the remote leafnode.
434
func (s *Server) reConnectToRemoteLeafNode(remote *leafNodeCfg) {
249✔
435
        clearInProgress := true
249✔
436
        defer func() {
497✔
437
                s.grWG.Done()
248✔
438
                if clearInProgress {
317✔
439
                        remote.setConnectInProgress(false)
69✔
440
                }
69✔
441
        }()
442
        delay := s.getOpts().LeafNode.ReconnectInterval
249✔
443
        select {
249✔
444
        case <-time.After(delay):
188✔
445
        case <-remote.quitCh:
×
446
                return
×
447
        case <-s.quitCh:
61✔
448
                return
61✔
449
        }
450
        clearInProgress = !connectToRemoteLeafNode(s, remote, false)
188✔
451
}
452

453
// Creates a leafNodeCfg object that wraps the RemoteLeafOpts.
454
func newLeafNodeCfg(remote *RemoteLeafOpts) *leafNodeCfg {
1,418✔
455
        cfg := &leafNodeCfg{
1,418✔
456
                RemoteLeafOpts: remote,
1,418✔
457
                urls:           make([]*url.URL, 0, len(remote.URLs)),
1,418✔
458
                quitCh:         make(chan struct{}, 1),
1,418✔
459
        }
1,418✔
460
        if len(remote.DenyExports) > 0 || len(remote.DenyImports) > 0 {
1,428✔
461
                perms := &Permissions{}
10✔
462
                if len(remote.DenyExports) > 0 {
19✔
463
                        perms.Publish = &SubjectPermission{Deny: remote.DenyExports}
9✔
464
                }
9✔
465
                if len(remote.DenyImports) > 0 {
18✔
466
                        perms.Subscribe = &SubjectPermission{Deny: remote.DenyImports}
8✔
467
                }
8✔
468
                cfg.perms = perms
10✔
469
        }
470
        // Start with the one that is configured. We will add to this
471
        // array when receiving async leafnode INFOs.
472
        cfg.urls = append(cfg.urls, cfg.URLs...)
1,418✔
473
        // If allowed to randomize, do it on our copy of URLs
1,418✔
474
        if !remote.NoRandomize {
2,834✔
475
                rand.Shuffle(len(cfg.urls), func(i, j int) {
1,821✔
476
                        cfg.urls[i], cfg.urls[j] = cfg.urls[j], cfg.urls[i]
405✔
477
                })
405✔
478
        }
479
        // If we are TLS make sure we save off a proper servername if possible.
480
        // Do same for user/password since we may need them to connect to
481
        // a bare URL that we get from INFO protocol.
482
        for _, u := range cfg.urls {
3,271✔
483
                cfg.saveTLSHostname(u)
1,853✔
484
                cfg.saveUserPassword(u)
1,853✔
485
                // If the url(s) have the "wss://" scheme, and we don't have a TLS
1,853✔
486
                // config, mark that we should be using TLS anyway.
1,853✔
487
                if !cfg.TLS && isWSSURL(u) {
1,854✔
488
                        cfg.TLS = true
1✔
489
                }
1✔
490
        }
491
        return cfg
1,418✔
492
}
493

494
// Notifies the quit channel without blocking.
495
// No lock is needed to invoke this function.
496
func (cfg *leafNodeCfg) notifyQuitChannel() {
2✔
497
        select {
2✔
498
        case cfg.quitCh <- struct{}{}:
2✔
499
        default:
×
500
        }
501
}
502

503
// Sets the connect-in-progress status for this remote leaf configuration.
504
func (cfg *leafNodeCfg) setConnectInProgress(inProgress bool) {
3,740✔
505
        cfg.Lock()
3,740✔
506
        defer cfg.Unlock()
3,740✔
507
        // In both cases we want to drain the "quit" channel.
3,740✔
508
        select {
3,740✔
509
        case <-cfg.quitCh:
1✔
510
        default:
3,739✔
511
        }
512
        cfg.connInProgress = inProgress
3,740✔
513
}
514

515
// Returns `true` if this remote is in the middle of a connect, `false` otherwise.
516
func (cfg *leafNodeCfg) isConnectInProgress() bool {
×
517
        cfg.RLock()
×
518
        defer cfg.RUnlock()
×
519
        return cfg.connInProgress
×
520
}
×
521

522
// Mark that this remote is being removed from the configuration.
523
func (cfg *leafNodeCfg) markAsRemoved() {
×
524
        cfg.Lock()
×
525
        defer cfg.Unlock()
×
526
        // This function should be invoked only once, but protect.
×
527
        if cfg.removed {
×
528
                return
×
529
        }
×
530
        cfg.removed = true
×
531
        cfg.notifyQuitChannel()
×
532
}
533

534
// Returns false if it has been disabled or removed.
535
func (cfg *leafNodeCfg) stillValid() bool {
7,823✔
536
        cfg.RLock()
7,823✔
537
        defer cfg.RUnlock()
7,823✔
538
        return !cfg.Disabled && !cfg.removed
7,823✔
539
}
7,823✔
540

541
// Will pick an URL from the list of available URLs.
542
func (cfg *leafNodeCfg) pickNextURL() *url.URL {
6,351✔
543
        cfg.Lock()
6,351✔
544
        defer cfg.Unlock()
6,351✔
545
        // If the current URL is the first in the list and we have more than
6,351✔
546
        // one URL, then move that one to end of the list.
6,351✔
547
        if cfg.curURL != nil && len(cfg.urls) > 1 && urlsAreEqual(cfg.curURL, cfg.urls[0]) {
9,189✔
548
                first := cfg.urls[0]
2,838✔
549
                copy(cfg.urls, cfg.urls[1:])
2,838✔
550
                cfg.urls[len(cfg.urls)-1] = first
2,838✔
551
        }
2,838✔
552
        cfg.curURL = cfg.urls[0]
6,351✔
553
        return cfg.curURL
6,351✔
554
}
555

556
// Returns the current URL
557
func (cfg *leafNodeCfg) getCurrentURL() *url.URL {
77✔
558
        cfg.RLock()
77✔
559
        defer cfg.RUnlock()
77✔
560
        return cfg.curURL
77✔
561
}
77✔
562

563
// Returns how long the server should wait before attempting
564
// to solicit a remote leafnode connection.
565
func (cfg *leafNodeCfg) getConnectDelay() time.Duration {
1,607✔
566
        cfg.RLock()
1,607✔
567
        delay := cfg.connDelay
1,607✔
568
        cfg.RUnlock()
1,607✔
569
        return delay
1,607✔
570
}
1,607✔
571

572
// Sets the connect delay.
573
func (cfg *leafNodeCfg) setConnectDelay(delay time.Duration) {
154✔
574
        cfg.Lock()
154✔
575
        cfg.connDelay = delay
154✔
576
        cfg.Unlock()
154✔
577
}
154✔
578

579
// Ensure that non-exported options (used in tests) have
580
// been properly set.
581
func (s *Server) setLeafNodeNonExportedOptions() {
7,124✔
582
        opts := s.getOpts()
7,124✔
583
        s.leafNodeOpts.dialTimeout = opts.LeafNode.dialTimeout
7,124✔
584
        if s.leafNodeOpts.dialTimeout == 0 {
14,247✔
585
                // Use same timeouts as routes for now.
7,123✔
586
                s.leafNodeOpts.dialTimeout = DEFAULT_ROUTE_DIAL
7,123✔
587
        }
7,123✔
588
        s.leafNodeOpts.resolver = opts.LeafNode.resolver
7,124✔
589
        if s.leafNodeOpts.resolver == nil {
14,245✔
590
                s.leafNodeOpts.resolver = net.DefaultResolver
7,121✔
591
        }
7,121✔
592
}
593

594
// sharedSysAccDelay is the connect delay set on a remote bound to the system
// account for its first connect when the server is not in standalone mode.
const sharedSysAccDelay = 250 * time.Millisecond
595

596
// establishHTTPProxyTunnel establishes an HTTP CONNECT tunnel through a proxy server
597
func establishHTTPProxyTunnel(proxyURL, targetHost string, timeout time.Duration, username, password string) (net.Conn, error) {
11✔
598
        proxyAddr, err := url.Parse(proxyURL)
11✔
599
        if err != nil {
11✔
600
                // This should not happen since proxy URL is validated during configuration parsing
×
601
                return nil, fmt.Errorf("unexpected proxy URL parse error (URL was pre-validated): %v", err)
×
602
        }
×
603

604
        // Connect to the proxy server
605
        conn, err := natsDialTimeout("tcp", proxyAddr.Host, timeout)
11✔
606
        if err != nil {
11✔
607
                return nil, fmt.Errorf("failed to connect to proxy: %v", err)
×
608
        }
×
609

610
        // Set deadline for the entire proxy handshake
611
        if err := conn.SetDeadline(time.Now().Add(timeout)); err != nil {
11✔
612
                conn.Close()
×
613
                return nil, fmt.Errorf("failed to set deadline: %v", err)
×
614
        }
×
615

616
        req := &http.Request{
11✔
617
                Method: http.MethodConnect,
11✔
618
                URL:    &url.URL{Opaque: targetHost}, // Opaque is required for CONNECT
11✔
619
                Host:   targetHost,
11✔
620
                Header: make(http.Header),
11✔
621
        }
11✔
622

11✔
623
        // Add proxy authentication if provided
11✔
624
        if username != "" && password != "" {
13✔
625
                req.Header.Set("Proxy-Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(username+":"+password)))
2✔
626
        }
2✔
627

628
        if err := req.Write(conn); err != nil {
11✔
629
                conn.Close()
×
630
                return nil, fmt.Errorf("failed to write CONNECT request: %v", err)
×
631
        }
×
632

633
        resp, err := http.ReadResponse(bufio.NewReader(conn), req)
11✔
634
        if err != nil {
11✔
635
                conn.Close()
×
636
                return nil, fmt.Errorf("failed to read proxy response: %v", err)
×
637
        }
×
638

639
        if resp.StatusCode != http.StatusOK {
12✔
640
                resp.Body.Close()
1✔
641
                conn.Close()
1✔
642
                return nil, fmt.Errorf("proxy CONNECT failed: %s", resp.Status)
1✔
643
        }
1✔
644

645
        // Close the response body
646
        resp.Body.Close()
10✔
647

10✔
648
        // Clear the deadline now that we've finished the proxy handshake
10✔
649
        if err := conn.SetDeadline(time.Time{}); err != nil {
10✔
650
                conn.Close()
×
651
                return nil, fmt.Errorf("failed to clear deadline: %v", err)
×
652
        }
×
653

654
        return conn, nil
10✔
655
}
656

657
// Connect to a remote leaf node asynchronously (that is, this function will do
658
// the connect in a go routine).
659
func (s *Server) connectToRemoteLeafNodeAsynchronously(remote *leafNodeCfg, firstConnect bool) {
1,419✔
660
        remote.setConnectInProgress(true)
1,419✔
661
        s.startGoRoutine(func() {
2,838✔
662
                defer s.grWG.Done()
1,419✔
663
                if !connectToRemoteLeafNode(s, remote, firstConnect) {
2,198✔
664
                        remote.setConnectInProgress(false)
779✔
665
                }
779✔
666
        })
667
}
668

669
// Connect to a remote leaf node. Should only be invoked from
670
// `s.connectToRemoteLeafNodeAsynchronously()` or `s.reConnectToRemoteLeafNode()`.
671
// Returns `true` if this function invoked `s.createLeafNode()`, false otherwise.
672
func connectToRemoteLeafNode(s *Server, remote *leafNodeCfg, firstConnect bool) bool {
1,607✔
673

1,607✔
674
        if remote == nil || len(remote.URLs) == 0 {
1,607✔
675
                s.Debugf("Empty remote leafnode definition, nothing to connect")
×
676
                return false
×
677
        }
×
678

679
        opts := s.getOpts()
1,607✔
680
        reconnectDelay := opts.LeafNode.ReconnectInterval
1,607✔
681
        s.mu.RLock()
1,607✔
682
        dialTimeout := s.leafNodeOpts.dialTimeout
1,607✔
683
        resolver := s.leafNodeOpts.resolver
1,607✔
684
        var isSysAcc bool
1,607✔
685
        if s.eventsEnabled() {
3,178✔
686
                isSysAcc = remote.LocalAccount == s.sys.account.Name
1,571✔
687
        }
1,571✔
688
        jetstreamMigrateDelay := remote.JetStreamClusterMigrateDelay
1,607✔
689
        s.mu.RUnlock()
1,607✔
690

1,607✔
691
        // If we are sharing a system account and we are not standalone delay to gather some info prior.
1,607✔
692
        if firstConnect && isSysAcc && !s.standAloneMode() {
1,678✔
693
                s.Debugf("Will delay first leafnode connect to shared system account due to clustering")
71✔
694
                remote.setConnectDelay(sharedSysAccDelay)
71✔
695
        }
71✔
696

697
        if connDelay := remote.getConnectDelay(); connDelay > 0 {
1,685✔
698
                select {
78✔
699
                case <-time.After(connDelay):
70✔
700
                case <-remote.quitCh:
×
701
                        return false
×
702
                case <-s.quitCh:
8✔
703
                        return false
8✔
704
                }
705
                remote.setConnectDelay(0)
70✔
706
        }
707

708
        var conn net.Conn
1,599✔
709

1,599✔
710
        const connErrFmt = "Error trying to connect as leafnode to remote server %q (attempt %v): %v"
1,599✔
711

1,599✔
712
        // Capture proxy configuration once before the loop with proper locking
1,599✔
713
        remote.RLock()
1,599✔
714
        proxyURL := remote.Proxy.URL
1,599✔
715
        proxyUsername := remote.Proxy.Username
1,599✔
716
        proxyPassword := remote.Proxy.Password
1,599✔
717
        proxyTimeout := remote.Proxy.Timeout
1,599✔
718
        remote.RUnlock()
1,599✔
719

1,599✔
720
        // Set default proxy timeout if not specified
1,599✔
721
        if proxyTimeout == 0 {
3,190✔
722
                proxyTimeout = dialTimeout
1,591✔
723
        }
1,591✔
724

725
        attempts := 0
1,599✔
726

1,599✔
727
        // In case the migrate timer was created but not canceled, do it when
1,599✔
728
        // this function exits. Note that the timer would not be created if
1,599✔
729
        // `jetstreamMigrateDelay == 0`.
1,599✔
730
        if jetstreamMigrateDelay > 0 {
1,607✔
731
                defer remote.cancelMigrateTimer()
8✔
732
        }
8✔
733

734
        for s.isRunning() && remote.stillValid() {
7,950✔
735
                rURL := remote.pickNextURL()
6,351✔
736
                url, err := s.getRandomIP(resolver, rURL.Host, nil)
6,351✔
737
                if err == nil {
12,695✔
738
                        var ipStr string
6,344✔
739
                        if url != rURL.Host {
6,414✔
740
                                ipStr = fmt.Sprintf(" (%s)", url)
70✔
741
                        }
70✔
742
                        // Some test may want to disable remotes from connecting
743
                        if s.isLeafConnectDisabled() {
6,474✔
744
                                s.Debugf("Will not attempt to connect to remote server on %q%s, leafnodes currently disabled", rURL.Host, ipStr)
130✔
745
                                err = ErrLeafNodeDisabled
130✔
746
                        } else {
6,344✔
747
                                s.Debugf("Trying to connect as leafnode to remote server on %q%s", rURL.Host, ipStr)
6,214✔
748

6,214✔
749
                                // Check if proxy is configured
6,214✔
750
                                if proxyURL != _EMPTY_ {
6,222✔
751
                                        targetHost := rURL.Host
8✔
752
                                        // If URL doesn't include port, add the default port for the scheme
8✔
753
                                        if rURL.Port() == _EMPTY_ {
8✔
754
                                                defaultPort := "80"
×
755
                                                if rURL.Scheme == wsSchemePrefixTLS {
×
756
                                                        defaultPort = "443"
×
757
                                                }
×
758
                                                targetHost = net.JoinHostPort(rURL.Hostname(), defaultPort)
×
759
                                        }
760

761
                                        conn, err = establishHTTPProxyTunnel(proxyURL, targetHost, proxyTimeout, proxyUsername, proxyPassword)
8✔
762
                                } else {
6,206✔
763
                                        // Direct connection
6,206✔
764
                                        conn, err = natsDialTimeout("tcp", url, dialTimeout)
6,206✔
765
                                }
6,206✔
766
                        }
767
                }
768
                if err != nil {
11,883✔
769
                        jitter := time.Duration(rand.Int63n(int64(reconnectDelay)))
5,532✔
770
                        delay := reconnectDelay + jitter
5,532✔
771
                        attempts++
5,532✔
772
                        if s.shouldReportConnectErr(firstConnect, attempts) {
9,164✔
773
                                s.Errorf(connErrFmt, rURL.Host, attempts, err)
3,632✔
774
                        } else {
5,532✔
775
                                s.Debugf(connErrFmt, rURL.Host, attempts, err)
1,900✔
776
                        }
1,900✔
777
                        remote.Lock()
5,532✔
778
                        // if we are using a delay to start migrating assets, kick off a migrate timer.
5,532✔
779
                        if remote.jsMigrateTimer == nil && jetstreamMigrateDelay > 0 {
5,540✔
780
                                remote.jsMigrateTimer = time.AfterFunc(jetstreamMigrateDelay, func() {
16✔
781
                                        s.checkJetStreamMigrate(remote)
8✔
782
                                })
8✔
783
                        }
784
                        remote.Unlock()
5,532✔
785
                        select {
5,532✔
786
                        case <-s.quitCh:
770✔
787
                                return false
770✔
788
                        case <-remote.quitCh:
1✔
789
                                return false
1✔
790
                        case <-time.After(delay):
4,760✔
791
                                // Check if we should migrate any JetStream assets immediately while this remote is down.
4,760✔
792
                                // This will be used if JetStreamClusterMigrateDelay was not set
4,760✔
793
                                if jetstreamMigrateDelay == 0 {
9,448✔
794
                                        s.checkJetStreamMigrate(remote)
4,688✔
795
                                }
4,688✔
796
                                continue
4,760✔
797
                        }
798
                }
799
                remote.cancelMigrateTimer()
819✔
800
                // We can check here, but really we will have to check again when the server
819✔
801
                // is about to add to the `s.leafs` map later in the process.
819✔
802
                if !remote.stillValid() {
819✔
803
                        conn.Close()
×
804
                        return false
×
805
                }
×
806

807
                // We have a connection here to a remote server.
808
                // Go ahead and create our leaf node and return.
809
                s.createLeafNode(conn, rURL, remote, nil)
819✔
810

819✔
811
                // Clear any observer states if we had them.
819✔
812
                s.clearObserverState(remote)
819✔
813

819✔
814
                return true
819✔
815
        }
816

817
        return false
8✔
818
}
819

820
func (cfg *leafNodeCfg) cancelMigrateTimer() {
827✔
821
        cfg.Lock()
827✔
822
        stopAndClearTimer(&cfg.jsMigrateTimer)
827✔
823
        cfg.Unlock()
827✔
824
}
827✔
825

826
// This will clear any observer state such that stream or consumer assets on this server can become leaders again.
827
func (s *Server) clearObserverState(remote *leafNodeCfg) {
819✔
828
        s.mu.RLock()
819✔
829
        accName := remote.LocalAccount
819✔
830
        s.mu.RUnlock()
819✔
831

819✔
832
        acc, err := s.LookupAccount(accName)
819✔
833
        if err != nil {
821✔
834
                s.Warnf("Error looking up account [%s] checking for JetStream clear observer state on a leafnode", accName)
2✔
835
                return
2✔
836
        }
2✔
837

838
        acc.jscmMu.Lock()
817✔
839
        defer acc.jscmMu.Unlock()
817✔
840

817✔
841
        // Walk all streams looking for any clustered stream, skip otherwise.
817✔
842
        for _, mset := range acc.streams() {
835✔
843
                node := mset.raftNode()
18✔
844
                if node == nil {
28✔
845
                        // Not R>1
10✔
846
                        continue
10✔
847
                }
848
                // Check consumers
849
                for _, o := range mset.getConsumers() {
10✔
850
                        if n := o.raftNode(); n != nil {
4✔
851
                                // Ensure we can become a leader again.
2✔
852
                                n.SetObserver(false)
2✔
853
                        }
2✔
854
                }
855
                // Ensure we can not become a leader again.
856
                node.SetObserver(false)
8✔
857
        }
858
}
859

860
// Check to see if we should migrate any assets from this account.
861
func (s *Server) checkJetStreamMigrate(remote *leafNodeCfg) {
4,696✔
862
        s.mu.RLock()
4,696✔
863
        accName, shouldMigrate := remote.LocalAccount, remote.JetStreamClusterMigrate
4,696✔
864
        s.mu.RUnlock()
4,696✔
865

4,696✔
866
        if !shouldMigrate {
9,326✔
867
                return
4,630✔
868
        }
4,630✔
869

870
        acc, err := s.LookupAccount(accName)
66✔
871
        if err != nil {
66✔
872
                s.Warnf("Error looking up account [%s] checking for JetStream migration on a leafnode", accName)
×
873
                return
×
874
        }
×
875

876
        acc.jscmMu.Lock()
66✔
877
        defer acc.jscmMu.Unlock()
66✔
878

66✔
879
        // Walk all streams looking for any clustered stream, skip otherwise.
66✔
880
        // If we are the leader force stepdown.
66✔
881
        for _, mset := range acc.streams() {
99✔
882
                node := mset.raftNode()
33✔
883
                if node == nil {
33✔
884
                        // Not R>1
×
885
                        continue
×
886
                }
887
                // Collect any consumers
888
                for _, o := range mset.getConsumers() {
54✔
889
                        if n := o.raftNode(); n != nil {
42✔
890
                                n.StepDown()
21✔
891
                                // Ensure we can not become a leader while in this state.
21✔
892
                                n.SetObserver(true)
21✔
893
                        }
21✔
894
                }
895
                // Stepdown if this stream was leader.
896
                node.StepDown()
33✔
897
                // Ensure we can not become a leader while in this state.
33✔
898
                node.SetObserver(true)
33✔
899
        }
900
}
901

902
// Helper for checking.
903
func (s *Server) isLeafConnectDisabled() bool {
6,344✔
904
        s.mu.RLock()
6,344✔
905
        defer s.mu.RUnlock()
6,344✔
906
        return s.leafDisableConnect
6,344✔
907
}
6,344✔
908

909
// Save off the tlsName for when we use TLS and mix hostnames and IPs. IPs usually
910
// come from the server we connect to.
911
//
912
// We used to save the name only if there was a TLSConfig or scheme equal to "tls".
913
// However, this was causing failures for users that did not set the scheme (and
914
// their remote connections did not have a tls{} block).
915
// We now save the host name regardless in case the remote returns an INFO indicating
916
// that TLS is required.
917
//
918
// Lock held on entry.
919
func (cfg *leafNodeCfg) saveTLSHostname(u *url.URL) {
2,510✔
920
        if cfg.tlsName == _EMPTY_ && net.ParseIP(u.Hostname()) == nil {
2,529✔
921
                cfg.tlsName = u.Hostname()
19✔
922
        }
19✔
923
}
924

925
// Save off the username/password for when we connect using a bare URL
926
// that we get from the INFO protocol.
927
//
928
// Lock held on entry.
929
func (cfg *leafNodeCfg) saveUserPassword(u *url.URL) {
1,853✔
930
        if cfg.username == _EMPTY_ && u.User != nil {
2,171✔
931
                cfg.username = u.User.Username()
318✔
932
                cfg.password, _ = u.User.Password()
318✔
933
        }
318✔
934
}
935

936
// This starts the leafnode accept loop in a go routine, unless it
937
// is detected that the server has already been shutdown.
938
func (s *Server) startLeafNodeAcceptLoop() {
3,978✔
939
        // Snapshot server options.
3,978✔
940
        opts := s.getOpts()
3,978✔
941

3,978✔
942
        port := opts.LeafNode.Port
3,978✔
943
        if port == -1 {
7,780✔
944
                port = 0
3,802✔
945
        }
3,802✔
946

947
        if s.isShuttingDown() {
3,978✔
948
                return
×
949
        }
×
950

951
        s.mu.Lock()
3,978✔
952
        hp := net.JoinHostPort(opts.LeafNode.Host, strconv.Itoa(port))
3,978✔
953
        l, e := natsListen("tcp", hp)
3,978✔
954
        s.leafNodeListenerErr = e
3,978✔
955
        if e != nil {
3,978✔
956
                s.mu.Unlock()
×
957
                s.Fatalf("Error listening on leafnode port: %d - %v", opts.LeafNode.Port, e)
×
958
                return
×
959
        }
×
960

961
        s.Noticef("Listening for leafnode connections on %s",
3,978✔
962
                net.JoinHostPort(opts.LeafNode.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
3,978✔
963

3,978✔
964
        tlsRequired := opts.LeafNode.TLSConfig != nil
3,978✔
965
        tlsVerify := tlsRequired && opts.LeafNode.TLSConfig.ClientAuth == tls.RequireAndVerifyClientCert
3,978✔
966
        // Do not set compression in this Info object, it would possibly cause
3,978✔
967
        // issues when sending asynchronous INFO to the remote.
3,978✔
968
        info := Info{
3,978✔
969
                ID:            s.info.ID,
3,978✔
970
                Name:          s.info.Name,
3,978✔
971
                Version:       s.info.Version,
3,978✔
972
                GitCommit:     gitCommit,
3,978✔
973
                GoVersion:     runtime.Version(),
3,978✔
974
                AuthRequired:  true,
3,978✔
975
                TLSRequired:   tlsRequired,
3,978✔
976
                TLSVerify:     tlsVerify,
3,978✔
977
                MaxPayload:    s.info.MaxPayload, // TODO(dlc) - Allow override?
3,978✔
978
                Headers:       s.supportsHeaders(),
3,978✔
979
                JetStream:     opts.JetStream,
3,978✔
980
                Domain:        opts.JetStreamDomain,
3,978✔
981
                Proto:         s.getServerProto(),
3,978✔
982
                InfoOnConnect: true,
3,978✔
983
                JSApiLevel:    JSApiLevel,
3,978✔
984
        }
3,978✔
985
        // If we have selected a random port...
3,978✔
986
        if port == 0 {
7,780✔
987
                // Write resolved port back to options.
3,802✔
988
                opts.LeafNode.Port = l.Addr().(*net.TCPAddr).Port
3,802✔
989
        }
3,802✔
990

991
        s.leafNodeInfo = info
3,978✔
992
        // Possibly override Host/Port and set IP based on Cluster.Advertise
3,978✔
993
        if err := s.setLeafNodeInfoHostPortAndIP(); err != nil {
3,978✔
994
                s.Fatalf("Error setting leafnode INFO with LeafNode.Advertise value of %s, err=%v", opts.LeafNode.Advertise, err)
×
995
                l.Close()
×
996
                s.mu.Unlock()
×
997
                return
×
998
        }
×
999
        s.leafURLsMap[s.leafNodeInfo.IP]++
3,978✔
1000
        s.generateLeafNodeInfoJSON()
3,978✔
1001

3,978✔
1002
        // Setup state that can enable shutdown
3,978✔
1003
        s.leafNodeListener = l
3,978✔
1004

3,978✔
1005
        // As of now, a server that does not have remotes configured would
3,978✔
1006
        // never solicit a connection, so we should not have to warn if
3,978✔
1007
        // InsecureSkipVerify is set in main LeafNodes config (since
3,978✔
1008
        // this TLS setting matters only when soliciting a connection).
3,978✔
1009
        // Still, warn if insecure is set in any of LeafNode block.
3,978✔
1010
        // We need to check remotes, even if tls is not required on accept.
3,978✔
1011
        warn := tlsRequired && opts.LeafNode.TLSConfig.InsecureSkipVerify
3,978✔
1012
        if !warn {
7,954✔
1013
                for _, r := range opts.LeafNode.Remotes {
4,169✔
1014
                        if r.TLSConfig != nil && r.TLSConfig.InsecureSkipVerify {
193✔
1015
                                warn = true
×
1016
                                break
×
1017
                        }
1018
                }
1019
        }
1020
        if warn {
3,980✔
1021
                s.Warnf(leafnodeTLSInsecureWarning)
2✔
1022
        }
2✔
1023
        go s.acceptConnections(l, "Leafnode", func(conn net.Conn) { s.createLeafNode(conn, nil, nil, nil) }, nil)
4,848✔
1024
        s.mu.Unlock()
3,978✔
1025
}
1026

1027
// RegEx to match a creds file with user JWT and Seed.
1028
// Each match is a section made of a line delimited by runs of at least three
// dashes, followed by a payload line, followed by another dash-delimited line
// that ends with a newline or the end of the text. Submatch 1 holds the
// payload, which is limited to word characters, '-', '.' and '='.
var credsRe = regexp.MustCompile(`\s*(?:(?:[-]{3,}.*[-]{3,}\r?\n)([\w\-.=]+)(?:\r?\n[-]{3,}.*[-]{3,}(\r?\n|\z)))`)
1029

1030
// clusterName is provided as argument to avoid lock ordering issues with the locked client c
1031
// Lock should be held entering here.
1032
func (c *client) sendLeafConnect(clusterName string, headers bool) error {
693✔
1033
        // We support basic user/pass and operator based user JWT with signatures.
693✔
1034
        cinfo := leafConnectInfo{
693✔
1035
                Version:       VERSION,
693✔
1036
                ID:            c.srv.info.ID,
693✔
1037
                Domain:        c.srv.info.Domain,
693✔
1038
                Name:          c.srv.info.Name,
693✔
1039
                Hub:           c.leaf.remote.Hub,
693✔
1040
                Cluster:       clusterName,
693✔
1041
                Headers:       headers,
693✔
1042
                JetStream:     c.acc.jetStreamConfigured(),
693✔
1043
                DenyPub:       c.leaf.remote.DenyImports,
693✔
1044
                Compression:   c.leaf.compression,
693✔
1045
                RemoteAccount: c.acc.GetName(),
693✔
1046
                Proto:         c.srv.getServerProto(),
693✔
1047
                Isolate:       c.leaf.remote.RequestIsolation,
693✔
1048
        }
693✔
1049

693✔
1050
        // If a signature callback is specified, this takes precedence over anything else.
693✔
1051
        if cb := c.leaf.remote.SignatureCB; cb != nil {
698✔
1052
                nonce := c.nonce
5✔
1053
                c.mu.Unlock()
5✔
1054
                jwt, sigraw, err := cb(nonce)
5✔
1055
                c.mu.Lock()
5✔
1056
                if err == nil && c.isClosed() {
6✔
1057
                        err = ErrConnectionClosed
1✔
1058
                }
1✔
1059
                if err != nil {
7✔
1060
                        c.Errorf("Error signing the nonce: %v", err)
2✔
1061
                        return err
2✔
1062
                }
2✔
1063
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
3✔
1064
                cinfo.JWT, cinfo.Sig = jwt, sig
3✔
1065

1066
        } else if creds := c.leaf.remote.Credentials; creds != _EMPTY_ {
744✔
1067
                // Check for credentials first, that will take precedence..
56✔
1068
                c.Debugf("Authenticating with credentials file %q", c.leaf.remote.Credentials)
56✔
1069
                contents, err := os.ReadFile(creds)
56✔
1070
                if err != nil {
56✔
1071
                        c.Errorf("%v", err)
×
1072
                        return err
×
1073
                }
×
1074
                defer wipeSlice(contents)
56✔
1075
                items := credsRe.FindAllSubmatch(contents, -1)
56✔
1076
                if len(items) < 2 {
56✔
1077
                        c.Errorf("Credentials file malformed")
×
1078
                        return err
×
1079
                }
×
1080
                // First result should be the user JWT.
1081
                // We copy here so that the file containing the seed will be wiped appropriately.
1082
                raw := items[0][1]
56✔
1083
                tmp := make([]byte, len(raw))
56✔
1084
                copy(tmp, raw)
56✔
1085
                // Seed is second item.
56✔
1086
                kp, err := nkeys.FromSeed(items[1][1])
56✔
1087
                if err != nil {
56✔
1088
                        c.Errorf("Credentials file has malformed seed")
×
1089
                        return err
×
1090
                }
×
1091
                // Wipe our key on exit.
1092
                defer kp.Wipe()
56✔
1093

56✔
1094
                sigraw, _ := kp.Sign(c.nonce)
56✔
1095
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
56✔
1096
                cinfo.JWT = bytesToString(tmp)
56✔
1097
                cinfo.Sig = sig
56✔
1098
        } else if nkey := c.leaf.remote.Nkey; nkey != _EMPTY_ {
637✔
1099
                kp, err := nkeys.FromSeed([]byte(nkey))
5✔
1100
                if err != nil {
5✔
1101
                        c.Errorf("Remote nkey has malformed seed")
×
1102
                        return err
×
1103
                }
×
1104
                // Wipe our key on exit.
1105
                defer kp.Wipe()
5✔
1106
                sigraw, _ := kp.Sign(c.nonce)
5✔
1107
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
5✔
1108
                pkey, _ := kp.PublicKey()
5✔
1109
                cinfo.Nkey = pkey
5✔
1110
                cinfo.Sig = sig
5✔
1111
        }
1112
        // In addition, and this is to allow auth callout, set user/password or
1113
        // token if applicable.
1114
        if userInfo := c.leaf.remote.curURL.User; userInfo != nil {
1,033✔
1115
                cinfo.User = userInfo.Username()
342✔
1116
                var ok bool
342✔
1117
                cinfo.Pass, ok = userInfo.Password()
342✔
1118
                // For backward compatibility, if only username is provided, set both
342✔
1119
                // Token and User, not just Token.
342✔
1120
                if !ok {
351✔
1121
                        cinfo.Token = cinfo.User
9✔
1122
                }
9✔
1123
        } else if c.leaf.remote.username != _EMPTY_ {
356✔
1124
                cinfo.User = c.leaf.remote.username
7✔
1125
                cinfo.Pass = c.leaf.remote.password
7✔
1126
                // For backward compatibility, if only username is provided, set both
7✔
1127
                // Token and User, not just Token.
7✔
1128
                if cinfo.Pass == _EMPTY_ {
7✔
1129
                        cinfo.Token = cinfo.User
×
1130
                }
×
1131
        }
1132
        b, err := json.Marshal(cinfo)
691✔
1133
        if err != nil {
691✔
1134
                c.Errorf("Error marshaling CONNECT to remote leafnode: %v\n", err)
×
1135
                return err
×
1136
        }
×
1137
        // Although this call is made before the writeLoop is created,
1138
        // we don't really need to send in place. The protocol will be
1139
        // sent out by the writeLoop.
1140
        c.enqueueProto([]byte(fmt.Sprintf(ConProto, b)))
691✔
1141
        return nil
691✔
1142
}
1143

1144
// Makes a deep copy of the LeafNode Info structure.
1145
// The server lock is held on entry.
1146
func (s *Server) copyLeafNodeInfo() *Info {
2,784✔
1147
        clone := s.leafNodeInfo
2,784✔
1148
        // Copy the array of urls.
2,784✔
1149
        if len(s.leafNodeInfo.LeafNodeURLs) > 0 {
5,064✔
1150
                clone.LeafNodeURLs = append([]string(nil), s.leafNodeInfo.LeafNodeURLs...)
2,280✔
1151
        }
2,280✔
1152
        return &clone
2,784✔
1153
}
1154

1155
// Adds a LeafNode URL that we get when a route connects to the Info structure.
1156
// Regenerates the JSON byte array so that it can be sent to LeafNode connections.
1157
// Returns a boolean indicating if the URL was added or not.
1158
// Server lock is held on entry
1159
func (s *Server) addLeafNodeURL(urlStr string) bool {
7,896✔
1160
        if s.leafURLsMap.addUrl(urlStr) {
15,787✔
1161
                s.generateLeafNodeInfoJSON()
7,891✔
1162
                return true
7,891✔
1163
        }
7,891✔
1164
        return false
5✔
1165
}
1166

1167
// Removes a LeafNode URL of the route that is disconnecting from the Info structure.
1168
// Regenerates the JSON byte array so that it can be sent to LeafNode connections.
1169
// Returns a boolean indicating if the URL was removed or not.
1170
// Server lock is held on entry.
1171
func (s *Server) removeLeafNodeURL(urlStr string) bool {
7,890✔
1172
        // Don't need to do this if we are removing the route connection because
7,890✔
1173
        // we are shuting down...
7,890✔
1174
        if s.isShuttingDown() {
12,088✔
1175
                return false
4,198✔
1176
        }
4,198✔
1177
        if s.leafURLsMap.removeUrl(urlStr) {
7,380✔
1178
                s.generateLeafNodeInfoJSON()
3,688✔
1179
                return true
3,688✔
1180
        }
3,688✔
1181
        return false
4✔
1182
}
1183

1184
// Server lock is held on entry
1185
func (s *Server) generateLeafNodeInfoJSON() {
15,557✔
1186
        s.leafNodeInfo.Cluster = s.cachedClusterName()
15,557✔
1187
        s.leafNodeInfo.LeafNodeURLs = s.leafURLsMap.getAsStringSlice()
15,557✔
1188
        s.leafNodeInfo.WSConnectURLs = s.websocket.connectURLsMap.getAsStringSlice()
15,557✔
1189
        s.leafNodeInfoJSON = generateInfoJSON(&s.leafNodeInfo)
15,557✔
1190
}
15,557✔
1191

1192
// Sends an async INFO protocol so that the connected servers can update
1193
// their list of LeafNode urls.
1194
func (s *Server) sendAsyncLeafNodeInfo() {
11,579✔
1195
        for _, c := range s.leafs {
11,680✔
1196
                c.mu.Lock()
101✔
1197
                c.enqueueProto(s.leafNodeInfoJSON)
101✔
1198
                c.mu.Unlock()
101✔
1199
        }
101✔
1200
}
1201

1202
// Called when an inbound leafnode connection is accepted or we create one for a solicited leafnode.
1203
func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCfg, ws *websocket) *client {
1,718✔
1204
        // Snapshot server options.
1,718✔
1205
        opts := s.getOpts()
1,718✔
1206

1,718✔
1207
        maxPay := int32(opts.MaxPayload)
1,718✔
1208
        maxSubs := int32(opts.MaxSubs)
1,718✔
1209
        // For system, maxSubs of 0 means unlimited, so re-adjust here.
1,718✔
1210
        if maxSubs == 0 {
3,435✔
1211
                maxSubs = -1
1,717✔
1212
        }
1,717✔
1213
        now := time.Now().UTC()
1,718✔
1214

1,718✔
1215
        c := &client{srv: s, nc: conn, kind: LEAF, opts: defaultOpts, mpay: maxPay, msubs: maxSubs, start: now, last: now}
1,718✔
1216
        // Do not update the smap here, we need to do it in initLeafNodeSmapAndSendSubs
1,718✔
1217
        c.leaf = &leaf{}
1,718✔
1218

1,718✔
1219
        // If the leafnode subject interest should be isolated, flag it here.
1,718✔
1220
        s.optsMu.RLock()
1,718✔
1221
        if c.leaf.isolated = s.opts.LeafNode.IsolateLeafnodeInterest; !c.leaf.isolated && remote != nil {
2,535✔
1222
                c.leaf.isolated = remote.LocalIsolation
817✔
1223
        }
817✔
1224
        s.optsMu.RUnlock()
1,718✔
1225

1,718✔
1226
        // For accepted LN connections, ws will be != nil if it was accepted
1,718✔
1227
        // through the Websocket port.
1,718✔
1228
        c.ws = ws
1,718✔
1229

1,718✔
1230
        // For remote, check if the scheme starts with "ws", if so, we will initiate
1,718✔
1231
        // a remote Leaf Node connection as a websocket connection.
1,718✔
1232
        if remote != nil && rURL != nil && isWSURL(rURL) {
1,768✔
1233
                remote.RLock()
50✔
1234
                c.ws = &websocket{compress: remote.Websocket.Compression, maskwrite: !remote.Websocket.NoMasking}
50✔
1235
                remote.RUnlock()
50✔
1236
        }
50✔
1237

1238
        // Determines if we are soliciting the connection or not.
1239
        var solicited bool
1,718✔
1240
        var acc *Account
1,718✔
1241
        var remoteSuffix string
1,718✔
1242
        if remote != nil {
2,537✔
1243
                // For now, if lookup fails, we will constantly try
819✔
1244
                // to recreate this LN connection.
819✔
1245
                lacc := remote.LocalAccount
819✔
1246
                var err error
819✔
1247
                acc, err = s.LookupAccount(lacc)
819✔
1248
                if err != nil {
821✔
1249
                        // An account not existing is something that can happen with nats/http account resolver and the account
2✔
1250
                        // has not yet been pushed, or the request failed for other reasons.
2✔
1251
                        // remote needs to be set or retry won't happen
2✔
1252
                        c.leaf.remote = remote
2✔
1253
                        c.closeConnection(MissingAccount)
2✔
1254
                        s.Errorf("Unable to lookup account %s for solicited leafnode connection: %v", lacc, err)
2✔
1255
                        return nil
2✔
1256
                }
2✔
1257
                remoteSuffix = fmt.Sprintf(" for account: %s", acc.traceLabel())
817✔
1258
        }
1259

1260
        c.mu.Lock()
1,716✔
1261
        c.initClient()
1,716✔
1262
        c.Noticef("Leafnode connection created%s %s", remoteSuffix, c.opts.Name)
1,716✔
1263

1,716✔
1264
        var (
1,716✔
1265
                tlsFirst         bool
1,716✔
1266
                tlsFirstFallback time.Duration
1,716✔
1267
                infoTimeout      time.Duration
1,716✔
1268
        )
1,716✔
1269
        if remote != nil {
2,533✔
1270
                solicited = true
817✔
1271
                remote.Lock()
817✔
1272
                c.leaf.remote = remote
817✔
1273
                c.setPermissions(remote.perms)
817✔
1274
                if !c.leaf.remote.Hub {
1,616✔
1275
                        c.leaf.isSpoke = true
799✔
1276
                }
799✔
1277
                tlsFirst = remote.TLSHandshakeFirst
817✔
1278
                infoTimeout = remote.FirstInfoTimeout
817✔
1279
                remote.Unlock()
817✔
1280
                c.acc = acc
817✔
1281
        } else {
899✔
1282
                c.flags.set(expectConnect)
899✔
1283
                if ws != nil {
928✔
1284
                        c.Debugf("Leafnode compression=%v", c.ws.compress)
29✔
1285
                }
29✔
1286
                tlsFirst = opts.LeafNode.TLSHandshakeFirst
899✔
1287
                if f := opts.LeafNode.TLSHandshakeFirstFallback; f > 0 {
900✔
1288
                        tlsFirstFallback = f
1✔
1289
                }
1✔
1290
        }
1291
        c.mu.Unlock()
1,716✔
1292

1,716✔
1293
        var nonce [nonceLen]byte
1,716✔
1294
        var info *Info
1,716✔
1295

1,716✔
1296
        // Grab this before the client lock below.
1,716✔
1297
        if !solicited {
2,615✔
1298
                // Grab server variables
899✔
1299
                s.mu.Lock()
899✔
1300
                info = s.copyLeafNodeInfo()
899✔
1301
                // For tests that want to simulate old servers, do not set the compression
899✔
1302
                // on the INFO protocol if configured with CompressionNotSupported.
899✔
1303
                if cm := opts.LeafNode.Compression.Mode; cm != CompressionNotSupported {
1,797✔
1304
                        info.Compression = cm
898✔
1305
                }
898✔
1306
                // We always send a nonce for LEAF connections. Do not change that without
1307
                // taking into account presence of proxy trusted keys.
1308
                s.generateNonce(nonce[:])
899✔
1309
                s.mu.Unlock()
899✔
1310
        }
1311

1312
        // Grab lock
1313
        c.mu.Lock()
1,716✔
1314

1,716✔
1315
        var preBuf []byte
1,716✔
1316
        if solicited {
2,533✔
1317
                // For websocket connection, we need to send an HTTP request,
817✔
1318
                // and get the response before starting the readLoop to get
817✔
1319
                // the INFO, etc..
817✔
1320
                if c.isWebsocket() {
867✔
1321
                        var err error
50✔
1322
                        var closeReason ClosedState
50✔
1323

50✔
1324
                        preBuf, closeReason, err = c.leafNodeSolicitWSConnection(opts, rURL, remote)
50✔
1325
                        if err != nil {
71✔
1326
                                c.Errorf("Error soliciting websocket connection: %v", err)
21✔
1327
                                c.mu.Unlock()
21✔
1328
                                if closeReason != 0 {
38✔
1329
                                        c.closeConnection(closeReason)
17✔
1330
                                }
17✔
1331
                                return nil
21✔
1332
                        }
1333
                } else {
767✔
1334
                        // If configured to do TLS handshake first
767✔
1335
                        if tlsFirst {
771✔
1336
                                if _, err := c.leafClientHandshakeIfNeeded(remote, opts); err != nil {
5✔
1337
                                        c.mu.Unlock()
1✔
1338
                                        return nil
1✔
1339
                                }
1✔
1340
                        }
1341
                        // We need to wait for the info, but not for too long.
1342
                        c.nc.SetReadDeadline(time.Now().Add(infoTimeout))
766✔
1343
                }
1344

1345
                // We will process the INFO from the readloop and finish by
1346
                // sending the CONNECT and finish registration later.
1347
        } else {
899✔
1348
                // Send our info to the other side.
899✔
1349
                // Remember the nonce we sent here for signatures, etc.
899✔
1350
                c.nonce = make([]byte, nonceLen)
899✔
1351
                copy(c.nonce, nonce[:])
899✔
1352
                info.Nonce = bytesToString(c.nonce)
899✔
1353
                info.CID = c.cid
899✔
1354
                proto := generateInfoJSON(info)
899✔
1355

899✔
1356
                var pre []byte
899✔
1357
                // We need first to check for "TLS First" fallback delay.
899✔
1358
                if tlsFirstFallback > 0 {
900✔
1359
                        // We wait and see if we are getting any data. Since we did not send
1✔
1360
                        // the INFO protocol yet, only clients that use TLS first should be
1✔
1361
                        // sending data (the TLS handshake). We don't really check the content:
1✔
1362
                        // if it is a rogue agent and not an actual client performing the
1✔
1363
                        // TLS handshake, the error will be detected when performing the
1✔
1364
                        // handshake on our side.
1✔
1365
                        pre = make([]byte, 4)
1✔
1366
                        c.nc.SetReadDeadline(time.Now().Add(tlsFirstFallback))
1✔
1367
                        n, _ := io.ReadFull(c.nc, pre[:])
1✔
1368
                        c.nc.SetReadDeadline(time.Time{})
1✔
1369
                        // If we get any data (regardless of possible timeout), we will proceed
1✔
1370
                        // with the TLS handshake.
1✔
1371
                        if n > 0 {
1✔
1372
                                pre = pre[:n]
×
1373
                        } else {
1✔
1374
                                // We did not get anything so we will send the INFO protocol.
1✔
1375
                                pre = nil
1✔
1376
                                // Set the boolean to false for the rest of the function.
1✔
1377
                                tlsFirst = false
1✔
1378
                        }
1✔
1379
                }
1380

1381
                if !tlsFirst {
1,793✔
1382
                        // We have to send from this go routine because we may
894✔
1383
                        // have to block for TLS handshake before we start our
894✔
1384
                        // writeLoop go routine. The other side needs to receive
894✔
1385
                        // this before it can initiate the TLS handshake..
894✔
1386
                        c.sendProtoNow(proto)
894✔
1387

894✔
1388
                        // The above call could have marked the connection as closed (due to TCP error).
894✔
1389
                        if c.isClosed() {
894✔
1390
                                c.mu.Unlock()
×
1391
                                c.closeConnection(WriteError)
×
1392
                                return nil
×
1393
                        }
×
1394
                }
1395

1396
                // Check to see if we need to spin up TLS.
1397
                if !c.isWebsocket() && info.TLSRequired {
974✔
1398
                        // If we have a prebuffer create a multi-reader.
75✔
1399
                        if len(pre) > 0 {
75✔
1400
                                c.nc = &tlsMixConn{c.nc, bytes.NewBuffer(pre)}
×
1401
                        }
×
1402
                        // Perform server-side TLS handshake.
1403
                        if err := c.doTLSServerHandshake(tlsHandshakeLeaf, opts.LeafNode.TLSConfig, opts.LeafNode.TLSTimeout, opts.LeafNode.TLSPinnedCerts); err != nil {
124✔
1404
                                c.mu.Unlock()
49✔
1405
                                return nil
49✔
1406
                        }
49✔
1407
                }
1408

1409
                // If the user wants the TLS handshake to occur first, now that it is
1410
                // done, send the INFO protocol.
1411
                if tlsFirst {
853✔
1412
                        c.flags.set(didTLSFirst)
3✔
1413
                        c.sendProtoNow(proto)
3✔
1414
                        if c.isClosed() {
3✔
1415
                                c.mu.Unlock()
×
1416
                                c.closeConnection(WriteError)
×
1417
                                return nil
×
1418
                        }
×
1419
                }
1420

1421
                // Leaf nodes will always require a CONNECT to let us know
1422
                // when we are properly bound to an account.
1423
                //
1424
                // If compression is configured, we can't set the authTimer here because
1425
                // it would cause the parser to fail any incoming protocol that is not a
1426
                // CONNECT (and we need to exchange INFO protocols for compression
1427
                // negotiation). So instead, use the ping timer until we are done with
1428
                // negotiation and can set the auth timer.
1429
                timeout := secondsToDuration(opts.LeafNode.AuthTimeout)
850✔
1430
                if needsCompression(opts.LeafNode.Compression.Mode) {
1,471✔
1431
                        c.ping.tmr = time.AfterFunc(timeout, func() {
630✔
1432
                                c.authTimeout()
9✔
1433
                        })
9✔
1434
                } else {
229✔
1435
                        c.setAuthTimer(timeout)
229✔
1436
                }
229✔
1437
        }
1438

1439
        // Keep track in case server is shutdown before we can successfully register.
1440
        if !s.addToTempClients(c.cid, c) {
1,646✔
1441
                c.mu.Unlock()
1✔
1442
                c.setNoReconnect()
1✔
1443
                c.closeConnection(ServerShutdown)
1✔
1444
                return nil
1✔
1445
        }
1✔
1446

1447
        // Spin up the read loop.
1448
        s.startGoRoutine(func() { c.readLoop(preBuf) })
3,288✔
1449

1450
        // We will spin the write loop for solicited connections only
1451
        // when processing the INFO and after switching to TLS if needed.
1452
        if !solicited {
2,494✔
1453
                s.startGoRoutine(func() { c.writeLoop() })
1,700✔
1454
        }
1455

1456
        c.mu.Unlock()
1,644✔
1457

1,644✔
1458
        return c
1,644✔
1459
}
1460

1461
// Will perform the client-side TLS handshake if needed. Assumes that this
1462
// is called by the solicit side (remote will be non nil). Returns `true`
1463
// if TLS is required, `false` otherwise.
1464
// Lock held on entry.
1465
func (c *client) leafClientHandshakeIfNeeded(remote *leafNodeCfg, opts *Options) (bool, error) {
1,968✔
1466
        // Check if TLS is required and gather TLS config variables.
1,968✔
1467
        tlsRequired, tlsConfig, tlsName, tlsTimeout := c.leafNodeGetTLSConfigForSolicit(remote)
1,968✔
1468
        if !tlsRequired {
3,859✔
1469
                return false, nil
1,891✔
1470
        }
1,891✔
1471

1472
        // If TLS required, peform handshake.
1473
        // Get the URL that was used to connect to the remote server.
1474
        rURL := remote.getCurrentURL()
77✔
1475

77✔
1476
        // Perform the client-side TLS handshake.
77✔
1477
        if resetTLSName, err := c.doTLSClientHandshake(tlsHandshakeLeaf, rURL, tlsConfig, tlsName, tlsTimeout, opts.LeafNode.TLSPinnedCerts); err != nil {
114✔
1478
                // Check if we need to reset the remote's TLS name.
37✔
1479
                if resetTLSName {
37✔
1480
                        remote.Lock()
×
1481
                        remote.tlsName = _EMPTY_
×
1482
                        remote.Unlock()
×
1483
                }
×
1484
                return false, err
37✔
1485
        }
1486
        return true, nil
40✔
1487
}
1488

1489
func (c *client) processLeafnodeInfo(info *Info) {
2,723✔
1490
        c.mu.Lock()
2,723✔
1491
        if c.leaf == nil || c.isClosed() {
2,725✔
1492
                c.mu.Unlock()
2✔
1493
                return
2✔
1494
        }
2✔
1495
        s := c.srv
2,721✔
1496
        opts := s.getOpts()
2,721✔
1497
        remote := c.leaf.remote
2,721✔
1498
        didSolicit := remote != nil
2,721✔
1499
        firstINFO := !c.flags.isSet(infoReceived)
2,721✔
1500

2,721✔
1501
        // In case of websocket, the TLS handshake has been already done.
2,721✔
1502
        // So check only for non websocket connections and for configurations
2,721✔
1503
        // where the TLS Handshake was not done first.
2,721✔
1504
        if didSolicit && !c.flags.isSet(handshakeComplete) && !c.isWebsocket() && !remote.TLSHandshakeFirst {
4,635✔
1505
                // If the server requires TLS, we need to set this in the remote
1,914✔
1506
                // otherwise if there is no TLS configuration block for the remote,
1,914✔
1507
                // the solicit side will not attempt to perform the TLS handshake.
1,914✔
1508
                if firstINFO && info.TLSRequired {
1,975✔
1509
                        // Check for TLS/proxy configuration mismatch
61✔
1510
                        if remote.Proxy.URL != _EMPTY_ && !remote.TLS && remote.TLSConfig == nil {
61✔
1511
                                c.mu.Unlock()
×
1512
                                c.Errorf("TLS configuration mismatch: Hub requires TLS but leafnode remote is not configured for TLS. When using a proxy, ensure TLS leafnode configuration matches the Hub requirements.")
×
1513
                                c.closeConnection(TLSHandshakeError)
×
1514
                                return
×
1515
                        }
×
1516
                        remote.TLS = true
61✔
1517
                }
1518
                if _, err := c.leafClientHandshakeIfNeeded(remote, opts); err != nil {
1,946✔
1519
                        c.mu.Unlock()
32✔
1520
                        return
32✔
1521
                }
32✔
1522
        }
1523

1524
        // Check for compression, unless already done.
1525
        if firstINFO && !c.flags.isSet(compressionNegotiated) {
4,033✔
1526
                // A solicited leafnode connection must first receive a leafnode INFO.
1,344✔
1527
                // Classify wrong-port connections before any leaf-specific negotiation.
1,344✔
1528
                if didSolicit && (info.CID == 0 || info.LeafNodeURLs == nil) {
1,398✔
1529
                        c.mu.Unlock()
54✔
1530
                        c.Errorf(ErrConnectedToWrongPort.Error())
54✔
1531
                        c.closeConnection(WrongPort)
54✔
1532
                        return
54✔
1533
                }
54✔
1534

1535
                // Prevent from getting back here.
1536
                c.flags.set(compressionNegotiated)
1,290✔
1537

1,290✔
1538
                var co *CompressionOpts
1,290✔
1539
                if !didSolicit {
1,883✔
1540
                        co = &opts.LeafNode.Compression
593✔
1541
                } else {
1,290✔
1542
                        co = &remote.Compression
697✔
1543
                }
697✔
1544
                if needsCompression(co.Mode) {
2,565✔
1545
                        // Release client lock since following function will need server lock.
1,275✔
1546
                        c.mu.Unlock()
1,275✔
1547
                        compress, err := s.negotiateLeafCompression(c, didSolicit, info.Compression, co)
1,275✔
1548
                        if err != nil {
1,275✔
1549
                                c.sendErrAndErr(err.Error())
×
1550
                                c.closeConnection(ProtocolViolation)
×
1551
                                return
×
1552
                        }
×
1553
                        if compress {
2,460✔
1554
                                // Done for now, will get back another INFO protocol...
1,185✔
1555
                                return
1,185✔
1556
                        }
1,185✔
1557
                        // No compression because one side does not want/can't, so proceed.
1558
                        c.mu.Lock()
90✔
1559
                        // Check that the connection did not close if the lock was released.
90✔
1560
                        if c.isClosed() {
90✔
1561
                                c.mu.Unlock()
×
1562
                                return
×
1563
                        }
×
1564
                } else {
15✔
1565
                        // Coming from an old server, the Compression field would be the empty
15✔
1566
                        // string. For servers that are configured with CompressionNotSupported,
15✔
1567
                        // this makes them behave as old servers.
15✔
1568
                        if info.Compression == _EMPTY_ || co.Mode == CompressionNotSupported {
17✔
1569
                                c.leaf.compression = CompressionNotSupported
2✔
1570
                        } else {
15✔
1571
                                c.leaf.compression = CompressionOff
13✔
1572
                        }
13✔
1573
                }
1574
                // Accepting side does not normally process an INFO protocol during
1575
                // initial connection handshake. So we keep it consistent by returning
1576
                // if we are not soliciting.
1577
                if !didSolicit {
107✔
1578
                        // If we had created the ping timer instead of the auth timer, we will
2✔
1579
                        // clear the ping timer and set the auth timer now that the compression
2✔
1580
                        // negotiation is done.
2✔
1581
                        if info.Compression != _EMPTY_ && c.ping.tmr != nil {
3✔
1582
                                clearTimer(&c.ping.tmr)
1✔
1583
                                c.setAuthTimer(secondsToDuration(opts.LeafNode.AuthTimeout))
1✔
1584
                        }
1✔
1585
                        c.mu.Unlock()
2✔
1586
                        return
2✔
1587
                }
1588
                // Fall through and process the INFO protocol as usual.
1589
        }
1590

1591
        // Note: For now, only the initial INFO has a nonce. We
1592
        // will probably do auto key rotation at some point.
1593
        if firstINFO {
2,185✔
1594
                // Mark that the INFO protocol has been received.
737✔
1595
                c.flags.set(infoReceived)
737✔
1596
                // Prevent connecting to non leafnode port. Need to do this only for
737✔
1597
                // the first INFO, not for async INFO updates...
737✔
1598
                //
737✔
1599
                // Content of INFO sent by the server when accepting a tcp connection.
737✔
1600
                // -------------------------------------------------------------------
737✔
1601
                // Listen Port Of | CID | ClientConnectURLs | LeafNodeURLs | Gateway |
737✔
1602
                // -------------------------------------------------------------------
737✔
1603
                //      CLIENT    |  X* |        X**        |              |         |
737✔
1604
                //      ROUTE     |     |        X**        |      X***    |         |
737✔
1605
                //     GATEWAY    |     |                   |              |    X    |
737✔
1606
                //     LEAFNODE   |  X  |                   |       X      |         |
737✔
1607
                // -------------------------------------------------------------------
737✔
1608
                // *   Not on older servers.
737✔
1609
                // **  Not if "no advertise" is enabled.
737✔
1610
                // *** Not if leafnode's "no advertise" is enabled.
737✔
1611
                //
737✔
1612
                // Reject a cluster that contains spaces.
737✔
1613
                if info.Cluster != _EMPTY_ && strings.Contains(info.Cluster, " ") {
738✔
1614
                        c.mu.Unlock()
1✔
1615
                        c.sendErrAndErr(ErrClusterNameHasSpaces.Error())
1✔
1616
                        c.closeConnection(ProtocolViolation)
1✔
1617
                        return
1✔
1618
                }
1✔
1619
                // For solicited outbound leaf connections, capture the remote's nonce.
1620
                // For inbound leaf connections, keep using the server-issued nonce that
1621
                // was sent in our initial INFO and must be signed in CONNECT.
1622
                if didSolicit {
1,429✔
1623
                        c.nonce = []byte(info.Nonce)
693✔
1624
                }
693✔
1625
                if info.TLSRequired && didSolicit {
765✔
1626
                        remote.TLS = true
29✔
1627
                }
29✔
1628
                supportsHeaders := c.srv.supportsHeaders()
736✔
1629
                c.headers = supportsHeaders && info.Headers
736✔
1630

736✔
1631
                // Remember the remote server.
736✔
1632
                // Pre 2.2.0 servers are not sending their server name.
736✔
1633
                // In that case, use info.ID, which, for those servers, matches
736✔
1634
                // the content of the field `Name` in the leafnode CONNECT protocol.
736✔
1635
                if info.Name == _EMPTY_ {
736✔
1636
                        c.leaf.remoteServer = info.ID
×
1637
                } else {
736✔
1638
                        c.leaf.remoteServer = info.Name
736✔
1639
                }
736✔
1640
                c.leaf.remoteDomain = info.Domain
736✔
1641
                c.leaf.remoteCluster = info.Cluster
736✔
1642
                // We send the protocol version in the INFO protocol.
736✔
1643
                // Keep track of it, so we know if this connection supports message
736✔
1644
                // tracing for instance.
736✔
1645
                c.opts.Protocol = info.Proto
736✔
1646
        }
1647

1648
        // For both initial INFO and async INFO protocols, Possibly
1649
        // update our list of remote leafnode URLs we can connect to,
1650
        // unless we are instructed not to.
1651
        if didSolicit && !remote.IgnoreDiscoveredServers &&
1,447✔
1652
                (len(info.LeafNodeURLs) > 0 || len(info.WSConnectURLs) > 0) {
2,807✔
1653
                // Consider the incoming array as the most up-to-date
1,360✔
1654
                // representation of the remote cluster's list of URLs.
1,360✔
1655
                c.updateLeafNodeURLs(info)
1,360✔
1656
        }
1,360✔
1657

1658
        // Only solicited leafnode connections trust permission updates from INFO.
1659
        if didSolicit && (info.Import != nil || info.Export != nil) {
1,467✔
1660
                perms := &Permissions{
20✔
1661
                        Publish:   info.Export,
20✔
1662
                        Subscribe: info.Import,
20✔
1663
                }
20✔
1664
                // Check if we have local deny clauses that we need to merge.
20✔
1665
                if remote := c.leaf.remote; remote != nil {
40✔
1666
                        if len(remote.DenyExports) > 0 {
21✔
1667
                                if perms.Publish == nil {
1✔
1668
                                        perms.Publish = &SubjectPermission{}
×
1669
                                }
×
1670
                                perms.Publish.Deny = append(perms.Publish.Deny, remote.DenyExports...)
1✔
1671
                        }
1672
                        if len(remote.DenyImports) > 0 {
21✔
1673
                                if perms.Subscribe == nil {
1✔
1674
                                        perms.Subscribe = &SubjectPermission{}
×
1675
                                }
×
1676
                                perms.Subscribe.Deny = append(perms.Subscribe.Deny, remote.DenyImports...)
1✔
1677
                        }
1678
                }
1679
                c.setPermissions(perms)
20✔
1680
        }
1681

1682
        var resumeConnect bool
1,447✔
1683

1,447✔
1684
        // If this is a remote connection and this is the first INFO protocol,
1,447✔
1685
        // then we need to finish the connect process by sending CONNECT, etc..
1,447✔
1686
        if firstINFO && didSolicit {
2,140✔
1687
                // Clear deadline that was set in createLeafNode while waiting for the INFO.
693✔
1688
                c.nc.SetDeadline(time.Time{})
693✔
1689
                resumeConnect = true
693✔
1690
        } else if !firstINFO && didSolicit {
2,117✔
1691
                c.leaf.remoteAccName = info.RemoteAccount
670✔
1692
        }
670✔
1693

1694
        // Check if we have the remote account information and if so make sure it's stored.
1695
        if info.RemoteAccount != _EMPTY_ {
2,102✔
1696
                s.leafRemoteAccounts.Store(c.acc.Name, info.RemoteAccount)
655✔
1697
        }
655✔
1698
        c.mu.Unlock()
1,447✔
1699

1,447✔
1700
        finishConnect := info.ConnectInfo
1,447✔
1701
        if resumeConnect && s != nil {
2,140✔
1702
                s.leafNodeResumeConnectProcess(c)
693✔
1703
                if !info.InfoOnConnect {
693✔
1704
                        finishConnect = true
×
1705
                }
×
1706
        }
1707
        if finishConnect {
2,102✔
1708
                s.leafNodeFinishConnectProcess(c)
655✔
1709
        }
655✔
1710

1711
        // Check to see if we need to kick any internal source or mirror consumers.
1712
        // This will be a no-op if JetStream not enabled for this server or if the bound account
1713
        // does not have jetstream.
1714
        s.checkInternalSyncConsumers(c.acc)
1,447✔
1715
}
1716

1717
func (s *Server) negotiateLeafCompression(c *client, didSolicit bool, infoCompression string, co *CompressionOpts) (bool, error) {
1,275✔
1718
        // Negotiate the appropriate compression mode (or no compression)
1,275✔
1719
        cm, err := selectCompressionMode(co.Mode, infoCompression)
1,275✔
1720
        if err != nil {
1,275✔
1721
                return false, err
×
1722
        }
×
1723
        c.mu.Lock()
1,275✔
1724
        // For "auto" mode, set the initial compression mode based on RTT
1,275✔
1725
        if cm == CompressionS2Auto {
2,428✔
1726
                if c.rttStart.IsZero() {
2,306✔
1727
                        c.rtt = computeRTT(c.start)
1,153✔
1728
                }
1,153✔
1729
                cm = selectS2AutoModeBasedOnRTT(c.rtt, co.RTTThresholds)
1,153✔
1730
        }
1731
        // Keep track of the negotiated compression mode.
1732
        c.leaf.compression = cm
1,275✔
1733
        cid := c.cid
1,275✔
1734
        var nonce string
1,275✔
1735
        if !didSolicit {
1,867✔
1736
                nonce = bytesToString(c.nonce)
592✔
1737
        }
592✔
1738
        c.mu.Unlock()
1,275✔
1739

1,275✔
1740
        if !needsCompression(cm) {
1,365✔
1741
                return false, nil
90✔
1742
        }
90✔
1743

1744
        // If we end-up doing compression...
1745

1746
        // Generate an INFO with the chosen compression mode.
1747
        s.mu.Lock()
1,185✔
1748
        info := s.copyLeafNodeInfo()
1,185✔
1749
        info.Compression, info.CID, info.Nonce = compressionModeForInfoProtocol(co, cm), cid, nonce
1,185✔
1750
        infoProto := generateInfoJSON(info)
1,185✔
1751
        s.mu.Unlock()
1,185✔
1752

1,185✔
1753
        // If we solicited, then send this INFO protocol BEFORE switching
1,185✔
1754
        // to compression writer. However, if we did not, we send it after.
1,185✔
1755
        c.mu.Lock()
1,185✔
1756
        if didSolicit {
1,779✔
1757
                c.enqueueProto(infoProto)
594✔
1758
                // Make sure it is completely flushed (the pending bytes goes to
594✔
1759
                // 0) before proceeding.
594✔
1760
                for c.out.pb > 0 && !c.isClosed() {
1,188✔
1761
                        c.flushOutbound()
594✔
1762
                }
594✔
1763
        }
1764
        // This is to notify the readLoop that it should switch to a
1765
        // (de)compression reader.
1766
        c.in.flags.set(switchToCompression)
1,185✔
1767
        // Create the compress writer before queueing the INFO protocol for
1,185✔
1768
        // a route that did not solicit. It will make sure that that proto
1,185✔
1769
        // is sent with compression on.
1,185✔
1770
        c.out.cw = s2.NewWriter(nil, s2WriterOptions(cm)...)
1,185✔
1771
        if !didSolicit {
1,776✔
1772
                c.enqueueProto(infoProto)
591✔
1773
        }
591✔
1774
        c.mu.Unlock()
1,185✔
1775
        return true, nil
1,185✔
1776
}
1777

1778
// When getting a leaf node INFO protocol, use the provided
1779
// array of urls to update the list of possible endpoints.
1780
func (c *client) updateLeafNodeURLs(info *Info) {
1,360✔
1781
        cfg := c.leaf.remote
1,360✔
1782
        cfg.Lock()
1,360✔
1783
        defer cfg.Unlock()
1,360✔
1784

1,360✔
1785
        // We have ensured that if a remote has a WS scheme, then all are.
1,360✔
1786
        // So check if first is WS, then add WS URLs, otherwise, add non WS ones.
1,360✔
1787
        if len(cfg.URLs) > 0 && isWSURL(cfg.URLs[0]) {
1,418✔
1788
                // It does not really matter if we use "ws://" or "wss://" here since
58✔
1789
                // we will have already marked that the remote should use TLS anyway.
58✔
1790
                // But use proper scheme for log statements, etc...
58✔
1791
                proto := wsSchemePrefix
58✔
1792
                if cfg.TLS {
58✔
1793
                        proto = wsSchemePrefixTLS
×
1794
                }
×
1795
                c.doUpdateLNURLs(cfg, proto, info.WSConnectURLs)
58✔
1796
                return
58✔
1797
        }
1798
        c.doUpdateLNURLs(cfg, "nats-leaf", info.LeafNodeURLs)
1,302✔
1799
}
1800

1801
func (c *client) doUpdateLNURLs(cfg *leafNodeCfg, scheme string, URLs []string) {
1,360✔
1802
        cfg.urls = make([]*url.URL, 0, 1+len(URLs))
1,360✔
1803
        // Add the ones we receive in the protocol
1,360✔
1804
        for _, surl := range URLs {
3,751✔
1805
                url, err := url.Parse(fmt.Sprintf("%s://%s", scheme, surl))
2,391✔
1806
                if err != nil {
2,391✔
1807
                        // As per below, the URLs we receive should not have contained URL info, so this should be safe to log.
×
1808
                        c.Errorf("Error parsing url %q: %v", surl, err)
×
1809
                        continue
×
1810
                }
1811
                // Do not add if it's the same as what we already have configured.
1812
                var dup bool
2,391✔
1813
                for _, u := range cfg.URLs {
6,028✔
1814
                        // URLs that we receive never have user info, but the
3,637✔
1815
                        // ones that were configured may have. Simply compare
3,637✔
1816
                        // host and port to decide if they are equal or not.
3,637✔
1817
                        if url.Host == u.Host && url.Port() == u.Port() {
5,371✔
1818
                                dup = true
1,734✔
1819
                                break
1,734✔
1820
                        }
1821
                }
1822
                if !dup {
3,048✔
1823
                        cfg.urls = append(cfg.urls, url)
657✔
1824
                        cfg.saveTLSHostname(url)
657✔
1825
                }
657✔
1826
        }
1827
        // Add the configured one
1828
        cfg.urls = append(cfg.urls, cfg.URLs...)
1,360✔
1829
}
1830

1831
// Similar to setInfoHostPortAndGenerateJSON, but for leafNodeInfo.
1832
func (s *Server) setLeafNodeInfoHostPortAndIP() error {
3,978✔
1833
        opts := s.getOpts()
3,978✔
1834
        if opts.LeafNode.Advertise != _EMPTY_ {
3,989✔
1835
                advHost, advPort, err := parseHostPort(opts.LeafNode.Advertise, opts.LeafNode.Port)
11✔
1836
                if err != nil {
11✔
1837
                        return err
×
1838
                }
×
1839
                s.leafNodeInfo.Host = advHost
11✔
1840
                s.leafNodeInfo.Port = advPort
11✔
1841
        } else {
3,967✔
1842
                s.leafNodeInfo.Host = opts.LeafNode.Host
3,967✔
1843
                s.leafNodeInfo.Port = opts.LeafNode.Port
3,967✔
1844
                // If the host is "0.0.0.0" or "::" we need to resolve to a public IP.
3,967✔
1845
                // This will return at most 1 IP.
3,967✔
1846
                hostIsIPAny, ips, err := s.getNonLocalIPsIfHostIsIPAny(s.leafNodeInfo.Host, false)
3,967✔
1847
                if err != nil {
3,967✔
1848
                        return err
×
1849
                }
×
1850
                if hostIsIPAny {
4,274✔
1851
                        if len(ips) == 0 {
307✔
1852
                                s.Errorf("Could not find any non-local IP for leafnode's listen specification %q",
×
1853
                                        s.leafNodeInfo.Host)
×
1854
                        } else {
307✔
1855
                                // Take the first from the list...
307✔
1856
                                s.leafNodeInfo.Host = ips[0]
307✔
1857
                        }
307✔
1858
                }
1859
        }
1860
        // Use just host:port for the IP
1861
        s.leafNodeInfo.IP = net.JoinHostPort(s.leafNodeInfo.Host, strconv.Itoa(s.leafNodeInfo.Port))
3,978✔
1862
        if opts.LeafNode.Advertise != _EMPTY_ {
3,989✔
1863
                s.Noticef("Advertise address for leafnode is set to %s", s.leafNodeInfo.IP)
11✔
1864
        }
11✔
1865
        return nil
3,978✔
1866
}
1867

1868
// Add the connection to the map of leaf nodes.
// If `checkForDup` is true (invoked when a leafnode is accepted), then we check
// if a connection already exists for the same server name and account.
// That can happen when the remote is attempting to reconnect while the accepting
// side did not detect the connection as broken yet.
// But it can also happen when there is a misconfiguration and the remote is
// creating two (or more) connections that bind to the same account on the accept
// side.
// When a duplicate is found, the new connection is accepted and the old is closed
// (this solves the stale connection situation). An error is sent to the old
// connection before it is closed, to help the remote detect the misconfiguration
// when the duplicate is the result of that misconfiguration.
//
// Once the connection is registered, JetStream related deny permissions and
// domain mappings are applied based on the local/remote domains and on whether
// the bound account is the system account.
//
// Returns false only when the connection is solicited and its remote
// configuration is no longer valid, in which case the connection is not added
// to the map. Returns true in all other cases.
func (s *Server) addLeafNodeConnection(c *client, srvName, clusterName string, checkForDup bool) bool {
        var accName string
        // Snapshot under the client lock everything needed from the connection.
        c.mu.Lock()
        cid := c.cid
        acc := c.acc
        if acc != nil {
                accName = acc.Name
        }
        myRemoteDomain := c.leaf.remoteDomain
        mySrvName := c.leaf.remoteServer
        remoteAccName := c.leaf.remoteAccName
        myClustName := c.leaf.remoteCluster
        remote := c.leaf.remote
        // A connection with a remote configuration is one that we solicited.
        solicited := remote != nil
        c.mu.Unlock()

        var old *client
        s.mu.Lock()
        // We check for empty because in some test we may send empty CONNECT{}
        if checkForDup && srvName != _EMPTY_ {
                for _, ol := range s.leafs {
                        ol.mu.Lock()
                        // We care here only about non solicited Leafnode. This function
                        // is more about replacing stale connections than detecting loops.
                        // We have code for the loop detection elsewhere, which also delays
                        // attempt to reconnect.
                        if !ol.isSolicitedLeafNode() && ol.leaf.remoteServer == srvName &&
                                ol.leaf.remoteCluster == clusterName && ol.acc.Name == accName &&
                                remoteAccName != _EMPTY_ && ol.leaf.remoteAccName == remoteAccName {
                                old = ol
                        }
                        ol.mu.Unlock()
                        if old != nil {
                                break
                        }
                }
        }
        // Now that we are under the server lock and before adding it to the map,
        // for a solicited leaf, we need to make sure that it has not been removed
        // from the config or disabled.
        if solicited {
                // If no longer valid, do not add to the server map. The connection
                // should have been marked so that it can't reconnect. When the caller
                // calls closeConnection(), cleanup (including clearing the connect-
                // in-progress flag) will occur at the appropriate time.
                if !remote.stillValid() {
                        // Prevent reconnect in case it was not yet done.
                        c.setNoReconnect()
                        s.mu.Unlock()
                        s.removeFromTempClients(cid)
                        return false
                }
                remote.setConnectInProgress(false)
        }
        // Store new connection in the map
        s.leafs[cid] = c
        s.mu.Unlock()
        s.removeFromTempClients(cid)

        // If applicable, evict the old one.
        if old != nil {
                old.sendErrAndErr(DuplicateRemoteLeafnodeConnection.String())
                old.closeConnection(DuplicateRemoteLeafnodeConnection)
                c.Warnf("Replacing connection from same server")
        }

        // Returns the remote server name, suffixed with "/<cluster>" when the
        // remote cluster name is known. Used for log statements only.
        srvDecorated := func() string {
                if myClustName == _EMPTY_ {
                        return mySrvName
                }
                return fmt.Sprintf("%s/%s", mySrvName, myClustName)
        }

        opts := s.getOpts()
        sysAcc := s.SystemAccount()
        js := s.getJetStream()
        // The meta group, if any, is needed to toggle the observer mode below.
        var meta *raft
        if js != nil {
                if mg := js.getMetaGroup(); mg != nil {
                        meta = mg.(*raft)
                }
        }
        // Set to true when the domains are considered mismatched, in which case
        // messages for the local domain API are denied on the way out (see end
        // of this function).
        blockMappingOutgoing := false
        // Deny (non domain) JetStream API traffic unless system account is shared
        // and domain names are identical and extending is not disabled

        // Check if backwards compatibility has been enabled and needs to be acted on
        forceSysAccDeny := false
        if len(opts.JsAccDefaultDomain) > 0 {
                if acc == sysAcc {
                        for _, d := range opts.JsAccDefaultDomain {
                                if d == _EMPTY_ {
                                        // Extending JetStream via leaf node is mutually exclusive with a domain mapping to the empty/default domain.
                                        // As soon as one mapping to "" is found, disable the ability to extend JS via a leaf node.
                                        c.Noticef("Not extending remote JetStream domain %q due to presence of empty default domain", myRemoteDomain)
                                        forceSysAccDeny = true
                                        break
                                }
                        }
                } else if domain, ok := opts.JsAccDefaultDomain[accName]; ok && domain == _EMPTY_ {
                        // for backwards compatibility with old setups that do not have a domain name set
                        c.Debugf("Skipping deny %q for account %q due to default domain", jsAllAPI, accName)
                        // Note that no deny permission nor domain mapping is added in this case.
                        return true
                }
        }

        // If the server has JS disabled, it may still be part of a JetStream that could be extended.
        // This is either signaled by js being disabled and a domain set,
        // or in cases where no domain name exists, an extension hint is set.
        // However, this is only relevant in mixed setups.
        //
        // If the system account connects but default domains are present, JetStream can't be extended.
        if opts.JetStreamDomain != myRemoteDomain || (!opts.JetStream && (opts.JetStreamDomain == _EMPTY_ && opts.JetStreamExtHint != jsWillExtend)) ||
                sysAcc == nil || acc == nil || forceSysAccDeny {
                // If domain names mismatch always deny. This applies to system accounts as well as non system accounts.
                // Not having a system account, account or JetStream disabled is considered a mismatch as well.
                if acc != nil && acc == sysAcc {
                        c.Noticef("System account connected from %s", srvDecorated())
                        c.Noticef("JetStream not extended, domains differ")
                        c.mergeDenyPermissionsLocked(both, denyAllJs)
                        // When a remote with a system account is present in a server, unless otherwise disabled, the server will be
                        // started in observer mode. Now that it is clear that this not used, turn the observer mode off.
                        if solicited && meta != nil && meta.IsObserver() {
                                meta.setObserver(false, extNotExtended)
                                c.Debugf("Turning JetStream metadata controller Observer Mode off")
                                // Take note that the domain was not extended to avoid this state from startup.
                                writePeerState(js.config.StoreDir, meta.currentPeerState())
                                // Meta controller can't be leader yet.
                                // Yet it is possible that due to observer mode every server already stopped campaigning.
                                // Therefore this server needs to be kicked into campaigning gear explicitly.
                                meta.Campaign()
                        }
                } else {
                        c.Noticef("JetStream using domains: local %q, remote %q", opts.JetStreamDomain, myRemoteDomain)
                        c.mergeDenyPermissionsLocked(both, denyAllClientJs)
                }
                blockMappingOutgoing = true
        } else if acc == sysAcc {
                // system account and same domain
                s.sys.client.Noticef("Extending JetStream domain %q as System Account connected from server %s",
                        myRemoteDomain, srvDecorated())
                // In an extension use case, pin leadership to server remotes connect to.
                // Therefore, server with a remote that are not already in observer mode, need to be put into it.
                if solicited && meta != nil && !meta.IsObserver() {
                        meta.setObserver(true, extExtended)
                        c.Debugf("Turning JetStream metadata controller Observer Mode on - System Account Connected")
                        // Persist the peer state so that this decision is known at next startup.
                        // NOTE(review): the previous wording said the domain was "not extended",
                        // which looks copied from the branch above; this branch uses extExtended.
                        writePeerState(js.config.StoreDir, meta.currentPeerState())
                        // If this server is the leader already, step down so a new leader can be elected (that is not an observer)
                        meta.StepDown()
                }
        } else {
                // This deny is needed in all cases (system account shared or not)
                // If the system account is shared, jsAllAPI traffic will go through the system account.
                // So in order to prevent duplicate delivery (from system and actual account) suppress it on the account.
                // If the system account is NOT shared, jsAllAPI traffic has no business
                c.Debugf("Adding deny %+v for account %q", denyAllClientJs, accName)
                c.mergeDenyPermissionsLocked(both, denyAllClientJs)
        }
        // If we have a specified JetStream domain we will want to add a mapping to
        // allow access cross domain for each non-system account.
        if opts.JetStreamDomain != _EMPTY_ && opts.JetStream && acc != nil && acc != sysAcc {
                for src, dest := range generateJSMappingTable(opts.JetStreamDomain) {
                        // A failure to add a mapping is only logged, it does not
                        // prevent the connection from being added.
                        if err := acc.AddMapping(src, dest); err != nil {
                                c.Debugf("Error adding JetStream domain mapping: %s", err.Error())
                        } else {
                                c.Debugf("Adding JetStream Domain Mapping %q -> %s to account %q", src, dest, accName)
                        }
                }
                if blockMappingOutgoing {
                        src := fmt.Sprintf(jsDomainAPI, opts.JetStreamDomain)
                        // make sure that messages intended for this domain, do not leave the cluster via this leaf node connection
                        // This is a guard against a miss-config with two identical domain names and will only cover some forms
                        // of this issue, not all of them.
                        // This guards against a hub and a spoke having the same domain name.
                        // But not two spokes having the same one and the request coming from the hub.
                        c.mergeDenyPermissionsLocked(pub, []string{src})
                        c.Debugf("Adding deny %q for outgoing messages to account %q", src, accName)
                }
        }
        return true
}
2062

2063
func (s *Server) removeLeafNodeConnection(c *client) {
1,722✔
2064
        s.mu.Lock()
1,722✔
2065
        c.mu.Lock()
1,722✔
2066
        cid := c.cid
1,722✔
2067
        if c.leaf != nil {
3,444✔
2068
                if c.leaf.tsubt != nil {
2,953✔
2069
                        c.leaf.tsubt.Stop()
1,231✔
2070
                        c.leaf.tsubt = nil
1,231✔
2071
                }
1,231✔
2072
                if c.leaf.gwSub != nil {
2,375✔
2073
                        s.gwLeafSubs.Remove(c.leaf.gwSub)
653✔
2074
                        // We need to set this to nil for GC to release the connection
653✔
2075
                        c.leaf.gwSub = nil
653✔
2076
                }
653✔
2077
                if remote := c.leaf.remote; remote != nil {
2,542✔
2078
                        // If "noReconnect" is true, then we won't attempt to reconnect, so
820✔
2079
                        // we will clear the "connect-in-progress" flag. However, if we can
820✔
2080
                        // reconnect, then we should set "connect-in-progress" to true while
820✔
2081
                        // we are under the server/client lock. The go routine that performs
820✔
2082
                        // the reconnect will be started later and there would be a gap with
820✔
2083
                        // the wrong flag value otherwise.
820✔
2084
                        remote.setConnectInProgress(!c.flags.isSet(noReconnect))
820✔
2085
                }
820✔
2086
        }
2087
        proxyKey := c.proxyKey
1,722✔
2088
        c.mu.Unlock()
1,722✔
2089
        delete(s.leafs, cid)
1,722✔
2090
        if proxyKey != _EMPTY_ {
1,726✔
2091
                s.removeProxiedConn(proxyKey, cid)
4✔
2092
        }
4✔
2093
        s.mu.Unlock()
1,722✔
2094
        s.removeFromTempClients(cid)
1,722✔
2095
}
2096

2097
// Connect information for solicited leafnodes.
//
// This is the JSON payload of the CONNECT protocol as decoded by the accept
// side in processLeafNodeConnect, which notably uses:
//   - Version: checked against the configured leafnode minimum version.
//   - Name, Cluster, Domain, RemoteAccount: remembered on the connection as
//     the remote server name, cluster, JetStream domain and account name.
//   - Hub: when true, the accept side takes on the spoke role.
//   - Headers: combined with the local server's own headers support.
//   - DenyPub: merged into the connection's publish deny permissions.
//   - Isolate: marks the connection as isolated.
//
// Field order and JSON tags are part of the wire format and must be preserved.
type leafConnectInfo struct {
        Version   string   `json:"version,omitempty"`
        Nkey      string   `json:"nkey,omitempty"`
        JWT       string   `json:"jwt,omitempty"`
        Sig       string   `json:"sig,omitempty"`
        User      string   `json:"user,omitempty"`
        Pass      string   `json:"pass,omitempty"`
        Token     string   `json:"auth_token,omitempty"`
        ID        string   `json:"server_id,omitempty"`
        Domain    string   `json:"domain,omitempty"`
        Name      string   `json:"name,omitempty"`
        Hub       bool     `json:"is_hub,omitempty"`
        Cluster   string   `json:"cluster,omitempty"`
        Headers   bool     `json:"headers,omitempty"`
        JetStream bool     `json:"jetstream,omitempty"`
        DenyPub   []string `json:"deny_pub,omitempty"`
        Isolate   bool     `json:"isolate,omitempty"`

        // There was an existing field called:
        // >> Comp bool `json:"compression,omitempty"`
        // that has never been used. With support for compression, we now need
        // a field that is a string. So we use a different json tag:
        Compression string `json:"compress_mode,omitempty"`

        // Just used to detect wrong connection attempts.
        Gateway string `json:"gateway,omitempty"`

        // Tells the accept side which account the remote is binding to.
        RemoteAccount string `json:"remote_account,omitempty"`

        // The accept side of a LEAF connection, unlike ROUTER and GATEWAY, receives
        // only the CONNECT protocol, and no INFO. So we need to send the protocol
        // version as part of the CONNECT. It will indicate if a connection supports
        // some features, such as message tracing.
        // We use `protocol` as the JSON tag, so this is automatically unmarshal'ed
        // in the low level process CONNECT.
        Proto int `json:"protocol,omitempty"`
}
2136

2137
// processLeafNodeConnect will process the inbound connect args.
2138
// Once we are here we are bound to an account, so can send any interest that
2139
// we would have to the other side.
2140
func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) error {
704✔
2141
        // Way to detect clients that incorrectly connect to the route listen
704✔
2142
        // port. Client provided "lang" in the CONNECT protocol while LEAFNODEs don't.
704✔
2143
        if lang != _EMPTY_ {
704✔
2144
                c.sendErrAndErr(ErrClientConnectedToLeafNodePort.Error())
×
2145
                c.closeConnection(WrongPort)
×
2146
                return ErrClientConnectedToLeafNodePort
×
2147
        }
×
2148

2149
        // Unmarshal as a leaf node connect protocol
2150
        proto := &leafConnectInfo{}
704✔
2151
        if err := json.Unmarshal(arg, proto); err != nil {
704✔
2152
                return err
×
2153
        }
×
2154

2155
        // Reject a cluster that contains spaces.
2156
        if proto.Cluster != _EMPTY_ && strings.Contains(proto.Cluster, " ") {
705✔
2157
                c.sendErrAndErr(ErrClusterNameHasSpaces.Error())
1✔
2158
                c.closeConnection(ProtocolViolation)
1✔
2159
                return ErrClusterNameHasSpaces
1✔
2160
        }
1✔
2161

2162
        // Check for cluster name collisions.
2163
        if cn := s.cachedClusterName(); cn != _EMPTY_ && proto.Cluster != _EMPTY_ && proto.Cluster == cn {
706✔
2164
                c.sendErrAndErr(ErrLeafNodeHasSameClusterName.Error())
3✔
2165
                c.closeConnection(ClusterNamesIdentical)
3✔
2166
                return ErrLeafNodeHasSameClusterName
3✔
2167
        }
3✔
2168

2169
        // Reject if this has Gateway which means that it would be from a gateway
2170
        // connection that incorrectly connects to the leafnode port.
2171
        if proto.Gateway != _EMPTY_ {
700✔
2172
                errTxt := fmt.Sprintf("Rejecting connection from gateway %q on the leafnode port", proto.Gateway)
×
2173
                c.Errorf(errTxt)
×
2174
                c.sendErr(errTxt)
×
2175
                c.closeConnection(WrongGateway)
×
2176
                return ErrWrongGateway
×
2177
        }
×
2178

2179
        if mv := s.getOpts().LeafNode.MinVersion; mv != _EMPTY_ {
702✔
2180
                major, minor, update, _ := versionComponents(mv)
2✔
2181
                if !versionAtLeast(proto.Version, major, minor, update) {
3✔
2182
                        // Send back an INFO so recent remote servers process the rejection
1✔
2183
                        // cleanly, then close immediately. The soliciting side applies the
1✔
2184
                        // reconnect delay when it processes the error.
1✔
2185
                        s.sendPermsAndAccountInfo(c)
1✔
2186
                        c.sendErrAndErr(fmt.Sprintf("%s %q", ErrLeafNodeMinVersionRejected, mv))
1✔
2187
                        c.closeConnection(MinimumVersionRequired)
1✔
2188
                        return ErrMinimumVersionRequired
1✔
2189
                }
1✔
2190
        }
2191

2192
        // Check if this server supports headers.
2193
        supportHeaders := c.srv.supportsHeaders()
699✔
2194

699✔
2195
        c.mu.Lock()
699✔
2196
        // Leaf Nodes do not do echo or verbose or pedantic.
699✔
2197
        c.opts.Verbose = false
699✔
2198
        c.opts.Echo = false
699✔
2199
        c.opts.Pedantic = false
699✔
2200
        // This inbound connection will be marked as supporting headers if this server
699✔
2201
        // support headers and the remote has sent in the CONNECT protocol that it does
699✔
2202
        // support headers too.
699✔
2203
        c.headers = supportHeaders && proto.Headers
699✔
2204
        // If the compression level is still not set, set it based on what has been
699✔
2205
        // given to us in the CONNECT protocol.
699✔
2206
        if c.leaf.compression == _EMPTY_ {
837✔
2207
                // But if proto.Compression is _EMPTY_, set it to CompressionNotSupported
138✔
2208
                if proto.Compression == _EMPTY_ {
180✔
2209
                        c.leaf.compression = CompressionNotSupported
42✔
2210
                } else {
138✔
2211
                        c.leaf.compression = proto.Compression
96✔
2212
                }
96✔
2213
        }
2214

2215
        // Remember the remote server.
2216
        c.leaf.remoteServer = proto.Name
699✔
2217
        // Remember the remote account name
699✔
2218
        c.leaf.remoteAccName = proto.RemoteAccount
699✔
2219
        // Remember if the leafnode requested isolation.
699✔
2220
        c.leaf.isolated = c.leaf.isolated || proto.Isolate
699✔
2221

699✔
2222
        // If the other side has declared itself a hub, so we will take on the spoke role.
699✔
2223
        if proto.Hub {
717✔
2224
                c.leaf.isSpoke = true
18✔
2225
        }
18✔
2226

2227
        // The soliciting side is part of a cluster.
2228
        if proto.Cluster != _EMPTY_ {
1,233✔
2229
                c.leaf.remoteCluster = proto.Cluster
534✔
2230
        }
534✔
2231

2232
        c.leaf.remoteDomain = proto.Domain
699✔
2233

699✔
2234
        // When a leaf solicits a connection to a hub, the perms that it will use on the soliciting leafnode's
699✔
2235
        // behalf are correct for them, but inside the hub need to be reversed since data is flowing in the opposite direction.
699✔
2236
        if !c.isSolicitedLeafNode() && c.perms != nil {
722✔
2237
                sp, pp := c.perms.sub, c.perms.pub
23✔
2238
                c.perms.sub, c.perms.pub = pp, sp
23✔
2239
                if c.opts.Import != nil {
45✔
2240
                        c.darray = c.opts.Import.Deny
22✔
2241
                } else {
23✔
2242
                        c.darray = nil
1✔
2243
                }
1✔
2244
        }
2245

2246
        // Set the Ping timer
2247
        c.setFirstPingTimer()
699✔
2248

699✔
2249
        // If we received pub deny permissions from the other end, merge with existing ones.
699✔
2250
        c.mergeDenyPermissions(pub, proto.DenyPub)
699✔
2251

699✔
2252
        acc := c.acc
699✔
2253
        c.mu.Unlock()
699✔
2254

699✔
2255
        // If the account is not set (e.g. connection was closed due to auth
699✔
2256
        // timeout while still being processed), bail out to avoid a panic.
699✔
2257
        if acc == nil {
699✔
2258
                c.closeConnection(MissingAccount)
×
2259
                return ErrMissingAccount
×
2260
        }
×
2261

2262
        // Register the cluster, even if empty, as long as we are acting as a hub.
2263
        if !proto.Hub {
1,380✔
2264
                acc.registerLeafNodeCluster(proto.Cluster)
681✔
2265
        }
681✔
2266

2267
        // Add in the leafnode here since we passed through auth at this point.
2268
        s.addLeafNodeConnection(c, proto.Name, proto.Cluster, true)
699✔
2269

699✔
2270
        // If we have permissions bound to this leafnode we need to send then back to the
699✔
2271
        // origin server for local enforcement.
699✔
2272
        s.sendPermsAndAccountInfo(c)
699✔
2273

699✔
2274
        // Create and initialize the smap since we know our bound account now.
699✔
2275
        // This will send all registered subs too.
699✔
2276
        s.initLeafNodeSmapAndSendSubs(c)
699✔
2277

699✔
2278
        // Announce the account connect event for a leaf node.
699✔
2279
        // This will be a no-op as needed.
699✔
2280
        s.sendLeafNodeConnect(c.acc)
699✔
2281

699✔
2282
        // Check to see if we need to kick any internal source or mirror consumers.
699✔
2283
        // This will be a no-op if JetStream not enabled for this server or if the bound account
699✔
2284
        // does not have jetstream.
699✔
2285
        s.checkInternalSyncConsumers(acc)
699✔
2286

699✔
2287
        return nil
699✔
2288
}
2289

2290
// checkInternalSyncConsumers
2291
func (s *Server) checkInternalSyncConsumers(acc *Account) {
2,146✔
2292
        // Grab our js
2,146✔
2293
        js := s.getJetStream()
2,146✔
2294

2,146✔
2295
        // Only applicable if we have JS and the leafnode has JS as well.
2,146✔
2296
        // We check for remote JS outside.
2,146✔
2297
        if !js.isEnabled() || acc == nil {
3,392✔
2298
                return
1,246✔
2299
        }
1,246✔
2300

2301
        // We will check all streams in our local account. They must be a leader and
2302
        // be sourcing or mirroring. We will check the external config on the stream itself
2303
        // if this is cross domain, or if the remote domain is empty, meaning we might be
2304
        // extending the system across this leafnode connection and hence we would be extending
2305
        // our own domain.
2306
        jsa := js.lookupAccount(acc)
900✔
2307
        if jsa == nil {
1,248✔
2308
                return
348✔
2309
        }
348✔
2310

2311
        var streams []*stream
552✔
2312
        jsa.mu.RLock()
552✔
2313
        for _, mset := range jsa.streams {
607✔
2314
                mset.cfgMu.RLock()
55✔
2315
                // We need to have a mirror or source defined.
55✔
2316
                // We do not want to force another lock here to look for leader status,
55✔
2317
                // so collect and after we release jsa will make sure.
55✔
2318
                if mset.cfg.Mirror != nil || len(mset.cfg.Sources) > 0 {
67✔
2319
                        streams = append(streams, mset)
12✔
2320
                }
12✔
2321
                mset.cfgMu.RUnlock()
55✔
2322
        }
2323
        jsa.mu.RUnlock()
552✔
2324

552✔
2325
        // Now loop through all candidates and check if we are the leader and have NOT
552✔
2326
        // created the sync up consumer.
552✔
2327
        for _, mset := range streams {
564✔
2328
                mset.retryDisconnectedSyncConsumers()
12✔
2329
        }
12✔
2330
}
2331

2332
// Returns the remote cluster name. This is set only once so does not require a lock.
2333
func (c *client) remoteCluster() string {
173,612✔
2334
        if c.leaf == nil {
173,612✔
2335
                return _EMPTY_
×
2336
        }
×
2337
        return c.leaf.remoteCluster
173,612✔
2338
}
2339

2340
// Sends back an info block to the soliciting leafnode to let it know about
2341
// its permission settings for local enforcement.
2342
func (s *Server) sendPermsAndAccountInfo(c *client) {
700✔
2343
        // Copy
700✔
2344
        s.mu.Lock()
700✔
2345
        info := s.copyLeafNodeInfo()
700✔
2346
        s.mu.Unlock()
700✔
2347
        c.mu.Lock()
700✔
2348
        info.CID = c.cid
700✔
2349
        info.Import = c.opts.Import
700✔
2350
        info.Export = c.opts.Export
700✔
2351
        info.RemoteAccount = c.acc.Name
700✔
2352
        // s.SystemAccount() uses an atomic operation and does not get the server lock, so this is safe.
700✔
2353
        info.IsSystemAccount = c.acc == s.SystemAccount()
700✔
2354
        info.ConnectInfo = true
700✔
2355
        c.enqueueProto(generateInfoJSON(info))
700✔
2356
        c.mu.Unlock()
700✔
2357
}
700✔
2358

2359
// Snapshot the current subscriptions from the sublist into our smap which
2360
// we will keep updated from now on.
2361
// Also send the registered subscriptions.
2362
func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
1,352✔
2363
        acc := c.acc
1,352✔
2364
        if acc == nil {
1,352✔
2365
                c.Debugf("Leafnode does not have an account bound")
×
2366
                return
×
2367
        }
×
2368
        // Collect all account subs here.
2369
        _subs := [1024]*subscription{}
1,352✔
2370
        subs := _subs[:0]
1,352✔
2371
        ims := []string{}
1,352✔
2372

1,352✔
2373
        // Hold the client lock otherwise there can be a race and miss some subs.
1,352✔
2374
        c.mu.Lock()
1,352✔
2375
        defer c.mu.Unlock()
1,352✔
2376

1,352✔
2377
        acc.mu.RLock()
1,352✔
2378
        accName := acc.Name
1,352✔
2379
        accNTag := acc.nameTag
1,352✔
2380

1,352✔
2381
        // To make printing look better when no friendly name present.
1,352✔
2382
        if accNTag != _EMPTY_ {
1,364✔
2383
                accNTag = "/" + accNTag
12✔
2384
        }
12✔
2385

2386
        // If we are solicited we only send interest for local clients.
2387
        if c.isSpokeLeafNode() {
2,005✔
2388
                acc.sl.localSubs(&subs, true)
653✔
2389
        } else {
1,352✔
2390
                acc.sl.All(&subs)
699✔
2391
        }
699✔
2392

2393
        // Check if we have an existing service import reply.
2394
        siReply := copyBytes(acc.siReply)
1,352✔
2395

1,352✔
2396
        // Since leaf nodes only send on interest, if the bound
1,352✔
2397
        // account has import services we need to send those over.
1,352✔
2398
        for isubj := range acc.imports.services {
6,403✔
2399
                if c.isSpokeLeafNode() && !c.canSubscribe(isubj) {
5,351✔
2400
                        c.Debugf("Not permitted to import service %q on behalf of %s%s", isubj, accName, accNTag)
300✔
2401
                        continue
300✔
2402
                }
2403
                ims = append(ims, isubj)
4,751✔
2404
        }
2405
        // Likewise for mappings.
2406
        for _, m := range acc.mappings {
3,760✔
2407
                if c.isSpokeLeafNode() && !c.canSubscribe(m.src) {
2,454✔
2408
                        c.Debugf("Not permitted to import mapping %q on behalf of %s%s", m.src, accName, accNTag)
46✔
2409
                        continue
46✔
2410
                }
2411
                ims = append(ims, m.src)
2,362✔
2412
        }
2413

2414
        // Create a unique subject that will be used for loop detection.
2415
        lds := acc.lds
1,352✔
2416
        acc.mu.RUnlock()
1,352✔
2417

1,352✔
2418
        // Check if we have to create the LDS.
1,352✔
2419
        if lds == _EMPTY_ {
2,421✔
2420
                lds = leafNodeLoopDetectionSubjectPrefix + nuid.Next()
1,069✔
2421
                acc.mu.Lock()
1,069✔
2422
                acc.lds = lds
1,069✔
2423
                acc.mu.Unlock()
1,069✔
2424
        }
1,069✔
2425

2426
        // Now check for gateway interest. Leafnodes will put this into
2427
        // the proper mode to propagate, but they are not held in the account.
2428
        gwsa := [16]*client{}
1,352✔
2429
        gws := gwsa[:0]
1,352✔
2430
        s.getOutboundGatewayConnections(&gws)
1,352✔
2431
        for _, cgw := range gws {
1,435✔
2432
                cgw.mu.Lock()
83✔
2433
                gw := cgw.gw
83✔
2434
                cgw.mu.Unlock()
83✔
2435
                if gw != nil {
166✔
2436
                        if ei, _ := gw.outsim.Load(accName); ei != nil {
166✔
2437
                                if e := ei.(*outsie); e != nil && e.sl != nil {
166✔
2438
                                        e.sl.All(&subs)
83✔
2439
                                }
83✔
2440
                        }
2441
                }
2442
        }
2443

2444
        applyGlobalRouting := s.gateway.enabled
1,352✔
2445
        if c.isSpokeLeafNode() {
2,005✔
2446
                // Add a fake subscription for this solicited leafnode connection
653✔
2447
                // so that we can send back directly for mapped GW replies.
653✔
2448
                // We need to keep track of this subscription so it can be removed
653✔
2449
                // when the connection is closed so that the GC can release it.
653✔
2450
                c.leaf.gwSub = &subscription{client: c, subject: []byte(gwReplyPrefix + ">")}
653✔
2451
                c.srv.gwLeafSubs.Insert(c.leaf.gwSub)
653✔
2452
        }
653✔
2453

2454
        // Now walk the results and add them to our smap
2455
        rc := c.leaf.remoteCluster
1,352✔
2456
        c.leaf.smap = make(map[string]int32)
1,352✔
2457
        for _, sub := range subs {
39,707✔
2458
                // Check perms regardless of role.
38,355✔
2459
                if c.perms != nil && !c.canSubscribe(string(sub.subject)) {
40,749✔
2460
                        c.Debugf("Not permitted to subscribe to %q on behalf of %s%s", sub.subject, accName, accNTag)
2,394✔
2461
                        continue
2,394✔
2462
                }
2463
                // Don't advertise interest from leafnodes to other isolated leafnodes.
2464
                if sub.client.kind == LEAF && c.isIsolatedLeafNode() {
35,976✔
2465
                        continue
15✔
2466
                }
2467
                // We ignore ourselves here.
2468
                // Also don't add the subscription if it has a origin cluster and the
2469
                // cluster name matches the one of the client we are sending to.
2470
                if c != sub.client && (sub.origin == nil || (bytesToString(sub.origin) != rc)) {
66,450✔
2471
                        count := int32(1)
30,504✔
2472
                        if len(sub.queue) > 0 && sub.qw > 0 {
30,515✔
2473
                                count = sub.qw
11✔
2474
                        }
11✔
2475
                        c.leaf.smap[keyFromSub(sub)] += count
30,504✔
2476
                        if c.leaf.tsub == nil {
31,772✔
2477
                                c.leaf.tsub = make(map[*subscription]struct{})
1,268✔
2478
                        }
1,268✔
2479
                        c.leaf.tsub[sub] = struct{}{}
30,504✔
2480
                }
2481
        }
2482
        // FIXME(dlc) - We need to update appropriately on an account claims update.
2483
        for _, isubj := range ims {
8,465✔
2484
                c.leaf.smap[isubj]++
7,113✔
2485
        }
7,113✔
2486
        // If we have gateways enabled we need to make sure the other side sends us responses
2487
        // that have been augmented from the original subscription.
2488
        // TODO(dlc) - Should we lock this down more?
2489
        if applyGlobalRouting {
1,456✔
2490
                c.leaf.smap[oldGWReplyPrefix+"*.>"]++
104✔
2491
                c.leaf.smap[gwReplyPrefix+">"]++
104✔
2492
        }
104✔
2493
        // Detect loops by subscribing to a specific subject and checking
2494
        // if this sub is coming back to us.
2495
        c.leaf.smap[lds]++
1,352✔
2496

1,352✔
2497
        // Check if we need to add an existing siReply to our map.
1,352✔
2498
        // This will be a prefix so add on the wildcard.
1,352✔
2499
        if siReply != nil {
1,370✔
2500
                wcsub := append(siReply, '>')
18✔
2501
                c.leaf.smap[string(wcsub)]++
18✔
2502
        }
18✔
2503
        // Queue all protocols. There is no max pending limit for LN connection,
2504
        // so we don't need chunking. The writes will happen from the writeLoop.
2505
        var b bytes.Buffer
1,352✔
2506
        for key, n := range c.leaf.smap {
28,474✔
2507
                c.writeLeafSub(&b, key, n)
27,122✔
2508
        }
27,122✔
2509
        if b.Len() > 0 {
2,704✔
2510
                c.enqueueProto(b.Bytes())
1,352✔
2511
        }
1,352✔
2512
        if c.leaf.tsub != nil {
2,621✔
2513
                // Clear the tsub map after 5 seconds.
1,269✔
2514
                c.leaf.tsubt = time.AfterFunc(5*time.Second, func() {
1,307✔
2515
                        c.mu.Lock()
38✔
2516
                        if c.leaf != nil {
76✔
2517
                                c.leaf.tsub = nil
38✔
2518
                                c.leaf.tsubt = nil
38✔
2519
                        }
38✔
2520
                        c.mu.Unlock()
38✔
2521
                })
2522
        }
2523
}
2524

2525
// updateInterestForAccountOnGateway called from gateway code when processing RS+ and RS-.
2526
func (s *Server) updateInterestForAccountOnGateway(accName string, sub *subscription, delta int32) {
208,251✔
2527
        // Since we're in the gateway's readLoop, and we would otherwise block, don't allow fetching.
208,251✔
2528
        acc, err := s.lookupOrFetchAccount(accName, false)
208,251✔
2529
        if acc == nil || err != nil {
208,491✔
2530
                s.Debugf("No or bad account for %q, failed to update interest from gateway", accName)
240✔
2531
                return
240✔
2532
        }
240✔
2533
        acc.updateLeafNodes(sub, delta)
208,011✔
2534
}
2535

2536
// updateLeafNodesEx will make sure to update the account smap for the subscription.
2537
// Will also forward to all leaf nodes as needed.
2538
// If `hubOnly` is true, then will update only leaf nodes that connect to this server
2539
// (that is, for which this server acts as a hub to them).
2540
func (acc *Account) updateLeafNodesEx(sub *subscription, delta int32, hubOnly bool) {
2,590,434✔
2541
        if acc == nil || sub == nil {
2,590,434✔
2542
                return
×
2543
        }
×
2544

2545
        // We will do checks for no leafnodes and same cluster here inline and under the
2546
        // general account read lock.
2547
        // If we feel we need to update the leafnodes we will do that out of line to avoid
2548
        // blocking routes or GWs.
2549

2550
        acc.mu.RLock()
2,590,434✔
2551
        // First check if we even have leafnodes here.
2,590,434✔
2552
        if acc.nleafs == 0 {
5,110,984✔
2553
                acc.mu.RUnlock()
2,520,550✔
2554
                return
2,520,550✔
2555
        }
2,520,550✔
2556

2557
        // Is this a loop detection subject.
2558
        isLDS := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))
69,884✔
2559

69,884✔
2560
        // Capture the cluster even if its empty.
69,884✔
2561
        var cluster string
69,884✔
2562
        if sub.origin != nil {
119,733✔
2563
                cluster = bytesToString(sub.origin)
49,849✔
2564
        }
49,849✔
2565

2566
        // If we have an isolated cluster we can return early, as long as it is not a loop detection subject.
2567
        // Empty clusters will return false for the check.
2568
        if !isLDS && acc.isLeafNodeClusterIsolated(cluster) {
91,047✔
2569
                acc.mu.RUnlock()
21,163✔
2570
                return
21,163✔
2571
        }
21,163✔
2572

2573
        // We can release the general account lock.
2574
        acc.mu.RUnlock()
48,721✔
2575

48,721✔
2576
        // We can hold the list lock here to avoid having to copy a large slice.
48,721✔
2577
        acc.lmu.RLock()
48,721✔
2578
        defer acc.lmu.RUnlock()
48,721✔
2579

48,721✔
2580
        // Do this once.
48,721✔
2581
        subject := string(sub.subject)
48,721✔
2582

48,721✔
2583
        // Walk the connected leafnodes.
48,721✔
2584
        for _, ln := range acc.lleafs {
108,999✔
2585
                if ln == sub.client {
91,705✔
2586
                        continue
31,427✔
2587
                }
2588
                ln.mu.Lock()
28,851✔
2589
                // Don't advertise interest from leafnodes to other isolated leafnodes.
28,851✔
2590
                if sub.client.kind == LEAF && ln.isIsolatedLeafNode() {
28,882✔
2591
                        ln.mu.Unlock()
31✔
2592
                        continue
31✔
2593
                }
2594
                // If `hubOnly` is true, it means that we want to update only leafnodes
2595
                // that connect to this server (so isHubLeafNode() would return `true`).
2596
                if hubOnly && !ln.isHubLeafNode() {
28,826✔
2597
                        ln.mu.Unlock()
6✔
2598
                        continue
6✔
2599
                }
2600
                // Check to make sure this sub does not have an origin cluster that matches the leafnode.
2601
                // If skipped, make sure that we still let go the "$LDS." subscription that allows
2602
                // the detection of loops as long as different cluster.
2603
                clusterDifferent := cluster != ln.remoteCluster()
28,814✔
2604
                if (isLDS && clusterDifferent) || ((cluster == _EMPTY_ || clusterDifferent) && (delta <= 0 || ln.canSubscribe(subject))) {
52,900✔
2605
                        ln.updateSmap(sub, delta, isLDS)
24,086✔
2606
                }
24,086✔
2607
                ln.mu.Unlock()
28,814✔
2608
        }
2609
}
2610

2611
// updateLeafNodes will make sure to update the account smap for the subscription.
2612
// Will also forward to all leaf nodes as needed.
2613
func (acc *Account) updateLeafNodes(sub *subscription, delta int32) {
2,590,411✔
2614
        acc.updateLeafNodesEx(sub, delta, false)
2,590,411✔
2615
}
2,590,411✔
2616

2617
// This will make an update to our internal smap and determine if we should send out
2618
// an interest update to the remote side.
2619
// Lock should be held.
2620
func (c *client) updateSmap(sub *subscription, delta int32, isLDS bool) {
24,086✔
2621
        if c.leaf.smap == nil {
24,111✔
2622
                return
25✔
2623
        }
25✔
2624

2625
        // If we are solicited make sure this is a local client or a non-solicited leaf node
2626
        skind := sub.client.kind
24,061✔
2627
        updateClient := skind == CLIENT || skind == SYSTEM || skind == JETSTREAM || skind == ACCOUNT
24,061✔
2628
        if !isLDS && c.isSpokeLeafNode() && !(updateClient || (skind == LEAF && !sub.client.isSpokeLeafNode())) {
32,433✔
2629
                return
8,372✔
2630
        }
8,372✔
2631

2632
        // For additions, check if that sub has just been processed during initLeafNodeSmapAndSendSubs
2633
        if delta > 0 && c.leaf.tsub != nil {
23,324✔
2634
                if _, present := c.leaf.tsub[sub]; present {
7,640✔
2635
                        delete(c.leaf.tsub, sub)
5✔
2636
                        if len(c.leaf.tsub) == 0 {
5✔
2637
                                c.leaf.tsub = nil
×
2638
                                c.leaf.tsubt.Stop()
×
2639
                                c.leaf.tsubt = nil
×
2640
                        }
×
2641
                        return
5✔
2642
                }
2643
        }
2644

2645
        key := keyFromSub(sub)
15,684✔
2646
        n, ok := c.leaf.smap[key]
15,684✔
2647
        if delta < 0 && !ok {
16,772✔
2648
                return
1,088✔
2649
        }
1,088✔
2650

2651
        // We will update if its a queue, if count is zero (or negative), or we were 0 and are N > 0.
2652
        update := sub.queue != nil || (n <= 0 && n+delta > 0) || (n > 0 && n+delta <= 0)
14,596✔
2653
        n += delta
14,596✔
2654
        if n > 0 {
25,436✔
2655
                c.leaf.smap[key] = n
10,840✔
2656
        } else {
14,596✔
2657
                delete(c.leaf.smap, key)
3,756✔
2658
        }
3,756✔
2659
        if update {
24,756✔
2660
                c.sendLeafNodeSubUpdate(key, n)
10,160✔
2661
        }
10,160✔
2662
}
2663

2664
// Used to force add subjects to the subject map.
2665
func (c *client) forceAddToSmap(subj string) {
13✔
2666
        c.mu.Lock()
13✔
2667
        defer c.mu.Unlock()
13✔
2668

13✔
2669
        if c.leaf.smap == nil {
13✔
2670
                return
×
2671
        }
×
2672
        n := c.leaf.smap[subj]
13✔
2673
        if n != 0 {
14✔
2674
                return
1✔
2675
        }
1✔
2676
        // Place into the map since it was not there.
2677
        c.leaf.smap[subj] = 1
12✔
2678
        c.sendLeafNodeSubUpdate(subj, 1)
12✔
2679
}
2680

2681
// Used to force remove a subject from the subject map.
2682
func (c *client) forceRemoveFromSmap(subj string) {
1✔
2683
        c.mu.Lock()
1✔
2684
        defer c.mu.Unlock()
1✔
2685

1✔
2686
        if c.leaf.smap == nil {
1✔
2687
                return
×
2688
        }
×
2689
        n := c.leaf.smap[subj]
1✔
2690
        if n == 0 {
1✔
2691
                return
×
2692
        }
×
2693
        n--
1✔
2694
        if n == 0 {
2✔
2695
                // Remove is now zero
1✔
2696
                delete(c.leaf.smap, subj)
1✔
2697
                c.sendLeafNodeSubUpdate(subj, 0)
1✔
2698
        } else {
1✔
2699
                c.leaf.smap[subj] = n
×
2700
        }
×
2701
}
2702

2703
// Send the subscription interest change to the other side.
2704
// Lock should be held.
2705
func (c *client) sendLeafNodeSubUpdate(key string, n int32) {
10,173✔
2706
        // If we are a spoke, we need to check if we are allowed to send this subscription over to the hub.
10,173✔
2707
        if c.isSpokeLeafNode() {
12,715✔
2708
                checkPerms := true
2,542✔
2709
                if len(key) > 0 && (key[0] == '$' || key[0] == '_') {
4,090✔
2710
                        if strings.HasPrefix(key, leafNodeLoopDetectionSubjectPrefix) ||
1,548✔
2711
                                strings.HasPrefix(key, oldGWReplyPrefix) ||
1,548✔
2712
                                strings.HasPrefix(key, gwReplyPrefix) {
1,644✔
2713
                                checkPerms = false
96✔
2714
                        }
96✔
2715
                }
2716
                if checkPerms {
4,988✔
2717
                        var subject string
2,446✔
2718
                        if sep := strings.IndexByte(key, ' '); sep != -1 {
2,940✔
2719
                                subject = key[:sep]
494✔
2720
                        } else {
2,446✔
2721
                                subject = key
1,952✔
2722
                        }
1,952✔
2723
                        if !c.canSubscribe(subject) {
2,455✔
2724
                                return
9✔
2725
                        }
9✔
2726
                }
2727
        }
2728
        // If we are here we can send over to the other side.
2729
        _b := [64]byte{}
10,164✔
2730
        b := bytes.NewBuffer(_b[:0])
10,164✔
2731
        c.writeLeafSub(b, key, n)
10,164✔
2732
        c.enqueueProto(b.Bytes())
10,164✔
2733
}
2734

2735
// Helper function to build the key.
2736
func keyFromSub(sub *subscription) string {
47,225✔
2737
        var sb strings.Builder
47,225✔
2738
        sb.Grow(len(sub.subject) + len(sub.queue) + 1)
47,225✔
2739
        sb.Write(sub.subject)
47,225✔
2740
        if sub.queue != nil {
51,071✔
2741
                // Just make the key subject spc group, e.g. 'foo bar'
3,846✔
2742
                sb.WriteByte(' ')
3,846✔
2743
                sb.Write(sub.queue)
3,846✔
2744
        }
3,846✔
2745
        return sb.String()
47,225✔
2746
}
2747

2748
// Prefixes used in the keys built by keyFromSubWithOrigin. The byte forms are
// written as the first character of the key: 'R' for a plain routed
// subscription and 'L' for a routed subscription made on behalf of a leafnode
// (one that carries an origin).
// NOTE(review): the string forms are presumably used elsewhere in the package
// to build or match the same prefixes - confirm against their callers.
const (
2749
        keyRoutedSub         = "R"
2750
        keyRoutedSubByte     = 'R'
2751
        keyRoutedLeafSub     = "L"
2752
        keyRoutedLeafSubByte = 'L'
2753
)
2754

2755
// Helper function to build the key that prevents collisions between normal
2756
// routed subscriptions and routed subscriptions on behalf of a leafnode.
2757
// Keys will look like this:
2758
// "R foo"          -> plain routed sub on "foo"
2759
// "R foo bar"      -> queue routed sub on "foo", queue "bar"
2760
// "L foo bar"      -> plain routed leaf sub on "foo", leaf "bar"
2761
// "L foo bar baz"  -> queue routed sub on "foo", queue "bar", leaf "baz"
2762
func keyFromSubWithOrigin(sub *subscription) string {
721,701✔
2763
        var sb strings.Builder
721,701✔
2764
        sb.Grow(2 + len(sub.origin) + 1 + len(sub.subject) + 1 + len(sub.queue))
721,701✔
2765
        leaf := len(sub.origin) > 0
721,701✔
2766
        if leaf {
738,277✔
2767
                sb.WriteByte(keyRoutedLeafSubByte)
16,576✔
2768
        } else {
721,701✔
2769
                sb.WriteByte(keyRoutedSubByte)
705,125✔
2770
        }
705,125✔
2771
        sb.WriteByte(' ')
721,701✔
2772
        sb.Write(sub.subject)
721,701✔
2773
        if sub.queue != nil {
751,073✔
2774
                sb.WriteByte(' ')
29,372✔
2775
                sb.Write(sub.queue)
29,372✔
2776
        }
29,372✔
2777
        if leaf {
738,277✔
2778
                sb.WriteByte(' ')
16,576✔
2779
                sb.Write(sub.origin)
16,576✔
2780
        }
16,576✔
2781
        return sb.String()
721,701✔
2782
}
2783

2784
// Lock should be held.
2785
func (c *client) writeLeafSub(w *bytes.Buffer, key string, n int32) {
37,286✔
2786
        if key == _EMPTY_ {
37,286✔
2787
                return
×
2788
        }
×
2789
        if n > 0 {
70,815✔
2790
                w.WriteString("LS+ " + key)
33,529✔
2791
                // Check for queue semantics, if found write n.
33,529✔
2792
                if strings.Contains(key, " ") {
35,858✔
2793
                        w.WriteString(" ")
2,329✔
2794
                        var b [12]byte
2,329✔
2795
                        var i = len(b)
2,329✔
2796
                        for l := n; l > 0; l /= 10 {
5,576✔
2797
                                i--
3,247✔
2798
                                b[i] = digits[l%10]
3,247✔
2799
                        }
3,247✔
2800
                        w.Write(b[i:])
2,329✔
2801
                        if c.trace {
2,329✔
2802
                                arg := fmt.Sprintf("%s %d", key, n)
×
2803
                                c.traceOutOp("LS+", []byte(arg))
×
2804
                        }
×
2805
                } else if c.trace {
31,403✔
2806
                        c.traceOutOp("LS+", []byte(key))
203✔
2807
                }
203✔
2808
        } else {
3,757✔
2809
                w.WriteString("LS- " + key)
3,757✔
2810
                if c.trace {
3,771✔
2811
                        c.traceOutOp("LS-", []byte(key))
14✔
2812
                }
14✔
2813
        }
2814
        w.WriteString(CR_LF)
37,286✔
2815
}
2816

2817
// processLeafSub will process an inbound sub request for the remote leaf node.
2818
func (c *client) processLeafSub(argo []byte) (err error) {
33,198✔
2819
        // Indicate activity.
33,198✔
2820
        c.in.subs++
33,198✔
2821

33,198✔
2822
        srv := c.srv
33,198✔
2823
        if srv == nil {
33,198✔
2824
                return nil
×
2825
        }
×
2826

2827
        // Copy so we do not reference a potentially large buffer
2828
        arg := make([]byte, len(argo))
33,198✔
2829
        copy(arg, argo)
33,198✔
2830

33,198✔
2831
        args := splitArg(arg)
33,198✔
2832
        sub := &subscription{client: c}
33,198✔
2833

33,198✔
2834
        delta := int32(1)
33,198✔
2835
        switch len(args) {
33,198✔
2836
        case 1:
30,927✔
2837
                sub.queue = nil
30,927✔
2838
        case 3:
2,271✔
2839
                sub.queue = args[1]
2,271✔
2840
                sub.qw = int32(parseSize(args[2]))
2,271✔
2841
                // TODO: (ik) We should have a non empty queue name and a queue
2,271✔
2842
                // weight >= 1. For 2.11, we may want to return an error if that
2,271✔
2843
                // is not the case, but for now just overwrite `delta` if queue
2,271✔
2844
                // weight is greater than 1 (it is possible after a reconnect/
2,271✔
2845
                // server restart to receive a queue weight > 1 for a new sub).
2,271✔
2846
                if sub.qw > 1 {
3,937✔
2847
                        delta = sub.qw
1,666✔
2848
                }
1,666✔
2849
        default:
×
2850
                return fmt.Errorf("processLeafSub Parse Error: '%s'", arg)
×
2851
        }
2852
        sub.subject = args[0]
33,198✔
2853

33,198✔
2854
        c.mu.Lock()
33,198✔
2855
        if c.isClosed() {
33,228✔
2856
                c.mu.Unlock()
30✔
2857
                return nil
30✔
2858
        }
30✔
2859

2860
        acc := c.acc
33,168✔
2861
        // Guard against LS+ arriving before CONNECT has been processed, which
33,168✔
2862
        // can happen when compression is enabled.
33,168✔
2863
        if acc == nil {
33,168✔
2864
                c.mu.Unlock()
×
2865
                c.sendErr("Authorization Violation")
×
2866
                c.closeConnection(ProtocolViolation)
×
2867
                return nil
×
2868
        }
×
2869
        // Check if we have a loop.
2870
        ldsPrefix := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))
33,168✔
2871

33,168✔
2872
        if ldsPrefix && bytesToString(sub.subject) == acc.getLDSubject() {
33,174✔
2873
                c.mu.Unlock()
6✔
2874
                c.handleLeafNodeLoop(true)
6✔
2875
                return nil
6✔
2876
        }
6✔
2877

2878
        // Check permissions if applicable. (but exclude the $LDS, $GR and _GR_)
2879
        checkPerms := true
33,162✔
2880
        if sub.subject[0] == '$' || sub.subject[0] == '_' {
63,346✔
2881
                if ldsPrefix ||
30,184✔
2882
                        bytes.HasPrefix(sub.subject, []byte(oldGWReplyPrefix)) ||
30,184✔
2883
                        bytes.HasPrefix(sub.subject, []byte(gwReplyPrefix)) {
32,246✔
2884
                        checkPerms = false
2,062✔
2885
                }
2,062✔
2886
        }
2887

2888
        // If we are a hub check that we can publish to this subject.
2889
        if checkPerms {
64,262✔
2890
                subj := string(sub.subject)
31,100✔
2891
                if subjectIsLiteral(subj) && !c.pubAllowedFullCheck(subj, true, true) {
31,436✔
2892
                        c.mu.Unlock()
336✔
2893
                        c.leafSubPermViolation(sub.subject)
336✔
2894
                        c.Debugf(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject))
336✔
2895
                        return nil
336✔
2896
                }
336✔
2897
        }
2898

2899
        // Check if we have a maximum on the number of subscriptions.
2900
        if c.subsAtLimit() {
32,834✔
2901
                c.mu.Unlock()
8✔
2902
                c.maxSubsExceeded()
8✔
2903
                return nil
8✔
2904
        }
8✔
2905

2906
        // If we have an origin cluster associated mark that in the sub.
2907
        if rc := c.remoteCluster(); rc != _EMPTY_ {
61,710✔
2908
                sub.origin = []byte(rc)
28,892✔
2909
        }
28,892✔
2910

2911
        // Like Routes, we store local subs by account and subject and optionally queue name.
2912
        // If we have a queue it will have a trailing weight which we do not want.
2913
        if sub.queue != nil {
34,790✔
2914
                sub.sid = arg[:len(arg)-len(args[2])-1]
1,972✔
2915
        } else {
32,818✔
2916
                sub.sid = arg
30,846✔
2917
        }
30,846✔
2918
        key := bytesToString(sub.sid)
32,818✔
2919
        osub := c.subs[key]
32,818✔
2920
        if osub == nil {
64,124✔
2921
                c.subs[key] = sub
31,306✔
2922
                // Now place into the account sl.
31,306✔
2923
                if err := acc.sl.Insert(sub); err != nil {
31,306✔
2924
                        delete(c.subs, key)
×
2925
                        c.mu.Unlock()
×
2926
                        c.Errorf("Could not insert subscription: %v", err)
×
2927
                        c.sendErr("Invalid Subscription")
×
2928
                        return nil
×
2929
                }
×
2930
        } else if sub.queue != nil {
3,023✔
2931
                // For a queue we need to update the weight.
1,511✔
2932
                delta = sub.qw - atomic.LoadInt32(&osub.qw)
1,511✔
2933
                atomic.StoreInt32(&osub.qw, sub.qw)
1,511✔
2934
                acc.sl.UpdateRemoteQSub(osub)
1,511✔
2935
        }
1,511✔
2936
        spoke := c.isSpokeLeafNode()
32,818✔
2937
        c.mu.Unlock()
32,818✔
2938

32,818✔
2939
        // Only add in shadow subs if a new sub or qsub.
32,818✔
2940
        if osub == nil {
64,124✔
2941
                if err := c.addShadowSubscriptions(acc, sub); err != nil {
31,306✔
2942
                        c.Errorf(err.Error())
×
2943
                }
×
2944
        }
2945

2946
        // If we are not solicited, treat leaf node subscriptions similar to a
2947
        // client subscription, meaning we forward them to routes, gateways and
2948
        // other leaf nodes as needed.
2949
        if !spoke {
44,410✔
2950
                // If we are routing add to the route map for the associated account.
11,592✔
2951
                srv.updateRouteSubscriptionMap(acc, sub, delta)
11,592✔
2952
                if srv.gateway.enabled {
13,127✔
2953
                        srv.gatewayUpdateSubInterest(acc.Name, sub, delta)
1,535✔
2954
                }
1,535✔
2955
        }
2956
        // Now check on leafnode updates for other leaf nodes. We understand solicited
2957
        // and non-solicited state in this call so we will do the right thing.
2958
        acc.updateLeafNodes(sub, delta)
32,818✔
2959

32,818✔
2960
        return nil
32,818✔
2961
}
2962

2963
// If the leafnode is a solicited, set the connect delay based on default
2964
// or private option (for tests). Sends the error to the other side, log and
2965
// close the connection.
2966
func (c *client) handleLeafNodeLoop(sendErr bool) {
16✔
2967
        accName, delay := c.setLeafConnectDelayIfSoliciting(leafNodeReconnectDelayAfterLoopDetected)
16✔
2968
        errTxt := fmt.Sprintf("Loop detected for leafnode account=%q. Delaying attempt to reconnect for %v", accName, delay)
16✔
2969
        if sendErr {
24✔
2970
                c.sendErr(errTxt)
8✔
2971
        }
8✔
2972

2973
        c.Errorf(errTxt)
16✔
2974
        // If we are here with "sendErr" false, it means that this is the server
16✔
2975
        // that received the error. The other side will have closed the connection,
16✔
2976
        // but does not hurt to close here too.
16✔
2977
        c.closeConnection(ProtocolViolation)
16✔
2978
}
2979

2980
// processLeafUnsub will process an inbound unsub request for the remote leaf node.
2981
func (c *client) processLeafUnsub(arg []byte) error {
3,405✔
2982
        // Indicate any activity, so pub and sub or unsubs.
3,405✔
2983
        c.in.subs++
3,405✔
2984

3,405✔
2985
        srv := c.srv
3,405✔
2986

3,405✔
2987
        c.mu.Lock()
3,405✔
2988
        if c.isClosed() {
3,443✔
2989
                c.mu.Unlock()
38✔
2990
                return nil
38✔
2991
        }
38✔
2992

2993
        acc := c.acc
3,367✔
2994
        // Guard against LS- arriving before CONNECT has been processed.
3,367✔
2995
        if acc == nil {
3,367✔
2996
                c.mu.Unlock()
×
2997
                c.sendErr("Authorization Violation")
×
2998
                c.closeConnection(ProtocolViolation)
×
2999
                return nil
×
3000
        }
×
3001

3002
        spoke := c.isSpokeLeafNode()
3,367✔
3003
        // We store local subs by account and subject and optionally queue name.
3,367✔
3004
        // LS- will have the arg exactly as the key.
3,367✔
3005
        sub, ok := c.subs[string(arg)]
3,367✔
3006
        if !ok {
3,380✔
3007
                // If not found, don't try to update routes/gws/leaf nodes.
13✔
3008
                c.mu.Unlock()
13✔
3009
                return nil
13✔
3010
        }
13✔
3011
        delta := int32(1)
3,354✔
3012
        if len(sub.queue) > 0 {
3,775✔
3013
                delta = sub.qw
421✔
3014
        }
421✔
3015
        c.mu.Unlock()
3,354✔
3016

3,354✔
3017
        c.unsubscribe(acc, sub, true, true)
3,354✔
3018
        if !spoke {
4,404✔
3019
                // If we are routing subtract from the route map for the associated account.
1,050✔
3020
                srv.updateRouteSubscriptionMap(acc, sub, -delta)
1,050✔
3021
                // Gateways
1,050✔
3022
                if srv.gateway.enabled {
1,328✔
3023
                        srv.gatewayUpdateSubInterest(acc.Name, sub, -delta)
278✔
3024
                }
278✔
3025
        }
3026
        // Now check on leafnode updates for other leaf nodes.
3027
        acc.updateLeafNodes(sub, -delta)
3,354✔
3028
        return nil
3,354✔
3029
}
3030

3031
func (c *client) processLeafHeaderMsgArgs(arg []byte) error {
564✔
3032
        // Unroll splitArgs to avoid runtime/heap issues
564✔
3033
        args := c.argsa[:0]
564✔
3034
        start := -1
564✔
3035
        for i, b := range arg {
37,358✔
3036
                switch b {
36,794✔
3037
                case ' ', '\t', '\r', '\n':
1,624✔
3038
                        if start >= 0 {
3,248✔
3039
                                args = append(args, arg[start:i])
1,624✔
3040
                                start = -1
1,624✔
3041
                        }
1,624✔
3042
                default:
35,170✔
3043
                        if start < 0 {
37,358✔
3044
                                start = i
2,188✔
3045
                        }
2,188✔
3046
                }
3047
        }
3048
        if start >= 0 {
1,128✔
3049
                args = append(args, arg[start:])
564✔
3050
        }
564✔
3051

3052
        c.pa.arg = arg
564✔
3053
        switch len(args) {
564✔
3054
        case 0, 1, 2:
×
3055
                return fmt.Errorf("processLeafHeaderMsgArgs Parse Error: '%s'", args)
×
3056
        case 3:
86✔
3057
                c.pa.reply = nil
86✔
3058
                c.pa.queues = nil
86✔
3059
                c.pa.hdb = args[1]
86✔
3060
                c.pa.hdr = parseSize(args[1])
86✔
3061
                c.pa.szb = args[2]
86✔
3062
                c.pa.size = parseSize(args[2])
86✔
3063
        case 4:
464✔
3064
                c.pa.reply = args[1]
464✔
3065
                c.pa.queues = nil
464✔
3066
                c.pa.hdb = args[2]
464✔
3067
                c.pa.hdr = parseSize(args[2])
464✔
3068
                c.pa.szb = args[3]
464✔
3069
                c.pa.size = parseSize(args[3])
464✔
3070
        default:
14✔
3071
                // args[1] is our reply indicator. Should be + or | normally.
14✔
3072
                if len(args[1]) != 1 {
14✔
3073
                        return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
3074
                }
×
3075
                switch args[1][0] {
14✔
3076
                case '+':
4✔
3077
                        c.pa.reply = args[2]
4✔
3078
                case '|':
10✔
3079
                        c.pa.reply = nil
10✔
3080
                default:
×
3081
                        return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
3082
                }
3083
                // Grab header size.
3084
                c.pa.hdb = args[len(args)-2]
14✔
3085
                c.pa.hdr = parseSize(c.pa.hdb)
14✔
3086

14✔
3087
                // Grab size.
14✔
3088
                c.pa.szb = args[len(args)-1]
14✔
3089
                c.pa.size = parseSize(c.pa.szb)
14✔
3090

14✔
3091
                // Grab queue names.
14✔
3092
                if c.pa.reply != nil {
18✔
3093
                        c.pa.queues = args[3 : len(args)-2]
4✔
3094
                } else {
14✔
3095
                        c.pa.queues = args[2 : len(args)-2]
10✔
3096
                }
10✔
3097
        }
3098
        if c.pa.hdr < 0 {
564✔
3099
                return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Header Size: '%s'", arg)
×
3100
        }
×
3101
        if c.pa.size < 0 {
564✔
3102
                return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Size: '%s'", args)
×
3103
        }
×
3104
        if c.pa.hdr > c.pa.size {
564✔
3105
                return fmt.Errorf("processLeafHeaderMsgArgs Header Size larger then TotalSize: '%s'", arg)
×
3106
        }
×
3107
        maxPayload := atomic.LoadInt32(&c.mpay)
564✔
3108
        if maxPayload != jwt.NoLimit && int64(c.pa.size) > int64(maxPayload) {
565✔
3109
                c.maxPayloadViolation(c.pa.size, maxPayload)
1✔
3110
                return ErrMaxPayload
1✔
3111
        }
1✔
3112

3113
        // Common ones processed after check for arg length
3114
        c.pa.subject = args[0]
563✔
3115

563✔
3116
        return nil
563✔
3117
}
3118

3119
func (c *client) processLeafMsgArgs(arg []byte) error {
90,444✔
3120
        // Unroll splitArgs to avoid runtime/heap issues
90,444✔
3121
        args := c.argsa[:0]
90,444✔
3122
        start := -1
90,444✔
3123
        for i, b := range arg {
2,987,300✔
3124
                switch b {
2,896,856✔
3125
                case ' ', '\t', '\r', '\n':
142,160✔
3126
                        if start >= 0 {
284,320✔
3127
                                args = append(args, arg[start:i])
142,160✔
3128
                                start = -1
142,160✔
3129
                        }
142,160✔
3130
                default:
2,754,696✔
3131
                        if start < 0 {
2,987,300✔
3132
                                start = i
232,604✔
3133
                        }
232,604✔
3134
                }
3135
        }
3136
        if start >= 0 {
180,888✔
3137
                args = append(args, arg[start:])
90,444✔
3138
        }
90,444✔
3139

3140
        c.pa.arg = arg
90,444✔
3141
        switch len(args) {
90,444✔
3142
        case 0, 1:
×
3143
                return fmt.Errorf("processLeafMsgArgs Parse Error: '%s'", args)
×
3144
        case 2:
61,441✔
3145
                c.pa.reply = nil
61,441✔
3146
                c.pa.queues = nil
61,441✔
3147
                c.pa.szb = args[1]
61,441✔
3148
                c.pa.size = parseSize(args[1])
61,441✔
3149
        case 3:
6,449✔
3150
                c.pa.reply = args[1]
6,449✔
3151
                c.pa.queues = nil
6,449✔
3152
                c.pa.szb = args[2]
6,449✔
3153
                c.pa.size = parseSize(args[2])
6,449✔
3154
        default:
22,554✔
3155
                // args[1] is our reply indicator. Should be + or | normally.
22,554✔
3156
                if len(args[1]) != 1 {
22,554✔
3157
                        return fmt.Errorf("processLeafMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
3158
                }
×
3159
                switch args[1][0] {
22,554✔
3160
                case '+':
159✔
3161
                        c.pa.reply = args[2]
159✔
3162
                case '|':
22,395✔
3163
                        c.pa.reply = nil
22,395✔
3164
                default:
×
3165
                        return fmt.Errorf("processLeafMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
3166
                }
3167
                // Grab size.
3168
                c.pa.szb = args[len(args)-1]
22,554✔
3169
                c.pa.size = parseSize(c.pa.szb)
22,554✔
3170

22,554✔
3171
                // Grab queue names.
22,554✔
3172
                if c.pa.reply != nil {
22,713✔
3173
                        c.pa.queues = args[3 : len(args)-1]
159✔
3174
                } else {
22,554✔
3175
                        c.pa.queues = args[2 : len(args)-1]
22,395✔
3176
                }
22,395✔
3177
        }
3178
        if c.pa.size < 0 {
90,444✔
3179
                return fmt.Errorf("processLeafMsgArgs Bad or Missing Size: '%s'", args)
×
3180
        }
×
3181
        maxPayload := atomic.LoadInt32(&c.mpay)
90,444✔
3182
        if maxPayload != jwt.NoLimit && int64(c.pa.size) > int64(maxPayload) {
90,445✔
3183
                c.maxPayloadViolation(c.pa.size, maxPayload)
1✔
3184
                return ErrMaxPayload
1✔
3185
        }
1✔
3186

3187
        // Common ones processed after check for arg length
3188
        c.pa.subject = args[0]
90,443✔
3189

90,443✔
3190
        return nil
90,443✔
3191
}
3192

3193
// processInboundLeafMsg is called to process an inbound msg from a leaf node.
3194
func (c *client) processInboundLeafMsg(msg []byte) {
89,346✔
3195
        // Update statistics
89,346✔
3196
        // The msg includes the CR_LF, so pull back out for accounting.
89,346✔
3197
        c.in.msgs++
89,346✔
3198
        c.in.bytes += int32(len(msg) - LEN_CR_LF)
89,346✔
3199

89,346✔
3200
        srv, acc, subject := c.srv, c.acc, string(c.pa.subject)
89,346✔
3201

89,346✔
3202
        // Mostly under testing scenarios.
89,346✔
3203
        if srv == nil || acc == nil {
89,347✔
3204
                return
1✔
3205
        }
1✔
3206

3207
        // Check that leaf messages respect the subject permissions.
3208
        if c.perms != nil && !c.leafMsgAllowed() {
89,350✔
3209
                c.leafPubPermViolation(c.pa.subject)
5✔
3210
                return
5✔
3211
        }
5✔
3212

3213
        // Match the subscriptions. We will use our own L1 map if
3214
        // it's still valid, avoiding contention on the shared sublist.
3215
        var r *SublistResult
89,340✔
3216
        var ok bool
89,340✔
3217

89,340✔
3218
        genid := atomic.LoadUint64(&c.acc.sl.genid)
89,340✔
3219
        if genid == c.in.genid && c.in.results != nil {
176,253✔
3220
                r, ok = c.in.results[subject]
86,913✔
3221
        } else {
89,340✔
3222
                // Reset our L1 completely.
2,427✔
3223
                c.in.results = make(map[string]*SublistResult)
2,427✔
3224
                c.in.genid = genid
2,427✔
3225
        }
2,427✔
3226

3227
        // Go back to the sublist data structure.
3228
        if !ok {
148,046✔
3229
                r = c.acc.sl.Match(subject)
58,706✔
3230
                // Prune the results cache. Keeps us from unbounded growth. Random delete.
58,706✔
3231
                if len(c.in.results) >= maxResultCacheSize {
60,305✔
3232
                        n := 0
1,599✔
3233
                        for subj := range c.in.results {
54,366✔
3234
                                delete(c.in.results, subj)
52,767✔
3235
                                if n++; n > pruneSize {
54,366✔
3236
                                        break
1,599✔
3237
                                }
3238
                        }
3239
                }
3240
                // Then add the new cache entry.
3241
                c.in.results[subject] = r
58,706✔
3242
        }
3243

3244
        // Collect queue names if needed.
3245
        var qnames [][]byte
89,340✔
3246

89,340✔
3247
        // Check for no interest, short circuit if so.
89,340✔
3248
        // This is the fanout scale.
89,340✔
3249
        if len(r.psubs)+len(r.qsubs) > 0 {
178,216✔
3250
                flag := pmrNoFlag
88,876✔
3251
                // If we have queue subs in this cluster, then if we run in gateway
88,876✔
3252
                // mode and the remote gateways have queue subs, then we need to
88,876✔
3253
                // collect the queue groups this message was sent to so that we
88,876✔
3254
                // exclude them when sending to gateways.
88,876✔
3255
                if len(r.qsubs) > 0 && c.srv.gateway.enabled &&
88,876✔
3256
                        atomic.LoadInt64(&c.srv.gateway.totalQSubs) > 0 {
101,167✔
3257
                        flag |= pmrCollectQueueNames
12,291✔
3258
                }
12,291✔
3259
                // If this is a mapped subject that means the mapped interest
3260
                // is what got us here, but this might not have a queue designation
3261
                // If that is the case, make sure we ignore to process local queue subscribers.
3262
                if len(c.pa.mapped) > 0 && len(c.pa.queues) == 0 {
89,211✔
3263
                        flag |= pmrIgnoreEmptyQueueFilter
335✔
3264
                }
335✔
3265
                _, qnames = c.processMsgResults(acc, r, msg, nil, c.pa.subject, c.pa.reply, flag)
88,876✔
3266
        }
3267

3268
        // Now deal with gateways
3269
        if c.srv.gateway.enabled {
102,715✔
3270
                c.sendMsgToGateways(acc, msg, c.pa.subject, c.pa.reply, qnames, true)
13,375✔
3271
        }
13,375✔
3272
}
3273

3274
// Checks whether the inbound leaf message is allowed by the
3275
// connection's permissions. On the hub side this enforces what
3276
// the remote leaf may publish. On the spoke side this enforces
3277
// import restrictions such as deny_imports.
3278
func (c *client) leafMsgAllowed() bool {
85,121✔
3279
        wireSubject := c.pa.subject
85,121✔
3280
        if len(c.pa.mapped) > 0 {
85,456✔
3281
                // Mappings rewrite c.pa.subject to the internal
335✔
3282
                // destination. For leaf ACLs, need to check
335✔
3283
                // the original wire subject from the remote side.
335✔
3284
                wireSubject = c.pa.mapped
335✔
3285
        }
335✔
3286
        // Strip any gateway routing prefix for the permission check.
3287
        subjectToCheck, isGW := getGWRoutedSubjectOrSelf(wireSubject)
85,121✔
3288

85,121✔
3289
        // Service-import replies (_R_), JS ack subjects ($JS.ACK.)
85,121✔
3290
        // are internal routing subjects forwarded via LS+ without
85,121✔
3291
        // permission checks.
85,121✔
3292
        if isServiceReply(subjectToCheck) || isJSAckSubject(subjectToCheck) {
85,165✔
3293
                return true
44✔
3294
        }
44✔
3295

3296
        c.mu.Lock()
85,077✔
3297
        defer c.mu.Unlock()
85,077✔
3298

85,077✔
3299
        if c.isSpokeLeafNode() {
130,239✔
3300
                // Gateway routed replies are forwarded without
45,162✔
3301
                // permission checks.
45,162✔
3302
                if isGW || c.leafReceiveAllowed(subjectToCheck) {
90,322✔
3303
                        return true
45,160✔
3304
                }
45,160✔
3305
        } else if c.leafSendAllowed(subjectToCheck) {
79,824✔
3306
                return true
39,909✔
3307
        }
39,909✔
3308
        // Check tracked reply permissions (allow_responses).
3309
        // Use the pre-strip subject since deliverMsg tracks
3310
        // replies under the original form, which includes
3311
        // the GW routing prefix for routed requests.
3312
        return c.responseAllowed(bytesToString(wireSubject))
8✔
3313
}
3314

3315
// Returns true if the leaf side ACLs allow importing this subject,
3316
// based on the permissions received over INFO and any local deny_imports.
3317
// Lock must be held.
3318
func (c *client) leafReceiveAllowed(subject []byte) bool {
45,162✔
3319
        return c.canSubscribe(bytesToString(subject))
45,162✔
3320
}
45,162✔
3321

3322
// Returns true if the hub side ACLs allow the remote leaf to send
3323
// this subject.
3324
// Lock must be held.
3325
func (c *client) leafSendAllowed(bsubject []byte) bool {
39,915✔
3326
        // Use the original export ACL captured for this accepted leaf.
39,915✔
3327
        // The live perms also contain additional JetStream denies used by
39,915✔
3328
        // the normal forwarding path, and applying them here would reject
39,915✔
3329
        // legitimate inbound JS API requests.
39,915✔
3330
        subject := bytesToString(bsubject)
39,915✔
3331
        perms := c.opts.Export
39,915✔
3332
        if perms == nil || (perms.Allow == nil && perms.Deny == nil) {
79,805✔
3333
                return true
39,890✔
3334
        }
39,890✔
3335

3336
        allowed := true
25✔
3337
        if perms.Allow != nil && !strings.HasPrefix(subject, mqttPrefix) {
36✔
3338
                allowed = false
11✔
3339
                for _, allowSubj := range perms.Allow {
21✔
3340
                        if matchLiteral(subject, allowSubj) {
16✔
3341
                                allowed = true
6✔
3342
                                break
6✔
3343
                        }
3344
                }
3345
        }
3346

3347
        if allowed && len(perms.Deny) > 0 {
39✔
3348
                for _, denySubj := range perms.Deny {
40✔
3349
                        if matchLiteral(subject, denySubj) {
27✔
3350
                                allowed = false
1✔
3351
                                break
1✔
3352
                        }
3353
                }
3354
        }
3355
        return allowed
25✔
3356
}
3357

3358
// Handles a subscription permission violation.
3359
// See leafPermViolation() for details.
3360
func (c *client) leafSubPermViolation(subj []byte) {
336✔
3361
        c.leafPermViolation(false, subj)
336✔
3362
}
336✔
3363

3364
// Handles a publish permission violation.
3365
// See leafPermViolation() for details.
3366
func (c *client) leafPubPermViolation(subj []byte) {
5✔
3367
        c.leafPermViolation(true, subj)
5✔
3368
}
5✔
3369

3370
// Common function to process publish or subscribe leafnode permission violation.
3371
// Sends the permission violation error to the remote, logs it and closes the connection.
3372
// If this is from a server soliciting, the reconnection will be delayed.
3373
func (c *client) leafPermViolation(pub bool, subj []byte) {
341✔
3374
        if c.isSpokeLeafNode() {
679✔
3375
                // For spokes these are no-ops since the hub server told us our permissions.
338✔
3376
                // We just need to not send these over to the other side since we will get cutoff.
338✔
3377
                return
338✔
3378
        }
338✔
3379
        // FIXME(dlc) ?
3380
        c.setLeafConnectDelayIfSoliciting(leafNodeReconnectAfterPermViolation)
3✔
3381
        var action string
3✔
3382
        if pub {
6✔
3383
                c.sendErr(fmt.Sprintf("Permissions Violation for Publish to %q", subj))
3✔
3384
                action = "Publish"
3✔
3385
        } else {
3✔
3386
                c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q", subj))
×
3387
                action = "Subscription"
×
3388
        }
×
3389
        c.Errorf("%s Violation on %q - Check other side configuration", action, subj)
3✔
3390
        // TODO: add a new close reason that is more appropriate?
3✔
3391
        c.closeConnection(ProtocolViolation)
3✔
3392
}
3393

3394
// Invoked from generic processErr() for LEAF connections.
3395
func (c *client) leafProcessErr(errStr string) {
47✔
3396
        // Check if we got a cluster name collision.
47✔
3397
        if strings.Contains(errStr, ErrLeafNodeHasSameClusterName.Error()) {
50✔
3398
                _, delay := c.setLeafConnectDelayIfSoliciting(leafNodeReconnectDelayAfterClusterNameSame)
3✔
3399
                c.Errorf("Leafnode connection dropped with same cluster name error. Delaying attempt to reconnect for %v", delay)
3✔
3400
                return
3✔
3401
        }
3✔
3402
        if strings.Contains(errStr, ErrLeafNodeMinVersionRejected.Error()) {
45✔
3403
                _, delay := c.setLeafConnectDelayIfSoliciting(leafNodeMinVersionReconnectDelay)
1✔
3404
                c.Errorf("Leafnode connection dropped due to minimum version requirement. Delaying attempt to reconnect for %v", delay)
1✔
3405
                return
1✔
3406
        }
1✔
3407

3408
        // We will look for Loop detected error coming from the other side.
3409
        // If we solicit, set the connect delay.
3410
        if !strings.Contains(errStr, "Loop detected") {
78✔
3411
                return
35✔
3412
        }
35✔
3413
        c.handleLeafNodeLoop(false)
8✔
3414
}
3415

3416
// If this leaf connection solicits, sets the connect delay to the given value,
3417
// or the one from the server option's LeafNode.connDelay if one is set (for tests).
3418
// Returns the connection's account name and delay.
3419
func (c *client) setLeafConnectDelayIfSoliciting(delay time.Duration) (string, time.Duration) {
23✔
3420
        c.mu.Lock()
23✔
3421
        if c.isSolicitedLeafNode() {
36✔
3422
                if s := c.srv; s != nil {
26✔
3423
                        if srvdelay := s.getOpts().LeafNode.connDelay; srvdelay != 0 {
17✔
3424
                                delay = srvdelay
4✔
3425
                        }
4✔
3426
                }
3427
                c.leaf.remote.setConnectDelay(delay)
13✔
3428
        }
3429
        var accName string
23✔
3430
        if c.acc != nil {
46✔
3431
                accName = c.acc.Name
23✔
3432
        }
23✔
3433
        c.mu.Unlock()
23✔
3434
        return accName, delay
23✔
3435
}
3436

3437
// For the given remote Leafnode configuration, this function returns
3438
// if TLS is required, and if so, will return a clone of the TLS Config
3439
// (since some fields will be changed during handshake), the TLS server
3440
// name that is remembered, and the TLS timeout.
3441
func (c *client) leafNodeGetTLSConfigForSolicit(remote *leafNodeCfg) (bool, *tls.Config, string, float64) {
1,968✔
3442
        var (
1,968✔
3443
                tlsConfig  *tls.Config
1,968✔
3444
                tlsName    string
1,968✔
3445
                tlsTimeout float64
1,968✔
3446
        )
1,968✔
3447

1,968✔
3448
        remote.RLock()
1,968✔
3449
        defer remote.RUnlock()
1,968✔
3450

1,968✔
3451
        tlsRequired := remote.TLS || remote.TLSConfig != nil
1,968✔
3452
        if tlsRequired {
2,045✔
3453
                if remote.TLSConfig != nil {
127✔
3454
                        tlsConfig = remote.TLSConfig.Clone()
50✔
3455
                } else {
77✔
3456
                        tlsConfig = &tls.Config{MinVersion: tls.VersionTLS12}
27✔
3457
                }
27✔
3458
                tlsName = remote.tlsName
77✔
3459
                tlsTimeout = remote.TLSTimeout
77✔
3460
                if tlsTimeout == 0 {
121✔
3461
                        tlsTimeout = float64(TLS_TIMEOUT / time.Second)
44✔
3462
                }
44✔
3463
        }
3464

3465
        return tlsRequired, tlsConfig, tlsName, tlsTimeout
1,968✔
3466
}
3467

3468
// Initiates the LeafNode Websocket connection by:
3469
// - doing the TLS handshake if needed
3470
// - sending the HTTP request
3471
// - waiting for the HTTP response
3472
//
3473
// Since some bufio reader is used to consume the HTTP response, this function
3474
// returns the slice of buffered bytes (if any) so that the readLoop that will
3475
// be started after that consume those first before reading from the socket.
3476
// The boolean
3477
//
3478
// Lock held on entry.
3479
func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remote *leafNodeCfg) ([]byte, ClosedState, error) {
50✔
3480
        remote.RLock()
50✔
3481
        compress := remote.Websocket.Compression
50✔
3482
        // By default the server will mask outbound frames, but it can be disabled with this option.
50✔
3483
        noMasking := remote.Websocket.NoMasking
50✔
3484
        infoTimeout := remote.FirstInfoTimeout
50✔
3485
        remote.RUnlock()
50✔
3486
        // Will do the client-side TLS handshake if needed.
50✔
3487
        tlsRequired, err := c.leafClientHandshakeIfNeeded(remote, opts)
50✔
3488
        if err != nil {
54✔
3489
                // 0 will indicate that the connection was already closed
4✔
3490
                return nil, 0, err
4✔
3491
        }
4✔
3492

3493
        // For http request, we need the passed URL to contain either http or https scheme.
3494
        scheme := "http"
46✔
3495
        if tlsRequired {
54✔
3496
                scheme = "https"
8✔
3497
        }
8✔
3498
        // We will use the `/leafnode` path to tell the accepting WS server that it should
3499
        // create a LEAF connection, not a CLIENT.
3500
        // In case we use the user's URL path in the future, make sure we append the user's
3501
        // path to our `/leafnode` path.
3502
        lpath := leafNodeWSPath
46✔
3503
        if curPath := rURL.EscapedPath(); curPath != _EMPTY_ {
67✔
3504
                if curPath[0] == '/' {
42✔
3505
                        curPath = curPath[1:]
21✔
3506
                }
21✔
3507
                lpath = path.Join(curPath, lpath)
21✔
3508
        } else {
25✔
3509
                lpath = lpath[1:]
25✔
3510
        }
25✔
3511
        ustr := fmt.Sprintf("%s://%s/%s", scheme, rURL.Host, lpath)
46✔
3512
        u, _ := url.Parse(ustr)
46✔
3513
        req := &http.Request{
46✔
3514
                Method:     "GET",
46✔
3515
                URL:        u,
46✔
3516
                Proto:      "HTTP/1.1",
46✔
3517
                ProtoMajor: 1,
46✔
3518
                ProtoMinor: 1,
46✔
3519
                Header:     make(http.Header),
46✔
3520
                Host:       u.Host,
46✔
3521
        }
46✔
3522
        wsKey, err := wsMakeChallengeKey()
46✔
3523
        if err != nil {
46✔
3524
                return nil, WriteError, err
×
3525
        }
×
3526

3527
        req.Header["Upgrade"] = []string{"websocket"}
46✔
3528
        req.Header["Connection"] = []string{"Upgrade"}
46✔
3529
        req.Header["Sec-WebSocket-Key"] = []string{wsKey}
46✔
3530
        req.Header["Sec-WebSocket-Version"] = []string{"13"}
46✔
3531
        if compress {
55✔
3532
                req.Header.Add("Sec-WebSocket-Extensions", wsPMCReqHeaderValue)
9✔
3533
        }
9✔
3534
        if noMasking {
56✔
3535
                req.Header.Add(wsNoMaskingHeader, wsNoMaskingValue)
10✔
3536
        }
10✔
3537
        c.nc.SetDeadline(time.Now().Add(infoTimeout))
46✔
3538
        if err := req.Write(c.nc); err != nil {
46✔
3539
                return nil, WriteError, err
×
3540
        }
×
3541

3542
        var resp *http.Response
46✔
3543

46✔
3544
        br := bufio.NewReaderSize(c.nc, MAX_CONTROL_LINE_SIZE)
46✔
3545
        resp, err = http.ReadResponse(br, req)
46✔
3546
        if err == nil &&
46✔
3547
                (resp.StatusCode != 101 ||
46✔
3548
                        !strings.EqualFold(resp.Header.Get("Upgrade"), "websocket") ||
46✔
3549
                        !strings.EqualFold(resp.Header.Get("Connection"), "upgrade") ||
46✔
3550
                        resp.Header.Get("Sec-Websocket-Accept") != wsAcceptKey(wsKey)) {
47✔
3551

1✔
3552
                err = fmt.Errorf("invalid websocket connection")
1✔
3553
        }
1✔
3554
        // Check compression extension...
3555
        if err == nil && c.ws.compress {
55✔
3556
                // Check that not only permessage-deflate extension is present, but that
9✔
3557
                // we also have server and client no context take over.
9✔
3558
                srvCompress, noCtxTakeover := wsPMCExtensionSupport(resp.Header, false)
9✔
3559

9✔
3560
                // If server does not support compression, then simply disable it in our side.
9✔
3561
                if !srvCompress {
13✔
3562
                        c.ws.compress = false
4✔
3563
                } else if !noCtxTakeover {
9✔
3564
                        err = fmt.Errorf("compression negotiation error")
×
3565
                }
×
3566
        }
3567
        // Same for no masking...
3568
        if err == nil && noMasking {
56✔
3569
                // Check if server accepts no masking
10✔
3570
                if resp.Header.Get(wsNoMaskingHeader) != wsNoMaskingValue {
11✔
3571
                        // Nope, need to mask our writes as any client would do.
1✔
3572
                        c.ws.maskwrite = true
1✔
3573
                }
1✔
3574
        }
3575
        if resp != nil {
76✔
3576
                resp.Body.Close()
30✔
3577
        }
30✔
3578
        if err != nil {
63✔
3579
                return nil, ReadError, err
17✔
3580
        }
17✔
3581
        c.Debugf("Leafnode compression=%v masking=%v", c.ws.compress, c.ws.maskwrite)
29✔
3582

29✔
3583
        var preBuf []byte
29✔
3584
        // We have to slurp whatever is in the bufio reader and pass that to the readloop.
29✔
3585
        if n := br.Buffered(); n != 0 {
29✔
3586
                preBuf, _ = br.Peek(n)
×
3587
        }
×
3588
        return preBuf, 0, nil
29✔
3589
}
3590

3591
// connectProcessTimeout is the delay of the timer armed by
// leafNodeResumeConnectProcess. If the connect process has not been marked
// as finished when it fires, the connection is closed as stale.
const connectProcessTimeout = 2 * time.Second
3592

3593
// This is invoked for remote LEAF remote connections after processing the INFO
3594
// protocol.
3595
func (s *Server) leafNodeResumeConnectProcess(c *client) {
693✔
3596
        clusterName := s.ClusterName()
693✔
3597

693✔
3598
        c.mu.Lock()
693✔
3599
        if c.isClosed() {
693✔
3600
                c.mu.Unlock()
×
3601
                return
×
3602
        }
×
3603
        if err := c.sendLeafConnect(clusterName, c.headers); err != nil {
695✔
3604
                c.mu.Unlock()
2✔
3605
                c.closeConnection(WriteError)
2✔
3606
                return
2✔
3607
        }
2✔
3608

3609
        // Spin up the write loop.
3610
        s.startGoRoutine(func() { c.writeLoop() })
1,382✔
3611

3612
        // timeout leafNodeFinishConnectProcess
3613
        c.ping.tmr = time.AfterFunc(connectProcessTimeout, func() {
691✔
3614
                c.mu.Lock()
×
3615
                // check if leafNodeFinishConnectProcess was called and prevent later leafNodeFinishConnectProcess
×
3616
                if !c.flags.setIfNotSet(connectProcessFinished) {
×
3617
                        c.mu.Unlock()
×
3618
                        return
×
3619
                }
×
3620
                clearTimer(&c.ping.tmr)
×
3621
                closed := c.isClosed()
×
3622
                c.mu.Unlock()
×
3623
                if !closed {
×
3624
                        c.sendErrAndDebug("Stale Leaf Node Connection - Closing")
×
3625
                        c.closeConnection(StaleConnection)
×
3626
                }
×
3627
        })
3628
        c.mu.Unlock()
691✔
3629
        c.Debugf("Remote leafnode connect msg sent")
691✔
3630
}
3631

3632
// This is invoked for remote LEAF connections after processing the INFO
3633
// protocol and leafNodeResumeConnectProcess.
3634
// This will send LS+ the CONNECT protocol and register the leaf node.
3635
func (s *Server) leafNodeFinishConnectProcess(c *client) {
655✔
3636
        c.mu.Lock()
655✔
3637
        if !c.flags.setIfNotSet(connectProcessFinished) {
655✔
3638
                c.mu.Unlock()
×
3639
                return
×
3640
        }
×
3641
        if c.isClosed() {
655✔
3642
                c.mu.Unlock()
×
3643
                s.removeLeafNodeConnection(c)
×
3644
                return
×
3645
        }
×
3646
        remote := c.leaf.remote
655✔
3647
        // Check if we will need to send the system connect event.
655✔
3648
        remote.RLock()
655✔
3649
        sendSysConnectEvent := remote.Hub
655✔
3650
        remote.RUnlock()
655✔
3651

655✔
3652
        // Capture account before releasing lock
655✔
3653
        acc := c.acc
655✔
3654
        // cancel connectProcessTimeout
655✔
3655
        clearTimer(&c.ping.tmr)
655✔
3656
        c.mu.Unlock()
655✔
3657

655✔
3658
        // Make sure we register with the account here.
655✔
3659
        if err := c.registerWithAccount(acc); err != nil {
657✔
3660
                if err == ErrTooManyAccountConnections {
2✔
3661
                        c.maxAccountConnExceeded()
×
3662
                        return
×
3663
                } else if err == ErrLeafNodeLoop {
4✔
3664
                        c.handleLeafNodeLoop(true)
2✔
3665
                        return
2✔
3666
                }
2✔
3667
                c.Errorf("Registering leaf with account %s resulted in error: %v", acc.Name, err)
×
3668
                c.closeConnection(ProtocolViolation)
×
3669
                return
×
3670
        }
3671
        if !s.addLeafNodeConnection(c, _EMPTY_, _EMPTY_, false) {
653✔
3672
                // Was not added, could be because the remote configuration has been removed.
×
3673
                c.closeConnection(ClientClosed)
×
3674
                return
×
3675
        }
×
3676
        s.initLeafNodeSmapAndSendSubs(c)
653✔
3677
        if sendSysConnectEvent {
671✔
3678
                s.sendLeafNodeConnect(acc)
18✔
3679
        }
18✔
3680
        s.accountConnectEvent(c)
653✔
3681

653✔
3682
        // The above functions are not running under the client lock, so it is
653✔
3683
        // possible that between the time we have started the read/write loops
653✔
3684
        // and now, that the connection was closed. This would leave the closed
653✔
3685
        // LN connection possibly registered with the account and/or the server's
653✔
3686
        // leafs map. So check if connection is closed, and if so, manually cleanup.
653✔
3687
        c.mu.Lock()
653✔
3688
        closed := c.isClosed()
653✔
3689
        if !closed {
1,305✔
3690
                c.setFirstPingTimer()
652✔
3691
        }
652✔
3692
        c.mu.Unlock()
653✔
3693
        if closed {
654✔
3694
                s.removeLeafNodeConnection(c)
1✔
3695
                if prev := acc.removeClient(c); prev == 1 {
1✔
3696
                        s.decActiveAccounts()
×
3697
                }
×
3698
        }
3699
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc