• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

topfreegames / pitaya / 5447480684

03 Jul 2023 06:14PM UTC coverage: 62.238% (+0.03%) from 62.204%
5447480684

Pull #313

github

rsafonseca
fix rebase
Pull Request #313: (bugfix): Fix 2 session handling bugs

9 of 9 new or added lines in 1 file covered. (100.0%)

4783 of 7685 relevant lines covered (62.24%)

0.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.88
/session/session.go
1
// Copyright (c) nano Author and TFG Co. All Rights Reserved.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in all
11
// copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
19
// SOFTWARE.
20

21
package session
22

23
import (
24
        "context"
25
        "encoding/json"
26
        "fmt"
27
        "net"
28
        "reflect"
29
        "sync"
30
        "sync/atomic"
31
        "time"
32

33
        "github.com/golang/protobuf/proto"
34
        nats "github.com/nats-io/nats.go"
35
        "github.com/topfreegames/pitaya/v2/constants"
36
        "github.com/topfreegames/pitaya/v2/logger"
37
        "github.com/topfreegames/pitaya/v2/networkentity"
38
        "github.com/topfreegames/pitaya/v2/protos"
39
)
40

41
type sessionPoolImpl struct {
42
        sessionBindCallbacks []func(ctx context.Context, s Session) error
43
        afterBindCallbacks   []func(ctx context.Context, s Session) error
44
        handshakeValidators  map[string]func(data *HandshakeData) error
45

46
        // SessionCloseCallbacks contains global session close callbacks
47
        SessionCloseCallbacks []func(s Session)
48
        sessionsByUID         sync.Map
49
        sessionsByID          sync.Map
50
        sessionIDSvc          *sessionIDService
51
        // SessionCount keeps the current number of sessions
52
        SessionCount int64
53
}
54

55
// SessionPool centralizes all sessions within a Pitaya app
56
type SessionPool interface {
57
        NewSession(entity networkentity.NetworkEntity, frontend bool, UID ...string) Session
58
        GetSessionCount() int64
59
        GetSessionCloseCallbacks() []func(s Session)
60
        GetSessionByUID(uid string) Session
61
        GetSessionByID(id int64) Session
62
        OnSessionBind(f func(ctx context.Context, s Session) error)
63
        OnAfterSessionBind(f func(ctx context.Context, s Session) error)
64
        OnSessionClose(f func(s Session))
65
        CloseAll()
66
        AddHandshakeValidator(name string, f func(data *HandshakeData) error)
67
}
68

69
// HandshakeClientData represents information about the client sent on the handshake.
70
type HandshakeClientData struct {
71
        Platform    string `json:"platform"`
72
        LibVersion  string `json:"libVersion"`
73
        BuildNumber string `json:"clientBuildNumber"`
74
        Version     string `json:"clientVersion"`
75
}
76

77
// HandshakeData represents information about the handshake sent by the client.
78
// `sys` corresponds to information independent from the app and `user` information
79
// that depends on the app and is customized by the user.
80
type HandshakeData struct {
81
        Sys  HandshakeClientData    `json:"sys"`
82
        User map[string]interface{} `json:"user,omitempty"`
83
}
84

85
type sessionImpl struct {
86
        sync.RWMutex                                              // protect data
87
        id                  int64                                 // session global unique id
88
        uid                 string                                // binding user id
89
        lastTime            int64                                 // last heartbeat time
90
        entity              networkentity.NetworkEntity           // low-level network entity
91
        data                map[string]interface{}                // session data store
92
        handshakeData       *HandshakeData                        // handshake data received by the client
93
        handshakeValidators map[string]func(*HandshakeData) error // validations to run on handshake
94
        encodedData         []byte                                // session data encoded as a byte array
95
        OnCloseCallbacks    []func()                              //onClose callbacks
96
        IsFrontend          bool                                  // if session is a frontend session
97
        frontendID          string                                // the id of the frontend that owns the session
98
        frontendSessionID   int64                                 // the id of the session on the frontend server
99
        Subscriptions       []*nats.Subscription                  // subscription created on bind when using nats rpc server
100
        requestsInFlight    ReqInFlight                           // whether the session is waiting from a response from a remote
101
        pool                *sessionPoolImpl
102
}
103

104
type ReqInFlight struct {
105
        m  map[string]string
106
        mu sync.RWMutex
107
}
108

109
// Session represents a client session, which can store data during the connection.
110
// All data is released when the low-level connection is broken.
111
// Session instance related to the client will be passed to Handler method in the
112
// context parameter.
113
type Session interface {
114
        GetOnCloseCallbacks() []func()
115
        GetIsFrontend() bool
116
        GetSubscriptions() []*nats.Subscription
117
        SetOnCloseCallbacks(callbacks []func())
118
        SetIsFrontend(isFrontend bool)
119
        SetSubscriptions(subscriptions []*nats.Subscription)
120
        HasRequestsInFlight() bool
121
        GetRequestsInFlight() ReqInFlight
122
        SetRequestInFlight(reqID string, reqData string, inFlight bool)
123

124
        Push(route string, v interface{}) error
125
        ResponseMID(ctx context.Context, mid uint, v interface{}, err ...bool) error
126
        ID() int64
127
        UID() string
128
        GetData() map[string]interface{}
129
        SetData(data map[string]interface{}) error
130
        GetDataEncoded() []byte
131
        SetDataEncoded(encodedData []byte) error
132
        SetFrontendData(frontendID string, frontendSessionID int64)
133
        Bind(ctx context.Context, uid string) error
134
        Kick(ctx context.Context) error
135
        OnClose(c func()) error
136
        Close()
137
        RemoteAddr() net.Addr
138
        Remove(key string) error
139
        Set(key string, value interface{}) error
140
        HasKey(key string) bool
141
        Get(key string) interface{}
142
        Int(key string) int
143
        Int8(key string) int8
144
        Int16(key string) int16
145
        Int32(key string) int32
146
        Int64(key string) int64
147
        Uint(key string) uint
148
        Uint8(key string) uint8
149
        Uint16(key string) uint16
150
        Uint32(key string) uint32
151
        Uint64(key string) uint64
152
        Float32(key string) float32
153
        Float64(key string) float64
154
        String(key string) string
155
        Value(key string) interface{}
156
        PushToFront(ctx context.Context) error
157
        Clear()
158
        SetHandshakeData(data *HandshakeData)
159
        GetHandshakeData() *HandshakeData
160
        ValidateHandshake(data *HandshakeData) error
161
        GetHandshakeValidators() map[string]func(data *HandshakeData) error
162
}
163

164
type sessionIDService struct {
165
        sid int64
166
}
167

168
func newSessionIDService() *sessionIDService {
1✔
169
        return &sessionIDService{
1✔
170
                sid: 0,
1✔
171
        }
1✔
172
}
1✔
173

174
// SessionID returns the session id
175
func (c *sessionIDService) sessionID() int64 {
1✔
176
        return atomic.AddInt64(&c.sid, 1)
1✔
177
}
1✔
178

179
// NewSession returns a new session instance
180
// a networkentity.NetworkEntity is a low-level network instance
181
func (pool *sessionPoolImpl) NewSession(entity networkentity.NetworkEntity, frontend bool, UID ...string) Session {
1✔
182
        s := &sessionImpl{
1✔
183
                id:                  pool.sessionIDSvc.sessionID(),
1✔
184
                entity:              entity,
1✔
185
                data:                make(map[string]interface{}),
1✔
186
                handshakeData:       nil,
1✔
187
                handshakeValidators: pool.handshakeValidators,
1✔
188
                lastTime:            time.Now().Unix(),
1✔
189
                OnCloseCallbacks:    []func(){},
1✔
190
                IsFrontend:          frontend,
1✔
191
                pool:                pool,
1✔
192
                requestsInFlight:    ReqInFlight{m: make(map[string]string)},
1✔
193
        }
1✔
194
        if frontend {
2✔
195
                pool.sessionsByID.Store(s.id, s)
1✔
196
                atomic.AddInt64(&pool.SessionCount, 1)
1✔
197
        }
1✔
198
        if len(UID) > 0 {
2✔
199
                s.uid = UID[0]
1✔
200
        }
1✔
201
        return s
1✔
202
}
203

204
// NewSessionPool returns a new session pool instance
205
func NewSessionPool() SessionPool {
1✔
206
        return &sessionPoolImpl{
1✔
207
                sessionBindCallbacks:  make([]func(ctx context.Context, s Session) error, 0),
1✔
208
                afterBindCallbacks:    make([]func(ctx context.Context, s Session) error, 0),
1✔
209
                handshakeValidators:   make(map[string]func(data *HandshakeData) error, 0),
1✔
210
                SessionCloseCallbacks: make([]func(s Session), 0),
1✔
211
                sessionIDSvc:          newSessionIDService(),
1✔
212
        }
1✔
213
}
1✔
214

215
func (pool *sessionPoolImpl) GetSessionCount() int64 {
×
216
        return pool.SessionCount
×
217
}
×
218

219
func (pool *sessionPoolImpl) GetSessionCloseCallbacks() []func(s Session) {
×
220
        return pool.SessionCloseCallbacks
×
221
}
×
222

223
// GetSessionByUID return a session bound to an user id
224
func (pool *sessionPoolImpl) GetSessionByUID(uid string) Session {
1✔
225
        // TODO: Block this operation in backend servers
1✔
226
        if val, ok := pool.sessionsByUID.Load(uid); ok {
2✔
227
                return val.(Session)
1✔
228
        }
1✔
229
        return nil
1✔
230
}
231

232
// GetSessionByID return a session bound to a frontend server id
233
func (pool *sessionPoolImpl) GetSessionByID(id int64) Session {
1✔
234
        // TODO: Block this operation in backend servers
1✔
235
        if val, ok := pool.sessionsByID.Load(id); ok {
2✔
236
                return val.(Session)
1✔
237
        }
1✔
238
        return nil
1✔
239
}
240

241
// OnSessionBind adds a method to be called when a session is bound
242
// same function cannot be added twice!
243
func (pool *sessionPoolImpl) OnSessionBind(f func(ctx context.Context, s Session) error) {
1✔
244
        // Prevents the same function to be added twice in onSessionBind
1✔
245
        sf1 := reflect.ValueOf(f)
1✔
246
        for _, fun := range pool.sessionBindCallbacks {
1✔
247
                sf2 := reflect.ValueOf(fun)
×
248
                if sf1.Pointer() == sf2.Pointer() {
×
249
                        return
×
250
                }
×
251
        }
252
        pool.sessionBindCallbacks = append(pool.sessionBindCallbacks, f)
1✔
253
}
254

255
// OnAfterSessionBind adds a method to be called when session is bound and after all sessionBind callbacks
256
func (pool *sessionPoolImpl) OnAfterSessionBind(f func(ctx context.Context, s Session) error) {
×
257
        // Prevents the same function to be added twice in onSessionBind
×
258
        sf1 := reflect.ValueOf(f)
×
259
        for _, fun := range pool.afterBindCallbacks {
×
260
                sf2 := reflect.ValueOf(fun)
×
261
                if sf1.Pointer() == sf2.Pointer() {
×
262
                        return
×
263
                }
×
264
        }
265
        pool.afterBindCallbacks = append(pool.afterBindCallbacks, f)
×
266
}
267

268
// OnSessionClose adds a method that will be called when every session closes
269
func (pool *sessionPoolImpl) OnSessionClose(f func(s Session)) {
×
270
        sf1 := reflect.ValueOf(f)
×
271
        for _, fun := range pool.SessionCloseCallbacks {
×
272
                sf2 := reflect.ValueOf(fun)
×
273
                if sf1.Pointer() == sf2.Pointer() {
×
274
                        return
×
275
                }
×
276
        }
277
        pool.SessionCloseCallbacks = append(pool.SessionCloseCallbacks, f)
×
278
}
279

280
// CloseAll calls Close on all sessions
281
func (pool *sessionPoolImpl) CloseAll() {
1✔
282
        logger.Log.Infof("closing all sessions, %d sessions", pool.SessionCount)
1✔
283
        for pool.SessionCount > 0 {
2✔
284
                pool.sessionsByID.Range(func(_, value interface{}) bool {
2✔
285
                        s := value.(Session)
1✔
286
                        if s.HasRequestsInFlight() {
1✔
287
                                reqsInFlight := s.GetRequestsInFlight()
×
288
                                reqsInFlight.mu.RLock()
×
289
                                for _, route := range reqsInFlight.m {
×
290
                                        logger.Log.Debugf("Session for user %s is waiting on a response for route %s from a remote server. Delaying session close.", s.UID(), route)
×
291
                                }
×
292
                                reqsInFlight.mu.RUnlock()
×
293
                                return false
×
294
                        } else {
1✔
295
                                s.Close()
1✔
296
                                return true
1✔
297
                        }
1✔
298
                })
299
                logger.Log.Debugf("%d sessions remaining", pool.SessionCount)
1✔
300
                if pool.SessionCount > 0 {
1✔
301
                        time.Sleep(100 * time.Millisecond)
×
302
                }
×
303
        }
304
        logger.Log.Info("finished closing sessions")
1✔
305
}
306

307
// AddHandshakeValidator allows adds validation functions that will run when
308
// handshake packets are processed. Errors will be raised with the given name.
309
func (pool *sessionPoolImpl) AddHandshakeValidator(name string, f func(data *HandshakeData) error) {
1✔
310
        pool.handshakeValidators[name] = f
1✔
311
}
1✔
312

313
func (s *sessionImpl) updateEncodedData() error {
1✔
314
        var b []byte
1✔
315
        b, err := json.Marshal(s.data)
1✔
316
        if err != nil {
1✔
317
                return err
×
318
        }
×
319
        s.encodedData = b
1✔
320
        return nil
1✔
321
}
322

323
// GetOnCloseCallbacks ...
324
func (s *sessionImpl) GetOnCloseCallbacks() []func() {
1✔
325
        return s.OnCloseCallbacks
1✔
326
}
1✔
327

328
// GetIsFrontend ...
329
func (s *sessionImpl) GetIsFrontend() bool {
×
330
        return s.IsFrontend
×
331
}
×
332

333
// GetSubscriptions ...
334
func (s *sessionImpl) GetSubscriptions() []*nats.Subscription {
×
335
        return s.Subscriptions
×
336
}
×
337

338
// SetOnCloseCallbacks ...
339
func (s *sessionImpl) SetOnCloseCallbacks(callbacks []func()) {
×
340
        s.OnCloseCallbacks = callbacks
×
341
}
×
342

343
// SetIsFrontend ...
344
func (s *sessionImpl) SetIsFrontend(isFrontend bool) {
×
345
        s.IsFrontend = isFrontend
×
346
}
×
347

348
// SetSubscriptions ...
349
func (s *sessionImpl) SetSubscriptions(subscriptions []*nats.Subscription) {
1✔
350
        s.Subscriptions = subscriptions
1✔
351
}
1✔
352

353
// Push message to client
354
func (s *sessionImpl) Push(route string, v interface{}) error {
1✔
355
        return s.entity.Push(route, v)
1✔
356
}
1✔
357

358
// ResponseMID responses message to client, mid is
359
// request message ID
360
func (s *sessionImpl) ResponseMID(ctx context.Context, mid uint, v interface{}, err ...bool) error {
1✔
361
        return s.entity.ResponseMID(ctx, mid, v, err...)
1✔
362
}
1✔
363

364
// ID returns the session id
365
func (s *sessionImpl) ID() int64 {
1✔
366
        return s.id
1✔
367
}
1✔
368

369
// UID returns uid that bind to current session
370
func (s *sessionImpl) UID() string {
1✔
371
        return s.uid
1✔
372
}
1✔
373

374
// GetData gets the data
375
func (s *sessionImpl) GetData() map[string]interface{} {
1✔
376
        s.RLock()
1✔
377
        defer s.RUnlock()
1✔
378

1✔
379
        return s.data
1✔
380
}
1✔
381

382
// SetData sets the whole session data
383
func (s *sessionImpl) SetData(data map[string]interface{}) error {
1✔
384
        s.Lock()
1✔
385
        defer s.Unlock()
1✔
386

1✔
387
        s.data = data
1✔
388
        return s.updateEncodedData()
1✔
389
}
1✔
390

391
// GetDataEncoded returns the session data as an encoded value
392
func (s *sessionImpl) GetDataEncoded() []byte {
1✔
393
        return s.encodedData
1✔
394
}
1✔
395

396
// SetDataEncoded sets the whole session data from an encoded value
397
func (s *sessionImpl) SetDataEncoded(encodedData []byte) error {
1✔
398
        if len(encodedData) == 0 {
1✔
399
                return nil
×
400
        }
×
401
        var data map[string]interface{}
1✔
402
        err := json.Unmarshal(encodedData, &data)
1✔
403
        if err != nil {
1✔
404
                return err
×
405
        }
×
406
        return s.SetData(data)
1✔
407
}
408

409
// SetFrontendData sets frontend id and session id
410
func (s *sessionImpl) SetFrontendData(frontendID string, frontendSessionID int64) {
1✔
411
        s.frontendID = frontendID
1✔
412
        s.frontendSessionID = frontendSessionID
1✔
413
}
1✔
414

415
// Bind bind UID to current session
416
func (s *sessionImpl) Bind(ctx context.Context, uid string) error {
1✔
417
        if uid == "" {
2✔
418
                return constants.ErrIllegalUID
1✔
419
        }
1✔
420

421
        if s.UID() != "" {
2✔
422
                return constants.ErrSessionAlreadyBound
1✔
423
        }
1✔
424

425
        s.uid = uid
1✔
426
        for _, cb := range s.pool.sessionBindCallbacks {
2✔
427
                err := cb(ctx, s)
1✔
428
                if err != nil {
2✔
429
                        s.uid = ""
1✔
430
                        return err
1✔
431
                }
1✔
432
        }
433

434
        for _, cb := range s.pool.afterBindCallbacks {
1✔
435
                err := cb(ctx, s)
×
436
                if err != nil {
×
437
                        s.uid = ""
×
438
                        return err
×
439
                }
×
440
        }
441

442
        // if code running on frontend server
443
        if s.IsFrontend {
2✔
444
                // If a session with the same UID already exists in this frontend server, close it
1✔
445
                if val, ok := s.pool.sessionsByUID.Load(uid); ok {
1✔
446
                        val.(Session).Close()
×
447
                }
×
448
                s.pool.sessionsByUID.Store(uid, s)
1✔
449
        } else {
1✔
450
                // If frontentID is set this means it is a remote call and the current server
1✔
451
                // is not the frontend server that received the user request
1✔
452
                err := s.bindInFront(ctx)
1✔
453
                if err != nil {
2✔
454
                        logger.Log.Error("error while trying to push session to front: ", err)
1✔
455
                        s.uid = ""
1✔
456
                        return err
1✔
457
                }
1✔
458
        }
459
        return nil
1✔
460
}
461

462
// Kick kicks the user
463
func (s *sessionImpl) Kick(ctx context.Context) error {
1✔
464
        err := s.entity.Kick(ctx)
1✔
465
        if err != nil {
1✔
466
                return err
×
467
        }
×
468
        return s.entity.Close()
1✔
469
}
470

471
// OnClose adds the function it receives to the callbacks that will be called
472
// when the session is closed
473
func (s *sessionImpl) OnClose(c func()) error {
1✔
474
        if !s.IsFrontend {
2✔
475
                return constants.ErrOnCloseBackend
1✔
476
        }
1✔
477
        s.OnCloseCallbacks = append(s.OnCloseCallbacks, c)
1✔
478
        return nil
1✔
479
}
480

481
// Close terminates current session, session related data will not be released,
482
// all related data should be cleared explicitly in Session closed callback
483
func (s *sessionImpl) Close() {
1✔
484
        atomic.AddInt64(&s.pool.SessionCount, -1)
1✔
485
        s.pool.sessionsByID.Delete(s.ID())
1✔
486
        // Only remove session by UID if the session ID matches the one being closed. This avoids problems with removing a valid session after the user has already reconnected before this session's heartbeat times out
1✔
487
        if val, ok := s.pool.sessionsByUID.Load(s.UID()); ok {
2✔
488
                if (val.(Session)).ID() == s.ID() {
2✔
489
                        s.pool.sessionsByUID.Delete(s.UID())
1✔
490
                }
1✔
491
        }
492
        // TODO: this logic should be moved to nats rpc server
493
        if s.IsFrontend && s.Subscriptions != nil && len(s.Subscriptions) > 0 {
2✔
494
                // if the user is bound to an userid and nats rpc server is being used we need to unsubscribe
1✔
495
                for _, sub := range s.Subscriptions {
2✔
496
                        err := sub.Drain()
1✔
497
                        if err != nil {
1✔
498
                                logger.Log.Errorf("error unsubscribing to user's messages channel: %s, this can cause performance and leak issues", err.Error())
×
499
                        } else {
1✔
500
                                logger.Log.Debugf("successfully unsubscribed to user's %s messages channel", s.UID())
1✔
501
                        }
1✔
502
                }
503
        }
504
        s.entity.Close()
1✔
505
}
506

507
// RemoteAddr returns the remote network address.
508
func (s *sessionImpl) RemoteAddr() net.Addr {
1✔
509
        return s.entity.RemoteAddr()
1✔
510
}
1✔
511

512
// Remove delete data associated with the key from session storage
513
func (s *sessionImpl) Remove(key string) error {
1✔
514
        s.Lock()
1✔
515
        defer s.Unlock()
1✔
516

1✔
517
        delete(s.data, key)
1✔
518
        return s.updateEncodedData()
1✔
519
}
1✔
520

521
// Set associates value with the key in session storage
522
func (s *sessionImpl) Set(key string, value interface{}) error {
1✔
523
        s.Lock()
1✔
524
        defer s.Unlock()
1✔
525

1✔
526
        s.data[key] = value
1✔
527
        return s.updateEncodedData()
1✔
528
}
1✔
529

530
// HasKey decides whether a key has associated value
531
func (s *sessionImpl) HasKey(key string) bool {
1✔
532
        s.RLock()
1✔
533
        defer s.RUnlock()
1✔
534

1✔
535
        _, has := s.data[key]
1✔
536
        return has
1✔
537
}
1✔
538

539
// Get returns a key value
540
func (s *sessionImpl) Get(key string) interface{} {
1✔
541
        s.RLock()
1✔
542
        defer s.RUnlock()
1✔
543

1✔
544
        v, ok := s.data[key]
1✔
545
        if !ok {
2✔
546
                return nil
1✔
547
        }
1✔
548
        return v
1✔
549
}
550

551
// Int returns the value associated with the key as a int.
552
func (s *sessionImpl) Int(key string) int {
1✔
553
        s.RLock()
1✔
554
        defer s.RUnlock()
1✔
555

1✔
556
        v, ok := s.data[key]
1✔
557
        if !ok {
2✔
558
                return 0
1✔
559
        }
1✔
560

561
        value, ok := v.(int)
1✔
562
        if !ok {
2✔
563
                return 0
1✔
564
        }
1✔
565
        return value
1✔
566
}
567

568
// Int8 returns the value associated with the key as a int8.
569
func (s *sessionImpl) Int8(key string) int8 {
1✔
570
        s.RLock()
1✔
571
        defer s.RUnlock()
1✔
572

1✔
573
        v, ok := s.data[key]
1✔
574
        if !ok {
2✔
575
                return 0
1✔
576
        }
1✔
577

578
        value, ok := v.(int8)
1✔
579
        if !ok {
2✔
580
                return 0
1✔
581
        }
1✔
582
        return value
1✔
583
}
584

585
// Int16 returns the value associated with the key as a int16.
586
func (s *sessionImpl) Int16(key string) int16 {
1✔
587
        s.RLock()
1✔
588
        defer s.RUnlock()
1✔
589

1✔
590
        v, ok := s.data[key]
1✔
591
        if !ok {
2✔
592
                return 0
1✔
593
        }
1✔
594

595
        value, ok := v.(int16)
1✔
596
        if !ok {
2✔
597
                return 0
1✔
598
        }
1✔
599
        return value
1✔
600
}
601

602
// Int32 returns the value associated with the key as a int32.
603
func (s *sessionImpl) Int32(key string) int32 {
1✔
604
        s.RLock()
1✔
605
        defer s.RUnlock()
1✔
606

1✔
607
        v, ok := s.data[key]
1✔
608
        if !ok {
2✔
609
                return 0
1✔
610
        }
1✔
611

612
        value, ok := v.(int32)
1✔
613
        if !ok {
2✔
614
                return 0
1✔
615
        }
1✔
616
        return value
1✔
617
}
618

619
// Int64 returns the value associated with the key as a int64.
620
func (s *sessionImpl) Int64(key string) int64 {
1✔
621
        s.RLock()
1✔
622
        defer s.RUnlock()
1✔
623

1✔
624
        v, ok := s.data[key]
1✔
625
        if !ok {
2✔
626
                return 0
1✔
627
        }
1✔
628

629
        value, ok := v.(int64)
1✔
630
        if !ok {
2✔
631
                return 0
1✔
632
        }
1✔
633
        return value
1✔
634
}
635

636
// Uint returns the value associated with the key as a uint.
637
func (s *sessionImpl) Uint(key string) uint {
1✔
638
        s.RLock()
1✔
639
        defer s.RUnlock()
1✔
640

1✔
641
        v, ok := s.data[key]
1✔
642
        if !ok {
2✔
643
                return 0
1✔
644
        }
1✔
645

646
        value, ok := v.(uint)
1✔
647
        if !ok {
2✔
648
                return 0
1✔
649
        }
1✔
650
        return value
1✔
651
}
652

653
// Uint8 returns the value associated with the key as a uint8.
654
func (s *sessionImpl) Uint8(key string) uint8 {
1✔
655
        s.RLock()
1✔
656
        defer s.RUnlock()
1✔
657

1✔
658
        v, ok := s.data[key]
1✔
659
        if !ok {
2✔
660
                return 0
1✔
661
        }
1✔
662

663
        value, ok := v.(uint8)
1✔
664
        if !ok {
2✔
665
                return 0
1✔
666
        }
1✔
667
        return value
1✔
668
}
669

670
// Uint16 returns the value associated with the key as a uint16.
671
func (s *sessionImpl) Uint16(key string) uint16 {
1✔
672
        s.RLock()
1✔
673
        defer s.RUnlock()
1✔
674

1✔
675
        v, ok := s.data[key]
1✔
676
        if !ok {
2✔
677
                return 0
1✔
678
        }
1✔
679

680
        value, ok := v.(uint16)
1✔
681
        if !ok {
2✔
682
                return 0
1✔
683
        }
1✔
684
        return value
1✔
685
}
686

687
// Uint32 returns the value associated with the key as a uint32.
688
func (s *sessionImpl) Uint32(key string) uint32 {
1✔
689
        s.RLock()
1✔
690
        defer s.RUnlock()
1✔
691

1✔
692
        v, ok := s.data[key]
1✔
693
        if !ok {
2✔
694
                return 0
1✔
695
        }
1✔
696

697
        value, ok := v.(uint32)
1✔
698
        if !ok {
2✔
699
                return 0
1✔
700
        }
1✔
701
        return value
1✔
702
}
703

704
// Uint64 returns the value associated with the key as a uint64.
705
func (s *sessionImpl) Uint64(key string) uint64 {
1✔
706
        s.RLock()
1✔
707
        defer s.RUnlock()
1✔
708

1✔
709
        v, ok := s.data[key]
1✔
710
        if !ok {
2✔
711
                return 0
1✔
712
        }
1✔
713

714
        value, ok := v.(uint64)
1✔
715
        if !ok {
2✔
716
                return 0
1✔
717
        }
1✔
718
        return value
1✔
719
}
720

721
// Float32 returns the value associated with the key as a float32.
722
func (s *sessionImpl) Float32(key string) float32 {
1✔
723
        s.RLock()
1✔
724
        defer s.RUnlock()
1✔
725

1✔
726
        v, ok := s.data[key]
1✔
727
        if !ok {
2✔
728
                return 0
1✔
729
        }
1✔
730

731
        value, ok := v.(float32)
1✔
732
        if !ok {
2✔
733
                return 0
1✔
734
        }
1✔
735
        return value
1✔
736
}
737

738
// Float64 returns the value associated with the key as a float64.
739
func (s *sessionImpl) Float64(key string) float64 {
1✔
740
        s.RLock()
1✔
741
        defer s.RUnlock()
1✔
742

1✔
743
        v, ok := s.data[key]
1✔
744
        if !ok {
2✔
745
                return 0
1✔
746
        }
1✔
747

748
        value, ok := v.(float64)
1✔
749
        if !ok {
2✔
750
                return 0
1✔
751
        }
1✔
752
        return value
1✔
753
}
754

755
// String returns the value associated with the key as a string.
756
func (s *sessionImpl) String(key string) string {
1✔
757
        s.RLock()
1✔
758
        defer s.RUnlock()
1✔
759

1✔
760
        v, ok := s.data[key]
1✔
761
        if !ok {
2✔
762
                return ""
1✔
763
        }
1✔
764

765
        value, ok := v.(string)
1✔
766
        if !ok {
2✔
767
                return ""
1✔
768
        }
1✔
769
        return value
1✔
770
}
771

772
// Value returns the value associated with the key as a interface{}.
773
func (s *sessionImpl) Value(key string) interface{} {
1✔
774
        s.RLock()
1✔
775
        defer s.RUnlock()
1✔
776

1✔
777
        return s.data[key]
1✔
778
}
1✔
779

780
func (s *sessionImpl) bindInFront(ctx context.Context) error {
1✔
781
        return s.sendRequestToFront(ctx, constants.SessionBindRoute, false)
1✔
782
}
1✔
783

784
// PushToFront updates the session in the frontend
785
func (s *sessionImpl) PushToFront(ctx context.Context) error {
1✔
786
        if s.IsFrontend {
2✔
787
                return constants.ErrFrontSessionCantPushToFront
1✔
788
        }
1✔
789
        return s.sendRequestToFront(ctx, constants.SessionPushRoute, true)
1✔
790
}
791

792
// Clear releases all data related to current session
793
func (s *sessionImpl) Clear() {
1✔
794
        s.Lock()
1✔
795
        defer s.Unlock()
1✔
796

1✔
797
        s.uid = ""
1✔
798
        s.data = map[string]interface{}{}
1✔
799
        s.updateEncodedData()
1✔
800
}
1✔
801

802
// SetHandshakeData sets the handshake data received by the client.
803
func (s *sessionImpl) SetHandshakeData(data *HandshakeData) {
1✔
804
        s.Lock()
1✔
805
        defer s.Unlock()
1✔
806

1✔
807
        s.handshakeData = data
1✔
808
}
1✔
809

810
// GetHandshakeData gets the handshake data received by the client.
811
func (s *sessionImpl) GetHandshakeData() *HandshakeData {
1✔
812
        return s.handshakeData
1✔
813
}
1✔
814

815
// GetHandshakeValidators return the handshake validators associated with the session.
816
func (s *sessionImpl) GetHandshakeValidators() map[string]func(data *HandshakeData) error {
1✔
817
        return s.handshakeValidators
1✔
818
}
1✔
819

820
func (s *sessionImpl) ValidateHandshake(data *HandshakeData) error {
1✔
821
        for name, fun := range s.handshakeValidators {
2✔
822
                if err := fun(data); err != nil {
2✔
823
                        return fmt.Errorf("failed to run '%s' validator: %w. SessionId=%d", name, err, s.ID())
1✔
824
                }
1✔
825
        }
826

827
        return nil
1✔
828
}
829

830
func (s *sessionImpl) sendRequestToFront(ctx context.Context, route string, includeData bool) error {
1✔
831
        sessionData := &protos.Session{
1✔
832
                Id:  s.frontendSessionID,
1✔
833
                Uid: s.uid,
1✔
834
        }
1✔
835
        if includeData {
2✔
836
                sessionData.Data = s.encodedData
1✔
837
        }
1✔
838
        b, err := proto.Marshal(sessionData)
1✔
839
        if err != nil {
1✔
840
                return err
×
841
        }
×
842
        res, err := s.entity.SendRequest(ctx, s.frontendID, route, b)
1✔
843
        if err != nil {
2✔
844
                return err
1✔
845
        }
1✔
846
        logger.Log.Debugf("%s Got response: %+v", route, res)
1✔
847
        return nil
1✔
848
}
849

850
func (s *sessionImpl) HasRequestsInFlight() bool {
1✔
851
        return len(s.requestsInFlight.m) != 0
1✔
852
}
1✔
853

854
func (s *sessionImpl) GetRequestsInFlight() ReqInFlight {
×
855
        return s.requestsInFlight
×
856
}
×
857

858
func (s *sessionImpl) SetRequestInFlight(reqID string, reqData string, inFlight bool) {
×
859
        s.requestsInFlight.mu.Lock()
×
860
        if inFlight {
×
861
                s.requestsInFlight.m[reqID] = reqData
×
862
        } else {
×
863
                if _, ok := s.requestsInFlight.m[reqID]; ok {
×
864
                        delete(s.requestsInFlight.m, reqID)
×
865
                }
×
866
        }
867
        s.requestsInFlight.mu.Unlock()
×
868
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc