• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

anycable / anycable-go / 4737922678

pending completion
4737922678

push

github

Vladimir Dementyev
version: 1.4.0

1 of 1 new or added line in 1 file covered. (100.0%)

350 existing lines in 9 files now uncovered.

5018 of 6405 relevant lines covered (78.35%)

7424.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.94
/node/node.go
1
package node
2

3
import (
4
        "errors"
5
        "fmt"
6
        "runtime"
7
        "sync"
8
        "time"
9

10
        "github.com/anycable/anycable-go/broker"
11
        "github.com/anycable/anycable-go/common"
12
        "github.com/anycable/anycable-go/hub"
13
        "github.com/anycable/anycable-go/metrics"
14
        "github.com/anycable/anycable-go/utils"
15
        "github.com/anycable/anycable-go/ws"
16
        "github.com/apex/log"
17
)
18

19
const (
20
        metricsGoroutines      = "goroutines_num"
21
        metricsMemSys          = "mem_sys_bytes"
22
        metricsClientsNum      = "clients_num"
23
        metricsUniqClientsNum  = "clients_uniq_num"
24
        metricsStreamsNum      = "broadcast_streams_num"
25
        metricsDisconnectQueue = "disconnect_queue_size"
26

27
        metricsFailedAuths           = "failed_auths_total"
28
        metricsReceivedMsg           = "client_msg_total"
29
        metricsFailedCommandReceived = "failed_client_msg_total"
30
        metricsBroadcastMsg          = "broadcast_msg_total"
31
        metricsUnknownBroadcast      = "failed_broadcast_msg_total"
32

33
        metricsSentMsg    = "server_msg_total"
34
        metricsFailedSent = "failed_server_msg_total"
35

36
        metricsDataSent     = "data_sent_total"
37
        metricsDataReceived = "data_rcvd_total"
38
)
39

40
// AppNode describes a basic node interface
41
type AppNode interface {
42
        HandlePubSub(msg []byte)
43
        LookupSession(id string) *Session
44
        Authenticate(s *Session) (*common.ConnectResult, error)
45
        Subscribe(s *Session, msg *common.Message) (*common.CommandResult, error)
46
        Unsubscribe(s *Session, msg *common.Message) (*common.CommandResult, error)
47
        Perform(s *Session, msg *common.Message) (*common.CommandResult, error)
48
        Disconnect(s *Session) error
49
}
50

51
type AppNodeExt interface {
52
        AppNode
53
        AuthenticateWithOptions(s *Session, opts ...AuthOption) (*common.ConnectResult, error)
54
}
55

56
// Connection represents underlying connection
57
type Connection interface {
58
        Write(msg []byte, deadline time.Time) error
59
        WriteBinary(msg []byte, deadline time.Time) error
60
        Read() ([]byte, error)
61
        Close(code int, reason string)
62
}
63

64
// Node represents the whole application
65
type Node struct {
66
        metrics metrics.Instrumenter
67

68
        config       *Config
69
        hub          *hub.Hub
70
        broker       broker.Broker
71
        controller   Controller
72
        disconnector Disconnector
73
        shutdownCh   chan struct{}
74
        shutdownMu   sync.Mutex
75
        closed       bool
76
        log          *log.Entry
77
}
78

79
var _ AppNode = (*Node)(nil)
80

81
// NewNode builds new node struct
82
func NewNode(controller Controller, metrics *metrics.Metrics, config *Config) *Node {
67✔
83
        node := &Node{
67✔
84
                metrics:    metrics,
67✔
85
                config:     config,
67✔
86
                controller: controller,
67✔
87
                shutdownCh: make(chan struct{}),
67✔
88
                log:        log.WithFields(log.Fields{"context": "node"}),
67✔
89
        }
67✔
90

67✔
91
        node.hub = hub.NewHub(config.HubGopoolSize)
67✔
92

67✔
93
        if metrics != nil {
134✔
94
                node.registerMetrics()
67✔
95
        }
67✔
96

97
        return node
67✔
98
}
99

100
// Start runs all the required goroutines
101
func (n *Node) Start() error {
27✔
102
        go n.hub.Run()
27✔
103
        go n.collectStats()
27✔
104

27✔
105
        if err := n.broker.Start(); err != nil {
27✔
UNCOV
106
                return err
×
UNCOV
107
        }
×
108

109
        return nil
27✔
110
}
111

112
// SetDisconnector set disconnector for the node
113
func (n *Node) SetDisconnector(d Disconnector) {
67✔
114
        n.disconnector = d
67✔
115
}
67✔
116

117
func (n *Node) SetBroker(b broker.Broker) {
71✔
118
        n.broker = b
71✔
119
}
71✔
120

121
// HandleCommand parses incoming message from client and
122
// execute the command (if recognized)
123
func (n *Node) HandleCommand(s *Session, msg *common.Message) (err error) {
3,337 all except 4737922678.1 and 4737922678.2 ✔
124
        s.Log.Debugf("Incoming message: %v", msg)
3,337 all except 4737922678.1 and 4737922678.2 ✔
125
        switch msg.Command {
3,337 all except 4737922678.1 and 4737922678.2 ✔
126
        case "subscribe":
2,274 all except 4737922678.1 and 4737922678.2 ✔
127
                _, err = n.Subscribe(s, msg)
2,274 all except 4737922678.1 and 4737922678.2 ✔
128
        case "unsubscribe":
24 all except 4737922678.3, 4737922678.12, 4737922678.1, and 4737922678.2 ✔
129
                _, err = n.Unsubscribe(s, msg)
24 all except 4737922678.3, 4737922678.12, 4737922678.1, and 4737922678.2 ✔
130
        case "message":
1,033 all except 4737922678.1 and 4737922678.2 ✔
131
                _, err = n.Perform(s, msg)
1,033 all except 4737922678.1 and 4737922678.2 ✔
132
        case "history":
6 only 4737922678.7, 4737922678.8, and 4737922678.9 ✔
133
                err = n.History(s, msg)
6 only 4737922678.7, 4737922678.8, and 4737922678.9 ✔
134
        default:
×
135
                err = fmt.Errorf("Unknown command: %s", msg.Command)
×
136
        }
137

138
        return
3,337 all except 4737922678.1 and 4737922678.2 ✔
139
}
140

141
// HandleBroadcast parses incoming broadcast message, record it and re-transmit to other nodes
142
func (n *Node) HandleBroadcast(raw []byte) {
165 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.10, 4737922678.1, and 4737922678.2 ✔
143
        msg, err := common.PubSubMessageFromJSON(raw)
165 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.10, 4737922678.1, and 4737922678.2 ✔
144

165 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.10, 4737922678.1, and 4737922678.2 ✔
145
        if err != nil {
165 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.10, 4737922678.1, and 4737922678.2 ✔
UNCOV
146
                n.metrics.CounterIncrement(metricsUnknownBroadcast)
×
UNCOV
147
                n.log.Warnf("Failed to parse pubsub message '%s' with error: %v", raw, err)
×
UNCOV
148
                return
×
UNCOV
149
        }
×
150

151
        switch v := msg.(type) {
165 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.10, 4737922678.1, and 4737922678.2 ✔
152
        case common.StreamMessage:
161 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.10, 4737922678.1, and 4737922678.2 ✔
153
                n.broker.HandleBroadcast(&v)
161 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.10, 4737922678.1, and 4737922678.2 ✔
154
        case common.RemoteCommandMessage:
4 only 4737922678.7, 4737922678.8, 4737922678.9, and 4737922678.10 ✔
155
                n.broker.HandleCommand(&v)
4 only 4737922678.7, 4737922678.8, 4737922678.9, and 4737922678.10 ✔
156
        }
157
}
158

159
// HandlePubSub parses incoming pubsub message and broadcast it to all clients (w/o using a broker)
160
func (n *Node) HandlePubSub(raw []byte) {
1,119 all except 4737922678.7, 4737922678.8, 4737922678.9, and 4737922678.10 ✔
161
        msg, err := common.PubSubMessageFromJSON(raw)
1,119 all except 4737922678.7, 4737922678.8, 4737922678.9, and 4737922678.10 ✔
162

1,119 all except 4737922678.7, 4737922678.8, 4737922678.9, and 4737922678.10 ✔
163
        if err != nil {
1,119 all except 4737922678.7, 4737922678.8, 4737922678.9, and 4737922678.10 ✔
UNCOV
164
                n.metrics.CounterIncrement(metricsUnknownBroadcast)
×
UNCOV
165
                n.log.Warnf("Failed to parse pubsub message '%s' with error: %v", raw, err)
×
UNCOV
166
                return
×
UNCOV
167
        }
×
168

169
        switch v := msg.(type) {
1,119 all except 4737922678.7, 4737922678.8, 4737922678.9, and 4737922678.10 ✔
170
        case common.StreamMessage:
1,113 all except 4737922678.7, 4737922678.8, 4737922678.9, and 4737922678.10 ✔
171
                n.Broadcast(&v)
1,113 all except 4737922678.7, 4737922678.8, 4737922678.9, and 4737922678.10 ✔
172
        case common.RemoteCommandMessage:
6 only 4737922678.6, 4737922678.5, 4737922678.4, 4737922678.11, 4737922678.1, and 4737922678.2 ✔
173
                n.ExecuteRemoteCommand(&v)
6 only 4737922678.6, 4737922678.5, 4737922678.4, 4737922678.11, 4737922678.1, and 4737922678.2 ✔
174
        }
175
}
176

177
func (n *Node) LookupSession(id string) *Session {
4 only 4737922678.1 and 4737922678.2 ✔
178
        hubSession := n.hub.FindByIdentifier(id)
4 only 4737922678.1 and 4737922678.2 ✔
179
        session, _ := hubSession.(*Session)
4 only 4737922678.1 and 4737922678.2 ✔
180
        return session
4 only 4737922678.1 and 4737922678.2 ✔
181
}
4 only 4737922678.1 and 4737922678.2 ✔
182

183
// Shutdown stops all services (hub, controller)
184
func (n *Node) Shutdown() (err error) {
27✔
185
        n.shutdownMu.Lock()
27✔
186
        if n.closed {
27✔
UNCOV
187
                n.shutdownMu.Unlock()
×
UNCOV
188
                return errors.New("Already shut down")
×
UNCOV
189
        }
×
190

191
        close(n.shutdownCh)
27✔
192

27✔
193
        n.closed = true
27✔
194
        n.shutdownMu.Unlock()
27✔
195

27✔
196
        if n.hub != nil {
54✔
197
                n.hub.Shutdown()
27✔
198

27✔
199
                active := n.hub.Size()
27✔
200

27✔
201
                if active > 0 {
39✔
202
                        n.log.Infof("Closing active connections: %d", active)
12 all except 4737922678.3 and 4737922678.12 ✔
203
                        disconnectMessage := common.NewDisconnectMessage(common.SERVER_RESTART_REASON, true)
12 all except 4737922678.3 and 4737922678.12 ✔
204

12 all except 4737922678.3 and 4737922678.12 ✔
205
                        n.hub.DisconnectSesssions(disconnectMessage, common.SERVER_RESTART_REASON)
12 all except 4737922678.3 and 4737922678.12 ✔
206

12 all except 4737922678.3 and 4737922678.12 ✔
207
                        n.log.Info("All active connections closed")
12 all except 4737922678.3 and 4737922678.12 ✔
208

12 all except 4737922678.3 and 4737922678.12 ✔
209
                        // Wait to make sure that disconnect queue is not empty
12 all except 4737922678.3 and 4737922678.12 ✔
210
                        time.Sleep(time.Second)
12 all except 4737922678.3 and 4737922678.12 ✔
211
                }
12 all except 4737922678.3 and 4737922678.12 ✔
212
        }
213

214
        if n.disconnector != nil {
54✔
215
                err := n.disconnector.Shutdown()
27✔
216

27✔
217
                if err != nil {
29✔
218
                        n.log.Warnf("%v", err)
2 only 4737922678.12 ✔
219
                }
2 only 4737922678.12 ✔
220
        }
221

222
        if n.controller != nil {
54✔
223
                err := n.controller.Shutdown()
27✔
224

27✔
225
                if err != nil {
31✔
226
                        n.log.Warnf("%v", err)
4 only 4737922678.9, 4737922678.11, and 4737922678.12 ✔
227
                }
4 only 4737922678.9, 4737922678.11, and 4737922678.12 ✔
228
        }
229

230
        return
27✔
231
}
232

233
type authOptions struct {
234
        DisconnectOnFailure bool
235
}
236

237
func newAuthOptions(modifiers []AuthOption) *authOptions {
2,348✔
238
        base := &authOptions{
2,348✔
239
                DisconnectOnFailure: true,
2,348✔
240
        }
2,348✔
241

2,348✔
242
        for _, modifier := range modifiers {
2,348✔
UNCOV
243
                modifier(base)
×
UNCOV
244
        }
×
245

246
        return base
2,348✔
247
}
248

249
type AuthOption = func(*authOptions)
250

UNCOV
251
func WithDisconnectOnFailure(disconnect bool) AuthOption {
×
UNCOV
252
        return func(opts *authOptions) {
×
UNCOV
253
                opts.DisconnectOnFailure = disconnect
×
UNCOV
254
        }
×
255
}
256

257
// Authenticate calls controller to perform authentication.
258
// If authentication is successful, session is registered with a hub.
259
func (n *Node) Authenticate(s *Session) (res *common.ConnectResult, err error) {
2,348✔
260
        return n.AuthenticateWithOptions(s)
2,348✔
261
}
2,348✔
262

263
// AuthenticateWithOptions provides more control on how authentication is performed.
264
func (n *Node) AuthenticateWithOptions(s *Session, options ...AuthOption) (res *common.ConnectResult, err error) {
2,348✔
265
        opts := newAuthOptions(options)
2,348✔
266

2,348✔
267
        restored := n.TryRestoreSession(s)
2,348✔
268

2,348✔
269
        if restored {
2,357✔
270
                return &common.ConnectResult{Status: common.SUCCESS}, nil
9 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
271
        }
9 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
272

273
        res, err = n.controller.Authenticate(s.GetID(), s.env)
2,339✔
274

2,339✔
275
        if err != nil {
2,341✔
276
                s.Disconnect("Auth Error", ws.CloseInternalServerErr)
2 only 4737922678.1 and 4737922678.2 ✔
277
                return
2 only 4737922678.1 and 4737922678.2 ✔
278
        }
2 only 4737922678.1 and 4737922678.2 ✔
279

280
        if res.Status == common.SUCCESS {
4,648✔
281
                s.SetIdentifiers(res.Identifier)
2,311✔
282
                s.Connected = true
2,311✔
283

2,311✔
284
                n.hub.AddSession(s)
2,311✔
285
        } else {
2,337✔
286
                if res.Status == common.FAILURE {
52 all except 4737922678.3 and 4737922678.12 ✔
287
                        n.metrics.CounterIncrement(metricsFailedAuths)
26 all except 4737922678.3 and 4737922678.12 ✔
288
                }
26 all except 4737922678.3 and 4737922678.12 ✔
289

290
                if opts.DisconnectOnFailure {
52 all except 4737922678.3 and 4737922678.12 ✔
291
                        defer s.Disconnect("Auth Failed", ws.CloseNormalClosure)
26 all except 4737922678.3 and 4737922678.12 ✔
292
                }
26 all except 4737922678.3 and 4737922678.12 ✔
293
        }
294

295
        n.handleCallReply(s, res.ToCallResult())
2,337✔
296

2,337✔
297
        if berr := n.broker.CommitSession(s.GetID(), s); berr != nil {
2,337✔
298
                s.Log.Errorf("Failed to persist session in cache: %v", berr)
×
299
        }
×
300

301
        return
2,337✔
302
}
303

304
func (n *Node) TryRestoreSession(s *Session) (restored bool) {
2,348✔
305
        sid := s.GetID()
2,348✔
306
        prev_sid := s.PrevSid()
2,348✔
307

2,348✔
308
        if prev_sid == "" {
4,685✔
309
                return false
2,337✔
310
        }
2,337✔
311

312
        cached_session, err := n.broker.RestoreSession(prev_sid)
11 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
313

11 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
314
        if err != nil {
11 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
UNCOV
315
                s.Log.Errorf("Failed to fetch session cache %s: %s", prev_sid, err.Error())
×
UNCOV
316
                return false
×
UNCOV
317
        }
×
318

319
        if cached_session == nil {
13 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
320
                s.Log.Debugf("Couldn't find session to restore from: %s", prev_sid)
2 only 4737922678.1 and 4737922678.2 ✔
321
                return false
2 only 4737922678.1 and 4737922678.2 ✔
322
        }
2 only 4737922678.1 and 4737922678.2 ✔
323

324
        err = s.RestoreFromCache(cached_session)
9 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
325

9 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
326
        if err != nil {
9 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
UNCOV
327
                s.Log.Errorf("Failed to restore session from cache %s: %s", prev_sid, err.Error())
×
UNCOV
328
                return false
×
UNCOV
329
        }
×
330

331
        s.Log.Debugf("Session restored from: %s", prev_sid)
9 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
332

9 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
333
        s.Connected = true
9 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
334
        n.hub.AddSession(s)
9 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
335

9 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
336
        // Resubscribe to streams
9 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
337
        for identifier, channel_streams := range s.subscriptions.channels {
20 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
338
                for stream := range channel_streams {
28 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
339
                        streamId := n.broker.Subscribe(stream)
17 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
340
                        n.hub.SubscribeSession(sid, streamId, identifier)
17 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
341
                }
17 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
342
        }
343

344
        // Send welcome message
345
        s.Send(&common.Reply{
9 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
346
                Type:        common.WelcomeType,
9 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
347
                Sid:         sid,
9 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
348
                Restored:    true,
9 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
349
                RestoredIDs: utils.Keys(s.subscriptions.channels),
9 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
350
        })
9 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
351

9 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
352
        if berr := n.broker.CommitSession(s.GetID(), s); berr != nil {
9 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
UNCOV
353
                s.Log.Errorf("Failed to persist session in cache: %v", berr)
×
UNCOV
354
        }
×
355

356
        return true
9 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
357
}
358

359
// Subscribe subscribes session to a channel
360
func (n *Node) Subscribe(s *Session, msg *common.Message) (res *common.CommandResult, err error) {
2,292✔
361
        s.smu.Lock()
2,292✔
362

2,292✔
363
        if ok := s.subscriptions.HasChannel(msg.Identifier); ok {
2,292✔
UNCOV
364
                s.smu.Unlock()
×
UNCOV
365
                err = fmt.Errorf("Already subscribed to %s", msg.Identifier)
×
UNCOV
366
                return
×
UNCOV
367
        }
×
368

369
        res, err = n.controller.Subscribe(s.GetID(), s.env, s.GetIdentifiers(), msg.Identifier)
2,292✔
370

2,292✔
371
        if err != nil {
2,294✔
372
                if res == nil || res.Status == common.ERROR {
4 only 4737922678.1 and 4737922678.2 ✔
373
                        s.Log.Errorf("Subscribe error: %v", err)
2 only 4737922678.1 and 4737922678.2 ✔
374
                }
2 only 4737922678.1 and 4737922678.2 ✔
375
        } else {
2,290✔
376
                s.subscriptions.AddChannel(msg.Identifier)
2,290✔
377
                s.Log.Debugf("Subscribed to channel: %s", msg.Identifier)
2,290✔
378
        }
2,290✔
379

380
        s.smu.Unlock()
2,292✔
381

2,292✔
382
        if res != nil {
4,582✔
383
                n.handleCommandReply(s, msg, res)
2,290✔
384
        }
2,290✔
385

386
        if berr := n.broker.CommitSession(s.GetID(), s); berr != nil {
2,292✔
UNCOV
387
                s.Log.Errorf("Failed to persist session in cache: %v", berr)
×
UNCOV
388
        }
×
389

390
        if msg.History.Since > 0 || msg.History.Streams != nil {
2,299✔
391
                return res, n.History(s, msg)
7 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
392
        }
7 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
393

394
        return
2,285✔
395
}
396

397
// Unsubscribe unsubscribes session from a channel
398
func (n *Node) Unsubscribe(s *Session, msg *common.Message) (res *common.CommandResult, err error) {
28 all except 4737922678.3 and 4737922678.12 ✔
399
        s.smu.Lock()
28 all except 4737922678.3 and 4737922678.12 ✔
400

28 all except 4737922678.3 and 4737922678.12 ✔
401
        if ok := s.subscriptions.HasChannel(msg.Identifier); !ok {
28 all except 4737922678.3 and 4737922678.12 ✔
UNCOV
402
                s.smu.Unlock()
×
UNCOV
403
                err = fmt.Errorf("Unknown subscription %s", msg.Identifier)
×
UNCOV
404
                return
×
UNCOV
405
        }
×
406

407
        res, err = n.controller.Unsubscribe(s.GetID(), s.env, s.GetIdentifiers(), msg.Identifier)
28 all except 4737922678.3 and 4737922678.12 ✔
408

28 all except 4737922678.3 and 4737922678.12 ✔
409
        if err != nil {
30 all except 4737922678.3 and 4737922678.12 ✔
410
                if res == nil || res.Status == common.ERROR {
4 only 4737922678.1 and 4737922678.2 ✔
411
                        s.Log.Errorf("Unsubscribe error: %v", err)
2 only 4737922678.1 and 4737922678.2 ✔
412
                }
2 only 4737922678.1 and 4737922678.2 ✔
413
        } else {
26 all except 4737922678.3 and 4737922678.12 ✔
414
                // Make sure to remove all streams subscriptions
26 all except 4737922678.3 and 4737922678.12 ✔
415
                res.StopAllStreams = true
26 all except 4737922678.3 and 4737922678.12 ✔
416

26 all except 4737922678.3 and 4737922678.12 ✔
417
                s.subscriptions.RemoveChannel(msg.Identifier)
26 all except 4737922678.3 and 4737922678.12 ✔
418

26 all except 4737922678.3 and 4737922678.12 ✔
419
                s.Log.Debugf("Unsubscribed from channel: %s", msg.Identifier)
26 all except 4737922678.3 and 4737922678.12 ✔
420
        }
26 all except 4737922678.3 and 4737922678.12 ✔
421

422
        s.smu.Unlock()
28 all except 4737922678.3 and 4737922678.12 ✔
423

28 all except 4737922678.3 and 4737922678.12 ✔
424
        if res != nil {
54 all except 4737922678.3 and 4737922678.12 ✔
425
                n.handleCommandReply(s, msg, res)
26 all except 4737922678.3 and 4737922678.12 ✔
426
        }
26 all except 4737922678.3 and 4737922678.12 ✔
427

428
        if berr := n.broker.CommitSession(s.GetID(), s); berr != nil {
28 all except 4737922678.3 and 4737922678.12 ✔
UNCOV
429
                s.Log.Errorf("Failed to persist session in cache: %v", berr)
×
UNCOV
430
        }
×
431

432
        return
28 all except 4737922678.3 and 4737922678.12 ✔
433
}
434

435
// Perform executes client command
436
func (n *Node) Perform(s *Session, msg *common.Message) (res *common.CommandResult, err error) {
1,049✔
437
        s.smu.Lock()
1,049✔
438

1,049✔
439
        if ok := s.subscriptions.HasChannel(msg.Identifier); !ok {
1,049✔
UNCOV
440
                s.smu.Unlock()
×
UNCOV
441
                err = fmt.Errorf("Unknown subscription %s", msg.Identifier)
×
UNCOV
442
                return
×
UNCOV
443
        }
×
444

445
        s.smu.Unlock()
1,049✔
446

1,049✔
447
        data, ok := msg.Data.(string)
1,049✔
448

1,049✔
449
        if !ok {
1,049✔
UNCOV
450
                err = fmt.Errorf("Perform data must be a string, got %v", msg.Data)
×
UNCOV
451
                return
×
UNCOV
452
        }
×
453

454
        res, err = n.controller.Perform(s.GetID(), s.env, s.GetIdentifiers(), msg.Identifier, data)
1,049✔
455

1,049✔
456
        if err != nil {
1,051✔
457
                if res == nil || res.Status == common.ERROR {
4 only 4737922678.1 and 4737922678.2 ✔
458
                        s.Log.Errorf("Perform error: %v", err)
2 only 4737922678.1 and 4737922678.2 ✔
459
                }
2 only 4737922678.1 and 4737922678.2 ✔
460
        } else {
1,047✔
461
                s.Log.Debugf("Perform result: %v", res)
1,047✔
462
        }
1,047✔
463

464
        if res != nil {
2,096✔
465
                if n.handleCommandReply(s, msg, res) {
1,073✔
466
                        if berr := n.broker.CommitSession(s.GetID(), s); berr != nil {
26 all except 4737922678.3 and 4737922678.12 ✔
467
                                s.Log.Errorf("Failed to persist session in cache: %v", berr)
×
468
                        }
×
469
                }
470
        }
471

472
        return
1,049✔
473
}
474

475
// History fetches the stream history for the specified identifier
476
func (n *Node) History(s *Session, msg *common.Message) (err error) {
21 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
477
        s.smu.Lock()
21 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
478

21 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
479
        if ok := s.subscriptions.HasChannel(msg.Identifier); !ok {
21 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
UNCOV
480
                s.smu.Unlock()
×
UNCOV
481
                err = fmt.Errorf("Unknown subscription %s", msg.Identifier)
×
UNCOV
482
                return
×
UNCOV
483
        }
×
484

485
        subscriptionStreams := s.subscriptions.StreamsFor(msg.Identifier)
21 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
486

21 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
487
        s.smu.Unlock()
21 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
488

21 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
489
        history := msg.History
21 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
490

21 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
491
        if history.Since == 0 && history.Streams == nil {
21 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
UNCOV
492
                err = fmt.Errorf("History request is missing, got %v", msg)
×
UNCOV
493
                return
×
UNCOV
494
        }
×
495

496
        backlog, err := n.retreiveHistory(&history, subscriptionStreams)
21 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
497

21 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
498
        if err != nil {
23 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
499
                s.Send(&common.Reply{
2 only 4737922678.1 and 4737922678.2 ✔
500
                        Type:       common.HistoryRejectedType,
2 only 4737922678.1 and 4737922678.2 ✔
501
                        Identifier: msg.Identifier,
2 only 4737922678.1 and 4737922678.2 ✔
502
                })
2 only 4737922678.1 and 4737922678.2 ✔
503

2 only 4737922678.1 and 4737922678.2 ✔
504
                return err
2 only 4737922678.1 and 4737922678.2 ✔
505
        }
2 only 4737922678.1 and 4737922678.2 ✔
506

507
        for _, el := range backlog {
68 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
508
                s.Send(el.ToReplyFor(msg.Identifier))
49 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
509
        }
49 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
510

511
        s.Send(&common.Reply{
19 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
512
                Type:       common.HistoryConfirmedType,
19 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
513
                Identifier: msg.Identifier,
19 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
514
        })
19 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
515

19 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
516
        return
19 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
517
}
518

519
func (n *Node) retreiveHistory(history *common.HistoryRequest, streams []string) (backlog []common.StreamMessage, err error) {
21 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
520
        backlog = []common.StreamMessage{}
21 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
521

21 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
522
        for _, stream := range streams {
49 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
523
                if history.Streams != nil {
41 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
524
                        pos, ok := history.Streams[stream]
13 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
525

13 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
526
                        if ok {
20 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
527
                                streamBacklog, err := n.broker.HistoryFrom(stream, pos.Epoch, pos.Offset)
7 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
528

7 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
529
                                if err != nil {
7 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
UNCOV
530
                                        return nil, err
×
UNCOV
531
                                }
×
532

533
                                backlog = append(backlog, streamBacklog...)
7 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
534

7 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
535
                                continue
7 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
536
                        }
537
                }
538

539
                if history.Since > 0 {
40 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
540
                        streamBacklog, err := n.broker.HistorySince(stream, history.Since)
19 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
541

19 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
542
                        if err != nil {
21 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
543
                                return nil, err
2 only 4737922678.1 and 4737922678.2 ✔
544
                        }
2 only 4737922678.1 and 4737922678.2 ✔
545

546
                        backlog = append(backlog, streamBacklog...)
17 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
547
                }
548
        }
549

550
        return backlog, nil
19 only 4737922678.7, 4737922678.8, 4737922678.9, 4737922678.1, and 4737922678.2 ✔
551
}
552

553
// Broadcast message to stream (locally)
554
func (n *Node) Broadcast(msg *common.StreamMessage) {
1,249✔
555
        n.metrics.CounterIncrement(metricsBroadcastMsg)
1,249✔
556
        n.log.Debugf("Incoming broadcast message: %v", msg)
1,249✔
557
        n.hub.BroadcastMessage(msg)
1,249✔
558
}
1,249✔
559

560
// Execute remote command (locally)
561
func (n *Node) ExecuteRemoteCommand(msg *common.RemoteCommandMessage) {
10 all except 4737922678.3 and 4737922678.12 ✔
562
        // TODO: Add remote commands metrics
10 all except 4737922678.3 and 4737922678.12 ✔
563
        // n.metrics.CounterIncrement(metricsRemoteCommandsMsg)
10 all except 4737922678.3 and 4737922678.12 ✔
564
        n.log.Debugf("Incoming remote command: %v", msg)
10 all except 4737922678.3 and 4737922678.12 ✔
565

10 all except 4737922678.3 and 4737922678.12 ✔
566
        switch msg.Command { // nolint:gocritic
10 all except 4737922678.3 and 4737922678.12 ✔
567
        case "disconnect":
10 all except 4737922678.3 and 4737922678.12 ✔
568
                dmsg, err := msg.ToRemoteDisconnectMessage()
10 all except 4737922678.3 and 4737922678.12 ✔
569
                if err != nil {
10 all except 4737922678.3 and 4737922678.12 ✔
UNCOV
570
                        n.log.Warnf("Failed to parse remote disconnect command: %v", err)
×
UNCOV
571
                        return
×
UNCOV
572
                }
×
573

574
                n.RemoteDisconnect(dmsg)
10 all except 4737922678.3 and 4737922678.12 ✔
575
        }
576
}
577

578
// Disconnect adds session to disconnector queue and unregister session from hub
579
func (n *Node) Disconnect(s *Session) error {
2,314✔
580
        n.hub.RemoveSessionLater(s)
2,314✔
581
        n.broker.FinishSession(s.GetID()) // nolint:errcheck
2,314✔
582
        return n.disconnector.Enqueue(s)
2,314✔
583
}
2,314✔
584

585
// DisconnectNow execute disconnect on controller
586
func (n *Node) DisconnectNow(s *Session) error {
1,411✔
587
        sessionSubscriptions := s.subscriptions.Channels()
1,411✔
588

1,411✔
589
        ids := s.GetIdentifiers()
1,411✔
590

1,411✔
591
        s.Log.Debugf("Disconnect %s %s %v %v", ids, s.env.URL, s.env.Headers, sessionSubscriptions)
1,411✔
592

1,411✔
593
        err := n.controller.Disconnect(
1,411✔
594
                s.GetID(),
1,411✔
595
                s.env,
1,411✔
596
                ids,
1,411✔
597
                sessionSubscriptions,
1,411✔
598
        )
1,411✔
599

1,411✔
600
        if err != nil {
1,415✔
601
                s.Log.Errorf("Disconnect error: %v", err)
4 only 4737922678.12 ✔
602
        }
4 only 4737922678.12 ✔
603

604
        return err
1,408✔
605
}
606

607
// RemoteDisconnect find a session by identifier and closes it
608
func (n *Node) RemoteDisconnect(msg *common.RemoteDisconnectMessage) {
10 all except 4737922678.3 and 4737922678.12 ✔
609
        n.metrics.CounterIncrement(metricsBroadcastMsg)
10 all except 4737922678.3 and 4737922678.12 ✔
610
        n.log.Debugf("Incoming pubsub command: %v", msg)
10 all except 4737922678.3 and 4737922678.12 ✔
611
        n.hub.RemoteDisconnect(msg)
10 all except 4737922678.3 and 4737922678.12 ✔
612
}
10 all except 4737922678.3 and 4737922678.12 ✔
613

614
func transmit(s *Session, transmissions []string) {
5,660✔
615
        for _, msg := range transmissions {
11,338✔
616
                s.SendJSONTransmission(msg)
5,678✔
617
        }
5,678✔
618
}
619

620
func (n *Node) handleCommandReply(s *Session, msg *common.Message, reply *common.CommandResult) bool {
3,363✔
621
        // Returns true if any of the subscriptions/channel/connections state has changed
3,363✔
622
        isDirty := false
3,363✔
623

3,363✔
624
        if reply.Disconnect {
3,363✔
UNCOV
625
                defer s.Disconnect("Command Failed", ws.CloseAbnormalClosure)
×
UNCOV
626
        }
×
627

628
        uid := s.GetID()
3,363✔
629

3,363✔
630
        if reply.StopAllStreams {
3,399✔
631
                n.hub.UnsubscribeSessionFromChannel(uid, msg.Identifier)
36 all except 4737922678.3 and 4737922678.12 ✔
632
                removedStreams := s.subscriptions.RemoveChannelStreams(msg.Identifier)
36 all except 4737922678.3 and 4737922678.12 ✔
633

36 all except 4737922678.3 and 4737922678.12 ✔
634
                for _, stream := range removedStreams {
36 all except 4737922678.3 and 4737922678.12 ✔
UNCOV
635
                        isDirty = true
×
UNCOV
636
                        n.broker.Unsubscribe(stream)
×
UNCOV
637
                }
×
638

639
        } else if reply.StoppedStreams != nil {
3,339✔
640
                isDirty = true
12 all except 4737922678.3 and 4737922678.12 ✔
641

12 all except 4737922678.3 and 4737922678.12 ✔
642
                for _, stream := range reply.StoppedStreams {
24 all except 4737922678.3 and 4737922678.12 ✔
643
                        streamId := n.broker.Unsubscribe(stream)
12 all except 4737922678.3 and 4737922678.12 ✔
644
                        n.hub.UnsubscribeSession(uid, streamId, msg.Identifier)
12 all except 4737922678.3 and 4737922678.12 ✔
645
                        s.subscriptions.RemoveChannelStream(msg.Identifier, streamId)
12 all except 4737922678.3 and 4737922678.12 ✔
646
                }
12 all except 4737922678.3 and 4737922678.12 ✔
647
        }
648

649
        if reply.Streams != nil {
5,587✔
650
                isDirty = true
2,224✔
651

2,224✔
652
                for _, stream := range reply.Streams {
4,476✔
653
                        streamId := n.broker.Subscribe(stream)
2,252✔
654
                        n.hub.SubscribeSession(uid, streamId, msg.Identifier)
2,252✔
655
                        s.subscriptions.AddChannelStream(msg.Identifier, streamId)
2,252✔
656
                }
2,252✔
657
        }
658

659
        if reply.IState != nil {
3,399✔
660
                isDirty = true
36 all except 4737922678.3 and 4737922678.12 ✔
661

36 all except 4737922678.3 and 4737922678.12 ✔
662
                s.smu.Lock()
36 all except 4737922678.3 and 4737922678.12 ✔
663
                s.env.MergeChannelState(msg.Identifier, &reply.IState)
36 all except 4737922678.3 and 4737922678.12 ✔
664
                s.smu.Unlock()
36 all except 4737922678.3 and 4737922678.12 ✔
665
        }
36 all except 4737922678.3 and 4737922678.12 ✔
666

667
        isConnectionDirty := n.handleCallReply(s, reply.ToCallResult())
3,363✔
668
        return isDirty || isConnectionDirty
3,363✔
669
}
670

671
func (n *Node) handleCallReply(s *Session, reply *common.CallResult) bool {
5,700✔
672
        isDirty := false
5,700✔
673

5,700✔
674
        if reply.CState != nil {
5,706✔
675
                isDirty = true
6 only 4737922678.1 and 4737922678.2 ✔
676

6 only 4737922678.1 and 4737922678.2 ✔
677
                s.smu.Lock()
6 only 4737922678.1 and 4737922678.2 ✔
678
                s.env.MergeConnectionState(&reply.CState)
6 only 4737922678.1 and 4737922678.2 ✔
679
                s.smu.Unlock()
6 only 4737922678.1 and 4737922678.2 ✔
680
        }
6 only 4737922678.1 and 4737922678.2 ✔
681

682
        if reply.Broadcasts != nil {
5,700✔
UNCOV
683
                for _, broadcast := range reply.Broadcasts {
×
UNCOV
684
                        n.Broadcast(broadcast)
×
UNCOV
685
                }
×
686
        }
687

688
        if reply.Transmissions != nil {
11,360✔
689
                transmit(s, reply.Transmissions)
5,660✔
690
        }
5,660✔
691

692
        return isDirty
5,700✔
693
}
694

695
func (n *Node) collectStats() {
27✔
696
        statsCollectInterval := time.Duration(n.config.StatsRefreshInterval) * time.Second
27✔
697

27✔
698
        for {
128✔
699
                select {
101✔
700
                case <-n.shutdownCh:
27✔
701
                        return
27✔
702
                case <-time.After(statsCollectInterval):
74 all except 4737922678.1 and 4737922678.2 ✔
703
                        n.collectStatsOnce()
74 all except 4737922678.1 and 4737922678.2 ✔
704
                }
705
        }
706
}
707

708
func (n *Node) collectStatsOnce() {
74 all except 4737922678.1 and 4737922678.2 ✔
709
        n.metrics.GaugeSet(metricsGoroutines, uint64(runtime.NumGoroutine()))
74 all except 4737922678.1 and 4737922678.2 ✔
710

74 all except 4737922678.1 and 4737922678.2 ✔
711
        var m runtime.MemStats
74 all except 4737922678.1 and 4737922678.2 ✔
712
        runtime.ReadMemStats(&m)
74 all except 4737922678.1 and 4737922678.2 ✔
713
        n.metrics.GaugeSet(metricsMemSys, m.Sys)
74 all except 4737922678.1 and 4737922678.2 ✔
714

74 all except 4737922678.1 and 4737922678.2 ✔
715
        n.metrics.GaugeSet(metricsClientsNum, uint64(n.hub.Size()))
74 all except 4737922678.1 and 4737922678.2 ✔
716
        n.metrics.GaugeSet(metricsUniqClientsNum, uint64(n.hub.UniqSize()))
74 all except 4737922678.1 and 4737922678.2 ✔
717
        n.metrics.GaugeSet(metricsStreamsNum, uint64(n.hub.StreamsSize()))
74 all except 4737922678.1 and 4737922678.2 ✔
718
        n.metrics.GaugeSet(metricsDisconnectQueue, uint64(n.disconnector.Size()))
74 all except 4737922678.1 and 4737922678.2 ✔
719
}
74 all except 4737922678.1 and 4737922678.2 ✔
720

721
func (n *Node) registerMetrics() {
67✔
722
        n.metrics.RegisterGauge(metricsGoroutines, "The number of Go routines")
67✔
723
        n.metrics.RegisterGauge(metricsMemSys, "The total bytes of memory obtained from the OS")
67✔
724

67✔
725
        n.metrics.RegisterGauge(metricsClientsNum, "The number of active clients")
67✔
726
        n.metrics.RegisterGauge(metricsUniqClientsNum, "The number of unique clients (with respect to connection identifiers)")
67✔
727
        n.metrics.RegisterGauge(metricsStreamsNum, "The number of active broadcasting streams")
67✔
728
        n.metrics.RegisterGauge(metricsDisconnectQueue, "The size of delayed disconnect")
67✔
729

67✔
730
        n.metrics.RegisterCounter(metricsFailedAuths, "The total number of failed authentication attempts")
67✔
731
        n.metrics.RegisterCounter(metricsReceivedMsg, "The total number of received messages from clients")
67✔
732
        n.metrics.RegisterCounter(metricsFailedCommandReceived, "The total number of unrecognized messages received from clients")
67✔
733
        n.metrics.RegisterCounter(metricsBroadcastMsg, "The total number of messages received through PubSub (for broadcast)")
67✔
734
        n.metrics.RegisterCounter(metricsUnknownBroadcast, "The total number of unrecognized messages received through PubSub")
67✔
735

67✔
736
        n.metrics.RegisterCounter(metricsSentMsg, "The total number of messages sent to clients")
67✔
737
        n.metrics.RegisterCounter(metricsFailedSent, "The total number of messages failed to send to clients")
67✔
738

67✔
739
        n.metrics.RegisterCounter(metricsDataSent, "The total amount of bytes sent to clients")
67✔
740
        n.metrics.RegisterCounter(metricsDataReceived, "The total amount of bytes received from clients")
67✔
741
}
67✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc