• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 17339499407

29 Aug 2025 05:47PM UTC coverage: 86.039% (-0.02%) from 86.062%
17339499407

push

github

web-flow
(2.12) Atomic batch: support isolated reads (#7175)

To achieve isolated reads we now keep holding the `mset.mu`/stream lock
while writing as well as when doing reads. This ensures a batch writing
N messages will be atomically observed by any reads, instead of them
observing only part of the batch.

This PR also resolves an issue with rollups: the `mset.mu` lock was
previously released and re-acquired in `processJetStreamMsg`. This
allowed an inconsistent/non-isolated read when rollups are used, where
the new message could be observed before the rollup had been applied.
This was probably a less obvious issue than reading only part of a
batch with the new atomic batch support, though.

This PR also resolves off-by-one inconsistencies in the num pending
count that was returned as part of direct batch get and direct multi
get. The num pending that's reported using those reads now matches
exactly what would have been reported if a consumer was used instead.

Resolves https://github.com/nats-io/nats-server/issues/7128

Signed-off-by: Maurice van Veen <github@mauricevanveen.com>

73981 of 85985 relevant lines covered (86.04%)

335785.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.1
/server/leafnode.go
1
// Copyright 2019-2025 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bufio"
18
        "bytes"
19
        "crypto/tls"
20
        "encoding/base64"
21
        "encoding/json"
22
        "fmt"
23
        "io"
24
        "math/rand"
25
        "net"
26
        "net/http"
27
        "net/url"
28
        "os"
29
        "path"
30
        "reflect"
31
        "regexp"
32
        "runtime"
33
        "strconv"
34
        "strings"
35
        "sync"
36
        "sync/atomic"
37
        "time"
38

39
        "github.com/klauspost/compress/s2"
40
        "github.com/nats-io/jwt/v2"
41
        "github.com/nats-io/nkeys"
42
        "github.com/nats-io/nuid"
43
)
44

45
const (
	// leafnodeTLSInsecureWarning is logged when solicited leafnode
	// connections are configured to skip TLS verification.
	leafnodeTLSInsecureWarning = "TLS certificate chain and hostname of solicited leafnodes will not be verified. DO NOT USE IN PRODUCTION!"

	// leafNodeLoopDetectionSubjectPrefix is the prefix of the loop
	// detection subject.
	leafNodeLoopDetectionSubjectPrefix = "$LDS."

	// leafNodeWSPath is appended to the URL so that the websocket server
	// knows the connection is a LEAF and not a CLIENT.
	leafNodeWSPath = "/leafnode"

	// leafNodeReconnectDelayAfterLoopDetected delays the reconnect of a
	// solicited connection once a loop has been detected.
	leafNodeReconnectDelayAfterLoopDetected = 30 * time.Second

	// leafNodeReconnectAfterPermViolation is how long the server refrains
	// from reconnecting after closing a connection because a received
	// message caused a permission violation.
	leafNodeReconnectAfterPermViolation = 30 * time.Second

	// leafNodeReconnectDelayAfterClusterNameSame is the reconnect delay used
	// when our cluster name is the same as the hub's.
	leafNodeReconnectDelayAfterClusterNameSame = 30 * time.Second

	// leafNodeWaitBeforeClose is how long the server waits, after receiving
	// a CONNECT that does not meet the required minimum version, before
	// closing the connection.
	leafNodeWaitBeforeClose = 5 * time.Second
)
70

71
type leaf struct {
72
        // We have any auth stuff here for solicited connections.
73
        remote *leafNodeCfg
74
        // isSpoke tells us what role we are playing.
75
        // Used when we receive a connection but otherside tells us they are a hub.
76
        isSpoke bool
77
        // remoteCluster is when we are a hub but the spoke leafnode is part of a cluster.
78
        remoteCluster string
79
        // remoteServer holds onto the remove server's name or ID.
80
        remoteServer string
81
        // domain name of remote server
82
        remoteDomain string
83
        // account name of remote server
84
        remoteAccName string
85
        // Used to suppress sub and unsub interest. Same as routes but our audience
86
        // here is tied to this leaf node. This will hold all subscriptions except this
87
        // leaf nodes. This represents all the interest we want to send to the other side.
88
        smap map[string]int32
89
        // This map will contain all the subscriptions that have been added to the smap
90
        // during initLeafNodeSmapAndSendSubs. It is short lived and is there to avoid
91
        // race between processing of a sub where sub is added to account sublist but
92
        // updateSmap has not be called on that "thread", while in the LN readloop,
93
        // when processing CONNECT, initLeafNodeSmapAndSendSubs is invoked and add
94
        // this subscription to smap. When processing of the sub then calls updateSmap,
95
        // we would add it a second time in the smap causing later unsub to suppress the LS-.
96
        tsub  map[*subscription]struct{}
97
        tsubt *time.Timer
98
        // Selected compression mode, which may be different from the server configured mode.
99
        compression string
100
        // This is for GW map replies.
101
        gwSub *subscription
102
}
103

104
// Used for remote (solicited) leafnodes.
105
type leafNodeCfg struct {
106
        sync.RWMutex
107
        *RemoteLeafOpts
108
        urls           []*url.URL
109
        curURL         *url.URL
110
        tlsName        string
111
        username       string
112
        password       string
113
        perms          *Permissions
114
        connDelay      time.Duration // Delay before a connect, could be used while detecting loop condition, etc..
115
        jsMigrateTimer *time.Timer
116
}
117

118
// Check to see if this is a solicited leafnode. We do special processing for solicited.
119
func (c *client) isSolicitedLeafNode() bool {
2,024✔
120
        return c.kind == LEAF && c.leaf.remote != nil
2,024✔
121
}
2,024✔
122

123
// Returns true if this is a solicited leafnode and is not configured to be treated as a hub or a receiving
124
// connection leafnode where the otherside has declared itself to be the hub.
125
func (c *client) isSpokeLeafNode() bool {
6,032,736✔
126
        return c.kind == LEAF && c.leaf.isSpoke
6,032,736✔
127
}
6,032,736✔
128

129
func (c *client) isHubLeafNode() bool {
18,384✔
130
        return c.kind == LEAF && !c.leaf.isSpoke
18,384✔
131
}
18,384✔
132

133
// This will spin up go routines to solicit the remote leaf node connections.
134
func (s *Server) solicitLeafNodeRemotes(remotes []*RemoteLeafOpts) {
1,152✔
135
        sysAccName := _EMPTY_
1,152✔
136
        sAcc := s.SystemAccount()
1,152✔
137
        if sAcc != nil {
2,281✔
138
                sysAccName = sAcc.Name
1,129✔
139
        }
1,129✔
140
        addRemote := func(r *RemoteLeafOpts, isSysAccRemote bool) *leafNodeCfg {
2,441✔
141
                s.mu.Lock()
1,289✔
142
                remote := newLeafNodeCfg(r)
1,289✔
143
                creds := remote.Credentials
1,289✔
144
                accName := remote.LocalAccount
1,289✔
145
                s.leafRemoteCfgs = append(s.leafRemoteCfgs, remote)
1,289✔
146
                // Print notice if
1,289✔
147
                if isSysAccRemote {
1,383✔
148
                        if len(remote.DenyExports) > 0 {
95✔
149
                                s.Noticef("Remote for System Account uses restricted export permissions")
1✔
150
                        }
1✔
151
                        if len(remote.DenyImports) > 0 {
95✔
152
                                s.Noticef("Remote for System Account uses restricted import permissions")
1✔
153
                        }
1✔
154
                }
155
                s.mu.Unlock()
1,289✔
156
                if creds != _EMPTY_ {
1,339✔
157
                        contents, err := os.ReadFile(creds)
50✔
158
                        defer wipeSlice(contents)
50✔
159
                        if err != nil {
50✔
160
                                s.Errorf("Error reading LeafNode Remote Credentials file %q: %v", creds, err)
×
161
                        } else if items := credsRe.FindAllSubmatch(contents, -1); len(items) < 2 {
50✔
162
                                s.Errorf("LeafNode Remote Credentials file %q malformed", creds)
×
163
                        } else if _, err := nkeys.FromSeed(items[1][1]); err != nil {
50✔
164
                                s.Errorf("LeafNode Remote Credentials file %q has malformed seed", creds)
×
165
                        } else if uc, err := jwt.DecodeUserClaims(string(items[0][1])); err != nil {
50✔
166
                                s.Errorf("LeafNode Remote Credentials file %q has malformed user jwt", creds)
×
167
                        } else if isSysAccRemote {
54✔
168
                                if !uc.Permissions.Pub.Empty() || !uc.Permissions.Sub.Empty() || uc.Permissions.Resp != nil {
5✔
169
                                        s.Noticef("LeafNode Remote for System Account uses credentials file %q with restricted permissions", creds)
1✔
170
                                }
1✔
171
                        } else {
46✔
172
                                if !uc.Permissions.Pub.Empty() || !uc.Permissions.Sub.Empty() || uc.Permissions.Resp != nil {
52✔
173
                                        s.Noticef("LeafNode Remote for Account %s uses credentials file %q with restricted permissions", accName, creds)
6✔
174
                                }
6✔
175
                        }
176
                }
177
                return remote
1,289✔
178
        }
179
        for _, r := range remotes {
2,441✔
180
                // We need to call this, even if the leaf is disabled. This is so that
1,289✔
181
                // the number of internal configuration matches the options' remote leaf
1,289✔
182
                // configuration required for configuration reload.
1,289✔
183
                remote := addRemote(r, r.LocalAccount == sysAccName)
1,289✔
184
                if !r.Disabled {
2,577✔
185
                        s.startGoRoutine(func() { s.connectToRemoteLeafNode(remote, true) })
2,576✔
186
                }
187
        }
188
}
189

190
func (s *Server) remoteLeafNodeStillValid(remote *leafNodeCfg) bool {
7,691✔
191
        if remote.Disabled {
7,692✔
192
                return false
1✔
193
        }
1✔
194
        for _, ri := range s.getOpts().LeafNode.Remotes {
15,740✔
195
                // FIXME(dlc) - What about auth changes?
8,050✔
196
                if reflect.DeepEqual(ri.URLs, remote.URLs) {
15,740✔
197
                        return true
7,690✔
198
                }
7,690✔
199
        }
200
        return false
×
201
}
202

203
// Ensure that leafnode is properly configured.
204
func validateLeafNode(o *Options) error {
8,648✔
205
        if err := validateLeafNodeAuthOptions(o); err != nil {
8,650✔
206
                return err
2✔
207
        }
2✔
208

209
        // Users can bind to any local account, if its empty we will assume the $G account.
210
        for _, r := range o.LeafNode.Remotes {
9,976✔
211
                if r.LocalAccount == _EMPTY_ {
1,769✔
212
                        r.LocalAccount = globalAccountName
439✔
213
                }
439✔
214
        }
215

216
        // In local config mode, check that leafnode configuration refers to accounts that exist.
217
        if len(o.TrustedOperators) == 0 {
16,971✔
218
                accNames := map[string]struct{}{}
8,325✔
219
                for _, a := range o.Accounts {
17,434✔
220
                        accNames[a.Name] = struct{}{}
9,109✔
221
                }
9,109✔
222
                // global account is always created
223
                accNames[DEFAULT_GLOBAL_ACCOUNT] = struct{}{}
8,325✔
224
                // in the context of leaf nodes, empty account means global account
8,325✔
225
                accNames[_EMPTY_] = struct{}{}
8,325✔
226
                // system account either exists or, if not disabled, will be created
8,325✔
227
                if o.SystemAccount == _EMPTY_ && !o.NoSystemAccount {
14,937✔
228
                        accNames[DEFAULT_SYSTEM_ACCOUNT] = struct{}{}
6,612✔
229
                }
6,612✔
230
                checkAccountExists := func(accName string, cfgType string) error {
17,986✔
231
                        if _, ok := accNames[accName]; !ok {
9,663✔
232
                                return fmt.Errorf("cannot find local account %q specified in leafnode %s", accName, cfgType)
2✔
233
                        }
2✔
234
                        return nil
9,659✔
235
                }
236
                if err := checkAccountExists(o.LeafNode.Account, "authorization"); err != nil {
8,326✔
237
                        return err
1✔
238
                }
1✔
239
                for _, lu := range o.LeafNode.Users {
8,341✔
240
                        if lu.Account == nil { // means global account
27✔
241
                                continue
10✔
242
                        }
243
                        if err := checkAccountExists(lu.Account.Name, "authorization"); err != nil {
7✔
244
                                return err
×
245
                        }
×
246
                }
247
                for _, r := range o.LeafNode.Remotes {
9,653✔
248
                        if err := checkAccountExists(r.LocalAccount, "remote"); err != nil {
1,330✔
249
                                return err
1✔
250
                        }
1✔
251
                }
252
        } else {
321✔
253
                if len(o.LeafNode.Users) != 0 {
322✔
254
                        return fmt.Errorf("operator mode does not allow specifying users in leafnode config")
1✔
255
                }
1✔
256
                for _, r := range o.LeafNode.Remotes {
321✔
257
                        if !nkeys.IsValidPublicAccountKey(r.LocalAccount) {
2✔
258
                                return fmt.Errorf(
1✔
259
                                        "operator mode requires account nkeys in remotes. " +
1✔
260
                                                "Please add an `account` key to each remote in your `leafnodes` section, to assign it to an account. " +
1✔
261
                                                "Each account value should be a 56 character public key, starting with the letter 'A'")
1✔
262
                        }
1✔
263
                }
264
                if o.LeafNode.Port != 0 && o.LeafNode.Account != "" && !nkeys.IsValidPublicAccountKey(o.LeafNode.Account) {
320✔
265
                        return fmt.Errorf("operator mode and non account nkeys are incompatible")
1✔
266
                }
1✔
267
        }
268

269
        // Validate compression settings
270
        if o.LeafNode.Compression.Mode != _EMPTY_ {
12,695✔
271
                if err := validateAndNormalizeCompressionOption(&o.LeafNode.Compression, CompressionS2Auto); err != nil {
4,059✔
272
                        return err
5✔
273
                }
5✔
274
        }
275

276
        // If a remote has a websocket scheme, all need to have it.
277
        for _, rcfg := range o.LeafNode.Remotes {
9,964✔
278
                if len(rcfg.URLs) >= 2 {
1,536✔
279
                        firstIsWS, ok := isWSURL(rcfg.URLs[0]), true
208✔
280
                        for i := 1; i < len(rcfg.URLs); i++ {
661✔
281
                                u := rcfg.URLs[i]
453✔
282
                                if isWS := isWSURL(u); isWS && !firstIsWS || !isWS && firstIsWS {
460✔
283
                                        ok = false
7✔
284
                                        break
7✔
285
                                }
286
                        }
287
                        if !ok {
215✔
288
                                return fmt.Errorf("remote leaf node configuration cannot have a mix of websocket and non-websocket urls: %q", redactURLList(rcfg.URLs))
7✔
289
                        }
7✔
290
                }
291
                // Validate compression settings
292
                if rcfg.Compression.Mode != _EMPTY_ {
2,642✔
293
                        if err := validateAndNormalizeCompressionOption(&rcfg.Compression, CompressionS2Auto); err != nil {
1,326✔
294
                                return err
5✔
295
                        }
5✔
296
                }
297
        }
298

299
        if o.LeafNode.Port == 0 {
13,777✔
300
                return nil
5,153✔
301
        }
5,153✔
302

303
        // If MinVersion is defined, check that it is valid.
304
        if mv := o.LeafNode.MinVersion; mv != _EMPTY_ {
3,475✔
305
                if err := checkLeafMinVersionConfig(mv); err != nil {
6✔
306
                        return err
2✔
307
                }
2✔
308
        }
309

310
        // The checks below will be done only when detecting that we are configured
311
        // with gateways. So if an option validation needs to be done regardless,
312
        // it MUST be done before this point!
313

314
        if o.Gateway.Name == _EMPTY_ && o.Gateway.Port == 0 {
6,254✔
315
                return nil
2,785✔
316
        }
2,785✔
317
        // If we are here we have both leaf nodes and gateways defined, make sure there
318
        // is a system account defined.
319
        if o.SystemAccount == _EMPTY_ {
685✔
320
                return fmt.Errorf("leaf nodes and gateways (both being defined) require a system account to also be configured")
1✔
321
        }
1✔
322
        if err := validatePinnedCerts(o.LeafNode.TLSPinnedCerts); err != nil {
683✔
323
                return fmt.Errorf("leafnode: %v", err)
×
324
        }
×
325
        return nil
683✔
326
}
327

328
func checkLeafMinVersionConfig(mv string) error {
8✔
329
        if ok, err := versionAtLeastCheckError(mv, 2, 8, 0); !ok || err != nil {
12✔
330
                if err != nil {
6✔
331
                        return fmt.Errorf("invalid leafnode's minimum version: %v", err)
2✔
332
                } else {
4✔
333
                        return fmt.Errorf("the minimum version should be at least 2.8.0")
2✔
334
                }
2✔
335
        }
336
        return nil
4✔
337
}
338

339
// Used to validate user names in LeafNode configuration.
340
// - rejects mix of single and multiple users.
341
// - rejects duplicate user names.
342
func validateLeafNodeAuthOptions(o *Options) error {
8,707✔
343
        if len(o.LeafNode.Users) == 0 {
17,387✔
344
                return nil
8,680✔
345
        }
8,680✔
346
        if o.LeafNode.Username != _EMPTY_ {
29✔
347
                return fmt.Errorf("can not have a single user/pass and a users array")
2✔
348
        }
2✔
349
        if o.LeafNode.Nkey != _EMPTY_ {
25✔
350
                return fmt.Errorf("can not have a single nkey and a users array")
×
351
        }
×
352
        users := map[string]struct{}{}
25✔
353
        for _, u := range o.LeafNode.Users {
66✔
354
                if _, exists := users[u.Username]; exists {
43✔
355
                        return fmt.Errorf("duplicate user %q detected in leafnode authorization", u.Username)
2✔
356
                }
2✔
357
                users[u.Username] = struct{}{}
39✔
358
        }
359
        return nil
23✔
360
}
361

362
// Update remote LeafNode TLS configurations after a config reload.
363
func (s *Server) updateRemoteLeafNodesTLSConfig(opts *Options) {
15✔
364
        max := len(opts.LeafNode.Remotes)
15✔
365
        if max == 0 {
15✔
366
                return
×
367
        }
×
368

369
        s.mu.RLock()
15✔
370
        defer s.mu.RUnlock()
15✔
371

15✔
372
        // Changes in the list of remote leaf nodes is not supported.
15✔
373
        // However, make sure that we don't go over the arrays.
15✔
374
        if len(s.leafRemoteCfgs) < max {
15✔
375
                max = len(s.leafRemoteCfgs)
×
376
        }
×
377
        for i := 0; i < max; i++ {
34✔
378
                ro := opts.LeafNode.Remotes[i]
19✔
379
                cfg := s.leafRemoteCfgs[i]
19✔
380
                if ro.TLSConfig != nil {
21✔
381
                        cfg.Lock()
2✔
382
                        cfg.TLSConfig = ro.TLSConfig.Clone()
2✔
383
                        cfg.TLSHandshakeFirst = ro.TLSHandshakeFirst
2✔
384
                        cfg.Unlock()
2✔
385
                }
2✔
386
        }
387
}
388

389
func (s *Server) reConnectToRemoteLeafNode(remote *leafNodeCfg) {
236✔
390
        delay := s.getOpts().LeafNode.ReconnectInterval
236✔
391
        select {
236✔
392
        case <-time.After(delay):
181✔
393
        case <-s.quitCh:
55✔
394
                s.grWG.Done()
55✔
395
                return
55✔
396
        }
397
        s.connectToRemoteLeafNode(remote, false)
181✔
398
}
399

400
// Creates a leafNodeCfg object that wraps the RemoteLeafOpts.
401
func newLeafNodeCfg(remote *RemoteLeafOpts) *leafNodeCfg {
1,289✔
402
        cfg := &leafNodeCfg{
1,289✔
403
                RemoteLeafOpts: remote,
1,289✔
404
                urls:           make([]*url.URL, 0, len(remote.URLs)),
1,289✔
405
        }
1,289✔
406
        if len(remote.DenyExports) > 0 || len(remote.DenyImports) > 0 {
1,297✔
407
                perms := &Permissions{}
8✔
408
                if len(remote.DenyExports) > 0 {
16✔
409
                        perms.Publish = &SubjectPermission{Deny: remote.DenyExports}
8✔
410
                }
8✔
411
                if len(remote.DenyImports) > 0 {
15✔
412
                        perms.Subscribe = &SubjectPermission{Deny: remote.DenyImports}
7✔
413
                }
7✔
414
                cfg.perms = perms
8✔
415
        }
416
        // Start with the one that is configured. We will add to this
417
        // array when receiving async leafnode INFOs.
418
        cfg.urls = append(cfg.urls, cfg.URLs...)
1,289✔
419
        // If allowed to randomize, do it on our copy of URLs
1,289✔
420
        if !remote.NoRandomize {
2,576✔
421
                rand.Shuffle(len(cfg.urls), func(i, j int) {
1,697✔
422
                        cfg.urls[i], cfg.urls[j] = cfg.urls[j], cfg.urls[i]
410✔
423
                })
410✔
424
        }
425
        // If we are TLS make sure we save off a proper servername if possible.
426
        // Do same for user/password since we may need them to connect to
427
        // a bare URL that we get from INFO protocol.
428
        for _, u := range cfg.urls {
3,018✔
429
                cfg.saveTLSHostname(u)
1,729✔
430
                cfg.saveUserPassword(u)
1,729✔
431
                // If the url(s) have the "wss://" scheme, and we don't have a TLS
1,729✔
432
                // config, mark that we should be using TLS anyway.
1,729✔
433
                if !cfg.TLS && isWSSURL(u) {
1,730✔
434
                        cfg.TLS = true
1✔
435
                }
1✔
436
        }
437
        return cfg
1,289✔
438
}
439

440
// Will pick an URL from the list of available URLs.
441
func (cfg *leafNodeCfg) pickNextURL() *url.URL {
6,920✔
442
        cfg.Lock()
6,920✔
443
        defer cfg.Unlock()
6,920✔
444
        // If the current URL is the first in the list and we have more than
6,920✔
445
        // one URL, then move that one to end of the list.
6,920✔
446
        if cfg.curURL != nil && len(cfg.urls) > 1 && urlsAreEqual(cfg.curURL, cfg.urls[0]) {
10,485✔
447
                first := cfg.urls[0]
3,565✔
448
                copy(cfg.urls, cfg.urls[1:])
3,565✔
449
                cfg.urls[len(cfg.urls)-1] = first
3,565✔
450
        }
3,565✔
451
        cfg.curURL = cfg.urls[0]
6,920✔
452
        return cfg.curURL
6,920✔
453
}
454

455
// Returns the current URL
456
func (cfg *leafNodeCfg) getCurrentURL() *url.URL {
76✔
457
        cfg.RLock()
76✔
458
        defer cfg.RUnlock()
76✔
459
        return cfg.curURL
76✔
460
}
76✔
461

462
// Returns how long the server should wait before attempting
463
// to solicit a remote leafnode connection.
464
func (cfg *leafNodeCfg) getConnectDelay() time.Duration {
1,471✔
465
        cfg.RLock()
1,471✔
466
        delay := cfg.connDelay
1,471✔
467
        cfg.RUnlock()
1,471✔
468
        return delay
1,471✔
469
}
1,471✔
470

471
// Sets the connect delay.
472
func (cfg *leafNodeCfg) setConnectDelay(delay time.Duration) {
150✔
473
        cfg.Lock()
150✔
474
        cfg.connDelay = delay
150✔
475
        cfg.Unlock()
150✔
476
}
150✔
477

478
// Ensure that non-exported options (used in tests) have
479
// been properly set.
480
func (s *Server) setLeafNodeNonExportedOptions() {
7,112✔
481
        opts := s.getOpts()
7,112✔
482
        s.leafNodeOpts.dialTimeout = opts.LeafNode.dialTimeout
7,112✔
483
        if s.leafNodeOpts.dialTimeout == 0 {
14,223✔
484
                // Use same timeouts as routes for now.
7,111✔
485
                s.leafNodeOpts.dialTimeout = DEFAULT_ROUTE_DIAL
7,111✔
486
        }
7,111✔
487
        s.leafNodeOpts.resolver = opts.LeafNode.resolver
7,112✔
488
        if s.leafNodeOpts.resolver == nil {
14,220✔
489
                s.leafNodeOpts.resolver = net.DefaultResolver
7,108✔
490
        }
7,108✔
491
}
492

493
// sharedSysAccDelay delays the first connect of a remote bound to the system
// account when the server is not standalone, giving it time to gather some
// information first.
const sharedSysAccDelay time.Duration = 250 * time.Millisecond
494

495
func (s *Server) connectToRemoteLeafNode(remote *leafNodeCfg, firstConnect bool) {
1,471✔
496
        defer s.grWG.Done()
1,471✔
497

1,471✔
498
        if remote == nil || len(remote.URLs) == 0 {
1,471✔
499
                s.Debugf("Empty remote leafnode definition, nothing to connect")
×
500
                return
×
501
        }
×
502

503
        opts := s.getOpts()
1,471✔
504
        reconnectDelay := opts.LeafNode.ReconnectInterval
1,471✔
505
        s.mu.RLock()
1,471✔
506
        dialTimeout := s.leafNodeOpts.dialTimeout
1,471✔
507
        resolver := s.leafNodeOpts.resolver
1,471✔
508
        var isSysAcc bool
1,471✔
509
        if s.eventsEnabled() {
2,909✔
510
                isSysAcc = remote.LocalAccount == s.sys.account.Name
1,438✔
511
        }
1,438✔
512
        jetstreamMigrateDelay := remote.JetStreamClusterMigrateDelay
1,471✔
513
        s.mu.RUnlock()
1,471✔
514

1,471✔
515
        // If we are sharing a system account and we are not standalone delay to gather some info prior.
1,471✔
516
        if firstConnect && isSysAcc && !s.standAloneMode() {
1,542✔
517
                s.Debugf("Will delay first leafnode connect to shared system account due to clustering")
71✔
518
                remote.setConnectDelay(sharedSysAccDelay)
71✔
519
        }
71✔
520

521
        if connDelay := remote.getConnectDelay(); connDelay > 0 {
1,548✔
522
                select {
77✔
523
                case <-time.After(connDelay):
70✔
524
                case <-s.quitCh:
7✔
525
                        return
7✔
526
                }
527
                remote.setConnectDelay(0)
70✔
528
        }
529

530
        var conn net.Conn
1,464✔
531

1,464✔
532
        const connErrFmt = "Error trying to connect as leafnode to remote server %q (attempt %v): %v"
1,464✔
533

1,464✔
534
        attempts := 0
1,464✔
535

1,464✔
536
        for s.isRunning() && s.remoteLeafNodeStillValid(remote) {
8,384✔
537
                rURL := remote.pickNextURL()
6,920✔
538
                url, err := s.getRandomIP(resolver, rURL.Host, nil)
6,920✔
539
                if err == nil {
13,833✔
540
                        var ipStr string
6,913✔
541
                        if url != rURL.Host {
6,989✔
542
                                ipStr = fmt.Sprintf(" (%s)", url)
76✔
543
                        }
76✔
544
                        // Some test may want to disable remotes from connecting
545
                        if s.isLeafConnectDisabled() {
7,044✔
546
                                s.Debugf("Will not attempt to connect to remote server on %q%s, leafnodes currently disabled", rURL.Host, ipStr)
131✔
547
                                err = ErrLeafNodeDisabled
131✔
548
                        } else {
6,913✔
549
                                s.Debugf("Trying to connect as leafnode to remote server on %q%s", rURL.Host, ipStr)
6,782✔
550
                                conn, err = natsDialTimeout("tcp", url, dialTimeout)
6,782✔
551
                        }
6,782✔
552
                }
553
                if err != nil {
13,070✔
554
                        jitter := time.Duration(rand.Int63n(int64(reconnectDelay)))
6,150✔
555
                        delay := reconnectDelay + jitter
6,150✔
556
                        attempts++
6,150✔
557
                        if s.shouldReportConnectErr(firstConnect, attempts) {
10,430✔
558
                                s.Errorf(connErrFmt, rURL.Host, attempts, err)
4,280✔
559
                        } else {
6,150✔
560
                                s.Debugf(connErrFmt, rURL.Host, attempts, err)
1,870✔
561
                        }
1,870✔
562
                        remote.Lock()
6,150✔
563
                        // if we are using a delay to start migrating assets, kick off a migrate timer.
6,150✔
564
                        if remote.jsMigrateTimer == nil && jetstreamMigrateDelay > 0 {
6,158✔
565
                                remote.jsMigrateTimer = time.AfterFunc(jetstreamMigrateDelay, func() {
16✔
566
                                        s.checkJetStreamMigrate(remote)
8✔
567
                                })
8✔
568
                        }
569
                        remote.Unlock()
6,150✔
570
                        select {
6,150✔
571
                        case <-s.quitCh:
686✔
572
                                remote.cancelMigrateTimer()
686✔
573
                                return
686✔
574
                        case <-time.After(delay):
5,463✔
575
                                // Check if we should migrate any JetStream assets immediately while this remote is down.
5,463✔
576
                                // This will be used if JetStreamClusterMigrateDelay was not set
5,463✔
577
                                if jetstreamMigrateDelay == 0 {
10,856✔
578
                                        s.checkJetStreamMigrate(remote)
5,393✔
579
                                }
5,393✔
580
                                continue
5,463✔
581
                        }
582
                }
583
                remote.cancelMigrateTimer()
770✔
584
                if !s.remoteLeafNodeStillValid(remote) {
770✔
585
                        conn.Close()
×
586
                        return
×
587
                }
×
588

589
                // We have a connection here to a remote server.
590
                // Go ahead and create our leaf node and return.
591
                s.createLeafNode(conn, rURL, remote, nil)
770✔
592

770✔
593
                // Clear any observer states if we had them.
770✔
594
                s.clearObserverState(remote)
770✔
595

770✔
596
                return
770✔
597
        }
598
}
599

600
func (cfg *leafNodeCfg) cancelMigrateTimer() {
1,456✔
601
        cfg.Lock()
1,456✔
602
        stopAndClearTimer(&cfg.jsMigrateTimer)
1,456✔
603
        cfg.Unlock()
1,456✔
604
}
1,456✔
605

606
// This will clear any observer state such that stream or consumer assets on this server can become leaders again.
607
func (s *Server) clearObserverState(remote *leafNodeCfg) {
770✔
608
        s.mu.RLock()
770✔
609
        accName := remote.LocalAccount
770✔
610
        s.mu.RUnlock()
770✔
611

770✔
612
        acc, err := s.LookupAccount(accName)
770✔
613
        if err != nil {
772✔
614
                s.Warnf("Error looking up account [%s] checking for JetStream clear observer state on a leafnode", accName)
2✔
615
                return
2✔
616
        }
2✔
617

618
        acc.jscmMu.Lock()
768✔
619
        defer acc.jscmMu.Unlock()
768✔
620

768✔
621
        // Walk all streams looking for any clustered stream, skip otherwise.
768✔
622
        for _, mset := range acc.streams() {
787✔
623
                node := mset.raftNode()
19✔
624
                if node == nil {
30✔
625
                        // Not R>1
11✔
626
                        continue
11✔
627
                }
628
                // Check consumers
629
                for _, o := range mset.getConsumers() {
10✔
630
                        if n := o.raftNode(); n != nil {
4✔
631
                                // Ensure we can become a leader again.
2✔
632
                                n.SetObserver(false)
2✔
633
                        }
2✔
634
                }
635
                // Ensure we can not become a leader again.
636
                node.SetObserver(false)
8✔
637
        }
638
}
639

640
// Check to see if we should migrate any assets from this account.
641
func (s *Server) checkJetStreamMigrate(remote *leafNodeCfg) {
5,401✔
642
        s.mu.RLock()
5,401✔
643
        accName, shouldMigrate := remote.LocalAccount, remote.JetStreamClusterMigrate
5,401✔
644
        s.mu.RUnlock()
5,401✔
645

5,401✔
646
        if !shouldMigrate {
10,733✔
647
                return
5,332✔
648
        }
5,332✔
649

650
        acc, err := s.LookupAccount(accName)
69✔
651
        if err != nil {
69✔
652
                s.Warnf("Error looking up account [%s] checking for JetStream migration on a leafnode", accName)
×
653
                return
×
654
        }
×
655

656
        acc.jscmMu.Lock()
69✔
657
        defer acc.jscmMu.Unlock()
69✔
658

69✔
659
        // Walk all streams looking for any clustered stream, skip otherwise.
69✔
660
        // If we are the leader force stepdown.
69✔
661
        for _, mset := range acc.streams() {
104✔
662
                node := mset.raftNode()
35✔
663
                if node == nil {
35✔
664
                        // Not R>1
×
665
                        continue
×
666
                }
667
                // Collect any consumers
668
                for _, o := range mset.getConsumers() {
58✔
669
                        if n := o.raftNode(); n != nil {
46✔
670
                                n.StepDown()
23✔
671
                                // Ensure we can not become a leader while in this state.
23✔
672
                                n.SetObserver(true)
23✔
673
                        }
23✔
674
                }
675
                // Stepdown if this stream was leader.
676
                node.StepDown()
35✔
677
                // Ensure we can not become a leader while in this state.
35✔
678
                node.SetObserver(true)
35✔
679
        }
680
}
681

682
// Helper for checking.
683
func (s *Server) isLeafConnectDisabled() bool {
6,913✔
684
        s.mu.RLock()
6,913✔
685
        defer s.mu.RUnlock()
6,913✔
686
        return s.leafDisableConnect
6,913✔
687
}
6,913✔
688

689
// Save off the tlsName for when we use TLS and mix hostnames and IPs. IPs usually
690
// come from the server we connect to.
691
//
692
// We used to save the name only if there was a TLSConfig or scheme equal to "tls".
693
// However, this was causing failures for users that did not set the scheme (and
694
// their remote connections did not have a tls{} block).
695
// We now save the host name regardless in case the remote returns an INFO indicating
696
// that TLS is required.
697
func (cfg *leafNodeCfg) saveTLSHostname(u *url.URL) {
2,352✔
698
        if cfg.tlsName == _EMPTY_ && net.ParseIP(u.Hostname()) == nil {
2,372✔
699
                cfg.tlsName = u.Hostname()
20✔
700
        }
20✔
701
}
702

703
// Save off the username/password for when we connect using a bare URL
704
// that we get from the INFO protocol.
705
func (cfg *leafNodeCfg) saveUserPassword(u *url.URL) {
1,729✔
706
        if cfg.username == _EMPTY_ && u.User != nil {
2,002✔
707
                cfg.username = u.User.Username()
273✔
708
                cfg.password, _ = u.User.Password()
273✔
709
        }
273✔
710
}
711

712
// This starts the leafnode accept loop in a go routine, unless it
713
// is detected that the server has already been shutdown.
714
func (s *Server) startLeafNodeAcceptLoop() {
3,437✔
715
        // Snapshot server options.
3,437✔
716
        opts := s.getOpts()
3,437✔
717

3,437✔
718
        port := opts.LeafNode.Port
3,437✔
719
        if port == -1 {
6,697✔
720
                port = 0
3,260✔
721
        }
3,260✔
722

723
        if s.isShuttingDown() {
3,438✔
724
                return
1✔
725
        }
1✔
726

727
        s.mu.Lock()
3,436✔
728
        hp := net.JoinHostPort(opts.LeafNode.Host, strconv.Itoa(port))
3,436✔
729
        l, e := natsListen("tcp", hp)
3,436✔
730
        s.leafNodeListenerErr = e
3,436✔
731
        if e != nil {
3,436✔
732
                s.mu.Unlock()
×
733
                s.Fatalf("Error listening on leafnode port: %d - %v", opts.LeafNode.Port, e)
×
734
                return
×
735
        }
×
736

737
        s.Noticef("Listening for leafnode connections on %s",
3,436✔
738
                net.JoinHostPort(opts.LeafNode.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))
3,436✔
739

3,436✔
740
        tlsRequired := opts.LeafNode.TLSConfig != nil
3,436✔
741
        tlsVerify := tlsRequired && opts.LeafNode.TLSConfig.ClientAuth == tls.RequireAndVerifyClientCert
3,436✔
742
        // Do not set compression in this Info object, it would possibly cause
3,436✔
743
        // issues when sending asynchronous INFO to the remote.
3,436✔
744
        info := Info{
3,436✔
745
                ID:            s.info.ID,
3,436✔
746
                Name:          s.info.Name,
3,436✔
747
                Version:       s.info.Version,
3,436✔
748
                GitCommit:     gitCommit,
3,436✔
749
                GoVersion:     runtime.Version(),
3,436✔
750
                AuthRequired:  true,
3,436✔
751
                TLSRequired:   tlsRequired,
3,436✔
752
                TLSVerify:     tlsVerify,
3,436✔
753
                MaxPayload:    s.info.MaxPayload, // TODO(dlc) - Allow override?
3,436✔
754
                Headers:       s.supportsHeaders(),
3,436✔
755
                JetStream:     opts.JetStream,
3,436✔
756
                Domain:        opts.JetStreamDomain,
3,436✔
757
                Proto:         s.getServerProto(),
3,436✔
758
                InfoOnConnect: true,
3,436✔
759
                JSApiLevel:    JSApiLevel,
3,436✔
760
        }
3,436✔
761
        // If we have selected a random port...
3,436✔
762
        if port == 0 {
6,695✔
763
                // Write resolved port back to options.
3,259✔
764
                opts.LeafNode.Port = l.Addr().(*net.TCPAddr).Port
3,259✔
765
        }
3,259✔
766

767
        s.leafNodeInfo = info
3,436✔
768
        // Possibly override Host/Port and set IP based on Cluster.Advertise
3,436✔
769
        if err := s.setLeafNodeInfoHostPortAndIP(); err != nil {
3,436✔
770
                s.Fatalf("Error setting leafnode INFO with LeafNode.Advertise value of %s, err=%v", opts.LeafNode.Advertise, err)
×
771
                l.Close()
×
772
                s.mu.Unlock()
×
773
                return
×
774
        }
×
775
        s.leafURLsMap[s.leafNodeInfo.IP]++
3,436✔
776
        s.generateLeafNodeInfoJSON()
3,436✔
777

3,436✔
778
        // Setup state that can enable shutdown
3,436✔
779
        s.leafNodeListener = l
3,436✔
780

3,436✔
781
        // As of now, a server that does not have remotes configured would
3,436✔
782
        // never solicit a connection, so we should not have to warn if
3,436✔
783
        // InsecureSkipVerify is set in main LeafNodes config (since
3,436✔
784
        // this TLS setting matters only when soliciting a connection).
3,436✔
785
        // Still, warn if insecure is set in any of LeafNode block.
3,436✔
786
        // We need to check remotes, even if tls is not required on accept.
3,436✔
787
        warn := tlsRequired && opts.LeafNode.TLSConfig.InsecureSkipVerify
3,436✔
788
        if !warn {
6,868✔
789
                for _, r := range opts.LeafNode.Remotes {
3,618✔
790
                        if r.TLSConfig != nil && r.TLSConfig.InsecureSkipVerify {
187✔
791
                                warn = true
1✔
792
                                break
1✔
793
                        }
794
                }
795
        }
796
        if warn {
3,441✔
797
                s.Warnf(leafnodeTLSInsecureWarning)
5✔
798
        }
5✔
799
        go s.acceptConnections(l, "Leafnode", func(conn net.Conn) { s.createLeafNode(conn, nil, nil, nil) }, nil)
4,248✔
800
        s.mu.Unlock()
3,436✔
801
}
802

803
// credsRe finds the dash-delimited sections of a creds file. Capture group 1
// of each match holds the section body; sendLeafConnect uses the first match
// as the user JWT and the second as the nkey seed.
var (
	credsRe = regexp.MustCompile(`\s*(?:(?:[-]{3,}.*[-]{3,}\r?\n)([\w\-.=]+)(?:\r?\n[-]{3,}.*[-]{3,}(\r?\n|\z)))`)
)
805

806
// clusterName is provided as argument to avoid lock ordering issues with the locked client c
807
// Lock should be held entering here.
808
func (c *client) sendLeafConnect(clusterName string, headers bool) error {
655✔
809
        // We support basic user/pass and operator based user JWT with signatures.
655✔
810
        cinfo := leafConnectInfo{
655✔
811
                Version:       VERSION,
655✔
812
                ID:            c.srv.info.ID,
655✔
813
                Domain:        c.srv.info.Domain,
655✔
814
                Name:          c.srv.info.Name,
655✔
815
                Hub:           c.leaf.remote.Hub,
655✔
816
                Cluster:       clusterName,
655✔
817
                Headers:       headers,
655✔
818
                JetStream:     c.acc.jetStreamConfigured(),
655✔
819
                DenyPub:       c.leaf.remote.DenyImports,
655✔
820
                Compression:   c.leaf.compression,
655✔
821
                RemoteAccount: c.acc.GetName(),
655✔
822
                Proto:         c.srv.getServerProto(),
655✔
823
        }
655✔
824

655✔
825
        // If a signature callback is specified, this takes precedence over anything else.
655✔
826
        if cb := c.leaf.remote.SignatureCB; cb != nil {
660✔
827
                nonce := c.nonce
5✔
828
                c.mu.Unlock()
5✔
829
                jwt, sigraw, err := cb(nonce)
5✔
830
                c.mu.Lock()
5✔
831
                if err == nil && c.isClosed() {
6✔
832
                        err = ErrConnectionClosed
1✔
833
                }
1✔
834
                if err != nil {
7✔
835
                        c.Errorf("Error signing the nonce: %v", err)
2✔
836
                        return err
2✔
837
                }
2✔
838
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
3✔
839
                cinfo.JWT, cinfo.Sig = jwt, sig
3✔
840

841
        } else if creds := c.leaf.remote.Credentials; creds != _EMPTY_ {
704✔
842
                // Check for credentials first, that will take precedence..
54✔
843
                c.Debugf("Authenticating with credentials file %q", c.leaf.remote.Credentials)
54✔
844
                contents, err := os.ReadFile(creds)
54✔
845
                if err != nil {
54✔
846
                        c.Errorf("%v", err)
×
847
                        return err
×
848
                }
×
849
                defer wipeSlice(contents)
54✔
850
                items := credsRe.FindAllSubmatch(contents, -1)
54✔
851
                if len(items) < 2 {
54✔
852
                        c.Errorf("Credentials file malformed")
×
853
                        return err
×
854
                }
×
855
                // First result should be the user JWT.
856
                // We copy here so that the file containing the seed will be wiped appropriately.
857
                raw := items[0][1]
54✔
858
                tmp := make([]byte, len(raw))
54✔
859
                copy(tmp, raw)
54✔
860
                // Seed is second item.
54✔
861
                kp, err := nkeys.FromSeed(items[1][1])
54✔
862
                if err != nil {
54✔
863
                        c.Errorf("Credentials file has malformed seed")
×
864
                        return err
×
865
                }
×
866
                // Wipe our key on exit.
867
                defer kp.Wipe()
54✔
868

54✔
869
                sigraw, _ := kp.Sign(c.nonce)
54✔
870
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
54✔
871
                cinfo.JWT = bytesToString(tmp)
54✔
872
                cinfo.Sig = sig
54✔
873
        } else if nkey := c.leaf.remote.Nkey; nkey != _EMPTY_ {
601✔
874
                kp, err := nkeys.FromSeed([]byte(nkey))
5✔
875
                if err != nil {
5✔
876
                        c.Errorf("Remote nkey has malformed seed")
×
877
                        return err
×
878
                }
×
879
                // Wipe our key on exit.
880
                defer kp.Wipe()
5✔
881
                sigraw, _ := kp.Sign(c.nonce)
5✔
882
                sig := base64.RawURLEncoding.EncodeToString(sigraw)
5✔
883
                pkey, _ := kp.PublicKey()
5✔
884
                cinfo.Nkey = pkey
5✔
885
                cinfo.Sig = sig
5✔
886
        }
887
        // In addition, and this is to allow auth callout, set user/password or
888
        // token if applicable.
889
        if userInfo := c.leaf.remote.curURL.User; userInfo != nil {
948✔
890
                // For backward compatibility, if only username is provided, set both
295✔
891
                // Token and User, not just Token.
295✔
892
                cinfo.User = userInfo.Username()
295✔
893
                var ok bool
295✔
894
                cinfo.Pass, ok = userInfo.Password()
295✔
895
                if !ok {
301✔
896
                        cinfo.Token = cinfo.User
6✔
897
                }
6✔
898
        } else if c.leaf.remote.username != _EMPTY_ {
365✔
899
                cinfo.User = c.leaf.remote.username
7✔
900
                cinfo.Pass = c.leaf.remote.password
7✔
901
        }
7✔
902
        b, err := json.Marshal(cinfo)
653✔
903
        if err != nil {
653✔
904
                c.Errorf("Error marshaling CONNECT to remote leafnode: %v\n", err)
×
905
                return err
×
906
        }
×
907
        // Although this call is made before the writeLoop is created,
908
        // we don't really need to send in place. The protocol will be
909
        // sent out by the writeLoop.
910
        c.enqueueProto([]byte(fmt.Sprintf(ConProto, b)))
653✔
911
        return nil
653✔
912
}
913

914
// Makes a deep copy of the LeafNode Info structure.
915
// The server lock is held on entry.
916
func (s *Server) copyLeafNodeInfo() *Info {
2,622✔
917
        clone := s.leafNodeInfo
2,622✔
918
        // Copy the array of urls.
2,622✔
919
        if len(s.leafNodeInfo.LeafNodeURLs) > 0 {
4,754✔
920
                clone.LeafNodeURLs = append([]string(nil), s.leafNodeInfo.LeafNodeURLs...)
2,132✔
921
        }
2,132✔
922
        return &clone
2,622✔
923
}
924

925
// Adds a LeafNode URL that we get when a route connects to the Info structure.
926
// Regenerates the JSON byte array so that it can be sent to LeafNode connections.
927
// Returns a boolean indicating if the URL was added or not.
928
// Server lock is held on entry
929
func (s *Server) addLeafNodeURL(urlStr string) bool {
6,841✔
930
        if s.leafURLsMap.addUrl(urlStr) {
13,677✔
931
                s.generateLeafNodeInfoJSON()
6,836✔
932
                return true
6,836✔
933
        }
6,836✔
934
        return false
5✔
935
}
936

937
// Removes a LeafNode URL of the route that is disconnecting from the Info structure.
938
// Regenerates the JSON byte array so that it can be sent to LeafNode connections.
939
// Returns a boolean indicating if the URL was removed or not.
940
// Server lock is held on entry.
941
func (s *Server) removeLeafNodeURL(urlStr string) bool {
6,841✔
942
        // Don't need to do this if we are removing the route connection because
6,841✔
943
        // we are shuting down...
6,841✔
944
        if s.isShuttingDown() {
10,431✔
945
                return false
3,590✔
946
        }
3,590✔
947
        if s.leafURLsMap.removeUrl(urlStr) {
6,498✔
948
                s.generateLeafNodeInfoJSON()
3,247✔
949
                return true
3,247✔
950
        }
3,247✔
951
        return false
4✔
952
}
953

954
// Server lock is held on entry
955
func (s *Server) generateLeafNodeInfoJSON() {
13,519✔
956
        s.leafNodeInfo.Cluster = s.cachedClusterName()
13,519✔
957
        s.leafNodeInfo.LeafNodeURLs = s.leafURLsMap.getAsStringSlice()
13,519✔
958
        s.leafNodeInfo.WSConnectURLs = s.websocket.connectURLsMap.getAsStringSlice()
13,519✔
959
        s.leafNodeInfoJSON = generateInfoJSON(&s.leafNodeInfo)
13,519✔
960
}
13,519✔
961

962
// Sends an async INFO protocol so that the connected servers can update
963
// their list of LeafNode urls.
964
func (s *Server) sendAsyncLeafNodeInfo() {
10,083✔
965
        for _, c := range s.leafs {
10,181✔
966
                c.mu.Lock()
98✔
967
                c.enqueueProto(s.leafNodeInfoJSON)
98✔
968
                c.mu.Unlock()
98✔
969
        }
98✔
970
}
971

972
// Called when an inbound leafnode connection is accepted or we create one for a solicited leafnode.
973
func (s *Server) createLeafNode(conn net.Conn, rURL *url.URL, remote *leafNodeCfg, ws *websocket) *client {
1,609✔
974
        // Snapshot server options.
1,609✔
975
        opts := s.getOpts()
1,609✔
976

1,609✔
977
        maxPay := int32(opts.MaxPayload)
1,609✔
978
        maxSubs := int32(opts.MaxSubs)
1,609✔
979
        // For system, maxSubs of 0 means unlimited, so re-adjust here.
1,609✔
980
        if maxSubs == 0 {
3,217✔
981
                maxSubs = -1
1,608✔
982
        }
1,608✔
983
        now := time.Now().UTC()
1,609✔
984

1,609✔
985
        c := &client{srv: s, nc: conn, kind: LEAF, opts: defaultOpts, mpay: maxPay, msubs: maxSubs, start: now, last: now}
1,609✔
986
        // Do not update the smap here, we need to do it in initLeafNodeSmapAndSendSubs
1,609✔
987
        c.leaf = &leaf{}
1,609✔
988

1,609✔
989
        // For accepted LN connections, ws will be != nil if it was accepted
1,609✔
990
        // through the Websocket port.
1,609✔
991
        c.ws = ws
1,609✔
992

1,609✔
993
        // For remote, check if the scheme starts with "ws", if so, we will initiate
1,609✔
994
        // a remote Leaf Node connection as a websocket connection.
1,609✔
995
        if remote != nil && rURL != nil && isWSURL(rURL) {
1,652✔
996
                remote.RLock()
43✔
997
                c.ws = &websocket{compress: remote.Websocket.Compression, maskwrite: !remote.Websocket.NoMasking}
43✔
998
                remote.RUnlock()
43✔
999
        }
43✔
1000

1001
        // Determines if we are soliciting the connection or not.
1002
        var solicited bool
1,609✔
1003
        var acc *Account
1,609✔
1004
        var remoteSuffix string
1,609✔
1005
        if remote != nil {
2,379✔
1006
                // For now, if lookup fails, we will constantly try
770✔
1007
                // to recreate this LN connection.
770✔
1008
                lacc := remote.LocalAccount
770✔
1009
                var err error
770✔
1010
                acc, err = s.LookupAccount(lacc)
770✔
1011
                if err != nil {
772✔
1012
                        // An account not existing is something that can happen with nats/http account resolver and the account
2✔
1013
                        // has not yet been pushed, or the request failed for other reasons.
2✔
1014
                        // remote needs to be set or retry won't happen
2✔
1015
                        c.leaf.remote = remote
2✔
1016
                        c.closeConnection(MissingAccount)
2✔
1017
                        s.Errorf("Unable to lookup account %s for solicited leafnode connection: %v", lacc, err)
2✔
1018
                        return nil
2✔
1019
                }
2✔
1020
                remoteSuffix = fmt.Sprintf(" for account: %s", acc.traceLabel())
768✔
1021
        }
1022

1023
        c.mu.Lock()
1,607✔
1024
        c.initClient()
1,607✔
1025
        c.Noticef("Leafnode connection created%s %s", remoteSuffix, c.opts.Name)
1,607✔
1026

1,607✔
1027
        var (
1,607✔
1028
                tlsFirst         bool
1,607✔
1029
                tlsFirstFallback time.Duration
1,607✔
1030
                infoTimeout      time.Duration
1,607✔
1031
        )
1,607✔
1032
        if remote != nil {
2,375✔
1033
                solicited = true
768✔
1034
                remote.Lock()
768✔
1035
                c.leaf.remote = remote
768✔
1036
                c.setPermissions(remote.perms)
768✔
1037
                if !c.leaf.remote.Hub {
1,530✔
1038
                        c.leaf.isSpoke = true
762✔
1039
                }
762✔
1040
                tlsFirst = remote.TLSHandshakeFirst
768✔
1041
                infoTimeout = remote.FirstInfoTimeout
768✔
1042
                remote.Unlock()
768✔
1043
                c.acc = acc
768✔
1044
        } else {
839✔
1045
                c.flags.set(expectConnect)
839✔
1046
                if ws != nil {
866✔
1047
                        c.Debugf("Leafnode compression=%v", c.ws.compress)
27✔
1048
                }
27✔
1049
                tlsFirst = opts.LeafNode.TLSHandshakeFirst
839✔
1050
                if f := opts.LeafNode.TLSHandshakeFirstFallback; f > 0 {
840✔
1051
                        tlsFirstFallback = f
1✔
1052
                }
1✔
1053
        }
1054
        c.mu.Unlock()
1,607✔
1055

1,607✔
1056
        var nonce [nonceLen]byte
1,607✔
1057
        var info *Info
1,607✔
1058

1,607✔
1059
        // Grab this before the client lock below.
1,607✔
1060
        if !solicited {
2,446✔
1061
                // Grab server variables
839✔
1062
                s.mu.Lock()
839✔
1063
                info = s.copyLeafNodeInfo()
839✔
1064
                // For tests that want to simulate old servers, do not set the compression
839✔
1065
                // on the INFO protocol if configured with CompressionNotSupported.
839✔
1066
                if cm := opts.LeafNode.Compression.Mode; cm != CompressionNotSupported {
1,677✔
1067
                        info.Compression = cm
838✔
1068
                }
838✔
1069
                // We always send a nonce for LEAF connections. Do not change that without
1070
                // taking into account presence of proxy trusted keys.
1071
                s.generateNonce(nonce[:])
839✔
1072
                s.mu.Unlock()
839✔
1073
        }
1074

1075
        // Grab lock
1076
        c.mu.Lock()
1,607✔
1077

1,607✔
1078
        var preBuf []byte
1,607✔
1079
        if solicited {
2,375✔
1080
                // For websocket connection, we need to send an HTTP request,
768✔
1081
                // and get the response before starting the readLoop to get
768✔
1082
                // the INFO, etc..
768✔
1083
                if c.isWebsocket() {
811✔
1084
                        var err error
43✔
1085
                        var closeReason ClosedState
43✔
1086

43✔
1087
                        preBuf, closeReason, err = c.leafNodeSolicitWSConnection(opts, rURL, remote)
43✔
1088
                        if err != nil {
59✔
1089
                                c.Errorf("Error soliciting websocket connection: %v", err)
16✔
1090
                                c.mu.Unlock()
16✔
1091
                                if closeReason != 0 {
28✔
1092
                                        c.closeConnection(closeReason)
12✔
1093
                                }
12✔
1094
                                return nil
16✔
1095
                        }
1096
                } else {
725✔
1097
                        // If configured to do TLS handshake first
725✔
1098
                        if tlsFirst {
729✔
1099
                                if _, err := c.leafClientHandshakeIfNeeded(remote, opts); err != nil {
5✔
1100
                                        c.mu.Unlock()
1✔
1101
                                        return nil
1✔
1102
                                }
1✔
1103
                        }
1104
                        // We need to wait for the info, but not for too long.
1105
                        c.nc.SetReadDeadline(time.Now().Add(infoTimeout))
724✔
1106
                }
1107

1108
                // We will process the INFO from the readloop and finish by
1109
                // sending the CONNECT and finish registration later.
1110
        } else {
839✔
1111
                // Send our info to the other side.
839✔
1112
                // Remember the nonce we sent here for signatures, etc.
839✔
1113
                c.nonce = make([]byte, nonceLen)
839✔
1114
                copy(c.nonce, nonce[:])
839✔
1115
                info.Nonce = bytesToString(c.nonce)
839✔
1116
                info.CID = c.cid
839✔
1117
                proto := generateInfoJSON(info)
839✔
1118

839✔
1119
                var pre []byte
839✔
1120
                // We need first to check for "TLS First" fallback delay.
839✔
1121
                if tlsFirstFallback > 0 {
840✔
1122
                        // We wait and see if we are getting any data. Since we did not send
1✔
1123
                        // the INFO protocol yet, only clients that use TLS first should be
1✔
1124
                        // sending data (the TLS handshake). We don't really check the content:
1✔
1125
                        // if it is a rogue agent and not an actual client performing the
1✔
1126
                        // TLS handshake, the error will be detected when performing the
1✔
1127
                        // handshake on our side.
1✔
1128
                        pre = make([]byte, 4)
1✔
1129
                        c.nc.SetReadDeadline(time.Now().Add(tlsFirstFallback))
1✔
1130
                        n, _ := io.ReadFull(c.nc, pre[:])
1✔
1131
                        c.nc.SetReadDeadline(time.Time{})
1✔
1132
                        // If we get any data (regardless of possible timeout), we will proceed
1✔
1133
                        // with the TLS handshake.
1✔
1134
                        if n > 0 {
1✔
1135
                                pre = pre[:n]
×
1136
                        } else {
1✔
1137
                                // We did not get anything so we will send the INFO protocol.
1✔
1138
                                pre = nil
1✔
1139
                                // Set the boolean to false for the rest of the function.
1✔
1140
                                tlsFirst = false
1✔
1141
                        }
1✔
1142
                }
1143

1144
                if !tlsFirst {
1,673✔
1145
                        // We have to send from this go routine because we may
834✔
1146
                        // have to block for TLS handshake before we start our
834✔
1147
                        // writeLoop go routine. The other side needs to receive
834✔
1148
                        // this before it can initiate the TLS handshake..
834✔
1149
                        c.sendProtoNow(proto)
834✔
1150

834✔
1151
                        // The above call could have marked the connection as closed (due to TCP error).
834✔
1152
                        if c.isClosed() {
834✔
1153
                                c.mu.Unlock()
×
1154
                                c.closeConnection(WriteError)
×
1155
                                return nil
×
1156
                        }
×
1157
                }
1158

1159
                // Check to see if we need to spin up TLS.
1160
                if !c.isWebsocket() && info.TLSRequired {
908✔
1161
                        // If we have a prebuffer create a multi-reader.
69✔
1162
                        if len(pre) > 0 {
69✔
1163
                                c.nc = &tlsMixConn{c.nc, bytes.NewBuffer(pre)}
×
1164
                        }
×
1165
                        // Perform server-side TLS handshake.
1166
                        if err := c.doTLSServerHandshake(tlsHandshakeLeaf, opts.LeafNode.TLSConfig, opts.LeafNode.TLSTimeout, opts.LeafNode.TLSPinnedCerts); err != nil {
110✔
1167
                                c.mu.Unlock()
41✔
1168
                                return nil
41✔
1169
                        }
41✔
1170
                }
1171

1172
                // If the user wants the TLS handshake to occur first, now that it is
1173
                // done, send the INFO protocol.
1174
                if tlsFirst {
801✔
1175
                        c.flags.set(didTLSFirst)
3✔
1176
                        c.sendProtoNow(proto)
3✔
1177
                        if c.isClosed() {
3✔
1178
                                c.mu.Unlock()
×
1179
                                c.closeConnection(WriteError)
×
1180
                                return nil
×
1181
                        }
×
1182
                }
1183

1184
                // Leaf nodes will always require a CONNECT to let us know
1185
                // when we are properly bound to an account.
1186
                //
1187
                // If compression is configured, we can't set the authTimer here because
1188
                // it would cause the parser to fail any incoming protocol that is not a
1189
                // CONNECT (and we need to exchange INFO protocols for compression
1190
                // negotiation). So instead, use the ping timer until we are done with
1191
                // negotiation and can set the auth timer.
1192
                timeout := secondsToDuration(opts.LeafNode.AuthTimeout)
798✔
1193
                if needsCompression(opts.LeafNode.Compression.Mode) {
1,377✔
1194
                        c.ping.tmr = time.AfterFunc(timeout, func() {
587✔
1195
                                c.authTimeout()
8✔
1196
                        })
8✔
1197
                } else {
219✔
1198
                        c.setAuthTimer(timeout)
219✔
1199
                }
219✔
1200
        }
1201

1202
        // Keep track in case server is shutdown before we can successfully register.
1203
        if !s.addToTempClients(c.cid, c) {
1,550✔
1204
                c.mu.Unlock()
1✔
1205
                c.setNoReconnect()
1✔
1206
                c.closeConnection(ServerShutdown)
1✔
1207
                return nil
1✔
1208
        }
1✔
1209

1210
        // Spin up the read loop.
1211
        s.startGoRoutine(func() { c.readLoop(preBuf) })
3,096✔
1212

1213
        // We will spin the write loop for solicited connections only
1214
        // when processing the INFO and after switching to TLS if needed.
1215
        if !solicited {
2,346✔
1216
                s.startGoRoutine(func() { c.writeLoop() })
1,596✔
1217
        }
1218

1219
        c.mu.Unlock()
1,548✔
1220

1,548✔
1221
        return c
1,548✔
1222
}
1223

1224
// Will perform the client-side TLS handshake if needed. Assumes that this
1225
// is called by the solicit side (remote will be non nil). Returns `true`
1226
// if TLS is required, `false` otherwise.
1227
// Lock held on entry.
1228
func (c *client) leafClientHandshakeIfNeeded(remote *leafNodeCfg, opts *Options) (bool, error) {
1,861✔
1229
        // Check if TLS is required and gather TLS config variables.
1,861✔
1230
        tlsRequired, tlsConfig, tlsName, tlsTimeout := c.leafNodeGetTLSConfigForSolicit(remote)
1,861✔
1231
        if !tlsRequired {
3,646✔
1232
                return false, nil
1,785✔
1233
        }
1,785✔
1234

1235
        // If TLS required, peform handshake.
1236
        // Get the URL that was used to connect to the remote server.
1237
        rURL := remote.getCurrentURL()
76✔
1238

76✔
1239
        // Perform the client-side TLS handshake.
76✔
1240
        if resetTLSName, err := c.doTLSClientHandshake(tlsHandshakeLeaf, rURL, tlsConfig, tlsName, tlsTimeout, opts.LeafNode.TLSPinnedCerts); err != nil {
110✔
1241
                // Check if we need to reset the remote's TLS name.
34✔
1242
                if resetTLSName {
34✔
1243
                        remote.Lock()
×
1244
                        remote.tlsName = _EMPTY_
×
1245
                        remote.Unlock()
×
1246
                }
×
1247
                return false, err
34✔
1248
        }
1249
        return true, nil
42✔
1250
}
1251

1252
// processLeafnodeInfo handles an INFO protocol received on a leafnode
// connection. Depending on the connection side and whether this is the first
// INFO, it:
//   - performs the client-side TLS handshake (solicit side, when not done yet);
//   - negotiates compression (first INFO only), possibly returning early and
//     waiting for another INFO;
//   - validates and records the remote's identity (first INFO only);
//   - refreshes the list of remote leafnode URLs (solicit side);
//   - applies permission updates carried by the INFO;
//   - resumes and/or finishes the connect process.
//
// The client lock is acquired here. It is released before returning, and also
// temporarily around compression negotiation.
func (c *client) processLeafnodeInfo(info *Info) {
	c.mu.Lock()
	if c.leaf == nil || c.isClosed() {
		c.mu.Unlock()
		return
	}
	s := c.srv
	opts := s.getOpts()
	remote := c.leaf.remote
	// Only solicited (outbound) leafnode connections have a remote config.
	didSolicit := remote != nil
	firstINFO := !c.flags.isSet(infoReceived)

	// In case of websocket, the TLS handshake has been already done.
	// So check only for non websocket connections and for configurations
	// where the TLS Handshake was not done first.
	if didSolicit && !c.flags.isSet(handshakeComplete) && !c.isWebsocket() && !remote.TLSHandshakeFirst {
		// If the server requires TLS, we need to set this in the remote
		// otherwise if there is no TLS configuration block for the remote,
		// the solicit side will not attempt to perform the TLS handshake.
		if firstINFO && info.TLSRequired {
			remote.TLS = true
		}
		if _, err := c.leafClientHandshakeIfNeeded(remote, opts); err != nil {
			c.mu.Unlock()
			return
		}
	}

	// Check for compression, unless already done.
	if firstINFO && !c.flags.isSet(compressionNegotiated) {
		// Prevent from getting back here.
		c.flags.set(compressionNegotiated)

		// Accept side uses the server's leafnode compression options, solicit
		// side uses the per-remote ones.
		var co *CompressionOpts
		if !didSolicit {
			co = &opts.LeafNode.Compression
		} else {
			co = &remote.Compression
		}
		if needsCompression(co.Mode) {
			// Release client lock since following function will need server lock.
			c.mu.Unlock()
			compress, err := s.negotiateLeafCompression(c, didSolicit, info.Compression, co)
			if err != nil {
				c.sendErrAndErr(err.Error())
				c.closeConnection(ProtocolViolation)
				return
			}
			if compress {
				// Done for now, will get back another INFO protocol...
				return
			}
			// No compression because one side does not want/can't, so proceed.
			c.mu.Lock()
			// Check that the connection did not close if the lock was released.
			if c.isClosed() {
				c.mu.Unlock()
				return
			}
		} else {
			// Coming from an old server, the Compression field would be the empty
			// string. For servers that are configured with CompressionNotSupported,
			// this makes them behave as old servers.
			if info.Compression == _EMPTY_ || co.Mode == CompressionNotSupported {
				c.leaf.compression = CompressionNotSupported
			} else {
				c.leaf.compression = CompressionOff
			}
		}
		// Accepting side does not normally process an INFO protocol during
		// initial connection handshake. So we keep it consistent by returning
		// if we are not soliciting.
		if !didSolicit {
			// If we had created the ping timer instead of the auth timer, we will
			// clear the ping timer and set the auth timer now that the compression
			// negotiation is done.
			if info.Compression != _EMPTY_ && c.ping.tmr != nil {
				clearTimer(&c.ping.tmr)
				c.setAuthTimer(secondsToDuration(opts.LeafNode.AuthTimeout))
			}
			c.mu.Unlock()
			return
		}
		// Fall through and process the INFO protocol as usual.
	}

	// Note: For now, only the initial INFO has a nonce. We
	// will probably do auto key rotation at some point.
	if firstINFO {
		// Mark that the INFO protocol has been received.
		c.flags.set(infoReceived)
		// Prevent connecting to non leafnode port. Need to do this only for
		// the first INFO, not for async INFO updates...
		//
		// Content of INFO sent by the server when accepting a tcp connection.
		// -------------------------------------------------------------------
		// Listen Port Of | CID | ClientConnectURLs | LeafNodeURLs | Gateway |
		// -------------------------------------------------------------------
		//      CLIENT    |  X* |        X**        |              |         |
		//      ROUTE     |     |        X**        |      X***    |         |
		//     GATEWAY    |     |                   |              |    X    |
		//     LEAFNODE   |  X  |                   |       X      |         |
		// -------------------------------------------------------------------
		// *   Not on older servers.
		// **  Not if "no advertise" is enabled.
		// *** Not if leafnode's "no advertise" is enabled.
		//
		// As seen from above, a solicited LeafNode connection should receive
		// from the remote server an INFO with CID and LeafNodeURLs. Anything
		// else should be considered an attempt to connect to a wrong port.
		if didSolicit && (info.CID == 0 || info.LeafNodeURLs == nil) {
			c.mu.Unlock()
			// NOTE(review): the error text is passed as the format string;
			// `go vet` flags non-constant format strings, and a '%' in the
			// message would be misformatted. Consider c.Errorf("%s", ...).
			c.Errorf(ErrConnectedToWrongPort.Error())
			c.closeConnection(WrongPort)
			return
		}
		// Reject a cluster that contains spaces.
		if info.Cluster != _EMPTY_ && strings.Contains(info.Cluster, " ") {
			c.mu.Unlock()
			c.sendErrAndErr(ErrClusterNameHasSpaces.Error())
			c.closeConnection(ProtocolViolation)
			return
		}
		// Capture a nonce here.
		c.nonce = []byte(info.Nonce)
		if info.TLSRequired && didSolicit {
			remote.TLS = true
		}
		// Headers are enabled only if both this server and the remote support them.
		supportsHeaders := c.srv.supportsHeaders()
		c.headers = supportsHeaders && info.Headers

		// Remember the remote server.
		// Pre 2.2.0 servers are not sending their server name.
		// In that case, use info.ID, which, for those servers, matches
		// the content of the field `Name` in the leafnode CONNECT protocol.
		if info.Name == _EMPTY_ {
			c.leaf.remoteServer = info.ID
		} else {
			c.leaf.remoteServer = info.Name
		}
		c.leaf.remoteDomain = info.Domain
		c.leaf.remoteCluster = info.Cluster
		// We send the protocol version in the INFO protocol.
		// Keep track of it, so we know if this connection supports message
		// tracing for instance.
		c.opts.Protocol = info.Proto
	}

	// For both initial INFO and async INFO protocols, Possibly
	// update our list of remote leafnode URLs we can connect to.
	if didSolicit && (len(info.LeafNodeURLs) > 0 || len(info.WSConnectURLs) > 0) {
		// Consider the incoming array as the most up-to-date
		// representation of the remote cluster's list of URLs.
		c.updateLeafNodeURLs(info)
	}

	// Check to see if we have permissions updates here.
	// The remote's Export maps to our Publish permissions and its Import to
	// our Subscribe permissions.
	if info.Import != nil || info.Export != nil {
		perms := &Permissions{
			Publish:   info.Export,
			Subscribe: info.Import,
		}
		// Check if we have local deny clauses that we need to merge.
		if remote := c.leaf.remote; remote != nil {
			if len(remote.DenyExports) > 0 {
				if perms.Publish == nil {
					perms.Publish = &SubjectPermission{}
				}
				perms.Publish.Deny = append(perms.Publish.Deny, remote.DenyExports...)
			}
			if len(remote.DenyImports) > 0 {
				if perms.Subscribe == nil {
					perms.Subscribe = &SubjectPermission{}
				}
				perms.Subscribe.Deny = append(perms.Subscribe.Deny, remote.DenyImports...)
			}
		}
		c.setPermissions(perms)
	}

	var resumeConnect, checkSyncConsumers bool

	// If this is a remote connection and this is the first INFO protocol,
	// then we need to finish the connect process by sending CONNECT, etc..
	if firstINFO && didSolicit {
		// Clear deadline that was set in createLeafNode while waiting for the INFO.
		c.nc.SetDeadline(time.Time{})
		resumeConnect = true
	} else if !firstINFO && didSolicit {
		// Subsequent INFOs on the solicit side carry the remote account and
		// whether the remote has JetStream enabled.
		c.leaf.remoteAccName = info.RemoteAccount
		checkSyncConsumers = info.JetStream
	}

	// Check if we have the remote account information and if so make sure it's stored.
	if info.RemoteAccount != _EMPTY_ {
		s.leafRemoteAccounts.Store(c.acc.Name, info.RemoteAccount)
	}
	c.mu.Unlock()

	// The remaining steps run without the client lock held.
	finishConnect := info.ConnectInfo
	if resumeConnect && s != nil {
		s.leafNodeResumeConnectProcess(c)
		if !info.InfoOnConnect {
			finishConnect = true
		}
	}
	if finishConnect {
		s.leafNodeFinishConnectProcess(c)
	}

	// If we have JS enabled and so does the other side, we will
	// check to see if we need to kick any internal source or mirror consumers.
	if checkSyncConsumers {
		s.checkInternalSyncConsumers(c.acc, info.Domain)
	}
}
1468

1469
// negotiateLeafCompression picks the compression mode for leafnode connection
// c. It combines the local options co with the compression value
// (infoCompression) advertised in the peer's INFO, and records the result in
// c.leaf.compression.
//
// It returns (false, nil) when the selected mode does not need compression,
// and (false, err) when the modes cannot be reconciled. When compression is
// used, it sends an INFO carrying the chosen mode, installs the s2 writer,
// flags the read loop to switch to a decompressing reader, and returns
// (true, nil).
//
// The caller must NOT hold the client lock: it and the server lock are
// acquired here.
func (s *Server) negotiateLeafCompression(c *client, didSolicit bool, infoCompression string, co *CompressionOpts) (bool, error) {
	// Negotiate the appropriate compression mode (or no compression)
	cm, err := selectCompressionMode(co.Mode, infoCompression)
	if err != nil {
		return false, err
	}
	c.mu.Lock()
	// For "auto" mode, set the initial compression mode based on RTT
	if cm == CompressionS2Auto {
		// If no RTT measurement was started, derive it from the connection
		// start time.
		if c.rttStart.IsZero() {
			c.rtt = computeRTT(c.start)
		}
		cm = selectS2AutoModeBasedOnRTT(c.rtt, co.RTTThresholds)
	}
	// Keep track of the negotiated compression mode.
	c.leaf.compression = cm
	cid := c.cid
	// Only the accepting side echoes its nonce in the INFO it sends back.
	var nonce string
	if !didSolicit {
		nonce = bytesToString(c.nonce)
	}
	c.mu.Unlock()

	if !needsCompression(cm) {
		return false, nil
	}

	// If we end-up doing compression...

	// Generate an INFO with the chosen compression mode.
	s.mu.Lock()
	info := s.copyLeafNodeInfo()
	info.Compression, info.CID, info.Nonce = compressionModeForInfoProtocol(co, cm), cid, nonce
	infoProto := generateInfoJSON(info)
	s.mu.Unlock()

	// If we solicited, then send this INFO protocol BEFORE switching
	// to compression writer. However, if we did not, we send it after.
	c.mu.Lock()
	if didSolicit {
		c.enqueueProto(infoProto)
		// Make sure it is completely flushed (the pending bytes goes to
		// 0) before proceeding.
		for c.out.pb > 0 && !c.isClosed() {
			c.flushOutbound()
		}
	}
	// This is to notify the readLoop that it should switch to a
	// (de)compression reader.
	c.in.flags.set(switchToCompression)
	// Create the compress writer before queueing the INFO protocol for
	// a leafnode connection that did not solicit. It will make sure that
	// that proto is sent with compression on.
	c.out.cw = s2.NewWriter(nil, s2WriterOptions(cm)...)
	if !didSolicit {
		c.enqueueProto(infoProto)
	}
	c.mu.Unlock()
	return true, nil
}
1529

1530
// When getting a leaf node INFO protocol, use the provided
1531
// array of urls to update the list of possible endpoints.
1532
func (c *client) updateLeafNodeURLs(info *Info) {
1,286✔
1533
        cfg := c.leaf.remote
1,286✔
1534
        cfg.Lock()
1,286✔
1535
        defer cfg.Unlock()
1,286✔
1536

1,286✔
1537
        // We have ensured that if a remote has a WS scheme, then all are.
1,286✔
1538
        // So check if first is WS, then add WS URLs, otherwise, add non WS ones.
1,286✔
1539
        if len(cfg.URLs) > 0 && isWSURL(cfg.URLs[0]) {
1,340✔
1540
                // It does not really matter if we use "ws://" or "wss://" here since
54✔
1541
                // we will have already marked that the remote should use TLS anyway.
54✔
1542
                // But use proper scheme for log statements, etc...
54✔
1543
                proto := wsSchemePrefix
54✔
1544
                if cfg.TLS {
54✔
1545
                        proto = wsSchemePrefixTLS
×
1546
                }
×
1547
                c.doUpdateLNURLs(cfg, proto, info.WSConnectURLs)
54✔
1548
                return
54✔
1549
        }
1550
        c.doUpdateLNURLs(cfg, "nats-leaf", info.LeafNodeURLs)
1,232✔
1551
}
1552

1553
func (c *client) doUpdateLNURLs(cfg *leafNodeCfg, scheme string, URLs []string) {
1,286✔
1554
        cfg.urls = make([]*url.URL, 0, 1+len(URLs))
1,286✔
1555
        // Add the ones we receive in the protocol
1,286✔
1556
        for _, surl := range URLs {
3,605✔
1557
                url, err := url.Parse(fmt.Sprintf("%s://%s", scheme, surl))
2,319✔
1558
                if err != nil {
2,319✔
1559
                        // As per below, the URLs we receive should not have contained URL info, so this should be safe to log.
×
1560
                        c.Errorf("Error parsing url %q: %v", surl, err)
×
1561
                        continue
×
1562
                }
1563
                // Do not add if it's the same as what we already have configured.
1564
                var dup bool
2,319✔
1565
                for _, u := range cfg.URLs {
5,896✔
1566
                        // URLs that we receive never have user info, but the
3,577✔
1567
                        // ones that were configured may have. Simply compare
3,577✔
1568
                        // host and port to decide if they are equal or not.
3,577✔
1569
                        if url.Host == u.Host && url.Port() == u.Port() {
5,273✔
1570
                                dup = true
1,696✔
1571
                                break
1,696✔
1572
                        }
1573
                }
1574
                if !dup {
2,942✔
1575
                        cfg.urls = append(cfg.urls, url)
623✔
1576
                        cfg.saveTLSHostname(url)
623✔
1577
                }
623✔
1578
        }
1579
        // Add the configured one
1580
        cfg.urls = append(cfg.urls, cfg.URLs...)
1,286✔
1581
}
1582

1583
// Similar to setInfoHostPortAndGenerateJSON, but for leafNodeInfo.
1584
func (s *Server) setLeafNodeInfoHostPortAndIP() error {
3,436✔
1585
        opts := s.getOpts()
3,436✔
1586
        if opts.LeafNode.Advertise != _EMPTY_ {
3,447✔
1587
                advHost, advPort, err := parseHostPort(opts.LeafNode.Advertise, opts.LeafNode.Port)
11✔
1588
                if err != nil {
11✔
1589
                        return err
×
1590
                }
×
1591
                s.leafNodeInfo.Host = advHost
11✔
1592
                s.leafNodeInfo.Port = advPort
11✔
1593
        } else {
3,425✔
1594
                s.leafNodeInfo.Host = opts.LeafNode.Host
3,425✔
1595
                s.leafNodeInfo.Port = opts.LeafNode.Port
3,425✔
1596
                // If the host is "0.0.0.0" or "::" we need to resolve to a public IP.
3,425✔
1597
                // This will return at most 1 IP.
3,425✔
1598
                hostIsIPAny, ips, err := s.getNonLocalIPsIfHostIsIPAny(s.leafNodeInfo.Host, false)
3,425✔
1599
                if err != nil {
3,425✔
1600
                        return err
×
1601
                }
×
1602
                if hostIsIPAny {
3,718✔
1603
                        if len(ips) == 0 {
293✔
1604
                                s.Errorf("Could not find any non-local IP for leafnode's listen specification %q",
×
1605
                                        s.leafNodeInfo.Host)
×
1606
                        } else {
293✔
1607
                                // Take the first from the list...
293✔
1608
                                s.leafNodeInfo.Host = ips[0]
293✔
1609
                        }
293✔
1610
                }
1611
        }
1612
        // Use just host:port for the IP
1613
        s.leafNodeInfo.IP = net.JoinHostPort(s.leafNodeInfo.Host, strconv.Itoa(s.leafNodeInfo.Port))
3,436✔
1614
        if opts.LeafNode.Advertise != _EMPTY_ {
3,447✔
1615
                s.Noticef("Advertise address for leafnode is set to %s", s.leafNodeInfo.IP)
11✔
1616
        }
11✔
1617
        return nil
3,436✔
1618
}
1619

1620
// Add the connection to the map of leaf nodes.
// If `checkForDup` is true (invoked when a leafnode is accepted), then we check
// if a connection already exists for the same server name and account.
// That can happen when the remote is attempting to reconnect while the accepting
// side did not detect the connection as broken yet.
// But it can also happen when there is a misconfiguration and the remote is
// creating two (or more) connections that bind to the same account on the accept
// side.
// When a duplicate is found, the new connection is accepted and the old is closed
// (this solves the stale connection situation). An error is returned to help the
// remote detect the misconfiguration when the duplicate is the result of that
// misconfiguration.
//
// After registering the connection, JetStream-related permissions are applied.
// Depending on the local/remote JetStream domains and whether the bound account
// is the system account, this either denies JetStream API traffic on this
// connection or extends the JetStream domain (toggling the meta group's
// observer mode on the solicit side). It also adds cross-domain JetStream
// mappings to non-system accounts when a local domain is configured.
func (s *Server) addLeafNodeConnection(c *client, srvName, clusterName string, checkForDup bool) {
	var accName string
	// Snapshot the fields we need under the client lock.
	c.mu.Lock()
	cid := c.cid
	acc := c.acc
	if acc != nil {
		accName = acc.Name
	}
	myRemoteDomain := c.leaf.remoteDomain
	mySrvName := c.leaf.remoteServer
	remoteAccName := c.leaf.remoteAccName
	myClustName := c.leaf.remoteCluster
	solicited := c.leaf.remote != nil
	c.mu.Unlock()

	var old *client
	s.mu.Lock()
	// We check for empty because in some test we may send empty CONNECT{}
	if checkForDup && srvName != _EMPTY_ {
		for _, ol := range s.leafs {
			ol.mu.Lock()
			// We care here only about non solicited Leafnode. This function
			// is more about replacing stale connections than detecting loops.
			// We have code for the loop detection elsewhere, which also delays
			// attempt to reconnect.
			if !ol.isSolicitedLeafNode() && ol.leaf.remoteServer == srvName &&
				ol.leaf.remoteCluster == clusterName && ol.acc.Name == accName &&
				remoteAccName != _EMPTY_ && ol.leaf.remoteAccName == remoteAccName {
				old = ol
			}
			ol.mu.Unlock()
			if old != nil {
				break
			}
		}
	}
	// Store new connection in the map
	s.leafs[cid] = c
	s.mu.Unlock()
	s.removeFromTempClients(cid)

	// If applicable, evict the old one.
	if old != nil {
		old.sendErrAndErr(DuplicateRemoteLeafnodeConnection.String())
		old.closeConnection(DuplicateRemoteLeafnodeConnection)
		c.Warnf("Replacing connection from same server")
	}

	// Returns "server" or "server/cluster" for log statements.
	srvDecorated := func() string {
		if myClustName == _EMPTY_ {
			return mySrvName
		}
		return fmt.Sprintf("%s/%s", mySrvName, myClustName)
	}

	opts := s.getOpts()
	sysAcc := s.SystemAccount()
	js := s.getJetStream()
	var meta *raft
	if js != nil {
		if mg := js.getMetaGroup(); mg != nil {
			meta = mg.(*raft)
		}
	}
	// Set when JetStream is not extended over this connection. It causes the
	// local domain's API subject to be denied for outgoing messages below.
	blockMappingOutgoing := false
	// Deny (non domain) JetStream API traffic unless system account is shared
	// and domain names are identical and extending is not disabled

	// Check if backwards compatibility has been enabled and needs to be acted on
	forceSysAccDeny := false
	if len(opts.JsAccDefaultDomain) > 0 {
		if acc == sysAcc {
			for _, d := range opts.JsAccDefaultDomain {
				if d == _EMPTY_ {
					// Extending JetStream via leaf node is mutually exclusive with a domain mapping to the empty/default domain.
					// As soon as one mapping to "" is found, disable the ability to extend JS via a leaf node.
					c.Noticef("Not extending remote JetStream domain %q due to presence of empty default domain", myRemoteDomain)
					forceSysAccDeny = true
					break
				}
			}
		} else if domain, ok := opts.JsAccDefaultDomain[accName]; ok && domain == _EMPTY_ {
			// for backwards compatibility with old setups that do not have a domain name set
			c.Debugf("Skipping deny %q for account %q due to default domain", jsAllAPI, accName)
			return
		}
	}

	// If the server has JS disabled, it may still be part of a JetStream that could be extended.
	// This is either signaled by js being disabled and a domain set,
	// or in cases where no domain name exists, an extension hint is set.
	// However, this is only relevant in mixed setups.
	//
	// If the system account connects but default domains are present, JetStream can't be extended.
	if opts.JetStreamDomain != myRemoteDomain || (!opts.JetStream && (opts.JetStreamDomain == _EMPTY_ && opts.JetStreamExtHint != jsWillExtend)) ||
		sysAcc == nil || acc == nil || forceSysAccDeny {
		// If domain names mismatch always deny. This applies to system accounts as well as non system accounts.
		// Not having a system account, account or JetStream disabled is considered a mismatch as well.
		if acc != nil && acc == sysAcc {
			c.Noticef("System account connected from %s", srvDecorated())
			c.Noticef("JetStream not extended, domains differ")
			c.mergeDenyPermissionsLocked(both, denyAllJs)
			// When a remote with a system account is present in a server, unless otherwise disabled, the server will be
			// started in observer mode. Now that it is clear that this not used, turn the observer mode off.
			if solicited && meta != nil && meta.IsObserver() {
				meta.setObserver(false, extNotExtended)
				c.Debugf("Turning JetStream metadata controller Observer Mode off")
				// Take note that the domain was not extended to avoid this state from startup.
				writePeerState(js.config.StoreDir, meta.currentPeerState())
				// Meta controller can't be leader yet.
				// Yet it is possible that due to observer mode every server already stopped campaigning.
				// Therefore this server needs to be kicked into campaigning gear explicitly.
				meta.Campaign()
			}
		} else {
			c.Noticef("JetStream using domains: local %q, remote %q", opts.JetStreamDomain, myRemoteDomain)
			c.mergeDenyPermissionsLocked(both, denyAllClientJs)
		}
		blockMappingOutgoing = true
	} else if acc == sysAcc {
		// system account and same domain
		s.sys.client.Noticef("Extending JetStream domain %q as System Account connected from server %s",
			myRemoteDomain, srvDecorated())
		// In an extension use case, pin leadership to server remotes connect to.
		// Therefore, server with a remote that are not already in observer mode, need to be put into it.
		if solicited && meta != nil && !meta.IsObserver() {
			meta.setObserver(true, extExtended)
			c.Debugf("Turning JetStream metadata controller Observer Mode on - System Account Connected")
			// Take note that the domain was extended, persisting the peer state
			// to the store directory for the next startup.
			writePeerState(js.config.StoreDir, meta.currentPeerState())
			// If this server is the leader already, step down so a new leader can be elected (that is not an observer)
			meta.StepDown()
		}
	} else {
		// This deny is needed in all cases (system account shared or not)
		// If the system account is shared, jsAllAPI traffic will go through the system account.
		// So in order to prevent duplicate delivery (from system and actual account) suppress it on the account.
		// If the system account is NOT shared, jsAllAPI traffic has no business
		// crossing this leafnode connection.
		c.Debugf("Adding deny %+v for account %q", denyAllClientJs, accName)
		c.mergeDenyPermissionsLocked(both, denyAllClientJs)
	}
	// If we have a specified JetStream domain we will want to add a mapping to
	// allow access cross domain for each non-system account.
	if opts.JetStreamDomain != _EMPTY_ && opts.JetStream && acc != nil && acc != sysAcc {
		for src, dest := range generateJSMappingTable(opts.JetStreamDomain) {
			if err := acc.AddMapping(src, dest); err != nil {
				c.Debugf("Error adding JetStream domain mapping: %s", err.Error())
			} else {
				c.Debugf("Adding JetStream Domain Mapping %q -> %s to account %q", src, dest, accName)
			}
		}
		if blockMappingOutgoing {
			src := fmt.Sprintf(jsDomainAPI, opts.JetStreamDomain)
			// make sure that messages intended for this domain, do not leave the cluster via this leaf node connection
			// This is a guard against a miss-config with two identical domain names and will only cover some forms
			// of this issue, not all of them.
			// This guards against a hub and a spoke having the same domain name.
			// But not two spokes having the same one and the request coming from the hub.
			c.mergeDenyPermissionsLocked(pub, []string{src})
			c.Debugf("Adding deny %q for outgoing messages to account %q", src, accName)
		}
	}
}
1795

1796
func (s *Server) removeLeafNodeConnection(c *client) {
1,610✔
1797
        c.mu.Lock()
1,610✔
1798
        cid := c.cid
1,610✔
1799
        if c.leaf != nil {
3,220✔
1800
                if c.leaf.tsubt != nil {
2,769✔
1801
                        c.leaf.tsubt.Stop()
1,159✔
1802
                        c.leaf.tsubt = nil
1,159✔
1803
                }
1,159✔
1804
                if c.leaf.gwSub != nil {
2,227✔
1805
                        s.gwLeafSubs.Remove(c.leaf.gwSub)
617✔
1806
                        // We need to set this to nil for GC to release the connection
617✔
1807
                        c.leaf.gwSub = nil
617✔
1808
                }
617✔
1809
        }
1810
        proxyKey := c.proxyKey
1,610✔
1811
        c.mu.Unlock()
1,610✔
1812
        s.mu.Lock()
1,610✔
1813
        delete(s.leafs, cid)
1,610✔
1814
        if proxyKey != _EMPTY_ {
1,614✔
1815
                s.removeProxiedConn(proxyKey, cid)
4✔
1816
        }
4✔
1817
        s.mu.Unlock()
1,610✔
1818
        s.removeFromTempClients(cid)
1,610✔
1819
}
1820

1821
// leafConnectInfo is the CONNECT information sent by a soliciting leafnode and
// decoded by the accepting side in processLeafNodeConnect.
//
// Notable fields, as processLeafNodeConnect uses them:
//   - Cluster: rejected if it contains spaces or equals our own cluster name.
//   - Hub: when true, the accepting side takes on the spoke role.
//   - DenyPub: publish deny permissions merged into the connection's own.
//   - JetStream/Domain: when JetStream is set, internal source/mirror sync
//     consumers are checked for the given remote domain.
//
// Field order is kept stable because it determines the JSON encoding order.
type leafConnectInfo struct {
	Version   string   `json:"version,omitempty"`
	Nkey      string   `json:"nkey,omitempty"`
	JWT       string   `json:"jwt,omitempty"`
	Sig       string   `json:"sig,omitempty"`
	User      string   `json:"user,omitempty"`
	Pass      string   `json:"pass,omitempty"`
	Token     string   `json:"auth_token,omitempty"`
	ID        string   `json:"server_id,omitempty"`
	Domain    string   `json:"domain,omitempty"`
	Name      string   `json:"name,omitempty"`
	Hub       bool     `json:"is_hub,omitempty"`
	Cluster   string   `json:"cluster,omitempty"`
	Headers   bool     `json:"headers,omitempty"`
	JetStream bool     `json:"jetstream,omitempty"`
	DenyPub   []string `json:"deny_pub,omitempty"`

	// There was an existing field called:
	// >> Comp bool `json:"compression,omitempty"`
	// that has never been used. With support for compression, we now need
	// a field that is a string. So we use a different json tag:
	Compression string `json:"compress_mode,omitempty"`

	// Just used to detect wrong connection attempts: a non-empty value means
	// a gateway connected to the leafnode port.
	Gateway string `json:"gateway,omitempty"`

	// Tells the accept side which account the remote is binding to.
	RemoteAccount string `json:"remote_account,omitempty"`

	// The accept side of a LEAF connection, unlike ROUTER and GATEWAY, receives
	// only the CONNECT protocol, and no INFO. So we need to send the protocol
	// version as part of the CONNECT. It will indicate if a connection supports
	// some features, such as message tracing.
	// We use `protocol` as the JSON tag, so this is automatically unmarshal'ed
	// in the low level process CONNECT.
	Proto int `json:"protocol,omitempty"`
}
1859

1860
// processLeafNodeConnect will process the inbound connect args.
1861
// Once we are here we are bound to an account, so can send any interest that
1862
// we would have to the other side.
1863
func (c *client) processLeafNodeConnect(s *Server, arg []byte, lang string) error {
664✔
1864
        // Way to detect clients that incorrectly connect to the route listen
664✔
1865
        // port. Client provided "lang" in the CONNECT protocol while LEAFNODEs don't.
664✔
1866
        if lang != _EMPTY_ {
664✔
1867
                c.sendErrAndErr(ErrClientConnectedToLeafNodePort.Error())
×
1868
                c.closeConnection(WrongPort)
×
1869
                return ErrClientConnectedToLeafNodePort
×
1870
        }
×
1871

1872
        // Unmarshal as a leaf node connect protocol
1873
        proto := &leafConnectInfo{}
664✔
1874
        if err := json.Unmarshal(arg, proto); err != nil {
664✔
1875
                return err
×
1876
        }
×
1877

1878
        // Reject a cluster that contains spaces.
1879
        if proto.Cluster != _EMPTY_ && strings.Contains(proto.Cluster, " ") {
665✔
1880
                c.sendErrAndErr(ErrClusterNameHasSpaces.Error())
1✔
1881
                c.closeConnection(ProtocolViolation)
1✔
1882
                return ErrClusterNameHasSpaces
1✔
1883
        }
1✔
1884

1885
        // Check for cluster name collisions.
1886
        if cn := s.cachedClusterName(); cn != _EMPTY_ && proto.Cluster != _EMPTY_ && proto.Cluster == cn {
666✔
1887
                c.sendErrAndErr(ErrLeafNodeHasSameClusterName.Error())
3✔
1888
                c.closeConnection(ClusterNamesIdentical)
3✔
1889
                return ErrLeafNodeHasSameClusterName
3✔
1890
        }
3✔
1891

1892
        // Reject if this has Gateway which means that it would be from a gateway
1893
        // connection that incorrectly connects to the leafnode port.
1894
        if proto.Gateway != _EMPTY_ {
660✔
1895
                errTxt := fmt.Sprintf("Rejecting connection from gateway %q on the leafnode port", proto.Gateway)
×
1896
                c.Errorf(errTxt)
×
1897
                c.sendErr(errTxt)
×
1898
                c.closeConnection(WrongGateway)
×
1899
                return ErrWrongGateway
×
1900
        }
×
1901

1902
        if mv := s.getOpts().LeafNode.MinVersion; mv != _EMPTY_ {
662✔
1903
                major, minor, update, _ := versionComponents(mv)
2✔
1904
                if !versionAtLeast(proto.Version, major, minor, update) {
3✔
1905
                        // We are going to send back an INFO because otherwise recent
1✔
1906
                        // versions of the remote server would simply break the connection
1✔
1907
                        // after 2 seconds if not receiving it. Instead, we want the
1✔
1908
                        // other side to just "stall" until we finish waiting for the holding
1✔
1909
                        // period and close the connection below.
1✔
1910
                        s.sendPermsAndAccountInfo(c)
1✔
1911
                        c.sendErrAndErr(fmt.Sprintf("connection rejected since minimum version required is %q", mv))
1✔
1912
                        select {
1✔
1913
                        case <-c.srv.quitCh:
1✔
1914
                        case <-time.After(leafNodeWaitBeforeClose):
×
1915
                        }
1916
                        c.closeConnection(MinimumVersionRequired)
1✔
1917
                        return ErrMinimumVersionRequired
1✔
1918
                }
1919
        }
1920

1921
        // Check if this server supports headers.
1922
        supportHeaders := c.srv.supportsHeaders()
659✔
1923

659✔
1924
        c.mu.Lock()
659✔
1925
        // Leaf Nodes do not do echo or verbose or pedantic.
659✔
1926
        c.opts.Verbose = false
659✔
1927
        c.opts.Echo = false
659✔
1928
        c.opts.Pedantic = false
659✔
1929
        // This inbound connection will be marked as supporting headers if this server
659✔
1930
        // support headers and the remote has sent in the CONNECT protocol that it does
659✔
1931
        // support headers too.
659✔
1932
        c.headers = supportHeaders && proto.Headers
659✔
1933
        // If the compression level is still not set, set it based on what has been
659✔
1934
        // given to us in the CONNECT protocol.
659✔
1935
        if c.leaf.compression == _EMPTY_ {
794✔
1936
                // But if proto.Compression is _EMPTY_, set it to CompressionNotSupported
135✔
1937
                if proto.Compression == _EMPTY_ {
175✔
1938
                        c.leaf.compression = CompressionNotSupported
40✔
1939
                } else {
135✔
1940
                        c.leaf.compression = proto.Compression
95✔
1941
                }
95✔
1942
        }
1943

1944
        // Remember the remote server.
1945
        c.leaf.remoteServer = proto.Name
659✔
1946
        // Remember the remote account name
659✔
1947
        c.leaf.remoteAccName = proto.RemoteAccount
659✔
1948

659✔
1949
        // If the other side has declared itself a hub, so we will take on the spoke role.
659✔
1950
        if proto.Hub {
665✔
1951
                c.leaf.isSpoke = true
6✔
1952
        }
6✔
1953

1954
        // The soliciting side is part of a cluster.
1955
        if proto.Cluster != _EMPTY_ {
1,164✔
1956
                c.leaf.remoteCluster = proto.Cluster
505✔
1957
        }
505✔
1958

1959
        c.leaf.remoteDomain = proto.Domain
659✔
1960

659✔
1961
        // When a leaf solicits a connection to a hub, the perms that it will use on the soliciting leafnode's
659✔
1962
        // behalf are correct for them, but inside the hub need to be reversed since data is flowing in the opposite direction.
659✔
1963
        if !c.isSolicitedLeafNode() && c.perms != nil {
672✔
1964
                sp, pp := c.perms.sub, c.perms.pub
13✔
1965
                c.perms.sub, c.perms.pub = pp, sp
13✔
1966
                if c.opts.Import != nil {
25✔
1967
                        c.darray = c.opts.Import.Deny
12✔
1968
                } else {
13✔
1969
                        c.darray = nil
1✔
1970
                }
1✔
1971
        }
1972

1973
        // Set the Ping timer
1974
        c.setFirstPingTimer()
659✔
1975

659✔
1976
        // If we received pub deny permissions from the other end, merge with existing ones.
659✔
1977
        c.mergeDenyPermissions(pub, proto.DenyPub)
659✔
1978

659✔
1979
        acc := c.acc
659✔
1980
        c.mu.Unlock()
659✔
1981

659✔
1982
        // Register the cluster, even if empty, as long as we are acting as a hub.
659✔
1983
        if !proto.Hub {
1,312✔
1984
                acc.registerLeafNodeCluster(proto.Cluster)
653✔
1985
        }
653✔
1986

1987
        // Add in the leafnode here since we passed through auth at this point.
1988
        s.addLeafNodeConnection(c, proto.Name, proto.Cluster, true)
659✔
1989

659✔
1990
        // If we have permissions bound to this leafnode we need to send then back to the
659✔
1991
        // origin server for local enforcement.
659✔
1992
        s.sendPermsAndAccountInfo(c)
659✔
1993

659✔
1994
        // Create and initialize the smap since we know our bound account now.
659✔
1995
        // This will send all registered subs too.
659✔
1996
        s.initLeafNodeSmapAndSendSubs(c)
659✔
1997

659✔
1998
        // Announce the account connect event for a leaf node.
659✔
1999
        // This will be a no-op as needed.
659✔
2000
        s.sendLeafNodeConnect(c.acc)
659✔
2001

659✔
2002
        // If we have JS enabled and so does the other side, we will
659✔
2003
        // check to see if we need to kick any internal source or mirror consumers.
659✔
2004
        if proto.JetStream {
899✔
2005
                s.checkInternalSyncConsumers(acc, proto.Domain)
240✔
2006
        }
240✔
2007
        return nil
659✔
2008
}
2009

2010
// checkInternalSyncConsumers
2011
func (s *Server) checkInternalSyncConsumers(acc *Account, remoteDomain string) {
501✔
2012
        // Grab our js
501✔
2013
        js := s.getJetStream()
501✔
2014

501✔
2015
        // Only applicable if we have JS and the leafnode has JS as well.
501✔
2016
        // We check for remote JS outside.
501✔
2017
        if !js.isEnabled() || acc == nil {
625✔
2018
                return
124✔
2019
        }
124✔
2020

2021
        // We will check all streams in our local account. They must be a leader and
2022
        // be sourcing or mirroring. We will check the external config on the stream itself
2023
        // if this is cross domain, or if the remote domain is empty, meaning we might be
2024
        // extedning the system across this leafnode connection and hence we would be extending
2025
        // our own domain.
2026
        jsa := js.lookupAccount(acc)
377✔
2027
        if jsa == nil {
489✔
2028
                return
112✔
2029
        }
112✔
2030
        var streams []*stream
265✔
2031
        jsa.mu.RLock()
265✔
2032
        for _, mset := range jsa.streams {
287✔
2033
                mset.cfgMu.RLock()
22✔
2034
                // We need to have a mirror or source defined.
22✔
2035
                // We do not want to force another lock here to look for leader status,
22✔
2036
                // so collect and after we release jsa will make sure.
22✔
2037
                if mset.cfg.Mirror != nil || len(mset.cfg.Sources) > 0 {
26✔
2038
                        streams = append(streams, mset)
4✔
2039
                }
4✔
2040
                mset.cfgMu.RUnlock()
22✔
2041
        }
2042
        jsa.mu.RUnlock()
265✔
2043

265✔
2044
        // Now loop through all candidates and check if we are the leader and have NOT
265✔
2045
        // created the sync up consumer.
265✔
2046
        for _, mset := range streams {
269✔
2047
                mset.retryDisconnectedSyncConsumers(remoteDomain)
4✔
2048
        }
4✔
2049
}
2050

2051
// Returns the remote cluster name. This is set only once so does not require a lock.
2052
func (c *client) remoteCluster() string {
130,363✔
2053
        if c.leaf == nil {
130,363✔
2054
                return _EMPTY_
×
2055
        }
×
2056
        return c.leaf.remoteCluster
130,363✔
2057
}
2058

2059
// Sends back an info block to the soliciting leafnode to let it know about
2060
// its permission settings for local enforcement.
2061
func (s *Server) sendPermsAndAccountInfo(c *client) {
660✔
2062
        // Copy
660✔
2063
        s.mu.Lock()
660✔
2064
        info := s.copyLeafNodeInfo()
660✔
2065
        s.mu.Unlock()
660✔
2066
        c.mu.Lock()
660✔
2067
        info.CID = c.cid
660✔
2068
        info.Import = c.opts.Import
660✔
2069
        info.Export = c.opts.Export
660✔
2070
        info.RemoteAccount = c.acc.Name
660✔
2071
        // s.SystemAccount() uses an atomic operation and does not get the server lock, so this is safe.
660✔
2072
        info.IsSystemAccount = c.acc == s.SystemAccount()
660✔
2073
        info.ConnectInfo = true
660✔
2074
        c.enqueueProto(generateInfoJSON(info))
660✔
2075
        c.mu.Unlock()
660✔
2076
}
660✔
2077

2078
// initLeafNodeSmapAndSendSubs builds c.leaf.smap, the per-connection map of
// interest counts keyed by keyFromSub (or by raw subject), which is kept
// updated from then on. It then queues one protocol per entry (via
// writeLeafSub) so the remote side learns our interest.
//
// The map is filled with:
//   - the bound account's subscriptions: only local ones when c is a spoke,
//     all of them otherwise;
//   - the account's service import subjects and mapping sources;
//   - interest recorded by outbound gateways for this account;
//   - the gateway reply prefixes when gateways are enabled;
//   - the account's loop detection subject, created here if missing;
//   - the account's service import reply prefix as a wildcard, if any.
//
// Subscriptions that c is not permitted to subscribe to are skipped. For a
// spoke, the same applies to import subjects and mapping sources.
//
// Lock order: c.mu is held for the whole function, and acc.mu is taken while
// holding it.
func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
	acc := c.acc
	if acc == nil {
		c.Debugf("Leafnode does not have an account bound")
		return
	}
	// Collect all account subs here.
	_subs := [1024]*subscription{}
	subs := _subs[:0]
	ims := []string{}

	// Hold the client lock otherwise there can be a race and miss some subs.
	c.mu.Lock()
	defer c.mu.Unlock()

	acc.mu.RLock()
	accName := acc.Name
	accNTag := acc.nameTag

	// Prefix the friendly name with "/" only when present, so debug lines read
	// "<name>/<tag>" or just "<name>".
	if accNTag != _EMPTY_ {
		accNTag = "/" + accNTag
	}

	// If we are a spoke we only send interest for local clients.
	if c.isSpokeLeafNode() {
		acc.sl.localSubs(&subs, true)
	} else {
		acc.sl.All(&subs)
	}

	// Check if we have an existing service import reply.
	siReply := copyBytes(acc.siReply)

	// Since leaf nodes only send on interest, if the bound
	// account has import services we need to send those over.
	for isubj := range acc.imports.services {
		if c.isSpokeLeafNode() && !c.canSubscribe(isubj) {
			c.Debugf("Not permitted to import service %q on behalf of %s%s", isubj, accName, accNTag)
			continue
		}
		ims = append(ims, isubj)
	}
	// Likewise for mappings.
	for _, m := range acc.mappings {
		if c.isSpokeLeafNode() && !c.canSubscribe(m.src) {
			c.Debugf("Not permitted to import mapping %q on behalf of %s%s", m.src, accName, accNTag)
			continue
		}
		ims = append(ims, m.src)
	}

	// Grab the account's loop detection subject (created below if missing).
	lds := acc.lds
	acc.mu.RUnlock()

	// Check if we have to create the LDS: a unique subject used for loop detection.
	if lds == _EMPTY_ {
		lds = leafNodeLoopDetectionSubjectPrefix + nuid.Next()
		acc.mu.Lock()
		acc.lds = lds
		acc.mu.Unlock()
	}

	// Now check for gateway interest. Leafnodes will put this into
	// the proper mode to propagate, but they are not held in the account.
	gwsa := [16]*client{}
	gws := gwsa[:0]
	s.getOutboundGatewayConnections(&gws)
	for _, cgw := range gws {
		cgw.mu.Lock()
		gw := cgw.gw
		cgw.mu.Unlock()
		if gw != nil {
			if ei, _ := gw.outsim.Load(accName); ei != nil {
				if e := ei.(*outsie); e != nil && e.sl != nil {
					e.sl.All(&subs)
				}
			}
		}
	}

	applyGlobalRouting := s.gateway.enabled
	if c.isSpokeLeafNode() {
		// Add a fake subscription for this solicited leafnode connection
		// so that we can send back directly for mapped GW replies.
		// We need to keep track of this subscription so it can be removed
		// when the connection is closed so that the GC can release it.
		c.leaf.gwSub = &subscription{client: c, subject: []byte(gwReplyPrefix + ">")}
		c.srv.gwLeafSubs.Insert(c.leaf.gwSub)
	}

	// Now walk the results and add them to our smap
	rc := c.leaf.remoteCluster
	c.leaf.smap = make(map[string]int32)
	for _, sub := range subs {
		// Check perms regardless of role.
		if c.perms != nil && !c.canSubscribe(string(sub.subject)) {
			c.Debugf("Not permitted to subscribe to %q on behalf of %s%s", sub.subject, accName, accNTag)
			continue
		}
		// We ignore ourselves here.
		// Also don't add the subscription if it has an origin cluster and the
		// cluster name matches the one of the client we are sending to.
		if c != sub.client && (sub.origin == nil || (bytesToString(sub.origin) != rc)) {
			// Queue subscriptions contribute their weight, others count once.
			count := int32(1)
			if len(sub.queue) > 0 && sub.qw > 0 {
				count = sub.qw
			}
			c.leaf.smap[keyFromSub(sub)] += count
			// Remember which subs were counted here, so that updateSmap can
			// ignore an addition for the same sub instead of counting it twice.
			if c.leaf.tsub == nil {
				c.leaf.tsub = make(map[*subscription]struct{})
			}
			c.leaf.tsub[sub] = struct{}{}
		}
	}
	// FIXME(dlc) - We need to update appropriately on an account claims update.
	for _, isubj := range ims {
		c.leaf.smap[isubj]++
	}
	// If we have gateways enabled we need to make sure the other side sends us responses
	// that have been augmented from the original subscription.
	// TODO(dlc) - Should we lock this down more?
	if applyGlobalRouting {
		c.leaf.smap[oldGWReplyPrefix+"*.>"]++
		c.leaf.smap[gwReplyPrefix+">"]++
	}
	// Detect loops by subscribing to a specific subject and checking
	// if this sub is coming back to us.
	c.leaf.smap[lds]++

	// Check if we need to add an existing siReply to our map.
	// This will be a prefix so add on the wildcard.
	// siReply is a private copy, so appending to it cannot alias acc.siReply.
	if siReply != nil {
		wcsub := append(siReply, '>')
		c.leaf.smap[string(wcsub)]++
	}
	// Queue all protocols. There is no max pending limit for LN connection,
	// so we don't need chunking. The writes will happen from the writeLoop.
	var b bytes.Buffer
	for key, n := range c.leaf.smap {
		c.writeLeafSub(&b, key, n)
	}
	if b.Len() > 0 {
		c.enqueueProto(b.Bytes())
	}
	if c.leaf.tsub != nil {
		// Clear the tsub map after 5 seconds.
		c.leaf.tsubt = time.AfterFunc(5*time.Second, func() {
			c.mu.Lock()
			if c.leaf != nil {
				c.leaf.tsub = nil
				c.leaf.tsubt = nil
			}
			c.mu.Unlock()
		})
	}
}
2239

2240
// updateInterestForAccountOnGateway called from gateway code when processing RS+ and RS-.
2241
func (s *Server) updateInterestForAccountOnGateway(accName string, sub *subscription, delta int32) {
198,478✔
2242
        acc, err := s.LookupAccount(accName)
198,478✔
2243
        if acc == nil || err != nil {
198,637✔
2244
                s.Debugf("No or bad account for %q, failed to update interest from gateway", accName)
159✔
2245
                return
159✔
2246
        }
159✔
2247
        acc.updateLeafNodes(sub, delta)
198,319✔
2248
}
2249

2250
// updateLeafNodes propagates an interest change of delta for sub to the
// account's leafnode connections, calling updateSmap on each one that should
// see it.
//
// It returns early when:
//   - acc or sub is nil,
//   - the account has no leafnodes,
//   - the subscription's origin cluster is isolated (except for loop
//     detection subjects).
//
// For each connection in acc.lleafs it skips:
//   - the connection that owns sub,
//   - connections whose remote cluster equals a non-empty origin cluster.
//
// For a positive delta the connection must also be permitted to subscribe to
// the subject. Loop detection subjects are passed to any connection with a
// different remote cluster, without that permission check.
func (acc *Account) updateLeafNodes(sub *subscription, delta int32) {
	if acc == nil || sub == nil {
		return
	}

	// The cheap checks (no leafnodes, isolated origin cluster) are done inline
	// under the general account read lock. That lock is released before
	// walking the leafnodes, which happens under the leafnode list read lock
	// (acc.lmu) and each connection's own lock.

	acc.mu.RLock()
	// First check if we even have leafnodes here.
	if acc.nleafs == 0 {
		acc.mu.RUnlock()
		return
	}

	// Is this a loop detection subject.
	isLDS := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))

	// Capture the cluster even if its empty.
	var cluster string
	if sub.origin != nil {
		cluster = bytesToString(sub.origin)
	}

	// If we have an isolated cluster we can return early, as long as it is not a loop detection subject.
	// Empty clusters will return false for the check.
	if !isLDS && acc.isLeafNodeClusterIsolated(cluster) {
		acc.mu.RUnlock()
		return
	}

	// We can release the general account lock.
	acc.mu.RUnlock()

	// We can hold the list lock here to avoid having to copy a large slice.
	acc.lmu.RLock()
	defer acc.lmu.RUnlock()

	// Do this once.
	subject := string(sub.subject)

	// Walk the connected leafnodes.
	for _, ln := range acc.lleafs {
		if ln == sub.client {
			continue
		}
		// Check to make sure this sub does not have an origin cluster that matches the leafnode.
		ln.mu.Lock()
		// Even when a regular subscription would be skipped, still let the "$LDS."
		// subscription through (which allows the detection of loops) as long as
		// the cluster is different.
		clusterDifferent := cluster != ln.remoteCluster()
		if (isLDS && clusterDifferent) || ((cluster == _EMPTY_ || clusterDifferent) && (delta <= 0 || ln.canSubscribe(subject))) {
			ln.updateSmap(sub, delta, isLDS)
		}
		ln.mu.Unlock()
	}
}
2311

2312
// This will make an update to our internal smap and determine if we should send out
2313
// an interest update to the remote side.
2314
// Lock should be held.
2315
func (c *client) updateSmap(sub *subscription, delta int32, isLDS bool) {
23,544✔
2316
        if c.leaf.smap == nil {
23,549✔
2317
                return
5✔
2318
        }
5✔
2319

2320
        // If we are solicited make sure this is a local client or a non-solicited leaf node
2321
        skind := sub.client.kind
23,539✔
2322
        updateClient := skind == CLIENT || skind == SYSTEM || skind == JETSTREAM || skind == ACCOUNT
23,539✔
2323
        if !isLDS && c.isSpokeLeafNode() && !(updateClient || (skind == LEAF && !sub.client.isSpokeLeafNode())) {
31,754✔
2324
                return
8,215✔
2325
        }
8,215✔
2326

2327
        // For additions, check if that sub has just been processed during initLeafNodeSmapAndSendSubs
2328
        if delta > 0 && c.leaf.tsub != nil {
22,690✔
2329
                if _, present := c.leaf.tsub[sub]; present {
7,366✔
2330
                        delete(c.leaf.tsub, sub)
×
2331
                        if len(c.leaf.tsub) == 0 {
×
2332
                                c.leaf.tsub = nil
×
2333
                                c.leaf.tsubt.Stop()
×
2334
                                c.leaf.tsubt = nil
×
2335
                        }
×
2336
                        return
×
2337
                }
2338
        }
2339

2340
        key := keyFromSub(sub)
15,324✔
2341
        n, ok := c.leaf.smap[key]
15,324✔
2342
        if delta < 0 && !ok {
16,280✔
2343
                return
956✔
2344
        }
956✔
2345

2346
        // We will update if its a queue, if count is zero (or negative), or we were 0 and are N > 0.
2347
        update := sub.queue != nil || (n <= 0 && n+delta > 0) || (n > 0 && n+delta <= 0)
14,368✔
2348
        n += delta
14,368✔
2349
        if n > 0 {
25,159✔
2350
                c.leaf.smap[key] = n
10,791✔
2351
        } else {
14,368✔
2352
                delete(c.leaf.smap, key)
3,577✔
2353
        }
3,577✔
2354
        if update {
23,864✔
2355
                c.sendLeafNodeSubUpdate(key, n)
9,496✔
2356
        }
9,496✔
2357
}
2358

2359
// Used to force add subjects to the subject map.
2360
func (c *client) forceAddToSmap(subj string) {
4✔
2361
        c.mu.Lock()
4✔
2362
        defer c.mu.Unlock()
4✔
2363

4✔
2364
        if c.leaf.smap == nil {
4✔
2365
                return
×
2366
        }
×
2367
        n := c.leaf.smap[subj]
4✔
2368
        if n != 0 {
5✔
2369
                return
1✔
2370
        }
1✔
2371
        // Place into the map since it was not there.
2372
        c.leaf.smap[subj] = 1
3✔
2373
        c.sendLeafNodeSubUpdate(subj, 1)
3✔
2374
}
2375

2376
// Used to force remove a subject from the subject map.
2377
func (c *client) forceRemoveFromSmap(subj string) {
1✔
2378
        c.mu.Lock()
1✔
2379
        defer c.mu.Unlock()
1✔
2380

1✔
2381
        if c.leaf.smap == nil {
1✔
2382
                return
×
2383
        }
×
2384
        n := c.leaf.smap[subj]
1✔
2385
        if n == 0 {
1✔
2386
                return
×
2387
        }
×
2388
        n--
1✔
2389
        if n == 0 {
2✔
2390
                // Remove is now zero
1✔
2391
                delete(c.leaf.smap, subj)
1✔
2392
                c.sendLeafNodeSubUpdate(subj, 0)
1✔
2393
        } else {
1✔
2394
                c.leaf.smap[subj] = n
×
2395
        }
×
2396
}
2397

2398
// Send the subscription interest change to the other side.
2399
// Lock should be held.
2400
func (c *client) sendLeafNodeSubUpdate(key string, n int32) {
9,500✔
2401
        // If we are a spoke, we need to check if we are allowed to send this subscription over to the hub.
9,500✔
2402
        if c.isSpokeLeafNode() {
11,823✔
2403
                checkPerms := true
2,323✔
2404
                if len(key) > 0 && (key[0] == '$' || key[0] == '_') {
3,689✔
2405
                        if strings.HasPrefix(key, leafNodeLoopDetectionSubjectPrefix) ||
1,366✔
2406
                                strings.HasPrefix(key, oldGWReplyPrefix) ||
1,366✔
2407
                                strings.HasPrefix(key, gwReplyPrefix) {
1,450✔
2408
                                checkPerms = false
84✔
2409
                        }
84✔
2410
                }
2411
                if checkPerms {
4,562✔
2412
                        var subject string
2,239✔
2413
                        if sep := strings.IndexByte(key, ' '); sep != -1 {
2,731✔
2414
                                subject = key[:sep]
492✔
2415
                        } else {
2,239✔
2416
                                subject = key
1,747✔
2417
                        }
1,747✔
2418
                        if !c.canSubscribe(subject) {
2,239✔
2419
                                return
×
2420
                        }
×
2421
                }
2422
        }
2423
        // If we are here we can send over to the other side.
2424
        _b := [64]byte{}
9,500✔
2425
        b := bytes.NewBuffer(_b[:0])
9,500✔
2426
        c.writeLeafSub(b, key, n)
9,500✔
2427
        c.enqueueProto(b.Bytes())
9,500✔
2428
}
2429

2430
// Helper function to build the key.
2431
func keyFromSub(sub *subscription) string {
46,381✔
2432
        var sb strings.Builder
46,381✔
2433
        sb.Grow(len(sub.subject) + len(sub.queue) + 1)
46,381✔
2434
        sb.Write(sub.subject)
46,381✔
2435
        if sub.queue != nil {
50,190✔
2436
                // Just make the key subject spc group, e.g. 'foo bar'
3,809✔
2437
                sb.WriteByte(' ')
3,809✔
2438
                sb.Write(sub.queue)
3,809✔
2439
        }
3,809✔
2440
        return sb.String()
46,381✔
2441
}
2442

2443
// Leading markers used by keyFromSubWithOrigin to keep keys of plain routed
// subscriptions ("R ...") apart from keys of routed subscriptions made on
// behalf of a leafnode ("L ..."), in both string and byte form.
const (
	keyRoutedSub     = "R"
	keyRoutedLeafSub = "L"

	keyRoutedSubByte     = 'R'
	keyRoutedLeafSubByte = 'L'
)
2449

2450
// Helper function to build the key that prevents collisions between normal
2451
// routed subscriptions and routed subscriptions on behalf of a leafnode.
2452
// Keys will look like this:
2453
// "R foo"          -> plain routed sub on "foo"
2454
// "R foo bar"      -> queue routed sub on "foo", queue "bar"
2455
// "L foo bar"      -> plain routed leaf sub on "foo", leaf "bar"
2456
// "L foo bar baz"  -> queue routed sub on "foo", queue "bar", leaf "baz"
2457
func keyFromSubWithOrigin(sub *subscription) string {
673,766✔
2458
        var sb strings.Builder
673,766✔
2459
        sb.Grow(2 + len(sub.origin) + 1 + len(sub.subject) + 1 + len(sub.queue))
673,766✔
2460
        leaf := len(sub.origin) > 0
673,766✔
2461
        if leaf {
690,523✔
2462
                sb.WriteByte(keyRoutedLeafSubByte)
16,757✔
2463
        } else {
673,766✔
2464
                sb.WriteByte(keyRoutedSubByte)
657,009✔
2465
        }
657,009✔
2466
        sb.WriteByte(' ')
673,766✔
2467
        sb.Write(sub.subject)
673,766✔
2468
        if sub.queue != nil {
696,572✔
2469
                sb.WriteByte(' ')
22,806✔
2470
                sb.Write(sub.queue)
22,806✔
2471
        }
22,806✔
2472
        if leaf {
690,523✔
2473
                sb.WriteByte(' ')
16,757✔
2474
                sb.Write(sub.origin)
16,757✔
2475
        }
16,757✔
2476
        return sb.String()
673,766✔
2477
}
2478

2479
// Lock should be held.
2480
func (c *client) writeLeafSub(w *bytes.Buffer, key string, n int32) {
35,721✔
2481
        if key == _EMPTY_ {
35,721✔
2482
                return
×
2483
        }
×
2484
        if n > 0 {
67,864✔
2485
                w.WriteString("LS+ " + key)
32,143✔
2486
                // Check for queue semantics, if found write n.
32,143✔
2487
                if strings.Contains(key, " ") {
34,458✔
2488
                        w.WriteString(" ")
2,315✔
2489
                        var b [12]byte
2,315✔
2490
                        var i = len(b)
2,315✔
2491
                        for l := n; l > 0; l /= 10 {
5,545✔
2492
                                i--
3,230✔
2493
                                b[i] = digits[l%10]
3,230✔
2494
                        }
3,230✔
2495
                        w.Write(b[i:])
2,315✔
2496
                        if c.trace {
2,315✔
2497
                                arg := fmt.Sprintf("%s %d", key, n)
×
2498
                                c.traceOutOp("LS+", []byte(arg))
×
2499
                        }
×
2500
                } else if c.trace {
30,024✔
2501
                        c.traceOutOp("LS+", []byte(key))
196✔
2502
                }
196✔
2503
        } else {
3,578✔
2504
                w.WriteString("LS- " + key)
3,578✔
2505
                if c.trace {
3,594✔
2506
                        c.traceOutOp("LS-", []byte(key))
16✔
2507
                }
16✔
2508
        }
2509
        w.WriteString(CR_LF)
35,721✔
2510
}
2511

2512
// processLeafSub will process an inbound sub request for the remote leaf node.
2513
func (c *client) processLeafSub(argo []byte) (err error) {
31,790✔
2514
        // Indicate activity.
31,790✔
2515
        c.in.subs++
31,790✔
2516

31,790✔
2517
        srv := c.srv
31,790✔
2518
        if srv == nil {
31,790✔
2519
                return nil
×
2520
        }
×
2521

2522
        // Copy so we do not reference a potentially large buffer
2523
        arg := make([]byte, len(argo))
31,790✔
2524
        copy(arg, argo)
31,790✔
2525

31,790✔
2526
        args := splitArg(arg)
31,790✔
2527
        sub := &subscription{client: c}
31,790✔
2528

31,790✔
2529
        delta := int32(1)
31,790✔
2530
        switch len(args) {
31,790✔
2531
        case 1:
29,569✔
2532
                sub.queue = nil
29,569✔
2533
        case 3:
2,220✔
2534
                sub.queue = args[1]
2,220✔
2535
                sub.qw = int32(parseSize(args[2]))
2,220✔
2536
                // TODO: (ik) We should have a non empty queue name and a queue
2,220✔
2537
                // weight >= 1. For 2.11, we may want to return an error if that
2,220✔
2538
                // is not the case, but for now just overwrite `delta` if queue
2,220✔
2539
                // weight is greater than 1 (it is possible after a reconnect/
2,220✔
2540
                // server restart to receive a queue weight > 1 for a new sub).
2,220✔
2541
                if sub.qw > 1 {
3,843✔
2542
                        delta = sub.qw
1,623✔
2543
                }
1,623✔
2544
        default:
1✔
2545
                return fmt.Errorf("processLeafSub Parse Error: '%s'", arg)
1✔
2546
        }
2547
        sub.subject = args[0]
31,789✔
2548

31,789✔
2549
        c.mu.Lock()
31,789✔
2550
        if c.isClosed() {
31,808✔
2551
                c.mu.Unlock()
19✔
2552
                return nil
19✔
2553
        }
19✔
2554

2555
        acc := c.acc
31,770✔
2556
        // Check if we have a loop.
31,770✔
2557
        ldsPrefix := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))
31,770✔
2558

31,770✔
2559
        if ldsPrefix && bytesToString(sub.subject) == acc.getLDSubject() {
31,775✔
2560
                c.mu.Unlock()
5✔
2561
                c.handleLeafNodeLoop(true)
5✔
2562
                return nil
5✔
2563
        }
5✔
2564

2565
        // Check permissions if applicable. (but exclude the $LDS, $GR and _GR_)
2566
        checkPerms := true
31,765✔
2567
        if sub.subject[0] == '$' || sub.subject[0] == '_' {
60,673✔
2568
                if ldsPrefix ||
28,908✔
2569
                        bytes.HasPrefix(sub.subject, []byte(oldGWReplyPrefix)) ||
28,908✔
2570
                        bytes.HasPrefix(sub.subject, []byte(gwReplyPrefix)) {
30,858✔
2571
                        checkPerms = false
1,950✔
2572
                }
1,950✔
2573
        }
2574

2575
        // If we are a hub check that we can publish to this subject.
2576
        if checkPerms {
61,580✔
2577
                subj := string(sub.subject)
29,815✔
2578
                if subjectIsLiteral(subj) && !c.pubAllowedFullCheck(subj, true, true) {
30,119✔
2579
                        c.mu.Unlock()
304✔
2580
                        c.leafSubPermViolation(sub.subject)
304✔
2581
                        c.Debugf(fmt.Sprintf("Permissions Violation for Subscription to %q", sub.subject))
304✔
2582
                        return nil
304✔
2583
                }
304✔
2584
        }
2585

2586
        // Check if we have a maximum on the number of subscriptions.
2587
        if c.subsAtLimit() {
31,469✔
2588
                c.mu.Unlock()
8✔
2589
                c.maxSubsExceeded()
8✔
2590
                return nil
8✔
2591
        }
8✔
2592

2593
        // If we have an origin cluster associated mark that in the sub.
2594
        if rc := c.remoteCluster(); rc != _EMPTY_ {
59,817✔
2595
                sub.origin = []byte(rc)
28,364✔
2596
        }
28,364✔
2597

2598
        // Like Routes, we store local subs by account and subject and optionally queue name.
2599
        // If we have a queue it will have a trailing weight which we do not want.
2600
        if sub.queue != nil {
33,405✔
2601
                sub.sid = arg[:len(arg)-len(args[2])-1]
1,952✔
2602
        } else {
31,453✔
2603
                sub.sid = arg
29,501✔
2604
        }
29,501✔
2605
        key := bytesToString(sub.sid)
31,453✔
2606
        osub := c.subs[key]
31,453✔
2607
        if osub == nil {
61,408✔
2608
                c.subs[key] = sub
29,955✔
2609
                // Now place into the account sl.
29,955✔
2610
                if err := acc.sl.Insert(sub); err != nil {
29,955✔
2611
                        delete(c.subs, key)
×
2612
                        c.mu.Unlock()
×
2613
                        c.Errorf("Could not insert subscription: %v", err)
×
2614
                        c.sendErr("Invalid Subscription")
×
2615
                        return nil
×
2616
                }
×
2617
        } else if sub.queue != nil {
2,995✔
2618
                // For a queue we need to update the weight.
1,497✔
2619
                delta = sub.qw - atomic.LoadInt32(&osub.qw)
1,497✔
2620
                atomic.StoreInt32(&osub.qw, sub.qw)
1,497✔
2621
                acc.sl.UpdateRemoteQSub(osub)
1,497✔
2622
        }
1,497✔
2623
        spoke := c.isSpokeLeafNode()
31,453✔
2624
        c.mu.Unlock()
31,453✔
2625

31,453✔
2626
        // Only add in shadow subs if a new sub or qsub.
31,453✔
2627
        if osub == nil {
61,408✔
2628
                if err := c.addShadowSubscriptions(acc, sub, true); err != nil {
29,955✔
2629
                        c.Errorf(err.Error())
×
2630
                }
×
2631
        }
2632

2633
        // If we are not solicited, treat leaf node subscriptions similar to a
2634
        // client subscription, meaning we forward them to routes, gateways and
2635
        // other leaf nodes as needed.
2636
        if !spoke {
42,484✔
2637
                // If we are routing add to the route map for the associated account.
11,031✔
2638
                srv.updateRouteSubscriptionMap(acc, sub, delta)
11,031✔
2639
                if srv.gateway.enabled {
12,556✔
2640
                        srv.gatewayUpdateSubInterest(acc.Name, sub, delta)
1,525✔
2641
                }
1,525✔
2642
        }
2643
        // Now check on leafnode updates for other leaf nodes. We understand solicited
2644
        // and non-solicited state in this call so we will do the right thing.
2645
        acc.updateLeafNodes(sub, delta)
31,453✔
2646

31,453✔
2647
        return nil
31,453✔
2648
}
2649

2650
// If the leafnode is a solicited, set the connect delay based on default
2651
// or private option (for tests). Sends the error to the other side, log and
2652
// close the connection.
2653
func (c *client) handleLeafNodeLoop(sendErr bool) {
13✔
2654
        accName, delay := c.setLeafConnectDelayIfSoliciting(leafNodeReconnectDelayAfterLoopDetected)
13✔
2655
        errTxt := fmt.Sprintf("Loop detected for leafnode account=%q. Delaying attempt to reconnect for %v", accName, delay)
13✔
2656
        if sendErr {
20✔
2657
                c.sendErr(errTxt)
7✔
2658
        }
7✔
2659

2660
        c.Errorf(errTxt)
13✔
2661
        // If we are here with "sendErr" false, it means that this is the server
13✔
2662
        // that received the error. The other side will have closed the connection,
13✔
2663
        // but does not hurt to close here too.
13✔
2664
        c.closeConnection(ProtocolViolation)
13✔
2665
}
2666

2667
// processLeafUnsub will process an inbound unsub request for the remote leaf node.
2668
func (c *client) processLeafUnsub(arg []byte) error {
3,290✔
2669
        // Indicate any activity, so pub and sub or unsubs.
3,290✔
2670
        c.in.subs++
3,290✔
2671

3,290✔
2672
        acc := c.acc
3,290✔
2673
        srv := c.srv
3,290✔
2674

3,290✔
2675
        c.mu.Lock()
3,290✔
2676
        if c.isClosed() {
3,323✔
2677
                c.mu.Unlock()
33✔
2678
                return nil
33✔
2679
        }
33✔
2680

2681
        spoke := c.isSpokeLeafNode()
3,257✔
2682
        // We store local subs by account and subject and optionally queue name.
3,257✔
2683
        // LS- will have the arg exactly as the key.
3,257✔
2684
        sub, ok := c.subs[string(arg)]
3,257✔
2685
        if !ok {
3,263✔
2686
                // If not found, don't try to update routes/gws/leaf nodes.
6✔
2687
                c.mu.Unlock()
6✔
2688
                return nil
6✔
2689
        }
6✔
2690
        delta := int32(1)
3,251✔
2691
        if len(sub.queue) > 0 {
3,666✔
2692
                delta = sub.qw
415✔
2693
        }
415✔
2694
        c.mu.Unlock()
3,251✔
2695

3,251✔
2696
        c.unsubscribe(acc, sub, true, true)
3,251✔
2697
        if !spoke {
4,286✔
2698
                // If we are routing subtract from the route map for the associated account.
1,035✔
2699
                srv.updateRouteSubscriptionMap(acc, sub, -delta)
1,035✔
2700
                // Gateways
1,035✔
2701
                if srv.gateway.enabled {
1,329✔
2702
                        srv.gatewayUpdateSubInterest(acc.Name, sub, -delta)
294✔
2703
                }
294✔
2704
        }
2705
        // Now check on leafnode updates for other leaf nodes.
2706
        acc.updateLeafNodes(sub, -delta)
3,251✔
2707
        return nil
3,251✔
2708
}
2709

2710
func (c *client) processLeafHeaderMsgArgs(arg []byte) error {
478✔
2711
        // Unroll splitArgs to avoid runtime/heap issues
478✔
2712
        a := [MAX_MSG_ARGS][]byte{}
478✔
2713
        args := a[:0]
478✔
2714
        start := -1
478✔
2715
        for i, b := range arg {
31,238✔
2716
                switch b {
30,760✔
2717
                case ' ', '\t', '\r', '\n':
1,362✔
2718
                        if start >= 0 {
2,724✔
2719
                                args = append(args, arg[start:i])
1,362✔
2720
                                start = -1
1,362✔
2721
                        }
1,362✔
2722
                default:
29,398✔
2723
                        if start < 0 {
31,238✔
2724
                                start = i
1,840✔
2725
                        }
1,840✔
2726
                }
2727
        }
2728
        if start >= 0 {
956✔
2729
                args = append(args, arg[start:])
478✔
2730
        }
478✔
2731

2732
        c.pa.arg = arg
478✔
2733
        switch len(args) {
478✔
2734
        case 0, 1, 2:
×
2735
                return fmt.Errorf("processLeafHeaderMsgArgs Parse Error: '%s'", args)
×
2736
        case 3:
90✔
2737
                c.pa.reply = nil
90✔
2738
                c.pa.queues = nil
90✔
2739
                c.pa.hdb = args[1]
90✔
2740
                c.pa.hdr = parseSize(args[1])
90✔
2741
                c.pa.szb = args[2]
90✔
2742
                c.pa.size = parseSize(args[2])
90✔
2743
        case 4:
374✔
2744
                c.pa.reply = args[1]
374✔
2745
                c.pa.queues = nil
374✔
2746
                c.pa.hdb = args[2]
374✔
2747
                c.pa.hdr = parseSize(args[2])
374✔
2748
                c.pa.szb = args[3]
374✔
2749
                c.pa.size = parseSize(args[3])
374✔
2750
        default:
14✔
2751
                // args[1] is our reply indicator. Should be + or | normally.
14✔
2752
                if len(args[1]) != 1 {
14✔
2753
                        return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
2754
                }
×
2755
                switch args[1][0] {
14✔
2756
                case '+':
4✔
2757
                        c.pa.reply = args[2]
4✔
2758
                case '|':
10✔
2759
                        c.pa.reply = nil
10✔
2760
                default:
×
2761
                        return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
2762
                }
2763
                // Grab header size.
2764
                c.pa.hdb = args[len(args)-2]
14✔
2765
                c.pa.hdr = parseSize(c.pa.hdb)
14✔
2766

14✔
2767
                // Grab size.
14✔
2768
                c.pa.szb = args[len(args)-1]
14✔
2769
                c.pa.size = parseSize(c.pa.szb)
14✔
2770

14✔
2771
                // Grab queue names.
14✔
2772
                if c.pa.reply != nil {
18✔
2773
                        c.pa.queues = args[3 : len(args)-2]
4✔
2774
                } else {
14✔
2775
                        c.pa.queues = args[2 : len(args)-2]
10✔
2776
                }
10✔
2777
        }
2778
        if c.pa.hdr < 0 {
478✔
2779
                return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Header Size: '%s'", arg)
×
2780
        }
×
2781
        if c.pa.size < 0 {
478✔
2782
                return fmt.Errorf("processLeafHeaderMsgArgs Bad or Missing Size: '%s'", args)
×
2783
        }
×
2784
        if c.pa.hdr > c.pa.size {
479✔
2785
                return fmt.Errorf("processLeafHeaderMsgArgs Header Size larger then TotalSize: '%s'", arg)
1✔
2786
        }
1✔
2787

2788
        // Common ones processed after check for arg length
2789
        c.pa.subject = args[0]
477✔
2790

477✔
2791
        return nil
477✔
2792
}
2793

2794
func (c *client) processLeafMsgArgs(arg []byte) error {
45,597✔
2795
        // Unroll splitArgs to avoid runtime/heap issues
45,597✔
2796
        a := [MAX_MSG_ARGS][]byte{}
45,597✔
2797
        args := a[:0]
45,597✔
2798
        start := -1
45,597✔
2799
        for i, b := range arg {
1,471,974✔
2800
                switch b {
1,426,377✔
2801
                case ' ', '\t', '\r', '\n':
97,069✔
2802
                        if start >= 0 {
194,138✔
2803
                                args = append(args, arg[start:i])
97,069✔
2804
                                start = -1
97,069✔
2805
                        }
97,069✔
2806
                default:
1,329,308✔
2807
                        if start < 0 {
1,471,974✔
2808
                                start = i
142,666✔
2809
                        }
142,666✔
2810
                }
2811
        }
2812
        if start >= 0 {
91,194✔
2813
                args = append(args, arg[start:])
45,597✔
2814
        }
45,597✔
2815

2816
        c.pa.arg = arg
45,597✔
2817
        switch len(args) {
45,597✔
2818
        case 0, 1:
×
2819
                return fmt.Errorf("processLeafMsgArgs Parse Error: '%s'", args)
×
2820
        case 2:
16,843✔
2821
                c.pa.reply = nil
16,843✔
2822
                c.pa.queues = nil
16,843✔
2823
                c.pa.szb = args[1]
16,843✔
2824
                c.pa.size = parseSize(args[1])
16,843✔
2825
        case 3:
6,198✔
2826
                c.pa.reply = args[1]
6,198✔
2827
                c.pa.queues = nil
6,198✔
2828
                c.pa.szb = args[2]
6,198✔
2829
                c.pa.size = parseSize(args[2])
6,198✔
2830
        default:
22,556✔
2831
                // args[1] is our reply indicator. Should be + or | normally.
22,556✔
2832
                if len(args[1]) != 1 {
22,557✔
2833
                        return fmt.Errorf("processLeafMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
1✔
2834
                }
1✔
2835
                switch args[1][0] {
22,555✔
2836
                case '+':
161✔
2837
                        c.pa.reply = args[2]
161✔
2838
                case '|':
22,394✔
2839
                        c.pa.reply = nil
22,394✔
2840
                default:
×
2841
                        return fmt.Errorf("processLeafMsgArgs Bad or Missing Reply Indicator: '%s'", args[1])
×
2842
                }
2843
                // Grab size.
2844
                c.pa.szb = args[len(args)-1]
22,555✔
2845
                c.pa.size = parseSize(c.pa.szb)
22,555✔
2846

22,555✔
2847
                // Grab queue names.
22,555✔
2848
                if c.pa.reply != nil {
22,716✔
2849
                        c.pa.queues = args[3 : len(args)-1]
161✔
2850
                } else {
22,555✔
2851
                        c.pa.queues = args[2 : len(args)-1]
22,394✔
2852
                }
22,394✔
2853
        }
2854
        if c.pa.size < 0 {
45,596✔
2855
                return fmt.Errorf("processLeafMsgArgs Bad or Missing Size: '%s'", args)
×
2856
        }
×
2857

2858
        // Common ones processed after check for arg length
2859
        c.pa.subject = args[0]
45,596✔
2860

45,596✔
2861
        return nil
45,596✔
2862
}
2863

2864
// processInboundLeafMsg is called to process an inbound msg from a leaf node.
2865
func (c *client) processInboundLeafMsg(msg []byte) {
44,738✔
2866
        // Update statistics
44,738✔
2867
        // The msg includes the CR_LF, so pull back out for accounting.
44,738✔
2868
        c.in.msgs++
44,738✔
2869
        c.in.bytes += int32(len(msg) - LEN_CR_LF)
44,738✔
2870

44,738✔
2871
        srv, acc, subject := c.srv, c.acc, string(c.pa.subject)
44,738✔
2872

44,738✔
2873
        // Mostly under testing scenarios.
44,738✔
2874
        if srv == nil || acc == nil {
44,740✔
2875
                return
2✔
2876
        }
2✔
2877

2878
        // Match the subscriptions. We will use our own L1 map if
2879
        // it's still valid, avoiding contention on the shared sublist.
2880
        var r *SublistResult
44,736✔
2881
        var ok bool
44,736✔
2882

44,736✔
2883
        genid := atomic.LoadUint64(&c.acc.sl.genid)
44,736✔
2884
        if genid == c.in.genid && c.in.results != nil {
87,130✔
2885
                r, ok = c.in.results[subject]
42,394✔
2886
        } else {
44,736✔
2887
                // Reset our L1 completely.
2,342✔
2888
                c.in.results = make(map[string]*SublistResult)
2,342✔
2889
                c.in.genid = genid
2,342✔
2890
        }
2,342✔
2891

2892
        // Go back to the sublist data structure.
2893
        if !ok {
59,376✔
2894
                r = c.acc.sl.Match(subject)
14,640✔
2895
                // Prune the results cache. Keeps us from unbounded growth. Random delete.
14,640✔
2896
                if len(c.in.results) >= maxResultCacheSize {
14,911✔
2897
                        n := 0
271✔
2898
                        for subj := range c.in.results {
9,214✔
2899
                                delete(c.in.results, subj)
8,943✔
2900
                                if n++; n > pruneSize {
9,214✔
2901
                                        break
271✔
2902
                                }
2903
                        }
2904
                }
2905
                // Then add the new cache entry.
2906
                c.in.results[subject] = r
14,640✔
2907
        }
2908

2909
        // Collect queue names if needed.
2910
        var qnames [][]byte
44,736✔
2911

44,736✔
2912
        // Check for no interest, short circuit if so.
44,736✔
2913
        // This is the fanout scale.
44,736✔
2914
        if len(r.psubs)+len(r.qsubs) > 0 {
89,205✔
2915
                flag := pmrNoFlag
44,469✔
2916
                // If we have queue subs in this cluster, then if we run in gateway
44,469✔
2917
                // mode and the remote gateways have queue subs, then we need to
44,469✔
2918
                // collect the queue groups this message was sent to so that we
44,469✔
2919
                // exclude them when sending to gateways.
44,469✔
2920
                if len(r.qsubs) > 0 && c.srv.gateway.enabled &&
44,469✔
2921
                        atomic.LoadInt64(&c.srv.gateway.totalQSubs) > 0 {
56,743✔
2922
                        flag |= pmrCollectQueueNames
12,274✔
2923
                }
12,274✔
2924
                // If this is a mapped subject that means the mapped interest
2925
                // is what got us here, but this might not have a queue designation
2926
                // If that is the case, make sure we ignore to process local queue subscribers.
2927
                if len(c.pa.mapped) > 0 && len(c.pa.queues) == 0 {
44,779✔
2928
                        flag |= pmrIgnoreEmptyQueueFilter
310✔
2929
                }
310✔
2930
                _, qnames = c.processMsgResults(acc, r, msg, nil, c.pa.subject, c.pa.reply, flag)
44,469✔
2931
        }
2932

2933
        // Now deal with gateways
2934
        if c.srv.gateway.enabled {
57,995✔
2935
                c.sendMsgToGateways(acc, msg, c.pa.subject, c.pa.reply, qnames, true)
13,259✔
2936
        }
13,259✔
2937
}
2938

2939
// Handles a subscription permission violation.
2940
// See leafPermViolation() for details.
2941
func (c *client) leafSubPermViolation(subj []byte) {
304✔
2942
        c.leafPermViolation(false, subj)
304✔
2943
}
304✔
2944

2945
// Common function to process publish or subscribe leafnode permission violation.
2946
// Sends the permission violation error to the remote, logs it and closes the connection.
2947
// If this is from a server soliciting, the reconnection will be delayed.
2948
func (c *client) leafPermViolation(pub bool, subj []byte) {
304✔
2949
        if c.isSpokeLeafNode() {
608✔
2950
                // For spokes these are no-ops since the hub server told us our permissions.
304✔
2951
                // We just need to not send these over to the other side since we will get cutoff.
304✔
2952
                return
304✔
2953
        }
304✔
2954
        // FIXME(dlc) ?
2955
        c.setLeafConnectDelayIfSoliciting(leafNodeReconnectAfterPermViolation)
×
2956
        var action string
×
2957
        if pub {
×
2958
                c.sendErr(fmt.Sprintf("Permissions Violation for Publish to %q", subj))
×
2959
                action = "Publish"
×
2960
        } else {
×
2961
                c.sendErr(fmt.Sprintf("Permissions Violation for Subscription to %q", subj))
×
2962
                action = "Subscription"
×
2963
        }
×
2964
        c.Errorf("%s Violation on %q - Check other side configuration", action, subj)
×
2965
        // TODO: add a new close reason that is more appropriate?
×
2966
        c.closeConnection(ProtocolViolation)
×
2967
}
2968

2969
// Invoked from generic processErr() for LEAF connections.
2970
func (c *client) leafProcessErr(errStr string) {
44✔
2971
        // Check if we got a cluster name collision.
44✔
2972
        if strings.Contains(errStr, ErrLeafNodeHasSameClusterName.Error()) {
46✔
2973
                _, delay := c.setLeafConnectDelayIfSoliciting(leafNodeReconnectDelayAfterClusterNameSame)
2✔
2974
                c.Errorf("Leafnode connection dropped with same cluster name error. Delaying attempt to reconnect for %v", delay)
2✔
2975
                return
2✔
2976
        }
2✔
2977

2978
        // We will look for Loop detected error coming from the other side.
2979
        // If we solicit, set the connect delay.
2980
        if !strings.Contains(errStr, "Loop detected") {
78✔
2981
                return
36✔
2982
        }
36✔
2983
        c.handleLeafNodeLoop(false)
6✔
2984
}
2985

2986
// If this leaf connection solicits, sets the connect delay to the given value,
2987
// or the one from the server option's LeafNode.connDelay if one is set (for tests).
2988
// Returns the connection's account name and delay.
2989
func (c *client) setLeafConnectDelayIfSoliciting(delay time.Duration) (string, time.Duration) {
15✔
2990
        c.mu.Lock()
15✔
2991
        if c.isSolicitedLeafNode() {
24✔
2992
                if s := c.srv; s != nil {
18✔
2993
                        if srvdelay := s.getOpts().LeafNode.connDelay; srvdelay != 0 {
13✔
2994
                                delay = srvdelay
4✔
2995
                        }
4✔
2996
                }
2997
                c.leaf.remote.setConnectDelay(delay)
9✔
2998
        }
2999
        accName := c.acc.Name
15✔
3000
        c.mu.Unlock()
15✔
3001
        return accName, delay
15✔
3002
}
3003

3004
// For the given remote Leafnode configuration, this function returns
3005
// if TLS is required, and if so, will return a clone of the TLS Config
3006
// (since some fields will be changed during handshake), the TLS server
3007
// name that is remembered, and the TLS timeout.
3008
func (c *client) leafNodeGetTLSConfigForSolicit(remote *leafNodeCfg) (bool, *tls.Config, string, float64) {
1,861✔
3009
        var (
1,861✔
3010
                tlsConfig  *tls.Config
1,861✔
3011
                tlsName    string
1,861✔
3012
                tlsTimeout float64
1,861✔
3013
        )
1,861✔
3014

1,861✔
3015
        remote.RLock()
1,861✔
3016
        defer remote.RUnlock()
1,861✔
3017

1,861✔
3018
        tlsRequired := remote.TLS || remote.TLSConfig != nil
1,861✔
3019
        if tlsRequired {
1,937✔
3020
                if remote.TLSConfig != nil {
128✔
3021
                        tlsConfig = remote.TLSConfig.Clone()
52✔
3022
                } else {
76✔
3023
                        tlsConfig = &tls.Config{MinVersion: tls.VersionTLS12}
24✔
3024
                }
24✔
3025
                tlsName = remote.tlsName
76✔
3026
                tlsTimeout = remote.TLSTimeout
76✔
3027
                if tlsTimeout == 0 {
117✔
3028
                        tlsTimeout = float64(TLS_TIMEOUT / time.Second)
41✔
3029
                }
41✔
3030
        }
3031

3032
        return tlsRequired, tlsConfig, tlsName, tlsTimeout
1,861✔
3033
}
3034

3035
// Initiates the LeafNode Websocket connection by:
3036
// - doing the TLS handshake if needed
3037
// - sending the HTTP request
3038
// - waiting for the HTTP response
3039
//
3040
// Since some bufio reader is used to consume the HTTP response, this function
3041
// returns the slice of buffered bytes (if any) so that the readLoop that will
3042
// be started after that consume those first before reading from the socket.
3043
// The boolean
3044
//
3045
// Lock held on entry.
3046
func (c *client) leafNodeSolicitWSConnection(opts *Options, rURL *url.URL, remote *leafNodeCfg) ([]byte, ClosedState, error) {
43✔
3047
        remote.RLock()
43✔
3048
        compress := remote.Websocket.Compression
43✔
3049
        // By default the server will mask outbound frames, but it can be disabled with this option.
43✔
3050
        noMasking := remote.Websocket.NoMasking
43✔
3051
        infoTimeout := remote.FirstInfoTimeout
43✔
3052
        remote.RUnlock()
43✔
3053
        // Will do the client-side TLS handshake if needed.
43✔
3054
        tlsRequired, err := c.leafClientHandshakeIfNeeded(remote, opts)
43✔
3055
        if err != nil {
47✔
3056
                // 0 will indicate that the connection was already closed
4✔
3057
                return nil, 0, err
4✔
3058
        }
4✔
3059

3060
        // For http request, we need the passed URL to contain either http or https scheme.
3061
        scheme := "http"
39✔
3062
        if tlsRequired {
47✔
3063
                scheme = "https"
8✔
3064
        }
8✔
3065
        // We will use the `/leafnode` path to tell the accepting WS server that it should
3066
        // create a LEAF connection, not a CLIENT.
3067
        // In case we use the user's URL path in the future, make sure we append the user's
3068
        // path to our `/leafnode` path.
3069
        lpath := leafNodeWSPath
39✔
3070
        if curPath := rURL.EscapedPath(); curPath != _EMPTY_ {
60✔
3071
                if curPath[0] == '/' {
42✔
3072
                        curPath = curPath[1:]
21✔
3073
                }
21✔
3074
                lpath = path.Join(curPath, lpath)
21✔
3075
        } else {
18✔
3076
                lpath = lpath[1:]
18✔
3077
        }
18✔
3078
        ustr := fmt.Sprintf("%s://%s/%s", scheme, rURL.Host, lpath)
39✔
3079
        u, _ := url.Parse(ustr)
39✔
3080
        req := &http.Request{
39✔
3081
                Method:     "GET",
39✔
3082
                URL:        u,
39✔
3083
                Proto:      "HTTP/1.1",
39✔
3084
                ProtoMajor: 1,
39✔
3085
                ProtoMinor: 1,
39✔
3086
                Header:     make(http.Header),
39✔
3087
                Host:       u.Host,
39✔
3088
        }
39✔
3089
        wsKey, err := wsMakeChallengeKey()
39✔
3090
        if err != nil {
39✔
3091
                return nil, WriteError, err
×
3092
        }
×
3093

3094
        req.Header["Upgrade"] = []string{"websocket"}
39✔
3095
        req.Header["Connection"] = []string{"Upgrade"}
39✔
3096
        req.Header["Sec-WebSocket-Key"] = []string{wsKey}
39✔
3097
        req.Header["Sec-WebSocket-Version"] = []string{"13"}
39✔
3098
        if compress {
48✔
3099
                req.Header.Add("Sec-WebSocket-Extensions", wsPMCReqHeaderValue)
9✔
3100
        }
9✔
3101
        if noMasking {
49✔
3102
                req.Header.Add(wsNoMaskingHeader, wsNoMaskingValue)
10✔
3103
        }
10✔
3104
        c.nc.SetDeadline(time.Now().Add(infoTimeout))
39✔
3105
        if err := req.Write(c.nc); err != nil {
39✔
3106
                return nil, WriteError, err
×
3107
        }
×
3108

3109
        var resp *http.Response
39✔
3110

39✔
3111
        br := bufio.NewReaderSize(c.nc, MAX_CONTROL_LINE_SIZE)
39✔
3112
        resp, err = http.ReadResponse(br, req)
39✔
3113
        if err == nil &&
39✔
3114
                (resp.StatusCode != 101 ||
39✔
3115
                        !strings.EqualFold(resp.Header.Get("Upgrade"), "websocket") ||
39✔
3116
                        !strings.EqualFold(resp.Header.Get("Connection"), "upgrade") ||
39✔
3117
                        resp.Header.Get("Sec-Websocket-Accept") != wsAcceptKey(wsKey)) {
40✔
3118

1✔
3119
                err = fmt.Errorf("invalid websocket connection")
1✔
3120
        }
1✔
3121
        // Check compression extension...
3122
        if err == nil && c.ws.compress {
48✔
3123
                // Check that not only permessage-deflate extension is present, but that
9✔
3124
                // we also have server and client no context take over.
9✔
3125
                srvCompress, noCtxTakeover := wsPMCExtensionSupport(resp.Header, false)
9✔
3126

9✔
3127
                // If server does not support compression, then simply disable it in our side.
9✔
3128
                if !srvCompress {
13✔
3129
                        c.ws.compress = false
4✔
3130
                } else if !noCtxTakeover {
9✔
3131
                        err = fmt.Errorf("compression negotiation error")
×
3132
                }
×
3133
        }
3134
        // Same for no masking...
3135
        if err == nil && noMasking {
49✔
3136
                // Check if server accepts no masking
10✔
3137
                if resp.Header.Get(wsNoMaskingHeader) != wsNoMaskingValue {
11✔
3138
                        // Nope, need to mask our writes as any client would do.
1✔
3139
                        c.ws.maskwrite = true
1✔
3140
                }
1✔
3141
        }
3142
        if resp != nil {
67✔
3143
                resp.Body.Close()
28✔
3144
        }
28✔
3145
        if err != nil {
51✔
3146
                return nil, ReadError, err
12✔
3147
        }
12✔
3148
        c.Debugf("Leafnode compression=%v masking=%v", c.ws.compress, c.ws.maskwrite)
27✔
3149

27✔
3150
        var preBuf []byte
27✔
3151
        // We have to slurp whatever is in the bufio reader and pass that to the readloop.
27✔
3152
        if n := br.Buffered(); n != 0 {
27✔
3153
                preBuf, _ = br.Peek(n)
×
3154
        }
×
3155
        return preBuf, 0, nil
27✔
3156
}
3157

3158
// connectProcessTimeout is how long leafNodeResumeConnectProcess waits for
// leafNodeFinishConnectProcess to run on a remote leafnode connection. If the
// finish step has not happened by then, the timer armed with this duration
// closes the connection as stale.
const (
	connectProcessTimeout time.Duration = 2 * time.Second
)
3159

3160
// This is invoked for remote LEAF remote connections after processing the INFO
3161
// protocol.
3162
func (s *Server) leafNodeResumeConnectProcess(c *client) {
655✔
3163
        clusterName := s.ClusterName()
655✔
3164

655✔
3165
        c.mu.Lock()
655✔
3166
        if c.isClosed() {
655✔
3167
                c.mu.Unlock()
×
3168
                return
×
3169
        }
×
3170
        if err := c.sendLeafConnect(clusterName, c.headers); err != nil {
657✔
3171
                c.mu.Unlock()
2✔
3172
                c.closeConnection(WriteError)
2✔
3173
                return
2✔
3174
        }
2✔
3175

3176
        // Spin up the write loop.
3177
        s.startGoRoutine(func() { c.writeLoop() })
1,306✔
3178

3179
        // timeout leafNodeFinishConnectProcess
3180
        c.ping.tmr = time.AfterFunc(connectProcessTimeout, func() {
653✔
3181
                c.mu.Lock()
×
3182
                // check if leafNodeFinishConnectProcess was called and prevent later leafNodeFinishConnectProcess
×
3183
                if !c.flags.setIfNotSet(connectProcessFinished) {
×
3184
                        c.mu.Unlock()
×
3185
                        return
×
3186
                }
×
3187
                clearTimer(&c.ping.tmr)
×
3188
                closed := c.isClosed()
×
3189
                c.mu.Unlock()
×
3190
                if !closed {
×
3191
                        c.sendErrAndDebug("Stale Leaf Node Connection - Closing")
×
3192
                        c.closeConnection(StaleConnection)
×
3193
                }
×
3194
        })
3195
        c.mu.Unlock()
653✔
3196
        c.Debugf("Remote leafnode connect msg sent")
653✔
3197
}
3198

3199
// This is invoked for remote LEAF connections after processing the INFO
3200
// protocol and leafNodeResumeConnectProcess.
3201
// This will send LS+ the CONNECT protocol and register the leaf node.
3202
func (s *Server) leafNodeFinishConnectProcess(c *client) {
619✔
3203
        c.mu.Lock()
619✔
3204
        if !c.flags.setIfNotSet(connectProcessFinished) {
619✔
3205
                c.mu.Unlock()
×
3206
                return
×
3207
        }
×
3208
        if c.isClosed() {
619✔
3209
                c.mu.Unlock()
×
3210
                s.removeLeafNodeConnection(c)
×
3211
                return
×
3212
        }
×
3213
        remote := c.leaf.remote
619✔
3214
        // Check if we will need to send the system connect event.
619✔
3215
        remote.RLock()
619✔
3216
        sendSysConnectEvent := remote.Hub
619✔
3217
        remote.RUnlock()
619✔
3218

619✔
3219
        // Capture account before releasing lock
619✔
3220
        acc := c.acc
619✔
3221
        // cancel connectProcessTimeout
619✔
3222
        clearTimer(&c.ping.tmr)
619✔
3223
        c.mu.Unlock()
619✔
3224

619✔
3225
        // Make sure we register with the account here.
619✔
3226
        if err := c.registerWithAccount(acc); err != nil {
621✔
3227
                if err == ErrTooManyAccountConnections {
2✔
3228
                        c.maxAccountConnExceeded()
×
3229
                        return
×
3230
                } else if err == ErrLeafNodeLoop {
4✔
3231
                        c.handleLeafNodeLoop(true)
2✔
3232
                        return
2✔
3233
                }
2✔
3234
                c.Errorf("Registering leaf with account %s resulted in error: %v", acc.Name, err)
×
3235
                c.closeConnection(ProtocolViolation)
×
3236
                return
×
3237
        }
3238
        s.addLeafNodeConnection(c, _EMPTY_, _EMPTY_, false)
617✔
3239
        s.initLeafNodeSmapAndSendSubs(c)
617✔
3240
        if sendSysConnectEvent {
623✔
3241
                s.sendLeafNodeConnect(acc)
6✔
3242
        }
6✔
3243

3244
        // The above functions are not atomically under the client
3245
        // lock doing those operations. It is possible - since we
3246
        // have started the read/write loops - that the connection
3247
        // is closed before or in between. This would leave the
3248
        // closed LN connection possible registered with the account
3249
        // and/or the server's leafs map. So check if connection
3250
        // is closed, and if so, manually cleanup.
3251
        c.mu.Lock()
617✔
3252
        closed := c.isClosed()
617✔
3253
        if !closed {
1,233✔
3254
                c.setFirstPingTimer()
616✔
3255
        }
616✔
3256
        c.mu.Unlock()
617✔
3257
        if closed {
618✔
3258
                s.removeLeafNodeConnection(c)
1✔
3259
                if prev := acc.removeClient(c); prev == 1 {
2✔
3260
                        s.decActiveAccounts()
1✔
3261
                }
1✔
3262
        }
3263
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc