• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

beatlabs / patron / 3775912372

pending completion
3775912372

Pull #598

github

GitHub
Merge 140bb99b3 into 0b7963476
Pull Request #598: Simplify examples

6268 of 7150 relevant lines covered (87.66%)

444.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.0
/component/async/kafka/simple/duration_client.go
1
package simple
2

3
import (
4
        "context"
5
        "errors"
6
        "fmt"
7
        "time"
8

9
        "github.com/beatlabs/patron/log"
10
)
11

12
type durationClient struct {
13
        client durationKafkaClientAPI
14
}
15

16
func newDurationClient(client durationKafkaClientAPI) (durationClient, error) {
11✔
17
        if client == nil {
11✔
18
                return durationClient{}, errors.New("empty client api")
×
19
        }
×
20
        return durationClient{client: client}, nil
11✔
21
}
22

23
func (d durationClient) getTimeBasedOffsetsPerPartition(ctx context.Context, topic string, since time.Time, timeExtractor TimeExtractor) (map[int32]int64, error) {
11✔
24
        partitionIDs, err := d.client.getPartitionIDs(topic)
11✔
25
        if err != nil {
12✔
26
                return nil, err
1✔
27
        }
1✔
28

29
        responseCh := make(chan partitionOffsetResponse, len(partitionIDs))
10✔
30
        d.triggerWorkers(ctx, topic, since, timeExtractor, partitionIDs, responseCh)
10✔
31
        return d.aggregateResponses(ctx, partitionIDs, responseCh)
10✔
32
}
33

34
// partitionOffsetResponse carries the outcome of resolving the offset of a
// single partition.
type partitionOffsetResponse struct {
	// partitionID identifies the partition this response belongs to.
	partitionID int32
	// offset is the offset resolved for the partition.
	offset int64
	// err is non-nil when resolving the offset failed.
	err error
}
39

40
func (d durationClient) triggerWorkers(ctx context.Context, topic string, since time.Time, timeExtractor TimeExtractor, partitionIDs []int32, responseCh chan<- partitionOffsetResponse) {
10✔
41
        for _, partitionID := range partitionIDs {
22✔
42
                partitionID := partitionID
12✔
43
                go func() {
24✔
44
                        offset, err := d.getTimeBasedOffset(ctx, topic, since, partitionID, timeExtractor)
12✔
45
                        select {
12✔
46
                        case <-ctx.Done():
×
47
                                return
×
48
                        case responseCh <- partitionOffsetResponse{
49
                                partitionID: partitionID,
50
                                offset:      offset,
51
                                err:         err,
52
                        }:
12✔
53
                        }
54
                }()
55
        }
56
}
57

58
func (d durationClient) aggregateResponses(ctx context.Context, partitionIDs []int32, responseCh <-chan partitionOffsetResponse) (map[int32]int64, error) {
10✔
59
        numberOfPartitions := len(partitionIDs)
10✔
60
        offsets := make(map[int32]int64, numberOfPartitions)
10✔
61
        numberOfResponses := 0
10✔
62
        for {
22✔
63
                select {
12✔
64
                case <-ctx.Done():
1✔
65
                        return nil, fmt.Errorf("context cancelled before collecting partition responses: %w", ctx.Err())
1✔
66
                case response := <-responseCh:
11✔
67
                        if response.err != nil {
14✔
68
                                return nil, response.err
3✔
69
                        }
3✔
70

71
                        offsets[response.partitionID] = response.offset
8✔
72
                        numberOfResponses++
8✔
73
                        if numberOfResponses == numberOfPartitions {
14✔
74
                                return offsets, nil
6✔
75
                        }
6✔
76
                }
77
        }
78
}
79

80
func (d durationClient) getTimeBasedOffset(ctx context.Context, topic string, since time.Time, partitionID int32, timeExtractor TimeExtractor) (int64, error) {
12✔
81
        left, err := d.client.getOldestOffset(topic, partitionID)
12✔
82
        if err != nil {
13✔
83
                return 0, err
1✔
84
        }
1✔
85

86
        newestOffset, err := d.client.getNewestOffset(topic, partitionID)
11✔
87
        if err != nil {
12✔
88
                return 0, err
1✔
89
        }
1✔
90
        // The right boundary must be inclusive
91
        right := newestOffset - 1
10✔
92

10✔
93
        return d.offsetBinarySearch(ctx, topic, since, partitionID, timeExtractor, left, right)
10✔
94
}
95

96
func (d durationClient) offsetBinarySearch(ctx context.Context, topic string, since time.Time, partitionID int32, timeExtractor TimeExtractor, left, right int64) (int64, error) {
10✔
97
        for left <= right {
36✔
98
                mid := left + (right-left)/2
26✔
99

26✔
100
                msg, err := d.client.getMessageAtOffset(ctx, topic, partitionID, mid)
26✔
101
                if err != nil {
28✔
102
                        // Under extraordinary circumstances (e.g. the retention policy being applied just before retrieving the message at a particular offset),
2✔
103
                        // the offset might not be accessible anymore.
2✔
104
                        // In this case, we simply log a warning and restrict the interval to the right.
2✔
105
                        if errors.Is(err, &outOfRangeOffsetError{}) {
3✔
106
                                log.Warnf("offset %d on partition %d is out of range: %v", mid, partitionID, err)
1✔
107
                                left = mid + 1
1✔
108
                                continue
1✔
109
                        }
110
                        return 0, fmt.Errorf("error while retrieving message offset %d on partition %d: %w", mid, partitionID, err)
1✔
111
                }
112

113
                t, err := timeExtractor(msg)
24✔
114
                if err != nil {
26✔
115
                        log.FromContext(ctx).Warnf("error while executing comparator: %v", err)
2✔
116
                        // In case of a failure, we compress the range so that the next calculated mid is different
2✔
117
                        left++
2✔
118
                        continue
2✔
119
                }
120

121
                if t.Equal(since) {
23✔
122
                        return mid, nil
1✔
123
                }
1✔
124
                if t.Before(since) {
32✔
125
                        left = mid + 1
11✔
126
                } else {
21✔
127
                        right = mid - 1
10✔
128
                }
10✔
129
        }
130

131
        return left, nil
8✔
132
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc