• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mostafa / xk6-kafka / 10814152930

11 Sep 2024 02:44PM UTC coverage: 75.27%. Remained the same
10814152930

push

github

web-flow
Update deps (#305)

* Update k6 to v0.53.0
* Update Go to v1.23.1
* Update deps
* Update Go version in actions
* Pin package to avoid conflicts

1534 of 2038 relevant lines covered (75.27%)

13.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.07
/reader.go
1
package kafka
2

3
import (
4
        "context"
5
        "encoding/json"
6
        "errors"
7
        "io"
8
        "time"
9

10
        "github.com/grafana/sobek"
11
        kafkago "github.com/segmentio/kafka-go"
12
        "go.k6.io/k6/js/common"
13
        "go.k6.io/k6/metrics"
14
)
15

16
var (
17
        // Group balancers.
18
        groupBalancerRange        = "group_balancer_range"
19
        groupBalancerRoundRobin   = "group_balancer_round_robin"
20
        groupBalancerRackAffinity = "group_balancer_rack_affinity"
21

22
        GroupBalancers map[string]kafkago.GroupBalancer
23

24
        // Isolation levels.
25
        isolationLevelReadUncommitted = "isolation_level_read_uncommitted"
26
        isolationLevelReadCommitted   = "isolation_level_read_committed"
27

28
        IsolationLevels map[string]kafkago.IsolationLevel
29

30
        // Start offsets.
31
        lastOffset  = "start_offset_last_offset"
32
        firstOffset = "start_offset_first_offset"
33

34
        StartOffsets map[string]int64
35

36
        RebalanceTimeout       = time.Second * 5
37
        HeartbeatInterval      = time.Second * 3
38
        SessionTimeout         = time.Second * 30
39
        PartitionWatchInterval = time.Second * 5
40
        JoinGroupBackoff       = time.Second * 5
41
        RetentionTime          = time.Hour * 24
42
)
43

44
type ReaderConfig struct {
45
        WatchPartitionChanges  bool          `json:"watchPartitionChanges"`
46
        ConnectLogger          bool          `json:"connectLogger"`
47
        Partition              int           `json:"partition"`
48
        QueueCapacity          int           `json:"queueCapacity"`
49
        MinBytes               int           `json:"minBytes"`
50
        MaxBytes               int           `json:"maxBytes"`
51
        MaxAttempts            int           `json:"maxAttempts"`
52
        GroupID                string        `json:"groupId"`
53
        Topic                  string        `json:"topic"`
54
        IsolationLevel         string        `json:"isolationLevel"`
55
        StartOffset            string        `json:"startOffset"`
56
        Offset                 int64         `json:"offset"`
57
        Brokers                []string      `json:"brokers"`
58
        GroupTopics            []string      `json:"groupTopics"`
59
        GroupBalancers         []string      `json:"groupBalancers"`
60
        MaxWait                Duration      `json:"maxWait"`
61
        ReadBatchTimeout       time.Duration `json:"readBatchTimeout"`
62
        ReadLagInterval        time.Duration `json:"readLagInterval"`
63
        HeartbeatInterval      time.Duration `json:"heartbeatInterval"`
64
        CommitInterval         time.Duration `json:"commitInterval"`
65
        PartitionWatchInterval time.Duration `json:"partitionWatchInterval"`
66
        SessionTimeout         time.Duration `json:"sessionTimeout"`
67
        RebalanceTimeout       time.Duration `json:"rebalanceTimeout"`
68
        JoinGroupBackoff       time.Duration `json:"joinGroupBackoff"`
69
        RetentionTime          time.Duration `json:"retentionTime"`
70
        ReadBackoffMin         time.Duration `json:"readBackoffMin"`
71
        ReadBackoffMax         time.Duration `json:"readBackoffMax"`
72
        OffsetOutOfRangeError  bool          `json:"offsetOutOfRangeError"` // deprecated, do not use
73
        SASL                   SASLConfig    `json:"sasl"`
74
        TLS                    TLSConfig     `json:"tls"`
75
}
76

77
type ConsumeConfig struct {
78
        Limit         int64 `json:"limit"`
79
        NanoPrecision bool  `json:"nanoPrecision"`
80
}
81

82
type Duration struct {
83
        time.Duration
84
}
85

86
func (d Duration) MarshalJSON() ([]byte, error) {
×
87
        return json.Marshal(d.String())
×
88
}
×
89

90
func (d *Duration) UnmarshalJSON(b []byte) error {
1✔
91
        var v interface{}
1✔
92
        if err := json.Unmarshal(b, &v); err != nil {
1✔
93
                return err
×
94
        }
×
95

96
        switch value := v.(type) {
1✔
97
        case string:
1✔
98
                var err error
1✔
99
                d.Duration, err = time.ParseDuration(value)
1✔
100
                if err != nil {
1✔
101
                        return err
×
102
                }
×
103
                return nil
1✔
104
        default:
×
105
                return errors.New("invalid duration")
×
106
        }
107
}
108

109
// readerClass is a wrapper around kafkago.reader and acts as a JS constructor
110
// for this extension, thus it must be called with new operator, e.g. new Reader(...).
111
// nolint: funlen
112
func (k *Kafka) readerClass(call sobek.ConstructorCall) *sobek.Object {
1✔
113
        runtime := k.vu.Runtime()
1✔
114
        var readerConfig *ReaderConfig
1✔
115
        if len(call.Arguments) == 0 {
1✔
116
                common.Throw(runtime, ErrNotEnoughArguments)
×
117
        }
×
118

119
        if params, ok := call.Argument(0).Export().(map[string]interface{}); ok {
2✔
120
                if b, err := json.Marshal(params); err != nil {
1✔
121
                        common.Throw(runtime, err)
×
122
                } else {
1✔
123
                        if err = json.Unmarshal(b, &readerConfig); err != nil {
1✔
124
                                common.Throw(runtime, err)
×
125
                        }
×
126
                }
127
        }
128

129
        reader := k.reader(readerConfig)
1✔
130

1✔
131
        readerObject := runtime.NewObject()
1✔
132
        // This is the reader object itself
1✔
133
        if err := readerObject.Set("This", reader); err != nil {
1✔
134
                common.Throw(runtime, err)
×
135
        }
×
136

137
        err := readerObject.Set("consume", func(call sobek.FunctionCall) sobek.Value {
2✔
138
                var consumeConfig *ConsumeConfig
1✔
139
                if len(call.Arguments) == 0 {
1✔
140
                        common.Throw(runtime, ErrNotEnoughArguments)
×
141
                }
×
142

143
                if params, ok := call.Argument(0).Export().(map[string]interface{}); ok {
2✔
144
                        if b, err := json.Marshal(params); err != nil {
1✔
145
                                common.Throw(runtime, err)
×
146
                        } else {
1✔
147
                                if err = json.Unmarshal(b, &consumeConfig); err != nil {
1✔
148
                                        common.Throw(runtime, err)
×
149
                                }
×
150
                        }
151
                }
152

153
                return runtime.ToValue(k.consume(reader, consumeConfig))
1✔
154
        })
155
        if err != nil {
1✔
156
                common.Throw(runtime, err)
×
157
        }
×
158

159
        // This is unnecessary, but it's here for reference purposes
160
        err = readerObject.Set("close", func(call sobek.FunctionCall) sobek.Value {
2✔
161
                if err := reader.Close(); err != nil {
1✔
162
                        common.Throw(runtime, err)
×
163
                }
×
164

165
                return sobek.Undefined()
1✔
166
        })
167
        if err != nil {
1✔
168
                common.Throw(runtime, err)
×
169
        }
×
170

171
        freeze(readerObject)
1✔
172

1✔
173
        return runtime.ToValue(readerObject).ToObject(runtime)
1✔
174
}
175

176
// reader creates a Kafka reader with the given configuration
177
// nolint: funlen
178
func (k *Kafka) reader(readerConfig *ReaderConfig) *kafkago.Reader {
7✔
179
        dialer, err := GetDialer(readerConfig.SASL, readerConfig.TLS)
7✔
180
        if err != nil {
7✔
181
                if err.Unwrap() != nil {
×
182
                        logger.WithField("error", err).Error(err)
×
183
                }
×
184
                common.Throw(k.vu.Runtime(), err)
×
185
        }
186

187
        if readerConfig.Partition != 0 && readerConfig.GroupID != "" {
7✔
188
                common.Throw(k.vu.Runtime(), ErrPartitionAndGroupID)
×
189
        }
×
190

191
        if readerConfig.Topic != "" && readerConfig.GroupID != "" {
7✔
192
                common.Throw(k.vu.Runtime(), ErrTopicAndGroupID)
×
193
        }
×
194

195
        if readerConfig.GroupID != "" &&
7✔
196
                len(readerConfig.GroupTopics) >= 0 &&
7✔
197
                readerConfig.HeartbeatInterval == 0 {
7✔
198
                readerConfig.HeartbeatInterval = HeartbeatInterval
×
199
        }
×
200

201
        if readerConfig.GroupID != "" && readerConfig.SessionTimeout == 0 {
7✔
202
                readerConfig.SessionTimeout = SessionTimeout
×
203
        }
×
204

205
        if readerConfig.GroupID != "" && readerConfig.RebalanceTimeout == 0 {
7✔
206
                readerConfig.RebalanceTimeout = RebalanceTimeout
×
207
        }
×
208

209
        if readerConfig.GroupID != "" && readerConfig.JoinGroupBackoff == 0 {
7✔
210
                readerConfig.JoinGroupBackoff = JoinGroupBackoff
×
211
        }
×
212

213
        if readerConfig.GroupID != "" && readerConfig.PartitionWatchInterval == 0 {
7✔
214
                readerConfig.PartitionWatchInterval = PartitionWatchInterval
×
215
        }
×
216

217
        if readerConfig.GroupID != "" && readerConfig.RetentionTime == 0 {
7✔
218
                readerConfig.RetentionTime = RetentionTime
×
219
        }
×
220

221
        var groupBalancers []kafkago.GroupBalancer
7✔
222
        if readerConfig.GroupID != "" {
7✔
223
                groupBalancers = make([]kafkago.GroupBalancer, 0, len(readerConfig.GroupBalancers))
×
224
                for _, balancer := range readerConfig.GroupBalancers {
×
225
                        if b, ok := GroupBalancers[balancer]; ok {
×
226
                                groupBalancers = append(groupBalancers, b)
×
227
                        }
×
228
                }
229
                if len(groupBalancers) == 0 {
×
230
                        // Default to [Range, RoundRobin] if no balancer is specified
×
231
                        groupBalancers = append(groupBalancers, GroupBalancers[groupBalancerRange])
×
232
                        groupBalancers = append(groupBalancers, GroupBalancers[groupBalancerRoundRobin])
×
233
                }
×
234
        }
235

236
        isolationLevel := IsolationLevels[isolationLevelReadUncommitted]
7✔
237
        if readerConfig.IsolationLevel != "" {
7✔
238
                isolationLevel = IsolationLevels[readerConfig.IsolationLevel]
×
239
        }
×
240

241
        var startOffset int64
7✔
242
        if readerConfig.GroupID != "" && readerConfig.StartOffset != "" {
7✔
243
                startOffset = StartOffsets[firstOffset] // Default to FirstOffset
×
244
                if s, ok := StartOffsets[readerConfig.StartOffset]; ok {
×
245
                        startOffset = s
×
246
                }
×
247
        }
248

249
        consolidatedConfig := kafkago.ReaderConfig{
7✔
250
                Brokers:                readerConfig.Brokers,
7✔
251
                GroupID:                readerConfig.GroupID,
7✔
252
                GroupTopics:            readerConfig.GroupTopics,
7✔
253
                Topic:                  readerConfig.Topic,
7✔
254
                Partition:              readerConfig.Partition,
7✔
255
                QueueCapacity:          readerConfig.QueueCapacity,
7✔
256
                MinBytes:               readerConfig.MinBytes,
7✔
257
                MaxBytes:               readerConfig.MaxBytes,
7✔
258
                MaxWait:                readerConfig.MaxWait.Duration,
7✔
259
                ReadBatchTimeout:       readerConfig.ReadBatchTimeout,
7✔
260
                ReadLagInterval:        readerConfig.ReadLagInterval,
7✔
261
                GroupBalancers:         groupBalancers,
7✔
262
                HeartbeatInterval:      readerConfig.HeartbeatInterval,
7✔
263
                CommitInterval:         readerConfig.CommitInterval,
7✔
264
                PartitionWatchInterval: readerConfig.PartitionWatchInterval,
7✔
265
                WatchPartitionChanges:  readerConfig.WatchPartitionChanges,
7✔
266
                SessionTimeout:         readerConfig.SessionTimeout,
7✔
267
                RebalanceTimeout:       readerConfig.RebalanceTimeout,
7✔
268
                JoinGroupBackoff:       readerConfig.JoinGroupBackoff,
7✔
269
                RetentionTime:          readerConfig.RetentionTime,
7✔
270
                StartOffset:            startOffset,
7✔
271
                ReadBackoffMin:         readerConfig.ReadBackoffMin,
7✔
272
                ReadBackoffMax:         readerConfig.ReadBackoffMax,
7✔
273
                IsolationLevel:         isolationLevel,
7✔
274
                MaxAttempts:            readerConfig.MaxAttempts,
7✔
275
                OffsetOutOfRangeError:  readerConfig.OffsetOutOfRangeError,
7✔
276
                Dialer:                 dialer,
7✔
277
        }
7✔
278

7✔
279
        if readerConfig.ConnectLogger {
7✔
280
                consolidatedConfig.Logger = logger
×
281
        }
×
282

283
        reader := kafkago.NewReader(consolidatedConfig)
7✔
284

7✔
285
        if readerConfig.Offset > 0 {
9✔
286
                if readerConfig.GroupID == "" {
4✔
287
                        if err := reader.SetOffset(readerConfig.Offset); err != nil {
2✔
288
                                wrappedError := NewXk6KafkaError(
×
289
                                        failedSetOffset, "Unable to set offset, yet returning the reader.", err)
×
290
                                logger.WithField("error", wrappedError).Warn(wrappedError)
×
291
                                return reader
×
292
                        }
×
293
                } else {
×
294
                        err := NewXk6KafkaError(
×
295
                                failedSetOffset, "Offset and groupID are mutually exclusive options, "+
×
296
                                        "so offset is not set, yet returning the reader.", nil)
×
297
                        logger.WithField("error", err).Warn(err)
×
298
                        return reader
×
299
                }
×
300
        }
301

302
        return reader
7✔
303
}
304

305
// consume consumes messages from the given reader.
306
// nolint: funlen
307
func (k *Kafka) consume(
308
        reader *kafkago.Reader, consumeConfig *ConsumeConfig,
309
) []map[string]interface{} {
12✔
310
        if state := k.vu.State(); state == nil {
12✔
311
                logger.WithField("error", ErrForbiddenInInitContext).Error(ErrForbiddenInInitContext)
×
312
                common.Throw(k.vu.Runtime(), ErrForbiddenInInitContext)
×
313
        }
×
314

315
        var ctx context.Context
12✔
316
        if ctx = k.vu.Context(); ctx == nil {
12✔
317
                err := NewXk6KafkaError(noContextError, "No context.", nil)
×
318
                logger.WithField("error", err).Info(err)
×
319
                common.Throw(k.vu.Runtime(), err)
×
320
        }
×
321

322
        if consumeConfig.Limit <= 0 {
12✔
323
                consumeConfig.Limit = 1
×
324
        }
×
325

326
        messages := make([]map[string]interface{}, 0)
12✔
327

12✔
328
        maxWait := reader.Config().MaxWait
12✔
329

12✔
330
        for i := int64(0); i < consumeConfig.Limit; i++ {
24✔
331
                ctxWithTimeout, cancel := context.WithTimeout(ctx, maxWait)
12✔
332
                msg, err := reader.ReadMessage(ctxWithTimeout)
12✔
333
                cancel()
12✔
334
                if errors.Is(err, io.EOF) {
12✔
335
                        k.reportReaderStats(reader.Stats())
×
336

×
337
                        err = NewXk6KafkaError(noMoreMessages, "No more messages.", nil)
×
338
                        logger.WithField("error", err).Info(err)
×
339
                        common.Throw(k.vu.Runtime(), err)
×
340
                }
×
341

342
                if err != nil {
14✔
343
                        k.reportReaderStats(reader.Stats())
2✔
344

2✔
345
                        err = NewXk6KafkaError(failedReadMessage, "Unable to read messages.", err)
2✔
346
                        logger.WithField("error", err).Error(err)
2✔
347
                        common.Throw(k.vu.Runtime(), err)
2✔
348
                }
2✔
349

350
                var messageTime string
10✔
351
                if consumeConfig.NanoPrecision {
10✔
352
                        messageTime = msg.Time.Format(time.RFC3339Nano)
×
353
                } else {
10✔
354
                        messageTime = time.Unix(msg.Time.Unix(), 0).Format(time.RFC3339)
10✔
355
                }
10✔
356

357
                // Rest of the fields of a given message
358
                message := map[string]interface{}{
10✔
359
                        "topic":         msg.Topic,
10✔
360
                        "partition":     msg.Partition,
10✔
361
                        "offset":        msg.Offset,
10✔
362
                        "time":          messageTime,
10✔
363
                        "highWaterMark": msg.HighWaterMark,
10✔
364
                        "headers":       make(map[string]interface{}),
10✔
365
                }
10✔
366

10✔
367
                if headers, ok := message["headers"].(map[string]interface{}); ok {
20✔
368
                        for _, header := range msg.Headers {
10✔
369
                                headers[header.Key] = header.Value
×
370
                        }
×
371
                } else {
×
372
                        err = NewXk6KafkaError(failedTypeCast, "Failed to cast to map.", nil)
×
373
                        logger.WithField("error", err).Error(err)
×
374
                }
×
375

376
                if len(msg.Key) > 0 {
12✔
377
                        message["key"] = msg.Key
2✔
378
                }
2✔
379

380
                if len(msg.Value) > 0 {
20✔
381
                        message["value"] = msg.Value
10✔
382
                }
10✔
383

384
                messages = append(messages, message)
10✔
385
        }
386

387
        k.reportReaderStats(reader.Stats())
10✔
388
        return messages
10✔
389
}
390

391
// reportReaderStats reports the reader stats
392
// nolint:funlen
393
func (k *Kafka) reportReaderStats(currentStats kafkago.ReaderStats) {
12✔
394
        state := k.vu.State()
12✔
395
        if state == nil {
12✔
396
                logger.WithField("error", ErrForbiddenInInitContext).Error(ErrForbiddenInInitContext)
×
397
                common.Throw(k.vu.Runtime(), ErrForbiddenInInitContext)
×
398
        }
×
399

400
        ctx := k.vu.Context()
12✔
401
        if ctx == nil {
12✔
402
                err := NewXk6KafkaError(noContextError, "No context.", nil)
×
403
                logger.WithField("error", err).Info(err)
×
404
                common.Throw(k.vu.Runtime(), err)
×
405
        }
×
406

407
        ctm := k.vu.State().Tags.GetCurrentValues()
12✔
408
        sampleTags := ctm.Tags.With("topic", currentStats.Topic)
12✔
409
        sampleTags = sampleTags.With("clientid", currentStats.ClientID)
12✔
410
        sampleTags = sampleTags.With("partition", currentStats.Partition)
12✔
411

12✔
412
        now := time.Now()
12✔
413
        metrics.PushIfNotDone(ctx, state.Samples, metrics.ConnectedSamples{
12✔
414
                Samples: []metrics.Sample{
12✔
415
                        {
12✔
416
                                Time: now,
12✔
417
                                TimeSeries: metrics.TimeSeries{
12✔
418
                                        Metric: k.metrics.ReaderDials,
12✔
419
                                        Tags:   sampleTags,
12✔
420
                                },
12✔
421
                                Value:    float64(currentStats.Dials),
12✔
422
                                Metadata: ctm.Metadata,
12✔
423
                        },
12✔
424
                        {
12✔
425
                                Time: now,
12✔
426
                                TimeSeries: metrics.TimeSeries{
12✔
427
                                        Metric: k.metrics.ReaderFetches,
12✔
428
                                        Tags:   sampleTags,
12✔
429
                                },
12✔
430
                                Value:    float64(currentStats.Fetches),
12✔
431
                                Metadata: ctm.Metadata,
12✔
432
                        },
12✔
433
                        {
12✔
434
                                Time: now,
12✔
435
                                TimeSeries: metrics.TimeSeries{
12✔
436
                                        Metric: k.metrics.ReaderMessages,
12✔
437
                                        Tags:   sampleTags,
12✔
438
                                },
12✔
439
                                Value:    float64(currentStats.Messages),
12✔
440
                                Metadata: ctm.Metadata,
12✔
441
                        },
12✔
442
                        {
12✔
443
                                Time: now,
12✔
444
                                TimeSeries: metrics.TimeSeries{
12✔
445
                                        Metric: k.metrics.ReaderBytes,
12✔
446
                                        Tags:   sampleTags,
12✔
447
                                },
12✔
448
                                Value:    float64(currentStats.Bytes),
12✔
449
                                Metadata: ctm.Metadata,
12✔
450
                        },
12✔
451
                        {
12✔
452
                                Time: now,
12✔
453
                                TimeSeries: metrics.TimeSeries{
12✔
454
                                        Metric: k.metrics.ReaderRebalances,
12✔
455
                                        Tags:   sampleTags,
12✔
456
                                },
12✔
457
                                Value:    float64(currentStats.Rebalances),
12✔
458
                                Metadata: ctm.Metadata,
12✔
459
                        },
12✔
460
                        {
12✔
461
                                Time: now,
12✔
462
                                TimeSeries: metrics.TimeSeries{
12✔
463
                                        Metric: k.metrics.ReaderTimeouts,
12✔
464
                                        Tags:   sampleTags,
12✔
465
                                },
12✔
466
                                Value:    float64(currentStats.Timeouts),
12✔
467
                                Metadata: ctm.Metadata,
12✔
468
                        },
12✔
469
                        {
12✔
470
                                Time: now,
12✔
471
                                TimeSeries: metrics.TimeSeries{
12✔
472
                                        Metric: k.metrics.ReaderErrors,
12✔
473
                                        Tags:   sampleTags,
12✔
474
                                },
12✔
475
                                Value:    float64(currentStats.Errors),
12✔
476
                                Metadata: ctm.Metadata,
12✔
477
                        },
12✔
478
                        {
12✔
479
                                Time: now,
12✔
480
                                TimeSeries: metrics.TimeSeries{
12✔
481
                                        Metric: k.metrics.ReaderDialTime,
12✔
482
                                        Tags:   sampleTags,
12✔
483
                                },
12✔
484
                                Value:    metrics.D(currentStats.DialTime.Avg),
12✔
485
                                Metadata: ctm.Metadata,
12✔
486
                        },
12✔
487
                        {
12✔
488
                                Time: now,
12✔
489
                                TimeSeries: metrics.TimeSeries{
12✔
490
                                        Metric: k.metrics.ReaderReadTime,
12✔
491
                                        Tags:   sampleTags,
12✔
492
                                },
12✔
493
                                Value:    metrics.D(currentStats.ReadTime.Avg),
12✔
494
                                Metadata: ctm.Metadata,
12✔
495
                        },
12✔
496
                        {
12✔
497
                                Time: now,
12✔
498
                                TimeSeries: metrics.TimeSeries{
12✔
499
                                        Metric: k.metrics.ReaderWaitTime,
12✔
500
                                        Tags:   sampleTags,
12✔
501
                                },
12✔
502
                                Value:    metrics.D(currentStats.WaitTime.Avg),
12✔
503
                                Metadata: ctm.Metadata,
12✔
504
                        },
12✔
505
                        {
12✔
506
                                Time: now,
12✔
507
                                TimeSeries: metrics.TimeSeries{
12✔
508
                                        Metric: k.metrics.ReaderFetchSize,
12✔
509
                                        Tags:   sampleTags,
12✔
510
                                },
12✔
511
                                Value:    float64(currentStats.FetchSize.Avg),
12✔
512
                                Metadata: ctm.Metadata,
12✔
513
                        },
12✔
514
                        {
12✔
515
                                Time: now,
12✔
516
                                TimeSeries: metrics.TimeSeries{
12✔
517
                                        Metric: k.metrics.ReaderFetchBytes,
12✔
518
                                        Tags:   sampleTags,
12✔
519
                                },
12✔
520
                                Value:    float64(currentStats.FetchBytes.Min),
12✔
521
                                Metadata: ctm.Metadata,
12✔
522
                        },
12✔
523
                        {
12✔
524
                                Time: now,
12✔
525
                                TimeSeries: metrics.TimeSeries{
12✔
526
                                        Metric: k.metrics.ReaderFetchBytes,
12✔
527
                                        Tags:   sampleTags,
12✔
528
                                },
12✔
529
                                Value:    float64(currentStats.FetchBytes.Max),
12✔
530
                                Metadata: ctm.Metadata,
12✔
531
                        },
12✔
532
                        {
12✔
533
                                Time: now,
12✔
534
                                TimeSeries: metrics.TimeSeries{
12✔
535
                                        Metric: k.metrics.ReaderOffset,
12✔
536
                                        Tags:   sampleTags,
12✔
537
                                },
12✔
538
                                Value:    float64(currentStats.Offset),
12✔
539
                                Metadata: ctm.Metadata,
12✔
540
                        },
12✔
541
                        {
12✔
542
                                Time: now,
12✔
543
                                TimeSeries: metrics.TimeSeries{
12✔
544
                                        Metric: k.metrics.ReaderLag,
12✔
545
                                        Tags:   sampleTags,
12✔
546
                                },
12✔
547
                                Value:    float64(currentStats.Lag),
12✔
548
                                Metadata: ctm.Metadata,
12✔
549
                        },
12✔
550
                        {
12✔
551
                                Time: now,
12✔
552
                                TimeSeries: metrics.TimeSeries{
12✔
553
                                        Metric: k.metrics.ReaderMinBytes,
12✔
554
                                        Tags:   sampleTags,
12✔
555
                                },
12✔
556
                                Value:    float64(currentStats.MinBytes),
12✔
557
                                Metadata: ctm.Metadata,
12✔
558
                        },
12✔
559
                        {
12✔
560
                                Time: now,
12✔
561
                                TimeSeries: metrics.TimeSeries{
12✔
562
                                        Metric: k.metrics.ReaderMaxBytes,
12✔
563
                                        Tags:   sampleTags,
12✔
564
                                },
12✔
565
                                Value:    float64(currentStats.MaxBytes),
12✔
566
                                Metadata: ctm.Metadata,
12✔
567
                        },
12✔
568
                        {
12✔
569
                                Time: now,
12✔
570
                                TimeSeries: metrics.TimeSeries{
12✔
571
                                        Metric: k.metrics.ReaderMaxWait,
12✔
572
                                        Tags:   sampleTags,
12✔
573
                                },
12✔
574
                                Value:    metrics.D(currentStats.MaxWait),
12✔
575
                                Metadata: ctm.Metadata,
12✔
576
                        },
12✔
577
                        {
12✔
578
                                Time: now,
12✔
579
                                TimeSeries: metrics.TimeSeries{
12✔
580
                                        Metric: k.metrics.ReaderQueueLength,
12✔
581
                                        Tags:   sampleTags,
12✔
582
                                },
12✔
583
                                Value:    float64(currentStats.QueueLength),
12✔
584
                                Metadata: ctm.Metadata,
12✔
585
                        },
12✔
586
                        {
12✔
587
                                Time: now,
12✔
588
                                TimeSeries: metrics.TimeSeries{
12✔
589
                                        Metric: k.metrics.ReaderQueueCapacity,
12✔
590
                                        Tags:   sampleTags,
12✔
591
                                },
12✔
592
                                Value:    float64(currentStats.QueueCapacity),
12✔
593
                                Metadata: ctm.Metadata,
12✔
594
                        },
12✔
595
                },
12✔
596
                Tags: sampleTags,
12✔
597
                Time: now,
12✔
598
        })
12✔
599
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc