• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mostafa / xk6-kafka / 6955736719

22 Nov 2023 10:05AM UTC coverage: 75.246% (-0.1%) from 75.344%
6955736719

push

github

mostafa
Fix faulty check in assignment to isolation level

1 of 1 new or added line in 1 file covered. (100.0%)

2 existing lines in 1 file now uncovered.

1529 of 2032 relevant lines covered (75.25%)

13.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.97
/reader.go
1
package kafka
2

3
import (
4
        "context"
5
        "encoding/json"
6
        "errors"
7
        "io"
8
        "time"
9

10
        "github.com/dop251/goja"
11
        kafkago "github.com/segmentio/kafka-go"
12
        "go.k6.io/k6/js/common"
13
        "go.k6.io/k6/metrics"
14
)
15

16
var (
17
        // Group balancers.
18
        groupBalancerRange        = "group_balancer_range"
19
        groupBalancerRoundRobin   = "group_balancer_round_robin"
20
        groupBalancerRackAffinity = "group_balancer_rack_affinity"
21

22
        GroupBalancers map[string]kafkago.GroupBalancer
23

24
        // Isolation levels.
25
        isolationLevelReadUncommitted = "isolation_level_read_uncommitted"
26
        isolationLevelReadCommitted   = "isolation_level_read_committed"
27

28
        IsolationLevels map[string]kafkago.IsolationLevel
29

30
        // Start offsets.
31
        lastOffset  = "start_offset_last_offset"
32
        firstOffset = "start_offset_first_offset"
33

34
        StartOffsets map[string]int64
35

36
        RebalanceTimeout       = time.Second * 5
37
        HeartbeatInterval      = time.Second * 3
38
        SessionTimeout         = time.Second * 30
39
        PartitionWatchInterval = time.Second * 5
40
        JoinGroupBackoff       = time.Second * 5
41
        RetentionTime          = time.Hour * 24
42
)
43

44
type ReaderConfig struct {
45
        WatchPartitionChanges  bool          `json:"watchPartitionChanges"`
46
        ConnectLogger          bool          `json:"connectLogger"`
47
        Partition              int           `json:"partition"`
48
        QueueCapacity          int           `json:"queueCapacity"`
49
        MinBytes               int           `json:"minBytes"`
50
        MaxBytes               int           `json:"maxBytes"`
51
        MaxAttempts            int           `json:"maxAttempts"`
52
        GroupID                string        `json:"groupId"`
53
        Topic                  string        `json:"topic"`
54
        IsolationLevel         string        `json:"isolationLevel"`
55
        StartOffset            string        `json:"startOffset"`
56
        Offset                 int64         `json:"offset"`
57
        Brokers                []string      `json:"brokers"`
58
        GroupTopics            []string      `json:"groupTopics"`
59
        GroupBalancers         []string      `json:"groupBalancers"`
60
        MaxWait                Duration      `json:"maxWait"`
61
        ReadBatchTimeout       time.Duration `json:"readBatchTimeout"`
62
        ReadLagInterval        time.Duration `json:"readLagInterval"`
63
        HeartbeatInterval      time.Duration `json:"heartbeatInterval"`
64
        CommitInterval         time.Duration `json:"commitInterval"`
65
        PartitionWatchInterval time.Duration `json:"partitionWatchInterval"`
66
        SessionTimeout         time.Duration `json:"sessionTimeout"`
67
        RebalanceTimeout       time.Duration `json:"rebalanceTimeout"`
68
        JoinGroupBackoff       time.Duration `json:"joinGroupBackoff"`
69
        RetentionTime          time.Duration `json:"retentionTime"`
70
        ReadBackoffMin         time.Duration `json:"readBackoffMin"`
71
        ReadBackoffMax         time.Duration `json:"readBackoffMax"`
72
        OffsetOutOfRangeError  bool          `json:"offsetOutOfRangeError"` // deprecated, do not use
73
        SASL                   SASLConfig    `json:"sasl"`
74
        TLS                    TLSConfig     `json:"tls"`
75
}
76

77
// ConsumeConfig is the JSON-decodable set of options for a single consume
// call.
type ConsumeConfig struct {
	// Limit is the maximum number of messages to read in one call; consume
	// treats values below one as one.
	Limit int64 `json:"limit"`
}
80

81
// Duration wraps time.Duration so that it is encoded to and decoded from JSON
// as a duration string (for example "1m30s") instead of a number.
type Duration struct {
	time.Duration
}

// MarshalJSON encodes the duration as a JSON string in time.Duration.String
// format.
func (d Duration) MarshalJSON() ([]byte, error) {
	return json.Marshal(d.String())
}

// UnmarshalJSON decodes a JSON string accepted by time.ParseDuration.
//
// A JSON null is a no-op, following the encoding/json convention for
// Unmarshaler implementations. Any other non-string JSON value yields an
// "invalid duration" error, and malformed JSON or an unparsable duration
// string returns the underlying error. The receiver is only modified when
// decoding succeeds.
func (d *Duration) UnmarshalJSON(b []byte) error {
	var v interface{}
	if err := json.Unmarshal(b, &v); err != nil {
		return err
	}

	switch value := v.(type) {
	case nil:
		// JSON null: leave the current value untouched.
		return nil
	case string:
		parsed, err := time.ParseDuration(value)
		if err != nil {
			return err
		}
		d.Duration = parsed
		return nil
	default:
		return errors.New("invalid duration")
	}
}
107

108
// readerClass is a wrapper around kafkago.reader and acts as a JS constructor
109
// for this extension, thus it must be called with new operator, e.g. new Reader(...).
110
// nolint: funlen
111
func (k *Kafka) readerClass(call goja.ConstructorCall) *goja.Object {
1✔
112
        runtime := k.vu.Runtime()
1✔
113
        var readerConfig *ReaderConfig
1✔
114
        if len(call.Arguments) == 0 {
1✔
115
                common.Throw(runtime, ErrNotEnoughArguments)
×
116
        }
×
117

118
        if params, ok := call.Argument(0).Export().(map[string]interface{}); ok {
2✔
119
                if b, err := json.Marshal(params); err != nil {
1✔
120
                        common.Throw(runtime, err)
×
121
                } else {
1✔
122
                        if err = json.Unmarshal(b, &readerConfig); err != nil {
1✔
123
                                common.Throw(runtime, err)
×
124
                        }
×
125
                }
126
        }
127

128
        reader := k.reader(readerConfig)
1✔
129

1✔
130
        readerObject := runtime.NewObject()
1✔
131
        // This is the reader object itself
1✔
132
        if err := readerObject.Set("This", reader); err != nil {
1✔
133
                common.Throw(runtime, err)
×
134
        }
×
135

136
        err := readerObject.Set("consume", func(call goja.FunctionCall) goja.Value {
2✔
137
                var consumeConfig *ConsumeConfig
1✔
138
                if len(call.Arguments) == 0 {
1✔
139
                        common.Throw(runtime, ErrNotEnoughArguments)
×
140
                }
×
141

142
                if params, ok := call.Argument(0).Export().(map[string]interface{}); ok {
2✔
143
                        if b, err := json.Marshal(params); err != nil {
1✔
144
                                common.Throw(runtime, err)
×
145
                        } else {
1✔
146
                                if err = json.Unmarshal(b, &consumeConfig); err != nil {
1✔
147
                                        common.Throw(runtime, err)
×
148
                                }
×
149
                        }
150
                }
151

152
                return runtime.ToValue(k.consume(reader, consumeConfig))
1✔
153
        })
154
        if err != nil {
1✔
155
                common.Throw(runtime, err)
×
156
        }
×
157

158
        // This is unnecessary, but it's here for reference purposes
159
        err = readerObject.Set("close", func(call goja.FunctionCall) goja.Value {
2✔
160
                if err := reader.Close(); err != nil {
1✔
161
                        common.Throw(runtime, err)
×
162
                }
×
163

164
                return goja.Undefined()
1✔
165
        })
166
        if err != nil {
1✔
167
                common.Throw(runtime, err)
×
168
        }
×
169

170
        freeze(readerObject)
1✔
171

1✔
172
        return runtime.ToValue(readerObject).ToObject(runtime)
1✔
173
}
174

175
// reader creates a Kafka reader with the given configuration
176
// nolint: funlen
177
func (k *Kafka) reader(readerConfig *ReaderConfig) *kafkago.Reader {
7✔
178
        dialer, err := GetDialer(readerConfig.SASL, readerConfig.TLS)
7✔
179
        if err != nil {
7✔
180
                if err.Unwrap() != nil {
×
181
                        logger.WithField("error", err).Error(err)
×
182
                }
×
183
                common.Throw(k.vu.Runtime(), err)
×
184
        }
185

186
        if readerConfig.Partition != 0 && readerConfig.GroupID != "" {
7✔
187
                common.Throw(k.vu.Runtime(), ErrPartitionAndGroupID)
×
188
        }
×
189

190
        if readerConfig.Topic != "" && readerConfig.GroupID != "" {
7✔
191
                common.Throw(k.vu.Runtime(), ErrTopicAndGroupID)
×
192
        }
×
193

194
        if readerConfig.GroupID != "" &&
7✔
195
                len(readerConfig.GroupTopics) >= 0 &&
7✔
196
                readerConfig.HeartbeatInterval == 0 {
7✔
197
                readerConfig.HeartbeatInterval = HeartbeatInterval
×
198
        }
×
199

200
        if readerConfig.GroupID != "" && readerConfig.SessionTimeout == 0 {
7✔
201
                readerConfig.SessionTimeout = SessionTimeout
×
202
        }
×
203

204
        if readerConfig.GroupID != "" && readerConfig.RebalanceTimeout == 0 {
7✔
205
                readerConfig.RebalanceTimeout = RebalanceTimeout
×
206
        }
×
207

208
        if readerConfig.GroupID != "" && readerConfig.JoinGroupBackoff == 0 {
7✔
209
                readerConfig.JoinGroupBackoff = JoinGroupBackoff
×
210
        }
×
211

212
        if readerConfig.GroupID != "" && readerConfig.PartitionWatchInterval == 0 {
7✔
213
                readerConfig.PartitionWatchInterval = PartitionWatchInterval
×
214
        }
×
215

216
        if readerConfig.GroupID != "" && readerConfig.RetentionTime == 0 {
7✔
217
                readerConfig.RetentionTime = RetentionTime
×
218
        }
×
219

220
        var groupBalancers []kafkago.GroupBalancer
7✔
221
        if readerConfig.GroupID != "" {
7✔
222
                groupBalancers = make([]kafkago.GroupBalancer, 0, len(readerConfig.GroupBalancers))
×
223
                for _, balancer := range readerConfig.GroupBalancers {
×
224
                        if b, ok := GroupBalancers[balancer]; ok {
×
225
                                groupBalancers = append(groupBalancers, b)
×
226
                        }
×
227
                }
228
                if len(groupBalancers) == 0 {
×
229
                        // Default to [Range, RoundRobin] if no balancer is specified
×
230
                        groupBalancers = append(groupBalancers, GroupBalancers[groupBalancerRange])
×
231
                        groupBalancers = append(groupBalancers, GroupBalancers[groupBalancerRoundRobin])
×
232
                }
×
233
        }
234

235
        isolationLevel := IsolationLevels[isolationLevelReadUncommitted]
7✔
236
        if readerConfig.IsolationLevel != "" {
7✔
UNCOV
237
                isolationLevel = IsolationLevels[readerConfig.IsolationLevel]
×
UNCOV
238
        }
×
239

240
        var startOffset int64
7✔
241
        if readerConfig.GroupID != "" && readerConfig.StartOffset != "" {
7✔
242
                startOffset = StartOffsets[firstOffset] // Default to FirstOffset
×
243
                if s, ok := StartOffsets[readerConfig.StartOffset]; ok {
×
244
                        startOffset = s
×
245
                }
×
246
        }
247

248
        consolidatedConfig := kafkago.ReaderConfig{
7✔
249
                Brokers:                readerConfig.Brokers,
7✔
250
                GroupID:                readerConfig.GroupID,
7✔
251
                GroupTopics:            readerConfig.GroupTopics,
7✔
252
                Topic:                  readerConfig.Topic,
7✔
253
                Partition:              readerConfig.Partition,
7✔
254
                QueueCapacity:          readerConfig.QueueCapacity,
7✔
255
                MinBytes:               readerConfig.MinBytes,
7✔
256
                MaxBytes:               readerConfig.MaxBytes,
7✔
257
                MaxWait:                readerConfig.MaxWait.Duration,
7✔
258
                ReadBatchTimeout:       readerConfig.ReadBatchTimeout,
7✔
259
                ReadLagInterval:        readerConfig.ReadLagInterval,
7✔
260
                GroupBalancers:         groupBalancers,
7✔
261
                HeartbeatInterval:      readerConfig.HeartbeatInterval,
7✔
262
                CommitInterval:         readerConfig.CommitInterval,
7✔
263
                PartitionWatchInterval: readerConfig.PartitionWatchInterval,
7✔
264
                WatchPartitionChanges:  readerConfig.WatchPartitionChanges,
7✔
265
                SessionTimeout:         readerConfig.SessionTimeout,
7✔
266
                RebalanceTimeout:       readerConfig.RebalanceTimeout,
7✔
267
                JoinGroupBackoff:       readerConfig.JoinGroupBackoff,
7✔
268
                RetentionTime:          readerConfig.RetentionTime,
7✔
269
                StartOffset:            startOffset,
7✔
270
                ReadBackoffMin:         readerConfig.ReadBackoffMin,
7✔
271
                ReadBackoffMax:         readerConfig.ReadBackoffMax,
7✔
272
                IsolationLevel:         isolationLevel,
7✔
273
                MaxAttempts:            readerConfig.MaxAttempts,
7✔
274
                OffsetOutOfRangeError:  readerConfig.OffsetOutOfRangeError,
7✔
275
                Dialer:                 dialer,
7✔
276
        }
7✔
277

7✔
278
        if readerConfig.ConnectLogger {
7✔
279
                consolidatedConfig.Logger = logger
×
280
        }
×
281

282
        reader := kafkago.NewReader(consolidatedConfig)
7✔
283

7✔
284
        if readerConfig.Offset > 0 {
9✔
285
                if readerConfig.GroupID == "" {
4✔
286
                        if err := reader.SetOffset(readerConfig.Offset); err != nil {
2✔
287
                                wrappedError := NewXk6KafkaError(
×
288
                                        failedSetOffset, "Unable to set offset, yet returning the reader.", err)
×
289
                                logger.WithField("error", wrappedError).Warn(wrappedError)
×
290
                                return reader
×
291
                        }
×
292
                } else {
×
293
                        err := NewXk6KafkaError(
×
294
                                failedSetOffset, "Offset and groupID are mutually exclusive options, "+
×
295
                                        "so offset is not set, yet returning the reader.", nil)
×
296
                        logger.WithField("error", err).Warn(err)
×
297
                        return reader
×
298
                }
×
299
        }
300

301
        return reader
7✔
302
}
303

304
// consume consumes messages from the given reader.
305
// nolint: funlen
306
func (k *Kafka) consume(
307
        reader *kafkago.Reader, consumeConfig *ConsumeConfig,
308
) []map[string]interface{} {
12✔
309
        if state := k.vu.State(); state == nil {
12✔
310
                logger.WithField("error", ErrForbiddenInInitContext).Error(ErrForbiddenInInitContext)
×
311
                common.Throw(k.vu.Runtime(), ErrForbiddenInInitContext)
×
312
        }
×
313

314
        var ctx context.Context
12✔
315
        if ctx = k.vu.Context(); ctx == nil {
12✔
316
                err := NewXk6KafkaError(noContextError, "No context.", nil)
×
317
                logger.WithField("error", err).Info(err)
×
318
                common.Throw(k.vu.Runtime(), err)
×
319
        }
×
320

321
        if consumeConfig.Limit <= 0 {
12✔
322
                consumeConfig.Limit = 1
×
323
        }
×
324

325
        messages := make([]map[string]interface{}, 0)
12✔
326

12✔
327
        maxWait := reader.Config().MaxWait
12✔
328

12✔
329
        for i := int64(0); i < consumeConfig.Limit; i++ {
24✔
330
                ctxWithTimeout, cancel := context.WithTimeout(ctx, maxWait)
12✔
331
                msg, err := reader.ReadMessage(ctxWithTimeout)
12✔
332
                cancel()
12✔
333
                if errors.Is(err, io.EOF) {
12✔
334
                        k.reportReaderStats(reader.Stats())
×
335

×
336
                        err = NewXk6KafkaError(noMoreMessages, "No more messages.", nil)
×
337
                        logger.WithField("error", err).Info(err)
×
338
                        return messages
×
339
                }
×
340

341
                if err != nil {
14✔
342
                        k.reportReaderStats(reader.Stats())
2✔
343

2✔
344
                        err = NewXk6KafkaError(failedReadMessage, "Unable to read messages.", nil)
2✔
345
                        logger.WithField("error", err).Error(err)
2✔
346
                        return messages
2✔
347
                }
2✔
348

349
                // Rest of the fields of a given message
350
                message := map[string]interface{}{
10✔
351
                        "topic":         msg.Topic,
10✔
352
                        "partition":     msg.Partition,
10✔
353
                        "offset":        msg.Offset,
10✔
354
                        "time":          time.Unix(msg.Time.Unix(), 0).Format(time.RFC3339),
10✔
355
                        "highWaterMark": msg.HighWaterMark,
10✔
356
                        "headers":       make(map[string]interface{}),
10✔
357
                }
10✔
358

10✔
359
                if headers, ok := message["headers"].(map[string]interface{}); ok {
20✔
360
                        for _, header := range msg.Headers {
10✔
361
                                headers[header.Key] = header.Value
×
362
                        }
×
363
                } else {
×
364
                        err = NewXk6KafkaError(failedTypeCast, "Failed to cast to map.", nil)
×
365
                        logger.WithField("error", err).Error(err)
×
366
                }
×
367

368
                if len(msg.Key) > 0 {
12✔
369
                        message["key"] = msg.Key
2✔
370
                }
2✔
371

372
                if len(msg.Value) > 0 {
20✔
373
                        message["value"] = msg.Value
10✔
374
                }
10✔
375

376
                messages = append(messages, message)
10✔
377
        }
378

379
        k.reportReaderStats(reader.Stats())
10✔
380
        return messages
10✔
381
}
382

383
// reportReaderStats reports the reader stats
384
// nolint:funlen
385
func (k *Kafka) reportReaderStats(currentStats kafkago.ReaderStats) {
12✔
386
        state := k.vu.State()
12✔
387
        if state == nil {
12✔
388
                logger.WithField("error", ErrForbiddenInInitContext).Error(ErrForbiddenInInitContext)
×
389
                common.Throw(k.vu.Runtime(), ErrForbiddenInInitContext)
×
390
        }
×
391

392
        ctx := k.vu.Context()
12✔
393
        if ctx == nil {
12✔
394
                err := NewXk6KafkaError(noContextError, "No context.", nil)
×
395
                logger.WithField("error", err).Info(err)
×
396
                common.Throw(k.vu.Runtime(), err)
×
397
        }
×
398

399
        ctm := k.vu.State().Tags.GetCurrentValues()
12✔
400
        sampleTags := ctm.Tags.With("topic", currentStats.Topic)
12✔
401
        sampleTags = sampleTags.With("clientid", currentStats.ClientID)
12✔
402
        sampleTags = sampleTags.With("partition", currentStats.Partition)
12✔
403

12✔
404
        now := time.Now()
12✔
405
        metrics.PushIfNotDone(ctx, state.Samples, metrics.ConnectedSamples{
12✔
406
                Samples: []metrics.Sample{
12✔
407
                        {
12✔
408
                                Time: now,
12✔
409
                                TimeSeries: metrics.TimeSeries{
12✔
410
                                        Metric: k.metrics.ReaderDials,
12✔
411
                                        Tags:   sampleTags,
12✔
412
                                },
12✔
413
                                Value:    float64(currentStats.Dials),
12✔
414
                                Metadata: ctm.Metadata,
12✔
415
                        },
12✔
416
                        {
12✔
417
                                Time: now,
12✔
418
                                TimeSeries: metrics.TimeSeries{
12✔
419
                                        Metric: k.metrics.ReaderFetches,
12✔
420
                                        Tags:   sampleTags,
12✔
421
                                },
12✔
422
                                Value:    float64(currentStats.Fetches),
12✔
423
                                Metadata: ctm.Metadata,
12✔
424
                        },
12✔
425
                        {
12✔
426
                                Time: now,
12✔
427
                                TimeSeries: metrics.TimeSeries{
12✔
428
                                        Metric: k.metrics.ReaderMessages,
12✔
429
                                        Tags:   sampleTags,
12✔
430
                                },
12✔
431
                                Value:    float64(currentStats.Messages),
12✔
432
                                Metadata: ctm.Metadata,
12✔
433
                        },
12✔
434
                        {
12✔
435
                                Time: now,
12✔
436
                                TimeSeries: metrics.TimeSeries{
12✔
437
                                        Metric: k.metrics.ReaderBytes,
12✔
438
                                        Tags:   sampleTags,
12✔
439
                                },
12✔
440
                                Value:    float64(currentStats.Bytes),
12✔
441
                                Metadata: ctm.Metadata,
12✔
442
                        },
12✔
443
                        {
12✔
444
                                Time: now,
12✔
445
                                TimeSeries: metrics.TimeSeries{
12✔
446
                                        Metric: k.metrics.ReaderRebalances,
12✔
447
                                        Tags:   sampleTags,
12✔
448
                                },
12✔
449
                                Value:    float64(currentStats.Rebalances),
12✔
450
                                Metadata: ctm.Metadata,
12✔
451
                        },
12✔
452
                        {
12✔
453
                                Time: now,
12✔
454
                                TimeSeries: metrics.TimeSeries{
12✔
455
                                        Metric: k.metrics.ReaderTimeouts,
12✔
456
                                        Tags:   sampleTags,
12✔
457
                                },
12✔
458
                                Value:    float64(currentStats.Timeouts),
12✔
459
                                Metadata: ctm.Metadata,
12✔
460
                        },
12✔
461
                        {
12✔
462
                                Time: now,
12✔
463
                                TimeSeries: metrics.TimeSeries{
12✔
464
                                        Metric: k.metrics.ReaderErrors,
12✔
465
                                        Tags:   sampleTags,
12✔
466
                                },
12✔
467
                                Value:    float64(currentStats.Errors),
12✔
468
                                Metadata: ctm.Metadata,
12✔
469
                        },
12✔
470
                        {
12✔
471
                                Time: now,
12✔
472
                                TimeSeries: metrics.TimeSeries{
12✔
473
                                        Metric: k.metrics.ReaderDialTime,
12✔
474
                                        Tags:   sampleTags,
12✔
475
                                },
12✔
476
                                Value:    metrics.D(currentStats.DialTime.Avg),
12✔
477
                                Metadata: ctm.Metadata,
12✔
478
                        },
12✔
479
                        {
12✔
480
                                Time: now,
12✔
481
                                TimeSeries: metrics.TimeSeries{
12✔
482
                                        Metric: k.metrics.ReaderReadTime,
12✔
483
                                        Tags:   sampleTags,
12✔
484
                                },
12✔
485
                                Value:    metrics.D(currentStats.ReadTime.Avg),
12✔
486
                                Metadata: ctm.Metadata,
12✔
487
                        },
12✔
488
                        {
12✔
489
                                Time: now,
12✔
490
                                TimeSeries: metrics.TimeSeries{
12✔
491
                                        Metric: k.metrics.ReaderWaitTime,
12✔
492
                                        Tags:   sampleTags,
12✔
493
                                },
12✔
494
                                Value:    metrics.D(currentStats.WaitTime.Avg),
12✔
495
                                Metadata: ctm.Metadata,
12✔
496
                        },
12✔
497
                        {
12✔
498
                                Time: now,
12✔
499
                                TimeSeries: metrics.TimeSeries{
12✔
500
                                        Metric: k.metrics.ReaderFetchSize,
12✔
501
                                        Tags:   sampleTags,
12✔
502
                                },
12✔
503
                                Value:    float64(currentStats.FetchSize.Avg),
12✔
504
                                Metadata: ctm.Metadata,
12✔
505
                        },
12✔
506
                        {
12✔
507
                                Time: now,
12✔
508
                                TimeSeries: metrics.TimeSeries{
12✔
509
                                        Metric: k.metrics.ReaderFetchBytes,
12✔
510
                                        Tags:   sampleTags,
12✔
511
                                },
12✔
512
                                Value:    float64(currentStats.FetchBytes.Min),
12✔
513
                                Metadata: ctm.Metadata,
12✔
514
                        },
12✔
515
                        {
12✔
516
                                Time: now,
12✔
517
                                TimeSeries: metrics.TimeSeries{
12✔
518
                                        Metric: k.metrics.ReaderFetchBytes,
12✔
519
                                        Tags:   sampleTags,
12✔
520
                                },
12✔
521
                                Value:    float64(currentStats.FetchBytes.Max),
12✔
522
                                Metadata: ctm.Metadata,
12✔
523
                        },
12✔
524
                        {
12✔
525
                                Time: now,
12✔
526
                                TimeSeries: metrics.TimeSeries{
12✔
527
                                        Metric: k.metrics.ReaderOffset,
12✔
528
                                        Tags:   sampleTags,
12✔
529
                                },
12✔
530
                                Value:    float64(currentStats.Offset),
12✔
531
                                Metadata: ctm.Metadata,
12✔
532
                        },
12✔
533
                        {
12✔
534
                                Time: now,
12✔
535
                                TimeSeries: metrics.TimeSeries{
12✔
536
                                        Metric: k.metrics.ReaderLag,
12✔
537
                                        Tags:   sampleTags,
12✔
538
                                },
12✔
539
                                Value:    float64(currentStats.Lag),
12✔
540
                                Metadata: ctm.Metadata,
12✔
541
                        },
12✔
542
                        {
12✔
543
                                Time: now,
12✔
544
                                TimeSeries: metrics.TimeSeries{
12✔
545
                                        Metric: k.metrics.ReaderMinBytes,
12✔
546
                                        Tags:   sampleTags,
12✔
547
                                },
12✔
548
                                Value:    float64(currentStats.MinBytes),
12✔
549
                                Metadata: ctm.Metadata,
12✔
550
                        },
12✔
551
                        {
12✔
552
                                Time: now,
12✔
553
                                TimeSeries: metrics.TimeSeries{
12✔
554
                                        Metric: k.metrics.ReaderMaxBytes,
12✔
555
                                        Tags:   sampleTags,
12✔
556
                                },
12✔
557
                                Value:    float64(currentStats.MaxBytes),
12✔
558
                                Metadata: ctm.Metadata,
12✔
559
                        },
12✔
560
                        {
12✔
561
                                Time: now,
12✔
562
                                TimeSeries: metrics.TimeSeries{
12✔
563
                                        Metric: k.metrics.ReaderMaxWait,
12✔
564
                                        Tags:   sampleTags,
12✔
565
                                },
12✔
566
                                Value:    metrics.D(currentStats.MaxWait),
12✔
567
                                Metadata: ctm.Metadata,
12✔
568
                        },
12✔
569
                        {
12✔
570
                                Time: now,
12✔
571
                                TimeSeries: metrics.TimeSeries{
12✔
572
                                        Metric: k.metrics.ReaderQueueLength,
12✔
573
                                        Tags:   sampleTags,
12✔
574
                                },
12✔
575
                                Value:    float64(currentStats.QueueLength),
12✔
576
                                Metadata: ctm.Metadata,
12✔
577
                        },
12✔
578
                        {
12✔
579
                                Time: now,
12✔
580
                                TimeSeries: metrics.TimeSeries{
12✔
581
                                        Metric: k.metrics.ReaderQueueCapacity,
12✔
582
                                        Tags:   sampleTags,
12✔
583
                                },
12✔
584
                                Value:    float64(currentStats.QueueCapacity),
12✔
585
                                Metadata: ctm.Metadata,
12✔
586
                        },
12✔
587
                },
12✔
588
                Tags: sampleTags,
12✔
589
                Time: now,
12✔
590
        })
12✔
591
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc