• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

anycable / anycable-go / 4745685227

pending completion
4745685227

push

github

Vladimir Dementyev
docs: add quick start to broker

5021 of 6405 relevant lines covered (78.39%)

7459.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.46
/node/session.go
1
package node
2

3
import (
4
        "encoding/json"
5
        "errors"
6
        "net/url"
7
        "sync"
8
        "time"
9

10
        "github.com/anycable/anycable-go/common"
11
        "github.com/anycable/anycable-go/encoders"
12
        "github.com/anycable/anycable-go/metrics"
13
        "github.com/anycable/anycable-go/ws"
14
        "github.com/apex/log"
15
)
16

17
const (
18
        writeWait = 10 * time.Second
19

20
        prevSessionHeader = "X-ANYCABLE-RESTORE-SID"
21
        prevSessionParam  = "sid"
22
)
23

24
// Executor handles incoming commands (messages)
25
type Executor interface {
26
        HandleCommand(*Session, *common.Message) error
27
        Disconnect(*Session) error
28
}
29

30
type SubscriptionState struct {
31
        channels map[string]map[string]struct{}
32
        mu       sync.RWMutex
33
}
34

35
func NewSubscriptionState() *SubscriptionState {
2,418✔
36
        return &SubscriptionState{channels: make(map[string]map[string]struct{})}
2,418✔
37
}
2,418✔
38

39
func (st *SubscriptionState) HasChannel(id string) bool {
3,396✔
40
        st.mu.RLock()
3,396✔
41
        defer st.mu.RUnlock()
3,396✔
42

3,396✔
43
        _, ok := st.channels[id]
3,396✔
44
        return ok
3,396✔
45
}
3,396✔
46

47
func (st *SubscriptionState) AddChannel(id string) {
2,335✔
48
        st.mu.Lock()
2,335✔
49
        defer st.mu.Unlock()
2,335✔
50

2,335✔
51
        st.channels[id] = make(map[string]struct{})
2,335✔
52
}
2,335✔
53

54
func (st *SubscriptionState) RemoveChannel(id string) {
26 all except 4745685227.3 and 4745685227.10 ✔
55
        st.mu.Lock()
26 all except 4745685227.3 and 4745685227.10 ✔
56
        defer st.mu.Unlock()
26 all except 4745685227.3 and 4745685227.10 ✔
57

26 all except 4745685227.3 and 4745685227.10 ✔
58
        delete(st.channels, id)
26 all except 4745685227.3 and 4745685227.10 ✔
59
}
26 all except 4745685227.3 and 4745685227.10 ✔
60

61
func (st *SubscriptionState) Channels() []string {
1,415✔
62
        st.mu.RLock()
1,415✔
63
        defer st.mu.RUnlock()
1,415✔
64

1,415✔
65
        keys := make([]string, len(st.channels))
1,415✔
66
        i := 0
1,415✔
67

1,415✔
68
        for k := range st.channels {
2,789✔
69
                keys[i] = k
1,374✔
70
                i++
1,374✔
71
        }
1,374✔
72
        return keys
1,415✔
73
}
74

75
func (st *SubscriptionState) ToMap() map[string][]string {
282 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
76
        st.mu.RLock()
282 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
77
        defer st.mu.RUnlock()
282 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
78

282 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
79
        res := make(map[string][]string, len(st.channels))
282 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
80

282 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
81
        for k, v := range st.channels {
437 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
82
                streams := make([]string, len(v))
155 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
83

155 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
84
                i := 0
155 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
85
                for name := range v {
304 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
86
                        streams[i] = name
149 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
87
                        i++
149 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
88
                }
149 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
89

90
                res[k] = streams
155 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
91
        }
92

93
        return res
282 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
94
}
95

96
func (st *SubscriptionState) AddChannelStream(id string, stream string) {
2,299✔
97
        st.mu.Lock()
2,299✔
98
        defer st.mu.Unlock()
2,299✔
99

2,299✔
100
        if _, ok := st.channels[id]; ok {
4,598✔
101
                st.channels[id][stream] = struct{}{}
2,299✔
102
        }
2,299✔
103
}
104

105
func (st *SubscriptionState) RemoveChannelStream(id string, stream string) {
16 all except 4745685227.3 and 4745685227.10 ✔
106
        st.mu.Lock()
16 all except 4745685227.3 and 4745685227.10 ✔
107
        defer st.mu.Unlock()
16 all except 4745685227.3 and 4745685227.10 ✔
108

16 all except 4745685227.3 and 4745685227.10 ✔
109
        if _, ok := st.channels[id]; ok {
32 all except 4745685227.3 and 4745685227.10 ✔
110
                delete(st.channels[id], stream)
16 all except 4745685227.3 and 4745685227.10 ✔
111
        }
16 all except 4745685227.3 and 4745685227.10 ✔
112
}
113

114
func (st *SubscriptionState) RemoveChannelStreams(id string) []string {
38 all except 4745685227.3 and 4745685227.10 ✔
115
        st.mu.Lock()
38 all except 4745685227.3 and 4745685227.10 ✔
116
        defer st.mu.Unlock()
38 all except 4745685227.3 and 4745685227.10 ✔
117

38 all except 4745685227.3 and 4745685227.10 ✔
118
        if streamNames, ok := st.channels[id]; ok {
50 all except 4745685227.3 and 4745685227.10 ✔
119
                st.channels[id] = make(map[string]struct{})
12 all except 4745685227.3 and 4745685227.10 ✔
120

12 all except 4745685227.3 and 4745685227.10 ✔
121
                streams := make([]string, len(streamNames))
12 all except 4745685227.3 and 4745685227.10 ✔
122

12 all except 4745685227.3 and 4745685227.10 ✔
123
                i := 0
12 all except 4745685227.3 and 4745685227.10 ✔
124
                for key := range streamNames {
16 all except 4745685227.3 and 4745685227.10 ✔
125
                        streams[i] = key
4 only 4745685227.2 and 4745685227.1 ✔
126
                        i++
4 only 4745685227.2 and 4745685227.1 ✔
127
                }
4 only 4745685227.2 and 4745685227.1 ✔
128

129
                return streams
12 all except 4745685227.3 and 4745685227.10 ✔
130
        }
131

132
        return nil
26 all except 4745685227.3 and 4745685227.10 ✔
133
}
134

135
func (st *SubscriptionState) StreamsFor(id string) []string {
47 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
136
        st.mu.RLock()
47 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
137
        defer st.mu.RUnlock()
47 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
138

47 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
139
        if streamNames, ok := st.channels[id]; ok {
94 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
140
                streams := make([]string, len(streamNames))
47 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
141

47 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
142
                i := 0
47 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
143
                for key := range streamNames {
110 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
144
                        streams[i] = key
63 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
145
                        i++
63 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
146
                }
63 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
147

148
                return streams
47 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
149
        }
150

151
        return nil
×
152
}
153

154
// Session represents active client
155
type Session struct {
156
        conn          Connection
157
        uid           string
158
        encoder       encoders.Encoder
159
        executor      Executor
160
        metrics       metrics.Instrumenter
161
        env           *common.SessionEnv
162
        subscriptions *SubscriptionState
163
        closed        bool
164

165
        // Main mutex (for read/write and important session updates)
166
        mu sync.Mutex
167
        // Mutex for protocol-related state (env, subscriptions)
168
        smu sync.Mutex
169

170
        sendCh chan *ws.SentFrame
171

172
        pingTimer    *time.Timer
173
        pingInterval time.Duration
174

175
        pingTimestampPrecision string
176

177
        Connected bool
178
        // Could be used to store arbitrary data within a session
179
        InternalState map[string]interface{}
180
        Log           *log.Entry
181
}
182

183
// NewSession build a new Session struct from ws connetion and http request
184
func NewSession(node *Node, conn Connection, url string, headers *map[string]string, uid string) *Session {
2,324 all except 4745685227.2 and 4745685227.1 ✔
185
        session := &Session{
2,324 all except 4745685227.2 and 4745685227.1 ✔
186
                conn:                   conn,
2,324 all except 4745685227.2 and 4745685227.1 ✔
187
                metrics:                node.metrics,
2,324 all except 4745685227.2 and 4745685227.1 ✔
188
                env:                    common.NewSessionEnv(url, headers),
2,324 all except 4745685227.2 and 4745685227.1 ✔
189
                subscriptions:          NewSubscriptionState(),
2,324 all except 4745685227.2 and 4745685227.1 ✔
190
                sendCh:                 make(chan *ws.SentFrame, 256),
2,324 all except 4745685227.2 and 4745685227.1 ✔
191
                closed:                 false,
2,324 all except 4745685227.2 and 4745685227.1 ✔
192
                Connected:              false,
2,324 all except 4745685227.2 and 4745685227.1 ✔
193
                pingInterval:           time.Duration(node.config.PingInterval) * time.Second,
2,324 all except 4745685227.2 and 4745685227.1 ✔
194
                pingTimestampPrecision: node.config.PingTimestampPrecision,
2,324 all except 4745685227.2 and 4745685227.1 ✔
195
                // Use JSON by default
2,324 all except 4745685227.2 and 4745685227.1 ✔
196
                encoder: encoders.JSON{},
2,324 all except 4745685227.2 and 4745685227.1 ✔
197
                // Use Action Cable executor by default (implemented by node)
2,324 all except 4745685227.2 and 4745685227.1 ✔
198
                executor: node,
2,324 all except 4745685227.2 and 4745685227.1 ✔
199
        }
2,324 all except 4745685227.2 and 4745685227.1 ✔
200

2,324 all except 4745685227.2 and 4745685227.1 ✔
201
        session.uid = uid
2,324 all except 4745685227.2 and 4745685227.1 ✔
202

2,324 all except 4745685227.2 and 4745685227.1 ✔
203
        ctx := node.log.WithFields(log.Fields{
2,324 all except 4745685227.2 and 4745685227.1 ✔
204
                "sid": session.uid,
2,324 all except 4745685227.2 and 4745685227.1 ✔
205
        })
2,324 all except 4745685227.2 and 4745685227.1 ✔
206

2,324 all except 4745685227.2 and 4745685227.1 ✔
207
        session.Log = ctx
2,324 all except 4745685227.2 and 4745685227.1 ✔
208

2,324 all except 4745685227.2 and 4745685227.1 ✔
209
        session.addPing()
2,324 all except 4745685227.2 and 4745685227.1 ✔
210
        go session.SendMessages()
2,324 all except 4745685227.2 and 4745685227.1 ✔
211

2,324 all except 4745685227.2 and 4745685227.1 ✔
212
        return session
2,324 all except 4745685227.2 and 4745685227.1 ✔
213
}
2,324 all except 4745685227.2 and 4745685227.1 ✔
214

215
func (s *Session) SetEncoder(enc encoders.Encoder) {
×
216
        s.encoder = enc
×
217
}
×
218

219
func (s *Session) SetExecutor(ex Executor) {
×
220
        s.executor = ex
×
221
}
×
222

223
func (s *Session) SetMetrics(m metrics.Instrumenter) {
×
224
        s.metrics = m
×
225
}
×
226

227
func (s *Session) GetEnv() *common.SessionEnv {
2 only 4745685227.2 and 4745685227.1 ✔
228
        return s.env
2 only 4745685227.2 and 4745685227.1 ✔
229
}
2 only 4745685227.2 and 4745685227.1 ✔
230

231
func (s *Session) SetEnv(env *common.SessionEnv) {
2 only 4745685227.2 and 4745685227.1 ✔
232
        s.env = env
2 only 4745685227.2 and 4745685227.1 ✔
233
}
2 only 4745685227.2 and 4745685227.1 ✔
234

235
func (s *Session) SetIdleTimeout(val time.Duration) {
×
236
        time.AfterFunc(val, s.maybeDisconnectIdle)
×
237
}
×
238

239
func (s *Session) maybeDisconnectIdle() {
×
240
        s.mu.Lock()
×
241

×
242
        if s.Connected {
×
243
                s.mu.Unlock()
×
244
                return
×
245
        }
×
246

247
        s.mu.Unlock()
×
248

×
249
        s.Log.Warnf("Disconnecting idle session")
×
250

×
251
        s.Send(common.NewDisconnectMessage(common.IDLE_TIMEOUT_REASON, false))
×
252
        s.Disconnect("Idle Timeout", ws.CloseNormalClosure)
×
253
}
254

255
func (s *Session) GetID() string {
24,474✔
256
        return s.uid
24,474✔
257
}
24,474✔
258

259
func (s *Session) SetID(id string) {
×
260
        s.uid = id
×
261
}
×
262

263
func (s *Session) GetIdentifiers() string {
9,702✔
264
        return s.env.Identifiers
9,702✔
265
}
9,702✔
266

267
func (s *Session) SetIdentifiers(ids string) {
2,408✔
268
        s.env.Identifiers = ids
2,408✔
269
}
2,408✔
270

271
// Merge connection and channel states into current env.
272
// This method locks the state for writing (so, goroutine-safe)
273
func (s *Session) MergeEnv(env *common.SessionEnv) {
4 only 4745685227.2 and 4745685227.1 ✔
274
        s.smu.Lock()
4 only 4745685227.2 and 4745685227.1 ✔
275
        defer s.smu.Unlock()
4 only 4745685227.2 and 4745685227.1 ✔
276

4 only 4745685227.2 and 4745685227.1 ✔
277
        if env.ConnectionState != nil {
6 only 4745685227.2 and 4745685227.1 ✔
278
                s.env.MergeConnectionState(env.ConnectionState)
2 only 4745685227.2 and 4745685227.1 ✔
279
        }
2 only 4745685227.2 and 4745685227.1 ✔
280

281
        if env.ChannelStates != nil {
6 only 4745685227.2 and 4745685227.1 ✔
282
                states := *env.ChannelStates
2 only 4745685227.2 and 4745685227.1 ✔
283
                for id, state := range states { // #nosec
6 only 4745685227.2 and 4745685227.1 ✔
284
                        s.env.MergeChannelState(id, &state)
4 only 4745685227.2 and 4745685227.1 ✔
285
                }
4 only 4745685227.2 and 4745685227.1 ✔
286
        }
287
}
288

289
// WriteInternalState
290
func (s *Session) WriteInternalState(key string, val interface{}) {
×
291
        s.mu.Lock()
×
292
        defer s.mu.Unlock()
×
293

×
294
        if s.InternalState == nil {
×
295
                s.InternalState = make(map[string]interface{})
×
296
        }
×
297

298
        s.InternalState[key] = val
×
299
}
300

301
// ReadInternalState reads internal state value by key
302
func (s *Session) ReadInternalState(key string) (interface{}, bool) {
×
303
        s.mu.Lock()
×
304
        defer s.mu.Unlock()
×
305

×
306
        if s.InternalState == nil {
×
307
                return nil, false
×
308
        }
×
309

310
        val, ok := s.InternalState[key]
×
311

×
312
        return val, ok
×
313
}
314

315
// Serve enters a loop to read incoming data
316
func (s *Session) Serve(callback func()) error {
2,324 all except 4745685227.2 and 4745685227.1 ✔
317
        go func() {
4,648 all except 4745685227.2 and 4745685227.1 ✔
318
                defer callback()
2,324 all except 4745685227.2 and 4745685227.1 ✔
319

2,324 all except 4745685227.2 and 4745685227.1 ✔
320
                for {
7,985 all except 4745685227.2 and 4745685227.1 ✔
321
                        message, err := s.conn.Read()
5,661 all except 4745685227.2 and 4745685227.1 ✔
322

5,661 all except 4745685227.2 and 4745685227.1 ✔
323
                        if err != nil {
7,985 all except 4745685227.2 and 4745685227.1 ✔
324
                                if ws.IsCloseError(err) {
2,601 all except 4745685227.2 and 4745685227.1 ✔
325
                                        s.Log.Debugf("Websocket closed: %v", err)
277 all except 4745685227.2, 4745685227.3, 4745685227.10, and 4745685227.1 ✔
326
                                        s.disconnectNow("Read closed", ws.CloseNormalClosure)
277 all except 4745685227.2, 4745685227.3, 4745685227.10, and 4745685227.1 ✔
327
                                } else {
2,324 all except 4745685227.2 and 4745685227.1 ✔
328
                                        s.Log.Debugf("Websocket close error: %v", err)
2,047 all except 4745685227.2 and 4745685227.1 ✔
329
                                        s.disconnectNow("Read failed", ws.CloseAbnormalClosure)
2,047 all except 4745685227.2 and 4745685227.1 ✔
330
                                }
2,047 all except 4745685227.2 and 4745685227.1 ✔
331
                                return
2,324 all except 4745685227.2 and 4745685227.1 ✔
332
                        }
333

334
                        err = s.ReadMessage(message)
3,337 all except 4745685227.2 and 4745685227.1 ✔
335

3,337 all except 4745685227.2 and 4745685227.1 ✔
336
                        if err != nil {
3,337 all except 4745685227.2 and 4745685227.1 ✔
337
                                return
×
338
                        }
×
339
                }
340
        }()
341

342
        return nil
2,324 all except 4745685227.2 and 4745685227.1 ✔
343
}
344

345
// SendMessages waits for incoming messages and send them to the client connection
346
func (s *Session) SendMessages() {
2,404✔
347
        defer s.disconnectNow("Write Failed", ws.CloseAbnormalClosure)
2,404✔
348

2,404✔
349
        for message := range s.sendCh {
443,290✔
350
                err := s.writeFrame(message)
440,886✔
351

440,886✔
352
                if err != nil {
440,944✔
353
                        s.metrics.CounterIncrement(metricsFailedSent)
58 all except 4745685227.3 and 4745685227.10 ✔
354
                        return
58 all except 4745685227.3 and 4745685227.10 ✔
355
                }
58 all except 4745685227.3 and 4745685227.10 ✔
356

357
                s.metrics.CounterIncrement(metricsSentMsg)
440,828✔
358
        }
359
}
360

361
// ReadMessage reads messages from ws connection and send them to node
362
func (s *Session) ReadMessage(message []byte) error {
3,337 all except 4745685227.2 and 4745685227.1 ✔
363
        s.metrics.CounterAdd(metricsDataReceived, uint64(len(message)))
3,337 all except 4745685227.2 and 4745685227.1 ✔
364

3,337 all except 4745685227.2 and 4745685227.1 ✔
365
        command, err := s.decodeMessage(message)
3,337 all except 4745685227.2 and 4745685227.1 ✔
366

3,337 all except 4745685227.2 and 4745685227.1 ✔
367
        if err != nil {
3,337 all except 4745685227.2 and 4745685227.1 ✔
368
                s.metrics.CounterIncrement(metricsFailedCommandReceived)
×
369
                return err
×
370
        }
×
371

372
        if command == nil {
3,337 all except 4745685227.2 and 4745685227.1 ✔
373
                return nil
×
374
        }
×
375

376
        s.metrics.CounterIncrement(metricsReceivedMsg)
3,337 all except 4745685227.2 and 4745685227.1 ✔
377

3,337 all except 4745685227.2 and 4745685227.1 ✔
378
        if err := s.executor.HandleCommand(s, command); err != nil {
3,337 all except 4745685227.2 and 4745685227.1 ✔
379
                s.metrics.CounterIncrement(metricsFailedCommandReceived)
×
380
                s.Log.Warnf("Failed to handle incoming message '%s' with error: %v", message, err)
×
381
        }
×
382

383
        return nil
3,337 all except 4745685227.2 and 4745685227.1 ✔
384
}
385

386
// Send schedules a data transmission
387
func (s *Session) Send(msg encoders.EncodedMessage) {
435,048✔
388
        if b, err := s.encodeMessage(msg); err == nil {
870,096✔
389
                if b != nil {
870,096✔
390
                        s.sendFrame(b)
435,048✔
391
                }
435,048✔
392
        } else {
×
393
                s.Log.Warnf("Failed to encode message %v. Error: %v", msg, err)
×
394
        }
×
395
}
396

397
// SendJSONTransmission is used to propagate the direct transmission to the client
398
// (from RPC call result)
399
func (s *Session) SendJSONTransmission(msg string) {
5,678✔
400
        if b, err := s.encodeTransmission(msg); err == nil {
11,356✔
401
                if b != nil {
11,356✔
402
                        s.sendFrame(b)
5,678✔
403
                }
5,678✔
404
        } else {
×
405
                s.Log.Warnf("Failed to encode transmission %v. Error: %v", msg, err)
×
406
        }
×
407
}
408

409
// Disconnect schedules connection disconnect
410
func (s *Session) Disconnect(reason string, code int) {
58 all except 4745685227.3 and 4745685227.10 ✔
411
        s.disconnectFromNode()
58 all except 4745685227.3 and 4745685227.10 ✔
412
        s.sendClose(reason, code)
58 all except 4745685227.3 and 4745685227.10 ✔
413
        s.close()
58 all except 4745685227.3 and 4745685227.10 ✔
414
}
58 all except 4745685227.3 and 4745685227.10 ✔
415

416
func (s *Session) DisconnectWithMessage(msg encoders.EncodedMessage, code string) {
26 all except 4745685227.3 and 4745685227.10 ✔
417
        s.Send(msg)
26 all except 4745685227.3 and 4745685227.10 ✔
418

26 all except 4745685227.3 and 4745685227.10 ✔
419
        reason := ""
26 all except 4745685227.3 and 4745685227.10 ✔
420
        wsCode := ws.CloseNormalClosure
26 all except 4745685227.3 and 4745685227.10 ✔
421

26 all except 4745685227.3 and 4745685227.10 ✔
422
        switch code {
26 all except 4745685227.3 and 4745685227.10 ✔
423
        case common.SERVER_RESTART_REASON:
16 all except 4745685227.3 and 4745685227.10 ✔
424
                reason = "Server restart"
16 all except 4745685227.3 and 4745685227.10 ✔
425
                wsCode = ws.CloseGoingAway
16 all except 4745685227.3 and 4745685227.10 ✔
426
        case common.REMOTE_DISCONNECT_REASON:
10 all except 4745685227.3 and 4745685227.10 ✔
427
                reason = "Closed remotely"
10 all except 4745685227.3 and 4745685227.10 ✔
428
        }
429

430
        s.Disconnect(reason, wsCode)
26 all except 4745685227.3 and 4745685227.10 ✔
431
}
432

433
type cacheEntry struct {
434
        Identifiers     string                       `json:"ids"`
435
        Subscriptions   map[string][]string          `json:"subs"`
436
        ConnectionState map[string]string            `json:"cstate"`
437
        ChannelsState   map[string]map[string]string `json:"istate"`
438
}
439

440
func (s *Session) ToCacheEntry() ([]byte, error) {
282 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
441
        s.smu.Lock()
282 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
442
        defer s.smu.Unlock()
282 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
443

282 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
444
        entry := cacheEntry{
282 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
445
                Identifiers:     s.GetIdentifiers(),
282 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
446
                Subscriptions:   s.subscriptions.ToMap(),
282 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
447
                ConnectionState: *s.env.ConnectionState,
282 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
448
                ChannelsState:   *s.env.ChannelStates,
282 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
449
        }
282 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
450

282 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
451
        return json.Marshal(&entry)
282 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
452
}
282 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
453

454
func (s *Session) RestoreFromCache(cached []byte) error {
13 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
455
        var entry cacheEntry
13 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
456

13 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
457
        err := json.Unmarshal(cached, &entry)
13 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
458

13 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
459
        if err != nil {
13 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
460
                return err
×
461
        }
×
462

463
        s.smu.Lock()
13 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
464
        defer s.smu.Unlock()
13 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
465

13 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
466
        s.SetIdentifiers(entry.Identifiers)
13 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
467
        s.env.MergeConnectionState(&entry.ConnectionState)
13 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
468

13 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
469
        for k := range entry.ChannelsState {
21 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
470
                v := entry.ChannelsState[k]
8 only 4745685227.2 and 4745685227.1 ✔
471
                s.env.MergeChannelState(k, &v)
8 only 4745685227.2 and 4745685227.1 ✔
472
        }
8 only 4745685227.2 and 4745685227.1 ✔
473

474
        for k, v := range entry.Subscriptions {
28 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
475
                s.subscriptions.AddChannel(k)
15 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
476

15 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
477
                for _, stream := range v {
38 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
478
                        s.subscriptions.AddChannelStream(k, stream)
23 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
479
                }
23 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
480
        }
481

482
        return nil
13 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
483
}
484

485
func (s *Session) PrevSid() (psid string) {
2,356✔
486
        if s.env.Headers != nil {
4,700✔
487
                if v, ok := (*s.env.Headers)[prevSessionHeader]; ok {
2,351✔
488
                        psid = v
7 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
489
                        // This header is of one-time use,
7 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
490
                        // no need to leak it to the RPC app
7 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
491
                        delete(*s.env.Headers, prevSessionHeader)
7 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
492
                        return
7 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
493
                }
7 only 4745685227.2, 4745685227.9, 4745685227.12, 4745685227.11, and 4745685227.1 ✔
494
        }
495

496
        u, err := url.Parse(s.env.URL)
2,349✔
497

2,349✔
498
        if err != nil {
2,349✔
499
                return
×
500
        }
×
501

502
        m, err := url.ParseQuery(u.RawQuery)
2,349✔
503

2,349✔
504
        if err != nil {
2,349✔
505
                return
×
506
        }
×
507

508
        if v, ok := m[prevSessionParam]; ok {
2,359✔
509
                psid = v[0]
10 only 4745685227.2 and 4745685227.1 ✔
510
        }
10 only 4745685227.2 and 4745685227.1 ✔
511

512
        return
2,349✔
513
}
514

515
func (s *Session) disconnectFromNode() {
4,724✔
516
        s.mu.Lock()
4,724✔
517
        if s.Connected {
7,036✔
518
                defer s.executor.Disconnect(s) // nolint:errcheck
2,312✔
519
        }
2,312✔
520
        s.Connected = false
4,724✔
521
        s.mu.Unlock()
4,724✔
522
}
523

524
func (s *Session) disconnectNow(reason string, code int) {
4,666✔
525
        s.disconnectFromNode()
4,666✔
526
        s.writeFrame(&ws.SentFrame{ // nolint:errcheck
4,666✔
527
                FrameType:   ws.CloseFrame,
4,666✔
528
                CloseReason: reason,
4,666✔
529
                CloseCode:   code,
4,666✔
530
        })
4,666✔
531

4,666✔
532
        s.mu.Lock()
4,666✔
533
        if s.sendCh != nil {
7,008✔
534
                close(s.sendCh)
2,342✔
535
                s.sendCh = nil
2,342✔
536
        }
2,342✔
537
        s.mu.Unlock()
4,666✔
538

4,666✔
539
        s.close()
4,666✔
540
}
541

542
func (s *Session) close() {
4,724✔
543
        s.mu.Lock()
4,724✔
544

4,724✔
545
        if s.closed {
7,122✔
546
                s.mu.Unlock()
2,398✔
547
                return
2,398✔
548
        }
2,398✔
549

550
        s.closed = true
2,326✔
551
        defer s.mu.Unlock()
2,326✔
552

2,326✔
553
        if s.pingTimer != nil {
4,650✔
554
                s.pingTimer.Stop()
2,324 all except 4745685227.2 and 4745685227.1 ✔
555
        }
2,324 all except 4745685227.2 and 4745685227.1 ✔
556
}
557

558
func (s *Session) sendClose(reason string, code int) {
58 all except 4745685227.3 and 4745685227.10 ✔
559
        s.sendFrame(&ws.SentFrame{
58 all except 4745685227.3 and 4745685227.10 ✔
560
                FrameType:   ws.CloseFrame,
58 all except 4745685227.3 and 4745685227.10 ✔
561
                CloseReason: reason,
58 all except 4745685227.3 and 4745685227.10 ✔
562
                CloseCode:   code,
58 all except 4745685227.3 and 4745685227.10 ✔
563
        })
58 all except 4745685227.3 and 4745685227.10 ✔
564
}
58 all except 4745685227.3 and 4745685227.10 ✔
565

566
func (s *Session) sendFrame(message *ws.SentFrame) {
440,886✔
567
        s.mu.Lock()
440,886✔
568

440,886✔
569
        if s.sendCh == nil {
440,886✔
570
                s.mu.Unlock()
×
571
                return
×
572
        }
×
573

574
        select {
440,886✔
575
        case s.sendCh <- message:
440,886✔
576
        default:
×
577
                if s.sendCh != nil {
×
578
                        close(s.sendCh)
×
579
                        defer s.Disconnect("Write failed", ws.CloseAbnormalClosure)
×
580
                }
×
581

582
                s.sendCh = nil
×
583
        }
584

585
        s.mu.Unlock()
440,886✔
586
}
587

588
func (s *Session) writeFrame(message *ws.SentFrame) error {
445,552✔
589
        return s.writeFrameWithDeadline(message, time.Now().Add(writeWait))
445,552✔
590
}
445,552✔
591

592
func (s *Session) writeFrameWithDeadline(message *ws.SentFrame, deadline time.Time) error {
456,776✔
593
        s.metrics.CounterAdd(metricsDataSent, uint64(len(message.Payload)))
456,776✔
594

456,776✔
595
        switch message.FrameType {
456,776✔
596
        case ws.TextFrame:
452,052✔
597
                s.mu.Lock()
452,052✔
598
                defer s.mu.Unlock()
452,052✔
599

452,052✔
600
                err := s.conn.Write(message.Payload, deadline)
452,052✔
601
                return err
452,052✔
602
        case ws.BinaryFrame:
×
603
                s.mu.Lock()
×
604
                defer s.mu.Unlock()
×
605

×
606
                err := s.conn.WriteBinary(message.Payload, deadline)
×
607

×
608
                return err
×
609
        case ws.CloseFrame:
4,724✔
610
                s.conn.Close(message.CloseCode, message.CloseReason)
4,724✔
611
                return errors.New("Closed")
4,724✔
612
        default:
×
613
                s.Log.Errorf("Unknown frame type: %v", message)
×
614
                return errors.New("Unknown frame type")
×
615
        }
616
}
617

618
func (s *Session) sendPing() {
11,224 all except 4745685227.2 and 4745685227.1 ✔
619
        s.mu.Lock()
11,224 all except 4745685227.2 and 4745685227.1 ✔
620
        if s.closed {
11,224 all except 4745685227.2 and 4745685227.1 ✔
621
                s.mu.Unlock()
×
622
                return
×
623
        }
×
624
        s.mu.Unlock()
11,224 all except 4745685227.2 and 4745685227.1 ✔
625

11,224 all except 4745685227.2 and 4745685227.1 ✔
626
        deadline := time.Now().Add(s.pingInterval / 2)
11,224 all except 4745685227.2 and 4745685227.1 ✔
627

11,224 all except 4745685227.2 and 4745685227.1 ✔
628
        b, err := s.encodeMessage(newPingMessage(s.pingTimestampPrecision))
11,224 all except 4745685227.2 and 4745685227.1 ✔
629

11,224 all except 4745685227.2 and 4745685227.1 ✔
630
        if err != nil {
11,224 all except 4745685227.2 and 4745685227.1 ✔
631
                s.Log.Errorf("Failed to encode ping message: %v", err)
×
632
        } else if b != nil {
22,448 all except 4745685227.2 and 4745685227.1 ✔
633
                err = s.writeFrameWithDeadline(b, deadline)
11,224 all except 4745685227.2 and 4745685227.1 ✔
634
        }
11,224 all except 4745685227.2 and 4745685227.1 ✔
635

636
        if err != nil {
11,224 all except 4745685227.2 and 4745685227.1 ✔
637
                s.Disconnect("Ping failed", ws.CloseAbnormalClosure)
×
638
                return
×
639
        }
×
640

641
        s.addPing()
11,224 all except 4745685227.2 and 4745685227.1 ✔
642
}
643

644
func (s *Session) addPing() {
13,548 all except 4745685227.2 and 4745685227.1 ✔
645
        s.mu.Lock()
13,548 all except 4745685227.2 and 4745685227.1 ✔
646
        defer s.mu.Unlock()
13,548 all except 4745685227.2 and 4745685227.1 ✔
647

13,548 all except 4745685227.2 and 4745685227.1 ✔
648
        s.pingTimer = time.AfterFunc(s.pingInterval, s.sendPing)
13,548 all except 4745685227.2 and 4745685227.1 ✔
649
}
13,548 all except 4745685227.2 and 4745685227.1 ✔
650

651
func newPingMessage(format string) *common.PingMessage {
11,224 all except 4745685227.2 and 4745685227.1 ✔
652
        var ts int64
11,224 all except 4745685227.2 and 4745685227.1 ✔
653

11,224 all except 4745685227.2 and 4745685227.1 ✔
654
        switch format {
11,224 all except 4745685227.2 and 4745685227.1 ✔
655
        case "ns":
×
656
                ts = time.Now().UnixNano()
×
657
        case "ms":
×
658
                ts = time.Now().UnixNano() / int64(time.Millisecond)
×
659
        default:
11,224 all except 4745685227.2 and 4745685227.1 ✔
660
                ts = time.Now().Unix()
11,224 all except 4745685227.2 and 4745685227.1 ✔
661
        }
662

663
        return (&common.PingMessage{Type: "ping", Message: ts})
11,224 all except 4745685227.2 and 4745685227.1 ✔
664
}
665

666
func (s *Session) encodeMessage(msg encoders.EncodedMessage) (*ws.SentFrame, error) {
446,272✔
667
        if cm, ok := msg.(*encoders.CachedEncodedMessage); ok {
881,215✔
668
                return cm.Fetch(
434,943✔
669
                        s.encoder.ID(),
434,943✔
670
                        func(m encoders.EncodedMessage) (*ws.SentFrame, error) {
436,147✔
671
                                return s.encoder.Encode(m)
1,204✔
672
                        })
1,204✔
673
        }
674

675
        return s.encoder.Encode(msg)
11,329✔
676
}
677

678
func (s *Session) encodeTransmission(msg string) (*ws.SentFrame, error) {
5,678✔
679
        return s.encoder.EncodeTransmission(msg)
5,678✔
680
}
5,678✔
681

682
func (s *Session) decodeMessage(raw []byte) (*common.Message, error) {
3,337 all except 4745685227.2 and 4745685227.1 ✔
683
        return s.encoder.Decode(raw)
3,337 all except 4745685227.2 and 4745685227.1 ✔
684
}
3,337 all except 4745685227.2 and 4745685227.1 ✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc