• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

oliver006 / redis_exporter / 17172001870

23 Aug 2025 05:58AM UTC coverage: 84.921% (-0.9%) from 85.87%
17172001870

Pull #1028

github

web-flow
Merge 891f7f01e into 7632b7b20
Pull Request #1028: sirupsen/log --> log/slog

121 of 249 new or added lines in 18 files covered. (48.59%)

6 existing lines in 1 file now uncovered.

2568 of 3024 relevant lines covered (84.92%)

13254.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.8
/exporter/streams.go
1
package exporter
2

3
import (
4
        "log/slog"
5
        "strconv"
6
        "strings"
7

8
        "github.com/gomodule/redigo/redis"
9
        "github.com/prometheus/client_golang/prometheus"
10
)
11

12
// All fields of the streamInfo struct must be exported
13
// because of redis.ScanStruct (reflect) limitations
14
type streamInfo struct {
15
        Length            int64  `redis:"length"`
16
        RadixTreeKeys     int64  `redis:"radix-tree-keys"`
17
        RadixTreeNodes    int64  `redis:"radix-tree-nodes"`
18
        LastGeneratedId   string `redis:"last-generated-id"`
19
        Groups            int64  `redis:"groups"`
20
        MaxDeletedEntryId string `redis:"max-deleted-entry-id"`
21
        FirstEntryId      string
22
        LastEntryId       string
23
        StreamGroupsInfo  []streamGroupsInfo
24
}
25

26
type streamGroupsInfo struct {
27
        Name                     string `redis:"name"`
28
        Consumers                int64  `redis:"consumers"`
29
        Pending                  int64  `redis:"pending"`
30
        LastDeliveredId          string `redis:"last-delivered-id"`
31
        EntriesRead              int64  `redis:"entries-read"`
32
        Lag                      int64  `redis:"lag"`
33
        StreamGroupConsumersInfo []streamGroupConsumersInfo
34
}
35

36
type streamGroupConsumersInfo struct {
37
        Name    string `redis:"name"`
38
        Pending int64  `redis:"pending"`
39
        Idle    int64  `redis:"idle"`
40
}
41

42
func getStreamInfo(c redis.Conn, key string) (*streamInfo, error) {
17✔
43
        values, err := redis.Values(doRedisCmd(c, "XINFO", "STREAM", key))
17✔
44
        if err != nil {
17✔
45
                return nil, err
×
46
        }
×
47

48
        // Scan slice to struct
49
        var stream streamInfo
17✔
50
        if err := redis.ScanStruct(values, &stream); err != nil {
17✔
51
                return nil, err
×
52
        }
×
53

54
        // Extract first and last id from slice
55
        for idx, v := range values {
351✔
56
                vbytes, ok := v.([]byte)
334✔
57
                if !ok {
452✔
58
                        continue
118✔
59
                }
60
                if string(vbytes) == "first-entry" {
233✔
61
                        stream.FirstEntryId = getStreamEntryId(values, idx+1)
17✔
62
                }
17✔
63
                if string(vbytes) == "last-entry" {
233✔
64
                        stream.LastEntryId = getStreamEntryId(values, idx+1)
17✔
65
                }
17✔
66
        }
67

68
        stream.StreamGroupsInfo, err = scanStreamGroups(c, key)
17✔
69
        if err != nil {
17✔
70
                return nil, err
×
71
        }
×
72

73
        slog.Debug("Retrieved stream info", "stream", &stream)
17✔
74
        return &stream, nil
17✔
75
}
76

77
func getStreamEntryId(redisValue []interface{}, index int) string {
34✔
78
        if values, ok := redisValue[index].([]interface{}); !ok || len(values) < 2 {
34✔
NEW
79
                slog.Debug("Failed to parse StreamEntryId")
×
80
                return ""
×
81
        }
×
82

83
        if len(redisValue) < index || redisValue[index] == nil {
34✔
NEW
84
                slog.Debug("Failed to parse StreamEntryId")
×
85
                return ""
×
86
        }
×
87

88
        entryId, ok := redisValue[index].([]interface{})[0].([]byte)
34✔
89
        if !ok {
34✔
NEW
90
                slog.Debug("Failed to parse StreamEntryId")
×
91
                return ""
×
92
        }
×
93
        return string(entryId)
34✔
94
}
95

96
func scanStreamGroups(c redis.Conn, stream string) ([]streamGroupsInfo, error) {
21✔
97
        groups, err := redis.Values(doRedisCmd(c, "XINFO", "GROUPS", stream))
21✔
98
        if err != nil {
21✔
99
                return nil, err
×
100
        }
×
101

102
        var result []streamGroupsInfo
21✔
103
        for _, g := range groups {
55✔
104
                v, err := redis.Values(g, nil)
34✔
105
                if err != nil {
34✔
NEW
106
                        slog.Error("Couldn't convert group values for stream", "stream", stream, "error", err)
×
107
                        continue
×
108
                }
109
                slog.Debug("streamGroupsInfo value", "value", v)
34✔
110

34✔
111
                var group streamGroupsInfo
34✔
112
                if err := redis.ScanStruct(v, &group); err != nil {
34✔
NEW
113
                        slog.Error("Couldn't scan group in stream", "stream", stream, "error", err)
×
114
                        continue
×
115
                }
116

117
                group.StreamGroupConsumersInfo, err = scanStreamGroupConsumers(c, stream, group.Name)
34✔
118
                if err != nil {
34✔
119
                        return nil, err
×
120
                }
×
121

122
                result = append(result, group)
34✔
123
        }
124

125
        slog.Debug("groups", "result", result)
21✔
126
        return result, nil
21✔
127
}
128

129
func scanStreamGroupConsumers(c redis.Conn, stream string, group string) ([]streamGroupConsumersInfo, error) {
36✔
130
        consumers, err := redis.Values(doRedisCmd(c, "XINFO", "CONSUMERS", stream, group))
36✔
131
        if err != nil {
36✔
132
                return nil, err
×
133
        }
×
134

135
        var result []streamGroupConsumersInfo
36✔
136
        for _, c := range consumers {
87✔
137

51✔
138
                v, err := redis.Values(c, nil)
51✔
139
                if err != nil {
51✔
NEW
140
                        slog.Error("Couldn't convert consumer values for group in stream", "group", group, "stream", stream, "error", err)
×
141
                        continue
×
142
                }
143
                slog.Debug("streamGroupConsumersInfo value", "value", v)
51✔
144

51✔
145
                var consumer streamGroupConsumersInfo
51✔
146
                if err := redis.ScanStruct(v, &consumer); err != nil {
51✔
NEW
147
                        slog.Error("Couldn't scan consumers for group in stream", "group", group, "stream", stream, "error", err)
×
148
                        continue
×
149
                }
150

151
                result = append(result, consumer)
51✔
152
        }
153

154
        slog.Debug("consumers", "result", result)
36✔
155
        return result, nil
36✔
156
}
157

158
func parseStreamItemId(id string) float64 {
104✔
159
        if strings.TrimSpace(id) == "" {
106✔
160
                return 0
2✔
161
        }
2✔
162
        frags := strings.Split(id, "-")
102✔
163
        if len(frags) == 0 {
102✔
NEW
164
                slog.Error("Couldn't parse StreamItemId", "id", id)
×
165
                return 0
×
166
        }
×
167
        parsedId, err := strconv.ParseFloat(strings.Split(id, "-")[0], 64)
102✔
168
        if err != nil {
102✔
NEW
169
                slog.Error("Couldn't parse given StreamItemId", "id", id, "error", err)
×
170
        }
×
171
        return parsedId
102✔
172
}
173

174
func (e *Exporter) extractStreamMetrics(ch chan<- prometheus.Metric, c redis.Conn) {
2,095✔
175
        streams, err := parseKeyArg(e.options.CheckStreams)
2,095✔
176
        if err != nil {
2,095✔
NEW
177
                slog.Error("Couldn't parse given stream keys", "error", err)
×
178
                return
×
179
        }
×
180

181
        singleStreams, err := parseKeyArg(e.options.CheckSingleStreams)
2,095✔
182
        if err != nil {
2,095✔
NEW
183
                slog.Error("Couldn't parse check-single-streams", "error", err)
×
184
                return
×
185
        }
×
186
        allStreams := append([]dbKeyPair{}, singleStreams...)
2,095✔
187

2,095✔
188
        scannedStreams, err := getKeysFromPatterns(c, streams, e.options.CheckKeysBatchSize)
2,095✔
189
        if err != nil {
2,095✔
NEW
190
                slog.Error("Error expanding key patterns", "error", err)
×
191
        } else {
2,095✔
192
                allStreams = append(allStreams, scannedStreams...)
2,095✔
193
        }
2,095✔
194

195
        slog.Debug("allStreams", "allStreams", allStreams)
2,095✔
196
        for _, k := range allStreams {
2,110✔
197
                if _, err := doRedisCmd(c, "SELECT", k.db); err != nil {
15✔
NEW
198
                        slog.Debug("Couldn't select database when getting stream info", "db", k.db)
×
199
                        continue
×
200
                }
201
                info, err := getStreamInfo(c, k.key)
15✔
202
                if err != nil {
15✔
NEW
203
                        slog.Error("couldn't get info for stream", "stream", k.key, "error", err)
×
204
                        continue
×
205
                }
206
                dbLabel := "db" + k.db
15✔
207

15✔
208
                e.registerConstMetricGauge(ch, "stream_length", float64(info.Length), dbLabel, k.key)
15✔
209
                e.registerConstMetricGauge(ch, "stream_radix_tree_keys", float64(info.RadixTreeKeys), dbLabel, k.key)
15✔
210
                e.registerConstMetricGauge(ch, "stream_radix_tree_nodes", float64(info.RadixTreeNodes), dbLabel, k.key)
15✔
211
                e.registerConstMetricGauge(ch, "stream_last_generated_id", parseStreamItemId(info.LastGeneratedId), dbLabel, k.key)
15✔
212
                e.registerConstMetricGauge(ch, "stream_groups", float64(info.Groups), dbLabel, k.key)
15✔
213
                e.registerConstMetricGauge(ch, "stream_max_deleted_entry_id", parseStreamItemId(info.MaxDeletedEntryId), dbLabel, k.key)
15✔
214
                e.registerConstMetricGauge(ch, "stream_first_entry_id", parseStreamItemId(info.FirstEntryId), dbLabel, k.key)
15✔
215
                e.registerConstMetricGauge(ch, "stream_last_entry_id", parseStreamItemId(info.LastEntryId), dbLabel, k.key)
15✔
216

15✔
217
                for _, g := range info.StreamGroupsInfo {
39✔
218
                        e.registerConstMetricGauge(ch, "stream_group_consumers", float64(g.Consumers), dbLabel, k.key, g.Name)
24✔
219
                        e.registerConstMetricGauge(ch, "stream_group_messages_pending", float64(g.Pending), dbLabel, k.key, g.Name)
24✔
220
                        e.registerConstMetricGauge(ch, "stream_group_last_delivered_id", parseStreamItemId(g.LastDeliveredId), dbLabel, k.key, g.Name)
24✔
221
                        e.registerConstMetricGauge(ch, "stream_group_entries_read", float64(g.EntriesRead), dbLabel, k.key, g.Name)
24✔
222
                        e.registerConstMetricGauge(ch, "stream_group_lag", float64(g.Lag), dbLabel, k.key, g.Name)
24✔
223
                        if !e.options.StreamsExcludeConsumerMetrics {
46✔
224
                                for _, c := range g.StreamGroupConsumersInfo {
55✔
225
                                        e.registerConstMetricGauge(ch, "stream_group_consumer_messages_pending", float64(c.Pending), dbLabel, k.key, g.Name, c.Name)
33✔
226
                                        e.registerConstMetricGauge(ch, "stream_group_consumer_idle_seconds", float64(c.Idle)/1e3, dbLabel, k.key, g.Name, c.Name)
33✔
227
                                }
33✔
228
                        }
229
                }
230
        }
231
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc