• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

anycable / anycable-go / 4700525510

pending completion
4700525510

push

github

Vladimir Dementyev
fix: metrics shutdown race conditions

3 of 3 new or added lines in 1 file covered. (100.0%)

7 existing lines in 2 files now uncovered.

3810 of 5213 relevant lines covered (73.09%)

382.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.73
/node/node.go
1
package node
2

3
import (
4
        "errors"
5
        "fmt"
6
        "runtime"
7
        "sync"
8
        "time"
9

10
        "github.com/anycable/anycable-go/common"
11
        "github.com/anycable/anycable-go/hub"
12
        "github.com/anycable/anycable-go/metrics"
13
        "github.com/anycable/anycable-go/ws"
14
        "github.com/apex/log"
15
)
16

17
const (
18
        metricsGoroutines      = "goroutines_num"
19
        metricsMemSys          = "mem_sys_bytes"
20
        metricsClientsNum      = "clients_num"
21
        metricsUniqClientsNum  = "clients_uniq_num"
22
        metricsStreamsNum      = "broadcast_streams_num"
23
        metricsDisconnectQueue = "disconnect_queue_size"
24

25
        metricsFailedAuths           = "failed_auths_total"
26
        metricsReceivedMsg           = "client_msg_total"
27
        metricsFailedCommandReceived = "failed_client_msg_total"
28
        metricsBroadcastMsg          = "broadcast_msg_total"
29
        metricsUnknownBroadcast      = "failed_broadcast_msg_total"
30

31
        metricsSentMsg    = "server_msg_total"
32
        metricsFailedSent = "failed_server_msg_total"
33

34
        metricsDataSent     = "data_sent_total"
35
        metricsDataReceived = "data_rcvd_total"
36
)
37

38
// AppNode describes a basic node interface
39
type AppNode interface {
40
        HandlePubSub(msg []byte)
41
        LookupSession(id string) *Session
42
        Authenticate(s *Session) (*common.ConnectResult, error)
43
        Subscribe(s *Session, msg *common.Message) (*common.CommandResult, error)
44
        Unsubscribe(s *Session, msg *common.Message) (*common.CommandResult, error)
45
        Perform(s *Session, msg *common.Message) (*common.CommandResult, error)
46
        Disconnect(s *Session) error
47
}
48

49
type AppNodeExt interface {
50
        AppNode
51
        AuthenticateWithOptions(s *Session, opts ...AuthOption) (*common.ConnectResult, error)
52
}
53

54
// Connection represents underlying connection
55
type Connection interface {
56
        Write(msg []byte, deadline time.Time) error
57
        WriteBinary(msg []byte, deadline time.Time) error
58
        Read() ([]byte, error)
59
        Close(code int, reason string)
60
}
61

62
// Node represents the whole application
63
type Node struct {
64
        metrics metrics.Instrumenter
65

66
        config       *Config
67
        hub          *hub.Hub
68
        controller   Controller
69
        disconnector Disconnector
70
        shutdownCh   chan struct{}
71
        shutdownMu   sync.Mutex
72
        closed       bool
73
        log          *log.Entry
74
}
75

76
var _ AppNode = (*Node)(nil)
77

78
// NewNode builds new node struct
79
func NewNode(controller Controller, metrics *metrics.Metrics, config *Config) *Node {
47✔
80
        node := &Node{
47✔
81
                metrics:    metrics,
47✔
82
                config:     config,
47✔
83
                controller: controller,
47✔
84
                shutdownCh: make(chan struct{}),
47✔
85
                log:        log.WithFields(log.Fields{"context": "node"}),
47✔
86
        }
47✔
87

47✔
88
        node.hub = hub.NewHub(config.HubGopoolSize)
47✔
89

47✔
90
        if metrics != nil {
94✔
91
                node.registerMetrics()
47✔
92
        }
47✔
93

94
        return node
47✔
95
}
96

97
// Start runs all the required goroutines
98
func (n *Node) Start() error {
11 all except 4700525510.1 and 4700525510.2 ✔
99
        go n.hub.Run()
11 all except 4700525510.1 and 4700525510.2 ✔
100
        go n.collectStats()
11 all except 4700525510.1 and 4700525510.2 ✔
101

11 all except 4700525510.1 and 4700525510.2 ✔
102
        return nil
11 all except 4700525510.1 and 4700525510.2 ✔
103
}
11 all except 4700525510.1 and 4700525510.2 ✔
104

105
// SetDisconnector set disconnector for the node
106
func (n *Node) SetDisconnector(d Disconnector) {
47✔
107
        n.disconnector = d
47✔
108
}
47✔
109

110
// HandleCommand parses incoming message from client and
111
// execute the command (if recognized)
112
func (n *Node) HandleCommand(s *Session, msg *common.Message) (err error) {
195 all except 4700525510.1, 4700525510.2, and 4700525510.5 ✔
113
        s.Log.Debugf("Incoming message: %v", msg)
195 all except 4700525510.1, 4700525510.2, and 4700525510.5 ✔
114
        switch msg.Command {
195 all except 4700525510.1, 4700525510.2, and 4700525510.5 ✔
115
        case "subscribe":
160 all except 4700525510.1, 4700525510.2, and 4700525510.5 ✔
116
                _, err = n.Subscribe(s, msg)
160 all except 4700525510.1, 4700525510.2, and 4700525510.5 ✔
117
        case "unsubscribe":
15 all except 4700525510.1, 4700525510.2, and 4700525510.5 ✔
118
                _, err = n.Unsubscribe(s, msg)
15 all except 4700525510.1, 4700525510.2, and 4700525510.5 ✔
119
        case "message":
20 all except 4700525510.1, 4700525510.2, and 4700525510.5 ✔
120
                _, err = n.Perform(s, msg)
20 all except 4700525510.1, 4700525510.2, and 4700525510.5 ✔
121
        default:
×
122
                err = fmt.Errorf("Unknown command: %s", msg.Command)
×
123
        }
124

125
        return
195 all except 4700525510.1, 4700525510.2, and 4700525510.5 ✔
126
}
127

128
// HandlePubSub parses incoming pubsub message and broadcast it
129
func (n *Node) HandlePubSub(raw []byte) {
144 all except 4700525510.5 ✔
130
        msg, err := common.PubSubMessageFromJSON(raw)
144 all except 4700525510.5 ✔
131

144 all except 4700525510.5 ✔
132
        if err != nil {
144 all except 4700525510.5 ✔
133
                n.metrics.CounterIncrement(metricsUnknownBroadcast)
×
134
                n.log.Warnf("Failed to parse pubsub message '%s' with error: %v", raw, err)
×
135
                return
×
136
        }
×
137

138
        switch v := msg.(type) {
144 all except 4700525510.5 ✔
139
        case common.StreamMessage:
137 all except 4700525510.5 ✔
140
                n.Broadcast(&v)
137 all except 4700525510.5 ✔
141
        case common.RemoteDisconnectMessage:
7 all except 4700525510.5 ✔
142
                n.RemoteDisconnect(&v)
7 all except 4700525510.5 ✔
143
        }
144
}
145

146
func (n *Node) LookupSession(id string) *Session {
4 only 4700525510.1 and 4700525510.2 ✔
147
        hubSession := n.hub.FindByIdentifier(id)
4 only 4700525510.1 and 4700525510.2 ✔
148
        session, _ := hubSession.(*Session)
4 only 4700525510.1 and 4700525510.2 ✔
149
        return session
4 only 4700525510.1 and 4700525510.2 ✔
150
}
4 only 4700525510.1 and 4700525510.2 ✔
151

152
// Shutdown stops all services (hub, controller)
153
func (n *Node) Shutdown() (err error) {
11 all except 4700525510.1 and 4700525510.2 ✔
154
        n.shutdownMu.Lock()
11 all except 4700525510.1 and 4700525510.2 ✔
155
        if n.closed {
11 all except 4700525510.1 and 4700525510.2 ✔
156
                n.shutdownMu.Unlock()
×
157
                return errors.New("Already shut down")
×
158
        }
×
159

160
        close(n.shutdownCh)
11 all except 4700525510.1 and 4700525510.2 ✔
161

11 all except 4700525510.1 and 4700525510.2 ✔
162
        n.closed = true
11 all except 4700525510.1 and 4700525510.2 ✔
163
        n.shutdownMu.Unlock()
11 all except 4700525510.1 and 4700525510.2 ✔
164

11 all except 4700525510.1 and 4700525510.2 ✔
165
        if n.hub != nil {
22 all except 4700525510.1 and 4700525510.2 ✔
166
                n.hub.Shutdown()
11 all except 4700525510.1 and 4700525510.2 ✔
167

11 all except 4700525510.1 and 4700525510.2 ✔
168
                active := n.hub.Size()
11 all except 4700525510.1 and 4700525510.2 ✔
169

11 all except 4700525510.1 and 4700525510.2 ✔
170
                if active > 0 {
16 all except 4700525510.1 and 4700525510.2 ✔
171
                        n.log.Infof("Closing active connections: %d", active)
5 all except 4700525510.1, 4700525510.2, and 4700525510.5 ✔
172
                        disconnectMessage := common.NewDisconnectMessage(common.SERVER_RESTART_REASON, true)
5 all except 4700525510.1, 4700525510.2, and 4700525510.5 ✔
173

5 all except 4700525510.1, 4700525510.2, and 4700525510.5 ✔
174
                        n.hub.DisconnectSesssions(disconnectMessage, common.SERVER_RESTART_REASON)
5 all except 4700525510.1, 4700525510.2, and 4700525510.5 ✔
175

5 all except 4700525510.1, 4700525510.2, and 4700525510.5 ✔
176
                        n.log.Info("All active connections closed")
5 all except 4700525510.1, 4700525510.2, and 4700525510.5 ✔
177

5 all except 4700525510.1, 4700525510.2, and 4700525510.5 ✔
178
                        // Wait to make sure that disconnect queue is not empty
5 all except 4700525510.1, 4700525510.2, and 4700525510.5 ✔
179
                        time.Sleep(time.Second)
5 all except 4700525510.1, 4700525510.2, and 4700525510.5 ✔
180
                }
5 all except 4700525510.1, 4700525510.2, and 4700525510.5 ✔
181
        }
182

183
        if n.disconnector != nil {
22 all except 4700525510.1 and 4700525510.2 ✔
184
                err := n.disconnector.Shutdown()
11 all except 4700525510.1 and 4700525510.2 ✔
185

11 all except 4700525510.1 and 4700525510.2 ✔
186
                if err != nil {
11 all except 4700525510.1 and 4700525510.2 ✔
187
                        n.log.Warnf("%v", err)
×
188
                }
×
189
        }
190

191
        if n.controller != nil {
22 all except 4700525510.1 and 4700525510.2 ✔
192
                err := n.controller.Shutdown()
11 all except 4700525510.1 and 4700525510.2 ✔
193

11 all except 4700525510.1 and 4700525510.2 ✔
194
                if err != nil {
13 all except 4700525510.1 and 4700525510.2 ✔
195
                        n.log.Warnf("%v", err)
2 only 4700525510.4 and 4700525510.7 ✔
196
                }
2 only 4700525510.4 and 4700525510.7 ✔
197
        }
198

199
        return
11 all except 4700525510.1 and 4700525510.2 ✔
200
}
201

202
type authOptions struct {
203
        DisconnectOnFailure bool
204
}
205

206
func newAuthOptions(modifiers []AuthOption) *authOptions {
195✔
207
        base := &authOptions{
195✔
208
                DisconnectOnFailure: true,
195✔
209
        }
195✔
210

195✔
211
        for _, modifier := range modifiers {
195✔
212
                modifier(base)
×
213
        }
×
214

215
        return base
195✔
216
}
217

218
type AuthOption = func(*authOptions)
219

220
func WithDisconnectOnFailure(disconnect bool) AuthOption {
×
221
        return func(opts *authOptions) {
×
222
                opts.DisconnectOnFailure = disconnect
×
223
        }
×
224
}
225

226
// Authenticate calls controller to perform authentication.
227
// If authentication is successful, session is registered with a hub.
228
func (n *Node) Authenticate(s *Session) (res *common.ConnectResult, err error) {
195✔
229
        return n.AuthenticateWithOptions(s)
195✔
230
}
195✔
231

232
// AuthenticateWithOptions provides more control on how authentication is performed.
233
func (n *Node) AuthenticateWithOptions(s *Session, options ...AuthOption) (res *common.ConnectResult, err error) {
195✔
234
        opts := newAuthOptions(options)
195✔
235

195✔
236
        res, err = n.controller.Authenticate(s.GetID(), s.env)
195✔
237

195✔
238
        if err != nil {
197✔
239
                s.Disconnect("Auth Error", ws.CloseInternalServerErr)
2 only 4700525510.1 and 4700525510.2 ✔
240
                return
2 only 4700525510.1 and 4700525510.2 ✔
241
        }
2 only 4700525510.1 and 4700525510.2 ✔
242

243
        if res.Status == common.SUCCESS {
369✔
244
                s.SetIdentifiers(res.Identifier)
176✔
245
                s.Connected = true
176✔
246

176✔
247
                n.hub.AddSession(s)
176✔
248
        } else {
193✔
249
                if res.Status == common.FAILURE {
34 all except 4700525510.5 ✔
250
                        n.metrics.CounterIncrement(metricsFailedAuths)
17 all except 4700525510.5 ✔
251
                }
17 all except 4700525510.5 ✔
252

253
                if opts.DisconnectOnFailure {
34 all except 4700525510.5 ✔
254
                        defer s.Disconnect("Auth Failed", ws.CloseNormalClosure)
17 all except 4700525510.5 ✔
255
                }
17 all except 4700525510.5 ✔
256
        }
257

258
        n.handleCallReply(s, res.ToCallResult())
193✔
259

193✔
260
        return
193✔
261
}
262

263
// Subscribe subscribes session to a channel
264
func (n *Node) Subscribe(s *Session, msg *common.Message) (res *common.CommandResult, err error) {
168 all except 4700525510.5 ✔
265
        s.smu.Lock()
168 all except 4700525510.5 ✔
266

168 all except 4700525510.5 ✔
267
        if ok := s.subscriptions.HasChannel(msg.Identifier); ok {
168 all except 4700525510.5 ✔
268
                s.smu.Unlock()
×
269
                err = fmt.Errorf("Already subscribed to %s", msg.Identifier)
×
270
                return
×
271
        }
×
272

273
        res, err = n.controller.Subscribe(s.GetID(), s.env, s.GetIdentifiers(), msg.Identifier)
168 all except 4700525510.5 ✔
274

168 all except 4700525510.5 ✔
275
        if err != nil {
170 all except 4700525510.5 ✔
276
                if res == nil || res.Status == common.ERROR {
4 only 4700525510.1 and 4700525510.2 ✔
277
                        s.Log.Errorf("Subscribe error: %v", err)
2 only 4700525510.1 and 4700525510.2 ✔
278
                }
2 only 4700525510.1 and 4700525510.2 ✔
279
        } else {
166 all except 4700525510.5 ✔
280
                s.subscriptions.AddChannel(msg.Identifier)
166 all except 4700525510.5 ✔
281
                s.Log.Debugf("Subscribed to channel: %s", msg.Identifier)
166 all except 4700525510.5 ✔
282
        }
166 all except 4700525510.5 ✔
283

284
        s.smu.Unlock()
168 all except 4700525510.5 ✔
285

168 all except 4700525510.5 ✔
286
        if res != nil {
334 all except 4700525510.5 ✔
287
                n.handleCommandReply(s, msg, res)
166 all except 4700525510.5 ✔
288
        }
166 all except 4700525510.5 ✔
289

290
        return
168 all except 4700525510.5 ✔
291
}
292

293
// Unsubscribe unsubscribes session from a channel
294
func (n *Node) Unsubscribe(s *Session, msg *common.Message) (res *common.CommandResult, err error) {
19 all except 4700525510.5 ✔
295
        s.smu.Lock()
19 all except 4700525510.5 ✔
296

19 all except 4700525510.5 ✔
297
        if ok := s.subscriptions.HasChannel(msg.Identifier); !ok {
19 all except 4700525510.5 ✔
298
                s.smu.Unlock()
×
299
                err = fmt.Errorf("Unknown subscription %s", msg.Identifier)
×
300
                return
×
301
        }
×
302

303
        res, err = n.controller.Unsubscribe(s.GetID(), s.env, s.GetIdentifiers(), msg.Identifier)
19 all except 4700525510.5 ✔
304

19 all except 4700525510.5 ✔
305
        if err != nil {
21 all except 4700525510.5 ✔
306
                if res == nil || res.Status == common.ERROR {
4 only 4700525510.1 and 4700525510.2 ✔
307
                        s.Log.Errorf("Unsubscribe error: %v", err)
2 only 4700525510.1 and 4700525510.2 ✔
308
                }
2 only 4700525510.1 and 4700525510.2 ✔
309
        } else {
17 all except 4700525510.5 ✔
310
                // Make sure to remove all streams subscriptions
17 all except 4700525510.5 ✔
311
                res.StopAllStreams = true
17 all except 4700525510.5 ✔
312

17 all except 4700525510.5 ✔
313
                s.subscriptions.RemoveChannel(msg.Identifier)
17 all except 4700525510.5 ✔
314

17 all except 4700525510.5 ✔
315
                s.Log.Debugf("Unsubscribed from channel: %s", msg.Identifier)
17 all except 4700525510.5 ✔
316
        }
17 all except 4700525510.5 ✔
317

318
        s.smu.Unlock()
19 all except 4700525510.5 ✔
319

19 all except 4700525510.5 ✔
320
        if res != nil {
36 all except 4700525510.5 ✔
321
                n.handleCommandReply(s, msg, res)
17 all except 4700525510.5 ✔
322
        }
17 all except 4700525510.5 ✔
323

324
        return
19 all except 4700525510.5 ✔
325
}
326

327
// Perform executes client command
328
func (n *Node) Perform(s *Session, msg *common.Message) (res *common.CommandResult, err error) {
34 all except 4700525510.5 ✔
329
        s.smu.Lock()
34 all except 4700525510.5 ✔
330

34 all except 4700525510.5 ✔
331
        if ok := s.subscriptions.HasChannel(msg.Identifier); !ok {
34 all except 4700525510.5 ✔
332
                s.smu.Unlock()
×
333
                err = fmt.Errorf("Unknown subscription %s", msg.Identifier)
×
334
                return
×
335
        }
×
336

337
        s.smu.Unlock()
34 all except 4700525510.5 ✔
338

34 all except 4700525510.5 ✔
339
        data, ok := msg.Data.(string)
34 all except 4700525510.5 ✔
340

34 all except 4700525510.5 ✔
341
        if !ok {
34 all except 4700525510.5 ✔
342
                err = fmt.Errorf("Perform data must be a string, got %v", msg.Data)
×
343
                return
×
344
        }
×
345

346
        res, err = n.controller.Perform(s.GetID(), s.env, s.GetIdentifiers(), msg.Identifier, data)
34 all except 4700525510.5 ✔
347

34 all except 4700525510.5 ✔
348
        if err != nil {
36 all except 4700525510.5 ✔
349
                if res == nil || res.Status == common.ERROR {
4 only 4700525510.1 and 4700525510.2 ✔
350
                        s.Log.Errorf("Perform error: %v", err)
2 only 4700525510.1 and 4700525510.2 ✔
351
                }
2 only 4700525510.1 and 4700525510.2 ✔
352
        } else {
32 all except 4700525510.5 ✔
353
                s.Log.Debugf("Perform result: %v", res)
32 all except 4700525510.5 ✔
354
        }
32 all except 4700525510.5 ✔
355

356
        if res != nil {
66 all except 4700525510.5 ✔
357
                n.handleCommandReply(s, msg, res)
32 all except 4700525510.5 ✔
358
        }
32 all except 4700525510.5 ✔
359

360
        return
34 all except 4700525510.5 ✔
361
}
362

363
// Broadcast message to stream
364
func (n *Node) Broadcast(msg *common.StreamMessage) {
137 all except 4700525510.5 ✔
365
        n.metrics.CounterIncrement(metricsBroadcastMsg)
137 all except 4700525510.5 ✔
366
        n.log.Debugf("Incoming pubsub message: %v", msg)
137 all except 4700525510.5 ✔
367
        n.hub.BroadcastMessage(msg)
137 all except 4700525510.5 ✔
368
}
137 all except 4700525510.5 ✔
369

370
// Disconnect adds session to disconnector queue and unregister session from hub
371
func (n *Node) Disconnect(s *Session) error {
176✔
372
        n.hub.RemoveSessionLater(s)
176✔
373
        return n.disconnector.Enqueue(s)
176✔
374
}
176✔
375

376
// DisconnectNow execute disconnect on controller
377
func (n *Node) DisconnectNow(s *Session) error {
176✔
378
        sessionSubscriptions := s.subscriptions.Channels()
176✔
379

176✔
380
        ids := s.GetIdentifiers()
176✔
381

176✔
382
        s.Log.Debugf("Disconnect %s %s %v %v", ids, s.env.URL, s.env.Headers, sessionSubscriptions)
176✔
383

176✔
384
        err := n.controller.Disconnect(
176✔
385
                s.GetID(),
176✔
386
                s.env,
176✔
387
                ids,
176✔
388
                sessionSubscriptions,
176✔
389
        )
176✔
390

176✔
391
        if err != nil {
176✔
UNCOV
392
                s.Log.Errorf("Disconnect error: %v", err)
×
UNCOV
393
        }
×
394

395
        return err
174✔
396
}
397

398
// RemoteDisconnect find a session by identifier and closes it
399
func (n *Node) RemoteDisconnect(msg *common.RemoteDisconnectMessage) {
7 all except 4700525510.5 ✔
400
        n.metrics.CounterIncrement(metricsBroadcastMsg)
7 all except 4700525510.5 ✔
401
        n.log.Debugf("Incoming pubsub command: %v", msg)
7 all except 4700525510.5 ✔
402
        n.hub.RemoteDisconnect(msg)
7 all except 4700525510.5 ✔
403
}
7 all except 4700525510.5 ✔
404

405
func transmit(s *Session, transmissions []string) {
380✔
406
        for _, msg := range transmissions {
770✔
407
                s.SendJSONTransmission(msg)
390✔
408
        }
390✔
409
}
410

411
func (n *Node) handleCommandReply(s *Session, msg *common.Message, reply *common.CommandResult) bool {
215 all except 4700525510.5 ✔
412
        // Returns true if any of the subscriptions/channel/connections state has changed
215 all except 4700525510.5 ✔
413
        isDirty := false
215 all except 4700525510.5 ✔
414

215 all except 4700525510.5 ✔
415
        if reply.Disconnect {
215 all except 4700525510.5 ✔
416
                defer s.Disconnect("Command Failed", ws.CloseAbnormalClosure)
×
417
        }
×
418

419
        uid := s.GetID()
215 all except 4700525510.5 ✔
420

215 all except 4700525510.5 ✔
421
        if reply.StopAllStreams {
239 all except 4700525510.5 ✔
422
                n.hub.UnsubscribeSessionFromChannel(uid, msg.Identifier)
24 all except 4700525510.5 ✔
423
                s.subscriptions.RemoveChannelStreams(msg.Identifier)
24 all except 4700525510.5 ✔
424
        } else if reply.StoppedStreams != nil {
224 all except 4700525510.5 ✔
425
                isDirty = true
9 all except 4700525510.5 ✔
426

9 all except 4700525510.5 ✔
427
                for _, stream := range reply.StoppedStreams {
18 all except 4700525510.5 ✔
428
                        n.hub.UnsubscribeSession(uid, stream, msg.Identifier)
9 all except 4700525510.5 ✔
429
                        s.subscriptions.RemoveChannelStream(msg.Identifier, stream)
9 all except 4700525510.5 ✔
430
                }
9 all except 4700525510.5 ✔
431
        }
432

433
        if reply.Streams != nil {
339 all except 4700525510.5 ✔
434
                isDirty = true
124 all except 4700525510.5 ✔
435

124 all except 4700525510.5 ✔
436
                for _, stream := range reply.Streams {
263 all except 4700525510.5 ✔
437
                        n.hub.SubscribeSession(uid, stream, msg.Identifier)
139 all except 4700525510.5 ✔
438
                        s.subscriptions.AddChannelStream(msg.Identifier, stream)
139 all except 4700525510.5 ✔
439
                }
139 all except 4700525510.5 ✔
440
        }
441

442
        if reply.IState != nil {
237 all except 4700525510.5 ✔
443
                isDirty = true
22 all except 4700525510.5 ✔
444

22 all except 4700525510.5 ✔
445
                s.smu.Lock()
22 all except 4700525510.5 ✔
446
                s.env.MergeChannelState(msg.Identifier, &reply.IState)
22 all except 4700525510.5 ✔
447
                s.smu.Unlock()
22 all except 4700525510.5 ✔
448
        }
22 all except 4700525510.5 ✔
449

450
        isConnectionDirty := n.handleCallReply(s, reply.ToCallResult())
215 all except 4700525510.5 ✔
451
        return isDirty || isConnectionDirty
215 all except 4700525510.5 ✔
452
}
453

454
func (n *Node) handleCallReply(s *Session, reply *common.CallResult) bool {
408✔
455
        isDirty := false
408✔
456

408✔
457
        if reply.CState != nil {
412✔
458
                isDirty = true
4 only 4700525510.1 and 4700525510.2 ✔
459

4 only 4700525510.1 and 4700525510.2 ✔
460
                s.smu.Lock()
4 only 4700525510.1 and 4700525510.2 ✔
461
                s.env.MergeConnectionState(&reply.CState)
4 only 4700525510.1 and 4700525510.2 ✔
462
                s.smu.Unlock()
4 only 4700525510.1 and 4700525510.2 ✔
463
        }
4 only 4700525510.1 and 4700525510.2 ✔
464

465
        if reply.Broadcasts != nil {
408✔
466
                for _, broadcast := range reply.Broadcasts {
×
467
                        n.Broadcast(broadcast)
×
468
                }
×
469
        }
470

471
        if reply.Transmissions != nil {
788✔
472
                transmit(s, reply.Transmissions)
380✔
473
        }
380✔
474

475
        return isDirty
408✔
476
}
477

478
func (n *Node) collectStats() {
11 all except 4700525510.1 and 4700525510.2 ✔
479
        statsCollectInterval := time.Duration(n.config.StatsRefreshInterval) * time.Second
11 all except 4700525510.1 and 4700525510.2 ✔
480

11 all except 4700525510.1 and 4700525510.2 ✔
481
        for {
56 all except 4700525510.1 and 4700525510.2 ✔
482
                select {
45 all except 4700525510.1 and 4700525510.2 ✔
483
                case <-n.shutdownCh:
11 all except 4700525510.1 and 4700525510.2 ✔
484
                        return
11 all except 4700525510.1 and 4700525510.2 ✔
485
                case <-time.After(statsCollectInterval):
34 all except 4700525510.1 and 4700525510.2 ✔
486
                        n.collectStatsOnce()
34 all except 4700525510.1 and 4700525510.2 ✔
487
                }
488
        }
489
}
490

491
func (n *Node) collectStatsOnce() {
34 all except 4700525510.1 and 4700525510.2 ✔
492
        n.metrics.GaugeSet(metricsGoroutines, uint64(runtime.NumGoroutine()))
34 all except 4700525510.1 and 4700525510.2 ✔
493

34 all except 4700525510.1 and 4700525510.2 ✔
494
        var m runtime.MemStats
34 all except 4700525510.1 and 4700525510.2 ✔
495
        runtime.ReadMemStats(&m)
34 all except 4700525510.1 and 4700525510.2 ✔
496
        n.metrics.GaugeSet(metricsMemSys, m.Sys)
34 all except 4700525510.1 and 4700525510.2 ✔
497

34 all except 4700525510.1 and 4700525510.2 ✔
498
        n.metrics.GaugeSet(metricsClientsNum, uint64(n.hub.Size()))
34 all except 4700525510.1 and 4700525510.2 ✔
499
        n.metrics.GaugeSet(metricsUniqClientsNum, uint64(n.hub.UniqSize()))
34 all except 4700525510.1 and 4700525510.2 ✔
500
        n.metrics.GaugeSet(metricsStreamsNum, uint64(n.hub.StreamsSize()))
34 all except 4700525510.1 and 4700525510.2 ✔
501
        n.metrics.GaugeSet(metricsDisconnectQueue, uint64(n.disconnector.Size()))
34 all except 4700525510.1 and 4700525510.2 ✔
502
}
34 all except 4700525510.1 and 4700525510.2 ✔
503

504
func (n *Node) registerMetrics() {
47✔
505
        n.metrics.RegisterGauge(metricsGoroutines, "The number of Go routines")
47✔
506
        n.metrics.RegisterGauge(metricsMemSys, "The total bytes of memory obtained from the OS")
47✔
507

47✔
508
        n.metrics.RegisterGauge(metricsClientsNum, "The number of active clients")
47✔
509
        n.metrics.RegisterGauge(metricsUniqClientsNum, "The number of unique clients (with respect to connection identifiers)")
47✔
510
        n.metrics.RegisterGauge(metricsStreamsNum, "The number of active broadcasting streams")
47✔
511
        n.metrics.RegisterGauge(metricsDisconnectQueue, "The size of delayed disconnect")
47✔
512

47✔
513
        n.metrics.RegisterCounter(metricsFailedAuths, "The total number of failed authentication attempts")
47✔
514
        n.metrics.RegisterCounter(metricsReceivedMsg, "The total number of received messages from clients")
47✔
515
        n.metrics.RegisterCounter(metricsFailedCommandReceived, "The total number of unrecognized messages received from clients")
47✔
516
        n.metrics.RegisterCounter(metricsBroadcastMsg, "The total number of messages received through PubSub (for broadcast)")
47✔
517
        n.metrics.RegisterCounter(metricsUnknownBroadcast, "The total number of unrecognized messages received through PubSub")
47✔
518

47✔
519
        n.metrics.RegisterCounter(metricsSentMsg, "The total number of messages sent to clients")
47✔
520
        n.metrics.RegisterCounter(metricsFailedSent, "The total number of messages failed to send to clients")
47✔
521

47✔
522
        n.metrics.RegisterCounter(metricsDataSent, "The total amount of bytes sent to clients")
47✔
523
        n.metrics.RegisterCounter(metricsDataReceived, "The total amount of bytes received from clients")
47✔
524
}
47✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc