• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

anycable / anycable-go / 8885188650

29 Apr 2024 08:45PM UTC coverage: 79.678% (-0.1%) from 79.824%
8885188650

push

github

palkan
fix(docs): correct http rpc link

7677 of 9635 relevant lines covered (79.68%)

5127.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.11
/node/session.go
1
package node
2

3
import (
4
        "encoding/json"
5
        "errors"
6
        "fmt"
7
        "log/slog"
8
        "math/rand"
9
        "sync"
10
        "time"
11

12
        "github.com/anycable/anycable-go/common"
13
        "github.com/anycable/anycable-go/encoders"
14
        "github.com/anycable/anycable-go/logger"
15
        "github.com/anycable/anycable-go/metrics"
16
        "github.com/anycable/anycable-go/ws"
17
)
18

19
// writeWait is added to the current time to form the write deadline used by writeFrame.
const writeWait = 10 * time.Second
22

23
// Executor handles incoming commands (messages)
24
type Executor interface {
25
        HandleCommand(*Session, *common.Message) error
26
        Disconnect(*Session) error
27
}
28

29
// Session represents active client
30
type Session struct {
31
        conn          Connection
32
        uid           string
33
        encoder       encoders.Encoder
34
        executor      Executor
35
        metrics       metrics.Instrumenter
36
        env           *common.SessionEnv
37
        subscriptions *SubscriptionState
38
        closed        bool
39

40
        // Defines if we should perform Disconnect RPC for this session
41
        disconnectInterest bool
42

43
        // Main mutex (for read/write and important session updates)
44
        mu sync.Mutex
45
        // Mutex for protocol-related state (env, subscriptions)
46
        smu sync.Mutex
47

48
        sendCh chan *ws.SentFrame
49

50
        pingTimer    *time.Timer
51
        pingInterval time.Duration
52

53
        pingTimestampPrecision string
54

55
        handshakeDeadline time.Time
56

57
        pongTimeout time.Duration
58
        pongTimer   *time.Timer
59

60
        resumable bool
61
        prevSid   string
62

63
        Connected bool
64
        // Could be used to store arbitrary data within a session
65
        InternalState map[string]interface{}
66
        Log           *slog.Logger
67
}
68

69
// SessionOption customizes a Session; NewSession applies options after setting defaults.
type SessionOption = func(*Session)
70

71
// WithPingInterval allows to set a custom ping interval for a session
72
// or disable pings at all (by passing 0)
73
func WithPingInterval(interval time.Duration) SessionOption {
2✔
74
        return func(s *Session) {
4✔
75
                s.pingInterval = interval
2✔
76
        }
2✔
77
}
78

79
// WithPingPrecision allows to configure precision for timestamps attached to pings
80
func WithPingPrecision(val string) SessionOption {
1✔
81
        return func(s *Session) {
2✔
82
                s.pingTimestampPrecision = val
1✔
83
        }
1✔
84
}
85

86
// WithEncoder allows to set a custom encoder for a session
87
func WithEncoder(enc encoders.Encoder) SessionOption {
3✔
88
        return func(s *Session) {
6✔
89
                s.encoder = enc
3✔
90
        }
3✔
91
}
92

93
// WithExecutor allows to set a custom executor for a session
94
func WithExecutor(ex Executor) SessionOption {
×
95
        return func(s *Session) {
×
96
                s.executor = ex
×
97
        }
×
98
}
99

100
// WithHandshakeMessageDeadline allows to set a custom deadline for handshake messages.
101
// This option also indicates that we MUST NOT perform Authenticate on connect.
102
func WithHandshakeMessageDeadline(deadline time.Time) SessionOption {
×
103
        return func(s *Session) {
×
104
                s.handshakeDeadline = deadline
×
105
        }
×
106
}
107

108
// WithMetrics allows to set a custom metrics instrumenter for a session
109
func WithMetrics(m metrics.Instrumenter) SessionOption {
×
110
        return func(s *Session) {
×
111
                s.metrics = m
×
112
        }
×
113
}
114

115
// WithResumable allows marking session as resumable (so we store its state in cache)
116
func WithResumable(val bool) SessionOption {
23✔
117
        return func(s *Session) {
46✔
118
                s.resumable = val
23✔
119
        }
23✔
120
}
121

122
// WithPrevSID allows providing the previous session ID to restore from
123
func WithPrevSID(sid string) SessionOption {
13✔
124
        return func(s *Session) {
26✔
125
                s.prevSid = sid
13✔
126
        }
13✔
127
}
128

129
// WithPongTimeout allows to set a custom pong timeout for a session
130
func WithPongTimeout(timeout time.Duration) SessionOption {
1✔
131
        return func(s *Session) {
2✔
132
                s.pongTimeout = timeout
1✔
133
        }
1✔
134
}
135

136
// NewSession build a new Session struct from ws connetion and http request
137
func NewSession(node *Node, conn Connection, url string, headers *map[string]string, uid string, opts ...SessionOption) *Session {
2,428✔
138
        session := &Session{
2,428✔
139
                conn:                   conn,
2,428✔
140
                metrics:                node.metrics,
2,428✔
141
                env:                    common.NewSessionEnv(url, headers),
2,428✔
142
                subscriptions:          NewSubscriptionState(),
2,428✔
143
                sendCh:                 make(chan *ws.SentFrame, 256),
2,428✔
144
                closed:                 false,
2,428✔
145
                Connected:              false,
2,428✔
146
                pingInterval:           time.Duration(node.config.PingInterval) * time.Second,
2,428✔
147
                pingTimestampPrecision: node.config.PingTimestampPrecision,
2,428✔
148
                // Use JSON by default
2,428✔
149
                encoder: encoders.JSON{},
2,428✔
150
                // Use Action Cable executor by default (implemented by node)
2,428✔
151
                executor: node,
2,428✔
152
        }
2,428✔
153

2,428✔
154
        session.uid = uid
2,428✔
155

2,428✔
156
        ctx := node.log.With("sid", session.uid)
2,428✔
157

2,428✔
158
        session.Log = ctx
2,428✔
159

2,428✔
160
        for _, opt := range opts {
2,445✔
161
                opt(session)
17✔
162
        }
17✔
163

164
        if session.pingInterval > 0 {
4,856✔
165
                session.startPing()
2,428✔
166
        }
2,428✔
167

168
        if !session.handshakeDeadline.IsZero() {
2,428✔
169
                val := time.Until(session.handshakeDeadline)
×
170
                time.AfterFunc(val, session.maybeDisconnectIdle)
×
171
        }
×
172

173
        go session.SendMessages()
2,428✔
174

2,428✔
175
        return session
2,428✔
176
}
177

178
func (s *Session) GetEnv() *common.SessionEnv {
13✔
179
        return s.env
13✔
180
}
13✔
181

182
func (s *Session) SetEnv(env *common.SessionEnv) {
2✔
183
        s.env = env
2✔
184
}
2✔
185

186
func (s *Session) UnderlyingConn() Connection {
3✔
187
        return s.conn
3✔
188
}
3✔
189

190
func (s *Session) AuthenticateOnConnect() bool {
2,425✔
191
        return s.handshakeDeadline.IsZero()
2,425✔
192
}
2,425✔
193

194
func (s *Session) IsConnected() bool {
28✔
195
        s.mu.Lock()
28✔
196
        defer s.mu.Unlock()
28✔
197

28✔
198
        return s.Connected
28✔
199
}
28✔
200

201
func (s *Session) IsResumeable() bool {
9,789✔
202
        return s.resumable
9,789✔
203
}
9,789✔
204

205
func (s *Session) maybeDisconnectIdle() {
×
206
        s.mu.Lock()
×
207

×
208
        if s.Connected {
×
209
                s.mu.Unlock()
×
210
                return
×
211
        }
×
212

213
        s.mu.Unlock()
×
214

×
215
        s.Log.Warn("disconnecting idle session")
×
216

×
217
        s.Send(common.NewDisconnectMessage(common.IDLE_TIMEOUT_REASON, false))
×
218
        s.Disconnect("Idle Timeout", ws.CloseNormalClosure)
×
219
}
220

221
func (s *Session) GetID() string {
18,184✔
222
        return s.uid
18,184✔
223
}
18,184✔
224

225
func (s *Session) SetID(id string) {
×
226
        s.uid = id
×
227
}
×
228

229
func (s *Session) GetIdentifiers() string {
10,789✔
230
        return s.env.Identifiers
10,789✔
231
}
10,789✔
232

233
func (s *Session) SetIdentifiers(ids string) {
2,538✔
234
        s.env.Identifiers = ids
2,538✔
235
}
2,538✔
236

237
// Merge connection and channel states into current env.
238
// This method locks the state for writing (so, goroutine-safe)
239
func (s *Session) MergeEnv(env *common.SessionEnv) {
4✔
240
        s.smu.Lock()
4✔
241
        defer s.smu.Unlock()
4✔
242

4✔
243
        if env.ConnectionState != nil {
6✔
244
                s.env.MergeConnectionState(env.ConnectionState)
2✔
245
        }
2✔
246

247
        if env.ChannelStates != nil {
6✔
248
                states := *env.ChannelStates
2✔
249
                for id, state := range states { // #nosec
6✔
250
                        s.env.MergeChannelState(id, &state)
4✔
251
                }
4✔
252
        }
253
}
254

255
// WriteInternalState
256
func (s *Session) WriteInternalState(key string, val interface{}) {
×
257
        s.mu.Lock()
×
258
        defer s.mu.Unlock()
×
259

×
260
        if s.InternalState == nil {
×
261
                s.InternalState = make(map[string]interface{})
×
262
        }
×
263

264
        s.InternalState[key] = val
×
265
}
266

267
// ReadInternalState reads internal state value by key
268
func (s *Session) ReadInternalState(key string) (interface{}, bool) {
×
269
        s.mu.Lock()
×
270
        defer s.mu.Unlock()
×
271

×
272
        if s.InternalState == nil {
×
273
                return nil, false
×
274
        }
×
275

276
        val, ok := s.InternalState[key]
×
277

×
278
        return val, ok
×
279
}
280

281
func (s *Session) IsDisconnectable() bool {
2,436✔
282
        s.mu.Lock()
2,436✔
283
        defer s.mu.Unlock()
2,436✔
284

2,436✔
285
        return s.disconnectInterest
2,436✔
286
}
2,436✔
287

288
func (s *Session) MarkDisconnectable(val bool) {
4,865✔
289
        s.mu.Lock()
4,865✔
290
        defer s.mu.Unlock()
4,865✔
291

4,865✔
292
        s.disconnectInterest = s.disconnectInterest || val
4,865✔
293
}
4,865✔
294

295
// Serve enters a loop to read incoming data
296
func (s *Session) Serve(callback func()) error {
2,425✔
297
        go func() {
4,850✔
298
                defer callback()
2,425✔
299

2,425✔
300
                for {
8,302✔
301
                        if s.IsClosed() {
5,905✔
302
                                return
28✔
303
                        }
28✔
304

305
                        message, err := s.conn.Read()
5,849✔
306

5,849✔
307
                        if err != nil {
8,246✔
308
                                if ws.IsCloseError(err) {
2,748✔
309
                                        s.Log.Debug("WebSocket closed", "error", err)
351✔
310
                                        s.disconnectNow("Read closed", ws.CloseNormalClosure)
351✔
311
                                } else {
2,397✔
312
                                        s.Log.Debug("WebSocket close error", "error", err)
2,046✔
313
                                        s.disconnectNow("Read failed", ws.CloseAbnormalClosure)
2,046✔
314
                                }
2,046✔
315
                                return
2,397✔
316
                        }
317

318
                        err = s.ReadMessage(message)
3,452✔
319

3,452✔
320
                        if err != nil {
3,452✔
321
                                s.Log.Debug("WebSocket read failed", "error", err)
×
322
                                return
×
323
                        }
×
324
                }
325
        }()
326

327
        return nil
2,425✔
328
}
329

330
// SendMessages waits for incoming messages and send them to the client connection
331
func (s *Session) SendMessages() {
2,526✔
332
        for message := range s.sendCh {
465,835✔
333
                err := s.writeFrame(message)
463,309✔
334

463,309✔
335
                if message.FrameType == ws.CloseFrame {
463,389✔
336
                        s.disconnectNow("Close frame sent", ws.CloseNormalClosure)
80✔
337
                        return
80✔
338
                }
80✔
339

340
                if err != nil {
463,229✔
341
                        s.metrics.CounterIncrement(metricsFailedSent)
×
342
                        s.disconnectNow("Write Failed", ws.CloseAbnormalClosure)
×
343
                        return
×
344
                }
×
345

346
                s.metrics.CounterIncrement(metricsSentMsg)
463,229✔
347
        }
348
}
349

350
// ReadMessage reads messages from ws connection and send them to node
351
func (s *Session) ReadMessage(message []byte) error {
3,452✔
352
        s.metrics.CounterAdd(metricsDataReceived, uint64(len(message)))
3,452✔
353

3,452✔
354
        command, err := s.decodeMessage(message)
3,452✔
355

3,452✔
356
        if err != nil {
3,452✔
357
                s.metrics.CounterIncrement(metricsFailedCommandReceived)
×
358
                return err
×
359
        }
×
360

361
        if command == nil {
3,452✔
362
                return nil
×
363
        }
×
364

365
        s.metrics.CounterIncrement(metricsReceivedMsg)
3,452✔
366

3,452✔
367
        if err := s.executor.HandleCommand(s, command); err != nil {
3,452✔
368
                s.metrics.CounterIncrement(metricsFailedCommandReceived)
×
369
                s.Log.Warn("failed to handle incoming message", "data", logger.CompactValue(message), "error", err)
×
370
        }
×
371

372
        return nil
3,452✔
373
}
374

375
// Send schedules a data transmission
376
func (s *Session) Send(msg encoders.EncodedMessage) {
457,220✔
377
        if b, err := s.encodeMessage(msg); err == nil {
914,440✔
378
                if b != nil {
914,440✔
379
                        s.sendFrame(b)
457,220✔
380
                }
457,220✔
381
        } else {
×
382
                s.Log.Warn("failed to encode message", "data", msg, "error", err)
×
383
        }
×
384
}
385

386
// SendJSONTransmission is used to propagate the direct transmission to the client
387
// (from RPC call result)
388
func (s *Session) SendJSONTransmission(msg string) {
5,907✔
389
        if b, err := s.encodeTransmission(msg); err == nil {
11,814✔
390
                if b != nil {
11,814✔
391
                        s.sendFrame(b)
5,907✔
392
                }
5,907✔
393
        } else {
×
394
                s.Log.Warn("failed to encode transmission", "data", logger.CompactValue(msg), "error", err)
×
395
        }
×
396
}
397

398
// Disconnect schedules connection disconnect
399
func (s *Session) Disconnect(reason string, code int) {
83✔
400
        s.sendClose(reason, code)
83✔
401
        s.close()
83✔
402
        s.disconnectFromNode()
83✔
403
}
83✔
404

405
func (s *Session) DisconnectWithMessage(msg encoders.EncodedMessage, code string) {
41✔
406
        s.Send(msg)
41✔
407

41✔
408
        reason := ""
41✔
409
        wsCode := ws.CloseNormalClosure
41✔
410

41✔
411
        switch code {
41✔
412
        case common.SERVER_RESTART_REASON:
30✔
413
                reason = "Server restart"
30✔
414
                wsCode = ws.CloseGoingAway
30✔
415
        case common.REMOTE_DISCONNECT_REASON:
11✔
416
                reason = "Closed remotely"
11✔
417
        }
418

419
        s.Disconnect(reason, wsCode)
41✔
420
}
421

422
// String returns session string representation (for %v in Printf-like functions)
423
func (s *Session) String() string {
2✔
424
        return fmt.Sprintf("Session(%s)", s.uid)
2✔
425
}
2✔
426

427
// cacheEntry is the JSON-serialized snapshot of a session produced by
// ToCacheEntry and consumed by RestoreFromCache. Disconnectable carries no
// JSON tag, so encoding/json stores it under the field name itself.
type cacheEntry struct {
	Identifiers     string                       `json:"ids"`
	Subscriptions   map[string][]string          `json:"subs"`
	ConnectionState map[string]string            `json:"cstate"`
	ChannelsState   map[string]map[string]string `json:"istate"`
	Disconnectable  bool
}
434

435
func (s *Session) ToCacheEntry() ([]byte, error) {
35✔
436
        s.smu.Lock()
35✔
437
        defer s.smu.Unlock()
35✔
438

35✔
439
        entry := cacheEntry{
35✔
440
                Identifiers:     s.GetIdentifiers(),
35✔
441
                Subscriptions:   s.subscriptions.ToMap(),
35✔
442
                ConnectionState: *s.env.ConnectionState,
35✔
443
                ChannelsState:   *s.env.ChannelStates,
35✔
444
                Disconnectable:  s.disconnectInterest,
35✔
445
        }
35✔
446

35✔
447
        return json.Marshal(&entry)
35✔
448
}
35✔
449

450
func (s *Session) RestoreFromCache(cached []byte) error {
13✔
451
        var entry cacheEntry
13✔
452

13✔
453
        err := json.Unmarshal(cached, &entry)
13✔
454

13✔
455
        if err != nil {
13✔
456
                return err
×
457
        }
×
458

459
        s.smu.Lock()
13✔
460
        defer s.smu.Unlock()
13✔
461

13✔
462
        s.MarkDisconnectable(entry.Disconnectable)
13✔
463
        s.SetIdentifiers(entry.Identifiers)
13✔
464
        s.env.MergeConnectionState(&entry.ConnectionState)
13✔
465

13✔
466
        for k := range entry.ChannelsState {
21✔
467
                v := entry.ChannelsState[k]
8✔
468
                s.env.MergeChannelState(k, &v)
8✔
469
        }
8✔
470

471
        for k, v := range entry.Subscriptions {
30✔
472
                s.subscriptions.AddChannel(k)
17✔
473

17✔
474
                for _, stream := range v {
42✔
475
                        s.subscriptions.AddChannelStream(k, stream)
25✔
476
                }
25✔
477
        }
478

479
        return nil
13✔
480
}
481

482
func (s *Session) PrevSid() string {
21✔
483
        return s.prevSid
21✔
484
}
21✔
485

486
func (s *Session) disconnectFromNode() {
2,459✔
487
        s.mu.Lock()
2,459✔
488
        if s.Connected {
4,881✔
489
                defer s.executor.Disconnect(s) // nolint:errcheck
2,422✔
490
        }
2,422✔
491
        s.Connected = false
2,459✔
492
        s.mu.Unlock()
2,459✔
493
}
494

495
func (s *Session) DisconnectNow(reason string, code int) {
1✔
496
        s.disconnectNow(reason, code)
1✔
497
}
1✔
498

499
func (s *Session) disconnectNow(reason string, code int) {
2,478✔
500
        s.mu.Lock()
2,478✔
501
        if !s.Connected {
2,580✔
502
                s.mu.Unlock()
102✔
503
                return
102✔
504
        }
102✔
505
        s.mu.Unlock()
2,376✔
506

2,376✔
507
        s.disconnectFromNode()
2,376✔
508
        s.writeFrame(&ws.SentFrame{ // nolint:errcheck
2,376✔
509
                FrameType:   ws.CloseFrame,
2,376✔
510
                CloseReason: reason,
2,376✔
511
                CloseCode:   code,
2,376✔
512
        })
2,376✔
513

2,376✔
514
        s.mu.Lock()
2,376✔
515
        if s.sendCh != nil {
4,752✔
516
                close(s.sendCh)
2,376✔
517
                s.sendCh = nil
2,376✔
518
        }
2,376✔
519
        s.mu.Unlock()
2,376✔
520

2,376✔
521
        s.close()
2,376✔
522
}
523

524
func (s *Session) close() {
2,459✔
525
        s.mu.Lock()
2,459✔
526

2,459✔
527
        if s.closed {
2,488✔
528
                s.mu.Unlock()
29✔
529
                return
29✔
530
        }
29✔
531

532
        s.closed = true
2,430✔
533
        defer s.mu.Unlock()
2,430✔
534

2,430✔
535
        if s.pingTimer != nil {
4,858✔
536
                s.pingTimer.Stop()
2,428✔
537
        }
2,428✔
538

539
        if s.pongTimer != nil {
2,431✔
540
                s.pongTimer.Stop()
1✔
541
        }
1✔
542
}
543

544
func (s *Session) IsClosed() bool {
5,877✔
545
        s.mu.Lock()
5,877✔
546
        defer s.mu.Unlock()
5,877✔
547

5,877✔
548
        return s.closed
5,877✔
549
}
5,877✔
550

551
func (s *Session) sendClose(reason string, code int) {
83✔
552
        s.sendFrame(&ws.SentFrame{
83✔
553
                FrameType:   ws.CloseFrame,
83✔
554
                CloseReason: reason,
83✔
555
                CloseCode:   code,
83✔
556
        })
83✔
557
}
83✔
558

559
func (s *Session) sendFrame(message *ws.SentFrame) {
463,312✔
560
        s.mu.Lock()
463,312✔
561

463,312✔
562
        if s.sendCh == nil {
463,313✔
563
                s.mu.Unlock()
1✔
564
                return
1✔
565
        }
1✔
566

567
        select {
463,311✔
568
        case s.sendCh <- message:
463,311✔
569
        default:
×
570
                if s.sendCh != nil {
×
571
                        close(s.sendCh)
×
572
                        defer s.Disconnect("Write failed", ws.CloseAbnormalClosure)
×
573
                }
×
574

575
                s.sendCh = nil
×
576
        }
577

578
        s.mu.Unlock()
463,311✔
579
}
580

581
func (s *Session) writeFrame(message *ws.SentFrame) error {
465,685✔
582
        return s.writeFrameWithDeadline(message, time.Now().Add(writeWait))
465,685✔
583
}
465,685✔
584

585
func (s *Session) writeFrameWithDeadline(message *ws.SentFrame, deadline time.Time) error {
475,362✔
586
        s.metrics.CounterAdd(metricsDataSent, uint64(len(message.Payload)))
475,362✔
587

475,362✔
588
        switch message.FrameType {
475,362✔
589
        case ws.TextFrame:
472,906✔
590
                s.mu.Lock()
472,906✔
591
                defer s.mu.Unlock()
472,906✔
592

472,906✔
593
                err := s.conn.Write(message.Payload, deadline)
472,906✔
594
                return err
472,906✔
595
        case ws.BinaryFrame:
×
596
                s.mu.Lock()
×
597
                defer s.mu.Unlock()
×
598

×
599
                err := s.conn.WriteBinary(message.Payload, deadline)
×
600

×
601
                return err
×
602
        case ws.CloseFrame:
2,456✔
603
                s.conn.Close(message.CloseCode, message.CloseReason)
2,456✔
604
                return errors.New("closed")
2,456✔
605
        default:
×
606
                s.Log.Error("unknown frame type", "msg", message)
×
607
                return errors.New("unknown frame type")
×
608
        }
609
}
610

611
func (s *Session) sendPing() {
9,677✔
612
        s.mu.Lock()
9,677✔
613
        if s.closed {
9,677✔
614
                s.mu.Unlock()
×
615
                return
×
616
        }
×
617
        s.mu.Unlock()
9,677✔
618

9,677✔
619
        deadline := time.Now().Add(s.pingInterval / 2)
9,677✔
620

9,677✔
621
        b, err := s.encodeMessage(newPingMessage(s.pingTimestampPrecision))
9,677✔
622

9,677✔
623
        if err != nil {
9,677✔
624
                s.Log.Error("failed to encode ping message", "error", err)
×
625
        } else if b != nil {
19,354✔
626
                err = s.writeFrameWithDeadline(b, deadline)
9,677✔
627
        }
9,677✔
628

629
        if err != nil {
9,677✔
630
                s.Disconnect("Ping failed", ws.CloseAbnormalClosure)
×
631
                return
×
632
        }
×
633

634
        s.addPing()
9,677✔
635
}
636

637
func (s *Session) startPing() {
2,428✔
638
        s.mu.Lock()
2,428✔
639
        defer s.mu.Unlock()
2,428✔
640

2,428✔
641
        // Calculate the minimum and maximum durations
2,428✔
642
        minDuration := s.pingInterval / 2
2,428✔
643
        maxDuration := s.pingInterval * 3 / 2
2,428✔
644

2,428✔
645
        initialInterval := time.Duration(rand.Int63n(int64(maxDuration-minDuration))) + minDuration // nolint:gosec
2,428✔
646

2,428✔
647
        s.pingTimer = time.AfterFunc(initialInterval, s.sendPing)
2,428✔
648

2,428✔
649
        if s.pongTimeout > 0 {
2,429✔
650
                s.pongTimer = time.AfterFunc(s.pongTimeout+initialInterval, s.handleNoPong)
1✔
651
        }
1✔
652
}
653

654
func (s *Session) addPing() {
9,677✔
655
        s.mu.Lock()
9,677✔
656
        defer s.mu.Unlock()
9,677✔
657

9,677✔
658
        s.pingTimer = time.AfterFunc(s.pingInterval, s.sendPing)
9,677✔
659
}
9,677✔
660

661
func newPingMessage(format string) *common.PingMessage {
9,677✔
662
        var ts int64
9,677✔
663

9,677✔
664
        switch format {
9,677✔
665
        case "ns":
2✔
666
                ts = time.Now().UnixNano()
2✔
667
        case "ms":
×
668
                ts = time.Now().UnixNano() / int64(time.Millisecond)
×
669
        default:
9,675✔
670
                ts = time.Now().Unix()
9,675✔
671
        }
672

673
        return (&common.PingMessage{Type: "ping", Message: ts})
9,677✔
674
}
675

676
func (s *Session) handlePong(msg *common.Message) {
1✔
677
        s.mu.Lock()
1✔
678
        defer s.mu.Unlock()
1✔
679

1✔
680
        if s.pongTimer == nil {
1✔
681
                s.Log.Debug("unexpected pong received")
×
682
                return
×
683
        }
×
684

685
        s.pongTimer.Reset(s.pongTimeout)
1✔
686
}
687

688
func (s *Session) handleNoPong() {
1✔
689
        s.mu.Lock()
1✔
690

1✔
691
        if !s.Connected {
1✔
692
                s.mu.Unlock()
×
693
                return
×
694
        }
×
695

696
        s.mu.Unlock()
1✔
697

1✔
698
        s.Log.Warn("disconnecting session due to no pongs")
1✔
699

1✔
700
        s.Send(common.NewDisconnectMessage(common.NO_PONG_REASON, true)) // nolint:errcheck
1✔
701
        s.Disconnect("No Pong", ws.CloseNormalClosure)
1✔
702
}
703

704
func (s *Session) encodeMessage(msg encoders.EncodedMessage) (*ws.SentFrame, error) {
466,897✔
705
        if cm, ok := msg.(*encoders.CachedEncodedMessage); ok {
923,980✔
706
                return cm.Fetch(
457,083✔
707
                        s.encoder.ID(),
457,083✔
708
                        func(m encoders.EncodedMessage) (*ws.SentFrame, error) {
458,351✔
709
                                return s.encoder.Encode(m)
1,268✔
710
                        })
1,268✔
711
        }
712

713
        return s.encoder.Encode(msg)
9,814✔
714
}
715

716
func (s *Session) encodeTransmission(msg string) (*ws.SentFrame, error) {
5,907✔
717
        return s.encoder.EncodeTransmission(msg)
5,907✔
718
}
5,907✔
719

720
func (s *Session) decodeMessage(raw []byte) (*common.Message, error) {
3,452✔
721
        return s.encoder.Decode(raw)
3,452✔
722
}
3,452✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc