• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mostafa / xk6-kafka / 19996014785

06 Dec 2025 11:51PM UTC coverage: 73.98% (-0.6%) from 74.604%
19996014785

push

github

web-flow
Update deps (#369)

* Update k6 to v1.4.2
* Update hamba/avro
* Update aws-sdk-go-v2 and aws-sdk-go-v2/config
* Update docs
* Migrate to golangci-lint v2
* Update golangci-lint action
* Rename and update config file (to be correctly picked up)
* Fix issues
* Remove invalid config
* Run kafka in the background
* Remove unnecessary step

92 of 142 new or added lines in 14 files covered. (64.79%)

47 existing lines in 4 files now uncovered.

1578 of 2133 relevant lines covered (73.98%)

14.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.03
/reader.go
1
package kafka
2

3
import (
4
        "context"
5
        "encoding/json"
6
        "errors"
7
        "fmt"
8
        "io"
9
        "strconv"
10
        "time"
11

12
        "github.com/grafana/sobek"
13
        kafkago "github.com/segmentio/kafka-go"
14
        "github.com/sirupsen/logrus"
15
        "go.k6.io/k6/js/common"
16
        "go.k6.io/k6/metrics"
17
)
18

19
var (
20
        // Group balancers.
21
        groupBalancerRange        = "group_balancer_range"
22
        groupBalancerRoundRobin   = "group_balancer_round_robin"
23
        groupBalancerRackAffinity = "group_balancer_rack_affinity"
24

25
        GroupBalancers map[string]kafkago.GroupBalancer
26

27
        // Isolation levels.
28
        isolationLevelReadUncommitted = "isolation_level_read_uncommitted"
29
        isolationLevelReadCommitted   = "isolation_level_read_committed"
30

31
        IsolationLevels map[string]kafkago.IsolationLevel
32

33
        // Start offsets.
34
        lastOffset  = "start_offsets_last_offset"
35
        firstOffset = "start_offsets_first_offset"
36

37
        // StartOffsets determines from whence the consumer group should begin
38
        // consuming when it finds a partition without a committed offset.  If
39
        // non-zero, it must be set to one of FirstOffset or LastOffset.
40
        //
41
        // Default: FirstOffset
42
        //
43
        // Only used when GroupID is set
44
        // Ref: https://github.com/segmentio/kafka-go/blob/a8e5eabf4a90025a4ad2c28e929324d18db21103/reader.go#L481-L488
45
        StartOffsets map[string]int64
46

47
        RebalanceTimeout       = time.Second * 5
48
        HeartbeatInterval      = time.Second * 3
49
        SessionTimeout         = time.Second * 30
50
        PartitionWatchInterval = time.Second * 5
51
        JoinGroupBackoff       = time.Second * 5
52
        RetentionTime          = time.Hour * 24
53
)
54

55
type ReaderConfig struct {
56
        WatchPartitionChanges  bool          `json:"watchPartitionChanges"`
57
        ConnectLogger          bool          `json:"connectLogger"`
58
        Partition              int           `json:"partition"`
59
        QueueCapacity          int           `json:"queueCapacity"`
60
        MinBytes               int           `json:"minBytes"`
61
        MaxBytes               int           `json:"maxBytes"`
62
        MaxAttempts            int           `json:"maxAttempts"`
63
        GroupID                string        `json:"groupId"`
64
        Topic                  string        `json:"topic"`
65
        IsolationLevel         string        `json:"isolationLevel"`
66
        StartOffset            string        `json:"startOffset"`
67
        Offset                 int64         `json:"offset"`
68
        Brokers                []string      `json:"brokers"`
69
        GroupTopics            []string      `json:"groupTopics"`
70
        GroupBalancers         []string      `json:"groupBalancers"`
71
        MaxWait                Duration      `json:"maxWait"`
72
        ReadBatchTimeout       time.Duration `json:"readBatchTimeout"`
73
        ReadLagInterval        time.Duration `json:"readLagInterval"`
74
        HeartbeatInterval      time.Duration `json:"heartbeatInterval"`
75
        CommitInterval         time.Duration `json:"commitInterval"`
76
        PartitionWatchInterval time.Duration `json:"partitionWatchInterval"`
77
        SessionTimeout         time.Duration `json:"sessionTimeout"`
78
        RebalanceTimeout       time.Duration `json:"rebalanceTimeout"`
79
        JoinGroupBackoff       time.Duration `json:"joinGroupBackoff"`
80
        RetentionTime          time.Duration `json:"retentionTime"`
81
        ReadBackoffMin         time.Duration `json:"readBackoffMin"`
82
        ReadBackoffMax         time.Duration `json:"readBackoffMax"`
83
        OffsetOutOfRangeError  bool          `json:"offsetOutOfRangeError"` // deprecated, do not use
84
        SASL                   SASLConfig    `json:"sasl"`
85
        TLS                    TLSConfig     `json:"tls"`
86
}
87

88
// ConsumeConfig is the JSON-decodable set of options accepted by
// Reader.consume.
type ConsumeConfig struct {
	// Limit is the number of messages to read in one call; values <= 0 are
	// treated as 1 by consume().
	Limit int64 `json:"limit"`
	// NanoPrecision formats each message time with time.RFC3339Nano instead
	// of second-precision time.RFC3339.
	NanoPrecision bool `json:"nanoPrecision"`
	// ExpectTimeout makes consume() return the messages collected so far,
	// instead of throwing, when a read runs into the reader's MaxWait deadline.
	ExpectTimeout bool `json:"expectTimeout"`
}
93

94
type Duration struct {
95
        time.Duration
96
}
97

98
func (d Duration) MarshalJSON() ([]byte, error) {
×
NEW
99
        data, err := json.Marshal(d.String())
×
NEW
100
        if err != nil {
×
NEW
101
                return nil, fmt.Errorf("failed to marshal duration: %w", err)
×
NEW
102
        }
×
NEW
103
        return data, nil
×
104
}
105

106
func (d *Duration) UnmarshalJSON(b []byte) error {
1✔
107
        var v any
1✔
108
        if err := json.Unmarshal(b, &v); err != nil {
1✔
NEW
109
                return fmt.Errorf("failed to unmarshal duration: %w", err)
×
110
        }
×
111

112
        switch value := v.(type) {
1✔
113
        case string:
1✔
114
                var err error
1✔
115
                d.Duration, err = time.ParseDuration(value)
1✔
116
                if err != nil {
1✔
NEW
117
                        return fmt.Errorf("failed to parse duration: %w", err)
×
118
                }
×
119
                return nil
1✔
120
        default:
×
NEW
121
                return ErrInvalidDuration
×
122
        }
123
}
124

125
// readerClass is a wrapper around kafkago.reader and acts as a JS constructor
126
// for this extension, thus it must be called with new operator, e.g. new Reader(...).
127
// nolint: funlen
128
func (k *Kafka) readerClass(call sobek.ConstructorCall) *sobek.Object {
1✔
129
        runtime := k.vu.Runtime()
1✔
130
        var readerConfig *ReaderConfig
1✔
131
        if len(call.Arguments) == 0 {
1✔
132
                common.Throw(runtime, ErrNotEnoughArguments)
×
133
        }
×
134

135
        if params, ok := call.Argument(0).Export().(map[string]any); ok {
2✔
136
                if b, err := json.Marshal(params); err != nil {
1✔
137
                        common.Throw(runtime, err)
×
138
                } else {
1✔
139
                        if err = json.Unmarshal(b, &readerConfig); err != nil {
1✔
140
                                common.Throw(runtime, err)
×
141
                        }
×
142
                }
143
        }
144

145
        reader := k.reader(readerConfig)
1✔
146

1✔
147
        readerObject := runtime.NewObject()
1✔
148
        // This is the reader object itself
1✔
149
        if err := readerObject.Set("This", reader); err != nil {
1✔
150
                common.Throw(runtime, err)
×
151
        }
×
152

153
        err := readerObject.Set("consume", func(call sobek.FunctionCall) sobek.Value {
2✔
154
                var consumeConfig *ConsumeConfig
1✔
155
                if len(call.Arguments) == 0 {
1✔
156
                        common.Throw(runtime, ErrNotEnoughArguments)
×
157
                }
×
158

159
                if params, ok := call.Argument(0).Export().(map[string]any); ok {
2✔
160
                        if b, err := json.Marshal(params); err != nil {
1✔
161
                                common.Throw(runtime, err)
×
162
                        } else {
1✔
163
                                if err = json.Unmarshal(b, &consumeConfig); err != nil {
1✔
164
                                        common.Throw(runtime, err)
×
165
                                }
×
166
                        }
167
                }
168

169
                return runtime.ToValue(k.consume(reader, consumeConfig))
1✔
170
        })
171
        if err != nil {
1✔
172
                common.Throw(runtime, err)
×
173
        }
×
174

175
        // This is unnecessary, but it's here for reference purposes
176
        err = readerObject.Set("close", func(_ sobek.FunctionCall) sobek.Value {
2✔
177
                if err := reader.Close(); err != nil {
1✔
178
                        common.Throw(runtime, err)
×
179
                }
×
180

181
                return sobek.Undefined()
1✔
182
        })
183
        if err != nil {
1✔
184
                common.Throw(runtime, err)
×
185
        }
×
186

187
        freeze(readerObject)
1✔
188

1✔
189
        return runtime.ToValue(readerObject).ToObject(runtime)
1✔
190
}
191

192
// reader creates a Kafka reader with the given configuration
193
// nolint: funlen
194
func (k *Kafka) reader(readerConfig *ReaderConfig) *kafkago.Reader {
8✔
195
        dialer, err := GetDialer(readerConfig.SASL, readerConfig.TLS)
8✔
196
        if err != nil {
8✔
197
                if err.Unwrap() != nil {
×
198
                        logger.WithField("error", err).Error(err)
×
199
                }
×
200
                common.Throw(k.vu.Runtime(), err)
×
201
        }
202

203
        if readerConfig.Partition != 0 && readerConfig.GroupID != "" {
8✔
204
                common.Throw(k.vu.Runtime(), ErrPartitionAndGroupID)
×
205
        }
×
206

207
        if readerConfig.Topic != "" && readerConfig.GroupID != "" {
8✔
208
                common.Throw(k.vu.Runtime(), ErrTopicAndGroupID)
×
209
        }
×
210

211
        if readerConfig.GroupID != "" &&
8✔
212
                readerConfig.HeartbeatInterval == 0 {
8✔
UNCOV
213
                readerConfig.HeartbeatInterval = HeartbeatInterval
×
214
        }
×
215

216
        if readerConfig.GroupID != "" && readerConfig.SessionTimeout == 0 {
8✔
217
                readerConfig.SessionTimeout = SessionTimeout
×
218
        }
×
219

220
        if readerConfig.GroupID != "" && readerConfig.RebalanceTimeout == 0 {
8✔
221
                readerConfig.RebalanceTimeout = RebalanceTimeout
×
222
        }
×
223

224
        if readerConfig.GroupID != "" && readerConfig.JoinGroupBackoff == 0 {
8✔
225
                readerConfig.JoinGroupBackoff = JoinGroupBackoff
×
226
        }
×
227

228
        if readerConfig.GroupID != "" && readerConfig.PartitionWatchInterval == 0 {
8✔
229
                readerConfig.PartitionWatchInterval = PartitionWatchInterval
×
230
        }
×
231

232
        if readerConfig.GroupID != "" && readerConfig.RetentionTime == 0 {
8✔
233
                readerConfig.RetentionTime = RetentionTime
×
234
        }
×
235

236
        var groupBalancers []kafkago.GroupBalancer
8✔
237
        if readerConfig.GroupID != "" {
8✔
238
                groupBalancers = make([]kafkago.GroupBalancer, 0, len(readerConfig.GroupBalancers))
×
239
                for _, balancer := range readerConfig.GroupBalancers {
×
240
                        if b, ok := GroupBalancers[balancer]; ok {
×
241
                                groupBalancers = append(groupBalancers, b)
×
242
                        }
×
243
                }
244
                if len(groupBalancers) == 0 {
×
245
                        // Default to [Range, RoundRobin] if no balancer is specified
×
246
                        groupBalancers = append(groupBalancers, GroupBalancers[groupBalancerRange])
×
247
                        groupBalancers = append(groupBalancers, GroupBalancers[groupBalancerRoundRobin])
×
248
                }
×
249
        }
250

251
        isolationLevel := IsolationLevels[isolationLevelReadUncommitted]
8✔
252
        if readerConfig.IsolationLevel != "" {
8✔
253
                isolationLevel = IsolationLevels[readerConfig.IsolationLevel]
×
254
        }
×
255

256
        var startOffset int64 // Will be set if GroupID is specified and valid StartOffset is provided
8✔
257
        if readerConfig.GroupID != "" && readerConfig.StartOffset != "" {
8✔
258
                // Check if StartOffset is a predefined value
×
259
                if predefinedOffset, exists := StartOffsets[readerConfig.StartOffset]; exists {
×
260
                        startOffset = predefinedOffset
×
261
                } else {
×
262
                        // Attempt to parse StartOffset as an integer
×
263
                        parsedOffset, err := strconv.ParseInt(readerConfig.StartOffset, 10, 64)
×
264
                        if err != nil {
×
265
                                wrappedError := NewXk6KafkaError(
×
266
                                        failedParseStartOffset,
×
267
                                        "Failed to parse StartOffset, defaulting to FirstOffset", err)
×
268
                                // Log the error and default to FirstOffset
×
269
                                logger.WithFields(logrus.Fields{
×
270
                                        "error":        err,
×
271
                                        "start_offset": readerConfig.StartOffset,
×
272
                                }).Warn(wrappedError)
×
273
                                startOffset = StartOffsets[firstOffset]
×
274
                        } else {
×
275
                                // Use the parsed offset if valid
×
276
                                startOffset = parsedOffset
×
277
                        }
×
278
                }
279
        }
280

281
        consolidatedConfig := kafkago.ReaderConfig{
8✔
282
                Brokers:                readerConfig.Brokers,
8✔
283
                GroupID:                readerConfig.GroupID,
8✔
284
                GroupTopics:            readerConfig.GroupTopics,
8✔
285
                Topic:                  readerConfig.Topic,
8✔
286
                Partition:              readerConfig.Partition,
8✔
287
                QueueCapacity:          readerConfig.QueueCapacity,
8✔
288
                MinBytes:               readerConfig.MinBytes,
8✔
289
                MaxBytes:               readerConfig.MaxBytes,
8✔
290
                MaxWait:                readerConfig.MaxWait.Duration,
8✔
291
                ReadBatchTimeout:       readerConfig.ReadBatchTimeout,
8✔
292
                ReadLagInterval:        readerConfig.ReadLagInterval,
8✔
293
                GroupBalancers:         groupBalancers,
8✔
294
                HeartbeatInterval:      readerConfig.HeartbeatInterval,
8✔
295
                CommitInterval:         readerConfig.CommitInterval,
8✔
296
                PartitionWatchInterval: readerConfig.PartitionWatchInterval,
8✔
297
                WatchPartitionChanges:  readerConfig.WatchPartitionChanges,
8✔
298
                SessionTimeout:         readerConfig.SessionTimeout,
8✔
299
                RebalanceTimeout:       readerConfig.RebalanceTimeout,
8✔
300
                JoinGroupBackoff:       readerConfig.JoinGroupBackoff,
8✔
301
                RetentionTime:          readerConfig.RetentionTime,
8✔
302
                StartOffset:            startOffset,
8✔
303
                ReadBackoffMin:         readerConfig.ReadBackoffMin,
8✔
304
                ReadBackoffMax:         readerConfig.ReadBackoffMax,
8✔
305
                IsolationLevel:         isolationLevel,
8✔
306
                MaxAttempts:            readerConfig.MaxAttempts,
8✔
307
                OffsetOutOfRangeError:  readerConfig.OffsetOutOfRangeError,
8✔
308
                Dialer:                 dialer,
8✔
309
        }
8✔
310

8✔
311
        if readerConfig.ConnectLogger {
8✔
312
                consolidatedConfig.Logger = logger
×
313
        }
×
314

315
        reader := kafkago.NewReader(consolidatedConfig)
8✔
316

8✔
317
        if readerConfig.Offset > 0 {
8✔
318
                if readerConfig.GroupID == "" {
×
319
                        if err := reader.SetOffset(readerConfig.Offset); err != nil {
×
320
                                wrappedError := NewXk6KafkaError(
×
321
                                        failedSetOffset, "Unable to set offset, yet returning the reader.", err)
×
322
                                logger.WithField("error", wrappedError).Warn(wrappedError)
×
323
                                return reader
×
324
                        }
×
325
                } else {
×
326
                        err := NewXk6KafkaError(
×
327
                                failedSetOffset, "Offset and groupID are mutually exclusive options, "+
×
328
                                        "so offset is not set, yet returning the reader.", nil)
×
329
                        logger.WithField("error", err).Warn(err)
×
330
                        return reader
×
331
                }
×
332
        }
333

334
        return reader
8✔
335
}
336

337
// consume consumes messages from the given reader.
338
// nolint: funlen
339
func (k *Kafka) consume(
340
        reader *kafkago.Reader, consumeConfig *ConsumeConfig,
341
) []map[string]any {
14✔
342
        if state := k.vu.State(); state == nil {
14✔
343
                logger.WithField("error", ErrForbiddenInInitContext).Error(ErrForbiddenInInitContext)
×
344
                common.Throw(k.vu.Runtime(), ErrForbiddenInInitContext)
×
345
        }
×
346

347
        var ctx context.Context
14✔
348
        if ctx = k.vu.Context(); ctx == nil {
14✔
349
                err := NewXk6KafkaError(noContextError, "No context.", nil)
×
350
                logger.WithField("error", err).Info(err)
×
351
                common.Throw(k.vu.Runtime(), err)
×
352
        }
×
353

354
        if consumeConfig.Limit <= 0 {
14✔
355
                consumeConfig.Limit = 1
×
356
        }
×
357

358
        messages := make([]map[string]any, 0)
14✔
359

14✔
360
        maxWait := reader.Config().MaxWait
14✔
361

14✔
362
        for range consumeConfig.Limit {
29✔
363
                ctxWithTimeout, cancel := context.WithTimeout(ctx, maxWait)
15✔
364
                msg, err := reader.ReadMessage(ctxWithTimeout)
15✔
365
                cancel()
15✔
366
                if errors.Is(ctxWithTimeout.Err(), context.DeadlineExceeded) && consumeConfig.ExpectTimeout {
16✔
367
                        k.reportReaderStats(reader.Stats())
1✔
368

1✔
369
                        return messages
1✔
370
                }
1✔
371
                if errors.Is(err, io.EOF) {
15✔
372
                        k.reportReaderStats(reader.Stats())
1✔
373

1✔
374
                        err = NewXk6KafkaError(noMoreMessages, "No more messages.", nil)
1✔
375
                        logger.WithField("error", err).Info(err)
1✔
376
                        common.Throw(k.vu.Runtime(), err)
1✔
377
                }
1✔
378

379
                if err != nil {
15✔
380
                        k.reportReaderStats(reader.Stats())
2✔
381

2✔
382
                        err = NewXk6KafkaError(failedReadMessage, "Unable to read messages.", err)
2✔
383
                        logger.WithField("error", err).Error(err)
2✔
384
                        common.Throw(k.vu.Runtime(), err)
2✔
385
                }
2✔
386

387
                var messageTime string
11✔
388
                if consumeConfig.NanoPrecision {
11✔
389
                        messageTime = msg.Time.Format(time.RFC3339Nano)
×
390
                } else {
11✔
391
                        messageTime = time.Unix(msg.Time.Unix(), 0).Format(time.RFC3339)
11✔
392
                }
11✔
393

394
                // Rest of the fields of a given message
395
                message := map[string]any{
11✔
396
                        "topic":         msg.Topic,
11✔
397
                        "partition":     msg.Partition,
11✔
398
                        "offset":        msg.Offset,
11✔
399
                        "time":          messageTime,
11✔
400
                        "highWaterMark": msg.HighWaterMark,
11✔
401
                        "headers":       make(map[string]any),
11✔
402
                }
11✔
403

11✔
404
                if headers, ok := message["headers"].(map[string]any); ok {
22✔
405
                        for _, header := range msg.Headers {
11✔
406
                                headers[header.Key] = header.Value
×
407
                        }
×
408
                } else {
×
409
                        err = NewXk6KafkaError(failedTypeCast, "Failed to cast to map.", nil)
×
410
                        logger.WithField("error", err).Error(err)
×
411
                }
×
412

413
                if len(msg.Key) > 0 {
13✔
414
                        message["key"] = msg.Key
2✔
415
                }
2✔
416

417
                if len(msg.Value) > 0 {
22✔
418
                        message["value"] = msg.Value
11✔
419
                }
11✔
420

421
                messages = append(messages, message)
11✔
422
        }
423

424
        k.reportReaderStats(reader.Stats())
10✔
425
        return messages
10✔
426
}
427

428
// reportReaderStats reports the reader stats
429
//
430
//nolint:funlen
431
func (k *Kafka) reportReaderStats(currentStats kafkago.ReaderStats) {
14✔
432
        state := k.vu.State()
14✔
433
        if state == nil {
14✔
434
                logger.WithField("error", ErrForbiddenInInitContext).Error(ErrForbiddenInInitContext)
×
435
                common.Throw(k.vu.Runtime(), ErrForbiddenInInitContext)
×
436
        }
×
437

438
        ctx := k.vu.Context()
14✔
439
        if ctx == nil {
14✔
440
                err := NewXk6KafkaError(noContextError, "No context.", nil)
×
441
                logger.WithField("error", err).Info(err)
×
442
                common.Throw(k.vu.Runtime(), err)
×
443
        }
×
444

445
        ctm := k.vu.State().Tags.GetCurrentValues()
14✔
446
        sampleTags := ctm.Tags.With("topic", currentStats.Topic)
14✔
447
        sampleTags = sampleTags.With("clientid", currentStats.ClientID)
14✔
448
        sampleTags = sampleTags.With("partition", currentStats.Partition)
14✔
449

14✔
450
        now := time.Now()
14✔
451
        metrics.PushIfNotDone(ctx, state.Samples, metrics.ConnectedSamples{
14✔
452
                Samples: []metrics.Sample{
14✔
453
                        {
14✔
454
                                Time: now,
14✔
455
                                TimeSeries: metrics.TimeSeries{
14✔
456
                                        Metric: k.metrics.ReaderDials,
14✔
457
                                        Tags:   sampleTags,
14✔
458
                                },
14✔
459
                                Value:    float64(currentStats.Dials),
14✔
460
                                Metadata: ctm.Metadata,
14✔
461
                        },
14✔
462
                        {
14✔
463
                                Time: now,
14✔
464
                                TimeSeries: metrics.TimeSeries{
14✔
465
                                        Metric: k.metrics.ReaderFetches,
14✔
466
                                        Tags:   sampleTags,
14✔
467
                                },
14✔
468
                                Value:    float64(currentStats.Fetches),
14✔
469
                                Metadata: ctm.Metadata,
14✔
470
                        },
14✔
471
                        {
14✔
472
                                Time: now,
14✔
473
                                TimeSeries: metrics.TimeSeries{
14✔
474
                                        Metric: k.metrics.ReaderMessages,
14✔
475
                                        Tags:   sampleTags,
14✔
476
                                },
14✔
477
                                Value:    float64(currentStats.Messages),
14✔
478
                                Metadata: ctm.Metadata,
14✔
479
                        },
14✔
480
                        {
14✔
481
                                Time: now,
14✔
482
                                TimeSeries: metrics.TimeSeries{
14✔
483
                                        Metric: k.metrics.ReaderBytes,
14✔
484
                                        Tags:   sampleTags,
14✔
485
                                },
14✔
486
                                Value:    float64(currentStats.Bytes),
14✔
487
                                Metadata: ctm.Metadata,
14✔
488
                        },
14✔
489
                        {
14✔
490
                                Time: now,
14✔
491
                                TimeSeries: metrics.TimeSeries{
14✔
492
                                        Metric: k.metrics.ReaderRebalances,
14✔
493
                                        Tags:   sampleTags,
14✔
494
                                },
14✔
495
                                Value:    float64(currentStats.Rebalances),
14✔
496
                                Metadata: ctm.Metadata,
14✔
497
                        },
14✔
498
                        {
14✔
499
                                Time: now,
14✔
500
                                TimeSeries: metrics.TimeSeries{
14✔
501
                                        Metric: k.metrics.ReaderTimeouts,
14✔
502
                                        Tags:   sampleTags,
14✔
503
                                },
14✔
504
                                Value:    float64(currentStats.Timeouts),
14✔
505
                                Metadata: ctm.Metadata,
14✔
506
                        },
14✔
507
                        {
14✔
508
                                Time: now,
14✔
509
                                TimeSeries: metrics.TimeSeries{
14✔
510
                                        Metric: k.metrics.ReaderErrors,
14✔
511
                                        Tags:   sampleTags,
14✔
512
                                },
14✔
513
                                Value:    float64(currentStats.Errors),
14✔
514
                                Metadata: ctm.Metadata,
14✔
515
                        },
14✔
516
                        {
14✔
517
                                Time: now,
14✔
518
                                TimeSeries: metrics.TimeSeries{
14✔
519
                                        Metric: k.metrics.ReaderDialTime,
14✔
520
                                        Tags:   sampleTags,
14✔
521
                                },
14✔
522
                                Value:    metrics.D(currentStats.DialTime.Avg),
14✔
523
                                Metadata: ctm.Metadata,
14✔
524
                        },
14✔
525
                        {
14✔
526
                                Time: now,
14✔
527
                                TimeSeries: metrics.TimeSeries{
14✔
528
                                        Metric: k.metrics.ReaderReadTime,
14✔
529
                                        Tags:   sampleTags,
14✔
530
                                },
14✔
531
                                Value:    metrics.D(currentStats.ReadTime.Avg),
14✔
532
                                Metadata: ctm.Metadata,
14✔
533
                        },
14✔
534
                        {
14✔
535
                                Time: now,
14✔
536
                                TimeSeries: metrics.TimeSeries{
14✔
537
                                        Metric: k.metrics.ReaderWaitTime,
14✔
538
                                        Tags:   sampleTags,
14✔
539
                                },
14✔
540
                                Value:    metrics.D(currentStats.WaitTime.Avg),
14✔
541
                                Metadata: ctm.Metadata,
14✔
542
                        },
14✔
543
                        {
14✔
544
                                Time: now,
14✔
545
                                TimeSeries: metrics.TimeSeries{
14✔
546
                                        Metric: k.metrics.ReaderFetchSize,
14✔
547
                                        Tags:   sampleTags,
14✔
548
                                },
14✔
549
                                Value:    float64(currentStats.FetchSize.Avg),
14✔
550
                                Metadata: ctm.Metadata,
14✔
551
                        },
14✔
552
                        {
14✔
553
                                Time: now,
14✔
554
                                TimeSeries: metrics.TimeSeries{
14✔
555
                                        Metric: k.metrics.ReaderFetchBytes,
14✔
556
                                        Tags:   sampleTags,
14✔
557
                                },
14✔
558
                                Value:    float64(currentStats.FetchBytes.Min),
14✔
559
                                Metadata: ctm.Metadata,
14✔
560
                        },
14✔
561
                        {
14✔
562
                                Time: now,
14✔
563
                                TimeSeries: metrics.TimeSeries{
14✔
564
                                        Metric: k.metrics.ReaderFetchBytes,
14✔
565
                                        Tags:   sampleTags,
14✔
566
                                },
14✔
567
                                Value:    float64(currentStats.FetchBytes.Max),
14✔
568
                                Metadata: ctm.Metadata,
14✔
569
                        },
14✔
570
                        {
14✔
571
                                Time: now,
14✔
572
                                TimeSeries: metrics.TimeSeries{
14✔
573
                                        Metric: k.metrics.ReaderOffset,
14✔
574
                                        Tags:   sampleTags,
14✔
575
                                },
14✔
576
                                Value:    float64(currentStats.Offset),
14✔
577
                                Metadata: ctm.Metadata,
14✔
578
                        },
14✔
579
                        {
14✔
580
                                Time: now,
14✔
581
                                TimeSeries: metrics.TimeSeries{
14✔
582
                                        Metric: k.metrics.ReaderLag,
14✔
583
                                        Tags:   sampleTags,
14✔
584
                                },
14✔
585
                                Value:    float64(currentStats.Lag),
14✔
586
                                Metadata: ctm.Metadata,
14✔
587
                        },
14✔
588
                        {
14✔
589
                                Time: now,
14✔
590
                                TimeSeries: metrics.TimeSeries{
14✔
591
                                        Metric: k.metrics.ReaderMinBytes,
14✔
592
                                        Tags:   sampleTags,
14✔
593
                                },
14✔
594
                                Value:    float64(currentStats.MinBytes),
14✔
595
                                Metadata: ctm.Metadata,
14✔
596
                        },
14✔
597
                        {
14✔
598
                                Time: now,
14✔
599
                                TimeSeries: metrics.TimeSeries{
14✔
600
                                        Metric: k.metrics.ReaderMaxBytes,
14✔
601
                                        Tags:   sampleTags,
14✔
602
                                },
14✔
603
                                Value:    float64(currentStats.MaxBytes),
14✔
604
                                Metadata: ctm.Metadata,
14✔
605
                        },
14✔
606
                        {
14✔
607
                                Time: now,
14✔
608
                                TimeSeries: metrics.TimeSeries{
14✔
609
                                        Metric: k.metrics.ReaderMaxWait,
14✔
610
                                        Tags:   sampleTags,
14✔
611
                                },
14✔
612
                                Value:    metrics.D(currentStats.MaxWait),
14✔
613
                                Metadata: ctm.Metadata,
14✔
614
                        },
14✔
615
                        {
14✔
616
                                Time: now,
14✔
617
                                TimeSeries: metrics.TimeSeries{
14✔
618
                                        Metric: k.metrics.ReaderQueueLength,
14✔
619
                                        Tags:   sampleTags,
14✔
620
                                },
14✔
621
                                Value:    float64(currentStats.QueueLength),
14✔
622
                                Metadata: ctm.Metadata,
14✔
623
                        },
14✔
624
                        {
14✔
625
                                Time: now,
14✔
626
                                TimeSeries: metrics.TimeSeries{
14✔
627
                                        Metric: k.metrics.ReaderQueueCapacity,
14✔
628
                                        Tags:   sampleTags,
14✔
629
                                },
14✔
630
                                Value:    float64(currentStats.QueueCapacity),
14✔
631
                                Metadata: ctm.Metadata,
14✔
632
                        },
14✔
633
                },
14✔
634
                Tags: sampleTags,
14✔
635
                Time: now,
14✔
636
        })
14✔
637
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc