• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mostafa / xk6-kafka / 10814152930

11 Sep 2024 02:44PM UTC coverage: 75.27%. Remained the same
10814152930

push

github

web-flow
Update deps (#305)

* Update k6 to v0.53.0
* Update Go to v1.23.1
* Update deps
* Update Go version in actions
* Pin package to avoid conflicts

1534 of 2038 relevant lines covered (75.27%)

13.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.95
/stats.go
1
package kafka
2

3
import (
4
        "errors"
5

6
        "go.k6.io/k6/js/modules"
7
        "go.k6.io/k6/metrics"
8
)
9

10
type kafkaMetrics struct {
11
        ReaderDials      *metrics.Metric
12
        ReaderFetches    *metrics.Metric
13
        ReaderMessages   *metrics.Metric
14
        ReaderBytes      *metrics.Metric
15
        ReaderRebalances *metrics.Metric
16
        ReaderTimeouts   *metrics.Metric
17
        ReaderErrors     *metrics.Metric
18

19
        ReaderDialTime   *metrics.Metric
20
        ReaderReadTime   *metrics.Metric
21
        ReaderWaitTime   *metrics.Metric
22
        ReaderFetchSize  *metrics.Metric
23
        ReaderFetchBytes *metrics.Metric
24

25
        ReaderOffset        *metrics.Metric
26
        ReaderLag           *metrics.Metric
27
        ReaderMinBytes      *metrics.Metric
28
        ReaderMaxBytes      *metrics.Metric
29
        ReaderMaxWait       *metrics.Metric
30
        ReaderQueueLength   *metrics.Metric
31
        ReaderQueueCapacity *metrics.Metric
32

33
        WriterWrites   *metrics.Metric
34
        WriterMessages *metrics.Metric
35
        WriterBytes    *metrics.Metric
36
        WriterErrors   *metrics.Metric
37

38
        WriterBatchTime      *metrics.Metric
39
        WriterBatchQueueTime *metrics.Metric
40
        WriterWriteTime      *metrics.Metric
41
        WriterWaitTime       *metrics.Metric
42
        WriterRetries        *metrics.Metric
43
        WriterBatchSize      *metrics.Metric
44
        WriterBatchBytes     *metrics.Metric
45

46
        WriterMaxAttempts  *metrics.Metric
47
        WriterMaxBatchSize *metrics.Metric
48
        WriterBatchTimeout *metrics.Metric
49
        WriterReadTimeout  *metrics.Metric
50
        WriterWriteTimeout *metrics.Metric
51
        WriterRequiredAcks *metrics.Metric
52
        WriterAsync        *metrics.Metric
53
}
54

55
// registerMetrics registers the metrics for the kafka module in the metrics registry
56
// nolint: funlen,maintidx
57
func registerMetrics(vu modules.VU) (kafkaMetrics, error) {
35✔
58
        var err error
35✔
59
        registry := vu.InitEnv().Registry
35✔
60
        kafkaMetrics := kafkaMetrics{}
35✔
61

35✔
62
        if kafkaMetrics.ReaderDials, err = registry.NewMetric(
35✔
63
                "kafka_reader_dial_count", metrics.Counter); err != nil {
35✔
64
                return kafkaMetrics, errors.Unwrap(err)
×
65
        }
×
66

67
        if kafkaMetrics.ReaderFetches, err = registry.NewMetric(
35✔
68
                "kafka_reader_fetches_count", metrics.Counter); err != nil {
35✔
69
                return kafkaMetrics, errors.Unwrap(err)
×
70
        }
×
71

72
        if kafkaMetrics.ReaderMessages, err = registry.NewMetric(
35✔
73
                "kafka_reader_message_count", metrics.Counter); err != nil {
35✔
74
                return kafkaMetrics, errors.Unwrap(err)
×
75
        }
×
76

77
        if kafkaMetrics.ReaderBytes, err = registry.NewMetric(
35✔
78
                "kafka_reader_message_bytes", metrics.Counter, metrics.Data); err != nil {
35✔
79
                return kafkaMetrics, errors.Unwrap(err)
×
80
        }
×
81

82
        if kafkaMetrics.ReaderRebalances, err = registry.NewMetric(
35✔
83
                "kafka_reader_rebalance_count", metrics.Counter); err != nil {
35✔
84
                return kafkaMetrics, errors.Unwrap(err)
×
85
        }
×
86

87
        if kafkaMetrics.ReaderTimeouts, err = registry.NewMetric(
35✔
88
                "kafka_reader_timeouts_count", metrics.Counter); err != nil {
35✔
89
                return kafkaMetrics, errors.Unwrap(err)
×
90
        }
×
91

92
        if kafkaMetrics.ReaderErrors, err = registry.NewMetric(
35✔
93
                "kafka_reader_error_count", metrics.Counter); err != nil {
35✔
94
                return kafkaMetrics, errors.Unwrap(err)
×
95
        }
×
96

97
        if kafkaMetrics.ReaderDialTime, err = registry.NewMetric(
35✔
98
                "kafka_reader_dial_seconds", metrics.Trend, metrics.Time); err != nil {
35✔
99
                return kafkaMetrics, errors.Unwrap(err)
×
100
        }
×
101

102
        if kafkaMetrics.ReaderReadTime, err = registry.NewMetric(
35✔
103
                "kafka_reader_read_seconds", metrics.Trend, metrics.Time); err != nil {
35✔
104
                return kafkaMetrics, errors.Unwrap(err)
×
105
        }
×
106

107
        if kafkaMetrics.ReaderWaitTime, err = registry.NewMetric(
35✔
108
                "kafka_reader_wait_seconds", metrics.Trend, metrics.Time); err != nil {
35✔
109
                return kafkaMetrics, errors.Unwrap(err)
×
110
        }
×
111

112
        if kafkaMetrics.ReaderFetchSize, err = registry.NewMetric(
35✔
113
                "kafka_reader_fetch_size", metrics.Counter); err != nil {
35✔
114
                return kafkaMetrics, errors.Unwrap(err)
×
115
        }
×
116

117
        if kafkaMetrics.ReaderFetchBytes, err = registry.NewMetric(
35✔
118
                "kafka_reader_fetch_bytes", metrics.Counter, metrics.Data); err != nil {
35✔
119
                return kafkaMetrics, errors.Unwrap(err)
×
120
        }
×
121

122
        if kafkaMetrics.ReaderOffset, err = registry.NewMetric(
35✔
123
                "kafka_reader_offset", metrics.Gauge); err != nil {
35✔
124
                return kafkaMetrics, errors.Unwrap(err)
×
125
        }
×
126

127
        if kafkaMetrics.ReaderLag, err = registry.NewMetric(
35✔
128
                "kafka_reader_lag", metrics.Gauge); err != nil {
35✔
129
                return kafkaMetrics, errors.Unwrap(err)
×
130
        }
×
131

132
        if kafkaMetrics.ReaderMinBytes, err = registry.NewMetric(
35✔
133
                "kafka_reader_fetch_bytes_min", metrics.Gauge); err != nil {
35✔
134
                return kafkaMetrics, errors.Unwrap(err)
×
135
        }
×
136

137
        if kafkaMetrics.ReaderMaxBytes, err = registry.NewMetric(
35✔
138
                "kafka_reader_fetch_bytes_max", metrics.Gauge); err != nil {
35✔
139
                return kafkaMetrics, errors.Unwrap(err)
×
140
        }
×
141

142
        if kafkaMetrics.ReaderMaxWait, err = registry.NewMetric(
35✔
143
                "kafka_reader_fetch_wait_max", metrics.Gauge, metrics.Time); err != nil {
35✔
144
                return kafkaMetrics, errors.Unwrap(err)
×
145
        }
×
146

147
        if kafkaMetrics.ReaderQueueLength, err = registry.NewMetric(
35✔
148
                "kafka_reader_queue_length", metrics.Gauge); err != nil {
35✔
149
                return kafkaMetrics, errors.Unwrap(err)
×
150
        }
×
151

152
        if kafkaMetrics.ReaderQueueCapacity, err = registry.NewMetric(
35✔
153
                "kafka_reader_queue_capacity", metrics.Gauge); err != nil {
35✔
154
                return kafkaMetrics, errors.Unwrap(err)
×
155
        }
×
156

157
        if kafkaMetrics.WriterWrites, err = registry.NewMetric(
35✔
158
                "kafka_writer_write_count", metrics.Counter); err != nil {
35✔
159
                return kafkaMetrics, errors.Unwrap(err)
×
160
        }
×
161

162
        if kafkaMetrics.WriterMessages, err = registry.NewMetric(
35✔
163
                "kafka_writer_message_count", metrics.Counter); err != nil {
35✔
164
                return kafkaMetrics, errors.Unwrap(err)
×
165
        }
×
166

167
        if kafkaMetrics.WriterBytes, err = registry.NewMetric(
35✔
168
                "kafka_writer_message_bytes", metrics.Counter, metrics.Data); err != nil {
35✔
169
                return kafkaMetrics, errors.Unwrap(err)
×
170
        }
×
171

172
        if kafkaMetrics.WriterErrors, err = registry.NewMetric(
35✔
173
                "kafka_writer_error_count", metrics.Counter); err != nil {
35✔
174
                return kafkaMetrics, errors.Unwrap(err)
×
175
        }
×
176

177
        if kafkaMetrics.WriterBatchTime, err = registry.NewMetric(
35✔
178
                "kafka_writer_batch_seconds", metrics.Trend, metrics.Time); err != nil {
35✔
179
                return kafkaMetrics, errors.Unwrap(err)
×
180
        }
×
181

182
        if kafkaMetrics.WriterBatchQueueTime, err = registry.NewMetric(
35✔
183
                "kafka_writer_batch_queue_seconds", metrics.Trend, metrics.Time); err != nil {
35✔
184
                return kafkaMetrics, errors.Unwrap(err)
×
185
        }
×
186

187
        if kafkaMetrics.WriterWriteTime, err = registry.NewMetric(
35✔
188
                "kafka_writer_write_seconds", metrics.Trend, metrics.Time); err != nil {
35✔
189
                return kafkaMetrics, errors.Unwrap(err)
×
190
        }
×
191

192
        if kafkaMetrics.WriterWaitTime, err = registry.NewMetric(
35✔
193
                "kafka_writer_wait_seconds", metrics.Trend, metrics.Time); err != nil {
35✔
194
                return kafkaMetrics, errors.Unwrap(err)
×
195
        }
×
196

197
        if kafkaMetrics.WriterRetries, err = registry.NewMetric(
35✔
198
                "kafka_writer_retries_count", metrics.Counter); err != nil {
35✔
199
                return kafkaMetrics, errors.Unwrap(err)
×
200
        }
×
201

202
        if kafkaMetrics.WriterBatchSize, err = registry.NewMetric(
35✔
203
                "kafka_writer_batch_size", metrics.Counter); err != nil {
35✔
204
                return kafkaMetrics, errors.Unwrap(err)
×
205
        }
×
206

207
        if kafkaMetrics.WriterBatchBytes, err = registry.NewMetric(
35✔
208
                "kafka_writer_batch_bytes", metrics.Counter, metrics.Data); err != nil {
35✔
209
                return kafkaMetrics, errors.Unwrap(err)
×
210
        }
×
211

212
        if kafkaMetrics.WriterMaxAttempts, err = registry.NewMetric(
35✔
213
                "kafka_writer_attempts_max", metrics.Gauge); err != nil {
35✔
214
                return kafkaMetrics, errors.Unwrap(err)
×
215
        }
×
216

217
        if kafkaMetrics.WriterMaxBatchSize, err = registry.NewMetric(
35✔
218
                "kafka_writer_batch_max", metrics.Gauge); err != nil {
35✔
219
                return kafkaMetrics, errors.Unwrap(err)
×
220
        }
×
221

222
        if kafkaMetrics.WriterBatchTimeout, err = registry.NewMetric(
35✔
223
                "kafka_writer_batch_timeout", metrics.Gauge, metrics.Time); err != nil {
35✔
224
                return kafkaMetrics, errors.Unwrap(err)
×
225
        }
×
226

227
        if kafkaMetrics.WriterReadTimeout, err = registry.NewMetric(
35✔
228
                "kafka_writer_read_timeout", metrics.Gauge, metrics.Time); err != nil {
35✔
229
                return kafkaMetrics, errors.Unwrap(err)
×
230
        }
×
231

232
        if kafkaMetrics.WriterWriteTimeout, err = registry.NewMetric(
35✔
233
                "kafka_writer_write_timeout", metrics.Gauge, metrics.Time); err != nil {
35✔
234
                return kafkaMetrics, errors.Unwrap(err)
×
235
        }
×
236

237
        if kafkaMetrics.WriterRequiredAcks, err = registry.NewMetric(
35✔
238
                "kafka_writer_acks_required", metrics.Gauge); err != nil {
35✔
239
                return kafkaMetrics, errors.Unwrap(err)
×
240
        }
×
241

242
        if kafkaMetrics.WriterAsync, err = registry.NewMetric(
35✔
243
                "kafka_writer_async", metrics.Rate); err != nil {
35✔
244
                return kafkaMetrics, errors.Unwrap(err)
×
245
        }
×
246

247
        return kafkaMetrics, nil
35✔
248
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc