• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 25204149721

30 Apr 2026 04:24PM UTC coverage: 83.024% (+2.4%) from 80.64%
25204149721

push

github

web-flow
[FIXED] Cluster route compression uses Cluster.MaxPingsOut (#8093)

The global `MaxPingsOut` was used, even if `Cluster.MaxPingsOut` was
configured to override it.

77149 of 92924 relevant lines covered (83.02%)

431136.87 hits per line

Source File
Press 'n' to go to the next uncovered line, 'b' to go to the previous one

90.07
/server/leafnode.go
1
// Copyright 2019-2026 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bufio"
18
        "bytes"
19
        "crypto/tls"
20
        "encoding/base64"
21
        "encoding/json"
22
        "fmt"
23
        "io"
24
        "math/rand"
25
        "net"
26
        "net/http"
27
        "net/url"
28
        "os"
29
        "path"
30
        "regexp"
31
        "runtime"
32
        "strconv"
33
        "strings"
34
        "sync"
35
        "sync/atomic"
36
        "time"
37

38
        "github.com/klauspost/compress/s2"
39
        "github.com/nats-io/jwt/v2"
40
        "github.com/nats-io/nkeys"
41
        "github.com/nats-io/nuid"
42
)
43

44
// Constants used by the leafnode subsystem: operator-facing warnings,
// reconnect backoff delays for specific failure modes, and protocol-related
// prefixes/paths.
const (
	// Warning when user configures leafnode TLS insecure
	leafnodeTLSInsecureWarning = "TLS certificate chain and hostname of solicited leafnodes will not be verified. DO NOT USE IN PRODUCTION!"

	// When a loop is detected, delay the reconnect of solicited connection.
	leafNodeReconnectDelayAfterLoopDetected = 30 * time.Second

	// When a server receives a message causing a permission violation, the
	// connection is closed and it won't attempt to reconnect for that long.
	leafNodeReconnectAfterPermViolation = 30 * time.Second

	// When we have the same cluster name as the hub.
	leafNodeReconnectDelayAfterClusterNameSame = 30 * time.Second

	// Prefix for loop detection subject
	leafNodeLoopDetectionSubjectPrefix = "$LDS."

	// Path added to URL to indicate to WS server that the connection is a
	// LEAF connection as opposed to a CLIENT.
	leafNodeWSPath = "/leafnode"

	// When a soliciting leafnode is rejected because it does not meet the
	// configured minimum version, delay the next reconnect attempt by this long.
	leafNodeMinVersionReconnectDelay = 5 * time.Second
)
69

70
// leaf holds the leafnode-specific state attached to a client connection
// of kind LEAF (both solicited and accepted connections).
type leaf struct {
	// We have any auth stuff here for solicited connections.
	remote *leafNodeCfg
	// isSpoke tells us what role we are playing.
	// Used when we receive a connection but otherside tells us they are a hub.
	isSpoke bool
	// remoteCluster is when we are a hub but the spoke leafnode is part of a cluster.
	remoteCluster string
	// remoteServer holds onto the remote server's name or ID.
	remoteServer string
	// domain name of remote server
	remoteDomain string
	// account name of remote server
	remoteAccName string
	// Whether or not we want to propagate east-west interest from other LNs.
	isolated bool
	// Used to suppress sub and unsub interest. Same as routes but our audience
	// here is tied to this leaf node. This will hold all subscriptions except this
	// leaf nodes. This represents all the interest we want to send to the other side.
	smap map[string]int32
	// This map will contain all the subscriptions that have been added to the smap
	// during initLeafNodeSmapAndSendSubs. It is short lived and is there to avoid
	// race between processing of a sub where sub is added to account sublist but
	// updateSmap has not be called on that "thread", while in the LN readloop,
	// when processing CONNECT, initLeafNodeSmapAndSendSubs is invoked and add
	// this subscription to smap. When processing of the sub then calls updateSmap,
	// we would add it a second time in the smap causing later unsub to suppress the LS-.
	tsub  map[*subscription]struct{}
	tsubt *time.Timer
	// Selected compression mode, which may be different from the server configured mode.
	compression string
	// This is for GW map replies.
	gwSub *subscription
}
104

105
// Used for remote (solicited) leafnodes. Wraps the configured RemoteLeafOpts
// with runtime state (working URL list, connect delay, quit signaling) that
// is guarded by the embedded RWMutex.
type leafNodeCfg struct {
	sync.RWMutex
	*RemoteLeafOpts
	// urls is our working copy of the remote URLs (the configured ones, plus
	// any learned from async leafnode INFO protocols); curURL is the one
	// currently/last used for a connect attempt.
	urls   []*url.URL
	curURL *url.URL
	// tlsName/username/password are saved off the configured URLs so they can
	// be reused when connecting to bare URLs received via the INFO protocol.
	tlsName        string
	username       string
	password       string
	// perms holds deny permissions built from DenyExports/DenyImports.
	perms          *Permissions
	connDelay      time.Duration // Delay before a connect, could be used while detecting loop condition, etc..
	jsMigrateTimer *time.Timer
	// quitCh is signaled (non-blocking, capacity 1) when this remote is removed.
	quitCh         chan struct{}
	removed        bool
	connInProgress bool
}
121

122
// Check to see if this is a solicited leafnode. We do special processing for solicited.
123
func (c *client) isSolicitedLeafNode() bool {
2,204✔
124
        return c.kind == LEAF && c.leaf != nil && c.leaf.remote != nil
2,204✔
125
}
2,204✔
126

127
// Returns true if this is a solicited leafnode and is not configured to be treated as a hub or a receiving
128
// connection leafnode where the otherside has declared itself to be the hub.
129
func (c *client) isSpokeLeafNode() bool {
8,766,365✔
130
        return c.kind == LEAF && c.leaf != nil && c.leaf.isSpoke
8,766,365✔
131
}
8,766,365✔
132

133
func (c *client) isHubLeafNode() bool {
17,919✔
134
        return c.kind == LEAF && c.leaf != nil && !c.leaf.isSpoke
17,919✔
135
}
17,919✔
136

137
func (c *client) isIsolatedLeafNode() bool {
11,405✔
138
        // TODO(nat): In future we may want to pass in and consider an isolation
11,405✔
139
        // group name here, which the hub and/or leaf could provide, so that we
11,405✔
140
        // can isolate away certain LNs but not others on an opt-in basis. For
11,405✔
141
        // now we will just isolate all LN interest until then.
11,405✔
142
        return c.kind == LEAF && c.leaf != nil && c.leaf.isolated
11,405✔
143
}
11,405✔
144

145
// This will spin up go routines to solicit the remote leaf node connections.
// Every remote is registered (even disabled ones, so that internal state
// matches the options for configuration reload); a connect go routine is
// only started for remotes that are not disabled. Credentials files are
// sanity-checked here purely to log notices/errors — a bad file does not
// prevent the connect attempt.
func (s *Server) solicitLeafNodeRemotes(remotes []*RemoteLeafOpts) {
	sysAccName := _EMPTY_
	sAcc := s.SystemAccount()
	if sAcc != nil {
		sysAccName = sAcc.Name
	}
	addRemote := func(r *RemoteLeafOpts, isSysAccRemote bool) *leafNodeCfg {
		s.mu.Lock()
		remote := newLeafNodeCfg(r)
		creds := remote.Credentials
		accName := remote.LocalAccount
		if s.leafRemoteCfgs == nil {
			s.leafRemoteCfgs = make(map[*leafNodeCfg]struct{})
		}
		s.leafRemoteCfgs[remote] = struct{}{}
		// Print a notice if the system account remote restricts permissions.
		if isSysAccRemote {
			if len(remote.DenyExports) > 0 {
				s.Noticef("Remote for System Account uses restricted export permissions")
			}
			if len(remote.DenyImports) > 0 {
				s.Noticef("Remote for System Account uses restricted import permissions")
			}
		}
		s.mu.Unlock()
		// Best-effort validation of the credentials file, for logging only.
		if creds != _EMPTY_ {
			contents, err := os.ReadFile(creds)
			// Zero out the creds content when the closure returns.
			defer wipeSlice(contents)
			if err != nil {
				s.Errorf("Error reading LeafNode Remote Credentials file %q: %v", creds, err)
			} else if items := credsRe.FindAllSubmatch(contents, -1); len(items) < 2 {
				s.Errorf("LeafNode Remote Credentials file %q malformed", creds)
			} else if _, err := nkeys.FromSeed(items[1][1]); err != nil {
				s.Errorf("LeafNode Remote Credentials file %q has malformed seed", creds)
			} else if uc, err := jwt.DecodeUserClaims(string(items[0][1])); err != nil {
				s.Errorf("LeafNode Remote Credentials file %q has malformed user jwt", creds)
			} else if isSysAccRemote {
				if !uc.Permissions.Pub.Empty() || !uc.Permissions.Sub.Empty() || uc.Permissions.Resp != nil {
					s.Noticef("LeafNode Remote for System Account uses credentials file %q with restricted permissions", creds)
				}
			} else {
				if !uc.Permissions.Pub.Empty() || !uc.Permissions.Sub.Empty() || uc.Permissions.Resp != nil {
					s.Noticef("LeafNode Remote for Account %s uses credentials file %q with restricted permissions", accName, creds)
				}
			}
		}
		return remote
	}
	for _, r := range remotes {
		// We need to call this, even if the leaf is disabled. This is so that
		// the number of internal configuration matches the options' remote leaf
		// configuration required for configuration reload.
		remote := addRemote(r, r.LocalAccount == sysAccName)
		if !r.Disabled {
			s.connectToRemoteLeafNodeAsynchronously(remote, true)
		}
	}
}
204

205
// Ensure that leafnode is properly configured. Validates, in order: auth
// options, duplicate remotes, account references (local config mode) or
// account nkeys (operator mode), compression settings, per-remote proxy and
// URL-scheme consistency, and — only when a leafnode listen port is set —
// the minimum version and the gateway/system-account interaction.
func validateLeafNode(o *Options) error {
	if err := validateLeafNodeAuthOptions(o); err != nil {
		return err
	}

	if len(o.LeafNode.Remotes) > 0 {
		names := make(map[string]struct{})
		// Check for duplicate remotes, also, users can bind to any local account,
		// if its empty we will assume the $G account.
		for _, r := range o.LeafNode.Remotes {
			if r.LocalAccount == _EMPTY_ {
				r.LocalAccount = globalAccountName
			}
			rn := r.name()
			if _, dup := names[rn]; dup {
				return fmt.Errorf("duplicate remote %s", r.safeName())
			}
			names[rn] = struct{}{}
		}
	}

	// In local config mode, check that leafnode configuration refers to accounts that exist.
	if len(o.TrustedOperators) == 0 {
		accNames := map[string]struct{}{}
		for _, a := range o.Accounts {
			accNames[a.Name] = struct{}{}
		}
		// global account is always created
		accNames[DEFAULT_GLOBAL_ACCOUNT] = struct{}{}
		// in the context of leaf nodes, empty account means global account
		accNames[_EMPTY_] = struct{}{}
		// system account either exists or, if not disabled, will be created
		if o.SystemAccount == _EMPTY_ && !o.NoSystemAccount {
			accNames[DEFAULT_SYSTEM_ACCOUNT] = struct{}{}
		}
		checkAccountExists := func(accName string, cfgType string) error {
			if _, ok := accNames[accName]; !ok {
				return fmt.Errorf("cannot find local account %q specified in leafnode %s", accName, cfgType)
			}
			return nil
		}
		if err := checkAccountExists(o.LeafNode.Account, "authorization"); err != nil {
			return err
		}
		for _, lu := range o.LeafNode.Users {
			if lu.Account == nil { // means global account
				continue
			}
			if err := checkAccountExists(lu.Account.Name, "authorization"); err != nil {
				return err
			}
		}
		for _, r := range o.LeafNode.Remotes {
			if err := checkAccountExists(r.LocalAccount, "remote"); err != nil {
				return err
			}
		}
	} else {
		// Operator mode: users come from JWTs, and accounts are nkeys.
		if len(o.LeafNode.Users) != 0 {
			return fmt.Errorf("operator mode does not allow specifying users in leafnode config")
		}
		for _, r := range o.LeafNode.Remotes {
			if !nkeys.IsValidPublicAccountKey(r.LocalAccount) {
				return fmt.Errorf(
					"operator mode requires account nkeys in remotes. " +
						"Please add an `account` key to each remote in your `leafnodes` section, to assign it to an account. " +
						"Each account value should be a 56 character public key, starting with the letter 'A'")
			}
		}
		if o.LeafNode.Port != 0 && o.LeafNode.Account != "" && !nkeys.IsValidPublicAccountKey(o.LeafNode.Account) {
			return fmt.Errorf("operator mode and non account nkeys are incompatible")
		}
	}

	// Validate compression settings
	if o.LeafNode.Compression.Mode != _EMPTY_ {
		if err := validateAndNormalizeCompressionOption(&o.LeafNode.Compression, CompressionS2Auto); err != nil {
			return err
		}
	}

	// If a remote has a websocket scheme, all need to have it.
	for _, rcfg := range o.LeafNode.Remotes {
		// Validate proxy configuration
		if _, err := validateLeafNodeProxyOptions(rcfg); err != nil {
			return err
		}

		if len(rcfg.URLs) >= 2 {
			firstIsWS, ok := isWSURL(rcfg.URLs[0]), true
			for i := 1; i < len(rcfg.URLs); i++ {
				u := rcfg.URLs[i]
				if isWS := isWSURL(u); isWS && !firstIsWS || !isWS && firstIsWS {
					ok = false
					break
				}
			}
			if !ok {
				return fmt.Errorf("remote leaf node configuration cannot have a mix of websocket and non-websocket urls: %q", redactURLList(rcfg.URLs))
			}
		}
		// Validate compression settings
		if rcfg.Compression.Mode != _EMPTY_ {
			if err := validateAndNormalizeCompressionOption(&rcfg.Compression, CompressionS2Auto); err != nil {
				return err
			}
		}
	}

	// Remaining checks apply only when accepting leafnode connections.
	if o.LeafNode.Port == 0 {
		return nil
	}

	// If MinVersion is defined, check that it is valid.
	if mv := o.LeafNode.MinVersion; mv != _EMPTY_ {
		if err := checkLeafMinVersionConfig(mv); err != nil {
			return err
		}
	}

	// The checks below will be done only when detecting that we are configured
	// with gateways. So if an option validation needs to be done regardless,
	// it MUST be done before this point!

	if o.Gateway.Name == _EMPTY_ && o.Gateway.Port == 0 {
		return nil
	}
	// If we are here we have both leaf nodes and gateways defined, make sure there
	// is a system account defined.
	if o.SystemAccount == _EMPTY_ {
		return fmt.Errorf("leaf nodes and gateways (both being defined) require a system account to also be configured")
	}
	if err := validatePinnedCerts(o.LeafNode.TLSPinnedCerts); err != nil {
		return fmt.Errorf("leafnode: %v", err)
	}
	return nil
}
343

344
func checkLeafMinVersionConfig(mv string) error {
8✔
345
        if ok, err := versionAtLeastCheckError(mv, 2, 8, 0); !ok || err != nil {
12✔
346
                if err != nil {
6✔
347
                        return fmt.Errorf("invalid leafnode's minimum version: %v", err)
2✔
348
                } else {
4✔
349
                        return fmt.Errorf("the minimum version should be at least 2.8.0")
2✔
350
                }
2✔
351
        }
352
        return nil
4✔
353
}
354

355
// Used to validate user names in LeafNode configuration.
356
// - rejects mix of single and multiple users.
357
// - rejects duplicate user names.
358
func validateLeafNodeAuthOptions(o *Options) error {
8,405✔
359
        if len(o.LeafNode.Users) == 0 {
16,783✔
360
                return nil
8,378✔
361
        }
8,378✔
362
        if o.LeafNode.Username != _EMPTY_ {
29✔
363
                return fmt.Errorf("can not have a single user/pass and a users array")
2✔
364
        }
2✔
365
        if o.LeafNode.Nkey != _EMPTY_ {
25✔
366
                return fmt.Errorf("can not have a single nkey and a users array")
×
367
        }
×
368
        users := map[string]struct{}{}
25✔
369
        for _, u := range o.LeafNode.Users {
66✔
370
                if _, exists := users[u.Username]; exists {
43✔
371
                        return fmt.Errorf("duplicate user %q detected in leafnode authorization", u.Username)
2✔
372
                }
2✔
373
                users[u.Username] = struct{}{}
39✔
374
        }
375
        return nil
23✔
376
}
377

378
// validateLeafNodeProxyOptions validates the proxy settings of a remote
// leafnode configuration. It returns non-fatal warnings (e.g. proxy settings
// that will be ignored for TCP URLs) alongside any hard error. A remote with
// no proxy URL configured is valid and produces no warnings.
func validateLeafNodeProxyOptions(remote *RemoteLeafOpts) ([]string, error) {
	var warnings []string

	if remote.Proxy.URL == _EMPTY_ {
		return warnings, nil
	}

	proxyURL, err := url.Parse(remote.Proxy.URL)
	if err != nil {
		return warnings, fmt.Errorf("invalid proxy URL: %v", err)
	}

	if proxyURL.Scheme != "http" && proxyURL.Scheme != "https" {
		return warnings, fmt.Errorf("proxy URL scheme must be http or https, got: %s", proxyURL.Scheme)
	}

	if proxyURL.Host == _EMPTY_ {
		return warnings, fmt.Errorf("proxy URL must specify a host")
	}

	if remote.Proxy.Timeout < 0 {
		return warnings, fmt.Errorf("proxy timeout must be >= 0")
	}

	// Credentials must be all-or-nothing.
	if (remote.Proxy.Username == _EMPTY_) != (remote.Proxy.Password == _EMPTY_) {
		return warnings, fmt.Errorf("proxy username and password must both be specified or both be empty")
	}

	if len(remote.URLs) > 0 {
		hasWebSocketURL := false
		hasNonWebSocketURL := false

		for _, remoteURL := range remote.URLs {
			if remoteURL.Scheme == wsSchemePrefix || remoteURL.Scheme == wsSchemePrefixTLS {
				hasWebSocketURL = true
				// A wss:// URL through a proxy requires TLS to be configured.
				if (remoteURL.Scheme == wsSchemePrefixTLS) &&
					remote.TLSConfig == nil && !remote.TLS {
					return warnings, fmt.Errorf("proxy is configured but remote URL %s requires TLS and no TLS configuration is provided. When using proxy with TLS endpoints, ensure TLS is properly configured for the leafnode remote", remoteURL.String())
				}
			} else {
				hasNonWebSocketURL = true
			}
		}

		// Proxy settings only matter for websocket URLs; warn accordingly.
		if !hasWebSocketURL {
			warnings = append(warnings, "proxy configuration will be ignored: proxy settings only apply to WebSocket connections (ws:// or wss://), but all configured URLs use TCP connections (nats://)")
		} else if hasNonWebSocketURL {
			warnings = append(warnings, "proxy configuration will only be used for WebSocket URLs: proxy settings do not apply to TCP connections (nats://)")
		}
	}

	return warnings, nil
}
431

432
// Wait for the configured reconnect interval before attempting to connect
// again to the remote leafnode. The wait is cut short if the remote is
// removed (quitCh) or the server shuts down.
func (s *Server) reConnectToRemoteLeafNode(remote *leafNodeCfg) {
	// Assume we must clear the connect-in-progress flag ourselves. When
	// connectToRemoteLeafNode returns true (it invoked s.createLeafNode()),
	// responsibility for the flag has moved on, so we leave it set.
	clearInProgress := true
	defer func() {
		s.grWG.Done()
		if clearInProgress {
			remote.setConnectInProgress(false)
		}
	}()
	delay := s.getOpts().LeafNode.ReconnectInterval
	select {
	case <-time.After(delay):
	case <-remote.quitCh:
		// Remote was removed from the configuration.
		return
	case <-s.quitCh:
		// Server shutdown.
		return
	}
	clearInProgress = !connectToRemoteLeafNode(s, remote, false)
}
452

453
// Creates a leafNodeCfg object that wraps the RemoteLeafOpts: builds deny
// permissions from DenyExports/DenyImports, copies (and optionally shuffles)
// the configured URLs, and saves off TLS hostname and user/password from
// those URLs for later use with bare URLs received via the INFO protocol.
func newLeafNodeCfg(remote *RemoteLeafOpts) *leafNodeCfg {
	cfg := &leafNodeCfg{
		RemoteLeafOpts: remote,
		urls:           make([]*url.URL, 0, len(remote.URLs)),
		quitCh:         make(chan struct{}, 1),
	}
	if len(remote.DenyExports) > 0 || len(remote.DenyImports) > 0 {
		perms := &Permissions{}
		if len(remote.DenyExports) > 0 {
			perms.Publish = &SubjectPermission{Deny: remote.DenyExports}
		}
		if len(remote.DenyImports) > 0 {
			perms.Subscribe = &SubjectPermission{Deny: remote.DenyImports}
		}
		cfg.perms = perms
	}
	// Start with the one that is configured. We will add to this
	// array when receiving async leafnode INFOs.
	cfg.urls = append(cfg.urls, cfg.URLs...)
	// If allowed to randomize, do it on our copy of URLs
	if !remote.NoRandomize {
		rand.Shuffle(len(cfg.urls), func(i, j int) {
			cfg.urls[i], cfg.urls[j] = cfg.urls[j], cfg.urls[i]
		})
	}
	// If we are TLS make sure we save off a proper servername if possible.
	// Do same for user/password since we may need them to connect to
	// a bare URL that we get from INFO protocol.
	for _, u := range cfg.urls {
		cfg.saveTLSHostname(u)
		cfg.saveUserPassword(u)
		// If the url(s) have the "wss://" scheme, and we don't have a TLS
		// config, mark that we should be using TLS anyway.
		if !cfg.TLS && isWSSURL(u) {
			cfg.TLS = true
		}
	}
	return cfg
}
493

494
// Notifies the quit channel without blocking.
495
// No lock is needed to invoke this function.
496
func (cfg *leafNodeCfg) notifyQuitChannel() {
2✔
497
        select {
2✔
498
        case cfg.quitCh <- struct{}{}:
2✔
499
        default:
×
500
        }
501
}
502

503
// Sets the connect-in-progress status for this remote leaf configuration.
504
func (cfg *leafNodeCfg) setConnectInProgress(inProgress bool) {
3,767✔
505
        cfg.Lock()
3,767✔
506
        defer cfg.Unlock()
3,767✔
507
        // In both cases we want to drain the "quit" channel.
3,767✔
508
        select {
3,767✔
509
        case <-cfg.quitCh:
1✔
510
        default:
3,766✔
511
        }
512
        cfg.connInProgress = inProgress
3,767✔
513
}
514

515
// Returns `true` if this remote is in the middle of a connect, `false` otherwise.
516
func (cfg *leafNodeCfg) isConnectInProgress() bool {
×
517
        cfg.RLock()
×
518
        defer cfg.RUnlock()
×
519
        return cfg.connInProgress
×
520
}
×
521

522
// Mark that this remote is being removed from the configuration. The quit
// channel is notified (while still holding the lock) so that any pending
// reconnect wait is interrupted.
func (cfg *leafNodeCfg) markAsRemoved() {
	cfg.Lock()
	defer cfg.Unlock()
	// This function should be invoked only once, but protect.
	if cfg.removed {
		return
	}
	cfg.removed = true
	cfg.notifyQuitChannel()
}
533

534
// Returns false if it has been disabled or removed.
535
func (cfg *leafNodeCfg) stillValid() bool {
7,766✔
536
        cfg.RLock()
7,766✔
537
        defer cfg.RUnlock()
7,766✔
538
        return !cfg.Disabled && !cfg.removed
7,766✔
539
}
7,766✔
540

541
// Will pick an URL from the list of available URLs.
542
func (cfg *leafNodeCfg) pickNextURL() *url.URL {
6,278✔
543
        cfg.Lock()
6,278✔
544
        defer cfg.Unlock()
6,278✔
545
        // If the current URL is the first in the list and we have more than
6,278✔
546
        // one URL, then move that one to end of the list.
6,278✔
547
        if cfg.curURL != nil && len(cfg.urls) > 1 && urlsAreEqual(cfg.curURL, cfg.urls[0]) {
9,012✔
548
                first := cfg.urls[0]
2,734✔
549
                copy(cfg.urls, cfg.urls[1:])
2,734✔
550
                cfg.urls[len(cfg.urls)-1] = first
2,734✔
551
        }
2,734✔
552
        cfg.curURL = cfg.urls[0]
6,278✔
553
        return cfg.curURL
6,278✔
554
}
555

556
// Returns the current URL
557
func (cfg *leafNodeCfg) getCurrentURL() *url.URL {
78✔
558
        cfg.RLock()
78✔
559
        defer cfg.RUnlock()
78✔
560
        return cfg.curURL
78✔
561
}
78✔
562

563
// Returns how long the server should wait before attempting
564
// to solicit a remote leafnode connection.
565
func (cfg *leafNodeCfg) getConnectDelay() time.Duration {
1,618✔
566
        cfg.RLock()
1,618✔
567
        delay := cfg.connDelay
1,618✔
568
        cfg.RUnlock()
1,618✔
569
        return delay
1,618✔
570
}
1,618✔
571

572
// Sets the connect delay.
573
func (cfg *leafNodeCfg) setConnectDelay(delay time.Duration) {
161✔
574
        cfg.Lock()
161✔
575
        cfg.connDelay = delay
161✔
576
        cfg.Unlock()
161✔
577
}
161✔
578

579
// Ensure that non-exported options (used in tests) have
580
// been properly set.
581
func (s *Server) setLeafNodeNonExportedOptions() {
7,129✔
582
        opts := s.getOpts()
7,129✔
583
        s.leafNodeOpts.dialTimeout = opts.LeafNode.dialTimeout
7,129✔
584
        if s.leafNodeOpts.dialTimeout == 0 {
14,257✔
585
                // Use same timeouts as routes for now.
7,128✔
586
                s.leafNodeOpts.dialTimeout = DEFAULT_ROUTE_DIAL
7,128✔
587
        }
7,128✔
588
        s.leafNodeOpts.resolver = opts.LeafNode.resolver
7,129✔
589
        if s.leafNodeOpts.resolver == nil {
14,255✔
590
                s.leafNodeOpts.resolver = net.DefaultResolver
7,126✔
591
        }
7,126✔
592
}
593

594
// sharedSysAccDelay is how long the first connect to a remote that shares our
// system account is delayed when we are not in standalone mode, to allow some
// state to be gathered first (see connectToRemoteLeafNode).
const sharedSysAccDelay = 250 * time.Millisecond
595

596
// establishHTTPProxyTunnel establishes an HTTP CONNECT tunnel through a proxy server
597
func establishHTTPProxyTunnel(proxyURL, targetHost string, timeout time.Duration, username, password string) (net.Conn, error) {
11✔
598
        proxyAddr, err := url.Parse(proxyURL)
11✔
599
        if err != nil {
11✔
600
                // This should not happen since proxy URL is validated during configuration parsing
×
601
                return nil, fmt.Errorf("unexpected proxy URL parse error (URL was pre-validated): %v", err)
×
602
        }
×
603

604
        // Connect to the proxy server
605
        conn, err := natsDialTimeout("tcp", proxyAddr.Host, timeout)
11✔
606
        if err != nil {
11✔
607
                return nil, fmt.Errorf("failed to connect to proxy: %v", err)
×
608
        }
×
609

610
        // Set deadline for the entire proxy handshake
611
        if err := conn.SetDeadline(time.Now().Add(timeout)); err != nil {
11✔
612
                conn.Close()
×
613
                return nil, fmt.Errorf("failed to set deadline: %v", err)
×
614
        }
×
615

616
        req := &http.Request{
11✔
617
                Method: http.MethodConnect,
11✔
618
                URL:    &url.URL{Opaque: targetHost}, // Opaque is required for CONNECT
11✔
619
                Host:   targetHost,
11✔
620
                Header: make(http.Header),
11✔
621
        }
11✔
622

11✔
623
        // Add proxy authentication if provided
11✔
624
        if username != "" && password != "" {
13✔
625
                req.Header.Set("Proxy-Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(username+":"+password)))
2✔
626
        }
2✔
627

628
        if err := req.Write(conn); err != nil {
11✔
629
                conn.Close()
×
630
                return nil, fmt.Errorf("failed to write CONNECT request: %v", err)
×
631
        }
×
632

633
        resp, err := http.ReadResponse(bufio.NewReader(conn), req)
11✔
634
        if err != nil {
11✔
635
                conn.Close()
×
636
                return nil, fmt.Errorf("failed to read proxy response: %v", err)
×
637
        }
×
638

639
        if resp.StatusCode != http.StatusOK {
12✔
640
                resp.Body.Close()
1✔
641
                conn.Close()
1✔
642
                return nil, fmt.Errorf("proxy CONNECT failed: %s", resp.Status)
1✔
643
        }
1✔
644

645
        // Close the response body
646
        resp.Body.Close()
10✔
647

10✔
648
        // Clear the deadline now that we've finished the proxy handshake
10✔
649
        if err := conn.SetDeadline(time.Time{}); err != nil {
10✔
650
                conn.Close()
×
651
                return nil, fmt.Errorf("failed to clear deadline: %v", err)
×
652
        }
×
653

654
        return conn, nil
10✔
655
}
656

657
// Connect to a remote leaf node asynchronously (that is, this function will do
658
// the connect in a go routine).
659
func (s *Server) connectToRemoteLeafNodeAsynchronously(remote *leafNodeCfg, firstConnect bool) {
1,428✔
660
        remote.setConnectInProgress(true)
1,428✔
661
        s.startGoRoutine(func() {
2,856✔
662
                defer s.grWG.Done()
1,428✔
663
                if !connectToRemoteLeafNode(s, remote, firstConnect) {
2,209✔
664
                        remote.setConnectInProgress(false)
781✔
665
                }
781✔
666
        })
667
}
668

669
// Connect to a remote leaf node. Should only be invoked from
// `s.connectToRemoteLeafNodeAsynchronously()` or `s.reConnectToRemoteLeafNode()`.
// Returns `true` if this function invoked `s.createLeafNode()`, false otherwise.
func connectToRemoteLeafNode(s *Server, remote *leafNodeCfg, firstConnect bool) bool {

	if remote == nil || len(remote.URLs) == 0 {
		s.Debugf("Empty remote leafnode definition, nothing to connect")
		return false
	}

	// Snapshot everything we need from the server under the read lock so the
	// retry loop below runs without holding it.
	opts := s.getOpts()
	reconnectDelay := opts.LeafNode.ReconnectInterval
	s.mu.RLock()
	dialTimeout := s.leafNodeOpts.dialTimeout
	resolver := s.leafNodeOpts.resolver
	var isSysAcc bool
	if s.eventsEnabled() {
		isSysAcc = remote.LocalAccount == s.sys.account.Name
	}
	jetstreamMigrateDelay := remote.JetStreamClusterMigrateDelay
	s.mu.RUnlock()

	// If we are sharing a system account and we are not standalone delay to gather some info prior.
	if firstConnect && isSysAcc && !s.standAloneMode() {
		s.Debugf("Will delay first leafnode connect to shared system account due to clustering")
		remote.setConnectDelay(sharedSysAccDelay)
	}

	// Honor any configured/requested connect delay, but abort promptly if the
	// remote or the server is shutting down while we wait.
	if connDelay := remote.getConnectDelay(); connDelay > 0 {
		select {
		case <-time.After(connDelay):
		case <-remote.quitCh:
			return false
		case <-s.quitCh:
			return false
		}
		remote.setConnectDelay(0)
	}

	var conn net.Conn

	const connErrFmt = "Error trying to connect as leafnode to remote server %q (attempt %v): %v"

	// Capture proxy configuration once before the loop with proper locking
	remote.RLock()
	proxyURL := remote.Proxy.URL
	proxyUsername := remote.Proxy.Username
	proxyPassword := remote.Proxy.Password
	proxyTimeout := remote.Proxy.Timeout
	remote.RUnlock()

	// Set default proxy timeout if not specified
	if proxyTimeout == 0 {
		proxyTimeout = dialTimeout
	}

	attempts := 0

	// In case the migrate timer was created but not canceled, do it when
	// this function exits. Note that the timer would not be created if
	// `jetstreamMigrateDelay == 0`.
	if jetstreamMigrateDelay > 0 {
		defer remote.cancelMigrateTimer()
	}

	// Retry loop: rotate through the remote's URLs until we get a TCP (or
	// proxied) connection, the server stops, or the remote becomes invalid.
	for s.isRunning() && remote.stillValid() {
		rURL := remote.pickNextURL()
		url, err := s.getRandomIP(resolver, rURL.Host, nil)
		if err == nil {
			var ipStr string
			if url != rURL.Host {
				ipStr = fmt.Sprintf(" (%s)", url)
			}
			// Some test may want to disable remotes from connecting
			if s.isLeafConnectDisabled() {
				s.Debugf("Will not attempt to connect to remote server on %q%s, leafnodes currently disabled", rURL.Host, ipStr)
				err = ErrLeafNodeDisabled
			} else {
				s.Debugf("Trying to connect as leafnode to remote server on %q%s", rURL.Host, ipStr)

				// Check if proxy is configured
				if proxyURL != _EMPTY_ {
					targetHost := rURL.Host
					// If URL doesn't include port, add the default port for the scheme
					if rURL.Port() == _EMPTY_ {
						defaultPort := "80"
						if rURL.Scheme == wsSchemePrefixTLS {
							defaultPort = "443"
						}
						targetHost = net.JoinHostPort(rURL.Hostname(), defaultPort)
					}

					conn, err = establishHTTPProxyTunnel(proxyURL, targetHost, proxyTimeout, proxyUsername, proxyPassword)
				} else {
					// Direct connection
					conn, err = natsDialTimeout("tcp", url, dialTimeout)
				}
			}
		}
		if err != nil {
			// Failed attempt: log (level depends on attempt count), arm the
			// migrate timer if configured, then back off with jitter.
			jitter := time.Duration(rand.Int63n(int64(reconnectDelay)))
			delay := reconnectDelay + jitter
			attempts++
			if s.shouldReportConnectErr(firstConnect, attempts) {
				s.Errorf(connErrFmt, rURL.Host, attempts, err)
			} else {
				s.Debugf(connErrFmt, rURL.Host, attempts, err)
			}
			remote.Lock()
			// if we are using a delay to start migrating assets, kick off a migrate timer.
			if remote.jsMigrateTimer == nil && jetstreamMigrateDelay > 0 {
				remote.jsMigrateTimer = time.AfterFunc(jetstreamMigrateDelay, func() {
					s.checkJetStreamMigrate(remote)
				})
			}
			remote.Unlock()
			select {
			case <-s.quitCh:
				return false
			case <-remote.quitCh:
				return false
			case <-time.After(delay):
				// Check if we should migrate any JetStream assets immediately while this remote is down.
				// This will be used if JetStreamClusterMigrateDelay was not set
				if jetstreamMigrateDelay == 0 {
					s.checkJetStreamMigrate(remote)
				}
				continue
			}
		}
		remote.cancelMigrateTimer()
		// We can check here, but really we will have to check again when the server
		// is about to add to the `s.leafs` map later in the process.
		if !remote.stillValid() {
			conn.Close()
			return false
		}

		// We have a connection here to a remote server.
		// Go ahead and create our leaf node and return.
		s.createLeafNode(conn, rURL, remote, nil)

		// Clear any observer states if we had them.
		s.clearObserverState(remote)

		return true
	}

	return false
}
819

820
func (cfg *leafNodeCfg) cancelMigrateTimer() {
836✔
821
        cfg.Lock()
836✔
822
        stopAndClearTimer(&cfg.jsMigrateTimer)
836✔
823
        cfg.Unlock()
836✔
824
}
836✔
825

826
// This will clear any observer state such that stream or consumer assets on this server can become leaders again.
// Called after a remote leafnode connection has been (re)established; it is
// the counterpart of checkJetStreamMigrate(), which sets the observer state.
func (s *Server) clearObserverState(remote *leafNodeCfg) {
	s.mu.RLock()
	accName := remote.LocalAccount
	s.mu.RUnlock()

	acc, err := s.LookupAccount(accName)
	if err != nil {
		s.Warnf("Error looking up account [%s] checking for JetStream clear observer state on a leafnode", accName)
		return
	}

	// Serialize with checkJetStreamMigrate() for this account.
	acc.jscmMu.Lock()
	defer acc.jscmMu.Unlock()

	// Walk all streams looking for any clustered stream, skip otherwise.
	for _, mset := range acc.streams() {
		node := mset.raftNode()
		if node == nil {
			// Not R>1
			continue
		}
		// Check consumers
		for _, o := range mset.getConsumers() {
			if n := o.raftNode(); n != nil {
				// Ensure we can become a leader again.
				n.SetObserver(false)
			}
		}
		// Ensure we can become a leader again.
		node.SetObserver(false)
	}
}
859

860
// Check to see if we should migrate any assets from this account.
861
func (s *Server) checkJetStreamMigrate(remote *leafNodeCfg) {
4,618✔
862
        s.mu.RLock()
4,618✔
863
        accName, shouldMigrate := remote.LocalAccount, remote.JetStreamClusterMigrate
4,618✔
864
        s.mu.RUnlock()
4,618✔
865

4,618✔
866
        if !shouldMigrate {
9,162✔
867
                return
4,544✔
868
        }
4,544✔
869

870
        acc, err := s.LookupAccount(accName)
74✔
871
        if err != nil {
74✔
872
                s.Warnf("Error looking up account [%s] checking for JetStream migration on a leafnode", accName)
×
873
                return
×
874
        }
×
875

876
        acc.jscmMu.Lock()
74✔
877
        defer acc.jscmMu.Unlock()
74✔
878

74✔
879
        // Walk all streams looking for any clustered stream, skip otherwise.
74✔
880
        // If we are the leader force stepdown.
74✔
881
        for _, mset := range acc.streams() {
112✔
882
                node := mset.raftNode()
38✔
883
                if node == nil {
38✔
884
                        // Not R>1
×
885
                        continue
×
886
                }
887
                // Collect any consumers
888
                for _, o := range mset.getConsumers() {
64✔
889
                        if n := o.raftNode(); n != nil {
52✔
890
                                n.StepDown()
26✔
891
                                // Ensure we can not become a leader while in this state.
26✔
892
                                n.SetObserver(true)
26✔
893
                        }
26✔
894
                }
895
                // Stepdown if this stream was leader.
896
                node.StepDown()
38✔
897
                // Ensure we can not become a leader while in this state.
38✔
898
                node.SetObserver(true)
38✔
899
        }
900
}
901

902
// Helper for checking.
903
func (s *Server) isLeafConnectDisabled() bool {
6,271✔
904
        s.mu.RLock()
6,271✔
905
        defer s.mu.RUnlock()
6,271✔
906
        return s.leafDisableConnect
6,271✔
907
}
6,271✔
908

909
// Save off the tlsName for when we use TLS and mix hostnames and IPs. IPs usually
910
// come from the server we connect to.
911
//
912
// We used to save the name only if there was a TLSConfig or scheme equal to "tls".
913
// However, this was causing failures for users that did not set the scheme (and
914
// their remote connections did not have a tls{} block).
915
// We now save the host name regardless in case the remote returns an INFO indicating
916
// that TLS is required.
917
//
918
// Lock held on entry.
919
func (cfg *leafNodeCfg) saveTLSHostname(u *url.URL) {
2,523✔
920
        if cfg.tlsName == _EMPTY_ && net.ParseIP(u.Hostname()) == nil {
2,542✔
921
                cfg.tlsName = u.Hostname()
19✔
922
        }
19✔
923
}
924

925
// Save off the username/password for when we connect using a bare URL
926
// that we get from the INFO protocol.
927
//
928
// Lock held on entry.
929
func (cfg *leafNodeCfg) saveUserPassword(u *url.URL) {
1,868✔
930
        if cfg.username == _EMPTY_ && u.User != nil {
2,189✔
931
                cfg.username = u.User.Username()
321✔
932
                cfg.password, _ = u.User.Password()
321✔
933
        }
321✔
934
}
935

936
// This starts the leafnode accept loop in a go routine, unless it
// is detected that the server has already been shutdown.
// It binds the leafnode listener, builds the INFO protocol sent to
// accepted leafnodes, and hands the listener to s.acceptConnections().
func (s *Server) startLeafNodeAcceptLoop() {
	// Snapshot server options.
	opts := s.getOpts()

	// Port of -1 means "pick an ephemeral port" for the listener.
	port := opts.LeafNode.Port
	if port == -1 {
		port = 0
	}

	if s.isShuttingDown() {
		return
	}

	s.mu.Lock()
	hp := net.JoinHostPort(opts.LeafNode.Host, strconv.Itoa(port))
	l, e := natsListen("tcp", hp)
	// Record the listener error so it can be reported/queried elsewhere.
	s.leafNodeListenerErr = e
	if e != nil {
		s.mu.Unlock()
		s.Fatalf("Error listening on leafnode port: %d - %v", opts.LeafNode.Port, e)
		return
	}

	s.Noticef("Listening for leafnode connections on %s",
		net.JoinHostPort(opts.LeafNode.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))

	tlsRequired := opts.LeafNode.TLSConfig != nil
	tlsVerify := tlsRequired && opts.LeafNode.TLSConfig.ClientAuth == tls.RequireAndVerifyClientCert
	// Do not set compression in this Info object, it would possibly cause
	// issues when sending asynchronous INFO to the remote.
	info := Info{
		ID:            s.info.ID,
		Name:          s.info.Name,
		Version:       s.info.Version,
		GitCommit:     gitCommit,
		GoVersion:     runtime.Version(),
		AuthRequired:  true,
		TLSRequired:   tlsRequired,
		TLSVerify:     tlsVerify,
		MaxPayload:    s.info.MaxPayload, // TODO(dlc) - Allow override?
		Headers:       s.supportsHeaders(),
		JetStream:     opts.JetStream,
		Domain:        opts.JetStreamDomain,
		Proto:         s.getServerProto(),
		InfoOnConnect: true,
		JSApiLevel:    JSApiLevel,
	}
	// If we have selected a random port...
	if port == 0 {
		// Write resolved port back to options.
		opts.LeafNode.Port = l.Addr().(*net.TCPAddr).Port
	}

	s.leafNodeInfo = info
	// Possibly override Host/Port and set IP based on Cluster.Advertise
	if err := s.setLeafNodeInfoHostPortAndIP(); err != nil {
		s.Fatalf("Error setting leafnode INFO with LeafNode.Advertise value of %s, err=%v", opts.LeafNode.Advertise, err)
		l.Close()
		s.mu.Unlock()
		return
	}
	// Register our own URL and regenerate the cached INFO JSON.
	s.leafURLsMap[s.leafNodeInfo.IP]++
	s.generateLeafNodeInfoJSON()

	// Setup state that can enable shutdown
	s.leafNodeListener = l

	// As of now, a server that does not have remotes configured would
	// never solicit a connection, so we should not have to warn if
	// InsecureSkipVerify is set in main LeafNodes config (since
	// this TLS setting matters only when soliciting a connection).
	// Still, warn if insecure is set in any of LeafNode block.
	// We need to check remotes, even if tls is not required on accept.
	warn := tlsRequired && opts.LeafNode.TLSConfig.InsecureSkipVerify
	if !warn {
		for _, r := range opts.LeafNode.Remotes {
			if r.TLSConfig != nil && r.TLSConfig.InsecureSkipVerify {
				warn = true
				break
			}
		}
	}
	if warn {
		s.Warnf(leafnodeTLSInsecureWarning)
	}
	go s.acceptConnections(l, "Leafnode", func(conn net.Conn) { s.createLeafNode(conn, nil, nil, nil) }, nil)
	s.mu.Unlock()
}
1026

1027
// RegEx to match a creds file with user JWT and Seed.
// Each match captures (group 1) the token found between dashed delimiter
// lines; the first match is expected to be the user JWT, the second the seed.
var credsRe = regexp.MustCompile(`\s*(?:(?:[-]{3,}.*[-]{3,}\r?\n)([\w\-.=]+)(?:\r?\n[-]{3,}.*[-]{3,}(\r?\n|\z)))`)
1029

1030
// clusterName is provided as argument to avoid lock ordering issues with the locked client c
1031
// Lock should be held entering here.
1032
func (c *client) sendLeafConnect(clusterName string, headers bool) error {
700✔
1033
        // We support basic user/pass and operator based user JWT with signatures.
700✔
1034
        cinfo := leafConnectInfo{
700✔
1035
                Version:       VERSION,
700✔
1036
                ID:            c.srv.info.ID,
700✔
1037
                Domain:        c.srv.info.Domain,
700✔
1038
                Name:          c.srv.info.Name,
700✔
1039
                Hub:           c.leaf.remote.Hub,
700✔
1040
                Cluster:       clusterName,
700✔
1041
                Headers:       headers,
700✔
1042
                JetStream:     c.acc.jetStreamConfigured(),
700✔
1043
                DenyPub:       c.leaf.remote.DenyImports,
700✔
1044
                Compression:   c.leaf.compression,
700✔
1045
                RemoteAccount: c.acc.GetName(),
700✔
1046
                Proto:         c.srv.getServerProto(),
700✔
1047
                Isolate:       c.leaf.remote.RequestIsolation,
700✔
1048
        }
700✔
1049

700✔
1050
        // If a signature callback is specified, this takes precedence over anything else.
700✔
1051
        if cb := c.leaf.remote.SignatureCB; cb != nil {
706✔
1052
                nonce := c.nonce
6✔
1053
                c.mu.Unlock()
6✔
1054
                jwt, sigraw, err := cb(nonce)
6✔
1055
                c.mu.Lock()
6✔
1056
                if err == nil && c.isClosed() {
7✔
1057
                        err = ErrConnectionClosed
1✔
1058
                }
1✔
1059
                if err != nil {
8✔
1060
                        c.Errorf("Error signing the nonce: %v", err)
2✔
1061
                        return err
2✔
1062
                }
2✔
1063
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
4✔
1064
                cinfo.JWT, cinfo.Sig = jwt, sig
4✔
1065

1066
        } else if creds := c.leaf.remote.Credentials; creds != _EMPTY_ {
750✔
1067
                // Check for credentials first, that will take precedence..
56✔
1068
                c.Debugf("Authenticating with credentials file %q", c.leaf.remote.Credentials)
56✔
1069
                contents, err := os.ReadFile(creds)
56✔
1070
                if err != nil {
56✔
1071
                        c.Errorf("%v", err)
×
1072
                        return err
×
1073
                }
×
1074
                defer wipeSlice(contents)
56✔
1075
                items := credsRe.FindAllSubmatch(contents, -1)
56✔
1076
                if len(items) < 2 {
56✔
1077
                        c.Errorf("Credentials file malformed")
×
1078
                        return err
×
1079
                }
×
1080
                // First result should be the user JWT.
1081
                // We copy here so that the file containing the seed will be wiped appropriately.
1082
                raw := items[0][1]
56✔
1083
                tmp := make([]byte, len(raw))
56✔
1084
                copy(tmp, raw)
56✔
1085
                // Seed is second item.
56✔
1086
                kp, err := nkeys.FromSeed(items[1][1])
56✔
1087
                if err != nil {
56✔
1088
                        c.Errorf("Credentials file has malformed seed")
×
1089
                        return err
×
1090
                }
×
1091
                // Wipe our key on exit.
1092
                defer kp.Wipe()
56✔
1093

56✔
1094
                sigraw, _ := kp.Sign(c.nonce)
56✔
1095
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
56✔
1096
                cinfo.JWT = bytesToString(tmp)
56✔
1097
                cinfo.Sig = sig
56✔
1098
        } else if nkey := c.leaf.remote.Nkey; nkey != _EMPTY_ {
643✔
1099
                kp, err := nkeys.FromSeed([]byte(nkey))
5✔
1100
                if err != nil {
5✔
1101
                        c.Errorf("Remote nkey has malformed seed")
×
1102
                        return err
×
1103
                }
×
1104
                // Wipe our key on exit.
1105
                defer kp.Wipe()
5✔
1106
                sigraw, _ := kp.Sign(c.nonce)
5✔
1107
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
5✔
1108
                pkey, _ := kp.PublicKey()
5✔
1109
                cinfo.Nkey = pkey
5✔
1110
                cinfo.Sig = sig
5✔
1111
        }
1112
        // In addition, and this is to allow auth callout, set user/password or
1113
        // token if applicable.
1114
        if userInfo := c.leaf.remote.curURL.User; userInfo != nil {
1,042✔
1115
                cinfo.User = userInfo.Username()
344✔
1116
                var ok bool
344✔
1117
                cinfo.Pass, ok = userInfo.Password()
344✔
1118
                // For backward compatibility, if only username is provided, set both
344✔
1119
                // Token and User, not just Token.
344✔
1120
                if !ok {
353✔
1121
                        cinfo.Token = cinfo.User
9✔
1122
                }
9✔
1123
        } else if c.leaf.remote.username != _EMPTY_ {
361✔
1124
                cinfo.User = c.leaf.remote.username
7✔
1125
                cinfo.Pass = c.leaf.remote.password
7✔
1126
                // For backward compatibility, if only username is provided, set both
7✔
1127
                // Token and User, not just Token.
7✔
1128
                if cinfo.Pass == _EMPTY_ {
7✔
1129
                        cinfo.Token = cinfo.User
×
1130
                }
×
1131
        }
1132
        b, err := json.Marshal(cinfo)
698✔
1133
        if err != nil {
698✔
1134
                c.Errorf("Error marshaling CONNECT to remote leafnode: %v\n", err)
×
1135
                return err
×
1136
        }
×
1137
        // Although this call is made before the writeLoop is created,
1138
        // we don't really need to send in place. The protocol will be
1139
        // sent out by the writeLoop.
1140
        c.enqueueProto([]byte(fmt.Sprintf(ConProto, b)))
698✔
1141
        return nil
698✔
1142
}
1143

1144
// Makes a deep copy of the LeafNode Info structure.
1145
// The server lock is held on entry.
1146
func (s *Server) copyLeafNodeInfo() *Info {
2,810✔
1147
        clone := s.leafNodeInfo
2,810✔
1148
        // Copy the array of urls.
2,810✔
1149
        if len(s.leafNodeInfo.LeafNodeURLs) > 0 {
5,111✔
1150
                clone.LeafNodeURLs = append([]string(nil), s.leafNodeInfo.LeafNodeURLs...)
2,301✔
1151
        }
2,301✔
1152
        return &clone
2,810✔
1153
}
1154

1155
// Adds a LeafNode URL that we get when a route connects to the Info structure.
1156
// Regenerates the JSON byte array so that it can be sent to LeafNode connections.
1157
// Returns a boolean indicating if the URL was added or not.
1158
// Server lock is held on entry
1159
func (s *Server) addLeafNodeURL(urlStr string) bool {
7,878✔
1160
        if s.leafURLsMap.addUrl(urlStr) {
15,751✔
1161
                s.generateLeafNodeInfoJSON()
7,873✔
1162
                return true
7,873✔
1163
        }
7,873✔
1164
        return false
5✔
1165
}
1166

1167
// Removes a LeafNode URL of the route that is disconnecting from the Info structure.
1168
// Regenerates the JSON byte array so that it can be sent to LeafNode connections.
1169
// Returns a boolean indicating if the URL was removed or not.
1170
// Server lock is held on entry.
1171
func (s *Server) removeLeafNodeURL(urlStr string) bool {
7,806✔
1172
        // Don't need to do this if we are removing the route connection because
7,806✔
1173
        // we are shuting down...
7,806✔
1174
        if s.isShuttingDown() {
11,933✔
1175
                return false
4,127✔
1176
        }
4,127✔
1177
        if s.leafURLsMap.removeUrl(urlStr) {
7,354✔
1178
                s.generateLeafNodeInfoJSON()
3,675✔
1179
                return true
3,675✔
1180
        }
3,675✔
1181
        return false
4✔
1182
}
1183

1184
// Server lock is held on entry
1185
func (s *Server) generateLeafNodeInfoJSON() {
15,517✔
1186
        s.leafNodeInfo.Cluster = s.cachedClusterName()
15,517✔
1187
        s.leafNodeInfo.LeafNodeURLs = s.leafURLsMap.getAsStringSlice()
15,517✔
1188
        s.leafNodeInfo.WSConnectURLs = s.websocket.connectURLsMap.getAsStringSlice()
15,517✔
1189
        s.leafNodeInfoJSON = generateInfoJSON(&s.leafNodeInfo)
15,517✔
1190
}
15,517✔
1191

1192
// Sends an async INFO protocol so that the connected servers can update
1193
// their list of LeafNode urls.
1194
func (s *Server) sendAsyncLeafNodeInfo() {
11,548✔
1195
        for _, c := range s.leafs {
11,650✔
1196
                c.mu.Lock()
102✔
1197
                c.enqueueProto(s.leafNodeInfoJSON)
102✔
1198
                c.mu.Unlock()
102✔
1199
        }
102✔
1200
}
1201

1202
// Called when an inbound leafnode connection is accepted or we create one for a solicited leafnode.
// Returns the created client, or nil if the connection could not be set up
// (account lookup failure, websocket/TLS error, write error, or server
// shutdown). For solicited connections (remote != nil), the CONNECT and
// registration are finished later, when the remote's INFO is processed
// from the readLoop.
func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCfg, ws *websocket) *client {
	// Snapshot server options.
	opts := s.getOpts()

	maxPay := int32(opts.MaxPayload)
	maxSubs := int32(opts.MaxSubs)
	// For system, maxSubs of 0 means unlimited, so re-adjust here.
	if maxSubs == 0 {
		maxSubs = -1
	}
	now := time.Now().UTC()

	c := &client{srv: s, nc: conn, kind: LEAF, opts: defaultOpts, mpay: maxPay, msubs: maxSubs, start: now, last: now}
	// Do not update the smap here, we need to do it in initLeafNodeSmapAndSendSubs
	c.leaf = &leaf{}

	// If the leafnode subject interest should be isolated, flag it here.
	// The global option wins; otherwise, for solicited connections, the
	// per-remote LocalIsolation setting applies.
	s.optsMu.RLock()
	if c.leaf.isolated = s.opts.LeafNode.IsolateLeafnodeInterest; !c.leaf.isolated && remote != nil {
		c.leaf.isolated = remote.LocalIsolation
	}
	s.optsMu.RUnlock()

	// For accepted LN connections, ws will be != nil if it was accepted
	// through the Websocket port.
	c.ws = ws

	// For remote, check if the scheme starts with "ws", if so, we will initiate
	// a remote Leaf Node connection as a websocket connection.
	if remote != nil && rURL != nil && isWSURL(rURL) {
		remote.RLock()
		c.ws = &websocket{compress: remote.Websocket.Compression, maskwrite: !remote.Websocket.NoMasking}
		remote.RUnlock()
	}

	// Determines if we are soliciting the connection or not.
	var solicited bool
	var acc *Account
	var remoteSuffix string
	if remote != nil {
		// For now, if lookup fails, we will constantly try
		// to recreate this LN connection.
		lacc := remote.LocalAccount
		var err error
		acc, err = s.LookupAccount(lacc)
		if err != nil {
			// An account not existing is something that can happen with nats/http account resolver and the account
			// has not yet been pushed, or the request failed for other reasons.
			// remote needs to be set or retry won't happen
			c.leaf.remote = remote
			c.closeConnection(MissingAccount)
			s.Errorf("Unable to lookup account %s for solicited leafnode connection: %v", lacc, err)
			return nil
		}
		remoteSuffix = fmt.Sprintf(" for account: %s", acc.traceLabel())
	}

	c.mu.Lock()
	c.initClient()
	c.Noticef("Leafnode connection created%s %s", remoteSuffix, c.opts.Name)

	var (
		tlsFirst         bool
		tlsFirstFallback time.Duration
		infoTimeout      time.Duration
	)
	if remote != nil {
		solicited = true
		remote.Lock()
		c.leaf.remote = remote
		c.setPermissions(remote.perms)
		if !c.leaf.remote.Hub {
			c.leaf.isSpoke = true
		}
		tlsFirst = remote.TLSHandshakeFirst
		infoTimeout = remote.FirstInfoTimeout
		remote.Unlock()
		c.acc = acc
	} else {
		c.flags.set(expectConnect)
		if ws != nil {
			c.Debugf("Leafnode compression=%v", c.ws.compress)
		}
		tlsFirst = opts.LeafNode.TLSHandshakeFirst
		if f := opts.LeafNode.TLSHandshakeFirstFallback; f > 0 {
			tlsFirstFallback = f
		}
	}
	c.mu.Unlock()

	var nonce [nonceLen]byte
	var info *Info

	// Grab this before the client lock below.
	if !solicited {
		// Grab server variables
		s.mu.Lock()
		info = s.copyLeafNodeInfo()
		// For tests that want to simulate old servers, do not set the compression
		// on the INFO protocol if configured with CompressionNotSupported.
		if cm := opts.LeafNode.Compression.Mode; cm != CompressionNotSupported {
			info.Compression = cm
		}
		// We always send a nonce for LEAF connections. Do not change that without
		// taking into account presence of proxy trusted keys.
		s.generateNonce(nonce[:])
		s.mu.Unlock()
	}

	// Grab lock
	c.mu.Lock()

	var preBuf []byte
	if solicited {
		// For websocket connection, we need to send an HTTP request,
		// and get the response before starting the readLoop to get
		// the INFO, etc..
		if c.isWebsocket() {
			var err error
			var closeReason ClosedState

			preBuf, closeReason, err = c.leafNodeSolicitWSConnection(opts, rURL, remote)
			if err != nil {
				c.Errorf("Error soliciting websocket connection: %v", err)
				c.mu.Unlock()
				if closeReason != 0 {
					c.closeConnection(closeReason)
				}
				return nil
			}
		} else {
			// If configured to do TLS handshake first
			if tlsFirst {
				if _, err := c.leafClientHandshakeIfNeeded(remote, opts); err != nil {
					c.mu.Unlock()
					return nil
				}
			}
			// We need to wait for the info, but not for too long.
			c.nc.SetReadDeadline(time.Now().Add(infoTimeout))
		}

		// We will process the INFO from the readloop and finish by
		// sending the CONNECT and finish registration later.
	} else {
		// Send our info to the other side.
		// Remember the nonce we sent here for signatures, etc.
		c.nonce = make([]byte, nonceLen)
		copy(c.nonce, nonce[:])
		info.Nonce = bytesToString(c.nonce)
		info.CID = c.cid
		proto := generateInfoJSON(info)

		var pre []byte
		// We need first to check for "TLS First" fallback delay.
		if tlsFirstFallback > 0 {
			// We wait and see if we are getting any data. Since we did not send
			// the INFO protocol yet, only clients that use TLS first should be
			// sending data (the TLS handshake). We don't really check the content:
			// if it is a rogue agent and not an actual client performing the
			// TLS handshake, the error will be detected when performing the
			// handshake on our side.
			pre = make([]byte, 4)
			c.nc.SetReadDeadline(time.Now().Add(tlsFirstFallback))
			n, _ := io.ReadFull(c.nc, pre[:])
			c.nc.SetReadDeadline(time.Time{})
			// If we get any data (regardless of possible timeout), we will proceed
			// with the TLS handshake.
			if n > 0 {
				pre = pre[:n]
			} else {
				// We did not get anything so we will send the INFO protocol.
				pre = nil
				// Set the boolean to false for the rest of the function.
				tlsFirst = false
			}
		}

		if !tlsFirst {
			// We have to send from this go routine because we may
			// have to block for TLS handshake before we start our
			// writeLoop go routine. The other side needs to receive
			// this before it can initiate the TLS handshake..
			c.sendProtoNow(proto)

			// The above call could have marked the connection as closed (due to TCP error).
			if c.isClosed() {
				c.mu.Unlock()
				c.closeConnection(WriteError)
				return nil
			}
		}

		// Check to see if we need to spin up TLS.
		if !c.isWebsocket() && info.TLSRequired {
			// If we have a prebuffer create a multi-reader.
			if len(pre) > 0 {
				c.nc = &tlsMixConn{c.nc, bytes.NewBuffer(pre)}
			}
			// Perform server-side TLS handshake.
			if err := c.doTLSServerHandshake(tlsHandshakeLeaf, opts.LeafNode.TLSConfig, opts.LeafNode.TLSTimeout, opts.LeafNode.TLSPinnedCerts); err != nil {
				c.mu.Unlock()
				return nil
			}
		}

		// If the user wants the TLS handshake to occur first, now that it is
		// done, send the INFO protocol.
		if tlsFirst {
			c.flags.set(didTLSFirst)
			c.sendProtoNow(proto)
			if c.isClosed() {
				c.mu.Unlock()
				c.closeConnection(WriteError)
				return nil
			}
		}

		// Leaf nodes will always require a CONNECT to let us know
		// when we are properly bound to an account.
		//
		// If compression is configured, we can't set the authTimer here because
		// it would cause the parser to fail any incoming protocol that is not a
		// CONNECT (and we need to exchange INFO protocols for compression
		// negotiation). So instead, use the ping timer until we are done with
		// negotiation and can set the auth timer.
		timeout := secondsToDuration(opts.LeafNode.AuthTimeout)
		if needsCompression(opts.LeafNode.Compression.Mode) {
			c.ping.tmr = time.AfterFunc(timeout, func() {
				c.authTimeout()
			})
		} else {
			c.setAuthTimer(timeout)
		}
	}

	// Keep track in case server is shutdown before we can successfully register.
	if !s.addToTempClients(c.cid, c) {
		c.mu.Unlock()
		c.setNoReconnect()
		c.closeConnection(ServerShutdown)
		return nil
	}

	// Spin up the read loop.
	s.startGoRoutine(func() { c.readLoop(preBuf) })

	// We will spin the write loop for solicited connections only
	// when processing the INFO and after switching to TLS if needed.
	if !solicited {
		s.startGoRoutine(func() { c.writeLoop() })
	}

	c.mu.Unlock()

	return c
}
1460

1461
// Will perform the client-side TLS handshake if needed. Assumes that this
1462
// is called by the solicit side (remote will be non nil). Returns `true`
1463
// if TLS is required, `false` otherwise.
1464
// Lock held on entry.
1465
func (c *client) leafClientHandshakeIfNeeded(remote *leafNodeCfg, opts *Options) (bool, error) {
1,984✔
1466
        // Check if TLS is required and gather TLS config variables.
1,984✔
1467
        tlsRequired, tlsConfig, tlsName, tlsTimeout := c.leafNodeGetTLSConfigForSolicit(remote)
1,984✔
1468
        if !tlsRequired {
3,890✔
1469
                return false, nil
1,906✔
1470
        }
1,906✔
1471

1472
        // If TLS required, peform handshake.
1473
        // Get the URL that was used to connect to the remote server.
1474
        rURL := remote.getCurrentURL()
78✔
1475

78✔
1476
        // Perform the client-side TLS handshake.
78✔
1477
        if resetTLSName, err := c.doTLSClientHandshake(tlsHandshakeLeaf, rURL, tlsConfig, tlsName, tlsTimeout, opts.LeafNode.TLSPinnedCerts); err != nil {
115✔
1478
                // Check if we need to reset the remote's TLS name.
37✔
1479
                if resetTLSName {
37✔
1480
                        remote.Lock()
×
1481
                        remote.tlsName = _EMPTY_
×
1482
                        remote.Unlock()
×
1483
                }
×
1484
                return false, err
37✔
1485
        }
1486
        return true, nil
41✔
1487
}
1488

1489
// processLeafnodeInfo processes an INFO protocol received on a leafnode
// connection, both the initial INFO of the handshake and later async INFO
// updates. Depending on connection state, this may trigger the client-side
// TLS handshake, compression negotiation, remote URL list updates,
// permission updates, and resuming/finishing the connect process.
func (c *client) processLeafnodeInfo(info *Info) {
	c.mu.Lock()
	if c.leaf == nil || c.isClosed() {
		c.mu.Unlock()
		return
	}
	s := c.srv
	opts := s.getOpts()
	remote := c.leaf.remote
	didSolicit := remote != nil
	firstINFO := !c.flags.isSet(infoReceived)

	// In case of websocket, the TLS handshake has been already done.
	// So check only for non websocket connections and for configurations
	// where the TLS Handshake was not done first.
	if didSolicit && !c.flags.isSet(handshakeComplete) && !c.isWebsocket() && !remote.TLSHandshakeFirst {
		// If the server requires TLS, we need to set this in the remote
		// otherwise if there is no TLS configuration block for the remote,
		// the solicit side will not attempt to perform the TLS handshake.
		if firstINFO && info.TLSRequired {
			// Check for TLS/proxy configuration mismatch
			if remote.Proxy.URL != _EMPTY_ && !remote.TLS && remote.TLSConfig == nil {
				c.mu.Unlock()
				c.Errorf("TLS configuration mismatch: Hub requires TLS but leafnode remote is not configured for TLS. When using a proxy, ensure TLS leafnode configuration matches the Hub requirements.")
				c.closeConnection(TLSHandshakeError)
				return
			}
			remote.TLS = true
		}
		if _, err := c.leafClientHandshakeIfNeeded(remote, opts); err != nil {
			c.mu.Unlock()
			return
		}
	}

	// Check for compression, unless already done.
	if firstINFO && !c.flags.isSet(compressionNegotiated) {
		// A solicited leafnode connection must first receive a leafnode INFO.
		// Classify wrong-port connections before any leaf-specific negotiation.
		if didSolicit && (info.CID == 0 || info.LeafNodeURLs == nil) {
			c.mu.Unlock()
			c.Errorf(ErrConnectedToWrongPort.Error())
			c.closeConnection(WrongPort)
			return
		}

		// Prevent from getting back here.
		c.flags.set(compressionNegotiated)

		// Use our configured compression options: the accepting side uses
		// the server-level LeafNode options, the soliciting side uses the
		// per-remote options.
		var co *CompressionOpts
		if !didSolicit {
			co = &opts.LeafNode.Compression
		} else {
			co = &remote.Compression
		}
		if needsCompression(co.Mode) {
			// Release client lock since following function will need server lock.
			c.mu.Unlock()
			compress, err := s.negotiateLeafCompression(c, didSolicit, info.Compression, co)
			if err != nil {
				c.sendErrAndErr(err.Error())
				c.closeConnection(ProtocolViolation)
				return
			}
			if compress {
				// Done for now, will get back another INFO protocol...
				return
			}
			// No compression because one side does not want/can't, so proceed.
			c.mu.Lock()
			// Check that the connection did not close if the lock was released.
			if c.isClosed() {
				c.mu.Unlock()
				return
			}
		} else {
			// Coming from an old server, the Compression field would be the empty
			// string. For servers that are configured with CompressionNotSupported,
			// this makes them behave as old servers.
			if info.Compression == _EMPTY_ || co.Mode == CompressionNotSupported {
				c.leaf.compression = CompressionNotSupported
			} else {
				c.leaf.compression = CompressionOff
			}
		}
		// Accepting side does not normally process an INFO protocol during
		// initial connection handshake. So we keep it consistent by returning
		// if we are not soliciting.
		if !didSolicit {
			// If we had created the ping timer instead of the auth timer, we will
			// clear the ping timer and set the auth timer now that the compression
			// negotiation is done.
			if info.Compression != _EMPTY_ && c.ping.tmr != nil {
				clearTimer(&c.ping.tmr)
				c.setAuthTimer(secondsToDuration(opts.LeafNode.AuthTimeout))
			}
			c.mu.Unlock()
			return
		}
		// Fall through and process the INFO protocol as usual.
	}

	// Note: For now, only the initial INFO has a nonce. We
	// will probably do auto key rotation at some point.
	if firstINFO {
		// Mark that the INFO protocol has been received.
		c.flags.set(infoReceived)
		// Prevent connecting to non leafnode port. Need to do this only for
		// the first INFO, not for async INFO updates...
		//
		// Content of INFO sent by the server when accepting a tcp connection.
		// -------------------------------------------------------------------
		// Listen Port Of | CID | ClientConnectURLs | LeafNodeURLs | Gateway |
		// -------------------------------------------------------------------
		//      CLIENT    |  X* |        X**        |              |         |
		//      ROUTE     |     |        X**        |      X***    |         |
		//     GATEWAY    |     |                   |              |    X    |
		//     LEAFNODE   |  X  |                   |       X      |         |
		// -------------------------------------------------------------------
		// *   Not on older servers.
		// **  Not if "no advertise" is enabled.
		// *** Not if leafnode's "no advertise" is enabled.
		//
		// Reject a cluster that contains spaces.
		if info.Cluster != _EMPTY_ && strings.Contains(info.Cluster, " ") {
			c.mu.Unlock()
			c.sendErrAndErr(ErrClusterNameHasSpaces.Error())
			c.closeConnection(ProtocolViolation)
			return
		}
		// For solicited outbound leaf connections, capture the remote's nonce.
		// For inbound leaf connections, keep using the server-issued nonce that
		// was sent in our initial INFO and must be signed in CONNECT.
		if didSolicit {
			c.nonce = []byte(info.Nonce)
		}
		if info.TLSRequired && didSolicit {
			remote.TLS = true
		}
		supportsHeaders := c.srv.supportsHeaders()
		c.headers = supportsHeaders && info.Headers

		// Remember the remote server.
		// Pre 2.2.0 servers are not sending their server name.
		// In that case, use info.ID, which, for those servers, matches
		// the content of the field `Name` in the leafnode CONNECT protocol.
		if info.Name == _EMPTY_ {
			c.leaf.remoteServer = info.ID
		} else {
			c.leaf.remoteServer = info.Name
		}
		c.leaf.remoteDomain = info.Domain
		c.leaf.remoteCluster = info.Cluster
		// We send the protocol version in the INFO protocol.
		// Keep track of it, so we know if this connection supports message
		// tracing for instance.
		c.opts.Protocol = info.Proto
	}

	// For both initial INFO and async INFO protocols, possibly
	// update our list of remote leafnode URLs we can connect to,
	// unless we are instructed not to.
	if didSolicit && !remote.IgnoreDiscoveredServers &&
		(len(info.LeafNodeURLs) > 0 || len(info.WSConnectURLs) > 0) {
		// Consider the incoming array as the most up-to-date
		// representation of the remote cluster's list of URLs.
		c.updateLeafNodeURLs(info)
	}

	// Only solicited leafnode connections trust permission updates from INFO.
	if didSolicit && (info.Import != nil || info.Export != nil) {
		perms := &Permissions{
			Publish:   info.Export,
			Subscribe: info.Import,
		}
		// Check if we have local deny clauses that we need to merge.
		if remote := c.leaf.remote; remote != nil {
			if len(remote.DenyExports) > 0 {
				if perms.Publish == nil {
					perms.Publish = &SubjectPermission{}
				}
				perms.Publish.Deny = append(perms.Publish.Deny, remote.DenyExports...)
			}
			if len(remote.DenyImports) > 0 {
				if perms.Subscribe == nil {
					perms.Subscribe = &SubjectPermission{}
				}
				perms.Subscribe.Deny = append(perms.Subscribe.Deny, remote.DenyImports...)
			}
		}
		c.setPermissions(perms)
	}

	var resumeConnect bool

	// If this is a remote connection and this is the first INFO protocol,
	// then we need to finish the connect process by sending CONNECT, etc..
	if firstINFO && didSolicit {
		// Clear deadline that was set in createLeafNode while waiting for the INFO.
		c.nc.SetDeadline(time.Time{})
		resumeConnect = true
	} else if !firstINFO && didSolicit {
		c.leaf.remoteAccName = info.RemoteAccount
	}

	// Check if we have the remote account information and if so make sure it's stored.
	if info.RemoteAccount != _EMPTY_ {
		if c.acc == nil {
			c.mu.Unlock()
			c.sendErr("Authorization Violation")
			c.closeConnection(ProtocolViolation)
			return
		}
		s.leafRemoteAccounts.Store(c.acc.Name, info.RemoteAccount)
	}
	c.mu.Unlock()

	finishConnect := info.ConnectInfo
	if resumeConnect && s != nil {
		s.leafNodeResumeConnectProcess(c)
		if !info.InfoOnConnect {
			finishConnect = true
		}
	}
	if finishConnect {
		s.leafNodeFinishConnectProcess(c)
	}

	// Check to see if we need to kick any internal source or mirror consumers.
	// This will be a no-op if JetStream not enabled for this server or if the bound account
	// does not have jetstream.
	s.checkInternalSyncConsumers(c.acc)
}
1722

1723
// negotiateLeafCompression selects the compression mode for this leafnode
// connection from our configured options (co) and the mode advertised by
// the peer (infoCompression). It returns true when compression is enabled,
// in which case an INFO protocol carrying the chosen mode has been sent and
// the connection has been switched to (de)compression reader/writer.
func (s *Server) negotiateLeafCompression(c *client, didSolicit bool, infoCompression string, co *CompressionOpts) (bool, error) {
	// Negotiate the appropriate compression mode (or no compression)
	cm, err := selectCompressionMode(co.Mode, infoCompression)
	if err != nil {
		return false, err
	}
	c.mu.Lock()
	// For "auto" mode, set the initial compression mode based on RTT
	if cm == CompressionS2Auto {
		if c.rttStart.IsZero() {
			c.rtt = computeRTT(c.start)
		}
		cm = selectS2AutoModeBasedOnRTT(c.rtt, co.RTTThresholds)
	}
	// Keep track of the negotiated compression mode.
	c.leaf.compression = cm
	cid := c.cid
	var nonce string
	if !didSolicit {
		nonce = bytesToString(c.nonce)
	}
	c.mu.Unlock()

	if !needsCompression(cm) {
		return false, nil
	}

	// If we end-up doing compression...

	// Generate an INFO with the chosen compression mode.
	s.mu.Lock()
	info := s.copyLeafNodeInfo()
	info.Compression, info.CID, info.Nonce = compressionModeForInfoProtocol(co, cm), cid, nonce
	infoProto := generateInfoJSON(info)
	s.mu.Unlock()

	// If we solicited, then send this INFO protocol BEFORE switching
	// to compression writer. However, if we did not, we send it after.
	c.mu.Lock()
	if didSolicit {
		c.enqueueProto(infoProto)
		// Make sure it is completely flushed (the pending bytes goes to
		// 0) before proceeding.
		for c.out.pb > 0 && !c.isClosed() {
			c.flushOutbound()
		}
	}
	// This is to notify the readLoop that it should switch to a
	// (de)compression reader.
	c.in.flags.set(switchToCompression)
	// Create the compress writer before queueing the INFO protocol for
	// a route that did not solicit. It will make sure that that proto
	// is sent with compression on.
	c.out.cw = s2.NewWriter(nil, s2WriterOptions(cm)...)
	if !didSolicit {
		c.enqueueProto(infoProto)
	}
	c.mu.Unlock()
	return true, nil
}
1783

1784
// When getting a leaf node INFO protocol, use the provided
1785
// array of urls to update the list of possible endpoints.
1786
func (c *client) updateLeafNodeURLs(info *Info) {
1,370✔
1787
        cfg := c.leaf.remote
1,370✔
1788
        cfg.Lock()
1,370✔
1789
        defer cfg.Unlock()
1,370✔
1790

1,370✔
1791
        // We have ensured that if a remote has a WS scheme, then all are.
1,370✔
1792
        // So check if first is WS, then add WS URLs, otherwise, add non WS ones.
1,370✔
1793
        if len(cfg.URLs) > 0 && isWSURL(cfg.URLs[0]) {
1,428✔
1794
                // It does not really matter if we use "ws://" or "wss://" here since
58✔
1795
                // we will have already marked that the remote should use TLS anyway.
58✔
1796
                // But use proper scheme for log statements, etc...
58✔
1797
                proto := wsSchemePrefix
58✔
1798
                if cfg.TLS {
58✔
1799
                        proto = wsSchemePrefixTLS
×
1800
                }
×
1801
                c.doUpdateLNURLs(cfg, proto, info.WSConnectURLs)
58✔
1802
                return
58✔
1803
        }
1804
        c.doUpdateLNURLs(cfg, "nats-leaf", info.LeafNodeURLs)
1,312✔
1805
}
1806

1807
func (c *client) doUpdateLNURLs(cfg *leafNodeCfg, scheme string, URLs []string) {
1,370✔
1808
        cfg.urls = make([]*url.URL, 0, 1+len(URLs))
1,370✔
1809
        // Add the ones we receive in the protocol
1,370✔
1810
        for _, surl := range URLs {
3,782✔
1811
                url, err := url.Parse(fmt.Sprintf("%s://%s", scheme, surl))
2,412✔
1812
                if err != nil {
2,412✔
1813
                        // As per below, the URLs we receive should not have contained URL info, so this should be safe to log.
×
1814
                        c.Errorf("Error parsing url %q: %v", surl, err)
×
1815
                        continue
×
1816
                }
1817
                // Do not add if it's the same as what we already have configured.
1818
                var dup bool
2,412✔
1819
                for _, u := range cfg.URLs {
6,073✔
1820
                        // URLs that we receive never have user info, but the
3,661✔
1821
                        // ones that were configured may have. Simply compare
3,661✔
1822
                        // host and port to decide if they are equal or not.
3,661✔
1823
                        if url.Host == u.Host && url.Port() == u.Port() {
5,418✔
1824
                                dup = true
1,757✔
1825
                                break
1,757✔
1826
                        }
1827
                }
1828
                if !dup {
3,067✔
1829
                        cfg.urls = append(cfg.urls, url)
655✔
1830
                        cfg.saveTLSHostname(url)
655✔
1831
                }
655✔
1832
        }
1833
        // Add the configured one
1834
        cfg.urls = append(cfg.urls, cfg.URLs...)
1,370✔
1835
}
1836

1837
// Similar to setInfoHostPortAndGenerateJSON, but for leafNodeInfo.
//
// Populates s.leafNodeInfo.Host/Port from either the configured Advertise
// address or the listen Host/Port, resolving a wildcard listen address
// ("0.0.0.0"/"::") to the first non-local IP, then sets leafNodeInfo.IP to
// the "host:port" string. Returns an error only if the advertise address
// cannot be parsed or IP resolution fails.
// NOTE(review): mutates s.leafNodeInfo without taking s.mu here — presumably
// the caller holds the server lock; confirm against call sites.
func (s *Server) setLeafNodeInfoHostPortAndIP() error {
	opts := s.getOpts()
	if opts.LeafNode.Advertise != _EMPTY_ {
		// Explicit advertise address overrides the listen address.
		advHost, advPort, err := parseHostPort(opts.LeafNode.Advertise, opts.LeafNode.Port)
		if err != nil {
			return err
		}
		s.leafNodeInfo.Host = advHost
		s.leafNodeInfo.Port = advPort
	} else {
		s.leafNodeInfo.Host = opts.LeafNode.Host
		s.leafNodeInfo.Port = opts.LeafNode.Port
		// If the host is "0.0.0.0" or "::" we need to resolve to a public IP.
		// This will return at most 1 IP.
		hostIsIPAny, ips, err := s.getNonLocalIPsIfHostIsIPAny(s.leafNodeInfo.Host, false)
		if err != nil {
			return err
		}
		if hostIsIPAny {
			if len(ips) == 0 {
				// Not fatal: keep the wildcard host and just report it.
				s.Errorf("Could not find any non-local IP for leafnode's listen specification %q",
					s.leafNodeInfo.Host)
			} else {
				// Take the first from the list...
				s.leafNodeInfo.Host = ips[0]
			}
		}
	}
	// Use just host:port for the IP
	s.leafNodeInfo.IP = net.JoinHostPort(s.leafNodeInfo.Host, strconv.Itoa(s.leafNodeInfo.Port))
	if opts.LeafNode.Advertise != _EMPTY_ {
		s.Noticef("Advertise address for leafnode is set to %s", s.leafNodeInfo.IP)
	}
	return nil
}
1873

1874
// Add the connection to the map of leaf nodes.
// If `checkForDup` is true (invoked when a leafnode is accepted), then we check
// if a connection already exists for the same server name and account.
// That can happen when the remote is attempting to reconnect while the accepting
// side did not detect the connection as broken yet.
// But it can also happen when there is a misconfiguration and the remote is
// creating two (or more) connections that bind to the same account on the accept
// side.
// When a duplicate is found, the new connection is accepted and the old is closed
// (this solves the stale connection situation). An error is returned to help the
// remote detect the misconfiguration when the duplicate is the result of that
// misconfiguration.
//
// Returns false only when a solicited remote has become invalid (removed from
// config or disabled) and so was not added to the map; true otherwise.
// Beyond registration, this function also applies the JetStream domain policy:
// deny JS API traffic, or extend the JetStream domain across this connection.
func (s *Server) addLeafNodeConnection(c *client, srvName, clusterName string, checkForDup bool) bool {
	// Snapshot everything we need from the client under its lock so that we
	// never hold c.mu and s.mu at the same time below.
	var accName string
	c.mu.Lock()
	cid := c.cid
	acc := c.acc
	if acc != nil {
		accName = acc.Name
	}
	myRemoteDomain := c.leaf.remoteDomain
	mySrvName := c.leaf.remoteServer
	remoteAccName := c.leaf.remoteAccName
	myClustName := c.leaf.remoteCluster
	remote := c.leaf.remote
	solicited := remote != nil
	c.mu.Unlock()

	var old *client
	s.mu.Lock()
	// We check for empty because in some test we may send empty CONNECT{}
	if checkForDup && srvName != _EMPTY_ {
		for _, ol := range s.leafs {
			ol.mu.Lock()
			// We care here only about non solicited Leafnode. This function
			// is more about replacing stale connections than detecting loops.
			// We have code for the loop detection elsewhere, which also delays
			// attempt to reconnect.
			if !ol.isSolicitedLeafNode() && ol.leaf.remoteServer == srvName &&
				ol.leaf.remoteCluster == clusterName && ol.acc.Name == accName &&
				remoteAccName != _EMPTY_ && ol.leaf.remoteAccName == remoteAccName {
				old = ol
			}
			ol.mu.Unlock()
			if old != nil {
				break
			}
		}
	}
	// Now that we are under the server lock and before adding it to the map,
	// for a solicited leaf, we need to make sure that it has not been removed
	// from the config or disabled.
	if solicited {
		// If no longer valid, do not add to the server map. The connection
		// should have been marked so that it can't reconnect. When the caller
		// calls closeConnection(), cleanup (including clearing the connect-
		// in-progress flag) will occur at the appropriate time.
		if !remote.stillValid() {
			// Prevent reconnect in case it was not yet done.
			c.setNoReconnect()
			s.mu.Unlock()
			s.removeFromTempClients(cid)
			return false
		}
		remote.setConnectInProgress(false)
	}
	// Store new connection in the map
	s.leafs[cid] = c
	s.mu.Unlock()
	s.removeFromTempClients(cid)

	// If applicable, evict the old one.
	if old != nil {
		old.sendErrAndErr(DuplicateRemoteLeafnodeConnection.String())
		old.closeConnection(DuplicateRemoteLeafnodeConnection)
		c.Warnf("Replacing connection from same server")
	}

	// Helper used only for log statements: "server" or "server/cluster".
	srvDecorated := func() string {
		if myClustName == _EMPTY_ {
			return mySrvName
		}
		return fmt.Sprintf("%s/%s", mySrvName, myClustName)
	}

	opts := s.getOpts()
	sysAcc := s.SystemAccount()
	js := s.getJetStream()
	var meta *raft
	if js != nil {
		if mg := js.getMetaGroup(); mg != nil {
			meta = mg.(*raft)
		}
	}
	blockMappingOutgoing := false
	// Deny (non domain) JetStream API traffic unless system account is shared
	// and domain names are identical and extending is not disabled

	// Check if backwards compatibility has been enabled and needs to be acted on
	forceSysAccDeny := false
	if len(opts.JsAccDefaultDomain) > 0 {
		if acc == sysAcc {
			for _, d := range opts.JsAccDefaultDomain {
				if d == _EMPTY_ {
					// Extending JetStream via leaf node is mutually exclusive with a domain mapping to the empty/default domain.
					// As soon as one mapping to "" is found, disable the ability to extend JS via a leaf node.
					c.Noticef("Not extending remote JetStream domain %q due to presence of empty default domain", myRemoteDomain)
					forceSysAccDeny = true
					break
				}
			}
		} else if domain, ok := opts.JsAccDefaultDomain[accName]; ok && domain == _EMPTY_ {
			// for backwards compatibility with old setups that do not have a domain name set
			c.Debugf("Skipping deny %q for account %q due to default domain", jsAllAPI, accName)
			return true
		}
	}

	// If the server has JS disabled, it may still be part of a JetStream that could be extended.
	// This is either signaled by js being disabled and a domain set,
	// or in cases where no domain name exists, an extension hint is set.
	// However, this is only relevant in mixed setups.
	//
	// If the system account connects but default domains are present, JetStream can't be extended.
	if opts.JetStreamDomain != myRemoteDomain || (!opts.JetStream && (opts.JetStreamDomain == _EMPTY_ && opts.JetStreamExtHint != jsWillExtend)) ||
		sysAcc == nil || acc == nil || forceSysAccDeny {
		// If domain names mismatch always deny. This applies to system accounts as well as non system accounts.
		// Not having a system account, account or JetStream disabled is considered a mismatch as well.
		if acc != nil && acc == sysAcc {
			c.Noticef("System account connected from %s", srvDecorated())
			c.Noticef("JetStream not extended, domains differ")
			c.mergeDenyPermissionsLocked(both, denyAllJs)
			// When a remote with a system account is present in a server, unless otherwise disabled, the server will be
			// started in observer mode. Now that it is clear that this not used, turn the observer mode off.
			if solicited && meta != nil && meta.IsObserver() {
				meta.setObserver(false, extNotExtended)
				c.Debugf("Turning JetStream metadata controller Observer Mode off")
				// Take note that the domain was not extended to avoid this state from startup.
				writePeerState(js.config.StoreDir, meta.currentPeerState())
				// Meta controller can't be leader yet.
				// Yet it is possible that due to observer mode every server already stopped campaigning.
				// Therefore this server needs to be kicked into campaigning gear explicitly.
				meta.Campaign()
			}
		} else {
			c.Noticef("JetStream using domains: local %q, remote %q", opts.JetStreamDomain, myRemoteDomain)
			c.mergeDenyPermissionsLocked(both, denyAllClientJs)
		}
		blockMappingOutgoing = true
	} else if acc == sysAcc {
		// system account and same domain
		s.sys.client.Noticef("Extending JetStream domain %q as System Account connected from server %s",
			myRemoteDomain, srvDecorated())
		// In an extension use case, pin leadership to server remotes connect to.
		// Therefore, server with a remote that are not already in observer mode, need to be put into it.
		if solicited && meta != nil && !meta.IsObserver() {
			meta.setObserver(true, extExtended)
			c.Debugf("Turning JetStream metadata controller Observer Mode on - System Account Connected")
			// Take note that the domain was not extended to avoid this state next startup.
			writePeerState(js.config.StoreDir, meta.currentPeerState())
			// If this server is the leader already, step down so a new leader can be elected (that is not an observer)
			meta.StepDown()
		}
	} else {
		// This deny is needed in all cases (system account shared or not)
		// If the system account is shared, jsAllAPI traffic will go through the system account.
		// So in order to prevent duplicate delivery (from system and actual account) suppress it on the account.
		// If the system account is NOT shared, jsAllAPI traffic has no business
		c.Debugf("Adding deny %+v for account %q", denyAllClientJs, accName)
		c.mergeDenyPermissionsLocked(both, denyAllClientJs)
	}
	// If we have a specified JetStream domain we will want to add a mapping to
	// allow access cross domain for each non-system account.
	if opts.JetStreamDomain != _EMPTY_ && opts.JetStream && acc != nil && acc != sysAcc {
		for src, dest := range generateJSMappingTable(opts.JetStreamDomain) {
			if err := acc.AddMapping(src, dest); err != nil {
				c.Debugf("Error adding JetStream domain mapping: %s", err.Error())
			} else {
				c.Debugf("Adding JetStream Domain Mapping %q -> %s to account %q", src, dest, accName)
			}
		}
		if blockMappingOutgoing {
			src := fmt.Sprintf(jsDomainAPI, opts.JetStreamDomain)
			// make sure that messages intended for this domain, do not leave the cluster via this leaf node connection
			// This is a guard against a miss-config with two identical domain names and will only cover some forms
			// of this issue, not all of them.
			// This guards against a hub and a spoke having the same domain name.
			// But not two spokes having the same one and the request coming from the hub.
			c.mergeDenyPermissionsLocked(pub, []string{src})
			c.Debugf("Adding deny %q for outgoing messages to account %q", src, accName)
		}
	}
	return true
}
2068

2069
// removeLeafNodeConnection removes the given leafnode connection from the
// server map (s.leafs) and releases its per-connection resources: the
// temporary subscriptions timer, the gateway subscription, and any proxied
// connection entry. For solicited remotes it also records whether a reconnect
// attempt is about to begin.
// Lock ordering: s.mu is acquired before c.mu, matching the rest of this file.
func (s *Server) removeLeafNodeConnection(c *client) {
	s.mu.Lock()
	c.mu.Lock()
	cid := c.cid
	if c.leaf != nil {
		if c.leaf.tsubt != nil {
			c.leaf.tsubt.Stop()
			c.leaf.tsubt = nil
		}
		if c.leaf.gwSub != nil {
			s.gwLeafSubs.Remove(c.leaf.gwSub)
			// We need to set this to nil for GC to release the connection
			c.leaf.gwSub = nil
		}
		if remote := c.leaf.remote; remote != nil {
			// If "noReconnect" is true, then we won't attempt to reconnect, so
			// we will clear the "connect-in-progress" flag. However, if we can
			// reconnect, then we should set "connect-in-progress" to true while
			// we are under the server/client lock. The go routine that performs
			// the reconnect will be started later and there would be a gap with
			// the wrong flag value otherwise.
			remote.setConnectInProgress(!c.flags.isSet(noReconnect))
		}
	}
	proxyKey := c.proxyKey
	c.mu.Unlock()
	delete(s.leafs, cid)
	if proxyKey != _EMPTY_ {
		s.removeProxiedConn(proxyKey, cid)
	}
	s.mu.Unlock()
	s.removeFromTempClients(cid)
}
2102

2103
// Connect information for solicited leafnodes.
// This is the JSON payload of the CONNECT protocol sent by the soliciting
// side; the accept side unmarshals it in processLeafNodeConnect.
type leafConnectInfo struct {
	Version string `json:"version,omitempty"`
	// Credentials: nkey/JWT/signature, user/pass, or token.
	Nkey      string   `json:"nkey,omitempty"`
	JWT       string   `json:"jwt,omitempty"`
	Sig       string   `json:"sig,omitempty"`
	User      string   `json:"user,omitempty"`
	Pass      string   `json:"pass,omitempty"`
	Token     string   `json:"auth_token,omitempty"`
	ID        string   `json:"server_id,omitempty"`
	Domain    string   `json:"domain,omitempty"`
	Name      string   `json:"name,omitempty"`
	Hub       bool     `json:"is_hub,omitempty"`
	Cluster   string   `json:"cluster,omitempty"`
	Headers   bool     `json:"headers,omitempty"`
	JetStream bool     `json:"jetstream,omitempty"`
	DenyPub   []string `json:"deny_pub,omitempty"`
	Isolate   bool     `json:"isolate,omitempty"`

	// There was an existing field called:
	// >> Comp bool `json:"compression,omitempty"`
	// that has never been used. With support for compression, we now need
	// a field that is a string. So we use a different json tag:
	Compression string `json:"compress_mode,omitempty"`

	// Just used to detect wrong connection attempts.
	Gateway string `json:"gateway,omitempty"`

	// Tells the accept side which account the remote is binding to.
	RemoteAccount string `json:"remote_account,omitempty"`

	// The accept side of a LEAF connection, unlike ROUTER and GATEWAY, receives
	// only the CONNECT protocol, and no INFO. So we need to send the protocol
	// version as part of the CONNECT. It will indicate if a connection supports
	// some features, such as message tracing.
	// We use `protocol` as the JSON tag, so this is automatically unmarshal'ed
	// in the low level process CONNECT.
	Proto int `json:"protocol,omitempty"`
}
2142

2143
// processLeafNodeConnect will process the inbound connect args.
// Once we are here we are bound to an account, so can send any interest that
// we would have to the other side.
//
// The function validates the CONNECT payload (rejects misdirected clients,
// gateways, cluster-name collisions, and remotes below the configured minimum
// version), records the remote's properties on c.leaf, registers the
// connection with the server, and sends back permissions/account info plus
// the initial subscription interest. A non-nil return always means the
// connection has been (or is being) closed.
func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) error {
	// Way to detect clients that incorrectly connect to the route listen
	// port. Client provided "lang" in the CONNECT protocol while LEAFNODEs don't.
	if lang != _EMPTY_ {
		c.sendErrAndErr(ErrClientConnectedToLeafNodePort.Error())
		c.closeConnection(WrongPort)
		return ErrClientConnectedToLeafNodePort
	}

	// Unmarshal as a leaf node connect protocol
	proto := &leafConnectInfo{}
	if err := json.Unmarshal(arg, proto); err != nil {
		return err
	}

	// Reject a cluster that contains spaces.
	if proto.Cluster != _EMPTY_ && strings.Contains(proto.Cluster, " ") {
		c.sendErrAndErr(ErrClusterNameHasSpaces.Error())
		c.closeConnection(ProtocolViolation)
		return ErrClusterNameHasSpaces
	}

	// Check for cluster name collisions.
	if cn := s.cachedClusterName(); cn != _EMPTY_ && proto.Cluster != _EMPTY_ && proto.Cluster == cn {
		c.sendErrAndErr(ErrLeafNodeHasSameClusterName.Error())
		c.closeConnection(ClusterNamesIdentical)
		return ErrLeafNodeHasSameClusterName
	}

	// Reject if this has Gateway which means that it would be from a gateway
	// connection that incorrectly connects to the leafnode port.
	if proto.Gateway != _EMPTY_ {
		errTxt := fmt.Sprintf("Rejecting connection from gateway %q on the leafnode port", proto.Gateway)
		c.Errorf(errTxt)
		c.sendErr(errTxt)
		c.closeConnection(WrongGateway)
		return ErrWrongGateway
	}

	// Enforce the configured minimum remote version, if any.
	if mv := s.getOpts().LeafNode.MinVersion; mv != _EMPTY_ {
		major, minor, update, _ := versionComponents(mv)
		if !versionAtLeast(proto.Version, major, minor, update) {
			// Send back an INFO so recent remote servers process the rejection
			// cleanly, then close immediately. The soliciting side applies the
			// reconnect delay when it processes the error.
			s.sendPermsAndAccountInfo(c)
			c.sendErrAndErr(fmt.Sprintf("%s %q", ErrLeafNodeMinVersionRejected, mv))
			c.closeConnection(MinimumVersionRequired)
			return ErrMinimumVersionRequired
		}
	}

	// Check if this server supports headers.
	supportHeaders := c.srv.supportsHeaders()

	c.mu.Lock()
	// Leaf Nodes do not do echo or verbose or pedantic.
	c.opts.Verbose = false
	c.opts.Echo = false
	c.opts.Pedantic = false
	// This inbound connection will be marked as supporting headers if this server
	// support headers and the remote has sent in the CONNECT protocol that it does
	// support headers too.
	c.headers = supportHeaders && proto.Headers
	// If the compression level is still not set, set it based on what has been
	// given to us in the CONNECT protocol.
	if c.leaf.compression == _EMPTY_ {
		// But if proto.Compression is _EMPTY_, set it to CompressionNotSupported
		if proto.Compression == _EMPTY_ {
			c.leaf.compression = CompressionNotSupported
		} else {
			c.leaf.compression = proto.Compression
		}
	}

	// Remember the remote server.
	c.leaf.remoteServer = proto.Name
	// Remember the remote account name
	c.leaf.remoteAccName = proto.RemoteAccount
	// Remember if the leafnode requested isolation.
	c.leaf.isolated = c.leaf.isolated || proto.Isolate

	// If the other side has declared itself a hub, so we will take on the spoke role.
	if proto.Hub {
		c.leaf.isSpoke = true
	}

	// The soliciting side is part of a cluster.
	if proto.Cluster != _EMPTY_ {
		c.leaf.remoteCluster = proto.Cluster
	}

	c.leaf.remoteDomain = proto.Domain

	// When a leaf solicits a connection to a hub, the perms that it will use on the soliciting leafnode's
	// behalf are correct for them, but inside the hub need to be reversed since data is flowing in the opposite direction.
	if !c.isSolicitedLeafNode() && c.perms != nil {
		sp, pp := c.perms.sub, c.perms.pub
		c.perms.sub, c.perms.pub = pp, sp
		if c.opts.Import != nil {
			c.darray = c.opts.Import.Deny
		} else {
			c.darray = nil
		}
	}

	// Set the Ping timer
	c.setFirstPingTimer()

	// If we received pub deny permissions from the other end, merge with existing ones.
	c.mergeDenyPermissions(pub, proto.DenyPub)

	acc := c.acc
	c.mu.Unlock()

	// If the account is not set (e.g. connection was closed due to auth
	// timeout while still being processed), bail out to avoid a panic.
	if acc == nil {
		c.closeConnection(MissingAccount)
		return ErrMissingAccount
	}

	// Register the cluster, even if empty, as long as we are acting as a hub.
	if !proto.Hub {
		acc.registerLeafNodeCluster(proto.Cluster)
	}

	// Add in the leafnode here since we passed through auth at this point.
	s.addLeafNodeConnection(c, proto.Name, proto.Cluster, true)

	// If we have permissions bound to this leafnode we need to send then back to the
	// origin server for local enforcement.
	s.sendPermsAndAccountInfo(c)

	// Create and initialize the smap since we know our bound account now.
	// This will send all registered subs too.
	s.initLeafNodeSmapAndSendSubs(c)

	// Announce the account connect event for a leaf node.
	// This will be a no-op as needed.
	s.sendLeafNodeConnect(c.acc)

	// Check to see if we need to kick any internal source or mirror consumers.
	// This will be a no-op if JetStream not enabled for this server or if the bound account
	// does not have jetstream.
	s.checkInternalSyncConsumers(acc)

	return nil
}
2295

2296
// checkInternalSyncConsumers retries the internal sync consumers of any
// mirrored or sourced streams owned by the given account. Called when a
// leafnode connects, so that source/mirror consumers that were disconnected
// can be re-established. No-op when JetStream is disabled, the account is
// nil, or the account has no JetStream assets.
func (s *Server) checkInternalSyncConsumers(acc *Account) {
	// Grab our js
	js := s.getJetStream()

	// Only applicable if we have JS and the leafnode has JS as well.
	// We check for remote JS outside.
	if !js.isEnabled() || acc == nil {
		return
	}

	// We will check all streams in our local account. They must be a leader and
	// be sourcing or mirroring. We will check the external config on the stream itself
	// if this is cross domain, or if the remote domain is empty, meaning we might be
	// extending the system across this leafnode connection and hence we would be extending
	// our own domain.
	jsa := js.lookupAccount(acc)
	if jsa == nil {
		return
	}

	// Collect candidates under the account lock, act on them after release.
	var streams []*stream
	jsa.mu.RLock()
	for _, mset := range jsa.streams {
		mset.cfgMu.RLock()
		// We need to have a mirror or source defined.
		// We do not want to force another lock here to look for leader status,
		// so collect and after we release jsa will make sure.
		if mset.cfg.Mirror != nil || len(mset.cfg.Sources) > 0 {
			streams = append(streams, mset)
		}
		mset.cfgMu.RUnlock()
	}
	jsa.mu.RUnlock()

	// Now loop through all candidates and check if we are the leader and have NOT
	// created the sync up consumer.
	for _, mset := range streams {
		mset.retryDisconnectedSyncConsumers()
	}
}
2337

2338
// Returns the remote cluster name. This is set only once so does not require a lock.
2339
func (c *client) remoteCluster() string {
173,870✔
2340
        if c.leaf == nil {
173,870✔
2341
                return _EMPTY_
×
2342
        }
×
2343
        return c.leaf.remoteCluster
173,870✔
2344
}
2345

2346
// Sends back an info block to the soliciting leafnode to let it know about
// its permission settings for local enforcement.
// Note the lock ordering: the server lock is taken and released to copy the
// info block BEFORE the client lock is acquired to fill in per-connection
// fields and enqueue the protocol.
func (s *Server) sendPermsAndAccountInfo(c *client) {
	// Copy under the server lock.
	s.mu.Lock()
	info := s.copyLeafNodeInfo()
	s.mu.Unlock()
	c.mu.Lock()
	info.CID = c.cid
	info.Import = c.opts.Import
	info.Export = c.opts.Export
	info.RemoteAccount = c.acc.Name
	// s.SystemAccount() uses an atomic operation and does not get the server lock, so this is safe.
	info.IsSystemAccount = c.acc == s.SystemAccount()
	info.ConnectInfo = true
	c.enqueueProto(generateInfoJSON(info))
	c.mu.Unlock()
}
706✔
2364

2365
// Snapshot the current subscriptions from the sublist into our smap which
// we will keep updated from now on.
// Also send the registered subscriptions.
func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
	acc := c.acc
	if acc == nil {
		c.Debugf("Leafnode does not have an account bound")
		return
	}
	// Collect all account subs here.
	_subs := [1024]*subscription{}
	subs := _subs[:0]
	ims := []string{}

	// Hold the client lock otherwise there can be a race and miss some subs.
	c.mu.Lock()
	defer c.mu.Unlock()

	acc.mu.RLock()
	accName := acc.Name
	accNTag := acc.nameTag

	// To make printing look better when no friendly name present.
	if accNTag != _EMPTY_ {
		accNTag = "/" + accNTag
	}

	// If we are solicited we only send interest for local clients.
	if c.isSpokeLeafNode() {
		acc.sl.localSubs(&subs, true)
	} else {
		acc.sl.All(&subs)
	}

	// Check if we have an existing service import reply.
	siReply := copyBytes(acc.siReply)

	// Since leaf nodes only send on interest, if the bound
	// account has import services we need to send those over.
	for isubj := range acc.imports.services {
		if c.isSpokeLeafNode() && !c.canSubscribe(isubj) {
			c.Debugf("Not permitted to import service %q on behalf of %s%s", isubj, accName, accNTag)
			continue
		}
		ims = append(ims, isubj)
	}
	// Likewise for mappings.
	for _, m := range acc.mappings {
		if c.isSpokeLeafNode() && !c.canSubscribe(m.src) {
			c.Debugf("Not permitted to import mapping %q on behalf of %s%s", m.src, accName, accNTag)
			continue
		}
		ims = append(ims, m.src)
	}

	// Create a unique subject that will be used for loop detection.
	lds := acc.lds
	acc.mu.RUnlock()

	// Check if we have to create the LDS.
	if lds == _EMPTY_ {
		lds = leafNodeLoopDetectionSubjectPrefix + nuid.Next()
		acc.mu.Lock()
		acc.lds = lds
		acc.mu.Unlock()
	}

	// Now check for gateway interest. Leafnodes will put this into
	// the proper mode to propagate, but they are not held in the account.
	gwsa := [16]*client{}
	gws := gwsa[:0]
	s.getOutboundGatewayConnections(&gws)
	for _, cgw := range gws {
		cgw.mu.Lock()
		gw := cgw.gw
		cgw.mu.Unlock()
		if gw != nil {
			if ei, _ := gw.outsim.Load(accName); ei != nil {
				if e := ei.(*outsie); e != nil && e.sl != nil {
					e.sl.All(&subs)
				}
			}
		}
	}

	applyGlobalRouting := s.gateway.enabled
	if c.isSpokeLeafNode() {
		// Add a fake subscription for this solicited leafnode connection
		// so that we can send back directly for mapped GW replies.
		// We need to keep track of this subscription so it can be removed
		// when the connection is closed so that the GC can release it.
		c.leaf.gwSub = &subscription{client: c, subject: []byte(gwReplyPrefix + ">")}
		c.srv.gwLeafSubs.Insert(c.leaf.gwSub)
	}

	// Now walk the results and add them to our smap
	rc := c.leaf.remoteCluster
	c.leaf.smap = make(map[string]int32)
	for _, sub := range subs {
		// Check perms regardless of role.
		if c.perms != nil && !c.canSubscribe(string(sub.subject)) {
			c.Debugf("Not permitted to subscribe to %q on behalf of %s%s", sub.subject, accName, accNTag)
			continue
		}
		// Don't advertise interest from leafnodes to other isolated leafnodes.
		if sub.client.kind == LEAF && c.isIsolatedLeafNode() {
			continue
		}
		// We ignore ourselves here.
		// Also don't add the subscription if it has a origin cluster and the
		// cluster name matches the one of the client we are sending to.
		if c != sub.client && (sub.origin == nil || (bytesToString(sub.origin) != rc)) {
			count := int32(1)
			if len(sub.queue) > 0 && sub.qw > 0 {
				count = sub.qw
			}
			c.leaf.smap[keyFromSub(sub)] += count
			// Track subs added during this snapshot so that concurrent
			// updateSmap calls for the same sub can be de-duplicated.
			if c.leaf.tsub == nil {
				c.leaf.tsub = make(map[*subscription]struct{})
			}
			c.leaf.tsub[sub] = struct{}{}
		}
	}
	// FIXME(dlc) - We need to update appropriately on an account claims update.
	for _, isubj := range ims {
		c.leaf.smap[isubj]++
	}
	// If we have gateways enabled we need to make sure the other side sends us responses
	// that have been augmented from the original subscription.
	// TODO(dlc) - Should we lock this down more?
	if applyGlobalRouting {
		c.leaf.smap[oldGWReplyPrefix+"*.>"]++
		c.leaf.smap[gwReplyPrefix+">"]++
	}
	// Detect loops by subscribing to a specific subject and checking
	// if this sub is coming back to us.
	c.leaf.smap[lds]++

	// Check if we need to add an existing siReply to our map.
	// This will be a prefix so add on the wildcard.
	if siReply != nil {
		wcsub := append(siReply, '>')
		c.leaf.smap[string(wcsub)]++
	}
	// Queue all protocols. There is no max pending limit for LN connection,
	// so we don't need chunking. The writes will happen from the writeLoop.
	var b bytes.Buffer
	for key, n := range c.leaf.smap {
		c.writeLeafSub(&b, key, n)
	}
	if b.Len() > 0 {
		c.enqueueProto(b.Bytes())
	}
	if c.leaf.tsub != nil {
		// Clear the tsub map after 5 seconds.
		c.leaf.tsubt = time.AfterFunc(5*time.Second, func() {
			c.mu.Lock()
			if c.leaf != nil {
				c.leaf.tsub = nil
				c.leaf.tsubt = nil
			}
			c.mu.Unlock()
		})
	}
}
2530

2531
// updateInterestForAccountOnGateway called from gateway code when processing RS+ and RS-.
2532
func (s *Server) updateInterestForAccountOnGateway(accName string, sub *subscription, delta int32) {
205,947✔
2533
        // Since we're in the gateway's readLoop, and we would otherwise block, don't allow fetching.
205,947✔
2534
        acc, err := s.lookupOrFetchAccount(accName, false)
205,947✔
2535
        if acc == nil || err != nil {
206,291✔
2536
                s.Debugf("No or bad account for %q, failed to update interest from gateway", accName)
344✔
2537
                return
344✔
2538
        }
344✔
2539
        acc.updateLeafNodes(sub, delta)
205,603✔
2540
}
2541

2542
// updateLeafNodesEx will make sure to update the account smap for the subscription.
// Will also forward to all leaf nodes as needed.
// If `hubOnly` is true, then will update only leaf nodes that connect to this server
// (that is, for which this server acts as a hub to them).
func (acc *Account) updateLeafNodesEx(sub *subscription, delta int32, hubOnly bool) {
	if acc == nil || sub == nil {
		return
	}

	// We will do checks for no leafnodes and same cluster here inline and under the
	// general account read lock.
	// If we feel we need to update the leafnodes we will do that out of line to avoid
	// blocking routes or GWs.

	acc.mu.RLock()
	// First check if we even have leafnodes here.
	if acc.nleafs == 0 {
		acc.mu.RUnlock()
		return
	}

	// Is this a loop detection subject.
	isLDS := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))

	// Capture the cluster even if its empty.
	var cluster string
	if sub.origin != nil {
		cluster = bytesToString(sub.origin)
	}

	// If we have an isolated cluster we can return early, as long as it is not a loop detection subject.
	// Empty clusters will return false for the check.
	if !isLDS && acc.isLeafNodeClusterIsolated(cluster) {
		acc.mu.RUnlock()
		return
	}

	// We can release the general account lock.
	acc.mu.RUnlock()

	// We can hold the list lock here to avoid having to copy a large slice.
	acc.lmu.RLock()
	defer acc.lmu.RUnlock()

	// Do this once.
	subject := string(sub.subject)

	// Walk the connected leafnodes.
	for _, ln := range acc.lleafs {
		// Never echo the interest back to its originating connection.
		if ln == sub.client {
			continue
		}
		ln.mu.Lock()
		// Don't advertise interest from leafnodes to other isolated leafnodes.
		if sub.client.kind == LEAF && ln.isIsolatedLeafNode() {
			ln.mu.Unlock()
			continue
		}
		// If `hubOnly` is true, it means that we want to update only leafnodes
		// that connect to this server (so isHubLeafNode() would return `true`).
		if hubOnly && !ln.isHubLeafNode() {
			ln.mu.Unlock()
			continue
		}
		// Check to make sure this sub does not have an origin cluster that matches the leafnode.
		// If skipped, make sure that we still let go the "$LDS." subscription that allows
		// the detection of loops as long as different cluster.
		clusterDifferent := cluster != ln.remoteCluster()
		if (isLDS && clusterDifferent) || ((cluster == _EMPTY_ || clusterDifferent) && (delta <= 0 || ln.canSubscribe(subject))) {
			ln.updateSmap(sub, delta, isLDS)
		}
		ln.mu.Unlock()
	}
}
2616

2617
// updateLeafNodes will make sure to update the account smap for the subscription.
// Will also forward to all leaf nodes as needed.
// This is the common entry point; it delegates to updateLeafNodesEx with
// hubOnly=false so that all connected leafnodes (hub and spoke) are updated.
func (acc *Account) updateLeafNodes(sub *subscription, delta int32) {
	acc.updateLeafNodesEx(sub, delta, false)
}
2,547,851✔
2622

2623
// This will make an update to our internal smap and determine if we should send out
// an interest update to the remote side.
// Lock should be held.
func (c *client) updateSmap(sub *subscription, delta int32, isLDS bool) {
	if c.leaf.smap == nil {
		return
	}

	// If we are solicited make sure this is a local client or a non-solicited leaf node
	skind := sub.client.kind
	updateClient := skind == CLIENT || skind == SYSTEM || skind == JETSTREAM || skind == ACCOUNT
	if !isLDS && c.isSpokeLeafNode() && !(updateClient || (skind == LEAF && !sub.client.isSpokeLeafNode())) {
		return
	}

	// For additions, check if that sub has just been processed during initLeafNodeSmapAndSendSubs
	// (it would then already be accounted for in the smap, so skip).
	if delta > 0 && c.leaf.tsub != nil {
		if _, present := c.leaf.tsub[sub]; present {
			delete(c.leaf.tsub, sub)
			if len(c.leaf.tsub) == 0 {
				c.leaf.tsub = nil
				c.leaf.tsubt.Stop()
				c.leaf.tsubt = nil
			}
			return
		}
	}

	key := keyFromSub(sub)
	n, ok := c.leaf.smap[key]
	// Removal of something we never tracked: nothing to do.
	if delta < 0 && !ok {
		return
	}

	// We will update if its a queue, if count is zero (or negative), or we were 0 and are N > 0.
	update := sub.queue != nil || (n <= 0 && n+delta > 0) || (n > 0 && n+delta <= 0)
	n += delta
	if n > 0 {
		c.leaf.smap[key] = n
	} else {
		delete(c.leaf.smap, key)
	}
	if update {
		c.sendLeafNodeSubUpdate(key, n)
	}
}
2669

2670
// Used to force add subjects to the subject map.
2671
func (c *client) forceAddToSmap(subj string) {
13✔
2672
        c.mu.Lock()
13✔
2673
        defer c.mu.Unlock()
13✔
2674

13✔
2675
        if c.leaf.smap == nil {
13✔
2676
                return
×
2677
        }
×
2678
        n := c.leaf.smap[subj]
13✔
2679
        if n != 0 {
14✔
2680
                return
1✔
2681
        }
1✔
2682
        // Place into the map since it was not there.
2683
        c.leaf.smap[subj] = 1
12✔
2684
        c.sendLeafNodeSubUpdate(subj, 1)
12✔
2685
}
2686

2687
// Used to force remove a subject from the subject map.
2688
func (c *client) forceRemoveFromSmap(subj string) {
1✔
2689
        c.mu.Lock()
1✔
2690
        defer c.mu.Unlock()
1✔
2691

1✔
2692
        if c.leaf.smap == nil {
1✔
2693
                return
×
2694
        }
×
2695
        n := c.leaf.smap[subj]
1✔
2696
        if n == 0 {
1✔
2697
                return
×
2698
        }
×
2699
        n--
1✔
2700
        if n == 0 {
2✔
2701
                // Remove is now zero
1✔
2702
                delete(c.leaf.smap, subj)
1✔
2703
                c.sendLeafNodeSubUpdate(subj, 0)
1✔
2704
        } else {
1✔
2705
                c.leaf.smap[subj] = n
×
2706
        }
×
2707
}
2708

2709
// Send the subscription interest change to the other side.
2710
// Lock should be held.
2711
func (c *client) sendLeafNodeSubUpdate(key string, n int32) {
10,153✔
2712
        // If we are a spoke, we need to check if we are allowed to send this subscription over to the hub.
10,153✔
2713
        if c.isSpokeLeafNode() {
12,694✔
2714
                checkPerms := true
2,541✔
2715
                if len(key) > 0 && (key[0] == '$' || key[0] == '_') {
4,082✔
2716
                        if strings.HasPrefix(key, leafNodeLoopDetectionSubjectPrefix) ||
1,541✔
2717
                                strings.HasPrefix(key, oldGWReplyPrefix) ||
1,541✔
2718
                                strings.HasPrefix(key, gwReplyPrefix) {
1,632✔
2719
                                checkPerms = false
91✔
2720
                        }
91✔
2721
                }
2722
                if checkPerms {
4,991✔
2723
                        var subject string
2,450✔
2724
                        if sep := strings.IndexByte(key, ' '); sep != -1 {
2,945✔
2725
                                subject = key[:sep]
495✔
2726
                        } else {
2,450✔
2727
                                subject = key
1,955✔
2728
                        }
1,955✔
2729
                        if !c.canSubscribe(subject) {
2,459✔
2730
                                return
9✔
2731
                        }
9✔
2732
                }
2733
        }
2734
        // If we are here we can send over to the other side.
2735
        _b := [64]byte{}
10,144✔
2736
        b := bytes.NewBuffer(_b[:0])
10,144✔
2737
        c.writeLeafSub(b, key, n)
10,144✔
2738
        c.enqueueProto(b.Bytes())
10,144✔
2739
}
2740

2741
// Helper function to build the key.
2742
func keyFromSub(sub *subscription) string {
47,891✔
2743
        var sb strings.Builder
47,891✔
2744
        sb.Grow(len(sub.subject) + len(sub.queue) + 1)
47,891✔
2745
        sb.Write(sub.subject)
47,891✔
2746
        if sub.queue != nil {
51,742✔
2747
                // Just make the key subject spc group, e.g. 'foo bar'
3,851✔
2748
                sb.WriteByte(' ')
3,851✔
2749
                sb.Write(sub.queue)
3,851✔
2750
        }
3,851✔
2751
        return sb.String()
47,891✔
2752
}
2753

2754
const (
	// Key prefixes used to distinguish plain routed subscriptions ("R")
	// from routed subscriptions made on behalf of a leafnode ("L").
	keyRoutedSub         = "R"
	keyRoutedSubByte     = 'R'
	keyRoutedLeafSub     = "L"
	keyRoutedLeafSubByte = 'L'
)
2760

2761
// Helper function to build the key that prevents collisions between normal
2762
// routed subscriptions and routed subscriptions on behalf of a leafnode.
2763
// Keys will look like this:
2764
// "R foo"          -> plain routed sub on "foo"
2765
// "R foo bar"      -> queue routed sub on "foo", queue "bar"
2766
// "L foo bar"      -> plain routed leaf sub on "foo", leaf "bar"
2767
// "L foo bar baz"  -> queue routed sub on "foo", queue "bar", leaf "baz"
2768
func keyFromSubWithOrigin(sub *subscription) string {
720,504✔
2769
        var sb strings.Builder
720,504✔
2770
        sb.Grow(2 + len(sub.origin) + 1 + len(sub.subject) + 1 + len(sub.queue))
720,504✔
2771
        leaf := len(sub.origin) > 0
720,504✔
2772
        if leaf {
737,394✔
2773
                sb.WriteByte(keyRoutedLeafSubByte)
16,890✔
2774
        } else {
720,504✔
2775
                sb.WriteByte(keyRoutedSubByte)
703,614✔
2776
        }
703,614✔
2777
        sb.WriteByte(' ')
720,504✔
2778
        sb.Write(sub.subject)
720,504✔
2779
        if sub.queue != nil {
749,641✔
2780
                sb.WriteByte(' ')
29,137✔
2781
                sb.Write(sub.queue)
29,137✔
2782
        }
29,137✔
2783
        if leaf {
737,394✔
2784
                sb.WriteByte(' ')
16,890✔
2785
                sb.Write(sub.origin)
16,890✔
2786
        }
16,890✔
2787
        return sb.String()
720,504✔
2788
}
2789

2790
// Lock should be held.
2791
func (c *client) writeLeafSub(w *bytes.Buffer, key string, n int32) {
37,725✔
2792
        if key == _EMPTY_ {
37,725✔
2793
                return
×
2794
        }
×
2795
        if n > 0 {
71,632✔
2796
                w.WriteString("LS+ " + key)
33,907✔
2797
                // Check for queue semantics, if found write n.
33,907✔
2798
                if strings.Contains(key, " ") {
36,241✔
2799
                        w.WriteString(" ")
2,334✔
2800
                        var b [12]byte
2,334✔
2801
                        var i = len(b)
2,334✔
2802
                        for l := n; l > 0; l /= 10 {
5,584✔
2803
                                i--
3,250✔
2804
                                b[i] = digits[l%10]
3,250✔
2805
                        }
3,250✔
2806
                        w.Write(b[i:])
2,334✔
2807
                        if c.trace {
2,334✔
2808
                                arg := fmt.Sprintf("%s %d", key, n)
×
2809
                                c.traceOutOp("LS+", []byte(arg))
×
2810
                        }
×
2811
                } else if c.trace {
31,776✔
2812
                        c.traceOutOp("LS+", []byte(key))
203✔
2813
                }
203✔
2814
        } else {
3,818✔
2815
                w.WriteString("LS- " + key)
3,818✔
2816
                if c.trace {
3,832✔
2817
                        c.traceOutOp("LS-", []byte(key))
14✔
2818
                }
14✔
2819
        }
2820
        w.WriteString(CR_LF)
37,725✔
2821
}
2822

2823
// processLeafSub will process an inbound sub request (LS+) for the remote
// leaf node. It registers the subscription in the bound account's sublist,
// enforces permissions and sub limits, detects loops via the $LDS subject,
// and propagates the interest to routes, gateways and other leafnodes.
func (c *client) processLeafSub(argo []byte) (err error) {
	// Indicate activity.
	c.in.subs++

	srv := c.srv
	if srv == nil {
		return nil
	}

	// Copy so we do not reference a potentially large buffer
	arg := make([]byte, len(argo))
	copy(arg, argo)

	args := splitArg(arg)
	sub := &subscription{client: c}

	delta := int32(1)
	switch len(args) {
	case 1:
		// Plain sub: just the subject.
		sub.queue = nil
	case 3:
		// Queue sub: subject, queue name and queue weight.
		sub.queue = args[1]
		sub.qw = int32(parseSize(args[2]))
		// TODO: (ik) We should have a non empty queue name and a queue
		// weight >= 1. For 2.11, we may want to return an error if that
		// is not the case, but for now just overwrite `delta` if queue
		// weight is greater than 1 (it is possible after a reconnect/
		// server restart to receive a queue weight > 1 for a new sub).
		if sub.qw > 1 {
			delta = sub.qw
		}
	default:
		return fmt.Errorf("processLeafSub Parse Error: '%s'", arg)
	}
	sub.subject = args[0]

	c.mu.Lock()
	if c.isClosed() {
		c.mu.Unlock()
		return nil
	}

	acc := c.acc
	// Guard against LS+ arriving before CONNECT has been processed, which
	// can happen when compression is enabled.
	if acc == nil {
		c.mu.Unlock()
		c.sendErr("Authorization Violation")
		c.closeConnection(ProtocolViolation)
		return nil
	}
	// Check if we have a loop.
	ldsPrefix := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))

	if ldsPrefix && bytesToString(sub.subject) == acc.getLDSubject() {
		c.mu.Unlock()
		c.handleLeafNodeLoop(true)
		return nil
	}

	// Check permissions if applicable. (but exclude the $LDS, $GR and _GR_)
	checkPerms := true
	if sub.subject[0] == '$' || sub.subject[0] == '_' {
		if ldsPrefix ||
			bytes.HasPrefix(sub.subject, []byte(oldGWReplyPrefix)) ||
			bytes.HasPrefix(sub.subject, []byte(gwReplyPrefix)) {
			checkPerms = false
		}
	}

	// If we are a hub check that we can publish to this subject.
	if checkPerms {
		subj := string(sub.subject)
		if subjectIsLiteral(subj) && !c.pubAllowedFullCheck(subj, true, true) {
			c.mu.Unlock()
			c.leafSubPermViolation(sub.subject)
			c.Debugf(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject))
			return nil
		}
	}

	// Check if we have a maximum on the number of subscriptions.
	if c.subsAtLimit() {
		c.mu.Unlock()
		c.maxSubsExceeded()
		return nil
	}

	// If we have an origin cluster associated mark that in the sub.
	if rc := c.remoteCluster(); rc != _EMPTY_ {
		sub.origin = []byte(rc)
	}

	// Like Routes, we store local subs by account and subject and optionally queue name.
	// If we have a queue it will have a trailing weight which we do not want.
	if sub.queue != nil {
		sub.sid = arg[:len(arg)-len(args[2])-1]
	} else {
		sub.sid = arg
	}
	key := bytesToString(sub.sid)
	osub := c.subs[key]
	if osub == nil {
		c.subs[key] = sub
		// Now place into the account sl.
		if err := acc.sl.Insert(sub); err != nil {
			delete(c.subs, key)
			c.mu.Unlock()
			c.Errorf("Could not insert subscription: %v", err)
			c.sendErr("Invalid Subscription")
			return nil
		}
	} else if sub.queue != nil {
		// For a queue we need to update the weight.
		delta = sub.qw - atomic.LoadInt32(&osub.qw)
		atomic.StoreInt32(&osub.qw, sub.qw)
		acc.sl.UpdateRemoteQSub(osub)
	}
	spoke := c.isSpokeLeafNode()
	c.mu.Unlock()

	// Only add in shadow subs if a new sub or qsub.
	if osub == nil {
		if err := c.addShadowSubscriptions(acc, sub); err != nil {
			c.Errorf(err.Error())
		}
	}

	// If we are not solicited, treat leaf node subscriptions similar to a
	// client subscription, meaning we forward them to routes, gateways and
	// other leaf nodes as needed.
	if !spoke {
		// If we are routing add to the route map for the associated account.
		srv.updateRouteSubscriptionMap(acc, sub, delta)
		if srv.gateway.enabled {
			srv.gatewayUpdateSubInterest(acc.Name, sub, delta)
		}
	}
	// Now check on leafnode updates for other leaf nodes. We understand solicited
	// and non-solicited state in this call so we will do the right thing.
	acc.updateLeafNodes(sub, delta)

	return nil
}
2968

2969
// If the leafnode is a solicited, set the connect delay based on default
2970
// or private option (for tests). Sends the error to the other side, log and
2971
// close the connection.
2972
func (c *client) handleLeafNodeLoop(sendErr bool) {
18✔
2973
        accName, delay := c.setLeafConnectDelayIfSoliciting(leafNodeReconnectDelayAfterLoopDetected)
18✔
2974
        errTxt := fmt.Sprintf("Loop detected for leafnode account=%q. Delaying attempt to reconnect for %v", accName, delay)
18✔
2975
        if sendErr {
27✔
2976
                c.sendErr(errTxt)
9✔
2977
        }
9✔
2978

2979
        c.Errorf(errTxt)
18✔
2980
        // If we are here with "sendErr" false, it means that this is the server
18✔
2981
        // that received the error. The other side will have closed the connection,
18✔
2982
        // but does not hurt to close here too.
18✔
2983
        c.closeConnection(ProtocolViolation)
18✔
2984
}
2985

2986
// processLeafUnsub will process an inbound unsub request (LS-) for the remote
// leaf node. The arg is the exact key under which the subscription was stored
// (subject, and optionally queue name). On the hub side the interest change is
// propagated to routes and gateways; in all cases it is propagated to other
// leaf nodes. Returns nil even when the sub is unknown or the connection is
// closed, since there is nothing further to do in those cases.
func (c *client) processLeafUnsub(arg []byte) error {
	// Indicate any activity, so pub and sub or unsubs.
	c.in.subs++

	srv := c.srv

	c.mu.Lock()
	if c.isClosed() {
		c.mu.Unlock()
		return nil
	}

	acc := c.acc
	// Guard against LS- arriving before CONNECT has been processed.
	if acc == nil {
		c.mu.Unlock()
		c.sendErr("Authorization Violation")
		c.closeConnection(ProtocolViolation)
		return nil
	}

	spoke := c.isSpokeLeafNode()
	// We store local subs by account and subject and optionally queue name.
	// LS- will have the arg exactly as the key.
	sub, ok := c.subs[string(arg)]
	if !ok {
		// If not found, don't try to update routes/gws/leaf nodes.
		c.mu.Unlock()
		return nil
	}
	// For queue subscriptions, subtract the full queue weight rather
	// than a single subscription.
	delta := int32(1)
	if len(sub.queue) > 0 {
		delta = sub.qw
	}
	c.mu.Unlock()

	// Remove the subscription locally before propagating the interest change.
	c.unsubscribe(acc, sub, true, true)
	if !spoke {
		// If we are routing subtract from the route map for the associated account.
		srv.updateRouteSubscriptionMap(acc, sub, -delta)
		// Gateways
		if srv.gateway.enabled {
			srv.gatewayUpdateSubInterest(acc.Name, sub, -delta)
		}
	}
	// Now check on leafnode updates for other leaf nodes.
	acc.updateLeafNodes(sub, -delta)
	return nil
}
3036

3037
func (c *client) processLeafHeaderMsgArgs(arg []byte) error {
560✔
3038
        // Unroll splitArgs to avoid runtime/heap issues
560✔
3039
        args := c.argsa[:0]
560✔
3040
        start := -1
560✔
3041
        for i, b := range arg {
37,184✔
3042
                switch b {
36,624✔
3043
                case ' ', '\t', '\r', '\n':
1,611✔
3044
                        if start >= 0 {
3,222✔
3045
                                args = append(args, arg[start:i])
1,611✔
3046
                                start = -1
1,611✔
3047
                        }
1,611✔
3048
                default:
35,013✔
3049
                        if start < 0 {
37,184✔
3050
                                start = i
2,171✔
3051
                        }
2,171✔
3052
                }
3053
        }
3054
        if start >= 0 {
1,120✔
3055
                args = append(args, arg[start:])
560✔
3056
        }
560✔
3057

3058
        c.pa.arg = arg
560✔
3059
        switch len(args) {
560✔
3060
        case 0, 1, 2:
×
3061
                return fmt.Errorf("processLeafHeaderMsgArgs Parse Error: '%s'", args)
×
3062
        case 3:
87✔
3063
                c.pa.reply = nil
87✔
3064
                c.pa.queues = nil
87✔
3065
                c.pa.hdb = args[1]
87✔
3066
                c.pa.hdr = parseSize(args[1])
87✔
3067
                c.pa.szb = args[2]
87✔
3068
                c.pa.size = parseSize(args[2])
87✔
3069
        case 4:
459✔
3070
                c.pa.reply = args[1]
459✔
3071
                c.pa.queues = nil
459✔
3072
                c.pa.hdb = args[2]
459✔
3073
                c.pa.hdr = parseSize(args[2])
459✔
3074
                c.pa.szb = args[3]
459✔
3075
                c.pa.size = parseSize(args[3])
459✔
3076
        default:
14✔
3077
                // args[1] is our reply indicator. Should be + or | normally.
14✔
3078
                if len(args[1]) != 1 {
14✔
3079
                        return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
3080
                }
×
3081
                switch args[1][0] {
14✔
3082
                case '+':
4✔
3083
                        c.pa.reply = args[2]
4✔
3084
                case '|':
10✔
3085
                        c.pa.reply = nil
10✔
3086
                default:
×
3087
                        return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
3088
                }
3089
                // Grab header size.
3090
                c.pa.hdb = args[len(args)-2]
14✔
3091
                c.pa.hdr = parseSize(c.pa.hdb)
14✔
3092

14✔
3093
                // Grab size.
14✔
3094
                c.pa.szb = args[len(args)-1]
14✔
3095
                c.pa.size = parseSize(c.pa.szb)
14✔
3096

14✔
3097
                // Grab queue names.
14✔
3098
                if c.pa.reply != nil {
18✔
3099
                        c.pa.queues = args[3 : len(args)-2]
4✔
3100
                } else {
14✔
3101
                        c.pa.queues = args[2 : len(args)-2]
10✔
3102
                }
10✔
3103
        }
3104
        if c.pa.hdr < 0 {
560✔
3105
                return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Header Size: '%s'", arg)
×
3106
        }
×
3107
        if c.pa.size < 0 {
560✔
3108
                return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Size: '%s'", args)
×
3109
        }
×
3110
        if c.pa.hdr > c.pa.size {
560✔
3111
                return fmt.Errorf("processLeafHeaderMsgArgs Header Size larger then TotalSize: '%s'", arg)
×
3112
        }
×
3113
        maxPayload := atomic.LoadInt32(&c.mpay)
560✔
3114
        if maxPayload != jwt.NoLimit && int64(c.pa.size) > int64(maxPayload) {
561✔
3115
                c.maxPayloadViolation(c.pa.size, maxPayload)
1✔
3116
                return ErrMaxPayload
1✔
3117
        }
1✔
3118

3119
        // Common ones processed after check for arg length
3120
        c.pa.subject = args[0]
559✔
3121

559✔
3122
        return nil
559✔
3123
}
3124

3125
// processLeafMsgArgs parses the arguments of an inbound LMSG protocol line
// from a leaf node and populates c.pa (subject, reply, queues and size).
// Supported forms:
//
//	LMSG <subject> <size>
//	LMSG <subject> <reply> <size>
//	LMSG <subject> + <reply> <queues...> <size>
//	LMSG <subject> | <queues...> <size>
//
// Returns an error on malformed input, and ErrMaxPayload (after notifying the
// remote) when the size exceeds the connection's max payload.
func (c *client) processLeafMsgArgs(arg []byte) error {
	// Unroll splitArgs to avoid runtime/heap issues
	args := c.argsa[:0]
	start := -1
	for i, b := range arg {
		switch b {
		case ' ', '\t', '\r', '\n':
			if start >= 0 {
				args = append(args, arg[start:i])
				start = -1
			}
		default:
			if start < 0 {
				start = i
			}
		}
	}
	if start >= 0 {
		args = append(args, arg[start:])
	}

	c.pa.arg = arg
	switch len(args) {
	case 0, 1:
		return fmt.Errorf("processLeafMsgArgs Parse Error: '%s'", args)
	case 2:
		// <subject> <size>
		c.pa.reply = nil
		c.pa.queues = nil
		c.pa.szb = args[1]
		c.pa.size = parseSize(args[1])
	case 3:
		// <subject> <reply> <size>
		c.pa.reply = args[1]
		c.pa.queues = nil
		c.pa.szb = args[2]
		c.pa.size = parseSize(args[2])
	default:
		// args[1] is our reply indicator. Should be + or | normally.
		if len(args[1]) != 1 {
			return fmt.Errorf("processLeafMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
		}
		switch args[1][0] {
		case '+':
			c.pa.reply = args[2]
		case '|':
			c.pa.reply = nil
		default:
			return fmt.Errorf("processLeafMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
		}
		// Grab size.
		c.pa.szb = args[len(args)-1]
		c.pa.size = parseSize(c.pa.szb)

		// Grab queue names.
		if c.pa.reply != nil {
			c.pa.queues = args[3 : len(args)-1]
		} else {
			c.pa.queues = args[2 : len(args)-1]
		}
	}
	if c.pa.size < 0 {
		return fmt.Errorf("processLeafMsgArgs Bad or Missing Size: '%s'", args)
	}
	maxPayload := atomic.LoadInt32(&c.mpay)
	if maxPayload != jwt.NoLimit && int64(c.pa.size) > int64(maxPayload) {
		c.maxPayloadViolation(c.pa.size, maxPayload)
		return ErrMaxPayload
	}

	// Common ones processed after check for arg length
	c.pa.subject = args[0]

	return nil
}
3198

3199
// processInboundLeafMsg is called to process an inbound msg from a leaf node.
// It updates statistics, enforces publish permissions, matches the subject
// against the account sublist (using a per-connection L1 result cache), and
// delivers to local subscribers and gateways as needed.
func (c *client) processInboundLeafMsg(msg []byte) {
	// Update statistics
	// The msg includes the CR_LF, so pull back out for accounting.
	c.in.msgs++
	c.in.bytes += int32(len(msg) - LEN_CR_LF)

	srv, acc, subject := c.srv, c.acc, string(c.pa.subject)

	// Mostly under testing scenarios.
	if srv == nil || acc == nil {
		return
	}

	// Check that leaf messages respect the subject permissions.
	if c.perms != nil && !c.leafMsgAllowed() {
		c.leafPubPermViolation(c.pa.subject)
		return
	}

	// Match the subscriptions. We will use our own L1 map if
	// it's still valid, avoiding contention on the shared sublist.
	var r *SublistResult
	var ok bool

	// genid changes whenever the account sublist changes; a mismatch
	// invalidates our entire L1 cache.
	genid := atomic.LoadUint64(&c.acc.sl.genid)
	if genid == c.in.genid && c.in.results != nil {
		r, ok = c.in.results[subject]
	} else {
		// Reset our L1 completely.
		c.in.results = make(map[string]*SublistResult)
		c.in.genid = genid
	}

	// Go back to the sublist data structure.
	if !ok {
		r = c.acc.sl.Match(subject)
		// Prune the results cache. Keeps us from unbounded growth. Random delete.
		if len(c.in.results) >= maxResultCacheSize {
			n := 0
			for subj := range c.in.results {
				delete(c.in.results, subj)
				if n++; n > pruneSize {
					break
				}
			}
		}
		// Then add the new cache entry.
		c.in.results[subject] = r
	}

	// Collect queue names if needed.
	var qnames [][]byte

	// Check for no interest, short circuit if so.
	// This is the fanout scale.
	if len(r.psubs)+len(r.qsubs) > 0 {
		flag := pmrNoFlag
		// If we have queue subs in this cluster, then if we run in gateway
		// mode and the remote gateways have queue subs, then we need to
		// collect the queue groups this message was sent to so that we
		// exclude them when sending to gateways.
		if len(r.qsubs) > 0 && c.srv.gateway.enabled &&
			atomic.LoadInt64(&c.srv.gateway.totalQSubs) > 0 {
			flag |= pmrCollectQueueNames
		}
		// If this is a mapped subject that means the mapped interest
		// is what got us here, but this might not have a queue designation
		// If that is the case, make sure we ignore to process local queue subscribers.
		if len(c.pa.mapped) > 0 && len(c.pa.queues) == 0 {
			flag |= pmrIgnoreEmptyQueueFilter
		}
		_, qnames = c.processMsgResults(acc, r, msg, nil, c.pa.subject, c.pa.reply, flag)
	}

	// Now deal with gateways
	if c.srv.gateway.enabled {
		c.sendMsgToGateways(acc, msg, c.pa.subject, c.pa.reply, qnames, true)
	}
}
3279

3280
// Checks whether the inbound leaf message is allowed by the
// connection's permissions. On the hub side this enforces what
// the remote leaf may publish. On the spoke side this enforces
// import restrictions such as deny_imports.
// Returns true when the message may be processed further.
func (c *client) leafMsgAllowed() bool {
	wireSubject := c.pa.subject
	if len(c.pa.mapped) > 0 {
		// Mappings rewrite c.pa.subject to the internal
		// destination. For leaf ACLs, need to check
		// the original wire subject from the remote side.
		wireSubject = c.pa.mapped
	}
	// Strip any gateway routing prefix for the permission check.
	subjectToCheck, isGW := getGWRoutedSubjectOrSelf(wireSubject)

	// Service-import replies (_R_), JS ack subjects ($JS.ACK.)
	// are internal routing subjects forwarded via LS+ without
	// permission checks.
	if isServiceReply(subjectToCheck) || isJSAckSubject(subjectToCheck) {
		return true
	}

	// Lock protects the permission state read by the helpers below.
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.isSpokeLeafNode() {
		// Gateway routed replies are forwarded without
		// permission checks.
		if isGW || c.leafReceiveAllowed(subjectToCheck) {
			return true
		}
	} else if c.leafSendAllowed(subjectToCheck) {
		return true
	}
	// Check tracked reply permissions (allow_responses).
	// Use the pre-strip subject since deliverMsg tracks
	// replies under the original form, which includes
	// the GW routing prefix for routed requests.
	return c.responseAllowed(bytesToString(wireSubject))
}
3320

3321
// Returns true if the leaf side ACLs allow importing this subject,
// based on the permissions received over INFO and any local deny_imports.
// This simply defers to the generic subscribe-permission check.
// Lock must be held.
func (c *client) leafReceiveAllowed(subject []byte) bool {
	return c.canSubscribe(bytesToString(subject))
}
43,651✔
3327

3328
// Returns true if the hub side ACLs allow the remote leaf to send
3329
// this subject.
3330
// Lock must be held.
3331
func (c *client) leafSendAllowed(bsubject []byte) bool {
38,918✔
3332
        // Use the original export ACL captured for this accepted leaf.
38,918✔
3333
        // The live perms also contain additional JetStream denies used by
38,918✔
3334
        // the normal forwarding path, and applying them here would reject
38,918✔
3335
        // legitimate inbound JS API requests.
38,918✔
3336
        subject := bytesToString(bsubject)
38,918✔
3337
        perms := c.opts.Export
38,918✔
3338
        if perms == nil || (perms.Allow == nil && perms.Deny == nil) {
77,811✔
3339
                return true
38,893✔
3340
        }
38,893✔
3341

3342
        allowed := true
25✔
3343
        if perms.Allow != nil && !strings.HasPrefix(subject, mqttPrefix) {
36✔
3344
                allowed = false
11✔
3345
                for _, allowSubj := range perms.Allow {
21✔
3346
                        if matchLiteral(subject, allowSubj) {
16✔
3347
                                allowed = true
6✔
3348
                                break
6✔
3349
                        }
3350
                }
3351
        }
3352

3353
        if allowed && len(perms.Deny) > 0 {
39✔
3354
                for _, denySubj := range perms.Deny {
40✔
3355
                        if matchLiteral(subject, denySubj) {
27✔
3356
                                allowed = false
1✔
3357
                                break
1✔
3358
                        }
3359
                }
3360
        }
3361
        return allowed
25✔
3362
}
3363

3364
// Handles a subscription permission violation.
// Thin wrapper delegating to leafPermViolation with pub=false.
// See leafPermViolation() for details.
func (c *client) leafSubPermViolation(subj []byte) {
	c.leafPermViolation(false, subj)
}
359✔
3369

3370
// Handles a publish permission violation.
// Thin wrapper delegating to leafPermViolation with pub=true.
// See leafPermViolation() for details.
func (c *client) leafPubPermViolation(subj []byte) {
	c.leafPermViolation(true, subj)
}
5✔
3375

3376
// Common function to process publish or subscribe leafnode permission violation.
3377
// Sends the permission violation error to the remote, logs it and closes the connection.
3378
// If this is from a server soliciting, the reconnection will be delayed.
3379
func (c *client) leafPermViolation(pub bool, subj []byte) {
364✔
3380
        if c.isSpokeLeafNode() {
725✔
3381
                // For spokes these are no-ops since the hub server told us our permissions.
361✔
3382
                // We just need to not send these over to the other side since we will get cutoff.
361✔
3383
                return
361✔
3384
        }
361✔
3385
        // FIXME(dlc) ?
3386
        c.setLeafConnectDelayIfSoliciting(leafNodeReconnectAfterPermViolation)
3✔
3387
        var action string
3✔
3388
        if pub {
6✔
3389
                c.sendErr(fmt.Sprintf("Permissions Violation for Publish to %q", subj))
3✔
3390
                action = "Publish"
3✔
3391
        } else {
3✔
3392
                c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q", subj))
×
3393
                action = "Subscription"
×
3394
        }
×
3395
        c.Errorf("%s Violation on %q - Check other side configuration", action, subj)
3✔
3396
        // TODO: add a new close reason that is more appropriate?
3✔
3397
        c.closeConnection(ProtocolViolation)
3✔
3398
}
3399

3400
// Invoked from generic processErr() for LEAF connections.
3401
func (c *client) leafProcessErr(errStr string) {
50✔
3402
        // Check if we got a cluster name collision.
50✔
3403
        if strings.Contains(errStr, ErrLeafNodeHasSameClusterName.Error()) {
53✔
3404
                _, delay := c.setLeafConnectDelayIfSoliciting(leafNodeReconnectDelayAfterClusterNameSame)
3✔
3405
                c.Errorf("Leafnode connection dropped with same cluster name error. Delaying attempt to reconnect for %v", delay)
3✔
3406
                return
3✔
3407
        }
3✔
3408
        if strings.Contains(errStr, ErrLeafNodeMinVersionRejected.Error()) {
48✔
3409
                _, delay := c.setLeafConnectDelayIfSoliciting(leafNodeMinVersionReconnectDelay)
1✔
3410
                c.Errorf("Leafnode connection dropped due to minimum version requirement. Delaying attempt to reconnect for %v", delay)
1✔
3411
                return
1✔
3412
        }
1✔
3413

3414
        // We will look for Loop detected error coming from the other side.
3415
        // If we solicit, set the connect delay.
3416
        if !strings.Contains(errStr, "Loop detected") {
83✔
3417
                return
37✔
3418
        }
37✔
3419
        c.handleLeafNodeLoop(false)
9✔
3420
}
3421

3422
// If this leaf connection solicits, sets the connect delay to the given value,
// or the one from the server option's LeafNode.connDelay if one is set (for tests).
// Returns the connection's account name and delay.
func (c *client) setLeafConnectDelayIfSoliciting(delay time.Duration) (string, time.Duration) {
	c.mu.Lock()
	if c.isSolicitedLeafNode() {
		if s := c.srv; s != nil {
			// Test-only override of the delay value.
			if srvdelay := s.getOpts().LeafNode.connDelay; srvdelay != 0 {
				delay = srvdelay
			}
		}
		c.leaf.remote.setConnectDelay(delay)
	}
	// Capture the account name under the lock for the caller's log message.
	var accName string
	if c.acc != nil {
		accName = c.acc.Name
	}
	c.mu.Unlock()
	return accName, delay
}
3442

3443
// For the given remote Leafnode configuration, this function returns
3444
// if TLS is required, and if so, will return a clone of the TLS Config
3445
// (since some fields will be changed during handshake), the TLS server
3446
// name that is remembered, and the TLS timeout.
3447
func (c *client) leafNodeGetTLSConfigForSolicit(remote *leafNodeCfg) (bool, *tls.Config, string, float64) {
1,984✔
3448
        var (
1,984✔
3449
                tlsConfig  *tls.Config
1,984✔
3450
                tlsName    string
1,984✔
3451
                tlsTimeout float64
1,984✔
3452
        )
1,984✔
3453

1,984✔
3454
        remote.RLock()
1,984✔
3455
        defer remote.RUnlock()
1,984✔
3456

1,984✔
3457
        tlsRequired := remote.TLS || remote.TLSConfig != nil
1,984✔
3458
        if tlsRequired {
2,062✔
3459
                if remote.TLSConfig != nil {
129✔
3460
                        tlsConfig = remote.TLSConfig.Clone()
51✔
3461
                } else {
78✔
3462
                        tlsConfig = &tls.Config{MinVersion: tls.VersionTLS12}
27✔
3463
                }
27✔
3464
                tlsName = remote.tlsName
78✔
3465
                tlsTimeout = remote.TLSTimeout
78✔
3466
                if tlsTimeout == 0 {
122✔
3467
                        tlsTimeout = float64(TLS_TIMEOUT / time.Second)
44✔
3468
                }
44✔
3469
        }
3470

3471
        return tlsRequired, tlsConfig, tlsName, tlsTimeout
1,984✔
3472
}
3473

3474
// Initiates the LeafNode Websocket connection by:
// - doing the TLS handshake if needed
// - sending the HTTP request
// - waiting for the HTTP response
//
// Since some bufio reader is used to consume the HTTP response, this function
// returns the slice of buffered bytes (if any) so that the readLoop that will
// be started after that consume those first before reading from the socket.
// The returned ClosedState indicates how the attempt failed (0 on success or
// when the connection was already closed).
//
// Lock held on entry.
func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remote *leafNodeCfg) ([]byte, ClosedState, error) {
	remote.RLock()
	compress := remote.Websocket.Compression
	// By default the server will mask outbound frames, but it can be disabled with this option.
	noMasking := remote.Websocket.NoMasking
	infoTimeout := remote.FirstInfoTimeout
	remote.RUnlock()
	// Will do the client-side TLS handshake if needed.
	tlsRequired, err := c.leafClientHandshakeIfNeeded(remote, opts)
	if err != nil {
		// 0 will indicate that the connection was already closed
		return nil, 0, err
	}

	// For http request, we need the passed URL to contain either http or https scheme.
	scheme := "http"
	if tlsRequired {
		scheme = "https"
	}
	// We will use the `/leafnode` path to tell the accepting WS server that it should
	// create a LEAF connection, not a CLIENT.
	// In case we use the user's URL path in the future, make sure we append the user's
	// path to our `/leafnode` path.
	lpath := leafNodeWSPath
	if curPath := rURL.EscapedPath(); curPath != _EMPTY_ {
		if curPath[0] == '/' {
			curPath = curPath[1:]
		}
		lpath = path.Join(curPath, lpath)
	} else {
		lpath = lpath[1:]
	}
	ustr := fmt.Sprintf("%s://%s/%s", scheme, rURL.Host, lpath)
	u, _ := url.Parse(ustr)
	req := &http.Request{
		Method:     "GET",
		URL:        u,
		Proto:      "HTTP/1.1",
		ProtoMajor: 1,
		ProtoMinor: 1,
		Header:     make(http.Header),
		Host:       u.Host,
	}
	wsKey, err := wsMakeChallengeKey()
	if err != nil {
		return nil, WriteError, err
	}

	// Standard websocket upgrade headers (RFC 6455 opening handshake).
	req.Header["Upgrade"] = []string{"websocket"}
	req.Header["Connection"] = []string{"Upgrade"}
	req.Header["Sec-WebSocket-Key"] = []string{wsKey}
	req.Header["Sec-WebSocket-Version"] = []string{"13"}
	if compress {
		req.Header.Add("Sec-WebSocket-Extensions", wsPMCReqHeaderValue)
	}
	if noMasking {
		req.Header.Add(wsNoMaskingHeader, wsNoMaskingValue)
	}
	c.nc.SetDeadline(time.Now().Add(infoTimeout))
	if err := req.Write(c.nc); err != nil {
		return nil, WriteError, err
	}

	var resp *http.Response

	br := bufio.NewReaderSize(c.nc, MAX_CONTROL_LINE_SIZE)
	resp, err = http.ReadResponse(br, req)
	// Validate the upgrade response: 101 status, matching Upgrade/Connection
	// headers and the expected accept key derived from our challenge key.
	if err == nil &&
		(resp.StatusCode != 101 ||
			!strings.EqualFold(resp.Header.Get("Upgrade"), "websocket") ||
			!strings.EqualFold(resp.Header.Get("Connection"), "upgrade") ||
			resp.Header.Get("Sec-Websocket-Accept") != wsAcceptKey(wsKey)) {

		err = fmt.Errorf("invalid websocket connection")
	}
	// Check compression extension...
	if err == nil && c.ws.compress {
		// Check that not only permessage-deflate extension is present, but that
		// we also have server and client no context take over.
		srvCompress, noCtxTakeover := wsPMCExtensionSupport(resp.Header, false)

		// If server does not support compression, then simply disable it in our side.
		if !srvCompress {
			c.ws.compress = false
		} else if !noCtxTakeover {
			err = fmt.Errorf("compression negotiation error")
		}
	}
	// Same for no masking...
	if err == nil && noMasking {
		// Check if server accepts no masking
		if resp.Header.Get(wsNoMaskingHeader) != wsNoMaskingValue {
			// Nope, need to mask our writes as any client would do.
			c.ws.maskwrite = true
		}
	}
	if resp != nil {
		resp.Body.Close()
	}
	if err != nil {
		return nil, ReadError, err
	}
	c.Debugf("Leafnode compression=%v masking=%v", c.ws.compress, c.ws.maskwrite)

	var preBuf []byte
	// We have to slurp whatever is in the bufio reader and pass that to the readloop.
	if n := br.Buffered(); n != 0 {
		preBuf, _ = br.Peek(n)
	}
	return preBuf, 0, nil
}
3596

3597
// connectProcessTimeout is the maximum time allowed between sending the leaf
// CONNECT and leafNodeFinishConnectProcess completing, after which the
// connection is considered stale and closed.
const connectProcessTimeout = 2 * time.Second
3598

3599
// This is invoked for remote LEAF connections after processing the INFO
// protocol. It sends our CONNECT, spins up the write loop, and arms a timer
// that closes the connection if leafNodeFinishConnectProcess does not run
// within connectProcessTimeout.
func (s *Server) leafNodeResumeConnectProcess(c *client) {
	clusterName := s.ClusterName()

	c.mu.Lock()
	if c.isClosed() {
		c.mu.Unlock()
		return
	}
	if err := c.sendLeafConnect(clusterName, c.headers); err != nil {
		c.mu.Unlock()
		c.closeConnection(WriteError)
		return
	}

	// Spin up the write loop.
	s.startGoRoutine(func() { c.writeLoop() })

	// timeout leafNodeFinishConnectProcess
	c.ping.tmr = time.AfterFunc(connectProcessTimeout, func() {
		c.mu.Lock()
		// check if leafNodeFinishConnectProcess was called and prevent later leafNodeFinishConnectProcess
		if !c.flags.setIfNotSet(connectProcessFinished) {
			c.mu.Unlock()
			return
		}
		clearTimer(&c.ping.tmr)
		closed := c.isClosed()
		c.mu.Unlock()
		if !closed {
			c.sendErrAndDebug("Stale Leaf Node Connection - Closing")
			c.closeConnection(StaleConnection)
		}
	})
	c.mu.Unlock()
	c.Debugf("Remote leafnode connect msg sent")
}
3637

3638
// This is invoked for remote LEAF connections after processing the INFO
// protocol and leafNodeResumeConnectProcess.
// This will send LS+ the CONNECT protocol and register the leaf node.
// It races with the stale-connection timer armed in
// leafNodeResumeConnectProcess via the connectProcessFinished flag: whichever
// side sets the flag first wins, the other returns immediately.
func (s *Server) leafNodeFinishConnectProcess(c *client) {
	c.mu.Lock()
	// If the timer callback already fired (and set the flag), it owns the
	// connection teardown; nothing left to do here.
	if !c.flags.setIfNotSet(connectProcessFinished) {
		c.mu.Unlock()
		return
	}
	if c.isClosed() {
		c.mu.Unlock()
		s.removeLeafNodeConnection(c)
		return
	}
	remote := c.leaf.remote
	if remote == nil || c.acc == nil {
		c.mu.Unlock()
		c.sendErr("Authorization Violation")
		c.closeConnection(ProtocolViolation)
		return
	}
	// Check if we will need to send the system connect event.
	remote.RLock()
	sendSysConnectEvent := remote.Hub
	remote.RUnlock()

	// Capture account before releasing lock
	acc := c.acc
	// cancel connectProcessTimeout
	clearTimer(&c.ping.tmr)
	c.mu.Unlock()

	// Make sure we register with the account here.
	// Must run without the client lock held.
	if err := c.registerWithAccount(acc); err != nil {
		if err == ErrTooManyAccountConnections {
			c.maxAccountConnExceeded()
			return
		} else if err == ErrLeafNodeLoop {
			c.handleLeafNodeLoop(true)
			return
		}
		c.Errorf("Registering leaf with account %s resulted in error: %v", acc.Name, err)
		c.closeConnection(ProtocolViolation)
		return
	}
	if !s.addLeafNodeConnection(c, _EMPTY_, _EMPTY_, false) {
		// Was not added, could be because the remote configuration has been removed.
		c.closeConnection(ClientClosed)
		return
	}
	// Build the subject map and flush our subscriptions to the other side.
	s.initLeafNodeSmapAndSendSubs(c)
	if sendSysConnectEvent {
		s.sendLeafNodeConnect(acc)
	}
	s.accountConnectEvent(c)

	// The above functions are not running under the client lock, so it is
	// possible that between the time we have started the read/write loops
	// and now, that the connection was closed. This would leave the closed
	// LN connection possibly registered with the account and/or the server's
	// leafs map. So check if connection is closed, and if so, manually cleanup.
	c.mu.Lock()
	closed := c.isClosed()
	if !closed {
		// Now that the connect watchdog was cleared above, arm the regular
		// ping timer on c.ping.tmr.
		c.setFirstPingTimer()
	}
	c.mu.Unlock()
	if closed {
		s.removeLeafNodeConnection(c)
		// If this was the account's last client, drop the active-account count.
		if prev := acc.removeClient(c); prev == 1 {
			s.decActiveAccounts()
		}
	}
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc