• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

nats-io / nats-server / 24949216239

24 Apr 2026 08:34AM UTC coverage: 80.645% (-2.4%) from 83.05%
24949216239

push

github

web-flow
(2.14) [ADDED] `RemoteLeafOpts.IgnoreDiscoveredServers` option (#8067)

For a given leafnode remote, if this is set to true, this remote will
ignore any server leafnode URLs returned by the hub, allowing the user
to fully manage the servers this remote can connect to.

Resolves #8002

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>

74685 of 92610 relevant lines covered (80.64%)

632737.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.27
/server/reload.go
1
// Copyright 2017-2026 The NATS Authors
2
// Licensed under the Apache License, Version 2.0 (the "License");
3
// you may not use this file except in compliance with the License.
4
// You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software
9
// distributed under the License is distributed on an "AS IS" BASIS,
10
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11
// See the License for the specific language governing permissions and
12
// limitations under the License.
13

14
package server
15

16
import (
17
        "cmp"
18
        "crypto/tls"
19
        "errors"
20
        "fmt"
21
        "net/url"
22
        "reflect"
23
        "slices"
24
        "strings"
25
        "sync/atomic"
26
        "time"
27

28
        "github.com/klauspost/compress/s2"
29

30
        "github.com/nats-io/jwt/v2"
31
        "github.com/nats-io/nuid"
32
)
33

34
// FlagSnapshot captures the server options as specified by CLI flags at
35
// startup. This should not be modified once the server has started.
36
var FlagSnapshot *Options
37

38
type reloadContext struct {
39
        oldClusterPerms *RoutePermissions
40
}
41

42
// option is a hot-swappable configuration setting.
43
type option interface {
44
        // Apply the server option.
45
        Apply(server *Server)
46

47
        // IsLoggingChange indicates if this option requires reloading the logger.
48
        IsLoggingChange() bool
49

50
        // IsTraceLevelChange indicates if this option requires reloading cached trace level.
51
        // Clients store trace level separately.
52
        IsTraceLevelChange() bool
53

54
        // IsAuthChange indicates if this option requires reloading authorization.
55
        IsAuthChange() bool
56

57
        // IsTLSChange indicates if this option requires reloading TLS.
58
        IsTLSChange() bool
59

60
        // IsClusterPermsChange indicates if this option requires reloading
61
        // cluster permissions.
62
        IsClusterPermsChange() bool
63

64
        // IsClusterPoolSizeOrAccountsChange indicates if this option requires
65
        // special handling for changes in cluster's pool size or accounts list.
66
        IsClusterPoolSizeOrAccountsChange() bool
67

68
        // IsJetStreamChange inidicates a change in the servers config for JetStream.
69
        // Account changes will be handled separately in reloadAuthorization.
70
        IsJetStreamChange() bool
71

72
        // Indicates a change in the server that requires publishing the server's statz
73
        IsStatszChange() bool
74
}
75

76
// noopOption is a base struct that provides default no-op behaviors.
77
type noopOption struct{}
78

79
func (n noopOption) IsLoggingChange() bool {
2,351✔
80
        return false
2,351✔
81
}
2,351✔
82

83
func (n noopOption) IsTraceLevelChange() bool {
2,351✔
84
        return false
2,351✔
85
}
2,351✔
86

87
func (n noopOption) IsAuthChange() bool {
66✔
88
        return false
66✔
89
}
66✔
90

91
func (n noopOption) IsTLSChange() bool {
×
92
        return false
×
93
}
×
94

95
func (n noopOption) IsClusterPermsChange() bool {
2,328✔
96
        return false
2,328✔
97
}
2,328✔
98

99
func (n noopOption) IsClusterPoolSizeOrAccountsChange() bool {
2,328✔
100
        return false
2,328✔
101
}
2,328✔
102

103
func (n noopOption) IsJetStreamChange() bool {
2,348✔
104
        return false
2,348✔
105
}
2,348✔
106

107
func (n noopOption) IsStatszChange() bool {
2,347✔
108
        return false
2,347✔
109
}
2,347✔
110

111
// loggingOption is a base struct that provides default option behaviors for
112
// logging-related options.
113
type loggingOption struct {
114
        noopOption
115
}
116

117
func (l loggingOption) IsLoggingChange() bool {
×
118
        return true
×
119
}
×
120

121
// traceLevelOption is a base struct that provides default option behaviors for
122
// tracelevel-related options.
123
type traceLevelOption struct {
124
        loggingOption
125
}
126

127
func (l traceLevelOption) IsTraceLevelChange() bool {
×
128
        return true
×
129
}
×
130

131
// traceOption implements the option interface for the `trace` setting.
132
type traceOption struct {
133
        traceLevelOption
134
        newValue bool
135
}
136

137
// Apply is a no-op because logging will be reloaded after options are applied.
138
func (t *traceOption) Apply(server *Server) {
×
139
        server.Noticef("Reloaded: trace = %v", t.newValue)
×
140
}
×
141

142
// traceVersboseOption implements the option interface for the `trace_verbose` setting.
143
type traceVerboseOption struct {
144
        traceLevelOption
145
        newValue bool
146
}
147

148
// Apply is a no-op because logging will be reloaded after options are applied.
149
func (t *traceVerboseOption) Apply(server *Server) {
×
150
        server.Noticef("Reloaded: trace_verbose = %v", t.newValue)
×
151
}
×
152

153
// traceHeadersOption implements the option interface for the `trace_headers` setting.
154
type traceHeadersOption struct {
155
        traceLevelOption
156
        newValue bool
157
}
158

159
// Apply is a no-op because logging will be reloaded after options are applied.
160
func (t *traceHeadersOption) Apply(server *Server) {
×
161
        server.Noticef("Reloaded: trace_headers = %v", t.newValue)
×
162
}
×
163

164
// debugOption implements the option interface for the `debug` setting.
165
type debugOption struct {
166
        loggingOption
167
        newValue bool
168
}
169

170
// Apply is mostly a no-op because logging will be reloaded after options are applied.
171
// However we will kick the raft nodes if they exist to reload.
172
func (d *debugOption) Apply(server *Server) {
×
173
        server.Noticef("Reloaded: debug = %v", d.newValue)
×
174
        server.reloadDebugRaftNodes(d.newValue)
×
175
}
×
176

177
// logtimeOption implements the option interface for the `logtime` setting.
178
type logtimeOption struct {
179
        loggingOption
180
        newValue bool
181
}
182

183
// Apply is a no-op because logging will be reloaded after options are applied.
184
func (l *logtimeOption) Apply(server *Server) {
×
185
        server.Noticef("Reloaded: logtime = %v", l.newValue)
×
186
}
×
187

188
// logtimeUTCOption implements the option interface for the `logtime_utc` setting.
189
type logtimeUTCOption struct {
190
        loggingOption
191
        newValue bool
192
}
193

194
// Apply is a no-op because logging will be reloaded after options are applied.
195
func (l *logtimeUTCOption) Apply(server *Server) {
×
196
        server.Noticef("Reloaded: logtime_utc = %v", l.newValue)
×
197
}
×
198

199
// logfileOption implements the option interface for the `log_file` setting.
200
type logfileOption struct {
201
        loggingOption
202
        newValue string
203
}
204

205
// Apply is a no-op because logging will be reloaded after options are applied.
206
func (l *logfileOption) Apply(server *Server) {
×
207
        server.Noticef("Reloaded: log_file = %v", l.newValue)
×
208
}
×
209

210
// syslogOption implements the option interface for the `syslog` setting.
211
type syslogOption struct {
212
        loggingOption
213
        newValue bool
214
}
215

216
// Apply is a no-op because logging will be reloaded after options are applied.
217
func (s *syslogOption) Apply(server *Server) {
×
218
        server.Noticef("Reloaded: syslog = %v", s.newValue)
×
219
}
×
220

221
// remoteSyslogOption implements the option interface for the `remote_syslog`
222
// setting.
223
type remoteSyslogOption struct {
224
        loggingOption
225
        newValue string
226
}
227

228
// Apply is a no-op because logging will be reloaded after options are applied.
229
func (r *remoteSyslogOption) Apply(server *Server) {
×
230
        server.Noticef("Reloaded: remote_syslog = %v", r.newValue)
×
231
}
×
232

233
// tlsOption implements the option interface for the `tls` setting.
234
type tlsOption struct {
235
        noopOption
236
        newValue *tls.Config
237
}
238

239
// Apply the tls change.
240
func (t *tlsOption) Apply(server *Server) {
18✔
241
        server.mu.Lock()
18✔
242
        tlsRequired := t.newValue != nil
18✔
243
        server.info.TLSRequired = tlsRequired && !server.getOpts().AllowNonTLS
18✔
244
        message := "disabled"
18✔
245
        if tlsRequired {
36✔
246
                server.info.TLSVerify = (t.newValue.ClientAuth == tls.RequireAndVerifyClientCert)
18✔
247
                message = "enabled"
18✔
248
        }
18✔
249
        server.mu.Unlock()
18✔
250
        server.Noticef("Reloaded: tls = %s", message)
18✔
251
}
252

253
func (t *tlsOption) IsTLSChange() bool {
×
254
        return true
×
255
}
×
256

257
// tlsTimeoutOption implements the option interface for the tls `timeout`
258
// setting.
259
type tlsTimeoutOption struct {
260
        noopOption
261
        newValue float64
262
}
263

264
// Apply is a no-op because the timeout will be reloaded after options are
265
// applied.
266
func (t *tlsTimeoutOption) Apply(server *Server) {
×
267
        server.Noticef("Reloaded: tls timeout = %v", t.newValue)
×
268
}
×
269

270
// tlsPinnedCertOption implements the option interface for the tls `pinned_certs` setting.
271
type tlsPinnedCertOption struct {
272
        noopOption
273
        newValue PinnedCertSet
274
}
275

276
// Apply is a no-op because the pinned certs will be reloaded after options are  applied.
277
func (t *tlsPinnedCertOption) Apply(server *Server) {
1✔
278
        server.Noticef("Reloaded: %d pinned_certs", len(t.newValue))
1✔
279
}
1✔
280

281
// tlsHandshakeFirst implements the option interface for the tls `handshake first` setting.
282
type tlsHandshakeFirst struct {
283
        noopOption
284
        newValue bool
285
}
286

287
// Apply is a no-op because the timeout will be reloaded after options are applied.
288
func (t *tlsHandshakeFirst) Apply(server *Server) {
2✔
289
        server.Noticef("Reloaded: Client TLS handshake first: %v", t.newValue)
2✔
290
}
2✔
291

292
// tlsHandshakeFirstFallback implements the option interface for the tls `handshake first fallback delay` setting.
293
type tlsHandshakeFirstFallback struct {
294
        noopOption
295
        newValue time.Duration
296
}
297

298
// Apply is a no-op because the timeout will be reloaded after options are applied.
299
func (t *tlsHandshakeFirstFallback) Apply(server *Server) {
2✔
300
        server.Noticef("Reloaded: Client TLS handshake first fallback delay: %v", t.newValue)
2✔
301
}
2✔
302

303
// authOption is a base struct that provides default option behaviors.
304
type authOption struct {
305
        noopOption
306
}
307

308
func (o authOption) IsAuthChange() bool {
2,285✔
309
        return true
2,285✔
310
}
2,285✔
311

312
// usernameOption implements the option interface for the `username` setting.
313
type usernameOption struct {
314
        authOption
315
}
316

317
// Apply is a no-op because authorization will be reloaded after options are
318
// applied.
319
func (u *usernameOption) Apply(server *Server) {
×
320
        server.Noticef("Reloaded: authorization username")
×
321
}
×
322

323
// passwordOption implements the option interface for the `password` setting.
324
type passwordOption struct {
325
        authOption
326
}
327

328
// Apply is a no-op because authorization will be reloaded after options are
329
// applied.
330
func (p *passwordOption) Apply(server *Server) {
×
331
        server.Noticef("Reloaded: authorization password")
×
332
}
×
333

334
// authorizationOption implements the option interface for the `token`
335
// authorization setting.
336
type authorizationOption struct {
337
        authOption
338
}
339

340
// Apply is a no-op because authorization will be reloaded after options are
341
// applied.
342
func (a *authorizationOption) Apply(server *Server) {
×
343
        server.Noticef("Reloaded: authorization token")
×
344
}
×
345

346
// authTimeoutOption implements the option interface for the authorization
347
// `timeout` setting.
348
type authTimeoutOption struct {
349
        noopOption // Not authOption because this is a no-op; will be reloaded with options.
350
        newValue   float64
351
}
352

353
// Apply is a no-op because the timeout will be reloaded after options are
354
// applied.
355
func (a *authTimeoutOption) Apply(server *Server) {
×
356
        server.Noticef("Reloaded: authorization timeout = %v", a.newValue)
×
357
}
×
358

359
// tagsOption implements the option interface for the `tags` setting.
360
type tagsOption struct {
361
        noopOption // Not authOption because this is a no-op; will be reloaded with options.
362
}
363

364
func (u *tagsOption) Apply(server *Server) {
1✔
365
        server.Noticef("Reloaded: tags")
1✔
366
}
1✔
367

368
func (u *tagsOption) IsStatszChange() bool {
1✔
369
        return true
1✔
370
}
1✔
371

372
// metadataOption implements the option interface for the `metadata` setting.
373
type metadataOption struct {
374
        noopOption // Not authOption because this is a no-op; will be reloaded with options.
375
}
376

377
func (u *metadataOption) Apply(server *Server) {
×
378
        server.Noticef("Reloaded: metadata")
×
379
}
×
380

381
func (u *metadataOption) IsStatszChange() bool {
×
382
        return true
×
383
}
×
384

385
// usersOption implements the option interface for the authorization `users`
386
// setting.
387
type usersOption struct {
388
        authOption
389
}
390

391
func (u *usersOption) Apply(server *Server) {
1,127✔
392
        server.Noticef("Reloaded: authorization users")
1,127✔
393
}
1,127✔
394

395
// nkeysOption implements the option interface for the authorization `users`
396
// setting.
397
type nkeysOption struct {
398
        authOption
399
}
400

401
func (u *nkeysOption) Apply(server *Server) {
×
402
        server.Noticef("Reloaded: authorization nkey users")
×
403
}
×
404

405
// clusterOption implements the option interface for the `cluster` setting.
406
type clusterOption struct {
407
        authOption
408
        newValue        ClusterOpts
409
        permsChanged    bool
410
        accsAdded       []string
411
        accsRemoved     []string
412
        poolSizeChanged bool
413
        compressChanged bool
414
}
415

416
// Apply the cluster change.
417
func (c *clusterOption) Apply(s *Server) {
23✔
418
        // TODO: support enabling/disabling clustering.
23✔
419
        s.mu.Lock()
23✔
420
        tlsRequired := c.newValue.TLSConfig != nil
23✔
421
        s.routeInfo.TLSRequired = tlsRequired
23✔
422
        s.routeInfo.TLSVerify = tlsRequired
23✔
423
        s.routeInfo.AuthRequired = c.newValue.Username != ""
23✔
424
        if c.newValue.NoAdvertise {
23✔
425
                s.routeInfo.ClientConnectURLs = nil
×
426
                s.routeInfo.WSConnectURLs = nil
×
427
        } else {
23✔
428
                s.routeInfo.ClientConnectURLs = s.clientConnectURLs
23✔
429
                s.routeInfo.WSConnectURLs = s.websocket.connectURLs
23✔
430
        }
23✔
431
        s.setRouteInfoHostPortAndIP()
23✔
432
        var routes []*client
23✔
433
        if c.compressChanged {
42✔
434
                co := &s.getOpts().Cluster.Compression
19✔
435
                newMode := co.Mode
19✔
436
                s.forEachRoute(func(r *client) {
20✔
437
                        r.mu.Lock()
1✔
438
                        // Skip routes that are "not supported" (because they will never do
1✔
439
                        // compression) or the routes that have already the new compression
1✔
440
                        // mode.
1✔
441
                        if r.route.compression == CompressionNotSupported || r.route.compression == newMode {
1✔
442
                                r.mu.Unlock()
×
443
                                return
×
444
                        }
×
445
                        // We need to close the route if it had compression "off" or the new
446
                        // mode is compression "off", or if the new mode is "accept", because
447
                        // these require negotiation.
448
                        if r.route.compression == CompressionOff || newMode == CompressionOff || newMode == CompressionAccept {
2✔
449
                                routes = append(routes, r)
1✔
450
                        } else if newMode == CompressionS2Auto {
1✔
451
                                // If the mode is "s2_auto", we need to check if there is really
×
452
                                // need to change, and at any rate, we want to save the actual
×
453
                                // compression level here, not s2_auto.
×
454
                                r.updateS2AutoCompressionLevel(co, &r.route.compression)
×
455
                        } else {
×
456
                                // Simply change the compression writer
×
457
                                r.out.cw = s2.NewWriter(nil, s2WriterOptions(newMode)...)
×
458
                                r.route.compression = newMode
×
459
                        }
×
460
                        r.mu.Unlock()
1✔
461
                })
462
        }
463
        s.mu.Unlock()
23✔
464
        if c.newValue.Name != "" && c.newValue.Name != s.ClusterName() {
23✔
465
                s.setClusterName(c.newValue.Name)
×
466
        }
×
467
        for _, r := range routes {
24✔
468
                r.closeConnection(ClientClosed)
1✔
469
        }
1✔
470
        s.Noticef("Reloaded: cluster")
23✔
471
        if tlsRequired && c.newValue.TLSConfig.InsecureSkipVerify {
24✔
472
                s.Warnf(clusterTLSInsecureWarning)
1✔
473
        }
1✔
474
}
475

476
func (c *clusterOption) IsClusterPermsChange() bool {
23✔
477
        return c.permsChanged
23✔
478
}
23✔
479

480
func (c *clusterOption) IsClusterPoolSizeOrAccountsChange() bool {
23✔
481
        return c.poolSizeChanged || len(c.accsAdded) > 0 || len(c.accsRemoved) > 0
23✔
482
}
23✔
483

484
func (c *clusterOption) diffPoolAndAccounts(old *ClusterOpts) {
23✔
485
        c.poolSizeChanged = c.newValue.PoolSize != old.PoolSize
23✔
486
addLoop:
23✔
487
        for _, na := range c.newValue.PinnedAccounts {
24✔
488
                for _, oa := range old.PinnedAccounts {
1✔
489
                        if na == oa {
×
490
                                continue addLoop
×
491
                        }
492
                }
493
                c.accsAdded = append(c.accsAdded, na)
1✔
494
        }
495
removeLoop:
23✔
496
        for _, oa := range old.PinnedAccounts {
23✔
497
                for _, na := range c.newValue.PinnedAccounts {
×
498
                        if oa == na {
×
499
                                continue removeLoop
×
500
                        }
501
                }
502
                c.accsRemoved = append(c.accsRemoved, oa)
×
503
        }
504
}
505

506
// routesOption implements the option interface for the cluster `routes`
507
// setting.
508
type routesOption struct {
509
        noopOption
510
        add    []*url.URL
511
        remove []*url.URL
512
}
513

514
// Apply the route changes by adding and removing the necessary routes.
515
func (r *routesOption) Apply(server *Server) {
2✔
516
        server.mu.Lock()
2✔
517
        routes := make([]*client, server.numRoutes())
2✔
518
        i := 0
2✔
519
        server.forEachRoute(func(r *client) {
10✔
520
                routes[i] = r
8✔
521
                i++
8✔
522
        })
8✔
523
        // If there was a change, notify monitoring code that it should
524
        // update the route URLs if /varz endpoint is inspected.
525
        if len(r.add)+len(r.remove) > 0 {
4✔
526
                server.varzUpdateRouteURLs = true
2✔
527
        }
2✔
528
        server.mu.Unlock()
2✔
529

2✔
530
        // Remove routes.
2✔
531
        for _, remove := range r.remove {
4✔
532
                for _, client := range routes {
10✔
533
                        var url *url.URL
8✔
534
                        client.mu.Lock()
8✔
535
                        if client.route != nil {
16✔
536
                                url = client.route.url
8✔
537
                        }
8✔
538
                        client.mu.Unlock()
8✔
539
                        if url != nil && urlsAreEqual(url, remove) {
12✔
540
                                // Do not attempt to reconnect when route is removed.
4✔
541
                                client.setNoReconnect()
4✔
542
                                client.closeConnection(RouteRemoved)
4✔
543
                                server.Noticef("Removed route %v", remove)
4✔
544
                        }
4✔
545
                }
546
        }
547

548
        // Add routes.
549
        server.mu.Lock()
2✔
550
        server.solicitRoutes(r.add, server.getOpts().Cluster.PinnedAccounts)
2✔
551
        server.mu.Unlock()
2✔
552

2✔
553
        server.Noticef("Reloaded: cluster routes")
2✔
554
}
555

556
// maxConnOption implements the option interface for the `max_connections`
557
// setting.
558
type maxConnOption struct {
559
        noopOption
560
        newValue int
561
}
562

563
// Apply the max connections change by closing random connections til we are
564
// below the limit if necessary.
565
func (m *maxConnOption) Apply(server *Server) {
×
566
        server.mu.Lock()
×
567
        clients := make([]*client, 0, len(server.clients))
×
568
        // Map iteration is random, which allows us to close random connections.
×
569
        for _, client := range server.clients {
×
570
                if isInternalClient(client.kind) {
×
571
                        continue
×
572
                }
573
                clients = append(clients, client)
×
574
        }
575
        server.mu.Unlock()
×
576

×
577
        if newc := max(0, m.newValue); len(clients) > newc {
×
578
                // Close connections til we are within the limit.
×
579
                var (
×
580
                        numClose = len(clients) - newc
×
581
                        closed   = 0
×
582
                )
×
583
                for _, client := range clients {
×
584
                        client.maxConnExceeded()
×
585
                        closed++
×
586
                        if closed >= numClose {
×
587
                                break
×
588
                        }
589
                }
590
                server.Noticef("Closed %d connections to fall within max_connections", closed)
×
591
        }
592
        server.Noticef("Reloaded: max_connections = %v", m.newValue)
×
593
}
594

595
// pidFileOption implements the option interface for the `pid_file` setting.
596
type pidFileOption struct {
597
        noopOption
598
        newValue string
599
}
600

601
// Apply the setting by logging the pid to the new file.
602
func (p *pidFileOption) Apply(server *Server) {
×
603
        if p.newValue == "" {
×
604
                return
×
605
        }
×
606
        if err := server.logPid(); err != nil {
×
607
                server.Errorf("Failed to write pidfile: %v", err)
×
608
        }
×
609
        server.Noticef("Reloaded: pid_file = %v", p.newValue)
×
610
}
611

612
// portsFileDirOption implements the option interface for the `portFileDir` setting.
613
type portsFileDirOption struct {
614
        noopOption
615
        oldValue string
616
        newValue string
617
}
618

619
func (p *portsFileDirOption) Apply(server *Server) {
1✔
620
        server.deletePortsFile(p.oldValue)
1✔
621
        server.logPorts()
1✔
622
        server.Noticef("Reloaded: ports_file_dir = %v", p.newValue)
1✔
623
}
1✔
624

625
// maxControlLineOption implements the option interface for the
626
// `max_control_line` setting.
627
type maxControlLineOption struct {
628
        noopOption
629
        newValue int32
630
}
631

632
// Apply the setting by updating each client.
633
func (m *maxControlLineOption) Apply(server *Server) {
×
634
        mcl := int32(m.newValue)
×
635
        server.mu.Lock()
×
636
        for _, client := range server.clients {
×
637
                atomic.StoreInt32(&client.mcl, mcl)
×
638
        }
×
639
        server.mu.Unlock()
×
640
        server.Noticef("Reloaded: max_control_line = %d", mcl)
×
641
}
642

643
// maxPayloadOption implements the option interface for the `max_payload`
644
// setting.
645
type maxPayloadOption struct {
646
        noopOption
647
        newValue int32
648
}
649

650
// Apply the setting by updating the server info and each client.
651
func (m *maxPayloadOption) Apply(server *Server) {
×
652
        server.mu.Lock()
×
653
        server.info.MaxPayload = m.newValue
×
654
        for _, client := range server.clients {
×
655
                atomic.StoreInt32(&client.mpay, int32(m.newValue))
×
656
        }
×
657
        server.mu.Unlock()
×
658
        server.Noticef("Reloaded: max_payload = %d", m.newValue)
×
659
}
660

661
// pingIntervalOption implements the option interface for the `ping_interval`
662
// setting.
663
type pingIntervalOption struct {
664
        noopOption
665
        newValue time.Duration
666
}
667

668
// Apply is a no-op because the ping interval will be reloaded after options
669
// are applied.
670
func (p *pingIntervalOption) Apply(server *Server) {
2✔
671
        server.Noticef("Reloaded: ping_interval = %s", p.newValue)
2✔
672
}
2✔
673

674
// maxPingsOutOption implements the option interface for the `ping_max`
675
// setting.
676
type maxPingsOutOption struct {
677
        noopOption
678
        newValue int
679
}
680

681
// Apply is a no-op because the ping interval will be reloaded after options
682
// are applied.
683
func (m *maxPingsOutOption) Apply(server *Server) {
×
684
        server.Noticef("Reloaded: ping_max = %d", m.newValue)
×
685
}
×
686

687
// writeDeadlineOption implements the option interface for the `write_deadline`
688
// setting.
689
type writeDeadlineOption struct {
690
        noopOption
691
        newValue time.Duration
692
}
693

694
// Apply is a no-op because the write deadline will be reloaded after options
695
// are applied.
696
func (w *writeDeadlineOption) Apply(server *Server) {
×
697
        server.Noticef("Reloaded: write_deadline = %s", w.newValue)
×
698
}
×
699

700
// clientAdvertiseOption implements the option interface for the `client_advertise` setting.
701
type clientAdvertiseOption struct {
702
        noopOption
703
        newValue string
704
}
705

706
// Apply the setting by updating the server info and regenerate the infoJSON byte array.
707
func (c *clientAdvertiseOption) Apply(server *Server) {
×
708
        server.mu.Lock()
×
709
        server.setInfoHostPort()
×
710
        server.mu.Unlock()
×
711
        server.Noticef("Reload: client_advertise = %s", c.newValue)
×
712
}
×
713

714
// accountsOption implements the option interface.
715
// Ensure that authorization code is executed if any change in accounts
716
type accountsOption struct {
717
        authOption
718
}
719

720
// Apply is a no-op. Changes will be applied in reloadAuthorization
721
func (a *accountsOption) Apply(s *Server) {
1,135✔
722
        s.Noticef("Reloaded: accounts")
1,135✔
723
}
1,135✔
724

725
// For changes to a server's config.
726
type jetStreamOption struct {
727
        noopOption
728
        newValue bool
729
}
730

731
func (a *jetStreamOption) Apply(s *Server) {
3✔
732
        s.Noticef("Reloaded: JetStream")
3✔
733
}
3✔
734

735
func (jso jetStreamOption) IsJetStreamChange() bool {
3✔
736
        return true
3✔
737
}
3✔
738

739
func (jso jetStreamOption) IsStatszChange() bool {
3✔
740
        return true
3✔
741
}
3✔
742

743
type jetStreamLimitsOption struct {
744
        noopOption
745
        newMaxMemory int64
746
        newMaxStore  int64
747
}
748

749
func (jso *jetStreamLimitsOption) Apply(s *Server) {
×
750
        js := s.getJetStream()
×
751
        if js == nil {
×
752
                return
×
753
        }
×
754
        js.mu.Lock()
×
755
        if jso.newMaxMemory > 0 {
×
756
                js.config.MaxMemory = jso.newMaxMemory
×
757
                atomic.StoreInt64(&js.memMax, js.config.MaxMemory)
×
758
                s.Noticef("Reloaded: JetStream max_mem_store = %s", friendlyBytes(jso.newMaxMemory))
×
759
        }
×
760
        if jso.newMaxStore > 0 {
×
761
                js.config.MaxStore = jso.newMaxStore
×
762
                atomic.StoreInt64(&js.storeMax, js.config.MaxStore)
×
763
                s.Noticef("Reloaded: JetStream max_file_store = %s", friendlyBytes(jso.newMaxStore))
×
764
        }
×
765
        js.mu.Unlock()
×
766
}
767

768
func (jso *jetStreamLimitsOption) IsStatszChange() bool {
×
769
        return true
×
770
}
×
771

772
type defaultSentinelOption struct {
773
        noopOption
774
        newValue string
775
}
776

777
func (so *defaultSentinelOption) Apply(s *Server) {
×
778
        s.Noticef("Reloaded: default_sentinel = %s", so.newValue)
×
779
}
×
780

781
type ocspOption struct {
782
        tlsOption
783
        newValue *OCSPConfig
784
}
785

786
func (a *ocspOption) Apply(s *Server) {
2✔
787
        s.Noticef("Reloaded: OCSP")
2✔
788
}
2✔
789

790
type ocspResponseCacheOption struct {
791
        tlsOption
792
        newValue *OCSPResponseCacheConfig
793
}
794

795
func (a *ocspResponseCacheOption) Apply(s *Server) {
1✔
796
        s.Noticef("Reloaded OCSP peer cache")
1✔
797
}
1✔
798

799
// connectErrorReports implements the option interface for the `connect_error_reports`
800
// setting.
801
type connectErrorReports struct {
802
        noopOption
803
        newValue int
804
}
805

806
// Apply is a no-op because the value will be reloaded after options are applied.
807
func (c *connectErrorReports) Apply(s *Server) {
×
808
        s.Noticef("Reloaded: connect_error_reports = %v", c.newValue)
×
809
}
×
810

811
// connectErrorReports implements the option interface for the `connect_error_reports`
812
// setting.
813
type reconnectErrorReports struct {
814
        noopOption
815
        newValue int
816
}
817

818
// Apply is a no-op because the value will be reloaded after options are applied.
819
func (r *reconnectErrorReports) Apply(s *Server) {
×
820
        s.Noticef("Reloaded: reconnect_error_reports = %v", r.newValue)
×
821
}
×
822

823
// maxTracedMsgLenOption implements the option interface for the `max_traced_msg_len` setting.
824
type maxTracedMsgLenOption struct {
825
        noopOption
826
        newValue int
827
}
828

829
// Apply the setting by updating the maximum traced message length.
830
func (m *maxTracedMsgLenOption) Apply(server *Server) {
×
831
        server.mu.Lock()
×
832
        defer server.mu.Unlock()
×
833
        server.opts.MaxTracedMsgLen = m.newValue
×
834
        server.Noticef("Reloaded: max_traced_msg_len = %d", m.newValue)
×
835
}
×
836

837
type mqttAckWaitReload struct {
838
        noopOption
839
        newValue time.Duration
840
}
841

842
func (o *mqttAckWaitReload) Apply(s *Server) {
×
843
        s.Noticef("Reloaded: MQTT ack_wait = %v", o.newValue)
×
844
}
×
845

846
type mqttMaxAckPendingReload struct {
847
        noopOption
848
        newValue uint16
849
}
850

851
func (o *mqttMaxAckPendingReload) Apply(s *Server) {
×
852
        s.mqttUpdateMaxAckPending(o.newValue)
×
853
        s.Noticef("Reloaded: MQTT max_ack_pending = %v", o.newValue)
×
854
}
×
855

856
type mqttStreamReplicasReload struct {
857
        noopOption
858
        newValue int
859
}
860

861
func (o *mqttStreamReplicasReload) Apply(s *Server) {
×
862
        s.Noticef("Reloaded: MQTT stream_replicas = %v", o.newValue)
×
863
}
×
864

865
type mqttConsumerReplicasReload struct {
866
        noopOption
867
        newValue int
868
}
869

870
func (o *mqttConsumerReplicasReload) Apply(s *Server) {
×
871
        s.Noticef("Reloaded: MQTT consumer_replicas = %v", o.newValue)
×
872
}
×
873

874
type mqttConsumerMemoryStorageReload struct {
875
        noopOption
876
        newValue bool
877
}
878

879
func (o *mqttConsumerMemoryStorageReload) Apply(s *Server) {
×
880
        s.Noticef("Reloaded: MQTT consumer_memory_storage = %v", o.newValue)
×
881
}
×
882

883
type mqttInactiveThresholdReload struct {
884
        noopOption
885
        newValue time.Duration
886
}
887

888
func (o *mqttInactiveThresholdReload) Apply(s *Server) {
×
889
        s.Noticef("Reloaded: MQTT consumer_inactive_threshold = %v", o.newValue)
×
890
}
×
891

892
type profBlockRateReload struct {
893
        noopOption
894
        newValue int
895
}
896

897
func (o *profBlockRateReload) Apply(s *Server) {
×
898
        s.setBlockProfileRate(o.newValue)
×
899
        s.Noticef("Reloaded: prof_block_rate = %v", o.newValue)
×
900
}
×
901

902
type leafNodeOption struct {
903
        noopOption
904
        tlsFirstChanged    bool
905
        compressionChanged bool
906
        // These are for the remotes
907
        added   []*RemoteLeafOpts
908
        changed map[*leafNodeCfg]*remoteLeafOption
909
}
910

911
type remoteLeafOption struct {
912
        tlsFirstChanged    bool
913
        compressionChanged bool
914
        disabledChanged    bool
915
        opts               *RemoteLeafOpts
916
}
917

918
// Given `old` and `new` Leafnode options, this function will return the structure
919
// used for applying the configuration, or an error is there are changes that
920
// are not supported.
921
func getLeafNodeOptionsChanges(s *Server, old, new *LeafNodeOpts) (*leafNodeOption, error) {
33✔
922

33✔
923
        // We can't use DeepEqual for `Users` field, so do custom check.
33✔
924
        if usersHaveChanged(old.Users, new.Users) {
33✔
925
                return nil, fmt.Errorf("field \"Users\": old=%v, new=%v", old.Users, new.Users)
×
926
        }
×
927

928
        // Check the main leafnodes{} block to see if there are any changes that are
929
        // not supported. We provide a list of fields to ignore (we already checked,
930
        // allow them to be modified or will check later).
931
        if err := checkConfigsEqual(old, new, []string{
33✔
932
                "Compression",
33✔
933
                "Remotes",
33✔
934
                "TLSHandshakeFirst",
33✔
935
                "TLSHandshakeFirstFallback",
33✔
936
                "TLSConfig",
33✔
937
                "Users",
33✔
938
        }); err != nil {
33✔
939
                return nil, err
×
940
        }
×
941

942
        const (
33✔
943
                remoteErrFormat = "remote %s: %s"
33✔
944
                maxAttempts     = 20
33✔
945
        )
33✔
946
        var (
33✔
947
                nlo *leafNodeOption
33✔
948
                // Track whether any existing remote was not found (i.e. removed).
33✔
949
                removed bool
33✔
950
        )
33✔
951

33✔
952
forLoop:
33✔
953
        for failed := range maxAttempts {
66✔
954
                removed = false
33✔
955
                if failed > 0 {
33✔
956
                        // If we failed once, we will wait a bit before trying again the remotes.
×
957
                        // This could give enough time for connections that were in progress to complete.
×
958
                        select {
×
959
                        case <-time.After(50 * time.Millisecond):
×
960
                        case <-s.quitCh:
×
961
                                return nil, ErrServerNotRunning
×
962
                        }
963
                }
964
                nlo = &leafNodeOption{
33✔
965
                        tlsFirstChanged:    (old.TLSHandshakeFirst != new.TLSHandshakeFirst || old.TLSHandshakeFirstFallback != new.TLSHandshakeFirstFallback),
33✔
966
                        compressionChanged: !old.Compression.equals(&new.Compression),
33✔
967
                        // Start with all, will update when processing existing ones.
33✔
968
                        // Since the list will be modified, we need to clone it.
33✔
969
                        added: slices.Clone(new.Remotes),
33✔
970
                }
33✔
971

33✔
972
                s.mu.RLock()
33✔
973
                // Go through the list of existing remote configurations.
33✔
974
                for lrc := range s.leafRemoteCfgs {
43✔
975
                        var rlo *RemoteLeafOpts
10✔
976
                        // Look for the corresponding `*RemoteLeafOpts` in the `nlo.added`
10✔
977
                        // list. If it is found, that function returns an updated list
10✔
978
                        // with the element removed from it.
10✔
979
                        lrc.RLock()
10✔
980
                        rlo, nlo.added = getRemoteLeafOpts(lrc.name(), nlo.added)
10✔
981
                        if rlo == nil {
10✔
982
                                // Not found, will be removed in leafNodeOption.Apply().
×
983
                                removed = true
×
984
                                lrc.RUnlock()
×
985
                                continue
×
986
                        }
987
                        // Now we need to make sure that there are no changes that we don't
988
                        // support for a RemoteLeafOpts.
989
                        err := checkConfigsEqual(lrc.RemoteLeafOpts, rlo, []string{
10✔
990
                                "Compression",
10✔
991
                                "Disabled",
10✔
992
                                "TLS",
10✔
993
                                "TLSHandshakeFirst",
10✔
994
                                "TLSConfig",
10✔
995
                        })
10✔
996
                        if err != nil {
10✔
997
                                lrc.RUnlock()
×
998
                                s.mu.RUnlock()
×
999
                                return nil, fmt.Errorf(remoteErrFormat, rlo.safeName(), err)
×
1000
                        }
×
1001
                        disabledChanged := lrc.Disabled != rlo.Disabled
10✔
1002
                        // If this remote was disabled and is now enabled, we need to make sure
10✔
1003
                        // that there is no connect in progress. If that is the case, either
10✔
1004
                        // try again (if it is the first failure) or return an error.
10✔
1005
                        if disabledChanged && lrc.Disabled && lrc.connInProgress {
10✔
1006
                                lrc.RUnlock()
×
1007
                                s.mu.RUnlock()
×
1008
                                if failed < maxAttempts-1 {
×
1009
                                        continue forLoop
×
1010
                                }
1011
                                return nil, fmt.Errorf(remoteErrFormat, rlo.safeName(),
×
1012
                                        "cannot be enabled at the moment, try again")
×
1013
                        }
1014
                        // Since we will use the new `rlo.TLSConfig` later on, consider all
1015
                        // existing remote configs as "changed" and store them in the
1016
                        // `nlo.changed` map.
1017
                        if nlo.changed == nil {
16✔
1018
                                nlo.changed = make(map[*leafNodeCfg]*remoteLeafOption)
6✔
1019
                        }
6✔
1020
                        lnro := &remoteLeafOption{
10✔
1021
                                tlsFirstChanged:    lrc.TLSHandshakeFirst != rlo.TLSHandshakeFirst,
10✔
1022
                                compressionChanged: !lrc.Compression.equals(&rlo.Compression),
10✔
1023
                                disabledChanged:    disabledChanged,
10✔
1024
                                opts:               rlo,
10✔
1025
                        }
10✔
1026
                        lrc.RUnlock()
10✔
1027
                        nlo.changed[lrc] = lnro
10✔
1028
                }
1029
                if len(nlo.added) > 0 {
33✔
1030
                        // Go through the added list and check if an added was recently removed and,
×
1031
                        // if that is the case, is it still in the `s.rmLeafRemoteCfgs` map, which
×
1032
                        // may mean that there was a connect-in-progress that did not complete yet.
×
1033
                        // Either try again (if it is the first failure) or return an error.
×
1034
                        for _, rlo := range nlo.added {
×
1035
                                if _, cip := s.rmLeafRemoteCfgs[rlo.name()]; cip {
×
1036
                                        s.mu.RUnlock()
×
1037
                                        if failed < maxAttempts-1 {
×
1038
                                                continue forLoop
×
1039
                                        }
1040
                                        return nil, fmt.Errorf(remoteErrFormat, rlo.safeName(),
×
1041
                                                "cannot be added at the moment, try again")
×
1042
                                }
1043
                        }
1044
                }
1045
                s.mu.RUnlock()
33✔
1046
                break
33✔
1047
        }
1048

1049
        // Now we want to make sure that there were actual changes, so that we don't
1050
        // cause a reload of leafnodes for nothing. However, if one has (or all have)
1051
        // been removed we still need to invoke leafNodeOption.Apply().
1052
        if !nlo.tlsFirstChanged && !nlo.compressionChanged && !removed && len(nlo.added) == 0 && len(nlo.changed) == 0 {
36✔
1053
                return nil, nil
3✔
1054
        }
3✔
1055

1056
        return nlo, nil
30✔
1057
}
1058

1059
func usersHaveChanged(ousers, nusers []*User) bool {
33✔
1060
        if len(ousers) != len(nusers) {
33✔
1061
                return true
×
1062
        }
×
1063
        // We did not do a strict list order check in the past, so maintain this to
1064
        // avoid possible breaking changes.
1065
        oua := make(map[string]*User, len(ousers))
33✔
1066
        nua := make(map[string]*User, len(nusers))
33✔
1067
        for _, u := range ousers {
34✔
1068
                oua[u.Username] = u
1✔
1069
        }
1✔
1070
        for _, u := range nusers {
34✔
1071
                nua[u.Username] = u
1✔
1072
        }
1✔
1073
        for uname, u := range oua {
34✔
1074
                // If we can not find new one with same name, consider that they have changed.
1✔
1075
                nu, ok := nua[uname]
1✔
1076
                if !ok {
1✔
1077
                        return true
×
1078
                }
×
1079
                // Same if password or account has changed.
1080
                if u.Password != nu.Password || u.Account.GetName() != nu.Account.GetName() {
1✔
1081
                        return true
×
1082
                }
×
1083
        }
1084
        return false
33✔
1085
}
1086

1087
// Given the `search` remote leafnode options name, searches for a match in the `list`.
1088
// If found, returns the `*RemoteLeafOpts` from the list, and the updated list
1089
// without the element in it. If not found, returns `nil` and the unmodified list.
1090
func getRemoteLeafOpts(search string, list []*RemoteLeafOpts) (*RemoteLeafOpts, []*RemoteLeafOpts) {
10✔
1091
        for i, rlo := range list {
21✔
1092
                if search == rlo.name() {
21✔
1093
                        lastIdx := len(list) - 1
10✔
1094
                        if lastIdx == 0 {
16✔
1095
                                return rlo, nil
6✔
1096
                        }
6✔
1097
                        if i < lastIdx {
7✔
1098
                                list[i] = list[lastIdx]
3✔
1099
                        }
3✔
1100
                        list = list[:lastIdx]
4✔
1101
                        return rlo, list
4✔
1102
                }
1103
        }
1104
        return nil, list
×
1105
}
1106

1107
func (l *leafNodeOption) Apply(s *Server) {
30✔
1108
        opts := s.getOpts()
30✔
1109
        if l.tlsFirstChanged {
33✔
1110
                s.Noticef("Reloaded: LeafNode TLS HandshakeFirst value is: %v", opts.LeafNode.TLSHandshakeFirst)
3✔
1111
                s.Noticef("Reloaded: LeafNode TLS HandshakeFirstFallback value is: %v", opts.LeafNode.TLSHandshakeFirstFallback)
3✔
1112
        }
3✔
1113
        if l.compressionChanged {
51✔
1114
                s.Noticef("Reloaded: LeafNode Compression value is: %v", opts.LeafNode.Compression)
21✔
1115
        }
21✔
1116

1117
        var close []*client
30✔
1118
        var enable []*leafNodeCfg
30✔
1119
        var removed bool
30✔
1120

30✔
1121
        s.mu.Lock()
30✔
1122
        acceptSideCompOpts := &opts.LeafNode.Compression
30✔
1123
        // First go over existing leafnode remote configurations and
30✔
1124
        // either remove if no longer present, or update the config.
30✔
1125
        for lrc := range s.leafRemoteCfgs {
40✔
1126
                rlo := l.changed[lrc]
10✔
1127
                if rlo == nil {
10✔
1128
                        delete(s.leafRemoteCfgs, lrc)
×
1129
                        removed = true
×
1130
                        if s.rmLeafRemoteCfgs == nil {
×
1131
                                s.rmLeafRemoteCfgs = make(map[string]*leafNodeCfg)
×
1132
                        }
×
1133
                        s.rmLeafRemoteCfgs[lrc.name()] = lrc
×
1134
                        lrc.markAsRemoved()
×
1135
                        s.Noticef("Reloaded: LeafNode Remote %s removed", lrc.RemoteLeafOpts.safeName())
×
1136
                        // We will close the existing connection in the next for-loop.
×
1137
                        continue
×
1138
                }
1139
                lrc.Lock()
10✔
1140
                // TLSConfig is always applied.
10✔
1141
                lrc.TLSConfig = rlo.opts.TLSConfig.Clone()
10✔
1142
                // Now update what has been detected has changed.
10✔
1143
                if rlo.tlsFirstChanged {
11✔
1144
                        lrc.TLSHandshakeFirst = rlo.opts.TLSHandshakeFirst
1✔
1145
                        s.Noticef("Reloaded: LeafNode Remote %s TLS HandshakeFirst value is: %v",
1✔
1146
                                lrc.RemoteLeafOpts.safeName(), rlo.opts.TLSHandshakeFirst)
1✔
1147
                }
1✔
1148
                if rlo.compressionChanged {
10✔
1149
                        lrc.Compression = rlo.opts.Compression
×
1150
                        s.Noticef("Reloaded: LeafNode Remote %s Compression value is: %v",
×
1151
                                lrc.RemoteLeafOpts.safeName(), rlo.opts.Compression)
×
1152
                }
×
1153
                if rlo.disabledChanged {
14✔
1154
                        // Change to new value.
4✔
1155
                        lrc.Disabled = rlo.opts.Disabled
4✔
1156
                        if lrc.Disabled {
6✔
1157
                                lrc.notifyQuitChannel()
2✔
1158
                        } else {
4✔
1159
                                enable = append(enable, lrc)
2✔
1160
                        }
2✔
1161
                        s.Noticef("Reloaded: LeafNode Remote %s Disabled value is: %v",
4✔
1162
                                lrc.RemoteLeafOpts.safeName(), rlo.opts.Disabled)
4✔
1163
                }
1164
                lrc.Unlock()
10✔
1165
        }
1166
        // Second, go over existing leaf connections and apply compression
1167
        // changes (if applicable) and collect connections that need to be
1168
        // closed and/or disabled.
1169
        for _, c := range s.leafs {
37✔
1170
                var co *CompressionOpts
7✔
1171

7✔
1172
                c.mu.Lock()
7✔
1173
                if r := c.leaf.remote; r != nil {
11✔
1174
                        rlo := l.changed[r]
4✔
1175
                        // If the config is not in the `changed` map, or the new config says that
4✔
1176
                        // the connection is disabled, collect so we can close it after the server
4✔
1177
                        // lock is released.
4✔
1178
                        if rlo == nil || (rlo.disabledChanged && rlo.opts.Disabled) {
5✔
1179
                                c.flags.set(noReconnect)
1✔
1180
                                close = append(close, c)
1✔
1181
                                c.mu.Unlock()
1✔
1182
                                continue
1✔
1183
                        }
1184
                        if rlo.compressionChanged {
3✔
1185
                                co = &r.Compression
×
1186
                        }
×
1187
                } else if l.compressionChanged {
6✔
1188
                        co = acceptSideCompOpts
3✔
1189
                }
3✔
1190
                if co != nil && applyCompressionChanges(c, co) {
8✔
1191
                        close = append(close, c)
2✔
1192
                }
2✔
1193
                c.mu.Unlock()
6✔
1194
        }
1195
        s.mu.Unlock()
30✔
1196

30✔
1197
        // Close the connections for which negotiation is required, have been disabled
30✔
1198
        // or simply removed.
30✔
1199
        for _, c := range close {
33✔
1200
                c.closeConnection(ClientClosed)
3✔
1201
        }
3✔
1202
        // Start the ones that have been enabled.
1203
        for _, r := range enable {
32✔
1204
                s.connectToRemoteLeafNodeAsynchronously(r, true)
2✔
1205
        }
2✔
1206
        // Finally, deal with the ones that have been added.
1207
        if len(l.added) > 0 {
30✔
1208
                s.solicitLeafNodeRemotes(l.added)
×
1209
        }
×
1210
        // Deal with removed configs. Make sure there are no connect-in-progress.
1211
        // If there are still some, have a go routine to check in the background.
1212
        if removed {
30✔
1213
                if checkAgain := checkRemovedLeafNodeCfgs(s); checkAgain {
×
1214
                        checkRemovedLeafNodeCfgsAsync(s)
×
1215
                }
×
1216
        }
1217
}
1218

1219
// Go through the removed remote leafnode configs map to check if the
1220
// connect-in-progress flag is set. If not, remove from the map.
1221
// Returns `true` if there are still some that are in progress.
1222
func checkRemovedLeafNodeCfgs(s *Server) bool {
×
1223
        var inProgress int
×
1224
        s.mu.Lock()
×
1225
        for rn, r := range s.rmLeafRemoteCfgs {
×
1226
                if r.isConnectInProgress() {
×
1227
                        inProgress++
×
1228
                } else {
×
1229
                        delete(s.rmLeafRemoteCfgs, rn)
×
1230
                }
×
1231
        }
1232
        s.mu.Unlock()
×
1233
        // Needs to be called again if inProgress > 0
×
1234
        return inProgress > 0
×
1235
}
1236

1237
// Will start a go routine that will periodically call `checkRemovedLeafNodeCfgs`.
1238
// When the removed map has been emptied, the go routine will end. It is ok for
1239
// this function to be invoked multiple times and have multiple instances running
1240
// concurrently.
1241
func checkRemovedLeafNodeCfgsAsync(s *Server) {
×
1242
        s.startGoRoutine(func() {
×
1243
                defer s.grWG.Done()
×
1244
                tick := time.NewTicker(50 * time.Millisecond)
×
1245
                defer tick.Stop()
×
1246
                for {
×
1247
                        select {
×
1248
                        case <-tick.C:
×
1249
                                if checkAgain := checkRemovedLeafNodeCfgs(s); !checkAgain {
×
1250
                                        return
×
1251
                                }
×
1252
                        case <-s.quitCh:
×
1253
                                return
×
1254
                        }
1255
                }
1256
        })
1257
}
1258

1259
// The `co` compression options are applied to the given leaf connection `c`.
1260
// If a "restart" of the connection is needed, will return true, false otherwise.
1261
func applyCompressionChanges(c *client, co *CompressionOpts) bool {
3✔
1262
        newMode := co.Mode
3✔
1263
        // Skip leaf connections that are "not supported" (because they
3✔
1264
        // will never do compression) or the ones that have already the
3✔
1265
        // new compression mode.
3✔
1266
        if c.leaf.compression == CompressionNotSupported || c.leaf.compression == newMode {
4✔
1267
                return false
1✔
1268
        }
1✔
1269
        // We need to close the connections if it had compression "off" or the new
1270
        // mode is compression "off", or if the new mode is "accept", because
1271
        // these require negotiation.
1272
        if c.leaf.compression == CompressionOff || newMode == CompressionOff || newMode == CompressionAccept {
4✔
1273
                return true
2✔
1274
        } else if newMode == CompressionS2Auto {
2✔
1275
                // If the mode is "s2_auto", we need to check if there is really
×
1276
                // need to change, and at any rate, we want to save the actual
×
1277
                // compression level here, not s2_auto.
×
1278
                c.updateS2AutoCompressionLevel(co, &c.leaf.compression)
×
1279
        } else {
×
1280
                // Simply change the compression writer
×
1281
                c.out.cw = s2.NewWriter(nil, s2WriterOptions(newMode)...)
×
1282
                c.leaf.compression = newMode
×
1283
        }
×
1284
        return false
×
1285
}
1286

1287
type noFastProdStallReload struct {
1288
        noopOption
1289
        noStall bool
1290
}
1291

1292
func (l *noFastProdStallReload) Apply(s *Server) {
×
1293
        var not string
×
1294
        if l.noStall {
×
1295
                not = "not "
×
1296
        }
×
1297
        s.Noticef("Reloaded: fast producers will %sbe stalled", not)
×
1298
}
1299

1300
// Compares options and disconnects clients that are no longer listed in pinned certs. Lock must not be held.
1301
func (s *Server) recheckPinnedCerts(curOpts *Options, newOpts *Options) {
1,127✔
1302
        s.mu.Lock()
1,127✔
1303
        disconnectClients := []*client{}
1,127✔
1304
        protoToPinned := map[int]PinnedCertSet{}
1,127✔
1305
        if !reflect.DeepEqual(newOpts.TLSPinnedCerts, curOpts.TLSPinnedCerts) {
1,128✔
1306
                protoToPinned[NATS] = curOpts.TLSPinnedCerts
1✔
1307
        }
1✔
1308
        if !reflect.DeepEqual(newOpts.MQTT.TLSPinnedCerts, curOpts.MQTT.TLSPinnedCerts) {
1,127✔
1309
                protoToPinned[MQTT] = curOpts.MQTT.TLSPinnedCerts
×
1310
        }
×
1311
        if !reflect.DeepEqual(newOpts.Websocket.TLSPinnedCerts, curOpts.Websocket.TLSPinnedCerts) {
1,127✔
1312
                protoToPinned[WS] = curOpts.Websocket.TLSPinnedCerts
×
1313
        }
×
1314
        for _, c := range s.clients {
3,214✔
1315
                if c.kind != CLIENT {
2,087✔
1316
                        continue
×
1317
                }
1318
                if pinned, ok := protoToPinned[c.clientType()]; ok {
2,087✔
1319
                        if !c.matchesPinnedCert(pinned) {
×
1320
                                disconnectClients = append(disconnectClients, c)
×
1321
                        }
×
1322
                }
1323
        }
1324
        checkClients := func(kind int, clients map[uint64]*client, set PinnedCertSet) {
1,195✔
1325
                for _, c := range clients {
170✔
1326
                        if c.kind == kind && !c.matchesPinnedCert(set) {
102✔
1327
                                disconnectClients = append(disconnectClients, c)
×
1328
                        }
×
1329
                }
1330
        }
1331
        if !reflect.DeepEqual(newOpts.LeafNode.TLSPinnedCerts, curOpts.LeafNode.TLSPinnedCerts) {
1,127✔
1332
                checkClients(LEAF, s.leafs, newOpts.LeafNode.TLSPinnedCerts)
×
1333
        }
×
1334
        if !reflect.DeepEqual(newOpts.Cluster.TLSPinnedCerts, curOpts.Cluster.TLSPinnedCerts) {
1,128✔
1335
                s.forEachRoute(func(c *client) {
1✔
1336
                        if !c.matchesPinnedCert(newOpts.Cluster.TLSPinnedCerts) {
×
1337
                                disconnectClients = append(disconnectClients, c)
×
1338
                        }
×
1339
                })
1340
        }
1341
        if s.gateway.enabled && reflect.DeepEqual(newOpts.Gateway.TLSPinnedCerts, curOpts.Gateway.TLSPinnedCerts) {
1,195✔
1342
                gw := s.gateway
68✔
1343
                gw.RLock()
68✔
1344
                for _, c := range gw.out {
171✔
1345
                        if !c.matchesPinnedCert(newOpts.Gateway.TLSPinnedCerts) {
103✔
1346
                                disconnectClients = append(disconnectClients, c)
×
1347
                        }
×
1348
                }
1349
                checkClients(GATEWAY, gw.in, newOpts.Gateway.TLSPinnedCerts)
68✔
1350
                gw.RUnlock()
68✔
1351
        }
1352
        s.mu.Unlock()
1,127✔
1353
        if len(disconnectClients) > 0 {
1,127✔
1354
                s.Noticef("Disconnect %d clients due to pinned certs reload", len(disconnectClients))
×
1355
                for _, c := range disconnectClients {
×
1356
                        c.closeConnection(TLSHandshakeError)
×
1357
                }
×
1358
        }
1359
}
1360

1361
type proxiesReload struct {
1362
        noopOption
1363
        add []string
1364
        del []string
1365
}
1366

1367
func (p *proxiesReload) Apply(s *Server) {
1✔
1368
        var clients []*client
1✔
1369
        s.mu.Lock()
1✔
1370
        for _, k := range p.del {
2✔
1371
                cc := s.proxiedConns[k]
1✔
1372
                delete(s.proxiedConns, k)
1✔
1373
                if len(cc) > 0 {
2✔
1374
                        for _, c := range cc {
3✔
1375
                                clients = append(clients, c)
2✔
1376
                        }
2✔
1377
                }
1378
        }
1379
        s.processProxiesTrustedKeys()
1✔
1380
        s.mu.Unlock()
1✔
1381
        if len(p.del) > 0 {
2✔
1382
                for _, c := range clients {
3✔
1383
                        c.setAuthError(ErrAuthProxyNotTrusted)
2✔
1384
                        c.authViolation()
2✔
1385
                }
2✔
1386
                s.Noticef("Reloaded: proxies trusted keys %q were removed", p.del)
1✔
1387
        }
1388
        if len(p.add) > 0 {
2✔
1389
                s.Noticef("Reloaded: proxies trusted keys %q were added", p.add)
1✔
1390
        }
1✔
1391
}
1392

1393
// Reload reads the current configuration file and calls out to ReloadOptions
1394
// to apply the changes. This returns an error if the server was not started
1395
// with a config file or an option which doesn't support hot-swapping was changed.
1396
func (s *Server) Reload() error {
1,131✔
1397
        s.mu.Lock()
1,131✔
1398
        configFile := s.configFile
1,131✔
1399
        s.mu.Unlock()
1,131✔
1400
        if configFile == "" {
1,131✔
1401
                return errors.New("can only reload config when a file is provided using -c or --config")
×
1402
        }
×
1403

1404
        newOpts, err := ProcessConfigFile(configFile)
1,131✔
1405
        if err != nil {
1,133✔
1406
                // TODO: Dump previous good config to a .bak file?
2✔
1407
                return err
2✔
1408
        }
2✔
1409
        return s.ReloadOptions(newOpts)
1,129✔
1410
}
1411

1412
// ReloadOptions applies any supported options from the provided Options
1413
// type. This returns an error if an option which doesn't support
1414
// hot-swapping was changed.
1415
// The provided Options type should not be re-used afterwards.
1416
// Either use Options.Clone() to pass a copy, or make a new one.
1417
func (s *Server) ReloadOptions(newOpts *Options) error {
1,129✔
1418
        s.reloadMu.Lock()
1,129✔
1419
        defer s.reloadMu.Unlock()
1,129✔
1420

1,129✔
1421
        s.mu.Lock()
1,129✔
1422

1,129✔
1423
        curOpts := s.getOpts()
1,129✔
1424

1,129✔
1425
        clientOrgPort := curOpts.Port
1,129✔
1426
        clusterOrgPort := curOpts.Cluster.Port
1,129✔
1427
        gatewayOrgPort := curOpts.Gateway.Port
1,129✔
1428
        leafnodesOrgPort := curOpts.LeafNode.Port
1,129✔
1429
        websocketOrgPort := curOpts.Websocket.Port
1,129✔
1430
        mqttOrgPort := curOpts.MQTT.Port
1,129✔
1431

1,129✔
1432
        s.mu.Unlock()
1,129✔
1433

1,129✔
1434
        // In case "-cluster ..." was provided through the command line, this will
1,129✔
1435
        // properly set the Cluster.Host/Port etc...
1,129✔
1436
        if l := curOpts.Cluster.ListenStr; l != _EMPTY_ {
1,129✔
1437
                newOpts.Cluster.ListenStr = l
×
1438
                overrideCluster(newOpts)
×
1439
        }
×
1440

1441
        // Apply flags over config file settings.
1442
        newOpts = MergeOptions(newOpts, FlagSnapshot)
1,129✔
1443

1,129✔
1444
        // Need more processing for boolean flags...
1,129✔
1445
        if FlagSnapshot != nil {
1,129✔
1446
                applyBoolFlags(newOpts, FlagSnapshot)
×
1447
        }
×
1448

1449
        setBaselineOptions(newOpts)
1,129✔
1450

1,129✔
1451
        // setBaselineOptions sets Port to 0 if set to -1 (RANDOM port)
1,129✔
1452
        // If that's the case, set it to the saved value when the accept loop was
1,129✔
1453
        // created.
1,129✔
1454
        if newOpts.Port == 0 {
2,257✔
1455
                newOpts.Port = clientOrgPort
1,128✔
1456
        }
1,128✔
1457
        // We don't do that for cluster, so check against -1.
1458
        if newOpts.Cluster.Port == -1 {
1,140✔
1459
                newOpts.Cluster.Port = clusterOrgPort
11✔
1460
        }
11✔
1461
        if newOpts.Gateway.Port == -1 {
1,194✔
1462
                newOpts.Gateway.Port = gatewayOrgPort
65✔
1463
        }
65✔
1464
        if newOpts.LeafNode.Port == -1 {
1,143✔
1465
                newOpts.LeafNode.Port = leafnodesOrgPort
14✔
1466
        }
14✔
1467
        if newOpts.Websocket.Port == -1 {
1,130✔
1468
                newOpts.Websocket.Port = websocketOrgPort
1✔
1469
        }
1✔
1470
        if newOpts.MQTT.Port == -1 {
1,129✔
1471
                newOpts.MQTT.Port = mqttOrgPort
×
1472
        }
×
1473

1474
        if err := s.reloadOptions(curOpts, newOpts); err != nil {
1,131✔
1475
                return err
2✔
1476
        }
2✔
1477

1478
        s.recheckPinnedCerts(curOpts, newOpts)
1,127✔
1479

1,127✔
1480
        s.mu.Lock()
1,127✔
1481
        s.configTime = time.Now().UTC()
1,127✔
1482
        s.updateVarzConfigReloadableFields(s.varz)
1,127✔
1483
        s.mu.Unlock()
1,127✔
1484
        return nil
1,127✔
1485
}
1486
func applyBoolFlags(newOpts, flagOpts *Options) {
×
1487
        // Reset fields that may have been set to `true` in
×
1488
        // MergeOptions() when some of the flags default to `true`
×
1489
        // but have not been explicitly set and therefore value
×
1490
        // from config file should take precedence.
×
1491
        for name, val := range newOpts.inConfig {
×
1492
                f := reflect.ValueOf(newOpts).Elem()
×
1493
                names := strings.Split(name, ".")
×
1494
                for _, name := range names {
×
1495
                        f = f.FieldByName(name)
×
1496
                }
×
1497
                f.SetBool(val)
×
1498
        }
1499
        // Now apply value (true or false) from flags that have
1500
        // been explicitly set in command line
1501
        for name, val := range flagOpts.inCmdLine {
×
1502
                f := reflect.ValueOf(newOpts).Elem()
×
1503
                names := strings.Split(name, ".")
×
1504
                for _, name := range names {
×
1505
                        f = f.FieldByName(name)
×
1506
                }
×
1507
                f.SetBool(val)
×
1508
        }
1509
}
1510

1511
// reloadOptions reloads the server config with the provided options. If an
1512
// option that doesn't support hot-swapping is changed, this returns an error.
1513
func (s *Server) reloadOptions(curOpts, newOpts *Options) error {
1,129✔
1514
        // Apply to the new options some of the options that may have been set
1,129✔
1515
        // that can't be configured in the config file (this can happen in
1,129✔
1516
        // applications starting NATS Server programmatically).
1,129✔
1517
        newOpts.CustomClientAuthentication = curOpts.CustomClientAuthentication
1,129✔
1518
        newOpts.CustomRouterAuthentication = curOpts.CustomRouterAuthentication
1,129✔
1519

1,129✔
1520
        // Do the validation before checking for differences. We need to ensure
1,129✔
1521
        // that the new options are valid. Note that there are possible side
1,129✔
1522
        // effects of calling validateOptions(), in that some default values
1,129✔
1523
        // may be set, etc... but that should be ok since the current options
1,129✔
1524
        // went through the same process on startup/previous reload.
1,129✔
1525
        if err := validateOptions(newOpts); err != nil {
1,129✔
1526
                return err
×
1527
        }
×
1528

1529
        changed, err := s.diffOptions(newOpts)
1,129✔
1530
        if err != nil {
1,131✔
1531
                return err
2✔
1532
        }
2✔
1533

1534
        // Create a context that is used to pass special info that we may need
1535
        // while applying the new options.
1536
        ctx := reloadContext{oldClusterPerms: curOpts.Cluster.Permissions}
1,127✔
1537
        s.setOpts(newOpts)
1,127✔
1538
        s.applyOptions(&ctx, changed)
1,127✔
1539
        return nil
1,127✔
1540
}
1541

1542
// For the purpose of comparing, impose a order on slice data types where order does not matter
1543
func imposeOrder(value any) error {
270,698✔
1544
        switch value := value.(type) {
270,698✔
1545
        case []*Account:
2,258✔
1546
                slices.SortFunc(value, func(i, j *Account) int { return cmp.Compare(i.Name, j.Name) })
7,283✔
1547
                for _, a := range value {
8,538✔
1548
                        slices.SortFunc(a.imports.streams, func(i, j *streamImport) int { return cmp.Compare(i.acc.Name, j.acc.Name) })
6,280✔
1549
                }
1550
        case []*User:
2,258✔
1551
                slices.SortFunc(value, func(i, j *User) int { return cmp.Compare(i.Username, j.Username) })
7,287✔
1552
        case []*NkeyUser:
2,258✔
1553
                slices.SortFunc(value, func(i, j *NkeyUser) int { return cmp.Compare(i.Nkey, j.Nkey) })
2,258✔
1554
        case []*url.URL:
2,254✔
1555
                slices.SortFunc(value, func(i, j *url.URL) int { return cmp.Compare(i.String(), j.String()) })
2,264✔
1556
        case []string:
2,236✔
1557
                slices.Sort(value)
2,236✔
1558
        case []*jwt.OperatorClaims:
2,254✔
1559
                slices.SortFunc(value, func(i, j *jwt.OperatorClaims) int { return cmp.Compare(i.Issuer, j.Issuer) })
2,254✔
1560
        case GatewayOpts:
2,258✔
1561
                slices.SortFunc(value.Gateways, func(i, j *RemoteGatewayOpts) int { return cmp.Compare(i.Name, j.Name) })
2,262✔
1562
        case WebsocketOpts:
2,254✔
1563
                slices.Sort(value.AllowedOrigins)
2,254✔
1564
        case string, bool, uint8, uint16, uint64, int, int32, int64, time.Duration, float64, nil, LeafNodeOpts, ClusterOpts, *tls.Config, PinnedCertSet,
1565
                *URLAccResolver, *MemAccResolver, *DirAccResolver, *CacheDirAccResolver, Authentication, MQTTOpts, jwt.TagList,
1566
                *OCSPConfig, map[string]string, map[string]bool, JSLimitOpts, StoreCipher, *OCSPResponseCacheConfig, *ProxiesConfig, WriteTimeoutPolicy:
248,154✔
1567
                // explicitly skipped types
1568
        case *AuthCallout:
2,258✔
1569
        case JSTpmOpts:
2,256✔
1570
        default:
×
1571
                // this will fail during unit tests
×
1572
                return fmt.Errorf("OnReload, sort or explicitly skip type: %s",
×
1573
                        reflect.TypeOf(value))
×
1574
        }
1575
        return nil
270,698✔
1576
}
1577

1578
// diffOptions returns a slice containing options which have been changed. If
1579
// an option that doesn't support hot-swapping is changed, this returns an
1580
// error.
1581
func (s *Server) diffOptions(newOpts *Options) ([]option, error) {
1,129✔
1582
        var (
1,129✔
1583
                oldOpts   = s.getOpts()
1,129✔
1584
                oldConfig = reflect.ValueOf(oldOpts).Elem()
1,129✔
1585
                newConfig = reflect.ValueOf(newOpts).Elem()
1,129✔
1586
                diffOpts  = []option{}
1,129✔
1587
                skipTKeys = len(oldOpts.TrustedOperators) > 0 && len(oldOpts.TrustedKeys) > 0
1,129✔
1588

1,129✔
1589
                // Need to keep track of whether JS is being disabled
1,129✔
1590
                // to prevent changing limits at runtime.
1,129✔
1591
                jsEnabled           = s.JetStreamEnabled()
1,129✔
1592
                disableJS           bool
1,129✔
1593
                jsMemLimitsChanged  bool
1,129✔
1594
                jsFileLimitsChanged bool
1,129✔
1595
                jsStoreDirChanged   bool
1,129✔
1596
                jsLimitsUpdate      *jetStreamLimitsOption
1,129✔
1597
        )
1,129✔
1598
        for i := 0; i < oldConfig.NumField(); i++ {
151,138✔
1599
                field := oldConfig.Type().Field(i)
150,009✔
1600
                // field.PkgPath is empty for exported fields, and is not for unexported ones.
150,009✔
1601
                // We skip the unexported fields.
150,009✔
1602
                if field.PkgPath != _EMPTY_ {
164,660✔
1603
                        continue
14,651✔
1604
                }
1605
                optName := strings.ToLower(field.Name)
135,358✔
1606
                if skipTKeys && optName == "trustedkeys" {
135,367✔
1607
                        // TrustedOperators and TrustedKeys change is not supported. During options
9✔
1608
                        // validation, if they are both specified, a conflict error is returned.
9✔
1609
                        // If only TrustedOperators is specified, the TrustedKeys is filled with
9✔
1610
                        // the operators' signing keys. So here, if we detect that the current
9✔
1611
                        // options have operators, we don't do the trusted keys comparison, so
9✔
1612
                        // we can fail with the "not supported for TrustedOperators" config reload
9✔
1613
                        // error instead of TrustedKeys (that the user would not have set).
9✔
1614
                        continue
9✔
1615
                }
1616
                var (
135,349✔
1617
                        oldValue = oldConfig.Field(i).Interface()
135,349✔
1618
                        newValue = newConfig.Field(i).Interface()
135,349✔
1619
                )
135,349✔
1620
                if err := imposeOrder(oldValue); err != nil {
135,349✔
1621
                        return nil, err
×
1622
                }
×
1623
                if err := imposeOrder(newValue); err != nil {
135,349✔
1624
                        return nil, err
×
1625
                }
×
1626

1627
                // accounts and users (referencing accounts) will always differ as accounts
1628
                // contain internal state, say locks etc..., so we don't bother here.
1629
                // This also avoids races with atomic stats counters
1630
                if optName != "accounts" && optName != "users" {
268,440✔
1631
                        if changed := !reflect.DeepEqual(oldValue, newValue); !changed {
264,897✔
1632
                                // Check to make sure we are running JetStream if we think we should be.
131,806✔
1633
                                if optName == "jetstream" && newValue.(bool) {
131,817✔
1634
                                        if !jsEnabled {
11✔
1635
                                                diffOpts = append(diffOpts, &jetStreamOption{newValue: true})
×
1636
                                        }
×
1637
                                }
1638
                                continue
131,806✔
1639
                        }
1640
                }
1641
                switch optName {
3,543✔
1642
                case "traceverbose":
×
1643
                        diffOpts = append(diffOpts, &traceVerboseOption{newValue: newValue.(bool)})
×
1644
                case "traceheaders":
×
1645
                        diffOpts = append(diffOpts, &traceHeadersOption{newValue: newValue.(bool)})
×
1646
                case "trace":
×
1647
                        diffOpts = append(diffOpts, &traceOption{newValue: newValue.(bool)})
×
1648
                case "debug":
×
1649
                        diffOpts = append(diffOpts, &debugOption{newValue: newValue.(bool)})
×
1650
                case "logtime":
×
1651
                        diffOpts = append(diffOpts, &logtimeOption{newValue: newValue.(bool)})
×
1652
                case "logtimeutc":
×
1653
                        diffOpts = append(diffOpts, &logtimeUTCOption{newValue: newValue.(bool)})
×
1654
                case "logfile":
×
1655
                        diffOpts = append(diffOpts, &logfileOption{newValue: newValue.(string)})
×
1656
                case "syslog":
×
1657
                        diffOpts = append(diffOpts, &syslogOption{newValue: newValue.(bool)})
×
1658
                case "remotesyslog":
×
1659
                        diffOpts = append(diffOpts, &remoteSyslogOption{newValue: newValue.(string)})
×
1660
                case "tlsconfig":
18✔
1661
                        diffOpts = append(diffOpts, &tlsOption{newValue: newValue.(*tls.Config)})
18✔
1662
                case "tlstimeout":
×
1663
                        diffOpts = append(diffOpts, &tlsTimeoutOption{newValue: newValue.(float64)})
×
1664
                case "tlspinnedcerts":
1✔
1665
                        diffOpts = append(diffOpts, &tlsPinnedCertOption{newValue: newValue.(PinnedCertSet)})
1✔
1666
                case "tlshandshakefirst":
2✔
1667
                        diffOpts = append(diffOpts, &tlsHandshakeFirst{newValue: newValue.(bool)})
2✔
1668
                case "tlshandshakefirstfallback":
2✔
1669
                        diffOpts = append(diffOpts, &tlsHandshakeFirstFallback{newValue: newValue.(time.Duration)})
2✔
1670
                case "username":
×
1671
                        diffOpts = append(diffOpts, &usernameOption{})
×
1672
                case "password":
×
1673
                        diffOpts = append(diffOpts, &passwordOption{})
×
1674
                case "tags":
1✔
1675
                        diffOpts = append(diffOpts, &tagsOption{})
1✔
1676
                case "metadata":
×
1677
                        diffOpts = append(diffOpts, &metadataOption{})
×
1678
                case "authorization":
×
1679
                        diffOpts = append(diffOpts, &authorizationOption{})
×
1680
                case "authtimeout":
×
1681
                        diffOpts = append(diffOpts, &authTimeoutOption{newValue: newValue.(float64)})
×
1682
                case "users":
1,129✔
1683
                        diffOpts = append(diffOpts, &usersOption{})
1,129✔
1684
                case "nkeys":
×
1685
                        diffOpts = append(diffOpts, &nkeysOption{})
×
1686
                case "cluster":
23✔
1687
                        newClusterOpts := newValue.(ClusterOpts)
23✔
1688
                        oldClusterOpts := oldValue.(ClusterOpts)
23✔
1689
                        if err := validateClusterOpts(oldClusterOpts, newClusterOpts); err != nil {
23✔
1690
                                return nil, err
×
1691
                        }
×
1692
                        co := &clusterOption{
23✔
1693
                                newValue:        newClusterOpts,
23✔
1694
                                permsChanged:    !reflect.DeepEqual(newClusterOpts.Permissions, oldClusterOpts.Permissions),
23✔
1695
                                compressChanged: !oldClusterOpts.Compression.equals(&newClusterOpts.Compression),
23✔
1696
                        }
23✔
1697
                        co.diffPoolAndAccounts(&oldClusterOpts)
23✔
1698
                        // If there are added accounts, first make sure that we can look them up.
23✔
1699
                        // If we can't let's fail the reload.
23✔
1700
                        for _, acc := range co.accsAdded {
24✔
1701
                                if _, err := s.LookupAccount(acc); err != nil {
1✔
1702
                                        return nil, fmt.Errorf("unable to add account %q to the list of dedicated routes: %v", acc, err)
×
1703
                                }
×
1704
                        }
1705
                        // If pool_size has been set to negative (but was not before), then let's
1706
                        // add the system account to the list of removed accounts (we don't have
1707
                        // to check if already there, duplicates are ok in that case).
1708
                        if newClusterOpts.PoolSize < 0 && oldClusterOpts.PoolSize >= 0 {
23✔
1709
                                if sys := s.SystemAccount(); sys != nil {
×
1710
                                        co.accsRemoved = append(co.accsRemoved, sys.GetName())
×
1711
                                }
×
1712
                        }
1713
                        diffOpts = append(diffOpts, co)
23✔
1714
                case "routes":
2✔
1715
                        add, remove := diffRoutes(oldValue.([]*url.URL), newValue.([]*url.URL))
2✔
1716
                        diffOpts = append(diffOpts, &routesOption{add: add, remove: remove})
2✔
1717
                case "maxconn":
×
1718
                        diffOpts = append(diffOpts, &maxConnOption{newValue: newValue.(int)})
×
1719
                case "pidfile":
×
1720
                        diffOpts = append(diffOpts, &pidFileOption{newValue: newValue.(string)})
×
1721
                case "portsfiledir":
1✔
1722
                        diffOpts = append(diffOpts, &portsFileDirOption{newValue: newValue.(string), oldValue: oldValue.(string)})
1✔
1723
                case "maxcontrolline":
×
1724
                        diffOpts = append(diffOpts, &maxControlLineOption{newValue: newValue.(int32)})
×
1725
                case "maxpayload":
×
1726
                        diffOpts = append(diffOpts, &maxPayloadOption{newValue: newValue.(int32)})
×
1727
                case "pinginterval":
2✔
1728
                        diffOpts = append(diffOpts, &pingIntervalOption{newValue: newValue.(time.Duration)})
2✔
1729
                case "maxpingsout":
×
1730
                        diffOpts = append(diffOpts, &maxPingsOutOption{newValue: newValue.(int)})
×
1731
                case "writedeadline":
×
1732
                        diffOpts = append(diffOpts, &writeDeadlineOption{newValue: newValue.(time.Duration)})
×
1733
                case "clientadvertise":
×
1734
                        cliAdv := newValue.(string)
×
1735
                        if cliAdv != "" {
×
1736
                                // Validate ClientAdvertise syntax
×
1737
                                if _, _, err := parseHostPort(cliAdv, 0); err != nil {
×
1738
                                        return nil, fmt.Errorf("invalid ClientAdvertise value of %s, err=%v", cliAdv, err)
×
1739
                                }
×
1740
                        }
1741
                        diffOpts = append(diffOpts, &clientAdvertiseOption{newValue: cliAdv})
×
1742
                case "accounts":
1,129✔
1743
                        diffOpts = append(diffOpts, &accountsOption{})
1,129✔
1744
                case "resolver", "accountresolver", "accountsresolver":
8✔
1745
                        // We can't move from no resolver to one. So check for that.
8✔
1746
                        if (oldValue == nil && newValue != nil) ||
8✔
1747
                                (oldValue != nil && newValue == nil) {
8✔
1748
                                return nil, fmt.Errorf("config reload does not support moving to or from an account resolver")
×
1749
                        }
×
1750
                        diffOpts = append(diffOpts, &accountsOption{})
8✔
1751
                case "accountresolvertlsconfig":
×
1752
                        diffOpts = append(diffOpts, &accountsOption{})
×
1753
                case "gateway":
62✔
1754
                        // Not supported for now, but report warning if configuration of gateway
62✔
1755
                        // is actually changed so that user knows that it won't take effect.
62✔
1756

62✔
1757
                        // Any deep-equal is likely to fail for when there is a TLSConfig. so
62✔
1758
                        // remove for the test.
62✔
1759
                        tmpOld := oldValue.(GatewayOpts)
62✔
1760
                        tmpNew := newValue.(GatewayOpts)
62✔
1761
                        tmpOld.TLSConfig = nil
62✔
1762
                        tmpNew.TLSConfig = nil
62✔
1763
                        tmpOld.tlsConfigOpts = nil
62✔
1764
                        tmpNew.tlsConfigOpts = nil
62✔
1765

62✔
1766
                        // Need to do the same for remote gateways' TLS configs.
62✔
1767
                        // But we can't just set remotes' TLSConfig to nil otherwise this
62✔
1768
                        // would lose the real TLS configuration.
62✔
1769
                        tmpOld.Gateways = copyRemoteGWConfigsWithoutTLSConfig(tmpOld.Gateways)
62✔
1770
                        tmpNew.Gateways = copyRemoteGWConfigsWithoutTLSConfig(tmpNew.Gateways)
62✔
1771

62✔
1772
                        // If there is really a change prevents reload.
62✔
1773
                        if !reflect.DeepEqual(tmpOld, tmpNew) {
62✔
1774
                                // See TODO(ik) note below about printing old/new values.
×
1775
                                return nil, fmt.Errorf("config reload not supported for %s: old=%v, new=%v",
×
1776
                                        field.Name, oldValue, newValue)
×
1777
                        }
×
1778
                case "leafnode":
33✔
1779
                        tmpOld := oldValue.(LeafNodeOpts)
33✔
1780
                        tmpNew := newValue.(LeafNodeOpts)
33✔
1781

33✔
1782
                        lno, err := getLeafNodeOptionsChanges(s, &tmpOld, &tmpNew)
33✔
1783
                        // If there was an unsupported change, we will get an error with the name
33✔
1784
                        // of the (first) field and its old and new value.
33✔
1785
                        if err != nil {
33✔
1786
                                return nil, fmt.Errorf("config reload not supported for %s: %v", field.Name, err)
×
1787
                        }
×
1788
                        // If there was an actual change...
1789
                        if lno != nil {
63✔
1790
                                diffOpts = append(diffOpts, lno)
30✔
1791
                        }
30✔
1792
                case "jetstream":
3✔
1793
                        new := newValue.(bool)
3✔
1794
                        old := oldValue.(bool)
3✔
1795
                        if new != old {
6✔
1796
                                diffOpts = append(diffOpts, &jetStreamOption{newValue: new})
3✔
1797
                        }
3✔
1798

1799
                        // Mark whether JS will be disabled.
1800
                        disableJS = !new
3✔
1801
                case "storedir":
3✔
1802
                        new := newValue.(string)
3✔
1803
                        old := oldValue.(string)
3✔
1804
                        modified := new != old
3✔
1805

3✔
1806
                        // Check whether JS is being disabled and/or storage dir attempted to change.
3✔
1807
                        if jsEnabled && modified {
5✔
1808
                                if new == _EMPTY_ {
4✔
1809
                                        // This means that either JS is being disabled or it is using an temp dir.
2✔
1810
                                        // Allow the change but error in case JS was not disabled.
2✔
1811
                                        jsStoreDirChanged = true
2✔
1812
                                } else {
2✔
1813
                                        return nil, fmt.Errorf("config reload not supported for jetstream storage directory")
×
1814
                                }
×
1815
                        }
1816
                case "jetstreammaxmemory", "jetstreammaxstore":
6✔
1817
                        old := oldValue.(int64)
6✔
1818
                        new := newValue.(int64)
6✔
1819

6✔
1820
                        // Check whether JS is being disabled and/or limits are being changed.
6✔
1821
                        var (
6✔
1822
                                modified  = new != old
6✔
1823
                                fromUnset = old == -1
6✔
1824
                                fromSet   = !fromUnset
6✔
1825
                                toUnset   = new == -1
6✔
1826
                                toSet     = !toUnset
6✔
1827
                                increased = fromSet && toSet && new > old
6✔
1828
                        )
6✔
1829
                        if jsEnabled && modified {
10✔
1830
                                // Cannot change limits from dynamic storage at runtime.
4✔
1831
                                switch {
4✔
1832
                                case increased:
×
1833
                                        // Allowed to increase, but not decrease.
×
1834
                                        if jsLimitsUpdate == nil {
×
1835
                                                jsLimitsUpdate = &jetStreamLimitsOption{}
×
1836
                                                diffOpts = append(diffOpts, jsLimitsUpdate)
×
1837
                                        }
×
1838
                                        if optName == "jetstreammaxmemory" {
×
1839
                                                jsLimitsUpdate.newMaxMemory = new
×
1840
                                        } else {
×
1841
                                                jsLimitsUpdate.newMaxStore = new
×
1842
                                        }
×
1843
                                case fromSet && toUnset:
4✔
1844
                                        // Limits changed but it may mean that JS is being disabled,
4✔
1845
                                        // keep track of the change and error in case it is not.
4✔
1846
                                        if optName == "jetstreammaxmemory" {
6✔
1847
                                                jsMemLimitsChanged = true
2✔
1848
                                        } else {
4✔
1849
                                                jsFileLimitsChanged = true
2✔
1850
                                        }
2✔
1851
                                case fromUnset && toSet:
×
1852
                                        // Prevent changing from dynamic max memory / file at runtime.
×
1853
                                        return nil, fmt.Errorf("config reload not supported for jetstream dynamic max memory and store")
×
1854
                                default:
×
1855
                                        return nil, fmt.Errorf("config reload not supported for decreasing jetstream max memory and store")
×
1856
                                }
1857
                        }
1858
                case "jetstreammetacompact", "jetstreammetacompactsize", "jetstreammetacompactsync":
2✔
1859
                        // Allowed at runtime but monitorCluster looks at s.opts directly, so no further work needed here.
1860
                case "websocket":
×
1861
                        // Similar to gateways
×
1862
                        tmpOld := oldValue.(WebsocketOpts)
×
1863
                        tmpNew := newValue.(WebsocketOpts)
×
1864
                        tmpOld.TLSConfig, tmpOld.tlsConfigOpts = nil, nil
×
1865
                        tmpNew.TLSConfig, tmpNew.tlsConfigOpts = nil, nil
×
1866
                        // If there is really a change prevents reload.
×
1867
                        if !reflect.DeepEqual(tmpOld, tmpNew) {
×
1868
                                // See TODO(ik) note below about printing old/new values.
×
1869
                                return nil, fmt.Errorf("config reload not supported for %s: old=%v, new=%v",
×
1870
                                        field.Name, oldValue, newValue)
×
1871
                        }
×
1872
                case "mqtt":
×
1873
                        diffOpts = append(diffOpts, &mqttAckWaitReload{newValue: newValue.(MQTTOpts).AckWait})
×
1874
                        diffOpts = append(diffOpts, &mqttMaxAckPendingReload{newValue: newValue.(MQTTOpts).MaxAckPending})
×
1875
                        diffOpts = append(diffOpts, &mqttStreamReplicasReload{newValue: newValue.(MQTTOpts).StreamReplicas})
×
1876
                        diffOpts = append(diffOpts, &mqttConsumerReplicasReload{newValue: newValue.(MQTTOpts).ConsumerReplicas})
×
1877
                        diffOpts = append(diffOpts, &mqttConsumerMemoryStorageReload{newValue: newValue.(MQTTOpts).ConsumerMemoryStorage})
×
1878
                        diffOpts = append(diffOpts, &mqttInactiveThresholdReload{newValue: newValue.(MQTTOpts).ConsumerInactiveThreshold})
×
1879

×
1880
                        // Nil out/set to 0 the options that we allow to be reloaded so that
×
1881
                        // we only fail reload if some that we don't support are changed.
×
1882
                        tmpOld := oldValue.(MQTTOpts)
×
1883
                        tmpNew := newValue.(MQTTOpts)
×
1884
                        tmpOld.TLSConfig, tmpOld.tlsConfigOpts, tmpOld.AckWait, tmpOld.MaxAckPending, tmpOld.StreamReplicas, tmpOld.ConsumerReplicas, tmpOld.ConsumerMemoryStorage = nil, nil, 0, 0, 0, 0, false
×
1885
                        tmpOld.ConsumerInactiveThreshold = 0
×
1886
                        tmpNew.TLSConfig, tmpNew.tlsConfigOpts, tmpNew.AckWait, tmpNew.MaxAckPending, tmpNew.StreamReplicas, tmpNew.ConsumerReplicas, tmpNew.ConsumerMemoryStorage = nil, nil, 0, 0, 0, 0, false
×
1887
                        tmpNew.ConsumerInactiveThreshold = 0
×
1888

×
1889
                        if !reflect.DeepEqual(tmpOld, tmpNew) {
×
1890
                                // See TODO(ik) note below about printing old/new values.
×
1891
                                return nil, fmt.Errorf("config reload not supported for %s: old=%v, new=%v",
×
1892
                                        field.Name, oldValue, newValue)
×
1893
                        }
×
1894
                        tmpNew.AckWait = newValue.(MQTTOpts).AckWait
×
1895
                        tmpNew.MaxAckPending = newValue.(MQTTOpts).MaxAckPending
×
1896
                        tmpNew.StreamReplicas = newValue.(MQTTOpts).StreamReplicas
×
1897
                        tmpNew.ConsumerReplicas = newValue.(MQTTOpts).ConsumerReplicas
×
1898
                        tmpNew.ConsumerMemoryStorage = newValue.(MQTTOpts).ConsumerMemoryStorage
×
1899
                        tmpNew.ConsumerInactiveThreshold = newValue.(MQTTOpts).ConsumerInactiveThreshold
×
1900
                case "connecterrorreports":
×
1901
                        diffOpts = append(diffOpts, &connectErrorReports{newValue: newValue.(int)})
×
1902
                case "reconnecterrorreports":
×
1903
                        diffOpts = append(diffOpts, &reconnectErrorReports{newValue: newValue.(int)})
×
1904
                case "nolog", "nosigs":
106✔
1905
                        // Ignore NoLog and NoSigs options since they are not parsed and only used in
106✔
1906
                        // testing.
106✔
1907
                        continue
106✔
1908
                case "disableshortfirstping":
×
1909
                        newOpts.DisableShortFirstPing = oldValue.(bool)
×
1910
                        continue
×
1911
                case "maxtracedmsglen":
×
1912
                        diffOpts = append(diffOpts, &maxTracedMsgLenOption{newValue: newValue.(int)})
×
1913
                case "port":
×
1914
                        // check to see if newValue == 0 and continue if so.
×
1915
                        if newValue == 0 {
×
1916
                                // ignore RANDOM_PORT
×
1917
                                continue
×
1918
                        }
1919
                        fallthrough
×
1920
                case "noauthuser":
2✔
1921
                        if oldValue != _EMPTY_ && newValue == _EMPTY_ {
4✔
1922
                                for _, user := range newOpts.Users {
4✔
1923
                                        if user.Username == oldValue {
2✔
1924
                                                return nil, fmt.Errorf("config reload not supported for %s: old=%v, new=%v",
×
1925
                                                        field.Name, oldValue, newValue)
×
1926
                                        }
×
1927
                                }
1928
                        } else {
×
1929
                                return nil, fmt.Errorf("config reload not supported for %s: old=%v, new=%v",
×
1930
                                        field.Name, oldValue, newValue)
×
1931
                        }
×
1932
                case "defaultsentinel":
×
1933
                        diffOpts = append(diffOpts, &defaultSentinelOption{newValue: newValue.(string)})
×
1934
                case "systemaccount":
1,002✔
1935
                        if oldValue != DEFAULT_SYSTEM_ACCOUNT || newValue != _EMPTY_ {
1,002✔
1936
                                return nil, fmt.Errorf("config reload not supported for %s: old=%v, new=%v",
×
1937
                                        field.Name, oldValue, newValue)
×
1938
                        }
×
1939
                case "ocspconfig":
2✔
1940
                        diffOpts = append(diffOpts, &ocspOption{newValue: newValue.(*OCSPConfig)})
2✔
1941
                case "ocspcacheconfig":
1✔
1942
                        diffOpts = append(diffOpts, &ocspResponseCacheOption{newValue: newValue.(*OCSPResponseCacheConfig)})
1✔
1943
                case "profblockrate":
×
1944
                        new := newValue.(int)
×
1945
                        old := oldValue.(int)
×
1946
                        if new != old {
×
1947
                                diffOpts = append(diffOpts, &profBlockRateReload{newValue: new})
×
1948
                        }
×
1949
                case "configdigest":
×
1950
                        // skip changes in config digest, this is handled already while
×
1951
                        // processing the config.
×
1952
                        continue
×
1953
                case "nofastproducerstall":
×
1954
                        diffOpts = append(diffOpts, &noFastProdStallReload{noStall: newValue.(bool)})
×
1955
                case "proxies":
1✔
1956
                        new := newValue.(*ProxiesConfig)
1✔
1957
                        old := oldValue.(*ProxiesConfig)
1✔
1958
                        if add, del := diffProxiesTrustedKeys(old.Trusted, new.Trusted); len(add) > 0 || len(del) > 0 {
2✔
1959
                                diffOpts = append(diffOpts, &proxiesReload{add: add, del: del})
1✔
1960
                        }
1✔
1961
                default:
2✔
1962
                        // TODO(ik): Implement String() on those options to have a nice print.
2✔
1963
                        // %v is difficult to figure what's what, %+v print private fields and
2✔
1964
                        // would print passwords. Tried json.Marshal but it is too verbose for
2✔
1965
                        // the URL array.
2✔
1966

2✔
1967
                        // Bail out if attempting to reload any unsupported options.
2✔
1968
                        return nil, fmt.Errorf("config reload not supported for %s: old=%v, new=%v",
2✔
1969
                                field.Name, oldValue, newValue)
2✔
1970
                }
1971
        }
1972

1973
        // If not disabling JS but limits have changed then it is an error.
1974
        if !disableJS {
2,252✔
1975
                if jsMemLimitsChanged || jsFileLimitsChanged {
1,125✔
1976
                        return nil, fmt.Errorf("config reload not supported for jetstream max memory and max store")
×
1977
                }
×
1978
                if jsStoreDirChanged {
1,125✔
1979
                        return nil, fmt.Errorf("config reload not supported for jetstream storage dir")
×
1980
                }
×
1981
        }
1982

1983
        return diffOpts, nil
1,127✔
1984
}
1985

1986
func copyRemoteGWConfigsWithoutTLSConfig(current []*RemoteGatewayOpts) []*RemoteGatewayOpts {
124✔
1987
        l := len(current)
124✔
1988
        if l == 0 {
166✔
1989
                return nil
42✔
1990
        }
42✔
1991
        rgws := make([]*RemoteGatewayOpts, 0, l)
82✔
1992
        for _, rcfg := range current {
164✔
1993
                cp := *rcfg
82✔
1994
                cp.TLSConfig = nil
82✔
1995
                cp.tlsConfigOpts = nil
82✔
1996
                rgws = append(rgws, &cp)
82✔
1997
        }
82✔
1998
        return rgws
82✔
1999
}
2000

2001
func (s *Server) applyOptions(ctx *reloadContext, opts []option) {
1,127✔
2002
        var (
1,127✔
2003
                reloadLogging      = false
1,127✔
2004
                reloadAuth         = false
1,127✔
2005
                reloadClusterPerms = false
1,127✔
2006
                reloadClientTrcLvl = false
1,127✔
2007
                reloadJetstream    = false
1,127✔
2008
                jsEnabled          = false
1,127✔
2009
                isStatszChange     = false
1,127✔
2010
                co                 *clusterOption
1,127✔
2011
        )
1,127✔
2012
        for _, opt := range opts {
3,478✔
2013
                opt.Apply(s)
2,351✔
2014
                if opt.IsLoggingChange() {
2,351✔
2015
                        reloadLogging = true
×
2016
                }
×
2017
                if opt.IsTraceLevelChange() {
2,351✔
2018
                        reloadClientTrcLvl = true
×
2019
                }
×
2020
                if opt.IsAuthChange() {
4,636✔
2021
                        reloadAuth = true
2,285✔
2022
                }
2,285✔
2023
                if opt.IsClusterPoolSizeOrAccountsChange() {
2,368✔
2024
                        co = opt.(*clusterOption)
17✔
2025
                }
17✔
2026
                if opt.IsClusterPermsChange() {
2,351✔
2027
                        reloadClusterPerms = true
×
2028
                }
×
2029
                if opt.IsJetStreamChange() {
2,354✔
2030
                        reloadJetstream = true
3✔
2031
                        jsEnabled = opt.(*jetStreamOption).newValue
3✔
2032
                }
3✔
2033
                if opt.IsStatszChange() {
2,355✔
2034
                        isStatszChange = true
4✔
2035
                }
4✔
2036
        }
2037

2038
        if reloadLogging {
1,127✔
2039
                s.ConfigureLogger()
×
2040
        }
×
2041
        if reloadClientTrcLvl {
1,127✔
2042
                s.reloadClientTraceLevel()
×
2043
        }
×
2044
        if reloadAuth {
2,254✔
2045
                s.reloadAuthorization()
1,127✔
2046
        }
1,127✔
2047
        if reloadClusterPerms {
1,127✔
2048
                s.reloadClusterPermissions(ctx.oldClusterPerms)
×
2049
        }
×
2050
        newOpts := s.getOpts()
1,127✔
2051
        // If we need to reload cluster pool/per-account, then co will be not nil
1,127✔
2052
        if co != nil {
1,144✔
2053
                s.reloadClusterPoolAndAccounts(co, newOpts)
17✔
2054
        }
17✔
2055
        if reloadJetstream {
1,130✔
2056
                if !jsEnabled {
5✔
2057
                        s.DisableJetStream()
2✔
2058
                } else if !s.JetStreamEnabled() {
4✔
2059
                        if err := s.restartJetStream(); err != nil {
1✔
2060
                                s.Warnf("Can't start JetStream: %v", err)
×
2061
                        }
×
2062
                }
2063
                // Make sure to reset the internal loop's version of JS.
2064
                s.resetInternalLoopInfo()
3✔
2065
        }
2066
        if isStatszChange {
1,131✔
2067
                s.sendStatszUpdate()
4✔
2068
        }
4✔
2069

2070
        // For remote gateways, make sure that their TLS configuration
2071
        // is updated (since the config is "captured" early and changes would otherwise
2072
        // not be visible).
2073
        if s.gateway.enabled {
1,195✔
2074
                s.gateway.updateRemotesTLSConfig(newOpts)
68✔
2075
        }
68✔
2076

2077
        // Always restart OCSP monitoring on reload.
2078
        if err := s.reloadOCSP(); err != nil {
1,128✔
2079
                s.Warnf("Can't restart OCSP features: %v", err)
1✔
2080
        }
1✔
2081
        var cd string
1,127✔
2082
        if newOpts.configDigest != "" {
2,254✔
2083
                cd = fmt.Sprintf("(%s)", newOpts.configDigest)
1,127✔
2084
        }
1,127✔
2085
        s.Noticef("Reloaded server configuration %s", cd)
1,127✔
2086
}
2087

2088
// This will send a reset to the internal send loop.
2089
func (s *Server) resetInternalLoopInfo() {
3✔
2090
        var resetCh chan struct{}
3✔
2091
        s.mu.Lock()
3✔
2092
        if s.sys != nil {
6✔
2093
                // can't hold the lock as go routine reading it may be waiting for lock as well
3✔
2094
                resetCh = s.sys.resetCh
3✔
2095
        }
3✔
2096
        s.mu.Unlock()
3✔
2097

3✔
2098
        if resetCh != nil {
6✔
2099
                resetCh <- struct{}{}
3✔
2100
        }
3✔
2101
}
2102

2103
// Update all cached debug and trace settings for every client
2104
func (s *Server) reloadClientTraceLevel() {
×
2105
        opts := s.getOpts()
×
2106

×
2107
        if opts.NoLog {
×
2108
                return
×
2109
        }
×
2110

2111
        // Create a list of all clients.
2112
        // Update their trace level when not holding server or gateway lock
2113

2114
        s.mu.Lock()
×
2115
        clientCnt := 1 + len(s.clients) + len(s.grTmpClients) + s.numRoutes() + len(s.leafs)
×
2116
        s.mu.Unlock()
×
2117

×
2118
        s.gateway.RLock()
×
2119
        clientCnt += len(s.gateway.in) + len(s.gateway.outo)
×
2120
        s.gateway.RUnlock()
×
2121

×
2122
        clients := make([]*client, 0, clientCnt)
×
2123

×
2124
        s.mu.Lock()
×
2125
        if s.eventsEnabled() {
×
2126
                clients = append(clients, s.sys.client)
×
2127
        }
×
2128

2129
        cMaps := []map[uint64]*client{s.clients, s.grTmpClients, s.leafs}
×
2130
        for _, m := range cMaps {
×
2131
                for _, c := range m {
×
2132
                        clients = append(clients, c)
×
2133
                }
×
2134
        }
2135
        s.forEachRoute(func(c *client) {
×
2136
                clients = append(clients, c)
×
2137
        })
×
2138
        s.mu.Unlock()
×
2139

×
2140
        s.gateway.RLock()
×
2141
        for _, c := range s.gateway.in {
×
2142
                clients = append(clients, c)
×
2143
        }
×
2144
        clients = append(clients, s.gateway.outo...)
×
2145
        s.gateway.RUnlock()
×
2146

×
2147
        for _, c := range clients {
×
2148
                // client.trace is commonly read while holding the lock
×
2149
                c.mu.Lock()
×
2150
                c.setTraceLevel()
×
2151
                c.mu.Unlock()
×
2152
        }
×
2153
}
2154

2155
// reloadAuthorization reconfigures the server authorization settings,
2156
// disconnects any clients who are no longer authorized, and removes any
2157
// unauthorized subscriptions.
2158
func (s *Server) reloadAuthorization() {
1,127✔
2159
        // This map will contain the names of accounts that have their streams
1,127✔
2160
        // import configuration changed.
1,127✔
2161
        var awcsti map[string]struct{}
1,127✔
2162
        checkJetStream := false
1,127✔
2163
        opts := s.getOpts()
1,127✔
2164
        s.mu.Lock()
1,127✔
2165

1,127✔
2166
        deletedAccounts := make(map[string]*Account)
1,127✔
2167

1,127✔
2168
        // This can not be changed for now so ok to check server's trustedKeys unlocked.
1,127✔
2169
        // If plain configured accounts, process here.
1,127✔
2170
        if s.trustedKeys == nil {
2,245✔
2171
                // Make a map of the configured account names so we figure out the accounts
1,118✔
2172
                // that should be removed later on.
1,118✔
2173
                configAccs := make(map[string]struct{}, len(opts.Accounts))
1,118✔
2174
                for _, acc := range opts.Accounts {
4,258✔
2175
                        configAccs[acc.GetName()] = struct{}{}
3,140✔
2176
                }
3,140✔
2177
                // Now range over existing accounts and keep track of the ones deleted
2178
                // so some cleanup can be made after releasing the server lock.
2179
                s.accounts.Range(func(k, v any) bool {
5,430✔
2180
                        an, acc := k.(string), v.(*Account)
4,312✔
2181
                        // Exclude default and system account from this test since those
4,312✔
2182
                        // may not actually be in opts.Accounts.
4,312✔
2183
                        if an == DEFAULT_GLOBAL_ACCOUNT || an == DEFAULT_SYSTEM_ACCOUNT {
6,489✔
2184
                                return true
2,177✔
2185
                        }
2,177✔
2186
                        // Check check if existing account is still in opts.Accounts.
2187
                        if _, ok := configAccs[an]; !ok {
2,135✔
2188
                                deletedAccounts[an] = acc
×
2189
                                s.accounts.Delete(k)
×
2190
                        }
×
2191
                        return true
2,135✔
2192
                })
2193
                // This will update existing and add new ones.
2194
                awcsti, _ = s.configureAccounts(true)
1,118✔
2195
                s.configureAuthorization()
1,118✔
2196
                // Double check any JetStream configs.
1,118✔
2197
                checkJetStream = s.getJetStream() != nil
1,118✔
2198
        } else if opts.AccountResolver != nil {
18✔
2199
                s.configureResolver()
9✔
2200
                if _, ok := s.accResolver.(*MemAccResolver); ok {
17✔
2201
                        // Check preloads so we can issue warnings etc if needed.
8✔
2202
                        s.checkResolvePreloads()
8✔
2203
                        // With a memory resolver we want to do something similar to configured accounts.
8✔
2204
                        // We will walk the accounts and delete them if they are no longer present via fetch.
8✔
2205
                        // If they are present we will force a claim update to process changes.
8✔
2206
                        s.accounts.Range(func(k, v any) bool {
35✔
2207
                                acc := v.(*Account)
27✔
2208
                                // Skip global account.
27✔
2209
                                if acc == s.gacc {
35✔
2210
                                        return true
8✔
2211
                                }
8✔
2212
                                accName := acc.GetName()
19✔
2213
                                // Release server lock for following actions
19✔
2214
                                s.mu.Unlock()
19✔
2215
                                accClaims, claimJWT, _ := s.fetchAccountClaims(accName)
19✔
2216
                                if accClaims != nil {
36✔
2217
                                        if err := s.updateAccountWithClaimJWT(acc, claimJWT); err != nil {
17✔
2218
                                                s.Noticef("Reloaded: deleting account [bad claims]: %q", accName)
×
2219
                                                s.accounts.Delete(k)
×
2220
                                        }
×
2221
                                } else {
2✔
2222
                                        s.Noticef("Reloaded: deleting account [removed]: %q", accName)
2✔
2223
                                        s.accounts.Delete(k)
2✔
2224
                                }
2✔
2225
                                // Regrab server lock.
2226
                                s.mu.Lock()
19✔
2227
                                return true
19✔
2228
                        })
2229
                }
2230
        }
2231

2232
        var (
1,127✔
2233
                cclientsa [64]*client
1,127✔
2234
                cclients  = cclientsa[:0]
1,127✔
2235
                clientsa  [64]*client
1,127✔
2236
                clients   = clientsa[:0]
1,127✔
2237
                routesa   [64]*client
1,127✔
2238
                routes    = routesa[:0]
1,127✔
2239
        )
1,127✔
2240

1,127✔
2241
        // Gather clients that changed accounts. We will close them and they
1,127✔
2242
        // will reconnect, doing the right thing.
1,127✔
2243
        for _, client := range s.clients {
3,215✔
2244
                if s.clientHasMovedToDifferentAccount(client) {
2,088✔
2245
                        cclients = append(cclients, client)
×
2246
                } else {
2,088✔
2247
                        clients = append(clients, client)
2,088✔
2248
                }
2,088✔
2249
        }
2250
        s.forEachRoute(func(route *client) {
1,156✔
2251
                routes = append(routes, route)
29✔
2252
        })
29✔
2253
        // Check here for any system/internal clients which will not be in the servers map of normal clients.
2254
        if s.sys != nil && s.sys.account != nil && !opts.NoSystemAccount {
2,252✔
2255
                s.accounts.Store(s.sys.account.Name, s.sys.account)
1,125✔
2256
        }
1,125✔
2257

2258
        s.accounts.Range(func(k, v any) bool {
5,467✔
2259
                acc := v.(*Account)
4,340✔
2260
                acc.mu.RLock()
4,340✔
2261
                // Check for sysclients accounting, ignore the system account.
4,340✔
2262
                if acc.sysclients > 0 && (s.sys == nil || s.sys.account != acc) {
4,344✔
2263
                        for c := range acc.clients {
15✔
2264
                                if c.kind != CLIENT && c.kind != LEAF {
19✔
2265
                                        clients = append(clients, c)
8✔
2266
                                }
8✔
2267
                        }
2268
                }
2269
                acc.mu.RUnlock()
4,340✔
2270
                return true
4,340✔
2271
        })
2272

2273
        var resetCh chan struct{}
1,127✔
2274
        if s.sys != nil {
2,252✔
2275
                // can't hold the lock as go routine reading it may be waiting for lock as well
1,125✔
2276
                resetCh = s.sys.resetCh
1,125✔
2277
        }
1,125✔
2278
        s.mu.Unlock()
1,127✔
2279

1,127✔
2280
        // Clear some timers and remove service import subs for deleted accounts.
1,127✔
2281
        for _, acc := range deletedAccounts {
1,127✔
2282
                acc.mu.Lock()
×
2283
                clearTimer(&acc.etmr)
×
2284
                clearTimer(&acc.ctmr)
×
2285
                for _, se := range acc.exports.services {
×
2286
                        se.clearResponseThresholdTimer()
×
2287
                }
×
2288
                acc.mu.Unlock()
×
2289
                acc.removeAllServiceImportSubs()
×
2290
        }
2291

2292
        if resetCh != nil {
2,252✔
2293
                resetCh <- struct{}{}
1,125✔
2294
        }
1,125✔
2295

2296
        // Close clients that have moved accounts
2297
        for _, client := range cclients {
1,127✔
2298
                client.closeConnection(ClientClosed)
×
2299
        }
×
2300

2301
        for _, c := range clients {
3,223✔
2302
                // Disconnect any unauthorized clients.
2,096✔
2303
                // Ignore internal clients.
2,096✔
2304
                if (c.kind == CLIENT || c.kind == LEAF) && !s.isClientAuthorized(c) {
2,097✔
2305
                        c.authViolation()
1✔
2306
                        continue
1✔
2307
                }
2308
                // Check to make sure account is correct.
2309
                c.swapAccountAfterReload()
2,095✔
2310
                // Remove any unauthorized subscriptions and check for account imports.
2,095✔
2311
                c.processSubsOnConfigReload(awcsti)
2,095✔
2312
        }
2313

2314
        for _, route := range routes {
1,156✔
2315
                // Disconnect any unauthorized routes.
29✔
2316
                // Do this only for routes that were accepted, not initiated
29✔
2317
                // because in the later case, we don't have the user name/password
29✔
2318
                // of the remote server.
29✔
2319
                if !route.isSolicitedRoute() && !s.isRouterAuthorized(route) {
29✔
2320
                        route.setNoReconnect()
×
2321
                        route.authViolation()
×
2322
                }
×
2323
        }
2324

2325
        if res := s.AccountResolver(); res != nil {
1,136✔
2326
                res.Reload()
9✔
2327
        }
9✔
2328

2329
        // We will double check all JetStream configs on a reload.
2330
        if checkJetStream {
1,138✔
2331
                if err := s.enableJetStreamAccounts(); err != nil {
11✔
2332
                        s.Errorf(err.Error())
×
2333
                }
×
2334
        }
2335

2336
        // Check that publish retained messages sources are still allowed to publish.
2337
        // Do this after dealing with JetStream.
2338
        s.mqttCheckPubRetainedPerms()
1,127✔
2339
}
2340

2341
// Returns true if given client current account has changed (or user
2342
// no longer exist) in the new config, false if the user did not
2343
// change accounts.
2344
// Server lock is held on entry.
2345
func (s *Server) clientHasMovedToDifferentAccount(c *client) bool {
2,088✔
2346
        var (
2,088✔
2347
                nu *NkeyUser
2,088✔
2348
                u  *User
2,088✔
2349
        )
2,088✔
2350
        c.mu.Lock()
2,088✔
2351
        defer c.mu.Unlock()
2,088✔
2352
        if c.opts.Nkey != _EMPTY_ {
2,088✔
2353
                if s.nkeys != nil {
×
2354
                        nu = s.nkeys[c.opts.Nkey]
×
2355
                }
×
2356
        } else if c.opts.Username != _EMPTY_ {
4,160✔
2357
                if s.users != nil {
4,144✔
2358
                        u = s.users[c.opts.Username]
2,072✔
2359
                }
2,072✔
2360
        } else {
16✔
2361
                return false
16✔
2362
        }
16✔
2363
        // Get the current account name
2364
        var curAccName string
2,072✔
2365
        if c.acc != nil {
4,144✔
2366
                curAccName = c.acc.Name
2,072✔
2367
        }
2,072✔
2368
        if nu != nil && nu.Account != nil {
2,072✔
2369
                return curAccName != nu.Account.Name
×
2370
        } else if u != nil && u.Account != nil {
4,144✔
2371
                return curAccName != u.Account.Name
2,072✔
2372
        }
2,072✔
2373
        // user/nkey no longer exists.
2374
        return true
×
2375
}
2376

2377
// reloadClusterPermissions reconfigures the cluster's permssions
2378
// and set the permissions to all existing routes, sending an
2379
// update INFO protocol so that remote can resend their local
2380
// subs if needed, and sending local subs matching cluster's
2381
// import subjects.
2382
func (s *Server) reloadClusterPermissions(oldPerms *RoutePermissions) {
×
2383
        s.mu.Lock()
×
2384
        newPerms := s.getOpts().Cluster.Permissions
×
2385
        routes := make(map[uint64]*client, s.numRoutes())
×
2386
        // Get all connected routes
×
2387
        s.forEachRoute(func(route *client) {
×
2388
                route.mu.Lock()
×
2389
                routes[route.cid] = route
×
2390
                route.mu.Unlock()
×
2391
        })
×
2392
        // If new permissions is nil, then clear routeInfo import/export
2393
        if newPerms == nil {
×
2394
                s.routeInfo.Import = nil
×
2395
                s.routeInfo.Export = nil
×
2396
        } else {
×
2397
                s.routeInfo.Import = newPerms.Import
×
2398
                s.routeInfo.Export = newPerms.Export
×
2399
        }
×
2400
        infoJSON := generateInfoJSON(&s.routeInfo)
×
2401
        s.mu.Unlock()
×
2402

×
2403
        // Close connections for routes that don't understand async INFO.
×
2404
        for _, route := range routes {
×
2405
                route.mu.Lock()
×
2406
                close := route.opts.Protocol < RouteProtoInfo
×
2407
                cid := route.cid
×
2408
                route.mu.Unlock()
×
2409
                if close {
×
2410
                        route.closeConnection(RouteRemoved)
×
2411
                        delete(routes, cid)
×
2412
                }
×
2413
        }
2414

2415
        // If there are no route left, we are done
2416
        if len(routes) == 0 {
×
2417
                return
×
2418
        }
×
2419

2420
        // Fake clients to test cluster permissions
2421
        oldPermsTester := &client{}
×
2422
        oldPermsTester.setRoutePermissions(oldPerms)
×
2423
        newPermsTester := &client{}
×
2424
        newPermsTester.setRoutePermissions(newPerms)
×
2425

×
2426
        var (
×
2427
                _localSubs       [4096]*subscription
×
2428
                subsNeedSUB      = map[*client][]*subscription{}
×
2429
                subsNeedUNSUB    = map[*client][]*subscription{}
×
2430
                deleteRoutedSubs []*subscription
×
2431
        )
×
2432

×
2433
        getRouteForAccount := func(accName string, poolIdx int) *client {
×
2434
                for _, r := range routes {
×
2435
                        r.mu.Lock()
×
2436
                        ok := (poolIdx >= 0 && poolIdx == r.route.poolIdx) || (string(r.route.accName) == accName) || r.route.noPool
×
2437
                        r.mu.Unlock()
×
2438
                        if ok {
×
2439
                                return r
×
2440
                        }
×
2441
                }
2442
                return nil
×
2443
        }
2444

2445
        // First set the new permissions on all routes.
2446
        for _, route := range routes {
×
2447
                route.mu.Lock()
×
2448
                route.setRoutePermissions(newPerms)
×
2449
                route.mu.Unlock()
×
2450
        }
×
2451

2452
        // Then, go over all accounts and gather local subscriptions that need to be
2453
        // sent over as SUB or removed as UNSUB, and routed subscriptions that need
2454
        // to be dropped due to export permissions.
2455
        s.accounts.Range(func(_, v any) bool {
×
2456
                acc := v.(*Account)
×
2457
                acc.mu.RLock()
×
2458
                accName, sl, poolIdx := acc.Name, acc.sl, acc.routePoolIdx
×
2459
                acc.mu.RUnlock()
×
2460
                // Get the route handling this account. If no route or sublist, bail out.
×
2461
                route := getRouteForAccount(accName, poolIdx)
×
2462
                if route == nil || sl == nil {
×
2463
                        return true
×
2464
                }
×
2465
                localSubs := _localSubs[:0]
×
2466
                sl.localSubs(&localSubs, false)
×
2467

×
2468
                // Go through all local subscriptions
×
2469
                for _, sub := range localSubs {
×
2470
                        // Get all subs that can now be imported
×
2471
                        subj := string(sub.subject)
×
2472
                        couldImportThen := oldPermsTester.canImport(subj)
×
2473
                        canImportNow := newPermsTester.canImport(subj)
×
2474
                        if canImportNow {
×
2475
                                // If we could not before, then will need to send a SUB protocol.
×
2476
                                if !couldImportThen {
×
2477
                                        subsNeedSUB[route] = append(subsNeedSUB[route], sub)
×
2478
                                }
×
2479
                        } else if couldImportThen {
×
2480
                                // We were previously able to import this sub, but now
×
2481
                                // we can't so we need to send an UNSUB protocol
×
2482
                                subsNeedUNSUB[route] = append(subsNeedUNSUB[route], sub)
×
2483
                        }
×
2484
                }
2485
                deleteRoutedSubs = deleteRoutedSubs[:0]
×
2486
                route.mu.Lock()
×
2487
                pa, _, hasSubType := route.getRoutedSubKeyInfo()
×
2488
                for key, sub := range route.subs {
×
2489
                        // If this is not a pinned-account route, we need to get the
×
2490
                        // account name from the key to see if we collect this sub.
×
2491
                        if !pa {
×
2492
                                if an := getAccNameFromRoutedSubKey(sub, key, hasSubType); an != accName {
×
2493
                                        continue
×
2494
                                }
2495
                        }
2496
                        // If we can't export, we need to drop the subscriptions that
2497
                        // we have on behalf of this route.
2498
                        // Need to make a string cast here since canExport call sl.Match()
2499
                        subj := string(sub.subject)
×
2500
                        if !route.canExport(subj) {
×
2501
                                // We can use bytesToString() here.
×
2502
                                delete(route.subs, bytesToString(sub.sid))
×
2503
                                deleteRoutedSubs = append(deleteRoutedSubs, sub)
×
2504
                        }
×
2505
                }
2506
                route.mu.Unlock()
×
2507
                // Remove as a batch all the subs that we have removed from each route.
×
2508
                sl.RemoveBatch(deleteRoutedSubs)
×
2509
                return true
×
2510
        })
2511

2512
        // Send an update INFO, which will allow remote server to show
2513
        // our current route config in monitoring and resend subscriptions
2514
        // that we now possibly allow with a change of Export permissions.
2515
        for _, route := range routes {
×
2516
                route.mu.Lock()
×
2517
                route.enqueueProto(infoJSON)
×
2518
                // Now send SUB and UNSUB protocols as needed.
×
2519
                if subs, ok := subsNeedSUB[route]; ok && len(subs) > 0 {
×
2520
                        route.sendRouteSubProtos(subs, false, nil)
×
2521
                }
×
2522
                if unsubs, ok := subsNeedUNSUB[route]; ok && len(unsubs) > 0 {
×
2523
                        route.sendRouteUnSubProtos(unsubs, false, nil)
×
2524
                }
×
2525
                route.mu.Unlock()
×
2526
        }
2527
}
2528

2529
func (s *Server) reloadClusterPoolAndAccounts(co *clusterOption, opts *Options) {
17✔
2530
        s.mu.Lock()
17✔
2531
        // Prevent adding new routes until we are ready to do so.
17✔
2532
        s.routesReject = true
17✔
2533
        var ch chan struct{}
17✔
2534
        // For accounts that have been added to the list of dedicated routes,
17✔
2535
        // send a protocol to their current assigned routes to allow the
17✔
2536
        // other side to prepare for the changes.
17✔
2537
        if len(co.accsAdded) > 0 {
18✔
2538
                protosSent := 0
1✔
2539
                s.accAddedReqID = nuid.Next()
1✔
2540
                for _, an := range co.accsAdded {
2✔
2541
                        if s.accRoutes == nil {
2✔
2542
                                s.accRoutes = make(map[string]map[string]*client)
1✔
2543
                        }
1✔
2544
                        // In case a config reload was first done on another server,
2545
                        // we may have already switched this account to a dedicated route.
2546
                        // But we still want to send the protocol over the routes that
2547
                        // would have otherwise handled it.
2548
                        if _, ok := s.accRoutes[an]; !ok {
2✔
2549
                                s.accRoutes[an] = make(map[string]*client)
1✔
2550
                        }
1✔
2551
                        if a, ok := s.accounts.Load(an); ok {
2✔
2552
                                acc := a.(*Account)
1✔
2553
                                acc.mu.Lock()
1✔
2554
                                sl := acc.sl
1✔
2555
                                // Get the current route pool index before calling setRouteInfo.
1✔
2556
                                rpi := acc.routePoolIdx
1✔
2557
                                // Switch to per-account route if not already done.
1✔
2558
                                if rpi >= 0 {
2✔
2559
                                        s.setRouteInfo(acc)
1✔
2560
                                } else {
1✔
2561
                                        // If it was transitioning, make sure we set it to the state
×
2562
                                        // that indicates that it has a dedicated route
×
2563
                                        if rpi == accTransitioningToDedicatedRoute {
×
2564
                                                acc.routePoolIdx = accDedicatedRoute
×
2565
                                        }
×
2566
                                        // Otherwise get the route pool index it would have been before
2567
                                        // the move so we can send the protocol to those routes.
2568
                                        rpi = computeRoutePoolIdx(s.routesPoolSize, acc.Name)
×
2569
                                }
2570
                                acc.mu.Unlock()
1✔
2571
                                // Generate the INFO protocol to send indicating that this account
1✔
2572
                                // is being moved to a dedicated route.
1✔
2573
                                ri := Info{
1✔
2574
                                        RoutePoolSize: s.routesPoolSize,
1✔
2575
                                        RouteAccount:  an,
1✔
2576
                                        RouteAccReqID: s.accAddedReqID,
1✔
2577
                                }
1✔
2578
                                proto := generateInfoJSON(&ri)
1✔
2579
                                // Since v2.11.0, we support remotes with a different pool size
1✔
2580
                                // (for rolling upgrades), so we need to use the remote route
1✔
2581
                                // pool index (based on the remote configured pool size) since
1✔
2582
                                // the remote subscriptions will be attached to the route at
1✔
2583
                                // that index, not at our account's route pool index. However,
1✔
2584
                                // we are going to send the protocol through the route that
1✔
2585
                                // handles this account from our pool size perspective (that
1✔
2586
                                // would be the route at index `rpi`).
1✔
2587
                                removeSubsAndSendProto := func(r *client, doSubs, doProto bool) {
1✔
2588
                                        r.mu.Lock()
×
2589
                                        defer r.mu.Unlock()
×
2590
                                        // Exclude routes to servers that don't support pooling.
×
2591
                                        if r.route.noPool {
×
2592
                                                return
×
2593
                                        }
×
2594
                                        if doSubs {
×
2595
                                                if subs := r.removeRemoteSubsForAcc(an); len(subs) > 0 {
×
2596
                                                        sl.RemoveBatch(subs)
×
2597
                                                }
×
2598
                                        }
2599
                                        if doProto {
×
2600
                                                r.enqueueProto(proto)
×
2601
                                                protosSent++
×
2602
                                        }
×
2603
                                }
2604
                                for remote, conns := range s.routes {
1✔
2605
                                        r := conns[rpi]
×
2606
                                        // The route connection at this index is currently not up,
×
2607
                                        // so we won't be able to send the protocol, so move to the
×
2608
                                        // next remote.
×
2609
                                        if r == nil {
×
2610
                                                continue
×
2611
                                        }
2612
                                        doSubs := true
×
2613
                                        // Check the remote's route pool size and if different than
×
2614
                                        // ours, remove the subs on that other route.
×
2615
                                        remotePoolSize, ok := s.remoteRoutePoolSize[remote]
×
2616
                                        if ok && remotePoolSize != s.routesPoolSize {
×
2617
                                                // This is the remote's route pool index for this account
×
2618
                                                rrpi := computeRoutePoolIdx(remotePoolSize, an)
×
2619
                                                if rr := conns[rrpi]; rr != nil {
×
2620
                                                        removeSubsAndSendProto(rr, true, false)
×
2621
                                                        // Indicate that we have already remove the subs.
×
2622
                                                        doSubs = false
×
2623
                                                }
×
2624
                                        }
2625
                                        // Now send the protocol from the route that handles the
2626
                                        // account from this server perspective.
2627
                                        removeSubsAndSendProto(r, doSubs, true)
×
2628
                                }
2629
                        }
2630
                }
2631
                if protosSent > 0 {
1✔
2632
                        s.accAddedCh = make(chan struct{}, protosSent)
×
2633
                        ch = s.accAddedCh
×
2634
                }
×
2635
        }
2636
        // Collect routes that need to be closed.
2637
        routes := make(map[*client]struct{})
17✔
2638
        // Collect the per-account routes that need to be closed.
17✔
2639
        if len(co.accsRemoved) > 0 {
17✔
2640
                for _, an := range co.accsRemoved {
×
2641
                        if remotes, ok := s.accRoutes[an]; ok && remotes != nil {
×
2642
                                for _, r := range remotes {
×
2643
                                        if r != nil {
×
2644
                                                r.setNoReconnect()
×
2645
                                                routes[r] = struct{}{}
×
2646
                                        }
×
2647
                                }
2648
                        }
2649
                }
2650
        }
2651
        // If the pool size has changed, we need to close all pooled routes.
2652
        if co.poolSizeChanged {
34✔
2653
                s.forEachNonPerAccountRoute(func(r *client) {
17✔
2654
                        routes[r] = struct{}{}
×
2655
                })
×
2656
        }
2657
        // If there are routes to close, we need to release the server lock.
2658
        // Same if we need to wait on responses from the remotes when
2659
        // processing new per-account routes.
2660
        if len(routes) > 0 || len(ch) > 0 {
17✔
2661
                s.mu.Unlock()
×
2662

×
2663
                for done := false; !done && len(ch) > 0; {
×
2664
                        select {
×
2665
                        case <-ch:
×
2666
                        case <-time.After(2 * time.Second):
×
2667
                                s.Warnf("Timed out waiting for confirmation from all routes regarding per-account routes changes")
×
2668
                                done = true
×
2669
                        }
2670
                }
2671

2672
                for r := range routes {
×
2673
                        r.closeConnection(RouteRemoved)
×
2674
                }
×
2675

2676
                s.mu.Lock()
×
2677
        }
2678
        // Clear the accAddedCh/ReqID fields in case they were set.
2679
        s.accAddedReqID, s.accAddedCh = _EMPTY_, nil
17✔
2680
        // Now that per-account routes that needed to be closed are closed,
17✔
2681
        // remove them from s.accRoutes. Doing so before would prevent
17✔
2682
        // removeRoute() to do proper cleanup because the route would not
17✔
2683
        // be found in s.accRoutes.
17✔
2684
        for _, an := range co.accsRemoved {
17✔
2685
                delete(s.accRoutes, an)
×
2686
                // Do not lookup and call setRouteInfo() on the accounts here.
×
2687
                // We need first to set the new s.routesPoolSize value and
×
2688
                // anyway, there is no need to do here if the pool size has
×
2689
                // changed (since it will be called for all accounts).
×
2690
        }
×
2691
        // We have already added the accounts to s.accRoutes that needed to
2692
        // be added.
2693

2694
        // We should always have at least the system account with a dedicated route,
2695
        // but in case we have a configuration that disables pooling and without
2696
        // a system account, possibly set the accRoutes to nil.
2697
        if len(opts.Cluster.PinnedAccounts) == 0 {
33✔
2698
                s.accRoutes = nil
16✔
2699
        }
16✔
2700
        // Now deal with pool size updates.
2701
        if ps := opts.Cluster.PoolSize; ps > 0 {
18✔
2702
                s.routesPoolSize = ps
1✔
2703
                s.routeInfo.RoutePoolSize = ps
1✔
2704
        } else {
17✔
2705
                s.routesPoolSize = 1
16✔
2706
                s.routeInfo.RoutePoolSize = 0
16✔
2707
        }
16✔
2708
        // If the pool size has changed, we need to recompute all accounts' route
2709
        // pool index. Note that the added/removed accounts will be reset there
2710
        // too, but that's ok (we could use a map to exclude them, but not worth it).
2711
        if co.poolSizeChanged {
34✔
2712
                s.accounts.Range(func(_, v any) bool {
52✔
2713
                        acc := v.(*Account)
35✔
2714
                        acc.mu.Lock()
35✔
2715
                        s.setRouteInfo(acc)
35✔
2716
                        acc.mu.Unlock()
35✔
2717
                        return true
35✔
2718
                })
35✔
2719
        } else if len(co.accsRemoved) > 0 {
×
2720
                // For accounts that no longer have a dedicated route, we need to send
×
2721
                // the subsriptions on the existing pooled routes for those accounts.
×
2722
                for _, an := range co.accsRemoved {
×
2723
                        if a, ok := s.accounts.Load(an); ok {
×
2724
                                acc := a.(*Account)
×
2725
                                acc.mu.Lock()
×
2726
                                // First call this which will assign a new route pool index.
×
2727
                                s.setRouteInfo(acc)
×
2728
                                // Get the value so we can send the subscriptions interest
×
2729
                                // on all routes with this pool index.
×
2730
                                rpi := acc.routePoolIdx
×
2731
                                acc.mu.Unlock()
×
2732
                                s.forEachRouteIdx(rpi, func(r *client) bool {
×
2733
                                        // We have the guarantee that if the route exists, it
×
2734
                                        // is not a new one that would have been created when
×
2735
                                        // we released the server lock if some routes needed
×
2736
                                        // to be closed, because we have set s.routesReject
×
2737
                                        // to `true` at the top of this function.
×
2738
                                        s.sendSubsToRoute(r, rpi, an)
×
2739
                                        return true
×
2740
                                })
×
2741
                        }
2742
                }
2743
        }
2744
        // Allow routes to be accepted now.
2745
        s.routesReject = false
17✔
2746
        // If there is a pool size change or added accounts, solicit routes now.
17✔
2747
        if co.poolSizeChanged || len(co.accsAdded) > 0 {
34✔
2748
                s.solicitRoutes(opts.Routes, co.accsAdded)
17✔
2749
        }
17✔
2750
        s.mu.Unlock()
17✔
2751
}
2752

2753
// validateClusterOpts ensures the new ClusterOpts does not change some of the
2754
// fields that do not support reload.
2755
func validateClusterOpts(old, new ClusterOpts) error {
23✔
2756
        if old.Host != new.Host {
23✔
2757
                return fmt.Errorf("config reload not supported for cluster host: old=%s, new=%s",
×
2758
                        old.Host, new.Host)
×
2759
        }
×
2760
        if old.Port != new.Port {
23✔
2761
                return fmt.Errorf("config reload not supported for cluster port: old=%d, new=%d",
×
2762
                        old.Port, new.Port)
×
2763
        }
×
2764
        // Validate Cluster.Advertise syntax
2765
        if new.Advertise != "" {
24✔
2766
                if _, _, err := parseHostPort(new.Advertise, 0); err != nil {
1✔
2767
                        return fmt.Errorf("invalid Cluster.Advertise value of %s, err=%v", new.Advertise, err)
×
2768
                }
×
2769
        }
2770
        return nil
23✔
2771
}
2772

2773
// diffRoutes diffs the old routes and the new routes and returns the ones that
2774
// should be added and removed from the server.
2775
func diffRoutes(old, new []*url.URL) (add, remove []*url.URL) {
2✔
2776
        // Find routes to remove.
2✔
2777
removeLoop:
2✔
2778
        for _, oldRoute := range old {
5✔
2779
                for _, newRoute := range new {
5✔
2780
                        if urlsAreEqual(oldRoute, newRoute) {
3✔
2781
                                continue removeLoop
1✔
2782
                        }
2783
                }
2784
                remove = append(remove, oldRoute)
2✔
2785
        }
2786

2787
        // Find routes to add.
2788
addLoop:
2✔
2789
        for _, newRoute := range new {
4✔
2790
                for _, oldRoute := range old {
4✔
2791
                        if urlsAreEqual(oldRoute, newRoute) {
3✔
2792
                                continue addLoop
1✔
2793
                        }
2794
                }
2795
                add = append(add, newRoute)
1✔
2796
        }
2797

2798
        return add, remove
2✔
2799
}
2800

2801
func diffProxiesTrustedKeys(old, new []*ProxyConfig) ([]string, []string) {
1✔
2802
        var add []string
1✔
2803
        var del []string
1✔
2804
        // Both "old" and "new" lists should be small...
1✔
2805
        for _, op := range old {
3✔
2806
                if !slices.ContainsFunc(new, func(pc *ProxyConfig) bool {
6✔
2807
                        return pc.Key == op.Key
4✔
2808
                }) {
5✔
2809
                        del = append(del, op.Key)
1✔
2810
                }
1✔
2811
        }
2812
        for _, np := range new {
3✔
2813
                if !slices.ContainsFunc(old, func(pc *ProxyConfig) bool {
6✔
2814
                        return pc.Key == np.Key
4✔
2815
                }) {
5✔
2816
                        add = append(add, np.Key)
1✔
2817
                }
1✔
2818
        }
2819
        return add, del
1✔
2820
}
2821

2822
// This function calls `reflect.DeepEqual` on all public fields that are
2823
// not part of the `ignoreFields` list. If they are all equal, returns nil,
2824
// otherwise returns an error that will contain the name of the first field
2825
// that fails the comparison, along with its old and new values.
2826
func checkConfigsEqual(c1, c2 any, ignoreFields []string) error {
43✔
2827
        oldConfig := reflect.ValueOf(c1).Elem()
43✔
2828
        newConfig := reflect.ValueOf(c2).Elem()
43✔
2829
        for i := 0; i < oldConfig.NumField(); i++ {
1,207✔
2830
                field := oldConfig.Type().Field(i)
1,164✔
2831
                // field.PkgPath is empty for exported fields, and is not for unexported ones.
1,164✔
2832
                // We skip the unexported fields.
1,164✔
2833
                if field.PkgPath != _EMPTY_ {
1,306✔
2834
                        continue
142✔
2835
                }
2836
                // If it is in the set of fields to ignore, move to the next.
2837
                // We expect the number of ignore fields to be small.
2838
                var ignored bool
1,022✔
2839
                for _, f := range ignoreFields {
6,329✔
2840
                        if f == field.Name {
5,555✔
2841
                                ignored = true
248✔
2842
                                break
248✔
2843
                        }
2844
                }
2845
                if ignored {
1,270✔
2846
                        continue
248✔
2847
                }
2848
                oldValue := oldConfig.Field(i).Interface()
774✔
2849
                newValue := newConfig.Field(i).Interface()
774✔
2850
                if !reflect.DeepEqual(oldValue, newValue) {
774✔
2851
                        return fmt.Errorf("field %q: old=%v, new=%v", field.Name, oldValue, newValue)
×
2852
                }
×
2853
        }
2854
        return nil
43✔
2855
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc