• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pmorelli92 / bunnify / 25373849739

05 May 2026 11:32AM UTC coverage: 80.976% (-0.8%) from 81.763%
25373849739

push

github

web-flow
Fix/publisher concurrency (#116)

20 of 30 new or added lines in 4 files covered. (66.67%)

4 existing lines in 1 file now uncovered.

664 of 820 relevant lines covered (80.98%)

76.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.29
/consumerLoop.go
1
package bunnify
2

3
import (
4
        "encoding/json"
5
        "errors"
6
        "sync"
7
        "time"
8

9
        amqp "github.com/rabbitmq/amqp091-go"
10
)
11

12
func (c *Consumer) loop(channel *amqp.Channel, deliveries <-chan amqp.Delivery) {
113✔
13
        mutex := sync.Mutex{}
113✔
14
        for delivery := range deliveries {
529✔
15
                c.handle(delivery, &mutex)
416✔
16
        }
416✔
17

18
        // If the for exits, it means the channel stopped. Close it and try to reconnect
19
        if !channel.IsClosed() {
113✔
20
                channel.Close()
×
21
        }
×
22

23
        err := c.Consume()
113✔
24
        if errors.Is(err, errConnectionClosedByUser) {
226✔
25
                return
113✔
26
        }
113✔
27

NEW
28
        notifyChannelLost(c.options.notificationCh, NotificationSourceConsumer)
×
NEW
29
        if err != nil {
×
UNCOV
30
                notifyChannelFailed(c.options.notificationCh, NotificationSourceConsumer, err)
×
UNCOV
31
        }
×
32
}
33

34
func (c *Consumer) parallelLoop(channel *amqp.Channel, deliveries <-chan amqp.Delivery) {
×
35
        mutex := sync.Mutex{}
×
36
        for delivery := range deliveries {
×
37
                go c.handle(delivery, &mutex)
×
38
        }
×
39

UNCOV
40
        if !channel.IsClosed() {
×
UNCOV
41
                channel.Close()
×
42
        }
×
43

NEW
44
        err := c.ConsumeParallel()
×
NEW
45
        if errors.Is(err, errConnectionClosedByUser) {
×
NEW
46
                return
×
NEW
47
        }
×
48

NEW
49
        notifyChannelLost(c.options.notificationCh, NotificationSourceConsumer)
×
NEW
50
        if err != nil {
×
51
                notifyChannelFailed(c.options.notificationCh, NotificationSourceConsumer, err)
×
52
        }
×
53
}
54

55
func (c *Consumer) handle(delivery amqp.Delivery, mutex *sync.Mutex) {
416✔
56
        startTime := time.Now()
416✔
57
        deliveryInfo := getDeliveryInfo(c.queueName, delivery)
416✔
58
        eventReceived(c.queueName, deliveryInfo.RoutingKey)
416✔
59

416✔
60
        // Establish which handler is invoked
416✔
61
        mutex.Lock()
416✔
62
        handler, ok := c.options.handlers[deliveryInfo.RoutingKey]
416✔
63
        mutex.Unlock()
416✔
64
        if !ok {
419✔
65
                if c.options.defaultHandler == nil {
4✔
66
                        _ = delivery.Nack(false, false)
1✔
67
                        notifyEventHandlerNotFound(c.options.notificationCh, deliveryInfo.RoutingKey)
1✔
68
                        eventWithoutHandler(c.queueName, deliveryInfo.RoutingKey)
1✔
69
                        return
1✔
70
                }
1✔
71
                handler = c.options.defaultHandler
2✔
72
        }
73

74
        uevt := unmarshalEvent{DeliveryInfo: deliveryInfo}
415✔
75

415✔
76
        // For this error to happen an event not published by Bunnify is required
415✔
77
        if err := json.Unmarshal(delivery.Body, &uevt); err != nil {
415✔
78
                _ = delivery.Nack(false, false)
×
79
                eventNotParsable(c.queueName, deliveryInfo.RoutingKey)
×
80
                return
×
81
        }
×
82

83
        tracingCtx := extractToContext(delivery.Headers)
415✔
84
        if err := handler(tracingCtx, uevt); err != nil {
423✔
85
                elapsed := time.Since(startTime).Milliseconds()
8✔
86
                notifyEventHandlerFailed(c.options.notificationCh, deliveryInfo.RoutingKey, elapsed, err)
8✔
87
                _ = delivery.Nack(false, c.shouldRetry(delivery.Headers))
8✔
88
                eventNack(c.queueName, deliveryInfo.RoutingKey, elapsed)
8✔
89
                return
8✔
90
        }
8✔
91

92
        elapsed := time.Since(startTime).Milliseconds()
407✔
93
        notifyEventHandlerSucceed(c.options.notificationCh, deliveryInfo.RoutingKey, elapsed)
407✔
94
        _ = delivery.Ack(false)
407✔
95
        eventAck(c.queueName, deliveryInfo.RoutingKey, elapsed)
407✔
96
}
97

98
func (c *Consumer) shouldRetry(headers amqp.Table) bool {
8✔
99
        if c.options.retries <= 0 {
10✔
100
                return false
2✔
101
        }
2✔
102

103
        // On RabbitMQ 4+, basic.nack with requeue increments x-acquired-count on the
104
        // redelivery. On RabbitMQ 3, the equivalent counter was x-delivery-count.
105
        // Before the first retry, neither header is present (so 0).
106
        count, ok := headers["x-acquired-count"]
6✔
107
        if !ok {
8✔
108
                count, ok = headers["x-delivery-count"]
2✔
109
        }
2✔
110
        if !ok {
8✔
111
                return true
2✔
112
        }
2✔
113

114
        r, _ := count.(int64)
4✔
115
        return c.options.retries > int(r)
4✔
116
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc