• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

topfreegames / pitaya / 10187728597

31 Jul 2024 09:11PM UTC coverage: 58.588% (-0.6%) from 59.186%
10187728597

Pull #414

github

felipejfc
remove: grpc e2e tests from ci
Pull Request #414: Remove grpc rpc

7 of 35 new or added lines in 2 files covered. (20.0%)

3 existing lines in 1 file now uncovered.

4673 of 7976 relevant lines covered (58.59%)

0.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/pkg/client/protoclient.go
1
// Copyright (c) TFG Co. All Rights Reserved.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in all
11
// copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
19
// SOFTWARE.
20

21
package client
22

23
import (
24
        "bytes"
25
        "compress/gzip"
26
        "crypto/tls"
27
        "encoding/json"
28
        "errors"
29
        "fmt"
30
        "io"
31
        "strings"
32
        "time"
33

34
        "github.com/sirupsen/logrus"
35
        "github.com/topfreegames/pitaya/v3/pkg/conn/message"
36
        "github.com/topfreegames/pitaya/v3/pkg/logger"
37
        "github.com/topfreegames/pitaya/v3/pkg/protos"
38
        "google.golang.org/protobuf/encoding/protojson"
39
        "google.golang.org/protobuf/proto"
40
        "google.golang.org/protobuf/reflect/protodesc"
41
        "google.golang.org/protobuf/reflect/protoreflect"
42
        "google.golang.org/protobuf/reflect/protoregistry"
43
        "google.golang.org/protobuf/types/descriptorpb"
44
        "google.golang.org/protobuf/types/dynamicpb"
45
)
46

47
// Command struct. Save the input and output type and proto descriptor for each
48
// one.
49
type Command struct {
50
        input               string // input command name
51
        output              string // output command name
52
        inputMsgDescriptor  protoreflect.MessageDescriptor
53
        outputMsgDescriptor protoreflect.MessageDescriptor
54
}
55

56
// ProtoBufferInfo save all commands from a server.
57
type ProtoBufferInfo struct {
58
        Commands map[string]*Command
59
}
60

61
// ProtoClient struct
62
type ProtoClient struct {
63
        Client
64
        descriptorsNames        map[string]bool
65
        info                    ProtoBufferInfo
66
        docsRoute               string
67
        descriptorsRoute        string
68
        IncomingMsgChan         chan *message.Message
69
        expectedInputDescriptor protoreflect.MessageDescriptor
70
        ready                   bool
71
        closeChan               chan bool
72
}
73

74
// MsgChannel return the incoming message channel
75
func (pc *ProtoClient) MsgChannel() chan *message.Message {
×
76
        return pc.IncomingMsgChan
×
77
}
×
78

79
// Receive a compressed byte slice and unpack it to a FileDescriptorProto
NEW
80
func unpackDescriptor(compressedDescriptor []byte) (*descriptorpb.FileDescriptorProto, error) {
×
81
        r, err := gzip.NewReader(bytes.NewReader(compressedDescriptor))
×
82
        if err != nil {
×
83
                return nil, err
×
84
        }
×
85
        defer r.Close()
×
86

×
87
        b, err := io.ReadAll(r)
×
88
        if err != nil {
×
89
                return nil, err
×
90
        }
×
91

NEW
92
        var fileDescriptorProto descriptorpb.FileDescriptorProto
×
93

×
94
        if err = proto.Unmarshal(b, &fileDescriptorProto); err != nil {
×
95
                return nil, err
×
96
        }
×
97

98
        return &fileDescriptorProto, nil
×
99
}
100

101
// Receive an array of descriptors in binary format. The function creates the
102
// protobuffer from this data and associates it to the message.
NEW
103
func (pc *ProtoClient) buildProtosFromDescriptor(descriptorArray []*descriptorpb.FileDescriptorProto) error {
×
104

×
NEW
105
        descriptorsMap := make(map[string]*protoreflect.MessageDescriptor)
×
NEW
106

×
NEW
107
        for _, fdp := range descriptorArray {
×
NEW
108
                fd, err := protodesc.NewFile(fdp, protoregistry.GlobalFiles)
×
NEW
109
                if err != nil {
×
NEW
110
                        return fmt.Errorf("error converting to FileDescriptor: %w", err)
×
NEW
111
                }
×
NEW
112
                if err := protoregistry.GlobalFiles.RegisterFile(fd); err != nil {
×
NEW
113
                        return fmt.Errorf("error registering file: %w", err)
×
NEW
114
                }
×
115

116
        }
117

118
        /*
119
                descriptors, err := protoreflect.FileDescriptor(descriptorArray)
120
                if err != nil {
121
                        return err
122
                }
123

124
                for name := range pc.descriptorsNames {
125
                        for _, v := range descriptors {
126
                                message := v.FindMessage(name)
127
                                if message != nil {
128
                                        descriptorsMap[name] = message
129
                                }
130
                        }
131
                }
132
        */
133
        for name, cmd := range pc.info.Commands {
×
134
                if msg, ok := descriptorsMap[cmd.input]; ok {
×
NEW
135
                        pc.info.Commands[name].inputMsgDescriptor = *msg
×
136
                }
×
137
                if msg, ok := descriptorsMap[cmd.output]; ok {
×
NEW
138
                        pc.info.Commands[name].outputMsgDescriptor = *msg
×
139
                }
×
140
        }
141

142
        return nil
×
143
}
144

145
// getOutputInputNames extracts the proto message names of one documented
// command. command is one entry of the decoded documentation JSON; its "input"
// value is expected to be an object and its "output" value a list whose first
// element is an object.
//
// A name is taken from a key containing "proto", with the first "*" removed.
// It returns (input name, output name, error). Either name is "" when the
// command documents none. A missing "output" is treated like an empty one; an
// "output" that is present but not a list yields an error instead of a panic.
func getOutputInputNames(command map[string]interface{}) (string, string, error) {
	// protoName returns the cleaned-up key of docs that refers to a proto
	// type, or "" when there is none.
	protoName := func(docs map[string]interface{}) string {
		name := ""
		for k := range docs {
			if strings.Contains(k, "proto") {
				name = strings.Replace(k, "*", "", 1)
			}
		}
		return name
	}

	inputName := ""
	if inputDocs, ok := command["input"].(map[string]interface{}); ok {
		inputName = protoName(inputDocs)
	}

	out := command["output"]
	if out == nil {
		return inputName, "", nil
	}
	outputDocsArr, ok := out.([]interface{})
	if !ok {
		return "", "", fmt.Errorf("unexpected type %T for command output, want a list", out)
	}
	// we can have handlers that have no return specified.
	if len(outputDocsArr) == 0 {
		return inputName, "", nil
	}

	outputName := ""
	if outputDocs, ok := outputDocsArr[0].(map[string]interface{}); ok {
		outputName = protoName(outputDocs)
	}

	return inputName, outputName, nil
}
179

180
// getKeys walks a decoded JSON object and records in keysSet every key that
// contains "*", with the first "*" stripped. It descends into nested objects
// and into objects found inside lists; other list elements are ignored.
func getKeys(info map[string]interface{}, keysSet map[string]bool) {
	for key, value := range info {
		if strings.Contains(key, "*") {
			keysSet[strings.Replace(key, "*", "", 1)] = true
		}

		switch nested := value.(type) {
		case []interface{}:
			for _, item := range nested {
				if child, ok := item.(map[string]interface{}); ok {
					getKeys(child, keysSet)
				}
			}
		case map[string]interface{}:
			getKeys(nested, keysSet)
		}
	}
}
204

205
// Receives one json string from the auto documentation, decode it and request
206
// to the server the protobuf descriptors. If the the  descriptors route are
207
// not set, this function identify the route responsible for providing the
208
// protobuf descriptors.
209
func (pc *ProtoClient) getDescriptors(data string) error {
×
210
        d := []byte(data)
×
211
        var jsonmap interface{}
×
212
        if err := json.Unmarshal(d, &jsonmap); err != nil {
×
213
                return err
×
214
        }
×
215
        m := jsonmap.(map[string]interface{})
×
216
        keysSet := make(map[string]bool)
×
217
        getKeys(m, keysSet)
×
218

×
219
        // load predefined protos
×
220
        for _, commands := range pc.info.Commands {
×
221
                if commands.input != "" {
×
222
                        keysSet[commands.input] = true
×
223
                }
×
224
                if commands.output != "" {
×
225
                        keysSet[commands.output] = true
×
226
                }
×
227
        }
228

229
        // build commands reference
230
        handlers := m["handlers"].(map[string]interface{})
×
231
        for k, v := range handlers {
×
232
                cmdInfo := v.(map[string]interface{})
×
233
                in, out, err := getOutputInputNames(cmdInfo)
×
234
                if err != nil {
×
235
                        return fmt.Errorf("failed to get output and input names for '%s' handler: %w", k, err)
×
236
                }
×
237

238
                var command Command
×
239
                command.input = in
×
240
                command.output = out
×
241

×
242
                pc.info.Commands[k] = &command
×
243
                if pc.descriptorsRoute == "" && in == "protos.ProtoNames" && out == "protos.ProtoDescriptors" {
×
244
                        pc.descriptorsRoute = k
×
245
                }
×
246
        }
247

248
        remotes := m["remotes"].(map[string]interface{})
×
249
        for k, v := range remotes {
×
250
                cmdInfo := v.(map[string]interface{})
×
251
                in, out, err := getOutputInputNames(cmdInfo)
×
252
                if err != nil {
×
253
                        return err
×
254
                }
×
255

256
                var command Command
×
257
                command.input = in
×
258
                command.output = out
×
259

×
260
                pc.info.Commands[k] = &command
×
261
        }
262

263
        names := make([]string, 0, len(keysSet))
×
264
        for key := range keysSet {
×
265
                names = append(names, key)
×
266
        }
×
267

268
        protname := &protos.ProtoNames{
×
269
                Name: names,
×
270
        }
×
271

×
272
        encodedNames, err := proto.Marshal(protname)
×
273
        if err != nil {
×
274
                return fmt.Errorf("failed to encode proto names: %w", err)
×
275
        }
×
276
        _, err = pc.SendRequest(pc.descriptorsRoute, encodedNames)
×
277
        if err != nil {
×
278
                return fmt.Errorf("failed to send proto descriptors request: %w", err)
×
279
        }
×
280

281
        response := <-pc.Client.IncomingMsgChan
×
282
        descriptors := &protos.ProtoDescriptors{}
×
283
        if err := proto.Unmarshal(response.Data, descriptors); err != nil {
×
284
                return fmt.Errorf("failed to unmarshal proto descriptors response: %w", err)
×
285
        }
×
286

287
        // get all proto types
NEW
288
        descriptorArray := make([]*descriptorpb.FileDescriptorProto, 0)
×
289
        for i := range descriptors.Desc {
×
290
                fileDescriptorProto, err := unpackDescriptor(descriptors.Desc[i])
×
291
                if err != nil {
×
292
                        return fmt.Errorf("failed to unpack descriptor: %w", err)
×
293
                }
×
294

295
                descriptorArray = append(descriptorArray, fileDescriptorProto)
×
296
                pc.descriptorsNames[names[i]] = true
×
297
        }
298

299
        if err = pc.buildProtosFromDescriptor(descriptorArray); err != nil {
×
300
                return fmt.Errorf("failed to build proto from descriptor: %w", err)
×
301
        }
×
302

303
        return nil
×
304
}
305

306
// Return the basic structure for the ProtoClient struct.
307
func newProto(docslogLevel logrus.Level, requestTimeout ...time.Duration) *ProtoClient {
×
308
        return &ProtoClient{
×
309
                Client:           *New(docslogLevel, requestTimeout...),
×
310
                descriptorsNames: make(map[string]bool),
×
311
                info: ProtoBufferInfo{
×
312
                        Commands: make(map[string]*Command),
×
313
                },
×
314
                docsRoute:        "",
×
315
                descriptorsRoute: "",
×
316
                IncomingMsgChan:  make(chan *message.Message, 10),
×
317
                closeChan:        make(chan bool),
×
318
        }
×
319
}
×
320

321
// NewProto returns a new protoclient with the auto documentation route.
322
func NewProto(docsRoute string, docslogLevel logrus.Level, requestTimeout ...time.Duration) *ProtoClient {
×
323
        newclient := newProto(docslogLevel, requestTimeout...)
×
324
        newclient.docsRoute = docsRoute
×
325
        return newclient
×
326
}
×
327

328
// NewWithDescriptor returns a new protoclient with the descriptors route and
329
// auto documentation route.
330
func NewWithDescriptor(descriptorsRoute string, docsRoute string, docslogLevel logrus.Level, requestTimeout ...time.Duration) *ProtoClient {
×
331
        newclient := newProto(docslogLevel, requestTimeout...)
×
332
        newclient.docsRoute = docsRoute
×
333
        newclient.descriptorsRoute = descriptorsRoute
×
334
        return newclient
×
335
}
×
336

337
// LoadServerInfo load commands information from the server. Addr is the
338
// server address.
339
func (pc *ProtoClient) LoadServerInfo(addr string) error {
×
340
        pc.ready = false
×
341

×
342
        if err := pc.Client.ConnectToWS(addr, "", &tls.Config{
×
343
                InsecureSkipVerify: true,
×
344
        }); err != nil {
×
345
                if err := pc.Client.ConnectToWS(addr, ""); err != nil {
×
346
                        if err := pc.Client.ConnectTo(addr, &tls.Config{
×
347
                                InsecureSkipVerify: true,
×
348
                        }); err != nil {
×
349
                                if err := pc.Client.ConnectTo(addr); err != nil {
×
350
                                        return err
×
351
                                }
×
352
                        }
353
                }
354
        }
355

356
        // request doc info
357
        _, err := pc.SendRequest(pc.docsRoute, make([]byte, 0))
×
358
        if err != nil {
×
359
                return err
×
360
        }
×
361
        response := <-pc.Client.IncomingMsgChan
×
362

×
363
        docs := &protos.Doc{}
×
364
        if err := proto.Unmarshal(response.Data, docs); err != nil {
×
365
                return fmt.Errorf("failed to unmarshal docs route response: %w", err)
×
366
        }
×
367

368
        if err := pc.getDescriptors(docs.Doc); err != nil {
×
369
                return fmt.Errorf("failed to read proto descriptors: %w", err)
×
370
        }
×
371

372
        pc.Disconnect()
×
373
        pc.ready = true
×
374

×
375
        return nil
×
376
}
377

378
// Disconnect the client
379
func (pc *ProtoClient) Disconnect() {
×
380
        pc.Client.Disconnect()
×
381
        if pc.ready {
×
382
                pc.closeChan <- true
×
383
        }
×
384
}
385

386
// Wait for new messages from the server or the connection end. If the menssage
387
// has a response.Route, it decodes based on it. If not, it will try to decode
388
// the menssage using the last expected response.
389
func (pc *ProtoClient) waitForData() {
×
390
        for {
×
391
                select {
×
392
                case response := <-pc.Client.IncomingMsgChan:
×
NEW
393
                        inputMsg := dynamicpb.NewMessage(pc.expectedInputDescriptor)
×
394

×
395
                        msg, ok := pc.info.Commands[response.Route]
×
396
                        if ok {
×
NEW
397
                                inputMsg = dynamicpb.NewMessage(msg.outputMsgDescriptor)
×
398
                        } else {
×
399
                                pc.expectedInputDescriptor = nil
×
400
                        }
×
401

402
                        if response.Err {
×
403
                                errMsg := &protos.Error{}
×
404
                                err := proto.Unmarshal(response.Data, errMsg)
×
405
                                if err != nil {
×
406
                                        logger.Log.Errorf("Erro decode error data: %s", string(response.Data))
×
407
                                        continue
×
408
                                }
409
                                response.Data, err = json.Marshal(errMsg)
×
410
                                if err != nil {
×
411
                                        logger.Log.Errorf("error encode error to json: %s", string(response.Data))
×
412
                                        continue
×
413
                                }
414
                                pc.IncomingMsgChan <- response
×
415
                                continue
×
416
                        }
417

418
                        if inputMsg == nil {
×
419
                                logger.Log.Errorf("not expected data: %s", string(response.Data))
×
420
                                continue
×
421
                        }
422

NEW
423
                        err := proto.Unmarshal(response.Data, inputMsg)
×
424
                        if err != nil {
×
425
                                logger.Log.Errorf("error decode data: %s", string(response.Data))
×
426
                                continue
×
427
                        }
428

NEW
429
                        data, err2 := protojson.Marshal(inputMsg)
×
430
                        if err2 != nil {
×
431
                                logger.Log.Errorf("error encode data to json: %s", string(response.Data))
×
432
                                continue
×
433
                        }
434

435
                        response.Data = data
×
436
                        pc.IncomingMsgChan <- response
×
437
                case <-pc.closeChan:
×
438
                        return
×
439
                }
440
        }
441
}
442

443
// ConnectTo connects to the server at addr, for now the only supported protocol is tcp
444
// this methods blocks as it also handles the messages from the server
445
func (pc *ProtoClient) ConnectTo(addr string, tlsConfig ...*tls.Config) error {
×
446
        err := pc.Client.ConnectTo(addr, tlsConfig...)
×
447
        if err != nil {
×
448
                return err
×
449
        }
×
450

451
        if !pc.ready {
×
452
                err = pc.LoadServerInfo(addr)
×
453
                if err != nil {
×
454
                        return err
×
455
                }
×
456
        }
457

458
        if pc.ready {
×
459
                go pc.waitForData()
×
460
        }
×
461
        return nil
×
462
}
463

464
// ExportInformation export supported server commands information
465
func (pc *ProtoClient) ExportInformation() *ProtoBufferInfo {
×
466
        if !pc.ready {
×
467
                return nil
×
468
        }
×
469
        return &pc.info
×
470
}
471

472
// LoadInfo load commands information form ProtoBufferInfo
473
func (pc *ProtoClient) LoadInfo(info *ProtoBufferInfo) error {
×
474
        if info == nil {
×
475
                return errors.New("protobuffer information invalid")
×
476
        }
×
477
        pc.info = *info
×
478
        pc.ready = true
×
479
        return nil
×
480
}
481

482
// AddPushResponse add a push response. Must be ladded before LoadInfo.
483
func (pc *ProtoClient) AddPushResponse(route string, protoName string) {
×
484
        if route != "" && protoName != "" {
×
485
                var command Command
×
486
                command.input = ""
×
487
                command.output = protoName
×
488

×
489
                pc.info.Commands[route] = &command
×
490
        }
×
491
}
492

493
// SendRequest sends a request to the server
494
func (pc *ProtoClient) SendRequest(route string, data []byte) (uint, error) {
×
495

×
496
        if !pc.ready {
×
497
                return pc.Client.SendRequest(route, data)
×
498
        }
×
499

500
        if cmd, ok := pc.info.Commands[route]; ok {
×
501
                if len(data) < 0 || string(data) == "{}" || cmd.inputMsgDescriptor == nil {
×
502
                        pc.expectedInputDescriptor = cmd.outputMsgDescriptor
×
503
                        data = data[:0]
×
504
                        return pc.Client.SendRequest(route, data)
×
505
                }
×
NEW
506
                inputMsg := dynamicpb.NewMessage(cmd.inputMsgDescriptor)
×
NEW
507
                if err := proto.Unmarshal(data, inputMsg); err != nil {
×
508
                        return 0, err
×
509
                }
×
NEW
510
                realdata, err := protojson.Marshal(inputMsg)
×
511
                if err != nil {
×
512
                        return 0, err
×
513
                }
×
514
                pc.expectedInputDescriptor = cmd.outputMsgDescriptor
×
515
                return pc.Client.SendRequest(route, realdata)
×
516
        }
517

518
        return 0, errors.New("Invalid Route: " + route)
×
519
}
520

521
// SendNotify sends a notify to the server
522
func (pc *ProtoClient) SendNotify(route string, data []byte) error {
×
523

×
524
        if cmd, ok := pc.info.Commands[route]; ok {
×
NEW
525
                inputMsg := dynamicpb.NewMessage(cmd.inputMsgDescriptor)
×
NEW
526
                err := proto.Unmarshal(data, inputMsg)
×
527
                if err != nil {
×
528
                        return err
×
529
                }
×
NEW
530
                realdata, err := protojson.Marshal(inputMsg)
×
531
                if err != nil {
×
532
                        return err
×
533
                }
×
534
                return pc.Client.SendNotify(route, realdata)
×
535
        }
536

537
        return errors.New("invalid route")
×
538
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc