• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

topfreegames / pitaya / 5050400886

22 May 2023 09:25PM UTC coverage: 62.185% (+0.05%) from 62.131%
5050400886

Pull #308

github

Reinaldo Oliveira
Adding documentation about the handshake validation process
Pull Request #308: Add handshake validators

84 of 84 new or added lines in 3 files covered. (100.0%)

4769 of 7669 relevant lines covered (62.19%)

0.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.68
/agent/agent.go
1
// Copyright (c) nano Author and TFG Co. All Rights Reserved.
2
//
3
// Permission is hereby granted, free of charge, to any person obtaining a copy
4
// of this software and associated documentation files (the "Software"), to deal
5
// in the Software without restriction, including without limitation the rights
6
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7
// copies of the Software, and to permit persons to whom the Software is
8
// furnished to do so, subject to the following conditions:
9
//
10
// The above copyright notice and this permission notice shall be included in all
11
// copies or substantial portions of the Software.
12
//
13
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
19
// SOFTWARE.
20

21
package agent
22

23
import (
24
        "context"
25
        gojson "encoding/json"
26
        e "errors"
27
        "fmt"
28
        "net"
29
        "strings"
30
        "sync"
31
        "sync/atomic"
32
        "time"
33

34
        "github.com/topfreegames/pitaya/v2/conn/codec"
35
        "github.com/topfreegames/pitaya/v2/conn/message"
36
        "github.com/topfreegames/pitaya/v2/conn/packet"
37
        "github.com/topfreegames/pitaya/v2/constants"
38
        "github.com/topfreegames/pitaya/v2/errors"
39
        "github.com/topfreegames/pitaya/v2/logger"
40
        "github.com/topfreegames/pitaya/v2/metrics"
41
        "github.com/topfreegames/pitaya/v2/protos"
42
        "github.com/topfreegames/pitaya/v2/serialize"
43
        "github.com/topfreegames/pitaya/v2/session"
44
        "github.com/topfreegames/pitaya/v2/tracing"
45
        "github.com/topfreegames/pitaya/v2/util"
46
        "github.com/topfreegames/pitaya/v2/util/compression"
47

48
        opentracing "github.com/opentracing/opentracing-go"
49
)
50

51
// Packets that are encoded once (on the first call to newAgent) and then
// reused for every connection:
//   - hbd:  heartbeat packet data
//   - hrd:  handshake response data
//   - herd: handshake error response data
var hbd, hrd, herd []byte

// once guards the one-time encoding of hbd, hrd and herd.
var once sync.Once
60

61
// handlerType is the type label this file passes to
// metrics.ReportTimingFromCtx when reporting timings.
const handlerType = "handler"
62

63
type (
64
        agentImpl struct {
65
                Session            session.Session // session
66
                sessionPool        session.SessionPool
67
                appDieChan         chan bool         // app die channel
68
                chDie              chan struct{}     // wait for close
69
                chSend             chan pendingWrite // push message queue
70
                chStopHeartbeat    chan struct{}     // stop heartbeats
71
                chStopWrite        chan struct{}     // stop writing messages
72
                closeMutex         sync.Mutex
73
                conn               net.Conn            // low-level conn fd
74
                decoder            codec.PacketDecoder // binary decoder
75
                encoder            codec.PacketEncoder // binary encoder
76
                heartbeatTimeout   time.Duration
77
                lastAt             int64 // last heartbeat unix time stamp
78
                messageEncoder     message.Encoder
79
                messagesBufferSize int // size of the pending messages buffer
80
                metricsReporters   []metrics.Reporter
81
                serializer         serialize.Serializer // message serializer
82
                state              int32                // current agent state
83
        }
84

85
        pendingMessage struct {
86
                ctx     context.Context
87
                typ     message.Type // message type
88
                route   string       // message route (push)
89
                mid     uint         // response message id (response)
90
                payload interface{}  // payload
91
                err     bool         // if its an error message
92
        }
93

94
        pendingWrite struct {
95
                ctx  context.Context
96
                data []byte
97
                err  error
98
        }
99

100
        // Agent corresponds to a user and is used for storing raw Conn information
101
        Agent interface {
102
                GetSession() session.Session
103
                Push(route string, v interface{}) error
104
                ResponseMID(ctx context.Context, mid uint, v interface{}, isError ...bool) error
105
                Close() error
106
                RemoteAddr() net.Addr
107
                String() string
108
                GetStatus() int32
109
                Kick(ctx context.Context) error
110
                SetLastAt()
111
                SetStatus(state int32)
112
                Handle()
113
                IPVersion() string
114
                SendHandshakeResponse() error
115
                SendHandshakeErrorResponse() error
116
                SendRequest(ctx context.Context, serverID, route string, v interface{}) (*protos.Response, error)
117
                AnswerWithError(ctx context.Context, mid uint, err error)
118
        }
119

120
        // AgentFactory factory for creating Agent instances
121
        AgentFactory interface {
122
                CreateAgent(conn net.Conn) Agent
123
        }
124

125
        agentFactoryImpl struct {
126
                sessionPool        session.SessionPool
127
                appDieChan         chan bool           // app die channel
128
                decoder            codec.PacketDecoder // binary decoder
129
                encoder            codec.PacketEncoder // binary encoder
130
                heartbeatTimeout   time.Duration
131
                messageEncoder     message.Encoder
132
                messagesBufferSize int // size of the pending messages buffer
133
                metricsReporters   []metrics.Reporter
134
                serializer         serialize.Serializer // message serializer
135
        }
136
)
137

138
// NewAgentFactory ctor
139
func NewAgentFactory(
140
        appDieChan chan bool,
141
        decoder codec.PacketDecoder,
142
        encoder codec.PacketEncoder,
143
        serializer serialize.Serializer,
144
        heartbeatTimeout time.Duration,
145
        messageEncoder message.Encoder,
146
        messagesBufferSize int,
147
        sessionPool session.SessionPool,
148
        metricsReporters []metrics.Reporter,
149
) AgentFactory {
×
150
        return &agentFactoryImpl{
×
151
                appDieChan:         appDieChan,
×
152
                decoder:            decoder,
×
153
                encoder:            encoder,
×
154
                heartbeatTimeout:   heartbeatTimeout,
×
155
                messageEncoder:     messageEncoder,
×
156
                messagesBufferSize: messagesBufferSize,
×
157
                sessionPool:        sessionPool,
×
158
                metricsReporters:   metricsReporters,
×
159
                serializer:         serializer,
×
160
        }
×
161
}
×
162

163
// CreateAgent returns a new agent
164
func (f *agentFactoryImpl) CreateAgent(conn net.Conn) Agent {
×
165
        return newAgent(conn, f.decoder, f.encoder, f.serializer, f.heartbeatTimeout, f.messagesBufferSize, f.appDieChan, f.messageEncoder, f.metricsReporters, f.sessionPool)
×
166
}
×
167

168
// NewAgent create new agent instance
169
func newAgent(
170
        conn net.Conn,
171
        packetDecoder codec.PacketDecoder,
172
        packetEncoder codec.PacketEncoder,
173
        serializer serialize.Serializer,
174
        heartbeatTime time.Duration,
175
        messagesBufferSize int,
176
        dieChan chan bool,
177
        messageEncoder message.Encoder,
178
        metricsReporters []metrics.Reporter,
179
        sessionPool session.SessionPool,
180
) Agent {
1✔
181
        // initialize heartbeat and handshake data on first user connection
1✔
182
        serializerName := serializer.GetName()
1✔
183

1✔
184
        once.Do(func() {
2✔
185
                hbdEncode(heartbeatTime, packetEncoder, messageEncoder.IsCompressionEnabled(), serializerName)
1✔
186
                herdEncode(heartbeatTime, packetEncoder, messageEncoder.IsCompressionEnabled(), serializerName)
1✔
187
        })
1✔
188

189
        a := &agentImpl{
1✔
190
                appDieChan:         dieChan,
1✔
191
                chDie:              make(chan struct{}),
1✔
192
                chSend:             make(chan pendingWrite, messagesBufferSize),
1✔
193
                chStopHeartbeat:    make(chan struct{}),
1✔
194
                chStopWrite:        make(chan struct{}),
1✔
195
                messagesBufferSize: messagesBufferSize,
1✔
196
                conn:               conn,
1✔
197
                decoder:            packetDecoder,
1✔
198
                encoder:            packetEncoder,
1✔
199
                heartbeatTimeout:   heartbeatTime,
1✔
200
                lastAt:             time.Now().Unix(),
1✔
201
                serializer:         serializer,
1✔
202
                state:              constants.StatusStart,
1✔
203
                messageEncoder:     messageEncoder,
1✔
204
                metricsReporters:   metricsReporters,
1✔
205
                sessionPool:        sessionPool,
1✔
206
        }
1✔
207

1✔
208
        // binding session
1✔
209
        s := sessionPool.NewSession(a, true)
1✔
210
        metrics.ReportNumberOfConnectedClients(metricsReporters, sessionPool.GetSessionCount())
1✔
211
        a.Session = s
1✔
212
        return a
1✔
213
}
214

215
func (a *agentImpl) getMessageFromPendingMessage(pm pendingMessage) (*message.Message, error) {
1✔
216
        payload, err := util.SerializeOrRaw(a.serializer, pm.payload)
1✔
217
        if err != nil {
2✔
218
                payload, err = util.GetErrorPayload(a.serializer, err)
1✔
219
                if err != nil {
1✔
220
                        return nil, err
×
221
                }
×
222
        }
223

224
        // construct message and encode
225
        m := &message.Message{
1✔
226
                Type:  pm.typ,
1✔
227
                Data:  payload,
1✔
228
                Route: pm.route,
1✔
229
                ID:    pm.mid,
1✔
230
                Err:   pm.err,
1✔
231
        }
1✔
232

1✔
233
        return m, nil
1✔
234
}
235

236
func (a *agentImpl) packetEncodeMessage(m *message.Message) ([]byte, error) {
1✔
237
        em, err := a.messageEncoder.Encode(m)
1✔
238
        if err != nil {
1✔
239
                return nil, err
×
240
        }
×
241

242
        // packet encode
243
        p, err := a.encoder.Encode(packet.Data, em)
1✔
244
        if err != nil {
1✔
245
                return nil, err
×
246
        }
×
247
        return p, nil
1✔
248
}
249

250
func (a *agentImpl) send(pendingMsg pendingMessage) (err error) {
1✔
251
        defer func() {
2✔
252
                if e := recover(); e != nil {
2✔
253
                        err = errors.NewError(constants.ErrBrokenPipe, errors.ErrClientClosedRequest)
1✔
254
                }
1✔
255
        }()
256
        a.reportChannelSize()
1✔
257

1✔
258
        m, err := a.getMessageFromPendingMessage(pendingMsg)
1✔
259
        if err != nil {
1✔
260
                return err
×
261
        }
×
262

263
        // packet encode
264
        p, err := a.packetEncodeMessage(m)
1✔
265
        if err != nil {
1✔
266
                return err
×
267
        }
×
268

269
        pWrite := pendingWrite{
1✔
270
                ctx:  pendingMsg.ctx,
1✔
271
                data: p,
1✔
272
        }
1✔
273

1✔
274
        if pendingMsg.err {
2✔
275
                pWrite.err = util.GetErrorFromPayload(a.serializer, m.Data)
1✔
276
        }
1✔
277

278
        // chSend is never closed so we need this to don't block if agent is already closed
279
        select {
1✔
280
        case a.chSend <- pWrite:
1✔
281
        case <-a.chDie:
×
282
        }
283
        return
1✔
284
}
285

286
// GetSession returns the agent session
287
func (a *agentImpl) GetSession() session.Session {
×
288
        return a.Session
×
289
}
×
290

291
// Push implementation for NetworkEntity interface
292
func (a *agentImpl) Push(route string, v interface{}) error {
1✔
293
        if a.GetStatus() == constants.StatusClosed {
2✔
294
                return errors.NewError(constants.ErrBrokenPipe, errors.ErrClientClosedRequest)
1✔
295
        }
1✔
296

297
        switch d := v.(type) {
1✔
298
        case []byte:
1✔
299
                logger.Log.Debugf("Type=Push, ID=%d, UID=%s, Route=%s, Data=%dbytes",
1✔
300
                        a.Session.ID(), a.Session.UID(), route, len(d))
1✔
301
        default:
1✔
302
                logger.Log.Debugf("Type=Push, ID=%d, UID=%s, Route=%s, Data=%+v",
1✔
303
                        a.Session.ID(), a.Session.UID(), route, v)
1✔
304
        }
305
        return a.send(pendingMessage{typ: message.Push, route: route, payload: v})
1✔
306
}
307

308
// ResponseMID implementation for NetworkEntity interface
309
// Respond message to session
310
func (a *agentImpl) ResponseMID(ctx context.Context, mid uint, v interface{}, isError ...bool) error {
1✔
311
        err := false
1✔
312
        if len(isError) > 0 {
2✔
313
                err = isError[0]
1✔
314
        }
1✔
315
        if a.GetStatus() == constants.StatusClosed {
2✔
316
                return errors.NewError(constants.ErrBrokenPipe, errors.ErrClientClosedRequest)
1✔
317
        }
1✔
318

319
        if mid <= 0 {
2✔
320
                return constants.ErrSessionOnNotify
1✔
321
        }
1✔
322

323
        switch d := v.(type) {
1✔
324
        case []byte:
1✔
325
                logger.Log.Debugf("Type=Response, ID=%d, UID=%s, MID=%d, Data=%dbytes",
1✔
326
                        a.Session.ID(), a.Session.UID(), mid, len(d))
1✔
327
        default:
1✔
328
                logger.Log.Infof("Type=Response, ID=%d, UID=%s, MID=%d, Data=%+v",
1✔
329
                        a.Session.ID(), a.Session.UID(), mid, v)
1✔
330
        }
331

332
        return a.send(pendingMessage{ctx: ctx, typ: message.Response, mid: mid, payload: v, err: err})
1✔
333
}
334

335
// Close closes the agent, cleans inner state and closes low-level connection.
336
// Any blocked Read or Write operations will be unblocked and return errors.
337
func (a *agentImpl) Close() error {
1✔
338
        a.closeMutex.Lock()
1✔
339
        defer a.closeMutex.Unlock()
1✔
340
        if a.GetStatus() == constants.StatusClosed {
2✔
341
                return constants.ErrCloseClosedSession
1✔
342
        }
1✔
343
        a.SetStatus(constants.StatusClosed)
1✔
344

1✔
345
        logger.Log.Debugf("Session closed, ID=%d, UID=%s, IP=%s",
1✔
346
                a.Session.ID(), a.Session.UID(), a.conn.RemoteAddr())
1✔
347

1✔
348
        // prevent closing closed channel
1✔
349
        select {
1✔
350
        case <-a.chDie:
×
351
                // expect
352
        default:
1✔
353
                close(a.chStopWrite)
1✔
354
                close(a.chStopHeartbeat)
1✔
355
                close(a.chDie)
1✔
356
                a.onSessionClosed(a.Session)
1✔
357
        }
358

359
        metrics.ReportNumberOfConnectedClients(a.metricsReporters, a.sessionPool.GetSessionCount())
1✔
360

1✔
361
        return a.conn.Close()
1✔
362
}
363

364
// RemoteAddr implementation for NetworkEntity interface
365
// returns the remote network address.
366
func (a *agentImpl) RemoteAddr() net.Addr {
1✔
367
        return a.conn.RemoteAddr()
1✔
368
}
1✔
369

370
// String, implementation for Stringer interface
371
func (a *agentImpl) String() string {
1✔
372
        return fmt.Sprintf("Remote=%s, LastTime=%d", a.conn.RemoteAddr().String(), atomic.LoadInt64(&a.lastAt))
1✔
373
}
1✔
374

375
// GetStatus gets the status
376
func (a *agentImpl) GetStatus() int32 {
1✔
377
        return atomic.LoadInt32(&a.state)
1✔
378
}
1✔
379

380
// Kick sends a kick packet to a client
381
func (a *agentImpl) Kick(ctx context.Context) error {
1✔
382
        // packet encode
1✔
383
        p, err := a.encoder.Encode(packet.Kick, nil)
1✔
384
        if err != nil {
1✔
385
                return err
×
386
        }
×
387
        _, err = a.conn.Write(p)
1✔
388
        return err
1✔
389
}
390

391
// SetLastAt sets the last at to now
392
func (a *agentImpl) SetLastAt() {
1✔
393
        atomic.StoreInt64(&a.lastAt, time.Now().Unix())
1✔
394
}
1✔
395

396
// SetStatus sets the agent status
397
func (a *agentImpl) SetStatus(state int32) {
1✔
398
        atomic.StoreInt32(&a.state, state)
1✔
399
}
1✔
400

401
// Handle handles the messages from and to a client
402
func (a *agentImpl) Handle() {
1✔
403
        defer func() {
2✔
404
                a.Close()
1✔
405
                logger.Log.Debugf("Session handle goroutine exit, SessionID=%d, UID=%s", a.Session.ID(), a.Session.UID())
1✔
406
        }()
1✔
407

408
        go a.write()
1✔
409
        go a.heartbeat()
1✔
410
        <-a.chDie // agent closed signal
1✔
411
}
412

413
// IPVersion returns the remote address ip version.
414
// net.TCPAddr and net.UDPAddr implementations of String()
415
// always construct result as <ip>:<port> on both
416
// ipv4 and ipv6. Also, to see if the ip is ipv6 they both
417
// check if there is a colon on the string.
418
// So checking if there are more than one colon here is safe.
419
func (a *agentImpl) IPVersion() string {
1✔
420
        version := constants.IPv4
1✔
421

1✔
422
        ipPort := a.RemoteAddr().String()
1✔
423
        if strings.Count(ipPort, ":") > 1 {
2✔
424
                version = constants.IPv6
1✔
425
        }
1✔
426

427
        return version
1✔
428
}
429

430
func (a *agentImpl) heartbeat() {
1✔
431
        ticker := time.NewTicker(a.heartbeatTimeout)
1✔
432

1✔
433
        defer func() {
2✔
434
                ticker.Stop()
1✔
435
                a.Close()
1✔
436
        }()
1✔
437

438
        for {
2✔
439
                select {
1✔
440
                case <-ticker.C:
1✔
441
                        deadline := time.Now().Add(-2 * a.heartbeatTimeout).Unix()
1✔
442
                        if atomic.LoadInt64(&a.lastAt) < deadline {
2✔
443
                                logger.Log.Debugf("Session heartbeat timeout, LastTime=%d, Deadline=%d", atomic.LoadInt64(&a.lastAt), deadline)
1✔
444
                                return
1✔
445
                        }
1✔
446

447
                        // chSend is never closed so we need this to don't block if agent is already closed
448
                        select {
1✔
449
                        case a.chSend <- pendingWrite{data: hbd}:
1✔
450
                        case <-a.chDie:
×
451
                                return
×
452
                        case <-a.chStopHeartbeat:
×
453
                                return
×
454
                        }
455
                case <-a.chDie:
×
456
                        return
×
457
                case <-a.chStopHeartbeat:
1✔
458
                        return
1✔
459
                }
460
        }
461
}
462

463
func (a *agentImpl) onSessionClosed(s session.Session) {
1✔
464
        defer func() {
2✔
465
                if err := recover(); err != nil {
2✔
466
                        logger.Log.Errorf("pitaya/onSessionClosed: %v", err)
1✔
467
                }
1✔
468
        }()
469

470
        for _, fn1 := range s.GetOnCloseCallbacks() {
2✔
471
                fn1()
1✔
472
        }
1✔
473

474
        for _, fn2 := range a.sessionPool.GetSessionCloseCallbacks() {
1✔
475
                fn2(s)
×
476
        }
×
477
}
478

479
// SendHandshakeResponse sends a handshake response
480
func (a *agentImpl) SendHandshakeResponse() error {
1✔
481
        _, err := a.conn.Write(hrd)
1✔
482

1✔
483
        return err
1✔
484
}
1✔
485

486
func (a *agentImpl) SendHandshakeErrorResponse() error {
×
487
        a.SetStatus(constants.StatusClosed)
×
488
        _, err := a.conn.Write(herd)
×
489

×
490
        return err
×
491
}
×
492

493
func (a *agentImpl) write() {
1✔
494
        // clean func
1✔
495
        defer func() {
2✔
496
                a.Close()
1✔
497
        }()
1✔
498

499
        for {
2✔
500
                select {
1✔
501
                case pWrite := <-a.chSend:
1✔
502
                        // close agent if low-level Conn broken
1✔
503
                        if _, err := a.conn.Write(pWrite.data); err != nil {
1✔
504
                                tracing.FinishSpan(pWrite.ctx, err)
×
505
                                metrics.ReportTimingFromCtx(pWrite.ctx, a.metricsReporters, handlerType, err)
×
506
                                logger.Log.Errorf("Failed to write in conn: %s", err.Error())
×
507
                                return
×
508
                        }
×
509
                        var e error
1✔
510
                        tracing.FinishSpan(pWrite.ctx, e)
1✔
511
                        metrics.ReportTimingFromCtx(pWrite.ctx, a.metricsReporters, handlerType, pWrite.err)
1✔
512
                case <-a.chStopWrite:
1✔
513
                        return
1✔
514
                }
515
        }
516
}
517

518
// SendRequest sends a request to a server
519
func (a *agentImpl) SendRequest(ctx context.Context, serverID, route string, v interface{}) (*protos.Response, error) {
×
520
        return nil, e.New("not implemented")
×
521
}
×
522

523
// AnswerWithError answers with an error
524
func (a *agentImpl) AnswerWithError(ctx context.Context, mid uint, err error) {
1✔
525
        var e error
1✔
526
        defer func() {
2✔
527
                if e != nil {
2✔
528
                        tracing.FinishSpan(ctx, e)
1✔
529
                        metrics.ReportTimingFromCtx(ctx, a.metricsReporters, handlerType, e)
1✔
530
                }
1✔
531
        }()
532
        if ctx != nil && err != nil {
1✔
533
                s := opentracing.SpanFromContext(ctx)
×
534
                if s != nil {
×
535
                        tracing.LogError(s, err.Error())
×
536
                }
×
537
        }
538
        p, e := util.GetErrorPayload(a.serializer, err)
1✔
539
        if e != nil {
2✔
540
                logger.Log.Errorf("error answering the user with an error: %s", e.Error())
1✔
541
                return
1✔
542
        }
1✔
543
        e = a.Session.ResponseMID(ctx, mid, p, true)
1✔
544
        if e != nil {
1✔
545
                logger.Log.Errorf("error answering the user with an error: %s", e.Error())
×
546
        }
×
547
}
548

549
func hbdEncode(heartbeatTimeout time.Duration, packetEncoder codec.PacketEncoder, dataCompression bool, serializerName string) {
1✔
550
        hData := map[string]interface{}{
1✔
551
                "code": 200,
1✔
552
                "sys": map[string]interface{}{
1✔
553
                        "heartbeat":  heartbeatTimeout.Seconds(),
1✔
554
                        "dict":       message.GetDictionary(),
1✔
555
                        "serializer": serializerName,
1✔
556
                },
1✔
557
        }
1✔
558

1✔
559
        data, err := encodeAndCompress(hData, dataCompression)
1✔
560
        if err != nil {
1✔
561
                panic(err)
×
562
        }
563

564
        hrd, err = packetEncoder.Encode(packet.Handshake, data)
1✔
565
        if err != nil {
1✔
566
                panic(err)
×
567
        }
568

569
        hbd, err = packetEncoder.Encode(packet.Heartbeat, nil)
1✔
570
        if err != nil {
1✔
571
                panic(err)
×
572
        }
573
}
574

575
func herdEncode(heartbeatTimeout time.Duration, packetEncoder codec.PacketEncoder, dataCompression bool, serializerName string) {
1✔
576
        hErrData := map[string]interface{}{
1✔
577
                "code": 400,
1✔
578
                "sys": map[string]interface{}{
1✔
579
                        "heartbeat":  heartbeatTimeout.Seconds(),
1✔
580
                        "dict":       message.GetDictionary(),
1✔
581
                        "serializer": serializerName,
1✔
582
                },
1✔
583
        }
1✔
584

1✔
585
        errData, err := encodeAndCompress(hErrData, dataCompression)
1✔
586
        if err != nil {
1✔
587
                panic(err)
×
588
        }
589

590
        herd, err = packetEncoder.Encode(packet.Handshake, errData)
1✔
591
        if err != nil {
1✔
592
                panic(err)
×
593
        }
594
}
595

596
func encodeAndCompress(data interface{}, dataCompression bool) ([]byte, error) {
1✔
597
        encData, err := gojson.Marshal(data)
1✔
598
        if err != nil {
1✔
599
                return nil, err
×
600
        }
×
601

602
        if dataCompression {
1✔
603
                compressedData, err := compression.DeflateData(encData)
×
604
                if err != nil {
×
605
                        return nil, err
×
606
                }
×
607

608
                if len(compressedData) < len(encData) {
×
609
                        encData = compressedData
×
610
                }
×
611
        }
612
        return encData, nil
1✔
613
}
614

615
func (a *agentImpl) reportChannelSize() {
1✔
616
        chSendCapacity := a.messagesBufferSize - len(a.chSend)
1✔
617
        if chSendCapacity == 0 {
2✔
618
                logger.Log.Warnf("chSend is at maximum capacity")
1✔
619
        }
1✔
620
        for _, mr := range a.metricsReporters {
2✔
621
                if err := mr.ReportGauge(metrics.ChannelCapacity, map[string]string{"channel": "agent_chsend"}, float64(chSendCapacity)); err != nil {
1✔
622
                        logger.Log.Warnf("failed to report chSend channel capaacity: %s", err.Error())
×
623
                }
×
624
        }
625
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc