• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

valinurovam / garagemq / 10917143521

18 Sep 2024 06:59AM UTC coverage: 85.514% (-0.03%) from 85.541%
10917143521

Pull #117

github

web-flow
build(deps): bump send and express in /admin-frontend

Bumps [send](https://github.com/pillarjs/send) and [express](https://github.com/expressjs/express). These dependencies needed to be updated together.

Updates `send` from 0.18.0 to 0.19.0
- [Release notes](https://github.com/pillarjs/send/releases)
- [Changelog](https://github.com/pillarjs/send/blob/master/HISTORY.md)
- [Commits](https://github.com/pillarjs/send/compare/0.18.0...0.19.0)

Updates `express` from 4.18.2 to 4.21.0
- [Release notes](https://github.com/expressjs/express/releases)
- [Changelog](https://github.com/expressjs/express/blob/4.21.0/History.md)
- [Commits](https://github.com/expressjs/express/compare/4.18.2...4.21.0)

---
updated-dependencies:
- dependency-name: send
  dependency-type: indirect
- dependency-name: express
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Pull Request #117: build(deps): bump send and express in /admin-frontend

3176 of 3714 relevant lines covered (85.51%)

9767.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.44
/queue/queue.go
1
package queue
2

3
import (
4
        "bytes"
5
        "errors"
6
        "fmt"
7
        "sync"
8
        "sync/atomic"
9

10
        "github.com/sasha-s/go-deadlock"
11

12
        "github.com/valinurovam/garagemq/amqp"
13
        "github.com/valinurovam/garagemq/config"
14
        "github.com/valinurovam/garagemq/interfaces"
15
        "github.com/valinurovam/garagemq/metrics"
16
        "github.com/valinurovam/garagemq/qos"
17
        "github.com/valinurovam/garagemq/safequeue"
18
)
19

20
// MetricsState represents current metrics states for queue
21
type MetricsState struct {
22
        Ready    *metrics.TrackCounter
23
        Unacked  *metrics.TrackCounter
24
        Total    *metrics.TrackCounter
25
        Incoming *metrics.TrackCounter
26
        Deliver  *metrics.TrackCounter
27
        Get      *metrics.TrackCounter
28
        Ack      *metrics.TrackCounter
29

30
        ServerReady   *metrics.TrackCounter
31
        ServerUnacked *metrics.TrackCounter
32
        ServerTotal   *metrics.TrackCounter
33
        ServerDeliver *metrics.TrackCounter
34
        ServerAck     *metrics.TrackCounter
35
}
36

37
// Queue is an implementation of the AMQP-queue entity
38
type Queue struct {
39
        safequeue.SafeQueue
40
        name        string
41
        connID      uint64
42
        exclusive   bool
43
        autoDelete  bool
44
        durable     bool
45
        cmrLock     deadlock.RWMutex
46
        consumers   []interfaces.Consumer
47
        consumeExcl bool
48
        call        chan struct{}
49
        wasConsumed bool
50
        shardSize   int
51
        actLock     deadlock.RWMutex
52
        active      bool
53
        // persistent storage
54
        msgPStorage interfaces.MsgStorage
55
        // transient storage
56
        msgTStorage     interfaces.MsgStorage
57
        currentConsumer int
58
        metrics         *MetricsState
59
        autoDeleteQueue chan string
60
        queueLength     int64
61

62
        // lock for sync load swapped-messages from disk
63
        loadSwapLock           deadlock.Mutex
64
        maxMessagesInRAM       uint64
65
        lastStoredMsgID        uint64
66
        lastMemMsgID           uint64
67
        swappedToDisk          bool
68
        maybeLoadFromStorageCh chan struct{}
69
        wg                     *sync.WaitGroup
70
}
71

72
// NewQueue returns new instance of Queue
73
func NewQueue(name string, connID uint64, exclusive bool, autoDelete bool, durable bool, config config.Queue, msgStorageP interfaces.MsgStorage, msgStorageT interfaces.MsgStorage, autoDeleteQueue chan string) *Queue {
39✔
74
        return &Queue{
39✔
75
                SafeQueue:              *safequeue.NewSafeQueue(config.ShardSize),
39✔
76
                name:                   name,
39✔
77
                connID:                 connID,
39✔
78
                exclusive:              exclusive,
39✔
79
                autoDelete:             autoDelete,
39✔
80
                durable:                durable,
39✔
81
                call:                   make(chan struct{}, 1),
39✔
82
                maybeLoadFromStorageCh: make(chan struct{}, 1),
39✔
83
                wasConsumed:            false,
39✔
84
                active:                 false,
39✔
85
                shardSize:              config.ShardSize,
39✔
86
                maxMessagesInRAM:       config.MaxMessagesInRAM,
39✔
87
                msgPStorage:            msgStorageP,
39✔
88
                msgTStorage:            msgStorageT,
39✔
89
                currentConsumer:        0,
39✔
90
                autoDeleteQueue:        autoDeleteQueue,
39✔
91
                swappedToDisk:          false,
39✔
92
                wg:                     &sync.WaitGroup{},
39✔
93
                metrics: &MetricsState{
39✔
94
                        Ready:    metrics.NewTrackCounter(0, true),
39✔
95
                        Unacked:  metrics.NewTrackCounter(0, true),
39✔
96
                        Total:    metrics.NewTrackCounter(0, true),
39✔
97
                        Incoming: metrics.NewTrackCounter(0, true),
39✔
98
                        Deliver:  metrics.NewTrackCounter(0, true),
39✔
99
                        Get:      metrics.NewTrackCounter(0, true),
39✔
100
                        Ack:      metrics.NewTrackCounter(0, true),
39✔
101

39✔
102
                        ServerReady:   metrics.NewTrackCounter(0, true),
39✔
103
                        ServerUnacked: metrics.NewTrackCounter(0, true),
39✔
104
                        ServerTotal:   metrics.NewTrackCounter(0, true),
39✔
105
                        ServerDeliver: metrics.NewTrackCounter(0, true),
39✔
106
                        ServerAck:     metrics.NewTrackCounter(0, true),
39✔
107
                },
39✔
108
        }
39✔
109
}
39✔
110

111
// Start starts base queue loop to send events to consumers
112
// Current consumer to handle message from queue selected by round robin
113
func (queue *Queue) Start() error {
23✔
114
        queue.actLock.Lock()
23✔
115
        defer queue.actLock.Unlock()
23✔
116

23✔
117
        if queue.active {
23✔
118
                return nil
×
119
        }
×
120

121
        queue.active = true
23✔
122
        queue.wg.Add(1)
23✔
123
        go func() {
44✔
124
                defer queue.wg.Done()
21✔
125
                for range queue.call {
641✔
126
                        func() {
1,240✔
127
                                queue.cmrLock.RLock()
620✔
128
                                defer queue.cmrLock.RUnlock()
620✔
129
                                cmrCount := len(queue.consumers)
620✔
130
                                for i := 0; i < cmrCount; i++ {
623✔
131
                                        if !queue.active {
3✔
132
                                                return
×
133
                                        }
×
134
                                        queue.currentConsumer = (queue.currentConsumer + 1) % cmrCount
3✔
135
                                        cmr := queue.consumers[queue.currentConsumer]
3✔
136
                                        if cmr.Consume() {
6✔
137
                                                return
3✔
138
                                        }
3✔
139
                                }
140
                        }()
141
                }
142
        }()
143

144
        queue.wg.Add(1)
23✔
145
        go func() {
44✔
146
                defer queue.wg.Done()
21✔
147
                for range queue.maybeLoadFromStorageCh {
352✔
148
                        queue.mayBeLoadFromStorage()
331✔
149
                }
331✔
150
        }()
151

152
        return nil
23✔
153
}
154

155
// Stop stops main queue loop
156
// After stop no one can send or receive messages from queue
157
func (queue *Queue) Stop() error {
4✔
158
        queue.actLock.Lock()
4✔
159
        defer queue.actLock.Unlock()
4✔
160

4✔
161
        queue.active = false
4✔
162
        close(queue.maybeLoadFromStorageCh)
4✔
163
        close(queue.call)
4✔
164
        queue.wg.Wait()
4✔
165
        return nil
4✔
166
}
4✔
167

168
// GetName returns queue name
169
func (queue *Queue) GetName() string {
2✔
170
        return queue.name
2✔
171
}
2✔
172

173
// Push append message into queue tail and put it into message storage
174
// if queue is durable and message's persistent flag is true
175
func (queue *Queue) Push(message *amqp.Message) {
2,409✔
176
        queue.actLock.Lock()
2,409✔
177
        defer queue.actLock.Unlock()
2,409✔
178

2,409✔
179
        if !queue.active {
2,412✔
180
                return
3✔
181
        }
3✔
182

183
        atomic.AddInt64(&queue.queueLength, 1)
2,406✔
184

2,406✔
185
        queue.metrics.ServerTotal.Counter.Inc(1)
2,406✔
186
        queue.metrics.ServerReady.Counter.Inc(1)
2,406✔
187

2,406✔
188
        queue.metrics.Total.Counter.Inc(1)
2,406✔
189
        queue.metrics.Ready.Counter.Inc(1)
2,406✔
190

2,406✔
191
        message.GenerateSeq()
2,406✔
192

2,406✔
193
        persisted := false
2,406✔
194
        if queue.durable && message.IsPersistent() {
2,457✔
195
                queue.msgPStorage.Add(message, queue.name)
51✔
196
                persisted = true
51✔
197
        } else {
2,406✔
198
                if queue.SafeQueue.Length() > queue.maxMessagesInRAM || queue.swappedToDisk {
2,400✔
199
                        queue.msgTStorage.Add(message, queue.name)
45✔
200
                        persisted = true
45✔
201
                }
45✔
202

203
                if message.ConfirmMeta != nil {
2,355✔
204
                        message.ConfirmMeta.ActualConfirms++
×
205
                }
×
206
        }
207

208
        if persisted && !queue.swappedToDisk && queue.SafeQueue.Length() > queue.maxMessagesInRAM {
2,407✔
209
                queue.swappedToDisk = true
1✔
210
                queue.lastStoredMsgID = message.ID
1✔
211
        }
1✔
212

213
        queue.metrics.Incoming.Counter.Inc(1)
2,406✔
214

2,406✔
215
        if queue.SafeQueue.Length() <= queue.maxMessagesInRAM && !queue.swappedToDisk {
4,723✔
216
                queue.SafeQueue.Push(message)
2,317✔
217
                queue.lastMemMsgID = message.ID
2,317✔
218
        }
2,317✔
219

220
        queue.callConsumers()
2,406✔
221
}
222

223
// Pop returns message from queue head without QOS check
224
func (queue *Queue) Pop() *amqp.Message {
820✔
225
        return queue.PopQos([]*qos.AmqpQos{})
820✔
226
}
820✔
227

228
// PopQos returns message from queue head with QOS check
229
func (queue *Queue) PopQos(qosList []*qos.AmqpQos) *amqp.Message {
1,590✔
230
        queue.actLock.RLock()
1,590✔
231
        if !queue.active {
1,593✔
232
                queue.actLock.RUnlock()
3✔
233
                return nil
3✔
234
        }
3✔
235
        queue.actLock.RUnlock()
1,587✔
236

1,587✔
237
        select {
1,587✔
238
        case queue.maybeLoadFromStorageCh <- struct{}{}:
331✔
239
        default:
1,256✔
240
        }
241

242
        queue.SafeQueue.Lock()
1,587✔
243
        var message *amqp.Message
1,587✔
244
        if message = queue.SafeQueue.HeadItem(); message != nil {
3,173✔
245
                allowed := true
1,586✔
246
                for _, q := range qosList {
2,382✔
247
                        if !q.IsActive() {
1,052✔
248
                                continue
256✔
249
                        }
250
                        if !q.Inc(1, uint32(message.BodySize)) {
1,014✔
251
                                allowed = false
474✔
252
                                break
474✔
253
                        }
254
                }
255

256
                if allowed {
2,698✔
257
                        queue.SafeQueue.DirtyPop()
1,112✔
258
                        atomic.AddInt64(&queue.queueLength, -1)
1,112✔
259
                } else {
1,586✔
260
                        message = nil
474✔
261
                }
474✔
262
        }
263
        queue.SafeQueue.Unlock()
1,587✔
264

1,587✔
265
        return message
1,587✔
266
}
267

268
func (queue *Queue) mayBeLoadFromStorage() {
331✔
269
        swappedToPersistent := true
331✔
270
        swappedToTransient := true
331✔
271

331✔
272
        currentLength := queue.SafeQueue.Length()
331✔
273
        needle := queue.maxMessagesInRAM - currentLength
331✔
274

331✔
275
        if currentLength >= queue.maxMessagesInRAM/2 || needle <= 0 || !queue.swappedToDisk {
653✔
276
                return
322✔
277
        }
322✔
278

279
        pMessages := make([]*amqp.Message, 0, needle)
9✔
280
        tMessages := make([]*amqp.Message, 0, needle)
9✔
281

9✔
282
        var lastIteratedMsgID uint64
9✔
283
        lastMemMsgID := queue.lastMemMsgID
9✔
284

9✔
285
        var wg sync.WaitGroup
9✔
286
        // 2 - search for transient and persistent
9✔
287
        wg.Add(2)
9✔
288

9✔
289
        go func() {
18✔
290
                if currentLength < queue.maxMessagesInRAM/2 && queue.swappedToDisk {
18✔
291
                        iterated := queue.msgPStorage.IterateByQueueFromMsgID(queue.name, queue.lastStoredMsgID, needle, func(message *amqp.Message) {
63✔
292
                                lastIteratedMsgID = message.ID
54✔
293
                                pMessages = append(pMessages, message)
54✔
294
                        })
54✔
295

296
                        if iterated == 0 || lastMemMsgID == lastIteratedMsgID {
9✔
297
                                swappedToPersistent = false
×
298
                        }
×
299
                }
300
                wg.Done()
9✔
301
        }()
302

303
        go func() {
18✔
304
                if currentLength < queue.maxMessagesInRAM/2 && queue.swappedToDisk {
18✔
305
                        iterated := queue.msgTStorage.IterateByQueueFromMsgID(queue.name, queue.lastStoredMsgID, needle, func(message *amqp.Message) {
63✔
306
                                lastIteratedMsgID = message.ID
54✔
307
                                tMessages = append(tMessages, message)
54✔
308
                        })
54✔
309

310
                        if iterated == 0 || lastMemMsgID == lastIteratedMsgID {
9✔
311
                                swappedToTransient = false
×
312
                        }
×
313
                }
314
                wg.Done()
9✔
315
        }()
316

317
        wg.Wait()
9✔
318

9✔
319
        sortedMessages := queue.mergeSortedMessageSlices(pMessages, tMessages)
9✔
320
        sortedMessageslength := uint64(len(sortedMessages))
9✔
321

9✔
322
        var pos uint64
9✔
323
        if sortedMessageslength <= needle {
9✔
324
                pos = sortedMessageslength
×
325
        } else {
9✔
326
                pos = needle
9✔
327
        }
9✔
328

329
        for _, message := range sortedMessages[0:pos] {
63✔
330
                if message.ID == lastMemMsgID {
62✔
331
                        continue
8✔
332
                }
333
                queue.SafeQueue.Push(message)
46✔
334
                queue.lastMemMsgID = message.ID
46✔
335
                queue.lastStoredMsgID = message.ID
46✔
336
                queue.callConsumers()
46✔
337
        }
338

339
        queue.swappedToDisk = swappedToPersistent || swappedToTransient
9✔
340
}
341

342
func (queue *Queue) mergeSortedMessageSlices(A, B []*amqp.Message) []*amqp.Message {
9✔
343
        result := make([]*amqp.Message, len(A)+len(B))
9✔
344

9✔
345
        idxA, idxB := 0, 0
9✔
346

9✔
347
        for i := 0; i < len(result); i++ {
117✔
348
                if idxA >= len(A) {
112✔
349
                        result[i] = B[idxB]
4✔
350
                        idxB++
4✔
351
                        continue
4✔
352
                } else if idxB >= len(B) {
109✔
353
                        result[i] = A[idxA]
5✔
354
                        idxA++
5✔
355
                        continue
5✔
356
                }
357

358
                if A[idxA].ID < B[idxB].ID {
148✔
359
                        result[i] = A[idxA]
49✔
360
                        idxA++
49✔
361
                } else {
99✔
362
                        result[i] = B[idxB]
50✔
363
                        idxB++
50✔
364
                }
50✔
365
        }
366

367
        return result
9✔
368
}
369

370
// LoadFromMsgStorage loads messages into queue from msgstorage
371
func (queue *Queue) LoadFromMsgStorage() {
2✔
372
        iterated := queue.msgPStorage.IterateByQueueFromMsgID(queue.name, 0, queue.maxMessagesInRAM, func(message *amqp.Message) {
212✔
373
                queue.SafeQueue.Push(message)
210✔
374

210✔
375
                queue.lastStoredMsgID = message.ID
210✔
376
                queue.lastMemMsgID = message.ID
210✔
377
        })
210✔
378

379
        if queue.SafeQueue.Length() >= queue.maxMessagesInRAM {
3✔
380
                queue.swappedToDisk = true
1✔
381
        }
1✔
382

383
        if iterated >= queue.maxMessagesInRAM {
3✔
384
                queue.queueLength = int64(queue.msgPStorage.GetQueueLength(queue.name))
1✔
385
        } else {
2✔
386
                queue.queueLength = int64(iterated)
1✔
387
        }
1✔
388
        queue.metrics.ServerTotal.Counter.Inc(queue.queueLength)
2✔
389
        queue.metrics.ServerReady.Counter.Inc(queue.queueLength)
2✔
390

2✔
391
        queue.metrics.Total.Counter.Inc(queue.queueLength)
2✔
392
        queue.metrics.Ready.Counter.Inc(queue.queueLength)
2✔
393
}
394

395
// AckMsg accept ack event for message
396
func (queue *Queue) AckMsg(message *amqp.Message) {
2✔
397
        queue.actLock.RLock()
2✔
398
        if !queue.active {
3✔
399
                queue.actLock.RUnlock()
1✔
400
                return
1✔
401
        }
1✔
402
        queue.actLock.RUnlock()
1✔
403

1✔
404
        if queue.durable && message.IsPersistent() {
2✔
405
                // TODO handle error
1✔
406
                queue.msgPStorage.Del(message, queue.name)
1✔
407
        }
1✔
408

409
        queue.metrics.Ack.Counter.Inc(1)
1✔
410
        queue.metrics.Total.Counter.Dec(1)
1✔
411

1✔
412
        queue.metrics.ServerAck.Counter.Inc(1)
1✔
413
        queue.metrics.ServerTotal.Counter.Dec(1)
1✔
414

1✔
415
        queue.metrics.Unacked.Counter.Dec(1)
1✔
416
        queue.metrics.ServerUnacked.Counter.Dec(1)
1✔
417
}
418

419
// Requeue add message into queue head
420
func (queue *Queue) Requeue(message *amqp.Message) {
257✔
421
        queue.actLock.RLock()
257✔
422
        if !queue.active {
257✔
423
                queue.actLock.RUnlock()
×
424
                return
×
425
        }
×
426
        queue.actLock.RUnlock()
257✔
427

257✔
428
        message.DeliveryCount++
257✔
429
        queue.SafeQueue.PushHead(message)
257✔
430
        if queue.durable && message.IsPersistent() {
258✔
431
                // TODO handle error
1✔
432
                queue.msgPStorage.Update(message, queue.name)
1✔
433
        }
1✔
434
        queue.metrics.Ready.Counter.Inc(1)
257✔
435
        queue.metrics.ServerReady.Counter.Inc(1)
257✔
436

257✔
437
        queue.metrics.Unacked.Counter.Dec(1)
257✔
438
        queue.metrics.ServerUnacked.Counter.Dec(1)
257✔
439

257✔
440
        atomic.AddInt64(&queue.queueLength, 1)
257✔
441

257✔
442
        queue.callConsumers()
257✔
443
}
444

445
// Purge clean queue and message storage for durable queues
446
func (queue *Queue) Purge() (length uint64) {
2✔
447
        queue.SafeQueue.Lock()
2✔
448
        defer queue.SafeQueue.Unlock()
2✔
449
        length = uint64(atomic.LoadInt64(&queue.queueLength))
2✔
450
        queue.SafeQueue.DirtyPurge()
2✔
451

2✔
452
        if queue.durable {
3✔
453
                queue.msgPStorage.PurgeQueue(queue.name)
1✔
454
        }
1✔
455
        queue.metrics.Total.Counter.Dec(int64(length))
2✔
456
        queue.metrics.Ready.Counter.Dec(int64(length))
2✔
457

2✔
458
        queue.metrics.ServerTotal.Counter.Dec(int64(length))
2✔
459
        queue.metrics.ServerReady.Counter.Dec(int64(length))
2✔
460
        atomic.StoreInt64(&queue.queueLength, 0)
2✔
461
        return
2✔
462
}
463

464
// Delete cancel consumers and delete its messages from storage
465
func (queue *Queue) Delete(ifUnused bool, ifEmpty bool) (uint64, error) {
7✔
466
        queue.actLock.Lock()
7✔
467
        queue.cmrLock.Lock()
7✔
468
        queue.SafeQueue.Lock()
7✔
469
        defer queue.actLock.Unlock()
7✔
470
        defer queue.cmrLock.Unlock()
7✔
471
        defer queue.SafeQueue.Unlock()
7✔
472

7✔
473
        queue.active = false
7✔
474

7✔
475
        if ifUnused && len(queue.consumers) != 0 {
8✔
476
                return 0, errors.New("queue has consumers")
1✔
477
        }
1✔
478

479
        if ifEmpty && queue.SafeQueue.DirtyLength() != 0 {
7✔
480
                return 0, errors.New("queue has messages")
1✔
481
        }
1✔
482

483
        queue.cancelConsumers()
5✔
484
        length := uint64(atomic.LoadInt64(&queue.queueLength))
5✔
485

5✔
486
        if queue.durable {
6✔
487
                queue.msgPStorage.PurgeQueue(queue.name)
1✔
488
        }
1✔
489

490
        queue.metrics.Total.Counter.Dec(int64(length))
5✔
491
        queue.metrics.Ready.Counter.Dec(int64(length))
5✔
492

5✔
493
        queue.metrics.ServerTotal.Counter.Dec(int64(length))
5✔
494
        queue.metrics.ServerReady.Counter.Dec(int64(length))
5✔
495

5✔
496
        return length, nil
5✔
497
}
498

499
// AddConsumer add consumer to consumer messages with exclusive check
500
func (queue *Queue) AddConsumer(consumer interfaces.Consumer, exclusive bool) error {
9✔
501
        queue.cmrLock.Lock()
9✔
502
        defer queue.cmrLock.Unlock()
9✔
503

9✔
504
        if !queue.active {
10✔
505
                return fmt.Errorf("queue is not active")
1✔
506
        }
1✔
507
        queue.wasConsumed = true
8✔
508

8✔
509
        if len(queue.consumers) != 0 && (queue.consumeExcl || exclusive) {
10✔
510
                return fmt.Errorf("queue is busy by %d consumers", len(queue.consumers))
2✔
511
        }
2✔
512

513
        if exclusive {
7✔
514
                queue.consumeExcl = true
1✔
515
        }
1✔
516

517
        queue.consumers = append(queue.consumers, consumer)
6✔
518

6✔
519
        queue.callConsumers()
6✔
520
        return nil
6✔
521
}
522

523
// RemoveConsumer remove consumer
524
// If it was last consumer and queue is auto-delete - queue will be removed
525
func (queue *Queue) RemoveConsumer(cTag string) {
3✔
526
        queue.cmrLock.Lock()
3✔
527
        defer queue.cmrLock.Unlock()
3✔
528

3✔
529
        for i, cmr := range queue.consumers {
6✔
530
                if cmr.Tag() == cTag {
5✔
531
                        queue.consumers = append(queue.consumers[:i], queue.consumers[i+1:]...)
2✔
532
                        break
2✔
533
                }
534
        }
535
        cmrCount := len(queue.consumers)
3✔
536
        if cmrCount == 0 {
5✔
537
                queue.currentConsumer = 0
2✔
538
                queue.consumeExcl = false
2✔
539
        } else {
3✔
540
                queue.currentConsumer = (queue.currentConsumer + 1) % cmrCount
1✔
541
        }
1✔
542

543
        if cmrCount == 0 && queue.wasConsumed && queue.autoDelete {
4✔
544
                queue.autoDeleteQueue <- queue.name
1✔
545
        }
1✔
546
}
547

548
// Send event to call next consumer, that it can receive next message
549
func (queue *Queue) callConsumers() {
2,715✔
550
        if !queue.active {
2,715✔
551
                return
×
552
        }
×
553
        select {
2,715✔
554
        case queue.call <- struct{}{}:
622✔
555
        default:
2,093✔
556
        }
557
}
558

559
func (queue *Queue) cancelConsumers() {
5✔
560
        for _, cmr := range queue.consumers {
6✔
561
                cmr.Cancel()
1✔
562
        }
1✔
563
}
564

565
// Length returns queue length
566
func (queue *Queue) Length() uint64 {
7✔
567
        return uint64(atomic.LoadInt64(&queue.queueLength))
7✔
568
}
7✔
569

570
// ConsumersCount returns consumers count
571
func (queue *Queue) ConsumersCount() int {
4✔
572
        queue.cmrLock.RLock()
4✔
573
        defer queue.cmrLock.RUnlock()
4✔
574
        return len(queue.consumers)
4✔
575
}
4✔
576

577
// EqualWithErr returns is given queue equal to current
578
func (queue *Queue) EqualWithErr(qB *Queue) error {
4✔
579
        errTemplate := "inequivalent arg '%s' for queue '%s': received '%t' but current is '%t'"
4✔
580
        if queue.durable != qB.IsDurable() {
5✔
581
                return fmt.Errorf(errTemplate, "durable", queue.name, qB.IsDurable(), queue.durable)
1✔
582
        }
1✔
583
        if queue.autoDelete != qB.autoDelete {
4✔
584
                return fmt.Errorf(errTemplate, "autoDelete", queue.name, qB.autoDelete, queue.autoDelete)
1✔
585
        }
1✔
586
        if queue.exclusive != qB.IsExclusive() {
3✔
587
                return fmt.Errorf(errTemplate, "exclusive", queue.name, qB.IsExclusive(), queue.exclusive)
1✔
588
        }
1✔
589
        return nil
1✔
590
}
591

592
// Marshal returns raw representation of queue to store into storage
593
func (queue *Queue) Marshal(protoVersion string) (data []byte, err error) {
1✔
594
        buf := bytes.NewBuffer(make([]byte, 0))
1✔
595
        if err = amqp.WriteShortstr(buf, queue.name); err != nil {
1✔
596
                return nil, err
×
597
        }
×
598

599
        var autoDelete byte
1✔
600
        if queue.autoDelete {
1✔
601
                autoDelete = 1
×
602
        } else {
1✔
603
                autoDelete = 0
1✔
604
        }
1✔
605

606
        if err = amqp.WriteOctet(buf, autoDelete); err != nil {
1✔
607
                return nil, err
×
608
        }
×
609
        return buf.Bytes(), nil
1✔
610
}
611

612
// Unmarshal returns queue from storage raw bytes data
613
func (queue *Queue) Unmarshal(data []byte, protoVersion string) (err error) {
3✔
614
        buf := bytes.NewReader(data)
3✔
615
        if queue.name, err = amqp.ReadShortstr(buf); err != nil {
4✔
616
                return err
1✔
617
        }
1✔
618

619
        var autoDelete byte
2✔
620

2✔
621
        if autoDelete, err = amqp.ReadOctet(buf); err != nil {
3✔
622
                return err
1✔
623
        }
1✔
624
        queue.autoDelete = autoDelete > 0
1✔
625
        queue.durable = true
1✔
626
        return
1✔
627
}
628

629
// IsDurable returns is queue durable
630
func (queue *Queue) IsDurable() bool {
6✔
631
        return queue.durable
6✔
632
}
6✔
633

634
// IsExclusive returns is queue exclusive
635
func (queue *Queue) IsExclusive() bool {
4✔
636
        return queue.exclusive
4✔
637
}
4✔
638

639
// IsAutoDelete returns is queue should be deleted automatically
640
func (queue *Queue) IsAutoDelete() bool {
1✔
641
        return queue.autoDelete
1✔
642
}
1✔
643

644
// ConnID returns ID of connection that create this queue
645
func (queue *Queue) ConnID() uint64 {
1✔
646
        return queue.connID
1✔
647
}
1✔
648

649
// IsActive returns is queue's main loop is active
650
func (queue *Queue) IsActive() bool {
4✔
651
        return queue.active
4✔
652
}
4✔
653

654
// SetMetrics set external metrics
655
func (queue *Queue) SetMetrics(m *MetricsState) {
1✔
656
        queue.metrics = m
1✔
657
}
1✔
658

659
// GetMetrics returns metrics
660
func (queue *Queue) GetMetrics() *MetricsState {
1✔
661
        return queue.metrics
1✔
662
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc