• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 24327339784

10 Apr 2026 02:22PM UTC coverage: 81.808% (-1.2%) from 82.972%
24327339784

push

github

web-flow
(2.14) [ADDED] Config reload: add/remove remote leafnodes (#7937)

The configuration reload now supports adding and/or removing remote
leafnodes. A remote is identified with the combination of its URLs list,
local account and credentials file name. This is what is used by the
server to detect changes for the remote leafnodes list.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>

75037 of 91723 relevant lines covered (81.81%)

483084.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.26
/server/leafnode.go
1
// Copyright 2019-2025 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bufio"
18
        "bytes"
19
        "crypto/tls"
20
        "encoding/base64"
21
        "encoding/json"
22
        "fmt"
23
        "io"
24
        "math/rand"
25
        "net"
26
        "net/http"
27
        "net/url"
28
        "os"
29
        "path"
30
        "regexp"
31
        "runtime"
32
        "strconv"
33
        "strings"
34
        "sync"
35
        "sync/atomic"
36
        "time"
37

38
        "github.com/klauspost/compress/s2"
39
        "github.com/nats-io/jwt/v2"
40
        "github.com/nats-io/nkeys"
41
        "github.com/nats-io/nuid"
42
)
43

44
const (
	// Warning logged when a user configures leafnode TLS as insecure.
	leafnodeTLSInsecureWarning = "TLS certificate chain and hostname of solicited leafnodes will not be verified. DO NOT USE IN PRODUCTION!"

	// When a loop is detected, delay the reconnect of the solicited connection
	// by this long.
	leafNodeReconnectDelayAfterLoopDetected = 30 * time.Second

	// When a server receives a message causing a permission violation, the
	// connection is closed and it won't attempt to reconnect for that long.
	leafNodeReconnectAfterPermViolation = 30 * time.Second

	// When we have the same cluster name as the hub, delay the reconnect by
	// this long.
	leafNodeReconnectDelayAfterClusterNameSame = 30 * time.Second

	// Prefix for the loop detection subject.
	leafNodeLoopDetectionSubjectPrefix = "$LDS."

	// Path added to the URL to indicate to the WS server that the connection
	// is a LEAF connection as opposed to a CLIENT.
	leafNodeWSPath = "/leafnode"

	// When a soliciting leafnode is rejected because it does not meet the
	// configured minimum version, delay the next reconnect attempt by this long.
	leafNodeMinVersionReconnectDelay = 5 * time.Second
)
69

70
// leaf holds the leafnode-specific state of a client connection (it is the
// value reached through `c.leaf` on a client whose kind is LEAF).
type leaf struct {
	// We have any auth stuff here for solicited connections.
	// It is non-nil only for solicited leafnode connections.
	remote *leafNodeCfg
	// isSpoke tells us what role we are playing.
	// Used when we receive a connection but the other side tells us they are a hub.
	isSpoke bool
	// remoteCluster is when we are a hub but the spoke leafnode is part of a cluster.
	remoteCluster string
	// remoteServer holds onto the remote server's name or ID.
	remoteServer string
	// Domain name of the remote server.
	remoteDomain string
	// Account name of the remote server.
	remoteAccName string
	// Whether or not we want to propagate east-west interest from other LNs.
	isolated bool
	// Used to suppress sub and unsub interest. Same as routes but our audience
	// here is tied to this leaf node. This will hold all subscriptions except this
	// leaf node's. This represents all the interest we want to send to the other side.
	smap map[string]int32
	// This map will contain all the subscriptions that have been added to the smap
	// during initLeafNodeSmapAndSendSubs. It is short lived and is there to avoid
	// a race between the processing of a sub where the sub is added to the account
	// sublist but updateSmap has not been called on that "thread", while in the LN
	// readloop, when processing CONNECT, initLeafNodeSmapAndSendSubs is invoked and
	// adds this subscription to smap. When processing of the sub then calls updateSmap,
	// we would add it a second time in the smap causing a later unsub to suppress the LS-.
	tsub  map[*subscription]struct{}
	tsubt *time.Timer
	// Selected compression mode, which may be different from the server configured mode.
	compression string
	// This is for GW map replies.
	gwSub *subscription
}
104

105
// Used for remote (solicited) leafnodes. It wraps the user-provided
// RemoteLeafOpts with the runtime state needed to (re)connect to the remote.
// The embedded RWMutex protects the mutable fields below.
type leafNodeCfg struct {
	sync.RWMutex
	*RemoteLeafOpts
	// Working copy of the configured URLs (shuffled unless NoRandomize is set).
	// pickNextURL rotates this list.
	urls []*url.URL
	// URL returned by the last call to pickNextURL.
	curURL *url.URL
	// NOTE(review): tlsName, username and password are presumably populated by
	// saveTLSHostname/saveUserPassword from the URLs - confirm in those helpers.
	tlsName  string
	username string
	password string
	// Deny permissions built from DenyExports (publish) and DenyImports
	// (subscribe); nil when neither is configured.
	perms          *Permissions
	connDelay      time.Duration // Delay before a connect, could be used while detecting loop condition, etc..
	jsMigrateTimer *time.Timer
	// Buffered (capacity 1) channel notified when the remote is marked as
	// removed; it is drained by setConnectInProgress.
	quitCh chan struct{}
	// Set once by markAsRemoved.
	removed bool
	// Tracks whether a connect attempt is in progress (see setConnectInProgress).
	connInProgress bool
}
121

122
// Check to see if this is a solicited leafnode. We do special processing for solicited.
123
func (c *client) isSolicitedLeafNode() bool {
2,079✔
124
        return c.kind == LEAF && c.leaf.remote != nil
2,079✔
125
}
2,079✔
126

127
// Returns true if this is a solicited leafnode and is not configured to be treated as a hub or a receiving
128
// connection leafnode where the otherside has declared itself to be the hub.
129
func (c *client) isSpokeLeafNode() bool {
10,791,637✔
130
        return c.kind == LEAF && c.leaf.isSpoke
10,791,637✔
131
}
10,791,637✔
132

133
func (c *client) isHubLeafNode() bool {
17,983✔
134
        return c.kind == LEAF && !c.leaf.isSpoke
17,983✔
135
}
17,983✔
136

137
func (c *client) isIsolatedLeafNode() bool {
11,577✔
138
        // TODO(nat): In future we may want to pass in and consider an isolation
11,577✔
139
        // group name here, which the hub and/or leaf could provide, so that we
11,577✔
140
        // can isolate away certain LNs but not others on an opt-in basis. For
11,577✔
141
        // now we will just isolate all LN interest until then.
11,577✔
142
        return c.kind == LEAF && c.leaf.isolated
11,577✔
143
}
11,577✔
144

145
// This will spin up go routines to solicit the remote leaf node connections.
146
func (s *Server) solicitLeafNodeRemotes(remotes []*RemoteLeafOpts) {
1,219✔
147
        sysAccName := _EMPTY_
1,219✔
148
        sAcc := s.SystemAccount()
1,219✔
149
        if sAcc != nil {
2,415✔
150
                sysAccName = sAcc.Name
1,196✔
151
        }
1,196✔
152
        addRemote := func(r *RemoteLeafOpts, isSysAccRemote bool) *leafNodeCfg {
2,582✔
153
                s.mu.Lock()
1,363✔
154
                remote := newLeafNodeCfg(r)
1,363✔
155
                creds := remote.Credentials
1,363✔
156
                accName := remote.LocalAccount
1,363✔
157
                if s.leafRemoteCfgs == nil {
2,581✔
158
                        s.leafRemoteCfgs = make(map[*leafNodeCfg]struct{})
1,218✔
159
                }
1,218✔
160
                s.leafRemoteCfgs[remote] = struct{}{}
1,363✔
161
                // Print notice if
1,363✔
162
                if isSysAccRemote {
1,458✔
163
                        if len(remote.DenyExports) > 0 {
96✔
164
                                s.Noticef("Remote for System Account uses restricted export permissions")
1✔
165
                        }
1✔
166
                        if len(remote.DenyImports) > 0 {
96✔
167
                                s.Noticef("Remote for System Account uses restricted import permissions")
1✔
168
                        }
1✔
169
                }
170
                s.mu.Unlock()
1,363✔
171
                if creds != _EMPTY_ {
1,415✔
172
                        contents, err := os.ReadFile(creds)
52✔
173
                        defer wipeSlice(contents)
52✔
174
                        if err != nil {
52✔
175
                                s.Errorf("Error reading LeafNode Remote Credentials file %q: %v", creds, err)
×
176
                        } else if items := credsRe.FindAllSubmatch(contents, -1); len(items) < 2 {
52✔
177
                                s.Errorf("LeafNode Remote Credentials file %q malformed", creds)
×
178
                        } else if _, err := nkeys.FromSeed(items[1][1]); err != nil {
52✔
179
                                s.Errorf("LeafNode Remote Credentials file %q has malformed seed", creds)
×
180
                        } else if uc, err := jwt.DecodeUserClaims(string(items[0][1])); err != nil {
52✔
181
                                s.Errorf("LeafNode Remote Credentials file %q has malformed user jwt", creds)
×
182
                        } else if isSysAccRemote {
56✔
183
                                if !uc.Permissions.Pub.Empty() || !uc.Permissions.Sub.Empty() || uc.Permissions.Resp != nil {
5✔
184
                                        s.Noticef("LeafNode Remote for System Account uses credentials file %q with restricted permissions", creds)
1✔
185
                                }
1✔
186
                        } else {
48✔
187
                                if !uc.Permissions.Pub.Empty() || !uc.Permissions.Sub.Empty() || uc.Permissions.Resp != nil {
54✔
188
                                        s.Noticef("LeafNode Remote for Account %s uses credentials file %q with restricted permissions", accName, creds)
6✔
189
                                }
6✔
190
                        }
191
                }
192
                return remote
1,363✔
193
        }
194
        for _, r := range remotes {
2,582✔
195
                // We need to call this, even if the leaf is disabled. This is so that
1,363✔
196
                // the number of internal configuration matches the options' remote leaf
1,363✔
197
                // configuration required for configuration reload.
1,363✔
198
                remote := addRemote(r, r.LocalAccount == sysAccName)
1,363✔
199
                if !r.Disabled {
2,725✔
200
                        s.connectToRemoteLeafNodeAsynchronously(remote, true)
1,362✔
201
                }
1,362✔
202
        }
203
}
204

205
// Ensure that leafnode is properly configured.
206
func validateLeafNode(o *Options) error {
7,863✔
207
        if err := validateLeafNodeAuthOptions(o); err != nil {
7,865✔
208
                return err
2✔
209
        }
2✔
210

211
        if len(o.LeafNode.Remotes) > 0 {
9,122✔
212
                names := make(map[string]struct{})
1,261✔
213
                // Check for duplicate remotes, also, users can bind to any local account,
1,261✔
214
                // if its empty we will assume the $G account.
1,261✔
215
                for _, r := range o.LeafNode.Remotes {
2,677✔
216
                        if r.LocalAccount == _EMPTY_ {
1,857✔
217
                                r.LocalAccount = globalAccountName
441✔
218
                        }
441✔
219
                        rn := r.name()
1,416✔
220
                        if _, dup := names[rn]; dup {
1,419✔
221
                                return fmt.Errorf("duplicate remote %s", r.safeName())
3✔
222
                        }
3✔
223
                        names[rn] = struct{}{}
1,413✔
224
                }
225
        }
226

227
        // In local config mode, check that leafnode configuration refers to accounts that exist.
228
        if len(o.TrustedOperators) == 0 {
15,401✔
229
                accNames := map[string]struct{}{}
7,543✔
230
                for _, a := range o.Accounts {
16,009✔
231
                        accNames[a.Name] = struct{}{}
8,466✔
232
                }
8,466✔
233
                // global account is always created
234
                accNames[DEFAULT_GLOBAL_ACCOUNT] = struct{}{}
7,543✔
235
                // in the context of leaf nodes, empty account means global account
7,543✔
236
                accNames[_EMPTY_] = struct{}{}
7,543✔
237
                // system account either exists or, if not disabled, will be created
7,543✔
238
                if o.SystemAccount == _EMPTY_ && !o.NoSystemAccount {
13,559✔
239
                        accNames[DEFAULT_SYSTEM_ACCOUNT] = struct{}{}
6,016✔
240
                }
6,016✔
241
                checkAccountExists := func(accName string, cfgType string) error {
16,499✔
242
                        if _, ok := accNames[accName]; !ok {
8,958✔
243
                                return fmt.Errorf("cannot find local account %q specified in leafnode %s", accName, cfgType)
2✔
244
                        }
2✔
245
                        return nil
8,954✔
246
                }
247
                if err := checkAccountExists(o.LeafNode.Account, "authorization"); err != nil {
7,544✔
248
                        return err
1✔
249
                }
1✔
250
                for _, lu := range o.LeafNode.Users {
7,559✔
251
                        if lu.Account == nil { // means global account
27✔
252
                                continue
10✔
253
                        }
254
                        if err := checkAccountExists(lu.Account.Name, "authorization"); err != nil {
7✔
255
                                return err
×
256
                        }
×
257
                }
258
                for _, r := range o.LeafNode.Remotes {
8,948✔
259
                        if err := checkAccountExists(r.LocalAccount, "remote"); err != nil {
1,407✔
260
                                return err
1✔
261
                        }
1✔
262
                }
263
        } else {
315✔
264
                if len(o.LeafNode.Users) != 0 {
316✔
265
                        return fmt.Errorf("operator mode does not allow specifying users in leafnode config")
1✔
266
                }
1✔
267
                for _, r := range o.LeafNode.Remotes {
315✔
268
                        if !nkeys.IsValidPublicAccountKey(r.LocalAccount) {
2✔
269
                                return fmt.Errorf(
1✔
270
                                        "operator mode requires account nkeys in remotes. " +
1✔
271
                                                "Please add an `account` key to each remote in your `leafnodes` section, to assign it to an account. " +
1✔
272
                                                "Each account value should be a 56 character public key, starting with the letter 'A'")
1✔
273
                        }
1✔
274
                }
275
                if o.LeafNode.Port != 0 && o.LeafNode.Account != "" && !nkeys.IsValidPublicAccountKey(o.LeafNode.Account) {
314✔
276
                        return fmt.Errorf("operator mode and non account nkeys are incompatible")
1✔
277
                }
1✔
278
        }
279

280
        // Validate compression settings
281
        if o.LeafNode.Compression.Mode != _EMPTY_ {
12,221✔
282
                if err := validateAndNormalizeCompressionOption(&o.LeafNode.Compression, CompressionS2Auto); err != nil {
4,373✔
283
                        return err
5✔
284
                }
5✔
285
        }
286

287
        // If a remote has a websocket scheme, all need to have it.
288
        for _, rcfg := range o.LeafNode.Remotes {
9,253✔
289
                // Validate proxy configuration
1,405✔
290
                if _, err := validateLeafNodeProxyOptions(rcfg); err != nil {
1,411✔
291
                        return err
6✔
292
                }
6✔
293

294
                if len(rcfg.URLs) >= 2 {
1,601✔
295
                        firstIsWS, ok := isWSURL(rcfg.URLs[0]), true
202✔
296
                        for i := 1; i < len(rcfg.URLs); i++ {
613✔
297
                                u := rcfg.URLs[i]
411✔
298
                                if isWS := isWSURL(u); isWS && !firstIsWS || !isWS && firstIsWS {
418✔
299
                                        ok = false
7✔
300
                                        break
7✔
301
                                }
302
                        }
303
                        if !ok {
209✔
304
                                return fmt.Errorf("remote leaf node configuration cannot have a mix of websocket and non-websocket urls: %q", redactURLList(rcfg.URLs))
7✔
305
                        }
7✔
306
                }
307
                // Validate compression settings
308
                if rcfg.Compression.Mode != _EMPTY_ {
2,780✔
309
                        if err := validateAndNormalizeCompressionOption(&rcfg.Compression, CompressionS2Auto); err != nil {
1,393✔
310
                                return err
5✔
311
                        }
5✔
312
                }
313
        }
314

315
        if o.LeafNode.Port == 0 {
11,890✔
316
                return nil
4,060✔
317
        }
4,060✔
318

319
        // If MinVersion is defined, check that it is valid.
320
        if mv := o.LeafNode.MinVersion; mv != _EMPTY_ {
3,774✔
321
                if err := checkLeafMinVersionConfig(mv); err != nil {
6✔
322
                        return err
2✔
323
                }
2✔
324
        }
325

326
        // The checks below will be done only when detecting that we are configured
327
        // with gateways. So if an option validation needs to be done regardless,
328
        // it MUST be done before this point!
329

330
        if o.Gateway.Name == _EMPTY_ && o.Gateway.Port == 0 {
6,860✔
331
                return nil
3,092✔
332
        }
3,092✔
333
        // If we are here we have both leaf nodes and gateways defined, make sure there
334
        // is a system account defined.
335
        if o.SystemAccount == _EMPTY_ {
677✔
336
                return fmt.Errorf("leaf nodes and gateways (both being defined) require a system account to also be configured")
1✔
337
        }
1✔
338
        if err := validatePinnedCerts(o.LeafNode.TLSPinnedCerts); err != nil {
675✔
339
                return fmt.Errorf("leafnode: %v", err)
×
340
        }
×
341
        return nil
675✔
342
}
343

344
func checkLeafMinVersionConfig(mv string) error {
8✔
345
        if ok, err := versionAtLeastCheckError(mv, 2, 8, 0); !ok || err != nil {
12✔
346
                if err != nil {
6✔
347
                        return fmt.Errorf("invalid leafnode's minimum version: %v", err)
2✔
348
                } else {
4✔
349
                        return fmt.Errorf("the minimum version should be at least 2.8.0")
2✔
350
                }
2✔
351
        }
352
        return nil
4✔
353
}
354

355
// Used to validate user names in LeafNode configuration.
356
// - rejects mix of single and multiple users.
357
// - rejects duplicate user names.
358
func validateLeafNodeAuthOptions(o *Options) error {
7,912✔
359
        if len(o.LeafNode.Users) == 0 {
15,798✔
360
                return nil
7,886✔
361
        }
7,886✔
362
        if o.LeafNode.Username != _EMPTY_ {
28✔
363
                return fmt.Errorf("can not have a single user/pass and a users array")
2✔
364
        }
2✔
365
        if o.LeafNode.Nkey != _EMPTY_ {
24✔
366
                return fmt.Errorf("can not have a single nkey and a users array")
×
367
        }
×
368
        users := map[string]struct{}{}
24✔
369
        for _, u := range o.LeafNode.Users {
62✔
370
                if _, exists := users[u.Username]; exists {
40✔
371
                        return fmt.Errorf("duplicate user %q detected in leafnode authorization", u.Username)
2✔
372
                }
2✔
373
                users[u.Username] = struct{}{}
36✔
374
        }
375
        return nil
22✔
376
}
377

378
func validateLeafNodeProxyOptions(remote *RemoteLeafOpts) ([]string, error) {
1,985✔
379
        var warnings []string
1,985✔
380

1,985✔
381
        if remote.Proxy.URL == _EMPTY_ {
3,944✔
382
                return warnings, nil
1,959✔
383
        }
1,959✔
384

385
        proxyURL, err := url.Parse(remote.Proxy.URL)
26✔
386
        if err != nil {
27✔
387
                return warnings, fmt.Errorf("invalid proxy URL: %v", err)
1✔
388
        }
1✔
389

390
        if proxyURL.Scheme != "http" && proxyURL.Scheme != "https" {
27✔
391
                return warnings, fmt.Errorf("proxy URL scheme must be http or https, got: %s", proxyURL.Scheme)
2✔
392
        }
2✔
393

394
        if proxyURL.Host == _EMPTY_ {
25✔
395
                return warnings, fmt.Errorf("proxy URL must specify a host")
2✔
396
        }
2✔
397

398
        if remote.Proxy.Timeout < 0 {
22✔
399
                return warnings, fmt.Errorf("proxy timeout must be >= 0")
1✔
400
        }
1✔
401

402
        if (remote.Proxy.Username == _EMPTY_) != (remote.Proxy.Password == _EMPTY_) {
24✔
403
                return warnings, fmt.Errorf("proxy username and password must both be specified or both be empty")
4✔
404
        }
4✔
405

406
        if len(remote.URLs) > 0 {
32✔
407
                hasWebSocketURL := false
16✔
408
                hasNonWebSocketURL := false
16✔
409

16✔
410
                for _, remoteURL := range remote.URLs {
33✔
411
                        if remoteURL.Scheme == wsSchemePrefix || remoteURL.Scheme == wsSchemePrefixTLS {
30✔
412
                                hasWebSocketURL = true
13✔
413
                                if (remoteURL.Scheme == wsSchemePrefixTLS) &&
13✔
414
                                        remote.TLSConfig == nil && !remote.TLS {
14✔
415
                                        return warnings, fmt.Errorf("proxy is configured but remote URL %s requires TLS and no TLS configuration is provided. When using proxy with TLS endpoints, ensure TLS is properly configured for the leafnode remote", remoteURL.String())
1✔
416
                                }
1✔
417
                        } else {
4✔
418
                                hasNonWebSocketURL = true
4✔
419
                        }
4✔
420
                }
421

422
                if !hasWebSocketURL {
18✔
423
                        warnings = append(warnings, "proxy configuration will be ignored: proxy settings only apply to WebSocket connections (ws:// or wss://), but all configured URLs use TCP connections (nats://)")
3✔
424
                } else if hasNonWebSocketURL {
16✔
425
                        warnings = append(warnings, "proxy configuration will only be used for WebSocket URLs: proxy settings do not apply to TCP connections (nats://)")
1✔
426
                }
1✔
427
        }
428

429
        return warnings, nil
15✔
430
}
431

432
// Wait for the configured reconnect interval before attempting to connect
433
// again to the remote leafnode.
434
func (s *Server) reConnectToRemoteLeafNode(remote *leafNodeCfg) {
246✔
435
        clearInProgress := true
246✔
436
        defer func() {
491✔
437
                s.grWG.Done()
245✔
438
                if clearInProgress {
313✔
439
                        remote.setConnectInProgress(false)
68✔
440
                }
68✔
441
        }()
442
        delay := s.getOpts().LeafNode.ReconnectInterval
246✔
443
        select {
246✔
444
        case <-time.After(delay):
185✔
445
        case <-remote.quitCh:
×
446
                return
×
447
        case <-s.quitCh:
61✔
448
                return
61✔
449
        }
450
        clearInProgress = !connectToRemoteLeafNode(s, remote, false)
185✔
451
}
452

453
// Creates a leafNodeCfg object that wraps the RemoteLeafOpts.
454
func newLeafNodeCfg(remote *RemoteLeafOpts) *leafNodeCfg {
1,363✔
455
        cfg := &leafNodeCfg{
1,363✔
456
                RemoteLeafOpts: remote,
1,363✔
457
                urls:           make([]*url.URL, 0, len(remote.URLs)),
1,363✔
458
                quitCh:         make(chan struct{}, 1),
1,363✔
459
        }
1,363✔
460
        if len(remote.DenyExports) > 0 || len(remote.DenyImports) > 0 {
1,371✔
461
                perms := &Permissions{}
8✔
462
                if len(remote.DenyExports) > 0 {
16✔
463
                        perms.Publish = &SubjectPermission{Deny: remote.DenyExports}
8✔
464
                }
8✔
465
                if len(remote.DenyImports) > 0 {
15✔
466
                        perms.Subscribe = &SubjectPermission{Deny: remote.DenyImports}
7✔
467
                }
7✔
468
                cfg.perms = perms
8✔
469
        }
470
        // Start with the one that is configured. We will add to this
471
        // array when receiving async leafnode INFOs.
472
        cfg.urls = append(cfg.urls, cfg.URLs...)
1,363✔
473
        // If allowed to randomize, do it on our copy of URLs
1,363✔
474
        if !remote.NoRandomize {
2,725✔
475
                rand.Shuffle(len(cfg.urls), func(i, j int) {
1,750✔
476
                        cfg.urls[i], cfg.urls[j] = cfg.urls[j], cfg.urls[i]
388✔
477
                })
388✔
478
        }
479
        // If we are TLS make sure we save off a proper servername if possible.
480
        // Do same for user/password since we may need them to connect to
481
        // a bare URL that we get from INFO protocol.
482
        for _, u := range cfg.urls {
3,129✔
483
                cfg.saveTLSHostname(u)
1,766✔
484
                cfg.saveUserPassword(u)
1,766✔
485
                // If the url(s) have the "wss://" scheme, and we don't have a TLS
1,766✔
486
                // config, mark that we should be using TLS anyway.
1,766✔
487
                if !cfg.TLS && isWSSURL(u) {
1,767✔
488
                        cfg.TLS = true
1✔
489
                }
1✔
490
        }
491
        return cfg
1,363✔
492
}
493

494
// Notifies the quit channel without blocking.
495
// No lock is needed to invoke this function.
496
func (cfg *leafNodeCfg) notifyQuitChannel() {
2✔
497
        select {
2✔
498
        case cfg.quitCh <- struct{}{}:
2✔
499
        default:
×
500
        }
501
}
502

503
// Sets the connect-in-progress status for this remote leaf configuration.
504
func (cfg *leafNodeCfg) setConnectInProgress(inProgress bool) {
3,608✔
505
        cfg.Lock()
3,608✔
506
        defer cfg.Unlock()
3,608✔
507
        // In both cases we want to drain the "quit" channel.
3,608✔
508
        select {
3,608✔
509
        case <-cfg.quitCh:
1✔
510
        default:
3,607✔
511
        }
512
        cfg.connInProgress = inProgress
3,608✔
513
}
514

515
// Returns `true` if this remote is in the middle of a connect, `false` otherwise.
516
func (cfg *leafNodeCfg) isConnectInProgress() bool {
×
517
        cfg.RLock()
×
518
        defer cfg.RUnlock()
×
519
        return cfg.connInProgress
×
520
}
×
521

522
// Mark that this remote is being removed from the configuration.
523
func (cfg *leafNodeCfg) markAsRemoved() {
×
524
        cfg.Lock()
×
525
        defer cfg.Unlock()
×
526
        // This function should be invoked only once, but protect.
×
527
        if cfg.removed {
×
528
                return
×
529
        }
×
530
        cfg.removed = true
×
531
        cfg.notifyQuitChannel()
×
532
}
533

534
// Returns false if it has been disabled or removed.
535
func (cfg *leafNodeCfg) stillValid() bool {
7,456✔
536
        cfg.RLock()
7,456✔
537
        defer cfg.RUnlock()
7,456✔
538
        return !cfg.Disabled && !cfg.removed
7,456✔
539
}
7,456✔
540

541
// Will pick an URL from the list of available URLs.
542
func (cfg *leafNodeCfg) pickNextURL() *url.URL {
6,022✔
543
        cfg.Lock()
6,022✔
544
        defer cfg.Unlock()
6,022✔
545
        // If the current URL is the first in the list and we have more than
6,022✔
546
        // one URL, then move that one to end of the list.
6,022✔
547
        if cfg.curURL != nil && len(cfg.urls) > 1 && urlsAreEqual(cfg.curURL, cfg.urls[0]) {
8,610✔
548
                first := cfg.urls[0]
2,588✔
549
                copy(cfg.urls, cfg.urls[1:])
2,588✔
550
                cfg.urls[len(cfg.urls)-1] = first
2,588✔
551
        }
2,588✔
552
        cfg.curURL = cfg.urls[0]
6,022✔
553
        return cfg.curURL
6,022✔
554
}
555

556
// Returns the current URL
557
func (cfg *leafNodeCfg) getCurrentURL() *url.URL {
78✔
558
        cfg.RLock()
78✔
559
        defer cfg.RUnlock()
78✔
560
        return cfg.curURL
78✔
561
}
78✔
562

563
// Returns how long the server should wait before attempting
564
// to solicit a remote leafnode connection.
565
func (cfg *leafNodeCfg) getConnectDelay() time.Duration {
1,549✔
566
        cfg.RLock()
1,549✔
567
        delay := cfg.connDelay
1,549✔
568
        cfg.RUnlock()
1,549✔
569
        return delay
1,549✔
570
}
1,549✔
571

572
// Sets the connect delay.
573
func (cfg *leafNodeCfg) setConnectDelay(delay time.Duration) {
152✔
574
        cfg.Lock()
152✔
575
        cfg.connDelay = delay
152✔
576
        cfg.Unlock()
152✔
577
}
152✔
578

579
// Ensure that non-exported options (used in tests) have
580
// been properly set.
581
func (s *Server) setLeafNodeNonExportedOptions() {
6,666✔
582
        opts := s.getOpts()
6,666✔
583
        s.leafNodeOpts.dialTimeout = opts.LeafNode.dialTimeout
6,666✔
584
        if s.leafNodeOpts.dialTimeout == 0 {
13,331✔
585
                // Use same timeouts as routes for now.
6,665✔
586
                s.leafNodeOpts.dialTimeout = DEFAULT_ROUTE_DIAL
6,665✔
587
        }
6,665✔
588
        s.leafNodeOpts.resolver = opts.LeafNode.resolver
6,666✔
589
        if s.leafNodeOpts.resolver == nil {
13,329✔
590
                s.leafNodeOpts.resolver = net.DefaultResolver
6,663✔
591
        }
6,663✔
592
}
593

594
// sharedSysAccDelay is the connect delay set on a remote for its first
// connect when that remote is bound to the system account and the server is
// not running in standalone mode.
const sharedSysAccDelay = 250 * time.Millisecond
595

596
// establishHTTPProxyTunnel establishes an HTTP CONNECT tunnel through a proxy server
597
func establishHTTPProxyTunnel(proxyURL, targetHost string, timeout time.Duration, username, password string) (net.Conn, error) {
11✔
598
        proxyAddr, err := url.Parse(proxyURL)
11✔
599
        if err != nil {
11✔
600
                // This should not happen since proxy URL is validated during configuration parsing
×
601
                return nil, fmt.Errorf("unexpected proxy URL parse error (URL was pre-validated): %v", err)
×
602
        }
×
603

604
        // Connect to the proxy server
605
        conn, err := natsDialTimeout("tcp", proxyAddr.Host, timeout)
11✔
606
        if err != nil {
11✔
607
                return nil, fmt.Errorf("failed to connect to proxy: %v", err)
×
608
        }
×
609

610
        // Set deadline for the entire proxy handshake
611
        if err := conn.SetDeadline(time.Now().Add(timeout)); err != nil {
11✔
612
                conn.Close()
×
613
                return nil, fmt.Errorf("failed to set deadline: %v", err)
×
614
        }
×
615

616
        req := &http.Request{
11✔
617
                Method: http.MethodConnect,
11✔
618
                URL:    &url.URL{Opaque: targetHost}, // Opaque is required for CONNECT
11✔
619
                Host:   targetHost,
11✔
620
                Header: make(http.Header),
11✔
621
        }
11✔
622

11✔
623
        // Add proxy authentication if provided
11✔
624
        if username != "" && password != "" {
13✔
625
                req.Header.Set("Proxy-Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(username+":"+password)))
2✔
626
        }
2✔
627

628
        if err := req.Write(conn); err != nil {
11✔
629
                conn.Close()
×
630
                return nil, fmt.Errorf("failed to write CONNECT request: %v", err)
×
631
        }
×
632

633
        resp, err := http.ReadResponse(bufio.NewReader(conn), req)
11✔
634
        if err != nil {
11✔
635
                conn.Close()
×
636
                return nil, fmt.Errorf("failed to read proxy response: %v", err)
×
637
        }
×
638

639
        if resp.StatusCode != http.StatusOK {
12✔
640
                resp.Body.Close()
1✔
641
                conn.Close()
1✔
642
                return nil, fmt.Errorf("proxy CONNECT failed: %s", resp.Status)
1✔
643
        }
1✔
644

645
        // Close the response body
646
        resp.Body.Close()
10✔
647

10✔
648
        // Clear the deadline now that we've finished the proxy handshake
10✔
649
        if err := conn.SetDeadline(time.Time{}); err != nil {
10✔
650
                conn.Close()
×
651
                return nil, fmt.Errorf("failed to clear deadline: %v", err)
×
652
        }
×
653

654
        return conn, nil
10✔
655
}
656

657
// Connect to a remote leaf node asynchronously (that is, this function will do
658
// the connect in a go routine).
659
func (s *Server) connectToRemoteLeafNodeAsynchronously(remote *leafNodeCfg, firstConnect bool) {
1,364✔
660
        remote.setConnectInProgress(true)
1,364✔
661
        s.startGoRoutine(func() {
2,728✔
662
                defer s.grWG.Done()
1,364✔
663
                if !connectToRemoteLeafNode(s, remote, firstConnect) {
2,106✔
664
                        remote.setConnectInProgress(false)
742✔
665
                }
742✔
666
        })
667
}
668

669
// Connect to a remote leaf node. Should only be invoked from
670
// `s.connectToRemoteLeafNodeAsynchronously()` or `s.reConnectToRemoteLeafNode()`.
671
// Returns `true` if this function invoked `s.createLeafNode()`, false otherwise.
672
func connectToRemoteLeafNode(s *Server, remote *leafNodeCfg, firstConnect bool) bool {
1,549✔
673

1,549✔
674
        if remote == nil || len(remote.URLs) == 0 {
1,549✔
675
                s.Debugf("Empty remote leafnode definition, nothing to connect")
×
676
                return false
×
677
        }
×
678

679
        opts := s.getOpts()
1,549✔
680
        reconnectDelay := opts.LeafNode.ReconnectInterval
1,549✔
681
        s.mu.RLock()
1,549✔
682
        dialTimeout := s.leafNodeOpts.dialTimeout
1,549✔
683
        resolver := s.leafNodeOpts.resolver
1,549✔
684
        var isSysAcc bool
1,549✔
685
        if s.eventsEnabled() {
3,059✔
686
                isSysAcc = remote.LocalAccount == s.sys.account.Name
1,510✔
687
        }
1,510✔
688
        jetstreamMigrateDelay := remote.JetStreamClusterMigrateDelay
1,549✔
689
        s.mu.RUnlock()
1,549✔
690

1,549✔
691
        // If we are sharing a system account and we are not standalone delay to gather some info prior.
1,549✔
692
        if firstConnect && isSysAcc && !s.standAloneMode() {
1,619✔
693
                s.Debugf("Will delay first leafnode connect to shared system account due to clustering")
70✔
694
                remote.setConnectDelay(sharedSysAccDelay)
70✔
695
        }
70✔
696

697
        if connDelay := remote.getConnectDelay(); connDelay > 0 {
1,626✔
698
                select {
77✔
699
                case <-time.After(connDelay):
70✔
700
                case <-remote.quitCh:
×
701
                        return false
×
702
                case <-s.quitCh:
7✔
703
                        return false
7✔
704
                }
705
                remote.setConnectDelay(0)
70✔
706
        }
707

708
        var conn net.Conn
1,542✔
709

1,542✔
710
        const connErrFmt = "Error trying to connect as leafnode to remote server %q (attempt %v): %v"
1,542✔
711

1,542✔
712
        // Capture proxy configuration once before the loop with proper locking
1,542✔
713
        remote.RLock()
1,542✔
714
        proxyURL := remote.Proxy.URL
1,542✔
715
        proxyUsername := remote.Proxy.Username
1,542✔
716
        proxyPassword := remote.Proxy.Password
1,542✔
717
        proxyTimeout := remote.Proxy.Timeout
1,542✔
718
        remote.RUnlock()
1,542✔
719

1,542✔
720
        // Set default proxy timeout if not specified
1,542✔
721
        if proxyTimeout == 0 {
3,076✔
722
                proxyTimeout = dialTimeout
1,534✔
723
        }
1,534✔
724

725
        attempts := 0
1,542✔
726

1,542✔
727
        // In case the migrate timer was created but not canceled, do it when
1,542✔
728
        // this function exits. Note that the timer would not be created if
1,542✔
729
        // `jetstreamMigrateDelay == 0`.
1,542✔
730
        if jetstreamMigrateDelay > 0 {
1,550✔
731
                defer remote.cancelMigrateTimer()
8✔
732
        }
8✔
733

734
        for s.isRunning() && remote.stillValid() {
7,564✔
735
                rURL := remote.pickNextURL()
6,022✔
736
                url, err := s.getRandomIP(resolver, rURL.Host, nil)
6,022✔
737
                if err == nil {
12,039✔
738
                        var ipStr string
6,017✔
739
                        if url != rURL.Host {
6,094✔
740
                                ipStr = fmt.Sprintf(" (%s)", url)
77✔
741
                        }
77✔
742
                        // Some test may want to disable remotes from connecting
743
                        if s.isLeafConnectDisabled() {
6,150✔
744
                                s.Debugf("Will not attempt to connect to remote server on %q%s, leafnodes currently disabled", rURL.Host, ipStr)
133✔
745
                                err = ErrLeafNodeDisabled
133✔
746
                        } else {
6,017✔
747
                                s.Debugf("Trying to connect as leafnode to remote server on %q%s", rURL.Host, ipStr)
5,884✔
748

5,884✔
749
                                // Check if proxy is configured
5,884✔
750
                                if proxyURL != _EMPTY_ {
5,892✔
751
                                        targetHost := rURL.Host
8✔
752
                                        // If URL doesn't include port, add the default port for the scheme
8✔
753
                                        if rURL.Port() == _EMPTY_ {
8✔
754
                                                defaultPort := "80"
×
755
                                                if rURL.Scheme == wsSchemePrefixTLS {
×
756
                                                        defaultPort = "443"
×
757
                                                }
×
758
                                                targetHost = net.JoinHostPort(rURL.Hostname(), defaultPort)
×
759
                                        }
760

761
                                        conn, err = establishHTTPProxyTunnel(proxyURL, targetHost, proxyTimeout, proxyUsername, proxyPassword)
8✔
762
                                } else {
5,876✔
763
                                        // Direct connection
5,876✔
764
                                        conn, err = natsDialTimeout("tcp", url, dialTimeout)
5,876✔
765
                                }
5,876✔
766
                        }
767
                }
768
                if err != nil {
11,245✔
769
                        jitter := time.Duration(rand.Int63n(int64(reconnectDelay)))
5,223✔
770
                        delay := reconnectDelay + jitter
5,223✔
771
                        attempts++
5,223✔
772
                        if s.shouldReportConnectErr(firstConnect, attempts) {
8,567✔
773
                                s.Errorf(connErrFmt, rURL.Host, attempts, err)
3,344✔
774
                        } else {
5,223✔
775
                                s.Debugf(connErrFmt, rURL.Host, attempts, err)
1,879✔
776
                        }
1,879✔
777
                        remote.Lock()
5,223✔
778
                        // if we are using a delay to start migrating assets, kick off a migrate timer.
5,223✔
779
                        if remote.jsMigrateTimer == nil && jetstreamMigrateDelay > 0 {
5,231✔
780
                                remote.jsMigrateTimer = time.AfterFunc(jetstreamMigrateDelay, func() {
16✔
781
                                        s.checkJetStreamMigrate(remote)
8✔
782
                                })
8✔
783
                        }
784
                        remote.Unlock()
5,223✔
785
                        select {
5,223✔
786
                        case <-s.quitCh:
727✔
787
                                return false
727✔
788
                        case <-remote.quitCh:
1✔
789
                                return false
1✔
790
                        case <-time.After(delay):
4,494✔
791
                                // Check if we should migrate any JetStream assets immediately while this remote is down.
4,494✔
792
                                // This will be used if JetStreamClusterMigrateDelay was not set
4,494✔
793
                                if jetstreamMigrateDelay == 0 {
8,914✔
794
                                        s.checkJetStreamMigrate(remote)
4,420✔
795
                                }
4,420✔
796
                                continue
4,494✔
797
                        }
798
                }
799
                remote.cancelMigrateTimer()
799✔
800
                // We can check here, but really we will have to check again when the server
799✔
801
                // is about to add to the `s.leafs` map later in the process.
799✔
802
                if !remote.stillValid() {
799✔
803
                        conn.Close()
×
804
                        return false
×
805
                }
×
806

807
                // We have a connection here to a remote server.
808
                // Go ahead and create our leaf node and return.
809
                s.createLeafNode(conn, rURL, remote, nil)
799✔
810

799✔
811
                // Clear any observer states if we had them.
799✔
812
                s.clearObserverState(remote)
799✔
813

799✔
814
                return true
799✔
815
        }
816

817
        return false
14✔
818
}
819

820
func (cfg *leafNodeCfg) cancelMigrateTimer() {
807✔
821
        cfg.Lock()
807✔
822
        stopAndClearTimer(&cfg.jsMigrateTimer)
807✔
823
        cfg.Unlock()
807✔
824
}
807✔
825

826
// This will clear any observer state such that stream or consumer assets on this server can become leaders again.
827
func (s *Server) clearObserverState(remote *leafNodeCfg) {
799✔
828
        s.mu.RLock()
799✔
829
        accName := remote.LocalAccount
799✔
830
        s.mu.RUnlock()
799✔
831

799✔
832
        acc, err := s.LookupAccount(accName)
799✔
833
        if err != nil {
801✔
834
                s.Warnf("Error looking up account [%s] checking for JetStream clear observer state on a leafnode", accName)
2✔
835
                return
2✔
836
        }
2✔
837

838
        acc.jscmMu.Lock()
797✔
839
        defer acc.jscmMu.Unlock()
797✔
840

797✔
841
        // Walk all streams looking for any clustered stream, skip otherwise.
797✔
842
        for _, mset := range acc.streams() {
815✔
843
                node := mset.raftNode()
18✔
844
                if node == nil {
28✔
845
                        // Not R>1
10✔
846
                        continue
10✔
847
                }
848
                // Check consumers
849
                for _, o := range mset.getConsumers() {
10✔
850
                        if n := o.raftNode(); n != nil {
4✔
851
                                // Ensure we can become a leader again.
2✔
852
                                n.SetObserver(false)
2✔
853
                        }
2✔
854
                }
855
                // Ensure we can not become a leader again.
856
                node.SetObserver(false)
8✔
857
        }
858
}
859

860
// Check to see if we should migrate any assets from this account.
861
func (s *Server) checkJetStreamMigrate(remote *leafNodeCfg) {
4,428✔
862
        s.mu.RLock()
4,428✔
863
        accName, shouldMigrate := remote.LocalAccount, remote.JetStreamClusterMigrate
4,428✔
864
        s.mu.RUnlock()
4,428✔
865

4,428✔
866
        if !shouldMigrate {
8,789✔
867
                return
4,361✔
868
        }
4,361✔
869

870
        acc, err := s.LookupAccount(accName)
67✔
871
        if err != nil {
67✔
872
                s.Warnf("Error looking up account [%s] checking for JetStream migration on a leafnode", accName)
×
873
                return
×
874
        }
×
875

876
        acc.jscmMu.Lock()
67✔
877
        defer acc.jscmMu.Unlock()
67✔
878

67✔
879
        // Walk all streams looking for any clustered stream, skip otherwise.
67✔
880
        // If we are the leader force stepdown.
67✔
881
        for _, mset := range acc.streams() {
100✔
882
                node := mset.raftNode()
33✔
883
                if node == nil {
33✔
884
                        // Not R>1
×
885
                        continue
×
886
                }
887
                // Collect any consumers
888
                for _, o := range mset.getConsumers() {
54✔
889
                        if n := o.raftNode(); n != nil {
42✔
890
                                n.StepDown()
21✔
891
                                // Ensure we can not become a leader while in this state.
21✔
892
                                n.SetObserver(true)
21✔
893
                        }
21✔
894
                }
895
                // Stepdown if this stream was leader.
896
                node.StepDown()
33✔
897
                // Ensure we can not become a leader while in this state.
33✔
898
                node.SetObserver(true)
33✔
899
        }
900
}
901

902
// Helper for checking.
903
func (s *Server) isLeafConnectDisabled() bool {
6,017✔
904
        s.mu.RLock()
6,017✔
905
        defer s.mu.RUnlock()
6,017✔
906
        return s.leafDisableConnect
6,017✔
907
}
6,017✔
908

909
// Save off the tlsName for when we use TLS and mix hostnames and IPs. IPs usually
910
// come from the server we connect to.
911
//
912
// We used to save the name only if there was a TLSConfig or scheme equal to "tls".
913
// However, this was causing failures for users that did not set the scheme (and
914
// their remote connections did not have a tls{} block).
915
// We now save the host name regardless in case the remote returns an INFO indicating
916
// that TLS is required.
917
//
918
// Lock held on entry.
919
func (cfg *leafNodeCfg) saveTLSHostname(u *url.URL) {
2,389✔
920
        if cfg.tlsName == _EMPTY_ && net.ParseIP(u.Hostname()) == nil {
2,406✔
921
                cfg.tlsName = u.Hostname()
17✔
922
        }
17✔
923
}
924

925
// Save off the username/password for when we connect using a bare URL
926
// that we get from the INFO protocol.
927
//
928
// Lock held on entry.
929
func (cfg *leafNodeCfg) saveUserPassword(u *url.URL) {
1,766✔
930
        if cfg.username == _EMPTY_ && u.User != nil {
2,065✔
931
                cfg.username = u.User.Username()
299✔
932
                cfg.password, _ = u.User.Password()
299✔
933
        }
299✔
934
}
935

936
// This starts the leafnode accept loop in a go routine, unless it
937
// is detected that the server has already been shutdown.
938
func (s *Server) startLeafNodeAcceptLoop() {
3,748✔
939
        // Snapshot server options.
3,748✔
940
        opts := s.getOpts()
3,748✔
941

3,748✔
942
        port := opts.LeafNode.Port
3,748✔
943
        if port == -1 {
7,320✔
944
                port = 0
3,572✔
945
        }
3,572✔
946

947
        if s.isShuttingDown() {
3,748✔
948
                return
×
949
        }
×
950

951
        s.mu.Lock()
3,748✔
952
        hp := net.JoinHostPort(opts.LeafNode.Host, strconv.Itoa(port))
3,748✔
953
        l, e := natsListen("tcp", hp)
3,748✔
954
        s.leafNodeListenerErr = e
3,748✔
955
        if e != nil {
3,748✔
956
                s.mu.Unlock()
×
957
                s.Fatalf("Error listening on leafnode port: %d - %v", opts.LeafNode.Port, e)
×
958
                return
×
959
        }
×
960

961
        s.Noticef("Listening for leafnode connections on %s",
3,748✔
962
                net.JoinHostPort(opts.LeafNode.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
3,748✔
963

3,748✔
964
        tlsRequired := opts.LeafNode.TLSConfig != nil
3,748✔
965
        tlsVerify := tlsRequired && opts.LeafNode.TLSConfig.ClientAuth == tls.RequireAndVerifyClientCert
3,748✔
966
        // Do not set compression in this Info object, it would possibly cause
3,748✔
967
        // issues when sending asynchronous INFO to the remote.
3,748✔
968
        info := Info{
3,748✔
969
                ID:            s.info.ID,
3,748✔
970
                Name:          s.info.Name,
3,748✔
971
                Version:       s.info.Version,
3,748✔
972
                GitCommit:     gitCommit,
3,748✔
973
                GoVersion:     runtime.Version(),
3,748✔
974
                AuthRequired:  true,
3,748✔
975
                TLSRequired:   tlsRequired,
3,748✔
976
                TLSVerify:     tlsVerify,
3,748✔
977
                MaxPayload:    s.info.MaxPayload, // TODO(dlc) - Allow override?
3,748✔
978
                Headers:       s.supportsHeaders(),
3,748✔
979
                JetStream:     opts.JetStream,
3,748✔
980
                Domain:        opts.JetStreamDomain,
3,748✔
981
                Proto:         s.getServerProto(),
3,748✔
982
                InfoOnConnect: true,
3,748✔
983
                JSApiLevel:    JSApiLevel,
3,748✔
984
        }
3,748✔
985
        // If we have selected a random port...
3,748✔
986
        if port == 0 {
7,320✔
987
                // Write resolved port back to options.
3,572✔
988
                opts.LeafNode.Port = l.Addr().(*net.TCPAddr).Port
3,572✔
989
        }
3,572✔
990

991
        s.leafNodeInfo = info
3,748✔
992
        // Possibly override Host/Port and set IP based on Cluster.Advertise
3,748✔
993
        if err := s.setLeafNodeInfoHostPortAndIP(); err != nil {
3,748✔
994
                s.Fatalf("Error setting leafnode INFO with LeafNode.Advertise value of %s, err=%v", opts.LeafNode.Advertise, err)
×
995
                l.Close()
×
996
                s.mu.Unlock()
×
997
                return
×
998
        }
×
999
        s.leafURLsMap[s.leafNodeInfo.IP]++
3,748✔
1000
        s.generateLeafNodeInfoJSON()
3,748✔
1001

3,748✔
1002
        // Setup state that can enable shutdown
3,748✔
1003
        s.leafNodeListener = l
3,748✔
1004

3,748✔
1005
        // As of now, a server that does not have remotes configured would
3,748✔
1006
        // never solicit a connection, so we should not have to warn if
3,748✔
1007
        // InsecureSkipVerify is set in main LeafNodes config (since
3,748✔
1008
        // this TLS setting matters only when soliciting a connection).
3,748✔
1009
        // Still, warn if insecure is set in any of LeafNode block.
3,748✔
1010
        // We need to check remotes, even if tls is not required on accept.
3,748✔
1011
        warn := tlsRequired && opts.LeafNode.TLSConfig.InsecureSkipVerify
3,748✔
1012
        if !warn {
7,494✔
1013
                for _, r := range opts.LeafNode.Remotes {
3,937✔
1014
                        if r.TLSConfig != nil && r.TLSConfig.InsecureSkipVerify {
191✔
1015
                                warn = true
×
1016
                                break
×
1017
                        }
1018
                }
1019
        }
1020
        if warn {
3,750✔
1021
                s.Warnf(leafnodeTLSInsecureWarning)
2✔
1022
        }
2✔
1023
        go s.acceptConnections(l, "Leafnode", func(conn net.Conn) { s.createLeafNode(conn, nil, nil, nil) }, nil)
4,592✔
1024
        s.mu.Unlock()
3,748✔
1025
}
1026

1027
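// RegEx matching one dash-delimited section of a creds file and capturing its content; the first match is expected to be the user JWT, the second the seed.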
1028
var credsRe = regexp.MustCompile(`\s*(?:(?:[-]{3,}.*[-]{3,}\r?\n)([\w\-.=]+)(?:\r?\n[-]{3,}.*[-]{3,}(\r?\n|\z)))`)
1029

1030
// clusterName is provided as argument to avoid lock ordering issues with the locked client c
1031
// Lock should be held entering here.
1032
func (c *client) sendLeafConnect(clusterName string, headers bool) error {
674✔
1033
        // We support basic user/pass and operator based user JWT with signatures.
674✔
1034
        cinfo := leafConnectInfo{
674✔
1035
                Version:       VERSION,
674✔
1036
                ID:            c.srv.info.ID,
674✔
1037
                Domain:        c.srv.info.Domain,
674✔
1038
                Name:          c.srv.info.Name,
674✔
1039
                Hub:           c.leaf.remote.Hub,
674✔
1040
                Cluster:       clusterName,
674✔
1041
                Headers:       headers,
674✔
1042
                JetStream:     c.acc.jetStreamConfigured(),
674✔
1043
                DenyPub:       c.leaf.remote.DenyImports,
674✔
1044
                Compression:   c.leaf.compression,
674✔
1045
                RemoteAccount: c.acc.GetName(),
674✔
1046
                Proto:         c.srv.getServerProto(),
674✔
1047
                Isolate:       c.leaf.remote.RequestIsolation,
674✔
1048
        }
674✔
1049

674✔
1050
        // If a signature callback is specified, this takes precedence over anything else.
674✔
1051
        if cb := c.leaf.remote.SignatureCB; cb != nil {
679✔
1052
                nonce := c.nonce
5✔
1053
                c.mu.Unlock()
5✔
1054
                jwt, sigraw, err := cb(nonce)
5✔
1055
                c.mu.Lock()
5✔
1056
                if err == nil && c.isClosed() {
6✔
1057
                        err = ErrConnectionClosed
1✔
1058
                }
1✔
1059
                if err != nil {
7✔
1060
                        c.Errorf("Error signing the nonce: %v", err)
2✔
1061
                        return err
2✔
1062
                }
2✔
1063
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
3✔
1064
                cinfo.JWT, cinfo.Sig = jwt, sig
3✔
1065

1066
        } else if creds := c.leaf.remote.Credentials; creds != _EMPTY_ {
725✔
1067
                // Check for credentials first, that will take precedence..
56✔
1068
                c.Debugf("Authenticating with credentials file %q", c.leaf.remote.Credentials)
56✔
1069
                contents, err := os.ReadFile(creds)
56✔
1070
                if err != nil {
56✔
1071
                        c.Errorf("%v", err)
×
1072
                        return err
×
1073
                }
×
1074
                defer wipeSlice(contents)
56✔
1075
                items := credsRe.FindAllSubmatch(contents, -1)
56✔
1076
                if len(items) < 2 {
56✔
1077
                        c.Errorf("Credentials file malformed")
×
1078
                        return err
×
1079
                }
×
1080
                // First result should be the user JWT.
1081
                // We copy here so that the file containing the seed will be wiped appropriately.
1082
                raw := items[0][1]
56✔
1083
                tmp := make([]byte, len(raw))
56✔
1084
                copy(tmp, raw)
56✔
1085
                // Seed is second item.
56✔
1086
                kp, err := nkeys.FromSeed(items[1][1])
56✔
1087
                if err != nil {
56✔
1088
                        c.Errorf("Credentials file has malformed seed")
×
1089
                        return err
×
1090
                }
×
1091
                // Wipe our key on exit.
1092
                defer kp.Wipe()
56✔
1093

56✔
1094
                sigraw, _ := kp.Sign(c.nonce)
56✔
1095
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
56✔
1096
                cinfo.JWT = bytesToString(tmp)
56✔
1097
                cinfo.Sig = sig
56✔
1098
        } else if nkey := c.leaf.remote.Nkey; nkey != _EMPTY_ {
618✔
1099
                kp, err := nkeys.FromSeed([]byte(nkey))
5✔
1100
                if err != nil {
5✔
1101
                        c.Errorf("Remote nkey has malformed seed")
×
1102
                        return err
×
1103
                }
×
1104
                // Wipe our key on exit.
1105
                defer kp.Wipe()
5✔
1106
                sigraw, _ := kp.Sign(c.nonce)
5✔
1107
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
5✔
1108
                pkey, _ := kp.PublicKey()
5✔
1109
                cinfo.Nkey = pkey
5✔
1110
                cinfo.Sig = sig
5✔
1111
        }
1112
        // In addition, and this is to allow auth callout, set user/password or
1113
        // token if applicable.
1114
        if userInfo := c.leaf.remote.curURL.User; userInfo != nil {
996✔
1115
                cinfo.User = userInfo.Username()
324✔
1116
                var ok bool
324✔
1117
                cinfo.Pass, ok = userInfo.Password()
324✔
1118
                // For backward compatibility, if only username is provided, set both
324✔
1119
                // Token and User, not just Token.
324✔
1120
                if !ok {
333✔
1121
                        cinfo.Token = cinfo.User
9✔
1122
                }
9✔
1123
        } else if c.leaf.remote.username != _EMPTY_ {
354✔
1124
                cinfo.User = c.leaf.remote.username
6✔
1125
                cinfo.Pass = c.leaf.remote.password
6✔
1126
                // For backward compatibility, if only username is provided, set both
6✔
1127
                // Token and User, not just Token.
6✔
1128
                if cinfo.Pass == _EMPTY_ {
6✔
1129
                        cinfo.Token = cinfo.User
×
1130
                }
×
1131
        }
1132
        b, err := json.Marshal(cinfo)
672✔
1133
        if err != nil {
672✔
1134
                c.Errorf("Error marshaling CONNECT to remote leafnode: %v\n", err)
×
1135
                return err
×
1136
        }
×
1137
        // Although this call is made before the writeLoop is created,
1138
        // we don't really need to send in place. The protocol will be
1139
        // sent out by the writeLoop.
1140
        c.enqueueProto([]byte(fmt.Sprintf(ConProto, b)))
672✔
1141
        return nil
672✔
1142
}
1143

1144
// Makes a deep copy of the LeafNode Info structure.
1145
// The server lock is held on entry.
1146
func (s *Server) copyLeafNodeInfo() *Info {
2,723✔
1147
        clone := s.leafNodeInfo
2,723✔
1148
        // Copy the array of urls.
2,723✔
1149
        if len(s.leafNodeInfo.LeafNodeURLs) > 0 {
4,937✔
1150
                clone.LeafNodeURLs = append([]string(nil), s.leafNodeInfo.LeafNodeURLs...)
2,214✔
1151
        }
2,214✔
1152
        return &clone
2,723✔
1153
}
1154

1155
// Adds a LeafNode URL that we get when a route connects to the Info structure.
1156
// Regenerates the JSON byte array so that it can be sent to LeafNode connections.
1157
// Returns a boolean indicating if the URL was added or not.
1158
// Server lock is held on entry
1159
func (s *Server) addLeafNodeURL(urlStr string) bool {
7,512✔
1160
        if s.leafURLsMap.addUrl(urlStr) {
15,019✔
1161
                s.generateLeafNodeInfoJSON()
7,507✔
1162
                return true
7,507✔
1163
        }
7,507✔
1164
        return false
5✔
1165
}
1166

1167
// Removes a LeafNode URL of the route that is disconnecting from the Info structure.
1168
// Regenerates the JSON byte array so that it can be sent to LeafNode connections.
1169
// Returns a boolean indicating if the URL was removed or not.
1170
// Server lock is held on entry.
1171
func (s *Server) removeLeafNodeURL(urlStr string) bool {
7,512✔
1172
        // Don't need to do this if we are removing the route connection because
7,512✔
1173
        // we are shuting down...
7,512✔
1174
        if s.isShuttingDown() {
11,437✔
1175
                return false
3,925✔
1176
        }
3,925✔
1177
        if s.leafURLsMap.removeUrl(urlStr) {
7,170✔
1178
                s.generateLeafNodeInfoJSON()
3,583✔
1179
                return true
3,583✔
1180
        }
3,583✔
1181
        return false
4✔
1182
}
1183

1184
// Server lock is held on entry
1185
func (s *Server) generateLeafNodeInfoJSON() {
14,838✔
1186
        s.leafNodeInfo.Cluster = s.cachedClusterName()
14,838✔
1187
        s.leafNodeInfo.LeafNodeURLs = s.leafURLsMap.getAsStringSlice()
14,838✔
1188
        s.leafNodeInfo.WSConnectURLs = s.websocket.connectURLsMap.getAsStringSlice()
14,838✔
1189
        s.leafNodeInfoJSON = generateInfoJSON(&s.leafNodeInfo)
14,838✔
1190
}
14,838✔
1191

1192
// Sends an async INFO protocol so that the connected servers can update
1193
// their list of LeafNode urls.
1194
func (s *Server) sendAsyncLeafNodeInfo() {
11,090✔
1195
        for _, c := range s.leafs {
11,203✔
1196
                c.mu.Lock()
113✔
1197
                c.enqueueProto(s.leafNodeInfoJSON)
113✔
1198
                c.mu.Unlock()
113✔
1199
        }
113✔
1200
}
1201

1202
// createLeafNode builds the LEAF client for a leafnode connection.
// Called when an inbound leafnode connection is accepted or we create one for a solicited leafnode.
//
// conn is the underlying network connection. remote is non-nil only for a
// solicited connection; in that case rURL is checked for a websocket scheme
// to decide whether to solicit over websocket. ws is non-nil for connections
// accepted through the Websocket port.
//
// On success the client is returned with its readLoop started (and, for
// accepted connections only, its writeLoop too). nil is returned when the
// local account lookup, the websocket solicit, a TLS handshake, the INFO
// write, or the registration in the temporary clients map fails.
1203
func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCfg, ws *websocket) *client {
1,672✔
1204
        // Snapshot server options.
1,672✔
1205
        opts := s.getOpts()
1,672✔
1206

1,672✔
1207
        maxPay := int32(opts.MaxPayload)
1,672✔
1208
        maxSubs := int32(opts.MaxSubs)
1,672✔
1209
        // For system, maxSubs of 0 means unlimited, so re-adjust here.
1,672✔
1210
        if maxSubs == 0 {
3,343✔
1211
                maxSubs = -1
1,671✔
1212
        }
1,671✔
1213
        now := time.Now().UTC()
1,672✔
1214

1,672✔
1215
        c := &client{srv: s, nc: conn, kind: LEAF, opts: defaultOpts, mpay: maxPay, msubs: maxSubs, start: now, last: now}
1,672✔
1216
        // Do not update the smap here, we need to do it in initLeafNodeSmapAndSendSubs
1,672✔
1217
        c.leaf = &leaf{}
1,672✔
1218

1,672✔
1219
        // If the leafnode subject interest should be isolated, flag it here.
1,672✔
1220
        // The server-wide setting wins; otherwise fall back to the remote's
        // LocalIsolation (solicited connections only).
        s.optsMu.RLock()
1,672✔
1221
        if c.leaf.isolated = s.opts.LeafNode.IsolateLeafnodeInterest; !c.leaf.isolated && remote != nil {
2,469✔
1222
                c.leaf.isolated = remote.LocalIsolation
797✔
1223
        }
797✔
1224
        s.optsMu.RUnlock()
1,672✔
1225

1,672✔
1226
        // For accepted LN connections, ws will be != nil if it was accepted
1,672✔
1227
        // through the Websocket port.
1,672✔
1228
        c.ws = ws
1,672✔
1229

1,672✔
1230
        // For remote, check if the scheme starts with "ws", if so, we will initiate
1,672✔
1231
        // a remote Leaf Node connection as a websocket connection.
1,672✔
1232
        if remote != nil && rURL != nil && isWSURL(rURL) {
1,722✔
1233
                remote.RLock()
50✔
1234
                c.ws = &websocket{compress: remote.Websocket.Compression, maskwrite: !remote.Websocket.NoMasking}
50✔
1235
                remote.RUnlock()
50✔
1236
        }
50✔
1237

1238
        // Determines if we are soliciting the connection or not.
1239
        var solicited bool
1,672✔
1240
        var acc *Account
1,672✔
1241
        var remoteSuffix string
1,672✔
1242
        if remote != nil {
2,471✔
1243
                // For now, if lookup fails, we will constantly try
799✔
1244
                // to recreate this LN connection.
799✔
1245
                lacc := remote.LocalAccount
799✔
1246
                var err error
799✔
1247
                acc, err = s.LookupAccount(lacc)
799✔
1248
                if err != nil {
801✔
1249
                        // An account not existing is something that can happen with nats/http account resolver and the account
2✔
1250
                        // has not yet been pushed, or the request failed for other reasons.
2✔
1251
                        // remote needs to be set or retry won't happen
2✔
1252
                        c.leaf.remote = remote
2✔
1253
                        c.closeConnection(MissingAccount)
2✔
1254
                        s.Errorf("Unable to lookup account %s for solicited leafnode connection: %v", lacc, err)
2✔
1255
                        return nil
2✔
1256
                }
2✔
1257
                remoteSuffix = fmt.Sprintf(" for account: %s", acc.traceLabel())
797✔
1258
        }
1259

1260
        c.mu.Lock()
1,670✔
1261
        c.initClient()
1,670✔
1262
        c.Noticef("Leafnode connection created%s %s", remoteSuffix, c.opts.Name)
1,670✔
1263

1,670✔
1264
        var (
1,670✔
1265
                tlsFirst         bool
1,670✔
1266
                tlsFirstFallback time.Duration
1,670✔
1267
                infoTimeout      time.Duration
1,670✔
1268
        )
1,670✔
1269
        // Solicited side: bind the client to the remote config and local
        // account, and copy the remote's TLS-first and first-INFO timeout
        // settings under the remote lock. Accepted side: flag that a CONNECT
        // is expected and use the server-level leafnode TLS-first settings.
        if remote != nil {
2,467✔
1270
                solicited = true
797✔
1271
                remote.Lock()
797✔
1272
                c.leaf.remote = remote
797✔
1273
                c.setPermissions(remote.perms)
797✔
1274
                if !c.leaf.remote.Hub {
1,578✔
1275
                        c.leaf.isSpoke = true
781✔
1276
                }
781✔
1277
                tlsFirst = remote.TLSHandshakeFirst
797✔
1278
                infoTimeout = remote.FirstInfoTimeout
797✔
1279
                remote.Unlock()
797✔
1280
                c.acc = acc
797✔
1281
        } else {
873✔
1282
                c.flags.set(expectConnect)
873✔
1283
                if ws != nil {
902✔
1284
                        c.Debugf("Leafnode compression=%v", c.ws.compress)
29✔
1285
                }
29✔
1286
                tlsFirst = opts.LeafNode.TLSHandshakeFirst
873✔
1287
                if f := opts.LeafNode.TLSHandshakeFirstFallback; f > 0 {
874✔
1288
                        tlsFirstFallback = f
1✔
1289
                }
1✔
1290
        }
1291
        c.mu.Unlock()
1,670✔
1292

1,670✔
1293
        var nonce [nonceLen]byte
1,670✔
1294
        var info *Info
1,670✔
1295

1,670✔
1296
        // Grab this before the client lock below.
1,670✔
1297
        if !solicited {
2,543✔
1298
                // Grab server variables
873✔
1299
                s.mu.Lock()
873✔
1300
                info = s.copyLeafNodeInfo()
873✔
1301
                // For tests that want to simulate old servers, do not set the compression
873✔
1302
                // on the INFO protocol if configured with CompressionNotSupported.
873✔
1303
                if cm := opts.LeafNode.Compression.Mode; cm != CompressionNotSupported {
1,745✔
1304
                        info.Compression = cm
872✔
1305
                }
872✔
1306
                // We always send a nonce for LEAF connections. Do not change that without
1307
                // taking into account presence of proxy trusted keys.
1308
                s.generateNonce(nonce[:])
873✔
1309
                s.mu.Unlock()
873✔
1310
        }
1311

1312
        // Grab lock
1313
        c.mu.Lock()
1,670✔
1314

1,670✔
1315
        // preBuf is only assigned on the websocket solicit path below and is
        // handed to the readLoop when it is started.
        var preBuf []byte
1,670✔
1316
        if solicited {
2,467✔
1317
                // For websocket connection, we need to send an HTTP request,
797✔
1318
                // and get the response before starting the readLoop to get
797✔
1319
                // the INFO, etc..
797✔
1320
                if c.isWebsocket() {
847✔
1321
                        var err error
50✔
1322
                        var closeReason ClosedState
50✔
1323

50✔
1324
                        preBuf, closeReason, err = c.leafNodeSolicitWSConnection(opts, rURL, remote)
50✔
1325
                        if err != nil {
71✔
1326
                                c.Errorf("Error soliciting websocket connection: %v", err)
21✔
1327
                                c.mu.Unlock()
21✔
1328
                                // Only close here when a non-zero close reason was returned.
                                if closeReason != 0 {
38✔
1329
                                        c.closeConnection(closeReason)
17✔
1330
                                }
17✔
1331
                                return nil
21✔
1332
                        }
1333
                } else {
747✔
1334
                        // If configured to do TLS handshake first
747✔
1335
                        if tlsFirst {
751✔
1336
                                if _, err := c.leafClientHandshakeIfNeeded(remote, opts); err != nil {
5✔
1337
                                        c.mu.Unlock()
1✔
1338
                                        return nil
1✔
1339
                                }
1✔
1340
                        }
1341
                        // We need to wait for the info, but not for too long.
1342
                        c.nc.SetReadDeadline(time.Now().Add(infoTimeout))
746✔
1343
                }
1344

1345
                // We will process the INFO from the readloop and finish by
1346
                // sending the CONNECT and finish registration later.
1347
        } else {
873✔
1348
                // Send our info to the other side.
873✔
1349
                // Remember the nonce we sent here for signatures, etc.
873✔
1350
                c.nonce = make([]byte, nonceLen)
873✔
1351
                copy(c.nonce, nonce[:])
873✔
1352
                info.Nonce = bytesToString(c.nonce)
873✔
1353
                info.CID = c.cid
873✔
1354
                proto := generateInfoJSON(info)
873✔
1355

873✔
1356
                var pre []byte
873✔
1357
                // We need first to check for "TLS First" fallback delay.
873✔
1358
                if tlsFirstFallback > 0 {
874✔
1359
                        // We wait and see if we are getting any data. Since we did not send
1✔
1360
                        // the INFO protocol yet, only clients that use TLS first should be
1✔
1361
                        // sending data (the TLS handshake). We don't really check the content:
1✔
1362
                        // if it is a rogue agent and not an actual client performing the
1✔
1363
                        // TLS handshake, the error will be detected when performing the
1✔
1364
                        // handshake on our side.
1✔
1365
                        pre = make([]byte, 4)
1✔
1366
                        c.nc.SetReadDeadline(time.Now().Add(tlsFirstFallback))
1✔
1367
                        n, _ := io.ReadFull(c.nc, pre[:])
1✔
1368
                        c.nc.SetReadDeadline(time.Time{})
1✔
1369
                        // If we get any data (regardless of possible timeout), we will proceed
1✔
1370
                        // with the TLS handshake.
1✔
1371
                        if n > 0 {
1✔
1372
                                pre = pre[:n]
×
1373
                        } else {
1✔
1374
                                // We did not get anything so we will send the INFO protocol.
1✔
1375
                                pre = nil
1✔
1376
                                // Set the boolean to false for the rest of the function.
1✔
1377
                                tlsFirst = false
1✔
1378
                        }
1✔
1379
                }
1380

1381
                if !tlsFirst {
1,741✔
1382
                        // We have to send from this go routine because we may
868✔
1383
                        // have to block for TLS handshake before we start our
868✔
1384
                        // writeLoop go routine. The other side needs to receive
868✔
1385
                        // this before it can initiate the TLS handshake..
868✔
1386
                        c.sendProtoNow(proto)
868✔
1387

868✔
1388
                        // The above call could have marked the connection as closed (due to TCP error).
868✔
1389
                        if c.isClosed() {
868✔
1390
                                c.mu.Unlock()
×
1391
                                c.closeConnection(WriteError)
×
1392
                                return nil
×
1393
                        }
×
1394
                }
1395

1396
                // Check to see if we need to spin up TLS.
1397
                if !c.isWebsocket() && info.TLSRequired {
949✔
1398
                        // If we have a prebuffer create a multi-reader.
76✔
1399
                        if len(pre) > 0 {
76✔
1400
                                c.nc = &tlsMixConn{c.nc, bytes.NewBuffer(pre)}
×
1401
                        }
×
1402
                        // Perform server-side TLS handshake.
1403
                        if err := c.doTLSServerHandshake(tlsHandshakeLeaf, opts.LeafNode.TLSConfig, opts.LeafNode.TLSTimeout, opts.LeafNode.TLSPinnedCerts); err != nil {
125✔
1404
                                c.mu.Unlock()
49✔
1405
                                return nil
49✔
1406
                        }
49✔
1407
                }
1408

1409
                // If the user wants the TLS handshake to occur first, now that it is
1410
                // done, send the INFO protocol.
1411
                if tlsFirst {
827✔
1412
                        c.flags.set(didTLSFirst)
3✔
1413
                        c.sendProtoNow(proto)
3✔
1414
                        if c.isClosed() {
3✔
1415
                                c.mu.Unlock()
×
1416
                                c.closeConnection(WriteError)
×
1417
                                return nil
×
1418
                        }
×
1419
                }
1420

1421
                // Leaf nodes will always require a CONNECT to let us know
1422
                // when we are properly bound to an account.
1423
                //
1424
                // If compression is configured, we can't set the authTimer here because
1425
                // it would cause the parser to fail any incoming protocol that is not a
1426
                // CONNECT (and we need to exchange INFO protocols for compression
1427
                // negotiation). So instead, use the ping timer until we are done with
1428
                // negotiation and can set the auth timer.
1429
                timeout := secondsToDuration(opts.LeafNode.AuthTimeout)
824✔
1430
                if needsCompression(opts.LeafNode.Compression.Mode) {
1,430✔
1431
                        c.ping.tmr = time.AfterFunc(timeout, func() {
611✔
1432
                                c.authTimeout()
5✔
1433
                        })
5✔
1434
                } else {
218✔
1435
                        c.setAuthTimer(timeout)
218✔
1436
                }
218✔
1437
        }
1438

1439
        // Keep track in case server is shutdown before we can successfully register.
1440
        if !s.addToTempClients(c.cid, c) {
1,599✔
1441
                c.mu.Unlock()
×
1442
                c.setNoReconnect()
×
1443
                c.closeConnection(ServerShutdown)
×
1444
                return nil
×
1445
        }
×
1446

1447
        // Spin up the read loop.
1448
        s.startGoRoutine(func() { c.readLoop(preBuf) })
3,198✔
1449

1450
        // We will spin the write loop for solicited connections only
1451
        // when processing the INFO and after switching to TLS if needed.
1452
        if !solicited {
2,423✔
1453
                s.startGoRoutine(func() { c.writeLoop() })
1,648✔
1454
        }
1455

1456
        c.mu.Unlock()
1,599✔
1457

1,599✔
1458
        return c
1,599✔
1459
}
1460

1461
// Will perform the client-side TLS handshake if needed. Assumes that this
1462
// is called by the solicit side (remote will be non nil). Returns `true`
1463
// if TLS is required, `false` otherwise.
1464
// Lock held on entry.
1465
func (c *client) leafClientHandshakeIfNeeded(remote *leafNodeCfg, opts *Options) (bool, error) {
1,931✔
1466
        // Check if TLS is required and gather TLS config variables.
1,931✔
1467
        tlsRequired, tlsConfig, tlsName, tlsTimeout := c.leafNodeGetTLSConfigForSolicit(remote)
1,931✔
1468
        if !tlsRequired {
3,784✔
1469
                return false, nil
1,853✔
1470
        }
1,853✔
1471

1472
        // If TLS required, peform handshake.
1473
        // Get the URL that was used to connect to the remote server.
1474
        rURL := remote.getCurrentURL()
78✔
1475

78✔
1476
        // Perform the client-side TLS handshake.
78✔
1477
        if resetTLSName, err := c.doTLSClientHandshake(tlsHandshakeLeaf, rURL, tlsConfig, tlsName, tlsTimeout, opts.LeafNode.TLSPinnedCerts); err != nil {
115✔
1478
                // Check if we need to reset the remote's TLS name.
37✔
1479
                if resetTLSName {
37✔
1480
                        remote.Lock()
×
1481
                        remote.tlsName = _EMPTY_
×
1482
                        remote.Unlock()
×
1483
                }
×
1484
                return false, err
37✔
1485
        }
1486
        return true, nil
41✔
1487
}
1488

1489
// processLeafnodeInfo handles an INFO protocol received on a leafnode
// connection, for both the initial INFO and later async INFO updates.
//
// On a solicited connection that has not completed a TLS handshake (and is
// neither websocket nor configured for TLS handshake first) it performs the
// client-side TLS handshake if needed. On the first INFO it also runs the
// compression negotiation, rejects an INFO that does not look like it came
// from a leafnode port, and records the remote server name, domain, cluster
// and protocol version. For any INFO it then updates the remote's URL list
// and the permissions when the INFO carries them, and resumes/finishes the
// connect process as indicated.
//
// The client lock is acquired and released inside this function; it must not
// be held by the caller.
func (c *client) processLeafnodeInfo(info *Info) {
2,681✔
1490
        c.mu.Lock()
2,681✔
1491
        if c.leaf == nil || c.isClosed() {
2,681✔
1492
                c.mu.Unlock()
×
1493
                return
×
1494
        }
×
1495
        s := c.srv
2,681✔
1496
        opts := s.getOpts()
2,681✔
1497
        remote := c.leaf.remote
2,681✔
1498
        // A non-nil remote means this side solicited the connection.
        didSolicit := remote != nil
2,681✔
1499
        firstINFO := !c.flags.isSet(infoReceived)
2,681✔
1500

2,681✔
1501
        // In case of websocket, the TLS handshake has been already done.
2,681✔
1502
        // So check only for non websocket connections and for configurations
2,681✔
1503
        // where the TLS Handshake was not done first.
2,681✔
1504
        if didSolicit && !c.flags.isSet(handshakeComplete) && !c.isWebsocket() && !remote.TLSHandshakeFirst {
4,558✔
1505
                // If the server requires TLS, we need to set this in the remote
1,877✔
1506
                // otherwise if there is no TLS configuration block for the remote,
1,877✔
1507
                // the solicit side will not attempt to perform the TLS handshake.
1,877✔
1508
                if firstINFO && info.TLSRequired {
1,939✔
1509
                        // Check for TLS/proxy configuration mismatch
62✔
1510
                        if remote.Proxy.URL != _EMPTY_ && !remote.TLS && remote.TLSConfig == nil {
62✔
1511
                                c.mu.Unlock()
×
1512
                                c.Errorf("TLS configuration mismatch: Hub requires TLS but leafnode remote is not configured for TLS. When using a proxy, ensure TLS leafnode configuration matches the Hub requirements.")
×
1513
                                c.closeConnection(TLSHandshakeError)
×
1514
                                return
×
1515
                        }
×
1516
                        remote.TLS = true
62✔
1517
                }
1518
                if _, err := c.leafClientHandshakeIfNeeded(remote, opts); err != nil {
1,909✔
1519
                        c.mu.Unlock()
32✔
1520
                        return
32✔
1521
                }
32✔
1522
        }
1523

1524
        // Check for compression, unless already done.
1525
        if firstINFO && !c.flags.isSet(compressionNegotiated) {
3,957✔
1526
                // Prevent from getting back here.
1,308✔
1527
                c.flags.set(compressionNegotiated)
1,308✔
1528

1,308✔
1529
                // Accepted connections use the server-level leafnode compression
                // options, solicited ones use the remote's own.
                var co *CompressionOpts
1,308✔
1530
                if !didSolicit {
1,885✔
1531
                        co = &opts.LeafNode.Compression
577✔
1532
                } else {
1,308✔
1533
                        co = &remote.Compression
731✔
1534
                }
731✔
1535
                if needsCompression(co.Mode) {
2,607✔
1536
                        // Release client lock since following function will need server lock.
1,299✔
1537
                        c.mu.Unlock()
1,299✔
1538
                        compress, err := s.negotiateLeafCompression(c, didSolicit, info.Compression, co)
1,299✔
1539
                        if err != nil {
1,299✔
1540
                                c.sendErrAndErr(err.Error())
×
1541
                                c.closeConnection(ProtocolViolation)
×
1542
                                return
×
1543
                        }
×
1544
                        if compress {
2,472✔
1545
                                // Done for now, will get back another INFO protocol...
1,173✔
1546
                                return
1,173✔
1547
                        }
1,173✔
1548
                        // No compression because one side does not want/can't, so proceed.
1549
                        c.mu.Lock()
126✔
1550
                        // Check that the connection did not close if the lock was released.
126✔
1551
                        if c.isClosed() {
126✔
1552
                                c.mu.Unlock()
×
1553
                                return
×
1554
                        }
×
1555
                } else {
9✔
1556
                        // Coming from an old server, the Compression field would be the empty
9✔
1557
                        // string. For servers that are configured with CompressionNotSupported,
9✔
1558
                        // this makes them behave as old servers.
9✔
1559
                        if info.Compression == _EMPTY_ || co.Mode == CompressionNotSupported {
11✔
1560
                                c.leaf.compression = CompressionNotSupported
2✔
1561
                        } else {
9✔
1562
                                c.leaf.compression = CompressionOff
7✔
1563
                        }
7✔
1564
                }
1565
                // Accepting side does not normally process an INFO protocol during
1566
                // initial connection handshake. So we keep it consistent by returning
1567
                // if we are not soliciting.
1568
                if !didSolicit {
136✔
1569
                        // If we had created the ping timer instead of the auth timer, we will
1✔
1570
                        // clear the ping timer and set the auth timer now that the compression
1✔
1571
                        // negotiation is done.
1✔
1572
                        if info.Compression != _EMPTY_ && c.ping.tmr != nil {
1✔
1573
                                clearTimer(&c.ping.tmr)
×
1574
                                c.setAuthTimer(secondsToDuration(opts.LeafNode.AuthTimeout))
×
1575
                        }
×
1576
                        c.mu.Unlock()
1✔
1577
                        return
1✔
1578
                }
1579
                // Fall through and process the INFO protocol as usual.
1580
        }
1581

1582
        // Note: For now, only the initial INFO has a nonce. We
1583
        // will probably do auto key rotation at some point.
1584
        if firstINFO {
2,245✔
1585
                // Mark that the INFO protocol has been received.
770✔
1586
                c.flags.set(infoReceived)
770✔
1587
                // Prevent connecting to non leafnode port. Need to do this only for
770✔
1588
                // the first INFO, not for async INFO updates...
770✔
1589
                //
770✔
1590
                // Content of INFO sent by the server when accepting a tcp connection.
770✔
1591
                // -------------------------------------------------------------------
770✔
1592
                // Listen Port Of | CID | ClientConnectURLs | LeafNodeURLs | Gateway |
770✔
1593
                // -------------------------------------------------------------------
770✔
1594
                //      CLIENT    |  X* |        X**        |              |         |
770✔
1595
                //      ROUTE     |     |        X**        |      X***    |         |
770✔
1596
                //     GATEWAY    |     |                   |              |    X    |
770✔
1597
                //     LEAFNODE   |  X  |                   |       X      |         |
770✔
1598
                // -------------------------------------------------------------------
770✔
1599
                // *   Not on older servers.
770✔
1600
                // **  Not if "no advertise" is enabled.
770✔
1601
                // *** Not if leafnode's "no advertise" is enabled.
770✔
1602
                //
770✔
1603
                // As seen from above, a solicited LeafNode connection should receive
770✔
1604
                // from the remote server an INFO with CID and LeafNodeURLs. Anything
770✔
1605
                // else should be considered an attempt to connect to a wrong port.
770✔
1606
                if didSolicit && (info.CID == 0 || info.LeafNodeURLs == nil) {
823✔
1607
                        c.mu.Unlock()
53✔
1608
                        c.Errorf(ErrConnectedToWrongPort.Error())
53✔
1609
                        c.closeConnection(WrongPort)
53✔
1610
                        return
53✔
1611
                }
53✔
1612
                // Reject a cluster that contains spaces.
1613
                if info.Cluster != _EMPTY_ && strings.Contains(info.Cluster, " ") {
718✔
1614
                        c.mu.Unlock()
1✔
1615
                        c.sendErrAndErr(ErrClusterNameHasSpaces.Error())
1✔
1616
                        c.closeConnection(ProtocolViolation)
1✔
1617
                        return
1✔
1618
                }
1✔
1619
                // Capture a nonce here.
1620
                c.nonce = []byte(info.Nonce)
716✔
1621
                if info.TLSRequired && didSolicit {
746✔
1622
                        remote.TLS = true
30✔
1623
                }
30✔
1624
                // Headers are used only when both this server and the remote support them.
                supportsHeaders := c.srv.supportsHeaders()
716✔
1625
                c.headers = supportsHeaders && info.Headers
716✔
1626

716✔
1627
                // Remember the remote server.
716✔
1628
                // Pre 2.2.0 servers are not sending their server name.
716✔
1629
                // In that case, use info.ID, which, for those servers, matches
716✔
1630
                // the content of the field `Name` in the leafnode CONNECT protocol.
716✔
1631
                if info.Name == _EMPTY_ {
716✔
1632
                        c.leaf.remoteServer = info.ID
×
1633
                } else {
716✔
1634
                        c.leaf.remoteServer = info.Name
716✔
1635
                }
716✔
1636
                c.leaf.remoteDomain = info.Domain
716✔
1637
                c.leaf.remoteCluster = info.Cluster
716✔
1638
                // We send the protocol version in the INFO protocol.
716✔
1639
                // Keep track of it, so we know if this connection supports message
716✔
1640
                // tracing for instance.
716✔
1641
                c.opts.Protocol = info.Proto
716✔
1642
        }
1643

1644
        // For both initial INFO and async INFO protocols, Possibly
1645
        // update our list of remote leafnode URLs we can connect to.
1646
        if didSolicit && (len(info.LeafNodeURLs) > 0 || len(info.WSConnectURLs) > 0) {
2,744✔
1647
                // Consider the incoming array as the most up-to-date
1,323✔
1648
                // representation of the remote cluster's list of URLs.
1,323✔
1649
                c.updateLeafNodeURLs(info)
1,323✔
1650
        }
1,323✔
1651

1652
        // Check to see if we have permissions updates here.
1653
        // Note the mapping: the INFO's Export becomes our Publish permission
        // and its Import becomes our Subscribe permission.
        if info.Import != nil || info.Export != nil {
1,437✔
1654
                perms := &Permissions{
16✔
1655
                        Publish:   info.Export,
16✔
1656
                        Subscribe: info.Import,
16✔
1657
                }
16✔
1658
                // Check if we have local deny clauses that we need to merge.
16✔
1659
                if remote := c.leaf.remote; remote != nil {
32✔
1660
                        if len(remote.DenyExports) > 0 {
17✔
1661
                                if perms.Publish == nil {
1✔
1662
                                        perms.Publish = &SubjectPermission{}
×
1663
                                }
×
1664
                                perms.Publish.Deny = append(perms.Publish.Deny, remote.DenyExports...)
1✔
1665
                        }
1666
                        if len(remote.DenyImports) > 0 {
17✔
1667
                                if perms.Subscribe == nil {
1✔
1668
                                        perms.Subscribe = &SubjectPermission{}
×
1669
                                }
×
1670
                                perms.Subscribe.Deny = append(perms.Subscribe.Deny, remote.DenyImports...)
1✔
1671
                        }
1672
                }
1673
                c.setPermissions(perms)
16✔
1674
        }
1675

1676
        var resumeConnect bool
1,421✔
1677

1,421✔
1678
        // If this is a remote connection and this is the first INFO protocol,
1,421✔
1679
        // then we need to finish the connect process by sending CONNECT, etc..
1,421✔
1680
        if firstINFO && didSolicit {
2,095✔
1681
                // Clear deadline that was set in createLeafNode while waiting for the INFO.
674✔
1682
                c.nc.SetDeadline(time.Time{})
674✔
1683
                resumeConnect = true
674✔
1684
        } else if !firstINFO && didSolicit {
2,070✔
1685
                c.leaf.remoteAccName = info.RemoteAccount
649✔
1686
        }
649✔
1687

1688
        // Check if we have the remote account information and if so make sure it's stored.
1689
        // NOTE(review): this dereferences c.acc; presumably RemoteAccount is
        // only present on INFOs received by a solicited connection, for which
        // c.acc is set in createLeafNode - confirm.
        if info.RemoteAccount != _EMPTY_ {
2,058✔
1690
                s.leafRemoteAccounts.Store(c.acc.Name, info.RemoteAccount)
637✔
1691
        }
637✔
1692
        c.mu.Unlock()
1,421✔
1693

1,421✔
1694
        // The calls below are made after the client lock has been released.
        // NOTE(review): s was already used above (s.getOpts()), so the nil
        // check on s looks redundant - confirm before relying on it.
        finishConnect := info.ConnectInfo
1,421✔
1695
        if resumeConnect && s != nil {
2,095✔
1696
                s.leafNodeResumeConnectProcess(c)
674✔
1697
                if !info.InfoOnConnect {
674✔
1698
                        finishConnect = true
×
1699
                }
×
1700
        }
1701
        if finishConnect {
2,058✔
1702
                s.leafNodeFinishConnectProcess(c)
637✔
1703
        }
637✔
1704

1705
        // Check to see if we need to kick any internal source or mirror consumers.
1706
        // This will be a no-op if JetStream not enabled for this server or if the bound account
1707
        // does not have jetstream.
1708
        s.checkInternalSyncConsumers(c.acc)
1,421✔
1709
}
1710

1711
func (s *Server) negotiateLeafCompression(c *client, didSolicit bool, infoCompression string, co *CompressionOpts) (bool, error) {
1,299✔
1712
        // Negotiate the appropriate compression mode (or no compression)
1,299✔
1713
        cm, err := selectCompressionMode(co.Mode, infoCompression)
1,299✔
1714
        if err != nil {
1,299✔
1715
                return false, err
×
1716
        }
×
1717
        c.mu.Lock()
1,299✔
1718
        // For "auto" mode, set the initial compression mode based on RTT
1,299✔
1719
        if cm == CompressionS2Auto {
2,440✔
1720
                if c.rttStart.IsZero() {
2,282✔
1721
                        c.rtt = computeRTT(c.start)
1,141✔
1722
                }
1,141✔
1723
                cm = selectS2AutoModeBasedOnRTT(c.rtt, co.RTTThresholds)
1,141✔
1724
        }
1725
        // Keep track of the negotiated compression mode.
1726
        c.leaf.compression = cm
1,299✔
1727
        cid := c.cid
1,299✔
1728
        var nonce string
1,299✔
1729
        if !didSolicit {
1,875✔
1730
                nonce = bytesToString(c.nonce)
576✔
1731
        }
576✔
1732
        c.mu.Unlock()
1,299✔
1733

1,299✔
1734
        if !needsCompression(cm) {
1,425✔
1735
                return false, nil
126✔
1736
        }
126✔
1737

1738
        // If we end-up doing compression...
1739

1740
        // Generate an INFO with the chosen compression mode.
1741
        s.mu.Lock()
1,173✔
1742
        info := s.copyLeafNodeInfo()
1,173✔
1743
        info.Compression, info.CID, info.Nonce = compressionModeForInfoProtocol(co, cm), cid, nonce
1,173✔
1744
        infoProto := generateInfoJSON(info)
1,173✔
1745
        s.mu.Unlock()
1,173✔
1746

1,173✔
1747
        // If we solicited, then send this INFO protocol BEFORE switching
1,173✔
1748
        // to compression writer. However, if we did not, we send it after.
1,173✔
1749
        c.mu.Lock()
1,173✔
1750
        if didSolicit {
1,770✔
1751
                c.enqueueProto(infoProto)
597✔
1752
                // Make sure it is completely flushed (the pending bytes goes to
597✔
1753
                // 0) before proceeding.
597✔
1754
                for c.out.pb > 0 && !c.isClosed() {
1,193✔
1755
                        c.flushOutbound()
596✔
1756
                }
596✔
1757
        }
1758
        // This is to notify the readLoop that it should switch to a
1759
        // (de)compression reader.
1760
        c.in.flags.set(switchToCompression)
1,173✔
1761
        // Create the compress writer before queueing the INFO protocol for
1,173✔
1762
        // a route that did not solicit. It will make sure that that proto
1,173✔
1763
        // is sent with compression on.
1,173✔
1764
        c.out.cw = s2.NewWriter(nil, s2WriterOptions(cm)...)
1,173✔
1765
        if !didSolicit {
1,749✔
1766
                c.enqueueProto(infoProto)
576✔
1767
        }
576✔
1768
        c.mu.Unlock()
1,173✔
1769
        return true, nil
1,173✔
1770
}
1771

1772
// When getting a leaf node INFO protocol, use the provided
1773
// array of urls to update the list of possible endpoints.
1774
func (c *client) updateLeafNodeURLs(info *Info) {
1,323✔
1775
        cfg := c.leaf.remote
1,323✔
1776
        cfg.Lock()
1,323✔
1777
        defer cfg.Unlock()
1,323✔
1778

1,323✔
1779
        // We have ensured that if a remote has a WS scheme, then all are.
1,323✔
1780
        // So check if first is WS, then add WS URLs, otherwise, add non WS ones.
1,323✔
1781
        if len(cfg.URLs) > 0 && isWSURL(cfg.URLs[0]) {
1,381✔
1782
                // It does not really matter if we use "ws://" or "wss://" here since
58✔
1783
                // we will have already marked that the remote should use TLS anyway.
58✔
1784
                // But use proper scheme for log statements, etc...
58✔
1785
                proto := wsSchemePrefix
58✔
1786
                if cfg.TLS {
58✔
1787
                        proto = wsSchemePrefixTLS
×
1788
                }
×
1789
                c.doUpdateLNURLs(cfg, proto, info.WSConnectURLs)
58✔
1790
                return
58✔
1791
        }
1792
        c.doUpdateLNURLs(cfg, "nats-leaf", info.LeafNodeURLs)
1,265✔
1793
}
1794

1795
func (c *client) doUpdateLNURLs(cfg *leafNodeCfg, scheme string, URLs []string) {
1,323✔
1796
        cfg.urls = make([]*url.URL, 0, 1+len(URLs))
1,323✔
1797
        // Add the ones we receive in the protocol
1,323✔
1798
        for _, surl := range URLs {
3,665✔
1799
                url, err := url.Parse(fmt.Sprintf("%s://%s", scheme, surl))
2,342✔
1800
                if err != nil {
2,342✔
1801
                        // As per below, the URLs we receive should not have contained URL info, so this should be safe to log.
×
1802
                        c.Errorf("Error parsing url %q: %v", surl, err)
×
1803
                        continue
×
1804
                }
1805
                // Do not add if it's the same as what we already have configured.
1806
                var dup bool
2,342✔
1807
                for _, u := range cfg.URLs {
5,934✔
1808
                        // URLs that we receive never have user info, but the
3,592✔
1809
                        // ones that were configured may have. Simply compare
3,592✔
1810
                        // host and port to decide if they are equal or not.
3,592✔
1811
                        if url.Host == u.Host && url.Port() == u.Port() {
5,311✔
1812
                                dup = true
1,719✔
1813
                                break
1,719✔
1814
                        }
1815
                }
1816
                if !dup {
2,965✔
1817
                        cfg.urls = append(cfg.urls, url)
623✔
1818
                        cfg.saveTLSHostname(url)
623✔
1819
                }
623✔
1820
        }
1821
        // Add the configured one
1822
        cfg.urls = append(cfg.urls, cfg.URLs...)
1,323✔
1823
}
1824

1825
// Similar to setInfoHostPortAndGenerateJSON, but for leafNodeInfo.
1826
func (s *Server) setLeafNodeInfoHostPortAndIP() error {
3,748✔
1827
        opts := s.getOpts()
3,748✔
1828
        if opts.LeafNode.Advertise != _EMPTY_ {
3,759✔
1829
                advHost, advPort, err := parseHostPort(opts.LeafNode.Advertise, opts.LeafNode.Port)
11✔
1830
                if err != nil {
11✔
1831
                        return err
×
1832
                }
×
1833
                s.leafNodeInfo.Host = advHost
11✔
1834
                s.leafNodeInfo.Port = advPort
11✔
1835
        } else {
3,737✔
1836
                s.leafNodeInfo.Host = opts.LeafNode.Host
3,737✔
1837
                s.leafNodeInfo.Port = opts.LeafNode.Port
3,737✔
1838
                // If the host is "0.0.0.0" or "::" we need to resolve to a public IP.
3,737✔
1839
                // This will return at most 1 IP.
3,737✔
1840
                hostIsIPAny, ips, err := s.getNonLocalIPsIfHostIsIPAny(s.leafNodeInfo.Host, false)
3,737✔
1841
                if err != nil {
3,737✔
1842
                        return err
×
1843
                }
×
1844
                if hostIsIPAny {
4,033✔
1845
                        if len(ips) == 0 {
296✔
1846
                                s.Errorf("Could not find any non-local IP for leafnode's listen specification %q",
×
1847
                                        s.leafNodeInfo.Host)
×
1848
                        } else {
296✔
1849
                                // Take the first from the list...
296✔
1850
                                s.leafNodeInfo.Host = ips[0]
296✔
1851
                        }
296✔
1852
                }
1853
        }
1854
        // Use just host:port for the IP
1855
        s.leafNodeInfo.IP = net.JoinHostPort(s.leafNodeInfo.Host, strconv.Itoa(s.leafNodeInfo.Port))
3,748✔
1856
        if opts.LeafNode.Advertise != _EMPTY_ {
3,759✔
1857
                s.Noticef("Advertise address for leafnode is set to %s", s.leafNodeInfo.IP)
11✔
1858
        }
11✔
1859
        return nil
3,748✔
1860
}
1861

1862
// Add the connection to the map of leaf nodes.
1863
// If `checkForDup` is true (invoked when a leafnode is accepted), then we check
1864
// if a connection already exists for the same server name and account.
1865
// That can happen when the remote is attempting to reconnect while the accepting
1866
// side did not detect the connection as broken yet.
1867
// But it can also happen when there is a misconfiguration and the remote is
1868
// creating two (or more) connections that bind to the same account on the accept
1869
// side.
1870
// When a duplicate is found, the new connection is accepted and the old is closed
1871
// (this solves the stale connection situation). An error is returned to help the
1872
// remote detect the misconfiguration when the duplicate is the result of that
1873
// misconfiguration.
1874
func (s *Server) addLeafNodeConnection(c *client, srvName, clusterName string, checkForDup bool) bool {
1,311✔
1875
        var accName string
1,311✔
1876
        c.mu.Lock()
1,311✔
1877
        cid := c.cid
1,311✔
1878
        acc := c.acc
1,311✔
1879
        if acc != nil {
2,622✔
1880
                accName = acc.Name
1,311✔
1881
        }
1,311✔
1882
        myRemoteDomain := c.leaf.remoteDomain
1,311✔
1883
        mySrvName := c.leaf.remoteServer
1,311✔
1884
        remoteAccName := c.leaf.remoteAccName
1,311✔
1885
        myClustName := c.leaf.remoteCluster
1,311✔
1886
        remote := c.leaf.remote
1,311✔
1887
        solicited := remote != nil
1,311✔
1888
        c.mu.Unlock()
1,311✔
1889

1,311✔
1890
        var old *client
1,311✔
1891
        s.mu.Lock()
1,311✔
1892
        // We check for empty because in some test we may send empty CONNECT{}
1,311✔
1893
        if checkForDup && srvName != _EMPTY_ {
1,948✔
1894
                for _, ol := range s.leafs {
1,018✔
1895
                        ol.mu.Lock()
381✔
1896
                        // We care here only about non solicited Leafnode. This function
381✔
1897
                        // is more about replacing stale connections than detecting loops.
381✔
1898
                        // We have code for the loop detection elsewhere, which also delays
381✔
1899
                        // attempt to reconnect.
381✔
1900
                        if !ol.isSolicitedLeafNode() && ol.leaf.remoteServer == srvName &&
381✔
1901
                                ol.leaf.remoteCluster == clusterName && ol.acc.Name == accName &&
381✔
1902
                                remoteAccName != _EMPTY_ && ol.leaf.remoteAccName == remoteAccName {
383✔
1903
                                old = ol
2✔
1904
                        }
2✔
1905
                        ol.mu.Unlock()
381✔
1906
                        if old != nil {
383✔
1907
                                break
2✔
1908
                        }
1909
                }
1910
        }
1911
        // Now that we are under the server lock and before adding it to the map,
1912
        // for a solicited leaf, we need to make sure that it has not been removed
1913
        // from the config or disabled.
1914
        if solicited {
1,946✔
1915
                // If no longer valid, do not add to the server map. The connection
635✔
1916
                // should have been marked so that it can't reconnect. When the caller
635✔
1917
                // calls closeConnection(), cleanup (including clearing the connect-
635✔
1918
                // in-progress flag) will occur at the appropriate time.
635✔
1919
                if !remote.stillValid() {
635✔
1920
                        // Prevent reconnect in case it was not yet done.
×
1921
                        c.setNoReconnect()
×
1922
                        s.mu.Unlock()
×
1923
                        s.removeFromTempClients(cid)
×
1924
                        return false
×
1925
                }
×
1926
                remote.setConnectInProgress(false)
635✔
1927
        }
1928
        // Store new connection in the map
1929
        s.leafs[cid] = c
1,311✔
1930
        s.mu.Unlock()
1,311✔
1931
        s.removeFromTempClients(cid)
1,311✔
1932

1,311✔
1933
        // If applicable, evict the old one.
1,311✔
1934
        if old != nil {
1,313✔
1935
                old.sendErrAndErr(DuplicateRemoteLeafnodeConnection.String())
2✔
1936
                old.closeConnection(DuplicateRemoteLeafnodeConnection)
2✔
1937
                c.Warnf("Replacing connection from same server")
2✔
1938
        }
2✔
1939

1940
        srvDecorated := func() string {
1,524✔
1941
                if myClustName == _EMPTY_ {
240✔
1942
                        return mySrvName
27✔
1943
                }
27✔
1944
                return fmt.Sprintf("%s/%s", mySrvName, myClustName)
186✔
1945
        }
1946

1947
        opts := s.getOpts()
1,311✔
1948
        sysAcc := s.SystemAccount()
1,311✔
1949
        js := s.getJetStream()
1,311✔
1950
        var meta *raft
1,311✔
1951
        if js != nil {
1,855✔
1952
                if mg := js.getMetaGroup(); mg != nil {
971✔
1953
                        meta = mg.(*raft)
427✔
1954
                }
427✔
1955
        }
1956
        blockMappingOutgoing := false
1,311✔
1957
        // Deny (non domain) JetStream API traffic unless system account is shared
1,311✔
1958
        // and domain names are identical and extending is not disabled
1,311✔
1959

1,311✔
1960
        // Check if backwards compatibility has been enabled and needs to be acted on
1,311✔
1961
        forceSysAccDeny := false
1,311✔
1962
        if len(opts.JsAccDefaultDomain) > 0 {
1,348✔
1963
                if acc == sysAcc {
48✔
1964
                        for _, d := range opts.JsAccDefaultDomain {
22✔
1965
                                if d == _EMPTY_ {
19✔
1966
                                        // Extending JetStream via leaf node is mutually exclusive with a domain mapping to the empty/default domain.
8✔
1967
                                        // As soon as one mapping to "" is found, disable the ability to extend JS via a leaf node.
8✔
1968
                                        c.Noticef("Not extending remote JetStream domain %q due to presence of empty default domain", myRemoteDomain)
8✔
1969
                                        forceSysAccDeny = true
8✔
1970
                                        break
8✔
1971
                                }
1972
                        }
1973
                } else if domain, ok := opts.JsAccDefaultDomain[accName]; ok && domain == _EMPTY_ {
41✔
1974
                        // for backwards compatibility with old setups that do not have a domain name set
15✔
1975
                        c.Debugf("Skipping deny %q for account %q due to default domain", jsAllAPI, accName)
15✔
1976
                        return true
15✔
1977
                }
15✔
1978
        }
1979

1980
        // If the server has JS disabled, it may still be part of a JetStream that could be extended.
1981
        // This is either signaled by js being disabled and a domain set,
1982
        // or in cases where no domain name exists, an extension hint is set.
1983
        // However, this is only relevant in mixed setups.
1984
        //
1985
        // If the system account connects but default domains are present, JetStream can't be extended.
1986
        if opts.JetStreamDomain != myRemoteDomain || (!opts.JetStream && (opts.JetStreamDomain == _EMPTY_ && opts.JetStreamExtHint != jsWillExtend)) ||
1,296✔
1987
                sysAcc == nil || acc == nil || forceSysAccDeny {
2,434✔
1988
                // If domain names mismatch always deny. This applies to system accounts as well as non system accounts.
1,138✔
1989
                // Not having a system account, account or JetStream disabled is considered a mismatch as well.
1,138✔
1990
                if acc != nil && acc == sysAcc {
1,279✔
1991
                        c.Noticef("System account connected from %s", srvDecorated())
141✔
1992
                        c.Noticef("JetStream not extended, domains differ")
141✔
1993
                        c.mergeDenyPermissionsLocked(both, denyAllJs)
141✔
1994
                        // When a remote with a system account is present in a server, unless otherwise disabled, the server will be
141✔
1995
                        // started in observer mode. Now that it is clear that this not used, turn the observer mode off.
141✔
1996
                        if solicited && meta != nil && meta.IsObserver() {
171✔
1997
                                meta.setObserver(false, extNotExtended)
30✔
1998
                                c.Debugf("Turning JetStream metadata controller Observer Mode off")
30✔
1999
                                // Take note that the domain was not extended to avoid this state from startup.
30✔
2000
                                writePeerState(js.config.StoreDir, meta.currentPeerState())
30✔
2001
                                // Meta controller can't be leader yet.
30✔
2002
                                // Yet it is possible that due to observer mode every server already stopped campaigning.
30✔
2003
                                // Therefore this server needs to be kicked into campaigning gear explicitly.
30✔
2004
                                meta.Campaign()
30✔
2005
                        }
30✔
2006
                } else {
997✔
2007
                        c.Noticef("JetStream using domains: local %q, remote %q", opts.JetStreamDomain, myRemoteDomain)
997✔
2008
                        c.mergeDenyPermissionsLocked(both, denyAllClientJs)
997✔
2009
                }
997✔
2010
                blockMappingOutgoing = true
1,138✔
2011
        } else if acc == sysAcc {
230✔
2012
                // system account and same domain
72✔
2013
                s.sys.client.Noticef("Extending JetStream domain %q as System Account connected from server %s",
72✔
2014
                        myRemoteDomain, srvDecorated())
72✔
2015
                // In an extension use case, pin leadership to server remotes connect to.
72✔
2016
                // Therefore, server with a remote that are not already in observer mode, need to be put into it.
72✔
2017
                if solicited && meta != nil && !meta.IsObserver() {
76✔
2018
                        meta.setObserver(true, extExtended)
4✔
2019
                        c.Debugf("Turning JetStream metadata controller Observer Mode on - System Account Connected")
4✔
2020
                        // Take note that the domain was not extended to avoid this state next startup.
4✔
2021
                        writePeerState(js.config.StoreDir, meta.currentPeerState())
4✔
2022
                        // If this server is the leader already, step down so a new leader can be elected (that is not an observer)
4✔
2023
                        meta.StepDown()
4✔
2024
                }
4✔
2025
        } else {
86✔
2026
                // This deny is needed in all cases (system account shared or not)
86✔
2027
                // If the system account is shared, jsAllAPI traffic will go through the system account.
86✔
2028
                // So in order to prevent duplicate delivery (from system and actual account) suppress it on the account.
86✔
2029
                // If the system account is NOT shared, jsAllAPI traffic has no business
86✔
2030
                c.Debugf("Adding deny %+v for account %q", denyAllClientJs, accName)
86✔
2031
                c.mergeDenyPermissionsLocked(both, denyAllClientJs)
86✔
2032
        }
86✔
2033
        // If we have a specified JetStream domain we will want to add a mapping to
2034
        // allow access cross domain for each non-system account.
2035
        if opts.JetStreamDomain != _EMPTY_ && opts.JetStream && acc != nil && acc != sysAcc {
1,549✔
2036
                for src, dest := range generateJSMappingTable(opts.JetStreamDomain) {
2,530✔
2037
                        if err := acc.AddMapping(src, dest); err != nil {
2,277✔
2038
                                c.Debugf("Error adding JetStream domain mapping: %s", err.Error())
×
2039
                        } else {
2,277✔
2040
                                c.Debugf("Adding JetStream Domain Mapping %q -> %s to account %q", src, dest, accName)
2,277✔
2041
                        }
2,277✔
2042
                }
2043
                if blockMappingOutgoing {
475✔
2044
                        src := fmt.Sprintf(jsDomainAPI, opts.JetStreamDomain)
222✔
2045
                        // make sure that messages intended for this domain, do not leave the cluster via this leaf node connection
222✔
2046
                        // This is a guard against a miss-config with two identical domain names and will only cover some forms
222✔
2047
                        // of this issue, not all of them.
222✔
2048
                        // This guards against a hub and a spoke having the same domain name.
222✔
2049
                        // But not two spokes having the same one and the request coming from the hub.
222✔
2050
                        c.mergeDenyPermissionsLocked(pub, []string{src})
222✔
2051
                        c.Debugf("Adding deny %q for outgoing messages to account %q", src, accName)
222✔
2052
                }
222✔
2053
        }
2054
        return true
1,296✔
2055
}
2056

2057
func (s *Server) removeLeafNodeConnection(c *client) {
1,672✔
2058
        s.mu.Lock()
1,672✔
2059
        c.mu.Lock()
1,672✔
2060
        cid := c.cid
1,672✔
2061
        if c.leaf != nil {
3,344✔
2062
                if c.leaf.tsubt != nil {
2,862✔
2063
                        c.leaf.tsubt.Stop()
1,190✔
2064
                        c.leaf.tsubt = nil
1,190✔
2065
                }
1,190✔
2066
                if c.leaf.gwSub != nil {
2,307✔
2067
                        s.gwLeafSubs.Remove(c.leaf.gwSub)
635✔
2068
                        // We need to set this to nil for GC to release the connection
635✔
2069
                        c.leaf.gwSub = nil
635✔
2070
                }
635✔
2071
                if remote := c.leaf.remote; remote != nil {
2,471✔
2072
                        // If "noReconnect" is true, then we won't attempt to reconnect, so
799✔
2073
                        // we will clear the "connect-in-progress" flag. However, if we can
799✔
2074
                        // reconnect, then we should set "connect-in-progress" to true while
799✔
2075
                        // we are under the server/client lock. The go routine that performs
799✔
2076
                        // the reconnect will be started later and there would be a gap with
799✔
2077
                        // the wrong flag value otherwise.
799✔
2078
                        remote.setConnectInProgress(!c.flags.isSet(noReconnect))
799✔
2079
                }
799✔
2080
        }
2081
        proxyKey := c.proxyKey
1,672✔
2082
        c.mu.Unlock()
1,672✔
2083
        delete(s.leafs, cid)
1,672✔
2084
        if proxyKey != _EMPTY_ {
1,676✔
2085
                s.removeProxiedConn(proxyKey, cid)
4✔
2086
        }
4✔
2087
        s.mu.Unlock()
1,672✔
2088
        s.removeFromTempClients(cid)
1,672✔
2089
}
2090

2091
// leafConnectInfo is the content of the CONNECT protocol sent by a soliciting
// leafnode. The accept side decodes it in processLeafNodeConnect.
type leafConnectInfo struct {
	// Version of the soliciting server. Checked against the leafnode
	// minimum version, if one is configured on the accept side.
	Version string `json:"version,omitempty"`
	Nkey    string `json:"nkey,omitempty"`
	JWT     string `json:"jwt,omitempty"`
	Sig     string `json:"sig,omitempty"`
	User    string `json:"user,omitempty"`
	Pass    string `json:"pass,omitempty"`
	Token   string `json:"auth_token,omitempty"`
	ID      string `json:"server_id,omitempty"`
	// JetStream domain of the soliciting server.
	Domain string `json:"domain,omitempty"`
	// Name of the soliciting server.
	Name string `json:"name,omitempty"`
	// When true, the soliciting side declares itself a hub and the accept
	// side takes on the spoke role.
	Hub bool `json:"is_hub,omitempty"`
	// Name of the cluster the soliciting server is part of, if any.
	Cluster string `json:"cluster,omitempty"`
	// Indicates that the soliciting server supports headers.
	Headers   bool `json:"headers,omitempty"`
	JetStream bool `json:"jetstream,omitempty"`
	// Publish deny permissions that the accept side merges with its own.
	DenyPub []string `json:"deny_pub,omitempty"`
	// Indicates that the leafnode requested isolation.
	Isolate bool `json:"isolate,omitempty"`

	// There used to be a field declared as:
	// >> Comp bool `json:"compression,omitempty"`
	// but it has never been used. Compression support requires a string
	// field, so it is declared here with a different json tag:
	Compression string `json:"compress_mode,omitempty"`

	// Just used to detect wrong connection attempts.
	Gateway string `json:"gateway,omitempty"`

	// Tells the accept side which account the remote is binding to.
	RemoteAccount string `json:"remote_account,omitempty"`

	// Unlike ROUTER and GATEWAY, the accept side of a LEAF connection only
	// receives the CONNECT protocol, and no INFO. The protocol version is
	// therefore sent as part of the CONNECT. It indicates if a connection
	// supports some features, such as message tracing.
	// The JSON tag is `protocol` so that it is automatically unmarshal'ed
	// in the low level process CONNECT.
	Proto int `json:"protocol,omitempty"`
}
2130

2131
// processLeafNodeConnect will process the inbound connect args.
2132
// Once we are here we are bound to an account, so can send any interest that
2133
// we would have to the other side.
2134
func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) error {
681✔
2135
        // Way to detect clients that incorrectly connect to the route listen
681✔
2136
        // port. Client provided "lang" in the CONNECT protocol while LEAFNODEs don't.
681✔
2137
        if lang != _EMPTY_ {
681✔
2138
                c.sendErrAndErr(ErrClientConnectedToLeafNodePort.Error())
×
2139
                c.closeConnection(WrongPort)
×
2140
                return ErrClientConnectedToLeafNodePort
×
2141
        }
×
2142

2143
        // Unmarshal as a leaf node connect protocol
2144
        proto := &leafConnectInfo{}
681✔
2145
        if err := json.Unmarshal(arg, proto); err != nil {
681✔
2146
                return err
×
2147
        }
×
2148

2149
        // Reject a cluster that contains spaces.
2150
        if proto.Cluster != _EMPTY_ && strings.Contains(proto.Cluster, " ") {
682✔
2151
                c.sendErrAndErr(ErrClusterNameHasSpaces.Error())
1✔
2152
                c.closeConnection(ProtocolViolation)
1✔
2153
                return ErrClusterNameHasSpaces
1✔
2154
        }
1✔
2155

2156
        // Check for cluster name collisions.
2157
        if cn := s.cachedClusterName(); cn != _EMPTY_ && proto.Cluster != _EMPTY_ && proto.Cluster == cn {
683✔
2158
                c.sendErrAndErr(ErrLeafNodeHasSameClusterName.Error())
3✔
2159
                c.closeConnection(ClusterNamesIdentical)
3✔
2160
                return ErrLeafNodeHasSameClusterName
3✔
2161
        }
3✔
2162

2163
        // Reject if this has Gateway which means that it would be from a gateway
2164
        // connection that incorrectly connects to the leafnode port.
2165
        if proto.Gateway != _EMPTY_ {
677✔
2166
                errTxt := fmt.Sprintf("Rejecting connection from gateway %q on the leafnode port", proto.Gateway)
×
2167
                c.Errorf(errTxt)
×
2168
                c.sendErr(errTxt)
×
2169
                c.closeConnection(WrongGateway)
×
2170
                return ErrWrongGateway
×
2171
        }
×
2172

2173
        if mv := s.getOpts().LeafNode.MinVersion; mv != _EMPTY_ {
679✔
2174
                major, minor, update, _ := versionComponents(mv)
2✔
2175
                if !versionAtLeast(proto.Version, major, minor, update) {
3✔
2176
                        // Send back an INFO so recent remote servers process the rejection
1✔
2177
                        // cleanly, then close immediately. The soliciting side applies the
1✔
2178
                        // reconnect delay when it processes the error.
1✔
2179
                        s.sendPermsAndAccountInfo(c)
1✔
2180
                        c.sendErrAndErr(fmt.Sprintf("%s %q", ErrLeafNodeMinVersionRejected, mv))
1✔
2181
                        c.closeConnection(MinimumVersionRequired)
1✔
2182
                        return ErrMinimumVersionRequired
1✔
2183
                }
1✔
2184
        }
2185

2186
        // Check if this server supports headers.
2187
        supportHeaders := c.srv.supportsHeaders()
676✔
2188

676✔
2189
        c.mu.Lock()
676✔
2190
        // Leaf Nodes do not do echo or verbose or pedantic.
676✔
2191
        c.opts.Verbose = false
676✔
2192
        c.opts.Echo = false
676✔
2193
        c.opts.Pedantic = false
676✔
2194
        // This inbound connection will be marked as supporting headers if this server
676✔
2195
        // support headers and the remote has sent in the CONNECT protocol that it does
676✔
2196
        // support headers too.
676✔
2197
        c.headers = supportHeaders && proto.Headers
676✔
2198
        // If the compression level is still not set, set it based on what has been
676✔
2199
        // given to us in the CONNECT protocol.
676✔
2200
        if c.leaf.compression == _EMPTY_ {
805✔
2201
                // But if proto.Compression is _EMPTY_, set it to CompressionNotSupported
129✔
2202
                if proto.Compression == _EMPTY_ {
168✔
2203
                        c.leaf.compression = CompressionNotSupported
39✔
2204
                } else {
129✔
2205
                        c.leaf.compression = proto.Compression
90✔
2206
                }
90✔
2207
        }
2208

2209
        // Remember the remote server.
2210
        c.leaf.remoteServer = proto.Name
676✔
2211
        // Remember the remote account name
676✔
2212
        c.leaf.remoteAccName = proto.RemoteAccount
676✔
2213
        // Remember if the leafnode requested isolation.
676✔
2214
        c.leaf.isolated = c.leaf.isolated || proto.Isolate
676✔
2215

676✔
2216
        // If the other side has declared itself a hub, so we will take on the spoke role.
676✔
2217
        if proto.Hub {
692✔
2218
                c.leaf.isSpoke = true
16✔
2219
        }
16✔
2220

2221
        // The soliciting side is part of a cluster.
2222
        if proto.Cluster != _EMPTY_ {
1,194✔
2223
                c.leaf.remoteCluster = proto.Cluster
518✔
2224
        }
518✔
2225

2226
        c.leaf.remoteDomain = proto.Domain
676✔
2227

676✔
2228
        // When a leaf solicits a connection to a hub, the perms that it will use on the soliciting leafnode's
676✔
2229
        // behalf are correct for them, but inside the hub need to be reversed since data is flowing in the opposite direction.
676✔
2230
        if !c.isSolicitedLeafNode() && c.perms != nil {
693✔
2231
                sp, pp := c.perms.sub, c.perms.pub
17✔
2232
                c.perms.sub, c.perms.pub = pp, sp
17✔
2233
                if c.opts.Import != nil {
33✔
2234
                        c.darray = c.opts.Import.Deny
16✔
2235
                } else {
17✔
2236
                        c.darray = nil
1✔
2237
                }
1✔
2238
        }
2239

2240
        // Set the Ping timer
2241
        c.setFirstPingTimer()
676✔
2242

676✔
2243
        // If we received pub deny permissions from the other end, merge with existing ones.
676✔
2244
        c.mergeDenyPermissions(pub, proto.DenyPub)
676✔
2245

676✔
2246
        acc := c.acc
676✔
2247
        c.mu.Unlock()
676✔
2248

676✔
2249
        // If the account is not set (e.g. connection was closed due to auth
676✔
2250
        // timeout while still being processed), bail out to avoid a panic.
676✔
2251
        if acc == nil {
676✔
2252
                c.closeConnection(MissingAccount)
×
2253
                return ErrMissingAccount
×
2254
        }
×
2255

2256
        // Register the cluster, even if empty, as long as we are acting as a hub.
2257
        if !proto.Hub {
1,336✔
2258
                acc.registerLeafNodeCluster(proto.Cluster)
660✔
2259
        }
660✔
2260

2261
        // Add in the leafnode here since we passed through auth at this point.
2262
        s.addLeafNodeConnection(c, proto.Name, proto.Cluster, true)
676✔
2263

676✔
2264
        // If we have permissions bound to this leafnode we need to send then back to the
676✔
2265
        // origin server for local enforcement.
676✔
2266
        s.sendPermsAndAccountInfo(c)
676✔
2267

676✔
2268
        // Create and initialize the smap since we know our bound account now.
676✔
2269
        // This will send all registered subs too.
676✔
2270
        s.initLeafNodeSmapAndSendSubs(c)
676✔
2271

676✔
2272
        // Announce the account connect event for a leaf node.
676✔
2273
        // This will be a no-op as needed.
676✔
2274
        s.sendLeafNodeConnect(c.acc)
676✔
2275

676✔
2276
        // Check to see if we need to kick any internal source or mirror consumers.
676✔
2277
        // This will be a no-op if JetStream not enabled for this server or if the bound account
676✔
2278
        // does not have jetstream.
676✔
2279
        s.checkInternalSyncConsumers(acc)
676✔
2280

676✔
2281
        return nil
676✔
2282
}
2283

2284
// checkInternalSyncConsumers
2285
func (s *Server) checkInternalSyncConsumers(acc *Account) {
2,097✔
2286
        // Grab our js
2,097✔
2287
        js := s.getJetStream()
2,097✔
2288

2,097✔
2289
        // Only applicable if we have JS and the leafnode has JS as well.
2,097✔
2290
        // We check for remote JS outside.
2,097✔
2291
        if !js.isEnabled() || acc == nil {
3,283✔
2292
                return
1,186✔
2293
        }
1,186✔
2294

2295
        // We will check all streams in our local account. They must be a leader and
2296
        // be sourcing or mirroring. We will check the external config on the stream itself
2297
        // if this is cross domain, or if the remote domain is empty, meaning we might be
2298
        // extending the system across this leafnode connection and hence we would be extending
2299
        // our own domain.
2300
        jsa := js.lookupAccount(acc)
911✔
2301
        if jsa == nil {
1,262✔
2302
                return
351✔
2303
        }
351✔
2304

2305
        var streams []*stream
560✔
2306
        jsa.mu.RLock()
560✔
2307
        for _, mset := range jsa.streams {
616✔
2308
                mset.cfgMu.RLock()
56✔
2309
                // We need to have a mirror or source defined.
56✔
2310
                // We do not want to force another lock here to look for leader status,
56✔
2311
                // so collect and after we release jsa will make sure.
56✔
2312
                if mset.cfg.Mirror != nil || len(mset.cfg.Sources) > 0 {
68✔
2313
                        streams = append(streams, mset)
12✔
2314
                }
12✔
2315
                mset.cfgMu.RUnlock()
56✔
2316
        }
2317
        jsa.mu.RUnlock()
560✔
2318

560✔
2319
        // Now loop through all candidates and check if we are the leader and have NOT
560✔
2320
        // created the sync up consumer.
560✔
2321
        for _, mset := range streams {
572✔
2322
                mset.retryDisconnectedSyncConsumers()
12✔
2323
        }
12✔
2324
}
2325

2326
// Returns the remote cluster name. This is set only once so does not require a lock.
2327
func (c *client) remoteCluster() string {
172,664✔
2328
        if c.leaf == nil {
172,664✔
2329
                return _EMPTY_
×
2330
        }
×
2331
        return c.leaf.remoteCluster
172,664✔
2332
}
2333

2334
// Sends back an info block to the soliciting leafnode to let it know about
2335
// its permission settings for local enforcement.
2336
func (s *Server) sendPermsAndAccountInfo(c *client) {
677✔
2337
        // Copy
677✔
2338
        s.mu.Lock()
677✔
2339
        info := s.copyLeafNodeInfo()
677✔
2340
        s.mu.Unlock()
677✔
2341
        c.mu.Lock()
677✔
2342
        info.CID = c.cid
677✔
2343
        info.Import = c.opts.Import
677✔
2344
        info.Export = c.opts.Export
677✔
2345
        info.RemoteAccount = c.acc.Name
677✔
2346
        // s.SystemAccount() uses an atomic operation and does not get the server lock, so this is safe.
677✔
2347
        info.IsSystemAccount = c.acc == s.SystemAccount()
677✔
2348
        info.ConnectInfo = true
677✔
2349
        c.enqueueProto(generateInfoJSON(info))
677✔
2350
        c.mu.Unlock()
677✔
2351
}
677✔
2352

2353
// Snapshot the current subscriptions from the sublist into our smap which
2354
// we will keep updated from now on.
2355
// Also send the registered subscriptions.
2356
func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
1,311✔
2357
        acc := c.acc
1,311✔
2358
        if acc == nil {
1,311✔
2359
                c.Debugf("Leafnode does not have an account bound")
×
2360
                return
×
2361
        }
×
2362
        // Collect all account subs here.
2363
        _subs := [1024]*subscription{}
1,311✔
2364
        subs := _subs[:0]
1,311✔
2365
        ims := []string{}
1,311✔
2366

1,311✔
2367
        // Hold the client lock otherwise there can be a race and miss some subs.
1,311✔
2368
        c.mu.Lock()
1,311✔
2369
        defer c.mu.Unlock()
1,311✔
2370

1,311✔
2371
        acc.mu.RLock()
1,311✔
2372
        accName := acc.Name
1,311✔
2373
        accNTag := acc.nameTag
1,311✔
2374

1,311✔
2375
        // To make printing look better when no friendly name present.
1,311✔
2376
        if accNTag != _EMPTY_ {
1,323✔
2377
                accNTag = "/" + accNTag
12✔
2378
        }
12✔
2379

2380
        // If we are solicited we only send interest for local clients.
2381
        if c.isSpokeLeafNode() {
1,946✔
2382
                acc.sl.localSubs(&subs, true)
635✔
2383
        } else {
1,311✔
2384
                acc.sl.All(&subs)
676✔
2385
        }
676✔
2386

2387
        // Check if we have an existing service import reply.
2388
        siReply := copyBytes(acc.siReply)
1,311✔
2389

1,311✔
2390
        // Since leaf nodes only send on interest, if the bound
1,311✔
2391
        // account has import services we need to send those over.
1,311✔
2392
        for isubj := range acc.imports.services {
6,193✔
2393
                if c.isSpokeLeafNode() && !c.canSubscribe(isubj) {
5,174✔
2394
                        c.Debugf("Not permitted to import service %q on behalf of %s%s", isubj, accName, accNTag)
292✔
2395
                        continue
292✔
2396
                }
2397
                ims = append(ims, isubj)
4,590✔
2398
        }
2399
        // Likewise for mappings.
2400
        for _, m := range acc.mappings {
3,708✔
2401
                if c.isSpokeLeafNode() && !c.canSubscribe(m.src) {
2,443✔
2402
                        c.Debugf("Not permitted to import mapping %q on behalf of %s%s", m.src, accName, accNTag)
46✔
2403
                        continue
46✔
2404
                }
2405
                ims = append(ims, m.src)
2,351✔
2406
        }
2407

2408
        // Create a unique subject that will be used for loop detection.
2409
        lds := acc.lds
1,311✔
2410
        acc.mu.RUnlock()
1,311✔
2411

1,311✔
2412
        // Check if we have to create the LDS.
1,311✔
2413
        if lds == _EMPTY_ {
2,336✔
2414
                lds = leafNodeLoopDetectionSubjectPrefix + nuid.Next()
1,025✔
2415
                acc.mu.Lock()
1,025✔
2416
                acc.lds = lds
1,025✔
2417
                acc.mu.Unlock()
1,025✔
2418
        }
1,025✔
2419

2420
        // Now check for gateway interest. Leafnodes will put this into
2421
        // the proper mode to propagate, but they are not held in the account.
2422
        gwsa := [16]*client{}
1,311✔
2423
        gws := gwsa[:0]
1,311✔
2424
        s.getOutboundGatewayConnections(&gws)
1,311✔
2425
        for _, cgw := range gws {
1,392✔
2426
                cgw.mu.Lock()
81✔
2427
                gw := cgw.gw
81✔
2428
                cgw.mu.Unlock()
81✔
2429
                if gw != nil {
162✔
2430
                        if ei, _ := gw.outsim.Load(accName); ei != nil {
162✔
2431
                                if e := ei.(*outsie); e != nil && e.sl != nil {
162✔
2432
                                        e.sl.All(&subs)
81✔
2433
                                }
81✔
2434
                        }
2435
                }
2436
        }
2437

2438
        applyGlobalRouting := s.gateway.enabled
1,311✔
2439
        if c.isSpokeLeafNode() {
1,946✔
2440
                // Add a fake subscription for this solicited leafnode connection
635✔
2441
                // so that we can send back directly for mapped GW replies.
635✔
2442
                // We need to keep track of this subscription so it can be removed
635✔
2443
                // when the connection is closed so that the GC can release it.
635✔
2444
                c.leaf.gwSub = &subscription{client: c, subject: []byte(gwReplyPrefix + ">")}
635✔
2445
                c.srv.gwLeafSubs.Insert(c.leaf.gwSub)
635✔
2446
        }
635✔
2447

2448
        // Now walk the results and add them to our smap
2449
        rc := c.leaf.remoteCluster
1,311✔
2450
        c.leaf.smap = make(map[string]int32)
1,311✔
2451
        for _, sub := range subs {
39,732✔
2452
                // Check perms regardless of role.
38,421✔
2453
                if c.perms != nil && !c.canSubscribe(string(sub.subject)) {
40,793✔
2454
                        c.Debugf("Not permitted to subscribe to %q on behalf of %s%s", sub.subject, accName, accNTag)
2,372✔
2455
                        continue
2,372✔
2456
                }
2457
                // Don't advertise interest from leafnodes to other isolated leafnodes.
2458
                if sub.client.kind == LEAF && c.isIsolatedLeafNode() {
36,060✔
2459
                        continue
11✔
2460
                }
2461
                // We ignore ourselves here.
2462
                // Also don't add the subscription if it has a origin cluster and the
2463
                // cluster name matches the one of the client we are sending to.
2464
                if c != sub.client && (sub.origin == nil || (bytesToString(sub.origin) != rc)) {
66,710✔
2465
                        count := int32(1)
30,672✔
2466
                        if len(sub.queue) > 0 && sub.qw > 0 {
30,682✔
2467
                                count = sub.qw
10✔
2468
                        }
10✔
2469
                        c.leaf.smap[keyFromSub(sub)] += count
30,672✔
2470
                        if c.leaf.tsub == nil {
31,906✔
2471
                                c.leaf.tsub = make(map[*subscription]struct{})
1,234✔
2472
                        }
1,234✔
2473
                        c.leaf.tsub[sub] = struct{}{}
30,672✔
2474
                }
2475
        }
2476
        // FIXME(dlc) - We need to update appropriately on an account claims update.
2477
        for _, isubj := range ims {
8,252✔
2478
                c.leaf.smap[isubj]++
6,941✔
2479
        }
6,941✔
2480
        // If we have gateways enabled we need to make sure the other side sends us responses
2481
        // that have been augmented from the original subscription.
2482
        // TODO(dlc) - Should we lock this down more?
2483
        if applyGlobalRouting {
1,412✔
2484
                c.leaf.smap[oldGWReplyPrefix+"*.>"]++
101✔
2485
                c.leaf.smap[gwReplyPrefix+">"]++
101✔
2486
        }
101✔
2487
        // Detect loops by subscribing to a specific subject and checking
2488
        // if this sub is coming back to us.
2489
        c.leaf.smap[lds]++
1,311✔
2490

1,311✔
2491
        // Check if we need to add an existing siReply to our map.
1,311✔
2492
        // This will be a prefix so add on the wildcard.
1,311✔
2493
        if siReply != nil {
1,329✔
2494
                wcsub := append(siReply, '>')
18✔
2495
                c.leaf.smap[string(wcsub)]++
18✔
2496
        }
18✔
2497
        // Queue all protocols. There is no max pending limit for LN connection,
2498
        // so we don't need chunking. The writes will happen from the writeLoop.
2499
        var b bytes.Buffer
1,311✔
2500
        for key, n := range c.leaf.smap {
28,281✔
2501
                c.writeLeafSub(&b, key, n)
26,970✔
2502
        }
26,970✔
2503
        if b.Len() > 0 {
2,622✔
2504
                c.enqueueProto(b.Bytes())
1,311✔
2505
        }
1,311✔
2506
        if c.leaf.tsub != nil {
2,546✔
2507
                // Clear the tsub map after 5 seconds.
1,235✔
2508
                c.leaf.tsubt = time.AfterFunc(5*time.Second, func() {
1,280✔
2509
                        c.mu.Lock()
45✔
2510
                        if c.leaf != nil {
90✔
2511
                                c.leaf.tsub = nil
45✔
2512
                                c.leaf.tsubt = nil
45✔
2513
                        }
45✔
2514
                        c.mu.Unlock()
45✔
2515
                })
2516
        }
2517
}
2518

2519
// updateInterestForAccountOnGateway called from gateway code when processing RS+ and RS-.
2520
func (s *Server) updateInterestForAccountOnGateway(accName string, sub *subscription, delta int32) {
201,969✔
2521
        // Since we're in the gateway's readLoop, and we would otherwise block, don't allow fetching.
201,969✔
2522
        acc, err := s.lookupOrFetchAccount(accName, false)
201,969✔
2523
        if acc == nil || err != nil {
202,202✔
2524
                s.Debugf("No or bad account for %q, failed to update interest from gateway", accName)
233✔
2525
                return
233✔
2526
        }
233✔
2527
        acc.updateLeafNodes(sub, delta)
201,736✔
2528
}
2529

2530
// updateLeafNodesEx will make sure to update the account smap for the subscription.
2531
// Will also forward to all leaf nodes as needed.
2532
// If `hubOnly` is true, then will update only leaf nodes that connect to this server
2533
// (that is, for which this server acts as a hub to them).
2534
func (acc *Account) updateLeafNodesEx(sub *subscription, delta int32, hubOnly bool) {
2,488,030✔
2535
        if acc == nil || sub == nil {
2,488,030✔
2536
                return
×
2537
        }
×
2538

2539
        // We will do checks for no leafnodes and same cluster here inline and under the
2540
        // general account read lock.
2541
        // If we feel we need to update the leafnodes we will do that out of line to avoid
2542
        // blocking routes or GWs.
2543

2544
        acc.mu.RLock()
2,488,030✔
2545
        // First check if we even have leafnodes here.
2,488,030✔
2546
        if acc.nleafs == 0 {
4,906,342✔
2547
                acc.mu.RUnlock()
2,418,312✔
2548
                return
2,418,312✔
2549
        }
2,418,312✔
2550

2551
        // Is this a loop detection subject.
2552
        isLDS := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))
69,718✔
2553

69,718✔
2554
        // Capture the cluster even if its empty.
69,718✔
2555
        var cluster string
69,718✔
2556
        if sub.origin != nil {
119,315✔
2557
                cluster = bytesToString(sub.origin)
49,597✔
2558
        }
49,597✔
2559

2560
        // If we have an isolated cluster we can return early, as long as it is not a loop detection subject.
2561
        // Empty clusters will return false for the check.
2562
        if !isLDS && acc.isLeafNodeClusterIsolated(cluster) {
90,542✔
2563
                acc.mu.RUnlock()
20,824✔
2564
                return
20,824✔
2565
        }
20,824✔
2566

2567
        // We can release the general account lock.
2568
        acc.mu.RUnlock()
48,894✔
2569

48,894✔
2570
        // We can hold the list lock here to avoid having to copy a large slice.
48,894✔
2571
        acc.lmu.RLock()
48,894✔
2572
        defer acc.lmu.RUnlock()
48,894✔
2573

48,894✔
2574
        // Do this once.
48,894✔
2575
        subject := string(sub.subject)
48,894✔
2576

48,894✔
2577
        // Walk the connected leafnodes.
48,894✔
2578
        for _, ln := range acc.lleafs {
109,539✔
2579
                if ln == sub.client {
91,915✔
2580
                        continue
31,270✔
2581
                }
2582
                ln.mu.Lock()
29,375✔
2583
                // Don't advertise interest from leafnodes to other isolated leafnodes.
29,375✔
2584
                if sub.client.kind == LEAF && ln.isIsolatedLeafNode() {
29,411✔
2585
                        ln.mu.Unlock()
36✔
2586
                        continue
36✔
2587
                }
2588
                // If `hubOnly` is true, it means that we want to update only leafnodes
2589
                // that connect to this server (so isHubLeafNode() would return `true`).
2590
                if hubOnly && !ln.isHubLeafNode() {
29,345✔
2591
                        ln.mu.Unlock()
6✔
2592
                        continue
6✔
2593
                }
2594
                // Check to make sure this sub does not have an origin cluster that matches the leafnode.
2595
                // If skipped, make sure that we still let go the "$LDS." subscription that allows
2596
                // the detection of loops as long as different cluster.
2597
                clusterDifferent := cluster != ln.remoteCluster()
29,333✔
2598
                if (isLDS && clusterDifferent) || ((cluster == _EMPTY_ || clusterDifferent) && (delta <= 0 || ln.canSubscribe(subject))) {
53,971✔
2599
                        ln.updateSmap(sub, delta, isLDS)
24,638✔
2600
                }
24,638✔
2601
                ln.mu.Unlock()
29,333✔
2602
        }
2603
}
2604

2605
// updateLeafNodes will make sure to update the account smap for the subscription.
2606
// Will also forward to all leaf nodes as needed.
2607
func (acc *Account) updateLeafNodes(sub *subscription, delta int32) {
2,488,007✔
2608
        acc.updateLeafNodesEx(sub, delta, false)
2,488,007✔
2609
}
2,488,007✔
2610

2611
// This will make an update to our internal smap and determine if we should send out
2612
// an interest update to the remote side.
2613
// Lock should be held.
2614
func (c *client) updateSmap(sub *subscription, delta int32, isLDS bool) {
24,638✔
2615
        if c.leaf.smap == nil {
24,662✔
2616
                return
24✔
2617
        }
24✔
2618

2619
        // If we are solicited make sure this is a local client or a non-solicited leaf node
2620
        skind := sub.client.kind
24,614✔
2621
        updateClient := skind == CLIENT || skind == SYSTEM || skind == JETSTREAM || skind == ACCOUNT
24,614✔
2622
        if !isLDS && c.isSpokeLeafNode() && !(updateClient || (skind == LEAF && !sub.client.isSpokeLeafNode())) {
32,969✔
2623
                return
8,355✔
2624
        }
8,355✔
2625

2626
        // For additions, check if that sub has just been processed during initLeafNodeSmapAndSendSubs
2627
        if delta > 0 && c.leaf.tsub != nil {
24,099✔
2628
                if _, present := c.leaf.tsub[sub]; present {
7,843✔
2629
                        delete(c.leaf.tsub, sub)
3✔
2630
                        if len(c.leaf.tsub) == 0 {
3✔
2631
                                c.leaf.tsub = nil
×
2632
                                c.leaf.tsubt.Stop()
×
2633
                                c.leaf.tsubt = nil
×
2634
                        }
×
2635
                        return
3✔
2636
                }
2637
        }
2638

2639
        key := keyFromSub(sub)
16,256✔
2640
        n, ok := c.leaf.smap[key]
16,256✔
2641
        if delta < 0 && !ok {
17,355✔
2642
                return
1,099✔
2643
        }
1,099✔
2644

2645
        // We will update if its a queue, if count is zero (or negative), or we were 0 and are N > 0.
2646
        update := sub.queue != nil || (n <= 0 && n+delta > 0) || (n > 0 && n+delta <= 0)
15,157✔
2647
        n += delta
15,157✔
2648
        if n > 0 {
26,544✔
2649
                c.leaf.smap[key] = n
11,387✔
2650
        } else {
15,157✔
2651
                delete(c.leaf.smap, key)
3,770✔
2652
        }
3,770✔
2653
        if update {
25,237✔
2654
                c.sendLeafNodeSubUpdate(key, n)
10,080✔
2655
        }
10,080✔
2656
}
2657

2658
// Used to force add subjects to the subject map.
2659
func (c *client) forceAddToSmap(subj string) {
13✔
2660
        c.mu.Lock()
13✔
2661
        defer c.mu.Unlock()
13✔
2662

13✔
2663
        if c.leaf.smap == nil {
13✔
2664
                return
×
2665
        }
×
2666
        n := c.leaf.smap[subj]
13✔
2667
        if n != 0 {
14✔
2668
                return
1✔
2669
        }
1✔
2670
        // Place into the map since it was not there.
2671
        c.leaf.smap[subj] = 1
12✔
2672
        c.sendLeafNodeSubUpdate(subj, 1)
12✔
2673
}
2674

2675
// Used to force remove a subject from the subject map.
2676
func (c *client) forceRemoveFromSmap(subj string) {
1✔
2677
        c.mu.Lock()
1✔
2678
        defer c.mu.Unlock()
1✔
2679

1✔
2680
        if c.leaf.smap == nil {
1✔
2681
                return
×
2682
        }
×
2683
        n := c.leaf.smap[subj]
1✔
2684
        if n == 0 {
1✔
2685
                return
×
2686
        }
×
2687
        n--
1✔
2688
        if n == 0 {
2✔
2689
                // Remove is now zero
1✔
2690
                delete(c.leaf.smap, subj)
1✔
2691
                c.sendLeafNodeSubUpdate(subj, 0)
1✔
2692
        } else {
1✔
2693
                c.leaf.smap[subj] = n
×
2694
        }
×
2695
}
2696

2697
// Send the subscription interest change to the other side.
2698
// Lock should be held.
2699
func (c *client) sendLeafNodeSubUpdate(key string, n int32) {
10,093✔
2700
        // If we are a spoke, we need to check if we are allowed to send this subscription over to the hub.
10,093✔
2701
        if c.isSpokeLeafNode() {
12,606✔
2702
                checkPerms := true
2,513✔
2703
                if len(key) > 0 && (key[0] == '$' || key[0] == '_') {
4,076✔
2704
                        if strings.HasPrefix(key, leafNodeLoopDetectionSubjectPrefix) ||
1,563✔
2705
                                strings.HasPrefix(key, oldGWReplyPrefix) ||
1,563✔
2706
                                strings.HasPrefix(key, gwReplyPrefix) {
1,646✔
2707
                                checkPerms = false
83✔
2708
                        }
83✔
2709
                }
2710
                if checkPerms {
4,943✔
2711
                        var subject string
2,430✔
2712
                        if sep := strings.IndexByte(key, ' '); sep != -1 {
2,915✔
2713
                                subject = key[:sep]
485✔
2714
                        } else {
2,430✔
2715
                                subject = key
1,945✔
2716
                        }
1,945✔
2717
                        if !c.canSubscribe(subject) {
2,439✔
2718
                                return
9✔
2719
                        }
9✔
2720
                }
2721
        }
2722
        // If we are here we can send over to the other side.
2723
        _b := [64]byte{}
10,084✔
2724
        b := bytes.NewBuffer(_b[:0])
10,084✔
2725
        c.writeLeafSub(b, key, n)
10,084✔
2726
        c.enqueueProto(b.Bytes())
10,084✔
2727
}
2728

2729
// Helper function to build the key.
2730
func keyFromSub(sub *subscription) string {
47,838✔
2731
        var sb strings.Builder
47,838✔
2732
        sb.Grow(len(sub.subject) + len(sub.queue) + 1)
47,838✔
2733
        sb.Write(sub.subject)
47,838✔
2734
        if sub.queue != nil {
51,538✔
2735
                // Just make the key subject spc group, e.g. 'foo bar'
3,700✔
2736
                sb.WriteByte(' ')
3,700✔
2737
                sb.Write(sub.queue)
3,700✔
2738
        }
3,700✔
2739
        return sb.String()
47,838✔
2740
}
2741

2742
// Leading markers of the keys built by keyFromSubWithOrigin. Each marker is
// declared both as a string and as the matching single byte; the byte forms
// are the ones written by keyFromSubWithOrigin.
const (
2743
        // "R": routed subscription without an origin.
        keyRoutedSub         = "R"
2744
        keyRoutedSubByte     = 'R'
2745
        // "L": routed subscription on behalf of a leafnode (has an origin).
        keyRoutedLeafSub     = "L"
2746
        keyRoutedLeafSubByte = 'L'
2747
)
2748

2749
// Helper function to build the key that prevents collisions between normal
2750
// routed subscriptions and routed subscriptions on behalf of a leafnode.
2751
// Keys will look like this:
2752
// "R foo"          -> plain routed sub on "foo"
2753
// "R foo bar"      -> queue routed sub on "foo", queue "bar"
2754
// "L foo bar"      -> plain routed leaf sub on "foo", leaf "bar"
2755
// "L foo bar baz"  -> queue routed sub on "foo", queue "bar", leaf "baz"
2756
func keyFromSubWithOrigin(sub *subscription) string {
698,043✔
2757
        var sb strings.Builder
698,043✔
2758
        sb.Grow(2 + len(sub.origin) + 1 + len(sub.subject) + 1 + len(sub.queue))
698,043✔
2759
        leaf := len(sub.origin) > 0
698,043✔
2760
        if leaf {
714,714✔
2761
                sb.WriteByte(keyRoutedLeafSubByte)
16,671✔
2762
        } else {
698,043✔
2763
                sb.WriteByte(keyRoutedSubByte)
681,372✔
2764
        }
681,372✔
2765
        sb.WriteByte(' ')
698,043✔
2766
        sb.Write(sub.subject)
698,043✔
2767
        if sub.queue != nil {
727,080✔
2768
                sb.WriteByte(' ')
29,037✔
2769
                sb.Write(sub.queue)
29,037✔
2770
        }
29,037✔
2771
        if leaf {
714,714✔
2772
                sb.WriteByte(' ')
16,671✔
2773
                sb.Write(sub.origin)
16,671✔
2774
        }
16,671✔
2775
        return sb.String()
698,043✔
2776
}
2777

2778
// Lock should be held.
2779
func (c *client) writeLeafSub(w *bytes.Buffer, key string, n int32) {
37,054✔
2780
        if key == _EMPTY_ {
37,054✔
2781
                return
×
2782
        }
×
2783
        if n > 0 {
70,337✔
2784
                w.WriteString("LS+ " + key)
33,283✔
2785
                // Check for queue semantics, if found write n.
33,283✔
2786
                if strings.Contains(key, " ") {
35,595✔
2787
                        w.WriteString(" ")
2,312✔
2788
                        var b [12]byte
2,312✔
2789
                        var i = len(b)
2,312✔
2790
                        for l := n; l > 0; l /= 10 {
5,524✔
2791
                                i--
3,212✔
2792
                                b[i] = digits[l%10]
3,212✔
2793
                        }
3,212✔
2794
                        w.Write(b[i:])
2,312✔
2795
                        if c.trace {
2,312✔
2796
                                arg := fmt.Sprintf("%s %d", key, n)
×
2797
                                c.traceOutOp("LS+", []byte(arg))
×
2798
                        }
×
2799
                } else if c.trace {
31,174✔
2800
                        c.traceOutOp("LS+", []byte(key))
203✔
2801
                }
203✔
2802
        } else {
3,771✔
2803
                w.WriteString("LS- " + key)
3,771✔
2804
                if c.trace {
3,781✔
2805
                        c.traceOutOp("LS-", []byte(key))
10✔
2806
                }
10✔
2807
        }
2808
        w.WriteString(CR_LF)
37,054✔
2809
}
2810

2811
// processLeafSub will process an inbound sub request for the remote leaf node.
2812
func (c *client) processLeafSub(argo []byte) (err error) {
32,977✔
2813
        // Indicate activity.
32,977✔
2814
        c.in.subs++
32,977✔
2815

32,977✔
2816
        srv := c.srv
32,977✔
2817
        if srv == nil {
32,977✔
2818
                return nil
×
2819
        }
×
2820

2821
        // Copy so we do not reference a potentially large buffer
2822
        arg := make([]byte, len(argo))
32,977✔
2823
        copy(arg, argo)
32,977✔
2824

32,977✔
2825
        args := splitArg(arg)
32,977✔
2826
        sub := &subscription{client: c}
32,977✔
2827

32,977✔
2828
        delta := int32(1)
32,977✔
2829
        switch len(args) {
32,977✔
2830
        case 1:
30,726✔
2831
                sub.queue = nil
30,726✔
2832
        case 3:
2,251✔
2833
                sub.queue = args[1]
2,251✔
2834
                sub.qw = int32(parseSize(args[2]))
2,251✔
2835
                // TODO: (ik) We should have a non empty queue name and a queue
2,251✔
2836
                // weight >= 1. For 2.11, we may want to return an error if that
2,251✔
2837
                // is not the case, but for now just overwrite `delta` if queue
2,251✔
2838
                // weight is greater than 1 (it is possible after a reconnect/
2,251✔
2839
                // server restart to receive a queue weight > 1 for a new sub).
2,251✔
2840
                if sub.qw > 1 {
3,891✔
2841
                        delta = sub.qw
1,640✔
2842
                }
1,640✔
2843
        default:
×
2844
                return fmt.Errorf("processLeafSub Parse Error: '%s'", arg)
×
2845
        }
2846
        sub.subject = args[0]
32,977✔
2847

32,977✔
2848
        c.mu.Lock()
32,977✔
2849
        if c.isClosed() {
32,998✔
2850
                c.mu.Unlock()
21✔
2851
                return nil
21✔
2852
        }
21✔
2853

2854
        acc := c.acc
32,956✔
2855
        // Guard against LS+ arriving before CONNECT has been processed, which
32,956✔
2856
        // can happen when compression is enabled.
32,956✔
2857
        if acc == nil {
32,959✔
2858
                c.mu.Unlock()
3✔
2859
                c.sendErr("Authorization Violation")
3✔
2860
                c.closeConnection(ProtocolViolation)
3✔
2861
                return nil
3✔
2862
        }
3✔
2863
        // Check if we have a loop.
2864
        ldsPrefix := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))
32,953✔
2865

32,953✔
2866
        if ldsPrefix && bytesToString(sub.subject) == acc.getLDSubject() {
32,959✔
2867
                c.mu.Unlock()
6✔
2868
                c.handleLeafNodeLoop(true)
6✔
2869
                return nil
6✔
2870
        }
6✔
2871

2872
        // Check permissions if applicable. (but exclude the $LDS, $GR and _GR_)
2873
        checkPerms := true
32,947✔
2874
        if sub.subject[0] == '$' || sub.subject[0] == '_' {
62,967✔
2875
                if ldsPrefix ||
30,020✔
2876
                        bytes.HasPrefix(sub.subject, []byte(oldGWReplyPrefix)) ||
30,020✔
2877
                        bytes.HasPrefix(sub.subject, []byte(gwReplyPrefix)) {
32,018✔
2878
                        checkPerms = false
1,998✔
2879
                }
1,998✔
2880
        }
2881

2882
        // If we are a hub check that we can publish to this subject.
2883
        if checkPerms {
63,896✔
2884
                subj := string(sub.subject)
30,949✔
2885
                if subjectIsLiteral(subj) && !c.pubAllowedFullCheck(subj, true, true) {
31,265✔
2886
                        c.mu.Unlock()
316✔
2887
                        c.leafSubPermViolation(sub.subject)
316✔
2888
                        c.Debugf(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject))
316✔
2889
                        return nil
316✔
2890
                }
316✔
2891
        }
2892

2893
        // Check if we have a maximum on the number of subscriptions.
2894
        if c.subsAtLimit() {
32,639✔
2895
                c.mu.Unlock()
8✔
2896
                c.maxSubsExceeded()
8✔
2897
                return nil
8✔
2898
        }
8✔
2899

2900
        // If we have an origin cluster associated mark that in the sub.
2901
        if rc := c.remoteCluster(); rc != _EMPTY_ {
61,385✔
2902
                sub.origin = []byte(rc)
28,762✔
2903
        }
28,762✔
2904

2905
        // Like Routes, we store local subs by account and subject and optionally queue name.
2906
        // If we have a queue it will have a trailing weight which we do not want.
2907
        if sub.queue != nil {
34,594✔
2908
                sub.sid = arg[:len(arg)-len(args[2])-1]
1,971✔
2909
        } else {
32,623✔
2910
                sub.sid = arg
30,652✔
2911
        }
30,652✔
2912
        key := bytesToString(sub.sid)
32,623✔
2913
        osub := c.subs[key]
32,623✔
2914
        if osub == nil {
63,731✔
2915
                c.subs[key] = sub
31,108✔
2916
                // Now place into the account sl.
31,108✔
2917
                if err := acc.sl.Insert(sub); err != nil {
31,108✔
2918
                        delete(c.subs, key)
×
2919
                        c.mu.Unlock()
×
2920
                        c.Errorf("Could not insert subscription: %v", err)
×
2921
                        c.sendErr("Invalid Subscription")
×
2922
                        return nil
×
2923
                }
×
2924
        } else if sub.queue != nil {
3,029✔
2925
                // For a queue we need to update the weight.
1,514✔
2926
                delta = sub.qw - atomic.LoadInt32(&osub.qw)
1,514✔
2927
                atomic.StoreInt32(&osub.qw, sub.qw)
1,514✔
2928
                acc.sl.UpdateRemoteQSub(osub)
1,514✔
2929
        }
1,514✔
2930
        spoke := c.isSpokeLeafNode()
32,623✔
2931
        c.mu.Unlock()
32,623✔
2932

32,623✔
2933
        // Only add in shadow subs if a new sub or qsub.
32,623✔
2934
        if osub == nil {
63,731✔
2935
                if err := c.addShadowSubscriptions(acc, sub); err != nil {
31,108✔
2936
                        c.Errorf(err.Error())
×
2937
                }
×
2938
        }
2939

2940
        // If we are not solicited, treat leaf node subscriptions similar to a
2941
        // client subscription, meaning we forward them to routes, gateways and
2942
        // other leaf nodes as needed.
2943
        if !spoke {
44,135✔
2944
                // If we are routing add to the route map for the associated account.
11,512✔
2945
                srv.updateRouteSubscriptionMap(acc, sub, delta)
11,512✔
2946
                if srv.gateway.enabled {
13,027✔
2947
                        srv.gatewayUpdateSubInterest(acc.Name, sub, delta)
1,515✔
2948
                }
1,515✔
2949
        }
2950
        // Now check on leafnode updates for other leaf nodes. We understand solicited
2951
        // and non-solicited state in this call so we will do the right thing.
2952
        acc.updateLeafNodes(sub, delta)
32,623✔
2953

32,623✔
2954
        return nil
32,623✔
2955
}
2956

2957
// If the leafnode is a solicited, set the connect delay based on default
2958
// or private option (for tests). Sends the error to the other side, log and
2959
// close the connection.
2960
func (c *client) handleLeafNodeLoop(sendErr bool) {
16✔
2961
        accName, delay := c.setLeafConnectDelayIfSoliciting(leafNodeReconnectDelayAfterLoopDetected)
16✔
2962
        errTxt := fmt.Sprintf("Loop detected for leafnode account=%q. Delaying attempt to reconnect for %v", accName, delay)
16✔
2963
        if sendErr {
24✔
2964
                c.sendErr(errTxt)
8✔
2965
        }
8✔
2966

2967
        c.Errorf(errTxt)
16✔
2968
        // If we are here with "sendErr" false, it means that this is the server
16✔
2969
        // that received the error. The other side will have closed the connection,
16✔
2970
        // but does not hurt to close here too.
16✔
2971
        c.closeConnection(ProtocolViolation)
16✔
2972
}
2973

2974
// processLeafUnsub will process an inbound unsub request for the remote leaf node.
2975
func (c *client) processLeafUnsub(arg []byte) error {
3,389✔
2976
        // Indicate any activity, so pub and sub or unsubs.
3,389✔
2977
        c.in.subs++
3,389✔
2978

3,389✔
2979
        srv := c.srv
3,389✔
2980

3,389✔
2981
        c.mu.Lock()
3,389✔
2982
        if c.isClosed() {
3,448✔
2983
                c.mu.Unlock()
59✔
2984
                return nil
59✔
2985
        }
59✔
2986

2987
        acc := c.acc
3,330✔
2988
        // Guard against LS- arriving before CONNECT has been processed.
3,330✔
2989
        if acc == nil {
3,331✔
2990
                c.mu.Unlock()
1✔
2991
                c.sendErr("Authorization Violation")
1✔
2992
                c.closeConnection(ProtocolViolation)
1✔
2993
                return nil
1✔
2994
        }
1✔
2995

2996
        spoke := c.isSpokeLeafNode()
3,329✔
2997
        // We store local subs by account and subject and optionally queue name.
3,329✔
2998
        // LS- will have the arg exactly as the key.
3,329✔
2999
        sub, ok := c.subs[string(arg)]
3,329✔
3000
        if !ok {
3,341✔
3001
                // If not found, don't try to update routes/gws/leaf nodes.
12✔
3002
                c.mu.Unlock()
12✔
3003
                return nil
12✔
3004
        }
12✔
3005
        delta := int32(1)
3,317✔
3006
        if len(sub.queue) > 0 {
3,734✔
3007
                delta = sub.qw
417✔
3008
        }
417✔
3009
        c.mu.Unlock()
3,317✔
3010

3,317✔
3011
        c.unsubscribe(acc, sub, true, true)
3,317✔
3012
        if !spoke {
4,365✔
3013
                // If we are routing subtract from the route map for the associated account.
1,048✔
3014
                srv.updateRouteSubscriptionMap(acc, sub, -delta)
1,048✔
3015
                // Gateways
1,048✔
3016
                if srv.gateway.enabled {
1,322✔
3017
                        srv.gatewayUpdateSubInterest(acc.Name, sub, -delta)
274✔
3018
                }
274✔
3019
        }
3020
        // Now check on leafnode updates for other leaf nodes.
3021
        acc.updateLeafNodes(sub, -delta)
3,317✔
3022
        return nil
3,317✔
3023
}
3024

3025
func (c *client) processLeafHeaderMsgArgs(arg []byte) error {
483✔
3026
        // Unroll splitArgs to avoid runtime/heap issues
483✔
3027
        args := c.argsa[:0]
483✔
3028
        start := -1
483✔
3029
        for i, b := range arg {
32,704✔
3030
                switch b {
32,221✔
3031
                case ' ', '\t', '\r', '\n':
1,377✔
3032
                        if start >= 0 {
2,754✔
3033
                                args = append(args, arg[start:i])
1,377✔
3034
                                start = -1
1,377✔
3035
                        }
1,377✔
3036
                default:
30,844✔
3037
                        if start < 0 {
32,704✔
3038
                                start = i
1,860✔
3039
                        }
1,860✔
3040
                }
3041
        }
3042
        if start >= 0 {
966✔
3043
                args = append(args, arg[start:])
483✔
3044
        }
483✔
3045

3046
        c.pa.arg = arg
483✔
3047
        switch len(args) {
483✔
3048
        case 0, 1, 2:
×
3049
                return fmt.Errorf("processLeafHeaderMsgArgs Parse Error: '%s'", args)
×
3050
        case 3:
81✔
3051
                c.pa.reply = nil
81✔
3052
                c.pa.queues = nil
81✔
3053
                c.pa.hdb = args[1]
81✔
3054
                c.pa.hdr = parseSize(args[1])
81✔
3055
                c.pa.szb = args[2]
81✔
3056
                c.pa.size = parseSize(args[2])
81✔
3057
        case 4:
395✔
3058
                c.pa.reply = args[1]
395✔
3059
                c.pa.queues = nil
395✔
3060
                c.pa.hdb = args[2]
395✔
3061
                c.pa.hdr = parseSize(args[2])
395✔
3062
                c.pa.szb = args[3]
395✔
3063
                c.pa.size = parseSize(args[3])
395✔
3064
        default:
7✔
3065
                // args[1] is our reply indicator. Should be + or | normally.
7✔
3066
                if len(args[1]) != 1 {
7✔
3067
                        return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
3068
                }
×
3069
                switch args[1][0] {
7✔
3070
                case '+':
2✔
3071
                        c.pa.reply = args[2]
2✔
3072
                case '|':
5✔
3073
                        c.pa.reply = nil
5✔
3074
                default:
×
3075
                        return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
3076
                }
3077
                // Grab header size.
3078
                c.pa.hdb = args[len(args)-2]
7✔
3079
                c.pa.hdr = parseSize(c.pa.hdb)
7✔
3080

7✔
3081
                // Grab size.
7✔
3082
                c.pa.szb = args[len(args)-1]
7✔
3083
                c.pa.size = parseSize(c.pa.szb)
7✔
3084

7✔
3085
                // Grab queue names.
7✔
3086
                if c.pa.reply != nil {
9✔
3087
                        c.pa.queues = args[3 : len(args)-2]
2✔
3088
                } else {
7✔
3089
                        c.pa.queues = args[2 : len(args)-2]
5✔
3090
                }
5✔
3091
        }
3092
        if c.pa.hdr < 0 {
483✔
3093
                return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Header Size: '%s'", arg)
×
3094
        }
×
3095
        if c.pa.size < 0 {
483✔
3096
                return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Size: '%s'", args)
×
3097
        }
×
3098
        if c.pa.hdr > c.pa.size {
483✔
3099
                return fmt.Errorf("processLeafHeaderMsgArgs Header Size larger then TotalSize: '%s'", arg)
×
3100
        }
×
3101

3102
        // Common ones processed after check for arg length
3103
        c.pa.subject = args[0]
483✔
3104

483✔
3105
        return nil
483✔
3106
}
3107

3108
func (c *client) processLeafMsgArgs(arg []byte) error {
84,917✔
3109
        // Unroll splitArgs to avoid runtime/heap issues
84,917✔
3110
        args := c.argsa[:0]
84,917✔
3111
        start := -1
84,917✔
3112
        for i, b := range arg {
2,741,297✔
3113
                switch b {
2,656,380✔
3114
                case ' ', '\t', '\r', '\n':
136,765✔
3115
                        if start >= 0 {
273,530✔
3116
                                args = append(args, arg[start:i])
136,765✔
3117
                                start = -1
136,765✔
3118
                        }
136,765✔
3119
                default:
2,519,615✔
3120
                        if start < 0 {
2,741,297✔
3121
                                start = i
221,682✔
3122
                        }
221,682✔
3123
                }
3124
        }
3125
        if start >= 0 {
169,834✔
3126
                args = append(args, arg[start:])
84,917✔
3127
        }
84,917✔
3128

3129
        c.pa.arg = arg
84,917✔
3130
        switch len(args) {
84,917✔
3131
        case 0, 1:
×
3132
                return fmt.Errorf("processLeafMsgArgs Parse Error: '%s'", args)
×
3133
        case 2:
55,782✔
3134
                c.pa.reply = nil
55,782✔
3135
                c.pa.queues = nil
55,782✔
3136
                c.pa.szb = args[1]
55,782✔
3137
                c.pa.size = parseSize(args[1])
55,782✔
3138
        case 3:
6,582✔
3139
                c.pa.reply = args[1]
6,582✔
3140
                c.pa.queues = nil
6,582✔
3141
                c.pa.szb = args[2]
6,582✔
3142
                c.pa.size = parseSize(args[2])
6,582✔
3143
        default:
22,553✔
3144
                // args[1] is our reply indicator. Should be + or | normally.
22,553✔
3145
                if len(args[1]) != 1 {
22,553✔
3146
                        return fmt.Errorf("processLeafMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
3147
                }
×
3148
                switch args[1][0] {
22,553✔
3149
                case '+':
160✔
3150
                        c.pa.reply = args[2]
160✔
3151
                case '|':
22,393✔
3152
                        c.pa.reply = nil
22,393✔
3153
                default:
×
3154
                        return fmt.Errorf("processLeafMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
3155
                }
3156
                // Grab size.
3157
                c.pa.szb = args[len(args)-1]
22,553✔
3158
                c.pa.size = parseSize(c.pa.szb)
22,553✔
3159

22,553✔
3160
                // Grab queue names.
22,553✔
3161
                if c.pa.reply != nil {
22,713✔
3162
                        c.pa.queues = args[3 : len(args)-1]
160✔
3163
                } else {
22,553✔
3164
                        c.pa.queues = args[2 : len(args)-1]
22,393✔
3165
                }
22,393✔
3166
        }
3167
        if c.pa.size < 0 {
84,917✔
3168
                return fmt.Errorf("processLeafMsgArgs Bad or Missing Size: '%s'", args)
×
3169
        }
×
3170

3171
        // Common ones processed after check for arg length
3172
        c.pa.subject = args[0]
84,917✔
3173

84,917✔
3174
        return nil
84,917✔
3175
}
3176

3177
// processInboundLeafMsg is called to process an inbound msg from a leaf node.
3178
func (c *client) processInboundLeafMsg(msg []byte) {
83,551✔
3179
        // Update statistics
83,551✔
3180
        // The msg includes the CR_LF, so pull back out for accounting.
83,551✔
3181
        c.in.msgs++
83,551✔
3182
        c.in.bytes += int32(len(msg) - LEN_CR_LF)
83,551✔
3183

83,551✔
3184
        srv, acc, subject := c.srv, c.acc, string(c.pa.subject)
83,551✔
3185

83,551✔
3186
        // Mostly under testing scenarios.
83,551✔
3187
        if srv == nil || acc == nil {
83,551✔
3188
                return
×
3189
        }
×
3190

3191
        // Match the subscriptions. We will use our own L1 map if
3192
        // it's still valid, avoiding contention on the shared sublist.
3193
        var r *SublistResult
83,551✔
3194
        var ok bool
83,551✔
3195

83,551✔
3196
        genid := atomic.LoadUint64(&c.acc.sl.genid)
83,551✔
3197
        if genid == c.in.genid && c.in.results != nil {
164,664✔
3198
                r, ok = c.in.results[subject]
81,113✔
3199
        } else {
83,551✔
3200
                // Reset our L1 completely.
2,438✔
3201
                c.in.results = make(map[string]*SublistResult)
2,438✔
3202
                c.in.genid = genid
2,438✔
3203
        }
2,438✔
3204

3205
        // Go back to the sublist data structure.
3206
        if !ok {
136,549✔
3207
                r = c.acc.sl.Match(subject)
52,998✔
3208
                // Prune the results cache. Keeps us from unbounded growth. Random delete.
52,998✔
3209
                if len(c.in.results) >= maxResultCacheSize {
54,428✔
3210
                        n := 0
1,430✔
3211
                        for subj := range c.in.results {
48,620✔
3212
                                delete(c.in.results, subj)
47,190✔
3213
                                if n++; n > pruneSize {
48,620✔
3214
                                        break
1,430✔
3215
                                }
3216
                        }
3217
                }
3218
                // Then add the new cache entry.
3219
                c.in.results[subject] = r
52,998✔
3220
        }
3221

3222
        // Collect queue names if needed.
3223
        var qnames [][]byte
83,551✔
3224

83,551✔
3225
        // Check for no interest, short circuit if so.
83,551✔
3226
        // This is the fanout scale.
83,551✔
3227
        if len(r.psubs)+len(r.qsubs) > 0 {
166,705✔
3228
                flag := pmrNoFlag
83,154✔
3229
                // If we have queue subs in this cluster, then if we run in gateway
83,154✔
3230
                // mode and the remote gateways have queue subs, then we need to
83,154✔
3231
                // collect the queue groups this message was sent to so that we
83,154✔
3232
                // exclude them when sending to gateways.
83,154✔
3233
                if len(r.qsubs) > 0 && c.srv.gateway.enabled &&
83,154✔
3234
                        atomic.LoadInt64(&c.srv.gateway.totalQSubs) > 0 {
95,404✔
3235
                        flag |= pmrCollectQueueNames
12,250✔
3236
                }
12,250✔
3237
                // If this is a mapped subject that means the mapped interest
3238
                // is what got us here, but this might not have a queue designation
3239
                // If that is the case, make sure we ignore to process local queue subscribers.
3240
                if len(c.pa.mapped) > 0 && len(c.pa.queues) == 0 {
83,487✔
3241
                        flag |= pmrIgnoreEmptyQueueFilter
333✔
3242
                }
333✔
3243
                _, qnames = c.processMsgResults(acc, r, msg, nil, c.pa.subject, c.pa.reply, flag)
83,154✔
3244
        }
3245

3246
        // Now deal with gateways
3247
        if c.srv.gateway.enabled {
96,788✔
3248
                c.sendMsgToGateways(acc, msg, c.pa.subject, c.pa.reply, qnames, true)
13,237✔
3249
        }
13,237✔
3250
}
3251

3252
// Handles a subscription permission violation.
3253
// See leafPermViolation() for details.
3254
func (c *client) leafSubPermViolation(subj []byte) {
316✔
3255
        c.leafPermViolation(false, subj)
316✔
3256
}
316✔
3257

3258
// Common function to process publish or subscribe leafnode permission violation.
3259
// Sends the permission violation error to the remote, logs it and closes the connection.
3260
// If this is from a server soliciting, the reconnection will be delayed.
3261
func (c *client) leafPermViolation(pub bool, subj []byte) {
316✔
3262
        if c.isSpokeLeafNode() {
632✔
3263
                // For spokes these are no-ops since the hub server told us our permissions.
316✔
3264
                // We just need to not send these over to the other side since we will get cutoff.
316✔
3265
                return
316✔
3266
        }
316✔
3267
        // FIXME(dlc) ?
3268
        c.setLeafConnectDelayIfSoliciting(leafNodeReconnectAfterPermViolation)
×
3269
        var action string
×
3270
        if pub {
×
3271
                c.sendErr(fmt.Sprintf("Permissions Violation for Publish to %q", subj))
×
3272
                action = "Publish"
×
3273
        } else {
×
3274
                c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q", subj))
×
3275
                action = "Subscription"
×
3276
        }
×
3277
        c.Errorf("%s Violation on %q - Check other side configuration", action, subj)
×
3278
        // TODO: add a new close reason that is more appropriate?
×
3279
        c.closeConnection(ProtocolViolation)
×
3280
}
3281

3282
// Invoked from generic processErr() for LEAF connections.
3283
func (c *client) leafProcessErr(errStr string) {
48✔
3284
        // Check if we got a cluster name collision.
48✔
3285
        if strings.Contains(errStr, ErrLeafNodeHasSameClusterName.Error()) {
51✔
3286
                _, delay := c.setLeafConnectDelayIfSoliciting(leafNodeReconnectDelayAfterClusterNameSame)
3✔
3287
                c.Errorf("Leafnode connection dropped with same cluster name error. Delaying attempt to reconnect for %v", delay)
3✔
3288
                return
3✔
3289
        }
3✔
3290
        if strings.Contains(errStr, ErrLeafNodeMinVersionRejected.Error()) {
46✔
3291
                _, delay := c.setLeafConnectDelayIfSoliciting(leafNodeMinVersionReconnectDelay)
1✔
3292
                c.Errorf("Leafnode connection dropped due to minimum version requirement. Delaying attempt to reconnect for %v", delay)
1✔
3293
                return
1✔
3294
        }
1✔
3295

3296
        // We will look for Loop detected error coming from the other side.
3297
        // If we solicit, set the connect delay.
3298
        if !strings.Contains(errStr, "Loop detected") {
80✔
3299
                return
36✔
3300
        }
36✔
3301
        c.handleLeafNodeLoop(false)
8✔
3302
}
3303

3304
// If this leaf connection solicits, sets the connect delay to the given value,
3305
// or the one from the server option's LeafNode.connDelay if one is set (for tests).
3306
// Returns the connection's account name and delay.
3307
func (c *client) setLeafConnectDelayIfSoliciting(delay time.Duration) (string, time.Duration) {
20✔
3308
        c.mu.Lock()
20✔
3309
        if c.isSolicitedLeafNode() {
32✔
3310
                if s := c.srv; s != nil {
24✔
3311
                        if srvdelay := s.getOpts().LeafNode.connDelay; srvdelay != 0 {
16✔
3312
                                delay = srvdelay
4✔
3313
                        }
4✔
3314
                }
3315
                c.leaf.remote.setConnectDelay(delay)
12✔
3316
        }
3317
        var accName string
20✔
3318
        if c.acc != nil {
39✔
3319
                accName = c.acc.Name
19✔
3320
        }
19✔
3321
        c.mu.Unlock()
20✔
3322
        return accName, delay
20✔
3323
}
3324

3325
// For the given remote Leafnode configuration, this function returns
3326
// if TLS is required, and if so, will return a clone of the TLS Config
3327
// (since some fields will be changed during handshake), the TLS server
3328
// name that is remembered, and the TLS timeout.
3329
func (c *client) leafNodeGetTLSConfigForSolicit(remote *leafNodeCfg) (bool, *tls.Config, string, float64) {
1,931✔
3330
        var (
1,931✔
3331
                tlsConfig  *tls.Config
1,931✔
3332
                tlsName    string
1,931✔
3333
                tlsTimeout float64
1,931✔
3334
        )
1,931✔
3335

1,931✔
3336
        remote.RLock()
1,931✔
3337
        defer remote.RUnlock()
1,931✔
3338

1,931✔
3339
        tlsRequired := remote.TLS || remote.TLSConfig != nil
1,931✔
3340
        if tlsRequired {
2,009✔
3341
                if remote.TLSConfig != nil {
129✔
3342
                        tlsConfig = remote.TLSConfig.Clone()
51✔
3343
                } else {
78✔
3344
                        tlsConfig = &tls.Config{MinVersion: tls.VersionTLS12}
27✔
3345
                }
27✔
3346
                tlsName = remote.tlsName
78✔
3347
                tlsTimeout = remote.TLSTimeout
78✔
3348
                if tlsTimeout == 0 {
122✔
3349
                        tlsTimeout = float64(TLS_TIMEOUT / time.Second)
44✔
3350
                }
44✔
3351
        }
3352

3353
        return tlsRequired, tlsConfig, tlsName, tlsTimeout
1,931✔
3354
}
3355

3356
// Initiates the LeafNode Websocket connection by:
3357
// - doing the TLS handshake if needed
3358
// - sending the HTTP request
3359
// - waiting for the HTTP response
3360
//
3361
// Since some bufio reader is used to consume the HTTP response, this function
3362
// returns the slice of buffered bytes (if any) so that the readLoop that will
3363
// be started after that consume those first before reading from the socket.
3364
// The boolean
3365
//
3366
// Lock held on entry.
3367
func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remote *leafNodeCfg) ([]byte, ClosedState, error) {
50✔
3368
        remote.RLock()
50✔
3369
        compress := remote.Websocket.Compression
50✔
3370
        // By default the server will mask outbound frames, but it can be disabled with this option.
50✔
3371
        noMasking := remote.Websocket.NoMasking
50✔
3372
        infoTimeout := remote.FirstInfoTimeout
50✔
3373
        remote.RUnlock()
50✔
3374
        // Will do the client-side TLS handshake if needed.
50✔
3375
        tlsRequired, err := c.leafClientHandshakeIfNeeded(remote, opts)
50✔
3376
        if err != nil {
54✔
3377
                // 0 will indicate that the connection was already closed
4✔
3378
                return nil, 0, err
4✔
3379
        }
4✔
3380

3381
        // For http request, we need the passed URL to contain either http or https scheme.
3382
        scheme := "http"
46✔
3383
        if tlsRequired {
54✔
3384
                scheme = "https"
8✔
3385
        }
8✔
3386
        // We will use the `/leafnode` path to tell the accepting WS server that it should
3387
        // create a LEAF connection, not a CLIENT.
3388
        // In case we use the user's URL path in the future, make sure we append the user's
3389
        // path to our `/leafnode` path.
3390
        lpath := leafNodeWSPath
46✔
3391
        if curPath := rURL.EscapedPath(); curPath != _EMPTY_ {
67✔
3392
                if curPath[0] == '/' {
42✔
3393
                        curPath = curPath[1:]
21✔
3394
                }
21✔
3395
                lpath = path.Join(curPath, lpath)
21✔
3396
        } else {
25✔
3397
                lpath = lpath[1:]
25✔
3398
        }
25✔
3399
        ustr := fmt.Sprintf("%s://%s/%s", scheme, rURL.Host, lpath)
46✔
3400
        u, _ := url.Parse(ustr)
46✔
3401
        req := &http.Request{
46✔
3402
                Method:     "GET",
46✔
3403
                URL:        u,
46✔
3404
                Proto:      "HTTP/1.1",
46✔
3405
                ProtoMajor: 1,
46✔
3406
                ProtoMinor: 1,
46✔
3407
                Header:     make(http.Header),
46✔
3408
                Host:       u.Host,
46✔
3409
        }
46✔
3410
        wsKey, err := wsMakeChallengeKey()
46✔
3411
        if err != nil {
46✔
3412
                return nil, WriteError, err
×
3413
        }
×
3414

3415
        req.Header["Upgrade"] = []string{"websocket"}
46✔
3416
        req.Header["Connection"] = []string{"Upgrade"}
46✔
3417
        req.Header["Sec-WebSocket-Key"] = []string{wsKey}
46✔
3418
        req.Header["Sec-WebSocket-Version"] = []string{"13"}
46✔
3419
        if compress {
55✔
3420
                req.Header.Add("Sec-WebSocket-Extensions", wsPMCReqHeaderValue)
9✔
3421
        }
9✔
3422
        if noMasking {
56✔
3423
                req.Header.Add(wsNoMaskingHeader, wsNoMaskingValue)
10✔
3424
        }
10✔
3425
        c.nc.SetDeadline(time.Now().Add(infoTimeout))
46✔
3426
        if err := req.Write(c.nc); err != nil {
46✔
3427
                return nil, WriteError, err
×
3428
        }
×
3429

3430
        var resp *http.Response
46✔
3431

46✔
3432
        br := bufio.NewReaderSize(c.nc, MAX_CONTROL_LINE_SIZE)
46✔
3433
        resp, err = http.ReadResponse(br, req)
46✔
3434
        if err == nil &&
46✔
3435
                (resp.StatusCode != 101 ||
46✔
3436
                        !strings.EqualFold(resp.Header.Get("Upgrade"), "websocket") ||
46✔
3437
                        !strings.EqualFold(resp.Header.Get("Connection"), "upgrade") ||
46✔
3438
                        resp.Header.Get("Sec-Websocket-Accept") != wsAcceptKey(wsKey)) {
47✔
3439

1✔
3440
                err = fmt.Errorf("invalid websocket connection")
1✔
3441
        }
1✔
3442
        // Check compression extension...
3443
        if err == nil && c.ws.compress {
55✔
3444
                // Check that not only permessage-deflate extension is present, but that
9✔
3445
                // we also have server and client no context take over.
9✔
3446
                srvCompress, noCtxTakeover := wsPMCExtensionSupport(resp.Header, false)
9✔
3447

9✔
3448
                // If server does not support compression, then simply disable it in our side.
9✔
3449
                if !srvCompress {
13✔
3450
                        c.ws.compress = false
4✔
3451
                } else if !noCtxTakeover {
9✔
3452
                        err = fmt.Errorf("compression negotiation error")
×
3453
                }
×
3454
        }
3455
        // Same for no masking...
3456
        if err == nil && noMasking {
56✔
3457
                // Check if server accepts no masking
10✔
3458
                if resp.Header.Get(wsNoMaskingHeader) != wsNoMaskingValue {
11✔
3459
                        // Nope, need to mask our writes as any client would do.
1✔
3460
                        c.ws.maskwrite = true
1✔
3461
                }
1✔
3462
        }
3463
        if resp != nil {
76✔
3464
                resp.Body.Close()
30✔
3465
        }
30✔
3466
        if err != nil {
63✔
3467
                return nil, ReadError, err
17✔
3468
        }
17✔
3469
        c.Debugf("Leafnode compression=%v masking=%v", c.ws.compress, c.ws.maskwrite)
29✔
3470

29✔
3471
        var preBuf []byte
29✔
3472
        // We have to slurp whatever is in the bufio reader and pass that to the readloop.
29✔
3473
        if n := br.Buffered(); n != 0 {
29✔
3474
                preBuf, _ = br.Peek(n)
×
3475
        }
×
3476
        return preBuf, 0, nil
29✔
3477
}
3478

3479
// connectProcessTimeout is the delay of the timer armed by
// leafNodeResumeConnectProcess. If leafNodeFinishConnectProcess has not run by
// the time it fires, the connection is closed as stale.
const connectProcessTimeout = 2 * time.Second
3480

3481
// This is invoked for remote LEAF remote connections after processing the INFO
3482
// protocol.
3483
func (s *Server) leafNodeResumeConnectProcess(c *client) {
674✔
3484
        clusterName := s.ClusterName()
674✔
3485

674✔
3486
        c.mu.Lock()
674✔
3487
        if c.isClosed() {
674✔
3488
                c.mu.Unlock()
×
3489
                return
×
3490
        }
×
3491
        if err := c.sendLeafConnect(clusterName, c.headers); err != nil {
676✔
3492
                c.mu.Unlock()
2✔
3493
                c.closeConnection(WriteError)
2✔
3494
                return
2✔
3495
        }
2✔
3496

3497
        // Spin up the write loop.
3498
        s.startGoRoutine(func() { c.writeLoop() })
1,344✔
3499

3500
        // timeout leafNodeFinishConnectProcess
3501
        c.ping.tmr = time.AfterFunc(connectProcessTimeout, func() {
672✔
3502
                c.mu.Lock()
×
3503
                // check if leafNodeFinishConnectProcess was called and prevent later leafNodeFinishConnectProcess
×
3504
                if !c.flags.setIfNotSet(connectProcessFinished) {
×
3505
                        c.mu.Unlock()
×
3506
                        return
×
3507
                }
×
3508
                clearTimer(&c.ping.tmr)
×
3509
                closed := c.isClosed()
×
3510
                c.mu.Unlock()
×
3511
                if !closed {
×
3512
                        c.sendErrAndDebug("Stale Leaf Node Connection - Closing")
×
3513
                        c.closeConnection(StaleConnection)
×
3514
                }
×
3515
        })
3516
        c.mu.Unlock()
672✔
3517
        c.Debugf("Remote leafnode connect msg sent")
672✔
3518
}
3519

3520
// This is invoked for remote LEAF connections after processing the INFO
3521
// protocol and leafNodeResumeConnectProcess.
3522
// This will send LS+ the CONNECT protocol and register the leaf node.
3523
func (s *Server) leafNodeFinishConnectProcess(c *client) {
637✔
3524
        c.mu.Lock()
637✔
3525
        if !c.flags.setIfNotSet(connectProcessFinished) {
637✔
3526
                c.mu.Unlock()
×
3527
                return
×
3528
        }
×
3529
        if c.isClosed() {
637✔
3530
                c.mu.Unlock()
×
3531
                s.removeLeafNodeConnection(c)
×
3532
                return
×
3533
        }
×
3534
        remote := c.leaf.remote
637✔
3535
        // Check if we will need to send the system connect event.
637✔
3536
        remote.RLock()
637✔
3537
        sendSysConnectEvent := remote.Hub
637✔
3538
        remote.RUnlock()
637✔
3539

637✔
3540
        // Capture account before releasing lock
637✔
3541
        acc := c.acc
637✔
3542
        // cancel connectProcessTimeout
637✔
3543
        clearTimer(&c.ping.tmr)
637✔
3544
        c.mu.Unlock()
637✔
3545

637✔
3546
        // Make sure we register with the account here.
637✔
3547
        if err := c.registerWithAccount(acc); err != nil {
639✔
3548
                if err == ErrTooManyAccountConnections {
2✔
3549
                        c.maxAccountConnExceeded()
×
3550
                        return
×
3551
                } else if err == ErrLeafNodeLoop {
4✔
3552
                        c.handleLeafNodeLoop(true)
2✔
3553
                        return
2✔
3554
                }
2✔
3555
                c.Errorf("Registering leaf with account %s resulted in error: %v", acc.Name, err)
×
3556
                c.closeConnection(ProtocolViolation)
×
3557
                return
×
3558
        }
3559
        if !s.addLeafNodeConnection(c, _EMPTY_, _EMPTY_, false) {
635✔
3560
                // Was not added, could be because the remote configuration has been removed.
×
3561
                c.closeConnection(ClientClosed)
×
3562
                return
×
3563
        }
×
3564
        s.initLeafNodeSmapAndSendSubs(c)
635✔
3565
        if sendSysConnectEvent {
651✔
3566
                s.sendLeafNodeConnect(acc)
16✔
3567
        }
16✔
3568
        s.accountConnectEvent(c)
635✔
3569

635✔
3570
        // The above functions are not running under the client lock, so it is
635✔
3571
        // possible that between the time we have started the read/write loops
635✔
3572
        // and now, that the connection was closed. This would leave the closed
635✔
3573
        // LN connection possibly registered with the account and/or the server's
635✔
3574
        // leafs map. So check if connection is closed, and if so, manually cleanup.
635✔
3575
        c.mu.Lock()
635✔
3576
        closed := c.isClosed()
635✔
3577
        if !closed {
1,270✔
3578
                c.setFirstPingTimer()
635✔
3579
        }
635✔
3580
        c.mu.Unlock()
635✔
3581
        if closed {
635✔
3582
                s.removeLeafNodeConnection(c)
×
3583
                if prev := acc.removeClient(c); prev == 1 {
×
3584
                        s.decActiveAccounts()
×
3585
                }
×
3586
        }
3587
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc