• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pmorelli92 / bunnify / 25337805379

04 May 2026 07:04PM UTC coverage: 81.227% (-0.5%) from 81.763%
25337805379

Pull #116

github

pmorelli92
fix unbuffered exit
Pull Request #116: Fix/publisher concurrency

20 of 30 new or added lines in 4 files covered. (66.67%)

4 existing lines in 1 file now uncovered.

675 of 831 relevant lines covered (81.23%)

76.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.29
/consumerLoop.go
1
package bunnify
2

3
import (
4
        "encoding/json"
5
        "errors"
6
        "sync"
7
        "time"
8

9
        amqp "github.com/rabbitmq/amqp091-go"
10
)
11

12
func (c *Consumer) loop(channel *amqp.Channel, deliveries <-chan amqp.Delivery) {
113✔
13
        mutex := sync.Mutex{}
113✔
14
        for delivery := range deliveries {
529✔
15
                c.handle(delivery, &mutex)
416✔
16
        }
416✔
17

18
        // If the for exits, it means the channel stopped. Close it and try to
19
        // reconnect — unless the user closed the connection, in which case exit
20
        // cleanly without firing channel-lost / channel-failed notifications.
21
        if !channel.IsClosed() {
113✔
22
                channel.Close()
×
23
        }
×
24

25
        err := c.Consume()
113✔
26
        if errors.Is(err, errConnectionClosedBySystem) {
226✔
27
                return
113✔
28
        }
113✔
29

NEW
30
        notifyChannelLost(c.options.notificationCh, NotificationSourceConsumer)
×
NEW
31
        if err != nil {
×
UNCOV
32
                notifyChannelFailed(c.options.notificationCh, NotificationSourceConsumer, err)
×
UNCOV
33
        }
×
34
}
35

36
func (c *Consumer) parallelLoop(channel *amqp.Channel, deliveries <-chan amqp.Delivery) {
×
37
        mutex := sync.Mutex{}
×
38
        for delivery := range deliveries {
×
39
                go c.handle(delivery, &mutex)
×
40
        }
×
41

UNCOV
42
        if !channel.IsClosed() {
×
UNCOV
43
                channel.Close()
×
44
        }
×
45

NEW
46
        err := c.ConsumeParallel()
×
NEW
47
        if errors.Is(err, errConnectionClosedBySystem) {
×
NEW
48
                return
×
NEW
49
        }
×
50

NEW
51
        notifyChannelLost(c.options.notificationCh, NotificationSourceConsumer)
×
NEW
52
        if err != nil {
×
53
                notifyChannelFailed(c.options.notificationCh, NotificationSourceConsumer, err)
×
54
        }
×
55
}
56

57
func (c *Consumer) handle(delivery amqp.Delivery, mutex *sync.Mutex) {
416✔
58
        startTime := time.Now()
416✔
59
        deliveryInfo := getDeliveryInfo(c.queueName, delivery)
416✔
60
        eventReceived(c.queueName, deliveryInfo.RoutingKey)
416✔
61

416✔
62
        // Establish which handler is invoked
416✔
63
        mutex.Lock()
416✔
64
        handler, ok := c.options.handlers[deliveryInfo.RoutingKey]
416✔
65
        mutex.Unlock()
416✔
66
        if !ok {
419✔
67
                if c.options.defaultHandler == nil {
4✔
68
                        _ = delivery.Nack(false, false)
1✔
69
                        notifyEventHandlerNotFound(c.options.notificationCh, deliveryInfo.RoutingKey)
1✔
70
                        eventWithoutHandler(c.queueName, deliveryInfo.RoutingKey)
1✔
71
                        return
1✔
72
                }
1✔
73
                handler = c.options.defaultHandler
2✔
74
        }
75

76
        uevt := unmarshalEvent{DeliveryInfo: deliveryInfo}
415✔
77

415✔
78
        // For this error to happen an event not published by Bunnify is required
415✔
79
        if err := json.Unmarshal(delivery.Body, &uevt); err != nil {
415✔
80
                _ = delivery.Nack(false, false)
×
81
                eventNotParsable(c.queueName, deliveryInfo.RoutingKey)
×
82
                return
×
83
        }
×
84

85
        tracingCtx := extractToContext(delivery.Headers)
415✔
86
        if err := handler(tracingCtx, uevt); err != nil {
423✔
87
                elapsed := time.Since(startTime).Milliseconds()
8✔
88
                notifyEventHandlerFailed(c.options.notificationCh, deliveryInfo.RoutingKey, elapsed, err)
8✔
89
                _ = delivery.Nack(false, c.shouldRetry(delivery.Headers))
8✔
90
                eventNack(c.queueName, deliveryInfo.RoutingKey, elapsed)
8✔
91
                return
8✔
92
        }
8✔
93

94
        elapsed := time.Since(startTime).Milliseconds()
407✔
95
        notifyEventHandlerSucceed(c.options.notificationCh, deliveryInfo.RoutingKey, elapsed)
407✔
96
        _ = delivery.Ack(false)
407✔
97
        eventAck(c.queueName, deliveryInfo.RoutingKey, elapsed)
407✔
98
}
99

100
func (c *Consumer) shouldRetry(headers amqp.Table) bool {
8✔
101
        if c.options.retries <= 0 {
10✔
102
                return false
2✔
103
        }
2✔
104

105
        // On RabbitMQ 4+, basic.nack with requeue increments x-acquired-count on the
106
        // redelivery. On RabbitMQ 3, the equivalent counter was x-delivery-count.
107
        // Before the first retry, neither header is present (so 0).
108
        count, ok := headers["x-acquired-count"]
6✔
109
        if !ok {
8✔
110
                count, ok = headers["x-delivery-count"]
2✔
111
        }
2✔
112
        if !ok {
8✔
113
                return true
2✔
114
        }
2✔
115

116
        r, _ := count.(int64)
4✔
117
        return c.options.retries > int(r)
4✔
118
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc