• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 26079329094

18 May 2026 03:50PM UTC coverage: 77.165% (-5.9%) from 83.064%
26079329094

push

github

web-flow
[IMPROVED] Replace time.After with reusable timer in snapshot and catchup loops (#8186)

## Overview

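`time.After` creates a new timer and channel on every call. Before Go 1.23,
that timer could not be garbage collected until it fired, so when
`time.After` is used inside a loop, each iteration leaves a timer pending
for the full timeout duration. On newer Go versions the timer is collectable,
but the per-iteration allocation remains.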

PR #4756 fixed identical instances in `client.go`, `consumer.go`,
`jetstream_cluster.go`, and `mqtt.go`, but two instances were missed (or
introduced later):

1. **`server/jetstream_api.go`, snapshot chunk-streaming loop**
(introduced in #7828):
The `for index := 1; ; index++` loop runs once per snapshot chunk. A 1
GB snapshot with 128 KB chunks results in ~8,000 iterations, each leaking a
5-second timer (`snapshotAckTimeout`).

2. **`server/jetstream_cluster.go`, `runCatchup` loop** (introduced in
#5454):
The `for { select { ... } }` loop runs continuously during stream
catchup. When `nextBatchC` or `cbKick` fires instead of the 500 ms
timer, the timer is abandoned every iteration.

## Changes

Replace `time.After(duration)` with a `time.NewTimer` created before the
loop, using the `Stop`/drain/`Reset` pattern to reuse it across
iterations. This matches the exact pattern established in #4756.

## Testing

Existing snapshot and catchup tests pass:
- `TestJetStreamSnapshots` (0.61s)
- `TestJetStreamSnapshotsAPI` (0.24s)
- `TestJetStreamSnapshotRestoreStallAndHealthz` (0.04s)
- `TestJetStreamClusterStreamCatchupNoState` (5.68s)
- `TestJetStreamClusterStreamCatchupWithTruncateAndPriorSnapshot`
(2.25s)
- `TestJetStreamClusterStreamCatchupInteriorNilMsgs` (1.12s)

Closes #8185. Relates to #4756.

72024 of 93338 relevant lines covered (77.16%)

400212.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.73
/server/leafnode.go
1
// Copyright 2019-2026 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bufio"
18
        "bytes"
19
        "crypto/tls"
20
        "encoding/base64"
21
        "encoding/json"
22
        "fmt"
23
        "io"
24
        "math/rand"
25
        "net"
26
        "net/http"
27
        "net/url"
28
        "os"
29
        "path"
30
        "regexp"
31
        "runtime"
32
        "strconv"
33
        "strings"
34
        "sync"
35
        "sync/atomic"
36
        "time"
37

38
        "github.com/klauspost/compress/s2"
39
        "github.com/nats-io/jwt/v2"
40
        "github.com/nats-io/nkeys"
41
        "github.com/nats-io/nuid"
42
)
43

44
const (
	// leafnodeTLSInsecureWarning is the warning text used when a user
	// configures leafnode TLS as insecure.
	leafnodeTLSInsecureWarning = "TLS certificate chain and hostname of solicited leafnodes will not be verified. DO NOT USE IN PRODUCTION!"

	// leafNodeReconnectDelayAfterLoopDetected is how long the reconnect of a
	// solicited connection is delayed once a loop has been detected.
	leafNodeReconnectDelayAfterLoopDetected = 30 * time.Second

	// leafNodeReconnectAfterPermViolation is how long no reconnect is
	// attempted after a connection was closed because the server received a
	// message causing a permission violation.
	leafNodeReconnectAfterPermViolation = 30 * time.Second

	// leafNodeReconnectDelayAfterClusterNameSame is the reconnect delay used
	// when we have the same cluster name as the hub.
	leafNodeReconnectDelayAfterClusterNameSame = 30 * time.Second

	// leafNodeLoopDetectionSubjectPrefix is the prefix of the loop detection
	// subject.
	leafNodeLoopDetectionSubjectPrefix = "$LDS."

	// leafNodeWSPath is the path added to the URL to tell the WS server that
	// the connection is a LEAF connection as opposed to a CLIENT.
	leafNodeWSPath = "/leafnode"

	// leafNodeMinVersionReconnectDelay is how long the next reconnect attempt
	// is delayed when a soliciting leafnode was rejected because it does not
	// meet the configured minimum version.
	leafNodeMinVersionReconnectDelay = 5 * time.Second
)
69

70
type leaf struct {
71
        // We have any auth stuff here for solicited connections.
72
        remote *leafNodeCfg
73
        // isSpoke tells us what role we are playing.
74
        // Used when we receive a connection but otherside tells us they are a hub.
75
        isSpoke bool
76
        // remoteCluster is when we are a hub but the spoke leafnode is part of a cluster.
77
        remoteCluster string
78
        // remoteServer holds onto the remote server's name or ID.
79
        remoteServer string
80
        // domain name of remote server
81
        remoteDomain string
82
        // account name of remote server
83
        remoteAccName string
84
        // Whether or not we want to propagate east-west interest from other LNs.
85
        isolated bool
86
        // Used to suppress sub and unsub interest. Same as routes but our audience
87
        // here is tied to this leaf node. This will hold all subscriptions except this
88
        // leaf nodes. This represents all the interest we want to send to the other side.
89
        smap map[string]int32
90
        // This map will contain all the subscriptions that have been added to the smap
91
        // during initLeafNodeSmapAndSendSubs. It is short lived and is there to avoid
92
        // race between processing of a sub where sub is added to account sublist but
93
        // updateSmap has not be called on that "thread", while in the LN readloop,
94
        // when processing CONNECT, initLeafNodeSmapAndSendSubs is invoked and add
95
        // this subscription to smap. When processing of the sub then calls updateSmap,
96
        // we would add it a second time in the smap causing later unsub to suppress the LS-.
97
        tsub  map[*subscription]struct{}
98
        tsubt *time.Timer
99
        // Selected compression mode, which may be different from the server configured mode.
100
        compression string
101
        // This is for GW map replies.
102
        gwSub *subscription
103
}
104

105
// Used for remote (solicited) leafnodes.
106
type leafNodeCfg struct {
107
        sync.RWMutex
108
        *RemoteLeafOpts
109
        urls           []*url.URL
110
        curURL         *url.URL
111
        tlsName        string
112
        username       string
113
        password       string
114
        perms          *Permissions
115
        connDelay      time.Duration // Delay before a connect, could be used while detecting loop condition, etc..
116
        jsMigrateTimer *time.Timer
117
        quitCh         chan struct{}
118
        removed        bool
119
        connInProgress bool
120
}
121

122
// Check to see if this is a solicited leafnode. We do special processing for solicited.
123
func (c *client) isSolicitedLeafNode() bool {
1,912✔
124
        return c.kind == LEAF && c.leaf != nil && c.leaf.remote != nil
1,912✔
125
}
1,912✔
126

127
// Returns true if this is a solicited leafnode and is not configured to be treated as a hub or a receiving
128
// connection leafnode where the otherside has declared itself to be the hub.
129
func (c *client) isSpokeLeafNode() bool {
8,121,308✔
130
        return c.kind == LEAF && c.leaf != nil && c.leaf.isSpoke
8,121,308✔
131
}
8,121,308✔
132

133
func (c *client) isHubLeafNode() bool {
15,833✔
134
        return c.kind == LEAF && c.leaf != nil && !c.leaf.isSpoke
15,833✔
135
}
15,833✔
136

137
func (c *client) isIsolatedLeafNode() bool {
10,350✔
138
        // TODO(nat): In future we may want to pass in and consider an isolation
10,350✔
139
        // group name here, which the hub and/or leaf could provide, so that we
10,350✔
140
        // can isolate away certain LNs but not others on an opt-in basis. For
10,350✔
141
        // now we will just isolate all LN interest until then.
10,350✔
142
        return c.kind == LEAF && c.leaf != nil && c.leaf.isolated
10,350✔
143
}
10,350✔
144

145
// This will spin up go routines to solicit the remote leaf node connections.
146
func (s *Server) solicitLeafNodeRemotes(remotes []*RemoteLeafOpts) {
1,217✔
147
        sysAccName := _EMPTY_
1,217✔
148
        sAcc := s.SystemAccount()
1,217✔
149
        if sAcc != nil {
2,412✔
150
                sysAccName = sAcc.Name
1,195✔
151
        }
1,195✔
152
        addRemote := func(r *RemoteLeafOpts, isSysAccRemote bool) *leafNodeCfg {
2,560✔
153
                s.mu.Lock()
1,343✔
154
                remote := newLeafNodeCfg(r)
1,343✔
155
                creds := remote.Credentials
1,343✔
156
                accName := remote.LocalAccount
1,343✔
157
                if s.leafRemoteCfgs == nil {
2,559✔
158
                        s.leafRemoteCfgs = make(map[*leafNodeCfg]struct{})
1,216✔
159
                }
1,216✔
160
                s.leafRemoteCfgs[remote] = struct{}{}
1,343✔
161
                // Print notice if
1,343✔
162
                if isSysAccRemote {
1,426✔
163
                        if len(remote.DenyExports) > 0 {
84✔
164
                                s.Noticef("Remote for System Account uses restricted export permissions")
1✔
165
                        }
1✔
166
                        if len(remote.DenyImports) > 0 {
84✔
167
                                s.Noticef("Remote for System Account uses restricted import permissions")
1✔
168
                        }
1✔
169
                }
170
                s.mu.Unlock()
1,343✔
171
                if creds != _EMPTY_ {
1,383✔
172
                        contents, err := os.ReadFile(creds)
40✔
173
                        defer wipeSlice(contents)
40✔
174
                        if err != nil {
40✔
175
                                s.Errorf("Error reading LeafNode Remote Credentials file %q: %v", creds, err)
×
176
                        } else if items := credsRe.FindAllSubmatch(contents, -1); len(items) < 2 {
40✔
177
                                s.Errorf("LeafNode Remote Credentials file %q malformed", creds)
×
178
                        } else if _, err := nkeys.FromSeed(items[1][1]); err != nil {
40✔
179
                                s.Errorf("LeafNode Remote Credentials file %q has malformed seed", creds)
×
180
                        } else if uc, err := jwt.DecodeUserClaims(string(items[0][1])); err != nil {
40✔
181
                                s.Errorf("LeafNode Remote Credentials file %q has malformed user jwt", creds)
×
182
                        } else if isSysAccRemote {
44✔
183
                                if !uc.Permissions.Pub.Empty() || !uc.Permissions.Sub.Empty() || uc.Permissions.Resp != nil {
5✔
184
                                        s.Noticef("LeafNode Remote for System Account uses credentials file %q with restricted permissions", creds)
1✔
185
                                }
1✔
186
                        } else {
36✔
187
                                if !uc.Permissions.Pub.Empty() || !uc.Permissions.Sub.Empty() || uc.Permissions.Resp != nil {
40✔
188
                                        s.Noticef("LeafNode Remote for Account %s uses credentials file %q with restricted permissions", accName, creds)
4✔
189
                                }
4✔
190
                        }
191
                }
192
                return remote
1,343✔
193
        }
194
        for _, r := range remotes {
2,560✔
195
                // We need to call this, even if the leaf is disabled. This is so that
1,343✔
196
                // the number of internal configuration matches the options' remote leaf
1,343✔
197
                // configuration required for configuration reload.
1,343✔
198
                remote := addRemote(r, r.LocalAccount == sysAccName)
1,343✔
199
                if !r.Disabled {
2,686✔
200
                        s.connectToRemoteLeafNodeAsynchronously(remote, true)
1,343✔
201
                }
1,343✔
202
        }
203
}
204

205
// Ensure that leafnode is properly configured.
206
func validateLeafNode(o *Options) error {
7,619✔
207
        if err := validateLeafNodeAuthOptions(o); err != nil {
7,621✔
208
                return err
2✔
209
        }
2✔
210

211
        if len(o.LeafNode.Remotes) > 0 {
8,873✔
212
                names := make(map[string]struct{})
1,256✔
213
                // Check for duplicate remotes, also, users can bind to any local account,
1,256✔
214
                // if its empty we will assume the $G account.
1,256✔
215
                for _, r := range o.LeafNode.Remotes {
2,644✔
216
                        if r.LocalAccount == _EMPTY_ {
1,799✔
217
                                r.LocalAccount = globalAccountName
411✔
218
                        }
411✔
219
                        rn := r.name()
1,388✔
220
                        if _, dup := names[rn]; dup {
1,391✔
221
                                return fmt.Errorf("duplicate remote %s", r.safeName())
3✔
222
                        }
3✔
223
                        names[rn] = struct{}{}
1,385✔
224
                }
225
        }
226

227
        // In local config mode, check that leafnode configuration refers to accounts that exist.
228
        if len(o.TrustedOperators) == 0 {
14,948✔
229
                accNames := map[string]struct{}{}
7,334✔
230
                for _, a := range o.Accounts {
15,768✔
231
                        accNames[a.Name] = struct{}{}
8,434✔
232
                }
8,434✔
233
                // global account is always created
234
                accNames[DEFAULT_GLOBAL_ACCOUNT] = struct{}{}
7,334✔
235
                // in the context of leaf nodes, empty account means global account
7,334✔
236
                accNames[_EMPTY_] = struct{}{}
7,334✔
237
                // system account either exists or, if not disabled, will be created
7,334✔
238
                if o.SystemAccount == _EMPTY_ && !o.NoSystemAccount {
13,264✔
239
                        accNames[DEFAULT_SYSTEM_ACCOUNT] = struct{}{}
5,930✔
240
                }
5,930✔
241
                checkAccountExists := func(accName string, cfgType string) error {
16,053✔
242
                        if _, ok := accNames[accName]; !ok {
8,721✔
243
                                return fmt.Errorf("cannot find local account %q specified in leafnode %s", accName, cfgType)
2✔
244
                        }
2✔
245
                        return nil
8,717✔
246
                }
247
                if err := checkAccountExists(o.LeafNode.Account, "authorization"); err != nil {
7,335✔
248
                        return err
1✔
249
                }
1✔
250
                for _, lu := range o.LeafNode.Users {
7,350✔
251
                        if lu.Account == nil { // means global account
27✔
252
                                continue
10✔
253
                        }
254
                        if err := checkAccountExists(lu.Account.Name, "authorization"); err != nil {
7✔
255
                                return err
×
256
                        }
×
257
                }
258
                for _, r := range o.LeafNode.Remotes {
8,711✔
259
                        if err := checkAccountExists(r.LocalAccount, "remote"); err != nil {
1,379✔
260
                                return err
1✔
261
                        }
1✔
262
                }
263
        } else {
280✔
264
                if len(o.LeafNode.Users) != 0 {
281✔
265
                        return fmt.Errorf("operator mode does not allow specifying users in leafnode config")
1✔
266
                }
1✔
267
                for _, r := range o.LeafNode.Remotes {
280✔
268
                        if !nkeys.IsValidPublicAccountKey(r.LocalAccount) {
2✔
269
                                return fmt.Errorf(
1✔
270
                                        "operator mode requires account nkeys in remotes. " +
1✔
271
                                                "Please add an `account` key to each remote in your `leafnodes` section, to assign it to an account. " +
1✔
272
                                                "Each account value should be a 56 character public key, starting with the letter 'A'")
1✔
273
                        }
1✔
274
                }
275
                if o.LeafNode.Port != 0 && o.LeafNode.Account != "" && !nkeys.IsValidPublicAccountKey(o.LeafNode.Account) {
279✔
276
                        return fmt.Errorf("operator mode and non account nkeys are incompatible")
1✔
277
                }
1✔
278
        }
279

280
        // Validate compression settings
281
        if o.LeafNode.Compression.Mode != _EMPTY_ {
12,094✔
282
                if err := validateAndNormalizeCompressionOption(&o.LeafNode.Compression, CompressionS2Auto); err != nil {
4,490✔
283
                        return err
5✔
284
                }
5✔
285
        }
286

287
        // If a remote has a websocket scheme, all need to have it.
288
        for _, rcfg := range o.LeafNode.Remotes {
8,981✔
289
                // Validate proxy configuration
1,377✔
290
                if _, err := validateLeafNodeProxyOptions(rcfg); err != nil {
1,383✔
291
                        return err
6✔
292
                }
6✔
293

294
                if len(rcfg.URLs) >= 2 {
1,547✔
295
                        firstIsWS, ok := isWSURL(rcfg.URLs[0]), true
176✔
296
                        for i := 1; i < len(rcfg.URLs); i++ {
535✔
297
                                u := rcfg.URLs[i]
359✔
298
                                if isWS := isWSURL(u); isWS && !firstIsWS || !isWS && firstIsWS {
366✔
299
                                        ok = false
7✔
300
                                        break
7✔
301
                                }
302
                        }
303
                        if !ok {
183✔
304
                                return fmt.Errorf("remote leaf node configuration cannot have a mix of websocket and non-websocket urls: %q", redactURLList(rcfg.URLs))
7✔
305
                        }
7✔
306
                }
307
                if !wsAllowedFIPS() {
1,364✔
308
                        for _, u := range rcfg.URLs {
×
309
                                if isWSURL(u) {
×
310
                                        return fmt.Errorf("remote leaf node URL %q cannot be used in FIPS-140 mode when built with this Go version, use Go 1.26 or later", redactURLString(u.String()))
×
311
                                }
×
312
                        }
313
                }
314
                // Validate compression settings
315
                if rcfg.Compression.Mode != _EMPTY_ {
2,722✔
316
                        if err := validateAndNormalizeCompressionOption(&rcfg.Compression, CompressionS2Auto); err != nil {
1,363✔
317
                                return err
5✔
318
                        }
5✔
319
                }
320
        }
321

322
        if o.LeafNode.Port == 0 {
11,278✔
323
                return nil
3,692✔
324
        }
3,692✔
325

326
        // If MinVersion is defined, check that it is valid.
327
        if mv := o.LeafNode.MinVersion; mv != _EMPTY_ {
3,898✔
328
                if err := checkLeafMinVersionConfig(mv); err != nil {
6✔
329
                        return err
2✔
330
                }
2✔
331
        }
332

333
        // The checks below will be done only when detecting that we are configured
334
        // with gateways. So if an option validation needs to be done regardless,
335
        // it MUST be done before this point!
336

337
        if o.Gateway.Name == _EMPTY_ && o.Gateway.Port == 0 {
7,136✔
338
                return nil
3,244✔
339
        }
3,244✔
340
        // If we are here we have both leaf nodes and gateways defined, make sure there
341
        // is a system account defined.
342
        if o.SystemAccount == _EMPTY_ {
649✔
343
                return fmt.Errorf("leaf nodes and gateways (both being defined) require a system account to also be configured")
1✔
344
        }
1✔
345
        if err := validatePinnedCerts(o.LeafNode.TLSPinnedCerts); err != nil {
647✔
346
                return fmt.Errorf("leafnode: %v", err)
×
347
        }
×
348
        return nil
647✔
349
}
350

351
func checkLeafMinVersionConfig(mv string) error {
8✔
352
        if ok, err := versionAtLeastCheckError(mv, 2, 8, 0); !ok || err != nil {
12✔
353
                if err != nil {
6✔
354
                        return fmt.Errorf("invalid leafnode's minimum version: %v", err)
2✔
355
                } else {
4✔
356
                        return fmt.Errorf("the minimum version should be at least 2.8.0")
2✔
357
                }
2✔
358
        }
359
        return nil
4✔
360
}
361

362
// Used to validate user names in LeafNode configuration.
363
// - rejects mix of single and multiple users.
364
// - rejects duplicate user names.
365
func validateLeafNodeAuthOptions(o *Options) error {
7,667✔
366
        if len(o.LeafNode.Users) == 0 {
15,308✔
367
                return nil
7,641✔
368
        }
7,641✔
369
        if o.LeafNode.Username != _EMPTY_ {
28✔
370
                return fmt.Errorf("can not have a single user/pass and a users array")
2✔
371
        }
2✔
372
        if o.LeafNode.Nkey != _EMPTY_ {
24✔
373
                return fmt.Errorf("can not have a single nkey and a users array")
×
374
        }
×
375
        users := map[string]struct{}{}
24✔
376
        for _, u := range o.LeafNode.Users {
62✔
377
                if _, exists := users[u.Username]; exists {
40✔
378
                        return fmt.Errorf("duplicate user %q detected in leafnode authorization", u.Username)
2✔
379
                }
2✔
380
                users[u.Username] = struct{}{}
36✔
381
        }
382
        return nil
22✔
383
}
384

385
func validateLeafNodeProxyOptions(remote *RemoteLeafOpts) ([]string, error) {
1,884✔
386
        var warnings []string
1,884✔
387

1,884✔
388
        if remote.Proxy.URL == _EMPTY_ {
3,742✔
389
                return warnings, nil
1,858✔
390
        }
1,858✔
391

392
        proxyURL, err := url.Parse(remote.Proxy.URL)
26✔
393
        if err != nil {
27✔
394
                return warnings, fmt.Errorf("invalid proxy URL: %v", err)
1✔
395
        }
1✔
396

397
        if proxyURL.Scheme != "http" && proxyURL.Scheme != "https" {
27✔
398
                return warnings, fmt.Errorf("proxy URL scheme must be http or https, got: %s", proxyURL.Scheme)
2✔
399
        }
2✔
400

401
        if proxyURL.Host == _EMPTY_ {
25✔
402
                return warnings, fmt.Errorf("proxy URL must specify a host")
2✔
403
        }
2✔
404

405
        if remote.Proxy.Timeout < 0 {
22✔
406
                return warnings, fmt.Errorf("proxy timeout must be >= 0")
1✔
407
        }
1✔
408

409
        if (remote.Proxy.Username == _EMPTY_) != (remote.Proxy.Password == _EMPTY_) {
24✔
410
                return warnings, fmt.Errorf("proxy username and password must both be specified or both be empty")
4✔
411
        }
4✔
412

413
        if len(remote.URLs) > 0 {
32✔
414
                hasWebSocketURL := false
16✔
415
                hasNonWebSocketURL := false
16✔
416

16✔
417
                for _, remoteURL := range remote.URLs {
33✔
418
                        if remoteURL.Scheme == wsSchemePrefix || remoteURL.Scheme == wsSchemePrefixTLS {
30✔
419
                                hasWebSocketURL = true
13✔
420
                                if (remoteURL.Scheme == wsSchemePrefixTLS) &&
13✔
421
                                        remote.TLSConfig == nil && !remote.TLS {
14✔
422
                                        return warnings, fmt.Errorf("proxy is configured but remote URL %s requires TLS and no TLS configuration is provided. When using proxy with TLS endpoints, ensure TLS is properly configured for the leafnode remote", remoteURL.String())
1✔
423
                                }
1✔
424
                        } else {
4✔
425
                                hasNonWebSocketURL = true
4✔
426
                        }
4✔
427
                }
428

429
                if !hasWebSocketURL {
18✔
430
                        warnings = append(warnings, "proxy configuration will be ignored: proxy settings only apply to WebSocket connections (ws:// or wss://), but all configured URLs use TCP connections (nats://)")
3✔
431
                } else if hasNonWebSocketURL {
16✔
432
                        warnings = append(warnings, "proxy configuration will only be used for WebSocket URLs: proxy settings do not apply to TCP connections (nats://)")
1✔
433
                }
1✔
434
        }
435

436
        return warnings, nil
15✔
437
}
438

439
// Wait for the configured reconnect interval before attempting to connect
440
// again to the remote leafnode.
441
func (s *Server) reConnectToRemoteLeafNode(remote *leafNodeCfg) {
244✔
442
        clearInProgress := true
244✔
443
        defer func() {
487✔
444
                s.grWG.Done()
243✔
445
                if clearInProgress {
306✔
446
                        remote.setConnectInProgress(false)
63✔
447
                }
63✔
448
        }()
449
        delay := s.getOpts().LeafNode.ReconnectInterval
244✔
450
        select {
244✔
451
        case <-time.After(delay):
187✔
452
        case <-remote.quitCh:
×
453
                return
×
454
        case <-s.quitCh:
57✔
455
                return
57✔
456
        }
457
        clearInProgress = !connectToRemoteLeafNode(s, remote, false)
187✔
458
}
459

460
// Creates a leafNodeCfg object that wraps the RemoteLeafOpts.
461
func newLeafNodeCfg(remote *RemoteLeafOpts) *leafNodeCfg {
1,343✔
462
        cfg := &leafNodeCfg{
1,343✔
463
                RemoteLeafOpts: remote,
1,343✔
464
                urls:           make([]*url.URL, 0, len(remote.URLs)),
1,343✔
465
                quitCh:         make(chan struct{}, 1),
1,343✔
466
        }
1,343✔
467
        if len(remote.DenyExports) > 0 || len(remote.DenyImports) > 0 {
1,351✔
468
                perms := &Permissions{}
8✔
469
                if len(remote.DenyExports) > 0 {
16✔
470
                        perms.Publish = &SubjectPermission{Deny: remote.DenyExports}
8✔
471
                }
8✔
472
                if len(remote.DenyImports) > 0 {
15✔
473
                        perms.Subscribe = &SubjectPermission{Deny: remote.DenyImports}
7✔
474
                }
7✔
475
                cfg.perms = perms
8✔
476
        }
477
        // Start with the one that is configured. We will add to this
478
        // array when receiving async leafnode INFOs.
479
        cfg.urls = append(cfg.urls, cfg.URLs...)
1,343✔
480
        // If allowed to randomize, do it on our copy of URLs
1,343✔
481
        if !remote.NoRandomize {
2,685✔
482
                rand.Shuffle(len(cfg.urls), func(i, j int) {
1,678✔
483
                        cfg.urls[i], cfg.urls[j] = cfg.urls[j], cfg.urls[i]
336✔
484
                })
336✔
485
        }
486
        // If we are TLS make sure we save off a proper servername if possible.
487
        // Do same for user/password since we may need them to connect to
488
        // a bare URL that we get from INFO protocol.
489
        for _, u := range cfg.urls {
3,037✔
490
                cfg.saveTLSHostname(u)
1,694✔
491
                cfg.saveUserPassword(u)
1,694✔
492
                // If the url(s) have the "wss://" scheme, and we don't have a TLS
1,694✔
493
                // config, mark that we should be using TLS anyway.
1,694✔
494
                if !cfg.TLS && isWSSURL(u) {
1,695✔
495
                        cfg.TLS = true
1✔
496
                }
1✔
497
        }
498
        return cfg
1,343✔
499
}
500

501
// Notifies the quit channel without blocking.
502
// No lock is needed to invoke this function.
503
func (cfg *leafNodeCfg) notifyQuitChannel() {
×
504
        select {
×
505
        case cfg.quitCh <- struct{}{}:
×
506
        default:
×
507
        }
508
}
509

510
// Sets the connect-in-progress status for this remote leaf configuration.
511
func (cfg *leafNodeCfg) setConnectInProgress(inProgress bool) {
3,485✔
512
        cfg.Lock()
3,485✔
513
        defer cfg.Unlock()
3,485✔
514
        // In both cases we want to drain the "quit" channel.
3,485✔
515
        select {
3,485✔
516
        case <-cfg.quitCh:
×
517
        default:
3,485✔
518
        }
519
        cfg.connInProgress = inProgress
3,485✔
520
}
521

522
// Returns `true` if this remote is in the middle of a connect, `false` otherwise.
523
func (cfg *leafNodeCfg) isConnectInProgress() bool {
×
524
        cfg.RLock()
×
525
        defer cfg.RUnlock()
×
526
        return cfg.connInProgress
×
527
}
×
528

529
// Mark that this remote is being removed from the configuration.
530
func (cfg *leafNodeCfg) markAsRemoved() {
×
531
        cfg.Lock()
×
532
        defer cfg.Unlock()
×
533
        // This function should be invoked only once, but protect.
×
534
        if cfg.removed {
×
535
                return
×
536
        }
×
537
        cfg.removed = true
×
538
        cfg.notifyQuitChannel()
×
539
}
540

541
// Returns false if it has been disabled or removed.
542
func (cfg *leafNodeCfg) stillValid() bool {
7,192✔
543
        cfg.RLock()
7,192✔
544
        defer cfg.RUnlock()
7,192✔
545
        return !cfg.Disabled && !cfg.removed
7,192✔
546
}
7,192✔
547

548
// Will pick an URL from the list of available URLs.
549
func (cfg *leafNodeCfg) pickNextURL() *url.URL {
5,906✔
550
        cfg.Lock()
5,906✔
551
        defer cfg.Unlock()
5,906✔
552
        // If the current URL is the first in the list and we have more than
5,906✔
553
        // one URL, then move that one to end of the list.
5,906✔
554
        if cfg.curURL != nil && len(cfg.urls) > 1 && urlsAreEqual(cfg.curURL, cfg.urls[0]) {
8,240✔
555
                first := cfg.urls[0]
2,334✔
556
                copy(cfg.urls, cfg.urls[1:])
2,334✔
557
                cfg.urls[len(cfg.urls)-1] = first
2,334✔
558
        }
2,334✔
559
        cfg.curURL = cfg.urls[0]
5,906✔
560
        return cfg.curURL
5,906✔
561
}
562

563
// Returns the current URL
564
func (cfg *leafNodeCfg) getCurrentURL() *url.URL {
75✔
565
        cfg.RLock()
75✔
566
        defer cfg.RUnlock()
75✔
567
        return cfg.curURL
75✔
568
}
75✔
569

570
// Returns how long the server should wait before attempting
571
// to solicit a remote leafnode connection.
572
func (cfg *leafNodeCfg) getConnectDelay() time.Duration {
1,530✔
573
        cfg.RLock()
1,530✔
574
        delay := cfg.connDelay
1,530✔
575
        cfg.RUnlock()
1,530✔
576
        return delay
1,530✔
577
}
1,530✔
578

579
// Sets the connect delay.
580
func (cfg *leafNodeCfg) setConnectDelay(delay time.Duration) {
132✔
581
        cfg.Lock()
132✔
582
        cfg.connDelay = delay
132✔
583
        cfg.Unlock()
132✔
584
}
132✔
585

586
// Ensure that non-exported options (used in tests) have
587
// been properly set.
588
func (s *Server) setLeafNodeNonExportedOptions() {
6,439✔
589
        opts := s.getOpts()
6,439✔
590
        s.leafNodeOpts.dialTimeout = opts.LeafNode.dialTimeout
6,439✔
591
        if s.leafNodeOpts.dialTimeout == 0 {
12,877✔
592
                // Use same timeouts as routes for now.
6,438✔
593
                s.leafNodeOpts.dialTimeout = DEFAULT_ROUTE_DIAL
6,438✔
594
        }
6,438✔
595
        s.leafNodeOpts.resolver = opts.LeafNode.resolver
6,439✔
596
        if s.leafNodeOpts.resolver == nil {
12,875✔
597
                s.leafNodeOpts.resolver = net.DefaultResolver
6,436✔
598
        }
6,436✔
599
}
600

601
// sharedSysAccDelay is a fixed 250ms delay.
// NOTE(review): its use is not visible in this part of the file; presumably
// it applies when a remote shares the system account — confirm at the usage site.
const sharedSysAccDelay = 250 * time.Millisecond
602

603
// establishHTTPProxyTunnel establishes an HTTP CONNECT tunnel through a proxy server
604
func establishHTTPProxyTunnel(proxyURL, targetHost string, timeout time.Duration, username, password string) (net.Conn, error) {
11✔
605
        proxyAddr, err := url.Parse(proxyURL)
11✔
606
        if err != nil {
11✔
607
                // This should not happen since proxy URL is validated during configuration parsing
×
608
                return nil, fmt.Errorf("unexpected proxy URL parse error (URL was pre-validated): %v", err)
×
609
        }
×
610

611
        // Connect to the proxy server
612
        conn, err := natsDialTimeout("tcp", proxyAddr.Host, timeout)
11✔
613
        if err != nil {
11✔
614
                return nil, fmt.Errorf("failed to connect to proxy: %v", err)
×
615
        }
×
616

617
        // Set deadline for the entire proxy handshake
618
        if err := conn.SetDeadline(time.Now().Add(timeout)); err != nil {
11✔
619
                conn.Close()
×
620
                return nil, fmt.Errorf("failed to set deadline: %v", err)
×
621
        }
×
622

623
        req := &http.Request{
11✔
624
                Method: http.MethodConnect,
11✔
625
                URL:    &url.URL{Opaque: targetHost}, // Opaque is required for CONNECT
11✔
626
                Host:   targetHost,
11✔
627
                Header: make(http.Header),
11✔
628
        }
11✔
629

11✔
630
        // Add proxy authentication if provided
11✔
631
        if username != "" && password != "" {
13✔
632
                req.Header.Set("Proxy-Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(username+":"+password)))
2✔
633
        }
2✔
634

635
        if err := req.Write(conn); err != nil {
11✔
636
                conn.Close()
×
637
                return nil, fmt.Errorf("failed to write CONNECT request: %v", err)
×
638
        }
×
639

640
        resp, err := http.ReadResponse(bufio.NewReader(conn), req)
11✔
641
        if err != nil {
11✔
642
                conn.Close()
×
643
                return nil, fmt.Errorf("failed to read proxy response: %v", err)
×
644
        }
×
645

646
        if resp.StatusCode != http.StatusOK {
12✔
647
                resp.Body.Close()
1✔
648
                conn.Close()
1✔
649
                return nil, fmt.Errorf("proxy CONNECT failed: %s", resp.Status)
1✔
650
        }
1✔
651

652
        // Close the response body
653
        resp.Body.Close()
10✔
654

10✔
655
        // Clear the deadline now that we've finished the proxy handshake
10✔
656
        if err := conn.SetDeadline(time.Time{}); err != nil {
10✔
657
                conn.Close()
×
658
                return nil, fmt.Errorf("failed to clear deadline: %v", err)
×
659
        }
×
660

661
        return conn, nil
10✔
662
}
663

664
// Connect to a remote leaf node asynchronously (that is, this function will do
665
// the connect in a go routine).
666
func (s *Server) connectToRemoteLeafNodeAsynchronously(remote *leafNodeCfg, firstConnect bool) {
1,343✔
667
        remote.setConnectInProgress(true)
1,343✔
668
        s.startGoRoutine(func() {
2,686✔
669
                defer s.grWG.Done()
1,343✔
670
                if !connectToRemoteLeafNode(s, remote, firstConnect) {
2,137✔
671
                        remote.setConnectInProgress(false)
794✔
672
                }
794✔
673
        })
674
}
675

676
// Connect to a remote leaf node. Should only be invoked from
677
// `s.connectToRemoteLeafNodeAsynchronously()` or `s.reConnectToRemoteLeafNode()`.
678
// Returns `true` if this function invoked `s.createLeafNode()`, false otherwise.
679
func connectToRemoteLeafNode(s *Server, remote *leafNodeCfg, firstConnect bool) bool {
1,530✔
680

1,530✔
681
        if remote == nil || len(remote.URLs) == 0 {
1,530✔
682
                s.Debugf("Empty remote leafnode definition, nothing to connect")
×
683
                return false
×
684
        }
×
685

686
        opts := s.getOpts()
1,530✔
687
        reconnectDelay := opts.LeafNode.ReconnectInterval
1,530✔
688
        s.mu.RLock()
1,530✔
689
        dialTimeout := s.leafNodeOpts.dialTimeout
1,530✔
690
        resolver := s.leafNodeOpts.resolver
1,530✔
691
        var isSysAcc bool
1,530✔
692
        if s.eventsEnabled() {
3,027✔
693
                isSysAcc = remote.LocalAccount == s.sys.account.Name
1,497✔
694
        }
1,497✔
695
        jetstreamMigrateDelay := remote.JetStreamClusterMigrateDelay
1,530✔
696
        s.mu.RUnlock()
1,530✔
697

1,530✔
698
        // If we are sharing a system account and we are not standalone delay to gather some info prior.
1,530✔
699
        if firstConnect && isSysAcc && !s.standAloneMode() {
1,590✔
700
                s.Debugf("Will delay first leafnode connect to shared system account due to clustering")
60✔
701
                remote.setConnectDelay(sharedSysAccDelay)
60✔
702
        }
60✔
703

704
        if connDelay := remote.getConnectDelay(); connDelay > 0 {
1,597✔
705
                select {
67✔
706
                case <-time.After(connDelay):
61✔
707
                case <-remote.quitCh:
×
708
                        return false
×
709
                case <-s.quitCh:
6✔
710
                        return false
6✔
711
                }
712
                remote.setConnectDelay(0)
61✔
713
        }
714

715
        var conn net.Conn
1,524✔
716

1,524✔
717
        const connErrFmt = "Error trying to connect as leafnode to remote server %q (attempt %v): %v"
1,524✔
718

1,524✔
719
        // Capture proxy configuration once before the loop with proper locking
1,524✔
720
        remote.RLock()
1,524✔
721
        proxyURL := remote.Proxy.URL
1,524✔
722
        proxyUsername := remote.Proxy.Username
1,524✔
723
        proxyPassword := remote.Proxy.Password
1,524✔
724
        proxyTimeout := remote.Proxy.Timeout
1,524✔
725
        remote.RUnlock()
1,524✔
726

1,524✔
727
        // Set default proxy timeout if not specified
1,524✔
728
        if proxyTimeout == 0 {
3,040✔
729
                proxyTimeout = dialTimeout
1,516✔
730
        }
1,516✔
731

732
        attempts := 0
1,524✔
733

1,524✔
734
        // In case the migrate timer was created but not canceled, do it when
1,524✔
735
        // this function exits. Note that the timer would not be created if
1,524✔
736
        // `jetstreamMigrateDelay == 0`.
1,524✔
737
        if jetstreamMigrateDelay > 0 {
1,532✔
738
                defer remote.cancelMigrateTimer()
8✔
739
        }
8✔
740

741
        for s.isRunning() && remote.stillValid() {
7,430✔
742
                rURL := remote.pickNextURL()
5,906✔
743
                url, err := s.getRandomIP(resolver, rURL.Host, nil)
5,906✔
744
                if err == nil {
11,807✔
745
                        var ipStr string
5,901✔
746
                        if url != rURL.Host {
5,970✔
747
                                ipStr = fmt.Sprintf(" (%s)", url)
69✔
748
                        }
69✔
749
                        // Some test may want to disable remotes from connecting
750
                        if s.isLeafConnectDisabled() {
6,029✔
751
                                s.Debugf("Will not attempt to connect to remote server on %q%s, leafnodes currently disabled", rURL.Host, ipStr)
128✔
752
                                err = ErrLeafNodeDisabled
128✔
753
                        } else {
5,901✔
754
                                s.Debugf("Trying to connect as leafnode to remote server on %q%s", rURL.Host, ipStr)
5,773✔
755

5,773✔
756
                                // Check if proxy is configured
5,773✔
757
                                if proxyURL != _EMPTY_ {
5,781✔
758
                                        targetHost := rURL.Host
8✔
759
                                        // If URL doesn't include port, add the default port for the scheme
8✔
760
                                        if rURL.Port() == _EMPTY_ {
8✔
761
                                                defaultPort := "80"
×
762
                                                if rURL.Scheme == wsSchemePrefixTLS {
×
763
                                                        defaultPort = "443"
×
764
                                                }
×
765
                                                targetHost = net.JoinHostPort(rURL.Hostname(), defaultPort)
×
766
                                        }
767

768
                                        conn, err = establishHTTPProxyTunnel(proxyURL, targetHost, proxyTimeout, proxyUsername, proxyPassword)
8✔
769
                                } else {
5,765✔
770
                                        // Direct connection
5,765✔
771
                                        conn, err = natsDialTimeout("tcp", url, dialTimeout)
5,765✔
772
                                }
5,765✔
773
                        }
774
                }
775
                if err != nil {
11,083✔
776
                        jitter := time.Duration(rand.Int63n(int64(reconnectDelay)))
5,177✔
777
                        delay := reconnectDelay + jitter
5,177✔
778
                        attempts++
5,177✔
779
                        if s.shouldReportConnectErr(firstConnect, attempts) {
8,315✔
780
                                s.Errorf(connErrFmt, rURL.Host, attempts, err)
3,138✔
781
                        } else {
5,177✔
782
                                s.Debugf(connErrFmt, rURL.Host, attempts, err)
2,039✔
783
                        }
2,039✔
784
                        remote.Lock()
5,177✔
785
                        // if we are using a delay to start migrating assets, kick off a migrate timer.
5,177✔
786
                        if remote.jsMigrateTimer == nil && jetstreamMigrateDelay > 0 {
5,185✔
787
                                remote.jsMigrateTimer = time.AfterFunc(jetstreamMigrateDelay, func() {
16✔
788
                                        s.checkJetStreamMigrate(remote)
8✔
789
                                })
8✔
790
                        }
791
                        remote.Unlock()
5,177✔
792
                        select {
5,177✔
793
                        case <-s.quitCh:
784✔
794
                                return false
784✔
795
                        case <-remote.quitCh:
×
796
                                return false
×
797
                        case <-time.After(delay):
4,392✔
798
                                // Check if we should migrate any JetStream assets immediately while this remote is down.
4,392✔
799
                                // This will be used if JetStreamClusterMigrateDelay was not set
4,392✔
800
                                if jetstreamMigrateDelay == 0 {
8,709✔
801
                                        s.checkJetStreamMigrate(remote)
4,317✔
802
                                }
4,317✔
803
                                continue
4,392✔
804
                        }
805
                }
806
                remote.cancelMigrateTimer()
729✔
807
                // We can check here, but really we will have to check again when the server
729✔
808
                // is about to add to the `s.leafs` map later in the process.
729✔
809
                if !remote.stillValid() {
729✔
810
                        conn.Close()
×
811
                        return false
×
812
                }
×
813

814
                // We have a connection here to a remote server.
815
                // Go ahead and create our leaf node and return.
816
                s.createLeafNode(conn, rURL, remote, nil)
729✔
817

729✔
818
                // Clear any observer states if we had them.
729✔
819
                s.clearObserverState(remote)
729✔
820

729✔
821
                return true
729✔
822
        }
823

824
        return false
10✔
825
}
826

827
func (cfg *leafNodeCfg) cancelMigrateTimer() {
737✔
828
        cfg.Lock()
737✔
829
        stopAndClearTimer(&cfg.jsMigrateTimer)
737✔
830
        cfg.Unlock()
737✔
831
}
737✔
832

833
// This will clear any observer state such that stream or consumer assets on this server can become leaders again.
834
func (s *Server) clearObserverState(remote *leafNodeCfg) {
729✔
835
        s.mu.RLock()
729✔
836
        accName := remote.LocalAccount
729✔
837
        s.mu.RUnlock()
729✔
838

729✔
839
        acc, err := s.LookupAccount(accName)
729✔
840
        if err != nil {
731✔
841
                s.Warnf("Error looking up account [%s] checking for JetStream clear observer state on a leafnode", accName)
2✔
842
                return
2✔
843
        }
2✔
844

845
        acc.jscmMu.Lock()
727✔
846
        defer acc.jscmMu.Unlock()
727✔
847

727✔
848
        // Walk all streams looking for any clustered stream, skip otherwise.
727✔
849
        for _, mset := range acc.streams() {
770✔
850
                node := mset.raftNode()
43✔
851
                if node == nil {
78✔
852
                        // Not R>1
35✔
853
                        continue
35✔
854
                }
855
                // Check consumers
856
                for _, o := range mset.getConsumers() {
10✔
857
                        if n := o.raftNode(); n != nil {
4✔
858
                                // Ensure we can become a leader again.
2✔
859
                                n.SetObserver(false)
2✔
860
                        }
2✔
861
                }
862
                // Ensure we can not become a leader again.
863
                node.SetObserver(false)
8✔
864
        }
865
}
866

867
// Check to see if we should migrate any assets from this account.
868
func (s *Server) checkJetStreamMigrate(remote *leafNodeCfg) {
4,325✔
869
        s.mu.RLock()
4,325✔
870
        accName, shouldMigrate := remote.LocalAccount, remote.JetStreamClusterMigrate
4,325✔
871
        s.mu.RUnlock()
4,325✔
872

4,325✔
873
        if !shouldMigrate {
8,589✔
874
                return
4,264✔
875
        }
4,264✔
876

877
        acc, err := s.LookupAccount(accName)
61✔
878
        if err != nil {
61✔
879
                s.Warnf("Error looking up account [%s] checking for JetStream migration on a leafnode", accName)
×
880
                return
×
881
        }
×
882

883
        acc.jscmMu.Lock()
61✔
884
        defer acc.jscmMu.Unlock()
61✔
885

61✔
886
        // Walk all streams looking for any clustered stream, skip otherwise.
61✔
887
        // If we are the leader force stepdown.
61✔
888
        for _, mset := range acc.streams() {
91✔
889
                node := mset.raftNode()
30✔
890
                if node == nil {
30✔
891
                        // Not R>1
×
892
                        continue
×
893
                }
894
                // Collect any consumers
895
                for _, o := range mset.getConsumers() {
49✔
896
                        if n := o.raftNode(); n != nil {
38✔
897
                                n.StepDown()
19✔
898
                                // Ensure we can not become a leader while in this state.
19✔
899
                                n.SetObserver(true)
19✔
900
                        }
19✔
901
                }
902
                // Stepdown if this stream was leader.
903
                node.StepDown()
30✔
904
                // Ensure we can not become a leader while in this state.
30✔
905
                node.SetObserver(true)
30✔
906
        }
907
}
908

909
// Helper for checking.
910
func (s *Server) isLeafConnectDisabled() bool {
5,901✔
911
        s.mu.RLock()
5,901✔
912
        defer s.mu.RUnlock()
5,901✔
913
        return s.leafDisableConnect
5,901✔
914
}
5,901✔
915

916
// Save off the tlsName for when we use TLS and mix hostnames and IPs. IPs usually
917
// come from the server we connect to.
918
//
919
// We used to save the name only if there was a TLSConfig or scheme equal to "tls".
920
// However, this was causing failures for users that did not set the scheme (and
921
// their remote connections did not have a tls{} block).
922
// We now save the host name regardless in case the remote returns an INFO indicating
923
// that TLS is required.
924
//
925
// Lock held on entry.
926
func (cfg *leafNodeCfg) saveTLSHostname(u *url.URL) {
2,229✔
927
        if cfg.tlsName == _EMPTY_ && net.ParseIP(u.Hostname()) == nil {
2,245✔
928
                cfg.tlsName = u.Hostname()
16✔
929
        }
16✔
930
}
931

932
// Save off the username/password for when we connect using a bare URL
933
// that we get from the INFO protocol.
934
//
935
// Lock held on entry.
936
func (cfg *leafNodeCfg) saveUserPassword(u *url.URL) {
1,694✔
937
        if cfg.username == _EMPTY_ && u.User != nil {
1,952✔
938
                cfg.username = u.User.Username()
258✔
939
                cfg.password, _ = u.User.Password()
258✔
940
        }
258✔
941
}
942

943
// This starts the leafnode accept loop in a go routine, unless it
944
// is detected that the server has already been shutdown.
945
func (s *Server) startLeafNodeAcceptLoop() {
3,872✔
946
        // Snapshot server options.
3,872✔
947
        opts := s.getOpts()
3,872✔
948

3,872✔
949
        port := opts.LeafNode.Port
3,872✔
950
        if port == -1 {
7,592✔
951
                port = 0
3,720✔
952
        }
3,720✔
953

954
        if s.isShuttingDown() {
3,872✔
955
                return
×
956
        }
×
957

958
        s.mu.Lock()
3,872✔
959
        hp := net.JoinHostPort(opts.LeafNode.Host, strconv.Itoa(port))
3,872✔
960
        l, e := natsListen("tcp", hp)
3,872✔
961
        s.leafNodeListenerErr = e
3,872✔
962
        if e != nil {
3,872✔
963
                s.mu.Unlock()
×
964
                s.Fatalf("Error listening on leafnode port: %d - %v", opts.LeafNode.Port, e)
×
965
                return
×
966
        }
×
967

968
        s.Noticef("Listening for leafnode connections on %s",
3,872✔
969
                net.JoinHostPort(opts.LeafNode.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
3,872✔
970

3,872✔
971
        tlsRequired := opts.LeafNode.TLSConfig != nil
3,872✔
972
        tlsVerify := tlsRequired && opts.LeafNode.TLSConfig.ClientAuth == tls.RequireAndVerifyClientCert
3,872✔
973
        // Do not set compression in this Info object, it would possibly cause
3,872✔
974
        // issues when sending asynchronous INFO to the remote.
3,872✔
975
        info := Info{
3,872✔
976
                ID:            s.info.ID,
3,872✔
977
                Name:          s.info.Name,
3,872✔
978
                Version:       s.info.Version,
3,872✔
979
                GitCommit:     gitCommit,
3,872✔
980
                GoVersion:     runtime.Version(),
3,872✔
981
                AuthRequired:  true,
3,872✔
982
                TLSRequired:   tlsRequired,
3,872✔
983
                TLSVerify:     tlsVerify,
3,872✔
984
                MaxPayload:    s.info.MaxPayload, // TODO(dlc) - Allow override?
3,872✔
985
                Headers:       s.supportsHeaders(),
3,872✔
986
                JetStream:     opts.JetStream,
3,872✔
987
                Domain:        opts.JetStreamDomain,
3,872✔
988
                Proto:         s.getServerProto(),
3,872✔
989
                InfoOnConnect: true,
3,872✔
990
                JSApiLevel:    JSApiLevel,
3,872✔
991
        }
3,872✔
992
        // If we have selected a random port...
3,872✔
993
        if port == 0 {
7,592✔
994
                // Write resolved port back to options.
3,720✔
995
                opts.LeafNode.Port = l.Addr().(*net.TCPAddr).Port
3,720✔
996
        }
3,720✔
997

998
        s.leafNodeInfo = info
3,872✔
999
        // Possibly override Host/Port and set IP based on Cluster.Advertise
3,872✔
1000
        if err := s.setLeafNodeInfoHostPortAndIP(); err != nil {
3,872✔
1001
                s.Fatalf("Error setting leafnode INFO with LeafNode.Advertise value of %s, err=%v", opts.LeafNode.Advertise, err)
×
1002
                l.Close()
×
1003
                s.mu.Unlock()
×
1004
                return
×
1005
        }
×
1006
        s.leafURLsMap[s.leafNodeInfo.IP]++
3,872✔
1007
        s.generateLeafNodeInfoJSON()
3,872✔
1008

3,872✔
1009
        // Setup state that can enable shutdown
3,872✔
1010
        s.leafNodeListener = l
3,872✔
1011

3,872✔
1012
        // As of now, a server that does not have remotes configured would
3,872✔
1013
        // never solicit a connection, so we should not have to warn if
3,872✔
1014
        // InsecureSkipVerify is set in main LeafNodes config (since
3,872✔
1015
        // this TLS setting matters only when soliciting a connection).
3,872✔
1016
        // Still, warn if insecure is set in any of LeafNode block.
3,872✔
1017
        // We need to check remotes, even if tls is not required on accept.
3,872✔
1018
        warn := tlsRequired && opts.LeafNode.TLSConfig.InsecureSkipVerify
3,872✔
1019
        if !warn {
7,742✔
1020
                for _, r := range opts.LeafNode.Remotes {
4,051✔
1021
                        if r.TLSConfig != nil && r.TLSConfig.InsecureSkipVerify {
181✔
1022
                                warn = true
×
1023
                                break
×
1024
                        }
1025
                }
1026
        }
1027
        if warn {
3,874✔
1028
                s.Warnf(leafnodeTLSInsecureWarning)
2✔
1029
        }
2✔
1030
        go s.acceptConnections(l, "Leafnode", func(conn net.Conn) { s.createLeafNode(conn, nil, nil, nil) }, nil)
4,629✔
1031
        s.mu.Unlock()
3,872✔
1032
}
1033

1034
// RegEx to match a creds file with user JWT and Seed.
// Each match is one dash-delimited section: a header line containing runs of
// three or more dashes, a single token line made of word characters plus
// `-`, `.` and `=`, and a footer line of the same shape as the header. The
// token is captured in submatch 1. sendLeafConnect applies this with
// FindAllSubmatch and treats the first match as the user JWT and the second
// as the seed.
1035
var credsRe = regexp.MustCompile(`\s*(?:(?:[-]{3,}.*[-]{3,}\r?\n)([\w\-.=]+)(?:\r?\n[-]{3,}.*[-]{3,}(\r?\n|\z)))`)
1036

1037
// clusterName is provided as argument to avoid lock ordering issues with the locked client c
1038
// Lock should be held entering here.
1039
func (c *client) sendLeafConnect(clusterName string, headers bool) error {
597✔
1040
        // We support basic user/pass and operator based user JWT with signatures.
597✔
1041
        cinfo := leafConnectInfo{
597✔
1042
                Version:       VERSION,
597✔
1043
                ID:            c.srv.info.ID,
597✔
1044
                Domain:        c.srv.info.Domain,
597✔
1045
                Name:          c.srv.info.Name,
597✔
1046
                Hub:           c.leaf.remote.Hub,
597✔
1047
                Cluster:       clusterName,
597✔
1048
                Headers:       headers,
597✔
1049
                JetStream:     c.acc.jetStreamConfigured(),
597✔
1050
                DenyPub:       c.leaf.remote.DenyImports,
597✔
1051
                Compression:   c.leaf.compression,
597✔
1052
                RemoteAccount: c.acc.GetName(),
597✔
1053
                Proto:         c.srv.getServerProto(),
597✔
1054
                Isolate:       c.leaf.remote.RequestIsolation,
597✔
1055
        }
597✔
1056

597✔
1057
        // If a signature callback is specified, this takes precedence over anything else.
597✔
1058
        if cb := c.leaf.remote.SignatureCB; cb != nil {
602✔
1059
                nonce := c.nonce
5✔
1060
                c.mu.Unlock()
5✔
1061
                jwt, sigraw, err := cb(nonce)
5✔
1062
                c.mu.Lock()
5✔
1063
                if err == nil && c.isClosed() {
6✔
1064
                        err = ErrConnectionClosed
1✔
1065
                }
1✔
1066
                if err != nil {
7✔
1067
                        c.Errorf("Error signing the nonce: %v", err)
2✔
1068
                        return err
2✔
1069
                }
2✔
1070
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
3✔
1071
                cinfo.JWT, cinfo.Sig = jwt, sig
3✔
1072

1073
        } else if creds := c.leaf.remote.Credentials; creds != _EMPTY_ {
636✔
1074
                // Check for credentials first, that will take precedence..
44✔
1075
                c.Debugf("Authenticating with credentials file %q", c.leaf.remote.Credentials)
44✔
1076
                contents, err := os.ReadFile(creds)
44✔
1077
                if err != nil {
44✔
1078
                        c.Errorf("%v", err)
×
1079
                        return err
×
1080
                }
×
1081
                defer wipeSlice(contents)
44✔
1082
                items := credsRe.FindAllSubmatch(contents, -1)
44✔
1083
                if len(items) < 2 {
44✔
1084
                        c.Errorf("Credentials file malformed")
×
1085
                        return err
×
1086
                }
×
1087
                // First result should be the user JWT.
1088
                // We copy here so that the file containing the seed will be wiped appropriately.
1089
                raw := items[0][1]
44✔
1090
                tmp := make([]byte, len(raw))
44✔
1091
                copy(tmp, raw)
44✔
1092
                // Seed is second item.
44✔
1093
                kp, err := nkeys.FromSeed(items[1][1])
44✔
1094
                if err != nil {
44✔
1095
                        c.Errorf("Credentials file has malformed seed")
×
1096
                        return err
×
1097
                }
×
1098
                // Wipe our key on exit.
1099
                defer kp.Wipe()
44✔
1100

44✔
1101
                sigraw, _ := kp.Sign(c.nonce)
44✔
1102
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
44✔
1103
                cinfo.JWT = bytesToString(tmp)
44✔
1104
                cinfo.Sig = sig
44✔
1105
        } else if nkey := c.leaf.remote.Nkey; nkey != _EMPTY_ {
553✔
1106
                kp, err := nkeys.FromSeed([]byte(nkey))
5✔
1107
                if err != nil {
5✔
1108
                        c.Errorf("Remote nkey has malformed seed")
×
1109
                        return err
×
1110
                }
×
1111
                // Wipe our key on exit.
1112
                defer kp.Wipe()
5✔
1113
                sigraw, _ := kp.Sign(c.nonce)
5✔
1114
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
5✔
1115
                pkey, _ := kp.PublicKey()
5✔
1116
                cinfo.Nkey = pkey
5✔
1117
                cinfo.Sig = sig
5✔
1118
        }
1119
        // In addition, and this is to allow auth callout, set user/password or
1120
        // token if applicable.
1121
        if userInfo := c.leaf.remote.curURL.User; userInfo != nil {
876✔
1122
                cinfo.User = userInfo.Username()
281✔
1123
                var ok bool
281✔
1124
                cinfo.Pass, ok = userInfo.Password()
281✔
1125
                // For backward compatibility, if only username is provided, set both
281✔
1126
                // Token and User, not just Token.
281✔
1127
                if !ok {
289✔
1128
                        cinfo.Token = cinfo.User
8✔
1129
                }
8✔
1130
        } else if c.leaf.remote.username != _EMPTY_ {
320✔
1131
                cinfo.User = c.leaf.remote.username
6✔
1132
                cinfo.Pass = c.leaf.remote.password
6✔
1133
                // For backward compatibility, if only username is provided, set both
6✔
1134
                // Token and User, not just Token.
6✔
1135
                if cinfo.Pass == _EMPTY_ {
6✔
1136
                        cinfo.Token = cinfo.User
×
1137
                }
×
1138
        }
1139
        b, err := json.Marshal(cinfo)
595✔
1140
        if err != nil {
595✔
1141
                c.Errorf("Error marshaling CONNECT to remote leafnode: %v\n", err)
×
1142
                return err
×
1143
        }
×
1144
        // Although this call is made before the writeLoop is created,
1145
        // we don't really need to send in place. The protocol will be
1146
        // sent out by the writeLoop.
1147
        c.enqueueProto([]byte(fmt.Sprintf(ConProto, b)))
595✔
1148
        return nil
595✔
1149
}
1150

1151
// Makes a deep copy of the LeafNode Info structure.
1152
// The server lock is held on entry.
1153
func (s *Server) copyLeafNodeInfo() *Info {
2,384✔
1154
        clone := s.leafNodeInfo
2,384✔
1155
        // Copy the array of urls.
2,384✔
1156
        if len(s.leafNodeInfo.LeafNodeURLs) > 0 {
4,349✔
1157
                clone.LeafNodeURLs = append([]string(nil), s.leafNodeInfo.LeafNodeURLs...)
1,965✔
1158
        }
1,965✔
1159
        return &clone
2,384✔
1160
}
1161

1162
// Adds a LeafNode URL that we get when a route connects to the Info structure.
1163
// Regenerates the JSON byte array so that it can be sent to LeafNode connections.
1164
// Returns a boolean indicating if the URL was added or not.
1165
// Server lock is held on entry
1166
func (s *Server) addLeafNodeURL(urlStr string) bool {
7,892✔
1167
        if s.leafURLsMap.addUrl(urlStr) {
15,779✔
1168
                s.generateLeafNodeInfoJSON()
7,887✔
1169
                return true
7,887✔
1170
        }
7,887✔
1171
        return false
5✔
1172
}
1173

1174
// Removes a LeafNode URL of the route that is disconnecting from the Info structure.
1175
// Regenerates the JSON byte array so that it can be sent to LeafNode connections.
1176
// Returns a boolean indicating if the URL was removed or not.
1177
// Server lock is held on entry.
1178
func (s *Server) removeLeafNodeURL(urlStr string) bool {
7,892✔
1179
        // Don't need to do this if we are removing the route connection because
7,892✔
1180
        // we are shuting down...
7,892✔
1181
        if s.isShuttingDown() {
12,047✔
1182
                return false
4,155✔
1183
        }
4,155✔
1184
        if s.leafURLsMap.removeUrl(urlStr) {
7,470✔
1185
                s.generateLeafNodeInfoJSON()
3,733✔
1186
                return true
3,733✔
1187
        }
3,733✔
1188
        return false
4✔
1189
}
1190

1191
// Server lock is held on entry
1192
func (s *Server) generateLeafNodeInfoJSON() {
15,492✔
1193
        s.leafNodeInfo.Cluster = s.cachedClusterName()
15,492✔
1194
        s.leafNodeInfo.LeafNodeURLs = s.leafURLsMap.getAsStringSlice()
15,492✔
1195
        s.leafNodeInfo.WSConnectURLs = s.websocket.connectURLsMap.getAsStringSlice()
15,492✔
1196
        s.leafNodeInfoJSON = generateInfoJSON(&s.leafNodeInfo)
15,492✔
1197
}
15,492✔
1198

1199
// Sends an async INFO protocol so that the connected servers can update
1200
// their list of LeafNode urls.
1201
func (s *Server) sendAsyncLeafNodeInfo() {
11,620✔
1202
        for _, c := range s.leafs {
11,715✔
1203
                c.mu.Lock()
95✔
1204
                c.enqueueProto(s.leafNodeInfoJSON)
95✔
1205
                c.mu.Unlock()
95✔
1206
        }
95✔
1207
}
1208

1209
// Called when an inbound leafnode connection is accepted or we create one for a solicited leafnode.
1210
func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCfg, ws *websocket) *client {
1,520✔
1211
        // Snapshot server options.
1,520✔
1212
        opts := s.getOpts()
1,520✔
1213

1,520✔
1214
        maxPay := int32(opts.MaxPayload)
1,520✔
1215
        maxSubs := int32(opts.MaxSubs)
1,520✔
1216
        // For system, maxSubs of 0 means unlimited, so re-adjust here.
1,520✔
1217
        if maxSubs == 0 {
3,039✔
1218
                maxSubs = -1
1,519✔
1219
        }
1,519✔
1220
        now := time.Now().UTC()
1,520✔
1221

1,520✔
1222
        c := &client{srv: s, nc: conn, kind: LEAF, opts: defaultOpts, mpay: maxPay, msubs: maxSubs, start: now, last: now}
1,520✔
1223
        // Do not update the smap here, we need to do it in initLeafNodeSmapAndSendSubs
1,520✔
1224
        c.leaf = &leaf{}
1,520✔
1225

1,520✔
1226
        // If the leafnode subject interest should be isolated, flag it here.
1,520✔
1227
        s.optsMu.RLock()
1,520✔
1228
        if c.leaf.isolated = s.opts.LeafNode.IsolateLeafnodeInterest; !c.leaf.isolated && remote != nil {
2,249✔
1229
                c.leaf.isolated = remote.LocalIsolation
729✔
1230
        }
729✔
1231
        s.optsMu.RUnlock()
1,520✔
1232

1,520✔
1233
        // For accepted LN connections, ws will be != nil if it was accepted
1,520✔
1234
        // through the Websocket port.
1,520✔
1235
        c.ws = ws
1,520✔
1236

1,520✔
1237
        // For remote, check if the scheme starts with "ws", if so, we will initiate
1,520✔
1238
        // a remote Leaf Node connection as a websocket connection.
1,520✔
1239
        if remote != nil && rURL != nil && isWSURL(rURL) {
1,572✔
1240
                remote.RLock()
52✔
1241
                c.ws = &websocket{compress: remote.Websocket.Compression, maskwrite: !remote.Websocket.NoMasking}
52✔
1242
                remote.RUnlock()
52✔
1243
        }
52✔
1244

1245
        // Determines if we are soliciting the connection or not.
1246
        var solicited bool
1,520✔
1247
        var acc *Account
1,520✔
1248
        var remoteSuffix string
1,520✔
1249
        if remote != nil {
2,249✔
1250
                // For now, if lookup fails, we will constantly try
729✔
1251
                // to recreate this LN connection.
729✔
1252
                lacc := remote.LocalAccount
729✔
1253
                var err error
729✔
1254
                acc, err = s.LookupAccount(lacc)
729✔
1255
                if err != nil {
731✔
1256
                        // An account not existing is something that can happen with nats/http account resolver and the account
2✔
1257
                        // has not yet been pushed, or the request failed for other reasons.
2✔
1258
                        // remote needs to be set or retry won't happen
2✔
1259
                        c.leaf.remote = remote
2✔
1260
                        c.closeConnection(MissingAccount)
2✔
1261
                        s.Errorf("Unable to lookup account %s for solicited leafnode connection: %v", lacc, err)
2✔
1262
                        return nil
2✔
1263
                }
2✔
1264
                remoteSuffix = fmt.Sprintf(" for account: %s", acc.traceLabel())
727✔
1265
        }
1266

1267
        c.mu.Lock()
1,518✔
1268
        c.initClient()
1,518✔
1269
        c.Noticef("Leafnode connection created%s %s", remoteSuffix, c.opts.Name)
1,518✔
1270

1,518✔
1271
        var (
1,518✔
1272
                tlsFirst         bool
1,518✔
1273
                tlsFirstFallback time.Duration
1,518✔
1274
                infoTimeout      time.Duration
1,518✔
1275
        )
1,518✔
1276
        if remote != nil {
2,245✔
1277
                solicited = true
727✔
1278
                remote.Lock()
727✔
1279
                c.leaf.remote = remote
727✔
1280
                c.setPermissions(remote.perms)
727✔
1281
                if !c.leaf.remote.Hub {
1,442✔
1282
                        c.leaf.isSpoke = true
715✔
1283
                }
715✔
1284
                tlsFirst = remote.TLSHandshakeFirst
727✔
1285
                infoTimeout = remote.FirstInfoTimeout
727✔
1286
                remote.Unlock()
727✔
1287
                c.acc = acc
727✔
1288
        } else {
791✔
1289
                c.flags.set(expectConnect)
791✔
1290
                if ws != nil {
825✔
1291
                        c.Debugf("Leafnode compression=%v", c.ws.compress)
34✔
1292
                }
34✔
1293
                tlsFirst = opts.LeafNode.TLSHandshakeFirst
791✔
1294
                if f := opts.LeafNode.TLSHandshakeFirstFallback; f > 0 {
792✔
1295
                        tlsFirstFallback = f
1✔
1296
                }
1✔
1297
        }
1298
        c.mu.Unlock()
1,518✔
1299

1,518✔
1300
        var nonce [nonceLen]byte
1,518✔
1301
        var info *Info
1,518✔
1302

1,518✔
1303
        // Grab this before the client lock below.
1,518✔
1304
        if !solicited {
2,309✔
1305
                // Grab server variables
791✔
1306
                s.mu.Lock()
791✔
1307
                info = s.copyLeafNodeInfo()
791✔
1308
                // For tests that want to simulate old servers, do not set the compression
791✔
1309
                // on the INFO protocol if configured with CompressionNotSupported.
791✔
1310
                // Also suppress it if WebSocket compression is already in use, otherwise
791✔
1311
                // an old soliciting peer would honor the advertised mode, switch to S2,
791✔
1312
                // and then wait forever for a compressed INFO response from us.
791✔
1313
                if cm := opts.LeafNode.Compression.Mode; cm != CompressionNotSupported && (ws == nil || !ws.compress) {
1,574✔
1314
                        info.Compression = cm
783✔
1315
                }
783✔
1316
                // We always send a nonce for LEAF connections. Do not change that without
1317
                // taking into account presence of proxy trusted keys.
1318
                s.generateNonce(nonce[:])
791✔
1319
                s.mu.Unlock()
791✔
1320
        }
1321

1322
        // Grab lock
1323
        c.mu.Lock()
1,518✔
1324

1,518✔
1325
        var preBuf []byte
1,518✔
1326
        if solicited {
2,245✔
1327
                // For websocket connection, we need to send an HTTP request,
727✔
1328
                // and get the response before starting the readLoop to get
727✔
1329
                // the INFO, etc..
727✔
1330
                if c.isWebsocket() {
779✔
1331
                        var err error
52✔
1332
                        var closeReason ClosedState
52✔
1333

52✔
1334
                        preBuf, closeReason, err = c.leafNodeSolicitWSConnection(opts, rURL, remote)
52✔
1335
                        if err != nil {
72✔
1336
                                c.Errorf("Error soliciting websocket connection: %v", err)
20✔
1337
                                c.mu.Unlock()
20✔
1338
                                if closeReason != 0 {
36✔
1339
                                        c.closeConnection(closeReason)
16✔
1340
                                }
16✔
1341
                                return nil
20✔
1342
                        }
1343
                } else {
675✔
1344
                        // If configured to do TLS handshake first
675✔
1345
                        if tlsFirst {
679✔
1346
                                if _, err := c.leafClientHandshakeIfNeeded(remote, opts); err != nil {
5✔
1347
                                        c.mu.Unlock()
1✔
1348
                                        return nil
1✔
1349
                                }
1✔
1350
                        }
1351
                        // We need to wait for the info, but not for too long.
1352
                        c.nc.SetReadDeadline(time.Now().Add(infoTimeout))
674✔
1353
                }
1354

1355
                // We will process the INFO from the readloop and finish by
1356
                // sending the CONNECT and finish registration later.
1357
        } else {
791✔
1358
                // Send our info to the other side.
791✔
1359
                // Remember the nonce we sent here for signatures, etc.
791✔
1360
                c.nonce = make([]byte, nonceLen)
791✔
1361
                copy(c.nonce, nonce[:])
791✔
1362
                info.Nonce = bytesToString(c.nonce)
791✔
1363
                info.CID = c.cid
791✔
1364
                proto := generateInfoJSON(info)
791✔
1365

791✔
1366
                var pre []byte
791✔
1367
                // We need first to check for "TLS First" fallback delay.
791✔
1368
                if tlsFirstFallback > 0 {
792✔
1369
                        // We wait and see if we are getting any data. Since we did not send
1✔
1370
                        // the INFO protocol yet, only clients that use TLS first should be
1✔
1371
                        // sending data (the TLS handshake). We don't really check the content:
1✔
1372
                        // if it is a rogue agent and not an actual client performing the
1✔
1373
                        // TLS handshake, the error will be detected when performing the
1✔
1374
                        // handshake on our side.
1✔
1375
                        pre = make([]byte, 4)
1✔
1376
                        c.nc.SetReadDeadline(time.Now().Add(tlsFirstFallback))
1✔
1377
                        n, _ := io.ReadFull(c.nc, pre[:])
1✔
1378
                        c.nc.SetReadDeadline(time.Time{})
1✔
1379
                        // If we get any data (regardless of possible timeout), we will proceed
1✔
1380
                        // with the TLS handshake.
1✔
1381
                        if n > 0 {
1✔
1382
                                pre = pre[:n]
×
1383
                        } else {
1✔
1384
                                // We did not get anything so we will send the INFO protocol.
1✔
1385
                                pre = nil
1✔
1386
                                // Set the boolean to false for the rest of the function.
1✔
1387
                                tlsFirst = false
1✔
1388
                        }
1✔
1389
                }
1390

1391
                if !tlsFirst {
1,577✔
1392
                        // We have to send from this go routine because we may
786✔
1393
                        // have to block for TLS handshake before we start our
786✔
1394
                        // writeLoop go routine. The other side needs to receive
786✔
1395
                        // this before it can initiate the TLS handshake..
786✔
1396
                        c.sendProtoNow(proto)
786✔
1397

786✔
1398
                        // The above call could have marked the connection as closed (due to TCP error).
786✔
1399
                        if c.isClosed() {
786✔
1400
                                c.mu.Unlock()
×
1401
                                c.closeConnection(WriteError)
×
1402
                                return nil
×
1403
                        }
×
1404
                }
1405

1406
                // Check to see if we need to spin up TLS.
1407
                if !c.isWebsocket() && info.TLSRequired {
864✔
1408
                        // If we have a prebuffer create a multi-reader.
73✔
1409
                        if len(pre) > 0 {
73✔
1410
                                c.nc = &tlsMixConn{c.nc, bytes.NewBuffer(pre)}
×
1411
                        }
×
1412
                        // Perform server-side TLS handshake.
1413
                        if err := c.doTLSServerHandshake(tlsHandshakeLeaf, opts.LeafNode.TLSConfig, opts.LeafNode.TLSTimeout, opts.LeafNode.TLSPinnedCerts); err != nil {
119✔
1414
                                c.mu.Unlock()
46✔
1415
                                return nil
46✔
1416
                        }
46✔
1417
                }
1418

1419
                // If the user wants the TLS handshake to occur first, now that it is
1420
                // done, send the INFO protocol.
1421
                if tlsFirst {
748✔
1422
                        c.flags.set(didTLSFirst)
3✔
1423
                        c.sendProtoNow(proto)
3✔
1424
                        if c.isClosed() {
3✔
1425
                                c.mu.Unlock()
×
1426
                                c.closeConnection(WriteError)
×
1427
                                return nil
×
1428
                        }
×
1429
                }
1430

1431
                // Leaf nodes will always require a CONNECT to let us know
1432
                // when we are properly bound to an account.
1433
                //
1434
                // If compression is configured, we can't set the authTimer here because
1435
                // it would cause the parser to fail any incoming protocol that is not a
1436
                // CONNECT (and we need to exchange INFO protocols for compression
1437
                // negotiation). So instead, use the ping timer until we are done with
1438
                // negotiation and can set the auth timer.
1439
                timeout := secondsToDuration(opts.LeafNode.AuthTimeout)
745✔
1440
                if needsCompression(opts.LeafNode.Compression.Mode) {
1,273✔
1441
                        c.ping.tmr = time.AfterFunc(timeout, func() {
538✔
1442
                                c.authTimeout()
10✔
1443
                        })
10✔
1444
                } else {
217✔
1445
                        c.setAuthTimer(timeout)
217✔
1446
                }
217✔
1447
        }
1448

1449
        // Keep track in case server is shutdown before we can successfully register.
1450
        if !s.addToTempClients(c.cid, c) {
1,452✔
1451
                c.mu.Unlock()
1✔
1452
                c.setNoReconnect()
1✔
1453
                c.closeConnection(ServerShutdown)
1✔
1454
                return nil
1✔
1455
        }
1✔
1456

1457
        // Spin up the read loop.
1458
        s.startGoRoutine(func() { c.readLoop(preBuf) })
2,900✔
1459

1460
        // We will spin the write loop for solicited connections only
1461
        // when processing the INFO and after switching to TLS if needed.
1462
        if !solicited {
2,195✔
1463
                s.startGoRoutine(func() { c.writeLoop() })
1,490✔
1464
        }
1465

1466
        c.mu.Unlock()
1,450✔
1467

1,450✔
1468
        return c
1,450✔
1469
}
1470

1471
// Will perform the client-side TLS handshake if needed. Assumes that this
1472
// is called by the solicit side (remote will be non nil). Returns `true`
1473
// if TLS is required, `false` otherwise.
1474
// Lock held on entry.
1475
func (c *client) leafClientHandshakeIfNeeded(remote *leafNodeCfg, opts *Options) (bool, error) {
1,680✔
1476
        // Check if TLS is required and gather TLS config variables.
1,680✔
1477
        tlsRequired, tlsConfig, tlsName, tlsTimeout := c.leafNodeGetTLSConfigForSolicit(remote)
1,680✔
1478
        if !tlsRequired {
3,285✔
1479
                return false, nil
1,605✔
1480
        }
1,605✔
1481

1482
        // If TLS required, peform handshake.
1483
        // Get the URL that was used to connect to the remote server.
1484
        rURL := remote.getCurrentURL()
75✔
1485

75✔
1486
        // Perform the client-side TLS handshake.
75✔
1487
        if resetTLSName, err := c.doTLSClientHandshake(tlsHandshakeLeaf, rURL, tlsConfig, tlsName, tlsTimeout, opts.LeafNode.TLSPinnedCerts); err != nil {
109✔
1488
                // Check if we need to reset the remote's TLS name.
34✔
1489
                if resetTLSName {
34✔
1490
                        remote.Lock()
×
1491
                        remote.tlsName = _EMPTY_
×
1492
                        remote.Unlock()
×
1493
                }
×
1494
                return false, err
34✔
1495
        }
1496
        return true, nil
41✔
1497
}
1498

1499
func (c *client) processLeafnodeInfo(info *Info) {
2,337✔
1500
        c.mu.Lock()
2,337✔
1501
        if c.leaf == nil || c.isClosed() {
2,338✔
1502
                c.mu.Unlock()
1✔
1503
                return
1✔
1504
        }
1✔
1505
        s := c.srv
2,336✔
1506
        opts := s.getOpts()
2,336✔
1507
        remote := c.leaf.remote
2,336✔
1508
        didSolicit := remote != nil
2,336✔
1509
        firstINFO := !c.flags.isSet(infoReceived)
2,336✔
1510

2,336✔
1511
        // In case of websocket, the TLS handshake has been already done.
2,336✔
1512
        // So check only for non websocket connections and for configurations
2,336✔
1513
        // where the TLS Handshake was not done first.
2,336✔
1514
        if didSolicit && !c.flags.isSet(handshakeComplete) && !c.isWebsocket() && !remote.TLSHandshakeFirst {
3,960✔
1515
                // If the server requires TLS, we need to set this in the remote
1,624✔
1516
                // otherwise if there is no TLS configuration block for the remote,
1,624✔
1517
                // the solicit side will not attempt to perform the TLS handshake.
1,624✔
1518
                if firstINFO && info.TLSRequired {
1,683✔
1519
                        // Check for TLS/proxy configuration mismatch
59✔
1520
                        if remote.Proxy.URL != _EMPTY_ && !remote.TLS && remote.TLSConfig == nil {
59✔
1521
                                c.mu.Unlock()
×
1522
                                c.Errorf("TLS configuration mismatch: Hub requires TLS but leafnode remote is not configured for TLS. When using a proxy, ensure TLS leafnode configuration matches the Hub requirements.")
×
1523
                                c.closeConnection(TLSHandshakeError)
×
1524
                                return
×
1525
                        }
×
1526
                        remote.TLS = true
59✔
1527
                }
1528
                if _, err := c.leafClientHandshakeIfNeeded(remote, opts); err != nil {
1,653✔
1529
                        c.mu.Unlock()
29✔
1530
                        return
29✔
1531
                }
29✔
1532
        }
1533

1534
        // Check for compression, unless already done.
1535
        if firstINFO && !c.flags.isSet(compressionNegotiated) {
3,467✔
1536
                // A solicited leafnode connection must first receive a leafnode INFO.
1,160✔
1537
                // Classify wrong-port connections before any leaf-specific negotiation.
1,160✔
1538
                if didSolicit && (info.CID == 0 || info.LeafNodeURLs == nil) {
1,222✔
1539
                        c.mu.Unlock()
62✔
1540
                        c.Errorf(ErrConnectedToWrongPort.Error())
62✔
1541
                        c.closeConnection(WrongPort)
62✔
1542
                        return
62✔
1543
                }
62✔
1544

1545
                // Prevent from getting back here.
1546
                c.flags.set(compressionNegotiated)
1,098✔
1547

1,098✔
1548
                var co *CompressionOpts
1,098✔
1549
                if !didSolicit {
1,594✔
1550
                        co = &opts.LeafNode.Compression
496✔
1551
                } else {
1,098✔
1552
                        co = &remote.Compression
602✔
1553
                }
602✔
1554
                if needsCompression(co.Mode) {
2,187✔
1555
                        // Release client lock since following function will need server lock.
1,089✔
1556
                        c.mu.Unlock()
1,089✔
1557
                        compress, err := s.negotiateLeafCompression(c, didSolicit, info.Compression, co)
1,089✔
1558
                        if err != nil {
1,089✔
1559
                                c.sendErrAndErr(err.Error())
×
1560
                                c.closeConnection(ProtocolViolation)
×
1561
                                return
×
1562
                        }
×
1563
                        if compress {
2,082✔
1564
                                // Done for now, will get back another INFO protocol...
993✔
1565
                                return
993✔
1566
                        }
993✔
1567
                        // No compression because one side does not want/can't, so proceed.
1568
                        c.mu.Lock()
96✔
1569
                        // Check that the connection did not close if the lock was released.
96✔
1570
                        if c.isClosed() {
96✔
1571
                                c.mu.Unlock()
×
1572
                                return
×
1573
                        }
×
1574
                } else {
9✔
1575
                        // Coming from an old server, the Compression field would be the empty
9✔
1576
                        // string. For servers that are configured with CompressionNotSupported,
9✔
1577
                        // this makes them behave as old servers.
9✔
1578
                        if info.Compression == _EMPTY_ || co.Mode == CompressionNotSupported {
11✔
1579
                                c.leaf.compression = CompressionNotSupported
2✔
1580
                        } else {
9✔
1581
                                c.leaf.compression = CompressionOff
7✔
1582
                        }
7✔
1583
                }
1584
                // Accepting side does not normally process an INFO protocol during
1585
                // initial connection handshake. So we keep it consistent by returning
1586
                // if we are not soliciting.
1587
                if !didSolicit {
106✔
1588
                        // If we had created the ping timer instead of the auth timer, we will
1✔
1589
                        // clear the ping timer and set the auth timer now that the compression
1✔
1590
                        // negotiation is done.
1✔
1591
                        if info.Compression != _EMPTY_ && c.ping.tmr != nil {
1✔
1592
                                clearTimer(&c.ping.tmr)
×
1593
                                c.setAuthTimer(secondsToDuration(opts.LeafNode.AuthTimeout))
×
1594
                        }
×
1595
                        c.mu.Unlock()
1✔
1596
                        return
1✔
1597
                }
1598
                // Fall through and process the INFO protocol as usual.
1599
        }
1600

1601
        // Note: For now, only the initial INFO has a nonce. We
1602
        // will probably do auto key rotation at some point.
1603
        if firstINFO {
1,890✔
1604
                // Mark that the INFO protocol has been received.
639✔
1605
                c.flags.set(infoReceived)
639✔
1606
                // Prevent connecting to non leafnode port. Need to do this only for
639✔
1607
                // the first INFO, not for async INFO updates...
639✔
1608
                //
639✔
1609
                // Content of INFO sent by the server when accepting a tcp connection.
639✔
1610
                // -------------------------------------------------------------------
639✔
1611
                // Listen Port Of | CID | ClientConnectURLs | LeafNodeURLs | Gateway |
639✔
1612
                // -------------------------------------------------------------------
639✔
1613
                //      CLIENT    |  X* |        X**        |              |         |
639✔
1614
                //      ROUTE     |     |        X**        |      X***    |         |
639✔
1615
                //     GATEWAY    |     |                   |              |    X    |
639✔
1616
                //     LEAFNODE   |  X  |                   |       X      |         |
639✔
1617
                // -------------------------------------------------------------------
639✔
1618
                // *   Not on older servers.
639✔
1619
                // **  Not if "no advertise" is enabled.
639✔
1620
                // *** Not if leafnode's "no advertise" is enabled.
639✔
1621
                //
639✔
1622
                // Reject a cluster that contains spaces.
639✔
1623
                if info.Cluster != _EMPTY_ && strings.Contains(info.Cluster, " ") {
640✔
1624
                        c.mu.Unlock()
1✔
1625
                        c.sendErrAndErr(ErrClusterNameHasSpaces.Error())
1✔
1626
                        c.closeConnection(ProtocolViolation)
1✔
1627
                        return
1✔
1628
                }
1✔
1629
                // For solicited outbound leaf connections, capture the remote's nonce.
1630
                // For inbound leaf connections, keep using the server-issued nonce that
1631
                // was sent in our initial INFO and must be signed in CONNECT.
1632
                if didSolicit {
1,235✔
1633
                        c.nonce = []byte(info.Nonce)
597✔
1634
                }
597✔
1635
                if info.TLSRequired && didSolicit {
668✔
1636
                        remote.TLS = true
30✔
1637
                }
30✔
1638
                supportsHeaders := c.srv.supportsHeaders()
638✔
1639
                c.headers = supportsHeaders && info.Headers
638✔
1640

638✔
1641
                // Remember the remote server.
638✔
1642
                // Pre 2.2.0 servers are not sending their server name.
638✔
1643
                // In that case, use info.ID, which, for those servers, matches
638✔
1644
                // the content of the field `Name` in the leafnode CONNECT protocol.
638✔
1645
                if info.Name == _EMPTY_ {
638✔
1646
                        c.leaf.remoteServer = info.ID
×
1647
                } else {
638✔
1648
                        c.leaf.remoteServer = info.Name
638✔
1649
                }
638✔
1650
                c.leaf.remoteDomain = info.Domain
638✔
1651
                c.leaf.remoteCluster = info.Cluster
638✔
1652
                // We send the protocol version in the INFO protocol.
638✔
1653
                // Keep track of it, so we know if this connection supports message
638✔
1654
                // tracing for instance.
638✔
1655
                c.opts.Protocol = info.Proto
638✔
1656
        }
1657

1658
        // For both initial INFO and async INFO protocols, Possibly
1659
        // update our list of remote leafnode URLs we can connect to,
1660
        // unless we are instructed not to.
1661
        if didSolicit && !remote.IgnoreDiscoveredServers &&
1,250✔
1662
                (len(info.LeafNodeURLs) > 0 || len(info.WSConnectURLs) > 0) {
2,416✔
1663
                // Consider the incoming array as the most up-to-date
1,166✔
1664
                // representation of the remote cluster's list of URLs.
1,166✔
1665
                c.updateLeafNodeURLs(info)
1,166✔
1666
        }
1,166✔
1667

1668
        // Only solicited leafnode connections trust permission updates from INFO.
1669
        if didSolicit && (info.Import != nil || info.Export != nil) {
1,259✔
1670
                perms := &Permissions{
9✔
1671
                        Publish:   info.Export,
9✔
1672
                        Subscribe: info.Import,
9✔
1673
                }
9✔
1674
                // Check if we have local deny clauses that we need to merge.
9✔
1675
                if remote := c.leaf.remote; remote != nil {
18✔
1676
                        if len(remote.DenyExports) > 0 {
10✔
1677
                                if perms.Publish == nil {
1✔
1678
                                        perms.Publish = &SubjectPermission{}
×
1679
                                }
×
1680
                                perms.Publish.Deny = append(perms.Publish.Deny, remote.DenyExports...)
1✔
1681
                        }
1682
                        if len(remote.DenyImports) > 0 {
10✔
1683
                                if perms.Subscribe == nil {
1✔
1684
                                        perms.Subscribe = &SubjectPermission{}
×
1685
                                }
×
1686
                                perms.Subscribe.Deny = append(perms.Subscribe.Deny, remote.DenyImports...)
1✔
1687
                        }
1688
                }
1689
                c.setPermissions(perms)
9✔
1690
        }
1691

1692
        var resumeConnect bool
1,250✔
1693

1,250✔
1694
        // If this is a remote connection and this is the first INFO protocol,
1,250✔
1695
        // then we need to finish the connect process by sending CONNECT, etc..
1,250✔
1696
        if firstINFO && didSolicit {
1,847✔
1697
                // Clear deadline that was set in createLeafNode while waiting for the INFO.
597✔
1698
                c.nc.SetDeadline(time.Time{})
597✔
1699
                resumeConnect = true
597✔
1700
        } else if !firstINFO && didSolicit {
1,820✔
1701
                c.leaf.remoteAccName = info.RemoteAccount
570✔
1702
        }
570✔
1703

1704
        // Check if we have the remote account information and if so make sure it's stored.
1705
        if info.RemoteAccount != _EMPTY_ {
1,809✔
1706
                if c.acc == nil {
559✔
1707
                        c.mu.Unlock()
×
1708
                        c.sendErr("Authorization Violation")
×
1709
                        c.closeConnection(ProtocolViolation)
×
1710
                        return
×
1711
                }
×
1712
                s.leafRemoteAccounts.Store(c.acc.Name, info.RemoteAccount)
559✔
1713
        }
1714
        c.mu.Unlock()
1,250✔
1715

1,250✔
1716
        finishConnect := info.ConnectInfo
1,250✔
1717
        if resumeConnect && s != nil {
1,847✔
1718
                s.leafNodeResumeConnectProcess(c)
597✔
1719
                if !info.InfoOnConnect {
597✔
1720
                        finishConnect = true
×
1721
                }
×
1722
        }
1723
        if finishConnect {
1,809✔
1724
                s.leafNodeFinishConnectProcess(c)
559✔
1725
        }
559✔
1726

1727
        // Check to see if we need to kick any internal source or mirror consumers.
1728
        // This will be a no-op if JetStream not enabled for this server or if the bound account
1729
        // does not have jetstream.
1730
        s.checkInternalSyncConsumers(c.acc)
1,250✔
1731
}
1732

1733
func (s *Server) negotiateLeafCompression(c *client, didSolicit bool, infoCompression string, co *CompressionOpts) (bool, error) {
1,089✔
1734
        // If WebSocket compression is already negotiated on this connection then
1,089✔
1735
        // we shouldn't layer S2 compression on top of it.
1,089✔
1736
        c.mu.Lock()
1,089✔
1737
        if c.ws != nil && c.ws.compress {
1,095✔
1738
                c.leaf.compression = CompressionOff
6✔
1739
                c.mu.Unlock()
6✔
1740
                return false, nil
6✔
1741
        }
6✔
1742
        c.mu.Unlock()
1,083✔
1743
        // Negotiate the appropriate compression mode (or no compression)
1,083✔
1744
        cm, err := selectCompressionMode(co.Mode, infoCompression)
1,083✔
1745
        if err != nil {
1,083✔
1746
                return false, err
×
1747
        }
×
1748
        c.mu.Lock()
1,083✔
1749
        // For "auto" mode, set the initial compression mode based on RTT
1,083✔
1750
        if cm == CompressionS2Auto {
2,040✔
1751
                if c.rttStart.IsZero() {
1,914✔
1752
                        c.rtt = computeRTT(c.start)
957✔
1753
                }
957✔
1754
                cm = selectS2AutoModeBasedOnRTT(c.rtt, co.RTTThresholds)
957✔
1755
        }
1756
        // Keep track of the negotiated compression mode.
1757
        c.leaf.compression = cm
1,083✔
1758
        cid := c.cid
1,083✔
1759
        var nonce string
1,083✔
1760
        if !didSolicit {
1,578✔
1761
                nonce = bytesToString(c.nonce)
495✔
1762
        }
495✔
1763
        c.mu.Unlock()
1,083✔
1764

1,083✔
1765
        if !needsCompression(cm) {
1,173✔
1766
                return false, nil
90✔
1767
        }
90✔
1768

1769
        // If we end-up doing compression...
1770

1771
        // Generate an INFO with the chosen compression mode.
1772
        s.mu.Lock()
993✔
1773
        info := s.copyLeafNodeInfo()
993✔
1774
        info.Compression, info.CID, info.Nonce = compressionModeForInfoProtocol(co, cm), cid, nonce
993✔
1775
        infoProto := generateInfoJSON(info)
993✔
1776
        s.mu.Unlock()
993✔
1777

993✔
1778
        // If we solicited, then send this INFO protocol BEFORE switching
993✔
1779
        // to compression writer. However, if we did not, we send it after.
993✔
1780
        c.mu.Lock()
993✔
1781
        if didSolicit {
1,491✔
1782
                c.enqueueProto(infoProto)
498✔
1783
                // Make sure it is completely flushed (the pending bytes goes to
498✔
1784
                // 0) before proceeding.
498✔
1785
                for c.out.pb > 0 && !c.isClosed() {
996✔
1786
                        c.flushOutbound()
498✔
1787
                }
498✔
1788
        }
1789
        // This is to notify the readLoop that it should switch to a
1790
        // (de)compression reader.
1791
        c.in.flags.set(switchToCompression)
993✔
1792
        // Create the compress writer before queueing the INFO protocol for
993✔
1793
        // a route that did not solicit. It will make sure that that proto
993✔
1794
        // is sent with compression on.
993✔
1795
        c.out.cw = s2.NewWriter(nil, s2WriterOptions(cm)...)
993✔
1796
        if !didSolicit {
1,488✔
1797
                c.enqueueProto(infoProto)
495✔
1798
        }
495✔
1799
        c.mu.Unlock()
993✔
1800
        return true, nil
993✔
1801
}
1802

1803
// When getting a leaf node INFO protocol, use the provided
1804
// array of urls to update the list of possible endpoints.
1805
func (c *client) updateLeafNodeURLs(info *Info) {
1,166✔
1806
        cfg := c.leaf.remote
1,166✔
1807
        cfg.Lock()
1,166✔
1808
        defer cfg.Unlock()
1,166✔
1809

1,166✔
1810
        // We have ensured that if a remote has a WS scheme, then all are.
1,166✔
1811
        // So check if first is WS, then add WS URLs, otherwise, add non WS ones.
1,166✔
1812
        if len(cfg.URLs) > 0 && isWSURL(cfg.URLs[0]) {
1,230✔
1813
                // It does not really matter if we use "ws://" or "wss://" here since
64✔
1814
                // we will have already marked that the remote should use TLS anyway.
64✔
1815
                // But use proper scheme for log statements, etc...
64✔
1816
                proto := wsSchemePrefix
64✔
1817
                if cfg.TLS {
64✔
1818
                        proto = wsSchemePrefixTLS
×
1819
                }
×
1820
                c.doUpdateLNURLs(cfg, proto, info.WSConnectURLs)
64✔
1821
                return
64✔
1822
        }
1823
        c.doUpdateLNURLs(cfg, "nats-leaf", info.LeafNodeURLs)
1,102✔
1824
}
1825

1826
func (c *client) doUpdateLNURLs(cfg *leafNodeCfg, scheme string, URLs []string) {
1,166✔
1827
        cfg.urls = make([]*url.URL, 0, 1+len(URLs))
1,166✔
1828
        // Add the ones we receive in the protocol
1,166✔
1829
        for _, surl := range URLs {
3,210✔
1830
                url, err := url.Parse(fmt.Sprintf("%s://%s", scheme, surl))
2,044✔
1831
                if err != nil {
2,044✔
1832
                        // As per below, the URLs we receive should not have contained URL info, so this should be safe to log.
×
1833
                        c.Errorf("Error parsing url %q: %v", surl, err)
×
1834
                        continue
×
1835
                }
1836
                // Do not add if it's the same as what we already have configured.
1837
                var dup bool
2,044✔
1838
                for _, u := range cfg.URLs {
5,175✔
1839
                        // URLs that we receive never have user info, but the
3,131✔
1840
                        // ones that were configured may have. Simply compare
3,131✔
1841
                        // host and port to decide if they are equal or not.
3,131✔
1842
                        if url.Host == u.Host && url.Port() == u.Port() {
4,640✔
1843
                                dup = true
1,509✔
1844
                                break
1,509✔
1845
                        }
1846
                }
1847
                if !dup {
2,579✔
1848
                        cfg.urls = append(cfg.urls, url)
535✔
1849
                        cfg.saveTLSHostname(url)
535✔
1850
                }
535✔
1851
        }
1852
        // Add the configured one
1853
        cfg.urls = append(cfg.urls, cfg.URLs...)
1,166✔
1854
}
1855

1856
// Similar to setInfoHostPortAndGenerateJSON, but for leafNodeInfo.
1857
func (s *Server) setLeafNodeInfoHostPortAndIP() error {
3,872✔
1858
        opts := s.getOpts()
3,872✔
1859
        if opts.LeafNode.Advertise != _EMPTY_ {
3,883✔
1860
                advHost, advPort, err := parseHostPort(opts.LeafNode.Advertise, opts.LeafNode.Port)
11✔
1861
                if err != nil {
11✔
1862
                        return err
×
1863
                }
×
1864
                s.leafNodeInfo.Host = advHost
11✔
1865
                s.leafNodeInfo.Port = advPort
11✔
1866
        } else {
3,861✔
1867
                s.leafNodeInfo.Host = opts.LeafNode.Host
3,861✔
1868
                s.leafNodeInfo.Port = opts.LeafNode.Port
3,861✔
1869
                // If the host is "0.0.0.0" or "::" we need to resolve to a public IP.
3,861✔
1870
                // This will return at most 1 IP.
3,861✔
1871
                hostIsIPAny, ips, err := s.getNonLocalIPsIfHostIsIPAny(s.leafNodeInfo.Host, false)
3,861✔
1872
                if err != nil {
3,861✔
1873
                        return err
×
1874
                }
×
1875
                if hostIsIPAny {
4,118✔
1876
                        if len(ips) == 0 {
257✔
1877
                                s.Errorf("Could not find any non-local IP for leafnode's listen specification %q",
×
1878
                                        s.leafNodeInfo.Host)
×
1879
                        } else {
257✔
1880
                                // Take the first from the list...
257✔
1881
                                s.leafNodeInfo.Host = ips[0]
257✔
1882
                        }
257✔
1883
                }
1884
        }
1885
        // Use just host:port for the IP
1886
        s.leafNodeInfo.IP = net.JoinHostPort(s.leafNodeInfo.Host, strconv.Itoa(s.leafNodeInfo.Port))
3,872✔
1887
        if opts.LeafNode.Advertise != _EMPTY_ {
3,883✔
1888
                s.Noticef("Advertise address for leafnode is set to %s", s.leafNodeInfo.IP)
11✔
1889
        }
11✔
1890
        return nil
3,872✔
1891
}
1892

1893
// Add the connection to the map of leaf nodes.
1894
// If `checkForDup` is true (invoked when a leafnode is accepted), then we check
1895
// if a connection already exists for the same server name and account.
1896
// That can happen when the remote is attempting to reconnect while the accepting
1897
// side did not detect the connection as broken yet.
1898
// But it can also happen when there is a misconfiguration and the remote is
1899
// creating two (or more) connections that bind to the same account on the accept
1900
// side.
1901
// When a duplicate is found, the new connection is accepted and the old is closed
1902
// (this solves the stale connection situation). An error is returned to help the
1903
// remote detect the misconfiguration when the duplicate is the result of that
1904
// misconfiguration.
1905
func (s *Server) addLeafNodeConnection(c *client, srvName, clusterName string, checkForDup bool) bool {
1,156✔
1906
        var accName string
1,156✔
1907
        c.mu.Lock()
1,156✔
1908
        cid := c.cid
1,156✔
1909
        acc := c.acc
1,156✔
1910
        if acc != nil {
2,312✔
1911
                accName = acc.Name
1,156✔
1912
        }
1,156✔
1913
        myRemoteDomain := c.leaf.remoteDomain
1,156✔
1914
        mySrvName := c.leaf.remoteServer
1,156✔
1915
        remoteAccName := c.leaf.remoteAccName
1,156✔
1916
        myClustName := c.leaf.remoteCluster
1,156✔
1917
        remote := c.leaf.remote
1,156✔
1918
        solicited := remote != nil
1,156✔
1919
        c.mu.Unlock()
1,156✔
1920

1,156✔
1921
        var old *client
1,156✔
1922
        s.mu.Lock()
1,156✔
1923
        // We check for empty because in some test we may send empty CONNECT{}
1,156✔
1924
        if checkForDup && srvName != _EMPTY_ {
1,716✔
1925
                for _, ol := range s.leafs {
906✔
1926
                        ol.mu.Lock()
346✔
1927
                        // We care here only about non solicited Leafnode. This function
346✔
1928
                        // is more about replacing stale connections than detecting loops.
346✔
1929
                        // We have code for the loop detection elsewhere, which also delays
346✔
1930
                        // attempt to reconnect.
346✔
1931
                        if !ol.isSolicitedLeafNode() && ol.leaf.remoteServer == srvName &&
346✔
1932
                                ol.leaf.remoteCluster == clusterName && ol.acc.Name == accName &&
346✔
1933
                                remoteAccName != _EMPTY_ && ol.leaf.remoteAccName == remoteAccName {
348✔
1934
                                old = ol
2✔
1935
                        }
2✔
1936
                        ol.mu.Unlock()
346✔
1937
                        if old != nil {
348✔
1938
                                break
2✔
1939
                        }
1940
                }
1941
        }
1942
        // Now that we are under the server lock and before adding it to the map,
1943
        // for a solicited leaf, we need to make sure that it has not been removed
1944
        // from the config or disabled.
1945
        if solicited {
1,713✔
1946
                // If no longer valid, do not add to the server map. The connection
557✔
1947
                // should have been marked so that it can't reconnect. When the caller
557✔
1948
                // calls closeConnection(), cleanup (including clearing the connect-
557✔
1949
                // in-progress flag) will occur at the appropriate time.
557✔
1950
                if !remote.stillValid() {
557✔
1951
                        // Prevent reconnect in case it was not yet done.
×
1952
                        c.setNoReconnect()
×
1953
                        s.mu.Unlock()
×
1954
                        s.removeFromTempClients(cid)
×
1955
                        return false
×
1956
                }
×
1957
                remote.setConnectInProgress(false)
557✔
1958
        }
1959
        // Store new connection in the map
1960
        s.leafs[cid] = c
1,156✔
1961
        s.mu.Unlock()
1,156✔
1962
        s.removeFromTempClients(cid)
1,156✔
1963

1,156✔
1964
        // If applicable, evict the old one.
1,156✔
1965
        if old != nil {
1,158✔
1966
                old.sendErrAndErr(DuplicateRemoteLeafnodeConnection.String())
2✔
1967
                old.closeConnection(DuplicateRemoteLeafnodeConnection)
2✔
1968
                c.Warnf("Replacing connection from same server")
2✔
1969
        }
2✔
1970

1971
        srvDecorated := func() string {
1,345✔
1972
                if myClustName == _EMPTY_ {
211✔
1973
                        return mySrvName
22✔
1974
                }
22✔
1975
                return fmt.Sprintf("%s/%s", mySrvName, myClustName)
167✔
1976
        }
1977

1978
        opts := s.getOpts()
1,156✔
1979
        sysAcc := s.SystemAccount()
1,156✔
1980
        js := s.getJetStream()
1,156✔
1981
        var meta *raft
1,156✔
1982
        if js != nil {
1,648✔
1983
                if mg := js.getMetaGroup(); mg != nil {
868✔
1984
                        meta = mg.(*raft)
376✔
1985
                }
376✔
1986
        }
1987
        blockMappingOutgoing := false
1,156✔
1988
        // Deny (non domain) JetStream API traffic unless system account is shared
1,156✔
1989
        // and domain names are identical and extending is not disabled
1,156✔
1990

1,156✔
1991
        // Check if backwards compatibility has been enabled and needs to be acted on
1,156✔
1992
        forceSysAccDeny := false
1,156✔
1993
        if len(opts.JsAccDefaultDomain) > 0 {
1,189✔
1994
                if acc == sysAcc {
44✔
1995
                        for _, d := range opts.JsAccDefaultDomain {
22✔
1996
                                if d == _EMPTY_ {
19✔
1997
                                        // Extending JetStream via leaf node is mutually exclusive with a domain mapping to the empty/default domain.
8✔
1998
                                        // As soon as one mapping to "" is found, disable the ability to extend JS via a leaf node.
8✔
1999
                                        c.Noticef("Not extending remote JetStream domain %q due to presence of empty default domain", myRemoteDomain)
8✔
2000
                                        forceSysAccDeny = true
8✔
2001
                                        break
8✔
2002
                                }
2003
                        }
2004
                } else if domain, ok := opts.JsAccDefaultDomain[accName]; ok && domain == _EMPTY_ {
35✔
2005
                        // for backwards compatibility with old setups that do not have a domain name set
13✔
2006
                        c.Debugf("Skipping deny %q for account %q due to default domain", jsAllAPI, accName)
13✔
2007
                        return true
13✔
2008
                }
13✔
2009
        }
2010

2011
        // If the server has JS disabled, it may still be part of a JetStream that could be extended.
2012
        // This is either signaled by js being disabled and a domain set,
2013
        // or in cases where no domain name exists, an extension hint is set.
2014
        // However, this is only relevant in mixed setups.
2015
        //
2016
        // If the system account connects but default domains are present, JetStream can't be extended.
2017
        if opts.JetStreamDomain != myRemoteDomain || (!opts.JetStream && (opts.JetStreamDomain == _EMPTY_ && opts.JetStreamExtHint != jsWillExtend)) ||
1,143✔
2018
                sysAcc == nil || acc == nil || forceSysAccDeny {
2,148✔
2019
                // If domain names mismatch always deny. This applies to system accounts as well as non system accounts.
1,005✔
2020
                // Not having a system account, account or JetStream disabled is considered a mismatch as well.
1,005✔
2021
                if acc != nil && acc == sysAcc {
1,128✔
2022
                        c.Noticef("System account connected from %s", srvDecorated())
123✔
2023
                        c.Noticef("JetStream not extended, domains differ")
123✔
2024
                        c.mergeDenyPermissionsLocked(both, denyAllJs)
123✔
2025
                        // When a remote with a system account is present in a server, unless otherwise disabled, the server will be
123✔
2026
                        // started in observer mode. Now that it is clear that this not used, turn the observer mode off.
123✔
2027
                        if solicited && meta != nil && meta.IsObserver() {
145✔
2028
                                meta.setObserver(false, extNotExtended)
22✔
2029
                                c.Debugf("Turning JetStream metadata controller Observer Mode off")
22✔
2030
                                // Take note that the domain was not extended to avoid this state from startup.
22✔
2031
                                writePeerState(js.config.StoreDir, meta.currentPeerState())
22✔
2032
                                // Meta controller can't be leader yet.
22✔
2033
                                // Yet it is possible that due to observer mode every server already stopped campaigning.
22✔
2034
                                // Therefore this server needs to be kicked into campaigning gear explicitly.
22✔
2035
                                meta.Campaign()
22✔
2036
                        }
22✔
2037
                } else {
882✔
2038
                        c.Noticef("JetStream using domains: local %q, remote %q", opts.JetStreamDomain, myRemoteDomain)
882✔
2039
                        c.mergeDenyPermissionsLocked(both, denyAllClientJs)
882✔
2040
                }
882✔
2041
                blockMappingOutgoing = true
1,005✔
2042
        } else if acc == sysAcc {
204✔
2043
                // system account and same domain
66✔
2044
                s.sys.client.Noticef("Extending JetStream domain %q as System Account connected from server %s",
66✔
2045
                        myRemoteDomain, srvDecorated())
66✔
2046
                // In an extension use case, pin leadership to server remotes connect to.
66✔
2047
                // Therefore, server with a remote that are not already in observer mode, need to be put into it.
66✔
2048
                if solicited && meta != nil && !meta.IsObserver() {
70✔
2049
                        c.Debugf("Turning JetStream metadata controller Observer Mode on - System Account Connected")
4✔
2050
                        // Discard any local metagroup state accumulated before the SYS-account
4✔
2051
                        // leaf came up (e.g. the wrong-hint case where this server bootstrapped
4✔
2052
                        // its own metagroup). The parent's view is now authoritative; without
4✔
2053
                        // this reset the two raft logs stay forked because the standalone log's
4✔
2054
                        // commit prefix short-circuits the follower's AE handling.
4✔
2055
                        meta.setObserver(true, extExtended)
4✔
2056
                        meta.Reset()
4✔
2057
                }
4✔
2058
        } else {
72✔
2059
                // This deny is needed in all cases (system account shared or not)
72✔
2060
                // If the system account is shared, jsAllAPI traffic will go through the system account.
72✔
2061
                // So in order to prevent duplicate delivery (from system and actual account) suppress it on the account.
72✔
2062
                // If the system account is NOT shared, jsAllAPI traffic has no business
72✔
2063
                c.Debugf("Adding deny %+v for account %q", denyAllClientJs, accName)
72✔
2064
                c.mergeDenyPermissionsLocked(both, denyAllClientJs)
72✔
2065
        }
72✔
2066
        // If we have a specified JetStream domain we will want to add a mapping to
2067
        // allow access cross domain for each non-system account.
2068
        if opts.JetStreamDomain != _EMPTY_ && opts.JetStream && acc != nil && acc != sysAcc {
1,384✔
2069
                for src, dest := range generateJSMappingTable(opts.JetStreamDomain) {
2,410✔
2070
                        if err := acc.AddMapping(src, dest); err != nil {
2,169✔
2071
                                c.Debugf("Error adding JetStream domain mapping: %s", err.Error())
×
2072
                        } else {
2,169✔
2073
                                c.Debugf("Adding JetStream Domain Mapping %q -> %s to account %q", src, dest, accName)
2,169✔
2074
                        }
2,169✔
2075
                }
2076
                if blockMappingOutgoing {
451✔
2077
                        src := fmt.Sprintf(jsDomainAPI, opts.JetStreamDomain)
210✔
2078
                        // make sure that messages intended for this domain, do not leave the cluster via this leaf node connection
210✔
2079
                        // This is a guard against a miss-config with two identical domain names and will only cover some forms
210✔
2080
                        // of this issue, not all of them.
210✔
2081
                        // This guards against a hub and a spoke having the same domain name.
210✔
2082
                        // But not two spokes having the same one and the request coming from the hub.
210✔
2083
                        c.mergeDenyPermissionsLocked(pub, []string{src})
210✔
2084
                        c.Debugf("Adding deny %q for outgoing messages to account %q", src, accName)
210✔
2085
                }
210✔
2086
        }
2087
        return true
1,143✔
2088
}
2089

2090
func (s *Server) removeLeafNodeConnection(c *client) {
1,519✔
2091
        s.mu.Lock()
1,519✔
2092
        c.mu.Lock()
1,519✔
2093
        cid := c.cid
1,519✔
2094
        if c.leaf != nil {
3,037✔
2095
                if c.leaf.tsubt != nil {
2,576✔
2096
                        c.leaf.tsubt.Stop()
1,058✔
2097
                        c.leaf.tsubt = nil
1,058✔
2098
                }
1,058✔
2099
                if c.leaf.gwSub != nil {
2,075✔
2100
                        s.gwLeafSubs.Remove(c.leaf.gwSub)
557✔
2101
                        // We need to set this to nil for GC to release the connection
557✔
2102
                        c.leaf.gwSub = nil
557✔
2103
                }
557✔
2104
                if remote := c.leaf.remote; remote != nil {
2,246✔
2105
                        // If "noReconnect" is true, then we won't attempt to reconnect, so
728✔
2106
                        // we will clear the "connect-in-progress" flag. However, if we can
728✔
2107
                        // reconnect, then we should set "connect-in-progress" to true while
728✔
2108
                        // we are under the server/client lock. The go routine that performs
728✔
2109
                        // the reconnect will be started later and there would be a gap with
728✔
2110
                        // the wrong flag value otherwise.
728✔
2111
                        remote.setConnectInProgress(!c.flags.isSet(noReconnect))
728✔
2112
                }
728✔
2113
        }
2114
        proxyKey := c.proxyKey
1,519✔
2115
        c.mu.Unlock()
1,519✔
2116
        delete(s.leafs, cid)
1,519✔
2117
        if proxyKey != _EMPTY_ {
1,523✔
2118
                s.removeProxiedConn(proxyKey, cid)
4✔
2119
        }
4✔
2120
        s.mu.Unlock()
1,519✔
2121
        s.removeFromTempClients(cid)
1,519✔
2122
}
2123

2124
// leafConnectInfo is the JSON payload of the CONNECT protocol sent by a
// soliciting leafnode and decoded by the accepting side.
// All fields are omitted from the JSON when they hold their zero value.
type leafConnectInfo struct {
	// Version is the version string reported by the soliciting side.
	Version string `json:"version,omitempty"`
	// Credentials: nkey/JWT/signature, user/password or token.
	Nkey  string `json:"nkey,omitempty"`
	JWT   string `json:"jwt,omitempty"`
	Sig   string `json:"sig,omitempty"`
	User  string `json:"user,omitempty"`
	Pass  string `json:"pass,omitempty"`
	Token string `json:"auth_token,omitempty"`
	// Identity of the soliciting side.
	ID      string `json:"server_id,omitempty"`
	Domain  string `json:"domain,omitempty"`
	Name    string `json:"name,omitempty"`
	Hub     bool   `json:"is_hub,omitempty"`
	Cluster string `json:"cluster,omitempty"`
	// Capabilities and options requested by the soliciting side.
	Headers   bool     `json:"headers,omitempty"`
	JetStream bool     `json:"jetstream,omitempty"`
	DenyPub   []string `json:"deny_pub,omitempty"`
	Isolate   bool     `json:"isolate,omitempty"`

	// Compression mode. A never-used boolean field:
	// >> Comp bool `json:"compression,omitempty"`
	// existed previously. Now that compression is supported a string is
	// needed, hence the different json tag.
	Compression string `json:"compress_mode,omitempty"`

	// Only there to detect wrong connection attempts.
	Gateway string `json:"gateway,omitempty"`

	// Lets the accept side know which account the remote is binding to.
	RemoteAccount string `json:"remote_account,omitempty"`

	// Unlike ROUTER and GATEWAY, the accept side of a LEAF connection gets
	// only the CONNECT protocol and no INFO, so the protocol version has to
	// travel in the CONNECT. It indicates whether the connection supports
	// some features, such as message tracing.
	// With `protocol` as the JSON tag, this is automatically unmarshal'ed
	// by the low level CONNECT processing.
	Proto int `json:"protocol,omitempty"`
}
2163

2164
// processLeafNodeConnect will process the inbound connect args.
2165
// Once we are here we are bound to an account, so can send any interest that
2166
// we would have to the other side.
2167
func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) error {
604✔
2168
        // Way to detect clients that incorrectly connect to the route listen
604✔
2169
        // port. Client provided "lang" in the CONNECT protocol while LEAFNODEs don't.
604✔
2170
        if lang != _EMPTY_ {
604✔
2171
                c.sendErrAndErr(ErrClientConnectedToLeafNodePort.Error())
×
2172
                c.closeConnection(WrongPort)
×
2173
                return ErrClientConnectedToLeafNodePort
×
2174
        }
×
2175

2176
        // Unmarshal as a leaf node connect protocol
2177
        proto := &leafConnectInfo{}
604✔
2178
        if err := json.Unmarshal(arg, proto); err != nil {
604✔
2179
                return err
×
2180
        }
×
2181

2182
        // Reject a cluster that contains spaces.
2183
        if proto.Cluster != _EMPTY_ && strings.Contains(proto.Cluster, " ") {
605✔
2184
                c.sendErrAndErr(ErrClusterNameHasSpaces.Error())
1✔
2185
                c.closeConnection(ProtocolViolation)
1✔
2186
                return ErrClusterNameHasSpaces
1✔
2187
        }
1✔
2188

2189
        // Check for cluster name collisions.
2190
        if cn := s.cachedClusterName(); cn != _EMPTY_ && proto.Cluster != _EMPTY_ && proto.Cluster == cn {
606✔
2191
                c.sendErrAndErr(ErrLeafNodeHasSameClusterName.Error())
3✔
2192
                c.closeConnection(ClusterNamesIdentical)
3✔
2193
                return ErrLeafNodeHasSameClusterName
3✔
2194
        }
3✔
2195

2196
        // Reject if this has Gateway which means that it would be from a gateway
2197
        // connection that incorrectly connects to the leafnode port.
2198
        if proto.Gateway != _EMPTY_ {
600✔
2199
                errTxt := fmt.Sprintf("Rejecting connection from gateway %q on the leafnode port", proto.Gateway)
×
2200
                c.Errorf(errTxt)
×
2201
                c.sendErr(errTxt)
×
2202
                c.closeConnection(WrongGateway)
×
2203
                return ErrWrongGateway
×
2204
        }
×
2205

2206
        if mv := s.getOpts().LeafNode.MinVersion; mv != _EMPTY_ {
602✔
2207
                major, minor, update, _ := versionComponents(mv)
2✔
2208
                if !versionAtLeast(proto.Version, major, minor, update) {
3✔
2209
                        // Send back an INFO so recent remote servers process the rejection
1✔
2210
                        // cleanly, then close immediately. The soliciting side applies the
1✔
2211
                        // reconnect delay when it processes the error.
1✔
2212
                        s.sendPermsAndAccountInfo(c)
1✔
2213
                        c.sendErrAndErr(fmt.Sprintf("%s %q", ErrLeafNodeMinVersionRejected, mv))
1✔
2214
                        c.closeConnection(MinimumVersionRequired)
1✔
2215
                        return ErrMinimumVersionRequired
1✔
2216
                }
1✔
2217
        }
2218

2219
        // Check if this server supports headers.
2220
        supportHeaders := c.srv.supportsHeaders()
599✔
2221

599✔
2222
        c.mu.Lock()
599✔
2223
        // Leaf Nodes do not do echo or verbose or pedantic.
599✔
2224
        c.opts.Verbose = false
599✔
2225
        c.opts.Echo = false
599✔
2226
        c.opts.Pedantic = false
599✔
2227
        // This inbound connection will be marked as supporting headers if this server
599✔
2228
        // support headers and the remote has sent in the CONNECT protocol that it does
599✔
2229
        // support headers too.
599✔
2230
        c.headers = supportHeaders && proto.Headers
599✔
2231
        // If the compression level is still not set, set it based on what has been
599✔
2232
        // given to us in the CONNECT protocol.
599✔
2233
        if c.leaf.compression == _EMPTY_ {
734✔
2234
                // But if proto.Compression is _EMPTY_, set it to CompressionNotSupported
135✔
2235
                if proto.Compression == _EMPTY_ {
174✔
2236
                        c.leaf.compression = CompressionNotSupported
39✔
2237
                } else {
135✔
2238
                        c.leaf.compression = proto.Compression
96✔
2239
                }
96✔
2240
        }
2241

2242
        // Remember the remote server.
2243
        c.leaf.remoteServer = proto.Name
599✔
2244
        // Remember the remote account name
599✔
2245
        c.leaf.remoteAccName = proto.RemoteAccount
599✔
2246
        // Remember if the leafnode requested isolation.
599✔
2247
        c.leaf.isolated = c.leaf.isolated || proto.Isolate
599✔
2248

599✔
2249
        // If the other side has declared itself a hub, so we will take on the spoke role.
599✔
2250
        if proto.Hub {
611✔
2251
                c.leaf.isSpoke = true
12✔
2252
        }
12✔
2253

2254
        // The soliciting side is part of a cluster.
2255
        if proto.Cluster != _EMPTY_ {
1,052✔
2256
                c.leaf.remoteCluster = proto.Cluster
453✔
2257
        }
453✔
2258

2259
        c.leaf.remoteDomain = proto.Domain
599✔
2260

599✔
2261
        // When a leaf solicits a connection to a hub, the perms that it will use on the soliciting leafnode's
599✔
2262
        // behalf are correct for them, but inside the hub need to be reversed since data is flowing in the opposite direction.
599✔
2263
        if !c.isSolicitedLeafNode() && c.perms != nil {
608✔
2264
                sp, pp := c.perms.sub, c.perms.pub
9✔
2265
                c.perms.sub, c.perms.pub = pp, sp
9✔
2266
                if c.opts.Import != nil {
17✔
2267
                        c.darray = c.opts.Import.Deny
8✔
2268
                } else {
9✔
2269
                        c.darray = nil
1✔
2270
                }
1✔
2271
        }
2272

2273
        // Set the Ping timer
2274
        c.setFirstPingTimer()
599✔
2275

599✔
2276
        // If we received pub deny permissions from the other end, merge with existing ones.
599✔
2277
        c.mergeDenyPermissions(pub, proto.DenyPub)
599✔
2278

599✔
2279
        acc := c.acc
599✔
2280
        c.mu.Unlock()
599✔
2281

599✔
2282
        // If the account is not set (e.g. connection was closed due to auth
599✔
2283
        // timeout while still being processed), bail out to avoid a panic.
599✔
2284
        if acc == nil {
599✔
2285
                c.closeConnection(MissingAccount)
×
2286
                return ErrMissingAccount
×
2287
        }
×
2288

2289
        // Register the cluster, even if empty, as long as we are acting as a hub.
2290
        if !proto.Hub {
1,186✔
2291
                acc.registerLeafNodeCluster(proto.Cluster)
587✔
2292
        }
587✔
2293

2294
        // Add in the leafnode here since we passed through auth at this point.
2295
        s.addLeafNodeConnection(c, proto.Name, proto.Cluster, true)
599✔
2296

599✔
2297
        // If we have permissions bound to this leafnode we need to send then back to the
599✔
2298
        // origin server for local enforcement.
599✔
2299
        s.sendPermsAndAccountInfo(c)
599✔
2300

599✔
2301
        // Create and initialize the smap since we know our bound account now.
599✔
2302
        // This will send all registered subs too.
599✔
2303
        s.initLeafNodeSmapAndSendSubs(c)
599✔
2304

599✔
2305
        // Announce the account connect event for a leaf node.
599✔
2306
        // This will be a no-op as needed.
599✔
2307
        s.sendLeafNodeConnect(c.acc)
599✔
2308

599✔
2309
        // Check to see if we need to kick any internal source or mirror consumers.
599✔
2310
        // This will be a no-op if JetStream not enabled for this server or if the bound account
599✔
2311
        // does not have jetstream.
599✔
2312
        s.checkInternalSyncConsumers(acc)
599✔
2313

599✔
2314
        return nil
599✔
2315
}
2316

2317
// checkInternalSyncConsumers
2318
func (s *Server) checkInternalSyncConsumers(acc *Account) {
1,849✔
2319
        // Grab our js
1,849✔
2320
        js := s.getJetStream()
1,849✔
2321

1,849✔
2322
        // Only applicable if we have JS and the leafnode has JS as well.
1,849✔
2323
        // We check for remote JS outside.
1,849✔
2324
        if !js.isEnabled() || acc == nil {
2,873✔
2325
                return
1,024✔
2326
        }
1,024✔
2327

2328
        // We will check all streams in our local account. They must be a leader and
2329
        // be sourcing or mirroring. We will check the external config on the stream itself
2330
        // if this is cross domain, or if the remote domain is empty, meaning we might be
2331
        // extending the system across this leafnode connection and hence we would be extending
2332
        // our own domain.
2333
        jsa := js.lookupAccount(acc)
825✔
2334
        if jsa == nil {
1,138✔
2335
                return
313✔
2336
        }
313✔
2337

2338
        var streams []*stream
512✔
2339
        jsa.mu.RLock()
512✔
2340
        for _, mset := range jsa.streams {
570✔
2341
                mset.cfgMu.RLock()
58✔
2342
                // We need to have a mirror or source defined.
58✔
2343
                // We do not want to force another lock here to look for leader status,
58✔
2344
                // so collect and after we release jsa will make sure.
58✔
2345
                if mset.cfg.Mirror != nil || len(mset.cfg.Sources) > 0 {
71✔
2346
                        streams = append(streams, mset)
13✔
2347
                }
13✔
2348
                mset.cfgMu.RUnlock()
58✔
2349
        }
2350
        jsa.mu.RUnlock()
512✔
2351

512✔
2352
        // Now loop through all candidates and check if we are the leader and have NOT
512✔
2353
        // created the sync up consumer.
512✔
2354
        for _, mset := range streams {
525✔
2355
                mset.retryDisconnectedSyncConsumers()
13✔
2356
        }
13✔
2357
}
2358

2359
// Returns the remote cluster name. This is set only once so does not require a lock.
2360
func (c *client) remoteCluster() string {
140,853✔
2361
        if c.leaf == nil {
140,853✔
2362
                return _EMPTY_
×
2363
        }
×
2364
        return c.leaf.remoteCluster
140,853✔
2365
}
2366

2367
// Sends back an info block to the soliciting leafnode to let it know about
2368
// its permission settings for local enforcement.
2369
func (s *Server) sendPermsAndAccountInfo(c *client) {
600✔
2370
        // Copy
600✔
2371
        s.mu.Lock()
600✔
2372
        info := s.copyLeafNodeInfo()
600✔
2373
        s.mu.Unlock()
600✔
2374
        c.mu.Lock()
600✔
2375
        info.CID = c.cid
600✔
2376
        info.Import = c.opts.Import
600✔
2377
        info.Export = c.opts.Export
600✔
2378
        info.RemoteAccount = c.acc.Name
600✔
2379
        // s.SystemAccount() uses an atomic operation and does not get the server lock, so this is safe.
600✔
2380
        info.IsSystemAccount = c.acc == s.SystemAccount()
600✔
2381
        info.ConnectInfo = true
600✔
2382
        c.enqueueProto(generateInfoJSON(info))
600✔
2383
        c.mu.Unlock()
600✔
2384
}
600✔
2385

2386
// Snapshot the current subscriptions from the sublist into our smap which
2387
// we will keep updated from now on.
2388
// Also send the registered subscriptions.
2389
func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
1,156✔
2390
        acc := c.acc
1,156✔
2391
        if acc == nil {
1,156✔
2392
                c.Debugf("Leafnode does not have an account bound")
×
2393
                return
×
2394
        }
×
2395
        // Collect all account subs here.
2396
        _subs := [1024]*subscription{}
1,156✔
2397
        subs := _subs[:0]
1,156✔
2398
        ims := []string{}
1,156✔
2399

1,156✔
2400
        // Hold the client lock otherwise there can be a race and miss some subs.
1,156✔
2401
        c.mu.Lock()
1,156✔
2402
        defer c.mu.Unlock()
1,156✔
2403

1,156✔
2404
        acc.mu.RLock()
1,156✔
2405
        accName := acc.Name
1,156✔
2406
        accNTag := acc.nameTag
1,156✔
2407

1,156✔
2408
        // To make printing look better when no friendly name present.
1,156✔
2409
        if accNTag != _EMPTY_ {
1,159✔
2410
                accNTag = "/" + accNTag
3✔
2411
        }
3✔
2412

2413
        // If we are solicited we only send interest for local clients.
2414
        if c.isSpokeLeafNode() {
1,713✔
2415
                acc.sl.localSubs(&subs, true)
557✔
2416
        } else {
1,156✔
2417
                acc.sl.All(&subs)
599✔
2418
        }
599✔
2419

2420
        // Check if we have an existing service import reply.
2421
        siReply := copyBytes(acc.siReply)
1,156✔
2422

1,156✔
2423
        // Since leaf nodes only send on interest, if the bound
1,156✔
2424
        // account has import services we need to send those over.
1,156✔
2425
        for isubj := range acc.imports.services {
5,453✔
2426
                if c.isSpokeLeafNode() && !c.canSubscribe(isubj) {
4,561✔
2427
                        c.Debugf("Not permitted to import service %q on behalf of %s%s", isubj, accName, accNTag)
264✔
2428
                        continue
264✔
2429
                }
2430
                ims = append(ims, isubj)
4,033✔
2431
        }
2432
        // Likewise for mappings.
2433
        for _, m := range acc.mappings {
3,417✔
2434
                if c.isSpokeLeafNode() && !c.canSubscribe(m.src) {
2,279✔
2435
                        c.Debugf("Not permitted to import mapping %q on behalf of %s%s", m.src, accName, accNTag)
18✔
2436
                        continue
18✔
2437
                }
2438
                ims = append(ims, m.src)
2,243✔
2439
        }
2440

2441
        // Create a unique subject that will be used for loop detection.
2442
        lds := acc.lds
1,156✔
2443
        acc.mu.RUnlock()
1,156✔
2444

1,156✔
2445
        // Check if we have to create the LDS.
1,156✔
2446
        if lds == _EMPTY_ {
2,058✔
2447
                lds = leafNodeLoopDetectionSubjectPrefix + nuid.Next()
902✔
2448
                acc.mu.Lock()
902✔
2449
                acc.lds = lds
902✔
2450
                acc.mu.Unlock()
902✔
2451
        }
902✔
2452

2453
        // Now check for gateway interest. Leafnodes will put this into
2454
        // the proper mode to propagate, but they are not held in the account.
2455
        gwsa := [16]*client{}
1,156✔
2456
        gws := gwsa[:0]
1,156✔
2457
        s.getOutboundGatewayConnections(&gws)
1,156✔
2458
        for _, cgw := range gws {
1,230✔
2459
                cgw.mu.Lock()
74✔
2460
                gw := cgw.gw
74✔
2461
                cgw.mu.Unlock()
74✔
2462
                if gw != nil {
148✔
2463
                        if ei, _ := gw.outsim.Load(accName); ei != nil {
148✔
2464
                                if e := ei.(*outsie); e != nil && e.sl != nil {
148✔
2465
                                        e.sl.All(&subs)
74✔
2466
                                }
74✔
2467
                        }
2468
                }
2469
        }
2470

2471
        applyGlobalRouting := s.gateway.enabled
1,156✔
2472
        if c.isSpokeLeafNode() {
1,713✔
2473
                // Add a fake subscription for this solicited leafnode connection
557✔
2474
                // so that we can send back directly for mapped GW replies.
557✔
2475
                // We need to keep track of this subscription so it can be removed
557✔
2476
                // when the connection is closed so that the GC can release it.
557✔
2477
                c.leaf.gwSub = &subscription{client: c, subject: []byte(gwReplyPrefix + ">")}
557✔
2478
                c.srv.gwLeafSubs.Insert(c.leaf.gwSub)
557✔
2479
        }
557✔
2480

2481
        // Now walk the results and add them to our smap
2482
        rc := c.leaf.remoteCluster
1,156✔
2483
        c.leaf.smap = make(map[string]int32)
1,156✔
2484
        for _, sub := range subs {
35,110✔
2485
                // Check perms regardless of role.
33,954✔
2486
                if c.perms != nil && !c.canSubscribe(string(sub.subject)) {
36,075✔
2487
                        c.Debugf("Not permitted to subscribe to %q on behalf of %s%s", sub.subject, accName, accNTag)
2,121✔
2488
                        continue
2,121✔
2489
                }
2490
                // Don't advertise interest from leafnodes to other isolated leafnodes.
2491
                if sub.client.kind == LEAF && c.isIsolatedLeafNode() {
31,833✔
2492
                        continue
×
2493
                }
2494
                // We ignore ourselves here.
2495
                // Also don't add the subscription if it has a origin cluster and the
2496
                // cluster name matches the one of the client we are sending to.
2497
                if c != sub.client && (sub.origin == nil || (bytesToString(sub.origin) != rc)) {
58,753✔
2498
                        count := int32(1)
26,920✔
2499
                        if len(sub.queue) > 0 && sub.qw > 0 {
26,930✔
2500
                                count = sub.qw
10✔
2501
                        }
10✔
2502
                        c.leaf.smap[keyFromSub(sub)] += count
26,920✔
2503
                        if c.leaf.tsub == nil {
28,006✔
2504
                                c.leaf.tsub = make(map[*subscription]struct{})
1,086✔
2505
                        }
1,086✔
2506
                        c.leaf.tsub[sub] = struct{}{}
26,920✔
2507
                }
2508
        }
2509
        // FIXME(dlc) - We need to update appropriately on an account claims update.
2510
        for _, isubj := range ims {
7,432✔
2511
                c.leaf.smap[isubj]++
6,276✔
2512
        }
6,276✔
2513
        // If we have gateways enabled we need to make sure the other side sends us responses
2514
        // that have been augmented from the original subscription.
2515
        // TODO(dlc) - Should we lock this down more?
2516
        if applyGlobalRouting {
1,243✔
2517
                c.leaf.smap[oldGWReplyPrefix+"*.>"]++
87✔
2518
                c.leaf.smap[gwReplyPrefix+">"]++
87✔
2519
        }
87✔
2520
        // Detect loops by subscribing to a specific subject and checking
2521
        // if this sub is coming back to us.
2522
        c.leaf.smap[lds]++
1,156✔
2523

1,156✔
2524
        // Check if we need to add an existing siReply to our map.
1,156✔
2525
        // This will be a prefix so add on the wildcard.
1,156✔
2526
        if siReply != nil {
1,172✔
2527
                wcsub := append(siReply, '>')
16✔
2528
                c.leaf.smap[string(wcsub)]++
16✔
2529
        }
16✔
2530
        // Queue all protocols. There is no max pending limit for LN connection,
2531
        // so we don't need chunking. The writes will happen from the writeLoop.
2532
        var b bytes.Buffer
1,156✔
2533
        for key, n := range c.leaf.smap {
25,119✔
2534
                c.writeLeafSub(&b, key, n)
23,963✔
2535
        }
23,963✔
2536
        if b.Len() > 0 {
2,312✔
2537
                c.enqueueProto(b.Bytes())
1,156✔
2538
        }
1,156✔
2539
        if c.leaf.tsub != nil {
2,243✔
2540
                // Clear the tsub map after 5 seconds.
1,087✔
2541
                c.leaf.tsubt = time.AfterFunc(5*time.Second, func() {
1,116✔
2542
                        c.mu.Lock()
29✔
2543
                        if c.leaf != nil {
58✔
2544
                                c.leaf.tsub = nil
29✔
2545
                                c.leaf.tsubt = nil
29✔
2546
                        }
29✔
2547
                        c.mu.Unlock()
29✔
2548
                })
2549
        }
2550
}
2551

2552
// updateInterestForAccountOnGateway called from gateway code when processing RS+ and RS-.
2553
func (s *Server) updateInterestForAccountOnGateway(accName string, sub *subscription, delta int32) {
196,843✔
2554
        // Since we're in the gateway's readLoop, and we would otherwise block, don't allow fetching.
196,843✔
2555
        acc, err := s.lookupOrFetchAccount(accName, false)
196,843✔
2556
        if acc == nil || err != nil {
197,171✔
2557
                s.Debugf("No or bad account for %q, failed to update interest from gateway", accName)
328✔
2558
                return
328✔
2559
        }
328✔
2560
        acc.updateLeafNodes(sub, delta)
196,515✔
2561
}
2562

2563
// updateLeafNodesEx will make sure to update the account smap for the subscription.
2564
// Will also forward to all leaf nodes as needed.
2565
// If `hubOnly` is true, then will update only leaf nodes that connect to this server
2566
// (that is, for which this server acts as a hub to them).
2567
func (acc *Account) updateLeafNodesEx(sub *subscription, delta int32, hubOnly bool) {
2,357,993✔
2568
        if acc == nil || sub == nil {
2,357,993✔
2569
                return
×
2570
        }
×
2571

2572
        // We will do checks for no leafnodes and same cluster here inline and under the
2573
        // general account read lock.
2574
        // If we feel we need to update the leafnodes we will do that out of line to avoid
2575
        // blocking routes or GWs.
2576

2577
        acc.mu.RLock()
2,357,993✔
2578
        // First check if we even have leafnodes here.
2,357,993✔
2579
        if acc.nleafs == 0 {
4,655,680✔
2580
                acc.mu.RUnlock()
2,297,687✔
2581
                return
2,297,687✔
2582
        }
2,297,687✔
2583

2584
        // Is this a loop detection subject.
2585
        isLDS := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))
60,306✔
2586

60,306✔
2587
        // Capture the cluster even if its empty.
60,306✔
2588
        var cluster string
60,306✔
2589
        if sub.origin != nil {
104,695✔
2590
                cluster = bytesToString(sub.origin)
44,389✔
2591
        }
44,389✔
2592

2593
        // If we have an isolated cluster we can return early, as long as it is not a loop detection subject.
2594
        // Empty clusters will return false for the check.
2595
        if !isLDS && acc.isLeafNodeClusterIsolated(cluster) {
78,624✔
2596
                acc.mu.RUnlock()
18,318✔
2597
                return
18,318✔
2598
        }
18,318✔
2599

2600
        // We can release the general account lock.
2601
        acc.mu.RUnlock()
41,988✔
2602

41,988✔
2603
        // We can hold the list lock here to avoid having to copy a large slice.
41,988✔
2604
        acc.lmu.RLock()
41,988✔
2605
        defer acc.lmu.RUnlock()
41,988✔
2606

41,988✔
2607
        // Do this once.
41,988✔
2608
        subject := string(sub.subject)
41,988✔
2609

41,988✔
2610
        // Walk the connected leafnodes from a random starting point to avoid
41,988✔
2611
        // concurrent callers all contending over leafs in the same order.
41,988✔
2612
        nleafs := len(acc.lleafs)
41,988✔
2613
        start := 0
41,988✔
2614
        if nleafs > 1 {
48,947✔
2615
                start = rand.Intn(nleafs)
6,959✔
2616
        }
6,959✔
2617
        for i := 0; i < nleafs; i++ {
94,915✔
2618
                ln := acc.lleafs[(start+i)%nleafs]
52,927✔
2619
                if ln == sub.client {
80,846✔
2620
                        continue
27,919✔
2621
                }
2622
                ln.mu.RLock()
25,008✔
2623
                // Don't advertise interest from leafnodes to other isolated leafnodes.
25,008✔
2624
                if sub.client.kind == LEAF && ln.isIsolatedLeafNode() {
25,008✔
2625
                        ln.mu.RUnlock()
×
2626
                        continue
×
2627
                }
2628
                // If `hubOnly` is true, it means that we want to update only leafnodes
2629
                // that connect to this server (so isHubLeafNode() would return `true`).
2630
                if hubOnly && !ln.isHubLeafNode() {
25,014✔
2631
                        ln.mu.RUnlock()
6✔
2632
                        continue
6✔
2633
                }
2634
                // Check to make sure this sub does not have an origin cluster that matches the leafnode.
2635
                // If skipped, make sure that we still let go the "$LDS." subscription that allows
2636
                // the detection of loops as long as different cluster.
2637
                clusterDifferent := cluster != ln.remoteCluster()
25,002✔
2638
                update := (isLDS && clusterDifferent) ||
25,002✔
2639
                        ((cluster == _EMPTY_ || clusterDifferent) && (delta <= 0 || ln.canSubscribeInternal(subject)))
25,002✔
2640
                ln.mu.RUnlock()
25,002✔
2641
                if update {
46,791✔
2642
                        ln.mu.Lock()
21,789✔
2643
                        // The leaf role, isolation mode, and remote cluster are stable
21,789✔
2644
                        // for the connection. Recheck canSubscribe here since permissions
21,789✔
2645
                        // can change, and to initializes mperms for wildcard subscriptions
21,789✔
2646
                        // that collide with deny rules.
21,789✔
2647
                        if isLDS || delta <= 0 || ln.canSubscribe(subject) {
43,578✔
2648
                                ln.updateSmap(sub, delta, isLDS)
21,789✔
2649
                        }
21,789✔
2650
                        ln.mu.Unlock()
21,789✔
2651
                }
2652
        }
2653
}
2654

2655
// updateLeafNodes will make sure to update the account smap for the subscription.
2656
// Will also forward to all leaf nodes as needed.
2657
func (acc *Account) updateLeafNodes(sub *subscription, delta int32) {
2,357,980✔
2658
        acc.updateLeafNodesEx(sub, delta, false)
2,357,980✔
2659
}
2,357,980✔
2660

2661
// This will make an update to our internal smap and determine if we should send out
2662
// an interest update to the remote side.
2663
// Lock should be held.
2664
func (c *client) updateSmap(sub *subscription, delta int32, isLDS bool) {
21,789✔
2665
        if c.leaf.smap == nil {
21,837✔
2666
                return
48✔
2667
        }
48✔
2668

2669
        // If we are solicited make sure this is a local client or a non-solicited leaf node
2670
        skind := sub.client.kind
21,741✔
2671
        updateClient := skind == CLIENT || skind == SYSTEM || skind == JETSTREAM || skind == ACCOUNT
21,741✔
2672
        if !isLDS && c.isSpokeLeafNode() && !(updateClient || (skind == LEAF && !sub.client.isSpokeLeafNode())) {
28,944✔
2673
                return
7,203✔
2674
        }
7,203✔
2675

2676
        // For additions, check if that sub has just been processed during initLeafNodeSmapAndSendSubs
2677
        if delta > 0 && c.leaf.tsub != nil {
21,623✔
2678
                if _, present := c.leaf.tsub[sub]; present {
7,088✔
2679
                        delete(c.leaf.tsub, sub)
3✔
2680
                        if len(c.leaf.tsub) == 0 {
3✔
2681
                                c.leaf.tsub = nil
×
2682
                                c.leaf.tsubt.Stop()
×
2683
                                c.leaf.tsubt = nil
×
2684
                        }
×
2685
                        return
3✔
2686
                }
2687
        }
2688

2689
        key := keyFromSub(sub)
14,535✔
2690
        n, ok := c.leaf.smap[key]
14,535✔
2691
        if delta < 0 && !ok {
15,288✔
2692
                return
753✔
2693
        }
753✔
2694

2695
        // We will update if its a queue, if count is zero (or negative), or we were 0 and are N > 0.
2696
        update := sub.queue != nil || (n <= 0 && n+delta > 0) || (n > 0 && n+delta <= 0)
13,782✔
2697
        n += delta
13,782✔
2698
        if n > 0 {
24,172✔
2699
                c.leaf.smap[key] = n
10,390✔
2700
        } else {
13,782✔
2701
                delete(c.leaf.smap, key)
3,392✔
2702
        }
3,392✔
2703
        if update {
22,970✔
2704
                c.sendLeafNodeSubUpdate(key, n)
9,188✔
2705
        }
9,188✔
2706
}
2707

2708
// Used to force add subjects to the subject map.
2709
func (c *client) forceAddToSmap(subj string) {
4✔
2710
        c.mu.Lock()
4✔
2711
        defer c.mu.Unlock()
4✔
2712

4✔
2713
        if c.leaf.smap == nil {
4✔
2714
                return
×
2715
        }
×
2716
        n := c.leaf.smap[subj]
4✔
2717
        if n != 0 {
5✔
2718
                return
1✔
2719
        }
1✔
2720
        // Place into the map since it was not there.
2721
        c.leaf.smap[subj] = 1
3✔
2722
        c.sendLeafNodeSubUpdate(subj, 1)
3✔
2723
}
2724

2725
// Used to force remove a subject from the subject map.
2726
func (c *client) forceRemoveFromSmap(subj string) {
1✔
2727
        c.mu.Lock()
1✔
2728
        defer c.mu.Unlock()
1✔
2729

1✔
2730
        if c.leaf.smap == nil {
1✔
2731
                return
×
2732
        }
×
2733
        n := c.leaf.smap[subj]
1✔
2734
        if n == 0 {
1✔
2735
                return
×
2736
        }
×
2737
        n--
1✔
2738
        if n == 0 {
2✔
2739
                // Remove is now zero
1✔
2740
                delete(c.leaf.smap, subj)
1✔
2741
                c.sendLeafNodeSubUpdate(subj, 0)
1✔
2742
        } else {
1✔
2743
                c.leaf.smap[subj] = n
×
2744
        }
×
2745
}
2746

2747
// Send the subscription interest change to the other side.
2748
// Lock should be held.
2749
func (c *client) sendLeafNodeSubUpdate(key string, n int32) {
9,192✔
2750
        // If we are a spoke, we need to check if we are allowed to send this subscription over to the hub.
9,192✔
2751
        if c.isSpokeLeafNode() {
11,322✔
2752
                checkPerms := true
2,130✔
2753
                if len(key) > 0 && (key[0] == '$' || key[0] == '_') {
3,360✔
2754
                        if strings.HasPrefix(key, leafNodeLoopDetectionSubjectPrefix) ||
1,230✔
2755
                                strings.HasPrefix(key, oldGWReplyPrefix) ||
1,230✔
2756
                                strings.HasPrefix(key, gwReplyPrefix) {
1,308✔
2757
                                checkPerms = false
78✔
2758
                        }
78✔
2759
                }
2760
                if checkPerms {
4,182✔
2761
                        var subject string
2,052✔
2762
                        if sep := strings.IndexByte(key, ' '); sep != -1 {
2,532✔
2763
                                subject = key[:sep]
480✔
2764
                        } else {
2,052✔
2765
                                subject = key
1,572✔
2766
                        }
1,572✔
2767
                        if !c.canSubscribe(subject) {
2,052✔
2768
                                return
×
2769
                        }
×
2770
                }
2771
        }
2772
        // If we are here we can send over to the other side.
2773
        _b := [64]byte{}
9,192✔
2774
        b := bytes.NewBuffer(_b[:0])
9,192✔
2775
        c.writeLeafSub(b, key, n)
9,192✔
2776
        c.enqueueProto(b.Bytes())
9,192✔
2777
}
2778

2779
// Helper function to build the key.
2780
func keyFromSub(sub *subscription) string {
42,334✔
2781
        var sb strings.Builder
42,334✔
2782
        sb.Grow(len(sub.subject) + len(sub.queue) + 1)
42,334✔
2783
        sb.Write(sub.subject)
42,334✔
2784
        if sub.queue != nil {
46,011✔
2785
                // Just make the key subject spc group, e.g. 'foo bar'
3,677✔
2786
                sb.WriteByte(' ')
3,677✔
2787
                sb.Write(sub.queue)
3,677✔
2788
        }
3,677✔
2789
        return sb.String()
42,334✔
2790
}
2791

2792
// Leading markers of the keys built by keyFromSubWithOrigin, available both as
// single-byte and as string constants.
const (
	// Plain routed subscription.
	keyRoutedSubByte = 'R'
	keyRoutedSub     = "R"
	// Routed subscription on behalf of a leafnode.
	keyRoutedLeafSubByte = 'L'
	keyRoutedLeafSub     = "L"
)
2798

2799
// Helper function to build the key that prevents collisions between normal
2800
// routed subscriptions and routed subscriptions on behalf of a leafnode.
2801
// Keys will look like this:
2802
// "R foo"          -> plain routed sub on "foo"
2803
// "R foo bar"      -> queue routed sub on "foo", queue "bar"
2804
// "L foo bar"      -> plain routed leaf sub on "foo", leaf "bar"
2805
// "L foo bar baz"  -> queue routed sub on "foo", queue "bar", leaf "baz"
2806
func keyFromSubWithOrigin(sub *subscription) string {
606,945✔
2807
        var sb strings.Builder
606,945✔
2808
        sb.Grow(2 + len(sub.origin) + 1 + len(sub.subject) + 1 + len(sub.queue))
606,945✔
2809
        leaf := len(sub.origin) > 0
606,945✔
2810
        if leaf {
621,731✔
2811
                sb.WriteByte(keyRoutedLeafSubByte)
14,786✔
2812
        } else {
606,945✔
2813
                sb.WriteByte(keyRoutedSubByte)
592,159✔
2814
        }
592,159✔
2815
        sb.WriteByte(' ')
606,945✔
2816
        sb.Write(sub.subject)
606,945✔
2817
        if sub.queue != nil {
634,708✔
2818
                sb.WriteByte(' ')
27,763✔
2819
                sb.Write(sub.queue)
27,763✔
2820
        }
27,763✔
2821
        if leaf {
621,731✔
2822
                sb.WriteByte(' ')
14,786✔
2823
                sb.Write(sub.origin)
14,786✔
2824
        }
14,786✔
2825
        return sb.String()
606,945✔
2826
}
2827

2828
// Lock should be held.
2829
func (c *client) writeLeafSub(w *bytes.Buffer, key string, n int32) {
33,155✔
2830
        if key == _EMPTY_ {
33,155✔
2831
                return
×
2832
        }
×
2833
        if n > 0 {
62,917✔
2834
                w.WriteString("LS+ " + key)
29,762✔
2835
                // Check for queue semantics, if found write n.
29,762✔
2836
                if strings.Contains(key, " ") {
32,088✔
2837
                        w.WriteString(" ")
2,326✔
2838
                        var b [12]byte
2,326✔
2839
                        var i = len(b)
2,326✔
2840
                        for l := n; l > 0; l /= 10 {
5,566✔
2841
                                i--
3,240✔
2842
                                b[i] = digits[l%10]
3,240✔
2843
                        }
3,240✔
2844
                        w.Write(b[i:])
2,326✔
2845
                        if c.trace {
2,326✔
2846
                                arg := fmt.Sprintf("%s %d", key, n)
×
2847
                                c.traceOutOp("LS+", []byte(arg))
×
2848
                        }
×
2849
                } else if c.trace {
27,453✔
2850
                        c.traceOutOp("LS+", []byte(key))
17✔
2851
                }
17✔
2852
        } else {
3,393✔
2853
                w.WriteString("LS- " + key)
3,393✔
2854
                if c.trace {
3,393✔
2855
                        c.traceOutOp("LS-", []byte(key))
×
2856
                }
×
2857
        }
2858
        w.WriteString(CR_LF)
33,155✔
2859
}
2860

2861
// processLeafSub will process an inbound sub request for the remote leaf node.
2862
func (c *client) processLeafSub(argo []byte) (err error) {
29,444✔
2863
        // Indicate activity.
29,444✔
2864
        c.in.subs++
29,444✔
2865

29,444✔
2866
        srv := c.srv
29,444✔
2867
        if srv == nil {
29,444✔
2868
                return nil
×
2869
        }
×
2870

2871
        // Copy so we do not reference a potentially large buffer
2872
        arg := make([]byte, len(argo))
29,444✔
2873
        copy(arg, argo)
29,444✔
2874

29,444✔
2875
        args := splitArg(arg)
29,444✔
2876
        sub := &subscription{client: c}
29,444✔
2877

29,444✔
2878
        delta := int32(1)
29,444✔
2879
        switch len(args) {
29,444✔
2880
        case 1:
27,185✔
2881
                sub.queue = nil
27,185✔
2882
        case 3:
2,259✔
2883
                sub.queue = args[1]
2,259✔
2884
                sub.qw = int32(parseSize(args[2]))
2,259✔
2885
                // TODO: (ik) We should have a non empty queue name and a queue
2,259✔
2886
                // weight >= 1. For 2.11, we may want to return an error if that
2,259✔
2887
                // is not the case, but for now just overwrite `delta` if queue
2,259✔
2888
                // weight is greater than 1 (it is possible after a reconnect/
2,259✔
2889
                // server restart to receive a queue weight > 1 for a new sub).
2,259✔
2890
                if sub.qw > 1 {
3,913✔
2891
                        delta = sub.qw
1,654✔
2892
                }
1,654✔
2893
        default:
×
2894
                return fmt.Errorf("processLeafSub Parse Error: '%s'", arg)
×
2895
        }
2896
        sub.subject = args[0]
29,444✔
2897

29,444✔
2898
        c.mu.Lock()
29,444✔
2899
        if c.isClosed() {
29,458✔
2900
                c.mu.Unlock()
14✔
2901
                return nil
14✔
2902
        }
14✔
2903

2904
        acc := c.acc
29,430✔
2905
        // Guard against LS+ arriving before CONNECT has been processed, which
29,430✔
2906
        // can happen when compression is enabled.
29,430✔
2907
        if acc == nil {
29,430✔
2908
                c.mu.Unlock()
×
2909
                c.sendErr("Authorization Violation")
×
2910
                c.closeConnection(ProtocolViolation)
×
2911
                return nil
×
2912
        }
×
2913
        // Check if we have a loop.
2914
        ldsPrefix := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))
29,430✔
2915

29,430✔
2916
        if ldsPrefix && bytesToString(sub.subject) == acc.getLDSubject() {
29,435✔
2917
                c.mu.Unlock()
5✔
2918
                c.handleLeafNodeLoop(true)
5✔
2919
                return nil
5✔
2920
        }
5✔
2921

2922
        // Check permissions if applicable. (but exclude the $LDS, $GR and _GR_)
2923
        checkPerms := true
29,425✔
2924
        if sub.subject[0] == '$' || sub.subject[0] == '_' {
56,014✔
2925
                if ldsPrefix ||
26,589✔
2926
                        bytes.HasPrefix(sub.subject, []byte(oldGWReplyPrefix)) ||
26,589✔
2927
                        bytes.HasPrefix(sub.subject, []byte(gwReplyPrefix)) {
28,384✔
2928
                        checkPerms = false
1,795✔
2929
                }
1,795✔
2930
        }
2931

2932
        // If we are a hub check that we can publish to this subject.
2933
        if checkPerms {
57,055✔
2934
                subj := string(sub.subject)
27,630✔
2935
                if subjectIsLiteral(subj) && !c.pubAllowedFullCheck(subj, true, true) {
27,954✔
2936
                        c.mu.Unlock()
324✔
2937
                        c.leafSubPermViolation(sub.subject)
324✔
2938
                        c.Debugf(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject))
324✔
2939
                        return nil
324✔
2940
                }
324✔
2941
        }
2942

2943
        // Check if we have a maximum on the number of subscriptions.
2944
        if c.subsAtLimit() {
29,109✔
2945
                c.mu.Unlock()
8✔
2946
                c.maxSubsExceeded()
8✔
2947
                return nil
8✔
2948
        }
8✔
2949

2950
        // If we have an origin cluster associated mark that in the sub.
2951
        if rc := c.remoteCluster(); rc != _EMPTY_ {
54,736✔
2952
                sub.origin = []byte(rc)
25,643✔
2953
        }
25,643✔
2954

2955
        // Like Routes, we store local subs by account and subject and optionally queue name.
2956
        // If we have a queue it will have a trailing weight which we do not want.
2957
        if sub.queue != nil {
31,060✔
2958
                sub.sid = arg[:len(arg)-len(args[2])-1]
1,967✔
2959
        } else {
29,093✔
2960
                sub.sid = arg
27,126✔
2961
        }
27,126✔
2962
        key := bytesToString(sub.sid)
29,093✔
2963
        osub := c.subs[key]
29,093✔
2964
        if osub == nil {
56,671✔
2965
                c.subs[key] = sub
27,578✔
2966
                // Now place into the account sl.
27,578✔
2967
                if err := acc.sl.Insert(sub); err != nil {
27,578✔
2968
                        delete(c.subs, key)
×
2969
                        c.mu.Unlock()
×
2970
                        c.Errorf("Could not insert subscription: %v", err)
×
2971
                        c.sendErr("Invalid Subscription")
×
2972
                        return nil
×
2973
                }
×
2974
        } else if sub.queue != nil {
3,029✔
2975
                // For a queue we need to update the weight.
1,514✔
2976
                delta = sub.qw - atomic.LoadInt32(&osub.qw)
1,514✔
2977
                atomic.StoreInt32(&osub.qw, sub.qw)
1,514✔
2978
                acc.sl.UpdateRemoteQSub(osub)
1,514✔
2979
        }
1,514✔
2980
        spoke := c.isSpokeLeafNode()
29,093✔
2981
        c.mu.Unlock()
29,093✔
2982

29,093✔
2983
        // Only add in shadow subs if a new sub or qsub.
29,093✔
2984
        if osub == nil {
56,671✔
2985
                if err := c.addShadowSubscriptions(acc, sub); err != nil {
27,578✔
2986
                        c.Errorf(err.Error())
×
2987
                }
×
2988
        }
2989

2990
        // If we are not solicited, treat leaf node subscriptions similar to a
2991
        // client subscription, meaning we forward them to routes, gateways and
2992
        // other leaf nodes as needed.
2993
        if !spoke {
39,271✔
2994
                // If we are routing add to the route map for the associated account.
10,178✔
2995
                srv.updateRouteSubscriptionMap(acc, sub, delta)
10,178✔
2996
                if srv.gateway.enabled {
11,334✔
2997
                        srv.gatewayUpdateSubInterest(acc.Name, sub, delta)
1,156✔
2998
                }
1,156✔
2999
        }
3000
        // Now check on leafnode updates for other leaf nodes. We understand solicited
3001
        // and non-solicited state in this call so we will do the right thing.
3002
        acc.updateLeafNodes(sub, delta)
29,093✔
3003

29,093✔
3004
        return nil
29,093✔
3005
}
3006

3007
// If the leafnode is a solicited, set the connect delay based on default
3008
// or private option (for tests). Sends the error to the other side, log and
3009
// close the connection.
3010
func (c *client) handleLeafNodeLoop(sendErr bool) {
14✔
3011
        accName, delay := c.setLeafConnectDelayIfSoliciting(leafNodeReconnectDelayAfterLoopDetected)
14✔
3012
        errTxt := fmt.Sprintf("Loop detected for leafnode account=%q. Delaying attempt to reconnect for %v", accName, delay)
14✔
3013
        if sendErr {
21✔
3014
                c.sendErr(errTxt)
7✔
3015
        }
7✔
3016

3017
        c.Errorf(errTxt)
14✔
3018
        // If we are here with "sendErr" false, it means that this is the server
14✔
3019
        // that received the error. The other side will have closed the connection,
14✔
3020
        // but does not hurt to close here too.
14✔
3021
        c.closeConnection(ProtocolViolation)
14✔
3022
}
3023

3024
// processLeafUnsub will process an inbound unsub request for the remote leaf node.
3025
func (c *client) processLeafUnsub(arg []byte) error {
3,105✔
3026
        // Indicate any activity, so pub and sub or unsubs.
3,105✔
3027
        c.in.subs++
3,105✔
3028

3,105✔
3029
        srv := c.srv
3,105✔
3030

3,105✔
3031
        c.mu.Lock()
3,105✔
3032
        if c.isClosed() {
3,154✔
3033
                c.mu.Unlock()
49✔
3034
                return nil
49✔
3035
        }
49✔
3036

3037
        acc := c.acc
3,056✔
3038
        // Guard against LS- arriving before CONNECT has been processed.
3,056✔
3039
        if acc == nil {
3,056✔
3040
                c.mu.Unlock()
×
3041
                c.sendErr("Authorization Violation")
×
3042
                c.closeConnection(ProtocolViolation)
×
3043
                return nil
×
3044
        }
×
3045

3046
        spoke := c.isSpokeLeafNode()
3,056✔
3047
        // We store local subs by account and subject and optionally queue name.
3,056✔
3048
        // LS- will have the arg exactly as the key.
3,056✔
3049
        sub, ok := c.subs[string(arg)]
3,056✔
3050
        if !ok {
3,072✔
3051
                // If not found, don't try to update routes/gws/leaf nodes.
16✔
3052
                c.mu.Unlock()
16✔
3053
                return nil
16✔
3054
        }
16✔
3055
        delta := int32(1)
3,040✔
3056
        if len(sub.queue) > 0 {
3,454✔
3057
                delta = sub.qw
414✔
3058
        }
414✔
3059
        c.mu.Unlock()
3,040✔
3060

3,040✔
3061
        c.unsubscribe(acc, sub, true, true)
3,040✔
3062
        if !spoke {
3,945✔
3063
                // If we are routing subtract from the route map for the associated account.
905✔
3064
                srv.updateRouteSubscriptionMap(acc, sub, -delta)
905✔
3065
                // Gateways
905✔
3066
                if srv.gateway.enabled {
1,098✔
3067
                        srv.gatewayUpdateSubInterest(acc.Name, sub, -delta)
193✔
3068
                }
193✔
3069
        }
3070
        // Now check on leafnode updates for other leaf nodes.
3071
        acc.updateLeafNodes(sub, -delta)
3,040✔
3072
        return nil
3,040✔
3073
}
3074

3075
func (c *client) processLeafHeaderMsgArgs(arg []byte) error {
238✔
3076
        // Unroll splitArgs to avoid runtime/heap issues
238✔
3077
        args := c.argsa[:0]
238✔
3078
        start := -1
238✔
3079
        for i, b := range arg {
13,186✔
3080
                switch b {
12,948✔
3081
                case ' ', '\t', '\r', '\n':
698✔
3082
                        if start >= 0 {
1,396✔
3083
                                args = append(args, arg[start:i])
698✔
3084
                                start = -1
698✔
3085
                        }
698✔
3086
                default:
12,250✔
3087
                        if start < 0 {
13,186✔
3088
                                start = i
936✔
3089
                        }
936✔
3090
                }
3091
        }
3092
        if start >= 0 {
476✔
3093
                args = append(args, arg[start:])
238✔
3094
        }
238✔
3095

3096
        c.pa.arg = arg
238✔
3097
        switch len(args) {
238✔
3098
        case 0, 1, 2:
×
3099
                return fmt.Errorf("processLeafHeaderMsgArgs Parse Error: '%s'", args)
×
3100
        case 3:
21✔
3101
                c.pa.reply = nil
21✔
3102
                c.pa.queues = nil
21✔
3103
                c.pa.hdb = args[1]
21✔
3104
                c.pa.hdr = parseSize(args[1])
21✔
3105
                c.pa.szb = args[2]
21✔
3106
                c.pa.size = parseSize(args[2])
21✔
3107
        case 4:
214✔
3108
                c.pa.reply = args[1]
214✔
3109
                c.pa.queues = nil
214✔
3110
                c.pa.hdb = args[2]
214✔
3111
                c.pa.hdr = parseSize(args[2])
214✔
3112
                c.pa.szb = args[3]
214✔
3113
                c.pa.size = parseSize(args[3])
214✔
3114
        default:
3✔
3115
                // args[1] is our reply indicator. Should be + or | normally.
3✔
3116
                if len(args[1]) != 1 {
3✔
3117
                        return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
3118
                }
×
3119
                switch args[1][0] {
3✔
3120
                case '+':
2✔
3121
                        c.pa.reply = args[2]
2✔
3122
                case '|':
1✔
3123
                        c.pa.reply = nil
1✔
3124
                default:
×
3125
                        return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
3126
                }
3127
                // Grab header size.
3128
                c.pa.hdb = args[len(args)-2]
3✔
3129
                c.pa.hdr = parseSize(c.pa.hdb)
3✔
3130

3✔
3131
                // Grab size.
3✔
3132
                c.pa.szb = args[len(args)-1]
3✔
3133
                c.pa.size = parseSize(c.pa.szb)
3✔
3134

3✔
3135
                // Grab queue names.
3✔
3136
                if c.pa.reply != nil {
5✔
3137
                        c.pa.queues = args[3 : len(args)-2]
2✔
3138
                } else {
3✔
3139
                        c.pa.queues = args[2 : len(args)-2]
1✔
3140
                }
1✔
3141
        }
3142
        if c.pa.hdr < 0 {
238✔
3143
                return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Header Size: '%s'", arg)
×
3144
        }
×
3145
        if c.pa.size < 0 {
238✔
3146
                return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Size: '%s'", args)
×
3147
        }
×
3148
        if c.pa.hdr > c.pa.size {
238✔
3149
                return fmt.Errorf("processLeafHeaderMsgArgs Header Size larger then TotalSize: '%s'", arg)
×
3150
        }
×
3151
        maxPayload := atomic.LoadInt32(&c.mpay)
238✔
3152
        if maxPayload != jwt.NoLimit && int64(c.pa.size) > int64(maxPayload) {
238✔
3153
                c.maxPayloadViolation(c.pa.size, maxPayload)
×
3154
                return ErrMaxPayload
×
3155
        }
×
3156

3157
        // Common ones processed after check for arg length
3158
        c.pa.subject = args[0]
238✔
3159

238✔
3160
        return nil
238✔
3161
}
3162

3163
func (c *client) processLeafMsgArgs(arg []byte) error {
64,935✔
3164
        // Unroll splitArgs to avoid runtime/heap issues
64,935✔
3165
        args := c.argsa[:0]
64,935✔
3166
        start := -1
64,935✔
3167
        for i, b := range arg {
2,144,188✔
3168
                switch b {
2,079,253✔
3169
                case ' ', '\t', '\r', '\n':
116,580✔
3170
                        if start >= 0 {
233,160✔
3171
                                args = append(args, arg[start:i])
116,580✔
3172
                                start = -1
116,580✔
3173
                        }
116,580✔
3174
                default:
1,962,673✔
3175
                        if start < 0 {
2,144,188✔
3176
                                start = i
181,515✔
3177
                        }
181,515✔
3178
                }
3179
        }
3180
        if start >= 0 {
129,870✔
3181
                args = append(args, arg[start:])
64,935✔
3182
        }
64,935✔
3183

3184
        c.pa.arg = arg
64,935✔
3185
        switch len(args) {
64,935✔
3186
        case 0, 1:
×
3187
                return fmt.Errorf("processLeafMsgArgs Parse Error: '%s'", args)
×
3188
        case 2:
36,002✔
3189
                c.pa.reply = nil
36,002✔
3190
                c.pa.queues = nil
36,002✔
3191
                c.pa.szb = args[1]
36,002✔
3192
                c.pa.size = parseSize(args[1])
36,002✔
3193
        case 3:
6,380✔
3194
                c.pa.reply = args[1]
6,380✔
3195
                c.pa.queues = nil
6,380✔
3196
                c.pa.szb = args[2]
6,380✔
3197
                c.pa.size = parseSize(args[2])
6,380✔
3198
        default:
22,553✔
3199
                // args[1] is our reply indicator. Should be + or | normally.
22,553✔
3200
                if len(args[1]) != 1 {
22,553✔
3201
                        return fmt.Errorf("processLeafMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
3202
                }
×
3203
                switch args[1][0] {
22,553✔
3204
                case '+':
159✔
3205
                        c.pa.reply = args[2]
159✔
3206
                case '|':
22,394✔
3207
                        c.pa.reply = nil
22,394✔
3208
                default:
×
3209
                        return fmt.Errorf("processLeafMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
3210
                }
3211
                // Grab size.
3212
                c.pa.szb = args[len(args)-1]
22,553✔
3213
                c.pa.size = parseSize(c.pa.szb)
22,553✔
3214

22,553✔
3215
                // Grab queue names.
22,553✔
3216
                if c.pa.reply != nil {
22,712✔
3217
                        c.pa.queues = args[3 : len(args)-1]
159✔
3218
                } else {
22,553✔
3219
                        c.pa.queues = args[2 : len(args)-1]
22,394✔
3220
                }
22,394✔
3221
        }
3222
        if c.pa.size < 0 {
64,935✔
3223
                return fmt.Errorf("processLeafMsgArgs Bad or Missing Size: '%s'", args)
×
3224
        }
×
3225
        maxPayload := atomic.LoadInt32(&c.mpay)
64,935✔
3226
        if maxPayload != jwt.NoLimit && int64(c.pa.size) > int64(maxPayload) {
64,935✔
3227
                c.maxPayloadViolation(c.pa.size, maxPayload)
×
3228
                return ErrMaxPayload
×
3229
        }
×
3230

3231
        // Common ones processed after check for arg length
3232
        c.pa.subject = args[0]
64,935✔
3233

64,935✔
3234
        return nil
64,935✔
3235
}
3236

3237
// processInboundLeafMsg is called to process an inbound msg from a leaf node.
3238
func (c *client) processInboundLeafMsg(msg []byte) {
63,758✔
3239
        // Update statistics
63,758✔
3240
        // The msg includes the CR_LF, so pull back out for accounting.
63,758✔
3241
        c.in.msgs++
63,758✔
3242
        c.in.bytes += int32(len(msg) - LEN_CR_LF)
63,758✔
3243

63,758✔
3244
        srv, acc, subject := c.srv, c.acc, string(c.pa.subject)
63,758✔
3245

63,758✔
3246
        // Mostly under testing scenarios.
63,758✔
3247
        if srv == nil || acc == nil {
63,758✔
3248
                return
×
3249
        }
×
3250

3251
        // Check that leaf messages respect the subject permissions.
3252
        if c.perms != nil && !c.leafMsgAllowed() {
63,758✔
3253
                c.leafPubPermViolation(c.pa.subject)
×
3254
                return
×
3255
        }
×
3256

3257
        // Match the subscriptions. We will use our own L1 map if
3258
        // it's still valid, avoiding contention on the shared sublist.
3259
        var r *SublistResult
63,758✔
3260
        var ok bool
63,758✔
3261

63,758✔
3262
        genid := atomic.LoadUint64(&c.acc.sl.genid)
63,758✔
3263
        if genid == c.in.genid && c.in.results != nil {
125,513✔
3264
                r, ok = c.in.results[subject]
61,755✔
3265
        } else {
63,758✔
3266
                // Reset our L1 completely.
2,003✔
3267
                c.in.results = make(map[string]*SublistResult)
2,003✔
3268
                c.in.genid = genid
2,003✔
3269
        }
2,003✔
3270

3271
        // Go back to the sublist data structure.
3272
        if !ok {
97,444✔
3273
                r = c.acc.sl.Match(subject)
33,686✔
3274
                // Prune the results cache. Keeps us from unbounded growth. Random delete.
33,686✔
3275
                if len(c.in.results) >= maxResultCacheSize {
34,555✔
3276
                        n := 0
869✔
3277
                        for subj := range c.in.results {
29,546✔
3278
                                delete(c.in.results, subj)
28,677✔
3279
                                if n++; n > pruneSize {
29,546✔
3280
                                        break
869✔
3281
                                }
3282
                        }
3283
                }
3284
                // Then add the new cache entry.
3285
                c.in.results[subject] = r
33,686✔
3286
        }
3287

3288
        // Collect queue names if needed.
3289
        var qnames [][]byte
63,758✔
3290

63,758✔
3291
        // Check for no interest, short circuit if so.
63,758✔
3292
        // This is the fanout scale.
63,758✔
3293
        if len(r.psubs)+len(r.qsubs) > 0 {
127,179✔
3294
                flag := pmrNoFlag
63,421✔
3295
                // If we have queue subs in this cluster, then if we run in gateway
63,421✔
3296
                // mode and the remote gateways have queue subs, then we need to
63,421✔
3297
                // collect the queue groups this message was sent to so that we
63,421✔
3298
                // exclude them when sending to gateways.
63,421✔
3299
                if len(r.qsubs) > 0 && c.srv.gateway.enabled &&
63,421✔
3300
                        atomic.LoadInt64(&c.srv.gateway.totalQSubs) > 0 {
75,706✔
3301
                        flag |= pmrCollectQueueNames
12,285✔
3302
                }
12,285✔
3303
                // If this is a mapped subject that means the mapped interest
3304
                // is what got us here, but this might not have a queue designation
3305
                // If that is the case, make sure we ignore to process local queue subscribers.
3306
                if len(c.pa.mapped) > 0 && len(c.pa.queues) == 0 {
63,669✔
3307
                        flag |= pmrIgnoreEmptyQueueFilter
248✔
3308
                }
248✔
3309
                _, qnames = c.processMsgResults(acc, r, msg, nil, c.pa.subject, c.pa.reply, flag)
63,421✔
3310
        }
3311

3312
        // Now deal with gateways
3313
        if c.srv.gateway.enabled {
76,854✔
3314
                c.sendMsgToGateways(acc, msg, c.pa.subject, c.pa.reply, qnames, true)
13,096✔
3315
        }
13,096✔
3316
}
3317

3318
// Checks whether the inbound leaf message is allowed by the
3319
// connection's permissions. On the hub side this enforces what
3320
// the remote leaf may publish. On the spoke side this enforces
3321
// import restrictions such as deny_imports.
3322
func (c *client) leafMsgAllowed() bool {
59,865✔
3323
        wireSubject := c.pa.subject
59,865✔
3324
        if len(c.pa.mapped) > 0 {
60,113✔
3325
                // Mappings rewrite c.pa.subject to the internal
248✔
3326
                // destination. For leaf ACLs, need to check
248✔
3327
                // the original wire subject from the remote side.
248✔
3328
                wireSubject = c.pa.mapped
248✔
3329
        }
248✔
3330
        // Strip any gateway routing prefix for the permission check.
3331
        subjectToCheck, isGW := getGWRoutedSubjectOrSelf(wireSubject)
59,865✔
3332

59,865✔
3333
        // Service-import replies (_R_), JS ack subjects ($JS.ACK.)
59,865✔
3334
        // are internal routing subjects forwarded via LS+ without
59,865✔
3335
        // permission checks.
59,865✔
3336
        if isServiceReply(subjectToCheck) || isJSAckSubject(subjectToCheck) {
59,895✔
3337
                return true
30✔
3338
        }
30✔
3339

3340
        c.mu.RLock()
59,835✔
3341
        if c.isSpokeLeafNode() {
87,905✔
3342
                // Gateway routed replies are forwarded without
28,070✔
3343
                // permission checks.
28,070✔
3344
                if isGW || c.leafReceiveAllowed(subjectToCheck) {
56,140✔
3345
                        c.mu.RUnlock()
28,070✔
3346
                        return true
28,070✔
3347
                }
28,070✔
3348
        } else if c.leafSendAllowed(subjectToCheck) {
63,530✔
3349
                c.mu.RUnlock()
31,765✔
3350
                return true
31,765✔
3351
        }
31,765✔
3352

3353
        // If allow_responses is not configured, or there is no tracked reply for
3354
        // this subject, the answer is "denied" and we can return it while still
3355
        // holding only the read lock.
3356
        replySubject := bytesToString(wireSubject)
×
3357
        if c.perms == nil || c.perms.resp == nil || c.replies[replySubject] == nil {
×
3358
                c.mu.RUnlock()
×
3359
                return false
×
3360
        }
×
3361
        c.mu.RUnlock()
×
3362

×
3363
        // Check tracked reply permissions (allow_responses).
×
3364
        // Use the pre-strip subject since deliverMsg tracks
×
3365
        // replies under the original form, which includes
×
3366
        // the GW routing prefix for routed requests.
×
3367
        c.mu.Lock()
×
3368
        defer c.mu.Unlock()
×
3369
        return c.responseAllowed(replySubject)
×
3370
}
3371

3372
// Returns true if the leaf side ACLs allow importing this subject,
3373
// based on the permissions received over INFO and any local deny_imports.
3374
// At least a read lock must be held.
3375
func (c *client) leafReceiveAllowed(subject []byte) bool {
28,070✔
3376
        return c.canSubscribeInternal(bytesToString(subject))
28,070✔
3377
}
28,070✔
3378

3379
// Returns true if the hub side ACLs allow the remote leaf to send
3380
// this subject.
3381
// At least a read lock must be held.
3382
func (c *client) leafSendAllowed(bsubject []byte) bool {
31,765✔
3383
        // Use the original export ACL captured for this accepted leaf.
31,765✔
3384
        // The live perms also contain additional JetStream denies used by
31,765✔
3385
        // the normal forwarding path, and applying them here would reject
31,765✔
3386
        // legitimate inbound JS API requests.
31,765✔
3387
        subject := bytesToString(bsubject)
31,765✔
3388
        perms := c.opts.Export
31,765✔
3389
        if perms == nil || (perms.Allow == nil && perms.Deny == nil) {
63,527✔
3390
                return true
31,762✔
3391
        }
31,762✔
3392

3393
        allowed := true
3✔
3394
        if perms.Allow != nil && !strings.HasPrefix(subject, mqttPrefix) {
5✔
3395
                allowed = false
2✔
3396
                for _, allowSubj := range perms.Allow {
5✔
3397
                        if matchLiteral(subject, allowSubj) {
5✔
3398
                                allowed = true
2✔
3399
                                break
2✔
3400
                        }
3401
                }
3402
        }
3403

3404
        if allowed && len(perms.Deny) > 0 {
4✔
3405
                for _, denySubj := range perms.Deny {
2✔
3406
                        if matchLiteral(subject, denySubj) {
1✔
3407
                                allowed = false
×
3408
                                break
×
3409
                        }
3410
                }
3411
        }
3412
        return allowed
3✔
3413
}
3414

3415
// Handles a subscription permission violation.
3416
// See leafPermViolation() for details.
3417
func (c *client) leafSubPermViolation(subj []byte) {
324✔
3418
        c.leafPermViolation(false, subj)
324✔
3419
}
324✔
3420

3421
// Handles a publish permission violation.
3422
// See leafPermViolation() for details.
3423
func (c *client) leafPubPermViolation(subj []byte) {
×
3424
        c.leafPermViolation(true, subj)
×
3425
}
×
3426

3427
// Common function to process publish or subscribe leafnode permission violation.
3428
// Sends the permission violation error to the remote, logs it and closes the connection.
3429
// If this is from a server soliciting, the reconnection will be delayed.
3430
func (c *client) leafPermViolation(pub bool, subj []byte) {
324✔
3431
        if c.isSpokeLeafNode() {
648✔
3432
                // For spokes these are no-ops since the hub server told us our permissions.
324✔
3433
                // We just need to not send these over to the other side since we will get cutoff.
324✔
3434
                return
324✔
3435
        }
324✔
3436
        // FIXME(dlc) ?
3437
        c.setLeafConnectDelayIfSoliciting(leafNodeReconnectAfterPermViolation)
×
3438
        var action string
×
3439
        if pub {
×
3440
                c.sendErr(fmt.Sprintf("Permissions Violation for Publish to %q", subj))
×
3441
                action = "Publish"
×
3442
        } else {
×
3443
                c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q", subj))
×
3444
                action = "Subscription"
×
3445
        }
×
3446
        c.Errorf("%s Violation on %q - Check other side configuration", action, subj)
×
3447
        // TODO: add a new close reason that is more appropriate?
×
3448
        c.closeConnection(ProtocolViolation)
×
3449
}
3450

3451
// Invoked from generic processErr() for LEAF connections.
3452
func (c *client) leafProcessErr(errStr string) {
47✔
3453
        // Check if we got a cluster name collision.
47✔
3454
        if strings.Contains(errStr, ErrLeafNodeHasSameClusterName.Error()) {
50✔
3455
                _, delay := c.setLeafConnectDelayIfSoliciting(leafNodeReconnectDelayAfterClusterNameSame)
3✔
3456
                c.Errorf("Leafnode connection dropped with same cluster name error. Delaying attempt to reconnect for %v", delay)
3✔
3457
                return
3✔
3458
        }
3✔
3459
        if strings.Contains(errStr, ErrLeafNodeMinVersionRejected.Error()) {
45✔
3460
                _, delay := c.setLeafConnectDelayIfSoliciting(leafNodeMinVersionReconnectDelay)
1✔
3461
                c.Errorf("Leafnode connection dropped due to minimum version requirement. Delaying attempt to reconnect for %v", delay)
1✔
3462
                return
1✔
3463
        }
1✔
3464

3465
        // We will look for Loop detected error coming from the other side.
3466
        // If we solicit, set the connect delay.
3467
        if !strings.Contains(errStr, "Loop detected") {
79✔
3468
                return
36✔
3469
        }
36✔
3470
        c.handleLeafNodeLoop(false)
7✔
3471
}
3472

3473
// If this leaf connection solicits, sets the connect delay to the given value,
3474
// or the one from the server option's LeafNode.connDelay if one is set (for tests).
3475
// Returns the connection's account name and delay.
3476
func (c *client) setLeafConnectDelayIfSoliciting(delay time.Duration) (string, time.Duration) {
18✔
3477
        c.mu.Lock()
18✔
3478
        if c.isSolicitedLeafNode() {
29✔
3479
                if s := c.srv; s != nil {
22✔
3480
                        if srvdelay := s.getOpts().LeafNode.connDelay; srvdelay != 0 {
15✔
3481
                                delay = srvdelay
4✔
3482
                        }
4✔
3483
                }
3484
                c.leaf.remote.setConnectDelay(delay)
11✔
3485
        }
3486
        var accName string
18✔
3487
        if c.acc != nil {
36✔
3488
                accName = c.acc.Name
18✔
3489
        }
18✔
3490
        c.mu.Unlock()
18✔
3491
        return accName, delay
18✔
3492
}
3493

3494
// For the given remote Leafnode configuration, this function returns
3495
// if TLS is required, and if so, will return a clone of the TLS Config
3496
// (since some fields will be changed during handshake), the TLS server
3497
// name that is remembered, and the TLS timeout.
3498
func (c *client) leafNodeGetTLSConfigForSolicit(remote *leafNodeCfg) (bool, *tls.Config, string, float64) {
1,680✔
3499
        var (
1,680✔
3500
                tlsConfig  *tls.Config
1,680✔
3501
                tlsName    string
1,680✔
3502
                tlsTimeout float64
1,680✔
3503
        )
1,680✔
3504

1,680✔
3505
        remote.RLock()
1,680✔
3506
        defer remote.RUnlock()
1,680✔
3507

1,680✔
3508
        tlsRequired := remote.TLS || remote.TLSConfig != nil
1,680✔
3509
        if tlsRequired {
1,755✔
3510
                if remote.TLSConfig != nil {
126✔
3511
                        tlsConfig = remote.TLSConfig.Clone()
51✔
3512
                } else {
75✔
3513
                        tlsConfig = &tls.Config{MinVersion: tls.VersionTLS12}
24✔
3514
                }
24✔
3515
                tlsName = remote.tlsName
75✔
3516
                tlsTimeout = remote.TLSTimeout
75✔
3517
                if tlsTimeout == 0 {
116✔
3518
                        tlsTimeout = float64(TLS_TIMEOUT / time.Second)
41✔
3519
                }
41✔
3520
        }
3521

3522
        return tlsRequired, tlsConfig, tlsName, tlsTimeout
1,680✔
3523
}
3524

3525
// Initiates the LeafNode Websocket connection by:
3526
// - doing the TLS handshake if needed
3527
// - sending the HTTP request
3528
// - waiting for the HTTP response
3529
//
3530
// Since some bufio reader is used to consume the HTTP response, this function
3531
// returns the slice of buffered bytes (if any) so that the readLoop that will
3532
// be started after that consume those first before reading from the socket.
3533
// The boolean
3534
//
3535
// Lock held on entry.
3536
func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remote *leafNodeCfg) ([]byte, ClosedState, error) {
52✔
3537
        remote.RLock()
52✔
3538
        compress := remote.Websocket.Compression
52✔
3539
        // By default the server will mask outbound frames, but it can be disabled with this option.
52✔
3540
        noMasking := remote.Websocket.NoMasking
52✔
3541
        infoTimeout := remote.FirstInfoTimeout
52✔
3542
        remote.RUnlock()
52✔
3543
        // Will do the client-side TLS handshake if needed.
52✔
3544
        tlsRequired, err := c.leafClientHandshakeIfNeeded(remote, opts)
52✔
3545
        if err != nil {
56✔
3546
                // 0 will indicate that the connection was already closed
4✔
3547
                return nil, 0, err
4✔
3548
        }
4✔
3549

3550
        // For http request, we need the passed URL to contain either http or https scheme.
3551
        scheme := "http"
48✔
3552
        if tlsRequired {
56✔
3553
                scheme = "https"
8✔
3554
        }
8✔
3555
        // We will use the `/leafnode` path to tell the accepting WS server that it should
3556
        // create a LEAF connection, not a CLIENT.
3557
        // In case we use the user's URL path in the future, make sure we append the user's
3558
        // path to our `/leafnode` path.
3559
        lpath := leafNodeWSPath
48✔
3560
        if curPath := rURL.EscapedPath(); curPath != _EMPTY_ {
69✔
3561
                if curPath[0] == '/' {
42✔
3562
                        curPath = curPath[1:]
21✔
3563
                }
21✔
3564
                lpath = path.Join(curPath, lpath)
21✔
3565
        } else {
27✔
3566
                lpath = lpath[1:]
27✔
3567
        }
27✔
3568
        ustr := fmt.Sprintf("%s://%s/%s", scheme, rURL.Host, lpath)
48✔
3569
        u, _ := url.Parse(ustr)
48✔
3570
        req := &http.Request{
48✔
3571
                Method:     "GET",
48✔
3572
                URL:        u,
48✔
3573
                Proto:      "HTTP/1.1",
48✔
3574
                ProtoMajor: 1,
48✔
3575
                ProtoMinor: 1,
48✔
3576
                Header:     make(http.Header),
48✔
3577
                Host:       u.Host,
48✔
3578
        }
48✔
3579
        wsKey, err := wsMakeChallengeKey()
48✔
3580
        if err != nil {
48✔
3581
                return nil, WriteError, err
×
3582
        }
×
3583

3584
        req.Header["Upgrade"] = []string{"websocket"}
48✔
3585
        req.Header["Connection"] = []string{"Upgrade"}
48✔
3586
        req.Header["Sec-WebSocket-Key"] = []string{wsKey}
48✔
3587
        req.Header["Sec-WebSocket-Version"] = []string{"13"}
48✔
3588
        if compress {
59✔
3589
                req.Header.Add("Sec-WebSocket-Extensions", wsPMCReqHeaderValue)
11✔
3590
        }
11✔
3591
        if noMasking {
58✔
3592
                req.Header.Add(wsNoMaskingHeader, wsNoMaskingValue)
10✔
3593
        }
10✔
3594
        c.nc.SetDeadline(time.Now().Add(infoTimeout))
48✔
3595
        if err := req.Write(c.nc); err != nil {
48✔
3596
                return nil, WriteError, err
×
3597
        }
×
3598

3599
        var resp *http.Response
48✔
3600

48✔
3601
        br := bufio.NewReaderSize(c.nc, MAX_CONTROL_LINE_SIZE)
48✔
3602
        resp, err = http.ReadResponse(br, req)
48✔
3603
        if err == nil &&
48✔
3604
                (resp.StatusCode != 101 ||
48✔
3605
                        !strings.EqualFold(resp.Header.Get("Upgrade"), "websocket") ||
48✔
3606
                        !strings.EqualFold(resp.Header.Get("Connection"), "upgrade") ||
48✔
3607
                        resp.Header.Get("Sec-Websocket-Accept") != wsAcceptKey(wsKey)) {
49✔
3608

1✔
3609
                err = fmt.Errorf("invalid websocket connection")
1✔
3610
        }
1✔
3611
        // Check compression extension...
3612
        if err == nil && c.ws.compress {
59✔
3613
                // Check that not only permessage-deflate extension is present, but that
11✔
3614
                // we also have server and client no context take over.
11✔
3615
                srvCompress, noCtxTakeover := wsPMCExtensionSupport(resp.Header, false)
11✔
3616

11✔
3617
                // If server does not support compression, then simply disable it in our side.
11✔
3618
                if !srvCompress {
16✔
3619
                        c.ws.compress = false
5✔
3620
                } else if !noCtxTakeover {
11✔
3621
                        err = fmt.Errorf("compression negotiation error")
×
3622
                }
×
3623
        }
3624
        // Same for no masking...
3625
        if err == nil && noMasking {
58✔
3626
                // Check if server accepts no masking
10✔
3627
                if resp.Header.Get(wsNoMaskingHeader) != wsNoMaskingValue {
11✔
3628
                        // Nope, need to mask our writes as any client would do.
1✔
3629
                        c.ws.maskwrite = true
1✔
3630
                }
1✔
3631
        }
3632
        if resp != nil {
81✔
3633
                resp.Body.Close()
33✔
3634
        }
33✔
3635
        if err != nil {
64✔
3636
                return nil, ReadError, err
16✔
3637
        }
16✔
3638
        c.Debugf("Leafnode compression=%v masking=%v", c.ws.compress, c.ws.maskwrite)
32✔
3639

32✔
3640
        var preBuf []byte
32✔
3641
        // We have to slurp whatever is in the bufio reader and pass that to the readloop.
32✔
3642
        if n := br.Buffered(); n != 0 {
32✔
3643
                preBuf, _ = br.Peek(n)
×
3644
        }
×
3645
        return preBuf, 0, nil
32✔
3646
}
3647

3648
// connectProcessTimeout is the delay of the timer armed by
// leafNodeResumeConnectProcess. When it fires before the connect process has
// been marked as finished, the connection is closed as stale.
const connectProcessTimeout = 2 * time.Second
3649

3650
// This is invoked for remote LEAF remote connections after processing the INFO
3651
// protocol.
3652
func (s *Server) leafNodeResumeConnectProcess(c *client) {
597✔
3653
        clusterName := s.ClusterName()
597✔
3654

597✔
3655
        c.mu.Lock()
597✔
3656
        if c.isClosed() {
597✔
3657
                c.mu.Unlock()
×
3658
                return
×
3659
        }
×
3660
        if err := c.sendLeafConnect(clusterName, c.headers); err != nil {
599✔
3661
                c.mu.Unlock()
2✔
3662
                c.closeConnection(WriteError)
2✔
3663
                return
2✔
3664
        }
2✔
3665

3666
        // Spin up the write loop.
3667
        s.startGoRoutine(func() { c.writeLoop() })
1,190✔
3668

3669
        // timeout leafNodeFinishConnectProcess
3670
        c.ping.tmr = time.AfterFunc(connectProcessTimeout, func() {
595✔
3671
                c.mu.Lock()
×
3672
                // check if leafNodeFinishConnectProcess was called and prevent later leafNodeFinishConnectProcess
×
3673
                if !c.flags.setIfNotSet(connectProcessFinished) {
×
3674
                        c.mu.Unlock()
×
3675
                        return
×
3676
                }
×
3677
                clearTimer(&c.ping.tmr)
×
3678
                closed := c.isClosed()
×
3679
                c.mu.Unlock()
×
3680
                if !closed {
×
3681
                        c.sendErrAndDebug("Stale Leaf Node Connection - Closing")
×
3682
                        c.closeConnection(StaleConnection)
×
3683
                }
×
3684
        })
3685
        c.mu.Unlock()
595✔
3686
        c.Debugf("Remote leafnode connect msg sent")
595✔
3687
}
3688

3689
// leafNodeFinishConnectProcess is invoked for remote LEAF connections after
3690
// processing the INFO protocol and leafNodeResumeConnectProcess.
3691
// It claims the connectProcessFinished flag, cancels the connect timeout,
// registers the client with its account, adds it to the server's leafnode
// connections, and calls initLeafNodeSmapAndSendSubs. It then emits the
// connect events and starts the first ping timer.
// NOTE(review): the CONNECT protocol itself is sent by
// leafNodeResumeConnectProcess (sendLeafConnect), not here.
3692
func (s *Server) leafNodeFinishConnectProcess(c *client) {
559✔
3693
        c.mu.Lock()
559✔
3694
        // Claim the connectProcessFinished flag. If it was already set (for
        // example by the timeout callback armed in leafNodeResumeConnectProcess),
        // bail out.
        if !c.flags.setIfNotSet(connectProcessFinished) {
559✔
3695
                c.mu.Unlock()
×
3696
                return
×
3697
        }
×
3698
        // Connection already closed: drop it from the server's leafnode tracking.
        // NOTE(review): the timeout timer is not cleared on this path; its
        // callback returns early because the flag is now set.
        if c.isClosed() {
559✔
3699
                c.mu.Unlock()
×
3700
                s.removeLeafNodeConnection(c)
×
3701
                return
×
3702
        }
×
3703
        remote := c.leaf.remote
559✔
3704
        // Without a remote configuration or a bound account we cannot proceed;
        // report an error to the peer and close.
        if remote == nil || c.acc == nil {
559✔
3705
                c.mu.Unlock()
×
3706
                c.sendErr("Authorization Violation")
×
3707
                c.closeConnection(ProtocolViolation)
×
3708
                return
×
3709
        }
×
3710
        // Check if we will need to send the system connect event.
        // The Hub setting is read under the remote's read lock.
3711
        remote.RLock()
559✔
3712
        sendSysConnectEvent := remote.Hub
559✔
3713
        remote.RUnlock()
559✔
3714

559✔
3715
        // Capture account before releasing lock
559✔
3716
        acc := c.acc
559✔
3717
        // cancel connectProcessTimeout
559✔
3718
        clearTimer(&c.ping.tmr)
559✔
3719
        c.mu.Unlock()
559✔
3720

559✔
3721
        // Make sure we register with the account here.
559✔
3722
        if err := c.registerWithAccount(acc); err != nil {
561✔
3723
                // NOTE(review): sentinel errors are compared with ==, which would
                // not match wrapped errors; confirm registerWithAccount never wraps.
                if err == ErrTooManyAccountConnections {
2✔
3724
                        c.maxAccountConnExceeded()
×
3725
                        return
×
3726
                } else if err == ErrLeafNodeLoop {
4✔
3727
                        c.handleLeafNodeLoop(true)
2✔
3728
                        return
2✔
3729
                }
2✔
3730
                // Any other registration error is logged and the connection closed.
                c.Errorf("Registering leaf with account %s resulted in error: %v", acc.Name, err)
×
3731
                c.closeConnection(ProtocolViolation)
×
3732
                return
×
3733
        }
3734
        if !s.addLeafNodeConnection(c, _EMPTY_, _EMPTY_, false) {
557✔
3735
                // Was not added, could be because the remote configuration has been removed.
×
3736
                c.closeConnection(ClientClosed)
×
3737
                return
×
3738
        }
×
3739
        s.initLeafNodeSmapAndSendSubs(c)
557✔
3740
        // Only send the system connect event when the remote's Hub setting was true.
        if sendSysConnectEvent {
569✔
3741
                s.sendLeafNodeConnect(acc)
12✔
3742
        }
12✔
3743
        s.accountConnectEvent(c)
557✔
3744

557✔
3745
        // The above functions are not running under the client lock, so it is
557✔
3746
        // possible that between the time we have started the read/write loops
557✔
3747
        // and now, that the connection was closed. This would leave the closed
557✔
3748
        // LN connection possibly registered with the account and/or the server's
557✔
3749
        // leafs map. So check if connection is closed, and if so, manually cleanup.
557✔
3750
        c.mu.Lock()
557✔
3751
        closed := c.isClosed()
557✔
3752
        // The first ping timer is only set on a connection that is still open.
        if !closed {
1,114✔
3753
                c.setFirstPingTimer()
557✔
3754
        }
557✔
3755
        c.mu.Unlock()
557✔
3756
        if closed {
557✔
3757
                s.removeLeafNodeConnection(c)
×
3758
                // Decrement the server's active-account count when removeClient returns 1.
                // NOTE(review): presumably 1 is the client count before removal, i.e.
                // this was the account's last client; confirm against removeClient.
                if prev := acc.removeClient(c); prev == 1 {
×
3759
                        s.decActiveAccounts()
×
3760
                }
×
3761
        }
3762
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc