• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

valinurovam / garagemq / 6729693490

02 Nov 2023 07:48AM UTC coverage: 85.38% (-0.05%) from 85.433%
6729693490

push

github

web-flow
Merge pull request #79 from valinurovam/dependabot/npm_and_yarn/admin-frontend/minimist-and-mkdirp-1.2.8

build(deps): bump minimist and mkdirp in /admin-frontend

3171 of 3714 relevant lines covered (85.38%)

9767.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.44
/server/connection.go
1
package server
2

3
import (
4
        "bufio"
5
        "bytes"
6
        "context"
7
        "fmt"
8
        "net"
9
        "sort"
10
        "strings"
11
        "sync"
12
        "sync/atomic"
13
        "time"
14

15
        "github.com/sasha-s/go-deadlock"
16

17
        log "github.com/sirupsen/logrus"
18

19
        "github.com/valinurovam/garagemq/amqp"
20
        "github.com/valinurovam/garagemq/metrics"
21
        "github.com/valinurovam/garagemq/qos"
22
)
23

24
// connection status list
25
const (
26
        ConnStart = iota
27
        ConnStartOK
28
        ConnSecure
29
        ConnSecureOK
30
        ConnTune
31
        ConnTuneOK
32
        ConnOpen
33
        ConnOpenOK
34
        ConnCloseOK
35
        ConnClosed
36
)
37

38
// From https://github.com/rabbitmq/rabbitmq-common/blob/master/src/rabbit_writer.erl
39
// When the amount of protocol method data buffered exceeds
40
// this threshold, a socket flush is performed.
41
//
42
// This magic number is the tcp-over-ethernet MSS (1460) minus the
43
// minimum size of a AMQP 0-9-1 basic.deliver method frame (24) plus basic
44
// content header (22). The idea is that we want to flush just before
45
// exceeding the MSS.
46
const flushThreshold = 1414
47

48
type ConnMetricsState struct {
49
        TrafficIn  *metrics.TrackCounter
50
        TrafficOut *metrics.TrackCounter
51
}
52

53
// Connection represents AMQP-connection
54
type Connection struct {
55
        id               uint64
56
        server           *Server
57
        netConn          *net.TCPConn
58
        logger           *log.Entry
59
        channelsLock     deadlock.RWMutex
60
        channels         map[uint16]*Channel
61
        outgoing         chan *amqp.Frame
62
        clientProperties *amqp.Table
63
        maxChannels      uint16
64
        maxFrameSize     uint32
65
        statusLock       deadlock.RWMutex
66
        status           int
67
        qos              *qos.AmqpQos
68
        virtualHost      *VirtualHost
69
        vhostName        string
70
        closeCh          chan bool
71
        srvMetrics       *SrvMetricsState
72
        metrics          *ConnMetricsState
73
        userName         string
74

75
        wg        *sync.WaitGroup
76
        ctx       context.Context
77
        cancelCtx context.CancelFunc
78

79
        heartbeatInterval uint16
80
        heartbeatTimeout  uint16
81
        heartbeatTimer    *time.Ticker
82

83
        lastOutgoingTS chan time.Time
84
}
85

86
// NewConnection returns new instance of amqp Connection
87
func NewConnection(server *Server, netConn *net.TCPConn) (connection *Connection) {
189✔
88
        connection = &Connection{
189✔
89
                id:                atomic.AddUint64(&server.connSeq, 1),
189✔
90
                server:            server,
189✔
91
                netConn:           netConn,
189✔
92
                channels:          make(map[uint16]*Channel),
189✔
93
                outgoing:          make(chan *amqp.Frame, 128),
189✔
94
                maxChannels:       server.config.Connection.ChannelsMax,
189✔
95
                maxFrameSize:      server.config.Connection.FrameMaxSize,
189✔
96
                qos:               qos.NewAmqpQos(0, 0),
189✔
97
                closeCh:           make(chan bool, 2),
189✔
98
                srvMetrics:        server.metrics,
189✔
99
                wg:                &sync.WaitGroup{},
189✔
100
                lastOutgoingTS:    make(chan time.Time),
189✔
101
                heartbeatInterval: 60, // default value as RabbitMQ default value
189✔
102
        }
189✔
103

189✔
104
        connection.logger = log.WithFields(log.Fields{
189✔
105
                "connectionId": connection.id,
189✔
106
        })
189✔
107

189✔
108
        connection.initMetrics()
189✔
109

189✔
110
        return
189✔
111
}
189✔
112

113
func (conn *Connection) initMetrics() {
189✔
114
        conn.metrics = &ConnMetricsState{
189✔
115
                TrafficIn:  metrics.AddCounter(fmt.Sprintf("conn.%d.traffic_in", conn.id)),
189✔
116
                TrafficOut: metrics.AddCounter(fmt.Sprintf("conn.%d.traffic_out", conn.id)),
189✔
117
        }
189✔
118
}
189✔
119

120
func (conn *Connection) close() {
538✔
121
        conn.statusLock.Lock()
538✔
122
        if conn.status == ConnClosed {
893✔
123
                conn.statusLock.Unlock()
355✔
124
                return
355✔
125
        }
355✔
126

127
        if conn.heartbeatTimer != nil {
360✔
128
                conn.heartbeatTimer.Stop()
177✔
129
        }
177✔
130

131
        conn.status = ConnClosed
183✔
132
        conn.statusLock.Unlock()
183✔
133

183✔
134
        // @todo should we check for errors here? And what should we do if error occur
183✔
135
        _ = conn.netConn.Close()
183✔
136

183✔
137
        if conn.cancelCtx != nil {
362✔
138
                conn.cancelCtx()
179✔
139
        }
179✔
140

141
        conn.wg.Wait()
183✔
142

183✔
143
        // channel0 should we be closed at the end
183✔
144
        channelIds := make([]int, 0)
183✔
145
        conn.channelsLock.Lock()
183✔
146
        for chID := range conn.channels {
452✔
147
                channelIds = append(channelIds, int(chID))
269✔
148
        }
269✔
149
        sort.Sort(sort.Reverse(sort.IntSlice(channelIds)))
183✔
150
        for _, chID := range channelIds {
452✔
151
                channel := conn.channels[uint16(chID)]
269✔
152
                channel.delete()
269✔
153
                delete(conn.channels, uint16(chID))
269✔
154
        }
269✔
155
        conn.channelsLock.Unlock()
183✔
156
        conn.clearQueues()
183✔
157

183✔
158
        conn.logger.WithFields(log.Fields{
183✔
159
                "vhost": conn.vhostName,
183✔
160
                "from":  conn.netConn.RemoteAddr(),
183✔
161
        }).Info("Connection closed")
183✔
162
        conn.server.removeConnection(conn.id)
183✔
163

183✔
164
        conn.closeCh <- true
183✔
165
}
166

167
func (conn *Connection) getChannel(id uint16) *Channel {
198✔
168
        conn.channelsLock.Lock()
198✔
169
        channel := conn.channels[id]
198✔
170
        conn.channelsLock.Unlock()
198✔
171
        return channel
198✔
172
}
198✔
173

174
func (conn *Connection) safeClose() {
180✔
175
        ch := conn.getChannel(0)
180✔
176
        if ch == nil {
186✔
177
                return
6✔
178
        }
6✔
179
        ch.SendMethod(&amqp.ConnectionClose{
174✔
180
                ReplyCode: amqp.ConnectionForced,
174✔
181
                ReplyText: "Server shutdown",
174✔
182
                ClassID:   0,
174✔
183
                MethodID:  0,
174✔
184
        })
174✔
185

174✔
186
        // let clients proper handle connection closing in 10 sec
174✔
187
        timeOut := time.After(10 * time.Second)
174✔
188

174✔
189
        select {
174✔
190
        case <-timeOut:
×
191
                conn.close()
×
192
                return
×
193
        case <-conn.closeCh:
174✔
194
                return
174✔
195
        }
196
}
197

198
func (conn *Connection) clearQueues() {
183✔
199
        virtualHost := conn.GetVirtualHost()
183✔
200
        if virtualHost == nil {
190✔
201
                // it is possible when conn close before open, for example login failure
7✔
202
                return
7✔
203
        }
7✔
204
        for _, queue := range virtualHost.GetQueues() {
291✔
205
                if queue.IsExclusive() && queue.ConnID() == conn.id {
126✔
206
                        _, err := virtualHost.DeleteQueue(queue.GetName(), false, false)
11✔
207
                        if err != nil {
11✔
208
                                // todo: what should server do?
×
209
                                continue
×
210
                        }
211
                        conn.logger.WithFields(log.Fields{
11✔
212
                                "vhost": conn.vhostName,
11✔
213
                                "queue": queue.GetName(),
11✔
214
                        }).Info("Queue deleted by exclusive connection")
11✔
215
                }
216
        }
217
}
218

219
func (conn *Connection) handleConnection() {
189✔
220
        buf := make([]byte, 8)
189✔
221
        _, err := conn.netConn.Read(buf)
189✔
222
        if err != nil {
192✔
223
                conn.logger.WithError(err).WithFields(log.Fields{
3✔
224
                        "read buffer": buf,
3✔
225
                }).Error("Error on read protocol header")
3✔
226
                conn.close()
3✔
227
                return
3✔
228
        }
3✔
229

230
        // @spec-note
231
        // If the server cannot support the protocol specified in the protocol header,
232
        // it MUST respond with a valid protocol header and then close the socket connection.
233
        // The client MUST start a new connection by sending a protocol header
234
        if !bytes.Equal(buf, amqp.AmqpHeader) {
187✔
235
                conn.logger.WithFields(log.Fields{
1✔
236
                        "given":     buf,
1✔
237
                        "supported": amqp.AmqpHeader,
1✔
238
                }).Warn("Unsupported protocol")
1✔
239
                _, _ = conn.netConn.Write(amqp.AmqpHeader)
1✔
240
                conn.close()
1✔
241
                return
1✔
242
        }
1✔
243

244
        conn.ctx, conn.cancelCtx = context.WithCancel(context.Background())
185✔
245

185✔
246
        channel := NewChannel(0, conn)
185✔
247
        conn.channelsLock.Lock()
185✔
248
        conn.channels[channel.id] = channel
185✔
249
        conn.channelsLock.Unlock()
185✔
250

185✔
251
        channel.start()
185✔
252
        conn.wg.Add(1)
185✔
253
        go conn.handleOutgoing()
185✔
254
        conn.wg.Add(1)
185✔
255
        go conn.handleIncoming()
185✔
256
}
257

258
func (conn *Connection) handleOutgoing() {
185✔
259
        defer func() {
364✔
260
                close(conn.lastOutgoingTS)
179✔
261
                conn.wg.Done()
179✔
262
                conn.close()
179✔
263
        }()
179✔
264

265
        var err error
185✔
266
        buffer := bufio.NewWriterSize(conn.netConn, 128<<10)
185✔
267
        for {
1,734✔
268
                select {
1,549✔
269
                case <-conn.ctx.Done():
176✔
270
                        return
176✔
271
                case frame := <-conn.outgoing:
1,367✔
272
                        if frame == nil {
1,367✔
273
                                conn.logger.Warn("unexpected nil frame")
×
274
                                return
×
275
                        }
×
276

277
                        if frame.Type == amqp.FrameHeartbeat {
1,367✔
278
                                conn.logger.Debug("Outgoing -> Heartbeat")
×
279
                        }
×
280

281
                        if err = amqp.WriteFrame(buffer, frame); err != nil && !conn.isClosedError(err) {
1,367✔
282
                                conn.logger.WithError(err).Warn("writing frame")
×
283
                                return
×
284
                        }
×
285

286
                        if frame.CloseAfter {
1,370✔
287
                                if err = buffer.Flush(); err != nil && !conn.isClosedError(err) {
3✔
288
                                        conn.logger.WithError(err).Warn("writing frame")
×
289
                                }
×
290
                                return
3✔
291
                        }
292

293
                        if frame.Sync {
2,372✔
294
                                conn.srvMetrics.TrafficOut.Counter.Inc(int64(buffer.Buffered()))
1,008✔
295
                                conn.metrics.TrafficOut.Counter.Inc(int64(buffer.Buffered()))
1,008✔
296
                                if err = buffer.Flush(); err != nil && !conn.isClosedError(err) {
1,008✔
297
                                        conn.logger.WithError(err).Warn("writing frame")
×
298
                                        return
×
299
                                }
×
300
                        } else {
356✔
301
                                if err = conn.mayBeFlushBuffer(buffer); err != nil && !conn.isClosedError(err) {
356✔
302
                                        conn.logger.WithError(err).Warn("writing frame")
×
303
                                        return
×
304
                                }
×
305
                        }
306

307
                        select {
1,364✔
308
                        case conn.lastOutgoingTS <- time.Now():
553✔
309
                        default:
811✔
310
                        }
311
                }
312
        }
313
}
314

315
func (conn *Connection) mayBeFlushBuffer(buffer *bufio.Writer) (err error) {
356✔
316
        if buffer.Buffered() >= flushThreshold {
356✔
317
                conn.srvMetrics.TrafficOut.Counter.Inc(int64(buffer.Buffered()))
×
318
                conn.metrics.TrafficOut.Counter.Inc(int64(buffer.Buffered()))
×
319
                if err = buffer.Flush(); err != nil {
×
320
                        return err
×
321
                }
×
322
        }
323

324
        if len(conn.outgoing) == 0 {
378✔
325
                // outgoing channel is buffered and we can check is here more messages for store into buffer
22✔
326
                // if nothing to store into buffer - we flush
22✔
327
                conn.srvMetrics.TrafficOut.Counter.Inc(int64(buffer.Buffered()))
22✔
328
                conn.metrics.TrafficOut.Counter.Inc(int64(buffer.Buffered()))
22✔
329
                if err = buffer.Flush(); err != nil {
22✔
330
                        return err
×
331
                }
×
332
        }
333
        return
356✔
334
}
335

336
func (conn *Connection) handleIncoming() {
185✔
337
        defer func() {
364✔
338
                conn.wg.Done()
179✔
339
                conn.close()
179✔
340
        }()
179✔
341

342
        buffer := bufio.NewReaderSize(conn.netConn, 128<<10)
185✔
343

185✔
344
        for {
2,063✔
345
                // TODO
1,878✔
346
                // @spec-note
1,878✔
347
                // After sending connection.close, any received methods except Close and Close­OK MUST be discarded.
1,878✔
348
                // The response to receiving a Close after sending Close must be to send Close­Ok.
1,878✔
349
                frame, err := amqp.ReadFrame(buffer)
1,878✔
350
                if err != nil {
2,057✔
351
                        if err.Error() != "EOF" && !conn.isClosedError(err) {
179✔
352
                                conn.logger.WithError(err).Warn("reading frame")
×
353
                        }
×
354
                        return
179✔
355
                }
356

357
                if conn.status < ConnOpen && frame.ChannelID != 0 {
1,693✔
358
                        conn.logger.WithError(err).Error("Frame not allowed for unopened connection")
×
359
                        return
×
360
                }
×
361
                conn.srvMetrics.TrafficIn.Counter.Inc(int64(len(frame.Payload)))
1,693✔
362
                conn.metrics.TrafficIn.Counter.Inc(int64(len(frame.Payload)))
1,693✔
363

1,693✔
364
                conn.channelsLock.RLock()
1,693✔
365
                channel, ok := conn.channels[frame.ChannelID]
1,693✔
366
                conn.channelsLock.RUnlock()
1,693✔
367

1,693✔
368
                if !ok {
1,786✔
369
                        channel = NewChannel(frame.ChannelID, conn)
93✔
370

93✔
371
                        conn.channelsLock.Lock()
93✔
372
                        conn.channels[frame.ChannelID] = channel
93✔
373
                        conn.channelsLock.Unlock()
93✔
374

93✔
375
                        channel.start()
93✔
376
                }
93✔
377

378
                if conn.heartbeatTimeout > 0 {
2,847✔
379
                        if err = conn.netConn.SetReadDeadline(time.Now().Add(time.Duration(conn.heartbeatTimeout) * time.Second)); err != nil {
1,154✔
380
                                conn.logger.WithError(err).Warn("reading frame")
×
381
                                return
×
382
                        }
×
383
                }
384

385
                if frame.Type == amqp.FrameHeartbeat && frame.ChannelID != 0 {
1,693✔
386
                        return
×
387
                }
×
388

389
                if frame.Type == amqp.FrameHeartbeat {
1,693✔
390
                        conn.logger.Debug("Incoming <- Heartbeat")
×
391
                }
×
392

393
                select {
1,693✔
394
                case <-conn.ctx.Done():
×
395
                        close(channel.incoming)
×
396
                        return
×
397
                case channel.incoming <- frame:
1,693✔
398
                }
399
        }
400
}
401

402
func (conn *Connection) heartBeater() {
183✔
403
        defer conn.wg.Done()
183✔
404

183✔
405
        interval := time.Duration(conn.heartbeatInterval) * time.Second / 2
183✔
406
        conn.heartbeatTimer = time.NewTicker(interval)
183✔
407

183✔
408
        var (
183✔
409
                ok     bool
183✔
410
                lastTs = time.Now()
183✔
411
        )
183✔
412

183✔
413
        heartbeatFrame := &amqp.Frame{Type: byte(amqp.FrameHeartbeat), ChannelID: 0, Payload: []byte{}, CloseAfter: false, Sync: true}
183✔
414

183✔
415
        conn.wg.Add(1)
183✔
416
        go func() {
366✔
417
                defer conn.wg.Done()
183✔
418
                for {
588✔
419
                        lastTs, ok = <-conn.lastOutgoingTS
405✔
420
                        if !ok {
582✔
421
                                return
177✔
422
                        }
177✔
423
                }
424
        }()
425

426
        for {
697✔
427
                select {
514✔
428
                case ts, stillOutgoing := <-conn.lastOutgoingTS:
334✔
429
                        if !stillOutgoing {
337✔
430
                                return
3✔
431
                        }
3✔
432
                        lastTs = ts
331✔
433
                case tickTime := <-conn.heartbeatTimer.C:
×
434
                        if tickTime.Sub(lastTs) >= interval-time.Second {
×
435
                                conn.outgoing <- heartbeatFrame
×
436
                        }
×
437
                case <-conn.ctx.Done():
174✔
438
                        return
174✔
439
                }
440
        }
441
}
442

443
func (conn *Connection) isClosedError(err error) bool {
58✔
444
        // See: https://github.com/golang/go/issues/4373
58✔
445
        return err != nil && strings.Contains(err.Error(), "use of closed network connection")
58✔
446
}
58✔
447

448
func (conn *Connection) GetVirtualHost() *VirtualHost {
1,190✔
449
        return conn.virtualHost
1,190✔
450
}
1,190✔
451

452
func (conn *Connection) GetRemoteAddr() net.Addr {
×
453
        return conn.netConn.RemoteAddr()
×
454
}
×
455

456
func (conn *Connection) GetChannels() map[uint16]*Channel {
×
457
        return conn.channels
×
458
}
×
459

460
func (conn *Connection) GetID() uint64 {
×
461
        return conn.id
×
462
}
×
463

464
func (conn *Connection) GetUsername() string {
×
465
        return conn.userName
×
466
}
×
467

468
// GetMetrics returns metrics
469
func (conn *Connection) GetMetrics() *ConnMetricsState {
×
470
        return conn.metrics
×
471
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc