• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 19188141845

07 Nov 2025 01:16PM UTC coverage: 84.586% (-1.4%) from 86.033%
19188141845

push

github

web-flow
Add meta snapshot metrics to jsz monitoring (#7524)

Exposes snapshot related metrics under /jsz

```js
"meta_cluster": {
    "pending": 0,
    "snapshot": {
        "pending_entries": 1,
        "pending_size": 1314,
        "last_time": "2025-11-06T18:14:40.659678019Z", # UTC
        "last_duration": 161096363
     }
}
```

Signed-off-by: Waldemar Quevedo <wally@nats.io>

73649 of 87070 relevant lines covered (84.59%)

340562.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.07
/server/route.go
1
// Copyright 2013-2025 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "bytes"
18
        "crypto/tls"
19
        "encoding/json"
20
        "fmt"
21
        "hash/fnv"
22
        "math/rand"
23
        "net"
24
        "net/url"
25
        "reflect"
26
        "runtime"
27
        "strconv"
28
        "strings"
29
        "sync/atomic"
30
        "time"
31

32
        "github.com/klauspost/compress/s2"
33
)
34

35
// RouteType designates the router type, i.e. how this server came to
// know about a route (see the constants below).
type RouteType int

// Type of Route
const (
	// This route we learned from speaking to other routes.
	Implicit RouteType = iota
	// This route was explicitly configured.
	Explicit
)
45

46
// Protocol operation prefixes, kept as byte slices.
// Include the space for the proto
var (
	aSubBytes   = []byte{'A', '+', ' '}
	aUnsubBytes = []byte{'A', '-', ' '}
	rSubBytes   = []byte{'R', 'S', '+', ' '}
	rUnsubBytes = []byte{'R', 'S', '-', ' '}
	lSubBytes   = []byte{'L', 'S', '+', ' '}
	lUnsubBytes = []byte{'L', 'S', '-', ' '}
)
55

56
type route struct {
57
        remoteID     string
58
        remoteName   string
59
        didSolicit   bool
60
        retry        bool
61
        lnoc         bool
62
        lnocu        bool
63
        routeType    RouteType
64
        url          *url.URL
65
        authRequired bool
66
        tlsRequired  bool
67
        jetstream    bool
68
        connectURLs  []string
69
        wsConnURLs   []string
70
        gatewayURL   string
71
        leafnodeURL  string
72
        hash         string
73
        idHash       string
74
        // Location of the route in the slice: s.routes[remoteID][]*client.
75
        // Initialized to -1 on creation, as to indicate that it is not
76
        // added to the list.
77
        poolIdx int
78
        // If this is set, it means that the route is dedicated for this
79
        // account and the account name will not be included in protocols.
80
        accName []byte
81
        // This is set to true if this is a route connection to an old
82
        // server or a server that has pooling completely disabled.
83
        noPool bool
84
        // Selected compression mode, which may be different from the
85
        // server configured mode.
86
        compression string
87
        // Transient value used to set the Info.GossipMode when initiating
88
        // an implicit route and sending to the remote.
89
        gossipMode byte
90
        // This will be set in case of pooling so that a route can trigger
91
        // the creation of the next after receiving the first PONG, ensuring
92
        // that authentication did not fail.
93
        startNewRoute *routeInfo
94
}
95

96
// This contains the information required to create a new route.
97
type routeInfo struct {
98
        url        *url.URL
99
        rtype      RouteType
100
        gossipMode byte
101
}
102

103
// Gossip mode values (typed as byte).
// Do not change the values/order since they are exchanged between servers.
const (
	gossipDefault = byte(iota)
	gossipDisabled
	gossipOverride
)
109

110
// connectInfo is the JSON payload of the CONNECT protocol; it is what
// sendRouteConnect marshals and sends when connecting to a route.
type connectInfo struct {
	Echo     bool   `json:"echo"`
	Verbose  bool   `json:"verbose"`
	Pedantic bool   `json:"pedantic"`
	User     string `json:"user,omitempty"`
	Pass     string `json:"pass,omitempty"`
	TLS      bool   `json:"tls_required"`
	Headers  bool   `json:"headers"`
	Name     string `json:"name"`
	Cluster  string `json:"cluster"`
	Dynamic  bool   `json:"cluster_dynamic,omitempty"`
	LNOC     bool   `json:"lnoc,omitempty"`
	LNOCU    bool   `json:"lnocu,omitempty"` // Support for LS- with origin cluster name
	Gateway  string `json:"gateway,omitempty"`
}
125

126
// Route protocol constants
127
const (
128
        ConProto  = "CONNECT %s" + _CRLF_
129
        InfoProto = "INFO %s" + _CRLF_
130
)
131

132
const (
	// Warning when user configures cluster TLS insecure
	clusterTLSInsecureWarning = "TLS certificate chain and hostname of solicited routes will not be verified. DO NOT USE IN PRODUCTION!"

	// The default ping interval is set to 2 minutes, which is fine for client
	// connections, etc.. but for route compression, the CompressionS2Auto
	// mode uses RTT measurements (ping/pong) to decide which compression level
	// to use, we want the interval to not be that high.
	defaultRouteMaxPingInterval = 30 * time.Second
)
142

143
// Can be changed for tests
144
var (
145
        routeConnectDelay    = DEFAULT_ROUTE_CONNECT
146
        routeConnectMaxDelay = DEFAULT_ROUTE_CONNECT_MAX
147
        routeMaxPingInterval = defaultRouteMaxPingInterval
148
)
149

150
// removeReplySub is called when we trip the max on remoteReply subs.
151
func (c *client) removeReplySub(sub *subscription) {
×
152
        if sub == nil {
×
153
                return
×
154
        }
×
155
        // Lookup the account based on sub.sid.
156
        if i := bytes.Index(sub.sid, []byte(" ")); i > 0 {
×
157
                // First part of SID for route is account name.
×
158
                if v, ok := c.srv.accounts.Load(bytesToString(sub.sid[:i])); ok {
×
159
                        (v.(*Account)).sl.Remove(sub)
×
160
                }
×
161
                c.mu.Lock()
×
162
                delete(c.subs, bytesToString(sub.sid))
×
163
                c.mu.Unlock()
×
164
        }
165
}
166

167
func (c *client) processAccountSub(arg []byte) error {
4✔
168
        if c.kind == GATEWAY {
8✔
169
                return c.processGatewayAccountSub(string(arg))
4✔
170
        }
4✔
171
        return nil
×
172
}
173

174
func (c *client) processAccountUnsub(arg []byte) {
28✔
175
        if c.kind == GATEWAY {
56✔
176
                c.processGatewayAccountUnsub(string(arg))
28✔
177
        }
28✔
178
}
179

180
// Process an inbound LMSG specification from the remote route. This means
181
// we have an origin cluster and we force header semantics.
182
func (c *client) processRoutedOriginClusterMsgArgs(arg []byte) error {
7,997✔
183
        // Unroll splitArgs to avoid runtime/heap issues
7,997✔
184
        a := [MAX_HMSG_ARGS + 1][]byte{}
7,997✔
185
        args := a[:0]
7,997✔
186
        start := -1
7,997✔
187
        for i, b := range arg {
314,467✔
188
                switch b {
306,470✔
189
                case ' ', '\t', '\r', '\n':
38,967✔
190
                        if start >= 0 {
77,934✔
191
                                args = append(args, arg[start:i])
38,967✔
192
                                start = -1
38,967✔
193
                        }
38,967✔
194
                default:
267,503✔
195
                        if start < 0 {
314,467✔
196
                                start = i
46,964✔
197
                        }
46,964✔
198
                }
199
        }
200
        if start >= 0 {
15,994✔
201
                args = append(args, arg[start:])
7,997✔
202
        }
7,997✔
203

204
        var an []byte
7,997✔
205
        if c.kind == ROUTER {
15,994✔
206
                if an = c.route.accName; len(an) > 0 && len(args) > 2 {
11,107✔
207
                        args = append(args[:2], args[1:]...)
3,110✔
208
                        args[1] = an
3,110✔
209
                }
3,110✔
210
        }
211
        c.pa.arg = arg
7,997✔
212
        switch len(args) {
7,997✔
213
        case 0, 1, 2, 3, 4:
×
214
                return fmt.Errorf("processRoutedOriginClusterMsgArgs Parse Error: '%s'", args)
×
215
        case 5:
2,671✔
216
                c.pa.reply = nil
2,671✔
217
                c.pa.queues = nil
2,671✔
218
                c.pa.hdb = args[3]
2,671✔
219
                c.pa.hdr = parseSize(args[3])
2,671✔
220
                c.pa.szb = args[4]
2,671✔
221
                c.pa.size = parseSize(args[4])
2,671✔
222
        case 6:
568✔
223
                c.pa.reply = args[3]
568✔
224
                c.pa.queues = nil
568✔
225
                c.pa.hdb = args[4]
568✔
226
                c.pa.hdr = parseSize(args[4])
568✔
227
                c.pa.szb = args[5]
568✔
228
                c.pa.size = parseSize(args[5])
568✔
229
        default:
4,758✔
230
                // args[2] is our reply indicator. Should be + or | normally.
4,758✔
231
                if len(args[3]) != 1 {
4,758✔
232
                        return fmt.Errorf("processRoutedOriginClusterMsgArgs Bad or Missing Reply Indicator: '%s'", args[3])
×
233
                }
×
234
                switch args[3][0] {
4,758✔
235
                case '+':
4✔
236
                        c.pa.reply = args[4]
4✔
237
                case '|':
4,754✔
238
                        c.pa.reply = nil
4,754✔
239
                default:
×
240
                        return fmt.Errorf("processRoutedOriginClusterMsgArgs Bad or Missing Reply Indicator: '%s'", args[3])
×
241
                }
242

243
                // Grab header size.
244
                c.pa.hdb = args[len(args)-2]
4,758✔
245
                c.pa.hdr = parseSize(c.pa.hdb)
4,758✔
246

4,758✔
247
                // Grab size.
4,758✔
248
                c.pa.szb = args[len(args)-1]
4,758✔
249
                c.pa.size = parseSize(c.pa.szb)
4,758✔
250

4,758✔
251
                // Grab queue names.
4,758✔
252
                if c.pa.reply != nil {
4,762✔
253
                        c.pa.queues = args[5 : len(args)-2]
4✔
254
                } else {
4,758✔
255
                        c.pa.queues = args[4 : len(args)-2]
4,754✔
256
                }
4,754✔
257
        }
258
        if c.pa.hdr < 0 {
7,997✔
259
                return fmt.Errorf("processRoutedOriginClusterMsgArgs Bad or Missing Header Size: '%s'", arg)
×
260
        }
×
261
        if c.pa.size < 0 {
7,997✔
262
                return fmt.Errorf("processRoutedOriginClusterMsgArgs Bad or Missing Size: '%s'", args)
×
263
        }
×
264
        if c.pa.hdr > c.pa.size {
7,997✔
265
                return fmt.Errorf("processRoutedOriginClusterMsgArgs Header Size larger then TotalSize: '%s'", arg)
×
266
        }
×
267

268
        // Common ones processed after check for arg length
269
        c.pa.origin = args[0]
7,997✔
270
        c.pa.account = args[1]
7,997✔
271
        c.pa.subject = args[2]
7,997✔
272
        if len(an) > 0 {
11,107✔
273
                c.pa.pacache = c.pa.subject
3,110✔
274
        } else {
7,997✔
275
                c.pa.pacache = arg[len(args[0])+1 : len(args[0])+len(args[1])+len(args[2])+2]
4,887✔
276
        }
4,887✔
277
        return nil
7,997✔
278
}
279

280
// Process an inbound HMSG specification from the remote route.
281
func (c *client) processRoutedHeaderMsgArgs(arg []byte) error {
501,365✔
282
        // Unroll splitArgs to avoid runtime/heap issues
501,365✔
283
        a := [MAX_HMSG_ARGS][]byte{}
501,365✔
284
        args := a[:0]
501,365✔
285
        var an []byte
501,365✔
286
        if c.kind == ROUTER {
997,626✔
287
                if an = c.route.accName; len(an) > 0 {
587,331✔
288
                        args = append(args, an)
91,070✔
289
                }
91,070✔
290
        }
291
        start := -1
501,365✔
292
        for i, b := range arg {
39,322,070✔
293
                switch b {
38,820,705✔
294
                case ' ', '\t', '\r', '\n':
1,901,699✔
295
                        if start >= 0 {
3,803,398✔
296
                                args = append(args, arg[start:i])
1,901,699✔
297
                                start = -1
1,901,699✔
298
                        }
1,901,699✔
299
                default:
36,919,006✔
300
                        if start < 0 {
39,322,070✔
301
                                start = i
2,403,064✔
302
                        }
2,403,064✔
303
                }
304
        }
305
        if start >= 0 {
1,002,730✔
306
                args = append(args, arg[start:])
501,365✔
307
        }
501,365✔
308

309
        c.pa.arg = arg
501,365✔
310
        switch len(args) {
501,365✔
311
        case 0, 1, 2, 3:
×
312
                return fmt.Errorf("processRoutedHeaderMsgArgs Parse Error: '%s'", args)
×
313
        case 4:
13,817✔
314
                c.pa.reply = nil
13,817✔
315
                c.pa.queues = nil
13,817✔
316
                c.pa.hdb = args[2]
13,817✔
317
                c.pa.hdr = parseSize(args[2])
13,817✔
318
                c.pa.szb = args[3]
13,817✔
319
                c.pa.size = parseSize(args[3])
13,817✔
320
        case 5:
486,982✔
321
                c.pa.reply = args[2]
486,982✔
322
                c.pa.queues = nil
486,982✔
323
                c.pa.hdb = args[3]
486,982✔
324
                c.pa.hdr = parseSize(args[3])
486,982✔
325
                c.pa.szb = args[4]
486,982✔
326
                c.pa.size = parseSize(args[4])
486,982✔
327
        default:
566✔
328
                // args[2] is our reply indicator. Should be + or | normally.
566✔
329
                if len(args[2]) != 1 {
566✔
330
                        return fmt.Errorf("processRoutedHeaderMsgArgs Bad or Missing Reply Indicator: '%s'", args[2])
×
331
                }
×
332
                switch args[2][0] {
566✔
333
                case '+':
552✔
334
                        c.pa.reply = args[3]
552✔
335
                case '|':
14✔
336
                        c.pa.reply = nil
14✔
337
                default:
×
338
                        return fmt.Errorf("processRoutedHeaderMsgArgs Bad or Missing Reply Indicator: '%s'", args[2])
×
339
                }
340

341
                // Grab header size.
342
                c.pa.hdb = args[len(args)-2]
566✔
343
                c.pa.hdr = parseSize(c.pa.hdb)
566✔
344

566✔
345
                // Grab size.
566✔
346
                c.pa.szb = args[len(args)-1]
566✔
347
                c.pa.size = parseSize(c.pa.szb)
566✔
348

566✔
349
                // Grab queue names.
566✔
350
                if c.pa.reply != nil {
1,118✔
351
                        c.pa.queues = args[4 : len(args)-2]
552✔
352
                } else {
566✔
353
                        c.pa.queues = args[3 : len(args)-2]
14✔
354
                }
14✔
355
        }
356
        if c.pa.hdr < 0 {
501,365✔
357
                return fmt.Errorf("processRoutedHeaderMsgArgs Bad or Missing Header Size: '%s'", arg)
×
358
        }
×
359
        if c.pa.size < 0 {
501,365✔
360
                return fmt.Errorf("processRoutedHeaderMsgArgs Bad or Missing Size: '%s'", args)
×
361
        }
×
362
        if c.pa.hdr > c.pa.size {
501,366✔
363
                return fmt.Errorf("processRoutedHeaderMsgArgs Header Size larger then TotalSize: '%s'", arg)
1✔
364
        }
1✔
365

366
        // Common ones processed after check for arg length
367
        c.pa.account = args[0]
501,364✔
368
        c.pa.subject = args[1]
501,364✔
369
        if len(an) > 0 {
592,434✔
370
                c.pa.pacache = c.pa.subject
91,070✔
371
        } else {
501,364✔
372
                c.pa.pacache = arg[:len(args[0])+len(args[1])+1]
410,294✔
373
        }
410,294✔
374
        return nil
501,364✔
375
}
376

377
// Process an inbound RMSG or LMSG specification from the remote route.
378
func (c *client) processRoutedMsgArgs(arg []byte) error {
6,369,544✔
379
        // Unroll splitArgs to avoid runtime/heap issues
6,369,544✔
380
        a := [MAX_RMSG_ARGS][]byte{}
6,369,544✔
381
        args := a[:0]
6,369,544✔
382
        var an []byte
6,369,544✔
383
        if c.kind == ROUTER {
12,441,850✔
384
                if an = c.route.accName; len(an) > 0 {
11,284,859✔
385
                        args = append(args, an)
5,212,553✔
386
                }
5,212,553✔
387
        }
388
        start := -1
6,369,544✔
389
        for i, b := range arg {
234,627,507✔
390
                switch b {
228,257,963✔
391
                case ' ', '\t', '\r', '\n':
10,820,503✔
392
                        if start >= 0 {
21,641,006✔
393
                                args = append(args, arg[start:i])
10,820,503✔
394
                                start = -1
10,820,503✔
395
                        }
10,820,503✔
396
                default:
217,437,460✔
397
                        if start < 0 {
234,627,507✔
398
                                start = i
17,190,047✔
399
                        }
17,190,047✔
400
                }
401
        }
402
        if start >= 0 {
12,739,088✔
403
                args = append(args, arg[start:])
6,369,544✔
404
        }
6,369,544✔
405

406
        c.pa.arg = arg
6,369,544✔
407
        switch len(args) {
6,369,544✔
408
        case 0, 1, 2:
1✔
409
                return fmt.Errorf("processRoutedMsgArgs Parse Error: '%s'", args)
1✔
410
        case 3:
3,134,183✔
411
                c.pa.reply = nil
3,134,183✔
412
                c.pa.queues = nil
3,134,183✔
413
                c.pa.szb = args[2]
3,134,183✔
414
                c.pa.size = parseSize(args[2])
3,134,183✔
415
        case 4:
3,189,042✔
416
                c.pa.reply = args[2]
3,189,042✔
417
                c.pa.queues = nil
3,189,042✔
418
                c.pa.szb = args[3]
3,189,042✔
419
                c.pa.size = parseSize(args[3])
3,189,042✔
420
        default:
46,318✔
421
                // args[2] is our reply indicator. Should be + or | normally.
46,318✔
422
                if len(args[2]) != 1 {
46,318✔
423
                        return fmt.Errorf("processRoutedMsgArgs Bad or Missing Reply Indicator: '%s'", args[2])
×
424
                }
×
425
                switch args[2][0] {
46,318✔
426
                case '+':
11,197✔
427
                        c.pa.reply = args[3]
11,197✔
428
                case '|':
35,121✔
429
                        c.pa.reply = nil
35,121✔
430
                default:
×
431
                        return fmt.Errorf("processRoutedMsgArgs Bad or Missing Reply Indicator: '%s'", args[2])
×
432
                }
433
                // Grab size.
434
                c.pa.szb = args[len(args)-1]
46,318✔
435
                c.pa.size = parseSize(c.pa.szb)
46,318✔
436

46,318✔
437
                // Grab queue names.
46,318✔
438
                if c.pa.reply != nil {
57,515✔
439
                        c.pa.queues = args[4 : len(args)-1]
11,197✔
440
                } else {
46,318✔
441
                        c.pa.queues = args[3 : len(args)-1]
35,121✔
442
                }
35,121✔
443
        }
444
        if c.pa.size < 0 {
6,369,543✔
445
                return fmt.Errorf("processRoutedMsgArgs Bad or Missing Size: '%s'", args)
×
446
        }
×
447

448
        // Common ones processed after check for arg length
449
        c.pa.account = args[0]
6,369,543✔
450
        c.pa.subject = args[1]
6,369,543✔
451
        if len(an) > 0 {
11,582,096✔
452
                c.pa.pacache = c.pa.subject
5,212,553✔
453
        } else {
6,369,543✔
454
                c.pa.pacache = arg[:len(args[0])+len(args[1])+1]
1,156,990✔
455
        }
1,156,990✔
456
        return nil
6,369,543✔
457
}
458

459
// processInboundRoutedMsg is called to process an inbound msg from a route.
460
func (c *client) processInboundRoutedMsg(msg []byte) {
5,953,231✔
461
        // Update statistics
5,953,231✔
462
        c.in.msgs++
5,953,231✔
463
        // The msg includes the CR_LF, so pull back out for accounting.
5,953,231✔
464
        size := len(msg) - LEN_CR_LF
5,953,231✔
465
        c.in.bytes += int32(size)
5,953,231✔
466

5,953,231✔
467
        if c.opts.Verbose {
5,953,231✔
468
                c.sendOK()
×
469
        }
×
470

471
        // Mostly under testing scenarios.
472
        if c.srv == nil {
5,953,231✔
473
                return
×
474
        }
×
475

476
        // If the subject (c.pa.subject) has the gateway prefix, this function will handle it.
477
        if c.handleGatewayReply(msg) {
6,035,834✔
478
                // We are done here.
82,603✔
479
                return
82,603✔
480
        }
82,603✔
481

482
        acc, r := c.getAccAndResultFromCache()
5,870,628✔
483
        if acc == nil {
5,870,629✔
484
                c.Debugf("Unknown account %q for routed message on subject: %q", c.pa.account, c.pa.subject)
1✔
485
                return
1✔
486
        }
1✔
487

488
        acc.stats.Lock()
5,870,627✔
489
        acc.stats.inMsgs++
5,870,627✔
490
        acc.stats.inBytes += int64(size)
5,870,627✔
491
        acc.stats.rt.inMsgs++
5,870,627✔
492
        acc.stats.rt.inBytes += int64(size)
5,870,627✔
493
        acc.stats.Unlock()
5,870,627✔
494

5,870,627✔
495
        // Check for no interest, short circuit if so.
5,870,627✔
496
        // This is the fanout scale.
5,870,627✔
497
        if len(r.psubs)+len(r.qsubs) > 0 {
11,684,449✔
498
                c.processMsgResults(acc, r, msg, nil, c.pa.subject, c.pa.reply, pmrNoFlag)
5,813,822✔
499
        }
5,813,822✔
500
}
501

502
// Lock should be held entering here.
503
func (c *client) sendRouteConnect(clusterName string, tlsRequired bool) error {
30,609✔
504
        var user, pass string
30,609✔
505
        if userInfo := c.route.url.User; userInfo != nil {
30,698✔
506
                user = userInfo.Username()
89✔
507
                pass, _ = userInfo.Password()
89✔
508
        }
89✔
509
        s := c.srv
30,609✔
510
        cinfo := connectInfo{
30,609✔
511
                Echo:     true,
30,609✔
512
                Verbose:  false,
30,609✔
513
                Pedantic: false,
30,609✔
514
                User:     user,
30,609✔
515
                Pass:     pass,
30,609✔
516
                TLS:      tlsRequired,
30,609✔
517
                Name:     s.info.ID,
30,609✔
518
                Headers:  s.supportsHeaders(),
30,609✔
519
                Cluster:  clusterName,
30,609✔
520
                Dynamic:  s.isClusterNameDynamic(),
30,609✔
521
                LNOC:     true,
30,609✔
522
        }
30,609✔
523

30,609✔
524
        b, err := json.Marshal(cinfo)
30,609✔
525
        if err != nil {
30,609✔
526
                c.Errorf("Error marshaling CONNECT to route: %v\n", err)
×
527
                return err
×
528
        }
×
529
        c.enqueueProto([]byte(fmt.Sprintf(ConProto, b)))
30,609✔
530
        return nil
30,609✔
531
}
532

533
// Returns a route pool index for this account based on the given pool size.
// If `poolSize` is smaller or equal to 1, the returned value will always
// be 0, regardless of the account name. If not, the returned value will
// be in the range [0..poolSize-1]. The value for a given account name
// is constant and same on all servers (given the same `poolSize` value).
//
// The index is the 32-bit FNV-1a hash of the account name modulo poolSize.
func computeRoutePoolIdx(poolSize int, an string) int {
	if poolSize <= 1 {
		return 0
	}
	h := fnv.New32a()
	h.Write([]byte(an))
	sum32 := h.Sum32()
	return int((sum32 % uint32(poolSize)))
}
547

548
// Process the info message if we are a route.
549
func (c *client) processRouteInfo(info *Info) {
66,789✔
550

66,789✔
551
        supportsHeaders := c.srv.supportsHeaders()
66,789✔
552
        clusterName := c.srv.ClusterName()
66,789✔
553
        srvName := c.srv.Name()
66,789✔
554

66,789✔
555
        c.mu.Lock()
66,789✔
556
        // Connection can be closed at any time (by auth timeout, etc).
66,789✔
557
        // Does not make sense to continue here if connection is gone.
66,789✔
558
        if c.route == nil || c.isClosed() {
66,799✔
559
                c.mu.Unlock()
10✔
560
                return
10✔
561
        }
10✔
562

563
        s := c.srv
66,779✔
564

66,779✔
565
        // Detect route to self.
66,779✔
566
        if info.ID == s.info.ID {
73,034✔
567
                // Need to set this so that the close does the right thing
6,255✔
568
                c.route.remoteID = info.ID
6,255✔
569
                c.mu.Unlock()
6,255✔
570
                c.closeConnection(DuplicateRoute)
6,255✔
571
                return
6,255✔
572
        }
6,255✔
573

574
        // Detect if we have a mismatch of cluster names.
575
        if info.Cluster != "" && info.Cluster != clusterName {
60,562✔
576
                c.mu.Unlock()
38✔
577
                // If we are dynamic we may update our cluster name.
38✔
578
                // Use other if remote is non dynamic or their name is "bigger"
38✔
579
                if s.isClusterNameDynamic() && (!info.Dynamic || (strings.Compare(clusterName, info.Cluster) < 0)) {
51✔
580
                        s.setClusterName(info.Cluster)
13✔
581
                        s.removeAllRoutesExcept(info.ID)
13✔
582
                        c.mu.Lock()
13✔
583
                } else {
38✔
584
                        c.closeConnection(ClusterNameConflict)
25✔
585
                        return
25✔
586
                }
25✔
587
        }
588

589
        opts := s.getOpts()
60,499✔
590

60,499✔
591
        didSolicit := c.route.didSolicit
60,499✔
592

60,499✔
593
        // If this is an async INFO from an existing route...
60,499✔
594
        if c.flags.isSet(infoReceived) {
79,574✔
595
                remoteID := c.route.remoteID
19,075✔
596

19,075✔
597
                // Check if this is an INFO about adding a per-account route during
19,075✔
598
                // a configuration reload.
19,075✔
599
                if info.RouteAccReqID != _EMPTY_ {
19,120✔
600
                        c.mu.Unlock()
45✔
601

45✔
602
                        // If there is an account name, then the remote server is telling
45✔
603
                        // us that this account will now have its dedicated route.
45✔
604
                        if an := info.RouteAccount; an != _EMPTY_ {
68✔
605
                                acc, err := s.LookupAccount(an)
23✔
606
                                if err != nil {
23✔
607
                                        s.Errorf("Error looking up account %q: %v", an, err)
×
608
                                        return
×
609
                                }
×
610
                                s.mu.Lock()
23✔
611
                                // If running without system account and adding a dedicated
23✔
612
                                // route for an account for the first time, it could be that
23✔
613
                                // the map is nil. If so, create it.
23✔
614
                                if s.accRoutes == nil {
24✔
615
                                        s.accRoutes = make(map[string]map[string]*client)
1✔
616
                                }
1✔
617
                                if _, ok := s.accRoutes[an]; !ok {
32✔
618
                                        s.accRoutes[an] = make(map[string]*client)
9✔
619
                                }
9✔
620
                                acc.mu.Lock()
23✔
621
                                sl := acc.sl
23✔
622
                                rpi := acc.routePoolIdx
23✔
623
                                // Make sure that the account was not already switched.
23✔
624
                                if rpi >= 0 {
32✔
625
                                        s.setRouteInfo(acc)
9✔
626
                                        // Change the route pool index to indicate that this
9✔
627
                                        // account is actually transitioning. This will be used
9✔
628
                                        // to suppress possible remote subscription interest coming
9✔
629
                                        // in while the transition is happening.
9✔
630
                                        acc.routePoolIdx = accTransitioningToDedicatedRoute
9✔
631
                                }
9✔
632
                                acc.mu.Unlock()
23✔
633
                                // Since v2.11.0, we support remotes with a different pool size
23✔
634
                                // (for rolling upgrades), so we need to use the remote route
23✔
635
                                // pool index (based on the remote configured pool size) since
23✔
636
                                // the remote subscriptions will be attached to the route at
23✔
637
                                // that index, not at our account's route pool index. But we
23✔
638
                                // need to compute only if rpi is negative or the pool sizes
23✔
639
                                // are different.
23✔
640
                                if rpi <= 0 || info.RoutePoolSize != s.routesPoolSize {
45✔
641
                                        rpi = computeRoutePoolIdx(info.RoutePoolSize, an)
22✔
642
                                }
22✔
643
                                // Go over each remote's route at pool index `rpi` and remove
644
                                // remote subs for this account.
645
                                s.forEachRouteIdx(rpi, func(r *client) bool {
66✔
646
                                        r.mu.Lock()
43✔
647
                                        // Exclude routes to servers that don't support pooling.
43✔
648
                                        if !r.route.noPool {
84✔
649
                                                if subs := r.removeRemoteSubsForAcc(an); len(subs) > 0 {
46✔
650
                                                        sl.RemoveBatch(subs)
5✔
651
                                                }
5✔
652
                                        }
653
                                        r.mu.Unlock()
43✔
654
                                        return true
43✔
655
                                })
656
                                // Respond to the remote by clearing the RouteAccount field.
657
                                info.RouteAccount = _EMPTY_
23✔
658
                                proto := generateInfoJSON(info)
23✔
659
                                c.mu.Lock()
23✔
660
                                c.enqueueProto(proto)
23✔
661
                                c.mu.Unlock()
23✔
662
                                s.mu.Unlock()
23✔
663
                        } else {
22✔
664
                                // If no account name is specified, this is a response from the
22✔
665
                                // remote. Simply send to the communication channel, if the
22✔
666
                                // request ID matches the current one.
22✔
667
                                s.mu.Lock()
22✔
668
                                if info.RouteAccReqID == s.accAddedReqID && s.accAddedCh != nil {
22✔
669
                                        select {
×
670
                                        case s.accAddedCh <- struct{}{}:
×
671
                                        default:
×
672
                                        }
673
                                }
674
                                s.mu.Unlock()
22✔
675
                        }
676
                        // In both cases, we are done here.
677
                        return
45✔
678
                }
679

680
                // Check if this is an INFO for gateways...
681
                if info.Gateway != _EMPTY_ {
25,310✔
682
                        c.mu.Unlock()
6,280✔
683
                        // If this server has no gateway configured, report error and return.
6,280✔
684
                        if !s.gateway.enabled {
6,281✔
685
                                // FIXME: Should this be a Fatalf()?
1✔
686
                                s.Errorf("Received information about gateway %q from %s, but gateway is not configured",
1✔
687
                                        info.Gateway, remoteID)
1✔
688
                                return
1✔
689
                        }
1✔
690
                        s.processGatewayInfoFromRoute(info, remoteID)
6,279✔
691
                        return
6,279✔
692
                }
693

694
                // We receive an INFO from a server that informs us about another server,
695
                // so the info.ID in the INFO protocol does not match the ID of this route.
696
                if remoteID != _EMPTY_ && remoteID != info.ID {
25,306✔
697
                        // We want to know if the existing route supports pooling/pinned-account
12,556✔
698
                        // or not when processing the implicit route.
12,556✔
699
                        noPool := c.route.noPool
12,556✔
700
                        c.mu.Unlock()
12,556✔
701

12,556✔
702
                        // Process this implicit route. We will check that it is not an explicit
12,556✔
703
                        // route and/or that it has not been connected already.
12,556✔
704
                        s.processImplicitRoute(info, noPool)
12,556✔
705
                        return
12,556✔
706
                }
12,556✔
707

708
                var connectURLs []string
194✔
709
                var wsConnectURLs []string
194✔
710
                var updateRoutePerms bool
194✔
711

194✔
712
                // If we are notified that the remote is going into LDM mode, capture route's connectURLs.
194✔
713
                if info.LameDuckMode {
203✔
714
                        connectURLs = c.route.connectURLs
9✔
715
                        wsConnectURLs = c.route.wsConnURLs
9✔
716
                } else {
194✔
717
                        // Update only if we detect a difference
185✔
718
                        updateRoutePerms = !reflect.DeepEqual(c.opts.Import, info.Import) || !reflect.DeepEqual(c.opts.Export, info.Export)
185✔
719
                }
185✔
720
                c.mu.Unlock()
194✔
721

194✔
722
                if updateRoutePerms {
379✔
723
                        s.updateRemoteRoutePerms(c, info)
185✔
724
                }
185✔
725

726
                // If the remote is going into LDM and there are client connect URLs
727
                // associated with this route and we are allowed to advertise, remove
728
                // those URLs and update our clients.
729
                if (len(connectURLs) > 0 || len(wsConnectURLs) > 0) && !opts.Cluster.NoAdvertise {
203✔
730
                        s.mu.Lock()
9✔
731
                        s.removeConnectURLsAndSendINFOToClients(connectURLs, wsConnectURLs)
9✔
732
                        s.mu.Unlock()
9✔
733
                }
9✔
734
                return
194✔
735
        }
736

737
        // Check if remote has same server name than this server.
738
        if !didSolicit && info.Name == srvName {
41,426✔
739
                c.mu.Unlock()
2✔
740
                // This is now an error and we close the connection. We need unique names for JetStream clustering.
2✔
741
                c.Errorf("Remote server has a duplicate name: %q", info.Name)
2✔
742
                c.closeConnection(DuplicateServerName)
2✔
743
                return
2✔
744
        }
2✔
745

746
        var sendDelayedInfo bool
41,422✔
747

41,422✔
748
        // First INFO, check if this server is configured for compression because
41,422✔
749
        // if that is the case, we need to negotiate it with the remote server.
41,422✔
750
        if needsCompression(opts.Cluster.Compression.Mode) {
81,713✔
751
                accName := bytesToString(c.route.accName)
40,291✔
752
                // If we did not yet negotiate...
40,291✔
753
                compNeg := c.flags.isSet(compressionNegotiated)
40,291✔
754
                if !compNeg {
80,550✔
755
                        // Prevent from getting back here.
40,259✔
756
                        c.flags.set(compressionNegotiated)
40,259✔
757
                        // Release client lock since following function will need server lock.
40,259✔
758
                        c.mu.Unlock()
40,259✔
759
                        compress, err := s.negotiateRouteCompression(c, didSolicit, accName, info.Compression, opts)
40,259✔
760
                        if err != nil {
40,259✔
761
                                c.sendErrAndErr(err.Error())
×
762
                                c.closeConnection(ProtocolViolation)
×
763
                                return
×
764
                        }
×
765
                        if compress {
40,312✔
766
                                // Done for now, will get back another INFO protocol...
53✔
767
                                return
53✔
768
                        }
53✔
769
                        // No compression because one side does not want/can't, so proceed.
770
                        c.mu.Lock()
40,206✔
771
                        // Check that the connection did not close if the lock was released.
40,206✔
772
                        if c.isClosed() {
40,206✔
773
                                c.mu.Unlock()
×
774
                                return
×
775
                        }
×
776
                }
777
                // We can set the ping timer after we just negotiated compression above,
778
                // or for solicited routes if we already negotiated.
779
                if !compNeg || didSolicit {
80,463✔
780
                        c.setFirstPingTimer()
40,225✔
781
                }
40,225✔
782
                // When compression is configured, we delay the initial INFO for any
783
                // solicited route. So we need to send the delayed INFO simply based
784
                // on the didSolicit boolean.
785
                sendDelayedInfo = didSolicit
40,238✔
786
        } else {
1,131✔
787
                // Coming from an old server, the Compression field would be the empty
1,131✔
788
                // string. For servers that are configured with CompressionNotSupported,
1,131✔
789
                // this makes them behave as old servers.
1,131✔
790
                if info.Compression == _EMPTY_ || opts.Cluster.Compression.Mode == CompressionNotSupported {
1,155✔
791
                        c.route.compression = CompressionNotSupported
24✔
792
                } else {
1,131✔
793
                        c.route.compression = CompressionOff
1,107✔
794
                }
1,107✔
795
                // When compression is not configured, we delay the initial INFO only
796
                // for solicited pooled routes, so use the same check that we did when
797
                // we decided to delay in createRoute().
798
                sendDelayedInfo = didSolicit && routeShouldDelayInfo(bytesToString(c.route.accName), opts)
1,131✔
799
        }
800

801
        // Mark that the INFO protocol has been received, so we can detect updates.
802
        c.flags.set(infoReceived)
41,369✔
803

41,369✔
804
        // Get the route's proto version. It will be used to check if the connection
41,369✔
805
        // supports certain features, such as message tracing.
41,369✔
806
        c.opts.Protocol = info.Proto
41,369✔
807

41,369✔
808
        // Headers
41,369✔
809
        c.headers = supportsHeaders && info.Headers
41,369✔
810

41,369✔
811
        // Copy over important information.
41,369✔
812
        c.route.remoteID = info.ID
41,369✔
813
        c.route.authRequired = info.AuthRequired
41,369✔
814
        c.route.tlsRequired = info.TLSRequired
41,369✔
815
        c.route.gatewayURL = info.GatewayURL
41,369✔
816
        c.route.remoteName = info.Name
41,369✔
817
        c.route.lnoc = info.LNOC
41,369✔
818
        c.route.lnocu = info.LNOCU
41,369✔
819
        c.route.jetstream = info.JetStream
41,369✔
820

41,369✔
821
        // When sent through route INFO, if the field is set, it should be of size 1.
41,369✔
822
        if len(info.LeafNodeURLs) == 1 {
76,450✔
823
                c.route.leafnodeURL = info.LeafNodeURLs[0]
35,081✔
824
        }
35,081✔
825
        // Compute the hash of this route based on remote server name
826
        c.route.hash = getHash(info.Name)
41,369✔
827
        // Same with remote server ID (used for GW mapped replies routing).
41,369✔
828
        // Use getGWHash since we don't use the same hash len for that
41,369✔
829
        // for backward compatibility.
41,369✔
830
        c.route.idHash = string(getGWHash(info.ID))
41,369✔
831

41,369✔
832
        // Copy over permissions as well.
41,369✔
833
        c.opts.Import = info.Import
41,369✔
834
        c.opts.Export = info.Export
41,369✔
835

41,369✔
836
        // If we do not know this route's URL, construct one on the fly
41,369✔
837
        // from the information provided.
41,369✔
838
        if c.route.url == nil {
58,420✔
839
                // Add in the URL from host and port
17,051✔
840
                hp := net.JoinHostPort(info.Host, strconv.Itoa(info.Port))
17,051✔
841
                url, err := url.Parse(fmt.Sprintf("nats-route://%s/", hp))
17,051✔
842
                if err != nil {
17,051✔
843
                        c.Errorf("Error parsing URL from INFO: %v\n", err)
×
844
                        c.mu.Unlock()
×
845
                        c.closeConnection(ParseError)
×
846
                        return
×
847
                }
×
848
                c.route.url = url
17,051✔
849
        }
850
        // The incoming INFO from the route will have IP set
851
        // if it has Cluster.Advertise. In that case, use that
852
        // otherwise construct it from the remote TCP address.
853
        if info.IP == _EMPTY_ {
82,724✔
854
                // Need to get the remote IP address.
41,355✔
855
                switch conn := c.nc.(type) {
41,355✔
856
                case *net.TCPConn, *tls.Conn:
41,355✔
857
                        addr := conn.RemoteAddr().(*net.TCPAddr)
41,355✔
858
                        info.IP = fmt.Sprintf("nats-route://%s/", net.JoinHostPort(addr.IP.String(),
41,355✔
859
                                strconv.Itoa(info.Port)))
41,355✔
860
                default:
×
861
                        info.IP = c.route.url.String()
×
862
                }
863
        }
864
        // For accounts that are configured to have their own route:
865
        // If this is a solicited route, we already have c.route.accName set in createRoute.
866
        // For non solicited route (the accept side), we will set the account name that
867
        // is present in the INFO protocol.
868
        if didSolicit && len(c.route.accName) > 0 {
48,942✔
869
                // Set it in the info.RouteAccount so that addRoute can use that
7,573✔
870
                // and we properly gossip that this is a route for an account.
7,573✔
871
                info.RouteAccount = string(c.route.accName)
7,573✔
872
        } else if !didSolicit && info.RouteAccount != _EMPTY_ {
45,392✔
873
                c.route.accName = []byte(info.RouteAccount)
4,023✔
874
        }
4,023✔
875
        accName := string(c.route.accName)
41,369✔
876

41,369✔
877
        // Capture the noGossip value and reset it here.
41,369✔
878
        gossipMode := c.route.gossipMode
41,369✔
879
        c.route.gossipMode = 0
41,369✔
880

41,369✔
881
        // Check to see if we have this remote already registered.
41,369✔
882
        // This can happen when both servers have routes to each other.
41,369✔
883
        c.mu.Unlock()
41,369✔
884

41,369✔
885
        if added := s.addRoute(c, didSolicit, sendDelayedInfo, gossipMode, info, accName); added {
74,947✔
886
                if accName != _EMPTY_ {
41,565✔
887
                        c.Debugf("Registering remote route %q for account %q", info.ID, accName)
7,987✔
888
                } else {
33,578✔
889
                        c.Debugf("Registering remote route %q", info.ID)
25,591✔
890
                }
25,591✔
891
        } else {
7,791✔
892
                c.Debugf("Detected duplicate remote route %q", info.ID)
7,791✔
893
                c.closeConnection(DuplicateRoute)
7,791✔
894
        }
7,791✔
895
}
896

897
func (s *Server) negotiateRouteCompression(c *client, didSolicit bool, accName, infoCompression string, opts *Options) (bool, error) {
40,259✔
898
        // Negotiate the appropriate compression mode (or no compression)
40,259✔
899
        cm, err := selectCompressionMode(opts.Cluster.Compression.Mode, infoCompression)
40,259✔
900
        if err != nil {
40,259✔
901
                return false, err
×
902
        }
×
903
        c.mu.Lock()
40,259✔
904
        // For "auto" mode, set the initial compression mode based on RTT
40,259✔
905
        if cm == CompressionS2Auto {
40,260✔
906
                if c.rttStart.IsZero() {
2✔
907
                        c.rtt = computeRTT(c.start)
1✔
908
                }
1✔
909
                cm = selectS2AutoModeBasedOnRTT(c.rtt, opts.Cluster.Compression.RTTThresholds)
1✔
910
        }
911
        // Keep track of the negotiated compression mode.
912
        c.route.compression = cm
40,259✔
913
        c.mu.Unlock()
40,259✔
914

40,259✔
915
        // If we end-up doing compression...
40,259✔
916
        if needsCompression(cm) {
40,312✔
917
                // Generate an INFO with the chosen compression mode.
53✔
918
                s.mu.Lock()
53✔
919
                infoProto := s.generateRouteInitialInfoJSON(accName, cm, 0, gossipDefault)
53✔
920
                s.mu.Unlock()
53✔
921

53✔
922
                // If we solicited, then send this INFO protocol BEFORE switching
53✔
923
                // to compression writer. However, if we did not, we send it after.
53✔
924
                c.mu.Lock()
53✔
925
                if didSolicit {
72✔
926
                        c.enqueueProto(infoProto)
19✔
927
                        // Make sure it is completely flushed (the pending bytes goes to
19✔
928
                        // 0) before proceeding.
19✔
929
                        for c.out.pb > 0 && !c.isClosed() {
38✔
930
                                c.flushOutbound()
19✔
931
                        }
19✔
932
                }
933
                // This is to notify the readLoop that it should switch to a
934
                // (de)compression reader.
935
                c.in.flags.set(switchToCompression)
53✔
936
                // Create the compress writer before queueing the INFO protocol for
53✔
937
                // a route that did not solicit. It will make sure that that proto
53✔
938
                // is sent with compression on.
53✔
939
                c.out.cw = s2.NewWriter(nil, s2WriterOptions(cm)...)
53✔
940
                if !didSolicit {
87✔
941
                        c.enqueueProto(infoProto)
34✔
942
                }
34✔
943
                // We can now set the ping timer.
944
                c.setFirstPingTimer()
53✔
945
                c.mu.Unlock()
53✔
946
                return true, nil
53✔
947
        }
948
        return false, nil
40,206✔
949
}
950

951
// Possibly sends local subscriptions interest to this route
952
// based on changes in the remote's Export permissions.
953
func (s *Server) updateRemoteRoutePerms(c *client, info *Info) {
185✔
954
        c.mu.Lock()
185✔
955
        // Interested only on Export permissions for the remote server.
185✔
956
        // Create "fake" clients that we will use to check permissions
185✔
957
        // using the old permissions...
185✔
958
        oldPerms := &RoutePermissions{Export: c.opts.Export}
185✔
959
        oldPermsTester := &client{}
185✔
960
        oldPermsTester.setRoutePermissions(oldPerms)
185✔
961
        // and the new ones.
185✔
962
        newPerms := &RoutePermissions{Export: info.Export}
185✔
963
        newPermsTester := &client{}
185✔
964
        newPermsTester.setRoutePermissions(newPerms)
185✔
965

185✔
966
        c.opts.Import = info.Import
185✔
967
        c.opts.Export = info.Export
185✔
968

185✔
969
        routeAcc, poolIdx, noPool := bytesToString(c.route.accName), c.route.poolIdx, c.route.noPool
185✔
970
        c.mu.Unlock()
185✔
971

185✔
972
        var (
185✔
973
                _localSubs [4096]*subscription
185✔
974
                _allSubs   [4096]*subscription
185✔
975
                allSubs    = _allSubs[:0]
185✔
976
        )
185✔
977

185✔
978
        s.accounts.Range(func(_, v any) bool {
1,054✔
979
                acc := v.(*Account)
869✔
980
                acc.mu.RLock()
869✔
981
                accName, sl, accPoolIdx := acc.Name, acc.sl, acc.routePoolIdx
869✔
982
                acc.mu.RUnlock()
869✔
983

869✔
984
                // Do this only for accounts handled by this route
869✔
985
                if (accPoolIdx >= 0 && accPoolIdx == poolIdx) || (routeAcc == accName) || noPool {
1,078✔
986
                        localSubs := _localSubs[:0]
209✔
987
                        sl.localSubs(&localSubs, false)
209✔
988
                        if len(localSubs) > 0 {
412✔
989
                                allSubs = append(allSubs, localSubs...)
203✔
990
                        }
203✔
991
                }
992
                return true
869✔
993
        })
994

995
        if len(allSubs) == 0 {
231✔
996
                return
46✔
997
        }
46✔
998

999
        c.mu.Lock()
139✔
1000
        c.sendRouteSubProtos(allSubs, false, func(sub *subscription) bool {
2,716✔
1001
                subj := string(sub.subject)
2,577✔
1002
                // If the remote can now export but could not before, and this server can import this
2,577✔
1003
                // subject, then send SUB protocol.
2,577✔
1004
                if newPermsTester.canExport(subj) && !oldPermsTester.canExport(subj) && c.canImport(subj) {
2,589✔
1005
                        return true
12✔
1006
                }
12✔
1007
                return false
2,565✔
1008
        })
1009
        c.mu.Unlock()
139✔
1010
}
1011

1012
// sendAsyncInfoToClients sends an INFO protocol to all
1013
// connected clients that accept async INFO updates.
1014
// The server lock is held on entry.
1015
func (s *Server) sendAsyncInfoToClients(regCli, wsCli bool) {
18,183✔
1016
        // If there are no clients supporting async INFO protocols, we are done.
18,183✔
1017
        // Also don't send if we are shutting down...
18,183✔
1018
        if s.cproto == 0 || s.isShuttingDown() {
35,961✔
1019
                return
17,778✔
1020
        }
17,778✔
1021
        info := s.copyInfo()
405✔
1022

405✔
1023
        for _, c := range s.clients {
946✔
1024
                c.mu.Lock()
541✔
1025
                // Here, we are going to send only to the clients that are fully
541✔
1026
                // registered (server has received CONNECT and first PING). For
541✔
1027
                // clients that are not at this stage, this will happen in the
541✔
1028
                // processing of the first PING (see client.processPing)
541✔
1029
                if ((regCli && !c.isWebsocket()) || (wsCli && c.isWebsocket())) &&
541✔
1030
                        c.opts.Protocol >= ClientProtoInfo &&
541✔
1031
                        c.flags.isSet(firstPongSent) {
1,068✔
1032
                        // sendInfo takes care of checking if the connection is still
527✔
1033
                        // valid or not, so don't duplicate tests here.
527✔
1034
                        c.enqueueProto(c.generateClientInfoJSON(info))
527✔
1035
                }
527✔
1036
                c.mu.Unlock()
541✔
1037
        }
1038
}
1039

1040
// This will process implicit route information received from another server.
1041
// We will check to see if we have configured or are already connected,
1042
// and if so we will ignore. Otherwise we will attempt to connect.
1043
func (s *Server) processImplicitRoute(info *Info, routeNoPool bool) {
12,556✔
1044
        remoteID := info.ID
12,556✔
1045

12,556✔
1046
        s.mu.Lock()
12,556✔
1047
        defer s.mu.Unlock()
12,556✔
1048

12,556✔
1049
        // Don't connect to ourself
12,556✔
1050
        if remoteID == s.info.ID {
12,556✔
1051
                return
×
1052
        }
×
1053

1054
        // Snapshot server options.
1055
        opts := s.getOpts()
12,556✔
1056

12,556✔
1057
        // Check if this route already exists
12,556✔
1058
        if accName := info.RouteAccount; accName != _EMPTY_ {
18,803✔
1059
                // If we don't support pooling/pinned account, bail.
6,247✔
1060
                if opts.Cluster.PoolSize <= 0 {
6,247✔
1061
                        return
×
1062
                }
×
1063
                if remotes, ok := s.accRoutes[accName]; ok {
12,494✔
1064
                        if r := remotes[remoteID]; r != nil {
11,457✔
1065
                                return
5,210✔
1066
                        }
5,210✔
1067
                }
1068
        } else if _, exists := s.routes[remoteID]; exists {
11,167✔
1069
                return
4,858✔
1070
        }
4,858✔
1071
        // Check if we have this route as a configured route
1072
        if s.hasThisRouteConfigured(info) {
4,524✔
1073
                return
2,036✔
1074
        }
2,036✔
1075

1076
        // Initiate the connection, using info.IP instead of info.URL here...
1077
        r, err := url.Parse(info.IP)
452✔
1078
        if err != nil {
452✔
1079
                s.Errorf("Error parsing URL from INFO: %v\n", err)
×
1080
                return
×
1081
        }
×
1082

1083
        if info.AuthRequired {
456✔
1084
                r.User = url.UserPassword(opts.Cluster.Username, opts.Cluster.Password)
4✔
1085
        }
4✔
1086
        s.startGoRoutine(func() { s.connectToRoute(r, Implicit, true, info.GossipMode, info.RouteAccount) })
904✔
1087
        // If we are processing an implicit route from a route that does not
1088
        // support pooling/pinned-accounts, we won't receive an INFO for each of
1089
        // the pinned-accounts that we would normally receive. In that case, just
1090
        // initiate routes for all our configured pinned accounts.
1091
        if routeNoPool && info.RouteAccount == _EMPTY_ && len(opts.Cluster.PinnedAccounts) > 0 {
452✔
1092
                // Copy since we are going to pass as closure to a go routine.
×
1093
                rURL := r
×
1094
                for _, an := range opts.Cluster.PinnedAccounts {
×
1095
                        accName := an
×
1096
                        s.startGoRoutine(func() { s.connectToRoute(rURL, Implicit, true, info.GossipMode, accName) })
×
1097
                }
1098
        }
1099
}
1100

1101
// hasThisRouteConfigured returns true if info.Host:info.Port is present
1102
// in the server's opts.Routes, false otherwise.
1103
// Server lock is assumed to be held by caller.
1104
func (s *Server) hasThisRouteConfigured(info *Info) bool {
2,488✔
1105
        routes := s.getOpts().Routes
2,488✔
1106
        if len(routes) == 0 {
2,510✔
1107
                return false
22✔
1108
        }
22✔
1109
        // This could possibly be a 0.0.0.0 host so we will also construct a second
1110
        // url with the host section of the `info.IP` (if present).
1111
        sPort := strconv.Itoa(info.Port)
2,466✔
1112
        urlOne := strings.ToLower(net.JoinHostPort(info.Host, sPort))
2,466✔
1113
        var urlTwo string
2,466✔
1114
        if info.IP != _EMPTY_ {
4,932✔
1115
                if u, _ := url.Parse(info.IP); u != nil {
4,932✔
1116
                        urlTwo = strings.ToLower(net.JoinHostPort(u.Hostname(), sPort))
2,466✔
1117
                        // Ignore if same than the first
2,466✔
1118
                        if urlTwo == urlOne {
4,815✔
1119
                                urlTwo = _EMPTY_
2,349✔
1120
                        }
2,349✔
1121
                }
1122
        }
1123
        for _, ri := range routes {
11,240✔
1124
                rHost := strings.ToLower(ri.Host)
8,774✔
1125
                if rHost == urlOne {
10,810✔
1126
                        return true
2,036✔
1127
                }
2,036✔
1128
                if urlTwo != _EMPTY_ && rHost == urlTwo {
6,738✔
1129
                        return true
×
1130
                }
×
1131
        }
1132
        return false
430✔
1133
}
1134

1135
// forwardNewRouteInfoToKnownServers possibly sends the INFO protocol of the
1136
// new route to all routes known by this server. In turn, each server will
1137
// contact this new route.
1138
// Server lock held on entry.
1139
func (s *Server) forwardNewRouteInfoToKnownServers(info *Info, rtype RouteType, didSolicit bool, localGossipMode byte) {
9,474✔
1140
        // Determine if this connection is resulting from a gossip notification.
9,474✔
1141
        fromGossip := didSolicit && rtype == Implicit
9,474✔
1142
        // If from gossip (but we are not overriding it) or if the remote disabled gossip, bail out.
9,474✔
1143
        if (fromGossip && localGossipMode != gossipOverride) || info.GossipMode == gossipDisabled {
10,264✔
1144
                return
790✔
1145
        }
790✔
1146

1147
        // Note: nonce is not used in routes.
1148
        // That being said, the info we get is the initial INFO which
1149
        // contains a nonce, but we now forward this to existing routes,
1150
        // so clear it now.
1151
        info.Nonce = _EMPTY_
8,684✔
1152

8,684✔
1153
        var (
8,684✔
1154
                infoGMDefault  []byte
8,684✔
1155
                infoGMDisabled []byte
8,684✔
1156
                infoGMOverride []byte
8,684✔
1157
        )
8,684✔
1158

8,684✔
1159
        generateJSON := func(gm byte) []byte {
17,400✔
1160
                info.GossipMode = gm
8,716✔
1161
                b, _ := json.Marshal(info)
8,716✔
1162
                return []byte(fmt.Sprintf(InfoProto, b))
8,716✔
1163
        }
8,716✔
1164

1165
        getJSON := func(r *client) []byte {
21,260✔
1166
                if (!didSolicit && r.route.routeType == Explicit) || (didSolicit && rtype == Explicit) {
24,596✔
1167
                        if infoGMOverride == nil {
20,399✔
1168
                                infoGMOverride = generateJSON(gossipOverride)
8,379✔
1169
                        }
8,379✔
1170
                        return infoGMOverride
12,020✔
1171
                } else if !didSolicit {
1,049✔
1172
                        if infoGMDisabled == nil {
781✔
1173
                                infoGMDisabled = generateJSON(gossipDisabled)
288✔
1174
                        }
288✔
1175
                        return infoGMDisabled
493✔
1176
                }
1177
                if infoGMDefault == nil {
112✔
1178
                        infoGMDefault = generateJSON(0)
49✔
1179
                }
49✔
1180
                return infoGMDefault
63✔
1181
        }
1182

1183
        var accRemotes map[string]*client
8,684✔
1184
        pinnedAccount := info.RouteAccount != _EMPTY_
8,684✔
1185
        // If this is for a pinned account, we will try to send the gossip
8,684✔
1186
        // through our pinned account routes, but fall back to the other
8,684✔
1187
        // routes in case we don't have one for a given remote.
8,684✔
1188
        if pinnedAccount {
12,931✔
1189
                var ok bool
4,247✔
1190
                if accRemotes, ok = s.accRoutes[info.RouteAccount]; ok {
8,494✔
1191
                        for remoteID, r := range accRemotes {
14,527✔
1192
                                if r == nil {
10,280✔
1193
                                        continue
×
1194
                                }
1195
                                r.mu.Lock()
10,280✔
1196
                                // Do not send to a remote that does not support pooling/pinned-accounts.
10,280✔
1197
                                if remoteID != info.ID && !r.route.noPool {
16,313✔
1198
                                        r.enqueueProto(getJSON(r))
6,033✔
1199
                                }
6,033✔
1200
                                r.mu.Unlock()
10,280✔
1201
                        }
1202
                }
1203
        }
1204

1205
        s.forEachRemote(func(r *client) {
27,422✔
1206
                r.mu.Lock()
18,738✔
1207
                remoteID := r.route.remoteID
18,738✔
1208
                if pinnedAccount {
26,717✔
1209
                        if _, processed := accRemotes[remoteID]; processed {
15,737✔
1210
                                r.mu.Unlock()
7,758✔
1211
                                return
7,758✔
1212
                        }
7,758✔
1213
                }
1214
                // If this is a new route for a given account, do not send to a server
1215
                // that does not support pooling/pinned-accounts.
1216
                if remoteID != info.ID && (!pinnedAccount || !r.route.noPool) {
17,523✔
1217
                        r.enqueueProto(getJSON(r))
6,543✔
1218
                }
6,543✔
1219
                r.mu.Unlock()
10,980✔
1220
        })
1221
}
1222

1223
// canImport is whether or not we will send a SUB for interest to the other side.
1224
// This is for ROUTER connections only.
1225
// Lock is held on entry.
1226
func (c *client) canImport(subject string) bool {
621,064✔
1227
        // Use pubAllowed() since this checks Publish permissions which
621,064✔
1228
        // is what Import maps to.
621,064✔
1229
        return c.pubAllowedFullCheck(subject, false, true)
621,064✔
1230
}
621,064✔
1231

1232
// canExport is whether or not we will accept a SUB from the remote for a given subject.
1233
// This is for ROUTER connections only.
1234
// Lock is held on entry
1235
func (c *client) canExport(subject string) bool {
4,640✔
1236
        // Use canSubscribe() since this checks Subscribe permissions which
4,640✔
1237
        // is what Export maps to.
4,640✔
1238
        return c.canSubscribe(subject)
4,640✔
1239
}
4,640✔
1240

1241
// Initialize or reset cluster's permissions.
1242
// This is for ROUTER connections only.
1243
// Client lock is held on entry
1244
func (c *client) setRoutePermissions(perms *RoutePermissions) {
61,912✔
1245
        // Reset if some were set
61,912✔
1246
        if perms == nil {
123,057✔
1247
                c.perms = nil
61,145✔
1248
                c.mperms = nil
61,145✔
1249
                return
61,145✔
1250
        }
61,145✔
1251
        // Convert route permissions to user permissions.
1252
        // The Import permission is mapped to Publish
1253
        // and Export permission is mapped to Subscribe.
1254
        // For meaning of Import/Export, see canImport and canExport.
1255
        p := &Permissions{
767✔
1256
                Publish:   perms.Import,
767✔
1257
                Subscribe: perms.Export,
767✔
1258
        }
767✔
1259
        c.setPermissions(p)
767✔
1260
}
1261

1262
// Type used to hold a list of subs on a per account basis.
1263
type asubs struct {
1264
        acc  *Account
1265
        subs []*subscription
1266
}
1267

1268
// Returns the account name from the subscription's key.
1269
// This is invoked knowing that the key contains an account name, so for a sub
1270
// that is not from a pinned-account route.
1271
// The `keyHasSubType` boolean indicates that the key starts with the indicator
1272
// for leaf or regular routed subscriptions.
1273
func getAccNameFromRoutedSubKey(sub *subscription, key string, keyHasSubType bool) string {
18,819✔
1274
        var accIdx int
18,819✔
1275
        if keyHasSubType {
37,631✔
1276
                // Start after the sub type indicator.
18,812✔
1277
                accIdx = 1
18,812✔
1278
                // But if there is an origin, bump its index.
18,812✔
1279
                if len(sub.origin) > 0 {
18,893✔
1280
                        accIdx = 2
81✔
1281
                }
81✔
1282
        }
1283
        return strings.Fields(key)[accIdx]
18,819✔
1284
}
1285

1286
// Returns if the route is dedicated to an account, its name, and a boolean
1287
// that indicates if this route uses the routed subscription indicator at
1288
// the beginning of the subscription key.
1289
// Lock held on entry.
1290
func (c *client) getRoutedSubKeyInfo() (bool, string, bool) {
45,306✔
1291
        var accName string
45,306✔
1292
        if an := c.route.accName; len(an) > 0 {
56,132✔
1293
                accName = string(an)
10,826✔
1294
        }
10,826✔
1295
        return accName != _EMPTY_, accName, c.route.lnocu
45,306✔
1296
}
1297

1298
// removeRemoteSubs will walk the subs and remove them from the appropriate account.
1299
func (c *client) removeRemoteSubs() {
45,032✔
1300
        // We need to gather these on a per account basis.
45,032✔
1301
        // FIXME(dlc) - We should be smarter about this..
45,032✔
1302
        as := map[string]*asubs{}
45,032✔
1303
        c.mu.Lock()
45,032✔
1304
        srv := c.srv
45,032✔
1305
        subs := c.subs
45,032✔
1306
        c.subs = nil
45,032✔
1307
        pa, accountName, hasSubType := c.getRoutedSubKeyInfo()
45,032✔
1308
        c.mu.Unlock()
45,032✔
1309

45,032✔
1310
        for key, sub := range subs {
292,876✔
1311
                c.mu.Lock()
247,844✔
1312
                sub.max = 0
247,844✔
1313
                c.mu.Unlock()
247,844✔
1314
                // If not a pinned-account route, we need to find the account
247,844✔
1315
                // name from the sub's key.
247,844✔
1316
                if !pa {
266,203✔
1317
                        accountName = getAccNameFromRoutedSubKey(sub, key, hasSubType)
18,359✔
1318
                }
18,359✔
1319
                ase := as[accountName]
247,844✔
1320
                if ase == nil {
252,889✔
1321
                        if v, ok := srv.accounts.Load(accountName); ok {
10,090✔
1322
                                ase = &asubs{acc: v.(*Account), subs: []*subscription{sub}}
5,045✔
1323
                                as[accountName] = ase
5,045✔
1324
                        } else {
5,045✔
1325
                                continue
×
1326
                        }
1327
                } else {
242,799✔
1328
                        ase.subs = append(ase.subs, sub)
242,799✔
1329
                }
242,799✔
1330
                delta := int32(1)
247,844✔
1331
                if len(sub.queue) > 0 {
248,074✔
1332
                        delta = sub.qw
230✔
1333
                }
230✔
1334
                if srv.gateway.enabled {
296,687✔
1335
                        srv.gatewayUpdateSubInterest(accountName, sub, -delta)
48,843✔
1336
                }
48,843✔
1337
                ase.acc.updateLeafNodes(sub, -delta)
247,844✔
1338
        }
1339

1340
        // Now remove the subs by batch for each account sublist.
1341
        for _, ase := range as {
50,077✔
1342
                c.Debugf("Removing %d subscriptions for account %q", len(ase.subs), ase.acc.Name)
5,045✔
1343
                ase.acc.mu.Lock()
5,045✔
1344
                ase.acc.sl.RemoveBatch(ase.subs)
5,045✔
1345
                ase.acc.mu.Unlock()
5,045✔
1346
        }
5,045✔
1347
}
1348

1349
// Removes (and returns) the subscriptions from this route's subscriptions map
1350
// that belong to the given account.
1351
// Lock is held on entry
1352
func (c *client) removeRemoteSubsForAcc(name string) []*subscription {
65✔
1353
        var subs []*subscription
65✔
1354
        _, _, hasSubType := c.getRoutedSubKeyInfo()
65✔
1355
        for key, sub := range c.subs {
475✔
1356
                an := getAccNameFromRoutedSubKey(sub, key, hasSubType)
410✔
1357
                if an == name {
820✔
1358
                        sub.max = 0
410✔
1359
                        subs = append(subs, sub)
410✔
1360
                        delete(c.subs, key)
410✔
1361
                }
410✔
1362
        }
1363
        return subs
65✔
1364
}
1365

1366
func (c *client) parseUnsubProto(arg []byte, accInProto, hasOrigin bool) ([]byte, string, []byte, []byte, error) {
205,825✔
1367
        // Indicate any activity, so pub and sub or unsubs.
205,825✔
1368
        c.in.subs++
205,825✔
1369

205,825✔
1370
        args := splitArg(arg)
205,825✔
1371

205,825✔
1372
        var (
205,825✔
1373
                origin      []byte
205,825✔
1374
                accountName string
205,825✔
1375
                queue       []byte
205,825✔
1376
                subjIdx     int
205,825✔
1377
        )
205,825✔
1378
        // If `hasOrigin` is true, then it means this is a LS- with origin in proto.
205,825✔
1379
        if hasOrigin {
217,255✔
1380
                // We would not be here if there was not at least 1 field.
11,430✔
1381
                origin = args[0]
11,430✔
1382
                subjIdx = 1
11,430✔
1383
        }
11,430✔
1384
        // If there is an account in the protocol, bump the subject index.
1385
        if accInProto {
278,789✔
1386
                subjIdx++
72,964✔
1387
        }
72,964✔
1388

1389
        switch len(args) {
205,825✔
1390
        case subjIdx + 1:
194,717✔
1391
        case subjIdx + 2:
11,108✔
1392
                queue = args[subjIdx+1]
11,108✔
1393
        default:
×
1394
                return nil, _EMPTY_, nil, nil, fmt.Errorf("parse error: '%s'", arg)
×
1395
        }
1396
        if accInProto {
278,789✔
1397
                // If there is an account in the protocol, it is before the subject.
72,964✔
1398
                accountName = string(args[subjIdx-1])
72,964✔
1399
        }
72,964✔
1400
        return origin, accountName, args[subjIdx], queue, nil
205,825✔
1401
}
1402

1403
// Indicates no more interest in the given account/subject for the remote side.
1404
func (c *client) processRemoteUnsub(arg []byte, leafUnsub bool) (err error) {
164,400✔
1405
        srv := c.srv
164,400✔
1406
        if srv == nil {
164,400✔
1407
                return nil
×
1408
        }
×
1409

1410
        var accountName string
164,400✔
1411
        // Assume the account will be in the protocol.
164,400✔
1412
        accInProto := true
164,400✔
1413

164,400✔
1414
        c.mu.Lock()
164,400✔
1415
        originSupport := c.route.lnocu
164,400✔
1416
        if c.route != nil && len(c.route.accName) > 0 {
297,261✔
1417
                accountName, accInProto = string(c.route.accName), false
132,861✔
1418
        }
132,861✔
1419
        c.mu.Unlock()
164,400✔
1420

164,400✔
1421
        hasOrigin := leafUnsub && originSupport
164,400✔
1422
        _, accNameFromProto, subject, _, err := c.parseUnsubProto(arg, accInProto, hasOrigin)
164,400✔
1423
        if err != nil {
164,400✔
1424
                return fmt.Errorf("processRemoteUnsub %s", err.Error())
×
1425
        }
×
1426
        if accInProto {
195,939✔
1427
                accountName = accNameFromProto
31,539✔
1428
        }
31,539✔
1429
        // Lookup the account
1430
        var acc *Account
164,400✔
1431
        if v, ok := srv.accounts.Load(accountName); ok {
328,711✔
1432
                acc = v.(*Account)
164,311✔
1433
        } else {
164,400✔
1434
                c.Debugf("Unknown account %q for subject %q", accountName, subject)
89✔
1435
                return nil
89✔
1436
        }
89✔
1437

1438
        c.mu.Lock()
164,311✔
1439
        if c.isClosed() {
164,412✔
1440
                c.mu.Unlock()
101✔
1441
                return nil
101✔
1442
        }
101✔
1443

1444
        _keya := [128]byte{}
164,210✔
1445
        _key := _keya[:0]
164,210✔
1446

164,210✔
1447
        var key string
164,210✔
1448
        if !originSupport {
164,230✔
1449
                // If it is an LS- or RS-, we use the protocol as-is as the key.
20✔
1450
                key = bytesToString(arg)
20✔
1451
        } else {
164,210✔
1452
                // We need to prefix with the sub type.
164,190✔
1453
                if leafUnsub {
175,612✔
1454
                        _key = append(_key, keyRoutedLeafSubByte)
11,422✔
1455
                } else {
164,190✔
1456
                        _key = append(_key, keyRoutedSubByte)
152,768✔
1457
                }
152,768✔
1458
                _key = append(_key, ' ')
164,190✔
1459
                _key = append(_key, arg...)
164,190✔
1460
                key = bytesToString(_key)
164,190✔
1461
        }
1462
        delta := int32(1)
164,210✔
1463
        sub, ok := c.subs[key]
164,210✔
1464
        if ok {
328,403✔
1465
                delete(c.subs, key)
164,193✔
1466
                acc.sl.Remove(sub)
164,193✔
1467
                if len(sub.queue) > 0 {
165,003✔
1468
                        delta = sub.qw
810✔
1469
                }
810✔
1470
        }
1471
        c.mu.Unlock()
164,210✔
1472

164,210✔
1473
        // Update gateways and leaf nodes only if the subscription was found.
164,210✔
1474
        if ok {
328,403✔
1475
                if srv.gateway.enabled {
178,407✔
1476
                        srv.gatewayUpdateSubInterest(accountName, sub, -delta)
14,214✔
1477
                }
14,214✔
1478

1479
                // Now check on leafnode updates.
1480
                acc.updateLeafNodes(sub, -delta)
164,193✔
1481
        }
1482

1483
        if c.opts.Verbose {
164,210✔
1484
                c.sendOK()
×
1485
        }
×
1486
        return nil
164,210✔
1487
}
1488

1489
func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) {
819,632✔
1490
        // Indicate activity.
819,632✔
1491
        c.in.subs++
819,632✔
1492

819,632✔
1493
        srv := c.srv
819,632✔
1494
        if srv == nil {
819,632✔
1495
                return nil
×
1496
        }
×
1497

1498
        // We copy `argo` to not reference the read buffer. However, we will
1499
        // prefix with a code that says if the remote sub is for a leaf
1500
        // (hasOrigin == true) or not to prevent key collisions. Imagine:
1501
        // "RS+ foo bar baz 1\r\n" => "foo bar baz" (a routed queue sub)
1502
        // "LS+ foo bar baz\r\n"   => "foo bar baz" (a route leaf sub on "baz",
1503
        // for account "bar" with origin "foo").
1504
        //
1505
        // The sub.sid/key will be set respectively to "R foo bar baz" and
1506
        // "L foo bar baz".
1507
        //
1508
        // We also no longer add the account if it was not present (due to
1509
        // pinned-account route) since there is no need really.
1510
        //
1511
        // For routes to older server, we will still create the "arg" with
1512
        // the above layout, but we will create the sub.sid/key as before,
1513
        // that is, not including the origin for LS+ because older server
1514
        // only send LS- without origin, so we would not be able to find
1515
        // the sub in the map.
1516
        c.mu.Lock()
819,632✔
1517
        accountName := string(c.route.accName)
819,632✔
1518
        oldStyle := !c.route.lnocu
819,632✔
1519
        c.mu.Unlock()
819,632✔
1520

819,632✔
1521
        // Indicate if the account name should be in the protocol. It would be the
819,632✔
1522
        // case if accountName is empty.
819,632✔
1523
        accInProto := accountName == _EMPTY_
819,632✔
1524

819,632✔
1525
        // Copy so we do not reference a potentially large buffer.
819,632✔
1526
        // Add 2 more bytes for the routed sub type.
819,632✔
1527
        arg := make([]byte, 0, 2+len(argo))
819,632✔
1528
        if hasOrigin {
831,786✔
1529
                arg = append(arg, keyRoutedLeafSubByte)
12,154✔
1530
        } else {
819,632✔
1531
                arg = append(arg, keyRoutedSubByte)
807,478✔
1532
        }
807,478✔
1533
        arg = append(arg, ' ')
819,632✔
1534
        arg = append(arg, argo...)
819,632✔
1535

819,632✔
1536
        // Now split to get all fields. Unroll splitArgs to avoid runtime/heap issues.
819,632✔
1537
        a := [MAX_RSUB_ARGS][]byte{}
819,632✔
1538
        args := a[:0]
819,632✔
1539
        start := -1
819,632✔
1540
        for i, b := range arg {
30,822,637✔
1541
                switch b {
30,003,005✔
1542
                case ' ', '\t', '\r', '\n':
1,022,105✔
1543
                        if start >= 0 {
2,044,210✔
1544
                                args = append(args, arg[start:i])
1,022,105✔
1545
                                start = -1
1,022,105✔
1546
                        }
1,022,105✔
1547
                default:
28,980,900✔
1548
                        if start < 0 {
30,822,637✔
1549
                                start = i
1,841,737✔
1550
                        }
1,841,737✔
1551
                }
1552
        }
1553
        if start >= 0 {
1,639,264✔
1554
                args = append(args, arg[start:])
819,632✔
1555
        }
819,632✔
1556

1557
        delta := int32(1)
819,632✔
1558
        sub := &subscription{client: c}
819,632✔
1559

819,632✔
1560
        // There will always be at least a subject, but its location will depend
819,632✔
1561
        // on if there is an origin, an account name, etc.. Since we know that
819,632✔
1562
        // we have added the sub type indicator as the first field, the subject
819,632✔
1563
        // position will be at minimum at index 1.
819,632✔
1564
        subjIdx := 1
819,632✔
1565
        if hasOrigin {
831,786✔
1566
                subjIdx++
12,154✔
1567
        }
12,154✔
1568
        if accInProto {
998,858✔
1569
                subjIdx++
179,226✔
1570
        }
179,226✔
1571
        switch len(args) {
819,632✔
1572
        case subjIdx + 1:
814,084✔
1573
                sub.queue = nil
814,084✔
1574
        case subjIdx + 3:
5,547✔
1575
                sub.queue = args[subjIdx+1]
5,547✔
1576
                sub.qw = int32(parseSize(args[subjIdx+2]))
5,547✔
1577
                // TODO: (ik) We should have a non empty queue name and a queue
5,547✔
1578
                // weight >= 1. For 2.11, we may want to return an error if that
5,547✔
1579
                // is not the case, but for now just overwrite `delta` if queue
5,547✔
1580
                // weight is greater than 1 (it is possible after a reconnect/
5,547✔
1581
                // server restart to receive a queue weight > 1 for a new sub).
5,547✔
1582
                if sub.qw > 1 {
9,591✔
1583
                        delta = sub.qw
4,044✔
1584
                }
4,044✔
1585
        default:
1✔
1586
                return fmt.Errorf("processRemoteSub Parse Error: '%s'", arg)
1✔
1587
        }
1588
        // We know that the number of fields is correct. So we can access args[] based
1589
        // on where we expect the fields to be.
1590

1591
        // If there is an origin, it will be at index 1.
1592
        if hasOrigin {
831,785✔
1593
                sub.origin = args[1]
12,154✔
1594
        }
12,154✔
1595
        // For subject, use subjIdx.
1596
        sub.subject = args[subjIdx]
819,631✔
1597
        // If the account name is in the protocol, it will be before the subject.
819,631✔
1598
        if accInProto {
998,856✔
1599
                accountName = bytesToString(args[subjIdx-1])
179,225✔
1600
        }
179,225✔
1601
        // Now set the sub.sid from the arg slice. However, we will have a different
1602
        // one if we use the origin or not.
1603
        start = 0
819,631✔
1604
        end := len(arg)
819,631✔
1605
        if sub.queue != nil {
825,178✔
1606
                // Remove the ' <weight>' from the arg length.
5,547✔
1607
                end -= 1 + len(args[subjIdx+2])
5,547✔
1608
        }
5,547✔
1609
        if oldStyle {
819,661✔
1610
                // We will start at the account (if present) or at the subject.
30✔
1611
                // We first skip the "R " or "L "
30✔
1612
                start = 2
30✔
1613
                // And if there is an origin skip that.
30✔
1614
                if hasOrigin {
46✔
1615
                        start += len(sub.origin) + 1
16✔
1616
                }
16✔
1617
                // Here we are pointing at the account (if present), or at the subject.
1618
        }
1619
        sub.sid = arg[start:end]
819,631✔
1620

819,631✔
1621
        // Lookup account while avoiding fetch.
819,631✔
1622
        // A slow fetch delays subsequent remote messages. It also avoids the expired check (see below).
819,631✔
1623
        // With all but memory resolver lookup can be delayed or fail.
819,631✔
1624
        // It is also possible that the account can't be resolved yet.
819,631✔
1625
        // This does not apply to the memory resolver.
819,631✔
1626
        // When used, perform the fetch.
819,631✔
1627
        staticResolver := true
819,631✔
1628
        if res := srv.AccountResolver(); res != nil {
854,147✔
1629
                if _, ok := res.(*MemAccResolver); !ok {
56,700✔
1630
                        staticResolver = false
22,184✔
1631
                }
22,184✔
1632
        }
1633
        var acc *Account
819,631✔
1634
        if staticResolver {
1,617,078✔
1635
                acc, _ = srv.LookupAccount(accountName)
797,447✔
1636
        } else if v, ok := srv.accounts.Load(accountName); ok {
841,810✔
1637
                acc = v.(*Account)
22,179✔
1638
        }
22,179✔
1639
        if acc == nil {
820,099✔
1640
                // if the option of retrieving accounts later exists, create an expired one.
468✔
1641
                // When a client comes along, expiration will prevent it from being used,
468✔
1642
                // cause a fetch and update the account to what is should be.
468✔
1643
                if staticResolver {
931✔
1644
                        c.Errorf("Unknown account %q for remote subject %q", accountName, sub.subject)
463✔
1645
                        return
463✔
1646
                }
463✔
1647
                c.Debugf("Unknown account %q for remote subject %q", accountName, sub.subject)
5✔
1648

5✔
1649
                var isNew bool
5✔
1650
                if acc, isNew = srv.LookupOrRegisterAccount(accountName); isNew {
10✔
1651
                        acc.mu.Lock()
5✔
1652
                        acc.expired.Store(true)
5✔
1653
                        acc.incomplete = true
5✔
1654
                        acc.mu.Unlock()
5✔
1655
                }
5✔
1656
        }
1657

1658
        c.mu.Lock()
819,168✔
1659
        if c.isClosed() {
822,268✔
1660
                c.mu.Unlock()
3,100✔
1661
                return nil
3,100✔
1662
        }
3,100✔
1663

1664
        // Check permissions if applicable.
1665
        if c.perms != nil && !c.canExport(string(sub.subject)) {
816,764✔
1666
                c.mu.Unlock()
696✔
1667
                c.Debugf("Can not export %q, ignoring remote subscription request", sub.subject)
696✔
1668
                return nil
696✔
1669
        }
696✔
1670

1671
        // Check if we have a maximum on the number of subscriptions.
1672
        if c.subsAtLimit() {
815,372✔
1673
                c.mu.Unlock()
×
1674
                c.maxSubsExceeded()
×
1675
                return nil
×
1676
        }
×
1677

1678
        acc.mu.RLock()
815,372✔
1679
        // For routes (this can be called by leafnodes), check if the account is
815,372✔
1680
        // transitioning (from pool to dedicated route) and this route is not a
815,372✔
1681
        // per-account route (route.poolIdx >= 0). If so, ignore this subscription.
815,372✔
1682
        // Exclude "no pool" routes from this check.
815,372✔
1683
        if c.kind == ROUTER && !c.route.noPool &&
815,372✔
1684
                acc.routePoolIdx == accTransitioningToDedicatedRoute && c.route.poolIdx >= 0 {
815,372✔
1685
                acc.mu.RUnlock()
×
1686
                c.mu.Unlock()
×
1687
                // Do not return an error, which would cause the connection to be closed.
×
1688
                return nil
×
1689
        }
×
1690
        sl := acc.sl
815,372✔
1691
        acc.mu.RUnlock()
815,372✔
1692

815,372✔
1693
        // We use the sub.sid for the key of the c.subs map.
815,372✔
1694
        key := bytesToString(sub.sid)
815,372✔
1695
        osub := c.subs[key]
815,372✔
1696
        if osub == nil {
1,626,542✔
1697
                c.subs[key] = sub
811,170✔
1698
                // Now place into the account sl.
811,170✔
1699
                if err = sl.Insert(sub); err != nil {
811,170✔
1700
                        delete(c.subs, key)
×
1701
                        c.mu.Unlock()
×
1702
                        c.Errorf("Could not insert subscription: %v", err)
×
1703
                        c.sendErr("Invalid Subscription")
×
1704
                        return nil
×
1705
                }
×
1706
        } else if sub.queue != nil {
8,387✔
1707
                // For a queue we need to update the weight.
4,185✔
1708
                delta = sub.qw - atomic.LoadInt32(&osub.qw)
4,185✔
1709
                atomic.StoreInt32(&osub.qw, sub.qw)
4,185✔
1710
                sl.UpdateRemoteQSub(osub)
4,185✔
1711
        }
4,185✔
1712
        c.mu.Unlock()
815,372✔
1713

815,372✔
1714
        if srv.gateway.enabled {
936,201✔
1715
                srv.gatewayUpdateSubInterest(acc.Name, sub, delta)
120,829✔
1716
        }
120,829✔
1717

1718
        // Now check on leafnode updates.
1719
        acc.updateLeafNodes(sub, delta)
815,372✔
1720

815,372✔
1721
        if c.opts.Verbose {
815,372✔
1722
                c.sendOK()
×
1723
        }
×
1724

1725
        return nil
815,372✔
1726
}
1727

1728
// Lock is held on entry
1729
func (c *client) addRouteSubOrUnsubProtoToBuf(buf []byte, accName string, sub *subscription, isSubProto bool) []byte {
988,382✔
1730
        // If we have an origin cluster and the other side supports leafnode origin clusters
988,382✔
1731
        // send an LS+/LS- version instead.
988,382✔
1732
        if len(sub.origin) > 0 && c.route.lnoc {
1,012,003✔
1733
                if isSubProto {
35,777✔
1734
                        buf = append(buf, lSubBytes...)
12,156✔
1735
                        buf = append(buf, sub.origin...)
12,156✔
1736
                        buf = append(buf, ' ')
12,156✔
1737
                } else {
23,621✔
1738
                        buf = append(buf, lUnsubBytes...)
11,465✔
1739
                        if c.route.lnocu {
22,914✔
1740
                                buf = append(buf, sub.origin...)
11,449✔
1741
                                buf = append(buf, ' ')
11,449✔
1742
                        }
11,449✔
1743
                }
1744
        } else {
964,761✔
1745
                if isSubProto {
1,773,865✔
1746
                        buf = append(buf, rSubBytes...)
809,104✔
1747
                } else {
964,761✔
1748
                        buf = append(buf, rUnsubBytes...)
155,657✔
1749
                }
155,657✔
1750
        }
1751
        if len(c.route.accName) == 0 {
1,200,090✔
1752
                buf = append(buf, accName...)
211,708✔
1753
                buf = append(buf, ' ')
211,708✔
1754
        }
211,708✔
1755
        buf = append(buf, sub.subject...)
988,382✔
1756
        if len(sub.queue) > 0 {
994,790✔
1757
                buf = append(buf, ' ')
6,408✔
1758
                buf = append(buf, sub.queue...)
6,408✔
1759
                // Send our weight if we are a sub proto
6,408✔
1760
                if isSubProto {
11,967✔
1761
                        buf = append(buf, ' ')
5,559✔
1762
                        var b [12]byte
5,559✔
1763
                        var i = len(b)
5,559✔
1764
                        for l := sub.qw; l > 0; l /= 10 {
13,232✔
1765
                                i--
7,673✔
1766
                                b[i] = digits[l%10]
7,673✔
1767
                        }
7,673✔
1768
                        buf = append(buf, b[i:]...)
5,559✔
1769
                }
1770
        }
1771
        buf = append(buf, CR_LF...)
988,382✔
1772
        return buf
988,382✔
1773
}
1774

1775
// sendSubsToRoute will send over our subject interest to
1776
// the remote side. For each account we will send the
1777
// complete interest for all subjects, both normal as a binary
1778
// and queue group weights.
1779
//
1780
// Server lock held on entry.
1781
func (s *Server) sendSubsToRoute(route *client, idx int, account string) {
33,594✔
1782
        var noPool bool
33,594✔
1783
        if idx >= 0 {
59,201✔
1784
                // We need to check if this route is "no_pool" in which case we
25,607✔
1785
                // need to select all accounts.
25,607✔
1786
                route.mu.Lock()
25,607✔
1787
                noPool = route.route.noPool
25,607✔
1788
                route.mu.Unlock()
25,607✔
1789
        }
25,607✔
1790
        // Estimated size of all protocols. It does not have to be accurate at all.
1791
        var eSize int
33,594✔
1792
        estimateProtosSize := func(a *Account, addAccountName bool) {
54,400✔
1793
                if ns := len(a.rm); ns > 0 {
29,707✔
1794
                        var accSize int
8,901✔
1795
                        if addAccountName {
9,865✔
1796
                                accSize = len(a.Name) + 1
964✔
1797
                        }
964✔
1798
                        // Proto looks like: "RS+ [<account name> ]<subject>[ <queue> <weight>]\r\n"
1799
                        eSize += ns * (len(rSubBytes) + 1 + accSize)
8,901✔
1800
                        for key := range a.rm {
624,745✔
1801
                                // Key contains "<subject>[ <queue>]"
615,844✔
1802
                                eSize += len(key)
615,844✔
1803
                                // In case this is a queue, just add some bytes for the queue weight.
615,844✔
1804
                                // If we want to be accurate, would have to check if "key" has a space,
615,844✔
1805
                                // if so, then figure out how many bytes we need to represent the weight.
615,844✔
1806
                                eSize += 5
615,844✔
1807
                        }
615,844✔
1808
                }
1809
        }
1810
        // Send over our account subscriptions.
1811
        accs := make([]*Account, 0, 1024)
33,594✔
1812
        if idx < 0 || account != _EMPTY_ {
41,597✔
1813
                if ai, ok := s.accounts.Load(account); ok {
16,006✔
1814
                        a := ai.(*Account)
8,003✔
1815
                        a.mu.RLock()
8,003✔
1816
                        // Estimate size and add account name in protocol if idx is not -1
8,003✔
1817
                        estimateProtosSize(a, idx >= 0)
8,003✔
1818
                        accs = append(accs, a)
8,003✔
1819
                        a.mu.RUnlock()
8,003✔
1820
                }
8,003✔
1821
        } else {
25,591✔
1822
                s.accounts.Range(func(k, v any) bool {
82,194✔
1823
                        a := v.(*Account)
56,603✔
1824
                        a.mu.RLock()
56,603✔
1825
                        // We are here for regular or pooled routes (not per-account).
56,603✔
1826
                        // So we collect all accounts whose routePoolIdx matches the
56,603✔
1827
                        // one for this route, or only the account provided, or all
56,603✔
1828
                        // accounts if dealing with a "no pool" route.
56,603✔
1829
                        if a.routePoolIdx == idx || noPool {
69,406✔
1830
                                estimateProtosSize(a, true)
12,803✔
1831
                                accs = append(accs, a)
12,803✔
1832
                        }
12,803✔
1833
                        a.mu.RUnlock()
56,603✔
1834
                        return true
56,603✔
1835
                })
1836
        }
1837

1838
        buf := make([]byte, 0, eSize)
33,594✔
1839

33,594✔
1840
        route.mu.Lock()
33,594✔
1841
        for _, a := range accs {
54,400✔
1842
                a.mu.RLock()
20,806✔
1843
                for key, n := range a.rm {
636,650✔
1844
                        var origin, qn []byte
615,844✔
1845
                        s := strings.Fields(key)
615,844✔
1846
                        // Subject will always be the second field (index 1).
615,844✔
1847
                        subj := stringToBytes(s[1])
615,844✔
1848
                        // Check if the key is for a leaf (will be field 0).
615,844✔
1849
                        forLeaf := s[0] == keyRoutedLeafSub
615,844✔
1850
                        // For queue, if not for a leaf, we need 3 fields "R foo bar",
615,844✔
1851
                        // but if for a leaf, we need 4 fields "L foo bar leaf_origin".
615,844✔
1852
                        if l := len(s); (!forLeaf && l == 3) || (forLeaf && l == 4) {
616,108✔
1853
                                qn = stringToBytes(s[2])
264✔
1854
                        }
264✔
1855
                        if forLeaf {
615,902✔
1856
                                // The leaf origin will be the last field.
58✔
1857
                                origin = stringToBytes(s[len(s)-1])
58✔
1858
                        }
58✔
1859
                        // s[1] is the subject and already as a string, so use that
1860
                        // instead of converting back `subj` to a string.
1861
                        if !route.canImport(s[1]) {
616,378✔
1862
                                continue
534✔
1863
                        }
1864
                        sub := subscription{origin: origin, subject: subj, queue: qn, qw: n}
615,310✔
1865
                        buf = route.addRouteSubOrUnsubProtoToBuf(buf, a.Name, &sub, true)
615,310✔
1866
                }
1867
                a.mu.RUnlock()
20,806✔
1868
        }
1869
        if len(buf) > 0 {
42,470✔
1870
                route.enqueueProto(buf)
8,876✔
1871
                route.Debugf("Sent local subscriptions to route")
8,876✔
1872
        }
8,876✔
1873
        route.mu.Unlock()
33,594✔
1874
}
1875

1876
// Sends SUBs protocols for the given subscriptions. If a filter is specified, it is
1877
// invoked for each subscription. If the filter returns false, the subscription is skipped.
1878
// This function may release the route's lock due to flushing of outbound data. A boolean
1879
// is returned to indicate if the connection has been closed during this call.
1880
// Lock is held on entry.
1881
func (c *client) sendRouteSubProtos(subs []*subscription, trace bool, filter func(sub *subscription) bool) {
152✔
1882
        c.sendRouteSubOrUnSubProtos(subs, true, trace, filter)
152✔
1883
}
152✔
1884

1885
// Sends UNSUBs protocols for the given subscriptions. If a filter is specified, it is
1886
// invoked for each subscription. If the filter returns false, the subscription is skipped.
1887
// This function may release the route's lock due to flushing of outbound data. A boolean
1888
// is returned to indicate if the connection has been closed during this call.
1889
// Lock is held on entry.
1890
func (c *client) sendRouteUnSubProtos(subs []*subscription, trace bool, filter func(sub *subscription) bool) {
11✔
1891
        c.sendRouteSubOrUnSubProtos(subs, false, trace, filter)
11✔
1892
}
11✔
1893

1894
// Low-level function that sends RS+ or RS- protocols for the given subscriptions.
1895
// This can now also send LS+ and LS- for origin cluster based leafnode subscriptions for cluster no-echo.
1896
// Use sendRouteSubProtos or sendRouteUnSubProtos instead for clarity.
1897
// Lock is held on entry.
1898
func (c *client) sendRouteSubOrUnSubProtos(subs []*subscription, isSubProto, trace bool, filter func(sub *subscription) bool) {
373,199✔
1899
        var (
373,199✔
1900
                _buf [1024]byte
373,199✔
1901
                buf  = _buf[:0]
373,199✔
1902
        )
373,199✔
1903

373,199✔
1904
        for _, sub := range subs {
748,853✔
1905
                if filter != nil && !filter(sub) {
378,236✔
1906
                        continue
2,582✔
1907
                }
1908
                // Determine the account. If sub has an ImportMap entry, use that, otherwise scoped to
1909
                // client. Default to global if all else fails.
1910
                var accName string
373,072✔
1911
                if sub.client != nil && sub.client != c {
746,144✔
1912
                        sub.client.mu.Lock()
373,072✔
1913
                }
373,072✔
1914
                if sub.im != nil {
375,647✔
1915
                        accName = sub.im.acc.Name
2,575✔
1916
                } else if sub.client != nil && sub.client.acc != nil {
743,569✔
1917
                        accName = sub.client.acc.Name
370,497✔
1918
                } else {
370,497✔
1919
                        c.Debugf("Falling back to default account for sending subs")
×
1920
                        accName = globalAccountName
×
1921
                }
×
1922
                if sub.client != nil && sub.client != c {
746,144✔
1923
                        sub.client.mu.Unlock()
373,072✔
1924
                }
373,072✔
1925

1926
                as := len(buf)
373,072✔
1927
                buf = c.addRouteSubOrUnsubProtoToBuf(buf, accName, sub, isSubProto)
373,072✔
1928
                if trace {
374,347✔
1929
                        c.traceOutOp("", buf[as:len(buf)-LEN_CR_LF])
1,275✔
1930
                }
1,275✔
1931
        }
1932
        c.enqueueProto(buf)
373,199✔
1933
}
1934

1935
func (s *Server) createRoute(conn net.Conn, rURL *url.URL, rtype RouteType, gossipMode byte, accName string) *client {
61,522✔
1936
        // Snapshot server options.
61,522✔
1937
        opts := s.getOpts()
61,522✔
1938

61,522✔
1939
        didSolicit := rURL != nil
61,522✔
1940
        r := &route{routeType: rtype, didSolicit: didSolicit, poolIdx: -1, gossipMode: gossipMode}
61,522✔
1941

61,522✔
1942
        c := &client{srv: s, nc: conn, opts: ClientOpts{}, kind: ROUTER, msubs: -1, mpay: -1, route: r, start: time.Now()}
61,522✔
1943

61,522✔
1944
        // Is the server configured for compression?
61,522✔
1945
        compressionConfigured := needsCompression(opts.Cluster.Compression.Mode)
61,522✔
1946

61,522✔
1947
        var infoJSON []byte
61,522✔
1948
        // Grab server variables and generates route INFO Json. Note that we set
61,522✔
1949
        // and reset some of s.routeInfo fields when that happens, so we need
61,522✔
1950
        // the server write lock.
61,522✔
1951
        s.mu.Lock()
61,522✔
1952
        // If we are creating a pooled connection and this is the server soliciting
61,522✔
1953
        // the connection, we will delay sending the INFO after we have processed
61,522✔
1954
        // the incoming INFO from the remote. Also delay if configured for compression.
61,522✔
1955
        delayInfo := didSolicit && (compressionConfigured || routeShouldDelayInfo(accName, opts))
61,522✔
1956
        if !delayInfo {
92,996✔
1957
                infoJSON = s.generateRouteInitialInfoJSON(accName, opts.Cluster.Compression.Mode, 0, gossipMode)
31,474✔
1958
        }
31,474✔
1959
        authRequired := s.routeInfo.AuthRequired
61,522✔
1960
        tlsRequired := s.routeInfo.TLSRequired
61,522✔
1961
        clusterName := s.info.Cluster
61,522✔
1962
        tlsName := s.routeTLSName
61,522✔
1963
        s.mu.Unlock()
61,522✔
1964

61,522✔
1965
        // Grab lock
61,522✔
1966
        c.mu.Lock()
61,522✔
1967

61,522✔
1968
        // Initialize
61,522✔
1969
        c.initClient()
61,522✔
1970

61,522✔
1971
        if didSolicit {
92,138✔
1972
                // Do this before the TLS code, otherwise, in case of failure
30,616✔
1973
                // and if route is explicit, it would try to reconnect to 'nil'...
30,616✔
1974
                r.url = rURL
30,616✔
1975
                r.accName = []byte(accName)
30,616✔
1976
        } else {
61,522✔
1977
                c.flags.set(expectConnect)
30,906✔
1978
        }
30,906✔
1979

1980
        // Check for TLS
1981
        if tlsRequired {
61,747✔
1982
                tlsConfig := opts.Cluster.TLSConfig
225✔
1983
                if didSolicit {
285✔
1984
                        // Copy off the config to add in ServerName if we need to.
60✔
1985
                        tlsConfig = tlsConfig.Clone()
60✔
1986
                }
60✔
1987
                // Perform (server or client side) TLS handshake.
1988
                if resetTLSName, err := c.doTLSHandshake("route", didSolicit, rURL, tlsConfig, tlsName, opts.Cluster.TLSTimeout, opts.Cluster.TLSPinnedCerts); err != nil {
338✔
1989
                        c.mu.Unlock()
113✔
1990
                        if resetTLSName {
113✔
1991
                                s.mu.Lock()
×
1992
                                s.routeTLSName = _EMPTY_
×
1993
                                s.mu.Unlock()
×
1994
                        }
×
1995
                        return nil
113✔
1996
                }
1997
        }
1998

1999
        // Do final client initialization
2000

2001
        // Initialize the per-account cache.
2002
        c.in.pacache = make(map[string]*perAccountCache)
61,409✔
2003
        if didSolicit {
92,019✔
2004
                // Set permissions associated with the route user (if applicable).
30,610✔
2005
                // No lock needed since we are already under client lock.
30,610✔
2006
                c.setRoutePermissions(opts.Cluster.Permissions)
30,610✔
2007
        }
30,610✔
2008

2009
        // We can't safely send the pings until we have negotiated compression
2010
        // with the remote, but we want to protect against a connection that
2011
        // does not perform the handshake. We will start a timer that will close
2012
        // the connection as stale based on the ping interval and max out values,
2013
        // but without actually sending pings.
2014
        if compressionConfigured {
121,520✔
2015
                pingInterval := opts.PingInterval
60,111✔
2016
                pingMax := opts.MaxPingsOut
60,111✔
2017
                if opts.Cluster.PingInterval > 0 {
60,111✔
2018
                        pingInterval = opts.Cluster.PingInterval
×
2019
                }
×
2020
                if opts.Cluster.MaxPingsOut > 0 {
60,111✔
2021
                        pingMax = opts.MaxPingsOut
×
2022
                }
×
2023
                c.watchForStaleConnection(adjustPingInterval(ROUTER, pingInterval), pingMax)
60,111✔
2024
        } else {
1,298✔
2025
                // Set the Ping timer
1,298✔
2026
                c.setFirstPingTimer()
1,298✔
2027
        }
1,298✔
2028

2029
        // For routes, the "client" is added to s.routes only when processing
2030
        // the INFO protocol, that is much later.
2031
        // In the meantime, if the server shutsdown, there would be no reference
2032
        // to the client (connection) to be closed, leaving this readLoop
2033
        // uinterrupted, causing the Shutdown() to wait indefinitively.
2034
        // We need to store the client in a special map, under a special lock.
2035
        if !s.addToTempClients(c.cid, c) {
61,411✔
2036
                c.mu.Unlock()
2✔
2037
                c.setNoReconnect()
2✔
2038
                c.closeConnection(ServerShutdown)
2✔
2039
                return nil
2✔
2040
        }
2✔
2041

2042
        // Check for Auth required state for incoming connections.
2043
        // Make sure to do this before spinning up readLoop.
2044
        if authRequired && !didSolicit {
61,554✔
2045
                ttl := secondsToDuration(opts.Cluster.AuthTimeout)
147✔
2046
                c.setAuthTimer(ttl)
147✔
2047
        }
147✔
2048

2049
        // Spin up the read loop.
2050
        s.startGoRoutine(func() { c.readLoop(nil) })
122,814✔
2051

2052
        // Spin up the write loop.
2053
        s.startGoRoutine(func() { c.writeLoop() })
122,814✔
2054

2055
        if tlsRequired {
61,519✔
2056
                c.Debugf("TLS handshake complete")
112✔
2057
                cs := c.nc.(*tls.Conn).ConnectionState()
112✔
2058
                c.Debugf("TLS version %s, cipher suite %s", tlsVersion(cs.Version), tls.CipherSuiteName(cs.CipherSuite))
112✔
2059
        }
112✔
2060

2061
        // Queue Connect proto if we solicited the connection.
2062
        if didSolicit {
92,016✔
2063
                c.Debugf("Route connect msg sent")
30,609✔
2064
                if err := c.sendRouteConnect(clusterName, tlsRequired); err != nil {
30,609✔
2065
                        c.mu.Unlock()
×
2066
                        c.closeConnection(ProtocolViolation)
×
2067
                        return nil
×
2068
                }
×
2069
        }
2070

2071
        if !delayInfo {
92,770✔
2072
                // Send our info to the other side.
31,363✔
2073
                // Our new version requires dynamic information for accounts and a nonce.
31,363✔
2074
                c.enqueueProto(infoJSON)
31,363✔
2075
        }
31,363✔
2076
        c.mu.Unlock()
61,407✔
2077

61,407✔
2078
        c.Noticef("Route connection created")
61,407✔
2079
        return c
61,407✔
2080
}
2081

2082
func routeShouldDelayInfo(accName string, opts *Options) bool {
1,128✔
2083
        return accName == _EMPTY_ && opts.Cluster.PoolSize >= 1
1,128✔
2084
}
1,128✔
2085

2086
// Generates a nonce and set some route info's fields before marshal'ing into JSON.
2087
// To be used only when a route is created (to send the initial INFO protocol).
2088
//
2089
// Server lock held on entry.
2090
func (s *Server) generateRouteInitialInfoJSON(accName, compression string, poolIdx int, gossipMode byte) []byte {
48,011✔
2091
        // New proto wants a nonce (although not used in routes, that is, not signed in CONNECT)
48,011✔
2092
        var raw [nonceLen]byte
48,011✔
2093
        nonce := raw[:]
48,011✔
2094
        s.generateNonce(nonce)
48,011✔
2095
        ri := &s.routeInfo
48,011✔
2096
        // Override compression with s2_auto instead of actual compression level.
48,011✔
2097
        if s.getOpts().Cluster.Compression.Mode == CompressionS2Auto {
48,013✔
2098
                compression = CompressionS2Auto
2✔
2099
        }
2✔
2100
        ri.Nonce, ri.RouteAccount, ri.RoutePoolIdx, ri.Compression, ri.GossipMode = string(nonce), accName, poolIdx, compression, gossipMode
48,011✔
2101
        infoJSON := generateInfoJSON(&s.routeInfo)
48,011✔
2102
        // Clear now that it has been serialized. Will prevent nonce to be included in async INFO that we may send.
48,011✔
2103
        // Same for some other fields.
48,011✔
2104
        ri.Nonce, ri.RouteAccount, ri.RoutePoolIdx, ri.Compression, ri.GossipMode = _EMPTY_, _EMPTY_, 0, _EMPTY_, 0
48,011✔
2105
        return infoJSON
48,011✔
2106
}
2107

2108
// String constants shared across the package.
const (
	// _CRLF_ is a carriage return followed by a line feed.
	_CRLF_ = "\r\n"
	// _EMPTY_ is the empty string.
	_EMPTY_ = ""
)
2112

2113
func (s *Server) addRoute(c *client, didSolicit, sendDelayedInfo bool, gossipMode byte, info *Info, accName string) bool {
41,369✔
2114
        id := info.ID
41,369✔
2115

41,369✔
2116
        var acc *Account
41,369✔
2117
        if accName != _EMPTY_ {
52,965✔
2118
                var err error
11,596✔
2119
                acc, err = s.LookupAccount(accName)
11,596✔
2120
                if err != nil {
11,596✔
2121
                        c.sendErrAndErr(fmt.Sprintf("Unable to lookup account %q: %v", accName, err))
×
2122
                        c.closeConnection(MissingAccount)
×
2123
                        return false
×
2124
                }
×
2125
        }
2126

2127
        s.mu.Lock()
41,369✔
2128
        if !s.isRunning() || s.routesReject {
41,369✔
2129
                s.mu.Unlock()
×
2130
                return false
×
2131
        }
×
2132
        var invProtoErr string
41,369✔
2133

41,369✔
2134
        opts := s.getOpts()
41,369✔
2135

41,369✔
2136
        // Assume we are in pool mode if info.RoutePoolSize is set. We may disable
41,369✔
2137
        // in some cases.
41,369✔
2138
        pool := info.RoutePoolSize > 0
41,369✔
2139
        // This is used to prevent a server with pooling to constantly trying
41,369✔
2140
        // to connect to a server with no pooling (for instance old server) after
41,369✔
2141
        // the first connection is established.
41,369✔
2142
        var noReconnectForOldServer bool
41,369✔
2143

41,369✔
2144
        // To allow rolling updates, we now allow servers with different pool sizes
41,369✔
2145
        // so we will use as the effective pool size here, the max between our
41,369✔
2146
        // configured size and the size we receive in the info protocol.
41,369✔
2147
        effectivePoolSize := max(s.routesPoolSize, info.RoutePoolSize)
41,369✔
2148

41,369✔
2149
        // If the remote is an old server, info.RoutePoolSize will be 0, or if
41,369✔
2150
        // this server's Cluster.PoolSize is negative, we will behave as an old
41,369✔
2151
        // server and need to handle things differently.
41,369✔
2152
        if info.RoutePoolSize <= 0 || opts.Cluster.PoolSize < 0 {
42,534✔
2153
                if accName != _EMPTY_ {
1,173✔
2154
                        invProtoErr = fmt.Sprintf("Not possible to have a dedicated route for account %q between those servers", accName)
8✔
2155
                        // In this case, make sure this route does not attempt to reconnect
8✔
2156
                        c.setNoReconnect()
8✔
2157
                } else {
1,165✔
2158
                        // We will accept, but treat this remote has "no pool"
1,157✔
2159
                        pool, noReconnectForOldServer = false, true
1,157✔
2160
                        c.mu.Lock()
1,157✔
2161
                        c.route.poolIdx = 0
1,157✔
2162
                        c.route.noPool = true
1,157✔
2163
                        c.mu.Unlock()
1,157✔
2164
                        // Keep track of number of routes like that. We will use that when
1,157✔
2165
                        // sending subscriptions over routes.
1,157✔
2166
                        s.routesNoPool++
1,157✔
2167
                }
1,157✔
2168
        } else if didSolicit {
63,947✔
2169
                // For solicited route, the incoming's RoutePoolIdx should not be set.
23,743✔
2170
                if info.RoutePoolIdx != 0 {
23,743✔
2171
                        invProtoErr = fmt.Sprintf("Route pool index should not be set but is set to %v", info.RoutePoolIdx)
×
2172
                }
×
2173
        } else if info.RoutePoolIdx < 0 || info.RoutePoolIdx >= effectivePoolSize {
16,461✔
2174
                // For non solicited routes, if the remote sends a RoutePoolIdx, make
×
2175
                // sure it is a valid one (in range of the pool size).
×
2176
                invProtoErr = fmt.Sprintf("Invalid route pool index: %v - pool size is %v", info.RoutePoolIdx, info.RoutePoolSize)
×
2177
        }
×
2178
        if invProtoErr != _EMPTY_ {
41,377✔
2179
                s.mu.Unlock()
8✔
2180
                c.sendErrAndErr(invProtoErr)
8✔
2181
                c.closeConnection(ProtocolViolation)
8✔
2182
                return false
8✔
2183
        }
8✔
2184
        // If accName is set, we are dealing with a per-account connection.
2185
        if accName != _EMPTY_ {
52,949✔
2186
                // When an account has its own route, it will be an error if the given
11,588✔
2187
                // account name is not found in s.accRoutes map.
11,588✔
2188
                conns, exists := s.accRoutes[accName]
11,588✔
2189
                if !exists {
11,590✔
2190
                        s.mu.Unlock()
2✔
2191
                        c.sendErrAndErr(fmt.Sprintf("No route for account %q", accName))
2✔
2192
                        c.closeConnection(ProtocolViolation)
2✔
2193
                        return false
2✔
2194
                }
2✔
2195
                remote, exists := conns[id]
11,586✔
2196
                if !exists {
19,573✔
2197
                        conns[id] = c
7,987✔
2198
                        c.mu.Lock()
7,987✔
2199
                        idHash := c.route.idHash
7,987✔
2200
                        cid := c.cid
7,987✔
2201
                        rtype := c.route.routeType
7,987✔
2202
                        if sendDelayedInfo {
12,010✔
2203
                                cm := compressionModeForInfoProtocol(&opts.Cluster.Compression, c.route.compression)
4,023✔
2204
                                c.enqueueProto(s.generateRouteInitialInfoJSON(accName, cm, 0, gossipMode))
4,023✔
2205
                        }
4,023✔
2206
                        if c.last.IsZero() {
12,012✔
2207
                                c.last = time.Now()
4,025✔
2208
                        }
4,025✔
2209
                        if acc != nil {
15,974✔
2210
                                c.acc = acc
7,987✔
2211
                        }
7,987✔
2212
                        c.mu.Unlock()
7,987✔
2213

7,987✔
2214
                        // Store this route with key being the route id hash + account name
7,987✔
2215
                        s.storeRouteByHash(idHash+accName, c)
7,987✔
2216

7,987✔
2217
                        // Now that we have registered the route, we can remove from the temp map.
7,987✔
2218
                        s.removeFromTempClients(cid)
7,987✔
2219

7,987✔
2220
                        // We don't need to send if the only route is the one we just accepted.
7,987✔
2221
                        if len(conns) > 1 {
12,439✔
2222
                                s.forwardNewRouteInfoToKnownServers(info, rtype, didSolicit, gossipMode)
4,452✔
2223
                        }
4,452✔
2224

2225
                        // Send subscription interest
2226
                        s.sendSubsToRoute(c, -1, accName)
7,987✔
2227
                } else {
3,599✔
2228
                        handleDuplicateRoute(remote, c, true)
3,599✔
2229
                }
3,599✔
2230
                s.mu.Unlock()
11,586✔
2231
                return !exists
11,586✔
2232
        }
2233
        var remote *client
29,773✔
2234
        // That will be the position of the connection in the slice, we initialize
29,773✔
2235
        // to -1 to indicate that no space was found.
29,773✔
2236
        idx := -1
29,773✔
2237
        // This will be the size (or number of connections) in a given slice.
29,773✔
2238
        sz := 0
29,773✔
2239
        // Check if we know about the remote server
29,773✔
2240
        conns, exists := s.routes[id]
29,773✔
2241
        if !exists {
38,954✔
2242
                // Now, create a slice for route connections of the size of the pool
9,181✔
2243
                // or 1 when not in pool mode.
9,181✔
2244
                conns = make([]*client, effectivePoolSize)
9,181✔
2245
                // Track this slice for this remote server.
9,181✔
2246
                s.routes[id] = conns
9,181✔
2247
                // Set the index to info.RoutePoolIdx because if this is a solicited
9,181✔
2248
                // route, this value will be 0, which is what we want, otherwise, we
9,181✔
2249
                // will use whatever index the remote has chosen.
9,181✔
2250
                idx = info.RoutePoolIdx
9,181✔
2251
        } else if pool {
50,298✔
2252
                // The remote could have done a config reload and increased the pool size.
20,525✔
2253
                // It will close the connections before soliciting again, however, if
20,525✔
2254
                // on this side, one of the route is not yet fully removed, but the
20,525✔
2255
                // first one is, it would accept the new connection (with a greater pool
20,525✔
2256
                // size) and we would not go through the phase of `!exists` above creating
20,525✔
2257
                // the slice with the right size. So we need to check here and add new empty
20,525✔
2258
                // entries to complete the effective pool size.
20,525✔
2259
                if n := effectivePoolSize - len(conns); n > 0 {
20,525✔
2260
                        for range n {
×
2261
                                conns = append(conns, nil)
×
2262
                        }
×
2263
                        s.routes[id] = conns
×
2264
                }
2265
                // The remote was found. If this is a non solicited route, we will place
2266
                // the connection in the pool at the index given by info.RoutePoolIdx.
2267
                // But if there is already one, close this incoming connection as a
2268
                // duplicate.
2269
                if !didSolicit {
28,947✔
2270
                        idx = info.RoutePoolIdx
8,422✔
2271
                        if remote = conns[idx]; remote != nil {
8,802✔
2272
                                handleDuplicateRoute(remote, c, false)
380✔
2273
                                s.mu.Unlock()
380✔
2274
                                return false
380✔
2275
                        }
380✔
2276
                        // Look if there is a solicited route in the pool. If there is one,
2277
                        // they should all be, so stop at the first.
2278
                        if url, rtype, hasSolicited := hasSolicitedRoute(conns); hasSolicited {
8,558✔
2279
                                upgradeRouteToSolicited(c, url, rtype)
516✔
2280
                        }
516✔
2281
                } else {
12,103✔
2282
                        // If we solicit, upgrade to solicited all non-solicited routes that
12,103✔
2283
                        // we may have registered.
12,103✔
2284
                        c.mu.Lock()
12,103✔
2285
                        url := c.route.url
12,103✔
2286
                        rtype := c.route.routeType
12,103✔
2287
                        c.mu.Unlock()
12,103✔
2288
                        for _, r := range conns {
48,497✔
2289
                                upgradeRouteToSolicited(r, url, rtype)
36,394✔
2290
                        }
36,394✔
2291
                }
2292
                // For all cases (solicited and not) we need to count how many connections
2293
                // we already have, and for solicited route, we will find a free spot in
2294
                // the slice.
2295
                for i, r := range conns {
80,747✔
2296
                        if idx == -1 && r == nil {
68,970✔
2297
                                idx = i
8,368✔
2298
                        } else if r != nil {
96,518✔
2299
                                sz++
35,916✔
2300
                        }
35,916✔
2301
                }
2302
        } else {
67✔
2303
                remote = conns[0]
67✔
2304
        }
67✔
2305
        // If there is a spot, idx will be greater or equal to 0.
2306
        if idx >= 0 {
54,984✔
2307
                c.mu.Lock()
25,591✔
2308
                c.route.connectURLs = info.ClientConnectURLs
25,591✔
2309
                c.route.wsConnURLs = info.WSConnectURLs
25,591✔
2310
                c.route.poolIdx = idx
25,591✔
2311
                rtype := c.route.routeType
25,591✔
2312
                cid := c.cid
25,591✔
2313
                idHash := c.route.idHash
25,591✔
2314
                rHash := c.route.hash
25,591✔
2315
                rn := c.route.remoteName
25,591✔
2316
                url := c.route.url
25,591✔
2317
                if sendDelayedInfo {
38,052✔
2318
                        cm := compressionModeForInfoProtocol(&opts.Cluster.Compression, c.route.compression)
12,461✔
2319
                        c.enqueueProto(s.generateRouteInitialInfoJSON(_EMPTY_, cm, idx, gossipMode))
12,461✔
2320
                }
12,461✔
2321
                if c.last.IsZero() {
38,566✔
2322
                        c.last = time.Now()
12,975✔
2323
                }
12,975✔
2324
                c.mu.Unlock()
25,591✔
2325

25,591✔
2326
                // With pooling, we keep track of the remote's configured route pool size.
25,591✔
2327
                // We do so when adding the connection in the first slot, not when `sz == 1`
25,591✔
2328
                // because there could be situations where we have old connections that have
25,591✔
2329
                // not yet been removed and so we would not have `sz == `. However, we will
25,591✔
2330
                // always have the condition where we are adding the new connection at `idx==0`
25,591✔
2331
                // so use that as the condition to store the remote pool size.
25,591✔
2332
                if pool && idx == 0 {
33,682✔
2333
                        if s.remoteRoutePoolSize == nil {
11,654✔
2334
                                s.remoteRoutePoolSize = make(map[string]int)
3,563✔
2335
                        }
3,563✔
2336
                        s.remoteRoutePoolSize[id] = info.RoutePoolSize
8,091✔
2337
                }
2338

2339
                // Add to the slice and bump the count of connections for this remote
2340
                conns[idx] = c
25,591✔
2341
                sz++
25,591✔
2342
                // This boolean will indicate that we are registering the only
25,591✔
2343
                // connection in non pooled situation or we stored the very first
25,591✔
2344
                // connection for a given remote server.
25,591✔
2345
                doOnce := !pool || sz == 1
25,591✔
2346
                if doOnce {
34,772✔
2347
                        // check to be consistent and future proof. but will be same domain
9,181✔
2348
                        if s.sameDomain(info.Domain) {
18,362✔
2349
                                s.nodeToInfo.Store(rHash, nodeInfo{
9,181✔
2350
                                        name:            rn,
9,181✔
2351
                                        version:         s.info.Version,
9,181✔
2352
                                        cluster:         s.info.Cluster,
9,181✔
2353
                                        domain:          info.Domain,
9,181✔
2354
                                        id:              id,
9,181✔
2355
                                        tags:            nil,
9,181✔
2356
                                        cfg:             nil,
9,181✔
2357
                                        stats:           nil,
9,181✔
2358
                                        offline:         false,
9,181✔
2359
                                        js:              info.JetStream,
9,181✔
2360
                                        binarySnapshots: true, // Updated default to true. Versions 2.10.0+ support it.
9,181✔
2361
                                        accountNRG:      false,
9,181✔
2362
                                })
9,181✔
2363
                        }
9,181✔
2364
                }
2365

2366
                // Store this route using the hash as the key
2367
                if pool {
50,092✔
2368
                        idHash += strconv.Itoa(idx)
24,501✔
2369
                }
24,501✔
2370
                s.storeRouteByHash(idHash, c)
25,591✔
2371

25,591✔
2372
                // Now that we have registered the route, we can remove from the temp map.
25,591✔
2373
                s.removeFromTempClients(cid)
25,591✔
2374

25,591✔
2375
                if doOnce {
34,772✔
2376
                        // If the INFO contains a Gateway URL, add it to the list for our cluster.
9,181✔
2377
                        if info.GatewayURL != _EMPTY_ && s.addGatewayURL(info.GatewayURL) {
11,029✔
2378
                                s.sendAsyncGatewayInfo()
1,848✔
2379
                        }
1,848✔
2380

2381
                        // We don't need to send if the only route is the one we just accepted.
2382
                        if len(s.routes) > 1 {
14,203✔
2383
                                s.forwardNewRouteInfoToKnownServers(info, rtype, didSolicit, gossipMode)
5,022✔
2384
                        }
5,022✔
2385

2386
                        // Send info about the known gateways to this route.
2387
                        s.sendGatewayConfigsToRoute(c)
9,181✔
2388

9,181✔
2389
                        // Unless disabled, possibly update the server's INFO protocol
9,181✔
2390
                        // and send to clients that know how to handle async INFOs.
9,181✔
2391
                        if !opts.Cluster.NoAdvertise {
18,285✔
2392
                                s.addConnectURLsAndSendINFOToClients(info.ClientConnectURLs, info.WSConnectURLs)
9,104✔
2393
                        }
9,104✔
2394

2395
                        // Add the remote's leafnodeURL to our list of URLs and send the update
2396
                        // to all LN connections. (Note that when coming from a route, LeafNodeURLs
2397
                        // is an array of size 1 max).
2398
                        if len(info.LeafNodeURLs) == 1 && s.addLeafNodeURL(info.LeafNodeURLs[0]) {
16,448✔
2399
                                s.sendAsyncLeafNodeInfo()
7,267✔
2400
                        }
7,267✔
2401
                }
2402

2403
                // Send the subscriptions interest.
2404
                s.sendSubsToRoute(c, idx, _EMPTY_)
25,591✔
2405

25,591✔
2406
                // In pool mode, if we did not yet reach the cap, try to connect a new connection,
25,591✔
2407
                // but do so only after receiving the first PONG to our PING, which will ensure
25,591✔
2408
                // that we have proper authentication.
25,591✔
2409
                if pool && didSolicit && sz != effectivePoolSize {
33,844✔
2410
                        c.mu.Lock()
8,253✔
2411
                        c.route.startNewRoute = &routeInfo{
8,253✔
2412
                                url:        url,
8,253✔
2413
                                rtype:      rtype,
8,253✔
2414
                                gossipMode: gossipMode,
8,253✔
2415
                        }
8,253✔
2416
                        c.sendPing()
8,253✔
2417
                        c.mu.Unlock()
8,253✔
2418
                }
8,253✔
2419
        }
2420
        s.mu.Unlock()
29,393✔
2421
        if pool {
57,629✔
2422
                if idx == -1 {
31,971✔
2423
                        // Was full, so need to close connection
3,735✔
2424
                        c.Debugf("Route pool size reached, closing extra connection to %q", id)
3,735✔
2425
                        handleDuplicateRoute(nil, c, true)
3,735✔
2426
                        return false
3,735✔
2427
                }
3,735✔
2428
                return true
24,501✔
2429
        }
2430
        // This is for non-pool mode at this point.
2431
        if exists {
1,224✔
2432
                handleDuplicateRoute(remote, c, noReconnectForOldServer)
67✔
2433
        }
67✔
2434

2435
        return !exists
1,157✔
2436
}
2437

2438
func hasSolicitedRoute(conns []*client) (*url.URL, RouteType, bool) {
8,042✔
2439
        var url *url.URL
8,042✔
2440
        var rtype RouteType
8,042✔
2441
        for _, r := range conns {
31,218✔
2442
                if r == nil {
34,599✔
2443
                        continue
11,423✔
2444
                }
2445
                r.mu.Lock()
11,753✔
2446
                if r.route.didSolicit {
12,269✔
2447
                        url = r.route.url
516✔
2448
                        rtype = r.route.routeType
516✔
2449
                }
516✔
2450
                r.mu.Unlock()
11,753✔
2451
                if url != nil {
12,269✔
2452
                        return url, rtype, true
516✔
2453
                }
516✔
2454
        }
2455
        return nil, 0, false
7,526✔
2456
}
2457

2458
func upgradeRouteToSolicited(r *client, url *url.URL, rtype RouteType) {
36,910✔
2459
        if r == nil {
49,492✔
2460
                return
12,582✔
2461
        }
12,582✔
2462
        r.mu.Lock()
24,328✔
2463
        if !r.route.didSolicit {
34,939✔
2464
                r.route.didSolicit = true
10,611✔
2465
                r.route.url = url
10,611✔
2466
        }
10,611✔
2467
        if rtype == Explicit {
48,266✔
2468
                r.route.routeType = Explicit
23,938✔
2469
        }
23,938✔
2470
        r.mu.Unlock()
24,328✔
2471
}
2472

2473
func handleDuplicateRoute(remote, c *client, setNoReconnect bool) {
7,781✔
2474
        // We used to clear some fields when closing a duplicate connection
7,781✔
2475
        // to prevent sending INFO protocols for the remotes to update
7,781✔
2476
        // their leafnode/gateway URLs. This is no longer needed since
7,781✔
2477
        // removeRoute() now does the right thing of doing that only when
7,781✔
2478
        // the closed connection was an added route connection.
7,781✔
2479
        c.mu.Lock()
7,781✔
2480
        didSolicit := c.route.didSolicit
7,781✔
2481
        url := c.route.url
7,781✔
2482
        rtype := c.route.routeType
7,781✔
2483
        if setNoReconnect {
15,182✔
2484
                c.flags.set(noReconnect)
7,401✔
2485
        }
7,401✔
2486
        c.mu.Unlock()
7,781✔
2487

7,781✔
2488
        if remote == nil {
11,516✔
2489
                return
3,735✔
2490
        }
3,735✔
2491

2492
        remote.mu.Lock()
4,046✔
2493
        if didSolicit && !remote.route.didSolicit {
7,598✔
2494
                remote.route.didSolicit = true
3,552✔
2495
                remote.route.url = url
3,552✔
2496
        }
3,552✔
2497
        // The extra route might be an configured explicit route
2498
        // so keep the state that the remote was configured.
2499
        if rtype == Explicit {
7,590✔
2500
                remote.route.routeType = rtype
3,544✔
2501
        }
3,544✔
2502
        // This is to mitigate the issue where both sides add the route
2503
        // on the opposite connection, and therefore end-up with both
2504
        // connections being dropped.
2505
        remote.route.retry = true
4,046✔
2506
        remote.mu.Unlock()
4,046✔
2507
}
2508

2509
// Import filter check.
2510
func (c *client) importFilter(sub *subscription) bool {
373,036✔
2511
        if c.perms == nil {
746,021✔
2512
                return true
372,985✔
2513
        }
372,985✔
2514
        return c.canImport(string(sub.subject))
51✔
2515
}
2516

2517
// updateRouteSubscriptionMap will make sure to update the route map for the subscription. Will
// also forward to all routes if needed.
//
// `delta` is added to the count stored in the account's route map (acc.rm)
// under the key computed for `sub`. The call is a no-op when `acc` or `sub` is
// nil, when the subscription has no client or belongs to a ROUTER or GATEWAY
// connection, when `sub.si` is set, or when the account has no route map.
//
// An update is sent to the routes when the subscription is a queue
// subscription, when the count for the key drops to zero or below (the key is
// then removed), or when a new key is added.
func (s *Server) updateRouteSubscriptionMap(acc *Account, sub *subscription, delta int32) {
	if acc == nil || sub == nil {
		return
	}

	// We only store state on local subs for transmission across all other routes.
	if sub.client == nil || sub.client.kind == ROUTER || sub.client.kind == GATEWAY {
		return
	}

	if sub.si {
		return
	}

	// Copy to hold outside acc lock.
	var n int32
	var ok bool

	// A non-empty queue name means this is a queue subscription.
	isq := len(sub.queue) > 0

	accLock := func() {
		// Not required for code correctness, but helps reduce the number of
		// updates sent to the routes when processing high number of concurrent
		// queue subscriptions updates (sub/unsub).
		// See https://github.com/nats-io/nats-server/pull/1126 for more details.
		if isq {
			acc.sqmu.Lock()
		}
		acc.mu.Lock()
	}
	// Releases the locks taken by accLock, in reverse order.
	accUnlock := func() {
		acc.mu.Unlock()
		if isq {
			acc.sqmu.Unlock()
		}
	}

	accLock()

	// This is non-nil when we know we are in cluster mode.
	rm, lqws := acc.rm, acc.lqws
	if rm == nil {
		accUnlock()
		return
	}

	// Create the subscription key which will prevent collisions between regular
	// and leaf routed subscriptions. See keyFromSubWithOrigin() for details.
	key := keyFromSubWithOrigin(sub)

	// Decide whether we need to send an update out to all the routes.
	// Queue subscriptions always start as candidates for an update.
	update := isq

	// This is where we do update to account. For queues we need to take
	// special care that this order of updates is same as what is sent out
	// over routes.
	if n, ok = rm[key]; ok {
		n += delta
		if n <= 0 {
			delete(rm, key)
			if isq {
				delete(lqws, key)
			}
			update = true // Update for deleting (N->0)
		} else {
			rm[key] = n
		}
	} else if delta > 0 {
		n = delta
		rm[key] = delta
		update = true // Adding a new entry for normal sub means update (0->1)
	}

	accUnlock()

	if !update {
		return
	}

	// If we are sending a queue sub, make a copy and place in the queue weight.
	// FIXME(dlc) - We can be smarter here and avoid copying and acquiring the lock.
	if isq {
		sub.client.mu.Lock()
		nsub := *sub
		sub.client.mu.Unlock()
		nsub.qw = n
		sub = &nsub
	}

	// We need to send out this update. Gather routes
	// (the slice starts out backed by a fixed-size local array).
	var _routes [32]*client
	routes := _routes[:0]

	s.mu.RLock()
	// The account's routePoolIdx field is set/updated under the server lock
	// (but also the account's lock). So we don't need to acquire the account's
	// lock here to get the value.
	if poolIdx := acc.routePoolIdx; poolIdx < 0 {
		// Negative pool index: use the routes registered for this account
		// by name in s.accRoutes.
		if conns, ok := s.accRoutes[acc.Name]; ok {
			for _, r := range conns {
				routes = append(routes, r)
			}
		}
		if s.routesNoPool > 0 {
			// We also need to look for "no pool" remotes (that is, routes to older
			// servers or servers that have explicitly disabled pooling).
			s.forEachRemote(func(r *client) {
				r.mu.Lock()
				if r.route.noPool {
					routes = append(routes, r)
				}
				r.mu.Unlock()
			})
		}
	} else {
		// We can't use s.forEachRouteIdx here since we want to check/get the
		// "no pool" route ONLY if we don't find a route at the given `poolIdx`.
		for _, conns := range s.routes {
			if r := conns[poolIdx]; r != nil {
				routes = append(routes, r)
			} else if s.routesNoPool > 0 {
				// Check if we have a "no pool" route at index 0, and if so, it
				// means that for this remote, we have a single connection because
				// that server does not have pooling.
				if r := conns[0]; r != nil {
					r.mu.Lock()
					if r.route.noPool {
						routes = append(routes, r)
					}
					r.mu.Unlock()
				}
			}
		}
	}
	// Capture the trace setting before releasing the server lock.
	trace := atomic.LoadInt32(&s.logging.trace) == 1
	s.mu.RUnlock()

	// If we are a queue subscriber we need to make sure our updates are serialized from
	// potential multiple connections. We want to make sure that the order above is preserved
	// here but not necessarily all updates need to be sent. We need to block and recheck the
	// n count with the lock held through sending here. We will suppress duplicate sends of same qw.
	if isq {
		// However, we can't hold the acc.mu lock since we allow client.mu.Lock -> acc.mu.Lock
		// but not the opposite. So use a dedicated lock while holding the route's lock.
		// Note that acc.sqmu stays held (deferred unlock) until this function returns.
		acc.sqmu.Lock()
		defer acc.sqmu.Unlock()

		acc.mu.Lock()
		// Re-read the current count; it may have changed since we released the lock.
		n = rm[key]
		sub.qw = n
		// Check the last sent weight here. If same, then someone
		// beat us to it and we can just return here. Otherwise update
		if ls, ok := lqws[key]; ok && ls == n {
			acc.mu.Unlock()
			return
		} else if n > 0 {
			lqws[key] = n
		}
		acc.mu.Unlock()
	}

	// Snapshot into array
	subs := []*subscription{sub}

	// Deliver to all routes.
	for _, route := range routes {
		route.mu.Lock()
		// Note that queue unsubs where n > 0 are still
		// subscribes with a smaller weight.
		route.sendRouteSubOrUnSubProtos(subs, n > 0, trace, route.importFilter)
		route.mu.Unlock()
	}
}
2692

2693
// This starts the route accept loop in a go routine, unless it
// is detected that the server has already been shutdown.
// It will also start soliciting explicit routes.
//
// On the way it opens the route listener, builds the route INFO (s.routeInfo)
// from the current options and server info, and records the local ip:port
// combinations that resolve to this server (s.routesToSelf). If listening or
// applying Cluster.Advertise fails, the error is reported with Fatalf and the
// function returns without starting the accept loop.
func (s *Server) startRouteAcceptLoop() {
	if s.isShuttingDown() {
		return
	}

	// Snapshot server options.
	opts := s.getOpts()

	// Configured cluster port. A value of -1 is mapped to 0 (see the
	// "random port" handling further below).
	port := opts.Cluster.Port

	if port == -1 {
		port = 0
	}

	// Per the original note, ClusterName() requires a lock, so get the name
	// before acquiring the server lock below.
	clusterName := s.ClusterName()

	s.mu.Lock()
	s.Noticef("Cluster name is %s", clusterName)
	if s.isClusterNameDynamic() {
		s.Warnf("Cluster name was dynamically generated, consider setting one")
	}

	hp := net.JoinHostPort(opts.Cluster.Host, strconv.Itoa(port))
	l, e := natsListen("tcp", hp)
	// Keep track of the listen result (nil on success).
	s.routeListenerErr = e
	if e != nil {
		s.mu.Unlock()
		s.Fatalf("Error listening on router port: %d - %v", opts.Cluster.Port, e)
		return
	}
	s.Noticef("Listening for route connections on %s",
		net.JoinHostPort(opts.Cluster.Host, strconv.Itoa(l.Addr().(*net.TCPAddr).Port)))

	// Check for TLSConfig
	tlsReq := opts.Cluster.TLSConfig != nil
	info := Info{
		ID:           s.info.ID,
		Name:         s.info.Name,
		Version:      s.info.Version,
		GoVersion:    runtime.Version(),
		AuthRequired: false,
		TLSRequired:  tlsReq,
		TLSVerify:    tlsReq,
		MaxPayload:   s.info.MaxPayload,
		JetStream:    s.info.JetStream,
		Proto:        s.getServerProto(),
		GatewayURL:   s.getGatewayURL(),
		Headers:      s.supportsHeaders(),
		Cluster:      s.info.Cluster,
		Domain:       s.info.Domain,
		Dynamic:      s.isClusterNameDynamic(),
		LNOC:         true,
		LNOCU:        true,
	}
	// For tests that want to simulate old servers, do not set the compression
	// on the INFO protocol if configured with CompressionNotSupported.
	if cm := opts.Cluster.Compression.Mode; cm != CompressionNotSupported {
		info.Compression = cm
	}
	// Only advertise a pool size when one is configured (> 0).
	if ps := opts.Cluster.PoolSize; ps > 0 {
		info.RoutePoolSize = ps
	}
	// Set this only if advertise is not disabled
	if !opts.Cluster.NoAdvertise {
		info.ClientConnectURLs = s.clientConnectURLs
		info.WSConnectURLs = s.websocket.connectURLs
	}
	// If we have selected a random port...
	if port == 0 {
		// Write resolved port back to options.
		opts.Cluster.Port = l.Addr().(*net.TCPAddr).Port
	}
	// Check for Auth items
	if opts.Cluster.Username != "" {
		info.AuthRequired = true
	}
	// Check for permissions.
	if opts.Cluster.Permissions != nil {
		info.Import = opts.Cluster.Permissions.Import
		info.Export = opts.Cluster.Permissions.Export
	}
	// If this server has a LeafNode accept loop, s.leafNodeInfo.IP is,
	// at this point, set to the host:port for the leafnode accept URL,
	// taking into account possible advertise setting. Use the LeafNodeURLs
	// and set this server's leafnode accept URL. This will be sent to
	// routed servers.
	if !opts.LeafNode.NoAdvertise && s.leafNodeInfo.IP != _EMPTY_ {
		info.LeafNodeURLs = []string{s.leafNodeInfo.IP}
	}
	s.routeInfo = info
	// Possibly override Host/Port and set IP based on Cluster.Advertise
	if err := s.setRouteInfoHostPortAndIP(); err != nil {
		s.Fatalf("Error setting route INFO with Cluster.Advertise value of %s, err=%v", opts.Cluster.Advertise, err)
		l.Close()
		s.mu.Unlock()
		return
	}
	// Setup state that can enable shutdown
	s.routeListener = l
	// Warn if using Cluster.Insecure
	if tlsReq && opts.Cluster.TLSConfig.InsecureSkipVerify {
		s.Warnf(clusterTLSInsecureWarning)
	}

	// Now that we have the port, keep track of all ip:port that resolve to this server.
	// If the interface addresses cannot be listed, this step is silently skipped.
	if interfaceAddr, err := net.InterfaceAddrs(); err == nil {
		var localIPs []string
		for i := 0; i < len(interfaceAddr); i++ {
			// The ParseCIDR error is ignored here: the result is re-validated
			// with ParseIP below and only kept if it parses as an IP.
			interfaceIP, _, _ := net.ParseCIDR(interfaceAddr[i].String())
			ipStr := interfaceIP.String()
			if net.ParseIP(ipStr) != nil {
				localIPs = append(localIPs, ipStr)
			}
		}
		var portStr = strconv.FormatInt(int64(s.routeInfo.Port), 10)
		for _, ip := range localIPs {
			ipPort := net.JoinHostPort(ip, portStr)
			s.routesToSelf[ipPort] = struct{}{}
		}
	}

	// Start the accept loop in a different go routine.
	// Accepted connections are created as Implicit routes with no URL.
	go s.acceptConnections(l, "Route", func(conn net.Conn) { s.createRoute(conn, nil, Implicit, gossipDefault, _EMPTY_) }, nil)

	// Solicit Routes if applicable. This will not block.
	s.solicitRoutes(opts.Routes, opts.Cluster.PinnedAccounts)

	s.mu.Unlock()
}
2827

2828
// Similar to setInfoHostPortAndGenerateJSON, but for routeInfo.
2829
func (s *Server) setRouteInfoHostPortAndIP() error {
4,716✔
2830
        opts := s.getOpts()
4,716✔
2831
        if opts.Cluster.Advertise != _EMPTY_ {
4,729✔
2832
                advHost, advPort, err := parseHostPort(opts.Cluster.Advertise, opts.Cluster.Port)
13✔
2833
                if err != nil {
14✔
2834
                        return err
1✔
2835
                }
1✔
2836
                s.routeInfo.Host = advHost
12✔
2837
                s.routeInfo.Port = advPort
12✔
2838
                s.routeInfo.IP = fmt.Sprintf("nats-route://%s/", net.JoinHostPort(advHost, strconv.Itoa(advPort)))
12✔
2839
        } else {
4,703✔
2840
                s.routeInfo.Host = opts.Cluster.Host
4,703✔
2841
                s.routeInfo.Port = opts.Cluster.Port
4,703✔
2842
                s.routeInfo.IP = ""
4,703✔
2843
        }
4,703✔
2844
        return nil
4,715✔
2845
}
2846

2847
// StartRouting will start the accept loop on the cluster host:port
2848
// and will actively try to connect to listed routes.
2849
func (s *Server) StartRouting(clientListenReady chan struct{}) {
4,530✔
2850
        defer s.grWG.Done()
4,530✔
2851

4,530✔
2852
        // Wait for the client and leafnode listen ports to be opened,
4,530✔
2853
        // and the possible ephemeral ports to be selected.
4,530✔
2854
        <-clientListenReady
4,530✔
2855

4,530✔
2856
        // Start the accept loop and solicitation of explicit routes (if applicable)
4,530✔
2857
        s.startRouteAcceptLoop()
4,530✔
2858

4,530✔
2859
}
4,530✔
2860

2861
func (s *Server) reConnectToRoute(rURL *url.URL, rtype RouteType, accName string) {
15,344✔
2862
        // If A connects to B, and B to A (regardless if explicit or
15,344✔
2863
        // implicit - due to auto-discovery), and if each server first
15,344✔
2864
        // registers the route on the opposite TCP connection, the
15,344✔
2865
        // two connections will end-up being closed.
15,344✔
2866
        // Add some random delay to reduce risk of repeated failures.
15,344✔
2867
        delay := time.Duration(rand.Intn(100)) * time.Millisecond
15,344✔
2868
        if rtype == Explicit {
30,326✔
2869
                delay += DEFAULT_ROUTE_RECONNECT
14,982✔
2870
        }
14,982✔
2871
        select {
15,344✔
2872
        case <-time.After(delay):
1,118✔
2873
        case <-s.quitCh:
14,226✔
2874
                s.grWG.Done()
14,226✔
2875
                return
14,226✔
2876
        }
2877
        s.connectToRoute(rURL, rtype, false, gossipDefault, accName)
1,118✔
2878
}
2879

2880
// Checks to make sure the route is still valid.
2881
func (s *Server) routeStillValid(rURL *url.URL) bool {
114,446✔
2882
        for _, ri := range s.getOpts().Routes {
379,192✔
2883
                if urlsAreEqual(ri, rURL) {
379,189✔
2884
                        return true
114,443✔
2885
                }
114,443✔
2886
        }
2887
        return false
3✔
2888
}
2889

2890
// connectToRoute tries to establish a TCP connection to the route at `rURL`
// and, on success, hands the connection to createRoute and returns.
//
// Explicit routes are retried for as long as the server is running, the URL is
// still listed in the configured routes and, when `accName` is set, an entry
// for that account still exists in s.accRoutes. Other route types are retried
// at most Cluster.ConnectRetries times. `firstConnect` is passed to
// shouldReportConnectErr to decide whether a failure is logged as an error or
// at debug level.
//
// The server's wait group is signaled when the function returns.
func (s *Server) connectToRoute(rURL *url.URL, rtype RouteType, firstConnect bool, gossipMode byte, accName string) {
	defer s.grWG.Done()
	if rURL == nil {
		return
	}
	// For explicit routes, we will try to connect until we succeed. For implicit
	// we will try only based on the number of ConnectRetries option.
	tryForEver := rtype == Explicit

	// Snapshot server options.
	opts := s.getOpts()

	const connErrFmt = "Error trying to connect to route (attempt %v): %v"

	// Grab the resolver and the set of addresses that point back to this
	// server under the server lock.
	s.mu.RLock()
	resolver := s.routeResolver
	excludedAddresses := s.routesToSelf
	s.mu.RUnlock()

	attemptDelay := routeConnectDelay
	for attempts := 0; s.isRunning(); {
		if tryForEver {
			// Stop if the route has been removed from the configuration.
			if !s.routeStillValid(rURL) {
				return
			}
			// For a per-account route, stop if the account no longer has
			// an entry in s.accRoutes.
			if accName != _EMPTY_ {
				s.mu.RLock()
				_, valid := s.accRoutes[accName]
				s.mu.RUnlock()
				if !valid {
					return
				}
			}
		}
		var conn net.Conn
		address, err := s.getRandomIP(resolver, rURL.Host, excludedAddresses)
		if err == errNoIPAvail {
			// This is ok, we are done.
			return
		}
		if err == nil {
			s.Debugf("Trying to connect to route on %s (%s)", rURL.Host, address)
			conn, err = natsDialTimeout("tcp", address, DEFAULT_ROUTE_DIAL)
		}
		// Either the address lookup or the dial failed.
		if err != nil {
			attempts++
			if s.shouldReportConnectErr(firstConnect, attempts) {
				s.Errorf(connErrFmt, attempts, err)
			} else {
				s.Debugf(connErrFmt, attempts, err)
			}
			if !tryForEver {
				// No retries configured, or retries exhausted: give up.
				if opts.Cluster.ConnectRetries <= 0 {
					return
				}
				if attempts > opts.Cluster.ConnectRetries {
					return
				}
			}
			// Wait before the next attempt, unless the server is quitting.
			select {
			case <-s.quitCh:
				return
			case <-time.After(attemptDelay):
				if opts.Cluster.ConnectBackoff {
					// Use exponential backoff for connection attempts.
					attemptDelay *= 2
					if attemptDelay > routeConnectMaxDelay {
						attemptDelay = routeConnectMaxDelay
					}
				}
				continue
			}
		}

		// The route may have been removed from the configuration while we
		// were connecting; if so, drop the connection.
		if tryForEver && !s.routeStillValid(rURL) {
			conn.Close()
			return
		}

		// We have a route connection here.
		// Go ahead and create it and exit this func.
		s.createRoute(conn, rURL, rtype, gossipMode, accName)
		return
	}
}
2975

2976
func (c *client) isSolicitedRoute() bool {
50,136✔
2977
        c.mu.Lock()
50,136✔
2978
        defer c.mu.Unlock()
50,136✔
2979
        return c.kind == ROUTER && c.route != nil && c.route.didSolicit
50,136✔
2980
}
50,136✔
2981

2982
// Save the first hostname found in route URLs. This will be used in gossip mode
2983
// when trying to create a TLS connection by setting the tlsConfig.ServerName.
2984
// Lock is held on entry
2985
func (s *Server) saveRouteTLSName(routes []*url.URL) {
4,568✔
2986
        for _, u := range routes {
15,471✔
2987
                if s.routeTLSName == _EMPTY_ && net.ParseIP(u.Hostname()) == nil {
10,910✔
2988
                        s.routeTLSName = u.Hostname()
7✔
2989
                }
7✔
2990
        }
2991
}
2992

2993
// Start connection process to provided routes. Each route connection will
2994
// be started in a dedicated go routine.
2995
// Lock is held on entry
2996
func (s *Server) solicitRoutes(routes []*url.URL, accounts []string) {
4,568✔
2997
        s.saveRouteTLSName(routes)
4,568✔
2998
        for _, r := range routes {
15,471✔
2999
                route := r
10,903✔
3000
                s.startGoRoutine(func() { s.connectToRoute(route, Explicit, true, gossipDefault, _EMPTY_) })
21,806✔
3001
        }
3002
        // Now go over possible per-account routes and create them.
3003
        for _, an := range accounts {
8,313✔
3004
                for _, r := range routes {
14,173✔
3005
                        route, accName := r, an
10,428✔
3006
                        s.startGoRoutine(func() { s.connectToRoute(route, Explicit, true, gossipDefault, accName) })
20,856✔
3007
                }
3008
        }
3009
}
3010

3011
func (c *client) processRouteConnect(srv *Server, arg []byte, lang string) error {
30,688✔
3012
        // Way to detect clients that incorrectly connect to the route listen
30,688✔
3013
        // port. Client provide Lang in the CONNECT protocol while ROUTEs don't.
30,688✔
3014
        if lang != "" {
30,710✔
3015
                c.sendErrAndErr(ErrClientConnectedToRoutePort.Error())
22✔
3016
                c.closeConnection(WrongPort)
22✔
3017
                return ErrClientConnectedToRoutePort
22✔
3018
        }
22✔
3019
        // Unmarshal as a route connect protocol
3020
        proto := &connectInfo{}
30,666✔
3021

30,666✔
3022
        if err := json.Unmarshal(arg, proto); err != nil {
30,666✔
3023
                return err
×
3024
        }
×
3025
        // Reject if this has Gateway which means that it would be from a gateway
3026
        // connection that incorrectly connects to the Route port.
3027
        if proto.Gateway != "" {
30,666✔
3028
                errTxt := fmt.Sprintf("Rejecting connection from gateway %q on the Route port", proto.Gateway)
×
3029
                c.Errorf(errTxt)
×
3030
                c.sendErr(errTxt)
×
3031
                c.closeConnection(WrongGateway)
×
3032
                return ErrWrongGateway
×
3033
        }
×
3034

3035
        if srv == nil {
30,666✔
3036
                return ErrServerNotRunning
×
3037
        }
×
3038

3039
        perms := srv.getOpts().Cluster.Permissions
30,666✔
3040
        clusterName := srv.ClusterName()
30,666✔
3041

30,666✔
3042
        // If we have a cluster name set, make sure it matches ours.
30,666✔
3043
        if proto.Cluster != clusterName {
30,702✔
3044
                shouldReject := true
36✔
3045
                // If we have a dynamic name we will do additional checks.
36✔
3046
                if srv.isClusterNameDynamic() {
69✔
3047
                        if !proto.Dynamic || strings.Compare(clusterName, proto.Cluster) < 0 {
54✔
3048
                                // We will take on their name since theirs is configured or higher then ours.
21✔
3049
                                srv.setClusterName(proto.Cluster)
21✔
3050
                                if !proto.Dynamic {
22✔
3051
                                        srv.optsMu.Lock()
1✔
3052
                                        srv.opts.Cluster.Name = proto.Cluster
1✔
3053
                                        srv.optsMu.Unlock()
1✔
3054
                                }
1✔
3055
                                c.mu.Lock()
21✔
3056
                                remoteID := c.opts.Name
21✔
3057
                                c.mu.Unlock()
21✔
3058
                                srv.removeAllRoutesExcept(remoteID)
21✔
3059
                                shouldReject = false
21✔
3060
                        }
3061
                }
3062
                if shouldReject {
51✔
3063
                        errTxt := fmt.Sprintf("Rejecting connection, cluster name %q does not match %q", proto.Cluster, clusterName)
15✔
3064
                        c.Errorf(errTxt)
15✔
3065
                        c.sendErr(errTxt)
15✔
3066
                        c.closeConnection(ClusterNameConflict)
15✔
3067
                        return ErrClusterNameRemoteConflict
15✔
3068
                }
15✔
3069
        }
3070

3071
        supportsHeaders := c.srv.supportsHeaders()
30,651✔
3072

30,651✔
3073
        // Grab connection name of remote route.
30,651✔
3074
        c.mu.Lock()
30,651✔
3075
        c.route.remoteID = c.opts.Name
30,651✔
3076
        c.route.lnoc = proto.LNOC
30,651✔
3077
        c.route.lnocu = proto.LNOCU
30,651✔
3078
        c.setRoutePermissions(perms)
30,651✔
3079
        c.headers = supportsHeaders && proto.Headers
30,651✔
3080
        c.mu.Unlock()
30,651✔
3081
        return nil
30,651✔
3082
}
3083

3084
// Called when we update our cluster name during negotiations with remotes.
3085
func (s *Server) removeAllRoutesExcept(remoteID string) {
34✔
3086
        s.mu.Lock()
34✔
3087
        routes := make([]*client, 0, s.numRoutes())
34✔
3088
        for rID, conns := range s.routes {
34✔
3089
                if rID == remoteID {
×
3090
                        continue
×
3091
                }
3092
                for _, r := range conns {
×
3093
                        if r != nil {
×
3094
                                routes = append(routes, r)
×
3095
                        }
×
3096
                }
3097
        }
3098
        for _, conns := range s.accRoutes {
38✔
3099
                for rID, r := range conns {
4✔
3100
                        if rID == remoteID {
×
3101
                                continue
×
3102
                        }
3103
                        routes = append(routes, r)
×
3104
                }
3105
        }
3106
        s.mu.Unlock()
34✔
3107

34✔
3108
        for _, r := range routes {
34✔
3109
                r.closeConnection(ClusterNameConflict)
×
3110
        }
×
3111
}
3112

3113
func (s *Server) removeRoute(c *client) {
61,514✔
3114
        s.mu.Lock()
61,514✔
3115
        defer s.mu.Unlock()
61,514✔
3116

61,514✔
3117
        var (
61,514✔
3118
                rID           string
61,514✔
3119
                lnURL         string
61,514✔
3120
                gwURL         string
61,514✔
3121
                idHash        string
61,514✔
3122
                accName       string
61,514✔
3123
                poolIdx       = -1
61,514✔
3124
                connectURLs   []string
61,514✔
3125
                wsConnectURLs []string
61,514✔
3126
                opts          = s.getOpts()
61,514✔
3127
                rURL          *url.URL
61,514✔
3128
                noPool        bool
61,514✔
3129
                rtype         RouteType
61,514✔
3130
        )
61,514✔
3131
        c.mu.Lock()
61,514✔
3132
        cid := c.cid
61,514✔
3133
        r := c.route
61,514✔
3134
        if r != nil {
123,028✔
3135
                rID = r.remoteID
61,514✔
3136
                lnURL = r.leafnodeURL
61,514✔
3137
                idHash = r.idHash
61,514✔
3138
                gwURL = r.gatewayURL
61,514✔
3139
                poolIdx = r.poolIdx
61,514✔
3140
                accName = bytesToString(r.accName)
61,514✔
3141
                if r.noPool {
62,671✔
3142
                        s.routesNoPool--
1,157✔
3143
                        noPool = true
1,157✔
3144
                }
1,157✔
3145
                connectURLs = r.connectURLs
61,514✔
3146
                wsConnectURLs = r.wsConnURLs
61,514✔
3147
                rURL = r.url
61,514✔
3148
                rtype = r.routeType
61,514✔
3149
        }
3150
        c.mu.Unlock()
61,514✔
3151
        if accName != _EMPTY_ {
76,241✔
3152
                if conns, ok := s.accRoutes[accName]; ok {
29,452✔
3153
                        if r := conns[rID]; r == c {
22,710✔
3154
                                s.removeRouteByHash(idHash + accName)
7,985✔
3155
                                delete(conns, rID)
7,985✔
3156
                                // Do not remove or set to nil when all remotes have been
7,985✔
3157
                                // removed from the map. The configured accounts must always
7,985✔
3158
                                // be in the accRoutes map and addRoute expects "conns" map
7,985✔
3159
                                // to be created.
7,985✔
3160
                        }
7,985✔
3161
                }
3162
        }
3163
        // If this is still -1, it means that it was not added to the routes
3164
        // so simply remove from temp clients and we are done.
3165
        if poolIdx == -1 || accName != _EMPTY_ {
97,376✔
3166
                s.removeFromTempClients(cid)
35,862✔
3167
                return
35,862✔
3168
        }
35,862✔
3169
        if conns, ok := s.routes[rID]; ok {
51,304✔
3170
                // If this route was not the one stored, simply remove from the
25,652✔
3171
                // temporary map and be done.
25,652✔
3172
                if conns[poolIdx] != c {
25,719✔
3173
                        s.removeFromTempClients(cid)
67✔
3174
                        return
67✔
3175
                }
67✔
3176
                conns[poolIdx] = nil
25,585✔
3177
                // Now check if this was the last connection to be removed.
25,585✔
3178
                empty := true
25,585✔
3179
                for _, c := range conns {
82,776✔
3180
                        if c != nil {
73,597✔
3181
                                empty = false
16,406✔
3182
                                break
16,406✔
3183
                        }
3184
                }
3185
                // This was the last route for this remote. Remove the remote entry
3186
                // and possibly send some async INFO protocols regarding gateway
3187
                // and leafnode URLs.
3188
                if empty {
34,764✔
3189
                        delete(s.routes, rID)
9,179✔
3190

9,179✔
3191
                        // Since this is the last route for this remote, possibly update
9,179✔
3192
                        // the client connect URLs and send an update to connected
9,179✔
3193
                        // clients.
9,179✔
3194
                        if (len(connectURLs) > 0 || len(wsConnectURLs) > 0) && !opts.Cluster.NoAdvertise {
18,267✔
3195
                                s.removeConnectURLsAndSendINFOToClients(connectURLs, wsConnectURLs)
9,088✔
3196
                        }
9,088✔
3197
                        // Remove the remote's gateway URL from our list and
3198
                        // send update to inbound Gateway connections.
3199
                        if gwURL != _EMPTY_ && s.removeGatewayURL(gwURL) {
9,938✔
3200
                                s.sendAsyncGatewayInfo()
759✔
3201
                        }
759✔
3202
                        // Remove the remote's leafNode URL from
3203
                        // our list and send update to LN connections.
3204
                        if lnURL != _EMPTY_ && s.removeLeafNodeURL(lnURL) {
12,656✔
3205
                                s.sendAsyncLeafNodeInfo()
3,477✔
3206
                        }
3,477✔
3207
                        // We can remove the configured route pool size of this remote.
3208
                        delete(s.remoteRoutePoolSize, rID)
9,179✔
3209
                        // If this server has pooling/pinned accounts and the route for
9,179✔
3210
                        // this remote was a "no pool" route, attempt to reconnect.
9,179✔
3211
                        if noPool {
10,269✔
3212
                                if s.routesPoolSize > 1 {
1,099✔
3213
                                        s.startGoRoutine(func() { s.connectToRoute(rURL, rtype, true, gossipDefault, _EMPTY_) })
17✔
3214
                                }
3215
                                if len(opts.Cluster.PinnedAccounts) > 0 {
1,100✔
3216
                                        for _, an := range opts.Cluster.PinnedAccounts {
22✔
3217
                                                accName := an
12✔
3218
                                                s.startGoRoutine(func() { s.connectToRoute(rURL, rtype, true, gossipDefault, accName) })
23✔
3219
                                        }
3220
                                }
3221
                        }
3222
                }
3223
                // This is for gateway code. Remove this route from a map that uses
3224
                // the route hash in combination with the pool index as the key.
3225
                if s.routesPoolSize > 1 {
50,080✔
3226
                        idHash += strconv.Itoa(poolIdx)
24,495✔
3227
                }
24,495✔
3228
                s.removeRouteByHash(idHash)
25,585✔
3229
        }
3230
        s.removeFromTempClients(cid)
25,585✔
3231
}
3232

3233
func (s *Server) isDuplicateServerName(name string) bool {
14,384✔
3234
        if name == _EMPTY_ {
24,571✔
3235
                return false
10,187✔
3236
        }
10,187✔
3237
        s.mu.RLock()
4,197✔
3238
        defer s.mu.RUnlock()
4,197✔
3239

4,197✔
3240
        if s.info.Name == name {
4,202✔
3241
                return true
5✔
3242
        }
5✔
3243
        for _, conns := range s.routes {
11,026✔
3244
                for _, r := range conns {
13,668✔
3245
                        if r != nil {
13,668✔
3246
                                r.mu.Lock()
6,834✔
3247
                                duplicate := r.route.remoteName == name
6,834✔
3248
                                r.mu.Unlock()
6,834✔
3249
                                if duplicate {
6,838✔
3250
                                        return true
4✔
3251
                                }
4✔
3252
                                break
6,830✔
3253
                        }
3254
                }
3255
        }
3256
        return false
4,188✔
3257
}
3258

3259
// Goes over each non-nil route connection for all remote servers
3260
// and invokes the function `f`. It does not go over per-account
3261
// routes.
3262
// Server lock is held on entry.
3263
func (s *Server) forEachNonPerAccountRoute(f func(r *client)) {
1,550,186✔
3264
        for _, conns := range s.routes {
2,096,297✔
3265
                for _, r := range conns {
2,179,211✔
3266
                        if r != nil {
3,231,080✔
3267
                                f(r)
1,597,980✔
3268
                        }
1,597,980✔
3269
                }
3270
        }
3271
}
3272

3273
// Goes over each non-nil route connection for all remote servers
3274
// and invokes the function `f`. This also includes the per-account
3275
// routes.
3276
// Server lock is held on entry.
3277
func (s *Server) forEachRoute(f func(r *client)) {
1,550,158✔
3278
        s.forEachNonPerAccountRoute(f)
1,550,158✔
3279
        for _, conns := range s.accRoutes {
1,585,598✔
3280
                for _, r := range conns {
111,954✔
3281
                        f(r)
76,514✔
3282
                }
76,514✔
3283
        }
3284
}
3285

3286
// Goes over each non-nil route connection at the given pool index
3287
// location in the slice and invokes the function `f`. If the
3288
// callback returns `true`, this function moves to the next remote.
3289
// Otherwise, the iteration over removes stops.
3290
// This does not include per-account routes.
3291
// Server lock is held on entry.
3292
func (s *Server) forEachRouteIdx(idx int, f func(r *client) bool) {
33✔
3293
        for _, conns := range s.routes {
95✔
3294
                if r := conns[idx]; r != nil {
123✔
3295
                        if !f(r) {
63✔
3296
                                return
2✔
3297
                        }
2✔
3298
                }
3299
        }
3300
}
3301

3302
// Goes over each remote and for the first non nil route connection,
3303
// invokes the function `f`.
3304
// Server lock is held on entry.
3305
func (s *Server) forEachRemote(f func(r *client)) {
12,068✔
3306
        for _, conns := range s.routes {
36,496✔
3307
                for _, r := range conns {
48,860✔
3308
                        if r != nil {
48,860✔
3309
                                f(r)
24,428✔
3310
                                break
24,428✔
3311
                        }
3312
                }
3313
        }
3314
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc