• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

anycable / anycable-go / 8103893093

29 Feb 2024 11:13PM UTC coverage: 80.12% (-0.3%) from 80.397%
8103893093

push

github

palkan
fix(logs): truncate long values, handle bytes, improve attrs

129 of 178 new or added lines in 11 files covered. (72.47%)

27 existing lines in 5 files now uncovered.

7331 of 9150 relevant lines covered (80.12%)

5520.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.66
/node/node.go
1
package node
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "log/slog"
8
        "runtime"
9
        "sync"
10
        "time"
11

12
        "github.com/anycable/anycable-go/broker"
13
        "github.com/anycable/anycable-go/common"
14
        "github.com/anycable/anycable-go/hub"
15
        "github.com/anycable/anycable-go/logger"
16
        "github.com/anycable/anycable-go/metrics"
17
        "github.com/anycable/anycable-go/utils"
18
        "github.com/anycable/anycable-go/ws"
19
        "github.com/joomcode/errorx"
20
)
21

22
const (
23
        metricsGoroutines      = "goroutines_num"
24
        metricsMemSys          = "mem_sys_bytes"
25
        metricsClientsNum      = "clients_num"
26
        metricsUniqClientsNum  = "clients_uniq_num"
27
        metricsStreamsNum      = "broadcast_streams_num"
28
        metricsDisconnectQueue = "disconnect_queue_size"
29

30
        metricsFailedAuths           = "failed_auths_total"
31
        metricsReceivedMsg           = "client_msg_total"
32
        metricsFailedCommandReceived = "failed_client_msg_total"
33
        metricsBroadcastMsg          = "broadcast_msg_total"
34
        metricsUnknownBroadcast      = "failed_broadcast_msg_total"
35

36
        metricsSentMsg    = "server_msg_total"
37
        metricsFailedSent = "failed_server_msg_total"
38

39
        metricsDataSent     = "data_sent_total"
40
        metricsDataReceived = "data_rcvd_total"
41
)
42

43
// AppNode describes a basic node interface
44
//
45
//go:generate mockery --name AppNode --output "../node_mocks" --outpkg node_mocks
46
type AppNode interface {
47
        HandlePubSub(msg []byte)
48
        LookupSession(id string) *Session
49
        Authenticate(s *Session, opts ...AuthOption) (*common.ConnectResult, error)
50
        Authenticated(s *Session, identifiers string)
51
        Subscribe(s *Session, msg *common.Message) (*common.CommandResult, error)
52
        Unsubscribe(s *Session, msg *common.Message) (*common.CommandResult, error)
53
        Perform(s *Session, msg *common.Message) (*common.CommandResult, error)
54
        Disconnect(s *Session) error
55
}
56

57
// Connection represents underlying connection
58
type Connection interface {
59
        Write(msg []byte, deadline time.Time) error
60
        WriteBinary(msg []byte, deadline time.Time) error
61
        Read() ([]byte, error)
62
        Close(code int, reason string)
63
}
64

65
// Node represents the whole application
66
type Node struct {
67
        id      string
68
        metrics metrics.Instrumenter
69

70
        config       *Config
71
        hub          *hub.Hub
72
        broker       broker.Broker
73
        controller   Controller
74
        disconnector Disconnector
75
        shutdownCh   chan struct{}
76
        shutdownMu   sync.Mutex
77
        closed       bool
78
        log          *slog.Logger
79
}
80

81
var _ AppNode = (*Node)(nil)
82

83
type NodeOption = func(*Node)
84

85
func WithController(c Controller) NodeOption {
82✔
86
        return func(n *Node) {
164✔
87
                n.controller = c
82✔
88
        }
82✔
89
}
90

91
func WithInstrumenter(i metrics.Instrumenter) NodeOption {
82✔
92
        return func(n *Node) {
164✔
93
                n.metrics = i
82✔
94
        }
82✔
95
}
96

97
func WithLogger(l *slog.Logger) NodeOption {
34✔
98
        return func(n *Node) {
68✔
99
                n.log = l.With("context", "node")
34✔
100
        }
34✔
101
}
102

103
func WithID(id string) NodeOption {
34✔
104
        return func(n *Node) {
68✔
105
                n.id = id
34✔
106
        }
34✔
107
}
108

109
// NewNode builds new node struct
110
func NewNode(config *Config, opts ...NodeOption) *Node {
82✔
111
        n := &Node{
82✔
112
                config:     config,
82✔
113
                shutdownCh: make(chan struct{}),
82✔
114
        }
82✔
115

82✔
116
        for _, opt := range opts {
314✔
117
                opt(n)
232✔
118
        }
232✔
119

120
        // Setup default logger
121
        if n.log == nil {
130✔
122
                n.log = slog.With("context", "node")
48✔
123
        }
48✔
124

125
        n.hub = hub.NewHub(config.HubGopoolSize, n.log)
82✔
126

82✔
127
        if n.metrics != nil {
164✔
128
                n.registerMetrics()
82✔
129
        }
82✔
130

131
        return n
82✔
132
}
133

134
// Start runs all the required goroutines
135
func (n *Node) Start() error {
42✔
136
        go n.hub.Run()
42✔
137
        go n.collectStats()
42✔
138

42✔
139
        return nil
42✔
140
}
42✔
141

142
// ID returns node identifier
143
func (n *Node) ID() string {
×
144
        return n.id
×
145
}
×
146

147
// SetDisconnector set disconnector for the node
148
func (n *Node) SetDisconnector(d Disconnector) {
82✔
149
        n.disconnector = d
82✔
150
}
82✔
151

152
func (n *Node) SetBroker(b broker.Broker) {
86✔
153
        n.broker = b
86✔
154
}
86✔
155

156
// Return current instrumenter for the node
157
func (n *Node) Instrumenter() metrics.Instrumenter {
×
158
        return n.metrics
×
159
}
×
160

161
// HandleCommand parses incoming message from client and
162
// execute the command (if recognized)
163
func (n *Node) HandleCommand(s *Session, msg *common.Message) (err error) {
3,433✔
164
        s.Log.Debug("incoming message", "data", msg)
3,433✔
165
        switch msg.Command {
3,433✔
166
        case "pong":
1✔
167
                s.handlePong(msg)
1✔
168
        case "subscribe":
2,351✔
169
                _, err = n.Subscribe(s, msg)
2,351✔
170
        case "unsubscribe":
27✔
171
                _, err = n.Unsubscribe(s, msg)
27✔
172
        case "message":
1,048✔
173
                _, err = n.Perform(s, msg)
1,048✔
174
        case "history":
6✔
175
                err = n.History(s, msg)
6✔
176
        default:
×
177
                err = fmt.Errorf("unknown command: %s", msg.Command)
×
178
        }
179

180
        return
3,433✔
181
}
182

183
// HandleBroadcast parses incoming broadcast message, record it and re-transmit to other nodes
184
func (n *Node) HandleBroadcast(raw []byte) {
233✔
185
        msg, err := common.PubSubMessageFromJSON(raw)
233✔
186

233✔
187
        if err != nil {
233✔
188
                n.metrics.CounterIncrement(metricsUnknownBroadcast)
×
NEW
189
                n.log.Warn("failed to parse pubsub message", "data", logger.CompactValue(raw), "error", err)
×
190
                return
×
191
        }
×
192

193
        switch v := msg.(type) {
233✔
194
        case common.StreamMessage:
226✔
195
                n.log.Debug("handle broadcast message", "payload", &v)
226✔
196
                n.broker.HandleBroadcast(&v)
226✔
197
        case []*common.StreamMessage:
2✔
198
                n.log.Debug("handle batch-broadcast message", "payload", &v)
2✔
199
                for _, el := range v {
8✔
200
                        n.broker.HandleBroadcast(el)
6✔
201
                }
6✔
202
        case common.RemoteCommandMessage:
5✔
203
                n.log.Debug("handle remote command", "command", &v)
5✔
204
                n.broker.HandleCommand(&v)
5✔
205
        }
206
}
207

208
// HandlePubSub parses incoming pubsub message and broadcast it to all clients (w/o using a broker)
209
func (n *Node) HandlePubSub(raw []byte) {
1,125✔
210
        msg, err := common.PubSubMessageFromJSON(raw)
1,125✔
211

1,125✔
212
        if err != nil {
1,125✔
213
                n.metrics.CounterIncrement(metricsUnknownBroadcast)
×
NEW
214
                n.log.Warn("failed to parse pubsub message", "data", logger.CompactValue(raw), "error", err)
×
215
                return
×
216
        }
×
217

218
        switch v := msg.(type) {
1,125✔
219
        case common.StreamMessage:
1,117✔
220
                n.Broadcast(&v)
1,117✔
221
        case []*common.StreamMessage:
2✔
222
                for _, el := range v {
8✔
223
                        n.Broadcast(el)
6✔
224
                }
6✔
225
        case common.RemoteCommandMessage:
6✔
226
                n.ExecuteRemoteCommand(&v)
6✔
227
        }
228
}
229

230
func (n *Node) LookupSession(id string) *Session {
4✔
231
        hubSession := n.hub.FindByIdentifier(id)
4✔
232
        session, _ := hubSession.(*Session)
4✔
233
        return session
4✔
234
}
4✔
235

236
// Shutdown stops all services (hub, controller)
237
func (n *Node) Shutdown(ctx context.Context) (err error) {
42✔
238
        n.shutdownMu.Lock()
42✔
239
        if n.closed {
42✔
240
                n.shutdownMu.Unlock()
×
241
                return errors.New("already shut down")
×
242
        }
×
243

244
        close(n.shutdownCh)
42✔
245

42✔
246
        n.closed = true
42✔
247
        n.shutdownMu.Unlock()
42✔
248

42✔
249
        if n.hub != nil {
84✔
250
                active := n.hub.Size()
42✔
251

42✔
252
                if active > 0 {
60✔
253
                        n.log.Info("closing active connections", "num", active)
18✔
254
                        n.disconnectAll(ctx)
18✔
255
                }
18✔
256

257
                n.hub.Shutdown()
42✔
258
        }
259

260
        if n.disconnector != nil {
84✔
261
                err := n.disconnector.Shutdown(ctx)
42✔
262

42✔
263
                if err != nil {
42✔
264
                        n.log.Warn("failed to shutdown disconnector gracefully", "error", err)
×
265
                }
×
266
        }
267

268
        if n.controller != nil {
84✔
269
                err := n.controller.Shutdown()
42✔
270

42✔
271
                if err != nil {
44✔
272
                        n.log.Warn("failed to shutdown controller gracefully", "error", err)
2✔
273
                }
2✔
274
        }
275

276
        return
42✔
277
}
278

279
func (n *Node) IsShuttingDown() bool {
2,412✔
280
        n.shutdownMu.Lock()
2,412✔
281
        defer n.shutdownMu.Unlock()
2,412✔
282

2,412✔
283
        return n.closed
2,412✔
284
}
2,412✔
285

286
type AuthOptions struct {
287
        DisconnectOnFailure bool
288
}
289

290
func newAuthOptions(modifiers []AuthOption) *AuthOptions {
2,449✔
291
        base := &AuthOptions{
2,449✔
292
                DisconnectOnFailure: true,
2,449✔
293
        }
2,449✔
294

2,449✔
295
        for _, modifier := range modifiers {
2,449✔
296
                modifier(base)
×
297
        }
×
298

299
        return base
2,449✔
300
}
301

302
type AuthOption = func(*AuthOptions)
303

304
func WithDisconnectOnFailure(disconnect bool) AuthOption {
×
305
        return func(opts *AuthOptions) {
×
306
                opts.DisconnectOnFailure = disconnect
×
307
        }
×
308
}
309

310
// Authenticate calls controller to perform authentication.
311
// If authentication is successful, session is registered with a hub.
312
func (n *Node) Authenticate(s *Session, options ...AuthOption) (*common.ConnectResult, error) {
2,449✔
313
        opts := newAuthOptions(options)
2,449✔
314

2,449✔
315
        if s.IsResumeable() {
2,470✔
316
                restored := n.TryRestoreSession(s)
21✔
317

21✔
318
                if restored {
30✔
319
                        return &common.ConnectResult{Status: common.SUCCESS}, nil
9✔
320
                }
9✔
321
        }
322

323
        res, err := n.controller.Authenticate(s.GetID(), s.env)
2,440✔
324

2,440✔
325
        s.Log.Debug("controller authenticate", "response", res, "err", err)
2,440✔
326

2,440✔
327
        if err != nil {
2,442✔
328
                s.Disconnect("Auth Error", ws.CloseInternalServerErr)
2✔
329
                return nil, errorx.Decorate(err, "failed to authenticate")
2✔
330
        }
2✔
331

332
        if res.Status == common.SUCCESS {
4,847✔
333
                n.Authenticated(s, res.Identifier)
2,409✔
334
        } else {
2,438✔
335
                if res.Status == common.FAILURE {
58✔
336
                        n.metrics.CounterIncrement(metricsFailedAuths)
29✔
337
                }
29✔
338

339
                if opts.DisconnectOnFailure {
58✔
340
                        defer s.Disconnect("Auth Failed", ws.CloseNormalClosure)
29✔
341
                }
29✔
342
        }
343

344
        n.handleCallReply(s, res.ToCallResult())
2,438✔
345
        n.markDisconnectable(s, res.DisconnectInterest)
2,438✔
346

2,438✔
347
        if s.IsResumeable() {
2,450✔
348
                if berr := n.broker.CommitSession(s.GetID(), s); berr != nil {
12✔
349
                        s.Log.Error("failed to persist session in cache", "error", berr)
×
350
                }
×
351
        }
352

353
        return res, nil
2,438✔
354
}
355

356
// Mark session as authenticated and register it with a hub.
357
// Useful when you perform authentication manually, not using a controller.
358
func (n *Node) Authenticated(s *Session, ids string) {
2,409✔
359
        s.SetIdentifiers(ids)
2,409✔
360
        s.Connected = true
2,409✔
361
        n.hub.AddSession(s)
2,409✔
362
}
2,409✔
363

364
func (n *Node) TryRestoreSession(s *Session) (restored bool) {
21✔
365
        sid := s.GetID()
21✔
366
        prev_sid := s.PrevSid()
21✔
367

21✔
368
        if prev_sid == "" {
29✔
369
                return false
8✔
370
        }
8✔
371

372
        cached_session, err := n.broker.RestoreSession(prev_sid)
13✔
373

13✔
374
        if err != nil {
13✔
375
                s.Log.Error("failed to fetch session cache", "old_sid", prev_sid, "error", err)
×
376
                return false
×
377
        }
×
378

379
        if cached_session == nil {
17✔
380
                s.Log.Debug("session not found in cache", "old_sid", prev_sid)
4✔
381
                return false
4✔
382
        }
4✔
383

384
        err = s.RestoreFromCache(cached_session)
9✔
385

9✔
386
        if err != nil {
9✔
387
                s.Log.Error("failed to restore session from cache", "old_sid", prev_sid, "error", err)
×
388
                return false
×
389
        }
×
390

391
        s.Log.Debug("session restored", "old_sid", prev_sid)
9✔
392

9✔
393
        s.Connected = true
9✔
394
        n.hub.AddSession(s)
9✔
395

9✔
396
        // Resubscribe to streams
9✔
397
        for identifier, channel_streams := range s.subscriptions.channels {
22✔
398
                for stream := range channel_streams {
32✔
399
                        streamId := n.broker.Subscribe(stream)
19✔
400
                        n.hub.SubscribeSession(s, streamId, identifier)
19✔
401
                }
19✔
402
        }
403

404
        // Send welcome message
405
        s.Send(&common.Reply{
9✔
406
                Type:        common.WelcomeType,
9✔
407
                Sid:         sid,
9✔
408
                Restored:    true,
9✔
409
                RestoredIDs: utils.Keys(s.subscriptions.channels),
9✔
410
        })
9✔
411

9✔
412
        if s.IsResumeable() {
18✔
413
                if berr := n.broker.CommitSession(s.GetID(), s); berr != nil {
9✔
414
                        s.Log.Error("failed to persist session in cache", "error", berr)
×
415
                }
×
416
        }
417

418
        return true
9✔
419
}
420

421
// Subscribe subscribes session to a channel
422
func (n *Node) Subscribe(s *Session, msg *common.Message) (*common.CommandResult, error) {
2,379✔
423
        s.smu.Lock()
2,379✔
424

2,379✔
425
        if ok := s.subscriptions.HasChannel(msg.Identifier); ok {
2,379✔
426
                s.smu.Unlock()
×
427
                return nil, fmt.Errorf("already subscribed to %s", msg.Identifier)
×
428
        }
×
429

430
        res, err := n.controller.Subscribe(s.GetID(), s.env, s.GetIdentifiers(), msg.Identifier)
2,379✔
431

2,379✔
432
        s.Log.Debug("controller subscribe", "response", res, "err", err)
2,379✔
433

2,379✔
434
        var confirmed bool
2,379✔
435

2,379✔
436
        if err != nil { // nolint: gocritic
2,381✔
437
                if res == nil || res.Status == common.ERROR {
4✔
438
                        return nil, errorx.Decorate(err, "subscribe failed for %s", msg.Identifier)
2✔
439
                }
2✔
440
        } else if res.Status == common.SUCCESS {
4,743✔
441
                confirmed = true
2,366✔
442
                s.subscriptions.AddChannel(msg.Identifier)
2,366✔
443
                s.Log.Debug("subscribed", "identifier", msg.Identifier)
2,366✔
444
        } else {
2,377✔
445
                s.Log.Debug("subscription rejected", "identifier", msg.Identifier)
11✔
446
        }
11✔
447

448
        s.smu.Unlock()
2,377✔
449

2,377✔
450
        if res != nil {
4,754✔
451
                n.handleCommandReply(s, msg, res)
2,377✔
452
                n.markDisconnectable(s, res.DisconnectInterest)
2,377✔
453
        }
2,377✔
454

455
        if confirmed {
4,743✔
456
                if s.IsResumeable() {
2,377✔
457
                        if berr := n.broker.CommitSession(s.GetID(), s); berr != nil {
11✔
458
                                s.Log.Error("failed to persist session in cache", "error", berr)
×
459
                        }
×
460
                }
461

462
                if msg.History.Since > 0 || msg.History.Streams != nil {
2,376✔
463
                        if err := n.History(s, msg); err != nil {
10✔
464
                                s.Log.Warn("couldn't retrieve history", "identifier", msg.Identifier, "error", err)
×
465
                        }
×
466

467
                        return res, nil
10✔
468
                }
469
        }
470

471
        return res, nil
2,367✔
472
}
473

474
// Unsubscribe unsubscribes session from a channel
475
func (n *Node) Unsubscribe(s *Session, msg *common.Message) (*common.CommandResult, error) {
31✔
476
        s.smu.Lock()
31✔
477

31✔
478
        if ok := s.subscriptions.HasChannel(msg.Identifier); !ok {
31✔
479
                s.smu.Unlock()
×
480
                return nil, fmt.Errorf("unknown subscription: %s", msg.Identifier)
×
481
        }
×
482

483
        res, err := n.controller.Unsubscribe(s.GetID(), s.env, s.GetIdentifiers(), msg.Identifier)
31✔
484

31✔
485
        s.Log.Debug("controller unsubscribe", "response", res, "err", err)
31✔
486

31✔
487
        if err != nil {
33✔
488
                if res == nil || res.Status == common.ERROR {
4✔
489
                        return nil, errorx.Decorate(err, "failed to unsubscribe from %s", msg.Identifier)
2✔
490
                }
2✔
491
        } else {
29✔
492
                // Make sure to remove all streams subscriptions
29✔
493
                res.StopAllStreams = true
29✔
494

29✔
495
                s.subscriptions.RemoveChannel(msg.Identifier)
29✔
496

29✔
497
                s.Log.Debug("unsubscribed", "identifier", msg.Identifier)
29✔
498
        }
29✔
499

500
        s.smu.Unlock()
29✔
501

29✔
502
        if res != nil {
58✔
503
                n.handleCommandReply(s, msg, res)
29✔
504
        }
29✔
505

506
        if s.IsResumeable() {
29✔
507
                if berr := n.broker.CommitSession(s.GetID(), s); berr != nil {
×
508
                        s.Log.Error("failed to persist session in cache", "error", berr)
×
509
                }
×
510
        }
511

512
        return res, nil
29✔
513
}
514

515
// Perform executes client command
516
func (n *Node) Perform(s *Session, msg *common.Message) (*common.CommandResult, error) {
1,066✔
517
        s.smu.Lock()
1,066✔
518

1,066✔
519
        if ok := s.subscriptions.HasChannel(msg.Identifier); !ok {
1,066✔
520
                s.smu.Unlock()
×
521
                return nil, fmt.Errorf("unknown subscription %s", msg.Identifier)
×
522
        }
×
523

524
        s.smu.Unlock()
1,066✔
525

1,066✔
526
        data, ok := msg.Data.(string)
1,066✔
527

1,066✔
528
        if !ok {
1,066✔
529
                return nil, fmt.Errorf("perform data must be a string, got %v", msg.Data)
×
530
        }
×
531

532
        res, err := n.controller.Perform(s.GetID(), s.env, s.GetIdentifiers(), msg.Identifier, data)
1,066✔
533

1,066✔
534
        s.Log.Debug("controller perform", "response", res, "err", err)
1,066✔
535

1,066✔
536
        if err != nil {
1,068✔
537
                if res == nil || res.Status == common.ERROR {
4✔
538
                        return nil, errorx.Decorate(err, "perform failed for %s", msg.Identifier)
2✔
539
                }
2✔
540
        }
541

542
        if res != nil {
2,128✔
543
                if n.handleCommandReply(s, msg, res) {
1,092✔
544
                        if s.IsResumeable() {
28✔
545
                                if berr := n.broker.CommitSession(s.GetID(), s); berr != nil {
×
546
                                        s.Log.Error("failed to persist session in cache", "error", berr)
×
547
                                }
×
548
                        }
549
                }
550
        }
551

552
        return res, nil
1,064✔
553
}
554

555
// History fetches the stream history for the specified identifier
556
func (n *Node) History(s *Session, msg *common.Message) error {
26✔
557
        s.smu.Lock()
26✔
558

26✔
559
        if ok := s.subscriptions.HasChannel(msg.Identifier); !ok {
26✔
560
                s.smu.Unlock()
×
561
                return fmt.Errorf("unknown subscription %s", msg.Identifier)
×
562
        }
×
563

564
        subscriptionStreams := s.subscriptions.StreamsFor(msg.Identifier)
26✔
565

26✔
566
        s.smu.Unlock()
26✔
567

26✔
568
        history := msg.History
26✔
569

26✔
570
        if history.Since == 0 && history.Streams == nil {
26✔
571
                return fmt.Errorf("history request is missing, got %v", msg)
×
572
        }
×
573

574
        backlog, err := n.retreiveHistory(&history, subscriptionStreams)
26✔
575

26✔
576
        if err != nil {
28✔
577
                s.Send(&common.Reply{
2✔
578
                        Type:       common.HistoryRejectedType,
2✔
579
                        Identifier: msg.Identifier,
2✔
580
                })
2✔
581

2✔
582
                return err
2✔
583
        }
2✔
584

585
        for _, el := range backlog {
90✔
586
                s.Send(el.ToReplyFor(msg.Identifier))
66✔
587
        }
66✔
588

589
        s.Send(&common.Reply{
24✔
590
                Type:       common.HistoryConfirmedType,
24✔
591
                Identifier: msg.Identifier,
24✔
592
        })
24✔
593

24✔
594
        return nil
24✔
595
}
596

597
func (n *Node) retreiveHistory(history *common.HistoryRequest, streams []string) (backlog []common.StreamMessage, err error) {
26✔
598
        backlog = []common.StreamMessage{}
26✔
599

26✔
600
        for _, stream := range streams {
60✔
601
                if history.Streams != nil {
52✔
602
                        pos, ok := history.Streams[stream]
18✔
603

18✔
604
                        if ok {
28✔
605
                                streamBacklog, err := n.broker.HistoryFrom(stream, pos.Epoch, pos.Offset)
10✔
606

10✔
607
                                if err != nil {
10✔
608
                                        return nil, err
×
609
                                }
×
610

611
                                backlog = append(backlog, streamBacklog...)
10✔
612

10✔
613
                                continue
10✔
614
                        }
615
                }
616

617
                if history.Since > 0 {
44✔
618
                        streamBacklog, err := n.broker.HistorySince(stream, history.Since)
20✔
619

20✔
620
                        if err != nil {
22✔
621
                                return nil, err
2✔
622
                        }
2✔
623

624
                        backlog = append(backlog, streamBacklog...)
18✔
625
                }
626
        }
627

628
        return backlog, nil
24✔
629
}
630

631
// Broadcast message to stream (locally)
632
func (n *Node) Broadcast(msg *common.StreamMessage) {
1,350✔
633
        n.metrics.CounterIncrement(metricsBroadcastMsg)
1,350✔
634
        n.log.Debug("incoming broadcast message", "payload", msg)
1,350✔
635
        n.hub.BroadcastMessage(msg)
1,350✔
636
}
1,350✔
637

638
// Execute remote command (locally)
639
func (n *Node) ExecuteRemoteCommand(msg *common.RemoteCommandMessage) {
11✔
640
        // TODO: Add remote commands metrics
11✔
641
        // n.metrics.CounterIncrement(metricsRemoteCommandsMsg)
11✔
642
        switch msg.Command { // nolint:gocritic
11✔
643
        case "disconnect":
11✔
644
                dmsg, err := msg.ToRemoteDisconnectMessage()
11✔
645
                if err != nil {
11✔
NEW
646
                        n.log.Warn("failed to parse remote disconnect command", "data", msg, "error", err)
×
647
                        return
×
648
                }
×
649

650
                n.log.Debug("incoming remote command", "command", dmsg)
11✔
651

11✔
652
                n.RemoteDisconnect(dmsg)
11✔
653
        }
654
}
655

656
// Disconnect adds session to disconnector queue and unregister session from hub
657
func (n *Node) Disconnect(s *Session) error {
2,412✔
658
        if s.IsResumeable() {
2,431✔
659
                n.broker.FinishSession(s.GetID()) // nolint:errcheck
19✔
660
        }
19✔
661

662
        if n.IsShuttingDown() {
2,440✔
663
                if s.IsDisconnectable() {
40✔
664
                        return n.DisconnectNow(s)
12✔
665
                }
12✔
666
        } else {
2,384✔
667
                n.hub.RemoveSessionLater(s)
2,384✔
668

2,384✔
669
                if s.IsDisconnectable() {
4,754✔
670
                        return n.disconnector.Enqueue(s)
2,370✔
671
                }
2,370✔
672
        }
673

674
        return nil
30✔
675
}
676

677
// DisconnectNow execute disconnect on controller
678
func (n *Node) DisconnectNow(s *Session) error {
2,384✔
679
        sessionSubscriptions := s.subscriptions.Channels()
2,384✔
680

2,384✔
681
        ids := s.GetIdentifiers()
2,384✔
682

2,384✔
683
        s.Log.Debug("disconnect", "ids", ids, "url", s.env.URL, "headers", s.env.Headers, "subscriptions", sessionSubscriptions)
2,384✔
684

2,384✔
685
        err := n.controller.Disconnect(
2,384✔
686
                s.GetID(),
2,384✔
687
                s.env,
2,384✔
688
                ids,
2,384✔
689
                sessionSubscriptions,
2,384✔
690
        )
2,384✔
691

2,384✔
692
        if err != nil {
3,280✔
693
                s.Log.Error("controller disconnect failed", "error", err)
896✔
694
        }
896✔
695

696
        s.Log.Debug("controller disconnect succeeded")
2,384✔
697

2,384✔
698
        return err
2,384✔
699
}
700

701
// RemoteDisconnect find a session by identifier and closes it
702
func (n *Node) RemoteDisconnect(msg *common.RemoteDisconnectMessage) {
11✔
703
        n.metrics.CounterIncrement(metricsBroadcastMsg)
11✔
704
        n.log.Debug("incoming pubsub command", "data", msg)
11✔
705
        n.hub.RemoteDisconnect(msg)
11✔
706
}
11✔
707

708
// Interest is represented as a int; -1 indicates no interest, 0 indicates lack of such information,
709
// and 1 indicates interest.
710
func (n *Node) markDisconnectable(s *Session, interest int) {
4,815✔
711
        switch n.config.DisconnectMode {
4,815✔
712
        case "always":
3✔
713
                s.MarkDisconnectable(true)
3✔
714
        case "never":
34✔
715
                s.MarkDisconnectable(false)
34✔
716
        case "auto":
4,778✔
717
                s.MarkDisconnectable(interest >= 0)
4,778✔
718
        }
719
}
720

721
func (n *Node) Size() int {
×
722
        return n.hub.Size()
×
723
}
×
724

725
func transmit(s *Session, transmissions []string) {
5,857✔
726
        for _, msg := range transmissions {
11,736✔
727
                s.SendJSONTransmission(msg)
5,879✔
728
        }
5,879✔
729
}
730

731
func (n *Node) handleCommandReply(s *Session, msg *common.Message, reply *common.CommandResult) bool {
3,470✔
732
        // Returns true if any of the subscriptions/channel/connections state has changed
3,470✔
733
        isDirty := false
3,470✔
734

3,470✔
735
        if reply.Disconnect {
3,470✔
736
                defer s.Disconnect("Command Failed", ws.CloseAbnormalClosure)
×
737
        }
×
738

739
        if reply.StopAllStreams {
3,510✔
740
                n.hub.UnsubscribeSessionFromChannel(s, msg.Identifier)
40✔
741
                removedStreams := s.subscriptions.RemoveChannelStreams(msg.Identifier)
40✔
742

40✔
743
                for _, stream := range removedStreams {
40✔
744
                        isDirty = true
×
745
                        n.broker.Unsubscribe(stream)
×
746
                }
×
747

748
        } else if reply.StoppedStreams != nil {
3,443✔
749
                isDirty = true
13✔
750

13✔
751
                for _, stream := range reply.StoppedStreams {
26✔
752
                        streamId := n.broker.Unsubscribe(stream)
13✔
753
                        n.hub.UnsubscribeSession(s, streamId, msg.Identifier)
13✔
754
                        s.subscriptions.RemoveChannelStream(msg.Identifier, streamId)
13✔
755
                }
13✔
756
        }
757

758
        if reply.Streams != nil {
5,773✔
759
                isDirty = true
2,303✔
760

2,303✔
761
                for _, stream := range reply.Streams {
4,641✔
762
                        streamId := n.broker.Subscribe(stream)
2,338✔
763
                        n.hub.SubscribeSession(s, streamId, msg.Identifier)
2,338✔
764
                        s.subscriptions.AddChannelStream(msg.Identifier, streamId)
2,338✔
765
                }
2,338✔
766
        }
767

768
        if reply.IState != nil {
3,512✔
769
                isDirty = true
42✔
770

42✔
771
                s.smu.Lock()
42✔
772
                s.env.MergeChannelState(msg.Identifier, &reply.IState)
42✔
773
                s.smu.Unlock()
42✔
774
        }
42✔
775

776
        isConnectionDirty := n.handleCallReply(s, reply.ToCallResult())
3,470✔
777
        return isDirty || isConnectionDirty
3,470✔
778
}
779

780
func (n *Node) handleCallReply(s *Session, reply *common.CallResult) bool {
5,908✔
781
        isDirty := false
5,908✔
782

5,908✔
783
        if reply.CState != nil {
5,920✔
784
                isDirty = true
12✔
785

12✔
786
                s.smu.Lock()
12✔
787
                s.env.MergeConnectionState(&reply.CState)
12✔
788
                s.smu.Unlock()
12✔
789
        }
12✔
790

791
        if reply.Broadcasts != nil {
5,908✔
792
                for _, broadcast := range reply.Broadcasts {
×
793
                        n.broker.HandleBroadcast(broadcast)
×
794
                }
×
795
        }
796

797
        if reply.Transmissions != nil {
11,765✔
798
                transmit(s, reply.Transmissions)
5,857✔
799
        }
5,857✔
800

801
        return isDirty
5,908✔
802
}
803

804
// disconnectScheduler controls how quickly to disconnect sessions
805
type disconnectScheduler interface {
806
        // This method is called when a session is ready to be disconnected,
807
        // so it can block the operation or cancel it (by returning false).
808
        Continue() bool
809
}
810

811
type noopScheduler struct {
812
        ctx context.Context
813
}
814

815
func (s *noopScheduler) Continue() bool {
28✔
816
        return s.ctx.Err() == nil
28✔
817
}
28✔
818

819
func (n *Node) disconnectAll(ctx context.Context) {
18✔
820
        disconnectMessage := common.NewDisconnectMessage(common.SERVER_RESTART_REASON, true)
18✔
821

18✔
822
        // To speed up the process we disconnect all sessions in parallel using a pool of workers
18✔
823
        pool := utils.NewGoPool("disconnect", n.config.ShutdownDisconnectPoolSize)
18✔
824

18✔
825
        sessions := n.hub.Sessions()
18✔
826

18✔
827
        var scheduler disconnectScheduler // nolint:gosimple
18✔
828

18✔
829
        scheduler = &noopScheduler{ctx}
18✔
830

18✔
831
        var wg sync.WaitGroup
18✔
832

18✔
833
        wg.Add(len(sessions))
18✔
834

18✔
835
        for _, s := range sessions {
46✔
836
                s := s.(*Session)
28✔
837
                pool.Schedule(func() {
56✔
838
                        if scheduler.Continue() {
56✔
839
                                if s.IsConnected() {
56✔
840
                                        s.DisconnectWithMessage(disconnectMessage, common.SERVER_RESTART_REASON)
28✔
841
                                }
28✔
842
                                wg.Done()
28✔
843
                        }
844
                })
845
        }
846

847
        done := make(chan struct{})
18✔
848

18✔
849
        go func() {
36✔
850
                wg.Wait()
18✔
851
                close(done)
18✔
852
        }()
18✔
853

854
        select {
18✔
855
        case <-ctx.Done():
×
856
                n.log.Warn("terminated while disconnecting active sessions", "num", n.hub.Size())
×
857
        case <-done:
18✔
858
                n.log.Info("all active connections closed")
18✔
859
        }
860
}
861

862
func (n *Node) collectStats() {
42✔
863
        if n.config.StatsRefreshInterval == 0 {
42✔
864
                return
×
865
        }
×
866

867
        statsCollectInterval := time.Duration(n.config.StatsRefreshInterval) * time.Second
42✔
868

42✔
869
        for {
178✔
870
                select {
136✔
871
                case <-n.shutdownCh:
42✔
872
                        return
42✔
873
                case <-time.After(statsCollectInterval):
94✔
874
                        n.collectStatsOnce()
94✔
875
                }
876
        }
877
}
878

879
func (n *Node) collectStatsOnce() {
94✔
880
        n.metrics.GaugeSet(metricsGoroutines, uint64(runtime.NumGoroutine()))
94✔
881

94✔
882
        var m runtime.MemStats
94✔
883
        runtime.ReadMemStats(&m)
94✔
884
        n.metrics.GaugeSet(metricsMemSys, m.Sys)
94✔
885

94✔
886
        n.metrics.GaugeSet(metricsClientsNum, uint64(n.hub.Size()))
94✔
887
        n.metrics.GaugeSet(metricsUniqClientsNum, uint64(n.hub.UniqSize()))
94✔
888
        n.metrics.GaugeSet(metricsStreamsNum, uint64(n.hub.StreamsSize()))
94✔
889
        n.metrics.GaugeSet(metricsDisconnectQueue, uint64(n.disconnector.Size()))
94✔
890
}
94✔
891

892
func (n *Node) registerMetrics() {
82✔
893
        n.metrics.RegisterGauge(metricsGoroutines, "The number of Go routines")
82✔
894
        n.metrics.RegisterGauge(metricsMemSys, "The total bytes of memory obtained from the OS")
82✔
895

82✔
896
        n.metrics.RegisterGauge(metricsClientsNum, "The number of active clients")
82✔
897
        n.metrics.RegisterGauge(metricsUniqClientsNum, "The number of unique clients (with respect to connection identifiers)")
82✔
898
        n.metrics.RegisterGauge(metricsStreamsNum, "The number of active broadcasting streams")
82✔
899
        n.metrics.RegisterGauge(metricsDisconnectQueue, "The size of delayed disconnect")
82✔
900

82✔
901
        n.metrics.RegisterCounter(metricsFailedAuths, "The total number of failed authentication attempts")
82✔
902
        n.metrics.RegisterCounter(metricsReceivedMsg, "The total number of received messages from clients")
82✔
903
        n.metrics.RegisterCounter(metricsFailedCommandReceived, "The total number of unrecognized messages received from clients")
82✔
904
        n.metrics.RegisterCounter(metricsBroadcastMsg, "The total number of messages received through PubSub (for broadcast)")
82✔
905
        n.metrics.RegisterCounter(metricsUnknownBroadcast, "The total number of unrecognized messages received through PubSub")
82✔
906

82✔
907
        n.metrics.RegisterCounter(metricsSentMsg, "The total number of messages sent to clients")
82✔
908
        n.metrics.RegisterCounter(metricsFailedSent, "The total number of messages failed to send to clients")
82✔
909

82✔
910
        n.metrics.RegisterCounter(metricsDataSent, "The total amount of bytes sent to clients")
82✔
911
        n.metrics.RegisterCounter(metricsDataReceived, "The total amount of bytes received from clients")
82✔
912
}
82✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc