• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

anycable / anycable-go / 4683248384

pending completion
4683248384

push

github

Vladimir Dementyev
tests: mruby printer integration test

3702 of 5186 relevant lines covered (71.38%)

384.13 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.15
/node/node.go
1
package node
2

3
import (
4
        "errors"
5
        "fmt"
6
        "runtime"
7
        "sync"
8
        "time"
9

10
        "github.com/anycable/anycable-go/common"
11
        "github.com/anycable/anycable-go/hub"
12
        "github.com/anycable/anycable-go/metrics"
13
        "github.com/anycable/anycable-go/ws"
14
        "github.com/apex/log"
15
)
16

17
// Names of the metrics reported by the node.
const (
	// Gauges (registered via RegisterGauge and refreshed by collectStatsOnce).
	metricsGoroutines      = "goroutines_num"
	metricsMemSys          = "mem_sys_bytes"
	metricsClientsNum      = "clients_num"
	metricsUniqClientsNum  = "clients_uniq_num"
	metricsStreamsNum      = "broadcast_streams_num"
	metricsDisconnectQueue = "disconnect_queue_size"

	// Counters for authentication, client messages and pub/sub messages.
	metricsFailedAuths           = "failed_auths_total"
	metricsReceivedMsg           = "client_msg_total"
	metricsFailedCommandReceived = "failed_client_msg_total"
	metricsBroadcastMsg          = "broadcast_msg_total"
	metricsUnknownBroadcast      = "failed_broadcast_msg_total"

	// Counters for messages sent to clients.
	metricsSentMsg    = "server_msg_total"
	metricsFailedSent = "failed_server_msg_total"

	// Counters for the amount of bytes exchanged with clients.
	metricsDataSent     = "data_sent_total"
	metricsDataReceived = "data_rcvd_total"
)
37

38
// AppNode describes a basic node interface
39
type AppNode interface {
40
        HandlePubSub(msg []byte)
41
        LookupSession(id string) *Session
42
        Authenticate(s *Session) (*common.ConnectResult, error)
43
        Subscribe(s *Session, msg *common.Message) (*common.CommandResult, error)
44
        Unsubscribe(s *Session, msg *common.Message) (*common.CommandResult, error)
45
        Perform(s *Session, msg *common.Message) (*common.CommandResult, error)
46
        Disconnect(s *Session) error
47
}
48

49
type AppNodeExt interface {
50
        AppNode
51
        AuthenticateWithOptions(s *Session, opts ...AuthOption) (*common.ConnectResult, error)
52
}
53

54
// Connection is the abstraction over the underlying client connection.
type Connection interface {
	// Write sends msg; deadline is the time limit passed to the write.
	Write(msg []byte, deadline time.Time) error
	// WriteBinary sends msg as binary data; deadline is the time limit passed to the write.
	WriteBinary(msg []byte, deadline time.Time) error
	// Read returns the next incoming payload.
	Read() ([]byte, error)
	// Close closes the connection with the given code and reason.
	Close(code int, reason string)
}
61

62
// Node represents the whole application
63
type Node struct {
64
        metrics metrics.Instrumenter
65

66
        config       *Config
67
        hub          *hub.Hub
68
        controller   Controller
69
        disconnector Disconnector
70
        shutdownCh   chan struct{}
71
        shutdownMu   sync.Mutex
72
        closed       bool
73
        log          *log.Entry
74
}
75

76
var _ AppNode = (*Node)(nil)
77

78
// NewNode builds new node struct
79
func NewNode(controller Controller, metrics *metrics.Metrics, config *Config) *Node {
46✔
80
        node := &Node{
46✔
81
                metrics:    metrics,
46✔
82
                config:     config,
46✔
83
                controller: controller,
46✔
84
                shutdownCh: make(chan struct{}),
46✔
85
                log:        log.WithFields(log.Fields{"context": "node"}),
46✔
86
        }
46✔
87

46✔
88
        node.hub = hub.NewHub(config.HubGopoolSize)
46✔
89

46✔
90
        if metrics != nil {
92✔
91
                node.registerMetrics()
46✔
92
        }
46✔
93

94
        return node
46✔
95
}
96

97
// Start runs all the required goroutines
98
func (n *Node) Start() error {
10 all except 4683248384.1 and 4683248384.2 ✔
99
        go n.hub.Run()
10 all except 4683248384.1 and 4683248384.2 ✔
100
        go n.collectStats()
10 all except 4683248384.1 and 4683248384.2 ✔
101

10 all except 4683248384.1 and 4683248384.2 ✔
102
        return nil
10 all except 4683248384.1 and 4683248384.2 ✔
103
}
10 all except 4683248384.1 and 4683248384.2 ✔
104

105
// SetDisconnector set disconnector for the node
106
func (n *Node) SetDisconnector(d Disconnector) {
46✔
107
        n.disconnector = d
46✔
108
}
46✔
109

110
// HandleCommand parses incoming message from client and
111
// execute the command (if recognized)
112
func (n *Node) HandleCommand(s *Session, msg *common.Message) (err error) {
195 all except 4683248384.1 and 4683248384.2 ✔
113
        s.Log.Debugf("Incoming message: %v", msg)
195 all except 4683248384.1 and 4683248384.2 ✔
114
        switch msg.Command {
195 all except 4683248384.1 and 4683248384.2 ✔
115
        case "subscribe":
160 all except 4683248384.1 and 4683248384.2 ✔
116
                _, err = n.Subscribe(s, msg)
160 all except 4683248384.1 and 4683248384.2 ✔
117
        case "unsubscribe":
15 all except 4683248384.1 and 4683248384.2 ✔
118
                _, err = n.Unsubscribe(s, msg)
15 all except 4683248384.1 and 4683248384.2 ✔
119
        case "message":
20 all except 4683248384.1 and 4683248384.2 ✔
120
                _, err = n.Perform(s, msg)
20 all except 4683248384.1 and 4683248384.2 ✔
121
        default:
×
122
                err = fmt.Errorf("Unknown command: %s", msg.Command)
×
123
        }
124

125
        return
195 all except 4683248384.1 and 4683248384.2 ✔
126
}
127

128
// HandlePubSub parses incoming pubsub message and broadcast it
129
func (n *Node) HandlePubSub(raw []byte) {
144✔
130
        msg, err := common.PubSubMessageFromJSON(raw)
144✔
131

144✔
132
        if err != nil {
144✔
133
                n.metrics.CounterIncrement(metricsUnknownBroadcast)
×
134
                n.log.Warnf("Failed to parse pubsub message '%s' with error: %v", raw, err)
×
135
                return
×
136
        }
×
137

138
        switch v := msg.(type) {
144✔
139
        case common.StreamMessage:
137✔
140
                n.Broadcast(&v)
137✔
141
        case common.RemoteDisconnectMessage:
7✔
142
                n.RemoteDisconnect(&v)
7✔
143
        }
144
}
145

146
func (n *Node) LookupSession(id string) *Session {
4 only 4683248384.1 and 4683248384.2 ✔
147
        hubSession := n.hub.FindByIdentifier(id)
4 only 4683248384.1 and 4683248384.2 ✔
148
        session, _ := hubSession.(*Session)
4 only 4683248384.1 and 4683248384.2 ✔
149
        return session
4 only 4683248384.1 and 4683248384.2 ✔
150
}
4 only 4683248384.1 and 4683248384.2 ✔
151

152
// Shutdown stops all services (hub, controller)
153
func (n *Node) Shutdown() (err error) {
10 all except 4683248384.1 and 4683248384.2 ✔
154
        n.shutdownMu.Lock()
10 all except 4683248384.1 and 4683248384.2 ✔
155
        if n.closed {
10 all except 4683248384.1 and 4683248384.2 ✔
156
                n.shutdownMu.Unlock()
×
157
                return errors.New("Already shut down")
×
158
        }
×
159

160
        close(n.shutdownCh)
10 all except 4683248384.1 and 4683248384.2 ✔
161

10 all except 4683248384.1 and 4683248384.2 ✔
162
        n.closed = true
10 all except 4683248384.1 and 4683248384.2 ✔
163
        n.shutdownMu.Unlock()
10 all except 4683248384.1 and 4683248384.2 ✔
164

10 all except 4683248384.1 and 4683248384.2 ✔
165
        if n.hub != nil {
20 all except 4683248384.1 and 4683248384.2 ✔
166
                n.hub.Shutdown()
10 all except 4683248384.1 and 4683248384.2 ✔
167

10 all except 4683248384.1 and 4683248384.2 ✔
168
                active := n.hub.Size()
10 all except 4683248384.1 and 4683248384.2 ✔
169

10 all except 4683248384.1 and 4683248384.2 ✔
170
                if active > 0 {
15 all except 4683248384.1 and 4683248384.2 ✔
171
                        n.log.Infof("Closing active connections: %d", active)
5 all except 4683248384.1 and 4683248384.2 ✔
172
                        disconnectMessage := common.NewDisconnectMessage(common.SERVER_RESTART_REASON, true)
5 all except 4683248384.1 and 4683248384.2 ✔
173

5 all except 4683248384.1 and 4683248384.2 ✔
174
                        n.hub.DisconnectSesssions(disconnectMessage, common.SERVER_RESTART_REASON)
5 all except 4683248384.1 and 4683248384.2 ✔
175

5 all except 4683248384.1 and 4683248384.2 ✔
176
                        n.log.Info("All active connections closed")
5 all except 4683248384.1 and 4683248384.2 ✔
177

5 all except 4683248384.1 and 4683248384.2 ✔
178
                        // Wait to make sure that disconnect queue is not empty
5 all except 4683248384.1 and 4683248384.2 ✔
179
                        time.Sleep(time.Second)
5 all except 4683248384.1 and 4683248384.2 ✔
180
                }
5 all except 4683248384.1 and 4683248384.2 ✔
181
        }
182

183
        if n.disconnector != nil {
20 all except 4683248384.1 and 4683248384.2 ✔
184
                err := n.disconnector.Shutdown()
10 all except 4683248384.1 and 4683248384.2 ✔
185

10 all except 4683248384.1 and 4683248384.2 ✔
186
                if err != nil {
10 all except 4683248384.1 and 4683248384.2 ✔
187
                        n.log.Warnf("%v", err)
×
188
                }
×
189
        }
190

191
        if n.controller != nil {
20 all except 4683248384.1 and 4683248384.2 ✔
192
                err := n.controller.Shutdown()
10 all except 4683248384.1 and 4683248384.2 ✔
193

10 all except 4683248384.1 and 4683248384.2 ✔
194
                if err != nil {
10 all except 4683248384.1 and 4683248384.2 ✔
195
                        n.log.Warnf("%v", err)
×
196
                }
×
197
        }
198

199
        return
10 all except 4683248384.1 and 4683248384.2 ✔
200
}
201

202
// authOptions holds the settings that tune a single authentication call.
type authOptions struct {
	// DisconnectOnFailure controls whether the session is disconnected when
	// authentication does not succeed. It defaults to true.
	DisconnectOnFailure bool
}

// newAuthOptions returns the default options (DisconnectOnFailure enabled)
// with the given modifiers applied in order. Nil modifiers are skipped so a
// stray nil option cannot panic the authentication path.
func newAuthOptions(modifiers []AuthOption) *authOptions {
	base := &authOptions{
		DisconnectOnFailure: true,
	}

	for _, modifier := range modifiers {
		if modifier == nil {
			continue
		}

		modifier(base)
	}

	return base
}

// AuthOption customizes how AuthenticateWithOptions behaves.
type AuthOption = func(*authOptions)

// WithDisconnectOnFailure returns an AuthOption that sets whether the session
// is disconnected when authentication does not succeed.
func WithDisconnectOnFailure(disconnect bool) AuthOption {
	return func(opts *authOptions) {
		opts.DisconnectOnFailure = disconnect
	}
}
225

226
// Authenticate calls controller to perform authentication.
227
// If authentication is successful, session is registered with a hub.
228
func (n *Node) Authenticate(s *Session) (res *common.ConnectResult, err error) {
193✔
229
        return n.AuthenticateWithOptions(s)
193✔
230
}
193✔
231

232
// AuthenticateWithOptions provides more control on how authentication is performed.
233
func (n *Node) AuthenticateWithOptions(s *Session, options ...AuthOption) (res *common.ConnectResult, err error) {
193✔
234
        opts := newAuthOptions(options)
193✔
235

193✔
236
        res, err = n.controller.Authenticate(s.GetID(), s.env)
193✔
237

193✔
238
        if err != nil {
195✔
239
                s.Disconnect("Auth Error", ws.CloseInternalServerErr)
2 only 4683248384.1 and 4683248384.2 ✔
240
                return
2 only 4683248384.1 and 4683248384.2 ✔
241
        }
2 only 4683248384.1 and 4683248384.2 ✔
242

243
        if res.Status == common.SUCCESS {
365✔
244
                s.SetIdentifiers(res.Identifier)
174✔
245
                s.Connected = true
174✔
246

174✔
247
                n.hub.AddSession(s)
174✔
248
        } else {
191✔
249
                if res.Status == common.FAILURE {
34✔
250
                        n.metrics.CounterIncrement(metricsFailedAuths)
17✔
251
                }
17✔
252

253
                if opts.DisconnectOnFailure {
34✔
254
                        defer s.Disconnect("Auth Failed", ws.CloseNormalClosure)
17✔
255
                }
17✔
256
        }
257

258
        n.handleCallReply(s, res.ToCallResult())
191✔
259

191✔
260
        return
191✔
261
}
262

263
// Subscribe subscribes session to a channel
264
func (n *Node) Subscribe(s *Session, msg *common.Message) (res *common.CommandResult, err error) {
168✔
265
        s.smu.Lock()
168✔
266

168✔
267
        if ok := s.subscriptions.HasChannel(msg.Identifier); ok {
168✔
268
                s.smu.Unlock()
×
269
                err = fmt.Errorf("Already subscribed to %s", msg.Identifier)
×
270
                return
×
271
        }
×
272

273
        res, err = n.controller.Subscribe(s.GetID(), s.env, s.GetIdentifiers(), msg.Identifier)
168✔
274

168✔
275
        if err != nil {
170✔
276
                if res == nil || res.Status == common.ERROR {
4 only 4683248384.1 and 4683248384.2 ✔
277
                        s.Log.Errorf("Subscribe error: %v", err)
2 only 4683248384.1 and 4683248384.2 ✔
278
                }
2 only 4683248384.1 and 4683248384.2 ✔
279
        } else {
166✔
280
                s.subscriptions.AddChannel(msg.Identifier)
166✔
281
                s.Log.Debugf("Subscribed to channel: %s", msg.Identifier)
166✔
282
        }
166✔
283

284
        s.smu.Unlock()
168✔
285

168✔
286
        if res != nil {
334✔
287
                n.handleCommandReply(s, msg, res)
166✔
288
        }
166✔
289

290
        return
168✔
291
}
292

293
// Unsubscribe unsubscribes session from a channel
294
func (n *Node) Unsubscribe(s *Session, msg *common.Message) (res *common.CommandResult, err error) {
19✔
295
        s.smu.Lock()
19✔
296

19✔
297
        if ok := s.subscriptions.HasChannel(msg.Identifier); !ok {
19✔
298
                s.smu.Unlock()
×
299
                err = fmt.Errorf("Unknown subscription %s", msg.Identifier)
×
300
                return
×
301
        }
×
302

303
        res, err = n.controller.Unsubscribe(s.GetID(), s.env, s.GetIdentifiers(), msg.Identifier)
19✔
304

19✔
305
        if err != nil {
21✔
306
                if res == nil || res.Status == common.ERROR {
4 only 4683248384.1 and 4683248384.2 ✔
307
                        s.Log.Errorf("Unsubscribe error: %v", err)
2 only 4683248384.1 and 4683248384.2 ✔
308
                }
2 only 4683248384.1 and 4683248384.2 ✔
309
        } else {
17✔
310
                // Make sure to remove all streams subscriptions
17✔
311
                res.StopAllStreams = true
17✔
312

17✔
313
                s.subscriptions.RemoveChannel(msg.Identifier)
17✔
314

17✔
315
                s.Log.Debugf("Unsubscribed from channel: %s", msg.Identifier)
17✔
316
        }
17✔
317

318
        s.smu.Unlock()
19✔
319

19✔
320
        if res != nil {
36✔
321
                n.handleCommandReply(s, msg, res)
17✔
322
        }
17✔
323

324
        return
19✔
325
}
326

327
// Perform executes client command
328
func (n *Node) Perform(s *Session, msg *common.Message) (res *common.CommandResult, err error) {
34✔
329
        s.smu.Lock()
34✔
330

34✔
331
        if ok := s.subscriptions.HasChannel(msg.Identifier); !ok {
34✔
332
                s.smu.Unlock()
×
333
                err = fmt.Errorf("Unknown subscription %s", msg.Identifier)
×
334
                return
×
335
        }
×
336

337
        s.smu.Unlock()
34✔
338

34✔
339
        data, ok := msg.Data.(string)
34✔
340

34✔
341
        if !ok {
34✔
342
                err = fmt.Errorf("Perform data must be a string, got %v", msg.Data)
×
343
                return
×
344
        }
×
345

346
        res, err = n.controller.Perform(s.GetID(), s.env, s.GetIdentifiers(), msg.Identifier, data)
34✔
347

34✔
348
        if err != nil {
36✔
349
                if res == nil || res.Status == common.ERROR {
4 only 4683248384.1 and 4683248384.2 ✔
350
                        s.Log.Errorf("Perform error: %v", err)
2 only 4683248384.1 and 4683248384.2 ✔
351
                }
2 only 4683248384.1 and 4683248384.2 ✔
352
        } else {
32✔
353
                s.Log.Debugf("Perform result: %v", res)
32✔
354
        }
32✔
355

356
        if res != nil {
66✔
357
                n.handleCommandReply(s, msg, res)
32✔
358
        }
32✔
359

360
        return
34✔
361
}
362

363
// Broadcast message to stream
364
func (n *Node) Broadcast(msg *common.StreamMessage) {
137✔
365
        n.metrics.CounterIncrement(metricsBroadcastMsg)
137✔
366
        n.log.Debugf("Incoming pubsub message: %v", msg)
137✔
367
        n.hub.BroadcastMessage(msg)
137✔
368
}
137✔
369

370
// Disconnect adds session to disconnector queue and unregister session from hub
371
func (n *Node) Disconnect(s *Session) error {
174✔
372
        n.hub.RemoveSessionLater(s)
174✔
373
        return n.disconnector.Enqueue(s)
174✔
374
}
174✔
375

376
// DisconnectNow execute disconnect on controller
377
func (n *Node) DisconnectNow(s *Session) error {
174✔
378
        sessionSubscriptions := s.subscriptions.Channels()
174✔
379

174✔
380
        ids := s.GetIdentifiers()
174✔
381

174✔
382
        s.Log.Debugf("Disconnect %s %s %v %v", ids, s.env.URL, s.env.Headers, sessionSubscriptions)
174✔
383

174✔
384
        err := n.controller.Disconnect(
174✔
385
                s.GetID(),
174✔
386
                s.env,
174✔
387
                ids,
174✔
388
                sessionSubscriptions,
174✔
389
        )
174✔
390

174✔
391
        if err != nil {
174✔
392
                s.Log.Errorf("Disconnect error: %v", err)
×
393
        }
×
394

395
        return err
174✔
396
}
397

398
// RemoteDisconnect find a session by identifier and closes it
399
func (n *Node) RemoteDisconnect(msg *common.RemoteDisconnectMessage) {
7✔
400
        n.metrics.CounterIncrement(metricsBroadcastMsg)
7✔
401
        n.log.Debugf("Incoming pubsub command: %v", msg)
7✔
402
        n.hub.RemoteDisconnect(msg)
7✔
403
}
7✔
404

405
func transmit(s *Session, transmissions []string) {
378✔
406
        for _, msg := range transmissions {
766✔
407
                s.SendJSONTransmission(msg)
388✔
408
        }
388✔
409
}
410

411
func (n *Node) handleCommandReply(s *Session, msg *common.Message, reply *common.CommandResult) bool {
215✔
412
        // Returns true if any of the subscriptions/channel/connections state has changed
215✔
413
        isDirty := false
215✔
414

215✔
415
        if reply.Disconnect {
215✔
416
                defer s.Disconnect("Command Failed", ws.CloseAbnormalClosure)
×
417
        }
×
418

419
        uid := s.GetID()
215✔
420

215✔
421
        if reply.StopAllStreams {
239✔
422
                n.hub.UnsubscribeSessionFromChannel(uid, msg.Identifier)
24✔
423
                s.subscriptions.RemoveChannelStreams(msg.Identifier)
24✔
424
        } else if reply.StoppedStreams != nil {
224✔
425
                isDirty = true
9✔
426

9✔
427
                for _, stream := range reply.StoppedStreams {
18✔
428
                        n.hub.UnsubscribeSession(uid, stream, msg.Identifier)
9✔
429
                        s.subscriptions.RemoveChannelStream(msg.Identifier, stream)
9✔
430
                }
9✔
431
        }
432

433
        if reply.Streams != nil {
339✔
434
                isDirty = true
124✔
435

124✔
436
                for _, stream := range reply.Streams {
263✔
437
                        n.hub.SubscribeSession(uid, stream, msg.Identifier)
139✔
438
                        s.subscriptions.AddChannelStream(msg.Identifier, stream)
139✔
439
                }
139✔
440
        }
441

442
        if reply.IState != nil {
237✔
443
                isDirty = true
22✔
444

22✔
445
                s.smu.Lock()
22✔
446
                s.env.MergeChannelState(msg.Identifier, &reply.IState)
22✔
447
                s.smu.Unlock()
22✔
448
        }
22✔
449

450
        isConnectionDirty := n.handleCallReply(s, reply.ToCallResult())
215✔
451
        return isDirty || isConnectionDirty
215✔
452
}
453

454
func (n *Node) handleCallReply(s *Session, reply *common.CallResult) bool {
406✔
455
        isDirty := false
406✔
456

406✔
457
        if reply.CState != nil {
410✔
458
                isDirty = true
4 only 4683248384.1 and 4683248384.2 ✔
459

4 only 4683248384.1 and 4683248384.2 ✔
460
                s.smu.Lock()
4 only 4683248384.1 and 4683248384.2 ✔
461
                s.env.MergeConnectionState(&reply.CState)
4 only 4683248384.1 and 4683248384.2 ✔
462
                s.smu.Unlock()
4 only 4683248384.1 and 4683248384.2 ✔
463
        }
4 only 4683248384.1 and 4683248384.2 ✔
464

465
        if reply.Broadcasts != nil {
406✔
466
                for _, broadcast := range reply.Broadcasts {
×
467
                        n.Broadcast(broadcast)
×
468
                }
×
469
        }
470

471
        if reply.Transmissions != nil {
784✔
472
                transmit(s, reply.Transmissions)
378✔
473
        }
378✔
474

475
        return isDirty
406✔
476
}
477

478
func (n *Node) collectStats() {
10 all except 4683248384.1 and 4683248384.2 ✔
479
        statsCollectInterval := time.Duration(n.config.StatsRefreshInterval) * time.Second
10 all except 4683248384.1 and 4683248384.2 ✔
480

10 all except 4683248384.1 and 4683248384.2 ✔
481
        for {
51 all except 4683248384.1 and 4683248384.2 ✔
482
                select {
41 all except 4683248384.1 and 4683248384.2 ✔
483
                case <-n.shutdownCh:
10 all except 4683248384.1 and 4683248384.2 ✔
484
                        return
10 all except 4683248384.1 and 4683248384.2 ✔
485
                case <-time.After(statsCollectInterval):
31 all except 4683248384.1 and 4683248384.2 ✔
486
                        n.collectStatsOnce()
31 all except 4683248384.1 and 4683248384.2 ✔
487
                }
488
        }
489
}
490

491
func (n *Node) collectStatsOnce() {
31 all except 4683248384.1 and 4683248384.2 ✔
492
        n.metrics.GaugeSet(metricsGoroutines, uint64(runtime.NumGoroutine()))
31 all except 4683248384.1 and 4683248384.2 ✔
493

31 all except 4683248384.1 and 4683248384.2 ✔
494
        var m runtime.MemStats
31 all except 4683248384.1 and 4683248384.2 ✔
495
        runtime.ReadMemStats(&m)
31 all except 4683248384.1 and 4683248384.2 ✔
496
        n.metrics.GaugeSet(metricsMemSys, m.Sys)
31 all except 4683248384.1 and 4683248384.2 ✔
497

31 all except 4683248384.1 and 4683248384.2 ✔
498
        n.metrics.GaugeSet(metricsClientsNum, uint64(n.hub.Size()))
31 all except 4683248384.1 and 4683248384.2 ✔
499
        n.metrics.GaugeSet(metricsUniqClientsNum, uint64(n.hub.UniqSize()))
31 all except 4683248384.1 and 4683248384.2 ✔
500
        n.metrics.GaugeSet(metricsStreamsNum, uint64(n.hub.StreamsSize()))
31 all except 4683248384.1 and 4683248384.2 ✔
501
        n.metrics.GaugeSet(metricsDisconnectQueue, uint64(n.disconnector.Size()))
31 all except 4683248384.1 and 4683248384.2 ✔
502
}
31 all except 4683248384.1 and 4683248384.2 ✔
503

504
func (n *Node) registerMetrics() {
46✔
505
        n.metrics.RegisterGauge(metricsGoroutines, "The number of Go routines")
46✔
506
        n.metrics.RegisterGauge(metricsMemSys, "The total bytes of memory obtained from the OS")
46✔
507

46✔
508
        n.metrics.RegisterGauge(metricsClientsNum, "The number of active clients")
46✔
509
        n.metrics.RegisterGauge(metricsUniqClientsNum, "The number of unique clients (with respect to connection identifiers)")
46✔
510
        n.metrics.RegisterGauge(metricsStreamsNum, "The number of active broadcasting streams")
46✔
511
        n.metrics.RegisterGauge(metricsDisconnectQueue, "The size of delayed disconnect")
46✔
512

46✔
513
        n.metrics.RegisterCounter(metricsFailedAuths, "The total number of failed authentication attempts")
46✔
514
        n.metrics.RegisterCounter(metricsReceivedMsg, "The total number of received messages from clients")
46✔
515
        n.metrics.RegisterCounter(metricsFailedCommandReceived, "The total number of unrecognized messages received from clients")
46✔
516
        n.metrics.RegisterCounter(metricsBroadcastMsg, "The total number of messages received through PubSub (for broadcast)")
46✔
517
        n.metrics.RegisterCounter(metricsUnknownBroadcast, "The total number of unrecognized messages received through PubSub")
46✔
518

46✔
519
        n.metrics.RegisterCounter(metricsSentMsg, "The total number of messages sent to clients")
46✔
520
        n.metrics.RegisterCounter(metricsFailedSent, "The total number of messages failed to send to clients")
46✔
521

46✔
522
        n.metrics.RegisterCounter(metricsDataSent, "The total amount of bytes sent to clients")
46✔
523
        n.metrics.RegisterCounter(metricsDataReceived, "The total amount of bytes received from clients")
46✔
524
}
46✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc