• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

topfreegames / pitaya / 5050400886

22 May 2023 09:25PM UTC coverage: 62.185% (+0.05%) from 62.131%
5050400886

Pull #308

github

Reinaldo Oliveira
Adding documentation about the handshake validation process
Pull Request #308: Add handshake validators

84 of 84 new or added lines in 3 files covered. (100.0%)

4769 of 7669 relevant lines covered (62.19%)

0.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.07
/session/session.go
1
// Copyright (c) nano Author and TFG Co. All Rights Reserved.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in all
11
// copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
19
// SOFTWARE.
20

21
package session
22

23
import (
24
        "context"
25
        "encoding/json"
26
        "fmt"
27
        "net"
28
        "reflect"
29
        "sync"
30
        "sync/atomic"
31
        "time"
32

33
        "github.com/golang/protobuf/proto"
34
        nats "github.com/nats-io/nats.go"
35
        "github.com/topfreegames/pitaya/v2/constants"
36
        "github.com/topfreegames/pitaya/v2/logger"
37
        "github.com/topfreegames/pitaya/v2/networkentity"
38
        "github.com/topfreegames/pitaya/v2/protos"
39
)
40

41
type sessionPoolImpl struct {
42
        sessionBindCallbacks []func(ctx context.Context, s Session) error
43
        afterBindCallbacks   []func(ctx context.Context, s Session) error
44
        handshakeValidators  map[string]func(data *HandshakeData) error
45

46
        // SessionCloseCallbacks contains global session close callbacks
47
        SessionCloseCallbacks []func(s Session)
48
        sessionsByUID         sync.Map
49
        sessionsByID          sync.Map
50
        sessionIDSvc          *sessionIDService
51
        // SessionCount keeps the current number of sessions
52
        SessionCount int64
53
}
54

55
// SessionPool centralizes all sessions within a Pitaya app
56
type SessionPool interface {
57
        NewSession(entity networkentity.NetworkEntity, frontend bool, UID ...string) Session
58
        GetSessionCount() int64
59
        GetSessionCloseCallbacks() []func(s Session)
60
        GetSessionByUID(uid string) Session
61
        GetSessionByID(id int64) Session
62
        OnSessionBind(f func(ctx context.Context, s Session) error)
63
        OnAfterSessionBind(f func(ctx context.Context, s Session) error)
64
        OnSessionClose(f func(s Session))
65
        CloseAll()
66
        AddHandshakeValidator(name string, f func(data *HandshakeData) error)
67
}
68

69
// HandshakeClientData represents information about the client sent on the handshake.
70
type HandshakeClientData struct {
71
        Platform    string `json:"platform"`
72
        LibVersion  string `json:"libVersion"`
73
        BuildNumber string `json:"clientBuildNumber"`
74
        Version     string `json:"clientVersion"`
75
}
76

77
// HandshakeData represents information about the handshake sent by the client.
78
// `sys` corresponds to information independent from the app and `user` information
79
// that depends on the app and is customized by the user.
80
type HandshakeData struct {
81
        Sys  HandshakeClientData    `json:"sys"`
82
        User map[string]interface{} `json:"user,omitempty"`
83
}
84

85
type sessionImpl struct {
86
        sync.RWMutex                                              // protect data
87
        id                  int64                                 // session global unique id
88
        uid                 string                                // binding user id
89
        lastTime            int64                                 // last heartbeat time
90
        entity              networkentity.NetworkEntity           // low-level network entity
91
        data                map[string]interface{}                // session data store
92
        handshakeData       *HandshakeData                        // handshake data received by the client
93
        handshakeValidators map[string]func(*HandshakeData) error // validations to run on handshake
94
        encodedData         []byte                                // session data encoded as a byte array
95
        OnCloseCallbacks    []func()                              //onClose callbacks
96
        IsFrontend          bool                                  // if session is a frontend session
97
        frontendID          string                                // the id of the frontend that owns the session
98
        frontendSessionID   int64                                 // the id of the session on the frontend server
99
        Subscriptions       []*nats.Subscription                  // subscription created on bind when using nats rpc server
100
        requestsInFlight    ReqInFlight                           // whether the session is waiting from a response from a remote
101
        pool                *sessionPoolImpl
102
}
103

104
type ReqInFlight struct {
105
        m  map[string]string
106
        mu sync.RWMutex
107
}
108

109
// Session represents a client session, which can store data during the connection.
110
// All data is released when the low-level connection is broken.
111
// Session instance related to the client will be passed to Handler method in the
112
// context parameter.
113
type Session interface {
114
        GetOnCloseCallbacks() []func()
115
        GetIsFrontend() bool
116
        GetSubscriptions() []*nats.Subscription
117
        SetOnCloseCallbacks(callbacks []func())
118
        SetIsFrontend(isFrontend bool)
119
        SetSubscriptions(subscriptions []*nats.Subscription)
120
        HasRequestsInFlight() bool
121
        GetRequestsInFlight() ReqInFlight
122
        SetRequestInFlight(reqID string, reqData string, inFlight bool)
123

124
        Push(route string, v interface{}) error
125
        ResponseMID(ctx context.Context, mid uint, v interface{}, err ...bool) error
126
        ID() int64
127
        UID() string
128
        GetData() map[string]interface{}
129
        SetData(data map[string]interface{}) error
130
        GetDataEncoded() []byte
131
        SetDataEncoded(encodedData []byte) error
132
        SetFrontendData(frontendID string, frontendSessionID int64)
133
        Bind(ctx context.Context, uid string) error
134
        Kick(ctx context.Context) error
135
        OnClose(c func()) error
136
        Close()
137
        RemoteAddr() net.Addr
138
        Remove(key string) error
139
        Set(key string, value interface{}) error
140
        HasKey(key string) bool
141
        Get(key string) interface{}
142
        Int(key string) int
143
        Int8(key string) int8
144
        Int16(key string) int16
145
        Int32(key string) int32
146
        Int64(key string) int64
147
        Uint(key string) uint
148
        Uint8(key string) uint8
149
        Uint16(key string) uint16
150
        Uint32(key string) uint32
151
        Uint64(key string) uint64
152
        Float32(key string) float32
153
        Float64(key string) float64
154
        String(key string) string
155
        Value(key string) interface{}
156
        PushToFront(ctx context.Context) error
157
        Clear()
158
        SetHandshakeData(data *HandshakeData)
159
        GetHandshakeData() *HandshakeData
160
        ValidateHandshake(data *HandshakeData) error
161
        GetHandshakeValidators() map[string]func(data *HandshakeData) error
162
}
163

164
type sessionIDService struct {
165
        sid int64
166
}
167

168
func newSessionIDService() *sessionIDService {
1✔
169
        return &sessionIDService{
1✔
170
                sid: 0,
1✔
171
        }
1✔
172
}
1✔
173

174
// SessionID returns the session id
175
func (c *sessionIDService) sessionID() int64 {
1✔
176
        return atomic.AddInt64(&c.sid, 1)
1✔
177
}
1✔
178

179
// NewSession returns a new session instance
180
// a networkentity.NetworkEntity is a low-level network instance
181
func (pool *sessionPoolImpl) NewSession(entity networkentity.NetworkEntity, frontend bool, UID ...string) Session {
1✔
182
        s := &sessionImpl{
1✔
183
                id:                  pool.sessionIDSvc.sessionID(),
1✔
184
                entity:              entity,
1✔
185
                data:                make(map[string]interface{}),
1✔
186
                handshakeData:       nil,
1✔
187
                handshakeValidators: pool.handshakeValidators,
1✔
188
                lastTime:            time.Now().Unix(),
1✔
189
                OnCloseCallbacks:    []func(){},
1✔
190
                IsFrontend:          frontend,
1✔
191
                pool:                pool,
1✔
192
                requestsInFlight:    ReqInFlight{m: make(map[string]string)},
1✔
193
        }
1✔
194
        if frontend {
2✔
195
                pool.sessionsByID.Store(s.id, s)
1✔
196
                atomic.AddInt64(&pool.SessionCount, 1)
1✔
197
        }
1✔
198
        if len(UID) > 0 {
2✔
199
                s.uid = UID[0]
1✔
200
        }
1✔
201
        return s
1✔
202
}
203

204
// NewSessionPool returns a new session pool instance
205
func NewSessionPool() SessionPool {
1✔
206
        return &sessionPoolImpl{
1✔
207
                sessionBindCallbacks:  make([]func(ctx context.Context, s Session) error, 0),
1✔
208
                afterBindCallbacks:    make([]func(ctx context.Context, s Session) error, 0),
1✔
209
                handshakeValidators:   make(map[string]func(data *HandshakeData) error, 0),
1✔
210
                SessionCloseCallbacks: make([]func(s Session), 0),
1✔
211
                sessionIDSvc:          newSessionIDService(),
1✔
212
        }
1✔
213
}
1✔
214

215
func (pool *sessionPoolImpl) GetSessionCount() int64 {
×
216
        return pool.SessionCount
×
217
}
×
218

219
func (pool *sessionPoolImpl) GetSessionCloseCallbacks() []func(s Session) {
×
220
        return pool.SessionCloseCallbacks
×
221
}
×
222

223
// GetSessionByUID return a session bound to an user id
224
func (pool *sessionPoolImpl) GetSessionByUID(uid string) Session {
1✔
225
        // TODO: Block this operation in backend servers
1✔
226
        if val, ok := pool.sessionsByUID.Load(uid); ok {
2✔
227
                return val.(Session)
1✔
228
        }
1✔
229
        return nil
1✔
230
}
231

232
// GetSessionByID return a session bound to a frontend server id
233
func (pool *sessionPoolImpl) GetSessionByID(id int64) Session {
1✔
234
        // TODO: Block this operation in backend servers
1✔
235
        if val, ok := pool.sessionsByID.Load(id); ok {
2✔
236
                return val.(Session)
1✔
237
        }
1✔
238
        return nil
1✔
239
}
240

241
// OnSessionBind adds a method to be called when a session is bound
242
// same function cannot be added twice!
243
func (pool *sessionPoolImpl) OnSessionBind(f func(ctx context.Context, s Session) error) {
1✔
244
        // Prevents the same function to be added twice in onSessionBind
1✔
245
        sf1 := reflect.ValueOf(f)
1✔
246
        for _, fun := range pool.sessionBindCallbacks {
1✔
247
                sf2 := reflect.ValueOf(fun)
×
248
                if sf1.Pointer() == sf2.Pointer() {
×
249
                        return
×
250
                }
×
251
        }
252
        pool.sessionBindCallbacks = append(pool.sessionBindCallbacks, f)
1✔
253
}
254

255
// OnAfterSessionBind adds a method to be called when session is bound and after all sessionBind callbacks
256
func (pool *sessionPoolImpl) OnAfterSessionBind(f func(ctx context.Context, s Session) error) {
×
257
        // Prevents the same function to be added twice in onSessionBind
×
258
        sf1 := reflect.ValueOf(f)
×
259
        for _, fun := range pool.afterBindCallbacks {
×
260
                sf2 := reflect.ValueOf(fun)
×
261
                if sf1.Pointer() == sf2.Pointer() {
×
262
                        return
×
263
                }
×
264
        }
265
        pool.afterBindCallbacks = append(pool.afterBindCallbacks, f)
×
266
}
267

268
// OnSessionClose adds a method that will be called when every session closes
269
func (pool *sessionPoolImpl) OnSessionClose(f func(s Session)) {
×
270
        sf1 := reflect.ValueOf(f)
×
271
        for _, fun := range pool.SessionCloseCallbacks {
×
272
                sf2 := reflect.ValueOf(fun)
×
273
                if sf1.Pointer() == sf2.Pointer() {
×
274
                        return
×
275
                }
×
276
        }
277
        pool.SessionCloseCallbacks = append(pool.SessionCloseCallbacks, f)
×
278
}
279

280
// CloseAll calls Close on all sessions
281
func (pool *sessionPoolImpl) CloseAll() {
1✔
282
        logger.Log.Infof("closing all sessions, %d sessions", pool.SessionCount)
1✔
283
        for pool.SessionCount > 0 {
2✔
284
                pool.sessionsByID.Range(func(_, value interface{}) bool {
2✔
285
                        s := value.(Session)
1✔
286
                        if s.HasRequestsInFlight() {
1✔
287
                                reqsInFlight := s.GetRequestsInFlight()
×
288
                                reqsInFlight.mu.RLock()
×
289
                                for _, route := range reqsInFlight.m {
×
290
                                        logger.Log.Debugf("Session for user %s is waiting on a response for route %s from a remote server. Delaying session close.", s.UID(), route)
×
291
                                }
×
292
                                reqsInFlight.mu.RUnlock()
×
293
                                return false
×
294
                        } else {
1✔
295
                                s.Close()
1✔
296
                                return true
1✔
297
                        }
1✔
298
                })
299
                logger.Log.Debugf("%d sessions remaining", pool.SessionCount)
1✔
300
                if pool.SessionCount > 0 {
1✔
301
                        time.Sleep(100 * time.Millisecond)
×
302
                }
×
303
        }
304
        logger.Log.Info("finished closing sessions")
1✔
305
}
306

307
// AddHandshakeValidator allows adds validation functions that will run when
308
// handshake packets are processed. Errors will be raised with the given name.
309
func (pool *sessionPoolImpl) AddHandshakeValidator(name string, f func(data *HandshakeData) error) {
1✔
310
        pool.handshakeValidators[name] = f
1✔
311
}
1✔
312

313
func (s *sessionImpl) updateEncodedData() error {
1✔
314
        var b []byte
1✔
315
        b, err := json.Marshal(s.data)
1✔
316
        if err != nil {
1✔
317
                return err
×
318
        }
×
319
        s.encodedData = b
1✔
320
        return nil
1✔
321
}
322

323
// GetOnCloseCallbacks ...
324
func (s *sessionImpl) GetOnCloseCallbacks() []func() {
1✔
325
        return s.OnCloseCallbacks
1✔
326
}
1✔
327

328
// GetIsFrontend ...
329
func (s *sessionImpl) GetIsFrontend() bool {
×
330
        return s.IsFrontend
×
331
}
×
332

333
// GetSubscriptions ...
334
func (s *sessionImpl) GetSubscriptions() []*nats.Subscription {
×
335
        return s.Subscriptions
×
336
}
×
337

338
// SetOnCloseCallbacks ...
339
func (s *sessionImpl) SetOnCloseCallbacks(callbacks []func()) {
×
340
        s.OnCloseCallbacks = callbacks
×
341
}
×
342

343
// SetIsFrontend ...
344
func (s *sessionImpl) SetIsFrontend(isFrontend bool) {
×
345
        s.IsFrontend = isFrontend
×
346
}
×
347

348
// SetSubscriptions ...
349
func (s *sessionImpl) SetSubscriptions(subscriptions []*nats.Subscription) {
1✔
350
        s.Subscriptions = subscriptions
1✔
351
}
1✔
352

353
// Push message to client
354
func (s *sessionImpl) Push(route string, v interface{}) error {
1✔
355
        return s.entity.Push(route, v)
1✔
356
}
1✔
357

358
// ResponseMID responses message to client, mid is
359
// request message ID
360
func (s *sessionImpl) ResponseMID(ctx context.Context, mid uint, v interface{}, err ...bool) error {
1✔
361
        return s.entity.ResponseMID(ctx, mid, v, err...)
1✔
362
}
1✔
363

364
// ID returns the session id
365
func (s *sessionImpl) ID() int64 {
1✔
366
        return s.id
1✔
367
}
1✔
368

369
// UID returns uid that bind to current session
370
func (s *sessionImpl) UID() string {
1✔
371
        return s.uid
1✔
372
}
1✔
373

374
// GetData gets the data
375
func (s *sessionImpl) GetData() map[string]interface{} {
1✔
376
        s.RLock()
1✔
377
        defer s.RUnlock()
1✔
378

1✔
379
        return s.data
1✔
380
}
1✔
381

382
// SetData sets the whole session data
383
func (s *sessionImpl) SetData(data map[string]interface{}) error {
1✔
384
        s.Lock()
1✔
385
        defer s.Unlock()
1✔
386

1✔
387
        s.data = data
1✔
388
        return s.updateEncodedData()
1✔
389
}
1✔
390

391
// GetDataEncoded returns the session data as an encoded value
392
func (s *sessionImpl) GetDataEncoded() []byte {
1✔
393
        return s.encodedData
1✔
394
}
1✔
395

396
// SetDataEncoded sets the whole session data from an encoded value
397
func (s *sessionImpl) SetDataEncoded(encodedData []byte) error {
1✔
398
        if len(encodedData) == 0 {
1✔
399
                return nil
×
400
        }
×
401
        var data map[string]interface{}
1✔
402
        err := json.Unmarshal(encodedData, &data)
1✔
403
        if err != nil {
1✔
404
                return err
×
405
        }
×
406
        return s.SetData(data)
1✔
407
}
408

409
// SetFrontendData sets frontend id and session id
410
func (s *sessionImpl) SetFrontendData(frontendID string, frontendSessionID int64) {
1✔
411
        s.frontendID = frontendID
1✔
412
        s.frontendSessionID = frontendSessionID
1✔
413
}
1✔
414

415
// Bind bind UID to current session
416
func (s *sessionImpl) Bind(ctx context.Context, uid string) error {
1✔
417
        if uid == "" {
2✔
418
                return constants.ErrIllegalUID
1✔
419
        }
1✔
420

421
        if s.UID() != "" {
2✔
422
                return constants.ErrSessionAlreadyBound
1✔
423
        }
1✔
424

425
        s.uid = uid
1✔
426
        for _, cb := range s.pool.sessionBindCallbacks {
2✔
427
                err := cb(ctx, s)
1✔
428
                if err != nil {
2✔
429
                        s.uid = ""
1✔
430
                        return err
1✔
431
                }
1✔
432
        }
433

434
        for _, cb := range s.pool.afterBindCallbacks {
1✔
435
                err := cb(ctx, s)
×
436
                if err != nil {
×
437
                        s.uid = ""
×
438
                        return err
×
439
                }
×
440
        }
441

442
        // if code running on frontend server
443
        if s.IsFrontend {
2✔
444
                s.pool.sessionsByUID.Store(uid, s)
1✔
445
        } else {
2✔
446
                // If frontentID is set this means it is a remote call and the current server
1✔
447
                // is not the frontend server that received the user request
1✔
448
                err := s.bindInFront(ctx)
1✔
449
                if err != nil {
2✔
450
                        logger.Log.Error("error while trying to push session to front: ", err)
1✔
451
                        s.uid = ""
1✔
452
                        return err
1✔
453
                }
1✔
454
        }
455
        return nil
1✔
456
}
457

458
// Kick kicks the user
459
func (s *sessionImpl) Kick(ctx context.Context) error {
1✔
460
        err := s.entity.Kick(ctx)
1✔
461
        if err != nil {
1✔
462
                return err
×
463
        }
×
464
        return s.entity.Close()
1✔
465
}
466

467
// OnClose adds the function it receives to the callbacks that will be called
468
// when the session is closed
469
func (s *sessionImpl) OnClose(c func()) error {
1✔
470
        if !s.IsFrontend {
2✔
471
                return constants.ErrOnCloseBackend
1✔
472
        }
1✔
473
        s.OnCloseCallbacks = append(s.OnCloseCallbacks, c)
1✔
474
        return nil
1✔
475
}
476

477
// Close terminates current session, session related data will not be released,
478
// all related data should be cleared explicitly in Session closed callback
479
func (s *sessionImpl) Close() {
1✔
480
        atomic.AddInt64(&s.pool.SessionCount, -1)
1✔
481
        s.pool.sessionsByID.Delete(s.ID())
1✔
482
        s.pool.sessionsByUID.Delete(s.UID())
1✔
483
        // TODO: this logic should be moved to nats rpc server
1✔
484
        if s.IsFrontend && s.Subscriptions != nil && len(s.Subscriptions) > 0 {
2✔
485
                // if the user is bound to an userid and nats rpc server is being used we need to unsubscribe
1✔
486
                for _, sub := range s.Subscriptions {
2✔
487
                        err := sub.Drain()
1✔
488
                        if err != nil {
1✔
489
                                logger.Log.Errorf("error unsubscribing to user's messages channel: %s, this can cause performance and leak issues", err.Error())
×
490
                        } else {
1✔
491
                                logger.Log.Debugf("successfully unsubscribed to user's %s messages channel", s.UID())
1✔
492
                        }
1✔
493
                }
494
        }
495
        s.entity.Close()
1✔
496
}
497

498
// RemoteAddr returns the remote network address.
499
func (s *sessionImpl) RemoteAddr() net.Addr {
1✔
500
        return s.entity.RemoteAddr()
1✔
501
}
1✔
502

503
// Remove delete data associated with the key from session storage
504
func (s *sessionImpl) Remove(key string) error {
1✔
505
        s.Lock()
1✔
506
        defer s.Unlock()
1✔
507

1✔
508
        delete(s.data, key)
1✔
509
        return s.updateEncodedData()
1✔
510
}
1✔
511

512
// Set associates value with the key in session storage
513
func (s *sessionImpl) Set(key string, value interface{}) error {
1✔
514
        s.Lock()
1✔
515
        defer s.Unlock()
1✔
516

1✔
517
        s.data[key] = value
1✔
518
        return s.updateEncodedData()
1✔
519
}
1✔
520

521
// HasKey decides whether a key has associated value
522
func (s *sessionImpl) HasKey(key string) bool {
1✔
523
        s.RLock()
1✔
524
        defer s.RUnlock()
1✔
525

1✔
526
        _, has := s.data[key]
1✔
527
        return has
1✔
528
}
1✔
529

530
// Get returns a key value
531
func (s *sessionImpl) Get(key string) interface{} {
1✔
532
        s.RLock()
1✔
533
        defer s.RUnlock()
1✔
534

1✔
535
        v, ok := s.data[key]
1✔
536
        if !ok {
2✔
537
                return nil
1✔
538
        }
1✔
539
        return v
1✔
540
}
541

542
// Int returns the value associated with the key as a int.
543
func (s *sessionImpl) Int(key string) int {
1✔
544
        s.RLock()
1✔
545
        defer s.RUnlock()
1✔
546

1✔
547
        v, ok := s.data[key]
1✔
548
        if !ok {
2✔
549
                return 0
1✔
550
        }
1✔
551

552
        value, ok := v.(int)
1✔
553
        if !ok {
2✔
554
                return 0
1✔
555
        }
1✔
556
        return value
1✔
557
}
558

559
// Int8 returns the value associated with the key as a int8.
560
func (s *sessionImpl) Int8(key string) int8 {
1✔
561
        s.RLock()
1✔
562
        defer s.RUnlock()
1✔
563

1✔
564
        v, ok := s.data[key]
1✔
565
        if !ok {
2✔
566
                return 0
1✔
567
        }
1✔
568

569
        value, ok := v.(int8)
1✔
570
        if !ok {
2✔
571
                return 0
1✔
572
        }
1✔
573
        return value
1✔
574
}
575

576
// Int16 returns the value associated with the key as a int16.
577
func (s *sessionImpl) Int16(key string) int16 {
1✔
578
        s.RLock()
1✔
579
        defer s.RUnlock()
1✔
580

1✔
581
        v, ok := s.data[key]
1✔
582
        if !ok {
2✔
583
                return 0
1✔
584
        }
1✔
585

586
        value, ok := v.(int16)
1✔
587
        if !ok {
2✔
588
                return 0
1✔
589
        }
1✔
590
        return value
1✔
591
}
592

593
// Int32 returns the value associated with the key as a int32.
594
func (s *sessionImpl) Int32(key string) int32 {
1✔
595
        s.RLock()
1✔
596
        defer s.RUnlock()
1✔
597

1✔
598
        v, ok := s.data[key]
1✔
599
        if !ok {
2✔
600
                return 0
1✔
601
        }
1✔
602

603
        value, ok := v.(int32)
1✔
604
        if !ok {
2✔
605
                return 0
1✔
606
        }
1✔
607
        return value
1✔
608
}
609

610
// Int64 returns the value associated with the key as a int64.
611
func (s *sessionImpl) Int64(key string) int64 {
1✔
612
        s.RLock()
1✔
613
        defer s.RUnlock()
1✔
614

1✔
615
        v, ok := s.data[key]
1✔
616
        if !ok {
2✔
617
                return 0
1✔
618
        }
1✔
619

620
        value, ok := v.(int64)
1✔
621
        if !ok {
2✔
622
                return 0
1✔
623
        }
1✔
624
        return value
1✔
625
}
626

627
// Uint returns the value associated with the key as a uint.
628
func (s *sessionImpl) Uint(key string) uint {
1✔
629
        s.RLock()
1✔
630
        defer s.RUnlock()
1✔
631

1✔
632
        v, ok := s.data[key]
1✔
633
        if !ok {
2✔
634
                return 0
1✔
635
        }
1✔
636

637
        value, ok := v.(uint)
1✔
638
        if !ok {
2✔
639
                return 0
1✔
640
        }
1✔
641
        return value
1✔
642
}
643

644
// Uint8 returns the value associated with the key as a uint8.
645
func (s *sessionImpl) Uint8(key string) uint8 {
1✔
646
        s.RLock()
1✔
647
        defer s.RUnlock()
1✔
648

1✔
649
        v, ok := s.data[key]
1✔
650
        if !ok {
2✔
651
                return 0
1✔
652
        }
1✔
653

654
        value, ok := v.(uint8)
1✔
655
        if !ok {
2✔
656
                return 0
1✔
657
        }
1✔
658
        return value
1✔
659
}
660

661
// Uint16 returns the value associated with the key as a uint16.
662
func (s *sessionImpl) Uint16(key string) uint16 {
1✔
663
        s.RLock()
1✔
664
        defer s.RUnlock()
1✔
665

1✔
666
        v, ok := s.data[key]
1✔
667
        if !ok {
2✔
668
                return 0
1✔
669
        }
1✔
670

671
        value, ok := v.(uint16)
1✔
672
        if !ok {
2✔
673
                return 0
1✔
674
        }
1✔
675
        return value
1✔
676
}
677

678
// Uint32 returns the value associated with the key as a uint32.
679
func (s *sessionImpl) Uint32(key string) uint32 {
1✔
680
        s.RLock()
1✔
681
        defer s.RUnlock()
1✔
682

1✔
683
        v, ok := s.data[key]
1✔
684
        if !ok {
2✔
685
                return 0
1✔
686
        }
1✔
687

688
        value, ok := v.(uint32)
1✔
689
        if !ok {
2✔
690
                return 0
1✔
691
        }
1✔
692
        return value
1✔
693
}
694

695
// Uint64 returns the value associated with the key as a uint64.
696
func (s *sessionImpl) Uint64(key string) uint64 {
1✔
697
        s.RLock()
1✔
698
        defer s.RUnlock()
1✔
699

1✔
700
        v, ok := s.data[key]
1✔
701
        if !ok {
2✔
702
                return 0
1✔
703
        }
1✔
704

705
        value, ok := v.(uint64)
1✔
706
        if !ok {
2✔
707
                return 0
1✔
708
        }
1✔
709
        return value
1✔
710
}
711

712
// Float32 returns the value associated with the key as a float32.
713
func (s *sessionImpl) Float32(key string) float32 {
1✔
714
        s.RLock()
1✔
715
        defer s.RUnlock()
1✔
716

1✔
717
        v, ok := s.data[key]
1✔
718
        if !ok {
2✔
719
                return 0
1✔
720
        }
1✔
721

722
        value, ok := v.(float32)
1✔
723
        if !ok {
2✔
724
                return 0
1✔
725
        }
1✔
726
        return value
1✔
727
}
728

729
// Float64 returns the value associated with the key as a float64.
730
func (s *sessionImpl) Float64(key string) float64 {
1✔
731
        s.RLock()
1✔
732
        defer s.RUnlock()
1✔
733

1✔
734
        v, ok := s.data[key]
1✔
735
        if !ok {
2✔
736
                return 0
1✔
737
        }
1✔
738

739
        value, ok := v.(float64)
1✔
740
        if !ok {
2✔
741
                return 0
1✔
742
        }
1✔
743
        return value
1✔
744
}
745

746
// String returns the value associated with the key as a string.
747
func (s *sessionImpl) String(key string) string {
1✔
748
        s.RLock()
1✔
749
        defer s.RUnlock()
1✔
750

1✔
751
        v, ok := s.data[key]
1✔
752
        if !ok {
2✔
753
                return ""
1✔
754
        }
1✔
755

756
        value, ok := v.(string)
1✔
757
        if !ok {
2✔
758
                return ""
1✔
759
        }
1✔
760
        return value
1✔
761
}
762

763
// Value returns the value associated with the key as a interface{}.
764
func (s *sessionImpl) Value(key string) interface{} {
1✔
765
        s.RLock()
1✔
766
        defer s.RUnlock()
1✔
767

1✔
768
        return s.data[key]
1✔
769
}
1✔
770

771
func (s *sessionImpl) bindInFront(ctx context.Context) error {
1✔
772
        return s.sendRequestToFront(ctx, constants.SessionBindRoute, false)
1✔
773
}
1✔
774

775
// PushToFront updates the session in the frontend
776
func (s *sessionImpl) PushToFront(ctx context.Context) error {
1✔
777
        if s.IsFrontend {
2✔
778
                return constants.ErrFrontSessionCantPushToFront
1✔
779
        }
1✔
780
        return s.sendRequestToFront(ctx, constants.SessionPushRoute, true)
1✔
781
}
782

783
// Clear releases all data related to current session
784
func (s *sessionImpl) Clear() {
1✔
785
        s.Lock()
1✔
786
        defer s.Unlock()
1✔
787

1✔
788
        s.uid = ""
1✔
789
        s.data = map[string]interface{}{}
1✔
790
        s.updateEncodedData()
1✔
791
}
1✔
792

793
// SetHandshakeData sets the handshake data received by the client.
794
func (s *sessionImpl) SetHandshakeData(data *HandshakeData) {
1✔
795
        s.Lock()
1✔
796
        defer s.Unlock()
1✔
797

1✔
798
        s.handshakeData = data
1✔
799
}
1✔
800

801
// GetHandshakeData gets the handshake data received by the client.
802
func (s *sessionImpl) GetHandshakeData() *HandshakeData {
1✔
803
        return s.handshakeData
1✔
804
}
1✔
805

806
// GetHandshakeValidators return the handshake validators associated with the session.
807
func (s *sessionImpl) GetHandshakeValidators() map[string]func(data *HandshakeData) error {
1✔
808
        return s.handshakeValidators
1✔
809
}
1✔
810

811
func (s *sessionImpl) ValidateHandshake(data *HandshakeData) error {
1✔
812
        for name, fun := range s.handshakeValidators {
2✔
813
                if err := fun(data); err != nil {
2✔
814
                        return fmt.Errorf("failed to run '%s' validator: %w. SessionId=%d", name, err, s.ID())
1✔
815
                }
1✔
816
        }
817

818
        return nil
1✔
819
}
820

821
func (s *sessionImpl) sendRequestToFront(ctx context.Context, route string, includeData bool) error {
1✔
822
        sessionData := &protos.Session{
1✔
823
                Id:  s.frontendSessionID,
1✔
824
                Uid: s.uid,
1✔
825
        }
1✔
826
        if includeData {
2✔
827
                sessionData.Data = s.encodedData
1✔
828
        }
1✔
829
        b, err := proto.Marshal(sessionData)
1✔
830
        if err != nil {
1✔
831
                return err
×
832
        }
×
833
        res, err := s.entity.SendRequest(ctx, s.frontendID, route, b)
1✔
834
        if err != nil {
2✔
835
                return err
1✔
836
        }
1✔
837
        logger.Log.Debugf("%s Got response: %+v", route, res)
1✔
838
        return nil
1✔
839
}
840

841
func (s *sessionImpl) HasRequestsInFlight() bool {
1✔
842
        return len(s.requestsInFlight.m) != 0
1✔
843
}
1✔
844

845
func (s *sessionImpl) GetRequestsInFlight() ReqInFlight {
×
846
        return s.requestsInFlight
×
847
}
×
848

849
func (s *sessionImpl) SetRequestInFlight(reqID string, reqData string, inFlight bool) {
×
850
        s.requestsInFlight.mu.Lock()
×
851
        if inFlight {
×
852
                s.requestsInFlight.m[reqID] = reqData
×
853
        } else {
×
854
                if _, ok := s.requestsInFlight.m[reqID]; ok {
×
855
                        delete(s.requestsInFlight.m, reqID)
×
856
                }
×
857
        }
858
        s.requestsInFlight.mu.Unlock()
×
859
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc