• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

topfreegames / pitaya / 15070819794

16 May 2025 01:13PM UTC coverage: 62.699% (+0.5%) from 62.235%
15070819794

Pull #452

github

Felippe Durán
Add e2e test to validate unique session doesn't fail if kick fails
Pull Request #452: Draft: Fix unique session kick failure preventing clients to connect

78 of 83 new or added lines in 4 files covered. (93.98%)

20 existing lines in 3 files now uncovered.

5152 of 8217 relevant lines covered (62.7%)

0.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.11
/service/handler.go
1
// Copyright (c) nano Author and TFG Co. All Rights Reserved.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in all
11
// copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
19
// SOFTWARE.
20

21
package service
22

23
import (
24
        "context"
25
        "encoding/json"
26
        "errors"
27
        "fmt"
28
        "net"
29
        "strings"
30
        "time"
31

32
        "github.com/nats-io/nuid"
33

34
        "github.com/topfreegames/pitaya/v2/acceptor"
35
        "github.com/topfreegames/pitaya/v2/pipeline"
36

37
        opentracing "github.com/opentracing/opentracing-go"
38
        "github.com/topfreegames/pitaya/v2/agent"
39
        "github.com/topfreegames/pitaya/v2/cluster"
40
        "github.com/topfreegames/pitaya/v2/component"
41
        "github.com/topfreegames/pitaya/v2/conn/codec"
42
        "github.com/topfreegames/pitaya/v2/conn/message"
43
        "github.com/topfreegames/pitaya/v2/conn/packet"
44
        "github.com/topfreegames/pitaya/v2/constants"
45
        pcontext "github.com/topfreegames/pitaya/v2/context"
46
        "github.com/topfreegames/pitaya/v2/docgenerator"
47
        e "github.com/topfreegames/pitaya/v2/errors"
48
        "github.com/topfreegames/pitaya/v2/logger"
49
        "github.com/topfreegames/pitaya/v2/metrics"
50
        "github.com/topfreegames/pitaya/v2/route"
51
        "github.com/topfreegames/pitaya/v2/serialize"
52
        "github.com/topfreegames/pitaya/v2/session"
53
        "github.com/topfreegames/pitaya/v2/timer"
54
        "github.com/topfreegames/pitaya/v2/tracing"
55
)
56

57
// handlerType is the type label this file passes to
// metrics.ReportTimingFromCtx when reporting handler timings.
var handlerType = "handler"
60

61
type (
62
        // HandlerService service
63
        HandlerService struct {
64
                baseService
65
                chLocalProcess   chan unhandledMessage // channel of messages that will be processed locally
66
                chRemoteProcess  chan unhandledMessage // channel of messages that will be processed remotely
67
                decoder          codec.PacketDecoder   // binary decoder
68
                remoteService    *RemoteService
69
                serializer       serialize.Serializer          // message serializer
70
                server           *cluster.Server               // server obj
71
                services         map[string]*component.Service // all registered service
72
                metricsReporters []metrics.Reporter
73
                agentFactory     agent.AgentFactory
74
                handlerPool      *HandlerPool
75
                handlers         map[string]*component.Handler // all handler method
76
        }
77

78
        unhandledMessage struct {
79
                ctx   context.Context
80
                agent agent.Agent
81
                route *route.Route
82
                msg   *message.Message
83
        }
84
)
85

86
// NewHandlerService creates and returns a new handler service
87
func NewHandlerService(
88
        packetDecoder codec.PacketDecoder,
89
        serializer serialize.Serializer,
90
        localProcessBufferSize int,
91
        remoteProcessBufferSize int,
92
        server *cluster.Server,
93
        remoteService *RemoteService,
94
        agentFactory agent.AgentFactory,
95
        metricsReporters []metrics.Reporter,
96
        handlerHooks *pipeline.HandlerHooks,
97
        handlerPool *HandlerPool,
98
) *HandlerService {
1✔
99
        h := &HandlerService{
1✔
100
                services:         make(map[string]*component.Service),
1✔
101
                chLocalProcess:   make(chan unhandledMessage, localProcessBufferSize),
1✔
102
                chRemoteProcess:  make(chan unhandledMessage, remoteProcessBufferSize),
1✔
103
                decoder:          packetDecoder,
1✔
104
                serializer:       serializer,
1✔
105
                server:           server,
1✔
106
                remoteService:    remoteService,
1✔
107
                agentFactory:     agentFactory,
1✔
108
                metricsReporters: metricsReporters,
1✔
109
                handlerPool:      handlerPool,
1✔
110
                handlers:         make(map[string]*component.Handler),
1✔
111
        }
1✔
112

1✔
113
        h.handlerHooks = handlerHooks
1✔
114

1✔
115
        return h
1✔
116
}
1✔
117

118
// Dispatch message to corresponding logic handler
119
func (h *HandlerService) Dispatch(thread int) {
×
120
        // TODO: This timer is being stopped multiple times, it probably doesn't need to be stopped here
×
121
        defer timer.GlobalTicker.Stop()
×
122

×
123
        for {
×
124
                // Calls to remote servers block calls to local server
×
125
                select {
×
126
                case lm := <-h.chLocalProcess:
×
127
                        metrics.ReportMessageProcessDelayFromCtx(lm.ctx, h.metricsReporters, "local")
×
128
                        h.localProcess(lm.ctx, lm.agent, lm.route, lm.msg)
×
129

130
                case rm := <-h.chRemoteProcess:
×
131
                        metrics.ReportMessageProcessDelayFromCtx(rm.ctx, h.metricsReporters, "remote")
×
132
                        h.remoteService.remoteProcess(rm.ctx, nil, rm.agent, rm.route, rm.msg)
×
133

134
                case <-timer.GlobalTicker.C: // execute cron task
×
135
                        timer.Cron()
×
136

137
                case t := <-timer.Manager.ChCreatedTimer: // new Timers
×
138
                        timer.AddTimer(t)
×
139

140
                case id := <-timer.Manager.ChClosingTimer: // closing Timers
×
141
                        timer.RemoveTimer(id)
×
142
                }
143
        }
144
}
145

146
// Register registers components
147
func (h *HandlerService) Register(comp component.Component, opts []component.Option) error {
1✔
148
        s := component.NewService(comp, opts)
1✔
149

1✔
150
        if _, ok := h.services[s.Name]; ok {
2✔
151
                return fmt.Errorf("handler: service already defined: %s", s.Name)
1✔
152
        }
1✔
153

154
        if err := s.ExtractHandler(); err != nil {
2✔
155
                return err
1✔
156
        }
1✔
157

158
        // register all handlers
159
        h.services[s.Name] = s
1✔
160
        for name, handler := range s.Handlers {
2✔
161
                h.handlerPool.Register(s.Name, name, handler)
1✔
162
        }
1✔
163
        return nil
1✔
164
}
165

166
// Handle handles messages from a conn
167
func (h *HandlerService) Handle(conn acceptor.PlayerConn) {
1✔
168
        // create a client agent and startup write goroutine
1✔
169
        a := h.agentFactory.CreateAgent(conn)
1✔
170

1✔
171
        // startup agent goroutine
1✔
172
        go a.Handle()
1✔
173

1✔
174
        logger := logger.Log.WithFields(map[string]interface{}{
1✔
175
                "session_id": a.GetSession().ID(),
1✔
176
                "uid":        a.GetSession().UID(),
1✔
177
                "remote":     a.RemoteAddr().String(),
1✔
178
        })
1✔
179

1✔
180
        logger.Debugf("New session established")
1✔
181

1✔
182
        // guarantee agent related resource is destroyed
1✔
183
        defer func() {
2✔
184
                a.GetSession().Close()
1✔
185
                logger.Debugf("Session read goroutine exit")
1✔
186
        }()
1✔
187

188
        for {
2✔
189
                msg, err := conn.GetNextMessage()
1✔
190

1✔
191
                if err != nil {
2✔
192
                        // Check if this is an expected error due to connection being closed
1✔
193
                        if errors.Is(err, net.ErrClosed) || err == constants.ErrConnectionClosed {
1✔
NEW
194
                                logger.WithError(err).Debug("Connection no longer available while reading next available message")
×
195
                        } else {
1✔
196
                                // Differentiate errors for valid sessions, to avoid noise from load balancer healthchecks and other internet noise
1✔
197
                                if a.GetStatus() != constants.StatusStart {
2✔
198
                                        logger.WithError(err).Error("Error reading next available message")
1✔
199
                                } else {
1✔
NEW
200
                                        logger.WithError(err).Debug("Error reading next available message on initial connection")
×
201
                                }
×
202
                        }
203

204
                        return
1✔
205
                }
206

207
                packets, err := h.decoder.Decode(msg)
1✔
208
                if err != nil {
1✔
NEW
209
                        logger.WithError(err).Errorf("Failed to decode message")
×
210
                        return
×
211
                }
×
212

213
                if len(packets) < 1 {
1✔
NEW
214
                        logger.WithField("data", msg).Warnf("Read no packets")
×
215
                        continue
×
216
                }
217

218
                // process all packet
219
                for i := range packets {
2✔
220
                        if err := h.processPacket(a, packets[i]); err != nil {
1✔
NEW
221
                                logger.WithError(err).Errorf("Failed to process packet")
×
222
                                return
×
223
                        }
×
224
                }
225
        }
226
}
227

228
func (h *HandlerService) processPacket(a agent.Agent, p *packet.Packet) error {
1✔
229
        switch p.Type {
1✔
230
        case packet.Handshake:
1✔
231
                logger.Log.Debug("Received handshake packet")
1✔
232

1✔
233
                // Parse the json sent with the handshake by the client
1✔
234
                handshakeData := &session.HandshakeData{}
1✔
235
                if err := json.Unmarshal(p.Data, handshakeData); err != nil {
2✔
236
                        defer a.Close()
1✔
237
                        logger.Log.Errorf("Failed to unmarshal handshake data: %s", err.Error())
1✔
238
                        if serr := a.SendHandshakeErrorResponse(); serr != nil {
1✔
239
                                logger.Log.Errorf("Error sending handshake error response: %s", err.Error())
×
240
                                return err
×
241
                        }
×
242

243
                        return fmt.Errorf("invalid handshake data. Id=%d", a.GetSession().ID())
1✔
244
                }
245

246
                if err := a.GetSession().ValidateHandshake(handshakeData); err != nil {
2✔
247
                        defer a.Close()
1✔
248
                        logger.Log.Errorf("Handshake validation failed: %s", err.Error())
1✔
249
                        if serr := a.SendHandshakeErrorResponse(); serr != nil {
1✔
250
                                logger.Log.Errorf("Error sending handshake error response: %s", err.Error())
×
251
                                return err
×
252
                        }
×
253

254
                        return fmt.Errorf("handshake validation failed: %w. SessionId=%d", err, a.GetSession().ID())
1✔
255
                }
256

257
                if err := a.SendHandshakeResponse(); err != nil {
1✔
258
                        logger.Log.Errorf("Error sending handshake response: %s", err.Error())
×
259
                        return err
×
260
                }
×
261
                logger.Log.Debugf("Session handshake Id=%d, Remote=%s", a.GetSession().ID(), a.RemoteAddr())
1✔
262

1✔
263
                a.GetSession().SetHandshakeData(handshakeData)
1✔
264
                a.SetStatus(constants.StatusHandshake)
1✔
265
                err := a.GetSession().Set(constants.IPVersionKey, a.IPVersion())
1✔
266
                if err != nil {
1✔
267
                        logger.Log.Warnf("failed to save ip version on session: %q\n", err)
×
268
                }
×
269

270
                logger.Log.Debug("Successfully saved handshake data")
1✔
271

272
        case packet.HandshakeAck:
1✔
273
                a.SetStatus(constants.StatusWorking)
1✔
274
                logger.Log.Debugf("Receive handshake ACK Id=%d, Remote=%s", a.GetSession().ID(), a.RemoteAddr())
1✔
275

276
        case packet.Data:
1✔
277
                if a.GetStatus() < constants.StatusWorking {
2✔
278
                        return fmt.Errorf("receive data on socket which is not yet ACK, session will be closed immediately, remote=%s",
1✔
279
                                a.RemoteAddr().String())
1✔
280
                }
1✔
281

282
                msg, err := message.Decode(p.Data)
1✔
283
                if err != nil {
2✔
284
                        return err
1✔
285
                }
1✔
286
                h.processMessage(a, msg)
1✔
287

288
        case packet.Heartbeat:
1✔
289
                // expected
290
        }
291

292
        a.SetLastAt()
1✔
293
        return nil
1✔
294
}
295

296
func (h *HandlerService) processMessage(a agent.Agent, msg *message.Message) {
1✔
297
        requestID := nuid.New().Next()
1✔
298
        ctx := pcontext.AddToPropagateCtx(context.Background(), constants.StartTimeKey, time.Now().UnixNano())
1✔
299
        ctx = pcontext.AddToPropagateCtx(ctx, constants.RouteKey, msg.Route)
1✔
300
        ctx = pcontext.AddToPropagateCtx(ctx, constants.RequestIDKey, requestID)
1✔
301
        tags := opentracing.Tags{
1✔
302
                "local.id":   h.server.ID,
1✔
303
                "span.kind":  "server",
1✔
304
                "msg.type":   strings.ToLower(msg.Type.String()),
1✔
305
                "user.id":    a.GetSession().UID(),
1✔
306
                "request.id": requestID,
1✔
307
        }
1✔
308
        ctx = tracing.StartSpan(ctx, msg.Route, tags)
1✔
309
        ctx = context.WithValue(ctx, constants.SessionCtxKey, a.GetSession())
1✔
310

1✔
311
        r, err := route.Decode(msg.Route)
1✔
312
        if err != nil {
2✔
313
                logger.Log.Errorf("Failed to decode route: %s", err.Error())
1✔
314
                a.AnswerWithError(ctx, msg.ID, e.NewError(err, e.ErrBadRequestCode))
1✔
315
                return
1✔
316
        }
1✔
317

318
        if r.SvType == "" {
2✔
319
                r.SvType = h.server.Type
1✔
320
        }
1✔
321

322
        message := unhandledMessage{
1✔
323
                ctx:   ctx,
1✔
324
                agent: a,
1✔
325
                route: r,
1✔
326
                msg:   msg,
1✔
327
        }
1✔
328
        if r.SvType == h.server.Type {
2✔
329
                h.chLocalProcess <- message
1✔
330
        } else {
2✔
331
                if h.remoteService != nil {
2✔
332
                        h.chRemoteProcess <- message
1✔
333
                } else {
1✔
334
                        logger.Log.Warnf("request made to another server type but no remoteService running")
×
335
                }
×
336
        }
337
}
338

339
func (h *HandlerService) localProcess(ctx context.Context, a agent.Agent, route *route.Route, msg *message.Message) {
1✔
340
        var mid uint
1✔
341
        switch msg.Type {
1✔
342
        case message.Request:
1✔
343
                mid = msg.ID
1✔
344
        case message.Notify:
×
345
                mid = 0
×
346
        }
347

348
        ret, err := h.handlerPool.ProcessHandlerMessage(ctx, route, h.serializer, h.handlerHooks, a.GetSession(), msg.Data, msg.Type, false)
1✔
349
        if msg.Type != message.Notify {
2✔
350
                if err != nil {
2✔
351
                        logger.Log.Errorf("Failed to process handler message: %s", err.Error())
1✔
352
                        a.AnswerWithError(ctx, mid, err)
1✔
353
                } else {
2✔
354
                        err := a.GetSession().ResponseMID(ctx, mid, ret)
1✔
355
                        if err != nil {
1✔
356
                                logger.Log.Errorf("Failed to process handler message: %s", err.Error())
×
357
                                tracing.FinishSpan(ctx, err)
×
358
                                metrics.ReportTimingFromCtx(ctx, h.metricsReporters, handlerType, err)
×
359
                        }
×
360
                }
361
        } else {
×
362
                metrics.ReportTimingFromCtx(ctx, h.metricsReporters, handlerType, err)
×
363
                tracing.FinishSpan(ctx, err)
×
364
                if err != nil {
×
365
                        logger.Log.Errorf("Failed to process notify message: %s", err.Error())
×
366
                }
×
367
        }
368
}
369

370
// DumpServices outputs all registered services
371
func (h *HandlerService) DumpServices() {
×
372
        handlers := h.handlerPool.GetHandlers()
×
373
        for name := range handlers {
×
374
                logger.Log.Infof("registered handler %s, isRawArg: %v", name, handlers[name].IsRawArg)
×
375
        }
×
376
}
377

378
// Docs returns documentation for handlers
379
func (h *HandlerService) Docs(getPtrNames bool) (map[string]interface{}, error) {
×
380
        if h == nil {
×
381
                return map[string]interface{}{}, nil
×
382
        }
×
383
        return docgenerator.HandlersDocs(h.server.Type, h.services, getPtrNames)
×
384
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc